diff sat/bridge/bridge_constructor/constructors/pb/pb_frontend_template.py @ 3039:a1bc34f90fa5

bridge (pb): implemented an asyncio compatible bridge: `pb` bridge can now be used with asyncio by instantiating AIOBridge.
author Goffi <goffi@goffi.org>
date Wed, 25 Sep 2019 08:53:38 +0200
parents ab2696e34d29
children 84bb63e1e4c4
line wrap: on
line diff
--- a/sat/bridge/bridge_constructor/constructors/pb/pb_frontend_template.py	Wed Sep 25 08:41:36 2019 +0200
+++ b/sat/bridge/bridge_constructor/constructors/pb/pb_frontend_template.py	Wed Sep 25 08:53:38 2019 +0200
@@ -17,12 +17,16 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
-from sat.core.log import getLogger
+import asyncio
+from functools import partial
+from twisted.spread import pb
+from twisted.internet import reactor, defer
+from twisted.internet.error import ConnectionRefusedError
+from logging import getLogger
+from sat.core import exceptions
+from sat_frontends.bridge.bridge_frontend import BridgeException
 
 log = getLogger(__name__)
-from sat.core import exceptions
-from twisted.spread import pb
-from twisted.internet import reactor
 
 
 class SignalsHandler(pb.Referenceable):
@@ -38,7 +42,7 @@
         log.debug("registering signal {name}".format(name=name))
         method_name = "remote_" + name
         try:
-            self.__getattribute__(self, method_name)
+            self.__getattribute__(method_name)
         except AttributeError:
             pass
         else:
@@ -51,11 +55,12 @@
 
 
 class Bridge(object):
+
     def __init__(self):
         self.signals_handler = SignalsHandler()
 
     def __getattr__(self, name):
-        return lambda *args, **kwargs: self.call(name, args, kwargs)
+        return partial(self.call, name)
 
     def remoteCallback(self, result, callback):
         """call callback with argument or None
@@ -70,7 +75,7 @@
         else:
             callback(result)
 
-    def call(self, name, args, kwargs):
+    def call(self, name, *args, **kwargs):
         """call a remote method
 
         @param name(str): name of the bridge method
@@ -98,8 +103,9 @@
         if errback is not None:
             d.addErrback(errback)
 
-    def _initBridgeEb(self, failure):
-        log.error("Can't init bridge: {msg}".format(msg=failure))
+    def _initBridgeEb(self, failure_):
+        log.error("Can't init bridge: {msg}".format(msg=failure_))
+        return failure_
 
     def _set_root(self, root):
         """set remote root object
@@ -111,19 +117,57 @@
         d.addErrback(self._initBridgeEb)
         return d
 
-    def _generic_errback(self, failure):
-        log.error("bridge failure: {}".format(failure))
+    def getRootObjectEb(self, failure_):
+        """Call errback with appropriate bridge error"""
+        if failure_.check(ConnectionRefusedError):
+            raise exceptions.BridgeExceptionNoService
+        else:
+            raise failure_
 
     def bridgeConnect(self, callback, errback):
         factory = pb.PBClientFactory()
         reactor.connectTCP("localhost", 8789, factory)
         d = factory.getRootObject()
         d.addCallback(self._set_root)
-        d.addCallback(lambda __: callback())
-        d.addErrback(errback)
+        if callback is not None:
+            d.addCallback(lambda __: callback())
+        d.addErrback(self.getRootObjectEb)
+        if errback is not None:
+            d.addErrback(lambda failure_: errback(failure_.value))
+        return d
 
     def register_signal(self, functionName, handler, iface="core"):
         self.signals_handler.register_signal(functionName, handler, iface)
 
 
 ##METHODS_PART##
+
+class AIOSignalsHandler(SignalsHandler):
+
+    def register_signal(self, name, handler, iface="core"):
+        async_handler = lambda *args, **kwargs: defer.Deferred.fromFuture(
+            asyncio.ensure_future(handler(*args, **kwargs)))
+        return super().register_signal(name, async_handler, iface)
+
+
+class AIOBridge(Bridge):
+
+    def __init__(self):
+        self.signals_handler = AIOSignalsHandler()
+
+    def _errback(self, failure_):
+        raise BridgeException(
+            name=failure_.type.decode('utf-8'),
+            message=str(failure_.value)
+            )
+
+    def call(self, name, *args, **kwargs):
+        d = self.root.callRemote(name, *args, *kwargs)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    async def bridgeConnect(self):
+        d = super().bridgeConnect(callback=None, errback=None)
+        return await d.asFuture(asyncio.get_event_loop())
+
+##ASYNC_METHODS_PART##