Mercurial > libervia-desktop-kivy
view libervia/desktop_kivy/plugins/plugin_wid_calls.py @ 510:97ab236e8f20
plugin calls: implement desktop sharing:
rel 433
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 16 Jan 2024 10:41:38 +0100 |
parents | f0ce49b360c8 |
children | 644a8d165e5a |
line wrap: on
line source
from dataclasses import dataclass from pathlib import Path from typing import Optional, Callable from functools import partial # from gi.repository import GLib from gi.repository import GObject, Gst, GstWebRTC, GstSdp from kivy.metrics import dp try: from gi.overrides import Gst as _ except ImportError: print( "no GStreamer python overrides available, please install relevant pacakges on " "your system." ) from kivy.clock import Clock from kivy.core.audio import Sound, SoundLoader from kivy.core.window import Window from kivy.graphics.texture import Texture from kivy.properties import ( BooleanProperty, ColorProperty, NumericProperty, ObjectProperty, ReferenceListProperty, ) from kivy.support import install_gobject_iteration from kivy.uix.button import Button from kivy.uix.image import Image from kivy.uix.popup import Popup from kivy.uix.screenmanager import Screen from kivy.uix.widget import Widget from libervia.backend.core.constants import Const as C from libervia.backend.core import log as logging from libervia.backend.core.i18n import _ from libervia.backend.core import exceptions from libervia.backend.tools.common import data_format from libervia.frontends.quick_frontend import quick_widgets from libervia.frontends.tools import aio, display_servers, jid, webrtc from libervia.desktop_kivy import G from ..core import cagou_widget from ..core import common from ..core.behaviors import FilterBehavior log = logging.getLogger(__name__) install_gobject_iteration() Gst.init(None) PLUGIN_INFO = { "name": _("calls"), "main": "Calls", "description": _("Audio/Video calls"), "icon_symbol": "phone", } @dataclass class TextureData: texture: Optional[Texture] = None size: Optional[tuple[int, int]] = None class SearchScreen(Screen): pass class InCallScreen(Screen): pass class CallButton(Button): parent_widget = ObjectProperty(None) class CallControlButton(common.SymbolButton): active = BooleanProperty(True) background_color = ColorProperty() margin_x = NumericProperty(0) margin_y = NumericProperty(0) margin = ReferenceListProperty(margin_x, margin_y) class VideoStreamWidget(Image): pass class WebRTC: """WebRTC implementation for audio and video communication. This class encapsulates the WebRTC functionalities required for initiating and handling audio and video calls. @attribute test_mode: A flag to indicate whether the WebRTC instance is in test mode. If true, test video and audio sources will be used. Otherwise first webcam and microphone available will be used. """ test_mode: bool = False PROXIED_PROPERTIES = { 'audio_muted', 'callee', 'desktop_sharing', 'sid', 'video_muted', 'desktop_sharing_data' } PROXIED_METHODS = {'answer_call', 'end_call', 'on_accepted_call', 'on_ice_candidates_new', 'setup_call', 'start_pipeline'} def __init__(self, parent_calls: "Calls", profile: str) -> None: self.parent_calls = parent_calls self.profile = profile self.webrtc = webrtc.WebRTC( G.host.a_bridge, profile, sinks=webrtc.SINKS_TEST if self.test_mode else webrtc.SINKS_APP, appsink_data=webrtc.AppSinkData( local_video_cb=partial( self.on_new_sample, update_sample_method=self.update_sample, video_widget=self.parent_calls.local_video ), remote_video_cb=partial( self.on_new_sample, update_sample_method=self.update_sample, video_widget=self.parent_calls.remote_video ) ), reset_cb=self.on_reset ) self.pipeline = None def __getattr__(self, name): if name in self.PROXIED_PROPERTIES or name in self.PROXIED_METHODS: return getattr(self.webrtc, name) raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'") def __setattr__(self, name, value): if name in self.PROXIED_PROPERTIES: setattr(self.webrtc, name, value) else: super().__setattr__(name, value) def on_reset(self): self.texture_map: dict[VideoStreamWidget, TextureData] = {} def on_new_sample( self, video_sink: Gst.Element, update_sample_method: Callable, video_widget: VideoStreamWidget, ) -> bool: """Handles a new sample from the video sink. @param video_sink: The video sink to pull samples from. @param update_sample_method: The method to call for updating the sample. @param video_widget: The video widget (either local_video or remote_video). @return: Always False """ sample = video_sink.emit("pull-sample") if sample is None: return False try: texture_data = self.texture_map[video_widget] except KeyError: texture_data = self.texture_map[video_widget] = TextureData() # Get the current video size video_pad = video_sink.get_static_pad("sink") assert video_pad is not None s = video_pad.get_current_caps().get_structure(0) stream_size = (s.get_value("width"), s.get_value("height")) # Compare with the texture size texture_size = texture_data.size if texture_size != stream_size: log.debug(f"sample size update: {texture_size} => {stream_size}") texture_data.size = stream_size # texture needs to be recreated, so we reset the one in texture_data texture_data.texture = None Clock.schedule_once( partial( update_sample_method, sample=sample, video_widget=video_widget, ) ) return False def update_sample( self, dt: float, sample: Optional[Gst.Sample], video_widget: VideoStreamWidget, ) -> None: """Updates the video sample. This method runs in the main thread. @param dt: Time delta since the last call. This is passed by Clock.schedule_once. @param sample: The video sample to update. @param texture_id: identifier of the texture data (e.g. "local" or "remote") @param texture: The texture object. @param texture_size: The texture size. @param video_widget: The video widget. """ if sample is None: return try: texture_data = self.texture_map[video_widget] except KeyError: log.warning(f"no texture data found for {video_widget}") return if texture_data.texture is None and texture_data.size is not None: log.debug(f"===> creating texture for {video_widget}: {texture_data.size=}") texture = Texture.create(size=texture_data.size, colorfmt="rgb") assert texture is not None texture_data.texture = texture texture.flip_vertical() video_widget.texture = texture mapinfo = None buf = None try: buf = sample.get_buffer() _, mapinfo = buf.map(Gst.MapFlags.READ) buffer = mapinfo.data.tobytes() if texture_data.texture is None: log.debug("can't copy the buffer, texture is None") return texture_data.texture.blit_buffer(buffer, colorfmt="rgb") assert video_widget.canvas is not None video_widget.canvas.ask_update() buf.unmap(mapinfo) finally: if buf is not None and mapinfo is not None: buf.unmap(mapinfo) class WindowSelectButton(Button): pass class DesktopScreenDialog(Popup): windows_buttons = ObjectProperty() def __init__(self, calls: "Calls", **kwargs): super().__init__(**kwargs) self.calls = calls if display_servers.detect() == display_servers.X11: windows_data = display_servers.x11_list_windows() for window_data in windows_data: log.debug(f"- X11 windows found: {window_data}") button = WindowSelectButton( text = window_data["title"], on_press=partial(self.on_window_selected, xid=window_data["id"]) ) self.windows_buttons.add_widget(button) def on_window_selected(self, instance, xid: str) -> None: if xid is not None: self.calls.webrtc.desktop_sharing_data = {"xid": xid} else: self.calls.webrtc.desktop_sharing_data = None self.calls.desktop_sharing = True self.dismiss() class Calls( quick_widgets.QuickWidget, cagou_widget.LiberviaDesktopKivyWidget, FilterBehavior ): audio_muted = BooleanProperty(False) call_layout = ObjectProperty() call_screen = ObjectProperty() fullscreen = BooleanProperty(False) in_call = BooleanProperty(False) incoming_call_dialog: dict[str, Widget] = {} jid_selector = ObjectProperty() local_video = ObjectProperty() remote_video = ObjectProperty() ringtone: Sound | None = None screen_manager = ObjectProperty() signals_registered = False use_header_input = True video_muted = BooleanProperty(False) desktop_sharing = BooleanProperty(False) def __init__(self, host, target, profiles): quick_widgets.QuickWidget.__init__(self, G.host, target, profiles) cagou_widget.LiberviaDesktopKivyWidget.__init__(self) call_btn = CallButton( parent_widget=self, on_press=lambda *__: aio.run_async(self.toggle_call()) ) self.header_input_add_extra(call_btn) self.webrtc = WebRTC(self, self.profile) self.previous_fullscreen = None self.reset_instance() @property def sid(self): return self.webrtc.sid def hang_up(self): if self.sid is not None: aio.run_async(self.toggle_call()) async def toggle_call(self): """Toggle between making a call and hanging up. This function will initiate or terminate a call based on the call state. """ if self.sid is None: # Initiate the call log.info("initiating call") callee = jid.JID(self.header_input.text.strip()) if not callee: return if not callee.is_valid(): G.host.add_note( _("Calls"), _("Can't make a call: invalid destinee {}").format(repr(callee)), level=C.XMLUI_DATA_LVL_ERROR ) return self.webrtc.callee = callee await self.webrtc.setup_call("initiator") self.webrtc.start_pipeline() self.in_call = True else: # Terminate the call sid = self.sid await self.end_call({"reason": "terminated"}, self.profile) await G.host.a_bridge.call_end(sid, "", self.profile) self.in_call = False async def on_incoming_call( self, action_data: dict, action_id: str, profile: str ) -> None: """Handle incoming call notifications. @param action_data: A dictionary containing data related to the incoming call @param action_id: The ID corresponding to the incoming call action. @param profile: The profile associated with the incoming call. """ if self.sid is not None: # FIXME: show a double remote call notification log.warning("Ignoring new remote call as we are already in a call.") return sid = action_data["session_id"] self.in_call = True self.webrtc.sid = sid # FIXME: we accept all remote calls for now, will be changed when proper UI/UX # will be implemented log.warning("auto-accepting remote call") await G.host.a_bridge.action_launch( action_id, data_format.serialise({"cancelled": False}), profile ) async def on_call_setup(self, setup_data: dict, profile: str) -> None: """Call has been accepted, connection can be established @param session_id: Session identifier @param setup_data: Data with following keys: role: initiator or responser sdp: Session Description Protocol data @param profile: Profile associated """ try: role = setup_data["role"] sdp = setup_data["sdp"] except KeyError: log.error(f"Invalid setup data received: {setup_data}") return if role == "initiator": self.webrtc.on_accepted_call(sdp, profile) elif role == "responder": await self.webrtc.answer_call(sdp, profile) else: log.error(f"Invalid role received during setup: {setup_data}") def reset_instance(self) -> None: self.in_call = False self.local_video.texture = None self.remote_video.texture = None async def end_call(self, data: dict, profile: str) -> None: """End current call if any and reset the instance @param data: end call data, often includes the reason of the call ending. """ await self.webrtc.end_call() self.reset_instance() def on_in_call(self, instance, in_call: bool) -> None: if in_call: self.screen_manager.transition.direction = "up" self.screen_manager.current = "call" else: self.fullscreen = False self.screen_manager.transition.direction = "down" self.screen_manager.current = "search" def on_audio_muted(self, instance, muted: bool) -> None: self.webrtc.audio_muted = muted def on_video_muted(self, instance, muted: bool) -> None: self.webrtc.video_muted = muted def on_desktop_btn_press(self) -> None: if self.desktop_sharing: self.desktop_sharing = False else: DesktopScreenDialog(self).open() def on_desktop_sharing(self, instance, active: bool) -> None: self.webrtc.desktop_sharing = active def on_fullscreen(self, instance, fullscreen: bool) -> None: if fullscreen: G.host.app.show_head_widget(False, animation=False) self.call_layout.parent.remove_widget(self.call_layout) G.host.show_extra_ui(self.call_layout) self.previous_fullscreen = Window.fullscreen Window.fullscreen = "auto" else: G.host.app.show_head_widget(True, animation=False) G.host.close_ui() self.call_screen.add_widget(self.call_layout) Window.fullscreen = self.previous_fullscreen or False def on_header_wid_input(self): aio.run_async(self.toggle_call()) def on_header_wid_input_complete(self, wid, text, **kwargs): """we filter items when text is entered in input box""" for layout in self.jid_selector.items_layouts: self.do_filter( layout, text, # we append nick to jid to filter on both lambda c: c.jid + c.data.get('nick', ''), width_cb=lambda c: c.base_width, height_cb=lambda c: c.minimum_height, continue_tests=[lambda c: not isinstance(c, common.ContactButton)]) def on_jid_select(self, contact_button): self.header_input.text = contact_button.jid aio.run_async(self.toggle_call()) @classmethod def ice_candidates_new_handler( cls, sid: str, candidates_s: str, profile: str ) -> None: for wid in G.host.get_visible_list(cls): if profile not in wid.profiles or sid != wid.sid: continue wid.webrtc.on_ice_candidates_new( data_format.deserialise(candidates_s), ) @classmethod def call_setup_handler(cls, sid: str, setup_data_s: str, profile: str) -> None: for wid in G.host.get_visible_list(cls): if profile not in wid.profiles or sid != wid.sid: continue aio.run_async( wid.on_call_setup(data_format.deserialise(setup_data_s), profile) ) @classmethod def call_ended_handler(cls, sid: str, data_s: str, profile: str) -> None: if sid in cls.incoming_call_dialog: dialog_wid = cls.incoming_call_dialog.pop(sid) G.host.del_notif_widget(dialog_wid) G.host.add_note(_("Call cancelled"), _("The call has been cancelled.")) for wid in G.host.get_visible_list(cls): if profile not in wid.profiles or sid != wid.sid: continue aio.run_async(wid.end_call(data_format.deserialise(data_s), profile)) @classmethod def action_new_handler( cls, action_data: dict, action_id: str, security_limit: int, profile: str ) -> None: if profile in G.host.profiles: if cls.ringtone is None: cls.ringtone = SoundLoader.load( str(Path(G.host.media_dir) / "sounds/notifications/ring_1.mp3") ) if cls.ringtone is not None: cls.ringtone.play() peer_jid = jid.JID(action_data["from_jid"]).bare sid = action_data["session_id"] notif_body = f"{peer_jid} is calling you." notif_title = "Incoming call" G.host.desktop_notif(notif_body, title=notif_title, duration=10) def on_call_answer(accepted, __): del cls.incoming_call_dialog[sid] if cls.ringtone is not None: cls.ringtone.stop() if accepted: wid = G.host.do_action("calls", str(peer_jid), [profile]) aio.run_async(wid.on_incoming_call(action_data, action_id, profile)) else: aio.run_async( G.host.a_bridge.action_launch( action_id, data_format.serialise({"cancelled": True}), profile ) ) dialog_wid = G.host.show_dialog( notif_body, notif_title, type="yes/no", answer_cb=on_call_answer ) cls.incoming_call_dialog[sid] = dialog_wid if G.host is not None: log.debug("registering signals") G.host.register_signal( "ice_candidates_new", handler=Calls.ice_candidates_new_handler, iface="plugin", ) G.host.register_signal("call_setup", handler=Calls.call_setup_handler, iface="plugin") G.host.register_signal("call_ended", handler=Calls.call_ended_handler, iface="plugin") G.host.register_action_handler(C.META_TYPE_CALL, Calls.action_new_handler)