# HG changeset patch # User Goffi # Date 1665859113 -7200 # Node ID 036188fff7146e616c5357d46b7d51f09a0cd0dd # Parent 512487ce35791310106167ee7a4e1d70e6ef75e4 plugin pubsub cache: avoid race condition by retrying node request: When retrieving items, there may be a race condition if a node is not found in cache, then created before `setPubsubNode` is called. This patch handle this case by retrying in case of UNIQUE constraint violation. diff -r 512487ce3579 -r 036188fff714 sat/plugins/plugin_pubsub_cache.py --- a/sat/plugins/plugin_pubsub_cache.py Sat Oct 15 20:38:33 2022 +0200 +++ b/sat/plugins/plugin_pubsub_cache.py Sat Oct 15 20:38:33 2022 +0200 @@ -30,7 +30,7 @@ from sat.core.core_types import SatXMPPEntity from sat.tools import xml_tools, utils from sat.tools.common import data_format -from sat.memory.sqla import PubsubNode, PubsubItem, SyncState +from sat.memory.sqla import PubsubNode, PubsubItem, SyncState, IntegrityError log = getLogger(__name__) @@ -612,22 +612,37 @@ return True, None if service is None: service = client.jid.userhostJID() - pubsub_node = await self.host.memory.storage.getPubsubNode( - client, service, node - ) - if pubsub_node is not None and pubsub_node.sync_state == SyncState.COMPLETED: - analyse = {"to_sync": True} - else: - analyse = await self.analyseNode(client, service, node) + for __ in range(5): + pubsub_node = await self.host.memory.storage.getPubsubNode( + client, service, node + ) + if pubsub_node is not None and pubsub_node.sync_state == SyncState.COMPLETED: + analyse = {"to_sync": True} + else: + analyse = await self.analyseNode(client, service, node) - if pubsub_node is None: - pubsub_node = await self.host.memory.storage.setPubsubNode( - client, - service, - node, - analyser=analyse.get("name"), - type_=analyse.get("type"), - subtype=analyse.get("subtype"), + if pubsub_node is None: + try: + pubsub_node = await self.host.memory.storage.setPubsubNode( + client, + service, + node, + analyser=analyse.get("name"), + type_=analyse.get("type"), + subtype=analyse.get("subtype"), + ) + except IntegrityError as e: + if "unique" in str(e.orig).lower(): + log.debug( + "race condition on pubsub node creation in cache, trying " + "again" + ) + else: + raise e + break + else: + raise exceptions.InternalError( + "Too many IntegrityError with UNIQUE constraint, something is going wrong" ) if analyse.get("to_sync"):