view sat/plugins/plugin_xep_0372.py @ 4004:cd6f70015738

plugin XEP-0384: run `profileConnected` workflow in background: XEP-0384 connection workflow can be very long when connecting after a while (due to OLDMEMO migration notably), which can lead to bad UX. It is not necessary to `await` it as `encrypt` and `decrypt` already wait for the plugin to be initialized correctly, so the workflow is now run in background.
author Goffi <goffi@goffi.org>
date Fri, 10 Mar 2023 17:22:45 +0100
parents 68a11b95a7d3
children 524856bd7b19
line wrap: on
line source

#!/usr/bin/env python3

# Libervia plugin for XEP-0372
# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import Optional, Dict, Union
from textwrap import dedent
from sat.core import exceptions
from sat.tools.common import uri as xmpp_uri

from twisted.internet import defer
from twisted.words.protocols.jabber import jid
from twisted.words.protocols.jabber.xmlstream import XMPPHandler
from twisted.words.xish import domish
from zope.interface import implementer
from wokkel import disco, iwokkel

from sat.core.constants import Const as C
from sat.core.i18n import _
from sat.core.log import getLogger
from sat.core.core_types import SatXMPPEntity
from sat.tools.common import data_format


# Module-level logger, named after this module.
log = getLogger(__name__)

# Plugin metadata; every key is a constant taken from sat.core.constants.Const.
PLUGIN_INFO = {
    C.PI_NAME: "References",
    C.PI_IMPORT_NAME: "XEP-0372",
    C.PI_TYPE: C.PLUG_TYPE_XEP,
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0372"],
    # XEP-0334 is looked up in host.plugins by XEP_0372.__init__
    C.PI_DEPENDENCIES: ["XEP-0334"],
    # matches the name of the main class defined in this module
    C.PI_MAIN: "XEP_0372",
    C.PI_HANDLER: "yes",
    # description is dedented, then passed through the translation function
    C.PI_DESCRIPTION: _(dedent("""\
        XEP-0372 (References) implementation

        This plugin implement generic references and mentions.
    """)),
}

# XML namespace of the <reference> element (also advertised through disco)
NS_REFS = "urn:xmpp:reference:0"
# values accepted for the "type" attribute by XEP_0372.buildRefElement
ALLOWED_TYPES = ("mention", "data")


class XEP_0372:
    """XEP-0372 (References) plugin

    Parse incoming <reference> elements, and build/send outgoing ones.
    """
    namespace = NS_REFS

    def __init__(self, host):
        """Register the namespace, the message trigger and the bridge method

        @param host: host instance; it must provide "registerNamespace", "plugins",
            "trigger" and "bridge"
        """
        log.info(_("References plugin initialization"))
        host.registerNamespace("refs", NS_REFS)
        self.host = host
        # XEP-0334 plugin, used in sendReference to add hints to the sent message
        self._h = host.plugins["XEP-0334"]
        host.trigger.add("messageReceived", self._messageReceivedTrigger)
        host.bridge.addMethod(
            "referenceSend",
            ".plugin",
            in_sign="sssss",
            out_sign="",
            method=self._sendReference,
            async_=False,
        )

    def getHandler(self, client):
        """Return a new handler advertising the references namespace"""
        return XEP_0372_Handler()

    def refElementToRefData(
        self,
        reference_elt: domish.Element
    ) -> Dict[str, Union[str, int, dict]]:
        """Convert a <reference> element to a dict

        @param reference_elt: <reference> element to convert
        @return: reference data with following keys:
            - "uri" and "type": value of the attributes of the same name
            - "parsed_uri": parsed "uri", only set if it starts with "xmpp:"
            - "begin" and "end": integer indexes, only set if the attribute is
              present and is a valid integer
            - "anchor": value of the "anchor" attribute, only set if present
            - "parsed_anchor": parsed "anchor", only set if it starts with "xmpp:"
        @raise KeyError: "uri" or "type" attribute is missing
        """
        ref_data: Dict[str, Union[str, int, dict]] = {
            "uri": reference_elt["uri"],
            "type": reference_elt["type"]
        }

        if ref_data["uri"].startswith("xmpp:"):
            ref_data["parsed_uri"] = xmpp_uri.parseXMPPUri(ref_data["uri"])

        for attr in ("begin", "end"):
            try:
                ref_data[attr] = int(reference_elt[attr])
            except (KeyError, ValueError, TypeError):
                # indexes are optional: missing or malformed values are skipped
                continue

        anchor = reference_elt.getAttribute("anchor")
        if anchor is not None:
            ref_data["anchor"] = anchor
            if anchor.startswith("xmpp:"):
                ref_data["parsed_anchor"] = xmpp_uri.parseXMPPUri(anchor)
        return ref_data

    async def _messageReceivedTrigger(
        self,
        client: SatXMPPEntity,
        message_elt: domish.Element,
        post_treat: defer.Deferred
    ) -> bool:
        """Check if a <reference> is in the message, and run the dedicated trigger

        If a valid <reference> element is found, "XEP-0372_ref_received" trigger point
        is run with the parsed reference data.

        @param client: client which received the message
        @param message_elt: received <message> element
        @param post_treat: not used by this trigger
        @return: False if "XEP-0372_ref_received" trigger point returned a false
            value, True otherwise (including when there is no reference, or when the
            reference is invalid)
        """
        reference_elt = next(message_elt.elements(NS_REFS, "reference"), None)
        if reference_elt is None:
            return True
        try:
            ref_data = self.refElementToRefData(reference_elt)
        except KeyError:
            # a mandatory attribute ("uri" or "type") is missing
            log.warning(f"invalid <reference> element: {reference_elt.toXml()}")
            return True

        if not await self.host.trigger.asyncPoint(
            "XEP-0372_ref_received", client, message_elt, ref_data
        ):
            return False
        return True

    def buildRefElement(
        self,
        uri: str,
        type_: str = "mention",
        begin: Optional[int] = None,
        end: Optional[int] = None,
        anchor: Optional[str] = None,
    ) -> domish.Element:
        """Build and return the <reference> element

        @param uri: value of the "uri" attribute
        @param type_: value of the "type" attribute
            one of [ALLOWED_TYPES]
        @param begin: optional begin index
        @param end: optional end index
        @param anchor: optional value of the "anchor" attribute
        @return: the new <reference> element
        @raise ValueError: type_ is not one of [ALLOWED_TYPES]
        """
        if type_ not in ALLOWED_TYPES:
            raise ValueError(f"Unknown type: {type_!r}")
        reference_elt = domish.Element(
            (NS_REFS, "reference"),
            attribs={"uri": uri, "type": type_}
        )
        if begin is not None:
            reference_elt["begin"] = str(begin)
        if end is not None:
            reference_elt["end"] = str(end)
        if anchor is not None:
            reference_elt["anchor"] = anchor
        return reference_elt

    def _sendReference(
        self,
        recipient: str,
        anchor: str,
        type_: str,
        extra_s: str,
        profile_key: str
    ) -> None:
        """Bridge method for "referenceSend", see [sendReference]

        @param recipient: JID of the destinee of the reference
        @param anchor: forwarded as "anchor" to [sendReference]
        @param type_: forwarded as "type_" to [sendReference]
        @param extra_s: serialised extra data; its "uri" key, if any, is used as the
            URI of the reference
        @param profile_key: key used to retrieve the client
        """
        recipient_jid = jid.JID(recipient)
        client = self.host.getClient(profile_key)
        extra: dict = data_format.deserialise(extra_s, default={})
        self.sendReference(
            client,
            uri=extra.get("uri"),
            type_=type_,
            anchor=anchor,
            to_jid=recipient_jid
        )

    def sendReference(
        self,
        client: "SatXMPPEntity",
        uri: Optional[str] = None,
        type_: str = "mention",
        begin: Optional[int] = None,
        end: Optional[int] = None,
        anchor: Optional[str] = None,
        message_elt: Optional[domish.Element] = None,
        to_jid: Optional[jid.JID] = None
    ) -> None:
        """Build and send a reference_elt

        @param client: client used to send the message
        @param uri: URI pointing to referenced object (XMPP entity, Pubsub Item, etc)
            if not set, "to_jid" will be used to build an URI to the entity
        @param type_: type of reference
            one of [ALLOWED_TYPES]
        @param begin: optional begin index
        @param end: optional end index
        @param anchor: URI of referring object (message, pubsub item), when the
            reference is not already in the wrapping message element. In other words,
            it's the object where the reference appears.
        @param message_elt: wrapping <message> element, if not set a new one will be
            generated
        @param to_jid: destinee of the reference. If not specified, "to" attribute of
            message_elt will be used.
        @raise exceptions.InternalError: neither "uri" nor "to_jid" is set, or
            "to_jid" is not set and the "to" attribute of message_elt is missing or
            is not a valid JID
        @raise ValueError: type_ is not one of [ALLOWED_TYPES]
        """
        if uri is None:
            if to_jid is None:
                raise exceptions.InternalError(
                    '"to_jid" must be set if "uri is None"'
                )
            uri = xmpp_uri.buildXMPPUri(path=to_jid.full())
        if message_elt is None:
            message_elt = domish.Element((None, "message"))

        if to_jid is not None:
            message_elt["to"] = to_jid.full()
        else:
            try:
                # the result is not needed, we only check that "to" is a valid JID
                jid.JID(message_elt["to"])
            except (KeyError, RuntimeError, jid.InvalidFormat) as e:
                raise exceptions.InternalError(
                    'invalid "to" attribute in given message element: '
                    f'{message_elt.toXml()}'
                ) from e

        message_elt.addChild(self.buildRefElement(uri, type_, begin, end, anchor))
        self._h.addHintElements(message_elt, [self._h.HINT_STORE])
        client.send(message_elt)


@implementer(iwokkel.IDisco)
class XEP_0372_Handler(XMPPHandler):
    """Disco handler advertising the references namespace"""

    def getDiscoInfo(self, requestor, service, nodeIdentifier=""):
        """Return the disco features of this plugin

        @return: list with a single feature, for [NS_REFS]
        """
        refs_feature = disco.DiscoFeature(NS_REFS)
        return [refs_feature]

    def getDiscoItems(self, requestor, service, nodeIdentifier=""):
        """Return the disco items of this plugin

        @return: always an empty list
        """
        no_items = []
        return no_items