Mercurial > libervia-backend
comparison libervia/backend/core/xmpp.py @ 4230:314d3c02bb67
core (xmpp): Add a timeout for message processing to avoid blocking the queue.
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 06 Apr 2024 12:21:04 +0200 |
parents | 4dc00e848961 |
children | 2417ad1d0f23 |
comparison
equal
deleted
inserted
replaced
4229:dd9bc7d791d7 | 4230:314d3c02bb67 |
---|---|
14 # GNU Affero General Public License for more details. | 14 # GNU Affero General Public License for more details. |
15 | 15 |
16 # You should have received a copy of the GNU Affero General Public License | 16 # You should have received a copy of the GNU Affero General Public License |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | 17 # along with this program. If not, see <http://www.gnu.org/licenses/>. |
18 | 18 |
19 import asyncio | |
19 import calendar | 20 import calendar |
20 import copy | 21 import copy |
21 from functools import partial | 22 from functools import partial |
22 import mimetypes | 23 import mimetypes |
23 from pathlib import Path | 24 from pathlib import Path |
26 from typing import Callable, Dict, Tuple, Optional | 27 from typing import Callable, Dict, Tuple, Optional |
27 from urllib.parse import unquote, urlparse | 28 from urllib.parse import unquote, urlparse |
28 import uuid | 29 import uuid |
29 | 30 |
30 import shortuuid | 31 import shortuuid |
31 from twisted.internet import defer, error as internet_error | 32 from twisted.internet import defer, error as internet_error, reactor |
32 from twisted.internet import ssl | 33 from twisted.internet import ssl |
33 from twisted.python import failure | 34 from twisted.python import failure |
34 from twisted.words.protocols.jabber import xmlstream | 35 from twisted.words.protocols.jabber import xmlstream |
35 from twisted.words.protocols.jabber import error | 36 from twisted.words.protocols.jabber import error |
36 from twisted.words.protocols.jabber import jid | 37 from twisted.words.protocols.jabber import jid |
1343 try: | 1344 try: |
1344 await self.process_message(client, message_elt) | 1345 await self.process_message(client, message_elt) |
1345 except Exception: | 1346 except Exception: |
1346 log.exception(f"Can't process message {message_elt.toXml()}") | 1347 log.exception(f"Can't process message {message_elt.toXml()}") |
1347 | 1348 |
1349 def _on_processing_timeout( | |
1350 self, | |
1351 message_elt: domish.Element, | |
1352 async_point_d: defer.Deferred | |
1353 ) -> None: | |
1354 log.error( | |
1355 "Processing of following message took too long, cancelling:" | |
1356 f"{message_elt.toXml()}" | |
1357 ) | |
1358 async_point_d.cancel() | |
1359 | |
1348 async def process_message( | 1360 async def process_message( |
1349 self, | 1361 self, |
1350 client: SatXMPPEntity, | 1362 client: SatXMPPEntity, |
1351 message_elt: domish.Element | 1363 message_elt: domish.Element |
1352 ) -> None: | 1364 ) -> None: |
1358 # we use client namespace all the time to simplify parsing | 1370 # we use client namespace all the time to simplify parsing |
1359 self.normalize_ns(message_elt, component.NS_COMPONENT_ACCEPT) | 1371 self.normalize_ns(message_elt, component.NS_COMPONENT_ACCEPT) |
1360 | 1372 |
1361 # plugin can add their treatments to this deferred | 1373 # plugin can add their treatments to this deferred |
1362 post_treat = defer.Deferred() | 1374 post_treat = defer.Deferred() |
1363 if not await self.host.trigger.async_point( | 1375 async_point_d = defer.ensureDeferred(self.host.trigger.async_point( |
1364 "message_received", client, message_elt, post_treat | 1376 "message_received", client, message_elt, post_treat |
1365 ): | 1377 )) |
1366 return | 1378 # message_received triggers block the messages queue, so they must not take too |
1379 # long to complete. | |
1380 delayed_call = reactor.callLater( | |
1381 10, | |
1382 self._on_processing_timeout, | |
1383 message_elt, | |
1384 async_point_d | |
1385 ) | |
1386 await async_point_d | |
1387 if delayed_call.active(): | |
1388 delayed_call.cancel() | |
1389 log.debug(f"delayed_call for {async_point_d} cancelled") | |
1367 try: | 1390 try: |
1368 data = self.parse_message(message_elt) | 1391 data = self.parse_message(message_elt) |
1369 # we now do all post treatments added by plugins | 1392 # we now do all post treatments added by plugins |
1370 post_treat.callback(data) | 1393 post_treat.callback(data) |
1371 await post_treat | 1394 await post_treat |