Mercurial > libervia-backend
view libervia/cli/call_gui.py @ 4306:94e0968987cd
plugin XEP-0033: code modernisation, improve delivery, data validation:
- Code has been rewritten using Pydantic models and `async` coroutines for data validation
and cleaner element parsing/generation.
- Delivery has been completely rewritten. It now works even if server doesn't support
multicast, and sends to local multicast service first. Delivering to local multicast
service first is due to bad support of XEP-0033 in server (notably Prosody which has an
incomplete implementation), and the current impossibility to detect if a sub-domain
service handles fully multicast or only for local domains. This is a workaround to have
a good balance between backward compatibility and use of bandwidth, and to make it work
with the incoming email gateway implementation (the gateway will only deliver to
entities of its own domain).
- disco feature checking now uses `async` coroutines. `host` implementation still uses
Deferred return values for compatibility with legacy code.
rel 450
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 26 Sep 2024 16:12:01 +0200 |
parents | 0d7bb4df2343 |
children | 00837fa13e5a |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia CLI
# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import asyncio
from functools import partial
from pathlib import Path
from typing import Callable, cast

from PyQt6.QtCore import QPoint, QSize, Qt
from PyQt6.QtGui import (
    QCloseEvent,
    QColor,
    QIcon,
    QImage,
    QPainter,
    QPen,
    QPixmap,
    QResizeEvent,
    QTransform,
)
from PyQt6.QtWidgets import (
    QApplication,
    QDialog,
    QDialogButtonBox,
    QHBoxLayout,
    QLabel,
    QListWidget,
    QListWidgetItem,
    QMainWindow,
    QPushButton,
    QSizePolicy,
    QVBoxLayout,
    QWidget,
)
import gi

from libervia.backend.core.i18n import _
from libervia.frontends.tools import aio, display_servers, webrtc

# GStreamer versions must be pinned before anything is imported from gi.repository.
gi.require_versions({"Gst": "1.0", "GstWebRTC": "1.0"})
from gi.repository import Gst

ICON_SIZE = QSize(45, 45)
BUTTON_SIZE = QSize(50, 50)
# Module-level flag polled by ``AVCallUI.run_qt_loop``; set to True in ``AVCallUI.run``
# and back to False in ``AVCallUI.closeEvent``.
running = False


aio.install_glib_asyncio_iteration()


class ActivableButton(QPushButton):
    """Push button with an "activated" state reflected in its appearance.

    The background colour depends on the state, and a diagonal line is painted
    over the button while it is deactivated.
    """

    def __init__(self, text, parent=None):
        """Create the button in the activated state.

        @param text: label of the button
        @param parent: optional parent widget
        """
        # ``text`` was previously accepted but never forwarded to QPushButton.
        super().__init__(text, parent)
        self._activated = True
        self._activated_colour = "#47c68e"
        self._deactivated_colour = "#ffe089"
        self._line_colour = "#ff0000"
        self._update_background_color()

    @property
    def activated_colour(self) -> str:
        """Background colour used while the button is activated."""
        return self._activated_colour

    @activated_colour.setter
    def activated_colour(self, new_colour: str) -> None:
        if new_colour != self._activated_colour:
            self._activated_colour = new_colour
            self._update_background_color()

    @property
    def deactivated_colour(self) -> str:
        """Background colour used while the button is deactivated."""
        return self._deactivated_colour

    @deactivated_colour.setter
    def deactivated_colour(self, new_colour: str) -> None:
        if new_colour != self._deactivated_colour:
            self._deactivated_colour = new_colour
            self._update_background_color()

    @property
    def line_colour(self) -> str:
        """Colour of the diagonal line painted while deactivated."""
        return self._line_colour

    @line_colour.setter
    def line_colour(self, new_colour: str) -> None:
        if new_colour != self._line_colour:
            self._line_colour = new_colour
            self.update()

    def paintEvent(self, a0):
        """Paint the button, then strike it through when deactivated."""
        super().paintEvent(a0)

        if not self._activated:
            painter = QPainter(self)
            painter.setRenderHint(QPainter.RenderHint.Antialiasing)

            line_color = QColor(self._line_colour)
            line_width = 4
            cap_style = Qt.PenCapStyle.RoundCap

            pen = QPen(line_color, line_width, Qt.PenStyle.SolidLine)
            pen.setCapStyle(cap_style)
            painter.setPen(pen)

            # The line goes from the bottom-left to the top-right corner,
            # inset by ``margin`` pixels.
            margin = 5
            start_point = QPoint(margin, self.height() - margin)
            end_point = QPoint(self.width() - margin, margin)
            painter.drawLine(start_point, end_point)

    def _update_background_color(self):
        """Apply the stylesheet matching the current state and repaint."""
        if self._activated:
            self.setStyleSheet(f"background-color: {self._activated_colour};")
        else:
            self.setStyleSheet(f"background-color: {self._deactivated_colour};")
        self.update()

    @property
    def activated(self):
        """Whether the button is currently in the activated state."""
        return self._activated

    @activated.setter
    def activated(self, value):
        if self._activated != value:
            self._activated = value
            self._update_background_color()


class X11DesktopScreenDialog(QDialog):
    """Dialog listing windows so the user can pick one to share.

    The choice is exposed through an asyncio future awaited by ``a_show``.
    """

    def __init__(self, windows_data, parent=None):
        """Build the list and the Ok/Cancel buttons.

        @param windows_data: iterable of mappings; each must have a "title" key
            (used as the list label). The whole mapping is stored on the item and
            returned as the selection.
        @param parent: optional parent widget
        """
        super().__init__(parent)
        self.__a_result = asyncio.get_running_loop().create_future()
        self.setWindowTitle("Please select a window to share:")
        self.resize(400, 300)
        self.list_widget = QListWidget(self)
        for window_data in windows_data:
            item = QListWidgetItem(window_data["title"])
            item.setData(Qt.ItemDataRole.UserRole, window_data)
            self.list_widget.addItem(item)

        self.buttonBox = QDialogButtonBox(
            QDialogButtonBox.StandardButton.Ok | QDialogButtonBox.StandardButton.Cancel
        )
        self.buttonBox.accepted.connect(self.on_accepted)
        self.buttonBox.rejected.connect(self.on_rejected)
        # Safety net: make sure the future is resolved whenever the dialog
        # finishes, even if neither button handler nor closeEvent ran
        # (NOTE(review): e.g. dismissal through QDialog.reject() — confirm).
        self.finished.connect(self._on_finished)

        layout = QVBoxLayout(self)
        layout.addWidget(self.list_widget)
        layout.addWidget(self.buttonBox)

    def _resolve(self, value: dict | None) -> None:
        """Set the result future to ``value`` unless it is already resolved."""
        if not self.__a_result.done():
            self.__a_result.set_result(value)

    def _on_finished(self, result: int) -> None:
        """Resolve the future with None if nothing resolved it before."""
        self._resolve(None)

    def get_selected_window(self) -> dict | None:
        """Return the data attached to the current list item, or None."""
        selectedItem = self.list_widget.currentItem()
        if selectedItem:
            return selectedItem.data(Qt.ItemDataRole.UserRole)
        return None

    def on_accepted(self):
        """Resolve the future with the current selection and close the dialog."""
        # The result must be set before closing, so later None resolutions
        # triggered by the close are ignored.
        self._resolve(self.get_selected_window())
        self.close()

    def on_rejected(self):
        """Resolve the future with None and close the dialog."""
        self._resolve(None)
        self.close()

    def closeEvent(self, a0):
        """Resolve the future with None if the dialog is closed without a choice."""
        super().closeEvent(a0)
        self._resolve(None)

    async def a_show(self) -> dict | None:
        """Open the dialog and wait for the user's choice.

        @return: data of the selected window, or None if nothing was selected or
            the dialog was dismissed
        """
        self.open()
        return await self.__a_result


class AVCallUI(QMainWindow):
    """Main window of an audio/video call.

    It shows the remote video in the background, with a fullscreen button, call
    control buttons and the local video feedback laid over it.
    """

    def __init__(self, host, icons_path: Path):
        """Store references and build the interface.

        @param host: host object; its ``loop.loop`` attribute is used to schedule
            video frame updates
        @param icons_path: directory containing the SVG icons of the buttons
        """
        super().__init__()
        self.host = host
        # Set by ``run`` once the WebRTC call has been created.
        self.webrtc_call = None
        self.icons_path = icons_path
        self.initUI()

    @staticmethod
    async def run_qt_loop(app):
        """Process posted Qt events every 0.1 s while ``running`` is true.

        @param app: the QApplication instance
        """
        while running:
            app.sendPostedEvents()
            await asyncio.sleep(0.1)

    @classmethod
    async def run(cls, parent, call_data):
        """Run PyQt loop and show the app

        @param parent: object giving access to ``host`` and ``profile``
        @param call_data: call data forwarded to ``WebRTCCall.make_webrtc_call``
        """
        media_dir = Path(await parent.host.bridge.config_get("", "media_dir"))
        icons_path = media_dir / "fonts/fontello/svg"
        app = QApplication([])
        av_call_gui = cls(parent.host, icons_path)
        av_call_gui.show()
        webrtc_call = await webrtc.WebRTCCall.make_webrtc_call(
            parent.host.bridge,
            parent.profile,
            call_data,
            sinks_data=webrtc.SinksApp(
                local_video_cb=partial(av_call_gui.on_new_sample, video_stream="local"),
                remote_video_cb=partial(av_call_gui.on_new_sample, video_stream="remote"),
            ),
            # we want to be sure that call is ended if user presses `Ctrl + c` or anything
            # else stops the session.
            on_call_setup_cb=lambda sid, profile: parent.host.add_on_quit_callback(
                parent.host.bridge.call_end, sid, "", profile
            ),
            on_call_ended_cb=lambda sid, profile: parent.host.a_quit(),
        )
        av_call_gui.webrtc_call = webrtc_call

        global running
        running = True
        await cls.run_qt_loop(app)
        await parent.host.a_quit()

    def initUI(self):
        """Create widgets and layouts of the call window."""
        self.setGeometry(100, 100, 800, 600)
        self.setWindowTitle("Call")

        # Main layouts
        # The foreground widget is not put in a layout: its geometry is kept in sync
        # with the background widget in ``adjust_sizes``.
        self.background_widget = QWidget(self)
        self.foreground_widget = QWidget(self)
        self.setCentralWidget(self.background_widget)
        back_layout = QVBoxLayout(self.background_widget)
        front_layout = QVBoxLayout(self.foreground_widget)

        # Remote video
        self.remote_video_widget = QLabel(self)
        self.remote_video_widget.setSizePolicy(
            QSizePolicy.Policy.Ignored, QSizePolicy.Policy.Ignored
        )
        back_layout.addWidget(self.remote_video_widget)

        # Fullscreen button
        fullscreen_layout = QHBoxLayout()
        front_layout.addLayout(fullscreen_layout)
        fullscreen_layout.addStretch()
        self.fullscreen_btn = QPushButton("", self)
        self.fullscreen_btn.setFixedSize(BUTTON_SIZE)
        self.fullscreen_icon_normal = QIcon(str(self.icons_path / "resize-full.svg"))
        self.fullscreen_icon_fullscreen = QIcon(str(self.icons_path / "resize-small.svg"))
        self.fullscreen_btn.setIcon(self.fullscreen_icon_normal)
        self.fullscreen_btn.setIconSize(ICON_SIZE)
        self.fullscreen_btn.clicked.connect(self.toggle_fullscreen)
        fullscreen_layout.addWidget(self.fullscreen_btn)

        # Control buttons
        self.control_buttons_layout = QHBoxLayout()
        self.control_buttons_layout.setSpacing(40)
        self.toggle_video_btn = cast(
            ActivableButton, self.add_control_button("videocam", self.toggle_video)
        )
        self.toggle_audio_btn = cast(
            ActivableButton, self.add_control_button("volume-up", self.toggle_audio)
        )
        self.share_desktop_btn = cast(
            ActivableButton, self.add_control_button("desktop", self.share_desktop)
        )
        # Desktop sharing starts deactivated and uses its own colour scheme.
        self.share_desktop_btn.deactivated_colour = "#47c68e"
        self.share_desktop_btn.activated_colour = "#f24468"
        self.share_desktop_btn.line_colour = "#666666"
        self.share_desktop_btn.activated = False
        self.hang_up_btn = self.add_control_button(
            "phone", self.hang_up, rotate=135, background="red", activable=False
        )

        controls_widget = QWidget(self)
        controls_widget.setSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Fixed)
        controls_widget.setLayout(self.control_buttons_layout)
        front_layout.addStretch()

        bottom_layout = QHBoxLayout()
        bottom_layout.addStretch()
        front_layout.addLayout(bottom_layout)
        bottom_layout.addWidget(controls_widget, alignment=Qt.AlignmentFlag.AlignBottom)

        # Local video feedback
        bottom_layout.addStretch()
        self.local_video_widget = QLabel(self)
        bottom_layout.addWidget(self.local_video_widget)

        # we update sizes on resize event
        self.background_widget.resizeEvent = self.adjust_sizes
        self.adjust_sizes()

    def add_control_button(
        self,
        icon_name: str,
        callback: Callable,
        rotate: float | None = None,
        background: str | None = None,
        activable: bool = True,
    ) -> QPushButton | ActivableButton:
        """Create a control button and append it to the control buttons layout.

        @param icon_name: name of the SVG icon in ``icons_path``, without extension
        @param callback: callable connected to the ``clicked`` signal
        @param rotate: if not None, rotate the icon by this angle
        @param background: if set, value used for the ``background-color`` style
        @param activable: create an ``ActivableButton`` if True, a plain
            ``QPushButton`` otherwise
        @return: the created button
        """
        if activable:
            button = ActivableButton("", self)
        else:
            button = QPushButton("", self)
        icon_path = self.icons_path / f"{icon_name}.svg"
        button.setIcon(QIcon(str(icon_path)))
        button.setIconSize(ICON_SIZE)
        button.setFixedSize(BUTTON_SIZE)
        if rotate is not None:
            pixmap = button.icon().pixmap(ICON_SIZE)
            transform = QTransform()
            transform.rotate(rotate)
            rotated_pixmap = pixmap.transformed(transform)
            button.setIcon(QIcon(rotated_pixmap))
        if background:
            button.setStyleSheet(f"background-color: {background};")
        button.clicked.connect(callback)
        self.control_buttons_layout.addWidget(button)
        return button

    def adjust_sizes(self, a0: QResizeEvent | None = None) -> None:
        """Resize the overlay and the local video feedback.

        The foreground widget is made to cover the background widget, and the local
        video widget is fixed to a third of the window's width and height.

        @param a0: resize event when called as a ``resizeEvent`` handler, None when
            called directly
        """
        self.foreground_widget.setGeometry(
            0, 0, self.background_widget.width(), self.background_widget.height()
        )
        self.local_video_widget.setFixedSize(QSize(self.width() // 3, self.height() // 3))
        if a0 is not None:
            super().resizeEvent(a0)

    def on_new_sample(self, video_sink, video_stream: str) -> bool:
        """Pull a sample from the sink and schedule its display.

        The widget update is scheduled with ``call_soon_threadsafe`` on the host
        loop instead of being done directly here.

        @param video_sink: sink emitting the sample
        @param video_stream: "remote" for the remote stream; any other value is
            displayed in the local video widget
        @return: always False
            (NOTE(review): presumably interpreted as a Gst flow return — confirm)
        """
        sample = video_sink.emit("pull-sample")
        if sample is None:
            return False

        video_pad = video_sink.get_static_pad("sink")
        assert video_pad is not None
        s = video_pad.get_current_caps().get_structure(0)
        stream_size = (s.get_value("width"), s.get_value("height"))
        self.host.loop.loop.call_soon_threadsafe(
            self.update_sample, sample, stream_size, video_stream
        )

        return False

    def update_sample(self, sample, stream_size, video_stream: str) -> None:
        """Display a video sample in the widget matching the stream.

        @param sample: sample to display; nothing is done if None
        @param stream_size: (width, height) of the frame
        @param video_stream: "remote" for the remote video widget; any other value
            selects the local video widget
        """
        if sample is None:
            return

        video_widget = (
            self.remote_video_widget
            if video_stream == "remote"
            else self.local_video_widget
        )

        buf = sample.get_buffer()
        result, mapinfo = buf.map(Gst.MapFlags.READ)
        if not result:
            # Nothing was mapped, so there is nothing to display or to unmap.
            return
        try:
            buffer = mapinfo.data
            width, height = stream_size
            # The frame data is read as packed RGB888.
            qimage = QImage(buffer, width, height, QImage.Format.Format_RGB888)
            pixmap = QPixmap.fromImage(qimage).scaled(
                QSize(video_widget.width(), video_widget.height()),
                Qt.AspectRatioMode.KeepAspectRatio,
            )
            video_widget.setPixmap(pixmap)
            video_widget.setAlignment(Qt.AlignmentFlag.AlignCenter)
        finally:
            # Always release the mapping, even if the conversion above failed.
            buf.unmap(mapinfo)

    def toggle_fullscreen(self):
        """Switch between fullscreen and normal display, updating the button icon."""
        fullscreen = not self.isFullScreen()
        if fullscreen:
            self.fullscreen_btn.setIcon(self.fullscreen_icon_fullscreen)
            self.showFullScreen()
        else:
            self.fullscreen_btn.setIcon(self.fullscreen_icon_normal)
            self.showNormal()

    def closeEvent(self, a0: QCloseEvent | None) -> None:
        """Handle window closing by stopping the loop of ``run_qt_loop``."""
        super().closeEvent(a0)
        global running
        running = False

    def toggle_video(self):
        """Invert video muting and update the video button accordingly."""
        assert self.webrtc_call is not None
        self.webrtc_call.webrtc.video_muted = not self.webrtc_call.webrtc.video_muted
        self.toggle_video_btn.activated = not self.webrtc_call.webrtc.video_muted

    def toggle_audio(self):
        """Invert audio muting and update the audio button accordingly."""
        assert self.webrtc_call is not None
        self.webrtc_call.webrtc.audio_muted = not self.webrtc_call.webrtc.audio_muted
        self.toggle_audio_btn.activated = not self.webrtc_call.webrtc.audio_muted

    def share_desktop(self):
        """Toggle desktop sharing.

        If sharing is active, it is stopped. Otherwise, on X11 a window selection
        dialog is shown first; on any other display server sharing is enabled
        directly.
        """
        assert self.webrtc_call is not None
        if self.webrtc_call.webrtc.desktop_sharing:
            self.webrtc_call.webrtc.desktop_sharing = False
            self.share_desktop_btn.activated = False
        elif display_servers.detect() == display_servers.X11:
            aio.run_async(self.show_X11_screen_dialog())
        else:
            self.webrtc_call.webrtc.desktop_sharing = True
            # Keep the button consistent with the X11 path, which marks it as
            # activated once sharing is enabled.
            self.share_desktop_btn.activated = True

    def hang_up(self):
        """Close the call window."""
        self.close()

    @staticmethod
    def can_run():
        """Tell whether the GUI should be able to run.

        @return: True if a display server is detected
        """
        # if a known display server is detected, we should be able to run
        return display_servers.detect() is not None

    async def show_X11_screen_dialog(self):
        """Let the user pick an X11 window, then start sharing it.

        Nothing is changed if the dialog returns no selection.
        """
        assert self.webrtc_call is not None
        windows_data = display_servers.x11_list_windows()
        dialog = X11DesktopScreenDialog(windows_data, self)
        selected = await dialog.a_show()
        if selected is not None:
            xid = selected["id"]
            self.webrtc_call.webrtc.desktop_sharing_data = {"xid": xid}
            self.webrtc_call.webrtc.desktop_sharing = True
            self.share_desktop_btn.activated = True