Mercurial > libervia-backend
view libervia/backend/core/core_types.py @ 4382:b897d98b2c51 default tip
plugin XEP-0297: Reworked `forward` method and add bridge method:
`Forward` method has been reworked and now includes a fallback. XEP-0297 ask to not use
fallback, but following a discussion on xsf@, we agreed that this is a legacy thing and a
fallback should nowadays be used, I'll propose a patch to the specification.
A `message_forward` has been added to bridge.
rel 461
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 04 Jul 2025 12:33:42 +0200 |
parents | 79d463e3fdeb |
children |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia types # Copyright (C) 2011 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from abc import ABC, abstractmethod from collections import namedtuple from typing import TYPE_CHECKING, Any from twisted.internet import defer from twisted.python import failure from typing_extensions import TypedDict from twisted.words.protocols.jabber import jid as t_jid from twisted.words.protocols.jabber import xmlstream from twisted.words.xish import domish from wokkel import disco from libervia.backend.core import exceptions from libervia.backend.models.core import MessageData if TYPE_CHECKING: from libervia.backend.core.xmpp import LiberviaRosterProtocol EncryptionPlugin = namedtuple( "EncryptionPlugin", ("instance", "name", "namespace", "priority", "directed") ) class EncryptionSession(TypedDict): plugin: EncryptionPlugin # Incomplete types built through observation rather than code inspection. MessageDataExtra = TypedDict( "MessageDataExtra", {"encrypted": bool, "origin_id": str, "headers": dict[str, Any]}, total=False ) # FIXME: deprecated, do not use this type. Use libervia.backend.models.core.MessageData # instead. MessageDataLegacy = TypedDict( "MessageDataLegacy", { "from": t_jid.JID, "to": t_jid.JID, "uid": str, "message": dict[str, str], "subject": dict[str, str], "type": str, "timestamp": float, "extra": MessageDataExtra, "ENCRYPTION": EncryptionSession, "xml": domish.Element, }, total=False, ) class SatXMPPEntity(ABC): """Base class for Client and Component.""" profile: str jid: t_jid.JID is_component: bool server_jid: t_jid.JID identities: list[disco.DiscoIdentity] xmlstream: xmlstream.XmlStream @classmethod @abstractmethod async def start_connection(cls, host, profile, max_retries): raise NotImplementedError @abstractmethod def disconnect_profile(self, reason: failure.Failure | None) -> None: """Disconnect the profile.""" raise NotImplementedError @abstractmethod def is_connected(self) -> bool: """Return True is client is fully connected client is considered fully connected if transport is started and all plugins are initialised """ raise NotImplementedError @abstractmethod def entity_disconnect(self) -> defer.Deferred[None]: raise NotImplementedError ## sending ## @abstractmethod def IQ(self, type_="set", timeout=60) -> xmlstream.IQ: """shortcut to create an IQ element managing deferred @param type_(unicode): IQ type ('set' or 'get') @param timeout(None, int): timeout in seconds @return((D)domish.Element: result stanza errback is called if an error stanza is returned """ raise NotImplementedError @abstractmethod def sendError( self, iq_elt: domish.Element, condition: str, text: str | None = None, appCondition: str | None = None, ) -> None: """Send error stanza build from iq_elt @param iq_elt(domish.Element): initial IQ element @param condition(unicode): error condition """ raise NotImplementedError @abstractmethod def generate_message_xml( self, data: MessageData, post_xml_treatments: defer.Deferred|None = None, ) -> MessageData: """Generate <message/> stanza from message data @param data: message data domish element will be put in data['xml'] following keys are needed: - from - to - uid: can be set to '' if uid attribute is not wanted - message - type - subject - extra @param post_xml_treatments: a Deferred which will be called with data once XML is generated @return: message data """ raise NotImplementedError @property @abstractmethod def is_admin(self) -> bool: """True if a client is an administrator with extra privileges""" raise NotImplementedError @abstractmethod def add_post_xml_callbacks(self, post_xml_treatments) -> None: """Used to add class level callbacks at the end of the workflow @param post_xml_treatments(D): the same Deferred as in sendMessage trigger """ raise NotImplementedError @abstractmethod async def a_send(self, obj: domish.Element) -> None: raise NotImplementedError @abstractmethod def send(self, obj: domish.Element) -> None: # We need to call super() due to mulitple inheritance: Wokkel XMPPClient or # Component's `send` needs to be called. super().send(obj) # type: ignore @abstractmethod async def send_message_data(self, mess_data: MessageData) -> MessageData: """Convenient method to send message data to stream This method will send mess_data[u'xml'] to stream, but a trigger is there The trigger can't be cancelled, it's a good place for e2e encryption which don't handle full stanza encryption This trigger can return a Deferred (it's an async_point) @param mess_data(dict): message data as constructed by onMessage workflow @return (dict): mess_data (so it can be used in a deferred chain) """ raise NotImplementedError @abstractmethod def sendMessage( self, to_jid, message, subject=None, mess_type="auto", extra=None, uid=None, no_trigger=False, ): """Send a message to an entity @param to_jid(jid.JID): destinee of the message @param message(dict): message body, key is the language (use '' when unknown) @param subject(dict): message subject, key is the language (use '' when unknown) @param mess_type(str): one of standard message type (cf RFC 6121 §5.2.2) or: - auto: for automatic type detection - info: for information ("info_type" can be specified in extra) @param extra(dict, None): extra data. Key can be: - info_type: information type, can be TODO @param uid(unicode, None): unique id: should be unique at least in this XMPP session if None, an uuid will be generated @param no_trigger (bool): if True, sendMessage[suffix] trigger will no be used useful when a message need to be sent without any modification ⚠ this will also skip encryption methods! """ raise NotImplementedError @abstractmethod def is_message_printable(self, mess_data: MessageData) -> bool: """Return True if a message contain payload to show in frontends""" raise NotImplementedError @abstractmethod async def message_add_to_history(self, data): """Store message into database (for local history) @param data: message data dictionnary @param client: profile's client """ raise NotImplementedError @abstractmethod def message_get_bridge_args(self, data): """Generate args to use with bridge from data dict""" raise NotImplementedError @abstractmethod def message_send_to_bridge(self, data): """Send message to bridge, so frontends can display it @param data: message data dictionnary """ raise NotImplementedError ## helper methods ## @abstractmethod def p(self, plugin_name, missing=exceptions.MissingModule): """Get a plugin if available @param plugin_name(str): name of the plugin @param missing(object): value to return if plugin is missing if it is a subclass of Exception, it will be raised with a helping str as argument. @return (object): requested plugin wrapper, or default value The plugin wrapper will return the method with client set as first positional argument """ raise NotImplementedError class SatXMPPClient(SatXMPPEntity): roster: "LiberviaRosterProtocol" class SatXMPPComponent(SatXMPPEntity): def is_local(self, jid_: t_jid.JID) -> bool: ...