diff libervia/backend/plugins/plugin_xep_0176.py @ 4116:23fa52acf72c

plugin XEP-0167, XEP-0176: transport-info and ICE candidate sending are delayed if session is not active yet
author Goffi <goffi@goffi.org>
date Mon, 21 Aug 2023 15:19:45 +0200
parents 4b842c1fb686
children e11b13418ba6
line wrap: on
line diff
--- a/libervia/backend/plugins/plugin_xep_0176.py	Wed Aug 16 18:33:28 2023 +0200
+++ b/libervia/backend/plugins/plugin_xep_0176.py	Mon Aug 21 15:19:45 2023 +0200
@@ -36,7 +36,7 @@
 
 log = getLogger(__name__)
 
-NS_JINGLE_ICE_UDP= "urn:xmpp:jingle:transports:ice-udp:1"
+NS_JINGLE_ICE_UDP = "urn:xmpp:jingle:transports:ice-udp:1"
 
 PLUGIN_INFO = {
     C.PI_NAME: "Jingle ICE-UDP Transport Method",
@@ -53,7 +53,6 @@
 
 
 class XEP_0176(BaseTransportHandler):
-
     def __init__(self, host):
         log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
         self.host = host
@@ -86,11 +85,13 @@
         profile_key: str,
     ):
         client = self.host.get_client(profile_key)
-        return defer.ensureDeferred(self.ice_candidates_add(
-            client,
-            session_id,
-            data_format.deserialise(media_ice_data_s),
-        ))
+        return defer.ensureDeferred(
+            self.ice_candidates_add(
+                client,
+                session_id,
+                data_format.deserialise(media_ice_data_s),
+            )
+        )
 
     def build_transport(self, ice_data: dict) -> domish.Element:
         """Generate <transport> element from ICE data
@@ -119,14 +120,13 @@
         try:
             ufrag: str = ice_data["ufrag"]
             pwd: str = ice_data["pwd"]
-            candidates: List[dict] = ice_data["candidates"]
         except KeyError as e:
             raise exceptions.DataError(f"ICE {e} must be provided")
+        candidates: List[dict] = ice_data.get("candidates", [])
 
         candidates.sort(key=lambda c: int(c.get("priority", 0)), reverse=True)
         transport_elt = domish.Element(
-            (NS_JINGLE_ICE_UDP, "transport"),
-            attribs={"ufrag": ufrag, "pwd": pwd}
+            (NS_JINGLE_ICE_UDP, "transport"), attribs={"ufrag": ufrag, "pwd": pwd}
         )
 
         for candidate in candidates:
@@ -162,10 +162,7 @@
         @return: ICE data (as in [build_transport])
         """
         try:
-            ice_data = {
-                "ufrag": transport_elt["ufrag"],
-                "pwd": transport_elt["pwd"]
-            }
+            ice_data = {"ufrag": transport_elt["ufrag"], "pwd": transport_elt["pwd"]}
         except KeyError as e:
             raise exceptions.DataError(
                 f"<transport> is missing mandatory attribute {e}: {transport_elt.toXml()}"
@@ -256,11 +253,36 @@
             peer_ice_data = self.parse_transport(transport_elt)
             transport_data["peer_ice_data"] = peer_ice_data
 
-        elif action in (self._j.A_ACCEPTED_ACK, self._j.A_PREPARE_RESPONDER):
+        elif action == self._j.A_ACCEPTED_ACK:
+            buffer = session.pop("XEP-0176_handler_buffer", None)
+            if buffer:
+                log.debug("replaying buffered events")
+                for args in buffer:
+                    await self.jingle_handler(*args)
+        elif action == self._j.A_PREPARE_RESPONDER:
             pass
 
         elif action == self._j.A_SESSION_ACCEPT:
-            pass
+            # we check if we have any buffered ICE candidates, and send them if it's the
+            # case
+            media_type = content_data["application_data"].get("media")
+            try:
+                buffer = session["XEP-0176_buffer"]
+                buffered_ice_data = buffer.pop(media_type)
+            except KeyError:
+                pass
+            else:
+                if not buffer:
+                    del session["XEP-0176_buffer"]
+                transport_elt = self.build_transport(buffered_ice_data)
+                iq_elt, __ = self._j.build_action(
+                    client,
+                    self._j.A_TRANSPORT_INFO,
+                    session,
+                    content_name,
+                    context_elt=transport_elt,
+                )
+                await iq_elt.send()
 
         elif action == self._j.A_START:
             pass
@@ -269,6 +291,13 @@
             # responder side, we give our candidates
             transport_elt = self.build_transport(transport_data["local_ice_data"])
         elif action == self._j.A_TRANSPORT_INFO:
+            if session["state"] == self._j.STATE_PENDING:
+                # Session is not yet active; we buffer the arguments to replay them
+                # when the session becomes active. This makes the frontend's life easier.
+                log.debug("session is not active yet, buffering transport-info element")
+                buffer = session.setdefault("XEP-0176_handler_buffer", [])
+                buffer.append([client, action, session, content_name, transport_elt])
+                return transport_elt
 
             media_type = content_data["application_data"].get("media")
             new_ice_data = self.parse_transport(transport_elt)
@@ -283,10 +312,10 @@
             self.host.bridge.ice_candidates_new(
                 session["id"],
                 data_format.serialise({media_type: new_ice_data}),
-                client.profile
+                client.profile,
             )
         elif action == self._j.A_DESTROY:
-           pass
+            pass
         else:
             log.warning("FIXME: unmanaged action {}".format(action))
 
@@ -303,10 +332,7 @@
         log.debug("ICE-UDP session terminated")
 
     def update_candidates(
-        self,
-        transport_data: dict,
-        new_ice_data: dict,
-        local: bool
+        self, transport_data: dict, new_ice_data: dict, local: bool
     ) -> bool:
         """Update ICE candidates when new one are received
 
@@ -320,9 +346,7 @@
         try:
             ice_data = transport_data[key]
         except KeyError:
-            log.warning(
-                f"no {key} available"
-            )
+            log.warning(f"no {key} available")
             transport_data[key] = new_ice_data
         else:
             if (
@@ -336,28 +360,45 @@
         return False
 
     async def ice_candidates_add(
-        self,
-        client: SatXMPPEntity,
-        session_id: str,
-        media_ice_data: Dict[str, dict]
+        self, client: SatXMPPEntity, session_id: str, media_ice_data: Dict[str, dict]
     ) -> None:
         """Called when a new ICE candidates are available for a session
 
         @param session_id: Session ID
-        @param candidates: a map from media type (audio, video) to ICE data
+        @param media_ice_data: a map from media type (audio, video) to ICE data
             ICE data must be in the same format as in [self.parse_transport]
         """
         session = self._j.get_session(client, session_id)
         iq_elt: Optional[domish.Element] = None
 
         for media_type, new_ice_data in media_ice_data.items():
+            if session["state"] == self._j.STATE_PENDING:
+                log.debug(f"session not active, buffering")
+                buffer = session.setdefault("XEP-0176_buffer", {})
+                media_buffer = buffer.setdefault(media_type, {})
+
+                for key in ["ufrag", "pwd"]:
+                    if key not in media_buffer:
+                        media_buffer[key] = new_ice_data[key]
+                    else:
+                        if media_buffer[key] != new_ice_data[key]:
+                            log.warning(
+                                f"{key} conflict, new value will replace old one\n"
+                                f"buffer={media_buffer[key]!r}\n"
+                                f"new={new_ice_data[key]!r}"
+                            )
+                            media_buffer[key] = new_ice_data[key]
+
+                media_buffer.setdefault("candidates", []).extend(
+                    new_ice_data["candidates"]
+                )
+                continue
+
             for content_name, content_data in session["contents"].items():
                 if content_data["application_data"].get("media") == media_type:
                     break
             else:
-                log.warning(
-                    "no media of type {media_type} has been found"
-                )
+                log.warning(f"no media of type {media_type} has been found")
                 continue
             restart = self.update_candidates(
                 content_data["transport_data"], new_ice_data, True
@@ -370,8 +411,12 @@
                 self.host.bridge.ice_restart(session["id"], "local", client.profile)
             transport_elt = self.build_transport(new_ice_data)
             iq_elt, __ = self._j.build_action(
-                client, self._j.A_TRANSPORT_INFO, session, content_name, iq_elt=iq_elt,
-                transport_elt=transport_elt
+                client,
+                self._j.A_TRANSPORT_INFO,
+                session,
+                content_name,
+                iq_elt=iq_elt,
+                context_elt=transport_elt,
             )
 
         if iq_elt is not None:
@@ -380,13 +425,9 @@
             except Exception as e:
                 log.warning(f"Could not send new ICE candidates: {e}")
 
-        else:
-            log.error("Could not find any content to apply new ICE candidates")
-
 
 @implementer(iwokkel.IDisco)
 class XEP_0176_handler(XMPPHandler):
-
     def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
         return [disco.DiscoFeature(NS_JINGLE_ICE_UDP)]