diff libervia/backend/tools/sat_defer.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/tools/sat_defer.py@524856bd7b19
children 0d7bb4df2343
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/tools/sat_defer.py	Fri Jun 02 11:49:51 2023 +0200
@@ -0,0 +1,269 @@
+#!/usr/bin/env python3
+
+
+# Libervia: an XMPP client
+# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+"""tools related to deferred"""
+
+from libervia.backend.core.log import getLogger
+
+log = getLogger(__name__)
+from libervia.backend.core import exceptions
+from twisted.internet import defer
+from twisted.internet import error as internet_error
+from twisted.internet import reactor
+from twisted.words.protocols.jabber import error as jabber_error
+from twisted.python import failure
+from libervia.backend.core.constants import Const as C
+from libervia.backend.memory import memory
+
+KEY_DEFERREDS = "deferreds"
+KEY_NEXT = "next_defer"
+
+
+def stanza_2_not_found(failure_):
+    """Convert item-not-found StanzaError to exceptions.NotFound"""
+    failure_.trap(jabber_error.StanzaError)
+    if failure_.value.condition == 'item-not-found':
+        raise exceptions.NotFound(failure_.value.text or failure_.value.condition)
+    return failure_
+
+
+class DelayedDeferred(object):
+    """A Deferred-like which is launched after a delay"""
+
+    def __init__(self, delay, result):
+        """
+        @param delay(float): delay before launching the callback, in seconds
+        @param result: result used with the callback
+        """
+        self._deferred = defer.Deferred()
+        self._timer = reactor.callLater(delay, self._deferred.callback, result)
+
+    def cancel(self):
+        try:
+            self._timer.cancel()
+        except internet_error.AlreadyCalled:
+            pass
+        self._deferred.cancel()
+
+    def addCallbacks(self, *args, **kwargs):
+        self._deferred.addCallbacks(*args, **kwargs)
+
+    def addCallback(self, *args, **kwargs):
+        self._deferred.addCallback(*args, **kwargs)
+
+    def addErrback(self, *args, **kwargs):
+        self._deferred.addErrback(*args, **kwargs)
+
+    def addBoth(self, *args, **kwargs):
+        self._deferred.addBoth(*args, **kwargs)
+
+    def chainDeferred(self, *args, **kwargs):
+        self._deferred.chainDeferred(*args, **kwargs)
+
+    def pause(self):
+        self._deferred.pause()
+
+    def unpause(self):
+        self._deferred.unpause()
+
+
+class RTDeferredSessions(memory.Sessions):
+    """Real Time Deferred Sessions"""
+
+    def __init__(self, timeout=120):
+        """Manage list of Deferreds in real-time, allowing to get intermediate results
+
+        @param timeout (int): nb of seconds before deferreds cancellation
+        """
+        super(RTDeferredSessions, self).__init__(
+            timeout=timeout, resettable_timeout=False
+        )
+
+    def new_session(self, deferreds, profile):
+        """Launch a new session with a list of deferreds
+
+        @param deferreds(list[defer.Deferred]): list of deferred to call
+        @param profile: %(doc_profile)s
+        @param return (tupe[str, defer.Deferred]): tuple with session id and a deferred wich fire *WITHOUT RESULT* when all results are received
+        """
+        data = {KEY_NEXT: defer.Deferred()}
+        session_id, session_data = super(RTDeferredSessions, self).new_session(
+            data, profile=profile
+        )
+        if isinstance(deferreds, dict):
+            session_data[KEY_DEFERREDS] = list(deferreds.values())
+            iterator = iter(deferreds.items())
+        else:
+            session_data[KEY_DEFERREDS] = deferreds
+            iterator = enumerate(deferreds)
+
+        for idx, d in iterator:
+            d._RTDeferred_index = idx
+            d._RTDeferred_return = None
+            d.addCallback(self._callback, d, session_id, profile)
+            d.addErrback(self._errback, d, session_id, profile)
+        return session_id
+
+    def _purge_session(
+        self, session_id, reason="timeout", no_warning=False, got_result=False
+    ):
+        """Purge the session
+
+        @param session_id(str): id of the session to purge
+        @param reason (unicode): human readable reason why the session is purged
+        @param no_warning(bool): if True, no warning will be put in logs
+        @param got_result(bool): True if the session is purged after normal ending (i.e.: all the results have been gotten).
+            reason and no_warning are ignored if got_result is True.
+        @raise KeyError: session doesn't exists (anymore ?)
+        """
+        if not got_result:
+            try:
+                timer, session_data, profile = self._sessions[session_id]
+            except ValueError:
+                raise exceptions.InternalError(
+                    "was expecting timer, session_data and profile; is profile set ?"
+                )
+
+            # next_defer must be called before deferreds,
+            # else its callback will be called by _gotResult
+            next_defer = session_data[KEY_NEXT]
+            if not next_defer.called:
+                next_defer.errback(failure.Failure(defer.CancelledError(reason)))
+
+            deferreds = session_data[KEY_DEFERREDS]
+            for d in deferreds:
+                d.cancel()
+
+            if not no_warning:
+                log.warning(
+                    "RTDeferredList cancelled: {} (profile {})".format(reason, profile)
+                )
+
+        super(RTDeferredSessions, self)._purge_session(session_id)
+
+    def _gotResult(self, session_id, profile):
+        """Method called after each callback or errback
+
+        manage the next_defer deferred
+        """
+        session_data = self.profile_get(session_id, profile)
+        defer_next = session_data[KEY_NEXT]
+        if not defer_next.called:
+            defer_next.callback(None)
+
+    def _callback(self, result, deferred, session_id, profile):
+        deferred._RTDeferred_return = (True, result)
+        self._gotResult(session_id, profile)
+
+    def _errback(self, failure, deferred, session_id, profile):
+        deferred._RTDeferred_return = (False, failure)
+        self._gotResult(session_id, profile)
+
+    def cancel(self, session_id, reason="timeout", no_log=False):
+        """Stop this RTDeferredList
+
+        Cancel all remaining deferred, and call self.final_defer.errback
+        @param reason (unicode): reason of the cancellation
+        @param no_log(bool): if True, don't log the cancellation
+        """
+        self._purge_session(session_id, reason=reason, no_warning=no_log)
+
+    def get_results(
+        self, session_id, on_success=None, on_error=None, profile=C.PROF_KEY_NONE
+    ):
+        """Get current results of a real-time deferred session
+
+        result already gotten are deleted
+        @param session_id(str): session id
+        @param on_success: can be:
+            - None: add success normaly to results
+            - callable: replace result by the return value of on_success(result) (may be deferred)
+        @param on_error: can be:
+            - None: add error normaly to results
+            - C.IGNORE: don't put errors in results
+            - callable: replace failure by the return value of on_error(failure) (may be deferred)
+        @param profile=%(doc_profile)s
+        @param result(tuple): tuple(remaining, results) where:
+            - remaining[int] is the number of remaining deferred
+                (deferreds from which we don't have result yet)
+            - results is a dict where:
+                - key is the index of the deferred if deferred is a list, or its key if it's a dict
+                - value = (success, result) where:
+                    - success is True if the deferred was successful
+                    - result is the result in case of success, else the failure
+            If remaining == 0, the session is ended
+        @raise KeyError: the session is already finished or doesn't exists at all
+        """
+        if profile == C.PROF_KEY_NONE:
+            raise exceptions.ProfileNotSetError
+        session_data = self.profile_get(session_id, profile)
+
+        @defer.inlineCallbacks
+        def next_cb(__):
+            # we got one or several results
+            results = {}
+            filtered_data = []  # used to keep deferreds without results
+            deferreds = session_data[KEY_DEFERREDS]
+
+            for d in deferreds:
+                if (
+                    d._RTDeferred_return
+                ):  # we don't use d.called as called is True before the full callbacks chain has been called
+                    # we have a result
+                    idx = d._RTDeferred_index
+                    success, result = d._RTDeferred_return
+                    if success:
+                        if on_success is not None:
+                            if callable(on_success):
+                                result = yield on_success(result)
+                            else:
+                                raise exceptions.InternalError(
+                                    "Unknown value of on_success: {}".format(on_success)
+                                )
+
+                    else:
+                        if on_error is not None:
+                            if on_error == C.IGNORE:
+                                continue
+                            elif callable(on_error):
+                                result = yield on_error(result)
+                            else:
+                                raise exceptions.InternalError(
+                                    "Unknown value of on_error: {}".format(on_error)
+                                )
+                    results[idx] = (success, result)
+                else:
+                    filtered_data.append(d)
+
+            # we change the deferred with the filtered list
+            # in other terms, we don't want anymore deferred from which we have got the result
+            session_data[KEY_DEFERREDS] = filtered_data
+
+            if filtered_data:
+                # we create a new next_defer only if we are still waiting for results
+                session_data[KEY_NEXT] = defer.Deferred()
+            else:
+                # no more data to get, the result have been gotten,
+                # we can cleanly finish the session
+                self._purge_session(session_id, got_result=True)
+
+            defer.returnValue((len(filtered_data), results))
+
+        # we wait for a result
+        return session_data[KEY_NEXT].addCallback(next_cb)