Mercurial > libervia-backend
comparison sat/plugins/plugin_pubsub_cache.py @ 3941:036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
When retrieving items, there may be a race condition if a node is not found in cache, then
created before `setPubsubNode` is called. This patch handle this case by retrying in case
of UNIQUE constraint violation.
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 15 Oct 2022 20:38:33 +0200 |
parents | e345d93fb6e5 |
children | 524856bd7b19 |
comparison
equal
deleted
inserted
replaced
3940:512487ce3579 | 3941:036188fff714 |
---|---|
28 from sat.core import exceptions | 28 from sat.core import exceptions |
29 from sat.core.log import getLogger | 29 from sat.core.log import getLogger |
30 from sat.core.core_types import SatXMPPEntity | 30 from sat.core.core_types import SatXMPPEntity |
31 from sat.tools import xml_tools, utils | 31 from sat.tools import xml_tools, utils |
32 from sat.tools.common import data_format | 32 from sat.tools.common import data_format |
33 from sat.memory.sqla import PubsubNode, PubsubItem, SyncState | 33 from sat.memory.sqla import PubsubNode, PubsubItem, SyncState, IntegrityError |
34 | 34 |
35 | 35 |
36 log = getLogger(__name__) | 36 log = getLogger(__name__) |
37 | 37 |
38 PLUGIN_INFO = { | 38 PLUGIN_INFO = { |
610 if extra.get(C.KEY_USE_CACHE) == False: | 610 if extra.get(C.KEY_USE_CACHE) == False: |
611 log.debug("skipping pubsub cache as requested") | 611 log.debug("skipping pubsub cache as requested") |
612 return True, None | 612 return True, None |
613 if service is None: | 613 if service is None: |
614 service = client.jid.userhostJID() | 614 service = client.jid.userhostJID() |
615 pubsub_node = await self.host.memory.storage.getPubsubNode( | 615 for __ in range(5): |
616 client, service, node | 616 pubsub_node = await self.host.memory.storage.getPubsubNode( |
617 ) | 617 client, service, node |
618 if pubsub_node is not None and pubsub_node.sync_state == SyncState.COMPLETED: | 618 ) |
619 analyse = {"to_sync": True} | 619 if pubsub_node is not None and pubsub_node.sync_state == SyncState.COMPLETED: |
620 analyse = {"to_sync": True} | |
621 else: | |
622 analyse = await self.analyseNode(client, service, node) | |
623 | |
624 if pubsub_node is None: | |
625 try: | |
626 pubsub_node = await self.host.memory.storage.setPubsubNode( | |
627 client, | |
628 service, | |
629 node, | |
630 analyser=analyse.get("name"), | |
631 type_=analyse.get("type"), | |
632 subtype=analyse.get("subtype"), | |
633 ) | |
634 except IntegrityError as e: | |
635 if "unique" in str(e.orig).lower(): | |
636 log.debug( | |
637 "race condition on pubsub node creation in cache, trying " | |
638 "again" | |
639 ) | |
640 else: | |
641 raise e | |
642 break | |
620 else: | 643 else: |
621 analyse = await self.analyseNode(client, service, node) | 644 raise exceptions.InternalError( |
622 | 645 "Too many IntegrityError with UNIQUE constraint, something is going wrong" |
623 if pubsub_node is None: | |
624 pubsub_node = await self.host.memory.storage.setPubsubNode( | |
625 client, | |
626 service, | |
627 node, | |
628 analyser=analyse.get("name"), | |
629 type_=analyse.get("type"), | |
630 subtype=analyse.get("subtype"), | |
631 ) | 646 ) |
632 | 647 |
633 if analyse.get("to_sync"): | 648 if analyse.get("to_sync"): |
634 if pubsub_node.sync_state == SyncState.COMPLETED: | 649 if pubsub_node.sync_state == SyncState.COMPLETED: |
635 if "mam" in extra: | 650 if "mam" in extra: |