view libervia/cli/call_gui.py @ 4219:1b5cf2ee1d86

plugin XEP-0384, XEP-0391: download missing devices list: when a peer jid was not in our roster, devices list was not retrieved, resulting in failed en/decryption. This patch does check it and download missing devices list in necessary. There is no subscription managed yet, so the list won't be updated in case of new devices, this should be addressed at some point.
author Goffi <goffi@goffi.org>
date Tue, 05 Mar 2024 17:31:36 +0100
parents 9218d4331bb2
children d01b8d002619
line wrap: on
line source

#!/usr/bin/env python3

# Libervia CLI
# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import asyncio
from functools import partial
from pathlib import Path
from typing import Callable, cast

from PyQt6.QtCore import QPoint, QSize, Qt
from PyQt6.QtGui import (
    QCloseEvent,
    QColor,
    QIcon,
    QImage,
    QPainter,
    QPen,
    QPixmap,
    QResizeEvent,
    QTransform,
)
from PyQt6.QtWidgets import (
    QApplication,
    QDialog,
    QDialogButtonBox,
    QHBoxLayout,
    QLabel,
    QListWidget,
    QListWidgetItem,
    QMainWindow,
    QPushButton,
    QSizePolicy,
    QVBoxLayout,
    QWidget,
)
import gi

from libervia.backend.core.i18n import _
from libervia.cli.call_webrtc import WebRTCCall
from libervia.frontends.tools import aio, display_servers, webrtc
gi.require_versions({
    "Gst": "1.0",
    "GstWebRTC": "1.0"
})
from gi.repository import Gst



ICON_SIZE = QSize(45, 45)
BUTTON_SIZE = QSize(50, 50)
running = False


class ActivableButton(QPushButton):
    def __init__(self, text, parent=None):
        super().__init__(parent)
        self._activated = True
        self._activated_colour = "#47c68e"
        self._deactivated_colour = "#ffe089"
        self._line_colour = "#ff0000"
        self._update_background_color()

    @property
    def activated_colour(self) -> str:
        return self._activated_colour

    @activated_colour.setter
    def activated_colour(self, new_colour: str) -> None:
        if new_colour != self._activated_colour:
            self._activated_colour = new_colour
            self._update_background_color()

    @property
    def deactivated_colour(self) -> str:
        return self._deactivated_colour

    @deactivated_colour.setter
    def deactivated_colour(self, new_colour: str) -> None:
        if new_colour != self._deactivated_colour:
            self._deactivated_colour = new_colour
            self._update_background_color()

    @property
    def line_colour(self) -> str:
        return self._line_colour

    @line_colour.setter
    def line_colour(self, new_colour: str) -> None:
        if new_colour != self._line_colour:
            self._line_colour = new_colour
            self.update()

    def paintEvent(self, a0):
        super().paintEvent(a0)

        if not self._activated:
            painter = QPainter(self)
            painter.setRenderHint(QPainter.RenderHint.Antialiasing)

            line_color = QColor(self._line_colour)
            line_width = 4
            cap_style = Qt.PenCapStyle.RoundCap

            pen = QPen(line_color, line_width, Qt.PenStyle.SolidLine)
            pen.setCapStyle(cap_style)
            painter.setPen(pen)

            margin = 5
            start_point = QPoint(margin, self.height() - margin)
            end_point = QPoint(self.width() - margin, margin)
            painter.drawLine(start_point, end_point)

    def _update_background_color(self):
        if self._activated:
            self.setStyleSheet(f"background-color: {self._activated_colour};")
        else:
            self.setStyleSheet(f"background-color: {self._deactivated_colour};")
        self.update()

    @property
    def activated(self):
        return self._activated

    @activated.setter
    def activated(self, value):
        if self._activated != value:
            self._activated = value
            self._update_background_color()


class X11DesktopScreenDialog(QDialog):
    def __init__(self, windows_data, parent=None):
        super().__init__(parent)
        self.__a_result = asyncio.get_running_loop().create_future()
        self.setWindowTitle("Please select a window to share:")
        self.resize(400, 300)
        self.list_widget = QListWidget(self)
        for window_data in windows_data:
            item = QListWidgetItem(window_data["title"])
            item.setData(Qt.ItemDataRole.UserRole, window_data)
            self.list_widget.addItem(item)

        self.buttonBox = QDialogButtonBox(
            QDialogButtonBox.StandardButton.Ok | QDialogButtonBox.StandardButton.Cancel
        )
        self.buttonBox.accepted.connect(self.on_accepted)
        self.buttonBox.rejected.connect(self.on_rejected)

        layout = QVBoxLayout(self)
        layout.addWidget(self.list_widget)
        layout.addWidget(self.buttonBox)

    def get_selected_window(self) -> dict | None:
        selectedItem = self.list_widget.currentItem()
        if selectedItem:
            return selectedItem.data(Qt.ItemDataRole.UserRole)
        return None

    def on_accepted(self):
        self.__a_result.set_result(self.get_selected_window())
        self.close()

    def on_rejected(self):
        self.__a_result.set_result(None)
        self.close()

    def closeEvent(self, a0):
        super().closeEvent(a0)
        if not self.__a_result.done():
            self.__a_result.set_result(None)

    async def a_show(self) -> dict | None:
        self.open()
        return await self.__a_result


class AVCallUI(QMainWindow):
    def __init__(self, host, icons_path: Path):
        super().__init__()
        self.host = host
        self.webrtc_call = None
        self.icons_path = icons_path
        self.initUI()

    @staticmethod
    async def run_qt_loop(app):
        while running:
            app.sendPostedEvents()
            await asyncio.sleep(0.1)

    @classmethod
    async def run(cls, parent, call_data):
        """Run PyQt loop and show the app"""
        media_dir = Path(await parent.host.bridge.config_get("", "media_dir"))
        icons_path = media_dir / "fonts/fontello/svg"
        app = QApplication([])
        av_call_gui = cls(parent.host, icons_path)
        av_call_gui.show()
        webrtc_call = await WebRTCCall.make_webrtc_call(
            parent.host,
            parent.profile,
            call_data,
            sinks=webrtc.SINKS_APP,
            appsink_data=webrtc.AppSinkData(
                local_video_cb=partial(av_call_gui.on_new_sample, video_stream="local"),
                remote_video_cb=partial(av_call_gui.on_new_sample, video_stream="remote"),
            ),
        )
        av_call_gui.webrtc_call = webrtc_call

        global running
        running = True
        await cls.run_qt_loop(app)
        await parent.host.a_quit()

    def initUI(self):
        self.setGeometry(100, 100, 800, 600)
        self.setWindowTitle("Call")

        # Main layouts
        self.background_widget = QWidget(self)
        self.foreground_widget = QWidget(self)
        self.setCentralWidget(self.background_widget)
        back_layout = QVBoxLayout(self.background_widget)
        front_layout = QVBoxLayout(self.foreground_widget)

        # Remote video
        self.remote_video_widget = QLabel(self)
        self.remote_video_widget.setSizePolicy(
            QSizePolicy.Policy.Ignored, QSizePolicy.Policy.Ignored
        )
        back_layout.addWidget(self.remote_video_widget)

        # Fullscreen button
        fullscreen_layout = QHBoxLayout()
        front_layout.addLayout(fullscreen_layout)
        fullscreen_layout.addStretch()
        self.fullscreen_btn = QPushButton("", self)
        self.fullscreen_btn.setFixedSize(BUTTON_SIZE)
        self.fullscreen_icon_normal = QIcon(str(self.icons_path / "resize-full.svg"))
        self.fullscreen_icon_fullscreen = QIcon(str(self.icons_path / "resize-small.svg"))
        self.fullscreen_btn.setIcon(self.fullscreen_icon_normal)
        self.fullscreen_btn.setIconSize(ICON_SIZE)
        self.fullscreen_btn.clicked.connect(self.toggle_fullscreen)
        fullscreen_layout.addWidget(self.fullscreen_btn)

        # Control buttons
        self.control_buttons_layout = QHBoxLayout()
        self.control_buttons_layout.setSpacing(40)
        self.toggle_video_btn = cast(
            ActivableButton, self.add_control_button("videocam", self.toggle_video)
        )
        self.toggle_audio_btn = cast(
            ActivableButton, self.add_control_button("volume-up", self.toggle_audio)
        )
        self.share_desktop_btn = cast(
            ActivableButton, self.add_control_button("desktop", self.share_desktop)
        )
        self.share_desktop_btn.deactivated_colour = "#47c68e"
        self.share_desktop_btn.activated_colour = "#f24468"
        self.share_desktop_btn.line_colour = "#666666"
        self.share_desktop_btn.activated = False
        self.hang_up_btn = self.add_control_button(
            "phone", self.hang_up, rotate=135, background="red", activable=False
        )

        controls_widget = QWidget(self)
        controls_widget.setSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Fixed)
        controls_widget.setLayout(self.control_buttons_layout)
        front_layout.addStretch()

        bottom_layout = QHBoxLayout()
        bottom_layout.addStretch()
        front_layout.addLayout(bottom_layout)
        bottom_layout.addWidget(controls_widget, alignment=Qt.AlignmentFlag.AlignBottom)

        # Local video feedback
        bottom_layout.addStretch()
        self.local_video_widget = QLabel(self)
        bottom_layout.addWidget(self.local_video_widget)

        # we update sizes on resize event
        self.background_widget.resizeEvent = self.adjust_sizes
        self.adjust_sizes()

    def add_control_button(
        self,
        icon_name: str,
        callback: Callable,
        rotate: float | None = None,
        background: str | None = None,
        activable: bool = True,
    ) -> QPushButton | ActivableButton:
        if activable:
            button = ActivableButton("", self)
        else:
            button = QPushButton("", self)
        icon_path = self.icons_path / f"{icon_name}.svg"
        button.setIcon(QIcon(str(icon_path)))
        button.setIconSize(ICON_SIZE)
        button.setFixedSize(BUTTON_SIZE)
        if rotate is not None:
            pixmap = button.icon().pixmap(ICON_SIZE)
            transform = QTransform()
            transform.rotate(rotate)
            rotated_pixmap = pixmap.transformed(transform)
            button.setIcon(QIcon(rotated_pixmap))
        if background:
            button.setStyleSheet(f"background-color: {background};")
        button.clicked.connect(callback)
        self.control_buttons_layout.addWidget(button)
        return button

    def adjust_sizes(self, a0: QResizeEvent | None = None) -> None:
        self.foreground_widget.setGeometry(
            0, 0, self.background_widget.width(), self.background_widget.height()
        )
        self.local_video_widget.setFixedSize(QSize(self.width() // 3, self.height() // 3))
        if a0 is not None:
            super().resizeEvent(a0)

    def on_new_sample(self, video_sink, video_stream: str) -> bool:
        sample = video_sink.emit("pull-sample")
        if sample is None:
            return False

        video_pad = video_sink.get_static_pad("sink")
        assert video_pad is not None
        s = video_pad.get_current_caps().get_structure(0)
        stream_size = (s.get_value("width"), s.get_value("height"))
        self.host.loop.loop.call_soon_threadsafe(
            self.update_sample, sample, stream_size, video_stream
        )

        return False

    def update_sample(self, sample, stream_size, video_stream: str) -> None:
        if sample is None:
            return

        video_widget = (
            self.remote_video_widget
            if video_stream == "remote"
            else self.local_video_widget
        )

        buf = sample.get_buffer()
        result, mapinfo = buf.map(Gst.MapFlags.READ)
        if result:
            buffer = mapinfo.data
            width, height = stream_size
            qimage = QImage(buffer, width, height, QImage.Format.Format_RGB888)
            pixmap = QPixmap.fromImage(qimage).scaled(
                QSize(video_widget.width(), video_widget.height()),
                Qt.AspectRatioMode.KeepAspectRatio,
            )
            video_widget.setPixmap(pixmap)
            video_widget.setAlignment(Qt.AlignmentFlag.AlignCenter)

            buf.unmap(mapinfo)

    def toggle_fullscreen(self):
        fullscreen = not self.isFullScreen()
        if fullscreen:
            self.fullscreen_btn.setIcon(self.fullscreen_icon_fullscreen)
            self.showFullScreen()
        else:
            self.fullscreen_btn.setIcon(self.fullscreen_icon_normal)
            self.showNormal()

    def closeEvent(self, a0: QCloseEvent|None) -> None:
        super().closeEvent(a0)
        global running
        running = False

    def toggle_video(self):
        assert self.webrtc_call is not None
        self.webrtc_call.webrtc.video_muted = not self.webrtc_call.webrtc.video_muted
        self.toggle_video_btn.activated = not self.webrtc_call.webrtc.video_muted

    def toggle_audio(self):
        assert self.webrtc_call is not None
        self.webrtc_call.webrtc.audio_muted = not self.webrtc_call.webrtc.audio_muted
        self.toggle_audio_btn.activated = not self.webrtc_call.webrtc.audio_muted

    def share_desktop(self):
        assert self.webrtc_call is not None
        if self.webrtc_call.webrtc.desktop_sharing:
            self.webrtc_call.webrtc.desktop_sharing = False
            self.share_desktop_btn.activated = False
        elif display_servers.detect() == display_servers.X11:
            aio.run_async(self.show_X11_screen_dialog())
        else:
            self.webrtc_call.webrtc.desktop_sharing = True

    def hang_up(self):
        self.close()

    @staticmethod
    def can_run():
        # if a known display server is detected, we should be able to run
        return display_servers.detect() is not None

    async def show_X11_screen_dialog(self):
        assert self.webrtc_call is not None
        windows_data = display_servers.x11_list_windows()
        dialog = X11DesktopScreenDialog(windows_data, self)
        selected = await dialog.a_show()
        if selected is not None:
            xid = selected["id"]
            self.webrtc_call.webrtc.desktop_sharing_data = {"xid": xid}
            self.webrtc_call.webrtc.desktop_sharing = True
            self.share_desktop_btn.activated = True