comparison libervia/backend/plugins/plugin_pubsub_cache.py @ 4356:c9626f46b63e

plugin XEP-0059: Use Pydantic models for RSM.
author Goffi <goffi@goffi.org>
date Fri, 11 Apr 2025 18:19:28 +0200
parents 0d7bb4df2343
children
comparison
equal deleted inserted replaced
4355:01ee3b902d33 4356:c9626f46b63e
16 # You should have received a copy of the GNU Affero General Public License 16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. 17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 18
19 import time 19 import time
20 from datetime import datetime 20 from datetime import datetime
21 from typing import Optional, List, Tuple, Dict, Any 21 from typing import Optional, List, Tuple, Dict, Any, cast
22 from twisted.words.protocols.jabber import jid, error 22 from twisted.words.protocols.jabber import jid, error
23 from twisted.words.xish import domish 23 from twisted.words.xish import domish
24 from twisted.internet import defer 24 from twisted.internet import defer
25 from wokkel import pubsub, rsm 25 from wokkel import pubsub, rsm
26 from libervia.backend.core.i18n import _ 26 from libervia.backend.core.i18n import _
27 from libervia.backend.core.constants import Const as C 27 from libervia.backend.core.constants import Const as C
28 from libervia.backend.core import exceptions 28 from libervia.backend.core import exceptions
29 from libervia.backend.core.log import getLogger 29 from libervia.backend.core.log import getLogger
30 from libervia.backend.core.core_types import SatXMPPEntity 30 from libervia.backend.core.core_types import SatXMPPEntity
31 from libervia.backend.plugins.plugin_xep_0059 import XEP_0059, RSMRequest
31 from libervia.backend.tools import xml_tools, utils 32 from libervia.backend.tools import xml_tools, utils
32 from libervia.backend.tools.common import data_format 33 from libervia.backend.tools.common import data_format
33 from libervia.backend.memory.sqla import PubsubNode, PubsubItem, SyncState, IntegrityError 34 from libervia.backend.memory.sqla import PubsubNode, PubsubItem, SyncState, IntegrityError
34 35
35 36
283 items, __ = await client.pubsub_client.items( 284 items, __ = await client.pubsub_client.items(
284 pubsub_node.service, pubsub_node.name, maxItems=20 285 pubsub_node.service, pubsub_node.name, maxItems=20
285 ) 286 )
286 await self.cache_items(client, pubsub_node, items) 287 await self.cache_items(client, pubsub_node, items)
287 else: 288 else:
288 rsm_p = self.host.plugins["XEP-0059"] 289 rsm_p = cast(XEP_0059, self.host.plugins["XEP-0059"])
289 rsm_request = rsm.RSMRequest() 290 rsm_request = RSMRequest()
290 cached_ids = set() 291 cached_ids = set()
291 while True: 292 while True:
292 items, rsm_response = await client.pubsub_client.items( 293 items, rsm_response = await client.pubsub_client.items(
293 service, node, rsm_request=rsm_request 294 service, node, rsm_request=rsm_request
294 ) 295 )
312 f"than the cache limit ({CACHE_LIMIT}). We stop " 313 f"than the cache limit ({CACHE_LIMIT}). We stop "
313 "caching here, at item {item['id']!r}." 314 "caching here, at item {item['id']!r}."
314 ) 315 )
315 rsm_request = None 316 rsm_request = None
316 break 317 break
317 rsm_request = rsm_p.get_next_request(rsm_request, rsm_response)
318 if rsm_request is None: 318 if rsm_request is None:
319 break 319 break
320 rsm_request = rsm_p.get_next_request(rsm_request, rsm_response)
320 321
321 await self.host.memory.storage.update_pubsub_node_sync_state( 322 await self.host.memory.storage.update_pubsub_node_sync_state(
322 pubsub_node, SyncState.COMPLETED 323 pubsub_node, SyncState.COMPLETED
323 ) 324 )
324 except Exception as e: 325 except Exception as e:
654 log.warning( 655 log.warning(
655 f"{pubsub_node} is in progress for too long " 656 f"{pubsub_node} is in progress for too long "
656 f"({pubsub_node.sync_state_updated//60} minutes), " 657 f"({pubsub_node.sync_state_updated//60} minutes), "
657 "cancelling it and retrying." 658 "cancelling it and retrying."
658 ) 659 )
659 self.in_progress.pop[(service, node)].cancel() 660 self.in_progress.pop((service, node)).cancel()
660 pubsub_node.sync_state = None 661 pubsub_node.sync_state = None
661 await self.host.memory.storage.delete_pubsub_items(pubsub_node) 662 await self.host.memory.storage.delete_pubsub_items(pubsub_node)
662 else: 663 else:
663 log.debug( 664 log.debug(
664 f"{pubsub_node} synchronisation is already in progress, skipping" 665 f"{pubsub_node} synchronisation is already in progress, skipping"