diff libervia/backend/plugins/plugin_comp_conferences/__init__.py @ 4278:240d8b7ad906

component Conferences: implementation of SFU component to make multi-party A/V conferences: This component wrap `Galène` SFU and translate its signaling to XMPP Jingle in both direction. rel 445
author Goffi <goffi@goffi.org>
date Fri, 05 Jul 2024 17:18:37 +0200
parents
children 96fdf4891747
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/plugins/plugin_comp_conferences/__init__.py	Fri Jul 05 17:18:37 2024 +0200
@@ -0,0 +1,456 @@
+#!/usr/bin/env python3
+
+# Libervia A/V Conferences Component
+# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+import json
+from pathlib import Path
+from urllib.parse import quote
+
+from autobahn.twisted.websocket import WebSocketClientFactory, WebSocketClientProtocol
+from shortuuid import uuid
+import treq
+from twisted.internet import defer, reactor
+from twisted.internet.error import ConnectionDone
+from twisted.python import failure
+from twisted.python.failure import Failure
+from twisted.python.procutils import which
+from twisted.words.protocols.jabber import jid
+from twisted.words.protocols.jabber.error import StanzaError
+from twisted.words.protocols.jabber.xmlstream import XMPPHandler
+from twisted.words.xish import domish
+from wokkel import disco, iwokkel
+from zope.interface import implementer
+
+from libervia.backend.core import exceptions
+from libervia.backend.core.constants import Const as C
+from libervia.backend.core.core_types import SatXMPPEntity
+from libervia.backend.core.i18n import _
+from libervia.backend.core.log import getLogger
+from libervia.backend.plugins.plugin_xep_0106 import XEP_0106
+from libervia.backend.plugins.plugin_xep_0166 import XEP_0166
+from libervia.backend.plugins.plugin_xep_0167 import mapping
+from libervia.backend.plugins.plugin_xep_0176 import XEP_0176
+from libervia.backend.tools.common import async_process, regex
+
+
+log = getLogger(__name__)
+
+IMPORT_NAME = "conferences"
+
+PLUGIN_INFO = {
+    C.PI_NAME: "A/V Conferences Component",
+    C.PI_IMPORT_NAME: IMPORT_NAME,
+    C.PI_MODES: [C.PLUG_MODE_COMPONENT],
+    C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
+    C.PI_PROTOCOLS: [],
+    C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0166", "XEP-0167", "XEP-0176"],
+    C.PI_RECOMMENDATIONS: [],
+    C.PI_MAIN: "ConferencesComponent",
+    C.PI_HANDLER: C.BOOL_FALSE,
+    C.PI_DESCRIPTION: _(
+        "Handle multiparty audio/video conferences, using a Selective Forwarding Unit.\n"
+        "The Galène project (https://galene.org) is currently the only supported backend."
+    ),
+}
+
+CONF_SECTION = f"component {IMPORT_NAME}"
+
+
+class Conference:
+
+    def __init__(
+        self,
+        parent: "ConferencesComponent",
+        group_name: str,
+        data_file: Path,
+        endpoint: str,
+        status_url: str,
+    ) -> None:
+        self.parent = parent
+        self.group_name = group_name
+        self.data_file = data_file
+        self.endpoint = endpoint
+        self.status_url = status_url
+        self._protocol: "GaleneProtocol|None" = None
+        self.connected = defer.Deferred()
+        self.ready = defer.Deferred()
+
+    def __str__(self):
+        return f"conference {self.group_name!r}"
+
+    @property
+    def protocol(self) -> "GaleneProtocol":
+        assert self._protocol is not None
+        return self._protocol
+
+    @protocol.setter
+    def protocol(self, protocol: "GaleneProtocol") -> None:
+        self._protocol = protocol
+        self.connected.callback(None)
+
+    @property
+    def _j(self) -> XEP_0166:
+        return self.parent._j
+
+    @property
+    def _ice_udp(self) -> XEP_0176:
+        return self.parent._ice_udp
+
+    @property
+    def client(self) -> SatXMPPEntity:
+        client = self.parent.client
+        assert client is not None
+        return client
+
+    def join(self, user_jid: jid.JID) -> None:
+        self.protocol.send_data(
+            {
+                "type": "join",
+                "kind": "join",
+                "group": self.data_file.stem,
+                "username": user_jid.userhost(),
+                "password": "",
+            }
+        )
+
+    def send_offer(self, session: dict, sdp: str) -> None:
+        self.protocol.send_data(
+            {
+                "type": "offer",
+                "id": session["id"],
+                "label": "camera",
+                "username": session["peer_jid"].userhost(),
+                "sdp": sdp,
+            }
+        )
+
+    def add_candidate(
+        self, session: dict, candidate: dict, sdp_mid: str, sdp_mline_index: int
+    ) -> None:
+        """Add an ICE candidate.
+
+        @param session: Jingle session.
+        @param candidate: ICE candidate, SDP format.
+        """
+        self.protocol.send_data(
+            {
+                "type": "ice",
+                "id": session["id"],
+                "candidate": {
+                    "candidate": mapping.generate_candidate_line(candidate),
+                    "sdpMid": sdp_mid,
+                    "sdpMLineIndex": sdp_mline_index,
+                },
+            }
+        )
+
+    def on_joined(self, data: dict) -> None:
+        user_jid = jid.JID(data["username"])
+        match data["kind"]:
+            case "join":
+                log.info(f"{user_jid} has joined {self}.")
+            case "fail":
+                log.warning(f"{user_jid} can't join {self}: {data}")
+            case "change":
+                log.debug(f"Change for {user_jid} in {self}.")
+            case "leave":
+                log.info(f"{user_jid} has left {self}.")
+
+    def on_answer(self, data: dict) -> None:
+        """Called when SDP answer has been received
+
+        Send the SDP to ``answer_sdp_d`` to continue workflow.
+        """
+        session = self._j.get_session(self.client, data["id"])
+        try:
+            answer_sdp_d = session.pop("answer_sdp_d")
+        except KeyError:
+            raise exceptions.InternalError(
+                '"answer_sdp_d" should be available in session.'
+            )
+        else:
+            answer_sdp_d.callback(data["sdp"])
+
+    def on_ice(self, data: dict) -> None:
+        log.debug(f"ICE candidate: {data}")
+        session = self._j.get_session(self.client, data["id"])
+        candidate_data = data["candidate"]
+        contents = session["contents"]
+        try:
+            content_id = list(contents)[candidate_data["sdpMLineIndex"]]
+        except IndexError:
+            log.error(
+                f"Can't find any content at index {candidate_data['sdpMLineIndex']}."
+            )
+            return
+        content = contents[content_id]
+        local_ice_data = content["transport_data"]["local_ice_data"]
+        media = content["application_data"]["media"]
+        client: SatXMPPEntity = self.client.get_virtual_client(session["local_jid"])
+        candidate = mapping.parse_candidate(candidate_data["candidate"][10:].split())
+        defer.ensureDeferred(
+            self._ice_udp.ice_candidates_add(
+                client,
+                session["id"],
+                {
+                    media: {
+                        "candidates": [candidate],
+                        "ufrag": local_ice_data["ufrag"],
+                        "pwd": local_ice_data["pwd"],
+                    }
+                },
+            )
+        )
+
+
+class GaleneClientFactory(WebSocketClientFactory):
+
+    def __init__(self, conference: Conference) -> None:
+        self.conference = conference
+        super().__init__(conference.endpoint)
+
+
+class GaleneProtocol(WebSocketClientProtocol):
+    verbose = True
+
+    @property
+    def conference(self) -> Conference:
+        conference: Conference = self.factory.conference
+        assert conference is not None
+        return conference
+
+    def connectionMade(self) -> None:
+        super().connectionMade()
+        self.conference.protocol = self
+
+    def connectionLost(self, reason: failure.Failure = ConnectionDone) -> None:
+        super().connectionLost(reason)
+
+    def onOpen(self):
+        handshake_data = {"type": "handshake", "version": ["2"], "id": str(uuid())}
+        self.send_data(handshake_data)
+
+    def send_data(self, data: dict) -> None:
+        if self.verbose:
+            log.debug(f"DATA SENT: {data}")
+        self.sendMessage(json.dumps(data).encode())
+
+    def onMessage(self, payload, isBinary):
+        if isBinary:
+            raise exceptions.DataError("Unexpected binary payload: {payload!r}")
+        try:
+            data = json.loads(payload)
+        except json.JSONDecodeError:
+            log.warning(f"Can't decode data: {payload!r}")
+            return
+
+        try:
+            match data.get("type"):
+                case None:
+                    log.warning(f'"type" is missing in data: {data!r}')
+                case "handshake":
+                    version = data["version"][0]
+
+                    log.debug(
+                        f"Handshake for group {self.conference.group_name!r}. Galène protocol "
+                        f" v{version}."
+                    )
+                    self.conference.ready.callback(None)
+                case "ping":
+                    log.debug("pong")
+                    self.send_data({"type": "pong"})
+                case "joined" | "answer" | "ice" as data_type:
+                    method = getattr(self.conference, f"on_{data_type}")
+                    method(data)
+                case _:
+                    log.debug(f"Unhandled message: {data}")
+        except (KeyError, IndexError):
+            log.exception(f"Unexpected data format: {data!r}")
+
+
+class ConferencesComponent:
+    IMPORT_NAME = IMPORT_NAME
+
+    def __init__(self, host):
+        self.host = host
+        self.client: SatXMPPEntity | None = None
+        self._e: XEP_0106 = host.plugins["XEP-0106"]
+        self._j: XEP_0166 = host.plugins["XEP-0166"]
+        self._ice_udp: XEP_0176 = host.plugins["XEP-0176"]
+        host.trigger.add("XEP-0167_jingle_handler", self._jingle_handler_trigger)
+        try:
+            galene_path = Path(
+                self.host.memory.config_get(CONF_SECTION, "galene_path")
+                or which("galene")[0]
+            )
+        except IndexError:
+            raise exceptions.NotFound(
+                'Can\'t find "galene" executable, "conferences" component can\'t be '
+                "started without it. Please install it in location accessible in PATH, "
+                'or use "galene_path" setting.'
+            )
+        self.galene_http_port = self.host.memory.config_get(
+            CONF_SECTION, "http_port", "9443"
+        )
+        galene_data_path = host.memory.get_cache_path(IMPORT_NAME, "galene")
+        galene_static_path = galene_path.parent / "static"
+        self.galene_group_path = galene_data_path / "groups"
+        self.galene_group_path.mkdir(0o700, parents=True, exist_ok=True)
+        try:
+            d = self._process = async_process.run(
+                str(galene_path),
+                "-static",
+                str(galene_static_path),
+                "-http",
+                f"127.0.0.1:{self.galene_http_port}",
+                # We don't want HTTPS here, it's only used for local interactions
+                "-insecure",
+                path=str(galene_data_path),
+                verbose=True,
+            )
+        except Exception:
+            log.exception("Can't start Galene.")
+        else:
+            d.addErrback(self._galene_process_errback)
+            log.info(f"Galene instance started on port {self.galene_http_port}.")
+
+    def get_handler(self, __):
+        return ConferencesHandler()
+
+    def profile_connecting(self, client):
+        self.client = client
+
+    async def attach_to_group(self, session: dict, group_name: str) -> Conference:
+        """Attach to a Galène group.
+
+        Create a group data file if it doesn't exist.
+        Create and attach a Galene client.
+
+        @param session: Jingle session data.
+        @param group_name: name of the conference group.
+        @return conference: Data of the conference.
+        """
+        stem = regex.path_escape(group_name)
+        filename = f"{stem}.json"
+        data_file = self.galene_group_path / filename
+        if not data_file.exists():
+            group_data = {
+                "wildcard-user": {
+                    "password": {"type": "wildcard"},
+                    "permissions": "present",
+                },
+            }
+            with data_file.open("w") as f:
+                json.dump(group_data, f)
+            log.debug(f"Conference data for {group_name!r} created at " f"{data_file} .")
+
+        url = f"http://localhost:{self.galene_http_port}/group/{quote(stem)}"
+        status_url = f"{url}/.status"
+
+        log.debug(f"Attaching to Galene.\n{url=}\n{status_url=}")
+        resp = await treq.get(status_url)
+        group_status = await resp.json()
+        log.debug(f"{group_status=}")
+        endpoint = group_status["endpoint"]
+        conference = Conference(
+            parent=self,
+            group_name=group_name,
+            data_file=data_file,
+            endpoint=endpoint,
+            status_url=status_url,
+        )
+
+        factory = GaleneClientFactory(conference)
+        # factory.setProtocolOptions(logOctets=True)
+        factory.protocol = GaleneProtocol
+        reactor.connectTCP("127.0.0.1", int(self.galene_http_port), factory)
+
+        return conference
+
+    async def _jingle_handler_trigger(
+        self,
+        client: SatXMPPEntity,
+        action: str,
+        session: dict,
+        content_name: str,
+        desc_elt: domish.Element,
+    ) -> None:
+        if client != self.client:
+            return
+        if action == self._j.A_PREPARE_CONFIRMATION:
+            if "conference" in session:
+                # We have already set up the conference.
+                return
+            local_jid: jid.JID = session["local_jid"]
+            if not local_jid.user:
+                raise StanzaError("forbidden", "A room name must be specified.")
+            group_name = self._e.unescape(local_jid.user)
+
+            session["conference"] = await self.attach_to_group(session, group_name)
+            session["pre_accepted"] = True
+            session["call_setup_cb"] = self.on_call_setup
+            session["ice_candidates_new_cb"] = self.on_ice_candidates_new
+
+    async def on_call_setup(
+        self,
+        client: SatXMPPEntity,
+        session: dict,
+        call_data: dict,
+    ) -> None:
+        if self.client is None or client != self.client:
+            raise exceptions.InternalError(f"Unexpected client: {client}")
+        try:
+            conference = session["conference"]
+        except KeyError:
+            raise exceptions.InternalError("Conference data is missing.")
+
+        await conference.ready
+        conference.join(session["peer_jid"])
+        conference.send_offer(session, call_data["sdp"])
+
+    def on_ice_candidates_new(
+        self,
+        client: SatXMPPEntity,
+        session: dict,
+        ice_candidates_data: dict[str, dict],
+    ) -> None:
+        try:
+            conference = session["conference"]
+        except KeyError:
+            raise exceptions.InternalError("Conference data is missing.")
+        for media, media_data in ice_candidates_data.items():
+            for idx, (content_id, content) in enumerate(session["contents"].items()):
+                if content["application_data"]["media"] == media:
+                    break
+            else:
+                log.error(f"Can't find corresponding content for {media!r}")
+                continue
+            sdp_mline_index: int = idx
+            sdp_mid: str = content_id
+
+            for candidate in media_data["candidates"]:
+                conference.add_candidate(session, candidate, sdp_mid, sdp_mline_index)
+
+    def _galene_process_errback(self, failure_: Failure) -> None:
+        log.error(f"Can't run Galene process: {failure_.value}")
+
+
+@implementer(iwokkel.IDisco)
+class ConferencesHandler(XMPPHandler):
+
+    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
+        return [disco.DiscoIdentity("conference", "audio-video")]