Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_pubsub_cache.py @ 4356:c9626f46b63e
plugin XEP-0059: Use Pydantic models for RSM.
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 11 Apr 2025 18:19:28 +0200 |
parents | 0d7bb4df2343 |
children |
comparison
equal
deleted
inserted
replaced
4355:01ee3b902d33 | 4356:c9626f46b63e |
---|---|
16 # You should have received a copy of the GNU Affero General Public License | 16 # You should have received a copy of the GNU Affero General Public License |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | 17 # along with this program. If not, see <http://www.gnu.org/licenses/>. |
18 | 18 |
19 import time | 19 import time |
20 from datetime import datetime | 20 from datetime import datetime |
21 from typing import Optional, List, Tuple, Dict, Any | 21 from typing import Optional, List, Tuple, Dict, Any, cast |
22 from twisted.words.protocols.jabber import jid, error | 22 from twisted.words.protocols.jabber import jid, error |
23 from twisted.words.xish import domish | 23 from twisted.words.xish import domish |
24 from twisted.internet import defer | 24 from twisted.internet import defer |
25 from wokkel import pubsub, rsm | 25 from wokkel import pubsub, rsm |
26 from libervia.backend.core.i18n import _ | 26 from libervia.backend.core.i18n import _ |
27 from libervia.backend.core.constants import Const as C | 27 from libervia.backend.core.constants import Const as C |
28 from libervia.backend.core import exceptions | 28 from libervia.backend.core import exceptions |
29 from libervia.backend.core.log import getLogger | 29 from libervia.backend.core.log import getLogger |
30 from libervia.backend.core.core_types import SatXMPPEntity | 30 from libervia.backend.core.core_types import SatXMPPEntity |
31 from libervia.backend.plugins.plugin_xep_0059 import XEP_0059, RSMRequest | |
31 from libervia.backend.tools import xml_tools, utils | 32 from libervia.backend.tools import xml_tools, utils |
32 from libervia.backend.tools.common import data_format | 33 from libervia.backend.tools.common import data_format |
33 from libervia.backend.memory.sqla import PubsubNode, PubsubItem, SyncState, IntegrityError | 34 from libervia.backend.memory.sqla import PubsubNode, PubsubItem, SyncState, IntegrityError |
34 | 35 |
35 | 36 |
283 items, __ = await client.pubsub_client.items( | 284 items, __ = await client.pubsub_client.items( |
284 pubsub_node.service, pubsub_node.name, maxItems=20 | 285 pubsub_node.service, pubsub_node.name, maxItems=20 |
285 ) | 286 ) |
286 await self.cache_items(client, pubsub_node, items) | 287 await self.cache_items(client, pubsub_node, items) |
287 else: | 288 else: |
288 rsm_p = self.host.plugins["XEP-0059"] | 289 rsm_p = cast(XEP_0059, self.host.plugins["XEP-0059"]) |
289 rsm_request = rsm.RSMRequest() | 290 rsm_request = RSMRequest() |
290 cached_ids = set() | 291 cached_ids = set() |
291 while True: | 292 while True: |
292 items, rsm_response = await client.pubsub_client.items( | 293 items, rsm_response = await client.pubsub_client.items( |
293 service, node, rsm_request=rsm_request | 294 service, node, rsm_request=rsm_request |
294 ) | 295 ) |
312 f"than the cache limit ({CACHE_LIMIT}). We stop " | 313 f"than the cache limit ({CACHE_LIMIT}). We stop " |
313 "caching here, at item {item['id']!r}." | 314 "caching here, at item {item['id']!r}." |
314 ) | 315 ) |
315 rsm_request = None | 316 rsm_request = None |
316 break | 317 break |
317 rsm_request = rsm_p.get_next_request(rsm_request, rsm_response) | |
318 if rsm_request is None: | 318 if rsm_request is None: |
319 break | 319 break |
320 rsm_request = rsm_p.get_next_request(rsm_request, rsm_response) | |
320 | 321 |
321 await self.host.memory.storage.update_pubsub_node_sync_state( | 322 await self.host.memory.storage.update_pubsub_node_sync_state( |
322 pubsub_node, SyncState.COMPLETED | 323 pubsub_node, SyncState.COMPLETED |
323 ) | 324 ) |
324 except Exception as e: | 325 except Exception as e: |
654 log.warning( | 655 log.warning( |
655 f"{pubsub_node} is in progress for too long " | 656 f"{pubsub_node} is in progress for too long " |
656 f"({pubsub_node.sync_state_updated//60} minutes), " | 657 f"({pubsub_node.sync_state_updated//60} minutes), " |
657 "cancelling it and retrying." | 658 "cancelling it and retrying." |
658 ) | 659 ) |
659 self.in_progress.pop[(service, node)].cancel() | 660 self.in_progress.pop((service, node)).cancel() |
660 pubsub_node.sync_state = None | 661 pubsub_node.sync_state = None |
661 await self.host.memory.storage.delete_pubsub_items(pubsub_node) | 662 await self.host.memory.storage.delete_pubsub_items(pubsub_node) |
662 else: | 663 else: |
663 log.debug( | 664 log.debug( |
664 f"{pubsub_node} synchronisation is already in progress, skipping" | 665 f"{pubsub_node} synchronisation is already in progress, skipping" |