Mercurial > libervia-backend
comparison libervia/backend/core/xmpp.py @ 4151:18026ce0819c
core (xmpp): message reception workflow refactoring:
- Call methods from a root async one instead of using Deferred callbacks chain.
- Use a queue to be sure to process messages in order.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 22 Nov 2023 14:50:35 +0100 |
parents | bc7d45dedeb0 |
children | d67eaa684484 |
comparison (legend of line states used in the table below):
equal
deleted
inserted
replaced
4150:26534d959d2d | 4151:18026ce0819c |
---|---|
47 from libervia.backend.core.i18n import _ | 47 from libervia.backend.core.i18n import _ |
48 from libervia.backend.core.log import getLogger | 48 from libervia.backend.core.log import getLogger |
49 from libervia.backend.memory import cache | 49 from libervia.backend.memory import cache |
50 from libervia.backend.memory import encryption | 50 from libervia.backend.memory import encryption |
51 from libervia.backend.memory import persistent | 51 from libervia.backend.memory import persistent |
52 from libervia.backend.models.core import MessageData | |
52 from libervia.backend.tools import xml_tools | 53 from libervia.backend.tools import xml_tools |
53 from libervia.backend.tools import utils | 54 from libervia.backend.tools import utils |
54 from libervia.backend.tools.common import data_format | 55 from libervia.backend.tools.common import data_format |
55 | 56 |
56 log = getLogger(__name__) | 57 log = getLogger(__name__) |
1222 class SatMessageProtocol(xmppim.MessageProtocol): | 1223 class SatMessageProtocol(xmppim.MessageProtocol): |
1223 | 1224 |
1224 def __init__(self, host): | 1225 def __init__(self, host): |
1225 xmppim.MessageProtocol.__init__(self) | 1226 xmppim.MessageProtocol.__init__(self) |
1226 self.host = host | 1227 self.host = host |
1228 self.messages_queue = defer.DeferredQueue() | |
1229 | |
1230 def setHandlerParent(self, parent): | |
1231 super().setHandlerParent(parent) | |
1232 defer.ensureDeferred(self.process_messages()) | |
1227 | 1233 |
1228 @property | 1234 @property |
1229 def client(self): | 1235 def client(self): |
1230 return self.parent | 1236 return self.parent |
1231 | 1237 |
1233 if elt.uri == namespace: | 1239 if elt.uri == namespace: |
1234 elt.defaultUri = elt.uri = C.NS_CLIENT | 1240 elt.defaultUri = elt.uri = C.NS_CLIENT |
1235 for child in elt.elements(): | 1241 for child in elt.elements(): |
1236 self.normalize_ns(child, namespace) | 1242 self.normalize_ns(child, namespace) |
1237 | 1243 |
1238 def parse_message(self, message_elt): | 1244 def parse_message(self, message_elt: domish.Element) -> MessageData: |
1239 """Parse a message XML and return message_data | 1245 """Parse a message XML and return message_data |
1240 | 1246 |
1241 @param message_elt(domish.Element): raw <message> xml | 1247 @param message_elt(domish.Element): raw <message> xml |
1242 @param client(SatXMPPClient, None): client to map message id to uid | 1248 @param client(SatXMPPClient, None): client to map message id to uid |
1243 if None, mapping will not be done | 1249 if None, mapping will not be done |
1263 message_elt['to'] = client.jid.full() | 1269 message_elt['to'] = client.jid.full() |
1264 | 1270 |
1265 message = {} | 1271 message = {} |
1266 subject = {} | 1272 subject = {} |
1267 extra = {} | 1273 extra = {} |
1268 data = { | 1274 data: MessageData = { |
1269 "from": jid.JID(message_elt["from"]), | 1275 "from": jid.JID(message_elt["from"]), |
1270 "to": jid.JID(message_elt["to"]), | 1276 "to": jid.JID(message_elt["to"]), |
1271 "uid": message_elt.getAttribute( | 1277 "uid": message_elt.getAttribute( |
1272 "uid", str(uuid.uuid4()) | 1278 "uid", str(uuid.uuid4()) |
1273 ), # XXX: uid is not a standard attribute but may be added by plugins | 1279 ), # XXX: uid is not a standard attribute but may be added by plugins |
1314 data["delay_sender"] = parsed_delay.sender.full() | 1320 data["delay_sender"] = parsed_delay.sender.full() |
1315 | 1321 |
1316 self.host.trigger.point("message_parse", client, message_elt, data) | 1322 self.host.trigger.point("message_parse", client, message_elt, data) |
1317 return data | 1323 return data |
1318 | 1324 |
1319 def _on_message_start_workflow(self, cont, client, message_elt, post_treat): | 1325 |
1320 """Parse message and do post treatments | 1326 def onMessage(self, message_elt: domish.Element) -> None: |
1321 | 1327 message_elt._received_timestamp = time.time() |
1322 It is the first callback called after message_received trigger | 1328 self.messages_queue.put(message_elt) |
1323 @param cont(bool): workflow will continue only if this is True | 1329 |
1324 @param message_elt(domish.Element): message stanza | 1330 async def process_messages(self) -> None: |
1325 may have be modified by triggers | 1331 """Process message in order |
1326 @param post_treat(defer.Deferred): post parsing treatments | 1332 |
1327 """ | 1333 Messages are processed in a queue to avoid race conditions and ensure orderly |
1328 if not cont: | 1334 processing. |
1329 return | 1335 """ |
1330 data = self.parse_message(message_elt) | 1336 client = self.parent |
1331 post_treat.addCallback(self.complete_attachments) | 1337 if client is None: |
1332 post_treat.addCallback(self.skip_empty_message) | 1338 log.error("client should not be None!") |
1333 if not client.is_component or client.receiveHistory: | 1339 raise exceptions.InternalError() |
1334 post_treat.addCallback( | 1340 while True: |
1335 lambda ret: defer.ensureDeferred(self.add_to_history(ret)) | 1341 message_elt = await self.messages_queue.get() |
1336 ) | 1342 try: |
1337 if not client.is_component: | 1343 await self.process_message(client, message_elt) |
1338 post_treat.addCallback(self.bridge_signal, data) | 1344 except Exception: |
1339 post_treat.addErrback(self.cancel_error_trap) | 1345 log.exception(f"Can't process message {message_elt.toXml()}") |
1340 post_treat.callback(data) | 1346 |
1341 | 1347 async def process_message( |
1342 def onMessage(self, message_elt): | 1348 self, |
1349 client: SatXMPPEntity, | |
1350 message_elt: domish.Element | |
1351 ) -> None: | |
1343 # TODO: handle threads | 1352 # TODO: handle threads |
1344 message_elt._received_timestamp = time.time() | |
1345 client = self.parent | |
1346 if not "from" in message_elt.attributes: | 1353 if not "from" in message_elt.attributes: |
1347 message_elt["from"] = client.jid.host | 1354 message_elt["from"] = client.jid.host |
1348 log.debug(_("got message from: {from_}").format(from_=message_elt["from"])) | 1355 log.debug(_("got message from: {from_}").format(from_=message_elt["from"])) |
1349 if self.client.is_component and message_elt.uri == component.NS_COMPONENT_ACCEPT: | 1356 if self.client.is_component and message_elt.uri == component.NS_COMPONENT_ACCEPT: |
1350 # we use client namespace all the time to simplify parsing | 1357 # we use client namespace all the time to simplify parsing |
1351 self.normalize_ns(message_elt, component.NS_COMPONENT_ACCEPT) | 1358 self.normalize_ns(message_elt, component.NS_COMPONENT_ACCEPT) |
1352 | 1359 |
1353 # plugin can add their treatments to this deferred | 1360 # plugin can add their treatments to this deferred |
1354 post_treat = defer.Deferred() | 1361 post_treat = defer.Deferred() |
1355 | 1362 if not await self.host.trigger.async_point( |
1356 d = self.host.trigger.async_point( | |
1357 "message_received", client, message_elt, post_treat | 1363 "message_received", client, message_elt, post_treat |
1358 ) | 1364 ): |
1359 | 1365 return |
1360 d.addCallback(self._on_message_start_workflow, client, message_elt, post_treat) | 1366 try: |
1361 | 1367 data = self.parse_message(message_elt) |
1362 def complete_attachments(self, data): | 1368 |
1369 self.complete_attachments(data) | |
1370 if not data["message"] and not data["extra"] and not data["subject"]: | |
1371 raise exceptions.CancelError("Cancelled empty message") | |
1372 if not client.is_component or client.receiveHistory: | |
1373 await self.add_to_history(data) | |
1374 if not client.is_component: | |
1375 self.bridge_signal(data) | |
1376 except exceptions.CancelError: | |
1377 pass | |
1378 | |
1379 def complete_attachments(self, data: MessageData) -> MessageData: | |
1363 """Complete missing metadata of attachments""" | 1380 """Complete missing metadata of attachments""" |
1364 for attachment in data['extra'].get(C.KEY_ATTACHMENTS, []): | 1381 for attachment in data['extra'].get(C.KEY_ATTACHMENTS, []): |
1365 if "name" not in attachment and "url" in attachment: | 1382 if "name" not in attachment and "url" in attachment: |
1366 name = (Path(unquote(urlparse(attachment['url']).path)).name | 1383 name = (Path(unquote(urlparse(attachment['url']).path)).name |
1367 or C.FILE_DEFAULT_NAME) | 1384 or C.FILE_DEFAULT_NAME) |
1369 if ((C.KEY_ATTACHMENTS_MEDIA_TYPE not in attachment | 1386 if ((C.KEY_ATTACHMENTS_MEDIA_TYPE not in attachment |
1370 and "name" in attachment)): | 1387 and "name" in attachment)): |
1371 media_type = mimetypes.guess_type(attachment['name'], strict=False)[0] | 1388 media_type = mimetypes.guess_type(attachment['name'], strict=False)[0] |
1372 if media_type: | 1389 if media_type: |
1373 attachment[C.KEY_ATTACHMENTS_MEDIA_TYPE] = media_type | 1390 attachment[C.KEY_ATTACHMENTS_MEDIA_TYPE] = media_type |
1374 | |
1375 return data | 1391 return data |
1376 | 1392 |
1377 def skip_empty_message(self, data): | 1393 async def add_to_history(self, data: MessageData) -> MessageData: |
1378 if not data["message"] and not data["extra"] and not data["subject"]: | |
1379 raise failure.Failure(exceptions.CancelError("Cancelled empty message")) | |
1380 return data | |
1381 | |
1382 async def add_to_history(self, data): | |
1383 if data.pop("history", None) == C.HISTORY_SKIP: | 1394 if data.pop("history", None) == C.HISTORY_SKIP: |
1384 log.debug("history is skipped as requested") | 1395 log.debug("history is skipped as requested") |
1385 data["extra"]["history"] = C.HISTORY_SKIP | 1396 data["extra"]["history"] = C.HISTORY_SKIP |
1386 else: | 1397 else: |
1387 # we need a message to store | 1398 # we need a message to store |
1388 if self.parent.is_message_printable(data): | 1399 if self.parent.is_message_printable(data): |
1389 return await self.host.memory.add_to_history(self.parent, data) | 1400 return await self.host.memory.add_to_history(self.parent, data) |
1390 else: | 1401 else: |
1391 log.debug("not storing empty message to history: {data}" | 1402 log.debug("not storing empty message to history: {data}" |
1392 .format(data=data)) | 1403 .format(data=data)) |
1393 | 1404 return data |
1394 def bridge_signal(self, __, data): | 1405 |
1406 def bridge_signal(self, data: MessageData) -> MessageData: | |
1407 """Send signal to frontends for the given message""" | |
1395 try: | 1408 try: |
1396 data["extra"]["received_timestamp"] = str(data["received_timestamp"]) | 1409 data["extra"]["received_timestamp"] = str(data["received_timestamp"]) |
1397 data["extra"]["delay_sender"] = data["delay_sender"] | 1410 data["extra"]["delay_sender"] = data["delay_sender"] |
1398 except KeyError: | 1411 except KeyError: |
1399 pass | 1412 pass |
1414 ) | 1427 ) |
1415 else: | 1428 else: |
1416 log.debug("Discarding bridge signal for empty message: {data}".format( | 1429 log.debug("Discarding bridge signal for empty message: {data}".format( |
1417 data=data)) | 1430 data=data)) |
1418 return data | 1431 return data |
1419 | |
1420 def cancel_error_trap(self, failure_): | |
1421 """A message sending can be cancelled by a plugin treatment""" | |
1422 failure_.trap(exceptions.CancelError) | |
1423 | 1432 |
1424 | 1433 |
1425 class LiberviaRosterProtocol(xmppim.RosterClientProtocol): | 1434 class LiberviaRosterProtocol(xmppim.RosterClientProtocol): |
1426 | 1435 |
1427 def __init__(self, host): | 1436 def __init__(self, host): |