annotate libervia/backend/plugins/plugin_pubsub_cache.py @ 4371:ed683d56b64c default tip

test (XEP-0461): some tests for XEP-0461: rel 457
author Goffi <goffi@goffi.org>
date Tue, 06 May 2025 00:34:01 +0200
parents c9626f46b63e
children
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
3597
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
1 #!/usr/bin/env python3
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
2
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
3 # Libervia plugin for PubSub Caching
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
4 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
5
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
6 # This program is free software: you can redistribute it and/or modify
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
7 # it under the terms of the GNU Affero General Public License as published by
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
8 # the Free Software Foundation, either version 3 of the License, or
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
9 # (at your option) any later version.
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
10
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
11 # This program is distributed in the hope that it will be useful,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
14 # GNU Affero General Public License for more details.
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
15
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
16 # You should have received a copy of the GNU Affero General Public License
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
18
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
19 import time
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
20 from datetime import datetime
4356
c9626f46b63e plugin XEP-0059: Use Pydantic models for RSM.
Goffi <goffi@goffi.org>
parents: 4270
diff changeset
21 from typing import Optional, List, Tuple, Dict, Any, cast
3597
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
22 from twisted.words.protocols.jabber import jid, error
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
23 from twisted.words.xish import domish
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
24 from twisted.internet import defer
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
25 from wokkel import pubsub, rsm
4071
4b842c1fb686 refactoring: renamed `sat` package to `libervia.backend`
Goffi <goffi@goffi.org>
parents: 4037
diff changeset
26 from libervia.backend.core.i18n import _
4b842c1fb686 refactoring: renamed `sat` package to `libervia.backend`
Goffi <goffi@goffi.org>
parents: 4037
diff changeset
27 from libervia.backend.core.constants import Const as C
4b842c1fb686 refactoring: renamed `sat` package to `libervia.backend`
Goffi <goffi@goffi.org>
parents: 4037
diff changeset
28 from libervia.backend.core import exceptions
4b842c1fb686 refactoring: renamed `sat` package to `libervia.backend`
Goffi <goffi@goffi.org>
parents: 4037
diff changeset
29 from libervia.backend.core.log import getLogger
4b842c1fb686 refactoring: renamed `sat` package to `libervia.backend`
Goffi <goffi@goffi.org>
parents: 4037
diff changeset
30 from libervia.backend.core.core_types import SatXMPPEntity
4356
c9626f46b63e plugin XEP-0059: Use Pydantic models for RSM.
Goffi <goffi@goffi.org>
parents: 4270
diff changeset
31 from libervia.backend.plugins.plugin_xep_0059 import XEP_0059, RSMRequest
4071
4b842c1fb686 refactoring: renamed `sat` package to `libervia.backend`
Goffi <goffi@goffi.org>
parents: 4037
diff changeset
32 from libervia.backend.tools import xml_tools, utils
4b842c1fb686 refactoring: renamed `sat` package to `libervia.backend`
Goffi <goffi@goffi.org>
parents: 4037
diff changeset
33 from libervia.backend.tools.common import data_format
4b842c1fb686 refactoring: renamed `sat` package to `libervia.backend`
Goffi <goffi@goffi.org>
parents: 4037
diff changeset
34 from libervia.backend.memory.sqla import PubsubNode, PubsubItem, SyncState, IntegrityError
3597
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
35
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
36
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
log = getLogger(__name__)

# Plugin metadata, keyed by the ``PI_*`` constants of ``Const``.
PLUGIN_INFO = {
    C.PI_NAME: "PubSub Cache",
    C.PI_IMPORT_NAME: "PUBSUB_CACHE",
    C.PI_TYPE: C.PLUG_TYPE_PUBSUB,
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0059", "XEP-0060"],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "PubsubCache",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _("""Local Cache for PubSub"""),
}

# NOTE(review): presumably the analyser keys which are copied to the result of a
# node analyse; the code using this tuple is not visible here -- confirm.
ANALYSER_KEYS_TO_COPY = ("name", "type", "to_sync", "parser")
# maximum number of items to cache
CACHE_LIMIT = 5000
# number of seconds before an in-progress caching is considered failed and tried
# again (60 * 60 * 6 seconds, i.e. 6 hours)
PROGRESS_DEADLINE = 60 * 60 * 6
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
57
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
58
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
59 class PubsubCache:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
60 # TODO: there is currently no notification for (un)subscribe events with XEP-0060,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
61 # but it would be necessary to have this data if some devices unsubscribe a cached
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
62 # node, as we can then get out of sync. A protoXEP could be proposed to fix this
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
63 # situation.
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
64 # TODO: handle configuration events
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
65
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
def __init__(self, host):
    """Set up the pubsub cache plugin.

    The ``pubsub_cache_strategy`` setting is read to decide whether the cache is
    used (``use_cache`` is False only when the value is ``"no_cache"``). The
    ``XEP-0060_getItems`` trigger, a managed node registered with an empty node
    name, and the ``ps_cache_*`` bridge methods are then registered.

    @param host: backend host, used to access memory, plugins, triggers and bridge
    """
    log.info(_("PubSub Cache initialization"))
    strategy = host.memory.config_get(None, "pubsub_cache_strategy")
    cache_disabled = strategy == "no_cache"
    if cache_disabled:
        log.info(
            _(
                "Pubsub cache won't be used due to pubsub_cache_strategy={value} "
                "setting."
            ).format(value=repr(strategy))
        )
    self.use_cache = not cache_disabled
    self.host = host
    self._p = host.plugins["XEP-0060"]
    # registered analysers, indexed by their normalised name
    self.analysers = {}
    # map for caching in progress (node, service) => Deferred
    self.in_progress = {}

    host.trigger.add("XEP-0060_getItems", self._get_items_trigger)
    # NOTE(review): the ``purge_db`` keyword looks like it could be a typo for
    # ``purge_cb``; ``add_managed_node`` is not visible here, so it is kept as is
    # -- confirm against the XEP-0060 plugin.
    self._p.add_managed_node(
        "",
        items_cb=self.on_items_event,
        delete_cb=self.on_delete_event,
        purge_db=self.on_purge_event,
    )

    # bridge methods exposed by this plugin
    bridge = host.bridge
    bridge.add_method(
        "ps_cache_get",
        ".plugin",
        in_sign="ssiassss",
        out_sign="s",
        method=self._get_items_from_cache,
        async_=True,
    )
    bridge.add_method(
        "ps_cache_sync",
        ".plugin",
        "sss",
        out_sign="",
        method=self._synchronise,
        async_=True,
    )
    bridge.add_method(
        "ps_cache_purge",
        ".plugin",
        "s",
        out_sign="",
        method=self._purge,
        async_=True,
    )
    bridge.add_method(
        "ps_cache_reset",
        ".plugin",
        "",
        out_sign="",
        method=self._reset,
        async_=True,
    )
    bridge.add_method(
        "ps_cache_search",
        ".plugin",
        "s",
        out_sign="s",
        method=self._search,
        async_=True,
    )
3597
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
131
4037
524856bd7b19 massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents: 3941
diff changeset
def register_analyser(self, analyser: dict) -> None:
    """Register a new pubsub node analyser

    @param analyser: An analyser is a dictionary which may have the following keys
        (keys with a ``*`` are mandatory, at least one of ``node`` or ``namespace`` keys
        must be used):

        name (str)*
          a unique name for this analyser. This name will be stored in database
          to retrieve the analyser when necessary (notably to get the parsing method),
          thus it is recommended to use a stable name such as the source plugin name
          instead of a name which may change with standard evolution, such as the
          feature namespace.

        type (str)*
          indicates what kind of items we are dealing with. Type must be a human
          readable word, as it may be used in searches. Good types examples are
          **blog** or **event**.

        node (str)
          prefix of a node name which may be used to identify its type. Example:
          *urn:xmpp:microblog:0* (a node starting with this name will be identified as
          *blog* node).

        namespace (str)
          root namespace of items. When analysing a node, the first item will be
          retrieved. The analyser will be chosen if its given namespace matches the
          namespace of the first child element of ``<item>`` element.

        to_sync (bool)
          if True, the node must be synchronised in cache. The default False value
          means that the pubsub service will always be requested.

        parser (callable)
          method (which may be sync, a coroutine or a method returning a "Deferred")
          to call to parse the ``domish.Element`` of the item. The result must be
          dictionary which can be serialised to JSON.

          The method must have the following signature:

          .. function:: parser(client: SatXMPPEntity, item_elt: domish.Element, \
                               service: Optional[jid.JID], node: Optional[str]) \
                               -> dict
            :noindex:

        match_cb (callable)
          method (which may be sync, a coroutine or a method returning a "Deferred")
          called when the analyser matches. The method is called with the current
          analyse, which it can modify **in-place**.

          The method must have the following signature:

          .. function:: match_cb(client: SatXMPPEntity, analyse: dict) -> None
            :noindex:

        On success, the ``name`` value of the dictionary is replaced in-place by its
        normalised form (stripped and lower-cased). The dictionary is left untouched
        if the registration is rejected.

    @raise ValueError: ``name`` or ``type`` is missing, or none of ``node`` and
        ``namespace`` is used
    @raise exceptions.Conflict: an analyser with this name already exists
    """
    # a missing or None name is handled like an empty one, so the caller gets the
    # "mandatory" ValueError instead of an AttributeError
    name = (analyser.get("name") or "").strip().lower()
    if not name:
        raise ValueError('"name" is mandatory in analyser')
    if "type" not in analyser:
        raise ValueError('"type" is mandatory in analyser')
    type_test_keys = {"node", "namespace"}
    if not type_test_keys.intersection(analyser):
        raise ValueError(f"at least one of {type_test_keys} must be used")
    if name in self.analysers:
        raise exceptions.Conflict(
            f"An analyser with the name {name!r} is already registered"
        )
    # we want the normalised name; it is only written once all checks have passed,
    # so a rejected analyser is never modified
    analyser["name"] = name
    self.analysers[name] = analyser
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
205
4037
524856bd7b19 massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents: 3941
diff changeset
async def cache_items(
    self, client: SatXMPPEntity, pubsub_node: PubsubNode, items: List[domish.Element]
) -> None:
    """Store pubsub items in cache, with their parsed form when a parser exists

    The analyser registered under ``pubsub_node.analyser`` is looked up. If it
    provides a ``parser``, every item is parsed with it, one after the other, and
    the parsed data is stored along with the items. Otherwise no parsed data is
    stored.

    @param client: client entity, given to the parser and to the storage
    @param pubsub_node: cached node that the items belong to
    @param items: item elements to put in cache
    """
    node_analyser = self.analysers.get(pubsub_node.analyser)
    parser = None if node_analyser is None else node_analyser.get("parser")

    if parser is None:
        # unknown analyser or analyser without parser: only raw items are stored
        parsed_items = None
    else:
        parsed_items = []
        for item_elt in items:
            parsed = await utils.as_deferred(
                parser, client, item_elt, pubsub_node.service, pubsub_node.name
            )
            parsed_items.append(parsed)

    storage = self.host.memory.storage
    await storage.cache_pubsub_items(client, pubsub_node, items, parsed_items)
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
227
4270
0d7bb4df2343 Reformatted code base using black.
Goffi <goffi@goffi.org>
parents: 4249
diff changeset
async def _cache_node(self, client: SatXMPPEntity, pubsub_node: PubsubNode) -> None:
    """Retrieve the items of a pubsub node and store them in cache

    The node sync state is set to ``IN_PROGRESS`` at start and to ``COMPLETED`` on
    success. On any failure the error is logged, the state is set to ``ERROR``, the
    cached items of this node are deleted and the exception is re-raised.

    If the node is not marked as subscribed, a subscription is attempted first. A
    failed or non-final subscription is only logged: caching goes on anyway.

    If the service handles RSM, the node is retrieved page by page, until there is no
    next page, the service returns an item twice, or ``CACHE_LIMIT`` items have been
    seen. Otherwise only the latest 20 items are requested.

    @param client: client session used to reach the pubsub service
    @param pubsub_node: node to cache
    @raise Exception: any error met while caching, once the cleanup above is done
    """
    await self.host.memory.storage.update_pubsub_node_sync_state(
        pubsub_node, SyncState.IN_PROGRESS
    )
    service, node = pubsub_node.service, pubsub_node.name
    try:
        log.debug(f"Caching node {node!r} at {service} for {client.profile}")
        if not pubsub_node.subscribed:
            try:
                sub = await self._p.subscribe(client, service, node)
            except Exception as e:
                # best effort: without subscription the cache can still be filled,
                # it just won't be kept up to date
                log.warning(
                    _(
                        "Can't subscribe node {pubsub_node}, that means that "
                        "synchronisation can't be maintained: {reason}"
                    ).format(pubsub_node=pubsub_node, reason=e)
                )
            else:
                if sub.state == "subscribed":
                    sub_id = sub.subscriptionIdentifier
                    log.debug(
                        f"{pubsub_node} subscribed (subscription id: {sub_id!r})"
                    )
                    pubsub_node.subscribed = True
                    await self.host.memory.storage.add(pubsub_node)
                else:
                    log.warning(
                        _(
                            "{pubsub_node} is not subscribed, that means that "
                            "synchronisation can't be maintained, and you may have "
                            "to enforce subscription manually. Subscription state: "
                            "{state}"
                        ).format(pubsub_node=pubsub_node, state=sub.state)
                    )

        try:
            await self.host.check_features(
                client, [rsm.NS_RSM, self._p.DISCO_RSM], pubsub_node.service
            )
        except error.StanzaError as e:
            if e.condition != "service-unavailable":
                raise
            log.warning(
                f"service {service} is hidding disco infos, we'll only cache "
                "latest 20 items"
            )
            use_rsm = False
        except exceptions.FeatureNotFound:
            log.warning(
                f"service {service} doesn't handle Result Set Management "
                "(XEP-0059), we'll only cache latest 20 items"
            )
            use_rsm = False
        else:
            use_rsm = True

        if not use_rsm:
            # pagination can't be used, we settle for the latest items
            items, __ = await client.pubsub_client.items(service, node, maxItems=20)
            await self.cache_items(client, pubsub_node, items)
        else:
            rsm_p = cast(XEP_0059, self.host.plugins["XEP-0059"])
            rsm_request = RSMRequest()
            cached_ids = set()
            # NOTE(review): get_next_request is assumed to return None when there is
            #   no further page; a None request must not be sent, as it would ask for
            #   the node again without pagination.
            while rsm_request is not None:
                items, rsm_response = await client.pubsub_client.items(
                    service, node, rsm_request=rsm_request
                )
                await self.cache_items(client, pubsub_node, items)
                stop_paging = False
                for item in items:
                    item_id = item["id"]
                    if item_id in cached_ids:
                        log.warning(
                            f"Pubsub node {node!r} at {service} is returning several "
                            f"times the same item ({item_id!r}). This is illegal "
                            "behaviour, and it means that Pubsub service "
                            f"{service} is buggy and can't be cached properly. "
                            f"Please report this to {service.host} administrators"
                        )
                        stop_paging = True
                        break
                    cached_ids.add(item_id)
                    if len(cached_ids) >= CACHE_LIMIT:
                        log.warning(
                            f"Pubsub node {node!r} at {service} contains more items "
                            f"than the cache limit ({CACHE_LIMIT}). We stop "
                            f"caching here, at item {item_id!r}."
                        )
                        stop_paging = True
                        break
                if stop_paging:
                    break
                rsm_request = rsm_p.get_next_request(rsm_request, rsm_response)

        await self.host.memory.storage.update_pubsub_node_sync_state(
            pubsub_node, SyncState.COMPLETED
        )
    except Exception as e:
        import traceback

        # format_tb returns a list of lines, it is joined to get a readable log
        tb = "".join(traceback.format_tb(e.__traceback__))
        log.error(
            f"Can't cache node {node!r} at {service} for {client.profile}: {e}\n{tb}"
        )
        await self.host.memory.storage.update_pubsub_node_sync_state(
            pubsub_node, SyncState.ERROR
        )
        # a partial cache would be misleading, items stored so far are dropped
        await self.host.memory.storage.delete_pubsub_items(pubsub_node)
        raise
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
337
4037
524856bd7b19 massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents: 3941
diff changeset
338 def _cache_node_clean(self, __, pubsub_node):
3597
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
339 del self.in_progress[(pubsub_node.service, pubsub_node.name)]
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
340
4270
0d7bb4df2343 Reformatted code base using black.
Goffi <goffi@goffi.org>
parents: 4249
diff changeset
def cache_node(self, client: SatXMPPEntity, pubsub_node: PubsubNode) -> defer.Deferred:
    """Launch node caching as a background task

    The Deferred of the task is stored in ``self.in_progress`` under the
    (service, name) key of the node, and the entry is removed when the task ends,
    whether it succeeded or failed.

    @param client: client session used to reach the pubsub service
    @param pubsub_node: node to cache
    @return: Deferred of the caching task
    """
    d = defer.ensureDeferred(self._cache_node(client, pubsub_node))
    # The task is registered before the cleanup callback is attached: if the
    # Deferred has already fired, the callback runs immediately and must find the
    # entry, otherwise a stale entry would be left behind.
    self.in_progress[(pubsub_node.service, pubsub_node.name)] = d
    d.addBoth(self._cache_node_clean, pubsub_node=pubsub_node)
    return d
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
347
4037
524856bd7b19 massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents: 3941
diff changeset
async def analyse_node(
    self,
    client: SatXMPPEntity,
    service: jid.JID,
    node: str,
    pubsub_node: PubsubNode = None,
) -> dict:
    """Use registered analysers on a node to determine what it is used for

    When ``pubsub_node`` is None, the service is queried for one item of the node (to
    get the namespace of its payload) and for the node configuration. Failures there
    are only logged.

    The first registered analyser matching either by node name prefix or by payload
    namespace is selected: its ``ANALYSER_KEYS_TO_COPY`` keys are copied to the result
    and its "match_cb", if any, is called with the result.

    @param client: client session used to reach the pubsub service
    @param service: pubsub service where the node is
    @param node: name of the node to analyse
    @param pubsub_node: when set, the service is not queried
    @return: analyse data, with at least "service" and "node" keys
    """
    analyse = {"service": service, "node": node}

    if pubsub_node is None:
        try:
            items_result = await client.pubsub_client.items(service, node, 1)
            first_item = items_result[0][0]
        except IndexError:
            # nothing to get a namespace from
            pass
        except error.StanzaError as e:
            if e.condition != "item-not-found":
                log.warning(
                    f"Can't retrieve last item on node {node!r} at service "
                    f"{service} for {client.profile}: {e}"
                )
        else:
            try:
                uri = first_item.firstChildElement().uri
            except Exception as e:
                log.warning(
                    f"Can't retrieve item namespace on node {node!r} at service "
                    f"{service} for {client.profile}: {e}"
                )
            else:
                analyse["namespace"] = uri

        try:
            conf = await self._p.getConfiguration(client, service, node)
        except Exception as e:
            log.warning(
                f"Can't retrieve configuration for node {node!r} at service "
                f"{service} for {client.profile}: {e}"
            )
        else:
            analyse["conf"] = conf

    selected = None
    for candidate in self.analysers.values():
        # match on node name prefix first
        if "node" in candidate and node.startswith(candidate["node"]):
            selected = candidate
            break
        # then on the namespace, when both sides have one
        if (
            "namespace" in analyse
            and "namespace" in candidate
            and analyse["namespace"] == candidate["namespace"]
        ):
            selected = candidate
            break

    if selected is None:
        log.debug(f"node {node!r} at service {service} doesn't match any known type")
        return analyse

    for key in ANALYSER_KEYS_TO_COPY:
        if key in selected:
            analyse[key] = selected[key]

    if "match_cb" in selected:
        await utils.as_deferred(selected["match_cb"], client, analyse)
    return analyse
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
430
4037
524856bd7b19 massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents: 3941
diff changeset
def _get_items_from_cache(
    self,
    service="",
    node="",
    max_items=10,
    item_ids=None,
    sub_id=None,
    extra="",
    profile_key=C.PROF_KEY_NONE,
):
    """Deferred-returning wrapper around ``_a_get_items_from_cache``

    All arguments are forwarded unchanged. The result then goes through
    ``self._p.trans_items_data`` and ``self._p.serialise_items``, in that order.

    @return: Deferred firing with the output of ``self._p.serialise_items``
    """
    items_coro = self._a_get_items_from_cache(
        service, node, max_items, item_ids, sub_id, extra, profile_key
    )
    d = defer.ensureDeferred(items_coro)
    for post_process in (self._p.trans_items_data, self._p.serialise_items):
        d.addCallback(post_process)
    return d
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
449
4037
524856bd7b19 massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents: 3941
diff changeset
async def _a_get_items_from_cache(
    self, service, node, max_items, item_ids, sub_id, extra, profile_key
):
    """Resolve the raw arguments and fetch the matching items from the cache.

    @param service: pubsub service as a string; when empty, the bare JID of the
        client is used instead
    @param node: name of the pubsub node
    @param max_items: maximum number of items; ``C.NO_LIMIT`` is mapped to None
    @param item_ids: IDs of the items to retrieve
    @param sub_id: subscription ID; any falsy value is forwarded as None
    @param extra: serialised extra data, deserialised then parsed with
        ``self._p.parse_extra``
    @param profile_key: profile key used to get the client
    @return: tuple of (list of the ``data`` attribute of each item, metadata)
    @raise exceptions.NotFound: the node is not in cache for this profile
    """
    client = self.host.get_client(profile_key)
    if service:
        service = jid.JID(service)
    else:
        service = client.jid.userhostJID()

    cached_node = await self.host.memory.storage.get_pubsub_node(
        client, service, node
    )
    if cached_node is None:
        raise exceptions.NotFound(
            f"{node!r} at {service} doesn't exist in cache for {client.profile!r}"
        )

    if max_items == C.NO_LIMIT:
        max_items = None
    parsed_extra = self._p.parse_extra(data_format.deserialise(extra))

    cached_items, metadata = await self.get_items_from_cache(
        client,
        cached_node,
        max_items,
        item_ids,
        sub_id or None,
        parsed_extra.rsm_request,
        parsed_extra.extra,
    )
    payloads = [cached_item.data for cached_item in cached_items]
    return payloads, metadata
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
474
4037
524856bd7b19 massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents: 3941
diff changeset
async def get_items_from_cache(
    self,
    client: SatXMPPEntity,
    node: PubsubNode,
    max_items: Optional[int] = None,
    item_ids: Optional[List[str]] = None,
    sub_id: Optional[str] = None,
    rsm_request: Optional[rsm.RSMRequest] = None,
    extra: Optional[Dict[str, Any]] = None,
) -> Tuple[List[PubsubItem], dict]:
    """Get items from cache, using same arguments as for external Pubsub request

    Three query modes are handled:
        - neither ``max_items`` nor ``rsm_request``: at most 20 items are
          requested, optionally restricted to ``item_ids``
        - ``max_items`` only: the ``max_items`` items are requested
        - ``rsm_request`` only: paging values are taken from the RSM request

    ``client`` and ``sub_id`` are accepted but not used by this method.

    @return: tuple of (cached items, metadata) as returned by the storage
    @raise NotImplementedError: ``extra`` contains a "mam" key
    @raise exceptions.InternalError: ``max_items`` is used together with
        ``rsm_request`` or with ``item_ids``
    """
    extra = {} if extra is None else extra
    if "mam" in extra:
        raise NotImplementedError("MAM queries are not supported yet")

    # Build the mode-specific keyword arguments first; the storage call itself is
    # identical for every mode.
    if max_items is not None:
        if rsm_request is not None:
            raise exceptions.InternalError(
                "Pubsub max items and RSM must not be used at the same time"
            )
        if item_ids:
            raise exceptions.InternalError(
                "Pubsub max items and item IDs must not be used at the same time"
            )
        query = {"max_items": max_items}
    elif rsm_request is None:
        # No explicit limit and no paging: fall back to a default of 20 items.
        query = {"max_items": 20, "item_ids": item_ids or None}
    else:
        before = rsm_request.before
        # An empty "before" requests descending order without any boundary item.
        desc = before == ""
        if desc:
            before = None
        query = {
            "max_items": rsm_request.max,
            "before": before,
            "after": rsm_request.after,
            "from_index": rsm_request.index,
            "desc": desc,
            "force_rsm": True,
        }

    pubsub_items, metadata = await self.host.memory.storage.get_items(
        node, order_by=extra.get(C.KEY_ORDER_BY), **query
    )
    return pubsub_items, metadata
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
529
4037
524856bd7b19 massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents: 3941
diff changeset
async def on_items_event(self, client, event):
    """Update the cache according to a pubsub items event.

    Nothing is done when the node is not in cache, or when its synchronisation
    state is neither COMPLETED nor IN_PROGRESS.

    @param client: client which received the event
    @param event: event whose ``sender`` and ``nodeIdentifier`` identify the
        node, and whose ``items`` are "item" or "retract" elements
    """
    node = await self.host.memory.storage.get_pubsub_node(
        client, event.sender, event.nodeIdentifier
    )
    if node is None:
        return
    if node.sync_state not in (SyncState.COMPLETED, SyncState.IN_PROGRESS):
        return

    # Split the event payload between published items and retracted item IDs.
    published = []
    retracted_ids = []
    for elt in event.items:
        elt_name = elt.name
        if elt_name == "item":
            published.append(elt)
        elif elt_name == "retract":
            if elt.getAttribute("id"):
                retracted_ids.append(elt["id"])
            else:
                # A retraction without ID can't be matched to a cached item.
                log.warning(
                    "Ignoring invalid retract item element: "
                    f"{xml_tools.p_fmt_elt(elt)}"
                )
        else:
            log.warning(
                f"Unexpected Pubsub event element: {xml_tools.p_fmt_elt(elt)}"
            )

    if published:
        log.debug(f"[{client.profile}] caching new items received from {node}")
        await self.cache_items(client, node, published)
    if retracted_ids:
        log.debug(f"deleting retracted items from {node}")
        await self.host.memory.storage.delete_pubsub_items(
            node, items_names=retracted_ids
        )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
564
4037
524856bd7b19 massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents: 3941
diff changeset
async def on_delete_event(self, client, event):
    """Remove from the cache the node targeted by a pubsub delete event.

    @param client: client which received the event
    @param event: event whose ``sender`` and ``nodeIdentifier`` identify the node
    """
    profile = client.profile
    service = event.sender
    node_name = event.nodeIdentifier
    log.debug(f"deleting node {node_name} from {service} for {profile}")
    # The storage call takes lists, hence the single-element lists.
    await self.host.memory.storage.delete_pubsub_node(
        [profile], [service], [node_name]
    )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
573
4037
524856bd7b19 massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents: 3941
diff changeset
async def on_purge_event(self, client, event):
    """Delete all cached items of the node targeted by a pubsub purge event.

    The event is ignored when the node is not in cache.

    @param client: client which received the event
    @param event: event whose ``sender`` and ``nodeIdentifier`` identify the node
    """
    cached_node = await self.host.memory.storage.get_pubsub_node(
        client, event.sender, event.nodeIdentifier
    )
    if cached_node is not None:
        log.debug(f"purging node {cached_node} for {client.profile}")
        # No item names are given: the call is made with the node only.
        await self.host.memory.storage.delete_pubsub_items(cached_node)
3597
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
582
4037
524856bd7b19 massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents: 3941
diff changeset
583 async def _get_items_trigger(
3597
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
584 self,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
585 client: SatXMPPEntity,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
586 service: Optional[jid.JID],
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
587 node: str,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
588 max_items: Optional[int],
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
589 item_ids: Optional[List[str]],
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
590 sub_id: Optional[str],
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
591 rsm_request: Optional[rsm.RSMRequest],
4270
0d7bb4df2343 Reformatted code base using black.
Goffi <goffi@goffi.org>
parents: 4249
diff changeset
592 extra: dict,
3597
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
593 ) -> Tuple[bool, Optional[Tuple[List[dict], dict]]]:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
594 if not self.use_cache:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
595 log.debug("cache disabled in settings")
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
596 return True, None
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
597 if extra.get(C.KEY_USE_CACHE) == False:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
598 log.debug("skipping pubsub cache as requested")
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
599 return True, None
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
600 if service is None:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
601 service = client.jid.userhostJID()
3941
036188fff714 plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents: 3934
diff changeset
602 for __ in range(5):
4037
524856bd7b19 massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents: 3941
diff changeset
603 pubsub_node = await self.host.memory.storage.get_pubsub_node(
3941
036188fff714 plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents: 3934
diff changeset
604 client, service, node
036188fff714 plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents: 3934
diff changeset
605 )
036188fff714 plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents: 3934
diff changeset
606 if pubsub_node is not None and pubsub_node.sync_state == SyncState.COMPLETED:
036188fff714 plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents: 3934
diff changeset
607 analyse = {"to_sync": True}
036188fff714 plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents: 3934
diff changeset
608 else:
4037
524856bd7b19 massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents: 3941
diff changeset
609 analyse = await self.analyse_node(client, service, node)
3597
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
610
3941
036188fff714 plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents: 3934
diff changeset
611 if pubsub_node is None:
036188fff714 plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents: 3934
diff changeset
612 try:
4037
524856bd7b19 massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents: 3941
diff changeset
613 pubsub_node = await self.host.memory.storage.set_pubsub_node(
3941
036188fff714 plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents: 3934
diff changeset
614 client,
036188fff714 plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents: 3934
diff changeset
615 service,
036188fff714 plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents: 3934
diff changeset
616 node,
036188fff714 plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents: 3934
diff changeset
617 analyser=analyse.get("name"),
036188fff714 plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents: 3934
diff changeset
618 type_=analyse.get("type"),
036188fff714 plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents: 3934
diff changeset
619 subtype=analyse.get("subtype"),
036188fff714 plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents: 3934
diff changeset
620 )
036188fff714 plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents: 3934
diff changeset
621 except IntegrityError as e:
036188fff714 plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents: 3934
diff changeset
622 if "unique" in str(e.orig).lower():
036188fff714 plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents: 3934
diff changeset
623 log.debug(
036188fff714 plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents: 3934
diff changeset
624 "race condition on pubsub node creation in cache, trying "
036188fff714 plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents: 3934
diff changeset
625 "again"
036188fff714 plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents: 3934
diff changeset
626 )
036188fff714 plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents: 3934
diff changeset
627 else:
036188fff714 plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents: 3934
diff changeset
628 raise e
036188fff714 plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents: 3934
diff changeset
629 break
036188fff714 plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents: 3934
diff changeset
630 else:
036188fff714 plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents: 3934
diff changeset
631 raise exceptions.InternalError(
036188fff714 plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents: 3934
diff changeset
632 "Too many IntegrityError with UNIQUE constraint, something is going wrong"
3597
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
633 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
634
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
635 if analyse.get("to_sync"):
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
636 if pubsub_node.sync_state == SyncState.COMPLETED:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
637 if "mam" in extra:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
638 log.debug("MAM caching is not supported yet, skipping cache")
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
639 return True, None
4037
524856bd7b19 massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents: 3941
diff changeset
640 pubsub_items, metadata = await self.get_items_from_cache(
3597
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
641 client, pubsub_node, max_items, item_ids, sub_id, rsm_request, extra
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
642 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
643 return False, ([i.data for i in pubsub_items], metadata)
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
644
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
645 if pubsub_node.sync_state == SyncState.IN_PROGRESS:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
646 if (service, node) not in self.in_progress:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
647 log.warning(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
648 f"{pubsub_node} is reported as being cached, but not caching is "
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
649 "in progress, this is most probably due to the backend being "
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
650 "restarted. Resetting the status, caching will be done again."
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
651 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
652 pubsub_node.sync_state = None
4037
524856bd7b19 massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents: 3941
diff changeset
653 await self.host.memory.storage.delete_pubsub_items(pubsub_node)
3597
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
654 elif time.time() - pubsub_node.sync_state_updated > PROGRESS_DEADLINE:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
655 log.warning(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
656 f"{pubsub_node} is in progress for too long "
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
657 f"({pubsub_node.sync_state_updated//60} minutes), "
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
658 "cancelling it and retrying."
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
659 )
4356
c9626f46b63e plugin XEP-0059: Use Pydantic models for RSM.
Goffi <goffi@goffi.org>
parents: 4270
diff changeset
660 self.in_progress.pop((service, node)).cancel()
3597
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
661 pubsub_node.sync_state = None
4037
524856bd7b19 massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents: 3941
diff changeset
662 await self.host.memory.storage.delete_pubsub_items(pubsub_node)
3597
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
663 else:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
664 log.debug(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
665 f"{pubsub_node} synchronisation is already in progress, skipping"
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
666 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
667 if pubsub_node.sync_state is None:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
668 key = (service, node)
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
669 if key in self.in_progress:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
670 raise exceptions.InternalError(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
671 f"There is already a caching in progress for {pubsub_node}, this "
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
672 "should not happen"
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
673 )
4037
524856bd7b19 massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents: 3941
diff changeset
674 self.cache_node(client, pubsub_node)
3597
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
675 elif pubsub_node.sync_state == SyncState.ERROR:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
676 log.debug(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
677 f"{pubsub_node} synchronisation has previously failed, skipping"
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
678 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
679
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
680 return True, None
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
681
4037
524856bd7b19 massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents: 3941
diff changeset
async def _subscribe_trigger(
    self,
    client: SatXMPPEntity,
    service: jid.JID,
    nodeIdentifier: str,
    sub_jid: Optional[jid.JID],
    options: Optional[dict],
    subscription: pubsub.Subscription,
) -> None:
    """Subscription trigger placeholder: no action is performed.

    All arguments are accepted and ignored.
    """
    return None
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
692
4037
524856bd7b19 massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents: 3941
diff changeset
async def _unsubscribe_trigger(
    self,
    client: SatXMPPEntity,
    service: jid.JID,
    nodeIdentifier: str,
    sub_jid,
    subscriptionIdentifier,
    sender,
) -> None:
    """Unsubscription trigger placeholder: no action is performed.

    All arguments are accepted and ignored.
    """
    return None
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
703
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
def _synchronise(self, service, node, profile_key):
    """Deferred-returning wrapper around L{synchronise}

    @param service: service to synchronise with; when empty, the userhost JID of
        the client is used instead
    @param node: name of the node to synchronise
    @param profile_key: key used to retrieve the client
    @return: Deferred wrapping the L{synchronise} coroutine
    """
    client = self.host.get_client(profile_key)
    if service:
        service_jid = jid.JID(service)
    else:
        service_jid = client.jid.userhostJID()
    sync_coro = self.synchronise(client, service_jid, node)
    return defer.ensureDeferred(sync_coro)
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
708
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
async def synchronise(
    self, client: SatXMPPEntity, service: jid.JID, node: str, resync: bool = True
) -> None:
    """Synchronise a node with a pubsub service

    The node will be synchronised even if there is no matching analyser.

    Note that when a node is synchronised, it is automatically subscribed.
    @param client: client used to reach the pubsub service
    @param service: JID of the pubsub service hosting the node
    @param node: name of the node to synchronise
    @param resync: if True and the node is already synchronised, it will be
        resynchronised (all items will be deleted and re-downloaded).
        If False and the node already has a sync state, nothing is done.
    """
    pubsub_node = await self.host.memory.storage.get_pubsub_node(
        client, service, node
    )
    if pubsub_node is None:
        log.info(
            _("Synchronising the new node {node} at {service}").format(
                # ``full`` is a method: it must be called, otherwise the bound
                # method object itself ends up in the log message
                node=node,
                service=service.full(),
            )
        )
        analyse = await self.analyse_node(client, service, node)
        pubsub_node = await self.host.memory.storage.set_pubsub_node(
            client,
            service,
            node,
            analyser=analyse.get("name"),
            type_=analyse.get("type"),
        )
    elif not resync and pubsub_node.sync_state is not None:
        # the node exists, nothing to do
        return

    if (
        pubsub_node.sync_state == SyncState.IN_PROGRESS
        or (service, node) in self.in_progress
    ):
        # a caching is already running for this node: don't start a second one
        log.warning(
            _(
                "{node} at {service} is already being synchronised, can't do a new "
                "synchronisation."
            ).format(node=node, service=service)
        )
    else:
        log.info(
            _(
                "(Re)Synchronising the node {node} at {service} on user request"
            ).format(node=node, service=service.full())
        )
        # we first delete and recreate the node (will also delete its items)
        await self.host.memory.storage.delete(pubsub_node)
        analyse = await self.analyse_node(client, service, node)
        pubsub_node = await self.host.memory.storage.set_pubsub_node(
            client,
            service,
            node,
            analyser=analyse.get("name"),
            type_=analyse.get("type"),
        )
        # then we can put node in cache
        await self.cache_node(client, pubsub_node)
3597
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
770
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
async def purge(self, purge_filters: dict) -> None:
    """Remove items according to filters

    filters can have one of the following keys, all are optional:

        :services:
            list of JIDs of services from which items must be deleted
        :nodes:
            list of node names to delete
        :types:
            list of node types to delete
        :subtypes:
            list of node subtypes to delete
        :profiles:
            list of profiles from which items must be deleted
        :created_before:
            datetime before which items must have been created to be deleted
        :updated_before:
            datetime before which items must have been updated last to be deleted

    @param purge_filters: filters as described above; the mapping is not modified
    """
    # work on a shallow copy so the caller's mapping is left untouched
    filters = dict(purge_filters)
    # the storage method receives node names through the ``names`` keyword
    filters["names"] = filters.pop("nodes", None)
    await self.host.memory.storage.purge_pubsub_items(**filters)
3597
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
793
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
def _purge(self, purge_filters: str) -> defer.Deferred:
    """Deserialise purge filters and run L{purge}

    @param purge_filters: serialised filters, see L{purge} for the accepted keys.
        ``created_before`` and ``updated_before`` values are converted with
        ``datetime.fromtimestamp``.
    @return: Deferred wrapping the L{purge} coroutine
    """
    filters = data_format.deserialise(purge_filters)
    for key in ("created_before", "updated_before"):
        try:
            filters[key] = datetime.fromtimestamp(filters[key])
        except (KeyError, TypeError):
            # filter not provided, or its value is not a timestamp: the entry
            # is left as it is
            pass
    return defer.ensureDeferred(self.purge(filters))
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
802
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
async def reset(self) -> None:
    """Remove ALL nodes and items from cache

    Once this method has been called, the cache will be progressively refilled,
    as if it were new.
    """
    storage = self.host.memory.storage
    # all three filters set to None: nothing is filtered out
    await storage.delete_pubsub_node(None, None, None)
3597
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
809
3666
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
def _reset(self) -> defer.Deferred:
    """Deferred-returning wrapper around L{reset}"""
    reset_coro = self.reset()
    return defer.ensureDeferred(reset_coro)
3666
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
812
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
async def search(self, query: dict) -> List[PubsubItem]:
    """Search pubsub items in cache

    @param query: search query, handed over unchanged to the storage
    @return: items found by the storage
    """
    storage = self.host.memory.storage
    found_items = await storage.search_pubsub_items(query)
    return found_items
3666
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
816
4037
524856bd7b19 massive refactoring to switch from camelCase to snake_case:
Goffi <goffi@goffi.org>
parents: 3941
diff changeset
async def serialisable_search(self, query: dict) -> List[dict]:
    """Search pubsub items in cache and returns parsed data

    The returned data can be serialised.

    "pubsub_service" and "pubsub_node" will be added to each data (both as strings),
    as well as "node_profile". If ``with_payload`` is set in the query, the XML of
    the item is added as "item_payload".

    @param query: search query, see L{search}
    @return: one dict per found item, made of the item's parsed data plus the
        keys described above
    """
    items = await self.search(query)
    with_payload = query.get("with_payload")
    ret = []
    for item in items:
        # shallow copy: the extra keys must not leak into the parsed data held
        # by the cached item itself
        parsed = dict(item.parsed)
        parsed["pubsub_service"] = item.node.service.full()
        parsed["pubsub_node"] = item.node.name
        if with_payload:
            parsed["item_payload"] = item.data.toXml()
        parsed["node_profile"] = self.host.memory.storage.get_profile_by_id(
            item.node.profile_id
        )

        ret.append(parsed)
    return ret
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
838
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
def _search(self, query: str) -> defer.Deferred:
    """Deserialise a search query, run it and serialise the result

    @param query: serialised query; when it holds a non-empty "services" value,
        each entry is converted to a JID before searching
    @return: Deferred firing with the serialised result of
        L{serialisable_search}
    """
    parsed_query = data_format.deserialise(query)
    raw_services = parsed_query.get("services")
    if raw_services:
        parsed_query["services"] = list(map(jid.JID, raw_services))
    search_d = defer.ensureDeferred(self.serialisable_search(parsed_query))
    search_d.addCallback(data_format.serialise)
    return search_d