Mercurial > libervia-backend
comparison sat/tools/sat_defer.py @ 2562:26edcf3a30eb
core, setup: huge cleaning:
- moved directories from src and frontends/src to sat and sat_frontends, which is the recommanded naming convention
- move twisted directory to root
- removed all hacks from setup.py, and added missing dependencies, it is now clean
- use https URL for website in setup.py
- removed "Environment :: X11 Applications :: GTK", as wix is deprecated and removed
- renamed sat.sh to sat and fixed its installation
- added python_requires to specify Python version needed
- replaced glib2reactor which use deprecated code by gtk3reactor
sat can now be installed directly from virtualenv without using --system-site-packages anymore \o/
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 02 Apr 2018 19:44:50 +0200 |
parents | src/tools/sat_defer.py@0046283a285d |
children | 56f94936df1e |
comparison
equal
deleted
inserted
replaced
2561:bd30dc3ffe5a | 2562:26edcf3a30eb |
---|---|
1 #!/usr/bin/env python2 | |
2 # -*- coding: utf-8 -*- | |
3 | |
4 # SàT: a XMPP client | |
5 # Copyright (C) 2009-2018 Jérôme Poisson (goffi@goffi.org) | |
6 | |
7 # This program is free software: you can redistribute it and/or modify | |
8 # it under the terms of the GNU Affero General Public License as published by | |
9 # the Free Software Foundation, either version 3 of the License, or | |
10 # (at your option) any later version. | |
11 | |
12 # This program is distributed in the hope that it will be useful, | |
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
15 # GNU Affero General Public License for more details. | |
16 | |
17 # You should have received a copy of the GNU Affero General Public License | |
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
19 | |
20 """tools related to deferred""" | |
21 | |
22 from sat.core.log import getLogger | |
23 log = getLogger(__name__) | |
24 from sat.core import exceptions | |
25 from twisted.internet import defer | |
26 from twisted.internet import error as internet_error | |
27 from twisted.internet import reactor | |
28 from twisted.python import failure | |
29 from sat.core.constants import Const as C | |
30 from sat.memory import memory | |
31 | |
32 KEY_DEFERREDS = 'deferreds' | |
33 KEY_NEXT = 'next_defer' | |
34 | |
35 | |
36 class DelayedDeferred(object): | |
37 """A Deferred-like which is launched after a delay""" | |
38 | |
39 def __init__(self, delay, result): | |
40 """ | |
41 @param delay(float): delay before launching the callback, in seconds | |
42 @param result: result used with the callback | |
43 """ | |
44 self._deferred = defer.Deferred() | |
45 self._timer = reactor.callLater(delay, self._deferred.callback, result) | |
46 | |
47 def cancel(self): | |
48 try: | |
49 self._timer.cancel() | |
50 except internet_error.AlreadyCalled: | |
51 pass | |
52 self._deferred.cancel() | |
53 | |
54 def addCallbacks(self, *args, **kwargs): | |
55 self._deferred.addCallbacks(*args,**kwargs) | |
56 | |
57 def addCallback(self, *args, **kwargs): | |
58 self._deferred.addCallback(*args,**kwargs) | |
59 | |
60 def addErrback(self, *args, **kwargs): | |
61 self._deferred.addErrback(*args,**kwargs) | |
62 | |
63 def addBoth(self, *args, **kwargs): | |
64 self._deferred.addBoth(*args,**kwargs) | |
65 | |
66 def chainDeferred(self, *args, **kwargs): | |
67 self._deferred.chainDeferred(*args,**kwargs) | |
68 | |
69 def pause(self): | |
70 self._deferred.pause() | |
71 | |
72 def unpause(self): | |
73 self._deferred.unpause() | |
74 | |
75 | |
76 class RTDeferredSessions(memory.Sessions): | |
77 """Real Time Deferred Sessions""" | |
78 | |
79 | |
80 def __init__(self, timeout=120): | |
81 """Manage list of Deferreds in real-time, allowing to get intermediate results | |
82 | |
83 @param timeout (int): nb of seconds before deferreds cancellation | |
84 """ | |
85 super(RTDeferredSessions, self).__init__(timeout=timeout, resettable_timeout=False) | |
86 | |
87 def newSession(self, deferreds, profile): | |
88 """Launch a new session with a list of deferreds | |
89 | |
90 @param deferreds(list[defer.Deferred]): list of deferred to call | |
91 @param profile: %(doc_profile)s | |
92 @param return (tupe[str, defer.Deferred]): tuple with session id and a deferred wich fire *WITHOUT RESULT* when all results are received | |
93 """ | |
94 data = {KEY_NEXT: defer.Deferred()} | |
95 session_id, session_data = super(RTDeferredSessions, self).newSession(data, profile=profile) | |
96 if isinstance(deferreds, dict): | |
97 session_data[KEY_DEFERREDS] = deferreds.values() | |
98 iterator = deferreds.iteritems() | |
99 else: | |
100 session_data[KEY_DEFERREDS] = deferreds | |
101 iterator = enumerate(deferreds) | |
102 | |
103 for idx, d in iterator: | |
104 d._RTDeferred_index = idx | |
105 d._RTDeferred_return = None | |
106 d.addCallback(self._callback, d, session_id, profile) | |
107 d.addErrback(self._errback, d, session_id, profile) | |
108 return session_id | |
109 | |
110 def _purgeSession(self, session_id, reason=u"timeout", no_warning=False, got_result=False): | |
111 """Purge the session | |
112 | |
113 @param session_id(str): id of the session to purge | |
114 @param reason (unicode): human readable reason why the session is purged | |
115 @param no_warning(bool): if True, no warning will be put in logs | |
116 @param got_result(bool): True if the session is purged after normal ending (i.e.: all the results have been gotten). | |
117 reason and no_warning are ignored if got_result is True. | |
118 @raise KeyError: session doesn't exists (anymore ?) | |
119 """ | |
120 if not got_result: | |
121 try: | |
122 timer, session_data, profile = self._sessions[session_id] | |
123 except ValueError: | |
124 raise exceptions.InternalError(u'was expecting timer, session_data and profile; is profile set ?') | |
125 | |
126 # next_defer must be called before deferreds, | |
127 # else its callback will be called by _gotResult | |
128 next_defer = session_data[KEY_NEXT] | |
129 if not next_defer.called: | |
130 next_defer.errback(failure.Failure(defer.CancelledError(reason))) | |
131 | |
132 deferreds = session_data[KEY_DEFERREDS] | |
133 for d in deferreds: | |
134 d.cancel() | |
135 | |
136 if not no_warning: | |
137 log.warning(u"RTDeferredList cancelled: {} (profile {})".format(reason, profile)) | |
138 | |
139 super(RTDeferredSessions, self)._purgeSession(session_id) | |
140 | |
141 def _gotResult(self, session_id, profile): | |
142 """Method called after each callback or errback | |
143 | |
144 manage the next_defer deferred | |
145 """ | |
146 session_data = self.profileGet(session_id, profile) | |
147 defer_next = session_data[KEY_NEXT] | |
148 if not defer_next.called: | |
149 defer_next.callback(None) | |
150 | |
151 def _callback(self, result, deferred, session_id, profile): | |
152 deferred._RTDeferred_return = (True, result) | |
153 self._gotResult(session_id, profile) | |
154 | |
155 def _errback(self, failure, deferred, session_id, profile): | |
156 deferred._RTDeferred_return = (False, failure) | |
157 self._gotResult(session_id, profile) | |
158 | |
159 def cancel(self, session_id, reason=u"timeout", no_log=False): | |
160 """Stop this RTDeferredList | |
161 | |
162 Cancel all remaining deferred, and call self.final_defer.errback | |
163 @param reason (unicode): reason of the cancellation | |
164 @param no_log(bool): if True, don't log the cancellation | |
165 """ | |
166 self._purgeSession(session_id, reason=reason, no_warning=no_log) | |
167 | |
168 def getResults(self, session_id, on_success=None, on_error=None, profile=C.PROF_KEY_NONE): | |
169 """Get current results of a real-time deferred session | |
170 | |
171 result already gotten are deleted | |
172 @param session_id(str): session id | |
173 @param on_success: can be: | |
174 - None: add success normaly to results | |
175 - callable: replace result by the return value of on_success(result) (may be deferred) | |
176 @param on_error: can be: | |
177 - None: add error normaly to results | |
178 - C.IGNORE: don't put errors in results | |
179 - callable: replace failure by the return value of on_error(failure) (may be deferred) | |
180 @param profile=%(doc_profile)s | |
181 @param result(tuple): tuple(remaining, results) where: | |
182 - remaining[int] is the number of remaining deferred | |
183 (deferreds from which we don't have result yet) | |
184 - results is a dict where: | |
185 - key is the index of the deferred if deferred is a list, or its key if it's a dict | |
186 - value = (success, result) where: | |
187 - success is True if the deferred was successful | |
188 - result is the result in case of success, else the failure | |
189 If remaining == 0, the session is ended | |
190 @raise KeyError: the session is already finished or doesn't exists at all | |
191 """ | |
192 if profile == C.PROF_KEY_NONE: | |
193 raise exceptions.ProfileNotSetError | |
194 session_data = self.profileGet(session_id, profile) | |
195 | |
196 @defer.inlineCallbacks | |
197 def next_cb(dummy): | |
198 # we got one or several results | |
199 results = {} | |
200 filtered_data = [] # used to keep deferreds without results | |
201 deferreds = session_data[KEY_DEFERREDS] | |
202 | |
203 for d in deferreds: | |
204 if d._RTDeferred_return: # we don't use d.called as called is True before the full callbacks chain has been called | |
205 # we have a result | |
206 idx = d._RTDeferred_index | |
207 success, result = d._RTDeferred_return | |
208 if success: | |
209 if on_success is not None: | |
210 if callable(on_success): | |
211 result = yield on_success(result) | |
212 else: | |
213 raise exceptions.InternalError('Unknown value of on_success: {}'.format(on_success)) | |
214 | |
215 else: | |
216 if on_error is not None: | |
217 if on_error == C.IGNORE: | |
218 continue | |
219 elif callable(on_error): | |
220 result = yield on_error(result) | |
221 else: | |
222 raise exceptions.InternalError('Unknown value of on_error: {}'.format(on_error)) | |
223 results[idx] = (success, result) | |
224 else: | |
225 filtered_data.append(d) | |
226 | |
227 # we change the deferred with the filtered list | |
228 # in other terms, we don't want anymore deferred from which we have got the result | |
229 session_data[KEY_DEFERREDS] = filtered_data | |
230 | |
231 if filtered_data: | |
232 # we create a new next_defer only if we are still waiting for results | |
233 session_data[KEY_NEXT] = defer.Deferred() | |
234 else: | |
235 # no more data to get, the result have been gotten, | |
236 # we can cleanly finish the session | |
237 self._purgeSession(session_id, got_result=True) | |
238 | |
239 defer.returnValue((len(filtered_data), results)) | |
240 | |
241 # we wait for a result | |
242 return session_data[KEY_NEXT].addCallback(next_cb) |