comparison libervia/backend/memory/sqla_mapping.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/memory/sqla_mapping.py@fe4725bf42fb
children 684ba556a617
comparison
equal deleted inserted replaced
4070:d10748475025 4071:4b842c1fb686
1 #!/usr/bin/env python3
2
3 # Libervia: an XMPP client
4 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 from typing import Dict, Any
20 from datetime import datetime
21 import enum
22 import json
23 import pickle
24 import time
25
26 from sqlalchemy import (
27 Boolean,
28 Column,
29 DDL,
30 DateTime,
31 Enum,
32 Float,
33 ForeignKey,
34 Index,
35 Integer,
36 JSON,
37 MetaData,
38 Text,
39 UniqueConstraint,
40 event,
41 )
42 from sqlalchemy.orm import declarative_base, relationship
43 from sqlalchemy.sql.functions import now
44 from sqlalchemy.types import TypeDecorator
45 from twisted.words.protocols.jabber import jid
46 from wokkel import generic
47
48
# Declarative base with an explicit naming convention, so indexes and
# constraints get deterministic names (needed to reference them reliably,
# e.g. in schema migrations).
Base = declarative_base(
    metadata=MetaData(
        naming_convention={
            "ix": 'ix_%(column_0_label)s',
            "uq": "uq_%(table_name)s_%(column_0_name)s",
            "ck": "ck_%(table_name)s_%(constraint_name)s",
            "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
            "pk": "pk_%(table_name)s"
        }
    )
)
# Keys which appear in message data "extra" but are NOT stored in the pickled
# "extra" column: each of them has its own dedicated column on History.
NOT_IN_EXTRA = ('origin_id', 'stanza_id', 'received_timestamp', 'update_uid')
63
64
class SyncState(enum.Enum):
    """Synchronisation status of a cached pubsub node."""

    IN_PROGRESS = 1  #: synchronisation is currently in progress
    COMPLETED = 2  #: synchronisation is done
    ERROR = 3  #: something wrong happened during synchronisation, won't sync
    NO_SYNC = 4  #: synchronisation won't be done even if a syncing analyser matches
74
75
class SubscriptionState(enum.Enum):
    """State of a subscription to a pubsub node."""

    SUBSCRIBED = 1  #: subscription is active
    PENDING = 2  #: subscription is pending
79
80
class LegacyPickle(TypeDecorator):
    """Handle troubles with data pickled by former version of SàT

    This type is temporary until we do migration to a proper data type
    """
    # Blob is used on SQLite but gives errors when used here, while Text works fine
    impl = Text
    cache_ok = True

    def process_bind_param(self, value, dialect):
        # Protocol 0 produces an ASCII serialisation, which fits a Text column.
        if value is None:
            return None
        return pickle.dumps(value, 0)

    def process_result_value(self, value, dialect):
        if value is None:
            return None
        # value types are inconsistent (probably a consequence of Python 2/3 port
        # and/or SQLite dynamic typing)
        try:
            value = value.encode()
        except AttributeError:
            pass
        # "utf-8" encoding is needed to handle Python 2 pickled data
        # SECURITY NOTE(review): pickle.loads executes arbitrary code if the DB
        # content is attacker-controlled; acceptable only while this column is
        # written exclusively by this application — confirm before sharing the DB.
        return pickle.loads(value, encoding="utf-8")
106
107
class Json(TypeDecorator):
    """Serialise arbitrary values as JSON text, in a DB-independent way."""
    # Blob is used on SQLite but gives errors when used here, while Text works fine
    impl = Text
    cache_ok = True

    def process_bind_param(self, value, dialect):
        # NULL passes through untouched, anything else is dumped to JSON
        return None if value is None else json.dumps(value)

    def process_result_value(self, value, dialect):
        # symmetric with process_bind_param: NULL stays None
        return None if value is None else json.loads(value)
123
124
class JsonDefaultDict(Json):
    """Like Json, but a NULL column value deserialises to {} instead of None."""

    def process_result_value(self, value, dialect):
        # empty dict as a safe default when the column is NULL
        return {} if value is None else json.loads(value)
132
133
class Xml(TypeDecorator):
    """Store an XML element object as its serialised text form."""
    impl = Text
    cache_ok = True

    def process_bind_param(self, value, dialect):
        # serialise the element; NULL passes through untouched
        return None if value is None else value.toXml()

    def process_result_value(self, value, dialect):
        # parse back to an element; encode() because parseXml expects bytes
        return None if value is None else generic.parseXml(value.encode())
147
148
class JID(TypeDecorator):
    """Persist a twisted ``jid.JID`` as its full textual form in a Text field."""
    impl = Text
    cache_ok = True

    def process_bind_param(self, value, dialect):
        # store the complete textual representation of the JID
        return None if value is None else value.full()

    def process_result_value(self, value, dialect):
        # rebuild a twisted JID object from its textual form
        return None if value is None else jid.JID(value)
163
164
class Profile(Base):
    """A user profile: the root entity per-profile data hangs from."""
    __tablename__ = "profiles"

    id = Column(
        Integer,
        primary_key=True,
        # NOTE(review): nullable=True on a primary key looks odd — confirm intent
        nullable=True,
    )
    # human-readable profile name, unique across the whole database
    name = Column(Text, unique=True)

    # per-profile data; passive_deletes relies on the ON DELETE CASCADE of the
    # foreign keys so rows disappear with the profile
    params = relationship("ParamInd", back_populates="profile", passive_deletes=True)
    private_data = relationship(
        "PrivateInd", back_populates="profile", passive_deletes=True
    )
    private_bin_data = relationship(
        "PrivateIndBin", back_populates="profile", passive_deletes=True
    )
182
183
class Component(Base):
    """Link a profile to the entry point it uses (one row per component profile)."""
    __tablename__ = "components"

    # the profile itself identifies the component (profile_id is the PK)
    profile_id = Column(
        ForeignKey("profiles.id", ondelete="CASCADE"),
        nullable=True,
        primary_key=True
    )
    entry_point = Column(Text, nullable=False)
    profile = relationship("Profile")
194
195
class History(Base):
    """One message history entry (a sent or received message stanza)."""
    __tablename__ = "history"
    __table_args__ = (
        UniqueConstraint("profile_id", "stanza_id", "source", "dest"),
        UniqueConstraint("profile_id", "origin_id", "source", name="uq_origin_id"),
        Index("history__profile_id_timestamp", "profile_id", "timestamp"),
        Index(
            "history__profile_id_received_timestamp", "profile_id", "received_timestamp"
        )
    )

    uid = Column(Text, primary_key=True)
    origin_id = Column(Text)
    stanza_id = Column(Text)
    update_uid = Column(Text)
    profile_id = Column(ForeignKey("profiles.id", ondelete="CASCADE"))
    # bare JIDs go in source/dest, resources in the matching *_res columns
    source = Column(Text)
    dest = Column(Text)
    source_res = Column(Text)
    dest_res = Column(Text)
    timestamp = Column(Float, nullable=False)
    received_timestamp = Column(Float)
    type = Column(
        Enum(
            "chat",
            "error",
            "groupchat",
            "headline",
            "normal",
            # info is not XMPP standard, but used to keep track of info like join/leave
            # in a MUC
            "info",
            name="message_type",
            create_constraint=True,
        ),
        nullable=False,
    )
    extra = Column(LegacyPickle)

    profile = relationship("Profile")
    messages = relationship("Message", backref="history", passive_deletes=True)
    subjects = relationship("Subject", backref="history", passive_deletes=True)
    thread = relationship(
        "Thread", uselist=False, back_populates="history", passive_deletes=True
    )

    def __init__(self, *args, **kwargs):
        """Accept convenience ``source_jid``/``dest_jid`` JID keyword arguments.

        They are split into the ``source``/``source_res`` and
        ``dest``/``dest_res`` columns before delegating to the declarative
        constructor.
        """
        source_jid = kwargs.pop("source_jid", None)
        if source_jid is not None:
            kwargs["source"] = source_jid.userhost()
            kwargs["source_res"] = source_jid.resource
        dest_jid = kwargs.pop("dest_jid", None)
        if dest_jid is not None:
            kwargs["dest"] = dest_jid.userhost()
            kwargs["dest_res"] = dest_jid.resource
        super().__init__(*args, **kwargs)

    @property
    def source_jid(self) -> jid.JID:
        return jid.JID(f"{self.source}/{self.source_res or ''}")

    @source_jid.setter
    def source_jid(self, source_jid: jid.JID) -> None:
        # FIX: ``userhost`` is a method; the previous code stored the bound
        # method object in the Text column instead of the "user@host" string.
        self.source = source_jid.userhost()
        self.source_res = source_jid.resource

    @property
    def dest_jid(self):
        return jid.JID(f"{self.dest}/{self.dest_res or ''}")

    @dest_jid.setter
    def dest_jid(self, dest_jid: jid.JID) -> None:
        # FIX: same as source_jid — call userhost() instead of storing the method
        self.dest = dest_jid.userhost()
        self.dest_res = dest_jid.resource

    def __repr__(self):
        dt = datetime.fromtimestamp(self.timestamp)
        return f"History<{self.source_jid.full()}->{self.dest_jid.full()} [{dt}]>"

    def serialise(self):
        """Return this entry as a plain serialisable dict."""
        # Work on a copy: mutating self.extra in place would inject the
        # NOT_IN_EXTRA keys back into the pickled "extra" column if this
        # instance is later flushed.
        extra = dict(self.extra) if self.extra else {}
        if self.origin_id is not None:
            extra["origin_id"] = self.origin_id
        if self.stanza_id is not None:
            extra["stanza_id"] = self.stanza_id
        if self.update_uid is not None:
            extra["update_uid"] = self.update_uid
        if self.received_timestamp is not None:
            extra["received_timestamp"] = self.received_timestamp
        if self.thread is not None:
            extra["thread"] = self.thread.thread_id
            if self.thread.parent_id is not None:
                extra["thread_parent"] = self.thread.parent_id

        return {
            "from": f"{self.source}/{self.source_res}" if self.source_res
            else self.source,
            "to": f"{self.dest}/{self.dest_res}" if self.dest_res else self.dest,
            "uid": self.uid,
            # one entry per language ('' key when no language is set)
            "message": {m.language or '': m.message for m in self.messages},
            "subject": {m.language or '': m.subject for m in self.subjects},
            "type": self.type,
            "extra": extra,
            "timestamp": self.timestamp,
        }

    def as_tuple(self):
        """Return (uid, timestamp, from, to, message, subject, type, extra)."""
        d = self.serialise()
        return (
            d['uid'], d['timestamp'], d['from'], d['to'], d['message'], d['subject'],
            d['type'], d['extra']
        )

    @staticmethod
    def debug_collection(history_collection):
        """Print a debug line for each entry of *history_collection*."""
        for idx, history in enumerate(history_collection):
            history.debug_msg(idx)

    def debug_msg(self, idx=None):
        """Print messages"""
        dt = datetime.fromtimestamp(self.timestamp)
        if idx is not None:
            dt = f"({idx}) {dt}"
        parts = []
        parts.append(f"[{dt}]<{self.source_jid.full()}->{self.dest_jid.full()}> ")
        for message in self.messages:
            if message.language:
                parts.append(f"[{message.language}] ")
            parts.append(f"{message.message}\n")
        print("".join(parts))
327
328
class Message(Base):
    """One body of a history entry, one row per language."""
    __tablename__ = "message"
    __table_args__ = (
        Index("message__history_uid", "history_uid"),
    )

    id = Column(
        Integer,
        primary_key=True,
    )
    history_uid = Column(ForeignKey("history.uid", ondelete="CASCADE"), nullable=False)
    message = Column(Text, nullable=False)
    language = Column(Text)

    def serialise(self) -> Dict[str, Any]:
        """Return the message as a plain dict, skipping empty fields."""
        data: Dict[str, Any] = {}
        if self.message:
            data["message"] = str(self.message)
        if self.language:
            data["language"] = str(self.language)
        return data

    def __repr__(self):
        # truncate long bodies to keep the repr readable
        text = self.message
        if len(text) > 20:
            text = f"{text[:20]}…"
        lang_prefix = f"[{self.language}]" if self.language else ""
        return f"Message<{lang_prefix}{text}>"
356
357
class Subject(Base):
    """One subject of a history entry, one row per language."""
    __tablename__ = "subject"
    __table_args__ = (
        Index("subject__history_uid", "history_uid"),
    )

    id = Column(
        Integer,
        primary_key=True,
    )
    history_uid = Column(ForeignKey("history.uid", ondelete="CASCADE"), nullable=False)
    subject = Column(Text, nullable=False)
    language = Column(Text)

    def serialise(self) -> Dict[str, Any]:
        """Return the subject as a plain dict, skipping empty fields."""
        data: Dict[str, Any] = {}
        if self.subject:
            data["subject"] = str(self.subject)
        if self.language:
            data["language"] = str(self.language)
        return data

    def __repr__(self):
        # truncate long subjects to keep the repr readable
        text = self.subject
        if len(text) > 20:
            text = f"{text[:20]}…"
        lang_prefix = f"[{self.language}]" if self.language else ""
        return f"Subject<{lang_prefix}{text}>"
385
386
class Thread(Base):
    """Conversation thread attached to a history entry."""
    __tablename__ = "thread"
    __table_args__ = (
        Index("thread__history_uid", "history_uid"),
    )

    id = Column(
        Integer,
        primary_key=True,
    )
    history_uid = Column(ForeignKey("history.uid", ondelete="CASCADE"))
    # thread identifier and optional parent thread identifier
    thread_id = Column(Text)
    parent_id = Column(Text)

    history = relationship("History", uselist=False, back_populates="thread")

    def __repr__(self):
        return f"Thread<{self.thread_id} [parent: {self.parent_id}]>"
405
406
class ParamGen(Base):
    """General (profile-independent) parameter value, keyed by category/name."""
    __tablename__ = "param_gen"

    category = Column(Text, primary_key=True)
    name = Column(Text, primary_key=True)
    value = Column(Text)
413
414
class ParamInd(Base):
    """Individual (per-profile) parameter value, keyed by category/name/profile."""
    __tablename__ = "param_ind"

    category = Column(Text, primary_key=True)
    name = Column(Text, primary_key=True)
    profile_id = Column(
        ForeignKey("profiles.id", ondelete="CASCADE"), primary_key=True
    )
    value = Column(Text)

    profile = relationship("Profile", back_populates="params")
426
427
class PrivateGen(Base):
    """General (profile-independent) private text value, keyed by namespace/key."""
    __tablename__ = "private_gen"

    namespace = Column(Text, primary_key=True)
    key = Column(Text, primary_key=True)
    value = Column(Text)
434
435
class PrivateInd(Base):
    """Per-profile private text value, keyed by namespace/key/profile."""
    __tablename__ = "private_ind"

    namespace = Column(Text, primary_key=True)
    key = Column(Text, primary_key=True)
    profile_id = Column(
        ForeignKey("profiles.id", ondelete="CASCADE"), primary_key=True
    )
    value = Column(Text)

    profile = relationship("Profile", back_populates="private_data")
447
448
class PrivateGenBin(Base):
    """General private binary value (pickled via LegacyPickle)."""
    __tablename__ = "private_gen_bin"

    namespace = Column(Text, primary_key=True)
    key = Column(Text, primary_key=True)
    value = Column(LegacyPickle)
455
456
class PrivateIndBin(Base):
    """Per-profile private binary value (pickled via LegacyPickle)."""
    __tablename__ = "private_ind_bin"

    namespace = Column(Text, primary_key=True)
    key = Column(Text, primary_key=True)
    profile_id = Column(
        ForeignKey("profiles.id", ondelete="CASCADE"), primary_key=True
    )
    value = Column(LegacyPickle)

    profile = relationship("Profile", back_populates="private_bin_data")
468
469
class File(Base):
    """Metadata about a file or directory (one row per id/version pair)."""
    __tablename__ = "files"
    __table_args__ = (
        Index("files__profile_id_owner_parent", "profile_id", "owner", "parent"),
        Index(
            "files__profile_id_owner_media_type_media_subtype",
            "profile_id",
            "owner",
            "media_type",
            "media_subtype"
        )
    )

    # id + version form a composite primary key: several versions of the same
    # file may coexist
    id = Column(Text, primary_key=True)
    public_id = Column(Text, unique=True)
    version = Column(Text, primary_key=True)
    parent = Column(Text, nullable=False)
    type = Column(
        Enum(
            "file", "directory",
            name="file_type",
            create_constraint=True
        ),
        nullable=False,
        server_default="file",
    )
    file_hash = Column(Text)
    hash_algo = Column(Text)
    name = Column(Text, nullable=False)
    size = Column(Integer)
    namespace = Column(Text)
    media_type = Column(Text)
    media_subtype = Column(Text)
    # float timestamps — presumably POSIX epoch seconds; confirm against callers
    created = Column(Float, nullable=False)
    modified = Column(Float)
    owner = Column(JID)
    # JSON columns; NULL deserialises to {} thanks to JsonDefaultDict
    access = Column(JsonDefaultDict)
    extra = Column(JsonDefaultDict)
    profile_id = Column(ForeignKey("profiles.id", ondelete="CASCADE"))

    profile = relationship("Profile")
511
512
class PubsubNode(Base):
    """Cached pubsub node and its synchronisation state."""
    __tablename__ = "pubsub_nodes"
    __table_args__ = (
        UniqueConstraint("profile_id", "service", "name"),
    )

    id = Column(Integer, primary_key=True)
    profile_id = Column(
        ForeignKey("profiles.id", ondelete="CASCADE")
    )
    # JID of the pubsub service hosting the node
    service = Column(JID)
    name = Column(Text, nullable=False)
    subscribed = Column(
        Boolean(create_constraint=True, name="subscribed_bool"), nullable=False
    )
    analyser = Column(Text)
    sync_state = Column(
        Enum(
            SyncState,
            name="sync_state",
            create_constraint=True,
        ),
        nullable=True
    )
    sync_state_updated = Column(
        Float,
        nullable=False,
        # FIX: pass the callable itself, not its result. ``time.time()`` was
        # evaluated once at import time, so every row was stamped with the
        # module-load timestamp instead of the actual insert time.
        default=time.time
    )
    type_ = Column(
        Text, name="type", nullable=True
    )
    subtype = Column(
        Text, nullable=True
    )
    extra = Column(JSON)

    items = relationship("PubsubItem", back_populates="node", passive_deletes=True)
    subscriptions = relationship("PubsubSub", back_populates="node", passive_deletes=True)

    def __str__(self):
        return f"Pubsub node {self.name!r} at {self.service}"
555
556
class PubsubSub(Base):
    """Subscriptions to pubsub nodes

    Used by components managing a pubsub service
    """
    __tablename__ = "pubsub_subs"
    __table_args__ = (
        UniqueConstraint("node_id", "subscriber"),
    )

    id = Column(Integer, primary_key=True)
    node_id = Column(ForeignKey("pubsub_nodes.id", ondelete="CASCADE"), nullable=False)
    # JID of the subscribed entity
    subscriber = Column(JID)
    state = Column(
        Enum(
            SubscriptionState,
            name="state",
            create_constraint=True,
        ),
        nullable=True
    )

    node = relationship("PubsubNode", back_populates="subscriptions")
580
581
class PubsubItem(Base):
    """Cached pubsub item, keeping both the raw XML payload and a parsed form."""
    __tablename__ = "pubsub_items"
    __table_args__ = (
        UniqueConstraint("node_id", "name"),
    )
    id = Column(Integer, primary_key=True)
    node_id = Column(ForeignKey("pubsub_nodes.id", ondelete="CASCADE"), nullable=False)
    name = Column(Text, nullable=False)
    # raw item payload, serialised/parsed by the Xml type decorator
    data = Column(Xml, nullable=False)
    # timestamps come from the database clock; "updated" refreshes on each UPDATE
    created = Column(DateTime, nullable=False, server_default=now())
    updated = Column(DateTime, nullable=False, server_default=now(), onupdate=now())
    # parsed representation of ``data`` — exact schema set elsewhere; confirm
    parsed = Column(JSON)

    node = relationship("PubsubNode", back_populates="items")
596
597
598 ## Full-Text Search
599
600 # create
601
@event.listens_for(PubsubItem.__table__, "after_create")
def fts_create(target, connection, **kw):
    """Full-Text Search table creation"""
    if connection.engine.name == "sqlite":
        # Using SQLite FTS5
        # External-content FTS table indexing pubsub_items.data; the three
        # triggers keep the index in sync (the special 'delete' insert removes
        # the stale row from the FTS index before re-inserting on UPDATE).
        queries = [
            "CREATE VIRTUAL TABLE pubsub_items_fts "
            "USING fts5(data, content=pubsub_items, content_rowid=id)",
            "CREATE TRIGGER pubsub_items_fts_sync_ins AFTER INSERT ON pubsub_items BEGIN"
            " INSERT INTO pubsub_items_fts(rowid, data) VALUES (new.id, new.data);"
            "END",
            "CREATE TRIGGER pubsub_items_fts_sync_del AFTER DELETE ON pubsub_items BEGIN"
            " INSERT INTO pubsub_items_fts(pubsub_items_fts, rowid, data) "
            "VALUES('delete', old.id, old.data);"
            "END",
            "CREATE TRIGGER pubsub_items_fts_sync_upd AFTER UPDATE ON pubsub_items BEGIN"
            " INSERT INTO pubsub_items_fts(pubsub_items_fts, rowid, data) VALUES"
            "('delete', old.id, old.data);"
            " INSERT INTO pubsub_items_fts(rowid, data) VALUES(new.id, new.data);"
            "END"
        ]
        for q in queries:
            connection.execute(DDL(q))
625
626 # drop
627
@event.listens_for(PubsubItem.__table__, "before_drop")
def fts_drop(target, connection, **kw):
    """Full-Text Search table drop"""
    if connection.engine.name == "sqlite":
        # Using SQLite FTS5
        # drop the sync triggers first, then the virtual table itself
        queries = [
            "DROP TRIGGER IF EXISTS pubsub_items_fts_sync_ins",
            "DROP TRIGGER IF EXISTS pubsub_items_fts_sync_del",
            "DROP TRIGGER IF EXISTS pubsub_items_fts_sync_upd",
            "DROP TABLE IF EXISTS pubsub_items_fts",
        ]
        for q in queries:
            connection.execute(DDL(q))