comparison libervia/backend/plugins/plugin_xep_0176.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_xep_0176.py@4c8bf67bfbeb
children 23fa52acf72c
comparison
equal deleted inserted replaced
4070:d10748475025 4071:4b842c1fb686
1 #!/usr/bin/env python3
2
3 # Libervia plugin for Jingle (XEP-0176)
4 # Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org)
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 from typing import Dict, List, Optional
20 import uuid
21
22 from twisted.internet import defer
23 from twisted.words.protocols.jabber.xmlstream import XMPPHandler
24 from twisted.words.xish import domish
25 from wokkel import disco, iwokkel
26 from zope.interface import implementer
27
28 from libervia.backend.core import exceptions
29 from libervia.backend.core.constants import Const as C
30 from libervia.backend.core.core_types import SatXMPPEntity
31 from libervia.backend.core.i18n import _
32 from libervia.backend.core.log import getLogger
33 from libervia.backend.tools.common import data_format
34
35 from .plugin_xep_0166 import BaseTransportHandler
36
37 log = getLogger(__name__)
38
39 NS_JINGLE_ICE_UDP= "urn:xmpp:jingle:transports:ice-udp:1"
40
41 PLUGIN_INFO = {
42 C.PI_NAME: "Jingle ICE-UDP Transport Method",
43 C.PI_IMPORT_NAME: "XEP-0176",
44 C.PI_TYPE: "XEP",
45 C.PI_MODES: C.PLUG_MODE_BOTH,
46 C.PI_PROTOCOLS: ["XEP-0176"],
47 C.PI_DEPENDENCIES: ["XEP-0166"],
48 C.PI_RECOMMENDATIONS: [],
49 C.PI_MAIN: "XEP_0176",
50 C.PI_HANDLER: "yes",
51 C.PI_DESCRIPTION: _("""Implementation of Jingle ICE-UDP transport"""),
52 }
53
54
55 class XEP_0176(BaseTransportHandler):
56
57 def __init__(self, host):
58 log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
59 self.host = host
60 self._j = host.plugins["XEP-0166"] # shortcut to access jingle
61 self._j.register_transport(
62 NS_JINGLE_ICE_UDP, self._j.TRANSPORT_DATAGRAM, self, 100
63 )
64 host.bridge.add_method(
65 "ice_candidates_add",
66 ".plugin",
67 in_sign="sss",
68 out_sign="",
69 method=self._ice_candidates_add,
70 async_=True,
71 )
72 host.bridge.add_signal(
73 "ice_candidates_new", ".plugin", signature="sss"
74 ) # args: jingle_sid, candidates_serialised, profile
75 host.bridge.add_signal(
76 "ice_restart", ".plugin", signature="sss"
77 ) # args: jingle_sid, side ("local" or "peer"), profile
78
79 def get_handler(self, client):
80 return XEP_0176_handler()
81
82 def _ice_candidates_add(
83 self,
84 session_id: str,
85 media_ice_data_s: str,
86 profile_key: str,
87 ):
88 client = self.host.get_client(profile_key)
89 return defer.ensureDeferred(self.ice_candidates_add(
90 client,
91 session_id,
92 data_format.deserialise(media_ice_data_s),
93 ))
94
95 def build_transport(self, ice_data: dict) -> domish.Element:
96 """Generate <transport> element from ICE data
97
98 @param ice_data: a dict containing the following keys:
99 - "ufrag" (str): The ICE username fragment.
100 - "pwd" (str): The ICE password.
101 - "candidates" (List[dict]): A list of ICE candidate dictionaries, each
102 containing:
103 - "component_id" (int): The component ID.
104 - "foundation" (str): The candidate foundation.
105 - "address" (str): The candidate IP address.
106 - "port" (int): The candidate port.
107 - "priority" (int): The candidate priority.
108 - "transport" (str): The candidate transport protocol, e.g., "udp".
109 - "type" (str): The candidate type, e.g., "host", "srflx", "prflx", or
110 "relay".
111 - "generation" (str, optional): The candidate generation. Defaults to "0".
112 - "network" (str, optional): The candidate network. Defaults to "0".
113 - "rel_addr" (str, optional): The related address for the candidate, if
114 any.
115 - "rel_port" (int, optional): The related port for the candidate, if any.
116
117 @return: A <transport> element.
118 """
119 try:
120 ufrag: str = ice_data["ufrag"]
121 pwd: str = ice_data["pwd"]
122 candidates: List[dict] = ice_data["candidates"]
123 except KeyError as e:
124 raise exceptions.DataError(f"ICE {e} must be provided")
125
126 candidates.sort(key=lambda c: int(c.get("priority", 0)), reverse=True)
127 transport_elt = domish.Element(
128 (NS_JINGLE_ICE_UDP, "transport"),
129 attribs={"ufrag": ufrag, "pwd": pwd}
130 )
131
132 for candidate in candidates:
133 try:
134 candidate_elt = transport_elt.addElement("candidate")
135 candidate_elt["component"] = str(candidate["component_id"])
136 candidate_elt["foundation"] = candidate["foundation"]
137 candidate_elt["generation"] = str(candidate.get("generation", "0"))
138 candidate_elt["id"] = candidate.get("id") or str(uuid.uuid4())
139 candidate_elt["ip"] = candidate["address"]
140 candidate_elt["network"] = str(candidate.get("network", "0"))
141 candidate_elt["port"] = str(candidate["port"])
142 candidate_elt["priority"] = str(candidate["priority"])
143 candidate_elt["protocol"] = candidate["transport"]
144 candidate_elt["type"] = candidate["type"]
145 except KeyError as e:
146 raise exceptions.DataError(
147 f"Mandatory ICE candidate attribute {e} is missing"
148 )
149
150 if "rel_addr" in candidate and "rel_port" in candidate:
151 candidate_elt["rel-addr"] = candidate["rel_addr"]
152 candidate_elt["rel-port"] = str(candidate["rel_port"])
153
154 self.host.trigger.point("XEP-0176_build_transport", transport_elt, ice_data)
155
156 return transport_elt
157
158 def parse_transport(self, transport_elt: domish.Element) -> dict:
159 """Parse <transport> to a dict
160
161 @param transport_elt: <transport> element
162 @return: ICE data (as in [build_transport])
163 """
164 try:
165 ice_data = {
166 "ufrag": transport_elt["ufrag"],
167 "pwd": transport_elt["pwd"]
168 }
169 except KeyError as e:
170 raise exceptions.DataError(
171 f"<transport> is missing mandatory attribute {e}: {transport_elt.toXml()}"
172 )
173 ice_data["candidates"] = ice_candidates = []
174
175 for candidate_elt in transport_elt.elements(NS_JINGLE_ICE_UDP, "candidate"):
176 try:
177 candidate = {
178 "component_id": int(candidate_elt["component"]),
179 "foundation": candidate_elt["foundation"],
180 "address": candidate_elt["ip"],
181 "port": int(candidate_elt["port"]),
182 "priority": int(candidate_elt["priority"]),
183 "transport": candidate_elt["protocol"],
184 "type": candidate_elt["type"],
185 }
186 except KeyError as e:
187 raise exceptions.DataError(
188 f"Mandatory attribute {e} is missing in candidate element"
189 )
190
191 if candidate_elt.hasAttribute("generation"):
192 candidate["generation"] = candidate_elt["generation"]
193
194 if candidate_elt.hasAttribute("network"):
195 candidate["network"] = candidate_elt["network"]
196
197 if candidate_elt.hasAttribute("rel-addr"):
198 candidate["rel_addr"] = candidate_elt["rel-addr"]
199
200 if candidate_elt.hasAttribute("rel-port"):
201 candidate["rel_port"] = int(candidate_elt["rel-port"])
202
203 ice_candidates.append(candidate)
204
205 self.host.trigger.point("XEP-0176_parse_transport", transport_elt, ice_data)
206
207 return ice_data
208
209 async def jingle_session_init(
210 self,
211 client: SatXMPPEntity,
212 session: dict,
213 content_name: str,
214 ) -> domish.Element:
215 """Create a Jingle session initiation transport element with ICE candidates.
216
217 @param client: SatXMPPEntity object representing the client.
218 @param session: Dictionary containing session data.
219 @param content_name: Name of the content.
220 @param ufrag: ICE username fragment.
221 @param pwd: ICE password.
222 @param candidates: List of ICE candidate dictionaries parsed from the
223 parse_ice_candidate method.
224
225 @return: domish.Element representing the Jingle transport element.
226
227 @raise exceptions.DataError: If mandatory data is missing from the candidates.
228 """
229 content_data = session["contents"][content_name]
230 transport_data = content_data["transport_data"]
231 ice_data = transport_data["local_ice_data"]
232 return self.build_transport(ice_data)
233
234 async def jingle_handler(
235 self,
236 client: SatXMPPEntity,
237 action: str,
238 session: dict,
239 content_name: str,
240 transport_elt: domish.Element,
241 ) -> domish.Element:
242 """Handle Jingle requests
243
244 @param client: The SatXMPPEntity instance.
245 @param action: The action to be performed with the session.
246 @param session: A dictionary containing the session information.
247 @param content_name: The name of the content.
248 @param transport_elt: The domish.Element instance representing the transport
249 element.
250
251 @return: <transport> element
252 """
253 content_data = session["contents"][content_name]
254 transport_data = content_data["transport_data"]
255 if action in (self._j.A_PREPARE_CONFIRMATION, self._j.A_PREPARE_INITIATOR):
256 peer_ice_data = self.parse_transport(transport_elt)
257 transport_data["peer_ice_data"] = peer_ice_data
258
259 elif action in (self._j.A_ACCEPTED_ACK, self._j.A_PREPARE_RESPONDER):
260 pass
261
262 elif action == self._j.A_SESSION_ACCEPT:
263 pass
264
265 elif action == self._j.A_START:
266 pass
267
268 elif action == self._j.A_SESSION_INITIATE:
269 # responder side, we give our candidates
270 transport_elt = self.build_transport(transport_data["local_ice_data"])
271 elif action == self._j.A_TRANSPORT_INFO:
272
273 media_type = content_data["application_data"].get("media")
274 new_ice_data = self.parse_transport(transport_elt)
275 restart = self.update_candidates(transport_data, new_ice_data, local=False)
276 if restart:
277 log.debug(
278 f"Peer ICE restart detected on session {session['id']} "
279 f"[{client.profile}]"
280 )
281 self.host.bridge.ice_restart(session["id"], "peer", client.profile)
282
283 self.host.bridge.ice_candidates_new(
284 session["id"],
285 data_format.serialise({media_type: new_ice_data}),
286 client.profile
287 )
288 elif action == self._j.A_DESTROY:
289 pass
290 else:
291 log.warning("FIXME: unmanaged action {}".format(action))
292
293 return transport_elt
294
295 def jingle_terminate(
296 self,
297 client: SatXMPPEntity,
298 action: str,
299 session: dict,
300 content_name: str,
301 reason_elt: domish.Element,
302 ) -> None:
303 log.debug("ICE-UDP session terminated")
304
305 def update_candidates(
306 self,
307 transport_data: dict,
308 new_ice_data: dict,
309 local: bool
310 ) -> bool:
311 """Update ICE candidates when new one are received
312
313 @param transport_data: transport_data of the content linked to the candidates
314 @param new_ice_data: new ICE data, in the same format as returned
315 by [self.parse_transport]
316 @param local: True if it's our candidates, False if it's peer ones
317 @return: True if there is a ICE restart
318 """
319 key = "local_ice_data" if local else "peer_ice_data"
320 try:
321 ice_data = transport_data[key]
322 except KeyError:
323 log.warning(
324 f"no {key} available"
325 )
326 transport_data[key] = new_ice_data
327 else:
328 if (
329 new_ice_data["ufrag"] != ice_data["ufrag"]
330 or new_ice_data["pwd"] != ice_data["pwd"]
331 ):
332 ice_data["ufrag"] = new_ice_data["ufrag"]
333 ice_data["pwd"] = new_ice_data["pwd"]
334 ice_data["candidates"] = new_ice_data["candidates"]
335 return True
336 return False
337
338 async def ice_candidates_add(
339 self,
340 client: SatXMPPEntity,
341 session_id: str,
342 media_ice_data: Dict[str, dict]
343 ) -> None:
344 """Called when a new ICE candidates are available for a session
345
346 @param session_id: Session ID
347 @param candidates: a map from media type (audio, video) to ICE data
348 ICE data must be in the same format as in [self.parse_transport]
349 """
350 session = self._j.get_session(client, session_id)
351 iq_elt: Optional[domish.Element] = None
352
353 for media_type, new_ice_data in media_ice_data.items():
354 for content_name, content_data in session["contents"].items():
355 if content_data["application_data"].get("media") == media_type:
356 break
357 else:
358 log.warning(
359 "no media of type {media_type} has been found"
360 )
361 continue
362 restart = self.update_candidates(
363 content_data["transport_data"], new_ice_data, True
364 )
365 if restart:
366 log.debug(
367 f"Local ICE restart detected on session {session['id']} "
368 f"[{client.profile}]"
369 )
370 self.host.bridge.ice_restart(session["id"], "local", client.profile)
371 transport_elt = self.build_transport(new_ice_data)
372 iq_elt, __ = self._j.build_action(
373 client, self._j.A_TRANSPORT_INFO, session, content_name, iq_elt=iq_elt,
374 transport_elt=transport_elt
375 )
376
377 if iq_elt is not None:
378 try:
379 await iq_elt.send()
380 except Exception as e:
381 log.warning(f"Could not send new ICE candidates: {e}")
382
383 else:
384 log.error("Could not find any content to apply new ICE candidates")
385
386
387 @implementer(iwokkel.IDisco)
388 class XEP_0176_handler(XMPPHandler):
389
390 def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
391 return [disco.DiscoFeature(NS_JINGLE_ICE_UDP)]
392
393 def getDiscoItems(self, requestor, target, nodeIdentifier=""):
394 return []