comparison sat/memory/sqlite.py @ 2562:26edcf3a30eb

core, setup: huge cleanup:
- moved directories from src and frontends/src to sat and sat_frontends, which is the recommended naming convention
- moved the twisted directory to root
- removed all hacks from setup.py and added missing dependencies, it is now clean
- use https URL for the website in setup.py
- removed "Environment :: X11 Applications :: GTK", as wix is deprecated and removed
- renamed sat.sh to sat and fixed its installation
- added python_requires to specify the Python version needed
- replaced glib2reactor, which uses deprecated code, with gtk3reactor

sat can now be installed directly from a virtualenv without using --system-site-packages anymore \o/
author Goffi <goffi@goffi.org>
date Mon, 02 Apr 2018 19:44:50 +0200
parents src/memory/sqlite.py@35d591086974
children 310e41bd6666
1 #!/usr/bin/env python2
2 # -*- coding: utf-8 -*-
3
4 # SAT: a jabber client
5 # Copyright (C) 2009-2018 Jérôme Poisson (goffi@goffi.org)
6
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU Affero General Public License for more details.
16
17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20 from sat.core.i18n import _
21 from sat.core.constants import Const as C
22 from sat.core import exceptions
23 from sat.core.log import getLogger
24 log = getLogger(__name__)
25 from sat.memory.crypto import BlockCipher, PasswordHasher
26 from sat.tools.config import fixConfigOption
27 from twisted.enterprise import adbapi
28 from twisted.internet import defer
29 from twisted.words.protocols.jabber import jid
30 from twisted.python import failure
31 from collections import OrderedDict
32 import re
33 import os.path
34 import cPickle as pickle
35 import hashlib
36 import sqlite3
37 import json
38
39 CURRENT_DB_VERSION = 5
40
41 # XXX: DATABASE schemas are used in the following way:
42 # - the 'current' key is for the actual database schema, for a new database
43 # - x(int) is for the update needed between x-1 and x. All numbers between y and z are needed to update from y to z
44 # e.g.: if CURRENT_DB_VERSION is 6, 'current' is the actual DB, and to update from version 3, numbers 4, 5 and 6 are needed
45 # a 'current' data dict can contain the keys:
46 # - 'CREATE': an OrderedDict with the tables to create as keys, and a len 2 tuple as value, where value[0] is the columns definitions and value[1] is the table constraints
47 # - 'INSERT': an OrderedDict with the tables where values have to be inserted as keys, and tuples of values to insert, in row order, as values (#TODO: manage named columns)
48 # an update data dict (the ones with a number) can contain the keys 'create', 'delete', 'cols create', 'cols delete', 'cols modify', 'insert' or 'specific'. See Updater.generateUpdateData for more info. This method can be used to autogenerate update_data, to ease the work of the developers.
49 # TODO: indexes need to be improved
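# Illustration (hypothetical version 6, not part of the actual schemas below): an update entry like
#     6: {'cols create': {'history': ('stanza_id TEXT',)}}
# would be turned by Updater.update2raw into the single statement:
#     ALTER TABLE history ADD COLUMN stanza_id TEXT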
50
51 DATABASE_SCHEMAS = {
52 "current": {'CREATE': OrderedDict((
53 ('profiles', (("id INTEGER PRIMARY KEY ASC", "name TEXT"),
54 ("UNIQUE (name)",))),
55 ('components', (("profile_id INTEGER PRIMARY KEY", "entry_point TEXT NOT NULL"),
56 ("FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE",))),
57 ('message_types', (("type TEXT PRIMARY KEY",),
58 ())),
59 ('history', (("uid TEXT PRIMARY KEY", "update_uid TEXT", "profile_id INTEGER", "source TEXT", "dest TEXT", "source_res TEXT", "dest_res TEXT",
60 "timestamp DATETIME NOT NULL", "received_timestamp DATETIME", # XXX: timestamp is the time when the message was emitted. If received time stamp is not NULL, the message was delayed and timestamp is the declared value (and received_timestamp the time of reception)
61 "type TEXT", "extra BLOB"),
62 ("FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE", "FOREIGN KEY(type) REFERENCES message_types(type)",
63 "UNIQUE (profile_id, timestamp, source, dest, source_res, dest_res)" # avoid storing 2 time the same message (specially for delayed ones)
64 ))),
65 ('message', (("id INTEGER PRIMARY KEY ASC", "history_uid INTEGER", "message TEXT", "language TEXT"),
66 ("FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE",))),
67 ('subject', (("id INTEGER PRIMARY KEY ASC", "history_uid INTEGER", "subject TEXT", "language TEXT"),
68 ("FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE",))),
69 ('thread', (("id INTEGER PRIMARY KEY ASC", "history_uid INTEGER", "thread_id TEXT", "parent_id TEXT"),("FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE",))),
70 ('param_gen', (("category TEXT", "name TEXT", "value TEXT"),
71 ("PRIMARY KEY (category,name)",))),
72 ('param_ind', (("category TEXT", "name TEXT", "profile_id INTEGER", "value TEXT"),
73 ("PRIMARY KEY (category,name,profile_id)", "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE"))),
74 ('private_gen', (("namespace TEXT", "key TEXT", "value TEXT"),
75 ("PRIMARY KEY (namespace, key)",))),
76 ('private_ind', (("namespace TEXT", "key TEXT", "profile_id INTEGER", "value TEXT"),
77 ("PRIMARY KEY (namespace, key, profile_id)", "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE"))),
78 ('private_gen_bin', (("namespace TEXT", "key TEXT", "value BLOB"),
79 ("PRIMARY KEY (namespace, key)",))),
80 ('private_ind_bin', (("namespace TEXT", "key TEXT", "profile_id INTEGER", "value BLOB"),
81 ("PRIMARY KEY (namespace, key, profile_id)", "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE"))),
82 ('files', (("id TEXT NOT NULL", "version TEXT NOT NULL", "parent TEXT NOT NULL",
83 "type TEXT CHECK(type in ('{file}', '{directory}')) NOT NULL DEFAULT '{file}'".format(
84 file=C.FILE_TYPE_FILE, directory=C.FILE_TYPE_DIRECTORY),
85 "file_hash TEXT", "hash_algo TEXT", "name TEXT NOT NULL", "size INTEGER",
86 "namespace TEXT", "mime_type TEXT",
87 "created DATETIME NOT NULL", "modified DATETIME",
88 "owner TEXT", "access TEXT", "extra TEXT", "profile_id INTEGER"),
89 ("PRIMARY KEY (id, version)", "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE"))),
90 )),
91 'INSERT': OrderedDict((
92 ('message_types', (("'chat'",),
93 ("'error'",),
94 ("'groupchat'",),
95 ("'headline'",),
96 ("'normal'",),
97 ("'info'",) # info is not standard, but used to keep track of info like join/leave in a MUC
98 )),
99 )),
100 },
101 5: {'create': {'files': (("id TEXT NOT NULL", "version TEXT NOT NULL", "parent TEXT NOT NULL",
102 "type TEXT CHECK(type in ('{file}', '{directory}')) NOT NULL DEFAULT '{file}'".format(
103 file=C.FILE_TYPE_FILE, directory=C.FILE_TYPE_DIRECTORY),
104 "file_hash TEXT", "hash_algo TEXT", "name TEXT NOT NULL", "size INTEGER",
105 "namespace TEXT", "mime_type TEXT",
106 "created DATETIME NOT NULL", "modified DATETIME",
107 "owner TEXT", "access TEXT", "extra TEXT", "profile_id INTEGER"),
108 ("PRIMARY KEY (id, version)", "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE"))},
109 },
110 4: {'create': {'components': (('profile_id INTEGER PRIMARY KEY', 'entry_point TEXT NOT NULL'), ('FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE',))}
111 },
112 3: {'specific': 'update_v3'
113 },
114 2: {'specific': 'update2raw_v2'
115 },
116 1: {'cols create': {'history': ('extra BLOB',)},
117 },
118 }
119
120 NOT_IN_EXTRA = ('received_timestamp', 'update_uid') # keys which are in message data extra but not stored in sqlite's extra field
121 # this is specific to this sqlite storage: those values are stored in
122 # their own columns of the history table
123
124
125 class ConnectionPool(adbapi.ConnectionPool):
126 # Workaround to avoid IntegrityError causing (i)pdb to be launched in debug mode
127 def _runQuery(self, trans, *args, **kw):
128 retry = kw.pop('query_retry', 6)
129 try:
130 trans.execute(*args, **kw)
131 except sqlite3.IntegrityError as e:
132 raise failure.Failure(e)
133 except Exception as e:
134 # FIXME: in case of error, we retry a couple of times
135 # this is a workaround, we need to move to a better
136 # SQLite integration, probably with a higher-level library
137 retry -= 1
138 if retry == 0:
139 log.error(_(u'too many database retries, giving up! Error message: {msg}').format(
140 msg = e))
141 raise e
142 log.warning(_(u'exception while running query, retrying ({try_}): {msg}').format(
143 try_ = 6 - retry,
144 msg = e))
145 kw['query_retry'] = retry
146 return self._runQuery(trans, *args, **kw)
147 return trans.fetchall()
148
149
150 class SqliteStorage(object):
151 """This class manage storage with Sqlite database"""
152
153 def __init__(self, db_filename, sat_version):
154 """Connect to the given database
155
156 @param db_filename: full path to the Sqlite database
157 """
158 self.initialized = defer.Deferred() # triggered when memory is fully initialised and ready
159 self.profiles = {} # we keep cache for the profiles (key: profile name, value: profile id)
160
161 log.info(_("Connecting database"))
162 new_base = not os.path.exists(db_filename) # do we have to create the database?
163 if new_base: # the dir may not exist if it's not the XDG recommended one
164 dir_ = os.path.dirname(db_filename)
165 if not os.path.exists(dir_):
166 os.makedirs(dir_, 0700)
167
168 def foreignKeysOn(sqlite):
169 sqlite.execute('PRAGMA foreign_keys = ON')
170
171 self.dbpool = ConnectionPool("sqlite3", db_filename, cp_openfun=foreignKeysOn, check_same_thread=False, timeout=15)
172
173 def getNewBaseSql():
174 log.info(_("The database is new, creating the tables"))
175 database_creation = ["PRAGMA user_version=%d" % CURRENT_DB_VERSION]
176 database_creation.extend(Updater.createData2Raw(DATABASE_SCHEMAS['current']['CREATE']))
177 database_creation.extend(Updater.insertData2Raw(DATABASE_SCHEMAS['current']['INSERT']))
178 return database_creation
179
180 def getUpdateSql():
181 updater = Updater(self.dbpool, sat_version)
182 return updater.checkUpdates()
183
184 def commitStatements(statements):
185
186 if statements is None:
187 return defer.succeed(None)
188 log.debug(u"===== COMMITTING STATEMENTS =====\n%s\n============\n\n" % '\n'.join(statements))
189 d = self.dbpool.runInteraction(self._updateDb, tuple(statements))
190 return d
191
192 # init_defer is the initialisation deferred, initialisation is ok when all its callbacks have been done
193
194 init_defer = defer.succeed(None)
195
196 init_defer.addCallback(lambda ignore: getNewBaseSql() if new_base else getUpdateSql())
197 init_defer.addCallback(commitStatements)
198
199 def fillProfileCache(ignore):
200 d = self.dbpool.runQuery("SELECT profile_id, entry_point FROM components").addCallback(self._cacheComponentsAndProfiles)
201 d.chainDeferred(self.initialized)
202
203 init_defer.addCallback(fillProfileCache)
204
205 def _updateDb(self, interaction, statements):
206 for statement in statements:
207 interaction.execute(statement)
208
209 ## Profiles
210
211 def _cacheComponentsAndProfiles(self, components_result):
212 """Get components results and send requests profiles
213
214 they will be both put in cache in _profilesCache
215 """
216 return self.dbpool.runQuery("SELECT name,id FROM profiles").addCallback(
217 self._cacheComponentsAndProfiles2, components_result)
218
219 def _cacheComponentsAndProfiles2(self, profiles_result, components):
220 """Fill the profiles cache
221
222 @param profiles_result: result of the sql profiles query
223 """
224 self.components = dict(components)
225 for profile in profiles_result:
226 name, id_ = profile
227 self.profiles[name] = id_
228
229 def getProfilesList(self):
230 """"Return list of all registered profiles"""
231 return self.profiles.keys()
232
233 def hasProfile(self, profile_name):
234 """return True if profile_name exists
235
236 @param profile_name: name of the profile to check
237 """
238 return profile_name in self.profiles
239
240 def profileIsComponent(self, profile_name):
241 try:
242 return self.profiles[profile_name] in self.components
243 except KeyError:
244 raise exceptions.NotFound(u"the requested profile doesn't exist")
245
246 def getEntryPoint(self, profile_name):
247 try:
248 return self.components[self.profiles[profile_name]]
249 except KeyError:
250 raise exceptions.NotFound(u"the requested profile doesn't exist or is not a component")
251
252 def createProfile(self, name, component=None):
253 """Create a new profile
254
255 @param name(unicode): name of the profile
256 @param component(None, unicode): if not None, must point to a component entry point
257 @return: deferred triggered once profile is actually created
258 """
259
260 def getProfileId(ignore):
261 return self.dbpool.runQuery("SELECT (id) FROM profiles WHERE name = ?", (name, ))
262
263 def setComponent(profile_id):
264 id_ = profile_id[0][0]
265 d_comp = self.dbpool.runQuery("INSERT INTO components(profile_id, entry_point) VALUES (?, ?)", (id_, component))
266 d_comp.addCallback(lambda dummy: profile_id)
267 return d_comp
268
269 def profile_created(profile_id):
270 id_= profile_id[0][0]
271 self.profiles[name] = id_ # we synchronise the cache
272
273 d = self.dbpool.runQuery("INSERT INTO profiles(name) VALUES (?)", (name, ))
274 d.addCallback(getProfileId)
275 if component is not None:
276 d.addCallback(setComponent)
277 d.addCallback(profile_created)
278 return d
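# Usage sketch (hypothetical names, assuming a SqliteStorage instance called storage):
#     d = storage.createProfile(u'alice')
#     d = storage.createProfile(u'my_component', component=u'some_entry_point')
# the returned deferred fires once the profile row exists and the local cache is synchronised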
279
280 def deleteProfile(self, name):
281 """Delete profile
282
283 @param name: name of the profile
284 @return: deferred triggered once profile is actually deleted
285 """
286 def deletionError(failure_):
287 log.error(_(u"Can't delete profile [%s]") % name)
288 return failure_
289
290 def delete(txn):
291 profile_id = self.profiles.pop(name)
292 txn.execute("DELETE FROM profiles WHERE name = ?", (name,))
293 # FIXME: the following queries should be handled by the ON DELETE CASCADE
294 # but it seems they are not, so we do them explicitly as a safety measure
295 # this needs more investigation
296 txn.execute("DELETE FROM history WHERE profile_id = ?", (profile_id,))
297 txn.execute("DELETE FROM param_ind WHERE profile_id = ?", (profile_id,))
298 txn.execute("DELETE FROM private_ind WHERE profile_id = ?", (profile_id,))
299 txn.execute("DELETE FROM private_ind_bin WHERE profile_id = ?", (profile_id,))
300 txn.execute("DELETE FROM components WHERE profile_id = ?", (profile_id,))
301 return None
302
303 d = self.dbpool.runInteraction(delete)
304 d.addCallback(lambda ignore: log.info(_("Profile [%s] deleted") % name))
305 d.addErrback(deletionError)
306 return d
307
308 ## Params
309 def loadGenParams(self, params_gen):
310 """Load general parameters
311
312 @param params_gen: dictionary to fill
313 @return: deferred
314 """
315
316 def fillParams(result):
317 for param in result:
318 category, name, value = param
319 params_gen[(category, name)] = value
320 log.debug(_(u"loading general parameters from database"))
321 return self.dbpool.runQuery("SELECT category,name,value FROM param_gen").addCallback(fillParams)
322
323 def loadIndParams(self, params_ind, profile):
324 """Load individual parameters
325
326 @param params_ind: dictionary to fill
327 @param profile: a profile which *must* exist
328 @return: deferred
329 """
330
331 def fillParams(result):
332 for param in result:
333 category, name, value = param
334 params_ind[(category, name)] = value
335 log.debug(_(u"loading individual parameters from database"))
336 d = self.dbpool.runQuery("SELECT category,name,value FROM param_ind WHERE profile_id=?", (self.profiles[profile], ))
337 d.addCallback(fillParams)
338 return d
339
340 def getIndParam(self, category, name, profile):
341 """Ask database for the value of one specific individual parameter
342
343 @param category: category of the parameter
344 @param name: name of the parameter
345 @param profile: %(doc_profile)s
346 @return: deferred
347 """
348 d = self.dbpool.runQuery("SELECT value FROM param_ind WHERE category=? AND name=? AND profile_id=?", (category, name, self.profiles[profile]))
349 d.addCallback(self.__getFirstResult)
350 return d
351
352 def setGenParam(self, category, name, value):
353 """Save the general parameters in database
354
355 @param category: category of the parameter
356 @param name: name of the parameter
357 @param value: value to set
358 @return: deferred"""
359 d = self.dbpool.runQuery("REPLACE INTO param_gen(category,name,value) VALUES (?,?,?)", (category, name, value))
360 d.addErrback(lambda ignore: log.error(_(u"Can't set general parameter (%(category)s/%(name)s) in database" % {"category": category, "name": name})))
361 return d
362
363 def setIndParam(self, category, name, value, profile):
364 """Save the individual parameters in database
365
366 @param category: category of the parameter
367 @param name: name of the parameter
368 @param value: value to set
369 @param profile: a profile which *must* exist
370 @return: deferred
371 """
372 d = self.dbpool.runQuery("REPLACE INTO param_ind(category,name,profile_id,value) VALUES (?,?,?,?)", (category, name, self.profiles[profile], value))
373 d.addErrback(lambda ignore: log.error(_(u"Can't set individual parameter (%(category)s/%(name)s) for [%(profile)s] in database" % {"category": category, "name": name, "profile": profile})))
374 return d
375
376 ## History
377
378 def _addToHistoryCb(self, dummy, data):
379 # Message metadata were successfully added to history
380 # now we can add message and subject
381 uid = data['uid']
382 for key in ('message', 'subject'):
383 for lang, value in data[key].iteritems():
384 d = self.dbpool.runQuery("INSERT INTO {key}(history_uid, {key}, language) VALUES (?,?,?)".format(key=key),
385 (uid, value, lang or None))
386 d.addErrback(lambda dummy: log.error(_(u"Can't save following {key} in history (uid: {uid}, lang:{lang}): {value}".format(
387 key=key, uid=uid, lang=lang, value=value))))
388 try:
389 thread = data['extra']['thread']
390 except KeyError:
391 pass
392 else:
393 thread_parent = data['extra'].get('thread_parent')
394 d = self.dbpool.runQuery("INSERT INTO thread(history_uid, thread_id, parent_id) VALUES (?,?,?)",
395 (uid, thread, thread_parent))
396 d.addErrback(lambda dummy: log.error(_(u"Can't save following thread in history (uid: {uid}): thread:{thread}), parent:{parent}".format(
397 uid=uid, thread=thread, parent=thread_parent))))
398
399 def _addToHistoryEb(self, failure_, data):
400 failure_.trap(sqlite3.IntegrityError)
401 sqlite_msg = failure_.value.args[0]
402 if "UNIQUE constraint failed" in sqlite_msg:
403 log.debug(u"message {} is already in history, not storing it again".format(data['uid']))
404 if 'received_timestamp' not in data:
405 log.warning(u"duplicate message is not delayed, this is maybe a bug: data={}".format(data))
406 # we cancel message to avoid sending duplicate message to frontends
407 raise failure.Failure(exceptions.CancelError("Cancelled duplicated message"))
408 else:
409 log.error(u"Can't store message in history: {}".format(failure_))
410
411 def _logHistoryError(self, failure_, from_jid, to_jid, data):
412 if failure_.check(exceptions.CancelError):
413 # we propagate CancelError to avoid sending message to frontends
414 raise failure_
415 log.error(_(u"Can't save following message in history: from [{from_jid}] to [{to_jid}] (uid: {uid})"
416 .format(from_jid=from_jid.full(), to_jid=to_jid.full(), uid=data['uid'])))
417
418 def addToHistory(self, data, profile):
419 """Store a new message in history
420
421 @param data(dict): message data as built by SatMessageProtocol.onMessage
422 """
423 extra = pickle.dumps({k: v for k, v in data['extra'].iteritems() if k not in NOT_IN_EXTRA}, 0)
424 from_jid = data['from']
425 to_jid = data['to']
426 d = self.dbpool.runQuery("INSERT INTO history(uid, update_uid, profile_id, source, dest, source_res, dest_res, timestamp, received_timestamp, type, extra) VALUES (?,?,?,?,?,?,?,?,?,?,?)",
427 (data['uid'], data['extra'].get('update_uid'), self.profiles[profile], data['from'].userhost(), to_jid.userhost(), from_jid.resource, to_jid.resource, data['timestamp'], data.get('received_timestamp'), data['type'], sqlite3.Binary(extra)))
428 d.addCallbacks(self._addToHistoryCb, self._addToHistoryEb, callbackArgs=[data], errbackArgs=[data])
429 d.addErrback(self._logHistoryError, from_jid, to_jid, data)
430 return d
431
432 def sqliteHistoryToList(self, query_result):
433 """Get SQL query result and return a list of message data dicts"""
434 result = []
435 current = {'uid': None}
436 for row in reversed(query_result):
437 uid, update_uid, source, dest, source_res, dest_res, timestamp, received_timestamp,\
438 type_, extra, message, message_lang, subject, subject_lang, thread, thread_parent = row
439 if uid != current['uid']:
440 # new message
441 try:
442 extra = pickle.loads(str(extra or ""))
443 except EOFError:
444 extra = {}
445 current = {
446 'from': "%s/%s" % (source, source_res) if source_res else source,
447 'to': "%s/%s" % (dest, dest_res) if dest_res else dest,
448 'uid': uid,
449 'message': {},
450 'subject': {},
451 'type': type_,
452 'extra': extra,
453 'timestamp': timestamp,
454 }
455 if update_uid is not None:
456 current['extra']['update_uid'] = update_uid
457 if received_timestamp is not None:
458 current['extra']['received_timestamp'] = str(received_timestamp)
459 result.append(current)
460
461 if message is not None:
462 current['message'][message_lang or ''] = message
463
464 if subject is not None:
465 current['subject'][subject_lang or ''] = subject
466
467 if thread is not None:
468 current_extra = current['extra']
469 current_extra['thread'] = thread
470 if thread_parent is not None:
471 current_extra['thread_parent'] = thread_parent
472 else:
473 if thread_parent is not None:
474 log.error(u"Database inconsistency: thread parent without thread (uid: {uid}, thread_parent: {parent})"
475 .format(uid=uid, parent=thread_parent))
476
477 return result
478
479 def listDict2listTuple(self, messages_data):
480 """Return a list of tuple as used in bridge from a list of messages data"""
481 ret = []
482 for m in messages_data:
483 ret.append((m['uid'], m['timestamp'], m['from'], m['to'], m['message'], m['subject'], m['type'], m['extra']))
484 return ret
485
486 def historyGet(self, from_jid, to_jid, limit=None, between=True, filters=None, profile=None):
487 """Retrieve messages in history
488
489 @param from_jid (JID): source JID (full, or bare for catchall)
490 @param to_jid (JID): dest JID (full, or bare for catchall)
491 @param limit (int): maximum number of messages to get:
492 - 0 for no message (returns the empty list)
493 - None for unlimited
494 @param between (bool): if True, the direction is ignored (messages between the two JIDs are matched both ways)
495 @param filters (dict[unicode, unicode]): filters to apply on the results; handled keys are 'body', 'search', 'types' and 'not_types'
496 @param profile (unicode): %(doc_profile)s
497 @return: list of tuple as in [messageNew]
498 """
499 assert profile
500 if filters is None:
501 filters = {}
502 if limit == 0:
503 return defer.succeed([])
504
505 query_parts = [u"SELECT uid, update_uid, source, dest, source_res, dest_res, timestamp, received_timestamp,\
506 type, extra, message, message.language, subject, subject.language, thread_id, thread.parent_id\
507 FROM history LEFT JOIN message ON history.uid = message.history_uid\
508 LEFT JOIN subject ON history.uid=subject.history_uid\
509 LEFT JOIN thread ON history.uid=thread.history_uid\
510 WHERE profile_id=? AND"] # FIXME: not sure if it's the best request, messages and subjects can appear several times here
511 values = [self.profiles[profile]]
512
513 def test_jid(type_, _jid):
514 values.append(_jid.userhost())
515 if _jid.resource:
516 values.append(_jid.resource)
517 return u'(%s=? AND %s_res=?)' % (type_, type_)
518 return u'%s=?' % (type_, )
519
520 if between:
521 query_parts.append(u"((%s AND %s) OR (%s AND %s))" % (test_jid('source', from_jid),
522 test_jid('dest', to_jid),
523 test_jid('source', to_jid),
524 test_jid('dest', from_jid)))
525 else:
526 query_parts.append(u"%s AND %s" % (test_jid('source', from_jid),
527 test_jid('dest', to_jid)))
528
529 if filters:
530 if 'body' in filters:
531 # TODO: use REGEXP (function to be defined) instead of GLOB: https://www.sqlite.org/lang_expr.html
532 query_parts.append(u"AND message LIKE ?")
533 values.append(u"%{}%".format(filters['body']))
534 if 'search' in filters:
535 query_parts.append(u"AND (message LIKE ? OR source_res LIKE ?)")
536 values.extend([u"%{}%".format(filters['search'])] * 2)
537 if 'types' in filters:
538 types = filters['types'].split()
539 query_parts.append(u"AND type IN ({})".format(u','.join("?"*len(types))))
540 values.extend(types)
541 if 'not_types' in filters:
542 types = filters['not_types'].split()
543 query_parts.append(u"AND type NOT IN ({})".format(u','.join("?"*len(types))))
544 values.extend(types)
545
546
547 query_parts.append(u"ORDER BY timestamp DESC") # we reverse the order in sqliteHistoryToList
548 # we use DESC here so LIMIT keep the last messages
549 if limit is not None:
550 query_parts.append(u"LIMIT ?")
551 values.append(limit)
552
553 d = self.dbpool.runQuery(u" ".join(query_parts), values)
554 d.addCallback(self.sqliteHistoryToList)
555 d.addCallback(self.listDict2listTuple)
556 return d
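# Usage sketch (hypothetical JIDs, assuming a SqliteStorage instance called storage):
#     d = storage.historyGet(jid.JID(u'alice@example.org'),
#                            jid.JID(u'bob@example.org'),
#                            limit=20, filters={'types': u'chat'},
#                            profile=u'alice')
# fires with at most the 20 most recent chat messages exchanged between the two
# JIDs, matched in both directions since between defaults to True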
557
558 ## Private values
559
560 def _privateDataEb(self, failure_, operation, namespace, key=None, profile=None):
561 """generic errback for data queries"""
562 log.error(_(u"Can't {operation} data in database for namespace {namespace}{and_key}{for_profile}: {msg}").format(
563 operation = operation,
564 namespace = namespace,
565 and_key = (u" and key " + key) if key is not None else u"",
566 for_profile = (u' [' + profile + u']') if profile is not None else u'',
567 msg = failure_))
568
569 def _generateDataDict(self, query_result, binary):
570 if binary:
571 return {k: pickle.loads(str(v)) for k,v in query_result}
572 else:
573 return dict(query_result)
574
575 def _getPrivateTable(self, binary, profile):
576 """Get table to use for private values"""
577 table = [u'private']
578
579 if profile is None:
580 table.append(u'gen')
581 else:
582 table.append(u'ind')
583
584 if binary:
585 table.append(u'bin')
586
587 return u'_'.join(table)
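# The four possible results of _getPrivateTable, matching the tables created in the schema above
# (profile name is hypothetical):
#     _getPrivateTable(binary=False, profile=None)      -> u'private_gen'
#     _getPrivateTable(binary=False, profile=u'alice')  -> u'private_ind'
#     _getPrivateTable(binary=True,  profile=None)      -> u'private_gen_bin'
#     _getPrivateTable(binary=True,  profile=u'alice')  -> u'private_ind_bin'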
588
589 def getPrivates(self, namespace, keys=None, binary=False, profile=None):
590 """Get private value(s) from databases
591
592 @param namespace(unicode): namespace of the values
593 @param keys(iterable, None): keys of the values to get
594 None to get all keys/values
595 @param binary(bool): True to deserialise binary values
596 @param profile(unicode, None): profile to use for individual values
597 None to use general values
598 @return (dict[unicode, object]): retrieved keys/values
599 """
600 log.debug(_(u"getting {type}{binary} private values from database for namespace {namespace}{keys}".format(
601 type = u"general" if profile is None else "individual",
602 binary = u" binary" if binary else u"",
603 namespace = namespace,
604 keys = u" with keys {}".format(u", ".join(keys)) if keys is not None else u"")))
605 table = self._getPrivateTable(binary, profile)
606 query_parts = [u"SELECT key,value FROM", table, "WHERE namespace=?"]
607 args = [namespace]
608
609 if keys is not None:
610 placeholders = u','.join(len(keys) * u'?')
611 query_parts.append(u'AND key IN (' + placeholders + u')')
612 args.extend(keys)
613
614 if profile is not None:
615 query_parts.append(u'AND profile_id=?')
616 args.append(self.profiles[profile])
617
618 d = self.dbpool.runQuery(u" ".join(query_parts), args)
619 d.addCallback(self._generateDataDict, binary)
620 d.addErrback(self._privateDataEb, u"get", namespace, profile=profile)
621 return d
622
623 def setPrivateValue(self, namespace, key, value, binary=False, profile=None):
624 """Set a private value in database
625
626 @param namespace(unicode): namespace of the values
627 @param key(unicode): key of the value to set
628 @param value(object): value to set
629 @param binary(bool): True if it's a binary value
630 binary values need to be serialised, used for everything but strings
631 @param profile(unicode, None): profile to use for individual value
632 if None, it's a general value
633 """
634 table = self._getPrivateTable(binary, profile)
635 query_values_names = [u'namespace', u'key', u'value']
636 query_values = [namespace, key]
637
638 if binary:
639 value = sqlite3.Binary(pickle.dumps(value, 0))
640
641 query_values.append(value)
642
643 if profile is not None:
644 query_values_names.append(u'profile_id')
645 query_values.append(self.profiles[profile])
646
647 query_parts = [u"REPLACE INTO", table, u'(', u','.join(query_values_names), u')',
648 u"VALUES (", u",".join(u'?'*len(query_values_names)), u')']
649
650 d = self.dbpool.runQuery(u" ".join(query_parts), query_values)
651 d.addErrback(self._privateDataEb, u"set", namespace, key, profile=profile)
652 return d
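# Round-trip sketch (hypothetical namespace/key, assuming a SqliteStorage instance called storage):
#     d = storage.setPrivateValue(u'test_ns', u'prefs', {u'theme': u'dark'},
#                                 binary=True, profile=u'alice')
#     d.addCallback(lambda dummy: storage.getPrivates(u'test_ns', keys=[u'prefs'],
#                                                     binary=True, profile=u'alice'))
# the dict is pickled on the way in and unpickled on the way out, so the final
# callback receives {u'prefs': {u'theme': u'dark'}}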
653
654 def delPrivateValue(self, namespace, key, binary=False, profile=None):
655 """Delete private value from database
656
657 @param namespace: namespace of the private value
658 @param key: key of the private value
659 @param binary(bool): True if it's a binary value
660 @param profile(unicode, None): profile to use for individual value
661 if None, it's a general value
662 """
663 table = self._getPrivateTable(binary, profile)
664 query_parts = [u"DELETE FROM", table, u"WHERE namespace=? AND key=?"]
665 args = [namespace, key]
666 if profile is not None:
667 query_parts.append(u"AND profile_id=?")
668 args.append(self.profiles[profile])
669 d = self.dbpool.runQuery(u" ".join(query_parts), args)
670 d.addErrback(self._privateDataEb, u"delete", namespace, key, profile=profile)
671 return d
672
673 ## Files
674
675 @defer.inlineCallbacks
676 def getFiles(self, client, file_id=None, version=u'', parent=None, type_=None,
677 file_hash=None, hash_algo=None, name=None, namespace=None, mime_type=None,
678 owner=None, access=None, projection=None, unique=False):
679 """retrieve files with with given filters
680
681 @param file_id(unicode, None): id of the file
682 None to ignore
683 @param version(unicode, None): version of the file
684 None to ignore
685 empty string to look for current version
686 @param parent(unicode, None): id of the directory containing the files
687 None to ignore
688 empty string to look for root files/directories
689 @param projection(list[unicode], None): name of columns to retrieve
690 None to retrieve all
691 @param unique(bool): if True will remove duplicates
692 other params are the same as for [setFile]
693 @return (list[dict]): files corresponding to filters
694 """
695 query_parts = ["SELECT"]
696 if unique:
697 query_parts.append('DISTINCT')
698 if projection is None:
699 projection = ['id', 'version', 'parent', 'type', 'file_hash', 'hash_algo', 'name',
700 'size', 'namespace', 'mime_type', 'created', 'modified', 'owner',
701 'access', 'extra']
702 query_parts.append(','.join(projection))
703 query_parts.append("FROM files WHERE")
704 filters = ['profile_id=?']
705 args = [self.profiles[client.profile]]
706
707 if file_id is not None:
708 filters.append(u'id=?')
709 args.append(file_id)
710 if version is not None:
711 filters.append(u'version=?')
712 args.append(version)
713 if parent is not None:
714 filters.append(u'parent=?')
715 args.append(parent)
716 if type_ is not None:
717 filters.append(u'type=?')
718 args.append(type_)
719 if file_hash is not None:
720 filters.append(u'file_hash=?')
721 args.append(file_hash)
722 if hash_algo is not None:
723 filters.append(u'hash_algo=?')
724 args.append(hash_algo)
725 if name is not None:
726 filters.append(u'name=?')
727 args.append(name)
728 if namespace is not None:
729 filters.append(u'namespace=?')
730 args.append(namespace)
731 if mime_type is not None:
732 filters.append(u'mime_type=?')
733 args.append(mime_type)
734 if owner is not None:
735 filters.append(u'owner=?')
736 args.append(owner.full())
737 if access is not None:
738 raise NotImplementedError('Access check is not implemented yet')
739 # a JSON comparison is needed here
740
741 filters = u' AND '.join(filters)
742 query_parts.append(filters)
743 query = u' '.join(query_parts)
744
745 result = yield self.dbpool.runQuery(query, args)
746 files_data = [dict(zip(projection, row)) for row in result]
747 to_parse = {'access', 'extra'}.intersection(projection)
748 to_filter = {'owner'}.intersection(projection)
749 if to_parse or to_filter:
750 for file_data in files_data:
751 for key in to_parse:
752 value = file_data[key]
753 file_data[key] = {} if value is None else json.loads(value)
754 owner = file_data.get('owner')
755 if owner is not None:
756 file_data['owner'] = jid.JID(owner)
757 defer.returnValue(files_data)
758
759 def setFile(self, client, name, file_id, version=u'', parent=None, type_=C.FILE_TYPE_FILE,
760 file_hash=None, hash_algo=None, size=None, namespace=None, mime_type=None,
761 created=None, modified=None, owner=None, access=None, extra=None):
762 """set a file metadata
763
764 @param client(SatXMPPClient): client owning the file
765 @param name(unicode): name of the file (must not contain "/")
766 @param file_id(unicode): unique id of the file
767 @param version(unicode): version of this file
768 @param parent(unicode): id of the directory containing this file
769 None if it is a root file/directory
770 @param type_(unicode): one of:
771 - file
772 - directory
773 @param file_hash(unicode): unique hash of the payload
774 @param hash_algo(unicode): algorithm used for hashing the file (usually sha-256)
775 @param size(int): size in bytes
776 @param namespace(unicode, None): identifier (human readable is better) to group files
777 for instance, namespace could be used to group files in a specific photo album
778 @param mime_type(unicode): MIME type of the file, or None if not known/guessed
779 @param created(int): UNIX time of creation
780 @param modified(int,None): UNIX time of last modification, or None to use created date
781 @param owner(jid.JID, None): jid of the owner of the file (mainly useful for component)
782 @param access(dict, None): serialisable dictionary with access rules. See [memory.memory] for details
783 @param extra(dict, None): serialisable dictionary of any extra data
784 will be encoded to json in database
785 """
786 if extra is not None:
787 assert isinstance(extra, dict)
788 query = ('INSERT INTO files(id, version, parent, type, file_hash, hash_algo, name, size, namespace, '
789 'mime_type, created, modified, owner, access, extra, profile_id) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)')
790 d = self.dbpool.runQuery(query, (file_id, version.strip(), parent, type_,
791 file_hash, hash_algo,
792 name, size, namespace,
793 mime_type, created, modified,
794 owner.full() if owner is not None else None,
795 json.dumps(access) if access else None,
796 json.dumps(extra) if extra else None,
797 self.profiles[client.profile]))
798 d.addErrback(lambda failure: log.error(_(u"Can't save file metadata for [{profile}]: {reason}".format(profile=client.profile, reason=failure))))
799 return d
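# Usage sketch (hypothetical values, assuming a connected client and a SqliteStorage instance called storage):
#     d = storage.setFile(client, name=u'photo.jpg', file_id=u'file_id_1',
#                         file_hash=u'somehash', hash_algo=u'sha-256', size=1024,
#                         namespace=u'album_2018', mime_type=u'image/jpeg',
#                         created=1522684800)
#     d.addCallback(lambda dummy: storage.getFiles(client, namespace=u'album_2018'))
# getFiles then fires with a list of dicts, one per matching file, with 'access' and
# 'extra' already deserialised from JSON and 'owner' converted back to a jid.JID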
800
801 def _fileUpdate(self, cursor, file_id, column, update_cb):
802 query = 'SELECT {column} FROM files where id=?'.format(column=column)
803 for i in xrange(5):
804 cursor.execute(query, [file_id])
805 try:
806 older_value_raw = cursor.fetchone()[0]
807 except TypeError:
808 raise exceptions.NotFound
809 value = json.loads(older_value_raw)
810 update_cb(value)
811 value_raw = json.dumps(value)
812 update_query = 'UPDATE files SET {column}=? WHERE id=? AND {column}=?'.format(column=column)
813 update_args = (value_raw, file_id, older_value_raw)
814 try:
815 cursor.execute(update_query, update_args)
816 except sqlite3.Error:
817 pass
818 else:
819 if cursor.rowcount == 1:
820 break
821 log.warning(_(u"table not updated, probably due to race condition, trying again ({tries})").format(tries=i+1))
822 else:
823 log.error(_(u"Can't update file table"))
824
825 def fileUpdate(self, file_id, column, update_cb):
826 """update a column value using a method to avoid race conditions
827
828 the older value will be retrieved from database, then update_cb will be applied
829 to update it, and the file will be updated after checking that the older value has not been
830 changed meanwhile by another user. If it has changed, it tries again a couple of times before failing
831 @param column(str): column name (only "access" or "extra" are allowed)
832 @param update_cb(callable): method to update the value of the column
833 the method will take the older value as argument, and must update it in place
834 update_cb must not care about serialisation,
835 it gets the deserialised data (i.e. a Python object) directly
836 Note that the callable must be thread-safe
837 @raise exceptions.NotFound: there is no file with this id
838 """
839 if column not in ('access', 'extra'):
840 raise exceptions.InternalError('bad column name')
841 return self.dbpool.runInteraction(self._fileUpdate, file_id, column, update_cb)
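# Usage sketch (hypothetical id and access rule): the callback receives the
# deserialised dict and must mutate it in place; fileUpdate handles the
# read/compare/write retry loop shown in _fileUpdate above:
#     def addPublicRead(access):
#         access[u'read'] = {u'type': u'public'}
#     d = storage.fileUpdate(u'file_id_1', 'access', addPublicRead)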
842
843 ## Helper methods ##
844
845 def __getFirstResult(self, result):
846 """Return the first result of a database query
847 Useful when we are looking for one specific value"""
848 return None if not result else result[0][0]
849
850
851 class Updater(object):
852 stmnt_regex = re.compile(r"[\w/' ]+(?:\(.*?\))?[^,]*")
853 clean_regex = re.compile(r"^ +|(?<= ) +|(?<=,) +| +$")
854 CREATE_SQL = "CREATE TABLE %s (%s)"
855 INSERT_SQL = "INSERT INTO %s VALUES (%s)"
856 DROP_SQL = "DROP TABLE %s"
857 ALTER_SQL = "ALTER TABLE %s ADD COLUMN %s"
858 RENAME_TABLE_SQL = "ALTER TABLE %s RENAME TO %s"
859
860 CONSTRAINTS = ('PRIMARY', 'UNIQUE', 'CHECK', 'FOREIGN')
861 TMP_TABLE = "tmp_sat_update"
862
863 def __init__(self, dbpool, sat_version):
864 self._sat_version = sat_version
865 self.dbpool = dbpool
866
867 def getLocalVersion(self):
868 """ Get local database version
869
870 @return: version (int)
871 """
872 return self.dbpool.runQuery("PRAGMA user_version").addCallback(lambda ret: int(ret[0][0]))
873
874 def _setLocalVersion(self, version):
875 """ Set local database version
876
877 @param version: version (int)
878 @return: deferred
879 """
880 return self.dbpool.runOperation("PRAGMA user_version=%d" % version)
881
882 def getLocalSchema(self):
883 """ return raw local schema
884
885 @return: list of strings with CREATE sql statements for local database
886 """
887 d = self.dbpool.runQuery("select sql from sqlite_master where type = 'table'")
888 d.addCallback(lambda result: [row[0] for row in result])
889 return d
890
891 @defer.inlineCallbacks
892 def checkUpdates(self):
893 """ Check if a database schema/content update is needed, according to DATABASE_SCHEMAS
894
895 @return: deferred which fires a list of SQL update statements, or None if no update is needed
896 """
897 local_version = yield self.getLocalVersion()
898 raw_local_sch = yield self.getLocalSchema()
899
900 local_sch = self.rawStatements2data(raw_local_sch)
901 current_sch = DATABASE_SCHEMAS['current']['CREATE']
902 local_hash = self.statementHash(local_sch)
903 current_hash = self.statementHash(current_sch)
904
905 # Force the update if the schemas are unchanged but a specific update is needed
906 force_update = local_hash == current_hash and local_version < CURRENT_DB_VERSION \
907 and 'specific' in DATABASE_SCHEMAS[CURRENT_DB_VERSION]
908
909 if local_hash == current_hash and not force_update:
910 if local_version != CURRENT_DB_VERSION:
911 log.warning(_("Your local schema is up-to-date, but database versions mismatch, fixing it..."))
912 yield self._setLocalVersion(CURRENT_DB_VERSION)
913 else:
914 # an update is needed
915
916 if local_version == CURRENT_DB_VERSION:
917 # Database mismatch and we have the latest version
918 if self._sat_version.endswith('D'):
919 # we are in a development version
920 update_data = self.generateUpdateData(local_sch, current_sch, False)
921 log.warning(_("There is a schema mismatch, but as we are on a dev version, database will be updated"))
922 update_raw = yield self.update2raw(update_data, True)
923 defer.returnValue(update_raw)
924 else:
925 log.error(_(u"schema version is up-to-date, but local schema differ from expected current schema"))
926 update_data = self.generateUpdateData(local_sch, current_sch, True)
927 update_raw = yield self.update2raw(update_data)
928 log.warning(_(u"Here are the commands that should fix the situation, use at your own risk (do a backup before modifying database), you can go to SàT's MUC room at sat@chat.jabberfr.org for help\n### SQL###\n%s\n### END SQL ###\n") % u'\n'.join("%s;" % statement for statement in update_raw))
929 raise exceptions.DatabaseError("Database mismatch")
930 else:
931 # Database is not up-to-date, we'll do the update
932 if force_update:
933 log.info(_("Database content needs a specific processing, local database will be updated"))
934 else:
935 log.info(_("Database schema has changed, local database will be updated"))
936 update_raw = []
937 for version in xrange(local_version + 1, CURRENT_DB_VERSION + 1):
938 try:
939 update_data = DATABASE_SCHEMAS[version]
940 except KeyError:
941 raise exceptions.InternalError("Missing update definition (version %d)" % version)
942 update_raw_step = yield self.update2raw(update_data)
943 update_raw.extend(update_raw_step)
944 update_raw.append("PRAGMA user_version=%d" % CURRENT_DB_VERSION)
945 defer.returnValue(update_raw)
946
947 @staticmethod
948 def createData2Raw(data):
949 """ Generate SQL statements from statements data
950
951 @param data: dictionary with table as key, and statements data in tuples as value
952 @return: list of strings with raw statements
953 """
954 ret = []
955 for table in data:
956 defs, constraints = data[table]
957 assert isinstance(defs, tuple)
958 assert isinstance(constraints, tuple)
959 ret.append(Updater.CREATE_SQL % (table, ', '.join(defs + constraints)))
960 return ret
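# Illustration, using the 'profiles' table from DATABASE_SCHEMAS['current']:
#     createData2Raw({'profiles': (("id INTEGER PRIMARY KEY ASC", "name TEXT"),
#                                  ("UNIQUE (name)",))})
# returns:
#     ["CREATE TABLE profiles (id INTEGER PRIMARY KEY ASC, name TEXT, UNIQUE (name))"]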
961
962 @staticmethod
963 def insertData2Raw(data):
964 """ Generate SQL statements from statements data
965
966 @param data: dictionary with table as key, and statements data in tuples as value
967 @return: list of strings with raw statements
968 """
969 ret = []
970 for table in data:
971 values_tuple = data[table]
972 assert isinstance(values_tuple, tuple)
973 for values in values_tuple:
974 assert isinstance(values, tuple)
975 ret.append(Updater.INSERT_SQL % (table, ', '.join(values)))
976 return ret
977
978 def statementHash(self, data):
979 """ Generate hash of template data
980
981 useful to compare schemas
982 @param data: dictionary of "CREATE" statement, with tables names as key,
983 and tuples of (col_defs, constraints) as values
984 @return: hash as string
985 """
986 hash_ = hashlib.sha1()
987 tables = data.keys()
988 tables.sort()
989
990 def stmnts2str(stmts):
991 return ','.join([self.clean_regex.sub('',stmt) for stmt in sorted(stmts)])
992
993 for table in tables:
994 col_defs, col_constr = data[table]
995 hash_.update("%s:%s:%s" % (table, stmnts2str(col_defs), stmnts2str(col_constr)))
996 return hash_.digest()
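# Note: tables and statements are sorted before hashing, so two schemas that
# differ only in table or column order produce the same hash; only a real
# structural difference triggers the update logic in checkUpdates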
997
998 def rawStatements2data(self, raw_statements):
999 """ separate "CREATE" statements into dictionary/tuples data
1000
1001 @param raw_statements: list of CREATE statements as strings
1002 @return: dictionary with table names as key, and a (col_defs, constraints) tuple
1003 """
1004 schema_dict = {}
1005 for create_statement in raw_statements:
1006 if not create_statement.startswith("CREATE TABLE "):
1007 log.warning("Unexpected statement, ignoring it")
1008 continue
1009 _create_statement = create_statement[13:]
1010 table, raw_col_stats = _create_statement.split(' ',1)
1011 if raw_col_stats[0] != '(' or raw_col_stats[-1] != ')':
1012 log.warning("Unexpected statement structure, ignoring it")
1013 continue
1014 col_stats = [stmt.strip() for stmt in self.stmnt_regex.findall(raw_col_stats[1:-1])]
1015 col_defs = []
1016 constraints = []
1017 for col_stat in col_stats:
1018 name = col_stat.split(' ',1)[0]
1019 if name in self.CONSTRAINTS:
1020 constraints.append(col_stat)
1021 else:
1022 col_defs.append(col_stat)
1023 schema_dict[table] = (tuple(col_defs), tuple(constraints))
1024 return schema_dict
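# Illustration: parsing the statement SQLite stores for the 'profiles' table:
#     rawStatements2data(["CREATE TABLE profiles (id INTEGER PRIMARY KEY ASC, name TEXT, UNIQUE (name))"])
# returns:
#     {'profiles': (('id INTEGER PRIMARY KEY ASC', 'name TEXT'), ('UNIQUE (name)',))}
# the UNIQUE statement ends up in the constraints tuple because its first word is in CONSTRAINTS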
1025
1026 def generateUpdateData(self, old_data, new_data, modify=False):
1027 """ Generate data for automatic update between two schema data
1028
1029 @param old_data: data of the former schema (which must be updated)
1030 @param new_data: data of the current schema
1031 @param modify: if True, always use "cols modify" table, else try to ALTER tables
1032 @return: update data, a dictionary with:
1033 - 'create': dictionary of tables to create
1034 - 'delete': tuple of tables to delete
1035 - 'cols create': dictionary of columns to create (table as key, tuple of columns to create as value)
1036 - 'cols delete': dictionary of columns to delete (table as key, tuple of columns to delete as value)
1037 - 'cols modify': dictionary of columns to modify (table as key, tuple of old columns to transfer as value). With this table, a new table will be created, and content from the old table will be transferred to it; only cols specified in the tuple will be transferred.
1038 """
1039
1040 create_tables_data = {}
1041 create_cols_data = {}
1042 modify_cols_data = {}
1043 delete_cols_data = {}
1044 old_tables = set(old_data.keys())
1045 new_tables = set(new_data.keys())
1046
1047 def getChanges(set_olds, set_news):
1048 to_create = set_news.difference(set_olds)
1049 to_delete = set_olds.difference(set_news)
1050 to_check = set_news.intersection(set_olds)
1051 return tuple(to_create), tuple(to_delete), tuple(to_check)
1052
1053 tables_to_create, tables_to_delete, tables_to_check = getChanges(old_tables, new_tables)
1054
1055 for table in tables_to_create:
1056 create_tables_data[table] = new_data[table]
1057
1058 for table in tables_to_check:
1059 old_col_defs, old_constraints = old_data[table]
1060 new_col_defs, new_constraints = new_data[table]
1061 for obj in old_col_defs, old_constraints, new_col_defs, new_constraints:
1062 if not isinstance(obj, tuple):
1063 raise exceptions.InternalError("Columns definitions must be tuples")
1064 defs_create, defs_delete, ignore = getChanges(set(old_col_defs), set(new_col_defs))
1065 constraints_create, constraints_delete, ignore = getChanges(set(old_constraints), set(new_constraints))
1066 created_col_names = set([name.split(' ',1)[0] for name in defs_create])
1067 deleted_col_names = set([name.split(' ',1)[0] for name in defs_delete])
1068 if (created_col_names.intersection(deleted_col_names) or constraints_create or constraints_delete or
1069 (modify and (defs_create or defs_delete))):
1070 # we have modified columns or constraints (which SQLite's ALTER TABLE can't change), we need to rebuild and transfer the table
1071 # we determine which columns are in both schemas so we can transfer them
1072 old_names = set([name.split(' ',1)[0] for name in old_col_defs])
1073 new_names = set([name.split(' ',1)[0] for name in new_col_defs])
1074 modify_cols_data[table] = tuple(old_names.intersection(new_names))
1075 else:
1076 if defs_create:
1077 create_cols_data[table] = (defs_create)
1078 if defs_delete or constraints_delete:
1079 delete_cols_data[table] = (defs_delete)
1080
1081 return {'create': create_tables_data,
1082 'delete': tables_to_delete,
1083 'cols create': create_cols_data,
1084 'cols delete': delete_cols_data,
1085 'cols modify': modify_cols_data
1086 }
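# Illustration (hypothetical column change): if the old 'history' table lacks an
# 'extra BLOB' column that the new schema has, and nothing else differs:
#     generateUpdateData(old_data, new_data)
# returns:
#     {'create': {}, 'delete': (), 'cols create': {'history': ('extra BLOB',)},
#      'cols delete': {}, 'cols modify': {}}
# which matches the hand-written update 1 in DATABASE_SCHEMAS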
1087
1088 @defer.inlineCallbacks
1089 def update2raw(self, update, dev_version=False):
1090 """ Transform update data to raw SQLite statements
1091
1092 @param update: update data as returned by generateUpdateData
1093 @param dev_version: if True, the update will be done in dev mode: no deletion will be done, instead a message will be shown. This prevents accidental loss of data while working on the code/database.
1094 @return: list of string with SQL statements needed to update the base
1095 """
1096 ret = self.createData2Raw(update.get('create', {}))
1097 drop = []
1098 for table in update.get('delete', tuple()):
1099 drop.append(self.DROP_SQL % table)
1100 if dev_version:
1101 if drop:
1102 log.info("Dev version, SQL NOT EXECUTED:\n--\n%s\n--\n" % "\n".join(drop))
1103 else:
1104 ret.extend(drop)
1105
1106 cols_create = update.get('cols create', {})
1107 for table in cols_create:
1108 for col_def in cols_create[table]:
1109 ret.append(self.ALTER_SQL % (table, col_def))
1110
1111 cols_delete = update.get('cols delete', {})
1112 for table in cols_delete:
1113 log.info("Following columns in table [%s] are not needed anymore, but are kept for dev version: %s" % (table, ", ".join(cols_delete[table])))
1114
1115 cols_modify = update.get('cols modify', {})
1116 for table in cols_modify:
1117 ret.append(self.RENAME_TABLE_SQL % (table, self.TMP_TABLE))
1118 main, extra = DATABASE_SCHEMAS['current']['CREATE'][table]
1119 ret.append(self.CREATE_SQL % (table, ', '.join(main + extra)))
1120 common_cols = ', '.join(cols_modify[table])
1121 ret.append("INSERT INTO %s (%s) SELECT %s FROM %s" % (table, common_cols, common_cols, self.TMP_TABLE))
1122 ret.append(self.DROP_SQL % self.TMP_TABLE)
1123
1124 insert = update.get('insert', {})
1125 ret.extend(self.insertData2Raw(insert))
1126
1127 specific = update.get('specific', None)
1128 if specific:
1129 cmds = yield getattr(self, specific)()
1130 ret.extend(cmds or [])
1131 defer.returnValue(ret)
1132
1133 @defer.inlineCallbacks
1134 def update_v3(self):
1135 """Update database from v2 to v3 (message refactoring)"""
1136 # XXX: this update does all the messages in one huge transaction
1137 # this is really memory consuming, but was OK on a reasonably
1138 # big database for tests. If issues happen, we can cut it
1139 # into smaller transactions using LIMIT and by deleting already updated
1140 # messages
1141 log.info(u"Database update to v3, this may take a while")
1142
1143 # we need to fix duplicate timestamp, as it can result in conflicts with the new schema
1144 rows = yield self.dbpool.runQuery("SELECT timestamp, COUNT(*) as c FROM history GROUP BY timestamp HAVING c>1")
1145 if rows:
1146 log.info("fixing duplicate timestamp")
1147 fixed = []
1148 for timestamp, dummy in rows:
1149 ids_rows = yield self.dbpool.runQuery("SELECT id from history where timestamp=?", (timestamp,))
1150 for idx, (id_,) in enumerate(ids_rows):
1151 fixed.append(id_)
1152 yield self.dbpool.runQuery("UPDATE history SET timestamp=? WHERE id=?", (float(timestamp) + idx * 0.001, id_))
1153 log.info(u"fixed messages with ids {}".format(u', '.join([unicode(id_) for id_ in fixed])))
1154
1155 def historySchema(txn):
1156 log.info(u"History schema update")
1157 txn.execute("ALTER TABLE history RENAME TO tmp_sat_update")
1158 txn.execute("CREATE TABLE history (uid TEXT PRIMARY KEY, update_uid TEXT, profile_id INTEGER, source TEXT, dest TEXT, source_res TEXT, dest_res TEXT, timestamp DATETIME NOT NULL, received_timestamp DATETIME, type TEXT, extra BLOB, FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE, FOREIGN KEY(type) REFERENCES message_types(type), UNIQUE (profile_id, timestamp, source, dest, source_res, dest_res))")
1159 txn.execute("INSERT INTO history (uid, profile_id, source, dest, source_res, dest_res, timestamp, type, extra) SELECT id, profile_id, source, dest, source_res, dest_res, timestamp, type, extra FROM tmp_sat_update")
1160
1161 yield self.dbpool.runInteraction(historySchema)
1162
1163 def newTables(txn):
1164 log.info(u"Creating new tables")
1165 txn.execute("CREATE TABLE message (id INTEGER PRIMARY KEY ASC, history_uid INTEGER, message TEXT, language TEXT, FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE)")
1166 txn.execute("CREATE TABLE thread (id INTEGER PRIMARY KEY ASC, history_uid INTEGER, thread_id TEXT, parent_id TEXT, FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE)")
1167 txn.execute("CREATE TABLE subject (id INTEGER PRIMARY KEY ASC, history_uid INTEGER, subject TEXT, language TEXT, FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE)")
1168
1169 yield self.dbpool.runInteraction(newTables)
1170
1171 log.info(u"inserting new message type")
1172 yield self.dbpool.runQuery("INSERT INTO message_types VALUES (?)", ('info',))
1173
1174 log.info(u"messages update")
1175 rows = yield self.dbpool.runQuery("SELECT id, timestamp, message, extra FROM tmp_sat_update")
1176 total = len(rows)
1177
1178 def updateHistory(txn, queries):
1179 for query, args in queries:
1180 txn.execute(query, args)
1181
1182 queries = []
1183 for idx, row in enumerate(rows, 1):
1184 if idx % 1000 == 0 or total - idx == 0:
1185 log.info("preparing message {}/{}".format(idx, total))
1186 id_, timestamp, message, extra = row
1187 try:
1188 extra = pickle.loads(str(extra or ""))
1189 except EOFError:
1190 extra = {}
1191 except Exception:
1192 log.warning(u"Can't handle extra data for message id {}, ignoring it".format(id_))
1193 extra = {}
1194
1195 queries.append(("INSERT INTO message(history_uid, message) VALUES (?,?)", (id_, message)))
1196
1197 try:
1198 subject = extra.pop('subject')
1199 except KeyError:
1200 pass
1201 else:
1202 try:
1203 subject = subject.decode('utf-8')
1204 except (UnicodeDecodeError, UnicodeEncodeError): # a str may raise the former, an unicode object the latter (implicit ascii encoding)
1205 log.warning(u"Error while decoding subject, ignoring it")
1206 # nothing to remove from extra: the pop() above already took 'subject' out
1207 else:
1208 queries.append(("INSERT INTO subject(history_uid, subject) VALUES (?,?)", (id_, subject)))
1209
1210 received_timestamp = extra.pop('timestamp', None)
1211 try:
1212 del extra['archive']
1213 except KeyError:
1214 # archive was not used
1215 pass
1216
1217 queries.append(("UPDATE history SET received_timestamp=?,extra=? WHERE uid=?",(id_, received_timestamp, sqlite3.Binary(pickle.dumps(extra, 0)))))
1218
1219 yield self.dbpool.runInteraction(updateHistory, queries)
1220
1221 log.info("Dropping temporary table")
1222 yield self.dbpool.runQuery("DROP TABLE tmp_sat_update")
1223 log.info("Database update finished :)")
1224
1225 def update2raw_v2(self):
1226 """Update the database from v1 to v2 (add passwords encryptions):
1227
1228 - the XMPP password value is re-used for the profile password (new parameter)
1229 - the profile password is stored hashed
1230 - the XMPP password is stored encrypted, with the profile password as key
1231 - as there are no other stored passwords yet, it is enough, otherwise we
1232 would need to encrypt the other passwords as it's done for XMPP password
1233 """
1234 xmpp_pass_path = ('Connection', 'Password')
1235
1236 def encrypt_values(values):
1237 ret = []
1238 list_ = []
1239
1240 def prepare_queries(result, profile_id, xmpp_password):
1241 try:
1242 id_ = result[0][0]
1243 except IndexError:
1244 log.error(u"Profile of id %d is referenced in 'param_ind' but it doesn't exist!" % profile_id)
1245 return defer.succeed(None)
1246
1247 sat_password = xmpp_password
1248 d1 = PasswordHasher.hash(sat_password)
1249 personal_key = BlockCipher.getRandomKey(base64=True)
1250 d2 = BlockCipher.encrypt(sat_password, personal_key)
1251 d3 = BlockCipher.encrypt(personal_key, xmpp_password)
1252
1253 def gotValues(res):
1254 sat_cipher, personal_cipher, xmpp_cipher = res[0][1], res[1][1], res[2][1]
1255 ret.append("INSERT INTO param_ind(category,name,profile_id,value) VALUES ('%s','%s',%s,'%s')" %
1256 (C.PROFILE_PASS_PATH[0], C.PROFILE_PASS_PATH[1], id_, sat_cipher))
1257
1258 ret.append("INSERT INTO private_ind(namespace,key,profile_id,value) VALUES ('%s','%s',%s,'%s')" %
1259 (C.MEMORY_CRYPTO_NAMESPACE, C.MEMORY_CRYPTO_KEY, id_, personal_cipher))
1260
1261 ret.append("REPLACE INTO param_ind(category,name,profile_id,value) VALUES ('%s','%s',%s,'%s')" %
1262 (xmpp_pass_path[0], xmpp_pass_path[1], id_, xmpp_cipher))
1263
1264 return defer.DeferredList([d1, d2, d3]).addCallback(gotValues)
1265
1266 for profile_id, xmpp_password in values:
1267 d = self.dbpool.runQuery("SELECT id FROM profiles WHERE id=?", (profile_id,))
1268 d.addCallback(prepare_queries, profile_id, xmpp_password) # profile_id is passed explicitly to avoid late binding of the loop variable
1269 list_.append(d)
1270
1271 d_list = defer.DeferredList(list_)
1272 d_list.addCallback(lambda dummy: ret)
1273 return d_list
1274
1275 def updateLiberviaConf(values):
1276 try:
1277 profile_id = values[0][0]
1278 except IndexError:
1279 return # no profile called "libervia"
1280
1281 def cb(selected):
1282 try:
1283 password = selected[0][0]
1284 except IndexError:
1285 log.error("Libervia profile exists but no password is set! Update Libervia configuration will be skipped.")
1286 return
1287 fixConfigOption('libervia', 'passphrase', password, False)
1288 d = self.dbpool.runQuery("SELECT value FROM param_ind WHERE category=? AND name=? AND profile_id=?", xmpp_pass_path + (profile_id,))
1289 return d.addCallback(cb)
1290
1291 d = self.dbpool.runQuery("SELECT id FROM profiles WHERE name='libervia'")
1292 d.addCallback(updateLiberviaConf)
1293 d.addCallback(lambda dummy: self.dbpool.runQuery("SELECT profile_id,value FROM param_ind WHERE category=? AND name=?", xmpp_pass_path))
1294 d.addCallback(encrypt_values)
1295 return d