comparison sat/plugins/plugin_pubsub_cache.py @ 3941:036188fff714

plugin pubsub cache: avoid race condition by retrying node request: When retrieving items, there may be a race condition if a node is not found in cache, then created before `setPubsubNode` is called. This patch handle this case by retrying in case of UNIQUE constraint violation.
author Goffi <goffi@goffi.org>
date Sat, 15 Oct 2022 20:38:33 +0200
parents e345d93fb6e5
children 524856bd7b19
comparison
equal deleted inserted replaced
3940:512487ce3579 3941:036188fff714
28 from sat.core import exceptions 28 from sat.core import exceptions
29 from sat.core.log import getLogger 29 from sat.core.log import getLogger
30 from sat.core.core_types import SatXMPPEntity 30 from sat.core.core_types import SatXMPPEntity
31 from sat.tools import xml_tools, utils 31 from sat.tools import xml_tools, utils
32 from sat.tools.common import data_format 32 from sat.tools.common import data_format
33 from sat.memory.sqla import PubsubNode, PubsubItem, SyncState 33 from sat.memory.sqla import PubsubNode, PubsubItem, SyncState, IntegrityError
34 34
35 35
36 log = getLogger(__name__) 36 log = getLogger(__name__)
37 37
38 PLUGIN_INFO = { 38 PLUGIN_INFO = {
610 if extra.get(C.KEY_USE_CACHE) == False: 610 if extra.get(C.KEY_USE_CACHE) == False:
611 log.debug("skipping pubsub cache as requested") 611 log.debug("skipping pubsub cache as requested")
612 return True, None 612 return True, None
613 if service is None: 613 if service is None:
614 service = client.jid.userhostJID() 614 service = client.jid.userhostJID()
615 pubsub_node = await self.host.memory.storage.getPubsubNode( 615 for __ in range(5):
616 client, service, node 616 pubsub_node = await self.host.memory.storage.getPubsubNode(
617 ) 617 client, service, node
618 if pubsub_node is not None and pubsub_node.sync_state == SyncState.COMPLETED: 618 )
619 analyse = {"to_sync": True} 619 if pubsub_node is not None and pubsub_node.sync_state == SyncState.COMPLETED:
620 analyse = {"to_sync": True}
621 else:
622 analyse = await self.analyseNode(client, service, node)
623
624 if pubsub_node is None:
625 try:
626 pubsub_node = await self.host.memory.storage.setPubsubNode(
627 client,
628 service,
629 node,
630 analyser=analyse.get("name"),
631 type_=analyse.get("type"),
632 subtype=analyse.get("subtype"),
633 )
634 except IntegrityError as e:
635 if "unique" in str(e.orig).lower():
636 log.debug(
637 "race condition on pubsub node creation in cache, trying "
638 "again"
639 )
640 else:
641 raise e
642 break
620 else: 643 else:
621 analyse = await self.analyseNode(client, service, node) 644 raise exceptions.InternalError(
622 645 "Too many IntegrityError with UNIQUE constraint, something is going wrong"
623 if pubsub_node is None:
624 pubsub_node = await self.host.memory.storage.setPubsubNode(
625 client,
626 service,
627 node,
628 analyser=analyse.get("name"),
629 type_=analyse.get("type"),
630 subtype=analyse.get("subtype"),
631 ) 646 )
632 647
633 if analyse.get("to_sync"): 648 if analyse.get("to_sync"):
634 if pubsub_node.sync_state == SyncState.COMPLETED: 649 if pubsub_node.sync_state == SyncState.COMPLETED:
635 if "mam" in extra: 650 if "mam" in extra: