Mercurial > libervia-backend
view libervia/frontends/tools/webrtc.py @ 4303:a7ec325246fb
component email-gateway: first draft:
Initial implementation of the Email Gateway.
This component uses XEP-0100 for registration. Upon registration and subsequent startups,
a connection is made to registered IMAP services, and incoming emails (in `INBOX`
mailboxes) are immediately forwarded as XMPP messages.
In the opposite direction, an SMTP connection is established to send emails on incoming
XMPP messages.
rel 449
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 06 Sep 2024 18:07:17 +0200 |
parents | 2992f9d1e039 |
children |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia WebRTC implementation # Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from collections.abc import Awaitable import gi gi.require_versions({"Gst": "1.0", "GstWebRTC": "1.0"}) from gi.repository import Gst, GstWebRTC, GstSdp from libervia.backend.core import exceptions try: from gi.overrides import Gst as _ except ImportError: raise exceptions.MissingModule( "No GStreamer Python overrides available. Please install relevant packages on " "your system (e.g., `python3-gst-1.0` on Debian and derivatives)." ) import asyncio from datetime import datetime import logging import re from typing import Callable, Final from urllib.parse import quote_plus from libervia.backend.tools.common import data_format from libervia.frontends.tools import aio, display_servers, jid from .webrtc_models import ( CallData, SinksApp, SinksAuto, SinksData, SinksDataChannel, SinksNone, SourcesAuto, SourcesData, SourcesDataChannel, SourcesNone, SourcesPipeline, SourcesTest, ) current_server = display_servers.detect() if current_server == display_servers.X11: # GSTreamer's ximagesrc documentation asks to run this function import ctypes ctypes.CDLL("libX11.so.6").XInitThreads() log = logging.getLogger(__name__) VIDEO_SOURCE_AUTO: Final = "v4l2src" AUDIO_SOURCE_AUTO: Final = "pulsesrc" NONE_NOT_IMPLEMENTED_MSG: Final = "None value is not handled yet." Gst.init(None) class WebRTC: """GSTreamer based WebRTC implementation for audio and video communication. This class encapsulates the WebRTC functionalities required for initiating and handling audio and video calls, and data channels. """ def __init__( self, bridge, profile: str, sources_data: SourcesData | None = None, sinks_data: SinksData | None = None, reset_cb: Callable | None = None, merge_pip: bool | None = None, target_size: tuple[int, int] | None = None, call_start_cb: Callable[[str, dict, str], Awaitable[str]] | None = None, dc_data_list: list[SourcesDataChannel | SinksDataChannel] | None = None, ) -> None: """Initializes a new WebRTC instance. @param bridge: An instance of backend bridge. @param profile: Libervia profile. @param sources_data: Data of the sources. The model used will determine which sources to use. SourcesDataChannel can be used here as a convenience. It will then be moved to ``data_channels`` and ``SourcesNone`` will be used instead for ``sources_data``. If None, SourcesAuto will be used. @param sinks_data: Data of the sinks. The model used will determine which sinks to use. SinksDataChannel can be used here as a convenience. It will then be moved to ``data_channels`` and ``SinksNone`` will be used instead for ``sinks_data``. If None, SinksAuto will be used. @param reset_cb: An optional Callable that is triggered on reset events. Can be used to reset UI data on new calls. @param merge_pip: A boolean flag indicating whether Picture-in-Picture mode is enabled. When PiP is used, local feedback is merged to remote video stream. Only one video stream is then produced (the local one). If None, PiP mode is selected automatically according to selected sink (it's used for SinksAuto only for now). @param target_size: Expected size of the final sink stream. Mainly use by composer when ``merge_pip`` is set. None to autodetect (no real autodetection implemeted yet, default to (1280,720)). @param call_start_cb: Called when call is started. @param dc_data_list: Data Channels to create. If a SourcesDataChannel is used as ``sources_data``, or a SinksDataChannel is used as ``sinks_data``, they will be automatically added to this list. """ self.main_loop = asyncio.get_event_loop() self.bridge = bridge self.profile = profile self.pipeline = None self._audio_muted = False self._video_muted = False self._desktop_sharing = False self.desktop_sharing_data = None if dc_data_list is None: dc_data_list = [] self.dc_data_list = dc_data_list if sources_data is None: sources_data = SourcesAuto() elif isinstance(sources_data, SourcesDataChannel): dc_data_list.append(sources_data) sources_data = SourcesNone() self.sources_data = sources_data if sinks_data is None: sinks_data = SinksAuto() elif isinstance(sinks_data, SinksDataChannel): dc_data_list.append(sinks_data) sinks_data = SinksNone() self.sinks_data = sinks_data if target_size is None: target_size = (1280, 720) self.target_width, self.target_height = target_size if merge_pip is None: merge_pip = isinstance(sinks_data, SinksAuto) self.merge_pip = merge_pip if call_start_cb is None: call_start_cb = self._call_start self.call_start_cb = call_start_cb if isinstance(sinks_data, SinksApp): if merge_pip and sinks_data.remote_video_cb is not None: raise ValueError("Remote_video_cb can't be used when merge_pip is used!") self.reset_cb = reset_cb if current_server == display_servers.WAYLAND: from .portal_desktop import DesktopPortal self.desktop_portal = DesktopPortal( on_session_closed_cb=self.on_portal_session_closed ) else: self.desktop_portal = None self.reset_instance() @property def audio_muted(self): return self._audio_muted @audio_muted.setter def audio_muted(self, muted: bool) -> None: if muted != self._audio_muted: self._audio_muted = muted self.on_audio_mute(muted) @property def video_muted(self): return self._video_muted @video_muted.setter def video_muted(self, muted: bool) -> None: if muted != self._video_muted: self._video_muted = muted self.on_video_mute(muted) @property def desktop_sharing(self): return self._desktop_sharing @desktop_sharing.setter def desktop_sharing(self, active: bool) -> None: if active != self._desktop_sharing: self._desktop_sharing = active self.on_desktop_switch(active) try: cb = self.bindings["desktop_sharing"] except KeyError: pass else: cb(active) @property def sdp_set(self): return self._sdp_set @sdp_set.setter def sdp_set(self, is_set: bool): self._sdp_set = is_set if is_set: self.on_ice_candidates_new(self.remote_candidates_buffer) for data in self.remote_candidates_buffer.values(): data["candidates"].clear() @property def media_types(self): if self._media_types is None: raise Exception("self._media_types should not be None!") return self._media_types @media_types.setter def media_types(self, new_media_types: dict) -> None: self._media_types = new_media_types self._media_types_inv = {v: k for k, v in new_media_types.items()} @property def media_types_inv(self) -> dict: if self._media_types_inv is None: raise Exception("self._media_types_inv should not be None!") return self._media_types_inv def bind(self, **kwargs: Callable) -> None: self.bindings.clear() for key, cb in kwargs.items(): if key not in ("desktop_sharing",): raise ValueError( 'Only "desktop_sharing" is currently allowed for binding' ) self.bindings[key] = cb def generate_dot_file( self, filename: str = "pipeline", details: Gst.DebugGraphDetails = Gst.DebugGraphDetails.ALL, with_timestamp: bool = True, bin_: Gst.Bin | None = None, ) -> None: """Generate Dot File for debugging ``GST_DEBUG_DUMP_DOT_DIR`` environment variable must be set to destination dir. ``dot -Tpng -o <filename>.png <filename>.dot`` can be use to convert to a PNG file. See https://gstreamer.freedesktop.org/documentation/gstreamer/debugutils.html?gi-language=python#GstDebugGraphDetails for details. @param filename: name of the generated file @param details: which details to print @param with_timestamp: if True, add a timestamp to filename @param bin_: which bin to output. By default, the whole pipeline (``self.pipeline``) will be used. """ if bin_ is None: bin_ = self.pipeline if with_timestamp: timestamp = datetime.now().isoformat(timespec="milliseconds") filename = f"{timestamp}_filename" Gst.debug_bin_to_dot_file(bin_, details, filename) def get_sdp_mline_index(self, media_type: str) -> int: """Gets the sdpMLineIndex for a given media type. @param media_type: The type of the media. """ for index, m_type in self.media_types.items(): if m_type == media_type: return index raise ValueError(f"Media type '{media_type}' not found") def _set_media_types(self, offer_sdp: str) -> None: """Sets media types from offer SDP @param offer: RTC session description containing the offer """ sdp_lines = offer_sdp.splitlines() media_types = {} mline_index = 0 for line in sdp_lines: if line.startswith("m="): media_types[mline_index] = line[2 : line.find(" ")] mline_index += 1 self.media_types = media_types def _a_call(self, method, *args, **kwargs): """Call an async method in main thread""" aio.run_from_thread(method, *args, **kwargs, loop=self.main_loop) def get_payload_types( self, sdpmsg, video_encoding: str, audio_encoding: str ) -> dict[str, int | None]: """Find the payload types for the specified video and audio encoding. Very simplistically finds the first payload type matching the encoding name. More complex applications will want to match caps on profile-level-id, packetization-mode, etc. """ # method coming from gstreamer example (Matthew Waters, Nirbheek Chauhan) at # subprojects/gst-examples/webrtc/sendrecv/gst/webrtc_sendrecv.py video_pt = None audio_pt = None for i in range(0, sdpmsg.medias_len()): media = sdpmsg.get_media(i) for j in range(0, media.formats_len()): fmt = media.get_format(j) if fmt == "webrtc-datachannel": continue pt = int(fmt) caps = media.get_caps_from_media(pt) s = caps.get_structure(0) encoding_name = s["encoding-name"] if video_pt is None and encoding_name == video_encoding: video_pt = pt elif audio_pt is None and encoding_name == audio_encoding: audio_pt = pt return {video_encoding: video_pt, audio_encoding: audio_pt} def parse_ice_candidate(self, candidate_string): """Parses the ice candidate string. @param candidate_string: The ice candidate string to be parsed. """ pattern = re.compile( r"candidate:(?P<foundation>\S+) (?P<component_id>\d+) (?P<transport>\S+) " r"(?P<priority>\d+) (?P<address>\S+) (?P<port>\d+) typ " r"(?P<type>\S+)(?: raddr (?P<rel_addr>\S+) rport " r"(?P<rel_port>\d+))?(?: generation (?P<generation>\d+))?" ) match = pattern.match(candidate_string) if match: candidate_dict = match.groupdict() # Apply the correct types to the dictionary values candidate_dict["component_id"] = int(candidate_dict["component_id"]) candidate_dict["priority"] = int(candidate_dict["priority"]) candidate_dict["port"] = int(candidate_dict["port"]) if candidate_dict["rel_port"]: candidate_dict["rel_port"] = int(candidate_dict["rel_port"]) if candidate_dict["generation"]: candidate_dict["generation"] = candidate_dict["generation"] # Remove None values return {k: v for k, v in candidate_dict.items() if v is not None} else: log.warning(f"can't parse candidate: {candidate_string!r}") return None def build_ice_candidate(self, parsed_candidate): """Builds ICE candidate @param parsed_candidate: Dictionary containing parsed ICE candidate """ base_format = ( "candidate:{foundation} {component_id} {transport} {priority} " "{address} {port} typ {type}" ) if parsed_candidate.get("rel_addr") and parsed_candidate.get("rel_port"): base_format += " raddr {rel_addr} rport {rel_port}" if parsed_candidate.get("generation"): base_format += " generation {generation}" return base_format.format(**parsed_candidate) def extract_ufrag_pwd(self, sdp: str) -> None: """Retrieves ICE password and user fragment for SDP offer. @param sdp: The Session Description Protocol offer string. """ lines = sdp.splitlines() media = "" mid_media_map = {} bundle_media = set() bundle_ufrag = "" bundle_pwd = "" in_bundle = False for line in lines: if line.startswith("m="): media = line.split("=")[1].split()[0] elif line.startswith("a=mid:"): mid = line.split(":")[1].strip() mid_media_map[mid] = media elif line.startswith("a=group:BUNDLE"): in_bundle = True bundle_media = set(line.split(":")[1].strip().split()) elif line.startswith("a=ice-ufrag:"): if in_bundle: bundle_ufrag = line.split(":")[1].strip() else: self.ufrag[media] = line.split(":")[1].strip() elif line.startswith("a=ice-pwd:"): if in_bundle: bundle_pwd = line.split(":")[1].strip() else: self.pwd[media] = line.split(":")[1].strip() else: in_bundle = False if bundle_ufrag and bundle_pwd: for mid in bundle_media: media = mid_media_map[mid] self.ufrag[media] = bundle_ufrag self.pwd[media] = bundle_pwd def reset_instance(self): """Inits or resets the instance variables to their default state.""" self.role: str | None = None if self.pipeline is not None: self.pipeline.set_state(Gst.State.NULL) self.pipeline = None self._remote_video_pad = None self.sid: str | None = None self.offer: str | None = None self.local_candidates_buffer = {} self.ufrag: dict[str, str] = {} self.pwd: dict[str, str] = {} self.callee: jid.JID | None = None self._media_types = None self._media_types_inv = None self._sdp_set: bool = False self.remote_candidates_buffer: dict[str, dict[str, list]] = { "audio": {"candidates": []}, "video": {"candidates": []}, } self._media_types = None self._media_types_inv = None self.audio_valve = None self.video_valve = None self.video_selector = None if self.desktop_portal is not None: self.desktop_portal.end_session() self.desktop_sharing = False self.desktop_sink_pad = None self.bindings = {} if self.reset_cb is not None: self.reset_cb() self.data_channels: dict[str, GstWebRTC.WebRTCDataChannel] = {} @property def data_channel(self) -> GstWebRTC.WebRTCDataChannel: """Convenience method to get WebRTCDataChannel instance when there is only one.""" if len(self.data_channels) != 1: raise exceptions.InternalError( "self.data_channel can only be used in a single Data Channel scenario. " "Use self.data_channels dict instead." ) return next(iter(self.data_channels.values())) async def setup_call( self, role: str, audio_pt: int | None = 96, video_pt: int | None = 97, ) -> None: """Sets up the call. This method establishes the Gstreamer pipeline for audio and video communication. The method also manages STUN and TURN server configurations, signal watchers, and various connection handlers for the webrtcbin. @param role: The role for the call, either 'initiator' or 'responder'. @param audio_pt: The payload type for the audio stream. @param video_pt: The payload type for the video stream @raises NotImplementedError: If audio_pt or video_pt is set to None. @raises AssertionError: If the role is not 'initiator' or 'responder'. """ assert role in ("initiator", "responder") self.role = role if isinstance(self.sources_data, SourcesPipeline): if self.sources_data.video_pipeline != "" and video_pt is None: raise NotImplementedError(NONE_NOT_IMPLEMENTED_MSG) if self.sources_data.audio_pipeline != "" and audio_pt is None: raise NotImplementedError(NONE_NOT_IMPLEMENTED_MSG) elif isinstance(self.sources_data, SourcesNone): pass else: if audio_pt is None or video_pt is None: raise NotImplementedError(NONE_NOT_IMPLEMENTED_MSG) match self.sources_data: case SourcesAuto(): video_source_elt = VIDEO_SOURCE_AUTO audio_source_elt = AUDIO_SOURCE_AUTO case SourcesNone(): video_source_elt = "" audio_source_elt = "" case SourcesPipeline() as source: if source.video_pipeline is None: video_source_elt = VIDEO_SOURCE_AUTO else: video_source_elt = source.video_pipeline if source.audio_pipeline is None: audio_source_elt = AUDIO_SOURCE_AUTO else: audio_source_elt = source.audio_pipeline case SourcesTest(): video_source_elt = "videotestsrc is-live=true pattern=ball" audio_source_elt = "audiotestsrc" case _: raise exceptions.InternalError( f'Unexpected "sources_data" value: {self.sources_data!r}' ) match self.sinks_data: case SinksApp(): local_video_sink_elt = ( "appsink name=local_video_sink emit-signals=true drop=true " "max-buffers=1 sync=True" ) case SinksAuto(): if isinstance(self.sources_data, SourcesNone): local_video_sink_elt = "" else: local_video_sink_elt = "autovideosink" case SinksNone(): local_video_sink_elt = "" case _: raise exceptions.InternalError( f'Unexpected "sinks_data" value {self.sinks_data!r}' ) gst_pipe_elements = [ "webrtcbin latency=30 name=sendrecv bundle-policy=max-bundle" ] if self.merge_pip and local_video_sink_elt: # Compositor is used to merge local video feedback in video sink, useful when # we have only a single video sink. gst_pipe_elements.append( "compositor name=compositor background=black " f"! video/x-raw,width={self.target_width}," f"height={self.target_height},framerate=30/1 " f"! {local_video_sink_elt}" ) local_video_sink_elt = "compositor.sink_1" if video_source_elt: # Video source with an input-selector to switch between normal and video mute # (or desktop sharing). gst_pipe_elements.append( f""" input-selector name=video_selector ! videorate drop-only=1 max-rate=30 ! video/x-raw,framerate=30/1 ! tee name=t {video_source_elt} name=video_src ! queue max-size-buffers=1 max-size-time=0 max-size-bytes=0 leaky=downstream ! video_selector. videotestsrc name=muted_src is-live=true pattern=black ! queue leaky=downstream ! video_selector. t. ! queue max-size-buffers=1 max-size-time=0 max-size-bytes=0 leaky=downstream ! videoscale ! videoconvert ! vp8enc deadline=1 keyframe-max-dist=30 ! rtpvp8pay picture-id-mode=15-bit ! application/x-rtp,media=video,encoding-name=VP8,payload={video_pt} ! sendrecv. """ ) if local_video_sink_elt: # Local video feedback. gst_pipe_elements.append( f""" t. ! queue max-size-buffers=1 max-size-time=0 max-size-bytes=0 leaky=downstream ! videoconvert ! {local_video_sink_elt} """ ) if audio_source_elt: # Audio with a valve for muting. gst_pipe_elements.append( f""" {audio_source_elt} name=audio_src ! valve ! queue max-size-buffers=10 max-size-time=0 max-size-bytes=0 leaky=downstream ! audioconvert ! audioresample ! opusenc audio-type=voice ! rtpopuspay ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt} ! sendrecv. """ ) self.gst_pipe_desc = "\n\n".join(gst_pipe_elements) log.debug(f"Gstreamer pipeline: {self.gst_pipe_desc}") # Create the pipeline try: self.pipeline = Gst.parse_launch(self.gst_pipe_desc) except Exception: log.exception("Can't parse pipeline") self.pipeline = None if not self.pipeline: raise exceptions.InternalError("Failed to create Gstreamer pipeline.") if not isinstance(self.pipeline, Gst.Pipeline): # in the case of Data Channel there is a single element, and Gst.parse_launch # doesn't create a Pipeline in this case, so we do it manually. pipeline = Gst.Pipeline() pipeline.add(self.pipeline) self.pipeline = pipeline self.webrtcbin = self.pipeline.get_by_name("sendrecv") if self.webrtcbin is None: raise exceptions.InternalError("Can't get the pipeline.") # If video or audio sources are not created, ``get_by_name`` will return None. self.video_src = self.pipeline.get_by_name("video_src") self.muted_src = self.pipeline.get_by_name("muted_src") self.video_selector = self.pipeline.get_by_name("video_selector") if self.video_src and isinstance(self.sources_data, SourcesPipeline): for name, value in self.sources_data.video_properties.items(): self.video_src.set_property(name, value) self.audio_src = self.pipeline.get_by_name("audio_src") if self.audio_src and isinstance(self.sources_data, SourcesPipeline): for name, value in self.sources_data.audio_properties.items(): self.audio_src.set_property(name, value) self.audio_valve = self.pipeline.get_by_name("audio_valve") if self.video_muted: self.on_video_mute(True) if self.audio_muted: self.on_audio_mute(True) # set STUN and TURN servers external_disco = data_format.deserialise( await self.bridge.external_disco_get("", self.profile), type_check=list ) for server in external_disco: if server["type"] == "stun": if server["transport"] == "tcp": log.info( "ignoring TCP STUN server, GStreamer only support one STUN server" ) url = f"stun://{server['host']}:{server['port']}" log.debug(f"adding stun server: {url}") self.webrtcbin.set_property("stun-server", url) elif server["type"] == "turn": url = "{scheme}://{username}:{password}@{host}:{port}".format( scheme="turns" if server["transport"] == "tcp" else "turn", username=quote_plus(server["username"]), password=quote_plus(server["password"]), host=server["host"], port=server["port"], ) log.debug(f"adding turn server: {url}") if not self.webrtcbin.emit("add-turn-server", url): log.warning(f"Erreur while adding TURN server {url}") # local video feedback if isinstance(self.sinks_data, SinksApp): local_video_sink = self.pipeline.get_by_name("local_video_sink") if local_video_sink is not None: local_video_sink.set_property("emit-signals", True) local_video_sink.connect("new-sample", self.sinks_data.local_video_cb) local_video_sink_caps = Gst.Caps.from_string(f"video/x-raw,format=RGB") local_video_sink.set_property("caps", local_video_sink_caps) # Create bus and associate signal watchers self.bus = self.pipeline.get_bus() if not self.bus: log.error("Failed to get bus from pipeline.") return self.bus.add_signal_watch() self.webrtcbin.connect("pad-added", self.on_pad_added) self.bus.connect("message::error", self.on_bus_error) self.bus.connect("message::eos", self.on_bus_eos) self.webrtcbin.connect("on-negotiation-needed", self.on_negotiation_needed) self.webrtcbin.connect("on-ice-candidate", self.on_ice_candidate) self.webrtcbin.connect( "notify::ice-gathering-state", self.on_ice_gathering_state_change ) self.webrtcbin.connect( "notify::ice-connection-state", self.on_ice_connection_state ) for dc_data in self.dc_data_list: self.create_data_channel(dc_data) def create_data_channel(self, dc_data: SourcesDataChannel | SinksDataChannel) -> None: """Create a Data Channel and connect relevant callbacks.""" assert self.pipeline is not None if isinstance(dc_data, SourcesDataChannel): # Data channel configuration for compatibility with browser defaults data_channel_options = Gst.Structure.new_empty("data-channel-options") data_channel_options.set_value("ordered", True) data_channel_options.set_value("protocol", "") # Create the data channel self.pipeline.set_state(Gst.State.READY) self.data_channels[dc_data.name] = data_channel = self.webrtcbin.emit( "create-data-channel", dc_data.name, data_channel_options ) if data_channel is None: log.error("Failed to create data channel") return data_channel.connect("on-open", dc_data.dc_open_cb) elif isinstance(dc_data, SinksDataChannel): self.webrtcbin.connect("on-data-channel", dc_data.dc_on_data_channel) else: raise ValueError("Only SourcesDataChannel or SinksDataChannel are allowed.") def start_pipeline(self) -> None: """Starts the GStreamer pipeline.""" log.debug("starting the pipeline") self.pipeline.set_state(Gst.State.PLAYING) def on_negotiation_needed(self, webrtc): """Initiate SDP offer when negotiation is needed.""" log.debug("Negotiation needed.") if self.role == "initiator": log.debug("Creating offer…") promise = Gst.Promise.new_with_change_func(self.on_offer_created) self.webrtcbin.emit("create-offer", None, promise) def on_offer_created(self, promise): """Callback for when SDP offer is created.""" log.info("on_offer_created called") assert promise.wait() == Gst.PromiseResult.REPLIED reply = promise.get_reply() if reply is None: log.error("Promise reply is None. Offer creation might have failed.") return offer = reply["offer"] self.offer = offer.sdp.as_text() log.info(f"SDP offer created: \n{self.offer}") self._set_media_types(self.offer) promise = Gst.Promise.new() self.webrtcbin.emit("set-local-description", offer, promise) promise.interrupt() self._a_call(self._start_call) def on_answer_set(self, promise): assert promise.wait() == Gst.PromiseResult.REPLIED def on_answer_created(self, promise, _, __): """Callback for when SDP answer is created.""" assert promise.wait() == Gst.PromiseResult.REPLIED reply = promise.get_reply() answer = reply["answer"] promise = Gst.Promise.new() self.webrtcbin.emit("set-local-description", answer, promise) promise.interrupt() answer_sdp = answer.sdp.as_text() log.info(f"SDP answer set: \n{answer_sdp}") self.sdp_set = True self._a_call(self.bridge.call_answer_sdp, self.sid, answer_sdp, self.profile) def on_offer_set(self, promise): assert promise.wait() == Gst.PromiseResult.REPLIED promise = Gst.Promise.new_with_change_func(self.on_answer_created, None, None) self.webrtcbin.emit("create-answer", None, promise) def link_element_or_pad( self, source: Gst.Element, dest: Gst.Element | Gst.Pad ) -> bool: """Check if dest is a pad or an element, and link appropriately""" src_pad = source.get_static_pad("src") if isinstance(dest, Gst.Pad): # If the dest is a pad, link directly if not src_pad.link(dest) == Gst.PadLinkReturn.OK: log.error( "Failed to link 'conv' to the compositor's newly requested pad!" ) return False elif isinstance(dest, Gst.Element): return source.link(dest) else: log.error(f"Unexpected type for dest: {type(dest)}") return False return True def scaled_dimensions( self, original_width: int, original_height: int, max_width: int, max_height: int ) -> tuple[int, int]: """Calculates the scaled dimensions preserving aspect ratio. @param original_width: Original width of the video stream. @param original_height: Original height of the video stream. @param max_width: Maximum desired width for the scaled video. @param max_height: Maximum desired height for the scaled video. @return: The width and height of the scaled video. """ aspect_ratio = original_width / original_height new_width = int(max_height * aspect_ratio) if new_width <= max_width: return new_width, max_height new_height = int(max_width / aspect_ratio) return max_width, new_height def on_remote_decodebin_stream(self, _, pad: Gst.Pad) -> None: """Handle the stream from the remote decodebin. This method processes the incoming stream from the remote decodebin, determining whether it's video or audio. It then sets up the appropriate GStreamer elements for video/audio processing and adds them to the pipeline. @param pad: The Gst.Pad from the remote decodebin producing the stream. """ assert self.pipeline is not None if not pad.has_current_caps(): log.error(f"{pad} has no caps, ignoring") return caps = pad.get_current_caps() assert len(caps) s = caps[0] name = s.get_name() log.debug(f"====> NAME START: {name}") q = Gst.ElementFactory.make("queue") if name.startswith("video"): log.debug("===> VIDEO OK") self._remote_video_pad = pad # Check and log the original size of the video width = self.target_width height = self.target_height log.info(f"Original video size: {width}x{height}") # This is a fix for an issue found with Movim on desktop: a non standard # resolution is used (990x557) resulting in bad alignement and no color in # rendered image adjust_resolution = width % 4 != 0 or height % 4 != 0 if adjust_resolution: log.info("non standard resolution, we need to adjust size") width = (width + 3) // 4 * 4 height = (height + 3) // 4 * 4 log.info(f"Adjusted video size: {width}x{height}") conv = Gst.ElementFactory.make("videoconvert") if self.merge_pip: # with ``merge_pip`` set, we plug the remote stream to the composer compositor = self.pipeline.get_by_name("compositor") sink1_pad = compositor.get_static_pad("sink_1") local_width, local_height = self.scaled_dimensions( sink1_pad.get_property("width"), sink1_pad.get_property("height"), width // 3, height // 3, ) sink1_pad.set_property("xpos", width - local_width) sink1_pad.set_property("ypos", height - local_height) sink1_pad.set_property("width", local_width) sink1_pad.set_property("height", local_height) sink1_pad.set_property("sizing-policy", 1) sink1_pad.set_property("zorder", 1) # Request a new pad for the remote stream sink_pad_template = compositor.get_pad_template("sink_%u") remote_video_sink = compositor.request_pad(sink_pad_template, None, None) remote_video_sink.set_property("zorder", 0) remote_video_sink.set_property("width", width) remote_video_sink.set_property("height", height) remote_video_sink.set_property("sizing-policy", 1) elif isinstance(self.sinks_data, SinksApp): # ``app`` sink without ``self.merge_pip`` set, be create the sink and # connect it to the ``remote_video_cb``. remote_video_sink = Gst.ElementFactory.make("appsink") remote_video_caps = Gst.Caps.from_string("video/x-raw,format=RGB") remote_video_sink.set_property("caps", remote_video_caps) remote_video_sink.set_property("emit-signals", True) remote_video_sink.set_property("drop", True) remote_video_sink.set_property("max-buffers", 1) remote_video_sink.set_property("sync", True) remote_video_sink.connect("new-sample", self.sinks_data.remote_video_cb) self.pipeline.add(remote_video_sink) elif isinstance(self.sinks_data, SinksAuto): # if ``self.merge_pip`` is not set, we create a dedicated # ``autovideosink`` for remote stream. remote_video_sink = Gst.ElementFactory.make("autovideosink") self.pipeline.add(remote_video_sink) else: raise exceptions.InternalError( f'Unhandled "sinks_data" value: {self.sinks_data!r}' ) if adjust_resolution: videoscale = Gst.ElementFactory.make("videoscale") adjusted_caps = Gst.Caps.from_string( f"video/x-raw,width={width},height={height}" ) capsfilter = Gst.ElementFactory.make("capsfilter") capsfilter.set_property("caps", adjusted_caps) self.pipeline.add(q, conv, videoscale, capsfilter) self.pipeline.sync_children_states() ret = pad.link(q.get_static_pad("sink")) if ret != Gst.PadLinkReturn.OK: log.error(f"Error linking pad: {ret}") q.link(conv) conv.link(videoscale) videoscale.link(capsfilter) self.link_element_or_pad(capsfilter.link, remote_video_sink) else: self.pipeline.add(q, conv) self.pipeline.sync_children_states() ret = pad.link(q.get_static_pad("sink")) if ret != Gst.PadLinkReturn.OK: log.error(f"Error linking pad: {ret}") q.link(conv) self.link_element_or_pad(conv, remote_video_sink) elif name.startswith("audio"): log.debug("===> Audio OK") conv = Gst.ElementFactory.make("audioconvert") resample = Gst.ElementFactory.make("audioresample") remote_audio_sink = Gst.ElementFactory.make("autoaudiosink") self.pipeline.add(q, conv, resample, remote_audio_sink) self.pipeline.sync_children_states() ret = pad.link(q.get_static_pad("sink")) if ret != Gst.PadLinkReturn.OK: log.error(f"Error linking pad: {ret}") q.link(conv) conv.link(resample) resample.link(remote_audio_sink) else: log.warning(f"unmanaged name: {name!r}") def on_pad_added(self, __, pad: Gst.Pad) -> None: """Handle the addition of a new pad to the element. When a new source pad is added to the element, this method creates a decodebin, connects it to handle the stream, and links the pad to the decodebin. @param __: Placeholder for the signal source. Not used in this method. @param pad: The newly added pad. """ log.debug("on_pad_added") if pad.direction != Gst.PadDirection.SRC: return decodebin = Gst.ElementFactory.make("decodebin") decodebin.connect("pad-added", self.on_remote_decodebin_stream) self.pipeline.add(decodebin) decodebin.sync_state_with_parent() pad.link(decodebin.get_static_pad("sink")) async def _call_start(self, callee: jid.JID, call_data: dict, profile: str) -> str: return await self.bridge.call_start( str(self.callee), data_format.serialise({"sdp": self.offer}), self.profile ) async def _start_call(self) -> None: """Initiate the call. Initiates a call with the callee using the stored offer. If there are any buffered local ICE candidates, they are sent as part of the initiation. """ assert self.callee assert self.call_start_cb is not None self.sid = await self.call_start_cb( self.callee, {"sdp": self.offer}, self.profile ) if self.local_candidates_buffer: log.debug( f"sending buffered local ICE candidates: {self.local_candidates_buffer}" ) if not self.pwd: sdp = self.webrtcbin.props.local_description.sdp.as_text() self.extract_ufrag_pwd(sdp) ice_data = {} for media_type, candidates in self.local_candidates_buffer.items(): ice_data[media_type] = { "ufrag": self.ufrag[media_type], "pwd": self.pwd[media_type], "candidates": candidates, } await self.bridge.ice_candidates_add( self.sid, data_format.serialise(ice_data), self.profile ) self.local_candidates_buffer.clear() def _remote_sdp_set(self, promise) -> None: assert promise.wait() == Gst.PromiseResult.REPLIED self.sdp_set = True def on_accepted_call(self, sdp: str, profile: str) -> None: """Outgoing call has been accepted. @param sdp: The SDP answer string received from the other party. @param profile: Profile used for the call. """ log.debug(f"SDP answer received: \n{sdp}") __, sdpmsg = GstSdp.SDPMessage.new_from_text(sdp) answer = GstWebRTC.WebRTCSessionDescription.new( GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg ) promise = Gst.Promise.new_with_change_func(self._remote_sdp_set) self.webrtcbin.emit("set-remote-description", answer, promise) async def answer_call(self, sdp: str, profile: str) -> None: """Answer an incoming call @param sdp: The SDP offer string received from the initiator. @param profile: Profile used for the call. @raise AssertionError: Raised when either "VP8" or "OPUS" is not present in payload types. """ log.debug(f"SDP offer received: \n{sdp}") self._set_media_types(sdp) __, offer_sdp_msg = GstSdp.SDPMessage.new_from_text(sdp) payload_types = self.get_payload_types( offer_sdp_msg, video_encoding="VP8", audio_encoding="OPUS" ) assert "VP8" in payload_types assert "OPUS" in payload_types try: await self.setup_call( "responder", audio_pt=payload_types["OPUS"], video_pt=payload_types["VP8"] ) except Exception: log.exception("Can't setup call") raise self.start_pipeline() offer = GstWebRTC.WebRTCSessionDescription.new( GstWebRTC.WebRTCSDPType.OFFER, offer_sdp_msg ) promise = Gst.Promise.new_with_change_func(self.on_offer_set) self.webrtcbin.emit("set-remote-description", offer, promise) def on_ice_candidate(self, webrtc, mline_index, candidate_sdp): """Handles the on-ice-candidate signal of webrtcbin. @param webrtc: The webrtcbin element. @param mlineindex: The mline index. @param candidate: The ICE candidate. """ log.debug( f"Local ICE candidate. MLine Index: {mline_index}, Candidate: {candidate_sdp}" ) parsed_candidate = self.parse_ice_candidate(candidate_sdp) if parsed_candidate is None: log.warning(f"Can't parse candidate: {candidate_sdp}") return try: media_type = self.media_types[mline_index] except KeyError: raise exceptions.InternalError("can't find media type") if self.sid is None: log.debug("buffering local ICE candidate") self.local_candidates_buffer.setdefault(media_type, []).append( parsed_candidate ) else: sdp = self.webrtcbin.props.local_description.sdp.as_text() assert sdp is not None self.extract_ufrag_pwd(sdp) ice_data = { "ufrag": self.ufrag[media_type], "pwd": self.pwd[media_type], "candidates": [parsed_candidate], } self._a_call( self.bridge.ice_candidates_add, self.sid, data_format.serialise({media_type: ice_data}), self.profile, ) def on_ice_candidates_new(self, candidates: dict) -> None: """Handle new ICE candidates. @param candidates: A dictionary containing media types ("audio" or "video") as keys and corresponding ICE data as values. @raise exceptions.InternalError: Raised when sdp mline index is not found. """ if not self.sdp_set: log.debug("buffering remote ICE candidate") for media_type in ("audio", "video"): media_candidates = candidates.get(media_type) if media_candidates: buffer = self.remote_candidates_buffer[media_type] buffer["candidates"].extend(media_candidates["candidates"]) return for media_type, ice_data in candidates.items(): for candidate in ice_data["candidates"]: candidate_sdp = self.build_ice_candidate(candidate) try: mline_index = self.get_sdp_mline_index(media_type) except Exception as e: raise exceptions.InternalError(f"Can't find sdp mline index: {e}") self.webrtcbin.emit("add-ice-candidate", mline_index, candidate_sdp) log.warning( f"Remote ICE candidate added. MLine Index: {mline_index}, " f"{candidate_sdp}" ) def on_ice_gathering_state_change(self, pspec, __): state = self.webrtcbin.get_property("ice-gathering-state") log.debug(f"ICE gathering state changed to {state}") def on_ice_connection_state(self, pspec, __): state = self.webrtcbin.props.ice_connection_state if state == GstWebRTC.WebRTCICEConnectionState.FAILED: log.error("ICE connection failed") log.info(f"ICE connection state changed to {state}") def on_bus_error(self, bus: Gst.Bus, message: Gst.Message) -> None: """Handles the GStreamer bus error messages. @param bus: The GStreamer bus. @param message: The error message. """ err, debug = message.parse_error() log.error(f"Error from {message.src.get_name()}: {err.message}") log.error(f"Debugging info: {debug}") def on_bus_eos(self, bus: Gst.Bus, message: Gst.Message) -> None: """Handles the GStreamer bus eos messages. @param bus: The GStreamer bus. @param message: The eos message. """ log.info("End of stream") def on_audio_mute(self, muted: bool) -> None: """Handles (un)muting of audio. @param muted: True if audio is muted. """ if self.audio_valve is not None: self.audio_valve.set_property("drop", muted) state = "muted" if muted else "unmuted" log.info(f"audio is now {state}") def on_video_mute(self, muted: bool) -> None: """Handles (un)muting of video. @param muted: True if video is muted. """ if self.video_selector is not None: current_source = ( None if muted else "desktop" if self.desktop_sharing else "video" ) self.switch_video_source(current_source) state = "muted" if muted else "unmuted" log.info(f"Video is now {state}") def on_desktop_switch(self, desktop_active: bool) -> None: """Switches the video source between desktop and video. @param desktop_active: True if desktop must be active. False for video. """ if desktop_active and self.desktop_portal is not None: aio.run_async(self.on_desktop_switch_portal(desktop_active)) else: self.do_desktop_switch(desktop_active) async def on_desktop_switch_portal(self, desktop_active: bool) -> None: """Call freedesktop screenshare portal and the activate the shared stream""" assert self.desktop_portal is not None try: screenshare_data = await self.desktop_portal.request_screenshare() except exceptions.CancelError: self.desktop_sharing = False return self.desktop_sharing_data = {"path": str(screenshare_data["node_id"])} self.do_desktop_switch(desktop_active) def on_portal_session_closed(self) -> None: self.desktop_sharing = False def do_desktop_switch(self, desktop_active: bool) -> None: if self.video_muted: # Update the active source state but do not switch self.desktop_sharing = desktop_active return source = "desktop" if desktop_active else "video" self.switch_video_source(source) self.desktop_sharing = desktop_active def switch_video_source(self, source: str | None) -> None: """Activates the specified source while deactivating the others. @param source: 'desktop', 'video', 'muted' or None for muted source. """ if source is None: source = "muted" if source not in ["video", "muted", "desktop"]: raise ValueError( f"Invalid source: {source!r}, use one of {'video', 'muted', 'desktop'}" ) self.pipeline.set_state(Gst.State.PAUSED) # Create a new desktop source if necessary if source == "desktop": self._setup_desktop_source(self.desktop_sharing_data) # Activate the chosen source and deactivate the others for src_name in ["video", "muted", "desktop"]: src_element = self.pipeline.get_by_name(f"{src_name}_src") if src_name == source: if src_element: src_element.set_state(Gst.State.PLAYING) else: if src_element: if src_name == "desktop": self._remove_desktop_source(src_element) else: src_element.set_state(Gst.State.NULL) # Set the video_selector active pad if source == "desktop": if self.desktop_sink_pad: pad = self.desktop_sink_pad else: log.error(f"No desktop pad available") pad = None else: pad_name = f"sink_{['video', 'muted'].index(source)}" pad = self.video_selector.get_static_pad(pad_name) if pad is not None: self.video_selector.props.active_pad = pad self.pipeline.set_state(Gst.State.PLAYING) def _setup_desktop_source(self, properties: dict[str, object] | None) -> None: """Set up a new desktop source. @param properties: The properties to set on the desktop source. """ source_elt = "ximagesrc" if self.desktop_portal is None else "pipewiresrc" desktop_src = Gst.ElementFactory.make(source_elt, "desktop_src") if properties is None: properties = {} for key, value in properties.items(): log.debug(f"setting {source_elt} property: {key!r}={value!r}") desktop_src.set_property(key, value) video_convert = Gst.ElementFactory.make("videoconvert", "desktop_videoconvert") queue = Gst.ElementFactory.make("queue", "desktop_queue") queue.set_property("leaky", "downstream") self.pipeline.add(desktop_src) self.pipeline.add(video_convert) self.pipeline.add(queue) desktop_src.link(video_convert) video_convert.link(queue) sink_pad_template = self.video_selector.get_pad_template("sink_%u") self.desktop_sink_pad = self.video_selector.request_pad( sink_pad_template, None, None ) queue_src_pad = queue.get_static_pad("src") queue_src_pad.link(self.desktop_sink_pad) desktop_src.sync_state_with_parent() video_convert.sync_state_with_parent() queue.sync_state_with_parent() def _remove_desktop_source(self, desktop_src: Gst.Element) -> None: """Remove the desktop source from the pipeline. @param desktop_src: The desktop source to remove. """ # Remove elements for the desktop source video_convert = self.pipeline.get_by_name("desktop_videoconvert") queue = self.pipeline.get_by_name("desktop_queue") if video_convert: video_convert.set_state(Gst.State.NULL) desktop_src.unlink(video_convert) self.pipeline.remove(video_convert) if queue: queue.set_state(Gst.State.NULL) self.pipeline.remove(queue) desktop_src.set_state(Gst.State.NULL) self.pipeline.remove(desktop_src) # Release the pad associated with the desktop source if self.desktop_sink_pad: self.video_selector.release_request_pad(self.desktop_sink_pad) self.desktop_sink_pad = None if self.desktop_portal is not None: self.desktop_portal.end_session() async def end_call(self) -> None: """Stop streaming and clean instance""" self.reset_instance() class WebRTCCall: """Helper class to create and handle WebRTC. This class handles signals and communication of connection data with backend. """ def __init__( self, bridge, profile: str, callee: jid.JID, on_call_setup_cb: Callable | None = None, on_call_ended_cb: Callable | None = None, **kwargs, ): """Create and setup a webRTC instance @param bridge: async Bridge. @param profile: profile making or receiving the call @param callee: peer jid @param kwargs: extra kw args to use when instantiating WebRTC """ self.profile = profile self.webrtc = WebRTC(bridge, profile, **kwargs) self.webrtc.callee = callee self.on_call_setup_cb = on_call_setup_cb self.on_call_ended_cb = on_call_ended_cb bridge.register_signal("ice_candidates_new", self.on_ice_candidates_new, "plugin") bridge.register_signal("call_setup", self.on_call_setup, "plugin") bridge.register_signal("call_ended", self.on_call_ended, "plugin") @classmethod async def make_webrtc_call( cls, bridge, profile: str, call_data: CallData, **kwargs ) -> "WebRTCCall": """Create the webrtc_call instance @param call_data: Call data of the command @param kwargs: extra args used to instanciate WebRTCCall """ webrtc_call = cls(bridge, profile, call_data.callee, **call_data.kwargs, **kwargs) if call_data.sid is None: # we are making the call await webrtc_call.start() else: # we are receiving the call webrtc_call.sid = call_data.sid if call_data.action_id is not None: await bridge.action_launch( call_data.action_id, data_format.serialise({"cancelled": False}), profile, ) return webrtc_call @property def sid(self) -> str | None: return self.webrtc.sid @sid.setter def sid(self, new_sid: str | None) -> None: self.webrtc.sid = new_sid async def on_ice_candidates_new( self, sid: str, candidates_s: str, profile: str ) -> None: if sid != self.webrtc.sid or profile != self.profile: return self.webrtc.on_ice_candidates_new( data_format.deserialise(candidates_s), ) async def on_call_setup(self, sid: str, setup_data_s: str, profile: str) -> None: if sid != self.webrtc.sid or profile != self.profile: return setup_data = data_format.deserialise(setup_data_s) try: role = setup_data["role"] sdp = setup_data["sdp"] except KeyError: log.error(f"Invalid setup data received: {setup_data}") return if role == "initiator": self.webrtc.on_accepted_call(sdp, profile) elif role == "responder": await self.webrtc.answer_call(sdp, profile) else: log.error(f"Invalid role received during setup: {setup_data}") if self.on_call_setup_cb is not None: await aio.maybe_async(self.on_call_setup_cb(sid, profile)) async def on_call_ended(self, sid: str, data_s: str, profile: str) -> None: if sid != self.webrtc.sid or profile != self.profile: return await self.webrtc.end_call() if self.on_call_ended_cb is not None: await aio.maybe_async(self.on_call_ended_cb(sid, profile)) async def start(self): """Start a call. To be used only if we are initiator """ try: await self.webrtc.setup_call("initiator") except Exception: log.exception("Can't setup call") raise self.webrtc.start_pipeline()