Mercurial > libervia-backend
changeset 3864:ac255a0fbd4c
plugin pubsub attachments: partial implementation of pubsub-attachments protoXEP:
This is an implementation of the "Basic Usage" of
https://xmpp.org/extensions/inbox/pubsub-attachments.html.
rel 370
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 20 Jul 2022 17:49:51 +0200 |
parents | c04f5e8a3568 |
children | 59fbb66b2923 |
files | sat/plugins/plugin_pubsub_attachments.py |
diffstat | 1 files changed, 483 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat/plugins/plugin_pubsub_attachments.py Wed Jul 20 17:49:51 2022 +0200 @@ -0,0 +1,483 @@ +#!/usr/bin/env python3 + +# Libervia plugin for Pubsub Attachments +# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from typing import List, Tuple, Dict, Any, Callable, Optional + +from twisted.words.protocols.jabber import jid, xmlstream, error +from twisted.words.xish import domish +from twisted.internet import defer +from zope.interface import implementer +from wokkel import pubsub, disco, iwokkel + +from sat.core.constants import Const as C +from sat.core.i18n import _ +from sat.core.log import getLogger +from sat.core.core_types import SatXMPPEntity +from sat.core import exceptions +from sat.tools.common import uri, data_format, date_utils +from sat.tools.utils import xmpp_date + + +log = getLogger(__name__) + +IMPORT_NAME = "PUBSUB_ATTACHMENTS" + +PLUGIN_INFO = { + C.PI_NAME: "Pubsub Attachments", + C.PI_IMPORT_NAME: IMPORT_NAME, + C.PI_TYPE: C.PLUG_TYPE_EXP, + C.PI_MODES: C.PLUG_MODE_BOTH, + C.PI_PROTOCOLS: [], + C.PI_DEPENDENCIES: ["XEP-0060"], + C.PI_MAIN: "PubsubAttachments", + C.PI_HANDLER: "yes", + C.PI_DESCRIPTION: _("""Pubsub Attachments implementation"""), +} +NS_PREFIX = "urn:xmpp:pubsub-attachments:" +NS_PUBSUB_ATTACHMENTS = f"{NS_PREFIX}0" +NS_PUBSUB_ATTACHMENTS_SUM = f"{NS_PREFIX}summary:0" + + +class PubsubAttachments: + namespace = NS_PUBSUB_ATTACHMENTS + + def __init__(self, host): + log.info(_("Pubsub Attachments plugin initialization")) + host.registerNamespace("pubsub-attachments", NS_PUBSUB_ATTACHMENTS) + self.host = host + self._p = host.plugins["XEP-0060"] + self.handlers = {} + host.trigger.add("XEP-0277_send", self.onMBSend) + self.registerAttachmentHandler( + "noticed", NS_PUBSUB_ATTACHMENTS, self.noticedGet, self.noticedSet + ) + self.registerAttachmentHandler( + "reaction", NS_PUBSUB_ATTACHMENTS, self.reactionGet, self.reactionSet + ) + host.bridge.addMethod( + "psAttachmentsGet", + ".plugin", + in_sign="sssasss", + out_sign="(ss)", + method=self._get, + async_=True, + ) + host.bridge.addMethod( + "psAttachmentsSet", + ".plugin", + in_sign="ss", + out_sign="", + method=self._set, + async_=True, + ) + + def getHandler(self, client): + return PubsubAttachments_Handler() + + def registerAttachmentHandler( + self, + name: str, + namespace: str, + get_cb: Callable[ + [SatXMPPEntity, domish.Element, Dict[str, Any]], + None], + set_cb: Callable[ + [SatXMPPEntity, Dict[str, Any], Optional[domish.Element]], + Optional[domish.Element]], + ) -> None: + """Register callbacks to handle an attachment + + @param name: name of the element + @param namespace: namespace of the element + (name, namespace) couple must be unique + @param get: method to call when attachments are retrieved + it will be called with (client, element, data) where element is the + <attachments> element to parse, and data must be updated in place with + parsed data + @param set: method to call when the attachment need to be set or udpated + it will be called with (client, data, former_elt of None if there was no + former element). When suitable, ``operation`` should be used to check if we + request an ``update`` or a ``replace``. + """ + key = (name, namespace) + if key in self.handlers: + raise exceptions.ConflictError( + f"({name}, {namespace}) attachment handlers are already registered" + ) + self.handlers[(name, namespace)] = { + "get": get_cb, + "set": set_cb + } + + def getAttachmentNodeName(self, service: jid.JID, node: str, item: str) -> str: + """Generate name to use for attachment node""" + target_item_uri = uri.buildXMPPUri( + "pubsub", + path=service.userhost(), + node=node, + item=item + ) + return f"{NS_PUBSUB_ATTACHMENTS}/{target_item_uri}" + + def isAttachmentNode(self, node: str) -> bool: + """Return True if node name is an attachment node""" + return node.startswith(f"{NS_PUBSUB_ATTACHMENTS}/") + + def attachmentNode2Item(self, node: str) -> Tuple[jid.JID, str, str]: + """Retrieve service, node and item from attachement node's name""" + if not self.isAttachmentNode(node): + raise ValueError("this is not an attachment node!") + prefix_len = len(f"{NS_PUBSUB_ATTACHMENTS}/") + item_uri = node[prefix_len:] + parsed_uri = uri.parseXMPPUri(item_uri) + if parsed_uri["type"] != "pubsub": + raise ValueError(f"unexpected URI type, it must be a pubsub URI: {item_uri}") + try: + service = jid.JID(parsed_uri["path"]) + except RuntimeError: + raise ValueError(f"invalid service in pubsub URI: {item_uri}") + node = parsed_uri["node"] + item = parsed_uri["item"] + return (service, node, item) + + async def onMBSend( + self, + client: SatXMPPEntity, + service: jid.JID, + node: str, + item: domish.Element, + data: dict + ) -> bool: + """trigger to create attachment node on each publication""" + node_config = await self._p.getConfiguration(client, service, node) + attachment_node = self.getAttachmentNodeName(service, node, item["id"]) + # we use the same options as target node + try: + await self._p.createIfNewNode( + client, service, attachment_node, options=dict(node_config) + ) + except Exception as e: + log.warning(f"Can't create attachment node {attachment_node}: {e}]") + return True + + def items2attachmentData( + self, + client: SatXMPPEntity, + items: List[domish.Element] + ) -> List[Dict[str, Any]]: + """Convert items from attachment node to attachment data""" + list_data = [] + for item in items: + try: + attachments_elt = next( + item.elements(NS_PUBSUB_ATTACHMENTS, "attachments") + ) + except StopIteration: + log.warning( + "item is missing <attachments> elements, ignoring it: {item.toXml()}" + ) + continue + item_id = item["id"] + publisher_s = item.getAttribute("publisher") + # publisher is not filled by all pubsub service, so we can't count on it + if publisher_s: + publisher = jid.JID(publisher_s) + if publisher.userhost() != item_id: + log.warning( + f"publisher {publisher.userhost()!r} doesn't correspond to item " + f"id {item['id']!r}, ignoring. This may be a hack attempt.\n" + f"{item.toXml()}" + ) + continue + try: + jid.JID(item_id) + except RuntimeError: + log.warning( + "item ID is not a JID, this is not compliant and is ignored: " + f"{item.toXml}" + ) + continue + data = { + "from": item_id + } + for handler in self.handlers.values(): + handler["get"](client, attachments_elt, data) + if len(data) > 1: + list_data.append(data) + return list_data + + def _get( + self, + service_s: str, + node: str, + item: str, + senders_s: List[str], + extra_s: str, + profile_key: str + ) -> defer.Deferred: + client = self.host.getClient(profile_key) + extra = data_format.deserialise(extra_s) + senders = [jid.JID(s) for s in senders_s] + d = defer.ensureDeferred( + self.getAttachments(client, jid.JID(service_s), node, item, senders) + ) + d.addCallback( + lambda ret: + (data_format.serialise(ret[0]), + data_format.serialise(ret[1])) + ) + return d + + async def getAttachments( + self, + client: SatXMPPEntity, + service: jid.JID, + node: str, + item: str, + senders: Optional[List[jid.JID]], + extra: Optional[dict] = None + ) -> Tuple[List[Dict[str, Any]], dict]: + if extra is None: + extra = {} + attachment_node = self.getAttachmentNodeName(service, node, item) + item_ids = [e.userhost() for e in senders] if senders else None + items, metadata = await self._p.getItems( + client, service, attachment_node, item_ids=item_ids, extra=extra + ) + list_data = self.items2attachmentData(client, items) + + return list_data, metadata + + def _set( + self, + attachments_s: str, + profile_key: str + ) -> None: + client = self.host.getClient(profile_key) + attachments = data_format.deserialise(attachments_s) or {} + return defer.ensureDeferred(self.setAttachments( client, attachments)) + + async def setAttachments( + self, + client: SatXMPPEntity, + data: Dict[str, Any] + ) -> None: + """Set or update attachments + + Former <attachments> element will be retrieved and updated. Individual + attachments replace or update their elements individually, according to the + "operation" key. + + "operation" key may be "update" or "replace", and defaults to update, it is only + used in attachments where "update" makes sense (e.g. it's used for "reactions" + but not for "noticed"). + + @param data: microblog data data. Various keys (usually stored in + data["extra"]) may be used depending on the attachments handlers + registered. The keys "service", "node" and "id" MUST be set. + """ + data.setdefault("extra", {}) + try: + service = jid.JID(data["service"]) + node = data["node"] + item = data["id"] + except (KeyError, RuntimeError): + raise ValueError( + 'data must have "service", "node" and "id" set' + ) + attachment_node = self.getAttachmentNodeName(service, node, item) + items, __ = await self._p.getItems( + client, service, attachment_node, item_ids=[client.jid.userhost()] + ) + if not items: + # the item doesn't exist, we create a new one + item_elt = pubsub.Item(client.jid.userhost()) + item_elt.addElement((NS_PUBSUB_ATTACHMENTS, "attachments")) + else: + item_elt = items[0] + + try: + attachments_elt = next( + item_elt.elements(NS_PUBSUB_ATTACHMENTS, "attachments") + ) + except StopIteration: + log.warning( + f"no <attachments> element found, creating a new one: {item_elt.toXml()}" + ) + attachments_elt = item_elt.addElement((NS_PUBSUB_ATTACHMENTS, "attachments")) + + for (name, namespace), handler in self.handlers.items(): + try: + former_elt = next(attachments_elt.elements(namespace, name)) + except StopIteration: + former_elt = None + new_elt = handler["set"](client, data, former_elt) + if new_elt != former_elt: + if former_elt is not None: + attachments_elt.children.remove(former_elt) + if new_elt is not None: + attachments_elt.addChild(new_elt) + try: + await self._p.sendItems(client, service, attachment_node, [item_elt]) + except error.StanzaError as e: + if e.condition == "item-not-found": + # the node doesn't exist, we can't publish attachments + log.warning( + f"no attachment node found at {service} on {node!r} for item " + f"{item!r}, we can't update attachments." + ) + raise exceptions.NotFound("No attachment node available") + else: + raise e + + async def subscribe( + self, + client: SatXMPPEntity, + service: jid.JID, + node: str, + item: str, + ) -> None: + """Subscribe to attachment node targeting the item + + @param service: service of target item (will also be used for attachment node) + @param node: node of target item (used to get attachment node's name) + @param item: name of target item (used to get attachment node's name) + """ + attachment_node = self.getAttachmentNodeName(service, node, item) + await self._p.subscribe(client, service, attachment_node) + + + def setTimestamp(self, attachment_elt: domish.Element, data: dict) -> None: + """Check if a ``timestamp`` attribute is set, parse it, and fill data + + @param attachments_elt: element where the ``timestamp`` attribute may be set + @param data: data specific to the attachment (i.e. not the whole microblog data) + ``timestamp`` field will be set there if timestamp exists and is parsable + """ + timestamp_raw = attachment_elt.getAttribute("timestamp") + if timestamp_raw: + try: + timestamp = date_utils.date_parse(timestamp_raw) + except date_utils.ParserError: + log.warning(f"can't parse timestamp: {timestamp_raw}") + else: + data["timestamp"] = timestamp + + def noticedGet( + self, + client: SatXMPPEntity, + attachments_elt: domish.Element, + data: Dict[str, Any], + ) -> None: + try: + noticed_elt = next( + attachments_elt.elements(NS_PUBSUB_ATTACHMENTS, "noticed") + ) + except StopIteration: + pass + else: + noticed_data = { + "noticed": True + } + self.setTimestamp(noticed_elt, noticed_data) + data["noticed"] = noticed_data + + def noticedSet( + self, + client: SatXMPPEntity, + data: Dict[str, Any], + former_elt: Optional[domish.Element] + ) -> Optional[domish.Element]: + """add or remove a <noticed> attachment + + if data["noticed"] is True, element is added, if it's False, it's removed, and + it's not present or None, the former element is kept. + """ + noticed = data["extra"].get("noticed") + if noticed is None: + return former_elt + elif noticed: + return domish.Element( + (NS_PUBSUB_ATTACHMENTS, "noticed"), + attribs = { + "timestamp": xmpp_date() + } + ) + else: + return None + + def reactionGet( + self, + client: SatXMPPEntity, + attachments_elt: domish.Element, + data: Dict[str, Any], + ) -> None: + try: + reaction_elt = next( + attachments_elt.elements(NS_PUBSUB_ATTACHMENTS, "reaction") + ) + except StopIteration: + pass + else: + reaction_data = { + "reactions": str(reaction_elt) + } + self.setTimestamp(reaction_elt, reaction_data) + data["reaction"] = reaction_data + + def reactionSet( + self, + client: SatXMPPEntity, + data: Dict[str, Any], + former_elt: Optional[domish.Element] + ) -> Optional[domish.Element]: + """update the <reaction> attachment""" + reaction = data["extra"].get("reaction") + if reaction is None: + return former_elt + operation_type = reaction.get("operation", "update") + if operation_type == "update": + reactions = "".join( + set(str(former_elt or "")) + | set(reaction.get("reactions") or "") + ) + elif operation_type == "replace": + reactions = reaction.get("reactions", "") + else: + raise exceptions.DataError(f"invalid reaction operation: {operation_type!r}") + if reactions: + reaction_elt = domish.Element( + (NS_PUBSUB_ATTACHMENTS, "reaction"), + attribs = { + "timestamp": xmpp_date() + } + ) + reaction_elt.addContent(reactions) + return reaction_elt + else: + return None + + +@implementer(iwokkel.IDisco) +class PubsubAttachments_Handler(xmlstream.XMPPHandler): + + def getDiscoInfo(self, requestor, service, nodeIdentifier=""): + return [disco.DiscoFeature(NS_PUBSUB_ATTACHMENTS)] + + def getDiscoItems(self, requestor, service, nodeIdentifier=""): + return []