Mercurial > libervia-backend
annotate libervia/backend/plugins/plugin_pubsub_cache.py @ 4351:6a0a081485b8
plugin autocrypt: Autocrypt protocol implementation:
Implementation of autocrypt: `autocrypt` header is checked, and if present and no public
key is known for the peer, the key is imported.
`autocrypt` header is also added to outgoing message (only if an email gateway is
detected).
For the moment, the JID is use as identifier, but the real email used by gateway should be
used in the future.
rel 456
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 28 Feb 2025 09:23:35 +0100 |
parents | 0d7bb4df2343 |
children |
rev | line source |
---|---|
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
1 #!/usr/bin/env python3 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
2 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
3 # Libervia plugin for PubSub Caching |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
4 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
5 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
6 # This program is free software: you can redistribute it and/or modify |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
7 # it under the terms of the GNU Affero General Public License as published by |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
8 # the Free Software Foundation, either version 3 of the License, or |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
9 # (at your option) any later version. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
10 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
11 # This program is distributed in the hope that it will be useful, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
14 # GNU Affero General Public License for more details. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
15 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
16 # You should have received a copy of the GNU Affero General Public License |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
18 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
19 import time |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
20 from datetime import datetime |
3738
ffa8c8c78115
plugin XEP-0059, cache: allow those plugins to work in component mode
Goffi <goffi@goffi.org>
parents:
3666
diff
changeset
|
21 from typing import Optional, List, Tuple, Dict, Any |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
22 from twisted.words.protocols.jabber import jid, error |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
23 from twisted.words.xish import domish |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
24 from twisted.internet import defer |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
25 from wokkel import pubsub, rsm |
4071
4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
Goffi <goffi@goffi.org>
parents:
4037
diff
changeset
|
26 from libervia.backend.core.i18n import _ |
4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
Goffi <goffi@goffi.org>
parents:
4037
diff
changeset
|
27 from libervia.backend.core.constants import Const as C |
4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
Goffi <goffi@goffi.org>
parents:
4037
diff
changeset
|
28 from libervia.backend.core import exceptions |
4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
Goffi <goffi@goffi.org>
parents:
4037
diff
changeset
|
29 from libervia.backend.core.log import getLogger |
4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
Goffi <goffi@goffi.org>
parents:
4037
diff
changeset
|
30 from libervia.backend.core.core_types import SatXMPPEntity |
4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
Goffi <goffi@goffi.org>
parents:
4037
diff
changeset
|
31 from libervia.backend.tools import xml_tools, utils |
4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
Goffi <goffi@goffi.org>
parents:
4037
diff
changeset
|
32 from libervia.backend.tools.common import data_format |
4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
Goffi <goffi@goffi.org>
parents:
4037
diff
changeset
|
33 from libervia.backend.memory.sqla import PubsubNode, PubsubItem, SyncState, IntegrityError |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
34 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
35 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
36 log = getLogger(__name__) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
37 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
38 PLUGIN_INFO = { |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
39 C.PI_NAME: "PubSub Cache", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
40 C.PI_IMPORT_NAME: "PUBSUB_CACHE", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
41 C.PI_TYPE: C.PLUG_TYPE_PUBSUB, |
3738
ffa8c8c78115
plugin XEP-0059, cache: allow those plugins to work in component mode
Goffi <goffi@goffi.org>
parents:
3666
diff
changeset
|
42 C.PI_MODES: C.PLUG_MODE_BOTH, |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
43 C.PI_PROTOCOLS: [], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
44 C.PI_DEPENDENCIES: ["XEP-0059", "XEP-0060"], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
45 C.PI_RECOMMENDATIONS: [], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
46 C.PI_MAIN: "PubsubCache", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
47 C.PI_HANDLER: "no", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
48 C.PI_DESCRIPTION: _("""Local Cache for PubSub"""), |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
49 } |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
50 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
51 ANALYSER_KEYS_TO_COPY = ("name", "type", "to_sync", "parser") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
52 # maximum of items to cache |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
53 CACHE_LIMIT = 5000 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
54 # number of second before a progress caching is considered failed and tried again |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
55 PROGRESS_DEADLINE = 60 * 60 * 6 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
56 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
57 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
58 class PubsubCache: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
59 # TODO: there is currently no notification for (un)subscribe events with XEP-0060, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
60 # but it would be necessary to have this data if some devices unsubscribe a cached |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
61 # node, as we can then get out of sync. A protoXEP could be proposed to fix this |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
62 # situation. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
63 # TODO: handle configuration events |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
64 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
65 def __init__(self, host): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
66 log.info(_("PubSub Cache initialization")) |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
67 strategy = host.memory.config_get(None, "pubsub_cache_strategy") |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
68 if strategy == "no_cache": |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
69 log.info( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
70 _( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
71 "Pubsub cache won't be used due to pubsub_cache_strategy={value} " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
72 "setting." |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
73 ).format(value=repr(strategy)) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
74 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
75 self.use_cache = False |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
76 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
77 self.use_cache = True |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
78 self.host = host |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
79 self._p = host.plugins["XEP-0060"] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
80 self.analysers = {} |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
81 # map for caching in progress (node, service) => Deferred |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
82 self.in_progress = {} |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
83 self.host.trigger.add("XEP-0060_getItems", self._get_items_trigger) |
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
84 self._p.add_managed_node( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
85 "", |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
86 items_cb=self.on_items_event, |
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
87 delete_cb=self.on_delete_event, |
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
88 purge_db=self.on_purge_event, |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
89 ) |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
90 host.bridge.add_method( |
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
91 "ps_cache_get", |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
92 ".plugin", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
93 in_sign="ssiassss", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
94 out_sign="s", |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
95 method=self._get_items_from_cache, |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
96 async_=True, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
97 ) |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
98 host.bridge.add_method( |
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
99 "ps_cache_sync", |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
100 ".plugin", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
101 "sss", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
102 out_sign="", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
103 method=self._synchronise, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
104 async_=True, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
105 ) |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
106 host.bridge.add_method( |
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
107 "ps_cache_purge", |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
108 ".plugin", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
109 "s", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
110 out_sign="", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
111 method=self._purge, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
112 async_=True, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
113 ) |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
114 host.bridge.add_method( |
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
115 "ps_cache_reset", |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
116 ".plugin", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
117 "", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
118 out_sign="", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
119 method=self._reset, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
120 async_=True, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
121 ) |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
122 host.bridge.add_method( |
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
123 "ps_cache_search", |
3666
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
124 ".plugin", |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
125 "s", |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
126 out_sign="s", |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
127 method=self._search, |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
128 async_=True, |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
129 ) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
130 |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
131 def register_analyser(self, analyser: dict) -> None: |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
132 """Register a new pubsub node analyser |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
133 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
134 @param analyser: An analyser is a dictionary which may have the following keys |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
135 (keys with a ``*`` are mandatory, at least one of ``node`` or ``namespace`` keys |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
136 must be used): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
137 |
4249 | 138 name (str)* |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
139 a unique name for this analyser. This name will be stored in database |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
140 to retrieve the analyser when necessary (notably to get the parsing method), |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
141 thus it is recommended to use a stable name such as the source plugin name |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
142 instead of a name which may change with standard evolution, such as the |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
143 feature namespace. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
144 |
4249 | 145 type (str)* |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
146 indicates what kind of items we are dealing with. Type must be a human |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
147 readable word, as it may be used in searches. Good types examples are |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
148 **blog** or **event**. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
149 |
4249 | 150 node (str) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
151 prefix of a node name which may be used to identify its type. Example: |
4249 | 152 *urnxmpp:microblog0* (a node starting with this name will be identified as |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
153 *blog* node). |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
154 |
4249 | 155 namespace (str) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
156 root namespace of items. When analysing a node, the first item will be |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
157 retrieved. The analyser will be chosen its given namespace match the |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
158 namespace of the first child element of ``<item>`` element. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
159 |
4249 | 160 to_sync (bool) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
161 if True, the node must be synchronised in cache. The default False value |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
162 means that the pubsub service will always be requested. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
163 |
4249 | 164 parser (callable) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
165 method (which may be sync, a coroutine or a method returning a "Deferred") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
166 to call to parse the ``domish.Element`` of the item. The result must be |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
167 dictionary which can be serialised to JSON. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
168 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
169 The method must have the following signature: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
170 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
171 .. function:: parser(client: SatXMPPEntity, item_elt: domish.Element, \ |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
172 service: Optional[jid.JID], node: Optional[str]) \ |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
173 -> dict |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
174 :noindex: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
175 |
4249 | 176 match_cb (callable) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
177 method (which may be sync, a coroutine or a method returning a "Deferred") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
178 called when the analyser matches. The method is called with the curreny |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
179 analyse which is can modify **in-place**. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
180 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
181 The method must have the following signature: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
182 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
183 .. function:: match_cb(client: SatXMPPEntity, analyse: dict) -> None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
184 :noindex: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
185 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
186 @raise exceptions.Conflict: a analyser with this name already exists |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
187 """ |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
188 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
189 name = analyser.get("name", "").strip().lower() |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
190 # we want the normalised name |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
191 analyser["name"] = name |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
192 if not name: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
193 raise ValueError('"name" is mandatory in analyser') |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
194 if "type" not in analyser: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
195 raise ValueError('"type" is mandatory in analyser') |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
196 type_test_keys = {"node", "namespace"} |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
197 if not type_test_keys.intersection(analyser): |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
198 raise ValueError(f"at least one of {type_test_keys} must be used") |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
199 if name in self.analysers: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
200 raise exceptions.Conflict( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
201 f"An analyser with the name {name!r} is already registered" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
202 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
203 self.analysers[name] = analyser |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
204 |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
205 async def cache_items( |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
206 self, client: SatXMPPEntity, pubsub_node: PubsubNode, items: List[domish.Element] |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
207 ) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
208 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
209 parser = self.analysers[pubsub_node.analyser].get("parser") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
210 except KeyError: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
211 parser = None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
212 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
213 if parser is not None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
214 parsed_items = [ |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
215 await utils.as_deferred( |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
216 parser, client, item, pubsub_node.service, pubsub_node.name |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
217 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
218 for item in items |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
219 ] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
220 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
221 parsed_items = None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
222 |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
223 await self.host.memory.storage.cache_pubsub_items( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
224 client, pubsub_node, items, parsed_items |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
225 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
226 |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
227 async def _cache_node(self, client: SatXMPPEntity, pubsub_node: PubsubNode) -> None: |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
228 await self.host.memory.storage.update_pubsub_node_sync_state( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
229 pubsub_node, SyncState.IN_PROGRESS |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
230 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
231 service, node = pubsub_node.service, pubsub_node.name |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
232 try: |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
233 log.debug(f"Caching node {node!r} at {service} for {client.profile}") |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
234 if not pubsub_node.subscribed: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
235 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
236 sub = await self._p.subscribe(client, service, node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
237 except Exception as e: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
238 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
239 _( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
240 "Can't subscribe node {pubsub_node}, that means that " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
241 "synchronisation can't be maintained: {reason}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
242 ).format(pubsub_node=pubsub_node, reason=e) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
243 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
244 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
245 if sub.state == "subscribed": |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
246 sub_id = sub.subscriptionIdentifier |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
247 log.debug( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
248 f"{pubsub_node} subscribed (subscription id: {sub_id!r})" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
249 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
250 pubsub_node.subscribed = True |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
251 await self.host.memory.storage.add(pubsub_node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
252 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
253 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
254 _( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
255 "{pubsub_node} is not subscribed, that means that " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
256 "synchronisation can't be maintained, and you may have " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
257 "to enforce subscription manually. Subscription state: " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
258 "{state}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
259 ).format(pubsub_node=pubsub_node, state=sub.state) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
260 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
261 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
262 try: |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
263 await self.host.check_features( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
264 client, [rsm.NS_RSM, self._p.DISCO_RSM], pubsub_node.service |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
265 ) |
3759
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
266 except error.StanzaError as e: |
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
267 if e.condition == "service-unavailable": |
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
268 log.warning( |
3854
8a2c46122a11
plugin XEP-0060: fix bad naming of return variable
Goffi <goffi@goffi.org>
parents:
3831
diff
changeset
|
269 "service {service} is hidding disco infos, we'll only cache " |
8a2c46122a11
plugin XEP-0060: fix bad naming of return variable
Goffi <goffi@goffi.org>
parents:
3831
diff
changeset
|
270 "latest 20 items" |
3759
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
271 ) |
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
272 items, __ = await client.pubsub_client.items( |
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
273 pubsub_node.service, pubsub_node.name, maxItems=20 |
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
274 ) |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
275 await self.cache_items(client, pubsub_node, items) |
3759
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
276 else: |
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
277 raise e |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
278 except exceptions.FeatureNotFound: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
279 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
280 f"service {service} doesn't handle Result Set Management " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
281 "(XEP-0059), we'll only cache latest 20 items" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
282 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
283 items, __ = await client.pubsub_client.items( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
284 pubsub_node.service, pubsub_node.name, maxItems=20 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
285 ) |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
286 await self.cache_items(client, pubsub_node, items) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
287 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
288 rsm_p = self.host.plugins["XEP-0059"] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
289 rsm_request = rsm.RSMRequest() |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
290 cached_ids = set() |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
291 while True: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
292 items, rsm_response = await client.pubsub_client.items( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
293 service, node, rsm_request=rsm_request |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
294 ) |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
295 await self.cache_items(client, pubsub_node, items) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
296 for item in items: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
297 item_id = item["id"] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
298 if item_id in cached_ids: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
299 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
300 f"Pubsub node {node!r} at {service} is returning several " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
301 f"times the same item ({item_id!r}). This is illegal " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
302 "behaviour, and it means that Pubsub service " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
303 f"{service} is buggy and can't be cached properly. " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
304 f"Please report this to {service.host} administrators" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
305 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
306 rsm_request = None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
307 break |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
308 cached_ids.add(item["id"]) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
309 if len(cached_ids) >= CACHE_LIMIT: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
310 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
311 f"Pubsub node {node!r} at {service} contains more items " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
312 f"than the cache limit ({CACHE_LIMIT}). We stop " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
313 "caching here, at item {item['id']!r}." |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
314 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
315 rsm_request = None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
316 break |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
317 rsm_request = rsm_p.get_next_request(rsm_request, rsm_response) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
318 if rsm_request is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
319 break |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
320 |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
321 await self.host.memory.storage.update_pubsub_node_sync_state( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
322 pubsub_node, SyncState.COMPLETED |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
323 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
324 except Exception as e: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
325 import traceback |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
326 |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
327 tb = traceback.format_tb(e.__traceback__) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
328 log.error( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
329 f"Can't cache node {node!r} at {service} for {client.profile}: {e}\n{tb}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
330 ) |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
331 await self.host.memory.storage.update_pubsub_node_sync_state( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
332 pubsub_node, SyncState.ERROR |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
333 ) |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
334 await self.host.memory.storage.delete_pubsub_items(pubsub_node) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
335 raise e |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
336 |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
337 def _cache_node_clean(self, __, pubsub_node): |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
338 del self.in_progress[(pubsub_node.service, pubsub_node.name)] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
339 |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
340 def cache_node(self, client: SatXMPPEntity, pubsub_node: PubsubNode) -> None: |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
341 """Launch node caching as a background task""" |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
342 d = defer.ensureDeferred(self._cache_node(client, pubsub_node)) |
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
343 d.addBoth(self._cache_node_clean, pubsub_node=pubsub_node) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
344 self.in_progress[(pubsub_node.service, pubsub_node.name)] = d |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
345 return d |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
346 |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
347 async def analyse_node( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
348 self, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
349 client: SatXMPPEntity, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
350 service: jid.JID, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
351 node: str, |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
352 pubsub_node: PubsubNode = None, |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
353 ) -> dict: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
354 """Use registered analysers on a node to determine what it is used for""" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
355 analyse = {"service": service, "node": node} |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
356 if pubsub_node is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
357 try: |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
358 first_item = (await client.pubsub_client.items(service, node, 1))[0][0] |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
359 except IndexError: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
360 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
361 except error.StanzaError as e: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
362 if e.condition == "item-not-found": |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
363 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
364 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
365 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
366 f"Can't retrieve last item on node {node!r} at service " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
367 f"{service} for {client.profile}: {e}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
368 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
369 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
370 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
371 uri = first_item.firstChildElement().uri |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
372 except Exception as e: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
373 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
374 f"Can't retrieve item namespace on node {node!r} at service " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
375 f"{service} for {client.profile}: {e}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
376 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
377 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
378 analyse["namespace"] = uri |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
379 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
380 conf = await self._p.getConfiguration(client, service, node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
381 except Exception as e: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
382 log.warning( |
4221
2b000790b197
plugin XEP-0060: fix incorrect type hint.
Goffi <goffi@goffi.org>
parents:
4071
diff
changeset
|
383 f"Can't retrieve configuration for node {node!r} at service " |
2b000790b197
plugin XEP-0060: fix incorrect type hint.
Goffi <goffi@goffi.org>
parents:
4071
diff
changeset
|
384 f"{service} for {client.profile}: {e}" |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
385 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
386 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
387 analyse["conf"] = conf |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
388 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
389 for analyser in self.analysers.values(): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
390 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
391 an_node = analyser["node"] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
392 except KeyError: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
393 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
394 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
395 if node.startswith(an_node): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
396 for key in ANALYSER_KEYS_TO_COPY: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
397 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
398 analyse[key] = analyser[key] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
399 except KeyError: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
400 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
401 found = True |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
402 break |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
403 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
404 namespace = analyse["namespace"] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
405 an_namespace = analyser["namespace"] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
406 except KeyError: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
407 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
408 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
409 if namespace == an_namespace: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
410 for key in ANALYSER_KEYS_TO_COPY: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
411 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
412 analyse[key] = analyser[key] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
413 except KeyError: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
414 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
415 found = True |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
416 break |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
417 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
418 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
419 found = False |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
420 log.debug(f"node {node!r} at service {service} doesn't match any known type") |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
421 if found: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
422 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
423 match_cb = analyser["match_cb"] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
424 except KeyError: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
425 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
426 else: |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
427 await utils.as_deferred(match_cb, client, analyse) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
428 return analyse |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
429 |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
430 def _get_items_from_cache( |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
431 self, |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
432 service="", |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
433 node="", |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
434 max_items=10, |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
435 item_ids=None, |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
436 sub_id=None, |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
437 extra="", |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
438 profile_key=C.PROF_KEY_NONE, |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
439 ): |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
440 d = defer.ensureDeferred( |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
441 self._a_get_items_from_cache( |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
442 service, node, max_items, item_ids, sub_id, extra, profile_key |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
443 ) |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
444 ) |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
445 d.addCallback(self._p.trans_items_data) |
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
446 d.addCallback(self._p.serialise_items) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
447 return d |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
448 |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
449 async def _a_get_items_from_cache( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
450 self, service, node, max_items, item_ids, sub_id, extra, profile_key |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
451 ): |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
452 client = self.host.get_client(profile_key) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
453 service = jid.JID(service) if service else client.jid.userhostJID() |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
454 pubsub_node = await self.host.memory.storage.get_pubsub_node( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
455 client, service, node |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
456 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
457 if pubsub_node is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
458 raise exceptions.NotFound( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
459 f"{node!r} at {service} doesn't exist in cache for {client.profile!r}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
460 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
461 max_items = None if max_items == C.NO_LIMIT else max_items |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
462 extra = self._p.parse_extra(data_format.deserialise(extra)) |
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
463 items, metadata = await self.get_items_from_cache( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
464 client, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
465 pubsub_node, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
466 max_items, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
467 item_ids, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
468 sub_id or None, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
469 extra.rsm_request, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
470 extra.extra, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
471 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
472 return [i.data for i in items], metadata |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
473 |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
474 async def get_items_from_cache( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
475 self, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
476 client: SatXMPPEntity, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
477 node: PubsubNode, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
478 max_items: Optional[int] = None, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
479 item_ids: Optional[List[str]] = None, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
480 sub_id: Optional[str] = None, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
481 rsm_request: Optional[rsm.RSMRequest] = None, |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
482 extra: Optional[Dict[str, Any]] = None, |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
483 ) -> Tuple[List[PubsubItem], dict]: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
484 """Get items from cache, using same arguments as for external Pubsub request""" |
3738
ffa8c8c78115
plugin XEP-0059, cache: allow those plugins to work in component mode
Goffi <goffi@goffi.org>
parents:
3666
diff
changeset
|
485 if extra is None: |
ffa8c8c78115
plugin XEP-0059, cache: allow those plugins to work in component mode
Goffi <goffi@goffi.org>
parents:
3666
diff
changeset
|
486 extra = {} |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
487 if "mam" in extra: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
488 raise NotImplementedError("MAM queries are not supported yet") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
489 if max_items is None and rsm_request is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
490 max_items = 20 |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
491 pubsub_items, metadata = await self.host.memory.storage.get_items( |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
492 node, |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
493 max_items=max_items, |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
494 item_ids=item_ids or None, |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
495 order_by=extra.get(C.KEY_ORDER_BY), |
3759
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
496 ) |
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
497 elif max_items is not None: |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
498 if rsm_request is not None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
499 raise exceptions.InternalError( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
500 "Pubsub max items and RSM must not be used at the same time" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
501 ) |
3759
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
502 elif item_ids: |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
503 raise exceptions.InternalError( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
504 "Pubsub max items and item IDs must not be used at the same time" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
505 ) |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
506 pubsub_items, metadata = await self.host.memory.storage.get_items( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
507 node, max_items=max_items, order_by=extra.get(C.KEY_ORDER_BY) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
508 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
509 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
510 desc = False |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
511 if rsm_request.before == "": |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
512 before = None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
513 desc = True |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
514 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
515 before = rsm_request.before |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
516 pubsub_items, metadata = await self.host.memory.storage.get_items( |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
517 node, |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
518 max_items=rsm_request.max, |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
519 before=before, |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
520 after=rsm_request.after, |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
521 from_index=rsm_request.index, |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
522 order_by=extra.get(C.KEY_ORDER_BY), |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
523 desc=desc, |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
524 force_rsm=True, |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
525 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
526 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
527 return pubsub_items, metadata |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
528 |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
529 async def on_items_event(self, client, event): |
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
530 node = await self.host.memory.storage.get_pubsub_node( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
531 client, event.sender, event.nodeIdentifier |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
532 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
533 if node is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
534 return |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
535 if node.sync_state in (SyncState.COMPLETED, SyncState.IN_PROGRESS): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
536 items = [] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
537 retract_ids = [] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
538 for elt in event.items: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
539 if elt.name == "item": |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
540 items.append(elt) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
541 elif elt.name == "retract": |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
542 item_id = elt.getAttribute("id") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
543 if not item_id: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
544 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
545 "Ignoring invalid retract item element: " |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
546 f"{xml_tools.p_fmt_elt(elt)}" |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
547 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
548 continue |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
549 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
550 retract_ids.append(elt["id"]) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
551 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
552 log.warning( |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
553 f"Unexpected Pubsub event element: {xml_tools.p_fmt_elt(elt)}" |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
554 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
555 if items: |
3831
604b6acaee22
plugin pubsub cache: resync in `synchronise` when node's `sync_state` is not set:
Goffi <goffi@goffi.org>
parents:
3760
diff
changeset
|
556 log.debug(f"[{client.profile}] caching new items received from {node}") |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
557 await self.cache_items(client, node, items) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
558 if retract_ids: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
559 log.debug(f"deleting retracted items from {node}") |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
560 await self.host.memory.storage.delete_pubsub_items( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
561 node, items_names=retract_ids |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
562 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
563 |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
564 async def on_delete_event(self, client, event): |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
565 log.debug( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
566 f"deleting node {event.nodeIdentifier} from {event.sender} for " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
567 f"{client.profile}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
568 ) |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
569 await self.host.memory.storage.delete_pubsub_node( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
570 [client.profile], [event.sender], [event.nodeIdentifier] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
571 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
572 |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
573 async def on_purge_event(self, client, event): |
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
574 node = await self.host.memory.storage.get_pubsub_node( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
575 client, event.sender, event.nodeIdentifier |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
576 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
577 if node is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
578 return |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
579 log.debug(f"purging node {node} for {client.profile}") |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
580 await self.host.memory.storage.delete_pubsub_items(node) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
581 |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
582 async def _get_items_trigger( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
583 self, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
584 client: SatXMPPEntity, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
585 service: Optional[jid.JID], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
586 node: str, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
587 max_items: Optional[int], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
588 item_ids: Optional[List[str]], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
589 sub_id: Optional[str], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
590 rsm_request: Optional[rsm.RSMRequest], |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
591 extra: dict, |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
592 ) -> Tuple[bool, Optional[Tuple[List[dict], dict]]]: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
593 if not self.use_cache: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
594 log.debug("cache disabled in settings") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
595 return True, None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
596 if extra.get(C.KEY_USE_CACHE) == False: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
597 log.debug("skipping pubsub cache as requested") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
598 return True, None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
599 if service is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
600 service = client.jid.userhostJID() |
3941
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
601 for __ in range(5): |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
602 pubsub_node = await self.host.memory.storage.get_pubsub_node( |
3941
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
603 client, service, node |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
604 ) |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
605 if pubsub_node is not None and pubsub_node.sync_state == SyncState.COMPLETED: |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
606 analyse = {"to_sync": True} |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
607 else: |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
608 analyse = await self.analyse_node(client, service, node) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
609 |
3941
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
610 if pubsub_node is None: |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
611 try: |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
612 pubsub_node = await self.host.memory.storage.set_pubsub_node( |
3941
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
613 client, |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
614 service, |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
615 node, |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
616 analyser=analyse.get("name"), |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
617 type_=analyse.get("type"), |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
618 subtype=analyse.get("subtype"), |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
619 ) |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
620 except IntegrityError as e: |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
621 if "unique" in str(e.orig).lower(): |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
622 log.debug( |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
623 "race condition on pubsub node creation in cache, trying " |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
624 "again" |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
625 ) |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
626 else: |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
627 raise e |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
628 break |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
629 else: |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
630 raise exceptions.InternalError( |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
631 "Too many IntegrityError with UNIQUE constraint, something is going wrong" |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
632 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
633 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
634 if analyse.get("to_sync"): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
635 if pubsub_node.sync_state == SyncState.COMPLETED: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
636 if "mam" in extra: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
637 log.debug("MAM caching is not supported yet, skipping cache") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
638 return True, None |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
639 pubsub_items, metadata = await self.get_items_from_cache( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
640 client, pubsub_node, max_items, item_ids, sub_id, rsm_request, extra |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
641 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
642 return False, ([i.data for i in pubsub_items], metadata) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
643 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
644 if pubsub_node.sync_state == SyncState.IN_PROGRESS: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
645 if (service, node) not in self.in_progress: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
646 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
647 f"{pubsub_node} is reported as being cached, but not caching is " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
648 "in progress, this is most probably due to the backend being " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
649 "restarted. Resetting the status, caching will be done again." |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
650 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
651 pubsub_node.sync_state = None |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
652 await self.host.memory.storage.delete_pubsub_items(pubsub_node) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
653 elif time.time() - pubsub_node.sync_state_updated > PROGRESS_DEADLINE: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
654 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
655 f"{pubsub_node} is in progress for too long " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
656 f"({pubsub_node.sync_state_updated//60} minutes), " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
657 "cancelling it and retrying." |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
658 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
659 self.in_progress.pop[(service, node)].cancel() |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
660 pubsub_node.sync_state = None |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
661 await self.host.memory.storage.delete_pubsub_items(pubsub_node) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
662 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
663 log.debug( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
664 f"{pubsub_node} synchronisation is already in progress, skipping" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
665 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
666 if pubsub_node.sync_state is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
667 key = (service, node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
668 if key in self.in_progress: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
669 raise exceptions.InternalError( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
670 f"There is already a caching in progress for {pubsub_node}, this " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
671 "should not happen" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
672 ) |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
673 self.cache_node(client, pubsub_node) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
674 elif pubsub_node.sync_state == SyncState.ERROR: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
675 log.debug( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
676 f"{pubsub_node} synchronisation has previously failed, skipping" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
677 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
678 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
679 return True, None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
680 |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
681 async def _subscribe_trigger( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
682 self, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
683 client: SatXMPPEntity, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
684 service: jid.JID, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
685 nodeIdentifier: str, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
686 sub_jid: Optional[jid.JID], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
687 options: Optional[dict], |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
688 subscription: pubsub.Subscription, |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
689 ) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
690 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
691 |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
692 async def _unsubscribe_trigger( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
693 self, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
694 client: SatXMPPEntity, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
695 service: jid.JID, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
696 nodeIdentifier: str, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
697 sub_jid, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
698 subscriptionIdentifier, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
699 sender, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
700 ) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
701 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
702 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
703 def _synchronise(self, service, node, profile_key): |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
704 client = self.host.get_client(profile_key) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
705 service = client.jid.userhostJID() if not service else jid.JID(service) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
706 return defer.ensureDeferred(self.synchronise(client, service, node)) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
707 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
708 async def synchronise( |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
709 self, client: SatXMPPEntity, service: jid.JID, node: str, resync: bool = True |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
710 ) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
711 """Synchronise a node with a pubsub service |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
712 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
713 The node will be synchronised even if there is no matching analyser. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
714 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
715 Note that when a node is synchronised, it is automatically subscribed. |
3760
74f436e856ff
plugin pubsub cache: new `resync` argument to force resynchronisation in `synchronize`
Goffi <goffi@goffi.org>
parents:
3759
diff
changeset
|
716 @param resync: if True and the node is already synchronised, it will be |
74f436e856ff
plugin pubsub cache: new `resync` argument to force resynchronisation in `synchronize`
Goffi <goffi@goffi.org>
parents:
3759
diff
changeset
|
717 resynchronised (all items will be deleted and re-downloaded). |
74f436e856ff
plugin pubsub cache: new `resync` argument to force resynchronisation in `synchronize`
Goffi <goffi@goffi.org>
parents:
3759
diff
changeset
|
718 |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
719 """ |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
720 pubsub_node = await self.host.memory.storage.get_pubsub_node( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
721 client, service, node |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
722 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
723 if pubsub_node is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
724 log.info( |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
725 _("Synchronising the new node {node} at {service}").format( |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
726 node=node, service=service.full |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
727 ) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
728 ) |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
729 analyse = await self.analyse_node(client, service, node) |
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
730 pubsub_node = await self.host.memory.storage.set_pubsub_node( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
731 client, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
732 service, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
733 node, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
734 analyser=analyse.get("name"), |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
735 type_=analyse.get("type"), |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
736 ) |
3831
604b6acaee22
plugin pubsub cache: resync in `synchronise` when node's `sync_state` is not set:
Goffi <goffi@goffi.org>
parents:
3760
diff
changeset
|
737 elif not resync and pubsub_node.sync_state is not None: |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
738 # the node exists, nothing to do |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
739 return |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
740 |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
741 if ( |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
742 pubsub_node.sync_state == SyncState.IN_PROGRESS |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
743 or (service, node) in self.in_progress |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
744 ): |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
745 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
746 _( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
747 "{node} at {service} is already being synchronised, can't do a new " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
748 "synchronisation." |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
749 ).format(node=node, service=service) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
750 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
751 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
752 log.info( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
753 _( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
754 "(Re)Synchronising the node {node} at {service} on user request" |
3934
e345d93fb6e5
plugin OXPS: OpenPGP for XMPP Pubsub implementation:
Goffi <goffi@goffi.org>
parents:
3863
diff
changeset
|
755 ).format(node=node, service=service.full()) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
756 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
757 # we first delete and recreate the node (will also delete its items) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
758 await self.host.memory.storage.delete(pubsub_node) |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
759 analyse = await self.analyse_node(client, service, node) |
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
760 pubsub_node = await self.host.memory.storage.set_pubsub_node( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
761 client, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
762 service, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
763 node, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
764 analyser=analyse.get("name"), |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
765 type_=analyse.get("type"), |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
766 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
767 # then we can put node in cache |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
768 await self.cache_node(client, pubsub_node) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
769 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
770 async def purge(self, purge_filters: dict) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
771 """Remove items according to filters |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
772 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
773 filters can have on of the following keys, all are optional: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
774 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
775 :services: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
776 list of JIDs of services from which items must be deleted |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
777 :nodes: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
778 list of node names to delete |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
779 :types: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
780 list of node types to delete |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
781 :subtypes: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
782 list of node subtypes to delete |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
783 :profiles: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
784 list of profiles from which items must be deleted |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
785 :created_before: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
786 datetime before which items must have been created to be deleted |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
787 :created_update: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
788 datetime before which items must have been updated last to be deleted |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
789 """ |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
790 purge_filters["names"] = purge_filters.pop("nodes", None) |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
791 await self.host.memory.storage.purge_pubsub_items(**purge_filters) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
792 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
793 def _purge(self, purge_filters: str) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
794 purge_filters = data_format.deserialise(purge_filters) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
795 for key in "created_before", "updated_before": |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
796 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
797 purge_filters[key] = datetime.fromtimestamp(purge_filters[key]) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
798 except (KeyError, TypeError): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
799 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
800 return defer.ensureDeferred(self.purge(purge_filters)) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
801 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
802 async def reset(self) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
803 """Remove ALL nodes and items from cache |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
804 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
805 After calling this method, cache will be refilled progressively as if it where new |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
806 """ |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
807 await self.host.memory.storage.delete_pubsub_node(None, None, None) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
808 |
3666
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
809 def _reset(self) -> defer.Deferred: |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
810 return defer.ensureDeferred(self.reset()) |
3666
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
811 |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
812 async def search(self, query: dict) -> List[PubsubItem]: |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
813 """Search pubsub items in cache""" |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
814 return await self.host.memory.storage.search_pubsub_items(query) |
3666
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
815 |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
816 async def serialisable_search(self, query: dict) -> List[dict]: |
3666
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
817 """Search pubsub items in cache and returns parsed data |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
818 |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
819 The returned data can be serialised. |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
820 |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
821 "pubsub_service" and "pubsub_name" will be added to each data (both as strings) |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
822 """ |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
823 items = await self.search(query) |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
824 ret = [] |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
825 for item in items: |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
826 parsed = item.parsed |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
827 parsed["pubsub_service"] = item.node.service.full() |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
828 parsed["pubsub_node"] = item.node.name |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
829 if query.get("with_payload"): |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
830 parsed["item_payload"] = item.data.toXml() |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
831 parsed["node_profile"] = self.host.memory.storage.get_profile_by_id( |
3666
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
832 item.node.profile_id |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
833 ) |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
834 |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
835 ret.append(parsed) |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
836 return ret |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
837 |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
838 def _search(self, query: str) -> defer.Deferred: |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
839 query = data_format.deserialise(query) |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
840 services = query.get("services") |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
841 if services: |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
842 query["services"] = [jid.JID(s) for s in services] |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
843 d = defer.ensureDeferred(self.serialisable_search(query)) |
3666
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
844 d.addCallback(data_format.serialise) |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
845 return d |