Mercurial > libervia-backend
diff libervia/backend/bridge/bridge_constructor/constructors/pb/pb_frontend_template.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/bridge/bridge_constructor/constructors/pb/pb_frontend_template.py@524856bd7b19 |
children | 26b7ed2817da |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/backend/bridge/bridge_constructor/constructors/pb/pb_frontend_template.py Fri Jun 02 11:49:51 2023 +0200 @@ -0,0 +1,199 @@ +#!/usr/bin/env python3 + +# SàT communication bridge +# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import asyncio +from logging import getLogger +from functools import partial +from pathlib import Path +from twisted.spread import pb +from twisted.internet import reactor, defer +from twisted.internet.error import ConnectionRefusedError, ConnectError +from libervia.backend.core import exceptions +from libervia.backend.tools import config +from sat_frontends.bridge.bridge_frontend import BridgeException + +log = getLogger(__name__) + + +class SignalsHandler(pb.Referenceable): + def __getattr__(self, name): + if name.startswith("remote_"): + log.debug("calling an unregistered signal: {name}".format(name=name[7:])) + return lambda *args, **kwargs: None + + else: + raise AttributeError(name) + + def register_signal(self, name, handler, iface="core"): + log.debug("registering signal {name}".format(name=name)) + method_name = "remote_" + name + try: + self.__getattribute__(method_name) + except AttributeError: + pass + else: + raise exceptions.InternalError( + "{name} signal handler has been registered twice".format( + name=method_name + ) + ) + setattr(self, method_name, handler) + + +class bridge(object): + + def __init__(self): + self.signals_handler = SignalsHandler() + + def __getattr__(self, name): + return partial(self.call, name) + + def _generic_errback(self, err): + log.error(f"bridge error: {err}") + + def _errback(self, failure_, ori_errback): + """Convert Failure to BridgeException""" + ori_errback( + BridgeException( + name=failure_.type.decode('utf-8'), + message=str(failure_.value) + ) + ) + + def remote_callback(self, result, callback): + """call callback with argument or None + + if result is not None not argument is used, + else result is used as argument + @param result: remote call result + @param callback(callable): method to call on result + """ + if result is None: + callback() + else: + callback(result) + + def call(self, name, *args, **kwargs): + """call a remote method + + @param name(str): name of the bridge method + @param args(list): arguments + may contain callback and errback as last 2 items + @param kwargs(dict): keyword arguments + may contain callback and errback + """ + callback = errback = None + if kwargs: + try: + callback = kwargs.pop("callback") + except KeyError: + pass + try: + errback = kwargs.pop("errback") + except KeyError: + pass + elif len(args) >= 2 and callable(args[-1]) and callable(args[-2]): + errback = args.pop() + callback = args.pop() + d = self.root.callRemote(name, *args, **kwargs) + if callback is not None: + d.addCallback(self.remote_callback, callback) + if errback is not None: + d.addErrback(errback) + + def _init_bridge_eb(self, failure_): + log.error("Can't init bridge: {msg}".format(msg=failure_)) + return failure_ + + def _set_root(self, root): + """set remote root object + + bridge will then be initialised + """ + self.root = root + d = root.callRemote("initBridge", self.signals_handler) + d.addErrback(self._init_bridge_eb) + return d + + def get_root_object_eb(self, failure_): + """Call errback with appropriate bridge error""" + if failure_.check(ConnectionRefusedError, ConnectError): + raise exceptions.BridgeExceptionNoService + else: + raise failure_ + + def bridge_connect(self, callback, errback): + factory = pb.PBClientFactory() + conf = config.parse_main_conf() + get_conf = partial(config.get_conf, conf, "bridge_pb", "") + conn_type = get_conf("connection_type", "unix_socket") + if conn_type == "unix_socket": + local_dir = Path(config.config_get(conf, "", "local_dir")).resolve() + socket_path = local_dir / "bridge_pb" + reactor.connectUNIX(str(socket_path), factory) + elif conn_type == "socket": + host = get_conf("host", "localhost") + port = int(get_conf("port", 8789)) + reactor.connectTCP(host, port, factory) + else: + raise ValueError(f"Unknown pb connection type: {conn_type!r}") + d = factory.getRootObject() + d.addCallback(self._set_root) + if callback is not None: + d.addCallback(lambda __: callback()) + d.addErrback(self.get_root_object_eb) + if errback is not None: + d.addErrback(lambda failure_: errback(failure_.value)) + return d + + def register_signal(self, functionName, handler, iface="core"): + self.signals_handler.register_signal(functionName, handler, iface) + + +##METHODS_PART## + +class AIOSignalsHandler(SignalsHandler): + + def register_signal(self, name, handler, iface="core"): + async_handler = lambda *args, **kwargs: defer.Deferred.fromFuture( + asyncio.ensure_future(handler(*args, **kwargs))) + return super().register_signal(name, async_handler, iface) + + +class AIOBridge(bridge): + + def __init__(self): + self.signals_handler = AIOSignalsHandler() + + def _errback(self, failure_): + """Convert Failure to BridgeException""" + raise BridgeException( + name=failure_.type.decode('utf-8'), + message=str(failure_.value) + ) + + def call(self, name, *args, **kwargs): + d = self.root.callRemote(name, *args, *kwargs) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + async def bridge_connect(self): + d = super().bridge_connect(callback=None, errback=None) + return await d.asFuture(asyncio.get_event_loop()) + +##ASYNC_METHODS_PART##