Mercurial > libervia-backend
diff libervia/backend/plugins/plugin_xep_0470.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/plugins/plugin_xep_0470.py@524856bd7b19 |
children | 2109d864a3e7 |
line wrap: on
line diff
#!/usr/bin/env python3

# Libervia plugin for Pubsub Attachments
# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from typing import List, Tuple, Dict, Any, Callable, Optional

from twisted.words.protocols.jabber import jid, xmlstream, error
from twisted.words.xish import domish
from twisted.internet import defer
from zope.interface import implementer
from wokkel import pubsub, disco, iwokkel

from libervia.backend.core.constants import Const as C
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core import exceptions
from libervia.backend.tools.common import uri, data_format, date_utils
from libervia.backend.tools.utils import as_deferred, xmpp_date


log = getLogger(__name__)

IMPORT_NAME = "XEP-0470"

PLUGIN_INFO = {
    C.PI_NAME: "Pubsub Attachments",
    C.PI_IMPORT_NAME: IMPORT_NAME,
    C.PI_TYPE: C.PLUG_TYPE_XEP,
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0060"],
    C.PI_MAIN: "PubsubAttachments",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Pubsub Attachments implementation"""),
}
NS_PREFIX = "urn:xmpp:pubsub-attachments:"
NS_PUBSUB_ATTACHMENTS = f"{NS_PREFIX}1"
NS_PUBSUB_ATTACHMENTS_SUM = f"{NS_PREFIX}summary:1"


class PubsubAttachments:
    """Plugin entry point for XEP-0470 (Pubsub Attachments)

    Attachments are handled through pluggable handlers identified by a
    (name, namespace) couple, see [register_attachment_handler]. "noticed" and
    "reactions" handlers are registered at initialisation.
    """
    namespace = NS_PUBSUB_ATTACHMENTS

    def __init__(self, host):
        """Register namespace, trigger, default handlers and bridge methods

        @param host: backend instance; its ``plugins``, ``trigger`` and ``bridge``
            attributes and its ``register_namespace`` method are used here
        """
        log.info(_("XEP-0470 (Pubsub Attachments) plugin initialization"))
        host.register_namespace("pubsub-attachments", NS_PUBSUB_ATTACHMENTS)
        self.host = host
        self._p = host.plugins["XEP-0060"]
        # (name, namespace) => {"get": get_cb, "set": set_cb}
        self.handlers: Dict[Tuple[str, str], dict[str, Any]] = {}
        host.trigger.add("XEP-0277_send", self.on_mb_send)
        self.register_attachment_handler(
            "noticed", NS_PUBSUB_ATTACHMENTS, self.noticed_get, self.noticed_set
        )
        self.register_attachment_handler(
            "reactions", NS_PUBSUB_ATTACHMENTS, self.reactions_get, self.reactions_set
        )
        host.bridge.add_method(
            "ps_attachments_get",
            ".plugin",
            in_sign="sssasss",
            out_sign="(ss)",
            method=self._get,
            async_=True,
        )
        host.bridge.add_method(
            "ps_attachments_set",
            ".plugin",
            in_sign="ss",
            out_sign="",
            method=self._set,
            async_=True,
        )

    def get_handler(self, client):
        """Return a new ``PubsubAttachments_Handler`` instance

        @param client: unused here
        """
        return PubsubAttachments_Handler()

    def register_attachment_handler(
        self,
        name: str,
        namespace: str,
        get_cb: Callable[
            [SatXMPPEntity, domish.Element, Dict[str, Any]],
            None],
        set_cb: Callable[
            [SatXMPPEntity, Dict[str, Any], Optional[domish.Element]],
            Optional[domish.Element]],
    ) -> None:
        """Register callbacks to handle an attachment

        @param name: name of the element
        @param namespace: namespace of the element
            (name, namespace) couple must be unique
        @param get_cb: method to call when attachments are retrieved
            it will be called with (client, element, data) where element is the
            <attachments> element to parse, and data must be updated in place with
            parsed data
        @param set_cb: method to call when the attachment need to be set or updated
            it will be called with (client, data, former_elt or None if there was no
            former element). When suitable, ``operation`` should be used to check if we
            request an ``update`` or a ``replace``.
            The callback can be either a blocking method, a Deferred or a coroutine
        @raise exceptions.ConflictError: handlers are already registered for this
            (name, namespace) couple
        """
        key = (name, namespace)
        if key in self.handlers:
            raise exceptions.ConflictError(
                f"({name}, {namespace}) attachment handlers are already registered"
            )
        self.handlers[key] = {
            "get": get_cb,
            "set": set_cb
        }

    def get_attachment_node_name(self, service: jid.JID, node: str, item: str) -> str:
        """Generate name to use for attachment node

        The name is the attachments namespace, followed by "/" and by the pubsub
        XMPP URI of the target item.

        @param service: pubsub service of the target item
        @param node: node of the target item
        @param item: ID of the target item
        @return: name of the attachment node
        """
        target_item_uri = uri.build_xmpp_uri(
            "pubsub",
            path=service.userhost(),
            node=node,
            item=item
        )
        return f"{NS_PUBSUB_ATTACHMENTS}/{target_item_uri}"

    def is_attachment_node(self, node: str) -> bool:
        """Return True if node name is an attachment node"""
        return node.startswith(f"{NS_PUBSUB_ATTACHMENTS}/")

    def attachment_node_2_item(self, node: str) -> Tuple[jid.JID, str, str]:
        """Retrieve service, node and item from attachment node's name

        @param node: name of an attachment node
        @return: (service, node, item) of the item targeted by the attachment node
        @raise ValueError: the node is not an attachment node, the URI is not a pubsub
            one, or the service can't be parsed
        """
        if not self.is_attachment_node(node):
            raise ValueError("this is not an attachment node!")
        prefix_len = len(f"{NS_PUBSUB_ATTACHMENTS}/")
        item_uri = node[prefix_len:]
        parsed_uri = uri.parse_xmpp_uri(item_uri)
        if parsed_uri["type"] != "pubsub":
            raise ValueError(f"unexpected URI type, it must be a pubsub URI: {item_uri}")
        try:
            service = jid.JID(parsed_uri["path"])
        except RuntimeError as e:
            # the original error is chained to keep the parsing failure visible
            raise ValueError(f"invalid service in pubsub URI: {item_uri}") from e
        node = parsed_uri["node"]
        item = parsed_uri["item"]
        return (service, node, item)

    async def on_mb_send(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        node: str,
        item: domish.Element,
        data: dict
    ) -> bool:
        """trigger to create attachment node on each publication

        @param service: service where the item is published
        @param node: node where the item is published
        @param item: published item, its ``id`` attribute is used
        @param data: unused here
        @return: always True
            NOTE(review): presumably lets the trigger chain continue, to be confirmed
            against the trigger manager
        """
        await self.create_attachments_node(
            client, service, node, item["id"], autocreate=True
        )
        return True

    async def create_attachments_node(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        node: str,
        item_id: str,
        autocreate: bool = False
    ):
        """Create node for attachments if necessary

        Failure to create the attachment node itself is only logged.

        @param service: service of target node
        @param node: node where target item is published
        @param item_id: ID of target item
        @param autocreate: if True, target node is created if it doesn't exist
        @raise error.StanzaError: configuration of target node can't be retrieved, for
            a reason other than "forbidden" (or than "item-not-found" when
            ``autocreate`` is set)
        """
        try:
            node_config = await self._p.getConfiguration(client, service, node)
        except error.StanzaError as e:
            if e.condition == "item-not-found" and autocreate:
                # we auto-create the missing node
                await self._p.createNode(
                    client, service, node
                )
                node_config = await self._p.getConfiguration(client, service, node)
            elif e.condition == "forbidden":
                # we can't read the configuration, an empty form is used instead
                node_config = self._p.make_configuration_form({})
            else:
                raise
        try:
            # FIXME: check if this is the best publish_model option
            node_config.fields["pubsub#publish_model"].value = "open"
        except KeyError:
            log.warning("pubsub#publish_model field is missing")
        attachment_node = self.get_attachment_node_name(service, node, item_id)
        # we use the same options as target node
        try:
            await self._p.create_if_new_node(
                client, service, attachment_node, options=dict(node_config)
            )
        except Exception as e:
            log.warning(f"Can't create attachment node {attachment_node}: {e}")

    def items_2_attachment_data(
        self,
        client: SatXMPPEntity,
        items: List[domish.Element]
    ) -> List[Dict[str, Any]]:
        """Convert items from attachment node to attachment data

        Items without <attachments> element, with an ID which is not a JID, or with a
        ``publisher`` attribute not matching the item ID are ignored (with a warning).
        Items for which no handler has added any data are skipped.

        @param items: items retrieved from an attachment node
        @return: one dict per kept item, with the item ID in ``from`` key, and the
            data set by the ``get`` callback of registered handlers
        """
        list_data = []
        for item in items:
            try:
                attachments_elt = next(
                    item.elements(NS_PUBSUB_ATTACHMENTS, "attachments")
                )
            except StopIteration:
                log.warning(
                    "item is missing <attachments> elements, ignoring it: "
                    f"{item.toXml()}"
                )
                continue
            item_id = item["id"]
            publisher_s = item.getAttribute("publisher")
            # publisher is not filled by all pubsub service, so we can't count on it
            if publisher_s:
                publisher = jid.JID(publisher_s)
                if publisher.userhost() != item_id:
                    log.warning(
                        f"publisher {publisher.userhost()!r} doesn't correspond to item "
                        f"id {item['id']!r}, ignoring. This may be a hack attempt.\n"
                        f"{item.toXml()}"
                    )
                    continue
            try:
                jid.JID(item_id)
            except RuntimeError:
                log.warning(
                    "item ID is not a JID, this is not compliant and is ignored: "
                    f"{item.toXml()}"
                )
                continue
            data = {
                "from": item_id
            }
            for handler in self.handlers.values():
                handler["get"](client, attachments_elt, data)
            # "from" is always set, we only keep data where a handler has added
            # something
            if len(data) > 1:
                list_data.append(data)
        return list_data

    def _get(
        self,
        service_s: str,
        node: str,
        item: str,
        senders_s: List[str],
        extra_s: str,
        profile_key: str
    ) -> defer.Deferred:
        """Bridge method wrapping [get_attachments] with serialised arguments

        @param service_s: pubsub service (string JID)
        @param node: pubsub node containing the item
        @param item: ID of the item for which attachments are retrieved
        @param senders_s: JIDs of entities to check (as strings)
        @param extra_s: serialised extra data
        @param profile_key: key used to retrieve the client
        @return: Deferred firing with a tuple of serialised attachments data and
            serialised metadata
        """
        client = self.host.get_client(profile_key)
        extra = data_format.deserialise(extra_s)
        senders = [jid.JID(s) for s in senders_s]
        d = defer.ensureDeferred(
            self.get_attachments(
                client, jid.JID(service_s), node, item, senders, extra
            )
        )
        d.addCallback(
            lambda ret:
            (data_format.serialise(ret[0]),
             data_format.serialise(ret[1]))
        )
        return d

    async def get_attachments(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        node: str,
        item: str,
        senders: Optional[List[jid.JID]],
        extra: Optional[dict] = None
    ) -> Tuple[List[Dict[str, Any]], dict]:
        """Retrieve data attached to a pubsub item

        @param service: pubsub service where the node is
        @param node: pubsub node containing the item
        @param item: ID of the item for which attachments will be retrieved
        @param senders: bare JIDs of entities that are checked. Attachments from those
            entities will be retrieved.
            If None, attachments from all entities will be retrieved
        @param extra: extra data, will be used as ``extra`` argument when doing
            ``get_items`` call.
        @return: A tuple with:
            - the list of attachments data, one item per found sender. The attachments
              data are dict containing attachment, no ``extra`` field is used here
              (contrarily to attachments data used with ``set_attachements``).
            - metadata returned by the call to ``get_items``
        """
        if extra is None:
            extra = {}
        attachment_node = self.get_attachment_node_name(service, node, item)
        # in attachment nodes, the ID of an item is the bare JID of its sender
        item_ids = [e.userhost() for e in senders] if senders else None
        items, metadata = await self._p.get_items(
            client, service, attachment_node, item_ids=item_ids, extra=extra
        )
        list_data = self.items_2_attachment_data(client, items)

        return list_data, metadata

    def _set(
        self,
        attachments_s: str,
        profile_key: str
    ) -> defer.Deferred:
        """Bridge method wrapping [set_attachements] with serialised arguments

        @param attachments_s: serialised attachments data
        @param profile_key: key used to retrieve the client
        @return: Deferred wrapping the [set_attachements] coroutine
        """
        client = self.host.get_client(profile_key)
        attachments = data_format.deserialise(attachments_s) or {}
        return defer.ensureDeferred(self.set_attachements(client, attachments))

    async def apply_set_handler(
        self,
        client: SatXMPPEntity,
        attachments_data: dict,
        item_elt: Optional[domish.Element],
        handlers: Optional[List[Tuple[str, str]]] = None,
        from_jid: Optional[jid.JID] = None,
    ) -> domish.Element:
        """Apply all ``set`` callbacks to an attachments item

        @param attachments_data: data describing the attachments
            ``extra`` key will be used, and created if not found
        @param from_jid: jid of the author of the attachments
            ``client.jid.userhostJID()`` will be used if not specified
        @param item_elt: item containing an <attachments> element
            will be modified in place
            if None, a new element will be created
        @param handlers: list of (name, namespace) of handlers to use.
            if None, all registered handlers will be used.
        @return: updated item_elt if given, otherwise a new item_elt
        """
        attachments_data.setdefault("extra", {})
        if item_elt is None:
            item_id = client.jid.userhost() if from_jid is None else from_jid.userhost()
            item_elt = pubsub.Item(item_id)
            item_elt.addElement((NS_PUBSUB_ATTACHMENTS, "attachments"))

        try:
            attachments_elt = next(
                item_elt.elements(NS_PUBSUB_ATTACHMENTS, "attachments")
            )
        except StopIteration:
            log.warning(
                f"no <attachments> element found, creating a new one: {item_elt.toXml()}"
            )
            attachments_elt = item_elt.addElement((NS_PUBSUB_ATTACHMENTS, "attachments"))

        if handlers is None:
            handlers = list(self.handlers.keys())

        for name, namespace in handlers:
            try:
                handler = self.handlers[(name, namespace)]
            except KeyError:
                log.error(
                    f"unregistered handler ({name!r}, {namespace!r}) is requested, "
                    "ignoring"
                )
                continue
            try:
                former_elt = next(attachments_elt.elements(namespace, name))
            except StopIteration:
                former_elt = None
            # the callback may be a blocking method, a Deferred or a coroutine
            new_elt = await as_deferred(
                handler["set"], client, attachments_data, former_elt
            )
            if new_elt != former_elt:
                # the former element is replaced by the new one, or just removed if
                # the callback has returned None
                if former_elt is not None:
                    attachments_elt.children.remove(former_elt)
                if new_elt is not None:
                    attachments_elt.addChild(new_elt)
        return item_elt

    async def set_attachements(
        self,
        client: SatXMPPEntity,
        attachments_data: Dict[str, Any]
    ) -> None:
        """Set or update attachments

        Former <attachments> element will be retrieved and updated. Individual
        attachments replace or update their elements individually, according to the
        "operation" key.

        "operation" key may be "update" or "replace", and defaults to update, it is only
        used in attachments where "update" makes sense (e.g. it's used for "reactions"
        but not for "noticed").

        @param attachments_data: data describing attachments. Various keys (usually stored
            in attachments_data["extra"]) may be used depending on the attachments
            handlers registered. The keys "service", "node" and "id" MUST be set.
            ``attachments_data`` is thought to be compatible with microblog data.
        @raise ValueError: "service", "node" or "id" is missing, or "service" can't be
            parsed
        @raise exceptions.NotFound: the attachment node doesn't exist
        """
        try:
            service = jid.JID(attachments_data["service"])
            node = attachments_data["node"]
            item = attachments_data["id"]
        except (KeyError, RuntimeError) as e:
            raise ValueError(
                'data must have "service", "node" and "id" set'
            ) from e
        attachment_node = self.get_attachment_node_name(service, node, item)
        try:
            items, __ = await self._p.get_items(
                client, service, attachment_node, item_ids=[client.jid.userhost()]
            )
        except exceptions.NotFound:
            item_elt = None
        else:
            if not items:
                item_elt = None
            else:
                item_elt = items[0]

        item_elt = await self.apply_set_handler(
            client,
            attachments_data,
            item_elt=item_elt,
        )

        try:
            await self._p.send_items(client, service, attachment_node, [item_elt])
        except error.StanzaError as e:
            if e.condition == "item-not-found":
                # the node doesn't exist, we can't publish attachments
                log.warning(
                    f"no attachment node found at {service} on {node!r} for item "
                    f"{item!r}, we can't update attachments."
                )
                raise exceptions.NotFound("No attachment node available") from e
            else:
                raise

    async def subscribe(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        node: str,
        item: str,
    ) -> None:
        """Subscribe to attachment node targeting the item

        @param service: service of target item (will also be used for attachment node)
        @param node: node of target item (used to get attachment node's name)
        @param item: name of target item (used to get attachment node's name)
        """
        attachment_node = self.get_attachment_node_name(service, node, item)
        await self._p.subscribe(client, service, attachment_node)

    def set_timestamp(self, attachment_elt: domish.Element, data: dict) -> None:
        """Check if a ``timestamp`` attribute is set, parse it, and fill data

        @param attachment_elt: element where the ``timestamp`` attribute may be set
        @param data: data specific to the attachment (i.e. not the whole microblog data)
            ``timestamp`` field will be set there if timestamp exists and is parsable
        """
        timestamp_raw = attachment_elt.getAttribute("timestamp")
        if timestamp_raw:
            try:
                timestamp = date_utils.date_parse(timestamp_raw)
            except date_utils.ParserError:
                log.warning(f"can't parse timestamp: {timestamp_raw}")
            else:
                data["timestamp"] = timestamp

    def noticed_get(
        self,
        client: SatXMPPEntity,
        attachments_elt: domish.Element,
        data: Dict[str, Any],
    ) -> None:
        """Parse the <noticed> attachment, if any

        @param attachments_elt: <attachments> element to parse
        @param data: attachments data, updated in place: if a <noticed> element is
            found, ``noticed`` key is set to a dict with ``noticed`` set to True, and
            ``timestamp`` when it is available
        """
        try:
            noticed_elt = next(
                attachments_elt.elements(NS_PUBSUB_ATTACHMENTS, "noticed")
            )
        except StopIteration:
            pass
        else:
            noticed_data = {
                "noticed": True
            }
            self.set_timestamp(noticed_elt, noticed_data)
            data["noticed"] = noticed_data

    def noticed_set(
        self,
        client: SatXMPPEntity,
        data: Dict[str, Any],
        former_elt: Optional[domish.Element]
    ) -> Optional[domish.Element]:
        """add or remove a <noticed> attachment

        if data["extra"]["noticed"] is True, element is added, if it's False, it's
        removed, and if it's not present or None, the former element is kept.

        @param data: attachments data, ``extra`` key must be set
        @param former_elt: existing <noticed> element, or None if there is none
        @return: element to use, or None if there must be no <noticed> element
        """
        noticed = data["extra"].get("noticed")
        if noticed is None:
            return former_elt
        elif noticed:
            return domish.Element(
                (NS_PUBSUB_ATTACHMENTS, "noticed"),
                attribs={
                    "timestamp": xmpp_date()
                }
            )
        else:
            return None

    def reactions_get(
        self,
        client: SatXMPPEntity,
        attachments_elt: domish.Element,
        data: Dict[str, Any],
    ) -> None:
        """Parse the <reactions> attachment, if any

        @param attachments_elt: <attachments> element to parse
        @param data: attachments data, updated in place: if a <reactions> element is
            found, ``reactions`` key is set to a dict with the list of reactions in
            ``reactions``, and ``timestamp`` when it is available
        """
        try:
            reactions_elt = next(
                attachments_elt.elements(NS_PUBSUB_ATTACHMENTS, "reactions")
            )
        except StopIteration:
            pass
        else:
            reactions_data = {"reactions": []}
            reactions = reactions_data["reactions"]
            for reaction_elt in reactions_elt.elements(NS_PUBSUB_ATTACHMENTS, "reaction"):
                reactions.append(str(reaction_elt))
            self.set_timestamp(reactions_elt, reactions_data)
            data["reactions"] = reactions_data

    def reactions_set(
        self,
        client: SatXMPPEntity,
        data: Dict[str, Any],
        former_elt: Optional[domish.Element]
    ) -> Optional[domish.Element]:
        """update the <reactions> attachment

        @param data: attachments data, ``extra`` key must be set.
            data["extra"]["reactions"] may have the following keys:
            - ``operation``: "update" (default) or "replace"
            - ``add`` and ``remove``: reactions to add to or to remove from former
              ones, used with "update" operation
            - ``reactions``: full list of reactions, used with "replace" operation
            If data["extra"]["reactions"] is not set or None, the former element is
            kept.
        @param former_elt: existing <reactions> element, or None if there is none
        @return: new <reactions> element, or None if there is no reaction left
        @raise exceptions.DataError: the operation is invalid
        """
        reactions_data = data["extra"].get("reactions")
        if reactions_data is None:
            return former_elt
        operation_type = reactions_data.get("operation", "update")
        if operation_type == "update":
            former_reactions = {
                str(r) for r in former_elt.elements(NS_PUBSUB_ATTACHMENTS, "reaction")
            } if former_elt is not None else set()
            added_reactions = set(reactions_data.get("add") or [])
            removed_reactions = set(reactions_data.get("remove") or [])
            reactions = list((former_reactions | added_reactions) - removed_reactions)
        elif operation_type == "replace":
            reactions = reactions_data.get("reactions") or []
        else:
            raise exceptions.DataError(f"invalid reaction operation: {operation_type!r}")
        if reactions:
            reactions_elt = domish.Element(
                (NS_PUBSUB_ATTACHMENTS, "reactions"),
                attribs={
                    "timestamp": xmpp_date()
                }
            )
            for reaction in reactions:
                reactions_elt.addElement("reaction", content=reaction)
            return reactions_elt
        else:
            return None

@implementer(iwokkel.IDisco)
class PubsubAttachments_Handler(xmlstream.XMPPHandler):
    """Disco handler advertising the pubsub attachments namespace"""

    def getDiscoInfo(self, requestor, service, nodeIdentifier=""):
        """Return a list with a single disco feature, for NS_PUBSUB_ATTACHMENTS

        ``requestor``, ``service`` and ``nodeIdentifier`` are not used.
        """
        attachments_feature = disco.DiscoFeature(NS_PUBSUB_ATTACHMENTS)
        return [attachments_feature]

    def getDiscoItems(self, requestor, service, nodeIdentifier=""):
        """Return an empty list: there are no disco items to advertise

        ``requestor``, ``service`` and ``nodeIdentifier`` are not used.
        """
        return list()