view libervia/backend/core/core_types.py @ 4382:b897d98b2c51 default tip

plugin XEP-0297: Reworked `forward` method and add bridge method: `Forward` method has been reworked and now includes a fallback. XEP-0297 ask to not use fallback, but following a discussion on xsf@, we agreed that this is a legacy thing and a fallback should nowadays be used, I'll propose a patch to the specification. A `message_forward` has been added to bridge. rel 461
author Goffi <goffi@goffi.org>
date Fri, 04 Jul 2025 12:33:42 +0200
parents 79d463e3fdeb
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia types
# Copyright (C) 2011  Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from abc import ABC, abstractmethod
from collections import namedtuple
from typing import TYPE_CHECKING, Any
from twisted.internet import defer
from twisted.python import failure
from typing_extensions import TypedDict

from twisted.words.protocols.jabber import jid as t_jid
from twisted.words.protocols.jabber import xmlstream
from twisted.words.xish import domish
from wokkel import disco

from libervia.backend.core import exceptions
from libervia.backend.models.core import MessageData

if TYPE_CHECKING:
    from libervia.backend.core.xmpp import LiberviaRosterProtocol


EncryptionPlugin = namedtuple(
    "EncryptionPlugin", ("instance", "name", "namespace", "priority", "directed")
)


class EncryptionSession(TypedDict):
    plugin: EncryptionPlugin


# Incomplete types built through observation rather than code inspection.
MessageDataExtra = TypedDict(
    "MessageDataExtra", {"encrypted": bool, "origin_id": str, "headers": dict[str, Any]}, total=False
)


# FIXME: deprecated, do not use this type. Use libervia.backend.models.core.MessageData
#   instead.
MessageDataLegacy = TypedDict(
    "MessageDataLegacy",
    {
        "from": t_jid.JID,
        "to": t_jid.JID,
        "uid": str,
        "message": dict[str, str],
        "subject": dict[str, str],
        "type": str,
        "timestamp": float,
        "extra": MessageDataExtra,
        "ENCRYPTION": EncryptionSession,
        "xml": domish.Element,
    },
    total=False,
)


class SatXMPPEntity(ABC):
    """Base class for Client and Component."""

    profile: str
    jid: t_jid.JID
    is_component: bool
    server_jid: t_jid.JID
    identities: list[disco.DiscoIdentity]
    xmlstream: xmlstream.XmlStream

    @classmethod
    @abstractmethod
    async def start_connection(cls, host, profile, max_retries):
        raise NotImplementedError

    @abstractmethod
    def disconnect_profile(self, reason: failure.Failure | None) -> None:
        """Disconnect the profile."""
        raise NotImplementedError

    @abstractmethod
    def is_connected(self) -> bool:
        """Return True is client is fully connected

        client is considered fully connected if transport is started and all plugins
        are initialised
        """
        raise NotImplementedError

    @abstractmethod
    def entity_disconnect(self) -> defer.Deferred[None]:
        raise NotImplementedError

    ## sending ##

    @abstractmethod
    def IQ(self, type_="set", timeout=60) -> xmlstream.IQ:
        """shortcut to create an IQ element managing deferred

        @param type_(unicode): IQ type ('set' or 'get')
        @param timeout(None, int): timeout in seconds
        @return((D)domish.Element: result stanza
            errback is called if an error stanza is returned
        """
        raise NotImplementedError

    @abstractmethod
    def sendError(
        self,
        iq_elt: domish.Element,
        condition: str,
        text: str | None = None,
        appCondition: str | None = None,
    ) -> None:
        """Send error stanza build from iq_elt

        @param iq_elt(domish.Element): initial IQ element
        @param condition(unicode): error condition
        """
        raise NotImplementedError

    @abstractmethod
    def generate_message_xml(
        self,
        data: MessageData,
        post_xml_treatments: defer.Deferred|None = None,
    ) -> MessageData:
        """Generate <message/> stanza from message data

        @param data: message data
            domish element will be put in data['xml']
            following keys are needed:
                - from
                - to
                - uid: can be set to '' if uid attribute is not wanted
                - message
                - type
                - subject
                - extra
        @param post_xml_treatments: a Deferred which will be called with data once XML is
            generated
        @return: message data
        """
        raise NotImplementedError

    @property
    @abstractmethod
    def is_admin(self) -> bool:
        """True if a client is an administrator with extra privileges"""
        raise NotImplementedError

    @abstractmethod
    def add_post_xml_callbacks(self, post_xml_treatments) -> None:
        """Used to add class level callbacks at the end of the workflow

        @param post_xml_treatments(D): the same Deferred as in sendMessage trigger
        """
        raise NotImplementedError

    @abstractmethod
    async def a_send(self, obj: domish.Element) -> None:
        raise NotImplementedError

    @abstractmethod
    def send(self, obj: domish.Element) -> None:
        # We need to call super() due to mulitple inheritance: Wokkel XMPPClient or
        # Component's `send` needs to be called.
        super().send(obj) # type: ignore

    @abstractmethod
    async def send_message_data(self, mess_data: MessageData) -> MessageData:
        """Convenient method to send message data to stream

        This method will send mess_data[u'xml'] to stream, but a trigger is there
        The trigger can't be cancelled, it's a good place for e2e encryption which
        don't handle full stanza encryption
        This trigger can return a Deferred (it's an async_point)
        @param mess_data(dict): message data as constructed by onMessage workflow
        @return (dict): mess_data (so it can be used in a deferred chain)
        """
        raise NotImplementedError

    @abstractmethod
    def sendMessage(
        self,
        to_jid,
        message,
        subject=None,
        mess_type="auto",
        extra=None,
        uid=None,
        no_trigger=False,
    ):
        """Send a message to an entity

        @param to_jid(jid.JID): destinee of the message
        @param message(dict): message body, key is the language (use '' when unknown)
        @param subject(dict): message subject, key is the language (use '' when unknown)
        @param mess_type(str): one of standard message type (cf RFC 6121 §5.2.2) or:
            - auto: for automatic type detection
            - info: for information ("info_type" can be specified in extra)
        @param extra(dict, None): extra data. Key can be:
            - info_type: information type, can be
                TODO
        @param uid(unicode, None): unique id:
            should be unique at least in this XMPP session
            if None, an uuid will be generated
        @param no_trigger (bool): if True, sendMessage[suffix] trigger will no be used
            useful when a message need to be sent without any modification
            ⚠ this will also skip encryption methods!
        """
        raise NotImplementedError

    @abstractmethod
    def is_message_printable(self, mess_data: MessageData) -> bool:
        """Return True if a message contain payload to show in frontends"""
        raise NotImplementedError

    @abstractmethod
    async def message_add_to_history(self, data):
        """Store message into database (for local history)

        @param data: message data dictionnary
        @param client: profile's client
        """
        raise NotImplementedError

    @abstractmethod
    def message_get_bridge_args(self, data):
        """Generate args to use with bridge from data dict"""
        raise NotImplementedError

    @abstractmethod
    def message_send_to_bridge(self, data):
        """Send message to bridge, so frontends can display it

        @param data: message data dictionnary
        """
        raise NotImplementedError

    ## helper methods ##

    @abstractmethod
    def p(self, plugin_name, missing=exceptions.MissingModule):
        """Get a plugin if available

        @param plugin_name(str): name of the plugin
        @param missing(object): value to return if plugin is missing
            if it is a subclass of Exception, it will be raised with a helping str as
            argument.
        @return (object): requested plugin wrapper, or default value
            The plugin wrapper will return the method with client set as first
            positional argument
        """
        raise NotImplementedError


class SatXMPPClient(SatXMPPEntity):
    roster: "LiberviaRosterProtocol"


class SatXMPPComponent(SatXMPPEntity):

    def is_local(self, jid_: t_jid.JID) -> bool: ...