comparison sat/memory/sqla_mapping.py @ 3715:b9718216a1c0 0.9

merge bookmark 0.9
author Goffi <goffi@goffi.org>
date Wed, 01 Dec 2021 16:13:31 +0100
parents 162866ca4be7
children 658ddbabaf36
comparison
equal deleted inserted replaced
3714:af09b5aaa5d7 3715:b9718216a1c0
1 #!/usr/bin/env python3
2
3 # Libervia: an XMPP client
4 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 import pickle
20 import json
21 from datetime import datetime
22 import time
23 import enum
24 from sqlalchemy import (
25 MetaData, Column, Integer, Text, Float, Boolean, DateTime, Enum, JSON, ForeignKey,
26 UniqueConstraint, Index, DDL, event
27 )
28
29 from sqlalchemy.orm import declarative_base, relationship
30 from sqlalchemy.types import TypeDecorator
31 from sqlalchemy.sql.functions import now
32 from twisted.words.protocols.jabber import jid
33 from wokkel import generic
34
35
# Constraint/index naming templates: an explicit naming convention gives
# constraints predictable names on every backend (needed e.g. for migrations).
NAMING_CONVENTION = {
    "ix": 'ix_%(column_0_label)s',
    "uq": "uq_%(table_name)s_%(column_0_name)s",
    "ck": "ck_%(table_name)s_%(constraint_name)s",
    "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
    "pk": "pk_%(table_name)s",
}

#: declarative base class shared by all models of this module
Base = declarative_base(metadata=MetaData(naming_convention=NAMING_CONVENTION))

# keys which are in message data extra but not stored in extra field this is
# because those values are stored in separate fields
NOT_IN_EXTRA = ('stanza_id', 'received_timestamp', 'update_uid')
50
51
class SyncState(enum.Enum):
    """State of the synchronisation of a cached pubsub node"""
    # auto() numbers members 1, 2, 3, 4 — same values as the former explicit ones
    #: synchronisation is currently in progress
    IN_PROGRESS = enum.auto()
    #: synchronisation is done
    COMPLETED = enum.auto()
    #: something wrong happened during synchronisation, won't sync
    ERROR = enum.auto()
    #: synchronisation won't be done even if a syncing analyser matches
    NO_SYNC = enum.auto()
61
62
class LegacyPickle(TypeDecorator):
    """Handle troubles with data pickled by former version of SàT

    This type is temporary until we do migration to a proper data type
    """
    # Blob is used on SQLite but gives errors when used here, while Text works fine
    impl = Text
    cache_ok = True

    def process_bind_param(self, value, dialect):
        # protocol 0 (ASCII) so that the result fits in a Text column
        return None if value is None else pickle.dumps(value, 0)

    def process_result_value(self, value, dialect):
        if value is None:
            return None
        # value types are inconsistent (probably a consequence of Python 2/3 port
        # and/or SQLite dynamic typing): normalise str to bytes before unpickling
        if isinstance(value, str):
            value = value.encode()
        # "utf-8" encoding is needed to handle Python 2 pickled data
        return pickle.loads(value, encoding="utf-8")
88
89
class Json(TypeDecorator):
    """Handle JSON fields in a database-independent way"""
    # Blob is used on SQLite but gives errors when used here, while Text works fine
    impl = Text
    cache_ok = True

    def process_bind_param(self, value, dialect):
        # keep NULL as NULL, serialise everything else to a JSON string
        return None if value is None else json.dumps(value)

    def process_result_value(self, value, dialect):
        # NULL stays None, anything else is deserialised from JSON
        return None if value is None else json.loads(value)
105
106
class JsonDefaultDict(Json):
    """Json type which converts NULL to an empty dict instead of None"""

    def process_result_value(self, value, dialect):
        if value is None:
            return {}
        # non-NULL values are handled exactly like the parent Json type
        return super().process_result_value(value, dialect)
114
115
class Xml(TypeDecorator):
    """Store XML elements serialised as text"""
    impl = Text
    cache_ok = True

    def process_bind_param(self, value, dialect):
        # element => its XML text serialisation
        return None if value is None else value.toXml()

    def process_result_value(self, value, dialect):
        # XML text => parsed element (parseXml expects bytes)
        return None if value is None else generic.parseXml(value.encode())
129
130
class JID(TypeDecorator):
    """Store twisted JID in text fields"""
    impl = Text
    cache_ok = True

    def process_bind_param(self, value, dialect):
        # JID instance => full textual form (including resource if any)
        return value.full() if value is not None else None

    def process_result_value(self, value, dialect):
        # textual form => JID instance
        return jid.JID(value) if value is not None else None
145
146
class Profile(Base):
    """Libervia account profile, the root entity owning individual data"""
    __tablename__ = "profiles"

    id = Column(
        Integer,
        primary_key=True,
        # NOTE(review): nullable=True on a primary key is unusual; presumably
        # kept for compatibility with the legacy schema — confirm before changing
        nullable=True,
    )
    # unique human-readable profile name
    name = Column(Text, unique=True)

    # per-profile data; passive_deletes=True lets the database remove children
    # through the ondelete="CASCADE" clauses on their foreign keys
    params = relationship("ParamInd", back_populates="profile", passive_deletes=True)
    private_data = relationship(
        "PrivateInd", back_populates="profile", passive_deletes=True
    )
    private_bin_data = relationship(
        "PrivateIndBin", back_populates="profile", passive_deletes=True
    )
164
165
class Component(Base):
    """Component attached to a profile"""
    __tablename__ = "components"

    # at most one component per profile: the profile id is also the primary key
    profile_id = Column(
        ForeignKey("profiles.id", ondelete="CASCADE"),
        nullable=True,
        primary_key=True
    )
    # entry point name (presumably used to load the component code — confirm)
    entry_point = Column(Text, nullable=False)
    profile = relationship("Profile")
176
177
class History(Base):
    """One history entry (a message exchange) and its metadata

    Message bodies and subjects live in the Message/Subject tables (one row
    per language), thread metadata in the Thread table.
    """
    __tablename__ = "history"
    __table_args__ = (
        UniqueConstraint("profile_id", "stanza_id", "source", "dest"),
        Index("history__profile_id_timestamp", "profile_id", "timestamp"),
        Index(
            "history__profile_id_received_timestamp", "profile_id", "received_timestamp"
        )
    )

    uid = Column(Text, primary_key=True)
    stanza_id = Column(Text)
    update_uid = Column(Text)
    profile_id = Column(ForeignKey("profiles.id", ondelete="CASCADE"))
    # bare JIDs go in source/dest, resources in source_res/dest_res
    source = Column(Text)
    dest = Column(Text)
    source_res = Column(Text)
    dest_res = Column(Text)
    timestamp = Column(Float, nullable=False)
    received_timestamp = Column(Float)
    type = Column(
        Enum(
            "chat",
            "error",
            "groupchat",
            "headline",
            "normal",
            # info is not XMPP standard, but used to keep track of info like join/leave
            # in a MUC
            "info",
            name="message_type",
            create_constraint=True,
        ),
        nullable=False,
    )
    extra = Column(LegacyPickle)

    profile = relationship("Profile")
    messages = relationship("Message", backref="history", passive_deletes=True)
    subjects = relationship("Subject", backref="history", passive_deletes=True)
    thread = relationship(
        "Thread", uselist=False, back_populates="history", passive_deletes=True
    )

    def __init__(self, *args, **kwargs):
        """Accept source_jid/dest_jid JID kwargs as alternative to the text columns"""
        source_jid = kwargs.pop("source_jid", None)
        if source_jid is not None:
            kwargs["source"] = source_jid.userhost()
            kwargs["source_res"] = source_jid.resource
        dest_jid = kwargs.pop("dest_jid", None)
        if dest_jid is not None:
            kwargs["dest"] = dest_jid.userhost()
            kwargs["dest_res"] = dest_jid.resource
        super().__init__(*args, **kwargs)

    @property
    def source_jid(self) -> jid.JID:
        return jid.JID(f"{self.source}/{self.source_res or ''}")

    @source_jid.setter
    def source_jid(self, source_jid: jid.JID) -> None:
        # FIX: userhost is a method, it must be called; the previous code stored
        # the bound method object itself in the "source" column
        self.source = source_jid.userhost()
        self.source_res = source_jid.resource

    @property
    def dest_jid(self) -> jid.JID:
        return jid.JID(f"{self.dest}/{self.dest_res or ''}")

    @dest_jid.setter
    def dest_jid(self, dest_jid: jid.JID) -> None:
        # FIX: call userhost() (same bug as in the source_jid setter)
        self.dest = dest_jid.userhost()
        self.dest_res = dest_jid.resource

    def __repr__(self):
        dt = datetime.fromtimestamp(self.timestamp)
        return f"History<{self.source_jid.full()}->{self.dest_jid.full()} [{dt}]>"

    def serialise(self):
        """Return a serialisable dict representation of this history entry

        Values stored in dedicated columns (cf. NOT_IN_EXTRA) are merged back
        into the returned "extra" dict.
        """
        # FIX: work on a copy — the previous code mutated the mapped "extra"
        # attribute, so serialisation-only keys could be persisted on next
        # flush, and it crashed when the "extra" column was NULL
        extra = {} if self.extra is None else dict(self.extra)
        if self.stanza_id is not None:
            extra["stanza_id"] = self.stanza_id
        if self.update_uid is not None:
            extra["update_uid"] = self.update_uid
        if self.received_timestamp is not None:
            extra["received_timestamp"] = self.received_timestamp
        if self.thread is not None:
            extra["thread"] = self.thread.thread_id
            if self.thread.parent_id is not None:
                extra["thread_parent"] = self.thread.parent_id

        return {
            "from": f"{self.source}/{self.source_res}" if self.source_res
                else self.source,
            "to": f"{self.dest}/{self.dest_res}" if self.dest_res else self.dest,
            "uid": self.uid,
            "message": {m.language or '': m.message for m in self.messages},
            "subject": {m.language or '': m.subject for m in self.subjects},
            "type": self.type,
            "extra": extra,
            "timestamp": self.timestamp,
        }

    def as_tuple(self):
        """Return the serialised entry as a tuple (legacy calling convention)"""
        d = self.serialise()
        return (
            d['uid'], d['timestamp'], d['from'], d['to'], d['message'], d['subject'],
            d['type'], d['extra']
        )

    @staticmethod
    def debug_collection(history_collection):
        """Print a one-line summary of each entry of the collection (debug helper)"""
        for idx, history in enumerate(history_collection):
            history.debug_msg(idx)

    def debug_msg(self, idx=None):
        """Print messages (debug helper)

        @param idx: optional index printed in front of the timestamp
        """
        dt = datetime.fromtimestamp(self.timestamp)
        if idx is not None:
            dt = f"({idx}) {dt}"
        parts = []
        parts.append(f"[{dt}]<{self.source_jid.full()}->{self.dest_jid.full()}> ")
        for message in self.messages:
            if message.language:
                parts.append(f"[{message.language}] ")
            parts.append(f"{message.message}\n")
        print("".join(parts))
305
306
class Message(Base):
    """Body of a history entry, one row per language version"""
    __tablename__ = "message"
    __table_args__ = (
        Index("message__history_uid", "history_uid"),
    )

    id = Column(
        Integer,
        primary_key=True,
    )
    history_uid = Column(ForeignKey("history.uid", ondelete="CASCADE"))
    message = Column(Text)
    language = Column(Text)

    def __repr__(self):
        # optional language tag, then the body truncated to 20 characters
        if self.language:
            lang_str = f"[{self.language}]"
        else:
            lang_str = ""
        if len(self.message) > 20:
            msg = f"{self.message[:20]}…"
        else:
            msg = self.message
        return f"Message<{lang_str}{msg}>"
326
327
class Subject(Base):
    """Subject of a history entry, one row per language version"""
    __tablename__ = "subject"
    __table_args__ = (
        Index("subject__history_uid", "history_uid"),
    )

    id = Column(
        Integer,
        primary_key=True,
    )
    history_uid = Column(ForeignKey("history.uid", ondelete="CASCADE"))
    subject = Column(Text)
    language = Column(Text)

    def __repr__(self):
        # optional language tag, then the subject truncated to 20 characters
        if self.language:
            lang_str = f"[{self.language}]"
        else:
            lang_str = ""
        if len(self.subject) > 20:
            msg = f"{self.subject[:20]}…"
        else:
            msg = self.subject
        return f"Subject<{lang_str}{msg}>"
347
348
class Thread(Base):
    """Thread metadata attached to a history entry"""
    __tablename__ = "thread"
    __table_args__ = (
        Index("thread__history_uid", "history_uid"),
    )

    id = Column(
        Integer,
        primary_key=True,
    )
    history_uid = Column(ForeignKey("history.uid", ondelete="CASCADE"))
    thread_id = Column(Text)
    parent_id = Column(Text)

    history = relationship("History", uselist=False, back_populates="thread")

    def __repr__(self):
        return "Thread<{} [parent: {}]>".format(self.thread_id, self.parent_id)
367
368
class ParamGen(Base):
    """General parameter (not attached to a specific profile)"""
    __tablename__ = "param_gen"

    # parameters are addressed by (category, name)
    category = Column(Text, primary_key=True)
    name = Column(Text, primary_key=True)
    value = Column(Text)
375
376
class ParamInd(Base):
    """Individual (per-profile) parameter"""
    __tablename__ = "param_ind"

    # same (category, name) addressing as ParamGen, scoped by profile
    category = Column(Text, primary_key=True)
    name = Column(Text, primary_key=True)
    profile_id = Column(
        ForeignKey("profiles.id", ondelete="CASCADE"), primary_key=True
    )
    value = Column(Text)

    profile = relationship("Profile", back_populates="params")
388
389
class PrivateGen(Base):
    """General private data with text values, addressed by (namespace, key)"""
    __tablename__ = "private_gen"

    namespace = Column(Text, primary_key=True)
    key = Column(Text, primary_key=True)
    value = Column(Text)
396
397
class PrivateInd(Base):
    """Individual (per-profile) private data with text values"""
    __tablename__ = "private_ind"

    namespace = Column(Text, primary_key=True)
    key = Column(Text, primary_key=True)
    profile_id = Column(
        ForeignKey("profiles.id", ondelete="CASCADE"), primary_key=True
    )
    value = Column(Text)

    profile = relationship("Profile", back_populates="private_data")
409
410
class PrivateGenBin(Base):
    """General private data with binary (pickled) values"""
    __tablename__ = "private_gen_bin"

    namespace = Column(Text, primary_key=True)
    key = Column(Text, primary_key=True)
    # arbitrary Python object, stored with the legacy pickle format
    value = Column(LegacyPickle)
417
418
class PrivateIndBin(Base):
    """Individual (per-profile) private data with binary (pickled) values"""
    __tablename__ = "private_ind_bin"

    namespace = Column(Text, primary_key=True)
    key = Column(Text, primary_key=True)
    profile_id = Column(
        ForeignKey("profiles.id", ondelete="CASCADE"), primary_key=True
    )
    # arbitrary Python object, stored with the legacy pickle format
    value = Column(LegacyPickle)

    profile = relationship("Profile", back_populates="private_bin_data")
430
431
class File(Base):
    """Metadata of a file or directory"""
    __tablename__ = "files"
    __table_args__ = (
        Index("files__profile_id_owner_parent", "profile_id", "owner", "parent"),
        Index(
            "files__profile_id_owner_media_type_media_subtype",
            "profile_id",
            "owner",
            "media_type",
            "media_subtype"
        )
    )

    id = Column(Text, primary_key=True)
    # presumably an id safe to expose publicly (unique when set) — confirm
    public_id = Column(Text, unique=True)
    version = Column(Text, primary_key=True)
    # id of the parent directory
    parent = Column(Text, nullable=False)
    type = Column(
        Enum(
            "file", "directory",
            name="file_type",
            create_constraint=True
        ),
        nullable=False,
        server_default="file",
    )
    # content hash, with the algorithm used to compute it
    file_hash = Column(Text)
    hash_algo = Column(Text)
    name = Column(Text, nullable=False)
    size = Column(Integer)
    namespace = Column(Text)
    # media type split in main type and subtype
    media_type = Column(Text)
    media_subtype = Column(Text)
    # creation/modification times (presumably Unix timestamps — confirm)
    created = Column(Float, nullable=False)
    modified = Column(Float)
    # JID of the owning entity (stored as text by the JID type)
    owner = Column(JID)
    # access rules and extra metadata; NULL is mapped to {} by JsonDefaultDict
    access = Column(JsonDefaultDict)
    extra = Column(JsonDefaultDict)
    profile_id = Column(ForeignKey("profiles.id", ondelete="CASCADE"))

    profile = relationship("Profile")
473
474
class PubsubNode(Base):
    """Cached pubsub node"""
    __tablename__ = "pubsub_nodes"
    __table_args__ = (
        UniqueConstraint("profile_id", "service", "name"),
    )

    id = Column(Integer, primary_key=True)
    profile_id = Column(
        ForeignKey("profiles.id", ondelete="CASCADE")
    )
    # JID of the pubsub service hosting the node
    service = Column(JID)
    name = Column(Text, nullable=False)
    # True when the profile is subscribed to this node
    subscribed = Column(
        Boolean(create_constraint=True, name="subscribed_bool"), nullable=False
    )
    # name of the analyser matching this node, if any
    analyser = Column(Text)
    sync_state = Column(
        Enum(
            SyncState,
            name="sync_state",
            create_constraint=True,
        ),
        nullable=True
    )
    # FIX: pass the callable itself, not its result — "default=time.time()" was
    # evaluated once at import, so every new row received the module-load
    # timestamp instead of the insertion time
    sync_state_updated = Column(
        Float,
        nullable=False,
        default=time.time
    )
    # "type" is a Python builtin, hence the trailing underscore on the attribute
    type_ = Column(
        Text, name="type", nullable=True
    )
    subtype = Column(
        Text, nullable=True
    )
    extra = Column(JSON)

    items = relationship("PubsubItem", back_populates="node", passive_deletes=True)

    def __str__(self):
        return f"Pubsub node {self.name!r} at {self.service}"
516
517
class PubsubItem(Base):
    """Cached pubsub item"""
    __tablename__ = "pubsub_items"
    __table_args__ = (
        UniqueConstraint("node_id", "name"),
    )
    id = Column(Integer, primary_key=True)
    node_id = Column(ForeignKey("pubsub_nodes.id", ondelete="CASCADE"), nullable=False)
    # pubsub item id (unique within a node, cf. __table_args__)
    name = Column(Text, nullable=False)
    # item payload, stored as serialised XML by the Xml type
    data = Column(Xml, nullable=False)
    # timestamps set through the SQL now() function on insert/update
    created = Column(DateTime, nullable=False, server_default=now())
    updated = Column(DateTime, nullable=False, server_default=now(), onupdate=now())
    # optional parsed (JSON) version of the payload
    parsed = Column(JSON)

    node = relationship("PubsubNode", back_populates="items")
532
533
534 ## Full-Text Search
535
536 # create
537
@event.listens_for(PubsubItem.__table__, "after_create")
def fts_create(target, connection, **kw):
    """Full-Text Search table creation"""
    if connection.engine.name == "sqlite":
        # Using SQLite FTS5
        # pubsub_items_fts is an external-content FTS5 table mirroring the
        # "data" column of pubsub_items; the three triggers keep the index in
        # sync on INSERT/DELETE/UPDATE (the 'delete' command removes the old
        # row from the index before the new one is inserted)
        queries = [
            "CREATE VIRTUAL TABLE pubsub_items_fts "
            "USING fts5(data, content=pubsub_items, content_rowid=id)",
            "CREATE TRIGGER pubsub_items_fts_sync_ins AFTER INSERT ON pubsub_items BEGIN"
            "  INSERT INTO pubsub_items_fts(rowid, data) VALUES (new.id, new.data);"
            "END",
            "CREATE TRIGGER pubsub_items_fts_sync_del AFTER DELETE ON pubsub_items BEGIN"
            "  INSERT INTO pubsub_items_fts(pubsub_items_fts, rowid, data) "
            "VALUES('delete', old.id, old.data);"
            "END",
            "CREATE TRIGGER pubsub_items_fts_sync_upd AFTER UPDATE ON pubsub_items BEGIN"
            "  INSERT INTO pubsub_items_fts(pubsub_items_fts, rowid, data) VALUES"
            "('delete', old.id, old.data);"
            "  INSERT INTO pubsub_items_fts(rowid, data) VALUES(new.id, new.data);"
            "END"
        ]
        for q in queries:
            connection.execute(DDL(q))
561
562 # drop
563
@event.listens_for(PubsubItem.__table__, "before_drop")
def fts_drop(target, connection, **kw):
    """Full-Text Search table drop"""
    if connection.engine.name == "sqlite":
        # Using SQLite FTS5
        # drop the synchronisation triggers first, then the virtual table itself
        queries = [
            "DROP TRIGGER IF EXISTS pubsub_items_fts_sync_ins",
            "DROP TRIGGER IF EXISTS pubsub_items_fts_sync_del",
            "DROP TRIGGER IF EXISTS pubsub_items_fts_sync_upd",
            "DROP TABLE IF EXISTS pubsub_items_fts",
        ]
        for q in queries:
            connection.execute(DDL(q))