comparison sat/plugins/plugin_xep_0167/mapping.py @ 4056:1c4f4aa36d98

plugin XEP-0167: Jingle RTP Sessions implementation: rel 420
author Goffi <goffi@goffi.org>
date Mon, 29 May 2023 13:38:10 +0200
parents
children adb9dc9c8114
comparison
equal deleted inserted replaced
4055:38819c69aa39 4056:1c4f4aa36d98
1 #!/usr/bin/env python3
2
3 # Libervia plugin for managing pipes (experimental)
4 # Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org)
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 import base64
20 from typing import Any, Dict, Optional
21
22 from twisted.words.xish import domish
23
24 from sat.core.constants import Const as C
25 from sat.core.log import getLogger
26
27 from .constants import NS_JINGLE_RTP
28
29 log = getLogger(__name__)
30
31 host = None
32
33
34 def senders_to_sdp(senders: str, session: dict) -> str:
35 """Returns appropriate SDP attribute corresponding to Jingle senders attribute"""
36 if senders == "both":
37 return "a=sendrecv"
38 elif senders == "none":
39 return "a=inactive"
40 elif session["role"] == senders:
41 return "a=sendonly"
42 else:
43 return "a=recvonly"
44
45
46 def generate_sdp_from_session(
47 session: dict, local: bool = False, port: int = 9999
48 ) -> str:
49 """Generate an SDP string from session data.
50
51 @param session: A dictionary containing the session data. It should have the
52 following structure:
53
54 {
55 "contents": {
56 "<content_id>": {
57 "application_data": {
58 "media": <str: "audio" or "video">,
59 "local_data": <media_data dict>,
60 "peer_data": <media_data dict>,
61 ...
62 },
63 "transport_data": {
64 "local_ice_data": <ice_data dict>,
65 "peer_ice_data": <ice_data dict>,
66 ...
67 },
68 ...
69 },
70 ...
71 }
72 }
73 @param local: A boolean value indicating whether to generate SDP for the local or
74 peer entity. If True, the method will generate SDP for the local entity,
75 otherwise for the peer entity. Generally the local SDP is received from frontends
76 and not needed in backend, except for debugging purpose.
77 @param port: The preferred port for communications.
78
79 @return: The generated SDP string.
80 """
81 sdp_lines = ["v=0"]
82
83 # Add originator (o=) line after the version (v=) line
84 username = base64.b64encode(session["local_jid"].full().encode()).decode()
85 session_id = "1" # Increment this for each session
86 session_version = "1" # Increment this when the session is updated
87 network_type = "IN"
88 address_type = "IP4"
89 connection_address = "0.0.0.0"
90 o_line = (
91 f"o={username} {session_id} {session_version} {network_type} {address_type} "
92 f"{connection_address}"
93 )
94 sdp_lines.append(o_line)
95
96 # Add the mandatory "s=" and t=" lines
97 sdp_lines.append("s=-")
98 sdp_lines.append("t=0 0")
99
100 # stream direction
101 all_senders = {c["senders"] for c in session["contents"].values()}
102 # if we don't have a common senders for all contents, we set them at media level
103 senders = all_senders.pop() if len(all_senders) == 1 else None
104 if senders is not None:
105 sdp_lines.append(senders_to_sdp(senders, session))
106
107 sdp_lines.append("a=msid-semantic:WMS *")
108
109 host.trigger.point(
110 "XEP-0167_generate_sdp_session",
111 session,
112 local,
113 sdp_lines,
114 triggers_no_cancel=True
115 )
116
117 contents = session["contents"]
118 for content_name, content_data in contents.items():
119 app_data_key = "local_data" if local else "peer_data"
120 application_data = content_data["application_data"]
121 media_data = application_data[app_data_key]
122 media = application_data["media"]
123 payload_types = media_data.get("payload_types", {})
124
125 # Generate m= line
126 transport = "UDP/TLS/RTP/SAVPF"
127 payload_type_ids = [str(pt_id) for pt_id in payload_types]
128 m_line = f"m={media} {port} {transport} {' '.join(payload_type_ids)}"
129 sdp_lines.append(m_line)
130
131 sdp_lines.append(f"c={network_type} {address_type} {connection_address}")
132
133 sdp_lines.append(f"a=mid:{content_name}")
134
135 # stream direction
136 if senders is None:
137 sdp_lines.append(senders_to_sdp(content_data["senders"], session))
138
139 # Generate a= lines for rtpmap and fmtp
140 for pt_id, pt in payload_types.items():
141 name = pt["name"]
142 clockrate = pt.get("clockrate", "")
143 sdp_lines.append(f"a=rtpmap:{pt_id} {name}/{clockrate}")
144
145 if "ptime" in pt:
146 sdp_lines.append(f"a=ptime:{pt['ptime']}")
147
148 if "parameters" in pt:
149 fmtp_params = ";".join([f"{k}={v}" for k, v in pt["parameters"].items()])
150 sdp_lines.append(f"a=fmtp:{pt_id} {fmtp_params}")
151
152 if "bandwidth" in media_data:
153 sdp_lines.append(f"a=b:{media_data['bandwidth']}")
154
155 if media_data.get("rtcp-mux"):
156 sdp_lines.append("a=rtcp-mux")
157
158 # Generate a= lines for fingerprint, ICE ufrag, pwd and candidates
159 ice_data_key = "local_ice_data" if local else "peer_ice_data"
160 ice_data = content_data["transport_data"][ice_data_key]
161
162 if "fingerprint" in ice_data:
163 fingerprint_data = ice_data["fingerprint"]
164 sdp_lines.append(
165 f"a=fingerprint:{fingerprint_data['hash']} "
166 f"{fingerprint_data['fingerprint']}"
167 )
168 sdp_lines.append(f"a=setup:{fingerprint_data['setup']}")
169
170 sdp_lines.append(f"a=ice-ufrag:{ice_data['ufrag']}")
171 sdp_lines.append(f"a=ice-pwd:{ice_data['pwd']}")
172
173 for candidate in ice_data["candidates"]:
174 foundation = candidate["foundation"]
175 component_id = candidate["component_id"]
176 transport = candidate["transport"]
177 priority = candidate["priority"]
178 address = candidate["address"]
179 candidate_port = candidate["port"]
180 candidate_type = candidate["type"]
181
182 candidate_line = (
183 f"a=candidate:{foundation} {component_id} {transport} {priority} "
184 f"{address} {candidate_port} typ {candidate_type}"
185 )
186
187 if "rel_addr" in candidate and "rel_port" in candidate:
188 candidate_line += (
189 f" raddr {candidate['rel_addr']} rport {candidate['rel_port']}"
190 )
191
192 if "generation" in candidate:
193 candidate_line += f" generation {candidate['generation']}"
194
195 if "network" in candidate:
196 candidate_line += f" network {candidate['network']}"
197
198 sdp_lines.append(candidate_line)
199
200 # Generate a= lines for encryption
201 if "encryption" in media_data:
202 for enc_data in media_data["encryption"]:
203 crypto_suite = enc_data["crypto-suite"]
204 key_params = enc_data["key-params"]
205 session_params = enc_data.get("session-params", "")
206 tag = enc_data["tag"]
207
208 crypto_line = f"a=crypto:{tag} {crypto_suite} {key_params}"
209 if session_params:
210 crypto_line += f" {session_params}"
211 sdp_lines.append(crypto_line)
212
213
214 host.trigger.point(
215 "XEP-0167_generate_sdp_content",
216 session,
217 local,
218 content_name,
219 content_data,
220 sdp_lines,
221 application_data,
222 app_data_key,
223 media_data,
224 media,
225 triggers_no_cancel=True
226 )
227
228 # Combine SDP lines and return the result
229 return "\r\n".join(sdp_lines) + "\r\n"
230
231
232 def parse_sdp(sdp: str) -> dict:
233 """Parse SDP string.
234
235 @param sdp: The SDP string to parse.
236
237 @return: A dictionary containing parsed session data.
238 """
239 # FIXME: to be removed once host is accessible from global var
240 assert host is not None
241 lines = sdp.strip().split("\r\n")
242 # session metadata
243 metadata: Dict[str, Any] = {}
244 call_data = {"metadata": metadata}
245
246 media_type = None
247 media_data: Optional[Dict[str, Any]] = None
248 application_data: Optional[Dict[str, Any]] = None
249 transport_data: Optional[Dict[str, Any]] = None
250 fingerprint_data: Optional[Dict[str, str]] = None
251 ice_pwd: Optional[str] = None
252 ice_ufrag: Optional[str] = None
253 payload_types: Optional[Dict[int, dict]] = None
254
255 for line in lines:
256 try:
257 parts = line.split()
258 prefix = parts[0][:2] # Extract the 'a=', 'm=', etc., prefix
259 parts[0] = parts[0][2:] # Remove the prefix from the first element
260
261 if prefix == "m=":
262 media_type = parts[0]
263 port = int(parts[1])
264 payload_types = {}
265 for payload_type_id in [int(pt_id) for pt_id in parts[3:]]:
266 payload_type = {"id": payload_type_id}
267 payload_types[payload_type_id] = payload_type
268
269 application_data = {"media": media_type, "payload_types": payload_types}
270 transport_data = {"port": port}
271 if fingerprint_data is not None:
272 transport_data["fingerprint"] = fingerprint_data
273 if ice_pwd is not None:
274 transport_data["pwd"] = ice_pwd
275 if ice_ufrag is not None:
276 transport_data["ufrag"] = ice_ufrag
277 media_data = call_data[media_type] = {
278 "application_data": application_data,
279 "transport_data": transport_data,
280 }
281
282 elif prefix == "a=":
283 if ":" in parts[0]:
284 attribute, parts[0] = parts[0].split(":", 1)
285 else:
286 attribute = parts[0]
287
288 if (
289 media_type is None
290 or application_data is None
291 or transport_data is None
292 ) and not (
293 attribute
294 in (
295 "sendrecv",
296 "sendonly",
297 "recvonly",
298 "inactive",
299 "fingerprint",
300 "group",
301 "ice-options",
302 "msid-semantic",
303 "ice-pwd",
304 "ice-ufrag",
305 )
306 ):
307 log.warning(
308 "Received attribute before media description, this is "
309 f"invalid: {line}"
310 )
311 continue
312
313 if attribute == "mid":
314 assert media_data is not None
315 try:
316 media_data["id"] = parts[0]
317 except IndexError:
318 log.warning(f"invalid media ID: {line}")
319
320 elif attribute == "rtpmap":
321 assert application_data is not None
322 assert payload_types is not None
323 pt_id = int(parts[0])
324 codec_info = parts[1].split("/")
325 codec = codec_info[0]
326 clockrate = int(codec_info[1])
327 payload_type = {
328 "id": pt_id,
329 "name": codec,
330 "clockrate": clockrate,
331 }
332 # Handle optional channel count
333 if len(codec_info) > 2:
334 channels = int(codec_info[2])
335 payload_type["channels"] = channels
336
337 payload_types.setdefault(pt_id, {}).update(payload_type)
338
339 elif attribute == "fmtp":
340 assert payload_types is not None
341 pt_id = int(parts[0])
342 params = parts[1].split(";")
343 try:
344 payload_type = payload_types[pt_id]
345 except KeyError:
346 raise ValueError(
347 f"Can find content type {pt_id}, ignoring: {line}"
348 )
349
350 try:
351 payload_type["parameters"] = {
352 name: value
353 for name, value in (param.split("=") for param in params)
354 }
355 except ValueError:
356 payload_type.setdefault("exra-parameters", []).extend(params)
357
358 elif attribute == "candidate":
359 assert transport_data is not None
360 candidate = {
361 "foundation": parts[0],
362 "component_id": int(parts[1]),
363 "transport": parts[2],
364 "priority": int(parts[3]),
365 "address": parts[4],
366 "port": int(parts[5]),
367 "type": parts[7],
368 }
369
370 for part in parts[8:]:
371 if part == "raddr":
372 candidate["rel_addr"] = parts[parts.index(part) + 1]
373 elif part == "rport":
374 candidate["rel_port"] = int(parts[parts.index(part) + 1])
375 elif part == "generation":
376 candidate["generation"] = parts[parts.index(part) + 1]
377 elif part == "network":
378 candidate["network"] = parts[parts.index(part) + 1]
379
380 transport_data.setdefault("candidates", []).append(candidate)
381
382 elif attribute == "fingerprint":
383 algorithm, fingerprint = parts[0], parts[1]
384 fingerprint_data = {"hash": algorithm, "fingerprint": fingerprint}
385 if transport_data is not None:
386 transport_data["fingerprint"] = fingerprint_data
387 elif attribute == "setup":
388 assert transport_data is not None
389 setup = parts[0]
390 transport_data.setdefault("fingerprint", {})["setup"] = setup
391
392 elif attribute == "b":
393 assert application_data is not None
394 bandwidth = int(parts[0])
395 application_data["bandwidth"] = bandwidth
396
397 elif attribute == "rtcp-mux":
398 assert application_data is not None
399 application_data["rtcp-mux"] = True
400
401 elif attribute == "ice-ufrag":
402 if transport_data is not None:
403 transport_data["ufrag"] = parts[0]
404
405 elif attribute == "ice-pwd":
406 if transport_data is not None:
407 transport_data["pwd"] = parts[0]
408
409 host.trigger.point(
410 "XEP-0167_parse_sdp_a",
411 attribute,
412 parts,
413 call_data,
414 metadata,
415 media_type,
416 application_data,
417 transport_data,
418 triggers_no_cancel=True
419 )
420
421 except ValueError as e:
422 raise ValueError(f"Could not parse line. Invalid format ({e}): {line}") from e
423 except IndexError as e:
424 raise IndexError(f"Incomplete line. Missing data: {line}") from e
425
426 # we remove private data (data starting with _, used by some plugins (e.g. XEP-0294)
427 # to handle session data at media level))
428 for key in [k for k in call_data if k.startswith("_")]:
429 log.debug(f"cleaning remaining private data {key!r}")
430 del call_data[key]
431
432 # ICE candidates may only be specified for the first media, this
433 # duplicate the candidate for the other in this case
434 all_media = {k:v for k,v in call_data.items() if k in ("audio", "video")}
435 if len(all_media) > 1 and not all(
436 "candidates" in c["transport_data"] for c in all_media.values()
437 ):
438 first_content = next(iter(all_media.values()))
439 try:
440 ice_candidates = first_content["transport_data"]["candidates"]
441 except KeyError:
442 log.warning("missing candidates in SDP")
443 else:
444 for idx, content in enumerate(all_media.values()):
445 if idx == 0:
446 continue
447 content["transport_data"].setdefault("candidates", ice_candidates)
448
449 return call_data
450
451
452 def build_description(media: str, media_data: dict, session: dict) -> domish.Element:
453 """Generate <description> element from media data
454
455 @param media: media type ("audio" or "video")
456
457 @param media_data: A dictionary containing the media description data.
458 The keys and values are described below:
459
460 - ssrc (str, optional): The synchronization source identifier.
461 - payload_types (list): A list of dictionaries, each representing a payload
462 type.
463 Each dictionary may contain the following keys:
464 - channels (str, optional): Number of audio channels.
465 - clockrate (str, optional): Clock rate of the media.
466 - id (str): The unique identifier of the payload type.
467 - maxptime (str, optional): Maximum packet time.
468 - name (str, optional): Name of the codec.
469 - ptime (str, optional): Preferred packet time.
470 - parameters (dict, optional): A dictionary of codec-specific parameters.
471 Key-value pairs represent the parameter name and value, respectively.
472 - bandwidth (str, optional): The bandwidth type.
473 - rtcp-mux (bool, optional): Indicates whether RTCP multiplexing is enabled or
474 not.
475 - encryption (list, optional): A list of dictionaries, each representing an
476 encryption method.
477 Each dictionary may contain the following keys:
478 - tag (str): The unique identifier of the encryption method.
479 - crypto-suite (str): The encryption suite in use.
480 - key-params (str): Key parameters for the encryption suite.
481 - session-params (str, optional): Session parameters for the encryption
482 suite.
483
484 @return: A <description> element.
485 """
486 # FIXME: to be removed once host is accessible from global var
487 assert host is not None
488
489 desc_elt = domish.Element((NS_JINGLE_RTP, "description"), attribs={"media": media})
490
491 for pt_id, pt_data in media_data.get("payload_types", {}).items():
492 payload_type_elt = desc_elt.addElement("payload-type")
493 payload_type_elt["id"] = str(pt_id)
494 for attr in ["channels", "clockrate", "maxptime", "name", "ptime"]:
495 if attr in pt_data:
496 payload_type_elt[attr] = str(pt_data[attr])
497
498 if "parameters" in pt_data:
499 for param_name, param_value in pt_data["parameters"].items():
500 param_elt = payload_type_elt.addElement("parameter")
501 param_elt["name"] = param_name
502 param_elt["value"] = param_value
503 host.trigger.point(
504 "XEP-0167_build_description_payload_type",
505 desc_elt,
506 media_data,
507 pt_data,
508 payload_type_elt,
509 triggers_no_cancel=True
510 )
511
512 if "bandwidth" in media_data:
513 bandwidth_elt = desc_elt.addElement("bandwidth")
514 bandwidth_elt["type"] = media_data["bandwidth"]
515
516 if media_data.get("rtcp-mux"):
517 desc_elt.addElement("rtcp-mux")
518
519 # Add encryption element
520 if "encryption" in media_data:
521 encryption_elt = desc_elt.addElement("encryption")
522 # we always want require encryption if the `encryption` data is present
523 encryption_elt["required"] = "1"
524 for enc_data in media_data["encryption"]:
525 crypto_elt = encryption_elt.addElement("crypto")
526 for attr in ["tag", "crypto-suite", "key-params", "session-params"]:
527 if attr in enc_data:
528 crypto_elt[attr] = enc_data[attr]
529
530 host.trigger.point(
531 "XEP-0167_build_description",
532 desc_elt,
533 media_data,
534 session,
535 triggers_no_cancel=True
536 )
537
538 return desc_elt
539
540
541 def parse_description(desc_elt: domish.Element) -> dict:
542 """Parse <desciption> to a dict
543
544 @param desc_elt: <description> element
545 @return: media data as in [build_description]
546 """
547 # FIXME: to be removed once host is accessible from global var
548 assert host is not None
549
550 media_data = {}
551 if desc_elt.hasAttribute("ssrc"):
552 media_data.setdefault("ssrc", {})[desc_elt["ssrc"]] = {}
553
554 payload_types = {}
555 for payload_type_elt in desc_elt.elements(NS_JINGLE_RTP, "payload-type"):
556 payload_type_data = {
557 attr: payload_type_elt[attr]
558 for attr in [
559 "channels",
560 "clockrate",
561 "maxptime",
562 "name",
563 "ptime",
564 ]
565 if payload_type_elt.hasAttribute(attr)
566 }
567 try:
568 pt_id = int(payload_type_elt["id"])
569 except KeyError:
570 log.warning(
571 f"missing ID in payload type, ignoring: {payload_type_elt.toXml()}"
572 )
573 continue
574
575 parameters = {}
576 for param_elt in payload_type_elt.elements(NS_JINGLE_RTP, "parameter"):
577 param_name = param_elt.getAttribute("name")
578 param_value = param_elt.getAttribute("value")
579 if not param_name or param_value is None:
580 log.warning(f"invalid parameter: {param_elt.toXml()}")
581 continue
582 parameters[param_name] = param_value
583
584 if parameters:
585 payload_type_data["parameters"] = parameters
586
587 host.trigger.point(
588 "XEP-0167_parse_description_payload_type",
589 desc_elt,
590 media_data,
591 payload_type_elt,
592 payload_type_data,
593 triggers_no_cancel=True
594 )
595 payload_types[pt_id] = payload_type_data
596
597 # bandwidth
598 media_data["payload_types"] = payload_types
599 try:
600 bandwidth_elt = next(desc_elt.elements(NS_JINGLE_RTP, "bandwidth"))
601 except StopIteration:
602 pass
603 else:
604 bandwidth = bandwidth_elt.getAttribute("type")
605 if not bandwidth:
606 log.warning(f"invalid bandwidth: {bandwidth_elt.toXml}")
607 else:
608 media_data["bandwidth"] = bandwidth
609
610 # rtcp-mux
611 rtcp_mux_elt = next(desc_elt.elements(NS_JINGLE_RTP, "rtcp-mux"), None)
612 media_data["rtcp-mux"] = rtcp_mux_elt is not None
613
614 # Encryption
615 encryption_data = []
616 encryption_elt = next(desc_elt.elements(NS_JINGLE_RTP, "encryption"), None)
617 if encryption_elt:
618 media_data["encryption_required"] = C.bool(
619 encryption_elt.getAttribute("required", C.BOOL_FALSE)
620 )
621
622 for crypto_elt in encryption_elt.elements(NS_JINGLE_RTP, "crypto"):
623 crypto_data = {
624 attr: crypto_elt[attr]
625 for attr in [
626 "crypto-suite",
627 "key-params",
628 "session-params",
629 "tag",
630 ]
631 if crypto_elt.hasAttribute(attr)
632 }
633 encryption_data.append(crypto_data)
634
635 if encryption_data:
636 media_data["encryption"] = encryption_data
637
638 host.trigger.point(
639 "XEP-0167_parse_description",
640 desc_elt,
641 media_data,
642 triggers_no_cancel=True
643 )
644
645 return media_data