Mercurial > libervia-backend
view libervia/backend/plugins/plugin_xep_0372.py @ 4230:314d3c02bb67
core (xmpp): Add a timeout for messages processing to avoid blocking the queue.
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 06 Apr 2024 12:21:04 +0200 |
parents | 4b842c1fb686 |
children | 0d7bb4df2343 |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia plugin for XEP-0372
# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import Optional, Dict, Union
from textwrap import dedent
from libervia.backend.core import exceptions
from libervia.backend.tools.common import uri as xmpp_uri

from twisted.internet import defer
from twisted.words.protocols.jabber import jid
from twisted.words.protocols.jabber.xmlstream import XMPPHandler
from twisted.words.xish import domish
from zope.interface import implementer
from wokkel import disco, iwokkel

from libervia.backend.core.constants import Const as C
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.tools.common import data_format


log = getLogger(__name__)

PLUGIN_INFO = {
    C.PI_NAME: "References",
    C.PI_IMPORT_NAME: "XEP-0372",
    C.PI_TYPE: C.PLUG_TYPE_XEP,
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0372"],
    C.PI_DEPENDENCIES: ["XEP-0334"],
    C.PI_MAIN: "XEP_0372",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _(dedent("""\
    XEP-0372 (References) implementation

    This plugin implement generic references and mentions.
    """)),
}

NS_REFS = "urn:xmpp:reference:0"
ALLOWED_TYPES = ("mention", "data")


class XEP_0372:
    """Plugin entry point: parse incoming and send outgoing <reference> elements"""

    namespace = NS_REFS

    def __init__(self, host):
        """Register the namespace, the incoming message trigger and the bridge method

        @param host: backend instance giving access to plugins, triggers and bridge
        """
        log.info(_("References plugin initialization"))
        host.register_namespace("refs", NS_REFS)
        self.host = host
        # XEP-0334 plugin, used in send_reference to add a hint to sent messages
        self._h = host.plugins["XEP-0334"]
        host.trigger.add("message_received", self._message_received_trigger)
        host.bridge.add_method(
            "reference_send",
            ".plugin",
            in_sign="sssss",
            out_sign="",
            method=self._send_reference,
            async_=False,
        )

    def get_handler(self, client):
        """Return a new handler advertising the references namespace"""
        return XEP_0372_Handler()

    def ref_element_to_ref_data(
        self,
        reference_elt: domish.Element
    ) -> Dict[str, Union[str, int, dict]]:
        """Convert a <reference> element to a dictionary

        @param reference_elt: <reference> element to convert
        @return: data with following keys:
            - "uri" and "type": copied from the attributes of the same name
            - "parsed_uri": parsed "uri", only set if it starts with "xmpp:"
            - "begin", "end": integer value of the attribute, only set when the
              attribute is present and is a valid integer
            - "anchor": copied from the attribute, only set if present
            - "parsed_anchor": parsed "anchor", only set if it starts with "xmpp:"
        @raise KeyError: "uri" or "type" attribute is missing
        """
        uri = reference_elt["uri"]
        ref_data: Dict[str, Union[str, int, dict]] = {
            "uri": uri,
            "type": reference_elt["type"]
        }

        if uri.startswith("xmpp:"):
            ref_data["parsed_uri"] = xmpp_uri.parse_xmpp_uri(uri)

        for attr in ("begin", "end"):
            try:
                ref_data[attr] = int(reference_elt[attr])
            except (KeyError, ValueError, TypeError):
                # indexes are optional, missing or invalid ones are skipped
                continue

        anchor = reference_elt.getAttribute("anchor")
        if anchor is not None:
            ref_data["anchor"] = anchor
            if anchor.startswith("xmpp:"):
                ref_data["parsed_anchor"] = xmpp_uri.parse_xmpp_uri(anchor)
        return ref_data

    async def _message_received_trigger(
        self,
        client: SatXMPPEntity,
        message_elt: domish.Element,
        post_treat: defer.Deferred
    ) -> bool:
        """Check if a reference is in the message, and run the dedicated trigger

        Only the first <reference> child of the message is handled.

        @param client: client which received the message
        @param message_elt: received <message> element
        @param post_treat: unused here
        @return: False if the "XEP-0372_ref_received" trigger point returned a false
            value, True in all other cases (no reference, invalid reference, or
            trigger point returning a true value)
        """
        reference_elt = next(message_elt.elements(NS_REFS, "reference"), None)
        if reference_elt is None:
            return True
        try:
            ref_data = self.ref_element_to_ref_data(reference_elt)
        except KeyError:
            # a mandatory attribute is missing, the reference is ignored
            log.warning(f"invalid <reference> element: {reference_elt.toXml()}")
            return True

        if not await self.host.trigger.async_point(
            "XEP-0372_ref_received", client, message_elt, ref_data
        ):
            return False
        return True

    def build_ref_element(
        self,
        uri: str,
        type_: str = "mention",
        begin: Optional[int] = None,
        end: Optional[int] = None,
        anchor: Optional[str] = None,
    ) -> domish.Element:
        """Build and return the <reference> element

        @param uri: value of the "uri" attribute
        @param type_: type of reference, one of ALLOWED_TYPES
        @param begin: optional begin index
        @param end: optional end index
        @param anchor: optional value of the "anchor" attribute
        @return: new <reference> element, optional attributes are only set when
            the corresponding argument is not None
        @raise ValueError: type_ is not in ALLOWED_TYPES
        """
        if type_ not in ALLOWED_TYPES:
            raise ValueError(f"Unknown type: {type_!r}")
        reference_elt = domish.Element(
            (NS_REFS, "reference"),
            attribs={"uri": uri, "type": type_}
        )
        if begin is not None:
            reference_elt["begin"] = str(begin)
        if end is not None:
            reference_elt["end"] = str(end)
        if anchor is not None:
            reference_elt["anchor"] = anchor
        return reference_elt

    def _send_reference(
        self,
        recipient: str,
        anchor: str,
        type_: str,
        extra_s: str,
        profile_key: str
    ) -> None:
        """Bridge method wrapping send_reference

        @param recipient: JID of the entity to send the reference to
        @param anchor: forwarded as "anchor" to send_reference
        @param type_: type of reference
        @param extra_s: serialised extra data, its optional "uri" key is forwarded
            to send_reference
        @param profile_key: key used to retrieve the client
        """
        recipient_jid = jid.JID(recipient)
        client = self.host.get_client(profile_key)
        extra: dict = data_format.deserialise(extra_s, default={})
        self.send_reference(
            client,
            uri=extra.get("uri"),
            type_=type_,
            anchor=anchor,
            to_jid=recipient_jid
        )

    def send_reference(
        self,
        client: "SatXMPPEntity",
        uri: Optional[str] = None,
        type_: str = "mention",
        begin: Optional[int] = None,
        end: Optional[int] = None,
        anchor: Optional[str] = None,
        message_elt: Optional[domish.Element] = None,
        to_jid: Optional[jid.JID] = None
    ) -> None:
        """Build and send a reference_elt

        @param uri: URI pointing to referenced object (XMPP entity, Pubsub Item, etc)
            if not set, "to_jid" will be used to build an URI to the entity
        @param type_: type of reference
            one of [ALLOWED_TYPES]
        @param begin: optional begin index
        @param end: optional end index
        @param anchor: URI of referring object (message, pubsub item), when the
            reference is not already in the wrapping message element. In other
            words, it's the object where the reference appears.
        @param message_elt: wrapping <message> element, if not set a new one will be
            generated
        @param to_jid: recipient of the reference. If not specified, "to" attribute
            of message_elt will be used.
        @raise exceptions.InternalError: neither "uri" nor "to_jid" is set, or
            "to_jid" is not set and message_elt has no valid "to" attribute
        @raise ValueError: type_ is not in ALLOWED_TYPES
        """
        if uri is None:
            if to_jid is None:
                raise exceptions.InternalError(
                    '"to_jid" must be set if "uri is None"'
                )
            uri = xmpp_uri.build_xmpp_uri(path=to_jid.full())
        if message_elt is None:
            message_elt = domish.Element((None, "message"))

        if to_jid is not None:
            message_elt["to"] = to_jid.full()
        else:
            try:
                # the JID is only parsed to check that the attribute exists and is
                # valid, the element already holds the destination
                jid.JID(message_elt["to"])
            except (KeyError, RuntimeError, jid.InvalidFormat) as err:
                # jid.InvalidFormat is what JID parsing raises on a malformed
                # value; RuntimeError is kept for backward compatibility
                raise exceptions.InternalError(
                    'invalid "to" attribute in given message element: '
                    f'{message_elt.toXml()}'
                ) from err

        message_elt.addChild(self.build_ref_element(uri, type_, begin, end, anchor))
        self._h.add_hint_elements(message_elt, [self._h.HINT_STORE])
        client.send(message_elt)


@implementer(iwokkel.IDisco)
class XEP_0372_Handler(XMPPHandler):
    """Service discovery handler advertising the references namespace"""

    def getDiscoInfo(self, requestor, service, nodeIdentifier=""):
        """Return the references namespace as the only disco feature"""
        return [disco.DiscoFeature(NS_REFS)]

    def getDiscoItems(self, requestor, service, nodeIdentifier=""):
        """Return no disco item"""
        return []