changeset 4116:23fa52acf72c

plugin XEP-0167, XEP-0176: transport-info and ICE candidate sending are delayed if session is not active yet
author Goffi <goffi@goffi.org>
date Mon, 21 Aug 2023 15:19:45 +0200 (16 months ago)
parents 0da563780ffc
children d861ad696797
files libervia/backend/plugins/plugin_xep_0166/__init__.py libervia/backend/plugins/plugin_xep_0167/__init__.py libervia/backend/plugins/plugin_xep_0167/mapping.py libervia/backend/plugins/plugin_xep_0176.py
diffstat 4 files changed, 125 insertions(+), 108 deletions(-) [+]
line wrap: on
line diff
--- a/libervia/backend/plugins/plugin_xep_0166/__init__.py	Wed Aug 16 18:33:28 2023 +0200
+++ b/libervia/backend/plugins/plugin_xep_0166/__init__.py	Mon Aug 21 15:19:45 2023 +0200
@@ -55,9 +55,6 @@
 NS_JINGLE : Final = "urn:xmpp:jingle:1"
 NS_JINGLE_ERROR : Final = "urn:xmpp:jingle:errors:1"
 JINGLE_REQUEST : Final = f'{IQ_SET}/jingle[@xmlns="{NS_JINGLE}"]'
-STATE_PENDING : Final = "PENDING"
-STATE_ACTIVE : Final = "ACTIVE"
-STATE_ENDED : Final = "ENDED"
 CONFIRM_TXT : Final = D_(
     "{entity} want to start a jingle session with you, do you accept ?"
 )
@@ -89,6 +86,10 @@
     REASON_FAILED_TRANSPORT : Final = "failed-transport"
     REASON_CONNECTIVITY_ERROR : Final = "connectivity-error"
 
+    STATE_PENDING : Final = "PENDING"
+    STATE_ACTIVE : Final = "ACTIVE"
+    STATE_ENDED : Final = "ENDED"
+
     # standard actions
 
     A_SESSION_INITIATE : Final = "session-initiate"
@@ -194,7 +195,7 @@
 
         session_data = {
             "id": sid,
-            "state": STATE_PENDING,
+            "state": XEP_0166.STATE_PENDING,
             "initiator": client.jid if role == XEP_0166.ROLE_INITIATOR else peer_jid,
             "role": role,
             "local_jid": local_jid or client.jid,
@@ -502,7 +503,8 @@
         content_elt["creator"] = content_data["creator"]
 
         if context_elt is not None:
-            pass
+            if context_elt.parent is None:
+                content_elt.addChild(context_elt)
         elif action == XEP_0166.A_TRANSPORT_INFO:
             context_elt = transport_elt = content_elt.addElement(
                 "transport", content_data["transport"].namespace
@@ -1191,7 +1193,7 @@
         d_list.addCallback(lambda __: iq_elt.send())
 
         def change_state(__, session):
-            session["state"] = STATE_ACTIVE
+            session["state"] = XEP_0166.STATE_ACTIVE
 
         d_list.addCallback(change_state, session)
         d_list.addCallback(
@@ -1279,7 +1281,7 @@
         # at this point we can send the <iq/> result to confirm reception of the request
         client.send(xmlstream.toResponse(request, "result"))
         # and change the state
-        session["state"] = STATE_ACTIVE
+        session["state"] = XEP_0166.STATE_ACTIVE
         session["jingle_elt"] = jingle_elt
 
         await defer.DeferredList(self._call_plugins(
--- a/libervia/backend/plugins/plugin_xep_0167/__init__.py	Wed Aug 16 18:33:28 2023 +0200
+++ b/libervia/backend/plugins/plugin_xep_0167/__init__.py	Mon Aug 21 15:19:45 2023 +0200
@@ -115,19 +115,13 @@
 
         # args: session_id, serialised setup data (dict with keys "role" and "sdp"),
         #   profile
-        host.bridge.add_signal(
-            "call_setup", ".plugin", signature="sss"
-        )
+        host.bridge.add_signal("call_setup", ".plugin", signature="sss")
 
         # args: session_id, data, profile
-        host.bridge.add_signal(
-            "call_ended", ".plugin", signature="sss"
-        )
+        host.bridge.add_signal("call_ended", ".plugin", signature="sss")
 
         # args: session_id, info_type, extra, profile
-        host.bridge.add_signal(
-            "call_info", ".plugin", signature="ssss"
-        )
+        host.bridge.add_signal("call_info", ".plugin", signature="ssss")
 
     def get_handler(self, client):
         return XEP_0167_handler()
@@ -194,7 +188,9 @@
                 except KeyError:
                     log.warning(f"no media ID found for {media_type}: {media_data}")
                 try:
-                    call_data[media_type]["ice-candidates"] = transport_data["candidates"]
+                    call_data[media_type]["ice-candidates"] = transport_data.get(
+                        "candidates", []
+                    )
                     metadata["ice-ufrag"] = transport_data["ufrag"]
                     metadata["ice-pwd"] = transport_data["pwd"]
                 except KeyError:
@@ -250,12 +246,7 @@
         )
         return sid
 
-    def _call_answer_sdp(
-        self,
-        session_id: str,
-        answer_sdp: str,
-        profile: str
-    ) -> None:
+    def _call_answer_sdp(self, session_id: str, answer_sdp: str, profile: str) -> None:
         client = self.host.get_client(profile)
         session = self._j.get_session(client, session_id)
         try:
@@ -274,9 +265,7 @@
     ):
         client = self.host.get_client(profile_key)
         return defer.ensureDeferred(
-            self.call_end(
-                client, session_id, data_format.deserialise(data_s)
-            )
+            self.call_end(client, session_id, data_format.deserialise(data_s))
         )
 
     async def call_end(
@@ -297,10 +286,7 @@
     # jingle callbacks
 
     async def confirm_incoming_call(
-        self,
-        client: SatXMPPEntity,
-        session: dict,
-        call_type: str
+        self, client: SatXMPPEntity, session: dict, call_type: str
     ) -> bool:
         """Prompt the user for a call confirmation.
 
@@ -341,10 +327,7 @@
         return accepted
 
     async def jingle_preflight(
-        self,
-        client: SatXMPPEntity,
-        session: dict,
-        description_elt: domish.Element
+        self, client: SatXMPPEntity, session: dict, description_elt: domish.Element
     ) -> None:
         """Perform preflight checks for an incoming call session.
 
@@ -380,7 +363,7 @@
             self.host.bridge.call_ended(
                 session["id"],
                 data_format.serialise({"reason": "retracted"}),
-                client.profile
+                client.profile,
             )
             raise e
 
@@ -392,13 +375,11 @@
         client: SatXMPPEntity,
         session: dict,
         info_type: str,
-        info_data: dict|None = None
+        info_data: dict | None = None,
     ) -> None:
         if info_type == "ringing":
             if not session.get("ringing", False):
-                self.host.bridge.call_info(
-                    session["id"], "ringing", "", client.profile
-                )
+                self.host.bridge.call_info(session["id"], "ringing", "", client.profile)
                 # we indicate that the ringing has started, to avoid sending several times
                 # the signal
                 session["ringing"] = True
@@ -406,10 +387,7 @@
             log.warning(f"Unknown preflight info type: {info_type!r}")
 
     async def jingle_preflight_cancel(
-        self,
-        client: SatXMPPEntity,
-        session: dict,
-        cancel_error: exceptions.CancelError
+        self, client: SatXMPPEntity, session: dict, cancel_error: exceptions.CancelError
     ) -> None:
         """The call has been rejected"""
         # call_ended is use to send the signal only once even if there are audio and video
@@ -417,20 +395,15 @@
         call_ended = session.get("call_ended", False)
         if call_ended:
             return
-        data = {
-            "reason": getattr(cancel_error, "reason", "cancelled")
-        }
+        data = {"reason": getattr(cancel_error, "reason", "cancelled")}
         text = getattr(cancel_error, "text", None)
         if text:
             data["text"] = text
         self.host.bridge.call_ended(
-            session["id"],
-            data_format.serialise(data),
-            client.profile
+            session["id"], data_format.serialise(data), client.profile
         )
         session["call_ended"] = True
 
-
     def jingle_session_init(
         self,
         client: SatXMPPEntity,
@@ -507,11 +480,13 @@
 
         self.host.bridge.call_setup(
             session["id"],
-            data_format.serialise({
-                "role": session["role"],
-                "sdp": sdp,
-            }),
-            client.profile
+            data_format.serialise(
+                {
+                    "role": session["role"],
+                    "sdp": sdp,
+                }
+            ),
+            client.profile,
         )
 
         answer_sdp = await answer_sdp_d
@@ -559,11 +534,13 @@
                 answer_sdp = mapping.generate_sdp_from_session(session)
                 self.host.bridge.call_setup(
                     session["id"],
-                    data_format.serialise({
-                        "role": session["role"],
-                        "sdp": answer_sdp,
-                    }),
-                    client.profile
+                    data_format.serialise(
+                        {
+                            "role": session["role"],
+                            "sdp": answer_sdp,
+                        }
+                    ),
+                    client.profile,
                 )
         else:
             log.warning(f"FIXME: unmanaged action {action}")
@@ -635,9 +612,7 @@
         reason_elt: domish.Element,
     ) -> None:
         reason, text = self._j.parse_reason_elt(reason_elt)
-        data = {
-            "reason": reason
-        }
+        data = {"reason": reason}
         if text:
             data["text"] = text
         self.host.bridge.call_ended(
--- a/libervia/backend/plugins/plugin_xep_0167/mapping.py	Wed Aug 16 18:33:28 2023 +0200
+++ b/libervia/backend/plugins/plugin_xep_0167/mapping.py	Mon Aug 21 15:19:45 2023 +0200
@@ -439,12 +439,11 @@
         try:
             ice_candidates = first_content["transport_data"]["candidates"]
         except KeyError:
-            log.warning("missing candidates in SDP")
-        else:
-            for idx, content in enumerate(all_media.values()):
-                if idx == 0:
-                    continue
-                content["transport_data"].setdefault("candidates", ice_candidates)
+            ice_candidates = []
+        for idx, content in enumerate(all_media.values()):
+            if idx == 0:
+                continue
+            content["transport_data"].setdefault("candidates", ice_candidates)
 
     return call_data
 
--- a/libervia/backend/plugins/plugin_xep_0176.py	Wed Aug 16 18:33:28 2023 +0200
+++ b/libervia/backend/plugins/plugin_xep_0176.py	Mon Aug 21 15:19:45 2023 +0200
@@ -36,7 +36,7 @@
 
 log = getLogger(__name__)
 
-NS_JINGLE_ICE_UDP= "urn:xmpp:jingle:transports:ice-udp:1"
+NS_JINGLE_ICE_UDP = "urn:xmpp:jingle:transports:ice-udp:1"
 
 PLUGIN_INFO = {
     C.PI_NAME: "Jingle ICE-UDP Transport Method",
@@ -53,7 +53,6 @@
 
 
 class XEP_0176(BaseTransportHandler):
-
     def __init__(self, host):
         log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
         self.host = host
@@ -86,11 +85,13 @@
         profile_key: str,
     ):
         client = self.host.get_client(profile_key)
-        return defer.ensureDeferred(self.ice_candidates_add(
-            client,
-            session_id,
-            data_format.deserialise(media_ice_data_s),
-        ))
+        return defer.ensureDeferred(
+            self.ice_candidates_add(
+                client,
+                session_id,
+                data_format.deserialise(media_ice_data_s),
+            )
+        )
 
     def build_transport(self, ice_data: dict) -> domish.Element:
         """Generate <transport> element from ICE data
@@ -119,14 +120,13 @@
         try:
             ufrag: str = ice_data["ufrag"]
             pwd: str = ice_data["pwd"]
-            candidates: List[dict] = ice_data["candidates"]
         except KeyError as e:
             raise exceptions.DataError(f"ICE {e} must be provided")
+        candidates: List[dict] = ice_data.get("candidates", [])
 
         candidates.sort(key=lambda c: int(c.get("priority", 0)), reverse=True)
         transport_elt = domish.Element(
-            (NS_JINGLE_ICE_UDP, "transport"),
-            attribs={"ufrag": ufrag, "pwd": pwd}
+            (NS_JINGLE_ICE_UDP, "transport"), attribs={"ufrag": ufrag, "pwd": pwd}
         )
 
         for candidate in candidates:
@@ -162,10 +162,7 @@
         @return: ICE data (as in [build_transport])
         """
         try:
-            ice_data = {
-                "ufrag": transport_elt["ufrag"],
-                "pwd": transport_elt["pwd"]
-            }
+            ice_data = {"ufrag": transport_elt["ufrag"], "pwd": transport_elt["pwd"]}
         except KeyError as e:
             raise exceptions.DataError(
                 f"<transport> is missing mandatory attribute {e}: {transport_elt.toXml()}"
@@ -256,11 +253,36 @@
             peer_ice_data = self.parse_transport(transport_elt)
             transport_data["peer_ice_data"] = peer_ice_data
 
-        elif action in (self._j.A_ACCEPTED_ACK, self._j.A_PREPARE_RESPONDER):
+        elif action == self._j.A_ACCEPTED_ACK:
+            buffer = session.pop("XEP-0176_handler_buffer", None)
+            if buffer:
+                log.debug("replaying buffered events")
+                for args in buffer:
+                    await self.jingle_handler(*args)
+        elif action == self._j.A_PREPARE_RESPONDER:
             pass
 
         elif action == self._j.A_SESSION_ACCEPT:
-            pass
+            # we check if we have any buffered ICE candidates, and send them if it's the
+            # case
+            media_type = content_data["application_data"].get("media")
+            try:
+                buffer = session["XEP-0176_buffer"]
+                buffered_ice_data = buffer.pop(media_type)
+            except KeyError:
+                pass
+            else:
+                if not buffer:
+                    del session["XEP-0176_buffer"]
+                transport_elt = self.build_transport(buffered_ice_data)
+                iq_elt, __ = self._j.build_action(
+                    client,
+                    self._j.A_TRANSPORT_INFO,
+                    session,
+                    content_name,
+                    context_elt=transport_elt,
+                )
+                await iq_elt.send()
 
         elif action == self._j.A_START:
             pass
@@ -269,6 +291,13 @@
             # responder side, we give our candidates
             transport_elt = self.build_transport(transport_data["local_ice_data"])
         elif action == self._j.A_TRANSPORT_INFO:
+            if session["state"] == self._j.STATE_PENDING:
+                # Session is not yet active; we buffer the arguments to replay them
+                # when the session becomes active. This makes the frontend's life easier.
+                log.debug("session is not active yet, buffering transport-info element")
+                buffer = session.setdefault("XEP-0176_handler_buffer", [])
+                buffer.append([client, action, session, content_name, transport_elt])
+                return transport_elt
 
             media_type = content_data["application_data"].get("media")
             new_ice_data = self.parse_transport(transport_elt)
@@ -283,10 +312,10 @@
             self.host.bridge.ice_candidates_new(
                 session["id"],
                 data_format.serialise({media_type: new_ice_data}),
-                client.profile
+                client.profile,
             )
         elif action == self._j.A_DESTROY:
-           pass
+            pass
         else:
             log.warning("FIXME: unmanaged action {}".format(action))
 
@@ -303,10 +332,7 @@
         log.debug("ICE-UDP session terminated")
 
     def update_candidates(
-        self,
-        transport_data: dict,
-        new_ice_data: dict,
-        local: bool
+        self, transport_data: dict, new_ice_data: dict, local: bool
     ) -> bool:
         """Update ICE candidates when new one are received
 
@@ -320,9 +346,7 @@
         try:
             ice_data = transport_data[key]
         except KeyError:
-            log.warning(
-                f"no {key} available"
-            )
+            log.warning(f"no {key} available")
             transport_data[key] = new_ice_data
         else:
             if (
@@ -336,28 +360,45 @@
         return False
 
     async def ice_candidates_add(
-        self,
-        client: SatXMPPEntity,
-        session_id: str,
-        media_ice_data: Dict[str, dict]
+        self, client: SatXMPPEntity, session_id: str, media_ice_data: Dict[str, dict]
     ) -> None:
         """Called when a new ICE candidates are available for a session
 
         @param session_id: Session ID
-        @param candidates: a map from media type (audio, video) to ICE data
+        @param media_ice_data: a map from media type (audio, video) to ICE data
             ICE data must be in the same format as in [self.parse_transport]
         """
         session = self._j.get_session(client, session_id)
         iq_elt: Optional[domish.Element] = None
 
         for media_type, new_ice_data in media_ice_data.items():
+            if session["state"] == self._j.STATE_PENDING:
+                log.debug(f"session not active, buffering")
+                buffer = session.setdefault("XEP-0176_buffer", {})
+                media_buffer = buffer.setdefault(media_type, {})
+
+                for key in ["ufrag", "pwd"]:
+                    if key not in media_buffer:
+                        media_buffer[key] = new_ice_data[key]
+                    else:
+                        if media_buffer[key] != new_ice_data[key]:
+                            log.warning(
+                                f"{key} conflict, new value will replace old one\n"
+                                f"buffer={media_buffer[key]!r}\n"
+                                f"new={new_ice_data[key]!r}"
+                            )
+                            media_buffer[key] = new_ice_data[key]
+
+                media_buffer.setdefault("candidates", []).extend(
+                    new_ice_data["candidates"]
+                )
+                continue
+
             for content_name, content_data in session["contents"].items():
                 if content_data["application_data"].get("media") == media_type:
                     break
             else:
-                log.warning(
-                    "no media of type {media_type} has been found"
-                )
+                log.warning(f"no media of type {media_type} has been found")
                 continue
             restart = self.update_candidates(
                 content_data["transport_data"], new_ice_data, True
@@ -370,8 +411,12 @@
                 self.host.bridge.ice_restart(session["id"], "local", client.profile)
             transport_elt = self.build_transport(new_ice_data)
             iq_elt, __ = self._j.build_action(
-                client, self._j.A_TRANSPORT_INFO, session, content_name, iq_elt=iq_elt,
-                transport_elt=transport_elt
+                client,
+                self._j.A_TRANSPORT_INFO,
+                session,
+                content_name,
+                iq_elt=iq_elt,
+                context_elt=transport_elt,
             )
 
         if iq_elt is not None:
@@ -380,13 +425,9 @@
             except Exception as e:
                 log.warning(f"Could not send new ICE candidates: {e}")
 
-        else:
-            log.error("Could not find any content to apply new ICE candidates")
-
 
 @implementer(iwokkel.IDisco)
 class XEP_0176_handler(XMPPHandler):
-
     def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
         return [disco.DiscoFeature(NS_JINGLE_ICE_UDP)]