Mercurial > libervia-backend
view libervia/backend/plugins/plugin_comp_conferences/__init__.py @ 4282:8da377040ba6
doc (encryption): update pubsub encryption specifications.
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 13 Jul 2024 17:45:47 +0200 |
parents | 240d8b7ad906 |
children | 96fdf4891747 |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia A/V Conferences Component # Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import json from pathlib import Path from urllib.parse import quote from autobahn.twisted.websocket import WebSocketClientFactory, WebSocketClientProtocol from shortuuid import uuid import treq from twisted.internet import defer, reactor from twisted.internet.error import ConnectionDone from twisted.python import failure from twisted.python.failure import Failure from twisted.python.procutils import which from twisted.words.protocols.jabber import jid from twisted.words.protocols.jabber.error import StanzaError from twisted.words.protocols.jabber.xmlstream import XMPPHandler from twisted.words.xish import domish from wokkel import disco, iwokkel from zope.interface import implementer from libervia.backend.core import exceptions from libervia.backend.core.constants import Const as C from libervia.backend.core.core_types import SatXMPPEntity from libervia.backend.core.i18n import _ from libervia.backend.core.log import getLogger from libervia.backend.plugins.plugin_xep_0106 import XEP_0106 from libervia.backend.plugins.plugin_xep_0166 import XEP_0166 from libervia.backend.plugins.plugin_xep_0167 import mapping from libervia.backend.plugins.plugin_xep_0176 import XEP_0176 from libervia.backend.tools.common import async_process, regex log = getLogger(__name__) IMPORT_NAME = "conferences" PLUGIN_INFO = { C.PI_NAME: "A/V Conferences Component", C.PI_IMPORT_NAME: IMPORT_NAME, C.PI_MODES: [C.PLUG_MODE_COMPONENT], C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, C.PI_PROTOCOLS: [], C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0166", "XEP-0167", "XEP-0176"], C.PI_RECOMMENDATIONS: [], C.PI_MAIN: "ConferencesComponent", C.PI_HANDLER: C.BOOL_FALSE, C.PI_DESCRIPTION: _( "Handle multiparty audio/video conferences, using a Selective Forwarding Unit.\n" "The Galène project (https://galene.org) is currently the only supported backend." ), } CONF_SECTION = f"component {IMPORT_NAME}" class Conference: def __init__( self, parent: "ConferencesComponent", group_name: str, data_file: Path, endpoint: str, status_url: str, ) -> None: self.parent = parent self.group_name = group_name self.data_file = data_file self.endpoint = endpoint self.status_url = status_url self._protocol: "GaleneProtocol|None" = None self.connected = defer.Deferred() self.ready = defer.Deferred() def __str__(self): return f"conference {self.group_name!r}" @property def protocol(self) -> "GaleneProtocol": assert self._protocol is not None return self._protocol @protocol.setter def protocol(self, protocol: "GaleneProtocol") -> None: self._protocol = protocol self.connected.callback(None) @property def _j(self) -> XEP_0166: return self.parent._j @property def _ice_udp(self) -> XEP_0176: return self.parent._ice_udp @property def client(self) -> SatXMPPEntity: client = self.parent.client assert client is not None return client def join(self, user_jid: jid.JID) -> None: self.protocol.send_data( { "type": "join", "kind": "join", "group": self.data_file.stem, "username": user_jid.userhost(), "password": "", } ) def send_offer(self, session: dict, sdp: str) -> None: self.protocol.send_data( { "type": "offer", "id": session["id"], "label": "camera", "username": session["peer_jid"].userhost(), "sdp": sdp, } ) def add_candidate( self, session: dict, candidate: dict, sdp_mid: str, sdp_mline_index: int ) -> None: """Add an ICE candidate. @param session: Jingle session. @param candidate: ICE candidate, SDP format. """ self.protocol.send_data( { "type": "ice", "id": session["id"], "candidate": { "candidate": mapping.generate_candidate_line(candidate), "sdpMid": sdp_mid, "sdpMLineIndex": sdp_mline_index, }, } ) def on_joined(self, data: dict) -> None: user_jid = jid.JID(data["username"]) match data["kind"]: case "join": log.info(f"{user_jid} has joined {self}.") case "fail": log.warning(f"{user_jid} can't join {self}: {data}") case "change": log.debug(f"Change for {user_jid} in {self}.") case "leave": log.info(f"{user_jid} has left {self}.") def on_answer(self, data: dict) -> None: """Called when SDP answer has been received Send the SDP to ``answer_sdp_d`` to continue workflow. """ session = self._j.get_session(self.client, data["id"]) try: answer_sdp_d = session.pop("answer_sdp_d") except KeyError: raise exceptions.InternalError( '"answer_sdp_d" should be available in session.' ) else: answer_sdp_d.callback(data["sdp"]) def on_ice(self, data: dict) -> None: log.debug(f"ICE candidate: {data}") session = self._j.get_session(self.client, data["id"]) candidate_data = data["candidate"] contents = session["contents"] try: content_id = list(contents)[candidate_data["sdpMLineIndex"]] except IndexError: log.error( f"Can't find any content at index {candidate_data['sdpMLineIndex']}." ) return content = contents[content_id] local_ice_data = content["transport_data"]["local_ice_data"] media = content["application_data"]["media"] client: SatXMPPEntity = self.client.get_virtual_client(session["local_jid"]) candidate = mapping.parse_candidate(candidate_data["candidate"][10:].split()) defer.ensureDeferred( self._ice_udp.ice_candidates_add( client, session["id"], { media: { "candidates": [candidate], "ufrag": local_ice_data["ufrag"], "pwd": local_ice_data["pwd"], } }, ) ) class GaleneClientFactory(WebSocketClientFactory): def __init__(self, conference: Conference) -> None: self.conference = conference super().__init__(conference.endpoint) class GaleneProtocol(WebSocketClientProtocol): verbose = True @property def conference(self) -> Conference: conference: Conference = self.factory.conference assert conference is not None return conference def connectionMade(self) -> None: super().connectionMade() self.conference.protocol = self def connectionLost(self, reason: failure.Failure = ConnectionDone) -> None: super().connectionLost(reason) def onOpen(self): handshake_data = {"type": "handshake", "version": ["2"], "id": str(uuid())} self.send_data(handshake_data) def send_data(self, data: dict) -> None: if self.verbose: log.debug(f"DATA SENT: {data}") self.sendMessage(json.dumps(data).encode()) def onMessage(self, payload, isBinary): if isBinary: raise exceptions.DataError("Unexpected binary payload: {payload!r}") try: data = json.loads(payload) except json.JSONDecodeError: log.warning(f"Can't decode data: {payload!r}") return try: match data.get("type"): case None: log.warning(f'"type" is missing in data: {data!r}') case "handshake": version = data["version"][0] log.debug( f"Handshake for group {self.conference.group_name!r}. Galène protocol " f" v{version}." ) self.conference.ready.callback(None) case "ping": log.debug("pong") self.send_data({"type": "pong"}) case "joined" | "answer" | "ice" as data_type: method = getattr(self.conference, f"on_{data_type}") method(data) case _: log.debug(f"Unhandled message: {data}") except (KeyError, IndexError): log.exception(f"Unexpected data format: {data!r}") class ConferencesComponent: IMPORT_NAME = IMPORT_NAME def __init__(self, host): self.host = host self.client: SatXMPPEntity | None = None self._e: XEP_0106 = host.plugins["XEP-0106"] self._j: XEP_0166 = host.plugins["XEP-0166"] self._ice_udp: XEP_0176 = host.plugins["XEP-0176"] host.trigger.add("XEP-0167_jingle_handler", self._jingle_handler_trigger) try: galene_path = Path( self.host.memory.config_get(CONF_SECTION, "galene_path") or which("galene")[0] ) except IndexError: raise exceptions.NotFound( 'Can\'t find "galene" executable, "conferences" component can\'t be ' "started without it. Please install it in location accessible in PATH, " 'or use "galene_path" setting.' ) self.galene_http_port = self.host.memory.config_get( CONF_SECTION, "http_port", "9443" ) galene_data_path = host.memory.get_cache_path(IMPORT_NAME, "galene") galene_static_path = galene_path.parent / "static" self.galene_group_path = galene_data_path / "groups" self.galene_group_path.mkdir(0o700, parents=True, exist_ok=True) try: d = self._process = async_process.run( str(galene_path), "-static", str(galene_static_path), "-http", f"127.0.0.1:{self.galene_http_port}", # We don't want HTTPS here, it's only used for local interactions "-insecure", path=str(galene_data_path), verbose=True, ) except Exception: log.exception("Can't start Galene.") else: d.addErrback(self._galene_process_errback) log.info(f"Galene instance started on port {self.galene_http_port}.") def get_handler(self, __): return ConferencesHandler() def profile_connecting(self, client): self.client = client async def attach_to_group(self, session: dict, group_name: str) -> Conference: """Attach to a Galène group. Create a group data file if it doesn't exist. Create and attach a Galene client. @param session: Jingle session data. @param group_name: name of the conference group. @return conference: Data of the conference. """ stem = regex.path_escape(group_name) filename = f"{stem}.json" data_file = self.galene_group_path / filename if not data_file.exists(): group_data = { "wildcard-user": { "password": {"type": "wildcard"}, "permissions": "present", }, } with data_file.open("w") as f: json.dump(group_data, f) log.debug(f"Conference data for {group_name!r} created at " f"{data_file} .") url = f"http://localhost:{self.galene_http_port}/group/{quote(stem)}" status_url = f"{url}/.status" log.debug(f"Attaching to Galene.\n{url=}\n{status_url=}") resp = await treq.get(status_url) group_status = await resp.json() log.debug(f"{group_status=}") endpoint = group_status["endpoint"] conference = Conference( parent=self, group_name=group_name, data_file=data_file, endpoint=endpoint, status_url=status_url, ) factory = GaleneClientFactory(conference) # factory.setProtocolOptions(logOctets=True) factory.protocol = GaleneProtocol reactor.connectTCP("127.0.0.1", int(self.galene_http_port), factory) return conference async def _jingle_handler_trigger( self, client: SatXMPPEntity, action: str, session: dict, content_name: str, desc_elt: domish.Element, ) -> None: if client != self.client: return if action == self._j.A_PREPARE_CONFIRMATION: if "conference" in session: # We have already set up the conference. return local_jid: jid.JID = session["local_jid"] if not local_jid.user: raise StanzaError("forbidden", "A room name must be specified.") group_name = self._e.unescape(local_jid.user) session["conference"] = await self.attach_to_group(session, group_name) session["pre_accepted"] = True session["call_setup_cb"] = self.on_call_setup session["ice_candidates_new_cb"] = self.on_ice_candidates_new async def on_call_setup( self, client: SatXMPPEntity, session: dict, call_data: dict, ) -> None: if self.client is None or client != self.client: raise exceptions.InternalError(f"Unexpected client: {client}") try: conference = session["conference"] except KeyError: raise exceptions.InternalError("Conference data is missing.") await conference.ready conference.join(session["peer_jid"]) conference.send_offer(session, call_data["sdp"]) def on_ice_candidates_new( self, client: SatXMPPEntity, session: dict, ice_candidates_data: dict[str, dict], ) -> None: try: conference = session["conference"] except KeyError: raise exceptions.InternalError("Conference data is missing.") for media, media_data in ice_candidates_data.items(): for idx, (content_id, content) in enumerate(session["contents"].items()): if content["application_data"]["media"] == media: break else: log.error(f"Can't find corresponding content for {media!r}") continue sdp_mline_index: int = idx sdp_mid: str = content_id for candidate in media_data["candidates"]: conference.add_candidate(session, candidate, sdp_mid, sdp_mline_index) def _galene_process_errback(self, failure_: Failure) -> None: log.error(f"Can't run Galene process: {failure_.value}") @implementer(iwokkel.IDisco) class ConferencesHandler(XMPPHandler): def getDiscoInfo(self, requestor, target, nodeIdentifier=""): return [disco.DiscoIdentity("conference", "audio-video")]