Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_comp_conferences/__init__.py @ 4278:240d8b7ad906
component Conferences: implementation of SFU component to make multi-party A/V conferences:
This component wrap `Galène` SFU and translate its signaling to XMPP Jingle in both
direction.
rel 445
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 05 Jul 2024 17:18:37 +0200 |
parents | |
children | 96fdf4891747 |
comparison
equal
deleted
inserted
replaced
4277:b4b4ea8c5c87 | 4278:240d8b7ad906 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # Libervia A/V Conferences Component | |
4 # Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org) | |
5 | |
6 # This program is free software: you can redistribute it and/or modify | |
7 # it under the terms of the GNU Affero General Public License as published by | |
8 # the Free Software Foundation, either version 3 of the License, or | |
9 # (at your option) any later version. | |
10 | |
11 # This program is distributed in the hope that it will be useful, | |
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 # GNU Affero General Public License for more details. | |
15 | |
16 # You should have received a copy of the GNU Affero General Public License | |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
18 | |
19 import json | |
20 from pathlib import Path | |
21 from urllib.parse import quote | |
22 | |
23 from autobahn.twisted.websocket import WebSocketClientFactory, WebSocketClientProtocol | |
24 from shortuuid import uuid | |
25 import treq | |
26 from twisted.internet import defer, reactor | |
27 from twisted.internet.error import ConnectionDone | |
28 from twisted.python import failure | |
29 from twisted.python.failure import Failure | |
30 from twisted.python.procutils import which | |
31 from twisted.words.protocols.jabber import jid | |
32 from twisted.words.protocols.jabber.error import StanzaError | |
33 from twisted.words.protocols.jabber.xmlstream import XMPPHandler | |
34 from twisted.words.xish import domish | |
35 from wokkel import disco, iwokkel | |
36 from zope.interface import implementer | |
37 | |
38 from libervia.backend.core import exceptions | |
39 from libervia.backend.core.constants import Const as C | |
40 from libervia.backend.core.core_types import SatXMPPEntity | |
41 from libervia.backend.core.i18n import _ | |
42 from libervia.backend.core.log import getLogger | |
43 from libervia.backend.plugins.plugin_xep_0106 import XEP_0106 | |
44 from libervia.backend.plugins.plugin_xep_0166 import XEP_0166 | |
45 from libervia.backend.plugins.plugin_xep_0167 import mapping | |
46 from libervia.backend.plugins.plugin_xep_0176 import XEP_0176 | |
47 from libervia.backend.tools.common import async_process, regex | |
48 | |
49 | |
50 log = getLogger(__name__) | |
51 | |
52 IMPORT_NAME = "conferences" | |
53 | |
54 PLUGIN_INFO = { | |
55 C.PI_NAME: "A/V Conferences Component", | |
56 C.PI_IMPORT_NAME: IMPORT_NAME, | |
57 C.PI_MODES: [C.PLUG_MODE_COMPONENT], | |
58 C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, | |
59 C.PI_PROTOCOLS: [], | |
60 C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0166", "XEP-0167", "XEP-0176"], | |
61 C.PI_RECOMMENDATIONS: [], | |
62 C.PI_MAIN: "ConferencesComponent", | |
63 C.PI_HANDLER: C.BOOL_FALSE, | |
64 C.PI_DESCRIPTION: _( | |
65 "Handle multiparty audio/video conferences, using a Selective Forwarding Unit.\n" | |
66 "The Galène project (https://galene.org) is currently the only supported backend." | |
67 ), | |
68 } | |
69 | |
70 CONF_SECTION = f"component {IMPORT_NAME}" | |
71 | |
72 | |
73 class Conference: | |
74 | |
75 def __init__( | |
76 self, | |
77 parent: "ConferencesComponent", | |
78 group_name: str, | |
79 data_file: Path, | |
80 endpoint: str, | |
81 status_url: str, | |
82 ) -> None: | |
83 self.parent = parent | |
84 self.group_name = group_name | |
85 self.data_file = data_file | |
86 self.endpoint = endpoint | |
87 self.status_url = status_url | |
88 self._protocol: "GaleneProtocol|None" = None | |
89 self.connected = defer.Deferred() | |
90 self.ready = defer.Deferred() | |
91 | |
92 def __str__(self): | |
93 return f"conference {self.group_name!r}" | |
94 | |
95 @property | |
96 def protocol(self) -> "GaleneProtocol": | |
97 assert self._protocol is not None | |
98 return self._protocol | |
99 | |
100 @protocol.setter | |
101 def protocol(self, protocol: "GaleneProtocol") -> None: | |
102 self._protocol = protocol | |
103 self.connected.callback(None) | |
104 | |
105 @property | |
106 def _j(self) -> XEP_0166: | |
107 return self.parent._j | |
108 | |
109 @property | |
110 def _ice_udp(self) -> XEP_0176: | |
111 return self.parent._ice_udp | |
112 | |
113 @property | |
114 def client(self) -> SatXMPPEntity: | |
115 client = self.parent.client | |
116 assert client is not None | |
117 return client | |
118 | |
119 def join(self, user_jid: jid.JID) -> None: | |
120 self.protocol.send_data( | |
121 { | |
122 "type": "join", | |
123 "kind": "join", | |
124 "group": self.data_file.stem, | |
125 "username": user_jid.userhost(), | |
126 "password": "", | |
127 } | |
128 ) | |
129 | |
130 def send_offer(self, session: dict, sdp: str) -> None: | |
131 self.protocol.send_data( | |
132 { | |
133 "type": "offer", | |
134 "id": session["id"], | |
135 "label": "camera", | |
136 "username": session["peer_jid"].userhost(), | |
137 "sdp": sdp, | |
138 } | |
139 ) | |
140 | |
141 def add_candidate( | |
142 self, session: dict, candidate: dict, sdp_mid: str, sdp_mline_index: int | |
143 ) -> None: | |
144 """Add an ICE candidate. | |
145 | |
146 @param session: Jingle session. | |
147 @param candidate: ICE candidate, SDP format. | |
148 """ | |
149 self.protocol.send_data( | |
150 { | |
151 "type": "ice", | |
152 "id": session["id"], | |
153 "candidate": { | |
154 "candidate": mapping.generate_candidate_line(candidate), | |
155 "sdpMid": sdp_mid, | |
156 "sdpMLineIndex": sdp_mline_index, | |
157 }, | |
158 } | |
159 ) | |
160 | |
161 def on_joined(self, data: dict) -> None: | |
162 user_jid = jid.JID(data["username"]) | |
163 match data["kind"]: | |
164 case "join": | |
165 log.info(f"{user_jid} has joined {self}.") | |
166 case "fail": | |
167 log.warning(f"{user_jid} can't join {self}: {data}") | |
168 case "change": | |
169 log.debug(f"Change for {user_jid} in {self}.") | |
170 case "leave": | |
171 log.info(f"{user_jid} has left {self}.") | |
172 | |
173 def on_answer(self, data: dict) -> None: | |
174 """Called when SDP answer has been received | |
175 | |
176 Send the SDP to ``answer_sdp_d`` to continue workflow. | |
177 """ | |
178 session = self._j.get_session(self.client, data["id"]) | |
179 try: | |
180 answer_sdp_d = session.pop("answer_sdp_d") | |
181 except KeyError: | |
182 raise exceptions.InternalError( | |
183 '"answer_sdp_d" should be available in session.' | |
184 ) | |
185 else: | |
186 answer_sdp_d.callback(data["sdp"]) | |
187 | |
188 def on_ice(self, data: dict) -> None: | |
189 log.debug(f"ICE candidate: {data}") | |
190 session = self._j.get_session(self.client, data["id"]) | |
191 candidate_data = data["candidate"] | |
192 contents = session["contents"] | |
193 try: | |
194 content_id = list(contents)[candidate_data["sdpMLineIndex"]] | |
195 except IndexError: | |
196 log.error( | |
197 f"Can't find any content at index {candidate_data['sdpMLineIndex']}." | |
198 ) | |
199 return | |
200 content = contents[content_id] | |
201 local_ice_data = content["transport_data"]["local_ice_data"] | |
202 media = content["application_data"]["media"] | |
203 client: SatXMPPEntity = self.client.get_virtual_client(session["local_jid"]) | |
204 candidate = mapping.parse_candidate(candidate_data["candidate"][10:].split()) | |
205 defer.ensureDeferred( | |
206 self._ice_udp.ice_candidates_add( | |
207 client, | |
208 session["id"], | |
209 { | |
210 media: { | |
211 "candidates": [candidate], | |
212 "ufrag": local_ice_data["ufrag"], | |
213 "pwd": local_ice_data["pwd"], | |
214 } | |
215 }, | |
216 ) | |
217 ) | |
218 | |
219 | |
220 class GaleneClientFactory(WebSocketClientFactory): | |
221 | |
222 def __init__(self, conference: Conference) -> None: | |
223 self.conference = conference | |
224 super().__init__(conference.endpoint) | |
225 | |
226 | |
227 class GaleneProtocol(WebSocketClientProtocol): | |
228 verbose = True | |
229 | |
230 @property | |
231 def conference(self) -> Conference: | |
232 conference: Conference = self.factory.conference | |
233 assert conference is not None | |
234 return conference | |
235 | |
236 def connectionMade(self) -> None: | |
237 super().connectionMade() | |
238 self.conference.protocol = self | |
239 | |
240 def connectionLost(self, reason: failure.Failure = ConnectionDone) -> None: | |
241 super().connectionLost(reason) | |
242 | |
243 def onOpen(self): | |
244 handshake_data = {"type": "handshake", "version": ["2"], "id": str(uuid())} | |
245 self.send_data(handshake_data) | |
246 | |
247 def send_data(self, data: dict) -> None: | |
248 if self.verbose: | |
249 log.debug(f"DATA SENT: {data}") | |
250 self.sendMessage(json.dumps(data).encode()) | |
251 | |
252 def onMessage(self, payload, isBinary): | |
253 if isBinary: | |
254 raise exceptions.DataError("Unexpected binary payload: {payload!r}") | |
255 try: | |
256 data = json.loads(payload) | |
257 except json.JSONDecodeError: | |
258 log.warning(f"Can't decode data: {payload!r}") | |
259 return | |
260 | |
261 try: | |
262 match data.get("type"): | |
263 case None: | |
264 log.warning(f'"type" is missing in data: {data!r}') | |
265 case "handshake": | |
266 version = data["version"][0] | |
267 | |
268 log.debug( | |
269 f"Handshake for group {self.conference.group_name!r}. Galène protocol " | |
270 f" v{version}." | |
271 ) | |
272 self.conference.ready.callback(None) | |
273 case "ping": | |
274 log.debug("pong") | |
275 self.send_data({"type": "pong"}) | |
276 case "joined" | "answer" | "ice" as data_type: | |
277 method = getattr(self.conference, f"on_{data_type}") | |
278 method(data) | |
279 case _: | |
280 log.debug(f"Unhandled message: {data}") | |
281 except (KeyError, IndexError): | |
282 log.exception(f"Unexpected data format: {data!r}") | |
283 | |
284 | |
285 class ConferencesComponent: | |
286 IMPORT_NAME = IMPORT_NAME | |
287 | |
288 def __init__(self, host): | |
289 self.host = host | |
290 self.client: SatXMPPEntity | None = None | |
291 self._e: XEP_0106 = host.plugins["XEP-0106"] | |
292 self._j: XEP_0166 = host.plugins["XEP-0166"] | |
293 self._ice_udp: XEP_0176 = host.plugins["XEP-0176"] | |
294 host.trigger.add("XEP-0167_jingle_handler", self._jingle_handler_trigger) | |
295 try: | |
296 galene_path = Path( | |
297 self.host.memory.config_get(CONF_SECTION, "galene_path") | |
298 or which("galene")[0] | |
299 ) | |
300 except IndexError: | |
301 raise exceptions.NotFound( | |
302 'Can\'t find "galene" executable, "conferences" component can\'t be ' | |
303 "started without it. Please install it in location accessible in PATH, " | |
304 'or use "galene_path" setting.' | |
305 ) | |
306 self.galene_http_port = self.host.memory.config_get( | |
307 CONF_SECTION, "http_port", "9443" | |
308 ) | |
309 galene_data_path = host.memory.get_cache_path(IMPORT_NAME, "galene") | |
310 galene_static_path = galene_path.parent / "static" | |
311 self.galene_group_path = galene_data_path / "groups" | |
312 self.galene_group_path.mkdir(0o700, parents=True, exist_ok=True) | |
313 try: | |
314 d = self._process = async_process.run( | |
315 str(galene_path), | |
316 "-static", | |
317 str(galene_static_path), | |
318 "-http", | |
319 f"127.0.0.1:{self.galene_http_port}", | |
320 # We don't want HTTPS here, it's only used for local interactions | |
321 "-insecure", | |
322 path=str(galene_data_path), | |
323 verbose=True, | |
324 ) | |
325 except Exception: | |
326 log.exception("Can't start Galene.") | |
327 else: | |
328 d.addErrback(self._galene_process_errback) | |
329 log.info(f"Galene instance started on port {self.galene_http_port}.") | |
330 | |
331 def get_handler(self, __): | |
332 return ConferencesHandler() | |
333 | |
334 def profile_connecting(self, client): | |
335 self.client = client | |
336 | |
337 async def attach_to_group(self, session: dict, group_name: str) -> Conference: | |
338 """Attach to a Galène group. | |
339 | |
340 Create a group data file if it doesn't exist. | |
341 Create and attach a Galene client. | |
342 | |
343 @param session: Jingle session data. | |
344 @param group_name: name of the conference group. | |
345 @return conference: Data of the conference. | |
346 """ | |
347 stem = regex.path_escape(group_name) | |
348 filename = f"{stem}.json" | |
349 data_file = self.galene_group_path / filename | |
350 if not data_file.exists(): | |
351 group_data = { | |
352 "wildcard-user": { | |
353 "password": {"type": "wildcard"}, | |
354 "permissions": "present", | |
355 }, | |
356 } | |
357 with data_file.open("w") as f: | |
358 json.dump(group_data, f) | |
359 log.debug(f"Conference data for {group_name!r} created at " f"{data_file} .") | |
360 | |
361 url = f"http://localhost:{self.galene_http_port}/group/{quote(stem)}" | |
362 status_url = f"{url}/.status" | |
363 | |
364 log.debug(f"Attaching to Galene.\n{url=}\n{status_url=}") | |
365 resp = await treq.get(status_url) | |
366 group_status = await resp.json() | |
367 log.debug(f"{group_status=}") | |
368 endpoint = group_status["endpoint"] | |
369 conference = Conference( | |
370 parent=self, | |
371 group_name=group_name, | |
372 data_file=data_file, | |
373 endpoint=endpoint, | |
374 status_url=status_url, | |
375 ) | |
376 | |
377 factory = GaleneClientFactory(conference) | |
378 # factory.setProtocolOptions(logOctets=True) | |
379 factory.protocol = GaleneProtocol | |
380 reactor.connectTCP("127.0.0.1", int(self.galene_http_port), factory) | |
381 | |
382 return conference | |
383 | |
384 async def _jingle_handler_trigger( | |
385 self, | |
386 client: SatXMPPEntity, | |
387 action: str, | |
388 session: dict, | |
389 content_name: str, | |
390 desc_elt: domish.Element, | |
391 ) -> None: | |
392 if client != self.client: | |
393 return | |
394 if action == self._j.A_PREPARE_CONFIRMATION: | |
395 if "conference" in session: | |
396 # We have already set up the conference. | |
397 return | |
398 local_jid: jid.JID = session["local_jid"] | |
399 if not local_jid.user: | |
400 raise StanzaError("forbidden", "A room name must be specified.") | |
401 group_name = self._e.unescape(local_jid.user) | |
402 | |
403 session["conference"] = await self.attach_to_group(session, group_name) | |
404 session["pre_accepted"] = True | |
405 session["call_setup_cb"] = self.on_call_setup | |
406 session["ice_candidates_new_cb"] = self.on_ice_candidates_new | |
407 | |
408 async def on_call_setup( | |
409 self, | |
410 client: SatXMPPEntity, | |
411 session: dict, | |
412 call_data: dict, | |
413 ) -> None: | |
414 if self.client is None or client != self.client: | |
415 raise exceptions.InternalError(f"Unexpected client: {client}") | |
416 try: | |
417 conference = session["conference"] | |
418 except KeyError: | |
419 raise exceptions.InternalError("Conference data is missing.") | |
420 | |
421 await conference.ready | |
422 conference.join(session["peer_jid"]) | |
423 conference.send_offer(session, call_data["sdp"]) | |
424 | |
425 def on_ice_candidates_new( | |
426 self, | |
427 client: SatXMPPEntity, | |
428 session: dict, | |
429 ice_candidates_data: dict[str, dict], | |
430 ) -> None: | |
431 try: | |
432 conference = session["conference"] | |
433 except KeyError: | |
434 raise exceptions.InternalError("Conference data is missing.") | |
435 for media, media_data in ice_candidates_data.items(): | |
436 for idx, (content_id, content) in enumerate(session["contents"].items()): | |
437 if content["application_data"]["media"] == media: | |
438 break | |
439 else: | |
440 log.error(f"Can't find corresponding content for {media!r}") | |
441 continue | |
442 sdp_mline_index: int = idx | |
443 sdp_mid: str = content_id | |
444 | |
445 for candidate in media_data["candidates"]: | |
446 conference.add_candidate(session, candidate, sdp_mid, sdp_mline_index) | |
447 | |
448 def _galene_process_errback(self, failure_: Failure) -> None: | |
449 log.error(f"Can't run Galene process: {failure_.value}") | |
450 | |
451 | |
452 @implementer(iwokkel.IDisco) | |
453 class ConferencesHandler(XMPPHandler): | |
454 | |
455 def getDiscoInfo(self, requestor, target, nodeIdentifier=""): | |
456 return [disco.DiscoIdentity("conference", "audio-video")] |