Mercurial > libervia-backend
comparison sat/plugins/plugin_xep_0293.py @ 4060:fce92ba311f4
plugin XEP-0293: "Jingle RTP Feedback Negotiation" implementation:
rel 437
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 29 May 2023 17:58:03 +0200 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
4059:00dbc3370d35 | 4060:fce92ba311f4 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # Libervia plugin | |
4 # Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org) | |
5 | |
6 # This program is free software: you can redistribute it and/or modify | |
7 # it under the terms of the GNU Affero General Public License as published by | |
8 # the Free Software Foundation, either version 3 of the License, or | |
9 # (at your option) any later version. | |
10 | |
11 # This program is distributed in the hope that it will be useful, | |
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 # GNU Affero General Public License for more details. | |
15 | |
16 # You should have received a copy of the GNU Affero General Public License | |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
18 | |
19 from typing import List | |
20 | |
21 from twisted.words.protocols.jabber.xmlstream import XMPPHandler | |
22 from twisted.words.xish import domish | |
23 from wokkel import disco, iwokkel | |
24 from zope.interface import implementer | |
25 | |
26 from sat.core.constants import Const as C | |
27 from sat.core.i18n import _ | |
28 from sat.core.log import getLogger | |
29 | |
30 log = getLogger(__name__) | |
31 | |
32 NS_JINGLE_RTP_RTCP_FB = "urn:xmpp:jingle:apps:rtp:rtcp-fb:0" | |
33 | |
34 PLUGIN_INFO = { | |
35 C.PI_NAME: "Jingle RTP Feedback Negotiation", | |
36 C.PI_IMPORT_NAME: "XEP-0293", | |
37 C.PI_TYPE: "XEP", | |
38 C.PI_MODES: C.PLUG_MODE_BOTH, | |
39 C.PI_PROTOCOLS: ["XEP-0293"], | |
40 C.PI_DEPENDENCIES: ["XEP-0092", "XEP-0166", "XEP-0167"], | |
41 C.PI_RECOMMENDATIONS: [], | |
42 C.PI_MAIN: "XEP_0293", | |
43 C.PI_HANDLER: "yes", | |
44 C.PI_DESCRIPTION: _("""Jingle RTP Feedback Negotiation"""), | |
45 } | |
46 | |
47 RTCP_FB_KEY = "rtcp-fb" | |
48 | |
49 | |
50 class XEP_0293: | |
51 def __init__(self, host): | |
52 log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization") | |
53 host.trigger.add("XEP-0167_parse_sdp_a", self._parse_sdp_a_trigger) | |
54 host.trigger.add( | |
55 "XEP-0167_generate_sdp_content", self._generate_sdp_content_trigger | |
56 ) | |
57 host.trigger.add("XEP-0167_parse_description", self._parse_description_trigger) | |
58 host.trigger.add( | |
59 "XEP-0167_parse_description_payload_type", | |
60 self._parse_description_payload_type_trigger, | |
61 ) | |
62 host.trigger.add("XEP-0167_build_description", self._build_description_trigger) | |
63 host.trigger.add( | |
64 "XEP-0167_build_description_payload_type", | |
65 self._build_description_payload_type_trigger, | |
66 ) | |
67 | |
68 def get_handler(self, client): | |
69 return XEP_0293_handler() | |
70 | |
71 ## SDP | |
72 | |
73 def _parse_sdp_a_trigger( | |
74 self, | |
75 attribute: str, | |
76 parts: List[str], | |
77 call_data: dict, | |
78 metadata: dict, | |
79 media_type: str, | |
80 application_data: dict, | |
81 transport_data: dict, | |
82 ) -> None: | |
83 """Parse "rtcp-fb" and "rtcp-fb-trr-int" attributes | |
84 | |
85 @param attribute: The attribute being parsed. | |
86 @param parts: The list of parts in the attribute. | |
87 @param call_data: The call data dict. | |
88 @param metadata: The metadata dict. | |
89 @param media_type: The media type (e.g., audio, video). | |
90 @param application_data: The application data dict. | |
91 @param transport_data: The transport data dict. | |
92 @param payload_map: The payload map dict. | |
93 """ | |
94 if attribute == "rtcp-fb": | |
95 pt_id = parts[0] | |
96 feedback_type = parts[1] | |
97 | |
98 feedback_subtype = None | |
99 parameters = {} | |
100 | |
101 # Check if there are extra parameters | |
102 if len(parts) > 2: | |
103 feedback_subtype = parts[2] | |
104 | |
105 if len(parts) > 3: | |
106 for parameter in parts[3:]: | |
107 name, _, value = parameter.partition("=") | |
108 parameters[name] = value or None | |
109 | |
110 # Check if this feedback is linked to a payload type | |
111 if pt_id == "*": | |
112 # Not linked to a payload type, add to application data | |
113 application_data.setdefault(RTCP_FB_KEY, []).append( | |
114 (feedback_type, feedback_subtype, parameters) | |
115 ) | |
116 else: | |
117 payload_types = application_data.get("payload_types", {}) | |
118 try: | |
119 payload_type = payload_types[int(pt_id)] | |
120 except KeyError: | |
121 log.warning( | |
122 f"Got reference to unknown payload type {pt_id}: " | |
123 f"{' '.join(parts)}" | |
124 ) | |
125 else: | |
126 # Linked to a payload type, add to payload data | |
127 payload_type.setdefault(RTCP_FB_KEY, []).append( | |
128 (feedback_type, feedback_subtype, parameters) | |
129 ) | |
130 | |
131 elif attribute == "rtcp-fb-trr-int": | |
132 pt_id = parts[0] # Payload type ID | |
133 interval = int(parts[1]) | |
134 | |
135 # Check if this interval is linked to a payload type | |
136 if pt_id == "*": | |
137 # Not linked to a payload type, add to application data | |
138 application_data["rtcp-fb-trr-int"] = interval | |
139 else: | |
140 payload_types = application_data.get("payload_types", {}) | |
141 try: | |
142 payload_type = payload_types[int(pt_id)] | |
143 except KeyError: | |
144 log.warning( | |
145 f"Got reference to unknown payload type {pt_id}: " | |
146 f"{' '.join(parts)}" | |
147 ) | |
148 else: | |
149 # Linked to a payload type, add to payload data | |
150 payload_type["rtcp-fb-trr-int"] = interval | |
151 | |
152 def _generate_rtcp_fb_lines( | |
153 self, data: dict, pt_id: str, sdp_lines: List[str] | |
154 ) -> None: | |
155 for type_, subtype, parameters in data.get(RTCP_FB_KEY, []): | |
156 parameters_strs = [ | |
157 f"{k}={v}" if v is not None else k for k, v in parameters.items() | |
158 ] | |
159 parameters_str = " ".join(parameters_strs) | |
160 | |
161 sdp_line = f"a=rtcp-fb:{pt_id} {type_}" | |
162 if subtype: | |
163 sdp_line += f" {subtype}" | |
164 if parameters_str: | |
165 sdp_line += f" {parameters_str}" | |
166 sdp_lines.append(sdp_line) | |
167 | |
168 def _generate_rtcp_fb_trr_int_lines( | |
169 self, data: dict, pt_id: str, sdp_lines: List[str] | |
170 ) -> None: | |
171 if "rtcp-fb-trr-int" not in data: | |
172 return | |
173 sdp_lines.append(f"a=rtcp-fb:{pt_id} trr-int {data['rtcp-fb-trr-int']}") | |
174 | |
175 def _generate_sdp_content_trigger( | |
176 self, | |
177 session: dict, | |
178 local: bool, | |
179 content_name: str, | |
180 content_data: dict, | |
181 sdp_lines: List[str], | |
182 application_data: dict, | |
183 app_data_key: str, | |
184 media_data: dict, | |
185 media: str, | |
186 ) -> None: | |
187 """Generate SDP attributes "rtcp-fb" and "rtcp-fb-trr-int" from application data. | |
188 | |
189 @param session: The session data. | |
190 @param local: Whether this is local or remote content. | |
191 @param content_name: The name of the content. | |
192 @param content_data: The data of the content. | |
193 @param sdp_lines: The list of SDP lines to append to. | |
194 @param application_data: The application data dict. | |
195 @param app_data_key: The key for the application data. | |
196 @param media_data: The media data dict. | |
197 @param media: The media type (e.g., audio, video). | |
198 """ | |
199 # Generate lines for application data | |
200 self._generate_rtcp_fb_lines(application_data, "*", sdp_lines) | |
201 self._generate_rtcp_fb_trr_int_lines(application_data, "*", sdp_lines) | |
202 | |
203 # Generate lines for each payload type | |
204 for pt_id, payload_data in media_data.get("payload_types", {}).items(): | |
205 self._generate_rtcp_fb_lines(payload_data, pt_id, sdp_lines) | |
206 self._generate_rtcp_fb_trr_int_lines(payload_data, pt_id, sdp_lines) | |
207 | |
208 ## XML | |
209 | |
210 def _parse_rtcp_fb_elements(self, parent_elt: domish.Element, data: dict) -> None: | |
211 """Parse the <rtcp-fb> and <rtcp-fb-trr-int> elements. | |
212 | |
213 @param parent_elt: The parent domish.Element. | |
214 @param data: The data dict to populate. | |
215 """ | |
216 for rtcp_fb_elt in parent_elt.elements(NS_JINGLE_RTP_RTCP_FB, "rtcp-fb"): | |
217 try: | |
218 type_ = rtcp_fb_elt["type"] | |
219 subtype = rtcp_fb_elt.getAttribute("subtype") | |
220 | |
221 parameters = {} | |
222 for parameter_elt in rtcp_fb_elt.elements( | |
223 NS_JINGLE_RTP_RTCP_FB, "parameter" | |
224 ): | |
225 parameters[parameter_elt["name"]] = parameter_elt.getAttribute( | |
226 "value" | |
227 ) | |
228 | |
229 data.setdefault(RTCP_FB_KEY, []).append((type_, subtype, parameters)) | |
230 except (KeyError, ValueError) as e: | |
231 log.warning(f"Error while parsing <rtcp-fb>: {e}\n{rtcp_fb_elt.toXml()}") | |
232 | |
233 for rtcp_fb_trr_int_elt in parent_elt.elements( | |
234 NS_JINGLE_RTP_RTCP_FB, "rtcp-fb-trr-int" | |
235 ): | |
236 try: | |
237 interval_value = int(rtcp_fb_trr_int_elt["value"]) | |
238 data.setdefault("rtcp_fb_trr_int", []).append(interval_value) | |
239 except (KeyError, ValueError) as e: | |
240 log.warning( | |
241 f"Error while parsing <rtcp-fb-trr-int>: {e}\n" | |
242 f"{rtcp_fb_trr_int_elt.toXml()}" | |
243 ) | |
244 | |
245 def _parse_description_trigger( | |
246 self, desc_elt: domish.Element, media_data: dict | |
247 ) -> None: | |
248 """Parse the <rtcp-fb> and <rtcp-fb-trr-int> elements from a description. | |
249 | |
250 @param desc_elt: The <description> domish.Element. | |
251 @param media_data: The media data dict to populate. | |
252 """ | |
253 self._parse_rtcp_fb_elements(desc_elt, media_data) | |
254 | |
255 def _parse_description_payload_type_trigger( | |
256 self, | |
257 desc_elt: domish.Element, | |
258 media_data: dict, | |
259 payload_type_elt: domish.Element, | |
260 payload_type_data: dict, | |
261 ) -> None: | |
262 """Parse the <rtcp-fb> and <rtcp-fb-trr-int> elements from a payload type. | |
263 | |
264 @param desc_elt: The <description> domish.Element. | |
265 @param media_data: The media data dict. | |
266 @param payload_type_elt: The <payload-type> domish.Element. | |
267 @param payload_type_data: The payload type data dict to populate. | |
268 """ | |
269 self._parse_rtcp_fb_elements(payload_type_elt, payload_type_data) | |
270 | |
271 def build_rtcp_fb_elements(self, parent_elt: domish.Element, data: dict) -> None: | |
272 """Helper method to build the <rtcp-fb> and <rtcp-fb-trr-int> elements""" | |
273 for type_, subtype, parameters in data.get(RTCP_FB_KEY, []): | |
274 rtcp_fb_elt = parent_elt.addElement((NS_JINGLE_RTP_RTCP_FB, "rtcp-fb")) | |
275 rtcp_fb_elt["type"] = type_ | |
276 if subtype: | |
277 rtcp_fb_elt["subtype"] = subtype | |
278 for name, value in parameters.items(): | |
279 param_elt = rtcp_fb_elt.addElement(name) | |
280 if value is not None: | |
281 param_elt.addContent(str(value)) | |
282 | |
283 if "rtcp-fb-trr-int" in data: | |
284 rtcp_fb_trr_int_elt = parent_elt.addElement( | |
285 (NS_JINGLE_RTP_RTCP_FB, "rtcp-fb-trr-int") | |
286 ) | |
287 rtcp_fb_trr_int_elt["value"] = str(data["rtcp-fb-trr-int"]) | |
288 | |
289 def _build_description_payload_type_trigger( | |
290 self, | |
291 desc_elt: domish.Element, | |
292 media_data: dict, | |
293 payload_type: dict, | |
294 payload_type_elt: domish.Element, | |
295 ) -> None: | |
296 """Build the <rtcp-fb> and <rtcp-fb-trr-int> elements for a payload type""" | |
297 self.build_rtcp_fb_elements(payload_type_elt, payload_type) | |
298 | |
299 def _build_description_trigger( | |
300 self, desc_elt: domish.Element, media_data: dict, session: dict | |
301 ) -> None: | |
302 """Build the <rtcp-fb> and <rtcp-fb-trr-int> elements for a media description""" | |
303 self.build_rtcp_fb_elements(desc_elt, media_data) | |
304 | |
305 | |
306 @implementer(iwokkel.IDisco) | |
307 class XEP_0293_handler(XMPPHandler): | |
308 def getDiscoInfo(self, requestor, target, nodeIdentifier=""): | |
309 return [disco.DiscoFeature(NS_JINGLE_RTP_RTCP_FB)] | |
310 | |
311 def getDiscoItems(self, requestor, target, nodeIdentifier=""): | |
312 return [] |