diff sat/memory/sqlite.py @ 3013:860c550028d6
memory (sqlite): properly wait for messages to be written in database:
the deferreds of the queries writing messages to the database were not gathered and
returned, so the caller was not waiting for them before continuing its workflow. This
resulted in messages not always being written by the time the database was read just
after the write (a common case with the MUC implementation), and the message appearing
empty when sent to the bridge.
fix 328
| author | Goffi <goffi@goffi.org> |
| --- | --- |
| date | Thu, 18 Jul 2019 21:58:34 +0200 |
| parents | c13333fcde5e |
| children | ab2696e34d29 |
```diff
--- a/sat/memory/sqlite.py	Thu Jul 18 20:26:49 2019 +0200
+++ b/sat/memory/sqlite.py	Thu Jul 18 21:58:34 2019 +0200
@@ -421,30 +421,44 @@
         # Message metadata were successfuly added to history
         # now we can add message and subject
         uid = data['uid']
+        d_list = []
         for key in ('message', 'subject'):
             for lang, value in data[key].iteritems():
-                d = self.dbpool.runQuery("INSERT INTO {key}(history_uid, {key}, language) VALUES (?,?,?)".format(key=key),
+                d = self.dbpool.runQuery(
+                    "INSERT INTO {key}(history_uid, {key}, language) VALUES (?,?,?)"
+                    .format(key=key),
                     (uid, value, lang or None))
-                d.addErrback(lambda __: log.error(_(u"Can't save following {key} in history (uid: {uid}, lang:{lang}): {value}".format(
-                    key=key, uid=uid, lang=lang, value=value))))
+                d.addErrback(lambda __: log.error(
+                    _(u"Can't save following {key} in history (uid: {uid}, lang:{lang}):"
+                      u" {value}").format(
+                        key=key, uid=uid, lang=lang, value=value)))
+                d_list.append(d)
         try:
             thread = data['extra']['thread']
         except KeyError:
             pass
         else:
             thread_parent = data['extra'].get('thread_parent')
-            d = self.dbpool.runQuery("INSERT INTO thread(history_uid, thread_id, parent_id) VALUES (?,?,?)",
+            d = self.dbpool.runQuery(
+                "INSERT INTO thread(history_uid, thread_id, parent_id) VALUES (?,?,?)",
                 (uid, thread, thread_parent))
-            d.addErrback(lambda __: log.error(_(u"Can't save following thread in history (uid: {uid}): thread:{thread}), parent:{parent}".format(
-                uid=uid, thread=thread, parent=thread_parent))))
+            d.addErrback(lambda __: log.error(
+                _(u"Can't save following thread in history (uid: {uid}): thread: "
+                  u"{thread}), parent:{parent}").format(
+                    uid=uid, thread=thread, parent=thread_parent)))
+            d_list.append(d)
+        return defer.DeferredList(d_list)
 
     def _addToHistoryEb(self, failure_, data):
         failure_.trap(sqlite3.IntegrityError)
         sqlite_msg = failure_.value.args[0]
         if "UNIQUE constraint failed" in sqlite_msg:
-            log.debug(u"message {} is already in history, not storing it again".format(data['uid']))
+            log.debug(u"message {} is already in history, not storing it again"
+                      .format(data['uid']))
             if 'received_timestamp' not in data:
-                log.warning(u"duplicate message is not delayed, this is maybe a bug: data={}".format(data))
+                log.warning(
+                    u"duplicate message is not delayed, this is maybe a bug: data={}"
+                    .format(data))
             # we cancel message to avoid sending duplicate message to frontends
             raise failure.Failure(exceptions.CancelError("Cancelled duplicated message"))
         else:
@@ -454,20 +468,32 @@
         if failure_.check(exceptions.CancelError):
             # we propagate CancelError to avoid sending message to frontends
             raise failure_
-        log.error(_(u"Can't save following message in history: from [{from_jid}] to [{to_jid}] (uid: {uid})"
-            .format(from_jid=from_jid.full(), to_jid=to_jid.full(), uid=data['uid'])))
+        log.error(_(
+            u"Can't save following message in history: from [{from_jid}] to [{to_jid}] "
+            u"(uid: {uid})")
+            .format(from_jid=from_jid.full(), to_jid=to_jid.full(), uid=data['uid']))
 
     def addToHistory(self, data, profile):
         """Store a new message in history
 
         @param data(dict): message data as build by SatMessageProtocol.onMessage
         """
-        extra = pickle.dumps({k: v for k, v in data['extra'].iteritems() if k not in NOT_IN_EXTRA}, 0)
+        extra = pickle.dumps({k: v for k, v in data['extra'].iteritems()
+                              if k not in NOT_IN_EXTRA}, 0)
         from_jid = data['from']
         to_jid = data['to']
-        d = self.dbpool.runQuery("INSERT INTO history(uid, stanza_id, update_uid, profile_id, source, dest, source_res, dest_res, timestamp, received_timestamp, type, extra) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)",
-                                 (data['uid'], data['extra'].get('stanza_id'), data['extra'].get('update_uid'), self.profiles[profile], data['from'].userhost(), to_jid.userhost(), from_jid.resource, to_jid.resource, data['timestamp'], data.get('received_timestamp'), data['type'], sqlite3.Binary(extra)))
-        d.addCallbacks(self._addToHistoryCb, self._addToHistoryEb, callbackArgs=[data], errbackArgs=[data])
+        d = self.dbpool.runQuery(
+            u"INSERT INTO history(uid, stanza_id, update_uid, profile_id, source, dest, "
+            u"source_res, dest_res, timestamp, received_timestamp, type, extra) VALUES "
+            u"(?,?,?,?,?,?,?,?,?,?,?,?)",
+            (data['uid'], data['extra'].get('stanza_id'), data['extra'].get('update_uid'),
+             self.profiles[profile], data['from'].userhost(), to_jid.userhost(),
+             from_jid.resource, to_jid.resource, data['timestamp'],
+             data.get('received_timestamp'), data['type'], sqlite3.Binary(extra)))
+        d.addCallbacks(self._addToHistoryCb,
+                       self._addToHistoryEb,
+                       callbackArgs=[data],
+                       errbackArgs=[data])
         d.addErrback(self._logHistoryError, from_jid, to_jid, data)
         return d
 
@@ -476,8 +502,9 @@
         result = []
         current = {'uid': None}
         for row in reversed(query_result):
-            uid, stanza_id, update_uid, source, dest, source_res, dest_res, timestamp, received_timestamp,\
-                type_, extra, message, message_lang, subject, subject_lang, thread, thread_parent = row
+            (uid, stanza_id, update_uid, source, dest, source_res, dest_res, timestamp,
+             received_timestamp, type_, extra, message, message_lang, subject,
+             subject_lang, thread, thread_parent) = row
             if uid != current['uid']:
                 # new message
                 try:
@@ -515,7 +542,9 @@
                     current_extra['thread_parent'] = thread_parent
                 else:
                     if thread_parent is not None:
-                        log.error(u"Database inconsistency: thread parent without thread (uid: {uid}, thread_parent: {parent})"
+                        log.error(
+                            u"Database inconsistency: thread parent without thread (uid: "
+                            u"{uid}, thread_parent: {parent})"
                            .format(uid=uid, parent=thread_parent))
 
         return result
```
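
The whole changeset comes down to one pattern: keep the Deferred returned by each `runQuery` INSERT, gather them in a list, and return a `defer.DeferredList`, so the caller's callback chain only resumes once every row has actually been written. Below is a minimal, self-contained sketch of that pattern outside the project, assuming a plain Twisted adbapi pool; the `message` table schema, the `demo_history.sqlite` file and the `store_message` helper are illustrative only and are not part of libervia-backend.

```python
# Hypothetical sketch of the "gather write deferreds" pattern used in this changeset.
# Table, file and function names are illustrative, not libervia-backend's actual API.
from twisted.enterprise import adbapi
from twisted.internet import defer, task


def store_message(dbpool, uid, parts):
    """Insert one row per (lang, body) pair and return a single Deferred.

    Without the DeferredList the function would return before the INSERTs
    complete, and a read issued right afterwards could miss some rows.
    """
    d_list = []
    for lang, body in parts.items():
        d = dbpool.runQuery(
            "INSERT INTO message(history_uid, message, language) VALUES (?,?,?)",
            (uid, body, lang))
        d_list.append(d)
    # fireOnOneErrback surfaces the first failed INSERT to the caller
    return defer.DeferredList(d_list, fireOnOneErrback=True)


@defer.inlineCallbacks
def main(reactor):
    # check_same_thread=False is needed because adbapi runs queries in a thread pool
    dbpool = adbapi.ConnectionPool(
        "sqlite3", "demo_history.sqlite", check_same_thread=False)
    yield dbpool.runOperation(
        "CREATE TABLE IF NOT EXISTS message "
        "(history_uid TEXT, message TEXT, language TEXT)")
    # the caller genuinely waits for the writes...
    yield store_message(dbpool, "uid-1", {"en": "hello", "fr": "bonjour"})
    # ...so a read done just after the write sees every row
    rows = yield dbpool.runQuery(
        "SELECT language, message FROM message WHERE history_uid=?", ("uid-1",))
    print(rows)
    dbpool.close()


if __name__ == "__main__":
    task.react(main)
```

The changeset itself attaches an errback to each individual query and returns a plain `DeferredList` (a failed INSERT is logged without breaking the others), whereas this sketch uses `fireOnOneErrback=True` to surface the first failure to the caller. Either way, the key point is that the `DeferredList` is returned: that is what lets the MUC read-after-write path see the freshly stored message instead of an empty one.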