comparison sat/tools/sat_defer.py @ 2624:56f94936df1e

code style reformatting using black
author Goffi <goffi@goffi.org>
date Wed, 27 Jun 2018 20:14:46 +0200
parents 26edcf3a30eb
children 378188abe941
comparison
equal deleted inserted replaced
2623:49533de4540b 2624:56f94936df1e
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. 18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 19
20 """tools related to deferred""" 20 """tools related to deferred"""
21 21
22 from sat.core.log import getLogger 22 from sat.core.log import getLogger
23
23 log = getLogger(__name__) 24 log = getLogger(__name__)
24 from sat.core import exceptions 25 from sat.core import exceptions
25 from twisted.internet import defer 26 from twisted.internet import defer
26 from twisted.internet import error as internet_error 27 from twisted.internet import error as internet_error
27 from twisted.internet import reactor 28 from twisted.internet import reactor
28 from twisted.python import failure 29 from twisted.python import failure
29 from sat.core.constants import Const as C 30 from sat.core.constants import Const as C
30 from sat.memory import memory 31 from sat.memory import memory
31 32
32 KEY_DEFERREDS = 'deferreds' 33 KEY_DEFERREDS = "deferreds"
33 KEY_NEXT = 'next_defer' 34 KEY_NEXT = "next_defer"
34 35
35 36
36 class DelayedDeferred(object): 37 class DelayedDeferred(object):
37 """A Deferred-like which is launched after a delay""" 38 """A Deferred-like which is launched after a delay"""
38 39
50 except internet_error.AlreadyCalled: 51 except internet_error.AlreadyCalled:
51 pass 52 pass
52 self._deferred.cancel() 53 self._deferred.cancel()
53 54
54 def addCallbacks(self, *args, **kwargs): 55 def addCallbacks(self, *args, **kwargs):
55 self._deferred.addCallbacks(*args,**kwargs) 56 self._deferred.addCallbacks(*args, **kwargs)
56 57
57 def addCallback(self, *args, **kwargs): 58 def addCallback(self, *args, **kwargs):
58 self._deferred.addCallback(*args,**kwargs) 59 self._deferred.addCallback(*args, **kwargs)
59 60
60 def addErrback(self, *args, **kwargs): 61 def addErrback(self, *args, **kwargs):
61 self._deferred.addErrback(*args,**kwargs) 62 self._deferred.addErrback(*args, **kwargs)
62 63
63 def addBoth(self, *args, **kwargs): 64 def addBoth(self, *args, **kwargs):
64 self._deferred.addBoth(*args,**kwargs) 65 self._deferred.addBoth(*args, **kwargs)
65 66
66 def chainDeferred(self, *args, **kwargs): 67 def chainDeferred(self, *args, **kwargs):
67 self._deferred.chainDeferred(*args,**kwargs) 68 self._deferred.chainDeferred(*args, **kwargs)
68 69
69 def pause(self): 70 def pause(self):
70 self._deferred.pause() 71 self._deferred.pause()
71 72
72 def unpause(self): 73 def unpause(self):
74 75
75 76
76 class RTDeferredSessions(memory.Sessions): 77 class RTDeferredSessions(memory.Sessions):
77 """Real Time Deferred Sessions""" 78 """Real Time Deferred Sessions"""
78 79
79
80 def __init__(self, timeout=120): 80 def __init__(self, timeout=120):
81 """Manage list of Deferreds in real-time, allowing to get intermediate results 81 """Manage list of Deferreds in real-time, allowing to get intermediate results
82 82
83 @param timeout (int): nb of seconds before deferreds cancellation 83 @param timeout (int): nb of seconds before deferreds cancellation
84 """ 84 """
85 super(RTDeferredSessions, self).__init__(timeout=timeout, resettable_timeout=False) 85 super(RTDeferredSessions, self).__init__(
86 timeout=timeout, resettable_timeout=False
87 )
86 88
87 def newSession(self, deferreds, profile): 89 def newSession(self, deferreds, profile):
88 """Launch a new session with a list of deferreds 90 """Launch a new session with a list of deferreds
89 91
90 @param deferreds(list[defer.Deferred]): list of deferred to call 92 @param deferreds(list[defer.Deferred]): list of deferred to call
91 @param profile: %(doc_profile)s 93 @param profile: %(doc_profile)s
92 @param return (tuple[str, defer.Deferred]): tuple with session id and a deferred which fires *WITHOUT RESULT* when all results are received 94 @param return (tuple[str, defer.Deferred]): tuple with session id and a deferred which fires *WITHOUT RESULT* when all results are received
93 """ 95 """
94 data = {KEY_NEXT: defer.Deferred()} 96 data = {KEY_NEXT: defer.Deferred()}
95 session_id, session_data = super(RTDeferredSessions, self).newSession(data, profile=profile) 97 session_id, session_data = super(RTDeferredSessions, self).newSession(
98 data, profile=profile
99 )
96 if isinstance(deferreds, dict): 100 if isinstance(deferreds, dict):
97 session_data[KEY_DEFERREDS] = deferreds.values() 101 session_data[KEY_DEFERREDS] = deferreds.values()
98 iterator = deferreds.iteritems() 102 iterator = deferreds.iteritems()
99 else: 103 else:
100 session_data[KEY_DEFERREDS] = deferreds 104 session_data[KEY_DEFERREDS] = deferreds
105 d._RTDeferred_return = None 109 d._RTDeferred_return = None
106 d.addCallback(self._callback, d, session_id, profile) 110 d.addCallback(self._callback, d, session_id, profile)
107 d.addErrback(self._errback, d, session_id, profile) 111 d.addErrback(self._errback, d, session_id, profile)
108 return session_id 112 return session_id
109 113
110 def _purgeSession(self, session_id, reason=u"timeout", no_warning=False, got_result=False): 114 def _purgeSession(
115 self, session_id, reason=u"timeout", no_warning=False, got_result=False
116 ):
111 """Purge the session 117 """Purge the session
112 118
113 @param session_id(str): id of the session to purge 119 @param session_id(str): id of the session to purge
114 @param reason (unicode): human readable reason why the session is purged 120 @param reason (unicode): human readable reason why the session is purged
115 @param no_warning(bool): if True, no warning will be put in logs 121 @param no_warning(bool): if True, no warning will be put in logs
119 """ 125 """
120 if not got_result: 126 if not got_result:
121 try: 127 try:
122 timer, session_data, profile = self._sessions[session_id] 128 timer, session_data, profile = self._sessions[session_id]
123 except ValueError: 129 except ValueError:
124 raise exceptions.InternalError(u'was expecting timer, session_data and profile; is profile set ?') 130 raise exceptions.InternalError(
131 u"was expecting timer, session_data and profile; is profile set ?"
132 )
125 133
126 # next_defer must be called before deferreds, 134 # next_defer must be called before deferreds,
127 # else its callback will be called by _gotResult 135 # else its callback will be called by _gotResult
128 next_defer = session_data[KEY_NEXT] 136 next_defer = session_data[KEY_NEXT]
129 if not next_defer.called: 137 if not next_defer.called:
132 deferreds = session_data[KEY_DEFERREDS] 140 deferreds = session_data[KEY_DEFERREDS]
133 for d in deferreds: 141 for d in deferreds:
134 d.cancel() 142 d.cancel()
135 143
136 if not no_warning: 144 if not no_warning:
137 log.warning(u"RTDeferredList cancelled: {} (profile {})".format(reason, profile)) 145 log.warning(
146 u"RTDeferredList cancelled: {} (profile {})".format(reason, profile)
147 )
138 148
139 super(RTDeferredSessions, self)._purgeSession(session_id) 149 super(RTDeferredSessions, self)._purgeSession(session_id)
140 150
141 def _gotResult(self, session_id, profile): 151 def _gotResult(self, session_id, profile):
142 """Method called after each callback or errback 152 """Method called after each callback or errback
163 @param reason (unicode): reason of the cancellation 173 @param reason (unicode): reason of the cancellation
164 @param no_log(bool): if True, don't log the cancellation 174 @param no_log(bool): if True, don't log the cancellation
165 """ 175 """
166 self._purgeSession(session_id, reason=reason, no_warning=no_log) 176 self._purgeSession(session_id, reason=reason, no_warning=no_log)
167 177
168 def getResults(self, session_id, on_success=None, on_error=None, profile=C.PROF_KEY_NONE): 178 def getResults(
179 self, session_id, on_success=None, on_error=None, profile=C.PROF_KEY_NONE
180 ):
169 """Get current results of a real-time deferred session 181 """Get current results of a real-time deferred session
170 182
171 results already gotten are deleted 183 results already gotten are deleted
172 @param session_id(str): session id 184 @param session_id(str): session id
173 @param on_success: can be: 185 @param on_success: can be:
195 207
196 @defer.inlineCallbacks 208 @defer.inlineCallbacks
197 def next_cb(dummy): 209 def next_cb(dummy):
198 # we got one or several results 210 # we got one or several results
199 results = {} 211 results = {}
200 filtered_data = [] # used to keep deferreds without results 212 filtered_data = [] # used to keep deferreds without results
201 deferreds = session_data[KEY_DEFERREDS] 213 deferreds = session_data[KEY_DEFERREDS]
202 214
203 for d in deferreds: 215 for d in deferreds:
204 if d._RTDeferred_return: # we don't use d.called as called is True before the full callbacks chain has been called 216 if (
217 d._RTDeferred_return
218 ): # we don't use d.called as called is True before the full callbacks chain has been called
205 # we have a result 219 # we have a result
206 idx = d._RTDeferred_index 220 idx = d._RTDeferred_index
207 success, result = d._RTDeferred_return 221 success, result = d._RTDeferred_return
208 if success: 222 if success:
209 if on_success is not None: 223 if on_success is not None:
210 if callable(on_success): 224 if callable(on_success):
211 result = yield on_success(result) 225 result = yield on_success(result)
212 else: 226 else:
213 raise exceptions.InternalError('Unknown value of on_success: {}'.format(on_success)) 227 raise exceptions.InternalError(
228 "Unknown value of on_success: {}".format(on_success)
229 )
214 230
215 else: 231 else:
216 if on_error is not None: 232 if on_error is not None:
217 if on_error == C.IGNORE: 233 if on_error == C.IGNORE:
218 continue 234 continue
219 elif callable(on_error): 235 elif callable(on_error):
220 result = yield on_error(result) 236 result = yield on_error(result)
221 else: 237 else:
222 raise exceptions.InternalError('Unknown value of on_error: {}'.format(on_error)) 238 raise exceptions.InternalError(
239 "Unknown value of on_error: {}".format(on_error)
240 )
223 results[idx] = (success, result) 241 results[idx] = (success, result)
224 else: 242 else:
225 filtered_data.append(d) 243 filtered_data.append(d)
226 244
227 # we change the deferred with the filtered list 245 # we change the deferred with the filtered list