Mercurial > libervia-backend
view libervia/backend/plugins/plugin_comp_conferences/__init__.py @ 4337:95792a1f26c7
component email gateway: attachments handling:
attachments are now stored, and metadata are created in database.
rel 453
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 03 Dec 2024 00:13:23 +0100 |
parents | a0ed5c976bf8 |
children |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia A/V Conferences Component # Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import json import os from pathlib import Path from urllib.parse import quote from autobahn.twisted.websocket import WebSocketClientFactory, WebSocketClientProtocol from shortuuid import uuid import treq from twisted.internet import defer, reactor from twisted.internet.error import ConnectionDone from twisted.python import failure from twisted.python.failure import Failure from twisted.python.procutils import which from twisted.words.protocols.jabber import jid from twisted.words.protocols.jabber.error import StanzaError from twisted.words.protocols.jabber.xmlstream import XMPPHandler from twisted.words.xish import domish from wokkel import disco, iwokkel from zope.interface import implementer from libervia.backend.core import exceptions from libervia.backend.core.constants import Const as C from libervia.backend.core.core_types import SatXMPPEntity from libervia.backend.core.i18n import _ from libervia.backend.core.log import getLogger from libervia.backend.plugins.plugin_xep_0106 import XEP_0106 from libervia.backend.plugins.plugin_xep_0166 import XEP_0166 from libervia.backend.plugins.plugin_xep_0167 import XEP_0167, mapping, NS_AV_CONFERENCES from libervia.backend.plugins.plugin_xep_0176 import XEP_0176 from libervia.backend.plugins.plugin_xep_0298 import XEP_0298 from libervia.backend.tools.common import async_process, regex log = getLogger(__name__) IMPORT_NAME = "conferences" NAME = "Libervia A/V Conferences" PLUGIN_INFO = { C.PI_NAME: "A/V Conferences Component", C.PI_IMPORT_NAME: IMPORT_NAME, C.PI_MODES: [C.PLUG_MODE_COMPONENT], C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, C.PI_PROTOCOLS: [], C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0166", "XEP-0167", "XEP-0176", "XEP-0298"], C.PI_RECOMMENDATIONS: [], C.PI_MAIN: "ConferencesComponent", C.PI_HANDLER: C.BOOL_TRUE, C.PI_DESCRIPTION: _( "Handle multiparty audio/video conferences, using a Selective Forwarding Unit.\n" "The Galène project (https://galene.org) is currently the only supported backend." ), } CONF_SECTION = f"component {IMPORT_NAME}" class Conference: def __init__( self, parent: "ConferencesComponent", group_name: str, data_file: Path, endpoint: str, status_url: str, session: dict, ) -> None: self.parent = parent self.group_name = group_name self.data_file = data_file self.endpoint = endpoint self.status_url = status_url self._protocol: "GaleneProtocol|None" = None self.connected = defer.Deferred() self.ready = defer.Deferred() self.session = session def __str__(self): return f"conference {self.group_name!r}" @property def protocol(self) -> "GaleneProtocol": assert self._protocol is not None return self._protocol @protocol.setter def protocol(self, protocol: "GaleneProtocol") -> None: self._protocol = protocol self.connected.callback(None) @property def client_id(self) -> str: return self.protocol.client_id @property def _j(self) -> XEP_0166: return self.parent._j @property def _rtp(self) -> XEP_0167: return self.parent._rtp @property def _ice_udp(self) -> XEP_0176: return self.parent._ice_udp @property def client(self) -> SatXMPPEntity: client = self.parent.client assert client is not None return client @property def galene_id(self) -> str: """ID to use with Galène API.""" return f"{self.client_id}-{self.session['id']}" async def on_call_setup( self, client: SatXMPPEntity, session: dict, call_data: dict, ) -> None: if call_data["role"] == self._j.ROLE_INITIATOR: self.send_answer(session["id"], call_data["sdp"]) else: raise exceptions.InternalError( '"role" should be "{self._j.ROLE_INITIATOR}" here.' ) def join(self, user_jid: jid.JID) -> None: self.protocol.send_data( { "type": "join", "kind": "join", "group": self.data_file.stem, "username": user_jid.userhost(), "password": "", } ) def send_request(self, requested: dict | None = None) -> None: if requested is None: requested = {"": ["audio", "video"]} self.protocol.send_data({"type": "request", "request": requested}) def send_offer(self, sdp: str) -> None: self.protocol.send_data( { "type": "offer", "id": self.galene_id, "label": "camera", "username": self.session["peer_jid"].userhost(), "sdp": sdp, } ) def send_answer(self, session_id: str, sdp: str) -> None: data_id = session_id[len(self.client_id) + 1 :] self.protocol.send_data( { "type": "answer", "id": data_id, "label": "camera", "username": self.session["peer_jid"].userhost(), "sdp": sdp, } ) def add_candidate(self, candidate: dict, sdp_mid: str, sdp_mline_index: int) -> None: """Add an ICE candidate. @param candidate: ICE candidate, SDP format. """ self.protocol.send_data( { "type": "ice", "id": self.galene_id, "candidate": { "candidate": mapping.generate_candidate_line(candidate), "sdpMid": sdp_mid, "sdpMLineIndex": sdp_mline_index, }, } ) def on_joined(self, data: dict) -> None: user_jid = jid.JID(data["username"]) match data["kind"]: case "join": log.info(f"{user_jid} has joined {self}.") case "fail": log.warning(f"{user_jid} can't join {self}: {data}") case "change": log.debug(f"Change for {user_jid} in {self}.") case "leave": log.info(f"{user_jid} has left {self}.") self.send_request() def on_answer(self, data: dict) -> None: """Called when SDP answer has been received Send the SDP to ``answer_sdp_d`` to continue workflow. """ try: answer_sdp_d = self.session.pop("answer_sdp_d") except KeyError: raise exceptions.InternalError( '"answer_sdp_d" should be available in session.' ) else: answer_sdp_d.callback(data["sdp"]) def on_offer(self, data: dict) -> None: """Called when a new SFP offer has been received""" defer.ensureDeferred(self.a_on_offer(data)) async def a_on_offer(self, data: dict) -> None: client: SatXMPPEntity = self.client.get_virtual_client(self.session["local_jid"]) call_data = {"sdp": data["sdp"]} try: # FIXME: This assume that all username are coming from XMPP, "source" should # be used instead to be sure to match XMPP users, and other ones should use # a non "xmpp:" URL scheme. user_jid = jid.JID(data["username"]) if not user_jid.user: raise ValueError("Missing user part.") except Exception as e: log.warning("Username is not a JID: {data['username']=}, {e}") else: call_data["metadata"] = {"user": user_jid} sid = await self._rtp.call_start( client, self.session["peer_jid"], call_data, session_id=f"{self.client_id}-{data['id']}", ) session = self._j.get_session(client, sid) session["call_setup_cb"] = self.on_call_setup def on_ice(self, data: dict) -> None: if data["id"] == self.galene_id: session = self.session else: session = self._j.get_session(self.client, f"{self.client_id}-{data['id']}") log.debug( f"ICE candidate for session {session['id']}, peer_jid={session['peer_jid']}." ) candidate_data = data["candidate"] contents = session["contents"] try: content_id = list(contents)[candidate_data["sdpMLineIndex"]] except IndexError: log.error( f"Can't find any content at index {candidate_data['sdpMLineIndex']}." ) return content = contents[content_id] local_ice_data = content["transport_data"]["local_ice_data"] media = content["application_data"]["media"] client: SatXMPPEntity = self.client.get_virtual_client(session["local_jid"]) candidate = mapping.parse_candidate(candidate_data["candidate"][10:].split()) defer.ensureDeferred( self._ice_udp.ice_candidates_add( client, session["id"], { media: { "candidates": [candidate], "ufrag": local_ice_data["ufrag"], "pwd": local_ice_data["pwd"], } }, ) ) class GaleneClientFactory(WebSocketClientFactory): def __init__(self, conference: Conference) -> None: self.conference = conference super().__init__(conference.endpoint) class GaleneProtocol(WebSocketClientProtocol): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.client_id = str(uuid()) @property def conference(self) -> Conference: conference: Conference = self.factory.conference assert conference is not None return conference def connectionMade(self) -> None: super().connectionMade() self.conference.protocol = self def connectionLost(self, reason: failure.Failure = ConnectionDone) -> None: super().connectionLost(reason) def onOpen(self): handshake_data = {"type": "handshake", "version": ["2"], "id": self.client_id} self.send_data(handshake_data) def send_data(self, data: dict) -> None: if ConferencesComponent.verbose: log.debug(f"=> DATA SENT [{self.client_id}]: {data}") self.sendMessage(json.dumps(data).encode()) def onMessage(self, payload, isBinary): if isBinary: raise exceptions.DataError("Unexpected binary payload: {payload!r}") try: data = json.loads(payload) except json.JSONDecodeError: log.warning(f"Can't decode data: {payload!r}") return if ConferencesComponent.verbose: log.debug(f"<= DATA RECEIVED [{self.client_id}]: {data}") try: match data.get("type"): case None: log.warning(f'"type" is missing in data: {data!r}') case "handshake": version = data["version"][0] log.debug( f"Handshake for group {self.conference.group_name!r}. Galène " f"protocol v{version}." ) self.conference.ready.callback(None) case "ping": log.debug("pong") self.send_data({"type": "pong"}) case "joined" | "answer" | "ice" | "offer" as data_type: method = getattr(self.conference, f"on_{data_type}") method(data) case _: log.debug(f"Unhandled message: {data}") except (KeyError, IndexError): log.exception(f"Unexpected data format: {data!r}") class ConferencesComponent: IMPORT_NAME = IMPORT_NAME verbose = 0 def __init__(self, host): self.host = host self.client: SatXMPPEntity | None = None self._e: XEP_0106 = host.plugins["XEP-0106"] self._j: XEP_0166 = host.plugins["XEP-0166"] self._rtp: XEP_0167 = host.plugins["XEP-0167"] self._ice_udp: XEP_0176 = host.plugins["XEP-0176"] self._coin: XEP_0298 = host.plugins["XEP-0298"] host.trigger.add("XEP-0167_jingle_handler", self._jingle_handler_trigger) self.initalized = False def _init(self) -> None: """Initialisation done after profile is connected""" assert self.client is not None self.client.identities.append( disco.DiscoIdentity("conference", "audio-video", NAME) ) try: galene_path = Path( self.host.memory.config_get(CONF_SECTION, "galene_path") or which("galene")[0] ) except IndexError: raise exceptions.NotFound( 'Can\'t find "galene" executable, "conferences" component can\'t be ' "started without it. Please install it in location accessible in PATH, " 'or use "galene_path" setting.' ) self.galene_http_port = self.host.memory.config_get( CONF_SECTION, "http_port", "9443" ) galene_data_path = self.host.memory.get_cache_path(IMPORT_NAME, "galene") galene_static_path = galene_path.parent / "static" self.galene_group_path = galene_data_path / "groups" self.galene_group_path.mkdir(0o700, parents=True, exist_ok=True) env = os.environ if self.verbose >= 2: # We activate Galène debug logs on ICE candidates. env["PION_LOG_TRACE"] = "ice" try: d = self._process = async_process.run( str(galene_path), "-static", str(galene_static_path), "-http", f"127.0.0.1:{self.galene_http_port}", # We don't want HTTPS here, it's only used for local interactions "-insecure", path=str(galene_data_path), verbose=bool(self.verbose), env=env, ) except Exception: log.exception("Can't start Galene.") else: d.addErrback(self._galene_process_errback) log.info(f"Galene instance started on port {self.galene_http_port}.") def get_handler(self, __): return ConferencesHandler() def profile_connecting(self, client: SatXMPPEntity) -> None: self.client = client if not self.initalized: self._init() self.initalized = True async def attach_to_group(self, session: dict, group_name: str) -> Conference: """Attach to a Galène group. Create a group data file if it doesn't exist. Create and attach a Galene client. @param session: Jingle session data. @param group_name: name of the conference group. @return conference: Data of the conference. """ stem = regex.path_escape(group_name) filename = f"{stem}.json" data_file = self.galene_group_path / filename if not data_file.exists(): group_data = { "wildcard-user": { "password": {"type": "wildcard"}, "permissions": "present", }, } with data_file.open("w") as f: json.dump(group_data, f) log.debug(f"Conference data for {group_name!r} created at " f"{data_file} .") url = f"http://localhost:{self.galene_http_port}/group/{quote(stem)}" status_url = f"{url}/.status" log.debug(f"Attaching to Galene.\n{url=}\n{status_url=}") resp = await treq.get(status_url) group_status = await resp.json() log.debug(f"{group_status=}") endpoint = group_status["endpoint"] conference = Conference( parent=self, group_name=group_name, data_file=data_file, endpoint=endpoint, status_url=status_url, session=session, ) factory = GaleneClientFactory(conference) # factory.setProtocolOptions(logOctets=True) factory.protocol = GaleneProtocol reactor.connectTCP("127.0.0.1", int(self.galene_http_port), factory) return conference async def _jingle_handler_trigger( self, client: SatXMPPEntity, action: str, session: dict, content_name: str, desc_elt: domish.Element, ) -> None: if client != self.client: return if action == self._j.A_PREPARE_CONFIRMATION: if "conference" in session: # We have already set up the conference. return local_jid: jid.JID = session["local_jid"] if not local_jid.user: raise StanzaError("forbidden", "A room name must be specified.") group_name = self._e.unescape(local_jid.user) session["conference"] = await self.attach_to_group(session, group_name) session["pre_accepted"] = True session["call_setup_cb"] = self.on_call_setup session["ice_candidates_new_cb"] = self.on_ice_candidates_new async def on_call_setup( self, client: SatXMPPEntity, session: dict, call_data: dict, ) -> None: if self.client is None or client != self.client: raise exceptions.InternalError(f"Unexpected client: {client}") try: conference = session["conference"] except KeyError: raise exceptions.InternalError("Conference data is missing.") if call_data["role"] == self._j.ROLE_RESPONDER: await conference.ready conference.join(session["peer_jid"]) conference.send_offer(call_data["sdp"]) else: raise exceptions.InternalError( '"role" should be "{self._j.ROLE_RESPONDER}" here.' ) def on_ice_candidates_new( self, client: SatXMPPEntity, session: dict, ice_candidates_data: dict[str, dict], ) -> None: try: conference = session["conference"] except KeyError: raise exceptions.InternalError("Conference data is missing.") for media, media_data in ice_candidates_data.items(): for idx, (content_id, content) in enumerate(session["contents"].items()): if content["application_data"]["media"] == media: break else: log.error(f"Can't find corresponding content for {media!r}") continue sdp_mline_index: int = idx sdp_mid: str = content_id for candidate in media_data["candidates"]: conference.add_candidate(candidate, sdp_mid, sdp_mline_index) def _galene_process_errback(self, failure_: Failure) -> None: log.error(f"Can't run Galene process: {failure_.value}") @implementer(iwokkel.IDisco) class ConferencesHandler(XMPPHandler): def getDiscoInfo(self, requestor, target, nodeIdentifier=""): return [disco.DiscoFeature(NS_AV_CONFERENCES)]