view libervia/frontends/tools/webrtc_screenshare.py @ 4233:d01b8d002619

cli (call, file), frontends: implement webRTC data channel transfer: - file send/receive commands now supports webRTC transfer. In `send` command, the `--webrtc` flags is currenty used to activate it. - WebRTC related code have been factorized and moved to `libervia.frontends.tools.webrtc*` modules. rel 442
author Goffi <goffi@goffi.org>
date Sat, 06 Apr 2024 13:43:09 +0200
parents
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia WebRTC implementation
# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from libervia.backend.core import exceptions

import asyncio
import logging
from random import randint


log = logging.getLogger(__name__)


SOURCES_AUTO = "auto"
SOURCES_TEST = "test"
SOURCES_DATACHANNEL = "datachannel"
SINKS_APP = "app"
SINKS_AUTO = "auto"
SINKS_TEST = "test"
SINKS_DATACHANNEL = "datachannel"


class ScreenshareError(Exception):
    pass


class DesktopPortal:

    def __init__(self, webrtc):
        import dbus
        from dbus.mainloop.glib import DBusGMainLoop
        # we want monitors + windows, see https://flatpak.github.io/xdg-desktop-portal/docs/doc-org.freedesktop.portal.ScreenCast.html#org-freedesktop-portal-screencast-availablesourcetypes
        self.dbus = dbus
        self.webrtc = webrtc
        self.sources_type = dbus.UInt32(7)
        DBusGMainLoop(set_as_default=True)
        self.session_bus = dbus.SessionBus()
        portal_object = self.session_bus.get_object(
            'org.freedesktop.portal.Desktop',
            '/org/freedesktop/portal/desktop'
        )
        self.screencast_interface = dbus.Interface(
            portal_object,
            'org.freedesktop.portal.ScreenCast'
        )
        self.session_interface = None
        self.session_signal = None
        self.handle_counter = 0
        self.session_handle = None
        self.stream_data: dict|None = None

    @property
    def handle_token(self):
        self.handle_counter += 1
        return f"libervia{self.handle_counter}"

    def on_session_closed(self, details: dict) -> None:
        if self.session_interface is not None:
            self.session_interface = None
            self.webrtc.desktop_sharing = False
            if self.session_signal is not None:
                self.session_signal.remove()
                self.session_signal = None


    async def dbus_call(self, method_name: str, *args) -> dict:
        """Call a screenshare portal method

        This method handle the signal response.
        @param method_name: method to call
        @param args: extra args
            `handle_token` will be automatically added to the last arg (option dict)
        @return: method result
        """
        if self.session_handle is not None:
            self.end_screenshare()
        method = getattr(self.screencast_interface, method_name)
        options = args[-1]
        reply_fut = asyncio.Future()
        signal_fut = asyncio.Future()
        # cf. https://flatpak.github.io/xdg-desktop-portal/docs/doc-org.freedesktop.portal.Request.html
        handle_token = self.handle_token
        sender = self.session_bus.get_unique_name().replace(".", "_")[1:]
        path = f"/org/freedesktop/portal/desktop/request/{sender}/{handle_token}"
        signal_match = None

        def on_signal(response, results):
            assert signal_match is not None
            signal_match.remove()
            if response == 0:
                signal_fut.set_result(results)
            elif response == 1:
                signal_fut.set_exception(
                    exceptions.CancelError("Cancelled by user.")
                )
            else:
                signal_fut.set_exception(ScreenshareError(
                    f"Can't get signal result"
                ))

        signal_match = self.session_bus.add_signal_receiver(
            on_signal,
            signal_name="Response",
            dbus_interface="org.freedesktop.portal.Request",
            path=path
        )

        options["handle_token"] = handle_token

        method(
            *args,
            reply_handler=reply_fut.set_result,
            error_handler=reply_fut.set_exception
        )
        try:
            await reply_fut
        except Exception as e:
            raise ScreenshareError(f"Can't ask screenshare permission: {e}")
        return await signal_fut

    async def request_screenshare(self) -> dict:
        session_data = await self.dbus_call(
            "CreateSession",
            {
                "session_handle_token": str(randint(1, 2**32)),
            }
        )
        try:
            session_handle = session_data["session_handle"]
        except KeyError:
            raise ScreenshareError("Can't get session handle")
        self.session_handle = session_handle


        await self.dbus_call(
            "SelectSources",
            session_handle,
            {
                "multiple": True,
                "types": self.sources_type,
                "modal": True
            }
        )
        screenshare_data = await self.dbus_call(
            "Start",
            session_handle,
            "",
            {}
        )

        session_object = self.session_bus.get_object(
            'org.freedesktop.portal.Desktop',
            session_handle
        )
        self.session_interface = self.dbus.Interface(
            session_object,
            'org.freedesktop.portal.Session'
        )

        self.session_signal = self.session_bus.add_signal_receiver(
            self.on_session_closed,
            signal_name="Closed",
            dbus_interface="org.freedesktop.portal.Session",
            path=session_handle
        )

        try:
            node_id, stream_data = screenshare_data["streams"][0]
            source_type = int(stream_data["source_type"])
        except (IndexError, KeyError):
            raise ScreenshareError("Can't parse stream data")
        self.stream_data = stream_data = {
            "session_handle": session_handle,
            "node_id": node_id,
            "source_type": source_type
        }
        try:
            height = int(stream_data["size"][0])
            weight = int(stream_data["size"][1])
        except (IndexError, KeyError):
            pass
        else:
            stream_data["size"] = (height, weight)

        return self.stream_data

    def end_screenshare(self) -> None:
        """Close a running screenshare session, if any."""
        if self.session_interface is None:
            return
        self.session_interface.Close()
        self.on_session_closed({})