comparison libervia/backend/plugins/plugin_pubsub_cache.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_pubsub_cache.py@524856bd7b19
children 2b000790b197
comparison
equal deleted inserted replaced
4070:d10748475025 4071:4b842c1fb686
1 #!/usr/bin/env python3
2
3 # Libervia plugin for PubSub Caching
4 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 import time
20 from datetime import datetime
21 from typing import Optional, List, Tuple, Dict, Any
22 from twisted.words.protocols.jabber import jid, error
23 from twisted.words.xish import domish
24 from twisted.internet import defer
25 from wokkel import pubsub, rsm
26 from libervia.backend.core.i18n import _
27 from libervia.backend.core.constants import Const as C
28 from libervia.backend.core import exceptions
29 from libervia.backend.core.log import getLogger
30 from libervia.backend.core.core_types import SatXMPPEntity
31 from libervia.backend.tools import xml_tools, utils
32 from libervia.backend.tools.common import data_format
33 from libervia.backend.memory.sqla import PubsubNode, PubsubItem, SyncState, IntegrityError
34
35
log = getLogger(__name__)

PLUGIN_INFO = {
    C.PI_NAME: "PubSub Cache",
    C.PI_IMPORT_NAME: "PUBSUB_CACHE",
    C.PI_TYPE: C.PLUG_TYPE_PUBSUB,
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0059", "XEP-0060"],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "PubsubCache",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _("""Local Cache for PubSub"""),
}

# keys of a registered analyser dict that are copied into the "analyse" result
# of PubsubCache.analyse_node
ANALYSER_KEYS_TO_COPY = ("name", "type", "to_sync", "parser")
# maximum number of items to cache (per node)
CACHE_LIMIT = 5000
# number of seconds before an in-progress caching is considered failed and tried again
PROGRESS_DEADLINE = 60 * 60 * 6
56
57
58
59 class PubsubCache:
60 # TODO: there is currently no notification for (un)subscribe events with XEP-0060,
61 # but it would be necessary to have this data if some devices unsubscribe a cached
62 # node, as we can then get out of sync. A protoXEP could be proposed to fix this
63 # situation.
64 # TODO: handle configuration events
65
    def __init__(self, host):
        """Initialise the pubsub cache plugin.

        Reads the ``pubsub_cache_strategy`` setting (cache is fully disabled
        when it is ``"no_cache"``), hooks into XEP-0060 item retrieval and node
        events, and exposes the cache management methods on the bridge.

        @param host: main application instance
        """
        log.info(_("PubSub Cache initialization"))
        strategy = host.memory.config_get(None, "pubsub_cache_strategy")
        if strategy == "no_cache":
            log.info(
                _(
                    "Pubsub cache won't be used due to pubsub_cache_strategy={value} "
                    "setting."
                ).format(value=repr(strategy))
            )
            self.use_cache = False
        else:
            self.use_cache = True
        self.host = host
        self._p = host.plugins["XEP-0060"]
        # registered analysers, by analyser name (cf. register_analyser)
        self.analysers = {}
        # map of cachings in progress: (service, node) => Deferred
        # (note: keys are (service, node), in that order — see cache_node)
        self.in_progress = {}
        # intercept item retrieval so it can be served from cache when possible
        self.host.trigger.add("XEP-0060_getItems", self._get_items_trigger)
        # NOTE(review): the empty node name presumably registers the callbacks
        # for events of all nodes — confirm against the XEP-0060 plugin
        self._p.add_managed_node(
            "",
            items_cb=self.on_items_event,
            delete_cb=self.on_delete_event,
            purge_db=self.on_purge_event,
        )
        host.bridge.add_method(
            "ps_cache_get",
            ".plugin",
            in_sign="ssiassss",
            out_sign="s",
            method=self._get_items_from_cache,
            async_=True,
        )
        host.bridge.add_method(
            "ps_cache_sync",
            ".plugin",
            "sss",
            out_sign="",
            method=self._synchronise,
            async_=True,
        )
        host.bridge.add_method(
            "ps_cache_purge",
            ".plugin",
            "s",
            out_sign="",
            method=self._purge,
            async_=True,
        )
        host.bridge.add_method(
            "ps_cache_reset",
            ".plugin",
            "",
            out_sign="",
            method=self._reset,
            async_=True,
        )
        host.bridge.add_method(
            "ps_cache_search",
            ".plugin",
            "s",
            out_sign="s",
            method=self._search,
            async_=True,
        )
131
132 def register_analyser(self, analyser: dict) -> None:
133 """Register a new pubsub node analyser
134
135 @param analyser: An analyser is a dictionary which may have the following keys
136 (keys with a ``*`` are mandatory, at least one of ``node`` or ``namespace`` keys
137 must be used):
138
139 :name (str)*:
140 a unique name for this analyser. This name will be stored in database
141 to retrieve the analyser when necessary (notably to get the parsing method),
142 thus it is recommended to use a stable name such as the source plugin name
143 instead of a name which may change with standard evolution, such as the
144 feature namespace.
145
146 :type (str)*:
147 indicates what kind of items we are dealing with. Type must be a human
148 readable word, as it may be used in searches. Good types examples are
149 **blog** or **event**.
150
151 :node (str):
152 prefix of a node name which may be used to identify its type. Example:
153 *urn:xmpp:microblog:0* (a node starting with this name will be identified as
154 *blog* node).
155
156 :namespace (str):
157 root namespace of items. When analysing a node, the first item will be
158 retrieved. The analyser will be chosen its given namespace match the
159 namespace of the first child element of ``<item>`` element.
160
161 :to_sync (bool):
162 if True, the node must be synchronised in cache. The default False value
163 means that the pubsub service will always be requested.
164
165 :parser (callable):
166 method (which may be sync, a coroutine or a method returning a "Deferred")
167 to call to parse the ``domish.Element`` of the item. The result must be
168 dictionary which can be serialised to JSON.
169
170 The method must have the following signature:
171
172 .. function:: parser(client: SatXMPPEntity, item_elt: domish.Element, \
173 service: Optional[jid.JID], node: Optional[str]) \
174 -> dict
175 :noindex:
176
177 :match_cb (callable):
178 method (which may be sync, a coroutine or a method returning a "Deferred")
179 called when the analyser matches. The method is called with the curreny
180 analyse which is can modify **in-place**.
181
182 The method must have the following signature:
183
184 .. function:: match_cb(client: SatXMPPEntity, analyse: dict) -> None
185 :noindex:
186
187 @raise exceptions.Conflict: a analyser with this name already exists
188 """
189
190 name = analyser.get("name", "").strip().lower()
191 # we want the normalised name
192 analyser["name"] = name
193 if not name:
194 raise ValueError('"name" is mandatory in analyser')
195 if "type" not in analyser:
196 raise ValueError('"type" is mandatory in analyser')
197 type_test_keys = {"node", "namespace"}
198 if not type_test_keys.intersection(analyser):
199 raise ValueError(f'at least one of {type_test_keys} must be used')
200 if name in self.analysers:
201 raise exceptions.Conflict(
202 f"An analyser with the name {name!r} is already registered"
203 )
204 self.analysers[name] = analyser
205
206 async def cache_items(
207 self,
208 client: SatXMPPEntity,
209 pubsub_node: PubsubNode,
210 items: List[domish.Element]
211 ) -> None:
212 try:
213 parser = self.analysers[pubsub_node.analyser].get("parser")
214 except KeyError:
215 parser = None
216
217 if parser is not None:
218 parsed_items = [
219 await utils.as_deferred(
220 parser,
221 client,
222 item,
223 pubsub_node.service,
224 pubsub_node.name
225 )
226 for item in items
227 ]
228 else:
229 parsed_items = None
230
231 await self.host.memory.storage.cache_pubsub_items(
232 client, pubsub_node, items, parsed_items
233 )
234
235 async def _cache_node(
236 self,
237 client: SatXMPPEntity,
238 pubsub_node: PubsubNode
239 ) -> None:
240 await self.host.memory.storage.update_pubsub_node_sync_state(
241 pubsub_node, SyncState.IN_PROGRESS
242 )
243 service, node = pubsub_node.service, pubsub_node.name
244 try:
245 log.debug(
246 f"Caching node {node!r} at {service} for {client.profile}"
247 )
248 if not pubsub_node.subscribed:
249 try:
250 sub = await self._p.subscribe(client, service, node)
251 except Exception as e:
252 log.warning(
253 _(
254 "Can't subscribe node {pubsub_node}, that means that "
255 "synchronisation can't be maintained: {reason}"
256 ).format(pubsub_node=pubsub_node, reason=e)
257 )
258 else:
259 if sub.state == "subscribed":
260 sub_id = sub.subscriptionIdentifier
261 log.debug(
262 f"{pubsub_node} subscribed (subscription id: {sub_id!r})"
263 )
264 pubsub_node.subscribed = True
265 await self.host.memory.storage.add(pubsub_node)
266 else:
267 log.warning(
268 _(
269 "{pubsub_node} is not subscribed, that means that "
270 "synchronisation can't be maintained, and you may have "
271 "to enforce subscription manually. Subscription state: "
272 "{state}"
273 ).format(pubsub_node=pubsub_node, state=sub.state)
274 )
275
276 try:
277 await self.host.check_features(
278 client, [rsm.NS_RSM, self._p.DISCO_RSM], pubsub_node.service
279 )
280 except error.StanzaError as e:
281 if e.condition == "service-unavailable":
282 log.warning(
283 "service {service} is hidding disco infos, we'll only cache "
284 "latest 20 items"
285 )
286 items, __ = await client.pubsub_client.items(
287 pubsub_node.service, pubsub_node.name, maxItems=20
288 )
289 await self.cache_items(
290 client, pubsub_node, items
291 )
292 else:
293 raise e
294 except exceptions.FeatureNotFound:
295 log.warning(
296 f"service {service} doesn't handle Result Set Management "
297 "(XEP-0059), we'll only cache latest 20 items"
298 )
299 items, __ = await client.pubsub_client.items(
300 pubsub_node.service, pubsub_node.name, maxItems=20
301 )
302 await self.cache_items(
303 client, pubsub_node, items
304 )
305 else:
306 rsm_p = self.host.plugins["XEP-0059"]
307 rsm_request = rsm.RSMRequest()
308 cached_ids = set()
309 while True:
310 items, rsm_response = await client.pubsub_client.items(
311 service, node, rsm_request=rsm_request
312 )
313 await self.cache_items(
314 client, pubsub_node, items
315 )
316 for item in items:
317 item_id = item["id"]
318 if item_id in cached_ids:
319 log.warning(
320 f"Pubsub node {node!r} at {service} is returning several "
321 f"times the same item ({item_id!r}). This is illegal "
322 "behaviour, and it means that Pubsub service "
323 f"{service} is buggy and can't be cached properly. "
324 f"Please report this to {service.host} administrators"
325 )
326 rsm_request = None
327 break
328 cached_ids.add(item["id"])
329 if len(cached_ids) >= CACHE_LIMIT:
330 log.warning(
331 f"Pubsub node {node!r} at {service} contains more items "
332 f"than the cache limit ({CACHE_LIMIT}). We stop "
333 "caching here, at item {item['id']!r}."
334 )
335 rsm_request = None
336 break
337 rsm_request = rsm_p.get_next_request(rsm_request, rsm_response)
338 if rsm_request is None:
339 break
340
341 await self.host.memory.storage.update_pubsub_node_sync_state(
342 pubsub_node, SyncState.COMPLETED
343 )
344 except Exception as e:
345 import traceback
346 tb = traceback.format_tb(e.__traceback__)
347 log.error(
348 f"Can't cache node {node!r} at {service} for {client.profile}: {e}\n{tb}"
349 )
350 await self.host.memory.storage.update_pubsub_node_sync_state(
351 pubsub_node, SyncState.ERROR
352 )
353 await self.host.memory.storage.delete_pubsub_items(pubsub_node)
354 raise e
355
356 def _cache_node_clean(self, __, pubsub_node):
357 del self.in_progress[(pubsub_node.service, pubsub_node.name)]
358
359 def cache_node(
360 self,
361 client: SatXMPPEntity,
362 pubsub_node: PubsubNode
363 ) -> None:
364 """Launch node caching as a background task"""
365 d = defer.ensureDeferred(self._cache_node(client, pubsub_node))
366 d.addBoth(self._cache_node_clean, pubsub_node=pubsub_node)
367 self.in_progress[(pubsub_node.service, pubsub_node.name)] = d
368 return d
369
    async def analyse_node(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        node: str,
        pubsub_node : Optional[PubsubNode] = None,
    ) -> dict:
        """Use registered analysers on a node to determine what it is used for

        @param pubsub_node: cached node, if any. When None, the node's first
            item and its configuration are fetched from the service to help
            the analysis.
        @return: "analyse" dict with at least "service" and "node" keys; the
            keys listed in ANALYSER_KEYS_TO_COPY are added from the first
            matching analyser, plus possibly "namespace" and "conf".
        """
        analyse = {"service": service, "node": node}
        if pubsub_node is None:
            # the node is not in cache yet: fetch its first item to determine
            # the item namespace, used below by "namespace" analysers
            try:
                first_item = (await client.pubsub_client.items(
                    service, node, 1
                ))[0][0]
            except IndexError:
                # the node is empty, we can't guess a namespace
                pass
            except error.StanzaError as e:
                if e.condition == "item-not-found":
                    pass
                else:
                    log.warning(
                        f"Can't retrieve last item on node {node!r} at service "
                        f"{service} for {client.profile}: {e}"
                    )
            else:
                try:
                    uri = first_item.firstChildElement().uri
                except Exception as e:
                    log.warning(
                        f"Can't retrieve item namespace on node {node!r} at service "
                        f"{service} for {client.profile}: {e}"
                    )
                else:
                    analyse["namespace"] = uri
            try:
                conf = await self._p.getConfiguration(client, service, node)
            except Exception as e:
                log.warning(
                    f"Can't retrieve configuration for node {node!r} at service {service} "
                    f"for {client.profile}: {e}"
                )
            else:
                analyse["conf"] = conf

        # first analyser matching either on node name prefix or on item
        # namespace wins (dict insertion order of self.analysers)
        for analyser in self.analysers.values():
            try:
                an_node = analyser["node"]
            except KeyError:
                pass
            else:
                if node.startswith(an_node):
                    for key in ANALYSER_KEYS_TO_COPY:
                        try:
                            analyse[key] = analyser[key]
                        except KeyError:
                            pass
                    found = True
                    break
            try:
                namespace = analyse["namespace"]
                an_namespace = analyser["namespace"]
            except KeyError:
                pass
            else:
                if namespace == an_namespace:
                    for key in ANALYSER_KEYS_TO_COPY:
                        try:
                            analyse[key] = analyser[key]
                        except KeyError:
                            pass
                    found = True
                    break

        else:
            # for/else: no analyser matched (the loop completed without break)
            found = False
            log.debug(
                f"node {node!r} at service {service} doesn't match any known type"
            )
        if found:
            # let the matching analyser adjust the analyse dict in-place
            try:
                match_cb = analyser["match_cb"]
            except KeyError:
                pass
            else:
                await utils.as_deferred(match_cb, client, analyse)
        return analyse
456
457 def _get_items_from_cache(
458 self, service="", node="", max_items=10, item_ids=None, sub_id=None,
459 extra="", profile_key=C.PROF_KEY_NONE
460 ):
461 d = defer.ensureDeferred(self._a_get_items_from_cache(
462 service, node, max_items, item_ids, sub_id, extra, profile_key
463 ))
464 d.addCallback(self._p.trans_items_data)
465 d.addCallback(self._p.serialise_items)
466 return d
467
468 async def _a_get_items_from_cache(
469 self, service, node, max_items, item_ids, sub_id, extra, profile_key
470 ):
471 client = self.host.get_client(profile_key)
472 service = jid.JID(service) if service else client.jid.userhostJID()
473 pubsub_node = await self.host.memory.storage.get_pubsub_node(
474 client, service, node
475 )
476 if pubsub_node is None:
477 raise exceptions.NotFound(
478 f"{node!r} at {service} doesn't exist in cache for {client.profile!r}"
479 )
480 max_items = None if max_items == C.NO_LIMIT else max_items
481 extra = self._p.parse_extra(data_format.deserialise(extra))
482 items, metadata = await self.get_items_from_cache(
483 client,
484 pubsub_node,
485 max_items,
486 item_ids,
487 sub_id or None,
488 extra.rsm_request,
489 extra.extra,
490 )
491 return [i.data for i in items], metadata
492
493 async def get_items_from_cache(
494 self,
495 client: SatXMPPEntity,
496 node: PubsubNode,
497 max_items: Optional[int] = None,
498 item_ids: Optional[List[str]] = None,
499 sub_id: Optional[str] = None,
500 rsm_request: Optional[rsm.RSMRequest] = None,
501 extra: Optional[Dict[str, Any]] = None
502 ) -> Tuple[List[PubsubItem], dict]:
503 """Get items from cache, using same arguments as for external Pubsub request"""
504 if extra is None:
505 extra = {}
506 if "mam" in extra:
507 raise NotImplementedError("MAM queries are not supported yet")
508 if max_items is None and rsm_request is None:
509 max_items = 20
510 pubsub_items, metadata = await self.host.memory.storage.get_items(
511 node, max_items=max_items, item_ids=item_ids or None,
512 order_by=extra.get(C.KEY_ORDER_BY)
513 )
514 elif max_items is not None:
515 if rsm_request is not None:
516 raise exceptions.InternalError(
517 "Pubsub max items and RSM must not be used at the same time"
518 )
519 elif item_ids:
520 raise exceptions.InternalError(
521 "Pubsub max items and item IDs must not be used at the same time"
522 )
523 pubsub_items, metadata = await self.host.memory.storage.get_items(
524 node, max_items=max_items, order_by=extra.get(C.KEY_ORDER_BY)
525 )
526 else:
527 desc = False
528 if rsm_request.before == "":
529 before = None
530 desc = True
531 else:
532 before = rsm_request.before
533 pubsub_items, metadata = await self.host.memory.storage.get_items(
534 node, max_items=rsm_request.max, before=before, after=rsm_request.after,
535 from_index=rsm_request.index, order_by=extra.get(C.KEY_ORDER_BY),
536 desc=desc, force_rsm=True,
537 )
538
539 return pubsub_items, metadata
540
541 async def on_items_event(self, client, event):
542 node = await self.host.memory.storage.get_pubsub_node(
543 client, event.sender, event.nodeIdentifier
544 )
545 if node is None:
546 return
547 if node.sync_state in (SyncState.COMPLETED, SyncState.IN_PROGRESS):
548 items = []
549 retract_ids = []
550 for elt in event.items:
551 if elt.name == "item":
552 items.append(elt)
553 elif elt.name == "retract":
554 item_id = elt.getAttribute("id")
555 if not item_id:
556 log.warning(
557 "Ignoring invalid retract item element: "
558 f"{xml_tools.p_fmt_elt(elt)}"
559 )
560 continue
561
562 retract_ids.append(elt["id"])
563 else:
564 log.warning(
565 f"Unexpected Pubsub event element: {xml_tools.p_fmt_elt(elt)}"
566 )
567 if items:
568 log.debug(f"[{client.profile}] caching new items received from {node}")
569 await self.cache_items(
570 client, node, items
571 )
572 if retract_ids:
573 log.debug(f"deleting retracted items from {node}")
574 await self.host.memory.storage.delete_pubsub_items(
575 node, items_names=retract_ids
576 )
577
578 async def on_delete_event(self, client, event):
579 log.debug(
580 f"deleting node {event.nodeIdentifier} from {event.sender} for "
581 f"{client.profile}"
582 )
583 await self.host.memory.storage.delete_pubsub_node(
584 [client.profile], [event.sender], [event.nodeIdentifier]
585 )
586
587 async def on_purge_event(self, client, event):
588 node = await self.host.memory.storage.get_pubsub_node(
589 client, event.sender, event.nodeIdentifier
590 )
591 if node is None:
592 return
593 log.debug(f"purging node {node} for {client.profile}")
594 await self.host.memory.storage.delete_pubsub_items(node)
595
596 async def _get_items_trigger(
597 self,
598 client: SatXMPPEntity,
599 service: Optional[jid.JID],
600 node: str,
601 max_items: Optional[int],
602 item_ids: Optional[List[str]],
603 sub_id: Optional[str],
604 rsm_request: Optional[rsm.RSMRequest],
605 extra: dict
606 ) -> Tuple[bool, Optional[Tuple[List[dict], dict]]]:
607 if not self.use_cache:
608 log.debug("cache disabled in settings")
609 return True, None
610 if extra.get(C.KEY_USE_CACHE) == False:
611 log.debug("skipping pubsub cache as requested")
612 return True, None
613 if service is None:
614 service = client.jid.userhostJID()
615 for __ in range(5):
616 pubsub_node = await self.host.memory.storage.get_pubsub_node(
617 client, service, node
618 )
619 if pubsub_node is not None and pubsub_node.sync_state == SyncState.COMPLETED:
620 analyse = {"to_sync": True}
621 else:
622 analyse = await self.analyse_node(client, service, node)
623
624 if pubsub_node is None:
625 try:
626 pubsub_node = await self.host.memory.storage.set_pubsub_node(
627 client,
628 service,
629 node,
630 analyser=analyse.get("name"),
631 type_=analyse.get("type"),
632 subtype=analyse.get("subtype"),
633 )
634 except IntegrityError as e:
635 if "unique" in str(e.orig).lower():
636 log.debug(
637 "race condition on pubsub node creation in cache, trying "
638 "again"
639 )
640 else:
641 raise e
642 break
643 else:
644 raise exceptions.InternalError(
645 "Too many IntegrityError with UNIQUE constraint, something is going wrong"
646 )
647
648 if analyse.get("to_sync"):
649 if pubsub_node.sync_state == SyncState.COMPLETED:
650 if "mam" in extra:
651 log.debug("MAM caching is not supported yet, skipping cache")
652 return True, None
653 pubsub_items, metadata = await self.get_items_from_cache(
654 client, pubsub_node, max_items, item_ids, sub_id, rsm_request, extra
655 )
656 return False, ([i.data for i in pubsub_items], metadata)
657
658 if pubsub_node.sync_state == SyncState.IN_PROGRESS:
659 if (service, node) not in self.in_progress:
660 log.warning(
661 f"{pubsub_node} is reported as being cached, but not caching is "
662 "in progress, this is most probably due to the backend being "
663 "restarted. Resetting the status, caching will be done again."
664 )
665 pubsub_node.sync_state = None
666 await self.host.memory.storage.delete_pubsub_items(pubsub_node)
667 elif time.time() - pubsub_node.sync_state_updated > PROGRESS_DEADLINE:
668 log.warning(
669 f"{pubsub_node} is in progress for too long "
670 f"({pubsub_node.sync_state_updated//60} minutes), "
671 "cancelling it and retrying."
672 )
673 self.in_progress.pop[(service, node)].cancel()
674 pubsub_node.sync_state = None
675 await self.host.memory.storage.delete_pubsub_items(pubsub_node)
676 else:
677 log.debug(
678 f"{pubsub_node} synchronisation is already in progress, skipping"
679 )
680 if pubsub_node.sync_state is None:
681 key = (service, node)
682 if key in self.in_progress:
683 raise exceptions.InternalError(
684 f"There is already a caching in progress for {pubsub_node}, this "
685 "should not happen"
686 )
687 self.cache_node(client, pubsub_node)
688 elif pubsub_node.sync_state == SyncState.ERROR:
689 log.debug(
690 f"{pubsub_node} synchronisation has previously failed, skipping"
691 )
692
693 return True, None
694
    async def _subscribe_trigger(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        nodeIdentifier: str,
        sub_jid: Optional[jid.JID],
        options: Optional[dict],
        subscription: pubsub.Subscription
    ) -> None:
        # placeholder: subscriptions don't affect the cache yet (see the class
        # TODO about missing XEP-0060 (un)subscribe notifications)
        pass
705
    async def _unsubscribe_trigger(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        nodeIdentifier: str,
        sub_jid,
        subscriptionIdentifier,
        sender,
    ) -> None:
        # placeholder: unsubscriptions don't affect the cache yet (see the
        # class TODO about missing XEP-0060 (un)subscribe notifications)
        pass
716
717 def _synchronise(self, service, node, profile_key):
718 client = self.host.get_client(profile_key)
719 service = client.jid.userhostJID() if not service else jid.JID(service)
720 return defer.ensureDeferred(self.synchronise(client, service, node))
721
722 async def synchronise(
723 self,
724 client: SatXMPPEntity,
725 service: jid.JID,
726 node: str,
727 resync: bool = True
728 ) -> None:
729 """Synchronise a node with a pubsub service
730
731 The node will be synchronised even if there is no matching analyser.
732
733 Note that when a node is synchronised, it is automatically subscribed.
734 @param resync: if True and the node is already synchronised, it will be
735 resynchronised (all items will be deleted and re-downloaded).
736
737 """
738 pubsub_node = await self.host.memory.storage.get_pubsub_node(
739 client, service, node
740 )
741 if pubsub_node is None:
742 log.info(
743 _(
744 "Synchronising the new node {node} at {service}"
745 ).format(node=node, service=service.full)
746 )
747 analyse = await self.analyse_node(client, service, node)
748 pubsub_node = await self.host.memory.storage.set_pubsub_node(
749 client,
750 service,
751 node,
752 analyser=analyse.get("name"),
753 type_=analyse.get("type"),
754 )
755 elif not resync and pubsub_node.sync_state is not None:
756 # the node exists, nothing to do
757 return
758
759 if ((pubsub_node.sync_state == SyncState.IN_PROGRESS
760 or (service, node) in self.in_progress)):
761 log.warning(
762 _(
763 "{node} at {service} is already being synchronised, can't do a new "
764 "synchronisation."
765 ).format(node=node, service=service)
766 )
767 else:
768 log.info(
769 _(
770 "(Re)Synchronising the node {node} at {service} on user request"
771 ).format(node=node, service=service.full())
772 )
773 # we first delete and recreate the node (will also delete its items)
774 await self.host.memory.storage.delete(pubsub_node)
775 analyse = await self.analyse_node(client, service, node)
776 pubsub_node = await self.host.memory.storage.set_pubsub_node(
777 client,
778 service,
779 node,
780 analyser=analyse.get("name"),
781 type_=analyse.get("type"),
782 )
783 # then we can put node in cache
784 await self.cache_node(client, pubsub_node)
785
    async def purge(self, purge_filters: dict) -> None:
        """Remove items according to filters

        filters can have one of the following keys, all are optional:

        :services:
            list of JIDs of services from which items must be deleted
        :nodes:
            list of node names to delete
        :types:
            list of node types to delete
        :subtypes:
            list of node subtypes to delete
        :profiles:
            list of profiles from which items must be deleted
        :created_before:
            datetime before which items must have been created to be deleted
        :updated_before:
            datetime before which items must have been updated last to be deleted
        """
        # storage expects "names" where the public filter API says "nodes"
        purge_filters["names"] = purge_filters.pop("nodes", None)
        await self.host.memory.storage.purge_pubsub_items(**purge_filters)
808
809 def _purge(self, purge_filters: str) -> None:
810 purge_filters = data_format.deserialise(purge_filters)
811 for key in "created_before", "updated_before":
812 try:
813 purge_filters[key] = datetime.fromtimestamp(purge_filters[key])
814 except (KeyError, TypeError):
815 pass
816 return defer.ensureDeferred(self.purge(purge_filters))
817
    async def reset(self) -> None:
        """Remove ALL nodes and items from cache

        After calling this method, cache will be refilled progressively as if it were new
        """
        # None filters mean "all profiles, all services, all nodes"
        await self.host.memory.storage.delete_pubsub_node(None, None, None)
824
    def _reset(self) -> defer.Deferred:
        # bridge wrapper for reset
        return defer.ensureDeferred(self.reset())
827
    async def search(self, query: dict) -> List[PubsubItem]:
        """Search pubsub items in cache"""
        # thin wrapper: the query semantics are defined by storage
        return await self.host.memory.storage.search_pubsub_items(query)
831
832 async def serialisable_search(self, query: dict) -> List[dict]:
833 """Search pubsub items in cache and returns parsed data
834
835 The returned data can be serialised.
836
837 "pubsub_service" and "pubsub_name" will be added to each data (both as strings)
838 """
839 items = await self.search(query)
840 ret = []
841 for item in items:
842 parsed = item.parsed
843 parsed["pubsub_service"] = item.node.service.full()
844 parsed["pubsub_node"] = item.node.name
845 if query.get("with_payload"):
846 parsed["item_payload"] = item.data.toXml()
847 parsed["node_profile"] = self.host.memory.storage.get_profile_by_id(
848 item.node.profile_id
849 )
850
851 ret.append(parsed)
852 return ret
853
854 def _search(self, query: str) -> defer.Deferred:
855 query = data_format.deserialise(query)
856 services = query.get("services")
857 if services:
858 query["services"] = [jid.JID(s) for s in services]
859 d = defer.ensureDeferred(self.serialisable_search(query))
860 d.addCallback(data_format.serialise)
861 return d