changeset 4286:96fdf4891747

component conferences: fix session management and init: - jingle session management is now correctly handled, with id of the Galène client used as prefix when necessary. - Galène is now run only when component session is started, avoiding systematic run on __init__. - Galène ICE logs are displayed with high verbosity. - Fix disco identity handling rel 447
author Goffi <goffi@goffi.org>
date Mon, 29 Jul 2024 03:29:14 +0200
parents f1d0cde61af7
children ff88a807852d
files libervia/backend/core/core_types.py libervia/backend/plugins/plugin_comp_conferences/__init__.py libervia/backend/plugins/plugin_xep_0167/__init__.py libervia/backend/plugins/plugin_xep_0167/constants.py
diffstat 4 files changed, 133 insertions(+), 30 deletions(-) [+]
line wrap: on
line diff
--- a/libervia/backend/core/core_types.py	Sun Jul 14 17:42:53 2024 +0200
+++ b/libervia/backend/core/core_types.py	Mon Jul 29 03:29:14 2024 +0200
@@ -23,6 +23,7 @@
 from twisted.words.protocols.jabber import jid as t_jid
 from twisted.words.protocols.jabber import xmlstream
 from twisted.words.xish import domish
+from wokkel import disco
 
 
 class SatXMPPEntity:
@@ -33,6 +34,7 @@
     is_component: bool
     server_jid: t_jid.JID
     IQ: Callable[[Optional[str], Optional[int]], xmlstream.IQ]
+    identities: list[disco.DiscoIdentity]
 
 
 EncryptionPlugin = namedtuple(
--- a/libervia/backend/plugins/plugin_comp_conferences/__init__.py	Sun Jul 14 17:42:53 2024 +0200
+++ b/libervia/backend/plugins/plugin_comp_conferences/__init__.py	Mon Jul 29 03:29:14 2024 +0200
@@ -17,6 +17,7 @@
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
 import json
+import os
 from pathlib import Path
 from urllib.parse import quote
 
@@ -42,14 +43,16 @@
 from libervia.backend.core.log import getLogger
 from libervia.backend.plugins.plugin_xep_0106 import XEP_0106
 from libervia.backend.plugins.plugin_xep_0166 import XEP_0166
-from libervia.backend.plugins.plugin_xep_0167 import mapping
+from libervia.backend.plugins.plugin_xep_0167 import XEP_0167, mapping, NS_AV_CONFERENCES
 from libervia.backend.plugins.plugin_xep_0176 import XEP_0176
+from libervia.backend.plugins.plugin_xep_0298 import XEP_0298
 from libervia.backend.tools.common import async_process, regex
 
 
 log = getLogger(__name__)
 
 IMPORT_NAME = "conferences"
+NAME = "Libervia A/V Conferences"
 
 PLUGIN_INFO = {
     C.PI_NAME: "A/V Conferences Component",
@@ -57,10 +60,10 @@
     C.PI_MODES: [C.PLUG_MODE_COMPONENT],
     C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
     C.PI_PROTOCOLS: [],
-    C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0166", "XEP-0167", "XEP-0176"],
+    C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0166", "XEP-0167", "XEP-0176", "XEP-0298"],
     C.PI_RECOMMENDATIONS: [],
     C.PI_MAIN: "ConferencesComponent",
-    C.PI_HANDLER: C.BOOL_FALSE,
+    C.PI_HANDLER: C.BOOL_TRUE,
     C.PI_DESCRIPTION: _(
         "Handle multiparty audio/video conferences, using a Selective Forwarding Unit.\n"
         "The Galène project (https://galene.org) is currently the only supported backend."
@@ -79,6 +82,7 @@
         data_file: Path,
         endpoint: str,
         status_url: str,
+        session: dict,
     ) -> None:
         self.parent = parent
         self.group_name = group_name
@@ -88,6 +92,7 @@
         self._protocol: "GaleneProtocol|None" = None
         self.connected = defer.Deferred()
         self.ready = defer.Deferred()
+        self.session = session
 
     def __str__(self):
         return f"conference {self.group_name!r}"
@@ -103,10 +108,18 @@
         self.connected.callback(None)
 
     @property
+    def client_id(self) -> str:
+        return self.protocol.client_id
+
+    @property
     def _j(self) -> XEP_0166:
         return self.parent._j
 
     @property
+    def _rtp(self) -> XEP_0167:
+        return self.parent._rtp
+
+    @property
     def _ice_udp(self) -> XEP_0176:
         return self.parent._ice_udp
 
@@ -116,6 +129,24 @@
         assert client is not None
         return client
 
+    @property
+    def galene_id(self) -> str:
+        """ID to use with Galène API."""
+        return f"{self.client_id}-{self.session['id']}"
+
+    async def on_call_setup(
+        self,
+        client: SatXMPPEntity,
+        session: dict,
+        call_data: dict,
+    ) -> None:
+        if call_data["role"] == self._j.ROLE_INITIATOR:
+            self.send_answer(session["id"], call_data["sdp"])
+        else:
+            raise exceptions.InternalError(
+                '"role" should be "{self._j.ROLE_INITIATOR}" here.'
+            )
+
     def join(self, user_jid: jid.JID) -> None:
         self.protocol.send_data(
             {
@@ -127,29 +158,43 @@
             }
         )
 
-    def send_offer(self, session: dict, sdp: str) -> None:
+    def send_request(self, requested: dict | None = None) -> None:
+        if requested is None:
+            requested = {"": ["audio", "video"]}
+        self.protocol.send_data({"type": "request", "request": requested})
+
+    def send_offer(self, sdp: str) -> None:
         self.protocol.send_data(
             {
                 "type": "offer",
-                "id": session["id"],
+                "id": self.galene_id,
                 "label": "camera",
-                "username": session["peer_jid"].userhost(),
+                "username": self.session["peer_jid"].userhost(),
                 "sdp": sdp,
             }
         )
 
-    def add_candidate(
-        self, session: dict, candidate: dict, sdp_mid: str, sdp_mline_index: int
-    ) -> None:
+    def send_answer(self, session_id: str, sdp: str) -> None:
+        data_id = session_id[len(self.client_id) + 1 :]
+        self.protocol.send_data(
+            {
+                "type": "answer",
+                "id": data_id,
+                "label": "camera",
+                "username": self.session["peer_jid"].userhost(),
+                "sdp": sdp,
+            }
+        )
+
+    def add_candidate(self, candidate: dict, sdp_mid: str, sdp_mline_index: int) -> None:
         """Add an ICE candidate.
 
-        @param session: Jingle session.
         @param candidate: ICE candidate, SDP format.
         """
         self.protocol.send_data(
             {
                 "type": "ice",
-                "id": session["id"],
+                "id": self.galene_id,
                 "candidate": {
                     "candidate": mapping.generate_candidate_line(candidate),
                     "sdpMid": sdp_mid,
@@ -169,15 +214,15 @@
                 log.debug(f"Change for {user_jid} in {self}.")
             case "leave":
                 log.info(f"{user_jid} has left {self}.")
+        self.send_request()
 
     def on_answer(self, data: dict) -> None:
         """Called when SDP answer has been received
 
         Send the SDP to ``answer_sdp_d`` to continue workflow.
         """
-        session = self._j.get_session(self.client, data["id"])
         try:
-            answer_sdp_d = session.pop("answer_sdp_d")
+            answer_sdp_d = self.session.pop("answer_sdp_d")
         except KeyError:
             raise exceptions.InternalError(
                 '"answer_sdp_d" should be available in session.'
@@ -185,9 +230,29 @@
         else:
             answer_sdp_d.callback(data["sdp"])
 
+    def on_offer(self, data: dict) -> None:
+        """Called when a new SFP offer has been received"""
+        defer.ensureDeferred(self.a_on_offer(data))
+
+    async def a_on_offer(self, data: dict) -> None:
+        client: SatXMPPEntity = self.client.get_virtual_client(self.session["local_jid"])
+        sid = await self._rtp.call_start(
+            client,
+            self.session["peer_jid"],
+            {"sdp": data["sdp"]},
+            session_id=f"{self.client_id}-{data['id']}",
+        )
+        session = self._j.get_session(client, sid)
+        session["call_setup_cb"] = self.on_call_setup
+
     def on_ice(self, data: dict) -> None:
-        log.debug(f"ICE candidate: {data}")
-        session = self._j.get_session(self.client, data["id"])
+        if data["id"] == self.galene_id:
+            session = self.session
+        else:
+            session = self._j.get_session(self.client, f"{self.client_id}-{data['id']}")
+        log.debug(
+            f"ICE candidate for session {session['id']}, peer_jid={session['peer_jid']}."
+        )
         candidate_data = data["candidate"]
         contents = session["contents"]
         try:
@@ -199,6 +264,7 @@
             return
         content = contents[content_id]
         local_ice_data = content["transport_data"]["local_ice_data"]
+
         media = content["application_data"]["media"]
         client: SatXMPPEntity = self.client.get_virtual_client(session["local_jid"])
         candidate = mapping.parse_candidate(candidate_data["candidate"][10:].split())
@@ -225,7 +291,10 @@
 
 
 class GaleneProtocol(WebSocketClientProtocol):
-    verbose = True
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.client_id = str(uuid())
 
     @property
     def conference(self) -> Conference:
@@ -241,12 +310,12 @@
         super().connectionLost(reason)
 
     def onOpen(self):
-        handshake_data = {"type": "handshake", "version": ["2"], "id": str(uuid())}
+        handshake_data = {"type": "handshake", "version": ["2"], "id": self.client_id}
         self.send_data(handshake_data)
 
     def send_data(self, data: dict) -> None:
-        if self.verbose:
-            log.debug(f"DATA SENT: {data}")
+        if ConferencesComponent.verbose:
+            log.debug(f"=> DATA SENT [{self.client_id}]: {data}")
         self.sendMessage(json.dumps(data).encode())
 
     def onMessage(self, payload, isBinary):
@@ -258,6 +327,8 @@
             log.warning(f"Can't decode data: {payload!r}")
             return
 
+        if ConferencesComponent.verbose:
+            log.debug(f"<= DATA RECEIVED [{self.client_id}]: {data}")
         try:
             match data.get("type"):
                 case None:
@@ -266,14 +337,14 @@
                     version = data["version"][0]
 
                     log.debug(
-                        f"Handshake for group {self.conference.group_name!r}. Galène protocol "
-                        f" v{version}."
+                        f"Handshake for group {self.conference.group_name!r}. Galène "
+                        f"protocol  v{version}."
                     )
                     self.conference.ready.callback(None)
                 case "ping":
                     log.debug("pong")
                     self.send_data({"type": "pong"})
-                case "joined" | "answer" | "ice" as data_type:
+                case "joined" | "answer" | "ice" | "offer" as data_type:
                     method = getattr(self.conference, f"on_{data_type}")
                     method(data)
                 case _:
@@ -284,14 +355,26 @@
 
 class ConferencesComponent:
     IMPORT_NAME = IMPORT_NAME
+    verbose = 0
 
     def __init__(self, host):
         self.host = host
         self.client: SatXMPPEntity | None = None
         self._e: XEP_0106 = host.plugins["XEP-0106"]
         self._j: XEP_0166 = host.plugins["XEP-0166"]
+        self._rtp: XEP_0167 = host.plugins["XEP-0167"]
         self._ice_udp: XEP_0176 = host.plugins["XEP-0176"]
+        self._coin: XEP_0298 = host.plugins["XEP-0298"]
         host.trigger.add("XEP-0167_jingle_handler", self._jingle_handler_trigger)
+        self.initalized = False
+
+    def _init(self) -> None:
+        """Initialisation done after profile is connected"""
+        assert self.client is not None
+        self.client.identities.append(
+            disco.DiscoIdentity("conference", "audio-video", NAME)
+        )
+
         try:
             galene_path = Path(
                 self.host.memory.config_get(CONF_SECTION, "galene_path")
@@ -306,10 +389,15 @@
         self.galene_http_port = self.host.memory.config_get(
             CONF_SECTION, "http_port", "9443"
         )
-        galene_data_path = host.memory.get_cache_path(IMPORT_NAME, "galene")
+        galene_data_path = self.host.memory.get_cache_path(IMPORT_NAME, "galene")
         galene_static_path = galene_path.parent / "static"
         self.galene_group_path = galene_data_path / "groups"
         self.galene_group_path.mkdir(0o700, parents=True, exist_ok=True)
+        env = os.environ
+        if self.verbose >= 2:
+            # We activate Galène debug logs on ICE candidates.
+            env["PION_LOG_TRACE"] = "ice"
+
         try:
             d = self._process = async_process.run(
                 str(galene_path),
@@ -320,7 +408,8 @@
                 # We don't want HTTPS here, it's only used for local interactions
                 "-insecure",
                 path=str(galene_data_path),
-                verbose=True,
+                verbose=bool(self.verbose),
+                env=env,
             )
         except Exception:
             log.exception("Can't start Galene.")
@@ -331,8 +420,11 @@
     def get_handler(self, __):
         return ConferencesHandler()
 
-    def profile_connecting(self, client):
+    def profile_connecting(self, client: SatXMPPEntity) -> None:
         self.client = client
+        if not self.initalized:
+            self._init()
+            self.initalized = True
 
     async def attach_to_group(self, session: dict, group_name: str) -> Conference:
         """Attach to a Galène group.
@@ -372,6 +464,7 @@
             data_file=data_file,
             endpoint=endpoint,
             status_url=status_url,
+            session=session,
         )
 
         factory = GaleneClientFactory(conference)
@@ -418,9 +511,14 @@
         except KeyError:
             raise exceptions.InternalError("Conference data is missing.")
 
-        await conference.ready
-        conference.join(session["peer_jid"])
-        conference.send_offer(session, call_data["sdp"])
+        if call_data["role"] == self._j.ROLE_RESPONDER:
+            await conference.ready
+            conference.join(session["peer_jid"])
+            conference.send_offer(call_data["sdp"])
+        else:
+            raise exceptions.InternalError(
+                '"role" should be "{self._j.ROLE_RESPONDER}" here.'
+            )
 
     def on_ice_candidates_new(
         self,
@@ -443,14 +541,15 @@
             sdp_mid: str = content_id
 
             for candidate in media_data["candidates"]:
-                conference.add_candidate(session, candidate, sdp_mid, sdp_mline_index)
+                conference.add_candidate(candidate, sdp_mid, sdp_mline_index)
 
     def _galene_process_errback(self, failure_: Failure) -> None:
         log.error(f"Can't run Galene process: {failure_.value}")
 
 
+
 @implementer(iwokkel.IDisco)
 class ConferencesHandler(XMPPHandler):
 
     def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
-        return [disco.DiscoIdentity("conference", "audio-video")]
+        return [disco.DiscoFeature(NS_AV_CONFERENCES)]
--- a/libervia/backend/plugins/plugin_xep_0167/__init__.py	Sun Jul 14 17:42:53 2024 +0200
+++ b/libervia/backend/plugins/plugin_xep_0167/__init__.py	Mon Jul 29 03:29:14 2024 +0200
@@ -41,6 +41,7 @@
     NS_JINGLE_RTP_AUDIO,
     NS_JINGLE_RTP_INFO,
     NS_JINGLE_RTP_VIDEO,
+    NS_AV_CONFERENCES
 )
 
 
--- a/libervia/backend/plugins/plugin_xep_0167/constants.py	Sun Jul 14 17:42:53 2024 +0200
+++ b/libervia/backend/plugins/plugin_xep_0167/constants.py	Mon Jul 29 03:29:14 2024 +0200
@@ -25,3 +25,4 @@
 NS_JINGLE_RTP_VIDEO: Final = f"{NS_JINGLE_RTP_BASE}:video"
 NS_JINGLE_RTP_ERRORS: Final = f"{NS_JINGLE_RTP_BASE}:errors:1"
 NS_JINGLE_RTP_INFO: Final = f"{NS_JINGLE_RTP_BASE}:info:1"
+NS_AV_CONFERENCES: Final = "urn:xmpp:jingle:av-conferences:0"