changeset 4245:a7d4007a8fa5

plugin XEP-0272: implement XEP-0272: Multiparty Jingle (Muji) rel 429
author Goffi <goffi@goffi.org>
date Wed, 15 May 2024 17:34:46 +0200 (7 months ago)
parents 05f01ac1d5b2
children 5eb13251fd75
files libervia/backend/core/constants.py libervia/backend/plugins/plugin_xep_0045.py libervia/backend/plugins/plugin_xep_0167/__init__.py libervia/backend/plugins/plugin_xep_0249.py libervia/backend/plugins/plugin_xep_0272.py
diffstat 5 files changed, 381 insertions(+), 17 deletions(-) [+]
line wrap: on
line diff
--- a/libervia/backend/core/constants.py	Sat May 11 13:52:43 2024 +0200
+++ b/libervia/backend/core/constants.py	Wed May 15 17:34:46 2024 +0200
@@ -354,7 +354,9 @@
     META_TYPE_CONFIRM = "confirm"
     META_TYPE_FILE = "file"
     META_TYPE_CALL = "call"
+    META_TYPE_GROUP_CALL = "group-call"
     META_TYPE_REMOTE_CONTROL = "remote-control"
+    META_TYPE_MUC_INVIRATION = "muc-invitation"
     META_TYPE_OVERWRITE = "overwrite"
     META_TYPE_NOT_IN_ROSTER_LEAK = "not_in_roster_leak"
     META_SUBTYPE_CALL_AUDIO = "audio"
--- a/libervia/backend/plugins/plugin_xep_0045.py	Sat May 11 13:52:43 2024 +0200
+++ b/libervia/backend/plugins/plugin_xep_0045.py	Wed May 15 17:34:46 2024 +0200
@@ -21,6 +21,7 @@
 from typing import Optional
 import uuid
 
+import shortuuid
 from twisted.internet import defer
 from twisted.python import failure
 from twisted.words.protocols.jabber import jid
@@ -80,7 +81,7 @@
         self.room = room
 
 
-class XEP_0045(object):
+class XEP_0045:
     # TODO: handle invitations
     # FIXME: this plugin need a good cleaning, join method is messy
 
@@ -512,25 +513,33 @@
         client = self.host.get_client(profile_key)
         return self.get_unique_name(client, muc_service or None).full()
 
-    def get_unique_name(self, client, muc_service=None):
+    def get_unique_name(
+        self,
+        client: SatXMPPEntity,
+        muc_service: jid.JID|None = None,
+        prefix: str = ""
+    ) -> jid.JID:
         """Return unique name for a room, avoiding collision
 
-        @param muc_service (jid.JID) : leave empty string to use the default service
-        @return: jid.JID (unique room bare JID)
+        @param client: Client instance.
+        @param muc_service: leave empty string to use the default service
+        @param prefix: prefix to use in room name.
+        @return: unique room bare JID
         """
-        # TODO: we should use #RFC-0045 10.1.4 when available here
-        room_name = str(uuid.uuid4())
+        room_name = f"{prefix}{shortuuid.uuid()}"
         if muc_service is None:
             try:
                 muc_service = client.muc_service
             except AttributeError:
-                raise exceptions.NotReady("Main server MUC service has not been checked yet")
+                raise exceptions.NotReady(
+                    "Main server MUC service has not been checked yet"
+                )
             if muc_service is None:
                 log.warning(_("No MUC service found on main server"))
                 raise exceptions.FeatureNotFound
 
         muc_service = muc_service.userhost()
-        return jid.JID("{}@{}".format(room_name, muc_service))
+        return jid.JID(f"{room_name}@{muc_service}")
 
     def get_default_muc(self):
         """Return the default MUC.
@@ -600,6 +609,8 @@
                 self._join_eb(failure.Failure(e), client, room_jid, nick, password)
             )
         else:
+            room.on_joined_callbacks = []
+            room.on_left_callbacks = []
             await defer.ensureDeferred(
                 self._join_cb(room, client, room_jid, nick)
             )
@@ -1258,6 +1269,8 @@
         else:
             if not room.fully_joined.called:
                 return
+            for cb in room.on_joined_callbacks:
+                defer.ensureDeferred(cb(room, user))
             try:
                 self._changing_nicks.remove(user.nick)
             except KeyError:
@@ -1313,6 +1326,8 @@
             if not room.fully_joined.called:
                 return
             log.debug(_("user {nick} left room {room_id}").format(nick=user.nick, room_id=room.occupantJID.userhost()))
+            for cb in room.on_left_callbacks:
+                defer.ensureDeferred(cb(room, user))
             extra = {'info_type': ROOM_USER_LEFT,
                      'user_affiliation': user.affiliation,
                      'user_role': user.role,
--- a/libervia/backend/plugins/plugin_xep_0167/__init__.py	Sat May 11 13:52:43 2024 +0200
+++ b/libervia/backend/plugins/plugin_xep_0167/__init__.py	Wed May 15 17:34:46 2024 +0200
@@ -17,7 +17,6 @@
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
 from typing import Optional
-import uuid
 
 from twisted.internet import reactor
 from twisted.internet import defer
@@ -579,7 +578,7 @@
         elif action == self._j.A_PREPARE_INITIATOR:
             application_data["peer_data"] = mapping.parse_description(desc_elt)
         elif action == self._j.A_SESSION_ACCEPT:
-            pass # self.send_answer_sdp(client, session)
+            self.send_answer_sdp(client, session)
         else:
             log.warning(f"FIXME: unmanaged action {action}")
 
--- a/libervia/backend/plugins/plugin_xep_0249.py	Sat May 11 13:52:43 2024 +0200
+++ b/libervia/backend/plugins/plugin_xep_0249.py	Wed May 15 17:34:46 2024 +0200
@@ -25,6 +25,7 @@
 
 from libervia.backend.core import exceptions
 from libervia.backend.core.constants import Const as C
+from libervia.backend.core.core_types import SatXMPPEntity
 from libervia.backend.core.i18n import D_, _
 from libervia.backend.core.log import getLogger
 from libervia.backend.tools import xml_tools
@@ -57,7 +58,7 @@
 }
 
 
-class XEP_0249(object):
+class XEP_0249:
 
     params = """
     <params>
@@ -112,12 +113,20 @@
         client = self.host.get_client(profile_key)
         self.invite(client, jid.JID(guest_jid_s), jid.JID(room_jid_s, options))
 
-    def invite(self, client, guest, room, options={}):
+    def invite(
+        self,
+        client: SatXMPPEntity,
+        guest: jid.JID,
+        room: jid.JID,
+        # the dict is only used internally, so we can safely use a default dict instead of
+        # None here.
+        **options
+    ) -> None:
         """Invite a user to a room
 
-        @param guest(jid.JID): jid of the user to invite
-        @param room(jid.JID): jid of the room where the user is invited
-        @param options(dict): attribute with extra info (reason, password) as in #XEP-0249
+        @param guest: jid of the user to invite
+        @param room: jid of the room where the user is invited
+        @param options: attribute with extra info (reason, password) as in #XEP-0249
         """
         message = domish.Element((None, "message"))
         message["to"] = guest.full()
@@ -128,7 +137,6 @@
                 log.warning("Ignoring invalid invite option: {}".format(key))
                 continue
             x_elt[key] = value
-        #  there is not body in this message, so we can use directly send()
         client.send(message)
 
     def _accept(self, room_jid, profile_key=C.PROF_KEY_NONE):
@@ -188,13 +196,20 @@
             title = D_("MUC invitation")
             xml_tools.quick_note(self.host, client, msg, title, C.XMLUI_DATA_LVL_INFO)
         else:  # leave the default value here
+            action_extra = {
+                "type": C.META_TYPE_CONFIRM,
+                "subtype": C.META_TYPE_MUC_INVIRATION,
+                "from_jid": from_jid_s,
+                "room_jid": room_jid_s
+            }
             confirm_msg = D_(
                 "You have been invited by %(user)s to join the room %(room)s. "
                 "Do you accept?"
             ) % {"user": from_jid_s, "room": room_jid_s}
             confirm_title = D_("MUC invitation")
             d = xml_tools.defer_confirm(
-                self.host, confirm_msg, confirm_title, profile=client.profile
+                self.host, confirm_msg, confirm_title, profile=client.profile,
+                action_extra=action_extra
             )
 
             def accept_cb(accepted):
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/plugins/plugin_xep_0272.py	Wed May 15 17:34:46 2024 +0200
@@ -0,0 +1,333 @@
+#!/usr/bin/env python3
+
+# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from twisted.internet import defer
+from twisted.words.protocols.jabber import jid
+from twisted.words.protocols.jabber.xmlstream import XMPPHandler
+from twisted.words.xish import domish
+from wokkel import disco, iwokkel
+from wokkel import muc
+from zope.interface import implementer
+
+from libervia.backend.core import exceptions
+from libervia.backend.core.constants import Const as C
+from libervia.backend.core.core_types import SatXMPPEntity
+from libervia.backend.core.i18n import _
+from libervia.backend.core.log import getLogger
+from libervia.backend.plugins import plugin_xep_0166
+from libervia.backend.plugins import plugin_xep_0167
+from libervia.backend.plugins.plugin_xep_0167 import mapping
+from libervia.backend.tools.common import data_format
+
+from . import plugin_xep_0045, plugin_xep_0249
+
+log = getLogger(__name__)
+
+
+PLUGIN_INFO = {
+    C.PI_NAME: "Multiparty Jingle (Muji)",
+    C.PI_IMPORT_NAME: "XEP-0272",
+    C.PI_TYPE: "XEP",
+    C.PI_PROTOCOLS: ["XEP-0272"],
+    C.PI_DEPENDENCIES: ["XEP-0045", "XEP-0166", "XEP-0167", "XEP-0249"],
+    C.PI_MAIN: "XEP_0272",
+    C.PI_HANDLER: "yes",
+    C.PI_DESCRIPTION: _(
+        "Allow to run A/V conference with several participant using P2P connections. "
+        "The number of participant must not be to big to make it work correctly."
+    ),
+}
+
+NS_MUJI = "http://telepathy.freedesktop.org/muji"
+PRESENCE_MUJI = f'/presence/muji[@xmlns="{NS_MUJI}"]'
+
+
+class XEP_0272:
+
+    def __init__(self, host):
+        log.info(f'Plugin "{PLUGIN_INFO[C.PI_NAME]}" initialization')
+        self.host = host
+        host.register_namespace("muji", NS_MUJI)
+        self._muc: plugin_xep_0045.XEP_0045 = host.plugins["XEP-0045"]
+        self._muc_invite: plugin_xep_0249.XEP_0249 = host.plugins["XEP-0249"]
+        self._j: plugin_xep_0166.XEP_0166 = host.plugins["XEP-0166"]
+        self._rtp: plugin_xep_0167.XEP_0167 = host.plugins["XEP-0167"]
+        host.bridge.add_method(
+            "call_group_start",
+            ".plugin",
+            in_sign="asss",
+            out_sign="s",
+            method=self._call_group_start,
+            async_=True,
+        )
+        host.bridge.add_method(
+            "call_group_data_set",
+            ".plugin",
+            in_sign="sss",
+            out_sign="",
+            method=self._call_group_data_set,
+            async_=True,
+        )
+        host.bridge.add_signal("call_group_setup", ".plugin", signature="sss")
+
+    def get_handler(self, client):
+        return XEP_0272_handler(self)
+
+    def _call_group_start(
+        self,
+        entities_s: str,
+        extra_s: str,
+        profile_key: str,
+    ) -> defer.Deferred[str]:
+        client = self.host.get_client(profile_key)
+        d = defer.ensureDeferred(
+            self.call_group_start(
+                client, [jid.JID(e) for e in entities_s], data_format.deserialise(extra_s)
+            )
+        )
+        d.addCallback(data_format.serialise)
+        return d
+
+    def _call_group_data_set(
+        self,
+        room_jid_s: str,
+        call_data_s: str,
+        profile_key: str,
+    ) -> None:
+        client = self.host.get_client(profile_key)
+        defer.ensureDeferred(
+            self.call_group_data_set(
+                client, jid.JID(room_jid_s), data_format.deserialise(call_data_s)
+            )
+        )
+
+    async def on_room_join(self, room: muc.Room, user: muc.User) -> None:
+        pass
+
+    async def on_room_left(self, room: muc.Room, user: muc.User) -> None:
+        pass
+
+    def on_muji_request(
+        self, presence_elt: domish.Element, client: SatXMPPEntity
+    ) -> None:
+        from_jid = jid.JID(presence_elt["from"])
+        room_jid = from_jid.userhostJID()
+        try:
+            room = self._muc.get_room(client, room_jid)
+        except exceptions.NotFound:
+            log.warning(
+                f"Ignoring MUJI element from an unknown room: {presence_elt.toXml()}"
+            )
+            return
+        if from_jid == self._muc.get_room_user_jid(client, room_jid):
+            own_jid = True
+        else:
+            own_jid = False
+        muji_data = self.get_muji_data(room)
+        muji_elt = presence_elt.muji
+        assert muji_elt is not None
+        try:
+            next(muji_elt.elements(NS_MUJI, "preparing"))
+        except StopIteration:
+            preparing_state = False
+        else:
+            preparing_state = True
+
+        if preparing_state:
+            if own_jid:
+                # we have received the broadcast of our own preparation message
+                muji_data["done_collecting"] = True
+                self.try_to_finish_preparation(client, room, muji_data)
+            elif not muji_data.get("done_collecting", False):
+                # other entities currently doing preparation
+                preparing_jids = muji_data["preparing_jids"]
+                preparing_jids.add(from_jid)
+        elif not own_jid:
+            done_preparing = muji_data.get("done_preparing", False)
+            # if we are still in preparation, we remove the JID from data data we are
+            # still waiting for, and we check if we can finish the preparation.
+            if not done_preparing:
+                allowed_payloads = muji_data.setdefault("allowed_payloads")
+                # TODO: check allowed_payloads
+                preparing_jids = muji_data["preparing_jids"]
+                preparing_jids.discard(from_jid)
+                log.debug(
+                    f"[{client.profile}] received call data for {from_jid}.\n"
+                    f"{preparing_jids=}"
+                )
+                muji_data["to_call"].add(from_jid)
+                self.try_to_finish_preparation(client, room, muji_data)
+
+    def try_to_finish_preparation(
+        self, client: SatXMPPEntity, room: muc.Room, muji_data: dict
+    ) -> None:
+        """Finish preparation if possible.
+
+        This method checks if preparations of other JIDs needs to be waited, and if not,
+        finishes our own preparation.
+        """
+        preparing_jids = muji_data.get("preparing_jids")
+        if not preparing_jids:
+            # No preparation left to wait, we can finish our own.
+            muji_data = self.get_muji_data(room)
+            muji_data["done_preparing"] = True
+
+            log.debug(f"[{client.profile}] Done preparing.")
+
+            # We ask frontend to initiate the session, so we know supported codecs.
+            self.host.bridge.call_group_setup(
+                room.roomJID.full(),
+                data_format.serialise({
+                    "to_call": [entity.full() for entity in muji_data["to_call"]]
+                }),
+                client.profile,
+            )
+
+    async def call_group_data_set(
+        self,
+        client: SatXMPPEntity,
+        room_jid: jid.JID,
+        call_data: dict,
+    ) -> None:
+        """Called when frontends has prepared group call.
+
+        Group call data will be advertised on the MUC, and call will be initiated with all
+        participant which where in preparing state when we made our own preparation.
+
+        @param client: SatXMPPEntity instance.
+        @param room_jid: JID of the room used for MUJI coordination.
+        @param call_data: call data similar to the one used in ``XEP-0167.call_start``.
+        """
+        try:
+            room = self._muc.get_room(client, room_jid)
+        except exceptions.NotFound:
+            log.warning(
+                f"Ignoring MUJI element from an unknown room: {room_jid}"
+            )
+            return
+        sdp_data = mapping.parse_sdp(call_data["sdp"], self._j.ROLE_INITIATOR)
+        presence_elt, muji_elt = self.generate_presence_and_muji(client, room)
+        for media_type, media_data in sdp_data.items():
+            if media_type in ["audio", "video"]:
+                application_data = media_data["application_data"]
+                content_elt = muji_elt.addElement("content")
+                # XXX: the initiator will be actually the last to join, but this attribute
+                #   will be ignored anyway.
+                content_elt["creator"] = self._j.ROLE_INITIATOR
+                content_elt["name"] = media_data["id"]
+                description_elt = mapping.build_description(
+                    media_type, application_data, {}
+                )
+                content_elt.addChild(description_elt)
+
+                # we only want to keep payload types
+                to_remove = []
+                for child_elt in description_elt.children:
+                    if child_elt.name != "payload-type":
+                        to_remove.append(child_elt)
+                for elt in to_remove:
+                    description_elt.children.remove(elt)
+
+        await client.a_send(presence_elt)
+
+    def get_muji_data(self, room: muc.Room) -> dict:
+        """Get MUJI related data for this room
+
+        MUJI data is stored in the room object, so it will be deleted when the room object
+        itself will be deleted.
+        """
+        try:
+            return room._xep_0272_data
+        except AttributeError:
+            data = room._xep_0272_data = {
+                "preparing_jids": set(),
+                "to_call": set()
+            }
+            return data
+
+    def generate_presence_and_muji(
+        self, client: SatXMPPEntity, room: muc.Room
+    ) -> tuple[domish.Element, domish.Element]:
+        """Generate a <presence> stanza with MUJI element"""
+        presence_elt = domish.Element((None, "presence"))
+        presence_elt["from"] = client.jid.full()
+        presence_elt["to"] = room.roomJID.full()
+        muji_elt = presence_elt.addElement((NS_MUJI, "muji"))
+        return presence_elt, muji_elt
+
+    async def start_preparation(self, client: SatXMPPEntity, room: muc.Room) -> None:
+        """Start preparation of MUJI"""
+        presence_elt, muji_elt = self.generate_presence_and_muji(client, room)
+        muji_elt.addElement("preparing")
+        await client.a_send(presence_elt)
+
+    async def call_group_start(
+        self,
+        client: SatXMPPEntity,
+        entities: list[jid.JID],
+        extra: dict,
+    ) -> dict:
+        """Initiate a group call with the given peers.
+
+        A MUC room will be created, and people in ``list_entities`` will be invited. MUJI
+        session will then been started which each of them upon they arrival.
+        @param entities: JID of the peer to initiate a call session with.
+        @param extra: Extra data.
+
+        @return: group call data, with the following keys:
+
+            ``room_jid``
+                MUC room where the MUJI coordination is done. It may also be used for
+                normal chatting.
+
+        @raise exceptions.ExternalRequestError: The MUC room can't be created or joined.
+        """
+        log.debug(f"{client.profile} is starting a MUJI group call with {entities}")
+        room_jid = self._muc.get_unique_name(client, prefix="_muji_")
+        room = await self._muc.join(client, room_jid)
+        log.info(f"[{client.profile}] MUJI room created at {room_jid}")
+        if not room:
+            raise exceptions.ExternalRequestError("Can't create or join group chat room.")
+        await self.start_preparation(client, room)
+        room.on_joined_callbacks.append(self.on_room_join)
+        room.on_left_callbacks.append(self.on_room_left)
+        for entity in entities:
+            self._muc_invite.invite(
+                client,
+                entity,
+                room_jid,
+                reason="You have been invited to participate in a group call.",
+            )
+        return {"room_jid": room_jid.full()}
+
+
+@implementer(iwokkel.IDisco)
+class XEP_0272_handler(XMPPHandler):
+
+    def __init__(self, plugin_parent):
+        self.plugin_parent = plugin_parent
+
+    def connectionInitialized(self):
+        self.xmlstream.addObserver(
+            PRESENCE_MUJI, self.plugin_parent.on_muji_request, client=self.parent
+        )
+
+    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
+        return [disco.DiscoFeature(NS_MUJI)]
+
+    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
+        return []