Mercurial > libervia-backend
sat/plugins/plugin_pubsub_cache.py @ 3715:b9718216a1c0
description: merge bookmark 0.9
author:      Goffi <goffi@goffi.org>
date:        Wed, 01 Dec 2021 16:13:31 +0100
parents:     342e3ddefd23
children:    ffa8c8c78115
#!/usr/bin/env python3

# Libervia plugin for PubSub Caching
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import time
from datetime import datetime
from typing import Optional, List, Tuple
from twisted.words.protocols.jabber import jid, error
from twisted.words.xish import domish
from twisted.internet import defer
from wokkel import pubsub, rsm
from sat.core.i18n import _
from sat.core.constants import Const as C
from sat.core import exceptions
from sat.core.log import getLogger
from sat.core.core_types import SatXMPPEntity
from sat.tools import xml_tools, utils
from sat.tools.common import data_format
from sat.memory.sqla import PubsubNode, PubsubItem, SyncState


log = getLogger(__name__)

PLUGIN_INFO = {
    C.PI_NAME: "PubSub Cache",
    C.PI_IMPORT_NAME: "PUBSUB_CACHE",
    C.PI_TYPE: C.PLUG_TYPE_PUBSUB,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0059", "XEP-0060"],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "PubsubCache",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _("""Local Cache for PubSub"""),
}

ANALYSER_KEYS_TO_COPY = ("name", "type", "to_sync", "parser")
# maximum number of items to cache
CACHE_LIMIT = 5000
# number of seconds before an in-progress caching is considered failed and retried
PROGRESS_DEADLINE = 60 * 60 * 6


class PubsubCache:
    # TODO: there is currently no notification for (un)subscribe events with XEP-0060,
    #   but it would be necessary to have this data if some devices unsubscribe a
    #   cached node, as we can then get out of sync. A protoXEP could be proposed to
    #   fix this situation.
    # TODO: handle configuration events

    def __init__(self, host):
        log.info(_("PubSub Cache initialization"))
        strategy = host.memory.getConfig(None, "pubsub_cache_strategy")
        if strategy == "no_cache":
            log.info(
                _(
                    "Pubsub cache won't be used due to pubsub_cache_strategy={value} "
                    "setting."
                ).format(value=repr(strategy))
            )
            self.use_cache = False
        else:
            self.use_cache = True
        self.host = host
        self._p = host.plugins["XEP-0060"]
        self.analysers = {}
        # map of caching in progress: (service, node) => Deferred
        self.in_progress = {}
        self.host.trigger.add("XEP-0060_getItems", self._getItemsTrigger)
        self._p.addManagedNode(
            "",
            items_cb=self.onItemsEvent,
            delete_cb=self.onDeleteEvent,
            purge_cb=self.onPurgeEvent,
        )
        host.bridge.addMethod(
            "psCacheGet",
            ".plugin",
            in_sign="ssiassss",
            out_sign="s",
            method=self._getItemsFromCache,
            async_=True,
        )
        host.bridge.addMethod(
            "psCacheSync",
            ".plugin",
            in_sign="sss",
            out_sign="",
            method=self._synchronise,
            async_=True,
        )
        host.bridge.addMethod(
            "psCachePurge",
            ".plugin",
            in_sign="s",
            out_sign="",
            method=self._purge,
            async_=True,
        )
        host.bridge.addMethod(
            "psCacheReset",
            ".plugin",
            in_sign="",
            out_sign="",
            method=self._reset,
            async_=True,
        )
        host.bridge.addMethod(
            "psCacheSearch",
            ".plugin",
            in_sign="s",
            out_sign="s",
            method=self._search,
            async_=True,
        )

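    # NOTE (illustrative comment, not in the original module): the
    # "pubsub_cache_strategy" setting read in __init__ comes from the backend
    # configuration file; a minimal sketch of disabling the cache entirely,
    # assuming a standard sat.conf section:
    #
    #     [DEFAULT]
    #     pubsub_cache_strategy = no_cache
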
    def registerAnalyser(self, analyser: dict) -> None:
        """Register a new pubsub node analyser

        @param analyser: An analyser is a dictionary which may have the following keys
            (keys with a ``*`` are mandatory, at least one of ``node`` or ``namespace``
            keys must be used):

            :name (str)*:
                a unique name for this analyser. This name will be stored in database
                to retrieve the analyser when necessary (notably to get the parsing
                method), thus it is recommended to use a stable name such as the source
                plugin name instead of a name which may change with standard evolution,
                such as the feature namespace.

            :type (str)*:
                indicates what kind of items we are dealing with. Type must be a human
                readable word, as it may be used in searches. Good type examples are
                **blog** or **event**.

            :node (str):
                prefix of a node name which may be used to identify its type. Example:
                *urn:xmpp:microblog:0* (a node starting with this name will be
                identified as a *blog* node).

            :namespace (str):
                root namespace of items. When analysing a node, the first item will be
                retrieved. The analyser will be chosen if its namespace matches the
                namespace of the first child element of the ``<item>`` element.

            :to_sync (bool):
                if True, the node must be synchronised in cache. The default False
                value means that the pubsub service will always be requested.

            :parser (callable):
                method (which may be sync, a coroutine or a method returning a
                "Deferred") to call to parse the ``domish.Element`` of the item. The
                result must be a dictionary which can be serialised to JSON.

                The method must have the following signature:

                .. function:: parser(client: SatXMPPEntity, item_elt: domish.Element, \
                                     service: Optional[jid.JID], node: Optional[str]) \
                              -> dict
                    :noindex:

            :match_cb (callable):
                method (which may be sync, a coroutine or a method returning a
                "Deferred") called when the analyser matches. The method is called
                with the current analyse dict, which it can modify **in-place**.

                The method must have the following signature:

                .. function:: match_cb(client: SatXMPPEntity, analyse: dict) -> None
                    :noindex:

        @raise exceptions.Conflict: an analyser with this name already exists
        """

        name = analyser.get("name", "").strip().lower()
        # we want the normalised name
        analyser["name"] = name
        if not name:
            raise ValueError('"name" is mandatory in analyser')
        if "type" not in analyser:
            raise ValueError('"type" is mandatory in analyser')
        type_test_keys = {"node", "namespace"}
        if not type_test_keys.intersection(analyser):
            raise ValueError(f'at least one of {type_test_keys} must be used')
        if name in self.analysers:
            raise exceptions.Conflict(
                f"An analyser with the name {name!r} is already registered"
            )
        self.analysers[name] = analyser

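    # NOTE (illustrative comment, not in the original module): a minimal sketch of
    # registering an analyser from another plugin, using a hypothetical "events"
    # plugin, namespace and parser:
    #
    #     async def event_parser(client, item_elt, service, node):
    #         return {"id": item_elt.getAttribute("id", "")}
    #
    #     host.plugins["PUBSUB_CACHE"].registerAnalyser({
    #         "name": "events",
    #         "type": "event",
    #         "namespace": "urn:example:events:0",
    #         "to_sync": True,
    #         "parser": event_parser,
    #     })
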
    async def cacheItems(
        self,
        client: SatXMPPEntity,
        pubsub_node: PubsubNode,
        items: List[domish.Element]
    ) -> None:
        try:
            parser = self.analysers[pubsub_node.analyser].get("parser")
        except KeyError:
            parser = None

        if parser is not None:
            parsed_items = [
                await utils.asDeferred(
                    parser,
                    client,
                    item,
                    pubsub_node.service,
                    pubsub_node.name
                )
                for item in items
            ]
        else:
            parsed_items = None

        await self.host.memory.storage.cachePubsubItems(
            client, pubsub_node, items, parsed_items
        )

    async def _cacheNode(
        self,
        client: SatXMPPEntity,
        pubsub_node: PubsubNode
    ) -> None:
        await self.host.memory.storage.updatePubsubNodeSyncState(
            pubsub_node, SyncState.IN_PROGRESS
        )
        service, node = pubsub_node.service, pubsub_node.name
        try:
            log.debug(
                f"Caching node {node!r} at {service} for {client.profile}"
            )
            if not pubsub_node.subscribed:
                try:
                    sub = await self._p.subscribe(client, service, node)
                except Exception as e:
                    log.warning(
                        _(
                            "Can't subscribe node {pubsub_node}, that means that "
                            "synchronisation can't be maintained: {reason}"
                        ).format(pubsub_node=pubsub_node, reason=e)
                    )
                else:
                    if sub.state == "subscribed":
                        sub_id = sub.subscriptionIdentifier
                        log.debug(
                            f"{pubsub_node} subscribed (subscription id: {sub_id!r})"
                        )
                        pubsub_node.subscribed = True
                        await self.host.memory.storage.add(pubsub_node)
                    else:
                        log.warning(
                            _(
                                "{pubsub_node} is not subscribed, that means that "
                                "synchronisation can't be maintained, and you may "
                                "have to enforce subscription manually. Subscription "
                                "state: {state}"
                            ).format(pubsub_node=pubsub_node, state=sub.state)
                        )

            try:
                await self.host.checkFeatures(
                    client, [rsm.NS_RSM, self._p.DISCO_RSM], pubsub_node.service
                )
            except exceptions.FeatureNotFound:
                log.warning(
                    f"service {service} doesn't handle Result Set Management "
                    "(XEP-0059), we'll only cache the latest 20 items"
                )
                items, __ = await client.pubsub_client.items(
                    pubsub_node.service, pubsub_node.name, maxItems=20
                )
                await self.cacheItems(
                    client, pubsub_node, items
                )
            else:
                rsm_p = self.host.plugins["XEP-0059"]
                rsm_request = rsm.RSMRequest()
                cached_ids = set()
                while True:
                    items, rsm_response = await client.pubsub_client.items(
                        service, node, rsm_request=rsm_request
                    )
                    await self.cacheItems(
                        client, pubsub_node, items
                    )
                    for item in items:
                        item_id = item["id"]
                        if item_id in cached_ids:
                            log.warning(
                                f"Pubsub node {node!r} at {service} is returning "
                                f"several times the same item ({item_id!r}). This is "
                                "illegal behaviour, and it means that the Pubsub "
                                f"service {service} is buggy and can't be cached "
                                f"properly. Please report this to {service.host} "
                                "administrators"
                            )
                            rsm_request = None
                            break
                        cached_ids.add(item_id)
                        if len(cached_ids) >= CACHE_LIMIT:
                            log.warning(
                                f"Pubsub node {node!r} at {service} contains more "
                                f"items than the cache limit ({CACHE_LIMIT}). We "
                                f"stop caching here, at item {item_id!r}."
                            )
                            rsm_request = None
                            break
                    if rsm_request is None:
                        break
                    rsm_request = rsm_p.getNextRequest(rsm_request, rsm_response)
                    if rsm_request is None:
                        break

            await self.host.memory.storage.updatePubsubNodeSyncState(
                pubsub_node, SyncState.COMPLETED
            )
        except Exception as e:
            import traceback
            tb = traceback.format_tb(e.__traceback__)
            log.error(
                f"Can't cache node {node!r} at {service} for {client.profile}: "
                f"{e}\n{tb}"
            )
            await self.host.memory.storage.updatePubsubNodeSyncState(
                pubsub_node, SyncState.ERROR
            )
            await self.host.memory.storage.deletePubsubItems(pubsub_node)
            raise e

    def _cacheNodeClean(self, __, pubsub_node):
        del self.in_progress[(pubsub_node.service, pubsub_node.name)]

    def cacheNode(
        self,
        client: SatXMPPEntity,
        pubsub_node: PubsubNode
    ) -> defer.Deferred:
        """Launch node caching as a background task

        @return: a Deferred which fires when the caching is done
        """
        d = defer.ensureDeferred(self._cacheNode(client, pubsub_node))
        d.addBoth(self._cacheNodeClean, pubsub_node=pubsub_node)
        self.in_progress[(pubsub_node.service, pubsub_node.name)] = d
        return d

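    # NOTE (illustrative comment, not in the original module): cacheNode() stores
    # the pending Deferred in self.in_progress, keyed on (service, node), so that
    # other code paths (see _getItemsTrigger below) can detect an ongoing caching;
    # a hypothetical caller sketch:
    #
    #     cache = host.plugins["PUBSUB_CACHE"]
    #     if (service, node_name) not in cache.in_progress:
    #         cache.cacheNode(client, pubsub_node)
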
    async def analyseNode(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        node: str,
        pubsub_node: Optional[PubsubNode] = None,
    ) -> dict:
        """Use registered analysers on a node to determine what it is used for"""
        analyse = {"service": service, "node": node}
        if pubsub_node is None:
            try:
                first_item = (await client.pubsub_client.items(
                    service, node, 1
                ))[0][0]
            except IndexError:
                pass
            except error.StanzaError as e:
                if e.condition == "item-not-found":
                    pass
                else:
                    log.warning(
                        f"Can't retrieve last item on node {node!r} at service "
                        f"{service} for {client.profile}: {e}"
                    )
            else:
                try:
                    uri = first_item.firstChildElement().uri
                except Exception as e:
                    log.warning(
                        f"Can't retrieve item namespace on node {node!r} at service "
                        f"{service} for {client.profile}: {e}"
                    )
                else:
                    analyse["namespace"] = uri
            try:
                conf = await self._p.getConfiguration(client, service, node)
            except Exception as e:
                log.warning(
                    f"Can't retrieve configuration for node {node!r} at service "
                    f"{service} for {client.profile}: {e}"
                )
            else:
                analyse["conf"] = conf

        for analyser in self.analysers.values():
            try:
                an_node = analyser["node"]
            except KeyError:
                pass
            else:
                if node.startswith(an_node):
                    for key in ANALYSER_KEYS_TO_COPY:
                        try:
                            analyse[key] = analyser[key]
                        except KeyError:
                            pass
                    found = True
                    break
            try:
                namespace = analyse["namespace"]
                an_namespace = analyser["namespace"]
            except KeyError:
                pass
            else:
                if namespace == an_namespace:
                    for key in ANALYSER_KEYS_TO_COPY:
                        try:
                            analyse[key] = analyser[key]
                        except KeyError:
                            pass
                    found = True
                    break

        else:
            found = False
            log.debug(
                f"node {node!r} at service {service} doesn't match any known type"
            )
        if found:
            try:
                match_cb = analyser["match_cb"]
            except KeyError:
                pass
            else:
                await utils.asDeferred(match_cb, client, analyse)
        return analyse

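    # NOTE (illustrative comment, not in the original module): with the hypothetical
    # "events" analyser sketched above, analyseNode() could return something like:
    #
    #     {
    #         "service": jid.JID("pubsub.example.org"),
    #         "node": "urn:example:events:0",
    #         "namespace": "urn:example:events:0",
    #         "name": "events",
    #         "type": "event",
    #         "to_sync": True,
    #         "parser": event_parser,
    #         "conf": ...,  # node configuration, when it could be retrieved
    #     }
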
    def _getItemsFromCache(
        self, service="", node="", max_items=10, item_ids=None, sub_id=None,
        extra="", profile_key=C.PROF_KEY_NONE
    ):
        d = defer.ensureDeferred(self._aGetItemsFromCache(
            service, node, max_items, item_ids, sub_id, extra, profile_key
        ))
        d.addCallback(self._p.transItemsData)
        d.addCallback(self._p.serialiseItems)
        return d

    async def _aGetItemsFromCache(
        self, service, node, max_items, item_ids, sub_id, extra, profile_key
    ):
        client = self.host.getClient(profile_key)
        service = jid.JID(service) if service else client.jid.userhostJID()
        pubsub_node = await self.host.memory.storage.getPubsubNode(
            client, service, node
        )
        if pubsub_node is None:
            raise exceptions.NotFound(
                f"{node!r} at {service} doesn't exist in cache for {client.profile!r}"
            )
        max_items = None if max_items == C.NO_LIMIT else max_items
        extra = self._p.parseExtra(data_format.deserialise(extra))
        items, metadata = await self.getItemsFromCache(
            client,
            pubsub_node,
            max_items,
            item_ids,
            sub_id or None,
            extra.rsm_request,
            extra.extra,
        )
        return [i.data for i in items], metadata

    async def getItemsFromCache(
        self,
        client: SatXMPPEntity,
        node: PubsubNode,
        max_items: Optional[int] = None,
        item_ids: Optional[List[str]] = None,
        sub_id: Optional[str] = None,
        rsm_request: Optional[rsm.RSMRequest] = None,
        extra: Optional[dict] = None
    ) -> Tuple[List[PubsubItem], dict]:
        """Get items from cache, using same arguments as for external Pubsub request"""
        if extra is None:
            extra = {}
        if "mam" in extra:
            raise NotImplementedError("MAM queries are not supported yet")
        if max_items is None and rsm_request is None:
            max_items = 20
        if max_items is not None:
            if rsm_request is not None:
                raise exceptions.InternalError(
                    "Pubsub max items and RSM must not be used at the same time"
                )
            elif item_ids:
                raise exceptions.InternalError(
                    "Pubsub max items and item IDs must not be used at the same time"
                )
            pubsub_items, metadata = await self.host.memory.storage.getItems(
                node, max_items=max_items, order_by=extra.get(C.KEY_ORDER_BY)
            )
        else:
            desc = False
            if rsm_request.before == "":
                before = None
                desc = True
            else:
                before = rsm_request.before
            pubsub_items, metadata = await self.host.memory.storage.getItems(
                node, max_items=rsm_request.max, before=before, after=rsm_request.after,
                from_index=rsm_request.index, order_by=extra.get(C.KEY_ORDER_BY),
                desc=desc, force_rsm=True,
            )

        return pubsub_items, metadata

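    # NOTE (illustrative comment, not in the original module): a sketch of
    # requesting the newest cached items with RSM paging (wokkel's RSMRequest takes
    # ``max_``; ``before=""`` means "last page", handled above by setting desc):
    #
    #     cache = host.plugins["PUBSUB_CACHE"]
    #     items, metadata = await cache.getItemsFromCache(
    #         client, pubsub_node, rsm_request=rsm.RSMRequest(max_=20, before="")
    #     )
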
    async def onItemsEvent(self, client, event):
        node = await self.host.memory.storage.getPubsubNode(
            client, event.sender, event.nodeIdentifier
        )
        if node is None:
            return
        if node.sync_state in (SyncState.COMPLETED, SyncState.IN_PROGRESS):
            items = []
            retract_ids = []
            for elt in event.items:
                if elt.name == "item":
                    items.append(elt)
                elif elt.name == "retract":
                    item_id = elt.getAttribute("id")
                    if not item_id:
                        log.warning(
                            "Ignoring invalid retract item element: "
                            f"{xml_tools.pFmtElt(elt)}"
                        )
                        continue

                    retract_ids.append(item_id)
                else:
                    log.warning(
                        f"Unexpected Pubsub event element: {xml_tools.pFmtElt(elt)}"
                    )
            if items:
                log.debug(f"caching new items received from {node}")
                await self.cacheItems(
                    client, node, items
                )
            if retract_ids:
                log.debug(f"deleting retracted items from {node}")
                await self.host.memory.storage.deletePubsubItems(
                    node, items_names=retract_ids
                )

    async def onDeleteEvent(self, client, event):
        log.debug(
            f"deleting node {event.nodeIdentifier} from {event.sender} for "
            f"{client.profile}"
        )
        await self.host.memory.storage.deletePubsubNode(
            [client.profile], [event.sender], [event.nodeIdentifier]
        )

    async def onPurgeEvent(self, client, event):
        node = await self.host.memory.storage.getPubsubNode(
            client, event.sender, event.nodeIdentifier
        )
        if node is None:
            return
        log.debug(f"purging node {node} for {client.profile}")
        await self.host.memory.storage.deletePubsubItems(node)

    async def _getItemsTrigger(
        self,
        client: SatXMPPEntity,
        service: Optional[jid.JID],
        node: str,
        max_items: Optional[int],
        item_ids: Optional[List[str]],
        sub_id: Optional[str],
        rsm_request: Optional[rsm.RSMRequest],
        extra: dict
    ) -> Tuple[bool, Optional[Tuple[List[dict], dict]]]:
        if not self.use_cache:
            log.debug("cache disabled in settings")
            return True, None
        if extra.get(C.KEY_USE_CACHE) == False:
            log.debug("skipping pubsub cache as requested")
            return True, None
        if service is None:
            service = client.jid.userhostJID()
        pubsub_node = await self.host.memory.storage.getPubsubNode(
            client, service, node
        )
        if pubsub_node is not None and pubsub_node.sync_state == SyncState.COMPLETED:
            analyse = {"to_sync": True}
        else:
            analyse = await self.analyseNode(client, service, node)

        if pubsub_node is None:
            pubsub_node = await self.host.memory.storage.setPubsubNode(
                client,
                service,
                node,
                analyser=analyse.get("name"),
                type_=analyse.get("type"),
                subtype=analyse.get("subtype"),
            )

        if analyse.get("to_sync"):
            if pubsub_node.sync_state == SyncState.COMPLETED:
                if "mam" in extra:
                    log.debug("MAM caching is not supported yet, skipping cache")
                    return True, None
                pubsub_items, metadata = await self.getItemsFromCache(
                    client, pubsub_node, max_items, item_ids, sub_id, rsm_request,
                    extra
                )
                return False, ([i.data for i in pubsub_items], metadata)

            if pubsub_node.sync_state == SyncState.IN_PROGRESS:
                if (service, node) not in self.in_progress:
                    log.warning(
                        f"{pubsub_node} is reported as being cached, but no caching "
                        "is in progress, this is most probably due to the backend "
                        "being restarted. Resetting the status, caching will be done "
                        "again."
                    )
                    pubsub_node.sync_state = None
                    await self.host.memory.storage.deletePubsubItems(pubsub_node)
                elif time.time() - pubsub_node.sync_state_updated > PROGRESS_DEADLINE:
                    log.warning(
                        f"{pubsub_node} has been in progress for too long "
                        f"({int(time.time() - pubsub_node.sync_state_updated) // 60} "
                        "minutes), cancelling it and retrying."
                    )
                    self.in_progress.pop((service, node)).cancel()
                    pubsub_node.sync_state = None
                    await self.host.memory.storage.deletePubsubItems(pubsub_node)
                else:
                    log.debug(
                        f"{pubsub_node} synchronisation is already in progress, "
                        "skipping"
                    )
            if pubsub_node.sync_state is None:
                key = (service, node)
                if key in self.in_progress:
                    raise exceptions.InternalError(
                        f"There is already a caching in progress for {pubsub_node}, "
                        "this should not happen"
                    )
                self.cacheNode(client, pubsub_node)
            elif pubsub_node.sync_state == SyncState.ERROR:
                log.debug(
                    f"{pubsub_node} synchronisation has previously failed, skipping"
                )

        return True, None

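    # NOTE (illustrative comment, not in the original module): the return convention
    # used by this trigger is the usual one for sat triggers: (True, None) lets the
    # normal XEP-0060 request go on, while (False, (items, metadata)) short-circuits
    # it and serves the cached data instead.
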
    async def _subscribeTrigger(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        nodeIdentifier: str,
        sub_jid: Optional[jid.JID],
        options: Optional[dict],
        subscription: pubsub.Subscription
    ) -> None:
        pass

    async def _unsubscribeTrigger(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        nodeIdentifier: str,
        sub_jid,
        subscriptionIdentifier,
        sender,
    ) -> None:
        pass

    def _synchronise(self, service, node, profile_key):
        client = self.host.getClient(profile_key)
        service = client.jid.userhostJID() if not service else jid.JID(service)
        return defer.ensureDeferred(self.synchronise(client, service, node))

    async def synchronise(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        node: str
    ) -> None:
        """Synchronise a node with a pubsub service

        If the node is already synchronised, it will be resynchronised (all items
        will be deleted and re-downloaded).

        The node will be synchronised even if there is no matching analyser.

        Note that when a node is synchronised, it is automatically subscribed.
        """
        pubsub_node = await self.host.memory.storage.getPubsubNode(
            client, service, node
        )
        if pubsub_node is None:
            log.info(
                _(
                    "Synchronising the new node {node} at {service}"
                ).format(node=node, service=service.full())
            )
            analyse = await self.analyseNode(client, service, node)
            pubsub_node = await self.host.memory.storage.setPubsubNode(
                client,
                service,
                node,
                analyser=analyse.get("name"),
                type_=analyse.get("type"),
            )

        if (pubsub_node.sync_state == SyncState.IN_PROGRESS
                or (service, node) in self.in_progress):
            log.warning(
                _(
                    "{node} at {service} is already being synchronised, can't do a "
                    "new synchronisation."
                ).format(node=node, service=service)
            )
        else:
            log.info(
                _(
                    "(Re)Synchronising the node {node} at {service} on user request"
                ).format(node=node, service=service.full())
            )
            # we first delete and recreate the node (will also delete its items)
            await self.host.memory.storage.delete(pubsub_node)
            analyse = await self.analyseNode(client, service, node)
            pubsub_node = await self.host.memory.storage.setPubsubNode(
                client,
                service,
                node,
                analyser=analyse.get("name"),
                type_=analyse.get("type"),
            )
            # then we can put the node in cache
            await self.cacheNode(client, pubsub_node)

    async def purge(self, purge_filters: dict) -> None:
        """Remove items according to filters

        filters can have any of the following keys, all are optional:

            :services:
                list of JIDs of services from which items must be deleted
            :nodes:
                list of node names to delete
            :types:
                list of node types to delete
            :subtypes:
                list of node subtypes to delete
            :profiles:
                list of profiles from which items must be deleted
            :created_before:
                datetime before which items must have been created to be deleted
            :updated_before:
                datetime before which items must have been last updated to be deleted
        """
        purge_filters["names"] = purge_filters.pop("nodes", None)
        await self.host.memory.storage.purgePubsubItems(**purge_filters)

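    # NOTE (illustrative comment, not in the original module): a sketch of purging
    # all cached blog items created more than a year ago:
    #
    #     from datetime import datetime, timedelta
    #     cache = host.plugins["PUBSUB_CACHE"]
    #     await cache.purge({
    #         "types": ["blog"],
    #         "created_before": datetime.now() - timedelta(days=365),
    #     })
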
    def _purge(self, purge_filters: str) -> defer.Deferred:
        purge_filters = data_format.deserialise(purge_filters)
        for key in "created_before", "updated_before":
            try:
                purge_filters[key] = datetime.fromtimestamp(purge_filters[key])
            except (KeyError, TypeError):
                pass
        return defer.ensureDeferred(self.purge(purge_filters))

    async def reset(self) -> None:
        """Remove ALL nodes and items from cache

        After calling this method, the cache will be refilled progressively as if it
        were new.
        """
        await self.host.memory.storage.deletePubsubNode(None, None, None)

    def _reset(self) -> defer.Deferred:
        return defer.ensureDeferred(self.reset())

    async def search(self, query: dict) -> List[PubsubItem]:
        """Search pubsub items in cache"""
        return await self.host.memory.storage.searchPubsubItems(query)

    async def serialisableSearch(self, query: dict) -> List[dict]:
        """Search pubsub items in cache and return parsed data

        The returned data can be serialised.

        "pubsub_service" and "pubsub_node" will be added to each item data (both as
        strings)
        """
        items = await self.search(query)
        ret = []
        for item in items:
            parsed = item.parsed
            parsed["pubsub_service"] = item.node.service.full()
            parsed["pubsub_node"] = item.node.name
            if query.get("with_payload"):
                parsed["item_payload"] = item.data.toXml()
            parsed["node_profile"] = self.host.memory.storage.getProfileById(
                item.node.profile_id
            )

            ret.append(parsed)
        return ret

    def _search(self, query: str) -> defer.Deferred:
        query = data_format.deserialise(query)
        services = query.get("services")
        if services:
            query["services"] = [jid.JID(s) for s in services]
        d = defer.ensureDeferred(self.serialisableSearch(query))
        d.addCallback(data_format.serialise)
        return d
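
    # NOTE (illustrative comment, not in the original module): a sketch of a search
    # through serialisableSearch(); accepted keys depend on
    # storage.searchPubsubItems(), so the exact field names are assumptions, except
    # "services" and "with_payload" which are handled in this module:
    #
    #     cache = host.plugins["PUBSUB_CACHE"]
    #     results = await cache.serialisableSearch({
    #         "services": [jid.JID("pubsub.example.org")],
    #         "types": ["blog"],
    #         "with_payload": True,
    #     })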