Mercurial > libervia-backend
view sat/plugins/plugin_xep_0372.py @ 3875:a0666f17f300
plugin merge-requests: fix `await` use on blocking method
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 23 Aug 2022 13:00:07 +0200 |
parents | 68a11b95a7d3 |
children | 524856bd7b19 |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia plugin for XEP-0372
# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from typing import Optional, Dict, Union
from textwrap import dedent
from sat.core import exceptions
from sat.tools.common import uri as xmpp_uri

from twisted.internet import defer
from twisted.words.protocols.jabber import jid
from twisted.words.protocols.jabber.xmlstream import XMPPHandler
from twisted.words.xish import domish
from zope.interface import implementer
from wokkel import disco, iwokkel

from sat.core.constants import Const as C
from sat.core.i18n import _
from sat.core.log import getLogger
from sat.core.core_types import SatXMPPEntity
from sat.tools.common import data_format


log = getLogger(__name__)

PLUGIN_INFO = {
    C.PI_NAME: "References",
    C.PI_IMPORT_NAME: "XEP-0372",
    C.PI_TYPE: C.PLUG_TYPE_XEP,
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0372"],
    C.PI_DEPENDENCIES: ["XEP-0334"],
    C.PI_MAIN: "XEP_0372",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _(dedent("""\
    XEP-0372 (References) implementation

    This plugin implement generic references and mentions.
    """)),
}

# XML namespace of <reference/> elements
NS_REFS = "urn:xmpp:reference:0"
# values accepted for the "type" attribute when building a reference
ALLOWED_TYPES = ("mention", "data")


class XEP_0372:
    """Parse incoming <reference/> elements and build/send outgoing ones"""

    namespace = NS_REFS

    def __init__(self, host):
        """Register the namespace, the message trigger and the bridge method

        @param host: application host; its ``plugins``, ``trigger`` and ``bridge``
            attributes are used here
        """
        log.info(_("References plugin initialization"))
        host.registerNamespace("refs", NS_REFS)
        self.host = host
        # XEP-0334 plugin instance, used to add hints to outgoing messages
        self._h = host.plugins["XEP-0334"]
        host.trigger.add("messageReceived", self._messageReceivedTrigger)
        host.bridge.addMethod(
            "referenceSend",
            ".plugin",
            in_sign="sssss",
            out_sign="",
            method=self._sendReference,
            async_=False,
        )

    def getHandler(self, client):
        """Return a new disco handler advertising the references namespace"""
        return XEP_0372_Handler()

    def refElementToRefData(
        self,
        reference_elt: domish.Element
    ) -> Dict[str, Union[str, int, dict]]:
        """Convert a <reference/> element to a dictionary

        @param reference_elt: <reference/> element to parse
        @return: dictionary with following keys:
            - "uri" and "type": copied from the attributes of the same name
            - "parsed_uri": parsed version of "uri", only if it starts with "xmpp:"
            - "begin", "end": integer indexes, only set when the attribute is
              present and can be converted to int
            - "anchor": only if the attribute is present
            - "parsed_anchor": parsed version of "anchor", only if it starts with
              "xmpp:"
        @raise KeyError: "uri" or "type" attribute is missing
        """
        uri = reference_elt["uri"]
        ref_data: Dict[str, Union[str, int, dict]] = {
            "uri": uri,
            "type": reference_elt["type"]
        }

        if uri.startswith("xmpp:"):
            ref_data["parsed_uri"] = xmpp_uri.parseXMPPUri(uri)

        for attr in ("begin", "end"):
            try:
                ref_data[attr] = int(reference_elt[attr])
            except (KeyError, ValueError, TypeError):
                # missing or non numeric index: the key is simply left out
                continue

        anchor = reference_elt.getAttribute("anchor")
        if anchor is not None:
            ref_data["anchor"] = anchor
            if anchor.startswith("xmpp:"):
                ref_data["parsed_anchor"] = xmpp_uri.parseXMPPUri(anchor)
        return ref_data

    async def _messageReceivedTrigger(
        self,
        client: SatXMPPEntity,
        message_elt: domish.Element,
        post_treat: defer.Deferred
    ) -> bool:
        """Check if a reference is in the message, and run the dedicated trigger

        Only the first <reference/> child of the message is handled.

        @param client: client which received the message
        @param message_elt: received <message/> element
        @param post_treat: unused here
        @return: False if the "XEP-0372_ref_received" trigger point returned a
            false value, True in all other cases (including when there is no
            reference or when the reference is invalid)
        """
        reference_elt = next(message_elt.elements(NS_REFS, "reference"), None)
        if reference_elt is None:
            return True
        try:
            ref_data = self.refElementToRefData(reference_elt)
        except KeyError:
            log.warning(f"invalid <reference> element: {reference_elt.toXml()}")
            return True

        if not await self.host.trigger.asyncPoint(
            "XEP-0372_ref_received", client, message_elt, ref_data
        ):
            return False
        return True

    def buildRefElement(
        self,
        uri: str,
        type_: str = "mention",
        begin: Optional[int] = None,
        end: Optional[int] = None,
        anchor: Optional[str] = None,
    ) -> domish.Element:
        """Build and return the <reference> element

        @param uri: value of the "uri" attribute
        @param type_: value of the "type" attribute, one of ALLOWED_TYPES
        @param begin: optional begin index
        @param end: optional end index
        @param anchor: optional value of the "anchor" attribute
        @return: the new <reference/> element; optional attributes are only set
            when the corresponding argument is not None
        @raise ValueError: type_ is not in ALLOWED_TYPES
        """
        if type_ not in ALLOWED_TYPES:
            raise ValueError(f"Unknown type: {type_!r}")
        reference_elt = domish.Element(
            (NS_REFS, "reference"),
            attribs={"uri": uri, "type": type_}
        )
        if begin is not None:
            reference_elt["begin"] = str(begin)
        if end is not None:
            reference_elt["end"] = str(end)
        if anchor is not None:
            reference_elt["anchor"] = anchor
        return reference_elt

    def _sendReference(
        self,
        recipient: str,
        anchor: str,
        type_: str,
        extra_s: str,
        profile_key: str
    ) -> None:
        """Bridge method for "referenceSend"

        @param recipient: JID of the entity to send the reference to
        @param anchor: forwarded as "anchor" to [sendReference]
        @param type_: forwarded as "type_" to [sendReference]
        @param extra_s: serialised dictionary; only its optional "uri" key is used
        @param profile_key: key used to retrieve the client
        """
        recipient_jid = jid.JID(recipient)
        client = self.host.getClient(profile_key)
        extra: dict = data_format.deserialise(extra_s, default={})
        self.sendReference(
            client,
            uri=extra.get("uri"),
            type_=type_,
            anchor=anchor,
            to_jid=recipient_jid
        )

    def sendReference(
        self,
        client: SatXMPPEntity,
        uri: Optional[str] = None,
        type_: str = "mention",
        begin: Optional[int] = None,
        end: Optional[int] = None,
        anchor: Optional[str] = None,
        message_elt: Optional[domish.Element] = None,
        to_jid: Optional[jid.JID] = None
    ) -> None:
        """Build and send a reference_elt

        @param uri: URI pointing to referenced object (XMPP entity, Pubsub Item, etc)
            if not set, "to_jid" will be used to build an URI to the entity
        @param type_: type of reference
            one of [ALLOWED_TYPES]
        @param begin: optional begin index
        @param end: optional end index
        @param anchor: URI of referring object (message, pubsub item), when the
            reference is not already in the wrapping message element. In other
            words, it's the object where the reference appears.
        @param message_elt: wrapping <message> element, if not set a new one will be
            generated
        @param to_jid: recipient of the reference. If not specified, "to" attribute
            of message_elt will be used.
        @raise exceptions.InternalError: neither "uri" nor "to_jid" is set, or
            "to_jid" is not set and "to" attribute of message_elt is missing or is
            not a valid JID
        @raise ValueError: type_ is not in ALLOWED_TYPES
        """
        if uri is None:
            if to_jid is None:
                raise exceptions.InternalError(
                    '"to_jid" must be set if "uri is None"'
                )
            uri = xmpp_uri.buildXMPPUri(path=to_jid.full())
        if message_elt is None:
            message_elt = domish.Element((None, "message"))

        if to_jid is not None:
            message_elt["to"] = to_jid.full()
        else:
            try:
                to_jid = jid.JID(message_elt["to"])
            except (KeyError, RuntimeError, jid.InvalidFormat) as e:
                # KeyError: no "to" attribute, RuntimeError: empty value,
                # InvalidFormat: value can't be parsed as a JID
                raise exceptions.InternalError(
                    'invalid "to" attribute in given message element: '
                    f'{message_elt.toXml()}'
                ) from e

        message_elt.addChild(self.buildRefElement(uri, type_, begin, end, anchor))
        # the store hint is added through the XEP-0334 plugin
        self._h.addHintElements(message_elt, [self._h.HINT_STORE])
        client.send(message_elt)


@implementer(iwokkel.IDisco)
class XEP_0372_Handler(XMPPHandler):
    """Disco handler advertising NS_REFS as a feature"""

    def getDiscoInfo(self, requestor, service, nodeIdentifier=""):
        """Return the references namespace as only disco feature"""
        return [disco.DiscoFeature(NS_REFS)]

    def getDiscoItems(self, requestor, service, nodeIdentifier=""):
        """Return no disco item"""
        return []