Mercurial > libervia-backend
view sat/tools/sat_defer.py @ 3582:71516731d0aa
core (memory/sqla): database migration using Alembic:
Alembic database migration tool, which is the recommended one for SQLAlchemy has been
integrated. When a database is created, it will be used to stamp to current (head)
revision, otherwise, DB will be checked to see if it needs to be updated, and upgrade will
be triggered if necessary.
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 25 Jun 2021 17:55:23 +0200 |
parents | 7550ae9cfbac |
children | 524856bd7b19 |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia: an XMPP client # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. """tools related to deferred""" from sat.core.log import getLogger log = getLogger(__name__) from sat.core import exceptions from twisted.internet import defer from twisted.internet import error as internet_error from twisted.internet import reactor from twisted.words.protocols.jabber import error as jabber_error from twisted.python import failure from sat.core.constants import Const as C from sat.memory import memory KEY_DEFERREDS = "deferreds" KEY_NEXT = "next_defer" def stanza2NotFound(failure_): """Convert item-not-found StanzaError to exceptions.NotFound""" failure_.trap(jabber_error.StanzaError) if failure_.value.condition == 'item-not-found': raise exceptions.NotFound(failure_.value.text or failure_.value.condition) return failure_ class DelayedDeferred(object): """A Deferred-like which is launched after a delay""" def __init__(self, delay, result): """ @param delay(float): delay before launching the callback, in seconds @param result: result used with the callback """ self._deferred = defer.Deferred() self._timer = reactor.callLater(delay, self._deferred.callback, result) def cancel(self): try: self._timer.cancel() except internet_error.AlreadyCalled: pass self._deferred.cancel() def addCallbacks(self, *args, **kwargs): self._deferred.addCallbacks(*args, **kwargs) def addCallback(self, *args, **kwargs): self._deferred.addCallback(*args, **kwargs) def addErrback(self, *args, **kwargs): self._deferred.addErrback(*args, **kwargs) def addBoth(self, *args, **kwargs): self._deferred.addBoth(*args, **kwargs) def chainDeferred(self, *args, **kwargs): self._deferred.chainDeferred(*args, **kwargs) def pause(self): self._deferred.pause() def unpause(self): self._deferred.unpause() class RTDeferredSessions(memory.Sessions): """Real Time Deferred Sessions""" def __init__(self, timeout=120): """Manage list of Deferreds in real-time, allowing to get intermediate results @param timeout (int): nb of seconds before deferreds cancellation """ super(RTDeferredSessions, self).__init__( timeout=timeout, resettable_timeout=False ) def newSession(self, deferreds, profile): """Launch a new session with a list of deferreds @param deferreds(list[defer.Deferred]): list of deferred to call @param profile: %(doc_profile)s @param return (tupe[str, defer.Deferred]): tuple with session id and a deferred wich fire *WITHOUT RESULT* when all results are received """ data = {KEY_NEXT: defer.Deferred()} session_id, session_data = super(RTDeferredSessions, self).newSession( data, profile=profile ) if isinstance(deferreds, dict): session_data[KEY_DEFERREDS] = list(deferreds.values()) iterator = iter(deferreds.items()) else: session_data[KEY_DEFERREDS] = deferreds iterator = enumerate(deferreds) for idx, d in iterator: d._RTDeferred_index = idx d._RTDeferred_return = None d.addCallback(self._callback, d, session_id, profile) d.addErrback(self._errback, d, session_id, profile) return session_id def _purgeSession( self, session_id, reason="timeout", no_warning=False, got_result=False ): """Purge the session @param session_id(str): id of the session to purge @param reason (unicode): human readable reason why the session is purged @param no_warning(bool): if True, no warning will be put in logs @param got_result(bool): True if the session is purged after normal ending (i.e.: all the results have been gotten). reason and no_warning are ignored if got_result is True. @raise KeyError: session doesn't exists (anymore ?) """ if not got_result: try: timer, session_data, profile = self._sessions[session_id] except ValueError: raise exceptions.InternalError( "was expecting timer, session_data and profile; is profile set ?" ) # next_defer must be called before deferreds, # else its callback will be called by _gotResult next_defer = session_data[KEY_NEXT] if not next_defer.called: next_defer.errback(failure.Failure(defer.CancelledError(reason))) deferreds = session_data[KEY_DEFERREDS] for d in deferreds: d.cancel() if not no_warning: log.warning( "RTDeferredList cancelled: {} (profile {})".format(reason, profile) ) super(RTDeferredSessions, self)._purgeSession(session_id) def _gotResult(self, session_id, profile): """Method called after each callback or errback manage the next_defer deferred """ session_data = self.profileGet(session_id, profile) defer_next = session_data[KEY_NEXT] if not defer_next.called: defer_next.callback(None) def _callback(self, result, deferred, session_id, profile): deferred._RTDeferred_return = (True, result) self._gotResult(session_id, profile) def _errback(self, failure, deferred, session_id, profile): deferred._RTDeferred_return = (False, failure) self._gotResult(session_id, profile) def cancel(self, session_id, reason="timeout", no_log=False): """Stop this RTDeferredList Cancel all remaining deferred, and call self.final_defer.errback @param reason (unicode): reason of the cancellation @param no_log(bool): if True, don't log the cancellation """ self._purgeSession(session_id, reason=reason, no_warning=no_log) def getResults( self, session_id, on_success=None, on_error=None, profile=C.PROF_KEY_NONE ): """Get current results of a real-time deferred session result already gotten are deleted @param session_id(str): session id @param on_success: can be: - None: add success normaly to results - callable: replace result by the return value of on_success(result) (may be deferred) @param on_error: can be: - None: add error normaly to results - C.IGNORE: don't put errors in results - callable: replace failure by the return value of on_error(failure) (may be deferred) @param profile=%(doc_profile)s @param result(tuple): tuple(remaining, results) where: - remaining[int] is the number of remaining deferred (deferreds from which we don't have result yet) - results is a dict where: - key is the index of the deferred if deferred is a list, or its key if it's a dict - value = (success, result) where: - success is True if the deferred was successful - result is the result in case of success, else the failure If remaining == 0, the session is ended @raise KeyError: the session is already finished or doesn't exists at all """ if profile == C.PROF_KEY_NONE: raise exceptions.ProfileNotSetError session_data = self.profileGet(session_id, profile) @defer.inlineCallbacks def next_cb(__): # we got one or several results results = {} filtered_data = [] # used to keep deferreds without results deferreds = session_data[KEY_DEFERREDS] for d in deferreds: if ( d._RTDeferred_return ): # we don't use d.called as called is True before the full callbacks chain has been called # we have a result idx = d._RTDeferred_index success, result = d._RTDeferred_return if success: if on_success is not None: if callable(on_success): result = yield on_success(result) else: raise exceptions.InternalError( "Unknown value of on_success: {}".format(on_success) ) else: if on_error is not None: if on_error == C.IGNORE: continue elif callable(on_error): result = yield on_error(result) else: raise exceptions.InternalError( "Unknown value of on_error: {}".format(on_error) ) results[idx] = (success, result) else: filtered_data.append(d) # we change the deferred with the filtered list # in other terms, we don't want anymore deferred from which we have got the result session_data[KEY_DEFERREDS] = filtered_data if filtered_data: # we create a new next_defer only if we are still waiting for results session_data[KEY_NEXT] = defer.Deferred() else: # no more data to get, the result have been gotten, # we can cleanly finish the session self._purgeSession(session_id, got_result=True) defer.returnValue((len(filtered_data), results)) # we wait for a result return session_data[KEY_NEXT].addCallback(next_cb)