Mercurial > libervia-backend
annotate sat/plugins/plugin_pubsub_cache.py @ 3619:32181a45d54b
plugin pubsub cache: use `asDeferred` with `match_cb`
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 02 Aug 2021 21:52:17 +0200 |
parents | 5d108ce026d7 |
children | 342e3ddefd23 |
rev | line source |
---|---|
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
1 #!/usr/bin/env python3 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
2 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
3 # Libervia plugin for PubSub Caching |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
4 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
5 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
6 # This program is free software: you can redistribute it and/or modify |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
7 # it under the terms of the GNU Affero General Public License as published by |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
8 # the Free Software Foundation, either version 3 of the License, or |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
9 # (at your option) any later version. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
10 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
11 # This program is distributed in the hope that it will be useful, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
14 # GNU Affero General Public License for more details. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
15 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
16 # You should have received a copy of the GNU Affero General Public License |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
18 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
19 import time |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
20 from datetime import datetime |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
21 from typing import Optional, List, Tuple |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
22 from twisted.words.protocols.jabber import jid, error |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
23 from twisted.words.xish import domish |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
24 from twisted.internet import defer |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
25 from wokkel import pubsub, rsm |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
26 from sat.core.i18n import _ |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
27 from sat.core.constants import Const as C |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
28 from sat.core import exceptions |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
29 from sat.core.log import getLogger |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
30 from sat.core.core_types import SatXMPPEntity |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
31 from sat.tools import xml_tools, utils |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
32 from sat.tools.common import data_format |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
33 from sat.memory.sqla import PubsubNode, PubsubItem, SyncState |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
34 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
35 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
36 log = getLogger(__name__) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
37 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
38 PLUGIN_INFO = { |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
39 C.PI_NAME: "PubSub Cache", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
40 C.PI_IMPORT_NAME: "PUBSUB_CACHE", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
41 C.PI_TYPE: C.PLUG_TYPE_PUBSUB, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
42 C.PI_PROTOCOLS: [], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
43 C.PI_DEPENDENCIES: ["XEP-0059", "XEP-0060"], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
44 C.PI_RECOMMENDATIONS: [], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
45 C.PI_MAIN: "PubsubCache", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
46 C.PI_HANDLER: "no", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
47 C.PI_DESCRIPTION: _("""Local Cache for PubSub"""), |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
48 } |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
49 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
50 ANALYSER_KEYS_TO_COPY = ("name", "type", "to_sync", "parser") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
51 # maximum of items to cache |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
52 CACHE_LIMIT = 5000 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
53 # number of second before a progress caching is considered failed and tried again |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
54 PROGRESS_DEADLINE = 60 * 60 * 6 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
55 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
56 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
57 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
58 class PubsubCache: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
59 # TODO: there is currently no notification for (un)subscribe events with XEP-0060, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
60 # but it would be necessary to have this data if some devices unsubscribe a cached |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
61 # node, as we can then get out of sync. A protoXEP could be proposed to fix this |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
62 # situation. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
63 # TODO: handle configuration events |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
64 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
65 def __init__(self, host): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
66 log.info(_("PubSub Cache initialization")) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
67 strategy = host.memory.getConfig(None, "pubsub_cache_strategy") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
68 if strategy == "no_cache": |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
69 log.info( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
70 _( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
71 "Pubsub cache won't be used due to pubsub_cache_strategy={value} " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
72 "setting." |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
73 ).format(value=repr(strategy)) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
74 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
75 self.use_cache = False |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
76 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
77 self.use_cache = True |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
78 self.host = host |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
79 self._p = host.plugins["XEP-0060"] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
80 self.analysers = {} |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
81 # map for caching in progress (node, service) => Deferred |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
82 self.in_progress = {} |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
83 self.host.trigger.add("XEP-0060_getItems", self._getItemsTrigger) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
84 self._p.addManagedNode( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
85 "", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
86 items_cb=self.onItemsEvent, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
87 delete_cb=self.onDeleteEvent, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
88 purge_db=self.onPurgeEvent, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
89 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
90 host.bridge.addMethod( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
91 "psCacheGet", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
92 ".plugin", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
93 in_sign="ssiassss", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
94 out_sign="s", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
95 method=self._getItemsFromCache, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
96 async_=True, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
97 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
98 host.bridge.addMethod( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
99 "psCacheSync", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
100 ".plugin", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
101 "sss", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
102 out_sign="", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
103 method=self._synchronise, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
104 async_=True, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
105 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
106 host.bridge.addMethod( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
107 "psCachePurge", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
108 ".plugin", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
109 "s", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
110 out_sign="", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
111 method=self._purge, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
112 async_=True, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
113 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
114 host.bridge.addMethod( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
115 "psCacheReset", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
116 ".plugin", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
117 "", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
118 out_sign="", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
119 method=self._reset, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
120 async_=True, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
121 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
122 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
123 def registerAnalyser(self, analyser: dict) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
124 """Register a new pubsub node analyser |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
125 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
126 @param analyser: An analyser is a dictionary which may have the following keys |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
127 (keys with a ``*`` are mandatory, at least one of ``node`` or ``namespace`` keys |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
128 must be used): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
129 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
130 :name (str)*: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
131 a unique name for this analyser. This name will be stored in database |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
132 to retrieve the analyser when necessary (notably to get the parsing method), |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
133 thus it is recommended to use a stable name such as the source plugin name |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
134 instead of a name which may change with standard evolution, such as the |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
135 feature namespace. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
136 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
137 :type (str)*: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
138 indicates what kind of items we are dealing with. Type must be a human |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
139 readable word, as it may be used in searches. Good types examples are |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
140 **blog** or **event**. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
141 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
142 :node (str): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
143 prefix of a node name which may be used to identify its type. Example: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
144 *urn:xmpp:microblog:0* (a node starting with this name will be identified as |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
145 *blog* node). |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
146 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
147 :namespace (str): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
148 root namespace of items. When analysing a node, the first item will be |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
149 retrieved. The analyser will be chosen its given namespace match the |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
150 namespace of the first child element of ``<item>`` element. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
151 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
152 :to_sync (bool): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
153 if True, the node must be synchronised in cache. The default False value |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
154 means that the pubsub service will always be requested. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
155 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
156 :parser (callable): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
157 method (which may be sync, a coroutine or a method returning a "Deferred") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
158 to call to parse the ``domish.Element`` of the item. The result must be |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
159 dictionary which can be serialised to JSON. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
160 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
161 The method must have the following signature: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
162 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
163 .. function:: parser(client: SatXMPPEntity, item_elt: domish.Element, \ |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
164 service: Optional[jid.JID], node: Optional[str]) \ |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
165 -> dict |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
166 :noindex: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
167 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
168 :match_cb (callable): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
169 method (which may be sync, a coroutine or a method returning a "Deferred") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
170 called when the analyser matches. The method is called with the curreny |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
171 analyse which is can modify **in-place**. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
172 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
173 The method must have the following signature: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
174 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
175 .. function:: match_cb(client: SatXMPPEntity, analyse: dict) -> None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
176 :noindex: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
177 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
178 @raise exceptions.Conflict: a analyser with this name already exists |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
179 """ |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
180 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
181 name = analyser.get("name", "").strip().lower() |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
182 # we want the normalised name |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
183 analyser["name"] = name |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
184 if not name: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
185 raise ValueError('"name" is mandatory in analyser') |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
186 if "type" not in analyser: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
187 raise ValueError('"type" is mandatory in analyser') |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
188 type_test_keys = {"node", "namespace"} |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
189 if not type_test_keys.intersection(analyser): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
190 raise ValueError(f'at least one of {type_test_keys} must be used') |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
191 if name in self.analysers: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
192 raise exceptions.Conflict( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
193 f"An analyser with the name {name!r} is already registered" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
194 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
195 self.analysers[name] = analyser |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
196 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
197 async def cacheItems( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
198 self, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
199 client: SatXMPPEntity, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
200 pubsub_node: PubsubNode, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
201 items: List[domish.Element] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
202 ) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
203 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
204 parser = self.analysers[pubsub_node.analyser].get("parser") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
205 except KeyError: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
206 parser = None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
207 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
208 if parser is not None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
209 parsed_items = [ |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
210 await utils.asDeferred( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
211 parser, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
212 client, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
213 item, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
214 pubsub_node.service, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
215 pubsub_node.name |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
216 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
217 for item in items |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
218 ] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
219 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
220 parsed_items = None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
221 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
222 await self.host.memory.storage.cachePubsubItems( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
223 client, pubsub_node, items, parsed_items |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
224 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
225 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
226 async def _cacheNode( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
227 self, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
228 client: SatXMPPEntity, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
229 pubsub_node: PubsubNode |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
230 ) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
231 await self.host.memory.storage.updatePubsubNodeSyncState( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
232 pubsub_node, SyncState.IN_PROGRESS |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
233 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
234 service, node = pubsub_node.service, pubsub_node.name |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
235 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
236 log.debug( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
237 f"Caching node {node!r} at {service} for {client.profile}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
238 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
239 if not pubsub_node.subscribed: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
240 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
241 sub = await self._p.subscribe(client, service, node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
242 except Exception as e: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
243 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
244 _( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
245 "Can't subscribe node {pubsub_node}, that means that " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
246 "synchronisation can't be maintained: {reason}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
247 ).format(pubsub_node=pubsub_node, reason=e) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
248 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
249 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
250 if sub.state == "subscribed": |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
251 sub_id = sub.subscriptionIdentifier |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
252 log.debug( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
253 f"{pubsub_node} subscribed (subscription id: {sub_id!r})" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
254 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
255 pubsub_node.subscribed = True |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
256 await self.host.memory.storage.add(pubsub_node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
257 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
258 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
259 _( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
260 "{pubsub_node} is not subscribed, that means that " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
261 "synchronisation can't be maintained, and you may have " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
262 "to enforce subscription manually. Subscription state: " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
263 "{state}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
264 ).format(pubsub_node=pubsub_node, state=sub.state) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
265 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
266 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
267 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
268 await self.host.checkFeatures( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
269 client, [rsm.NS_RSM, self._p.DISCO_RSM], pubsub_node.service |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
270 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
271 except exceptions.FeatureNotFound: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
272 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
273 f"service {service} doesn't handle Result Set Management " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
274 "(XEP-0059), we'll only cache latest 20 items" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
275 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
276 items, __ = await client.pubsub_client.items( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
277 pubsub_node.service, pubsub_node.name, maxItems=20 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
278 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
279 await self.cacheItems( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
280 client, pubsub_node, items |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
281 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
282 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
283 rsm_p = self.host.plugins["XEP-0059"] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
284 rsm_request = rsm.RSMRequest() |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
285 cached_ids = set() |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
286 while True: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
287 items, rsm_response = await client.pubsub_client.items( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
288 service, node, rsm_request=rsm_request |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
289 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
290 await self.cacheItems( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
291 client, pubsub_node, items |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
292 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
293 for item in items: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
294 item_id = item["id"] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
295 if item_id in cached_ids: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
296 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
297 f"Pubsub node {node!r} at {service} is returning several " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
298 f"times the same item ({item_id!r}). This is illegal " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
299 "behaviour, and it means that Pubsub service " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
300 f"{service} is buggy and can't be cached properly. " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
301 f"Please report this to {service.host} administrators" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
302 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
303 rsm_request = None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
304 break |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
305 cached_ids.add(item["id"]) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
306 if len(cached_ids) >= CACHE_LIMIT: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
307 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
308 f"Pubsub node {node!r} at {service} contains more items " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
309 f"than the cache limit ({CACHE_LIMIT}). We stop " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
310 "caching here, at item {item['id']!r}." |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
311 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
312 rsm_request = None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
313 break |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
314 rsm_request = rsm_p.getNextRequest(rsm_request, rsm_response) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
315 if rsm_request is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
316 break |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
317 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
318 await self.host.memory.storage.updatePubsubNodeSyncState( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
319 pubsub_node, SyncState.COMPLETED |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
320 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
321 except Exception as e: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
322 import traceback |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
323 tb = traceback.format_tb(e.__traceback__) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
324 log.error( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
325 f"Can't cache node {node!r} at {service} for {client.profile}: {e}\n{tb}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
326 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
327 await self.host.memory.storage.updatePubsubNodeSyncState( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
328 pubsub_node, SyncState.ERROR |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
329 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
330 await self.host.memory.storage.deletePubsubItems(pubsub_node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
331 raise e |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
332 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
333 def _cacheNodeClean(self, __, pubsub_node): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
334 del self.in_progress[(pubsub_node.service, pubsub_node.name)] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
335 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
336 def cacheNode( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
337 self, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
338 client: SatXMPPEntity, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
339 pubsub_node: PubsubNode |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
340 ) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
341 """Launch node caching as a background task""" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
342 d = defer.ensureDeferred(self._cacheNode(client, pubsub_node)) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
343 d.addBoth(self._cacheNodeClean, pubsub_node=pubsub_node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
344 self.in_progress[(pubsub_node.service, pubsub_node.name)] = d |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
345 return d |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
346 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
347 async def analyseNode( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
348 self, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
349 client: SatXMPPEntity, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
350 service: jid.JID, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
351 node: str, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
352 pubsub_node : PubsubNode = None, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
353 ) -> dict: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
354 """Use registered analysers on a node to determine what it is used for""" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
355 analyse = {"service": service, "node": node} |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
356 if pubsub_node is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
357 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
358 first_item = (await client.pubsub_client.items( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
359 service, node, 1 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
360 ))[0][0] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
361 except IndexError: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
362 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
363 except error.StanzaError as e: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
364 if e.condition == "item-not-found": |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
365 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
366 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
367 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
368 f"Can't retrieve last item on node {node!r} at service " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
369 f"{service} for {client.profile}: {e}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
370 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
371 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
372 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
373 uri = first_item.firstChildElement().uri |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
374 except Exception as e: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
375 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
376 f"Can't retrieve item namespace on node {node!r} at service " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
377 f"{service} for {client.profile}: {e}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
378 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
379 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
380 analyse["namespace"] = uri |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
381 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
382 conf = await self._p.getConfiguration(client, service, node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
383 except Exception as e: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
384 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
385 f"Can't retrieve configuration for node {node!r} at service {service} " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
386 f"for {client.profile}: {e}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
387 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
388 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
389 analyse["conf"] = conf |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
390 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
391 for analyser in self.analysers.values(): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
392 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
393 an_node = analyser["node"] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
394 except KeyError: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
395 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
396 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
397 if node.startswith(an_node): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
398 for key in ANALYSER_KEYS_TO_COPY: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
399 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
400 analyse[key] = analyser[key] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
401 except KeyError: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
402 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
403 found = True |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
404 break |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
405 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
406 namespace = analyse["namespace"] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
407 an_namespace = analyser["namespace"] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
408 except KeyError: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
409 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
410 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
411 if namespace == an_namespace: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
412 for key in ANALYSER_KEYS_TO_COPY: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
413 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
414 analyse[key] = analyser[key] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
415 except KeyError: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
416 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
417 found = True |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
418 break |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
419 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
420 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
421 found = False |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
422 log.debug( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
423 f"node {node!r} at service {service} doesn't match any known type" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
424 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
425 if found: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
426 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
427 match_cb = analyser["match_cb"] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
428 except KeyError: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
429 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
430 else: |
3619
32181a45d54b
plugin pubsub cache: use `asDeferred` with `match_cb`
Goffi <goffi@goffi.org>
parents:
3597
diff
changeset
|
431 await utils.asDeferred(match_cb, client, analyse) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
432 return analyse |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
433 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
434 def _getItemsFromCache( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
435 self, service="", node="", max_items=10, item_ids=None, sub_id=None, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
436 extra="", profile_key=C.PROF_KEY_NONE |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
437 ): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
438 d = defer.ensureDeferred(self._aGetItemsFromCache( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
439 service, node, max_items, item_ids, sub_id, extra, profile_key |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
440 )) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
441 d.addCallback(self._p.transItemsData) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
442 d.addCallback(self._p.serialiseItems) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
443 return d |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
444 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
445 async def _aGetItemsFromCache( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
446 self, service, node, max_items, item_ids, sub_id, extra, profile_key |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
447 ): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
448 client = self.host.getClient(profile_key) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
449 service = jid.JID(service) if service else client.jid.userhostJID() |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
450 pubsub_node = await self.host.memory.storage.getPubsubNode( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
451 client, service, node |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
452 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
453 if pubsub_node is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
454 raise exceptions.NotFound( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
455 f"{node!r} at {service} doesn't exist in cache for {client.profile!r}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
456 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
457 max_items = None if max_items == C.NO_LIMIT else max_items |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
458 extra = self._p.parseExtra(data_format.deserialise(extra)) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
459 items, metadata = await self.getItemsFromCache( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
460 client, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
461 pubsub_node, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
462 max_items, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
463 item_ids, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
464 sub_id or None, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
465 extra.rsm_request, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
466 extra.extra, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
467 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
468 return [i.data for i in items], metadata |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
469 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
470 async def getItemsFromCache( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
471 self, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
472 client: SatXMPPEntity, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
473 node: PubsubNode, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
474 max_items: Optional[int] = None, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
475 item_ids: Optional[List[str]] = None, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
476 sub_id: Optional[str] = None, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
477 rsm_request: Optional[rsm.RSMRequest] = None, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
478 extra: Optional[dict] = None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
479 ) -> Tuple[List[PubsubItem], dict]: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
480 """Get items from cache, using same arguments as for external Pubsub request""" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
481 if "mam" in extra: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
482 raise NotImplementedError("MAM queries are not supported yet") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
483 if max_items is None and rsm_request is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
484 max_items = 20 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
485 if max_items is not None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
486 if rsm_request is not None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
487 raise exceptions.InternalError( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
488 "Pubsub max items and RSM must not be used at the same time" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
489 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
490 elif item_ids is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
491 raise exceptions.InternalError( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
492 "Pubsub max items and item IDs must not be used at the same time" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
493 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
494 pubsub_items, metadata = await self.host.memory.storage.getItems( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
495 node, max_items=max_items, order_by=extra.get(C.KEY_ORDER_BY) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
496 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
497 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
498 desc = False |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
499 if rsm_request.before == "": |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
500 before = None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
501 desc = True |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
502 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
503 before = rsm_request.before |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
504 pubsub_items, metadata = await self.host.memory.storage.getItems( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
505 node, max_items=rsm_request.max, before=before, after=rsm_request.after, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
506 from_index=rsm_request.index, order_by=extra.get(C.KEY_ORDER_BY), |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
507 desc=desc, force_rsm=True, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
508 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
509 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
510 return pubsub_items, metadata |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
511 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
512 async def onItemsEvent(self, client, event): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
513 node = await self.host.memory.storage.getPubsubNode( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
514 client, event.sender, event.nodeIdentifier |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
515 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
516 if node is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
517 return |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
518 if node.sync_state in (SyncState.COMPLETED, SyncState.IN_PROGRESS): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
519 items = [] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
520 retract_ids = [] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
521 for elt in event.items: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
522 if elt.name == "item": |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
523 items.append(elt) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
524 elif elt.name == "retract": |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
525 item_id = elt.getAttribute("id") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
526 if not item_id: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
527 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
528 "Ignoring invalid retract item element: " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
529 f"{xml_tools.pFmtElt(elt)}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
530 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
531 continue |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
532 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
533 retract_ids.append(elt["id"]) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
534 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
535 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
536 f"Unexpected Pubsub event element: {xml_tools.pFmtElt(elt)}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
537 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
538 if items: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
539 log.debug("caching new items received from {node}") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
540 await self.cacheItems( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
541 client, node, items |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
542 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
543 if retract_ids: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
544 log.debug(f"deleting retracted items from {node}") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
545 await self.host.memory.storage.deletePubsubItems( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
546 node, items_names=retract_ids |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
547 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
548 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
549 async def onDeleteEvent(self, client, event): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
550 log.debug( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
551 f"deleting node {event.nodeIdentifier} from {event.sender} for " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
552 f"{client.profile}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
553 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
554 await self.host.memory.storage.deletePubsubNode( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
555 [client.profile], [event.sender], [event.nodeIdentifier] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
556 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
557 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
558 async def onPurgeEvent(self, client, event): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
559 node = await self.host.memory.storage.getPubsubNode( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
560 client, event.sender, event.nodeIdentifier |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
561 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
562 if node is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
563 return |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
564 log.debug(f"purging node {node} for {client.profile}") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
565 await self.host.memory.storage.deletePubsubItems(node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
566 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
567 async def _getItemsTrigger( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
568 self, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
569 client: SatXMPPEntity, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
570 service: Optional[jid.JID], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
571 node: str, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
572 max_items: Optional[int], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
573 item_ids: Optional[List[str]], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
574 sub_id: Optional[str], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
575 rsm_request: Optional[rsm.RSMRequest], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
576 extra: dict |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
577 ) -> Tuple[bool, Optional[Tuple[List[dict], dict]]]: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
578 if not self.use_cache: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
579 log.debug("cache disabled in settings") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
580 return True, None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
581 if extra.get(C.KEY_USE_CACHE) == False: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
582 log.debug("skipping pubsub cache as requested") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
583 return True, None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
584 if service is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
585 service = client.jid.userhostJID() |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
586 pubsub_node = await self.host.memory.storage.getPubsubNode( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
587 client, service, node |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
588 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
589 if pubsub_node is not None and pubsub_node.sync_state == SyncState.COMPLETED: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
590 analyse = {"to_sync": True} |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
591 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
592 analyse = await self.analyseNode(client, service, node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
593 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
594 if pubsub_node is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
595 pubsub_node = await self.host.memory.storage.setPubsubNode( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
596 client, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
597 service, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
598 node, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
599 analyser=analyse.get("name"), |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
600 type_=analyse.get("type"), |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
601 subtype=analyse.get("subtype"), |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
602 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
603 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
604 if analyse.get("to_sync"): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
605 if pubsub_node.sync_state == SyncState.COMPLETED: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
606 if "mam" in extra: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
607 log.debug("MAM caching is not supported yet, skipping cache") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
608 return True, None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
609 pubsub_items, metadata = await self.getItemsFromCache( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
610 client, pubsub_node, max_items, item_ids, sub_id, rsm_request, extra |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
611 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
612 return False, ([i.data for i in pubsub_items], metadata) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
613 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
614 if pubsub_node.sync_state == SyncState.IN_PROGRESS: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
615 if (service, node) not in self.in_progress: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
616 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
617 f"{pubsub_node} is reported as being cached, but not caching is " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
618 "in progress, this is most probably due to the backend being " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
619 "restarted. Resetting the status, caching will be done again." |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
620 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
621 pubsub_node.sync_state = None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
622 await self.host.memory.storage.deletePubsubItems(pubsub_node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
623 elif time.time() - pubsub_node.sync_state_updated > PROGRESS_DEADLINE: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
624 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
625 f"{pubsub_node} is in progress for too long " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
626 f"({pubsub_node.sync_state_updated//60} minutes), " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
627 "cancelling it and retrying." |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
628 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
629 self.in_progress.pop[(service, node)].cancel() |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
630 pubsub_node.sync_state = None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
631 await self.host.memory.storage.deletePubsubItems(pubsub_node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
632 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
633 log.debug( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
634 f"{pubsub_node} synchronisation is already in progress, skipping" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
635 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
636 if pubsub_node.sync_state is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
637 key = (service, node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
638 if key in self.in_progress: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
639 raise exceptions.InternalError( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
640 f"There is already a caching in progress for {pubsub_node}, this " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
641 "should not happen" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
642 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
643 self.cacheNode(client, pubsub_node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
644 elif pubsub_node.sync_state == SyncState.ERROR: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
645 log.debug( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
646 f"{pubsub_node} synchronisation has previously failed, skipping" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
647 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
648 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
649 return True, None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
650 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
651 async def _subscribeTrigger( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
652 self, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
653 client: SatXMPPEntity, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
654 service: jid.JID, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
655 nodeIdentifier: str, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
656 sub_jid: Optional[jid.JID], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
657 options: Optional[dict], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
658 subscription: pubsub.Subscription |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
659 ) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
660 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
661 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
662 async def _unsubscribeTrigger( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
663 self, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
664 client: SatXMPPEntity, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
665 service: jid.JID, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
666 nodeIdentifier: str, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
667 sub_jid, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
668 subscriptionIdentifier, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
669 sender, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
670 ) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
671 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
672 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
673 def _synchronise(self, service, node, profile_key): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
674 client = self.host.getClient(profile_key) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
675 service = client.jid.userhostJID() if not service else jid.JID(service) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
676 return defer.ensureDeferred(self.synchronise(client, service, node)) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
677 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
678 async def synchronise( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
679 self, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
680 client: SatXMPPEntity, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
681 service: jid.JID, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
682 node: str |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
683 ) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
684 """Synchronise a node with a pubsub service |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
685 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
686 If the node is already synchronised, it will be resynchronised (all items will be |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
687 deleted and re-downloaded). |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
688 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
689 The node will be synchronised even if there is no matching analyser. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
690 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
691 Note that when a node is synchronised, it is automatically subscribed. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
692 """ |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
693 pubsub_node = await self.host.memory.storage.getPubsubNode( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
694 client, service, node |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
695 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
696 if pubsub_node is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
697 log.info( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
698 _( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
699 "Synchronising the new node {node} at {service}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
700 ).format(node=node, service=service.full) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
701 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
702 analyse = await self.analyseNode(client, service, node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
703 pubsub_node = await self.host.memory.storage.setPubsubNode( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
704 client, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
705 service, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
706 node, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
707 analyser=analyse.get("name"), |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
708 type_=analyse.get("type"), |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
709 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
710 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
711 if ((pubsub_node.sync_state == SyncState.IN_PROGRESS |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
712 or (service, node) in self.in_progress)): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
713 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
714 _( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
715 "{node} at {service} is already being synchronised, can't do a new " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
716 "synchronisation." |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
717 ).format(node=node, service=service) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
718 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
719 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
720 log.info( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
721 _( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
722 "(Re)Synchronising the node {node} at {service} on user request" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
723 ).format(node=node, service=service.full) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
724 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
725 # we first delete and recreate the node (will also delete its items) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
726 await self.host.memory.storage.delete(pubsub_node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
727 analyse = await self.analyseNode(client, service, node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
728 pubsub_node = await self.host.memory.storage.setPubsubNode( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
729 client, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
730 service, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
731 node, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
732 analyser=analyse.get("name"), |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
733 type_=analyse.get("type"), |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
734 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
735 # then we can put node in cache |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
736 await self.cacheNode(client, pubsub_node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
737 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
738 async def purge(self, purge_filters: dict) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
739 """Remove items according to filters |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
740 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
741 filters can have on of the following keys, all are optional: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
742 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
743 :services: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
744 list of JIDs of services from which items must be deleted |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
745 :nodes: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
746 list of node names to delete |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
747 :types: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
748 list of node types to delete |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
749 :subtypes: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
750 list of node subtypes to delete |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
751 :profiles: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
752 list of profiles from which items must be deleted |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
753 :created_before: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
754 datetime before which items must have been created to be deleted |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
755 :created_update: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
756 datetime before which items must have been updated last to be deleted |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
757 """ |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
758 purge_filters["names"] = purge_filters.pop("nodes", None) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
759 await self.host.memory.storage.purgePubsubItems(**purge_filters) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
760 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
761 def _purge(self, purge_filters: str) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
762 purge_filters = data_format.deserialise(purge_filters) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
763 for key in "created_before", "updated_before": |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
764 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
765 purge_filters[key] = datetime.fromtimestamp(purge_filters[key]) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
766 except (KeyError, TypeError): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
767 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
768 return defer.ensureDeferred(self.purge(purge_filters)) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
769 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
770 async def reset(self) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
771 """Remove ALL nodes and items from cache |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
772 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
773 After calling this method, cache will be refilled progressively as if it where new |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
774 """ |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
775 await self.host.memory.storage.deletePubsubNode(None, None, None) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
776 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
777 def _reset(self) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
778 return defer.ensureDeferred(self.reset()) |