# HG changeset patch # User Goffi # Date 1657458965 -7200 # Node ID 68a11b95a7d3b4d717118cb0e89ddecc721f8cd0 # Parent 67fc066ed2cdb2e8ff18a6fa44a2b159f0cd72c2 plugin XEP-0372: References implementation: rel 369 diff -r 67fc066ed2cd -r 68a11b95a7d3 sat/plugins/plugin_xep_0334.py --- a/sat/plugins/plugin_xep_0334.py Sat Jul 09 16:30:37 2022 +0200 +++ b/sat/plugins/plugin_xep_0334.py Sun Jul 10 15:16:05 2022 +0200 @@ -37,6 +37,7 @@ C.PI_NAME: "Message Processing Hints", C.PI_IMPORT_NAME: "XEP-0334", C.PI_TYPE: "XEP", + C.PI_MODES: C.PLUG_MODE_BOTH, C.PI_PROTOCOLS: ["XEP-0334"], C.PI_MAIN: "XEP_0334", C.PI_HANDLER: "yes", diff -r 67fc066ed2cd -r 68a11b95a7d3 sat/plugins/plugin_xep_0372.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat/plugins/plugin_xep_0372.py Sun Jul 10 15:16:05 2022 +0200 @@ -0,0 +1,230 @@ +#!/usr/bin/env python3 + +# Libervia plugin for XEP-0372 +# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from typing import Optional, Dict, Union +from textwrap import dedent +from sat.core import exceptions +from sat.tools.common import uri as xmpp_uri + +from twisted.internet import defer +from twisted.words.protocols.jabber import jid +from twisted.words.protocols.jabber.xmlstream import XMPPHandler +from twisted.words.xish import domish +from zope.interface import implementer +from wokkel import disco, iwokkel + +from sat.core.constants import Const as C +from sat.core.i18n import _ +from sat.core.log import getLogger +from sat.core.core_types import SatXMPPEntity +from sat.tools.common import data_format + + +log = getLogger(__name__) + +PLUGIN_INFO = { + C.PI_NAME: "References", + C.PI_IMPORT_NAME: "XEP-0372", + C.PI_TYPE: C.PLUG_TYPE_XEP, + C.PI_MODES: C.PLUG_MODE_BOTH, + C.PI_PROTOCOLS: ["XEP-0372"], + C.PI_DEPENDENCIES: ["XEP-0334"], + C.PI_MAIN: "XEP_0372", + C.PI_HANDLER: "yes", + C.PI_DESCRIPTION: _(dedent("""\ + XEP-0372 (References) implementation + + This plugin implement generic references and mentions. + """)), +} + +NS_REFS = "urn:xmpp:reference:0" +ALLOWED_TYPES = ("mention", "data") + + +class XEP_0372: + namespace = NS_REFS + + def __init__(self, host): + log.info(_("References plugin initialization")) + host.registerNamespace("refs", NS_REFS) + self.host = host + self._h = host.plugins["XEP-0334"] + host.trigger.add("messageReceived", self._messageReceivedTrigger) + host.bridge.addMethod( + "referenceSend", + ".plugin", + in_sign="sssss", + out_sign="", + method=self._sendReference, + async_=False, + ) + + def getHandler(self, client): + return XEP_0372_Handler() + + def refElementToRefData( + self, + reference_elt: domish.Element + ) -> Dict[str, Union[str, int, dict]]: + ref_data: Dict[str, Union[str, int, dict]] = { + "uri": reference_elt["uri"], + "type": reference_elt["type"] + } + + if ref_data["uri"].startswith("xmpp:"): + ref_data["parsed_uri"] = xmpp_uri.parseXMPPUri(ref_data["uri"]) + + for attr in ("begin", "end"): + try: + ref_data[attr] = int(reference_elt[attr]) + except (KeyError, ValueError, TypeError): + continue + + anchor = reference_elt.getAttribute("anchor") + if anchor is not None: + ref_data["anchor"] = anchor + if anchor.startswith("xmpp:"): + ref_data["parsed_anchor"] = xmpp_uri.parseXMPPUri(anchor) + return ref_data + + async def _messageReceivedTrigger( + self, + client: SatXMPPEntity, + message_elt: domish.Element, + post_treat: defer.Deferred + ) -> bool: + """Check if a direct invitation is in the message, and handle it""" + reference_elt = next(message_elt.elements(NS_REFS, "reference"), None) + if reference_elt is None: + return True + try: + ref_data = self.refElementToRefData(reference_elt) + except KeyError: + log.warning("invalid element: {reference_elt.toXml}") + return True + + if not await self.host.trigger.asyncPoint( + "XEP-0372_ref_received", client, message_elt, ref_data + ): + return False + return True + + def buildRefElement( + self, + uri: str, + type_: str = "mention", + begin: Optional[int] = None, + end: Optional[int] = None, + anchor: Optional[str] = None, + ) -> domish.Element: + """Build and return the element""" + if type_ not in ALLOWED_TYPES: + raise ValueError(f"Unknown type: {type_!r}") + reference_elt = domish.Element( + (NS_REFS, "reference"), + attribs={"uri": uri, "type": type_} + ) + if begin is not None: + reference_elt["begin"] = str(begin) + if end is not None: + reference_elt["end"] = str(end) + if anchor is not None: + reference_elt["anchor"] = anchor + return reference_elt + + def _sendReference( + self, + recipient: str, + anchor: str, + type_: str, + extra_s: str, + profile_key: str + ) -> defer.Deferred: + recipient_jid = jid.JID(recipient) + client = self.host.getClient(profile_key) + extra: dict = data_format.deserialise(extra_s, default={}) + self.sendReference( + client, + uri=extra.get("uri"), + type_=type_, + anchor=anchor, + to_jid=recipient_jid + ) + + def sendReference( + self, + client: "SatXMPPEntity", + uri: Optional[str] = None, + type_: str = "mention", + begin: Optional[int] = None, + end: Optional[int] = None, + anchor: Optional[str] = None, + message_elt: Optional[domish.Element] = None, + to_jid: Optional[jid.JID] = None + ) -> None: + """Build and send a reference_elt + + @param uri: URI pointing to referenced object (XMPP entity, Pubsub Item, etc) + if not set, "to_jid" will be used to build an URI to the entity + @param type_: type of reference + one of [ALLOWED_TYPES] + @param begin: optional begin index + @param end: optional end index + @param anchor: URI of refering object (message, pubsub item), when the refence + is not already in the wrapping message element. In other words, it's the + object where the reference appears. + @param message_elt: wrapping element, if not set a new one will be + generated + @param to_jid: destinee of the reference. If not specified, "to" attribute of + message_elt will be used. + + """ + if uri is None: + if to_jid is None: + raise exceptions.InternalError( + '"to_jid" must be set if "uri is None"' + ) + uri = xmpp_uri.buildXMPPUri(path=to_jid.full()) + if message_elt is None: + message_elt = domish.Element((None, "message")) + + if to_jid is not None: + message_elt["to"] = to_jid.full() + else: + try: + to_jid = jid.JID(message_elt["to"]) + except (KeyError, RuntimeError): + raise exceptions.InternalError( + 'invalid "to" attribute in given message element: ' + '{message_elt.toXml()}' + ) + + message_elt.addChild(self.buildRefElement(uri, type_, begin, end, anchor)) + self._h.addHintElements(message_elt, [self._h.HINT_STORE]) + client.send(message_elt) + + +@implementer(iwokkel.IDisco) +class XEP_0372_Handler(XMPPHandler): + + def getDiscoInfo(self, requestor, service, nodeIdentifier=""): + return [disco.DiscoFeature(NS_REFS)] + + def getDiscoItems(self, requestor, service, nodeIdentifier=""): + return []