comparison src/memory/sqlite.py @ 1961:c73e08094a95

memory (sqlite): better handling of IntegrityError
author Goffi <goffi@goffi.org>
date Sun, 19 Jun 2016 22:22:12 +0200
parents 633b5c21aefd
children a45235d8dc93
comparison
equal deleted inserted replaced
1960:3e168cde7a7d 1961:c73e08094a95
24 log = getLogger(__name__) 24 log = getLogger(__name__)
25 from sat.memory.crypto import BlockCipher, PasswordHasher 25 from sat.memory.crypto import BlockCipher, PasswordHasher
26 from sat.tools.config import fixConfigOption 26 from sat.tools.config import fixConfigOption
27 from twisted.enterprise import adbapi 27 from twisted.enterprise import adbapi
28 from twisted.internet import defer 28 from twisted.internet import defer
29 from twisted.python import failure
29 from collections import OrderedDict 30 from collections import OrderedDict
30 import re 31 import re
31 import os.path 32 import os.path
32 import cPickle as pickle 33 import cPickle as pickle
33 import hashlib 34 import hashlib
35 import sqlite3
34 36
35 CURRENT_DB_VERSION = 3 37 CURRENT_DB_VERSION = 3
36 38
37 # XXX: DATABASE schemas are used in the following way: 39 # XXX: DATABASE schemas are used in the following way:
38 # - 'current' key is for the actual database schema, for a new base 40 # - 'current' key is for the actual database schema, for a new base
93 95
94 NOT_IN_EXTRA = ('received_timestamp', 'update_uid') # keys which are in message data extra but not stored in sqlite's extra field 96 NOT_IN_EXTRA = ('received_timestamp', 'update_uid') # keys which are in message data extra but not stored in sqlite's extra field
95 # this is specific to this sqlite storage and for now only used for received_timestamp 97 # this is specific to this sqlite storage and for now only used for received_timestamp
96 # because this value is stored in a separate field 98 # because this value is stored in a separate field
97 99
100
class ConnectionPool(adbapi.ConnectionPool):
    """adbapi.ConnectionPool re-raising sqlite3.IntegrityError wrapped in a Failure"""

    # Workaround to avoid IntegrityError causing (i)pdb to be launched in debug mode
    def _runQuery(self, trans, *args, **kw):
        """Execute a query on the given transaction and fetch every resulting row

        @param trans: object on which execute() then fetchall() are called
        @param args: positional arguments given unchanged to trans.execute
        @param kw: keyword arguments given unchanged to trans.execute
        @return: result of trans.fetchall()
        @raise failure.Failure: wrapping the sqlite3.IntegrityError raised by execute
        """
        try:
            trans.execute(*args, **kw)
        except sqlite3.IntegrityError as integrity_error:
            # only IntegrityError is wrapped, any other exception goes through untouched
            raise failure.Failure(integrity_error)
        else:
            return trans.fetchall()
109
110
98 class SqliteStorage(object): 111 class SqliteStorage(object):
99 """This class manage storage with Sqlite database""" 112 """This class manage storage with Sqlite database"""
100 113
101 def __init__(self, db_filename, sat_version): 114 def __init__(self, db_filename, sat_version):
102 """Connect to the given database 115 """Connect to the given database
108 new_base = not os.path.exists(db_filename) # do we have to create the database ? 121 new_base = not os.path.exists(db_filename) # do we have to create the database ?
109 if new_base: # the dir may not exist if it's not the XDG recommended one 122 if new_base: # the dir may not exist if it's not the XDG recommended one
110 dir_ = os.path.dirname(db_filename) 123 dir_ = os.path.dirname(db_filename)
111 if not os.path.exists(dir_): 124 if not os.path.exists(dir_):
112 os.makedirs(dir_, 0700) 125 os.makedirs(dir_, 0700)
113 self.dbpool = adbapi.ConnectionPool("sqlite3", db_filename, check_same_thread=False) 126 self.dbpool = ConnectionPool("sqlite3", db_filename, check_same_thread=False)
114 127
115 # init_defer is the initialisation deferred, initialisation is ok when all its callbacks have been done 128 # init_defer is the initialisation deferred, initialisation is ok when all its callbacks have been done
116 # XXX: foreign_keys activation doesn't seem to work, probably because of the multi-threading 129 # XXX: foreign_keys activation doesn't seem to work, probably because of the multi-threading
117 # All the requests that need to use this feature should be run with runInteraction instead, 130 # All the requests that need to use this feature should be run with runInteraction instead,
118 # so you can set the PRAGMA as it is done in self.deleteProfile 131 # so you can set the PRAGMA as it is done in self.deleteProfile
186 199
187 def deleteProfile(self, name): 200 def deleteProfile(self, name):
188 """Delete profile 201 """Delete profile
189 @param name: name of the profile 202 @param name: name of the profile
190 @return: deferred triggered once profile is actually deleted""" 203 @return: deferred triggered once profile is actually deleted"""
191 def deletionError(failure): 204 def deletionError(failure_):
192 log.error(_(u"Can't delete profile [%s]") % name) 205 log.error(_(u"Can't delete profile [%s]") % name)
193 return failure 206 return failure_
194 207
195 def delete(txn): 208 def delete(txn):
196 profile_id = self.profiles.pop(name) 209 profile_id = self.profiles.pop(name)
197 txn.execute("PRAGMA foreign_keys = ON") 210 txn.execute("PRAGMA foreign_keys = ON")
198 txn.execute("DELETE FROM profiles WHERE name = ?", (name,)) 211 txn.execute("DELETE FROM profiles WHERE name = ?", (name,))
290 d = self.dbpool.runQuery("INSERT INTO thread(history_uid, thread_id, parent_id) VALUES (?,?,?)", 303 d = self.dbpool.runQuery("INSERT INTO thread(history_uid, thread_id, parent_id) VALUES (?,?,?)",
291 (uid, thread, thread_parent)) 304 (uid, thread, thread_parent))
292 d.addErrback(lambda dummy: log.error(_(u"Can't save following thread in history (uid: {uid}): thread:{thread}), parent:{parent}".format( 305 d.addErrback(lambda dummy: log.error(_(u"Can't save following thread in history (uid: {uid}): thread:{thread}), parent:{parent}".format(
293 uid=uid, thread=thread, parent=thread_parent)))) 306 uid=uid, thread=thread, parent=thread_parent))))
294 307
def _addToHistoryEb(self, failure_, data):
    """Errback used when the INSERT in history has failed (method of SqliteStorage)

    Only sqlite3.IntegrityError is handled here, any other failure is re-raised by trap().
    @param failure_: failure received from the INSERT query
    @param data(dict): message data, 'uid' is read and the presence of
        'received_timestamp' is checked
    @raise failure.Failure: wrapping exceptions.CancelError when the message
        is already in history (UNIQUE constraint failed)
    """
    failure_.trap(sqlite3.IntegrityError)
    sqlite_msg = failure_.value.args[0]

    if "UNIQUE constraint failed" not in sqlite_msg:
        # other integrity errors are only logged, the failure is not propagated
        log.error(u"Can't store message in history: {}".format(failure_))
        return

    log.debug(u"message {} is already in history, not storing it again".format(data['uid']))
    if 'received_timestamp' not in data:
        log.warning(u"duplicate message is not delayed, this is maybe a bug: data={}".format(data))
    # we cancel message to avoid sending duplicate message to frontends
    raise failure.Failure(exceptions.CancelError("Cancelled duplicated message"))

def _logHistoryError(self, failure_, from_jid, to_jid, data):
    """Last errback of addToHistory: log the error (method of SqliteStorage)

    @param failure_: failure received from the previous callbacks/errbacks
    @param from_jid: sender of the message, its full() value is logged
    @param to_jid: recipient of the message, its full() value is logged
    @param data(dict): message data, only 'uid' is read
    @return: failure_ itself if it is a CancelError, None otherwise
        (the error is then logged and not propagated)
    """
    if failure_.check(exceptions.CancelError):
        # we propagate CancelError to avoid sending message to frontends
        # returning the failure keeps the deferred in error state, as deletionError does
        return failure_
    # the template is translated first, then formatted (same order as in deleteProfile):
    # formatting inside _() would ask for the translation of the already formatted text
    log.error(_(u"Can't save following message in history: from [{from_jid}] to [{to_jid}] (uid: {uid})")
              .format(from_jid=from_jid.full(), to_jid=to_jid.full(), uid=data['uid']))
305 326
306 def addToHistory(self, data, profile): 327 def addToHistory(self, data, profile):
307 """Store a new message in history 328 """Store a new message in history
308 329
309 @param data(dict): message data as build by SatMessageProtocol.onMessage 330 @param data(dict): message data as build by SatMessageProtocol.onMessage
312 from_jid = data['from'] 333 from_jid = data['from']
313 to_jid = data['to'] 334 to_jid = data['to']
314 d = self.dbpool.runQuery("INSERT INTO history(uid, update_uid, profile_id, source, dest, source_res, dest_res, timestamp, received_timestamp, type, extra) VALUES (?,?,?,?,?,?,?,?,?,?,?)", 335 d = self.dbpool.runQuery("INSERT INTO history(uid, update_uid, profile_id, source, dest, source_res, dest_res, timestamp, received_timestamp, type, extra) VALUES (?,?,?,?,?,?,?,?,?,?,?)",
315 (data['uid'], data['extra'].get('update_uid'), self.profiles[profile], data['from'].userhost(), to_jid.userhost(), from_jid.resource, to_jid.resource, data['timestamp'], data.get('received_timestamp'), data['type'], extra)) 336 (data['uid'], data['extra'].get('update_uid'), self.profiles[profile], data['from'].userhost(), to_jid.userhost(), from_jid.resource, to_jid.resource, data['timestamp'], data.get('received_timestamp'), data['type'], extra))
316 d.addCallbacks(self._addToHistoryCb, self._addToHistoryEb, callbackArgs=[data], errbackArgs=[data]) 337 d.addCallbacks(self._addToHistoryCb, self._addToHistoryEb, callbackArgs=[data], errbackArgs=[data])
317 d.addErrback(lambda ignore: log.error(_(u"Can't save following message in history: from [{from_jid}] to [{to_jid}] (uid: {uid})" 338 d.addErrback(self._logHistoryError, from_jid, to_jid, data)
318 .format(from_jid=from_jid.full(), to_jid=to_jid.full(), uid=data['uid']))))
319 return d 339 return d
320 340
321 def sqliteHistoryToList(self, query_result): 341 def sqliteHistoryToList(self, query_result):
322 """Get SQL query result and return a list of message data dicts""" 342 """Get SQL query result and return a list of message data dicts"""
323 result = [] 343 result = []