Mercurial > libervia-backend
diff libervia/backend/plugins/plugin_xep_0272.py @ 4245:a7d4007a8fa5
plugin XEP-0272: implement XEP-0272: Multiparty Jingle (Muji)
rel 429
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 15 May 2024 17:34:46 +0200 |
parents | |
children | 0d7bb4df2343 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/backend/plugins/plugin_xep_0272.py Wed May 15 17:34:46 2024 +0200 @@ -0,0 +1,333 @@ +#!/usr/bin/env python3 + +# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from twisted.internet import defer +from twisted.words.protocols.jabber import jid +from twisted.words.protocols.jabber.xmlstream import XMPPHandler +from twisted.words.xish import domish +from wokkel import disco, iwokkel +from wokkel import muc +from zope.interface import implementer + +from libervia.backend.core import exceptions +from libervia.backend.core.constants import Const as C +from libervia.backend.core.core_types import SatXMPPEntity +from libervia.backend.core.i18n import _ +from libervia.backend.core.log import getLogger +from libervia.backend.plugins import plugin_xep_0166 +from libervia.backend.plugins import plugin_xep_0167 +from libervia.backend.plugins.plugin_xep_0167 import mapping +from libervia.backend.tools.common import data_format + +from . import plugin_xep_0045, plugin_xep_0249 + +log = getLogger(__name__) + + +PLUGIN_INFO = { + C.PI_NAME: "Multiparty Jingle (Muji)", + C.PI_IMPORT_NAME: "XEP-0272", + C.PI_TYPE: "XEP", + C.PI_PROTOCOLS: ["XEP-0272"], + C.PI_DEPENDENCIES: ["XEP-0045", "XEP-0166", "XEP-0167", "XEP-0249"], + C.PI_MAIN: "XEP_0272", + C.PI_HANDLER: "yes", + C.PI_DESCRIPTION: _( + "Allow to run A/V conference with several participant using P2P connections. " + "The number of participant must not be to big to make it work correctly." + ), +} + +NS_MUJI = "http://telepathy.freedesktop.org/muji" +PRESENCE_MUJI = f'/presence/muji[@xmlns="{NS_MUJI}"]' + + +class XEP_0272: + + def __init__(self, host): + log.info(f'Plugin "{PLUGIN_INFO[C.PI_NAME]}" initialization') + self.host = host + host.register_namespace("muji", NS_MUJI) + self._muc: plugin_xep_0045.XEP_0045 = host.plugins["XEP-0045"] + self._muc_invite: plugin_xep_0249.XEP_0249 = host.plugins["XEP-0249"] + self._j: plugin_xep_0166.XEP_0166 = host.plugins["XEP-0166"] + self._rtp: plugin_xep_0167.XEP_0167 = host.plugins["XEP-0167"] + host.bridge.add_method( + "call_group_start", + ".plugin", + in_sign="asss", + out_sign="s", + method=self._call_group_start, + async_=True, + ) + host.bridge.add_method( + "call_group_data_set", + ".plugin", + in_sign="sss", + out_sign="", + method=self._call_group_data_set, + async_=True, + ) + host.bridge.add_signal("call_group_setup", ".plugin", signature="sss") + + def get_handler(self, client): + return XEP_0272_handler(self) + + def _call_group_start( + self, + entities_s: str, + extra_s: str, + profile_key: str, + ) -> defer.Deferred[str]: + client = self.host.get_client(profile_key) + d = defer.ensureDeferred( + self.call_group_start( + client, [jid.JID(e) for e in entities_s], data_format.deserialise(extra_s) + ) + ) + d.addCallback(data_format.serialise) + return d + + def _call_group_data_set( + self, + room_jid_s: str, + call_data_s: str, + profile_key: str, + ) -> None: + client = self.host.get_client(profile_key) + defer.ensureDeferred( + self.call_group_data_set( + client, jid.JID(room_jid_s), data_format.deserialise(call_data_s) + ) + ) + + async def on_room_join(self, room: muc.Room, user: muc.User) -> None: + pass + + async def on_room_left(self, room: muc.Room, user: muc.User) -> None: + pass + + def on_muji_request( + self, presence_elt: domish.Element, client: SatXMPPEntity + ) -> None: + from_jid = jid.JID(presence_elt["from"]) + room_jid = from_jid.userhostJID() + try: + room = self._muc.get_room(client, room_jid) + except exceptions.NotFound: + log.warning( + f"Ignoring MUJI element from an unknown room: {presence_elt.toXml()}" + ) + return + if from_jid == self._muc.get_room_user_jid(client, room_jid): + own_jid = True + else: + own_jid = False + muji_data = self.get_muji_data(room) + muji_elt = presence_elt.muji + assert muji_elt is not None + try: + next(muji_elt.elements(NS_MUJI, "preparing")) + except StopIteration: + preparing_state = False + else: + preparing_state = True + + if preparing_state: + if own_jid: + # we have received the broadcast of our own preparation message + muji_data["done_collecting"] = True + self.try_to_finish_preparation(client, room, muji_data) + elif not muji_data.get("done_collecting", False): + # other entities currently doing preparation + preparing_jids = muji_data["preparing_jids"] + preparing_jids.add(from_jid) + elif not own_jid: + done_preparing = muji_data.get("done_preparing", False) + # if we are still in preparation, we remove the JID from data data we are + # still waiting for, and we check if we can finish the preparation. + if not done_preparing: + allowed_payloads = muji_data.setdefault("allowed_payloads") + # TODO: check allowed_payloads + preparing_jids = muji_data["preparing_jids"] + preparing_jids.discard(from_jid) + log.debug( + f"[{client.profile}] received call data for {from_jid}.\n" + f"{preparing_jids=}" + ) + muji_data["to_call"].add(from_jid) + self.try_to_finish_preparation(client, room, muji_data) + + def try_to_finish_preparation( + self, client: SatXMPPEntity, room: muc.Room, muji_data: dict + ) -> None: + """Finish preparation if possible. + + This method checks if preparations of other JIDs needs to be waited, and if not, + finishes our own preparation. + """ + preparing_jids = muji_data.get("preparing_jids") + if not preparing_jids: + # No preparation left to wait, we can finish our own. + muji_data = self.get_muji_data(room) + muji_data["done_preparing"] = True + + log.debug(f"[{client.profile}] Done preparing.") + + # We ask frontend to initiate the session, so we know supported codecs. + self.host.bridge.call_group_setup( + room.roomJID.full(), + data_format.serialise({ + "to_call": [entity.full() for entity in muji_data["to_call"]] + }), + client.profile, + ) + + async def call_group_data_set( + self, + client: SatXMPPEntity, + room_jid: jid.JID, + call_data: dict, + ) -> None: + """Called when frontends has prepared group call. + + Group call data will be advertised on the MUC, and call will be initiated with all + participant which where in preparing state when we made our own preparation. + + @param client: SatXMPPEntity instance. + @param room_jid: JID of the room used for MUJI coordination. + @param call_data: call data similar to the one used in ``XEP-0167.call_start``. + """ + try: + room = self._muc.get_room(client, room_jid) + except exceptions.NotFound: + log.warning( + f"Ignoring MUJI element from an unknown room: {room_jid}" + ) + return + sdp_data = mapping.parse_sdp(call_data["sdp"], self._j.ROLE_INITIATOR) + presence_elt, muji_elt = self.generate_presence_and_muji(client, room) + for media_type, media_data in sdp_data.items(): + if media_type in ["audio", "video"]: + application_data = media_data["application_data"] + content_elt = muji_elt.addElement("content") + # XXX: the initiator will be actually the last to join, but this attribute + # will be ignored anyway. + content_elt["creator"] = self._j.ROLE_INITIATOR + content_elt["name"] = media_data["id"] + description_elt = mapping.build_description( + media_type, application_data, {} + ) + content_elt.addChild(description_elt) + + # we only want to keep payload types + to_remove = [] + for child_elt in description_elt.children: + if child_elt.name != "payload-type": + to_remove.append(child_elt) + for elt in to_remove: + description_elt.children.remove(elt) + + await client.a_send(presence_elt) + + def get_muji_data(self, room: muc.Room) -> dict: + """Get MUJI related data for this room + + MUJI data is stored in the room object, so it will be deleted when the room object + itself will be deleted. + """ + try: + return room._xep_0272_data + except AttributeError: + data = room._xep_0272_data = { + "preparing_jids": set(), + "to_call": set() + } + return data + + def generate_presence_and_muji( + self, client: SatXMPPEntity, room: muc.Room + ) -> tuple[domish.Element, domish.Element]: + """Generate a <presence> stanza with MUJI element""" + presence_elt = domish.Element((None, "presence")) + presence_elt["from"] = client.jid.full() + presence_elt["to"] = room.roomJID.full() + muji_elt = presence_elt.addElement((NS_MUJI, "muji")) + return presence_elt, muji_elt + + async def start_preparation(self, client: SatXMPPEntity, room: muc.Room) -> None: + """Start preparation of MUJI""" + presence_elt, muji_elt = self.generate_presence_and_muji(client, room) + muji_elt.addElement("preparing") + await client.a_send(presence_elt) + + async def call_group_start( + self, + client: SatXMPPEntity, + entities: list[jid.JID], + extra: dict, + ) -> dict: + """Initiate a group call with the given peers. + + A MUC room will be created, and people in ``list_entities`` will be invited. MUJI + session will then been started which each of them upon they arrival. + @param entities: JID of the peer to initiate a call session with. + @param extra: Extra data. + + @return: group call data, with the following keys: + + ``room_jid`` + MUC room where the MUJI coordination is done. It may also be used for + normal chatting. + + @raise exceptions.ExternalRequestError: The MUC room can't be created or joined. + """ + log.debug(f"{client.profile} is starting a MUJI group call with {entities}") + room_jid = self._muc.get_unique_name(client, prefix="_muji_") + room = await self._muc.join(client, room_jid) + log.info(f"[{client.profile}] MUJI room created at {room_jid}") + if not room: + raise exceptions.ExternalRequestError("Can't create or join group chat room.") + await self.start_preparation(client, room) + room.on_joined_callbacks.append(self.on_room_join) + room.on_left_callbacks.append(self.on_room_left) + for entity in entities: + self._muc_invite.invite( + client, + entity, + room_jid, + reason="You have been invited to participate in a group call.", + ) + return {"room_jid": room_jid.full()} + + +@implementer(iwokkel.IDisco) +class XEP_0272_handler(XMPPHandler): + + def __init__(self, plugin_parent): + self.plugin_parent = plugin_parent + + def connectionInitialized(self): + self.xmlstream.addObserver( + PRESENCE_MUJI, self.plugin_parent.on_muji_request, client=self.parent + ) + + def getDiscoInfo(self, requestor, target, nodeIdentifier=""): + return [disco.DiscoFeature(NS_MUJI)] + + def getDiscoItems(self, requestor, target, nodeIdentifier=""): + return []