Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_xep_0176.py @ 4116:23fa52acf72c
plugin XEP-0167, XEP-0176: transport-info and ICE candidate sending are delayed if session is not active yet
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 21 Aug 2023 15:19:45 +0200 |
parents | 4b842c1fb686 |
children | e11b13418ba6 |
comparison
equal
deleted
inserted
replaced
4115:0da563780ffc | 4116:23fa52acf72c |
---|---|
34 | 34 |
35 from .plugin_xep_0166 import BaseTransportHandler | 35 from .plugin_xep_0166 import BaseTransportHandler |
36 | 36 |
37 log = getLogger(__name__) | 37 log = getLogger(__name__) |
38 | 38 |
39 NS_JINGLE_ICE_UDP= "urn:xmpp:jingle:transports:ice-udp:1" | 39 NS_JINGLE_ICE_UDP = "urn:xmpp:jingle:transports:ice-udp:1" |
40 | 40 |
41 PLUGIN_INFO = { | 41 PLUGIN_INFO = { |
42 C.PI_NAME: "Jingle ICE-UDP Transport Method", | 42 C.PI_NAME: "Jingle ICE-UDP Transport Method", |
43 C.PI_IMPORT_NAME: "XEP-0176", | 43 C.PI_IMPORT_NAME: "XEP-0176", |
44 C.PI_TYPE: "XEP", | 44 C.PI_TYPE: "XEP", |
51 C.PI_DESCRIPTION: _("""Implementation of Jingle ICE-UDP transport"""), | 51 C.PI_DESCRIPTION: _("""Implementation of Jingle ICE-UDP transport"""), |
52 } | 52 } |
53 | 53 |
54 | 54 |
55 class XEP_0176(BaseTransportHandler): | 55 class XEP_0176(BaseTransportHandler): |
56 | |
57 def __init__(self, host): | 56 def __init__(self, host): |
58 log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization") | 57 log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization") |
59 self.host = host | 58 self.host = host |
60 self._j = host.plugins["XEP-0166"] # shortcut to access jingle | 59 self._j = host.plugins["XEP-0166"] # shortcut to access jingle |
61 self._j.register_transport( | 60 self._j.register_transport( |
84 session_id: str, | 83 session_id: str, |
85 media_ice_data_s: str, | 84 media_ice_data_s: str, |
86 profile_key: str, | 85 profile_key: str, |
87 ): | 86 ): |
88 client = self.host.get_client(profile_key) | 87 client = self.host.get_client(profile_key) |
89 return defer.ensureDeferred(self.ice_candidates_add( | 88 return defer.ensureDeferred( |
90 client, | 89 self.ice_candidates_add( |
91 session_id, | 90 client, |
92 data_format.deserialise(media_ice_data_s), | 91 session_id, |
93 )) | 92 data_format.deserialise(media_ice_data_s), |
93 ) | |
94 ) | |
94 | 95 |
95 def build_transport(self, ice_data: dict) -> domish.Element: | 96 def build_transport(self, ice_data: dict) -> domish.Element: |
96 """Generate <transport> element from ICE data | 97 """Generate <transport> element from ICE data |
97 | 98 |
98 @param ice_data: a dict containing the following keys: | 99 @param ice_data: a dict containing the following keys: |
117 @return: A <transport> element. | 118 @return: A <transport> element. |
118 """ | 119 """ |
119 try: | 120 try: |
120 ufrag: str = ice_data["ufrag"] | 121 ufrag: str = ice_data["ufrag"] |
121 pwd: str = ice_data["pwd"] | 122 pwd: str = ice_data["pwd"] |
122 candidates: List[dict] = ice_data["candidates"] | |
123 except KeyError as e: | 123 except KeyError as e: |
124 raise exceptions.DataError(f"ICE {e} must be provided") | 124 raise exceptions.DataError(f"ICE {e} must be provided") |
125 candidates: List[dict] = ice_data.get("candidates", []) | |
125 | 126 |
126 candidates.sort(key=lambda c: int(c.get("priority", 0)), reverse=True) | 127 candidates.sort(key=lambda c: int(c.get("priority", 0)), reverse=True) |
127 transport_elt = domish.Element( | 128 transport_elt = domish.Element( |
128 (NS_JINGLE_ICE_UDP, "transport"), | 129 (NS_JINGLE_ICE_UDP, "transport"), attribs={"ufrag": ufrag, "pwd": pwd} |
129 attribs={"ufrag": ufrag, "pwd": pwd} | |
130 ) | 130 ) |
131 | 131 |
132 for candidate in candidates: | 132 for candidate in candidates: |
133 try: | 133 try: |
134 candidate_elt = transport_elt.addElement("candidate") | 134 candidate_elt = transport_elt.addElement("candidate") |
160 | 160 |
161 @param transport_elt: <transport> element | 161 @param transport_elt: <transport> element |
162 @return: ICE data (as in [build_transport]) | 162 @return: ICE data (as in [build_transport]) |
163 """ | 163 """ |
164 try: | 164 try: |
165 ice_data = { | 165 ice_data = {"ufrag": transport_elt["ufrag"], "pwd": transport_elt["pwd"]} |
166 "ufrag": transport_elt["ufrag"], | |
167 "pwd": transport_elt["pwd"] | |
168 } | |
169 except KeyError as e: | 166 except KeyError as e: |
170 raise exceptions.DataError( | 167 raise exceptions.DataError( |
171 f"<transport> is missing mandatory attribute {e}: {transport_elt.toXml()}" | 168 f"<transport> is missing mandatory attribute {e}: {transport_elt.toXml()}" |
172 ) | 169 ) |
173 ice_data["candidates"] = ice_candidates = [] | 170 ice_data["candidates"] = ice_candidates = [] |
254 transport_data = content_data["transport_data"] | 251 transport_data = content_data["transport_data"] |
255 if action in (self._j.A_PREPARE_CONFIRMATION, self._j.A_PREPARE_INITIATOR): | 252 if action in (self._j.A_PREPARE_CONFIRMATION, self._j.A_PREPARE_INITIATOR): |
256 peer_ice_data = self.parse_transport(transport_elt) | 253 peer_ice_data = self.parse_transport(transport_elt) |
257 transport_data["peer_ice_data"] = peer_ice_data | 254 transport_data["peer_ice_data"] = peer_ice_data |
258 | 255 |
259 elif action in (self._j.A_ACCEPTED_ACK, self._j.A_PREPARE_RESPONDER): | 256 elif action == self._j.A_ACCEPTED_ACK: |
257 buffer = session.pop("XEP-0176_handler_buffer", None) | |
258 if buffer: | |
259 log.debug("replaying buffered events") | |
260 for args in buffer: | |
261 await self.jingle_handler(*args) | |
262 elif action == self._j.A_PREPARE_RESPONDER: | |
260 pass | 263 pass |
261 | 264 |
262 elif action == self._j.A_SESSION_ACCEPT: | 265 elif action == self._j.A_SESSION_ACCEPT: |
263 pass | 266 # we check if we have any buffered ICE candidates, and send them if it's the |
267 # case | |
268 media_type = content_data["application_data"].get("media") | |
269 try: | |
270 buffer = session["XEP-0176_buffer"] | |
271 buffered_ice_data = buffer.pop(media_type) | |
272 except KeyError: | |
273 pass | |
274 else: | |
275 if not buffer: | |
276 del session["XEP-0176_buffer"] | |
277 transport_elt = self.build_transport(buffered_ice_data) | |
278 iq_elt, __ = self._j.build_action( | |
279 client, | |
280 self._j.A_TRANSPORT_INFO, | |
281 session, | |
282 content_name, | |
283 context_elt=transport_elt, | |
284 ) | |
285 await iq_elt.send() | |
264 | 286 |
265 elif action == self._j.A_START: | 287 elif action == self._j.A_START: |
266 pass | 288 pass |
267 | 289 |
268 elif action == self._j.A_SESSION_INITIATE: | 290 elif action == self._j.A_SESSION_INITIATE: |
269 # responder side, we give our candidates | 291 # responder side, we give our candidates |
270 transport_elt = self.build_transport(transport_data["local_ice_data"]) | 292 transport_elt = self.build_transport(transport_data["local_ice_data"]) |
271 elif action == self._j.A_TRANSPORT_INFO: | 293 elif action == self._j.A_TRANSPORT_INFO: |
294 if session["state"] == self._j.STATE_PENDING: | |
295 # Session is not yet active; we buffer the arguments to replay them | |
296 # when the session becomes active. This makes the frontend's life easier. | |
297 log.debug("session is not active yet, buffering transport-info element") | |
298 buffer = session.setdefault("XEP-0176_handler_buffer", []) | |
299 buffer.append([client, action, session, content_name, transport_elt]) | |
300 return transport_elt | |
272 | 301 |
273 media_type = content_data["application_data"].get("media") | 302 media_type = content_data["application_data"].get("media") |
274 new_ice_data = self.parse_transport(transport_elt) | 303 new_ice_data = self.parse_transport(transport_elt) |
275 restart = self.update_candidates(transport_data, new_ice_data, local=False) | 304 restart = self.update_candidates(transport_data, new_ice_data, local=False) |
276 if restart: | 305 if restart: |
281 self.host.bridge.ice_restart(session["id"], "peer", client.profile) | 310 self.host.bridge.ice_restart(session["id"], "peer", client.profile) |
282 | 311 |
283 self.host.bridge.ice_candidates_new( | 312 self.host.bridge.ice_candidates_new( |
284 session["id"], | 313 session["id"], |
285 data_format.serialise({media_type: new_ice_data}), | 314 data_format.serialise({media_type: new_ice_data}), |
286 client.profile | 315 client.profile, |
287 ) | 316 ) |
288 elif action == self._j.A_DESTROY: | 317 elif action == self._j.A_DESTROY: |
289 pass | 318 pass |
290 else: | 319 else: |
291 log.warning("FIXME: unmanaged action {}".format(action)) | 320 log.warning("FIXME: unmanaged action {}".format(action)) |
292 | 321 |
293 return transport_elt | 322 return transport_elt |
294 | 323 |
301 reason_elt: domish.Element, | 330 reason_elt: domish.Element, |
302 ) -> None: | 331 ) -> None: |
303 log.debug("ICE-UDP session terminated") | 332 log.debug("ICE-UDP session terminated") |
304 | 333 |
305 def update_candidates( | 334 def update_candidates( |
306 self, | 335 self, transport_data: dict, new_ice_data: dict, local: bool |
307 transport_data: dict, | |
308 new_ice_data: dict, | |
309 local: bool | |
310 ) -> bool: | 336 ) -> bool: |
311 """Update ICE candidates when new one are received | 337 """Update ICE candidates when new one are received |
312 | 338 |
313 @param transport_data: transport_data of the content linked to the candidates | 339 @param transport_data: transport_data of the content linked to the candidates |
314 @param new_ice_data: new ICE data, in the same format as returned | 340 @param new_ice_data: new ICE data, in the same format as returned |
318 """ | 344 """ |
319 key = "local_ice_data" if local else "peer_ice_data" | 345 key = "local_ice_data" if local else "peer_ice_data" |
320 try: | 346 try: |
321 ice_data = transport_data[key] | 347 ice_data = transport_data[key] |
322 except KeyError: | 348 except KeyError: |
323 log.warning( | 349 log.warning(f"no {key} available") |
324 f"no {key} available" | |
325 ) | |
326 transport_data[key] = new_ice_data | 350 transport_data[key] = new_ice_data |
327 else: | 351 else: |
328 if ( | 352 if ( |
329 new_ice_data["ufrag"] != ice_data["ufrag"] | 353 new_ice_data["ufrag"] != ice_data["ufrag"] |
330 or new_ice_data["pwd"] != ice_data["pwd"] | 354 or new_ice_data["pwd"] != ice_data["pwd"] |
334 ice_data["candidates"] = new_ice_data["candidates"] | 358 ice_data["candidates"] = new_ice_data["candidates"] |
335 return True | 359 return True |
336 return False | 360 return False |
337 | 361 |
338 async def ice_candidates_add( | 362 async def ice_candidates_add( |
339 self, | 363 self, client: SatXMPPEntity, session_id: str, media_ice_data: Dict[str, dict] |
340 client: SatXMPPEntity, | |
341 session_id: str, | |
342 media_ice_data: Dict[str, dict] | |
343 ) -> None: | 364 ) -> None: |
344 """Called when a new ICE candidates are available for a session | 365 """Called when a new ICE candidates are available for a session |
345 | 366 |
346 @param session_id: Session ID | 367 @param session_id: Session ID |
347 @param candidates: a map from media type (audio, video) to ICE data | 368 @param media_ice_data: a map from media type (audio, video) to ICE data |
348 ICE data must be in the same format as in [self.parse_transport] | 369 ICE data must be in the same format as in [self.parse_transport] |
349 """ | 370 """ |
350 session = self._j.get_session(client, session_id) | 371 session = self._j.get_session(client, session_id) |
351 iq_elt: Optional[domish.Element] = None | 372 iq_elt: Optional[domish.Element] = None |
352 | 373 |
353 for media_type, new_ice_data in media_ice_data.items(): | 374 for media_type, new_ice_data in media_ice_data.items(): |
375 if session["state"] == self._j.STATE_PENDING: | |
376 log.debug(f"session not active, buffering") | |
377 buffer = session.setdefault("XEP-0176_buffer", {}) | |
378 media_buffer = buffer.setdefault(media_type, {}) | |
379 | |
380 for key in ["ufrag", "pwd"]: | |
381 if key not in media_buffer: | |
382 media_buffer[key] = new_ice_data[key] | |
383 else: | |
384 if media_buffer[key] != new_ice_data[key]: | |
385 log.warning( | |
386 f"{key} conflict, new value will replace old one\n" | |
387 f"buffer={media_buffer[key]!r}\n" | |
388 f"new={new_ice_data[key]!r}" | |
389 ) | |
390 media_buffer[key] = new_ice_data[key] | |
391 | |
392 media_buffer.setdefault("candidates", []).extend( | |
393 new_ice_data["candidates"] | |
394 ) | |
395 continue | |
396 | |
354 for content_name, content_data in session["contents"].items(): | 397 for content_name, content_data in session["contents"].items(): |
355 if content_data["application_data"].get("media") == media_type: | 398 if content_data["application_data"].get("media") == media_type: |
356 break | 399 break |
357 else: | 400 else: |
358 log.warning( | 401 log.warning(f"no media of type {media_type} has been found") |
359 "no media of type {media_type} has been found" | |
360 ) | |
361 continue | 402 continue |
362 restart = self.update_candidates( | 403 restart = self.update_candidates( |
363 content_data["transport_data"], new_ice_data, True | 404 content_data["transport_data"], new_ice_data, True |
364 ) | 405 ) |
365 if restart: | 406 if restart: |
368 f"[{client.profile}]" | 409 f"[{client.profile}]" |
369 ) | 410 ) |
370 self.host.bridge.ice_restart(session["id"], "local", client.profile) | 411 self.host.bridge.ice_restart(session["id"], "local", client.profile) |
371 transport_elt = self.build_transport(new_ice_data) | 412 transport_elt = self.build_transport(new_ice_data) |
372 iq_elt, __ = self._j.build_action( | 413 iq_elt, __ = self._j.build_action( |
373 client, self._j.A_TRANSPORT_INFO, session, content_name, iq_elt=iq_elt, | 414 client, |
374 transport_elt=transport_elt | 415 self._j.A_TRANSPORT_INFO, |
416 session, | |
417 content_name, | |
418 iq_elt=iq_elt, | |
419 context_elt=transport_elt, | |
375 ) | 420 ) |
376 | 421 |
377 if iq_elt is not None: | 422 if iq_elt is not None: |
378 try: | 423 try: |
379 await iq_elt.send() | 424 await iq_elt.send() |
380 except Exception as e: | 425 except Exception as e: |
381 log.warning(f"Could not send new ICE candidates: {e}") | 426 log.warning(f"Could not send new ICE candidates: {e}") |
382 | 427 |
383 else: | |
384 log.error("Could not find any content to apply new ICE candidates") | |
385 | |
386 | 428 |
387 @implementer(iwokkel.IDisco) | 429 @implementer(iwokkel.IDisco) |
388 class XEP_0176_handler(XMPPHandler): | 430 class XEP_0176_handler(XMPPHandler): |
389 | |
390 def getDiscoInfo(self, requestor, target, nodeIdentifier=""): | 431 def getDiscoInfo(self, requestor, target, nodeIdentifier=""): |
391 return [disco.DiscoFeature(NS_JINGLE_ICE_UDP)] | 432 return [disco.DiscoFeature(NS_JINGLE_ICE_UDP)] |
392 | 433 |
393 def getDiscoItems(self, requestor, target, nodeIdentifier=""): | 434 def getDiscoItems(self, requestor, target, nodeIdentifier=""): |
394 return [] | 435 return [] |