Mercurial > libervia-backend
changeset 4245:a7d4007a8fa5
plugin XEP-0272: implement XEP-0272: Multiparty Jingle (Muji)
rel 429
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 15 May 2024 17:34:46 +0200 |
parents | 05f01ac1d5b2 |
children | 5eb13251fd75 |
files | libervia/backend/core/constants.py libervia/backend/plugins/plugin_xep_0045.py libervia/backend/plugins/plugin_xep_0167/__init__.py libervia/backend/plugins/plugin_xep_0249.py libervia/backend/plugins/plugin_xep_0272.py |
diffstat | 5 files changed, 381 insertions(+), 17 deletions(-) [+] |
line wrap: on
line diff
--- a/libervia/backend/core/constants.py Sat May 11 13:52:43 2024 +0200 +++ b/libervia/backend/core/constants.py Wed May 15 17:34:46 2024 +0200 @@ -354,7 +354,9 @@ META_TYPE_CONFIRM = "confirm" META_TYPE_FILE = "file" META_TYPE_CALL = "call" + META_TYPE_GROUP_CALL = "group-call" META_TYPE_REMOTE_CONTROL = "remote-control" + META_TYPE_MUC_INVIRATION = "muc-invitation" META_TYPE_OVERWRITE = "overwrite" META_TYPE_NOT_IN_ROSTER_LEAK = "not_in_roster_leak" META_SUBTYPE_CALL_AUDIO = "audio"
--- a/libervia/backend/plugins/plugin_xep_0045.py Sat May 11 13:52:43 2024 +0200 +++ b/libervia/backend/plugins/plugin_xep_0045.py Wed May 15 17:34:46 2024 +0200 @@ -21,6 +21,7 @@ from typing import Optional import uuid +import shortuuid from twisted.internet import defer from twisted.python import failure from twisted.words.protocols.jabber import jid @@ -80,7 +81,7 @@ self.room = room -class XEP_0045(object): +class XEP_0045: # TODO: handle invitations # FIXME: this plugin need a good cleaning, join method is messy @@ -512,25 +513,33 @@ client = self.host.get_client(profile_key) return self.get_unique_name(client, muc_service or None).full() - def get_unique_name(self, client, muc_service=None): + def get_unique_name( + self, + client: SatXMPPEntity, + muc_service: jid.JID|None = None, + prefix: str = "" + ) -> jid.JID: """Return unique name for a room, avoiding collision - @param muc_service (jid.JID) : leave empty string to use the default service - @return: jid.JID (unique room bare JID) + @param client: Client instance. + @param muc_service: leave empty string to use the default service + @param prefix: prefix to use in room name. + @return: unique room bare JID """ - # TODO: we should use #RFC-0045 10.1.4 when available here - room_name = str(uuid.uuid4()) + room_name = f"{prefix}{shortuuid.uuid()}" if muc_service is None: try: muc_service = client.muc_service except AttributeError: - raise exceptions.NotReady("Main server MUC service has not been checked yet") + raise exceptions.NotReady( + "Main server MUC service has not been checked yet" + ) if muc_service is None: log.warning(_("No MUC service found on main server")) raise exceptions.FeatureNotFound muc_service = muc_service.userhost() - return jid.JID("{}@{}".format(room_name, muc_service)) + return jid.JID(f"{room_name}@{muc_service}") def get_default_muc(self): """Return the default MUC. @@ -600,6 +609,8 @@ self._join_eb(failure.Failure(e), client, room_jid, nick, password) ) else: + room.on_joined_callbacks = [] + room.on_left_callbacks = [] await defer.ensureDeferred( self._join_cb(room, client, room_jid, nick) ) @@ -1258,6 +1269,8 @@ else: if not room.fully_joined.called: return + for cb in room.on_joined_callbacks: + defer.ensureDeferred(cb(room, user)) try: self._changing_nicks.remove(user.nick) except KeyError: @@ -1313,6 +1326,8 @@ if not room.fully_joined.called: return log.debug(_("user {nick} left room {room_id}").format(nick=user.nick, room_id=room.occupantJID.userhost())) + for cb in room.on_left_callbacks: + defer.ensureDeferred(cb(room, user)) extra = {'info_type': ROOM_USER_LEFT, 'user_affiliation': user.affiliation, 'user_role': user.role,
--- a/libervia/backend/plugins/plugin_xep_0167/__init__.py Sat May 11 13:52:43 2024 +0200 +++ b/libervia/backend/plugins/plugin_xep_0167/__init__.py Wed May 15 17:34:46 2024 +0200 @@ -17,7 +17,6 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. from typing import Optional -import uuid from twisted.internet import reactor from twisted.internet import defer @@ -579,7 +578,7 @@ elif action == self._j.A_PREPARE_INITIATOR: application_data["peer_data"] = mapping.parse_description(desc_elt) elif action == self._j.A_SESSION_ACCEPT: - pass # self.send_answer_sdp(client, session) + self.send_answer_sdp(client, session) else: log.warning(f"FIXME: unmanaged action {action}")
--- a/libervia/backend/plugins/plugin_xep_0249.py Sat May 11 13:52:43 2024 +0200 +++ b/libervia/backend/plugins/plugin_xep_0249.py Wed May 15 17:34:46 2024 +0200 @@ -25,6 +25,7 @@ from libervia.backend.core import exceptions from libervia.backend.core.constants import Const as C +from libervia.backend.core.core_types import SatXMPPEntity from libervia.backend.core.i18n import D_, _ from libervia.backend.core.log import getLogger from libervia.backend.tools import xml_tools @@ -57,7 +58,7 @@ } -class XEP_0249(object): +class XEP_0249: params = """ <params> @@ -112,12 +113,20 @@ client = self.host.get_client(profile_key) self.invite(client, jid.JID(guest_jid_s), jid.JID(room_jid_s, options)) - def invite(self, client, guest, room, options={}): + def invite( + self, + client: SatXMPPEntity, + guest: jid.JID, + room: jid.JID, + # the dict is only used internally, so we can safely use a default dict instead of + # None here. + **options + ) -> None: """Invite a user to a room - @param guest(jid.JID): jid of the user to invite - @param room(jid.JID): jid of the room where the user is invited - @param options(dict): attribute with extra info (reason, password) as in #XEP-0249 + @param guest: jid of the user to invite + @param room: jid of the room where the user is invited + @param options: attribute with extra info (reason, password) as in #XEP-0249 """ message = domish.Element((None, "message")) message["to"] = guest.full() @@ -128,7 +137,6 @@ log.warning("Ignoring invalid invite option: {}".format(key)) continue x_elt[key] = value - # there is not body in this message, so we can use directly send() client.send(message) def _accept(self, room_jid, profile_key=C.PROF_KEY_NONE): @@ -188,13 +196,20 @@ title = D_("MUC invitation") xml_tools.quick_note(self.host, client, msg, title, C.XMLUI_DATA_LVL_INFO) else: # leave the default value here + action_extra = { + "type": C.META_TYPE_CONFIRM, + "subtype": C.META_TYPE_MUC_INVIRATION, + "from_jid": from_jid_s, + "room_jid": room_jid_s + } confirm_msg = D_( "You have been invited by %(user)s to join the room %(room)s. " "Do you accept?" ) % {"user": from_jid_s, "room": room_jid_s} confirm_title = D_("MUC invitation") d = xml_tools.defer_confirm( - self.host, confirm_msg, confirm_title, profile=client.profile + self.host, confirm_msg, confirm_title, profile=client.profile, + action_extra=action_extra ) def accept_cb(accepted):
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/backend/plugins/plugin_xep_0272.py Wed May 15 17:34:46 2024 +0200 @@ -0,0 +1,333 @@ +#!/usr/bin/env python3 + +# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from twisted.internet import defer +from twisted.words.protocols.jabber import jid +from twisted.words.protocols.jabber.xmlstream import XMPPHandler +from twisted.words.xish import domish +from wokkel import disco, iwokkel +from wokkel import muc +from zope.interface import implementer + +from libervia.backend.core import exceptions +from libervia.backend.core.constants import Const as C +from libervia.backend.core.core_types import SatXMPPEntity +from libervia.backend.core.i18n import _ +from libervia.backend.core.log import getLogger +from libervia.backend.plugins import plugin_xep_0166 +from libervia.backend.plugins import plugin_xep_0167 +from libervia.backend.plugins.plugin_xep_0167 import mapping +from libervia.backend.tools.common import data_format + +from . import plugin_xep_0045, plugin_xep_0249 + +log = getLogger(__name__) + + +PLUGIN_INFO = { + C.PI_NAME: "Multiparty Jingle (Muji)", + C.PI_IMPORT_NAME: "XEP-0272", + C.PI_TYPE: "XEP", + C.PI_PROTOCOLS: ["XEP-0272"], + C.PI_DEPENDENCIES: ["XEP-0045", "XEP-0166", "XEP-0167", "XEP-0249"], + C.PI_MAIN: "XEP_0272", + C.PI_HANDLER: "yes", + C.PI_DESCRIPTION: _( + "Allow to run A/V conference with several participant using P2P connections. " + "The number of participant must not be to big to make it work correctly." + ), +} + +NS_MUJI = "http://telepathy.freedesktop.org/muji" +PRESENCE_MUJI = f'/presence/muji[@xmlns="{NS_MUJI}"]' + + +class XEP_0272: + + def __init__(self, host): + log.info(f'Plugin "{PLUGIN_INFO[C.PI_NAME]}" initialization') + self.host = host + host.register_namespace("muji", NS_MUJI) + self._muc: plugin_xep_0045.XEP_0045 = host.plugins["XEP-0045"] + self._muc_invite: plugin_xep_0249.XEP_0249 = host.plugins["XEP-0249"] + self._j: plugin_xep_0166.XEP_0166 = host.plugins["XEP-0166"] + self._rtp: plugin_xep_0167.XEP_0167 = host.plugins["XEP-0167"] + host.bridge.add_method( + "call_group_start", + ".plugin", + in_sign="asss", + out_sign="s", + method=self._call_group_start, + async_=True, + ) + host.bridge.add_method( + "call_group_data_set", + ".plugin", + in_sign="sss", + out_sign="", + method=self._call_group_data_set, + async_=True, + ) + host.bridge.add_signal("call_group_setup", ".plugin", signature="sss") + + def get_handler(self, client): + return XEP_0272_handler(self) + + def _call_group_start( + self, + entities_s: str, + extra_s: str, + profile_key: str, + ) -> defer.Deferred[str]: + client = self.host.get_client(profile_key) + d = defer.ensureDeferred( + self.call_group_start( + client, [jid.JID(e) for e in entities_s], data_format.deserialise(extra_s) + ) + ) + d.addCallback(data_format.serialise) + return d + + def _call_group_data_set( + self, + room_jid_s: str, + call_data_s: str, + profile_key: str, + ) -> None: + client = self.host.get_client(profile_key) + defer.ensureDeferred( + self.call_group_data_set( + client, jid.JID(room_jid_s), data_format.deserialise(call_data_s) + ) + ) + + async def on_room_join(self, room: muc.Room, user: muc.User) -> None: + pass + + async def on_room_left(self, room: muc.Room, user: muc.User) -> None: + pass + + def on_muji_request( + self, presence_elt: domish.Element, client: SatXMPPEntity + ) -> None: + from_jid = jid.JID(presence_elt["from"]) + room_jid = from_jid.userhostJID() + try: + room = self._muc.get_room(client, room_jid) + except exceptions.NotFound: + log.warning( + f"Ignoring MUJI element from an unknown room: {presence_elt.toXml()}" + ) + return + if from_jid == self._muc.get_room_user_jid(client, room_jid): + own_jid = True + else: + own_jid = False + muji_data = self.get_muji_data(room) + muji_elt = presence_elt.muji + assert muji_elt is not None + try: + next(muji_elt.elements(NS_MUJI, "preparing")) + except StopIteration: + preparing_state = False + else: + preparing_state = True + + if preparing_state: + if own_jid: + # we have received the broadcast of our own preparation message + muji_data["done_collecting"] = True + self.try_to_finish_preparation(client, room, muji_data) + elif not muji_data.get("done_collecting", False): + # other entities currently doing preparation + preparing_jids = muji_data["preparing_jids"] + preparing_jids.add(from_jid) + elif not own_jid: + done_preparing = muji_data.get("done_preparing", False) + # if we are still in preparation, we remove the JID from data data we are + # still waiting for, and we check if we can finish the preparation. + if not done_preparing: + allowed_payloads = muji_data.setdefault("allowed_payloads") + # TODO: check allowed_payloads + preparing_jids = muji_data["preparing_jids"] + preparing_jids.discard(from_jid) + log.debug( + f"[{client.profile}] received call data for {from_jid}.\n" + f"{preparing_jids=}" + ) + muji_data["to_call"].add(from_jid) + self.try_to_finish_preparation(client, room, muji_data) + + def try_to_finish_preparation( + self, client: SatXMPPEntity, room: muc.Room, muji_data: dict + ) -> None: + """Finish preparation if possible. + + This method checks if preparations of other JIDs needs to be waited, and if not, + finishes our own preparation. + """ + preparing_jids = muji_data.get("preparing_jids") + if not preparing_jids: + # No preparation left to wait, we can finish our own. + muji_data = self.get_muji_data(room) + muji_data["done_preparing"] = True + + log.debug(f"[{client.profile}] Done preparing.") + + # We ask frontend to initiate the session, so we know supported codecs. + self.host.bridge.call_group_setup( + room.roomJID.full(), + data_format.serialise({ + "to_call": [entity.full() for entity in muji_data["to_call"]] + }), + client.profile, + ) + + async def call_group_data_set( + self, + client: SatXMPPEntity, + room_jid: jid.JID, + call_data: dict, + ) -> None: + """Called when frontends has prepared group call. + + Group call data will be advertised on the MUC, and call will be initiated with all + participant which where in preparing state when we made our own preparation. + + @param client: SatXMPPEntity instance. + @param room_jid: JID of the room used for MUJI coordination. + @param call_data: call data similar to the one used in ``XEP-0167.call_start``. + """ + try: + room = self._muc.get_room(client, room_jid) + except exceptions.NotFound: + log.warning( + f"Ignoring MUJI element from an unknown room: {room_jid}" + ) + return + sdp_data = mapping.parse_sdp(call_data["sdp"], self._j.ROLE_INITIATOR) + presence_elt, muji_elt = self.generate_presence_and_muji(client, room) + for media_type, media_data in sdp_data.items(): + if media_type in ["audio", "video"]: + application_data = media_data["application_data"] + content_elt = muji_elt.addElement("content") + # XXX: the initiator will be actually the last to join, but this attribute + # will be ignored anyway. + content_elt["creator"] = self._j.ROLE_INITIATOR + content_elt["name"] = media_data["id"] + description_elt = mapping.build_description( + media_type, application_data, {} + ) + content_elt.addChild(description_elt) + + # we only want to keep payload types + to_remove = [] + for child_elt in description_elt.children: + if child_elt.name != "payload-type": + to_remove.append(child_elt) + for elt in to_remove: + description_elt.children.remove(elt) + + await client.a_send(presence_elt) + + def get_muji_data(self, room: muc.Room) -> dict: + """Get MUJI related data for this room + + MUJI data is stored in the room object, so it will be deleted when the room object + itself will be deleted. + """ + try: + return room._xep_0272_data + except AttributeError: + data = room._xep_0272_data = { + "preparing_jids": set(), + "to_call": set() + } + return data + + def generate_presence_and_muji( + self, client: SatXMPPEntity, room: muc.Room + ) -> tuple[domish.Element, domish.Element]: + """Generate a <presence> stanza with MUJI element""" + presence_elt = domish.Element((None, "presence")) + presence_elt["from"] = client.jid.full() + presence_elt["to"] = room.roomJID.full() + muji_elt = presence_elt.addElement((NS_MUJI, "muji")) + return presence_elt, muji_elt + + async def start_preparation(self, client: SatXMPPEntity, room: muc.Room) -> None: + """Start preparation of MUJI""" + presence_elt, muji_elt = self.generate_presence_and_muji(client, room) + muji_elt.addElement("preparing") + await client.a_send(presence_elt) + + async def call_group_start( + self, + client: SatXMPPEntity, + entities: list[jid.JID], + extra: dict, + ) -> dict: + """Initiate a group call with the given peers. + + A MUC room will be created, and people in ``list_entities`` will be invited. MUJI + session will then been started which each of them upon they arrival. + @param entities: JID of the peer to initiate a call session with. + @param extra: Extra data. + + @return: group call data, with the following keys: + + ``room_jid`` + MUC room where the MUJI coordination is done. It may also be used for + normal chatting. + + @raise exceptions.ExternalRequestError: The MUC room can't be created or joined. + """ + log.debug(f"{client.profile} is starting a MUJI group call with {entities}") + room_jid = self._muc.get_unique_name(client, prefix="_muji_") + room = await self._muc.join(client, room_jid) + log.info(f"[{client.profile}] MUJI room created at {room_jid}") + if not room: + raise exceptions.ExternalRequestError("Can't create or join group chat room.") + await self.start_preparation(client, room) + room.on_joined_callbacks.append(self.on_room_join) + room.on_left_callbacks.append(self.on_room_left) + for entity in entities: + self._muc_invite.invite( + client, + entity, + room_jid, + reason="You have been invited to participate in a group call.", + ) + return {"room_jid": room_jid.full()} + + +@implementer(iwokkel.IDisco) +class XEP_0272_handler(XMPPHandler): + + def __init__(self, plugin_parent): + self.plugin_parent = plugin_parent + + def connectionInitialized(self): + self.xmlstream.addObserver( + PRESENCE_MUJI, self.plugin_parent.on_muji_request, client=self.parent + ) + + def getDiscoInfo(self, requestor, target, nodeIdentifier=""): + return [disco.DiscoFeature(NS_MUJI)] + + def getDiscoItems(self, requestor, target, nodeIdentifier=""): + return []