Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_pubsub_cache.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author    Goffi <goffi@goffi.org>
date      Fri, 02 Jun 2023 11:49:51 +0200
parents   sat/plugins/plugin_pubsub_cache.py@524856bd7b19
children  2b000790b197
compared revisions: 4070:d10748475025 | 4071:4b842c1fb686
#!/usr/bin/env python3

# Libervia plugin for PubSub Caching
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import time
from datetime import datetime
from typing import Optional, List, Tuple, Dict, Any
from twisted.words.protocols.jabber import jid, error
from twisted.words.xish import domish
from twisted.internet import defer
from wokkel import pubsub, rsm
from libervia.backend.core.i18n import _
from libervia.backend.core.constants import Const as C
from libervia.backend.core import exceptions
from libervia.backend.core.log import getLogger
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.tools import xml_tools, utils
from libervia.backend.tools.common import data_format
from libervia.backend.memory.sqla import PubsubNode, PubsubItem, SyncState, IntegrityError


log = getLogger(__name__)

PLUGIN_INFO = {
    C.PI_NAME: "PubSub Cache",
    C.PI_IMPORT_NAME: "PUBSUB_CACHE",
    C.PI_TYPE: C.PLUG_TYPE_PUBSUB,
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0059", "XEP-0060"],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "PubsubCache",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _("""Local Cache for PubSub"""),
}

ANALYSER_KEYS_TO_COPY = ("name", "type", "to_sync", "parser")
# maximum number of items to cache
CACHE_LIMIT = 5000
# number of seconds before an in-progress caching is considered failed and tried again
PROGRESS_DEADLINE = 60 * 60 * 6


class PubsubCache:
    # TODO: there is currently no notification for (un)subscribe events with XEP-0060,
    #   but it would be necessary to have this data if some devices unsubscribe a cached
    #   node, as we can then get out of sync. A protoXEP could be proposed to fix this
    #   situation.
    # TODO: handle configuration events

    def __init__(self, host):
        log.info(_("PubSub Cache initialization"))
        strategy = host.memory.config_get(None, "pubsub_cache_strategy")
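        # The cache can be disabled backend-wide through configuration. A hypothetical
        # libervia.conf snippet (the section name is an assumption, adapt to your
        # setup):
        #
        #     [DEFAULT]
        #     pubsub_cache_strategy = no_cache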
        if strategy == "no_cache":
            log.info(
                _(
                    "Pubsub cache won't be used due to pubsub_cache_strategy={value} "
                    "setting."
                ).format(value=repr(strategy))
            )
            self.use_cache = False
        else:
            self.use_cache = True
        self.host = host
        self._p = host.plugins["XEP-0060"]
        self.analysers = {}
        # map of caching in progress: (service, node) => Deferred
        self.in_progress = {}
        self.host.trigger.add("XEP-0060_getItems", self._get_items_trigger)
        self._p.add_managed_node(
            "",
            items_cb=self.on_items_event,
            delete_cb=self.on_delete_event,
            purge_db=self.on_purge_event,
        )
        host.bridge.add_method(
            "ps_cache_get",
            ".plugin",
            in_sign="ssiassss",
            out_sign="s",
            method=self._get_items_from_cache,
            async_=True,
        )
        host.bridge.add_method(
            "ps_cache_sync",
            ".plugin",
            "sss",
            out_sign="",
            method=self._synchronise,
            async_=True,
        )
        host.bridge.add_method(
            "ps_cache_purge",
            ".plugin",
            "s",
            out_sign="",
            method=self._purge,
            async_=True,
        )
        host.bridge.add_method(
            "ps_cache_reset",
            ".plugin",
            "",
            out_sign="",
            method=self._reset,
            async_=True,
        )
        host.bridge.add_method(
            "ps_cache_search",
            ".plugin",
            "s",
            out_sign="s",
            method=self._search,
            async_=True,
        )

    def register_analyser(self, analyser: dict) -> None:
        """Register a new pubsub node analyser

        @param analyser: An analyser is a dictionary which may have the following keys
            (keys with a ``*`` are mandatory, at least one of ``node`` or ``namespace``
            keys must be used):

            :name (str)*:
                a unique name for this analyser. This name will be stored in database
                to retrieve the analyser when necessary (notably to get the parsing
                method), thus it is recommended to use a stable name such as the source
                plugin name instead of a name which may change with standard evolution,
                such as the feature namespace.

            :type (str)*:
                indicates what kind of items we are dealing with. Type must be a human
                readable word, as it may be used in searches. Good type examples are
                **blog** or **event**.

            :node (str):
                prefix of a node name which may be used to identify its type. Example:
                *urn:xmpp:microblog:0* (a node starting with this name will be
                identified as a *blog* node).

            :namespace (str):
                root namespace of items. When analysing a node, the first item will be
                retrieved. The analyser will be chosen if its namespace matches the
                namespace of the first child element of the ``<item>`` element.

            :to_sync (bool):
                if True, the node must be synchronised in cache. The default False value
                means that the pubsub service will always be requested.

            :parser (callable):
                method (which may be sync, a coroutine or a method returning a
                "Deferred") to call to parse the ``domish.Element`` of the item. The
                result must be a dictionary which can be serialised to JSON.

                The method must have the following signature:

                .. function:: parser(client: SatXMPPEntity, item_elt: domish.Element, \
                                     service: Optional[jid.JID], node: Optional[str]) \
                                     -> dict
                    :noindex:

            :match_cb (callable):
                method (which may be sync, a coroutine or a method returning a
                "Deferred") called when the analyser matches. The method is called with
                the current analyse dict, which it can modify **in-place**.

                The method must have the following signature:

                .. function:: match_cb(client: SatXMPPEntity, analyse: dict) -> None
                    :noindex:

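        Example (an illustrative sketch only; the analyser values and the
        ``parse_blog_item`` callable are hypothetical)::

            self.host.plugins["PUBSUB_CACHE"].register_analyser({
                "name": "example_blog",
                "type": "blog",
                "node": "urn:xmpp:microblog:0",
                "to_sync": True,
                # must return a JSON-serialisable dict
                "parser": parse_blog_item,
            })
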
        @raise exceptions.Conflict: an analyser with this name already exists
        """

        name = analyser.get("name", "").strip().lower()
        # we want the normalised name
        analyser["name"] = name
        if not name:
            raise ValueError('"name" is mandatory in analyser')
        if "type" not in analyser:
            raise ValueError('"type" is mandatory in analyser')
        type_test_keys = {"node", "namespace"}
        if not type_test_keys.intersection(analyser):
            raise ValueError(f'at least one of {type_test_keys} must be used')
        if name in self.analysers:
            raise exceptions.Conflict(
                f"An analyser with the name {name!r} is already registered"
            )
        self.analysers[name] = analyser

    async def cache_items(
        self,
        client: SatXMPPEntity,
        pubsub_node: PubsubNode,
        items: List[domish.Element]
    ) -> None:
        try:
            parser = self.analysers[pubsub_node.analyser].get("parser")
        except KeyError:
            parser = None

        if parser is not None:
            parsed_items = [
                await utils.as_deferred(
                    parser,
                    client,
                    item,
                    pubsub_node.service,
                    pubsub_node.name
                )
                for item in items
            ]
        else:
            parsed_items = None

        await self.host.memory.storage.cache_pubsub_items(
            client, pubsub_node, items, parsed_items
        )

    async def _cache_node(
        self,
        client: SatXMPPEntity,
        pubsub_node: PubsubNode
    ) -> None:
        await self.host.memory.storage.update_pubsub_node_sync_state(
            pubsub_node, SyncState.IN_PROGRESS
        )
        service, node = pubsub_node.service, pubsub_node.name
        try:
            log.debug(
                f"Caching node {node!r} at {service} for {client.profile}"
            )
            if not pubsub_node.subscribed:
                try:
                    sub = await self._p.subscribe(client, service, node)
                except Exception as e:
                    log.warning(
                        _(
                            "Can't subscribe node {pubsub_node}, that means that "
                            "synchronisation can't be maintained: {reason}"
                        ).format(pubsub_node=pubsub_node, reason=e)
                    )
                else:
                    if sub.state == "subscribed":
                        sub_id = sub.subscriptionIdentifier
                        log.debug(
                            f"{pubsub_node} subscribed (subscription id: {sub_id!r})"
                        )
                        pubsub_node.subscribed = True
                        await self.host.memory.storage.add(pubsub_node)
                    else:
                        log.warning(
                            _(
                                "{pubsub_node} is not subscribed, that means that "
                                "synchronisation can't be maintained, and you may have "
                                "to enforce subscription manually. Subscription state: "
                                "{state}"
                            ).format(pubsub_node=pubsub_node, state=sub.state)
                        )

            try:
                await self.host.check_features(
                    client, [rsm.NS_RSM, self._p.DISCO_RSM], pubsub_node.service
                )
            except error.StanzaError as e:
                if e.condition == "service-unavailable":
                    log.warning(
                        f"service {service} is hiding disco info, we'll only cache "
                        "latest 20 items"
                    )
                    items, __ = await client.pubsub_client.items(
                        pubsub_node.service, pubsub_node.name, maxItems=20
                    )
                    await self.cache_items(
                        client, pubsub_node, items
                    )
                else:
                    raise e
            except exceptions.FeatureNotFound:
                log.warning(
                    f"service {service} doesn't handle Result Set Management "
                    "(XEP-0059), we'll only cache latest 20 items"
                )
                items, __ = await client.pubsub_client.items(
                    pubsub_node.service, pubsub_node.name, maxItems=20
                )
                await self.cache_items(
                    client, pubsub_node, items
                )
            else:
                rsm_p = self.host.plugins["XEP-0059"]
                rsm_request = rsm.RSMRequest()
                cached_ids = set()
                while True:
                    items, rsm_response = await client.pubsub_client.items(
                        service, node, rsm_request=rsm_request
                    )
                    await self.cache_items(
                        client, pubsub_node, items
                    )
                    for item in items:
                        item_id = item["id"]
                        if item_id in cached_ids:
                            log.warning(
                                f"Pubsub node {node!r} at {service} is returning the "
                                f"same item ({item_id!r}) several times. This is an "
                                "illegal behaviour, and it means that the Pubsub "
                                f"service {service} is buggy and can't be cached "
                                f"properly. Please report this to {service.host} "
                                "administrators."
                            )
                            rsm_request = None
                            break
                        cached_ids.add(item["id"])
                        if len(cached_ids) >= CACHE_LIMIT:
                            log.warning(
                                f"Pubsub node {node!r} at {service} contains more items "
                                f"than the cache limit ({CACHE_LIMIT}). We stop "
                                f"caching here, at item {item['id']!r}."
                            )
                            rsm_request = None
                            break
                    if rsm_request is None:
                        # a sentinel set above means we must stop paginating
                        break
                    rsm_request = rsm_p.get_next_request(rsm_request, rsm_response)
                    if rsm_request is None:
                        break

            await self.host.memory.storage.update_pubsub_node_sync_state(
                pubsub_node, SyncState.COMPLETED
            )
        except Exception as e:
            import traceback
            tb = traceback.format_tb(e.__traceback__)
            log.error(
                f"Can't cache node {node!r} at {service} for {client.profile}: {e}\n{tb}"
            )
            await self.host.memory.storage.update_pubsub_node_sync_state(
                pubsub_node, SyncState.ERROR
            )
            await self.host.memory.storage.delete_pubsub_items(pubsub_node)
            raise e

    def _cache_node_clean(self, __, pubsub_node):
        del self.in_progress[(pubsub_node.service, pubsub_node.name)]

    def cache_node(
        self,
        client: SatXMPPEntity,
        pubsub_node: PubsubNode
    ) -> defer.Deferred:
        """Launch node caching as a background task"""
        d = defer.ensureDeferred(self._cache_node(client, pubsub_node))
        d.addBoth(self._cache_node_clean, pubsub_node=pubsub_node)
        self.in_progress[(pubsub_node.service, pubsub_node.name)] = d
        return d

    async def analyse_node(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        node: str,
        pubsub_node: Optional[PubsubNode] = None,
    ) -> dict:
        """Use registered analysers on a node to determine what it is used for"""
        analyse = {"service": service, "node": node}
        if pubsub_node is None:
            try:
                first_item = (await client.pubsub_client.items(
                    service, node, 1
                ))[0][0]
            except IndexError:
                pass
            except error.StanzaError as e:
                if e.condition == "item-not-found":
                    pass
                else:
                    log.warning(
                        f"Can't retrieve last item on node {node!r} at service "
                        f"{service} for {client.profile}: {e}"
                    )
            else:
                try:
                    uri = first_item.firstChildElement().uri
                except Exception as e:
                    log.warning(
                        f"Can't retrieve item namespace on node {node!r} at service "
                        f"{service} for {client.profile}: {e}"
                    )
                else:
                    analyse["namespace"] = uri
            try:
                conf = await self._p.getConfiguration(client, service, node)
            except Exception as e:
                log.warning(
                    f"Can't retrieve configuration for node {node!r} at service "
                    f"{service} for {client.profile}: {e}"
                )
            else:
                analyse["conf"] = conf

        for analyser in self.analysers.values():
            try:
                an_node = analyser["node"]
            except KeyError:
                pass
            else:
                if node.startswith(an_node):
                    for key in ANALYSER_KEYS_TO_COPY:
                        try:
                            analyse[key] = analyser[key]
                        except KeyError:
                            pass
                    found = True
                    break
            try:
                namespace = analyse["namespace"]
                an_namespace = analyser["namespace"]
            except KeyError:
                pass
            else:
                if namespace == an_namespace:
                    for key in ANALYSER_KEYS_TO_COPY:
                        try:
                            analyse[key] = analyser[key]
                        except KeyError:
                            pass
                    found = True
                    break

        else:
            found = False
            log.debug(
                f"node {node!r} at service {service} doesn't match any known type"
            )
        if found:
            try:
                match_cb = analyser["match_cb"]
            except KeyError:
                pass
            else:
                await utils.as_deferred(match_cb, client, analyse)
        return analyse

    def _get_items_from_cache(
        self, service="", node="", max_items=10, item_ids=None, sub_id=None,
        extra="", profile_key=C.PROF_KEY_NONE
    ):
        d = defer.ensureDeferred(self._a_get_items_from_cache(
            service, node, max_items, item_ids, sub_id, extra, profile_key
        ))
        d.addCallback(self._p.trans_items_data)
        d.addCallback(self._p.serialise_items)
        return d

    async def _a_get_items_from_cache(
        self, service, node, max_items, item_ids, sub_id, extra, profile_key
    ):
        client = self.host.get_client(profile_key)
        service = jid.JID(service) if service else client.jid.userhostJID()
        pubsub_node = await self.host.memory.storage.get_pubsub_node(
            client, service, node
        )
        if pubsub_node is None:
            raise exceptions.NotFound(
                f"{node!r} at {service} doesn't exist in cache for {client.profile!r}"
            )
        max_items = None if max_items == C.NO_LIMIT else max_items
        extra = self._p.parse_extra(data_format.deserialise(extra))
        items, metadata = await self.get_items_from_cache(
            client,
            pubsub_node,
            max_items,
            item_ids,
            sub_id or None,
            extra.rsm_request,
            extra.extra,
        )
        return [i.data for i in items], metadata

    async def get_items_from_cache(
        self,
        client: SatXMPPEntity,
        node: PubsubNode,
        max_items: Optional[int] = None,
        item_ids: Optional[List[str]] = None,
        sub_id: Optional[str] = None,
        rsm_request: Optional[rsm.RSMRequest] = None,
        extra: Optional[Dict[str, Any]] = None
    ) -> Tuple[List[PubsubItem], dict]:
        """Get items from cache, using same arguments as for external Pubsub request"""
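        # Illustrative call, assuming ``node`` is a PubsubNode already stored in cache
        # (``max_`` is the keyword used by wokkel's RSMRequest):
        #
        #     items, metadata = await self.get_items_from_cache(
        #         client, node, rsm_request=rsm.RSMRequest(max_=20)
        #     )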
        if extra is None:
            extra = {}
        if "mam" in extra:
            raise NotImplementedError("MAM queries are not supported yet")
        if max_items is None and rsm_request is None:
            max_items = 20
            pubsub_items, metadata = await self.host.memory.storage.get_items(
                node, max_items=max_items, item_ids=item_ids or None,
                order_by=extra.get(C.KEY_ORDER_BY)
            )
        elif max_items is not None:
            if rsm_request is not None:
                raise exceptions.InternalError(
                    "Pubsub max items and RSM must not be used at the same time"
                )
            elif item_ids:
                raise exceptions.InternalError(
                    "Pubsub max items and item IDs must not be used at the same time"
                )
            pubsub_items, metadata = await self.host.memory.storage.get_items(
                node, max_items=max_items, order_by=extra.get(C.KEY_ORDER_BY)
            )
        else:
            desc = False
            if rsm_request.before == "":
                before = None
                desc = True
            else:
                before = rsm_request.before
            pubsub_items, metadata = await self.host.memory.storage.get_items(
                node, max_items=rsm_request.max, before=before, after=rsm_request.after,
                from_index=rsm_request.index, order_by=extra.get(C.KEY_ORDER_BY),
                desc=desc, force_rsm=True,
            )

        return pubsub_items, metadata

    async def on_items_event(self, client, event):
        node = await self.host.memory.storage.get_pubsub_node(
            client, event.sender, event.nodeIdentifier
        )
        if node is None:
            return
        if node.sync_state in (SyncState.COMPLETED, SyncState.IN_PROGRESS):
            items = []
            retract_ids = []
            for elt in event.items:
                if elt.name == "item":
                    items.append(elt)
                elif elt.name == "retract":
                    item_id = elt.getAttribute("id")
                    if not item_id:
                        log.warning(
                            "Ignoring invalid retract item element: "
                            f"{xml_tools.p_fmt_elt(elt)}"
                        )
                        continue

                    retract_ids.append(elt["id"])
                else:
                    log.warning(
                        f"Unexpected Pubsub event element: {xml_tools.p_fmt_elt(elt)}"
                    )
            if items:
                log.debug(f"[{client.profile}] caching new items received from {node}")
                await self.cache_items(
                    client, node, items
                )
            if retract_ids:
                log.debug(f"deleting retracted items from {node}")
                await self.host.memory.storage.delete_pubsub_items(
                    node, items_names=retract_ids
                )

    async def on_delete_event(self, client, event):
        log.debug(
            f"deleting node {event.nodeIdentifier} from {event.sender} for "
            f"{client.profile}"
        )
        await self.host.memory.storage.delete_pubsub_node(
            [client.profile], [event.sender], [event.nodeIdentifier]
        )

    async def on_purge_event(self, client, event):
        node = await self.host.memory.storage.get_pubsub_node(
            client, event.sender, event.nodeIdentifier
        )
        if node is None:
            return
        log.debug(f"purging node {node} for {client.profile}")
        await self.host.memory.storage.delete_pubsub_items(node)

    async def _get_items_trigger(
        self,
        client: SatXMPPEntity,
        service: Optional[jid.JID],
        node: str,
        max_items: Optional[int],
        item_ids: Optional[List[str]],
        sub_id: Optional[str],
        rsm_request: Optional[rsm.RSMRequest],
        extra: dict
    ) -> Tuple[bool, Optional[Tuple[List[dict], dict]]]:
        if not self.use_cache:
            log.debug("cache disabled in settings")
            return True, None
        if extra.get(C.KEY_USE_CACHE) == False:
            log.debug("skipping pubsub cache as requested")
            return True, None
        if service is None:
            service = client.jid.userhostJID()
        for __ in range(5):
            pubsub_node = await self.host.memory.storage.get_pubsub_node(
                client, service, node
            )
            if pubsub_node is not None and pubsub_node.sync_state == SyncState.COMPLETED:
                analyse = {"to_sync": True}
            else:
                analyse = await self.analyse_node(client, service, node)

            if pubsub_node is None:
                try:
                    pubsub_node = await self.host.memory.storage.set_pubsub_node(
                        client,
                        service,
                        node,
                        analyser=analyse.get("name"),
                        type_=analyse.get("type"),
                        subtype=analyse.get("subtype"),
                    )
                except IntegrityError as e:
                    if "unique" in str(e.orig).lower():
                        log.debug(
                            "race condition on pubsub node creation in cache, trying "
                            "again"
                        )
                        # the node has been created concurrently, restart the loop to
                        # fetch it
                        continue
                    else:
                        raise e
            break
        else:
            raise exceptions.InternalError(
                "Too many IntegrityError with UNIQUE constraint, something is going wrong"
            )

        if analyse.get("to_sync"):
            if pubsub_node.sync_state == SyncState.COMPLETED:
                if "mam" in extra:
                    log.debug("MAM caching is not supported yet, skipping cache")
                    return True, None
                pubsub_items, metadata = await self.get_items_from_cache(
                    client, pubsub_node, max_items, item_ids, sub_id, rsm_request, extra
                )
                return False, ([i.data for i in pubsub_items], metadata)

            if pubsub_node.sync_state == SyncState.IN_PROGRESS:
                if (service, node) not in self.in_progress:
                    log.warning(
                        f"{pubsub_node} is reported as being cached, but no caching is "
                        "in progress; this is most probably due to the backend having "
                        "been restarted. Resetting the status, caching will be done "
                        "again."
                    )
                    pubsub_node.sync_state = None
                    await self.host.memory.storage.delete_pubsub_items(pubsub_node)
                elif time.time() - pubsub_node.sync_state_updated > PROGRESS_DEADLINE:
                    log.warning(
                        f"{pubsub_node} is in progress for too long "
                        f"({int(time.time() - pubsub_node.sync_state_updated) // 60} "
                        "minutes), cancelling it and retrying."
                    )
                    self.in_progress.pop((service, node)).cancel()
                    pubsub_node.sync_state = None
                    await self.host.memory.storage.delete_pubsub_items(pubsub_node)
                else:
                    log.debug(
                        f"{pubsub_node} synchronisation is already in progress, skipping"
                    )
            if pubsub_node.sync_state is None:
                key = (service, node)
                if key in self.in_progress:
                    raise exceptions.InternalError(
                        f"There is already a caching in progress for {pubsub_node}, this "
                        "should not happen"
                    )
                self.cache_node(client, pubsub_node)
            elif pubsub_node.sync_state == SyncState.ERROR:
                log.debug(
                    f"{pubsub_node} synchronisation has previously failed, skipping"
                )

        return True, None

    async def _subscribe_trigger(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        nodeIdentifier: str,
        sub_jid: Optional[jid.JID],
        options: Optional[dict],
        subscription: pubsub.Subscription
    ) -> None:
        pass

    async def _unsubscribe_trigger(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        nodeIdentifier: str,
        sub_jid,
        subscriptionIdentifier,
        sender,
    ) -> None:
        pass

    def _synchronise(self, service, node, profile_key):
        client = self.host.get_client(profile_key)
        service = client.jid.userhostJID() if not service else jid.JID(service)
        return defer.ensureDeferred(self.synchronise(client, service, node))

    async def synchronise(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        node: str,
        resync: bool = True
    ) -> None:
        """Synchronise a node with a pubsub service

        The node will be synchronised even if there is no matching analyser.

        Note that when a node is synchronised, it is automatically subscribed.
        @param resync: if True and the node is already synchronised, it will be
            resynchronised (all items will be deleted and re-downloaded).
        """
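        # Illustrative call (the service and node values are hypothetical):
        #
        #     await self.synchronise(
        #         client, jid.JID("pubsub.example.org"), "urn:xmpp:microblog:0",
        #         resync=False
        #     )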
        pubsub_node = await self.host.memory.storage.get_pubsub_node(
            client, service, node
        )
        if pubsub_node is None:
            log.info(
                _(
                    "Synchronising the new node {node} at {service}"
                ).format(node=node, service=service.full())
            )
            analyse = await self.analyse_node(client, service, node)
            pubsub_node = await self.host.memory.storage.set_pubsub_node(
                client,
                service,
                node,
                analyser=analyse.get("name"),
                type_=analyse.get("type"),
            )
        elif not resync and pubsub_node.sync_state is not None:
            # the node exists, nothing to do
            return

        if ((pubsub_node.sync_state == SyncState.IN_PROGRESS
             or (service, node) in self.in_progress)):
            log.warning(
                _(
                    "{node} at {service} is already being synchronised, can't do a new "
                    "synchronisation."
                ).format(node=node, service=service)
            )
        else:
            log.info(
                _(
                    "(Re)Synchronising the node {node} at {service} on user request"
                ).format(node=node, service=service.full())
            )
            # we first delete and recreate the node (will also delete its items)
            await self.host.memory.storage.delete(pubsub_node)
            analyse = await self.analyse_node(client, service, node)
            pubsub_node = await self.host.memory.storage.set_pubsub_node(
                client,
                service,
                node,
                analyser=analyse.get("name"),
                type_=analyse.get("type"),
            )
            # then we can put node in cache
            await self.cache_node(client, pubsub_node)

    async def purge(self, purge_filters: dict) -> None:
        """Remove items according to filters

        filters can have any of the following keys, all are optional:

        :services:
            list of JIDs of services from which items must be deleted
        :nodes:
            list of node names to delete
        :types:
            list of node types to delete
        :subtypes:
            list of node subtypes to delete
        :profiles:
            list of profiles from which items must be deleted
        :created_before:
            datetime before which items must have been created to be deleted
        :updated_before:
            datetime before which items must have been last updated to be deleted
        """
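        # Illustrative filters (the values shown are hypothetical):
        #
        #     await self.purge({
        #         "types": ["blog"],
        #         "created_before": datetime(2021, 1, 1),
        #     })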
        purge_filters["names"] = purge_filters.pop("nodes", None)
        await self.host.memory.storage.purge_pubsub_items(**purge_filters)

    def _purge(self, purge_filters: str) -> defer.Deferred:
        purge_filters = data_format.deserialise(purge_filters)
        for key in "created_before", "updated_before":
            try:
                purge_filters[key] = datetime.fromtimestamp(purge_filters[key])
            except (KeyError, TypeError):
                pass
        return defer.ensureDeferred(self.purge(purge_filters))

    async def reset(self) -> None:
        """Remove ALL nodes and items from cache

        After calling this method, the cache will be refilled progressively as if it
        were new.
        """
        await self.host.memory.storage.delete_pubsub_node(None, None, None)

    def _reset(self) -> defer.Deferred:
        return defer.ensureDeferred(self.reset())

    async def search(self, query: dict) -> List[PubsubItem]:
        """Search pubsub items in cache"""
        return await self.host.memory.storage.search_pubsub_items(query)

    async def serialisable_search(self, query: dict) -> List[dict]:
        """Search pubsub items in cache and return parsed data

        The returned data can be serialised.

        "pubsub_service" and "pubsub_node" will be added to each item's data (both as
        strings).
        """
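        # Illustrative usage; "services" and "with_payload" are the query keys handled
        # in this module, the full query format is the one accepted by
        # storage.search_pubsub_items:
        #
        #     found = await self.serialisable_search({
        #         "services": [jid.JID("pubsub.example.org")],
        #         "with_payload": True,
        #     })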
        items = await self.search(query)
        ret = []
        for item in items:
            parsed = item.parsed
            parsed["pubsub_service"] = item.node.service.full()
            parsed["pubsub_node"] = item.node.name
            if query.get("with_payload"):
                parsed["item_payload"] = item.data.toXml()
            parsed["node_profile"] = self.host.memory.storage.get_profile_by_id(
                item.node.profile_id
            )

            ret.append(parsed)
        return ret

    def _search(self, query: str) -> defer.Deferred:
        query = data_format.deserialise(query)
        services = query.get("services")
        if services:
            query["services"] = [jid.JID(s) for s in services]
        d = defer.ensureDeferred(self.serialisable_search(query))
        d.addCallback(data_format.serialise)
        return d