view libervia/backend/plugins/plugin_xep_0272.py @ 4341:e9971a4b0627

Remove uses of twisted.internet.defer.returnValue. This function has been deprecated in Twisted 24.7.0.
author Povilas Kanapickas <povilas@radix.lt>
date Wed, 11 Dec 2024 01:17:09 +0200
parents 0d7bb4df2343
children
line wrap: on
line source

#!/usr/bin/env python3

# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from twisted.internet import defer
from twisted.words.protocols.jabber import jid
from twisted.words.protocols.jabber.xmlstream import XMPPHandler
from twisted.words.xish import domish
from wokkel import disco, iwokkel
from wokkel import muc
from zope.interface import implementer

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.plugins import plugin_xep_0166
from libervia.backend.plugins import plugin_xep_0167
from libervia.backend.plugins.plugin_xep_0167 import mapping
from libervia.backend.tools.common import data_format

from . import plugin_xep_0045, plugin_xep_0249

# Module-level logger, named after this module.
log = getLogger(__name__)


# Plugin metadata. ``C.PI_MAIN`` names the main plugin class defined in this module
# (``XEP_0272``), and ``C.PI_DEPENDENCIES`` lists the plugins which are looked up in
# ``host.plugins`` by ``XEP_0272.__init__``.
# NOTE(review): the description below contains typos ("several participant",
#   "to big"); it is a translatable string, so fixing it changes the msgid — confirm
#   with the translation workflow before editing.
PLUGIN_INFO = {
    C.PI_NAME: "Multiparty Jingle (Muji)",
    C.PI_IMPORT_NAME: "XEP-0272",
    C.PI_TYPE: "XEP",
    C.PI_PROTOCOLS: ["XEP-0272"],
    C.PI_DEPENDENCIES: ["XEP-0045", "XEP-0166", "XEP-0167", "XEP-0249"],
    C.PI_MAIN: "XEP_0272",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _(
        "Allow to run A/V conference with several participant using P2P connections. "
        "The number of participant must not be to big to make it work correctly."
    ),
}

# XML namespace of the ``<muji/>`` element.
NS_MUJI = "http://telepathy.freedesktop.org/muji"
# Observer query matching ``<presence/>`` stanzas which have a ``<muji/>`` child in the
# MUJI namespace; it is registered in ``XEP_0272_handler.connectionInitialized``.
PRESENCE_MUJI = f'/presence/muji[@xmlns="{NS_MUJI}"]'


class XEP_0272:

    def __init__(self, host):
        """Set up the plugin and expose its API on the bridge.

        @param host: backend core instance; its ``plugins`` mapping and its ``bridge``
            are used here.
        """
        log.info(f'Plugin "{PLUGIN_INFO[C.PI_NAME]}" initialization')
        self.host = host
        host.register_namespace("muji", NS_MUJI)

        # Plugins this one relies on (all of them are listed in C.PI_DEPENDENCIES).
        plugins = host.plugins
        self._muc: plugin_xep_0045.XEP_0045 = plugins["XEP-0045"]
        self._muc_invite: plugin_xep_0249.XEP_0249 = plugins["XEP-0249"]
        self._j: plugin_xep_0166.XEP_0166 = plugins["XEP-0166"]
        self._rtp: plugin_xep_0167.XEP_0167 = plugins["XEP-0167"]

        bridge = host.bridge
        # (name, input signature, output signature, callback) of each bridge method.
        bridge_methods = (
            ("call_group_start", "asss", "s", self._call_group_start),
            ("call_group_data_set", "sss", "", self._call_group_data_set),
        )
        for name, in_sign, out_sign, method in bridge_methods:
            bridge.add_method(
                name,
                ".plugin",
                in_sign=in_sign,
                out_sign=out_sign,
                method=method,
                async_=True,
            )
        bridge.add_signal("call_group_setup", ".plugin", signature="sss")

    def get_handler(self, client):
        """Build the XMPP handler of this plugin.

        @param client: client the handler is requested for (not used here).
        @return: a new ``XEP_0272_handler`` bound to this plugin instance.
        """
        handler = XEP_0272_handler(self)
        return handler

    def _call_group_start(
        self,
        entities_s: list[str],
        extra_s: str,
        profile_key: str,
    ) -> defer.Deferred[str]:
        """Bridge wrapper of ``call_group_start``.

        @param entities_s: JIDs (as strings) of the peers to invite. The bridge method
            is registered with the ``asss`` input signature, i.e. this is an array of
            strings and not a single string.
        @param extra_s: serialised extra data.
        @param profile_key: key of the profile starting the group call.
        @return: Deferred firing with the serialised group call data.
        """
        client = self.host.get_client(profile_key)
        entities = [jid.JID(entity_s) for entity_s in entities_s]
        extra = data_format.deserialise(extra_s)
        d = defer.ensureDeferred(self.call_group_start(client, entities, extra))
        # the bridge transports strings, the resulting dict must be serialised
        d.addCallback(data_format.serialise)
        return d

    def _call_group_data_set(
        self,
        room_jid_s: str,
        call_data_s: str,
        profile_key: str,
    ) -> defer.Deferred[None]:
        """Bridge wrapper of ``call_group_data_set``.

        @param room_jid_s: JID (as string) of the room used for MUJI coordination.
        @param call_data_s: serialised call data.
        @param profile_key: key of the profile setting the call data.
        @return: Deferred firing once the call data have been handled.
        """
        client = self.host.get_client(profile_key)
        # The Deferred is returned (the bridge method is registered with
        # ``async_=True``) so that a failure is reported to the caller instead of
        # ending up as an unhandled error in a discarded Deferred.
        return defer.ensureDeferred(
            self.call_group_data_set(
                client, jid.JID(room_jid_s), data_format.deserialise(call_data_s)
            )
        )

    async def on_room_join(self, room: muc.Room, user: muc.User) -> None:
        """Callback appended to ``room.on_joined_callbacks`` by ``call_group_start``.

        Nothing is done for now.
        """

    async def on_room_left(self, room: muc.Room, user: muc.User) -> None:
        """Callback appended to ``room.on_left_callbacks`` by ``call_group_start``.

        Nothing is done for now.
        """

    def on_muji_request(
        self, presence_elt: domish.Element, client: SatXMPPEntity
    ) -> None:
        """Handle a ``<presence/>`` stanza holding a MUJI element.

        This method is registered as observer of ``PRESENCE_MUJI`` by the handler.
        The MUJI data of the room are updated according to who sent the presence (us
        or another occupant) and to the presence of a ``<preparing/>`` child, and the
        end of our own preparation is attempted when relevant.

        @param presence_elt: received ``<presence/>`` stanza.
        @param client: SatXMPPEntity instance.
        """
        from_jid = jid.JID(presence_elt["from"])
        room_jid = from_jid.userhostJID()
        try:
            room = self._muc.get_room(client, room_jid)
        except exceptions.NotFound:
            log.warning(
                f"Ignoring MUJI element from an unknown room: {presence_elt.toXml()}"
            )
            return

        is_own_presence = from_jid == self._muc.get_room_user_jid(client, room_jid)
        muji_data = self.get_muji_data(room)
        muji_elt = presence_elt.muji
        assert muji_elt is not None
        # the sender is in preparing state if there is at least one <preparing/> child
        is_preparing = next(muji_elt.elements(NS_MUJI, "preparing"), None) is not None

        if is_preparing and is_own_presence:
            # we have received the broadcast of our own preparation message
            muji_data["done_collecting"] = True
            self.try_to_finish_preparation(client, room, muji_data)
            return

        if is_preparing:
            # another entity is currently doing its preparation, we only keep track of
            # it if we have not received our own preparation message yet
            if not muji_data.get("done_collecting", False):
                muji_data["preparing_jids"].add(from_jid)
            return

        if is_own_presence or muji_data.get("done_preparing", False):
            # nothing to do with our own call data, nor once our preparation is done
            return

        # We are still in preparation: the sender is removed from the JIDs we are
        # waiting for, and we check if we can finish the preparation.
        muji_data.setdefault("allowed_payloads")
        # TODO: check allowed_payloads
        preparing_jids = muji_data["preparing_jids"]
        preparing_jids.discard(from_jid)
        log.debug(
            f"[{client.profile}] received call data for {from_jid}.\n"
            f"{preparing_jids=}"
        )
        muji_data["to_call"].add(from_jid)
        self.try_to_finish_preparation(client, room, muji_data)

    def try_to_finish_preparation(
        self, client: SatXMPPEntity, room: muc.Room, muji_data: dict
    ) -> None:
        """Finish our own preparation if nobody else has to be waited for.

        When no JID is left in ``preparing_jids``, the room is marked as done
        preparing and the ``call_group_setup`` signal is sent with the JIDs to call.

        @param client: SatXMPPEntity instance.
        @param room: room used for MUJI coordination.
        @param muji_data: MUJI data, used to check the JIDs still preparing.
        """
        if muji_data.get("preparing_jids"):
            # some entities are still preparing, we have to wait for them
            return

        # No preparation left to wait, we can finish our own.
        # The data stored on the room are the ones updated and read from here.
        muji_data = self.get_muji_data(room)
        muji_data["done_preparing"] = True

        log.debug(f"[{client.profile}] Done preparing.")

        # We ask frontend to initiate the session, so we know supported codecs.
        to_call = [entity.full() for entity in muji_data["to_call"]]
        setup_data_s = data_format.serialise({"to_call": to_call})
        self.host.bridge.call_group_setup(
            room.roomJID.full(), setup_data_s, client.profile
        )

    async def call_group_data_set(
        self,
        client: SatXMPPEntity,
        room_jid: jid.JID,
        call_data: dict,
    ) -> None:
        """Called when the frontend has prepared the group call.

        The SDP found in ``call_data`` is parsed, and for each audio or video media a
        ``<content/>`` element holding only the payload types is added to a MUJI
        presence, which is then sent to the room.

        @param client: SatXMPPEntity instance.
        @param room_jid: JID of the room used for MUJI coordination.
        @param call_data: call data similar to the one used in ``XEP-0167.call_start``.
            The ``sdp`` key is mandatory.
        """
        try:
            room = self._muc.get_room(client, room_jid)
        except exceptions.NotFound:
            log.warning(f"Ignoring group call data for an unknown room: {room_jid}")
            return
        sdp_data = mapping.parse_sdp(call_data["sdp"], self._j.ROLE_INITIATOR)
        presence_elt, muji_elt = self.generate_presence_and_muji(client, room)
        for media_type, media_data in sdp_data.items():
            if media_type not in ("audio", "video"):
                continue
            application_data = media_data["application_data"]
            content_elt = muji_elt.addElement("content")
            # XXX: the initiator will be actually the last to join, but this attribute
            #   will be ignored anyway.
            content_elt["creator"] = self._j.ROLE_INITIATOR
            content_elt["name"] = media_data["id"]
            description_elt = mapping.build_description(media_type, application_data, {})
            content_elt.addChild(description_elt)

            # We only want to keep payload types. ``getattr`` is used because children
            # which are not elements (e.g. text nodes) have no ``name`` attribute; they
            # are dropped too. The list is updated in place.
            description_elt.children[:] = [
                child
                for child in description_elt.children
                if getattr(child, "name", None) == "payload-type"
            ]

        await client.a_send(presence_elt)

    def get_muji_data(self, room: muc.Room) -> dict:
        """Retrieve the MUJI data of a room, creating them on first access.

        The data are stored as an attribute of the room object itself, thus they
        disappear with it.

        @param room: room to get the MUJI data from.
        @return: MUJI data; newly created data hold an empty ``preparing_jids`` set and
            an empty ``to_call`` set.
        """
        if not hasattr(room, "_xep_0272_data"):
            room._xep_0272_data = {"preparing_jids": set(), "to_call": set()}
        return room._xep_0272_data

    def generate_presence_and_muji(
        self, client: SatXMPPEntity, room: muc.Room
    ) -> tuple[domish.Element, domish.Element]:
        """Build a ``<presence/>`` stanza for the room, with an empty MUJI child.

        @param client: SatXMPPEntity instance, its full JID is used as sender.
        @param room: room the presence is addressed to.
        @return: the ``<presence/>`` element and its ``<muji/>`` child.
        """
        stanza = domish.Element((None, "presence"))
        stanza["from"] = client.jid.full()
        stanza["to"] = room.roomJID.full()
        return stanza, stanza.addElement((NS_MUJI, "muji"))

    async def start_preparation(self, client: SatXMPPEntity, room: muc.Room) -> None:
        """Send to the room a MUJI presence holding a ``<preparing/>`` element.

        @param client: SatXMPPEntity instance.
        @param room: room used for MUJI coordination.
        """
        stanza, muji = self.generate_presence_and_muji(client, room)
        muji.addElement("preparing")
        await client.a_send(stanza)

    async def call_group_start(
        self,
        client: SatXMPPEntity,
        entities: list[jid.JID],
        extra: dict,
    ) -> dict:
        """Initiate a group call with the given peers.

        A MUC room is created, and the people in ``entities`` are invited to it. A MUJI
        session will then be started with each of them upon their arrival.

        @param client: SatXMPPEntity instance.
        @param entities: JIDs of the peers to invite to the group call.
        @param extra: Extra data (not used by this method for now).

        @return: group call data, with the following keys:

            ``room_jid``
                MUC room where the MUJI coordination is done. It may also be used for
                normal chatting.

        @raise exceptions.ExternalRequestError: The MUC room can't be created or joined.
        """
        log.debug(f"{client.profile} is starting a MUJI group call with {entities}")
        room_jid = self._muc.get_unique_name(client, prefix="_muji_")
        room = await self._muc.join(client, room_jid)
        # The result is checked before logging, so a failure is not reported as a
        # successful room creation.
        if not room:
            raise exceptions.ExternalRequestError("Can't create or join group chat room.")
        log.info(f"[{client.profile}] MUJI room created at {room_jid}")
        await self.start_preparation(client, room)
        room.on_joined_callbacks.append(self.on_room_join)
        room.on_left_callbacks.append(self.on_room_left)
        for entity in entities:
            self._muc_invite.invite(
                client,
                entity,
                room_jid,
                reason="You have been invited to participate in a group call.",
            )
        return {"room_jid": room_jid.full()}


@implementer(iwokkel.IDisco)
class XEP_0272_handler(XMPPHandler):
    """XMPP handler of the MUJI plugin.

    It forwards MUJI presences to the plugin and advertises the MUJI namespace in
    service discovery.
    """

    def __init__(self, plugin_parent):
        """
        @param plugin_parent: plugin instance whose ``on_muji_request`` method is
            registered as observer.
        """
        # The base class initialises its own attributes (``parent``, ``xmlstream``).
        super().__init__()
        self.plugin_parent = plugin_parent

    def connectionInitialized(self):
        """Observe the presences holding a MUJI element on the XML stream."""
        self.xmlstream.addObserver(
            PRESENCE_MUJI, self.plugin_parent.on_muji_request, client=self.parent
        )

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        """Advertise the MUJI namespace as a disco feature."""
        return [disco.DiscoFeature(NS_MUJI)]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        """No disco item is exposed by this plugin."""
        return []