Mercurial > libervia-backend
comparison src/memory/sqlite.py @ 1955:633b5c21aefd
backend, frontend: messages refactoring (huge commit, not finished):
/!\ database schema has been modified, do a backup before updating
messages have been refactored; here are the main changes:
- languages are now handled
- all messages have an uid (internal to SàT)
- message updating is anticipated
- subject is now first class
- new naming scheme is used newMessage => messageNew, getHistory => historyGet, sendMessage => messageSend
- minimal compatibility refactoring in quick_frontend/Primitivus, better refactoring should follow
- threads handling
- delayed messages are saved into history
- info messages may also be saved in history (e.g. to keep track of people joining/leaving a room)
- duplicate messages should be avoided
- historyGet returns messages in the right order, no need to sort again
- plugins have been updated to follow new features, some of them need to be reworked (e.g. OTR)
- XEP-0203 (Delayed Delivery) is now fully handled in core, the plugin just handles disco and creation of a delay element
- /!\ jp and Libervia are currently broken, as are some features of Primitivus
It has been put in one huge commit to avoid breaking messaging between changes.
This is the main part of message refactoring; other commits will follow to take advantage of the new features/behaviour.
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 24 May 2016 22:11:04 +0200 |
parents | 2daf7b4c6756 |
children | c73e08094a95 |
comparison
equal
deleted
inserted
replaced
1943:ccfe45302a5c | 1955:633b5c21aefd |
---|---|
25 from sat.memory.crypto import BlockCipher, PasswordHasher | 25 from sat.memory.crypto import BlockCipher, PasswordHasher |
26 from sat.tools.config import fixConfigOption | 26 from sat.tools.config import fixConfigOption |
27 from twisted.enterprise import adbapi | 27 from twisted.enterprise import adbapi |
28 from twisted.internet import defer | 28 from twisted.internet import defer |
29 from collections import OrderedDict | 29 from collections import OrderedDict |
30 from time import time | |
31 import re | 30 import re |
32 import os.path | 31 import os.path |
33 import cPickle as pickle | 32 import cPickle as pickle |
34 import hashlib | 33 import hashlib |
35 | 34 |
36 CURRENT_DB_VERSION = 2 | 35 CURRENT_DB_VERSION = 3 |
37 | 36 |
38 # XXX: DATABASE schemas are used in the following way: | 37 # XXX: DATABASE schemas are used in the following way: |
39 # - 'current' key is for the actual database schema, for a new base | 38 # - 'current' key is for the actual database schema, for a new base |
40 # - x(int) is for update needed between x-1 and x. All number are needed between y and z to do an update | 39 # - x(int) is for update needed between x-1 and x. All number are needed between y and z to do an update |
41 # e.g.: if CURRENT_DB_VERSION is 6, 'current' is the actuel DB, and to update from version 3, numbers 4, 5 and 6 are needed | 40 # e.g.: if CURRENT_DB_VERSION is 6, 'current' is the actuel DB, and to update from version 3, numbers 4, 5 and 6 are needed |
47 DATABASE_SCHEMAS = { | 46 DATABASE_SCHEMAS = { |
48 "current": {'CREATE': OrderedDict(( | 47 "current": {'CREATE': OrderedDict(( |
49 ('profiles', (("id INTEGER PRIMARY KEY ASC", "name TEXT"), | 48 ('profiles', (("id INTEGER PRIMARY KEY ASC", "name TEXT"), |
50 ("UNIQUE (name)",))), | 49 ("UNIQUE (name)",))), |
51 ('message_types', (("type TEXT PRIMARY KEY",), | 50 ('message_types', (("type TEXT PRIMARY KEY",), |
52 tuple())), | 51 tuple())), |
53 ('history', (("id INTEGER PRIMARY KEY ASC", "profile_id INTEGER", "source TEXT", "dest TEXT", "source_res TEXT", "dest_res TEXT", "timestamp DATETIME", "message TEXT", "type TEXT", "extra BLOB"), | 52 ('history', (("uid TEXT PRIMARY KEY", "update_uid TEXT", "profile_id INTEGER", "source TEXT", "dest TEXT", "source_res TEXT", "dest_res TEXT", |
54 ("FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE", "FOREIGN KEY(type) REFERENCES message_types(type)"))), | 53 "timestamp DATETIME NOT NULL", "received_timestamp DATETIME", # XXX: timestamp is the time when the message was emitted. If received time stamp is not NULL, the message was delayed and timestamp is the declared value (and received_timestamp the time of reception) |
54 "type TEXT", "extra BLOB"), | |
55 ("FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE", "FOREIGN KEY(type) REFERENCES message_types(type)", | |
56 "UNIQUE (profile_id, timestamp, source, dest, source_res, dest_res)" # avoid storing 2 time the same message (specially for delayed cones) | |
57 ))), | |
58 ('message', (("id INTEGER PRIMARY KEY ASC", "history_uid INTEGER", "message TEXT", "language TEXT"), | |
59 ("FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE",))), | |
60 ('subject', (("id INTEGER PRIMARY KEY ASC", "history_uid INTEGER", "subject TEXT", "language TEXT"), | |
61 ("FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE",))), | |
62 ('thread', (("id INTEGER PRIMARY KEY ASC", "history_uid INTEGER", "thread_id TEXT", "parent_id TEXT"),("FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE",))), | |
55 ('param_gen', (("category TEXT", "name TEXT", "value TEXT"), | 63 ('param_gen', (("category TEXT", "name TEXT", "value TEXT"), |
56 ("PRIMARY KEY (category,name)",))), | 64 ("PRIMARY KEY (category,name)",))), |
57 ('param_ind', (("category TEXT", "name TEXT", "profile_id INTEGER", "value TEXT"), | 65 ('param_ind', (("category TEXT", "name TEXT", "profile_id INTEGER", "value TEXT"), |
58 ("PRIMARY KEY (category,name,profile_id)", "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE"))), | 66 ("PRIMARY KEY (category,name,profile_id)", "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE"))), |
59 ('private_gen', (("namespace TEXT", "key TEXT", "value TEXT"), | 67 ('private_gen', (("namespace TEXT", "key TEXT", "value TEXT"), |
64 ("PRIMARY KEY (namespace, key)",))), | 72 ("PRIMARY KEY (namespace, key)",))), |
65 ('private_ind_bin', (("namespace TEXT", "key TEXT", "profile_id INTEGER", "value BLOB"), | 73 ('private_ind_bin', (("namespace TEXT", "key TEXT", "profile_id INTEGER", "value BLOB"), |
66 ("PRIMARY KEY (namespace, key, profile_id)", "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE"))) | 74 ("PRIMARY KEY (namespace, key, profile_id)", "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE"))) |
67 )), | 75 )), |
68 'INSERT': OrderedDict(( | 76 'INSERT': OrderedDict(( |
69 ('message_types', (("'chat'",), ("'error'",), ("'groupchat'",), ("'headline'",), ("'normal'",))), | 77 ('message_types', (("'chat'",), |
78 ("'error'",), | |
79 ("'groupchat'",), | |
80 ("'headline'",), | |
81 ("'normal'",), | |
82 ("'info'",) # info is not standard, but used to keep track of info like join/leave in a MUC | |
83 )), | |
70 )), | 84 )), |
71 }, | 85 }, |
86 3: {'specific': 'update_v3' | |
87 }, | |
72 2: {'specific': 'update2raw_v2' | 88 2: {'specific': 'update2raw_v2' |
73 }, | 89 }, |
74 1: {'cols create': {'history': ('extra BLOB',)} | 90 1: {'cols create': {'history': ('extra BLOB',)}, |
75 }, | 91 }, |
76 } | 92 } |
77 | 93 |
# keys present in message data's "extra" dict but NOT stored in sqlite's pickled
# extra blob, because they have their own dedicated column in the history table
# (this split is specific to this sqlite storage backend)
NOT_IN_EXTRA = ('received_timestamp', 'update_uid')
78 | 97 |
79 class SqliteStorage(object): | 98 class SqliteStorage(object): |
80 """This class manage storage with Sqlite database""" | 99 """This class manage storage with Sqlite database""" |
81 | 100 |
82 def __init__(self, db_filename, sat_version): | 101 def __init__(self, db_filename, sat_version): |
112 | 131 |
113 def commitStatements(statements): | 132 def commitStatements(statements): |
114 | 133 |
115 if statements is None: | 134 if statements is None: |
116 return defer.succeed(None) | 135 return defer.succeed(None) |
117 log.debug(u"===== COMMITING STATEMENTS =====\n%s\n============\n\n" % '\n'.join(statements)) | 136 log.debug(u"===== COMMITTING STATEMENTS =====\n%s\n============\n\n" % '\n'.join(statements)) |
118 d = self.dbpool.runInteraction(self._updateDb, tuple(statements)) | 137 d = self.dbpool.runInteraction(self._updateDb, tuple(statements)) |
119 return d | 138 return d |
120 | 139 |
121 init_defer.addCallback(lambda ignore: getNewBaseSql() if new_base else getUpdateSql()) | 140 init_defer.addCallback(lambda ignore: getNewBaseSql() if new_base else getUpdateSql()) |
122 init_defer.addCallback(commitStatements) | 141 init_defer.addCallback(commitStatements) |
249 d = self.dbpool.runQuery("REPLACE INTO param_ind(category,name,profile_id,value) VALUES (?,?,?,?)", (category, name, self.profiles[profile], value)) | 268 d = self.dbpool.runQuery("REPLACE INTO param_ind(category,name,profile_id,value) VALUES (?,?,?,?)", (category, name, self.profiles[profile], value)) |
250 d.addErrback(lambda ignore: log.error(_(u"Can't set individual parameter (%(category)s/%(name)s) for [%(profile)s] in database" % {"category": category, "name": name, "profile": profile}))) | 269 d.addErrback(lambda ignore: log.error(_(u"Can't set individual parameter (%(category)s/%(name)s) for [%(profile)s] in database" % {"category": category, "name": name, "profile": profile}))) |
251 return d | 270 return d |
252 | 271 |
253 #History | 272 #History |
254 def addToHistory(self, from_jid, to_jid, message, _type='chat', extra=None, timestamp=None, profile=None): | 273 |
def _addToHistoryCb(self, dummy, data):
    """Store per-language message/subject rows and thread data.

    Called once the message metadata have been successfully inserted in the
    *history* table; queues the related INSERTs in the *message*, *subject*
    and *thread* tables.

    @param dummy: ignored result of the history INSERT query
    @param data(dict): message data as build by SatMessageProtocol.onMessage
    """
    uid = data['uid']
    for key in ('message', 'subject'):
        for lang, value in data[key].iteritems():
            d = self.dbpool.runQuery(
                "INSERT INTO {key}(history_uid, {key}, language) VALUES (?,?,?)".format(key=key),
                (uid, value, lang or None))
            # FIX: bind loop variables as lambda defaults. A plain closure is
            # evaluated only when the errback fires, i.e. with the values of
            # the *last* iteration, so the logged key/lang/value could be wrong
            d.addErrback(lambda dummy, key=key, lang=lang, value=value: log.error(
                _(u"Can't save following {key} in history (uid: {uid}, lang:{lang}): {value}".format(
                    key=key, uid=uid, lang=lang, value=value))))
    try:
        thread = data['extra']['thread']
    except KeyError:
        # no thread attached to this message, nothing to store
        pass
    else:
        thread_parent = data['extra'].get('thread_parent')
        d = self.dbpool.runQuery(
            "INSERT INTO thread(history_uid, thread_id, parent_id) VALUES (?,?,?)",
            (uid, thread, thread_parent))
        # no loop here, so a plain closure is safe
        d.addErrback(lambda dummy: log.error(
            _(u"Can't save following thread in history (uid: {uid}): thread:{thread}), parent:{parent}".format(
                uid=uid, thread=thread, parent=thread_parent))))
294 | |
295 def _addToHistoryEb(self, failure, data): | |
296 try: | |
297 sqlite_msg = failure.value.args[0] | |
298 except (AttributeError, IndexError): | |
299 raise failure | |
300 else: | |
301 if "UNIQUE constraint failed" in sqlite_msg: | |
302 log.debug(u"Message is already in history, not storing it again") | |
303 else: | |
304 log.error(u"Can't store message in history: {}".format(failure)) | |
305 | |
306 def addToHistory(self, data, profile): | |
255 """Store a new message in history | 307 """Store a new message in history |
256 @param from_jid: full source JID | 308 |
257 @param to_jid: full dest JID | 309 @param data(dict): message data as build by SatMessageProtocol.onMessage |
258 @param message: message | 310 """ |
259 @param _type: message type (see RFC 6121 §5.2.2) | 311 extra = pickle.dumps({k: v for k, v in data['extra'].iteritems() if k not in NOT_IN_EXTRA}, 0) |
260 @param extra: dictionary (keys and values are unicode) of extra message data | 312 from_jid = data['from'] |
261 @param timestamp: timestamp in seconds since epoch, or None to use current time | 313 to_jid = data['to'] |
262 """ | 314 d = self.dbpool.runQuery("INSERT INTO history(uid, update_uid, profile_id, source, dest, source_res, dest_res, timestamp, received_timestamp, type, extra) VALUES (?,?,?,?,?,?,?,?,?,?,?)", |
263 assert(profile) | 315 (data['uid'], data['extra'].get('update_uid'), self.profiles[profile], data['from'].userhost(), to_jid.userhost(), from_jid.resource, to_jid.resource, data['timestamp'], data.get('received_timestamp'), data['type'], extra)) |
264 if extra is None: | 316 d.addCallbacks(self._addToHistoryCb, self._addToHistoryEb, callbackArgs=[data], errbackArgs=[data]) |
265 extra = {} | 317 d.addErrback(lambda ignore: log.error(_(u"Can't save following message in history: from [{from_jid}] to [{to_jid}] (uid: {uid})" |
266 extra_ = pickle.dumps({k: v.encode('utf-8') for k, v in extra.items()}, 0).decode('utf-8') | 318 .format(from_jid=from_jid.full(), to_jid=to_jid.full(), uid=data['uid'])))) |
267 d = self.dbpool.runQuery("INSERT INTO history(source, source_res, dest, dest_res, timestamp, message, type, extra, profile_id) VALUES (?,?,?,?,?,?,?,?,?)", | 319 return d |
268 (from_jid.userhost(), from_jid.resource, to_jid.userhost(), to_jid.resource, timestamp or time(), | 320 |
269 message, _type, extra_, self.profiles[profile])) | 321 def sqliteHistoryToList(self, query_result): |
270 d.addErrback(lambda ignore: log.error(_(u"Can't save following message in history: from [%(from_jid)s] to [%(to_jid)s] ==> [%(message)s]" % | 322 """Get SQL query result and return a list of message data dicts""" |
271 {"from_jid": from_jid.full(), "to_jid": to_jid.full(), "message": message}))) | 323 result = [] |
272 return d | 324 current = {'uid': None} |
273 | 325 for row in reversed(query_result): |
274 def getHistory(self, from_jid, to_jid, limit=None, between=True, search=None, profile=None): | 326 uid, update_uid, source, dest, source_res, dest_res, timestamp, received_timestamp,\ |
327 type_, extra, message, message_lang, subject, subject_lang, thread, thread_parent = row | |
328 if uid != current['uid']: | |
329 # new message | |
330 try: | |
331 extra = pickle.loads(str(extra or "")) | |
332 except EOFError: | |
333 extra = {} | |
334 current = { | |
335 'from': "%s/%s" % (source, source_res) if source_res else source, | |
336 'to': "%s/%s" % (dest, dest_res) if dest_res else dest, | |
337 'uid': uid, | |
338 'message': {}, | |
339 'subject': {}, | |
340 'type': type_, | |
341 'extra': extra, | |
342 'timestamp': timestamp, | |
343 } | |
344 if update_uid is not None: | |
345 current['extra']['update_uid'] = update_uid | |
346 if received_timestamp is not None: | |
347 current['extra']['received_timestamp'] = str(received_timestamp) | |
348 result.append(current) | |
349 | |
350 if message is not None: | |
351 current['message'][message_lang or ''] = message | |
352 | |
353 if subject is not None: | |
354 current['subject'][subject_lang or ''] = subject | |
355 | |
356 if thread is not None: | |
357 current_extra = current['extra'] | |
358 current_extra['thread'] = thread | |
359 if thread_parent is not None: | |
360 current_extra['thread_parent'] = thread_parent | |
361 else: | |
362 if thread_parent is not None: | |
363 log.error(u"Database inconsistency: thread parent without thread (uid: {uid}, thread_parent: {parent})" | |
364 .format(uid=uid, parent=thread_parent)) | |
365 | |
366 return result | |
367 | |
def listDict2listTuple(self, messages_data):
    """Return a list of tuple as used in bridge from a list of messages data

    @param messages_data(list[dict]): message data dicts as built by
        sqliteHistoryToList
    @return (list[tuple]): (uid, timestamp, from, to, message, subject,
        type, extra) tuples
    """
    keys = ('uid', 'timestamp', 'from', 'to', 'message', 'subject', 'type', 'extra')
    return [tuple(mess[key] for key in keys) for mess in messages_data]
374 | |
375 def historyGet(self, from_jid, to_jid, limit=None, between=True, search=None, profile=None): | |
275 """Retrieve messages in history | 376 """Retrieve messages in history |
377 | |
276 @param from_jid (JID): source JID (full, or bare for catchall) | 378 @param from_jid (JID): source JID (full, or bare for catchall) |
277 @param to_jid (JID): dest JID (full, or bare for catchall) | 379 @param to_jid (JID): dest JID (full, or bare for catchall) |
278 @param limit (int): maximum number of messages to get: | 380 @param limit (int): maximum number of messages to get: |
279 - 0 for no message (returns the empty list) | 381 - 0 for no message (returns the empty list) |
280 - None for unlimited | 382 - None for unlimited |
281 @param between (bool): confound source and dest (ignore the direction) | 383 @param between (bool): confound source and dest (ignore the direction) |
282 @param search (str): pattern to filter the history results | 384 @param search (unicode): pattern to filter the history results |
283 @param profile (str): %(doc_profile)s | 385 @param profile (unicode): %(doc_profile)s |
284 @return: list of tuple as in http://wiki.goffi.org/wiki/Bridge_API#getHistory | 386 @return: list of tuple as in [messageNew] |
285 """ | 387 """ |
286 assert(profile) | 388 assert profile |
287 if limit == 0: | 389 if limit == 0: |
288 return defer.succeed([]) | 390 return defer.succeed([]) |
289 | 391 |
290 def sqliteToList(query_result): | 392 query_parts = ["SELECT uid, update_uid, source, dest, source_res, dest_res, timestamp, received_timestamp,\ |
291 query_result.reverse() | 393 type, extra, message, message.language, subject, subject.language, thread_id, thread.parent_id\ |
292 result = [] | 394 FROM history LEFT JOIN message ON history.uid = message.history_uid\ |
293 for row in query_result: | 395 LEFT JOIN subject ON history.uid=subject.history_uid\ |
294 timestamp, source, source_res, dest, dest_res, message, type_, extra_raw = row | 396 LEFT JOIN thread ON history.uid=thread.history_uid\ |
295 try: | 397 WHERE profile_id=? AND"] # FIXME: not sure if it's the best request, messages and subjects can appear several times here |
296 extra = pickle.loads(str(extra_raw or "")) | |
297 except EOFError: | |
298 extra = {} | |
299 result.append((timestamp, "%s/%s" % (source, source_res) if source_res else source, | |
300 "%s/%s" % (dest, dest_res) if dest_res else dest, | |
301 message, type_, extra)) | |
302 return result | |
303 | |
304 query_parts = ["SELECT timestamp, source, source_res, dest, dest_res, message, type, extra FROM history WHERE profile_id=? AND"] | |
305 values = [self.profiles[profile]] | 398 values = [self.profiles[profile]] |
306 | 399 |
307 def test_jid(type_, _jid): | 400 def test_jid(type_, _jid): |
308 values.append(_jid.userhost()) | 401 values.append(_jid.userhost()) |
309 if _jid.resource: | 402 if _jid.resource: |
322 if search: | 415 if search: |
323 # TODO: use REGEXP (function to be defined) instead of GLOB: https://www.sqlite.org/lang_expr.html | 416 # TODO: use REGEXP (function to be defined) instead of GLOB: https://www.sqlite.org/lang_expr.html |
324 query_parts.append("AND message GLOB ?") | 417 query_parts.append("AND message GLOB ?") |
325 values.append("*%s*" % search) | 418 values.append("*%s*" % search) |
326 | 419 |
327 query_parts.append("ORDER BY timestamp DESC") | 420 query_parts.append("ORDER BY timestamp DESC") # we reverse the order in sqliteHistoryToList |
328 | 421 # we use DESC here so LIMIT keep the last messages |
329 if limit is not None: | 422 if limit is not None: |
330 query_parts.append("LIMIT ?") | 423 query_parts.append("LIMIT ?") |
331 values.append(limit) | 424 values.append(limit) |
332 | 425 |
333 d = self.dbpool.runQuery(" ".join(query_parts), values) | 426 d = self.dbpool.runQuery(" ".join(query_parts), values) |
334 return d.addCallback(sqliteToList) | 427 d.addCallback(self.sqliteHistoryToList) |
428 d.addCallback(self.listDict2listTuple) | |
429 return d | |
335 | 430 |
336 #Private values | 431 #Private values |
337 def loadGenPrivates(self, private_gen, namespace): | 432 def loadGenPrivates(self, private_gen, namespace): |
338 """Load general private values | 433 """Load general private values |
339 @param private_gen: dictionary to fill | 434 @param private_gen: dictionary to fill |
663 schema_dict[table] = (tuple(col_defs), tuple(constraints)) | 758 schema_dict[table] = (tuple(col_defs), tuple(constraints)) |
664 return schema_dict | 759 return schema_dict |
665 | 760 |
666 def generateUpdateData(self, old_data, new_data, modify=False): | 761 def generateUpdateData(self, old_data, new_data, modify=False): |
667 """ Generate data for automatic update between two schema data | 762 """ Generate data for automatic update between two schema data |
763 | |
668 @param old_data: data of the former schema (which must be updated) | 764 @param old_data: data of the former schema (which must be updated) |
669 @param new_data: data of the current schema | 765 @param new_data: data of the current schema |
670 @param modify: if True, always use "cols modify" table, else try to ALTER tables | 766 @param modify: if True, always use "cols modify" table, else try to ALTER tables |
671 @return: update data, a dictionary with: | 767 @return: update data, a dictionary with: |
672 - 'create': dictionary of tables to create | 768 - 'create': dictionary of tables to create |
726 } | 822 } |
727 | 823 |
728 @defer.inlineCallbacks | 824 @defer.inlineCallbacks |
729 def update2raw(self, update, dev_version=False): | 825 def update2raw(self, update, dev_version=False): |
730 """ Transform update data to raw SQLite statements | 826 """ Transform update data to raw SQLite statements |
827 | |
731 @param update: update data as returned by generateUpdateData | 828 @param update: update data as returned by generateUpdateData |
732 @param dev_version: if True, update will be done in dev mode: no deletion will be done, instead a message will be shown. This prevent accidental lost of data while working on the code/database. | 829 @param dev_version: if True, update will be done in dev mode: no deletion will be done, instead a message will be shown. This prevent accidental lost of data while working on the code/database. |
733 @return: list of string with SQL statements needed to update the base | 830 @return: list of string with SQL statements needed to update the base |
734 | |
735 """ | 831 """ |
736 ret = self.createData2Raw(update.get('create', {})) | 832 ret = self.createData2Raw(update.get('create', {})) |
737 drop = [] | 833 drop = [] |
738 for table in update.get('delete', tuple()): | 834 for table in update.get('delete', tuple()): |
739 drop.append(self.DROP_SQL % table) | 835 drop.append(self.DROP_SQL % table) |
765 ret.extend(self.insertData2Raw(insert)) | 861 ret.extend(self.insertData2Raw(insert)) |
766 | 862 |
767 specific = update.get('specific', None) | 863 specific = update.get('specific', None) |
768 if specific: | 864 if specific: |
769 cmds = yield getattr(self, specific)() | 865 cmds = yield getattr(self, specific)() |
770 ret.extend(cmds) | 866 ret.extend(cmds or []) |
771 defer.returnValue(ret) | 867 defer.returnValue(ret) |
868 | |
@defer.inlineCallbacks
def update_v3(self):
    """Update database from v2 to v3 (message refactoring)

    Renames/recreates the history table with the new schema, creates the
    message/subject/thread tables, registers the 'info' message type, then
    migrates every former history row (message text, subject, timestamps).

    @return: list of extra SQL statements (here none, the work is done
        directly through the connection pool)
    """
    # XXX: this update does all the messages in one huge transaction
    # this is really memory consuming, but was OK on a reasonably
    # big database for tests. If issues are happening, we can cut it
    # in smaller transactions using LIMIT and by deleting already updated
    # messages
    log.info(u"Database update to v3, this may take a while")

    # we need to fix duplicate timestamp, as it can result in conflicts with the new schema
    rows = yield self.dbpool.runQuery("SELECT timestamp, COUNT(*) as c FROM history GROUP BY timestamp HAVING c>1")
    if rows:
        log.info("fixing duplicate timestamp")
        fixed = []
        for timestamp, dummy in rows:
            ids_rows = yield self.dbpool.runQuery("SELECT id from history where timestamp=?", (timestamp,))
            for idx, (id_,) in enumerate(ids_rows):
                fixed.append(id_)
                # spread duplicates by 1 ms so the new UNIQUE constraint holds
                yield self.dbpool.runQuery("UPDATE history SET timestamp=? WHERE id=?", (float(timestamp) + idx * 0.001, id_))
        log.info(u"fixed messages with ids {}".format(u', '.join([unicode(id_) for id_ in fixed])))

    def historySchema(txn):
        log.info(u"History schema update")
        txn.execute("ALTER TABLE history RENAME TO tmp_sat_update")
        txn.execute("CREATE TABLE history (uid TEXT PRIMARY KEY, update_uid TEXT, profile_id INTEGER, source TEXT, dest TEXT, source_res TEXT, dest_res TEXT, timestamp DATETIME NOT NULL, received_timestamp DATETIME, type TEXT, extra BLOB, FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE, FOREIGN KEY(type) REFERENCES message_types(type), UNIQUE (profile_id, timestamp, source, dest, source_res, dest_res))")
        # former integer ids become the new uid values
        txn.execute("INSERT INTO history (uid, profile_id, source, dest, source_res, dest_res, timestamp, type, extra) SELECT id, profile_id, source, dest, source_res, dest_res, timestamp, type, extra FROM tmp_sat_update")

    yield self.dbpool.runInteraction(historySchema)

    def newTables(txn):
        log.info(u"Creating new tables")
        txn.execute("CREATE TABLE message (id INTEGER PRIMARY KEY ASC, history_uid INTEGER, message TEXT, language TEXT, FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE)")
        txn.execute("CREATE TABLE thread (id INTEGER PRIMARY KEY ASC, history_uid INTEGER, thread_id TEXT, parent_id TEXT, FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE)")
        txn.execute("CREATE TABLE subject (id INTEGER PRIMARY KEY ASC, history_uid INTEGER, subject TEXT, language TEXT, FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE)")

    yield self.dbpool.runInteraction(newTables)

    log.info(u"inserting new message type")
    yield self.dbpool.runQuery("INSERT INTO message_types VALUES (?)", ('info',))

    log.info(u"messages update")
    rows = yield self.dbpool.runQuery("SELECT id, timestamp, message, extra FROM tmp_sat_update")
    total = len(rows)

    def updateHistory(txn, queries):
        # run all prepared statements in a single transaction
        for query, args in queries:
            txn.execute(query, args)

    queries = []
    for idx, row in enumerate(rows, 1):
        if idx % 1000 == 0 or total - idx == 0:
            log.info("preparing message {}/{}".format(idx, total))
        id_, timestamp, message, extra = row
        try:
            extra = pickle.loads(str(extra or ""))
        except EOFError:
            extra = {}
        except Exception:
            log.warning(u"Can't handle extra data for message id {}, ignoring it".format(id_))
            extra = {}

        queries.append(("INSERT INTO message(history_uid, message) VALUES (?,?)", (id_, message)))

        try:
            subject = extra.pop('subject')
        except KeyError:
            pass
        else:
            try:
                subject = subject.decode('utf-8')
            except UnicodeError:
                # FIX: str.decode raises UnicodeDecodeError on invalid UTF-8;
                # the original caught UnicodeEncodeError only (letting decode
                # errors crash the update) and then did `del extra['subject']`
                # although the subject had already been popped, which would
                # have raised KeyError. UnicodeError covers both cases and
                # there is nothing left to delete.
                log.warning(u"Error while decoding subject, ignoring it")
            else:
                queries.append(("INSERT INTO subject(history_uid, subject) VALUES (?,?)", (id_, subject)))

        received_timestamp = extra.pop('timestamp', None)
        try:
            del extra['archive']
        except KeyError:
            # archive was not used
            pass

        # FIX: parameters were given in the wrong order (id_ first), which
        # would have stored the row id as received_timestamp, the timestamp
        # as extra, and matched uid against the pickled extra blob
        queries.append(("UPDATE history SET received_timestamp=?,extra=? WHERE uid=?",
                        (received_timestamp, pickle.dumps(extra, 0), id_)))

    yield self.dbpool.runInteraction(updateHistory, queries)

    log.info("Dropping temporary table")
    yield self.dbpool.runQuery("DROP TABLE tmp_sat_update")
    log.info("Database update finished :)")
772 | 960 |
773 def update2raw_v2(self): | 961 def update2raw_v2(self): |
774 """Update the database from v1 to v2 (add passwords encryptions): | 962 """Update the database from v1 to v2 (add passwords encryptions): |
963 | |
775 - the XMPP password value is re-used for the profile password (new parameter) | 964 - the XMPP password value is re-used for the profile password (new parameter) |
776 - the profile password is stored hashed | 965 - the profile password is stored hashed |
777 - the XMPP password is stored encrypted, with the profile password as key | 966 - the XMPP password is stored encrypted, with the profile password as key |
778 - as there are no other stored passwords yet, it is enough, otherwise we | 967 - as there are no other stored passwords yet, it is enough, otherwise we |
779 would need to encrypt the other passwords as it's done for XMPP password | 968 would need to encrypt the other passwords as it's done for XMPP password |