Mercurial > libervia-backend
annotate sat/plugins/plugin_pubsub_cache.py @ 3792:865167c34b82
comp AP gateway: convert pubsub item retractation to AP `Delete` activity:
when an item `retract` is received from a node from which an AP actor is subscribed, a
corresponding `Delete` activity is created and sent to AP followers.
Trying to retract an item from a virtual node of the gateway pubsub service now return a
`forbidden` stanza error (those nodes are handled on AP side and thus are not owned by
XMPP entities).
rel 367
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 17 Jun 2022 14:15:23 +0200 |
parents | 74f436e856ff |
children | 604b6acaee22 |
rev | line source |
---|---|
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
1 #!/usr/bin/env python3 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
2 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
3 # Libervia plugin for PubSub Caching |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
4 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
5 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
6 # This program is free software: you can redistribute it and/or modify |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
7 # it under the terms of the GNU Affero General Public License as published by |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
8 # the Free Software Foundation, either version 3 of the License, or |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
9 # (at your option) any later version. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
10 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
11 # This program is distributed in the hope that it will be useful, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
14 # GNU Affero General Public License for more details. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
15 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
16 # You should have received a copy of the GNU Affero General Public License |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
18 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
19 import time |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
20 from datetime import datetime |
3738
ffa8c8c78115
plugin XEP-0059, cache: allow those plugins to work in component mode
Goffi <goffi@goffi.org>
parents:
3666
diff
changeset
|
21 from typing import Optional, List, Tuple, Dict, Any |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
22 from twisted.words.protocols.jabber import jid, error |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
23 from twisted.words.xish import domish |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
24 from twisted.internet import defer |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
25 from wokkel import pubsub, rsm |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
26 from sat.core.i18n import _ |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
27 from sat.core.constants import Const as C |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
28 from sat.core import exceptions |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
29 from sat.core.log import getLogger |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
30 from sat.core.core_types import SatXMPPEntity |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
31 from sat.tools import xml_tools, utils |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
32 from sat.tools.common import data_format |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
33 from sat.memory.sqla import PubsubNode, PubsubItem, SyncState |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
34 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
35 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
36 log = getLogger(__name__) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
37 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
38 PLUGIN_INFO = { |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
39 C.PI_NAME: "PubSub Cache", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
40 C.PI_IMPORT_NAME: "PUBSUB_CACHE", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
41 C.PI_TYPE: C.PLUG_TYPE_PUBSUB, |
3738
ffa8c8c78115
plugin XEP-0059, cache: allow those plugins to work in component mode
Goffi <goffi@goffi.org>
parents:
3666
diff
changeset
|
42 C.PI_MODES: C.PLUG_MODE_BOTH, |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
43 C.PI_PROTOCOLS: [], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
44 C.PI_DEPENDENCIES: ["XEP-0059", "XEP-0060"], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
45 C.PI_RECOMMENDATIONS: [], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
46 C.PI_MAIN: "PubsubCache", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
47 C.PI_HANDLER: "no", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
48 C.PI_DESCRIPTION: _("""Local Cache for PubSub"""), |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
49 } |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
50 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
51 ANALYSER_KEYS_TO_COPY = ("name", "type", "to_sync", "parser") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
52 # maximum of items to cache |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
53 CACHE_LIMIT = 5000 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
54 # number of second before a progress caching is considered failed and tried again |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
55 PROGRESS_DEADLINE = 60 * 60 * 6 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
56 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
57 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
58 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
59 class PubsubCache: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
60 # TODO: there is currently no notification for (un)subscribe events with XEP-0060, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
61 # but it would be necessary to have this data if some devices unsubscribe a cached |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
62 # node, as we can then get out of sync. A protoXEP could be proposed to fix this |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
63 # situation. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
64 # TODO: handle configuration events |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
65 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
66 def __init__(self, host): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
67 log.info(_("PubSub Cache initialization")) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
68 strategy = host.memory.getConfig(None, "pubsub_cache_strategy") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
69 if strategy == "no_cache": |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
70 log.info( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
71 _( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
72 "Pubsub cache won't be used due to pubsub_cache_strategy={value} " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
73 "setting." |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
74 ).format(value=repr(strategy)) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
75 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
76 self.use_cache = False |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
77 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
78 self.use_cache = True |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
79 self.host = host |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
80 self._p = host.plugins["XEP-0060"] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
81 self.analysers = {} |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
82 # map for caching in progress (node, service) => Deferred |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
83 self.in_progress = {} |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
84 self.host.trigger.add("XEP-0060_getItems", self._getItemsTrigger) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
85 self._p.addManagedNode( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
86 "", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
87 items_cb=self.onItemsEvent, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
88 delete_cb=self.onDeleteEvent, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
89 purge_db=self.onPurgeEvent, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
90 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
91 host.bridge.addMethod( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
92 "psCacheGet", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
93 ".plugin", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
94 in_sign="ssiassss", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
95 out_sign="s", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
96 method=self._getItemsFromCache, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
97 async_=True, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
98 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
99 host.bridge.addMethod( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
100 "psCacheSync", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
101 ".plugin", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
102 "sss", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
103 out_sign="", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
104 method=self._synchronise, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
105 async_=True, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
106 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
107 host.bridge.addMethod( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
108 "psCachePurge", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
109 ".plugin", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
110 "s", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
111 out_sign="", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
112 method=self._purge, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
113 async_=True, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
114 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
115 host.bridge.addMethod( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
116 "psCacheReset", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
117 ".plugin", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
118 "", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
119 out_sign="", |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
120 method=self._reset, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
121 async_=True, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
122 ) |
3666
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
123 host.bridge.addMethod( |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
124 "psCacheSearch", |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
125 ".plugin", |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
126 "s", |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
127 out_sign="s", |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
128 method=self._search, |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
129 async_=True, |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
130 ) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
131 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
132 def registerAnalyser(self, analyser: dict) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
133 """Register a new pubsub node analyser |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
134 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
135 @param analyser: An analyser is a dictionary which may have the following keys |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
136 (keys with a ``*`` are mandatory, at least one of ``node`` or ``namespace`` keys |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
137 must be used): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
138 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
139 :name (str)*: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
140 a unique name for this analyser. This name will be stored in database |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
141 to retrieve the analyser when necessary (notably to get the parsing method), |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
142 thus it is recommended to use a stable name such as the source plugin name |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
143 instead of a name which may change with standard evolution, such as the |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
144 feature namespace. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
145 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
146 :type (str)*: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
147 indicates what kind of items we are dealing with. Type must be a human |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
148 readable word, as it may be used in searches. Good types examples are |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
149 **blog** or **event**. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
150 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
151 :node (str): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
152 prefix of a node name which may be used to identify its type. Example: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
153 *urn:xmpp:microblog:0* (a node starting with this name will be identified as |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
154 *blog* node). |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
155 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
156 :namespace (str): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
157 root namespace of items. When analysing a node, the first item will be |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
158 retrieved. The analyser will be chosen its given namespace match the |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
159 namespace of the first child element of ``<item>`` element. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
160 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
161 :to_sync (bool): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
162 if True, the node must be synchronised in cache. The default False value |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
163 means that the pubsub service will always be requested. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
164 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
165 :parser (callable): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
166 method (which may be sync, a coroutine or a method returning a "Deferred") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
167 to call to parse the ``domish.Element`` of the item. The result must be |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
168 dictionary which can be serialised to JSON. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
169 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
170 The method must have the following signature: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
171 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
172 .. function:: parser(client: SatXMPPEntity, item_elt: domish.Element, \ |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
173 service: Optional[jid.JID], node: Optional[str]) \ |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
174 -> dict |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
175 :noindex: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
176 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
177 :match_cb (callable): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
178 method (which may be sync, a coroutine or a method returning a "Deferred") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
179 called when the analyser matches. The method is called with the curreny |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
180 analyse which is can modify **in-place**. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
181 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
182 The method must have the following signature: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
183 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
184 .. function:: match_cb(client: SatXMPPEntity, analyse: dict) -> None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
185 :noindex: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
186 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
187 @raise exceptions.Conflict: a analyser with this name already exists |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
188 """ |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
189 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
190 name = analyser.get("name", "").strip().lower() |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
191 # we want the normalised name |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
192 analyser["name"] = name |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
193 if not name: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
194 raise ValueError('"name" is mandatory in analyser') |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
195 if "type" not in analyser: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
196 raise ValueError('"type" is mandatory in analyser') |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
197 type_test_keys = {"node", "namespace"} |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
198 if not type_test_keys.intersection(analyser): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
199 raise ValueError(f'at least one of {type_test_keys} must be used') |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
200 if name in self.analysers: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
201 raise exceptions.Conflict( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
202 f"An analyser with the name {name!r} is already registered" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
203 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
204 self.analysers[name] = analyser |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
205 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
206 async def cacheItems( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
207 self, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
208 client: SatXMPPEntity, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
209 pubsub_node: PubsubNode, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
210 items: List[domish.Element] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
211 ) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
212 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
213 parser = self.analysers[pubsub_node.analyser].get("parser") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
214 except KeyError: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
215 parser = None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
216 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
217 if parser is not None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
218 parsed_items = [ |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
219 await utils.asDeferred( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
220 parser, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
221 client, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
222 item, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
223 pubsub_node.service, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
224 pubsub_node.name |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
225 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
226 for item in items |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
227 ] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
228 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
229 parsed_items = None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
230 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
231 await self.host.memory.storage.cachePubsubItems( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
232 client, pubsub_node, items, parsed_items |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
233 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
234 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
235 async def _cacheNode( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
236 self, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
237 client: SatXMPPEntity, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
238 pubsub_node: PubsubNode |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
239 ) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
240 await self.host.memory.storage.updatePubsubNodeSyncState( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
241 pubsub_node, SyncState.IN_PROGRESS |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
242 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
243 service, node = pubsub_node.service, pubsub_node.name |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
244 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
245 log.debug( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
246 f"Caching node {node!r} at {service} for {client.profile}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
247 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
248 if not pubsub_node.subscribed: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
249 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
250 sub = await self._p.subscribe(client, service, node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
251 except Exception as e: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
252 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
253 _( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
254 "Can't subscribe node {pubsub_node}, that means that " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
255 "synchronisation can't be maintained: {reason}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
256 ).format(pubsub_node=pubsub_node, reason=e) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
257 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
258 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
259 if sub.state == "subscribed": |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
260 sub_id = sub.subscriptionIdentifier |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
261 log.debug( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
262 f"{pubsub_node} subscribed (subscription id: {sub_id!r})" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
263 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
264 pubsub_node.subscribed = True |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
265 await self.host.memory.storage.add(pubsub_node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
266 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
267 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
268 _( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
269 "{pubsub_node} is not subscribed, that means that " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
270 "synchronisation can't be maintained, and you may have " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
271 "to enforce subscription manually. Subscription state: " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
272 "{state}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
273 ).format(pubsub_node=pubsub_node, state=sub.state) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
274 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
275 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
276 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
277 await self.host.checkFeatures( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
278 client, [rsm.NS_RSM, self._p.DISCO_RSM], pubsub_node.service |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
279 ) |
3759
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
280 except error.StanzaError as e: |
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
281 if e.condition == "service-unavailable": |
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
282 log.warning( |
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
283 "service is hidding disco infos, we'll only cache latest 20 items" |
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
284 ) |
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
285 items, __ = await client.pubsub_client.items( |
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
286 pubsub_node.service, pubsub_node.name, maxItems=20 |
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
287 ) |
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
288 await self.cacheItems( |
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
289 client, pubsub_node, items |
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
290 ) |
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
291 else: |
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
292 raise e |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
293 except exceptions.FeatureNotFound: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
294 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
295 f"service {service} doesn't handle Result Set Management " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
296 "(XEP-0059), we'll only cache latest 20 items" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
297 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
298 items, __ = await client.pubsub_client.items( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
299 pubsub_node.service, pubsub_node.name, maxItems=20 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
300 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
301 await self.cacheItems( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
302 client, pubsub_node, items |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
303 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
304 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
305 rsm_p = self.host.plugins["XEP-0059"] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
306 rsm_request = rsm.RSMRequest() |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
307 cached_ids = set() |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
308 while True: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
309 items, rsm_response = await client.pubsub_client.items( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
310 service, node, rsm_request=rsm_request |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
311 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
312 await self.cacheItems( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
313 client, pubsub_node, items |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
314 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
315 for item in items: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
316 item_id = item["id"] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
317 if item_id in cached_ids: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
318 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
319 f"Pubsub node {node!r} at {service} is returning several " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
320 f"times the same item ({item_id!r}). This is illegal " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
321 "behaviour, and it means that Pubsub service " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
322 f"{service} is buggy and can't be cached properly. " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
323 f"Please report this to {service.host} administrators" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
324 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
325 rsm_request = None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
326 break |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
327 cached_ids.add(item["id"]) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
328 if len(cached_ids) >= CACHE_LIMIT: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
329 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
330 f"Pubsub node {node!r} at {service} contains more items " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
331 f"than the cache limit ({CACHE_LIMIT}). We stop " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
332 "caching here, at item {item['id']!r}." |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
333 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
334 rsm_request = None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
335 break |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
336 rsm_request = rsm_p.getNextRequest(rsm_request, rsm_response) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
337 if rsm_request is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
338 break |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
339 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
340 await self.host.memory.storage.updatePubsubNodeSyncState( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
341 pubsub_node, SyncState.COMPLETED |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
342 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
343 except Exception as e: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
344 import traceback |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
345 tb = traceback.format_tb(e.__traceback__) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
346 log.error( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
347 f"Can't cache node {node!r} at {service} for {client.profile}: {e}\n{tb}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
348 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
349 await self.host.memory.storage.updatePubsubNodeSyncState( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
350 pubsub_node, SyncState.ERROR |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
351 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
352 await self.host.memory.storage.deletePubsubItems(pubsub_node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
353 raise e |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
354 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
355 def _cacheNodeClean(self, __, pubsub_node): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
356 del self.in_progress[(pubsub_node.service, pubsub_node.name)] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
357 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
358 def cacheNode( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
359 self, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
360 client: SatXMPPEntity, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
361 pubsub_node: PubsubNode |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
362 ) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
363 """Launch node caching as a background task""" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
364 d = defer.ensureDeferred(self._cacheNode(client, pubsub_node)) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
365 d.addBoth(self._cacheNodeClean, pubsub_node=pubsub_node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
366 self.in_progress[(pubsub_node.service, pubsub_node.name)] = d |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
367 return d |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
368 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
369 async def analyseNode( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
370 self, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
371 client: SatXMPPEntity, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
372 service: jid.JID, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
373 node: str, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
374 pubsub_node : PubsubNode = None, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
375 ) -> dict: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
376 """Use registered analysers on a node to determine what it is used for""" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
377 analyse = {"service": service, "node": node} |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
378 if pubsub_node is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
379 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
380 first_item = (await client.pubsub_client.items( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
381 service, node, 1 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
382 ))[0][0] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
383 except IndexError: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
384 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
385 except error.StanzaError as e: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
386 if e.condition == "item-not-found": |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
387 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
388 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
389 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
390 f"Can't retrieve last item on node {node!r} at service " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
391 f"{service} for {client.profile}: {e}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
392 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
393 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
394 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
395 uri = first_item.firstChildElement().uri |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
396 except Exception as e: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
397 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
398 f"Can't retrieve item namespace on node {node!r} at service " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
399 f"{service} for {client.profile}: {e}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
400 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
401 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
402 analyse["namespace"] = uri |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
403 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
404 conf = await self._p.getConfiguration(client, service, node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
405 except Exception as e: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
406 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
407 f"Can't retrieve configuration for node {node!r} at service {service} " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
408 f"for {client.profile}: {e}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
409 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
410 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
411 analyse["conf"] = conf |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
412 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
413 for analyser in self.analysers.values(): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
414 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
415 an_node = analyser["node"] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
416 except KeyError: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
417 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
418 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
419 if node.startswith(an_node): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
420 for key in ANALYSER_KEYS_TO_COPY: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
421 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
422 analyse[key] = analyser[key] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
423 except KeyError: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
424 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
425 found = True |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
426 break |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
427 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
428 namespace = analyse["namespace"] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
429 an_namespace = analyser["namespace"] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
430 except KeyError: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
431 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
432 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
433 if namespace == an_namespace: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
434 for key in ANALYSER_KEYS_TO_COPY: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
435 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
436 analyse[key] = analyser[key] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
437 except KeyError: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
438 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
439 found = True |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
440 break |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
441 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
442 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
443 found = False |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
444 log.debug( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
445 f"node {node!r} at service {service} doesn't match any known type" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
446 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
447 if found: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
448 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
449 match_cb = analyser["match_cb"] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
450 except KeyError: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
451 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
452 else: |
3619
32181a45d54b
plugin pubsub cache: use `asDeferred` with `match_cb`
Goffi <goffi@goffi.org>
parents:
3597
diff
changeset
|
453 await utils.asDeferred(match_cb, client, analyse) |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
454 return analyse |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
455 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
456 def _getItemsFromCache( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
457 self, service="", node="", max_items=10, item_ids=None, sub_id=None, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
458 extra="", profile_key=C.PROF_KEY_NONE |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
459 ): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
460 d = defer.ensureDeferred(self._aGetItemsFromCache( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
461 service, node, max_items, item_ids, sub_id, extra, profile_key |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
462 )) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
463 d.addCallback(self._p.transItemsData) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
464 d.addCallback(self._p.serialiseItems) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
465 return d |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
466 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
467 async def _aGetItemsFromCache( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
468 self, service, node, max_items, item_ids, sub_id, extra, profile_key |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
469 ): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
470 client = self.host.getClient(profile_key) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
471 service = jid.JID(service) if service else client.jid.userhostJID() |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
472 pubsub_node = await self.host.memory.storage.getPubsubNode( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
473 client, service, node |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
474 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
475 if pubsub_node is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
476 raise exceptions.NotFound( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
477 f"{node!r} at {service} doesn't exist in cache for {client.profile!r}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
478 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
479 max_items = None if max_items == C.NO_LIMIT else max_items |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
480 extra = self._p.parseExtra(data_format.deserialise(extra)) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
481 items, metadata = await self.getItemsFromCache( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
482 client, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
483 pubsub_node, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
484 max_items, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
485 item_ids, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
486 sub_id or None, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
487 extra.rsm_request, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
488 extra.extra, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
489 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
490 return [i.data for i in items], metadata |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
491 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
492 async def getItemsFromCache( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
493 self, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
494 client: SatXMPPEntity, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
495 node: PubsubNode, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
496 max_items: Optional[int] = None, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
497 item_ids: Optional[List[str]] = None, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
498 sub_id: Optional[str] = None, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
499 rsm_request: Optional[rsm.RSMRequest] = None, |
3738
ffa8c8c78115
plugin XEP-0059, cache: allow those plugins to work in component mode
Goffi <goffi@goffi.org>
parents:
3666
diff
changeset
|
500 extra: Optional[Dict[str, Any]] = None |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
501 ) -> Tuple[List[PubsubItem], dict]: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
502 """Get items from cache, using same arguments as for external Pubsub request""" |
3738
ffa8c8c78115
plugin XEP-0059, cache: allow those plugins to work in component mode
Goffi <goffi@goffi.org>
parents:
3666
diff
changeset
|
503 if extra is None: |
ffa8c8c78115
plugin XEP-0059, cache: allow those plugins to work in component mode
Goffi <goffi@goffi.org>
parents:
3666
diff
changeset
|
504 extra = {} |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
505 if "mam" in extra: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
506 raise NotImplementedError("MAM queries are not supported yet") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
507 if max_items is None and rsm_request is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
508 max_items = 20 |
3759
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
509 pubsub_items, metadata = await self.host.memory.storage.getItems( |
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
510 node, max_items=max_items, item_ids=item_ids, |
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
511 order_by=extra.get(C.KEY_ORDER_BY) |
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
512 ) |
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
513 elif max_items is not None: |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
514 if rsm_request is not None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
515 raise exceptions.InternalError( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
516 "Pubsub max items and RSM must not be used at the same time" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
517 ) |
3759
c4881833cf8a
plugin pubsub cache: more resilient node caching:
Goffi <goffi@goffi.org>
parents:
3738
diff
changeset
|
518 elif item_ids: |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
519 raise exceptions.InternalError( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
520 "Pubsub max items and item IDs must not be used at the same time" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
521 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
522 pubsub_items, metadata = await self.host.memory.storage.getItems( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
523 node, max_items=max_items, order_by=extra.get(C.KEY_ORDER_BY) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
524 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
525 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
526 desc = False |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
527 if rsm_request.before == "": |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
528 before = None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
529 desc = True |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
530 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
531 before = rsm_request.before |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
532 pubsub_items, metadata = await self.host.memory.storage.getItems( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
533 node, max_items=rsm_request.max, before=before, after=rsm_request.after, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
534 from_index=rsm_request.index, order_by=extra.get(C.KEY_ORDER_BY), |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
535 desc=desc, force_rsm=True, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
536 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
537 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
538 return pubsub_items, metadata |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
539 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
540 async def onItemsEvent(self, client, event): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
541 node = await self.host.memory.storage.getPubsubNode( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
542 client, event.sender, event.nodeIdentifier |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
543 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
544 if node is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
545 return |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
546 if node.sync_state in (SyncState.COMPLETED, SyncState.IN_PROGRESS): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
547 items = [] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
548 retract_ids = [] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
549 for elt in event.items: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
550 if elt.name == "item": |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
551 items.append(elt) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
552 elif elt.name == "retract": |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
553 item_id = elt.getAttribute("id") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
554 if not item_id: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
555 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
556 "Ignoring invalid retract item element: " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
557 f"{xml_tools.pFmtElt(elt)}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
558 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
559 continue |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
560 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
561 retract_ids.append(elt["id"]) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
562 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
563 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
564 f"Unexpected Pubsub event element: {xml_tools.pFmtElt(elt)}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
565 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
566 if items: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
567 log.debug("caching new items received from {node}") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
568 await self.cacheItems( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
569 client, node, items |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
570 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
571 if retract_ids: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
572 log.debug(f"deleting retracted items from {node}") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
573 await self.host.memory.storage.deletePubsubItems( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
574 node, items_names=retract_ids |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
575 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
576 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
577 async def onDeleteEvent(self, client, event): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
578 log.debug( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
579 f"deleting node {event.nodeIdentifier} from {event.sender} for " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
580 f"{client.profile}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
581 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
582 await self.host.memory.storage.deletePubsubNode( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
583 [client.profile], [event.sender], [event.nodeIdentifier] |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
584 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
585 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
586 async def onPurgeEvent(self, client, event): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
587 node = await self.host.memory.storage.getPubsubNode( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
588 client, event.sender, event.nodeIdentifier |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
589 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
590 if node is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
591 return |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
592 log.debug(f"purging node {node} for {client.profile}") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
593 await self.host.memory.storage.deletePubsubItems(node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
594 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
595 async def _getItemsTrigger( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
596 self, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
597 client: SatXMPPEntity, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
598 service: Optional[jid.JID], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
599 node: str, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
600 max_items: Optional[int], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
601 item_ids: Optional[List[str]], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
602 sub_id: Optional[str], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
603 rsm_request: Optional[rsm.RSMRequest], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
604 extra: dict |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
605 ) -> Tuple[bool, Optional[Tuple[List[dict], dict]]]: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
606 if not self.use_cache: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
607 log.debug("cache disabled in settings") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
608 return True, None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
609 if extra.get(C.KEY_USE_CACHE) == False: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
610 log.debug("skipping pubsub cache as requested") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
611 return True, None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
612 if service is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
613 service = client.jid.userhostJID() |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
614 pubsub_node = await self.host.memory.storage.getPubsubNode( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
615 client, service, node |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
616 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
617 if pubsub_node is not None and pubsub_node.sync_state == SyncState.COMPLETED: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
618 analyse = {"to_sync": True} |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
619 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
620 analyse = await self.analyseNode(client, service, node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
621 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
622 if pubsub_node is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
623 pubsub_node = await self.host.memory.storage.setPubsubNode( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
624 client, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
625 service, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
626 node, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
627 analyser=analyse.get("name"), |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
628 type_=analyse.get("type"), |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
629 subtype=analyse.get("subtype"), |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
630 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
631 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
632 if analyse.get("to_sync"): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
633 if pubsub_node.sync_state == SyncState.COMPLETED: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
634 if "mam" in extra: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
635 log.debug("MAM caching is not supported yet, skipping cache") |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
636 return True, None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
637 pubsub_items, metadata = await self.getItemsFromCache( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
638 client, pubsub_node, max_items, item_ids, sub_id, rsm_request, extra |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
639 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
640 return False, ([i.data for i in pubsub_items], metadata) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
641 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
642 if pubsub_node.sync_state == SyncState.IN_PROGRESS: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
643 if (service, node) not in self.in_progress: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
644 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
645 f"{pubsub_node} is reported as being cached, but not caching is " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
646 "in progress, this is most probably due to the backend being " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
647 "restarted. Resetting the status, caching will be done again." |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
648 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
649 pubsub_node.sync_state = None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
650 await self.host.memory.storage.deletePubsubItems(pubsub_node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
651 elif time.time() - pubsub_node.sync_state_updated > PROGRESS_DEADLINE: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
652 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
653 f"{pubsub_node} is in progress for too long " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
654 f"({pubsub_node.sync_state_updated//60} minutes), " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
655 "cancelling it and retrying." |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
656 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
657 self.in_progress.pop[(service, node)].cancel() |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
658 pubsub_node.sync_state = None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
659 await self.host.memory.storage.deletePubsubItems(pubsub_node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
660 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
661 log.debug( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
662 f"{pubsub_node} synchronisation is already in progress, skipping" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
663 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
664 if pubsub_node.sync_state is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
665 key = (service, node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
666 if key in self.in_progress: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
667 raise exceptions.InternalError( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
668 f"There is already a caching in progress for {pubsub_node}, this " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
669 "should not happen" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
670 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
671 self.cacheNode(client, pubsub_node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
672 elif pubsub_node.sync_state == SyncState.ERROR: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
673 log.debug( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
674 f"{pubsub_node} synchronisation has previously failed, skipping" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
675 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
676 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
677 return True, None |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
678 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
679 async def _subscribeTrigger( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
680 self, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
681 client: SatXMPPEntity, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
682 service: jid.JID, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
683 nodeIdentifier: str, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
684 sub_jid: Optional[jid.JID], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
685 options: Optional[dict], |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
686 subscription: pubsub.Subscription |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
687 ) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
688 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
689 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
690 async def _unsubscribeTrigger( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
691 self, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
692 client: SatXMPPEntity, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
693 service: jid.JID, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
694 nodeIdentifier: str, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
695 sub_jid, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
696 subscriptionIdentifier, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
697 sender, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
698 ) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
699 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
700 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
701 def _synchronise(self, service, node, profile_key): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
702 client = self.host.getClient(profile_key) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
703 service = client.jid.userhostJID() if not service else jid.JID(service) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
704 return defer.ensureDeferred(self.synchronise(client, service, node)) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
705 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
706 async def synchronise( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
707 self, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
708 client: SatXMPPEntity, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
709 service: jid.JID, |
3760
74f436e856ff
plugin pubsub cache: new `resync` argument to force resynchronisation in `synchronize`
Goffi <goffi@goffi.org>
parents:
3759
diff
changeset
|
710 node: str, |
74f436e856ff
plugin pubsub cache: new `resync` argument to force resynchronisation in `synchronize`
Goffi <goffi@goffi.org>
parents:
3759
diff
changeset
|
711 resync: bool = True |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
712 ) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
713 """Synchronise a node with a pubsub service |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
714 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
715 The node will be synchronised even if there is no matching analyser. |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
716 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
717 Note that when a node is synchronised, it is automatically subscribed. |
3760
74f436e856ff
plugin pubsub cache: new `resync` argument to force resynchronisation in `synchronize`
Goffi <goffi@goffi.org>
parents:
3759
diff
changeset
|
718 @param resync: if True and the node is already synchronised, it will be |
74f436e856ff
plugin pubsub cache: new `resync` argument to force resynchronisation in `synchronize`
Goffi <goffi@goffi.org>
parents:
3759
diff
changeset
|
719 resynchronised (all items will be deleted and re-downloaded). |
74f436e856ff
plugin pubsub cache: new `resync` argument to force resynchronisation in `synchronize`
Goffi <goffi@goffi.org>
parents:
3759
diff
changeset
|
720 |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
721 """ |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
722 pubsub_node = await self.host.memory.storage.getPubsubNode( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
723 client, service, node |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
724 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
725 if pubsub_node is None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
726 log.info( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
727 _( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
728 "Synchronising the new node {node} at {service}" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
729 ).format(node=node, service=service.full) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
730 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
731 analyse = await self.analyseNode(client, service, node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
732 pubsub_node = await self.host.memory.storage.setPubsubNode( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
733 client, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
734 service, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
735 node, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
736 analyser=analyse.get("name"), |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
737 type_=analyse.get("type"), |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
738 ) |
3760
74f436e856ff
plugin pubsub cache: new `resync` argument to force resynchronisation in `synchronize`
Goffi <goffi@goffi.org>
parents:
3759
diff
changeset
|
739 elif not resync: |
74f436e856ff
plugin pubsub cache: new `resync` argument to force resynchronisation in `synchronize`
Goffi <goffi@goffi.org>
parents:
3759
diff
changeset
|
740 # the node exists, nothing to do |
74f436e856ff
plugin pubsub cache: new `resync` argument to force resynchronisation in `synchronize`
Goffi <goffi@goffi.org>
parents:
3759
diff
changeset
|
741 return |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
742 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
743 if ((pubsub_node.sync_state == SyncState.IN_PROGRESS |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
744 or (service, node) in self.in_progress)): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
745 log.warning( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
746 _( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
747 "{node} at {service} is already being synchronised, can't do a new " |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
748 "synchronisation." |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
749 ).format(node=node, service=service) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
750 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
751 else: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
752 log.info( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
753 _( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
754 "(Re)Synchronising the node {node} at {service} on user request" |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
755 ).format(node=node, service=service.full) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
756 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
757 # we first delete and recreate the node (will also delete its items) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
758 await self.host.memory.storage.delete(pubsub_node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
759 analyse = await self.analyseNode(client, service, node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
760 pubsub_node = await self.host.memory.storage.setPubsubNode( |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
761 client, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
762 service, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
763 node, |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
764 analyser=analyse.get("name"), |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
765 type_=analyse.get("type"), |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
766 ) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
767 # then we can put node in cache |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
768 await self.cacheNode(client, pubsub_node) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
769 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
770 async def purge(self, purge_filters: dict) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
771 """Remove items according to filters |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
772 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
773 filters can have on of the following keys, all are optional: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
774 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
775 :services: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
776 list of JIDs of services from which items must be deleted |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
777 :nodes: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
778 list of node names to delete |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
779 :types: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
780 list of node types to delete |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
781 :subtypes: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
782 list of node subtypes to delete |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
783 :profiles: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
784 list of profiles from which items must be deleted |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
785 :created_before: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
786 datetime before which items must have been created to be deleted |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
787 :created_update: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
788 datetime before which items must have been updated last to be deleted |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
789 """ |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
790 purge_filters["names"] = purge_filters.pop("nodes", None) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
791 await self.host.memory.storage.purgePubsubItems(**purge_filters) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
792 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
793 def _purge(self, purge_filters: str) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
794 purge_filters = data_format.deserialise(purge_filters) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
795 for key in "created_before", "updated_before": |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
796 try: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
797 purge_filters[key] = datetime.fromtimestamp(purge_filters[key]) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
798 except (KeyError, TypeError): |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
799 pass |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
800 return defer.ensureDeferred(self.purge(purge_filters)) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
801 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
802 async def reset(self) -> None: |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
803 """Remove ALL nodes and items from cache |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
804 |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
805 After calling this method, cache will be refilled progressively as if it where new |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
806 """ |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
807 await self.host.memory.storage.deletePubsubNode(None, None, None) |
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
808 |
3666
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
809 def _reset(self) -> defer.Deferred: |
3597
5d108ce026d7
plugin pubsub cache: Pubsub Caching implementation
Goffi <goffi@goffi.org>
parents:
diff
changeset
|
810 return defer.ensureDeferred(self.reset()) |
3666
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
811 |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
812 async def search(self, query: dict) -> List[PubsubItem]: |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
813 """Search pubsub items in cache""" |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
814 return await self.host.memory.storage.searchPubsubItems(query) |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
815 |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
816 async def serialisableSearch(self, query: dict) -> List[dict]: |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
817 """Search pubsub items in cache and returns parsed data |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
818 |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
819 The returned data can be serialised. |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
820 |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
821 "pubsub_service" and "pubsub_name" will be added to each data (both as strings) |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
822 """ |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
823 items = await self.search(query) |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
824 ret = [] |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
825 for item in items: |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
826 parsed = item.parsed |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
827 parsed["pubsub_service"] = item.node.service.full() |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
828 parsed["pubsub_node"] = item.node.name |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
829 if query.get("with_payload"): |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
830 parsed["item_payload"] = item.data.toXml() |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
831 parsed["node_profile"] = self.host.memory.storage.getProfileById( |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
832 item.node.profile_id |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
833 ) |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
834 |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
835 ret.append(parsed) |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
836 return ret |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
837 |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
838 def _search(self, query: str) -> defer.Deferred: |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
839 query = data_format.deserialise(query) |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
840 services = query.get("services") |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
841 if services: |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
842 query["services"] = [jid.JID(s) for s in services] |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
843 d = defer.ensureDeferred(self.serialisableSearch(query)) |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
844 d.addCallback(data_format.serialise) |
342e3ddefd23
plugin pubsub cache: implement `psCacheSearch`
Goffi <goffi@goffi.org>
parents:
3619
diff
changeset
|
845 return d |