view libervia/backend/plugins/plugin_comp_conferences/__init__.py @ 4341:e9971a4b0627

remove uses of twisted.internet.defer.returnValue This function has been deprecated in Twisted 24.7.0.
author Povilas Kanapickas <povilas@radix.lt>
date Wed, 11 Dec 2024 01:17:09 +0200
parents a0ed5c976bf8
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia A/V Conferences Component
# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import json
import os
from pathlib import Path
from urllib.parse import quote

from autobahn.twisted.websocket import WebSocketClientFactory, WebSocketClientProtocol
from shortuuid import uuid
import treq
from twisted.internet import defer, reactor
from twisted.internet.error import ConnectionDone
from twisted.python import failure
from twisted.python.failure import Failure
from twisted.python.procutils import which
from twisted.words.protocols.jabber import jid
from twisted.words.protocols.jabber.error import StanzaError
from twisted.words.protocols.jabber.xmlstream import XMPPHandler
from twisted.words.xish import domish
from wokkel import disco, iwokkel
from zope.interface import implementer

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.plugins.plugin_xep_0106 import XEP_0106
from libervia.backend.plugins.plugin_xep_0166 import XEP_0166
from libervia.backend.plugins.plugin_xep_0167 import XEP_0167, mapping, NS_AV_CONFERENCES
from libervia.backend.plugins.plugin_xep_0176 import XEP_0176
from libervia.backend.plugins.plugin_xep_0298 import XEP_0298
from libervia.backend.tools.common import async_process, regex


log = getLogger(__name__)

IMPORT_NAME = "conferences"
NAME = "Libervia A/V Conferences"

PLUGIN_INFO = {
    C.PI_NAME: "A/V Conferences Component",
    C.PI_IMPORT_NAME: IMPORT_NAME,
    C.PI_MODES: [C.PLUG_MODE_COMPONENT],
    C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0166", "XEP-0167", "XEP-0176", "XEP-0298"],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "ConferencesComponent",
    C.PI_HANDLER: C.BOOL_TRUE,
    C.PI_DESCRIPTION: _(
        "Handle multiparty audio/video conferences, using a Selective Forwarding Unit.\n"
        "The Galène project (https://galene.org) is currently the only supported backend."
    ),
}

CONF_SECTION = f"component {IMPORT_NAME}"


class Conference:

    def __init__(
        self,
        parent: "ConferencesComponent",
        group_name: str,
        data_file: Path,
        endpoint: str,
        status_url: str,
        session: dict,
    ) -> None:
        self.parent = parent
        self.group_name = group_name
        self.data_file = data_file
        self.endpoint = endpoint
        self.status_url = status_url
        self._protocol: "GaleneProtocol|None" = None
        self.connected = defer.Deferred()
        self.ready = defer.Deferred()
        self.session = session

    def __str__(self):
        return f"conference {self.group_name!r}"

    @property
    def protocol(self) -> "GaleneProtocol":
        assert self._protocol is not None
        return self._protocol

    @protocol.setter
    def protocol(self, protocol: "GaleneProtocol") -> None:
        self._protocol = protocol
        self.connected.callback(None)

    @property
    def client_id(self) -> str:
        return self.protocol.client_id

    @property
    def _j(self) -> XEP_0166:
        return self.parent._j

    @property
    def _rtp(self) -> XEP_0167:
        return self.parent._rtp

    @property
    def _ice_udp(self) -> XEP_0176:
        return self.parent._ice_udp

    @property
    def client(self) -> SatXMPPEntity:
        client = self.parent.client
        assert client is not None
        return client

    @property
    def galene_id(self) -> str:
        """ID to use with Galène API."""
        return f"{self.client_id}-{self.session['id']}"

    async def on_call_setup(
        self,
        client: SatXMPPEntity,
        session: dict,
        call_data: dict,
    ) -> None:
        if call_data["role"] == self._j.ROLE_INITIATOR:
            self.send_answer(session["id"], call_data["sdp"])
        else:
            raise exceptions.InternalError(
                '"role" should be "{self._j.ROLE_INITIATOR}" here.'
            )

    def join(self, user_jid: jid.JID) -> None:
        self.protocol.send_data(
            {
                "type": "join",
                "kind": "join",
                "group": self.data_file.stem,
                "username": user_jid.userhost(),
                "password": "",
            }
        )

    def send_request(self, requested: dict | None = None) -> None:
        if requested is None:
            requested = {"": ["audio", "video"]}
        self.protocol.send_data({"type": "request", "request": requested})

    def send_offer(self, sdp: str) -> None:
        self.protocol.send_data(
            {
                "type": "offer",
                "id": self.galene_id,
                "label": "camera",
                "username": self.session["peer_jid"].userhost(),
                "sdp": sdp,
            }
        )

    def send_answer(self, session_id: str, sdp: str) -> None:
        data_id = session_id[len(self.client_id) + 1 :]
        self.protocol.send_data(
            {
                "type": "answer",
                "id": data_id,
                "label": "camera",
                "username": self.session["peer_jid"].userhost(),
                "sdp": sdp,
            }
        )

    def add_candidate(self, candidate: dict, sdp_mid: str, sdp_mline_index: int) -> None:
        """Add an ICE candidate.

        @param candidate: ICE candidate, SDP format.
        """
        self.protocol.send_data(
            {
                "type": "ice",
                "id": self.galene_id,
                "candidate": {
                    "candidate": mapping.generate_candidate_line(candidate),
                    "sdpMid": sdp_mid,
                    "sdpMLineIndex": sdp_mline_index,
                },
            }
        )

    def on_joined(self, data: dict) -> None:
        user_jid = jid.JID(data["username"])
        match data["kind"]:
            case "join":
                log.info(f"{user_jid} has joined {self}.")
            case "fail":
                log.warning(f"{user_jid} can't join {self}: {data}")
            case "change":
                log.debug(f"Change for {user_jid} in {self}.")
            case "leave":
                log.info(f"{user_jid} has left {self}.")
        self.send_request()

    def on_answer(self, data: dict) -> None:
        """Called when SDP answer has been received

        Send the SDP to ``answer_sdp_d`` to continue workflow.
        """
        try:
            answer_sdp_d = self.session.pop("answer_sdp_d")
        except KeyError:
            raise exceptions.InternalError(
                '"answer_sdp_d" should be available in session.'
            )
        else:
            answer_sdp_d.callback(data["sdp"])

    def on_offer(self, data: dict) -> None:
        """Called when a new SFP offer has been received"""
        defer.ensureDeferred(self.a_on_offer(data))

    async def a_on_offer(self, data: dict) -> None:
        client: SatXMPPEntity = self.client.get_virtual_client(self.session["local_jid"])
        call_data = {"sdp": data["sdp"]}
        try:
            # FIXME: This assume that all username are coming from XMPP, "source" should
            #   be used instead to be sure to match XMPP users, and other ones should use
            #   a non "xmpp:" URL scheme.
            user_jid = jid.JID(data["username"])
            if not user_jid.user:
                raise ValueError("Missing user part.")
        except Exception as e:
            log.warning("Username is not a JID: {data['username']=}, {e}")
        else:
            call_data["metadata"] = {"user": user_jid}
        sid = await self._rtp.call_start(
            client,
            self.session["peer_jid"],
            call_data,
            session_id=f"{self.client_id}-{data['id']}",
        )
        session = self._j.get_session(client, sid)
        session["call_setup_cb"] = self.on_call_setup

    def on_ice(self, data: dict) -> None:
        if data["id"] == self.galene_id:
            session = self.session
        else:
            session = self._j.get_session(self.client, f"{self.client_id}-{data['id']}")
        log.debug(
            f"ICE candidate for session {session['id']}, peer_jid={session['peer_jid']}."
        )
        candidate_data = data["candidate"]
        contents = session["contents"]
        try:
            content_id = list(contents)[candidate_data["sdpMLineIndex"]]
        except IndexError:
            log.error(
                f"Can't find any content at index {candidate_data['sdpMLineIndex']}."
            )
            return
        content = contents[content_id]
        local_ice_data = content["transport_data"]["local_ice_data"]

        media = content["application_data"]["media"]
        client: SatXMPPEntity = self.client.get_virtual_client(session["local_jid"])
        candidate = mapping.parse_candidate(candidate_data["candidate"][10:].split())
        defer.ensureDeferred(
            self._ice_udp.ice_candidates_add(
                client,
                session["id"],
                {
                    media: {
                        "candidates": [candidate],
                        "ufrag": local_ice_data["ufrag"],
                        "pwd": local_ice_data["pwd"],
                    }
                },
            )
        )


class GaleneClientFactory(WebSocketClientFactory):

    def __init__(self, conference: Conference) -> None:
        self.conference = conference
        super().__init__(conference.endpoint)


class GaleneProtocol(WebSocketClientProtocol):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.client_id = str(uuid())

    @property
    def conference(self) -> Conference:
        conference: Conference = self.factory.conference
        assert conference is not None
        return conference

    def connectionMade(self) -> None:
        super().connectionMade()
        self.conference.protocol = self

    def connectionLost(self, reason: failure.Failure = ConnectionDone) -> None:
        super().connectionLost(reason)

    def onOpen(self):
        handshake_data = {"type": "handshake", "version": ["2"], "id": self.client_id}
        self.send_data(handshake_data)

    def send_data(self, data: dict) -> None:
        if ConferencesComponent.verbose:
            log.debug(f"=> DATA SENT [{self.client_id}]: {data}")
        self.sendMessage(json.dumps(data).encode())

    def onMessage(self, payload, isBinary):
        if isBinary:
            raise exceptions.DataError("Unexpected binary payload: {payload!r}")
        try:
            data = json.loads(payload)
        except json.JSONDecodeError:
            log.warning(f"Can't decode data: {payload!r}")
            return

        if ConferencesComponent.verbose:
            log.debug(f"<= DATA RECEIVED [{self.client_id}]: {data}")
        try:
            match data.get("type"):
                case None:
                    log.warning(f'"type" is missing in data: {data!r}')
                case "handshake":
                    version = data["version"][0]

                    log.debug(
                        f"Handshake for group {self.conference.group_name!r}. Galène "
                        f"protocol  v{version}."
                    )
                    self.conference.ready.callback(None)
                case "ping":
                    log.debug("pong")
                    self.send_data({"type": "pong"})
                case "joined" | "answer" | "ice" | "offer" as data_type:
                    method = getattr(self.conference, f"on_{data_type}")
                    method(data)
                case _:
                    log.debug(f"Unhandled message: {data}")
        except (KeyError, IndexError):
            log.exception(f"Unexpected data format: {data!r}")


class ConferencesComponent:
    IMPORT_NAME = IMPORT_NAME
    verbose = 0

    def __init__(self, host):
        self.host = host
        self.client: SatXMPPEntity | None = None
        self._e: XEP_0106 = host.plugins["XEP-0106"]
        self._j: XEP_0166 = host.plugins["XEP-0166"]
        self._rtp: XEP_0167 = host.plugins["XEP-0167"]
        self._ice_udp: XEP_0176 = host.plugins["XEP-0176"]
        self._coin: XEP_0298 = host.plugins["XEP-0298"]
        host.trigger.add("XEP-0167_jingle_handler", self._jingle_handler_trigger)
        self.initalized = False

    def _init(self) -> None:
        """Initialisation done after profile is connected"""
        assert self.client is not None
        self.client.identities.append(
            disco.DiscoIdentity("conference", "audio-video", NAME)
        )

        try:
            galene_path = Path(
                self.host.memory.config_get(CONF_SECTION, "galene_path")
                or which("galene")[0]
            )
        except IndexError:
            raise exceptions.NotFound(
                'Can\'t find "galene" executable, "conferences" component can\'t be '
                "started without it. Please install it in location accessible in PATH, "
                'or use "galene_path" setting.'
            )
        self.galene_http_port = self.host.memory.config_get(
            CONF_SECTION, "http_port", "9443"
        )
        galene_data_path = self.host.memory.get_cache_path(IMPORT_NAME, "galene")
        galene_static_path = galene_path.parent / "static"
        self.galene_group_path = galene_data_path / "groups"
        self.galene_group_path.mkdir(0o700, parents=True, exist_ok=True)
        env = os.environ
        if self.verbose >= 2:
            # We activate Galène debug logs on ICE candidates.
            env["PION_LOG_TRACE"] = "ice"

        try:
            d = self._process = async_process.run(
                str(galene_path),
                "-static",
                str(galene_static_path),
                "-http",
                f"127.0.0.1:{self.galene_http_port}",
                # We don't want HTTPS here, it's only used for local interactions
                "-insecure",
                path=str(galene_data_path),
                verbose=bool(self.verbose),
                env=env,
            )
        except Exception:
            log.exception("Can't start Galene.")
        else:
            d.addErrback(self._galene_process_errback)
            log.info(f"Galene instance started on port {self.galene_http_port}.")

    def get_handler(self, __):
        return ConferencesHandler()

    def profile_connecting(self, client: SatXMPPEntity) -> None:
        self.client = client
        if not self.initalized:
            self._init()
            self.initalized = True

    async def attach_to_group(self, session: dict, group_name: str) -> Conference:
        """Attach to a Galène group.

        Create a group data file if it doesn't exist.
        Create and attach a Galene client.

        @param session: Jingle session data.
        @param group_name: name of the conference group.
        @return conference: Data of the conference.
        """
        stem = regex.path_escape(group_name)
        filename = f"{stem}.json"
        data_file = self.galene_group_path / filename
        if not data_file.exists():
            group_data = {
                "wildcard-user": {
                    "password": {"type": "wildcard"},
                    "permissions": "present",
                },
            }
            with data_file.open("w") as f:
                json.dump(group_data, f)
            log.debug(f"Conference data for {group_name!r} created at " f"{data_file} .")

        url = f"http://localhost:{self.galene_http_port}/group/{quote(stem)}"
        status_url = f"{url}/.status"

        log.debug(f"Attaching to Galene.\n{url=}\n{status_url=}")
        resp = await treq.get(status_url)
        group_status = await resp.json()
        log.debug(f"{group_status=}")
        endpoint = group_status["endpoint"]
        conference = Conference(
            parent=self,
            group_name=group_name,
            data_file=data_file,
            endpoint=endpoint,
            status_url=status_url,
            session=session,
        )

        factory = GaleneClientFactory(conference)
        # factory.setProtocolOptions(logOctets=True)
        factory.protocol = GaleneProtocol
        reactor.connectTCP("127.0.0.1", int(self.galene_http_port), factory)

        return conference

    async def _jingle_handler_trigger(
        self,
        client: SatXMPPEntity,
        action: str,
        session: dict,
        content_name: str,
        desc_elt: domish.Element,
    ) -> None:
        if client != self.client:
            return
        if action == self._j.A_PREPARE_CONFIRMATION:
            if "conference" in session:
                # We have already set up the conference.
                return
            local_jid: jid.JID = session["local_jid"]
            if not local_jid.user:
                raise StanzaError("forbidden", "A room name must be specified.")
            group_name = self._e.unescape(local_jid.user)

            session["conference"] = await self.attach_to_group(session, group_name)
            session["pre_accepted"] = True
            session["call_setup_cb"] = self.on_call_setup
            session["ice_candidates_new_cb"] = self.on_ice_candidates_new

    async def on_call_setup(
        self,
        client: SatXMPPEntity,
        session: dict,
        call_data: dict,
    ) -> None:
        if self.client is None or client != self.client:
            raise exceptions.InternalError(f"Unexpected client: {client}")
        try:
            conference = session["conference"]
        except KeyError:
            raise exceptions.InternalError("Conference data is missing.")

        if call_data["role"] == self._j.ROLE_RESPONDER:
            await conference.ready
            conference.join(session["peer_jid"])
            conference.send_offer(call_data["sdp"])
        else:
            raise exceptions.InternalError(
                '"role" should be "{self._j.ROLE_RESPONDER}" here.'
            )

    def on_ice_candidates_new(
        self,
        client: SatXMPPEntity,
        session: dict,
        ice_candidates_data: dict[str, dict],
    ) -> None:
        try:
            conference = session["conference"]
        except KeyError:
            raise exceptions.InternalError("Conference data is missing.")
        for media, media_data in ice_candidates_data.items():
            for idx, (content_id, content) in enumerate(session["contents"].items()):
                if content["application_data"]["media"] == media:
                    break
            else:
                log.error(f"Can't find corresponding content for {media!r}")
                continue
            sdp_mline_index: int = idx
            sdp_mid: str = content_id

            for candidate in media_data["candidates"]:
                conference.add_candidate(candidate, sdp_mid, sdp_mline_index)

    def _galene_process_errback(self, failure_: Failure) -> None:
        log.error(f"Can't run Galene process: {failure_.value}")


@implementer(iwokkel.IDisco)
class ConferencesHandler(XMPPHandler):

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        return [disco.DiscoFeature(NS_AV_CONFERENCES)]