diff libervia/backend/plugins/plugin_xep_0372.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_xep_0372.py@c23cad65ae99
children 0d7bb4df2343
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/plugins/plugin_xep_0372.py	Fri Jun 02 11:49:51 2023 +0200
@@ -0,0 +1,230 @@
+#!/usr/bin/env python3
+
+# Libervia plugin for XEP-0372
+# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from typing import Optional, Dict, Union
+from textwrap import dedent
+from libervia.backend.core import exceptions
+from libervia.backend.tools.common import uri as xmpp_uri
+
+from twisted.internet import defer
+from twisted.words.protocols.jabber import jid
+from twisted.words.protocols.jabber.xmlstream import XMPPHandler
+from twisted.words.xish import domish
+from zope.interface import implementer
+from wokkel import disco, iwokkel
+
+from libervia.backend.core.constants import Const as C
+from libervia.backend.core.i18n import _
+from libervia.backend.core.log import getLogger
+from libervia.backend.core.core_types import SatXMPPEntity
+from libervia.backend.tools.common import data_format
+
+
+log = getLogger(__name__)
+
+PLUGIN_INFO = {
+    C.PI_NAME: "References",
+    C.PI_IMPORT_NAME: "XEP-0372",
+    C.PI_TYPE: C.PLUG_TYPE_XEP,
+    C.PI_MODES: C.PLUG_MODE_BOTH,
+    C.PI_PROTOCOLS: ["XEP-0372"],
+    C.PI_DEPENDENCIES: ["XEP-0334"],
+    C.PI_MAIN: "XEP_0372",
+    C.PI_HANDLER: "yes",
+    C.PI_DESCRIPTION: _(dedent("""\
+        XEP-0372 (References) implementation
+
+        This plugin implement generic references and mentions.
+    """)),
+}
+
+NS_REFS = "urn:xmpp:reference:0"
+ALLOWED_TYPES = ("mention", "data")
+
+
+class XEP_0372:
+    namespace = NS_REFS
+
+    def __init__(self, host):
+        log.info(_("References plugin initialization"))
+        host.register_namespace("refs", NS_REFS)
+        self.host = host
+        self._h = host.plugins["XEP-0334"]
+        host.trigger.add("message_received", self._message_received_trigger)
+        host.bridge.add_method(
+            "reference_send",
+            ".plugin",
+            in_sign="sssss",
+            out_sign="",
+            method=self._send_reference,
+            async_=False,
+        )
+
+    def get_handler(self, client):
+        return XEP_0372_Handler()
+
+    def ref_element_to_ref_data(
+        self,
+        reference_elt: domish.Element
+    ) -> Dict[str, Union[str, int, dict]]:
+        ref_data: Dict[str, Union[str, int, dict]] = {
+            "uri": reference_elt["uri"],
+            "type": reference_elt["type"]
+        }
+
+        if ref_data["uri"].startswith("xmpp:"):
+            ref_data["parsed_uri"] = xmpp_uri.parse_xmpp_uri(ref_data["uri"])
+
+        for attr in ("begin", "end"):
+            try:
+                ref_data[attr] = int(reference_elt[attr])
+            except (KeyError, ValueError, TypeError):
+                continue
+
+        anchor = reference_elt.getAttribute("anchor")
+        if anchor is not None:
+            ref_data["anchor"] = anchor
+            if anchor.startswith("xmpp:"):
+                ref_data["parsed_anchor"] = xmpp_uri.parse_xmpp_uri(anchor)
+        return ref_data
+
+    async def _message_received_trigger(
+        self,
+        client: SatXMPPEntity,
+        message_elt: domish.Element,
+        post_treat: defer.Deferred
+    ) -> bool:
+        """Check if a direct invitation is in the message, and handle it"""
+        reference_elt = next(message_elt.elements(NS_REFS, "reference"), None)
+        if reference_elt is None:
+            return True
+        try:
+            ref_data = self.ref_element_to_ref_data(reference_elt)
+        except KeyError:
+            log.warning("invalid <reference> element: {reference_elt.toXml}")
+            return True
+
+        if not await self.host.trigger.async_point(
+            "XEP-0372_ref_received", client, message_elt, ref_data
+        ):
+            return False
+        return True
+
+    def build_ref_element(
+        self,
+        uri: str,
+        type_: str = "mention",
+        begin: Optional[int] = None,
+        end: Optional[int] = None,
+        anchor: Optional[str] = None,
+    ) -> domish.Element:
+        """Build and return the <reference> element"""
+        if type_ not in ALLOWED_TYPES:
+            raise ValueError(f"Unknown type: {type_!r}")
+        reference_elt = domish.Element(
+            (NS_REFS, "reference"),
+            attribs={"uri": uri, "type": type_}
+        )
+        if begin is not None:
+            reference_elt["begin"] = str(begin)
+        if end is not None:
+            reference_elt["end"] = str(end)
+        if anchor is not None:
+            reference_elt["anchor"] = anchor
+        return reference_elt
+
+    def _send_reference(
+        self,
+        recipient: str,
+        anchor: str,
+        type_: str,
+        extra_s: str,
+        profile_key: str
+    ) -> defer.Deferred:
+        recipient_jid = jid.JID(recipient)
+        client = self.host.get_client(profile_key)
+        extra: dict = data_format.deserialise(extra_s, default={})
+        self.send_reference(
+            client,
+            uri=extra.get("uri"),
+            type_=type_,
+            anchor=anchor,
+            to_jid=recipient_jid
+        )
+
+    def send_reference(
+        self,
+        client: "SatXMPPEntity",
+        uri: Optional[str] = None,
+        type_: str = "mention",
+        begin: Optional[int] = None,
+        end: Optional[int] = None,
+        anchor: Optional[str] = None,
+        message_elt: Optional[domish.Element] = None,
+        to_jid: Optional[jid.JID] = None
+    ) -> None:
+        """Build and send a reference_elt
+
+        @param uri: URI pointing to referenced object (XMPP entity, Pubsub Item, etc)
+            if not set, "to_jid" will be used to build an URI to the entity
+        @param type_: type of reference
+            one of [ALLOWED_TYPES]
+        @param begin: optional begin index
+        @param end: optional end index
+        @param anchor: URI of refering object (message, pubsub item), when the refence
+            is not already in the wrapping message element. In other words, it's the
+            object where the reference appears.
+        @param message_elt: wrapping <message> element, if not set a new one will be
+            generated
+        @param to_jid: destinee of the reference. If not specified, "to" attribute of
+            message_elt will be used.
+
+        """
+        if uri is None:
+            if to_jid is None:
+                raise exceptions.InternalError(
+                    '"to_jid" must be set if "uri is None"'
+                )
+            uri = xmpp_uri.build_xmpp_uri(path=to_jid.full())
+        if message_elt is None:
+            message_elt = domish.Element((None, "message"))
+
+        if to_jid is not None:
+            message_elt["to"] = to_jid.full()
+        else:
+            try:
+                to_jid = jid.JID(message_elt["to"])
+            except (KeyError, RuntimeError):
+                raise exceptions.InternalError(
+                    'invalid "to" attribute in given message element: '
+                    '{message_elt.toXml()}'
+                )
+
+        message_elt.addChild(self.build_ref_element(uri, type_, begin, end, anchor))
+        self._h.add_hint_elements(message_elt, [self._h.HINT_STORE])
+        client.send(message_elt)
+
+
+@implementer(iwokkel.IDisco)
+class XEP_0372_Handler(XMPPHandler):
+
+    def getDiscoInfo(self, requestor, service, nodeIdentifier=""):
+        return [disco.DiscoFeature(NS_REFS)]
+
+    def getDiscoItems(self, requestor, service, nodeIdentifier=""):
+        return []