comparison sat/plugins/plugin_pubsub_cache.py @ 4037:524856bd7b19

massive refactoring to switch from camelCase to snake_case: historically, Libervia (SàT before) was using camelCase as allowed by PEP8 when using a pre-PEP8 code, to use the same coding style as in Twisted. However, snake_case is more readable and it's better to follow PEP8 best practices, so it has been decided to move on full snake_case. Because Libervia has a huge codebase, this ended with a ugly mix of camelCase and snake_case. To fix that, this patch does a big refactoring by renaming every function and method (including bridge) that are not coming from Twisted or Wokkel, to use fully snake_case. This is a massive change, and may result in some bugs.
author Goffi <goffi@goffi.org>
date Sat, 08 Apr 2023 13:54:42 +0200
parents 036188fff714
children
comparison
equal deleted inserted replaced
4036:c4464d7ae97b 4037:524856bd7b19
63 # situation. 63 # situation.
64 # TODO: handle configuration events 64 # TODO: handle configuration events
65 65
66 def __init__(self, host): 66 def __init__(self, host):
67 log.info(_("PubSub Cache initialization")) 67 log.info(_("PubSub Cache initialization"))
68 strategy = host.memory.getConfig(None, "pubsub_cache_strategy") 68 strategy = host.memory.config_get(None, "pubsub_cache_strategy")
69 if strategy == "no_cache": 69 if strategy == "no_cache":
70 log.info( 70 log.info(
71 _( 71 _(
72 "Pubsub cache won't be used due to pubsub_cache_strategy={value} " 72 "Pubsub cache won't be used due to pubsub_cache_strategy={value} "
73 "setting." 73 "setting."
79 self.host = host 79 self.host = host
80 self._p = host.plugins["XEP-0060"] 80 self._p = host.plugins["XEP-0060"]
81 self.analysers = {} 81 self.analysers = {}
82 # map for caching in progress (node, service) => Deferred 82 # map for caching in progress (node, service) => Deferred
83 self.in_progress = {} 83 self.in_progress = {}
84 self.host.trigger.add("XEP-0060_getItems", self._getItemsTrigger) 84 self.host.trigger.add("XEP-0060_getItems", self._get_items_trigger)
85 self._p.addManagedNode( 85 self._p.add_managed_node(
86 "", 86 "",
87 items_cb=self.onItemsEvent, 87 items_cb=self.on_items_event,
88 delete_cb=self.onDeleteEvent, 88 delete_cb=self.on_delete_event,
89 purge_db=self.onPurgeEvent, 89 purge_db=self.on_purge_event,
90 ) 90 )
91 host.bridge.addMethod( 91 host.bridge.add_method(
92 "psCacheGet", 92 "ps_cache_get",
93 ".plugin", 93 ".plugin",
94 in_sign="ssiassss", 94 in_sign="ssiassss",
95 out_sign="s", 95 out_sign="s",
96 method=self._getItemsFromCache, 96 method=self._get_items_from_cache,
97 async_=True, 97 async_=True,
98 ) 98 )
99 host.bridge.addMethod( 99 host.bridge.add_method(
100 "psCacheSync", 100 "ps_cache_sync",
101 ".plugin", 101 ".plugin",
102 "sss", 102 "sss",
103 out_sign="", 103 out_sign="",
104 method=self._synchronise, 104 method=self._synchronise,
105 async_=True, 105 async_=True,
106 ) 106 )
107 host.bridge.addMethod( 107 host.bridge.add_method(
108 "psCachePurge", 108 "ps_cache_purge",
109 ".plugin", 109 ".plugin",
110 "s", 110 "s",
111 out_sign="", 111 out_sign="",
112 method=self._purge, 112 method=self._purge,
113 async_=True, 113 async_=True,
114 ) 114 )
115 host.bridge.addMethod( 115 host.bridge.add_method(
116 "psCacheReset", 116 "ps_cache_reset",
117 ".plugin", 117 ".plugin",
118 "", 118 "",
119 out_sign="", 119 out_sign="",
120 method=self._reset, 120 method=self._reset,
121 async_=True, 121 async_=True,
122 ) 122 )
123 host.bridge.addMethod( 123 host.bridge.add_method(
124 "psCacheSearch", 124 "ps_cache_search",
125 ".plugin", 125 ".plugin",
126 "s", 126 "s",
127 out_sign="s", 127 out_sign="s",
128 method=self._search, 128 method=self._search,
129 async_=True, 129 async_=True,
130 ) 130 )
131 131
132 def registerAnalyser(self, analyser: dict) -> None: 132 def register_analyser(self, analyser: dict) -> None:
133 """Register a new pubsub node analyser 133 """Register a new pubsub node analyser
134 134
135 @param analyser: An analyser is a dictionary which may have the following keys 135 @param analyser: An analyser is a dictionary which may have the following keys
136 (keys with a ``*`` are mandatory, at least one of ``node`` or ``namespace`` keys 136 (keys with a ``*`` are mandatory, at least one of ``node`` or ``namespace`` keys
137 must be used): 137 must be used):
201 raise exceptions.Conflict( 201 raise exceptions.Conflict(
202 f"An analyser with the name {name!r} is already registered" 202 f"An analyser with the name {name!r} is already registered"
203 ) 203 )
204 self.analysers[name] = analyser 204 self.analysers[name] = analyser
205 205
206 async def cacheItems( 206 async def cache_items(
207 self, 207 self,
208 client: SatXMPPEntity, 208 client: SatXMPPEntity,
209 pubsub_node: PubsubNode, 209 pubsub_node: PubsubNode,
210 items: List[domish.Element] 210 items: List[domish.Element]
211 ) -> None: 211 ) -> None:
214 except KeyError: 214 except KeyError:
215 parser = None 215 parser = None
216 216
217 if parser is not None: 217 if parser is not None:
218 parsed_items = [ 218 parsed_items = [
219 await utils.asDeferred( 219 await utils.as_deferred(
220 parser, 220 parser,
221 client, 221 client,
222 item, 222 item,
223 pubsub_node.service, 223 pubsub_node.service,
224 pubsub_node.name 224 pubsub_node.name
226 for item in items 226 for item in items
227 ] 227 ]
228 else: 228 else:
229 parsed_items = None 229 parsed_items = None
230 230
231 await self.host.memory.storage.cachePubsubItems( 231 await self.host.memory.storage.cache_pubsub_items(
232 client, pubsub_node, items, parsed_items 232 client, pubsub_node, items, parsed_items
233 ) 233 )
234 234
235 async def _cacheNode( 235 async def _cache_node(
236 self, 236 self,
237 client: SatXMPPEntity, 237 client: SatXMPPEntity,
238 pubsub_node: PubsubNode 238 pubsub_node: PubsubNode
239 ) -> None: 239 ) -> None:
240 await self.host.memory.storage.updatePubsubNodeSyncState( 240 await self.host.memory.storage.update_pubsub_node_sync_state(
241 pubsub_node, SyncState.IN_PROGRESS 241 pubsub_node, SyncState.IN_PROGRESS
242 ) 242 )
243 service, node = pubsub_node.service, pubsub_node.name 243 service, node = pubsub_node.service, pubsub_node.name
244 try: 244 try:
245 log.debug( 245 log.debug(
272 "{state}" 272 "{state}"
273 ).format(pubsub_node=pubsub_node, state=sub.state) 273 ).format(pubsub_node=pubsub_node, state=sub.state)
274 ) 274 )
275 275
276 try: 276 try:
277 await self.host.checkFeatures( 277 await self.host.check_features(
278 client, [rsm.NS_RSM, self._p.DISCO_RSM], pubsub_node.service 278 client, [rsm.NS_RSM, self._p.DISCO_RSM], pubsub_node.service
279 ) 279 )
280 except error.StanzaError as e: 280 except error.StanzaError as e:
281 if e.condition == "service-unavailable": 281 if e.condition == "service-unavailable":
282 log.warning( 282 log.warning(
284 "latest 20 items" 284 "latest 20 items"
285 ) 285 )
286 items, __ = await client.pubsub_client.items( 286 items, __ = await client.pubsub_client.items(
287 pubsub_node.service, pubsub_node.name, maxItems=20 287 pubsub_node.service, pubsub_node.name, maxItems=20
288 ) 288 )
289 await self.cacheItems( 289 await self.cache_items(
290 client, pubsub_node, items 290 client, pubsub_node, items
291 ) 291 )
292 else: 292 else:
293 raise e 293 raise e
294 except exceptions.FeatureNotFound: 294 except exceptions.FeatureNotFound:
297 "(XEP-0059), we'll only cache latest 20 items" 297 "(XEP-0059), we'll only cache latest 20 items"
298 ) 298 )
299 items, __ = await client.pubsub_client.items( 299 items, __ = await client.pubsub_client.items(
300 pubsub_node.service, pubsub_node.name, maxItems=20 300 pubsub_node.service, pubsub_node.name, maxItems=20
301 ) 301 )
302 await self.cacheItems( 302 await self.cache_items(
303 client, pubsub_node, items 303 client, pubsub_node, items
304 ) 304 )
305 else: 305 else:
306 rsm_p = self.host.plugins["XEP-0059"] 306 rsm_p = self.host.plugins["XEP-0059"]
307 rsm_request = rsm.RSMRequest() 307 rsm_request = rsm.RSMRequest()
308 cached_ids = set() 308 cached_ids = set()
309 while True: 309 while True:
310 items, rsm_response = await client.pubsub_client.items( 310 items, rsm_response = await client.pubsub_client.items(
311 service, node, rsm_request=rsm_request 311 service, node, rsm_request=rsm_request
312 ) 312 )
313 await self.cacheItems( 313 await self.cache_items(
314 client, pubsub_node, items 314 client, pubsub_node, items
315 ) 315 )
316 for item in items: 316 for item in items:
317 item_id = item["id"] 317 item_id = item["id"]
318 if item_id in cached_ids: 318 if item_id in cached_ids:
332 f"than the cache limit ({CACHE_LIMIT}). We stop " 332 f"than the cache limit ({CACHE_LIMIT}). We stop "
333 "caching here, at item {item['id']!r}." 333 "caching here, at item {item['id']!r}."
334 ) 334 )
335 rsm_request = None 335 rsm_request = None
336 break 336 break
337 rsm_request = rsm_p.getNextRequest(rsm_request, rsm_response) 337 rsm_request = rsm_p.get_next_request(rsm_request, rsm_response)
338 if rsm_request is None: 338 if rsm_request is None:
339 break 339 break
340 340
341 await self.host.memory.storage.updatePubsubNodeSyncState( 341 await self.host.memory.storage.update_pubsub_node_sync_state(
342 pubsub_node, SyncState.COMPLETED 342 pubsub_node, SyncState.COMPLETED
343 ) 343 )
344 except Exception as e: 344 except Exception as e:
345 import traceback 345 import traceback
346 tb = traceback.format_tb(e.__traceback__) 346 tb = traceback.format_tb(e.__traceback__)
347 log.error( 347 log.error(
348 f"Can't cache node {node!r} at {service} for {client.profile}: {e}\n{tb}" 348 f"Can't cache node {node!r} at {service} for {client.profile}: {e}\n{tb}"
349 ) 349 )
350 await self.host.memory.storage.updatePubsubNodeSyncState( 350 await self.host.memory.storage.update_pubsub_node_sync_state(
351 pubsub_node, SyncState.ERROR 351 pubsub_node, SyncState.ERROR
352 ) 352 )
353 await self.host.memory.storage.deletePubsubItems(pubsub_node) 353 await self.host.memory.storage.delete_pubsub_items(pubsub_node)
354 raise e 354 raise e
355 355
356 def _cacheNodeClean(self, __, pubsub_node): 356 def _cache_node_clean(self, __, pubsub_node):
357 del self.in_progress[(pubsub_node.service, pubsub_node.name)] 357 del self.in_progress[(pubsub_node.service, pubsub_node.name)]
358 358
359 def cacheNode( 359 def cache_node(
360 self, 360 self,
361 client: SatXMPPEntity, 361 client: SatXMPPEntity,
362 pubsub_node: PubsubNode 362 pubsub_node: PubsubNode
363 ) -> None: 363 ) -> None:
364 """Launch node caching as a background task""" 364 """Launch node caching as a background task"""
365 d = defer.ensureDeferred(self._cacheNode(client, pubsub_node)) 365 d = defer.ensureDeferred(self._cache_node(client, pubsub_node))
366 d.addBoth(self._cacheNodeClean, pubsub_node=pubsub_node) 366 d.addBoth(self._cache_node_clean, pubsub_node=pubsub_node)
367 self.in_progress[(pubsub_node.service, pubsub_node.name)] = d 367 self.in_progress[(pubsub_node.service, pubsub_node.name)] = d
368 return d 368 return d
369 369
370 async def analyseNode( 370 async def analyse_node(
371 self, 371 self,
372 client: SatXMPPEntity, 372 client: SatXMPPEntity,
373 service: jid.JID, 373 service: jid.JID,
374 node: str, 374 node: str,
375 pubsub_node : PubsubNode = None, 375 pubsub_node : PubsubNode = None,
449 try: 449 try:
450 match_cb = analyser["match_cb"] 450 match_cb = analyser["match_cb"]
451 except KeyError: 451 except KeyError:
452 pass 452 pass
453 else: 453 else:
454 await utils.asDeferred(match_cb, client, analyse) 454 await utils.as_deferred(match_cb, client, analyse)
455 return analyse 455 return analyse
456 456
457 def _getItemsFromCache( 457 def _get_items_from_cache(
458 self, service="", node="", max_items=10, item_ids=None, sub_id=None, 458 self, service="", node="", max_items=10, item_ids=None, sub_id=None,
459 extra="", profile_key=C.PROF_KEY_NONE 459 extra="", profile_key=C.PROF_KEY_NONE
460 ): 460 ):
461 d = defer.ensureDeferred(self._aGetItemsFromCache( 461 d = defer.ensureDeferred(self._a_get_items_from_cache(
462 service, node, max_items, item_ids, sub_id, extra, profile_key 462 service, node, max_items, item_ids, sub_id, extra, profile_key
463 )) 463 ))
464 d.addCallback(self._p.transItemsData) 464 d.addCallback(self._p.trans_items_data)
465 d.addCallback(self._p.serialiseItems) 465 d.addCallback(self._p.serialise_items)
466 return d 466 return d
467 467
468 async def _aGetItemsFromCache( 468 async def _a_get_items_from_cache(
469 self, service, node, max_items, item_ids, sub_id, extra, profile_key 469 self, service, node, max_items, item_ids, sub_id, extra, profile_key
470 ): 470 ):
471 client = self.host.getClient(profile_key) 471 client = self.host.get_client(profile_key)
472 service = jid.JID(service) if service else client.jid.userhostJID() 472 service = jid.JID(service) if service else client.jid.userhostJID()
473 pubsub_node = await self.host.memory.storage.getPubsubNode( 473 pubsub_node = await self.host.memory.storage.get_pubsub_node(
474 client, service, node 474 client, service, node
475 ) 475 )
476 if pubsub_node is None: 476 if pubsub_node is None:
477 raise exceptions.NotFound( 477 raise exceptions.NotFound(
478 f"{node!r} at {service} doesn't exist in cache for {client.profile!r}" 478 f"{node!r} at {service} doesn't exist in cache for {client.profile!r}"
479 ) 479 )
480 max_items = None if max_items == C.NO_LIMIT else max_items 480 max_items = None if max_items == C.NO_LIMIT else max_items
481 extra = self._p.parseExtra(data_format.deserialise(extra)) 481 extra = self._p.parse_extra(data_format.deserialise(extra))
482 items, metadata = await self.getItemsFromCache( 482 items, metadata = await self.get_items_from_cache(
483 client, 483 client,
484 pubsub_node, 484 pubsub_node,
485 max_items, 485 max_items,
486 item_ids, 486 item_ids,
487 sub_id or None, 487 sub_id or None,
488 extra.rsm_request, 488 extra.rsm_request,
489 extra.extra, 489 extra.extra,
490 ) 490 )
491 return [i.data for i in items], metadata 491 return [i.data for i in items], metadata
492 492
493 async def getItemsFromCache( 493 async def get_items_from_cache(
494 self, 494 self,
495 client: SatXMPPEntity, 495 client: SatXMPPEntity,
496 node: PubsubNode, 496 node: PubsubNode,
497 max_items: Optional[int] = None, 497 max_items: Optional[int] = None,
498 item_ids: Optional[List[str]] = None, 498 item_ids: Optional[List[str]] = None,
505 extra = {} 505 extra = {}
506 if "mam" in extra: 506 if "mam" in extra:
507 raise NotImplementedError("MAM queries are not supported yet") 507 raise NotImplementedError("MAM queries are not supported yet")
508 if max_items is None and rsm_request is None: 508 if max_items is None and rsm_request is None:
509 max_items = 20 509 max_items = 20
510 pubsub_items, metadata = await self.host.memory.storage.getItems( 510 pubsub_items, metadata = await self.host.memory.storage.get_items(
511 node, max_items=max_items, item_ids=item_ids or None, 511 node, max_items=max_items, item_ids=item_ids or None,
512 order_by=extra.get(C.KEY_ORDER_BY) 512 order_by=extra.get(C.KEY_ORDER_BY)
513 ) 513 )
514 elif max_items is not None: 514 elif max_items is not None:
515 if rsm_request is not None: 515 if rsm_request is not None:
518 ) 518 )
519 elif item_ids: 519 elif item_ids:
520 raise exceptions.InternalError( 520 raise exceptions.InternalError(
521 "Pubsub max items and item IDs must not be used at the same time" 521 "Pubsub max items and item IDs must not be used at the same time"
522 ) 522 )
523 pubsub_items, metadata = await self.host.memory.storage.getItems( 523 pubsub_items, metadata = await self.host.memory.storage.get_items(
524 node, max_items=max_items, order_by=extra.get(C.KEY_ORDER_BY) 524 node, max_items=max_items, order_by=extra.get(C.KEY_ORDER_BY)
525 ) 525 )
526 else: 526 else:
527 desc = False 527 desc = False
528 if rsm_request.before == "": 528 if rsm_request.before == "":
529 before = None 529 before = None
530 desc = True 530 desc = True
531 else: 531 else:
532 before = rsm_request.before 532 before = rsm_request.before
533 pubsub_items, metadata = await self.host.memory.storage.getItems( 533 pubsub_items, metadata = await self.host.memory.storage.get_items(
534 node, max_items=rsm_request.max, before=before, after=rsm_request.after, 534 node, max_items=rsm_request.max, before=before, after=rsm_request.after,
535 from_index=rsm_request.index, order_by=extra.get(C.KEY_ORDER_BY), 535 from_index=rsm_request.index, order_by=extra.get(C.KEY_ORDER_BY),
536 desc=desc, force_rsm=True, 536 desc=desc, force_rsm=True,
537 ) 537 )
538 538
539 return pubsub_items, metadata 539 return pubsub_items, metadata
540 540
541 async def onItemsEvent(self, client, event): 541 async def on_items_event(self, client, event):
542 node = await self.host.memory.storage.getPubsubNode( 542 node = await self.host.memory.storage.get_pubsub_node(
543 client, event.sender, event.nodeIdentifier 543 client, event.sender, event.nodeIdentifier
544 ) 544 )
545 if node is None: 545 if node is None:
546 return 546 return
547 if node.sync_state in (SyncState.COMPLETED, SyncState.IN_PROGRESS): 547 if node.sync_state in (SyncState.COMPLETED, SyncState.IN_PROGRESS):
553 elif elt.name == "retract": 553 elif elt.name == "retract":
554 item_id = elt.getAttribute("id") 554 item_id = elt.getAttribute("id")
555 if not item_id: 555 if not item_id:
556 log.warning( 556 log.warning(
557 "Ignoring invalid retract item element: " 557 "Ignoring invalid retract item element: "
558 f"{xml_tools.pFmtElt(elt)}" 558 f"{xml_tools.p_fmt_elt(elt)}"
559 ) 559 )
560 continue 560 continue
561 561
562 retract_ids.append(elt["id"]) 562 retract_ids.append(elt["id"])
563 else: 563 else:
564 log.warning( 564 log.warning(
565 f"Unexpected Pubsub event element: {xml_tools.pFmtElt(elt)}" 565 f"Unexpected Pubsub event element: {xml_tools.p_fmt_elt(elt)}"
566 ) 566 )
567 if items: 567 if items:
568 log.debug(f"[{client.profile}] caching new items received from {node}") 568 log.debug(f"[{client.profile}] caching new items received from {node}")
569 await self.cacheItems( 569 await self.cache_items(
570 client, node, items 570 client, node, items
571 ) 571 )
572 if retract_ids: 572 if retract_ids:
573 log.debug(f"deleting retracted items from {node}") 573 log.debug(f"deleting retracted items from {node}")
574 await self.host.memory.storage.deletePubsubItems( 574 await self.host.memory.storage.delete_pubsub_items(
575 node, items_names=retract_ids 575 node, items_names=retract_ids
576 ) 576 )
577 577
578 async def onDeleteEvent(self, client, event): 578 async def on_delete_event(self, client, event):
579 log.debug( 579 log.debug(
580 f"deleting node {event.nodeIdentifier} from {event.sender} for " 580 f"deleting node {event.nodeIdentifier} from {event.sender} for "
581 f"{client.profile}" 581 f"{client.profile}"
582 ) 582 )
583 await self.host.memory.storage.deletePubsubNode( 583 await self.host.memory.storage.delete_pubsub_node(
584 [client.profile], [event.sender], [event.nodeIdentifier] 584 [client.profile], [event.sender], [event.nodeIdentifier]
585 ) 585 )
586 586
587 async def onPurgeEvent(self, client, event): 587 async def on_purge_event(self, client, event):
588 node = await self.host.memory.storage.getPubsubNode( 588 node = await self.host.memory.storage.get_pubsub_node(
589 client, event.sender, event.nodeIdentifier 589 client, event.sender, event.nodeIdentifier
590 ) 590 )
591 if node is None: 591 if node is None:
592 return 592 return
593 log.debug(f"purging node {node} for {client.profile}") 593 log.debug(f"purging node {node} for {client.profile}")
594 await self.host.memory.storage.deletePubsubItems(node) 594 await self.host.memory.storage.delete_pubsub_items(node)
595 595
596 async def _getItemsTrigger( 596 async def _get_items_trigger(
597 self, 597 self,
598 client: SatXMPPEntity, 598 client: SatXMPPEntity,
599 service: Optional[jid.JID], 599 service: Optional[jid.JID],
600 node: str, 600 node: str,
601 max_items: Optional[int], 601 max_items: Optional[int],
611 log.debug("skipping pubsub cache as requested") 611 log.debug("skipping pubsub cache as requested")
612 return True, None 612 return True, None
613 if service is None: 613 if service is None:
614 service = client.jid.userhostJID() 614 service = client.jid.userhostJID()
615 for __ in range(5): 615 for __ in range(5):
616 pubsub_node = await self.host.memory.storage.getPubsubNode( 616 pubsub_node = await self.host.memory.storage.get_pubsub_node(
617 client, service, node 617 client, service, node
618 ) 618 )
619 if pubsub_node is not None and pubsub_node.sync_state == SyncState.COMPLETED: 619 if pubsub_node is not None and pubsub_node.sync_state == SyncState.COMPLETED:
620 analyse = {"to_sync": True} 620 analyse = {"to_sync": True}
621 else: 621 else:
622 analyse = await self.analyseNode(client, service, node) 622 analyse = await self.analyse_node(client, service, node)
623 623
624 if pubsub_node is None: 624 if pubsub_node is None:
625 try: 625 try:
626 pubsub_node = await self.host.memory.storage.setPubsubNode( 626 pubsub_node = await self.host.memory.storage.set_pubsub_node(
627 client, 627 client,
628 service, 628 service,
629 node, 629 node,
630 analyser=analyse.get("name"), 630 analyser=analyse.get("name"),
631 type_=analyse.get("type"), 631 type_=analyse.get("type"),
648 if analyse.get("to_sync"): 648 if analyse.get("to_sync"):
649 if pubsub_node.sync_state == SyncState.COMPLETED: 649 if pubsub_node.sync_state == SyncState.COMPLETED:
650 if "mam" in extra: 650 if "mam" in extra:
651 log.debug("MAM caching is not supported yet, skipping cache") 651 log.debug("MAM caching is not supported yet, skipping cache")
652 return True, None 652 return True, None
653 pubsub_items, metadata = await self.getItemsFromCache( 653 pubsub_items, metadata = await self.get_items_from_cache(
654 client, pubsub_node, max_items, item_ids, sub_id, rsm_request, extra 654 client, pubsub_node, max_items, item_ids, sub_id, rsm_request, extra
655 ) 655 )
656 return False, ([i.data for i in pubsub_items], metadata) 656 return False, ([i.data for i in pubsub_items], metadata)
657 657
658 if pubsub_node.sync_state == SyncState.IN_PROGRESS: 658 if pubsub_node.sync_state == SyncState.IN_PROGRESS:
661 f"{pubsub_node} is reported as being cached, but not caching is " 661 f"{pubsub_node} is reported as being cached, but not caching is "
662 "in progress, this is most probably due to the backend being " 662 "in progress, this is most probably due to the backend being "
663 "restarted. Resetting the status, caching will be done again." 663 "restarted. Resetting the status, caching will be done again."
664 ) 664 )
665 pubsub_node.sync_state = None 665 pubsub_node.sync_state = None
666 await self.host.memory.storage.deletePubsubItems(pubsub_node) 666 await self.host.memory.storage.delete_pubsub_items(pubsub_node)
667 elif time.time() - pubsub_node.sync_state_updated > PROGRESS_DEADLINE: 667 elif time.time() - pubsub_node.sync_state_updated > PROGRESS_DEADLINE:
668 log.warning( 668 log.warning(
669 f"{pubsub_node} is in progress for too long " 669 f"{pubsub_node} is in progress for too long "
670 f"({pubsub_node.sync_state_updated//60} minutes), " 670 f"({pubsub_node.sync_state_updated//60} minutes), "
671 "cancelling it and retrying." 671 "cancelling it and retrying."
672 ) 672 )
673 self.in_progress.pop[(service, node)].cancel() 673 self.in_progress.pop[(service, node)].cancel()
674 pubsub_node.sync_state = None 674 pubsub_node.sync_state = None
675 await self.host.memory.storage.deletePubsubItems(pubsub_node) 675 await self.host.memory.storage.delete_pubsub_items(pubsub_node)
676 else: 676 else:
677 log.debug( 677 log.debug(
678 f"{pubsub_node} synchronisation is already in progress, skipping" 678 f"{pubsub_node} synchronisation is already in progress, skipping"
679 ) 679 )
680 if pubsub_node.sync_state is None: 680 if pubsub_node.sync_state is None:
682 if key in self.in_progress: 682 if key in self.in_progress:
683 raise exceptions.InternalError( 683 raise exceptions.InternalError(
684 f"There is already a caching in progress for {pubsub_node}, this " 684 f"There is already a caching in progress for {pubsub_node}, this "
685 "should not happen" 685 "should not happen"
686 ) 686 )
687 self.cacheNode(client, pubsub_node) 687 self.cache_node(client, pubsub_node)
688 elif pubsub_node.sync_state == SyncState.ERROR: 688 elif pubsub_node.sync_state == SyncState.ERROR:
689 log.debug( 689 log.debug(
690 f"{pubsub_node} synchronisation has previously failed, skipping" 690 f"{pubsub_node} synchronisation has previously failed, skipping"
691 ) 691 )
692 692
693 return True, None 693 return True, None
694 694
695 async def _subscribeTrigger( 695 async def _subscribe_trigger(
696 self, 696 self,
697 client: SatXMPPEntity, 697 client: SatXMPPEntity,
698 service: jid.JID, 698 service: jid.JID,
699 nodeIdentifier: str, 699 nodeIdentifier: str,
700 sub_jid: Optional[jid.JID], 700 sub_jid: Optional[jid.JID],
701 options: Optional[dict], 701 options: Optional[dict],
702 subscription: pubsub.Subscription 702 subscription: pubsub.Subscription
703 ) -> None: 703 ) -> None:
704 pass 704 pass
705 705
706 async def _unsubscribeTrigger( 706 async def _unsubscribe_trigger(
707 self, 707 self,
708 client: SatXMPPEntity, 708 client: SatXMPPEntity,
709 service: jid.JID, 709 service: jid.JID,
710 nodeIdentifier: str, 710 nodeIdentifier: str,
711 sub_jid, 711 sub_jid,
713 sender, 713 sender,
714 ) -> None: 714 ) -> None:
715 pass 715 pass
716 716
717 def _synchronise(self, service, node, profile_key): 717 def _synchronise(self, service, node, profile_key):
718 client = self.host.getClient(profile_key) 718 client = self.host.get_client(profile_key)
719 service = client.jid.userhostJID() if not service else jid.JID(service) 719 service = client.jid.userhostJID() if not service else jid.JID(service)
720 return defer.ensureDeferred(self.synchronise(client, service, node)) 720 return defer.ensureDeferred(self.synchronise(client, service, node))
721 721
722 async def synchronise( 722 async def synchronise(
723 self, 723 self,
733 Note that when a node is synchronised, it is automatically subscribed. 733 Note that when a node is synchronised, it is automatically subscribed.
734 @param resync: if True and the node is already synchronised, it will be 734 @param resync: if True and the node is already synchronised, it will be
735 resynchronised (all items will be deleted and re-downloaded). 735 resynchronised (all items will be deleted and re-downloaded).
736 736
737 """ 737 """
738 pubsub_node = await self.host.memory.storage.getPubsubNode( 738 pubsub_node = await self.host.memory.storage.get_pubsub_node(
739 client, service, node 739 client, service, node
740 ) 740 )
741 if pubsub_node is None: 741 if pubsub_node is None:
742 log.info( 742 log.info(
743 _( 743 _(
744 "Synchronising the new node {node} at {service}" 744 "Synchronising the new node {node} at {service}"
745 ).format(node=node, service=service.full) 745 ).format(node=node, service=service.full)
746 ) 746 )
747 analyse = await self.analyseNode(client, service, node) 747 analyse = await self.analyse_node(client, service, node)
748 pubsub_node = await self.host.memory.storage.setPubsubNode( 748 pubsub_node = await self.host.memory.storage.set_pubsub_node(
749 client, 749 client,
750 service, 750 service,
751 node, 751 node,
752 analyser=analyse.get("name"), 752 analyser=analyse.get("name"),
753 type_=analyse.get("type"), 753 type_=analyse.get("type"),
770 "(Re)Synchronising the node {node} at {service} on user request" 770 "(Re)Synchronising the node {node} at {service} on user request"
771 ).format(node=node, service=service.full()) 771 ).format(node=node, service=service.full())
772 ) 772 )
773 # we first delete and recreate the node (will also delete its items) 773 # we first delete and recreate the node (will also delete its items)
774 await self.host.memory.storage.delete(pubsub_node) 774 await self.host.memory.storage.delete(pubsub_node)
775 analyse = await self.analyseNode(client, service, node) 775 analyse = await self.analyse_node(client, service, node)
776 pubsub_node = await self.host.memory.storage.setPubsubNode( 776 pubsub_node = await self.host.memory.storage.set_pubsub_node(
777 client, 777 client,
778 service, 778 service,
779 node, 779 node,
780 analyser=analyse.get("name"), 780 analyser=analyse.get("name"),
781 type_=analyse.get("type"), 781 type_=analyse.get("type"),
782 ) 782 )
783 # then we can put node in cache 783 # then we can put node in cache
784 await self.cacheNode(client, pubsub_node) 784 await self.cache_node(client, pubsub_node)
785 785
786 async def purge(self, purge_filters: dict) -> None: 786 async def purge(self, purge_filters: dict) -> None:
787 """Remove items according to filters 787 """Remove items according to filters
788 788
789 filters can have on of the following keys, all are optional: 789 filters can have on of the following keys, all are optional:
802 datetime before which items must have been created to be deleted 802 datetime before which items must have been created to be deleted
803 :created_update: 803 :created_update:
804 datetime before which items must have been updated last to be deleted 804 datetime before which items must have been updated last to be deleted
805 """ 805 """
806 purge_filters["names"] = purge_filters.pop("nodes", None) 806 purge_filters["names"] = purge_filters.pop("nodes", None)
807 await self.host.memory.storage.purgePubsubItems(**purge_filters) 807 await self.host.memory.storage.purge_pubsub_items(**purge_filters)
808 808
809 def _purge(self, purge_filters: str) -> None: 809 def _purge(self, purge_filters: str) -> None:
810 purge_filters = data_format.deserialise(purge_filters) 810 purge_filters = data_format.deserialise(purge_filters)
811 for key in "created_before", "updated_before": 811 for key in "created_before", "updated_before":
812 try: 812 try:
818 async def reset(self) -> None: 818 async def reset(self) -> None:
819 """Remove ALL nodes and items from cache 819 """Remove ALL nodes and items from cache
820 820
821 After calling this method, cache will be refilled progressively as if it where new 821 After calling this method, cache will be refilled progressively as if it where new
822 """ 822 """
823 await self.host.memory.storage.deletePubsubNode(None, None, None) 823 await self.host.memory.storage.delete_pubsub_node(None, None, None)
824 824
825 def _reset(self) -> defer.Deferred: 825 def _reset(self) -> defer.Deferred:
826 return defer.ensureDeferred(self.reset()) 826 return defer.ensureDeferred(self.reset())
827 827
828 async def search(self, query: dict) -> List[PubsubItem]: 828 async def search(self, query: dict) -> List[PubsubItem]:
829 """Search pubsub items in cache""" 829 """Search pubsub items in cache"""
830 return await self.host.memory.storage.searchPubsubItems(query) 830 return await self.host.memory.storage.search_pubsub_items(query)
831 831
832 async def serialisableSearch(self, query: dict) -> List[dict]: 832 async def serialisable_search(self, query: dict) -> List[dict]:
833 """Search pubsub items in cache and returns parsed data 833 """Search pubsub items in cache and returns parsed data
834 834
835 The returned data can be serialised. 835 The returned data can be serialised.
836 836
837 "pubsub_service" and "pubsub_name" will be added to each data (both as strings) 837 "pubsub_service" and "pubsub_name" will be added to each data (both as strings)
842 parsed = item.parsed 842 parsed = item.parsed
843 parsed["pubsub_service"] = item.node.service.full() 843 parsed["pubsub_service"] = item.node.service.full()
844 parsed["pubsub_node"] = item.node.name 844 parsed["pubsub_node"] = item.node.name
845 if query.get("with_payload"): 845 if query.get("with_payload"):
846 parsed["item_payload"] = item.data.toXml() 846 parsed["item_payload"] = item.data.toXml()
847 parsed["node_profile"] = self.host.memory.storage.getProfileById( 847 parsed["node_profile"] = self.host.memory.storage.get_profile_by_id(
848 item.node.profile_id 848 item.node.profile_id
849 ) 849 )
850 850
851 ret.append(parsed) 851 ret.append(parsed)
852 return ret 852 return ret
854 def _search(self, query: str) -> defer.Deferred: 854 def _search(self, query: str) -> defer.Deferred:
855 query = data_format.deserialise(query) 855 query = data_format.deserialise(query)
856 services = query.get("services") 856 services = query.get("services")
857 if services: 857 if services:
858 query["services"] = [jid.JID(s) for s in services] 858 query["services"] = [jid.JID(s) for s in services]
859 d = defer.ensureDeferred(self.serialisableSearch(query)) 859 d = defer.ensureDeferred(self.serialisable_search(query))
860 d.addCallback(data_format.serialise) 860 d.addCallback(data_format.serialise)
861 return d 861 return d