view libervia/cli/call_gui.py @ 4295:7d98d894933c

frontends (tools/aio): Fix excessive use of CPU: Use a short delay instead of `loop.call_soon` when work has been done, as this was resulting in an excessive use of CPU.
author Goffi <goffi@goffi.org>
date Fri, 06 Sep 2024 17:38:31 +0200
parents 0d7bb4df2343
children 00837fa13e5a
line wrap: on
line source

#!/usr/bin/env python3

# Libervia CLI
# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import asyncio
from functools import partial
from pathlib import Path
from typing import Callable, cast

from PyQt6.QtCore import QPoint, QSize, Qt
from PyQt6.QtGui import (
    QCloseEvent,
    QColor,
    QIcon,
    QImage,
    QPainter,
    QPen,
    QPixmap,
    QResizeEvent,
    QTransform,
)
from PyQt6.QtWidgets import (
    QApplication,
    QDialog,
    QDialogButtonBox,
    QHBoxLayout,
    QLabel,
    QListWidget,
    QListWidgetItem,
    QMainWindow,
    QPushButton,
    QSizePolicy,
    QVBoxLayout,
    QWidget,
)
import gi

from libervia.backend.core.i18n import _
from libervia.frontends.tools import aio, display_servers, webrtc

# Select the GObject-introspection API versions to use; this is done before
# ``Gst`` is imported from ``gi.repository`` on the next line.
gi.require_versions({"Gst": "1.0", "GstWebRTC": "1.0"})
from gi.repository import Gst


# Size applied to button icons (passed to ``setIconSize``).
ICON_SIZE = QSize(45, 45)
# Fixed size applied to the buttons themselves (passed to ``setFixedSize``).
BUTTON_SIZE = QSize(50, 50)
# Module-level flag polled by ``AVCallUI.run_qt_loop``: set to True in
# ``AVCallUI.run`` and back to False in ``AVCallUI.closeEvent``.
running = False


# NOTE(review): presumably hooks GLib main-loop iteration into the asyncio loop
# (needed for GStreamer) — confirm in ``libervia.frontends.tools.aio``.
aio.install_glib_asyncio_iteration()


class ActivableButton(QPushButton):
    """Push button with an "activated" / "deactivated" visual state.

    The background colour depends on the state, and a diagonal line is drawn
    across the button while it is deactivated. The three colours are exposed
    as properties so they can be customised per button.
    """

    def __init__(self, text, parent=None):
        """Create the button in the activated state.

        @param text: label of the button (may be empty).
        @param parent: optional parent widget.
        """
        # The label used to be silently dropped; forward it to QPushButton.
        # ``or ""`` keeps accepting a falsy value such as None, as before.
        super().__init__(text or "", parent)
        self._activated = True
        self._activated_colour = "#47c68e"
        self._deactivated_colour = "#ffe089"
        self._line_colour = "#ff0000"
        self._update_background_color()

    @property
    def activated_colour(self) -> str:
        """Background colour used while the button is activated."""
        return self._activated_colour

    @activated_colour.setter
    def activated_colour(self, new_colour: str) -> None:
        if new_colour != self._activated_colour:
            self._activated_colour = new_colour
            self._update_background_color()

    @property
    def deactivated_colour(self) -> str:
        """Background colour used while the button is deactivated."""
        return self._deactivated_colour

    @deactivated_colour.setter
    def deactivated_colour(self, new_colour: str) -> None:
        if new_colour != self._deactivated_colour:
            self._deactivated_colour = new_colour
            self._update_background_color()

    @property
    def line_colour(self) -> str:
        """Colour of the diagonal line drawn while the button is deactivated."""
        return self._line_colour

    @line_colour.setter
    def line_colour(self, new_colour: str) -> None:
        if new_colour != self._line_colour:
            self._line_colour = new_colour
            # only the overlay changes, a repaint is enough
            self.update()

    def paintEvent(self, a0):
        """Paint the button, then strike it through when deactivated."""
        super().paintEvent(a0)

        if not self._activated:
            painter = QPainter(self)
            painter.setRenderHint(QPainter.RenderHint.Antialiasing)

            line_color = QColor(self._line_colour)
            line_width = 4
            cap_style = Qt.PenCapStyle.RoundCap

            pen = QPen(line_color, line_width, Qt.PenStyle.SolidLine)
            pen.setCapStyle(cap_style)
            painter.setPen(pen)

            # diagonal from bottom-left to top-right, inset by ``margin`` pixels
            margin = 5
            start_point = QPoint(margin, self.height() - margin)
            end_point = QPoint(self.width() - margin, margin)
            painter.drawLine(start_point, end_point)

    def _update_background_color(self):
        """Apply the background colour matching the current state and repaint."""
        if self._activated:
            self.setStyleSheet(f"background-color: {self._activated_colour};")
        else:
            self.setStyleSheet(f"background-color: {self._deactivated_colour};")
        self.update()

    @property
    def activated(self):
        """Current state of the button (True when activated)."""
        return self._activated

    @activated.setter
    def activated(self, value):
        if self._activated != value:
            self._activated = value
            self._update_background_color()


class X11DesktopScreenDialog(QDialog):
    """Dialog listing windows and letting the user pick one to share.

    The choice is delivered through an asyncio future awaited by ``a_show``:
    the data of the selected window when the dialog is accepted, ``None`` in
    every other case.
    """

    def __init__(self, windows_data, parent=None):
        """Build the dialog. Must be called while an asyncio loop is running.

        @param windows_data: iterable of mappings, one per window; each must have
            a "title" key (used as list label). The whole mapping is stored on the
            list item and returned as is when selected.
        @param parent: optional parent widget.
        """
        super().__init__(parent)
        self.__a_result = asyncio.get_running_loop().create_future()
        self.setWindowTitle("Please select a window to share:")
        self.resize(400, 300)
        self.list_widget = QListWidget(self)
        for window_data in windows_data:
            item = QListWidgetItem(window_data["title"])
            item.setData(Qt.ItemDataRole.UserRole, window_data)
            self.list_widget.addItem(item)

        self.buttonBox = QDialogButtonBox(
            QDialogButtonBox.StandardButton.Ok | QDialogButtonBox.StandardButton.Cancel
        )
        self.buttonBox.accepted.connect(self.on_accepted)
        self.buttonBox.rejected.connect(self.on_rejected)
        # ``QDialog.reject()`` (e.g. Escape key) hides the dialog without going
        # through ``closeEvent``; ``finished`` is emitted whenever the dialog is
        # done, so it guarantees that ``a_show`` never waits forever.
        self.finished.connect(self._on_finished)

        layout = QVBoxLayout(self)
        layout.addWidget(self.list_widget)
        layout.addWidget(self.buttonBox)

    def _resolve(self, value: dict | None) -> None:
        """Set the result future to ``value`` unless it is already done."""
        if not self.__a_result.done():
            self.__a_result.set_result(value)

    def _on_finished(self, _result_code: int) -> None:
        """Resolve to ``None`` if the dialog ends without an explicit choice."""
        self._resolve(None)

    def get_selected_window(self) -> dict | None:
        """Return the data of the currently selected window, or None if none is."""
        selectedItem = self.list_widget.currentItem()
        if selectedItem:
            return selectedItem.data(Qt.ItemDataRole.UserRole)
        return None

    def on_accepted(self):
        """Resolve with the current selection, then close the dialog."""
        # resolve before closing, so closing can't resolve to None first
        self._resolve(self.get_selected_window())
        self.close()

    def on_rejected(self):
        """Resolve with ``None``, then close the dialog."""
        self._resolve(None)
        self.close()

    def closeEvent(self, a0):
        """Make sure the result is resolved when the dialog is closed."""
        super().closeEvent(a0)
        self._resolve(None)

    async def a_show(self) -> dict | None:
        """Open the dialog and wait for the user's choice.

        @return: data of the selected window, or None if nothing was selected or
            the dialog was dismissed.
        """
        self.open()
        return await self.__a_result


class AVCallUI(QMainWindow):
    """Main window of the audio/video call GUI.

    The remote video fills the background; a foreground overlay holds the
    fullscreen button, the call control buttons and the local video feedback.
    """

    def __init__(self, host, icons_path: Path):
        """
        @param host: CLI host; its ``loop`` is used to schedule video updates.
        @param icons_path: directory containing the SVG icons of the buttons.
        """
        super().__init__()
        self.host = host
        # set by ``run`` once the WebRTC call has been created
        self.webrtc_call = None
        self.icons_path = icons_path
        self.initUI()

    @staticmethod
    async def run_qt_loop(app):
        """Dispatch posted Qt events every 0.1 s while ``running`` is True.

        @param app: the QApplication instance.
        """
        while running:
            app.sendPostedEvents()
            await asyncio.sleep(0.1)

    @classmethod
    async def run(cls, parent, call_data):
        """Run PyQt loop and show the app

        @param parent: command instance; its ``host`` and ``profile`` are used.
        @param call_data: call data forwarded to ``WebRTCCall.make_webrtc_call``.
        """
        media_dir = Path(await parent.host.bridge.config_get("", "media_dir"))
        icons_path = media_dir / "fonts/fontello/svg"
        app = QApplication([])
        av_call_gui = cls(parent.host, icons_path)
        av_call_gui.show()
        webrtc_call = await webrtc.WebRTCCall.make_webrtc_call(
            parent.host.bridge,
            parent.profile,
            call_data,
            sinks_data=webrtc.SinksApp(
                local_video_cb=partial(av_call_gui.on_new_sample, video_stream="local"),
                remote_video_cb=partial(av_call_gui.on_new_sample, video_stream="remote"),
            ),
            # we want to be sure that call is ended if user presses `Ctrl + c` or anything
            # else stops the session.
            on_call_setup_cb=lambda sid, profile: parent.host.add_on_quit_callback(
                parent.host.bridge.call_end, sid, "", profile
            ),
            on_call_ended_cb=lambda sid, profile: parent.host.a_quit(),
        )
        av_call_gui.webrtc_call = webrtc_call

        global running
        running = True
        # returns once the window has been closed (see ``closeEvent``)
        await cls.run_qt_loop(app)
        await parent.host.a_quit()

    def initUI(self):
        """Create and lay out all the widgets of the window."""
        self.setGeometry(100, 100, 800, 600)
        self.setWindowTitle("Call")

        # Main layouts
        # The foreground widget is not managed by a layout of the window: its
        # geometry is kept in sync with the background one in ``adjust_sizes``.
        self.background_widget = QWidget(self)
        self.foreground_widget = QWidget(self)
        self.setCentralWidget(self.background_widget)
        back_layout = QVBoxLayout(self.background_widget)
        front_layout = QVBoxLayout(self.foreground_widget)

        # Remote video
        self.remote_video_widget = QLabel(self)
        self.remote_video_widget.setSizePolicy(
            QSizePolicy.Policy.Ignored, QSizePolicy.Policy.Ignored
        )
        back_layout.addWidget(self.remote_video_widget)

        # Fullscreen button
        fullscreen_layout = QHBoxLayout()
        front_layout.addLayout(fullscreen_layout)
        fullscreen_layout.addStretch()
        self.fullscreen_btn = QPushButton("", self)
        self.fullscreen_btn.setFixedSize(BUTTON_SIZE)
        self.fullscreen_icon_normal = QIcon(str(self.icons_path / "resize-full.svg"))
        self.fullscreen_icon_fullscreen = QIcon(str(self.icons_path / "resize-small.svg"))
        self.fullscreen_btn.setIcon(self.fullscreen_icon_normal)
        self.fullscreen_btn.setIconSize(ICON_SIZE)
        self.fullscreen_btn.clicked.connect(self.toggle_fullscreen)
        fullscreen_layout.addWidget(self.fullscreen_btn)

        # Control buttons
        self.control_buttons_layout = QHBoxLayout()
        self.control_buttons_layout.setSpacing(40)
        self.toggle_video_btn = cast(
            ActivableButton, self.add_control_button("videocam", self.toggle_video)
        )
        self.toggle_audio_btn = cast(
            ActivableButton, self.add_control_button("volume-up", self.toggle_audio)
        )
        self.share_desktop_btn = cast(
            ActivableButton, self.add_control_button("desktop", self.share_desktop)
        )
        # for desktop sharing, "activated" means that the desktop is being shared
        self.share_desktop_btn.deactivated_colour = "#47c68e"
        self.share_desktop_btn.activated_colour = "#f24468"
        self.share_desktop_btn.line_colour = "#666666"
        self.share_desktop_btn.activated = False
        self.hang_up_btn = self.add_control_button(
            "phone", self.hang_up, rotate=135, background="red", activable=False
        )

        controls_widget = QWidget(self)
        controls_widget.setSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Fixed)
        controls_widget.setLayout(self.control_buttons_layout)
        front_layout.addStretch()

        bottom_layout = QHBoxLayout()
        bottom_layout.addStretch()
        front_layout.addLayout(bottom_layout)
        bottom_layout.addWidget(controls_widget, alignment=Qt.AlignmentFlag.AlignBottom)

        # Local video feedback
        bottom_layout.addStretch()
        self.local_video_widget = QLabel(self)
        bottom_layout.addWidget(self.local_video_widget)

        # we update sizes on resize event
        self.background_widget.resizeEvent = self.adjust_sizes
        self.adjust_sizes()

    def add_control_button(
        self,
        icon_name: str,
        callback: Callable,
        rotate: float | None = None,
        background: str | None = None,
        activable: bool = True,
    ) -> QPushButton | ActivableButton:
        """Create a button and append it to the control buttons layout.

        @param icon_name: name of the SVG icon in ``icons_path``, without extension.
        @param callback: callable connected to the ``clicked`` signal.
        @param rotate: if set, rotation in degrees applied to the icon.
        @param background: if set, CSS background colour of the button.
        @param activable: if True, an ``ActivableButton`` is created, otherwise a
            plain ``QPushButton``.
        @return: the created button.
        """
        if activable:
            button = ActivableButton("", self)
        else:
            button = QPushButton("", self)
        icon_path = self.icons_path / f"{icon_name}.svg"
        button.setIcon(QIcon(str(icon_path)))
        button.setIconSize(ICON_SIZE)
        button.setFixedSize(BUTTON_SIZE)
        if rotate is not None:
            # the icon is rendered to a pixmap, which is then rotated
            pixmap = button.icon().pixmap(ICON_SIZE)
            transform = QTransform()
            transform.rotate(rotate)
            rotated_pixmap = pixmap.transformed(transform)
            button.setIcon(QIcon(rotated_pixmap))
        if background:
            button.setStyleSheet(f"background-color: {background};")
        button.clicked.connect(callback)
        self.control_buttons_layout.addWidget(button)
        return button

    def adjust_sizes(self, a0: QResizeEvent | None = None) -> None:
        """Resize the overlay and the local video according to the window size.

        Installed as ``resizeEvent`` of the background widget, and also called
        directly (without event) at the end of ``initUI``.

        @param a0: resize event, None when called directly.
        """
        # the overlay covers the whole background widget
        self.foreground_widget.setGeometry(
            0, 0, self.background_widget.width(), self.background_widget.height()
        )
        # local video feedback takes a third of the window in each dimension
        self.local_video_widget.setFixedSize(QSize(self.width() // 3, self.height() // 3))
        if a0 is not None:
            super().resizeEvent(a0)

    def on_new_sample(self, video_sink, video_stream: str) -> bool:
        """Pull a sample from a video sink and schedule its display.

        The display itself is done by ``update_sample``, scheduled on the host
        loop with ``call_soon_threadsafe``.
        NOTE(review): presumably called from a GStreamer thread, hence the
        thread-safe scheduling — confirm in ``webrtc.SinksApp``.

        @param video_sink: sink element which has a new sample available.
        @param video_stream: "local" or "remote".
        @return: always False.
        """
        sample = video_sink.emit("pull-sample")
        if sample is None:
            return False

        video_pad = video_sink.get_static_pad("sink")
        assert video_pad is not None
        # frame dimensions are read from the caps currently set on the sink pad
        s = video_pad.get_current_caps().get_structure(0)
        stream_size = (s.get_value("width"), s.get_value("height"))
        self.host.loop.loop.call_soon_threadsafe(
            self.update_sample, sample, stream_size, video_stream
        )

        return False

    def update_sample(self, sample, stream_size, video_stream: str) -> None:
        """Display a video sample in the widget of the given stream.

        @param sample: sample to display; nothing is done if None.
        @param stream_size: (width, height) of the frame.
        @param video_stream: "remote" to use the remote video widget, anything
            else to use the local one.
        """
        if sample is None:
            return

        video_widget = (
            self.remote_video_widget
            if video_stream == "remote"
            else self.local_video_widget
        )

        buf = sample.get_buffer()
        result, mapinfo = buf.map(Gst.MapFlags.READ)
        if not result:
            return
        try:
            width, height = stream_size
            # QImage reads directly from the mapped memory, so the buffer must
            # stay mapped until the pixmap has been created.
            qimage = QImage(mapinfo.data, width, height, QImage.Format.Format_RGB888)
            pixmap = QPixmap.fromImage(qimage).scaled(
                QSize(video_widget.width(), video_widget.height()),
                Qt.AspectRatioMode.KeepAspectRatio,
            )
            video_widget.setPixmap(pixmap)
            video_widget.setAlignment(Qt.AlignmentFlag.AlignCenter)
        finally:
            # always release the mapping, even if a Qt call above has failed
            buf.unmap(mapinfo)

    def toggle_fullscreen(self):
        """Switch between fullscreen and normal mode, and update the button icon."""
        fullscreen = not self.isFullScreen()
        if fullscreen:
            self.fullscreen_btn.setIcon(self.fullscreen_icon_fullscreen)
            self.showFullScreen()
        else:
            self.fullscreen_btn.setIcon(self.fullscreen_icon_normal)
            self.showNormal()

    def closeEvent(self, a0: QCloseEvent | None) -> None:
        """Stop the Qt loop of ``run_qt_loop`` when the window is closed."""
        super().closeEvent(a0)
        global running
        running = False

    def toggle_video(self):
        """Mute or unmute the video, and update the button state accordingly."""
        assert self.webrtc_call is not None
        self.webrtc_call.webrtc.video_muted = not self.webrtc_call.webrtc.video_muted
        self.toggle_video_btn.activated = not self.webrtc_call.webrtc.video_muted

    def toggle_audio(self):
        """Mute or unmute the audio, and update the button state accordingly."""
        assert self.webrtc_call is not None
        self.webrtc_call.webrtc.audio_muted = not self.webrtc_call.webrtc.audio_muted
        self.toggle_audio_btn.activated = not self.webrtc_call.webrtc.audio_muted

    def share_desktop(self):
        """Start or stop desktop sharing.

        With X11, a dialog is first shown to select the window to share;
        otherwise the sharing is directly requested.
        """
        assert self.webrtc_call is not None
        if self.webrtc_call.webrtc.desktop_sharing:
            self.webrtc_call.webrtc.desktop_sharing = False
            self.share_desktop_btn.activated = False
        elif display_servers.detect() == display_servers.X11:
            aio.run_async(self.show_X11_screen_dialog())
        else:
            self.webrtc_call.webrtc.desktop_sharing = True
            # keep the button in sync, as done in the X11 code path
            self.share_desktop_btn.activated = True

    def hang_up(self):
        """Close the window, which stops the Qt loop (see ``closeEvent``)."""
        self.close()

    @staticmethod
    def can_run():
        """Tell if the GUI can be used in the current environment.

        @return: True if a known display server is detected.
        """
        # if a known display server is detected, we should be able to run
        return display_servers.detect() is not None

    async def show_X11_screen_dialog(self):
        """Ask which X11 window to share, and start sharing it if one is selected."""
        assert self.webrtc_call is not None
        windows_data = display_servers.x11_list_windows()
        dialog = X11DesktopScreenDialog(windows_data, self)
        selected = await dialog.a_show()
        if selected is not None:
            xid = selected["id"]
            self.webrtc_call.webrtc.desktop_sharing_data = {"xid": xid}
            self.webrtc_call.webrtc.desktop_sharing = True
            self.share_desktop_btn.activated = True