comparison sat/memory/sqlite.py @ 3013:860c550028d6

memory (sqlite): properly wait for messages to be writen in database: the deferreds of the queries writing message to databases where not gathered and returned, so the caller was not waiting for them to continue its workflow. This was resulting in messages not always written when database was read just after the write (a case common with MUC implementation), and message was appearing empty when sent to bridge. fix 328
author Goffi <goffi@goffi.org>
date Thu, 18 Jul 2019 21:58:34 +0200
parents c13333fcde5e
children ab2696e34d29
comparison
equal deleted inserted replaced
3012:2224fbbd45dd 3013:860c550028d6
419 419
420 def _addToHistoryCb(self, __, data): 420 def _addToHistoryCb(self, __, data):
421 # Message metadata were successfuly added to history 421 # Message metadata were successfuly added to history
422 # now we can add message and subject 422 # now we can add message and subject
423 uid = data['uid'] 423 uid = data['uid']
424 d_list = []
424 for key in ('message', 'subject'): 425 for key in ('message', 'subject'):
425 for lang, value in data[key].iteritems(): 426 for lang, value in data[key].iteritems():
426 d = self.dbpool.runQuery("INSERT INTO {key}(history_uid, {key}, language) VALUES (?,?,?)".format(key=key), 427 d = self.dbpool.runQuery(
428 "INSERT INTO {key}(history_uid, {key}, language) VALUES (?,?,?)"
429 .format(key=key),
427 (uid, value, lang or None)) 430 (uid, value, lang or None))
428 d.addErrback(lambda __: log.error(_(u"Can't save following {key} in history (uid: {uid}, lang:{lang}): {value}".format( 431 d.addErrback(lambda __: log.error(
429 key=key, uid=uid, lang=lang, value=value)))) 432 _(u"Can't save following {key} in history (uid: {uid}, lang:{lang}):"
433 u" {value}").format(
434 key=key, uid=uid, lang=lang, value=value)))
435 d_list.append(d)
430 try: 436 try:
431 thread = data['extra']['thread'] 437 thread = data['extra']['thread']
432 except KeyError: 438 except KeyError:
433 pass 439 pass
434 else: 440 else:
435 thread_parent = data['extra'].get('thread_parent') 441 thread_parent = data['extra'].get('thread_parent')
436 d = self.dbpool.runQuery("INSERT INTO thread(history_uid, thread_id, parent_id) VALUES (?,?,?)", 442 d = self.dbpool.runQuery(
443 "INSERT INTO thread(history_uid, thread_id, parent_id) VALUES (?,?,?)",
437 (uid, thread, thread_parent)) 444 (uid, thread, thread_parent))
438 d.addErrback(lambda __: log.error(_(u"Can't save following thread in history (uid: {uid}): thread:{thread}), parent:{parent}".format( 445 d.addErrback(lambda __: log.error(
439 uid=uid, thread=thread, parent=thread_parent)))) 446 _(u"Can't save following thread in history (uid: {uid}): thread: "
447 u"{thread}), parent:{parent}").format(
448 uid=uid, thread=thread, parent=thread_parent)))
449 d_list.append(d)
450 return defer.DeferredList(d_list)
440 451
441 def _addToHistoryEb(self, failure_, data): 452 def _addToHistoryEb(self, failure_, data):
442 failure_.trap(sqlite3.IntegrityError) 453 failure_.trap(sqlite3.IntegrityError)
443 sqlite_msg = failure_.value.args[0] 454 sqlite_msg = failure_.value.args[0]
444 if "UNIQUE constraint failed" in sqlite_msg: 455 if "UNIQUE constraint failed" in sqlite_msg:
445 log.debug(u"message {} is already in history, not storing it again".format(data['uid'])) 456 log.debug(u"message {} is already in history, not storing it again"
457 .format(data['uid']))
446 if 'received_timestamp' not in data: 458 if 'received_timestamp' not in data:
447 log.warning(u"duplicate message is not delayed, this is maybe a bug: data={}".format(data)) 459 log.warning(
460 u"duplicate message is not delayed, this is maybe a bug: data={}"
461 .format(data))
448 # we cancel message to avoid sending duplicate message to frontends 462 # we cancel message to avoid sending duplicate message to frontends
449 raise failure.Failure(exceptions.CancelError("Cancelled duplicated message")) 463 raise failure.Failure(exceptions.CancelError("Cancelled duplicated message"))
450 else: 464 else:
451 log.error(u"Can't store message in history: {}".format(failure_)) 465 log.error(u"Can't store message in history: {}".format(failure_))
452 466
453 def _logHistoryError(self, failure_, from_jid, to_jid, data): 467 def _logHistoryError(self, failure_, from_jid, to_jid, data):
454 if failure_.check(exceptions.CancelError): 468 if failure_.check(exceptions.CancelError):
455 # we propagate CancelError to avoid sending message to frontends 469 # we propagate CancelError to avoid sending message to frontends
456 raise failure_ 470 raise failure_
457 log.error(_(u"Can't save following message in history: from [{from_jid}] to [{to_jid}] (uid: {uid})" 471 log.error(_(
458 .format(from_jid=from_jid.full(), to_jid=to_jid.full(), uid=data['uid']))) 472 u"Can't save following message in history: from [{from_jid}] to [{to_jid}] "
473 u"(uid: {uid})")
474 .format(from_jid=from_jid.full(), to_jid=to_jid.full(), uid=data['uid']))
459 475
460 def addToHistory(self, data, profile): 476 def addToHistory(self, data, profile):
461 """Store a new message in history 477 """Store a new message in history
462 478
463 @param data(dict): message data as build by SatMessageProtocol.onMessage 479 @param data(dict): message data as build by SatMessageProtocol.onMessage
464 """ 480 """
465 extra = pickle.dumps({k: v for k, v in data['extra'].iteritems() if k not in NOT_IN_EXTRA}, 0) 481 extra = pickle.dumps({k: v for k, v in data['extra'].iteritems()
482 if k not in NOT_IN_EXTRA}, 0)
466 from_jid = data['from'] 483 from_jid = data['from']
467 to_jid = data['to'] 484 to_jid = data['to']
468 d = self.dbpool.runQuery("INSERT INTO history(uid, stanza_id, update_uid, profile_id, source, dest, source_res, dest_res, timestamp, received_timestamp, type, extra) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)", 485 d = self.dbpool.runQuery(
469 (data['uid'], data['extra'].get('stanza_id'), data['extra'].get('update_uid'), self.profiles[profile], data['from'].userhost(), to_jid.userhost(), from_jid.resource, to_jid.resource, data['timestamp'], data.get('received_timestamp'), data['type'], sqlite3.Binary(extra))) 486 u"INSERT INTO history(uid, stanza_id, update_uid, profile_id, source, dest, "
470 d.addCallbacks(self._addToHistoryCb, self._addToHistoryEb, callbackArgs=[data], errbackArgs=[data]) 487 u"source_res, dest_res, timestamp, received_timestamp, type, extra) VALUES "
488 u"(?,?,?,?,?,?,?,?,?,?,?,?)",
489 (data['uid'], data['extra'].get('stanza_id'), data['extra'].get('update_uid'),
490 self.profiles[profile], data['from'].userhost(), to_jid.userhost(),
491 from_jid.resource, to_jid.resource, data['timestamp'],
492 data.get('received_timestamp'), data['type'], sqlite3.Binary(extra)))
493 d.addCallbacks(self._addToHistoryCb,
494 self._addToHistoryEb,
495 callbackArgs=[data],
496 errbackArgs=[data])
471 d.addErrback(self._logHistoryError, from_jid, to_jid, data) 497 d.addErrback(self._logHistoryError, from_jid, to_jid, data)
472 return d 498 return d
473 499
474 def sqliteHistoryToList(self, query_result): 500 def sqliteHistoryToList(self, query_result):
475 """Get SQL query result and return a list of message data dicts""" 501 """Get SQL query result and return a list of message data dicts"""
476 result = [] 502 result = []
477 current = {'uid': None} 503 current = {'uid': None}
478 for row in reversed(query_result): 504 for row in reversed(query_result):
479 uid, stanza_id, update_uid, source, dest, source_res, dest_res, timestamp, received_timestamp,\ 505 (uid, stanza_id, update_uid, source, dest, source_res, dest_res, timestamp,
480 type_, extra, message, message_lang, subject, subject_lang, thread, thread_parent = row 506 received_timestamp, type_, extra, message, message_lang, subject,
507 subject_lang, thread, thread_parent) = row
481 if uid != current['uid']: 508 if uid != current['uid']:
482 # new message 509 # new message
483 try: 510 try:
484 extra = pickle.loads(str(extra or "")) 511 extra = pickle.loads(str(extra or ""))
485 except EOFError: 512 except EOFError:
513 current_extra['thread'] = thread 540 current_extra['thread'] = thread
514 if thread_parent is not None: 541 if thread_parent is not None:
515 current_extra['thread_parent'] = thread_parent 542 current_extra['thread_parent'] = thread_parent
516 else: 543 else:
517 if thread_parent is not None: 544 if thread_parent is not None:
518 log.error(u"Database inconsistency: thread parent without thread (uid: {uid}, thread_parent: {parent})" 545 log.error(
546 u"Database inconsistency: thread parent without thread (uid: "
547 u"{uid}, thread_parent: {parent})"
519 .format(uid=uid, parent=thread_parent)) 548 .format(uid=uid, parent=thread_parent))
520 549
521 return result 550 return result
522 551
523 def listDict2listTuple(self, messages_data): 552 def listDict2listTuple(self, messages_data):