diff libervia/backend/plugins/plugin_xep_0272.py @ 4245:a7d4007a8fa5

plugin XEP-0272: implement XEP-0272: Multiparty Jingle (Muji) rel 429
author Goffi <goffi@goffi.org>
date Wed, 15 May 2024 17:34:46 +0200
parents
children 0d7bb4df2343
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/plugins/plugin_xep_0272.py	Wed May 15 17:34:46 2024 +0200
@@ -0,0 +1,333 @@
+#!/usr/bin/env python3
+
+# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from twisted.internet import defer
+from twisted.words.protocols.jabber import jid
+from twisted.words.protocols.jabber.xmlstream import XMPPHandler
+from twisted.words.xish import domish
+from wokkel import disco, iwokkel
+from wokkel import muc
+from zope.interface import implementer
+
+from libervia.backend.core import exceptions
+from libervia.backend.core.constants import Const as C
+from libervia.backend.core.core_types import SatXMPPEntity
+from libervia.backend.core.i18n import _
+from libervia.backend.core.log import getLogger
+from libervia.backend.plugins import plugin_xep_0166
+from libervia.backend.plugins import plugin_xep_0167
+from libervia.backend.plugins.plugin_xep_0167 import mapping
+from libervia.backend.tools.common import data_format
+
+from . import plugin_xep_0045, plugin_xep_0249
+
+log = getLogger(__name__)
+
+
+PLUGIN_INFO = {
+    C.PI_NAME: "Multiparty Jingle (Muji)",
+    C.PI_IMPORT_NAME: "XEP-0272",
+    C.PI_TYPE: "XEP",
+    C.PI_PROTOCOLS: ["XEP-0272"],
+    C.PI_DEPENDENCIES: ["XEP-0045", "XEP-0166", "XEP-0167", "XEP-0249"],
+    C.PI_MAIN: "XEP_0272",
+    C.PI_HANDLER: "yes",
+    C.PI_DESCRIPTION: _(
+        "Allow to run A/V conference with several participant using P2P connections. "
+        "The number of participant must not be to big to make it work correctly."
+    ),
+}
+
+NS_MUJI = "http://telepathy.freedesktop.org/muji"
+PRESENCE_MUJI = f'/presence/muji[@xmlns="{NS_MUJI}"]'
+
+
+class XEP_0272:
+
+    def __init__(self, host):
+        log.info(f'Plugin "{PLUGIN_INFO[C.PI_NAME]}" initialization')
+        self.host = host
+        host.register_namespace("muji", NS_MUJI)
+        self._muc: plugin_xep_0045.XEP_0045 = host.plugins["XEP-0045"]
+        self._muc_invite: plugin_xep_0249.XEP_0249 = host.plugins["XEP-0249"]
+        self._j: plugin_xep_0166.XEP_0166 = host.plugins["XEP-0166"]
+        self._rtp: plugin_xep_0167.XEP_0167 = host.plugins["XEP-0167"]
+        host.bridge.add_method(
+            "call_group_start",
+            ".plugin",
+            in_sign="asss",
+            out_sign="s",
+            method=self._call_group_start,
+            async_=True,
+        )
+        host.bridge.add_method(
+            "call_group_data_set",
+            ".plugin",
+            in_sign="sss",
+            out_sign="",
+            method=self._call_group_data_set,
+            async_=True,
+        )
+        host.bridge.add_signal("call_group_setup", ".plugin", signature="sss")
+
+    def get_handler(self, client):
+        return XEP_0272_handler(self)
+
+    def _call_group_start(
+        self,
+        entities_s: str,
+        extra_s: str,
+        profile_key: str,
+    ) -> defer.Deferred[str]:
+        client = self.host.get_client(profile_key)
+        d = defer.ensureDeferred(
+            self.call_group_start(
+                client, [jid.JID(e) for e in entities_s], data_format.deserialise(extra_s)
+            )
+        )
+        d.addCallback(data_format.serialise)
+        return d
+
+    def _call_group_data_set(
+        self,
+        room_jid_s: str,
+        call_data_s: str,
+        profile_key: str,
+    ) -> None:
+        client = self.host.get_client(profile_key)
+        defer.ensureDeferred(
+            self.call_group_data_set(
+                client, jid.JID(room_jid_s), data_format.deserialise(call_data_s)
+            )
+        )
+
+    async def on_room_join(self, room: muc.Room, user: muc.User) -> None:
+        pass
+
+    async def on_room_left(self, room: muc.Room, user: muc.User) -> None:
+        pass
+
+    def on_muji_request(
+        self, presence_elt: domish.Element, client: SatXMPPEntity
+    ) -> None:
+        from_jid = jid.JID(presence_elt["from"])
+        room_jid = from_jid.userhostJID()
+        try:
+            room = self._muc.get_room(client, room_jid)
+        except exceptions.NotFound:
+            log.warning(
+                f"Ignoring MUJI element from an unknown room: {presence_elt.toXml()}"
+            )
+            return
+        if from_jid == self._muc.get_room_user_jid(client, room_jid):
+            own_jid = True
+        else:
+            own_jid = False
+        muji_data = self.get_muji_data(room)
+        muji_elt = presence_elt.muji
+        assert muji_elt is not None
+        try:
+            next(muji_elt.elements(NS_MUJI, "preparing"))
+        except StopIteration:
+            preparing_state = False
+        else:
+            preparing_state = True
+
+        if preparing_state:
+            if own_jid:
+                # we have received the broadcast of our own preparation message
+                muji_data["done_collecting"] = True
+                self.try_to_finish_preparation(client, room, muji_data)
+            elif not muji_data.get("done_collecting", False):
+                # other entities currently doing preparation
+                preparing_jids = muji_data["preparing_jids"]
+                preparing_jids.add(from_jid)
+        elif not own_jid:
+            done_preparing = muji_data.get("done_preparing", False)
+            # if we are still in preparation, we remove the JID from data data we are
+            # still waiting for, and we check if we can finish the preparation.
+            if not done_preparing:
+                allowed_payloads = muji_data.setdefault("allowed_payloads")
+                # TODO: check allowed_payloads
+                preparing_jids = muji_data["preparing_jids"]
+                preparing_jids.discard(from_jid)
+                log.debug(
+                    f"[{client.profile}] received call data for {from_jid}.\n"
+                    f"{preparing_jids=}"
+                )
+                muji_data["to_call"].add(from_jid)
+                self.try_to_finish_preparation(client, room, muji_data)
+
+    def try_to_finish_preparation(
+        self, client: SatXMPPEntity, room: muc.Room, muji_data: dict
+    ) -> None:
+        """Finish preparation if possible.
+
+        This method checks if preparations of other JIDs needs to be waited, and if not,
+        finishes our own preparation.
+        """
+        preparing_jids = muji_data.get("preparing_jids")
+        if not preparing_jids:
+            # No preparation left to wait, we can finish our own.
+            muji_data = self.get_muji_data(room)
+            muji_data["done_preparing"] = True
+
+            log.debug(f"[{client.profile}] Done preparing.")
+
+            # We ask frontend to initiate the session, so we know supported codecs.
+            self.host.bridge.call_group_setup(
+                room.roomJID.full(),
+                data_format.serialise({
+                    "to_call": [entity.full() for entity in muji_data["to_call"]]
+                }),
+                client.profile,
+            )
+
+    async def call_group_data_set(
+        self,
+        client: SatXMPPEntity,
+        room_jid: jid.JID,
+        call_data: dict,
+    ) -> None:
+        """Called when frontends has prepared group call.
+
+        Group call data will be advertised on the MUC, and call will be initiated with all
+        participant which where in preparing state when we made our own preparation.
+
+        @param client: SatXMPPEntity instance.
+        @param room_jid: JID of the room used for MUJI coordination.
+        @param call_data: call data similar to the one used in ``XEP-0167.call_start``.
+        """
+        try:
+            room = self._muc.get_room(client, room_jid)
+        except exceptions.NotFound:
+            log.warning(
+                f"Ignoring MUJI element from an unknown room: {room_jid}"
+            )
+            return
+        sdp_data = mapping.parse_sdp(call_data["sdp"], self._j.ROLE_INITIATOR)
+        presence_elt, muji_elt = self.generate_presence_and_muji(client, room)
+        for media_type, media_data in sdp_data.items():
+            if media_type in ["audio", "video"]:
+                application_data = media_data["application_data"]
+                content_elt = muji_elt.addElement("content")
+                # XXX: the initiator will be actually the last to join, but this attribute
+                #   will be ignored anyway.
+                content_elt["creator"] = self._j.ROLE_INITIATOR
+                content_elt["name"] = media_data["id"]
+                description_elt = mapping.build_description(
+                    media_type, application_data, {}
+                )
+                content_elt.addChild(description_elt)
+
+                # we only want to keep payload types
+                to_remove = []
+                for child_elt in description_elt.children:
+                    if child_elt.name != "payload-type":
+                        to_remove.append(child_elt)
+                for elt in to_remove:
+                    description_elt.children.remove(elt)
+
+        await client.a_send(presence_elt)
+
+    def get_muji_data(self, room: muc.Room) -> dict:
+        """Get MUJI related data for this room
+
+        MUJI data is stored in the room object, so it will be deleted when the room object
+        itself will be deleted.
+        """
+        try:
+            return room._xep_0272_data
+        except AttributeError:
+            data = room._xep_0272_data = {
+                "preparing_jids": set(),
+                "to_call": set()
+            }
+            return data
+
+    def generate_presence_and_muji(
+        self, client: SatXMPPEntity, room: muc.Room
+    ) -> tuple[domish.Element, domish.Element]:
+        """Generate a <presence> stanza with MUJI element"""
+        presence_elt = domish.Element((None, "presence"))
+        presence_elt["from"] = client.jid.full()
+        presence_elt["to"] = room.roomJID.full()
+        muji_elt = presence_elt.addElement((NS_MUJI, "muji"))
+        return presence_elt, muji_elt
+
+    async def start_preparation(self, client: SatXMPPEntity, room: muc.Room) -> None:
+        """Start preparation of MUJI"""
+        presence_elt, muji_elt = self.generate_presence_and_muji(client, room)
+        muji_elt.addElement("preparing")
+        await client.a_send(presence_elt)
+
+    async def call_group_start(
+        self,
+        client: SatXMPPEntity,
+        entities: list[jid.JID],
+        extra: dict,
+    ) -> dict:
+        """Initiate a group call with the given peers.
+
+        A MUC room will be created, and people in ``list_entities`` will be invited. MUJI
+        session will then been started which each of them upon they arrival.
+        @param entities: JID of the peer to initiate a call session with.
+        @param extra: Extra data.
+
+        @return: group call data, with the following keys:
+
+            ``room_jid``
+                MUC room where the MUJI coordination is done. It may also be used for
+                normal chatting.
+
+        @raise exceptions.ExternalRequestError: The MUC room can't be created or joined.
+        """
+        log.debug(f"{client.profile} is starting a MUJI group call with {entities}")
+        room_jid = self._muc.get_unique_name(client, prefix="_muji_")
+        room = await self._muc.join(client, room_jid)
+        log.info(f"[{client.profile}] MUJI room created at {room_jid}")
+        if not room:
+            raise exceptions.ExternalRequestError("Can't create or join group chat room.")
+        await self.start_preparation(client, room)
+        room.on_joined_callbacks.append(self.on_room_join)
+        room.on_left_callbacks.append(self.on_room_left)
+        for entity in entities:
+            self._muc_invite.invite(
+                client,
+                entity,
+                room_jid,
+                reason="You have been invited to participate in a group call.",
+            )
+        return {"room_jid": room_jid.full()}
+
+
+@implementer(iwokkel.IDisco)
+class XEP_0272_handler(XMPPHandler):
+
+    def __init__(self, plugin_parent):
+        self.plugin_parent = plugin_parent
+
+    def connectionInitialized(self):
+        self.xmlstream.addObserver(
+            PRESENCE_MUJI, self.plugin_parent.on_muji_request, client=self.parent
+        )
+
+    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
+        return [disco.DiscoFeature(NS_MUJI)]
+
+    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
+        return []