Mercurial > libervia-backend
view sat/plugins/plugin_pubsub_attachments.py @ 3875:a0666f17f300
plugin merge-requests: fix `await` use on blocking method
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 23 Aug 2022 13:00:07 +0200 |
parents | c0bcbcf5b4b7 |
children |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia plugin for Pubsub Attachments
# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from typing import List, Tuple, Dict, Any, Callable, Optional

from twisted.words.protocols.jabber import jid, xmlstream, error
from twisted.words.xish import domish
from twisted.internet import defer
from zope.interface import implementer
from wokkel import pubsub, disco, iwokkel

from sat.core.constants import Const as C
from sat.core.i18n import _
from sat.core.log import getLogger
from sat.core.core_types import SatXMPPEntity
from sat.core import exceptions
from sat.tools.common import uri, data_format, date_utils
from sat.tools.utils import xmpp_date


log = getLogger(__name__)

IMPORT_NAME = "PUBSUB_ATTACHMENTS"

PLUGIN_INFO = {
    C.PI_NAME: "Pubsub Attachments",
    C.PI_IMPORT_NAME: IMPORT_NAME,
    C.PI_TYPE: C.PLUG_TYPE_EXP,
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0060"],
    C.PI_MAIN: "PubsubAttachments",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Pubsub Attachments implementation"""),
}
NS_PREFIX = "urn:xmpp:pubsub-attachments:"
NS_PUBSUB_ATTACHMENTS = f"{NS_PREFIX}0"
NS_PUBSUB_ATTACHMENTS_SUM = f"{NS_PREFIX}summary:0"


class PubsubAttachments:
    """Plugin storing per-item attachments in dedicated pubsub nodes

    Attachment handlers are registered per (name, namespace) couple; each one
    provides a ``get`` callback (parse an <attachments> element) and a ``set``
    callback (build or update its own child element).
    """
    namespace = NS_PUBSUB_ATTACHMENTS

    def __init__(self, host):
        log.info(_("Pubsub Attachments plugin initialization"))
        host.registerNamespace("pubsub-attachments", NS_PUBSUB_ATTACHMENTS)
        self.host = host
        self._p = host.plugins["XEP-0060"]
        # (name, namespace) => {"get": get_cb, "set": set_cb}
        self.handlers: Dict[Tuple[str, str], Dict[str, Any]] = {}
        host.trigger.add("XEP-0277_send", self.onMBSend)
        self.registerAttachmentHandler(
            "noticed", NS_PUBSUB_ATTACHMENTS, self.noticedGet, self.noticedSet
        )
        self.registerAttachmentHandler(
            "reaction", NS_PUBSUB_ATTACHMENTS, self.reactionGet, self.reactionSet
        )
        host.bridge.addMethod(
            "psAttachmentsGet",
            ".plugin",
            in_sign="sssasss",
            out_sign="(ss)",
            method=self._get,
            async_=True,
        )
        host.bridge.addMethod(
            "psAttachmentsSet",
            ".plugin",
            in_sign="ss",
            out_sign="",
            method=self._set,
            async_=True,
        )

    def getHandler(self, client):
        """Return a new XMPP handler advertising the attachments feature"""
        return PubsubAttachments_Handler()

    def registerAttachmentHandler(
        self,
        name: str,
        namespace: str,
        get_cb: Callable[
            [SatXMPPEntity, domish.Element, Dict[str, Any]],
            None],
        set_cb: Callable[
            [SatXMPPEntity, Dict[str, Any], Optional[domish.Element]],
            Optional[domish.Element]],
    ) -> None:
        """Register callbacks to handle an attachment

        @param name: name of the element
        @param namespace: namespace of the element
            (name, namespace) couple must be unique
        @param get_cb: method to call when attachments are retrieved
            it will be called with (client, element, data) where element is the
            <attachments> element to parse, and data must be updated in place with
            parsed data
        @param set_cb: method to call when the attachment need to be set or updated
            it will be called with (client, data, former_elt or None if there was no
            former element). When suitable, ``operation`` should be used to check if we
            request an ``update`` or a ``replace``.
        @raise exceptions.ConflictError: handlers are already registered for this
            (name, namespace) couple
        """
        key = (name, namespace)
        if key in self.handlers:
            raise exceptions.ConflictError(
                f"({name}, {namespace}) attachment handlers are already registered"
            )
        self.handlers[key] = {
            "get": get_cb,
            "set": set_cb
        }

    def getAttachmentNodeName(self, service: jid.JID, node: str, item: str) -> str:
        """Generate name to use for attachment node

        The name is the attachments namespace followed by "/" and by the pubsub
        XMPP URI of the targeted item.
        """
        target_item_uri = uri.buildXMPPUri(
            "pubsub",
            path=service.userhost(),
            node=node,
            item=item
        )
        return f"{NS_PUBSUB_ATTACHMENTS}/{target_item_uri}"

    def isAttachmentNode(self, node: str) -> bool:
        """Return True if node name is an attachment node"""
        return node.startswith(f"{NS_PUBSUB_ATTACHMENTS}/")

    def attachmentNode2Item(self, node: str) -> Tuple[jid.JID, str, str]:
        """Retrieve service, node and item from attachment node's name

        @param node: name of the attachment node
        @return: (service, node, item) of the targeted item
        @raise ValueError: the node is not an attachment node, or the URI found
            in its name is not a valid pubsub URI
        """
        if not self.isAttachmentNode(node):
            raise ValueError("this is not an attachment node!")
        prefix_len = len(f"{NS_PUBSUB_ATTACHMENTS}/")
        item_uri = node[prefix_len:]
        parsed_uri = uri.parseXMPPUri(item_uri)
        if parsed_uri["type"] != "pubsub":
            raise ValueError(f"unexpected URI type, it must be a pubsub URI: {item_uri}")
        try:
            service = jid.JID(parsed_uri["path"])
        except RuntimeError as e:
            raise ValueError(f"invalid service in pubsub URI: {item_uri}") from e
        node = parsed_uri["node"]
        item = parsed_uri["item"]
        return (service, node, item)

    async def onMBSend(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        node: str,
        item: domish.Element,
        data: dict
    ) -> bool:
        """trigger to create attachment node on each publication

        @return: always True; a failure to create the attachment node is only
            logged
        """
        node_config = await self._p.getConfiguration(client, service, node)
        attachment_node = self.getAttachmentNodeName(service, node, item["id"])
        # we use the same options as target node
        try:
            await self._p.createIfNewNode(
                client, service, attachment_node, options=dict(node_config)
            )
        except Exception as e:
            # best effort: the failure is logged and True is still returned
            log.warning(f"Can't create attachment node {attachment_node}: {e}")
        return True

    def items2attachmentData(
        self,
        client: SatXMPPEntity,
        items: List[domish.Element]
    ) -> List[Dict[str, Any]]:
        """Convert items from attachment node to attachment data

        Items without <attachments> element, with a publisher not matching the
        item ID, or with an item ID which is not a JID are ignored (with a
        warning). Items for which no handler has filled any data are skipped.

        @param items: items retrieved from an attachment node
        @return: one dict per kept item, with the item ID in ``from`` key and the
            data filled by registered ``get`` handlers
        """
        list_data = []
        for item in items:
            try:
                attachments_elt = next(
                    item.elements(NS_PUBSUB_ATTACHMENTS, "attachments")
                )
            except StopIteration:
                log.warning(
                    f"item is missing <attachments> elements, ignoring it: {item.toXml()}"
                )
                continue
            item_id = item["id"]
            publisher_s = item.getAttribute("publisher")
            # publisher is not filled by all pubsub service, so we can't count on it
            if publisher_s:
                publisher = jid.JID(publisher_s)
                if publisher.userhost() != item_id:
                    log.warning(
                        f"publisher {publisher.userhost()!r} doesn't correspond to item "
                        f"id {item['id']!r}, ignoring. This may be a hack attempt.\n"
                        f"{item.toXml()}"
                    )
                    continue
            try:
                jid.JID(item_id)
            except RuntimeError:
                log.warning(
                    "item ID is not a JID, this is not compliant and is ignored: "
                    f"{item.toXml()}"
                )
                continue
            data = {
                "from": item_id
            }
            for handler in self.handlers.values():
                handler["get"](client, attachments_elt, data)
            # we only keep items where at least one handler has added something
            if len(data) > 1:
                list_data.append(data)

        return list_data

    def _get(
        self,
        service_s: str,
        node: str,
        item: str,
        senders_s: List[str],
        extra_s: str,
        profile_key: str
    ) -> defer.Deferred:
        """Bridge method wrapping [getAttachments]

        @return: Deferred firing with a tuple of serialised attachments data and
            serialised metadata
        """
        client = self.host.getClient(profile_key)
        extra = data_format.deserialise(extra_s)
        senders = [jid.JID(s) for s in senders_s]
        d = defer.ensureDeferred(
            self.getAttachments(client, jid.JID(service_s), node, item, senders, extra)
        )
        d.addCallback(
            lambda ret:
            (data_format.serialise(ret[0]),
             data_format.serialise(ret[1]))
        )
        return d

    async def getAttachments(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        node: str,
        item: str,
        senders: Optional[List[jid.JID]],
        extra: Optional[dict] = None
    ) -> Tuple[List[Dict[str, Any]], dict]:
        """Retrieve data attached to a pubsub item

        @param service: pubsub service where the item is
        @param node: pubsub node containing the item
        @param item: ID of the item for which attachments will be retrieved
        @param senders: bare jids of entities whose attachments are requested
            if None or empty, no item ID filter is used
        @param extra: extra data, passed as is to XEP-0060 plugin's ``getItems``
        @return: A tuple with:
            - the list of attachments data, one item per kept sender
            - metadata returned by ``getItems``
        """
        if extra is None:
            extra = {}
        attachment_node = self.getAttachmentNodeName(service, node, item)
        item_ids = [e.userhost() for e in senders] if senders else None
        items, metadata = await self._p.getItems(
            client, service, attachment_node, item_ids=item_ids, extra=extra
        )
        list_data = self.items2attachmentData(client, items)

        return list_data, metadata

    def _set(
        self,
        attachments_s: str,
        profile_key: str
    ) -> defer.Deferred:
        """Bridge method wrapping [setAttachments]

        @param attachments_s: serialised attachments data
        """
        client = self.host.getClient(profile_key)
        attachments = data_format.deserialise(attachments_s) or {}
        return defer.ensureDeferred(self.setAttachments(client, attachments))

    def applySetHandler(
        self,
        client: SatXMPPEntity,
        attachments_data: dict,
        item_elt: Optional[domish.Element],
        handlers: Optional[List[Tuple[str, str]]] = None,
        from_jid: Optional[jid.JID] = None,
    ) -> domish.Element:
        """Apply all ``set`` callbacks to an attachments item

        @param attachments_data: data describing the attachments
            ``extra`` key will be used, and created if not found
        @param from_jid: jid of the author of the attachments
            ``client.jid.userhostJID()`` will be used if not specified
        @param item_elt: item containing an <attachments> element
            will be modified in place
            if None, a new element will be created
        @param handlers: list of (name, namespace) of handlers to use.
            if None, all registered handlers will be used.
        @return: updated item_elt if given, otherwise a new item_elt
        """
        attachments_data.setdefault("extra", {})
        if item_elt is None:
            item_id = client.jid.userhost() if from_jid is None else from_jid.userhost()
            item_elt = pubsub.Item(item_id)
            item_elt.addElement((NS_PUBSUB_ATTACHMENTS, "attachments"))

        try:
            attachments_elt = next(
                item_elt.elements(NS_PUBSUB_ATTACHMENTS, "attachments")
            )
        except StopIteration:
            log.warning(
                f"no <attachments> element found, creating a new one: {item_elt.toXml()}"
            )
            attachments_elt = item_elt.addElement((NS_PUBSUB_ATTACHMENTS, "attachments"))

        if handlers is None:
            handlers = list(self.handlers.keys())

        for name, namespace in handlers:
            try:
                handler = self.handlers[(name, namespace)]
            except KeyError:
                log.error(
                    f"unregistered handler ({name!r}, {namespace!r}) is requested, "
                    "ignoring"
                )
                continue
            try:
                former_elt = next(attachments_elt.elements(namespace, name))
            except StopIteration:
                former_elt = None
            new_elt = handler["set"](client, attachments_data, former_elt)
            # the handler returns former_elt itself when nothing must be changed
            if new_elt != former_elt:
                if former_elt is not None:
                    attachments_elt.children.remove(former_elt)
                if new_elt is not None:
                    attachments_elt.addChild(new_elt)
        return item_elt

    async def setAttachments(
        self,
        client: SatXMPPEntity,
        attachments_data: Dict[str, Any]
    ) -> None:
        """Set or update attachments

        Former <attachments> element will be retrieved and updated. Individual
        attachments replace or update their elements individually, according to the
        "operation" key.

        "operation" key may be "update" or "replace", and defaults to update, it is only
        used in attachments where "update" makes sense (e.g. it's used for "reactions"
        but not for "noticed").

        @param attachments_data: data describing attachments. Various keys (usually stored
            in attachments_data["extra"]) may be used depending on the attachments
            handlers registered. The keys "service", "node" and "id" MUST be set.
            ``attachments_data`` is thought to be compatible with microblog data.
        @raise ValueError: "service", "node" or "id" is missing, or "service" can't
            be parsed
        @raise exceptions.NotFound: the attachment node doesn't exist on the service
        """
        try:
            service = jid.JID(attachments_data["service"])
            node = attachments_data["node"]
            item = attachments_data["id"]
        except (KeyError, RuntimeError) as e:
            raise ValueError(
                'data must have "service", "node" and "id" set'
            ) from e
        attachment_node = self.getAttachmentNodeName(service, node, item)
        # our own attachments item uses our bare jid as item ID
        try:
            items, __ = await self._p.getItems(
                client, service, attachment_node, item_ids=[client.jid.userhost()]
            )
        except exceptions.NotFound:
            item_elt = None
        else:
            if not items:
                item_elt = None
            else:
                item_elt = items[0]

        item_elt = self.applySetHandler(
            client,
            attachments_data,
            item_elt=item_elt,
        )

        try:
            await self._p.sendItems(client, service, attachment_node, [item_elt])
        except error.StanzaError as e:
            if e.condition == "item-not-found":
                # the node doesn't exist, we can't publish attachments
                log.warning(
                    f"no attachment node found at {service} on {node!r} for item "
                    f"{item!r}, we can't update attachments."
                )
                raise exceptions.NotFound("No attachment node available")
            else:
                raise

    async def subscribe(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        node: str,
        item: str,
    ) -> None:
        """Subscribe to attachment node targeting the item

        @param service: service of target item (will also be used for attachment node)
        @param node: node of target item (used to get attachment node's name)
        @param item: name of target item (used to get attachment node's name)
        """
        attachment_node = self.getAttachmentNodeName(service, node, item)
        await self._p.subscribe(client, service, attachment_node)

    def setTimestamp(self, attachment_elt: domish.Element, data: dict) -> None:
        """Check if a ``timestamp`` attribute is set, parse it, and fill data

        @param attachment_elt: element where the ``timestamp`` attribute may be set
        @param data: data specific to the attachment (i.e. not the whole microblog data)
            ``timestamp`` field will be set there if timestamp exists and is parsable
        """
        timestamp_raw = attachment_elt.getAttribute("timestamp")
        if timestamp_raw:
            try:
                timestamp = date_utils.date_parse(timestamp_raw)
            except date_utils.ParserError:
                log.warning(f"can't parse timestamp: {timestamp_raw}")
            else:
                data["timestamp"] = timestamp

    def noticedGet(
        self,
        client: SatXMPPEntity,
        attachments_elt: domish.Element,
        data: Dict[str, Any],
    ) -> None:
        """Fill ``data["noticed"]`` if a <noticed> element is found

        @param attachments_elt: <attachments> element to parse
        @param data: attachments data, updated in place
        """
        try:
            noticed_elt = next(
                attachments_elt.elements(NS_PUBSUB_ATTACHMENTS, "noticed")
            )
        except StopIteration:
            pass
        else:
            noticed_data = {
                "noticed": True
            }
            self.setTimestamp(noticed_elt, noticed_data)
            data["noticed"] = noticed_data

    def noticedSet(
        self,
        client: SatXMPPEntity,
        data: Dict[str, Any],
        former_elt: Optional[domish.Element]
    ) -> Optional[domish.Element]:
        """add or remove a <noticed> attachment

        if data["extra"]["noticed"] is True, a new element is returned (with a
        ``timestamp`` attribute set from ``xmpp_date()``), if it's False, None is
        returned so the element is removed, and if it's not present or None, the
        former element is kept.
        """
        noticed = data["extra"].get("noticed")
        if noticed is None:
            return former_elt
        elif noticed:
            return domish.Element(
                (NS_PUBSUB_ATTACHMENTS, "noticed"),
                attribs = {
                    "timestamp": xmpp_date()
                }
            )
        else:
            return None

    def reactionGet(
        self,
        client: SatXMPPEntity,
        attachments_elt: domish.Element,
        data: Dict[str, Any],
    ) -> None:
        """Fill ``data["reaction"]`` if a <reaction> element is found

        The text content of the element is stored in ``reactions`` key.

        @param attachments_elt: <attachments> element to parse
        @param data: attachments data, updated in place
        """
        try:
            reaction_elt = next(
                attachments_elt.elements(NS_PUBSUB_ATTACHMENTS, "reaction")
            )
        except StopIteration:
            pass
        else:
            reaction_data = {
                "reactions": str(reaction_elt)
            }
            self.setTimestamp(reaction_elt, reaction_data)
            data["reaction"] = reaction_data

    def reactionSet(
        self,
        client: SatXMPPEntity,
        data: Dict[str, Any],
        former_elt: Optional[domish.Element]
    ) -> Optional[domish.Element]:
        """update the <reaction> attachment

        data["extra"]["reaction"] is used: if it's missing or None, the former
        element is kept. Its ``operation`` key may be ``update`` (default, new
        reactions are merged with former ones) or ``replace``.

        @return: new <reaction> element, former element if nothing is requested,
            or None if there is no reaction left
        @raise exceptions.DataError: ``operation`` is neither "update" nor
            "replace"
        """
        reaction = data["extra"].get("reaction")
        if reaction is None:
            return former_elt
        operation_type = reaction.get("operation", "update")
        if operation_type == "update":
            former_reactions = str(former_elt) if former_elt is not None else ""
            # characters are merged through sets to avoid duplicates; they are
            # sorted because set iteration order is arbitrary and we want a
            # reproducible content
            reactions = "".join(sorted(
                set(former_reactions) | set(reaction.get("reactions") or "")
            ))
        elif operation_type == "replace":
            reactions = reaction.get("reactions", "")
        else:
            raise exceptions.DataError(f"invalid reaction operation: {operation_type!r}")
        if reactions:
            reaction_elt = domish.Element(
                (NS_PUBSUB_ATTACHMENTS, "reaction"),
                attribs = {
                    "timestamp": xmpp_date()
                }
            )
            reaction_elt.addContent(reactions)
            return reaction_elt
        else:
            return None


@implementer(iwokkel.IDisco)
class PubsubAttachments_Handler(xmlstream.XMPPHandler):
    """XMPP handler advertising the pubsub attachments namespace via disco"""

    def getDiscoInfo(self, requestor, service, nodeIdentifier=""):
        return [disco.DiscoFeature(NS_PUBSUB_ATTACHMENTS)]

    def getDiscoItems(self, requestor, service, nodeIdentifier=""):
        return []