Mercurial > libervia-backend
changeset 4230:314d3c02bb67
core (xmpp): Add a timeout for messages processing to avoid blocking the queue.
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 06 Apr 2024 12:21:04 +0200 |
parents | dd9bc7d791d7 |
children | e11b13418ba6 |
files | libervia/backend/core/xmpp.py |
diffstat | 1 files changed, 27 insertions(+), 4 deletions(-) [+] |
line wrap: on
line diff
--- a/libervia/backend/core/xmpp.py Sat Apr 06 12:14:10 2024 +0200 +++ b/libervia/backend/core/xmpp.py Sat Apr 06 12:21:04 2024 +0200 @@ -16,6 +16,7 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +import asyncio import calendar import copy from functools import partial @@ -28,7 +29,7 @@ import uuid import shortuuid -from twisted.internet import defer, error as internet_error +from twisted.internet import defer, error as internet_error, reactor from twisted.internet import ssl from twisted.python import failure from twisted.words.protocols.jabber import xmlstream @@ -1345,6 +1346,17 @@ except Exception: log.exception(f"Can't process message {message_elt.toXml()}") + def _on_processing_timeout( + self, + message_elt: domish.Element, + async_point_d: defer.Deferred + ) -> None: + log.error( + "Processing of following message took too long, cancelling:" + f"{message_elt.toXml()}" + ) + async_point_d.cancel() + async def process_message( self, client: SatXMPPEntity, @@ -1360,10 +1372,21 @@ # plugin can add their treatments to this deferred post_treat = defer.Deferred() - if not await self.host.trigger.async_point( + async_point_d = defer.ensureDeferred(self.host.trigger.async_point( "message_received", client, message_elt, post_treat - ): - return + )) + # message_received triggers block the messages queue, so they must not take too + # long to proceed. + delayed_call = reactor.callLater( + 10, + self._on_processing_timeout, + message_elt, + async_point_d + ) + await async_point_d + if delayed_call.active(): + delayed_call.cancel() + log.debug(f"delayed_call for {async_point_d} cancelled") try: data = self.parse_message(message_elt) # we now do all post treatments added by plugins