comparison libervia/backend/bridge/pb.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/bridge/pb.py@524856bd7b19
children 02f0adc745c6
comparison
equal deleted inserted replaced
4070:d10748475025 4071:4b842c1fb686
1 #!/usr/bin/env python3
2
3
4 # SAT: a jabber client
5 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
6
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU Affero General Public License for more details.
16
17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20
21 import dataclasses
22 from functools import partial
23 from pathlib import Path
24 from twisted.spread import jelly, pb
25 from twisted.internet import reactor
26 from libervia.backend.core.log import getLogger
27 from libervia.backend.tools import config
28
log = getLogger(__name__)


## jelly hack
# We monkey patch jelly so that namedtuple instances (and any other tuple
# subclass) are handled: they are converted to plain tuples before being
# serialised by the original implementation.
ori_jelly = jelly._Jellier.jelly


def fixed_jelly(self, obj):
    """Jelly ``obj``, converting tuple subclasses to plain tuples first.

    This fixes the handling of namedtuple by the original ``_Jellier.jelly``.

    @param obj: object to serialise
    @return: whatever the original ``_Jellier.jelly`` returns for the object
    """
    # ``type(obj) is not tuple`` selects subclasses only (e.g. namedtuple);
    # exact tuples are passed through untouched.
    if isinstance(obj, tuple) and type(obj) is not tuple:
        obj = tuple(obj)
    return ori_jelly(self, obj)


jelly._Jellier.jelly = fixed_jelly
45
46
@dataclasses.dataclass(eq=False)
class HandlerWrapper:
    # Signals handlers are stored through this wrapper: RemoteReference doesn't
    # support comparison (other than equality), which makes it unusable directly
    # in a list. With ``eq=False`` no ``__eq__`` is generated, so wrappers are
    # compared by identity.

    # remote reference to the frontend's signals handler
    handler: pb.RemoteReference
52
53
class PBRoot(pb.Root):
    """Perspective Broker root object relaying signals to registered handlers.

    Signals handlers are registered with ``remote_init_bridge`` and kept in
    ``signals_handlers``. Signals can be paused: the handlers are then moved
    to ``signals_paused``, an attribute which only exists while signals are
    deactivated.
    """

    def __init__(self):
        # list of HandlerWrapper, one per registered signals handler
        self.signals_handlers = []

    def remote_init_bridge(self, signals_handler):
        """Register a signals handler.

        @param signals_handler: remote reference which will receive the signals
        """
        self.signals_handlers.append(HandlerWrapper(signals_handler))
        log.info("registered signal handler")

    def send_signal_eb(self, failure_, signal_name):
        """Errback used when sending a signal to a handler has failed.

        A lost connection is silently ignored, any other failure is logged.

        @param failure_: failure raised by the remote call
        @param signal_name: name of the signal which was being sent
        """
        if not failure_.check(pb.PBConnectionLost):
            log.error(
                f"Error while sending signal {signal_name}: {failure_}",
            )

    def send_signal(self, name, args, kwargs):
        """Send a signal to every registered handler.

        Handlers whose reference is dead are removed from the registered
        handlers.

        @param name: name of the signal (used as remote method name)
        @param args: positional arguments of the signal
        @param kwargs: keyword arguments of the signal
        """
        to_remove = []
        for wrapper in self.signals_handlers:
            try:
                d = wrapper.handler.callRemote(name, *args, **kwargs)
            except pb.DeadReferenceError:
                # removal is postponed to avoid modifying the list while
                # iterating over it
                to_remove.append(wrapper)
            else:
                d.addErrback(self.send_signal_eb, name)
        for wrapper in to_remove:
            log.debug("Removing signal handler for dead frontend")
            self.signals_handlers.remove(wrapper)

    def _bridge_deactivate_signals(self):
        """Stop sending signals: current handlers are moved to ``signals_paused``.

        If signals are already deactivated, handlers registered since the
        previous deactivation are added to the paused ones.
        """
        if hasattr(self, "signals_paused"):
            log.warning("bridge signals already deactivated")
            if self.signals_handlers:
                self.signals_paused.extend(self.signals_handlers)
        else:
            self.signals_paused = self.signals_handlers
        self.signals_handlers = []
        log.debug("bridge signals have been deactivated")

    def _bridge_reactivate_signals(self):
        """Restore the handlers which were paused by deactivation.

        Does nothing (except logging) if signals were not deactivated.
        """
        # NOTE(review): handlers registered while signals were paused are
        # replaced by the paused ones here; confirm that this is intended.
        try:
            self.signals_handlers = self.signals_paused
        except AttributeError:
            log.debug("signals were already activated")
        else:
            del self.signals_paused
            log.debug("bridge signals have been reactivated")

    ##METHODS_PART##
103
104
class bridge(object):
    """Perspective Broker bridge.

    On instantiation, a ``PBRoot`` is created and served either on a UNIX
    socket or on a TCP socket, according to the ``connection_type`` option
    read from the main configuration. Bridge methods are registered on the
    root object, and signals are forwarded to it.
    """

    def __init__(self):
        """Create the root object and start listening for connections.

        @raise ValueError: the configured connection type is unknown
        """
        log.info("Init Perspective Broker...")
        self.root = PBRoot()
        conf = config.parse_main_conf()
        get_conf = partial(config.get_conf, conf, "bridge_pb", "")
        conn_type = get_conf("connection_type", "unix_socket")
        if conn_type == "unix_socket":
            local_dir = Path(config.config_get(conf, "", "local_dir")).resolve()
            socket_path = local_dir / "bridge_pb"
            log.info(f"using UNIX Socket at {socket_path}")
            reactor.listenUNIX(
                str(socket_path), pb.PBServerFactory(self.root), mode=0o600
            )
        elif conn_type == "socket":
            port = int(get_conf("port", 8789))
            log.info(f"using TCP Socket at port {port}")
            reactor.listenTCP(port, pb.PBServerFactory(self.root))
        else:
            raise ValueError(f"Unknown pb connection type: {conn_type!r}")

    def send_signal(self, name, *args, **kwargs):
        """Forward a signal to the root object.

        @param name: name of the signal
        @param args: positional arguments of the signal
        @param kwargs: keyword arguments of the signal
        """
        self.root.send_signal(name, args, kwargs)

    def remote_init_bridge(self, signals_handler):
        """Register a signals handler on the root object.

        The handlers are held by ``self.root``, so the registration is
        delegated to it.

        @param signals_handler: remote reference which will receive the signals
        """
        self.root.remote_init_bridge(signals_handler)

    def register_method(self, name, callback):
        """Set ``callback`` as the ``remote_<name>`` attribute of the root object.

        @param name: name of the bridge method
        @param callback: callable implementing the method
        """
        log.debug(f"registering PB bridge method [{name}]")
        setattr(self.root, "remote_" + name, callback)

    def add_method(
        self, name, int_suffix, in_sign, out_sign, method, async_=False, doc=None
    ):
        """Dynamically add a method to PB bridge

        Only ``name`` and ``method`` are used here, the other parameters are
        ignored.
        """
        # FIXME: doc parameter is kept only temporary, the time to remove it from calls
        log.debug(f"Adding method {name} to PB bridge")
        self.register_method(name, method)

    def add_signal(self, name, int_suffix, signature, doc=None):
        """Dynamically add a signal to PB bridge

        A callable named after the signal is set on this instance; calling it
        sends the signal with the given arguments. Only ``name`` is used here,
        the other parameters are ignored.
        """
        log.debug(f"Adding signal {name} to PB bridge")
        setattr(
            self, name, lambda *args, **kwargs: self.send_signal(name, *args, **kwargs)
        )

    def bridge_deactivate_signals(self):
        """Stop sending signals to bridge

        Mainly used for mobile frontends, when the frontend is paused
        """
        self.root._bridge_deactivate_signals()

    def bridge_reactivate_signals(self):
        """Send again signals to bridge

        Should only be used after bridge_deactivate_signals has been called
        """
        self.root._bridge_reactivate_signals()

    # Signals: each method below sends the signal of the same name, with the
    # arguments it has received.

    def _debug(self, action, params, profile):
        """Send the ``_debug`` signal."""
        self.send_signal("_debug", action, params, profile)

    def action_new(self, action_data, id, security_limit, profile):
        """Send the ``action_new`` signal."""
        self.send_signal("action_new", action_data, id, security_limit, profile)

    def connected(self, jid_s, profile):
        """Send the ``connected`` signal."""
        self.send_signal("connected", jid_s, profile)

    def contact_deleted(self, entity_jid, profile):
        """Send the ``contact_deleted`` signal."""
        self.send_signal("contact_deleted", entity_jid, profile)

    def contact_new(self, contact_jid, attributes, groups, profile):
        """Send the ``contact_new`` signal."""
        self.send_signal("contact_new", contact_jid, attributes, groups, profile)

    def disconnected(self, profile):
        """Send the ``disconnected`` signal."""
        self.send_signal("disconnected", profile)

    def entity_data_updated(self, jid, name, value, profile):
        """Send the ``entity_data_updated`` signal."""
        self.send_signal("entity_data_updated", jid, name, value, profile)

    def message_encryption_started(self, to_jid, encryption_data, profile_key):
        """Send the ``message_encryption_started`` signal."""
        self.send_signal(
            "message_encryption_started", to_jid, encryption_data, profile_key
        )

    def message_encryption_stopped(self, to_jid, encryption_data, profile_key):
        """Send the ``message_encryption_stopped`` signal."""
        self.send_signal(
            "message_encryption_stopped", to_jid, encryption_data, profile_key
        )

    def message_new(
        self,
        uid,
        timestamp,
        from_jid,
        to_jid,
        message,
        subject,
        mess_type,
        extra,
        profile,
    ):
        """Send the ``message_new`` signal."""
        self.send_signal(
            "message_new",
            uid,
            timestamp,
            from_jid,
            to_jid,
            message,
            subject,
            mess_type,
            extra,
            profile,
        )

    def param_update(self, name, value, category, profile):
        """Send the ``param_update`` signal."""
        self.send_signal("param_update", name, value, category, profile)

    def presence_update(self, entity_jid, show, priority, statuses, profile):
        """Send the ``presence_update`` signal."""
        self.send_signal(
            "presence_update", entity_jid, show, priority, statuses, profile
        )

    def progress_error(self, id, error, profile):
        """Send the ``progress_error`` signal."""
        self.send_signal("progress_error", id, error, profile)

    def progress_finished(self, id, metadata, profile):
        """Send the ``progress_finished`` signal."""
        self.send_signal("progress_finished", id, metadata, profile)

    def progress_started(self, id, metadata, profile):
        """Send the ``progress_started`` signal."""
        self.send_signal("progress_started", id, metadata, profile)

    def subscribe(self, sub_type, entity_jid, profile):
        """Send the ``subscribe`` signal."""
        self.send_signal("subscribe", sub_type, entity_jid, profile)