comparison libervia/backend/core/xmpp.py @ 4151:18026ce0819c

core (xmpp): message reception workflow refactoring: - Call methods from a single root async method instead of using a chain of Deferred callbacks. - Use a queue to ensure that messages are processed in order.
author Goffi <goffi@goffi.org>
date Wed, 22 Nov 2023 14:50:35 +0100
parents bc7d45dedeb0
children d67eaa684484
comparison
equal deleted inserted replaced
4150:26534d959d2d 4151:18026ce0819c
47 from libervia.backend.core.i18n import _ 47 from libervia.backend.core.i18n import _
48 from libervia.backend.core.log import getLogger 48 from libervia.backend.core.log import getLogger
49 from libervia.backend.memory import cache 49 from libervia.backend.memory import cache
50 from libervia.backend.memory import encryption 50 from libervia.backend.memory import encryption
51 from libervia.backend.memory import persistent 51 from libervia.backend.memory import persistent
52 from libervia.backend.models.core import MessageData
52 from libervia.backend.tools import xml_tools 53 from libervia.backend.tools import xml_tools
53 from libervia.backend.tools import utils 54 from libervia.backend.tools import utils
54 from libervia.backend.tools.common import data_format 55 from libervia.backend.tools.common import data_format
55 56
56 log = getLogger(__name__) 57 log = getLogger(__name__)
1222 class SatMessageProtocol(xmppim.MessageProtocol): 1223 class SatMessageProtocol(xmppim.MessageProtocol):
1223 1224
1224 def __init__(self, host): 1225 def __init__(self, host):
1225 xmppim.MessageProtocol.__init__(self) 1226 xmppim.MessageProtocol.__init__(self)
1226 self.host = host 1227 self.host = host
1228 self.messages_queue = defer.DeferredQueue()
1229
1230 def setHandlerParent(self, parent):
1231 super().setHandlerParent(parent)
1232 defer.ensureDeferred(self.process_messages())
1227 1233
1228 @property 1234 @property
1229 def client(self): 1235 def client(self):
1230 return self.parent 1236 return self.parent
1231 1237
1233 if elt.uri == namespace: 1239 if elt.uri == namespace:
1234 elt.defaultUri = elt.uri = C.NS_CLIENT 1240 elt.defaultUri = elt.uri = C.NS_CLIENT
1235 for child in elt.elements(): 1241 for child in elt.elements():
1236 self.normalize_ns(child, namespace) 1242 self.normalize_ns(child, namespace)
1237 1243
1238 def parse_message(self, message_elt): 1244 def parse_message(self, message_elt: domish.Element) -> MessageData:
1239 """Parse a message XML and return message_data 1245 """Parse a message XML and return message_data
1240 1246
1241 @param message_elt(domish.Element): raw <message> xml 1247 @param message_elt(domish.Element): raw <message> xml
1242 @param client(SatXMPPClient, None): client to map message id to uid 1248 @param client(SatXMPPClient, None): client to map message id to uid
1243 if None, mapping will not be done 1249 if None, mapping will not be done
1263 message_elt['to'] = client.jid.full() 1269 message_elt['to'] = client.jid.full()
1264 1270
1265 message = {} 1271 message = {}
1266 subject = {} 1272 subject = {}
1267 extra = {} 1273 extra = {}
1268 data = { 1274 data: MessageData = {
1269 "from": jid.JID(message_elt["from"]), 1275 "from": jid.JID(message_elt["from"]),
1270 "to": jid.JID(message_elt["to"]), 1276 "to": jid.JID(message_elt["to"]),
1271 "uid": message_elt.getAttribute( 1277 "uid": message_elt.getAttribute(
1272 "uid", str(uuid.uuid4()) 1278 "uid", str(uuid.uuid4())
1273 ), # XXX: uid is not a standard attribute but may be added by plugins 1279 ), # XXX: uid is not a standard attribute but may be added by plugins
1314 data["delay_sender"] = parsed_delay.sender.full() 1320 data["delay_sender"] = parsed_delay.sender.full()
1315 1321
1316 self.host.trigger.point("message_parse", client, message_elt, data) 1322 self.host.trigger.point("message_parse", client, message_elt, data)
1317 return data 1323 return data
1318 1324
1319 def _on_message_start_workflow(self, cont, client, message_elt, post_treat): 1325
1320 """Parse message and do post treatments 1326 def onMessage(self, message_elt: domish.Element) -> None:
1321 1327 message_elt._received_timestamp = time.time()
1322 It is the first callback called after message_received trigger 1328 self.messages_queue.put(message_elt)
1323 @param cont(bool): workflow will continue only if this is True 1329
1324 @param message_elt(domish.Element): message stanza 1330 async def process_messages(self) -> None:
1325 may have been modified by triggers 1331 """Process messages in order
1326 @param post_treat(defer.Deferred): post parsing treatments 1332
1327 """ 1333 Messages are processed in a queue to avoid race conditions and ensure orderly
1328 if not cont: 1334 processing.
1329 return 1335 """
1330 data = self.parse_message(message_elt) 1336 client = self.parent
1331 post_treat.addCallback(self.complete_attachments) 1337 if client is None:
1332 post_treat.addCallback(self.skip_empty_message) 1338 log.error("client should not be None!")
1333 if not client.is_component or client.receiveHistory: 1339 raise exceptions.InternalError()
1334 post_treat.addCallback( 1340 while True:
1335 lambda ret: defer.ensureDeferred(self.add_to_history(ret)) 1341 message_elt = await self.messages_queue.get()
1336 ) 1342 try:
1337 if not client.is_component: 1343 await self.process_message(client, message_elt)
1338 post_treat.addCallback(self.bridge_signal, data) 1344 except Exception:
1339 post_treat.addErrback(self.cancel_error_trap) 1345 log.exception(f"Can't process message {message_elt.toXml()}")
1340 post_treat.callback(data) 1346
1341 1347 async def process_message(
1342 def onMessage(self, message_elt): 1348 self,
1349 client: SatXMPPEntity,
1350 message_elt: domish.Element
1351 ) -> None:
1343 # TODO: handle threads 1352 # TODO: handle threads
1344 message_elt._received_timestamp = time.time()
1345 client = self.parent
1346 if not "from" in message_elt.attributes: 1353 if not "from" in message_elt.attributes:
1347 message_elt["from"] = client.jid.host 1354 message_elt["from"] = client.jid.host
1348 log.debug(_("got message from: {from_}").format(from_=message_elt["from"])) 1355 log.debug(_("got message from: {from_}").format(from_=message_elt["from"]))
1349 if self.client.is_component and message_elt.uri == component.NS_COMPONENT_ACCEPT: 1356 if self.client.is_component and message_elt.uri == component.NS_COMPONENT_ACCEPT:
1350 # we use client namespace all the time to simplify parsing 1357 # we use client namespace all the time to simplify parsing
1351 self.normalize_ns(message_elt, component.NS_COMPONENT_ACCEPT) 1358 self.normalize_ns(message_elt, component.NS_COMPONENT_ACCEPT)
1352 1359
1353 # plugin can add their treatments to this deferred 1360 # plugin can add their treatments to this deferred
1354 post_treat = defer.Deferred() 1361 post_treat = defer.Deferred()
1355 1362 if not await self.host.trigger.async_point(
1356 d = self.host.trigger.async_point(
1357 "message_received", client, message_elt, post_treat 1363 "message_received", client, message_elt, post_treat
1358 ) 1364 ):
1359 1365 return
1360 d.addCallback(self._on_message_start_workflow, client, message_elt, post_treat) 1366 try:
1361 1367 data = self.parse_message(message_elt)
1362 def complete_attachments(self, data): 1368
1369 self.complete_attachments(data)
1370 if not data["message"] and not data["extra"] and not data["subject"]:
1371 raise exceptions.CancelError("Cancelled empty message")
1372 if not client.is_component or client.receiveHistory:
1373 await self.add_to_history(data)
1374 if not client.is_component:
1375 self.bridge_signal(data)
1376 except exceptions.CancelError:
1377 pass
1378
1379 def complete_attachments(self, data: MessageData) -> MessageData:
1363 """Complete missing metadata of attachments""" 1380 """Complete missing metadata of attachments"""
1364 for attachment in data['extra'].get(C.KEY_ATTACHMENTS, []): 1381 for attachment in data['extra'].get(C.KEY_ATTACHMENTS, []):
1365 if "name" not in attachment and "url" in attachment: 1382 if "name" not in attachment and "url" in attachment:
1366 name = (Path(unquote(urlparse(attachment['url']).path)).name 1383 name = (Path(unquote(urlparse(attachment['url']).path)).name
1367 or C.FILE_DEFAULT_NAME) 1384 or C.FILE_DEFAULT_NAME)
1369 if ((C.KEY_ATTACHMENTS_MEDIA_TYPE not in attachment 1386 if ((C.KEY_ATTACHMENTS_MEDIA_TYPE not in attachment
1370 and "name" in attachment)): 1387 and "name" in attachment)):
1371 media_type = mimetypes.guess_type(attachment['name'], strict=False)[0] 1388 media_type = mimetypes.guess_type(attachment['name'], strict=False)[0]
1372 if media_type: 1389 if media_type:
1373 attachment[C.KEY_ATTACHMENTS_MEDIA_TYPE] = media_type 1390 attachment[C.KEY_ATTACHMENTS_MEDIA_TYPE] = media_type
1374
1375 return data 1391 return data
1376 1392
1377 def skip_empty_message(self, data): 1393 async def add_to_history(self, data: MessageData) -> MessageData:
1378 if not data["message"] and not data["extra"] and not data["subject"]:
1379 raise failure.Failure(exceptions.CancelError("Cancelled empty message"))
1380 return data
1381
1382 async def add_to_history(self, data):
1383 if data.pop("history", None) == C.HISTORY_SKIP: 1394 if data.pop("history", None) == C.HISTORY_SKIP:
1384 log.debug("history is skipped as requested") 1395 log.debug("history is skipped as requested")
1385 data["extra"]["history"] = C.HISTORY_SKIP 1396 data["extra"]["history"] = C.HISTORY_SKIP
1386 else: 1397 else:
1387 # we need a message to store 1398 # we need a message to store
1388 if self.parent.is_message_printable(data): 1399 if self.parent.is_message_printable(data):
1389 return await self.host.memory.add_to_history(self.parent, data) 1400 return await self.host.memory.add_to_history(self.parent, data)
1390 else: 1401 else:
1391 log.debug("not storing empty message to history: {data}" 1402 log.debug("not storing empty message to history: {data}"
1392 .format(data=data)) 1403 .format(data=data))
1393 1404 return data
1394 def bridge_signal(self, __, data): 1405
1406 def bridge_signal(self, data: MessageData) -> MessageData:
1407 """Send signal to frontends for the given message"""
1395 try: 1408 try:
1396 data["extra"]["received_timestamp"] = str(data["received_timestamp"]) 1409 data["extra"]["received_timestamp"] = str(data["received_timestamp"])
1397 data["extra"]["delay_sender"] = data["delay_sender"] 1410 data["extra"]["delay_sender"] = data["delay_sender"]
1398 except KeyError: 1411 except KeyError:
1399 pass 1412 pass
1414 ) 1427 )
1415 else: 1428 else:
1416 log.debug("Discarding bridge signal for empty message: {data}".format( 1429 log.debug("Discarding bridge signal for empty message: {data}".format(
1417 data=data)) 1430 data=data))
1418 return data 1431 return data
1419
1420 def cancel_error_trap(self, failure_):
1421 """A message sending can be cancelled by a plugin treatment"""
1422 failure_.trap(exceptions.CancelError)
1423 1432
1424 1433
1425 class LiberviaRosterProtocol(xmppim.RosterClientProtocol): 1434 class LiberviaRosterProtocol(xmppim.RosterClientProtocol):
1426 1435
1427 def __init__(self, host): 1436 def __init__(self, host):