comparison libervia/backend/core/xmpp.py @ 4230:314d3c02bb67

core (xmpp): Add a timeout for messages processing to avoid blocking the queue.
author Goffi <goffi@goffi.org>
date Sat, 06 Apr 2024 12:21:04 +0200
parents 4dc00e848961
children 2417ad1d0f23
comparison
equal deleted inserted replaced
4229:dd9bc7d791d7 4230:314d3c02bb67
14 # GNU Affero General Public License for more details. 14 # GNU Affero General Public License for more details.
15 15
16 # You should have received a copy of the GNU Affero General Public License 16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. 17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 18
19 import asyncio
19 import calendar 20 import calendar
20 import copy 21 import copy
21 from functools import partial 22 from functools import partial
22 import mimetypes 23 import mimetypes
23 from pathlib import Path 24 from pathlib import Path
26 from typing import Callable, Dict, Tuple, Optional 27 from typing import Callable, Dict, Tuple, Optional
27 from urllib.parse import unquote, urlparse 28 from urllib.parse import unquote, urlparse
28 import uuid 29 import uuid
29 30
30 import shortuuid 31 import shortuuid
31 from twisted.internet import defer, error as internet_error 32 from twisted.internet import defer, error as internet_error, reactor
32 from twisted.internet import ssl 33 from twisted.internet import ssl
33 from twisted.python import failure 34 from twisted.python import failure
34 from twisted.words.protocols.jabber import xmlstream 35 from twisted.words.protocols.jabber import xmlstream
35 from twisted.words.protocols.jabber import error 36 from twisted.words.protocols.jabber import error
36 from twisted.words.protocols.jabber import jid 37 from twisted.words.protocols.jabber import jid
1343 try: 1344 try:
1344 await self.process_message(client, message_elt) 1345 await self.process_message(client, message_elt)
1345 except Exception: 1346 except Exception:
1346 log.exception(f"Can't process message {message_elt.toXml()}") 1347 log.exception(f"Can't process message {message_elt.toXml()}")
1347 1348
1349 def _on_processing_timeout(
1350 self,
1351 message_elt: domish.Element,
1352 async_point_d: defer.Deferred
1353 ) -> None:
1354 log.error(
1355 "Processing of following message took too long, cancelling:"
1356 f"{message_elt.toXml()}"
1357 )
1358 async_point_d.cancel()
1359
1348 async def process_message( 1360 async def process_message(
1349 self, 1361 self,
1350 client: SatXMPPEntity, 1362 client: SatXMPPEntity,
1351 message_elt: domish.Element 1363 message_elt: domish.Element
1352 ) -> None: 1364 ) -> None:
1358 # we use client namespace all the time to simplify parsing 1370 # we use client namespace all the time to simplify parsing
1359 self.normalize_ns(message_elt, component.NS_COMPONENT_ACCEPT) 1371 self.normalize_ns(message_elt, component.NS_COMPONENT_ACCEPT)
1360 1372
1361 # plugin can add their treatments to this deferred 1373 # plugin can add their treatments to this deferred
1362 post_treat = defer.Deferred() 1374 post_treat = defer.Deferred()
1363 if not await self.host.trigger.async_point( 1375 async_point_d = defer.ensureDeferred(self.host.trigger.async_point(
1364 "message_received", client, message_elt, post_treat 1376 "message_received", client, message_elt, post_treat
1365 ): 1377 ))
1366 return 1378 # message_received triggers block the messages queue, so they must not take too
1379 # long to proceed.
1380 delayed_call = reactor.callLater(
1381 10,
1382 self._on_processing_timeout,
1383 message_elt,
1384 async_point_d
1385 )
1386 await async_point_d
1387 if delayed_call.active():
1388 delayed_call.cancel()
1389 log.debug(f"delayed_call for {async_point_d} cancelled")
1367 try: 1390 try:
1368 data = self.parse_message(message_elt) 1391 data = self.parse_message(message_elt)
1369 # we now do all post treatments added by plugins 1392 # we now do all post treatments added by plugins
1370 post_treat.callback(data) 1393 post_treat.callback(data)
1371 await post_treat 1394 await post_treat