Mercurial > libervia-backend
comparison libervia/backend/memory/sqla_mapping.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/memory/sqla_mapping.py@fe4725bf42fb |
children | 684ba556a617 |
comparison
equal
deleted
inserted
replaced
4070:d10748475025 | 4071:4b842c1fb686 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # Libervia: an XMPP client | |
4 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) | |
5 | |
6 # This program is free software: you can redistribute it and/or modify | |
7 # it under the terms of the GNU Affero General Public License as published by | |
8 # the Free Software Foundation, either version 3 of the License, or | |
9 # (at your option) any later version. | |
10 | |
11 # This program is distributed in the hope that it will be useful, | |
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 # GNU Affero General Public License for more details. | |
15 | |
16 # You should have received a copy of the GNU Affero General Public License | |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
18 | |
19 from typing import Dict, Any | |
20 from datetime import datetime | |
21 import enum | |
22 import json | |
23 import pickle | |
24 import time | |
25 | |
26 from sqlalchemy import ( | |
27 Boolean, | |
28 Column, | |
29 DDL, | |
30 DateTime, | |
31 Enum, | |
32 Float, | |
33 ForeignKey, | |
34 Index, | |
35 Integer, | |
36 JSON, | |
37 MetaData, | |
38 Text, | |
39 UniqueConstraint, | |
40 event, | |
41 ) | |
42 from sqlalchemy.orm import declarative_base, relationship | |
43 from sqlalchemy.sql.functions import now | |
44 from sqlalchemy.types import TypeDecorator | |
45 from twisted.words.protocols.jabber import jid | |
46 from wokkel import generic | |
47 | |
48 | |
# Declarative base shared by every mapped class in this module. The explicit
# naming convention makes constraint and index names deterministic across
# database backends, which migration tools (Alembic) rely on.
Base = declarative_base(
    metadata=MetaData(
        naming_convention={
            "ix": 'ix_%(column_0_label)s',
            "uq": "uq_%(table_name)s_%(column_0_name)s",
            "ck": "ck_%(table_name)s_%(constraint_name)s",
            "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
            "pk": "pk_%(table_name)s"
        }
    )
)
# Keys which appear in a message's "extra" data but are NOT stored in the
# History.extra column, because each of them has a dedicated column instead.
NOT_IN_EXTRA = ('origin_id', 'stanza_id', 'received_timestamp', 'update_uid')
63 | |
64 | |
class SyncState(enum.Enum):
    """Synchronisation state of a cached pubsub node."""

    #: synchronisation is currently in progress
    IN_PROGRESS = enum.auto()
    #: synchronisation is done
    COMPLETED = enum.auto()
    #: something wrong happened during synchronisation, won't sync
    ERROR = enum.auto()
    #: synchronisation won't be done even if a syncing analyser matches
    NO_SYNC = enum.auto()
74 | |
75 | |
class SubscriptionState(enum.Enum):
    """State of a subscriber's subscription to a pubsub node."""

    SUBSCRIBED = enum.auto()
    PENDING = enum.auto()
79 | |
80 | |
class LegacyPickle(TypeDecorator):
    """Handle troubles with data pickled by former version of SàT

    This type is temporary until we do migration to a proper data type
    """
    # Blob is used on SQLite but gives errors when used here, while Text works fine
    impl = Text
    cache_ok = True

    def process_bind_param(self, value, dialect):
        """Pickle *value* before it is written to the database (NULL passes through)."""
        if value is None:
            return None
        # protocol 0 produces an ASCII-compatible payload, suitable for a Text column
        return pickle.dumps(value, 0)

    def process_result_value(self, value, dialect):
        """Unpickle a stored payload, tolerating the legacy str/bytes mix (NULL passes through)."""
        if value is None:
            return None
        # value types are inconsistent (probably a consequence of Python 2/3 port
        # and/or SQLite dynamic typing): str rows are converted to bytes first,
        # bytes rows raise AttributeError on .encode() and are used as-is
        try:
            value = value.encode()
        except AttributeError:
            pass
        # "utf-8" encoding is needed to handle Python 2 pickled data
        # NOTE(review): pickle.loads is only safe while the database content is
        # trusted — never feed it externally controlled rows.
        return pickle.loads(value, encoding="utf-8")
106 | |
107 | |
class Json(TypeDecorator):
    """Serialise Python values to/from JSON text, independently of the DB backend."""

    # Blob is used on SQLite but gives errors when used here, while Text works fine
    impl = Text
    cache_ok = True

    def process_bind_param(self, value, dialect):
        """Dump *value* to a JSON string before storage; NULL is kept as-is."""
        return None if value is None else json.dumps(value)

    def process_result_value(self, value, dialect):
        """Parse the stored JSON string back to Python data; NULL is kept as-is."""
        return None if value is None else json.loads(value)
123 | |
124 | |
class JsonDefaultDict(Json):
    """Variant of Json which deserialises NULL to an empty dict instead of None."""

    def process_result_value(self, value, dialect):
        """Parse the stored JSON string; a NULL column yields ``{}``."""
        return {} if value is None else json.loads(value)
132 | |
133 | |
class Xml(TypeDecorator):
    """Store XML elements as their serialised textual form."""

    impl = Text
    cache_ok = True

    def process_bind_param(self, value, dialect):
        """Serialise the element to its XML text before storage; NULL passes through."""
        return value.toXml() if value is not None else None

    def process_result_value(self, value, dialect):
        """Parse the stored text back to an element; parseXml needs bytes, hence encode()."""
        return generic.parseXml(value.encode()) if value is not None else None
147 | |
148 | |
class JID(TypeDecorator):
    """Store twisted JID in text fields"""

    impl = Text
    cache_ok = True

    def process_bind_param(self, value, dialect):
        """Store the full textual form (including resource); NULL passes through."""
        return value.full() if value is not None else None

    def process_result_value(self, value, dialect):
        """Rebuild a jid.JID instance from the stored text; NULL passes through."""
        return jid.JID(value) if value is not None else None
163 | |
164 | |
class Profile(Base):
    """A user profile; per-profile rows in other tables reference it by id."""

    __tablename__ = "profiles"

    # NOTE(review): nullable=True on an Integer primary key is unusual —
    # confirm this is intentional (SQLAlchemy normally forces PKs to NOT NULL).
    id = Column(
        Integer,
        primary_key=True,
        nullable=True,
    )
    # human-readable profile name, unique across profiles
    name = Column(Text, unique=True)

    # per-profile rows; passive_deletes lets the DB's ON DELETE CASCADE do the cleanup
    params = relationship("ParamInd", back_populates="profile", passive_deletes=True)
    private_data = relationship(
        "PrivateInd", back_populates="profile", passive_deletes=True
    )
    private_bin_data = relationship(
        "PrivateIndBin", back_populates="profile", passive_deletes=True
    )
182 | |
183 | |
class Component(Base):
    """Component data attached to a profile.

    The profile id doubles as primary key, so there is at most one row per profile.
    """

    __tablename__ = "components"

    profile_id = Column(
        ForeignKey("profiles.id", ondelete="CASCADE"),
        nullable=True,
        primary_key=True
    )
    # entry point used to start this component
    entry_point = Column(Text, nullable=False)
    profile = relationship("Profile")
194 | |
195 | |
class History(Base):
    """A message exchange stored in conversation history.

    A row carries the envelope (source/dest JIDs split into bare and resource
    parts, timestamps, message type); per-language bodies and subjects live in
    the related Message/Subject rows, and an optional Thread row carries
    threading data.
    """
    __tablename__ = "history"
    __table_args__ = (
        UniqueConstraint("profile_id", "stanza_id", "source", "dest"),
        UniqueConstraint("profile_id", "origin_id", "source", name="uq_origin_id"),
        Index("history__profile_id_timestamp", "profile_id", "timestamp"),
        Index(
            "history__profile_id_received_timestamp", "profile_id", "received_timestamp"
        )
    )

    uid = Column(Text, primary_key=True)
    origin_id = Column(Text)
    stanza_id = Column(Text)
    update_uid = Column(Text)
    profile_id = Column(ForeignKey("profiles.id", ondelete="CASCADE"))
    # bare JID and resource are stored separately
    source = Column(Text)
    dest = Column(Text)
    source_res = Column(Text)
    dest_res = Column(Text)
    timestamp = Column(Float, nullable=False)
    received_timestamp = Column(Float)
    type = Column(
        Enum(
            "chat",
            "error",
            "groupchat",
            "headline",
            "normal",
            # info is not XMPP standard, but used to keep track of info like join/leave
            # in a MUC
            "info",
            name="message_type",
            create_constraint=True,
        ),
        nullable=False,
    )
    extra = Column(LegacyPickle)

    profile = relationship("Profile")
    messages = relationship("Message", backref="history", passive_deletes=True)
    subjects = relationship("Subject", backref="history", passive_deletes=True)
    thread = relationship(
        "Thread", uselist=False, back_populates="history", passive_deletes=True
    )

    def __init__(self, *args, **kwargs):
        """Accept optional ``source_jid``/``dest_jid`` kwargs and split them
        into the corresponding bare/resource columns."""
        source_jid = kwargs.pop("source_jid", None)
        if source_jid is not None:
            kwargs["source"] = source_jid.userhost()
            kwargs["source_res"] = source_jid.resource
        dest_jid = kwargs.pop("dest_jid", None)
        if dest_jid is not None:
            kwargs["dest"] = dest_jid.userhost()
            kwargs["dest_res"] = dest_jid.resource
        super().__init__(*args, **kwargs)

    @property
    def source_jid(self) -> jid.JID:
        """Full source JID rebuilt from the bare/resource columns."""
        return jid.JID(f"{self.source}/{self.source_res or ''}")

    @source_jid.setter
    def source_jid(self, source_jid: jid.JID) -> None:
        # FIX: userhost is a method — the original assigned the bound method
        # object itself (missing parentheses) instead of the bare JID string.
        self.source = source_jid.userhost()
        self.source_res = source_jid.resource

    @property
    def dest_jid(self) -> jid.JID:
        """Full destination JID rebuilt from the bare/resource columns."""
        return jid.JID(f"{self.dest}/{self.dest_res or ''}")

    @dest_jid.setter
    def dest_jid(self, dest_jid: jid.JID) -> None:
        # FIX: same missing-call bug as source_jid's setter
        self.dest = dest_jid.userhost()
        self.dest_res = dest_jid.resource

    def __repr__(self):
        dt = datetime.fromtimestamp(self.timestamp)
        return f"History<{self.source_jid.full()}->{self.dest_jid.full()} [{dt}]>"

    def serialise(self):
        """Return this history row as a plain dict suitable for serialisation.

        Keys stored in dedicated columns (cf. NOT_IN_EXTRA) and threading data
        are re-injected into the returned "extra" dict.
        """
        # FIX: work on a copy — mutating self.extra in place would persist the
        # injected keys back to the database on the next flush.
        extra = dict(self.extra) if self.extra else {}
        if self.origin_id is not None:
            extra["origin_id"] = self.origin_id
        if self.stanza_id is not None:
            extra["stanza_id"] = self.stanza_id
        if self.update_uid is not None:
            extra["update_uid"] = self.update_uid
        if self.received_timestamp is not None:
            extra["received_timestamp"] = self.received_timestamp
        if self.thread is not None:
            extra["thread"] = self.thread.thread_id
            if self.thread.parent_id is not None:
                extra["thread_parent"] = self.thread.parent_id

        return {
            "from": f"{self.source}/{self.source_res}" if self.source_res
            else self.source,
            "to": f"{self.dest}/{self.dest_res}" if self.dest_res else self.dest,
            "uid": self.uid,
            "message": {m.language or '': m.message for m in self.messages},
            "subject": {m.language or '': m.subject for m in self.subjects},
            "type": self.type,
            "extra": extra,
            "timestamp": self.timestamp,
        }

    def as_tuple(self):
        """Return the serialised data as a fixed-order tuple."""
        d = self.serialise()
        return (
            d['uid'], d['timestamp'], d['from'], d['to'], d['message'], d['subject'],
            d['type'], d['extra']
        )

    @staticmethod
    def debug_collection(history_collection):
        """Print every message of a collection of History rows (debug helper)."""
        for idx, history in enumerate(history_collection):
            history.debug_msg(idx)

    def debug_msg(self, idx=None):
        """Print messages"""
        dt = datetime.fromtimestamp(self.timestamp)
        if idx is not None:
            dt = f"({idx}) {dt}"
        parts = []
        parts.append(f"[{dt}]<{self.source_jid.full()}->{self.dest_jid.full()}> ")
        for message in self.messages:
            if message.language:
                parts.append(f"[{message.language}] ")
            parts.append(f"{message.message}\n")
        print("".join(parts))
327 | |
328 | |
class Message(Base):
    """One (possibly language-tagged) body attached to a History row."""
    __tablename__ = "message"
    __table_args__ = (
        Index("message__history_uid", "history_uid"),
    )

    id = Column(Integer, primary_key=True)
    history_uid = Column(ForeignKey("history.uid", ondelete="CASCADE"), nullable=False)
    message = Column(Text, nullable=False)
    language = Column(Text)

    def serialise(self) -> Dict[str, Any]:
        """Return the non-empty fields as a plain dict of strings."""
        serialised = {}
        if self.message:
            serialised["message"] = str(self.message)
        if self.language:
            serialised["language"] = str(self.language)
        return serialised

    def __repr__(self):
        prefix = f"[{self.language}]" if self.language else ""
        # truncate long bodies to keep the repr short
        body = self.message if len(self.message) <= 20 else f"{self.message[:20]}…"
        return f"Message<{prefix}{body}>"
356 | |
357 | |
class Subject(Base):
    """One (possibly language-tagged) subject attached to a History row."""
    __tablename__ = "subject"
    __table_args__ = (
        Index("subject__history_uid", "history_uid"),
    )

    id = Column(Integer, primary_key=True)
    history_uid = Column(ForeignKey("history.uid", ondelete="CASCADE"), nullable=False)
    subject = Column(Text, nullable=False)
    language = Column(Text)

    def serialise(self) -> Dict[str, Any]:
        """Return the non-empty fields as a plain dict of strings."""
        serialised = {}
        if self.subject:
            serialised["subject"] = str(self.subject)
        if self.language:
            serialised["language"] = str(self.language)
        return serialised

    def __repr__(self):
        prefix = f"[{self.language}]" if self.language else ""
        # truncate long subjects to keep the repr short
        body = self.subject if len(self.subject) <= 20 else f"{self.subject[:20]}…"
        return f"Subject<{prefix}{body}>"
385 | |
386 | |
class Thread(Base):
    """Threading data attached to a History row (at most one per row)."""
    __tablename__ = "thread"
    __table_args__ = (
        Index("thread__history_uid", "history_uid"),
    )

    id = Column(
        Integer,
        primary_key=True,
    )
    history_uid = Column(ForeignKey("history.uid", ondelete="CASCADE"))
    # thread identifier as carried by the message
    thread_id = Column(Text)
    # identifier of the parent thread, if any
    parent_id = Column(Text)

    history = relationship("History", uselist=False, back_populates="thread")

    def __repr__(self):
        return f"Thread<{self.thread_id} [parent: {self.parent_id}]>"
405 | |
406 | |
class ParamGen(Base):
    """General (profile-independent) parameter, keyed by (category, name)."""
    __tablename__ = "param_gen"

    category = Column(Text, primary_key=True)
    name = Column(Text, primary_key=True)
    value = Column(Text)
413 | |
414 | |
class ParamInd(Base):
    """Per-profile parameter, keyed by (category, name, profile)."""
    __tablename__ = "param_ind"

    category = Column(Text, primary_key=True)
    name = Column(Text, primary_key=True)
    profile_id = Column(
        ForeignKey("profiles.id", ondelete="CASCADE"), primary_key=True
    )
    value = Column(Text)

    profile = relationship("Profile", back_populates="params")
426 | |
427 | |
class PrivateGen(Base):
    """General (profile-independent) private text value, keyed by (namespace, key)."""
    __tablename__ = "private_gen"

    namespace = Column(Text, primary_key=True)
    key = Column(Text, primary_key=True)
    value = Column(Text)
434 | |
435 | |
class PrivateInd(Base):
    """Per-profile private text value, keyed by (namespace, key, profile)."""
    __tablename__ = "private_ind"

    namespace = Column(Text, primary_key=True)
    key = Column(Text, primary_key=True)
    profile_id = Column(
        ForeignKey("profiles.id", ondelete="CASCADE"), primary_key=True
    )
    value = Column(Text)

    profile = relationship("Profile", back_populates="private_data")
447 | |
448 | |
class PrivateGenBin(Base):
    """General private binary value (legacy-pickled), keyed by (namespace, key)."""
    __tablename__ = "private_gen_bin"

    namespace = Column(Text, primary_key=True)
    key = Column(Text, primary_key=True)
    # arbitrary Python data, stored via the LegacyPickle shim
    value = Column(LegacyPickle)
455 | |
456 | |
class PrivateIndBin(Base):
    """Per-profile private binary value (legacy-pickled)."""
    __tablename__ = "private_ind_bin"

    namespace = Column(Text, primary_key=True)
    key = Column(Text, primary_key=True)
    profile_id = Column(
        ForeignKey("profiles.id", ondelete="CASCADE"), primary_key=True
    )
    # arbitrary Python data, stored via the LegacyPickle shim
    value = Column(LegacyPickle)

    profile = relationship("Profile", back_populates="private_bin_data")
468 | |
469 | |
class File(Base):
    """Metadata for a shared file or directory."""
    __tablename__ = "files"
    __table_args__ = (
        Index("files__profile_id_owner_parent", "profile_id", "owner", "parent"),
        Index(
            "files__profile_id_owner_media_type_media_subtype",
            "profile_id",
            "owner",
            "media_type",
            "media_subtype"
        )
    )

    # composite primary key: an id may exist in several versions
    id = Column(Text, primary_key=True)
    public_id = Column(Text, unique=True)
    version = Column(Text, primary_key=True)
    # id of the parent directory entry
    parent = Column(Text, nullable=False)
    type = Column(
        Enum(
            "file", "directory",
            name="file_type",
            create_constraint=True
        ),
        nullable=False,
        server_default="file",
    )
    file_hash = Column(Text)
    hash_algo = Column(Text)
    name = Column(Text, nullable=False)
    size = Column(Integer)
    namespace = Column(Text)
    # MIME type split into its two halves for indexed filtering
    media_type = Column(Text)
    media_subtype = Column(Text)
    created = Column(Float, nullable=False)
    modified = Column(Float)
    owner = Column(JID)
    # NULL in these JSON columns deserialises to {} (JsonDefaultDict)
    access = Column(JsonDefaultDict)
    extra = Column(JsonDefaultDict)
    profile_id = Column(ForeignKey("profiles.id", ondelete="CASCADE"))

    profile = relationship("Profile")
511 | |
512 | |
class PubsubNode(Base):
    """A cached pubsub node and its synchronisation state."""
    __tablename__ = "pubsub_nodes"
    __table_args__ = (
        UniqueConstraint("profile_id", "service", "name"),
    )

    id = Column(Integer, primary_key=True)
    profile_id = Column(
        ForeignKey("profiles.id", ondelete="CASCADE")
    )
    # JID of the pubsub service hosting the node
    service = Column(JID)
    name = Column(Text, nullable=False)
    subscribed = Column(
        Boolean(create_constraint=True, name="subscribed_bool"), nullable=False
    )
    # name of the analyser matched for this node, if any
    analyser = Column(Text)
    sync_state = Column(
        Enum(
            SyncState,
            name="sync_state",
            create_constraint=True,
        ),
        nullable=True
    )
    sync_state_updated = Column(
        Float,
        nullable=False,
        # FIX: pass the callable, not its result. ``time.time()`` was evaluated
        # once at import, stamping every row with the process start time
        # instead of the actual insertion time.
        default=time.time
    )
    type_ = Column(
        Text, name="type", nullable=True
    )
    subtype = Column(
        Text, nullable=True
    )
    extra = Column(JSON)

    items = relationship("PubsubItem", back_populates="node", passive_deletes=True)
    subscriptions = relationship("PubsubSub", back_populates="node", passive_deletes=True)

    def __str__(self):
        return f"Pubsub node {self.name!r} at {self.service}"
555 | |
556 | |
class PubsubSub(Base):
    """Subscriptions to pubsub nodes

    Used by components managing a pubsub service
    """
    __tablename__ = "pubsub_subs"
    __table_args__ = (
        UniqueConstraint("node_id", "subscriber"),
    )

    id = Column(Integer, primary_key=True)
    node_id = Column(ForeignKey("pubsub_nodes.id", ondelete="CASCADE"), nullable=False)
    # JID of the subscribing entity
    subscriber = Column(JID)
    state = Column(
        Enum(
            SubscriptionState,
            name="state",
            create_constraint=True,
        ),
        nullable=True
    )

    node = relationship("PubsubNode", back_populates="subscriptions")
580 | |
581 | |
class PubsubItem(Base):
    """A cached pubsub item: raw XML payload plus an optional parsed form."""
    __tablename__ = "pubsub_items"
    __table_args__ = (
        UniqueConstraint("node_id", "name"),
    )
    id = Column(Integer, primary_key=True)
    node_id = Column(ForeignKey("pubsub_nodes.id", ondelete="CASCADE"), nullable=False)
    name = Column(Text, nullable=False)
    # raw item element, (de)serialised by the Xml type
    data = Column(Xml, nullable=False)
    # timestamps are set/updated server-side
    created = Column(DateTime, nullable=False, server_default=now())
    updated = Column(DateTime, nullable=False, server_default=now(), onupdate=now())
    # analyser-produced parsed representation of the item, if any
    parsed = Column(JSON)

    node = relationship("PubsubNode", back_populates="items")
596 | |
597 | |
598 ## Full-Text Search | |
599 | |
600 # create | |
601 | |
@event.listens_for(PubsubItem.__table__, "after_create")
def fts_create(target, connection, **kw):
    """Full-Text Search table creation"""
    if connection.engine.name != "sqlite":
        return
    # SQLite FTS5 external-content table, kept in sync with pubsub_items
    # through insert/delete/update triggers.
    statements = (
        "CREATE VIRTUAL TABLE pubsub_items_fts "
        "USING fts5(data, content=pubsub_items, content_rowid=id)",
        "CREATE TRIGGER pubsub_items_fts_sync_ins AFTER INSERT ON pubsub_items BEGIN"
        " INSERT INTO pubsub_items_fts(rowid, data) VALUES (new.id, new.data);"
        "END",
        "CREATE TRIGGER pubsub_items_fts_sync_del AFTER DELETE ON pubsub_items BEGIN"
        " INSERT INTO pubsub_items_fts(pubsub_items_fts, rowid, data) "
        "VALUES('delete', old.id, old.data);"
        "END",
        "CREATE TRIGGER pubsub_items_fts_sync_upd AFTER UPDATE ON pubsub_items BEGIN"
        " INSERT INTO pubsub_items_fts(pubsub_items_fts, rowid, data) VALUES"
        "('delete', old.id, old.data);"
        " INSERT INTO pubsub_items_fts(rowid, data) VALUES(new.id, new.data);"
        "END",
    )
    for statement in statements:
        connection.execute(DDL(statement))
625 | |
626 # drop | |
627 | |
@event.listens_for(PubsubItem.__table__, "before_drop")
def fts_drop(target, connection, **kw):
    """Full-Text Search table drop"""
    if connection.engine.name == "sqlite":
        # Using SQLite FTS5: remove the sync triggers first, then the virtual table
        queries = [
            "DROP TRIGGER IF EXISTS pubsub_items_fts_sync_ins",
            "DROP TRIGGER IF EXISTS pubsub_items_fts_sync_del",
            "DROP TRIGGER IF EXISTS pubsub_items_fts_sync_upd",
            "DROP TABLE IF EXISTS pubsub_items_fts",
        ]
        for q in queries:
            connection.execute(DDL(q))