view sat/tools/sat_defer.py @ 3891:989df1047c3c

tests (AP gateway): reactions tests: 2 new tests check XMPP <=> AP reactions conversion rel 371
author Goffi <goffi@goffi.org>
date Wed, 31 Aug 2022 17:07:03 +0200
parents 7550ae9cfbac
children 524856bd7b19
line wrap: on
line source

#!/usr/bin/env python3


# Libervia: an XMPP client
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""tools related to deferred"""

from sat.core.log import getLogger

log = getLogger(__name__)
from sat.core import exceptions
from twisted.internet import defer
from twisted.internet import error as internet_error
from twisted.internet import reactor
from twisted.words.protocols.jabber import error as jabber_error
from twisted.python import failure
from sat.core.constants import Const as C
from sat.memory import memory

KEY_DEFERREDS = "deferreds"
KEY_NEXT = "next_defer"


def stanza2NotFound(failure_):
    """Convert item-not-found StanzaError to exceptions.NotFound"""
    failure_.trap(jabber_error.StanzaError)
    if failure_.value.condition == 'item-not-found':
        raise exceptions.NotFound(failure_.value.text or failure_.value.condition)
    return failure_


class DelayedDeferred(object):
    """A Deferred-like which is launched after a delay"""

    def __init__(self, delay, result):
        """
        @param delay(float): delay before launching the callback, in seconds
        @param result: result used with the callback
        """
        self._deferred = defer.Deferred()
        self._timer = reactor.callLater(delay, self._deferred.callback, result)

    def cancel(self):
        try:
            self._timer.cancel()
        except internet_error.AlreadyCalled:
            pass
        self._deferred.cancel()

    def addCallbacks(self, *args, **kwargs):
        self._deferred.addCallbacks(*args, **kwargs)

    def addCallback(self, *args, **kwargs):
        self._deferred.addCallback(*args, **kwargs)

    def addErrback(self, *args, **kwargs):
        self._deferred.addErrback(*args, **kwargs)

    def addBoth(self, *args, **kwargs):
        self._deferred.addBoth(*args, **kwargs)

    def chainDeferred(self, *args, **kwargs):
        self._deferred.chainDeferred(*args, **kwargs)

    def pause(self):
        self._deferred.pause()

    def unpause(self):
        self._deferred.unpause()


class RTDeferredSessions(memory.Sessions):
    """Real Time Deferred Sessions"""

    def __init__(self, timeout=120):
        """Manage list of Deferreds in real-time, allowing to get intermediate results

        @param timeout (int): nb of seconds before deferreds cancellation
        """
        super(RTDeferredSessions, self).__init__(
            timeout=timeout, resettable_timeout=False
        )

    def newSession(self, deferreds, profile):
        """Launch a new session with a list of deferreds

        @param deferreds(list[defer.Deferred]): list of deferred to call
        @param profile: %(doc_profile)s
        @param return (tupe[str, defer.Deferred]): tuple with session id and a deferred wich fire *WITHOUT RESULT* when all results are received
        """
        data = {KEY_NEXT: defer.Deferred()}
        session_id, session_data = super(RTDeferredSessions, self).newSession(
            data, profile=profile
        )
        if isinstance(deferreds, dict):
            session_data[KEY_DEFERREDS] = list(deferreds.values())
            iterator = iter(deferreds.items())
        else:
            session_data[KEY_DEFERREDS] = deferreds
            iterator = enumerate(deferreds)

        for idx, d in iterator:
            d._RTDeferred_index = idx
            d._RTDeferred_return = None
            d.addCallback(self._callback, d, session_id, profile)
            d.addErrback(self._errback, d, session_id, profile)
        return session_id

    def _purgeSession(
        self, session_id, reason="timeout", no_warning=False, got_result=False
    ):
        """Purge the session

        @param session_id(str): id of the session to purge
        @param reason (unicode): human readable reason why the session is purged
        @param no_warning(bool): if True, no warning will be put in logs
        @param got_result(bool): True if the session is purged after normal ending (i.e.: all the results have been gotten).
            reason and no_warning are ignored if got_result is True.
        @raise KeyError: session doesn't exists (anymore ?)
        """
        if not got_result:
            try:
                timer, session_data, profile = self._sessions[session_id]
            except ValueError:
                raise exceptions.InternalError(
                    "was expecting timer, session_data and profile; is profile set ?"
                )

            # next_defer must be called before deferreds,
            # else its callback will be called by _gotResult
            next_defer = session_data[KEY_NEXT]
            if not next_defer.called:
                next_defer.errback(failure.Failure(defer.CancelledError(reason)))

            deferreds = session_data[KEY_DEFERREDS]
            for d in deferreds:
                d.cancel()

            if not no_warning:
                log.warning(
                    "RTDeferredList cancelled: {} (profile {})".format(reason, profile)
                )

        super(RTDeferredSessions, self)._purgeSession(session_id)

    def _gotResult(self, session_id, profile):
        """Method called after each callback or errback

        manage the next_defer deferred
        """
        session_data = self.profileGet(session_id, profile)
        defer_next = session_data[KEY_NEXT]
        if not defer_next.called:
            defer_next.callback(None)

    def _callback(self, result, deferred, session_id, profile):
        deferred._RTDeferred_return = (True, result)
        self._gotResult(session_id, profile)

    def _errback(self, failure, deferred, session_id, profile):
        deferred._RTDeferred_return = (False, failure)
        self._gotResult(session_id, profile)

    def cancel(self, session_id, reason="timeout", no_log=False):
        """Stop this RTDeferredList

        Cancel all remaining deferred, and call self.final_defer.errback
        @param reason (unicode): reason of the cancellation
        @param no_log(bool): if True, don't log the cancellation
        """
        self._purgeSession(session_id, reason=reason, no_warning=no_log)

    def getResults(
        self, session_id, on_success=None, on_error=None, profile=C.PROF_KEY_NONE
    ):
        """Get current results of a real-time deferred session

        result already gotten are deleted
        @param session_id(str): session id
        @param on_success: can be:
            - None: add success normaly to results
            - callable: replace result by the return value of on_success(result) (may be deferred)
        @param on_error: can be:
            - None: add error normaly to results
            - C.IGNORE: don't put errors in results
            - callable: replace failure by the return value of on_error(failure) (may be deferred)
        @param profile=%(doc_profile)s
        @param result(tuple): tuple(remaining, results) where:
            - remaining[int] is the number of remaining deferred
                (deferreds from which we don't have result yet)
            - results is a dict where:
                - key is the index of the deferred if deferred is a list, or its key if it's a dict
                - value = (success, result) where:
                    - success is True if the deferred was successful
                    - result is the result in case of success, else the failure
            If remaining == 0, the session is ended
        @raise KeyError: the session is already finished or doesn't exists at all
        """
        if profile == C.PROF_KEY_NONE:
            raise exceptions.ProfileNotSetError
        session_data = self.profileGet(session_id, profile)

        @defer.inlineCallbacks
        def next_cb(__):
            # we got one or several results
            results = {}
            filtered_data = []  # used to keep deferreds without results
            deferreds = session_data[KEY_DEFERREDS]

            for d in deferreds:
                if (
                    d._RTDeferred_return
                ):  # we don't use d.called as called is True before the full callbacks chain has been called
                    # we have a result
                    idx = d._RTDeferred_index
                    success, result = d._RTDeferred_return
                    if success:
                        if on_success is not None:
                            if callable(on_success):
                                result = yield on_success(result)
                            else:
                                raise exceptions.InternalError(
                                    "Unknown value of on_success: {}".format(on_success)
                                )

                    else:
                        if on_error is not None:
                            if on_error == C.IGNORE:
                                continue
                            elif callable(on_error):
                                result = yield on_error(result)
                            else:
                                raise exceptions.InternalError(
                                    "Unknown value of on_error: {}".format(on_error)
                                )
                    results[idx] = (success, result)
                else:
                    filtered_data.append(d)

            # we change the deferred with the filtered list
            # in other terms, we don't want anymore deferred from which we have got the result
            session_data[KEY_DEFERREDS] = filtered_data

            if filtered_data:
                # we create a new next_defer only if we are still waiting for results
                session_data[KEY_NEXT] = defer.Deferred()
            else:
                # no more data to get, the result have been gotten,
                # we can cleanly finish the session
                self._purgeSession(session_id, got_result=True)

            defer.returnValue((len(filtered_data), results))

        # we wait for a result
        return session_data[KEY_NEXT].addCallback(next_cb)