view libervia/cli/call_gui.py @ 4242:8acf46ed7f36

frontends: remote control implementation: This is the frontends' common part of the remote control implementation. It handles the creation of the WebRTC session and the management of inputs. For now the reception uses the freedesktop.org Desktop portal, and works mostly with Wayland-based Desktop Environments. rel 436
author Goffi <goffi@goffi.org>
date Sat, 11 May 2024 13:52:43 +0200
parents 79c8a70e1813
children 0d7bb4df2343
line wrap: on
line source

#!/usr/bin/env python3

# Libervia CLI
# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import asyncio
from functools import partial
from pathlib import Path
from typing import Callable, cast

from PyQt6.QtCore import QPoint, QSize, Qt
from PyQt6.QtGui import (
    QCloseEvent,
    QColor,
    QIcon,
    QImage,
    QPainter,
    QPen,
    QPixmap,
    QResizeEvent,
    QTransform,
)
from PyQt6.QtWidgets import (
    QApplication,
    QDialog,
    QDialogButtonBox,
    QHBoxLayout,
    QLabel,
    QListWidget,
    QListWidgetItem,
    QMainWindow,
    QPushButton,
    QSizePolicy,
    QVBoxLayout,
    QWidget,
)
import gi

from libervia.backend.core.i18n import _
from libervia.frontends.tools import aio, display_servers, webrtc
gi.require_versions({
    "Gst": "1.0",
    "GstWebRTC": "1.0"
})
from gi.repository import Gst



# size used for the icons displayed inside the buttons
ICON_SIZE = QSize(45, 45)
# fixed size of the buttons themselves
BUTTON_SIZE = QSize(50, 50)
# module-level flag polled by AVCallUI.run_qt_loop: AVCallUI.run sets it to True and
# AVCallUI.closeEvent resets it to False to stop the loop
running = False


# NOTE(review): presumably hooks GLib main loop iteration into asyncio (needed by
# GStreamer) — confirm in libervia.frontends.tools.aio
aio.install_glib_asyncio_iteration()


class ActivableButton(QPushButton):
    """Push button with an "activated"/"deactivated" visual state.

    The background colour follows the state, and a diagonal line is painted over the
    button while it is deactivated.
    """

    def __init__(self, text, parent=None):
        """
        @param text: label of the button
        @param parent: parent widget, if any
        """
        # the label must be forwarded to QPushButton, otherwise it is silently dropped
        super().__init__(text, parent)
        self._activated = True
        self._activated_colour = "#47c68e"
        self._deactivated_colour = "#ffe089"
        self._line_colour = "#ff0000"
        self._update_background_color()

    @property
    def activated_colour(self) -> str:
        """Background colour used while the button is activated"""
        return self._activated_colour

    @activated_colour.setter
    def activated_colour(self, new_colour: str) -> None:
        if new_colour != self._activated_colour:
            self._activated_colour = new_colour
            self._update_background_color()

    @property
    def deactivated_colour(self) -> str:
        """Background colour used while the button is deactivated"""
        return self._deactivated_colour

    @deactivated_colour.setter
    def deactivated_colour(self, new_colour: str) -> None:
        if new_colour != self._deactivated_colour:
            self._deactivated_colour = new_colour
            self._update_background_color()

    @property
    def line_colour(self) -> str:
        """Colour of the diagonal line painted while the button is deactivated"""
        return self._line_colour

    @line_colour.setter
    def line_colour(self, new_colour: str) -> None:
        if new_colour != self._line_colour:
            self._line_colour = new_colour
            # only the painted line depends on this colour, a repaint is enough
            self.update()

    def paintEvent(self, a0):
        """Paint the button, then strike it through when it is deactivated"""
        super().paintEvent(a0)

        if not self._activated:
            painter = QPainter(self)
            painter.setRenderHint(QPainter.RenderHint.Antialiasing)

            line_color = QColor(self._line_colour)
            line_width = 4
            cap_style = Qt.PenCapStyle.RoundCap

            pen = QPen(line_color, line_width, Qt.PenStyle.SolidLine)
            pen.setCapStyle(cap_style)
            painter.setPen(pen)

            # the line goes from the bottom-left corner to the top-right one, keeping a
            # small margin from the borders
            margin = 5
            start_point = QPoint(margin, self.height() - margin)
            end_point = QPoint(self.width() - margin, margin)
            painter.drawLine(start_point, end_point)

    def _update_background_color(self):
        """Apply the background colour matching the current state and repaint"""
        if self._activated:
            self.setStyleSheet(f"background-color: {self._activated_colour};")
        else:
            self.setStyleSheet(f"background-color: {self._deactivated_colour};")
        self.update()

    @property
    def activated(self):
        """Current state of the button"""
        return self._activated

    @activated.setter
    def activated(self, value):
        if self._activated != value:
            self._activated = value
            self._update_background_color()


class X11DesktopScreenDialog(QDialog):
    """Dialog listing windows so the user can select the one to share.

    The selection is retrieved asynchronously with [a_show].
    """

    def __init__(self, windows_data, parent=None):
        """
        @param windows_data: iterable of dicts describing the windows; each one must
            have a "title" key, used as label in the list
        @param parent: parent widget, if any
        """
        super().__init__(parent)
        # future resolved with the selected window data, or None if nothing is selected
        self.__a_result = asyncio.get_running_loop().create_future()
        self.setWindowTitle("Please select a window to share:")
        self.resize(400, 300)
        self.list_widget = QListWidget(self)
        for window_data in windows_data:
            item = QListWidgetItem(window_data["title"])
            # the whole window data is attached to the item, it is what is returned
            # when the item is selected
            item.setData(Qt.ItemDataRole.UserRole, window_data)
            self.list_widget.addItem(item)

        self.buttonBox = QDialogButtonBox(
            QDialogButtonBox.StandardButton.Ok | QDialogButtonBox.StandardButton.Cancel
        )
        self.buttonBox.accepted.connect(self.on_accepted)
        self.buttonBox.rejected.connect(self.on_rejected)
        # Qt emits ``finished`` whenever the dialog result is set (e.g. ``reject()``
        # triggered by the Escape key). Listening to it makes sure the future is
        # resolved even if no close event reaches [closeEvent].
        self.finished.connect(self._on_finished)

        layout = QVBoxLayout(self)
        layout.addWidget(self.list_widget)
        layout.addWidget(self.buttonBox)

    def _resolve(self, value: dict | None) -> None:
        """Set the result of the dialog, unless it has already been set"""
        if not self.__a_result.done():
            self.__a_result.set_result(value)

    def _on_finished(self, result: int) -> None:
        """Resolve the future with None if the dialog ended without a selection"""
        self._resolve(None)

    def get_selected_window(self) -> dict | None:
        """Return the data of the currently selected window, or None if there is none"""
        selected_item = self.list_widget.currentItem()
        if selected_item:
            return selected_item.data(Qt.ItemDataRole.UserRole)
        return None

    def on_accepted(self):
        """Use the current selection as result, then close the dialog"""
        self._resolve(self.get_selected_window())
        self.close()

    def on_rejected(self):
        """Use None as result, then close the dialog"""
        self._resolve(None)
        self.close()

    def closeEvent(self, a0):
        """Make sure that the result is set when the dialog is closed"""
        super().closeEvent(a0)
        self._resolve(None)

    async def a_show(self) -> dict | None:
        """Open the dialog and wait for the user's choice.

        @return: data of the selected window, or None if no window has been selected
        """
        self.open()
        return await self.__a_result


class AVCallUI(QMainWindow):
    def __init__(self, host, icons_path: Path):
        """Create the call window and build its widgets.

        @param host: host application, kept as ``self.host``
        @param icons_path: directory where the SVG icons of the buttons are looked up
        """
        super().__init__()
        # no call is attached at creation time
        self.webrtc_call = None
        self.host, self.icons_path = host, icons_path
        self.initUI()

    @staticmethod
    async def run_qt_loop(app):
        """Process Qt posted events periodically while ``running`` is set.

        @param app: application whose ``sendPostedEvents`` is called at each iteration
        """
        poll_interval = 0.1
        while True:
            if not running:
                break
            app.sendPostedEvents()
            # give the control back to asyncio between 2 iterations
            await asyncio.sleep(poll_interval)

    @classmethod
    async def run(cls, parent, call_data):
        """Run PyQt loop and show the app

        The window is created and shown, the WebRTC call is set up with video callbacks
        bound to the window, then Qt events are processed until the window is closed.

        @param parent: object giving access to ``host`` and ``profile``
        @param call_data: call data forwarded to ``WebRTCCall.make_webrtc_call``
        """
        global running

        def end_call_on_quit(sid, profile):
            # we want to be sure that call is ended if user presses `Ctrl + c` or
            # anything else stops the session.
            return parent.host.add_on_quit_callback(
                parent.host.bridge.call_end, sid, "", profile
            )

        def quit_on_call_ended(sid, profile):
            return parent.host.a_quit()

        media_dir = await parent.host.bridge.config_get("", "media_dir")
        icons_path = Path(media_dir) / "fonts/fontello/svg"
        app = QApplication([])
        window = cls(parent.host, icons_path)
        window.show()

        # both video streams are rendered by the same method, bound to the stream name
        sinks_data = webrtc.SinksApp(
            local_video_cb=partial(window.on_new_sample, video_stream="local"),
            remote_video_cb=partial(window.on_new_sample, video_stream="remote"),
        )
        window.webrtc_call = await webrtc.WebRTCCall.make_webrtc_call(
            parent.host.bridge,
            parent.profile,
            call_data,
            sinks_data=sinks_data,
            on_call_setup_cb=end_call_on_quit,
            on_call_ended_cb=quit_on_call_ended,
        )

        running = True
        await cls.run_qt_loop(app)
        await parent.host.a_quit()

    def initUI(self) -> None:
        """Build the widgets and layouts of the call window.

        The remote video is displayed in a background widget (the central widget),
        while the fullscreen button, the control buttons and the local video feedback
        are put in a foreground widget that [adjust_sizes] keeps at the same size as
        the background one.
        """
        self.setGeometry(100, 100, 800, 600)
        self.setWindowTitle("Call")

        # Main layouts
        self.background_widget = QWidget(self)
        self.foreground_widget = QWidget(self)
        self.setCentralWidget(self.background_widget)
        back_layout = QVBoxLayout(self.background_widget)
        front_layout = QVBoxLayout(self.foreground_widget)

        # Remote video
        self.remote_video_widget = QLabel(self)
        self.remote_video_widget.setSizePolicy(
            QSizePolicy.Policy.Ignored, QSizePolicy.Policy.Ignored
        )
        back_layout.addWidget(self.remote_video_widget)

        # Fullscreen button
        fullscreen_layout = QHBoxLayout()
        front_layout.addLayout(fullscreen_layout)
        # the stretch is added before the button, pushing it to the right
        fullscreen_layout.addStretch()
        self.fullscreen_btn = QPushButton("", self)
        self.fullscreen_btn.setFixedSize(BUTTON_SIZE)
        # both icons are kept, as [toggle_fullscreen] switches between them
        self.fullscreen_icon_normal = QIcon(str(self.icons_path / "resize-full.svg"))
        self.fullscreen_icon_fullscreen = QIcon(str(self.icons_path / "resize-small.svg"))
        self.fullscreen_btn.setIcon(self.fullscreen_icon_normal)
        self.fullscreen_btn.setIconSize(ICON_SIZE)
        self.fullscreen_btn.clicked.connect(self.toggle_fullscreen)
        fullscreen_layout.addWidget(self.fullscreen_btn)

        # Control buttons
        self.control_buttons_layout = QHBoxLayout()
        self.control_buttons_layout.setSpacing(40)
        # add_control_button returns an ActivableButton with its default
        # ``activable=True``, hence the casts
        self.toggle_video_btn = cast(
            ActivableButton, self.add_control_button("videocam", self.toggle_video)
        )
        self.toggle_audio_btn = cast(
            ActivableButton, self.add_control_button("volume-up", self.toggle_audio)
        )
        self.share_desktop_btn = cast(
            ActivableButton, self.add_control_button("desktop", self.share_desktop)
        )
        # desktop sharing is off at start: the button starts deactivated, and its
        # deactivated colour is the one used by default for activated buttons
        self.share_desktop_btn.deactivated_colour = "#47c68e"
        self.share_desktop_btn.activated_colour = "#f24468"
        self.share_desktop_btn.line_colour = "#666666"
        self.share_desktop_btn.activated = False
        # the hang up button is a plain QPushButton, with a rotated icon on a red
        # background
        self.hang_up_btn = self.add_control_button(
            "phone", self.hang_up, rotate=135, background="red", activable=False
        )

        controls_widget = QWidget(self)
        controls_widget.setSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Fixed)
        controls_widget.setLayout(self.control_buttons_layout)
        front_layout.addStretch()

        # the controls are surrounded by stretches in the bottom row
        bottom_layout = QHBoxLayout()
        bottom_layout.addStretch()
        front_layout.addLayout(bottom_layout)
        bottom_layout.addWidget(controls_widget, alignment=Qt.AlignmentFlag.AlignBottom)

        # Local video feedback
        bottom_layout.addStretch()
        self.local_video_widget = QLabel(self)
        bottom_layout.addWidget(self.local_video_widget)

        # we update sizes on resize event
        self.background_widget.resizeEvent = self.adjust_sizes
        self.adjust_sizes()

    def add_control_button(
        self,
        icon_name: str,
        callback: Callable,
        rotate: float | None = None,
        background: str | None = None,
        activable: bool = True,
    ) -> QPushButton | ActivableButton:
        """Create a control button and append it to the control buttons layout.

        @param icon_name: name (without extension) of the SVG icon in ``icons_path``
        @param callback: callable connected to the ``clicked`` signal
        @param rotate: rotation in degrees applied to the icon, if set
        @param background: background colour set with a stylesheet, if set
        @param activable: if True an ActivableButton is created, otherwise a plain
            QPushButton
        @return: the created button
        """
        button_cls = ActivableButton if activable else QPushButton
        button = button_cls("", self)

        icon = QIcon(str(self.icons_path / f"{icon_name}.svg"))
        if rotate is not None:
            # the icon is rendered at icon size, then the rendering is rotated
            rotation = QTransform()
            rotation.rotate(rotate)
            icon = QIcon(icon.pixmap(ICON_SIZE).transformed(rotation))
        button.setIcon(icon)
        button.setIconSize(ICON_SIZE)
        button.setFixedSize(BUTTON_SIZE)

        if background:
            button.setStyleSheet(f"background-color: {background};")

        button.clicked.connect(callback)
        self.control_buttons_layout.addWidget(button)
        return button

    def adjust_sizes(self, a0: QResizeEvent | None = None) -> None:
        """Resize the foreground widget and the local video feedback.

        @param a0: resize event when used as resize handler; None when called directly
        """
        background = self.background_widget
        # the foreground widget covers the same area as the background one
        self.foreground_widget.setGeometry(0, 0, background.width(), background.height())
        # local video feedback takes a third of the window in each dimension
        self.local_video_widget.setFixedSize(self.width() // 3, self.height() // 3)
        if a0 is None:
            return
        super().resizeEvent(a0)

    def on_new_sample(self, video_sink, video_stream: str) -> bool:
        """Pull a sample from a video sink and schedule its display.

        @param video_sink: sink element from which the sample is pulled
        @param video_stream: "remote" for the remote video, anything else is handled
            as the local video by [update_sample]
        @return: always False
        """
        sample = video_sink.emit("pull-sample")
        if sample is not None:
            video_pad = video_sink.get_static_pad("sink")
            assert video_pad is not None
            structure = video_pad.get_current_caps().get_structure(0)
            width = structure.get_value("width")
            height = structure.get_value("height")
            # the widget is updated from the host loop
            # NOTE(review): presumably this callback runs in a GStreamer thread, hence
            # the thread-safe scheduling — confirm
            self.host.loop.loop.call_soon_threadsafe(
                self.update_sample, sample, (width, height), video_stream
            )

        return False

    def update_sample(self, sample, stream_size, video_stream: str) -> None:
        """Display a video sample in the widget of the given stream.

        @param sample: sample to display; nothing is done if it is None
        @param stream_size: (width, height) of the video frame
        @param video_stream: "remote" to update the remote video widget, anything else
            updates the local video widget
        """
        if sample is None:
            return

        video_widget = (
            self.remote_video_widget
            if video_stream == "remote"
            else self.local_video_widget
        )

        buf = sample.get_buffer()
        mapped, mapinfo = buf.map(Gst.MapFlags.READ)
        if not mapped:
            return
        try:
            width, height = stream_size
            qimage = QImage(mapinfo.data, width, height, QImage.Format.Format_RGB888)
            # the frame is scaled to fit the widget while keeping its aspect ratio
            pixmap = QPixmap.fromImage(qimage).scaled(
                QSize(video_widget.width(), video_widget.height()),
                Qt.AspectRatioMode.KeepAspectRatio,
            )
            video_widget.setPixmap(pixmap)
            video_widget.setAlignment(Qt.AlignmentFlag.AlignCenter)
        finally:
            # the buffer must be unmapped even if the conversion or the widget update
            # fails, otherwise the mapping is leaked
            buf.unmap(mapinfo)

    def toggle_fullscreen(self):
        """Switch between fullscreen and normal display, updating the button icon"""
        if self.isFullScreen():
            # leaving fullscreen
            self.fullscreen_btn.setIcon(self.fullscreen_icon_normal)
            self.showNormal()
            return

        # entering fullscreen
        self.fullscreen_btn.setIcon(self.fullscreen_icon_fullscreen)
        self.showFullScreen()

    def closeEvent(self, a0: QCloseEvent | None) -> None:
        """Handle the closing of the window and stop the Qt event loop.

        @param a0: close event, forwarded to the parent class
        """
        global running
        super().closeEvent(a0)
        # [run_qt_loop] stops iterating once this flag is reset
        running = False

    def toggle_video(self):
        """Mute or unmute the video, and update the corresponding button"""
        assert self.webrtc_call is not None
        rtc = self.webrtc_call.webrtc
        was_muted = rtc.video_muted
        rtc.video_muted = not was_muted
        # the button is activated when the video is not muted
        self.toggle_video_btn.activated = not rtc.video_muted

    def toggle_audio(self):
        """Mute or unmute the audio, and update the corresponding button"""
        assert self.webrtc_call is not None
        rtc = self.webrtc_call.webrtc
        was_muted = rtc.audio_muted
        rtc.audio_muted = not was_muted
        # the button is activated when the audio is not muted
        self.toggle_audio_btn.activated = not rtc.audio_muted

    def share_desktop(self):
        """Start or stop desktop sharing, keeping the share button in sync.

        If desktop is currently shared, sharing is stopped. Otherwise, a window
        selection dialog is shown when X11 is detected, and sharing is directly
        requested with any other display server.
        """
        assert self.webrtc_call is not None
        rtc = self.webrtc_call.webrtc
        if rtc.desktop_sharing:
            rtc.desktop_sharing = False
            self.share_desktop_btn.activated = False
        elif display_servers.detect() == display_servers.X11:
            # the button is updated by the dialog handler once a window is selected
            aio.run_async(self.show_X11_screen_dialog())
        else:
            rtc.desktop_sharing = True
            # the button must reflect the sharing state here too, as it does when
            # sharing is started from the X11 dialog. The state is read back rather
            # than assumed, in case the request has not been applied.
            self.share_desktop_btn.activated = bool(rtc.desktop_sharing)

    def hang_up(self) -> None:
        """Hang up by closing the call window"""
        self.close()

    @staticmethod
    def can_run():
        """Tell if the GUI should be able to run.

        @return: True if a known display server is detected
        """
        detected = display_servers.detect()
        return detected is not None

    async def show_X11_screen_dialog(self):
        """Let the user select a window, then start sharing it.

        Nothing is changed if no window is selected.
        """
        assert self.webrtc_call is not None
        dialog = X11DesktopScreenDialog(display_servers.x11_list_windows(), self)
        selected = await dialog.a_show()
        if selected is None:
            return

        rtc = self.webrtc_call.webrtc
        # the id of the window to share must be set before sharing is activated
        rtc.desktop_sharing_data = {"xid": selected["id"]}
        rtc.desktop_sharing = True
        self.share_desktop_btn.activated = True