changeset 1961:c73e08094a95

memory (sqlite): better handling of IntegrityError
author Goffi <goffi@goffi.org>
date Sun, 19 Jun 2016 22:22:12 +0200
parents 3e168cde7a7d
children a45235d8dc93
files src/memory/sqlite.py
diffstat 1 files changed, 34 insertions(+), 14 deletions(-) [+]
line wrap: on
line diff
--- a/src/memory/sqlite.py	Sun Jun 19 22:22:08 2016 +0200
+++ b/src/memory/sqlite.py	Sun Jun 19 22:22:12 2016 +0200
@@ -26,11 +26,13 @@
 from sat.tools.config import fixConfigOption
 from twisted.enterprise import adbapi
 from twisted.internet import defer
+from twisted.python import failure
 from collections import OrderedDict
 import re
 import os.path
 import cPickle as pickle
 import hashlib
+import sqlite3
 
 CURRENT_DB_VERSION = 3
 
@@ -95,6 +97,17 @@
                                                     # this is specific to this sqlite storage and for now only used for received_timestamp
                                                     # because this value is stored in a separate field
 
+
+class ConnectionPool(adbapi.ConnectionPool):
+    # Workaround to avoid IntegrityError causing (i)pdb to be launched in debug mode
+    def _runQuery(self, trans, *args, **kw):
+        try:
+            trans.execute(*args, **kw)
+        except sqlite3.IntegrityError as e:
+            raise failure.Failure(e)
+        return trans.fetchall()
+
+
 class SqliteStorage(object):
     """This class manage storage with Sqlite database"""
 
@@ -110,7 +123,7 @@
             dir_ = os.path.dirname(db_filename)
             if not os.path.exists(dir_):
                 os.makedirs(dir_, 0700)
-        self.dbpool = adbapi.ConnectionPool("sqlite3", db_filename, check_same_thread=False)
+        self.dbpool = ConnectionPool("sqlite3", db_filename, check_same_thread=False)
 
         # init_defer is the initialisation deferred, initialisation is ok when all its callbacks have been done
         # XXX: foreign_keys activation doesn't seem to work, probably because of the multi-threading
@@ -188,9 +201,9 @@
         """Delete profile
         @param name: name of the profile
         @return: deferred triggered once profile is actually deleted"""
-        def deletionError(failure):
+        def deletionError(failure_):
             log.error(_(u"Can't delete profile [%s]") % name)
-            return failure
+            return failure_
 
         def delete(txn):
             profile_id = self.profiles.pop(name)
@@ -292,16 +305,24 @@
             d.addErrback(lambda dummy: log.error(_(u"Can't save following thread in history (uid: {uid}): thread:{thread}), parent:{parent}".format(
                 uid=uid, thread=thread, parent=thread_parent))))
 
-    def _addToHistoryEb(self, failure, data):
-        try:
-            sqlite_msg = failure.value.args[0]
-        except (AttributeError, IndexError):
-            raise failure
+    def _addToHistoryEb(self, failure_, data):
+        failure_.trap(sqlite3.IntegrityError)
+        sqlite_msg = failure_.value.args[0]
+        if "UNIQUE constraint failed" in sqlite_msg:
+            log.debug(u"message {} is already in history, not storing it again".format(data['uid']))
+            if 'received_timestamp' not in data:
+                log.warning(u"duplicate message is not delayed, this is maybe a bug: data={}".format(data))
+            # we cancel message to avoid sending duplicate message to frontends
+            raise failure.Failure(exceptions.CancelError("Cancelled duplicated message"))
         else:
-            if "UNIQUE constraint failed" in sqlite_msg:
-                log.debug(u"Message is already in history, not storing it again")
-            else:
-                log.error(u"Can't store message in history: {}".format(failure))
+            log.error(u"Can't store message in history: {}".format(failure_))
+
+    def _logHistoryError(self, failure_, from_jid, to_jid, data):
+        if failure_.check(exceptions.CancelError):
+            # we propagate CancelError to avoid sending message to frontends
+            raise failure_
+        log.error(_(u"Can't save following message in history: from [{from_jid}] to [{to_jid}] (uid: {uid})"
+            .format(from_jid=from_jid.full(), to_jid=to_jid.full(), uid=data['uid'])))
 
     def addToHistory(self, data, profile):
         """Store a new message in history
@@ -314,8 +335,7 @@
         d = self.dbpool.runQuery("INSERT INTO history(uid, update_uid, profile_id, source, dest, source_res, dest_res, timestamp, received_timestamp, type, extra) VALUES (?,?,?,?,?,?,?,?,?,?,?)",
                                  (data['uid'], data['extra'].get('update_uid'), self.profiles[profile], data['from'].userhost(), to_jid.userhost(), from_jid.resource, to_jid.resource, data['timestamp'], data.get('received_timestamp'), data['type'], extra))
         d.addCallbacks(self._addToHistoryCb, self._addToHistoryEb, callbackArgs=[data], errbackArgs=[data])
-        d.addErrback(lambda ignore: log.error(_(u"Can't save following message in history: from [{from_jid}] to [{to_jid}] (uid: {uid})"
-            .format(from_jid=from_jid.full(), to_jid=to_jid.full(), uid=data['uid']))))
+        d.addErrback(self._logHistoryError, from_jid, to_jid, data)
         return d
 
     def sqliteHistoryToList(self, query_result):