comparison sat/bridge/pb.py @ 3039:a1bc34f90fa5

bridge (pb): implemented an asyncio compatible bridge: `pb` bridge can now be used with asyncio by instantiating AIOBridge.
author Goffi <goffi@goffi.org>
date Wed, 25 Sep 2019 08:53:38 +0200
parents ab2696e34d29
children 9d0df638c8b4
comparison
equal deleted inserted replaced
3038:5f3068915686 3039:a1bc34f90fa5
16 16
17 # You should have received a copy of the GNU Affero General Public License 17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. 18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 19
20 20
21 import dataclasses
21 from sat.core.log import getLogger 22 from sat.core.log import getLogger
22
23 from twisted.spread import jelly, pb 23 from twisted.spread import jelly, pb
24 from twisted.internet import reactor 24 from twisted.internet import reactor
25
25 log = getLogger(__name__) 26 log = getLogger(__name__)
26 27
27 28
28 ## jelly hack 29 ## jelly hack
29 # we monkey patch jelly to handle namedtuple 30 # we monkey patch jelly to handle namedtuple
38 39
39 40
40 jelly._Jellier.jelly = fixed_jelly 41 jelly._Jellier.jelly = fixed_jelly
41 42
42 43
44 @dataclasses.dataclass(eq=False)
45 class HandlerWrapper:
46 # we use a wrapper to keep signals handlers because RemoteReference doesn't support
47 # comparison (other than equality), making it unusable with a list
48 handler: pb.RemoteReference
49
50
43 class PBRoot(pb.Root): 51 class PBRoot(pb.Root):
44 def __init__(self): 52 def __init__(self):
45 self.signals_handlers = [] 53 self.signals_handlers = []
46 54
47 def remote_initBridge(self, signals_handler): 55 def remote_initBridge(self, signals_handler):
48 self.signals_handlers.append(signals_handler) 56 self.signals_handlers.append(HandlerWrapper(signals_handler))
49 log.info("registered signal handler") 57 log.info("registered signal handler")
50 58
51 def sendSignalEb(self, failure, signal_name): 59 def sendSignalEb(self, failure_, signal_name):
52 log.error( 60 if not failure_.check(pb.PBConnectionLost):
53 "Error while sending signal {name}: {msg}".format( 61 log.error(
54 name=signal_name, msg=failure 62 f"Error while sending signal {signal_name}: {failure_}",
55 ) 63 )
56 )
57 64
58 def sendSignal(self, name, args, kwargs): 65 def sendSignal(self, name, args, kwargs):
59 to_remove = [] 66 to_remove = []
60 for handler in self.signals_handlers: 67 for wrapper in self.signals_handlers:
68 handler = wrapper.handler
61 try: 69 try:
62 d = handler.callRemote(name, *args, **kwargs) 70 d = handler.callRemote(name, *args, **kwargs)
63 except pb.DeadReferenceError: 71 except pb.DeadReferenceError:
64 to_remove.append(handler) 72 to_remove.append(wrapper)
65 else: 73 else:
66 d.addErrback(self.sendSignalEb, name) 74 d.addErrback(self.sendSignalEb, name)
67 if to_remove: 75 if to_remove:
68 for handler in to_remove: 76 for wrapper in to_remove:
69 log.debug("Removing signal handler for dead frontend") 77 log.debug("Removing signal handler for dead frontend")
70 self.signals_handlers.remove(handler) 78 self.signals_handlers.remove(wrapper)
71 79
72 def _bridgeDeactivateSignals(self): 80 def _bridgeDeactivateSignals(self):
73 if hasattr(self, "signals_paused"): 81 if hasattr(self, "signals_paused"):
74 log.warning("bridge signals already deactivated") 82 log.warning("bridge signals already deactivated")
75 if self.signals_handler: 83 if self.signals_handler: