sat/plugins/plugin_pubsub_cache.py @ 3597:5d108ce026d7

plugin pubsub cache: Pubsub Caching implementation
author Goffi <goffi@goffi.org>
date Thu, 29 Jul 2021 22:51:01 +0200
children 32181a45d54b
#!/usr/bin/env python3

# Libervia plugin for PubSub Caching
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import time
from datetime import datetime
from typing import Optional, List, Tuple
from twisted.words.protocols.jabber import jid, error
from twisted.words.xish import domish
from twisted.internet import defer
from wokkel import pubsub, rsm
from sat.core.i18n import _
from sat.core.constants import Const as C
from sat.core import exceptions
from sat.core.log import getLogger
from sat.core.core_types import SatXMPPEntity
from sat.tools import xml_tools, utils
from sat.tools.common import data_format
from sat.memory.sqla import PubsubNode, PubsubItem, SyncState


log = getLogger(__name__)

PLUGIN_INFO = {
    C.PI_NAME: "PubSub Cache",
    C.PI_IMPORT_NAME: "PUBSUB_CACHE",
    C.PI_TYPE: C.PLUG_TYPE_PUBSUB,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0059", "XEP-0060"],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "PubsubCache",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _("""Local Cache for PubSub"""),
}

ANALYSER_KEYS_TO_COPY = ("name", "type", "to_sync", "parser")
# maximum number of items to cache
CACHE_LIMIT = 5000
# number of seconds before an in-progress caching is considered failed and tried again
PROGRESS_DEADLINE = 60 * 60 * 6


class PubsubCache:
    # TODO: there is currently no notification for (un)subscribe events with XEP-0060,
    # but it would be necessary to have this data if some devices unsubscribe a cached
    # node, as we can then get out of sync. A protoXEP could be proposed to fix this
    # situation.
    # TODO: handle configuration events
    def __init__(self, host):
        log.info(_("PubSub Cache initialization"))
        strategy = host.memory.getConfig(None, "pubsub_cache_strategy")
        if strategy == "no_cache":
            log.info(
                _(
                    "Pubsub cache won't be used due to pubsub_cache_strategy={value} "
                    "setting."
                ).format(value=repr(strategy))
            )
            self.use_cache = False
        else:
            self.use_cache = True
        self.host = host
        self._p = host.plugins["XEP-0060"]
        self.analysers = {}
        # map of in-progress caching: (node, service) => Deferred
        self.in_progress = {}
        self.host.trigger.add("XEP-0060_getItems", self._getItemsTrigger)
        self._p.addManagedNode(
            "",
            items_cb=self.onItemsEvent,
            delete_cb=self.onDeleteEvent,
            purge_cb=self.onPurgeEvent,
        )
        host.bridge.addMethod(
            "psCacheGet",
            ".plugin",
            in_sign="ssiassss",
            out_sign="s",
            method=self._getItemsFromCache,
            async_=True,
        )
        host.bridge.addMethod(
            "psCacheSync",
            ".plugin",
            in_sign="sss",
            out_sign="",
            method=self._synchronise,
            async_=True,
        )
        host.bridge.addMethod(
            "psCachePurge",
            ".plugin",
            in_sign="s",
            out_sign="",
            method=self._purge,
            async_=True,
        )
        host.bridge.addMethod(
            "psCacheReset",
            ".plugin",
            in_sign="",
            out_sign="",
            method=self._reset,
            async_=True,
        )

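    # Usage sketch (frontend side; the exact bridge call style is an assumption,
    # it depends on the frontend): fetch the 10 latest cached items of a node:
    #
    #     items_s = await bridge.psCacheGet(
    #         "pubsub.example.org", "urn:xmpp:microblog:0", 10, [], "", "{}",
    #         profile,
    #     )
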
    def registerAnalyser(self, analyser: dict) -> None:
        """Register a new pubsub node analyser

        @param analyser: An analyser is a dictionary which may have the following keys
            (keys with a ``*`` are mandatory; at least one of the ``node`` or
            ``namespace`` keys must be used):

            :name (str)*:
                a unique name for this analyser. This name will be stored in database
                to retrieve the analyser when necessary (notably to get the parsing
                method), thus it is recommended to use a stable name such as the
                source plugin name instead of a name which may change with standard
                evolution, such as the feature namespace.

            :type (str)*:
                indicates what kind of items we are dealing with. Type must be a human
                readable word, as it may be used in searches. Good type examples are
                **blog** or **event**.

            :node (str):
                prefix of a node name which may be used to identify its type. Example:
                *urn:xmpp:microblog:0* (a node starting with this name will be
                identified as a *blog* node).

            :namespace (str):
                root namespace of items. When analysing a node, the first item will be
                retrieved. The analyser will be chosen if its given namespace matches
                the namespace of the first child element of the ``<item>`` element.

            :to_sync (bool):
                if True, the node must be synchronised in cache. The default False
                value means that the pubsub service will always be requested.

            :parser (callable):
                method (which may be sync, a coroutine or a method returning a
                "Deferred") to call to parse the ``domish.Element`` of the item. The
                result must be a dictionary which can be serialised to JSON.

                The method must have the following signature:

                .. function:: parser(client: SatXMPPEntity, item_elt: domish.Element, \
                    service: Optional[jid.JID], node: Optional[str]) \
                    -> dict
                    :noindex:

            :match_cb (callable):
                method (which may be sync, a coroutine or a method returning a
                "Deferred") called when the analyser matches. The method is called
                with the current analyse dict, which it can modify **in-place**.

                The method must have the following signature:

                .. function:: match_cb(client: SatXMPPEntity, analyse: dict) -> None
                    :noindex:

        @raise exceptions.Conflict: an analyser with this name already exists
        """

        name = analyser.get("name", "").strip().lower()
        # we want the normalised name
        analyser["name"] = name
        if not name:
            raise ValueError('"name" is mandatory in analyser')
        if "type" not in analyser:
            raise ValueError('"type" is mandatory in analyser')
        type_test_keys = {"node", "namespace"}
        if not type_test_keys.intersection(analyser):
            raise ValueError(f'at least one of {type_test_keys} must be used')
        if name in self.analysers:
            raise exceptions.Conflict(
                f"An analyser with the name {name!r} is already registered"
            )
        self.analysers[name] = analyser

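    # Illustrative sketch (names and values are hypothetical): a plugin handling
    # blog items could register an analyser like this at initialisation:
    #
    #     host.plugins["PUBSUB_CACHE"].registerAnalyser({
    #         "name": "XEP-0277",                # stable, unique name
    #         "type": "blog",                    # human-readable item type
    #         "node": "urn:xmpp:microblog:0",    # node name prefix to match
    #         "namespace": "http://www.w3.org/2005/Atom",
    #         "to_sync": True,
    #         "parser": self.item2data,          # hypothetical parser coroutine
    #     })
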
    async def cacheItems(
        self,
        client: SatXMPPEntity,
        pubsub_node: PubsubNode,
        items: List[domish.Element]
    ) -> None:
        try:
            parser = self.analysers[pubsub_node.analyser].get("parser")
        except KeyError:
            parser = None

        if parser is not None:
            parsed_items = [
                await utils.asDeferred(
                    parser,
                    client,
                    item,
                    pubsub_node.service,
                    pubsub_node.name
                )
                for item in items
            ]
        else:
            parsed_items = None

        await self.host.memory.storage.cachePubsubItems(
            client, pubsub_node, items, parsed_items
        )

    async def _cacheNode(
        self,
        client: SatXMPPEntity,
        pubsub_node: PubsubNode
    ) -> None:
        await self.host.memory.storage.updatePubsubNodeSyncState(
            pubsub_node, SyncState.IN_PROGRESS
        )
        service, node = pubsub_node.service, pubsub_node.name
        try:
            log.debug(
                f"Caching node {node!r} at {service} for {client.profile}"
            )
            if not pubsub_node.subscribed:
                try:
                    sub = await self._p.subscribe(client, service, node)
                except Exception as e:
                    log.warning(
                        _(
                            "Can't subscribe to node {pubsub_node}, which means that "
                            "synchronisation can't be maintained: {reason}"
                        ).format(pubsub_node=pubsub_node, reason=e)
                    )
                else:
                    if sub.state == "subscribed":
                        sub_id = sub.subscriptionIdentifier
                        log.debug(
                            f"{pubsub_node} subscribed (subscription id: {sub_id!r})"
                        )
                        pubsub_node.subscribed = True
                        await self.host.memory.storage.add(pubsub_node)
                    else:
                        log.warning(
                            _(
                                "{pubsub_node} is not subscribed, which means that "
                                "synchronisation can't be maintained, and you may "
                                "have to enforce subscription manually. Subscription "
                                "state: {state}"
                            ).format(pubsub_node=pubsub_node, state=sub.state)
                        )

            try:
                await self.host.checkFeatures(
                    client, [rsm.NS_RSM, self._p.DISCO_RSM], pubsub_node.service
                )
            except exceptions.FeatureNotFound:
                log.warning(
                    f"service {service} doesn't handle Result Set Management "
                    "(XEP-0059), we'll only cache the latest 20 items"
                )
                items, __ = await client.pubsub_client.items(
                    pubsub_node.service, pubsub_node.name, maxItems=20
                )
                await self.cacheItems(
                    client, pubsub_node, items
                )
            else:
                rsm_p = self.host.plugins["XEP-0059"]
                rsm_request = rsm.RSMRequest()
                cached_ids = set()
                while True:
                    items, rsm_response = await client.pubsub_client.items(
                        service, node, rsm_request=rsm_request
                    )
                    await self.cacheItems(
                        client, pubsub_node, items
                    )
                    for item in items:
                        item_id = item["id"]
                        if item_id in cached_ids:
                            log.warning(
                                f"Pubsub node {node!r} at {service} is returning the "
                                f"same item ({item_id!r}) several times. This is an "
                                "illegal behaviour, and it means that the Pubsub "
                                f"service {service} is buggy and can't be cached "
                                f"properly. Please report this to {service.host} "
                                "administrators"
                            )
                            rsm_request = None
                            break
                        cached_ids.add(item_id)
                        if len(cached_ids) >= CACHE_LIMIT:
                            log.warning(
                                f"Pubsub node {node!r} at {service} contains more "
                                f"items than the cache limit ({CACHE_LIMIT}). We stop "
                                f"caching here, at item {item_id!r}."
                            )
                            rsm_request = None
                            break
                    if rsm_request is None:
                        # a warning above already told us to stop paging
                        break
                    rsm_request = rsm_p.getNextRequest(rsm_request, rsm_response)
                    if rsm_request is None:
                        break

            await self.host.memory.storage.updatePubsubNodeSyncState(
                pubsub_node, SyncState.COMPLETED
            )
        except Exception as e:
            import traceback
            tb = "".join(traceback.format_tb(e.__traceback__))
            log.error(
                f"Can't cache node {node!r} at {service} for {client.profile}: {e}\n{tb}"
            )
            await self.host.memory.storage.updatePubsubNodeSyncState(
                pubsub_node, SyncState.ERROR
            )
            await self.host.memory.storage.deletePubsubItems(pubsub_node)
            raise

    def _cacheNodeClean(self, __, pubsub_node):
        del self.in_progress[(pubsub_node.service, pubsub_node.name)]

    def cacheNode(
        self,
        client: SatXMPPEntity,
        pubsub_node: PubsubNode
    ) -> defer.Deferred:
        """Launch node caching as a background task

        @return: deferred fired when caching is done
        """
        d = defer.ensureDeferred(self._cacheNode(client, pubsub_node))
        d.addBoth(self._cacheNodeClean, pubsub_node=pubsub_node)
        self.in_progress[(pubsub_node.service, pubsub_node.name)] = d
        return d

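    # Usage sketch: caching runs in the background, but a caller which needs to
    # wait for completion can await the returned Deferred:
    #
    #     d = self.cacheNode(client, pubsub_node)
    #     await d  # resolves once the whole node has been cached
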
    async def analyseNode(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        node: str,
        pubsub_node: Optional[PubsubNode] = None,
    ) -> dict:
        """Use registered analysers on a node to determine what it is used for"""
        analyse = {"service": service, "node": node}
        if pubsub_node is None:
            try:
                first_item = (await client.pubsub_client.items(
                    service, node, 1
                ))[0][0]
            except IndexError:
                pass
            except error.StanzaError as e:
                if e.condition == "item-not-found":
                    pass
                else:
                    log.warning(
                        f"Can't retrieve last item on node {node!r} at service "
                        f"{service} for {client.profile}: {e}"
                    )
            else:
                try:
                    uri = first_item.firstChildElement().uri
                except Exception as e:
                    log.warning(
                        f"Can't retrieve item namespace on node {node!r} at service "
                        f"{service} for {client.profile}: {e}"
                    )
                else:
                    analyse["namespace"] = uri
            try:
                conf = await self._p.getConfiguration(client, service, node)
            except Exception as e:
                log.warning(
                    f"Can't retrieve configuration for node {node!r} at service "
                    f"{service} for {client.profile}: {e}"
                )
            else:
                analyse["conf"] = conf

        for analyser in self.analysers.values():
            try:
                an_node = analyser["node"]
            except KeyError:
                pass
            else:
                if node.startswith(an_node):
                    for key in ANALYSER_KEYS_TO_COPY:
                        try:
                            analyse[key] = analyser[key]
                        except KeyError:
                            pass
                    found = True
                    break
            try:
                namespace = analyse["namespace"]
                an_namespace = analyser["namespace"]
            except KeyError:
                pass
            else:
                if namespace == an_namespace:
                    for key in ANALYSER_KEYS_TO_COPY:
                        try:
                            analyse[key] = analyser[key]
                        except KeyError:
                            pass
                    found = True
                    break

        else:
            found = False
            log.debug(
                f"node {node!r} at service {service} doesn't match any known type"
            )
        if found:
            try:
                match_cb = analyser["match_cb"]
            except KeyError:
                pass
            else:
                await match_cb(client, analyse)
        return analyse

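    # For reference (illustrative values): with a hypothetical "XEP-0277"
    # analyser registered, a blog node could produce an analyse dict such as:
    #
    #     {
    #         "service": jid.JID("pubsub.example.org"),
    #         "node": "urn:xmpp:microblog:0",
    #         "namespace": "http://www.w3.org/2005/Atom",
    #         "name": "XEP-0277",
    #         "type": "blog",
    #         "to_sync": True,
    #         "parser": <callable>,
    #     }
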
    def _getItemsFromCache(
        self, service="", node="", max_items=10, item_ids=None, sub_id=None,
        extra="", profile_key=C.PROF_KEY_NONE
    ):
        d = defer.ensureDeferred(self._aGetItemsFromCache(
            service, node, max_items, item_ids, sub_id, extra, profile_key
        ))
        d.addCallback(self._p.transItemsData)
        d.addCallback(self._p.serialiseItems)
        return d

    async def _aGetItemsFromCache(
        self, service, node, max_items, item_ids, sub_id, extra, profile_key
    ):
        client = self.host.getClient(profile_key)
        service = jid.JID(service) if service else client.jid.userhostJID()
        pubsub_node = await self.host.memory.storage.getPubsubNode(
            client, service, node
        )
        if pubsub_node is None:
            raise exceptions.NotFound(
                f"{node!r} at {service} doesn't exist in cache for {client.profile!r}"
            )
        max_items = None if max_items == C.NO_LIMIT else max_items
        extra = self._p.parseExtra(data_format.deserialise(extra))
        items, metadata = await self.getItemsFromCache(
            client,
            pubsub_node,
            max_items,
            item_ids,
            sub_id or None,
            extra.rsm_request,
            extra.extra,
        )
        return [i.data for i in items], metadata

    async def getItemsFromCache(
        self,
        client: SatXMPPEntity,
        node: PubsubNode,
        max_items: Optional[int] = None,
        item_ids: Optional[List[str]] = None,
        sub_id: Optional[str] = None,
        rsm_request: Optional[rsm.RSMRequest] = None,
        extra: Optional[dict] = None
    ) -> Tuple[List[PubsubItem], dict]:
        """Get items from cache, using the same arguments as for an external Pubsub request"""
        if extra is None:
            extra = {}
        if "mam" in extra:
            raise NotImplementedError("MAM queries are not supported yet")
        if max_items is None and rsm_request is None:
            max_items = 20
        if max_items is not None:
            if rsm_request is not None:
                raise exceptions.InternalError(
                    "Pubsub max items and RSM must not be used at the same time"
                )
            elif item_ids:
                raise exceptions.InternalError(
                    "Pubsub max items and item IDs must not be used at the same time"
                )
            pubsub_items, metadata = await self.host.memory.storage.getItems(
                node, max_items=max_items, order_by=extra.get(C.KEY_ORDER_BY)
            )
        else:
            desc = False
            if rsm_request.before == "":
                before = None
                desc = True
            else:
                before = rsm_request.before
            pubsub_items, metadata = await self.host.memory.storage.getItems(
                node, max_items=rsm_request.max, before=before, after=rsm_request.after,
                from_index=rsm_request.index, order_by=extra.get(C.KEY_ORDER_BY),
                desc=desc, force_rsm=True,
            )

        return pubsub_items, metadata

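    # Usage sketch (assuming a node already in cache): retrieve the most recent
    # items, newest first, through an RSM "last page" request (before="" is the
    # XEP-0059 idiom handled above):
    #
    #     items, metadata = await self.getItemsFromCache(
    #         client, pubsub_node, rsm_request=rsm.RSMRequest(max_=20, before="")
    #     )
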
    async def onItemsEvent(self, client, event):
        node = await self.host.memory.storage.getPubsubNode(
            client, event.sender, event.nodeIdentifier
        )
        if node is None:
            return
        if node.sync_state in (SyncState.COMPLETED, SyncState.IN_PROGRESS):
            items = []
            retract_ids = []
            for elt in event.items:
                if elt.name == "item":
                    items.append(elt)
                elif elt.name == "retract":
                    item_id = elt.getAttribute("id")
                    if not item_id:
                        log.warning(
                            "Ignoring invalid retract item element: "
                            f"{xml_tools.pFmtElt(elt)}"
                        )
                        continue

                    retract_ids.append(item_id)
                else:
                    log.warning(
                        f"Unexpected Pubsub event element: {xml_tools.pFmtElt(elt)}"
                    )
            if items:
                log.debug(f"caching new items received from {node}")
                await self.cacheItems(
                    client, node, items
                )
            if retract_ids:
                log.debug(f"deleting retracted items from {node}")
                await self.host.memory.storage.deletePubsubItems(
                    node, items_names=retract_ids
                )

    async def onDeleteEvent(self, client, event):
        log.debug(
            f"deleting node {event.nodeIdentifier} from {event.sender} for "
            f"{client.profile}"
        )
        await self.host.memory.storage.deletePubsubNode(
            [client.profile], [event.sender], [event.nodeIdentifier]
        )

    async def onPurgeEvent(self, client, event):
        node = await self.host.memory.storage.getPubsubNode(
            client, event.sender, event.nodeIdentifier
        )
        if node is None:
            return
        log.debug(f"purging node {node} for {client.profile}")
        await self.host.memory.storage.deletePubsubItems(node)

    async def _getItemsTrigger(
        self,
        client: SatXMPPEntity,
        service: Optional[jid.JID],
        node: str,
        max_items: Optional[int],
        item_ids: Optional[List[str]],
        sub_id: Optional[str],
        rsm_request: Optional[rsm.RSMRequest],
        extra: dict
    ) -> Tuple[bool, Optional[Tuple[List[dict], dict]]]:
        if not self.use_cache:
            log.debug("cache disabled in settings")
            return True, None
        if extra.get(C.KEY_USE_CACHE) is False:
            log.debug("skipping pubsub cache as requested")
            return True, None
        if service is None:
            service = client.jid.userhostJID()
        pubsub_node = await self.host.memory.storage.getPubsubNode(
            client, service, node
        )
        if pubsub_node is not None and pubsub_node.sync_state == SyncState.COMPLETED:
            analyse = {"to_sync": True}
        else:
            analyse = await self.analyseNode(client, service, node)

        if pubsub_node is None:
            pubsub_node = await self.host.memory.storage.setPubsubNode(
                client,
                service,
                node,
                analyser=analyse.get("name"),
                type_=analyse.get("type"),
                subtype=analyse.get("subtype"),
            )

        if analyse.get("to_sync"):
            if pubsub_node.sync_state == SyncState.COMPLETED:
                if "mam" in extra:
                    log.debug("MAM caching is not supported yet, skipping cache")
                    return True, None
                pubsub_items, metadata = await self.getItemsFromCache(
                    client, pubsub_node, max_items, item_ids, sub_id, rsm_request, extra
                )
                return False, ([i.data for i in pubsub_items], metadata)

            if pubsub_node.sync_state == SyncState.IN_PROGRESS:
                if (service, node) not in self.in_progress:
                    log.warning(
                        f"{pubsub_node} is reported as being cached, but no caching "
                        "is in progress; this is most probably due to the backend "
                        "being restarted. Resetting the status, caching will be done "
                        "again."
                    )
                    pubsub_node.sync_state = None
                    await self.host.memory.storage.deletePubsubItems(pubsub_node)
                elif time.time() - pubsub_node.sync_state_updated > PROGRESS_DEADLINE:
                    log.warning(
                        f"{pubsub_node} has been in progress for too long "
                        f"({(time.time() - pubsub_node.sync_state_updated)//60:.0f} "
                        "minutes), cancelling it and retrying."
                    )
                    self.in_progress.pop((service, node)).cancel()
                    pubsub_node.sync_state = None
                    await self.host.memory.storage.deletePubsubItems(pubsub_node)
                else:
                    log.debug(
                        f"{pubsub_node} synchronisation is already in progress, "
                        "skipping"
                    )
            if pubsub_node.sync_state is None:
                key = (service, node)
                if key in self.in_progress:
                    raise exceptions.InternalError(
                        f"There is already a caching in progress for {pubsub_node}, "
                        "this should not happen"
                    )
                self.cacheNode(client, pubsub_node)
            elif pubsub_node.sync_state == SyncState.ERROR:
                log.debug(
                    f"{pubsub_node} synchronisation has previously failed, skipping"
                )

        return True, None

    async def _subscribeTrigger(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        nodeIdentifier: str,
        sub_jid: Optional[jid.JID],
        options: Optional[dict],
        subscription: pubsub.Subscription
    ) -> None:
        pass

    async def _unsubscribeTrigger(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        nodeIdentifier: str,
        sub_jid,
        subscriptionIdentifier,
        sender,
    ) -> None:
        pass

    def _synchronise(self, service, node, profile_key):
        client = self.host.getClient(profile_key)
        service = client.jid.userhostJID() if not service else jid.JID(service)
        return defer.ensureDeferred(self.synchronise(client, service, node))

    async def synchronise(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        node: str
    ) -> None:
        """Synchronise a node with a pubsub service

        If the node is already synchronised, it will be resynchronised (all items will
        be deleted and re-downloaded).

        The node will be synchronised even if there is no matching analyser.

        Note that when a node is synchronised, it is automatically subscribed.
        """
        pubsub_node = await self.host.memory.storage.getPubsubNode(
            client, service, node
        )
        if pubsub_node is None:
            log.info(
                _(
                    "Synchronising the new node {node} at {service}"
                ).format(node=node, service=service.full())
            )
            analyse = await self.analyseNode(client, service, node)
            pubsub_node = await self.host.memory.storage.setPubsubNode(
                client,
                service,
                node,
                analyser=analyse.get("name"),
                type_=analyse.get("type"),
            )

        if (pubsub_node.sync_state == SyncState.IN_PROGRESS
                or (service, node) in self.in_progress):
            log.warning(
                _(
                    "{node} at {service} is already being synchronised, can't do a "
                    "new synchronisation."
                ).format(node=node, service=service)
            )
        else:
            log.info(
                _(
                    "(Re)Synchronising the node {node} at {service} on user request"
                ).format(node=node, service=service.full())
            )
            # we first delete and recreate the node (this will also delete its items)
            await self.host.memory.storage.delete(pubsub_node)
            analyse = await self.analyseNode(client, service, node)
            pubsub_node = await self.host.memory.storage.setPubsubNode(
                client,
                service,
                node,
                analyser=analyse.get("name"),
                type_=analyse.get("type"),
            )
            # then we can fill the cache with the node's items
            await self.cacheNode(client, pubsub_node)

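    # Usage sketch (hypothetical caller): another plugin can force the
    # (re)caching of a node it depends on:
    #
    #     cache = self.host.plugins["PUBSUB_CACHE"]
    #     await cache.synchronise(client, service, "urn:xmpp:microblog:0")
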
    async def purge(self, purge_filters: dict) -> None:
        """Remove items according to filters

        filters can have any of the following keys; all are optional:

        :services:
            list of JIDs of services from which items must be deleted
        :nodes:
            list of node names to delete
        :types:
            list of node types to delete
        :subtypes:
            list of node subtypes to delete
        :profiles:
            list of profiles from which items must be deleted
        :created_before:
            datetime before which items must have been created to be deleted
        :updated_before:
            datetime before which items must have been last updated to be deleted
        """
        purge_filters["names"] = purge_filters.pop("nodes", None)
        await self.host.memory.storage.purgePubsubItems(**purge_filters)

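    # Usage sketch (illustrative filter values): purge all cached blog items
    # older than one year:
    #
    #     from datetime import datetime, timedelta
    #     await self.purge({
    #         "types": ["blog"],
    #         "created_before": datetime.now() - timedelta(days=365),
    #     })
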
    def _purge(self, purge_filters: str) -> defer.Deferred:
        purge_filters = data_format.deserialise(purge_filters)
        for key in "created_before", "updated_before":
            try:
                purge_filters[key] = datetime.fromtimestamp(purge_filters[key])
            except (KeyError, TypeError):
                pass
        return defer.ensureDeferred(self.purge(purge_filters))

    async def reset(self) -> None:
        """Remove ALL nodes and items from cache

        After calling this method, the cache will be refilled progressively as if it
        were new.
        """
        await self.host.memory.storage.deletePubsubNode(None, None, None)

    def _reset(self) -> defer.Deferred:
        return defer.ensureDeferred(self.reset())