Mercurial > libervia-backend
view libervia/cli/call_gui.py @ 4309:b56b1eae7994
component email gateway: add multicasting:
XEP-0033 multicasting is now supported both for incoming and outgoing messages. XEP-0033
metadata are converted to suitable Email headers and vice versa.
Email address and JID are both supported, and delivery is done by the gateway when
suitable on incoming messages.
rel 450
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 26 Sep 2024 16:12:01 +0200 |
parents | 0d7bb4df2343 |
children | 00837fa13e5a |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia CLI
# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import asyncio
from functools import partial
from pathlib import Path
from typing import Callable, cast

from PyQt6.QtCore import QPoint, QSize, Qt
from PyQt6.QtGui import (
    QCloseEvent,
    QColor,
    QIcon,
    QImage,
    QPainter,
    QPen,
    QPixmap,
    QResizeEvent,
    QTransform,
)
from PyQt6.QtWidgets import (
    QApplication,
    QDialog,
    QDialogButtonBox,
    QHBoxLayout,
    QLabel,
    QListWidget,
    QListWidgetItem,
    QMainWindow,
    QPushButton,
    QSizePolicy,
    QVBoxLayout,
    QWidget,
)
import gi
from libervia.backend.core.i18n import _
from libervia.frontends.tools import aio, display_servers, webrtc

# the GStreamer versions must be pinned before ``gi.repository`` is imported
gi.require_versions({"Gst": "1.0", "GstWebRTC": "1.0"})
from gi.repository import Gst

ICON_SIZE = QSize(45, 45)
BUTTON_SIZE = QSize(50, 50)
# Flag polled by ``AVCallUI.run_qt_loop``; cleared by ``AVCallUI.closeEvent``.
running = False


aio.install_glib_asyncio_iteration()


class ActivableButton(QPushButton):
    """Push button with an activated/deactivated visual state.

    The background colour follows the state, and a diagonal line is painted across
    the button while it is deactivated.
    """

    def __init__(self, text, parent=None):
        """
        @param text: label of the button
        @param parent: parent widget, if any
        """
        # ``text`` is forwarded to Qt: it used to be accepted but silently dropped.
        super().__init__(text, parent)
        self._activated = True
        self._activated_colour = "#47c68e"
        self._deactivated_colour = "#ffe089"
        self._line_colour = "#ff0000"
        self._update_background_color()

    @property
    def activated_colour(self) -> str:
        """Background colour used while the button is activated."""
        return self._activated_colour

    @activated_colour.setter
    def activated_colour(self, new_colour: str) -> None:
        if new_colour != self._activated_colour:
            self._activated_colour = new_colour
            self._update_background_color()

    @property
    def deactivated_colour(self) -> str:
        """Background colour used while the button is deactivated."""
        return self._deactivated_colour

    @deactivated_colour.setter
    def deactivated_colour(self, new_colour: str) -> None:
        if new_colour != self._deactivated_colour:
            self._deactivated_colour = new_colour
            self._update_background_color()

    @property
    def line_colour(self) -> str:
        """Colour of the diagonal line painted while the button is deactivated."""
        return self._line_colour

    @line_colour.setter
    def line_colour(self, new_colour: str) -> None:
        if new_colour != self._line_colour:
            self._line_colour = new_colour
            self.update()

    def paintEvent(self, a0):
        """Paint the button, then strike it through when it is deactivated."""
        super().paintEvent(a0)

        if self._activated:
            return

        painter = QPainter(self)
        painter.setRenderHint(QPainter.RenderHint.Antialiasing)

        line_color = QColor(self._line_colour)
        line_width = 4
        cap_style = Qt.PenCapStyle.RoundCap

        pen = QPen(line_color, line_width, Qt.PenStyle.SolidLine)
        pen.setCapStyle(cap_style)
        painter.setPen(pen)

        # the line goes from the bottom-left corner to the top-right one
        margin = 5
        start_point = QPoint(margin, self.height() - margin)
        end_point = QPoint(self.width() - margin, margin)
        painter.drawLine(start_point, end_point)

    def _update_background_color(self):
        """Apply the background colour matching the current state and repaint."""
        colour = self._activated_colour if self._activated else self._deactivated_colour
        self.setStyleSheet(f"background-color: {colour};")
        self.update()

    @property
    def activated(self):
        """Current state of the button."""
        return self._activated

    @activated.setter
    def activated(self, value):
        if self._activated != value:
            self._activated = value
            self._update_background_color()


class X11DesktopScreenDialog(QDialog):
    """Dialog listing windows and letting the user pick the one to share.

    The choice is delivered through a future awaited by [a_show].
    """

    def __init__(self, windows_data, parent=None):
        """
        @param windows_data: iterable of mappings, one per window. Each mapping must
            have a "title" key; the whole mapping is what [a_show] returns.
        @param parent: parent widget, if any
        """
        super().__init__(parent)
        self.__a_result = asyncio.get_running_loop().create_future()
        self.setWindowTitle("Please select a window to share:")
        self.resize(400, 300)

        self.list_widget = QListWidget(self)
        for window_data in windows_data:
            item = QListWidgetItem(window_data["title"])
            item.setData(Qt.ItemDataRole.UserRole, window_data)
            self.list_widget.addItem(item)

        self.buttonBox = QDialogButtonBox(
            QDialogButtonBox.StandardButton.Ok | QDialogButtonBox.StandardButton.Cancel
        )
        self.buttonBox.accepted.connect(self.on_accepted)
        self.buttonBox.rejected.connect(self.on_rejected)
        # ``QDialog.reject()`` (e.g. triggered by the Esc key) doesn't go through
        # ``closeEvent``: without this connection the future would never be resolved
        # and [a_show] would wait forever.
        self.rejected.connect(self._on_dialog_rejected)

        layout = QVBoxLayout(self)
        layout.addWidget(self.list_widget)
        layout.addWidget(self.buttonBox)

    def _resolve(self, value: dict | None) -> None:
        """Set the result future, unless it has already been resolved."""
        if not self.__a_result.done():
            self.__a_result.set_result(value)

    def _on_dialog_rejected(self) -> None:
        """Resolve the result to None when the dialog itself is rejected."""
        self._resolve(None)

    def get_selected_window(self) -> dict | None:
        """Return the data attached to the currently selected item, if any."""
        selectedItem = self.list_widget.currentItem()
        if selectedItem:
            return selectedItem.data(Qt.ItemDataRole.UserRole)
        return None

    def on_accepted(self):
        """Resolve the result with the current selection and close the dialog."""
        self._resolve(self.get_selected_window())
        self.close()

    def on_rejected(self):
        """Resolve the result with None and close the dialog."""
        self._resolve(None)
        self.close()

    def closeEvent(self, a0):
        """Make sure the result is resolved when the dialog gets closed."""
        super().closeEvent(a0)
        self._resolve(None)

    async def a_show(self) -> dict | None:
        """Open the dialog and wait for the user's choice.

        @return: data of the selected window, or None if nothing has been selected
            or if the dialog has been dismissed
        """
        self.open()
        return await self.__a_result


class AVCallUI(QMainWindow):
    """Main window of an audio/video call.

    It displays the remote video in the background, the local video feedback in a
    corner, and the call control buttons on top.
    """

    def __init__(self, host, icons_path: Path):
        """
        @param host: host application; its ``loop`` attribute is used to schedule
            widget updates from [on_new_sample]
        @param icons_path: directory where the SVG icons of the buttons are located
        """
        super().__init__()
        self.host = host
        # set by [run] once the call has been set up
        self.webrtc_call = None
        self.icons_path = icons_path
        self.initUI()

    @staticmethod
    async def run_qt_loop(app):
        """Dispatch posted Qt events until the ``running`` flag is cleared.

        @param app: Qt application instance
        """
        while running:
            app.sendPostedEvents()
            await asyncio.sleep(0.1)

    @classmethod
    async def run(cls, parent, call_data):
        """Run PyQt loop and show the app

        @param parent: object giving access to ``host`` and ``profile``
        @param call_data: call data, handed as is to ``WebRTCCall.make_webrtc_call``
        """
        global running
        media_dir = Path(await parent.host.bridge.config_get("", "media_dir"))
        icons_path = media_dir / "fonts/fontello/svg"
        app = QApplication([])
        av_call_gui = cls(parent.host, icons_path)
        # The flag is set before the window is shown: setting it after the call
        # setup would overwrite a ``closeEvent`` received in the meantime, and the
        # loop below would then never stop.
        running = True
        av_call_gui.show()
        webrtc_call = await webrtc.WebRTCCall.make_webrtc_call(
            parent.host.bridge,
            parent.profile,
            call_data,
            sinks_data=webrtc.SinksApp(
                local_video_cb=partial(av_call_gui.on_new_sample, video_stream="local"),
                remote_video_cb=partial(av_call_gui.on_new_sample, video_stream="remote"),
            ),
            # we want to be sure that call is ended if user presses `Ctrl + c` or anything
            # else stops the session.
            on_call_setup_cb=lambda sid, profile: parent.host.add_on_quit_callback(
                parent.host.bridge.call_end, sid, "", profile
            ),
            on_call_ended_cb=lambda sid, profile: parent.host.a_quit(),
        )
        av_call_gui.webrtc_call = webrtc_call

        await cls.run_qt_loop(app)
        await parent.host.a_quit()

    def initUI(self):
        """Build the widgets and layouts of the window."""
        self.setGeometry(100, 100, 800, 600)
        self.setWindowTitle("Call")

        # Main layouts
        self.background_widget = QWidget(self)
        self.foreground_widget = QWidget(self)
        self.setCentralWidget(self.background_widget)
        back_layout = QVBoxLayout(self.background_widget)
        front_layout = QVBoxLayout(self.foreground_widget)

        # Remote video
        self.remote_video_widget = QLabel(self)
        self.remote_video_widget.setSizePolicy(
            QSizePolicy.Policy.Ignored, QSizePolicy.Policy.Ignored
        )
        back_layout.addWidget(self.remote_video_widget)

        # Fullscreen button
        fullscreen_layout = QHBoxLayout()
        front_layout.addLayout(fullscreen_layout)
        fullscreen_layout.addStretch()
        self.fullscreen_btn = QPushButton("", self)
        self.fullscreen_btn.setFixedSize(BUTTON_SIZE)
        self.fullscreen_icon_normal = QIcon(str(self.icons_path / "resize-full.svg"))
        self.fullscreen_icon_fullscreen = QIcon(str(self.icons_path / "resize-small.svg"))
        self.fullscreen_btn.setIcon(self.fullscreen_icon_normal)
        self.fullscreen_btn.setIconSize(ICON_SIZE)
        self.fullscreen_btn.clicked.connect(self.toggle_fullscreen)
        fullscreen_layout.addWidget(self.fullscreen_btn)

        # Control buttons
        self.control_buttons_layout = QHBoxLayout()
        self.control_buttons_layout.setSpacing(40)
        self.toggle_video_btn = cast(
            ActivableButton, self.add_control_button("videocam", self.toggle_video)
        )
        self.toggle_audio_btn = cast(
            ActivableButton, self.add_control_button("volume-up", self.toggle_audio)
        )
        self.share_desktop_btn = cast(
            ActivableButton, self.add_control_button("desktop", self.share_desktop)
        )
        self.share_desktop_btn.deactivated_colour = "#47c68e"
        self.share_desktop_btn.activated_colour = "#f24468"
        self.share_desktop_btn.line_colour = "#666666"
        self.share_desktop_btn.activated = False
        self.hang_up_btn = self.add_control_button(
            "phone", self.hang_up, rotate=135, background="red", activable=False
        )

        controls_widget = QWidget(self)
        controls_widget.setSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Fixed)
        controls_widget.setLayout(self.control_buttons_layout)
        front_layout.addStretch()

        bottom_layout = QHBoxLayout()
        bottom_layout.addStretch()
        front_layout.addLayout(bottom_layout)
        bottom_layout.addWidget(controls_widget, alignment=Qt.AlignmentFlag.AlignBottom)

        # Local video feedback
        bottom_layout.addStretch()
        self.local_video_widget = QLabel(self)
        bottom_layout.addWidget(self.local_video_widget)

        # we update sizes on resize event
        self.background_widget.resizeEvent = self.adjust_sizes
        self.adjust_sizes()

    def add_control_button(
        self,
        icon_name: str,
        callback: Callable,
        rotate: float | None = None,
        background: str | None = None,
        activable: bool = True,
    ) -> QPushButton | ActivableButton:
        """Create a button and append it to the control buttons layout.

        @param icon_name: name of the SVG icon, without extension, in ``icons_path``
        @param callback: slot connected to the ``clicked`` signal
        @param rotate: rotation in degrees to apply to the icon, if any
        @param background: background colour of the button, if any
        @param activable: if True, an [ActivableButton] is created, otherwise a plain
            ``QPushButton``
        @return: the created button
        """
        if activable:
            button = ActivableButton("", self)
        else:
            button = QPushButton("", self)
        icon_path = self.icons_path / f"{icon_name}.svg"
        button.setIcon(QIcon(str(icon_path)))
        button.setIconSize(ICON_SIZE)
        button.setFixedSize(BUTTON_SIZE)
        if rotate is not None:
            pixmap = button.icon().pixmap(ICON_SIZE)
            transform = QTransform()
            transform.rotate(rotate)
            rotated_pixmap = pixmap.transformed(transform)
            button.setIcon(QIcon(rotated_pixmap))
        if background:
            button.setStyleSheet(f"background-color: {background};")
        button.clicked.connect(callback)
        self.control_buttons_layout.addWidget(button)
        return button

    def adjust_sizes(self, a0: QResizeEvent | None = None) -> None:
        """Make the foreground cover the background and resize the local video.

        This method is installed as ``resizeEvent`` of the background widget, and is
        also called once without event at the end of [initUI].

        @param a0: resize event, or None when called directly
        """
        self.foreground_widget.setGeometry(
            0, 0, self.background_widget.width(), self.background_widget.height()
        )
        # the local video feedback takes a third of the window in each dimension
        self.local_video_widget.setFixedSize(QSize(self.width() // 3, self.height() // 3))
        if a0 is not None:
            super().resizeEvent(a0)

    def on_new_sample(self, video_sink, video_stream: str) -> bool:
        """Pull a sample from the sink and schedule the update of the video widget.

        The actual update is done by [update_sample], scheduled on the host loop with
        ``call_soon_threadsafe``.

        @param video_sink: element emitting the samples
        @param video_stream: "remote" for the remote video, anything else is handled
            as the local one
        @return: always False
        """
        sample = video_sink.emit("pull-sample")
        if sample is None:
            return False

        video_pad = video_sink.get_static_pad("sink")
        assert video_pad is not None
        s = video_pad.get_current_caps().get_structure(0)
        stream_size = (s.get_value("width"), s.get_value("height"))
        self.host.loop.loop.call_soon_threadsafe(
            self.update_sample, sample, stream_size, video_stream
        )

        return False

    def update_sample(self, sample, stream_size, video_stream: str) -> None:
        """Display a video sample in the widget of the corresponding stream.

        @param sample: sample to display; nothing is done if it is None
        @param stream_size: (width, height) of the frame
        @param video_stream: "remote" for the remote video, anything else is handled
            as the local one
        """
        if sample is None:
            return

        video_widget = (
            self.remote_video_widget
            if video_stream == "remote"
            else self.local_video_widget
        )

        buf = sample.get_buffer()
        result, mapinfo = buf.map(Gst.MapFlags.READ)
        if not result:
            return
        try:
            width, height = stream_size
            qimage = QImage(mapinfo.data, width, height, QImage.Format.Format_RGB888)
            pixmap = QPixmap.fromImage(qimage).scaled(
                QSize(video_widget.width(), video_widget.height()),
                Qt.AspectRatioMode.KeepAspectRatio,
            )
            video_widget.setPixmap(pixmap)
            video_widget.setAlignment(Qt.AlignmentFlag.AlignCenter)
        finally:
            # the buffer must be unmapped even if the conversion above fails
            buf.unmap(mapinfo)

    def toggle_fullscreen(self):
        """Switch between fullscreen and normal display, and update the button icon."""
        fullscreen = not self.isFullScreen()
        if fullscreen:
            self.fullscreen_btn.setIcon(self.fullscreen_icon_fullscreen)
            self.showFullScreen()
        else:
            self.fullscreen_btn.setIcon(self.fullscreen_icon_normal)
            self.showNormal()

    def closeEvent(self, a0: QCloseEvent | None) -> None:
        """Clear the ``running`` flag, which makes [run_qt_loop] stop."""
        super().closeEvent(a0)
        global running
        running = False

    def toggle_video(self):
        """Invert the video mute state and reflect it on the button."""
        assert self.webrtc_call is not None
        self.webrtc_call.webrtc.video_muted = not self.webrtc_call.webrtc.video_muted
        self.toggle_video_btn.activated = not self.webrtc_call.webrtc.video_muted

    def toggle_audio(self):
        """Invert the audio mute state and reflect it on the button."""
        assert self.webrtc_call is not None
        self.webrtc_call.webrtc.audio_muted = not self.webrtc_call.webrtc.audio_muted
        self.toggle_audio_btn.activated = not self.webrtc_call.webrtc.audio_muted

    def share_desktop(self):
        """Toggle desktop sharing.

        With X11, a dialog is shown first to let the user select the window to share.
        """
        assert self.webrtc_call is not None
        if self.webrtc_call.webrtc.desktop_sharing:
            self.webrtc_call.webrtc.desktop_sharing = False
            self.share_desktop_btn.activated = False
        elif display_servers.detect() == display_servers.X11:
            aio.run_async(self.show_X11_screen_dialog())
        else:
            self.webrtc_call.webrtc.desktop_sharing = True
            # keep the button in sync, as done in the X11 code path
            self.share_desktop_btn.activated = True

    def hang_up(self):
        """Close the window."""
        self.close()

    @staticmethod
    def can_run():
        """Tell if the GUI can be used.

        @return: True if a display server has been detected
        """
        # if a known display server is detected, we should be able to run
        return display_servers.detect() is not None

    async def show_X11_screen_dialog(self):
        """Ask which window to share, and start sharing it if one is selected."""
        assert self.webrtc_call is not None
        windows_data = display_servers.x11_list_windows()
        dialog = X11DesktopScreenDialog(windows_data, self)
        selected = await dialog.a_show()
        if selected is not None:
            xid = selected["id"]
            self.webrtc_call.webrtc.desktop_sharing_data = {"xid": xid}
            self.webrtc_call.webrtc.desktop_sharing = True
            self.share_desktop_btn.activated = True