Mercurial > libervia-backend
comparison sat/tools/sat_defer.py @ 2624:56f94936df1e
code style reformatting using black
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 27 Jun 2018 20:14:46 +0200 |
parents | 26edcf3a30eb |
children | 378188abe941 |
comparison
equal
deleted
inserted
replaced
2623:49533de4540b | 2624:56f94936df1e |
---|---|
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. | 18 # along with this program. If not, see <http://www.gnu.org/licenses/>. |
19 | 19 |
20 """tools related to deferred""" | 20 """tools related to deferred""" |
21 | 21 |
22 from sat.core.log import getLogger | 22 from sat.core.log import getLogger |
23 | |
23 log = getLogger(__name__) | 24 log = getLogger(__name__) |
24 from sat.core import exceptions | 25 from sat.core import exceptions |
25 from twisted.internet import defer | 26 from twisted.internet import defer |
26 from twisted.internet import error as internet_error | 27 from twisted.internet import error as internet_error |
27 from twisted.internet import reactor | 28 from twisted.internet import reactor |
28 from twisted.python import failure | 29 from twisted.python import failure |
29 from sat.core.constants import Const as C | 30 from sat.core.constants import Const as C |
30 from sat.memory import memory | 31 from sat.memory import memory |
31 | 32 |
32 KEY_DEFERREDS = 'deferreds' | 33 KEY_DEFERREDS = "deferreds" |
33 KEY_NEXT = 'next_defer' | 34 KEY_NEXT = "next_defer" |
34 | 35 |
35 | 36 |
36 class DelayedDeferred(object): | 37 class DelayedDeferred(object): |
37 """A Deferred-like which is launched after a delay""" | 38 """A Deferred-like which is launched after a delay""" |
38 | 39 |
50 except internet_error.AlreadyCalled: | 51 except internet_error.AlreadyCalled: |
51 pass | 52 pass |
52 self._deferred.cancel() | 53 self._deferred.cancel() |
53 | 54 |
54 def addCallbacks(self, *args, **kwargs): | 55 def addCallbacks(self, *args, **kwargs): |
55 self._deferred.addCallbacks(*args,**kwargs) | 56 self._deferred.addCallbacks(*args, **kwargs) |
56 | 57 |
57 def addCallback(self, *args, **kwargs): | 58 def addCallback(self, *args, **kwargs): |
58 self._deferred.addCallback(*args,**kwargs) | 59 self._deferred.addCallback(*args, **kwargs) |
59 | 60 |
60 def addErrback(self, *args, **kwargs): | 61 def addErrback(self, *args, **kwargs): |
61 self._deferred.addErrback(*args,**kwargs) | 62 self._deferred.addErrback(*args, **kwargs) |
62 | 63 |
63 def addBoth(self, *args, **kwargs): | 64 def addBoth(self, *args, **kwargs): |
64 self._deferred.addBoth(*args,**kwargs) | 65 self._deferred.addBoth(*args, **kwargs) |
65 | 66 |
66 def chainDeferred(self, *args, **kwargs): | 67 def chainDeferred(self, *args, **kwargs): |
67 self._deferred.chainDeferred(*args,**kwargs) | 68 self._deferred.chainDeferred(*args, **kwargs) |
68 | 69 |
69 def pause(self): | 70 def pause(self): |
70 self._deferred.pause() | 71 self._deferred.pause() |
71 | 72 |
72 def unpause(self): | 73 def unpause(self): |
74 | 75 |
75 | 76 |
76 class RTDeferredSessions(memory.Sessions): | 77 class RTDeferredSessions(memory.Sessions): |
77 """Real Time Deferred Sessions""" | 78 """Real Time Deferred Sessions""" |
78 | 79 |
79 | |
80 def __init__(self, timeout=120): | 80 def __init__(self, timeout=120): |
81 """Manage list of Deferreds in real-time, allowing to get intermediate results | 81 """Manage list of Deferreds in real-time, allowing to get intermediate results |
82 | 82 |
83 @param timeout (int): nb of seconds before deferreds cancellation | 83 @param timeout (int): nb of seconds before deferreds cancellation |
84 """ | 84 """ |
85 super(RTDeferredSessions, self).__init__(timeout=timeout, resettable_timeout=False) | 85 super(RTDeferredSessions, self).__init__( |
86 timeout=timeout, resettable_timeout=False | |
87 ) | |
86 | 88 |
87 def newSession(self, deferreds, profile): | 89 def newSession(self, deferreds, profile): |
88 """Launch a new session with a list of deferreds | 90 """Launch a new session with a list of deferreds |
89 | 91 |
90 @param deferreds(list[defer.Deferred]): list of deferred to call | 92 @param deferreds(list[defer.Deferred]): list of deferred to call |
91 @param profile: %(doc_profile)s | 93 @param profile: %(doc_profile)s |
92 @param return (tupe[str, defer.Deferred]): tuple with session id and a deferred wich fire *WITHOUT RESULT* when all results are received | 94 @param return (tupe[str, defer.Deferred]): tuple with session id and a deferred wich fire *WITHOUT RESULT* when all results are received |
93 """ | 95 """ |
94 data = {KEY_NEXT: defer.Deferred()} | 96 data = {KEY_NEXT: defer.Deferred()} |
95 session_id, session_data = super(RTDeferredSessions, self).newSession(data, profile=profile) | 97 session_id, session_data = super(RTDeferredSessions, self).newSession( |
98 data, profile=profile | |
99 ) | |
96 if isinstance(deferreds, dict): | 100 if isinstance(deferreds, dict): |
97 session_data[KEY_DEFERREDS] = deferreds.values() | 101 session_data[KEY_DEFERREDS] = deferreds.values() |
98 iterator = deferreds.iteritems() | 102 iterator = deferreds.iteritems() |
99 else: | 103 else: |
100 session_data[KEY_DEFERREDS] = deferreds | 104 session_data[KEY_DEFERREDS] = deferreds |
105 d._RTDeferred_return = None | 109 d._RTDeferred_return = None |
106 d.addCallback(self._callback, d, session_id, profile) | 110 d.addCallback(self._callback, d, session_id, profile) |
107 d.addErrback(self._errback, d, session_id, profile) | 111 d.addErrback(self._errback, d, session_id, profile) |
108 return session_id | 112 return session_id |
109 | 113 |
110 def _purgeSession(self, session_id, reason=u"timeout", no_warning=False, got_result=False): | 114 def _purgeSession( |
115 self, session_id, reason=u"timeout", no_warning=False, got_result=False | |
116 ): | |
111 """Purge the session | 117 """Purge the session |
112 | 118 |
113 @param session_id(str): id of the session to purge | 119 @param session_id(str): id of the session to purge |
114 @param reason (unicode): human readable reason why the session is purged | 120 @param reason (unicode): human readable reason why the session is purged |
115 @param no_warning(bool): if True, no warning will be put in logs | 121 @param no_warning(bool): if True, no warning will be put in logs |
119 """ | 125 """ |
120 if not got_result: | 126 if not got_result: |
121 try: | 127 try: |
122 timer, session_data, profile = self._sessions[session_id] | 128 timer, session_data, profile = self._sessions[session_id] |
123 except ValueError: | 129 except ValueError: |
124 raise exceptions.InternalError(u'was expecting timer, session_data and profile; is profile set ?') | 130 raise exceptions.InternalError( |
131 u"was expecting timer, session_data and profile; is profile set ?" | |
132 ) | |
125 | 133 |
126 # next_defer must be called before deferreds, | 134 # next_defer must be called before deferreds, |
127 # else its callback will be called by _gotResult | 135 # else its callback will be called by _gotResult |
128 next_defer = session_data[KEY_NEXT] | 136 next_defer = session_data[KEY_NEXT] |
129 if not next_defer.called: | 137 if not next_defer.called: |
132 deferreds = session_data[KEY_DEFERREDS] | 140 deferreds = session_data[KEY_DEFERREDS] |
133 for d in deferreds: | 141 for d in deferreds: |
134 d.cancel() | 142 d.cancel() |
135 | 143 |
136 if not no_warning: | 144 if not no_warning: |
137 log.warning(u"RTDeferredList cancelled: {} (profile {})".format(reason, profile)) | 145 log.warning( |
146 u"RTDeferredList cancelled: {} (profile {})".format(reason, profile) | |
147 ) | |
138 | 148 |
139 super(RTDeferredSessions, self)._purgeSession(session_id) | 149 super(RTDeferredSessions, self)._purgeSession(session_id) |
140 | 150 |
141 def _gotResult(self, session_id, profile): | 151 def _gotResult(self, session_id, profile): |
142 """Method called after each callback or errback | 152 """Method called after each callback or errback |
163 @param reason (unicode): reason of the cancellation | 173 @param reason (unicode): reason of the cancellation |
164 @param no_log(bool): if True, don't log the cancellation | 174 @param no_log(bool): if True, don't log the cancellation |
165 """ | 175 """ |
166 self._purgeSession(session_id, reason=reason, no_warning=no_log) | 176 self._purgeSession(session_id, reason=reason, no_warning=no_log) |
167 | 177 |
168 def getResults(self, session_id, on_success=None, on_error=None, profile=C.PROF_KEY_NONE): | 178 def getResults( |
179 self, session_id, on_success=None, on_error=None, profile=C.PROF_KEY_NONE | |
180 ): | |
169 """Get current results of a real-time deferred session | 181 """Get current results of a real-time deferred session |
170 | 182 |
171 result already gotten are deleted | 183 result already gotten are deleted |
172 @param session_id(str): session id | 184 @param session_id(str): session id |
173 @param on_success: can be: | 185 @param on_success: can be: |
195 | 207 |
196 @defer.inlineCallbacks | 208 @defer.inlineCallbacks |
197 def next_cb(dummy): | 209 def next_cb(dummy): |
198 # we got one or several results | 210 # we got one or several results |
199 results = {} | 211 results = {} |
200 filtered_data = [] # used to keep deferreds without results | 212 filtered_data = [] # used to keep deferreds without results |
201 deferreds = session_data[KEY_DEFERREDS] | 213 deferreds = session_data[KEY_DEFERREDS] |
202 | 214 |
203 for d in deferreds: | 215 for d in deferreds: |
204 if d._RTDeferred_return: # we don't use d.called as called is True before the full callbacks chain has been called | 216 if ( |
217 d._RTDeferred_return | |
218 ): # we don't use d.called as called is True before the full callbacks chain has been called | |
205 # we have a result | 219 # we have a result |
206 idx = d._RTDeferred_index | 220 idx = d._RTDeferred_index |
207 success, result = d._RTDeferred_return | 221 success, result = d._RTDeferred_return |
208 if success: | 222 if success: |
209 if on_success is not None: | 223 if on_success is not None: |
210 if callable(on_success): | 224 if callable(on_success): |
211 result = yield on_success(result) | 225 result = yield on_success(result) |
212 else: | 226 else: |
213 raise exceptions.InternalError('Unknown value of on_success: {}'.format(on_success)) | 227 raise exceptions.InternalError( |
228 "Unknown value of on_success: {}".format(on_success) | |
229 ) | |
214 | 230 |
215 else: | 231 else: |
216 if on_error is not None: | 232 if on_error is not None: |
217 if on_error == C.IGNORE: | 233 if on_error == C.IGNORE: |
218 continue | 234 continue |
219 elif callable(on_error): | 235 elif callable(on_error): |
220 result = yield on_error(result) | 236 result = yield on_error(result) |
221 else: | 237 else: |
222 raise exceptions.InternalError('Unknown value of on_error: {}'.format(on_error)) | 238 raise exceptions.InternalError( |
239 "Unknown value of on_error: {}".format(on_error) | |
240 ) | |
223 results[idx] = (success, result) | 241 results[idx] = (success, result) |
224 else: | 242 else: |
225 filtered_data.append(d) | 243 filtered_data.append(d) |
226 | 244 |
227 # we change the deferred with the filtered list | 245 # we change the deferred with the filtered list |