Mercurial > libervia-backend
annotate libervia/backend/plugins/plugin_pubsub_cache.py @ 4335:430d5d99a740
plugin XEP-0358: "Publishing Available Jingle Sessions" implementation:
rel 453
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 03 Dec 2024 00:13:07 +0100 |
parents | 0d7bb4df2343 |
children |
rev | line source |
---|---|
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
1 #!/usr/bin/env python3 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
2 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
3 # Libervia plugin for PubSub Caching |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
4 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
5 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
6 # This program is free software: you can redistribute it and/or modify |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
7 # it under the terms of the GNU Affero General Public License as published by |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
8 # the Free Software Foundation, either version 3 of the License, or |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
9 # (at your option) any later version. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
10 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
11 # This program is distributed in the hope that it will be useful, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
14 # GNU Affero General Public License for more details. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
15 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
16 # You should have received a copy of the GNU Affero General Public License |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
18 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
19 import time |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
20 from datetime import datetime |
3738
ffa8c8c78115
plugin XEP-0059, cache: allow those plugins to work in component mode
Goffi <goffi@goffi.org>
parents:
3666
diff
changeset
|
21 from typing import Optional, List, Tuple, Dict, Any |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
22 from twisted.words.protocols.jabber import jid, error |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
23 from twisted.words.xish import domish |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
24 from twisted.internet import defer |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
25 from wokkel import pubsub, rsm |
4071
4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
Goffi <goffi@goffi.org>
parents:
4037
diff
changeset
|
26 from libervia.backend.core.i18n import _ |
4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
Goffi <goffi@goffi.org>
parents:
4037
diff
changeset
|
27 from libervia.backend.core.constants import Const as C |
4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
Goffi <goffi@goffi.org>
parents:
4037
diff
changeset
|
28 from libervia.backend.core import exceptions |
4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
Goffi <goffi@goffi.org>
parents:
4037
diff
changeset
|
29 from libervia.backend.core.log import getLogger |
4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
Goffi <goffi@goffi.org>
parents:
4037
diff
changeset
|
30 from libervia.backend.core.core_types import SatXMPPEntity |
4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
Goffi <goffi@goffi.org>
parents:
4037
diff
changeset
|
31 from libervia.backend.tools import xml_tools, utils |
4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
Goffi <goffi@goffi.org>
parents:
4037
diff
changeset
|
32 from libervia.backend.tools.common import data_format |
4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
Goffi <goffi@goffi.org>
parents:
4037
diff
changeset
|
33 from libervia.backend.memory.sqla import PubsubNode, PubsubItem, SyncState, IntegrityError |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
34 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
35 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
36 log = getLogger(__name__) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
37 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
38 PLUGIN_INFO = { |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
39 C.PI_NAME: "PubSub Cache", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
40 C.PI_IMPORT_NAME: "PUBSUB_CACHE", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
41 C.PI_TYPE: C.PLUG_TYPE_PUBSUB, |
3738
ffa8c8c78115
plugin XEP-0059, cache: allow those plugins to work in component mode
Goffi <goffi@goffi.org>
parents:
3666
diff
changeset
|
42 C.PI_MODES: C.PLUG_MODE_BOTH, |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
43 C.PI_PROTOCOLS: [], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
44 C.PI_DEPENDENCIES: ["XEP-0059", "XEP-0060"], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
45 C.PI_RECOMMENDATIONS: [], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
46 C.PI_MAIN: "PubsubCache", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
47 C.PI_HANDLER: "no", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
48 C.PI_DESCRIPTION: _("""Local Cache for PubSub"""), |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
49 } |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
50 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
51 ANALYSER_KEYS_TO_COPY = ("name", "type", "to_sync", "parser") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
52 # maximum of items to cache |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
53 CACHE_LIMIT = 5000 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
54 # number of second before a progress caching is considered failed and tried again |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
55 PROGRESS_DEADLINE = 60 * 60 * 6 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
56 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
57 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
58 class PubsubCache: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
59 # TODO: there is currently no notification for (un)subscribe events with XEP-0060, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
60 # but it would be necessary to have this data if some devices unsubscribe a cached |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
61 # node, as we can then get out of sync. A protoXEP could be proposed to fix this |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
62 # situation. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
63 # TODO: handle configuration events |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
64 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
65 def __init__(self, host): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
66 log.info(_("PubSub Cache initialization")) |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
67 strategy = host.memory.config_get(None, "pubsub_cache_strategy") |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
68 if strategy == "no_cache": |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
69 log.info( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
70 _( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
71 "Pubsub cache won't be used due to pubsub_cache_strategy={value} " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
72 "setting." |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
73 ).format(value=repr(strategy)) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
74 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
75 self.use_cache = False |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
76 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
77 self.use_cache = True |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
78 self.host = host |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
79 self._p = host.plugins["XEP-0060"] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
80 self.analysers = {} |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
81 # map for caching in progress (node, service) => Deferred |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
82 self.in_progress = {} |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
83 self.host.trigger.add("XEP-0060_getItems", self._get_items_trigger) |
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
84 self._p.add_managed_node( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
85 "", |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
86 items_cb=self.on_items_event, |
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
87 delete_cb=self.on_delete_event, |
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
88 purge_db=self.on_purge_event, |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
89 ) |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
90 host.bridge.add_method( |
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
91 "ps_cache_get", |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
92 ".plugin", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
93 in_sign="ssiassss", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
94 out_sign="s", |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
95 method=self._get_items_from_cache, |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
96 async_=True, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
97 ) |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
98 host.bridge.add_method( |
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
99 "ps_cache_sync", |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
100 ".plugin", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
101 "sss", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
102 out_sign="", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
103 method=self._synchronise, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
104 async_=True, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
105 ) |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
106 host.bridge.add_method( |
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
107 "ps_cache_purge", |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
108 ".plugin", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
109 "s", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
110 out_sign="", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
111 method=self._purge, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
112 async_=True, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
113 ) |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
114 host.bridge.add_method( |
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
115 "ps_cache_reset", |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
116 ".plugin", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
117 "", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
118 out_sign="", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
119 method=self._reset, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
120 async_=True, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
121 ) |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
122 host.bridge.add_method( |
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
123 "ps_cache_search", |
3666
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
124 ".plugin", |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
125 "s", |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
126 out_sign="s", |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
127 method=self._search, |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
128 async_=True, |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
129 ) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
130 |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
131 def register_analyser(self, analyser: dict) -> None: |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
132 """Register a new pubsub node analyser |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
133 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
134 @param analyser: An analyser is a dictionary which may have the following keys |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
135 (keys with a ``*`` are mandatory, at least one of ``node`` or ``namespace`` keys |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
136 must be used): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
137 |
4249 | 138 name (str)* |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
139 a unique name for this analyser. This name will be stored in database |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
140 to retrieve the analyser when necessary (notably to get the parsing method), |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
141 thus it is recommended to use a stable name such as the source plugin name |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
142 instead of a name which may change with standard evolution, such as the |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
143 feature namespace. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
144 |
4249 | 145 type (str)* |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
146 indicates what kind of items we are dealing with. Type must be a human |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
147 readable word, as it may be used in searches. Good types examples are |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
148 **blog** or **event**. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
149 |
4249 | 150 node (str) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
151 prefix of a node name which may be used to identify its type. Example: |
4249 | 152 *urnxmpp:microblog0* (a node starting with this name will be identified as |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
153 *blog* node). |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
154 |
4249 | 155 namespace (str) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
156 root namespace of items. When analysing a node, the first item will be |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
157 retrieved. The analyser will be chosen its given namespace match the |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
158 namespace of the first child element of ``<item>`` element. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
159 |
4249 | 160 to_sync (bool) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
161 if True, the node must be synchronised in cache. The default False value |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
162 means that the pubsub service will always be requested. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
163 |
4249 | 164 parser (callable) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
165 method (which may be sync, a coroutine or a method returning a "Deferred") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
166 to call to parse the ``domish.Element`` of the item. The result must be |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
167 dictionary which can be serialised to JSON. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
168 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
169 The method must have the following signature: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
170 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
171 .. function:: parser(client: SatXMPPEntity, item_elt: domish.Element, \ |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
172 service: Optional[jid.JID], node: Optional[str]) \ |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
173 -> dict |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
174 :noindex: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
175 |
4249 | 176 match_cb (callable) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
177 method (which may be sync, a coroutine or a method returning a "Deferred") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
178 called when the analyser matches. The method is called with the curreny |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
179 analyse which is can modify **in-place**. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
180 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
181 The method must have the following signature: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
182 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
183 .. function:: match_cb(client: SatXMPPEntity, analyse: dict) -> None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
184 :noindex: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
185 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
186 @raise exceptions.Conflict: a analyser with this name already exists |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
187 """ |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
188 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
189 name = analyser.get("name", "").strip().lower() |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
190 # we want the normalised name |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
191 analyser["name"] = name |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
192 if not name: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
193 raise ValueError('"name" is mandatory in analyser') |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
194 if "type" not in analyser: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
195 raise ValueError('"type" is mandatory in analyser') |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
196 type_test_keys = {"node", "namespace"} |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
197 if not type_test_keys.intersection(analyser): |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
198 raise ValueError(f"at least one of {type_test_keys} must be used") |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
199 if name in self.analysers: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
200 raise exceptions.Conflict( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
201 f"An analyser with the name {name!r} is already registered" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
202 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
203 self.analysers[name] = analyser |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
204 |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
205 async def cache_items( |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
206 self, client: SatXMPPEntity, pubsub_node: PubsubNode, items: List[domish.Element] |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
207 ) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
208 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
209 parser = self.analysers[pubsub_node.analyser].get("parser") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
210 except KeyError: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
211 parser = None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
212 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
213 if parser is not None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
214 parsed_items = [ |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
215 await utils.as_deferred( |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
216 parser, client, item, pubsub_node.service, pubsub_node.name |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
217 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
218 for item in items |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
219 ] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
220 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
221 parsed_items = None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
222 |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
223 await self.host.memory.storage.cache_pubsub_items( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
224 client, pubsub_node, items, parsed_items |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
225 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
226 |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
227 async def _cache_node(self, client: SatXMPPEntity, pubsub_node: PubsubNode) -> None: |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
228 await self.host.memory.storage.update_pubsub_node_sync_state( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
229 pubsub_node, SyncState.IN_PROGRESS |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
230 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
231 service, node = pubsub_node.service, pubsub_node.name |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
232 try: |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
233 log.debug(f"Caching node {node!r} at {service} for {client.profile}") |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
234 if not pubsub_node.subscribed: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
235 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
236 sub = await self._p.subscribe(client, service, node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
237 except Exception as e: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
238 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
239 _( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
240 "Can't subscribe node {pubsub_node}, that means that " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
241 "synchronisation can't be maintained: {reason}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
242 ).format(pubsub_node=pubsub_node, reason=e) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
243 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
244 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
245 if sub.state == "subscribed": |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
246 sub_id = sub.subscriptionIdentifier |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
247 log.debug( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
248 f"{pubsub_node} subscribed (subscription id: {sub_id!r})" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
249 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
250 pubsub_node.subscribed = True |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
251 await self.host.memory.storage.add(pubsub_node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
252 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
253 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
254 _( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
255 "{pubsub_node} is not subscribed, that means that " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
256 "synchronisation can't be maintained, and you may have " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
257 "to enforce subscription manually. Subscription state: " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
258 "{state}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
259 ).format(pubsub_node=pubsub_node, state=sub.state) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
260 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
261 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
262 try: |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
263 await self.host.check_features( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
264 client, [rsm.NS_RSM, self._p.DISCO_RSM], pubsub_node.service |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
265 ) |
3759
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
266 except error.StanzaError as e: |
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
267 if e.condition == "service-unavailable": |
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
268 log.warning( |
3854
8a2c46122a11
plugin XEP-0060: fix bad naming of return variable
Goffi <goffi@goffi.org>
parents:
3831
diff
changeset
|
269 "service {service} is hidding disco infos, we'll only cache " |
8a2c46122a11
plugin XEP-0060: fix bad naming of return variable
Goffi <goffi@goffi.org>
parents:
3831
diff
changeset
|
270 "latest 20 items" |
3759
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
271 ) |
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
272 items, __ = await client.pubsub_client.items( |
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
273 pubsub_node.service, pubsub_node.name, maxItems=20 |
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
274 ) |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
275 await self.cache_items(client, pubsub_node, items) |
3759
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
276 else: |
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
277 raise e |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
278 except exceptions.FeatureNotFound: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
279 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
280 f"service {service} doesn't handle Result Set Management " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
281 "(XEP-0059), we'll only cache latest 20 items" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
282 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
283 items, __ = await client.pubsub_client.items( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
284 pubsub_node.service, pubsub_node.name, maxItems=20 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
285 ) |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
286 await self.cache_items(client, pubsub_node, items) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
287 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
288 rsm_p = self.host.plugins["XEP-0059"] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
289 rsm_request = rsm.RSMRequest() |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
290 cached_ids = set() |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
291 while True: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
292 items, rsm_response = await client.pubsub_client.items( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
293 service, node, rsm_request=rsm_request |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
294 ) |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
295 await self.cache_items(client, pubsub_node, items) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
296 for item in items: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
297 item_id = item["id"] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
298 if item_id in cached_ids: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
299 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
300 f"Pubsub node {node!r} at {service} is returning several " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
301 f"times the same item ({item_id!r}). This is illegal " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
302 "behaviour, and it means that Pubsub service " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
303 f"{service} is buggy and can't be cached properly. " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
304 f"Please report this to {service.host} administrators" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
305 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
306 rsm_request = None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
307 break |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
308 cached_ids.add(item["id"]) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
309 if len(cached_ids) >= CACHE_LIMIT: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
310 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
311 f"Pubsub node {node!r} at {service} contains more items " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
312 f"than the cache limit ({CACHE_LIMIT}). We stop " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
313 "caching here, at item {item['id']!r}." |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
314 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
315 rsm_request = None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
316 break |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
317 rsm_request = rsm_p.get_next_request(rsm_request, rsm_response) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
318 if rsm_request is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
319 break |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
320 |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
321 await self.host.memory.storage.update_pubsub_node_sync_state( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
322 pubsub_node, SyncState.COMPLETED |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
323 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
324 except Exception as e: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
325 import traceback |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
326 |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
327 tb = traceback.format_tb(e.__traceback__) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
328 log.error( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
329 f"Can't cache node {node!r} at {service} for {client.profile}: {e}\n{tb}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
330 ) |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
331 await self.host.memory.storage.update_pubsub_node_sync_state( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
332 pubsub_node, SyncState.ERROR |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
333 ) |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
334 await self.host.memory.storage.delete_pubsub_items(pubsub_node) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
335 raise e |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
336 |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
337 def _cache_node_clean(self, __, pubsub_node): |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
338 del self.in_progress[(pubsub_node.service, pubsub_node.name)] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
339 |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
340 def cache_node(self, client: SatXMPPEntity, pubsub_node: PubsubNode) -> None: |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
341 """Launch node caching as a background task""" |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
342 d = defer.ensureDeferred(self._cache_node(client, pubsub_node)) |
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
343 d.addBoth(self._cache_node_clean, pubsub_node=pubsub_node) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
344 self.in_progress[(pubsub_node.service, pubsub_node.name)] = d |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
345 return d |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
346 |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
347 async def analyse_node( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
348 self, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
349 client: SatXMPPEntity, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
350 service: jid.JID, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
351 node: str, |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
352 pubsub_node: PubsubNode = None, |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
353 ) -> dict: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
354 """Use registered analysers on a node to determine what it is used for""" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
355 analyse = {"service": service, "node": node} |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
356 if pubsub_node is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
357 try: |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
358 first_item = (await client.pubsub_client.items(service, node, 1))[0][0] |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
359 except IndexError: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
360 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
361 except error.StanzaError as e: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
362 if e.condition == "item-not-found": |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
363 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
364 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
365 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
366 f"Can't retrieve last item on node {node!r} at service " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
367 f"{service} for {client.profile}: {e}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
368 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
369 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
370 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
371 uri = first_item.firstChildElement().uri |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
372 except Exception as e: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
373 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
374 f"Can't retrieve item namespace on node {node!r} at service " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
375 f"{service} for {client.profile}: {e}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
376 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
377 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
378 analyse["namespace"] = uri |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
379 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
380 conf = await self._p.getConfiguration(client, service, node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
381 except Exception as e: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
382 log.warning( |
4221
2b000790b197
plugin XEP-0060: fix incorrect type hint.
Goffi <goffi@goffi.org>
parents:
4071
diff
changeset
|
383 f"Can't retrieve configuration for node {node!r} at service " |
2b000790b197
plugin XEP-0060: fix incorrect type hint.
Goffi <goffi@goffi.org>
parents:
4071
diff
changeset
|
384 f"{service} for {client.profile}: {e}" |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
385 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
386 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
387 analyse["conf"] = conf |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
388 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
389 for analyser in self.analysers.values(): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
390 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
391 an_node = analyser["node"] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
392 except KeyError: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
393 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
394 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
395 if node.startswith(an_node): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
396 for key in ANALYSER_KEYS_TO_COPY: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
397 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
398 analyse[key] = analyser[key] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
399 except KeyError: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
400 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
401 found = True |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
402 break |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
403 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
404 namespace = analyse["namespace"] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
405 an_namespace = analyser["namespace"] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
406 except KeyError: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
407 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
408 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
409 if namespace == an_namespace: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
410 for key in ANALYSER_KEYS_TO_COPY: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
411 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
412 analyse[key] = analyser[key] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
413 except KeyError: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
414 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
415 found = True |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
416 break |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
417 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
418 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
419 found = False |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
420 log.debug(f"node {node!r} at service {service} doesn't match any known type") |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
421 if found: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
422 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
423 match_cb = analyser["match_cb"] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
424 except KeyError: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
425 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
426 else: |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
427 await utils.as_deferred(match_cb, client, analyse) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
428 return analyse |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
429 |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
430 def _get_items_from_cache( |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
431 self, |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
432 service="", |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
433 node="", |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
434 max_items=10, |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
435 item_ids=None, |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
436 sub_id=None, |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
437 extra="", |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
438 profile_key=C.PROF_KEY_NONE, |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
439 ): |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
440 d = defer.ensureDeferred( |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
441 self._a_get_items_from_cache( |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
442 service, node, max_items, item_ids, sub_id, extra, profile_key |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
443 ) |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
444 ) |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
445 d.addCallback(self._p.trans_items_data) |
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
446 d.addCallback(self._p.serialise_items) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
447 return d |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
448 |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
449 async def _a_get_items_from_cache( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
450 self, service, node, max_items, item_ids, sub_id, extra, profile_key |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
451 ): |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
452 client = self.host.get_client(profile_key) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
453 service = jid.JID(service) if service else client.jid.userhostJID() |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
454 pubsub_node = await self.host.memory.storage.get_pubsub_node( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
455 client, service, node |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
456 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
457 if pubsub_node is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
458 raise exceptions.NotFound( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
459 f"{node!r} at {service} doesn't exist in cache for {client.profile!r}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
460 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
461 max_items = None if max_items == C.NO_LIMIT else max_items |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
462 extra = self._p.parse_extra(data_format.deserialise(extra)) |
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
463 items, metadata = await self.get_items_from_cache( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
464 client, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
465 pubsub_node, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
466 max_items, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
467 item_ids, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
468 sub_id or None, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
469 extra.rsm_request, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
470 extra.extra, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
471 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
472 return [i.data for i in items], metadata |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
473 |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
474 async def get_items_from_cache( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
475 self, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
476 client: SatXMPPEntity, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
477 node: PubsubNode, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
478 max_items: Optional[int] = None, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
479 item_ids: Optional[List[str]] = None, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
480 sub_id: Optional[str] = None, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
481 rsm_request: Optional[rsm.RSMRequest] = None, |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
482 extra: Optional[Dict[str, Any]] = None, |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
483 ) -> Tuple[List[PubsubItem], dict]: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
484 """Get items from cache, using same arguments as for external Pubsub request""" |
3738
ffa8c8c78115
plugin XEP-0059, cache: allow those plugins to work in component mode
Goffi <goffi@goffi.org>
parents:
3666
diff
changeset
|
485 if extra is None: |
ffa8c8c78115
plugin XEP-0059, cache: allow those plugins to work in component mode
Goffi <goffi@goffi.org>
parents:
3666
diff
changeset
|
486 extra = {} |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
487 if "mam" in extra: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
488 raise NotImplementedError("MAM queries are not supported yet") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
489 if max_items is None and rsm_request is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
490 max_items = 20 |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
491 pubsub_items, metadata = await self.host.memory.storage.get_items( |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
492 node, |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
493 max_items=max_items, |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
494 item_ids=item_ids or None, |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
495 order_by=extra.get(C.KEY_ORDER_BY), |
3759
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
496 ) |
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
497 elif max_items is not None: |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
498 if rsm_request is not None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
499 raise exceptions.InternalError( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
500 "Pubsub max items and RSM must not be used at the same time" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
501 ) |
3759
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
502 elif item_ids: |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
503 raise exceptions.InternalError( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
504 "Pubsub max items and item IDs must not be used at the same time" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
505 ) |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
506 pubsub_items, metadata = await self.host.memory.storage.get_items( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
507 node, max_items=max_items, order_by=extra.get(C.KEY_ORDER_BY) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
508 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
509 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
510 desc = False |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
511 if rsm_request.before == "": |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
512 before = None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
513 desc = True |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
514 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
515 before = rsm_request.before |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
516 pubsub_items, metadata = await self.host.memory.storage.get_items( |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
517 node, |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
518 max_items=rsm_request.max, |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
519 before=before, |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
520 after=rsm_request.after, |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
521 from_index=rsm_request.index, |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
522 order_by=extra.get(C.KEY_ORDER_BY), |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
523 desc=desc, |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
524 force_rsm=True, |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
525 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
526 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
527 return pubsub_items, metadata |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
528 |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
529 async def on_items_event(self, client, event): |
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
530 node = await self.host.memory.storage.get_pubsub_node( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
531 client, event.sender, event.nodeIdentifier |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
532 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
533 if node is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
534 return |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
535 if node.sync_state in (SyncState.COMPLETED, SyncState.IN_PROGRESS): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
536 items = [] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
537 retract_ids = [] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
538 for elt in event.items: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
539 if elt.name == "item": |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
540 items.append(elt) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
541 elif elt.name == "retract": |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
542 item_id = elt.getAttribute("id") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
543 if not item_id: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
544 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
545 "Ignoring invalid retract item element: " |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
546 f"{xml_tools.p_fmt_elt(elt)}" |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
547 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
548 continue |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
549 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
550 retract_ids.append(elt["id"]) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
551 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
552 log.warning( |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
553 f"Unexpected Pubsub event element: {xml_tools.p_fmt_elt(elt)}" |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
554 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
555 if items: |
3831
604b6acaee22
plugin pubsub cache: resync in `synchronise` when node's `sync_state` is not set:
Goffi <goffi@goffi.org>
parents:
3760
diff
changeset
|
556 log.debug(f"[{client.profile}] caching new items received from {node}") |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
557 await self.cache_items(client, node, items) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
558 if retract_ids: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
559 log.debug(f"deleting retracted items from {node}") |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
560 await self.host.memory.storage.delete_pubsub_items( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
561 node, items_names=retract_ids |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
562 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
563 |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
564 async def on_delete_event(self, client, event): |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
565 log.debug( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
566 f"deleting node {event.nodeIdentifier} from {event.sender} for " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
567 f"{client.profile}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
568 ) |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
569 await self.host.memory.storage.delete_pubsub_node( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
570 [client.profile], [event.sender], [event.nodeIdentifier] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
571 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
572 |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
573 async def on_purge_event(self, client, event): |
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
574 node = await self.host.memory.storage.get_pubsub_node( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
575 client, event.sender, event.nodeIdentifier |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
576 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
577 if node is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
578 return |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
579 log.debug(f"purging node {node} for {client.profile}") |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
580 await self.host.memory.storage.delete_pubsub_items(node) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
581 |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
582 async def _get_items_trigger( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
583 self, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
584 client: SatXMPPEntity, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
585 service: Optional[jid.JID], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
586 node: str, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
587 max_items: Optional[int], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
588 item_ids: Optional[List[str]], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
589 sub_id: Optional[str], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
590 rsm_request: Optional[rsm.RSMRequest], |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
591 extra: dict, |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
592 ) -> Tuple[bool, Optional[Tuple[List[dict], dict]]]: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
593 if not self.use_cache: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
594 log.debug("cache disabled in settings") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
595 return True, None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
596 if extra.get(C.KEY_USE_CACHE) == False: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
597 log.debug("skipping pubsub cache as requested") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
598 return True, None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
599 if service is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
600 service = client.jid.userhostJID() |
3941
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
601 for __ in range(5): |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
602 pubsub_node = await self.host.memory.storage.get_pubsub_node( |
3941
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
603 client, service, node |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
604 ) |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
605 if pubsub_node is not None and pubsub_node.sync_state == SyncState.COMPLETED: |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
606 analyse = {"to_sync": True} |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
607 else: |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
608 analyse = await self.analyse_node(client, service, node) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
609 |
3941
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
610 if pubsub_node is None: |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
611 try: |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
612 pubsub_node = await self.host.memory.storage.set_pubsub_node( |
3941
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
613 client, |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
614 service, |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
615 node, |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
616 analyser=analyse.get("name"), |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
617 type_=analyse.get("type"), |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
618 subtype=analyse.get("subtype"), |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
619 ) |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
620 except IntegrityError as e: |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
621 if "unique" in str(e.orig).lower(): |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
622 log.debug( |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
623 "race condition on pubsub node creation in cache, trying " |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
624 "again" |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
625 ) |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
626 else: |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
627 raise e |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
628 break |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
629 else: |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
630 raise exceptions.InternalError( |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
631 "Too many IntegrityError with UNIQUE constraint, something is going wrong" |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
632 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
633 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
634 if analyse.get("to_sync"): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
635 if pubsub_node.sync_state == SyncState.COMPLETED: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
636 if "mam" in extra: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
637 log.debug("MAM caching is not supported yet, skipping cache") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
638 return True, None |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
639 pubsub_items, metadata = await self.get_items_from_cache( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
640 client, pubsub_node, max_items, item_ids, sub_id, rsm_request, extra |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
641 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
642 return False, ([i.data for i in pubsub_items], metadata) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
643 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
644 if pubsub_node.sync_state == SyncState.IN_PROGRESS: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
645 if (service, node) not in self.in_progress: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
646 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
647 f"{pubsub_node} is reported as being cached, but not caching is " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
648 "in progress, this is most probably due to the backend being " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
649 "restarted. Resetting the status, caching will be done again." |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
650 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
651 pubsub_node.sync_state = None |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
652 await self.host.memory.storage.delete_pubsub_items(pubsub_node) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
653 elif time.time() - pubsub_node.sync_state_updated > PROGRESS_DEADLINE: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
654 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
655 f"{pubsub_node} is in progress for too long " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
656 f"({pubsub_node.sync_state_updated//60} minutes), " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
657 "cancelling it and retrying." |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
658 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
659 self.in_progress.pop[(service, node)].cancel() |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
660 pubsub_node.sync_state = None |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
661 await self.host.memory.storage.delete_pubsub_items(pubsub_node) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
662 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
663 log.debug( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
664 f"{pubsub_node} synchronisation is already in progress, skipping" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
665 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
666 if pubsub_node.sync_state is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
667 key = (service, node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
668 if key in self.in_progress: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
669 raise exceptions.InternalError( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
670 f"There is already a caching in progress for {pubsub_node}, this " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
671 "should not happen" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
672 ) |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
673 self.cache_node(client, pubsub_node) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
674 elif pubsub_node.sync_state == SyncState.ERROR: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
675 log.debug( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
676 f"{pubsub_node} synchronisation has previously failed, skipping" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
677 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
678 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
679 return True, None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
680 |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
681 async def _subscribe_trigger( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
682 self, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
683 client: SatXMPPEntity, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
684 service: jid.JID, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
685 nodeIdentifier: str, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
686 sub_jid: Optional[jid.JID], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
687 options: Optional[dict], |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
688 subscription: pubsub.Subscription, |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
689 ) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
690 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
691 |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
692 async def _unsubscribe_trigger( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
693 self, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
694 client: SatXMPPEntity, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
695 service: jid.JID, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
696 nodeIdentifier: str, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
697 sub_jid, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
698 subscriptionIdentifier, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
699 sender, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
700 ) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
701 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
702 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
703 def _synchronise(self, service, node, profile_key): |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
704 client = self.host.get_client(profile_key) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
705 service = client.jid.userhostJID() if not service else jid.JID(service) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
706 return defer.ensureDeferred(self.synchronise(client, service, node)) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
707 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
708 async def synchronise( |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
709 self, client: SatXMPPEntity, service: jid.JID, node: str, resync: bool = True |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
710 ) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
711 """Synchronise a node with a pubsub service |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
712 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
713 The node will be synchronised even if there is no matching analyser. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
714 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
715 Note that when a node is synchronised, it is automatically subscribed. |
3760
74f436e856ff
plugin pubsub cache: new `resync` argument to force resynchronisation in `synchronize`
Goffi <goffi@goffi.org>
parents:
3759
diff
changeset
|
716 @param resync: if True and the node is already synchronised, it will be |
74f436e856ff
plugin pubsub cache: new `resync` argument to force resynchronisation in `synchronize`
Goffi <goffi@goffi.org>
parents:
3759
diff
changeset
|
717 resynchronised (all items will be deleted and re-downloaded). |
74f436e856ff
plugin pubsub cache: new `resync` argument to force resynchronisation in `synchronize`
Goffi <goffi@goffi.org>
parents:
3759
diff
changeset
|
718 |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
719 """ |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
720 pubsub_node = await self.host.memory.storage.get_pubsub_node( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
721 client, service, node |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
722 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
723 if pubsub_node is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
724 log.info( |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
725 _("Synchronising the new node {node} at {service}").format( |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
726 node=node, service=service.full |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
727 ) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
728 ) |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
729 analyse = await self.analyse_node(client, service, node) |
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
730 pubsub_node = await self.host.memory.storage.set_pubsub_node( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
731 client, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
732 service, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
733 node, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
734 analyser=analyse.get("name"), |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
735 type_=analyse.get("type"), |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
736 ) |
3831
604b6acaee22
plugin pubsub cache: resync in `synchronise` when node's `sync_state` is not set:
Goffi <goffi@goffi.org>
parents:
3760
diff
changeset
|
737 elif not resync and pubsub_node.sync_state is not None: |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
738 # the node exists, nothing to do |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
739 return |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
740 |
4270
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
741 if ( |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
742 pubsub_node.sync_state == SyncState.IN_PROGRESS |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
743 or (service, node) in self.in_progress |
0d7bb4df2343
Reformatted code base using black.
Goffi <goffi@goffi.org>
parents:
4249
diff
changeset
|
744 ): |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
745 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
746 _( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
747 "{node} at {service} is already being synchronised, can't do a new " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
748 "synchronisation." |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
749 ).format(node=node, service=service) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
750 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
751 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
752 log.info( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
753 _( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
754 "(Re)Synchronising the node {node} at {service} on user request" |
3934
e345d93fb6e5
plugin OXPS: OpenPGP for XMPP Pubsub implementation:
Goffi <goffi@goffi.org>
parents:
3863
diff
changeset
|
755 ).format(node=node, service=service.full()) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
756 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
757 # we first delete and recreate the node (will also delete its items) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
758 await self.host.memory.storage.delete(pubsub_node) |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
759 analyse = await self.analyse_node(client, service, node) |
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
760 pubsub_node = await self.host.memory.storage.set_pubsub_node( |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
761 client, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
762 service, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
763 node, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
764 analyser=analyse.get("name"), |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
765 type_=analyse.get("type"), |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
766 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
767 # then we can put node in cache |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
768 await self.cache_node(client, pubsub_node) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
769 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
770 async def purge(self, purge_filters: dict) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
771 """Remove items according to filters |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
772 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
773 filters can have on of the following keys, all are optional: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
774 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
775 :services: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
776 list of JIDs of services from which items must be deleted |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
777 :nodes: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
778 list of node names to delete |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
779 :types: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
780 list of node types to delete |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
781 :subtypes: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
782 list of node subtypes to delete |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
783 :profiles: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
784 list of profiles from which items must be deleted |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
785 :created_before: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
786 datetime before which items must have been created to be deleted |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
787 :created_update: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
788 datetime before which items must have been updated last to be deleted |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
789 """ |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
790 purge_filters["names"] = purge_filters.pop("nodes", None) |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
791 await self.host.memory.storage.purge_pubsub_items(**purge_filters) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
792 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
793 def _purge(self, purge_filters: str) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
794 purge_filters = data_format.deserialise(purge_filters) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
795 for key in "created_before", "updated_before": |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
796 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
797 purge_filters[key] = datetime.fromtimestamp(purge_filters[key]) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
798 except (KeyError, TypeError): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
799 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
800 return defer.ensureDeferred(self.purge(purge_filters)) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
801 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
802 async def reset(self) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
803 """Remove ALL nodes and items from cache |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
804 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
805 After calling this method, cache will be refilled progressively as if it where new |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
806 """ |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
807 await self.host.memory.storage.delete_pubsub_node(None, None, None) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
808 |
3666
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
809 def _reset(self) -> defer.Deferred: |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
810 return defer.ensureDeferred(self.reset()) |
3666
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
811 |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
812 async def search(self, query: dict) -> List[PubsubItem]: |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
813 """Search pubsub items in cache""" |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
814 return await self.host.memory.storage.search_pubsub_items(query) |
3666
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
815 |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
816 async def serialisable_search(self, query: dict) -> List[dict]: |
3666
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
817 """Search pubsub items in cache and returns parsed data |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
818 |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
819 The returned data can be serialised. |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
820 |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
821 "pubsub_service" and "pubsub_name" will be added to each data (both as strings) |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
822 """ |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
823 items = await self.search(query) |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
824 ret = [] |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
825 for item in items: |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
826 parsed = item.parsed |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
827 parsed["pubsub_service"] = item.node.service.full() |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
828 parsed["pubsub_node"] = item.node.name |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
829 if query.get("with_payload"): |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
830 parsed["item_payload"] = item.data.toXml() |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
831 parsed["node_profile"] = self.host.memory.storage.get_profile_by_id( |
3666
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
832 item.node.profile_id |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
833 ) |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
834 |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
835 ret.append(parsed) |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
836 return ret |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
837 |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
838 def _search(self, query: str) -> defer.Deferred: |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
839 query = data_format.deserialise(query) |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
840 services = query.get("services") |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
841 if services: |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
842 query["services"] = [jid.JID(s) for s in services] |
4037
524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents:
3941
diff
changeset
|
843 d = defer.ensureDeferred(self.serialisable_search(query)) |
3666
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
844 d.addCallback(data_format.serialise) |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
845 return d |