Mercurial > libervia-backend
view libervia/backend/plugins/plugin_xep_0353.py @ 4234:67de9ed101aa
docker (e2e): add GStreamer dependencies to test WebRTC stack:
rel 424
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 06 Apr 2024 15:04:01 +0200 |
parents | e11b13418ba6 |
children | 79c8a70e1813 |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia plugin for Jingle Message Initiation (XEP-0353) # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from twisted.internet import defer from twisted.internet import reactor from twisted.words.protocols.jabber import error, jid from twisted.words.protocols.jabber import xmlstream from twisted.words.xish import domish from wokkel import disco, iwokkel from zope.interface import implementer from libervia.backend.core import exceptions from libervia.backend.core.constants import Const as C from libervia.backend.core.core_types import SatXMPPEntity from libervia.backend.core.i18n import D_, _ from libervia.backend.core.log import getLogger from libervia.backend.tools.xml_tools import element_copy try: from .plugin_xep_0167 import NS_JINGLE_RTP except ImportError: NS_JINGLE_RTP = None log = getLogger(__name__) NS_JINGLE_MESSAGE = "urn:xmpp:jingle-message:0" PLUGIN_INFO = { C.PI_NAME: "Jingle Message Initiation", C.PI_IMPORT_NAME: "XEP-0353", C.PI_TYPE: "XEP", C.PI_MODES: C.PLUG_MODE_BOTH, C.PI_PROTOCOLS: ["XEP-0353"], C.PI_DEPENDENCIES: ["XEP-0166", "XEP-0334"], C.PI_MAIN: "XEP_0353", C.PI_HANDLER: "yes", C.PI_DESCRIPTION: _("""Implementation of Jingle Message Initiation"""), } class RejectException(exceptions.CancelError): def __init__(self, reason: str, text: str|None = None): super().__init__(text) self.reason = reason class TakenByOtherDeviceException(exceptions.CancelError): reason: str = "taken_by_other_device" def __init__(self, device_jid: jid.JID): super().__init__(device_jid.full()) self.device_jid = device_jid class RetractException(exceptions.CancelError): pass class XEP_0353: def __init__(self, host): log.info(_("plugin {name} initialization").format(name=PLUGIN_INFO[C.PI_NAME])) self.host = host host.register_namespace("jingle-message", NS_JINGLE_MESSAGE) self._j = host.plugins["XEP-0166"] self._h = host.plugins["XEP-0334"] host.trigger.add_with_check( "XEP-0166_initiate_elt_built", self, self._on_initiate_trigger, # this plugin set the resource, we want it to happen first so other triggers # can get the full peer JID priority=host.trigger.MAX_PRIORITY, ) host.trigger.add_with_check( "XEP-0166_terminate", self, self._terminate_trigger, priority=host.trigger.MAX_PRIORITY, ) host.trigger.add("message_received", self._on_message_received) def get_handler(self, client): return Handler() def profile_connecting(self, client): # mapping from session id to deferred used to wait for destinee answer client._xep_0353_pending_sessions = {} def build_message_data(self, client, peer_jid, verb, session_id): mess_data = { "from": client.jid, "to": peer_jid, "uid": "", "message": {}, "type": C.MESS_TYPE_CHAT, "subject": {}, "extra": {}, } client.generate_message_xml(mess_data) message_elt = mess_data["xml"] verb_elt = message_elt.addElement((NS_JINGLE_MESSAGE, verb)) verb_elt["id"] = session_id self._h.add_hint_elements(message_elt, [self._h.HINT_STORE]) return mess_data async def _on_initiate_trigger( self, client: SatXMPPEntity, session: dict, iq_elt: domish.Element, jingle_elt: domish.Element, ) -> bool: peer_jid = session["peer_jid"] if peer_jid.resource: return True try: infos = await self.host.memory.disco.get_infos(client, peer_jid) except error.StanzaError as e: if e.condition == "service-unavailable": categories = {} else: raise e else: categories = {c for c, __ in infos.identities} if "component" in categories: # we don't use message initiation with components return True if peer_jid.userhostJID() not in client.roster: # if the contact is not in our roster, we need to send a directed presence # according to XEP-0353 §3.1 await client.presence.available(peer_jid) mess_data = self.build_message_data(client, peer_jid, "propose", session["id"]) message_elt = mess_data["xml"] for content_data in session["contents"].values(): # we get the full element build by the application plugin jingle_description_elt = content_data["application_data"]["desc_elt"] # we need to copy the element if jingle_description_elt.uri == NS_JINGLE_RTP: # for RTP, we only keep the root <description> element, no children description_elt = domish.Element( (jingle_description_elt.uri, jingle_description_elt.name), defaultUri=jingle_description_elt.defaultUri, attribs=jingle_description_elt.attributes, localPrefixes=jingle_description_elt.localPrefixes, ) else: # Otherwise we keep the children to have application useful data description_elt = element_copy(jingle_description_elt, with_parent=False) message_elt.propose.addChild(description_elt) response_d = defer.Deferred() # we wait for 2 min before cancelling the session init # FIXME: let's application decide timeout? response_d.addTimeout(2 * 60, reactor) client._xep_0353_pending_sessions[session["id"]] = response_d try: await client.send_message_data(mess_data) accepting_jid = await response_d except defer.TimeoutError: log.warning( _("Message initiation with {peer_jid} timed out").format( peer_jid=peer_jid ) ) except exceptions.CancelError as e: for content in session["contents"].values(): await content["application"].handler.jingle_preflight_cancel( client, session, e ) self._j.delete_session(client, session["id"]) return False else: if iq_elt["to"] != accepting_jid.userhost(): raise exceptions.InternalError( f"<jingle> 'to' attribute ({iq_elt['to']!r}) must not differ " f"from bare JID of the accepting entity ({accepting_jid!r}), this " "may be a sign of an internal bug, a hack attempt, or a MITM attack!" ) iq_elt["to"] = accepting_jid.full() session["peer_jid"] = accepting_jid finally: del client._xep_0353_pending_sessions[session["id"]] return True def _terminate_trigger( self, client: SatXMPPEntity, session: dict, reason_elt: domish.Element ) -> bool: session_id = session["id"] try: response_d = client._xep_0353_pending_sessions[session_id] except KeyError: return True # we have a XEP-0353 session, that means that we are retracting a proposed session mess_data = self.build_message_data( client, session["peer_jid"], "retract", session_id ) defer.ensureDeferred(client.send_message_data(mess_data)) response_d.errback(RetractException()) return False async def _on_message_received(self, client, message_elt, post_treat): for elt in message_elt.elements(): if elt.uri == NS_JINGLE_MESSAGE: # We use ensureDeferred to process the message initiation workflow in # parallel and to avoid blocking the message queue. defer.ensureDeferred(self._handle_mess_init(client, message_elt, elt)) return False return True async def _handle_mess_init( self, client: SatXMPPEntity, message_elt: domish.Element, mess_init_elt: domish.Element ) -> None: if mess_init_elt.name == "propose": await self._handle_propose(client, message_elt, mess_init_elt) elif mess_init_elt.name == "retract": self._handle_retract(client, message_elt, mess_init_elt) elif mess_init_elt.name == "proceed": self._handle_proceed(client, message_elt, mess_init_elt) elif mess_init_elt.name == "accept": self._handle_accept(client, message_elt, mess_init_elt) elif mess_init_elt.name == "reject": self._handle_reject(client, message_elt, mess_init_elt) elif mess_init_elt.name == "ringing": await self._handle_ringing(client, message_elt, mess_init_elt) else: log.warning(f"invalid element: {mess_init_elt.toXml}") def _get_sid_and_session_d( self, client: SatXMPPEntity, elt: domish.Element ) -> tuple[str, defer.Deferred|list[defer.Deferred]]: """Retrieve session ID and deferred or list of deferred from response element""" try: session_id = elt["id"] except KeyError as e: assert elt.parent is not None log.warning(f"invalid proceed element in message_elt: {elt.parent.toXml()}") raise e try: session_d = client._xep_0353_pending_sessions[session_id] except KeyError as e: log.warning( _( "no pending session found with id {session_id}, did it timed out?" ).format(session_id=session_id) ) raise e return session_id, session_d def _get_sid_and_response_d( self, client: SatXMPPEntity, elt: domish.Element ) -> tuple[str, defer.Deferred]: """Retrieve session ID and response_d from response element""" session_id, response_d = self._get_sid_and_session_d(client, elt) assert isinstance(response_d, defer.Deferred) return session_id, response_d def _get_sid_and_preflight_d_list( self, client: SatXMPPEntity, elt: domish.Element ) -> tuple[str, list[defer.Deferred]]: """Retrieve session ID and list of preflight_d from response element""" session_id, preflight_d_list = self._get_sid_and_session_d(client, elt) assert isinstance(preflight_d_list, list) return session_id, preflight_d_list async def _handle_propose( self, client: SatXMPPEntity, message_elt: domish.Element, elt: domish.Element ) -> None: peer_jid = jid.JID(message_elt["from"]) local_jid = jid.JID(message_elt["to"]) session_id = elt["id"] try: desc_and_apps = [ (description_elt, self._j.get_application(description_elt.uri)) for description_elt in elt.elements() if description_elt.name == "description" ] if not desc_and_apps: raise AttributeError except AttributeError: log.warning(f"Invalid propose element: {message_elt.toXml()}") return except exceptions.NotFound: log.warning( f"There is not registered application to handle this " f"proposal: {elt.toXml()}" ) return if not desc_and_apps: log.warning("No application specified: {message_elt.toXml()}") return session = self._j.create_session( client, session_id, self._j.ROLE_RESPONDER, peer_jid, local_jid ) is_in_roster = peer_jid.userhostJID() in client.roster if is_in_roster: # we indicate that device is ringing as explained in # https://xmpp.org/extensions/xep-0353.html#ring , but we only do that if user # is in roster to avoid presence leak of all our devices. mess_data = self.build_message_data(client, peer_jid, "ringing", session_id) await client.send_message_data(mess_data) try: for description_elt, application in desc_and_apps: try: preflight_d = defer.ensureDeferred( application.handler.jingle_preflight( client, session, description_elt ) ) client._xep_0353_pending_sessions.setdefault(session_id, []).append( preflight_d ) await preflight_d except TakenByOtherDeviceException as e: log.info(f"The call has been takend by {e.device_jid}") await application.handler.jingle_preflight_cancel(client, session, e) self._j.delete_session(client, session_id) return except exceptions.CancelError as e: log.info(f"{client.profile} refused the session: {e}") if is_in_roster: # peer is in our roster, we send reject to them, ou other devices # will get carbon copies reject_dest_jid = peer_jid else: # peer is not in our roster, we send the "reject" only to our own # devices to make them stop ringing/doing notification, and we # don't send anything to peer to avoid presence leak. reject_dest_jid = client.jid.userhostJID() mess_data = self.build_message_data( client, reject_dest_jid, "reject", session_id ) await client.send_message_data(mess_data) self._j.delete_session(client, session_id) return except defer.CancelledError: # raised when call is retracted before user can reply self._j.delete_session(client, session_id) return finally: try: del client._xep_0353_pending_sessions[session_id] except KeyError: pass if peer_jid.userhostJID() not in client.roster: await client.presence.available(peer_jid) mess_data = self.build_message_data(client, peer_jid, "proceed", session_id) await client.send_message_data(mess_data) def _handle_retract(self, client, message_elt, retract_elt): try: session = self._j.get_session(client, retract_elt["id"]) except KeyError: log.warning(f"invalid retract element: {message_elt.toXml()}") return False except exceptions.NotFound: log.warning(f"no session found with ID {retract_elt['id']}") return False log.debug( f"{message_elt['from']} are retracting their proposal {retract_elt['id']}" ) try: cancellable_deferred = session["cancellable_deferred"] if not cancellable_deferred: raise KeyError except KeyError: self._j.delete_session(client, session["id"]) else: for d in cancellable_deferred: d.cancel() return False def _handle_proceed( self, client: SatXMPPEntity, message_elt: domish.Element, proceed_elt: domish.Element ) -> None: from_jid = jid.JID(message_elt["from"]) # session_d is the deferred of the session, it can be preflight_d or response_d if from_jid.userhostJID() == client.jid.userhostJID(): # an other device took the session try: sid, preflight_d_list = self._get_sid_and_preflight_d_list( client, proceed_elt ) except KeyError: return for preflight_d in preflight_d_list: if not preflight_d.called: preflight_d.errback(TakenByOtherDeviceException(from_jid)) try: session = self._j.get_session(client, sid) except exceptions.NotFound: log.warning("No session found with sid {sid!r}.") else: # jingle_preflight_cancel? pass # FIXME: Is preflight cancel handler correctly? Check if preflight_d is always # cleaned correctly (use a timeout?) else: try: __, response_d = self._get_sid_and_response_d(client, proceed_elt) except KeyError: return # we have a response deferred response_d.callback(jid.JID(message_elt["from"])) def _handle_accept(self, client, message_elt, accept_elt): pass def _handle_reject(self, client, message_elt, reject_elt): try: __, response_d = self._get_sid_and_response_d(client, reject_elt) except KeyError: return True reason_elt = self._j.get_reason_elt(reject_elt) reason, text = self._j.parse_reason_elt(reason_elt) if reason is None: reason = "busy" response_d.errback(RejectException(reason, text)) return False async def _handle_ringing(self, client, message_elt, ringing_elt): session_id = ringing_elt["id"] try: session = self._j.get_session(client, session_id) except exceptions.NotFound: log.warning(f"Session {session_id!r} unknown, ignoring ringing.") return False for __, content_data in session["contents"].items(): await content_data["application"].handler.jingle_preflight_info( client, session, "ringing", None ) return False @implementer(iwokkel.IDisco) class Handler(xmlstream.XMPPHandler): def getDiscoInfo(self, requestor, target, nodeIdentifier=""): return [disco.DiscoFeature(NS_JINGLE_MESSAGE)] def getDiscoItems(self, requestor, target, nodeIdentifier=""): return []