changeset 3864:ac255a0fbd4c

plugin pubsub attachments: partial implementation of pubsub-attachments protoXEP: This is an implementation of the "Basic Usage" of https://xmpp.org/extensions/inbox/pubsub-attachments.html. rel 370
author Goffi <goffi@goffi.org>
date Wed, 20 Jul 2022 17:49:51 +0200
parents c04f5e8a3568
children 59fbb66b2923
files sat/plugins/plugin_pubsub_attachments.py
diffstat 1 files changed, 483 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sat/plugins/plugin_pubsub_attachments.py	Wed Jul 20 17:49:51 2022 +0200
@@ -0,0 +1,483 @@
+#!/usr/bin/env python3
+
+# Libervia plugin for Pubsub Attachments
+# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from typing import List, Tuple, Dict, Any, Callable, Optional
+
+from twisted.words.protocols.jabber import jid, xmlstream, error
+from twisted.words.xish import domish
+from twisted.internet import defer
+from zope.interface import implementer
+from wokkel import pubsub, disco, iwokkel
+
+from sat.core.constants import Const as C
+from sat.core.i18n import _
+from sat.core.log import getLogger
+from sat.core.core_types import SatXMPPEntity
+from sat.core import exceptions
+from sat.tools.common import uri, data_format, date_utils
+from sat.tools.utils import xmpp_date
+
+
+log = getLogger(__name__)
+
+# name under which this plugin is imported (used as C.PI_IMPORT_NAME below)
+IMPORT_NAME = "PUBSUB_ATTACHMENTS"
+
+# plugin metadata; presumably read by the plugin loader (not visible here)
+PLUGIN_INFO = {
+    C.PI_NAME: "Pubsub Attachments",
+    C.PI_IMPORT_NAME: IMPORT_NAME,
+    C.PI_TYPE: C.PLUG_TYPE_EXP,
+    C.PI_MODES: C.PLUG_MODE_BOTH,
+    C.PI_PROTOCOLS: [],
+    # the XEP-0060 plugin is retrieved from host.plugins in PubsubAttachments.__init__
+    C.PI_DEPENDENCIES: ["XEP-0060"],
+    C.PI_MAIN: "PubsubAttachments",
+    C.PI_HANDLER: "yes",
+    C.PI_DESCRIPTION: _("""Pubsub Attachments implementation"""),
+}
+# common prefix of the namespaces declared below
+NS_PREFIX = "urn:xmpp:pubsub-attachments:"
+# namespace of the <attachments> element and of the built-in attachments; it is also
+# the prefix of attachment nodes names and the advertised disco feature
+NS_PUBSUB_ATTACHMENTS = f"{NS_PREFIX}0"
+# summary namespace; declared here but not used in the rest of this file
+NS_PUBSUB_ATTACHMENTS_SUM = f"{NS_PREFIX}summary:0"
+
+
+class PubsubAttachments:
+    namespace = NS_PUBSUB_ATTACHMENTS
+
+    def __init__(self, host):
+        """Initialise the plugin
+
+        The namespace, the "XEP-0277_send" trigger, the built-in "noticed" and
+        "reaction" attachment handlers and the bridge methods are registered here.
+
+        @param host: application instance; its "XEP-0060" plugin must be available
+        """
+        log.info(_("Pubsub Attachments plugin initialization"))
+        host.registerNamespace("pubsub-attachments", NS_PUBSUB_ATTACHMENTS)
+        self.host = host
+        self._p = host.plugins["XEP-0060"]
+        self.handlers = {}
+        host.trigger.add("XEP-0277_send", self.onMBSend)
+
+        # built-in attachments: (element name, "get" callback, "set" callback)
+        builtin_handlers = (
+            ("noticed", self.noticedGet, self.noticedSet),
+            ("reaction", self.reactionGet, self.reactionSet),
+        )
+        for name, get_cb, set_cb in builtin_handlers:
+            self.registerAttachmentHandler(name, NS_PUBSUB_ATTACHMENTS, get_cb, set_cb)
+
+        # bridge methods: (name, input signature, output signature, method)
+        bridge_methods = (
+            ("psAttachmentsGet", "sssasss", "(ss)", self._get),
+            ("psAttachmentsSet", "ss", "", self._set),
+        )
+        for method_name, in_sign, out_sign, method in bridge_methods:
+            host.bridge.addMethod(
+                method_name,
+                ".plugin",
+                in_sign=in_sign,
+                out_sign=out_sign,
+                method=method,
+                async_=True,
+            )
+
+    def getHandler(self, client):
+        """Build the protocol handler advertising the attachments feature
+
+        @param client: not used by this implementation
+        @return: a new PubsubAttachments_Handler instance
+        """
+        handler = PubsubAttachments_Handler()
+        return handler
+
+    def registerAttachmentHandler(
+        self,
+        name: str,
+        namespace: str,
+        get_cb: Callable[
+            [SatXMPPEntity, domish.Element, Dict[str, Any]],
+            None],
+        set_cb: Callable[
+            [SatXMPPEntity, Dict[str, Any], Optional[domish.Element]],
+            Optional[domish.Element]],
+    ) -> None:
+        """Register the callbacks handling one kind of attachment
+
+        @param name: name of the attachment element
+        @param namespace: namespace of the attachment element
+            the (name, namespace) couple must be unique
+        @param get_cb: method called when attachments are retrieved
+            it is called with (client, element, data) where element is the
+            <attachments> element to parse, and data must be updated in place with
+            the parsed data
+        @param set_cb: method called when the attachment needs to be set or updated
+            it is called with (client, data, former_elt) where former_elt is None if
+            there was no former element. It returns the element to store, or None if
+            there must be no element for this attachment.
+            When suitable, ``operation`` should be used to check if an ``update`` or
+            a ``replace`` is requested.
+        @raise exceptions.ConflictError: handlers are already registered for this
+            (name, namespace) couple
+        """
+        handler_key = (name, namespace)
+        if handler_key not in self.handlers:
+            self.handlers[handler_key] = {"get": get_cb, "set": set_cb}
+            return
+        raise exceptions.ConflictError(
+            f"({name}, {namespace}) attachment handlers are already registered"
+        )
+
+    def getAttachmentNodeName(self, service: jid.JID, node: str, item: str) -> str:
+        """Build the name of the attachment node associated to an item
+
+        The name is the attachments namespace followed by "/" and by the pubsub XMPP
+        URI of the target item.
+
+        @param service: pubsub service of the target item (its bare JID is used)
+        @param node: node of the target item
+        @param item: ID of the target item
+        @return: name of the attachment node
+        """
+        uri_kwargs = {"path": service.userhost(), "node": node, "item": item}
+        target_item_uri = uri.buildXMPPUri("pubsub", **uri_kwargs)
+        return f"{NS_PUBSUB_ATTACHMENTS}/{target_item_uri}"
+
+    def isAttachmentNode(self, node: str) -> bool:
+        """Tell if a node name is the name of an attachment node
+
+        @param node: name of the node to check
+        @return: True if the name starts with the attachments namespace followed by "/"
+        """
+        attachment_prefix = NS_PUBSUB_ATTACHMENTS + "/"
+        return node.startswith(attachment_prefix)
+
+    def attachmentNode2Item(self, node: str) -> Tuple[jid.JID, str, str]:
+        """Retrieve service, node and item from attachment node's name
+
+        @param node: name of an attachment node (see getAttachmentNodeName)
+        @return: (service, node, item) of the item targeted by the attachment node
+        @raise ValueError: the name is not the name of an attachment node, the URI it
+            embeds is not a pubsub URI, or the service of this URI is not a valid JID
+        """
+        if not self.isAttachmentNode(node):
+            raise ValueError("this is not an attachment node!")
+        prefix_len = len(f"{NS_PUBSUB_ATTACHMENTS}/")
+        item_uri = node[prefix_len:]
+        parsed_uri = uri.parseXMPPUri(item_uri)
+        if parsed_uri["type"] != "pubsub":
+            raise ValueError(f"unexpected URI type, it must be a pubsub URI: {item_uri}")
+        try:
+            service = jid.JID(parsed_uri["path"])
+        except (RuntimeError, jid.InvalidFormat) as e:
+            # jid.JID reports unparsable JIDs with jid.InvalidFormat, which is not a
+            # RuntimeError: it must be caught explicitly to be converted
+            raise ValueError(f"invalid service in pubsub URI: {item_uri}") from e
+        return (service, parsed_uri["node"], parsed_uri["item"])
+
+    async def onMBSend(
+        self,
+        client: SatXMPPEntity,
+        service: jid.JID,
+        node: str,
+        item: domish.Element,
+        data: dict
+    ) -> bool:
+        """Trigger to create attachment node on each publication
+
+        The attachment node is created, if it doesn't exist yet, with the same
+        configuration as the node where the item is published. A failure of the
+        creation is only logged.
+
+        @param service: pubsub service where the item is published
+        @param node: node where the item is published
+        @param item: published item, its "id" attribute must be set
+        @param data: microblog data of the item (not used here)
+        @return: always True
+        """
+        node_config = await self._p.getConfiguration(client, service, node)
+        attachment_node = self.getAttachmentNodeName(service, node, item["id"])
+        # we use the same options as target node
+        try:
+            await self._p.createIfNewNode(
+                client, service, attachment_node, options=dict(node_config)
+            )
+        except Exception as e:
+            # best effort: a publication must not fail because of the attachment node
+            log.warning(f"Can't create attachment node {attachment_node}: {e}")
+        return True
+
+    def items2attachmentData(
+        self,
+        client: SatXMPPEntity,
+        items: List[domish.Element]
+    ) -> List[Dict[str, Any]]:
+        """Convert items from attachment node to attachment data
+
+        An item is ignored (with a warning) if it has no <attachments> element, if
+        its publisher is specified and doesn't match the item ID, or if the item ID
+        is not a valid JID.
+
+        @param items: items retrieved from an attachment node
+        @return: one dict per item where at least one handler has found data.
+            The "from" key is the ID of the item (which is expected to be the bare
+            JID of the entity which published the attachments), the other keys are
+            set by the "get" callbacks of the registered handlers.
+        """
+        list_data = []
+        for item in items:
+            try:
+                attachments_elt = next(
+                    item.elements(NS_PUBSUB_ATTACHMENTS, "attachments")
+                )
+            except StopIteration:
+                log.warning(
+                    "item is missing <attachments> elements, ignoring it: "
+                    f"{item.toXml()}"
+                )
+                continue
+            item_id = item["id"]
+            publisher_s = item.getAttribute("publisher")
+            # publisher is not filled by all pubsub service, so we can't count on it
+            if publisher_s:
+                publisher = jid.JID(publisher_s)
+                if publisher.userhost() != item_id:
+                    log.warning(
+                        f"publisher {publisher.userhost()!r} doesn't correspond to item "
+                        f"id {item['id']!r}, ignoring. This may be a hack attempt.\n"
+                        f"{item.toXml()}"
+                    )
+                    continue
+            try:
+                jid.JID(item_id)
+            except (RuntimeError, jid.InvalidFormat):
+                # jid.JID reports unparsable JIDs with jid.InvalidFormat, which is not
+                # a RuntimeError
+                log.warning(
+                    "item ID is not a JID, this is not compliant and is ignored: "
+                    f"{item.toXml()}"
+                )
+                continue
+            data = {
+                "from": item_id
+            }
+            for handler in self.handlers.values():
+                handler["get"](client, attachments_elt, data)
+            # with only the "from" key, there is no attachment of interest in this item
+            if len(data) > 1:
+                list_data.append(data)
+        return list_data
+
+    def _get(
+        self,
+        service_s: str,
+        node: str,
+        item: str,
+        senders_s: List[str],
+        extra_s: str,
+        profile_key: str
+    ) -> defer.Deferred:
+        """Bridge method to retrieve the attachments of an item
+
+        @param service_s: JID of the pubsub service of the target item
+        @param node: node of the target item
+        @param item: ID of the target item
+        @param senders_s: JIDs of the entities whose attachments are requested
+            if empty, all attachments of the item are requested
+        @param extra_s: serialised extra data, transmitted to getAttachments
+        @param profile_key: profile to use
+        @return: Deferred firing with a (serialised attachments data, serialised
+            metadata) tuple
+        """
+        client = self.host.getClient(profile_key)
+        extra = data_format.deserialise(extra_s)
+        senders = [jid.JID(s) for s in senders_s]
+        d = defer.ensureDeferred(
+            self.getAttachments(
+                client, jid.JID(service_s), node, item, senders, extra=extra
+            )
+        )
+        d.addCallback(
+            lambda ret:
+            (data_format.serialise(ret[0]),
+             data_format.serialise(ret[1]))
+        )
+        return d
+
+    async def getAttachments(
+        self,
+        client: SatXMPPEntity,
+        service: jid.JID,
+        node: str,
+        item: str,
+        senders: Optional[List[jid.JID]],
+        extra: Optional[dict] = None
+    ) -> Tuple[List[Dict[str, Any]], dict]:
+        """Retrieve the data attached to an item
+
+        @param service: pubsub service of the target item
+        @param node: node of the target item
+        @param item: ID of the target item
+        @param senders: entities whose attachments are requested (their bare JIDs are
+            used as IDs of the items to retrieve)
+            None or empty to retrieve all the items of the attachment node
+        @param extra: extra data transmitted to the pubsub items retrieval
+        @return: (attachments data, metadata of the pubsub request)
+            see items2attachmentData for the format of attachments data
+        """
+        extra = {} if extra is None else extra
+        if senders:
+            item_ids = [sender.userhost() for sender in senders]
+        else:
+            item_ids = None
+        attachment_node = self.getAttachmentNodeName(service, node, item)
+        items, metadata = await self._p.getItems(
+            client, service, attachment_node, item_ids=item_ids, extra=extra
+        )
+        return self.items2attachmentData(client, items), metadata
+
+    def _set(
+        self,
+        attachments_s: str,
+        profile_key: str
+    ) -> defer.Deferred:
+        """Bridge method to set or update attachments
+
+        @param attachments_s: serialised data, see setAttachments for the format
+        @param profile_key: profile to use
+        @return: Deferred firing once setAttachments is done
+        """
+        client = self.host.getClient(profile_key)
+        attachments = data_format.deserialise(attachments_s) or {}
+        return defer.ensureDeferred(self.setAttachments(client, attachments))
+
+    async def setAttachments(
+        self,
+        client: SatXMPPEntity,
+        data: Dict[str, Any]
+    ) -> None:
+        """Set or update attachments
+
+        Former <attachments> element will be retrieved and updated. Individual
+        attachments replace or update their elements individually, according to the
+        "operation" key.
+
+        "operation" key may be "update" or "replace", and defaults to update, it is only
+        used in attachments where "update" makes sense (e.g. it's used for "reactions"
+        but not for "noticed").
+
+        @param data: microblog data. Various keys (usually stored in
+            data["extra"]) may be used depending on the attachments handlers
+            registered. The keys "service", "node" and "id" MUST be set.
+            data["extra"] is created (empty) if it is missing.
+        @raise ValueError: "service", "node" or "id" is missing, or "service" is not
+            a valid JID
+        @raise exceptions.NotFound: the attachment node doesn't exist
+        """
+        data.setdefault("extra", {})
+        try:
+            service = jid.JID(data["service"])
+            node = data["node"]
+            item = data["id"]
+        except (KeyError, RuntimeError, jid.InvalidFormat) as e:
+            # jid.JID reports unparsable JIDs with jid.InvalidFormat, which is not a
+            # RuntimeError
+            raise ValueError(
+                'data must have "service", "node" and "id" set'
+            ) from e
+        attachment_node = self.getAttachmentNodeName(service, node, item)
+        # attachments of an entity are stored in the item named after its bare JID
+        items, __ = await self._p.getItems(
+            client, service, attachment_node, item_ids=[client.jid.userhost()]
+        )
+        if not items:
+            # the item doesn't exist, we create a new one
+            item_elt = pubsub.Item(client.jid.userhost())
+            item_elt.addElement((NS_PUBSUB_ATTACHMENTS, "attachments"))
+        else:
+            item_elt = items[0]
+
+        try:
+            attachments_elt = next(
+                item_elt.elements(NS_PUBSUB_ATTACHMENTS, "attachments")
+            )
+        except StopIteration:
+            log.warning(
+                f"no <attachments> element found, creating a new one: {item_elt.toXml()}"
+            )
+            attachments_elt = item_elt.addElement((NS_PUBSUB_ATTACHMENTS, "attachments"))
+
+        for (name, namespace), handler in self.handlers.items():
+            try:
+                former_elt = next(attachments_elt.elements(namespace, name))
+            except StopIteration:
+                former_elt = None
+            new_elt = handler["set"](client, data, former_elt)
+            # a handler returns the former element when nothing has to be changed
+            if new_elt != former_elt:
+                if former_elt is not None:
+                    attachments_elt.children.remove(former_elt)
+                if new_elt is not None:
+                    attachments_elt.addChild(new_elt)
+        try:
+            await self._p.sendItems(client, service, attachment_node, [item_elt])
+        except error.StanzaError as e:
+            if e.condition != "item-not-found":
+                raise
+            # the node doesn't exist, we can't publish attachments
+            log.warning(
+                f"no attachment node found at {service} on {node!r} for item "
+                f"{item!r}, we can't update attachments."
+            )
+            raise exceptions.NotFound("No attachment node available") from e
+
+    async def subscribe(
+        self,
+        client: SatXMPPEntity,
+        service: jid.JID,
+        node: str,
+        item: str,
+    ) -> None:
+        """Subscribe to the attachment node of an item
+
+        @param service: service of the target item (the attachment node is on the
+            same service)
+        @param node: node of the target item (used to build attachment node's name)
+        @param item: ID of the target item (used to build attachment node's name)
+        """
+        await self._p.subscribe(
+            client, service, self.getAttachmentNodeName(service, node, item)
+        )
+
+
+    def setTimestamp(self, attachment_elt: domish.Element, data: dict) -> None:
+        """Parse the ``timestamp`` attribute of an attachment, and fill data with it
+
+        Nothing is done if the attribute is missing or empty. If it can't be parsed,
+        a warning is logged and data is left untouched.
+
+        @param attachment_elt: element where the ``timestamp`` attribute may be set
+        @param data: data specific to the attachment (i.e. not the whole microblog data)
+            ``timestamp`` field is set there if the timestamp exists and is parsable
+        """
+        raw_value = attachment_elt.getAttribute("timestamp")
+        if not raw_value:
+            return
+        try:
+            parsed = date_utils.date_parse(raw_value)
+        except date_utils.ParserError:
+            log.warning(f"can't parse timestamp: {raw_value}")
+        else:
+            data["timestamp"] = parsed
+
+    def noticedGet(
+        self,
+        client: SatXMPPEntity,
+        attachments_elt: domish.Element,
+        data: Dict[str, Any],
+    ) -> None:
+        """Parse the <noticed> attachment
+
+        @param attachments_elt: <attachments> element to parse
+        @param data: attachments data, updated in place: if a <noticed> element is
+            found, the "noticed" key is set to a dict with "noticed" set to True, and
+            "timestamp" when it is available
+        """
+        noticed_elt = next(
+            attachments_elt.elements(NS_PUBSUB_ATTACHMENTS, "noticed"), None
+        )
+        if noticed_elt is None:
+            return
+        noticed_data = {"noticed": True}
+        self.setTimestamp(noticed_elt, noticed_data)
+        data["noticed"] = noticed_data
+
+    def noticedSet(
+        self,
+        client: SatXMPPEntity,
+        data: Dict[str, Any],
+        former_elt: Optional[domish.Element]
+    ) -> Optional[domish.Element]:
+        """Add or remove a <noticed> attachment
+
+        The decision is based on data["extra"]["noticed"]: if it is missing or None,
+        the former element is kept; if it is true, a new element with the current
+        timestamp is used; otherwise the element is removed.
+
+        @param data: microblog data, its "extra" key must be set
+        @param former_elt: <noticed> element currently set, None if there is none
+        @return: element to use, or None if there must be no <noticed> element
+        """
+        noticed = data["extra"].get("noticed")
+        if noticed is None:
+            return former_elt
+        if not noticed:
+            return None
+        return domish.Element(
+            (NS_PUBSUB_ATTACHMENTS, "noticed"),
+            attribs={"timestamp": xmpp_date()},
+        )
+
+    def reactionGet(
+        self,
+        client: SatXMPPEntity,
+        attachments_elt: domish.Element,
+        data: Dict[str, Any],
+    ) -> None:
+        """Parse the <reaction> attachment
+
+        @param attachments_elt: <attachments> element to parse
+        @param data: attachments data, updated in place: if a <reaction> element is
+            found, the "reaction" key is set to a dict with "reactions" set to the
+            text content of the element, and "timestamp" when it is available
+        """
+        reaction_elt = next(
+            attachments_elt.elements(NS_PUBSUB_ATTACHMENTS, "reaction"), None
+        )
+        if reaction_elt is None:
+            return
+        reaction_data = {"reactions": str(reaction_elt)}
+        self.setTimestamp(reaction_elt, reaction_data)
+        data["reaction"] = reaction_data
+
+    def reactionSet(
+        self,
+        client: SatXMPPEntity,
+        data: Dict[str, Any],
+        former_elt: Optional[domish.Element]
+    ) -> Optional[domish.Element]:
+        """Update the <reaction> attachment
+
+        The decision is based on data["extra"]["reaction"]: if it is missing or None,
+        the former element is kept. Otherwise its "reactions" key holds the reactions
+        (each character is handled as one reaction), and its "operation" key tells if
+        they are added to the former ones ("update", the default) or if they replace
+        them ("replace").
+
+        @param data: microblog data, its "extra" key must be set
+        @param former_elt: <reaction> element currently set, None if there is none
+        @return: element to use, or None if there must be no <reaction> element
+        @raise exceptions.DataError: the operation is neither "update" nor "replace"
+        """
+        reaction = data["extra"].get("reaction")
+        if reaction is None:
+            return former_elt
+        operation_type = reaction.get("operation", "update")
+        if operation_type == "update":
+            former_reactions = str(former_elt or "")
+            new_reactions = reaction.get("reactions") or ""
+            # duplicates are removed while keeping the order (former reactions first),
+            # so the result is the same from one run to the other, which is not the
+            # case when joining a set
+            reactions = "".join(dict.fromkeys(former_reactions + new_reactions))
+        elif operation_type == "replace":
+            reactions = reaction.get("reactions", "")
+        else:
+            raise exceptions.DataError(f"invalid reaction operation: {operation_type!r}")
+        if not reactions:
+            return None
+        reaction_elt = domish.Element(
+            (NS_PUBSUB_ATTACHMENTS, "reaction"),
+            attribs={"timestamp": xmpp_date()},
+        )
+        reaction_elt.addContent(reactions)
+        return reaction_elt
+
+
+@implementer(iwokkel.IDisco)
+class PubsubAttachments_Handler(xmlstream.XMPPHandler):
+    """Protocol handler advertising the pubsub attachments feature"""
+
+    def getDiscoInfo(self, requestor, service, nodeIdentifier=""):
+        """Return the disco feature of pubsub attachments (arguments are not used)"""
+        attachments_feature = disco.DiscoFeature(NS_PUBSUB_ATTACHMENTS)
+        return [attachments_feature]
+
+    def getDiscoItems(self, requestor, service, nodeIdentifier=""):
+        """Return no disco item (arguments are not used)"""
+        return list()