Mercurial > libervia-backend
diff libervia/backend/plugins/plugin_comp_conferences/__init__.py @ 4278:240d8b7ad906
component Conferences: implementation of SFU component to make multi-party A/V conferences:
This component wrap `Galène` SFU and translate its signaling to XMPP Jingle in both
direction.
rel 445
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 05 Jul 2024 17:18:37 +0200 |
parents | |
children | 96fdf4891747 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/backend/plugins/plugin_comp_conferences/__init__.py Fri Jul 05 17:18:37 2024 +0200 @@ -0,0 +1,456 @@ +#!/usr/bin/env python3 + +# Libervia A/V Conferences Component +# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import json +from pathlib import Path +from urllib.parse import quote + +from autobahn.twisted.websocket import WebSocketClientFactory, WebSocketClientProtocol +from shortuuid import uuid +import treq +from twisted.internet import defer, reactor +from twisted.internet.error import ConnectionDone +from twisted.python import failure +from twisted.python.failure import Failure +from twisted.python.procutils import which +from twisted.words.protocols.jabber import jid +from twisted.words.protocols.jabber.error import StanzaError +from twisted.words.protocols.jabber.xmlstream import XMPPHandler +from twisted.words.xish import domish +from wokkel import disco, iwokkel +from zope.interface import implementer + +from libervia.backend.core import exceptions +from libervia.backend.core.constants import Const as C +from libervia.backend.core.core_types import SatXMPPEntity +from libervia.backend.core.i18n import _ +from libervia.backend.core.log import getLogger +from libervia.backend.plugins.plugin_xep_0106 import XEP_0106 +from libervia.backend.plugins.plugin_xep_0166 import XEP_0166 +from libervia.backend.plugins.plugin_xep_0167 import mapping +from libervia.backend.plugins.plugin_xep_0176 import XEP_0176 +from libervia.backend.tools.common import async_process, regex + + +log = getLogger(__name__) + +IMPORT_NAME = "conferences" + +PLUGIN_INFO = { + C.PI_NAME: "A/V Conferences Component", + C.PI_IMPORT_NAME: IMPORT_NAME, + C.PI_MODES: [C.PLUG_MODE_COMPONENT], + C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, + C.PI_PROTOCOLS: [], + C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0166", "XEP-0167", "XEP-0176"], + C.PI_RECOMMENDATIONS: [], + C.PI_MAIN: "ConferencesComponent", + C.PI_HANDLER: C.BOOL_FALSE, + C.PI_DESCRIPTION: _( + "Handle multiparty audio/video conferences, using a Selective Forwarding Unit.\n" + "The Galène project (https://galene.org) is currently the only supported backend." + ), +} + +CONF_SECTION = f"component {IMPORT_NAME}" + + +class Conference: + + def __init__( + self, + parent: "ConferencesComponent", + group_name: str, + data_file: Path, + endpoint: str, + status_url: str, + ) -> None: + self.parent = parent + self.group_name = group_name + self.data_file = data_file + self.endpoint = endpoint + self.status_url = status_url + self._protocol: "GaleneProtocol|None" = None + self.connected = defer.Deferred() + self.ready = defer.Deferred() + + def __str__(self): + return f"conference {self.group_name!r}" + + @property + def protocol(self) -> "GaleneProtocol": + assert self._protocol is not None + return self._protocol + + @protocol.setter + def protocol(self, protocol: "GaleneProtocol") -> None: + self._protocol = protocol + self.connected.callback(None) + + @property + def _j(self) -> XEP_0166: + return self.parent._j + + @property + def _ice_udp(self) -> XEP_0176: + return self.parent._ice_udp + + @property + def client(self) -> SatXMPPEntity: + client = self.parent.client + assert client is not None + return client + + def join(self, user_jid: jid.JID) -> None: + self.protocol.send_data( + { + "type": "join", + "kind": "join", + "group": self.data_file.stem, + "username": user_jid.userhost(), + "password": "", + } + ) + + def send_offer(self, session: dict, sdp: str) -> None: + self.protocol.send_data( + { + "type": "offer", + "id": session["id"], + "label": "camera", + "username": session["peer_jid"].userhost(), + "sdp": sdp, + } + ) + + def add_candidate( + self, session: dict, candidate: dict, sdp_mid: str, sdp_mline_index: int + ) -> None: + """Add an ICE candidate. + + @param session: Jingle session. + @param candidate: ICE candidate, SDP format. + """ + self.protocol.send_data( + { + "type": "ice", + "id": session["id"], + "candidate": { + "candidate": mapping.generate_candidate_line(candidate), + "sdpMid": sdp_mid, + "sdpMLineIndex": sdp_mline_index, + }, + } + ) + + def on_joined(self, data: dict) -> None: + user_jid = jid.JID(data["username"]) + match data["kind"]: + case "join": + log.info(f"{user_jid} has joined {self}.") + case "fail": + log.warning(f"{user_jid} can't join {self}: {data}") + case "change": + log.debug(f"Change for {user_jid} in {self}.") + case "leave": + log.info(f"{user_jid} has left {self}.") + + def on_answer(self, data: dict) -> None: + """Called when SDP answer has been received + + Send the SDP to ``answer_sdp_d`` to continue workflow. + """ + session = self._j.get_session(self.client, data["id"]) + try: + answer_sdp_d = session.pop("answer_sdp_d") + except KeyError: + raise exceptions.InternalError( + '"answer_sdp_d" should be available in session.' + ) + else: + answer_sdp_d.callback(data["sdp"]) + + def on_ice(self, data: dict) -> None: + log.debug(f"ICE candidate: {data}") + session = self._j.get_session(self.client, data["id"]) + candidate_data = data["candidate"] + contents = session["contents"] + try: + content_id = list(contents)[candidate_data["sdpMLineIndex"]] + except IndexError: + log.error( + f"Can't find any content at index {candidate_data['sdpMLineIndex']}." + ) + return + content = contents[content_id] + local_ice_data = content["transport_data"]["local_ice_data"] + media = content["application_data"]["media"] + client: SatXMPPEntity = self.client.get_virtual_client(session["local_jid"]) + candidate = mapping.parse_candidate(candidate_data["candidate"][10:].split()) + defer.ensureDeferred( + self._ice_udp.ice_candidates_add( + client, + session["id"], + { + media: { + "candidates": [candidate], + "ufrag": local_ice_data["ufrag"], + "pwd": local_ice_data["pwd"], + } + }, + ) + ) + + +class GaleneClientFactory(WebSocketClientFactory): + + def __init__(self, conference: Conference) -> None: + self.conference = conference + super().__init__(conference.endpoint) + + +class GaleneProtocol(WebSocketClientProtocol): + verbose = True + + @property + def conference(self) -> Conference: + conference: Conference = self.factory.conference + assert conference is not None + return conference + + def connectionMade(self) -> None: + super().connectionMade() + self.conference.protocol = self + + def connectionLost(self, reason: failure.Failure = ConnectionDone) -> None: + super().connectionLost(reason) + + def onOpen(self): + handshake_data = {"type": "handshake", "version": ["2"], "id": str(uuid())} + self.send_data(handshake_data) + + def send_data(self, data: dict) -> None: + if self.verbose: + log.debug(f"DATA SENT: {data}") + self.sendMessage(json.dumps(data).encode()) + + def onMessage(self, payload, isBinary): + if isBinary: + raise exceptions.DataError("Unexpected binary payload: {payload!r}") + try: + data = json.loads(payload) + except json.JSONDecodeError: + log.warning(f"Can't decode data: {payload!r}") + return + + try: + match data.get("type"): + case None: + log.warning(f'"type" is missing in data: {data!r}') + case "handshake": + version = data["version"][0] + + log.debug( + f"Handshake for group {self.conference.group_name!r}. Galène protocol " + f" v{version}." + ) + self.conference.ready.callback(None) + case "ping": + log.debug("pong") + self.send_data({"type": "pong"}) + case "joined" | "answer" | "ice" as data_type: + method = getattr(self.conference, f"on_{data_type}") + method(data) + case _: + log.debug(f"Unhandled message: {data}") + except (KeyError, IndexError): + log.exception(f"Unexpected data format: {data!r}") + + +class ConferencesComponent: + IMPORT_NAME = IMPORT_NAME + + def __init__(self, host): + self.host = host + self.client: SatXMPPEntity | None = None + self._e: XEP_0106 = host.plugins["XEP-0106"] + self._j: XEP_0166 = host.plugins["XEP-0166"] + self._ice_udp: XEP_0176 = host.plugins["XEP-0176"] + host.trigger.add("XEP-0167_jingle_handler", self._jingle_handler_trigger) + try: + galene_path = Path( + self.host.memory.config_get(CONF_SECTION, "galene_path") + or which("galene")[0] + ) + except IndexError: + raise exceptions.NotFound( + 'Can\'t find "galene" executable, "conferences" component can\'t be ' + "started without it. Please install it in location accessible in PATH, " + 'or use "galene_path" setting.' + ) + self.galene_http_port = self.host.memory.config_get( + CONF_SECTION, "http_port", "9443" + ) + galene_data_path = host.memory.get_cache_path(IMPORT_NAME, "galene") + galene_static_path = galene_path.parent / "static" + self.galene_group_path = galene_data_path / "groups" + self.galene_group_path.mkdir(0o700, parents=True, exist_ok=True) + try: + d = self._process = async_process.run( + str(galene_path), + "-static", + str(galene_static_path), + "-http", + f"127.0.0.1:{self.galene_http_port}", + # We don't want HTTPS here, it's only used for local interactions + "-insecure", + path=str(galene_data_path), + verbose=True, + ) + except Exception: + log.exception("Can't start Galene.") + else: + d.addErrback(self._galene_process_errback) + log.info(f"Galene instance started on port {self.galene_http_port}.") + + def get_handler(self, __): + return ConferencesHandler() + + def profile_connecting(self, client): + self.client = client + + async def attach_to_group(self, session: dict, group_name: str) -> Conference: + """Attach to a Galène group. + + Create a group data file if it doesn't exist. + Create and attach a Galene client. + + @param session: Jingle session data. + @param group_name: name of the conference group. + @return conference: Data of the conference. + """ + stem = regex.path_escape(group_name) + filename = f"{stem}.json" + data_file = self.galene_group_path / filename + if not data_file.exists(): + group_data = { + "wildcard-user": { + "password": {"type": "wildcard"}, + "permissions": "present", + }, + } + with data_file.open("w") as f: + json.dump(group_data, f) + log.debug(f"Conference data for {group_name!r} created at " f"{data_file} .") + + url = f"http://localhost:{self.galene_http_port}/group/{quote(stem)}" + status_url = f"{url}/.status" + + log.debug(f"Attaching to Galene.\n{url=}\n{status_url=}") + resp = await treq.get(status_url) + group_status = await resp.json() + log.debug(f"{group_status=}") + endpoint = group_status["endpoint"] + conference = Conference( + parent=self, + group_name=group_name, + data_file=data_file, + endpoint=endpoint, + status_url=status_url, + ) + + factory = GaleneClientFactory(conference) + # factory.setProtocolOptions(logOctets=True) + factory.protocol = GaleneProtocol + reactor.connectTCP("127.0.0.1", int(self.galene_http_port), factory) + + return conference + + async def _jingle_handler_trigger( + self, + client: SatXMPPEntity, + action: str, + session: dict, + content_name: str, + desc_elt: domish.Element, + ) -> None: + if client != self.client: + return + if action == self._j.A_PREPARE_CONFIRMATION: + if "conference" in session: + # We have already set up the conference. + return + local_jid: jid.JID = session["local_jid"] + if not local_jid.user: + raise StanzaError("forbidden", "A room name must be specified.") + group_name = self._e.unescape(local_jid.user) + + session["conference"] = await self.attach_to_group(session, group_name) + session["pre_accepted"] = True + session["call_setup_cb"] = self.on_call_setup + session["ice_candidates_new_cb"] = self.on_ice_candidates_new + + async def on_call_setup( + self, + client: SatXMPPEntity, + session: dict, + call_data: dict, + ) -> None: + if self.client is None or client != self.client: + raise exceptions.InternalError(f"Unexpected client: {client}") + try: + conference = session["conference"] + except KeyError: + raise exceptions.InternalError("Conference data is missing.") + + await conference.ready + conference.join(session["peer_jid"]) + conference.send_offer(session, call_data["sdp"]) + + def on_ice_candidates_new( + self, + client: SatXMPPEntity, + session: dict, + ice_candidates_data: dict[str, dict], + ) -> None: + try: + conference = session["conference"] + except KeyError: + raise exceptions.InternalError("Conference data is missing.") + for media, media_data in ice_candidates_data.items(): + for idx, (content_id, content) in enumerate(session["contents"].items()): + if content["application_data"]["media"] == media: + break + else: + log.error(f"Can't find corresponding content for {media!r}") + continue + sdp_mline_index: int = idx + sdp_mid: str = content_id + + for candidate in media_data["candidates"]: + conference.add_candidate(session, candidate, sdp_mid, sdp_mline_index) + + def _galene_process_errback(self, failure_: Failure) -> None: + log.error(f"Can't run Galene process: {failure_.value}") + + +@implementer(iwokkel.IDisco) +class ConferencesHandler(XMPPHandler): + + def getDiscoInfo(self, requestor, target, nodeIdentifier=""): + return [disco.DiscoIdentity("conference", "audio-video")]