Mercurial > libervia-backend
changeset 4116:23fa52acf72c
plugin XEP-0167, XEP-0176: transport-info and ICE candidate sending are delayed if session is not active yet
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 21 Aug 2023 15:19:45 +0200 (16 months ago) |
parents | 0da563780ffc |
children | d861ad696797 |
files | libervia/backend/plugins/plugin_xep_0166/__init__.py libervia/backend/plugins/plugin_xep_0167/__init__.py libervia/backend/plugins/plugin_xep_0167/mapping.py libervia/backend/plugins/plugin_xep_0176.py |
diffstat | 4 files changed, 125 insertions(+), 108 deletions(-) [+] |
line wrap: on
line diff
--- a/libervia/backend/plugins/plugin_xep_0166/__init__.py Wed Aug 16 18:33:28 2023 +0200 +++ b/libervia/backend/plugins/plugin_xep_0166/__init__.py Mon Aug 21 15:19:45 2023 +0200 @@ -55,9 +55,6 @@ NS_JINGLE : Final = "urn:xmpp:jingle:1" NS_JINGLE_ERROR : Final = "urn:xmpp:jingle:errors:1" JINGLE_REQUEST : Final = f'{IQ_SET}/jingle[@xmlns="{NS_JINGLE}"]' -STATE_PENDING : Final = "PENDING" -STATE_ACTIVE : Final = "ACTIVE" -STATE_ENDED : Final = "ENDED" CONFIRM_TXT : Final = D_( "{entity} want to start a jingle session with you, do you accept ?" ) @@ -89,6 +86,10 @@ REASON_FAILED_TRANSPORT : Final = "failed-transport" REASON_CONNECTIVITY_ERROR : Final = "connectivity-error" + STATE_PENDING : Final = "PENDING" + STATE_ACTIVE : Final = "ACTIVE" + STATE_ENDED : Final = "ENDED" + # standard actions A_SESSION_INITIATE : Final = "session-initiate" @@ -194,7 +195,7 @@ session_data = { "id": sid, - "state": STATE_PENDING, + "state": XEP_0166.STATE_PENDING, "initiator": client.jid if role == XEP_0166.ROLE_INITIATOR else peer_jid, "role": role, "local_jid": local_jid or client.jid, @@ -502,7 +503,8 @@ content_elt["creator"] = content_data["creator"] if context_elt is not None: - pass + if context_elt.parent is None: + content_elt.addChild(context_elt) elif action == XEP_0166.A_TRANSPORT_INFO: context_elt = transport_elt = content_elt.addElement( "transport", content_data["transport"].namespace @@ -1191,7 +1193,7 @@ d_list.addCallback(lambda __: iq_elt.send()) def change_state(__, session): - session["state"] = STATE_ACTIVE + session["state"] = XEP_0166.STATE_ACTIVE d_list.addCallback(change_state, session) d_list.addCallback( @@ -1279,7 +1281,7 @@ # at this point we can send the <iq/> result to confirm reception of the request client.send(xmlstream.toResponse(request, "result")) # and change the state - session["state"] = STATE_ACTIVE + session["state"] = XEP_0166.STATE_ACTIVE session["jingle_elt"] = jingle_elt await defer.DeferredList(self._call_plugins(
--- a/libervia/backend/plugins/plugin_xep_0167/__init__.py Wed Aug 16 18:33:28 2023 +0200 +++ b/libervia/backend/plugins/plugin_xep_0167/__init__.py Mon Aug 21 15:19:45 2023 +0200 @@ -115,19 +115,13 @@ # args: session_id, serialised setup data (dict with keys "role" and "sdp"), # profile - host.bridge.add_signal( - "call_setup", ".plugin", signature="sss" - ) + host.bridge.add_signal("call_setup", ".plugin", signature="sss") # args: session_id, data, profile - host.bridge.add_signal( - "call_ended", ".plugin", signature="sss" - ) + host.bridge.add_signal("call_ended", ".plugin", signature="sss") # args: session_id, info_type, extra, profile - host.bridge.add_signal( - "call_info", ".plugin", signature="ssss" - ) + host.bridge.add_signal("call_info", ".plugin", signature="ssss") def get_handler(self, client): return XEP_0167_handler() @@ -194,7 +188,9 @@ except KeyError: log.warning(f"no media ID found for {media_type}: {media_data}") try: - call_data[media_type]["ice-candidates"] = transport_data["candidates"] + call_data[media_type]["ice-candidates"] = transport_data.get( + "candidates", [] + ) metadata["ice-ufrag"] = transport_data["ufrag"] metadata["ice-pwd"] = transport_data["pwd"] except KeyError: @@ -250,12 +246,7 @@ ) return sid - def _call_answer_sdp( - self, - session_id: str, - answer_sdp: str, - profile: str - ) -> None: + def _call_answer_sdp(self, session_id: str, answer_sdp: str, profile: str) -> None: client = self.host.get_client(profile) session = self._j.get_session(client, session_id) try: @@ -274,9 +265,7 @@ ): client = self.host.get_client(profile_key) return defer.ensureDeferred( - self.call_end( - client, session_id, data_format.deserialise(data_s) - ) + self.call_end(client, session_id, data_format.deserialise(data_s)) ) async def call_end( @@ -297,10 +286,7 @@ # jingle callbacks async def confirm_incoming_call( - self, - client: SatXMPPEntity, - session: dict, - call_type: str + self, client: SatXMPPEntity, session: dict, call_type: str ) -> bool: """Prompt the user for a call confirmation. @@ -341,10 +327,7 @@ return accepted async def jingle_preflight( - self, - client: SatXMPPEntity, - session: dict, - description_elt: domish.Element + self, client: SatXMPPEntity, session: dict, description_elt: domish.Element ) -> None: """Perform preflight checks for an incoming call session. @@ -380,7 +363,7 @@ self.host.bridge.call_ended( session["id"], data_format.serialise({"reason": "retracted"}), - client.profile + client.profile, ) raise e @@ -392,13 +375,11 @@ client: SatXMPPEntity, session: dict, info_type: str, - info_data: dict|None = None + info_data: dict | None = None, ) -> None: if info_type == "ringing": if not session.get("ringing", False): - self.host.bridge.call_info( - session["id"], "ringing", "", client.profile - ) + self.host.bridge.call_info(session["id"], "ringing", "", client.profile) # we indicate that the ringing has started, to avoid sending several times # the signal session["ringing"] = True @@ -406,10 +387,7 @@ log.warning(f"Unknown preflight info type: {info_type!r}") async def jingle_preflight_cancel( - self, - client: SatXMPPEntity, - session: dict, - cancel_error: exceptions.CancelError + self, client: SatXMPPEntity, session: dict, cancel_error: exceptions.CancelError ) -> None: """The call has been rejected""" # call_ended is use to send the signal only once even if there are audio and video @@ -417,20 +395,15 @@ call_ended = session.get("call_ended", False) if call_ended: return - data = { - "reason": getattr(cancel_error, "reason", "cancelled") - } + data = {"reason": getattr(cancel_error, "reason", "cancelled")} text = getattr(cancel_error, "text", None) if text: data["text"] = text self.host.bridge.call_ended( - session["id"], - data_format.serialise(data), - client.profile + session["id"], data_format.serialise(data), client.profile ) session["call_ended"] = True - def jingle_session_init( self, client: SatXMPPEntity, @@ -507,11 +480,13 @@ self.host.bridge.call_setup( session["id"], - data_format.serialise({ - "role": session["role"], - "sdp": sdp, - }), - client.profile + data_format.serialise( + { + "role": session["role"], + "sdp": sdp, + } + ), + client.profile, ) answer_sdp = await answer_sdp_d @@ -559,11 +534,13 @@ answer_sdp = mapping.generate_sdp_from_session(session) self.host.bridge.call_setup( session["id"], - data_format.serialise({ - "role": session["role"], - "sdp": answer_sdp, - }), - client.profile + data_format.serialise( + { + "role": session["role"], + "sdp": answer_sdp, + } + ), + client.profile, ) else: log.warning(f"FIXME: unmanaged action {action}") @@ -635,9 +612,7 @@ reason_elt: domish.Element, ) -> None: reason, text = self._j.parse_reason_elt(reason_elt) - data = { - "reason": reason - } + data = {"reason": reason} if text: data["text"] = text self.host.bridge.call_ended(
--- a/libervia/backend/plugins/plugin_xep_0167/mapping.py Wed Aug 16 18:33:28 2023 +0200 +++ b/libervia/backend/plugins/plugin_xep_0167/mapping.py Mon Aug 21 15:19:45 2023 +0200 @@ -439,12 +439,11 @@ try: ice_candidates = first_content["transport_data"]["candidates"] except KeyError: - log.warning("missing candidates in SDP") - else: - for idx, content in enumerate(all_media.values()): - if idx == 0: - continue - content["transport_data"].setdefault("candidates", ice_candidates) + ice_candidates = [] + for idx, content in enumerate(all_media.values()): + if idx == 0: + continue + content["transport_data"].setdefault("candidates", ice_candidates) return call_data
--- a/libervia/backend/plugins/plugin_xep_0176.py Wed Aug 16 18:33:28 2023 +0200 +++ b/libervia/backend/plugins/plugin_xep_0176.py Mon Aug 21 15:19:45 2023 +0200 @@ -36,7 +36,7 @@ log = getLogger(__name__) -NS_JINGLE_ICE_UDP= "urn:xmpp:jingle:transports:ice-udp:1" +NS_JINGLE_ICE_UDP = "urn:xmpp:jingle:transports:ice-udp:1" PLUGIN_INFO = { C.PI_NAME: "Jingle ICE-UDP Transport Method", @@ -53,7 +53,6 @@ class XEP_0176(BaseTransportHandler): - def __init__(self, host): log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization") self.host = host @@ -86,11 +85,13 @@ profile_key: str, ): client = self.host.get_client(profile_key) - return defer.ensureDeferred(self.ice_candidates_add( - client, - session_id, - data_format.deserialise(media_ice_data_s), - )) + return defer.ensureDeferred( + self.ice_candidates_add( + client, + session_id, + data_format.deserialise(media_ice_data_s), + ) + ) def build_transport(self, ice_data: dict) -> domish.Element: """Generate <transport> element from ICE data @@ -119,14 +120,13 @@ try: ufrag: str = ice_data["ufrag"] pwd: str = ice_data["pwd"] - candidates: List[dict] = ice_data["candidates"] except KeyError as e: raise exceptions.DataError(f"ICE {e} must be provided") + candidates: List[dict] = ice_data.get("candidates", []) candidates.sort(key=lambda c: int(c.get("priority", 0)), reverse=True) transport_elt = domish.Element( - (NS_JINGLE_ICE_UDP, "transport"), - attribs={"ufrag": ufrag, "pwd": pwd} + (NS_JINGLE_ICE_UDP, "transport"), attribs={"ufrag": ufrag, "pwd": pwd} ) for candidate in candidates: @@ -162,10 +162,7 @@ @return: ICE data (as in [build_transport]) """ try: - ice_data = { - "ufrag": transport_elt["ufrag"], - "pwd": transport_elt["pwd"] - } + ice_data = {"ufrag": transport_elt["ufrag"], "pwd": transport_elt["pwd"]} except KeyError as e: raise exceptions.DataError( f"<transport> is missing mandatory attribute {e}: {transport_elt.toXml()}" @@ -256,11 +253,36 @@ peer_ice_data = self.parse_transport(transport_elt) transport_data["peer_ice_data"] = peer_ice_data - elif action in (self._j.A_ACCEPTED_ACK, self._j.A_PREPARE_RESPONDER): + elif action == self._j.A_ACCEPTED_ACK: + buffer = session.pop("XEP-0176_handler_buffer", None) + if buffer: + log.debug("replaying buffered events") + for args in buffer: + await self.jingle_handler(*args) + elif action == self._j.A_PREPARE_RESPONDER: pass elif action == self._j.A_SESSION_ACCEPT: - pass + # we check if we have any buffered ICE candidates, and send them if it's the + # case + media_type = content_data["application_data"].get("media") + try: + buffer = session["XEP-0176_buffer"] + buffered_ice_data = buffer.pop(media_type) + except KeyError: + pass + else: + if not buffer: + del session["XEP-0176_buffer"] + transport_elt = self.build_transport(buffered_ice_data) + iq_elt, __ = self._j.build_action( + client, + self._j.A_TRANSPORT_INFO, + session, + content_name, + context_elt=transport_elt, + ) + await iq_elt.send() elif action == self._j.A_START: pass @@ -269,6 +291,13 @@ # responder side, we give our candidates transport_elt = self.build_transport(transport_data["local_ice_data"]) elif action == self._j.A_TRANSPORT_INFO: + if session["state"] == self._j.STATE_PENDING: + # Session is not yet active; we buffer the arguments to replay them + # when the session becomes active. This makes the frontend's life easier. + log.debug("session is not active yet, buffering transport-info element") + buffer = session.setdefault("XEP-0176_handler_buffer", []) + buffer.append([client, action, session, content_name, transport_elt]) + return transport_elt media_type = content_data["application_data"].get("media") new_ice_data = self.parse_transport(transport_elt) @@ -283,10 +312,10 @@ self.host.bridge.ice_candidates_new( session["id"], data_format.serialise({media_type: new_ice_data}), - client.profile + client.profile, ) elif action == self._j.A_DESTROY: - pass + pass else: log.warning("FIXME: unmanaged action {}".format(action)) @@ -303,10 +332,7 @@ log.debug("ICE-UDP session terminated") def update_candidates( - self, - transport_data: dict, - new_ice_data: dict, - local: bool + self, transport_data: dict, new_ice_data: dict, local: bool ) -> bool: """Update ICE candidates when new one are received @@ -320,9 +346,7 @@ try: ice_data = transport_data[key] except KeyError: - log.warning( - f"no {key} available" - ) + log.warning(f"no {key} available") transport_data[key] = new_ice_data else: if ( @@ -336,28 +360,45 @@ return False async def ice_candidates_add( - self, - client: SatXMPPEntity, - session_id: str, - media_ice_data: Dict[str, dict] + self, client: SatXMPPEntity, session_id: str, media_ice_data: Dict[str, dict] ) -> None: """Called when a new ICE candidates are available for a session @param session_id: Session ID - @param candidates: a map from media type (audio, video) to ICE data + @param media_ice_data: a map from media type (audio, video) to ICE data ICE data must be in the same format as in [self.parse_transport] """ session = self._j.get_session(client, session_id) iq_elt: Optional[domish.Element] = None for media_type, new_ice_data in media_ice_data.items(): + if session["state"] == self._j.STATE_PENDING: + log.debug(f"session not active, buffering") + buffer = session.setdefault("XEP-0176_buffer", {}) + media_buffer = buffer.setdefault(media_type, {}) + + for key in ["ufrag", "pwd"]: + if key not in media_buffer: + media_buffer[key] = new_ice_data[key] + else: + if media_buffer[key] != new_ice_data[key]: + log.warning( + f"{key} conflict, new value will replace old one\n" + f"buffer={media_buffer[key]!r}\n" + f"new={new_ice_data[key]!r}" + ) + media_buffer[key] = new_ice_data[key] + + media_buffer.setdefault("candidates", []).extend( + new_ice_data["candidates"] + ) + continue + for content_name, content_data in session["contents"].items(): if content_data["application_data"].get("media") == media_type: break else: - log.warning( - "no media of type {media_type} has been found" - ) + log.warning(f"no media of type {media_type} has been found") continue restart = self.update_candidates( content_data["transport_data"], new_ice_data, True @@ -370,8 +411,12 @@ self.host.bridge.ice_restart(session["id"], "local", client.profile) transport_elt = self.build_transport(new_ice_data) iq_elt, __ = self._j.build_action( - client, self._j.A_TRANSPORT_INFO, session, content_name, iq_elt=iq_elt, - transport_elt=transport_elt + client, + self._j.A_TRANSPORT_INFO, + session, + content_name, + iq_elt=iq_elt, + context_elt=transport_elt, ) if iq_elt is not None: @@ -380,13 +425,9 @@ except Exception as e: log.warning(f"Could not send new ICE candidates: {e}") - else: - log.error("Could not find any content to apply new ICE candidates") - @implementer(iwokkel.IDisco) class XEP_0176_handler(XMPPHandler): - def getDiscoInfo(self, requestor, target, nodeIdentifier=""): return [disco.DiscoFeature(NS_JINGLE_ICE_UDP)]