comparison sat/memory/sqlite.py @ 2562:26edcf3a30eb

core, setup: huge cleaning:
- moved directories from src and frontends/src to sat and sat_frontends, which is the recommended naming convention
- moved the twisted directory to the root
- removed all hacks from setup.py and added the missing dependencies, it is now clean
- use https URL for the website in setup.py
- removed "Environment :: X11 Applications :: GTK", as wix is deprecated and removed
- renamed sat.sh to sat and fixed its installation
- added python_requires to specify the Python version needed
- replaced glib2reactor, which uses deprecated code, by gtk3reactor
sat can now be installed directly from a virtualenv without using --system-site-packages anymore \o/
author    Goffi <goffi@goffi.org>
date      Mon, 02 Apr 2018 19:44:50 +0200
parents   src/memory/sqlite.py@35d591086974
children  310e41bd6666
comparing 2561:bd30dc3ffe5a with 2562:26edcf3a30eb

#!/usr/bin/env python2
# -*- coding: utf-8 -*-

# SAT: a jabber client
# Copyright (C) 2009-2018 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from sat.core.i18n import _
from sat.core.constants import Const as C
from sat.core import exceptions
from sat.core.log import getLogger
log = getLogger(__name__)
from sat.memory.crypto import BlockCipher, PasswordHasher
from sat.tools.config import fixConfigOption
from twisted.enterprise import adbapi
from twisted.internet import defer
from twisted.words.protocols.jabber import jid
from twisted.python import failure
from collections import OrderedDict
import re
import os.path
import cPickle as pickle
import hashlib
import sqlite3
import json

CURRENT_DB_VERSION = 5

# XXX: DATABASE schemas are used in the following way:
#      - the 'current' key is the actual database schema, used for a new database
#      - x (int) is the update needed between versions x-1 and x. All numbers between
#        y and z are needed to update from y to z
#        e.g.: if CURRENT_DB_VERSION is 6, 'current' is the actual DB, and to update
#        from version 3, numbers 4, 5 and 6 are needed
#      a 'current' data dict can contain the keys:
#      - 'CREATE': an OrderedDict with the tables to create as keys, and a length 2
#        tuple as value, where value[0] are the column definitions and value[1] are
#        the table constraints
#      - 'INSERT': an OrderedDict with the tables where values have to be inserted as
#        keys, and tuples containing the values to insert in the order of the rows
#        (#TODO: manage named columns)
#      an update data dict (the ones with a number) can contain the keys 'create',
#      'delete', 'cols create', 'cols delete', 'cols modify', 'insert' or 'specific'.
#      See Updater.generateUpdateData for more info. This method can be used to
#      autogenerate update_data, to ease the work of the developers.
# TODO: indexes need to be improved
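#      Illustrative example (not a real schema entry): an update dict such as
#          6: {'cols create': {'history': ('archived INTEGER',)}}
#      would make Updater.update2raw emit
#          ALTER TABLE history ADD COLUMN archived INTEGER
#      when migrating from version 5 to 6 ('archived' is a hypothetical column).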

DATABASE_SCHEMAS = {
    "current": {
        'CREATE': OrderedDict((
            ('profiles', (("id INTEGER PRIMARY KEY ASC", "name TEXT"),
                          ("UNIQUE (name)",))),
            ('components', (("profile_id INTEGER PRIMARY KEY", "entry_point TEXT NOT NULL"),
                            ("FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE",))),
            ('message_types', (("type TEXT PRIMARY KEY",),
                               ())),
            ('history', (("uid TEXT PRIMARY KEY", "update_uid TEXT", "profile_id INTEGER",
                          "source TEXT", "dest TEXT", "source_res TEXT", "dest_res TEXT",
                          # XXX: timestamp is the time when the message was emitted.
                          #      If received_timestamp is not NULL, the message was delayed and
                          #      timestamp is the declared value (and received_timestamp the
                          #      time of reception)
                          "timestamp DATETIME NOT NULL", "received_timestamp DATETIME",
                          "type TEXT", "extra BLOB"),
                         ("FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE",
                          "FOREIGN KEY(type) REFERENCES message_types(type)",
                          # avoid storing the same message twice (especially for delayed ones)
                          "UNIQUE (profile_id, timestamp, source, dest, source_res, dest_res)"
                          ))),
            ('message', (("id INTEGER PRIMARY KEY ASC", "history_uid INTEGER", "message TEXT", "language TEXT"),
                         ("FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE",))),
            ('subject', (("id INTEGER PRIMARY KEY ASC", "history_uid INTEGER", "subject TEXT", "language TEXT"),
                         ("FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE",))),
            ('thread', (("id INTEGER PRIMARY KEY ASC", "history_uid INTEGER", "thread_id TEXT", "parent_id TEXT"),
                        ("FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE",))),
            ('param_gen', (("category TEXT", "name TEXT", "value TEXT"),
                           ("PRIMARY KEY (category,name)",))),
            ('param_ind', (("category TEXT", "name TEXT", "profile_id INTEGER", "value TEXT"),
                           ("PRIMARY KEY (category,name,profile_id)",
                            "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE"))),
            ('private_gen', (("namespace TEXT", "key TEXT", "value TEXT"),
                             ("PRIMARY KEY (namespace, key)",))),
            ('private_ind', (("namespace TEXT", "key TEXT", "profile_id INTEGER", "value TEXT"),
                             ("PRIMARY KEY (namespace, key, profile_id)",
                              "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE"))),
            ('private_gen_bin', (("namespace TEXT", "key TEXT", "value BLOB"),
                                 ("PRIMARY KEY (namespace, key)",))),
            ('private_ind_bin', (("namespace TEXT", "key TEXT", "profile_id INTEGER", "value BLOB"),
                                 ("PRIMARY KEY (namespace, key, profile_id)",
                                  "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE"))),
            ('files', (("id TEXT NOT NULL", "version TEXT NOT NULL", "parent TEXT NOT NULL",
                        "type TEXT CHECK(type in ('{file}', '{directory}')) NOT NULL DEFAULT '{file}'".format(
                            file=C.FILE_TYPE_FILE, directory=C.FILE_TYPE_DIRECTORY),
                        "file_hash TEXT", "hash_algo TEXT", "name TEXT NOT NULL", "size INTEGER",
                        "namespace TEXT", "mime_type TEXT",
                        "created DATETIME NOT NULL", "modified DATETIME",
                        "owner TEXT", "access TEXT", "extra TEXT", "profile_id INTEGER"),
                       ("PRIMARY KEY (id, version)",
                        "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE"))),
        )),
        'INSERT': OrderedDict((
            ('message_types', (("'chat'",),
                               ("'error'",),
                               ("'groupchat'",),
                               ("'headline'",),
                               ("'normal'",),
                               # info is not standard, but used to keep track of info
                               # like join/leave in a MUC
                               ("'info'",),
                               )),
        )),
    },
    5: {'create': {'files': (("id TEXT NOT NULL", "version TEXT NOT NULL", "parent TEXT NOT NULL",
                              "type TEXT CHECK(type in ('{file}', '{directory}')) NOT NULL DEFAULT '{file}'".format(
                                  file=C.FILE_TYPE_FILE, directory=C.FILE_TYPE_DIRECTORY),
                              "file_hash TEXT", "hash_algo TEXT", "name TEXT NOT NULL", "size INTEGER",
                              "namespace TEXT", "mime_type TEXT",
                              "created DATETIME NOT NULL", "modified DATETIME",
                              "owner TEXT", "access TEXT", "extra TEXT", "profile_id INTEGER"),
                             ("PRIMARY KEY (id, version)",
                              "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE"))},
        },
    4: {'create': {'components': (('profile_id INTEGER PRIMARY KEY', 'entry_point TEXT NOT NULL'),
                                  ('FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE',))}
        },
    3: {'specific': 'update_v3'
        },
    2: {'specific': 'update2raw_v2'
        },
    1: {'cols create': {'history': ('extra BLOB',)},
        },
}

NOT_IN_EXTRA = ('received_timestamp', 'update_uid')  # keys which are in message data extra but not stored in sqlite's extra field
                                                     # this is specific to this sqlite storage and for now only used for received_timestamp
                                                     # because this value is stored in a separate field


class ConnectionPool(adbapi.ConnectionPool):
    # Workaround to avoid IntegrityError causing (i)pdb to be launched in debug mode

    def _runQuery(self, trans, *args, **kw):
        retry = kw.pop('query_retry', 6)
        try:
            trans.execute(*args, **kw)
        except sqlite3.IntegrityError as e:
            raise failure.Failure(e)
        except Exception as e:
            # FIXME: in case of error, we retry a couple of times
            #        this is a workaround, we need to move to a better
            #        SQLite integration, probably with a higher level library
            retry -= 1
            if retry == 0:
                log.error(_(u'too many db tries, we give up! Error message: {msg}').format(
                    msg=e))
                raise e
            log.warning(_(u'exception while running query, retrying ({try_}): {msg}').format(
                try_=6 - retry,
                msg=e))
            kw['query_retry'] = retry
            return self._runQuery(trans, *args, **kw)
        return trans.fetchall()


class SqliteStorage(object):
    """This class manages storage with an SQLite database"""

    def __init__(self, db_filename, sat_version):
        """Connect to the given database

        @param db_filename: full path to the SQLite database
        """
        self.initialized = defer.Deferred()  # triggered when memory is fully initialised and ready
        self.profiles = {}  # we keep a cache of the profiles (key: profile name, value: profile id)

        log.info(_("Connecting database"))
        new_base = not os.path.exists(db_filename)  # do we have to create the database?
        if new_base:  # the dir may not exist if it's not the XDG recommended one
            dir_ = os.path.dirname(db_filename)
            if not os.path.exists(dir_):
                os.makedirs(dir_, 0700)

        def foreignKeysOn(sqlite):
            sqlite.execute('PRAGMA foreign_keys = ON')

        self.dbpool = ConnectionPool("sqlite3", db_filename, cp_openfun=foreignKeysOn, check_same_thread=False, timeout=15)

        def getNewBaseSql():
            log.info(_("The database is new, creating the tables"))
            database_creation = ["PRAGMA user_version=%d" % CURRENT_DB_VERSION]
            database_creation.extend(Updater.createData2Raw(DATABASE_SCHEMAS['current']['CREATE']))
            database_creation.extend(Updater.insertData2Raw(DATABASE_SCHEMAS['current']['INSERT']))
            return database_creation

        def getUpdateSql():
            updater = Updater(self.dbpool, sat_version)
            return updater.checkUpdates()

        def commitStatements(statements):
            if statements is None:
                return defer.succeed(None)
            log.debug(u"===== COMMITTING STATEMENTS =====\n%s\n============\n\n" % '\n'.join(statements))
            d = self.dbpool.runInteraction(self._updateDb, tuple(statements))
            return d

        # init_defer is the initialisation deferred, initialisation is OK when all its callbacks have been done

        init_defer = defer.succeed(None)

        init_defer.addCallback(lambda ignore: getNewBaseSql() if new_base else getUpdateSql())
        init_defer.addCallback(commitStatements)

        def fillProfileCache(ignore):
            d = self.dbpool.runQuery("SELECT profile_id, entry_point FROM components").addCallback(self._cacheComponentsAndProfiles)
            d.chainDeferred(self.initialized)

        init_defer.addCallback(fillProfileCache)

    def _updateDb(self, interaction, statements):
        for statement in statements:
            interaction.execute(statement)

    ## Profiles

    def _cacheComponentsAndProfiles(self, components_result):
        """Get the components query result and request the profiles

        both will be put in cache (in self.components and self.profiles)
        """
        return self.dbpool.runQuery("SELECT name,id FROM profiles").addCallback(
            self._cacheComponentsAndProfiles2, components_result)

    def _cacheComponentsAndProfiles2(self, profiles_result, components):
        """Fill the profiles cache

        @param profiles_result: result of the sql profiles query
        """
        self.components = dict(components)
        for profile in profiles_result:
            name, id_ = profile
            self.profiles[name] = id_

    def getProfilesList(self):
        """Return the list of all registered profiles"""
        return self.profiles.keys()

    def hasProfile(self, profile_name):
        """return True if profile_name exists

        @param profile_name: name of the profile to check
        """
        return profile_name in self.profiles

    def profileIsComponent(self, profile_name):
        try:
            return self.profiles[profile_name] in self.components
        except KeyError:
            raise exceptions.NotFound(u"the requested profile doesn't exist")

    def getEntryPoint(self, profile_name):
        try:
            return self.components[self.profiles[profile_name]]
        except KeyError:
            raise exceptions.NotFound(u"the requested profile doesn't exist or is not a component")

    def createProfile(self, name, component=None):
        """Create a new profile

        @param name(unicode): name of the profile
        @param component(None, unicode): if not None, must point to a component entry point
        @return: deferred triggered once the profile is actually created
        """

        def getProfileId(ignore):
            return self.dbpool.runQuery("SELECT (id) FROM profiles WHERE name = ?", (name,))

        def setComponent(profile_id):
            id_ = profile_id[0][0]
            d_comp = self.dbpool.runQuery("INSERT INTO components(profile_id, entry_point) VALUES (?, ?)", (id_, component))
            d_comp.addCallback(lambda dummy: profile_id)
            return d_comp

        def profile_created(profile_id):
            id_ = profile_id[0][0]
            self.profiles[name] = id_  # we synchronise the cache

        d = self.dbpool.runQuery("INSERT INTO profiles(name) VALUES (?)", (name,))
        d.addCallback(getProfileId)
        if component is not None:
            d.addCallback(setComponent)
        d.addCallback(profile_created)
        return d
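    # Usage sketch (illustrative only, the profile name is hypothetical):
    #     d = storage.createProfile(u'alice')
    #     d.addCallback(lambda __: storage.hasProfile(u'alice'))  # fires True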

    def deleteProfile(self, name):
        """Delete profile

        @param name: name of the profile
        @return: deferred triggered once the profile is actually deleted
        """
        def deletionError(failure_):
            log.error(_(u"Can't delete profile [%s]") % name)
            return failure_

        def delete(txn):
            profile_id = self.profiles.pop(name)
            txn.execute("DELETE FROM profiles WHERE name = ?", (name,))
            # FIXME: the following queries should be done by the ON DELETE CASCADE
            #        but it seems they are not, so we do them explicitly to be safe
            #        this needs more investigation
            txn.execute("DELETE FROM history WHERE profile_id = ?", (profile_id,))
            txn.execute("DELETE FROM param_ind WHERE profile_id = ?", (profile_id,))
            txn.execute("DELETE FROM private_ind WHERE profile_id = ?", (profile_id,))
            txn.execute("DELETE FROM private_ind_bin WHERE profile_id = ?", (profile_id,))
            txn.execute("DELETE FROM components WHERE profile_id = ?", (profile_id,))
            return None

        d = self.dbpool.runInteraction(delete)
        d.addCallback(lambda ignore: log.info(_("Profile [%s] deleted") % name))
        d.addErrback(deletionError)
        return d

    ## Params
    def loadGenParams(self, params_gen):
        """Load general parameters

        @param params_gen: dictionary to fill
        @return: deferred
        """

        def fillParams(result):
            for param in result:
                category, name, value = param
                params_gen[(category, name)] = value
        log.debug(_(u"loading general parameters from database"))
        return self.dbpool.runQuery("SELECT category,name,value FROM param_gen").addCallback(fillParams)

    def loadIndParams(self, params_ind, profile):
        """Load individual parameters

        @param params_ind: dictionary to fill
        @param profile: a profile which *must* exist
        @return: deferred
        """

        def fillParams(result):
            for param in result:
                category, name, value = param
                params_ind[(category, name)] = value
        log.debug(_(u"loading individual parameters from database"))
        d = self.dbpool.runQuery("SELECT category,name,value FROM param_ind WHERE profile_id=?", (self.profiles[profile],))
        d.addCallback(fillParams)
        return d

    def getIndParam(self, category, name, profile):
        """Ask database for the value of one specific individual parameter

        @param category: category of the parameter
        @param name: name of the parameter
        @param profile: %(doc_profile)s
        @return: deferred
        """
        d = self.dbpool.runQuery("SELECT value FROM param_ind WHERE category=? AND name=? AND profile_id=?", (category, name, self.profiles[profile]))
        d.addCallback(self.__getFirstResult)
        return d

    def setGenParam(self, category, name, value):
        """Save the general parameter in database

        @param category: category of the parameter
        @param name: name of the parameter
        @param value: value to set
        @return: deferred"""
        d = self.dbpool.runQuery("REPLACE INTO param_gen(category,name,value) VALUES (?,?,?)", (category, name, value))
        d.addErrback(lambda ignore: log.error(_(u"Can't set general parameter (%(category)s/%(name)s) in database" % {"category": category, "name": name})))
        return d

    def setIndParam(self, category, name, value, profile):
        """Save the individual parameter in database

        @param category: category of the parameter
        @param name: name of the parameter
        @param value: value to set
        @param profile: a profile which *must* exist
        @return: deferred
        """
        d = self.dbpool.runQuery("REPLACE INTO param_ind(category,name,profile_id,value) VALUES (?,?,?,?)", (category, name, self.profiles[profile], value))
        d.addErrback(lambda ignore: log.error(_(u"Can't set individual parameter (%(category)s/%(name)s) for [%(profile)s] in database" % {"category": category, "name": name, "profile": profile})))
        return d

    ## History

    def _addToHistoryCb(self, dummy, data):
        # Message metadata were successfully added to history
        # now we can add message and subject
        uid = data['uid']
        for key in ('message', 'subject'):
            for lang, value in data[key].iteritems():
                d = self.dbpool.runQuery("INSERT INTO {key}(history_uid, {key}, language) VALUES (?,?,?)".format(key=key),
                                         (uid, value, lang or None))
                d.addErrback(lambda dummy: log.error(_(u"Can't save following {key} in history (uid: {uid}, lang:{lang}): {value}".format(
                    key=key, uid=uid, lang=lang, value=value))))
        try:
            thread = data['extra']['thread']
        except KeyError:
            pass
        else:
            thread_parent = data['extra'].get('thread_parent')
            d = self.dbpool.runQuery("INSERT INTO thread(history_uid, thread_id, parent_id) VALUES (?,?,?)",
                                     (uid, thread, thread_parent))
            d.addErrback(lambda dummy: log.error(_(u"Can't save following thread in history (uid: {uid}): thread: {thread}, parent: {parent}".format(
                uid=uid, thread=thread, parent=thread_parent))))

    def _addToHistoryEb(self, failure_, data):
        failure_.trap(sqlite3.IntegrityError)
        sqlite_msg = failure_.value.args[0]
        if "UNIQUE constraint failed" in sqlite_msg:
            log.debug(u"message {} is already in history, not storing it again".format(data['uid']))
            if 'received_timestamp' not in data:
                log.warning(u"duplicate message is not delayed, this may be a bug: data={}".format(data))
            # we cancel the message to avoid sending a duplicate to frontends
            raise failure.Failure(exceptions.CancelError("Cancelled duplicated message"))
        else:
            log.error(u"Can't store message in history: {}".format(failure_))

    def _logHistoryError(self, failure_, from_jid, to_jid, data):
        if failure_.check(exceptions.CancelError):
            # we propagate CancelError to avoid sending the message to frontends
            raise failure_
        log.error(_(u"Can't save following message in history: from [{from_jid}] to [{to_jid}] (uid: {uid})"
                    .format(from_jid=from_jid.full(), to_jid=to_jid.full(), uid=data['uid'])))

    def addToHistory(self, data, profile):
        """Store a new message in history

        @param data(dict): message data as built by SatMessageProtocol.onMessage
        """
        extra = pickle.dumps({k: v for k, v in data['extra'].iteritems() if k not in NOT_IN_EXTRA}, 0)
        from_jid = data['from']
        to_jid = data['to']
        d = self.dbpool.runQuery("INSERT INTO history(uid, update_uid, profile_id, source, dest, source_res, dest_res, timestamp, received_timestamp, type, extra) VALUES (?,?,?,?,?,?,?,?,?,?,?)",
                                 (data['uid'], data['extra'].get('update_uid'), self.profiles[profile], from_jid.userhost(), to_jid.userhost(), from_jid.resource, to_jid.resource, data['timestamp'], data.get('received_timestamp'), data['type'], sqlite3.Binary(extra)))
        d.addCallbacks(self._addToHistoryCb, self._addToHistoryEb, callbackArgs=[data], errbackArgs=[data])
        d.addErrback(self._logHistoryError, from_jid, to_jid, data)
        return d
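    # Minimal sketch of the expected data dict (keys taken from the code above,
    # values are hypothetical):
    #     {'uid': u'...', 'from': jid.JID(u'a@example.org/res'),
    #      'to': jid.JID(u'b@example.org'), 'timestamp': 1522684800.0,
    #      'type': u'chat', 'message': {u'': u'hello'}, 'subject': {}, 'extra': {}}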

    def sqliteHistoryToList(self, query_result):
        """Get the SQL query result and return a list of message data dicts"""
        result = []
        current = {'uid': None}
        for row in reversed(query_result):
            uid, update_uid, source, dest, source_res, dest_res, timestamp, received_timestamp,\
            type_, extra, message, message_lang, subject, subject_lang, thread, thread_parent = row
            if uid != current['uid']:
                # new message
                try:
                    extra = pickle.loads(str(extra or ""))
                except EOFError:
                    extra = {}
                current = {
                    'from': "%s/%s" % (source, source_res) if source_res else source,
                    'to': "%s/%s" % (dest, dest_res) if dest_res else dest,
                    'uid': uid,
                    'message': {},
                    'subject': {},
                    'type': type_,
                    'extra': extra,
                    'timestamp': timestamp,
                }
                if update_uid is not None:
                    current['extra']['update_uid'] = update_uid
                if received_timestamp is not None:
                    current['extra']['received_timestamp'] = str(received_timestamp)
                result.append(current)

            if message is not None:
                current['message'][message_lang or ''] = message

            if subject is not None:
                current['subject'][subject_lang or ''] = subject

            if thread is not None:
                current_extra = current['extra']
                current_extra['thread'] = thread
                if thread_parent is not None:
                    current_extra['thread_parent'] = thread_parent
            else:
                if thread_parent is not None:
                    log.error(u"Database inconsistency: thread parent without thread (uid: {uid}, thread_parent: {parent})"
                              .format(uid=uid, parent=thread_parent))

        return result

    def listDict2listTuple(self, messages_data):
        """Return a list of tuples as used in the bridge, from a list of messages data"""
        ret = []
        for m in messages_data:
            ret.append((m['uid'], m['timestamp'], m['from'], m['to'], m['message'], m['subject'], m['type'], m['extra']))
        return ret

    def historyGet(self, from_jid, to_jid, limit=None, between=True, filters=None, profile=None):
        """Retrieve messages in history

        @param from_jid (JID): source JID (full, or bare for catchall)
        @param to_jid (JID): dest JID (full, or bare for catchall)
        @param limit (int): maximum number of messages to get:
            - 0 for no message (returns the empty list)
            - None for unlimited
        @param between (bool): ignore the direction (source and dest can be swapped)
        @param filters (dict[unicode, unicode]): filters to apply on the results;
            supported keys: 'body', 'search', 'types', 'not_types'
        @param profile (unicode): %(doc_profile)s
        @return: list of tuples as in [messageNew]
        """
        assert profile
        if filters is None:
            filters = {}
        if limit == 0:
            return defer.succeed([])

        query_parts = [u"SELECT uid, update_uid, source, dest, source_res, dest_res, timestamp, received_timestamp,\
                        type, extra, message, message.language, subject, subject.language, thread_id, thread.parent_id\
                        FROM history LEFT JOIN message ON history.uid = message.history_uid\
                        LEFT JOIN subject ON history.uid=subject.history_uid\
                        LEFT JOIN thread ON history.uid=thread.history_uid\
                        WHERE profile_id=? AND"]  # FIXME: not sure if it's the best request, messages and subjects can appear several times here
        values = [self.profiles[profile]]

        def test_jid(type_, _jid):
            values.append(_jid.userhost())
            if _jid.resource:
                values.append(_jid.resource)
                return u'(%s=? AND %s_res=?)' % (type_, type_)
            return u'%s=?' % (type_,)

        if between:
            query_parts.append(u"((%s AND %s) OR (%s AND %s))" % (test_jid('source', from_jid),
                                                                  test_jid('dest', to_jid),
                                                                  test_jid('source', to_jid),
                                                                  test_jid('dest', from_jid)))
        else:
            query_parts.append(u"%s AND %s" % (test_jid('source', from_jid),
                                               test_jid('dest', to_jid)))

        if filters:
            if 'body' in filters:
                # TODO: use REGEXP (function to be defined) instead of GLOB: https://www.sqlite.org/lang_expr.html
                query_parts.append(u"AND message LIKE ?")
                values.append(u"%{}%".format(filters['body']))
            if 'search' in filters:
                query_parts.append(u"AND (message LIKE ? OR source_res LIKE ?)")
                values.extend([u"%{}%".format(filters['search'])] * 2)
            if 'types' in filters:
                types = filters['types'].split()
                query_parts.append(u"AND type IN ({})".format(u','.join("?"*len(types))))
                values.extend(types)
            if 'not_types' in filters:
                types = filters['not_types'].split()
                query_parts.append(u"AND type NOT IN ({})".format(u','.join("?"*len(types))))
                values.extend(types)

        query_parts.append(u"ORDER BY timestamp DESC")  # we reverse the order in sqliteHistoryToList
                                                        # we use DESC here so LIMIT keeps the last messages
        if limit is not None:
            query_parts.append(u"LIMIT ?")
            values.append(limit)

        d = self.dbpool.runQuery(u" ".join(query_parts), values)
        d.addCallback(self.sqliteHistoryToList)
        d.addCallback(self.listDict2listTuple)
        return d
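    # Filter examples (derived from the code above; values are hypothetical):
    #     {'body': u'hello'}            -> adds "AND message LIKE ?" with u'%hello%'
    #     {'types': u'chat groupchat'}  -> adds "AND type IN (?,?)"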

    ## Private values

    def _privateDataEb(self, failure_, operation, namespace, key=None, profile=None):
        """generic errback for data queries"""
        log.error(_(u"Can't {operation} data in database for namespace {namespace}{and_key}{for_profile}: {msg}").format(
            operation=operation,
            namespace=namespace,
            and_key=(u" and key " + key) if key is not None else u"",
            for_profile=(u' [' + profile + u']') if profile is not None else u'',
            msg=failure_))

    def _generateDataDict(self, query_result, binary):
        if binary:
            return {k: pickle.loads(str(v)) for k, v in query_result}
        else:
            return dict(query_result)

    def _getPrivateTable(self, binary, profile):
        """Get the table to use for private values"""
        table = [u'private']

        if profile is None:
            table.append(u'gen')
        else:
            table.append(u'ind')

        if binary:
            table.append(u'bin')

        return u'_'.join(table)
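    # The four possible results match the tables declared in DATABASE_SCHEMAS:
    # private_gen, private_ind, private_gen_bin and private_ind_bin.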

    def getPrivates(self, namespace, keys=None, binary=False, profile=None):
        """Get private value(s) from database

        @param namespace(unicode): namespace of the values
        @param keys(iterable, None): keys of the values to get
            None to get all keys/values
        @param binary(bool): True to deserialise binary values
        @param profile(unicode, None): profile to use for individual values
            None to use general values
        @return (dict[unicode, object]): retrieved keys/values
        """
        log.debug(_(u"getting {type}{binary} private values from database for namespace {namespace}{keys}".format(
            type=u"general" if profile is None else "individual",
            binary=u" binary" if binary else u"",
            namespace=namespace,
            keys=u" with keys {}".format(u", ".join(keys)) if keys is not None else u"")))
        table = self._getPrivateTable(binary, profile)
        query_parts = [u"SELECT key,value FROM", table, "WHERE namespace=?"]
        args = [namespace]

        if keys is not None:
            placeholders = u','.join(len(keys) * u'?')
            query_parts.append(u'AND key IN (' + placeholders + u')')
            args.extend(keys)

        if profile is not None:
            query_parts.append(u'AND profile_id=?')
            args.append(self.profiles[profile])

        d = self.dbpool.runQuery(u" ".join(query_parts), args)
        d.addCallback(self._generateDataDict, binary)
        d.addErrback(self._privateDataEb, u"get", namespace, profile=profile)
        return d

    def setPrivateValue(self, namespace, key, value, binary=False, profile=None):
        """Set a private value in database

        @param namespace(unicode): namespace of the values
        @param key(unicode): key of the value to set
        @param value(object): value to set
        @param binary(bool): True if it's a binary value
            binary values need to be serialised, used for everything but strings
        @param profile(unicode, None): profile to use for individual value
            if None, it's a general value
        """
        table = self._getPrivateTable(binary, profile)
        query_values_names = [u'namespace', u'key', u'value']
        query_values = [namespace, key]

        if binary:
            value = sqlite3.Binary(pickle.dumps(value, 0))

        query_values.append(value)

        if profile is not None:
            query_values_names.append(u'profile_id')
            query_values.append(self.profiles[profile])

        query_parts = [u"REPLACE INTO", table, u'(', u','.join(query_values_names), u')',
                       u"VALUES (", u",".join(u'?'*len(query_values_names)), u')']

        d = self.dbpool.runQuery(u" ".join(query_parts), query_values)
        d.addErrback(self._privateDataEb, u"set", namespace, key, profile=profile)
        return d
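    # Round-trip sketch (illustrative, namespace and key are hypothetical): a
    # binary value is pickled on write and unpickled on read:
    #     storage.setPrivateValue(u'ns', u'k', {'any': 'object'}, binary=True)
    #     storage.getPrivates(u'ns', binary=True)  # fires {u'k': {'any': 'object'}}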

    def delPrivateValue(self, namespace, key, binary=False, profile=None):
        """Delete a private value from database

        @param namespace(unicode): namespace of the value
        @param key(unicode): key of the private value
        @param binary(bool): True if it's a binary value
        @param profile(unicode, None): profile to use for individual value
            if None, it's a general value
        """
        table = self._getPrivateTable(binary, profile)
        query_parts = [u"DELETE FROM", table, u"WHERE namespace=? AND key=?"]
        args = [namespace, key]
        if profile is not None:
            query_parts.append(u"AND profile_id=?")
            args.append(self.profiles[profile])
        d = self.dbpool.runQuery(u" ".join(query_parts), args)
        d.addErrback(self._privateDataEb, u"delete", namespace, key, profile=profile)
        return d

    ## Files

    @defer.inlineCallbacks
    def getFiles(self, client, file_id=None, version=u'', parent=None, type_=None,
                 file_hash=None, hash_algo=None, name=None, namespace=None, mime_type=None,
                 owner=None, access=None, projection=None, unique=False):
        """retrieve files with the given filters

        @param file_id(unicode, None): id of the file
            None to ignore
        @param version(unicode, None): version of the file
            None to ignore
            empty string to look for the current version
        @param parent(unicode, None): id of the directory containing the files
            None to ignore
            empty string to look for root files/directories
        @param projection(list[unicode], None): names of the columns to retrieve
            None to retrieve all
        @param unique(bool): if True, remove duplicates
        other params are the same as for [setFile]
        @return (list[dict]): files corresponding to the filters
        """
        query_parts = ["SELECT"]
        if unique:
            query_parts.append('DISTINCT')
        if projection is None:
            projection = ['id', 'version', 'parent', 'type', 'file_hash', 'hash_algo', 'name',
                          'size', 'namespace', 'mime_type', 'created', 'modified', 'owner',
                          'access', 'extra']
        query_parts.append(','.join(projection))
        query_parts.append("FROM files WHERE")
        filters = ['profile_id=?']
        args = [self.profiles[client.profile]]

        if file_id is not None:
            filters.append(u'id=?')
            args.append(file_id)
        if version is not None:
            filters.append(u'version=?')
            args.append(version)
        if parent is not None:
            filters.append(u'parent=?')
            args.append(parent)
        if type_ is not None:
            filters.append(u'type=?')
            args.append(type_)
        if file_hash is not None:
            filters.append(u'file_hash=?')
            args.append(file_hash)
        if hash_algo is not None:
            filters.append(u'hash_algo=?')
            args.append(hash_algo)
        if name is not None:
            filters.append(u'name=?')
            args.append(name)
        if namespace is not None:
            filters.append(u'namespace=?')
            args.append(namespace)
        if mime_type is not None:
            filters.append(u'mime_type=?')
            args.append(mime_type)
        if owner is not None:
            filters.append(u'owner=?')
            args.append(owner.full())
        if access is not None:
            raise NotImplementedError('Access check is not implemented yet')
            # a JSON comparison is needed here

        filters = u' AND '.join(filters)
        query_parts.append(filters)
        query = u' '.join(query_parts)

        result = yield self.dbpool.runQuery(query, args)
        files_data = [dict(zip(projection, row)) for row in result]
        to_parse = {'access', 'extra'}.intersection(projection)
        to_filter = {'owner'}.intersection(projection)
        if to_parse or to_filter:
            for file_data in files_data:
                for key in to_parse:
                    value = file_data[key]
                    file_data[key] = {} if value is None else json.loads(value)
                owner = file_data.get('owner')
                if owner is not None:
                    file_data['owner'] = jid.JID(owner)
        defer.returnValue(files_data)
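    # Usage sketch (illustrative): list the root directories of a client's files
    #     d = storage.getFiles(client, parent=u'', type_=C.FILE_TYPE_DIRECTORY)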

    def setFile(self, client, name, file_id, version=u'', parent=None, type_=C.FILE_TYPE_FILE,
                file_hash=None, hash_algo=None, size=None, namespace=None, mime_type=None,
                created=None, modified=None, owner=None, access=None, extra=None):
        """set a file metadata

        @param client(SatXMPPClient): client owning the file
        @param name(unicode): name of the file (must not contain "/")
        @param file_id(unicode): unique id of the file
        @param version(unicode): version of this file
        @param parent(unicode): id of the directory containing this file
            None if it is a root file/directory
        @param type_(unicode): one of:
            - file
            - directory
        @param file_hash(unicode): unique hash of the payload
        @param hash_algo(unicode): algorithm used for hashing the file (usually sha-256)
        @param size(int): size in bytes
        @param namespace(unicode, None): identifier (human readable is better) to group files
            for instance, namespace could be used to group files in a specific photo album
        @param mime_type(unicode): MIME type of the file, or None if not known/guessed
        @param created(int): UNIX time of creation
        @param modified(int,None): UNIX time of last modification, or None to use created date
        @param owner(jid.JID, None): jid of the owner of the file (mainly useful for component)
        @param access(dict, None): serialisable dictionary with access rules. See [memory.memory] for details
        @param extra(dict, None): serialisable dictionary of any extra data
            will be encoded to json in database
        """
        if extra is not None:
            assert isinstance(extra, dict)
        query = ('INSERT INTO files(id, version, parent, type, file_hash, hash_algo, name, size, namespace, '
                 'mime_type, created, modified, owner, access, extra, profile_id) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)')
        d = self.dbpool.runQuery(query, (file_id, version.strip(), parent, type_,
                                         file_hash, hash_algo,
                                         name, size, namespace,
                                         mime_type, created, modified,
                                         owner.full() if owner is not None else None,
                                         json.dumps(access) if access else None,
                                         json.dumps(extra) if extra else None,
                                         self.profiles[client.profile]))
        d.addErrback(lambda failure: log.error(_(u"Can't save file metadata for [{profile}]: {reason}".format(profile=client.profile, reason=failure))))
        return d

    def _fileUpdate(self, cursor, file_id, column, update_cb):
        query = 'SELECT {column} FROM files where id=?'.format(column=column)
        for i in xrange(5):
            cursor.execute(query, [file_id])
            try:
                older_value_raw = cursor.fetchone()[0]
            except TypeError:
                raise exceptions.NotFound
            value = json.loads(older_value_raw)
            update_cb(value)
            value_raw = json.dumps(value)
            update_query = 'UPDATE files SET {column}=? WHERE id=? AND {column}=?'.format(column=column)
            update_args = (value_raw, file_id, older_value_raw)
            try:
                cursor.execute(update_query, update_args)
            except sqlite3.Error:
                pass
            else:
                if cursor.rowcount == 1:
                    break
            log.warning(_(u"table not updated, probably due to race condition, trying again ({tries})").format(tries=i+1))
        else:
            log.error(_(u"Can't update file table"))

    def fileUpdate(self, file_id, column, update_cb):
        """update a column value using a method, to avoid race conditions

        the old value is retrieved from database, then update_cb is applied to
        update it, and the file is updated while checking that the old value has
        not been changed in the meantime by another user. If it has changed, we
        try again a couple of times before failing
        @param column(str): column name (only "access" or "extra" are allowed)
        @param update_cb(callable): method to update the value of the column
            the method takes the old value as argument, and must update it in place
            update_cb must not care about serialisation,
            it gets the deserialised data (i.e. a Python object) directly
            Note that the callable must be thread-safe
        @raise exceptions.NotFound: there is no file with this id
        """
        if column not in ('access', 'extra'):
            raise exceptions.InternalError('bad column name')
        return self.dbpool.runInteraction(self._fileUpdate, file_id, column, update_cb)
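    # Usage sketch (illustrative, the access rule structure is hypothetical):
    # update_cb modifies the deserialised dict in place, fileUpdate handles the
    # (de)serialisation and the retries:
    #     def grant_read(access):
    #         access[u'read'] = {u'type': u'public'}
    #     storage.fileUpdate(file_id, 'access', grant_read)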

    ## Helper methods ##

    def __getFirstResult(self, result):
        """Return the first result of a database query

        Useful when we are looking for one specific value"""
        return None if not result else result[0][0]


class Updater(object):
    stmnt_regex = re.compile(r"[\w/' ]+(?:\(.*?\))?[^,]*")
    clean_regex = re.compile(r"^ +|(?<= ) +|(?<=,) +| +$")
    CREATE_SQL = "CREATE TABLE %s (%s)"
    INSERT_SQL = "INSERT INTO %s VALUES (%s)"
    DROP_SQL = "DROP TABLE %s"
    ALTER_SQL = "ALTER TABLE %s ADD COLUMN %s"
    RENAME_TABLE_SQL = "ALTER TABLE %s RENAME TO %s"

    CONSTRAINTS = ('PRIMARY', 'UNIQUE', 'CHECK', 'FOREIGN')
    TMP_TABLE = "tmp_sat_update"

    def __init__(self, dbpool, sat_version):
        self._sat_version = sat_version
        self.dbpool = dbpool

    def getLocalVersion(self):
        """Get the local database version

        @return: version (int)
        """
        return self.dbpool.runQuery("PRAGMA user_version").addCallback(lambda ret: int(ret[0][0]))
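    # The schema version is stored in the database file itself via SQLite's
    # user_version pragma, so no extra table is needed to track it.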

    def _setLocalVersion(self, version):
        """Set the local database version

        @param version: version (int)
        @return: deferred
        """
        return self.dbpool.runOperation("PRAGMA user_version=%d" % version)

    def getLocalSchema(self):
        """return the raw local schema

        @return: list of strings with the CREATE SQL statements of the local database
        """
        d = self.dbpool.runQuery("select sql from sqlite_master where type = 'table'")
        d.addCallback(lambda result: [row[0] for row in result])
        return d

    @defer.inlineCallbacks
    def checkUpdates(self):
        """Check if a database schema/content update is needed, according to DATABASE_SCHEMAS

        @return: deferred which fires a list of SQL update statements, or None if no update is needed
        """
        local_version = yield self.getLocalVersion()
        raw_local_sch = yield self.getLocalSchema()

        local_sch = self.rawStatements2data(raw_local_sch)
        current_sch = DATABASE_SCHEMAS['current']['CREATE']
        local_hash = self.statementHash(local_sch)
        current_hash = self.statementHash(current_sch)

        # Force the update if the schemas are unchanged but a specific update is needed
        force_update = local_hash == current_hash and local_version < CURRENT_DB_VERSION \
                       and 'specific' in DATABASE_SCHEMAS[CURRENT_DB_VERSION]

        if local_hash == current_hash and not force_update:
            if local_version != CURRENT_DB_VERSION:
                log.warning(_("Your local schema is up-to-date, but database versions mismatch, fixing it..."))
                yield self._setLocalVersion(CURRENT_DB_VERSION)
        else:
            # an update is needed

            if local_version == CURRENT_DB_VERSION:
                # Database mismatch and we have the latest version
                if self._sat_version.endswith('D'):
                    # we are in a development version
                    update_data = self.generateUpdateData(local_sch, current_sch, False)
                    log.warning(_("There is a schema mismatch, but as we are on a dev version, the database will be updated"))
                    update_raw = yield self.update2raw(update_data, True)
                    defer.returnValue(update_raw)
                else:
                    log.error(_(u"schema version is up-to-date, but the local schema differs from the expected current schema"))
                    update_data = self.generateUpdateData(local_sch, current_sch, True)
                    update_raw = yield self.update2raw(update_data)
                    log.warning(_(u"Here are the commands that should fix the situation, use at your own risk (do a backup before modifying the database), you can go to SàT's MUC room at sat@chat.jabberfr.org for help\n### SQL###\n%s\n### END SQL ###\n") % u'\n'.join("%s;" % statement for statement in update_raw))
                    raise exceptions.DatabaseError("Database mismatch")
            else:
                # Database is not up-to-date, we'll do the update
                if force_update:
                    log.info(_("Database content needs a specific processing, local database will be updated"))
                else:
                    log.info(_("Database schema has changed, local database will be updated"))
                update_raw = []
                for version in xrange(local_version + 1, CURRENT_DB_VERSION + 1):
                    try:
                        update_data = DATABASE_SCHEMAS[version]
                    except KeyError:
                        raise exceptions.InternalError("Missing update definition (version %d)" % version)
                    update_raw_step = yield self.update2raw(update_data)
                    update_raw.extend(update_raw_step)
                update_raw.append("PRAGMA user_version=%d" % CURRENT_DB_VERSION)
                defer.returnValue(update_raw)

    @staticmethod
    def createData2Raw(data):
        """Generate SQL statements from statements data

        @param data: dictionary with tables as keys, and statements data in tuples as values
        @return: list of strings with raw statements
        """
        ret = []
        for table in data:
            defs, constraints = data[table]
            assert isinstance(defs, tuple)
            assert isinstance(constraints, tuple)
            ret.append(Updater.CREATE_SQL % (table, ', '.join(defs + constraints)))
        return ret

    @staticmethod
    def insertData2Raw(data):
        """Generate SQL statements from statements data

        @param data: dictionary with tables as keys, and statements data in tuples as values
        @return: list of strings with raw statements
        """
        ret = []
        for table in data:
            values_tuple = data[table]
            assert isinstance(values_tuple, tuple)
            for values in values_tuple:
                assert isinstance(values, tuple)
                ret.append(Updater.INSERT_SQL % (table, ', '.join(values)))
        return ret

    def statementHash(self, data):
        """Generate a hash of the template data

        useful to compare schemas
        @param data: dictionary of "CREATE" statements, with table names as keys,
            and tuples of (col_defs, constraints) as values
        @return: hash as string
        """
        hash_ = hashlib.sha1()
        tables = data.keys()
        tables.sort()

        def stmnts2str(stmts):
            return ','.join([self.clean_regex.sub('', stmt) for stmt in sorted(stmts)])

        for table in tables:
            col_defs, col_constr = data[table]
            hash_.update("%s:%s:%s" % (table, stmnts2str(col_defs), stmnts2str(col_constr)))
        return hash_.digest()

    def rawStatements2data(self, raw_statements):
        """separate "CREATE" statements into dictionary/tuples data

        @param raw_statements: list of CREATE statements as strings
        @return: dictionary with table names as keys, and (col_defs, constraints) tuples as values
        """
        schema_dict = {}
        for create_statement in raw_statements:
            if not create_statement.startswith("CREATE TABLE "):
                log.warning("Unexpected statement, ignoring it")
                continue
            _create_statement = create_statement[13:]
            table, raw_col_stats = _create_statement.split(' ', 1)
            if raw_col_stats[0] != '(' or raw_col_stats[-1] != ')':
                log.warning("Unexpected statement structure, ignoring it")
                continue
            col_stats = [stmt.strip() for stmt in self.stmnt_regex.findall(raw_col_stats[1:-1])]
            col_defs = []
            constraints = []
            for col_stat in col_stats:
                name = col_stat.split(' ', 1)[0]
                if name in self.CONSTRAINTS:
                    constraints.append(col_stat)
                else:
                    col_defs.append(col_stat)
            schema_dict[table] = (tuple(col_defs), tuple(constraints))
        return schema_dict
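    # Example (derived from the 'profiles' entry of DATABASE_SCHEMAS): the raw statement
    #     CREATE TABLE profiles (id INTEGER PRIMARY KEY ASC, name TEXT, UNIQUE (name))
    # is parsed to
    #     {'profiles': (("id INTEGER PRIMARY KEY ASC", "name TEXT"), ("UNIQUE (name)",))}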

    def generateUpdateData(self, old_data, new_data, modify=False):
        """Generate data for an automatic update between two schema data

        @param old_data: data of the former schema (which must be updated)
        @param new_data: data of the current schema
        @param modify: if True, always use the "cols modify" table, else try to ALTER tables
        @return: update data, a dictionary with:
            - 'create': dictionary of tables to create
            - 'delete': tuple of tables to delete
            - 'cols create': dictionary of columns to create (table as key, tuple of columns to create as value)
            - 'cols delete': dictionary of columns to delete (table as key, tuple of columns to delete as value)
            - 'cols modify': dictionary of columns to modify (table as key, tuple of old columns to transfer as value).
              With this table, a new table will be created, and the content of the old table will be transferred
              to it; only the columns specified in the tuple will be transferred.
        """

        create_tables_data = {}
        create_cols_data = {}
        modify_cols_data = {}
        delete_cols_data = {}
        old_tables = set(old_data.keys())
        new_tables = set(new_data.keys())

        def getChanges(set_olds, set_news):
            to_create = set_news.difference(set_olds)
            to_delete = set_olds.difference(set_news)
            to_check = set_news.intersection(set_olds)
            return tuple(to_create), tuple(to_delete), tuple(to_check)

        tables_to_create, tables_to_delete, tables_to_check = getChanges(old_tables, new_tables)

        for table in tables_to_create:
            create_tables_data[table] = new_data[table]

        for table in tables_to_check:
            old_col_defs, old_constraints = old_data[table]
            new_col_defs, new_constraints = new_data[table]
            for obj in old_col_defs, old_constraints, new_col_defs, new_constraints:
                if not isinstance(obj, tuple):
                    raise exceptions.InternalError("Columns definitions must be tuples")
            defs_create, defs_delete, ignore = getChanges(set(old_col_defs), set(new_col_defs))
            constraints_create, constraints_delete, ignore = getChanges(set(old_constraints), set(new_constraints))
            created_col_names = set([name.split(' ', 1)[0] for name in defs_create])
            deleted_col_names = set([name.split(' ', 1)[0] for name in defs_delete])
            if (created_col_names.intersection(deleted_col_names or constraints_create or constraints_delete) or
                (modify and (defs_create or constraints_create or defs_delete or constraints_delete))):
                # we have modified columns, we need to transfer the table
                # we determine which columns are in both schemas, so we can transfer them
                old_names = set([name.split(' ', 1)[0] for name in old_col_defs])
                new_names = set([name.split(' ', 1)[0] for name in new_col_defs])
                modify_cols_data[table] = tuple(old_names.intersection(new_names))
            else:
                if defs_create:
                    create_cols_data[table] = (defs_create)
                if defs_delete or constraints_delete:
                    delete_cols_data[table] = (defs_delete)

        return {'create': create_tables_data,
                'delete': tables_to_delete,
                'cols create': create_cols_data,
                'cols delete': delete_cols_data,
                'cols modify': modify_cols_data
                }

    @defer.inlineCallbacks
    def update2raw(self, update, dev_version=False):
        """Transform update data into raw SQLite statements

        @param update: update data as returned by generateUpdateData
        @param dev_version: if True, the update will be done in dev mode: no deletion
            will be done, instead a message will be shown. This prevents accidental
            loss of data while working on the code/database.
        @return: list of strings with the SQL statements needed to update the base
        """
        ret = self.createData2Raw(update.get('create', {}))
        drop = []
        for table in update.get('delete', tuple()):
            drop.append(self.DROP_SQL % table)
        if dev_version:
            if drop:
                log.info("Dev version, SQL NOT EXECUTED:\n--\n%s\n--\n" % "\n".join(drop))
        else:
            ret.extend(drop)

        cols_create = update.get('cols create', {})
        for table in cols_create:
            for col_def in cols_create[table]:
                ret.append(self.ALTER_SQL % (table, col_def))

        cols_delete = update.get('cols delete', {})
        for table in cols_delete:
            log.info("Following columns in table [%s] are not needed anymore, but are kept for dev version: %s" % (table, ", ".join(cols_delete[table])))

        cols_modify = update.get('cols modify', {})
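        # XXX: SQLite's ALTER TABLE can only rename a table or add a column, so
        #      modified columns are handled below by renaming the table to
        #      TMP_TABLE, recreating it from the current schema, copying the
        #      common columns back, and dropping the temporary table.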
        for table in cols_modify:
            ret.append(self.RENAME_TABLE_SQL % (table, self.TMP_TABLE))
            main, extra = DATABASE_SCHEMAS['current']['CREATE'][table]
            ret.append(self.CREATE_SQL % (table, ', '.join(main + extra)))
            common_cols = ', '.join(cols_modify[table])
            ret.append("INSERT INTO %s (%s) SELECT %s FROM %s" % (table, common_cols, common_cols, self.TMP_TABLE))
            ret.append(self.DROP_SQL % self.TMP_TABLE)

        insert = update.get('insert', {})
        ret.extend(self.insertData2Raw(insert))

        specific = update.get('specific', None)
        if specific:
            cmds = yield getattr(self, specific)()
            ret.extend(cmds or [])
        defer.returnValue(ret)

    @defer.inlineCallbacks
    def update_v3(self):
        """Update the database from v2 to v3 (message refactoring)"""
        # XXX: this update does all the messages in one huge transaction
        #      this is really memory consuming, but was OK on a reasonably
        #      big database for tests. If issues happen, we can cut it
        #      into smaller transactions using LIMIT and by deleting already
        #      updated messages
        log.info(u"Database update to v3, this may take a while")

        # we need to fix duplicate timestamps, as they can result in conflicts with the new schema
        rows = yield self.dbpool.runQuery("SELECT timestamp, COUNT(*) as c FROM history GROUP BY timestamp HAVING c>1")
        if rows:
            log.info("fixing duplicate timestamps")
            fixed = []
            for timestamp, dummy in rows:
                ids_rows = yield self.dbpool.runQuery("SELECT id from history where timestamp=?", (timestamp,))
                for idx, (id_,) in enumerate(ids_rows):
                    fixed.append(id_)
                    yield self.dbpool.runQuery("UPDATE history SET timestamp=? WHERE id=?", (float(timestamp) + idx * 0.001, id_))
            log.info(u"fixed messages with ids {}".format(u', '.join([unicode(id_) for id_ in fixed])))

        def historySchema(txn):
            log.info(u"History schema update")
            txn.execute("ALTER TABLE history RENAME TO tmp_sat_update")
            txn.execute("CREATE TABLE history (uid TEXT PRIMARY KEY, update_uid TEXT, profile_id INTEGER, source TEXT, dest TEXT, source_res TEXT, dest_res TEXT, timestamp DATETIME NOT NULL, received_timestamp DATETIME, type TEXT, extra BLOB, FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE, FOREIGN KEY(type) REFERENCES message_types(type), UNIQUE (profile_id, timestamp, source, dest, source_res, dest_res))")
            txn.execute("INSERT INTO history (uid, profile_id, source, dest, source_res, dest_res, timestamp, type, extra) SELECT id, profile_id, source, dest, source_res, dest_res, timestamp, type, extra FROM tmp_sat_update")

        yield self.dbpool.runInteraction(historySchema)

        def newTables(txn):
            log.info(u"Creating new tables")
            txn.execute("CREATE TABLE message (id INTEGER PRIMARY KEY ASC, history_uid INTEGER, message TEXT, language TEXT, FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE)")
            txn.execute("CREATE TABLE thread (id INTEGER PRIMARY KEY ASC, history_uid INTEGER, thread_id TEXT, parent_id TEXT, FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE)")
            txn.execute("CREATE TABLE subject (id INTEGER PRIMARY KEY ASC, history_uid INTEGER, subject TEXT, language TEXT, FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE)")

        yield self.dbpool.runInteraction(newTables)

        log.info(u"inserting new message type")
        yield self.dbpool.runQuery("INSERT INTO message_types VALUES (?)", ('info',))

        log.info(u"messages update")
        rows = yield self.dbpool.runQuery("SELECT id, timestamp, message, extra FROM tmp_sat_update")
        total = len(rows)

        def updateHistory(txn, queries):
            for query, args in iter(queries):
                txn.execute(query, args)

        queries = []
        for idx, row in enumerate(rows, 1):
            if idx % 1000 == 0 or total - idx == 0:
                log.info("preparing message {}/{}".format(idx, total))
            id_, timestamp, message, extra = row
            try:
                extra = pickle.loads(str(extra or ""))
            except EOFError:
                extra = {}
            except Exception:
                log.warning(u"Can't handle extra data for message id {}, ignoring it".format(id_))
                extra = {}

            queries.append(("INSERT INTO message(history_uid, message) VALUES (?,?)", (id_, message)))

            try:
                subject = extra.pop('subject')
            except KeyError:
                pass
            else:
                try:
                    subject = subject.decode('utf-8')
                except UnicodeError:
                    # 'subject' has already been removed from extra by the pop above
                    log.warning(u"Error while decoding subject, ignoring it")
                else:
                    queries.append(("INSERT INTO subject(history_uid, subject) VALUES (?,?)", (id_, subject)))

            received_timestamp = extra.pop('timestamp', None)
            try:
                del extra['archive']
            except KeyError:
                # archive was not used
                pass

            queries.append(("UPDATE history SET received_timestamp=?,extra=? WHERE uid=?",
                            (received_timestamp, sqlite3.Binary(pickle.dumps(extra, 0)), id_)))

        yield self.dbpool.runInteraction(updateHistory, queries)

        log.info("Dropping temporary table")
        yield self.dbpool.runQuery("DROP TABLE tmp_sat_update")
        log.info("Database update finished :)")

    def update2raw_v2(self):
        """Update the database from v1 to v2 (add password encryption):

        - the XMPP password value is re-used for the profile password (new parameter)
        - the profile password is stored hashed
        - the XMPP password is stored encrypted, with the profile password as key
        - as there are no other stored passwords yet, this is enough; otherwise we
          would need to encrypt the other passwords as is done for the XMPP password
        """
        xmpp_pass_path = ('Connection', 'Password')

        def encrypt_values(values):
            ret = []
            list_ = []

            def prepare_queries(result, xmpp_password):
                try:
                    id_ = result[0][0]
                except IndexError:
                    log.error(u"Profile of id %d is referenced in 'param_ind' but it doesn't exist!" % profile_id)
                    return defer.succeed(None)

                sat_password = xmpp_password
                d1 = PasswordHasher.hash(sat_password)
                personal_key = BlockCipher.getRandomKey(base64=True)
                d2 = BlockCipher.encrypt(sat_password, personal_key)
                d3 = BlockCipher.encrypt(personal_key, xmpp_password)

                def gotValues(res):
                    sat_cipher, personal_cipher, xmpp_cipher = res[0][1], res[1][1], res[2][1]
                    ret.append("INSERT INTO param_ind(category,name,profile_id,value) VALUES ('%s','%s',%s,'%s')" %
                               (C.PROFILE_PASS_PATH[0], C.PROFILE_PASS_PATH[1], id_, sat_cipher))

                    ret.append("INSERT INTO private_ind(namespace,key,profile_id,value) VALUES ('%s','%s',%s,'%s')" %
                               (C.MEMORY_CRYPTO_NAMESPACE, C.MEMORY_CRYPTO_KEY, id_, personal_cipher))

                    ret.append("REPLACE INTO param_ind(category,name,profile_id,value) VALUES ('%s','%s',%s,'%s')" %
                               (xmpp_pass_path[0], xmpp_pass_path[1], id_, xmpp_cipher))

                return defer.DeferredList([d1, d2, d3]).addCallback(gotValues)

            for profile_id, xmpp_password in values:
                d = self.dbpool.runQuery("SELECT id FROM profiles WHERE id=?", (profile_id,))
                d.addCallback(prepare_queries, xmpp_password)
                list_.append(d)

            d_list = defer.DeferredList(list_)
            d_list.addCallback(lambda dummy: ret)
            return d_list

        def updateLiberviaConf(values):
            try:
                profile_id = values[0][0]
            except IndexError:
                return  # no profile called "libervia"

            def cb(selected):
                try:
                    password = selected[0][0]
                except IndexError:
                    log.error("Libervia profile exists but no password is set! Updating the Libervia configuration will be skipped.")
                    return
                fixConfigOption('libervia', 'passphrase', password, False)
            d = self.dbpool.runQuery("SELECT value FROM param_ind WHERE category=? AND name=? AND profile_id=?", xmpp_pass_path + (profile_id,))
            return d.addCallback(cb)

        d = self.dbpool.runQuery("SELECT id FROM profiles WHERE name='libervia'")
        d.addCallback(updateLiberviaConf)
        d.addCallback(lambda dummy: self.dbpool.runQuery("SELECT profile_id,value FROM param_ind WHERE category=? AND name=?", xmpp_pass_path))
        d.addCallback(encrypt_values)
        return d