# HG changeset patch # User Goffi # Date 1692624969 -7200 # Node ID e65d2ef1ded4795f61bab2e1b6bde8baa47f5e55 # Parent d282dbdd5ffdf78558b3be84992e5f0e523edab5 browser (calls/webrtc): send ICE candidates when received: - ICE candidates gathering is not waited for anymore - if session is not ready, candidates are buffered and replayed when suitable diff -r d282dbdd5ffd -r e65d2ef1ded4 libervia/web/pages/calls/_browser/webrtc.py --- a/libervia/web/pages/calls/_browser/webrtc.py Mon Aug 21 15:23:22 2023 +0200 +++ b/libervia/web/pages/calls/_browser/webrtc.py Mon Aug 21 15:36:09 2023 +0200 @@ -11,7 +11,6 @@ log.warning = log.warn profile = window.profile or "" bridge = Bridge() -GATHER_TIMEOUT = 10000 class WebRTC: @@ -75,15 +74,17 @@ self._media_types = None self._media_types_inv = None self._callee = None + self.ufrag = None + self.pwd = None self.sid = None self.local_candidates = None self.remote_stream = None - self.candidates_buffer = { + self.remote_candidates_buffer = { "audio": {"candidates": []}, "video": {"candidates": []}, } + self.local_candidates_buffer = {} self.media_candidates = {} - self.candidates_gathered = aio.Future() if self.on_reset_cb is not None: self.on_reset_cb() @@ -138,16 +139,20 @@ return index raise ValueError(f"Media type '{media_type}' not found") - def extract_pwd_ufrag(self, sdp): + def extract_ufrag_pwd(self, sdp: str) -> tuple[str, str]: """Retrieves ICE password and user fragment for SDP offer. @param sdp: The Session Description Protocol offer string. + @return: ufrag and pwd + @raise ValueError: Can't extract ufrag and password """ ufrag_line = re.search(r"ice-ufrag:(\S+)", sdp) pwd_line = re.search(r"ice-pwd:(\S+)", sdp) if ufrag_line and pwd_line: - return ufrag_line.group(1), pwd_line.group(1) + ufrag = self.ufrag = ufrag_line.group(1) + pwd = self.pwd = pwd_line.group(1) + return ufrag, pwd else: log.error(f"SDP with missing ice-ufrag or ice-pwd:\n{sdp}") raise ValueError("Can't extract ice-ufrag and ice-pwd from SDP") @@ -242,6 +247,23 @@ return self.media_candidates.setdefault(media_type, []).append(parsed_candidate) log.debug(f"ICE candidate [{media_type}]: {event.candidate.candidate}") + if self.sid is None: + log.debug("buffering candidate") + self.local_candidates_buffer.setdefault(media_type, []).append( + parsed_candidate + ) + else: + ufrag, pwd = self.extract_ufrag_pwd( + self._peer_connection.localDescription.sdp + ) + + ice_data = {"ufrag": ufrag, "pwd": pwd, "candidates": [parsed_candidate]} + aio.run( + bridge.ice_candidates_add( + self.sid, json.dumps({media_type: ice_data}) + ) + ) + else: log.debug("All ICE candidates gathered") @@ -294,7 +316,6 @@ log.debug(f"on_ice_gathering_state_change {connection.iceGatheringState=}") if connection.iceGatheringState == "complete": log.info("ICE candidates gathering done") - self.candidates_gathered.set_result(None) async def _create_peer_connection( self, @@ -495,7 +516,6 @@ if video_sender: await video_sender.replaceTrack(new_video_tracks[0]) - async def _gather_ice_candidates(self, is_initiator: bool, remote_candidates=None): """Get ICE candidates and wait to have them all before returning them @@ -508,12 +528,6 @@ ) self.media_candidates.clear() - gather_timeout = timer.set_timeout( - lambda: self.candidates_gathered.set_exception( - errors.TimeoutError("ICE gathering time out") - ), - GATHER_TIMEOUT, - ) if is_initiator: offer = await self._peer_connection.createOffer() @@ -526,10 +540,8 @@ if not is_initiator: log.debug(self._peer_connection.localDescription.sdp) - await self.candidates_gathered log.debug(self._peer_connection.localDescription.sdp) - timer.clear_timeout(gather_timeout) - ufrag, pwd = self.extract_pwd_ufrag(self._peer_connection.localDescription.sdp) + ufrag, pwd = self.extract_ufrag_pwd(self._peer_connection.localDescription.sdp) return { "ufrag": ufrag, "pwd": pwd, @@ -544,8 +556,8 @@ @param profile: Profile associated """ await self._peer_connection.setRemoteDescription({"type": "answer", "sdp": sdp}) - await self.on_ice_candidates_new(self.candidates_buffer) - self.candidates_buffer.clear() + await self.on_ice_candidates_new(self.remote_candidates_buffer) + self.remote_candidates_buffer.clear() def _on_ice_candidates_new(self, sid: str, candidates_s: str, profile: str) -> None: """Called when new ICE candidates are received @@ -574,14 +586,23 @@ @param candidates: Dictionary containing new ICE candidates """ log.debug(f"new peer candidates received: {candidates}") + # FIXME: workaround for https://github.com/brython-dev/brython/issues/2227, the + # following test raise a JS exception + try: + remoteDescription_is_none = self._peer_connection.remoteDescription is None + except Exception as e: + log.debug("Workaround for Brython bug activated.") + remoteDescription_is_none = True + if ( self._peer_connection is None - or self._peer_connection.remoteDescription is None + # or self._peer_connection.remoteDescription is None + or remoteDescription_is_none ): for media_type in ("audio", "video"): media_candidates = candidates.get(media_type) if media_candidates: - buffer = self.candidates_buffer[media_type] + buffer = self.remote_candidates_buffer[media_type] buffer["candidates"].extend(media_candidates["candidates"]) return for media_type, ice_data in candidates.items(): @@ -625,8 +646,8 @@ await self._peer_connection.setRemoteDescription( {"type": "offer", "sdp": offer_sdp} ) - await self.on_ice_candidates_new(self.candidates_buffer) - self.candidates_buffer.clear() + await self.on_ice_candidates_new(self.remote_candidates_buffer) + self.remote_candidates_buffer.clear() await self._get_user_media() # Gather local ICE candidates @@ -652,6 +673,28 @@ self.sid = await bridge.call_start(str(callee_jid), json.dumps(call_data)) log.debug(f"Call SID: {self.sid}") + if self.local_candidates_buffer: + log.debug( + f"sending buffered local ICE candidates: {self.local_candidates_buffer}" + ) + assert self.pwd is not None + ice_data = {} + for media_type, candidates in self.local_candidates_buffer.items(): + ice_data[media_type] = { + "ufrag": self.ufrag, + "pwd": self.pwd, + "candidates": candidates + } + aio.run( + bridge.ice_candidates_add( + self.sid, + json.dumps( + ice_data + ), + ) + ) + self.local_candidates_buffer.clear() + async def end_call(self) -> None: """Stop streaming and clean instance""" if self._peer_connection is None: @@ -694,8 +737,8 @@ def toggle_media_mute(self, media_type: str) -> bool: """Toggle mute/unmute for media tracks. - @param media_type: 'audio' or 'video'. Determines which media tracks - to process. + @param media_type: "audio" or "video". Determines which media tracks + to process. """ assert media_type in ("audio", "video"), "Invalid media type" @@ -703,13 +746,10 @@ is_muted_attr = f"is_{media_type}_muted" if local_video.srcObject: - log.debug(f"{local_video.srcObject=}") track_getter = getattr( local_video.srcObject, f"get{media_type.capitalize()}Tracks" ) - log.debug("track go") for track in track_getter(): - log.debug(f"{track=}") track.enabled = not track.enabled setattr(self, is_muted_attr, not track.enabled)