annotate sat/plugins/plugin_pubsub_cache.py @ 3732:0fac164ff2d8

tools (xml_tools): fix `widget_args` modification in `_dataFormField2XMLUIData`: in `textbox` widget, the first arg was modified with all values, lettings other ones unchanger, resulting in invalid number of arguments. This has been fixed by replacing all args with the new value.
author Goffi <goffi@goffi.org>
date Mon, 31 Jan 2022 18:35:52 +0100
parents 342e3ddefd23
children ffa8c8c78115
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
3597
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
1 #!/usr/bin/env python3
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
2
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
3 # Libervia plugin for PubSub Caching
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
4 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
5
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
6 # This program is free software: you can redistribute it and/or modify
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
7 # it under the terms of the GNU Affero General Public License as published by
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
8 # the Free Software Foundation, either version 3 of the License, or
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
9 # (at your option) any later version.
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
10
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
11 # This program is distributed in the hope that it will be useful,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
14 # GNU Affero General Public License for more details.
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
15
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
16 # You should have received a copy of the GNU Affero General Public License
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
18
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
19 import time
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
20 from datetime import datetime
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
21 from typing import Optional, List, Tuple
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
22 from twisted.words.protocols.jabber import jid, error
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
23 from twisted.words.xish import domish
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
24 from twisted.internet import defer
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
25 from wokkel import pubsub, rsm
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
26 from sat.core.i18n import _
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
27 from sat.core.constants import Const as C
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
28 from sat.core import exceptions
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
29 from sat.core.log import getLogger
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
30 from sat.core.core_types import SatXMPPEntity
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
31 from sat.tools import xml_tools, utils
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
32 from sat.tools.common import data_format
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
33 from sat.memory.sqla import PubsubNode, PubsubItem, SyncState
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
34
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
35
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
36 log = getLogger(__name__)
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
37
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
38 PLUGIN_INFO = {
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
39 C.PI_NAME: "PubSub Cache",
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
40 C.PI_IMPORT_NAME: "PUBSUB_CACHE",
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
41 C.PI_TYPE: C.PLUG_TYPE_PUBSUB,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
42 C.PI_PROTOCOLS: [],
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
43 C.PI_DEPENDENCIES: ["XEP-0059", "XEP-0060"],
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
44 C.PI_RECOMMENDATIONS: [],
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
45 C.PI_MAIN: "PubsubCache",
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
46 C.PI_HANDLER: "no",
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
47 C.PI_DESCRIPTION: _("""Local Cache for PubSub"""),
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
48 }
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
49
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
50 ANALYSER_KEYS_TO_COPY = ("name", "type", "to_sync", "parser")
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
51 # maximum of items to cache
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
52 CACHE_LIMIT = 5000
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
53 # number of second before a progress caching is considered failed and tried again
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
54 PROGRESS_DEADLINE = 60 * 60 * 6
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
55
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
56
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
57
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
58 class PubsubCache:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
59 # TODO: there is currently no notification for (un)subscribe events with XEP-0060,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
60 # but it would be necessary to have this data if some devices unsubscribe a cached
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
61 # node, as we can then get out of sync. A protoXEP could be proposed to fix this
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
62 # situation.
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
63 # TODO: handle configuration events
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
64
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
65 def __init__(self, host):
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
66 log.info(_("PubSub Cache initialization"))
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
67 strategy = host.memory.getConfig(None, "pubsub_cache_strategy")
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
68 if strategy == "no_cache":
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
69 log.info(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
70 _(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
71 "Pubsub cache won't be used due to pubsub_cache_strategy={value} "
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
72 "setting."
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
73 ).format(value=repr(strategy))
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
74 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
75 self.use_cache = False
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
76 else:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
77 self.use_cache = True
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
78 self.host = host
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
79 self._p = host.plugins["XEP-0060"]
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
80 self.analysers = {}
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
81 # map for caching in progress (node, service) => Deferred
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
82 self.in_progress = {}
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
83 self.host.trigger.add("XEP-0060_getItems", self._getItemsTrigger)
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
84 self._p.addManagedNode(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
85 "",
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
86 items_cb=self.onItemsEvent,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
87 delete_cb=self.onDeleteEvent,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
88 purge_db=self.onPurgeEvent,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
89 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
90 host.bridge.addMethod(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
91 "psCacheGet",
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
92 ".plugin",
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
93 in_sign="ssiassss",
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
94 out_sign="s",
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
95 method=self._getItemsFromCache,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
96 async_=True,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
97 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
98 host.bridge.addMethod(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
99 "psCacheSync",
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
100 ".plugin",
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
101 "sss",
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
102 out_sign="",
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
103 method=self._synchronise,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
104 async_=True,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
105 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
106 host.bridge.addMethod(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
107 "psCachePurge",
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
108 ".plugin",
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
109 "s",
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
110 out_sign="",
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
111 method=self._purge,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
112 async_=True,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
113 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
114 host.bridge.addMethod(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
115 "psCacheReset",
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
116 ".plugin",
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
117 "",
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
118 out_sign="",
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
119 method=self._reset,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
120 async_=True,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
121 )
3666
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
122 host.bridge.addMethod(
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
123 "psCacheSearch",
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
124 ".plugin",
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
125 "s",
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
126 out_sign="s",
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
127 method=self._search,
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
128 async_=True,
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
129 )
3597
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
130
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
131 def registerAnalyser(self, analyser: dict) -> None:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
132 """Register a new pubsub node analyser
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
133
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
134 @param analyser: An analyser is a dictionary which may have the following keys
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
135 (keys with a ``*`` are mandatory, at least one of ``node`` or ``namespace`` keys
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
136 must be used):
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
137
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
138 :name (str)*:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
139 a unique name for this analyser. This name will be stored in database
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
140 to retrieve the analyser when necessary (notably to get the parsing method),
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
141 thus it is recommended to use a stable name such as the source plugin name
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
142 instead of a name which may change with standard evolution, such as the
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
143 feature namespace.
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
144
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
145 :type (str)*:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
146 indicates what kind of items we are dealing with. Type must be a human
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
147 readable word, as it may be used in searches. Good types examples are
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
148 **blog** or **event**.
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
149
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
150 :node (str):
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
151 prefix of a node name which may be used to identify its type. Example:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
152 *urn:xmpp:microblog:0* (a node starting with this name will be identified as
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
153 *blog* node).
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
154
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
155 :namespace (str):
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
156 root namespace of items. When analysing a node, the first item will be
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
157 retrieved. The analyser will be chosen its given namespace match the
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
158 namespace of the first child element of ``<item>`` element.
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
159
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
160 :to_sync (bool):
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
161 if True, the node must be synchronised in cache. The default False value
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
162 means that the pubsub service will always be requested.
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
163
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
164 :parser (callable):
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
165 method (which may be sync, a coroutine or a method returning a "Deferred")
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
166 to call to parse the ``domish.Element`` of the item. The result must be
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
167 dictionary which can be serialised to JSON.
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
168
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
169 The method must have the following signature:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
170
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
171 .. function:: parser(client: SatXMPPEntity, item_elt: domish.Element, \
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
172 service: Optional[jid.JID], node: Optional[str]) \
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
173 -> dict
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
174 :noindex:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
175
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
176 :match_cb (callable):
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
177 method (which may be sync, a coroutine or a method returning a "Deferred")
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
178 called when the analyser matches. The method is called with the curreny
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
179 analyse which is can modify **in-place**.
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
180
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
181 The method must have the following signature:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
182
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
183 .. function:: match_cb(client: SatXMPPEntity, analyse: dict) -> None
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
184 :noindex:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
185
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
186 @raise exceptions.Conflict: a analyser with this name already exists
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
187 """
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
188
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
189 name = analyser.get("name", "").strip().lower()
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
190 # we want the normalised name
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
191 analyser["name"] = name
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
192 if not name:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
193 raise ValueError('"name" is mandatory in analyser')
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
194 if "type" not in analyser:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
195 raise ValueError('"type" is mandatory in analyser')
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
196 type_test_keys = {"node", "namespace"}
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
197 if not type_test_keys.intersection(analyser):
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
198 raise ValueError(f'at least one of {type_test_keys} must be used')
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
199 if name in self.analysers:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
200 raise exceptions.Conflict(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
201 f"An analyser with the name {name!r} is already registered"
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
202 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
203 self.analysers[name] = analyser
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
204
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
205 async def cacheItems(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
206 self,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
207 client: SatXMPPEntity,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
208 pubsub_node: PubsubNode,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
209 items: List[domish.Element]
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
210 ) -> None:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
211 try:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
212 parser = self.analysers[pubsub_node.analyser].get("parser")
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
213 except KeyError:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
214 parser = None
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
215
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
216 if parser is not None:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
217 parsed_items = [
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
218 await utils.asDeferred(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
219 parser,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
220 client,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
221 item,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
222 pubsub_node.service,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
223 pubsub_node.name
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
224 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
225 for item in items
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
226 ]
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
227 else:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
228 parsed_items = None
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
229
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
230 await self.host.memory.storage.cachePubsubItems(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
231 client, pubsub_node, items, parsed_items
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
232 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
233
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
234 async def _cacheNode(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
235 self,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
236 client: SatXMPPEntity,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
237 pubsub_node: PubsubNode
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
238 ) -> None:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
239 await self.host.memory.storage.updatePubsubNodeSyncState(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
240 pubsub_node, SyncState.IN_PROGRESS
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
241 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
242 service, node = pubsub_node.service, pubsub_node.name
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
243 try:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
244 log.debug(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
245 f"Caching node {node!r} at {service} for {client.profile}"
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
246 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
247 if not pubsub_node.subscribed:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
248 try:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
249 sub = await self._p.subscribe(client, service, node)
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
250 except Exception as e:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
251 log.warning(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
252 _(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
253 "Can't subscribe node {pubsub_node}, that means that "
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
254 "synchronisation can't be maintained: {reason}"
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
255 ).format(pubsub_node=pubsub_node, reason=e)
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
256 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
257 else:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
258 if sub.state == "subscribed":
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
259 sub_id = sub.subscriptionIdentifier
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
260 log.debug(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
261 f"{pubsub_node} subscribed (subscription id: {sub_id!r})"
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
262 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
263 pubsub_node.subscribed = True
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
264 await self.host.memory.storage.add(pubsub_node)
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
265 else:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
266 log.warning(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
267 _(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
268 "{pubsub_node} is not subscribed, that means that "
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
269 "synchronisation can't be maintained, and you may have "
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
270 "to enforce subscription manually. Subscription state: "
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
271 "{state}"
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
272 ).format(pubsub_node=pubsub_node, state=sub.state)
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
273 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
274
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
275 try:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
276 await self.host.checkFeatures(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
277 client, [rsm.NS_RSM, self._p.DISCO_RSM], pubsub_node.service
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
278 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
279 except exceptions.FeatureNotFound:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
280 log.warning(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
281 f"service {service} doesn't handle Result Set Management "
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
282 "(XEP-0059), we'll only cache latest 20 items"
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
283 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
284 items, __ = await client.pubsub_client.items(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
285 pubsub_node.service, pubsub_node.name, maxItems=20
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
286 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
287 await self.cacheItems(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
288 client, pubsub_node, items
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
289 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
290 else:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
291 rsm_p = self.host.plugins["XEP-0059"]
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
292 rsm_request = rsm.RSMRequest()
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
293 cached_ids = set()
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
294 while True:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
295 items, rsm_response = await client.pubsub_client.items(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
296 service, node, rsm_request=rsm_request
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
297 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
298 await self.cacheItems(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
299 client, pubsub_node, items
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
300 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
301 for item in items:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
302 item_id = item["id"]
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
303 if item_id in cached_ids:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
304 log.warning(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
305 f"Pubsub node {node!r} at {service} is returning several "
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
306 f"times the same item ({item_id!r}). This is illegal "
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
307 "behaviour, and it means that Pubsub service "
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
308 f"{service} is buggy and can't be cached properly. "
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
309 f"Please report this to {service.host} administrators"
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
310 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
311 rsm_request = None
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
312 break
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
313 cached_ids.add(item["id"])
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
314 if len(cached_ids) >= CACHE_LIMIT:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
315 log.warning(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
316 f"Pubsub node {node!r} at {service} contains more items "
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
317 f"than the cache limit ({CACHE_LIMIT}). We stop "
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
318 "caching here, at item {item['id']!r}."
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
319 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
320 rsm_request = None
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
321 break
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
322 rsm_request = rsm_p.getNextRequest(rsm_request, rsm_response)
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
323 if rsm_request is None:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
324 break
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
325
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
326 await self.host.memory.storage.updatePubsubNodeSyncState(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
327 pubsub_node, SyncState.COMPLETED
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
328 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
329 except Exception as e:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
330 import traceback
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
331 tb = traceback.format_tb(e.__traceback__)
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
332 log.error(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
333 f"Can't cache node {node!r} at {service} for {client.profile}: {e}\n{tb}"
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
334 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
335 await self.host.memory.storage.updatePubsubNodeSyncState(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
336 pubsub_node, SyncState.ERROR
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
337 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
338 await self.host.memory.storage.deletePubsubItems(pubsub_node)
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
339 raise e
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
340
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
341 def _cacheNodeClean(self, __, pubsub_node):
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
342 del self.in_progress[(pubsub_node.service, pubsub_node.name)]
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
343
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
344 def cacheNode(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
345 self,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
346 client: SatXMPPEntity,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
347 pubsub_node: PubsubNode
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
348 ) -> None:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
349 """Launch node caching as a background task"""
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
350 d = defer.ensureDeferred(self._cacheNode(client, pubsub_node))
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
351 d.addBoth(self._cacheNodeClean, pubsub_node=pubsub_node)
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
352 self.in_progress[(pubsub_node.service, pubsub_node.name)] = d
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
353 return d
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
354
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
355 async def analyseNode(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
356 self,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
357 client: SatXMPPEntity,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
358 service: jid.JID,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
359 node: str,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
360 pubsub_node : PubsubNode = None,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
361 ) -> dict:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
362 """Use registered analysers on a node to determine what it is used for"""
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
363 analyse = {"service": service, "node": node}
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
364 if pubsub_node is None:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
365 try:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
366 first_item = (await client.pubsub_client.items(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
367 service, node, 1
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
368 ))[0][0]
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
369 except IndexError:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
370 pass
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
371 except error.StanzaError as e:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
372 if e.condition == "item-not-found":
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
373 pass
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
374 else:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
375 log.warning(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
376 f"Can't retrieve last item on node {node!r} at service "
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
377 f"{service} for {client.profile}: {e}"
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
378 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
379 else:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
380 try:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
381 uri = first_item.firstChildElement().uri
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
382 except Exception as e:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
383 log.warning(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
384 f"Can't retrieve item namespace on node {node!r} at service "
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
385 f"{service} for {client.profile}: {e}"
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
386 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
387 else:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
388 analyse["namespace"] = uri
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
389 try:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
390 conf = await self._p.getConfiguration(client, service, node)
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
391 except Exception as e:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
392 log.warning(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
393 f"Can't retrieve configuration for node {node!r} at service {service} "
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
394 f"for {client.profile}: {e}"
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
395 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
396 else:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
397 analyse["conf"] = conf
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
398
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
399 for analyser in self.analysers.values():
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
400 try:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
401 an_node = analyser["node"]
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
402 except KeyError:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
403 pass
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
404 else:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
405 if node.startswith(an_node):
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
406 for key in ANALYSER_KEYS_TO_COPY:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
407 try:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
408 analyse[key] = analyser[key]
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
409 except KeyError:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
410 pass
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
411 found = True
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
412 break
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
413 try:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
414 namespace = analyse["namespace"]
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
415 an_namespace = analyser["namespace"]
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
416 except KeyError:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
417 pass
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
418 else:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
419 if namespace == an_namespace:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
420 for key in ANALYSER_KEYS_TO_COPY:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
421 try:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
422 analyse[key] = analyser[key]
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
423 except KeyError:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
424 pass
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
425 found = True
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
426 break
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
427
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
428 else:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
429 found = False
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
430 log.debug(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
431 f"node {node!r} at service {service} doesn't match any known type"
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
432 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
433 if found:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
434 try:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
435 match_cb = analyser["match_cb"]
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
436 except KeyError:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
437 pass
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
438 else:
3619
32181a45d54b plugin pubsub cache: use `asDeferred` with `match_cb`
Goffi <goffi@goffi.org>
parents: 3597
diff changeset
439 await utils.asDeferred(match_cb, client, analyse)
3597
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
440 return analyse
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
441
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
442 def _getItemsFromCache(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
443 self, service="", node="", max_items=10, item_ids=None, sub_id=None,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
444 extra="", profile_key=C.PROF_KEY_NONE
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
445 ):
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
446 d = defer.ensureDeferred(self._aGetItemsFromCache(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
447 service, node, max_items, item_ids, sub_id, extra, profile_key
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
448 ))
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
449 d.addCallback(self._p.transItemsData)
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
450 d.addCallback(self._p.serialiseItems)
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
451 return d
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
452
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
453 async def _aGetItemsFromCache(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
454 self, service, node, max_items, item_ids, sub_id, extra, profile_key
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
455 ):
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
456 client = self.host.getClient(profile_key)
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
457 service = jid.JID(service) if service else client.jid.userhostJID()
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
458 pubsub_node = await self.host.memory.storage.getPubsubNode(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
459 client, service, node
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
460 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
461 if pubsub_node is None:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
462 raise exceptions.NotFound(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
463 f"{node!r} at {service} doesn't exist in cache for {client.profile!r}"
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
464 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
465 max_items = None if max_items == C.NO_LIMIT else max_items
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
466 extra = self._p.parseExtra(data_format.deserialise(extra))
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
467 items, metadata = await self.getItemsFromCache(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
468 client,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
469 pubsub_node,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
470 max_items,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
471 item_ids,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
472 sub_id or None,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
473 extra.rsm_request,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
474 extra.extra,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
475 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
476 return [i.data for i in items], metadata
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
477
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
478 async def getItemsFromCache(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
479 self,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
480 client: SatXMPPEntity,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
481 node: PubsubNode,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
482 max_items: Optional[int] = None,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
483 item_ids: Optional[List[str]] = None,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
484 sub_id: Optional[str] = None,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
485 rsm_request: Optional[rsm.RSMRequest] = None,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
486 extra: Optional[dict] = None
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
487 ) -> Tuple[List[PubsubItem], dict]:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
488 """Get items from cache, using same arguments as for external Pubsub request"""
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
489 if "mam" in extra:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
490 raise NotImplementedError("MAM queries are not supported yet")
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
491 if max_items is None and rsm_request is None:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
492 max_items = 20
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
493 if max_items is not None:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
494 if rsm_request is not None:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
495 raise exceptions.InternalError(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
496 "Pubsub max items and RSM must not be used at the same time"
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
497 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
498 elif item_ids is None:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
499 raise exceptions.InternalError(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
500 "Pubsub max items and item IDs must not be used at the same time"
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
501 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
502 pubsub_items, metadata = await self.host.memory.storage.getItems(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
503 node, max_items=max_items, order_by=extra.get(C.KEY_ORDER_BY)
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
504 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
505 else:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
506 desc = False
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
507 if rsm_request.before == "":
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
508 before = None
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
509 desc = True
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
510 else:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
511 before = rsm_request.before
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
512 pubsub_items, metadata = await self.host.memory.storage.getItems(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
513 node, max_items=rsm_request.max, before=before, after=rsm_request.after,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
514 from_index=rsm_request.index, order_by=extra.get(C.KEY_ORDER_BY),
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
515 desc=desc, force_rsm=True,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
516 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
517
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
518 return pubsub_items, metadata
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
519
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
520 async def onItemsEvent(self, client, event):
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
521 node = await self.host.memory.storage.getPubsubNode(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
522 client, event.sender, event.nodeIdentifier
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
523 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
524 if node is None:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
525 return
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
526 if node.sync_state in (SyncState.COMPLETED, SyncState.IN_PROGRESS):
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
527 items = []
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
528 retract_ids = []
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
529 for elt in event.items:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
530 if elt.name == "item":
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
531 items.append(elt)
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
532 elif elt.name == "retract":
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
533 item_id = elt.getAttribute("id")
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
534 if not item_id:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
535 log.warning(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
536 "Ignoring invalid retract item element: "
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
537 f"{xml_tools.pFmtElt(elt)}"
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
538 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
539 continue
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
540
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
541 retract_ids.append(elt["id"])
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
542 else:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
543 log.warning(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
544 f"Unexpected Pubsub event element: {xml_tools.pFmtElt(elt)}"
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
545 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
546 if items:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
547 log.debug("caching new items received from {node}")
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
548 await self.cacheItems(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
549 client, node, items
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
550 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
551 if retract_ids:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
552 log.debug(f"deleting retracted items from {node}")
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
553 await self.host.memory.storage.deletePubsubItems(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
554 node, items_names=retract_ids
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
555 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
556
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
557 async def onDeleteEvent(self, client, event):
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
558 log.debug(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
559 f"deleting node {event.nodeIdentifier} from {event.sender} for "
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
560 f"{client.profile}"
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
561 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
562 await self.host.memory.storage.deletePubsubNode(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
563 [client.profile], [event.sender], [event.nodeIdentifier]
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
564 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
565
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
566 async def onPurgeEvent(self, client, event):
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
567 node = await self.host.memory.storage.getPubsubNode(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
568 client, event.sender, event.nodeIdentifier
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
569 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
570 if node is None:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
571 return
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
572 log.debug(f"purging node {node} for {client.profile}")
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
573 await self.host.memory.storage.deletePubsubItems(node)
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
574
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
575 async def _getItemsTrigger(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
576 self,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
577 client: SatXMPPEntity,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
578 service: Optional[jid.JID],
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
579 node: str,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
580 max_items: Optional[int],
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
581 item_ids: Optional[List[str]],
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
582 sub_id: Optional[str],
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
583 rsm_request: Optional[rsm.RSMRequest],
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
584 extra: dict
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
585 ) -> Tuple[bool, Optional[Tuple[List[dict], dict]]]:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
586 if not self.use_cache:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
587 log.debug("cache disabled in settings")
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
588 return True, None
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
589 if extra.get(C.KEY_USE_CACHE) == False:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
590 log.debug("skipping pubsub cache as requested")
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
591 return True, None
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
592 if service is None:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
593 service = client.jid.userhostJID()
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
594 pubsub_node = await self.host.memory.storage.getPubsubNode(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
595 client, service, node
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
596 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
597 if pubsub_node is not None and pubsub_node.sync_state == SyncState.COMPLETED:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
598 analyse = {"to_sync": True}
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
599 else:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
600 analyse = await self.analyseNode(client, service, node)
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
601
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
602 if pubsub_node is None:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
603 pubsub_node = await self.host.memory.storage.setPubsubNode(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
604 client,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
605 service,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
606 node,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
607 analyser=analyse.get("name"),
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
608 type_=analyse.get("type"),
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
609 subtype=analyse.get("subtype"),
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
610 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
611
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
612 if analyse.get("to_sync"):
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
613 if pubsub_node.sync_state == SyncState.COMPLETED:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
614 if "mam" in extra:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
615 log.debug("MAM caching is not supported yet, skipping cache")
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
616 return True, None
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
617 pubsub_items, metadata = await self.getItemsFromCache(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
618 client, pubsub_node, max_items, item_ids, sub_id, rsm_request, extra
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
619 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
620 return False, ([i.data for i in pubsub_items], metadata)
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
621
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
622 if pubsub_node.sync_state == SyncState.IN_PROGRESS:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
623 if (service, node) not in self.in_progress:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
624 log.warning(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
625 f"{pubsub_node} is reported as being cached, but not caching is "
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
626 "in progress, this is most probably due to the backend being "
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
627 "restarted. Resetting the status, caching will be done again."
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
628 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
629 pubsub_node.sync_state = None
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
630 await self.host.memory.storage.deletePubsubItems(pubsub_node)
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
631 elif time.time() - pubsub_node.sync_state_updated > PROGRESS_DEADLINE:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
632 log.warning(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
633 f"{pubsub_node} is in progress for too long "
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
634 f"({pubsub_node.sync_state_updated//60} minutes), "
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
635 "cancelling it and retrying."
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
636 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
637 self.in_progress.pop[(service, node)].cancel()
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
638 pubsub_node.sync_state = None
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
639 await self.host.memory.storage.deletePubsubItems(pubsub_node)
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
640 else:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
641 log.debug(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
642 f"{pubsub_node} synchronisation is already in progress, skipping"
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
643 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
644 if pubsub_node.sync_state is None:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
645 key = (service, node)
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
646 if key in self.in_progress:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
647 raise exceptions.InternalError(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
648 f"There is already a caching in progress for {pubsub_node}, this "
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
649 "should not happen"
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
650 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
651 self.cacheNode(client, pubsub_node)
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
652 elif pubsub_node.sync_state == SyncState.ERROR:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
653 log.debug(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
654 f"{pubsub_node} synchronisation has previously failed, skipping"
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
655 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
656
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
657 return True, None
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
658
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
659 async def _subscribeTrigger(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
660 self,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
661 client: SatXMPPEntity,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
662 service: jid.JID,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
663 nodeIdentifier: str,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
664 sub_jid: Optional[jid.JID],
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
665 options: Optional[dict],
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
666 subscription: pubsub.Subscription
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
667 ) -> None:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
668 pass
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
669
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
670 async def _unsubscribeTrigger(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
671 self,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
672 client: SatXMPPEntity,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
673 service: jid.JID,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
674 nodeIdentifier: str,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
675 sub_jid,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
676 subscriptionIdentifier,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
677 sender,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
678 ) -> None:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
679 pass
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
680
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
681 def _synchronise(self, service, node, profile_key):
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
682 client = self.host.getClient(profile_key)
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
683 service = client.jid.userhostJID() if not service else jid.JID(service)
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
684 return defer.ensureDeferred(self.synchronise(client, service, node))
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
685
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
686 async def synchronise(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
687 self,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
688 client: SatXMPPEntity,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
689 service: jid.JID,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
690 node: str
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
691 ) -> None:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
692 """Synchronise a node with a pubsub service
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
693
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
694 If the node is already synchronised, it will be resynchronised (all items will be
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
695 deleted and re-downloaded).
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
696
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
697 The node will be synchronised even if there is no matching analyser.
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
698
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
699 Note that when a node is synchronised, it is automatically subscribed.
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
700 """
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
701 pubsub_node = await self.host.memory.storage.getPubsubNode(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
702 client, service, node
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
703 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
704 if pubsub_node is None:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
705 log.info(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
706 _(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
707 "Synchronising the new node {node} at {service}"
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
708 ).format(node=node, service=service.full)
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
709 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
710 analyse = await self.analyseNode(client, service, node)
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
711 pubsub_node = await self.host.memory.storage.setPubsubNode(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
712 client,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
713 service,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
714 node,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
715 analyser=analyse.get("name"),
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
716 type_=analyse.get("type"),
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
717 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
718
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
719 if ((pubsub_node.sync_state == SyncState.IN_PROGRESS
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
720 or (service, node) in self.in_progress)):
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
721 log.warning(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
722 _(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
723 "{node} at {service} is already being synchronised, can't do a new "
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
724 "synchronisation."
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
725 ).format(node=node, service=service)
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
726 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
727 else:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
728 log.info(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
729 _(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
730 "(Re)Synchronising the node {node} at {service} on user request"
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
731 ).format(node=node, service=service.full)
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
732 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
733 # we first delete and recreate the node (will also delete its items)
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
734 await self.host.memory.storage.delete(pubsub_node)
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
735 analyse = await self.analyseNode(client, service, node)
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
736 pubsub_node = await self.host.memory.storage.setPubsubNode(
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
737 client,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
738 service,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
739 node,
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
740 analyser=analyse.get("name"),
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
741 type_=analyse.get("type"),
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
742 )
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
743 # then we can put node in cache
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
744 await self.cacheNode(client, pubsub_node)
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
745
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
746 async def purge(self, purge_filters: dict) -> None:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
747 """Remove items according to filters
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
748
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
749 filters can have on of the following keys, all are optional:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
750
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
751 :services:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
752 list of JIDs of services from which items must be deleted
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
753 :nodes:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
754 list of node names to delete
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
755 :types:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
756 list of node types to delete
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
757 :subtypes:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
758 list of node subtypes to delete
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
759 :profiles:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
760 list of profiles from which items must be deleted
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
761 :created_before:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
762 datetime before which items must have been created to be deleted
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
763 :created_update:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
764 datetime before which items must have been updated last to be deleted
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
765 """
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
766 purge_filters["names"] = purge_filters.pop("nodes", None)
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
767 await self.host.memory.storage.purgePubsubItems(**purge_filters)
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
768
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
769 def _purge(self, purge_filters: str) -> None:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
770 purge_filters = data_format.deserialise(purge_filters)
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
771 for key in "created_before", "updated_before":
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
772 try:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
773 purge_filters[key] = datetime.fromtimestamp(purge_filters[key])
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
774 except (KeyError, TypeError):
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
775 pass
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
776 return defer.ensureDeferred(self.purge(purge_filters))
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
777
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
778 async def reset(self) -> None:
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
779 """Remove ALL nodes and items from cache
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
780
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
781 After calling this method, cache will be refilled progressively as if it where new
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
782 """
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
783 await self.host.memory.storage.deletePubsubNode(None, None, None)
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
784
3666
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
785 def _reset(self) -> defer.Deferred:
3597
5d108ce026d7 plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff changeset
786 return defer.ensureDeferred(self.reset())
3666
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
787
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
788 async def search(self, query: dict) -> List[PubsubItem]:
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
789 """Search pubsub items in cache"""
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
790 return await self.host.memory.storage.searchPubsubItems(query)
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
791
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
792 async def serialisableSearch(self, query: dict) -> List[dict]:
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
793 """Search pubsub items in cache and returns parsed data
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
794
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
795 The returned data can be serialised.
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
796
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
797 "pubsub_service" and "pubsub_name" will be added to each data (both as strings)
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
798 """
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
799 items = await self.search(query)
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
800 ret = []
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
801 for item in items:
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
802 parsed = item.parsed
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
803 parsed["pubsub_service"] = item.node.service.full()
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
804 parsed["pubsub_node"] = item.node.name
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
805 if query.get("with_payload"):
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
806 parsed["item_payload"] = item.data.toXml()
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
807 parsed["node_profile"] = self.host.memory.storage.getProfileById(
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
808 item.node.profile_id
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
809 )
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
810
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
811 ret.append(parsed)
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
812 return ret
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
813
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
814 def _search(self, query: str) -> defer.Deferred:
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
815 query = data_format.deserialise(query)
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
816 services = query.get("services")
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
817 if services:
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
818 query["services"] = [jid.JID(s) for s in services]
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
819 d = defer.ensureDeferred(self.serialisableSearch(query))
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
820 d.addCallback(data_format.serialise)
342e3ddefd23 plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents: 3619
diff changeset
821 return d