libervia-backend: comparison of libervia/backend/plugins/plugin_pubsub_cache.py @ 4270:0d7bb4df2343
Reformatted code base using black.
author:   Goffi <goffi@goffi.org>
date:     Wed, 19 Jun 2024 18:44:57 +0200
parents:  ba46d6a0ff3a
children: (none)
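This changeset is purely mechanical: black reparses each file and reprints it with its own layout rules, leaving behaviour untouched. As a minimal sketch, assuming black's default mode (the project's actual invocation and line-length configuration are not shown on this page), the same kind of transformation can be reproduced through black's Python API:

```python
# Minimal sketch, assuming black's default mode; the project's real
# configuration (e.g. line length) may differ. format_str() parses the
# source and reprints it with black's rules, like running `black` on a file.
import black

SRC = """\
log.debug(
    f"Caching node {node!r} at {service} for {client.profile}"
)
raise ValueError(f'at least one of {type_test_keys} must be used')
"""

# Short calls are collapsed onto a single line and single quotes are
# normalised to double quotes, matching the hunks shown below.
print(black.format_str(SRC, mode=black.Mode()))
```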
comparing 4269:64a85ce8be70 (parent) with 4270:0d7bb4df2343 (this changeset):
```diff
@@ -53,11 +53,10 @@
 CACHE_LIMIT = 5000
 # number of second before a progress caching is considered failed and tried again
 PROGRESS_DEADLINE = 60 * 60 * 6
 
 
-
 class PubsubCache:
     # TODO: there is currently no notification for (un)subscribe events with XEP-0060,
     # but it would be necessary to have this data if some devices unsubscribe a cached
     # node, as we can then get out of sync. A protoXEP could be proposed to fix this
     # situation.
@@ -194,59 +193,46 @@
             raise ValueError('"name" is mandatory in analyser')
         if "type" not in analyser:
             raise ValueError('"type" is mandatory in analyser')
         type_test_keys = {"node", "namespace"}
         if not type_test_keys.intersection(analyser):
-            raise ValueError(f'at least one of {type_test_keys} must be used')
+            raise ValueError(f"at least one of {type_test_keys} must be used")
         if name in self.analysers:
             raise exceptions.Conflict(
                 f"An analyser with the name {name!r} is already registered"
             )
         self.analysers[name] = analyser
 
     async def cache_items(
-        self,
-        client: SatXMPPEntity,
-        pubsub_node: PubsubNode,
-        items: List[domish.Element]
+        self, client: SatXMPPEntity, pubsub_node: PubsubNode, items: List[domish.Element]
     ) -> None:
         try:
             parser = self.analysers[pubsub_node.analyser].get("parser")
         except KeyError:
             parser = None
 
         if parser is not None:
             parsed_items = [
                 await utils.as_deferred(
-                    parser,
-                    client,
-                    item,
-                    pubsub_node.service,
-                    pubsub_node.name
+                    parser, client, item, pubsub_node.service, pubsub_node.name
                 )
                 for item in items
             ]
         else:
             parsed_items = None
 
         await self.host.memory.storage.cache_pubsub_items(
             client, pubsub_node, items, parsed_items
         )
 
-    async def _cache_node(
-        self,
-        client: SatXMPPEntity,
-        pubsub_node: PubsubNode
-    ) -> None:
+    async def _cache_node(self, client: SatXMPPEntity, pubsub_node: PubsubNode) -> None:
         await self.host.memory.storage.update_pubsub_node_sync_state(
             pubsub_node, SyncState.IN_PROGRESS
         )
         service, node = pubsub_node.service, pubsub_node.name
         try:
-            log.debug(
-                f"Caching node {node!r} at {service} for {client.profile}"
-            )
+            log.debug(f"Caching node {node!r} at {service} for {client.profile}")
             if not pubsub_node.subscribed:
                 try:
                     sub = await self._p.subscribe(client, service, node)
                 except Exception as e:
                     log.warning(
@@ -284,37 +270,31 @@
                         "latest 20 items"
                     )
                     items, __ = await client.pubsub_client.items(
                         pubsub_node.service, pubsub_node.name, maxItems=20
                     )
-                    await self.cache_items(
-                        client, pubsub_node, items
-                    )
+                    await self.cache_items(client, pubsub_node, items)
                 else:
                     raise e
             except exceptions.FeatureNotFound:
                 log.warning(
                     f"service {service} doesn't handle Result Set Management "
                     "(XEP-0059), we'll only cache latest 20 items"
                 )
                 items, __ = await client.pubsub_client.items(
                     pubsub_node.service, pubsub_node.name, maxItems=20
                 )
-                await self.cache_items(
-                    client, pubsub_node, items
-                )
+                await self.cache_items(client, pubsub_node, items)
             else:
                 rsm_p = self.host.plugins["XEP-0059"]
                 rsm_request = rsm.RSMRequest()
                 cached_ids = set()
                 while True:
                     items, rsm_response = await client.pubsub_client.items(
                         service, node, rsm_request=rsm_request
                     )
-                    await self.cache_items(
-                        client, pubsub_node, items
-                    )
+                    await self.cache_items(client, pubsub_node, items)
                     for item in items:
                         item_id = item["id"]
                         if item_id in cached_ids:
                             log.warning(
                                 f"Pubsub node {node!r} at {service} is returning several "
@@ -341,10 +321,11 @@
             await self.host.memory.storage.update_pubsub_node_sync_state(
                 pubsub_node, SyncState.COMPLETED
             )
         except Exception as e:
             import traceback
+
             tb = traceback.format_tb(e.__traceback__)
             log.error(
                 f"Can't cache node {node!r} at {service} for {client.profile}: {e}\n{tb}"
             )
             await self.host.memory.storage.update_pubsub_node_sync_state(
@@ -354,15 +335,11 @@
             raise e
 
     def _cache_node_clean(self, __, pubsub_node):
         del self.in_progress[(pubsub_node.service, pubsub_node.name)]
 
-    def cache_node(
-        self,
-        client: SatXMPPEntity,
-        pubsub_node: PubsubNode
-    ) -> None:
+    def cache_node(self, client: SatXMPPEntity, pubsub_node: PubsubNode) -> None:
         """Launch node caching as a background task"""
         d = defer.ensureDeferred(self._cache_node(client, pubsub_node))
         d.addBoth(self._cache_node_clean, pubsub_node=pubsub_node)
         self.in_progress[(pubsub_node.service, pubsub_node.name)] = d
         return d
@@ -370,19 +347,17 @@
     async def analyse_node(
         self,
         client: SatXMPPEntity,
         service: jid.JID,
         node: str,
-        pubsub_node : PubsubNode = None,
+        pubsub_node: PubsubNode = None,
     ) -> dict:
         """Use registered analysers on a node to determine what it is used for"""
         analyse = {"service": service, "node": node}
         if pubsub_node is None:
             try:
-                first_item = (await client.pubsub_client.items(
-                    service, node, 1
-                ))[0][0]
+                first_item = (await client.pubsub_client.items(service, node, 1))[0][0]
             except IndexError:
                 pass
             except error.StanzaError as e:
                 if e.condition == "item-not-found":
                     pass
@@ -440,16 +415,14 @@
                 found = True
                 break
 
         else:
             found = False
-            log.debug(
-                f"node {node!r} at service {service} doesn't match any known type"
-            )
+            log.debug(f"node {node!r} at service {service} doesn't match any known type")
         if found:
             try:
                 match_cb = analyser["match_cb"]
             except KeyError:
                 pass
             else:
                 await utils.as_deferred(match_cb, client, analyse)
         return analyse
@@ -456,13 +429,21 @@
 
     def _get_items_from_cache(
-        self, service="", node="", max_items=10, item_ids=None, sub_id=None,
-        extra="", profile_key=C.PROF_KEY_NONE
+        self,
+        service="",
+        node="",
+        max_items=10,
+        item_ids=None,
+        sub_id=None,
+        extra="",
+        profile_key=C.PROF_KEY_NONE,
     ):
-        d = defer.ensureDeferred(self._a_get_items_from_cache(
-            service, node, max_items, item_ids, sub_id, extra, profile_key
-        ))
+        d = defer.ensureDeferred(
+            self._a_get_items_from_cache(
+                service, node, max_items, item_ids, sub_id, extra, profile_key
+            )
+        )
         d.addCallback(self._p.trans_items_data)
         d.addCallback(self._p.serialise_items)
         return d
 
     async def _a_get_items_from_cache(
@@ -496,22 +477,24 @@
         node: PubsubNode,
         max_items: Optional[int] = None,
         item_ids: Optional[List[str]] = None,
         sub_id: Optional[str] = None,
         rsm_request: Optional[rsm.RSMRequest] = None,
-        extra: Optional[Dict[str, Any]] = None
+        extra: Optional[Dict[str, Any]] = None,
     ) -> Tuple[List[PubsubItem], dict]:
         """Get items from cache, using same arguments as for external Pubsub request"""
         if extra is None:
             extra = {}
         if "mam" in extra:
             raise NotImplementedError("MAM queries are not supported yet")
         if max_items is None and rsm_request is None:
             max_items = 20
             pubsub_items, metadata = await self.host.memory.storage.get_items(
-                node, max_items=max_items, item_ids=item_ids or None,
-                order_by=extra.get(C.KEY_ORDER_BY)
+                node,
+                max_items=max_items,
+                item_ids=item_ids or None,
+                order_by=extra.get(C.KEY_ORDER_BY),
             )
         elif max_items is not None:
             if rsm_request is not None:
                 raise exceptions.InternalError(
                     "Pubsub max items and RSM must not be used at the same time"
@@ -529,12 +512,17 @@
                 before = None
                 desc = True
             else:
                 before = rsm_request.before
             pubsub_items, metadata = await self.host.memory.storage.get_items(
-                node, max_items=rsm_request.max, before=before, after=rsm_request.after,
-                from_index=rsm_request.index, order_by=extra.get(C.KEY_ORDER_BY),
-                desc=desc, force_rsm=True,
+                node,
+                max_items=rsm_request.max,
+                before=before,
+                after=rsm_request.after,
+                from_index=rsm_request.index,
+                order_by=extra.get(C.KEY_ORDER_BY),
+                desc=desc,
+                force_rsm=True,
             )
 
         return pubsub_items, metadata
 
@@ -541 +529 @@
     async def on_items_event(self, client, event):
@@ -564,13 +552,11 @@
                     log.warning(
                         f"Unexpected Pubsub event element: {xml_tools.p_fmt_elt(elt)}"
                     )
             if items:
                 log.debug(f"[{client.profile}] caching new items received from {node}")
-                await self.cache_items(
-                    client, node, items
-                )
+                await self.cache_items(client, node, items)
             if retract_ids:
                 log.debug(f"deleting retracted items from {node}")
                 await self.host.memory.storage.delete_pubsub_items(
                     node, items_names=retract_ids
                 )
@@ -600,11 +586,11 @@
         node: str,
         max_items: Optional[int],
         item_ids: Optional[List[str]],
         sub_id: Optional[str],
         rsm_request: Optional[rsm.RSMRequest],
-        extra: dict
+        extra: dict,
     ) -> Tuple[bool, Optional[Tuple[List[dict], dict]]]:
         if not self.use_cache:
             log.debug("cache disabled in settings")
             return True, None
         if extra.get(C.KEY_USE_CACHE) == False:
@@ -697,11 +683,11 @@
         client: SatXMPPEntity,
         service: jid.JID,
         nodeIdentifier: str,
         sub_jid: Optional[jid.JID],
         options: Optional[dict],
-        subscription: pubsub.Subscription
+        subscription: pubsub.Subscription,
     ) -> None:
         pass
 
     async def _unsubscribe_trigger(
         self,
@@ -718,15 +704,11 @@
         client = self.host.get_client(profile_key)
         service = client.jid.userhostJID() if not service else jid.JID(service)
         return defer.ensureDeferred(self.synchronise(client, service, node))
 
     async def synchronise(
-        self,
-        client: SatXMPPEntity,
-        service: jid.JID,
-        node: str,
-        resync: bool = True
+        self, client: SatXMPPEntity, service: jid.JID, node: str, resync: bool = True
     ) -> None:
         """Synchronise a node with a pubsub service
 
         The node will be synchronised even if there is no matching analyser.
 
@@ -738,28 +720,30 @@
         pubsub_node = await self.host.memory.storage.get_pubsub_node(
             client, service, node
         )
         if pubsub_node is None:
             log.info(
-                _(
-                    "Synchronising the new node {node} at {service}"
-                ).format(node=node, service=service.full)
+                _("Synchronising the new node {node} at {service}").format(
+                    node=node, service=service.full
+                )
             )
             analyse = await self.analyse_node(client, service, node)
             pubsub_node = await self.host.memory.storage.set_pubsub_node(
                 client,
                 service,
                 node,
                 analyser=analyse.get("name"),
                 type_=analyse.get("type"),
             )
         elif not resync and pubsub_node.sync_state is not None:
             # the node exists, nothing to do
             return
 
-        if ((pubsub_node.sync_state == SyncState.IN_PROGRESS
-            or (service, node) in self.in_progress)):
+        if (
+            pubsub_node.sync_state == SyncState.IN_PROGRESS
+            or (service, node) in self.in_progress
+        ):
             log.warning(
                 _(
                     "{node} at {service} is already being synchronised, can't do a new "
                     "synchronisation."
                 ).format(node=node, service=service)
```
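A pattern that recurs throughout the hunks above is black's "magic trailing comma": a signature or call whose arguments fit within the configured line length is packed onto as few lines as possible (as with `cache_items`), while one that does not fit is exploded to one element per line with a trailing comma appended (as with `_get_items_from_cache`). A hypothetical example, not taken from this file:

```python
# Hypothetical illustration of the rule behind the exploded signatures above.


# Too long for one line: black puts one parameter per line and appends a
# trailing comma; on later runs that comma keeps the exploded layout stable.
def fetch(
    service="",
    node="",
    max_items=10,
    item_ids=None,
    sub_id=None,
    extra="",
    profile_key=None,
):
    ...


# Fits on one line: black collapses it and adds no trailing comma.
def fetch_one(service="", node=""):
    ...
```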