Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_xep_0167/__init__.py @ 4240:79c8a70e1813
backend, frontend: prepare remote control:
This is a series of changes necessary to prepare the implementation of remote control
feature:
- XEP-0166: add a `priority` attribute to `ApplicationData`: this is needed when several
applications are working in a same session, to know which one must be handled first.
Will be used to make Remote Control have precedence over Call content.
- XEP-0166: `_call_plugins` is now async and is not used with `DeferredList` anymore: the
benefit to have methods called in parallels is very low, and it cause a lot of trouble
as we can't predict order. Methods are now called sequentially so workflow can be
predicted.
- XEP-0167: fix `senders` XMPP attribute <=> SDP mapping
- XEP-0234: preflight acceptance key is now `pre-accepted` instead of `file-accepted`, so
the same key can be used with other jingle applications.
- XEP-0167, XEP-0343: move some method to XEP-0167
- XEP-0353: use new `priority` feature to call preflight methods of applications according
to it.
- frontend (webrtc): refactor the sources/sink handling with a more flexible mechanism
based on Pydantic models. It is now possible to have has many Data Channel as necessary,
to have them in addition to A/V streams, to specify manually GStreamer sources and
sinks, etc.
- frontend (webrtc): rework of the pipeline to reduce latency.
- frontend: new `portal_desktop` method. Screenshare portal handling has been moved there,
and RemoteDesktop portal has been added.
- frontend (webrtc): fix `extract_ufrag_pwd` method.
rel 436
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 11 May 2024 13:52:41 +0200 |
parents | e11b13418ba6 |
children | a7d4007a8fa5 |
comparison
equal
deleted
inserted
replaced
4239:a38559e6d6e2 | 4240:79c8a70e1813 |
---|---|
69 "unhold", | 69 "unhold", |
70 "mute", | 70 "mute", |
71 "unmute", | 71 "unmute", |
72 "ringing", | 72 "ringing", |
73 ) | 73 ) |
74 ANSWER_SDP_SENT_KEY = "answer_sdp_sent" | |
74 | 75 |
75 | 76 |
76 class XEP_0167(BaseApplicationHandler): | 77 class XEP_0167(BaseApplicationHandler): |
78 namespace = NS_JINGLE_RTP | |
79 | |
77 def __init__(self, host): | 80 def __init__(self, host): |
78 log.info(f'Plugin "{PLUGIN_INFO[C.PI_NAME]}" initialization') | 81 log.info(f'Plugin "{PLUGIN_INFO[C.PI_NAME]}" initialization') |
79 self.host = host | 82 self.host = host |
80 # FIXME: to be removed once host is accessible from global var | 83 # FIXME: to be removed once host is accessible from global var |
81 mapping.host = host | 84 mapping.host = host |
145 def parse_call_data(self, call_data: dict) -> dict: | 148 def parse_call_data(self, call_data: dict) -> dict: |
146 """Parse ``call_data`` and return corresponding contents end metadata""" | 149 """Parse ``call_data`` and return corresponding contents end metadata""" |
147 metadata = call_data.get("metadata") or {} | 150 metadata = call_data.get("metadata") or {} |
148 | 151 |
149 if "sdp" in call_data: | 152 if "sdp" in call_data: |
150 sdp_data = mapping.parse_sdp(call_data["sdp"]) | 153 sdp_data = mapping.parse_sdp(call_data["sdp"], self._j.ROLE_INITIATOR) |
151 to_delete = set() | 154 to_delete = set() |
152 for media, data in sdp_data.items(): | 155 for media, data in sdp_data.items(): |
153 if media not in ("audio", "video", "application"): | 156 if media not in ("audio", "video", "application"): |
154 continue | 157 continue |
155 to_delete.add(media) | 158 to_delete.add(media) |
184 del sdp_data[media] | 187 del sdp_data[media] |
185 metadata.update(sdp_data.get("metadata", {})) | 188 metadata.update(sdp_data.get("metadata", {})) |
186 | 189 |
187 return metadata | 190 return metadata |
188 | 191 |
189 async def call_start( | 192 def get_contents(self, call_data: dict, metadata: dict) -> list[dict]: |
190 self, | 193 """Generate call related contents. |
191 client: SatXMPPEntity, | 194 |
192 peer_jid: jid.JID, | 195 @param call_data: Call data after being parsed by [parse_call_data] |
193 call_data: dict, | 196 @param metadata: Metadata as returned by [parse_call_data] |
194 ) -> str: | 197 @return: List of contents to be used with [jingle.initiate]. |
195 """Initiate a call session with the given peer. | 198 |
196 | |
197 @param peer_jid: JID of the peer to initiate a call session with. | |
198 @param call_data: Dictionary containing data for the call. Must include SDP information. | |
199 The dict can have the following keys: | |
200 - sdp (str): SDP data for the call. | |
201 - metadata (dict): Additional metadata for the call (optional). | |
202 Each media type ("audio" and "video") in the SDP should have: | |
203 - application_data (dict): Data about the media. | |
204 - fingerprint (str): Security fingerprint data (optional). | |
205 - id (str): Identifier for the media (optional). | |
206 - ice-candidates: ICE candidates for media transport. | |
207 - And other transport specific data. | |
208 | |
209 @return: Session ID (SID) for the initiated call session. | |
210 | |
211 @raises exceptions.DataError: If media data is invalid or duplicate content name | |
212 (mid) is found. | |
213 """ | 199 """ |
214 sid = str(uuid.uuid4()) | |
215 metadata = self.parse_call_data(call_data) | |
216 contents = [] | 200 contents = [] |
217 seen_names = set() | 201 seen_names = set() |
218 | 202 |
219 for media, media_data in call_data.items(): | 203 for media, media_data in call_data.items(): |
220 if media not in ("audio", "video"): | 204 if media not in ("audio", "video"): |
221 continue | 205 continue |
222 content = { | 206 content = { |
223 "app_ns": NS_JINGLE_RTP, | 207 "app_ns": NS_JINGLE_RTP, |
224 "senders": "both", | 208 "senders": media_data["senders"], |
225 "transport_type": self._j.TRANSPORT_DATAGRAM, | 209 "transport_type": self._j.TRANSPORT_DATAGRAM, |
226 "app_kwargs": {"media": media, "media_data": media_data}, | 210 "app_kwargs": {"media": media, "media_data": media_data}, |
227 "transport_data": { | 211 "transport_data": { |
228 "local_ice_data": { | 212 "local_ice_data": { |
229 "ufrag": metadata["ice-ufrag"], | 213 "ufrag": metadata["ice-ufrag"], |
239 raise exceptions.DataError( | 223 raise exceptions.DataError( |
240 f"Content name (mid) seen multiple times: {name}" | 224 f"Content name (mid) seen multiple times: {name}" |
241 ) | 225 ) |
242 content["name"] = name | 226 content["name"] = name |
243 contents.append(content) | 227 contents.append(content) |
228 return contents | |
229 | |
230 async def call_start( | |
231 self, | |
232 client: SatXMPPEntity, | |
233 peer_jid: jid.JID, | |
234 call_data: dict, | |
235 ) -> str: | |
236 """Initiate a call session with the given peer. | |
237 | |
238 @param peer_jid: JID of the peer to initiate a call session with. | |
239 @param call_data: Dictionary containing data for the call. Must include SDP information. | |
240 The dict can have the following keys: | |
241 - sdp (str): SDP data for the call. | |
242 - metadata (dict): Additional metadata for the call (optional). | |
243 Each media type ("audio" and "video") in the SDP should have: | |
244 - application_data (dict): Data about the media. | |
245 - fingerprint (str): Security fingerprint data (optional). | |
246 - id (str): Identifier for the media (optional). | |
247 - ice-candidates: ICE candidates for media transport. | |
248 - And other transport specific data. | |
249 | |
250 @return: Session ID (SID) for the initiated call session. | |
251 | |
252 @raises exceptions.DataError: If media data is invalid or duplicate content name | |
253 (mid) is found. | |
254 """ | |
255 metadata = self.parse_call_data(call_data) | |
256 contents = self.get_contents(call_data, metadata) | |
244 if not contents: | 257 if not contents: |
245 raise exceptions.DataError("no valid media data found: {call_data}") | 258 raise exceptions.DataError("no valid media data found: {call_data}") |
246 | 259 |
247 call_type = ( | 260 call_type = ( |
248 C.META_SUBTYPE_CALL_VIDEO if "video" in call_data | 261 C.META_SUBTYPE_CALL_VIDEO if "video" in call_data |
249 else C.META_SUBTYPE_CALL_AUDIO | 262 else C.META_SUBTYPE_CALL_AUDIO |
250 ) | 263 ) |
251 | 264 |
252 defer.ensureDeferred( | 265 sid = await self._j.initiate( |
253 self._j.initiate( | |
254 client, | 266 client, |
255 peer_jid, | 267 peer_jid, |
256 contents, | 268 contents, |
257 sid=sid, | |
258 call_type=call_type, | 269 call_type=call_type, |
259 metadata=metadata, | 270 metadata=metadata, |
260 peer_metadata={}, | 271 peer_metadata={}, |
261 ) | 272 ) |
262 ) | |
263 return sid | 273 return sid |
264 | 274 |
265 def _call_answer_sdp(self, session_id: str, answer_sdp: str, profile: str) -> None: | 275 def _call_answer_sdp(self, session_id: str, answer_sdp: str, profile: str) -> None: |
266 client = self.host.get_client(profile) | 276 client = self.host.get_client(profile) |
267 session = self._j.get_session(client, session_id) | 277 session = self._j.get_session(client, session_id) |
336 resp_data = await dialog_d | 346 resp_data = await dialog_d |
337 | 347 |
338 accepted = not resp_data.get("cancelled", False) | 348 accepted = not resp_data.get("cancelled", False) |
339 | 349 |
340 if accepted: | 350 if accepted: |
341 session["call_accepted"] = True | 351 session["pre_accepted"] = True |
342 | 352 |
343 return accepted | 353 return accepted |
344 | 354 |
345 async def jingle_preflight( | 355 async def jingle_preflight( |
346 self, client: SatXMPPEntity, session: dict, description_elt: domish.Element | 356 self, client: SatXMPPEntity, session: dict, description_elt: domish.Element |
355 @param description_elt: The description element. It's parent attribute is used to | 365 @param description_elt: The description element. It's parent attribute is used to |
356 determine check siblings to see if it's an audio only or audio/video call. | 366 determine check siblings to see if it's an audio only or audio/video call. |
357 | 367 |
358 @raises exceptions.CancelError: If the user doesn't accept the incoming call. | 368 @raises exceptions.CancelError: If the user doesn't accept the incoming call. |
359 """ | 369 """ |
360 if session.get("call_accepted", False): | 370 if session.get("pre_accepted", False): |
361 # the call is already accepted, nothing to do | 371 # the call is already accepted, nothing to do |
362 return | 372 return |
363 | 373 |
364 parent_elt = description_elt.parent | 374 parent_elt = description_elt.parent |
365 assert parent_elt is not None | 375 assert parent_elt is not None |
472 # we request confirmation only for the first content, all others are | 482 # we request confirmation only for the first content, all others are |
473 # automatically accepted. In practice, that means that the call confirmation | 483 # automatically accepted. In practice, that means that the call confirmation |
474 # is requested only once for audio and video contents. | 484 # is requested only once for audio and video contents. |
475 return True | 485 return True |
476 | 486 |
477 if not session.get("call_accepted", False): | 487 if not session.get("pre_accepted", False): |
478 if any( | 488 if any( |
479 c["desc_elt"].getAttribute("media") == "video" | 489 c["desc_elt"].getAttribute("media") == "video" |
480 for c in session["contents"].values() | 490 for c in session["contents"].values() |
481 ): | 491 ): |
482 call_type = session["call_type"] = C.META_SUBTYPE_CALL_VIDEO | 492 call_type = session["call_type"] = C.META_SUBTYPE_CALL_VIDEO |
503 client.profile, | 513 client.profile, |
504 ) | 514 ) |
505 | 515 |
506 answer_sdp = await answer_sdp_d | 516 answer_sdp = await answer_sdp_d |
507 | 517 |
508 parsed_answer = mapping.parse_sdp(answer_sdp) | 518 parsed_answer = mapping.parse_sdp(answer_sdp, session["role"]) |
509 session["metadata"].update(parsed_answer["metadata"]) | 519 session["metadata"].update(parsed_answer["metadata"]) |
510 for media in ("audio", "video"): | 520 self.propagate_data(session, parsed_answer) |
521 | |
522 return True | |
523 | |
524 def propagate_data(self, session: dict, parsed_answer: dict) -> None: | |
525 """Propagate local SDP data to other contents""" | |
526 for media in ("audio", "video", "application"): | |
511 for content in session["contents"].values(): | 527 for content in session["contents"].values(): |
512 if content["desc_elt"].getAttribute("media") == media: | 528 try: |
513 media_data = parsed_answer[media] | |
514 application_data = content["application_data"] | 529 application_data = content["application_data"] |
515 application_data["local_data"] = media_data["application_data"] | 530 content_media = application_data["media"] |
516 transport_data = content["transport_data"] | 531 except KeyError: |
517 local_ice_data = media_data["transport_data"] | 532 pass |
518 transport_data["local_ice_data"] = local_ice_data | 533 else: |
519 | 534 if content_media == media: |
520 return True | 535 media_data = parsed_answer[media] |
536 application_data["local_data"] = media_data["application_data"] | |
537 transport_data = content["transport_data"] | |
538 local_ice_data = media_data["transport_data"] | |
539 transport_data["local_ice_data"] = local_ice_data | |
540 | |
541 def send_answer_sdp(self, client: SatXMPPEntity, session: dict) -> None: | |
542 """Send answer SDP to frontend""" | |
543 if not session.get(ANSWER_SDP_SENT_KEY, False): | |
544 # we only send the signal once, as it means that the whole session is | |
545 # accepted | |
546 answer_sdp = mapping.generate_sdp_from_session(session) | |
547 self.host.bridge.call_setup( | |
548 session["id"], | |
549 data_format.serialise( | |
550 { | |
551 "role": session["role"], | |
552 "sdp": answer_sdp, | |
553 } | |
554 ), | |
555 client.profile, | |
556 ) | |
557 session[ANSWER_SDP_SENT_KEY] = True | |
521 | 558 |
522 async def jingle_handler(self, client, action, session, content_name, desc_elt): | 559 async def jingle_handler(self, client, action, session, content_name, desc_elt): |
523 content_data = session["contents"][content_name] | 560 content_data = session["contents"][content_name] |
524 application_data = content_data["application_data"] | 561 application_data = content_data["application_data"] |
525 if action == self._j.A_PREPARE_CONFIRMATION: | 562 if action == self._j.A_PREPARE_CONFIRMATION: |
540 elif action == self._j.A_ACCEPTED_ACK: | 577 elif action == self._j.A_ACCEPTED_ACK: |
541 pass | 578 pass |
542 elif action == self._j.A_PREPARE_INITIATOR: | 579 elif action == self._j.A_PREPARE_INITIATOR: |
543 application_data["peer_data"] = mapping.parse_description(desc_elt) | 580 application_data["peer_data"] = mapping.parse_description(desc_elt) |
544 elif action == self._j.A_SESSION_ACCEPT: | 581 elif action == self._j.A_SESSION_ACCEPT: |
545 if content_name == next(iter(session["contents"])): | 582 pass # self.send_answer_sdp(client, session) |
546 # we only send the signal for first content, as it means that the whole | |
547 # session is accepted | |
548 answer_sdp = mapping.generate_sdp_from_session(session) | |
549 self.host.bridge.call_setup( | |
550 session["id"], | |
551 data_format.serialise( | |
552 { | |
553 "role": session["role"], | |
554 "sdp": answer_sdp, | |
555 } | |
556 ), | |
557 client.profile, | |
558 ) | |
559 else: | 583 else: |
560 log.warning(f"FIXME: unmanaged action {action}") | 584 log.warning(f"FIXME: unmanaged action {action}") |
561 | 585 |
562 self.host.trigger.point( | 586 self.host.trigger.point( |
563 "XEP-0167_jingle_handler", | 587 "XEP-0167_jingle_handler", |