Mercurial > libervia-backend
diff libervia/backend/plugins/plugin_xep_0167/__init__.py @ 4291:39ac821ebbdb
plugin XEP-0167: handle conferences:
- SDP can now be answered by component instead of frontend. This is useful for A/V
conferences component to handle A/V call jingle sessions.
- new `call_update` and method, and `content-add` action preparation. This is not yet used
by A/V conference, but it's a preparation for a potential future use.
- Add NS_AV_CONFERENCES to features as required by the newly proposed A/V Conferences
protoXEP.
rel 447
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 29 Jul 2024 03:31:09 +0200 |
parents | 96fdf4891747 |
children | a0ed5c976bf8 |
line wrap: on
line diff
--- a/libervia/backend/plugins/plugin_xep_0167/__init__.py Mon Jul 29 03:31:06 2024 +0200 +++ b/libervia/backend/plugins/plugin_xep_0167/__init__.py Mon Jul 29 03:31:09 2024 +0200 @@ -95,12 +95,20 @@ async_=True, ) host.bridge.add_method( + "call_update", + ".plugin", + in_sign="sss", + out_sign="", + method=self._call_update, + async_=True, + ) + host.bridge.add_method( "call_answer_sdp", ".plugin", in_sign="sss", out_sign="", method=self._call_answer_sdp, - async_=True, + async_=False, ) host.bridge.add_method( "call_info", @@ -122,6 +130,9 @@ # profile host.bridge.add_signal("call_setup", ".plugin", signature="sss") + # args: session_id, serialised update data, profile + host.bridge.add_signal("call_update", ".plugin", signature="sss") + # args: session_id, data, profile host.bridge.add_signal("call_ended", ".plugin", signature="sss") @@ -146,6 +157,17 @@ ) ) + def _call_update( + self, + session_id: str, + call_data_s: str, + profile_key: str, + ): + client = self.host.get_client(profile_key) + return defer.ensureDeferred( + self.call_update(client, session_id, data_format.deserialise(call_data_s)) + ) + def parse_call_data(self, call_data: dict) -> dict: """Parse ``call_data`` and return corresponding contents end metadata""" metadata = call_data.get("metadata") or {} @@ -233,6 +255,7 @@ client: SatXMPPEntity, peer_jid: jid.JID, call_data: dict, + session_id: str | None = None, ) -> str: """Initiate a call session with the given peer. @@ -247,6 +270,8 @@ - id (str): Identifier for the media (optional). - ice-candidates: ICE candidates for media transport. - And other transport specific data. + @param session_id: ID of the Jingle session. If None, an ID will be automatically + generated. @return: Session ID (SID) for the initiated call session. @@ -271,9 +296,75 @@ call_type=call_type, metadata=metadata, peer_metadata={}, + sid=session_id, ) return sid + async def call_update( + self, + client: SatXMPPEntity, + session_id: str, + call_data: dict, + ) -> None: + """Update a running call session. + + @param session_id: ID of the Jingle session to update. + @param call_data: Dictionary containing updated data for the call. Must include SDP information. + The dict can have the following keys: + - sdp (str): SDP data for the call. + - metadata (dict): Additional metadata for the call (optional). + Each media type ("audio" and "video") in the SDP should have: + - application_data (dict): Data about the media. + - fingerprint (str): Security fingerprint data (optional). + - id (str): Identifier for the media (optional). + - ice-candidates: ICE candidates for media transport. + - And other transport specific data. + + + @raises exceptions.DataError: If media data is invalid or duplicate content name + (mid) is found. + """ + session = self._j.get_session(client, session_id) + try: + new_offer_sdp = call_data["sdp"] + except KeyError: + raise exceptions.DataError(f"New SDP offer is missing: {call_data}") + metadata = self.parse_call_data(call_data) + contents = self.get_contents(call_data, metadata) + if not contents: + raise exceptions.DataError("no valid media data found: {call_data}") + + call_type = ( + C.META_SUBTYPE_CALL_VIDEO + if "video" in call_data + else C.META_SUBTYPE_CALL_AUDIO + ) + for content_args in contents: + content = content_args["app_kwargs"] + content["app_ns"] = NS_JINGLE_RTP + content["name"] = (content_args["name"],) + content["transport_type"] = self._j.TRANSPORT_DATAGRAM + media = content["media"] + media_data = content["media_data"].copy() + media_data["transport_data"] = content_args["transport_data"][ + "local_ice_data" + ] + desc_elt = mapping.build_description(media, media_data, {}) + iq_elt, __ = self._j.build_action( + client, + self._j.A_CONTENT_ADD, + session, + content_args["name"], + context_elt=desc_elt, + ) + content_data = self._j.get_content_data(content) + transport = self._j.get_transport(client, content, content_data) + transport_elt = transport.handler.build_transport( + media_data["transport_data"] + ) + iq_elt.jingle.content.addChild(transport_elt) + await iq_elt.send() + def _call_answer_sdp(self, session_id: str, answer_sdp: str, profile: str) -> None: client = self.host.get_client(profile) session = self._j.get_session(client, session_id) @@ -551,26 +642,43 @@ local_ice_data = media_data["transport_data"] transport_data["local_ice_data"] = local_ice_data - def send_answer_sdp(self, client: SatXMPPEntity, session: dict) -> None: + async def send_answer_sdp(self, client: SatXMPPEntity, session: dict) -> None: """Send answer SDP to frontend""" if not session.get(ANSWER_SDP_SENT_KEY, False): # we only send the signal once, as it means that the whole session is # accepted answer_sdp = mapping.generate_sdp_from_session(session) - self.host.bridge.call_setup( - session["id"], - data_format.serialise( + + call_setup = session.get("call_setup_cb") + + if call_setup is None: + self.host.bridge.call_setup( + session["id"], + data_format.serialise( + { + "role": session["role"], + "sdp": answer_sdp, + } + ), + client.profile, + ) + else: + await call_setup( + client, + session, { "role": session["role"], "sdp": answer_sdp, - } - ), - client.profile, - ) + }, + ) + session[ANSWER_SDP_SENT_KEY] = True async def jingle_handler(self, client, action, session, content_name, desc_elt): - content_data = session["contents"][content_name] + if action == self._j.A_CONTENT_ADD: + content_data = session["contents_new"][content_name] + else: + content_data = session["contents"][content_name] application_data = content_data["application_data"] if action == self._j.A_PREPARE_CONFIRMATION: session["metadata"] = {} @@ -592,7 +700,16 @@ elif action == self._j.A_PREPARE_INITIATOR: application_data["peer_data"] = mapping.parse_description(desc_elt) elif action == self._j.A_SESSION_ACCEPT: - self.send_answer_sdp(client, session) + await self.send_answer_sdp(client, session) + elif action == self._j.A_CONTENT_ADD: + current_contents = session["contents"] + if content_name in current_contents: + raise exceptions.ConflictError( + f"There is already a {content_name!r} content." + ) + current_contents[content_name] = content_data + application_data["media"] = desc_elt["media"] + application_data["peer_data"] = mapping.parse_description(desc_elt) else: log.warning(f"FIXME: unmanaged action {action}") @@ -678,6 +795,7 @@ disco.DiscoFeature(NS_JINGLE_RTP), disco.DiscoFeature(NS_JINGLE_RTP_AUDIO), disco.DiscoFeature(NS_JINGLE_RTP_VIDEO), + disco.DiscoFeature(NS_AV_CONFERENCES), ] def getDiscoItems(self, requestor, target, nodeIdentifier=""):