Mercurial > libervia-backend
comparison libervia/backend/tools/sat_defer.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/tools/sat_defer.py@524856bd7b19 |
children | 0d7bb4df2343 |
comparison
equal
deleted
inserted
replaced
4070:d10748475025 | 4071:4b842c1fb686 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 | |
4 # Libervia: an XMPP client | |
5 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) | |
6 | |
7 # This program is free software: you can redistribute it and/or modify | |
8 # it under the terms of the GNU Affero General Public License as published by | |
9 # the Free Software Foundation, either version 3 of the License, or | |
10 # (at your option) any later version. | |
11 | |
12 # This program is distributed in the hope that it will be useful, | |
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
15 # GNU Affero General Public License for more details. | |
16 | |
17 # You should have received a copy of the GNU Affero General Public License | |
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
19 | |
20 """tools related to deferred""" | |
21 | |
22 from libervia.backend.core.log import getLogger | |
23 | |
24 log = getLogger(__name__) | |
25 from libervia.backend.core import exceptions | |
26 from twisted.internet import defer | |
27 from twisted.internet import error as internet_error | |
28 from twisted.internet import reactor | |
29 from twisted.words.protocols.jabber import error as jabber_error | |
30 from twisted.python import failure | |
31 from libervia.backend.core.constants import Const as C | |
32 from libervia.backend.memory import memory | |
33 | |
34 KEY_DEFERREDS = "deferreds" | |
35 KEY_NEXT = "next_defer" | |
36 | |
37 | |
38 def stanza_2_not_found(failure_): | |
39 """Convert item-not-found StanzaError to exceptions.NotFound""" | |
40 failure_.trap(jabber_error.StanzaError) | |
41 if failure_.value.condition == 'item-not-found': | |
42 raise exceptions.NotFound(failure_.value.text or failure_.value.condition) | |
43 return failure_ | |
44 | |
45 | |
46 class DelayedDeferred(object): | |
47 """A Deferred-like which is launched after a delay""" | |
48 | |
49 def __init__(self, delay, result): | |
50 """ | |
51 @param delay(float): delay before launching the callback, in seconds | |
52 @param result: result used with the callback | |
53 """ | |
54 self._deferred = defer.Deferred() | |
55 self._timer = reactor.callLater(delay, self._deferred.callback, result) | |
56 | |
57 def cancel(self): | |
58 try: | |
59 self._timer.cancel() | |
60 except internet_error.AlreadyCalled: | |
61 pass | |
62 self._deferred.cancel() | |
63 | |
64 def addCallbacks(self, *args, **kwargs): | |
65 self._deferred.addCallbacks(*args, **kwargs) | |
66 | |
67 def addCallback(self, *args, **kwargs): | |
68 self._deferred.addCallback(*args, **kwargs) | |
69 | |
70 def addErrback(self, *args, **kwargs): | |
71 self._deferred.addErrback(*args, **kwargs) | |
72 | |
73 def addBoth(self, *args, **kwargs): | |
74 self._deferred.addBoth(*args, **kwargs) | |
75 | |
76 def chainDeferred(self, *args, **kwargs): | |
77 self._deferred.chainDeferred(*args, **kwargs) | |
78 | |
79 def pause(self): | |
80 self._deferred.pause() | |
81 | |
82 def unpause(self): | |
83 self._deferred.unpause() | |
84 | |
85 | |
86 class RTDeferredSessions(memory.Sessions): | |
87 """Real Time Deferred Sessions""" | |
88 | |
89 def __init__(self, timeout=120): | |
90 """Manage list of Deferreds in real-time, allowing to get intermediate results | |
91 | |
92 @param timeout (int): nb of seconds before deferreds cancellation | |
93 """ | |
94 super(RTDeferredSessions, self).__init__( | |
95 timeout=timeout, resettable_timeout=False | |
96 ) | |
97 | |
98 def new_session(self, deferreds, profile): | |
99 """Launch a new session with a list of deferreds | |
100 | |
101 @param deferreds(list[defer.Deferred]): list of deferred to call | |
102 @param profile: %(doc_profile)s | |
103 @param return (tupe[str, defer.Deferred]): tuple with session id and a deferred wich fire *WITHOUT RESULT* when all results are received | |
104 """ | |
105 data = {KEY_NEXT: defer.Deferred()} | |
106 session_id, session_data = super(RTDeferredSessions, self).new_session( | |
107 data, profile=profile | |
108 ) | |
109 if isinstance(deferreds, dict): | |
110 session_data[KEY_DEFERREDS] = list(deferreds.values()) | |
111 iterator = iter(deferreds.items()) | |
112 else: | |
113 session_data[KEY_DEFERREDS] = deferreds | |
114 iterator = enumerate(deferreds) | |
115 | |
116 for idx, d in iterator: | |
117 d._RTDeferred_index = idx | |
118 d._RTDeferred_return = None | |
119 d.addCallback(self._callback, d, session_id, profile) | |
120 d.addErrback(self._errback, d, session_id, profile) | |
121 return session_id | |
122 | |
123 def _purge_session( | |
124 self, session_id, reason="timeout", no_warning=False, got_result=False | |
125 ): | |
126 """Purge the session | |
127 | |
128 @param session_id(str): id of the session to purge | |
129 @param reason (unicode): human readable reason why the session is purged | |
130 @param no_warning(bool): if True, no warning will be put in logs | |
131 @param got_result(bool): True if the session is purged after normal ending (i.e.: all the results have been gotten). | |
132 reason and no_warning are ignored if got_result is True. | |
133 @raise KeyError: session doesn't exists (anymore ?) | |
134 """ | |
135 if not got_result: | |
136 try: | |
137 timer, session_data, profile = self._sessions[session_id] | |
138 except ValueError: | |
139 raise exceptions.InternalError( | |
140 "was expecting timer, session_data and profile; is profile set ?" | |
141 ) | |
142 | |
143 # next_defer must be called before deferreds, | |
144 # else its callback will be called by _gotResult | |
145 next_defer = session_data[KEY_NEXT] | |
146 if not next_defer.called: | |
147 next_defer.errback(failure.Failure(defer.CancelledError(reason))) | |
148 | |
149 deferreds = session_data[KEY_DEFERREDS] | |
150 for d in deferreds: | |
151 d.cancel() | |
152 | |
153 if not no_warning: | |
154 log.warning( | |
155 "RTDeferredList cancelled: {} (profile {})".format(reason, profile) | |
156 ) | |
157 | |
158 super(RTDeferredSessions, self)._purge_session(session_id) | |
159 | |
160 def _gotResult(self, session_id, profile): | |
161 """Method called after each callback or errback | |
162 | |
163 manage the next_defer deferred | |
164 """ | |
165 session_data = self.profile_get(session_id, profile) | |
166 defer_next = session_data[KEY_NEXT] | |
167 if not defer_next.called: | |
168 defer_next.callback(None) | |
169 | |
170 def _callback(self, result, deferred, session_id, profile): | |
171 deferred._RTDeferred_return = (True, result) | |
172 self._gotResult(session_id, profile) | |
173 | |
174 def _errback(self, failure, deferred, session_id, profile): | |
175 deferred._RTDeferred_return = (False, failure) | |
176 self._gotResult(session_id, profile) | |
177 | |
178 def cancel(self, session_id, reason="timeout", no_log=False): | |
179 """Stop this RTDeferredList | |
180 | |
181 Cancel all remaining deferred, and call self.final_defer.errback | |
182 @param reason (unicode): reason of the cancellation | |
183 @param no_log(bool): if True, don't log the cancellation | |
184 """ | |
185 self._purge_session(session_id, reason=reason, no_warning=no_log) | |
186 | |
187 def get_results( | |
188 self, session_id, on_success=None, on_error=None, profile=C.PROF_KEY_NONE | |
189 ): | |
190 """Get current results of a real-time deferred session | |
191 | |
192 result already gotten are deleted | |
193 @param session_id(str): session id | |
194 @param on_success: can be: | |
195 - None: add success normaly to results | |
196 - callable: replace result by the return value of on_success(result) (may be deferred) | |
197 @param on_error: can be: | |
198 - None: add error normaly to results | |
199 - C.IGNORE: don't put errors in results | |
200 - callable: replace failure by the return value of on_error(failure) (may be deferred) | |
201 @param profile=%(doc_profile)s | |
202 @param result(tuple): tuple(remaining, results) where: | |
203 - remaining[int] is the number of remaining deferred | |
204 (deferreds from which we don't have result yet) | |
205 - results is a dict where: | |
206 - key is the index of the deferred if deferred is a list, or its key if it's a dict | |
207 - value = (success, result) where: | |
208 - success is True if the deferred was successful | |
209 - result is the result in case of success, else the failure | |
210 If remaining == 0, the session is ended | |
211 @raise KeyError: the session is already finished or doesn't exists at all | |
212 """ | |
213 if profile == C.PROF_KEY_NONE: | |
214 raise exceptions.ProfileNotSetError | |
215 session_data = self.profile_get(session_id, profile) | |
216 | |
217 @defer.inlineCallbacks | |
218 def next_cb(__): | |
219 # we got one or several results | |
220 results = {} | |
221 filtered_data = [] # used to keep deferreds without results | |
222 deferreds = session_data[KEY_DEFERREDS] | |
223 | |
224 for d in deferreds: | |
225 if ( | |
226 d._RTDeferred_return | |
227 ): # we don't use d.called as called is True before the full callbacks chain has been called | |
228 # we have a result | |
229 idx = d._RTDeferred_index | |
230 success, result = d._RTDeferred_return | |
231 if success: | |
232 if on_success is not None: | |
233 if callable(on_success): | |
234 result = yield on_success(result) | |
235 else: | |
236 raise exceptions.InternalError( | |
237 "Unknown value of on_success: {}".format(on_success) | |
238 ) | |
239 | |
240 else: | |
241 if on_error is not None: | |
242 if on_error == C.IGNORE: | |
243 continue | |
244 elif callable(on_error): | |
245 result = yield on_error(result) | |
246 else: | |
247 raise exceptions.InternalError( | |
248 "Unknown value of on_error: {}".format(on_error) | |
249 ) | |
250 results[idx] = (success, result) | |
251 else: | |
252 filtered_data.append(d) | |
253 | |
254 # we change the deferred with the filtered list | |
255 # in other terms, we don't want anymore deferred from which we have got the result | |
256 session_data[KEY_DEFERREDS] = filtered_data | |
257 | |
258 if filtered_data: | |
259 # we create a new next_defer only if we are still waiting for results | |
260 session_data[KEY_NEXT] = defer.Deferred() | |
261 else: | |
262 # no more data to get, the result have been gotten, | |
263 # we can cleanly finish the session | |
264 self._purge_session(session_id, got_result=True) | |
265 | |
266 defer.returnValue((len(filtered_data), results)) | |
267 | |
268 # we wait for a result | |
269 return session_data[KEY_NEXT].addCallback(next_cb) |