Mercurial > libervia-backend
comparison sat/bridge/bridge_constructor/constructors/pb/pb_frontend_template.py @ 3039:a1bc34f90fa5
bridge (pb): implemented an asyncio compatible bridge:
`pb` bridge can now be used with asyncio by instantiating AIOBridge.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 25 Sep 2019 08:53:38 +0200 |
parents | ab2696e34d29 |
children | 84bb63e1e4c4 |
comparison
equal
deleted
inserted
replaced
3038:5f3068915686 | 3039:a1bc34f90fa5 |
---|---|
15 # GNU Affero General Public License for more details. | 15 # GNU Affero General Public License for more details. |
16 | 16 |
17 # You should have received a copy of the GNU Affero General Public License | 17 # You should have received a copy of the GNU Affero General Public License |
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. | 18 # along with this program. If not, see <http://www.gnu.org/licenses/>. |
19 | 19 |
20 from sat.core.log import getLogger | 20 import asyncio |
21 from functools import partial | |
22 from twisted.spread import pb | |
23 from twisted.internet import reactor, defer | |
24 from twisted.internet.error import ConnectionRefusedError | |
25 from logging import getLogger | |
26 from sat.core import exceptions | |
27 from sat_frontends.bridge.bridge_frontend import BridgeException | |
21 | 28 |
22 log = getLogger(__name__) | 29 log = getLogger(__name__) |
23 from sat.core import exceptions | |
24 from twisted.spread import pb | |
25 from twisted.internet import reactor | |
26 | 30 |
27 | 31 |
28 class SignalsHandler(pb.Referenceable): | 32 class SignalsHandler(pb.Referenceable): |
29 def __getattr__(self, name): | 33 def __getattr__(self, name): |
30 if name.startswith("remote_"): | 34 if name.startswith("remote_"): |
36 | 40 |
37 def register_signal(self, name, handler, iface="core"): | 41 def register_signal(self, name, handler, iface="core"): |
38 log.debug("registering signal {name}".format(name=name)) | 42 log.debug("registering signal {name}".format(name=name)) |
39 method_name = "remote_" + name | 43 method_name = "remote_" + name |
40 try: | 44 try: |
41 self.__getattribute__(self, method_name) | 45 self.__getattribute__(method_name) |
42 except AttributeError: | 46 except AttributeError: |
43 pass | 47 pass |
44 else: | 48 else: |
45 raise exceptions.InternalError( | 49 raise exceptions.InternalError( |
46 "{name} signal handler has been registered twice".format( | 50 "{name} signal handler has been registered twice".format( |
49 ) | 53 ) |
50 setattr(self, method_name, handler) | 54 setattr(self, method_name, handler) |
51 | 55 |
52 | 56 |
53 class Bridge(object): | 57 class Bridge(object): |
58 | |
54 def __init__(self): | 59 def __init__(self): |
55 self.signals_handler = SignalsHandler() | 60 self.signals_handler = SignalsHandler() |
56 | 61 |
57 def __getattr__(self, name): | 62 def __getattr__(self, name): |
58 return lambda *args, **kwargs: self.call(name, args, kwargs) | 63 return partial(self.call, name) |
59 | 64 |
60 def remoteCallback(self, result, callback): | 65 def remoteCallback(self, result, callback): |
61 """call callback with argument or None | 66 """call callback with argument or None |
62 | 67 |
63 if result is not None not argument is used, | 68 if result is not None not argument is used, |
68 if result is None: | 73 if result is None: |
69 callback() | 74 callback() |
70 else: | 75 else: |
71 callback(result) | 76 callback(result) |
72 | 77 |
73 def call(self, name, args, kwargs): | 78 def call(self, name, *args, **kwargs): |
74 """call a remote method | 79 """call a remote method |
75 | 80 |
76 @param name(str): name of the bridge method | 81 @param name(str): name of the bridge method |
77 @param args(list): arguments | 82 @param args(list): arguments |
78 may contain callback and errback as last 2 items | 83 may contain callback and errback as last 2 items |
96 if callback is not None: | 101 if callback is not None: |
97 d.addCallback(self.remoteCallback, callback) | 102 d.addCallback(self.remoteCallback, callback) |
98 if errback is not None: | 103 if errback is not None: |
99 d.addErrback(errback) | 104 d.addErrback(errback) |
100 | 105 |
101 def _initBridgeEb(self, failure): | 106 def _initBridgeEb(self, failure_): |
102 log.error("Can't init bridge: {msg}".format(msg=failure)) | 107 log.error("Can't init bridge: {msg}".format(msg=failure_)) |
108 return failure_ | |
103 | 109 |
104 def _set_root(self, root): | 110 def _set_root(self, root): |
105 """set remote root object | 111 """set remote root object |
106 | 112 |
107 bridge will then be initialised | 113 bridge will then be initialised |
109 self.root = root | 115 self.root = root |
110 d = root.callRemote("initBridge", self.signals_handler) | 116 d = root.callRemote("initBridge", self.signals_handler) |
111 d.addErrback(self._initBridgeEb) | 117 d.addErrback(self._initBridgeEb) |
112 return d | 118 return d |
113 | 119 |
114 def _generic_errback(self, failure): | 120 def getRootObjectEb(self, failure_): |
115 log.error("bridge failure: {}".format(failure)) | 121 """Call errback with appropriate bridge error""" |
122 if failure_.check(ConnectionRefusedError): | |
123 raise exceptions.BridgeExceptionNoService | |
124 else: | |
125 raise failure_ | |
116 | 126 |
117 def bridgeConnect(self, callback, errback): | 127 def bridgeConnect(self, callback, errback): |
118 factory = pb.PBClientFactory() | 128 factory = pb.PBClientFactory() |
119 reactor.connectTCP("localhost", 8789, factory) | 129 reactor.connectTCP("localhost", 8789, factory) |
120 d = factory.getRootObject() | 130 d = factory.getRootObject() |
121 d.addCallback(self._set_root) | 131 d.addCallback(self._set_root) |
122 d.addCallback(lambda __: callback()) | 132 if callback is not None: |
123 d.addErrback(errback) | 133 d.addCallback(lambda __: callback()) |
134 d.addErrback(self.getRootObjectEb) | |
135 if errback is not None: | |
136 d.addErrback(lambda failure_: errback(failure_.value)) | |
137 return d | |
124 | 138 |
125 def register_signal(self, functionName, handler, iface="core"): | 139 def register_signal(self, functionName, handler, iface="core"): |
126 self.signals_handler.register_signal(functionName, handler, iface) | 140 self.signals_handler.register_signal(functionName, handler, iface) |
127 | 141 |
128 | 142 |
129 ##METHODS_PART## | 143 ##METHODS_PART## |
144 | |
145 class AIOSignalsHandler(SignalsHandler): | |
146 | |
147 def register_signal(self, name, handler, iface="core"): | |
148 async_handler = lambda *args, **kwargs: defer.Deferred.fromFuture( | |
149 asyncio.ensure_future(handler(*args, **kwargs))) | |
150 return super().register_signal(name, async_handler, iface) | |
151 | |
152 | |
153 class AIOBridge(Bridge): | |
154 | |
155 def __init__(self): | |
156 self.signals_handler = AIOSignalsHandler() | |
157 | |
158 def _errback(self, failure_): | |
159 raise BridgeException( | |
160 name=failure_.type.decode('utf-8'), | |
161 message=str(failure_.value) | |
162 ) | |
163 | |
164 def call(self, name, *args, **kwargs): | |
165 d = self.root.callRemote(name, *args, *kwargs) | |
166 d.addErrback(self._errback) | |
167 return d.asFuture(asyncio.get_event_loop()) | |
168 | |
169 async def bridgeConnect(self): | |
170 d = super().bridgeConnect(callback=None, errback=None) | |
171 return await d.asFuture(asyncio.get_event_loop()) | |
172 | |
173 ##ASYNC_METHODS_PART## |