Mercurial > libervia-backend
changeset 3941:036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
When retrieving items, there may be a race condition if a node is not found in cache, then
created before `setPubsubNode` is called. This patch handle this case by retrying in case
of UNIQUE constraint violation.
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 15 Oct 2022 20:38:33 +0200 |
parents | 512487ce3579 |
children | a92eef737703 |
files | sat/plugins/plugin_pubsub_cache.py |
diffstat | 1 files changed, 31 insertions(+), 16 deletions(-) [+] |
line wrap: on
line diff
--- a/sat/plugins/plugin_pubsub_cache.py Sat Oct 15 20:38:33 2022 +0200 +++ b/sat/plugins/plugin_pubsub_cache.py Sat Oct 15 20:38:33 2022 +0200 @@ -30,7 +30,7 @@ from sat.core.core_types import SatXMPPEntity from sat.tools import xml_tools, utils from sat.tools.common import data_format -from sat.memory.sqla import PubsubNode, PubsubItem, SyncState +from sat.memory.sqla import PubsubNode, PubsubItem, SyncState, IntegrityError log = getLogger(__name__) @@ -612,22 +612,37 @@ return True, None if service is None: service = client.jid.userhostJID() - pubsub_node = await self.host.memory.storage.getPubsubNode( - client, service, node - ) - if pubsub_node is not None and pubsub_node.sync_state == SyncState.COMPLETED: - analyse = {"to_sync": True} - else: - analyse = await self.analyseNode(client, service, node) + for __ in range(5): + pubsub_node = await self.host.memory.storage.getPubsubNode( + client, service, node + ) + if pubsub_node is not None and pubsub_node.sync_state == SyncState.COMPLETED: + analyse = {"to_sync": True} + else: + analyse = await self.analyseNode(client, service, node) - if pubsub_node is None: - pubsub_node = await self.host.memory.storage.setPubsubNode( - client, - service, - node, - analyser=analyse.get("name"), - type_=analyse.get("type"), - subtype=analyse.get("subtype"), + if pubsub_node is None: + try: + pubsub_node = await self.host.memory.storage.setPubsubNode( + client, + service, + node, + analyser=analyse.get("name"), + type_=analyse.get("type"), + subtype=analyse.get("subtype"), + ) + except IntegrityError as e: + if "unique" in str(e.orig).lower(): + log.debug( + "race condition on pubsub node creation in cache, trying " + "again" + ) + else: + raise e + break + else: + raise exceptions.InternalError( + "Too many IntegrityError with UNIQUE constraint, something is going wrong" ) if analyse.get("to_sync"):