Mercurial > libervia-backend
diff sat/bridge/bridge_constructor/constructors/pb/pb_frontend_template.py @ 3039:a1bc34f90fa5
bridge (pb): implemented an asyncio compatible bridge:
`pb` bridge can now be used with asyncio by instantiating AIOBridge.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 25 Sep 2019 08:53:38 +0200 |
parents | ab2696e34d29 |
children | 84bb63e1e4c4 |
line wrap: on
line diff
--- a/sat/bridge/bridge_constructor/constructors/pb/pb_frontend_template.py Wed Sep 25 08:41:36 2019 +0200 +++ b/sat/bridge/bridge_constructor/constructors/pb/pb_frontend_template.py Wed Sep 25 08:53:38 2019 +0200 @@ -17,12 +17,16 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -from sat.core.log import getLogger +import asyncio +from functools import partial +from twisted.spread import pb +from twisted.internet import reactor, defer +from twisted.internet.error import ConnectionRefusedError +from logging import getLogger +from sat.core import exceptions +from sat_frontends.bridge.bridge_frontend import BridgeException log = getLogger(__name__) -from sat.core import exceptions -from twisted.spread import pb -from twisted.internet import reactor class SignalsHandler(pb.Referenceable): @@ -38,7 +42,7 @@ log.debug("registering signal {name}".format(name=name)) method_name = "remote_" + name try: - self.__getattribute__(self, method_name) + self.__getattribute__(method_name) except AttributeError: pass else: @@ -51,11 +55,12 @@ class Bridge(object): + def __init__(self): self.signals_handler = SignalsHandler() def __getattr__(self, name): - return lambda *args, **kwargs: self.call(name, args, kwargs) + return partial(self.call, name) def remoteCallback(self, result, callback): """call callback with argument or None @@ -70,7 +75,7 @@ else: callback(result) - def call(self, name, args, kwargs): + def call(self, name, *args, **kwargs): """call a remote method @param name(str): name of the bridge method @@ -98,8 +103,9 @@ if errback is not None: d.addErrback(errback) - def _initBridgeEb(self, failure): - log.error("Can't init bridge: {msg}".format(msg=failure)) + def _initBridgeEb(self, failure_): + log.error("Can't init bridge: {msg}".format(msg=failure_)) + return failure_ def _set_root(self, root): """set remote root object @@ -111,19 +117,57 @@ d.addErrback(self._initBridgeEb) return d - def _generic_errback(self, failure): - log.error("bridge failure: {}".format(failure)) + def getRootObjectEb(self, failure_): + """Call errback with appropriate bridge error""" + if failure_.check(ConnectionRefusedError): + raise exceptions.BridgeExceptionNoService + else: + raise failure_ def bridgeConnect(self, callback, errback): factory = pb.PBClientFactory() reactor.connectTCP("localhost", 8789, factory) d = factory.getRootObject() d.addCallback(self._set_root) - d.addCallback(lambda __: callback()) - d.addErrback(errback) + if callback is not None: + d.addCallback(lambda __: callback()) + d.addErrback(self.getRootObjectEb) + if errback is not None: + d.addErrback(lambda failure_: errback(failure_.value)) + return d def register_signal(self, functionName, handler, iface="core"): self.signals_handler.register_signal(functionName, handler, iface) ##METHODS_PART## + +class AIOSignalsHandler(SignalsHandler): + + def register_signal(self, name, handler, iface="core"): + async_handler = lambda *args, **kwargs: defer.Deferred.fromFuture( + asyncio.ensure_future(handler(*args, **kwargs))) + return super().register_signal(name, async_handler, iface) + + +class AIOBridge(Bridge): + + def __init__(self): + self.signals_handler = AIOSignalsHandler() + + def _errback(self, failure_): + raise BridgeException( + name=failure_.type.decode('utf-8'), + message=str(failure_.value) + ) + + def call(self, name, *args, **kwargs): + d = self.root.callRemote(name, *args, *kwargs) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + async def bridgeConnect(self): + d = super().bridgeConnect(callback=None, errback=None) + return await d.asFuture(asyncio.get_event_loop()) + +##ASYNC_METHODS_PART##