changeset 3013:860c550028d6

memory (sqlite): properly wait for messages to be writen in database: the deferreds of the queries writing message to databases where not gathered and returned, so the caller was not waiting for them to continue its workflow. This was resulting in messages not always written when database was read just after the write (a case common with MUC implementation), and message was appearing empty when sent to bridge. fix 328
author Goffi <goffi@goffi.org>
date Thu, 18 Jul 2019 21:58:34 +0200 (2019-07-18)
parents 2224fbbd45dd
children b6abf8af87db
files sat/memory/sqlite.py
diffstat 1 files changed, 46 insertions(+), 17 deletions(-) [+]
line wrap: on
line diff
--- a/sat/memory/sqlite.py	Thu Jul 18 20:26:49 2019 +0200
+++ b/sat/memory/sqlite.py	Thu Jul 18 21:58:34 2019 +0200
@@ -421,30 +421,44 @@
         # Message metadata were successfuly added to history
         # now we can add message and subject
         uid = data['uid']
+        d_list = []
         for key in ('message', 'subject'):
             for lang, value in data[key].iteritems():
-                d = self.dbpool.runQuery("INSERT INTO {key}(history_uid, {key}, language) VALUES (?,?,?)".format(key=key),
+                d = self.dbpool.runQuery(
+                    "INSERT INTO {key}(history_uid, {key}, language) VALUES (?,?,?)"
+                    .format(key=key),
                     (uid, value, lang or None))
-                d.addErrback(lambda __: log.error(_(u"Can't save following {key} in history (uid: {uid}, lang:{lang}): {value}".format(
-                    key=key, uid=uid, lang=lang, value=value))))
+                d.addErrback(lambda __: log.error(
+                    _(u"Can't save following {key} in history (uid: {uid}, lang:{lang}):"
+                      u" {value}").format(
+                    key=key, uid=uid, lang=lang, value=value)))
+                d_list.append(d)
         try:
             thread = data['extra']['thread']
         except KeyError:
             pass
         else:
             thread_parent = data['extra'].get('thread_parent')
-            d = self.dbpool.runQuery("INSERT INTO thread(history_uid, thread_id, parent_id) VALUES (?,?,?)",
+            d = self.dbpool.runQuery(
+                "INSERT INTO thread(history_uid, thread_id, parent_id) VALUES (?,?,?)",
                 (uid, thread, thread_parent))
-            d.addErrback(lambda __: log.error(_(u"Can't save following thread in history (uid: {uid}): thread:{thread}), parent:{parent}".format(
-                uid=uid, thread=thread, parent=thread_parent))))
+            d.addErrback(lambda __: log.error(
+                _(u"Can't save following thread in history (uid: {uid}): thread: "
+                  u"{thread}), parent:{parent}").format(
+                uid=uid, thread=thread, parent=thread_parent)))
+            d_list.append(d)
+        return defer.DeferredList(d_list)
 
     def _addToHistoryEb(self, failure_, data):
         failure_.trap(sqlite3.IntegrityError)
         sqlite_msg = failure_.value.args[0]
         if "UNIQUE constraint failed" in sqlite_msg:
-            log.debug(u"message {} is already in history, not storing it again".format(data['uid']))
+            log.debug(u"message {} is already in history, not storing it again"
+                      .format(data['uid']))
             if 'received_timestamp' not in data:
-                log.warning(u"duplicate message is not delayed, this is maybe a bug: data={}".format(data))
+                log.warning(
+                    u"duplicate message is not delayed, this is maybe a bug: data={}"
+                    .format(data))
             # we cancel message to avoid sending duplicate message to frontends
             raise failure.Failure(exceptions.CancelError("Cancelled duplicated message"))
         else:
@@ -454,20 +468,32 @@
         if failure_.check(exceptions.CancelError):
             # we propagate CancelError to avoid sending message to frontends
             raise failure_
-        log.error(_(u"Can't save following message in history: from [{from_jid}] to [{to_jid}] (uid: {uid})"
-            .format(from_jid=from_jid.full(), to_jid=to_jid.full(), uid=data['uid'])))
+        log.error(_(
+            u"Can't save following message in history: from [{from_jid}] to [{to_jid}] "
+            u"(uid: {uid})")
+            .format(from_jid=from_jid.full(), to_jid=to_jid.full(), uid=data['uid']))
 
     def addToHistory(self, data, profile):
         """Store a new message in history
 
         @param data(dict): message data as build by SatMessageProtocol.onMessage
         """
-        extra = pickle.dumps({k: v for k, v in data['extra'].iteritems() if k not in NOT_IN_EXTRA}, 0)
+        extra = pickle.dumps({k: v for k, v in data['extra'].iteritems()
+                              if k not in NOT_IN_EXTRA}, 0)
         from_jid = data['from']
         to_jid = data['to']
-        d = self.dbpool.runQuery("INSERT INTO history(uid, stanza_id, update_uid, profile_id, source, dest, source_res, dest_res, timestamp, received_timestamp, type, extra) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)",
-                                 (data['uid'], data['extra'].get('stanza_id'), data['extra'].get('update_uid'), self.profiles[profile], data['from'].userhost(), to_jid.userhost(), from_jid.resource, to_jid.resource, data['timestamp'], data.get('received_timestamp'), data['type'], sqlite3.Binary(extra)))
-        d.addCallbacks(self._addToHistoryCb, self._addToHistoryEb, callbackArgs=[data], errbackArgs=[data])
+        d = self.dbpool.runQuery(
+            u"INSERT INTO history(uid, stanza_id, update_uid, profile_id, source, dest, "
+            u"source_res, dest_res, timestamp, received_timestamp, type, extra) VALUES "
+            u"(?,?,?,?,?,?,?,?,?,?,?,?)",
+            (data['uid'], data['extra'].get('stanza_id'), data['extra'].get('update_uid'),
+            self.profiles[profile], data['from'].userhost(), to_jid.userhost(),
+            from_jid.resource, to_jid.resource, data['timestamp'],
+            data.get('received_timestamp'), data['type'], sqlite3.Binary(extra)))
+        d.addCallbacks(self._addToHistoryCb,
+                       self._addToHistoryEb,
+                       callbackArgs=[data],
+                       errbackArgs=[data])
         d.addErrback(self._logHistoryError, from_jid, to_jid, data)
         return d
 
@@ -476,8 +502,9 @@
         result = []
         current = {'uid': None}
         for row in reversed(query_result):
-            uid, stanza_id, update_uid, source, dest, source_res, dest_res, timestamp, received_timestamp,\
-            type_, extra, message, message_lang, subject, subject_lang, thread, thread_parent = row
+            (uid, stanza_id, update_uid, source, dest, source_res, dest_res, timestamp,
+             received_timestamp, type_, extra, message, message_lang, subject,
+             subject_lang, thread, thread_parent) = row
             if uid != current['uid']:
                 # new message
                 try:
@@ -515,7 +542,9 @@
                     current_extra['thread_parent'] = thread_parent
             else:
                 if thread_parent is not None:
-                    log.error(u"Database inconsistency: thread parent without thread (uid: {uid}, thread_parent: {parent})"
+                    log.error(
+                        u"Database inconsistency: thread parent without thread (uid: "
+                        u"{uid}, thread_parent: {parent})"
                         .format(uid=uid, parent=thread_parent))
 
         return result