comparison libervia/backend/plugins/plugin_pubsub_cache.py @ 4270:0d7bb4df2343

Reformatted code base using black.
author Goffi <goffi@goffi.org>
date Wed, 19 Jun 2024 18:44:57 +0200
parents ba46d6a0ff3a
children
comparison
equal deleted inserted replaced
4269:64a85ce8be70 4270:0d7bb4df2343
53 CACHE_LIMIT = 5000 53 CACHE_LIMIT = 5000
54 # number of second before a progress caching is considered failed and tried again 54 # number of second before a progress caching is considered failed and tried again
55 PROGRESS_DEADLINE = 60 * 60 * 6 55 PROGRESS_DEADLINE = 60 * 60 * 6
56 56
57 57
58
59 class PubsubCache: 58 class PubsubCache:
60 # TODO: there is currently no notification for (un)subscribe events with XEP-0060, 59 # TODO: there is currently no notification for (un)subscribe events with XEP-0060,
61 # but it would be necessary to have this data if some devices unsubscribe a cached 60 # but it would be necessary to have this data if some devices unsubscribe a cached
62 # node, as we can then get out of sync. A protoXEP could be proposed to fix this 61 # node, as we can then get out of sync. A protoXEP could be proposed to fix this
63 # situation. 62 # situation.
194 raise ValueError('"name" is mandatory in analyser') 193 raise ValueError('"name" is mandatory in analyser')
195 if "type" not in analyser: 194 if "type" not in analyser:
196 raise ValueError('"type" is mandatory in analyser') 195 raise ValueError('"type" is mandatory in analyser')
197 type_test_keys = {"node", "namespace"} 196 type_test_keys = {"node", "namespace"}
198 if not type_test_keys.intersection(analyser): 197 if not type_test_keys.intersection(analyser):
199 raise ValueError(f'at least one of {type_test_keys} must be used') 198 raise ValueError(f"at least one of {type_test_keys} must be used")
200 if name in self.analysers: 199 if name in self.analysers:
201 raise exceptions.Conflict( 200 raise exceptions.Conflict(
202 f"An analyser with the name {name!r} is already registered" 201 f"An analyser with the name {name!r} is already registered"
203 ) 202 )
204 self.analysers[name] = analyser 203 self.analysers[name] = analyser
205 204
206 async def cache_items( 205 async def cache_items(
207 self, 206 self, client: SatXMPPEntity, pubsub_node: PubsubNode, items: List[domish.Element]
208 client: SatXMPPEntity,
209 pubsub_node: PubsubNode,
210 items: List[domish.Element]
211 ) -> None: 207 ) -> None:
212 try: 208 try:
213 parser = self.analysers[pubsub_node.analyser].get("parser") 209 parser = self.analysers[pubsub_node.analyser].get("parser")
214 except KeyError: 210 except KeyError:
215 parser = None 211 parser = None
216 212
217 if parser is not None: 213 if parser is not None:
218 parsed_items = [ 214 parsed_items = [
219 await utils.as_deferred( 215 await utils.as_deferred(
220 parser, 216 parser, client, item, pubsub_node.service, pubsub_node.name
221 client,
222 item,
223 pubsub_node.service,
224 pubsub_node.name
225 ) 217 )
226 for item in items 218 for item in items
227 ] 219 ]
228 else: 220 else:
229 parsed_items = None 221 parsed_items = None
230 222
231 await self.host.memory.storage.cache_pubsub_items( 223 await self.host.memory.storage.cache_pubsub_items(
232 client, pubsub_node, items, parsed_items 224 client, pubsub_node, items, parsed_items
233 ) 225 )
234 226
235 async def _cache_node( 227 async def _cache_node(self, client: SatXMPPEntity, pubsub_node: PubsubNode) -> None:
236 self,
237 client: SatXMPPEntity,
238 pubsub_node: PubsubNode
239 ) -> None:
240 await self.host.memory.storage.update_pubsub_node_sync_state( 228 await self.host.memory.storage.update_pubsub_node_sync_state(
241 pubsub_node, SyncState.IN_PROGRESS 229 pubsub_node, SyncState.IN_PROGRESS
242 ) 230 )
243 service, node = pubsub_node.service, pubsub_node.name 231 service, node = pubsub_node.service, pubsub_node.name
244 try: 232 try:
245 log.debug( 233 log.debug(f"Caching node {node!r} at {service} for {client.profile}")
246 f"Caching node {node!r} at {service} for {client.profile}"
247 )
248 if not pubsub_node.subscribed: 234 if not pubsub_node.subscribed:
249 try: 235 try:
250 sub = await self._p.subscribe(client, service, node) 236 sub = await self._p.subscribe(client, service, node)
251 except Exception as e: 237 except Exception as e:
252 log.warning( 238 log.warning(
284 "latest 20 items" 270 "latest 20 items"
285 ) 271 )
286 items, __ = await client.pubsub_client.items( 272 items, __ = await client.pubsub_client.items(
287 pubsub_node.service, pubsub_node.name, maxItems=20 273 pubsub_node.service, pubsub_node.name, maxItems=20
288 ) 274 )
289 await self.cache_items( 275 await self.cache_items(client, pubsub_node, items)
290 client, pubsub_node, items
291 )
292 else: 276 else:
293 raise e 277 raise e
294 except exceptions.FeatureNotFound: 278 except exceptions.FeatureNotFound:
295 log.warning( 279 log.warning(
296 f"service {service} doesn't handle Result Set Management " 280 f"service {service} doesn't handle Result Set Management "
297 "(XEP-0059), we'll only cache latest 20 items" 281 "(XEP-0059), we'll only cache latest 20 items"
298 ) 282 )
299 items, __ = await client.pubsub_client.items( 283 items, __ = await client.pubsub_client.items(
300 pubsub_node.service, pubsub_node.name, maxItems=20 284 pubsub_node.service, pubsub_node.name, maxItems=20
301 ) 285 )
302 await self.cache_items( 286 await self.cache_items(client, pubsub_node, items)
303 client, pubsub_node, items
304 )
305 else: 287 else:
306 rsm_p = self.host.plugins["XEP-0059"] 288 rsm_p = self.host.plugins["XEP-0059"]
307 rsm_request = rsm.RSMRequest() 289 rsm_request = rsm.RSMRequest()
308 cached_ids = set() 290 cached_ids = set()
309 while True: 291 while True:
310 items, rsm_response = await client.pubsub_client.items( 292 items, rsm_response = await client.pubsub_client.items(
311 service, node, rsm_request=rsm_request 293 service, node, rsm_request=rsm_request
312 ) 294 )
313 await self.cache_items( 295 await self.cache_items(client, pubsub_node, items)
314 client, pubsub_node, items
315 )
316 for item in items: 296 for item in items:
317 item_id = item["id"] 297 item_id = item["id"]
318 if item_id in cached_ids: 298 if item_id in cached_ids:
319 log.warning( 299 log.warning(
320 f"Pubsub node {node!r} at {service} is returning several " 300 f"Pubsub node {node!r} at {service} is returning several "
341 await self.host.memory.storage.update_pubsub_node_sync_state( 321 await self.host.memory.storage.update_pubsub_node_sync_state(
342 pubsub_node, SyncState.COMPLETED 322 pubsub_node, SyncState.COMPLETED
343 ) 323 )
344 except Exception as e: 324 except Exception as e:
345 import traceback 325 import traceback
326
346 tb = traceback.format_tb(e.__traceback__) 327 tb = traceback.format_tb(e.__traceback__)
347 log.error( 328 log.error(
348 f"Can't cache node {node!r} at {service} for {client.profile}: {e}\n{tb}" 329 f"Can't cache node {node!r} at {service} for {client.profile}: {e}\n{tb}"
349 ) 330 )
350 await self.host.memory.storage.update_pubsub_node_sync_state( 331 await self.host.memory.storage.update_pubsub_node_sync_state(
354 raise e 335 raise e
355 336
356 def _cache_node_clean(self, __, pubsub_node): 337 def _cache_node_clean(self, __, pubsub_node):
357 del self.in_progress[(pubsub_node.service, pubsub_node.name)] 338 del self.in_progress[(pubsub_node.service, pubsub_node.name)]
358 339
359 def cache_node( 340 def cache_node(self, client: SatXMPPEntity, pubsub_node: PubsubNode) -> None:
360 self,
361 client: SatXMPPEntity,
362 pubsub_node: PubsubNode
363 ) -> None:
364 """Launch node caching as a background task""" 341 """Launch node caching as a background task"""
365 d = defer.ensureDeferred(self._cache_node(client, pubsub_node)) 342 d = defer.ensureDeferred(self._cache_node(client, pubsub_node))
366 d.addBoth(self._cache_node_clean, pubsub_node=pubsub_node) 343 d.addBoth(self._cache_node_clean, pubsub_node=pubsub_node)
367 self.in_progress[(pubsub_node.service, pubsub_node.name)] = d 344 self.in_progress[(pubsub_node.service, pubsub_node.name)] = d
368 return d 345 return d
370 async def analyse_node( 347 async def analyse_node(
371 self, 348 self,
372 client: SatXMPPEntity, 349 client: SatXMPPEntity,
373 service: jid.JID, 350 service: jid.JID,
374 node: str, 351 node: str,
375 pubsub_node : PubsubNode = None, 352 pubsub_node: PubsubNode = None,
376 ) -> dict: 353 ) -> dict:
377 """Use registered analysers on a node to determine what it is used for""" 354 """Use registered analysers on a node to determine what it is used for"""
378 analyse = {"service": service, "node": node} 355 analyse = {"service": service, "node": node}
379 if pubsub_node is None: 356 if pubsub_node is None:
380 try: 357 try:
381 first_item = (await client.pubsub_client.items( 358 first_item = (await client.pubsub_client.items(service, node, 1))[0][0]
382 service, node, 1
383 ))[0][0]
384 except IndexError: 359 except IndexError:
385 pass 360 pass
386 except error.StanzaError as e: 361 except error.StanzaError as e:
387 if e.condition == "item-not-found": 362 if e.condition == "item-not-found":
388 pass 363 pass
440 found = True 415 found = True
441 break 416 break
442 417
443 else: 418 else:
444 found = False 419 found = False
445 log.debug( 420 log.debug(f"node {node!r} at service {service} doesn't match any known type")
446 f"node {node!r} at service {service} doesn't match any known type"
447 )
448 if found: 421 if found:
449 try: 422 try:
450 match_cb = analyser["match_cb"] 423 match_cb = analyser["match_cb"]
451 except KeyError: 424 except KeyError:
452 pass 425 pass
453 else: 426 else:
454 await utils.as_deferred(match_cb, client, analyse) 427 await utils.as_deferred(match_cb, client, analyse)
455 return analyse 428 return analyse
456 429
457 def _get_items_from_cache( 430 def _get_items_from_cache(
458 self, service="", node="", max_items=10, item_ids=None, sub_id=None, 431 self,
459 extra="", profile_key=C.PROF_KEY_NONE 432 service="",
433 node="",
434 max_items=10,
435 item_ids=None,
436 sub_id=None,
437 extra="",
438 profile_key=C.PROF_KEY_NONE,
460 ): 439 ):
461 d = defer.ensureDeferred(self._a_get_items_from_cache( 440 d = defer.ensureDeferred(
462 service, node, max_items, item_ids, sub_id, extra, profile_key 441 self._a_get_items_from_cache(
463 )) 442 service, node, max_items, item_ids, sub_id, extra, profile_key
443 )
444 )
464 d.addCallback(self._p.trans_items_data) 445 d.addCallback(self._p.trans_items_data)
465 d.addCallback(self._p.serialise_items) 446 d.addCallback(self._p.serialise_items)
466 return d 447 return d
467 448
468 async def _a_get_items_from_cache( 449 async def _a_get_items_from_cache(
496 node: PubsubNode, 477 node: PubsubNode,
497 max_items: Optional[int] = None, 478 max_items: Optional[int] = None,
498 item_ids: Optional[List[str]] = None, 479 item_ids: Optional[List[str]] = None,
499 sub_id: Optional[str] = None, 480 sub_id: Optional[str] = None,
500 rsm_request: Optional[rsm.RSMRequest] = None, 481 rsm_request: Optional[rsm.RSMRequest] = None,
501 extra: Optional[Dict[str, Any]] = None 482 extra: Optional[Dict[str, Any]] = None,
502 ) -> Tuple[List[PubsubItem], dict]: 483 ) -> Tuple[List[PubsubItem], dict]:
503 """Get items from cache, using same arguments as for external Pubsub request""" 484 """Get items from cache, using same arguments as for external Pubsub request"""
504 if extra is None: 485 if extra is None:
505 extra = {} 486 extra = {}
506 if "mam" in extra: 487 if "mam" in extra:
507 raise NotImplementedError("MAM queries are not supported yet") 488 raise NotImplementedError("MAM queries are not supported yet")
508 if max_items is None and rsm_request is None: 489 if max_items is None and rsm_request is None:
509 max_items = 20 490 max_items = 20
510 pubsub_items, metadata = await self.host.memory.storage.get_items( 491 pubsub_items, metadata = await self.host.memory.storage.get_items(
511 node, max_items=max_items, item_ids=item_ids or None, 492 node,
512 order_by=extra.get(C.KEY_ORDER_BY) 493 max_items=max_items,
494 item_ids=item_ids or None,
495 order_by=extra.get(C.KEY_ORDER_BY),
513 ) 496 )
514 elif max_items is not None: 497 elif max_items is not None:
515 if rsm_request is not None: 498 if rsm_request is not None:
516 raise exceptions.InternalError( 499 raise exceptions.InternalError(
517 "Pubsub max items and RSM must not be used at the same time" 500 "Pubsub max items and RSM must not be used at the same time"
529 before = None 512 before = None
530 desc = True 513 desc = True
531 else: 514 else:
532 before = rsm_request.before 515 before = rsm_request.before
533 pubsub_items, metadata = await self.host.memory.storage.get_items( 516 pubsub_items, metadata = await self.host.memory.storage.get_items(
534 node, max_items=rsm_request.max, before=before, after=rsm_request.after, 517 node,
535 from_index=rsm_request.index, order_by=extra.get(C.KEY_ORDER_BY), 518 max_items=rsm_request.max,
536 desc=desc, force_rsm=True, 519 before=before,
520 after=rsm_request.after,
521 from_index=rsm_request.index,
522 order_by=extra.get(C.KEY_ORDER_BY),
523 desc=desc,
524 force_rsm=True,
537 ) 525 )
538 526
539 return pubsub_items, metadata 527 return pubsub_items, metadata
540 528
541 async def on_items_event(self, client, event): 529 async def on_items_event(self, client, event):
564 log.warning( 552 log.warning(
565 f"Unexpected Pubsub event element: {xml_tools.p_fmt_elt(elt)}" 553 f"Unexpected Pubsub event element: {xml_tools.p_fmt_elt(elt)}"
566 ) 554 )
567 if items: 555 if items:
568 log.debug(f"[{client.profile}] caching new items received from {node}") 556 log.debug(f"[{client.profile}] caching new items received from {node}")
569 await self.cache_items( 557 await self.cache_items(client, node, items)
570 client, node, items
571 )
572 if retract_ids: 558 if retract_ids:
573 log.debug(f"deleting retracted items from {node}") 559 log.debug(f"deleting retracted items from {node}")
574 await self.host.memory.storage.delete_pubsub_items( 560 await self.host.memory.storage.delete_pubsub_items(
575 node, items_names=retract_ids 561 node, items_names=retract_ids
576 ) 562 )
600 node: str, 586 node: str,
601 max_items: Optional[int], 587 max_items: Optional[int],
602 item_ids: Optional[List[str]], 588 item_ids: Optional[List[str]],
603 sub_id: Optional[str], 589 sub_id: Optional[str],
604 rsm_request: Optional[rsm.RSMRequest], 590 rsm_request: Optional[rsm.RSMRequest],
605 extra: dict 591 extra: dict,
606 ) -> Tuple[bool, Optional[Tuple[List[dict], dict]]]: 592 ) -> Tuple[bool, Optional[Tuple[List[dict], dict]]]:
607 if not self.use_cache: 593 if not self.use_cache:
608 log.debug("cache disabled in settings") 594 log.debug("cache disabled in settings")
609 return True, None 595 return True, None
610 if extra.get(C.KEY_USE_CACHE) == False: 596 if extra.get(C.KEY_USE_CACHE) == False:
697 client: SatXMPPEntity, 683 client: SatXMPPEntity,
698 service: jid.JID, 684 service: jid.JID,
699 nodeIdentifier: str, 685 nodeIdentifier: str,
700 sub_jid: Optional[jid.JID], 686 sub_jid: Optional[jid.JID],
701 options: Optional[dict], 687 options: Optional[dict],
702 subscription: pubsub.Subscription 688 subscription: pubsub.Subscription,
703 ) -> None: 689 ) -> None:
704 pass 690 pass
705 691
706 async def _unsubscribe_trigger( 692 async def _unsubscribe_trigger(
707 self, 693 self,
718 client = self.host.get_client(profile_key) 704 client = self.host.get_client(profile_key)
719 service = client.jid.userhostJID() if not service else jid.JID(service) 705 service = client.jid.userhostJID() if not service else jid.JID(service)
720 return defer.ensureDeferred(self.synchronise(client, service, node)) 706 return defer.ensureDeferred(self.synchronise(client, service, node))
721 707
722 async def synchronise( 708 async def synchronise(
723 self, 709 self, client: SatXMPPEntity, service: jid.JID, node: str, resync: bool = True
724 client: SatXMPPEntity,
725 service: jid.JID,
726 node: str,
727 resync: bool = True
728 ) -> None: 710 ) -> None:
729 """Synchronise a node with a pubsub service 711 """Synchronise a node with a pubsub service
730 712
731 The node will be synchronised even if there is no matching analyser. 713 The node will be synchronised even if there is no matching analyser.
732 714
738 pubsub_node = await self.host.memory.storage.get_pubsub_node( 720 pubsub_node = await self.host.memory.storage.get_pubsub_node(
739 client, service, node 721 client, service, node
740 ) 722 )
741 if pubsub_node is None: 723 if pubsub_node is None:
742 log.info( 724 log.info(
743 _( 725 _("Synchronising the new node {node} at {service}").format(
744 "Synchronising the new node {node} at {service}" 726 node=node, service=service.full
745 ).format(node=node, service=service.full) 727 )
746 ) 728 )
747 analyse = await self.analyse_node(client, service, node) 729 analyse = await self.analyse_node(client, service, node)
748 pubsub_node = await self.host.memory.storage.set_pubsub_node( 730 pubsub_node = await self.host.memory.storage.set_pubsub_node(
749 client, 731 client,
750 service, 732 service,
751 node, 733 node,
752 analyser=analyse.get("name"), 734 analyser=analyse.get("name"),
753 type_=analyse.get("type"), 735 type_=analyse.get("type"),
754 ) 736 )
755 elif not resync and pubsub_node.sync_state is not None: 737 elif not resync and pubsub_node.sync_state is not None:
756 # the node exists, nothing to do 738 # the node exists, nothing to do
757 return 739 return
758 740
759 if ((pubsub_node.sync_state == SyncState.IN_PROGRESS 741 if (
760 or (service, node) in self.in_progress)): 742 pubsub_node.sync_state == SyncState.IN_PROGRESS
743 or (service, node) in self.in_progress
744 ):
761 log.warning( 745 log.warning(
762 _( 746 _(
763 "{node} at {service} is already being synchronised, can't do a new " 747 "{node} at {service} is already being synchronised, can't do a new "
764 "synchronisation." 748 "synchronisation."
765 ).format(node=node, service=service) 749 ).format(node=node, service=service)