diff libervia/backend/plugins/plugin_xep_0470.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_xep_0470.py@524856bd7b19
children 2109d864a3e7
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/plugins/plugin_xep_0470.py	Fri Jun 02 11:49:51 2023 +0200
@@ -0,0 +1,591 @@
+#!/usr/bin/env python3
+
+# Libervia plugin for Pubsub Attachments
+# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from typing import List, Tuple, Dict, Any, Callable, Optional
+
+from twisted.words.protocols.jabber import jid, xmlstream, error
+from twisted.words.xish import domish
+from twisted.internet import defer
+from zope.interface import implementer
+from wokkel import pubsub, disco, iwokkel
+
+from libervia.backend.core.constants import Const as C
+from libervia.backend.core.i18n import _
+from libervia.backend.core.log import getLogger
+from libervia.backend.core.core_types import SatXMPPEntity
+from libervia.backend.core import exceptions
+from libervia.backend.tools.common import uri, data_format, date_utils
+from libervia.backend.tools.utils import as_deferred, xmpp_date
+
+
+log = getLogger(__name__)
+
+IMPORT_NAME = "XEP-0470"
+
+PLUGIN_INFO = {
+    C.PI_NAME: "Pubsub Attachments",
+    C.PI_IMPORT_NAME: IMPORT_NAME,
+    C.PI_TYPE: C.PLUG_TYPE_XEP,
+    C.PI_MODES: C.PLUG_MODE_BOTH,
+    C.PI_PROTOCOLS: [],
+    C.PI_DEPENDENCIES: ["XEP-0060"],
+    C.PI_MAIN: "PubsubAttachments",
+    C.PI_HANDLER: "yes",
+    C.PI_DESCRIPTION: _("""Pubsub Attachments implementation"""),
+}
+NS_PREFIX = "urn:xmpp:pubsub-attachments:"
+NS_PUBSUB_ATTACHMENTS = f"{NS_PREFIX}1"
+NS_PUBSUB_ATTACHMENTS_SUM = f"{NS_PREFIX}summary:1"
+
+
+class PubsubAttachments:
+    namespace = NS_PUBSUB_ATTACHMENTS
+
+    def __init__(self, host):
+        log.info(_("XEP-0470 (Pubsub Attachments) plugin initialization"))
+        host.register_namespace("pubsub-attachments", NS_PUBSUB_ATTACHMENTS)
+        self.host = host
+        self._p = host.plugins["XEP-0060"]
+        self.handlers: Dict[Tuple[str, str], dict[str, Any]] = {}
+        host.trigger.add("XEP-0277_send", self.on_mb_send)
+        self.register_attachment_handler(
+            "noticed", NS_PUBSUB_ATTACHMENTS, self.noticed_get, self.noticed_set
+        )
+        self.register_attachment_handler(
+            "reactions", NS_PUBSUB_ATTACHMENTS, self.reactions_get, self.reactions_set
+        )
+        host.bridge.add_method(
+            "ps_attachments_get",
+            ".plugin",
+            in_sign="sssasss",
+            out_sign="(ss)",
+            method=self._get,
+            async_=True,
+        )
+        host.bridge.add_method(
+            "ps_attachments_set",
+            ".plugin",
+            in_sign="ss",
+            out_sign="",
+            method=self._set,
+            async_=True,
+        )
+
+    def get_handler(self, client):
+        return PubsubAttachments_Handler()
+
+    def register_attachment_handler(
+        self,
+        name: str,
+        namespace: str,
+        get_cb: Callable[
+            [SatXMPPEntity, domish.Element, Dict[str, Any]],
+            None],
+        set_cb: Callable[
+            [SatXMPPEntity, Dict[str, Any], Optional[domish.Element]],
+            Optional[domish.Element]],
+    ) -> None:
+        """Register callbacks to handle an attachment
+
+        @param name: name of the element
+        @param namespace: namespace of the element
+            (name, namespace) couple must be unique
+        @param get: method to call when attachments are retrieved
+            it will be called with (client, element, data) where element is the
+            <attachments> element to parse, and data must be updated in place with
+            parsed data
+        @param set: method to call when the attachment need to be set or udpated
+            it will be called with (client, data, former_elt of None if there was no
+            former element). When suitable, ``operation`` should be used to check if we
+            request an ``update`` or a ``replace``.
+            The callback can be either a blocking method, a Deferred or a coroutine
+        """
+        key = (name, namespace)
+        if key in self.handlers:
+            raise exceptions.ConflictError(
+                f"({name}, {namespace}) attachment handlers are already registered"
+            )
+        self.handlers[(name, namespace)] = {
+            "get": get_cb,
+            "set": set_cb
+        }
+
+    def get_attachment_node_name(self, service: jid.JID, node: str, item: str) -> str:
+        """Generate name to use for attachment node"""
+        target_item_uri = uri.build_xmpp_uri(
+            "pubsub",
+            path=service.userhost(),
+            node=node,
+            item=item
+        )
+        return f"{NS_PUBSUB_ATTACHMENTS}/{target_item_uri}"
+
+    def is_attachment_node(self, node: str) -> bool:
+        """Return True if node name is an attachment node"""
+        return node.startswith(f"{NS_PUBSUB_ATTACHMENTS}/")
+
+    def attachment_node_2_item(self, node: str) -> Tuple[jid.JID, str, str]:
+        """Retrieve service, node and item from attachement node's name"""
+        if not self.is_attachment_node(node):
+            raise ValueError("this is not an attachment node!")
+        prefix_len = len(f"{NS_PUBSUB_ATTACHMENTS}/")
+        item_uri = node[prefix_len:]
+        parsed_uri = uri.parse_xmpp_uri(item_uri)
+        if parsed_uri["type"] != "pubsub":
+            raise ValueError(f"unexpected URI type, it must be a pubsub URI: {item_uri}")
+        try:
+            service = jid.JID(parsed_uri["path"])
+        except RuntimeError:
+            raise ValueError(f"invalid service in pubsub URI: {item_uri}")
+        node = parsed_uri["node"]
+        item = parsed_uri["item"]
+        return (service, node, item)
+
+    async def on_mb_send(
+        self,
+        client: SatXMPPEntity,
+        service: jid.JID,
+        node: str,
+        item: domish.Element,
+        data: dict
+    ) -> bool:
+        """trigger to create attachment node on each publication"""
+        await self.create_attachments_node(
+            client, service, node, item["id"], autocreate=True
+        )
+        return True
+
+    async def create_attachments_node(
+        self,
+        client: SatXMPPEntity,
+        service: jid.JID,
+        node: str,
+        item_id: str,
+        autocreate: bool = False
+    ):
+        """Create node for attachements if necessary
+
+        @param service: service of target node
+        @param node: node where target item is published
+        @param item_id: ID of target item
+        @param autocrate: if True, target node is create if it doesn't exist
+        """
+        try:
+            node_config = await self._p.getConfiguration(client, service, node)
+        except error.StanzaError as e:
+            if e.condition == "item-not-found" and autocreate:
+                # we auto-create the missing node
+                await self._p.createNode(
+                    client, service, node
+                )
+                node_config = await self._p.getConfiguration(client, service, node)
+            elif e.condition == "forbidden":
+                node_config = self._p.make_configuration_form({})
+            else:
+                raise e
+        try:
+            # FIXME: check if this is the best publish_model option
+            node_config.fields["pubsub#publish_model"].value = "open"
+        except KeyError:
+            log.warning("pubsub#publish_model field is missing")
+        attachment_node = self.get_attachment_node_name(service, node, item_id)
+        # we use the same options as target node
+        try:
+            await self._p.create_if_new_node(
+                client, service, attachment_node, options=dict(node_config)
+            )
+        except Exception as e:
+            log.warning(f"Can't create attachment node {attachment_node}: {e}")
+
+    def items_2_attachment_data(
+        self,
+        client: SatXMPPEntity,
+        items: List[domish.Element]
+    ) -> List[Dict[str, Any]]:
+        """Convert items from attachment node to attachment data"""
+        list_data = []
+        for item in items:
+            try:
+                attachments_elt = next(
+                    item.elements(NS_PUBSUB_ATTACHMENTS, "attachments")
+                )
+            except StopIteration:
+                log.warning(
+                    "item is missing <attachments> elements, ignoring it: {item.toXml()}"
+                )
+                continue
+            item_id = item["id"]
+            publisher_s = item.getAttribute("publisher")
+            # publisher is not filled by all pubsub service, so we can't count on it
+            if publisher_s:
+                publisher = jid.JID(publisher_s)
+                if publisher.userhost() != item_id:
+                    log.warning(
+                        f"publisher {publisher.userhost()!r} doesn't correspond to item "
+                        f"id {item['id']!r}, ignoring. This may be a hack attempt.\n"
+                        f"{item.toXml()}"
+                    )
+                    continue
+            try:
+                jid.JID(item_id)
+            except RuntimeError:
+                log.warning(
+                    "item ID is not a JID, this is not compliant and is ignored: "
+                    f"{item.toXml}"
+                )
+                continue
+            data = {
+                "from": item_id
+            }
+            for handler in self.handlers.values():
+                handler["get"](client, attachments_elt, data)
+            if len(data) > 1:
+                list_data.append(data)
+        return list_data
+
+    def _get(
+        self,
+        service_s: str,
+        node: str,
+        item: str,
+        senders_s: List[str],
+        extra_s: str,
+        profile_key: str
+    ) -> defer.Deferred:
+        client = self.host.get_client(profile_key)
+        extra = data_format.deserialise(extra_s)
+        senders = [jid.JID(s) for s in senders_s]
+        d = defer.ensureDeferred(
+            self.get_attachments(client, jid.JID(service_s), node, item, senders)
+        )
+        d.addCallback(
+            lambda ret:
+            (data_format.serialise(ret[0]),
+             data_format.serialise(ret[1]))
+        )
+        return d
+
+    async def get_attachments(
+        self,
+        client: SatXMPPEntity,
+        service: jid.JID,
+        node: str,
+        item: str,
+        senders: Optional[List[jid.JID]],
+        extra: Optional[dict] = None
+    ) -> Tuple[List[Dict[str, Any]], dict]:
+        """Retrieve data attached to a pubsub item
+
+        @param service: pubsub service where the node is
+        @param node: pubsub node containing the item
+        @param item: ID of the item for which attachments will be retrieved
+        @param senders: bare JIDs of entities that are checked. Attachments from those
+            entities will be retrieved.
+            If None, attachments from all entities will be retrieved
+        @param extra: extra data, will be used as ``extra`` argument when doing
+        ``get_items`` call.
+        @return: A tuple with:
+            - the list of attachments data, one item per found sender. The attachments
+              data are dict containing attachment, no ``extra`` field is used here
+              (contrarily to attachments data used with ``set_attachements``).
+            - metadata returned by the call to ``get_items``
+        """
+        if extra is None:
+            extra = {}
+        attachment_node = self.get_attachment_node_name(service, node, item)
+        item_ids = [e.userhost() for e in senders] if senders else None
+        items, metadata = await self._p.get_items(
+            client, service, attachment_node, item_ids=item_ids, extra=extra
+        )
+        list_data = self.items_2_attachment_data(client, items)
+
+        return list_data, metadata
+
+    def _set(
+        self,
+        attachments_s: str,
+        profile_key: str
+    ) -> None:
+        client = self.host.get_client(profile_key)
+        attachments = data_format.deserialise(attachments_s)  or {}
+        return defer.ensureDeferred(self.set_attachements(client, attachments))
+
+    async def apply_set_handler(
+        self,
+        client: SatXMPPEntity,
+        attachments_data: dict,
+        item_elt: Optional[domish.Element],
+        handlers: Optional[List[Tuple[str, str]]] = None,
+        from_jid: Optional[jid.JID] = None,
+    ) -> domish.Element:
+        """Apply all ``set`` callbacks to an attachments item
+
+        @param attachments_data: data describing the attachments
+            ``extra`` key will be used, and created if not found
+        @param from_jid: jid of the author of the attachments
+            ``client.jid.userhostJID()`` will be used if not specified
+        @param item_elt: item containing an <attachments> element
+            will be modified in place
+            if None, a new element will be created
+        @param handlers: list of (name, namespace) of handlers to use.
+            if None, all registered handlers will be used.
+        @return: updated item_elt if given, otherwise a new item_elt
+        """
+        attachments_data.setdefault("extra", {})
+        if item_elt is None:
+            item_id = client.jid.userhost() if from_jid is None else from_jid.userhost()
+            item_elt = pubsub.Item(item_id)
+            item_elt.addElement((NS_PUBSUB_ATTACHMENTS, "attachments"))
+
+        try:
+            attachments_elt = next(
+                item_elt.elements(NS_PUBSUB_ATTACHMENTS, "attachments")
+            )
+        except StopIteration:
+            log.warning(
+                f"no <attachments> element found, creating a new one: {item_elt.toXml()}"
+            )
+            attachments_elt = item_elt.addElement((NS_PUBSUB_ATTACHMENTS, "attachments"))
+
+        if handlers is None:
+            handlers = list(self.handlers.keys())
+
+        for name, namespace in handlers:
+            try:
+                handler = self.handlers[(name, namespace)]
+            except KeyError:
+                log.error(
+                    f"unregistered handler ({name!r}, {namespace!r}) is requested, "
+                    "ignoring"
+                )
+                continue
+            try:
+                former_elt = next(attachments_elt.elements(namespace, name))
+            except StopIteration:
+                former_elt = None
+            new_elt = await as_deferred(
+                handler["set"], client, attachments_data, former_elt
+            )
+            if new_elt != former_elt:
+                if former_elt is not None:
+                    attachments_elt.children.remove(former_elt)
+                if new_elt is not None:
+                    attachments_elt.addChild(new_elt)
+        return item_elt
+
+    async def set_attachements(
+        self,
+        client: SatXMPPEntity,
+        attachments_data: Dict[str, Any]
+    ) -> None:
+        """Set or update attachments
+
+        Former <attachments> element will be retrieved and updated. Individual
+        attachments replace or update their elements individually, according to the
+        "operation" key.
+
+        "operation" key may be "update" or "replace", and defaults to update, it is only
+        used in attachments where "update" makes sense (e.g. it's used for "reactions"
+        but not for "noticed").
+
+        @param attachments_data: data describing attachments. Various keys (usually stored
+            in attachments_data["extra"]) may be used depending on the attachments
+            handlers registered. The keys "service", "node" and "id" MUST be set.
+            ``attachments_data`` is thought to be compatible with microblog data.
+
+        """
+        try:
+            service = jid.JID(attachments_data["service"])
+            node = attachments_data["node"]
+            item = attachments_data["id"]
+        except (KeyError, RuntimeError):
+            raise ValueError(
+                'data must have "service", "node" and "id" set'
+            )
+        attachment_node = self.get_attachment_node_name(service, node, item)
+        try:
+            items, __ = await self._p.get_items(
+                client, service, attachment_node, item_ids=[client.jid.userhost()]
+            )
+        except exceptions.NotFound:
+            item_elt = None
+        else:
+            if not items:
+                item_elt = None
+            else:
+                item_elt = items[0]
+
+        item_elt = await self.apply_set_handler(
+            client,
+            attachments_data,
+            item_elt=item_elt,
+        )
+
+        try:
+            await self._p.send_items(client, service, attachment_node, [item_elt])
+        except error.StanzaError as e:
+            if e.condition == "item-not-found":
+                # the node doesn't exist, we can't publish attachments
+                log.warning(
+                    f"no attachment node found at {service} on {node!r} for item "
+                    f"{item!r}, we can't update attachments."
+                )
+                raise exceptions.NotFound("No attachment node available")
+            else:
+                raise e
+
+    async def subscribe(
+        self,
+        client: SatXMPPEntity,
+        service: jid.JID,
+        node: str,
+        item: str,
+    ) -> None:
+        """Subscribe to attachment node targeting the item
+
+        @param service: service of target item (will also be used for attachment node)
+        @param node: node of target item (used to get attachment node's name)
+        @param item: name of target item (used to get attachment node's name)
+        """
+        attachment_node = self.get_attachment_node_name(service, node, item)
+        await self._p.subscribe(client, service, attachment_node)
+
+
+    def set_timestamp(self, attachment_elt: domish.Element, data: dict) -> None:
+        """Check if a ``timestamp`` attribute is set, parse it, and fill data
+
+        @param attachments_elt: element where the ``timestamp`` attribute may be set
+        @param data: data specific to the attachment (i.e. not the whole microblog data)
+            ``timestamp`` field will be set there if timestamp exists and is parsable
+        """
+        timestamp_raw = attachment_elt.getAttribute("timestamp")
+        if timestamp_raw:
+            try:
+                timestamp = date_utils.date_parse(timestamp_raw)
+            except date_utils.ParserError:
+                log.warning(f"can't parse timestamp: {timestamp_raw}")
+            else:
+                data["timestamp"] = timestamp
+
+    def noticed_get(
+        self,
+        client: SatXMPPEntity,
+        attachments_elt: domish.Element,
+        data: Dict[str, Any],
+    ) -> None:
+        try:
+            noticed_elt = next(
+                attachments_elt.elements(NS_PUBSUB_ATTACHMENTS, "noticed")
+            )
+        except StopIteration:
+            pass
+        else:
+            noticed_data = {
+                "noticed": True
+            }
+            self.set_timestamp(noticed_elt, noticed_data)
+            data["noticed"] = noticed_data
+
+    def noticed_set(
+        self,
+        client: SatXMPPEntity,
+        data: Dict[str, Any],
+        former_elt: Optional[domish.Element]
+    ) -> Optional[domish.Element]:
+        """add or remove a <noticed> attachment
+
+        if data["noticed"] is True, element is added, if it's False, it's removed, and
+        it's not present or None, the former element is kept.
+        """
+        noticed = data["extra"].get("noticed")
+        if noticed is None:
+            return former_elt
+        elif noticed:
+            return domish.Element(
+                (NS_PUBSUB_ATTACHMENTS, "noticed"),
+                attribs = {
+                    "timestamp": xmpp_date()
+                }
+            )
+        else:
+            return None
+
+    def reactions_get(
+        self,
+        client: SatXMPPEntity,
+        attachments_elt: domish.Element,
+        data: Dict[str, Any],
+    ) -> None:
+        try:
+            reactions_elt = next(
+                attachments_elt.elements(NS_PUBSUB_ATTACHMENTS, "reactions")
+            )
+        except StopIteration:
+            pass
+        else:
+            reactions_data = {"reactions": []}
+            reactions = reactions_data["reactions"]
+            for reaction_elt in reactions_elt.elements(NS_PUBSUB_ATTACHMENTS, "reaction"):
+                reactions.append(str(reaction_elt))
+            self.set_timestamp(reactions_elt, reactions_data)
+            data["reactions"] = reactions_data
+
+    def reactions_set(
+        self,
+        client: SatXMPPEntity,
+        data: Dict[str, Any],
+        former_elt: Optional[domish.Element]
+    ) -> Optional[domish.Element]:
+        """update the <reaction> attachment"""
+        reactions_data = data["extra"].get("reactions")
+        if reactions_data is None:
+            return former_elt
+        operation_type = reactions_data.get("operation", "update")
+        if operation_type == "update":
+            former_reactions = {
+                str(r) for r in former_elt.elements(NS_PUBSUB_ATTACHMENTS, "reaction")
+            } if former_elt is not None else set()
+            added_reactions = set(reactions_data.get("add") or [])
+            removed_reactions = set(reactions_data.get("remove") or [])
+            reactions = list((former_reactions | added_reactions) - removed_reactions)
+        elif operation_type == "replace":
+            reactions = reactions_data.get("reactions") or []
+        else:
+            raise exceptions.DataError(f"invalid reaction operation: {operation_type!r}")
+        if reactions:
+            reactions_elt = domish.Element(
+                (NS_PUBSUB_ATTACHMENTS, "reactions"),
+                attribs = {
+                    "timestamp": xmpp_date()
+                }
+            )
+            for reactions_data in reactions:
+                reactions_elt.addElement("reaction", content=reactions_data)
+            return reactions_elt
+        else:
+            return None
+
+
+@implementer(iwokkel.IDisco)
+class PubsubAttachments_Handler(xmlstream.XMPPHandler):
+
+    def getDiscoInfo(self, requestor, service, nodeIdentifier=""):
+        return [disco.DiscoFeature(NS_PUBSUB_ATTACHMENTS)]
+
+    def getDiscoItems(self, requestor, service, nodeIdentifier=""):
+        return []