comparison sat/plugins/plugin_pubsub_cache.py @ 3597:5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
author: Goffi <goffi@goffi.org>
date: Thu, 29 Jul 2021 22:51:01 +0200
parents:
children: 32181a45d54b
comparing revisions 3596:2d97c695af05 and 3597:5d108ce026d7
#!/usr/bin/env python3

# Libervia plugin for PubSub Caching
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import time
from datetime import datetime
from typing import Optional, List, Tuple
from twisted.words.protocols.jabber import jid, error
from twisted.words.xish import domish
from twisted.internet import defer
from wokkel import pubsub, rsm
from sat.core.i18n import _
from sat.core.constants import Const as C
from sat.core import exceptions
from sat.core.log import getLogger
from sat.core.core_types import SatXMPPEntity
from sat.tools import xml_tools, utils
from sat.tools.common import data_format
from sat.memory.sqla import PubsubNode, PubsubItem, SyncState


log = getLogger(__name__)

PLUGIN_INFO = {
    C.PI_NAME: "PubSub Cache",
    C.PI_IMPORT_NAME: "PUBSUB_CACHE",
    C.PI_TYPE: C.PLUG_TYPE_PUBSUB,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0059", "XEP-0060"],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "PubsubCache",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _("""Local Cache for PubSub"""),
}

ANALYSER_KEYS_TO_COPY = ("name", "type", "to_sync", "parser")
# maximum number of items to cache
CACHE_LIMIT = 5000
# number of seconds before an in-progress caching is considered failed and tried again
PROGRESS_DEADLINE = 60 * 60 * 6


class PubsubCache:
    # TODO: there is currently no notification for (un)subscribe events with XEP-0060,
    # but it would be necessary to have this data if some device unsubscribes from a
    # cached node, as we can then get out of sync. A protoXEP could be proposed to fix
    # this situation.
    # TODO: handle configuration events
    def __init__(self, host):
        log.info(_("PubSub Cache initialization"))
        strategy = host.memory.getConfig(None, "pubsub_cache_strategy")
        if strategy == "no_cache":
            log.info(
                _(
                    "Pubsub cache won't be used due to pubsub_cache_strategy={value} "
                    "setting."
                ).format(value=repr(strategy))
            )
            self.use_cache = False
        else:
            self.use_cache = True
        self.host = host
        self._p = host.plugins["XEP-0060"]
        self.analysers = {}
        # map of caching in progress: (service, node) => Deferred
        self.in_progress = {}
        self.host.trigger.add("XEP-0060_getItems", self._getItemsTrigger)
        self._p.addManagedNode(
            "",
            items_cb=self.onItemsEvent,
            delete_cb=self.onDeleteEvent,
            purge_cb=self.onPurgeEvent,
        )
        host.bridge.addMethod(
            "psCacheGet",
            ".plugin",
            in_sign="ssiassss",
            out_sign="s",
            method=self._getItemsFromCache,
            async_=True,
        )
        host.bridge.addMethod(
            "psCacheSync",
            ".plugin",
            "sss",
            out_sign="",
            method=self._synchronise,
            async_=True,
        )
        host.bridge.addMethod(
            "psCachePurge",
            ".plugin",
            "s",
            out_sign="",
            method=self._purge,
            async_=True,
        )
        host.bridge.addMethod(
            "psCacheReset",
            ".plugin",
            "",
            out_sign="",
            method=self._reset,
            async_=True,
        )

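    # Note (illustrative, not part of this changeset): the "no_cache" strategy checked
    # in __init__ above is read with host.memory.getConfig, so the cache can be
    # disabled entirely from the backend configuration file with a line such as:
    #
    #     pubsub_cache_strategy = no_cache
    #
    # The exact section where this option lives depends on the local configuration
    # layout; the option name and the "no_cache" value are the ones used above.
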
    def registerAnalyser(self, analyser: dict) -> None:
        """Register a new pubsub node analyser

        @param analyser: An analyser is a dictionary which may have the following keys
            (keys with a ``*`` are mandatory, and at least one of the ``node`` or
            ``namespace`` keys must be used):

            :name (str)*:
                a unique name for this analyser. This name will be stored in database
                to retrieve the analyser when necessary (notably to get the parsing
                method), thus it is recommended to use a stable name such as the source
                plugin name instead of a name which may change with standard evolution,
                such as the feature namespace.

            :type (str)*:
                indicates what kind of items we are dealing with. Type must be a human
                readable word, as it may be used in searches. Good type examples are
                **blog** or **event**.

            :node (str):
                prefix of a node name which may be used to identify its type. Example:
                *urn:xmpp:microblog:0* (a node starting with this name will be
                identified as a *blog* node).

            :namespace (str):
                root namespace of items. When analysing a node, the first item will be
                retrieved. The analyser will be chosen if its namespace matches the
                namespace of the first child element of the ``<item>`` element.

            :to_sync (bool):
                if True, the node must be synchronised in cache. The default False
                value means that the pubsub service will always be requested.

            :parser (callable):
                method (which may be sync, a coroutine or a method returning a
                "Deferred") to call to parse the ``domish.Element`` of the item. The
                result must be a dictionary which can be serialised to JSON.

                The method must have the following signature:

                .. function:: parser(client: SatXMPPEntity, item_elt: domish.Element, \
                    service: Optional[jid.JID], node: Optional[str]) \
                    -> dict
                    :noindex:

            :match_cb (callable):
                method (which may be sync, a coroutine or a method returning a
                "Deferred") called when the analyser matches. The method is called with
                the current analyse, which it can modify **in-place**.

                The method must have the following signature:

                .. function:: match_cb(client: SatXMPPEntity, analyse: dict) -> None
                    :noindex:

        @raise exceptions.Conflict: an analyser with this name already exists
        """

        name = analyser.get("name", "").strip().lower()
        # we want the normalised name
        analyser["name"] = name
        if not name:
            raise ValueError('"name" is mandatory in analyser')
        if "type" not in analyser:
            raise ValueError('"type" is mandatory in analyser')
        type_test_keys = {"node", "namespace"}
        if not type_test_keys.intersection(analyser):
            raise ValueError(f'at least one of {type_test_keys} must be used')
        if name in self.analysers:
            raise exceptions.Conflict(
                f"An analyser with the name {name!r} is already registered"
            )
        self.analysers[name] = analyser

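    # Example (illustrative sketch, not part of this changeset): a plugin handling
    # blog items could register an analyser roughly as follows ("parseBlogItem" is a
    # hypothetical parsing method of that plugin returning a JSON-serialisable dict):
    #
    #     host.plugins["PUBSUB_CACHE"].registerAnalyser({
    #         "name": "blog_example",
    #         "type": "blog",
    #         "node": "urn:xmpp:microblog:0",
    #         "to_sync": True,
    #         "parser": self.parseBlogItem,
    #     })
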
    async def cacheItems(
        self,
        client: SatXMPPEntity,
        pubsub_node: PubsubNode,
        items: List[domish.Element]
    ) -> None:
        try:
            parser = self.analysers[pubsub_node.analyser].get("parser")
        except KeyError:
            parser = None

        if parser is not None:
            parsed_items = [
                await utils.asDeferred(
                    parser,
                    client,
                    item,
                    pubsub_node.service,
                    pubsub_node.name
                )
                for item in items
            ]
        else:
            parsed_items = None

        await self.host.memory.storage.cachePubsubItems(
            client, pubsub_node, items, parsed_items
        )

    async def _cacheNode(
        self,
        client: SatXMPPEntity,
        pubsub_node: PubsubNode
    ) -> None:
        await self.host.memory.storage.updatePubsubNodeSyncState(
            pubsub_node, SyncState.IN_PROGRESS
        )
        service, node = pubsub_node.service, pubsub_node.name
        try:
            log.debug(
                f"Caching node {node!r} at {service} for {client.profile}"
            )
            if not pubsub_node.subscribed:
                try:
                    sub = await self._p.subscribe(client, service, node)
                except Exception as e:
                    log.warning(
                        _(
                            "Can't subscribe node {pubsub_node}, that means that "
                            "synchronisation can't be maintained: {reason}"
                        ).format(pubsub_node=pubsub_node, reason=e)
                    )
                else:
                    if sub.state == "subscribed":
                        sub_id = sub.subscriptionIdentifier
                        log.debug(
                            f"{pubsub_node} subscribed (subscription id: {sub_id!r})"
                        )
                        pubsub_node.subscribed = True
                        await self.host.memory.storage.add(pubsub_node)
                    else:
                        log.warning(
                            _(
                                "{pubsub_node} is not subscribed, that means that "
                                "synchronisation can't be maintained, and you may have "
                                "to enforce subscription manually. Subscription state: "
                                "{state}"
                            ).format(pubsub_node=pubsub_node, state=sub.state)
                        )

            try:
                await self.host.checkFeatures(
                    client, [rsm.NS_RSM, self._p.DISCO_RSM], pubsub_node.service
                )
            except exceptions.FeatureNotFound:
                log.warning(
                    f"service {service} doesn't handle Result Set Management "
                    "(XEP-0059), we'll only cache the latest 20 items"
                )
                items, __ = await client.pubsub_client.items(
                    pubsub_node.service, pubsub_node.name, maxItems=20
                )
                await self.cacheItems(
                    client, pubsub_node, items
                )
            else:
                rsm_p = self.host.plugins["XEP-0059"]
                rsm_request = rsm.RSMRequest()
                cached_ids = set()
                while True:
                    items, rsm_response = await client.pubsub_client.items(
                        service, node, rsm_request=rsm_request
                    )
                    await self.cacheItems(
                        client, pubsub_node, items
                    )
                    for item in items:
                        item_id = item["id"]
                        if item_id in cached_ids:
                            log.warning(
                                f"Pubsub node {node!r} at {service} is returning the "
                                f"same item ({item_id!r}) several times. This is "
                                "illegal behaviour, and it means that the Pubsub "
                                f"service {service} is buggy and can't be cached "
                                f"properly. Please report this to {service.host} "
                                "administrators"
                            )
                            rsm_request = None
                            break
                        cached_ids.add(item["id"])
                        if len(cached_ids) >= CACHE_LIMIT:
                            log.warning(
                                f"Pubsub node {node!r} at {service} contains more items "
                                f"than the cache limit ({CACHE_LIMIT}). We stop "
                                f"caching here, at item {item['id']!r}."
                            )
                            rsm_request = None
                            break
                    if rsm_request is None:
                        break
                    rsm_request = rsm_p.getNextRequest(rsm_request, rsm_response)
                    if rsm_request is None:
                        break

            await self.host.memory.storage.updatePubsubNodeSyncState(
                pubsub_node, SyncState.COMPLETED
            )
        except Exception as e:
            import traceback
            tb = traceback.format_tb(e.__traceback__)
            log.error(
                f"Can't cache node {node!r} at {service} for {client.profile}: {e}\n{tb}"
            )
            await self.host.memory.storage.updatePubsubNodeSyncState(
                pubsub_node, SyncState.ERROR
            )
            await self.host.memory.storage.deletePubsubItems(pubsub_node)
            raise e

    def _cacheNodeClean(self, __, pubsub_node):
        del self.in_progress[(pubsub_node.service, pubsub_node.name)]

    def cacheNode(
        self,
        client: SatXMPPEntity,
        pubsub_node: PubsubNode
    ) -> defer.Deferred:
        """Launch node caching as a background task"""
        d = defer.ensureDeferred(self._cacheNode(client, pubsub_node))
        d.addBoth(self._cacheNodeClean, pubsub_node=pubsub_node)
        self.in_progress[(pubsub_node.service, pubsub_node.name)] = d
        return d

    async def analyseNode(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        node: str,
        pubsub_node: Optional[PubsubNode] = None,
    ) -> dict:
        """Use registered analysers on a node to determine what it is used for"""
        analyse = {"service": service, "node": node}
        if pubsub_node is None:
            try:
                first_item = (await client.pubsub_client.items(
                    service, node, 1
                ))[0][0]
            except IndexError:
                pass
            except error.StanzaError as e:
                if e.condition == "item-not-found":
                    pass
                else:
                    log.warning(
                        f"Can't retrieve last item on node {node!r} at service "
                        f"{service} for {client.profile}: {e}"
                    )
            else:
                try:
                    uri = first_item.firstChildElement().uri
                except Exception as e:
                    log.warning(
                        f"Can't retrieve item namespace on node {node!r} at service "
                        f"{service} for {client.profile}: {e}"
                    )
                else:
                    analyse["namespace"] = uri
            try:
                conf = await self._p.getConfiguration(client, service, node)
            except Exception as e:
                log.warning(
                    f"Can't retrieve configuration for node {node!r} at service "
                    f"{service} for {client.profile}: {e}"
                )
            else:
                analyse["conf"] = conf

        for analyser in self.analysers.values():
            match_cb = analyser.get("match_cb")
            try:
                an_node = analyser["node"]
            except KeyError:
                pass
            else:
                if node.startswith(an_node):
                    for key in ANALYSER_KEYS_TO_COPY:
                        try:
                            analyse[key] = analyser[key]
                        except KeyError:
                            pass
                    found = True
                    break
            try:
                namespace = analyse["namespace"]
                an_namespace = analyser["namespace"]
            except KeyError:
                pass
            else:
                if namespace == an_namespace:
                    for key in ANALYSER_KEYS_TO_COPY:
                        try:
                            analyse[key] = analyser[key]
                        except KeyError:
                            pass
                    found = True
                    break
        else:
            found = False
            log.debug(
                f"node {node!r} at service {service} doesn't match any known type"
            )
        if found:
            try:
                match_cb = analyser["match_cb"]
            except KeyError:
                pass
            else:
                await match_cb(client, analyse)
        return analyse

    def _getItemsFromCache(
        self, service="", node="", max_items=10, item_ids=None, sub_id=None,
        extra="", profile_key=C.PROF_KEY_NONE
    ):
        d = defer.ensureDeferred(self._aGetItemsFromCache(
            service, node, max_items, item_ids, sub_id, extra, profile_key
        ))
        d.addCallback(self._p.transItemsData)
        d.addCallback(self._p.serialiseItems)
        return d

    async def _aGetItemsFromCache(
        self, service, node, max_items, item_ids, sub_id, extra, profile_key
    ):
        client = self.host.getClient(profile_key)
        service = jid.JID(service) if service else client.jid.userhostJID()
        pubsub_node = await self.host.memory.storage.getPubsubNode(
            client, service, node
        )
        if pubsub_node is None:
            raise exceptions.NotFound(
                f"{node!r} at {service} doesn't exist in cache for {client.profile!r}"
            )
        max_items = None if max_items == C.NO_LIMIT else max_items
        extra = self._p.parseExtra(data_format.deserialise(extra))
        items, metadata = await self.getItemsFromCache(
            client,
            pubsub_node,
            max_items,
            item_ids,
            sub_id or None,
            extra.rsm_request,
            extra.extra,
        )
        return [i.data for i in items], metadata

    async def getItemsFromCache(
        self,
        client: SatXMPPEntity,
        node: PubsubNode,
        max_items: Optional[int] = None,
        item_ids: Optional[List[str]] = None,
        sub_id: Optional[str] = None,
        rsm_request: Optional[rsm.RSMRequest] = None,
        extra: Optional[dict] = None
    ) -> Tuple[List[PubsubItem], dict]:
        """Get items from cache, using same arguments as for external Pubsub request"""
        if extra is None:
            extra = {}
        if "mam" in extra:
            raise NotImplementedError("MAM queries are not supported yet")
        if max_items is None and rsm_request is None:
            max_items = 20
        if max_items is not None:
            if rsm_request is not None:
                raise exceptions.InternalError(
                    "Pubsub max items and RSM must not be used at the same time"
                )
            elif item_ids is not None:
                raise exceptions.InternalError(
                    "Pubsub max items and item IDs must not be used at the same time"
                )
            pubsub_items, metadata = await self.host.memory.storage.getItems(
                node, max_items=max_items, order_by=extra.get(C.KEY_ORDER_BY)
            )
        else:
            desc = False
            if rsm_request.before == "":
                before = None
                desc = True
            else:
                before = rsm_request.before
            pubsub_items, metadata = await self.host.memory.storage.getItems(
                node, max_items=rsm_request.max, before=before, after=rsm_request.after,
                from_index=rsm_request.index, order_by=extra.get(C.KEY_ORDER_BY),
                desc=desc, force_rsm=True,
            )

        return pubsub_items, metadata

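    # Example (illustrative sketch, not part of this changeset): once a node is cached,
    # its most recent items can be read back with the same kind of arguments as a live
    # pubsub request, e.g. the last page of 20 items using RSM semantics (before=""):
    #
    #     rsm_request = rsm.RSMRequest(max_=20, before="")
    #     pubsub_items, metadata = await self.getItemsFromCache(
    #         client, pubsub_node, rsm_request=rsm_request, extra={}
    #     )
    #     items = [i.data for i in pubsub_items]
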
    async def onItemsEvent(self, client, event):
        node = await self.host.memory.storage.getPubsubNode(
            client, event.sender, event.nodeIdentifier
        )
        if node is None:
            return
        if node.sync_state in (SyncState.COMPLETED, SyncState.IN_PROGRESS):
            items = []
            retract_ids = []
            for elt in event.items:
                if elt.name == "item":
                    items.append(elt)
                elif elt.name == "retract":
                    item_id = elt.getAttribute("id")
                    if not item_id:
                        log.warning(
                            "Ignoring invalid retract item element: "
                            f"{xml_tools.pFmtElt(elt)}"
                        )
                        continue

                    retract_ids.append(elt["id"])
                else:
                    log.warning(
                        f"Unexpected Pubsub event element: {xml_tools.pFmtElt(elt)}"
                    )
            if items:
                log.debug(f"caching new items received from {node}")
                await self.cacheItems(
                    client, node, items
                )
            if retract_ids:
                log.debug(f"deleting retracted items from {node}")
                await self.host.memory.storage.deletePubsubItems(
                    node, items_names=retract_ids
                )

    async def onDeleteEvent(self, client, event):
        log.debug(
            f"deleting node {event.nodeIdentifier} from {event.sender} for "
            f"{client.profile}"
        )
        await self.host.memory.storage.deletePubsubNode(
            [client.profile], [event.sender], [event.nodeIdentifier]
        )

    async def onPurgeEvent(self, client, event):
        node = await self.host.memory.storage.getPubsubNode(
            client, event.sender, event.nodeIdentifier
        )
        if node is None:
            return
        log.debug(f"purging node {node} for {client.profile}")
        await self.host.memory.storage.deletePubsubItems(node)

    async def _getItemsTrigger(
        self,
        client: SatXMPPEntity,
        service: Optional[jid.JID],
        node: str,
        max_items: Optional[int],
        item_ids: Optional[List[str]],
        sub_id: Optional[str],
        rsm_request: Optional[rsm.RSMRequest],
        extra: dict
    ) -> Tuple[bool, Optional[Tuple[List[dict], dict]]]:
        if not self.use_cache:
            log.debug("cache disabled in settings")
            return True, None
        if extra.get(C.KEY_USE_CACHE) == False:
            log.debug("skipping pubsub cache as requested")
            return True, None
        if service is None:
            service = client.jid.userhostJID()
        pubsub_node = await self.host.memory.storage.getPubsubNode(
            client, service, node
        )
        if pubsub_node is not None and pubsub_node.sync_state == SyncState.COMPLETED:
            analyse = {"to_sync": True}
        else:
            analyse = await self.analyseNode(client, service, node)

        if pubsub_node is None:
            pubsub_node = await self.host.memory.storage.setPubsubNode(
                client,
                service,
                node,
                analyser=analyse.get("name"),
                type_=analyse.get("type"),
                subtype=analyse.get("subtype"),
            )

        if analyse.get("to_sync"):
            if pubsub_node.sync_state == SyncState.COMPLETED:
                if "mam" in extra:
                    log.debug("MAM caching is not supported yet, skipping cache")
                    return True, None
                pubsub_items, metadata = await self.getItemsFromCache(
                    client, pubsub_node, max_items, item_ids, sub_id, rsm_request, extra
                )
                return False, ([i.data for i in pubsub_items], metadata)

            if pubsub_node.sync_state == SyncState.IN_PROGRESS:
                if (service, node) not in self.in_progress:
                    log.warning(
                        f"{pubsub_node} is reported as being cached, but no caching is "
                        "in progress; this is most probably due to the backend being "
                        "restarted. Resetting the status, caching will be done again."
                    )
                    pubsub_node.sync_state = None
                    await self.host.memory.storage.deletePubsubItems(pubsub_node)
                elif time.time() - pubsub_node.sync_state_updated > PROGRESS_DEADLINE:
                    log.warning(
                        f"{pubsub_node} has been in progress for too long "
                        f"({int((time.time() - pubsub_node.sync_state_updated) // 60)} "
                        "minutes), cancelling it and retrying."
                    )
                    self.in_progress.pop((service, node)).cancel()
                    pubsub_node.sync_state = None
                    await self.host.memory.storage.deletePubsubItems(pubsub_node)
                else:
                    log.debug(
                        f"{pubsub_node} synchronisation is already in progress, skipping"
                    )
            if pubsub_node.sync_state is None:
                key = (service, node)
                if key in self.in_progress:
                    raise exceptions.InternalError(
                        f"There is already a caching in progress for {pubsub_node}, this "
                        "should not happen"
                    )
                self.cacheNode(client, pubsub_node)
            elif pubsub_node.sync_state == SyncState.ERROR:
                log.debug(
                    f"{pubsub_node} synchronisation has previously failed, skipping"
                )

        return True, None

    async def _subscribeTrigger(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        nodeIdentifier: str,
        sub_jid: Optional[jid.JID],
        options: Optional[dict],
        subscription: pubsub.Subscription
    ) -> None:
        pass

    async def _unsubscribeTrigger(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        nodeIdentifier: str,
        sub_jid,
        subscriptionIdentifier,
        sender,
    ) -> None:
        pass

    def _synchronise(self, service, node, profile_key):
        client = self.host.getClient(profile_key)
        service = client.jid.userhostJID() if not service else jid.JID(service)
        return defer.ensureDeferred(self.synchronise(client, service, node))

    async def synchronise(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        node: str
    ) -> None:
        """Synchronise a node with a pubsub service

        If the node is already synchronised, it will be resynchronised (all items will be
        deleted and re-downloaded).

        The node will be synchronised even if there is no matching analyser.

        Note that when a node is synchronised, it is automatically subscribed.
        """
        pubsub_node = await self.host.memory.storage.getPubsubNode(
            client, service, node
        )
        if pubsub_node is None:
            log.info(
                _(
                    "Synchronising the new node {node} at {service}"
                ).format(node=node, service=service.full())
            )
            analyse = await self.analyseNode(client, service, node)
            pubsub_node = await self.host.memory.storage.setPubsubNode(
                client,
                service,
                node,
                analyser=analyse.get("name"),
                type_=analyse.get("type"),
            )

        if ((pubsub_node.sync_state == SyncState.IN_PROGRESS
             or (service, node) in self.in_progress)):
            log.warning(
                _(
                    "{node} at {service} is already being synchronised, can't do a new "
                    "synchronisation."
                ).format(node=node, service=service)
            )
        else:
            log.info(
                _(
                    "(Re)Synchronising the node {node} at {service} on user request"
                ).format(node=node, service=service.full())
            )
            # we first delete and recreate the node (will also delete its items)
            await self.host.memory.storage.delete(pubsub_node)
            analyse = await self.analyseNode(client, service, node)
            pubsub_node = await self.host.memory.storage.setPubsubNode(
                client,
                service,
                node,
                analyser=analyse.get("name"),
                type_=analyse.get("type"),
            )
            # then we can put the node in cache
            await self.cacheNode(client, pubsub_node)

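    # Example (illustrative, not part of this changeset): a frontend can trigger the
    # method above through the "psCacheSync" bridge method registered in __init__
    # (signature "sss": service JID, node name, profile key), for instance:
    #
    #     await bridge.psCacheSync(
    #         "pubsub.example.org",    # hypothetical pubsub service
    #         "urn:xmpp:microblog:0",  # node to (re)synchronise
    #         "default",               # profile key
    #     )
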
    async def purge(self, purge_filters: dict) -> None:
        """Remove items according to filters

        filters can have any of the following keys; all are optional:

            :services:
                list of JIDs of services from which items must be deleted
            :nodes:
                list of node names to delete
            :types:
                list of node types to delete
            :subtypes:
                list of node subtypes to delete
            :profiles:
                list of profiles from which items must be deleted
            :created_before:
                datetime before which items must have been created to be deleted
            :updated_before:
                datetime before which items must have been last updated to be deleted
        """
        purge_filters["names"] = purge_filters.pop("nodes", None)
        await self.host.memory.storage.purgePubsubItems(**purge_filters)

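    # Example (illustrative sketch, not part of this changeset): the "psCachePurge"
    # bridge method expects these filters serialised with data_format, with datetimes
    # given as UNIX timestamps (they are converted in _purge below). Purging cached
    # blog items older than roughly 6 months could look like:
    #
    #     filters = data_format.serialise({
    #         "types": ["blog"],
    #         "created_before": time.time() - 6 * 30 * 24 * 60 * 60,
    #     })
    #     await bridge.psCachePurge(filters)
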
    def _purge(self, purge_filters: str) -> defer.Deferred:
        purge_filters = data_format.deserialise(purge_filters)
        for key in "created_before", "updated_before":
            try:
                purge_filters[key] = datetime.fromtimestamp(purge_filters[key])
            except (KeyError, TypeError):
                pass
        return defer.ensureDeferred(self.purge(purge_filters))

    async def reset(self) -> None:
        """Remove ALL nodes and items from cache

        After calling this method, the cache will be refilled progressively as if it
        were new.
        """
        await self.host.memory.storage.deletePubsubNode(None, None, None)

    def _reset(self) -> defer.Deferred:
        return defer.ensureDeferred(self.reset())