comparison libervia/backend/plugins/plugin_xep_0176.py @ 4116:23fa52acf72c

plugin XEP-0167, XEP-0176: transport-info handling and ICE candidate sending are delayed while the session is not yet active
author Goffi <goffi@goffi.org>
date Mon, 21 Aug 2023 15:19:45 +0200
parents 4b842c1fb686
children e11b13418ba6
comparison
equal deleted inserted replaced
4115:0da563780ffc 4116:23fa52acf72c
34 34
35 from .plugin_xep_0166 import BaseTransportHandler 35 from .plugin_xep_0166 import BaseTransportHandler
36 36
37 log = getLogger(__name__) 37 log = getLogger(__name__)
38 38
39 NS_JINGLE_ICE_UDP= "urn:xmpp:jingle:transports:ice-udp:1" 39 NS_JINGLE_ICE_UDP = "urn:xmpp:jingle:transports:ice-udp:1"
40 40
41 PLUGIN_INFO = { 41 PLUGIN_INFO = {
42 C.PI_NAME: "Jingle ICE-UDP Transport Method", 42 C.PI_NAME: "Jingle ICE-UDP Transport Method",
43 C.PI_IMPORT_NAME: "XEP-0176", 43 C.PI_IMPORT_NAME: "XEP-0176",
44 C.PI_TYPE: "XEP", 44 C.PI_TYPE: "XEP",
51 C.PI_DESCRIPTION: _("""Implementation of Jingle ICE-UDP transport"""), 51 C.PI_DESCRIPTION: _("""Implementation of Jingle ICE-UDP transport"""),
52 } 52 }
53 53
54 54
55 class XEP_0176(BaseTransportHandler): 55 class XEP_0176(BaseTransportHandler):
56
57 def __init__(self, host): 56 def __init__(self, host):
58 log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization") 57 log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
59 self.host = host 58 self.host = host
60 self._j = host.plugins["XEP-0166"] # shortcut to access jingle 59 self._j = host.plugins["XEP-0166"] # shortcut to access jingle
61 self._j.register_transport( 60 self._j.register_transport(
84 session_id: str, 83 session_id: str,
85 media_ice_data_s: str, 84 media_ice_data_s: str,
86 profile_key: str, 85 profile_key: str,
87 ): 86 ):
88 client = self.host.get_client(profile_key) 87 client = self.host.get_client(profile_key)
89 return defer.ensureDeferred(self.ice_candidates_add( 88 return defer.ensureDeferred(
90 client, 89 self.ice_candidates_add(
91 session_id, 90 client,
92 data_format.deserialise(media_ice_data_s), 91 session_id,
93 )) 92 data_format.deserialise(media_ice_data_s),
93 )
94 )
94 95
95 def build_transport(self, ice_data: dict) -> domish.Element: 96 def build_transport(self, ice_data: dict) -> domish.Element:
96 """Generate <transport> element from ICE data 97 """Generate <transport> element from ICE data
97 98
98 @param ice_data: a dict containing the following keys: 99 @param ice_data: a dict containing the following keys:
117 @return: A <transport> element. 118 @return: A <transport> element.
118 """ 119 """
119 try: 120 try:
120 ufrag: str = ice_data["ufrag"] 121 ufrag: str = ice_data["ufrag"]
121 pwd: str = ice_data["pwd"] 122 pwd: str = ice_data["pwd"]
122 candidates: List[dict] = ice_data["candidates"]
123 except KeyError as e: 123 except KeyError as e:
124 raise exceptions.DataError(f"ICE {e} must be provided") 124 raise exceptions.DataError(f"ICE {e} must be provided")
125 candidates: List[dict] = ice_data.get("candidates", [])
125 126
126 candidates.sort(key=lambda c: int(c.get("priority", 0)), reverse=True) 127 candidates.sort(key=lambda c: int(c.get("priority", 0)), reverse=True)
127 transport_elt = domish.Element( 128 transport_elt = domish.Element(
128 (NS_JINGLE_ICE_UDP, "transport"), 129 (NS_JINGLE_ICE_UDP, "transport"), attribs={"ufrag": ufrag, "pwd": pwd}
129 attribs={"ufrag": ufrag, "pwd": pwd}
130 ) 130 )
131 131
132 for candidate in candidates: 132 for candidate in candidates:
133 try: 133 try:
134 candidate_elt = transport_elt.addElement("candidate") 134 candidate_elt = transport_elt.addElement("candidate")
160 160
161 @param transport_elt: <transport> element 161 @param transport_elt: <transport> element
162 @return: ICE data (as in [build_transport]) 162 @return: ICE data (as in [build_transport])
163 """ 163 """
164 try: 164 try:
165 ice_data = { 165 ice_data = {"ufrag": transport_elt["ufrag"], "pwd": transport_elt["pwd"]}
166 "ufrag": transport_elt["ufrag"],
167 "pwd": transport_elt["pwd"]
168 }
169 except KeyError as e: 166 except KeyError as e:
170 raise exceptions.DataError( 167 raise exceptions.DataError(
171 f"<transport> is missing mandatory attribute {e}: {transport_elt.toXml()}" 168 f"<transport> is missing mandatory attribute {e}: {transport_elt.toXml()}"
172 ) 169 )
173 ice_data["candidates"] = ice_candidates = [] 170 ice_data["candidates"] = ice_candidates = []
254 transport_data = content_data["transport_data"] 251 transport_data = content_data["transport_data"]
255 if action in (self._j.A_PREPARE_CONFIRMATION, self._j.A_PREPARE_INITIATOR): 252 if action in (self._j.A_PREPARE_CONFIRMATION, self._j.A_PREPARE_INITIATOR):
256 peer_ice_data = self.parse_transport(transport_elt) 253 peer_ice_data = self.parse_transport(transport_elt)
257 transport_data["peer_ice_data"] = peer_ice_data 254 transport_data["peer_ice_data"] = peer_ice_data
258 255
259 elif action in (self._j.A_ACCEPTED_ACK, self._j.A_PREPARE_RESPONDER): 256 elif action == self._j.A_ACCEPTED_ACK:
257 buffer = session.pop("XEP-0176_handler_buffer", None)
258 if buffer:
259 log.debug("replaying buffered events")
260 for args in buffer:
261 await self.jingle_handler(*args)
262 elif action == self._j.A_PREPARE_RESPONDER:
260 pass 263 pass
261 264
262 elif action == self._j.A_SESSION_ACCEPT: 265 elif action == self._j.A_SESSION_ACCEPT:
263 pass 266 # we check if we have any buffered ICE candidates, and send them if it's the
267 # case
268 media_type = content_data["application_data"].get("media")
269 try:
270 buffer = session["XEP-0176_buffer"]
271 buffered_ice_data = buffer.pop(media_type)
272 except KeyError:
273 pass
274 else:
275 if not buffer:
276 del session["XEP-0176_buffer"]
277 transport_elt = self.build_transport(buffered_ice_data)
278 iq_elt, __ = self._j.build_action(
279 client,
280 self._j.A_TRANSPORT_INFO,
281 session,
282 content_name,
283 context_elt=transport_elt,
284 )
285 await iq_elt.send()
264 286
265 elif action == self._j.A_START: 287 elif action == self._j.A_START:
266 pass 288 pass
267 289
268 elif action == self._j.A_SESSION_INITIATE: 290 elif action == self._j.A_SESSION_INITIATE:
269 # responder side, we give our candidates 291 # responder side, we give our candidates
270 transport_elt = self.build_transport(transport_data["local_ice_data"]) 292 transport_elt = self.build_transport(transport_data["local_ice_data"])
271 elif action == self._j.A_TRANSPORT_INFO: 293 elif action == self._j.A_TRANSPORT_INFO:
294 if session["state"] == self._j.STATE_PENDING:
295 # Session is not yet active; we buffer the arguments to replay them
296 # when the session becomes active. This makes the frontend's life easier.
297 log.debug("session is not active yet, buffering transport-info element")
298 buffer = session.setdefault("XEP-0176_handler_buffer", [])
299 buffer.append([client, action, session, content_name, transport_elt])
300 return transport_elt
272 301
273 media_type = content_data["application_data"].get("media") 302 media_type = content_data["application_data"].get("media")
274 new_ice_data = self.parse_transport(transport_elt) 303 new_ice_data = self.parse_transport(transport_elt)
275 restart = self.update_candidates(transport_data, new_ice_data, local=False) 304 restart = self.update_candidates(transport_data, new_ice_data, local=False)
276 if restart: 305 if restart:
281 self.host.bridge.ice_restart(session["id"], "peer", client.profile) 310 self.host.bridge.ice_restart(session["id"], "peer", client.profile)
282 311
283 self.host.bridge.ice_candidates_new( 312 self.host.bridge.ice_candidates_new(
284 session["id"], 313 session["id"],
285 data_format.serialise({media_type: new_ice_data}), 314 data_format.serialise({media_type: new_ice_data}),
286 client.profile 315 client.profile,
287 ) 316 )
288 elif action == self._j.A_DESTROY: 317 elif action == self._j.A_DESTROY:
289 pass 318 pass
290 else: 319 else:
291 log.warning("FIXME: unmanaged action {}".format(action)) 320 log.warning("FIXME: unmanaged action {}".format(action))
292 321
293 return transport_elt 322 return transport_elt
294 323
301 reason_elt: domish.Element, 330 reason_elt: domish.Element,
302 ) -> None: 331 ) -> None:
303 log.debug("ICE-UDP session terminated") 332 log.debug("ICE-UDP session terminated")
304 333
305 def update_candidates( 334 def update_candidates(
306 self, 335 self, transport_data: dict, new_ice_data: dict, local: bool
307 transport_data: dict,
308 new_ice_data: dict,
309 local: bool
310 ) -> bool: 336 ) -> bool:
311 """Update ICE candidates when new one are received 337 """Update ICE candidates when new one are received
312 338
313 @param transport_data: transport_data of the content linked to the candidates 339 @param transport_data: transport_data of the content linked to the candidates
314 @param new_ice_data: new ICE data, in the same format as returned 340 @param new_ice_data: new ICE data, in the same format as returned
318 """ 344 """
319 key = "local_ice_data" if local else "peer_ice_data" 345 key = "local_ice_data" if local else "peer_ice_data"
320 try: 346 try:
321 ice_data = transport_data[key] 347 ice_data = transport_data[key]
322 except KeyError: 348 except KeyError:
323 log.warning( 349 log.warning(f"no {key} available")
324 f"no {key} available"
325 )
326 transport_data[key] = new_ice_data 350 transport_data[key] = new_ice_data
327 else: 351 else:
328 if ( 352 if (
329 new_ice_data["ufrag"] != ice_data["ufrag"] 353 new_ice_data["ufrag"] != ice_data["ufrag"]
330 or new_ice_data["pwd"] != ice_data["pwd"] 354 or new_ice_data["pwd"] != ice_data["pwd"]
334 ice_data["candidates"] = new_ice_data["candidates"] 358 ice_data["candidates"] = new_ice_data["candidates"]
335 return True 359 return True
336 return False 360 return False
337 361
338 async def ice_candidates_add( 362 async def ice_candidates_add(
339 self, 363 self, client: SatXMPPEntity, session_id: str, media_ice_data: Dict[str, dict]
340 client: SatXMPPEntity,
341 session_id: str,
342 media_ice_data: Dict[str, dict]
343 ) -> None: 364 ) -> None:
344 """Called when a new ICE candidates are available for a session 365 """Called when a new ICE candidates are available for a session
345 366
346 @param session_id: Session ID 367 @param session_id: Session ID
347 @param candidates: a map from media type (audio, video) to ICE data 368 @param media_ice_data: a map from media type (audio, video) to ICE data
348 ICE data must be in the same format as in [self.parse_transport] 369 ICE data must be in the same format as in [self.parse_transport]
349 """ 370 """
350 session = self._j.get_session(client, session_id) 371 session = self._j.get_session(client, session_id)
351 iq_elt: Optional[domish.Element] = None 372 iq_elt: Optional[domish.Element] = None
352 373
353 for media_type, new_ice_data in media_ice_data.items(): 374 for media_type, new_ice_data in media_ice_data.items():
375 if session["state"] == self._j.STATE_PENDING:
376 log.debug(f"session not active, buffering")
377 buffer = session.setdefault("XEP-0176_buffer", {})
378 media_buffer = buffer.setdefault(media_type, {})
379
380 for key in ["ufrag", "pwd"]:
381 if key not in media_buffer:
382 media_buffer[key] = new_ice_data[key]
383 else:
384 if media_buffer[key] != new_ice_data[key]:
385 log.warning(
386 f"{key} conflict, new value will replace old one\n"
387 f"buffer={media_buffer[key]!r}\n"
388 f"new={new_ice_data[key]!r}"
389 )
390 media_buffer[key] = new_ice_data[key]
391
392 media_buffer.setdefault("candidates", []).extend(
393 new_ice_data["candidates"]
394 )
395 continue
396
354 for content_name, content_data in session["contents"].items(): 397 for content_name, content_data in session["contents"].items():
355 if content_data["application_data"].get("media") == media_type: 398 if content_data["application_data"].get("media") == media_type:
356 break 399 break
357 else: 400 else:
358 log.warning( 401 log.warning(f"no media of type {media_type} has been found")
359 "no media of type {media_type} has been found"
360 )
361 continue 402 continue
362 restart = self.update_candidates( 403 restart = self.update_candidates(
363 content_data["transport_data"], new_ice_data, True 404 content_data["transport_data"], new_ice_data, True
364 ) 405 )
365 if restart: 406 if restart:
368 f"[{client.profile}]" 409 f"[{client.profile}]"
369 ) 410 )
370 self.host.bridge.ice_restart(session["id"], "local", client.profile) 411 self.host.bridge.ice_restart(session["id"], "local", client.profile)
371 transport_elt = self.build_transport(new_ice_data) 412 transport_elt = self.build_transport(new_ice_data)
372 iq_elt, __ = self._j.build_action( 413 iq_elt, __ = self._j.build_action(
373 client, self._j.A_TRANSPORT_INFO, session, content_name, iq_elt=iq_elt, 414 client,
374 transport_elt=transport_elt 415 self._j.A_TRANSPORT_INFO,
416 session,
417 content_name,
418 iq_elt=iq_elt,
419 context_elt=transport_elt,
375 ) 420 )
376 421
377 if iq_elt is not None: 422 if iq_elt is not None:
378 try: 423 try:
379 await iq_elt.send() 424 await iq_elt.send()
380 except Exception as e: 425 except Exception as e:
381 log.warning(f"Could not send new ICE candidates: {e}") 426 log.warning(f"Could not send new ICE candidates: {e}")
382 427
383 else:
384 log.error("Could not find any content to apply new ICE candidates")
385
386 428
387 @implementer(iwokkel.IDisco) 429 @implementer(iwokkel.IDisco)
388 class XEP_0176_handler(XMPPHandler): 430 class XEP_0176_handler(XMPPHandler):
389
390 def getDiscoInfo(self, requestor, target, nodeIdentifier=""): 431 def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
391 return [disco.DiscoFeature(NS_JINGLE_ICE_UDP)] 432 return [disco.DiscoFeature(NS_JINGLE_ICE_UDP)]
392 433
393 def getDiscoItems(self, requestor, target, nodeIdentifier=""): 434 def getDiscoItems(self, requestor, target, nodeIdentifier=""):
394 return [] 435 return []