view libervia/backend/bridge/bridge_constructor/constructors/pb/ @ 4240:79c8a70e1813

backend, frontend: prepare remote control: This is a series of changes necessary to prepare the implementation of the remote control feature: - XEP-0166: add a `priority` attribute to `ApplicationData`: this is needed when several applications are working in the same session, to know which one must be handled first. Will be used to make Remote Control have precedence over Call content. - XEP-0166: `_call_plugins` is now async and is not used with `DeferredList` anymore: the benefit of having methods called in parallel is very low, and it causes a lot of trouble as we can't predict order. Methods are now called sequentially so workflow can be predicted. - XEP-0167: fix `senders` XMPP attribute <=> SDP mapping - XEP-0234: preflight acceptance key is now `pre-accepted` instead of `file-accepted`, so the same key can be used with other jingle applications. - XEP-0167, XEP-0343: move some methods to XEP-0167 - XEP-0353: use new `priority` feature to call preflight methods of applications according to it. - frontend (webrtc): refactor the sources/sink handling with a more flexible mechanism based on Pydantic models. It is now possible to have as many Data Channels as necessary, to have them in addition to A/V streams, to manually specify GStreamer sources and sinks, etc. - frontend (webrtc): rework of the pipeline to reduce latency. - frontend: new `portal_desktop` method. Screenshare portal handling has been moved there, and RemoteDesktop portal has been added. - frontend (webrtc): fix `extract_ufrag_pwd` method. rel 436
author Goffi <>
date Sat, 11 May 2024 13:52:41 +0200
parents 26b7ed2817da
children 0d7bb4df2343
line wrap: on
line source

#!/usr/bin/env python3

# SàT communication bridge
# Copyright (C) 2009-2021 Jérôme Poisson (

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import asyncio
from functools import partial
from logging import getLogger
from pathlib import Path

from twisted.internet import reactor, defer
from twisted.internet.error import ConnectionRefusedError, ConnectError
from twisted.spread import pb

from libervia.backend.core import exceptions
from libervia.backend.tools import config
from libervia.frontends.bridge.bridge_frontend import BridgeException

log = getLogger(__name__)

class SignalsHandler(pb.Referenceable):
    """Referenceable object holding the handlers of backend signals

    Handlers are stored as instance attributes named ``remote_<signal name>``.
    NOTE(review): presumably the backend triggers them through Perspective
    Broker's ``remote_*`` method convention — confirm against the backend side.
    """

    def __getattr__(self, name):
        """Provide a no-op handler for signals without registered handler

        Only reached when normal attribute lookup has failed.
        @param name(str): name of the missing attribute
        @return (callable): function accepting any arguments and doing nothing
        @raise AttributeError: ``name`` is not a ``remote_*`` attribute
        """
        if name.startswith("remote_"):
            log.debug("calling an unregistered signal: {name}".format(name=name[7:]))
            return lambda *args, **kwargs: None

        # must be raised for every other name, otherwise ``hasattr`` would
        # always be True and missing attributes would silently evaluate to None
        raise AttributeError(name)

    def register_signal(self, name, handler, iface="core"):
        """Attach a handler to a signal

        @param name(str): name of the signal
        @param handler(callable): method to call when the signal is received
        @param iface(str): interface of the signal (not used here)
        @raise exceptions.InternalError: a handler is already registered for
            this signal
        """
        log.debug("registering signal {name}".format(name=name))
        method_name = "remote_" + name
        try:
            # __getattribute__ is called directly so the __getattr__ fallback
            # above (which accepts any "remote_*" name) is not involved
            self.__getattribute__(method_name)
        except AttributeError:
            pass
        else:
            raise exceptions.InternalError(
                "{name} signal handler has been registered twice".format(
                    name=method_name
                )
            )
        setattr(self, method_name, handler)

class bridge(object):
    """Frontend side of the Perspective Broker bridge, callback based

    Attributes which are not found by normal lookup are treated as names of
    remote bridge methods (see ``__getattr__`` and ``call``).
    """

    def __init__(self):
        self.signals_handler = SignalsHandler()

    def __getattr__(self, name):
        """Map an unknown attribute to a remote method call

        @param name(str): name of the bridge method
        @return (callable): ``call`` with ``name`` bound as first argument
        """
        return partial(self.call, name)

    def _generic_errback(self, err):
        """Log a bridge error (errback to use when none is specified)"""
        log.error(f"bridge error: {err}")

    def _errback(self, failure_, ori_errback):
        """Convert Failure to BridgeException

        @param failure_: failure of a remote call
            NOTE(review): ``failure_.type`` is expected to be bytes, as it is
            decoded before use — confirm it is always the case
        @param ori_errback(callable): errback to call with the resulting
            BridgeException
        """
        ori_errback(
            BridgeException(
                name=failure_.type.decode('utf-8'),
                message=str(failure_.value)
            )
        )

    def remote_callback(self, result, callback):
        """call callback with argument or None

        if result is None no argument is used,
        else result is used as argument
        @param result: remote call result
        @param callback(callable): method to call on result
        """
        if result is None:
            callback()
        else:
            callback(result)

    def call(self, name, *args, **kwargs):
        """call a remote method

        @param name(str): name of the bridge method
        @param args(list): arguments
            may contain callback and errback as last 2 items
        @param kwargs(dict): keyword arguments
            may contain callback and errback
        """
        callback = errback = None
        if kwargs:
            callback = kwargs.pop("callback", None)
            errback = kwargs.pop("errback", None)
        elif len(args) >= 2 and callable(args[-1]) and callable(args[-2]):
            # args is a tuple and can't be popped: the 2 trailing callables
            # are split off with unpacking (callback first, errback last)
            *args, callback, errback = args
        d = self.root.callRemote(name, *args, **kwargs)
        if callback is not None:
            d.addCallback(self.remote_callback, callback)
        if errback is not None:
            d.addErrback(errback)

    def _init_bridge_eb(self, failure_):
        """Log a failure of bridge initialisation, and propagate it"""
        log.error("Can't init bridge: {msg}".format(msg=failure_))
        return failure_

    def _set_root(self, root):
        """set remote root object

        bridge will then be initialised
        @param root: remote root object, used for all subsequent remote calls
        @return: deferred of the remote "initBridge" call
        """
        self.root = root
        d = root.callRemote("initBridge", self.signals_handler)
        d.addErrback(self._init_bridge_eb)
        return d

    def get_root_object_eb(self, failure_):
        """Call errback with appropriate bridge error

        @raise exceptions.BridgeExceptionNoService: connection to the backend
            could not be established
        """
        if failure_.check(ConnectionRefusedError, ConnectError):
            raise exceptions.BridgeExceptionNoService
        else:
            raise failure_

    def bridge_connect(self, callback, errback):
        """Connect to the backend and initialise the bridge

        Connection type is read from "connection_type" option of "bridge_pb"
        configuration section ("unix_socket" by default).
        @param callback(callable, None): called without argument once the
            bridge is initialised
        @param errback(callable, None): called with the exception if connection
            or initialisation fails
        @return: deferred of the whole connection workflow
        @raise ValueError: configured connection type is unknown
        """
        factory = pb.PBClientFactory()
        conf = config.parse_main_conf()
        get_conf = partial(config.get_conf, conf, "bridge_pb", "")
        conn_type = get_conf("connection_type", "unix_socket")
        if conn_type == "unix_socket":
            local_dir = Path(config.config_get(conf, "", "local_dir")).resolve()
            socket_path = local_dir / "bridge_pb"
            reactor.connectUNIX(str(socket_path), factory)
        elif conn_type == "socket":
            host = get_conf("host", "localhost")
            port = int(get_conf("port", 8789))
            reactor.connectTCP(host, port, factory)
        else:
            raise ValueError(f"Unknown pb connection type: {conn_type!r}")
        d = factory.getRootObject()
        d.addCallback(self._set_root)
        if callback is not None:
            d.addCallback(lambda __: callback())
        # connection errors are converted before reaching the user errback
        d.addErrback(self.get_root_object_eb)
        if errback is not None:
            d.addErrback(lambda failure_: errback(failure_.value))
        return d

    def register_signal(self, functionName, handler, iface="core"):
        """Register a signal handler, see ``signals_handler.register_signal``"""
        self.signals_handler.register_signal(functionName, handler, iface)


class AIOSignalsHandler(SignalsHandler):
    """Signals handler for handlers returning awaitables (e.g. coroutines)"""

    def register_signal(self, name, handler, iface="core"):
        """Register a handler whose result is scheduled with asyncio

        @param name(str): name of the signal
        @param handler(callable): called with the signal arguments, its result
            is given to ``asyncio.ensure_future``
        @param iface(str): interface of the signal
        @return: result of the parent ``register_signal``
        """

        def async_handler(*args, **kwargs):
            # schedule the handler, then expose the future as a Deferred
            future = asyncio.ensure_future(handler(*args, **kwargs))
            return defer.Deferred.fromFuture(future)

        return super().register_signal(name, async_handler, iface)

class AIOBridge(bridge):
    """asyncio flavour of the bridge: remote calls return futures"""

    def __init__(self):
        self.signals_handler = AIOSignalsHandler()

    def _errback(self, failure_):
        """Convert Failure to BridgeException

        @param failure_: failure of a remote call
            NOTE(review): ``failure_.type`` is expected to be bytes, as it is
            decoded before use — confirm it is always the case
        @raise BridgeException: always, built from the failure type and value
        """
        raise BridgeException(
            name=failure_.type.decode('utf-8'),
            message=str(failure_.value)
        )

    def call(self, name, *args, **kwargs):
        """call a remote method

        @param name(str): name of the bridge method
        @param args(list): positional arguments of the remote method
        @param kwargs(dict): keyword arguments of the remote method
        @return: future of the remote call, failures are converted to
            BridgeException
        """
        # kwargs must be expanded with "**": a single "*" would send the
        # keyword names as extra positional arguments and lose the values
        d = self.root.callRemote(name, *args, **kwargs)
        d.addErrback(self._errback)
        return d.asFuture(asyncio.get_event_loop())

    async def bridge_connect(self):
        """Connect to the backend and wait for the bridge to be initialised

        @return: result of the parent ``bridge_connect`` deferred
        """
        d = super().bridge_connect(callback=None, errback=None)
        return await d.asFuture(asyncio.get_event_loop())