comparison src/memory/sqlite.py @ 1955:633b5c21aefd

backend, frontend: messages refactoring (huge commit, not finished): /!\ database schema has been modified, make a backup before updating. Messages have been refactored, here are the main changes: - languages are now handled - all messages have a uid (internal to SàT) - message updating is anticipated - subject is now first class - new naming scheme is used: newMessage => messageNew, getHistory => historyGet, sendMessage => messageSend - minimal compatibility refactoring in quick_frontend/Primitivus, better refactoring should follow - thread handling - delayed messages are saved into history - info messages may also be saved in history (e.g. to keep track of people joining/leaving a room) - duplicate messages should be avoided - historyGet returns messages in the right order, no need to sort again - plugins have been updated to follow new features, some of them need to be reworked (e.g. OTR) - XEP-0203 (Delayed Delivery) is now fully handled in core, the plugin just handles disco and creation of a delay element - /!\ jp and Libervia are currently broken, as are some features of Primitivus. It has been put in one huge commit to avoid breaking messaging between changes. This is the main part of message refactoring, other commits will follow to take advantage of the new features/behaviour.
author Goffi <goffi@goffi.org>
date Tue, 24 May 2016 22:11:04 +0200
parents 2daf7b4c6756
children c73e08094a95
comparison
equal deleted inserted replaced
1943:ccfe45302a5c 1955:633b5c21aefd
25 from sat.memory.crypto import BlockCipher, PasswordHasher 25 from sat.memory.crypto import BlockCipher, PasswordHasher
26 from sat.tools.config import fixConfigOption 26 from sat.tools.config import fixConfigOption
27 from twisted.enterprise import adbapi 27 from twisted.enterprise import adbapi
28 from twisted.internet import defer 28 from twisted.internet import defer
29 from collections import OrderedDict 29 from collections import OrderedDict
30 from time import time
31 import re 30 import re
32 import os.path 31 import os.path
33 import cPickle as pickle 32 import cPickle as pickle
34 import hashlib 33 import hashlib
35 34
36 CURRENT_DB_VERSION = 2 35 CURRENT_DB_VERSION = 3
37 36
38 # XXX: DATABASE schemas are used in the following way: 37 # XXX: DATABASE schemas are used in the following way:
39 # - 'current' key is for the actual database schema, for a new base 38 # - 'current' key is for the actual database schema, for a new base
40 # - x(int) is for update needed between x-1 and x. All number are needed between y and z to do an update 39 # - x(int) is for update needed between x-1 and x. All number are needed between y and z to do an update
41 # e.g.: if CURRENT_DB_VERSION is 6, 'current' is the actuel DB, and to update from version 3, numbers 4, 5 and 6 are needed 40 # e.g.: if CURRENT_DB_VERSION is 6, 'current' is the actuel DB, and to update from version 3, numbers 4, 5 and 6 are needed
47 DATABASE_SCHEMAS = { 46 DATABASE_SCHEMAS = {
48 "current": {'CREATE': OrderedDict(( 47 "current": {'CREATE': OrderedDict((
49 ('profiles', (("id INTEGER PRIMARY KEY ASC", "name TEXT"), 48 ('profiles', (("id INTEGER PRIMARY KEY ASC", "name TEXT"),
50 ("UNIQUE (name)",))), 49 ("UNIQUE (name)",))),
51 ('message_types', (("type TEXT PRIMARY KEY",), 50 ('message_types', (("type TEXT PRIMARY KEY",),
52 tuple())), 51 tuple())),
53 ('history', (("id INTEGER PRIMARY KEY ASC", "profile_id INTEGER", "source TEXT", "dest TEXT", "source_res TEXT", "dest_res TEXT", "timestamp DATETIME", "message TEXT", "type TEXT", "extra BLOB"), 52 ('history', (("uid TEXT PRIMARY KEY", "update_uid TEXT", "profile_id INTEGER", "source TEXT", "dest TEXT", "source_res TEXT", "dest_res TEXT",
54 ("FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE", "FOREIGN KEY(type) REFERENCES message_types(type)"))), 53 "timestamp DATETIME NOT NULL", "received_timestamp DATETIME", # XXX: timestamp is the time when the message was emitted. If received time stamp is not NULL, the message was delayed and timestamp is the declared value (and received_timestamp the time of reception)
54 "type TEXT", "extra BLOB"),
55 ("FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE", "FOREIGN KEY(type) REFERENCES message_types(type)",
56 "UNIQUE (profile_id, timestamp, source, dest, source_res, dest_res)" # avoid storing 2 time the same message (specially for delayed cones)
57 ))),
58 ('message', (("id INTEGER PRIMARY KEY ASC", "history_uid INTEGER", "message TEXT", "language TEXT"),
59 ("FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE",))),
60 ('subject', (("id INTEGER PRIMARY KEY ASC", "history_uid INTEGER", "subject TEXT", "language TEXT"),
61 ("FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE",))),
62 ('thread', (("id INTEGER PRIMARY KEY ASC", "history_uid INTEGER", "thread_id TEXT", "parent_id TEXT"),("FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE",))),
55 ('param_gen', (("category TEXT", "name TEXT", "value TEXT"), 63 ('param_gen', (("category TEXT", "name TEXT", "value TEXT"),
56 ("PRIMARY KEY (category,name)",))), 64 ("PRIMARY KEY (category,name)",))),
57 ('param_ind', (("category TEXT", "name TEXT", "profile_id INTEGER", "value TEXT"), 65 ('param_ind', (("category TEXT", "name TEXT", "profile_id INTEGER", "value TEXT"),
58 ("PRIMARY KEY (category,name,profile_id)", "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE"))), 66 ("PRIMARY KEY (category,name,profile_id)", "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE"))),
59 ('private_gen', (("namespace TEXT", "key TEXT", "value TEXT"), 67 ('private_gen', (("namespace TEXT", "key TEXT", "value TEXT"),
64 ("PRIMARY KEY (namespace, key)",))), 72 ("PRIMARY KEY (namespace, key)",))),
65 ('private_ind_bin', (("namespace TEXT", "key TEXT", "profile_id INTEGER", "value BLOB"), 73 ('private_ind_bin', (("namespace TEXT", "key TEXT", "profile_id INTEGER", "value BLOB"),
66 ("PRIMARY KEY (namespace, key, profile_id)", "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE"))) 74 ("PRIMARY KEY (namespace, key, profile_id)", "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE")))
67 )), 75 )),
68 'INSERT': OrderedDict(( 76 'INSERT': OrderedDict((
69 ('message_types', (("'chat'",), ("'error'",), ("'groupchat'",), ("'headline'",), ("'normal'",))), 77 ('message_types', (("'chat'",),
78 ("'error'",),
79 ("'groupchat'",),
80 ("'headline'",),
81 ("'normal'",),
82 ("'info'",) # info is not standard, but used to keep track of info like join/leave in a MUC
83 )),
70 )), 84 )),
71 }, 85 },
86 3: {'specific': 'update_v3'
87 },
72 2: {'specific': 'update2raw_v2' 88 2: {'specific': 'update2raw_v2'
73 }, 89 },
74 1: {'cols create': {'history': ('extra BLOB',)} 90 1: {'cols create': {'history': ('extra BLOB',)},
75 }, 91 },
76 } 92 }
77 93
94 NOT_IN_EXTRA = ('received_timestamp', 'update_uid') # keys which are in message data extra but not stored in sqlite's extra field
95 # this is specific to this sqlite storage and for now only used for received_timestamp
96 # because this value is stored in a separate field
78 97
79 class SqliteStorage(object): 98 class SqliteStorage(object):
80 """This class manage storage with Sqlite database""" 99 """This class manage storage with Sqlite database"""
81 100
82 def __init__(self, db_filename, sat_version): 101 def __init__(self, db_filename, sat_version):
112 131
113 def commitStatements(statements): 132 def commitStatements(statements):
114 133
115 if statements is None: 134 if statements is None:
116 return defer.succeed(None) 135 return defer.succeed(None)
117 log.debug(u"===== COMMITING STATEMENTS =====\n%s\n============\n\n" % '\n'.join(statements)) 136 log.debug(u"===== COMMITTING STATEMENTS =====\n%s\n============\n\n" % '\n'.join(statements))
118 d = self.dbpool.runInteraction(self._updateDb, tuple(statements)) 137 d = self.dbpool.runInteraction(self._updateDb, tuple(statements))
119 return d 138 return d
120 139
121 init_defer.addCallback(lambda ignore: getNewBaseSql() if new_base else getUpdateSql()) 140 init_defer.addCallback(lambda ignore: getNewBaseSql() if new_base else getUpdateSql())
122 init_defer.addCallback(commitStatements) 141 init_defer.addCallback(commitStatements)
249 d = self.dbpool.runQuery("REPLACE INTO param_ind(category,name,profile_id,value) VALUES (?,?,?,?)", (category, name, self.profiles[profile], value)) 268 d = self.dbpool.runQuery("REPLACE INTO param_ind(category,name,profile_id,value) VALUES (?,?,?,?)", (category, name, self.profiles[profile], value))
250 d.addErrback(lambda ignore: log.error(_(u"Can't set individual parameter (%(category)s/%(name)s) for [%(profile)s] in database" % {"category": category, "name": name, "profile": profile}))) 269 d.addErrback(lambda ignore: log.error(_(u"Can't set individual parameter (%(category)s/%(name)s) for [%(profile)s] in database" % {"category": category, "name": name, "profile": profile})))
251 return d 270 return d
252 271
253 #History 272 #History
254 def addToHistory(self, from_jid, to_jid, message, _type='chat', extra=None, timestamp=None, profile=None): 273
def _addToHistoryCb(self, dummy, data):
    """Store the bodies, subjects and thread of a message whose metadata is stored

    Called once the row in the history table has been inserted (see addToHistory),
    so the rows added here can reference it through history_uid.

    @param dummy: result of the previous query, ignored
    @param data(dict): message data as given to addToHistory; 'message' and
        'subject' map a language (empty for unknown) to the text, and
        data['extra'] may contain 'thread' and 'thread_parent'
    @return (Deferred): fired with None once every insertion has been attempted;
        failed insertions are logged, not propagated
    """
    uid = data['uid']
    deferreds = []
    for key in ('message', 'subject'):
        for lang, value in data[key].iteritems():
            d = self.dbpool.runQuery("INSERT INTO {key}(history_uid, {key}, language) VALUES (?,?,?)".format(key=key),
                                     (uid, value, lang or None))
            # the errback runs asynchronously, after the loop has moved on:
            # bind the current values as defaults, otherwise the last ones would be logged
            d.addErrback(lambda dummy, key=key, lang=lang, value=value: log.error(
                _(u"Can't save following {key} in history (uid: {uid}, lang:{lang}): {value}").format(
                    key=key, uid=uid, lang=lang, value=value)))
            deferreds.append(d)
    try:
        thread = data['extra']['thread']
    except KeyError:
        pass
    else:
        thread_parent = data['extra'].get('thread_parent')
        d = self.dbpool.runQuery("INSERT INTO thread(history_uid, thread_id, parent_id) VALUES (?,?,?)",
                                 (uid, thread, thread_parent))
        d.addErrback(lambda dummy: log.error(
            _(u"Can't save following thread in history (uid: {uid}): thread:{thread}, parent:{parent}").format(
                uid=uid, thread=thread, parent=thread_parent)))
        deferreds.append(d)
    # wait for all insertions so that the message is fully stored when
    # the Deferred returned by addToHistory fires
    return defer.DeferredList(deferreds).addCallback(lambda dummy: None)
294
295 def _addToHistoryEb(self, failure, data):
296 try:
297 sqlite_msg = failure.value.args[0]
298 except (AttributeError, IndexError):
299 raise failure
300 else:
301 if "UNIQUE constraint failed" in sqlite_msg:
302 log.debug(u"Message is already in history, not storing it again")
303 else:
304 log.error(u"Can't store message in history: {}".format(failure))
305
306 def addToHistory(self, data, profile):
255 """Store a new message in history 307 """Store a new message in history
256 @param from_jid: full source JID 308
257 @param to_jid: full dest JID 309 @param data(dict): message data as build by SatMessageProtocol.onMessage
258 @param message: message 310 """
259 @param _type: message type (see RFC 6121 §5.2.2) 311 extra = pickle.dumps({k: v for k, v in data['extra'].iteritems() if k not in NOT_IN_EXTRA}, 0)
260 @param extra: dictionary (keys and values are unicode) of extra message data 312 from_jid = data['from']
261 @param timestamp: timestamp in seconds since epoch, or None to use current time 313 to_jid = data['to']
262 """ 314 d = self.dbpool.runQuery("INSERT INTO history(uid, update_uid, profile_id, source, dest, source_res, dest_res, timestamp, received_timestamp, type, extra) VALUES (?,?,?,?,?,?,?,?,?,?,?)",
263 assert(profile) 315 (data['uid'], data['extra'].get('update_uid'), self.profiles[profile], data['from'].userhost(), to_jid.userhost(), from_jid.resource, to_jid.resource, data['timestamp'], data.get('received_timestamp'), data['type'], extra))
264 if extra is None: 316 d.addCallbacks(self._addToHistoryCb, self._addToHistoryEb, callbackArgs=[data], errbackArgs=[data])
265 extra = {} 317 d.addErrback(lambda ignore: log.error(_(u"Can't save following message in history: from [{from_jid}] to [{to_jid}] (uid: {uid})"
266 extra_ = pickle.dumps({k: v.encode('utf-8') for k, v in extra.items()}, 0).decode('utf-8') 318 .format(from_jid=from_jid.full(), to_jid=to_jid.full(), uid=data['uid']))))
267 d = self.dbpool.runQuery("INSERT INTO history(source, source_res, dest, dest_res, timestamp, message, type, extra, profile_id) VALUES (?,?,?,?,?,?,?,?,?)", 319 return d
268 (from_jid.userhost(), from_jid.resource, to_jid.userhost(), to_jid.resource, timestamp or time(), 320
269 message, _type, extra_, self.profiles[profile])) 321 def sqliteHistoryToList(self, query_result):
270 d.addErrback(lambda ignore: log.error(_(u"Can't save following message in history: from [%(from_jid)s] to [%(to_jid)s] ==> [%(message)s]" % 322 """Get SQL query result and return a list of message data dicts"""
271 {"from_jid": from_jid.full(), "to_jid": to_jid.full(), "message": message}))) 323 result = []
272 return d 324 current = {'uid': None}
273 325 for row in reversed(query_result):
274 def getHistory(self, from_jid, to_jid, limit=None, between=True, search=None, profile=None): 326 uid, update_uid, source, dest, source_res, dest_res, timestamp, received_timestamp,\
327 type_, extra, message, message_lang, subject, subject_lang, thread, thread_parent = row
328 if uid != current['uid']:
329 # new message
330 try:
331 extra = pickle.loads(str(extra or ""))
332 except EOFError:
333 extra = {}
334 current = {
335 'from': "%s/%s" % (source, source_res) if source_res else source,
336 'to': "%s/%s" % (dest, dest_res) if dest_res else dest,
337 'uid': uid,
338 'message': {},
339 'subject': {},
340 'type': type_,
341 'extra': extra,
342 'timestamp': timestamp,
343 }
344 if update_uid is not None:
345 current['extra']['update_uid'] = update_uid
346 if received_timestamp is not None:
347 current['extra']['received_timestamp'] = str(received_timestamp)
348 result.append(current)
349
350 if message is not None:
351 current['message'][message_lang or ''] = message
352
353 if subject is not None:
354 current['subject'][subject_lang or ''] = subject
355
356 if thread is not None:
357 current_extra = current['extra']
358 current_extra['thread'] = thread
359 if thread_parent is not None:
360 current_extra['thread_parent'] = thread_parent
361 else:
362 if thread_parent is not None:
363 log.error(u"Database inconsistency: thread parent without thread (uid: {uid}, thread_parent: {parent})"
364 .format(uid=uid, parent=thread_parent))
365
366 return result
367
def listDict2listTuple(self, messages_data):
    """Convert message data dicts to the tuples used by the bridge

    @param messages_data(iterable[dict]): messages data, e.g. as built by sqliteHistoryToList
    @return (list[tuple]): one (uid, timestamp, from, to, message, subject, type, extra)
        tuple per message, in the same order as messages_data
    """
    fields = ('uid', 'timestamp', 'from', 'to', 'message', 'subject', 'type', 'extra')
    return [tuple(msg_data[field] for field in fields) for msg_data in messages_data]
374
375 def historyGet(self, from_jid, to_jid, limit=None, between=True, search=None, profile=None):
275 """Retrieve messages in history 376 """Retrieve messages in history
377
276 @param from_jid (JID): source JID (full, or bare for catchall) 378 @param from_jid (JID): source JID (full, or bare for catchall)
277 @param to_jid (JID): dest JID (full, or bare for catchall) 379 @param to_jid (JID): dest JID (full, or bare for catchall)
278 @param limit (int): maximum number of messages to get: 380 @param limit (int): maximum number of messages to get:
279 - 0 for no message (returns the empty list) 381 - 0 for no message (returns the empty list)
280 - None for unlimited 382 - None for unlimited
281 @param between (bool): confound source and dest (ignore the direction) 383 @param between (bool): confound source and dest (ignore the direction)
282 @param search (str): pattern to filter the history results 384 @param search (unicode): pattern to filter the history results
283 @param profile (str): %(doc_profile)s 385 @param profile (unicode): %(doc_profile)s
284 @return: list of tuple as in http://wiki.goffi.org/wiki/Bridge_API#getHistory 386 @return: list of tuple as in [messageNew]
285 """ 387 """
286 assert(profile) 388 assert profile
287 if limit == 0: 389 if limit == 0:
288 return defer.succeed([]) 390 return defer.succeed([])
289 391
290 def sqliteToList(query_result): 392 query_parts = ["SELECT uid, update_uid, source, dest, source_res, dest_res, timestamp, received_timestamp,\
291 query_result.reverse() 393 type, extra, message, message.language, subject, subject.language, thread_id, thread.parent_id\
292 result = [] 394 FROM history LEFT JOIN message ON history.uid = message.history_uid\
293 for row in query_result: 395 LEFT JOIN subject ON history.uid=subject.history_uid\
294 timestamp, source, source_res, dest, dest_res, message, type_, extra_raw = row 396 LEFT JOIN thread ON history.uid=thread.history_uid\
295 try: 397 WHERE profile_id=? AND"] # FIXME: not sure if it's the best request, messages and subjects can appear several times here
296 extra = pickle.loads(str(extra_raw or ""))
297 except EOFError:
298 extra = {}
299 result.append((timestamp, "%s/%s" % (source, source_res) if source_res else source,
300 "%s/%s" % (dest, dest_res) if dest_res else dest,
301 message, type_, extra))
302 return result
303
304 query_parts = ["SELECT timestamp, source, source_res, dest, dest_res, message, type, extra FROM history WHERE profile_id=? AND"]
305 values = [self.profiles[profile]] 398 values = [self.profiles[profile]]
306 399
307 def test_jid(type_, _jid): 400 def test_jid(type_, _jid):
308 values.append(_jid.userhost()) 401 values.append(_jid.userhost())
309 if _jid.resource: 402 if _jid.resource:
322 if search: 415 if search:
323 # TODO: use REGEXP (function to be defined) instead of GLOB: https://www.sqlite.org/lang_expr.html 416 # TODO: use REGEXP (function to be defined) instead of GLOB: https://www.sqlite.org/lang_expr.html
324 query_parts.append("AND message GLOB ?") 417 query_parts.append("AND message GLOB ?")
325 values.append("*%s*" % search) 418 values.append("*%s*" % search)
326 419
327 query_parts.append("ORDER BY timestamp DESC") 420 query_parts.append("ORDER BY timestamp DESC") # we reverse the order in sqliteHistoryToList
328 421 # we use DESC here so LIMIT keep the last messages
329 if limit is not None: 422 if limit is not None:
330 query_parts.append("LIMIT ?") 423 query_parts.append("LIMIT ?")
331 values.append(limit) 424 values.append(limit)
332 425
333 d = self.dbpool.runQuery(" ".join(query_parts), values) 426 d = self.dbpool.runQuery(" ".join(query_parts), values)
334 return d.addCallback(sqliteToList) 427 d.addCallback(self.sqliteHistoryToList)
428 d.addCallback(self.listDict2listTuple)
429 return d
335 430
336 #Private values 431 #Private values
337 def loadGenPrivates(self, private_gen, namespace): 432 def loadGenPrivates(self, private_gen, namespace):
338 """Load general private values 433 """Load general private values
339 @param private_gen: dictionary to fill 434 @param private_gen: dictionary to fill
663 schema_dict[table] = (tuple(col_defs), tuple(constraints)) 758 schema_dict[table] = (tuple(col_defs), tuple(constraints))
664 return schema_dict 759 return schema_dict
665 760
666 def generateUpdateData(self, old_data, new_data, modify=False): 761 def generateUpdateData(self, old_data, new_data, modify=False):
667 """ Generate data for automatic update between two schema data 762 """ Generate data for automatic update between two schema data
763
668 @param old_data: data of the former schema (which must be updated) 764 @param old_data: data of the former schema (which must be updated)
669 @param new_data: data of the current schema 765 @param new_data: data of the current schema
670 @param modify: if True, always use "cols modify" table, else try to ALTER tables 766 @param modify: if True, always use "cols modify" table, else try to ALTER tables
671 @return: update data, a dictionary with: 767 @return: update data, a dictionary with:
672 - 'create': dictionary of tables to create 768 - 'create': dictionary of tables to create
726 } 822 }
727 823
728 @defer.inlineCallbacks 824 @defer.inlineCallbacks
729 def update2raw(self, update, dev_version=False): 825 def update2raw(self, update, dev_version=False):
730 """ Transform update data to raw SQLite statements 826 """ Transform update data to raw SQLite statements
827
731 @param update: update data as returned by generateUpdateData 828 @param update: update data as returned by generateUpdateData
732 @param dev_version: if True, update will be done in dev mode: no deletion will be done, instead a message will be shown. This prevent accidental lost of data while working on the code/database. 829 @param dev_version: if True, update will be done in dev mode: no deletion will be done, instead a message will be shown. This prevent accidental lost of data while working on the code/database.
733 @return: list of string with SQL statements needed to update the base 830 @return: list of string with SQL statements needed to update the base
734
735 """ 831 """
736 ret = self.createData2Raw(update.get('create', {})) 832 ret = self.createData2Raw(update.get('create', {}))
737 drop = [] 833 drop = []
738 for table in update.get('delete', tuple()): 834 for table in update.get('delete', tuple()):
739 drop.append(self.DROP_SQL % table) 835 drop.append(self.DROP_SQL % table)
765 ret.extend(self.insertData2Raw(insert)) 861 ret.extend(self.insertData2Raw(insert))
766 862
767 specific = update.get('specific', None) 863 specific = update.get('specific', None)
768 if specific: 864 if specific:
769 cmds = yield getattr(self, specific)() 865 cmds = yield getattr(self, specific)()
770 ret.extend(cmds) 866 ret.extend(cmds or [])
771 defer.returnValue(ret) 867 defer.returnValue(ret)
868
@defer.inlineCallbacks
def update_v3(self):
    """Update database from v2 to v3 (message refactoring)

    Steps:
    - spread duplicate timestamps, which would conflict with the new UNIQUE constraint
    - rebuild the history table with the new schema (old rows are kept in tmp_sat_update)
    - create the message, thread and subject tables
    - add the 'info' message type
    - move message bodies and subjects to their own tables, and store the
      received timestamp and cleaned extra data in the new history rows
    - drop the temporary table

    NOTE: the steps run in separate transactions, so an interrupted update
    leaves the database partially migrated.

    @return (Deferred): fired with None; statements are executed directly,
        so there is nothing left for update2raw to commit
    """
    # XXX: this update does all the messages in one huge transaction.
    #      This is really memory consuming, but was OK on a reasonably
    #      big database for tests. If issues happen, we can cut it
    #      into smaller transactions using LIMIT and by deleting already
    #      updated messages
    log.info(u"Database update to v3, this may take a while")

    # we need to fix duplicate timestamps, as they can conflict with the new schema
    rows = yield self.dbpool.runQuery("SELECT timestamp, COUNT(*) as c FROM history GROUP BY timestamp HAVING c>1")
    if rows:
        log.info("fixing duplicate timestamp")
        fixed = []
        for timestamp, dummy in rows:
            ids_rows = yield self.dbpool.runQuery("SELECT id from history where timestamp=?", (timestamp,))
            for idx, (id_,) in enumerate(ids_rows):
                fixed.append(id_)
                yield self.dbpool.runQuery("UPDATE history SET timestamp=? WHERE id=?", (float(timestamp) + idx * 0.001, id_))
        log.info(u"fixed messages with ids {}".format(u', '.join([unicode(id_) for id_ in fixed])))

    def historySchema(txn):
        log.info(u"History schema update")
        txn.execute("ALTER TABLE history RENAME TO tmp_sat_update")
        txn.execute("CREATE TABLE history (uid TEXT PRIMARY KEY, update_uid TEXT, profile_id INTEGER, source TEXT, dest TEXT, source_res TEXT, dest_res TEXT, timestamp DATETIME NOT NULL, received_timestamp DATETIME, type TEXT, extra BLOB, FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE, FOREIGN KEY(type) REFERENCES message_types(type), UNIQUE (profile_id, timestamp, source, dest, source_res, dest_res))")
        # the old integer id is reused as uid
        txn.execute("INSERT INTO history (uid, profile_id, source, dest, source_res, dest_res, timestamp, type, extra) SELECT id, profile_id, source, dest, source_res, dest_res, timestamp, type, extra FROM tmp_sat_update")

    yield self.dbpool.runInteraction(historySchema)

    def newTables(txn):
        log.info(u"Creating new tables")
        txn.execute("CREATE TABLE message (id INTEGER PRIMARY KEY ASC, history_uid INTEGER, message TEXT, language TEXT, FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE)")
        txn.execute("CREATE TABLE thread (id INTEGER PRIMARY KEY ASC, history_uid INTEGER, thread_id TEXT, parent_id TEXT, FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE)")
        txn.execute("CREATE TABLE subject (id INTEGER PRIMARY KEY ASC, history_uid INTEGER, subject TEXT, language TEXT, FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE)")

    yield self.dbpool.runInteraction(newTables)

    log.info(u"inserting new message type")
    yield self.dbpool.runQuery("INSERT INTO message_types VALUES (?)", ('info',))

    log.info(u"messages update")
    rows = yield self.dbpool.runQuery("SELECT id, message, extra FROM tmp_sat_update")
    total = len(rows)

    def updateHistory(txn, queries):
        for query, args in queries:
            txn.execute(query, args)

    queries = []
    for idx, (id_, message, extra) in enumerate(rows, 1):
        if idx % 1000 == 0 or idx == total:
            log.info("preparing message {}/{}".format(idx, total))
        try:
            extra = pickle.loads(str(extra or ""))
        except EOFError:
            extra = {}
        except Exception:
            log.warning(u"Can't handle extra data for message id {}, ignoring it".format(id_))
            extra = {}

        queries.append(("INSERT INTO message(history_uid, message) VALUES (?,?)", (id_, message)))

        try:
            subject = extra.pop('subject')
        except KeyError:
            pass
        else:
            try:
                subject = subject.decode('utf-8')
            except UnicodeError:
                # subject has already been popped from extra: it is simply dropped
                log.warning(u"Error while decoding subject, ignoring it")
            else:
                queries.append(("INSERT INTO subject(history_uid, subject) VALUES (?,?)", (id_, subject)))

        received_timestamp = extra.pop('timestamp', None)
        # 'archive' is not kept in v3 extra data
        extra.pop('archive', None)

        # parameters must follow the placeholders' order: received_timestamp, extra, uid
        queries.append(("UPDATE history SET received_timestamp=?,extra=? WHERE uid=?",
                        (received_timestamp, pickle.dumps(extra, 0), id_)))

    yield self.dbpool.runInteraction(updateHistory, queries)

    log.info("Dropping temporary table")
    yield self.dbpool.runQuery("DROP TABLE tmp_sat_update")
    log.info("Database update finished :)")
772 960
773 def update2raw_v2(self): 961 def update2raw_v2(self):
774 """Update the database from v1 to v2 (add passwords encryptions): 962 """Update the database from v1 to v2 (add passwords encryptions):
963
775 - the XMPP password value is re-used for the profile password (new parameter) 964 - the XMPP password value is re-used for the profile password (new parameter)
776 - the profile password is stored hashed 965 - the profile password is stored hashed
777 - the XMPP password is stored encrypted, with the profile password as key 966 - the XMPP password is stored encrypted, with the profile password as key
778 - as there are no other stored passwords yet, it is enough, otherwise we 967 - as there are no other stored passwords yet, it is enough, otherwise we
779 would need to encrypt the other passwords as it's done for XMPP password 968 would need to encrypt the other passwords as it's done for XMPP password