Mercurial > libervia-backend
diff libervia/backend/plugins/plugin_xep_0176.py @ 4116:23fa52acf72c
plugin XEP-0167, XEP-0176: transport-info and ICE candidate sending are delayed if session is not active yet
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 21 Aug 2023 15:19:45 +0200 |
parents | 4b842c1fb686 |
children | e11b13418ba6 |
line wrap: on
line diff
--- a/libervia/backend/plugins/plugin_xep_0176.py Wed Aug 16 18:33:28 2023 +0200 +++ b/libervia/backend/plugins/plugin_xep_0176.py Mon Aug 21 15:19:45 2023 +0200 @@ -36,7 +36,7 @@ log = getLogger(__name__) -NS_JINGLE_ICE_UDP= "urn:xmpp:jingle:transports:ice-udp:1" +NS_JINGLE_ICE_UDP = "urn:xmpp:jingle:transports:ice-udp:1" PLUGIN_INFO = { C.PI_NAME: "Jingle ICE-UDP Transport Method", @@ -53,7 +53,6 @@ class XEP_0176(BaseTransportHandler): - def __init__(self, host): log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization") self.host = host @@ -86,11 +85,13 @@ profile_key: str, ): client = self.host.get_client(profile_key) - return defer.ensureDeferred(self.ice_candidates_add( - client, - session_id, - data_format.deserialise(media_ice_data_s), - )) + return defer.ensureDeferred( + self.ice_candidates_add( + client, + session_id, + data_format.deserialise(media_ice_data_s), + ) + ) def build_transport(self, ice_data: dict) -> domish.Element: """Generate <transport> element from ICE data @@ -119,14 +120,13 @@ try: ufrag: str = ice_data["ufrag"] pwd: str = ice_data["pwd"] - candidates: List[dict] = ice_data["candidates"] except KeyError as e: raise exceptions.DataError(f"ICE {e} must be provided") + candidates: List[dict] = ice_data.get("candidates", []) candidates.sort(key=lambda c: int(c.get("priority", 0)), reverse=True) transport_elt = domish.Element( - (NS_JINGLE_ICE_UDP, "transport"), - attribs={"ufrag": ufrag, "pwd": pwd} + (NS_JINGLE_ICE_UDP, "transport"), attribs={"ufrag": ufrag, "pwd": pwd} ) for candidate in candidates: @@ -162,10 +162,7 @@ @return: ICE data (as in [build_transport]) """ try: - ice_data = { - "ufrag": transport_elt["ufrag"], - "pwd": transport_elt["pwd"] - } + ice_data = {"ufrag": transport_elt["ufrag"], "pwd": transport_elt["pwd"]} except KeyError as e: raise exceptions.DataError( f"<transport> is missing mandatory attribute {e}: {transport_elt.toXml()}" @@ -256,11 +253,36 @@ peer_ice_data = self.parse_transport(transport_elt) transport_data["peer_ice_data"] = peer_ice_data - elif action in (self._j.A_ACCEPTED_ACK, self._j.A_PREPARE_RESPONDER): + elif action == self._j.A_ACCEPTED_ACK: + buffer = session.pop("XEP-0176_handler_buffer", None) + if buffer: + log.debug("replaying buffered events") + for args in buffer: + await self.jingle_handler(*args) + elif action == self._j.A_PREPARE_RESPONDER: pass elif action == self._j.A_SESSION_ACCEPT: - pass + # we check if we have any buffered ICE candidates, and send them if it's the + # case + media_type = content_data["application_data"].get("media") + try: + buffer = session["XEP-0176_buffer"] + buffered_ice_data = buffer.pop(media_type) + except KeyError: + pass + else: + if not buffer: + del session["XEP-0176_buffer"] + transport_elt = self.build_transport(buffered_ice_data) + iq_elt, __ = self._j.build_action( + client, + self._j.A_TRANSPORT_INFO, + session, + content_name, + context_elt=transport_elt, + ) + await iq_elt.send() elif action == self._j.A_START: pass @@ -269,6 +291,13 @@ # responder side, we give our candidates transport_elt = self.build_transport(transport_data["local_ice_data"]) elif action == self._j.A_TRANSPORT_INFO: + if session["state"] == self._j.STATE_PENDING: + # Session is not yet active; we buffer the arguments to replay them + # when the session becomes active. This makes the frontend's life easier. + log.debug("session is not active yet, buffering transport-info element") + buffer = session.setdefault("XEP-0176_handler_buffer", []) + buffer.append([client, action, session, content_name, transport_elt]) + return transport_elt media_type = content_data["application_data"].get("media") new_ice_data = self.parse_transport(transport_elt) @@ -283,10 +312,10 @@ self.host.bridge.ice_candidates_new( session["id"], data_format.serialise({media_type: new_ice_data}), - client.profile + client.profile, ) elif action == self._j.A_DESTROY: - pass + pass else: log.warning("FIXME: unmanaged action {}".format(action)) @@ -303,10 +332,7 @@ log.debug("ICE-UDP session terminated") def update_candidates( - self, - transport_data: dict, - new_ice_data: dict, - local: bool + self, transport_data: dict, new_ice_data: dict, local: bool ) -> bool: """Update ICE candidates when new one are received @@ -320,9 +346,7 @@ try: ice_data = transport_data[key] except KeyError: - log.warning( - f"no {key} available" - ) + log.warning(f"no {key} available") transport_data[key] = new_ice_data else: if ( @@ -336,28 +360,45 @@ return False async def ice_candidates_add( - self, - client: SatXMPPEntity, - session_id: str, - media_ice_data: Dict[str, dict] + self, client: SatXMPPEntity, session_id: str, media_ice_data: Dict[str, dict] ) -> None: """Called when a new ICE candidates are available for a session @param session_id: Session ID - @param candidates: a map from media type (audio, video) to ICE data + @param media_ice_data: a map from media type (audio, video) to ICE data ICE data must be in the same format as in [self.parse_transport] """ session = self._j.get_session(client, session_id) iq_elt: Optional[domish.Element] = None for media_type, new_ice_data in media_ice_data.items(): + if session["state"] == self._j.STATE_PENDING: + log.debug(f"session not active, buffering") + buffer = session.setdefault("XEP-0176_buffer", {}) + media_buffer = buffer.setdefault(media_type, {}) + + for key in ["ufrag", "pwd"]: + if key not in media_buffer: + media_buffer[key] = new_ice_data[key] + else: + if media_buffer[key] != new_ice_data[key]: + log.warning( + f"{key} conflict, new value will replace old one\n" + f"buffer={media_buffer[key]!r}\n" + f"new={new_ice_data[key]!r}" + ) + media_buffer[key] = new_ice_data[key] + + media_buffer.setdefault("candidates", []).extend( + new_ice_data["candidates"] + ) + continue + for content_name, content_data in session["contents"].items(): if content_data["application_data"].get("media") == media_type: break else: - log.warning( - "no media of type {media_type} has been found" - ) + log.warning(f"no media of type {media_type} has been found") continue restart = self.update_candidates( content_data["transport_data"], new_ice_data, True @@ -370,8 +411,12 @@ self.host.bridge.ice_restart(session["id"], "local", client.profile) transport_elt = self.build_transport(new_ice_data) iq_elt, __ = self._j.build_action( - client, self._j.A_TRANSPORT_INFO, session, content_name, iq_elt=iq_elt, - transport_elt=transport_elt + client, + self._j.A_TRANSPORT_INFO, + session, + content_name, + iq_elt=iq_elt, + context_elt=transport_elt, ) if iq_elt is not None: @@ -380,13 +425,9 @@ except Exception as e: log.warning(f"Could not send new ICE candidates: {e}") - else: - log.error("Could not find any content to apply new ICE candidates") - @implementer(iwokkel.IDisco) class XEP_0176_handler(XMPPHandler): - def getDiscoInfo(self, requestor, target, nodeIdentifier=""): return [disco.DiscoFeature(NS_JINGLE_ICE_UDP)]