Mercurial > libervia-backend
view libervia/cli/call_gui.py @ 4240:79c8a70e1813
backend, frontend: prepare remote control:
This is a series of changes necessary to prepare the implementation of remote control
feature:
- XEP-0166: add a `priority` attribute to `ApplicationData`: this is needed when several
applications are working in the same session, to know which one must be handled first.
Will be used to make Remote Control have precedence over Call content.
- XEP-0166: `_call_plugins` is now async and is not used with `DeferredList` anymore: the
benefit of having methods called in parallel is very low, and it causes a lot of trouble
as we can't predict the order. Methods are now called sequentially so the workflow can be
predicted.
- XEP-0167: fix `senders` XMPP attribute <=> SDP mapping
- XEP-0234: preflight acceptance key is now `pre-accepted` instead of `file-accepted`, so
the same key can be used with other jingle applications.
- XEP-0167, XEP-0343: move some methods to XEP-0167
- XEP-0353: use new `priority` feature to call preflight methods of applications according
to it.
- frontend (webrtc): refactor the sources/sinks handling with a more flexible mechanism
based on Pydantic models. It is now possible to have as many Data Channels as necessary,
to have them in addition to A/V streams, to manually specify GStreamer sources and
sinks, etc.
- frontend (webrtc): rework of the pipeline to reduce latency.
- frontend: new `portal_desktop` method. Screenshare portal handling has been moved there,
and RemoteDesktop portal has been added.
- frontend (webrtc): fix `extract_ufrag_pwd` method.
rel 436
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 11 May 2024 13:52:41 +0200 |
parents | d01b8d002619 |
children | 0d7bb4df2343 |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia CLI
# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import asyncio
from functools import partial
from pathlib import Path
from typing import Callable, cast

from PyQt6.QtCore import QPoint, QSize, Qt
from PyQt6.QtGui import (
    QCloseEvent,
    QColor,
    QIcon,
    QImage,
    QPainter,
    QPen,
    QPixmap,
    QResizeEvent,
    QTransform,
)
from PyQt6.QtWidgets import (
    QApplication,
    QDialog,
    QDialogButtonBox,
    QHBoxLayout,
    QLabel,
    QListWidget,
    QListWidgetItem,
    QMainWindow,
    QPushButton,
    QSizePolicy,
    QVBoxLayout,
    QWidget,
)
import gi

from libervia.backend.core.i18n import _
from libervia.frontends.tools import aio, display_servers, webrtc

# GStreamer versions must be pinned before anything is imported from
# ``gi.repository``.
gi.require_versions({
    "Gst": "1.0",
    "GstWebRTC": "1.0"
})
from gi.repository import Gst

ICON_SIZE = QSize(45, 45)
BUTTON_SIZE = QSize(50, 50)
# Module-level flag polled by ``AVCallUI.run_qt_loop``; set to True by
# ``AVCallUI.run`` and back to False by ``AVCallUI.closeEvent``.
running = False

aio.install_glib_asyncio_iteration()


class ActivableButton(QPushButton):
    """Push button with an "activated"/"deactivated" visual state.

    The background colour follows the state, and a diagonal line is painted over
    the button while it is deactivated.
    """

    def __init__(self, text, parent=None):
        """Create the button, initially in the activated state.

        ``text`` is the button label and ``parent`` the optional parent widget.
        """
        # ``text`` is forwarded to QPushButton so that a caller-supplied label is
        # not silently dropped.
        super().__init__(text, parent)
        self._activated = True
        self._activated_colour = "#47c68e"
        self._deactivated_colour = "#ffe089"
        self._line_colour = "#ff0000"
        self._update_background_color()

    @property
    def activated_colour(self) -> str:
        """Background colour used while the button is activated."""
        return self._activated_colour

    @activated_colour.setter
    def activated_colour(self, new_colour: str) -> None:
        if new_colour != self._activated_colour:
            self._activated_colour = new_colour
            self._update_background_color()

    @property
    def deactivated_colour(self) -> str:
        """Background colour used while the button is deactivated."""
        return self._deactivated_colour

    @deactivated_colour.setter
    def deactivated_colour(self, new_colour: str) -> None:
        if new_colour != self._deactivated_colour:
            self._deactivated_colour = new_colour
            self._update_background_color()

    @property
    def line_colour(self) -> str:
        """Colour of the diagonal line painted while the button is deactivated."""
        return self._line_colour

    @line_colour.setter
    def line_colour(self, new_colour: str) -> None:
        if new_colour != self._line_colour:
            self._line_colour = new_colour
            self.update()

    def paintEvent(self, a0):
        """Paint the button, then strike it through when it is deactivated."""
        super().paintEvent(a0)

        if not self._activated:
            painter = QPainter(self)
            painter.setRenderHint(QPainter.RenderHint.Antialiasing)

            line_color = QColor(self._line_colour)
            line_width = 4
            cap_style = Qt.PenCapStyle.RoundCap

            pen = QPen(line_color, line_width, Qt.PenStyle.SolidLine)
            pen.setCapStyle(cap_style)
            painter.setPen(pen)

            # The line goes from the bottom-left corner to the top-right one,
            # keeping ``margin`` pixels away from the edges.
            margin = 5
            start_point = QPoint(margin, self.height() - margin)
            end_point = QPoint(self.width() - margin, margin)
            painter.drawLine(start_point, end_point)

    def _update_background_color(self):
        """Apply the background colour matching the current state and repaint."""
        if self._activated:
            self.setStyleSheet(f"background-color: {self._activated_colour};")
        else:
            self.setStyleSheet(f"background-color: {self._deactivated_colour};")
        self.update()

    @property
    def activated(self):
        """Whether the button is currently in the activated state."""
        return self._activated

    @activated.setter
    def activated(self, value):
        if self._activated != value:
            self._activated = value
            self._update_background_color()


class X11DesktopScreenDialog(QDialog):
    """Dialog listing windows, letting the user pick the one to share.

    The outcome is exposed through an asyncio future, see ``a_show``.
    """

    def __init__(self, windows_data, parent=None):
        """Build the dialog.

        ``windows_data`` is an iterable of mappings; each one must have a
        ``"title"`` key, used as the label of the list entry. The whole mapping is
        attached to the entry and is what ``a_show`` resolves to.
        """
        super().__init__(parent)
        # Requires a running asyncio loop at construction time.
        self.__a_result = asyncio.get_running_loop().create_future()
        self.setWindowTitle("Please select a window to share:")
        self.resize(400, 300)
        self.list_widget = QListWidget(self)
        for window_data in windows_data:
            item = QListWidgetItem(window_data["title"])
            item.setData(Qt.ItemDataRole.UserRole, window_data)
            self.list_widget.addItem(item)

        self.buttonBox = QDialogButtonBox(
            QDialogButtonBox.StandardButton.Ok | QDialogButtonBox.StandardButton.Cancel
        )
        self.buttonBox.accepted.connect(self.on_accepted)
        self.buttonBox.rejected.connect(self.on_rejected)

        layout = QVBoxLayout(self)
        layout.addWidget(self.list_widget)
        layout.addWidget(self.buttonBox)

    def get_selected_window(self) -> dict | None:
        """Return the data attached to the current list entry, or None if none."""
        selectedItem = self.list_widget.currentItem()
        if selectedItem:
            return selectedItem.data(Qt.ItemDataRole.UserRole)
        return None

    def on_accepted(self):
        """Resolve the result with the current selection and close the dialog."""
        self.__a_result.set_result(self.get_selected_window())
        self.close()

    def on_rejected(self):
        """Resolve the result with None and close the dialog."""
        self.__a_result.set_result(None)
        self.close()

    def closeEvent(self, a0):
        """Make sure the result future is resolved when the dialog is closed."""
        super().closeEvent(a0)
        # ``on_accepted``/``on_rejected`` resolve the future before closing; this
        # covers the dialog being closed by any other means.
        if not self.__a_result.done():
            self.__a_result.set_result(None)

    async def a_show(self) -> dict | None:
        """Open the dialog and wait for the user's choice.

        Returns the data of the selected window, or None if nothing was selected
        or the dialog was rejected/closed.
        """
        self.open()
        return await self.__a_result


class AVCallUI(QMainWindow):
    """Main window of the audio/video call GUI."""

    def __init__(self, host, icons_path: Path):
        """Create the window.

        ``host`` is kept as ``self.host``; ``icons_path`` is the directory where
        the SVG icons of the buttons are looked up.
        """
        super().__init__()
        self.host = host
        # Set by ``run`` once the WebRTC call has been created.
        self.webrtc_call = None
        self.icons_path = icons_path
        self.initUI()

    @staticmethod
    async def run_qt_loop(app):
        """Process Qt posted events periodically while ``running`` is True."""
        while running:
            app.sendPostedEvents()
            await asyncio.sleep(0.1)

    @classmethod
    async def run(cls, parent, call_data):
        """Run PyQt loop and show the app"""
        global running
        media_dir = Path(await parent.host.bridge.config_get("", "media_dir"))
        icons_path = media_dir / "fonts/fontello/svg"
        app = QApplication([])
        av_call_gui = cls(parent.host, icons_path)
        av_call_gui.show()
        webrtc_call = await webrtc.WebRTCCall.make_webrtc_call(
            parent.host.bridge,
            parent.profile,
            call_data,
            sinks_data=webrtc.SinksApp(
                local_video_cb=partial(av_call_gui.on_new_sample, video_stream="local"),
                remote_video_cb=partial(av_call_gui.on_new_sample, video_stream="remote"),
            ),
            # we want to be sure that call is ended if user presses `Ctrl + c` or anything
            # else stops the session.
            on_call_setup_cb=lambda sid, profile: parent.host.add_on_quit_callback(
                parent.host.bridge.call_end, sid, "", profile
            ),
            on_call_ended_cb=lambda sid, profile: parent.host.a_quit(),
        )
        av_call_gui.webrtc_call = webrtc_call

        running = True
        await cls.run_qt_loop(app)
        # The loop above only returns once ``running`` has been reset, which
        # ``closeEvent`` does when the window is closed.
        await parent.host.a_quit()

    def initUI(self):
        """Build the widgets and layouts of the window."""
        self.setGeometry(100, 100, 800, 600)
        self.setWindowTitle("Call")

        # Main layouts
        self.background_widget = QWidget(self)
        self.foreground_widget = QWidget(self)
        self.setCentralWidget(self.background_widget)
        back_layout = QVBoxLayout(self.background_widget)
        front_layout = QVBoxLayout(self.foreground_widget)

        # Remote video
        self.remote_video_widget = QLabel(self)
        self.remote_video_widget.setSizePolicy(
            QSizePolicy.Policy.Ignored, QSizePolicy.Policy.Ignored
        )
        back_layout.addWidget(self.remote_video_widget)

        # Fullscreen button
        fullscreen_layout = QHBoxLayout()
        front_layout.addLayout(fullscreen_layout)
        fullscreen_layout.addStretch()
        self.fullscreen_btn = QPushButton("", self)
        self.fullscreen_btn.setFixedSize(BUTTON_SIZE)
        self.fullscreen_icon_normal = QIcon(str(self.icons_path / "resize-full.svg"))
        self.fullscreen_icon_fullscreen = QIcon(str(self.icons_path / "resize-small.svg"))
        self.fullscreen_btn.setIcon(self.fullscreen_icon_normal)
        self.fullscreen_btn.setIconSize(ICON_SIZE)
        self.fullscreen_btn.clicked.connect(self.toggle_fullscreen)
        fullscreen_layout.addWidget(self.fullscreen_btn)

        # Control buttons
        self.control_buttons_layout = QHBoxLayout()
        self.control_buttons_layout.setSpacing(40)
        self.toggle_video_btn = cast(
            ActivableButton, self.add_control_button("videocam", self.toggle_video)
        )
        self.toggle_audio_btn = cast(
            ActivableButton, self.add_control_button("volume-up", self.toggle_audio)
        )
        self.share_desktop_btn = cast(
            ActivableButton, self.add_control_button("desktop", self.share_desktop)
        )
        self.share_desktop_btn.deactivated_colour = "#47c68e"
        self.share_desktop_btn.activated_colour = "#f24468"
        self.share_desktop_btn.line_colour = "#666666"
        self.share_desktop_btn.activated = False
        self.hang_up_btn = self.add_control_button(
            "phone", self.hang_up, rotate=135, background="red", activable=False
        )

        controls_widget = QWidget(self)
        controls_widget.setSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Fixed)
        controls_widget.setLayout(self.control_buttons_layout)

        front_layout.addStretch()

        bottom_layout = QHBoxLayout()
        bottom_layout.addStretch()
        front_layout.addLayout(bottom_layout)
        bottom_layout.addWidget(controls_widget, alignment=Qt.AlignmentFlag.AlignBottom)

        # Local video feedback
        bottom_layout.addStretch()
        self.local_video_widget = QLabel(self)
        bottom_layout.addWidget(self.local_video_widget)

        # we update sizes on resize event
        self.background_widget.resizeEvent = self.adjust_sizes
        self.adjust_sizes()

    def add_control_button(
        self,
        icon_name: str,
        callback: Callable,
        rotate: float | None = None,
        background: str | None = None,
        activable: bool = True,
    ) -> QPushButton | ActivableButton:
        """Create a button, append it to the control buttons layout and return it.

        ``icon_name`` is the base name (without ``.svg``) of an icon located in
        ``self.icons_path``. ``callback`` is connected to the ``clicked`` signal.
        When ``rotate`` is not None the icon is rotated by that many degrees.
        ``background`` is an optional CSS colour set as background through the
        style sheet. An ``ActivableButton`` is created when ``activable`` is
        True, a plain ``QPushButton`` otherwise.
        """
        if activable:
            button = ActivableButton("", self)
        else:
            button = QPushButton("", self)
        icon_path = self.icons_path / f"{icon_name}.svg"
        button.setIcon(QIcon(str(icon_path)))
        button.setIconSize(ICON_SIZE)
        button.setFixedSize(BUTTON_SIZE)
        if rotate is not None:
            pixmap = button.icon().pixmap(ICON_SIZE)
            transform = QTransform()
            transform.rotate(rotate)
            rotated_pixmap = pixmap.transformed(transform)
            button.setIcon(QIcon(rotated_pixmap))
        if background:
            button.setStyleSheet(f"background-color: {background};")
        button.clicked.connect(callback)
        self.control_buttons_layout.addWidget(button)
        return button

    def adjust_sizes(self, a0: QResizeEvent | None = None) -> None:
        """Resize the overlay and the local video preview.

        The foreground widget is made to cover the background widget, and the
        local video widget is fixed to a third of the window's width and height.
        This method is installed as ``resizeEvent`` of the background widget and
        is also called directly, without event, at the end of ``initUI``.
        """
        self.foreground_widget.setGeometry(
            0, 0, self.background_widget.width(), self.background_widget.height()
        )
        self.local_video_widget.setFixedSize(QSize(self.width() // 3, self.height() // 3))
        if a0 is not None:
            super().resizeEvent(a0)

    def on_new_sample(self, video_sink, video_stream: str) -> bool:
        """Pull a sample from ``video_sink`` and schedule its display.

        ``video_stream`` tells which video the sample belongs to (``"local"`` or
        ``"remote"`` as bound in ``run``). Always returns False.
        """
        sample = video_sink.emit("pull-sample")
        if sample is None:
            return False

        video_pad = video_sink.get_static_pad("sink")
        assert video_pad is not None
        s = video_pad.get_current_caps().get_structure(0)
        stream_size = (s.get_value("width"), s.get_value("height"))
        # NOTE(review): presumably this callback runs outside of the asyncio
        # thread, hence ``call_soon_threadsafe`` — confirm against the webrtc tools.
        self.host.loop.loop.call_soon_threadsafe(
            self.update_sample, sample, stream_size, video_stream
        )

        return False

    def update_sample(self, sample, stream_size, video_stream: str) -> None:
        """Display ``sample`` in the video widget matching ``video_stream``.

        ``stream_size`` is a ``(width, height)`` tuple. The remote widget is used
        when ``video_stream`` is ``"remote"``, the local one otherwise. Nothing is
        done if ``sample`` is None or if its buffer can't be mapped.
        """
        if sample is None:
            return

        video_widget = (
            self.remote_video_widget
            if video_stream == "remote"
            else self.local_video_widget
        )

        buf = sample.get_buffer()
        result, mapinfo = buf.map(Gst.MapFlags.READ)
        if not result:
            return
        try:
            buffer = mapinfo.data
            width, height = stream_size
            qimage = QImage(buffer, width, height, QImage.Format.Format_RGB888)
            pixmap = QPixmap.fromImage(qimage).scaled(
                QSize(video_widget.width(), video_widget.height()),
                Qt.AspectRatioMode.KeepAspectRatio,
            )
            video_widget.setPixmap(pixmap)
            video_widget.setAlignment(Qt.AlignmentFlag.AlignCenter)
        finally:
            # Always release the mapping, even if building the image failed.
            buf.unmap(mapinfo)

    def toggle_fullscreen(self):
        """Switch between fullscreen and normal display, updating the icon."""
        fullscreen = not self.isFullScreen()
        if fullscreen:
            self.fullscreen_btn.setIcon(self.fullscreen_icon_fullscreen)
            self.showFullScreen()
        else:
            self.fullscreen_btn.setIcon(self.fullscreen_icon_normal)
            self.showNormal()

    def closeEvent(self, a0: QCloseEvent|None) -> None:
        """Handle the window being closed and stop ``run_qt_loop``."""
        global running
        super().closeEvent(a0)
        running = False

    def toggle_video(self):
        """Invert the video mute state and reflect it on the video button."""
        assert self.webrtc_call is not None
        self.webrtc_call.webrtc.video_muted = not self.webrtc_call.webrtc.video_muted
        self.toggle_video_btn.activated = not self.webrtc_call.webrtc.video_muted

    def toggle_audio(self):
        """Invert the audio mute state and reflect it on the audio button."""
        assert self.webrtc_call is not None
        self.webrtc_call.webrtc.audio_muted = not self.webrtc_call.webrtc.audio_muted
        self.toggle_audio_btn.activated = not self.webrtc_call.webrtc.audio_muted

    def share_desktop(self):
        """Start or stop desktop sharing.

        If sharing is active, it is stopped. Otherwise, on X11 a window selection
        dialog is shown first; on other display servers sharing is requested
        directly.
        """
        assert self.webrtc_call is not None
        if self.webrtc_call.webrtc.desktop_sharing:
            self.webrtc_call.webrtc.desktop_sharing = False
            self.share_desktop_btn.activated = False
        elif display_servers.detect() == display_servers.X11:
            aio.run_async(self.show_X11_screen_dialog())
        else:
            # NOTE(review): unlike the X11 path, the button is not marked as
            # activated here — confirm whether this is intended.
            self.webrtc_call.webrtc.desktop_sharing = True

    def hang_up(self):
        """Close the window."""
        self.close()

    @staticmethod
    def can_run():
        """Tell whether the GUI can be used in the current environment."""
        # if a known display server is detected, we should be able to run
        return display_servers.detect() is not None

    async def show_X11_screen_dialog(self):
        """Ask which X11 window to share, and start sharing it if one is chosen."""
        assert self.webrtc_call is not None
        windows_data = display_servers.x11_list_windows()
        dialog = X11DesktopScreenDialog(windows_data, self)
        selected = await dialog.a_show()
        if selected is not None:
            xid = selected["id"]
            self.webrtc_call.webrtc.desktop_sharing_data = {"xid": xid}
            self.webrtc_call.webrtc.desktop_sharing = True
            self.share_desktop_btn.activated = True