comparison libervia/backend/plugins/plugin_xep_0272.py @ 4245:a7d4007a8fa5

plugin XEP-0272: implement XEP-0272: Multiparty Jingle (Muji) rel 429
author Goffi <goffi@goffi.org>
date Wed, 15 May 2024 17:34:46 +0200
parents
children 0d7bb4df2343
comparison
equal deleted inserted replaced
4244:05f01ac1d5b2 4245:a7d4007a8fa5
1 #!/usr/bin/env python3
2
3 # Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)
4
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU Affero General Public License for more details.
14
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 from twisted.internet import defer
19 from twisted.words.protocols.jabber import jid
20 from twisted.words.protocols.jabber.xmlstream import XMPPHandler
21 from twisted.words.xish import domish
22 from wokkel import disco, iwokkel
23 from wokkel import muc
24 from zope.interface import implementer
25
26 from libervia.backend.core import exceptions
27 from libervia.backend.core.constants import Const as C
28 from libervia.backend.core.core_types import SatXMPPEntity
29 from libervia.backend.core.i18n import _
30 from libervia.backend.core.log import getLogger
31 from libervia.backend.plugins import plugin_xep_0166
32 from libervia.backend.plugins import plugin_xep_0167
33 from libervia.backend.plugins.plugin_xep_0167 import mapping
34 from libervia.backend.tools.common import data_format
35
36 from . import plugin_xep_0045, plugin_xep_0249
37
38 log = getLogger(__name__)
39
40
41 PLUGIN_INFO = {
42 C.PI_NAME: "Multiparty Jingle (Muji)",
43 C.PI_IMPORT_NAME: "XEP-0272",
44 C.PI_TYPE: "XEP",
45 C.PI_PROTOCOLS: ["XEP-0272"],
46 C.PI_DEPENDENCIES: ["XEP-0045", "XEP-0166", "XEP-0167", "XEP-0249"],
47 C.PI_MAIN: "XEP_0272",
48 C.PI_HANDLER: "yes",
49 C.PI_DESCRIPTION: _(
50 "Allow to run A/V conference with several participant using P2P connections. "
51 "The number of participant must not be to big to make it work correctly."
52 ),
53 }
54
55 NS_MUJI = "http://telepathy.freedesktop.org/muji"
56 PRESENCE_MUJI = f'/presence/muji[@xmlns="{NS_MUJI}"]'
57
58
59 class XEP_0272:
60
61 def __init__(self, host):
62 log.info(f'Plugin "{PLUGIN_INFO[C.PI_NAME]}" initialization')
63 self.host = host
64 host.register_namespace("muji", NS_MUJI)
65 self._muc: plugin_xep_0045.XEP_0045 = host.plugins["XEP-0045"]
66 self._muc_invite: plugin_xep_0249.XEP_0249 = host.plugins["XEP-0249"]
67 self._j: plugin_xep_0166.XEP_0166 = host.plugins["XEP-0166"]
68 self._rtp: plugin_xep_0167.XEP_0167 = host.plugins["XEP-0167"]
69 host.bridge.add_method(
70 "call_group_start",
71 ".plugin",
72 in_sign="asss",
73 out_sign="s",
74 method=self._call_group_start,
75 async_=True,
76 )
77 host.bridge.add_method(
78 "call_group_data_set",
79 ".plugin",
80 in_sign="sss",
81 out_sign="",
82 method=self._call_group_data_set,
83 async_=True,
84 )
85 host.bridge.add_signal("call_group_setup", ".plugin", signature="sss")
86
87 def get_handler(self, client):
88 return XEP_0272_handler(self)
89
90 def _call_group_start(
91 self,
92 entities_s: str,
93 extra_s: str,
94 profile_key: str,
95 ) -> defer.Deferred[str]:
96 client = self.host.get_client(profile_key)
97 d = defer.ensureDeferred(
98 self.call_group_start(
99 client, [jid.JID(e) for e in entities_s], data_format.deserialise(extra_s)
100 )
101 )
102 d.addCallback(data_format.serialise)
103 return d
104
105 def _call_group_data_set(
106 self,
107 room_jid_s: str,
108 call_data_s: str,
109 profile_key: str,
110 ) -> None:
111 client = self.host.get_client(profile_key)
112 defer.ensureDeferred(
113 self.call_group_data_set(
114 client, jid.JID(room_jid_s), data_format.deserialise(call_data_s)
115 )
116 )
117
118 async def on_room_join(self, room: muc.Room, user: muc.User) -> None:
119 pass
120
121 async def on_room_left(self, room: muc.Room, user: muc.User) -> None:
122 pass
123
124 def on_muji_request(
125 self, presence_elt: domish.Element, client: SatXMPPEntity
126 ) -> None:
127 from_jid = jid.JID(presence_elt["from"])
128 room_jid = from_jid.userhostJID()
129 try:
130 room = self._muc.get_room(client, room_jid)
131 except exceptions.NotFound:
132 log.warning(
133 f"Ignoring MUJI element from an unknown room: {presence_elt.toXml()}"
134 )
135 return
136 if from_jid == self._muc.get_room_user_jid(client, room_jid):
137 own_jid = True
138 else:
139 own_jid = False
140 muji_data = self.get_muji_data(room)
141 muji_elt = presence_elt.muji
142 assert muji_elt is not None
143 try:
144 next(muji_elt.elements(NS_MUJI, "preparing"))
145 except StopIteration:
146 preparing_state = False
147 else:
148 preparing_state = True
149
150 if preparing_state:
151 if own_jid:
152 # we have received the broadcast of our own preparation message
153 muji_data["done_collecting"] = True
154 self.try_to_finish_preparation(client, room, muji_data)
155 elif not muji_data.get("done_collecting", False):
156 # other entities currently doing preparation
157 preparing_jids = muji_data["preparing_jids"]
158 preparing_jids.add(from_jid)
159 elif not own_jid:
160 done_preparing = muji_data.get("done_preparing", False)
161 # if we are still in preparation, we remove the JID from data data we are
162 # still waiting for, and we check if we can finish the preparation.
163 if not done_preparing:
164 allowed_payloads = muji_data.setdefault("allowed_payloads")
165 # TODO: check allowed_payloads
166 preparing_jids = muji_data["preparing_jids"]
167 preparing_jids.discard(from_jid)
168 log.debug(
169 f"[{client.profile}] received call data for {from_jid}.\n"
170 f"{preparing_jids=}"
171 )
172 muji_data["to_call"].add(from_jid)
173 self.try_to_finish_preparation(client, room, muji_data)
174
175 def try_to_finish_preparation(
176 self, client: SatXMPPEntity, room: muc.Room, muji_data: dict
177 ) -> None:
178 """Finish preparation if possible.
179
180 This method checks if preparations of other JIDs needs to be waited, and if not,
181 finishes our own preparation.
182 """
183 preparing_jids = muji_data.get("preparing_jids")
184 if not preparing_jids:
185 # No preparation left to wait, we can finish our own.
186 muji_data = self.get_muji_data(room)
187 muji_data["done_preparing"] = True
188
189 log.debug(f"[{client.profile}] Done preparing.")
190
191 # We ask frontend to initiate the session, so we know supported codecs.
192 self.host.bridge.call_group_setup(
193 room.roomJID.full(),
194 data_format.serialise({
195 "to_call": [entity.full() for entity in muji_data["to_call"]]
196 }),
197 client.profile,
198 )
199
200 async def call_group_data_set(
201 self,
202 client: SatXMPPEntity,
203 room_jid: jid.JID,
204 call_data: dict,
205 ) -> None:
206 """Called when frontends has prepared group call.
207
208 Group call data will be advertised on the MUC, and call will be initiated with all
209 participant which where in preparing state when we made our own preparation.
210
211 @param client: SatXMPPEntity instance.
212 @param room_jid: JID of the room used for MUJI coordination.
213 @param call_data: call data similar to the one used in ``XEP-0167.call_start``.
214 """
215 try:
216 room = self._muc.get_room(client, room_jid)
217 except exceptions.NotFound:
218 log.warning(
219 f"Ignoring MUJI element from an unknown room: {room_jid}"
220 )
221 return
222 sdp_data = mapping.parse_sdp(call_data["sdp"], self._j.ROLE_INITIATOR)
223 presence_elt, muji_elt = self.generate_presence_and_muji(client, room)
224 for media_type, media_data in sdp_data.items():
225 if media_type in ["audio", "video"]:
226 application_data = media_data["application_data"]
227 content_elt = muji_elt.addElement("content")
228 # XXX: the initiator will be actually the last to join, but this attribute
229 # will be ignored anyway.
230 content_elt["creator"] = self._j.ROLE_INITIATOR
231 content_elt["name"] = media_data["id"]
232 description_elt = mapping.build_description(
233 media_type, application_data, {}
234 )
235 content_elt.addChild(description_elt)
236
237 # we only want to keep payload types
238 to_remove = []
239 for child_elt in description_elt.children:
240 if child_elt.name != "payload-type":
241 to_remove.append(child_elt)
242 for elt in to_remove:
243 description_elt.children.remove(elt)
244
245 await client.a_send(presence_elt)
246
247 def get_muji_data(self, room: muc.Room) -> dict:
248 """Get MUJI related data for this room
249
250 MUJI data is stored in the room object, so it will be deleted when the room object
251 itself will be deleted.
252 """
253 try:
254 return room._xep_0272_data
255 except AttributeError:
256 data = room._xep_0272_data = {
257 "preparing_jids": set(),
258 "to_call": set()
259 }
260 return data
261
262 def generate_presence_and_muji(
263 self, client: SatXMPPEntity, room: muc.Room
264 ) -> tuple[domish.Element, domish.Element]:
265 """Generate a <presence> stanza with MUJI element"""
266 presence_elt = domish.Element((None, "presence"))
267 presence_elt["from"] = client.jid.full()
268 presence_elt["to"] = room.roomJID.full()
269 muji_elt = presence_elt.addElement((NS_MUJI, "muji"))
270 return presence_elt, muji_elt
271
272 async def start_preparation(self, client: SatXMPPEntity, room: muc.Room) -> None:
273 """Start preparation of MUJI"""
274 presence_elt, muji_elt = self.generate_presence_and_muji(client, room)
275 muji_elt.addElement("preparing")
276 await client.a_send(presence_elt)
277
278 async def call_group_start(
279 self,
280 client: SatXMPPEntity,
281 entities: list[jid.JID],
282 extra: dict,
283 ) -> dict:
284 """Initiate a group call with the given peers.
285
286 A MUC room will be created, and people in ``list_entities`` will be invited. MUJI
287 session will then been started which each of them upon they arrival.
288 @param entities: JID of the peer to initiate a call session with.
289 @param extra: Extra data.
290
291 @return: group call data, with the following keys:
292
293 ``room_jid``
294 MUC room where the MUJI coordination is done. It may also be used for
295 normal chatting.
296
297 @raise exceptions.ExternalRequestError: The MUC room can't be created or joined.
298 """
299 log.debug(f"{client.profile} is starting a MUJI group call with {entities}")
300 room_jid = self._muc.get_unique_name(client, prefix="_muji_")
301 room = await self._muc.join(client, room_jid)
302 log.info(f"[{client.profile}] MUJI room created at {room_jid}")
303 if not room:
304 raise exceptions.ExternalRequestError("Can't create or join group chat room.")
305 await self.start_preparation(client, room)
306 room.on_joined_callbacks.append(self.on_room_join)
307 room.on_left_callbacks.append(self.on_room_left)
308 for entity in entities:
309 self._muc_invite.invite(
310 client,
311 entity,
312 room_jid,
313 reason="You have been invited to participate in a group call.",
314 )
315 return {"room_jid": room_jid.full()}
316
317
318 @implementer(iwokkel.IDisco)
319 class XEP_0272_handler(XMPPHandler):
320
321 def __init__(self, plugin_parent):
322 self.plugin_parent = plugin_parent
323
324 def connectionInitialized(self):
325 self.xmlstream.addObserver(
326 PRESENCE_MUJI, self.plugin_parent.on_muji_request, client=self.parent
327 )
328
329 def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
330 return [disco.DiscoFeature(NS_MUJI)]
331
332 def getDiscoItems(self, requestor, target, nodeIdentifier=""):
333 return []