comparison libervia/backend/plugins/plugin_comp_conferences/__init__.py @ 4278:240d8b7ad906

component Conferences: implementation of SFU component to make multi-party A/V conferences: This component wrap `Galène` SFU and translate its signaling to XMPP Jingle in both direction. rel 445
author Goffi <goffi@goffi.org>
date Fri, 05 Jul 2024 17:18:37 +0200
parents
children 96fdf4891747
comparison
equal deleted inserted replaced
4277:b4b4ea8c5c87 4278:240d8b7ad906
1 #!/usr/bin/env python3
2
3 # Libervia A/V Conferences Component
4 # Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 import json
20 from pathlib import Path
21 from urllib.parse import quote
22
23 from autobahn.twisted.websocket import WebSocketClientFactory, WebSocketClientProtocol
24 from shortuuid import uuid
25 import treq
26 from twisted.internet import defer, reactor
27 from twisted.internet.error import ConnectionDone
28 from twisted.python import failure
29 from twisted.python.failure import Failure
30 from twisted.python.procutils import which
31 from twisted.words.protocols.jabber import jid
32 from twisted.words.protocols.jabber.error import StanzaError
33 from twisted.words.protocols.jabber.xmlstream import XMPPHandler
34 from twisted.words.xish import domish
35 from wokkel import disco, iwokkel
36 from zope.interface import implementer
37
38 from libervia.backend.core import exceptions
39 from libervia.backend.core.constants import Const as C
40 from libervia.backend.core.core_types import SatXMPPEntity
41 from libervia.backend.core.i18n import _
42 from libervia.backend.core.log import getLogger
43 from libervia.backend.plugins.plugin_xep_0106 import XEP_0106
44 from libervia.backend.plugins.plugin_xep_0166 import XEP_0166
45 from libervia.backend.plugins.plugin_xep_0167 import mapping
46 from libervia.backend.plugins.plugin_xep_0176 import XEP_0176
47 from libervia.backend.tools.common import async_process, regex
48
49
50 log = getLogger(__name__)
51
52 IMPORT_NAME = "conferences"
53
54 PLUGIN_INFO = {
55 C.PI_NAME: "A/V Conferences Component",
56 C.PI_IMPORT_NAME: IMPORT_NAME,
57 C.PI_MODES: [C.PLUG_MODE_COMPONENT],
58 C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
59 C.PI_PROTOCOLS: [],
60 C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0166", "XEP-0167", "XEP-0176"],
61 C.PI_RECOMMENDATIONS: [],
62 C.PI_MAIN: "ConferencesComponent",
63 C.PI_HANDLER: C.BOOL_FALSE,
64 C.PI_DESCRIPTION: _(
65 "Handle multiparty audio/video conferences, using a Selective Forwarding Unit.\n"
66 "The Galène project (https://galene.org) is currently the only supported backend."
67 ),
68 }
69
70 CONF_SECTION = f"component {IMPORT_NAME}"
71
72
73 class Conference:
74
75 def __init__(
76 self,
77 parent: "ConferencesComponent",
78 group_name: str,
79 data_file: Path,
80 endpoint: str,
81 status_url: str,
82 ) -> None:
83 self.parent = parent
84 self.group_name = group_name
85 self.data_file = data_file
86 self.endpoint = endpoint
87 self.status_url = status_url
88 self._protocol: "GaleneProtocol|None" = None
89 self.connected = defer.Deferred()
90 self.ready = defer.Deferred()
91
92 def __str__(self):
93 return f"conference {self.group_name!r}"
94
95 @property
96 def protocol(self) -> "GaleneProtocol":
97 assert self._protocol is not None
98 return self._protocol
99
100 @protocol.setter
101 def protocol(self, protocol: "GaleneProtocol") -> None:
102 self._protocol = protocol
103 self.connected.callback(None)
104
105 @property
106 def _j(self) -> XEP_0166:
107 return self.parent._j
108
109 @property
110 def _ice_udp(self) -> XEP_0176:
111 return self.parent._ice_udp
112
113 @property
114 def client(self) -> SatXMPPEntity:
115 client = self.parent.client
116 assert client is not None
117 return client
118
119 def join(self, user_jid: jid.JID) -> None:
120 self.protocol.send_data(
121 {
122 "type": "join",
123 "kind": "join",
124 "group": self.data_file.stem,
125 "username": user_jid.userhost(),
126 "password": "",
127 }
128 )
129
130 def send_offer(self, session: dict, sdp: str) -> None:
131 self.protocol.send_data(
132 {
133 "type": "offer",
134 "id": session["id"],
135 "label": "camera",
136 "username": session["peer_jid"].userhost(),
137 "sdp": sdp,
138 }
139 )
140
141 def add_candidate(
142 self, session: dict, candidate: dict, sdp_mid: str, sdp_mline_index: int
143 ) -> None:
144 """Add an ICE candidate.
145
146 @param session: Jingle session.
147 @param candidate: ICE candidate, SDP format.
148 """
149 self.protocol.send_data(
150 {
151 "type": "ice",
152 "id": session["id"],
153 "candidate": {
154 "candidate": mapping.generate_candidate_line(candidate),
155 "sdpMid": sdp_mid,
156 "sdpMLineIndex": sdp_mline_index,
157 },
158 }
159 )
160
161 def on_joined(self, data: dict) -> None:
162 user_jid = jid.JID(data["username"])
163 match data["kind"]:
164 case "join":
165 log.info(f"{user_jid} has joined {self}.")
166 case "fail":
167 log.warning(f"{user_jid} can't join {self}: {data}")
168 case "change":
169 log.debug(f"Change for {user_jid} in {self}.")
170 case "leave":
171 log.info(f"{user_jid} has left {self}.")
172
173 def on_answer(self, data: dict) -> None:
174 """Called when SDP answer has been received
175
176 Send the SDP to ``answer_sdp_d`` to continue workflow.
177 """
178 session = self._j.get_session(self.client, data["id"])
179 try:
180 answer_sdp_d = session.pop("answer_sdp_d")
181 except KeyError:
182 raise exceptions.InternalError(
183 '"answer_sdp_d" should be available in session.'
184 )
185 else:
186 answer_sdp_d.callback(data["sdp"])
187
188 def on_ice(self, data: dict) -> None:
189 log.debug(f"ICE candidate: {data}")
190 session = self._j.get_session(self.client, data["id"])
191 candidate_data = data["candidate"]
192 contents = session["contents"]
193 try:
194 content_id = list(contents)[candidate_data["sdpMLineIndex"]]
195 except IndexError:
196 log.error(
197 f"Can't find any content at index {candidate_data['sdpMLineIndex']}."
198 )
199 return
200 content = contents[content_id]
201 local_ice_data = content["transport_data"]["local_ice_data"]
202 media = content["application_data"]["media"]
203 client: SatXMPPEntity = self.client.get_virtual_client(session["local_jid"])
204 candidate = mapping.parse_candidate(candidate_data["candidate"][10:].split())
205 defer.ensureDeferred(
206 self._ice_udp.ice_candidates_add(
207 client,
208 session["id"],
209 {
210 media: {
211 "candidates": [candidate],
212 "ufrag": local_ice_data["ufrag"],
213 "pwd": local_ice_data["pwd"],
214 }
215 },
216 )
217 )
218
219
220 class GaleneClientFactory(WebSocketClientFactory):
221
222 def __init__(self, conference: Conference) -> None:
223 self.conference = conference
224 super().__init__(conference.endpoint)
225
226
227 class GaleneProtocol(WebSocketClientProtocol):
228 verbose = True
229
230 @property
231 def conference(self) -> Conference:
232 conference: Conference = self.factory.conference
233 assert conference is not None
234 return conference
235
236 def connectionMade(self) -> None:
237 super().connectionMade()
238 self.conference.protocol = self
239
240 def connectionLost(self, reason: failure.Failure = ConnectionDone) -> None:
241 super().connectionLost(reason)
242
243 def onOpen(self):
244 handshake_data = {"type": "handshake", "version": ["2"], "id": str(uuid())}
245 self.send_data(handshake_data)
246
247 def send_data(self, data: dict) -> None:
248 if self.verbose:
249 log.debug(f"DATA SENT: {data}")
250 self.sendMessage(json.dumps(data).encode())
251
252 def onMessage(self, payload, isBinary):
253 if isBinary:
254 raise exceptions.DataError("Unexpected binary payload: {payload!r}")
255 try:
256 data = json.loads(payload)
257 except json.JSONDecodeError:
258 log.warning(f"Can't decode data: {payload!r}")
259 return
260
261 try:
262 match data.get("type"):
263 case None:
264 log.warning(f'"type" is missing in data: {data!r}')
265 case "handshake":
266 version = data["version"][0]
267
268 log.debug(
269 f"Handshake for group {self.conference.group_name!r}. Galène protocol "
270 f" v{version}."
271 )
272 self.conference.ready.callback(None)
273 case "ping":
274 log.debug("pong")
275 self.send_data({"type": "pong"})
276 case "joined" | "answer" | "ice" as data_type:
277 method = getattr(self.conference, f"on_{data_type}")
278 method(data)
279 case _:
280 log.debug(f"Unhandled message: {data}")
281 except (KeyError, IndexError):
282 log.exception(f"Unexpected data format: {data!r}")
283
284
285 class ConferencesComponent:
286 IMPORT_NAME = IMPORT_NAME
287
288 def __init__(self, host):
289 self.host = host
290 self.client: SatXMPPEntity | None = None
291 self._e: XEP_0106 = host.plugins["XEP-0106"]
292 self._j: XEP_0166 = host.plugins["XEP-0166"]
293 self._ice_udp: XEP_0176 = host.plugins["XEP-0176"]
294 host.trigger.add("XEP-0167_jingle_handler", self._jingle_handler_trigger)
295 try:
296 galene_path = Path(
297 self.host.memory.config_get(CONF_SECTION, "galene_path")
298 or which("galene")[0]
299 )
300 except IndexError:
301 raise exceptions.NotFound(
302 'Can\'t find "galene" executable, "conferences" component can\'t be '
303 "started without it. Please install it in location accessible in PATH, "
304 'or use "galene_path" setting.'
305 )
306 self.galene_http_port = self.host.memory.config_get(
307 CONF_SECTION, "http_port", "9443"
308 )
309 galene_data_path = host.memory.get_cache_path(IMPORT_NAME, "galene")
310 galene_static_path = galene_path.parent / "static"
311 self.galene_group_path = galene_data_path / "groups"
312 self.galene_group_path.mkdir(0o700, parents=True, exist_ok=True)
313 try:
314 d = self._process = async_process.run(
315 str(galene_path),
316 "-static",
317 str(galene_static_path),
318 "-http",
319 f"127.0.0.1:{self.galene_http_port}",
320 # We don't want HTTPS here, it's only used for local interactions
321 "-insecure",
322 path=str(galene_data_path),
323 verbose=True,
324 )
325 except Exception:
326 log.exception("Can't start Galene.")
327 else:
328 d.addErrback(self._galene_process_errback)
329 log.info(f"Galene instance started on port {self.galene_http_port}.")
330
331 def get_handler(self, __):
332 return ConferencesHandler()
333
334 def profile_connecting(self, client):
335 self.client = client
336
337 async def attach_to_group(self, session: dict, group_name: str) -> Conference:
338 """Attach to a Galène group.
339
340 Create a group data file if it doesn't exist.
341 Create and attach a Galene client.
342
343 @param session: Jingle session data.
344 @param group_name: name of the conference group.
345 @return conference: Data of the conference.
346 """
347 stem = regex.path_escape(group_name)
348 filename = f"{stem}.json"
349 data_file = self.galene_group_path / filename
350 if not data_file.exists():
351 group_data = {
352 "wildcard-user": {
353 "password": {"type": "wildcard"},
354 "permissions": "present",
355 },
356 }
357 with data_file.open("w") as f:
358 json.dump(group_data, f)
359 log.debug(f"Conference data for {group_name!r} created at " f"{data_file} .")
360
361 url = f"http://localhost:{self.galene_http_port}/group/{quote(stem)}"
362 status_url = f"{url}/.status"
363
364 log.debug(f"Attaching to Galene.\n{url=}\n{status_url=}")
365 resp = await treq.get(status_url)
366 group_status = await resp.json()
367 log.debug(f"{group_status=}")
368 endpoint = group_status["endpoint"]
369 conference = Conference(
370 parent=self,
371 group_name=group_name,
372 data_file=data_file,
373 endpoint=endpoint,
374 status_url=status_url,
375 )
376
377 factory = GaleneClientFactory(conference)
378 # factory.setProtocolOptions(logOctets=True)
379 factory.protocol = GaleneProtocol
380 reactor.connectTCP("127.0.0.1", int(self.galene_http_port), factory)
381
382 return conference
383
384 async def _jingle_handler_trigger(
385 self,
386 client: SatXMPPEntity,
387 action: str,
388 session: dict,
389 content_name: str,
390 desc_elt: domish.Element,
391 ) -> None:
392 if client != self.client:
393 return
394 if action == self._j.A_PREPARE_CONFIRMATION:
395 if "conference" in session:
396 # We have already set up the conference.
397 return
398 local_jid: jid.JID = session["local_jid"]
399 if not local_jid.user:
400 raise StanzaError("forbidden", "A room name must be specified.")
401 group_name = self._e.unescape(local_jid.user)
402
403 session["conference"] = await self.attach_to_group(session, group_name)
404 session["pre_accepted"] = True
405 session["call_setup_cb"] = self.on_call_setup
406 session["ice_candidates_new_cb"] = self.on_ice_candidates_new
407
408 async def on_call_setup(
409 self,
410 client: SatXMPPEntity,
411 session: dict,
412 call_data: dict,
413 ) -> None:
414 if self.client is None or client != self.client:
415 raise exceptions.InternalError(f"Unexpected client: {client}")
416 try:
417 conference = session["conference"]
418 except KeyError:
419 raise exceptions.InternalError("Conference data is missing.")
420
421 await conference.ready
422 conference.join(session["peer_jid"])
423 conference.send_offer(session, call_data["sdp"])
424
425 def on_ice_candidates_new(
426 self,
427 client: SatXMPPEntity,
428 session: dict,
429 ice_candidates_data: dict[str, dict],
430 ) -> None:
431 try:
432 conference = session["conference"]
433 except KeyError:
434 raise exceptions.InternalError("Conference data is missing.")
435 for media, media_data in ice_candidates_data.items():
436 for idx, (content_id, content) in enumerate(session["contents"].items()):
437 if content["application_data"]["media"] == media:
438 break
439 else:
440 log.error(f"Can't find corresponding content for {media!r}")
441 continue
442 sdp_mline_index: int = idx
443 sdp_mid: str = content_id
444
445 for candidate in media_data["candidates"]:
446 conference.add_candidate(session, candidate, sdp_mid, sdp_mline_index)
447
448 def _galene_process_errback(self, failure_: Failure) -> None:
449 log.error(f"Can't run Galene process: {failure_.value}")
450
451
452 @implementer(iwokkel.IDisco)
453 class ConferencesHandler(XMPPHandler):
454
455 def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
456 return [disco.DiscoIdentity("conference", "audio-video")]