libervia-backend: comparison of sat/plugins/plugin_pubsub_cache.py @ 4037:524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Historically, Libervia (SàT at the time) used camelCase, as PEP 8 allows for code bases
that predate it, in order to follow the same coding style as Twisted.
However, snake_case is more readable and it is better to follow PEP 8 best practices, so
it was decided to move to full snake_case. Because Libervia has a huge codebase, the
partial transition had ended up as an ugly mix of camelCase and snake_case.
To fix that, this patch does a big refactoring, renaming every function and method
(including bridge methods) that does not come from Twisted or Wokkel to full snake_case.
This is a massive change, and may result in some bugs.
author: Goffi <goffi@goffi.org>
date: Sat, 08 Apr 2023 13:54:42 +0200
parents: 036188fff714
children: (none)
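As an illustration of the rename rule only (this is a sketch, not the tooling actually used for the patch), the camelCase to snake_case conversion can be expressed with two regular expressions:

import re

def camel_to_snake(name: str) -> str:
    # Insert "_" before each new camelCase word, then lowercase:
    # "getItemsFromCache" -> "get_items_from_cache"
    partial = re.sub(r"(.)([A-Z][a-z]+)", r"\1_\2", name)
    return re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", partial).lower()

assert camel_to_snake("getItemsFromCache") == "get_items_from_cache"
assert camel_to_snake("psCacheGet") == "ps_cache_get"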
old: 4036:c4464d7ae97b | new: 4037:524856bd7b19
63 # situation. | 63 # situation. |
64 # TODO: handle configuration events | 64 # TODO: handle configuration events |
65 | 65 |
66 def __init__(self, host): | 66 def __init__(self, host): |
67 log.info(_("PubSub Cache initialization")) | 67 log.info(_("PubSub Cache initialization")) |
68 strategy = host.memory.getConfig(None, "pubsub_cache_strategy") | 68 strategy = host.memory.config_get(None, "pubsub_cache_strategy") |
69 if strategy == "no_cache": | 69 if strategy == "no_cache": |
70 log.info( | 70 log.info( |
71 _( | 71 _( |
72 "Pubsub cache won't be used due to pubsub_cache_strategy={value} " | 72 "Pubsub cache won't be used due to pubsub_cache_strategy={value} " |
73 "setting." | 73 "setting." |
79 self.host = host | 79 self.host = host |
80 self._p = host.plugins["XEP-0060"] | 80 self._p = host.plugins["XEP-0060"] |
81 self.analysers = {} | 81 self.analysers = {} |
82 # map for caching in progress (node, service) => Deferred | 82 # map for caching in progress (node, service) => Deferred |
83 self.in_progress = {} | 83 self.in_progress = {} |
84 self.host.trigger.add("XEP-0060_getItems", self._getItemsTrigger) | 84 self.host.trigger.add("XEP-0060_getItems", self._get_items_trigger) |
85 self._p.addManagedNode( | 85 self._p.add_managed_node( |
86 "", | 86 "", |
87 items_cb=self.onItemsEvent, | 87 items_cb=self.on_items_event, |
88 delete_cb=self.onDeleteEvent, | 88 delete_cb=self.on_delete_event, |
89 purge_db=self.onPurgeEvent, | 89 purge_db=self.on_purge_event, |
90 ) | 90 ) |
91 host.bridge.addMethod( | 91 host.bridge.add_method( |
92 "psCacheGet", | 92 "ps_cache_get", |
93 ".plugin", | 93 ".plugin", |
94 in_sign="ssiassss", | 94 in_sign="ssiassss", |
95 out_sign="s", | 95 out_sign="s", |
96 method=self._getItemsFromCache, | 96 method=self._get_items_from_cache, |
97 async_=True, | 97 async_=True, |
98 ) | 98 ) |
99 host.bridge.addMethod( | 99 host.bridge.add_method( |
100 "psCacheSync", | 100 "ps_cache_sync", |
101 ".plugin", | 101 ".plugin", |
102 "sss", | 102 "sss", |
103 out_sign="", | 103 out_sign="", |
104 method=self._synchronise, | 104 method=self._synchronise, |
105 async_=True, | 105 async_=True, |
106 ) | 106 ) |
107 host.bridge.addMethod( | 107 host.bridge.add_method( |
108 "psCachePurge", | 108 "ps_cache_purge", |
109 ".plugin", | 109 ".plugin", |
110 "s", | 110 "s", |
111 out_sign="", | 111 out_sign="", |
112 method=self._purge, | 112 method=self._purge, |
113 async_=True, | 113 async_=True, |
114 ) | 114 ) |
115 host.bridge.addMethod( | 115 host.bridge.add_method( |
116 "psCacheReset", | 116 "ps_cache_reset", |
117 ".plugin", | 117 ".plugin", |
118 "", | 118 "", |
119 out_sign="", | 119 out_sign="", |
120 method=self._reset, | 120 method=self._reset, |
121 async_=True, | 121 async_=True, |
122 ) | 122 ) |
123 host.bridge.addMethod( | 123 host.bridge.add_method( |
124 "psCacheSearch", | 124 "ps_cache_search", |
125 ".plugin", | 125 ".plugin", |
126 "s", | 126 "s", |
127 out_sign="s", | 127 out_sign="s", |
128 method=self._search, | 128 method=self._search, |
129 async_=True, | 129 async_=True, |
130 ) | 130 ) |
131 | 131 |
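A note on the registrations above: in_sign and out_sign are D-Bus type signatures ("s" string, "i" int32, "a" array prefix), so the refactoring changes bridge method names only, not their wire types. A throwaway decoder for the simple subset used here, for illustration only:

DBUS_BASIC = {"s": "string", "i": "int32", "b": "boolean", "d": "double"}

def decode_signature(sig: str) -> list:
    types, i = [], 0
    while i < len(sig):
        if sig[i] == "a":  # array of the following type, e.g. "as"
            types.append("array<%s>" % DBUS_BASIC[sig[i + 1]])
            i += 2
        else:
            types.append(DBUS_BASIC[sig[i]])
            i += 1
    return types

# ps_cache_get's in_sign:
# ['string', 'string', 'int32', 'array<string>', 'string', 'string', 'string']
print(decode_signature("ssiassss"))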
132 def registerAnalyser(self, analyser: dict) -> None: | 132 def register_analyser(self, analyser: dict) -> None: |
133 """Register a new pubsub node analyser | 133 """Register a new pubsub node analyser |
134 | 134 |
135 @param analyser: An analyser is a dictionary which may have the following keys | 135 @param analyser: An analyser is a dictionary which may have the following keys |
136 (keys with a ``*`` are mandatory, at least one of ``node`` or ``namespace`` keys | 136 (keys with a ``*`` are mandatory, at least one of ``node`` or ``namespace`` keys |
137 must be used): | 137 must be used): |
201 raise exceptions.Conflict( | 201 raise exceptions.Conflict( |
202 f"An analyser with the name {name!r} is already registered" | 202 f"An analyser with the name {name!r} is already registered" |
203 ) | 203 ) |
204 self.analysers[name] = analyser | 204 self.analysers[name] = analyser |
205 | 205 |
206 async def cacheItems( | 206 async def cache_items( |
207 self, | 207 self, |
208 client: SatXMPPEntity, | 208 client: SatXMPPEntity, |
209 pubsub_node: PubsubNode, | 209 pubsub_node: PubsubNode, |
210 items: List[domish.Element] | 210 items: List[domish.Element] |
211 ) -> None: | 211 ) -> None: |
214 except KeyError: | 214 except KeyError: |
215 parser = None | 215 parser = None |
216 | 216 |
217 if parser is not None: | 217 if parser is not None: |
218 parsed_items = [ | 218 parsed_items = [ |
219 await utils.asDeferred( | 219 await utils.as_deferred( |
220 parser, | 220 parser, |
221 client, | 221 client, |
222 item, | 222 item, |
223 pubsub_node.service, | 223 pubsub_node.service, |
224 pubsub_node.name | 224 pubsub_node.name |
226 for item in items | 226 for item in items |
227 ] | 227 ] |
228 else: | 228 else: |
229 parsed_items = None | 229 parsed_items = None |
230 | 230 |
231 await self.host.memory.storage.cachePubsubItems( | 231 await self.host.memory.storage.cache_pubsub_items( |
232 client, pubsub_node, items, parsed_items | 232 client, pubsub_node, items, parsed_items |
233 ) | 233 ) |
234 | 234 |
235 async def _cacheNode( | 235 async def _cache_node( |
236 self, | 236 self, |
237 client: SatXMPPEntity, | 237 client: SatXMPPEntity, |
238 pubsub_node: PubsubNode | 238 pubsub_node: PubsubNode |
239 ) -> None: | 239 ) -> None: |
240 await self.host.memory.storage.updatePubsubNodeSyncState( | 240 await self.host.memory.storage.update_pubsub_node_sync_state( |
241 pubsub_node, SyncState.IN_PROGRESS | 241 pubsub_node, SyncState.IN_PROGRESS |
242 ) | 242 ) |
243 service, node = pubsub_node.service, pubsub_node.name | 243 service, node = pubsub_node.service, pubsub_node.name |
244 try: | 244 try: |
245 log.debug( | 245 log.debug( |
272 "{state}" | 272 "{state}" |
273 ).format(pubsub_node=pubsub_node, state=sub.state) | 273 ).format(pubsub_node=pubsub_node, state=sub.state) |
274 ) | 274 ) |
275 | 275 |
276 try: | 276 try: |
277 await self.host.checkFeatures( | 277 await self.host.check_features( |
278 client, [rsm.NS_RSM, self._p.DISCO_RSM], pubsub_node.service | 278 client, [rsm.NS_RSM, self._p.DISCO_RSM], pubsub_node.service |
279 ) | 279 ) |
280 except error.StanzaError as e: | 280 except error.StanzaError as e: |
281 if e.condition == "service-unavailable": | 281 if e.condition == "service-unavailable": |
282 log.warning( | 282 log.warning( |
284 "latest 20 items" | 284 "latest 20 items" |
285 ) | 285 ) |
286 items, __ = await client.pubsub_client.items( | 286 items, __ = await client.pubsub_client.items( |
287 pubsub_node.service, pubsub_node.name, maxItems=20 | 287 pubsub_node.service, pubsub_node.name, maxItems=20 |
288 ) | 288 ) |
289 await self.cacheItems( | 289 await self.cache_items( |
290 client, pubsub_node, items | 290 client, pubsub_node, items |
291 ) | 291 ) |
292 else: | 292 else: |
293 raise e | 293 raise e |
294 except exceptions.FeatureNotFound: | 294 except exceptions.FeatureNotFound: |
297 "(XEP-0059), we'll only cache latest 20 items" | 297 "(XEP-0059), we'll only cache latest 20 items" |
298 ) | 298 ) |
299 items, __ = await client.pubsub_client.items( | 299 items, __ = await client.pubsub_client.items( |
300 pubsub_node.service, pubsub_node.name, maxItems=20 | 300 pubsub_node.service, pubsub_node.name, maxItems=20 |
301 ) | 301 ) |
302 await self.cacheItems( | 302 await self.cache_items( |
303 client, pubsub_node, items | 303 client, pubsub_node, items |
304 ) | 304 ) |
305 else: | 305 else: |
306 rsm_p = self.host.plugins["XEP-0059"] | 306 rsm_p = self.host.plugins["XEP-0059"] |
307 rsm_request = rsm.RSMRequest() | 307 rsm_request = rsm.RSMRequest() |
308 cached_ids = set() | 308 cached_ids = set() |
309 while True: | 309 while True: |
310 items, rsm_response = await client.pubsub_client.items( | 310 items, rsm_response = await client.pubsub_client.items( |
311 service, node, rsm_request=rsm_request | 311 service, node, rsm_request=rsm_request |
312 ) | 312 ) |
313 await self.cacheItems( | 313 await self.cache_items( |
314 client, pubsub_node, items | 314 client, pubsub_node, items |
315 ) | 315 ) |
316 for item in items: | 316 for item in items: |
317 item_id = item["id"] | 317 item_id = item["id"] |
318 if item_id in cached_ids: | 318 if item_id in cached_ids: |
332 f"than the cache limit ({CACHE_LIMIT}). We stop " | 332 f"than the cache limit ({CACHE_LIMIT}). We stop " |
333 "caching here, at item {item['id']!r}." | 333 "caching here, at item {item['id']!r}." |
334 ) | 334 ) |
335 rsm_request = None | 335 rsm_request = None |
336 break | 336 break |
337 rsm_request = rsm_p.getNextRequest(rsm_request, rsm_response) | 337 rsm_request = rsm_p.get_next_request(rsm_request, rsm_response) |
338 if rsm_request is None: | 338 if rsm_request is None: |
339 break | 339 break |
340 | 340 |
341 await self.host.memory.storage.updatePubsubNodeSyncState( | 341 await self.host.memory.storage.update_pubsub_node_sync_state( |
342 pubsub_node, SyncState.COMPLETED | 342 pubsub_node, SyncState.COMPLETED |
343 ) | 343 ) |
344 except Exception as e: | 344 except Exception as e: |
345 import traceback | 345 import traceback |
346 tb = traceback.format_tb(e.__traceback__) | 346 tb = traceback.format_tb(e.__traceback__) |
347 log.error( | 347 log.error( |
348 f"Can't cache node {node!r} at {service} for {client.profile}: {e}\n{tb}" | 348 f"Can't cache node {node!r} at {service} for {client.profile}: {e}\n{tb}" |
349 ) | 349 ) |
350 await self.host.memory.storage.updatePubsubNodeSyncState( | 350 await self.host.memory.storage.update_pubsub_node_sync_state( |
351 pubsub_node, SyncState.ERROR | 351 pubsub_node, SyncState.ERROR |
352 ) | 352 ) |
353 await self.host.memory.storage.deletePubsubItems(pubsub_node) | 353 await self.host.memory.storage.delete_pubsub_items(pubsub_node) |
354 raise e | 354 raise e |
355 | 355 |
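The while loop above is a classic RSM pagination walk: fetch a page, cache it, and stop when an item id repeats (a guard against looping services), when the cache limit is hit, or when no next request can be built. A self-contained toy version of that control flow (fetch_page and its data are made up for the example):

import asyncio
from typing import Optional

PAGES = [["a", "b"], ["c", "d"], []]  # fake pages returned by a service

async def fetch_page(index: int) -> tuple:
    # Hypothetical stand-in for client.pubsub_client.items(): returns the
    # page's item ids and the next page "request" (None when exhausted).
    items = PAGES[index]
    return items, (index + 1 if items else None)

async def walk() -> list:
    cached, seen = [], set()
    page: Optional[int] = 0
    while page is not None:
        items, page = await fetch_page(page)
        if any(item_id in seen for item_id in items):
            break  # repeated id: the service looped back, stop here
        seen.update(items)
        cached.extend(items)
    return cached

print(asyncio.run(walk()))  # ['a', 'b', 'c', 'd']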
356 def _cacheNodeClean(self, __, pubsub_node): | 356 def _cache_node_clean(self, __, pubsub_node): |
357 del self.in_progress[(pubsub_node.service, pubsub_node.name)] | 357 del self.in_progress[(pubsub_node.service, pubsub_node.name)] |
358 | 358 |
359 def cacheNode( | 359 def cache_node( |
360 self, | 360 self, |
361 client: SatXMPPEntity, | 361 client: SatXMPPEntity, |
362 pubsub_node: PubsubNode | 362 pubsub_node: PubsubNode |
363 ) -> None: | 363 ) -> None: |
364 """Launch node caching as a background task""" | 364 """Launch node caching as a background task""" |
365 d = defer.ensureDeferred(self._cacheNode(client, pubsub_node)) | 365 d = defer.ensureDeferred(self._cache_node(client, pubsub_node)) |
366 d.addBoth(self._cacheNodeClean, pubsub_node=pubsub_node) | 366 d.addBoth(self._cache_node_clean, pubsub_node=pubsub_node) |
367 self.in_progress[(pubsub_node.service, pubsub_node.name)] = d | 367 self.in_progress[(pubsub_node.service, pubsub_node.name)] = d |
368 return d | 368 return d |
369 | 369 |
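cache_node uses the usual Twisted idiom for running a coroutine as a background task with guaranteed cleanup: defer.ensureDeferred schedules it, addBoth runs the cleanup callback on success and failure alike, and the Deferred is kept so a later caller can await or cancel it. A standalone sketch of the pattern (names are illustrative, not Libervia's):

from twisted.internet import defer

in_progress = {}

async def do_work(key):
    ...  # long-running work, e.g. caching a node

def launch(key):
    d = defer.ensureDeferred(do_work(key))
    # Runs whether do_work succeeds or fails, like _cache_node_clean.
    d.addBoth(lambda _: in_progress.pop(key, None))
    in_progress[key] = d
    return d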
370 async def analyseNode( | 370 async def analyse_node( |
371 self, | 371 self, |
372 client: SatXMPPEntity, | 372 client: SatXMPPEntity, |
373 service: jid.JID, | 373 service: jid.JID, |
374 node: str, | 374 node: str, |
375 pubsub_node: Optional[PubsubNode] = None, | 375 pubsub_node: Optional[PubsubNode] = None, |
449 try: | 449 try: |
450 match_cb = analyser["match_cb"] | 450 match_cb = analyser["match_cb"] |
451 except KeyError: | 451 except KeyError: |
452 pass | 452 pass |
453 else: | 453 else: |
454 await utils.asDeferred(match_cb, client, analyse) | 454 await utils.as_deferred(match_cb, client, analyse) |
455 return analyse | 455 return analyse |
456 | 456 |
457 def _getItemsFromCache( | 457 def _get_items_from_cache( |
458 self, service="", node="", max_items=10, item_ids=None, sub_id=None, | 458 self, service="", node="", max_items=10, item_ids=None, sub_id=None, |
459 extra="", profile_key=C.PROF_KEY_NONE | 459 extra="", profile_key=C.PROF_KEY_NONE |
460 ): | 460 ): |
461 d = defer.ensureDeferred(self._aGetItemsFromCache( | 461 d = defer.ensureDeferred(self._a_get_items_from_cache( |
462 service, node, max_items, item_ids, sub_id, extra, profile_key | 462 service, node, max_items, item_ids, sub_id, extra, profile_key |
463 )) | 463 )) |
464 d.addCallback(self._p.transItemsData) | 464 d.addCallback(self._p.trans_items_data) |
465 d.addCallback(self._p.serialiseItems) | 465 d.addCallback(self._p.serialise_items) |
466 return d | 466 return d |
467 | 467 |
468 async def _aGetItemsFromCache( | 468 async def _a_get_items_from_cache( |
469 self, service, node, max_items, item_ids, sub_id, extra, profile_key | 469 self, service, node, max_items, item_ids, sub_id, extra, profile_key |
470 ): | 470 ): |
471 client = self.host.getClient(profile_key) | 471 client = self.host.get_client(profile_key) |
472 service = jid.JID(service) if service else client.jid.userhostJID() | 472 service = jid.JID(service) if service else client.jid.userhostJID() |
473 pubsub_node = await self.host.memory.storage.getPubsubNode( | 473 pubsub_node = await self.host.memory.storage.get_pubsub_node( |
474 client, service, node | 474 client, service, node |
475 ) | 475 ) |
476 if pubsub_node is None: | 476 if pubsub_node is None: |
477 raise exceptions.NotFound( | 477 raise exceptions.NotFound( |
478 f"{node!r} at {service} doesn't exist in cache for {client.profile!r}" | 478 f"{node!r} at {service} doesn't exist in cache for {client.profile!r}" |
479 ) | 479 ) |
480 max_items = None if max_items == C.NO_LIMIT else max_items | 480 max_items = None if max_items == C.NO_LIMIT else max_items |
481 extra = self._p.parseExtra(data_format.deserialise(extra)) | 481 extra = self._p.parse_extra(data_format.deserialise(extra)) |
482 items, metadata = await self.getItemsFromCache( | 482 items, metadata = await self.get_items_from_cache( |
483 client, | 483 client, |
484 pubsub_node, | 484 pubsub_node, |
485 max_items, | 485 max_items, |
486 item_ids, | 486 item_ids, |
487 sub_id or None, | 487 sub_id or None, |
488 extra.rsm_request, | 488 extra.rsm_request, |
489 extra.extra, | 489 extra.extra, |
490 ) | 490 ) |
491 return [i.data for i in items], metadata | 491 return [i.data for i in items], metadata |
492 | 492 |
493 async def getItemsFromCache( | 493 async def get_items_from_cache( |
494 self, | 494 self, |
495 client: SatXMPPEntity, | 495 client: SatXMPPEntity, |
496 node: PubsubNode, | 496 node: PubsubNode, |
497 max_items: Optional[int] = None, | 497 max_items: Optional[int] = None, |
498 item_ids: Optional[List[str]] = None, | 498 item_ids: Optional[List[str]] = None, |
505 extra = {} | 505 extra = {} |
506 if "mam" in extra: | 506 if "mam" in extra: |
507 raise NotImplementedError("MAM queries are not supported yet") | 507 raise NotImplementedError("MAM queries are not supported yet") |
508 if max_items is None and rsm_request is None: | 508 if max_items is None and rsm_request is None: |
509 max_items = 20 | 509 max_items = 20 |
510 pubsub_items, metadata = await self.host.memory.storage.getItems( | 510 pubsub_items, metadata = await self.host.memory.storage.get_items( |
511 node, max_items=max_items, item_ids=item_ids or None, | 511 node, max_items=max_items, item_ids=item_ids or None, |
512 order_by=extra.get(C.KEY_ORDER_BY) | 512 order_by=extra.get(C.KEY_ORDER_BY) |
513 ) | 513 ) |
514 elif max_items is not None: | 514 elif max_items is not None: |
515 if rsm_request is not None: | 515 if rsm_request is not None: |
518 ) | 518 ) |
519 elif item_ids: | 519 elif item_ids: |
520 raise exceptions.InternalError( | 520 raise exceptions.InternalError( |
521 "Pubsub max items and item IDs must not be used at the same time" | 521 "Pubsub max items and item IDs must not be used at the same time" |
522 ) | 522 ) |
523 pubsub_items, metadata = await self.host.memory.storage.getItems( | 523 pubsub_items, metadata = await self.host.memory.storage.get_items( |
524 node, max_items=max_items, order_by=extra.get(C.KEY_ORDER_BY) | 524 node, max_items=max_items, order_by=extra.get(C.KEY_ORDER_BY) |
525 ) | 525 ) |
526 else: | 526 else: |
527 desc = False | 527 desc = False |
528 if rsm_request.before == "": | 528 if rsm_request.before == "": |
529 before = None | 529 before = None |
530 desc = True | 530 desc = True |
531 else: | 531 else: |
532 before = rsm_request.before | 532 before = rsm_request.before |
533 pubsub_items, metadata = await self.host.memory.storage.getItems( | 533 pubsub_items, metadata = await self.host.memory.storage.get_items( |
534 node, max_items=rsm_request.max, before=before, after=rsm_request.after, | 534 node, max_items=rsm_request.max, before=before, after=rsm_request.after, |
535 from_index=rsm_request.index, order_by=extra.get(C.KEY_ORDER_BY), | 535 from_index=rsm_request.index, order_by=extra.get(C.KEY_ORDER_BY), |
536 desc=desc, force_rsm=True, | 536 desc=desc, force_rsm=True, |
537 ) | 537 ) |
538 | 538 |
539 return pubsub_items, metadata | 539 return pubsub_items, metadata |
540 | 540 |
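The before == "" special case above comes from XEP-0059: an empty <before/> element requests the last page of results, which is why it flips the cache query to descending order. A hedged usage sketch (assuming wokkel's RSMRequest constructor, where the page size argument is spelled max_):

from wokkel import rsm

# Newest items first: an empty <before/> means "last page" in XEP-0059.
newest = rsm.RSMRequest(max_=20, before="")

# Page backwards from there: pass the id of the oldest item received.
older = rsm.RSMRequest(max_=20, before="oldest-item-id")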
541 async def onItemsEvent(self, client, event): | 541 async def on_items_event(self, client, event): |
542 node = await self.host.memory.storage.getPubsubNode( | 542 node = await self.host.memory.storage.get_pubsub_node( |
543 client, event.sender, event.nodeIdentifier | 543 client, event.sender, event.nodeIdentifier |
544 ) | 544 ) |
545 if node is None: | 545 if node is None: |
546 return | 546 return |
547 if node.sync_state in (SyncState.COMPLETED, SyncState.IN_PROGRESS): | 547 if node.sync_state in (SyncState.COMPLETED, SyncState.IN_PROGRESS): |
553 elif elt.name == "retract": | 553 elif elt.name == "retract": |
554 item_id = elt.getAttribute("id") | 554 item_id = elt.getAttribute("id") |
555 if not item_id: | 555 if not item_id: |
556 log.warning( | 556 log.warning( |
557 "Ignoring invalid retract item element: " | 557 "Ignoring invalid retract item element: " |
558 f"{xml_tools.pFmtElt(elt)}" | 558 f"{xml_tools.p_fmt_elt(elt)}" |
559 ) | 559 ) |
560 continue | 560 continue |
561 | 561 |
562 retract_ids.append(elt["id"]) | 562 retract_ids.append(elt["id"]) |
563 else: | 563 else: |
564 log.warning( | 564 log.warning( |
565 f"Unexpected Pubsub event element: {xml_tools.pFmtElt(elt)}" | 565 f"Unexpected Pubsub event element: {xml_tools.p_fmt_elt(elt)}" |
566 ) | 566 ) |
567 if items: | 567 if items: |
568 log.debug(f"[{client.profile}] caching new items received from {node}") | 568 log.debug(f"[{client.profile}] caching new items received from {node}") |
569 await self.cacheItems( | 569 await self.cache_items( |
570 client, node, items | 570 client, node, items |
571 ) | 571 ) |
572 if retract_ids: | 572 if retract_ids: |
573 log.debug(f"deleting retracted items from {node}") | 573 log.debug(f"deleting retracted items from {node}") |
574 await self.host.memory.storage.deletePubsubItems( | 574 await self.host.memory.storage.delete_pubsub_items( |
575 node, items_names=retract_ids | 575 node, items_names=retract_ids |
576 ) | 576 ) |
577 | 577 |
578 async def onDeleteEvent(self, client, event): | 578 async def on_delete_event(self, client, event): |
579 log.debug( | 579 log.debug( |
580 f"deleting node {event.nodeIdentifier} from {event.sender} for " | 580 f"deleting node {event.nodeIdentifier} from {event.sender} for " |
581 f"{client.profile}" | 581 f"{client.profile}" |
582 ) | 582 ) |
583 await self.host.memory.storage.deletePubsubNode( | 583 await self.host.memory.storage.delete_pubsub_node( |
584 [client.profile], [event.sender], [event.nodeIdentifier] | 584 [client.profile], [event.sender], [event.nodeIdentifier] |
585 ) | 585 ) |
586 | 586 |
587 async def onPurgeEvent(self, client, event): | 587 async def on_purge_event(self, client, event): |
588 node = await self.host.memory.storage.getPubsubNode( | 588 node = await self.host.memory.storage.get_pubsub_node( |
589 client, event.sender, event.nodeIdentifier | 589 client, event.sender, event.nodeIdentifier |
590 ) | 590 ) |
591 if node is None: | 591 if node is None: |
592 return | 592 return |
593 log.debug(f"purging node {node} for {client.profile}") | 593 log.debug(f"purging node {node} for {client.profile}") |
594 await self.host.memory.storage.deletePubsubItems(node) | 594 await self.host.memory.storage.delete_pubsub_items(node) |
595 | 595 |
596 async def _getItemsTrigger( | 596 async def _get_items_trigger( |
597 self, | 597 self, |
598 client: SatXMPPEntity, | 598 client: SatXMPPEntity, |
599 service: Optional[jid.JID], | 599 service: Optional[jid.JID], |
600 node: str, | 600 node: str, |
601 max_items: Optional[int], | 601 max_items: Optional[int], |
611 log.debug("skipping pubsub cache as requested") | 611 log.debug("skipping pubsub cache as requested") |
612 return True, None | 612 return True, None |
613 if service is None: | 613 if service is None: |
614 service = client.jid.userhostJID() | 614 service = client.jid.userhostJID() |
615 for __ in range(5): | 615 for __ in range(5): |
616 pubsub_node = await self.host.memory.storage.getPubsubNode( | 616 pubsub_node = await self.host.memory.storage.get_pubsub_node( |
617 client, service, node | 617 client, service, node |
618 ) | 618 ) |
619 if pubsub_node is not None and pubsub_node.sync_state == SyncState.COMPLETED: | 619 if pubsub_node is not None and pubsub_node.sync_state == SyncState.COMPLETED: |
620 analyse = {"to_sync": True} | 620 analyse = {"to_sync": True} |
621 else: | 621 else: |
622 analyse = await self.analyseNode(client, service, node) | 622 analyse = await self.analyse_node(client, service, node) |
623 | 623 |
624 if pubsub_node is None: | 624 if pubsub_node is None: |
625 try: | 625 try: |
626 pubsub_node = await self.host.memory.storage.setPubsubNode( | 626 pubsub_node = await self.host.memory.storage.set_pubsub_node( |
627 client, | 627 client, |
628 service, | 628 service, |
629 node, | 629 node, |
630 analyser=analyse.get("name"), | 630 analyser=analyse.get("name"), |
631 type_=analyse.get("type"), | 631 type_=analyse.get("type"), |
648 if analyse.get("to_sync"): | 648 if analyse.get("to_sync"): |
649 if pubsub_node.sync_state == SyncState.COMPLETED: | 649 if pubsub_node.sync_state == SyncState.COMPLETED: |
650 if "mam" in extra: | 650 if "mam" in extra: |
651 log.debug("MAM caching is not supported yet, skipping cache") | 651 log.debug("MAM caching is not supported yet, skipping cache") |
652 return True, None | 652 return True, None |
653 pubsub_items, metadata = await self.getItemsFromCache( | 653 pubsub_items, metadata = await self.get_items_from_cache( |
654 client, pubsub_node, max_items, item_ids, sub_id, rsm_request, extra | 654 client, pubsub_node, max_items, item_ids, sub_id, rsm_request, extra |
655 ) | 655 ) |
656 return False, ([i.data for i in pubsub_items], metadata) | 656 return False, ([i.data for i in pubsub_items], metadata) |
657 | 657 |
658 if pubsub_node.sync_state == SyncState.IN_PROGRESS: | 658 if pubsub_node.sync_state == SyncState.IN_PROGRESS: |
661 f"{pubsub_node} is reported as being cached, but not caching is " | 661 f"{pubsub_node} is reported as being cached, but not caching is " |
662 "in progress, this is most probably due to the backend being " | 662 "in progress, this is most probably due to the backend being " |
663 "restarted. Resetting the status, caching will be done again." | 663 "restarted. Resetting the status, caching will be done again." |
664 ) | 664 ) |
665 pubsub_node.sync_state = None | 665 pubsub_node.sync_state = None |
666 await self.host.memory.storage.deletePubsubItems(pubsub_node) | 666 await self.host.memory.storage.delete_pubsub_items(pubsub_node) |
667 elif time.time() - pubsub_node.sync_state_updated > PROGRESS_DEADLINE: | 667 elif time.time() - pubsub_node.sync_state_updated > PROGRESS_DEADLINE: |
668 log.warning( | 668 log.warning( |
669 f"{pubsub_node} is in progress for too long " | 669 f"{pubsub_node} is in progress for too long " |
670 f"({pubsub_node.sync_state_updated//60} minutes), " | 670 f"({pubsub_node.sync_state_updated//60} minutes), " |
671 "cancelling it and retrying." | 671 "cancelling it and retrying." |
672 ) | 672 ) |
673 self.in_progress.pop((service, node)).cancel() | 673 self.in_progress.pop((service, node)).cancel() |
674 pubsub_node.sync_state = None | 674 pubsub_node.sync_state = None |
675 await self.host.memory.storage.deletePubsubItems(pubsub_node) | 675 await self.host.memory.storage.delete_pubsub_items(pubsub_node) |
676 else: | 676 else: |
677 log.debug( | 677 log.debug( |
678 f"{pubsub_node} synchronisation is already in progress, skipping" | 678 f"{pubsub_node} synchronisation is already in progress, skipping" |
679 ) | 679 ) |
680 if pubsub_node.sync_state is None: | 680 if pubsub_node.sync_state is None: |
682 if key in self.in_progress: | 682 if key in self.in_progress: |
683 raise exceptions.InternalError( | 683 raise exceptions.InternalError( |
684 f"There is already a caching in progress for {pubsub_node}, this " | 684 f"There is already a caching in progress for {pubsub_node}, this " |
685 "should not happen" | 685 "should not happen" |
686 ) | 686 ) |
687 self.cacheNode(client, pubsub_node) | 687 self.cache_node(client, pubsub_node) |
688 elif pubsub_node.sync_state == SyncState.ERROR: | 688 elif pubsub_node.sync_state == SyncState.ERROR: |
689 log.debug( | 689 log.debug( |
690 f"{pubsub_node} synchronisation has previously failed, skipping" | 690 f"{pubsub_node} synchronisation has previously failed, skipping" |
691 ) | 691 ) |
692 | 692 |
693 return True, None | 693 return True, None |
694 | 694 |
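As the return statements above show, the XEP-0060_getItems trigger follows a (continue, result) contract: (True, None) lets the normal server request proceed, while (False, (items, metadata)) short-circuits it with cached data. A minimal consumer honouring that contract (stub code, not Libervia's actual trigger machinery):

import asyncio

async def cache_trigger(service, node):
    # Stand-in for _get_items_trigger: pretend the node is fully cached.
    return False, (["<item/>"], {"service": service, "node": node})

async def get_items(service, node):
    proceed, result = await cache_trigger(service, node)
    if not proceed:
        return result  # (items, metadata) served straight from the cache
    raise NotImplementedError("would query the XMPP server here")

print(asyncio.run(get_items("pubsub.example.org", "urn:test")))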
695 async def _subscribeTrigger( | 695 async def _subscribe_trigger( |
696 self, | 696 self, |
697 client: SatXMPPEntity, | 697 client: SatXMPPEntity, |
698 service: jid.JID, | 698 service: jid.JID, |
699 nodeIdentifier: str, | 699 nodeIdentifier: str, |
700 sub_jid: Optional[jid.JID], | 700 sub_jid: Optional[jid.JID], |
701 options: Optional[dict], | 701 options: Optional[dict], |
702 subscription: pubsub.Subscription | 702 subscription: pubsub.Subscription |
703 ) -> None: | 703 ) -> None: |
704 pass | 704 pass |
705 | 705 |
706 async def _unsubscribeTrigger( | 706 async def _unsubscribe_trigger( |
707 self, | 707 self, |
708 client: SatXMPPEntity, | 708 client: SatXMPPEntity, |
709 service: jid.JID, | 709 service: jid.JID, |
710 nodeIdentifier: str, | 710 nodeIdentifier: str, |
711 sub_jid, | 711 sub_jid, |
713 sender, | 713 sender, |
714 ) -> None: | 714 ) -> None: |
715 pass | 715 pass |
716 | 716 |
717 def _synchronise(self, service, node, profile_key): | 717 def _synchronise(self, service, node, profile_key): |
718 client = self.host.getClient(profile_key) | 718 client = self.host.get_client(profile_key) |
719 service = client.jid.userhostJID() if not service else jid.JID(service) | 719 service = client.jid.userhostJID() if not service else jid.JID(service) |
720 return defer.ensureDeferred(self.synchronise(client, service, node)) | 720 return defer.ensureDeferred(self.synchronise(client, service, node)) |
721 | 721 |
722 async def synchronise( | 722 async def synchronise( |
723 self, | 723 self, |
733 Note that when a node is synchronised, it is automatically subscribed. | 733 Note that when a node is synchronised, it is automatically subscribed. |
734 @param resync: if True and the node is already synchronised, it will be | 734 @param resync: if True and the node is already synchronised, it will be |
735 resynchronised (all items will be deleted and re-downloaded). | 735 resynchronised (all items will be deleted and re-downloaded). |
736 | 736 |
737 """ | 737 """ |
738 pubsub_node = await self.host.memory.storage.getPubsubNode( | 738 pubsub_node = await self.host.memory.storage.get_pubsub_node( |
739 client, service, node | 739 client, service, node |
740 ) | 740 ) |
741 if pubsub_node is None: | 741 if pubsub_node is None: |
742 log.info( | 742 log.info( |
743 _( | 743 _( |
744 "Synchronising the new node {node} at {service}" | 744 "Synchronising the new node {node} at {service}" |
745 ).format(node=node, service=service.full()) | 745 ).format(node=node, service=service.full()) |
746 ) | 746 ) |
747 analyse = await self.analyseNode(client, service, node) | 747 analyse = await self.analyse_node(client, service, node) |
748 pubsub_node = await self.host.memory.storage.setPubsubNode( | 748 pubsub_node = await self.host.memory.storage.set_pubsub_node( |
749 client, | 749 client, |
750 service, | 750 service, |
751 node, | 751 node, |
752 analyser=analyse.get("name"), | 752 analyser=analyse.get("name"), |
753 type_=analyse.get("type"), | 753 type_=analyse.get("type"), |
770 "(Re)Synchronising the node {node} at {service} on user request" | 770 "(Re)Synchronising the node {node} at {service} on user request" |
771 ).format(node=node, service=service.full()) | 771 ).format(node=node, service=service.full()) |
772 ) | 772 ) |
773 # we first delete and recreate the node (will also delete its items) | 773 # we first delete and recreate the node (will also delete its items) |
774 await self.host.memory.storage.delete(pubsub_node) | 774 await self.host.memory.storage.delete(pubsub_node) |
775 analyse = await self.analyseNode(client, service, node) | 775 analyse = await self.analyse_node(client, service, node) |
776 pubsub_node = await self.host.memory.storage.setPubsubNode( | 776 pubsub_node = await self.host.memory.storage.set_pubsub_node( |
777 client, | 777 client, |
778 service, | 778 service, |
779 node, | 779 node, |
780 analyser=analyse.get("name"), | 780 analyser=analyse.get("name"), |
781 type_=analyse.get("type"), | 781 type_=analyse.get("type"), |
782 ) | 782 ) |
783 # then we can put node in cache | 783 # then we can put node in cache |
784 await self.cacheNode(client, pubsub_node) | 784 await self.cache_node(client, pubsub_node) |
785 | 785 |
786 async def purge(self, purge_filters: dict) -> None: | 786 async def purge(self, purge_filters: dict) -> None: |
787 """Remove items according to filters | 787 """Remove items according to filters |
788 | 788 |
789 filters can have any of the following keys, all are optional: | 789 filters can have any of the following keys, all are optional: |
802 datetime before which items must have been created to be deleted | 802 datetime before which items must have been created to be deleted |
803 :updated_before: | 803 :updated_before: |
804 datetime before which items must have been updated last to be deleted | 804 datetime before which items must have been updated last to be deleted |
805 """ | 805 """ |
806 purge_filters["names"] = purge_filters.pop("nodes", None) | 806 purge_filters["names"] = purge_filters.pop("nodes", None) |
807 await self.host.memory.storage.purgePubsubItems(**purge_filters) | 807 await self.host.memory.storage.purge_pubsub_items(**purge_filters) |
808 | 808 |
809 def _purge(self, purge_filters: str) -> None: | 809 def _purge(self, purge_filters: str) -> None: |
810 purge_filters = data_format.deserialise(purge_filters) | 810 purge_filters = data_format.deserialise(purge_filters) |
811 for key in "created_before", "updated_before": | 811 for key in "created_before", "updated_before": |
812 try: | 812 try: |
818 async def reset(self) -> None: | 818 async def reset(self) -> None: |
819 """Remove ALL nodes and items from cache | 819 """Remove ALL nodes and items from cache |
820 | 820 |
821 After calling this method, the cache will be refilled progressively as if it were new | 821 After calling this method, the cache will be refilled progressively as if it were new |
822 """ | 822 """ |
823 await self.host.memory.storage.deletePubsubNode(None, None, None) | 823 await self.host.memory.storage.delete_pubsub_node(None, None, None) |
824 | 824 |
825 def _reset(self) -> defer.Deferred: | 825 def _reset(self) -> defer.Deferred: |
826 return defer.ensureDeferred(self.reset()) | 826 return defer.ensureDeferred(self.reset()) |
827 | 827 |
828 async def search(self, query: dict) -> List[PubsubItem]: | 828 async def search(self, query: dict) -> List[PubsubItem]: |
829 """Search pubsub items in cache""" | 829 """Search pubsub items in cache""" |
830 return await self.host.memory.storage.searchPubsubItems(query) | 830 return await self.host.memory.storage.search_pubsub_items(query) |
831 | 831 |
832 async def serialisableSearch(self, query: dict) -> List[dict]: | 832 async def serialisable_search(self, query: dict) -> List[dict]: |
833 """Search pubsub items in cache and returns parsed data | 833 """Search pubsub items in cache and returns parsed data |
834 | 834 |
835 The returned data can be serialised. | 835 The returned data can be serialised. |
836 | 836 |
837 "pubsub_service" and "pubsub_name" will be added to each data (both as strings) | 837 "pubsub_service" and "pubsub_name" will be added to each data (both as strings) |
842 parsed = item.parsed | 842 parsed = item.parsed |
843 parsed["pubsub_service"] = item.node.service.full() | 843 parsed["pubsub_service"] = item.node.service.full() |
844 parsed["pubsub_node"] = item.node.name | 844 parsed["pubsub_node"] = item.node.name |
845 if query.get("with_payload"): | 845 if query.get("with_payload"): |
846 parsed["item_payload"] = item.data.toXml() | 846 parsed["item_payload"] = item.data.toXml() |
847 parsed["node_profile"] = self.host.memory.storage.getProfileById( | 847 parsed["node_profile"] = self.host.memory.storage.get_profile_by_id( |
848 item.node.profile_id | 848 item.node.profile_id |
849 ) | 849 ) |
850 | 850 |
851 ret.append(parsed) | 851 ret.append(parsed) |
852 return ret | 852 return ret |
854 def _search(self, query: str) -> defer.Deferred: | 854 def _search(self, query: str) -> defer.Deferred: |
855 query = data_format.deserialise(query) | 855 query = data_format.deserialise(query) |
856 services = query.get("services") | 856 services = query.get("services") |
857 if services: | 857 if services: |
858 query["services"] = [jid.JID(s) for s in services] | 858 query["services"] = [jid.JID(s) for s in services] |
859 d = defer.ensureDeferred(self.serialisableSearch(query)) | 859 d = defer.ensureDeferred(self.serialisable_search(query)) |
860 d.addCallback(data_format.serialise) | 860 d.addCallback(data_format.serialise) |
861 return d | 861 return d |