Mercurial > libervia-backend
view sat/plugins/plugin_xep_0176.py @ 4064:08ee0e623e7e
plugin XEP-0338: "Jingle Grouping Framework" implementation:
rel 440
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 30 May 2023 17:59:20 +0200 |
parents | 4c8bf67bfbeb |
children |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia plugin for Jingle (XEP-0176) # Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from typing import Dict, List, Optional import uuid from twisted.internet import defer from twisted.words.protocols.jabber.xmlstream import XMPPHandler from twisted.words.xish import domish from wokkel import disco, iwokkel from zope.interface import implementer from sat.core import exceptions from sat.core.constants import Const as C from sat.core.core_types import SatXMPPEntity from sat.core.i18n import _ from sat.core.log import getLogger from sat.tools.common import data_format from .plugin_xep_0166 import BaseTransportHandler log = getLogger(__name__) NS_JINGLE_ICE_UDP= "urn:xmpp:jingle:transports:ice-udp:1" PLUGIN_INFO = { C.PI_NAME: "Jingle ICE-UDP Transport Method", C.PI_IMPORT_NAME: "XEP-0176", C.PI_TYPE: "XEP", C.PI_MODES: C.PLUG_MODE_BOTH, C.PI_PROTOCOLS: ["XEP-0176"], C.PI_DEPENDENCIES: ["XEP-0166"], C.PI_RECOMMENDATIONS: [], C.PI_MAIN: "XEP_0176", C.PI_HANDLER: "yes", C.PI_DESCRIPTION: _("""Implementation of Jingle ICE-UDP transport"""), } class XEP_0176(BaseTransportHandler): def __init__(self, host): log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization") self.host = host self._j = host.plugins["XEP-0166"] # shortcut to access jingle self._j.register_transport( NS_JINGLE_ICE_UDP, self._j.TRANSPORT_DATAGRAM, self, 100 ) host.bridge.add_method( "ice_candidates_add", ".plugin", in_sign="sss", out_sign="", method=self._ice_candidates_add, async_=True, ) host.bridge.add_signal( "ice_candidates_new", ".plugin", signature="sss" ) # args: jingle_sid, candidates_serialised, profile host.bridge.add_signal( "ice_restart", ".plugin", signature="sss" ) # args: jingle_sid, side ("local" or "peer"), profile def get_handler(self, client): return XEP_0176_handler() def _ice_candidates_add( self, session_id: str, media_ice_data_s: str, profile_key: str, ): client = self.host.get_client(profile_key) return defer.ensureDeferred(self.ice_candidates_add( client, session_id, data_format.deserialise(media_ice_data_s), )) def build_transport(self, ice_data: dict) -> domish.Element: """Generate <transport> element from ICE data @param ice_data: a dict containing the following keys: - "ufrag" (str): The ICE username fragment. - "pwd" (str): The ICE password. - "candidates" (List[dict]): A list of ICE candidate dictionaries, each containing: - "component_id" (int): The component ID. - "foundation" (str): The candidate foundation. - "address" (str): The candidate IP address. - "port" (int): The candidate port. - "priority" (int): The candidate priority. - "transport" (str): The candidate transport protocol, e.g., "udp". - "type" (str): The candidate type, e.g., "host", "srflx", "prflx", or "relay". - "generation" (str, optional): The candidate generation. Defaults to "0". - "network" (str, optional): The candidate network. Defaults to "0". - "rel_addr" (str, optional): The related address for the candidate, if any. - "rel_port" (int, optional): The related port for the candidate, if any. @return: A <transport> element. """ try: ufrag: str = ice_data["ufrag"] pwd: str = ice_data["pwd"] candidates: List[dict] = ice_data["candidates"] except KeyError as e: raise exceptions.DataError(f"ICE {e} must be provided") candidates.sort(key=lambda c: int(c.get("priority", 0)), reverse=True) transport_elt = domish.Element( (NS_JINGLE_ICE_UDP, "transport"), attribs={"ufrag": ufrag, "pwd": pwd} ) for candidate in candidates: try: candidate_elt = transport_elt.addElement("candidate") candidate_elt["component"] = str(candidate["component_id"]) candidate_elt["foundation"] = candidate["foundation"] candidate_elt["generation"] = str(candidate.get("generation", "0")) candidate_elt["id"] = candidate.get("id") or str(uuid.uuid4()) candidate_elt["ip"] = candidate["address"] candidate_elt["network"] = str(candidate.get("network", "0")) candidate_elt["port"] = str(candidate["port"]) candidate_elt["priority"] = str(candidate["priority"]) candidate_elt["protocol"] = candidate["transport"] candidate_elt["type"] = candidate["type"] except KeyError as e: raise exceptions.DataError( f"Mandatory ICE candidate attribute {e} is missing" ) if "rel_addr" in candidate and "rel_port" in candidate: candidate_elt["rel-addr"] = candidate["rel_addr"] candidate_elt["rel-port"] = str(candidate["rel_port"]) self.host.trigger.point("XEP-0176_build_transport", transport_elt, ice_data) return transport_elt def parse_transport(self, transport_elt: domish.Element) -> dict: """Parse <transport> to a dict @param transport_elt: <transport> element @return: ICE data (as in [build_transport]) """ try: ice_data = { "ufrag": transport_elt["ufrag"], "pwd": transport_elt["pwd"] } except KeyError as e: raise exceptions.DataError( f"<transport> is missing mandatory attribute {e}: {transport_elt.toXml()}" ) ice_data["candidates"] = ice_candidates = [] for candidate_elt in transport_elt.elements(NS_JINGLE_ICE_UDP, "candidate"): try: candidate = { "component_id": int(candidate_elt["component"]), "foundation": candidate_elt["foundation"], "address": candidate_elt["ip"], "port": int(candidate_elt["port"]), "priority": int(candidate_elt["priority"]), "transport": candidate_elt["protocol"], "type": candidate_elt["type"], } except KeyError as e: raise exceptions.DataError( f"Mandatory attribute {e} is missing in candidate element" ) if candidate_elt.hasAttribute("generation"): candidate["generation"] = candidate_elt["generation"] if candidate_elt.hasAttribute("network"): candidate["network"] = candidate_elt["network"] if candidate_elt.hasAttribute("rel-addr"): candidate["rel_addr"] = candidate_elt["rel-addr"] if candidate_elt.hasAttribute("rel-port"): candidate["rel_port"] = int(candidate_elt["rel-port"]) ice_candidates.append(candidate) self.host.trigger.point("XEP-0176_parse_transport", transport_elt, ice_data) return ice_data async def jingle_session_init( self, client: SatXMPPEntity, session: dict, content_name: str, ) -> domish.Element: """Create a Jingle session initiation transport element with ICE candidates. @param client: SatXMPPEntity object representing the client. @param session: Dictionary containing session data. @param content_name: Name of the content. @param ufrag: ICE username fragment. @param pwd: ICE password. @param candidates: List of ICE candidate dictionaries parsed from the parse_ice_candidate method. @return: domish.Element representing the Jingle transport element. @raise exceptions.DataError: If mandatory data is missing from the candidates. """ content_data = session["contents"][content_name] transport_data = content_data["transport_data"] ice_data = transport_data["local_ice_data"] return self.build_transport(ice_data) async def jingle_handler( self, client: SatXMPPEntity, action: str, session: dict, content_name: str, transport_elt: domish.Element, ) -> domish.Element: """Handle Jingle requests @param client: The SatXMPPEntity instance. @param action: The action to be performed with the session. @param session: A dictionary containing the session information. @param content_name: The name of the content. @param transport_elt: The domish.Element instance representing the transport element. @return: <transport> element """ content_data = session["contents"][content_name] transport_data = content_data["transport_data"] if action in (self._j.A_PREPARE_CONFIRMATION, self._j.A_PREPARE_INITIATOR): peer_ice_data = self.parse_transport(transport_elt) transport_data["peer_ice_data"] = peer_ice_data elif action in (self._j.A_ACCEPTED_ACK, self._j.A_PREPARE_RESPONDER): pass elif action == self._j.A_SESSION_ACCEPT: pass elif action == self._j.A_START: pass elif action == self._j.A_SESSION_INITIATE: # responder side, we give our candidates transport_elt = self.build_transport(transport_data["local_ice_data"]) elif action == self._j.A_TRANSPORT_INFO: media_type = content_data["application_data"].get("media") new_ice_data = self.parse_transport(transport_elt) restart = self.update_candidates(transport_data, new_ice_data, local=False) if restart: log.debug( f"Peer ICE restart detected on session {session['id']} " f"[{client.profile}]" ) self.host.bridge.ice_restart(session["id"], "peer", client.profile) self.host.bridge.ice_candidates_new( session["id"], data_format.serialise({media_type: new_ice_data}), client.profile ) elif action == self._j.A_DESTROY: pass else: log.warning("FIXME: unmanaged action {}".format(action)) return transport_elt def jingle_terminate( self, client: SatXMPPEntity, action: str, session: dict, content_name: str, reason_elt: domish.Element, ) -> None: log.debug("ICE-UDP session terminated") def update_candidates( self, transport_data: dict, new_ice_data: dict, local: bool ) -> bool: """Update ICE candidates when new one are received @param transport_data: transport_data of the content linked to the candidates @param new_ice_data: new ICE data, in the same format as returned by [self.parse_transport] @param local: True if it's our candidates, False if it's peer ones @return: True if there is a ICE restart """ key = "local_ice_data" if local else "peer_ice_data" try: ice_data = transport_data[key] except KeyError: log.warning( f"no {key} available" ) transport_data[key] = new_ice_data else: if ( new_ice_data["ufrag"] != ice_data["ufrag"] or new_ice_data["pwd"] != ice_data["pwd"] ): ice_data["ufrag"] = new_ice_data["ufrag"] ice_data["pwd"] = new_ice_data["pwd"] ice_data["candidates"] = new_ice_data["candidates"] return True return False async def ice_candidates_add( self, client: SatXMPPEntity, session_id: str, media_ice_data: Dict[str, dict] ) -> None: """Called when a new ICE candidates are available for a session @param session_id: Session ID @param candidates: a map from media type (audio, video) to ICE data ICE data must be in the same format as in [self.parse_transport] """ session = self._j.get_session(client, session_id) iq_elt: Optional[domish.Element] = None for media_type, new_ice_data in media_ice_data.items(): for content_name, content_data in session["contents"].items(): if content_data["application_data"].get("media") == media_type: break else: log.warning( "no media of type {media_type} has been found" ) continue restart = self.update_candidates( content_data["transport_data"], new_ice_data, True ) if restart: log.debug( f"Local ICE restart detected on session {session['id']} " f"[{client.profile}]" ) self.host.bridge.ice_restart(session["id"], "local", client.profile) transport_elt = self.build_transport(new_ice_data) iq_elt, __ = self._j.build_action( client, self._j.A_TRANSPORT_INFO, session, content_name, iq_elt=iq_elt, transport_elt=transport_elt ) if iq_elt is not None: try: await iq_elt.send() except Exception as e: log.warning(f"Could not send new ICE candidates: {e}") else: log.error("Could not find any content to apply new ICE candidates") @implementer(iwokkel.IDisco) class XEP_0176_handler(XMPPHandler): def getDiscoInfo(self, requestor, target, nodeIdentifier=""): return [disco.DiscoFeature(NS_JINGLE_ICE_UDP)] def getDiscoItems(self, requestor, target, nodeIdentifier=""): return []