view libervia/backend/plugins/plugin_xep_0470.py @ 4139:6745c6bd4c7a

frontends (tools): webrtc implementation: this is a factored implementation usable by all non-web frontends. Sources and Sinks can be configured easily to use tests source or local webcam/microphone, autosinks or a `appsink` that the frontend will use. rel 426
author Goffi <goffi@goffi.org>
date Wed, 01 Nov 2023 14:03:36 +0100
parents 4b842c1fb686
children 2109d864a3e7
line wrap: on
line source

#!/usr/bin/env python3

# Libervia plugin for Pubsub Attachments
# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import List, Tuple, Dict, Any, Callable, Optional

from twisted.words.protocols.jabber import jid, xmlstream, error
from twisted.words.xish import domish
from twisted.internet import defer
from zope.interface import implementer
from wokkel import pubsub, disco, iwokkel

from libervia.backend.core.constants import Const as C
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core import exceptions
from libervia.backend.tools.common import uri, data_format, date_utils
from libervia.backend.tools.utils import as_deferred, xmpp_date


log = getLogger(__name__)

IMPORT_NAME = "XEP-0470"

PLUGIN_INFO = {
    C.PI_NAME: "Pubsub Attachments",
    C.PI_IMPORT_NAME: IMPORT_NAME,
    C.PI_TYPE: C.PLUG_TYPE_XEP,
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0060"],
    C.PI_MAIN: "PubsubAttachments",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Pubsub Attachments implementation"""),
}
NS_PREFIX = "urn:xmpp:pubsub-attachments:"
NS_PUBSUB_ATTACHMENTS = f"{NS_PREFIX}1"
NS_PUBSUB_ATTACHMENTS_SUM = f"{NS_PREFIX}summary:1"


class PubsubAttachments:
    namespace = NS_PUBSUB_ATTACHMENTS

    def __init__(self, host):
        log.info(_("XEP-0470 (Pubsub Attachments) plugin initialization"))
        host.register_namespace("pubsub-attachments", NS_PUBSUB_ATTACHMENTS)
        self.host = host
        self._p = host.plugins["XEP-0060"]
        self.handlers: Dict[Tuple[str, str], dict[str, Any]] = {}
        host.trigger.add("XEP-0277_send", self.on_mb_send)
        self.register_attachment_handler(
            "noticed", NS_PUBSUB_ATTACHMENTS, self.noticed_get, self.noticed_set
        )
        self.register_attachment_handler(
            "reactions", NS_PUBSUB_ATTACHMENTS, self.reactions_get, self.reactions_set
        )
        host.bridge.add_method(
            "ps_attachments_get",
            ".plugin",
            in_sign="sssasss",
            out_sign="(ss)",
            method=self._get,
            async_=True,
        )
        host.bridge.add_method(
            "ps_attachments_set",
            ".plugin",
            in_sign="ss",
            out_sign="",
            method=self._set,
            async_=True,
        )

    def get_handler(self, client):
        return PubsubAttachments_Handler()

    def register_attachment_handler(
        self,
        name: str,
        namespace: str,
        get_cb: Callable[
            [SatXMPPEntity, domish.Element, Dict[str, Any]],
            None],
        set_cb: Callable[
            [SatXMPPEntity, Dict[str, Any], Optional[domish.Element]],
            Optional[domish.Element]],
    ) -> None:
        """Register callbacks to handle an attachment

        @param name: name of the element
        @param namespace: namespace of the element
            (name, namespace) couple must be unique
        @param get: method to call when attachments are retrieved
            it will be called with (client, element, data) where element is the
            <attachments> element to parse, and data must be updated in place with
            parsed data
        @param set: method to call when the attachment need to be set or udpated
            it will be called with (client, data, former_elt of None if there was no
            former element). When suitable, ``operation`` should be used to check if we
            request an ``update`` or a ``replace``.
            The callback can be either a blocking method, a Deferred or a coroutine
        """
        key = (name, namespace)
        if key in self.handlers:
            raise exceptions.ConflictError(
                f"({name}, {namespace}) attachment handlers are already registered"
            )
        self.handlers[(name, namespace)] = {
            "get": get_cb,
            "set": set_cb
        }

    def get_attachment_node_name(self, service: jid.JID, node: str, item: str) -> str:
        """Generate name to use for attachment node"""
        target_item_uri = uri.build_xmpp_uri(
            "pubsub",
            path=service.userhost(),
            node=node,
            item=item
        )
        return f"{NS_PUBSUB_ATTACHMENTS}/{target_item_uri}"

    def is_attachment_node(self, node: str) -> bool:
        """Return True if node name is an attachment node"""
        return node.startswith(f"{NS_PUBSUB_ATTACHMENTS}/")

    def attachment_node_2_item(self, node: str) -> Tuple[jid.JID, str, str]:
        """Retrieve service, node and item from attachement node's name"""
        if not self.is_attachment_node(node):
            raise ValueError("this is not an attachment node!")
        prefix_len = len(f"{NS_PUBSUB_ATTACHMENTS}/")
        item_uri = node[prefix_len:]
        parsed_uri = uri.parse_xmpp_uri(item_uri)
        if parsed_uri["type"] != "pubsub":
            raise ValueError(f"unexpected URI type, it must be a pubsub URI: {item_uri}")
        try:
            service = jid.JID(parsed_uri["path"])
        except RuntimeError:
            raise ValueError(f"invalid service in pubsub URI: {item_uri}")
        node = parsed_uri["node"]
        item = parsed_uri["item"]
        return (service, node, item)

    async def on_mb_send(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        node: str,
        item: domish.Element,
        data: dict
    ) -> bool:
        """trigger to create attachment node on each publication"""
        await self.create_attachments_node(
            client, service, node, item["id"], autocreate=True
        )
        return True

    async def create_attachments_node(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        node: str,
        item_id: str,
        autocreate: bool = False
    ):
        """Create node for attachements if necessary

        @param service: service of target node
        @param node: node where target item is published
        @param item_id: ID of target item
        @param autocrate: if True, target node is create if it doesn't exist
        """
        try:
            node_config = await self._p.getConfiguration(client, service, node)
        except error.StanzaError as e:
            if e.condition == "item-not-found" and autocreate:
                # we auto-create the missing node
                await self._p.createNode(
                    client, service, node
                )
                node_config = await self._p.getConfiguration(client, service, node)
            elif e.condition == "forbidden":
                node_config = self._p.make_configuration_form({})
            else:
                raise e
        try:
            # FIXME: check if this is the best publish_model option
            node_config.fields["pubsub#publish_model"].value = "open"
        except KeyError:
            log.warning("pubsub#publish_model field is missing")
        attachment_node = self.get_attachment_node_name(service, node, item_id)
        # we use the same options as target node
        try:
            await self._p.create_if_new_node(
                client, service, attachment_node, options=dict(node_config)
            )
        except Exception as e:
            log.warning(f"Can't create attachment node {attachment_node}: {e}")

    def items_2_attachment_data(
        self,
        client: SatXMPPEntity,
        items: List[domish.Element]
    ) -> List[Dict[str, Any]]:
        """Convert items from attachment node to attachment data"""
        list_data = []
        for item in items:
            try:
                attachments_elt = next(
                    item.elements(NS_PUBSUB_ATTACHMENTS, "attachments")
                )
            except StopIteration:
                log.warning(
                    "item is missing <attachments> elements, ignoring it: {item.toXml()}"
                )
                continue
            item_id = item["id"]
            publisher_s = item.getAttribute("publisher")
            # publisher is not filled by all pubsub service, so we can't count on it
            if publisher_s:
                publisher = jid.JID(publisher_s)
                if publisher.userhost() != item_id:
                    log.warning(
                        f"publisher {publisher.userhost()!r} doesn't correspond to item "
                        f"id {item['id']!r}, ignoring. This may be a hack attempt.\n"
                        f"{item.toXml()}"
                    )
                    continue
            try:
                jid.JID(item_id)
            except RuntimeError:
                log.warning(
                    "item ID is not a JID, this is not compliant and is ignored: "
                    f"{item.toXml}"
                )
                continue
            data = {
                "from": item_id
            }
            for handler in self.handlers.values():
                handler["get"](client, attachments_elt, data)
            if len(data) > 1:
                list_data.append(data)
        return list_data

    def _get(
        self,
        service_s: str,
        node: str,
        item: str,
        senders_s: List[str],
        extra_s: str,
        profile_key: str
    ) -> defer.Deferred:
        client = self.host.get_client(profile_key)
        extra = data_format.deserialise(extra_s)
        senders = [jid.JID(s) for s in senders_s]
        d = defer.ensureDeferred(
            self.get_attachments(client, jid.JID(service_s), node, item, senders)
        )
        d.addCallback(
            lambda ret:
            (data_format.serialise(ret[0]),
             data_format.serialise(ret[1]))
        )
        return d

    async def get_attachments(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        node: str,
        item: str,
        senders: Optional[List[jid.JID]],
        extra: Optional[dict] = None
    ) -> Tuple[List[Dict[str, Any]], dict]:
        """Retrieve data attached to a pubsub item

        @param service: pubsub service where the node is
        @param node: pubsub node containing the item
        @param item: ID of the item for which attachments will be retrieved
        @param senders: bare JIDs of entities that are checked. Attachments from those
            entities will be retrieved.
            If None, attachments from all entities will be retrieved
        @param extra: extra data, will be used as ``extra`` argument when doing
        ``get_items`` call.
        @return: A tuple with:
            - the list of attachments data, one item per found sender. The attachments
              data are dict containing attachment, no ``extra`` field is used here
              (contrarily to attachments data used with ``set_attachements``).
            - metadata returned by the call to ``get_items``
        """
        if extra is None:
            extra = {}
        attachment_node = self.get_attachment_node_name(service, node, item)
        item_ids = [e.userhost() for e in senders] if senders else None
        items, metadata = await self._p.get_items(
            client, service, attachment_node, item_ids=item_ids, extra=extra
        )
        list_data = self.items_2_attachment_data(client, items)

        return list_data, metadata

    def _set(
        self,
        attachments_s: str,
        profile_key: str
    ) -> None:
        client = self.host.get_client(profile_key)
        attachments = data_format.deserialise(attachments_s)  or {}
        return defer.ensureDeferred(self.set_attachements(client, attachments))

    async def apply_set_handler(
        self,
        client: SatXMPPEntity,
        attachments_data: dict,
        item_elt: Optional[domish.Element],
        handlers: Optional[List[Tuple[str, str]]] = None,
        from_jid: Optional[jid.JID] = None,
    ) -> domish.Element:
        """Apply all ``set`` callbacks to an attachments item

        @param attachments_data: data describing the attachments
            ``extra`` key will be used, and created if not found
        @param from_jid: jid of the author of the attachments
            ``client.jid.userhostJID()`` will be used if not specified
        @param item_elt: item containing an <attachments> element
            will be modified in place
            if None, a new element will be created
        @param handlers: list of (name, namespace) of handlers to use.
            if None, all registered handlers will be used.
        @return: updated item_elt if given, otherwise a new item_elt
        """
        attachments_data.setdefault("extra", {})
        if item_elt is None:
            item_id = client.jid.userhost() if from_jid is None else from_jid.userhost()
            item_elt = pubsub.Item(item_id)
            item_elt.addElement((NS_PUBSUB_ATTACHMENTS, "attachments"))

        try:
            attachments_elt = next(
                item_elt.elements(NS_PUBSUB_ATTACHMENTS, "attachments")
            )
        except StopIteration:
            log.warning(
                f"no <attachments> element found, creating a new one: {item_elt.toXml()}"
            )
            attachments_elt = item_elt.addElement((NS_PUBSUB_ATTACHMENTS, "attachments"))

        if handlers is None:
            handlers = list(self.handlers.keys())

        for name, namespace in handlers:
            try:
                handler = self.handlers[(name, namespace)]
            except KeyError:
                log.error(
                    f"unregistered handler ({name!r}, {namespace!r}) is requested, "
                    "ignoring"
                )
                continue
            try:
                former_elt = next(attachments_elt.elements(namespace, name))
            except StopIteration:
                former_elt = None
            new_elt = await as_deferred(
                handler["set"], client, attachments_data, former_elt
            )
            if new_elt != former_elt:
                if former_elt is not None:
                    attachments_elt.children.remove(former_elt)
                if new_elt is not None:
                    attachments_elt.addChild(new_elt)
        return item_elt

    async def set_attachements(
        self,
        client: SatXMPPEntity,
        attachments_data: Dict[str, Any]
    ) -> None:
        """Set or update attachments

        Former <attachments> element will be retrieved and updated. Individual
        attachments replace or update their elements individually, according to the
        "operation" key.

        "operation" key may be "update" or "replace", and defaults to update, it is only
        used in attachments where "update" makes sense (e.g. it's used for "reactions"
        but not for "noticed").

        @param attachments_data: data describing attachments. Various keys (usually stored
            in attachments_data["extra"]) may be used depending on the attachments
            handlers registered. The keys "service", "node" and "id" MUST be set.
            ``attachments_data`` is thought to be compatible with microblog data.

        """
        try:
            service = jid.JID(attachments_data["service"])
            node = attachments_data["node"]
            item = attachments_data["id"]
        except (KeyError, RuntimeError):
            raise ValueError(
                'data must have "service", "node" and "id" set'
            )
        attachment_node = self.get_attachment_node_name(service, node, item)
        try:
            items, __ = await self._p.get_items(
                client, service, attachment_node, item_ids=[client.jid.userhost()]
            )
        except exceptions.NotFound:
            item_elt = None
        else:
            if not items:
                item_elt = None
            else:
                item_elt = items[0]

        item_elt = await self.apply_set_handler(
            client,
            attachments_data,
            item_elt=item_elt,
        )

        try:
            await self._p.send_items(client, service, attachment_node, [item_elt])
        except error.StanzaError as e:
            if e.condition == "item-not-found":
                # the node doesn't exist, we can't publish attachments
                log.warning(
                    f"no attachment node found at {service} on {node!r} for item "
                    f"{item!r}, we can't update attachments."
                )
                raise exceptions.NotFound("No attachment node available")
            else:
                raise e

    async def subscribe(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        node: str,
        item: str,
    ) -> None:
        """Subscribe to attachment node targeting the item

        @param service: service of target item (will also be used for attachment node)
        @param node: node of target item (used to get attachment node's name)
        @param item: name of target item (used to get attachment node's name)
        """
        attachment_node = self.get_attachment_node_name(service, node, item)
        await self._p.subscribe(client, service, attachment_node)


    def set_timestamp(self, attachment_elt: domish.Element, data: dict) -> None:
        """Check if a ``timestamp`` attribute is set, parse it, and fill data

        @param attachments_elt: element where the ``timestamp`` attribute may be set
        @param data: data specific to the attachment (i.e. not the whole microblog data)
            ``timestamp`` field will be set there if timestamp exists and is parsable
        """
        timestamp_raw = attachment_elt.getAttribute("timestamp")
        if timestamp_raw:
            try:
                timestamp = date_utils.date_parse(timestamp_raw)
            except date_utils.ParserError:
                log.warning(f"can't parse timestamp: {timestamp_raw}")
            else:
                data["timestamp"] = timestamp

    def noticed_get(
        self,
        client: SatXMPPEntity,
        attachments_elt: domish.Element,
        data: Dict[str, Any],
    ) -> None:
        try:
            noticed_elt = next(
                attachments_elt.elements(NS_PUBSUB_ATTACHMENTS, "noticed")
            )
        except StopIteration:
            pass
        else:
            noticed_data = {
                "noticed": True
            }
            self.set_timestamp(noticed_elt, noticed_data)
            data["noticed"] = noticed_data

    def noticed_set(
        self,
        client: SatXMPPEntity,
        data: Dict[str, Any],
        former_elt: Optional[domish.Element]
    ) -> Optional[domish.Element]:
        """add or remove a <noticed> attachment

        if data["noticed"] is True, element is added, if it's False, it's removed, and
        it's not present or None, the former element is kept.
        """
        noticed = data["extra"].get("noticed")
        if noticed is None:
            return former_elt
        elif noticed:
            return domish.Element(
                (NS_PUBSUB_ATTACHMENTS, "noticed"),
                attribs = {
                    "timestamp": xmpp_date()
                }
            )
        else:
            return None

    def reactions_get(
        self,
        client: SatXMPPEntity,
        attachments_elt: domish.Element,
        data: Dict[str, Any],
    ) -> None:
        try:
            reactions_elt = next(
                attachments_elt.elements(NS_PUBSUB_ATTACHMENTS, "reactions")
            )
        except StopIteration:
            pass
        else:
            reactions_data = {"reactions": []}
            reactions = reactions_data["reactions"]
            for reaction_elt in reactions_elt.elements(NS_PUBSUB_ATTACHMENTS, "reaction"):
                reactions.append(str(reaction_elt))
            self.set_timestamp(reactions_elt, reactions_data)
            data["reactions"] = reactions_data

    def reactions_set(
        self,
        client: SatXMPPEntity,
        data: Dict[str, Any],
        former_elt: Optional[domish.Element]
    ) -> Optional[domish.Element]:
        """update the <reaction> attachment"""
        reactions_data = data["extra"].get("reactions")
        if reactions_data is None:
            return former_elt
        operation_type = reactions_data.get("operation", "update")
        if operation_type == "update":
            former_reactions = {
                str(r) for r in former_elt.elements(NS_PUBSUB_ATTACHMENTS, "reaction")
            } if former_elt is not None else set()
            added_reactions = set(reactions_data.get("add") or [])
            removed_reactions = set(reactions_data.get("remove") or [])
            reactions = list((former_reactions | added_reactions) - removed_reactions)
        elif operation_type == "replace":
            reactions = reactions_data.get("reactions") or []
        else:
            raise exceptions.DataError(f"invalid reaction operation: {operation_type!r}")
        if reactions:
            reactions_elt = domish.Element(
                (NS_PUBSUB_ATTACHMENTS, "reactions"),
                attribs = {
                    "timestamp": xmpp_date()
                }
            )
            for reactions_data in reactions:
                reactions_elt.addElement("reaction", content=reactions_data)
            return reactions_elt
        else:
            return None


@implementer(iwokkel.IDisco)
class PubsubAttachments_Handler(xmlstream.XMPPHandler):

    def getDiscoInfo(self, requestor, service, nodeIdentifier=""):
        return [disco.DiscoFeature(NS_PUBSUB_ATTACHMENTS)]

    def getDiscoItems(self, requestor, service, nodeIdentifier=""):
        return []