view libervia/desktop_kivy/plugins/plugin_wid_calls.py @ 509:f0ce49b360c8

calls: move webrtc code to core: WebRTC code which can be used in several frontends has been factorized and moved to common `frontends.tools`. Tests have been updated accordingly. rel 426
author Goffi <goffi@goffi.org>
date Wed, 01 Nov 2023 13:41:07 +0100
parents 0480f883f0a6
children 97ab236e8f20
line wrap: on
line source

from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Callable
from functools import partial

# from gi.repository import GLib
from gi.repository import GObject, Gst, GstWebRTC, GstSdp

try:
    # Imported only to check that the GStreamer Python overrides can be loaded; the
    # ``_`` name bound here is rebound later by the i18n import of this module.
    from gi.overrides import Gst as _
except ImportError:
    print(
        "no GStreamer python overrides available, please install relevant packages on "
        "your system."
    )
from kivy.clock import Clock
from kivy.core.audio import Sound, SoundLoader
from kivy.core.window import Window
from kivy.graphics.texture import Texture
from kivy.properties import (
    BooleanProperty,
    ColorProperty,
    NumericProperty,
    ObjectProperty,
    ReferenceListProperty,
)
from kivy.support import install_gobject_iteration
from kivy.uix.button import Button
from kivy.uix.image import Image
from kivy.uix.screenmanager import Screen
from kivy.uix.widget import Widget
from libervia.backend.core.constants import Const as C
from libervia.backend.core import log as logging
from libervia.backend.core.i18n import _
from libervia.backend.core import exceptions
from libervia.backend.tools.common import data_format
from libervia.frontends.quick_frontend import quick_widgets
from libervia.frontends.tools import aio, jid, webrtc

from libervia.desktop_kivy import G

from ..core import cagou_widget
from ..core import common
from ..core.behaviors import FilterBehavior

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Helper imported from ``kivy.support``.
# NOTE(review): presumably lets the GObject/GLib main loop iterate from within Kivy's
# own loop so that GStreamer callbacks get dispatched — TODO confirm.
install_gobject_iteration()

# Initialise GStreamer at import time, without passing any command-line argument.
Gst.init(None)


# Plugin metadata. "main" holds the name of the widget class defined in this module
# (``Calls``); "name" and "description" go through the i18n function ``_``.
# NOTE(review): presumably read by the plugin loader of the desktop frontend —
# TODO confirm.
PLUGIN_INFO = {
    "name": _("calls"),
    "main": "Calls",
    "description": _("Audio/Video calls"),
    "icon_symbol": "phone",
}


@dataclass
class TextureData:
    """Texture state kept for one video widget (see ``WebRTC.texture_map``)."""

    # Texture the frames are copied into; reset to None by ``WebRTC.on_new_sample``
    # when the stream size changes, and (re)created by ``WebRTC.update_sample``.
    texture: Optional[Texture] = None
    # (width, height) of the last stream seen for the widget, None until a first
    # sample has been received.
    size: Optional[tuple[int, int]] = None


class SearchScreen(Screen):
    """Kivy ``Screen`` subclass without any Python-side behaviour.

    NOTE(review): presumably the screen named "search" that ``Calls.on_in_call``
    switches to when no call is running, with its layout defined in a kv file —
    TODO confirm.
    """
    pass


class InCallScreen(Screen):
    """Kivy ``Screen`` subclass without any Python-side behaviour.

    NOTE(review): presumably the screen named "call" that ``Calls.on_in_call``
    switches to while a call is running, with its layout defined in a kv file —
    TODO confirm.
    """
    pass


class CallButton(Button):
    """Button added to the header input by ``Calls.__init__`` to toggle a call."""

    # Widget owning the button; ``Calls.__init__`` passes the ``Calls`` instance.
    parent_widget = ObjectProperty(None)


class CallControlButton(common.SymbolButton):
    """Symbol button exposing extra Kivy properties.

    NOTE(review): presumably used by the in-call controls (mute, hang up, ...) defined
    in a kv file — TODO confirm.
    """

    # Boolean state of the button, True by default.
    active = BooleanProperty(True)
    # Colour property, left at the ``ColorProperty`` default.
    background_color = ColorProperty()
    # Horizontal and vertical margins, both 0 by default.
    margin_x = NumericProperty(0)
    margin_y = NumericProperty(0)
    # (margin_x, margin_y) pair grouped into a single property.
    margin = ReferenceListProperty(margin_x, margin_y)


class VideoStreamWidget(Image):
    """Kivy ``Image`` subclass used to display a video stream.

    Instances are the keys of ``WebRTC.texture_map`` and get their ``texture`` set by
    ``WebRTC.update_sample``.
    """
    pass


class WebRTC:
    """WebRTC implementation for audio and video communication.

    This class encapsulates the WebRTC functionalities required for initiating and
    handling audio and video calls.

    @attribute test_mode: A flag to indicate whether the WebRTC instance is in test mode.
        If true, test video and audio sources will be used. Otherwise first webcam and
        microphone available will be used.
    """

    test_mode: bool = False

    PROXIED_PROPERTIES = {'audio_muted', 'callee', 'sid', 'video_muted'}
    PROXIED_METHODS = {'answer_call', 'end_call', 'on_accepted_call', 'on_ice_candidates_new', 'setup_call', 'start_pipeline'}

    def __init__(self, parent_calls: "Calls", profile: str) -> None:
        """Create the wrapper and the underlying ``webrtc.WebRTC`` object.

        @param parent_calls: widget owning this instance; its ``local_video`` and
            ``remote_video`` widgets are bound to the video sample callbacks.
        @param profile: profile given to the underlying ``webrtc.WebRTC`` object.
        """
        self.parent_calls = parent_calls
        self.profile = profile
        # Create the texture map up front: ``on_reset`` is the only other place
        # assigning it, so without this the attribute would not exist (and any access
        # would end in ``__getattr__`` raising AttributeError) until the reset callback
        # has been invoked at least once.
        self.texture_map: dict[VideoStreamWidget, TextureData] = {}
        self.webrtc = webrtc.WebRTC(
            G.host.a_bridge,
            profile,
            sinks=webrtc.SINKS_TEST if self.test_mode else webrtc.SINKS_APP,
            appsink_data=webrtc.AppSinkData(
                local_video_cb=partial(
                    self.on_new_sample,
                    update_sample_method=self.update_sample,
                    video_widget=self.parent_calls.local_video
                ),
                remote_video_cb=partial(
                    self.on_new_sample,
                    update_sample_method=self.update_sample,
                    video_widget=self.parent_calls.remote_video
                )
            ),
            reset_cb=self.on_reset

        )
        self.pipeline = None

    def __getattr__(self, name):
        """Forward proxied properties and methods to the wrapped ``self.webrtc``.

        Any name which is neither in ``PROXIED_PROPERTIES`` nor in ``PROXIED_METHODS``
        raises the usual AttributeError.
        """
        is_proxied = name in self.PROXIED_PROPERTIES or name in self.PROXIED_METHODS
        if not is_proxied:
            raise AttributeError(
                f"'{type(self).__name__}' object has no attribute '{name}'"
            )
        return getattr(self.webrtc, name)

    def __setattr__(self, name, value):
        """Set proxied properties on ``self.webrtc``, anything else on this instance."""
        if name not in self.PROXIED_PROPERTIES:
            super().__setattr__(name, value)
            return
        setattr(self.webrtc, name, value)

    def on_reset(self):
        """Forget all texture data by replacing ``texture_map`` with an empty dict.

        Passed as ``reset_cb`` to ``webrtc.WebRTC`` in ``__init__``; when it is invoked
        is decided by that class, which is not visible from this module.
        """
        self.texture_map: dict[VideoStreamWidget, TextureData] = {}

    def on_new_sample(
        self,
        video_sink: Gst.Element,
        update_sample_method: Callable,
        video_widget: VideoStreamWidget,
    ) -> bool:
        """Pull a sample from the video sink and schedule its display.

        The size of the stream is read from the current caps of the sink pad; when it
        differs from the size stored for the widget, the stored texture is dropped so
        that a new one gets created for the new size.

        @param video_sink: The video sink to pull samples from.
        @param update_sample_method: The method scheduled with Clock.schedule_once to
            display the sample.
        @param video_widget: The video widget (either local_video or remote_video).
        @return: Always False
        """
        sample = video_sink.emit("pull-sample")
        if sample is None:
            return False

        texture_data = self.texture_map.get(video_widget)
        if texture_data is None:
            # first sample seen for this widget
            texture_data = TextureData()
            self.texture_map[video_widget] = texture_data

        # size of the stream currently flowing through the sink
        sink_pad = video_sink.get_static_pad("sink")
        assert sink_pad is not None
        structure = sink_pad.get_current_caps().get_structure(0)
        stream_size = (structure.get_value("width"), structure.get_value("height"))

        known_size = texture_data.size
        if known_size != stream_size:
            log.debug(f"sample size update: {known_size} => {stream_size}")
            texture_data.size = stream_size
            # a texture can't be resized: drop it, it is recreated on next update
            texture_data.texture = None

        display_sample = partial(
            update_sample_method, sample=sample, video_widget=video_widget
        )
        Clock.schedule_once(display_sample)
        return False

    def update_sample(
        self,
        dt: float,
        sample: Optional[Gst.Sample],
        video_widget: VideoStreamWidget,
    ) -> None:
        """Copy a video sample into the texture of its widget.

        This method is scheduled with ``Clock.schedule_once`` by ``on_new_sample``.
        The texture is created here when none exists yet for the widget and the size
        of the stream is known.

        @param dt: Time delta since the last call. This is passed by Clock.schedule_once.
        @param sample: The video sample to display, nothing is done if it is None.
        @param video_widget: The video widget (either local_video or remote_video).
        """
        if sample is None:
            return
        try:
            texture_data = self.texture_map[video_widget]
        except KeyError:
            log.warning(f"no texture data found for {video_widget}")
            return

        if texture_data.texture is None and texture_data.size is not None:
            log.debug(f"===> creating texture for {video_widget}: {texture_data.size=}")
            texture = Texture.create(size=texture_data.size, colorfmt="rgb")
            assert texture is not None
            texture_data.texture = texture
            texture.flip_vertical()
            video_widget.texture = texture

        texture = texture_data.texture
        if texture is None:
            # checked before mapping, so the buffer is not mapped and copied for nothing
            log.debug("can't copy the buffer, texture is None")
            return

        buf = sample.get_buffer()
        if buf is None:
            log.debug("can't copy the buffer, the sample has no buffer")
            return

        mapped, mapinfo = buf.map(Gst.MapFlags.READ)
        if not mapped:
            log.warning(f"can't map the buffer of the sample for {video_widget}")
            return
        try:
            texture.blit_buffer(mapinfo.data.tobytes(), colorfmt="rgb")
        finally:
            # a successful map must be released exactly once, even if the copy fails
            buf.unmap(mapinfo)

        assert video_widget.canvas is not None
        video_widget.canvas.ask_update()


class Calls(
        quick_widgets.QuickWidget,
        cagou_widget.LiberviaDesktopKivyWidget,
        FilterBehavior
):
    audio_muted = BooleanProperty(False)
    call_layout = ObjectProperty()
    call_screen = ObjectProperty()
    fullscreen = BooleanProperty(False)
    in_call = BooleanProperty(False)
    incoming_call_dialog: dict[str, Widget] = {}
    jid_selector = ObjectProperty()
    local_video = ObjectProperty()
    remote_video = ObjectProperty()
    ringtone: Sound | None = None
    screen_manager = ObjectProperty()
    signals_registered = False
    use_header_input = True
    video_muted = BooleanProperty(False)

    def __init__(self, host, target, profiles):
        """Initialise both parent classes, the call button and the WebRTC wrapper.

        @param host: not used, ``G.host`` is given to ``QuickWidget`` instead.
        @param target: forwarded to ``QuickWidget.__init__``.
        @param profiles: forwarded to ``QuickWidget.__init__``.
        """
        quick_widgets.QuickWidget.__init__(self, G.host, target, profiles)
        cagou_widget.LiberviaDesktopKivyWidget.__init__(self)

        def toggle_on_press(*__):
            return aio.run_async(self.toggle_call())

        call_button = CallButton(parent_widget=self, on_press=toggle_on_press)
        self.header_input_add_extra(call_button)
        self.webrtc = WebRTC(self, self.profile)
        self.previous_fullscreen = None
        self.reset_instance()

    @property
    def sid(self):
        """Session id of the current call, as exposed by ``self.webrtc.sid``.

        The other methods of this class treat ``None`` as "no call in progress".
        """
        return self.webrtc.sid

    def hang_up(self):
        """Terminate the current call, do nothing when there is none."""
        if self.sid is None:
            return
        aio.run_async(self.toggle_call())

    async def toggle_call(self):
        """Hang up when a call is running, otherwise call the entity in header input.

        The call state is given by ``self.sid``. When starting a call, nothing happens
        if the header input is empty, and an error note is shown if it doesn't hold a
        valid jid.
        """
        if self.sid is not None:
            # Terminate the call
            sid = self.sid
            await self.end_call({"reason": "terminated"}, self.profile)
            await G.host.a_bridge.call_end(sid, "", self.profile)
            self.in_call = False
            return

        # Initiate the call
        log.info("initiating call")
        callee = jid.JID(self.header_input.text.strip())
        if not callee:
            return
        if not callee.is_valid():
            error_msg = _("Can't make a call: invalid destinee {}").format(repr(callee))
            G.host.add_note(_("Calls"), error_msg, level=C.XMLUI_DATA_LVL_ERROR)
            return

        self.webrtc.callee = callee
        await self.webrtc.setup_call("initiator")
        self.webrtc.start_pipeline()
        self.in_call = True

    async def on_incoming_call(
        self, action_data: dict, action_id: str, profile: str
    ) -> None:
        """Accept an incoming call, unless a call is already in progress.

        @param action_data: Data of the incoming call, the session id is read from its
            "session_id" key.
        @param action_id: The ID of the action to launch to answer the call.
        @param profile: The profile associated with the incoming call.
        """
        if self.sid is not None:
            # FIXME: show a double remote call notification
            log.warning("Ignoring new remote call as we are already in a call.")
            return

        session_id = action_data["session_id"]
        self.in_call = True
        self.webrtc.sid = session_id
        # FIXME: we accept all remote calls for now, will be changed when proper UI/UX
        #   will be implemented
        log.warning("auto-accepting remote call")
        acceptance = data_format.serialise({"cancelled": False})
        await G.host.a_bridge.action_launch(action_id, acceptance, profile)

    async def on_call_setup(self, setup_data: dict, profile: str) -> None:
        """Call has been accepted, connection can be established

        An error is logged, and nothing else is done, when ``setup_data`` misses a key
        or holds an unknown role.

        @param setup_data: Data with following keys:
            role: "initiator" or "responder"
            sdp: Session Description Protocol data
        @param profile: Profile associated
        """
        try:
            role, sdp = setup_data["role"], setup_data["sdp"]
        except KeyError:
            log.error(f"Invalid setup data received: {setup_data}")
            return

        if role == "initiator":
            self.webrtc.on_accepted_call(sdp, profile)
            return
        if role == "responder":
            await self.webrtc.answer_call(sdp, profile)
            return
        log.error(f"Invalid role received during setup: {setup_data}")

    def reset_instance(self) -> None:
        """Mark the widget as not in call and clear the textures of both videos."""
        self.in_call = False
        for video_widget in (self.local_video, self.remote_video):
            video_widget.texture = None

    async def end_call(self, data: dict, profile: str) -> None:
        """End current call if any and reset the instance

        @param data: end call data, often includes the reason of the call ending.
            It is not used by this method.
        @param profile: profile associated with the call, not used by this method.
        """
        await self.webrtc.end_call()
        self.reset_instance()

    def on_in_call(self, instance, in_call: bool) -> None:
        """Show the "call" screen when a call starts, the "search" one when it ends.

        Fullscreen is left as soon as the call ends.

        @param instance: first argument of the Kivy property callback, not used.
        @param in_call: new value of the ``in_call`` property.
        """
        if not in_call:
            self.fullscreen = False
        direction, screen_name = ("up", "call") if in_call else ("down", "search")
        self.screen_manager.transition.direction = direction
        self.screen_manager.current = screen_name

    def on_audio_muted(self, instance, muted: bool) -> None:
        """Copy the new value of the ``audio_muted`` property to ``self.webrtc``.

        @param instance: first argument of the Kivy property callback, not used.
        @param muted: value assigned to ``self.webrtc.audio_muted``.
        """
        self.webrtc.audio_muted = muted

    def on_video_muted(self, instance, muted: bool) -> None:
        """Copy the new value of the ``video_muted`` property to ``self.webrtc``.

        @param instance: first argument of the Kivy property callback, not used.
        @param muted: value assigned to ``self.webrtc.video_muted``.
        """
        self.webrtc.video_muted = muted

    def on_fullscreen(self, instance, fullscreen: bool) -> None:
        """Move the call layout in or out of the fullscreen UI.

        When entering fullscreen, the head widget is hidden, the call layout is
        detached from its parent and shown as extra UI, and the window state is saved
        before switching it to "auto" fullscreen. When leaving, the head widget is
        shown again, the extra UI is closed, the layout goes back to the call screen
        and the saved window state is restored.

        @param instance: first argument of the Kivy property callback, not used.
        @param fullscreen: new value of the ``fullscreen`` property.
        """
        # the head widget is visible only when not in fullscreen
        G.host.app.show_head_widget(not fullscreen, animation=False)
        if not fullscreen:
            G.host.close_ui()
            self.call_screen.add_widget(self.call_layout)
            Window.fullscreen = self.previous_fullscreen or False
            return

        self.call_layout.parent.remove_widget(self.call_layout)
        G.host.show_extra_ui(self.call_layout)
        self.previous_fullscreen = Window.fullscreen
        Window.fullscreen = "auto"

    def on_header_wid_input(self):
        """Run ``toggle_call`` asynchronously.

        NOTE(review): presumably called when the header input is validated — TODO
        confirm against the header widget.
        """
        aio.run_async(self.toggle_call())

    def on_header_wid_input_complete(self, wid, text, **kwargs):
        """Filter the items of the jid selector with the text of the input box.

        @param wid: not used.
        @param text: text given to ``do_filter`` for every layout of the selector.
        """

        def searchable_text(c):
            # nick is appended to jid to filter on both
            return c.jid + c.data.get('nick', '')

        def is_not_contact_button(c):
            return not isinstance(c, common.ContactButton)

        for layout in self.jid_selector.items_layouts:
            self.do_filter(
                layout,
                text,
                searchable_text,
                width_cb=lambda c: c.base_width,
                height_cb=lambda c: c.minimum_height,
                continue_tests=[is_not_contact_button],
            )

    def on_jid_select(self, contact_button):
        """Put the jid of the selected contact in the header input and toggle the call.

        @param contact_button: selected button, only its ``jid`` attribute is read.
        """
        self.header_input.text = contact_button.jid
        aio.run_async(self.toggle_call())

    @classmethod
    def ice_candidates_new_handler(
        cls, sid: str, candidates_s: str, profile: str
    ) -> None:
        """Give new ICE candidates to the visible widgets handling this session.

        @param sid: session id, a widget is concerned only if it matches its own.
        @param candidates_s: serialised candidates, deserialised with ``data_format``.
        @param profile: a widget is concerned only if this profile is one of its own.
        """
        for wid in G.host.get_visible_list(cls):
            if profile in wid.profiles and sid == wid.sid:
                candidates = data_format.deserialise(candidates_s)
                wid.webrtc.on_ice_candidates_new(candidates)

    @classmethod
    def call_setup_handler(cls, sid: str, setup_data_s: str, profile: str) -> None:
        """Run ``on_call_setup`` on the visible widgets handling this session.

        @param sid: session id, a widget is concerned only if it matches its own.
        @param setup_data_s: serialised setup data, deserialised with ``data_format``.
        @param profile: a widget is concerned only if this profile is one of its own.
        """
        for wid in G.host.get_visible_list(cls):
            if profile in wid.profiles and sid == wid.sid:
                setup_data = data_format.deserialise(setup_data_s)
                aio.run_async(wid.on_call_setup(setup_data, profile))

    @classmethod
    def call_ended_handler(cls, sid: str, data_s: str, profile: str) -> None:
        """Handle the end of a call.

        If the incoming call dialog of this session is still displayed, it is removed,
        the ringtone is stopped and a note tells the user that the call has been
        cancelled. The call is then ended on every visible widget handling the session.

        @param sid: session id of the call which has ended.
        @param data_s: serialised end call data, deserialised with ``data_format``.
        @param profile: a widget is concerned only if this profile is one of its own.
        """
        if sid in cls.incoming_call_dialog:
            dialog_wid = cls.incoming_call_dialog.pop(sid)
            # The pending dialog is removed below, so its answer callback — the only
            # other place stopping the ringtone — is not expected to run anymore.
            if cls.ringtone is not None:
                cls.ringtone.stop()
            G.host.del_notif_widget(dialog_wid)
            G.host.add_note(_("Call cancelled"), _("The call has been cancelled."))

        for wid in G.host.get_visible_list(cls):
            if profile not in wid.profiles or sid != wid.sid:
                continue
            aio.run_async(wid.end_call(data_format.deserialise(data_s), profile))

    @classmethod
    def action_new_handler(
        cls, action_data: dict, action_id: str, security_limit: int, profile: str
    ) -> None:
        """Ring and ask the user whether an incoming call must be accepted.

        Nothing is done if the profile is not one of the host. Otherwise the ringtone
        is played, a desktop notification is sent and a yes/no dialog is shown; the
        dialog is stored in ``incoming_call_dialog`` under the session id.

        @param action_data: data of the action, its "from_jid" and "session_id" keys
            are read here.
        @param action_id: ID of the action to launch to refuse the call, also given to
            ``on_incoming_call`` when the call is accepted.
        @param security_limit: not used.
        @param profile: profile receiving the call.
        """
        if profile not in G.host.profiles:
            return

        # the ringtone is loaded on first use and kept on the class
        if cls.ringtone is None:
            ringtone_path = Path(G.host.media_dir) / "sounds/notifications/ring_1.mp3"
            cls.ringtone = SoundLoader.load(str(ringtone_path))
        if cls.ringtone is not None:
            cls.ringtone.play()

        peer_jid = jid.JID(action_data["from_jid"]).bare
        sid = action_data["session_id"]
        notif_body = f"{peer_jid} is calling you."
        notif_title = "Incoming call"
        G.host.desktop_notif(notif_body, title=notif_title, duration=10)

        def on_call_answer(accepted, __):
            del cls.incoming_call_dialog[sid]
            if cls.ringtone is not None:
                cls.ringtone.stop()
            if not accepted:
                refusal = data_format.serialise({"cancelled": True})
                aio.run_async(
                    G.host.a_bridge.action_launch(action_id, refusal, profile)
                )
                return
            wid = G.host.do_action("calls", str(peer_jid), [profile])
            aio.run_async(wid.on_incoming_call(action_data, action_id, profile))

        cls.incoming_call_dialog[sid] = G.host.show_dialog(
            notif_body, notif_title, type="yes/no", answer_cb=on_call_answer
        )


# Bind the class-level handlers of ``Calls`` to the "plugin" interface signals and to
# the call action type. This is done at import time, and only when a host exists.
# NOTE(review): G.host is presumably None when the module is imported outside of the
# running application (e.g. by tests or tooling) — TODO confirm.
if G.host is not None:
    log.debug("registering signals")
    G.host.register_signal(
        "ice_candidates_new",
        handler=Calls.ice_candidates_new_handler,
        iface="plugin",
    )
    G.host.register_signal("call_setup", handler=Calls.call_setup_handler, iface="plugin")
    G.host.register_signal("call_ended", handler=Calls.call_ended_handler, iface="plugin")
    G.host.register_action_handler(C.META_TYPE_CALL, Calls.action_new_handler)