comparison sat/plugins/plugin_pubsub_cache.py @ 3715:b9718216a1c0 0.9

merge bookmark 0.9
author Goffi <goffi@goffi.org>
date Wed, 01 Dec 2021 16:13:31 +0100
parents 342e3ddefd23
children ffa8c8c78115
comparison
equal deleted inserted replaced
3714:af09b5aaa5d7 3715:b9718216a1c0
1 #!/usr/bin/env python3
2
3 # Libervia plugin for PubSub Caching
4 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 import time
20 from datetime import datetime
21 from typing import Optional, List, Tuple
22 from twisted.words.protocols.jabber import jid, error
23 from twisted.words.xish import domish
24 from twisted.internet import defer
25 from wokkel import pubsub, rsm
26 from sat.core.i18n import _
27 from sat.core.constants import Const as C
28 from sat.core import exceptions
29 from sat.core.log import getLogger
30 from sat.core.core_types import SatXMPPEntity
31 from sat.tools import xml_tools, utils
32 from sat.tools.common import data_format
33 from sat.memory.sqla import PubsubNode, PubsubItem, SyncState
34
35
36 log = getLogger(__name__)
37
38 PLUGIN_INFO = {
39 C.PI_NAME: "PubSub Cache",
40 C.PI_IMPORT_NAME: "PUBSUB_CACHE",
41 C.PI_TYPE: C.PLUG_TYPE_PUBSUB,
42 C.PI_PROTOCOLS: [],
43 C.PI_DEPENDENCIES: ["XEP-0059", "XEP-0060"],
44 C.PI_RECOMMENDATIONS: [],
45 C.PI_MAIN: "PubsubCache",
46 C.PI_HANDLER: "no",
47 C.PI_DESCRIPTION: _("""Local Cache for PubSub"""),
48 }
49
50 ANALYSER_KEYS_TO_COPY = ("name", "type", "to_sync", "parser")
51 # maximum of items to cache
52 CACHE_LIMIT = 5000
53 # number of second before a progress caching is considered failed and tried again
54 PROGRESS_DEADLINE = 60 * 60 * 6
55
56
57
58 class PubsubCache:
59 # TODO: there is currently no notification for (un)subscribe events with XEP-0060,
60 # but it would be necessary to have this data if some devices unsubscribe a cached
61 # node, as we can then get out of sync. A protoXEP could be proposed to fix this
62 # situation.
63 # TODO: handle configuration events
64
65 def __init__(self, host):
66 log.info(_("PubSub Cache initialization"))
67 strategy = host.memory.getConfig(None, "pubsub_cache_strategy")
68 if strategy == "no_cache":
69 log.info(
70 _(
71 "Pubsub cache won't be used due to pubsub_cache_strategy={value} "
72 "setting."
73 ).format(value=repr(strategy))
74 )
75 self.use_cache = False
76 else:
77 self.use_cache = True
78 self.host = host
79 self._p = host.plugins["XEP-0060"]
80 self.analysers = {}
81 # map for caching in progress (node, service) => Deferred
82 self.in_progress = {}
83 self.host.trigger.add("XEP-0060_getItems", self._getItemsTrigger)
84 self._p.addManagedNode(
85 "",
86 items_cb=self.onItemsEvent,
87 delete_cb=self.onDeleteEvent,
88 purge_db=self.onPurgeEvent,
89 )
90 host.bridge.addMethod(
91 "psCacheGet",
92 ".plugin",
93 in_sign="ssiassss",
94 out_sign="s",
95 method=self._getItemsFromCache,
96 async_=True,
97 )
98 host.bridge.addMethod(
99 "psCacheSync",
100 ".plugin",
101 "sss",
102 out_sign="",
103 method=self._synchronise,
104 async_=True,
105 )
106 host.bridge.addMethod(
107 "psCachePurge",
108 ".plugin",
109 "s",
110 out_sign="",
111 method=self._purge,
112 async_=True,
113 )
114 host.bridge.addMethod(
115 "psCacheReset",
116 ".plugin",
117 "",
118 out_sign="",
119 method=self._reset,
120 async_=True,
121 )
122 host.bridge.addMethod(
123 "psCacheSearch",
124 ".plugin",
125 "s",
126 out_sign="s",
127 method=self._search,
128 async_=True,
129 )
130
131 def registerAnalyser(self, analyser: dict) -> None:
132 """Register a new pubsub node analyser
133
134 @param analyser: An analyser is a dictionary which may have the following keys
135 (keys with a ``*`` are mandatory, at least one of ``node`` or ``namespace`` keys
136 must be used):
137
138 :name (str)*:
139 a unique name for this analyser. This name will be stored in database
140 to retrieve the analyser when necessary (notably to get the parsing method),
141 thus it is recommended to use a stable name such as the source plugin name
142 instead of a name which may change with standard evolution, such as the
143 feature namespace.
144
145 :type (str)*:
146 indicates what kind of items we are dealing with. Type must be a human
147 readable word, as it may be used in searches. Good types examples are
148 **blog** or **event**.
149
150 :node (str):
151 prefix of a node name which may be used to identify its type. Example:
152 *urn:xmpp:microblog:0* (a node starting with this name will be identified as
153 *blog* node).
154
155 :namespace (str):
156 root namespace of items. When analysing a node, the first item will be
157 retrieved. The analyser will be chosen its given namespace match the
158 namespace of the first child element of ``<item>`` element.
159
160 :to_sync (bool):
161 if True, the node must be synchronised in cache. The default False value
162 means that the pubsub service will always be requested.
163
164 :parser (callable):
165 method (which may be sync, a coroutine or a method returning a "Deferred")
166 to call to parse the ``domish.Element`` of the item. The result must be
167 dictionary which can be serialised to JSON.
168
169 The method must have the following signature:
170
171 .. function:: parser(client: SatXMPPEntity, item_elt: domish.Element, \
172 service: Optional[jid.JID], node: Optional[str]) \
173 -> dict
174 :noindex:
175
176 :match_cb (callable):
177 method (which may be sync, a coroutine or a method returning a "Deferred")
178 called when the analyser matches. The method is called with the curreny
179 analyse which is can modify **in-place**.
180
181 The method must have the following signature:
182
183 .. function:: match_cb(client: SatXMPPEntity, analyse: dict) -> None
184 :noindex:
185
186 @raise exceptions.Conflict: a analyser with this name already exists
187 """
188
189 name = analyser.get("name", "").strip().lower()
190 # we want the normalised name
191 analyser["name"] = name
192 if not name:
193 raise ValueError('"name" is mandatory in analyser')
194 if "type" not in analyser:
195 raise ValueError('"type" is mandatory in analyser')
196 type_test_keys = {"node", "namespace"}
197 if not type_test_keys.intersection(analyser):
198 raise ValueError(f'at least one of {type_test_keys} must be used')
199 if name in self.analysers:
200 raise exceptions.Conflict(
201 f"An analyser with the name {name!r} is already registered"
202 )
203 self.analysers[name] = analyser
204
205 async def cacheItems(
206 self,
207 client: SatXMPPEntity,
208 pubsub_node: PubsubNode,
209 items: List[domish.Element]
210 ) -> None:
211 try:
212 parser = self.analysers[pubsub_node.analyser].get("parser")
213 except KeyError:
214 parser = None
215
216 if parser is not None:
217 parsed_items = [
218 await utils.asDeferred(
219 parser,
220 client,
221 item,
222 pubsub_node.service,
223 pubsub_node.name
224 )
225 for item in items
226 ]
227 else:
228 parsed_items = None
229
230 await self.host.memory.storage.cachePubsubItems(
231 client, pubsub_node, items, parsed_items
232 )
233
234 async def _cacheNode(
235 self,
236 client: SatXMPPEntity,
237 pubsub_node: PubsubNode
238 ) -> None:
239 await self.host.memory.storage.updatePubsubNodeSyncState(
240 pubsub_node, SyncState.IN_PROGRESS
241 )
242 service, node = pubsub_node.service, pubsub_node.name
243 try:
244 log.debug(
245 f"Caching node {node!r} at {service} for {client.profile}"
246 )
247 if not pubsub_node.subscribed:
248 try:
249 sub = await self._p.subscribe(client, service, node)
250 except Exception as e:
251 log.warning(
252 _(
253 "Can't subscribe node {pubsub_node}, that means that "
254 "synchronisation can't be maintained: {reason}"
255 ).format(pubsub_node=pubsub_node, reason=e)
256 )
257 else:
258 if sub.state == "subscribed":
259 sub_id = sub.subscriptionIdentifier
260 log.debug(
261 f"{pubsub_node} subscribed (subscription id: {sub_id!r})"
262 )
263 pubsub_node.subscribed = True
264 await self.host.memory.storage.add(pubsub_node)
265 else:
266 log.warning(
267 _(
268 "{pubsub_node} is not subscribed, that means that "
269 "synchronisation can't be maintained, and you may have "
270 "to enforce subscription manually. Subscription state: "
271 "{state}"
272 ).format(pubsub_node=pubsub_node, state=sub.state)
273 )
274
275 try:
276 await self.host.checkFeatures(
277 client, [rsm.NS_RSM, self._p.DISCO_RSM], pubsub_node.service
278 )
279 except exceptions.FeatureNotFound:
280 log.warning(
281 f"service {service} doesn't handle Result Set Management "
282 "(XEP-0059), we'll only cache latest 20 items"
283 )
284 items, __ = await client.pubsub_client.items(
285 pubsub_node.service, pubsub_node.name, maxItems=20
286 )
287 await self.cacheItems(
288 client, pubsub_node, items
289 )
290 else:
291 rsm_p = self.host.plugins["XEP-0059"]
292 rsm_request = rsm.RSMRequest()
293 cached_ids = set()
294 while True:
295 items, rsm_response = await client.pubsub_client.items(
296 service, node, rsm_request=rsm_request
297 )
298 await self.cacheItems(
299 client, pubsub_node, items
300 )
301 for item in items:
302 item_id = item["id"]
303 if item_id in cached_ids:
304 log.warning(
305 f"Pubsub node {node!r} at {service} is returning several "
306 f"times the same item ({item_id!r}). This is illegal "
307 "behaviour, and it means that Pubsub service "
308 f"{service} is buggy and can't be cached properly. "
309 f"Please report this to {service.host} administrators"
310 )
311 rsm_request = None
312 break
313 cached_ids.add(item["id"])
314 if len(cached_ids) >= CACHE_LIMIT:
315 log.warning(
316 f"Pubsub node {node!r} at {service} contains more items "
317 f"than the cache limit ({CACHE_LIMIT}). We stop "
318 "caching here, at item {item['id']!r}."
319 )
320 rsm_request = None
321 break
322 rsm_request = rsm_p.getNextRequest(rsm_request, rsm_response)
323 if rsm_request is None:
324 break
325
326 await self.host.memory.storage.updatePubsubNodeSyncState(
327 pubsub_node, SyncState.COMPLETED
328 )
329 except Exception as e:
330 import traceback
331 tb = traceback.format_tb(e.__traceback__)
332 log.error(
333 f"Can't cache node {node!r} at {service} for {client.profile}: {e}\n{tb}"
334 )
335 await self.host.memory.storage.updatePubsubNodeSyncState(
336 pubsub_node, SyncState.ERROR
337 )
338 await self.host.memory.storage.deletePubsubItems(pubsub_node)
339 raise e
340
341 def _cacheNodeClean(self, __, pubsub_node):
342 del self.in_progress[(pubsub_node.service, pubsub_node.name)]
343
344 def cacheNode(
345 self,
346 client: SatXMPPEntity,
347 pubsub_node: PubsubNode
348 ) -> None:
349 """Launch node caching as a background task"""
350 d = defer.ensureDeferred(self._cacheNode(client, pubsub_node))
351 d.addBoth(self._cacheNodeClean, pubsub_node=pubsub_node)
352 self.in_progress[(pubsub_node.service, pubsub_node.name)] = d
353 return d
354
355 async def analyseNode(
356 self,
357 client: SatXMPPEntity,
358 service: jid.JID,
359 node: str,
360 pubsub_node : PubsubNode = None,
361 ) -> dict:
362 """Use registered analysers on a node to determine what it is used for"""
363 analyse = {"service": service, "node": node}
364 if pubsub_node is None:
365 try:
366 first_item = (await client.pubsub_client.items(
367 service, node, 1
368 ))[0][0]
369 except IndexError:
370 pass
371 except error.StanzaError as e:
372 if e.condition == "item-not-found":
373 pass
374 else:
375 log.warning(
376 f"Can't retrieve last item on node {node!r} at service "
377 f"{service} for {client.profile}: {e}"
378 )
379 else:
380 try:
381 uri = first_item.firstChildElement().uri
382 except Exception as e:
383 log.warning(
384 f"Can't retrieve item namespace on node {node!r} at service "
385 f"{service} for {client.profile}: {e}"
386 )
387 else:
388 analyse["namespace"] = uri
389 try:
390 conf = await self._p.getConfiguration(client, service, node)
391 except Exception as e:
392 log.warning(
393 f"Can't retrieve configuration for node {node!r} at service {service} "
394 f"for {client.profile}: {e}"
395 )
396 else:
397 analyse["conf"] = conf
398
399 for analyser in self.analysers.values():
400 try:
401 an_node = analyser["node"]
402 except KeyError:
403 pass
404 else:
405 if node.startswith(an_node):
406 for key in ANALYSER_KEYS_TO_COPY:
407 try:
408 analyse[key] = analyser[key]
409 except KeyError:
410 pass
411 found = True
412 break
413 try:
414 namespace = analyse["namespace"]
415 an_namespace = analyser["namespace"]
416 except KeyError:
417 pass
418 else:
419 if namespace == an_namespace:
420 for key in ANALYSER_KEYS_TO_COPY:
421 try:
422 analyse[key] = analyser[key]
423 except KeyError:
424 pass
425 found = True
426 break
427
428 else:
429 found = False
430 log.debug(
431 f"node {node!r} at service {service} doesn't match any known type"
432 )
433 if found:
434 try:
435 match_cb = analyser["match_cb"]
436 except KeyError:
437 pass
438 else:
439 await utils.asDeferred(match_cb, client, analyse)
440 return analyse
441
442 def _getItemsFromCache(
443 self, service="", node="", max_items=10, item_ids=None, sub_id=None,
444 extra="", profile_key=C.PROF_KEY_NONE
445 ):
446 d = defer.ensureDeferred(self._aGetItemsFromCache(
447 service, node, max_items, item_ids, sub_id, extra, profile_key
448 ))
449 d.addCallback(self._p.transItemsData)
450 d.addCallback(self._p.serialiseItems)
451 return d
452
453 async def _aGetItemsFromCache(
454 self, service, node, max_items, item_ids, sub_id, extra, profile_key
455 ):
456 client = self.host.getClient(profile_key)
457 service = jid.JID(service) if service else client.jid.userhostJID()
458 pubsub_node = await self.host.memory.storage.getPubsubNode(
459 client, service, node
460 )
461 if pubsub_node is None:
462 raise exceptions.NotFound(
463 f"{node!r} at {service} doesn't exist in cache for {client.profile!r}"
464 )
465 max_items = None if max_items == C.NO_LIMIT else max_items
466 extra = self._p.parseExtra(data_format.deserialise(extra))
467 items, metadata = await self.getItemsFromCache(
468 client,
469 pubsub_node,
470 max_items,
471 item_ids,
472 sub_id or None,
473 extra.rsm_request,
474 extra.extra,
475 )
476 return [i.data for i in items], metadata
477
478 async def getItemsFromCache(
479 self,
480 client: SatXMPPEntity,
481 node: PubsubNode,
482 max_items: Optional[int] = None,
483 item_ids: Optional[List[str]] = None,
484 sub_id: Optional[str] = None,
485 rsm_request: Optional[rsm.RSMRequest] = None,
486 extra: Optional[dict] = None
487 ) -> Tuple[List[PubsubItem], dict]:
488 """Get items from cache, using same arguments as for external Pubsub request"""
489 if "mam" in extra:
490 raise NotImplementedError("MAM queries are not supported yet")
491 if max_items is None and rsm_request is None:
492 max_items = 20
493 if max_items is not None:
494 if rsm_request is not None:
495 raise exceptions.InternalError(
496 "Pubsub max items and RSM must not be used at the same time"
497 )
498 elif item_ids is None:
499 raise exceptions.InternalError(
500 "Pubsub max items and item IDs must not be used at the same time"
501 )
502 pubsub_items, metadata = await self.host.memory.storage.getItems(
503 node, max_items=max_items, order_by=extra.get(C.KEY_ORDER_BY)
504 )
505 else:
506 desc = False
507 if rsm_request.before == "":
508 before = None
509 desc = True
510 else:
511 before = rsm_request.before
512 pubsub_items, metadata = await self.host.memory.storage.getItems(
513 node, max_items=rsm_request.max, before=before, after=rsm_request.after,
514 from_index=rsm_request.index, order_by=extra.get(C.KEY_ORDER_BY),
515 desc=desc, force_rsm=True,
516 )
517
518 return pubsub_items, metadata
519
520 async def onItemsEvent(self, client, event):
521 node = await self.host.memory.storage.getPubsubNode(
522 client, event.sender, event.nodeIdentifier
523 )
524 if node is None:
525 return
526 if node.sync_state in (SyncState.COMPLETED, SyncState.IN_PROGRESS):
527 items = []
528 retract_ids = []
529 for elt in event.items:
530 if elt.name == "item":
531 items.append(elt)
532 elif elt.name == "retract":
533 item_id = elt.getAttribute("id")
534 if not item_id:
535 log.warning(
536 "Ignoring invalid retract item element: "
537 f"{xml_tools.pFmtElt(elt)}"
538 )
539 continue
540
541 retract_ids.append(elt["id"])
542 else:
543 log.warning(
544 f"Unexpected Pubsub event element: {xml_tools.pFmtElt(elt)}"
545 )
546 if items:
547 log.debug("caching new items received from {node}")
548 await self.cacheItems(
549 client, node, items
550 )
551 if retract_ids:
552 log.debug(f"deleting retracted items from {node}")
553 await self.host.memory.storage.deletePubsubItems(
554 node, items_names=retract_ids
555 )
556
557 async def onDeleteEvent(self, client, event):
558 log.debug(
559 f"deleting node {event.nodeIdentifier} from {event.sender} for "
560 f"{client.profile}"
561 )
562 await self.host.memory.storage.deletePubsubNode(
563 [client.profile], [event.sender], [event.nodeIdentifier]
564 )
565
566 async def onPurgeEvent(self, client, event):
567 node = await self.host.memory.storage.getPubsubNode(
568 client, event.sender, event.nodeIdentifier
569 )
570 if node is None:
571 return
572 log.debug(f"purging node {node} for {client.profile}")
573 await self.host.memory.storage.deletePubsubItems(node)
574
575 async def _getItemsTrigger(
576 self,
577 client: SatXMPPEntity,
578 service: Optional[jid.JID],
579 node: str,
580 max_items: Optional[int],
581 item_ids: Optional[List[str]],
582 sub_id: Optional[str],
583 rsm_request: Optional[rsm.RSMRequest],
584 extra: dict
585 ) -> Tuple[bool, Optional[Tuple[List[dict], dict]]]:
586 if not self.use_cache:
587 log.debug("cache disabled in settings")
588 return True, None
589 if extra.get(C.KEY_USE_CACHE) == False:
590 log.debug("skipping pubsub cache as requested")
591 return True, None
592 if service is None:
593 service = client.jid.userhostJID()
594 pubsub_node = await self.host.memory.storage.getPubsubNode(
595 client, service, node
596 )
597 if pubsub_node is not None and pubsub_node.sync_state == SyncState.COMPLETED:
598 analyse = {"to_sync": True}
599 else:
600 analyse = await self.analyseNode(client, service, node)
601
602 if pubsub_node is None:
603 pubsub_node = await self.host.memory.storage.setPubsubNode(
604 client,
605 service,
606 node,
607 analyser=analyse.get("name"),
608 type_=analyse.get("type"),
609 subtype=analyse.get("subtype"),
610 )
611
612 if analyse.get("to_sync"):
613 if pubsub_node.sync_state == SyncState.COMPLETED:
614 if "mam" in extra:
615 log.debug("MAM caching is not supported yet, skipping cache")
616 return True, None
617 pubsub_items, metadata = await self.getItemsFromCache(
618 client, pubsub_node, max_items, item_ids, sub_id, rsm_request, extra
619 )
620 return False, ([i.data for i in pubsub_items], metadata)
621
622 if pubsub_node.sync_state == SyncState.IN_PROGRESS:
623 if (service, node) not in self.in_progress:
624 log.warning(
625 f"{pubsub_node} is reported as being cached, but not caching is "
626 "in progress, this is most probably due to the backend being "
627 "restarted. Resetting the status, caching will be done again."
628 )
629 pubsub_node.sync_state = None
630 await self.host.memory.storage.deletePubsubItems(pubsub_node)
631 elif time.time() - pubsub_node.sync_state_updated > PROGRESS_DEADLINE:
632 log.warning(
633 f"{pubsub_node} is in progress for too long "
634 f"({pubsub_node.sync_state_updated//60} minutes), "
635 "cancelling it and retrying."
636 )
637 self.in_progress.pop[(service, node)].cancel()
638 pubsub_node.sync_state = None
639 await self.host.memory.storage.deletePubsubItems(pubsub_node)
640 else:
641 log.debug(
642 f"{pubsub_node} synchronisation is already in progress, skipping"
643 )
644 if pubsub_node.sync_state is None:
645 key = (service, node)
646 if key in self.in_progress:
647 raise exceptions.InternalError(
648 f"There is already a caching in progress for {pubsub_node}, this "
649 "should not happen"
650 )
651 self.cacheNode(client, pubsub_node)
652 elif pubsub_node.sync_state == SyncState.ERROR:
653 log.debug(
654 f"{pubsub_node} synchronisation has previously failed, skipping"
655 )
656
657 return True, None
658
659 async def _subscribeTrigger(
660 self,
661 client: SatXMPPEntity,
662 service: jid.JID,
663 nodeIdentifier: str,
664 sub_jid: Optional[jid.JID],
665 options: Optional[dict],
666 subscription: pubsub.Subscription
667 ) -> None:
668 pass
669
670 async def _unsubscribeTrigger(
671 self,
672 client: SatXMPPEntity,
673 service: jid.JID,
674 nodeIdentifier: str,
675 sub_jid,
676 subscriptionIdentifier,
677 sender,
678 ) -> None:
679 pass
680
681 def _synchronise(self, service, node, profile_key):
682 client = self.host.getClient(profile_key)
683 service = client.jid.userhostJID() if not service else jid.JID(service)
684 return defer.ensureDeferred(self.synchronise(client, service, node))
685
686 async def synchronise(
687 self,
688 client: SatXMPPEntity,
689 service: jid.JID,
690 node: str
691 ) -> None:
692 """Synchronise a node with a pubsub service
693
694 If the node is already synchronised, it will be resynchronised (all items will be
695 deleted and re-downloaded).
696
697 The node will be synchronised even if there is no matching analyser.
698
699 Note that when a node is synchronised, it is automatically subscribed.
700 """
701 pubsub_node = await self.host.memory.storage.getPubsubNode(
702 client, service, node
703 )
704 if pubsub_node is None:
705 log.info(
706 _(
707 "Synchronising the new node {node} at {service}"
708 ).format(node=node, service=service.full)
709 )
710 analyse = await self.analyseNode(client, service, node)
711 pubsub_node = await self.host.memory.storage.setPubsubNode(
712 client,
713 service,
714 node,
715 analyser=analyse.get("name"),
716 type_=analyse.get("type"),
717 )
718
719 if ((pubsub_node.sync_state == SyncState.IN_PROGRESS
720 or (service, node) in self.in_progress)):
721 log.warning(
722 _(
723 "{node} at {service} is already being synchronised, can't do a new "
724 "synchronisation."
725 ).format(node=node, service=service)
726 )
727 else:
728 log.info(
729 _(
730 "(Re)Synchronising the node {node} at {service} on user request"
731 ).format(node=node, service=service.full)
732 )
733 # we first delete and recreate the node (will also delete its items)
734 await self.host.memory.storage.delete(pubsub_node)
735 analyse = await self.analyseNode(client, service, node)
736 pubsub_node = await self.host.memory.storage.setPubsubNode(
737 client,
738 service,
739 node,
740 analyser=analyse.get("name"),
741 type_=analyse.get("type"),
742 )
743 # then we can put node in cache
744 await self.cacheNode(client, pubsub_node)
745
746 async def purge(self, purge_filters: dict) -> None:
747 """Remove items according to filters
748
749 filters can have on of the following keys, all are optional:
750
751 :services:
752 list of JIDs of services from which items must be deleted
753 :nodes:
754 list of node names to delete
755 :types:
756 list of node types to delete
757 :subtypes:
758 list of node subtypes to delete
759 :profiles:
760 list of profiles from which items must be deleted
761 :created_before:
762 datetime before which items must have been created to be deleted
763 :created_update:
764 datetime before which items must have been updated last to be deleted
765 """
766 purge_filters["names"] = purge_filters.pop("nodes", None)
767 await self.host.memory.storage.purgePubsubItems(**purge_filters)
768
769 def _purge(self, purge_filters: str) -> None:
770 purge_filters = data_format.deserialise(purge_filters)
771 for key in "created_before", "updated_before":
772 try:
773 purge_filters[key] = datetime.fromtimestamp(purge_filters[key])
774 except (KeyError, TypeError):
775 pass
776 return defer.ensureDeferred(self.purge(purge_filters))
777
778 async def reset(self) -> None:
779 """Remove ALL nodes and items from cache
780
781 After calling this method, cache will be refilled progressively as if it where new
782 """
783 await self.host.memory.storage.deletePubsubNode(None, None, None)
784
785 def _reset(self) -> defer.Deferred:
786 return defer.ensureDeferred(self.reset())
787
788 async def search(self, query: dict) -> List[PubsubItem]:
789 """Search pubsub items in cache"""
790 return await self.host.memory.storage.searchPubsubItems(query)
791
792 async def serialisableSearch(self, query: dict) -> List[dict]:
793 """Search pubsub items in cache and returns parsed data
794
795 The returned data can be serialised.
796
797 "pubsub_service" and "pubsub_name" will be added to each data (both as strings)
798 """
799 items = await self.search(query)
800 ret = []
801 for item in items:
802 parsed = item.parsed
803 parsed["pubsub_service"] = item.node.service.full()
804 parsed["pubsub_node"] = item.node.name
805 if query.get("with_payload"):
806 parsed["item_payload"] = item.data.toXml()
807 parsed["node_profile"] = self.host.memory.storage.getProfileById(
808 item.node.profile_id
809 )
810
811 ret.append(parsed)
812 return ret
813
814 def _search(self, query: str) -> defer.Deferred:
815 query = data_format.deserialise(query)
816 services = query.get("services")
817 if services:
818 query["services"] = [jid.JID(s) for s in services]
819 d = defer.ensureDeferred(self.serialisableSearch(query))
820 d.addCallback(data_format.serialise)
821 return d