comparison libervia/backend/tools/sat_defer.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/tools/sat_defer.py@524856bd7b19
children
comparison
equal deleted inserted replaced
4070:d10748475025 4071:4b842c1fb686
1 #!/usr/bin/env python3
2
3
4 # Libervia: an XMPP client
5 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
6
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU Affero General Public License for more details.
16
17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20 """tools related to deferred"""
21
22 from libervia.backend.core.log import getLogger
23
24 log = getLogger(__name__)
25 from libervia.backend.core import exceptions
26 from twisted.internet import defer
27 from twisted.internet import error as internet_error
28 from twisted.internet import reactor
29 from twisted.words.protocols.jabber import error as jabber_error
30 from twisted.python import failure
31 from libervia.backend.core.constants import Const as C
32 from libervia.backend.memory import memory
33
34 KEY_DEFERREDS = "deferreds"
35 KEY_NEXT = "next_defer"
36
37
38 def stanza_2_not_found(failure_):
39 """Convert item-not-found StanzaError to exceptions.NotFound"""
40 failure_.trap(jabber_error.StanzaError)
41 if failure_.value.condition == 'item-not-found':
42 raise exceptions.NotFound(failure_.value.text or failure_.value.condition)
43 return failure_
44
45
46 class DelayedDeferred(object):
47 """A Deferred-like which is launched after a delay"""
48
49 def __init__(self, delay, result):
50 """
51 @param delay(float): delay before launching the callback, in seconds
52 @param result: result used with the callback
53 """
54 self._deferred = defer.Deferred()
55 self._timer = reactor.callLater(delay, self._deferred.callback, result)
56
57 def cancel(self):
58 try:
59 self._timer.cancel()
60 except internet_error.AlreadyCalled:
61 pass
62 self._deferred.cancel()
63
64 def addCallbacks(self, *args, **kwargs):
65 self._deferred.addCallbacks(*args, **kwargs)
66
67 def addCallback(self, *args, **kwargs):
68 self._deferred.addCallback(*args, **kwargs)
69
70 def addErrback(self, *args, **kwargs):
71 self._deferred.addErrback(*args, **kwargs)
72
73 def addBoth(self, *args, **kwargs):
74 self._deferred.addBoth(*args, **kwargs)
75
76 def chainDeferred(self, *args, **kwargs):
77 self._deferred.chainDeferred(*args, **kwargs)
78
79 def pause(self):
80 self._deferred.pause()
81
82 def unpause(self):
83 self._deferred.unpause()
84
85
86 class RTDeferredSessions(memory.Sessions):
87 """Real Time Deferred Sessions"""
88
89 def __init__(self, timeout=120):
90 """Manage list of Deferreds in real-time, allowing to get intermediate results
91
92 @param timeout (int): nb of seconds before deferreds cancellation
93 """
94 super(RTDeferredSessions, self).__init__(
95 timeout=timeout, resettable_timeout=False
96 )
97
98 def new_session(self, deferreds, profile):
99 """Launch a new session with a list of deferreds
100
101 @param deferreds(list[defer.Deferred]): list of deferred to call
102 @param profile: %(doc_profile)s
103 @param return (tupe[str, defer.Deferred]): tuple with session id and a deferred wich fire *WITHOUT RESULT* when all results are received
104 """
105 data = {KEY_NEXT: defer.Deferred()}
106 session_id, session_data = super(RTDeferredSessions, self).new_session(
107 data, profile=profile
108 )
109 if isinstance(deferreds, dict):
110 session_data[KEY_DEFERREDS] = list(deferreds.values())
111 iterator = iter(deferreds.items())
112 else:
113 session_data[KEY_DEFERREDS] = deferreds
114 iterator = enumerate(deferreds)
115
116 for idx, d in iterator:
117 d._RTDeferred_index = idx
118 d._RTDeferred_return = None
119 d.addCallback(self._callback, d, session_id, profile)
120 d.addErrback(self._errback, d, session_id, profile)
121 return session_id
122
123 def _purge_session(
124 self, session_id, reason="timeout", no_warning=False, got_result=False
125 ):
126 """Purge the session
127
128 @param session_id(str): id of the session to purge
129 @param reason (unicode): human readable reason why the session is purged
130 @param no_warning(bool): if True, no warning will be put in logs
131 @param got_result(bool): True if the session is purged after normal ending (i.e.: all the results have been gotten).
132 reason and no_warning are ignored if got_result is True.
133 @raise KeyError: session doesn't exists (anymore ?)
134 """
135 if not got_result:
136 try:
137 timer, session_data, profile = self._sessions[session_id]
138 except ValueError:
139 raise exceptions.InternalError(
140 "was expecting timer, session_data and profile; is profile set ?"
141 )
142
143 # next_defer must be called before deferreds,
144 # else its callback will be called by _gotResult
145 next_defer = session_data[KEY_NEXT]
146 if not next_defer.called:
147 next_defer.errback(failure.Failure(defer.CancelledError(reason)))
148
149 deferreds = session_data[KEY_DEFERREDS]
150 for d in deferreds:
151 d.cancel()
152
153 if not no_warning:
154 log.warning(
155 "RTDeferredList cancelled: {} (profile {})".format(reason, profile)
156 )
157
158 super(RTDeferredSessions, self)._purge_session(session_id)
159
160 def _gotResult(self, session_id, profile):
161 """Method called after each callback or errback
162
163 manage the next_defer deferred
164 """
165 session_data = self.profile_get(session_id, profile)
166 defer_next = session_data[KEY_NEXT]
167 if not defer_next.called:
168 defer_next.callback(None)
169
170 def _callback(self, result, deferred, session_id, profile):
171 deferred._RTDeferred_return = (True, result)
172 self._gotResult(session_id, profile)
173
174 def _errback(self, failure, deferred, session_id, profile):
175 deferred._RTDeferred_return = (False, failure)
176 self._gotResult(session_id, profile)
177
178 def cancel(self, session_id, reason="timeout", no_log=False):
179 """Stop this RTDeferredList
180
181 Cancel all remaining deferred, and call self.final_defer.errback
182 @param reason (unicode): reason of the cancellation
183 @param no_log(bool): if True, don't log the cancellation
184 """
185 self._purge_session(session_id, reason=reason, no_warning=no_log)
186
187 def get_results(
188 self, session_id, on_success=None, on_error=None, profile=C.PROF_KEY_NONE
189 ):
190 """Get current results of a real-time deferred session
191
192 result already gotten are deleted
193 @param session_id(str): session id
194 @param on_success: can be:
195 - None: add success normaly to results
196 - callable: replace result by the return value of on_success(result) (may be deferred)
197 @param on_error: can be:
198 - None: add error normaly to results
199 - C.IGNORE: don't put errors in results
200 - callable: replace failure by the return value of on_error(failure) (may be deferred)
201 @param profile=%(doc_profile)s
202 @param result(tuple): tuple(remaining, results) where:
203 - remaining[int] is the number of remaining deferred
204 (deferreds from which we don't have result yet)
205 - results is a dict where:
206 - key is the index of the deferred if deferred is a list, or its key if it's a dict
207 - value = (success, result) where:
208 - success is True if the deferred was successful
209 - result is the result in case of success, else the failure
210 If remaining == 0, the session is ended
211 @raise KeyError: the session is already finished or doesn't exists at all
212 """
213 if profile == C.PROF_KEY_NONE:
214 raise exceptions.ProfileNotSetError
215 session_data = self.profile_get(session_id, profile)
216
217 @defer.inlineCallbacks
218 def next_cb(__):
219 # we got one or several results
220 results = {}
221 filtered_data = [] # used to keep deferreds without results
222 deferreds = session_data[KEY_DEFERREDS]
223
224 for d in deferreds:
225 if (
226 d._RTDeferred_return
227 ): # we don't use d.called as called is True before the full callbacks chain has been called
228 # we have a result
229 idx = d._RTDeferred_index
230 success, result = d._RTDeferred_return
231 if success:
232 if on_success is not None:
233 if callable(on_success):
234 result = yield on_success(result)
235 else:
236 raise exceptions.InternalError(
237 "Unknown value of on_success: {}".format(on_success)
238 )
239
240 else:
241 if on_error is not None:
242 if on_error == C.IGNORE:
243 continue
244 elif callable(on_error):
245 result = yield on_error(result)
246 else:
247 raise exceptions.InternalError(
248 "Unknown value of on_error: {}".format(on_error)
249 )
250 results[idx] = (success, result)
251 else:
252 filtered_data.append(d)
253
254 # we change the deferred with the filtered list
255 # in other terms, we don't want anymore deferred from which we have got the result
256 session_data[KEY_DEFERREDS] = filtered_data
257
258 if filtered_data:
259 # we create a new next_defer only if we are still waiting for results
260 session_data[KEY_NEXT] = defer.Deferred()
261 else:
262 # no more data to get, the result have been gotten,
263 # we can cleanly finish the session
264 self._purge_session(session_id, got_result=True)
265
266 defer.returnValue((len(filtered_data), results))
267
268 # we wait for a result
269 return session_data[KEY_NEXT].addCallback(next_cb)