Mercurial > libervia-backend
view libervia/backend/plugins/plugin_xep_0353.py @ 4230:314d3c02bb67
core (xmpp): Add a timeout for messages processing to avoid blocking the queue.
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 06 Apr 2024 12:21:04 +0200 |
parents | 6784d07b99c8 |
children | e11b13418ba6 |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia plugin for Jingle Message Initiation (XEP-0353)
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from twisted.internet import defer
from twisted.internet import reactor
from twisted.words.protocols.jabber import error, jid
from twisted.words.protocols.jabber import xmlstream
from twisted.words.xish import domish
from wokkel import disco, iwokkel
from zope.interface import implementer

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import D_, _
from libervia.backend.core.log import getLogger


log = getLogger(__name__)


NS_JINGLE_MESSAGE = "urn:xmpp:jingle-message:0"

PLUGIN_INFO = {
    C.PI_NAME: "Jingle Message Initiation",
    C.PI_IMPORT_NAME: "XEP-0353",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0353"],
    C.PI_DEPENDENCIES: ["XEP-0166", "XEP-0334"],
    C.PI_MAIN: "XEP_0353",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Implementation of Jingle Message Initiation"""),
}


class RejectException(exceptions.CancelError):
    """Raised on the initiator side when the peer rejects the proposed session."""

    def __init__(self, reason: str, text: str | None = None):
        """
        @param reason: reason of the rejection, as parsed from the <reject/> element
        @param text: optional human readable text, used as exception message
        """
        super().__init__(text)
        self.reason = reason


class RetractException(exceptions.CancelError):
    """Raised on the initiator side when we retract our own session proposal."""


class XEP_0353:
    """Jingle Message Initiation: negotiate a Jingle session through messages.

    Pending proposals sent by us are tracked per client in
    ``client._xep_0353_pending_sessions``, a mapping from session ID to the Deferred
    fired when the peer answers.
    """

    def __init__(self, host):
        log.info(_("plugin {name} initialization").format(name=PLUGIN_INFO[C.PI_NAME]))
        self.host = host
        host.register_namespace("jingle-message", NS_JINGLE_MESSAGE)
        self._j = host.plugins["XEP-0166"]
        self._h = host.plugins["XEP-0334"]
        host.trigger.add_with_check(
            "XEP-0166_initiate_elt_built",
            self,
            self._on_initiate_trigger,
            # this plugin sets the resource, we want it to happen first so other
            # triggers can get the full peer JID
            priority=host.trigger.MAX_PRIORITY,
        )
        host.trigger.add_with_check(
            "XEP-0166_terminate",
            self,
            self._terminate_trigger,
            priority=host.trigger.MAX_PRIORITY,
        )
        host.trigger.add("message_received", self._on_message_received)

    def get_handler(self, client):
        """Return the handler advertising the jingle-message feature"""
        return Handler()

    def profile_connecting(self, client):
        """Initialise the per-client storage of pending proposals"""
        # mapping from session id to deferred used to wait for destinee answer
        client._xep_0353_pending_sessions = {}

    def build_message_data(self, client, peer_jid, verb, session_id):
        """Build message data holding a jingle-message element

        @param client: client sending the message
        @param peer_jid: destinee of the message
        @param verb: name of the jingle-message element to add (e.g. "propose",
            "proceed", "reject", "retract", "ringing")
        @param session_id: ID of the Jingle session, set as "id" attribute of the
            element
        @return: message data, with the generated stanza under the "xml" key
        """
        mess_data = {
            "from": client.jid,
            "to": peer_jid,
            "uid": "",
            "message": {},
            "type": C.MESS_TYPE_CHAT,
            "subject": {},
            "extra": {},
        }
        client.generate_message_xml(mess_data)
        message_elt = mess_data["xml"]
        verb_elt = message_elt.addElement((NS_JINGLE_MESSAGE, verb))
        verb_elt["id"] = session_id
        self._h.add_hint_elements(message_elt, [self._h.HINT_STORE])
        return mess_data

    async def _on_initiate_trigger(
        self,
        client: SatXMPPEntity,
        session: dict,
        iq_elt: domish.Element,
        jingle_elt: domish.Element,
    ) -> bool:
        """Propose the session through messages when peer JID has no resource

        A <propose/> message is sent, and we wait for the peer answer. When a device
        answers with <proceed/>, ``iq_elt`` destinee and ``session["peer_jid"]`` are
        updated with the full JID of this device.

        @param client: client initiating the session
        @param session: Jingle session data
        @param iq_elt: Jingle initiation <iq/> stanza, its "to" attribute may be
            modified
        @param jingle_elt: <jingle/> element of the initiation (unused here)
        @return: True to continue the normal Jingle workflow, False if the session
            has been cancelled (rejected or retracted)
        @raise exceptions.InternalError: the accepting entity bare JID doesn't match
            the destinee of ``iq_elt``
        """
        peer_jid = session["peer_jid"]
        if peer_jid.resource:
            # full JID is already known, message initiation is not needed
            return True

        try:
            infos = await self.host.memory.disco.get_infos(client, peer_jid)
        except error.StanzaError as e:
            if e.condition == "service-unavailable":
                categories = set()
            else:
                raise
        else:
            categories = {c for c, __ in infos.identities}
        if "component" in categories:
            # we don't use message initiation with components
            return True

        if peer_jid.userhostJID() not in client.roster:
            # if the contact is not in our roster, we need to send a directed presence
            # according to XEP-0353 §3.1
            await client.presence.available(peer_jid)

        mess_data = self.build_message_data(client, peer_jid, "propose", session["id"])
        message_elt = mess_data["xml"]
        for content_data in session["contents"].values():
            # we get the full element built by the application plugin
            jingle_description_elt = content_data["application_data"]["desc_elt"]
            # and copy it to only keep the root <description> element, no children
            description_elt = domish.Element(
                (jingle_description_elt.uri, jingle_description_elt.name),
                defaultUri=jingle_description_elt.defaultUri,
                attribs=jingle_description_elt.attributes,
                localPrefixes=jingle_description_elt.localPrefixes,
            )
            message_elt.propose.addChild(description_elt)

        response_d = defer.Deferred()
        # we wait for 2 min before cancelling the session init
        # FIXME: let's application decide timeout?
        response_d.addTimeout(2 * 60, reactor)
        client._xep_0353_pending_sessions[session["id"]] = response_d
        try:
            await client.send_message_data(mess_data)
            accepting_jid = await response_d
        except defer.TimeoutError:
            # NOTE(review): on timeout we only log and still return True below, so
            # the Jingle workflow continues with the bare JID — confirm this is the
            # intended fallback.
            log.warning(
                _("Message initiation with {peer_jid} timed out").format(
                    peer_jid=peer_jid
                )
            )
        except exceptions.CancelError as e:
            for content in session["contents"].values():
                await content["application"].handler.jingle_preflight_cancel(
                    client, session, e
                )

            self._j.delete_session(client, session["id"])
            return False
        else:
            if iq_elt["to"] != accepting_jid.userhost():
                raise exceptions.InternalError(
                    f"<jingle> 'to' attribute ({iq_elt['to']!r}) must not differ "
                    f"from bare JID of the accepting entity ({accepting_jid!r}), this "
                    "may be a sign of an internal bug, a hack attempt, or a MITM attack!"
                )
            iq_elt["to"] = accepting_jid.full()
            session["peer_jid"] = accepting_jid
        finally:
            # pop() instead of del: a missing key must not mask an exception which
            # is already propagating
            client._xep_0353_pending_sessions.pop(session["id"], None)
        return True

    def _terminate_trigger(
        self, client: SatXMPPEntity, session: dict, reason_elt: domish.Element
    ) -> bool:
        """Retract the proposal if the terminated session is still pending

        @param client: client terminating the session
        @param session: Jingle session data
        @param reason_elt: <reason/> element of the termination (unused here)
        @return: True if the session is not a pending XEP-0353 proposal (normal
            workflow continues), False if a <retract/> has been sent instead
        """
        session_id = session["id"]
        try:
            response_d = client._xep_0353_pending_sessions[session_id]
        except KeyError:
            return True
        # we have a XEP-0353 session, that means that we are retracting a proposed
        # session
        mess_data = self.build_message_data(
            client, session["peer_jid"], "retract", session_id
        )
        defer.ensureDeferred(client.send_message_data(mess_data))
        response_d.errback(RetractException())

        return False

    async def _on_message_received(self, client, message_elt, post_treat):
        """Dispatch the first jingle-message child element to its handler

        @param client: client receiving the message
        @param message_elt: received <message/> stanza
        @param post_treat: post-treatment Deferred of the trigger (unused here)
        @return: value returned by the handler, or True if the message doesn't hold
            any known jingle-message element
        """
        for elt in message_elt.elements():
            if elt.uri == NS_JINGLE_MESSAGE:
                if elt.name == "propose":
                    return await self._handle_propose(client, message_elt, elt)
                elif elt.name == "retract":
                    return self._handle_retract(client, message_elt, elt)
                elif elt.name == "proceed":
                    return self._handle_proceed(client, message_elt, elt)
                elif elt.name == "accept":
                    return self._handle_accept(client, message_elt, elt)
                elif elt.name == "reject":
                    return self._handle_reject(client, message_elt, elt)
                elif elt.name == "ringing":
                    return await self._handle_ringing(client, message_elt, elt)
                else:
                    log.warning(f"invalid element: {elt.toXml()}")
                    return True
        return True

    def _get_sid_and_response_d(
        self, client: SatXMPPEntity, elt: domish.Element
    ) -> tuple[str, defer.Deferred]:
        """Retrieve session ID and response_d from response element

        @param client: client which received the response
        @param elt: jingle-message response element (e.g. <proceed/> or <reject/>)
        @return: session ID and the Deferred waiting for the peer answer
        @raise KeyError: the element has no "id" attribute, or there is no pending
            session with this ID
        """
        try:
            session_id = elt["id"]
        except KeyError as e:
            assert elt.parent is not None
            log.warning(
                f"invalid {elt.name} element in message_elt: {elt.parent.toXml()}"
            )
            raise e
        try:
            response_d = client._xep_0353_pending_sessions[session_id]
        except KeyError as e:
            log.warning(
                _(
                    "no pending session found with id {session_id}, did it timed out?"
                ).format(session_id=session_id)
            )
            raise e
        return session_id, response_d

    async def _handle_propose(self, client, message_elt, elt):
        """Handle a session proposal from a peer

        A responder session is created, then the preflight of each proposed
        application is run. If none of them cancels, a <proceed/> is sent to the
        peer.

        @param client: client receiving the proposal
        @param message_elt: received <message/> stanza
        @param elt: <propose/> element
        @return: always False
        """
        try:
            peer_jid = jid.JID(message_elt["from"])
            local_jid = jid.JID(message_elt["to"])
            session_id = elt["id"]
        except KeyError:
            # a mandatory attribute is missing, we can't do anything with this
            # proposal
            log.warning(f"Invalid propose element: {message_elt.toXml()}")
            return False
        try:
            desc_and_apps = [
                (description_elt, self._j.get_application(description_elt.uri))
                for description_elt in elt.elements()
                if description_elt.name == "description"
            ]
        except AttributeError:
            log.warning(f"Invalid propose element: {message_elt.toXml()}")
            return False
        except exceptions.NotFound:
            log.warning(
                f"There is not registered application to handle this "
                f"proposal: {elt.toXml()}"
            )
            return False

        if not desc_and_apps:
            log.warning(f"No application specified: {message_elt.toXml()}")
            return False

        session = self._j.create_session(
            client, session_id, self._j.ROLE_RESPONDER, peer_jid, local_jid
        )

        is_in_roster = peer_jid.userhostJID() in client.roster
        if is_in_roster:
            # we indicate that device is ringing as explained in
            # https://xmpp.org/extensions/xep-0353.html#ring , but we only do that if
            # user is in roster to avoid presence leak of all our devices.
            mess_data = self.build_message_data(client, peer_jid, "ringing", session_id)
            await client.send_message_data(mess_data)

        for description_elt, application in desc_and_apps:
            try:
                await application.handler.jingle_preflight(
                    client, session, description_elt
                )
            except exceptions.CancelError as e:
                log.info(f"{client.profile} refused the session: {e}")

                if is_in_roster:
                    # peer is in our roster, we send reject to them, our other
                    # devices will get carbon copies
                    reject_dest_jid = peer_jid
                else:
                    # peer is not in our roster, we send the "reject" only to our own
                    # devices to make them stop ringing/doing notification, and we
                    # don't send anything to peer to avoid presence leak.
                    reject_dest_jid = client.jid.userhostJID()

                mess_data = self.build_message_data(
                    client, reject_dest_jid, "reject", session_id
                )
                await client.send_message_data(mess_data)
                self._j.delete_session(client, session_id)
                return False
            except defer.CancelledError:
                # raised when call is retracted before user can reply
                self._j.delete_session(client, session_id)
                return False

        if peer_jid.userhostJID() not in client.roster:
            await client.presence.available(peer_jid)

        mess_data = self.build_message_data(client, peer_jid, "proceed", session_id)
        await client.send_message_data(mess_data)

        return False

    def _handle_retract(self, client, message_elt, retract_elt):
        """Handle retractation of a proposal by the peer

        Deferreds found in ``session["cancellable_deferred"]`` are cancelled; if
        there is none, the session is deleted.

        @param client: client receiving the retractation
        @param message_elt: received <message/> stanza
        @param retract_elt: <retract/> element
        @return: always False
        """
        try:
            session = self._j.get_session(client, retract_elt["id"])
        except KeyError:
            log.warning(f"invalid retract element: {message_elt.toXml()}")
            return False
        except exceptions.NotFound:
            log.warning(f"no session found with ID {retract_elt['id']}")
            return False
        log.debug(
            f"{message_elt['from']} are retracting their proposal {retract_elt['id']}"
        )
        cancellable_deferred = session.get("cancellable_deferred")
        if cancellable_deferred:
            for d in cancellable_deferred:
                d.cancel()
        else:
            # nothing to cancel (key missing or empty), we delete the session
            # ourselves
            self._j.delete_session(client, session["id"])
        return False

    def _handle_proceed(self, client, message_elt, proceed_elt):
        """Handle acceptance of our proposal by a peer device

        The pending Deferred is fired with the JID which sent the <proceed/>.

        @param client: client receiving the answer
        @param message_elt: received <message/> stanza
        @param proceed_elt: <proceed/> element
        @return: True if no pending session matches the element, False otherwise
        """
        try:
            __, response_d = self._get_sid_and_response_d(client, proceed_elt)
        except KeyError:
            return True

        response_d.callback(jid.JID(message_elt["from"]))
        return False

    def _handle_accept(self, client, message_elt, accept_elt):
        """Handle <accept/> element (nothing is done, None is returned)"""
        pass

    def _handle_reject(self, client, message_elt, reject_elt):
        """Handle rejection of our proposal by the peer

        The pending Deferred is failed with a RejectException. "busy" is used as
        reason when none can be parsed from the element.

        @param client: client receiving the answer
        @param message_elt: received <message/> stanza
        @param reject_elt: <reject/> element
        @return: True if no pending session matches the element, False otherwise
        """
        try:
            __, response_d = self._get_sid_and_response_d(client, reject_elt)
        except KeyError:
            return True
        reason_elt = self._j.get_reason_elt(reject_elt)
        reason, text = self._j.parse_reason_elt(reason_elt)
        if reason is None:
            reason = "busy"

        response_d.errback(RejectException(reason, text))
        return False

    async def _handle_ringing(self, client, message_elt, ringing_elt):
        """Notify applications of the session that the peer device is ringing

        @param client: client receiving the notification
        @param message_elt: received <message/> stanza
        @param ringing_elt: <ringing/> element
        @return: always False
        """
        try:
            session_id = ringing_elt["id"]
        except KeyError:
            log.warning(f"invalid ringing element: {message_elt.toXml()}")
            return False
        try:
            session = self._j.get_session(client, session_id)
        except exceptions.NotFound:
            log.warning(f"Session {session_id!r} unknown, ignoring ringing.")
            return False
        for content_data in session["contents"].values():
            await content_data["application"].handler.jingle_preflight_info(
                client, session, "ringing", None
            )

        return False


@implementer(iwokkel.IDisco)
class Handler(xmlstream.XMPPHandler):
    """Service discovery handler advertising the jingle-message namespace"""

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        return [disco.DiscoFeature(NS_JINGLE_MESSAGE)]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        return []