Mercurial > libervia-backend
comparison sat/memory/sqla_mapping.py @ 3715:b9718216a1c0 0.9
merge bookmark 0.9
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 01 Dec 2021 16:13:31 +0100 |
parents | 162866ca4be7 |
children | 658ddbabaf36 |
comparison
equal
deleted
inserted
replaced
3714:af09b5aaa5d7 | 3715:b9718216a1c0 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # Libervia: an XMPP client | |
4 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) | |
5 | |
6 # This program is free software: you can redistribute it and/or modify | |
7 # it under the terms of the GNU Affero General Public License as published by | |
8 # the Free Software Foundation, either version 3 of the License, or | |
9 # (at your option) any later version. | |
10 | |
11 # This program is distributed in the hope that it will be useful, | |
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 # GNU Affero General Public License for more details. | |
15 | |
16 # You should have received a copy of the GNU Affero General Public License | |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
18 | |
19 import pickle | |
20 import json | |
21 from datetime import datetime | |
22 import time | |
23 import enum | |
24 from sqlalchemy import ( | |
25 MetaData, Column, Integer, Text, Float, Boolean, DateTime, Enum, JSON, ForeignKey, | |
26 UniqueConstraint, Index, DDL, event | |
27 ) | |
28 | |
29 from sqlalchemy.orm import declarative_base, relationship | |
30 from sqlalchemy.types import TypeDecorator | |
31 from sqlalchemy.sql.functions import now | |
32 from twisted.words.protocols.jabber import jid | |
33 from wokkel import generic | |
34 | |
35 | |
# Declarative base shared by every mapped class in this module. The explicit
# naming_convention makes constraint/index names deterministic, which is needed
# for reliable schema migrations (notably on SQLite, which can't easily drop
# anonymous constraints).
Base = declarative_base(
    metadata=MetaData(
        naming_convention={
            "ix": 'ix_%(column_0_label)s',
            "uq": "uq_%(table_name)s_%(column_0_name)s",
            "ck": "ck_%(table_name)s_%(constraint_name)s",
            "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
            "pk": "pk_%(table_name)s"
        }
    )
)
# keys which appear in message data "extra" but are not stored in the "extra"
# column, because those values are stored in dedicated columns instead
NOT_IN_EXTRA = ('stanza_id', 'received_timestamp', 'update_uid')
50 | |
51 | |
class SyncState(enum.Enum):
    """State of the synchronisation of a cached pubsub node (cf. PubsubNode)"""
    #: synchronisation is currently in progress
    IN_PROGRESS = 1
    #: synchronisation is done
    COMPLETED = 2
    #: something wrong happened during synchronisation, won't sync
    ERROR = 3
    #: synchronisation won't be done even if a syncing analyser matches
    NO_SYNC = 4
61 | |
62 | |
class LegacyPickle(TypeDecorator):
    """Compatibility type for data pickled by former versions of SàT

    Kept as a stop-gap until the data is migrated to a proper column type.
    """
    # Blob is used on SQLite but gives errors when used here, while Text works fine
    impl = Text
    cache_ok = True

    def process_bind_param(self, value, dialect):
        # protocol 0 keeps the serialised form text-friendly for the Text column
        return None if value is None else pickle.dumps(value, 0)

    def process_result_value(self, value, dialect):
        if value is None:
            return None
        # value types are inconsistent (probably a consequence of Python 2/3 port
        # and/or SQLite dynamic typing): normalise str to bytes before unpickling
        if isinstance(value, str):
            value = value.encode()
        # "utf-8" encoding is needed to handle Python 2 pickled data
        return pickle.loads(value, encoding="utf-8")
88 | |
89 | |
class Json(TypeDecorator):
    """Handle JSON field in DB independant way"""
    # Blob is used on SQLite but gives errors when used here, while Text works fine
    impl = Text
    cache_ok = True

    def process_bind_param(self, value, dialect):
        # NULL stays NULL, anything else is serialised to a JSON string
        return None if value is None else json.dumps(value)

    def process_result_value(self, value, dialect):
        return None if value is None else json.loads(value)
105 | |
106 | |
class JsonDefaultDict(Json):
    """Json type which convert NULL to empty dict instead of None"""

    def process_result_value(self, value, dialect):
        # callers always get a mapping, even for a NULL column
        return {} if value is None else json.loads(value)
114 | |
115 | |
class Xml(TypeDecorator):
    """Store an XML element as its serialised textual form"""
    impl = Text
    cache_ok = True

    def process_bind_param(self, value, dialect):
        # the element is expected to provide toXml() (domish.Element API)
        return None if value is None else value.toXml()

    def process_result_value(self, value, dialect):
        return None if value is None else generic.parseXml(value.encode())
129 | |
130 | |
class JID(TypeDecorator):
    """Map twisted JID instances to/from their full textual representation"""
    impl = Text
    cache_ok = True

    def process_bind_param(self, value, dialect):
        # stored as the full JID string (user@host/resource)
        return None if value is None else value.full()

    def process_result_value(self, value, dialect):
        return None if value is None else jid.JID(value)
145 | |
146 | |
class Profile(Base):
    """Profile of a user or component; root of most per-profile data"""
    __tablename__ = "profiles"

    id = Column(
        Integer,
        primary_key=True,
        # NOTE(review): nullable=True on a primary key is unusual (SQLAlchemy
        # normally forces PK columns NOT NULL); presumably kept to match the
        # legacy SQLite schema — TODO confirm before changing
        nullable=True,
    )
    name = Column(Text, unique=True)

    # per-profile collections; passive_deletes=True lets the DB-level
    # "ON DELETE CASCADE" handle the cleanup instead of the ORM
    params = relationship("ParamInd", back_populates="profile", passive_deletes=True)
    private_data = relationship(
        "PrivateInd", back_populates="profile", passive_deletes=True
    )
    private_bin_data = relationship(
        "PrivateIndBin", back_populates="profile", passive_deletes=True
    )
164 | |
165 | |
class Component(Base):
    """Component attached to a profile (one-to-one) with its entry point"""
    __tablename__ = "components"

    # the profile itself identifies the component (profile_id is the PK)
    profile_id = Column(
        ForeignKey("profiles.id", ondelete="CASCADE"),
        nullable=True,
        primary_key=True
    )
    # name of the entry point used to start this component
    entry_point = Column(Text, nullable=False)
    profile = relationship("Profile")
176 | |
177 | |
class History(Base):
    """One message (or info event) of the message archive.

    Per-language bodies/subjects live in the related Message/Subject rows;
    thread metadata lives in the related Thread row.
    """
    __tablename__ = "history"
    __table_args__ = (
        UniqueConstraint("profile_id", "stanza_id", "source", "dest"),
        Index("history__profile_id_timestamp", "profile_id", "timestamp"),
        Index(
            "history__profile_id_received_timestamp", "profile_id", "received_timestamp"
        )
    )

    uid = Column(Text, primary_key=True)
    stanza_id = Column(Text)
    update_uid = Column(Text)
    profile_id = Column(ForeignKey("profiles.id", ondelete="CASCADE"))
    # JIDs are split into bare JID ("source"/"dest") and resource columns
    source = Column(Text)
    dest = Column(Text)
    source_res = Column(Text)
    dest_res = Column(Text)
    timestamp = Column(Float, nullable=False)
    received_timestamp = Column(Float)
    type = Column(
        Enum(
            "chat",
            "error",
            "groupchat",
            "headline",
            "normal",
            # info is not XMPP standard, but used to keep track of info like join/leave
            # in a MUC
            "info",
            name="message_type",
            create_constraint=True,
        ),
        nullable=False,
    )
    extra = Column(LegacyPickle)

    profile = relationship("Profile")
    messages = relationship("Message", backref="history", passive_deletes=True)
    subjects = relationship("Subject", backref="history", passive_deletes=True)
    thread = relationship(
        "Thread", uselist=False, back_populates="history", passive_deletes=True
    )

    def __init__(self, *args, **kwargs):
        # "source_jid"/"dest_jid" are convenience kwargs, converted here to the
        # bare JID + resource columns actually stored in DB
        source_jid = kwargs.pop("source_jid", None)
        if source_jid is not None:
            kwargs["source"] = source_jid.userhost()
            kwargs["source_res"] = source_jid.resource
        dest_jid = kwargs.pop("dest_jid", None)
        if dest_jid is not None:
            kwargs["dest"] = dest_jid.userhost()
            kwargs["dest_res"] = dest_jid.resource
        super().__init__(*args, **kwargs)

    @property
    def source_jid(self) -> jid.JID:
        return jid.JID(f"{self.source}/{self.source_res or ''}")

    @source_jid.setter
    def source_jid(self, source_jid: jid.JID) -> None:
        # FIX: userhost is a method and was previously assigned without calling
        # it (self.source = source_jid.userhost), storing a bound method object
        # instead of the bare JID string
        self.source = source_jid.userhost()
        self.source_res = source_jid.resource

    @property
    def dest_jid(self) -> jid.JID:
        return jid.JID(f"{self.dest}/{self.dest_res or ''}")

    @dest_jid.setter
    def dest_jid(self, dest_jid: jid.JID) -> None:
        # FIX: same missing call as in source_jid's setter
        self.dest = dest_jid.userhost()
        self.dest_res = dest_jid.resource

    def __repr__(self):
        dt = datetime.fromtimestamp(self.timestamp)
        return f"History<{self.source_jid.full()}->{self.dest_jid.full()} [{dt}]>"

    def serialise(self) -> dict:
        """Return this entry as a message data dict (frontend format).

        Keys stored in dedicated columns (cf. NOT_IN_EXTRA) and thread
        metadata are merged back into the returned "extra" dict.
        """
        # FIX: work on a copy — the previous code mutated self.extra in place,
        # injecting stanza_id/update_uid/… into the mapped attribute (and
        # crashed with TypeError when the column was NULL)
        extra = dict(self.extra) if self.extra else {}
        if self.stanza_id is not None:
            extra["stanza_id"] = self.stanza_id
        if self.update_uid is not None:
            extra["update_uid"] = self.update_uid
        if self.received_timestamp is not None:
            extra["received_timestamp"] = self.received_timestamp
        if self.thread is not None:
            extra["thread"] = self.thread.thread_id
            if self.thread.parent_id is not None:
                extra["thread_parent"] = self.thread.parent_id

        return {
            "from": f"{self.source}/{self.source_res}" if self.source_res
            else self.source,
            "to": f"{self.dest}/{self.dest_res}" if self.dest_res else self.dest,
            "uid": self.uid,
            "message": {m.language or '': m.message for m in self.messages},
            "subject": {m.language or '': m.subject for m in self.subjects},
            "type": self.type,
            "extra": extra,
            "timestamp": self.timestamp,
        }

    def as_tuple(self):
        """Return the serialised entry as the legacy bridge tuple"""
        d = self.serialise()
        return (
            d['uid'], d['timestamp'], d['from'], d['to'], d['message'], d['subject'],
            d['type'], d['extra']
        )

    @staticmethod
    def debug_collection(history_collection):
        """Print a whole collection of history entries (debugging helper)"""
        for idx, history in enumerate(history_collection):
            history.debug_msg(idx)

    def debug_msg(self, idx=None):
        """Print messages of this entry (debugging helper)

        @param idx: optional index to prefix the output with
        """
        dt = datetime.fromtimestamp(self.timestamp)
        if idx is not None:
            dt = f"({idx}) {dt}"
        parts = []
        parts.append(f"[{dt}]<{self.source_jid.full()}->{self.dest_jid.full()}> ")
        for message in self.messages:
            if message.language:
                parts.append(f"[{message.language}] ")
            parts.append(f"{message.message}\n")
        print("".join(parts))
305 | |
306 | |
class Message(Base):
    """One body of a history entry, in a given language"""
    __tablename__ = "message"
    __table_args__ = (
        Index("message__history_uid", "history_uid"),
    )

    id = Column(
        Integer,
        primary_key=True,
    )
    history_uid = Column(ForeignKey("history.uid", ondelete="CASCADE"))
    message = Column(Text)
    language = Column(Text)

    def __repr__(self):
        # long bodies are truncated to 20 chars with an ellipsis
        prefix = f"[{self.language}]" if self.language else ""
        if len(self.message) > 20:
            body = f"{self.message[:20]}…"
        else:
            body = self.message
        return f"Message<{prefix}{body}>"
326 | |
327 | |
class Subject(Base):
    """One subject of a history entry, in a given language"""
    __tablename__ = "subject"
    __table_args__ = (
        Index("subject__history_uid", "history_uid"),
    )

    id = Column(
        Integer,
        primary_key=True,
    )
    history_uid = Column(ForeignKey("history.uid", ondelete="CASCADE"))
    subject = Column(Text)
    language = Column(Text)

    def __repr__(self):
        # long subjects are truncated to 20 chars with an ellipsis
        prefix = f"[{self.language}]" if self.language else ""
        if len(self.subject) > 20:
            body = f"{self.subject[:20]}…"
        else:
            body = self.subject
        return f"Subject<{prefix}{body}>"
347 | |
348 | |
class Thread(Base):
    """Thread metadata attached (one-to-one) to a history entry"""
    __tablename__ = "thread"
    __table_args__ = (
        Index("thread__history_uid", "history_uid"),
    )

    id = Column(
        Integer,
        primary_key=True,
    )
    history_uid = Column(ForeignKey("history.uid", ondelete="CASCADE"))
    # thread identifier, and identifier of the parent thread if any
    thread_id = Column(Text)
    parent_id = Column(Text)

    history = relationship("History", uselist=False, back_populates="thread")

    def __repr__(self):
        return f"Thread<{self.thread_id} [parent: {self.parent_id}]>"
367 | |
368 | |
class ParamGen(Base):
    """General (profile-independent) parameter, keyed by (category, name)"""
    __tablename__ = "param_gen"

    category = Column(Text, primary_key=True)
    name = Column(Text, primary_key=True)
    value = Column(Text)
375 | |
376 | |
class ParamInd(Base):
    """Individual (per-profile) parameter, keyed by (category, name, profile)"""
    __tablename__ = "param_ind"

    category = Column(Text, primary_key=True)
    name = Column(Text, primary_key=True)
    profile_id = Column(
        ForeignKey("profiles.id", ondelete="CASCADE"), primary_key=True
    )
    value = Column(Text)

    profile = relationship("Profile", back_populates="params")
388 | |
389 | |
class PrivateGen(Base):
    """General (profile-independent) private value, keyed by (namespace, key)"""
    __tablename__ = "private_gen"

    namespace = Column(Text, primary_key=True)
    key = Column(Text, primary_key=True)
    value = Column(Text)
396 | |
397 | |
class PrivateInd(Base):
    """Individual (per-profile) private value, keyed by (namespace, key, profile)"""
    __tablename__ = "private_ind"

    namespace = Column(Text, primary_key=True)
    key = Column(Text, primary_key=True)
    profile_id = Column(
        ForeignKey("profiles.id", ondelete="CASCADE"), primary_key=True
    )
    value = Column(Text)

    profile = relationship("Profile", back_populates="private_data")
409 | |
410 | |
class PrivateGenBin(Base):
    """General private value with pickled (binary) payload"""
    __tablename__ = "private_gen_bin"

    namespace = Column(Text, primary_key=True)
    key = Column(Text, primary_key=True)
    # legacy pickled payload, cf. LegacyPickle
    value = Column(LegacyPickle)
417 | |
418 | |
class PrivateIndBin(Base):
    """Individual (per-profile) private value with pickled (binary) payload"""
    __tablename__ = "private_ind_bin"

    namespace = Column(Text, primary_key=True)
    key = Column(Text, primary_key=True)
    profile_id = Column(
        ForeignKey("profiles.id", ondelete="CASCADE"), primary_key=True
    )
    # legacy pickled payload, cf. LegacyPickle
    value = Column(LegacyPickle)

    profile = relationship("Profile", back_populates="private_bin_data")
430 | |
431 | |
class File(Base):
    """File or directory metadata (virtual filesystem)"""
    __tablename__ = "files"
    __table_args__ = (
        Index("files__profile_id_owner_parent", "profile_id", "owner", "parent"),
        Index(
            "files__profile_id_owner_media_type_media_subtype",
            "profile_id",
            "owner",
            "media_type",
            "media_subtype"
        )
    )

    # composite primary key: a file id may exist in several versions
    id = Column(Text, primary_key=True)
    public_id = Column(Text, unique=True)
    version = Column(Text, primary_key=True)
    # id of the parent directory entry ("" presumably meaning root — TODO confirm)
    parent = Column(Text, nullable=False)
    type = Column(
        Enum(
            "file", "directory",
            name="file_type",
            create_constraint=True
        ),
        nullable=False,
        server_default="file",
    )
    file_hash = Column(Text)
    hash_algo = Column(Text)
    name = Column(Text, nullable=False)
    size = Column(Integer)
    namespace = Column(Text)
    # MIME type split in two columns so each part can be indexed/queried
    media_type = Column(Text)
    media_subtype = Column(Text)
    created = Column(Float, nullable=False)
    modified = Column(Float)
    owner = Column(JID)
    # access rules and extra metadata, stored as JSON (NULL read back as {})
    access = Column(JsonDefaultDict)
    extra = Column(JsonDefaultDict)
    profile_id = Column(ForeignKey("profiles.id", ondelete="CASCADE"))

    profile = relationship("Profile")
473 | |
474 | |
class PubsubNode(Base):
    """Cached pubsub node, with its local synchronisation state"""
    __tablename__ = "pubsub_nodes"
    __table_args__ = (
        UniqueConstraint("profile_id", "service", "name"),
    )

    id = Column(Integer, primary_key=True)
    profile_id = Column(
        ForeignKey("profiles.id", ondelete="CASCADE")
    )
    service = Column(JID)
    name = Column(Text, nullable=False)
    subscribed = Column(
        Boolean(create_constraint=True, name="subscribed_bool"), nullable=False
    )
    # name of the analyser which matched this node, if any
    analyser = Column(Text)
    sync_state = Column(
        Enum(
            SyncState,
            name="sync_state",
            create_constraint=True,
        ),
        nullable=True
    )
    sync_state_updated = Column(
        Float,
        nullable=False,
        # FIX: pass the callable itself, not its result: ``time.time()`` was
        # evaluated once at import time, so every inserted row shared the same
        # stale timestamp; SQLAlchemy calls a callable default per INSERT.
        default=time.time
    )
    type_ = Column(
        Text, name="type", nullable=True
    )
    subtype = Column(
        Text, nullable=True
    )
    extra = Column(JSON)

    items = relationship("PubsubItem", back_populates="node", passive_deletes=True)

    def __str__(self):
        return f"Pubsub node {self.name!r} at {self.service}"
516 | |
517 | |
class PubsubItem(Base):
    """Cached pubsub item: raw XML payload plus optional parsed JSON form"""
    __tablename__ = "pubsub_items"
    __table_args__ = (
        UniqueConstraint("node_id", "name"),
    )
    id = Column(Integer, primary_key=True)
    node_id = Column(ForeignKey("pubsub_nodes.id", ondelete="CASCADE"), nullable=False)
    name = Column(Text, nullable=False)
    # raw item payload, (de)serialised by the Xml type decorator
    data = Column(Xml, nullable=False)
    # timestamps are set DB-side; "updated" is also refreshed on each UPDATE
    created = Column(DateTime, nullable=False, server_default=now())
    updated = Column(DateTime, nullable=False, server_default=now(), onupdate=now())
    parsed = Column(JSON)

    node = relationship("PubsubNode", back_populates="items")
532 | |
533 | |
534 ## Full-Text Search | |
535 | |
536 # create | |
537 | |
@event.listens_for(PubsubItem.__table__, "after_create")
def fts_create(target, connection, **kw):
    """Full-Text Search table creation"""
    if connection.engine.name == "sqlite":
        # Using SQLite FTS5
        # An external-content FTS5 table indexes pubsub_items.data; the three
        # triggers keep the index in sync on INSERT/DELETE/UPDATE (inserting a
        # 'delete' command row is FTS5's way of removing a stale entry).
        queries = [
            "CREATE VIRTUAL TABLE pubsub_items_fts "
            "USING fts5(data, content=pubsub_items, content_rowid=id)",
            "CREATE TRIGGER pubsub_items_fts_sync_ins AFTER INSERT ON pubsub_items BEGIN"
            "  INSERT INTO pubsub_items_fts(rowid, data) VALUES (new.id, new.data);"
            "END",
            "CREATE TRIGGER pubsub_items_fts_sync_del AFTER DELETE ON pubsub_items BEGIN"
            "  INSERT INTO pubsub_items_fts(pubsub_items_fts, rowid, data) "
            "VALUES('delete', old.id, old.data);"
            "END",
            "CREATE TRIGGER pubsub_items_fts_sync_upd AFTER UPDATE ON pubsub_items BEGIN"
            "  INSERT INTO pubsub_items_fts(pubsub_items_fts, rowid, data) VALUES"
            "('delete', old.id, old.data);"
            "  INSERT INTO pubsub_items_fts(rowid, data) VALUES(new.id, new.data);"
            "END"
        ]
        for q in queries:
            connection.execute(DDL(q))
561 | |
562 # drop | |
563 | |
@event.listens_for(PubsubItem.__table__, "before_drop")
def fts_drop(target, connection, **kw):
    """Full-Text Search table drop

    Removes the FTS5 virtual table and its synchronisation triggers created by
    fts_create (SQLite only).
    """
    if connection.engine.name == "sqlite":
        # Using SQLite FTS5
        queries = [
            "DROP TRIGGER IF EXISTS pubsub_items_fts_sync_ins",
            "DROP TRIGGER IF EXISTS pubsub_items_fts_sync_del",
            "DROP TRIGGER IF EXISTS pubsub_items_fts_sync_upd",
            "DROP TABLE IF EXISTS pubsub_items_fts",
        ]
        for q in queries:
            connection.execute(DDL(q))