Mercurial > libervia-backend
diff libervia/frontends/tools/webrtc.py @ 4139:6745c6bd4c7a
frontends (tools): webrtc implementation:
this is a factored implementation usable by all non-web frontends. Sources and Sinks can
be configured easily to use tests source or local webcam/microphone, autosinks or a
`appsink` that the frontend will use.
rel 426
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 01 Nov 2023 14:03:36 +0100 |
parents | |
children | 970b6209526a |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/frontends/tools/webrtc.py Wed Nov 01 14:03:36 2023 +0100 @@ -0,0 +1,909 @@ +#!/usr/bin/env python3 + +# Libervia WebRTC implementation +# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import gi +gi.require_versions({ + "Gst": "1.0", + "GstWebRTC": "1.0" +}) +from gi.repository import Gst, GstWebRTC, GstSdp + +try: + from gi.overrides import Gst as _ +except ImportError: + print( + "no GStreamer python overrides available, please install relevant pacakges on " + "your system." + ) +import asyncio +from dataclasses import dataclass +from datetime import datetime +import logging +import re +from typing import Callable +from urllib.parse import quote_plus + +from libervia.backend.core import exceptions +from libervia.backend.tools.common import data_format +from libervia.frontends.tools import aio + + +log = logging.getLogger(__name__) + +Gst.init(None) + +SOURCES_AUTO = "auto" +SOURCES_TEST = "test" +SINKS_APP = "app" +SINKS_AUTO = "auto" +SINKS_TEST = "test" + + +@dataclass +class AppSinkData: + local_video_cb: Callable + remote_video_cb: Callable + + +class WebRTC: + """GSTreamer based WebRTC implementation for audio and video communication. + + This class encapsulates the WebRTC functionalities required for initiating and + handling audio and video calls. + """ + + def __init__( + self, + bridge, + profile: str, + sources: str = SOURCES_AUTO, + sinks: str = SINKS_AUTO, + appsink_data: AppSinkData | None = None, + reset_cb: Callable | None = None, + ) -> None: + self.main_loop = asyncio.get_event_loop() + self.bridge = bridge + self.profile = profile + self.pipeline = None + self._audio_muted = False + self._video_muted = False + self.sources = sources + self.sinks = sinks + if sinks == SINKS_APP: + self.appsink_data = appsink_data + elif appsink_data is not None: + raise exceptions.InternalError( + "appsink_data can only be used for SINKS_APP sinks" + ) + self.reset_cb = reset_cb + self.reset_instance() + + @property + def audio_muted(self): + return self._audio_muted + + @audio_muted.setter + def audio_muted(self, muted: bool) -> None: + if muted != self._audio_muted: + self._audio_muted = muted + self.on_audio_mute(muted) + + @property + def video_muted(self): + return self._video_muted + + @video_muted.setter + def video_muted(self, muted: bool) -> None: + if muted != self._video_muted: + self._video_muted = muted + self.on_video_mute(muted) + + @property + def sdp_set(self): + return self._sdp_set + + @sdp_set.setter + def sdp_set(self, is_set: bool): + self._sdp_set = is_set + if is_set: + self.on_ice_candidates_new(self.remote_candidates_buffer) + for data in self.remote_candidates_buffer.values(): + data["candidates"].clear() + + @property + def media_types(self): + if self._media_types is None: + raise Exception("self._media_types should not be None!") + return self._media_types + + @media_types.setter + def media_types(self, new_media_types: dict) -> None: + self._media_types = new_media_types + self._media_types_inv = {v: k for k, v in new_media_types.items()} + + @property + def media_types_inv(self) -> dict: + if self._media_types_inv is None: + raise Exception("self._media_types_inv should not be None!") + return self._media_types_inv + + def generate_dot_file( + self, + filename: str = "pipeline", + details: Gst.DebugGraphDetails = Gst.DebugGraphDetails.ALL, + with_timestamp: bool = True, + bin_: Gst.Bin|None = None, + ) -> None: + """Generate Dot File for debugging + + ``GST_DEBUG_DUMP_DOT_DIR`` environment variable must be set to destination dir. + ``dot -Tpng -o <filename>.png <filename>.dot`` can be use to convert to a PNG file. + See + https://gstreamer.freedesktop.org/documentation/gstreamer/debugutils.html?gi-language=python#GstDebugGraphDetails + for details. + + @param filename: name of the generated file + @param details: which details to print + @param with_timestamp: if True, add a timestamp to filename + @param bin_: which bin to output. By default, the whole pipeline + (``self.pipeline``) will be used. + """ + if bin_ is None: + bin_ = self.pipeline + if with_timestamp: + timestamp = datetime.now().isoformat(timespec='milliseconds') + filename = f"{timestamp}_filename" + + Gst.debug_bin_to_dot_file(bin_, details, filename) + + def get_sdp_mline_index(self, media_type: str) -> int: + """Gets the sdpMLineIndex for a given media type. + + @param media_type: The type of the media. + """ + for index, m_type in self.media_types.items(): + if m_type == media_type: + return index + raise ValueError(f"Media type '{media_type}' not found") + + def _set_media_types(self, offer_sdp: str) -> None: + """Sets media types from offer SDP + + @param offer: RTC session description containing the offer + """ + sdp_lines = offer_sdp.splitlines() + media_types = {} + mline_index = 0 + + for line in sdp_lines: + if line.startswith("m="): + media_types[mline_index] = line[2 : line.find(" ")] + mline_index += 1 + + self.media_types = media_types + + def _a_call(self, method, *args, **kwargs): + """Call an async method in main thread""" + aio.run_from_thread(method, *args, **kwargs, loop=self.main_loop) + + def get_payload_types( + self, sdpmsg, video_encoding: str, audio_encoding: str + ) -> dict[str, int | None]: + """Find the payload types for the specified video and audio encoding. + + Very simplistically finds the first payload type matching the encoding + name. More complex applications will want to match caps on + profile-level-id, packetization-mode, etc. + """ + # method coming from gstreamer example (Matthew Waters, Nirbheek Chauhan) at + # subprojects/gst-examples/webrtc/sendrecv/gst/webrtc_sendrecv.py + video_pt = None + audio_pt = None + for i in range(0, sdpmsg.medias_len()): + media = sdpmsg.get_media(i) + for j in range(0, media.formats_len()): + fmt = media.get_format(j) + if fmt == "webrtc-datachannel": + continue + pt = int(fmt) + caps = media.get_caps_from_media(pt) + s = caps.get_structure(0) + encoding_name = s["encoding-name"] + if video_pt is None and encoding_name == video_encoding: + video_pt = pt + elif audio_pt is None and encoding_name == audio_encoding: + audio_pt = pt + return {video_encoding: video_pt, audio_encoding: audio_pt} + + def parse_ice_candidate(self, candidate_string): + """Parses the ice candidate string. + + @param candidate_string: The ice candidate string to be parsed. + """ + pattern = re.compile( + r"candidate:(?P<foundation>\S+) (?P<component_id>\d+) (?P<transport>\S+) " + r"(?P<priority>\d+) (?P<address>\S+) (?P<port>\d+) typ " + r"(?P<type>\S+)(?: raddr (?P<rel_addr>\S+) rport " + r"(?P<rel_port>\d+))?(?: generation (?P<generation>\d+))?" + ) + match = pattern.match(candidate_string) + if match: + candidate_dict = match.groupdict() + + # Apply the correct types to the dictionary values + candidate_dict["component_id"] = int(candidate_dict["component_id"]) + candidate_dict["priority"] = int(candidate_dict["priority"]) + candidate_dict["port"] = int(candidate_dict["port"]) + + if candidate_dict["rel_port"]: + candidate_dict["rel_port"] = int(candidate_dict["rel_port"]) + + if candidate_dict["generation"]: + candidate_dict["generation"] = candidate_dict["generation"] + + # Remove None values + return {k: v for k, v in candidate_dict.items() if v is not None} + else: + log.warning(f"can't parse candidate: {candidate_string!r}") + return None + + def build_ice_candidate(self, parsed_candidate): + """Builds ICE candidate + + @param parsed_candidate: Dictionary containing parsed ICE candidate + """ + base_format = ( + "candidate:{foundation} {component_id} {transport} {priority} " + "{address} {port} typ {type}" + ) + + if parsed_candidate.get("rel_addr") and parsed_candidate.get("rel_port"): + base_format += " raddr {rel_addr} rport {rel_port}" + + if parsed_candidate.get("generation"): + base_format += " generation {generation}" + + return base_format.format(**parsed_candidate) + + def extract_ufrag_pwd(self, sdp: str) -> tuple[str, str]: + """Retrieves ICE password and user fragment for SDP offer. + + @param sdp: The Session Description Protocol offer string. + @return: ufrag and pwd + @raise ValueError: Can't extract ufrag and password + """ + ufrag_line = re.search(r"ice-ufrag:(\S+)", sdp) + pwd_line = re.search(r"ice-pwd:(\S+)", sdp) + + if ufrag_line and pwd_line: + ufrag = self.ufrag = ufrag_line.group(1) + pwd = self.pwd = pwd_line.group(1) + return ufrag, pwd + else: + log.error(f"SDP with missing ice-ufrag or ice-pwd:\n{sdp}") + raise ValueError("Can't extract ice-ufrag and ice-pwd from SDP") + + def reset_instance(self): + """Inits or resets the instance variables to their default state.""" + self.role: str | None = None + if self.pipeline is not None: + self.pipeline.set_state(Gst.State.NULL) + self.pipeline = None + self._remote_video_pad = None + self.sid: str | None = None + self.offer: str | None = None + self.local_candidates_buffer = {} + self.ufrag: str | None = None + self.pwd: str | None = None + self.callee: str | None = None + self._media_types = None + self._media_types_inv = None + self._sdp_set: bool = False + self.remote_candidates_buffer: dict[str, dict[str, list]] = { + "audio": {"candidates": []}, + "video": {"candidates": []}, + } + self._media_types = None + self._media_types_inv = None + self.audio_valve = None + self.video_valve = None + if self.reset_cb is not None: + self.reset_cb() + + + async def setup_call( + self, + role: str, + audio_pt: int | None = 96, + video_pt: int | None = 97, + ) -> None: + """Sets up the call. + + This method establishes the Gstreamer pipeline for audio and video communication. + The method also manages STUN and TURN server configurations, signal watchers, and + various connection handlers for the webrtcbin. + + @param role: The role for the call, either 'initiator' or 'responder'. + @param audio_pt: The payload type for the audio stream. + @param video_pt: The payload type for the video stream + + @raises NotImplementedError: If audio_pt or video_pt is set to None. + @raises AssertionError: If the role is not 'initiator' or 'responder'. + """ + assert role in ("initiator", "responder") + self.role = role + if audio_pt is None or video_pt is None: + raise NotImplementedError("None value is not handled yet") + + if self.sources == SOURCES_AUTO: + video_source_elt = "v4l2src" + audio_source_elt = "pulsesrc" + elif self.sources == SOURCES_TEST: + video_source_elt = "videotestsrc is-live=true pattern=ball" + audio_source_elt = "audiotestsrc" + else: + raise exceptions.InternalError(f'Unknown "sources" value: {self.sources!r}') + + extra_elt = "" + + if self.sinks == SINKS_APP: + local_video_sink_elt = ( + "appsink name=local_video_sink emit-signals=true drop=true max-buffers=1 " + "sync=True" + ) + elif self.sinks == SINKS_AUTO: + extra_elt = "compositor name=compositor ! autovideosink" + local_video_sink_elt = """compositor.sink_1""" + else: + raise exceptions.InternalError(f"Unknown sinks value {self.sinks!r}") + + self.gst_pipe_desc = f""" + webrtcbin latency=100 name=sendrecv bundle-policy=max-bundle + + input-selector name=video_selector + ! videorate + ! video/x-raw,framerate=30/1 + ! tee name=t + + {extra_elt} + + {video_source_elt} name=video_src ! queue leaky=downstream ! video_selector. + videotestsrc is-live=true pattern=black ! queue leaky=downstream ! video_selector. + + t. + ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream + ! videoconvert + ! vp8enc deadline=1 keyframe-max-dist=60 + ! rtpvp8pay picture-id-mode=15-bit + ! application/x-rtp,media=video,encoding-name=VP8,payload={video_pt} + ! sendrecv. + + t. + ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream + ! videoconvert + ! {local_video_sink_elt} + + {audio_source_elt} name=audio_src + ! valve + ! queue max-size-buffers=10 max-size-time=0 max-size-bytes=0 leaky=downstream + ! audioconvert + ! audioresample + ! opusenc audio-type=voice + ! rtpopuspay + ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt} + ! sendrecv. + """ + + log.debug(f"Gstreamer pipeline: {self.gst_pipe_desc}") + + # Create the pipeline + self.pipeline = Gst.parse_launch(self.gst_pipe_desc) + if not self.pipeline: + raise exceptions.InternalError("Failed to create Gstreamer pipeline.") + + self.webrtcbin = self.pipeline.get_by_name("sendrecv") + self.video_src = self.pipeline.get_by_name("video_src") + self.video_selector = self.pipeline.get_by_name("video_selector") + self.audio_valve = self.pipeline.get_by_name("audio_valve") + + if self.video_muted: + self.on_video_mute(True) + if self.audio_muted: + self.on_audio_mute(True) + + # set STUN and TURN servers + external_disco = data_format.deserialise( + await self.bridge.external_disco_get("", self.profile), type_check=list + ) + + for server in external_disco: + if server["type"] == "stun": + if server["transport"] == "tcp": + log.info( + "ignoring TCP STUN server, GStreamer only support one STUN server" + ) + url = f"stun://{server['host']}:{server['port']}" + log.debug(f"adding stun server: {url}") + self.webrtcbin.set_property("stun-server", url) + elif server["type"] == "turn": + url = "{scheme}://{username}:{password}@{host}:{port}".format( + scheme="turns" if server["transport"] == "tcp" else "turn", + username=quote_plus(server["username"]), + password=quote_plus(server["password"]), + host=server["host"], + port=server["port"], + ) + log.debug(f"adding turn server: {url}") + + if not self.webrtcbin.emit("add-turn-server", url): + log.warning(f"Erreur while adding TURN server {url}") + + # local video feedback + if self.sinks == SINKS_APP: + assert self.appsink_data is not None + local_video_sink = self.pipeline.get_by_name("local_video_sink") + local_video_sink.set_property("emit-signals", True) + local_video_sink.connect("new-sample", self.appsink_data.local_video_cb) + local_video_sink_caps = Gst.Caps.from_string(f"video/x-raw,format=RGB") + local_video_sink.set_property("caps", local_video_sink_caps) + + # Create bus and associate signal watchers + self.bus = self.pipeline.get_bus() + if not self.bus: + log.error("Failed to get bus from pipeline.") + return + + self.bus.add_signal_watch() + self.webrtcbin.connect("pad-added", self.on_pad_added) + self.bus.connect("message::error", self.on_bus_error) + self.bus.connect("message::eos", self.on_bus_eos) + self.webrtcbin.connect("on-negotiation-needed", self.on_negotiation_needed) + self.webrtcbin.connect("on-ice-candidate", self.on_ice_candidate) + self.webrtcbin.connect( + "notify::ice-gathering-state", self.on_ice_gathering_state_change + ) + self.webrtcbin.connect( + "notify::ice-connection-state", self.on_ice_connection_state + ) + + def start_pipeline(self) -> None: + """Starts the GStreamer pipeline.""" + log.debug("starting the pipeline") + self.pipeline.set_state(Gst.State.PLAYING) + + def on_negotiation_needed(self, webrtc): + """Initiate SDP offer when negotiation is needed.""" + log.debug("Negotiation needed.") + if self.role == "initiator": + log.debug("Creating offer…") + promise = Gst.Promise.new_with_change_func(self.on_offer_created) + self.webrtcbin.emit("create-offer", None, promise) + + def on_offer_created(self, promise): + """Callback for when SDP offer is created.""" + log.info("on_offer_created called") + assert promise.wait() == Gst.PromiseResult.REPLIED + reply = promise.get_reply() + if reply is None: + log.error("Promise reply is None. Offer creation might have failed.") + return + offer = reply["offer"] + self.offer = offer.sdp.as_text() + log.info(f"SDP offer created: \n{self.offer}") + self._set_media_types(self.offer) + promise = Gst.Promise.new() + self.webrtcbin.emit("set-local-description", offer, promise) + promise.interrupt() + self._a_call(self._start_call) + + def on_answer_set(self, promise): + assert promise.wait() == Gst.PromiseResult.REPLIED + + def on_answer_created(self, promise, _, __): + """Callback for when SDP answer is created.""" + assert promise.wait() == Gst.PromiseResult.REPLIED + reply = promise.get_reply() + answer = reply["answer"] + promise = Gst.Promise.new() + self.webrtcbin.emit("set-local-description", answer, promise) + promise.interrupt() + answer_sdp = answer.sdp.as_text() + log.info(f"SDP answer set: \n{answer_sdp}") + self.sdp_set = True + self._a_call(self.bridge.call_answer_sdp, self.sid, answer_sdp, self.profile) + + def on_offer_set(self, promise): + assert promise.wait() == Gst.PromiseResult.REPLIED + promise = Gst.Promise.new_with_change_func(self.on_answer_created, None, None) + self.webrtcbin.emit("create-answer", None, promise) + + def link_element_or_pad( + self, source: Gst.Element, dest: Gst.Element | Gst.Pad + ) -> bool: + """Check if dest is a pad or an element, and link appropriately""" + src_pad = source.get_static_pad("src") + + if isinstance(dest, Gst.Pad): + # If the dest is a pad, link directly + if not src_pad.link(dest) == Gst.PadLinkReturn.OK: + log.error( + "Failed to link 'conv' to the compositor's newly requested pad!" + ) + return False + elif isinstance(dest, Gst.Element): + return source.link(dest) + else: + log.error(f"Unexpected type for dest: {type(sink)}") + return False + + return True + + def scaled_dimensions( + self, original_width: int, original_height: int, max_width: int, max_height: int + ) -> tuple[int, int]: + """Calculates the scaled dimensions preserving aspect ratio. + + @param original_width: Original width of the video stream. + @param original_height: Original height of the video stream. + @param max_width: Maximum desired width for the scaled video. + @param max_height: Maximum desired height for the scaled video. + @return: The width and height of the scaled video. + """ + aspect_ratio = original_width / original_height + new_width = int(max_height * aspect_ratio) + + if new_width <= max_width: + return new_width, max_height + + new_height = int(max_width / aspect_ratio) + return max_width, new_height + + def on_remote_decodebin_stream(self, _, pad: Gst.Pad) -> None: + """Handle the stream from the remote decodebin. + + This method processes the incoming stream from the remote decodebin, determining + whether it's video or audio. It then sets up the appropriate GStreamer elements + for video/audio processing and adds them to the pipeline. + + @param pad: The Gst.Pad from the remote decodebin producing the stream. + """ + assert self.pipeline is not None + if not pad.has_current_caps(): + log.error(f"{pad} has no caps, ignoring") + return + + caps = pad.get_current_caps() + assert len(caps) + s = caps[0] + name = s.get_name() + log.debug(f"====> NAME START: {name}") + + q = Gst.ElementFactory.make("queue") + + if name.startswith("video"): + log.debug("===> VIDEO OK") + + self._remote_video_pad = pad + + # Check and log the original size of the video + width = s.get_int("width").value + height = s.get_int("height").value + log.info(f"Original video size: {width}x{height}") + + # This is a fix for an issue found with Movim on desktop: a non standard + # resolution is used (990x557) resulting in bad alignement and no color in + # rendered image + adjust_resolution = width % 4 != 0 or height % 4 != 0 + if adjust_resolution: + log.info("non standard resolution, we need to adjust size") + width = (width + 3) // 4 * 4 + height = (height + 3) // 4 * 4 + log.info(f"Adjusted video size: {width}x{height}") + + conv = Gst.ElementFactory.make("videoconvert") + if self.sinks == SINKS_APP: + assert self.appsink_data is not None + remote_video_sink = Gst.ElementFactory.make("appsink") + + appsink_caps = Gst.Caps.from_string("video/x-raw,format=RGB") + remote_video_sink.set_property("caps", appsink_caps) + + remote_video_sink.set_property("emit-signals", True) + remote_video_sink.set_property("drop", True) + remote_video_sink.set_property("max-buffers", 1) + remote_video_sink.set_property("sync", True) + remote_video_sink.connect("new-sample", self.appsink_data.remote_video_cb) + self.pipeline.add(remote_video_sink) + if self.sinks == SINKS_AUTO: + compositor = self.pipeline.get_by_name("compositor") + + sink1_pad = compositor.get_static_pad("sink_1") + + local_width, local_height = self.scaled_dimensions( + sink1_pad.get_property("width"), + sink1_pad.get_property("height"), + width // 3, + height // 3, + ) + + sink1_pad.set_property("xpos", width - local_width) + sink1_pad.set_property("ypos", height - local_height) + sink1_pad.set_property("width", local_width) + sink1_pad.set_property("height", local_height) + sink1_pad.set_property("zorder", 1) + + # Request a new pad for the remote stream + sink_pad_template = compositor.get_pad_template("sink_%u") + remote_video_sink = compositor.request_pad(sink_pad_template, None, None) + remote_video_sink.set_property("zorder", 0) + + else: + raise exceptions.InternalError(f'Unhandled "sinks" value: {self.sinks!r}') + + if adjust_resolution: + videoscale = Gst.ElementFactory.make("videoscale") + adjusted_caps = Gst.Caps.from_string( + f"video/x-raw,width={width},height={height}" + ) + capsfilter = Gst.ElementFactory.make("capsfilter") + capsfilter.set_property("caps", adjusted_caps) + + self.pipeline.add(q, conv, videoscale, capsfilter) + + + self.pipeline.sync_children_states() + ret = pad.link(q.get_static_pad("sink")) + if ret != Gst.PadLinkReturn.OK: + log.error(f"Error linking pad: {ret}") + q.link(conv) + conv.link(videoscale) + videoscale.link(capsfilter) + self.link_element_or_pad(capsfilter.link, remote_video_sink) + + else: + self.pipeline.add(q, conv) + + self.pipeline.sync_children_states() + ret = pad.link(q.get_static_pad("sink")) + if ret != Gst.PadLinkReturn.OK: + log.error(f"Error linking pad: {ret}") + q.link(conv) + self.link_element_or_pad(conv, remote_video_sink) + + elif name.startswith("audio"): + log.debug("===> Audio OK") + conv = Gst.ElementFactory.make("audioconvert") + resample = Gst.ElementFactory.make("audioresample") + remote_audio_sink = Gst.ElementFactory.make("autoaudiosink") + self.pipeline.add(q, conv, resample, remote_audio_sink) + self.pipeline.sync_children_states() + ret = pad.link(q.get_static_pad("sink")) + if ret != Gst.PadLinkReturn.OK: + log.error(f"Error linking pad: {ret}") + q.link(conv) + conv.link(resample) + resample.link(remote_audio_sink) + + else: + log.warning(f"unmanaged name: {name!r}") + + def on_pad_added(self, __, pad: Gst.Pad) -> None: + """Handle the addition of a new pad to the element. + + When a new source pad is added to the element, this method creates a decodebin, + connects it to handle the stream, and links the pad to the decodebin. + + @param __: Placeholder for the signal source. Not used in this method. + @param pad: The newly added pad. + """ + log.debug("on_pad_added") + if pad.direction != Gst.PadDirection.SRC: + return + + decodebin = Gst.ElementFactory.make("decodebin") + decodebin.connect("pad-added", self.on_remote_decodebin_stream) + self.pipeline.add(decodebin) + decodebin.sync_state_with_parent() + pad.link(decodebin.get_static_pad("sink")) + + async def _start_call(self) -> None: + """Initiate the call. + + Initiates a call with the callee using the stored offer. If there are any buffered + local ICE candidates, they are sent as part of the initiation. + """ + assert self.callee + self.sid = await self.bridge.call_start( + str(self.callee), data_format.serialise({"sdp": self.offer}), self.profile + ) + if self.local_candidates_buffer: + log.debug( + f"sending buffered local ICE candidates: {self.local_candidates_buffer}" + ) + if self.pwd is None: + sdp = self.webrtcbin.props.local_description.sdp.as_text() + self.extract_ufrag_pwd(sdp) + ice_data = {} + for media_type, candidates in self.local_candidates_buffer.items(): + ice_data[media_type] = { + "ufrag": self.ufrag, + "pwd": self.pwd, + "candidates": candidates, + } + await self.bridge.ice_candidates_add( + self.sid, data_format.serialise(ice_data), self.profile + ) + self.local_candidates_buffer.clear() + + def _remote_sdp_set(self, promise) -> None: + assert promise.wait() == Gst.PromiseResult.REPLIED + self.sdp_set = True + + def on_accepted_call(self, sdp: str, profile: str) -> None: + """Outgoing call has been accepted. + + @param sdp: The SDP answer string received from the other party. + @param profile: Profile used for the call. + """ + log.debug(f"SDP answer received: \n{sdp}") + + __, sdpmsg = GstSdp.SDPMessage.new_from_text(sdp) + answer = GstWebRTC.WebRTCSessionDescription.new( + GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg + ) + promise = Gst.Promise.new_with_change_func(self._remote_sdp_set) + self.webrtcbin.emit("set-remote-description", answer, promise) + + async def answer_call(self, sdp: str, profile: str) -> None: + """Answer an incoming call + + @param sdp: The SDP offer string received from the initiator. + @param profile: Profile used for the call. + + @raise AssertionError: Raised when either "VP8" or "OPUS" is not present in + payload types. + """ + log.debug(f"SDP offer received: \n{sdp}") + self._set_media_types(sdp) + __, offer_sdp_msg = GstSdp.SDPMessage.new_from_text(sdp) + payload_types = self.get_payload_types( + offer_sdp_msg, video_encoding="VP8", audio_encoding="OPUS" + ) + assert "VP8" in payload_types + assert "OPUS" in payload_types + await self.setup_call( + "responder", audio_pt=payload_types["OPUS"], video_pt=payload_types["VP8"] + ) + self.start_pipeline() + offer = GstWebRTC.WebRTCSessionDescription.new( + GstWebRTC.WebRTCSDPType.OFFER, offer_sdp_msg + ) + promise = Gst.Promise.new_with_change_func(self.on_offer_set) + self.webrtcbin.emit("set-remote-description", offer, promise) + + def on_ice_candidate(self, webrtc, mline_index, candidate_sdp): + """Handles the on-ice-candidate signal of webrtcbin. + + @param webrtc: The webrtcbin element. + @param mlineindex: The mline index. + @param candidate: The ICE candidate. + """ + log.debug( + f"Local ICE candidate. MLine Index: {mline_index}, Candidate: {candidate_sdp}" + ) + parsed_candidate = self.parse_ice_candidate(candidate_sdp) + try: + media_type = self.media_types[mline_index] + except KeyError: + raise exceptions.InternalError("can't find media type") + + if self.sid is None: + log.debug("buffering local ICE candidate") + self.local_candidates_buffer.setdefault(media_type, []).append( + parsed_candidate + ) + else: + sdp = self.webrtcbin.props.local_description.sdp.as_text() + assert sdp is not None + ufrag, pwd = self.extract_ufrag_pwd(sdp) + ice_data = {"ufrag": ufrag, "pwd": pwd, "candidates": [parsed_candidate]} + self._a_call( + self.bridge.ice_candidates_add, + self.sid, + data_format.serialise({media_type: ice_data}), + self.profile, + ) + + def on_ice_candidates_new(self, candidates: dict) -> None: + """Handle new ICE candidates. + + @param candidates: A dictionary containing media types ("audio" or "video") as + keys and corresponding ICE data as values. + + @raise exceptions.InternalError: Raised when sdp mline index is not found. + """ + if not self.sdp_set: + log.debug("buffering remote ICE candidate") + for media_type in ("audio", "video"): + media_candidates = candidates.get(media_type) + if media_candidates: + buffer = self.remote_candidates_buffer[media_type] + buffer["candidates"].extend(media_candidates["candidates"]) + return + for media_type, ice_data in candidates.items(): + for candidate in ice_data["candidates"]: + candidate_sdp = self.build_ice_candidate(candidate) + try: + mline_index = self.get_sdp_mline_index(media_type) + except Exception as e: + raise exceptions.InternalError(f"Can't find sdp mline index: {e}") + self.webrtcbin.emit("add-ice-candidate", mline_index, candidate_sdp) + log.debug( + f"Remote ICE candidate added. MLine Index: {mline_index}, " + f"{candidate_sdp}" + ) + + def on_ice_gathering_state_change(self, pspec, __): + state = self.webrtcbin.get_property("ice-gathering-state") + log.debug(f"ICE gathering state changed to {state}") + + def on_ice_connection_state(self, pspec, __): + state = self.webrtcbin.props.ice_connection_state + if state == GstWebRTC.WebRTCICEConnectionState.FAILED: + log.error("ICE connection failed") + log.info(f"ICE connection state changed to {state}") + + def on_bus_error(self, bus: Gst.Bus, message: Gst.Message) -> None: + """Handles the GStreamer bus error messages. + + @param bus: The GStreamer bus. + @param message: The error message. + """ + err, debug = message.parse_error() + log.error(f"Error from {message.src.get_name()}: {err.message}") + log.error(f"Debugging info: {debug}") + + def on_bus_eos(self, bus: Gst.Bus, message: Gst.Message) -> None: + """Handles the GStreamer bus eos messages. + + @param bus: The GStreamer bus. + @param message: The eos message. + """ + log.info("End of stream") + + def on_audio_mute(self, muted: bool) -> None: + if self.audio_valve is not None: + self.audio_valve.set_property("drop", muted) + state = "muted" if muted else "unmuted" + log.info(f"audio is now {state}") + + def on_video_mute(self, muted: bool) -> None: + if self.video_selector is not None: + # when muted, we switch to a black image and deactivate the camera + if not muted: + self.video_src.set_state(Gst.State.PLAYING) + pad = self.video_selector.get_static_pad("sink_1" if muted else "sink_0") + self.video_selector.props.active_pad = pad + if muted: + self.video_src.set_state(Gst.State.NULL) + state = "muted" if muted else "unmuted" + log.info(f"video is now {state}") + + async def end_call(self) -> None: + """Stop streaming and clean instance""" + self.reset_instance()