comparison sat/bridge/bridge_constructor/constructors/pb/pb_frontend_template.py @ 3039:a1bc34f90fa5

bridge (pb): implemented an asyncio-compatible bridge: the `pb` bridge can now be used with asyncio by instantiating AIOBridge.
author Goffi <goffi@goffi.org>
date Wed, 25 Sep 2019 08:53:38 +0200
parents ab2696e34d29
children 84bb63e1e4c4
comparison
equal deleted inserted replaced
3038:5f3068915686 3039:a1bc34f90fa5
15 # GNU Affero General Public License for more details. 15 # GNU Affero General Public License for more details.
16 16
17 # You should have received a copy of the GNU Affero General Public License 17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. 18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 19
20 from sat.core.log import getLogger 20 import asyncio
21 from functools import partial
22 from twisted.spread import pb
23 from twisted.internet import reactor, defer
24 from twisted.internet.error import ConnectionRefusedError
25 from logging import getLogger
26 from sat.core import exceptions
27 from sat_frontends.bridge.bridge_frontend import BridgeException
21 28
22 log = getLogger(__name__) 29 log = getLogger(__name__)
23 from sat.core import exceptions
24 from twisted.spread import pb
25 from twisted.internet import reactor
26 30
27 31
28 class SignalsHandler(pb.Referenceable): 32 class SignalsHandler(pb.Referenceable):
29 def __getattr__(self, name): 33 def __getattr__(self, name):
30 if name.startswith("remote_"): 34 if name.startswith("remote_"):
36 40
37 def register_signal(self, name, handler, iface="core"): 41 def register_signal(self, name, handler, iface="core"):
38 log.debug("registering signal {name}".format(name=name)) 42 log.debug("registering signal {name}".format(name=name))
39 method_name = "remote_" + name 43 method_name = "remote_" + name
40 try: 44 try:
41 self.__getattribute__(self, method_name) 45 self.__getattribute__(method_name)
42 except AttributeError: 46 except AttributeError:
43 pass 47 pass
44 else: 48 else:
45 raise exceptions.InternalError( 49 raise exceptions.InternalError(
46 "{name} signal handler has been registered twice".format( 50 "{name} signal handler has been registered twice".format(
49 ) 53 )
50 setattr(self, method_name, handler) 54 setattr(self, method_name, handler)
51 55
52 56
53 class Bridge(object): 57 class Bridge(object):
58
54 def __init__(self): 59 def __init__(self):
55 self.signals_handler = SignalsHandler() 60 self.signals_handler = SignalsHandler()
56 61
57 def __getattr__(self, name): 62 def __getattr__(self, name):
58 return lambda *args, **kwargs: self.call(name, args, kwargs) 63 return partial(self.call, name)
59 64
60 def remoteCallback(self, result, callback): 65 def remoteCallback(self, result, callback):
61 """call callback with argument or None 66 """call callback with argument or None
62 67
63 if result is None no argument is used, 68 if result is None no argument is used,
68 if result is None: 73 if result is None:
69 callback() 74 callback()
70 else: 75 else:
71 callback(result) 76 callback(result)
72 77
73 def call(self, name, args, kwargs): 78 def call(self, name, *args, **kwargs):
74 """call a remote method 79 """call a remote method
75 80
76 @param name(str): name of the bridge method 81 @param name(str): name of the bridge method
77 @param args(list): arguments 82 @param args(list): arguments
78 may contain callback and errback as last 2 items 83 may contain callback and errback as last 2 items
96 if callback is not None: 101 if callback is not None:
97 d.addCallback(self.remoteCallback, callback) 102 d.addCallback(self.remoteCallback, callback)
98 if errback is not None: 103 if errback is not None:
99 d.addErrback(errback) 104 d.addErrback(errback)
100 105
101 def _initBridgeEb(self, failure): 106 def _initBridgeEb(self, failure_):
102 log.error("Can't init bridge: {msg}".format(msg=failure)) 107 log.error("Can't init bridge: {msg}".format(msg=failure_))
108 return failure_
103 109
104 def _set_root(self, root): 110 def _set_root(self, root):
105 """set remote root object 111 """set remote root object
106 112
107 bridge will then be initialised 113 bridge will then be initialised
109 self.root = root 115 self.root = root
110 d = root.callRemote("initBridge", self.signals_handler) 116 d = root.callRemote("initBridge", self.signals_handler)
111 d.addErrback(self._initBridgeEb) 117 d.addErrback(self._initBridgeEb)
112 return d 118 return d
113 119
114 def _generic_errback(self, failure): 120 def getRootObjectEb(self, failure_):
115 log.error("bridge failure: {}".format(failure)) 121 """Call errback with appropriate bridge error"""
122 if failure_.check(ConnectionRefusedError):
123 raise exceptions.BridgeExceptionNoService
124 else:
125 raise failure_
116 126
117 def bridgeConnect(self, callback, errback): 127 def bridgeConnect(self, callback, errback):
118 factory = pb.PBClientFactory() 128 factory = pb.PBClientFactory()
119 reactor.connectTCP("localhost", 8789, factory) 129 reactor.connectTCP("localhost", 8789, factory)
120 d = factory.getRootObject() 130 d = factory.getRootObject()
121 d.addCallback(self._set_root) 131 d.addCallback(self._set_root)
122 d.addCallback(lambda __: callback()) 132 if callback is not None:
123 d.addErrback(errback) 133 d.addCallback(lambda __: callback())
134 d.addErrback(self.getRootObjectEb)
135 if errback is not None:
136 d.addErrback(lambda failure_: errback(failure_.value))
137 return d
124 138
125 def register_signal(self, functionName, handler, iface="core"): 139 def register_signal(self, functionName, handler, iface="core"):
126 self.signals_handler.register_signal(functionName, handler, iface) 140 self.signals_handler.register_signal(functionName, handler, iface)
127 141
128 142
129 ##METHODS_PART## 143 ##METHODS_PART##
144
145 class AIOSignalsHandler(SignalsHandler):
146
147 def register_signal(self, name, handler, iface="core"):
148 async_handler = lambda *args, **kwargs: defer.Deferred.fromFuture(
149 asyncio.ensure_future(handler(*args, **kwargs)))
150 return super().register_signal(name, async_handler, iface)
151
152
153 class AIOBridge(Bridge):
154
155 def __init__(self):
156 self.signals_handler = AIOSignalsHandler()
157
158 def _errback(self, failure_):
159 raise BridgeException(
160 name=failure_.type.decode('utf-8'),
161 message=str(failure_.value)
162 )
163
164 def call(self, name, *args, **kwargs):
165 d = self.root.callRemote(name, *args, *kwargs)
166 d.addErrback(self._errback)
167 return d.asFuture(asyncio.get_event_loop())
168
169 async def bridgeConnect(self):
170 d = super().bridgeConnect(callback=None, errback=None)
171 return await d.asFuture(asyncio.get_event_loop())
172
173 ##ASYNC_METHODS_PART##