Mercurial > libervia-backend
comparison sat/memory/sqlite.py @ 3013:860c550028d6
memory (sqlite): properly wait for messages to be written in database:
the deferreds of the queries writing messages to the database were not gathered and returned,
so the caller was not waiting for them before continuing its workflow. This resulted in
messages not always being written when the database was read just after the write (a case common
with the MUC implementation), and in messages appearing empty when sent to the bridge.
fix 328
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 18 Jul 2019 21:58:34 +0200 |
parents | c13333fcde5e |
children | ab2696e34d29 |
comparison
equal
deleted
inserted
replaced
3012:2224fbbd45dd | 3013:860c550028d6 |
---|---|
419 | 419 |
420 def _addToHistoryCb(self, __, data): | 420 def _addToHistoryCb(self, __, data): |
421 # Message metadata were successfuly added to history | 421 # Message metadata were successfuly added to history |
422 # now we can add message and subject | 422 # now we can add message and subject |
423 uid = data['uid'] | 423 uid = data['uid'] |
424 d_list = [] | |
424 for key in ('message', 'subject'): | 425 for key in ('message', 'subject'): |
425 for lang, value in data[key].iteritems(): | 426 for lang, value in data[key].iteritems(): |
426 d = self.dbpool.runQuery("INSERT INTO {key}(history_uid, {key}, language) VALUES (?,?,?)".format(key=key), | 427 d = self.dbpool.runQuery( |
428 "INSERT INTO {key}(history_uid, {key}, language) VALUES (?,?,?)" | |
429 .format(key=key), | |
427 (uid, value, lang or None)) | 430 (uid, value, lang or None)) |
428 d.addErrback(lambda __: log.error(_(u"Can't save following {key} in history (uid: {uid}, lang:{lang}): {value}".format( | 431 d.addErrback(lambda __: log.error( |
429 key=key, uid=uid, lang=lang, value=value)))) | 432 _(u"Can't save following {key} in history (uid: {uid}, lang:{lang}):" |
433 u" {value}").format( | |
434 key=key, uid=uid, lang=lang, value=value))) | |
435 d_list.append(d) | |
430 try: | 436 try: |
431 thread = data['extra']['thread'] | 437 thread = data['extra']['thread'] |
432 except KeyError: | 438 except KeyError: |
433 pass | 439 pass |
434 else: | 440 else: |
435 thread_parent = data['extra'].get('thread_parent') | 441 thread_parent = data['extra'].get('thread_parent') |
436 d = self.dbpool.runQuery("INSERT INTO thread(history_uid, thread_id, parent_id) VALUES (?,?,?)", | 442 d = self.dbpool.runQuery( |
443 "INSERT INTO thread(history_uid, thread_id, parent_id) VALUES (?,?,?)", | |
437 (uid, thread, thread_parent)) | 444 (uid, thread, thread_parent)) |
438 d.addErrback(lambda __: log.error(_(u"Can't save following thread in history (uid: {uid}): thread:{thread}), parent:{parent}".format( | 445 d.addErrback(lambda __: log.error( |
439 uid=uid, thread=thread, parent=thread_parent)))) | 446 _(u"Can't save following thread in history (uid: {uid}): thread: " |
447 u"{thread}), parent:{parent}").format( | |
448 uid=uid, thread=thread, parent=thread_parent))) | |
449 d_list.append(d) | |
450 return defer.DeferredList(d_list) | |
440 | 451 |
441 def _addToHistoryEb(self, failure_, data): | 452 def _addToHistoryEb(self, failure_, data): |
442 failure_.trap(sqlite3.IntegrityError) | 453 failure_.trap(sqlite3.IntegrityError) |
443 sqlite_msg = failure_.value.args[0] | 454 sqlite_msg = failure_.value.args[0] |
444 if "UNIQUE constraint failed" in sqlite_msg: | 455 if "UNIQUE constraint failed" in sqlite_msg: |
445 log.debug(u"message {} is already in history, not storing it again".format(data['uid'])) | 456 log.debug(u"message {} is already in history, not storing it again" |
457 .format(data['uid'])) | |
446 if 'received_timestamp' not in data: | 458 if 'received_timestamp' not in data: |
447 log.warning(u"duplicate message is not delayed, this is maybe a bug: data={}".format(data)) | 459 log.warning( |
460 u"duplicate message is not delayed, this is maybe a bug: data={}" | |
461 .format(data)) | |
448 # we cancel message to avoid sending duplicate message to frontends | 462 # we cancel message to avoid sending duplicate message to frontends |
449 raise failure.Failure(exceptions.CancelError("Cancelled duplicated message")) | 463 raise failure.Failure(exceptions.CancelError("Cancelled duplicated message")) |
450 else: | 464 else: |
451 log.error(u"Can't store message in history: {}".format(failure_)) | 465 log.error(u"Can't store message in history: {}".format(failure_)) |
452 | 466 |
453 def _logHistoryError(self, failure_, from_jid, to_jid, data): | 467 def _logHistoryError(self, failure_, from_jid, to_jid, data): |
454 if failure_.check(exceptions.CancelError): | 468 if failure_.check(exceptions.CancelError): |
455 # we propagate CancelError to avoid sending message to frontends | 469 # we propagate CancelError to avoid sending message to frontends |
456 raise failure_ | 470 raise failure_ |
457 log.error(_(u"Can't save following message in history: from [{from_jid}] to [{to_jid}] (uid: {uid})" | 471 log.error(_( |
458 .format(from_jid=from_jid.full(), to_jid=to_jid.full(), uid=data['uid']))) | 472 u"Can't save following message in history: from [{from_jid}] to [{to_jid}] " |
473 u"(uid: {uid})") | |
474 .format(from_jid=from_jid.full(), to_jid=to_jid.full(), uid=data['uid'])) | |
459 | 475 |
460 def addToHistory(self, data, profile): | 476 def addToHistory(self, data, profile): |
461 """Store a new message in history | 477 """Store a new message in history |
462 | 478 |
463 @param data(dict): message data as build by SatMessageProtocol.onMessage | 479 @param data(dict): message data as build by SatMessageProtocol.onMessage |
464 """ | 480 """ |
465 extra = pickle.dumps({k: v for k, v in data['extra'].iteritems() if k not in NOT_IN_EXTRA}, 0) | 481 extra = pickle.dumps({k: v for k, v in data['extra'].iteritems() |
482 if k not in NOT_IN_EXTRA}, 0) | |
466 from_jid = data['from'] | 483 from_jid = data['from'] |
467 to_jid = data['to'] | 484 to_jid = data['to'] |
468 d = self.dbpool.runQuery("INSERT INTO history(uid, stanza_id, update_uid, profile_id, source, dest, source_res, dest_res, timestamp, received_timestamp, type, extra) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)", | 485 d = self.dbpool.runQuery( |
469 (data['uid'], data['extra'].get('stanza_id'), data['extra'].get('update_uid'), self.profiles[profile], data['from'].userhost(), to_jid.userhost(), from_jid.resource, to_jid.resource, data['timestamp'], data.get('received_timestamp'), data['type'], sqlite3.Binary(extra))) | 486 u"INSERT INTO history(uid, stanza_id, update_uid, profile_id, source, dest, " |
470 d.addCallbacks(self._addToHistoryCb, self._addToHistoryEb, callbackArgs=[data], errbackArgs=[data]) | 487 u"source_res, dest_res, timestamp, received_timestamp, type, extra) VALUES " |
488 u"(?,?,?,?,?,?,?,?,?,?,?,?)", | |
489 (data['uid'], data['extra'].get('stanza_id'), data['extra'].get('update_uid'), | |
490 self.profiles[profile], data['from'].userhost(), to_jid.userhost(), | |
491 from_jid.resource, to_jid.resource, data['timestamp'], | |
492 data.get('received_timestamp'), data['type'], sqlite3.Binary(extra))) | |
493 d.addCallbacks(self._addToHistoryCb, | |
494 self._addToHistoryEb, | |
495 callbackArgs=[data], | |
496 errbackArgs=[data]) | |
471 d.addErrback(self._logHistoryError, from_jid, to_jid, data) | 497 d.addErrback(self._logHistoryError, from_jid, to_jid, data) |
472 return d | 498 return d |
473 | 499 |
474 def sqliteHistoryToList(self, query_result): | 500 def sqliteHistoryToList(self, query_result): |
475 """Get SQL query result and return a list of message data dicts""" | 501 """Get SQL query result and return a list of message data dicts""" |
476 result = [] | 502 result = [] |
477 current = {'uid': None} | 503 current = {'uid': None} |
478 for row in reversed(query_result): | 504 for row in reversed(query_result): |
479 uid, stanza_id, update_uid, source, dest, source_res, dest_res, timestamp, received_timestamp,\ | 505 (uid, stanza_id, update_uid, source, dest, source_res, dest_res, timestamp, |
480 type_, extra, message, message_lang, subject, subject_lang, thread, thread_parent = row | 506 received_timestamp, type_, extra, message, message_lang, subject, |
507 subject_lang, thread, thread_parent) = row | |
481 if uid != current['uid']: | 508 if uid != current['uid']: |
482 # new message | 509 # new message |
483 try: | 510 try: |
484 extra = pickle.loads(str(extra or "")) | 511 extra = pickle.loads(str(extra or "")) |
485 except EOFError: | 512 except EOFError: |
513 current_extra['thread'] = thread | 540 current_extra['thread'] = thread |
514 if thread_parent is not None: | 541 if thread_parent is not None: |
515 current_extra['thread_parent'] = thread_parent | 542 current_extra['thread_parent'] = thread_parent |
516 else: | 543 else: |
517 if thread_parent is not None: | 544 if thread_parent is not None: |
518 log.error(u"Database inconsistency: thread parent without thread (uid: {uid}, thread_parent: {parent})" | 545 log.error( |
546 u"Database inconsistency: thread parent without thread (uid: " | |
547 u"{uid}, thread_parent: {parent})" | |
519 .format(uid=uid, parent=thread_parent)) | 548 .format(uid=uid, parent=thread_parent)) |
520 | 549 |
521 return result | 550 return result |
522 | 551 |
523 def listDict2listTuple(self, messages_data): | 552 def listDict2listTuple(self, messages_data): |