comparison libervia/backend/plugins/plugin_comp_conferences/__init__.py @ 4286:96fdf4891747

component conferences: fix session management and init: - jingle session management is now correctly handled, with id of the Galène client used as prefix when necessary. - Galène is now run only when component session is started, avoiding systematic run on __init__. - Galène ICE logs are displayed with high verbosity. - Fix disco identity handling rel 447
author Goffi <goffi@goffi.org>
date Mon, 29 Jul 2024 03:29:14 +0200
parents 240d8b7ad906
children a0ed5c976bf8
comparison
equal deleted inserted replaced
4285:f1d0cde61af7 4286:96fdf4891747
15 15
16 # You should have received a copy of the GNU Affero General Public License 16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. 17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 18
19 import json 19 import json
20 import os
20 from pathlib import Path 21 from pathlib import Path
21 from urllib.parse import quote 22 from urllib.parse import quote
22 23
23 from autobahn.twisted.websocket import WebSocketClientFactory, WebSocketClientProtocol 24 from autobahn.twisted.websocket import WebSocketClientFactory, WebSocketClientProtocol
24 from shortuuid import uuid 25 from shortuuid import uuid
40 from libervia.backend.core.core_types import SatXMPPEntity 41 from libervia.backend.core.core_types import SatXMPPEntity
41 from libervia.backend.core.i18n import _ 42 from libervia.backend.core.i18n import _
42 from libervia.backend.core.log import getLogger 43 from libervia.backend.core.log import getLogger
43 from libervia.backend.plugins.plugin_xep_0106 import XEP_0106 44 from libervia.backend.plugins.plugin_xep_0106 import XEP_0106
44 from libervia.backend.plugins.plugin_xep_0166 import XEP_0166 45 from libervia.backend.plugins.plugin_xep_0166 import XEP_0166
45 from libervia.backend.plugins.plugin_xep_0167 import mapping 46 from libervia.backend.plugins.plugin_xep_0167 import XEP_0167, mapping, NS_AV_CONFERENCES
46 from libervia.backend.plugins.plugin_xep_0176 import XEP_0176 47 from libervia.backend.plugins.plugin_xep_0176 import XEP_0176
48 from libervia.backend.plugins.plugin_xep_0298 import XEP_0298
47 from libervia.backend.tools.common import async_process, regex 49 from libervia.backend.tools.common import async_process, regex
48 50
49 51
50 log = getLogger(__name__) 52 log = getLogger(__name__)
51 53
52 IMPORT_NAME = "conferences" 54 IMPORT_NAME = "conferences"
55 NAME = "Libervia A/V Conferences"
53 56
54 PLUGIN_INFO = { 57 PLUGIN_INFO = {
55 C.PI_NAME: "A/V Conferences Component", 58 C.PI_NAME: "A/V Conferences Component",
56 C.PI_IMPORT_NAME: IMPORT_NAME, 59 C.PI_IMPORT_NAME: IMPORT_NAME,
57 C.PI_MODES: [C.PLUG_MODE_COMPONENT], 60 C.PI_MODES: [C.PLUG_MODE_COMPONENT],
58 C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, 61 C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
59 C.PI_PROTOCOLS: [], 62 C.PI_PROTOCOLS: [],
60 C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0166", "XEP-0167", "XEP-0176"], 63 C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0166", "XEP-0167", "XEP-0176", "XEP-0298"],
61 C.PI_RECOMMENDATIONS: [], 64 C.PI_RECOMMENDATIONS: [],
62 C.PI_MAIN: "ConferencesComponent", 65 C.PI_MAIN: "ConferencesComponent",
63 C.PI_HANDLER: C.BOOL_FALSE, 66 C.PI_HANDLER: C.BOOL_TRUE,
64 C.PI_DESCRIPTION: _( 67 C.PI_DESCRIPTION: _(
65 "Handle multiparty audio/video conferences, using a Selective Forwarding Unit.\n" 68 "Handle multiparty audio/video conferences, using a Selective Forwarding Unit.\n"
66 "The Galène project (https://galene.org) is currently the only supported backend." 69 "The Galène project (https://galene.org) is currently the only supported backend."
67 ), 70 ),
68 } 71 }
77 parent: "ConferencesComponent", 80 parent: "ConferencesComponent",
78 group_name: str, 81 group_name: str,
79 data_file: Path, 82 data_file: Path,
80 endpoint: str, 83 endpoint: str,
81 status_url: str, 84 status_url: str,
85 session: dict,
82 ) -> None: 86 ) -> None:
83 self.parent = parent 87 self.parent = parent
84 self.group_name = group_name 88 self.group_name = group_name
85 self.data_file = data_file 89 self.data_file = data_file
86 self.endpoint = endpoint 90 self.endpoint = endpoint
87 self.status_url = status_url 91 self.status_url = status_url
88 self._protocol: "GaleneProtocol|None" = None 92 self._protocol: "GaleneProtocol|None" = None
89 self.connected = defer.Deferred() 93 self.connected = defer.Deferred()
90 self.ready = defer.Deferred() 94 self.ready = defer.Deferred()
95 self.session = session
91 96
92 def __str__(self): 97 def __str__(self):
93 return f"conference {self.group_name!r}" 98 return f"conference {self.group_name!r}"
94 99
95 @property 100 @property
101 def protocol(self, protocol: "GaleneProtocol") -> None: 106 def protocol(self, protocol: "GaleneProtocol") -> None:
102 self._protocol = protocol 107 self._protocol = protocol
103 self.connected.callback(None) 108 self.connected.callback(None)
104 109
105 @property 110 @property
111 def client_id(self) -> str:
112 return self.protocol.client_id
113
114 @property
106 def _j(self) -> XEP_0166: 115 def _j(self) -> XEP_0166:
107 return self.parent._j 116 return self.parent._j
117
118 @property
119 def _rtp(self) -> XEP_0167:
120 return self.parent._rtp
108 121
109 @property 122 @property
110 def _ice_udp(self) -> XEP_0176: 123 def _ice_udp(self) -> XEP_0176:
111 return self.parent._ice_udp 124 return self.parent._ice_udp
112 125
113 @property 126 @property
114 def client(self) -> SatXMPPEntity: 127 def client(self) -> SatXMPPEntity:
115 client = self.parent.client 128 client = self.parent.client
116 assert client is not None 129 assert client is not None
117 return client 130 return client
131
132 @property
133 def galene_id(self) -> str:
134 """ID to use with Galène API."""
135 return f"{self.client_id}-{self.session['id']}"
136
async def on_call_setup(
    self,
    client: "SatXMPPEntity",
    session: dict,
    call_data: dict,
) -> None:
    """Relay the SDP produced by a call setup to Galène as an answer.

    This method is installed as ``call_setup_cb`` on Jingle sessions created
    by ``a_on_offer``, i.e. sessions where we are the initiator.

    @param client: entity the Jingle session belongs to (not used here).
    @param session: Jingle session, its ``id`` is given to ``send_answer``.
    @param call_data: call setup data, ``role`` and ``sdp`` keys are read.
    @raise exceptions.InternalError: ``role`` is not the initiator role.
    """
    if call_data["role"] != self._j.ROLE_INITIATOR:
        # The message must be an f-string, otherwise the placeholder is
        # reported literally instead of the expected role value.
        raise exceptions.InternalError(
            f'"role" should be "{self._j.ROLE_INITIATOR}" here.'
        )
    self.send_answer(session["id"], call_data["sdp"])
118 149
119 def join(self, user_jid: jid.JID) -> None: 150 def join(self, user_jid: jid.JID) -> None:
120 self.protocol.send_data( 151 self.protocol.send_data(
121 { 152 {
122 "type": "join", 153 "type": "join",
125 "username": user_jid.userhost(), 156 "username": user_jid.userhost(),
126 "password": "", 157 "password": "",
127 } 158 }
128 ) 159 )
129 160
130 def send_offer(self, session: dict, sdp: str) -> None: 161 def send_request(self, requested: dict | None = None) -> None:
162 if requested is None:
163 requested = {"": ["audio", "video"]}
164 self.protocol.send_data({"type": "request", "request": requested})
165
166 def send_offer(self, sdp: str) -> None:
131 self.protocol.send_data( 167 self.protocol.send_data(
132 { 168 {
133 "type": "offer", 169 "type": "offer",
134 "id": session["id"], 170 "id": self.galene_id,
135 "label": "camera", 171 "label": "camera",
136 "username": session["peer_jid"].userhost(), 172 "username": self.session["peer_jid"].userhost(),
137 "sdp": sdp, 173 "sdp": sdp,
138 } 174 }
139 ) 175 )
140 176
141 def add_candidate( 177 def send_answer(self, session_id: str, sdp: str) -> None:
142 self, session: dict, candidate: dict, sdp_mid: str, sdp_mline_index: int 178 data_id = session_id[len(self.client_id) + 1 :]
143 ) -> None: 179 self.protocol.send_data(
180 {
181 "type": "answer",
182 "id": data_id,
183 "label": "camera",
184 "username": self.session["peer_jid"].userhost(),
185 "sdp": sdp,
186 }
187 )
188
189 def add_candidate(self, candidate: dict, sdp_mid: str, sdp_mline_index: int) -> None:
144 """Add an ICE candidate. 190 """Add an ICE candidate.
145 191
146 @param session: Jingle session.
147 @param candidate: ICE candidate, SDP format. 192 @param candidate: ICE candidate, SDP format.
148 """ 193 """
149 self.protocol.send_data( 194 self.protocol.send_data(
150 { 195 {
151 "type": "ice", 196 "type": "ice",
152 "id": session["id"], 197 "id": self.galene_id,
153 "candidate": { 198 "candidate": {
154 "candidate": mapping.generate_candidate_line(candidate), 199 "candidate": mapping.generate_candidate_line(candidate),
155 "sdpMid": sdp_mid, 200 "sdpMid": sdp_mid,
156 "sdpMLineIndex": sdp_mline_index, 201 "sdpMLineIndex": sdp_mline_index,
157 }, 202 },
167 log.warning(f"{user_jid} can't join {self}: {data}") 212 log.warning(f"{user_jid} can't join {self}: {data}")
168 case "change": 213 case "change":
169 log.debug(f"Change for {user_jid} in {self}.") 214 log.debug(f"Change for {user_jid} in {self}.")
170 case "leave": 215 case "leave":
171 log.info(f"{user_jid} has left {self}.") 216 log.info(f"{user_jid} has left {self}.")
217 self.send_request()
172 218
173 def on_answer(self, data: dict) -> None: 219 def on_answer(self, data: dict) -> None:
174 """Called when SDP answer has been received 220 """Called when SDP answer has been received
175 221
176 Send the SDP to ``answer_sdp_d`` to continue workflow. 222 Send the SDP to ``answer_sdp_d`` to continue workflow.
177 """ 223 """
178 session = self._j.get_session(self.client, data["id"]) 224 try:
179 try: 225 answer_sdp_d = self.session.pop("answer_sdp_d")
180 answer_sdp_d = session.pop("answer_sdp_d")
181 except KeyError: 226 except KeyError:
182 raise exceptions.InternalError( 227 raise exceptions.InternalError(
183 '"answer_sdp_d" should be available in session.' 228 '"answer_sdp_d" should be available in session.'
184 ) 229 )
185 else: 230 else:
186 answer_sdp_d.callback(data["sdp"]) 231 answer_sdp_d.callback(data["sdp"])
187 232
233 def on_offer(self, data: dict) -> None:
234 """Called when a new SDP offer has been received"""
235 defer.ensureDeferred(self.a_on_offer(data))
236
237 async def a_on_offer(self, data: dict) -> None:
238 client: SatXMPPEntity = self.client.get_virtual_client(self.session["local_jid"])
239 sid = await self._rtp.call_start(
240 client,
241 self.session["peer_jid"],
242 {"sdp": data["sdp"]},
243 session_id=f"{self.client_id}-{data['id']}",
244 )
245 session = self._j.get_session(client, sid)
246 session["call_setup_cb"] = self.on_call_setup
247
188 def on_ice(self, data: dict) -> None: 248 def on_ice(self, data: dict) -> None:
189 log.debug(f"ICE candidate: {data}") 249 if data["id"] == self.galene_id:
190 session = self._j.get_session(self.client, data["id"]) 250 session = self.session
251 else:
252 session = self._j.get_session(self.client, f"{self.client_id}-{data['id']}")
253 log.debug(
254 f"ICE candidate for session {session['id']}, peer_jid={session['peer_jid']}."
255 )
191 candidate_data = data["candidate"] 256 candidate_data = data["candidate"]
192 contents = session["contents"] 257 contents = session["contents"]
193 try: 258 try:
194 content_id = list(contents)[candidate_data["sdpMLineIndex"]] 259 content_id = list(contents)[candidate_data["sdpMLineIndex"]]
195 except IndexError: 260 except IndexError:
197 f"Can't find any content at index {candidate_data['sdpMLineIndex']}." 262 f"Can't find any content at index {candidate_data['sdpMLineIndex']}."
198 ) 263 )
199 return 264 return
200 content = contents[content_id] 265 content = contents[content_id]
201 local_ice_data = content["transport_data"]["local_ice_data"] 266 local_ice_data = content["transport_data"]["local_ice_data"]
267
202 media = content["application_data"]["media"] 268 media = content["application_data"]["media"]
203 client: SatXMPPEntity = self.client.get_virtual_client(session["local_jid"]) 269 client: SatXMPPEntity = self.client.get_virtual_client(session["local_jid"])
204 candidate = mapping.parse_candidate(candidate_data["candidate"][10:].split()) 270 candidate = mapping.parse_candidate(candidate_data["candidate"][10:].split())
205 defer.ensureDeferred( 271 defer.ensureDeferred(
206 self._ice_udp.ice_candidates_add( 272 self._ice_udp.ice_candidates_add(
223 self.conference = conference 289 self.conference = conference
224 super().__init__(conference.endpoint) 290 super().__init__(conference.endpoint)
225 291
226 292
227 class GaleneProtocol(WebSocketClientProtocol): 293 class GaleneProtocol(WebSocketClientProtocol):
228 verbose = True 294
295 def __init__(self, *args, **kwargs):
296 super().__init__(*args, **kwargs)
297 self.client_id = str(uuid())
229 298
230 @property 299 @property
231 def conference(self) -> Conference: 300 def conference(self) -> Conference:
232 conference: Conference = self.factory.conference 301 conference: Conference = self.factory.conference
233 assert conference is not None 302 assert conference is not None
239 308
240 def connectionLost(self, reason: failure.Failure = ConnectionDone) -> None: 309 def connectionLost(self, reason: failure.Failure = ConnectionDone) -> None:
241 super().connectionLost(reason) 310 super().connectionLost(reason)
242 311
243 def onOpen(self): 312 def onOpen(self):
244 handshake_data = {"type": "handshake", "version": ["2"], "id": str(uuid())} 313 handshake_data = {"type": "handshake", "version": ["2"], "id": self.client_id}
245 self.send_data(handshake_data) 314 self.send_data(handshake_data)
246 315
247 def send_data(self, data: dict) -> None: 316 def send_data(self, data: dict) -> None:
248 if self.verbose: 317 if ConferencesComponent.verbose:
249 log.debug(f"DATA SENT: {data}") 318 log.debug(f"=> DATA SENT [{self.client_id}]: {data}")
250 self.sendMessage(json.dumps(data).encode()) 319 self.sendMessage(json.dumps(data).encode())
251 320
252 def onMessage(self, payload, isBinary): 321 def onMessage(self, payload, isBinary):
253 if isBinary: 322 if isBinary:
254 raise exceptions.DataError("Unexpected binary payload: {payload!r}") 323 raise exceptions.DataError("Unexpected binary payload: {payload!r}")
256 data = json.loads(payload) 325 data = json.loads(payload)
257 except json.JSONDecodeError: 326 except json.JSONDecodeError:
258 log.warning(f"Can't decode data: {payload!r}") 327 log.warning(f"Can't decode data: {payload!r}")
259 return 328 return
260 329
330 if ConferencesComponent.verbose:
331 log.debug(f"<= DATA RECEIVED [{self.client_id}]: {data}")
261 try: 332 try:
262 match data.get("type"): 333 match data.get("type"):
263 case None: 334 case None:
264 log.warning(f'"type" is missing in data: {data!r}') 335 log.warning(f'"type" is missing in data: {data!r}')
265 case "handshake": 336 case "handshake":
266 version = data["version"][0] 337 version = data["version"][0]
267 338
268 log.debug( 339 log.debug(
269 f"Handshake for group {self.conference.group_name!r}. Galène protocol " 340 f"Handshake for group {self.conference.group_name!r}. Galène "
270 f" v{version}." 341 f"protocol v{version}."
271 ) 342 )
272 self.conference.ready.callback(None) 343 self.conference.ready.callback(None)
273 case "ping": 344 case "ping":
274 log.debug("pong") 345 log.debug("pong")
275 self.send_data({"type": "pong"}) 346 self.send_data({"type": "pong"})
276 case "joined" | "answer" | "ice" as data_type: 347 case "joined" | "answer" | "ice" | "offer" as data_type:
277 method = getattr(self.conference, f"on_{data_type}") 348 method = getattr(self.conference, f"on_{data_type}")
278 method(data) 349 method(data)
279 case _: 350 case _:
280 log.debug(f"Unhandled message: {data}") 351 log.debug(f"Unhandled message: {data}")
281 except (KeyError, IndexError): 352 except (KeyError, IndexError):
282 log.exception(f"Unexpected data format: {data!r}") 353 log.exception(f"Unexpected data format: {data!r}")
283 354
284 355
285 class ConferencesComponent: 356 class ConferencesComponent:
286 IMPORT_NAME = IMPORT_NAME 357 IMPORT_NAME = IMPORT_NAME
358 verbose = 0
287 359
288 def __init__(self, host): 360 def __init__(self, host):
289 self.host = host 361 self.host = host
290 self.client: SatXMPPEntity | None = None 362 self.client: SatXMPPEntity | None = None
291 self._e: XEP_0106 = host.plugins["XEP-0106"] 363 self._e: XEP_0106 = host.plugins["XEP-0106"]
292 self._j: XEP_0166 = host.plugins["XEP-0166"] 364 self._j: XEP_0166 = host.plugins["XEP-0166"]
365 self._rtp: XEP_0167 = host.plugins["XEP-0167"]
293 self._ice_udp: XEP_0176 = host.plugins["XEP-0176"] 366 self._ice_udp: XEP_0176 = host.plugins["XEP-0176"]
367 self._coin: XEP_0298 = host.plugins["XEP-0298"]
294 host.trigger.add("XEP-0167_jingle_handler", self._jingle_handler_trigger) 368 host.trigger.add("XEP-0167_jingle_handler", self._jingle_handler_trigger)
369 self.initalized = False
370
371 def _init(self) -> None:
372 """Initialisation done after profile is connected"""
373 assert self.client is not None
374 self.client.identities.append(
375 disco.DiscoIdentity("conference", "audio-video", NAME)
376 )
377
295 try: 378 try:
296 galene_path = Path( 379 galene_path = Path(
297 self.host.memory.config_get(CONF_SECTION, "galene_path") 380 self.host.memory.config_get(CONF_SECTION, "galene_path")
298 or which("galene")[0] 381 or which("galene")[0]
299 ) 382 )
304 'or use "galene_path" setting.' 387 'or use "galene_path" setting.'
305 ) 388 )
306 self.galene_http_port = self.host.memory.config_get( 389 self.galene_http_port = self.host.memory.config_get(
307 CONF_SECTION, "http_port", "9443" 390 CONF_SECTION, "http_port", "9443"
308 ) 391 )
309 galene_data_path = host.memory.get_cache_path(IMPORT_NAME, "galene") 392 galene_data_path = self.host.memory.get_cache_path(IMPORT_NAME, "galene")
310 galene_static_path = galene_path.parent / "static" 393 galene_static_path = galene_path.parent / "static"
311 self.galene_group_path = galene_data_path / "groups" 394 self.galene_group_path = galene_data_path / "groups"
312 self.galene_group_path.mkdir(0o700, parents=True, exist_ok=True) 395 self.galene_group_path.mkdir(0o700, parents=True, exist_ok=True)
396 env = os.environ
397 if self.verbose >= 2:
398 # We activate Galène debug logs on ICE candidates.
399 env["PION_LOG_TRACE"] = "ice"
400
313 try: 401 try:
314 d = self._process = async_process.run( 402 d = self._process = async_process.run(
315 str(galene_path), 403 str(galene_path),
316 "-static", 404 "-static",
317 str(galene_static_path), 405 str(galene_static_path),
318 "-http", 406 "-http",
319 f"127.0.0.1:{self.galene_http_port}", 407 f"127.0.0.1:{self.galene_http_port}",
320 # We don't want HTTPS here, it's only used for local interactions 408 # We don't want HTTPS here, it's only used for local interactions
321 "-insecure", 409 "-insecure",
322 path=str(galene_data_path), 410 path=str(galene_data_path),
323 verbose=True, 411 verbose=bool(self.verbose),
412 env=env,
324 ) 413 )
325 except Exception: 414 except Exception:
326 log.exception("Can't start Galene.") 415 log.exception("Can't start Galene.")
327 else: 416 else:
328 d.addErrback(self._galene_process_errback) 417 d.addErrback(self._galene_process_errback)
329 log.info(f"Galene instance started on port {self.galene_http_port}.") 418 log.info(f"Galene instance started on port {self.galene_http_port}.")
330 419
331 def get_handler(self, __): 420 def get_handler(self, __):
332 return ConferencesHandler() 421 return ConferencesHandler()
333 422
334 def profile_connecting(self, client): 423 def profile_connecting(self, client: SatXMPPEntity) -> None:
335 self.client = client 424 self.client = client
425 if not self.initalized:
426 self._init()
427 self.initalized = True
336 428
337 async def attach_to_group(self, session: dict, group_name: str) -> Conference: 429 async def attach_to_group(self, session: dict, group_name: str) -> Conference:
338 """Attach to a Galène group. 430 """Attach to a Galène group.
339 431
340 Create a group data file if it doesn't exist. 432 Create a group data file if it doesn't exist.
370 parent=self, 462 parent=self,
371 group_name=group_name, 463 group_name=group_name,
372 data_file=data_file, 464 data_file=data_file,
373 endpoint=endpoint, 465 endpoint=endpoint,
374 status_url=status_url, 466 status_url=status_url,
467 session=session,
375 ) 468 )
376 469
377 factory = GaleneClientFactory(conference) 470 factory = GaleneClientFactory(conference)
378 # factory.setProtocolOptions(logOctets=True) 471 # factory.setProtocolOptions(logOctets=True)
379 factory.protocol = GaleneProtocol 472 factory.protocol = GaleneProtocol
416 try: 509 try:
417 conference = session["conference"] 510 conference = session["conference"]
418 except KeyError: 511 except KeyError:
419 raise exceptions.InternalError("Conference data is missing.") 512 raise exceptions.InternalError("Conference data is missing.")
420 513
421 await conference.ready 514 if call_data["role"] == self._j.ROLE_RESPONDER:
422 conference.join(session["peer_jid"]) 515 await conference.ready
423 conference.send_offer(session, call_data["sdp"]) 516 conference.join(session["peer_jid"])
517 conference.send_offer(call_data["sdp"])
518 else:
519 raise exceptions.InternalError(
520 '"role" should be "{self._j.ROLE_RESPONDER}" here.'
521 )
424 522
425 def on_ice_candidates_new( 523 def on_ice_candidates_new(
426 self, 524 self,
427 client: SatXMPPEntity, 525 client: SatXMPPEntity,
428 session: dict, 526 session: dict,
441 continue 539 continue
442 sdp_mline_index: int = idx 540 sdp_mline_index: int = idx
443 sdp_mid: str = content_id 541 sdp_mid: str = content_id
444 542
445 for candidate in media_data["candidates"]: 543 for candidate in media_data["candidates"]:
446 conference.add_candidate(session, candidate, sdp_mid, sdp_mline_index) 544 conference.add_candidate(candidate, sdp_mid, sdp_mline_index)
447 545
448 def _galene_process_errback(self, failure_: Failure) -> None: 546 def _galene_process_errback(self, failure_: Failure) -> None:
449 log.error(f"Can't run Galene process: {failure_.value}") 547 log.error(f"Can't run Galene process: {failure_.value}")
450 548
451 549
550
452 @implementer(iwokkel.IDisco) 551 @implementer(iwokkel.IDisco)
453 class ConferencesHandler(XMPPHandler): 552 class ConferencesHandler(XMPPHandler):
454 553
455 def getDiscoInfo(self, requestor, target, nodeIdentifier=""): 554 def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
456 return [disco.DiscoIdentity("conference", "audio-video")] 555 return [disco.DiscoFeature(NS_AV_CONFERENCES)]