Mercurial > libervia-backend
view libervia/backend/plugins/plugin_xep_0353.py @ 4310:d27228b3c704
test (unit): add test for email gateway:
rel 450
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 26 Sep 2024 16:12:01 +0200 |
parents | 0d7bb4df2343 |
children |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia plugin for Jingle Message Initiation (XEP-0353)
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import cast

from twisted.internet import defer
from twisted.internet import reactor
from twisted.words.protocols.jabber import error, jid
from twisted.words.protocols.jabber import xmlstream
from twisted.words.xish import domish
from libervia.backend.plugins.plugin_xep_0166.models import ApplicationData
from wokkel import disco, iwokkel
from zope.interface import implementer

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import D_, _
from libervia.backend.core.log import getLogger
from libervia.backend.tools.xml_tools import element_copy

try:
    from .plugin_xep_0167 import NS_JINGLE_RTP
except ImportError:
    # the RTP plugin is optional: without it no description can match this namespace
    NS_JINGLE_RTP = None

log = getLogger(__name__)


NS_JINGLE_MESSAGE = "urn:xmpp:jingle-message:0"

PLUGIN_INFO = {
    C.PI_NAME: "Jingle Message Initiation",
    C.PI_IMPORT_NAME: "XEP-0353",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0353"],
    C.PI_DEPENDENCIES: ["XEP-0166", "XEP-0334"],
    C.PI_MAIN: "XEP_0353",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Implementation of Jingle Message Initiation"""),
}


class RejectException(exceptions.CancelError):
    """Raised on the pending response deferred when a <reject> is received"""

    def __init__(self, reason: str, text: str | None = None):
        super().__init__(text)
        self.reason = reason


class TakenByOtherDeviceException(exceptions.CancelError):
    """Raised on pending preflight deferreds when another device sent <proceed>"""

    reason: str = "taken_by_other_device"

    def __init__(self, device_jid: jid.JID):
        super().__init__(device_jid.full())
        self.device_jid = device_jid


class RetractException(exceptions.CancelError):
    """Raised on the pending response deferred when we retract our own proposal"""


class XEP_0353:
    """Jingle Message Initiation: negotiate a Jingle session through <message/>"""

    def __init__(self, host):
        log.info(_("plugin {name} initialization").format(name=PLUGIN_INFO[C.PI_NAME]))
        self.host = host
        host.register_namespace("jingle-message", NS_JINGLE_MESSAGE)
        self._j = host.plugins["XEP-0166"]
        self._h = host.plugins["XEP-0334"]
        host.trigger.add_with_check(
            "XEP-0166_initiate_elt_built",
            self,
            self._on_initiate_trigger,
            # this plugin set the resource, we want it to happen first so other triggers
            # can get the full peer JID
            priority=host.trigger.MAX_PRIORITY,
        )
        host.trigger.add_with_check(
            "XEP-0166_terminate",
            self,
            self._terminate_trigger,
            priority=host.trigger.MAX_PRIORITY,
        )
        host.trigger.add("message_received", self._on_message_received)

    def get_handler(self, client):
        """Return the handler advertising the jingle-message disco feature"""
        return Handler()

    def profile_connecting(self, client):
        """Initialise the per-profile registry of pending message initiations"""
        # mapping from session id to deferred used to wait for destinee answer
        # (initiator side: a single response deferred, responder side: a list of
        # preflight deferreds, see ``_handle_propose``)
        client._xep_0353_pending_sessions = {}

    def build_message_data(self, client, peer_jid, verb, session_id):
        """Build message data holding a jingle-message element

        The generated <message/> gets a ``verb`` child in the jingle-message
        namespace with ``session_id`` as ``id`` attribute, and a "store" hint added
        through the XEP-0334 plugin.

        @param client: entity sending the message
        @param peer_jid: destinee of the message
        @param verb: name of the jingle-message element (e.g. "propose", "retract")
        @param session_id: ID of the Jingle session
        @return: message data, with the generated element in ``mess_data["xml"]``
        """
        mess_data = {
            "from": client.jid,
            "to": peer_jid,
            "uid": "",
            "message": {},
            "type": C.MESS_TYPE_CHAT,
            "subject": {},
            "extra": {},
        }
        client.generate_message_xml(mess_data)
        message_elt = mess_data["xml"]
        verb_elt = message_elt.addElement((NS_JINGLE_MESSAGE, verb))
        verb_elt["id"] = session_id
        self._h.add_hint_elements(message_elt, [self._h.HINT_STORE])
        return mess_data

    async def _on_initiate_trigger(
        self,
        client: SatXMPPEntity,
        session: dict,
        iq_elt: domish.Element,
        jingle_elt: domish.Element,
    ) -> bool:
        """Send a <propose> and wait for an answer before the Jingle initiation

        Nothing is done if the peer JID already has a resource or if the peer is a
        component. Otherwise a proposal is sent and we wait (2 min at most) for a
        device to answer; on <proceed>, ``iq_elt["to"]`` and ``session["peer_jid"]``
        are set to the full JID of the accepting device.

        @return: False if the initiation has been cancelled (the session is then
            deleted), True in all other cases, including timeout.
        @raise exceptions.InternalError: the accepting entity bare JID doesn't
            match the destinee of the <jingle> element.
        """
        peer_jid = session["peer_jid"]
        if peer_jid.resource:
            return True

        try:
            infos = await self.host.memory.disco.get_infos(client, peer_jid)
        except error.StanzaError as e:
            if e.condition != "service-unavailable":
                raise
            categories = set()
        else:
            categories = {c for c, __ in infos.identities}
        if "component" in categories:
            # we don't use message initiation with components
            return True

        if peer_jid.userhostJID() not in client.roster:
            # if the contact is not in our roster, we need to send a directed presence
            # according to XEP-0353 §3.1
            await client.presence.available(peer_jid)

        mess_data = self.build_message_data(client, peer_jid, "propose", session["id"])
        message_elt = mess_data["xml"]
        for content_data in session["contents"].values():
            # we get the full element build by the application plugin
            jingle_description_elt = content_data["application_data"]["desc_elt"]

            # we need to copy the element
            if jingle_description_elt.uri == NS_JINGLE_RTP:
                # for RTP, we only keep the root <description> element, no children
                # the dicts are copied so the new element doesn't share mutable
                # state with the original one
                description_elt = domish.Element(
                    (jingle_description_elt.uri, jingle_description_elt.name),
                    defaultUri=jingle_description_elt.defaultUri,
                    attribs=dict(jingle_description_elt.attributes),
                    localPrefixes=dict(jingle_description_elt.localPrefixes),
                )
            else:
                # Otherwise we keep the children to have application useful data
                description_elt = element_copy(jingle_description_elt, with_parent=False)

            message_elt.propose.addChild(description_elt)
        response_d = defer.Deferred()
        # we wait for 2 min before cancelling the session init
        # FIXME: let's application decide timeout?
        response_d.addTimeout(2 * 60, reactor)
        client._xep_0353_pending_sessions[session["id"]] = response_d
        try:
            await client.send_message_data(mess_data)
            accepting_jid = await response_d
        except defer.TimeoutError:
            log.warning(
                _("Message initiation with {peer_jid} timed out").format(
                    peer_jid=peer_jid
                )
            )
        except exceptions.CancelError as e:
            for content in session["contents"].values():
                await content["application"].handler.jingle_preflight_cancel(
                    client, session, e
                )

            self._j.delete_session(client, session["id"])
            return False
        else:
            if iq_elt["to"] != accepting_jid.userhost():
                raise exceptions.InternalError(
                    f"<jingle> 'to' attribute ({iq_elt['to']!r}) must not differ "
                    f"from bare JID of the accepting entity ({accepting_jid!r}), this "
                    "may be a sign of an internal bug, a hack attempt, or a MITM attack!"
                )
            iq_elt["to"] = accepting_jid.full()
            session["peer_jid"] = accepting_jid
        finally:
            del client._xep_0353_pending_sessions[session["id"]]
        return True

    def _terminate_trigger(
        self, client: SatXMPPEntity, session: dict, reason_elt: domish.Element
    ) -> bool:
        """Retract our proposal if the terminated session is still pending

        @return: True if there is no pending message initiation for this session,
            False if a <retract> has been sent.
        """
        session_id = session["id"]
        try:
            response_d = client._xep_0353_pending_sessions[session_id]
        except KeyError:
            return True
        # we have a XEP-0353 session, that means that we are retracting a proposed session
        # NOTE(review): on responder side the pending entry is a list of preflight
        #   deferreds (see ``_handle_propose``), ``errback`` below would then fail —
        #   confirm this trigger can't be run in that case.
        mess_data = self.build_message_data(
            client, session["peer_jid"], "retract", session_id
        )
        defer.ensureDeferred(client.send_message_data(mess_data))
        response_d.errback(RetractException())
        return False

    async def _on_message_received(self, client, message_elt, post_treat):
        """Dispatch jingle-message elements found in a received message

        @return: False if a jingle-message element has been found (its handling is
            then started in background), True otherwise.
        """
        for elt in message_elt.elements():
            if elt.uri == NS_JINGLE_MESSAGE:
                # We use ensureDeferred to process the message initiation workflow in
                # parallel and to avoid blocking the message queue.
                defer.ensureDeferred(self._handle_mess_init(client, message_elt, elt))
                return False
        return True

    async def _handle_mess_init(
        self,
        client: SatXMPPEntity,
        message_elt: domish.Element,
        mess_init_elt: domish.Element,
    ) -> None:
        """Call the handler matching the name of the jingle-message element"""
        if mess_init_elt.name == "propose":
            await self._handle_propose(client, message_elt, mess_init_elt)
        elif mess_init_elt.name == "retract":
            self._handle_retract(client, message_elt, mess_init_elt)
        elif mess_init_elt.name == "proceed":
            self._handle_proceed(client, message_elt, mess_init_elt)
        elif mess_init_elt.name == "accept":
            self._handle_accept(client, message_elt, mess_init_elt)
        elif mess_init_elt.name == "reject":
            self._handle_reject(client, message_elt, mess_init_elt)
        elif mess_init_elt.name == "ringing":
            await self._handle_ringing(client, message_elt, mess_init_elt)
        else:
            log.warning(f"invalid element: {mess_init_elt.toXml()}")

    def _get_sid_and_session_d(
        self, client: SatXMPPEntity, elt: domish.Element
    ) -> tuple[str, defer.Deferred | list[defer.Deferred]]:
        """Retrieve session ID and deferred or list of deferred from response element

        @raise KeyError: the element has no "id" attribute, or there is no pending
            session with this ID.
        """
        try:
            session_id = elt["id"]
        except KeyError:
            # we log the whole message when available, the element itself otherwise
            invalid_elt = elt if elt.parent is None else elt.parent
            log.warning(
                f"invalid {elt.name} element in message_elt: {invalid_elt.toXml()}"
            )
            raise
        try:
            session_d = client._xep_0353_pending_sessions[session_id]
        except KeyError:
            log.warning(
                _(
                    "no pending session found with id {session_id}, did it timed out?"
                ).format(session_id=session_id)
            )
            raise
        return session_id, session_d

    def _get_sid_and_response_d(
        self, client: SatXMPPEntity, elt: domish.Element
    ) -> tuple[str, defer.Deferred]:
        """Retrieve session ID and response_d from response element"""
        session_id, response_d = self._get_sid_and_session_d(client, elt)
        assert isinstance(response_d, defer.Deferred)
        return session_id, response_d

    def _get_sid_and_preflight_d_list(
        self, client: SatXMPPEntity, elt: domish.Element
    ) -> tuple[str, list[defer.Deferred]]:
        """Retrieve session ID and list of preflight_d from response element"""
        session_id, preflight_d_list = self._get_sid_and_session_d(client, elt)
        assert isinstance(preflight_d_list, list)
        return session_id, preflight_d_list

    async def _handle_propose(
        self, client: SatXMPPEntity, message_elt: domish.Element, elt: domish.Element
    ) -> None:
        """Handle a session proposal

        A responder session is created, then the preflight of each proposed
        application is run, by descending application priority. A <proceed> is sent
        if all of them succeed; the session is deleted if one is cancelled.
        """
        peer_jid = jid.JID(message_elt["from"])
        local_jid = jid.JID(message_elt["to"])
        try:
            session_id = elt["id"]
        except KeyError:
            log.warning(f"Invalid propose element: {message_elt.toXml()}")
            return
        try:
            desc_and_apps = [
                (description_elt, self._j.get_application(description_elt.uri))
                for description_elt in elt.elements()
                if description_elt.name == "description"
            ]
            if not desc_and_apps:
                # a proposal without any <description> is handled as an invalid one
                raise AttributeError
        except AttributeError:
            log.warning(f"Invalid propose element: {message_elt.toXml()}")
            return
        except exceptions.NotFound:
            log.warning(
                f"There is not registered application to handle this "
                f"proposal: {elt.toXml()}"
            )
            return

        desc_and_apps = cast(
            list[tuple[domish.Element, ApplicationData]], desc_and_apps
        )
        desc_and_apps.sort(
            key=lambda desc_and_app: desc_and_app[1].priority, reverse=True
        )

        session = self._j.create_session(
            client, session_id, self._j.ROLE_RESPONDER, peer_jid, local_jid
        )

        is_in_roster = peer_jid.userhostJID() in client.roster
        if is_in_roster:
            # we indicate that device is ringing as explained in
            # https://xmpp.org/extensions/xep-0353.html#ring , but we only do that if user
            # is in roster to avoid presence leak of all our devices.
            mess_data = self.build_message_data(client, peer_jid, "ringing", session_id)
            await client.send_message_data(mess_data)

        try:
            for description_elt, application in desc_and_apps:
                try:
                    preflight_d = defer.ensureDeferred(
                        application.handler.jingle_preflight(
                            client, session, description_elt
                        )
                    )
                    client._xep_0353_pending_sessions.setdefault(session_id, []).append(
                        preflight_d
                    )
                    await preflight_d
                except TakenByOtherDeviceException as e:
                    log.info(f"The call has been takend by {e.device_jid}")
                    await application.handler.jingle_preflight_cancel(client, session, e)
                    self._j.delete_session(client, session_id)
                    return
                except exceptions.CancelError as e:
                    log.info(f"{client.profile} refused the session: {e}")

                    if is_in_roster:
                        # peer is in our roster, we send reject to them, ou other devices
                        # will get carbon copies
                        reject_dest_jid = peer_jid
                    else:
                        # peer is not in our roster, we send the "reject" only to our own
                        # devices to make them stop ringing/doing notification, and we
                        # don't send anything to peer to avoid presence leak.
                        reject_dest_jid = client.jid.userhostJID()

                    mess_data = self.build_message_data(
                        client, reject_dest_jid, "reject", session_id
                    )
                    await client.send_message_data(mess_data)
                    self._j.delete_session(client, session_id)
                    return
                except defer.CancelledError:
                    # raised when call is retracted before user can reply
                    self._j.delete_session(client, session_id)
                    return
        finally:
            # the entry may be missing if no preflight has been registered
            client._xep_0353_pending_sessions.pop(session_id, None)

        if peer_jid.userhostJID() not in client.roster:
            await client.presence.available(peer_jid)

        mess_data = self.build_message_data(client, peer_jid, "proceed", session_id)
        await client.send_message_data(mess_data)

    def _handle_retract(self, client, message_elt, retract_elt):
        """Handle the retraction of a proposal by its initiator

        The cancellable deferreds of the session are cancelled if there are any,
        otherwise the session is deleted.

        @return: always False
        """
        try:
            session = self._j.get_session(client, retract_elt["id"])
        except KeyError:
            log.warning(f"invalid retract element: {message_elt.toXml()}")
            return False
        except exceptions.NotFound:
            log.warning(f"no session found with ID {retract_elt['id']}")
            return False
        log.debug(
            f"{message_elt['from']} are retracting their proposal {retract_elt['id']}"
        )
        cancellable_deferred = session.get("cancellable_deferred")
        if cancellable_deferred:
            for d in cancellable_deferred:
                d.cancel()
        else:
            self._j.delete_session(client, session["id"])
        return False

    def _handle_proceed(
        self,
        client: SatXMPPEntity,
        message_elt: domish.Element,
        proceed_elt: domish.Element,
    ) -> None:
        """Handle a <proceed>, sent either by the peer or by one of our own devices

        If it comes from our own bare JID, the pending preflight deferreds fail with
        ``TakenByOtherDeviceException``. Otherwise the pending response deferred is
        fired with the JID of the sender.
        """
        from_jid = jid.JID(message_elt["from"])
        # session_d is the deferred of the session, it can be preflight_d or response_d
        if from_jid.userhostJID() == client.jid.userhostJID():
            # an other device took the session
            try:
                sid, preflight_d_list = self._get_sid_and_preflight_d_list(
                    client, proceed_elt
                )
            except KeyError:
                return
            for preflight_d in preflight_d_list:
                if not preflight_d.called:
                    preflight_d.errback(TakenByOtherDeviceException(from_jid))

            try:
                self._j.get_session(client, sid)
            except exceptions.NotFound:
                log.warning(f"No session found with sid {sid!r}.")
            else:
                # jingle_preflight_cancel?
                pass

            # FIXME: Is preflight cancel handler correctly? Check if preflight_d is always
            # cleaned correctly (use a timeout?)

        else:
            try:
                __, response_d = self._get_sid_and_response_d(client, proceed_elt)
            except KeyError:
                return
            # we have a response deferred
            response_d.callback(from_jid)

    def _handle_accept(self, client, message_elt, accept_elt):
        """Handle an <accept> element: nothing is done for now"""
        pass

    def _handle_reject(self, client, message_elt, reject_elt):
        """Handle a <reject>: make the pending response deferred fail

        The reason defaults to "busy" when none is parsed from the element.

        @return: True if there is no pending session for this element, False if the
            response deferred has been failed with ``RejectException``.
        """
        try:
            __, response_d = self._get_sid_and_response_d(client, reject_elt)
        except KeyError:
            return True
        reason_elt = self._j.get_reason_elt(reject_elt)
        reason, text = self._j.parse_reason_elt(reason_elt)
        if reason is None:
            reason = "busy"

        response_d.errback(RejectException(reason, text))
        return False

    async def _handle_ringing(self, client, message_elt, ringing_elt):
        """Handle a <ringing>: notify the application handler of each content

        @return: always False
        """
        try:
            session_id = ringing_elt["id"]
        except KeyError:
            log.warning(f"invalid ringing element: {message_elt.toXml()}")
            return False
        try:
            session = self._j.get_session(client, session_id)
        except exceptions.NotFound:
            log.warning(f"Session {session_id!r} unknown, ignoring ringing.")
            return False
        for content_data in session["contents"].values():
            await content_data["application"].handler.jingle_preflight_info(
                client, session, "ringing", None
            )

        return False


@implementer(iwokkel.IDisco)
class Handler(xmlstream.XMPPHandler):
    """Disco handler advertising the jingle-message namespace"""

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        return [disco.DiscoFeature(NS_JINGLE_MESSAGE)]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        return []