# HG changeset patch # User Goffi # Date 1563479914 -7200 # Node ID 860c550028d639c4ecad236cb6e4dfab07485166 # Parent 2224fbbd45dd61b1be3e894cae45c4c34b3aa1ad memory (sqlite): properly wait for messages to be writen in database: the deferreds of the queries writing message to databases where not gathered and returned, so the caller was not waiting for them to continue its workflow. This was resulting in messages not always written when database was read just after the write (a case common with MUC implementation), and message was appearing empty when sent to bridge. fix 328 diff -r 2224fbbd45dd -r 860c550028d6 sat/memory/sqlite.py --- a/sat/memory/sqlite.py Thu Jul 18 20:26:49 2019 +0200 +++ b/sat/memory/sqlite.py Thu Jul 18 21:58:34 2019 +0200 @@ -421,30 +421,44 @@ # Message metadata were successfuly added to history # now we can add message and subject uid = data['uid'] + d_list = [] for key in ('message', 'subject'): for lang, value in data[key].iteritems(): - d = self.dbpool.runQuery("INSERT INTO {key}(history_uid, {key}, language) VALUES (?,?,?)".format(key=key), + d = self.dbpool.runQuery( + "INSERT INTO {key}(history_uid, {key}, language) VALUES (?,?,?)" + .format(key=key), (uid, value, lang or None)) - d.addErrback(lambda __: log.error(_(u"Can't save following {key} in history (uid: {uid}, lang:{lang}): {value}".format( - key=key, uid=uid, lang=lang, value=value)))) + d.addErrback(lambda __: log.error( + _(u"Can't save following {key} in history (uid: {uid}, lang:{lang}):" + u" {value}").format( + key=key, uid=uid, lang=lang, value=value))) + d_list.append(d) try: thread = data['extra']['thread'] except KeyError: pass else: thread_parent = data['extra'].get('thread_parent') - d = self.dbpool.runQuery("INSERT INTO thread(history_uid, thread_id, parent_id) VALUES (?,?,?)", + d = self.dbpool.runQuery( + "INSERT INTO thread(history_uid, thread_id, parent_id) VALUES (?,?,?)", (uid, thread, thread_parent)) - d.addErrback(lambda __: log.error(_(u"Can't save following thread in history (uid: {uid}): thread:{thread}), parent:{parent}".format( - uid=uid, thread=thread, parent=thread_parent)))) + d.addErrback(lambda __: log.error( + _(u"Can't save following thread in history (uid: {uid}): thread: " + u"{thread}), parent:{parent}").format( + uid=uid, thread=thread, parent=thread_parent))) + d_list.append(d) + return defer.DeferredList(d_list) def _addToHistoryEb(self, failure_, data): failure_.trap(sqlite3.IntegrityError) sqlite_msg = failure_.value.args[0] if "UNIQUE constraint failed" in sqlite_msg: - log.debug(u"message {} is already in history, not storing it again".format(data['uid'])) + log.debug(u"message {} is already in history, not storing it again" + .format(data['uid'])) if 'received_timestamp' not in data: - log.warning(u"duplicate message is not delayed, this is maybe a bug: data={}".format(data)) + log.warning( + u"duplicate message is not delayed, this is maybe a bug: data={}" + .format(data)) # we cancel message to avoid sending duplicate message to frontends raise failure.Failure(exceptions.CancelError("Cancelled duplicated message")) else: @@ -454,20 +468,32 @@ if failure_.check(exceptions.CancelError): # we propagate CancelError to avoid sending message to frontends raise failure_ - log.error(_(u"Can't save following message in history: from [{from_jid}] to [{to_jid}] (uid: {uid})" - .format(from_jid=from_jid.full(), to_jid=to_jid.full(), uid=data['uid']))) + log.error(_( + u"Can't save following message in history: from [{from_jid}] to [{to_jid}] " + u"(uid: {uid})") + .format(from_jid=from_jid.full(), to_jid=to_jid.full(), uid=data['uid'])) def addToHistory(self, data, profile): """Store a new message in history @param data(dict): message data as build by SatMessageProtocol.onMessage """ - extra = pickle.dumps({k: v for k, v in data['extra'].iteritems() if k not in NOT_IN_EXTRA}, 0) + extra = pickle.dumps({k: v for k, v in data['extra'].iteritems() + if k not in NOT_IN_EXTRA}, 0) from_jid = data['from'] to_jid = data['to'] - d = self.dbpool.runQuery("INSERT INTO history(uid, stanza_id, update_uid, profile_id, source, dest, source_res, dest_res, timestamp, received_timestamp, type, extra) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)", - (data['uid'], data['extra'].get('stanza_id'), data['extra'].get('update_uid'), self.profiles[profile], data['from'].userhost(), to_jid.userhost(), from_jid.resource, to_jid.resource, data['timestamp'], data.get('received_timestamp'), data['type'], sqlite3.Binary(extra))) - d.addCallbacks(self._addToHistoryCb, self._addToHistoryEb, callbackArgs=[data], errbackArgs=[data]) + d = self.dbpool.runQuery( + u"INSERT INTO history(uid, stanza_id, update_uid, profile_id, source, dest, " + u"source_res, dest_res, timestamp, received_timestamp, type, extra) VALUES " + u"(?,?,?,?,?,?,?,?,?,?,?,?)", + (data['uid'], data['extra'].get('stanza_id'), data['extra'].get('update_uid'), + self.profiles[profile], data['from'].userhost(), to_jid.userhost(), + from_jid.resource, to_jid.resource, data['timestamp'], + data.get('received_timestamp'), data['type'], sqlite3.Binary(extra))) + d.addCallbacks(self._addToHistoryCb, + self._addToHistoryEb, + callbackArgs=[data], + errbackArgs=[data]) d.addErrback(self._logHistoryError, from_jid, to_jid, data) return d @@ -476,8 +502,9 @@ result = [] current = {'uid': None} for row in reversed(query_result): - uid, stanza_id, update_uid, source, dest, source_res, dest_res, timestamp, received_timestamp,\ - type_, extra, message, message_lang, subject, subject_lang, thread, thread_parent = row + (uid, stanza_id, update_uid, source, dest, source_res, dest_res, timestamp, + received_timestamp, type_, extra, message, message_lang, subject, + subject_lang, thread, thread_parent) = row if uid != current['uid']: # new message try: @@ -515,7 +542,9 @@ current_extra['thread_parent'] = thread_parent else: if thread_parent is not None: - log.error(u"Database inconsistency: thread parent without thread (uid: {uid}, thread_parent: {parent})" + log.error( + u"Database inconsistency: thread parent without thread (uid: " + u"{uid}, thread_parent: {parent})" .format(uid=uid, parent=thread_parent)) return result