Mercurial > libervia-backend
view sat/memory/sqla_mapping.py @ 3541:888109774673
core: various changes and fixes to work with new storage and D-Bus bridge:
- fixes coroutines handling in various places
- fixes types which are not serialised by Tx DBus
- XEP-0384: call storage methods in the main thread in XEP: Python OMEMO's Promise uses threads,
which prevents the use of the AsyncIO loop. To work around that, callLater is used to launch
storage methods in the main thread. This is a temporary workaround, as Python OMEMO should
get rid of its Promise implementation and threads soon.
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 03 Jun 2021 15:21:43 +0200 |
parents | f9a5b810f14d |
children | 84ea57a8d6b3 |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia: an XMPP client
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import pickle
import json
from sqlalchemy import (
    Column, Integer, Text, Float, Enum, ForeignKey, UniqueConstraint, Index,
)
from sqlalchemy.orm import declarative_base, relationship
from sqlalchemy.types import TypeDecorator
from twisted.words.protocols.jabber import jid
from datetime import datetime


Base = declarative_base()
# keys which are in message data extra but not stored in extra field this is
# because those values are stored in separate fields
NOT_IN_EXTRA = ('stanza_id', 'received_timestamp', 'update_uid')


class LegacyPickle(TypeDecorator):
    """Handle troubles with data pickled by former version of SàT

    This type is temporary until we do migration to a proper data type
    """
    # Blob is used on SQLite but gives errors when used here, while Text works fine
    impl = Text
    cache_ok = True

    def process_bind_param(self, value, dialect):
        """Pickle ``value`` (protocol 0) before storing it; None is kept as is."""
        if value is None:
            return None
        return pickle.dumps(value, 0)

    def process_result_value(self, value, dialect):
        """Unpickle a stored value; None is returned unchanged."""
        if value is None:
            return None
        # value types are inconsistent (probably a consequence of Python 2/3 port
        # and/or SQLite dynamic typing)
        try:
            value = value.encode()
        except AttributeError:
            # no ``encode`` attribute: value is used as is
            pass
        # "utf-8" encoding is needed to handle Python 2 pickled data
        # NOTE(review): pickle.loads executes arbitrary code on crafted input; this is
        #   only acceptable as long as the database content is trusted.
        return pickle.loads(value, encoding="utf-8")


class Json(TypeDecorator):
    """Handle JSON field in DB independent way"""
    # Blob is used on SQLite but gives errors when used here, while Text works fine
    impl = Text
    cache_ok = True

    def process_bind_param(self, value, dialect):
        """Serialise ``value`` to a JSON string; None is kept as is."""
        if value is None:
            return None
        return json.dumps(value)

    def process_result_value(self, value, dialect):
        """Parse the stored JSON string; None is returned unchanged."""
        if value is None:
            return None
        return json.loads(value)


class JsonDefaultDict(Json):
    """Json type which convert NULL to empty dict instead of None"""

    def process_result_value(self, value, dialect):
        """Parse the stored JSON string, returning a new empty dict for None."""
        if value is None:
            return {}
        return json.loads(value)


class JID(TypeDecorator):
    """Store twisted JID in text fields"""
    impl = Text
    cache_ok = True

    def process_bind_param(self, value, dialect):
        """Store the result of ``value.full()``; None is kept as is."""
        if value is None:
            return None
        return value.full()

    def process_result_value(self, value, dialect):
        """Build a ``jid.JID`` from the stored text; None is returned unchanged."""
        if value is None:
            return None
        return jid.JID(value)


class Profile(Base):
    """Mapping of the "profiles" table"""
    __tablename__ = "profiles"

    id = Column(Integer, primary_key=True)
    name = Column(Text, unique=True)

    params = relationship("ParamInd", back_populates="profile", passive_deletes=True)
    private_data = relationship(
        "PrivateInd", back_populates="profile", passive_deletes=True
    )
    private_bin_data = relationship(
        "PrivateIndBin", back_populates="profile", passive_deletes=True
    )


class Component(Base):
    """Mapping of the "components" table (one row per profile, with an entry point)"""
    __tablename__ = "components"

    profile_id = Column(ForeignKey("profiles.id", ondelete="CASCADE"), primary_key=True)
    entry_point = Column(Text, nullable=False)
    profile = relationship("Profile")


class MessageType(Base):
    """Mapping of the "message_types" table, referenced by ``History.type``"""
    __tablename__ = "message_types"

    type = Column(Text, primary_key=True)


class History(Base):
    """Mapping of the "history" table

    Source and destination are stored as separate bare JID/resource columns;
    ``source_jid`` and ``dest_jid`` expose them as ``jid.JID`` instances.
    """
    __tablename__ = "history"
    __table_args__ = (
        UniqueConstraint("profile_id", "stanza_id", "source", "dest"),
        Index("history__profile_id_timestamp", "profile_id", "timestamp"),
        Index(
            "history__profile_id_received_timestamp", "profile_id", "received_timestamp"
        )
    )

    uid = Column(Text, primary_key=True)
    stanza_id = Column(Text)
    update_uid = Column(Text)
    profile_id = Column(ForeignKey("profiles.id", ondelete="CASCADE"))
    source = Column(Text)
    dest = Column(Text)
    source_res = Column(Text)
    dest_res = Column(Text)
    timestamp = Column(Float, nullable=False)
    received_timestamp = Column(Float)
    type = Column(ForeignKey("message_types.type"))
    extra = Column(LegacyPickle)

    profile = relationship("Profile")
    message_type = relationship("MessageType")
    messages = relationship("Message", backref="history", passive_deletes=True)
    subjects = relationship("Subject", backref="history", passive_deletes=True)
    thread = relationship(
        "Thread", uselist=False, back_populates="history", passive_deletes=True
    )

    def __init__(self, *args, **kwargs):
        """Build a History row, accepting ``source_jid``/``dest_jid`` shortcuts

        @param source_jid: optional JID, split into ``source`` and ``source_res``
        @param dest_jid: optional JID, split into ``dest`` and ``dest_res``
        Other arguments are forwarded unchanged to the base constructor.
        """
        source_jid = kwargs.pop("source_jid", None)
        if source_jid is not None:
            kwargs["source"] = source_jid.userhost()
            kwargs["source_res"] = source_jid.resource
        dest_jid = kwargs.pop("dest_jid", None)
        if dest_jid is not None:
            kwargs["dest"] = dest_jid.userhost()
            kwargs["dest_res"] = dest_jid.resource
        super().__init__(*args, **kwargs)

    @property
    def source_jid(self) -> jid.JID:
        """JID built from ``source`` and ``source_res``"""
        return jid.JID(f"{self.source}/{self.source_res or ''}")

    @source_jid.setter
    def source_jid(self, source_jid: jid.JID) -> None:
        # userhost is a method: it must be called (as done in __init__), otherwise
        # the bound method itself would be stored in the column
        self.source = source_jid.userhost()
        self.source_res = source_jid.resource

    @property
    def dest_jid(self) -> jid.JID:
        """JID built from ``dest`` and ``dest_res``"""
        return jid.JID(f"{self.dest}/{self.dest_res or ''}")

    @dest_jid.setter
    def dest_jid(self, dest_jid: jid.JID) -> None:
        # userhost is a method: it must be called (as done in __init__), otherwise
        # the bound method itself would be stored in the column
        self.dest = dest_jid.userhost()
        self.dest_res = dest_jid.resource

    def __repr__(self):
        dt = datetime.fromtimestamp(self.timestamp)
        return f"History<{self.source_jid.full()}->{self.dest_jid.full()} [{dt}]>"

    def serialise(self):
        """Convert this row to a dict

        Values stored in separate columns (stanza_id, update_uid,
        received_timestamp) and thread data are added to the returned "extra".

        @return: dict with "from", "to", "uid", "message", "subject", "type",
            "extra" and "timestamp" keys
        """
        # we work on a copy so the keys added below are never written into the mapped
        # ``extra`` attribute (they live in their own columns, see NOT_IN_EXTRA), and
        # we default to an empty dict so a NULL ``extra`` doesn't raise
        extra = dict(self.extra or {})
        if self.stanza_id is not None:
            extra["stanza_id"] = self.stanza_id
        if self.update_uid is not None:
            extra["update_uid"] = self.update_uid
        if self.received_timestamp is not None:
            extra["received_timestamp"] = self.received_timestamp
        if self.thread is not None:
            extra["thread"] = self.thread.thread_id
            if self.thread.parent_id is not None:
                extra["thread_parent"] = self.thread.parent_id

        return {
            "from": f"{self.source}/{self.source_res}" if self.source_res
                else self.source,
            "to": f"{self.dest}/{self.dest_res}" if self.dest_res else self.dest,
            "uid": self.uid,
            "message": {m.language or '': m.message for m in self.messages},
            "subject": {m.language or '': m.subject for m in self.subjects},
            "type": self.type,
            "extra": extra,
            "timestamp": self.timestamp,
        }

    def as_tuple(self):
        """Return serialised data as a tuple

        @return: (uid, timestamp, from, to, message, subject, type, extra)
        """
        d = self.serialise()
        return (
            d['uid'], d['timestamp'], d['from'], d['to'], d['message'], d['subject'],
            d['type'], d['extra']
        )

    @staticmethod
    def debug_collection(history_collection):
        """Call ``debug_msg`` on each item of the collection, with its index"""
        for idx, history in enumerate(history_collection):
            history.debug_msg(idx)

    def debug_msg(self, idx=None):
        """Print messages

        @param idx: optional index, printed before the date when not None
        """
        dt = datetime.fromtimestamp(self.timestamp)
        if idx is not None:
            dt = f"({idx}) {dt}"
        parts = [f"[{dt}]<{self.source_jid.full()}->{self.dest_jid.full()}> "]
        for message in self.messages:
            if message.language:
                parts.append(f"[{message.language}] ")
            parts.append(f"{message.message}\n")
        print("".join(parts))


class Message(Base):
    """Mapping of the "message" table: a message body linked to a History row"""
    __tablename__ = "message"

    id = Column(Integer, primary_key=True)
    history_uid = Column(ForeignKey("history.uid", ondelete="CASCADE"), index=True)
    message = Column(Text)
    language = Column(Text)

    def __repr__(self):
        lang_str = f"[{self.language}]" if self.language else ""
        # the column is nullable: a NULL body is shown as empty instead of raising
        text = self.message or ""
        msg = f"{text[:20]}…" if len(text) > 20 else text
        content = f"{lang_str}{msg}"
        return f"Message<{content}>"


class Subject(Base):
    """Mapping of the "subject" table: a subject linked to a History row"""
    __tablename__ = "subject"

    id = Column(Integer, primary_key=True)
    history_uid = Column(ForeignKey("history.uid", ondelete="CASCADE"), index=True)
    subject = Column(Text)
    language = Column(Text)

    def __repr__(self):
        lang_str = f"[{self.language}]" if self.language else ""
        # the column is nullable: a NULL subject is shown as empty instead of raising
        text = self.subject or ""
        msg = f"{text[:20]}…" if len(text) > 20 else text
        content = f"{lang_str}{msg}"
        return f"Subject<{content}>"


class Thread(Base):
    """Mapping of the "thread" table: thread id and parent id of a History row"""
    __tablename__ = "thread"

    id = Column(Integer, primary_key=True)
    history_uid = Column(ForeignKey("history.uid", ondelete="CASCADE"), index=True)
    thread_id = Column(Text)
    parent_id = Column(Text)

    history = relationship("History", uselist=False, back_populates="thread")

    def __repr__(self):
        return f"Thread<{self.thread_id} [parent: {self.parent_id}]>"


class ParamGen(Base):
    """Mapping of the "param_gen" table (parameters without profile)"""
    __tablename__ = "param_gen"

    category = Column(Text, primary_key=True, nullable=False)
    name = Column(Text, primary_key=True, nullable=False)
    value = Column(Text)


class ParamInd(Base):
    """Mapping of the "param_ind" table (parameters linked to a profile)"""
    __tablename__ = "param_ind"

    category = Column(Text, primary_key=True, nullable=False)
    name = Column(Text, primary_key=True, nullable=False)
    profile_id = Column(
        ForeignKey("profiles.id", ondelete="CASCADE"), primary_key=True, nullable=False
    )
    value = Column(Text)

    profile = relationship("Profile", back_populates="params")


class PrivateGen(Base):
    """Mapping of the "private_gen" table (text values without profile)"""
    __tablename__ = "private_gen"

    namespace = Column(Text, primary_key=True, nullable=False)
    key = Column(Text, primary_key=True, nullable=False)
    value = Column(Text)


class PrivateInd(Base):
    """Mapping of the "private_ind" table (text values linked to a profile)"""
    __tablename__ = "private_ind"

    namespace = Column(Text, primary_key=True, nullable=False)
    key = Column(Text, primary_key=True, nullable=False)
    profile_id = Column(
        ForeignKey("profiles.id", ondelete="CASCADE"), primary_key=True, nullable=False
    )
    value = Column(Text)

    profile = relationship("Profile", back_populates="private_data")


class PrivateGenBin(Base):
    """Mapping of the "private_gen_bin" table (pickled values without profile)"""
    __tablename__ = "private_gen_bin"

    namespace = Column(Text, primary_key=True, nullable=False)
    key = Column(Text, primary_key=True, nullable=False)
    value = Column(LegacyPickle)


class PrivateIndBin(Base):
    """Mapping of the "private_ind_bin" table (pickled values linked to a profile)"""
    __tablename__ = "private_ind_bin"

    namespace = Column(Text, primary_key=True, nullable=False)
    key = Column(Text, primary_key=True, nullable=False)
    profile_id = Column(
        ForeignKey("profiles.id", ondelete="CASCADE"), primary_key=True, nullable=False
    )
    value = Column(LegacyPickle)

    profile = relationship("Profile", back_populates="private_bin_data")


class File(Base):
    """Mapping of the "files" table (files and directories metadata)"""
    __tablename__ = "files"
    __table_args__ = (
        Index("files__profile_id_owner_parent", "profile_id", "owner", "parent"),
        Index(
            "files__profile_id_owner_media_type_media_subtype",
            "profile_id",
            "owner",
            "media_type",
            "media_subtype"
        )
    )

    id = Column(Text, primary_key=True, nullable=False)
    public_id = Column(Text, unique=True)
    version = Column(Text, primary_key=True, nullable=False)
    parent = Column(Text, nullable=False)
    type = Column(
        Enum("file", "directory", create_constraint=True),
        nullable=False,
        server_default="file",
        # name="file_type",
    )
    file_hash = Column(Text)
    hash_algo = Column(Text)
    name = Column(Text, nullable=False)
    size = Column(Integer)
    namespace = Column(Text)
    media_type = Column(Text)
    media_subtype = Column(Text)
    created = Column(Float, nullable=False)
    modified = Column(Float)
    owner = Column(JID)
    access = Column(JsonDefaultDict)
    extra = Column(JsonDefaultDict)
    profile_id = Column(ForeignKey("profiles.id", ondelete="CASCADE"))

    profile = relationship("Profile")