Mercurial > libervia-backend
comparison src/memory/sqlite.py @ 1961:c73e08094a95
memory (sqlite): better handling of IntegrityError
author | Goffi <goffi@goffi.org> |
---|---|
date | Sun, 19 Jun 2016 22:22:12 +0200 |
parents | 633b5c21aefd |
children | a45235d8dc93 |
comparison
equal
deleted
inserted
replaced
1960:3e168cde7a7d | 1961:c73e08094a95 |
---|---|
24 log = getLogger(__name__) | 24 log = getLogger(__name__) |
25 from sat.memory.crypto import BlockCipher, PasswordHasher | 25 from sat.memory.crypto import BlockCipher, PasswordHasher |
26 from sat.tools.config import fixConfigOption | 26 from sat.tools.config import fixConfigOption |
27 from twisted.enterprise import adbapi | 27 from twisted.enterprise import adbapi |
28 from twisted.internet import defer | 28 from twisted.internet import defer |
29 from twisted.python import failure | |
29 from collections import OrderedDict | 30 from collections import OrderedDict |
30 import re | 31 import re |
31 import os.path | 32 import os.path |
32 import cPickle as pickle | 33 import cPickle as pickle |
33 import hashlib | 34 import hashlib |
35 import sqlite3 | |
34 | 36 |
35 CURRENT_DB_VERSION = 3 | 37 CURRENT_DB_VERSION = 3 |
36 | 38 |
37 # XXX: DATABASE schemas are used in the following way: | 39 # XXX: DATABASE schemas are used in the following way: |
38 # - 'current' key is for the actual database schema, for a new base | 40 # - 'current' key is for the actual database schema, for a new base |
93 | 95 |
94 NOT_IN_EXTRA = ('received_timestamp', 'update_uid') # keys which are in message data extra but not stored in sqlite's extra field | 96 NOT_IN_EXTRA = ('received_timestamp', 'update_uid') # keys which are in message data extra but not stored in sqlite's extra field |
95 # this is specific to this sqlite storage and for now only used for received_timestamp | 97 # this is specific to this sqlite storage and for now only used for received_timestamp |
96 # because this value is stored in a separate field | 98 # because this value is stored in a separate field |
97 | 99 |
100 | |
101 class ConnectionPool(adbapi.ConnectionPool): | |
102 # Workaround to avoid IntegrityError causing (i)pdb to be launched in debug mode | |
103 def _runQuery(self, trans, *args, **kw): | |
104 try: | |
105 trans.execute(*args, **kw) | |
106 except sqlite3.IntegrityError as e: | |
107 raise failure.Failure(e) | |
108 return trans.fetchall() | |
109 | |
110 | |
98 class SqliteStorage(object): | 111 class SqliteStorage(object): |
99 """This class manage storage with Sqlite database""" | 112 """This class manage storage with Sqlite database""" |
100 | 113 |
101 def __init__(self, db_filename, sat_version): | 114 def __init__(self, db_filename, sat_version): |
102 """Connect to the given database | 115 """Connect to the given database |
108 new_base = not os.path.exists(db_filename) # do we have to create the database ? | 121 new_base = not os.path.exists(db_filename) # do we have to create the database ? |
109 if new_base: # the dir may not exist if it's not the XDG recommended one | 122 if new_base: # the dir may not exist if it's not the XDG recommended one |
110 dir_ = os.path.dirname(db_filename) | 123 dir_ = os.path.dirname(db_filename) |
111 if not os.path.exists(dir_): | 124 if not os.path.exists(dir_): |
112 os.makedirs(dir_, 0700) | 125 os.makedirs(dir_, 0700) |
113 self.dbpool = adbapi.ConnectionPool("sqlite3", db_filename, check_same_thread=False) | 126 self.dbpool = ConnectionPool("sqlite3", db_filename, check_same_thread=False) |
114 | 127 |
115 # init_defer is the initialisation deferred, initialisation is ok when all its callbacks have been done | 128 # init_defer is the initialisation deferred, initialisation is ok when all its callbacks have been done |
116 # XXX: foreign_keys activation doesn't seem to work, probably because of the multi-threading | 129 # XXX: foreign_keys activation doesn't seem to work, probably because of the multi-threading |
117 # All the requests that need to use this feature should be run with runInteraction instead, | 130 # All the requests that need to use this feature should be run with runInteraction instead, |
118 # so you can set the PRAGMA as it is done in self.deleteProfile | 131 # so you can set the PRAGMA as it is done in self.deleteProfile |
186 | 199 |
187 def deleteProfile(self, name): | 200 def deleteProfile(self, name): |
188 """Delete profile | 201 """Delete profile |
189 @param name: name of the profile | 202 @param name: name of the profile |
190 @return: deferred triggered once profile is actually deleted""" | 203 @return: deferred triggered once profile is actually deleted""" |
191 def deletionError(failure): | 204 def deletionError(failure_): |
192 log.error(_(u"Can't delete profile [%s]") % name) | 205 log.error(_(u"Can't delete profile [%s]") % name) |
193 return failure | 206 return failure_ |
194 | 207 |
195 def delete(txn): | 208 def delete(txn): |
196 profile_id = self.profiles.pop(name) | 209 profile_id = self.profiles.pop(name) |
197 txn.execute("PRAGMA foreign_keys = ON") | 210 txn.execute("PRAGMA foreign_keys = ON") |
198 txn.execute("DELETE FROM profiles WHERE name = ?", (name,)) | 211 txn.execute("DELETE FROM profiles WHERE name = ?", (name,)) |
290 d = self.dbpool.runQuery("INSERT INTO thread(history_uid, thread_id, parent_id) VALUES (?,?,?)", | 303 d = self.dbpool.runQuery("INSERT INTO thread(history_uid, thread_id, parent_id) VALUES (?,?,?)", |
291 (uid, thread, thread_parent)) | 304 (uid, thread, thread_parent)) |
292 d.addErrback(lambda dummy: log.error(_(u"Can't save following thread in history (uid: {uid}): thread:{thread}), parent:{parent}".format( | 305 d.addErrback(lambda dummy: log.error(_(u"Can't save following thread in history (uid: {uid}): thread:{thread}), parent:{parent}".format( |
293 uid=uid, thread=thread, parent=thread_parent)))) | 306 uid=uid, thread=thread, parent=thread_parent)))) |
294 | 307 |
295 def _addToHistoryEb(self, failure, data): | 308 def _addToHistoryEb(self, failure_, data): |
296 try: | 309 failure_.trap(sqlite3.IntegrityError) |
297 sqlite_msg = failure.value.args[0] | 310 sqlite_msg = failure_.value.args[0] |
298 except (AttributeError, IndexError): | 311 if "UNIQUE constraint failed" in sqlite_msg: |
299 raise failure | 312 log.debug(u"message {} is already in history, not storing it again".format(data['uid'])) |
313 if 'received_timestamp' not in data: | |
314 log.warning(u"duplicate message is not delayed, this is maybe a bug: data={}".format(data)) | |
315 # we cancel message to avoid sending duplicate message to frontends | |
316 raise failure.Failure(exceptions.CancelError("Cancelled duplicated message")) | |
300 else: | 317 else: |
301 if "UNIQUE constraint failed" in sqlite_msg: | 318 log.error(u"Can't store message in history: {}".format(failure_)) |
302 log.debug(u"Message is already in history, not storing it again") | 319 |
303 else: | 320 def _logHistoryError(self, failure_, from_jid, to_jid, data): |
304 log.error(u"Can't store message in history: {}".format(failure)) | 321 if failure_.check(exceptions.CancelError): |
322 # we propagate CancelError to avoid sending message to frontends | |
323 raise failure_ | |
324 log.error(_(u"Can't save following message in history: from [{from_jid}] to [{to_jid}] (uid: {uid})" | |
325 .format(from_jid=from_jid.full(), to_jid=to_jid.full(), uid=data['uid']))) | |
305 | 326 |
306 def addToHistory(self, data, profile): | 327 def addToHistory(self, data, profile): |
307 """Store a new message in history | 328 """Store a new message in history |
308 | 329 |
309 @param data(dict): message data as build by SatMessageProtocol.onMessage | 330 @param data(dict): message data as build by SatMessageProtocol.onMessage |
312 from_jid = data['from'] | 333 from_jid = data['from'] |
313 to_jid = data['to'] | 334 to_jid = data['to'] |
314 d = self.dbpool.runQuery("INSERT INTO history(uid, update_uid, profile_id, source, dest, source_res, dest_res, timestamp, received_timestamp, type, extra) VALUES (?,?,?,?,?,?,?,?,?,?,?)", | 335 d = self.dbpool.runQuery("INSERT INTO history(uid, update_uid, profile_id, source, dest, source_res, dest_res, timestamp, received_timestamp, type, extra) VALUES (?,?,?,?,?,?,?,?,?,?,?)", |
315 (data['uid'], data['extra'].get('update_uid'), self.profiles[profile], data['from'].userhost(), to_jid.userhost(), from_jid.resource, to_jid.resource, data['timestamp'], data.get('received_timestamp'), data['type'], extra)) | 336 (data['uid'], data['extra'].get('update_uid'), self.profiles[profile], data['from'].userhost(), to_jid.userhost(), from_jid.resource, to_jid.resource, data['timestamp'], data.get('received_timestamp'), data['type'], extra)) |
316 d.addCallbacks(self._addToHistoryCb, self._addToHistoryEb, callbackArgs=[data], errbackArgs=[data]) | 337 d.addCallbacks(self._addToHistoryCb, self._addToHistoryEb, callbackArgs=[data], errbackArgs=[data]) |
317 d.addErrback(lambda ignore: log.error(_(u"Can't save following message in history: from [{from_jid}] to [{to_jid}] (uid: {uid})" | 338 d.addErrback(self._logHistoryError, from_jid, to_jid, data) |
318 .format(from_jid=from_jid.full(), to_jid=to_jid.full(), uid=data['uid'])))) | |
319 return d | 339 return d |
320 | 340 |
321 def sqliteHistoryToList(self, query_result): | 341 def sqliteHistoryToList(self, query_result): |
322 """Get SQL query result and return a list of message data dicts""" | 342 """Get SQL query result and return a list of message data dicts""" |
323 result = [] | 343 result = [] |