view libervia/backend/plugins/plugin_comp_conferences/__init__.py @ 4278:240d8b7ad906

component Conferences: implementation of an SFU component for multi-party A/V conferences: this component wraps the `Galène` SFU and translates its signaling to and from XMPP Jingle. rel 445
author Goffi <goffi@goffi.org>
date Fri, 05 Jul 2024 17:18:37 +0200
parents
children 96fdf4891747
line wrap: on
line source

#!/usr/bin/env python3

# Libervia A/V Conferences Component
# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import json
from pathlib import Path
from urllib.parse import quote

from autobahn.twisted.websocket import WebSocketClientFactory, WebSocketClientProtocol
from shortuuid import uuid
import treq
from twisted.internet import defer, reactor
from twisted.internet.error import ConnectionDone
from twisted.python import failure
from twisted.python.failure import Failure
from twisted.python.procutils import which
from twisted.words.protocols.jabber import jid
from twisted.words.protocols.jabber.error import StanzaError
from twisted.words.protocols.jabber.xmlstream import XMPPHandler
from twisted.words.xish import domish
from wokkel import disco, iwokkel
from zope.interface import implementer

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.plugins.plugin_xep_0106 import XEP_0106
from libervia.backend.plugins.plugin_xep_0166 import XEP_0166
from libervia.backend.plugins.plugin_xep_0167 import mapping
from libervia.backend.plugins.plugin_xep_0176 import XEP_0176
from libervia.backend.tools.common import async_process, regex


# Module-level logger, named after this module.
log = getLogger(__name__)

# Import name of the plugin. It is also used below to build the configuration
# section name, and by ``ConferencesComponent`` to build the Galène cache path.
IMPORT_NAME = "conferences"

# Plugin metadata: a component-mode entry point whose main class is
# ``ConferencesComponent``, depending on the XEP-0106, XEP-0166, XEP-0167 and
# XEP-0176 plugins.
PLUGIN_INFO = {
    C.PI_NAME: "A/V Conferences Component",
    C.PI_IMPORT_NAME: IMPORT_NAME,
    C.PI_MODES: [C.PLUG_MODE_COMPONENT],
    C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0166", "XEP-0167", "XEP-0176"],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "ConferencesComponent",
    C.PI_HANDLER: C.BOOL_FALSE,
    C.PI_DESCRIPTION: _(
        "Handle multiparty audio/video conferences, using a Selective Forwarding Unit.\n"
        "The Galène project (https://galene.org) is currently the only supported backend."
    ),
}

# Configuration section where the settings of this component are read
# (``galene_path`` and ``http_port`` are looked up there).
CONF_SECTION = f"component {IMPORT_NAME}"


class Conference:

    def __init__(
        self,
        parent: "ConferencesComponent",
        group_name: str,
        data_file: Path,
        endpoint: str,
        status_url: str,
    ) -> None:
        self.parent = parent
        self.group_name = group_name
        self.data_file = data_file
        self.endpoint = endpoint
        self.status_url = status_url
        self._protocol: "GaleneProtocol|None" = None
        self.connected = defer.Deferred()
        self.ready = defer.Deferred()

    def __str__(self):
        return f"conference {self.group_name!r}"

    @property
    def protocol(self) -> "GaleneProtocol":
        assert self._protocol is not None
        return self._protocol

    @protocol.setter
    def protocol(self, protocol: "GaleneProtocol") -> None:
        self._protocol = protocol
        self.connected.callback(None)

    @property
    def _j(self) -> XEP_0166:
        return self.parent._j

    @property
    def _ice_udp(self) -> XEP_0176:
        return self.parent._ice_udp

    @property
    def client(self) -> SatXMPPEntity:
        client = self.parent.client
        assert client is not None
        return client

    def join(self, user_jid: jid.JID) -> None:
        self.protocol.send_data(
            {
                "type": "join",
                "kind": "join",
                "group": self.data_file.stem,
                "username": user_jid.userhost(),
                "password": "",
            }
        )

    def send_offer(self, session: dict, sdp: str) -> None:
        self.protocol.send_data(
            {
                "type": "offer",
                "id": session["id"],
                "label": "camera",
                "username": session["peer_jid"].userhost(),
                "sdp": sdp,
            }
        )

    def add_candidate(
        self, session: dict, candidate: dict, sdp_mid: str, sdp_mline_index: int
    ) -> None:
        """Add an ICE candidate.

        @param session: Jingle session.
        @param candidate: ICE candidate, SDP format.
        """
        self.protocol.send_data(
            {
                "type": "ice",
                "id": session["id"],
                "candidate": {
                    "candidate": mapping.generate_candidate_line(candidate),
                    "sdpMid": sdp_mid,
                    "sdpMLineIndex": sdp_mline_index,
                },
            }
        )

    def on_joined(self, data: dict) -> None:
        user_jid = jid.JID(data["username"])
        match data["kind"]:
            case "join":
                log.info(f"{user_jid} has joined {self}.")
            case "fail":
                log.warning(f"{user_jid} can't join {self}: {data}")
            case "change":
                log.debug(f"Change for {user_jid} in {self}.")
            case "leave":
                log.info(f"{user_jid} has left {self}.")

    def on_answer(self, data: dict) -> None:
        """Called when SDP answer has been received

        Send the SDP to ``answer_sdp_d`` to continue workflow.
        """
        session = self._j.get_session(self.client, data["id"])
        try:
            answer_sdp_d = session.pop("answer_sdp_d")
        except KeyError:
            raise exceptions.InternalError(
                '"answer_sdp_d" should be available in session.'
            )
        else:
            answer_sdp_d.callback(data["sdp"])

    def on_ice(self, data: dict) -> None:
        log.debug(f"ICE candidate: {data}")
        session = self._j.get_session(self.client, data["id"])
        candidate_data = data["candidate"]
        contents = session["contents"]
        try:
            content_id = list(contents)[candidate_data["sdpMLineIndex"]]
        except IndexError:
            log.error(
                f"Can't find any content at index {candidate_data['sdpMLineIndex']}."
            )
            return
        content = contents[content_id]
        local_ice_data = content["transport_data"]["local_ice_data"]
        media = content["application_data"]["media"]
        client: SatXMPPEntity = self.client.get_virtual_client(session["local_jid"])
        candidate = mapping.parse_candidate(candidate_data["candidate"][10:].split())
        defer.ensureDeferred(
            self._ice_udp.ice_candidates_add(
                client,
                session["id"],
                {
                    media: {
                        "candidates": [candidate],
                        "ufrag": local_ice_data["ufrag"],
                        "pwd": local_ice_data["pwd"],
                    }
                },
            )
        )


class GaleneClientFactory(WebSocketClientFactory):
    """WebSocket client factory keeping a reference to its ``Conference``."""

    def __init__(self, conference: Conference) -> None:
        """
        @param conference: conference served by this factory; its ``endpoint``
            is passed to the parent factory constructor.
        """
        # Stored so that ``GaleneProtocol`` can reach the conference through
        # ``self.factory.conference``.
        self.conference = conference
        super().__init__(conference.endpoint)


class GaleneProtocol(WebSocketClientProtocol):
    verbose = True

    @property
    def conference(self) -> Conference:
        conference: Conference = self.factory.conference
        assert conference is not None
        return conference

    def connectionMade(self) -> None:
        super().connectionMade()
        self.conference.protocol = self

    def connectionLost(self, reason: failure.Failure = ConnectionDone) -> None:
        super().connectionLost(reason)

    def onOpen(self):
        handshake_data = {"type": "handshake", "version": ["2"], "id": str(uuid())}
        self.send_data(handshake_data)

    def send_data(self, data: dict) -> None:
        if self.verbose:
            log.debug(f"DATA SENT: {data}")
        self.sendMessage(json.dumps(data).encode())

    def onMessage(self, payload, isBinary):
        if isBinary:
            raise exceptions.DataError("Unexpected binary payload: {payload!r}")
        try:
            data = json.loads(payload)
        except json.JSONDecodeError:
            log.warning(f"Can't decode data: {payload!r}")
            return

        try:
            match data.get("type"):
                case None:
                    log.warning(f'"type" is missing in data: {data!r}')
                case "handshake":
                    version = data["version"][0]

                    log.debug(
                        f"Handshake for group {self.conference.group_name!r}. Galène protocol "
                        f" v{version}."
                    )
                    self.conference.ready.callback(None)
                case "ping":
                    log.debug("pong")
                    self.send_data({"type": "pong"})
                case "joined" | "answer" | "ice" as data_type:
                    method = getattr(self.conference, f"on_{data_type}")
                    method(data)
                case _:
                    log.debug(f"Unhandled message: {data}")
        except (KeyError, IndexError):
            log.exception(f"Unexpected data format: {data!r}")


class ConferencesComponent:
    """Entry point of the A/V conferences component.

    Runs a local Galène process and attaches Jingle sessions addressed to this
    component to Galène groups.
    """

    IMPORT_NAME = IMPORT_NAME

    def __init__(self, host):
        """Register the Jingle trigger and start the Galène process.

        The Galène executable is taken from the ``galene_path`` setting, or
        looked up in PATH. A failure to start the process is logged and not
        propagated.

        @param host: backend instance, giving access to plugins, triggers and
            memory.
        @raise exceptions.NotFound: the Galène executable can't be found.
        """
        self.host = host
        self.client: SatXMPPEntity | None = None
        self._e: XEP_0106 = host.plugins["XEP-0106"]
        self._j: XEP_0166 = host.plugins["XEP-0166"]
        self._ice_udp: XEP_0176 = host.plugins["XEP-0176"]
        host.trigger.add("XEP-0167_jingle_handler", self._jingle_handler_trigger)
        try:
            galene_path = Path(
                self.host.memory.config_get(CONF_SECTION, "galene_path")
                or which("galene")[0]
            )
        except IndexError as e:
            # ``which`` has returned an empty list: nothing found in PATH.
            raise exceptions.NotFound(
                'Can\'t find "galene" executable, "conferences" component can\'t be '
                "started without it. Please install it in location accessible in PATH, "
                'or use "galene_path" setting.'
            ) from e
        self.galene_http_port = self.host.memory.config_get(
            CONF_SECTION, "http_port", "9443"
        )
        galene_data_path = host.memory.get_cache_path(IMPORT_NAME, "galene")
        # Static files are expected next to the executable.
        galene_static_path = galene_path.parent / "static"
        self.galene_group_path = galene_data_path / "groups"
        self.galene_group_path.mkdir(0o700, parents=True, exist_ok=True)
        try:
            d = self._process = async_process.run(
                str(galene_path),
                "-static",
                str(galene_static_path),
                "-http",
                f"127.0.0.1:{self.galene_http_port}",
                # We don't want HTTPS here, it's only used for local interactions
                "-insecure",
                path=str(galene_data_path),
                verbose=True,
            )
        except Exception:
            log.exception("Can't start Galene.")
        else:
            d.addErrback(self._galene_process_errback)
            log.info(f"Galene instance started on port {self.galene_http_port}.")

    def get_handler(self, __):
        """Return a new ``ConferencesHandler``; the argument is ignored."""
        return ConferencesHandler()

    def profile_connecting(self, client):
        """Store the connecting client as the client of this component."""
        self.client = client

    async def attach_to_group(self, session: dict, group_name: str) -> Conference:
        """Attach to a Galène group.

        Create a group data file if it doesn't exist.
        Create and attach a Galene client.

        @param session: Jingle session data.
        @param group_name: name of the conference group.
        @return conference: Data of the conference.
        """
        stem = regex.path_escape(group_name)
        filename = f"{stem}.json"
        data_file = self.galene_group_path / filename
        if not data_file.exists():
            group_data = {
                "wildcard-user": {
                    "password": {"type": "wildcard"},
                    "permissions": "present",
                },
            }
            with data_file.open("w") as f:
                json.dump(group_data, f)
            log.debug(f"Conference data for {group_name!r} created at " f"{data_file} .")

        url = f"http://localhost:{self.galene_http_port}/group/{quote(stem)}"
        status_url = f"{url}/.status"

        log.debug(f"Attaching to Galene.\n{url=}\n{status_url=}")
        # The WebSocket endpoint of the group is read from its status document.
        resp = await treq.get(status_url)
        group_status = await resp.json()
        log.debug(f"{group_status=}")
        endpoint = group_status["endpoint"]
        conference = Conference(
            parent=self,
            group_name=group_name,
            data_file=data_file,
            endpoint=endpoint,
            status_url=status_url,
        )

        factory = GaleneClientFactory(conference)
        # factory.setProtocolOptions(logOctets=True)
        factory.protocol = GaleneProtocol
        reactor.connectTCP("127.0.0.1", int(self.galene_http_port), factory)

        return conference

    async def _jingle_handler_trigger(
        self,
        client: SatXMPPEntity,
        action: str,
        session: dict,
        content_name: str,
        desc_elt: domish.Element,
    ) -> None:
        """Set up the conference when a Jingle session is being confirmed.

        Only the ``A_PREPARE_CONFIRMATION`` action for the client of this
        component is handled. The group name is the unescaped (XEP-0106) user
        part of the local JID of the session.

        @param client: client handling the session.
        @param action: Jingle action.
        @param session: Jingle session; the conference and the callbacks used by
            the call workflow are stored in it.
        @param content_name: name of the content (unused here).
        @param desc_elt: description element (unused here).
        @raise StanzaError: the local JID has no user part.
        """
        if client != self.client:
            return
        if action == self._j.A_PREPARE_CONFIRMATION:
            if "conference" in session:
                # We have already set up the conference.
                return
            local_jid: jid.JID = session["local_jid"]
            if not local_jid.user:
                raise StanzaError("forbidden", "A room name must be specified.")
            group_name = self._e.unescape(local_jid.user)

            session["conference"] = await self.attach_to_group(session, group_name)
            session["pre_accepted"] = True
            session["call_setup_cb"] = self.on_call_setup
            session["ice_candidates_new_cb"] = self.on_ice_candidates_new

    async def on_call_setup(
        self,
        client: SatXMPPEntity,
        session: dict,
        call_data: dict,
    ) -> None:
        """Join the Galène group and send the SDP offer of the call.

        Wait for the conference to be ready before sending anything.

        @param client: client handling the session.
        @param session: Jingle session.
        @param call_data: call data; the "sdp" key is sent as offer.
        @raise exceptions.InternalError: unexpected client, or no conference is
            attached to the session.
        """
        if self.client is None or client != self.client:
            raise exceptions.InternalError(f"Unexpected client: {client}")
        try:
            conference = session["conference"]
        except KeyError as e:
            raise exceptions.InternalError("Conference data is missing.") from e

        await conference.ready
        conference.join(session["peer_jid"])
        conference.send_offer(session, call_data["sdp"])

    def on_ice_candidates_new(
        self,
        client: SatXMPPEntity,
        session: dict,
        ice_candidates_data: dict[str, dict],
    ) -> None:
        """Send new ICE candidates to Galène.

        For each media, the first content with the same media is used: its
        position in the session contents is sent as ``sdpMLineIndex`` and its ID
        as ``sdpMid``. A media without matching content is logged and skipped.

        @param client: client handling the session.
        @param session: Jingle session.
        @param ice_candidates_data: map of media to ICE data; the "candidates"
            key of each value is read.
        @raise exceptions.InternalError: no conference is attached to the session.
        """
        try:
            conference = session["conference"]
        except KeyError as e:
            raise exceptions.InternalError("Conference data is missing.") from e
        for media, media_data in ice_candidates_data.items():
            for idx, (content_id, content) in enumerate(session["contents"].items()):
                if content["application_data"]["media"] == media:
                    break
            else:
                log.error(f"Can't find corresponding content for {media!r}")
                continue
            sdp_mline_index: int = idx
            sdp_mid: str = content_id

            for candidate in media_data["candidates"]:
                conference.add_candidate(session, candidate, sdp_mid, sdp_mline_index)

    def _galene_process_errback(self, failure_: Failure) -> None:
        """Log a failure of the Galène process."""
        log.error(f"Can't run Galene process: {failure_.value}")


@implementer(iwokkel.IDisco)
class ConferencesHandler(XMPPHandler):
    """XMPP handler providing the service discovery identity of the component."""

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        """Return the disco info of the component.

        The arguments are not used: the same result is returned for every request.

        @return: list with a single ``DiscoIdentity("conference", "audio-video")``.
        """
        return [disco.DiscoIdentity("conference", "audio-video")]