diff sat/plugins/plugin_pubsub_cache.py @ 3941:036188fff714

plugin pubsub cache: avoid race condition by retrying node request: When retrieving items, there may be a race condition if a node is not found in cache, then created before `setPubsubNode` is called. This patch handle this case by retrying in case of UNIQUE constraint violation.
author Goffi <goffi@goffi.org>
date Sat, 15 Oct 2022 20:38:33 +0200
parents e345d93fb6e5
children 524856bd7b19
line wrap: on
line diff
--- a/sat/plugins/plugin_pubsub_cache.py	Sat Oct 15 20:38:33 2022 +0200
+++ b/sat/plugins/plugin_pubsub_cache.py	Sat Oct 15 20:38:33 2022 +0200
@@ -30,7 +30,7 @@
 from sat.core.core_types import SatXMPPEntity
 from sat.tools import xml_tools, utils
 from sat.tools.common import data_format
-from sat.memory.sqla import PubsubNode, PubsubItem, SyncState
+from sat.memory.sqla import PubsubNode, PubsubItem, SyncState, IntegrityError
 
 
 log = getLogger(__name__)
@@ -612,22 +612,37 @@
             return True, None
         if service is None:
             service = client.jid.userhostJID()
-        pubsub_node = await self.host.memory.storage.getPubsubNode(
-            client, service, node
-        )
-        if pubsub_node is not None and pubsub_node.sync_state == SyncState.COMPLETED:
-            analyse = {"to_sync": True}
-        else:
-            analyse = await self.analyseNode(client, service, node)
+        for __ in range(5):
+            pubsub_node = await self.host.memory.storage.getPubsubNode(
+                client, service, node
+            )
+            if pubsub_node is not None and pubsub_node.sync_state == SyncState.COMPLETED:
+                analyse = {"to_sync": True}
+            else:
+                analyse = await self.analyseNode(client, service, node)
 
-        if pubsub_node is None:
-            pubsub_node = await self.host.memory.storage.setPubsubNode(
-                client,
-                service,
-                node,
-                analyser=analyse.get("name"),
-                type_=analyse.get("type"),
-                subtype=analyse.get("subtype"),
+            if pubsub_node is None:
+                try:
+                    pubsub_node = await self.host.memory.storage.setPubsubNode(
+                        client,
+                        service,
+                        node,
+                        analyser=analyse.get("name"),
+                        type_=analyse.get("type"),
+                        subtype=analyse.get("subtype"),
+                    )
+                except IntegrityError as e:
+                    if "unique" in str(e.orig).lower():
+                        log.debug(
+                            "race condition on pubsub node creation in cache, trying "
+                            "again"
+                        )
+                    else:
+                        raise e
+            break
+        else:
+            raise exceptions.InternalError(
+                "Too many IntegrityError with UNIQUE constraint, something is going wrong"
             )
 
         if analyse.get("to_sync"):