changeset 4230:314d3c02bb67

core (xmpp): Add a timeout for messages processing to avoid blocking the queue.
author Goffi <goffi@goffi.org>
date Sat, 06 Apr 2024 12:21:04 +0200 (9 months ago)
parents dd9bc7d791d7
children e11b13418ba6
files libervia/backend/core/xmpp.py
diffstat 1 files changed, 27 insertions(+), 4 deletions(-) [+]
line wrap: on
line diff
--- a/libervia/backend/core/xmpp.py	Sat Apr 06 12:14:10 2024 +0200
+++ b/libervia/backend/core/xmpp.py	Sat Apr 06 12:21:04 2024 +0200
@@ -16,6 +16,7 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
+import asyncio
 import calendar
 import copy
 from functools import partial
@@ -28,7 +29,7 @@
 import uuid
 
 import shortuuid
-from twisted.internet import defer, error as internet_error
+from twisted.internet import defer, error as internet_error, reactor
 from twisted.internet import ssl
 from twisted.python import failure
 from twisted.words.protocols.jabber import xmlstream
@@ -1345,6 +1346,17 @@
             except Exception:
                 log.exception(f"Can't process message {message_elt.toXml()}")
 
+    def _on_processing_timeout(
+        self,
+        message_elt: domish.Element,
+        async_point_d: defer.Deferred
+    ) -> None:
+        log.error(
+            "Processing of following message took too long, cancelling:"
+            f"{message_elt.toXml()}"
+        )
+        async_point_d.cancel()
+
     async def process_message(
         self,
         client: SatXMPPEntity,
@@ -1360,10 +1372,21 @@
 
         # plugin can add their treatments to this deferred
         post_treat = defer.Deferred()
-        if not await self.host.trigger.async_point(
+        async_point_d = defer.ensureDeferred(self.host.trigger.async_point(
             "message_received", client, message_elt, post_treat
-        ):
-            return
+        ))
+        # message_received triggers block the messages queue, so they must not take too
+        # long to proceed.
+        delayed_call = reactor.callLater(
+            10,
+            self._on_processing_timeout,
+            message_elt,
+            async_point_d
+        )
+        await async_point_d
+        if delayed_call.active():
+            delayed_call.cancel()
+            log.debug(f"delayed_call for {async_point_d} cancelled")
         try:
             data = self.parse_message(message_elt)
             # we now do all post treatments added by plugins