Mercurial > libervia-backend
view libervia/backend/plugins/plugin_xep_0272.py @ 4320:9658c534287e
plugin XEP-0215, XEP-0376: fix bad calls to `hasFeature`:
`hasFeature` was called like blocking code, missing the `await`. This has been fixed, and
is now using the `memory.disco.has_feature` version.
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 30 Sep 2024 14:14:38 +0200 |
parents | 0d7bb4df2343 |
children |
line wrap: on
line source
#!/usr/bin/env python3

# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from twisted.internet import defer
from twisted.words.protocols.jabber import jid
from twisted.words.protocols.jabber.xmlstream import XMPPHandler
from twisted.words.xish import domish
from wokkel import disco, iwokkel
from wokkel import muc
from zope.interface import implementer

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.plugins import plugin_xep_0166
from libervia.backend.plugins import plugin_xep_0167
from libervia.backend.plugins.plugin_xep_0167 import mapping
from libervia.backend.tools.common import data_format

from . import plugin_xep_0045, plugin_xep_0249

log = getLogger(__name__)


PLUGIN_INFO = {
    C.PI_NAME: "Multiparty Jingle (Muji)",
    C.PI_IMPORT_NAME: "XEP-0272",
    C.PI_TYPE: "XEP",
    C.PI_PROTOCOLS: ["XEP-0272"],
    C.PI_DEPENDENCIES: ["XEP-0045", "XEP-0166", "XEP-0167", "XEP-0249"],
    C.PI_MAIN: "XEP_0272",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _(
        "Allow to run A/V conference with several participant using P2P connections. "
        "The number of participant must not be to big to make it work correctly."
    ),
}

NS_MUJI = "http://telepathy.freedesktop.org/muji"
# XPath-like query matching any <presence/> carrying a <muji/> child.
PRESENCE_MUJI = f'/presence/muji[@xmlns="{NS_MUJI}"]'


class XEP_0272:
    """Multiparty Jingle (Muji): group call coordination through a MUC room."""

    def __init__(self, host):
        """Register the MUJI namespace, bridge methods and bridge signal.

        @param host: backend host; must expose the ``XEP-0045``, ``XEP-0166``,
            ``XEP-0167`` and ``XEP-0249`` plugins in ``host.plugins``.
        """
        log.info(f'Plugin "{PLUGIN_INFO[C.PI_NAME]}" initialization')
        self.host = host
        host.register_namespace("muji", NS_MUJI)
        self._muc: plugin_xep_0045.XEP_0045 = host.plugins["XEP-0045"]
        self._muc_invite: plugin_xep_0249.XEP_0249 = host.plugins["XEP-0249"]
        self._j: plugin_xep_0166.XEP_0166 = host.plugins["XEP-0166"]
        self._rtp: plugin_xep_0167.XEP_0167 = host.plugins["XEP-0167"]
        host.bridge.add_method(
            "call_group_start",
            ".plugin",
            in_sign="asss",
            out_sign="s",
            method=self._call_group_start,
            async_=True,
        )
        host.bridge.add_method(
            "call_group_data_set",
            ".plugin",
            in_sign="sss",
            out_sign="",
            method=self._call_group_data_set,
            async_=True,
        )
        host.bridge.add_signal("call_group_setup", ".plugin", signature="sss")

    def get_handler(self, client):
        """Return the XMPP handler observing MUJI presences for this plugin."""
        return XEP_0272_handler(self)

    def _call_group_start(
        self,
        entities_s: list[str],
        extra_s: str,
        profile_key: str,
    ) -> defer.Deferred[str]:
        """Bridge wrapper around [call_group_start].

        @param entities_s: JIDs of the peers to call, one string per entity (the
            bridge signature is ``asss``, and each item is parsed as a JID).
        @param extra_s: serialised extra data.
        @param profile_key: profile of the client starting the call.
        @return: Deferred firing with the serialised group call data.
        """
        client = self.host.get_client(profile_key)
        d = defer.ensureDeferred(
            self.call_group_start(
                client,
                [jid.JID(e) for e in entities_s],
                data_format.deserialise(extra_s),
            )
        )
        d.addCallback(data_format.serialise)
        return d

    def _call_group_data_set(
        self,
        room_jid_s: str,
        call_data_s: str,
        profile_key: str,
    ) -> defer.Deferred[None]:
        """Bridge wrapper around [call_group_data_set].

        @param room_jid_s: JID of the room used for MUJI coordination.
        @param call_data_s: serialised call data.
        @param profile_key: profile of the client setting the data.
        @return: Deferred firing once the call data has been sent.
        """
        client = self.host.get_client(profile_key)
        # The method is registered with ``async_=True``: the Deferred is returned so
        # that failures are reported to the caller instead of being dropped.
        return defer.ensureDeferred(
            self.call_group_data_set(
                client, jid.JID(room_jid_s), data_format.deserialise(call_data_s)
            )
        )

    async def on_room_join(self, room: muc.Room, user: muc.User) -> None:
        """Callback appended to ``room.on_joined_callbacks``; currently a no-op."""
        pass

    async def on_room_left(self, room: muc.Room, user: muc.User) -> None:
        """Callback appended to ``room.on_left_callbacks``; currently a no-op."""
        pass

    def on_muji_request(
        self, presence_elt: domish.Element, client: SatXMPPEntity
    ) -> None:
        """Handle a presence stanza carrying a <muji/> element.

        Presences with a <preparing/> child update the set of occupants we are
        waiting for; other MUJI presences from other occupants are treated as call
        data and register the sender as an entity to call.

        @param presence_elt: received <presence/> stanza.
        @param client: client which received the stanza.
        """
        from_jid = jid.JID(presence_elt["from"])
        room_jid = from_jid.userhostJID()
        try:
            room = self._muc.get_room(client, room_jid)
        except exceptions.NotFound:
            log.warning(
                f"Ignoring MUJI element from an unknown room: {presence_elt.toXml()}"
            )
            return

        # True when the presence is the broadcast of one of our own presences.
        own_jid = from_jid == self._muc.get_room_user_jid(client, room_jid)
        muji_data = self.get_muji_data(room)
        muji_elt = presence_elt.muji
        assert muji_elt is not None
        preparing_state = (
            next(muji_elt.elements(NS_MUJI, "preparing"), None) is not None
        )

        if preparing_state:
            if own_jid:
                # we have received the broadcast of our own preparation message
                muji_data["done_collecting"] = True
                self.try_to_finish_preparation(client, room, muji_data)
            elif not muji_data.get("done_collecting", False):
                # other entities currently doing preparation
                muji_data["preparing_jids"].add(from_jid)
        elif not own_jid:
            done_preparing = muji_data.get("done_preparing", False)
            # if we are still in preparation, we remove the JID from the data we are
            # still waiting for, and we check if we can finish the preparation.
            if not done_preparing:
                # The key is created (with a None value) if it doesn't exist yet.
                muji_data.setdefault("allowed_payloads")
                # TODO: check allowed_payloads
                preparing_jids = muji_data["preparing_jids"]
                preparing_jids.discard(from_jid)
                log.debug(
                    f"[{client.profile}] received call data for {from_jid}.\n"
                    f"{preparing_jids=}"
                )
                muji_data["to_call"].add(from_jid)
                self.try_to_finish_preparation(client, room, muji_data)

    def try_to_finish_preparation(
        self, client: SatXMPPEntity, room: muc.Room, muji_data: dict
    ) -> None:
        """Finish preparation if possible.

        This method checks if preparations of other JIDs need to be waited for, and
        if not, finishes our own preparation.

        @param client: SatXMPPEntity instance.
        @param room: room used for MUJI coordination.
        @param muji_data: MUJI data of the room, as returned by [get_muji_data].
        """
        if muji_data.get("preparing_jids"):
            # Some occupants are still preparing, we have to wait for them.
            return

        # No preparation left to wait, we can finish our own.
        muji_data["done_preparing"] = True
        log.debug(f"[{client.profile}] Done preparing.")

        # We ask frontend to initiate the session, so we know supported codecs.
        self.host.bridge.call_group_setup(
            room.roomJID.full(),
            data_format.serialise(
                {"to_call": [entity.full() for entity in muji_data["to_call"]]}
            ),
            client.profile,
        )

    async def call_group_data_set(
        self,
        client: SatXMPPEntity,
        room_jid: jid.JID,
        call_data: dict,
    ) -> None:
        """Called when the frontend has prepared the group call.

        The payload types of the audio and video media found in the SDP are
        advertised in a MUJI presence sent to the room. Nothing is sent if the
        room is unknown.

        @param client: SatXMPPEntity instance.
        @param room_jid: JID of the room used for MUJI coordination.
        @param call_data: call data similar to the one used in
            ``XEP-0167.call_start``; the ``sdp`` key is required.
        """
        try:
            room = self._muc.get_room(client, room_jid)
        except exceptions.NotFound:
            log.warning(f"Ignoring group call data for an unknown room: {room_jid}")
            return

        sdp_data = mapping.parse_sdp(call_data["sdp"], self._j.ROLE_INITIATOR)
        presence_elt, muji_elt = self.generate_presence_and_muji(client, room)
        for media_type, media_data in sdp_data.items():
            if media_type not in ("audio", "video"):
                continue
            application_data = media_data["application_data"]
            content_elt = muji_elt.addElement("content")
            # XXX: the initiator will be actually the last to join, but this attribute
            #   will be ignored anyway.
            content_elt["creator"] = self._j.ROLE_INITIATOR
            content_elt["name"] = media_data["id"]
            description_elt = mapping.build_description(
                media_type, application_data, {}
            )
            content_elt.addChild(description_elt)
            # We only want to keep payload types. Children without a ``name``
            # attribute (e.g. text nodes) are dropped too instead of raising.
            description_elt.children[:] = [
                child_elt
                for child_elt in description_elt.children
                if getattr(child_elt, "name", None) == "payload-type"
            ]

        await client.a_send(presence_elt)

    def get_muji_data(self, room: muc.Room) -> dict:
        """Get MUJI related data for this room.

        MUJI data is stored in the room object, so it will be deleted when the room
        object itself is deleted.

        @param room: room used for MUJI coordination.
        @return: MUJI data, created with empty ``preparing_jids`` and ``to_call``
            sets on first access.
        """
        try:
            return room._xep_0272_data
        except AttributeError:
            data = room._xep_0272_data = {"preparing_jids": set(), "to_call": set()}
            return data

    def generate_presence_and_muji(
        self, client: SatXMPPEntity, room: muc.Room
    ) -> tuple[domish.Element, domish.Element]:
        """Generate a <presence> stanza with MUJI element.

        @param client: SatXMPPEntity instance, used for the ``from`` attribute.
        @param room: destination room.
        @return: the <presence/> element and its (empty) <muji/> child.
        """
        presence_elt = domish.Element((None, "presence"))
        presence_elt["from"] = client.jid.full()
        presence_elt["to"] = room.roomJID.full()
        muji_elt = presence_elt.addElement((NS_MUJI, "muji"))
        return presence_elt, muji_elt

    async def start_preparation(self, client: SatXMPPEntity, room: muc.Room) -> None:
        """Start preparation of MUJI.

        A MUJI presence with a <preparing/> child is sent to the room.
        """
        presence_elt, muji_elt = self.generate_presence_and_muji(client, room)
        muji_elt.addElement("preparing")
        await client.a_send(presence_elt)

    async def call_group_start(
        self,
        client: SatXMPPEntity,
        entities: list[jid.JID],
        extra: dict,
    ) -> dict:
        """Initiate a group call with the given peers.

        A MUC room is joined under a unique name, MUJI preparation is started in
        it, and every entity in ``entities`` is invited to the room.

        @param client: SatXMPPEntity instance.
        @param entities: JIDs of the peers to initiate a call session with.
        @param extra: Extra data.
        @return: group call data, with the following keys:

            ``room_jid``
                MUC room where the MUJI coordination is done. It may also be used
                for normal chatting.

        @raise exceptions.ExternalRequestError: The MUC room can't be created or
            joined.
        """
        log.debug(f"{client.profile} is starting a MUJI group call with {entities}")
        room_jid = self._muc.get_unique_name(client, prefix="_muji_")
        room = await self._muc.join(client, room_jid)
        # Check the join result before announcing that the room has been created.
        if not room:
            raise exceptions.ExternalRequestError(
                "Can't create or join group chat room."
            )
        log.info(f"[{client.profile}] MUJI room created at {room_jid}")
        await self.start_preparation(client, room)
        room.on_joined_callbacks.append(self.on_room_join)
        room.on_left_callbacks.append(self.on_room_left)
        for entity in entities:
            self._muc_invite.invite(
                client,
                entity,
                room_jid,
                reason="You have been invited to participate in a group call.",
            )
        return {"room_jid": room_jid.full()}


@implementer(iwokkel.IDisco)
class XEP_0272_handler(XMPPHandler):
    """XMPP handler routing MUJI presences to the plugin and advertising MUJI."""

    def __init__(self, plugin_parent):
        """
        @param plugin_parent: XEP_0272 instance receiving the MUJI presences.
        """
        self.plugin_parent = plugin_parent

    def connectionInitialized(self):
        """Observe presences carrying a <muji/> element."""
        self.xmlstream.addObserver(
            PRESENCE_MUJI, self.plugin_parent.on_muji_request, client=self.parent
        )

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        """Advertise the MUJI namespace as a disco feature."""
        return [disco.DiscoFeature(NS_MUJI)]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        """No disco item is exposed by this plugin."""
        return []