Mercurial > libervia-backend
changeset 4045:ae756bf7c3e8
plugin XEP-0176: Jingle ICE-UDP Transport Method implementation:
rel 419
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 15 May 2023 16:23:36 +0200 (19 months ago) |
parents | 3900626bc100 |
children | 0e3ce379aae3 |
files | sat/plugins/plugin_xep_0176.py |
diffstat | 1 files changed, 385 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat/plugins/plugin_xep_0176.py Mon May 15 16:23:36 2023 +0200 @@ -0,0 +1,385 @@ +#!/usr/bin/env python3 + +# Libervia plugin for Jingle (XEP-0176) +# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from typing import Dict, List, Optional +import uuid + +from twisted.internet import defer +from twisted.words.protocols.jabber.xmlstream import XMPPHandler +from twisted.words.xish import domish +from wokkel import disco, iwokkel +from zope.interface import implementer + +from sat.core import exceptions +from sat.core.constants import Const as C +from sat.core.core_types import SatXMPPEntity +from sat.core.i18n import _ +from sat.core.log import getLogger +from sat.tools.common import data_format + +from .plugin_xep_0166 import BaseTransportHandler + +log = getLogger(__name__) + +NS_JINGLE_ICE_UDP= "urn:xmpp:jingle:transports:ice-udp:1" + +PLUGIN_INFO = { + C.PI_NAME: "Jingle ICE-UDP Transport Method", + C.PI_IMPORT_NAME: "XEP-0176", + C.PI_TYPE: "XEP", + C.PI_MODES: C.PLUG_MODE_BOTH, + C.PI_PROTOCOLS: ["XEP-0176"], + C.PI_DEPENDENCIES: ["XEP-0166"], + C.PI_RECOMMENDATIONS: [], + C.PI_MAIN: "XEP_0176", + C.PI_HANDLER: "yes", + C.PI_DESCRIPTION: _("""Implementation of Jingle ICE-UDP transport"""), +} + + +class XEP_0176(BaseTransportHandler): + + def __init__(self, host): + log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization") + self.host = host + self._j = host.plugins["XEP-0166"] # shortcut to access jingle + self._j.register_transport( + NS_JINGLE_ICE_UDP, self._j.TRANSPORT_DATAGRAM, self, 100 + ) + host.bridge.add_method( + "ice_candidates_add", + ".plugin", + in_sign="sss", + out_sign="", + method=self._ice_candidates_add, + async_=True, + ) + host.bridge.add_signal( + "ice_candidates_new", ".plugin", signature="sss" + ) # args: jingle_sid, candidates_serialised, profile + host.bridge.add_signal( + "ice_restart", ".plugin", signature="sss" + ) # args: jingle_sid, side ("local" or "peer"), profile + + def get_handler(self, client): + return XEP_0176_handler() + + def _ice_candidates_add( + self, + session_id: str, + media_ice_data_s: str, + profile_key: str, + ): + client = self.host.get_client(profile_key) + return defer.ensureDeferred(self.ice_candidates_add( + client, + session_id, + data_format.deserialise(media_ice_data_s), + )) + + def build_transport(self, ice_data: dict) -> domish.Element: + """Generate <transport> element from ICE data + + @param ice_data: a dict containing the following keys: + - "ufrag" (str): The ICE username fragment. + - "pwd" (str): The ICE password. + - "candidates" (List[dict]): A list of ICE candidate dictionaries, each + containing: + - "component_id" (int): The component ID. + - "foundation" (str): The candidate foundation. + - "address" (str): The candidate IP address. + - "port" (int): The candidate port. + - "priority" (int): The candidate priority. + - "transport" (str): The candidate transport protocol, e.g., "udp". + - "type" (str): The candidate type, e.g., "host", "srflx", "prflx", or + "relay". + - "generation" (str, optional): The candidate generation. Defaults to "0". + - "network" (str, optional): The candidate network. Defaults to "0". + - "rel_addr" (str, optional): The related address for the candidate, if + any. + - "rel_port" (int, optional): The related port for the candidate, if any. + + @return: A <transport> element. + """ + try: + ufrag: str = ice_data["ufrag"] + pwd: str = ice_data["pwd"] + candidates: List[dict] = ice_data["candidates"] + except KeyError as e: + raise exceptions.DataError(f"ICE {e} must be provided") + + candidates.sort(key=lambda c: int(c.get("priority", 0)), reverse=True) + transport_elt = domish.Element( + (NS_JINGLE_ICE_UDP, "transport"), + attribs={"ufrag": ufrag, "pwd": pwd} + ) + + for candidate in candidates: + try: + candidate_elt = transport_elt.addElement("candidate") + candidate_elt["component"] = str(candidate["component_id"]) + candidate_elt["foundation"] = candidate["foundation"] + candidate_elt["generation"] = str(candidate.get("generation", "0")) + candidate_elt["id"] = candidate.get("id") or str(uuid.uuid4()) + candidate_elt["ip"] = candidate["address"] + candidate_elt["network"] = str(candidate.get("network", "0")) + candidate_elt["port"] = str(candidate["port"]) + candidate_elt["priority"] = str(candidate["priority"]) + candidate_elt["protocol"] = candidate["transport"] + candidate_elt["type"] = candidate["type"] + except KeyError as e: + raise exceptions.DataError( + f"Mandatory ICE candidate attribute {e} is missing" + ) + + if "rel_addr" in candidate and "rel_port" in candidate: + candidate_elt["rel-addr"] = candidate["rel_addr"] + candidate_elt["rel-port"] = str(candidate["rel_port"]) + + self.host.trigger.point("XEP-0176_build_transport", transport_elt, ice_data) + + return transport_elt + + def parse_transport(self, transport_elt: domish.Element) -> dict: + """Parse <transport> to a dict + + @param transport_elt: <transport> element + @return: ICE data (as in [build_description]) + """ + try: + ice_data = { + "ufrag": transport_elt["ufrag"], + "pwd": transport_elt["pwd"] + } + except KeyError as e: + raise exceptions.DataError( + f"<transport> is missing mandatory attribute {e}: {transport_elt.toXml()}" + ) + ice_data["candidates"] = ice_candidates = [] + + for candidate_elt in transport_elt.elements(NS_JINGLE_ICE_UDP, "candidate"): + try: + candidate = { + "component_id": int(candidate_elt["component"]), + "foundation": candidate_elt["foundation"], + "address": candidate_elt["ip"], + "port": int(candidate_elt["port"]), + "priority": int(candidate_elt["priority"]), + "transport": candidate_elt["protocol"], + "type": candidate_elt["type"], + } + except KeyError as e: + raise exceptions.DataError( + f"Mandatory attribute {e} is missing in candidate element" + ) + + if candidate_elt.hasAttribute("generation"): + candidate["generation"] = candidate_elt["generation"] + + if candidate_elt.hasAttribute("network"): + candidate["network"] = candidate_elt["network"] + + if candidate_elt.hasAttribute("rel-addr"): + candidate["rel_addr"] = candidate_elt["rel-addr"] + + if candidate_elt.hasAttribute("rel-port"): + candidate["rel_port"] = int(candidate_elt["rel-port"]) + + ice_candidates.append(candidate) + + self.host.trigger.point("XEP-0176_parse_transport", transport_elt, ice_data) + + return ice_data + + async def jingle_session_init( + self, + client: SatXMPPEntity, + session: dict, + content_name: str, + ) -> domish.Element: + """Create a Jingle session initiation transport element with ICE candidates. + + @param client: SatXMPPEntity object representing the client. + @param session: Dictionary containing session data. + @param content_name: Name of the content. + @param ufrag: ICE username fragment. + @param pwd: ICE password. + @param candidates: List of ICE candidate dictionaries parsed from the + parse_ice_candidate method. + + @return: domish.Element representing the Jingle transport element. + + @raise exceptions.DataError: If mandatory data is missing from the candidates. + """ + content_data = session["contents"][content_name] + transport_data = content_data["transport_data"] + ice_data = transport_data["local_ice_data"] + return self.build_transport(ice_data) + + async def jingle_handler( + self, + client: SatXMPPEntity, + action: str, + session: dict, + content_name: str, + transport_elt: domish.Element, + ) -> domish.Element: + """Handle Jingle requests + + @param client: The SatXMPPEntity instance. + @param action: The action to be performed with the session. + @param session: A dictionary containing the session information. + @param content_name: The name of the content. + @param transport_elt: The domish.Element instance representing the transport + element. + + @return: <transport> element + """ + content_data = session["contents"][content_name] + transport_data = content_data["transport_data"] + if action in (self._j.A_PREPARE_CONFIRMATION, self._j.A_PREPARE_INITIATOR): + peer_ice_data = self.parse_transport(transport_elt) + transport_data["peer_ice_data"] = peer_ice_data + + elif action in (self._j.A_ACCEPTED_ACK, self._j.A_PREPARE_RESPONDER): + pass + + elif action == self._j.A_SESSION_ACCEPT: + pass + + elif action == self._j.A_START: + pass + + elif action == self._j.A_SESSION_INITIATE: + # responder side, we give our candidates + transport_elt = self.build_transport(transport_data["local_ice_data"]) + elif action == self._j.A_TRANSPORT_INFO: + new_ice_data = self.parse_transport(transport_elt) + restart = self.update_candidates(transport_data, new_ice_data, local=False) + if restart: + log.debug( + f"Peer ICE restart detected on session {session['id']} " + f"[{client.profile}]" + ) + self.host.bridge.ice_restart(session["id"], "peer", client.profile) + + self.host.bridge.ice_candidates_new( + session["id"], + data_format.serialise(new_ice_data["candidates"]), + client.profile + ) + elif action == self._j.A_DESTROY: + pass + else: + log.warning("FIXME: unmanaged action {}".format(action)) + + return transport_elt + + def jingle_terminate(self, client, action, session, content_name, reason_elt): + log.debug("ICE-UDP session terminated") + + def update_candidates( + self, + transport_data: dict, + new_ice_data: dict, + local: bool + ) -> bool: + """Update ICE candidates when new one are received + + @param transport_data: transport_data of the content linked to the candidates + @param new_ice_data: new ICE data, in the same format as returned + by [self.parse_transport] + @param local: True if it's our candidates, False if it's peer ones + @return: True if there is a ICE restart + """ + key = "local_ice_data" if local else "peer_ice_data" + try: + ice_data = transport_data[key] + except KeyError: + log.warning( + f"no {key} available" + ) + transport_data[key] = new_ice_data + else: + if ( + new_ice_data["ufrag"] != ice_data["ufrag"] + or new_ice_data["pwd"] != ice_data["pwd"] + ): + ice_data["ufrag"] = new_ice_data["ufrag"] + ice_data["pwd"] = new_ice_data["pwd"] + ice_data["candidates"] = new_ice_data["candidates"] + return True + return False + + async def ice_candidates_add( + self, + client: SatXMPPEntity, + session_id: str, + media_ice_data: Dict[str, dict] + ) -> None: + """Called when a new ICE candidates are available for a session + + @param session_id: Session ID + @param candidates: a map from media type (audio, video) to ICE data + ICE data must be in the same format as in [self.parse_transport] + """ + session = self._j.get_session(client, session_id) + iq_elt: Optional[domish.Element] = None + + for media_type, new_ice_data in media_ice_data.items(): + for content_name, content_data in session["contents"].items(): + if content_data["application_data"].get("media") == media_type: + break + else: + log.warning( + "no media of type {media_type} has been found" + ) + continue + restart = self.update_candidates( + content_data["transport_data"], new_ice_data, True + ) + if restart: + log.debug( + f"Local ICE restart detected on session {session['id']} " + f"[{client.profile}]" + ) + self.host.bridge.ice_restart(session["id"], "local", client.profile) + transport_elt = self.build_transport(new_ice_data) + iq_elt, __ = self._j.build_action( + client, self._j.A_TRANSPORT_INFO, session, content_name, iq_elt=iq_elt, + transport_elt=transport_elt + ) + + if iq_elt is not None: + try: + await iq_elt.send() + except Exception as e: + log.warning(f"Could not send new ICE candidates: {e}") + + else: + log.error("Could not find any content to apply new ICE candidates") + + +@implementer(iwokkel.IDisco) +class XEP_0176_handler(XMPPHandler): + + def getDiscoInfo(self, requestor, target, nodeIdentifier=""): + return [disco.DiscoFeature(NS_JINGLE_ICE_UDP)] + + def getDiscoItems(self, requestor, target, nodeIdentifier=""): + return []