Mercurial > libervia-backend
comparison src/tools/sat_defer.py @ 1448:227856b13d7a
core: new tools.sat_defer module, and implementation of RTDeferredSessions:
The real-time deferred session is a class which manage session of several deferreds at once (pretty much like a DeferredList), with the ability to get intermediate results.
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 15 Aug 2015 22:13:27 +0200 |
parents | |
children | e987325c14ef |
comparison
equal
deleted
inserted
replaced
1447:b003dbd2b4e9 | 1448:227856b13d7a |
---|---|
1 #!/usr/bin/python | |
2 # -*- coding: utf-8 -*- | |
3 | |
4 # SàT: a XMPP client | |
5 # Copyright (C) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Jérôme Poisson (goffi@goffi.org) | |
6 | |
7 # This program is free software: you can redistribute it and/or modify | |
8 # it under the terms of the GNU Affero General Public License as published by | |
9 # the Free Software Foundation, either version 3 of the License, or | |
10 # (at your option) any later version. | |
11 | |
12 # This program is distributed in the hope that it will be useful, | |
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
15 # GNU Affero General Public License for more details. | |
16 | |
17 # You should have received a copy of the GNU Affero General Public License | |
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
19 | |
20 """tools related to deferred""" | |
21 | |
22 from sat.core.log import getLogger | |
23 log = getLogger(__name__) | |
24 from sat.core import exceptions | |
25 from twisted.internet import defer | |
26 from twisted.python import failure | |
27 from sat.core.constants import Const as C | |
28 from sat.memory import memory | |
29 | |
30 KEY_DEFERREDS = 'deferreds' | |
31 KEY_NEXT = 'next_defer' | |
32 | |
33 | |
34 class RTDeferredSessions(memory.Sessions): | |
35 """Real Time Deferred Sessions""" | |
36 | |
37 | |
38 def __init__(self, timeout=120): | |
39 """Manage list of Deferreds in real-time, allowing to get intermediate results | |
40 | |
41 @param timeout (int): nb of seconds before deferreds cancellation | |
42 """ | |
43 super(RTDeferredSessions, self).__init__(timeout=timeout, resettable_timeout=False) | |
44 | |
45 def newSession(self, deferreds, profile): | |
46 """Launch a new session with a list of deferreds | |
47 | |
48 @param deferreds(list[defer.Deferred]): list of deferred to call | |
49 @param profile: %(doc_profile)s | |
50 @param return (tupe[str, defer.Deferred]): tuple with session id and a deferred wich fire *WITHOUT RESULT* when all results are received | |
51 """ | |
52 data = {KEY_NEXT: defer.Deferred()} | |
53 session_id, session_data = super(RTDeferredSessions, self).newSession(data, profile=profile) | |
54 if isinstance(deferreds, dict): | |
55 session_data[KEY_DEFERREDS] = deferreds.values() | |
56 iterator = deferreds.iteritems() | |
57 else: | |
58 session_data[KEY_DEFERREDS] = deferreds | |
59 iterator = enumerate(deferreds) | |
60 | |
61 for idx, d in iterator: | |
62 d._RTDeferred_index = idx | |
63 d._RTDeferred_return = None | |
64 d.addCallback(self._callback, d, session_id, profile) | |
65 d.addErrback(self._errback, d, session_id, profile) | |
66 return session_id | |
67 | |
68 def _purgeSession(self, session_id, reason=u"timeout", no_warning=False, got_result=False): | |
69 """Purge the session | |
70 | |
71 @param session_id(str): id of the session to purge | |
72 @param reason (unicode): human readable reason why the session is purged | |
73 @param no_warning(bool): if True, no warning will be put in logs | |
74 @param got_result(bool): True if the session is purged after normal ending (i.e.: all the results have been gotten). | |
75 reason and no_warning are ignored if got_result is True. | |
76 @raise KeyError: session doesn't exists (anymore ?) | |
77 """ | |
78 if not got_result: | |
79 try: | |
80 timer, session_data, profile = self._sessions[session_id] | |
81 except ValueError: | |
82 raise exceptions.InternalError(u'was expecting timer, session_data and profile; is profile set ?') | |
83 | |
84 # next_defer must be called before deferreds, | |
85 # else its callback will be called by _gotResult | |
86 next_defer = session_data[KEY_NEXT] | |
87 if not next_defer.called: | |
88 next_defer.errback(failure.Failure(defer.CancelledError(reason))) | |
89 | |
90 deferreds = session_data[KEY_DEFERREDS] | |
91 for d in deferreds: | |
92 d.cancel() | |
93 | |
94 if not no_warning: | |
95 log.warning(u"RTDeferredList cancelled: {} (profile {})".format(reason, profile)) | |
96 | |
97 super(RTDeferredSessions, self)._purgeSession(session_id) | |
98 | |
99 def _gotResult(self, session_id, profile): | |
100 """Method called after each callback or errback | |
101 | |
102 manage the next_defer deferred | |
103 """ | |
104 session_data = self.profileGet(session_id, profile) | |
105 defer_next = session_data[KEY_NEXT] | |
106 if not defer_next.called: | |
107 defer_next.callback(None) | |
108 | |
109 def _callback(self, result, deferred, session_id, profile): | |
110 deferred._RTDeferred_return = (True, result) | |
111 self._gotResult(session_id, profile) | |
112 | |
113 def _errback(self, failure, deferred, session_id, profile): | |
114 deferred._RTDeferred_return = (False, failure) | |
115 self._gotResult(session_id, profile) | |
116 | |
117 def cancel(self, session_id, reason=u"timeout", no_log=False): | |
118 """Stop this RTDeferredList | |
119 | |
120 Cancel all remaining deferred, and call self.final_defer.errback | |
121 @param reason (unicode): reason of the cancellation | |
122 @param no_log(bool): if True, don't log the cancellation | |
123 """ | |
124 self._purgeSession(session_id, reason=reason, no_warning=no_log) | |
125 | |
126 def getResults(self, session_id, on_success=None, on_error=None, profile=C.PROF_KEY_NONE): | |
127 """Get current results of a real-time deferred session | |
128 | |
129 result already gotten are deleted | |
130 @param session_id(str): session id | |
131 @param on_success: can be: | |
132 - None: add success normaly to results | |
133 - callable: replace result by the return value of on_success(result) (may be deferred) | |
134 @param on_error: can be: | |
135 - None: add error normaly to results | |
136 - C.IGNORE: don't put errors in results | |
137 - callable: replace failure by the return value of on_error(failure) (may be deferred) | |
138 @param profile=%(doc_profile)s | |
139 @param result(tuple): tuple(remaining, results) where: | |
140 - remaining[int] is the number of remaining deferred | |
141 (deferreds from which we don't have result yet) | |
142 - results is a dict where: | |
143 - key is the index of the deferred if deferred is a list, or its key if it's a dict | |
144 - value = (success, result) where: | |
145 - success is True if the deferred was successful | |
146 - result is the result in case of success, else the failure | |
147 If remaining == 0, the session is ended | |
148 @raise KeyError: the session is already finished or doesn't exists at all | |
149 """ | |
150 if profile == C.PROF_KEY_NONE: | |
151 raise exceptions.ProfileNotSetError | |
152 session_data = self.profileGet(session_id, profile) | |
153 | |
154 @defer.inlineCallbacks | |
155 def next_cb(dummy): | |
156 # we got one or several results | |
157 results = {} | |
158 filtered_data = [] # used to keep deferreds without results | |
159 deferreds = session_data[KEY_DEFERREDS] | |
160 | |
161 for d in deferreds: | |
162 if d._RTDeferred_return: # we don't use d.called as called is True before the full callbacks chain has been called | |
163 # we have a result | |
164 idx = d._RTDeferred_index | |
165 success, result = d._RTDeferred_return | |
166 if success: | |
167 if on_success is not None: | |
168 if callable(on_success): | |
169 result = yield on_success(result) | |
170 else: | |
171 raise exceptions.InternalError('Unknown value of on_success: {}'.format(on_success)) | |
172 | |
173 else: | |
174 if on_error is not None: | |
175 if on_error == C.IGNORE: | |
176 continue | |
177 elif callable(on_error): | |
178 result = yield on_error(result) | |
179 else: | |
180 raise exceptions.InternalError('Unknown value of on_error: {}'.format(on_error)) | |
181 results[idx] = (success, result) | |
182 else: | |
183 filtered_data.append(d) | |
184 | |
185 # we change the deferred with the filtered list | |
186 # in other terms, we don't want anymore deferred from which we have got the result | |
187 session_data[KEY_DEFERREDS] = filtered_data | |
188 | |
189 if filtered_data: | |
190 # we create a new next_defer only if we are still waiting for results | |
191 session_data[KEY_NEXT] = defer.Deferred() | |
192 else: | |
193 # no more data to get, the result have been gotten, | |
194 # we can cleanly finish the session | |
195 self._purgeSession(session_id, got_result=True) | |
196 | |
197 defer.returnValue((len(filtered_data), results)) | |
198 | |
199 # we wait for a result | |
200 return session_data[KEY_NEXT].addCallback(next_cb) |