Mercurial > libervia-backend
annotate sat/plugins/plugin_pubsub_cache.py @ 4008:56e5b18f4d06
plugin XEP-0465: log a warning and return empty list/dict when `forbidden` error is received:
this error probably means that the service doesn't support PPS.
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 16 Mar 2023 12:33:33 +0100 |
parents | 036188fff714 |
children | 524856bd7b19 |
rev | line source |
---|---|
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
1 #!/usr/bin/env python3 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
2 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
3 # Libervia plugin for PubSub Caching |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
4 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
5 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
6 # This program is free software: you can redistribute it and/or modify |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
7 # it under the terms of the GNU Affero General Public License as published by |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
8 # the Free Software Foundation, either version 3 of the License, or |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
9 # (at your option) any later version. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
10 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
11 # This program is distributed in the hope that it will be useful, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
14 # GNU Affero General Public License for more details. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
15 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
16 # You should have received a copy of the GNU Affero General Public License |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
18 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
19 import time |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
20 from datetime import datetime |
3738
ffa8c8c78115
plugin XEP-0059, cache: allow those plugins to work in component mode
Goffi <goffi@goffi.org>
parents:
3666
diff
changeset
|
21 from typing import Optional, List, Tuple, Dict, Any |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
22 from twisted.words.protocols.jabber import jid, error |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
23 from twisted.words.xish import domish |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
24 from twisted.internet import defer |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
25 from wokkel import pubsub, rsm |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
26 from sat.core.i18n import _ |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
27 from sat.core.constants import Const as C |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
28 from sat.core import exceptions |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
29 from sat.core.log import getLogger |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
30 from sat.core.core_types import SatXMPPEntity |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
31 from sat.tools import xml_tools, utils |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
32 from sat.tools.common import data_format |
3941
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
33 from sat.memory.sqla import PubsubNode, PubsubItem, SyncState, IntegrityError |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
34 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
35 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
36 log = getLogger(__name__) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
37 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
38 PLUGIN_INFO = { |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
39 C.PI_NAME: "PubSub Cache", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
40 C.PI_IMPORT_NAME: "PUBSUB_CACHE", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
41 C.PI_TYPE: C.PLUG_TYPE_PUBSUB, |
3738
ffa8c8c78115
plugin XEP-0059, cache: allow those plugins to work in component mode
Goffi <goffi@goffi.org>
parents:
3666
diff
changeset
|
42 C.PI_MODES: C.PLUG_MODE_BOTH, |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
43 C.PI_PROTOCOLS: [], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
44 C.PI_DEPENDENCIES: ["XEP-0059", "XEP-0060"], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
45 C.PI_RECOMMENDATIONS: [], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
46 C.PI_MAIN: "PubsubCache", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
47 C.PI_HANDLER: "no", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
48 C.PI_DESCRIPTION: _("""Local Cache for PubSub"""), |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
49 } |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
50 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
51 ANALYSER_KEYS_TO_COPY = ("name", "type", "to_sync", "parser") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
52 # maximum of items to cache |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
53 CACHE_LIMIT = 5000 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
54 # number of second before a progress caching is considered failed and tried again |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
55 PROGRESS_DEADLINE = 60 * 60 * 6 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
56 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
57 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
58 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
59 class PubsubCache: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
60 # TODO: there is currently no notification for (un)subscribe events with XEP-0060, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
61 # but it would be necessary to have this data if some devices unsubscribe a cached |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
62 # node, as we can then get out of sync. A protoXEP could be proposed to fix this |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
63 # situation. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
64 # TODO: handle configuration events |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
65 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
66 def __init__(self, host): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
67 log.info(_("PubSub Cache initialization")) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
68 strategy = host.memory.getConfig(None, "pubsub_cache_strategy") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
69 if strategy == "no_cache": |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
70 log.info( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
71 _( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
72 "Pubsub cache won't be used due to pubsub_cache_strategy={value} " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
73 "setting." |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
74 ).format(value=repr(strategy)) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
75 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
76 self.use_cache = False |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
77 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
78 self.use_cache = True |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
79 self.host = host |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
80 self._p = host.plugins["XEP-0060"] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
81 self.analysers = {} |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
82 # map for caching in progress (node, service) => Deferred |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
83 self.in_progress = {} |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
84 self.host.trigger.add("XEP-0060_getItems", self._getItemsTrigger) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
85 self._p.addManagedNode( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
86 "", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
87 items_cb=self.onItemsEvent, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
88 delete_cb=self.onDeleteEvent, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
89 purge_db=self.onPurgeEvent, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
90 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
91 host.bridge.addMethod( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
92 "psCacheGet", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
93 ".plugin", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
94 in_sign="ssiassss", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
95 out_sign="s", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
96 method=self._getItemsFromCache, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
97 async_=True, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
98 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
99 host.bridge.addMethod( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
100 "psCacheSync", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
101 ".plugin", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
102 "sss", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
103 out_sign="", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
104 method=self._synchronise, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
105 async_=True, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
106 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
107 host.bridge.addMethod( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
108 "psCachePurge", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
109 ".plugin", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
110 "s", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
111 out_sign="", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
112 method=self._purge, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
113 async_=True, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
114 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
115 host.bridge.addMethod( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
116 "psCacheReset", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
117 ".plugin", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
118 "", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
119 out_sign="", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
120 method=self._reset, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
121 async_=True, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
122 ) |
3666
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
123 host.bridge.addMethod( |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
124 "psCacheSearch", |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
125 ".plugin", |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
126 "s", |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
127 out_sign="s", |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
128 method=self._search, |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
129 async_=True, |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
130 ) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
131 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
132 def registerAnalyser(self, analyser: dict) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
133 """Register a new pubsub node analyser |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
134 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
135 @param analyser: An analyser is a dictionary which may have the following keys |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
136 (keys with a ``*`` are mandatory, at least one of ``node`` or ``namespace`` keys |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
137 must be used): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
138 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
139 :name (str)*: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
140 a unique name for this analyser. This name will be stored in database |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
141 to retrieve the analyser when necessary (notably to get the parsing method), |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
142 thus it is recommended to use a stable name such as the source plugin name |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
143 instead of a name which may change with standard evolution, such as the |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
144 feature namespace. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
145 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
146 :type (str)*: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
147 indicates what kind of items we are dealing with. Type must be a human |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
148 readable word, as it may be used in searches. Good types examples are |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
149 **blog** or **event**. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
150 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
151 :node (str): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
152 prefix of a node name which may be used to identify its type. Example: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
153 *urn:xmpp:microblog:0* (a node starting with this name will be identified as |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
154 *blog* node). |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
155 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
156 :namespace (str): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
157 root namespace of items. When analysing a node, the first item will be |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
158 retrieved. The analyser will be chosen its given namespace match the |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
159 namespace of the first child element of ``<item>`` element. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
160 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
161 :to_sync (bool): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
162 if True, the node must be synchronised in cache. The default False value |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
163 means that the pubsub service will always be requested. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
164 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
165 :parser (callable): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
166 method (which may be sync, a coroutine or a method returning a "Deferred") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
167 to call to parse the ``domish.Element`` of the item. The result must be |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
168 dictionary which can be serialised to JSON. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
169 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
170 The method must have the following signature: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
171 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
172 .. function:: parser(client: SatXMPPEntity, item_elt: domish.Element, \ |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
173 service: Optional[jid.JID], node: Optional[str]) \ |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
174 -> dict |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
175 :noindex: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
176 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
177 :match_cb (callable): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
178 method (which may be sync, a coroutine or a method returning a "Deferred") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
179 called when the analyser matches. The method is called with the curreny |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
180 analyse which is can modify **in-place**. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
181 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
182 The method must have the following signature: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
183 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
184 .. function:: match_cb(client: SatXMPPEntity, analyse: dict) -> None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
185 :noindex: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
186 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
187 @raise exceptions.Conflict: a analyser with this name already exists |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
188 """ |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
189 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
190 name = analyser.get("name", "").strip().lower() |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
191 # we want the normalised name |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
192 analyser["name"] = name |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
193 if not name: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
194 raise ValueError('"name" is mandatory in analyser') |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
195 if "type" not in analyser: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
196 raise ValueError('"type" is mandatory in analyser') |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
197 type_test_keys = {"node", "namespace"} |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
198 if not type_test_keys.intersection(analyser): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
199 raise ValueError(f'at least one of {type_test_keys} must be used') |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
200 if name in self.analysers: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
201 raise exceptions.Conflict( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
202 f"An analyser with the name {name!r} is already registered" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
203 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
204 self.analysers[name] = analyser |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
205 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
206 async def cacheItems( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
207 self, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
208 client: SatXMPPEntity, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
209 pubsub_node: PubsubNode, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
210 items: List[domish.Element] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
211 ) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
212 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
213 parser = self.analysers[pubsub_node.analyser].get("parser") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
214 except KeyError: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
215 parser = None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
216 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
217 if parser is not None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
218 parsed_items = [ |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
219 await utils.asDeferred( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
220 parser, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
221 client, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
222 item, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
223 pubsub_node.service, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
224 pubsub_node.name |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
225 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
226 for item in items |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
227 ] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
228 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
229 parsed_items = None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
230 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
231 await self.host.memory.storage.cachePubsubItems( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
232 client, pubsub_node, items, parsed_items |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
233 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
234 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
235 async def _cacheNode( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
236 self, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
237 client: SatXMPPEntity, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
238 pubsub_node: PubsubNode |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
239 ) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
240 await self.host.memory.storage.updatePubsubNodeSyncState( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
241 pubsub_node, SyncState.IN_PROGRESS |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
242 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
243 service, node = pubsub_node.service, pubsub_node.name |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
244 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
245 log.debug( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
246 f"Caching node {node!r} at {service} for {client.profile}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
247 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
248 if not pubsub_node.subscribed: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
249 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
250 sub = await self._p.subscribe(client, service, node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
251 except Exception as e: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
252 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
253 _( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
254 "Can't subscribe node {pubsub_node}, that means that " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
255 "synchronisation can't be maintained: {reason}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
256 ).format(pubsub_node=pubsub_node, reason=e) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
257 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
258 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
259 if sub.state == "subscribed": |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
260 sub_id = sub.subscriptionIdentifier |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
261 log.debug( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
262 f"{pubsub_node} subscribed (subscription id: {sub_id!r})" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
263 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
264 pubsub_node.subscribed = True |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
265 await self.host.memory.storage.add(pubsub_node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
266 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
267 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
268 _( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
269 "{pubsub_node} is not subscribed, that means that " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
270 "synchronisation can't be maintained, and you may have " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
271 "to enforce subscription manually. Subscription state: " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
272 "{state}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
273 ).format(pubsub_node=pubsub_node, state=sub.state) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
274 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
275 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
276 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
277 await self.host.checkFeatures( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
278 client, [rsm.NS_RSM, self._p.DISCO_RSM], pubsub_node.service |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
279 ) |
3759
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
280 except error.StanzaError as e: |
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
281 if e.condition == "service-unavailable": |
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
282 log.warning( |
3854
8a2c46122a11
plugin XEP-0060: fix bad naming of return variable
Goffi <goffi@goffi.org>
parents:
3831
diff
changeset
|
283 "service {service} is hidding disco infos, we'll only cache " |
8a2c46122a11
plugin XEP-0060: fix bad naming of return variable
Goffi <goffi@goffi.org>
parents:
3831
diff
changeset
|
284 "latest 20 items" |
3759
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
285 ) |
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
286 items, __ = await client.pubsub_client.items( |
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
287 pubsub_node.service, pubsub_node.name, maxItems=20 |
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
288 ) |
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
289 await self.cacheItems( |
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
290 client, pubsub_node, items |
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
291 ) |
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
292 else: |
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
293 raise e |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
294 except exceptions.FeatureNotFound: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
295 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
296 f"service {service} doesn't handle Result Set Management " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
297 "(XEP-0059), we'll only cache latest 20 items" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
298 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
299 items, __ = await client.pubsub_client.items( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
300 pubsub_node.service, pubsub_node.name, maxItems=20 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
301 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
302 await self.cacheItems( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
303 client, pubsub_node, items |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
304 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
305 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
306 rsm_p = self.host.plugins["XEP-0059"] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
307 rsm_request = rsm.RSMRequest() |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
308 cached_ids = set() |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
309 while True: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
310 items, rsm_response = await client.pubsub_client.items( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
311 service, node, rsm_request=rsm_request |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
312 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
313 await self.cacheItems( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
314 client, pubsub_node, items |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
315 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
316 for item in items: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
317 item_id = item["id"] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
318 if item_id in cached_ids: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
319 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
320 f"Pubsub node {node!r} at {service} is returning several " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
321 f"times the same item ({item_id!r}). This is illegal " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
322 "behaviour, and it means that Pubsub service " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
323 f"{service} is buggy and can't be cached properly. " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
324 f"Please report this to {service.host} administrators" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
325 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
326 rsm_request = None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
327 break |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
328 cached_ids.add(item["id"]) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
329 if len(cached_ids) >= CACHE_LIMIT: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
330 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
331 f"Pubsub node {node!r} at {service} contains more items " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
332 f"than the cache limit ({CACHE_LIMIT}). We stop " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
333 "caching here, at item {item['id']!r}." |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
334 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
335 rsm_request = None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
336 break |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
337 rsm_request = rsm_p.getNextRequest(rsm_request, rsm_response) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
338 if rsm_request is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
339 break |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
340 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
341 await self.host.memory.storage.updatePubsubNodeSyncState( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
342 pubsub_node, SyncState.COMPLETED |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
343 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
344 except Exception as e: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
345 import traceback |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
346 tb = traceback.format_tb(e.__traceback__) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
347 log.error( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
348 f"Can't cache node {node!r} at {service} for {client.profile}: {e}\n{tb}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
349 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
350 await self.host.memory.storage.updatePubsubNodeSyncState( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
351 pubsub_node, SyncState.ERROR |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
352 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
353 await self.host.memory.storage.deletePubsubItems(pubsub_node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
354 raise e |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
355 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
356 def _cacheNodeClean(self, __, pubsub_node): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
357 del self.in_progress[(pubsub_node.service, pubsub_node.name)] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
358 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
359 def cacheNode( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
360 self, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
361 client: SatXMPPEntity, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
362 pubsub_node: PubsubNode |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
363 ) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
364 """Launch node caching as a background task""" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
365 d = defer.ensureDeferred(self._cacheNode(client, pubsub_node)) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
366 d.addBoth(self._cacheNodeClean, pubsub_node=pubsub_node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
367 self.in_progress[(pubsub_node.service, pubsub_node.name)] = d |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
368 return d |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
369 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
370 async def analyseNode( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
371 self, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
372 client: SatXMPPEntity, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
373 service: jid.JID, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
374 node: str, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
375 pubsub_node : PubsubNode = None, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
376 ) -> dict: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
377 """Use registered analysers on a node to determine what it is used for""" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
378 analyse = {"service": service, "node": node} |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
379 if pubsub_node is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
380 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
381 first_item = (await client.pubsub_client.items( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
382 service, node, 1 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
383 ))[0][0] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
384 except IndexError: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
385 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
386 except error.StanzaError as e: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
387 if e.condition == "item-not-found": |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
388 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
389 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
390 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
391 f"Can't retrieve last item on node {node!r} at service " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
392 f"{service} for {client.profile}: {e}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
393 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
394 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
395 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
396 uri = first_item.firstChildElement().uri |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
397 except Exception as e: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
398 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
399 f"Can't retrieve item namespace on node {node!r} at service " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
400 f"{service} for {client.profile}: {e}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
401 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
402 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
403 analyse["namespace"] = uri |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
404 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
405 conf = await self._p.getConfiguration(client, service, node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
406 except Exception as e: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
407 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
408 f"Can't retrieve configuration for node {node!r} at service {service} " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
409 f"for {client.profile}: {e}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
410 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
411 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
412 analyse["conf"] = conf |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
413 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
414 for analyser in self.analysers.values(): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
415 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
416 an_node = analyser["node"] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
417 except KeyError: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
418 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
419 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
420 if node.startswith(an_node): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
421 for key in ANALYSER_KEYS_TO_COPY: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
422 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
423 analyse[key] = analyser[key] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
424 except KeyError: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
425 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
426 found = True |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
427 break |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
428 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
429 namespace = analyse["namespace"] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
430 an_namespace = analyser["namespace"] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
431 except KeyError: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
432 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
433 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
434 if namespace == an_namespace: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
435 for key in ANALYSER_KEYS_TO_COPY: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
436 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
437 analyse[key] = analyser[key] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
438 except KeyError: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
439 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
440 found = True |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
441 break |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
442 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
443 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
444 found = False |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
445 log.debug( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
446 f"node {node!r} at service {service} doesn't match any known type" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
447 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
448 if found: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
449 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
450 match_cb = analyser["match_cb"] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
451 except KeyError: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
452 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
453 else: |
3619
32181a45d54b
plugin pubsub cache: use `asDeferred` with `match_cb`
Goffi <goffi@goffi.org>
parents:
3597
diff
changeset
|
454 await utils.asDeferred(match_cb, client, analyse) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
455 return analyse |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
456 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
457 def _getItemsFromCache( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
458 self, service="", node="", max_items=10, item_ids=None, sub_id=None, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
459 extra="", profile_key=C.PROF_KEY_NONE |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
460 ): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
461 d = defer.ensureDeferred(self._aGetItemsFromCache( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
462 service, node, max_items, item_ids, sub_id, extra, profile_key |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
463 )) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
464 d.addCallback(self._p.transItemsData) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
465 d.addCallback(self._p.serialiseItems) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
466 return d |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
467 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
468 async def _aGetItemsFromCache( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
469 self, service, node, max_items, item_ids, sub_id, extra, profile_key |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
470 ): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
471 client = self.host.getClient(profile_key) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
472 service = jid.JID(service) if service else client.jid.userhostJID() |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
473 pubsub_node = await self.host.memory.storage.getPubsubNode( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
474 client, service, node |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
475 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
476 if pubsub_node is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
477 raise exceptions.NotFound( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
478 f"{node!r} at {service} doesn't exist in cache for {client.profile!r}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
479 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
480 max_items = None if max_items == C.NO_LIMIT else max_items |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
481 extra = self._p.parseExtra(data_format.deserialise(extra)) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
482 items, metadata = await self.getItemsFromCache( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
483 client, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
484 pubsub_node, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
485 max_items, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
486 item_ids, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
487 sub_id or None, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
488 extra.rsm_request, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
489 extra.extra, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
490 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
491 return [i.data for i in items], metadata |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
492 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
493 async def getItemsFromCache( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
494 self, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
495 client: SatXMPPEntity, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
496 node: PubsubNode, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
497 max_items: Optional[int] = None, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
498 item_ids: Optional[List[str]] = None, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
499 sub_id: Optional[str] = None, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
500 rsm_request: Optional[rsm.RSMRequest] = None, |
3738
ffa8c8c78115
plugin XEP-0059, cache: allow those plugins to work in component mode
Goffi <goffi@goffi.org>
parents:
3666
diff
changeset
|
501 extra: Optional[Dict[str, Any]] = None |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
502 ) -> Tuple[List[PubsubItem], dict]: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
503 """Get items from cache, using same arguments as for external Pubsub request""" |
3738
ffa8c8c78115
plugin XEP-0059, cache: allow those plugins to work in component mode
Goffi <goffi@goffi.org>
parents:
3666
diff
changeset
|
504 if extra is None: |
ffa8c8c78115
plugin XEP-0059, cache: allow those plugins to work in component mode
Goffi <goffi@goffi.org>
parents:
3666
diff
changeset
|
505 extra = {} |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
506 if "mam" in extra: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
507 raise NotImplementedError("MAM queries are not supported yet") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
508 if max_items is None and rsm_request is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
509 max_items = 20 |
3759
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
510 pubsub_items, metadata = await self.host.memory.storage.getItems( |
3863
c04f5e8a3568
plugin pubsub cache: replace empty list by None in `getItemsFromCache`:
Goffi <goffi@goffi.org>
parents:
3854
diff
changeset
|
511 node, max_items=max_items, item_ids=item_ids or None, |
3759
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
512 order_by=extra.get(C.KEY_ORDER_BY) |
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
513 ) |
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
514 elif max_items is not None: |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
515 if rsm_request is not None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
516 raise exceptions.InternalError( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
517 "Pubsub max items and RSM must not be used at the same time" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
518 ) |
3759
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
519 elif item_ids: |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
520 raise exceptions.InternalError( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
521 "Pubsub max items and item IDs must not be used at the same time" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
522 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
523 pubsub_items, metadata = await self.host.memory.storage.getItems( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
524 node, max_items=max_items, order_by=extra.get(C.KEY_ORDER_BY) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
525 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
526 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
527 desc = False |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
528 if rsm_request.before == "": |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
529 before = None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
530 desc = True |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
531 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
532 before = rsm_request.before |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
533 pubsub_items, metadata = await self.host.memory.storage.getItems( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
534 node, max_items=rsm_request.max, before=before, after=rsm_request.after, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
535 from_index=rsm_request.index, order_by=extra.get(C.KEY_ORDER_BY), |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
536 desc=desc, force_rsm=True, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
537 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
538 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
539 return pubsub_items, metadata |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
540 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
541 async def onItemsEvent(self, client, event): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
542 node = await self.host.memory.storage.getPubsubNode( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
543 client, event.sender, event.nodeIdentifier |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
544 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
545 if node is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
546 return |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
547 if node.sync_state in (SyncState.COMPLETED, SyncState.IN_PROGRESS): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
548 items = [] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
549 retract_ids = [] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
550 for elt in event.items: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
551 if elt.name == "item": |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
552 items.append(elt) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
553 elif elt.name == "retract": |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
554 item_id = elt.getAttribute("id") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
555 if not item_id: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
556 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
557 "Ignoring invalid retract item element: " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
558 f"{xml_tools.pFmtElt(elt)}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
559 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
560 continue |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
561 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
562 retract_ids.append(elt["id"]) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
563 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
564 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
565 f"Unexpected Pubsub event element: {xml_tools.pFmtElt(elt)}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
566 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
567 if items: |
3831
604b6acaee22
plugin pubsub cache: resync in `synchronise` when node's `sync_state` is not set:
Goffi <goffi@goffi.org>
parents:
3760
diff
changeset
|
568 log.debug(f"[{client.profile}] caching new items received from {node}") |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
569 await self.cacheItems( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
570 client, node, items |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
571 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
572 if retract_ids: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
573 log.debug(f"deleting retracted items from {node}") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
574 await self.host.memory.storage.deletePubsubItems( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
575 node, items_names=retract_ids |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
576 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
577 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
578 async def onDeleteEvent(self, client, event): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
579 log.debug( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
580 f"deleting node {event.nodeIdentifier} from {event.sender} for " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
581 f"{client.profile}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
582 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
583 await self.host.memory.storage.deletePubsubNode( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
584 [client.profile], [event.sender], [event.nodeIdentifier] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
585 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
586 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
587 async def onPurgeEvent(self, client, event): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
588 node = await self.host.memory.storage.getPubsubNode( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
589 client, event.sender, event.nodeIdentifier |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
590 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
591 if node is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
592 return |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
593 log.debug(f"purging node {node} for {client.profile}") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
594 await self.host.memory.storage.deletePubsubItems(node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
595 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
596 async def _getItemsTrigger( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
597 self, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
598 client: SatXMPPEntity, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
599 service: Optional[jid.JID], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
600 node: str, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
601 max_items: Optional[int], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
602 item_ids: Optional[List[str]], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
603 sub_id: Optional[str], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
604 rsm_request: Optional[rsm.RSMRequest], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
605 extra: dict |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
606 ) -> Tuple[bool, Optional[Tuple[List[dict], dict]]]: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
607 if not self.use_cache: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
608 log.debug("cache disabled in settings") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
609 return True, None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
610 if extra.get(C.KEY_USE_CACHE) == False: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
611 log.debug("skipping pubsub cache as requested") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
612 return True, None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
613 if service is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
614 service = client.jid.userhostJID() |
3941
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
615 for __ in range(5): |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
616 pubsub_node = await self.host.memory.storage.getPubsubNode( |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
617 client, service, node |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
618 ) |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
619 if pubsub_node is not None and pubsub_node.sync_state == SyncState.COMPLETED: |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
620 analyse = {"to_sync": True} |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
621 else: |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
622 analyse = await self.analyseNode(client, service, node) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
623 |
3941
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
624 if pubsub_node is None: |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
625 try: |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
626 pubsub_node = await self.host.memory.storage.setPubsubNode( |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
627 client, |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
628 service, |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
629 node, |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
630 analyser=analyse.get("name"), |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
631 type_=analyse.get("type"), |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
632 subtype=analyse.get("subtype"), |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
633 ) |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
634 except IntegrityError as e: |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
635 if "unique" in str(e.orig).lower(): |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
636 log.debug( |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
637 "race condition on pubsub node creation in cache, trying " |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
638 "again" |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
639 ) |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
640 else: |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
641 raise e |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
642 break |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
643 else: |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
644 raise exceptions.InternalError( |
036188fff714
plugin pubsub cache: avoid race condition by retrying node request:
Goffi <goffi@goffi.org>
parents:
3934
diff
changeset
|
645 "Too many IntegrityError with UNIQUE constraint, something is going wrong" |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
646 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
647 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
648 if analyse.get("to_sync"): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
649 if pubsub_node.sync_state == SyncState.COMPLETED: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
650 if "mam" in extra: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
651 log.debug("MAM caching is not supported yet, skipping cache") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
652 return True, None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
653 pubsub_items, metadata = await self.getItemsFromCache( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
654 client, pubsub_node, max_items, item_ids, sub_id, rsm_request, extra |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
655 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
656 return False, ([i.data for i in pubsub_items], metadata) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
657 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
658 if pubsub_node.sync_state == SyncState.IN_PROGRESS: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
659 if (service, node) not in self.in_progress: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
660 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
661 f"{pubsub_node} is reported as being cached, but not caching is " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
662 "in progress, this is most probably due to the backend being " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
663 "restarted. Resetting the status, caching will be done again." |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
664 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
665 pubsub_node.sync_state = None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
666 await self.host.memory.storage.deletePubsubItems(pubsub_node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
667 elif time.time() - pubsub_node.sync_state_updated > PROGRESS_DEADLINE: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
668 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
669 f"{pubsub_node} is in progress for too long " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
670 f"({pubsub_node.sync_state_updated//60} minutes), " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
671 "cancelling it and retrying." |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
672 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
673 self.in_progress.pop[(service, node)].cancel() |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
674 pubsub_node.sync_state = None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
675 await self.host.memory.storage.deletePubsubItems(pubsub_node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
676 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
677 log.debug( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
678 f"{pubsub_node} synchronisation is already in progress, skipping" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
679 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
680 if pubsub_node.sync_state is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
681 key = (service, node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
682 if key in self.in_progress: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
683 raise exceptions.InternalError( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
684 f"There is already a caching in progress for {pubsub_node}, this " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
685 "should not happen" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
686 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
687 self.cacheNode(client, pubsub_node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
688 elif pubsub_node.sync_state == SyncState.ERROR: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
689 log.debug( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
690 f"{pubsub_node} synchronisation has previously failed, skipping" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
691 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
692 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
693 return True, None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
694 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
695 async def _subscribeTrigger( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
696 self, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
697 client: SatXMPPEntity, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
698 service: jid.JID, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
699 nodeIdentifier: str, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
700 sub_jid: Optional[jid.JID], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
701 options: Optional[dict], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
702 subscription: pubsub.Subscription |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
703 ) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
704 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
705 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
706 async def _unsubscribeTrigger( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
707 self, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
708 client: SatXMPPEntity, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
709 service: jid.JID, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
710 nodeIdentifier: str, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
711 sub_jid, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
712 subscriptionIdentifier, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
713 sender, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
714 ) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
715 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
716 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
717 def _synchronise(self, service, node, profile_key): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
718 client = self.host.getClient(profile_key) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
719 service = client.jid.userhostJID() if not service else jid.JID(service) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
720 return defer.ensureDeferred(self.synchronise(client, service, node)) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
721 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
722 async def synchronise( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
723 self, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
724 client: SatXMPPEntity, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
725 service: jid.JID, |
3760
74f436e856ff
plugin pubsub cache: new `resync` argument to force resynchronisation in `synchronize`
Goffi <goffi@goffi.org>
parents:
3759
diff
changeset
|
726 node: str, |
74f436e856ff
plugin pubsub cache: new `resync` argument to force resynchronisation in `synchronize`
Goffi <goffi@goffi.org>
parents:
3759
diff
changeset
|
727 resync: bool = True |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
728 ) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
729 """Synchronise a node with a pubsub service |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
730 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
731 The node will be synchronised even if there is no matching analyser. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
732 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
733 Note that when a node is synchronised, it is automatically subscribed. |
3760
74f436e856ff
plugin pubsub cache: new `resync` argument to force resynchronisation in `synchronize`
Goffi <goffi@goffi.org>
parents:
3759
diff
changeset
|
734 @param resync: if True and the node is already synchronised, it will be |
74f436e856ff
plugin pubsub cache: new `resync` argument to force resynchronisation in `synchronize`
Goffi <goffi@goffi.org>
parents:
3759
diff
changeset
|
735 resynchronised (all items will be deleted and re-downloaded). |
74f436e856ff
plugin pubsub cache: new `resync` argument to force resynchronisation in `synchronize`
Goffi <goffi@goffi.org>
parents:
3759
diff
changeset
|
736 |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
737 """ |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
738 pubsub_node = await self.host.memory.storage.getPubsubNode( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
739 client, service, node |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
740 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
741 if pubsub_node is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
742 log.info( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
743 _( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
744 "Synchronising the new node {node} at {service}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
745 ).format(node=node, service=service.full) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
746 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
747 analyse = await self.analyseNode(client, service, node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
748 pubsub_node = await self.host.memory.storage.setPubsubNode( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
749 client, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
750 service, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
751 node, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
752 analyser=analyse.get("name"), |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
753 type_=analyse.get("type"), |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
754 ) |
3831
604b6acaee22
plugin pubsub cache: resync in `synchronise` when node's `sync_state` is not set:
Goffi <goffi@goffi.org>
parents:
3760
diff
changeset
|
755 elif not resync and pubsub_node.sync_state is not None: |
3760
74f436e856ff
plugin pubsub cache: new `resync` argument to force resynchronisation in `synchronize`
Goffi <goffi@goffi.org>
parents:
3759
diff
changeset
|
756 # the node exists, nothing to do |
74f436e856ff
plugin pubsub cache: new `resync` argument to force resynchronisation in `synchronize`
Goffi <goffi@goffi.org>
parents:
3759
diff
changeset
|
757 return |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
758 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
759 if ((pubsub_node.sync_state == SyncState.IN_PROGRESS |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
760 or (service, node) in self.in_progress)): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
761 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
762 _( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
763 "{node} at {service} is already being synchronised, can't do a new " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
764 "synchronisation." |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
765 ).format(node=node, service=service) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
766 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
767 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
768 log.info( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
769 _( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
770 "(Re)Synchronising the node {node} at {service} on user request" |
3934
e345d93fb6e5
plugin OXPS: OpenPGP for XMPP Pubsub implementation:
Goffi <goffi@goffi.org>
parents:
3863
diff
changeset
|
771 ).format(node=node, service=service.full()) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
772 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
773 # we first delete and recreate the node (will also delete its items) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
774 await self.host.memory.storage.delete(pubsub_node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
775 analyse = await self.analyseNode(client, service, node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
776 pubsub_node = await self.host.memory.storage.setPubsubNode( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
777 client, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
778 service, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
779 node, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
780 analyser=analyse.get("name"), |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
781 type_=analyse.get("type"), |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
782 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
783 # then we can put node in cache |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
784 await self.cacheNode(client, pubsub_node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
785 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
786 async def purge(self, purge_filters: dict) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
787 """Remove items according to filters |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
788 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
789 filters can have on of the following keys, all are optional: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
790 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
791 :services: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
792 list of JIDs of services from which items must be deleted |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
793 :nodes: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
794 list of node names to delete |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
795 :types: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
796 list of node types to delete |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
797 :subtypes: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
798 list of node subtypes to delete |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
799 :profiles: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
800 list of profiles from which items must be deleted |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
801 :created_before: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
802 datetime before which items must have been created to be deleted |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
803 :created_update: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
804 datetime before which items must have been updated last to be deleted |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
805 """ |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
806 purge_filters["names"] = purge_filters.pop("nodes", None) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
807 await self.host.memory.storage.purgePubsubItems(**purge_filters) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
808 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
809 def _purge(self, purge_filters: str) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
810 purge_filters = data_format.deserialise(purge_filters) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
811 for key in "created_before", "updated_before": |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
812 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
813 purge_filters[key] = datetime.fromtimestamp(purge_filters[key]) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
814 except (KeyError, TypeError): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
815 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
816 return defer.ensureDeferred(self.purge(purge_filters)) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
817 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
818 async def reset(self) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
819 """Remove ALL nodes and items from cache |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
820 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
821 After calling this method, cache will be refilled progressively as if it where new |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
822 """ |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
823 await self.host.memory.storage.deletePubsubNode(None, None, None) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
824 |
3666
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
825 def _reset(self) -> defer.Deferred: |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
826 return defer.ensureDeferred(self.reset()) |
3666
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
827 |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
828 async def search(self, query: dict) -> List[PubsubItem]: |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
829 """Search pubsub items in cache""" |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
830 return await self.host.memory.storage.searchPubsubItems(query) |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
831 |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
832 async def serialisableSearch(self, query: dict) -> List[dict]: |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
833 """Search pubsub items in cache and returns parsed data |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
834 |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
835 The returned data can be serialised. |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
836 |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
837 "pubsub_service" and "pubsub_name" will be added to each data (both as strings) |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
838 """ |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
839 items = await self.search(query) |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
840 ret = [] |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
841 for item in items: |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
842 parsed = item.parsed |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
843 parsed["pubsub_service"] = item.node.service.full() |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
844 parsed["pubsub_node"] = item.node.name |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
845 if query.get("with_payload"): |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
846 parsed["item_payload"] = item.data.toXml() |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
847 parsed["node_profile"] = self.host.memory.storage.getProfileById( |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
848 item.node.profile_id |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
849 ) |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
850 |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
851 ret.append(parsed) |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
852 return ret |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
853 |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
854 def _search(self, query: str) -> defer.Deferred: |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
855 query = data_format.deserialise(query) |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
856 services = query.get("services") |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
857 if services: |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
858 query["services"] = [jid.JID(s) for s in services] |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
859 d = defer.ensureDeferred(self.serialisableSearch(query)) |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
860 d.addCallback(data_format.serialise) |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
861 return d |