# HG changeset patch # User Goffi # Date 1698842467 -3600 # Node ID f0ce49b360c8277d4b5346b62d6da8b1d337ebc5 # Parent d87b9a6b0b69e33357e27f9303dad0f15ebfa139 calls: move webrtc code to core: WebRTC code which can be used in several frontends has been factorized and moved to common `frontends.tools`. Test have been updated consequently. rel 426 diff -r d87b9a6b0b69 -r f0ce49b360c8 libervia/desktop_kivy/plugins/plugin_wid_calls.py --- a/libervia/desktop_kivy/plugins/plugin_wid_calls.py Wed Oct 25 15:29:33 2023 +0200 +++ b/libervia/desktop_kivy/plugins/plugin_wid_calls.py Wed Nov 01 13:41:07 2023 +0100 @@ -1,8 +1,6 @@ from dataclasses import dataclass from pathlib import Path -import re from typing import Optional, Callable -from urllib.parse import quote_plus from functools import partial # from gi.repository import GLib @@ -37,7 +35,7 @@ from libervia.backend.core import exceptions from libervia.backend.tools.common import data_format from libervia.frontends.quick_frontend import quick_widgets -from libervia.frontends.tools import jid, aio +from libervia.frontends.tools import aio, jid, webrtc from libervia.desktop_kivy import G @@ -103,388 +101,46 @@ test_mode: bool = False + PROXIED_PROPERTIES = {'audio_muted', 'callee', 'sid', 'video_muted'} + PROXIED_METHODS = {'answer_call', 'end_call', 'on_accepted_call', 'on_ice_candidates_new', 'setup_call', 'start_pipeline'} + def __init__(self, parent_calls: "Calls", profile: str) -> None: self.parent_calls = parent_calls self.profile = profile - self.pipeline = None - self.reset_instance() - - @property - def sdp_set(self): - return self._sdp_set - - @sdp_set.setter - def sdp_set(self, is_set: bool): - self._sdp_set = is_set - if is_set: - self.on_ice_candidates_new(self.remote_candidates_buffer) - for data in self.remote_candidates_buffer.values(): - data["candidates"].clear() - - @property - def media_types(self): - if self._media_types is None: - raise Exception("self._media_types should not be None!") - return self._media_types - - @media_types.setter - def media_types(self, new_media_types: dict) -> None: - self._media_types = new_media_types - self._media_types_inv = {v: k for k, v in new_media_types.items()} - - @property - def media_types_inv(self) -> dict: - if self._media_types_inv is None: - raise Exception("self._media_types_inv should not be None!") - return self._media_types_inv - - def get_sdp_mline_index(self, media_type: str) -> int: - """Gets the sdpMLineIndex for a given media type. - - @param media_type: The type of the media. - """ - for index, m_type in self.media_types.items(): - if m_type == media_type: - return index - raise ValueError(f"Media type '{media_type}' not found") - - def _set_media_types(self, offer_sdp: str) -> None: - """Sets media types from offer SDP - - @param offer: RTC session description containing the offer - """ - sdp_lines = offer_sdp.splitlines() - media_types = {} - mline_index = 0 - - for line in sdp_lines: - if line.startswith("m="): - media_types[mline_index] = line[2 : line.find(" ")] - mline_index += 1 - - self.media_types = media_types - - def _a_call(self, method, *args, **kwargs): - """Call an async method in main thread""" - - def wrapper(__): - aio.run_async(method(*args, **kwargs)) - return False - - Clock.schedule_once(wrapper) - - def get_payload_types( - self, sdpmsg, video_encoding: str, audio_encoding: str - ) -> dict[str, int | None]: - """Find the payload types for the specified video and audio encoding. - - Very simplistically finds the first payload type matching the encoding - name. More complex applications will want to match caps on - profile-level-id, packetization-mode, etc. - """ - # method coming from gstreamer example (Matthew Waters, Nirbheek Chauhan) at - # subprojects/gst-examples/webrtc/sendrecv/gst/webrtc_sendrecv.py - video_pt = None - audio_pt = None - for i in range(0, sdpmsg.medias_len()): - media = sdpmsg.get_media(i) - for j in range(0, media.formats_len()): - fmt = media.get_format(j) - if fmt == "webrtc-datachannel": - continue - pt = int(fmt) - caps = media.get_caps_from_media(pt) - s = caps.get_structure(0) - encoding_name = s["encoding-name"] - if video_pt is None and encoding_name == video_encoding: - video_pt = pt - elif audio_pt is None and encoding_name == audio_encoding: - audio_pt = pt - return {video_encoding: video_pt, audio_encoding: audio_pt} - - def parse_ice_candidate(self, candidate_string): - """Parses the ice candidate string. - - @param candidate_string: The ice candidate string to be parsed. - """ - pattern = re.compile( - r"candidate:(?P\S+) (?P\d+) (?P\S+) " - r"(?P\d+) (?P
\S+) (?P\d+) typ " - r"(?P\S+)(?: raddr (?P\S+) rport " - r"(?P\d+))?(?: generation (?P\d+))?" - ) - match = pattern.match(candidate_string) - if match: - candidate_dict = match.groupdict() - - # Apply the correct types to the dictionary values - candidate_dict["component_id"] = int(candidate_dict["component_id"]) - candidate_dict["priority"] = int(candidate_dict["priority"]) - candidate_dict["port"] = int(candidate_dict["port"]) - - if candidate_dict["rel_port"]: - candidate_dict["rel_port"] = int(candidate_dict["rel_port"]) - - if candidate_dict["generation"]: - candidate_dict["generation"] = candidate_dict["generation"] - - # Remove None values - return {k: v for k, v in candidate_dict.items() if v is not None} - else: - log.warning(f"can't parse candidate: {candidate_string!r}") - return None - - def build_ice_candidate(self, parsed_candidate): - """Builds ICE candidate - - @param parsed_candidate: Dictionary containing parsed ICE candidate - """ - base_format = ( - "candidate:{foundation} {component_id} {transport} {priority} " - "{address} {port} typ {type}" - ) - - if parsed_candidate.get("rel_addr") and parsed_candidate.get("rel_port"): - base_format += " raddr {rel_addr} rport {rel_port}" - - if parsed_candidate.get("generation"): - base_format += " generation {generation}" - - return base_format.format(**parsed_candidate) - - def extract_ufrag_pwd(self, sdp: str) -> tuple[str, str]: - """Retrieves ICE password and user fragment for SDP offer. - - @param sdp: The Session Description Protocol offer string. - @return: ufrag and pwd - @raise ValueError: Can't extract ufrag and password - """ - ufrag_line = re.search(r"ice-ufrag:(\S+)", sdp) - pwd_line = re.search(r"ice-pwd:(\S+)", sdp) - - if ufrag_line and pwd_line: - ufrag = self.ufrag = ufrag_line.group(1) - pwd = self.pwd = pwd_line.group(1) - return ufrag, pwd - else: - log.error(f"SDP with missing ice-ufrag or ice-pwd:\n{sdp}") - raise ValueError("Can't extract ice-ufrag and ice-pwd from SDP") - - def reset_instance(self): - """Inits or resets the instance variables to their default state.""" - self.role: str | None = None - if self.pipeline is not None: - self.pipeline.set_state(Gst.State.NULL) - self.pipeline = None - self.texture_map: dict[VideoStreamWidget, TextureData] = {} - self._remote_video_pad = None - self.sid: str | None = None - self.offer: str | None = None - self.local_candidates_buffer = {} - self.ufrag: str | None = None - self.pwd: str | None = None - self.callee: str | None = None - self._media_types = None - self._media_types_inv = None - self._sdp_set: bool = False - self.remote_candidates_buffer: dict[str, dict[str, list]] = { - "audio": {"candidates": []}, - "video": {"candidates": []}, - } - self._media_types = None - self._media_types_inv = None - self.audio_valve = None - self.video_valve = None + self.webrtc = webrtc.WebRTC( + G.host.a_bridge, + profile, + sinks=webrtc.SINKS_TEST if self.test_mode else webrtc.SINKS_APP, + appsink_data=webrtc.AppSinkData( + local_video_cb=partial( + self.on_new_sample, + update_sample_method=self.update_sample, + video_widget=self.parent_calls.local_video + ), + remote_video_cb=partial( + self.on_new_sample, + update_sample_method=self.update_sample, + video_widget=self.parent_calls.remote_video + ) + ), + reset_cb=self.on_reset - async def setup_call( - self, - role: str, - audio_pt: int | None = 96, - video_pt: int | None = 97, - ) -> None: - """Sets up the call. - - This method establishes the Gstreamer pipeline for audio and video communication. - The method also manages STUN and TURN server configurations, signal watchers, and - various connection handlers for the webrtcbin. - - @param role: The role for the call, either 'initiator' or 'responder'. - @param audio_pt: The payload type for the audio stream. - @param video_pt: The payload type for the video stream - - @raises NotImplementedError: If audio_pt or video_pt is set to None. - @raises AssertionError: If the role is not 'initiator' or 'responder'. - """ - assert role in ("initiator", "responder") - self.role = role - if audio_pt is None or video_pt is None: - raise NotImplementedError("None value is not handled yet") - - if self.test_mode: - video_source_elt = "videotestsrc is-live=true pattern=ball" - audio_source_elt = "audiotestsrc" - else: - video_source_elt = "v4l2src" - audio_source_elt = "pulsesrc" - - self.gst_pipe_desc = f""" - webrtcbin latency=100 name=sendrecv bundle-policy=max-compat - - input-selector name=video_selector - ! videorate - ! video/x-raw,framerate=30/1 - ! tee name=t - - {video_source_elt} name=video_src ! queue ! video_selector. - videotestsrc is-live=true pattern=black ! queue ! video_selector. + ) + self.pipeline = None - t. - ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream - ! videoconvert - ! vp8enc deadline=1 keyframe-max-dist=60 - ! rtpvp8pay picture-id-mode=15-bit - ! application/x-rtp,media=video,encoding-name=VP8,payload={video_pt} - ! sendrecv. - - t. - ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 - ! videoconvert - ! appsink name=local_video_sink emit-signals=true drop=true max-buffers=1 sync=True - - {audio_source_elt} name=audio_src - ! valve name=audio_valve - ! queue max-size-buffers=10 max-size-time=0 max-size-bytes=0 - ! audioconvert - ! audioresample - ! opusenc audio-type=voice - ! rtpopuspay - ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt} - ! sendrecv. - """ - - log.debug(f"Gstreamer pipeline: {self.gst_pipe_desc}") - - # Create the pipeline - self.pipeline = Gst.parse_launch(self.gst_pipe_desc) - if not self.pipeline: - raise exceptions.InternalError("Failed to create Gstreamer pipeline.") - - self.webrtc = self.pipeline.get_by_name("sendrecv") - self.video_src = self.pipeline.get_by_name("video_src") - self.video_selector = self.pipeline.get_by_name("video_selector") - self.audio_valve = self.pipeline.get_by_name("audio_valve") - - if self.parent_calls.video_muted: - self.on_video_mute(True) - if self.parent_calls.audio_muted: - self.on_audio_mute(True) - - # set STUN and TURN servers - external_disco = data_format.deserialise( - await G.host.a_bridge.external_disco_get("", self.profile), type_check=list - ) + def __getattr__(self, name): + if name in self.PROXIED_PROPERTIES or name in self.PROXIED_METHODS: + return getattr(self.webrtc, name) + raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'") - for server in external_disco: - if server["type"] == "stun": - if server["transport"] == "tcp": - log.info( - "ignoring TCP STUN server, GStreamer only support one STUN server" - ) - url = f"stun://{server['host']}:{server['port']}" - log.debug(f"adding stun server: {url}") - self.webrtc.set_property("stun-server", url) - elif server["type"] == "turn": - url = "{scheme}://{username}:{password}@{host}:{port}".format( - scheme="turns" if server["transport"] == "tcp" else "turn", - username=quote_plus(server["username"]), - password=quote_plus(server["password"]), - host=server["host"], - port=server["port"], - ) - log.debug(f"adding turn server: {url}") - - if not self.webrtc.emit("add-turn-server", url): - log.warning(f"Erreur while adding TURN server {url}") - - # local video feedback - local_video_sink = self.pipeline.get_by_name("local_video_sink") - local_video_sink.set_property("emit-signals", True) - local_video_sink.connect( - "new-sample", - self.on_new_sample, - self.update_sample, - self.parent_calls.local_video, - ) - local_video_sink_caps = Gst.Caps.from_string(f"video/x-raw,format=RGB") - local_video_sink.set_property("caps", local_video_sink_caps) - - # Create bus and associate signal watchers - self.bus = self.pipeline.get_bus() - if not self.bus: - log.error("Failed to get bus from pipeline.") - return - - self.bus.add_signal_watch() - self.webrtc.connect("pad-added", self.on_pad_added) - self.bus.connect("message::error", self.on_bus_error) - self.bus.connect("message::eos", self.on_bus_eos) - self.webrtc.connect("on-negotiation-needed", self.on_negotiation_needed) - self.webrtc.connect("on-ice-candidate", self.on_ice_candidate) - self.webrtc.connect( - "notify::ice-gathering-state", self.on_ice_gathering_state_change - ) - self.webrtc.connect("notify::ice-connection-state", self.on_ice_connection_state) + def __setattr__(self, name, value): + if name in self.PROXIED_PROPERTIES: + setattr(self.webrtc, name, value) + else: + super().__setattr__(name, value) - def start_pipeline(self) -> None: - """Starts the GStreamer pipeline.""" - log.debug("starting the pipeline") - self.pipeline.set_state(Gst.State.PLAYING) - - def on_negotiation_needed(self, webrtc): - """Initiate SDP offer when negotiation is needed.""" - log.debug("Negotiation needed.") - if self.role == "initiator": - log.debug("Creating offer…") - promise = Gst.Promise.new_with_change_func(self.on_offer_created) - self.webrtc.emit("create-offer", None, promise) - - def on_offer_created(self, promise): - """Callback for when SDP offer is created.""" - log.info("on_offer_created called") - assert promise.wait() == Gst.PromiseResult.REPLIED - reply = promise.get_reply() - if reply is None: - log.error("Promise reply is None. Offer creation might have failed.") - return - offer = reply["offer"] - self.offer = offer.sdp.as_text() - log.info(f"SDP offer created: \n{self.offer}") - self._set_media_types(self.offer) - promise = Gst.Promise.new() - self.webrtc.emit("set-local-description", offer, promise) - promise.interrupt() - self._a_call(self._start_call) - - def on_answer_set(self, promise): - assert promise.wait() == Gst.PromiseResult.REPLIED - - def on_answer_created(self, promise, _, __): - """Callback for when SDP answer is created.""" - assert promise.wait() == Gst.PromiseResult.REPLIED - reply = promise.get_reply() - answer = reply["answer"] - promise = Gst.Promise.new() - self.webrtc.emit("set-local-description", answer, promise) - promise.interrupt() - answer_sdp = answer.sdp.as_text() - log.info(f"SDP answer set: \n{answer_sdp}") - self.sdp_set = True - self._a_call(G.host.a_bridge.call_answer_sdp, self.sid, answer_sdp, self.profile) - - def on_offer_set(self, promise): - assert promise.wait() == Gst.PromiseResult.REPLIED - promise = Gst.Promise.new_with_change_func(self.on_answer_created, None, None) - self.webrtc.emit("create-answer", None, promise) + def on_reset(self): + self.texture_map: dict[VideoStreamWidget, TextureData] = {} def on_new_sample( self, @@ -583,311 +239,6 @@ if buf is not None and mapinfo is not None: buf.unmap(mapinfo) - def on_remote_decodebin_stream(self, _, pad: Gst.Pad) -> None: - """Handle the stream from the remote decodebin. - - This method processes the incoming stream from the remote decodebin, determining - whether it's video or audio. It then sets up the appropriate GStreamer elements - for video/audio processing and adds them to the pipeline. - - @param pad: The Gst.Pad from the remote decodebin producing the stream. - """ - assert self.pipeline is not None - if not pad.has_current_caps(): - log.error(f"{pad} has no caps, ignoring") - return - - self.pipeline.set_state(Gst.State.PAUSED) - caps = pad.get_current_caps() - assert len(caps) - s = caps[0] - name = s.get_name() - log.debug(f"====> NAME START: {name}") - - q = Gst.ElementFactory.make("queue") - q.set_property("max-size-time", 0) - q.set_property("max-size-bytes", 0) - q.set_property("max-size-buffers", 5) - - if name.startswith("video"): - log.debug("===> VIDEO OK") - - self._remote_video_pad = pad - - # Check and log the original size of the video - width = s.get_int("width").value - height = s.get_int("height").value - log.info(f"Original video size: {width}x{height}") - - # This is a fix for an issue found with Movim on desktop: a non standard - # resolution is used (990x557) resulting in bad alignement and no color in - # rendered image - adjust_resolution = width % 4 != 0 or height % 4 != 0 - if adjust_resolution: - log.info("non standard resolution, we need to adjust size") - width = (width + 3) // 4 * 4 - height = (height + 3) // 4 * 4 - log.info(f"Adjusted video size: {width}x{height}") - - conv = Gst.ElementFactory.make("videoconvert") - remote_video_sink = Gst.ElementFactory.make("appsink") - - appsink_caps = Gst.Caps.from_string("video/x-raw,format=RGB") - remote_video_sink.set_property("caps", appsink_caps) - - remote_video_sink.set_property("emit-signals", True) - remote_video_sink.set_property("drop", True) - remote_video_sink.set_property("max-buffers", 1) - remote_video_sink.set_property("sync", True) - remote_video_sink.connect( - "new-sample", - self.on_new_sample, - self.update_sample, - self.parent_calls.remote_video, - ) - - if adjust_resolution: - videoscale = Gst.ElementFactory.make("videoscale") - adjusted_caps = Gst.Caps.from_string( - f"video/x-raw,width={width},height={height}" - ) - capsfilter = Gst.ElementFactory.make("capsfilter") - capsfilter.set_property("caps", adjusted_caps) - - self.pipeline.add(q, conv, videoscale, capsfilter, remote_video_sink) - self.pipeline.sync_children_states() - pad.link(q.get_static_pad("sink")) - q.link(conv) - conv.link(videoscale) - videoscale.link(capsfilter) - capsfilter.link(remote_video_sink) - else: - self.pipeline.add(q, conv, remote_video_sink) - self.pipeline.sync_children_states() - pad.link(q.get_static_pad("sink")) - q.link(conv) - conv.link(remote_video_sink) - - elif name.startswith("audio"): - log.debug("===> Audio OK") - conv = Gst.ElementFactory.make("audioconvert") - resample = Gst.ElementFactory.make("audioresample") - remote_audio_sink = Gst.ElementFactory.make("autoaudiosink") - self.pipeline.add(q, conv, resample, remote_audio_sink) - self.pipeline.sync_children_states() - pad.link(q.get_static_pad("sink")) - q.link(conv) - conv.link(resample) - resample.link(remote_audio_sink) - self.pipeline.set_state(Gst.State.PLAYING) - - def on_pad_added(self, __, pad: Gst.Pad) -> None: - """Handle the addition of a new pad to the element. - - When a new source pad is added to the element, this method creates a decodebin, - connects it to handle the stream, and links the pad to the decodebin. - - @param __: Placeholder for the signal source. Not used in this method. - @param pad: The newly added pad. - """ - - log.debug("on_pad_added") - if pad.direction != Gst.PadDirection.SRC: - return - - decodebin = Gst.ElementFactory.make("decodebin") - decodebin.connect("pad-added", self.on_remote_decodebin_stream) - self.pipeline.add(decodebin) - decodebin.sync_state_with_parent() - pad.link(decodebin.get_static_pad("sink")) - - async def _start_call(self) -> None: - """Initiate the call. - - Initiates a call with the callee using the stored offer. If there are any buffered - local ICE candidates, they are sent as part of the initiation. - """ - assert self.callee is not None - self.sid = await G.host.a_bridge.call_start( - str(self.callee), data_format.serialise({"sdp": self.offer}), self.profile - ) - if self.local_candidates_buffer: - log.debug( - f"sending buffered local ICE candidates: {self.local_candidates_buffer}" - ) - if self.pwd is None: - sdp = self.webrtc.props.local_description.sdp.as_text() - self.extract_ufrag_pwd(sdp) - ice_data = {} - for media_type, candidates in self.local_candidates_buffer.items(): - ice_data[media_type] = { - "ufrag": self.ufrag, - "pwd": self.pwd, - "candidates": candidates, - } - await G.host.a_bridge.ice_candidates_add( - self.sid, data_format.serialise(ice_data), self.profile - ) - self.local_candidates_buffer.clear() - - def _remote_sdp_set(self, promise) -> None: - assert promise.wait() == Gst.PromiseResult.REPLIED - self.sdp_set = True - - def on_accepted_call(self, sdp: str, profile: str) -> None: - """Outgoing call has been accepted. - - @param sdp: The SDP answer string received from the other party. - @param profile: Profile used for the call. - """ - log.debug(f"SDP answer received: \n{sdp}") - - __, sdpmsg = GstSdp.SDPMessage.new_from_text(sdp) - answer = GstWebRTC.WebRTCSessionDescription.new( - GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg - ) - promise = Gst.Promise.new_with_change_func(self._remote_sdp_set) - self.webrtc.emit("set-remote-description", answer, promise) - - async def answer_call(self, sdp: str, profile: str) -> None: - """Answer an incoming call - - @param sdp: The SDP offer string received from the initiator. - @param profile: Profile used for the call. - - @raise AssertionError: Raised when either "VP8" or "OPUS" is not present in - payload types. - """ - log.debug(f"SDP offer received: \n{sdp}") - self._set_media_types(sdp) - __, offer_sdp_msg = GstSdp.SDPMessage.new_from_text(sdp) - payload_types = self.get_payload_types( - offer_sdp_msg, video_encoding="VP8", audio_encoding="OPUS" - ) - assert "VP8" in payload_types - assert "OPUS" in payload_types - await self.setup_call( - "responder", audio_pt=payload_types["OPUS"], video_pt=payload_types["VP8"] - ) - self.start_pipeline() - offer = GstWebRTC.WebRTCSessionDescription.new( - GstWebRTC.WebRTCSDPType.OFFER, offer_sdp_msg - ) - promise = Gst.Promise.new_with_change_func(self.on_offer_set) - self.webrtc.emit("set-remote-description", offer, promise) - - def on_ice_candidate(self, webrtc, mline_index, candidate_sdp): - """Handles the on-ice-candidate signal of webrtcbin. - - @param webrtc: The webrtcbin element. - @param mlineindex: The mline index. - @param candidate: The ICE candidate. - """ - log.debug( - f"Local ICE candidate. MLine Index: {mline_index}, Candidate: {candidate_sdp}" - ) - parsed_candidate = self.parse_ice_candidate(candidate_sdp) - try: - media_type = self.media_types[mline_index] - except KeyError: - raise exceptions.InternalError("can't find media type") - - if self.sid is None: - log.debug("buffering local ICE candidate") - self.local_candidates_buffer.setdefault(media_type, []).append( - parsed_candidate - ) - else: - sdp = self.webrtc.props.local_description.sdp.as_text() - assert sdp is not None - ufrag, pwd = self.extract_ufrag_pwd(sdp) - ice_data = {"ufrag": ufrag, "pwd": pwd, "candidates": [parsed_candidate]} - self._a_call( - G.host.a_bridge.ice_candidates_add, - self.sid, - data_format.serialise({media_type: ice_data}), - self.profile, - ) - - def on_ice_candidates_new(self, candidates: dict) -> None: - """Handle new ICE candidates. - - @param candidates: A dictionary containing media types ("audio" or "video") as - keys and corresponding ICE data as values. - - @raise exceptions.InternalError: Raised when sdp mline index is not found. - """ - if not self.sdp_set: - log.debug("buffering remote ICE candidate") - for media_type in ("audio", "video"): - media_candidates = candidates.get(media_type) - if media_candidates: - buffer = self.remote_candidates_buffer[media_type] - buffer["candidates"].extend(media_candidates["candidates"]) - return - for media_type, ice_data in candidates.items(): - for candidate in ice_data["candidates"]: - candidate_sdp = self.build_ice_candidate(candidate) - try: - mline_index = self.get_sdp_mline_index(media_type) - except Exception as e: - raise exceptions.InternalError(f"Can't find sdp mline index: {e}") - self.webrtc.emit("add-ice-candidate", mline_index, candidate_sdp) - log.debug( - f"Remote ICE candidate added. MLine Index: {mline_index}, " - f"{candidate_sdp}" - ) - - def on_ice_gathering_state_change(self, pspec, __): - state = self.webrtc.get_property("ice-gathering-state") - log.debug(f"ICE gathering state changed to {state}") - - def on_ice_connection_state(self, pspec, __): - state = self.webrtc.props.ice_connection_state - if state == GstWebRTC.WebRTCICEConnectionState.FAILED: - log.error("ICE connection failed") - log.info(f"ICE connection state changed to {state}") - - def on_bus_error(self, bus: Gst.Bus, message: Gst.Message) -> None: - """Handles the GStreamer bus error messages. - - @param bus: The GStreamer bus. - @param message: The error message. - """ - err, debug = message.parse_error() - log.error(f"Error from {message.src.get_name()}: {err.message}") - log.error(f"Debugging info: {debug}") - - def on_bus_eos(self, bus: Gst.Bus, message: Gst.Message) -> None: - """Handles the GStreamer bus eos messages. - - @param bus: The GStreamer bus. - @param message: The eos message. - """ - log.info("End of stream") - - def on_audio_mute(self, muted: bool) -> None: - if self.audio_valve is not None: - self.audio_valve.set_property("drop", muted) - state = "muted" if muted else "unmuted" - log.info(f"audio is now {state}") - - def on_video_mute(self, muted: bool) -> None: - if self.video_selector is not None: - # when muted, we switch to a black image and deactivate the camera - if not muted: - self.video_src.set_state(Gst.State.PLAYING) - pad = self.video_selector.get_static_pad("sink_1" if muted else "sink_0") - self.video_selector.props.active_pad = pad - if muted: - self.video_src.set_state(Gst.State.NULL) - state = "muted" if muted else "unmuted" - log.info(f"video is now {state}") - - async def end_call(self) -> None: - """Stop streaming and clean instance""" - self.reset_instance() - class Calls( quick_widgets.QuickWidget, @@ -918,10 +269,6 @@ self.header_input_add_extra(call_btn) self.webrtc = WebRTC(self, self.profile) self.previous_fullscreen = None - self.bind( - audio_muted=lambda __, value: self.webrtc.on_audio_mute(value), - video_muted=lambda __, value: self.webrtc.on_video_mute(value), - ) self.reset_instance() @property @@ -1029,6 +376,12 @@ self.screen_manager.transition.direction = "down" self.screen_manager.current = "search" + def on_audio_muted(self, instance, muted: bool) -> None: + self.webrtc.audio_muted = muted + + def on_video_muted(self, instance, muted: bool) -> None: + self.webrtc.video_muted = muted + def on_fullscreen(self, instance, fullscreen: bool) -> None: if fullscreen: G.host.app.show_head_widget(False, animation=False) diff -r d87b9a6b0b69 -r f0ce49b360c8 tests/unit/test_plugin_calls.py --- a/tests/unit/test_plugin_calls.py Wed Oct 25 15:29:33 2023 +0200 +++ b/tests/unit/test_plugin_calls.py Wed Nov 01 13:41:07 2023 +0100 @@ -18,8 +18,6 @@ from unittest.mock import AsyncMock, MagicMock -from gi.repository import Gst -from libervia.backend.core import exceptions from libervia.backend.tools.common import data_format import pytest @@ -36,39 +34,12 @@ return host -@pytest.fixture(scope="function") -def webrtc(): - """Fixture for WebRTC instantiation.""" - host_mock = MagicMock() - profile = "test_profile" - instance = plugin_wid_calls.WebRTC(host_mock, profile) - - instance._set_media_types = MagicMock() - instance.start_pipeline = MagicMock() - instance.webrtc = MagicMock() - instance.webrtc.emit = MagicMock() - - instance.GstSdp_SDPMessage_new_from_text = MagicMock() - instance.GstWebRTC_WebRTCSessionDescription_new = MagicMock() - instance.Gst_Promise_new_with_change_func = MagicMock() - - return instance - - @pytest.fixture def calls(monkeypatch, host): """Fixture for Call UI instantiation.""" for attr in ("header_box", "local_video", "remote_video", "screen_manager"): - monkeypatch.setattr( - plugin_wid_calls.Calls, - attr, - MagicMock() - ) - calls = plugin_wid_calls.Calls( - host, - "test_peer@example.org", - ["test_profile"] - ) + monkeypatch.setattr(plugin_wid_calls.Calls, attr, MagicMock()) + calls = plugin_wid_calls.Calls(host, "test_peer@example.org", ["test_profile"]) calls.jid_selector = MagicMock() calls.header_input = MagicMock() calls.header_input.text = "fake_jid@domain" @@ -80,197 +51,7 @@ return calls -class TestWebRtc: - def test_get_payload_types(self, webrtc): - """The method can identify the correct payload types for video and audio.""" - fake_sdpmsg = MagicMock() - fake_media = MagicMock() - fake_caps = MagicMock() - fake_structure = MagicMock() - - # This side effect will return 'fake_video_encoding' first, then - # 'fake_audio_encoding'. - fake_structure.__getitem__.side_effect = [ - "fake_video_encoding", - "fake_audio_encoding", - ] - fake_caps.get_structure.return_value = fake_structure - fake_media.get_format.side_effect = ["webrtc-datachannel", "10", "20"] - fake_media.get_caps_from_media.return_value = fake_caps - fake_sdpmsg.get_media.return_value = fake_media - fake_sdpmsg.medias_len.return_value = 1 - fake_media.formats_len.return_value = 3 - - result = webrtc.get_payload_types( - fake_sdpmsg, "fake_video_encoding", "fake_audio_encoding" - ) - expected_result = {"fake_video_encoding": 10, "fake_audio_encoding": 20} - - assert result == expected_result - - def test_on_accepted_call(self, webrtc): - """The method correctly sets the remote SDP upon acceptance of an outgoing call.""" - sdp_str = "mock_sdp_string" - profile_str = "test_profile" - - webrtc.on_accepted_call(sdp_str, profile_str) - - # remote description must be set - assert webrtc.webrtc.emit.call_count == 1 - assert webrtc.webrtc.emit.call_args[0][0] == "set-remote-description" - - @pytest.mark.asyncio - async def test_answer_call(self, webrtc, monkeypatch): - """The method correctly answers an incoming call.""" - mock_setup_call = AsyncMock() - - def mock_get_payload_types(sdpmsg, video_encoding, audio_encoding): - return {"VP8": 96, "OPUS": 97} - - monkeypatch.setattr(webrtc, "setup_call", mock_setup_call) - monkeypatch.setattr(webrtc, "get_payload_types", mock_get_payload_types) - - sdp_str = "mock_sdp_string" - profile_str = "mock_profile" - - await webrtc.answer_call(sdp_str, profile_str) - - mock_setup_call.assert_called_once_with("responder", audio_pt=97, video_pt=96) - - # remote description must be set - assert webrtc.webrtc.emit.call_count == 1 - assert webrtc.webrtc.emit.call_args[0][0] == "set-remote-description" - - def test_on_remote_decodebin_stream_video(self, webrtc, monkeypatch): - """The method correctly handles video streams from the remote decodebin.""" - mock_pipeline = MagicMock() - monkeypatch.setattr(webrtc, "pipeline", mock_pipeline) - - mock_pad = MagicMock() - mock_caps = MagicMock() - mock_structure = MagicMock() - - mock_pad.has_current_caps.return_value = True - mock_pad.get_current_caps.return_value = mock_caps - mock_caps.__len__.return_value = 1 - mock_caps.__getitem__.return_value = mock_structure - mock_structure.get_name.return_value = "video/x-h264" - # We use non-standard resolution as example to trigger the workaround - mock_structure.get_int.side_effect = lambda x: MagicMock( - value=990 if x == "width" else 557 - ) - - webrtc.on_remote_decodebin_stream(None, mock_pad) - - assert webrtc._remote_video_pad == mock_pad - mock_pipeline.add.assert_called() - mock_pipeline.set_state.assert_called() - mock_pad.link.assert_called() - - def test_on_remote_decodebin_stream_audio(self, webrtc, monkeypatch): - """The method correctly handles audio streams from the remote decodebin.""" - mock_pipeline = MagicMock() - monkeypatch.setattr(webrtc, "pipeline", mock_pipeline) - - mock_pad = MagicMock() - mock_caps = MagicMock() - mock_structure = MagicMock() - - mock_pad.has_current_caps.return_value = True - mock_pad.get_current_caps.return_value = mock_caps - mock_caps.__len__.return_value = 1 - mock_caps.__getitem__.return_value = mock_structure - mock_structure.get_name.return_value = "audio/x-raw" - - webrtc.on_remote_decodebin_stream(None, mock_pad) - - mock_pipeline.add.assert_called() - mock_pipeline.set_state.assert_called() - mock_pad.link.assert_called() - - @pytest.mark.asyncio - async def test_setup_call_correct_role(self, host, webrtc, monkeypatch): - """Roles are set in setup_call.""" - monkeypatch.setattr(Gst, "parse_launch", MagicMock()) - monkeypatch.setattr(data_format, "deserialise", MagicMock(return_value=[])) - - await webrtc.setup_call("initiator") - assert webrtc.role == "initiator" - - await webrtc.setup_call("responder") - assert webrtc.role == "responder" - - with pytest.raises(AssertionError): - await webrtc.setup_call("invalid_role") - - @pytest.mark.asyncio - async def test_setup_call_test_mode(self, host, webrtc, monkeypatch): - """Test mode use fake video and audio in setup_call.""" - monkeypatch.setattr(data_format, "deserialise", MagicMock(return_value=[])) - monkeypatch.setattr(webrtc, "test_mode", True) - await webrtc.setup_call("initiator") - assert "videotestsrc" in webrtc.gst_pipe_desc - assert "audiotestsrc" in webrtc.gst_pipe_desc - - @pytest.mark.asyncio - async def test_setup_call_normal_mode(self, host, webrtc, monkeypatch): - """Normal mode use real video and audio in setup_call.""" - monkeypatch.setattr(data_format, "deserialise", MagicMock(return_value=[])) - monkeypatch.setattr(webrtc, "test_mode", False) - await webrtc.setup_call("initiator") - assert "v4l2src" in webrtc.gst_pipe_desc - assert "pulsesrc" in webrtc.gst_pipe_desc - - @pytest.mark.asyncio - async def test_setup_call_with_stun_and_turn(self, host, webrtc, monkeypatch): - """STUN and TURN server configurations are done in setup_call.""" - mock_pipeline = MagicMock() - mock_parse_launch = MagicMock() - mock_parse_launch.return_value = mock_pipeline - monkeypatch.setattr(Gst, "parse_launch", mock_parse_launch) - - mock_pipeline.get_by_name.return_value = webrtc.webrtc - - mock_external_disco = [ - {"type": "stun", "transport": "udp", "host": "stun.host", "port": "3478"}, - { - "type": "turn", - "transport": "udp", - "host": "turn.host", - "port": "3478", - "username": "user", - "password": "pass", - }, - ] - - monkeypatch.setattr( - data_format, "deserialise", MagicMock(return_value=mock_external_disco) - ) - - mock_emit = AsyncMock() - monkeypatch.setattr(webrtc.webrtc, "emit", mock_emit) - - mock_set_property = AsyncMock() - monkeypatch.setattr(webrtc.webrtc, "set_property", mock_set_property) - - await webrtc.setup_call("initiator") - - G.host.a_bridge.external_disco_get.assert_called_once_with("", webrtc.profile) - mock_set_property.assert_any_call("stun-server", "stun://stun.host:3478") - mock_emit.assert_called_once_with( - "add-turn-server", "turn://user:pass@turn.host:3478" - ) - - @pytest.mark.asyncio - async def test_setup_call_gstreamer_pipeline_failure(self, webrtc, monkeypatch): - """Test setup_call method handling Gstreamer pipeline failure.""" - monkeypatch.setattr(Gst, "parse_launch", lambda _: None) - with pytest.raises(exceptions.InternalError): - await webrtc.setup_call("initiator") - - class TestCalls: - @pytest.mark.asyncio async def test_toggle_call_sid_none(self, monkeypatch, calls): """Call is started when there is not sid set.""" @@ -293,7 +74,6 @@ host.a_bridge.call_end.assert_called_once_with("test_sid", "", calls.profile) assert calls.in_call == False - @pytest.mark.asyncio async def test_on_incoming_call_sid_none(self, monkeypatch, host, calls): """Incoming call is accepted if no ongoing call.""" @@ -307,9 +87,7 @@ assert calls.in_call == True assert calls.webrtc.sid == "test_sid" host.a_bridge.action_launch.assert_called_once_with( - fake_action_id, - data_format.serialise({"cancelled": False}), - fake_profile + fake_action_id, data_format.serialise({"cancelled": False}), fake_profile ) @pytest.mark.asyncio @@ -329,10 +107,7 @@ @pytest.mark.asyncio async def test_on_call_setup_initiator(self, calls): """Correct method called if role is 'initiator'.""" - setup_data = { - "role": "initiator", - "sdp": "fake_sdp" - } + setup_data = {"role": "initiator", "sdp": "fake_sdp"} profile = "fake_profile" await calls.on_call_setup(setup_data, profile) @@ -342,15 +117,8 @@ @pytest.mark.asyncio async def test_on_call_setup_responder(self, monkeypatch, calls): """Correct method called if role is 'responder'.""" - monkeypatch.setattr( - calls.webrtc, - "answer_call", - AsyncMock() - ) - setup_data = { - "role": "responder", - "sdp": "fake_sdp" - } + monkeypatch.setattr(calls.webrtc, "answer_call", AsyncMock()) + setup_data = {"role": "responder", "sdp": "fake_sdp"} profile = "fake_profile" await calls.on_call_setup(setup_data, profile) @@ -361,10 +129,7 @@ @pytest.mark.asyncio async def test_on_call_setup_invalid_role(self, calls): """Nothing is called if role is neither 'initiator' nor 'responder'.""" - setup_data = { - "role": "invalid_role", - "sdp": "fake_sdp" - } + setup_data = {"role": "invalid_role", "sdp": "fake_sdp"} profile = "fake_profile" await calls.on_call_setup(setup_data, profile)