# HG changeset patch # User Goffi # Date 1466367732 -7200 # Node ID c73e08094a95c349a5f1b4a1bbd1420c4280f7cf # Parent 3e168cde7a7d526415b581d6db5a6b4b00255029 memory (sqlite): better handling of IntegrityError diff -r 3e168cde7a7d -r c73e08094a95 src/memory/sqlite.py --- a/src/memory/sqlite.py Sun Jun 19 22:22:08 2016 +0200 +++ b/src/memory/sqlite.py Sun Jun 19 22:22:12 2016 +0200 @@ -26,11 +26,13 @@ from sat.tools.config import fixConfigOption from twisted.enterprise import adbapi from twisted.internet import defer +from twisted.python import failure from collections import OrderedDict import re import os.path import cPickle as pickle import hashlib +import sqlite3 CURRENT_DB_VERSION = 3 @@ -95,6 +97,17 @@ # this is specific to this sqlite storage and for now only used for received_timestamp # because this value is stored in a separate field + +class ConnectionPool(adbapi.ConnectionPool): + # Workaround to avoid IntegrityError causing (i)pdb to be launched in debug mode + def _runQuery(self, trans, *args, **kw): + try: + trans.execute(*args, **kw) + except sqlite3.IntegrityError as e: + raise failure.Failure(e) + return trans.fetchall() + + class SqliteStorage(object): """This class manage storage with Sqlite database""" @@ -110,7 +123,7 @@ dir_ = os.path.dirname(db_filename) if not os.path.exists(dir_): os.makedirs(dir_, 0700) - self.dbpool = adbapi.ConnectionPool("sqlite3", db_filename, check_same_thread=False) + self.dbpool = ConnectionPool("sqlite3", db_filename, check_same_thread=False) # init_defer is the initialisation deferred, initialisation is ok when all its callbacks have been done # XXX: foreign_keys activation doesn't seem to work, probably because of the multi-threading @@ -188,9 +201,9 @@ """Delete profile @param name: name of the profile @return: deferred triggered once profile is actually deleted""" - def deletionError(failure): + def deletionError(failure_): log.error(_(u"Can't delete profile [%s]") % name) - return failure + return failure_ def delete(txn): profile_id = self.profiles.pop(name) @@ -292,16 +305,24 @@ d.addErrback(lambda dummy: log.error(_(u"Can't save following thread in history (uid: {uid}): thread:{thread}), parent:{parent}".format( uid=uid, thread=thread, parent=thread_parent)))) - def _addToHistoryEb(self, failure, data): - try: - sqlite_msg = failure.value.args[0] - except (AttributeError, IndexError): - raise failure + def _addToHistoryEb(self, failure_, data): + failure_.trap(sqlite3.IntegrityError) + sqlite_msg = failure_.value.args[0] + if "UNIQUE constraint failed" in sqlite_msg: + log.debug(u"message {} is already in history, not storing it again".format(data['uid'])) + if 'received_timestamp' not in data: + log.warning(u"duplicate message is not delayed, this is maybe a bug: data={}".format(data)) + # we cancel message to avoid sending duplicate message to frontends + raise failure.Failure(exceptions.CancelError("Cancelled duplicated message")) else: - if "UNIQUE constraint failed" in sqlite_msg: - log.debug(u"Message is already in history, not storing it again") - else: - log.error(u"Can't store message in history: {}".format(failure)) + log.error(u"Can't store message in history: {}".format(failure_)) + + def _logHistoryError(self, failure_, from_jid, to_jid, data): + if failure_.check(exceptions.CancelError): + # we propagate CancelError to avoid sending message to frontends + raise failure_ + log.error(_(u"Can't save following message in history: from [{from_jid}] to [{to_jid}] (uid: {uid})" + .format(from_jid=from_jid.full(), to_jid=to_jid.full(), uid=data['uid']))) def addToHistory(self, data, profile): """Store a new message in history @@ -314,8 +335,7 @@ d = self.dbpool.runQuery("INSERT INTO history(uid, update_uid, profile_id, source, dest, source_res, dest_res, timestamp, received_timestamp, type, extra) VALUES (?,?,?,?,?,?,?,?,?,?,?)", (data['uid'], data['extra'].get('update_uid'), self.profiles[profile], data['from'].userhost(), to_jid.userhost(), from_jid.resource, to_jid.resource, data['timestamp'], data.get('received_timestamp'), data['type'], extra)) d.addCallbacks(self._addToHistoryCb, self._addToHistoryEb, callbackArgs=[data], errbackArgs=[data]) - d.addErrback(lambda ignore: log.error(_(u"Can't save following message in history: from [{from_jid}] to [{to_jid}] (uid: {uid})" - .format(from_jid=from_jid.full(), to_jid=to_jid.full(), uid=data['uid'])))) + d.addErrback(self._logHistoryError, from_jid, to_jid, data) return d def sqliteHistoryToList(self, query_result):