changeset 4045:ae756bf7c3e8

plugin XEP-0176: Jingle ICE-UDP Transport Method implementation: rel 419
author Goffi <goffi@goffi.org>
date Mon, 15 May 2023 16:23:36 +0200 (19 months ago)
parents 3900626bc100
children 0e3ce379aae3
files sat/plugins/plugin_xep_0176.py
diffstat 1 files changed, 385 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sat/plugins/plugin_xep_0176.py	Mon May 15 16:23:36 2023 +0200
@@ -0,0 +1,385 @@
+#!/usr/bin/env python3
+
+# Libervia plugin for Jingle (XEP-0176)
+# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from typing import Dict, List, Optional
+import uuid
+
+from twisted.internet import defer
+from twisted.words.protocols.jabber.xmlstream import XMPPHandler
+from twisted.words.xish import domish
+from wokkel import disco, iwokkel
+from zope.interface import implementer
+
+from sat.core import exceptions
+from sat.core.constants import Const as C
+from sat.core.core_types import SatXMPPEntity
+from sat.core.i18n import _
+from sat.core.log import getLogger
+from sat.tools.common import data_format
+
+from .plugin_xep_0166 import BaseTransportHandler
+
+log = getLogger(__name__)
+
+NS_JINGLE_ICE_UDP= "urn:xmpp:jingle:transports:ice-udp:1"
+
+PLUGIN_INFO = {
+    C.PI_NAME: "Jingle ICE-UDP Transport Method",
+    C.PI_IMPORT_NAME: "XEP-0176",
+    C.PI_TYPE: "XEP",
+    C.PI_MODES: C.PLUG_MODE_BOTH,
+    C.PI_PROTOCOLS: ["XEP-0176"],
+    C.PI_DEPENDENCIES: ["XEP-0166"],
+    C.PI_RECOMMENDATIONS: [],
+    C.PI_MAIN: "XEP_0176",
+    C.PI_HANDLER: "yes",
+    C.PI_DESCRIPTION: _("""Implementation of Jingle ICE-UDP transport"""),
+}
+
+
+class XEP_0176(BaseTransportHandler):
+
+    def __init__(self, host):
+        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
+        self.host = host
+        self._j = host.plugins["XEP-0166"]  # shortcut to access jingle
+        self._j.register_transport(
+            NS_JINGLE_ICE_UDP, self._j.TRANSPORT_DATAGRAM, self, 100
+        )
+        host.bridge.add_method(
+            "ice_candidates_add",
+            ".plugin",
+            in_sign="sss",
+            out_sign="",
+            method=self._ice_candidates_add,
+            async_=True,
+        )
+        host.bridge.add_signal(
+            "ice_candidates_new", ".plugin", signature="sss"
+        )  # args: jingle_sid, candidates_serialised, profile
+        host.bridge.add_signal(
+            "ice_restart", ".plugin", signature="sss"
+        )  # args: jingle_sid, side ("local" or "peer"), profile
+
+    def get_handler(self, client):
+        return XEP_0176_handler()
+
+    def _ice_candidates_add(
+        self,
+        session_id: str,
+        media_ice_data_s: str,
+        profile_key: str,
+    ):
+        client = self.host.get_client(profile_key)
+        return defer.ensureDeferred(self.ice_candidates_add(
+            client,
+            session_id,
+            data_format.deserialise(media_ice_data_s),
+        ))
+
+    def build_transport(self, ice_data: dict) -> domish.Element:
+        """Generate <transport> element from ICE data
+
+        @param ice_data: a dict containing the following keys:
+            - "ufrag" (str): The ICE username fragment.
+            - "pwd" (str): The ICE password.
+            - "candidates" (List[dict]): A list of ICE candidate dictionaries, each
+              containing:
+                - "component_id" (int): The component ID.
+                - "foundation" (str): The candidate foundation.
+                - "address" (str): The candidate IP address.
+                - "port" (int): The candidate port.
+                - "priority" (int): The candidate priority.
+                - "transport" (str): The candidate transport protocol, e.g., "udp".
+                - "type" (str): The candidate type, e.g., "host", "srflx", "prflx", or
+                  "relay".
+                - "generation" (str, optional): The candidate generation. Defaults to "0".
+                - "network" (str, optional): The candidate network. Defaults to "0".
+                - "rel_addr" (str, optional): The related address for the candidate, if
+                  any.
+                - "rel_port" (int, optional): The related port for the candidate, if any.
+
+        @return: A <transport> element.
+        """
+        try:
+            ufrag: str = ice_data["ufrag"]
+            pwd: str = ice_data["pwd"]
+            candidates: List[dict] = ice_data["candidates"]
+        except KeyError as e:
+            raise exceptions.DataError(f"ICE {e} must be provided")
+
+        candidates.sort(key=lambda c: int(c.get("priority", 0)), reverse=True)
+        transport_elt = domish.Element(
+            (NS_JINGLE_ICE_UDP, "transport"),
+            attribs={"ufrag": ufrag, "pwd": pwd}
+        )
+
+        for candidate in candidates:
+            try:
+                candidate_elt = transport_elt.addElement("candidate")
+                candidate_elt["component"] = str(candidate["component_id"])
+                candidate_elt["foundation"] = candidate["foundation"]
+                candidate_elt["generation"] = str(candidate.get("generation", "0"))
+                candidate_elt["id"] = candidate.get("id") or str(uuid.uuid4())
+                candidate_elt["ip"] = candidate["address"]
+                candidate_elt["network"] = str(candidate.get("network", "0"))
+                candidate_elt["port"] = str(candidate["port"])
+                candidate_elt["priority"] = str(candidate["priority"])
+                candidate_elt["protocol"] = candidate["transport"]
+                candidate_elt["type"] = candidate["type"]
+            except KeyError as e:
+                raise exceptions.DataError(
+                    f"Mandatory ICE candidate attribute {e} is missing"
+                )
+
+            if "rel_addr" in candidate and "rel_port" in candidate:
+                candidate_elt["rel-addr"] = candidate["rel_addr"]
+                candidate_elt["rel-port"] = str(candidate["rel_port"])
+
+        self.host.trigger.point("XEP-0176_build_transport", transport_elt, ice_data)
+
+        return transport_elt
+
+    def parse_transport(self, transport_elt: domish.Element) -> dict:
+        """Parse <transport> to a dict
+
+        @param transport_elt: <transport> element
+        @return: ICE data (as in [build_description])
+        """
+        try:
+            ice_data = {
+                "ufrag": transport_elt["ufrag"],
+                "pwd": transport_elt["pwd"]
+            }
+        except KeyError as e:
+            raise exceptions.DataError(
+                f"<transport> is missing mandatory attribute {e}: {transport_elt.toXml()}"
+            )
+        ice_data["candidates"] = ice_candidates = []
+
+        for candidate_elt in transport_elt.elements(NS_JINGLE_ICE_UDP, "candidate"):
+            try:
+                candidate = {
+                    "component_id": int(candidate_elt["component"]),
+                    "foundation": candidate_elt["foundation"],
+                    "address": candidate_elt["ip"],
+                    "port": int(candidate_elt["port"]),
+                    "priority": int(candidate_elt["priority"]),
+                    "transport": candidate_elt["protocol"],
+                    "type": candidate_elt["type"],
+                }
+            except KeyError as e:
+                raise exceptions.DataError(
+                    f"Mandatory attribute {e} is missing in candidate element"
+                )
+
+            if candidate_elt.hasAttribute("generation"):
+                candidate["generation"] = candidate_elt["generation"]
+
+            if candidate_elt.hasAttribute("network"):
+                candidate["network"] = candidate_elt["network"]
+
+            if candidate_elt.hasAttribute("rel-addr"):
+                candidate["rel_addr"] = candidate_elt["rel-addr"]
+
+            if candidate_elt.hasAttribute("rel-port"):
+                candidate["rel_port"] = int(candidate_elt["rel-port"])
+
+            ice_candidates.append(candidate)
+
+        self.host.trigger.point("XEP-0176_parse_transport", transport_elt, ice_data)
+
+        return ice_data
+
+    async def jingle_session_init(
+        self,
+        client: SatXMPPEntity,
+        session: dict,
+        content_name: str,
+    ) -> domish.Element:
+        """Create a Jingle session initiation transport element with ICE candidates.
+
+        @param client: SatXMPPEntity object representing the client.
+        @param session: Dictionary containing session data.
+        @param content_name: Name of the content.
+        @param ufrag: ICE username fragment.
+        @param pwd: ICE password.
+        @param candidates: List of ICE candidate dictionaries parsed from the
+            parse_ice_candidate method.
+
+        @return: domish.Element representing the Jingle transport element.
+
+        @raise exceptions.DataError: If mandatory data is missing from the candidates.
+        """
+        content_data = session["contents"][content_name]
+        transport_data = content_data["transport_data"]
+        ice_data = transport_data["local_ice_data"]
+        return self.build_transport(ice_data)
+
+    async def jingle_handler(
+        self,
+        client: SatXMPPEntity,
+        action: str,
+        session: dict,
+        content_name: str,
+        transport_elt: domish.Element,
+    ) -> domish.Element:
+        """Handle Jingle requests
+
+        @param client: The SatXMPPEntity instance.
+        @param action: The action to be performed with the session.
+        @param session: A dictionary containing the session information.
+        @param content_name: The name of the content.
+        @param transport_elt: The domish.Element instance representing the transport
+            element.
+
+        @return: <transport> element
+        """
+        content_data = session["contents"][content_name]
+        transport_data = content_data["transport_data"]
+        if action in (self._j.A_PREPARE_CONFIRMATION, self._j.A_PREPARE_INITIATOR):
+            peer_ice_data = self.parse_transport(transport_elt)
+            transport_data["peer_ice_data"] = peer_ice_data
+
+        elif action in (self._j.A_ACCEPTED_ACK, self._j.A_PREPARE_RESPONDER):
+            pass
+
+        elif action == self._j.A_SESSION_ACCEPT:
+            pass
+
+        elif action == self._j.A_START:
+            pass
+
+        elif action == self._j.A_SESSION_INITIATE:
+            # responder side, we give our candidates
+            transport_elt = self.build_transport(transport_data["local_ice_data"])
+        elif action == self._j.A_TRANSPORT_INFO:
+            new_ice_data = self.parse_transport(transport_elt)
+            restart = self.update_candidates(transport_data, new_ice_data, local=False)
+            if restart:
+                log.debug(
+                    f"Peer ICE restart detected on session {session['id']} "
+                    f"[{client.profile}]"
+                )
+                self.host.bridge.ice_restart(session["id"], "peer", client.profile)
+
+            self.host.bridge.ice_candidates_new(
+                session["id"],
+                data_format.serialise(new_ice_data["candidates"]),
+                client.profile
+            )
+        elif action == self._j.A_DESTROY:
+           pass
+        else:
+            log.warning("FIXME: unmanaged action {}".format(action))
+
+        return transport_elt
+
+    def jingle_terminate(self, client, action, session, content_name, reason_elt):
+        log.debug("ICE-UDP session terminated")
+
+    def update_candidates(
+        self,
+        transport_data: dict,
+        new_ice_data: dict,
+        local: bool
+    ) -> bool:
+        """Update ICE candidates when new one are received
+
+        @param transport_data: transport_data of the content linked to the candidates
+        @param new_ice_data: new ICE data, in the same format as returned
+            by [self.parse_transport]
+        @param local: True if it's our candidates, False if it's peer ones
+        @return: True if there is a ICE restart
+        """
+        key = "local_ice_data" if local else "peer_ice_data"
+        try:
+            ice_data = transport_data[key]
+        except KeyError:
+            log.warning(
+                f"no {key} available"
+            )
+            transport_data[key] = new_ice_data
+        else:
+            if (
+                new_ice_data["ufrag"] != ice_data["ufrag"]
+                or new_ice_data["pwd"] != ice_data["pwd"]
+            ):
+                ice_data["ufrag"] = new_ice_data["ufrag"]
+                ice_data["pwd"] = new_ice_data["pwd"]
+                ice_data["candidates"] = new_ice_data["candidates"]
+                return True
+        return False
+
+    async def ice_candidates_add(
+        self,
+        client: SatXMPPEntity,
+        session_id: str,
+        media_ice_data: Dict[str, dict]
+    ) -> None:
+        """Called when a new ICE candidates are available for a session
+
+        @param session_id: Session ID
+        @param candidates: a map from media type (audio, video) to ICE data
+            ICE data must be in the same format as in [self.parse_transport]
+        """
+        session = self._j.get_session(client, session_id)
+        iq_elt: Optional[domish.Element] = None
+
+        for media_type, new_ice_data in media_ice_data.items():
+            for content_name, content_data in session["contents"].items():
+                if content_data["application_data"].get("media") == media_type:
+                    break
+            else:
+                log.warning(
+                    "no media of type {media_type} has been found"
+                )
+                continue
+            restart = self.update_candidates(
+                content_data["transport_data"], new_ice_data, True
+            )
+            if restart:
+                log.debug(
+                    f"Local ICE restart detected on session {session['id']} "
+                    f"[{client.profile}]"
+                )
+                self.host.bridge.ice_restart(session["id"], "local", client.profile)
+            transport_elt = self.build_transport(new_ice_data)
+            iq_elt, __ = self._j.build_action(
+                client, self._j.A_TRANSPORT_INFO, session, content_name, iq_elt=iq_elt,
+                transport_elt=transport_elt
+            )
+
+        if iq_elt is not None:
+            try:
+                await iq_elt.send()
+            except Exception as e:
+                log.warning(f"Could not send new ICE candidates: {e}")
+
+        else:
+            log.error("Could not find any content to apply new ICE candidates")
+
+
+@implementer(iwokkel.IDisco)
+class XEP_0176_handler(XMPPHandler):
+
+    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
+        return [disco.DiscoFeature(NS_JINGLE_ICE_UDP)]
+
+    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
+        return []