# HG changeset patch # User Goffi # Date 1722216554 -7200 # Node ID 96fdf4891747518c2d02cb48af38bf837e63172d # Parent f1d0cde61af7c988bf5a8686e6f5190e2d178100 component conferences: fix session management and init: - jingle session management is now correctly handled, with id of the Galène client used as prefix when necessary. - Galène is now run only when component session is started, avoiding systematic run on __init__. - Galène ICE logs are displayed with high verbosity. - Fix disco identity handling rel 447 diff -r f1d0cde61af7 -r 96fdf4891747 libervia/backend/core/core_types.py --- a/libervia/backend/core/core_types.py Sun Jul 14 17:42:53 2024 +0200 +++ b/libervia/backend/core/core_types.py Mon Jul 29 03:29:14 2024 +0200 @@ -23,6 +23,7 @@ from twisted.words.protocols.jabber import jid as t_jid from twisted.words.protocols.jabber import xmlstream from twisted.words.xish import domish +from wokkel import disco class SatXMPPEntity: @@ -33,6 +34,7 @@ is_component: bool server_jid: t_jid.JID IQ: Callable[[Optional[str], Optional[int]], xmlstream.IQ] + identities: list[disco.DiscoIdentity] EncryptionPlugin = namedtuple( diff -r f1d0cde61af7 -r 96fdf4891747 libervia/backend/plugins/plugin_comp_conferences/__init__.py --- a/libervia/backend/plugins/plugin_comp_conferences/__init__.py Sun Jul 14 17:42:53 2024 +0200 +++ b/libervia/backend/plugins/plugin_comp_conferences/__init__.py Mon Jul 29 03:29:14 2024 +0200 @@ -17,6 +17,7 @@ # along with this program. If not, see . import json +import os from pathlib import Path from urllib.parse import quote @@ -42,14 +43,16 @@ from libervia.backend.core.log import getLogger from libervia.backend.plugins.plugin_xep_0106 import XEP_0106 from libervia.backend.plugins.plugin_xep_0166 import XEP_0166 -from libervia.backend.plugins.plugin_xep_0167 import mapping +from libervia.backend.plugins.plugin_xep_0167 import XEP_0167, mapping, NS_AV_CONFERENCES from libervia.backend.plugins.plugin_xep_0176 import XEP_0176 +from libervia.backend.plugins.plugin_xep_0298 import XEP_0298 from libervia.backend.tools.common import async_process, regex log = getLogger(__name__) IMPORT_NAME = "conferences" +NAME = "Libervia A/V Conferences" PLUGIN_INFO = { C.PI_NAME: "A/V Conferences Component", @@ -57,10 +60,10 @@ C.PI_MODES: [C.PLUG_MODE_COMPONENT], C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, C.PI_PROTOCOLS: [], - C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0166", "XEP-0167", "XEP-0176"], + C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0166", "XEP-0167", "XEP-0176", "XEP-0298"], C.PI_RECOMMENDATIONS: [], C.PI_MAIN: "ConferencesComponent", - C.PI_HANDLER: C.BOOL_FALSE, + C.PI_HANDLER: C.BOOL_TRUE, C.PI_DESCRIPTION: _( "Handle multiparty audio/video conferences, using a Selective Forwarding Unit.\n" "The Galène project (https://galene.org) is currently the only supported backend." @@ -79,6 +82,7 @@ data_file: Path, endpoint: str, status_url: str, + session: dict, ) -> None: self.parent = parent self.group_name = group_name @@ -88,6 +92,7 @@ self._protocol: "GaleneProtocol|None" = None self.connected = defer.Deferred() self.ready = defer.Deferred() + self.session = session def __str__(self): return f"conference {self.group_name!r}" @@ -103,10 +108,18 @@ self.connected.callback(None) @property + def client_id(self) -> str: + return self.protocol.client_id + + @property def _j(self) -> XEP_0166: return self.parent._j @property + def _rtp(self) -> XEP_0167: + return self.parent._rtp + + @property def _ice_udp(self) -> XEP_0176: return self.parent._ice_udp @@ -116,6 +129,24 @@ assert client is not None return client + @property + def galene_id(self) -> str: + """ID to use with Galène API.""" + return f"{self.client_id}-{self.session['id']}" + + async def on_call_setup( + self, + client: SatXMPPEntity, + session: dict, + call_data: dict, + ) -> None: + if call_data["role"] == self._j.ROLE_INITIATOR: + self.send_answer(session["id"], call_data["sdp"]) + else: + raise exceptions.InternalError( + '"role" should be "{self._j.ROLE_INITIATOR}" here.' + ) + def join(self, user_jid: jid.JID) -> None: self.protocol.send_data( { @@ -127,29 +158,43 @@ } ) - def send_offer(self, session: dict, sdp: str) -> None: + def send_request(self, requested: dict | None = None) -> None: + if requested is None: + requested = {"": ["audio", "video"]} + self.protocol.send_data({"type": "request", "request": requested}) + + def send_offer(self, sdp: str) -> None: self.protocol.send_data( { "type": "offer", - "id": session["id"], + "id": self.galene_id, "label": "camera", - "username": session["peer_jid"].userhost(), + "username": self.session["peer_jid"].userhost(), "sdp": sdp, } ) - def add_candidate( - self, session: dict, candidate: dict, sdp_mid: str, sdp_mline_index: int - ) -> None: + def send_answer(self, session_id: str, sdp: str) -> None: + data_id = session_id[len(self.client_id) + 1 :] + self.protocol.send_data( + { + "type": "answer", + "id": data_id, + "label": "camera", + "username": self.session["peer_jid"].userhost(), + "sdp": sdp, + } + ) + + def add_candidate(self, candidate: dict, sdp_mid: str, sdp_mline_index: int) -> None: """Add an ICE candidate. - @param session: Jingle session. @param candidate: ICE candidate, SDP format. """ self.protocol.send_data( { "type": "ice", - "id": session["id"], + "id": self.galene_id, "candidate": { "candidate": mapping.generate_candidate_line(candidate), "sdpMid": sdp_mid, @@ -169,15 +214,15 @@ log.debug(f"Change for {user_jid} in {self}.") case "leave": log.info(f"{user_jid} has left {self}.") + self.send_request() def on_answer(self, data: dict) -> None: """Called when SDP answer has been received Send the SDP to ``answer_sdp_d`` to continue workflow. """ - session = self._j.get_session(self.client, data["id"]) try: - answer_sdp_d = session.pop("answer_sdp_d") + answer_sdp_d = self.session.pop("answer_sdp_d") except KeyError: raise exceptions.InternalError( '"answer_sdp_d" should be available in session.' @@ -185,9 +230,29 @@ else: answer_sdp_d.callback(data["sdp"]) + def on_offer(self, data: dict) -> None: + """Called when a new SFP offer has been received""" + defer.ensureDeferred(self.a_on_offer(data)) + + async def a_on_offer(self, data: dict) -> None: + client: SatXMPPEntity = self.client.get_virtual_client(self.session["local_jid"]) + sid = await self._rtp.call_start( + client, + self.session["peer_jid"], + {"sdp": data["sdp"]}, + session_id=f"{self.client_id}-{data['id']}", + ) + session = self._j.get_session(client, sid) + session["call_setup_cb"] = self.on_call_setup + def on_ice(self, data: dict) -> None: - log.debug(f"ICE candidate: {data}") - session = self._j.get_session(self.client, data["id"]) + if data["id"] == self.galene_id: + session = self.session + else: + session = self._j.get_session(self.client, f"{self.client_id}-{data['id']}") + log.debug( + f"ICE candidate for session {session['id']}, peer_jid={session['peer_jid']}." + ) candidate_data = data["candidate"] contents = session["contents"] try: @@ -199,6 +264,7 @@ return content = contents[content_id] local_ice_data = content["transport_data"]["local_ice_data"] + media = content["application_data"]["media"] client: SatXMPPEntity = self.client.get_virtual_client(session["local_jid"]) candidate = mapping.parse_candidate(candidate_data["candidate"][10:].split()) @@ -225,7 +291,10 @@ class GaleneProtocol(WebSocketClientProtocol): - verbose = True + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.client_id = str(uuid()) @property def conference(self) -> Conference: @@ -241,12 +310,12 @@ super().connectionLost(reason) def onOpen(self): - handshake_data = {"type": "handshake", "version": ["2"], "id": str(uuid())} + handshake_data = {"type": "handshake", "version": ["2"], "id": self.client_id} self.send_data(handshake_data) def send_data(self, data: dict) -> None: - if self.verbose: - log.debug(f"DATA SENT: {data}") + if ConferencesComponent.verbose: + log.debug(f"=> DATA SENT [{self.client_id}]: {data}") self.sendMessage(json.dumps(data).encode()) def onMessage(self, payload, isBinary): @@ -258,6 +327,8 @@ log.warning(f"Can't decode data: {payload!r}") return + if ConferencesComponent.verbose: + log.debug(f"<= DATA RECEIVED [{self.client_id}]: {data}") try: match data.get("type"): case None: @@ -266,14 +337,14 @@ version = data["version"][0] log.debug( - f"Handshake for group {self.conference.group_name!r}. Galène protocol " - f" v{version}." + f"Handshake for group {self.conference.group_name!r}. Galène " + f"protocol v{version}." ) self.conference.ready.callback(None) case "ping": log.debug("pong") self.send_data({"type": "pong"}) - case "joined" | "answer" | "ice" as data_type: + case "joined" | "answer" | "ice" | "offer" as data_type: method = getattr(self.conference, f"on_{data_type}") method(data) case _: @@ -284,14 +355,26 @@ class ConferencesComponent: IMPORT_NAME = IMPORT_NAME + verbose = 0 def __init__(self, host): self.host = host self.client: SatXMPPEntity | None = None self._e: XEP_0106 = host.plugins["XEP-0106"] self._j: XEP_0166 = host.plugins["XEP-0166"] + self._rtp: XEP_0167 = host.plugins["XEP-0167"] self._ice_udp: XEP_0176 = host.plugins["XEP-0176"] + self._coin: XEP_0298 = host.plugins["XEP-0298"] host.trigger.add("XEP-0167_jingle_handler", self._jingle_handler_trigger) + self.initalized = False + + def _init(self) -> None: + """Initialisation done after profile is connected""" + assert self.client is not None + self.client.identities.append( + disco.DiscoIdentity("conference", "audio-video", NAME) + ) + try: galene_path = Path( self.host.memory.config_get(CONF_SECTION, "galene_path") @@ -306,10 +389,15 @@ self.galene_http_port = self.host.memory.config_get( CONF_SECTION, "http_port", "9443" ) - galene_data_path = host.memory.get_cache_path(IMPORT_NAME, "galene") + galene_data_path = self.host.memory.get_cache_path(IMPORT_NAME, "galene") galene_static_path = galene_path.parent / "static" self.galene_group_path = galene_data_path / "groups" self.galene_group_path.mkdir(0o700, parents=True, exist_ok=True) + env = os.environ + if self.verbose >= 2: + # We activate Galène debug logs on ICE candidates. + env["PION_LOG_TRACE"] = "ice" + try: d = self._process = async_process.run( str(galene_path), @@ -320,7 +408,8 @@ # We don't want HTTPS here, it's only used for local interactions "-insecure", path=str(galene_data_path), - verbose=True, + verbose=bool(self.verbose), + env=env, ) except Exception: log.exception("Can't start Galene.") @@ -331,8 +420,11 @@ def get_handler(self, __): return ConferencesHandler() - def profile_connecting(self, client): + def profile_connecting(self, client: SatXMPPEntity) -> None: self.client = client + if not self.initalized: + self._init() + self.initalized = True async def attach_to_group(self, session: dict, group_name: str) -> Conference: """Attach to a Galène group. @@ -372,6 +464,7 @@ data_file=data_file, endpoint=endpoint, status_url=status_url, + session=session, ) factory = GaleneClientFactory(conference) @@ -418,9 +511,14 @@ except KeyError: raise exceptions.InternalError("Conference data is missing.") - await conference.ready - conference.join(session["peer_jid"]) - conference.send_offer(session, call_data["sdp"]) + if call_data["role"] == self._j.ROLE_RESPONDER: + await conference.ready + conference.join(session["peer_jid"]) + conference.send_offer(call_data["sdp"]) + else: + raise exceptions.InternalError( + '"role" should be "{self._j.ROLE_RESPONDER}" here.' + ) def on_ice_candidates_new( self, @@ -443,14 +541,15 @@ sdp_mid: str = content_id for candidate in media_data["candidates"]: - conference.add_candidate(session, candidate, sdp_mid, sdp_mline_index) + conference.add_candidate(candidate, sdp_mid, sdp_mline_index) def _galene_process_errback(self, failure_: Failure) -> None: log.error(f"Can't run Galene process: {failure_.value}") + @implementer(iwokkel.IDisco) class ConferencesHandler(XMPPHandler): def getDiscoInfo(self, requestor, target, nodeIdentifier=""): - return [disco.DiscoIdentity("conference", "audio-video")] + return [disco.DiscoFeature(NS_AV_CONFERENCES)] diff -r f1d0cde61af7 -r 96fdf4891747 libervia/backend/plugins/plugin_xep_0167/__init__.py --- a/libervia/backend/plugins/plugin_xep_0167/__init__.py Sun Jul 14 17:42:53 2024 +0200 +++ b/libervia/backend/plugins/plugin_xep_0167/__init__.py Mon Jul 29 03:29:14 2024 +0200 @@ -41,6 +41,7 @@ NS_JINGLE_RTP_AUDIO, NS_JINGLE_RTP_INFO, NS_JINGLE_RTP_VIDEO, + NS_AV_CONFERENCES ) diff -r f1d0cde61af7 -r 96fdf4891747 libervia/backend/plugins/plugin_xep_0167/constants.py --- a/libervia/backend/plugins/plugin_xep_0167/constants.py Sun Jul 14 17:42:53 2024 +0200 +++ b/libervia/backend/plugins/plugin_xep_0167/constants.py Mon Jul 29 03:29:14 2024 +0200 @@ -25,3 +25,4 @@ NS_JINGLE_RTP_VIDEO: Final = f"{NS_JINGLE_RTP_BASE}:video" NS_JINGLE_RTP_ERRORS: Final = f"{NS_JINGLE_RTP_BASE}:errors:1" NS_JINGLE_RTP_INFO: Final = f"{NS_JINGLE_RTP_BASE}:info:1" +NS_AV_CONFERENCES: Final = "urn:xmpp:jingle:av-conferences:0"