comparison libervia/backend/plugins/plugin_xep_0167/__init__.py @ 4240:79c8a70e1813

backend, frontend: prepare remote control: This is a series of changes necessary to prepare the implementation of remote control feature: - XEP-0166: add a `priority` attribute to `ApplicationData`: this is needed when several applications are working in a same session, to know which one must be handled first. Will be used to make Remote Control have precedence over Call content. - XEP-0166: `_call_plugins` is now async and is not used with `DeferredList` anymore: the benefit to have methods called in parallels is very low, and it cause a lot of trouble as we can't predict order. Methods are now called sequentially so workflow can be predicted. - XEP-0167: fix `senders` XMPP attribute <=> SDP mapping - XEP-0234: preflight acceptance key is now `pre-accepted` instead of `file-accepted`, so the same key can be used with other jingle applications. - XEP-0167, XEP-0343: move some method to XEP-0167 - XEP-0353: use new `priority` feature to call preflight methods of applications according to it. - frontend (webrtc): refactor the sources/sink handling with a more flexible mechanism based on Pydantic models. It is now possible to have has many Data Channel as necessary, to have them in addition to A/V streams, to specify manually GStreamer sources and sinks, etc. - frontend (webrtc): rework of the pipeline to reduce latency. - frontend: new `portal_desktop` method. Screenshare portal handling has been moved there, and RemoteDesktop portal has been added. - frontend (webrtc): fix `extract_ufrag_pwd` method. rel 436
author Goffi <goffi@goffi.org>
date Sat, 11 May 2024 13:52:41 +0200
parents e11b13418ba6
children a7d4007a8fa5
comparison
equal deleted inserted replaced
4239:a38559e6d6e2 4240:79c8a70e1813
69 "unhold", 69 "unhold",
70 "mute", 70 "mute",
71 "unmute", 71 "unmute",
72 "ringing", 72 "ringing",
73 ) 73 )
74 ANSWER_SDP_SENT_KEY = "answer_sdp_sent"
74 75
75 76
76 class XEP_0167(BaseApplicationHandler): 77 class XEP_0167(BaseApplicationHandler):
78 namespace = NS_JINGLE_RTP
79
77 def __init__(self, host): 80 def __init__(self, host):
78 log.info(f'Plugin "{PLUGIN_INFO[C.PI_NAME]}" initialization') 81 log.info(f'Plugin "{PLUGIN_INFO[C.PI_NAME]}" initialization')
79 self.host = host 82 self.host = host
80 # FIXME: to be removed once host is accessible from global var 83 # FIXME: to be removed once host is accessible from global var
81 mapping.host = host 84 mapping.host = host
145 def parse_call_data(self, call_data: dict) -> dict: 148 def parse_call_data(self, call_data: dict) -> dict:
146 """Parse ``call_data`` and return corresponding contents end metadata""" 149 """Parse ``call_data`` and return corresponding contents end metadata"""
147 metadata = call_data.get("metadata") or {} 150 metadata = call_data.get("metadata") or {}
148 151
149 if "sdp" in call_data: 152 if "sdp" in call_data:
150 sdp_data = mapping.parse_sdp(call_data["sdp"]) 153 sdp_data = mapping.parse_sdp(call_data["sdp"], self._j.ROLE_INITIATOR)
151 to_delete = set() 154 to_delete = set()
152 for media, data in sdp_data.items(): 155 for media, data in sdp_data.items():
153 if media not in ("audio", "video", "application"): 156 if media not in ("audio", "video", "application"):
154 continue 157 continue
155 to_delete.add(media) 158 to_delete.add(media)
184 del sdp_data[media] 187 del sdp_data[media]
185 metadata.update(sdp_data.get("metadata", {})) 188 metadata.update(sdp_data.get("metadata", {}))
186 189
187 return metadata 190 return metadata
188 191
189 async def call_start( 192 def get_contents(self, call_data: dict, metadata: dict) -> list[dict]:
190 self, 193 """Generate call related contents.
191 client: SatXMPPEntity, 194
192 peer_jid: jid.JID, 195 @param call_data: Call data after being parsed by [parse_call_data]
193 call_data: dict, 196 @param metadata: Metadata as returned by [parse_call_data]
194 ) -> str: 197 @return: List of contents to be used with [jingle.initiate].
195 """Initiate a call session with the given peer. 198
196
197 @param peer_jid: JID of the peer to initiate a call session with.
198 @param call_data: Dictionary containing data for the call. Must include SDP information.
199 The dict can have the following keys:
200 - sdp (str): SDP data for the call.
201 - metadata (dict): Additional metadata for the call (optional).
202 Each media type ("audio" and "video") in the SDP should have:
203 - application_data (dict): Data about the media.
204 - fingerprint (str): Security fingerprint data (optional).
205 - id (str): Identifier for the media (optional).
206 - ice-candidates: ICE candidates for media transport.
207 - And other transport specific data.
208
209 @return: Session ID (SID) for the initiated call session.
210
211 @raises exceptions.DataError: If media data is invalid or duplicate content name
212 (mid) is found.
213 """ 199 """
214 sid = str(uuid.uuid4())
215 metadata = self.parse_call_data(call_data)
216 contents = [] 200 contents = []
217 seen_names = set() 201 seen_names = set()
218 202
219 for media, media_data in call_data.items(): 203 for media, media_data in call_data.items():
220 if media not in ("audio", "video"): 204 if media not in ("audio", "video"):
221 continue 205 continue
222 content = { 206 content = {
223 "app_ns": NS_JINGLE_RTP, 207 "app_ns": NS_JINGLE_RTP,
224 "senders": "both", 208 "senders": media_data["senders"],
225 "transport_type": self._j.TRANSPORT_DATAGRAM, 209 "transport_type": self._j.TRANSPORT_DATAGRAM,
226 "app_kwargs": {"media": media, "media_data": media_data}, 210 "app_kwargs": {"media": media, "media_data": media_data},
227 "transport_data": { 211 "transport_data": {
228 "local_ice_data": { 212 "local_ice_data": {
229 "ufrag": metadata["ice-ufrag"], 213 "ufrag": metadata["ice-ufrag"],
239 raise exceptions.DataError( 223 raise exceptions.DataError(
240 f"Content name (mid) seen multiple times: {name}" 224 f"Content name (mid) seen multiple times: {name}"
241 ) 225 )
242 content["name"] = name 226 content["name"] = name
243 contents.append(content) 227 contents.append(content)
228 return contents
229
230 async def call_start(
231 self,
232 client: SatXMPPEntity,
233 peer_jid: jid.JID,
234 call_data: dict,
235 ) -> str:
236 """Initiate a call session with the given peer.
237
238 @param peer_jid: JID of the peer to initiate a call session with.
239 @param call_data: Dictionary containing data for the call. Must include SDP information.
240 The dict can have the following keys:
241 - sdp (str): SDP data for the call.
242 - metadata (dict): Additional metadata for the call (optional).
243 Each media type ("audio" and "video") in the SDP should have:
244 - application_data (dict): Data about the media.
245 - fingerprint (str): Security fingerprint data (optional).
246 - id (str): Identifier for the media (optional).
247 - ice-candidates: ICE candidates for media transport.
248 - And other transport specific data.
249
250 @return: Session ID (SID) for the initiated call session.
251
252 @raises exceptions.DataError: If media data is invalid or duplicate content name
253 (mid) is found.
254 """
255 metadata = self.parse_call_data(call_data)
256 contents = self.get_contents(call_data, metadata)
244 if not contents: 257 if not contents:
245 raise exceptions.DataError("no valid media data found: {call_data}") 258 raise exceptions.DataError("no valid media data found: {call_data}")
246 259
247 call_type = ( 260 call_type = (
248 C.META_SUBTYPE_CALL_VIDEO if "video" in call_data 261 C.META_SUBTYPE_CALL_VIDEO if "video" in call_data
249 else C.META_SUBTYPE_CALL_AUDIO 262 else C.META_SUBTYPE_CALL_AUDIO
250 ) 263 )
251 264
252 defer.ensureDeferred( 265 sid = await self._j.initiate(
253 self._j.initiate(
254 client, 266 client,
255 peer_jid, 267 peer_jid,
256 contents, 268 contents,
257 sid=sid,
258 call_type=call_type, 269 call_type=call_type,
259 metadata=metadata, 270 metadata=metadata,
260 peer_metadata={}, 271 peer_metadata={},
261 ) 272 )
262 )
263 return sid 273 return sid
264 274
265 def _call_answer_sdp(self, session_id: str, answer_sdp: str, profile: str) -> None: 275 def _call_answer_sdp(self, session_id: str, answer_sdp: str, profile: str) -> None:
266 client = self.host.get_client(profile) 276 client = self.host.get_client(profile)
267 session = self._j.get_session(client, session_id) 277 session = self._j.get_session(client, session_id)
336 resp_data = await dialog_d 346 resp_data = await dialog_d
337 347
338 accepted = not resp_data.get("cancelled", False) 348 accepted = not resp_data.get("cancelled", False)
339 349
340 if accepted: 350 if accepted:
341 session["call_accepted"] = True 351 session["pre_accepted"] = True
342 352
343 return accepted 353 return accepted
344 354
345 async def jingle_preflight( 355 async def jingle_preflight(
346 self, client: SatXMPPEntity, session: dict, description_elt: domish.Element 356 self, client: SatXMPPEntity, session: dict, description_elt: domish.Element
355 @param description_elt: The description element. It's parent attribute is used to 365 @param description_elt: The description element. It's parent attribute is used to
356 determine check siblings to see if it's an audio only or audio/video call. 366 determine check siblings to see if it's an audio only or audio/video call.
357 367
358 @raises exceptions.CancelError: If the user doesn't accept the incoming call. 368 @raises exceptions.CancelError: If the user doesn't accept the incoming call.
359 """ 369 """
360 if session.get("call_accepted", False): 370 if session.get("pre_accepted", False):
361 # the call is already accepted, nothing to do 371 # the call is already accepted, nothing to do
362 return 372 return
363 373
364 parent_elt = description_elt.parent 374 parent_elt = description_elt.parent
365 assert parent_elt is not None 375 assert parent_elt is not None
472 # we request confirmation only for the first content, all others are 482 # we request confirmation only for the first content, all others are
473 # automatically accepted. In practice, that means that the call confirmation 483 # automatically accepted. In practice, that means that the call confirmation
474 # is requested only once for audio and video contents. 484 # is requested only once for audio and video contents.
475 return True 485 return True
476 486
477 if not session.get("call_accepted", False): 487 if not session.get("pre_accepted", False):
478 if any( 488 if any(
479 c["desc_elt"].getAttribute("media") == "video" 489 c["desc_elt"].getAttribute("media") == "video"
480 for c in session["contents"].values() 490 for c in session["contents"].values()
481 ): 491 ):
482 call_type = session["call_type"] = C.META_SUBTYPE_CALL_VIDEO 492 call_type = session["call_type"] = C.META_SUBTYPE_CALL_VIDEO
503 client.profile, 513 client.profile,
504 ) 514 )
505 515
506 answer_sdp = await answer_sdp_d 516 answer_sdp = await answer_sdp_d
507 517
508 parsed_answer = mapping.parse_sdp(answer_sdp) 518 parsed_answer = mapping.parse_sdp(answer_sdp, session["role"])
509 session["metadata"].update(parsed_answer["metadata"]) 519 session["metadata"].update(parsed_answer["metadata"])
510 for media in ("audio", "video"): 520 self.propagate_data(session, parsed_answer)
521
522 return True
523
524 def propagate_data(self, session: dict, parsed_answer: dict) -> None:
525 """Propagate local SDP data to other contents"""
526 for media in ("audio", "video", "application"):
511 for content in session["contents"].values(): 527 for content in session["contents"].values():
512 if content["desc_elt"].getAttribute("media") == media: 528 try:
513 media_data = parsed_answer[media]
514 application_data = content["application_data"] 529 application_data = content["application_data"]
515 application_data["local_data"] = media_data["application_data"] 530 content_media = application_data["media"]
516 transport_data = content["transport_data"] 531 except KeyError:
517 local_ice_data = media_data["transport_data"] 532 pass
518 transport_data["local_ice_data"] = local_ice_data 533 else:
519 534 if content_media == media:
520 return True 535 media_data = parsed_answer[media]
536 application_data["local_data"] = media_data["application_data"]
537 transport_data = content["transport_data"]
538 local_ice_data = media_data["transport_data"]
539 transport_data["local_ice_data"] = local_ice_data
540
541 def send_answer_sdp(self, client: SatXMPPEntity, session: dict) -> None:
542 """Send answer SDP to frontend"""
543 if not session.get(ANSWER_SDP_SENT_KEY, False):
544 # we only send the signal once, as it means that the whole session is
545 # accepted
546 answer_sdp = mapping.generate_sdp_from_session(session)
547 self.host.bridge.call_setup(
548 session["id"],
549 data_format.serialise(
550 {
551 "role": session["role"],
552 "sdp": answer_sdp,
553 }
554 ),
555 client.profile,
556 )
557 session[ANSWER_SDP_SENT_KEY] = True
521 558
522 async def jingle_handler(self, client, action, session, content_name, desc_elt): 559 async def jingle_handler(self, client, action, session, content_name, desc_elt):
523 content_data = session["contents"][content_name] 560 content_data = session["contents"][content_name]
524 application_data = content_data["application_data"] 561 application_data = content_data["application_data"]
525 if action == self._j.A_PREPARE_CONFIRMATION: 562 if action == self._j.A_PREPARE_CONFIRMATION:
540 elif action == self._j.A_ACCEPTED_ACK: 577 elif action == self._j.A_ACCEPTED_ACK:
541 pass 578 pass
542 elif action == self._j.A_PREPARE_INITIATOR: 579 elif action == self._j.A_PREPARE_INITIATOR:
543 application_data["peer_data"] = mapping.parse_description(desc_elt) 580 application_data["peer_data"] = mapping.parse_description(desc_elt)
544 elif action == self._j.A_SESSION_ACCEPT: 581 elif action == self._j.A_SESSION_ACCEPT:
545 if content_name == next(iter(session["contents"])): 582 pass # self.send_answer_sdp(client, session)
546 # we only send the signal for first content, as it means that the whole
547 # session is accepted
548 answer_sdp = mapping.generate_sdp_from_session(session)
549 self.host.bridge.call_setup(
550 session["id"],
551 data_format.serialise(
552 {
553 "role": session["role"],
554 "sdp": answer_sdp,
555 }
556 ),
557 client.profile,
558 )
559 else: 583 else:
560 log.warning(f"FIXME: unmanaged action {action}") 584 log.warning(f"FIXME: unmanaged action {action}")
561 585
562 self.host.trigger.point( 586 self.host.trigger.point(
563 "XEP-0167_jingle_handler", 587 "XEP-0167_jingle_handler",