Mercurial > libervia-pubsub
comparison sat_pubsub/privilege.py @ 467:d86e0f8a1405
privilege: store roster cache in database:
- rosters are now stored in the database and restored on startup. This way, the presence map can
be restored without having to wait for all contacts to send their presence again
- roster versions are checked: if a new version is received, the presence map is updated
accordingly
- rosters are not retrieved again if presences are received within too short a delay (see ROSTER_TTL),
to avoid using too many resources if a client connects/disconnects a lot
The current behaviour works around XEP-0356 limitations. An update of the XEP will be
needed to get roster pushes and roster version.
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 15 Oct 2021 15:30:18 +0200 |
parents | f520ac3164b0 |
children | a549c8e17827 |
comparison
equal
deleted
inserted
replaced
466:0d38c3529972 | 467:d86e0f8a1405 |
---|---|
18 | 18 |
19 "This module implements XEP-0356 (Privileged Entity) to manage rosters, messages and " | 19 "This module implements XEP-0356 (Privileged Entity) to manage rosters, messages and " |
20 "presences" | 20 "presences" |
21 | 21 |
22 import time | 22 import time |
23 from typing import List, Set | 23 from typing import Optional, Dict, List, Set |
24 from datetime import datetime, timezone | |
24 from wokkel import xmppim | 25 from wokkel import xmppim |
25 from wokkel.compat import IQ | 26 from wokkel.compat import IQ |
26 from wokkel import pubsub | 27 from wokkel import pubsub |
27 from wokkel import disco | 28 from wokkel import disco |
28 from wokkel.iwokkel import IPubSubService | 29 from wokkel.iwokkel import IPubSubService |
46 PERM_ROSTER:ALLOWED_ROSTER, | 47 PERM_ROSTER:ALLOWED_ROSTER, |
47 PERM_MESSAGE:ALLOWED_MESSAGE, | 48 PERM_MESSAGE:ALLOWED_MESSAGE, |
48 PERM_PRESENCE:ALLOWED_PRESENCE | 49 PERM_PRESENCE:ALLOWED_PRESENCE |
49 } | 50 } |
50 | 51 |
52 # Number of seconds before a roster cache is not considered valid anymore. | |
53 # We keep this delay to avoid requesting roster too much in a row if an entity is | |
54 # connecting/disconnecting often in a short time. | |
55 ROSTER_TTL = 3600 | |
56 | |
57 | |
58 Roster = Dict[jid.JID, xmppim.RosterItem] | |
59 | |
51 | 60 |
52 class InvalidStanza(Exception): | 61 class InvalidStanza(Exception): |
53 pass | 62 pass |
54 | 63 |
55 class NotAllowedError(Exception): | 64 class NotAllowedError(Exception): |
68 self._pubsub_service = None | 77 self._pubsub_service = None |
69 self.caps_map = {} # key: bare jid, value: dict of resources with caps hash | 78 self.caps_map = {} # key: bare jid, value: dict of resources with caps hash |
70 # key: (hash,version), value: dict with DiscoInfo instance (infos) and nodes to | 79 # key: (hash,version), value: dict with DiscoInfo instance (infos) and nodes to |
71 # notify (notify) | 80 # notify (notify) |
72 self.hash_map = {} | 81 self.hash_map = {} |
73 self.roster_cache = {} # key: jid, value: dict with "timestamp" and "roster" | 82 # dict which will be filled from database once connection is initialized, |
83 # key: jid, value: dict with "timestamp" and "roster" | |
84 self.roster_cache = None | |
74 # key: jid, value: set of entities who need to receive a notification when we | 85 # key: jid, value: set of entities who need to receive a notification when we |
75 # get a presence from them. All entities in value have a presence subscription | 86 # get a presence from them. All entities in value have a presence subscription |
76 # to the key entity. | 87 # to the key entity. |
77 self.presence_map = {} | 88 self.presence_map = {} |
78 # resource currently online | 89 # resource currently online |
79 self.presences = set() | 90 self.presences = set() |
80 | 91 |
81 @property | 92 @property |
82 def permissions(self): | 93 def permissions(self): |
83 return self._permissions | 94 return self._permissions |
95 | |
96 async def getRosterCacheFromDB(self): | |
97 rows = await self.backend.storage.getRosterCache() | |
98 for __, owner_jid, version, timestamp, roster_elt in rows: | |
99 roster = self.getRosterFromElement(roster_elt) | |
100 self.roster_cache[owner_jid] = { | |
101 "timestamp": timestamp, | |
102 "roster": roster, | |
103 "version": version | |
104 } | |
105 self.updatePresenceMap(owner_jid, roster, None) | |
84 | 106 |
85 def connectionInitialized(self): | 107 def connectionInitialized(self): |
86 for handler in self.parent.handlers: | 108 for handler in self.parent.handlers: |
87 if IPubSubService.providedBy(handler): | 109 if IPubSubService.providedBy(handler): |
88 self._pubsub_service = handler | 110 self._pubsub_service = handler |
89 break | 111 break |
90 self.backend = self.parent.parent.getServiceNamed('backend') | 112 self.backend = self.parent.parent.getServiceNamed('backend') |
91 self.xmlstream.addObserver(PRIV_ENT_ADV_XPATH, self.onAdvertise) | 113 self.xmlstream.addObserver(PRIV_ENT_ADV_XPATH, self.onAdvertise) |
92 self.xmlstream.addObserver('/presence', self._onPresence) | 114 self.xmlstream.addObserver('/presence', self._onPresence) |
115 if self.roster_cache is None: | |
116 self.roster_cache = {} | |
117 defer.ensureDeferred(self.getRosterCacheFromDB()) | |
93 | 118 |
94 def onAdvertise(self, message): | 119 def onAdvertise(self, message): |
95 """Manage the <message/> advertising privileges | 120 """Manage the <message/> advertising privileges |
96 | 121 |
97 self._permissions will be updated according to advertised privileged | 122 self._permissions will be updated according to advertised privileged |
126 .format(**self._permissions) | 151 .format(**self._permissions) |
127 ) | 152 ) |
128 | 153 |
129 ## roster ## | 154 ## roster ## |
130 | 155 |
131 def getRoster(self, to_jid): | 156 def updatePresenceMap( |
132 """ | 157 self, |
133 Retrieve contact list. | 158 owner_jid: jid.JID, |
134 | 159 roster: Roster, |
135 @return: Roster as a mapping from L{JID} to L{RosterItem}. | 160 old_roster: Optional[Roster] |
136 @rtype: L{twisted.internet.defer.Deferred} | 161 ) -> None: |
137 """ | 162 """Update ``self.presence_map`` from roster |
138 # TODO: cache results | 163 |
164 @param owner_jid: jid of the owner of the roster | |
165 @param roster: roster dict as returned by self.getRoster | |
166 @param old_roster: previously cached roster if any | |
167 """ | |
168 if old_roster is not None: | |
169 # we check if presence subscription have not been removed and update | |
170 # presence_map accordingly | |
171 for roster_jid, roster_item in old_roster.items(): | |
172 if ((roster_item.subscriptionFrom | |
173 and (roster_jid not in roster | |
174 or not roster[roster_jid].subscriptionFrom) | |
175 )): | |
176 try: | |
177 self.presence_map[roster_jid].discard(owner_jid) | |
178 except KeyError: | |
179 pass | |
180 if ((roster_item.subscriptionTo | |
181 and (roster_jid not in roster | |
182 or not roster[roster_jid].subscriptionTo) | |
183 )): | |
184 try: | |
185 self.presence_map[owner_jid].discard(roster_jid) | |
186 except KeyError: | |
187 pass | |
188 | |
189 for roster_jid, roster_item in roster.items(): | |
190 if roster_item.subscriptionFrom: | |
191 # we need to know who is subscribed to our user, to send them | |
192 # notifications when they send presence to us | |
193 self.presence_map.setdefault(roster_jid, set()).add(owner_jid) | |
194 if ((roster_item.subscriptionTo | |
195 and jid.JID(roster_jid.host) == self.backend.server_jid)): | |
196 # we also need to know who on this server we are subscribed to, so | |
197 # we can get their notifications even if they didn't connect so far. | |
198 self.presence_map.setdefault(owner_jid, set()).add(roster_jid) | |
199 | |
200 def serialiseRoster( | |
201 self, | |
202 roster: Roster, | |
203 version: Optional[str] = None | |
204 ) -> domish.Element: | |
205 """Reconstruct Query element of the roster""" | |
206 roster_elt = domish.Element((ROSTER_NS, "query")) | |
207 if version: | |
208 roster_elt["ver"] = version | |
209 for item in roster.values(): | |
210 roster_elt.addChild(item.toElement()) | |
211 return roster_elt | |
212 | |
213 async def updateRosterCache( | |
214 self, | |
215 owner_jid: jid.JID, | |
216 roster: Roster, | |
217 version: str | |
218 ) -> None: | |
219 """Update local roster cache and database""" | |
220 now = time.time() | |
221 self.roster_cache[owner_jid] = { | |
222 'timestamp': now, | |
223 'roster': roster, | |
224 'version': version | |
225 } | |
226 roster_elt = self.serialiseRoster(roster, version) | |
227 await self.backend.storage.setRosterCache( | |
228 owner_jid, version, now, roster_elt | |
229 ) | |
230 | |
231 def getRosterFromElement(self, query_elt: domish.Element) -> Roster: | |
232 """Parse roster query result payload to get a Roster dict""" | |
233 roster = {} | |
234 for element in query_elt.elements(ROSTER_NS, 'item'): | |
235 item = xmppim.RosterItem.fromElement(element) | |
236 roster[item.entity] = item | |
237 return roster | |
238 | |
239 async def getRoster(self, to_jid: jid.JID) -> Roster: | |
240 """Retrieve contact list. | |
241 | |
242 @param to_jid: jid of the entity owning the roster | |
243 @return: roster data | |
244 """ | |
139 if self._permissions[PERM_ROSTER] not in ('get', 'both'): | 245 if self._permissions[PERM_ROSTER] not in ('get', 'both'): |
140 log.msg("WARNING: permission not allowed to get roster") | 246 log.msg("WARNING: permission not allowed to get roster") |
141 raise failure.Failure(NotAllowedError('roster get is not allowed')) | 247 raise failure.Failure(NotAllowedError('roster get is not allowed')) |
142 | 248 |
143 def processRoster(result): | |
144 roster = {} | |
145 for element in result.query.elements(ROSTER_NS, 'item'): | |
146 item = xmppim.RosterItem.fromElement(element) | |
147 roster[item.entity] = item | |
148 | |
149 return roster | |
150 | |
151 iq = IQ(self.xmlstream, 'get') | 249 iq = IQ(self.xmlstream, 'get') |
152 iq.addElement((ROSTER_NS, 'query')) | 250 iq.addElement((ROSTER_NS, 'query')) |
153 iq["to"] = to_jid.userhost() | 251 iq["to"] = to_jid.userhost() |
154 d = iq.send() | 252 iq_result = await iq.send() |
155 d.addCallback(processRoster) | 253 roster = self.getRosterFromElement(iq_result.query) |
156 return d | 254 |
157 | 255 version = iq_result.query.getAttribute('ver') |
158 def _isSubscribedFrom(self, roster, entity, roster_owner_jid): | 256 cached_roster = self.roster_cache.get("to_jid") |
257 if not cached_roster: | |
258 self.updatePresenceMap(to_jid, roster, None) | |
259 await self.updateRosterCache(to_jid, roster, version) | |
260 else: | |
261 # we already have a roster in cache, we have to check it if the new one is | |
262 # modified, and update presence_map and database | |
263 if version: | |
264 if cached_roster["version"] != version: | |
265 self.updatePresenceMap(to_jid, roster, cached_roster["roster"]) | |
266 await self.updateRosterCache(to_jid, roster, version) | |
267 else: | |
268 cached_roster["timestamp"] = time.time() | |
269 else: | |
270 # no version available, we have to compare the whole XML | |
271 if ((self.serialiseRoster(cached_roster["roster"]).toXml() != | |
272 self.serialiseRoster(roster))): | |
273 self.updatePresenceMap(to_jid, roster, cached_roster["roster"]) | |
274 await self.updateRosterCache(to_jid, roster, version) | |
275 else: | |
276 cached_roster["timestamp"] = time.time() | |
277 | |
278 return roster | |
279 | |
280 async def isSubscribedFrom(self, entity: jid.JID, roster_owner_jid: jid.JID) -> bool: | |
281 """Check if entity has presence subscription from roster_owner_jid | |
282 | |
283 @param entity: entity to check subscription to | |
284 @param roster_owner_jid: owner of the roster to check | |
285 @return: True if entity has a subscription from roster_owner_jid | |
286 """ | |
287 roster = await self.getRoster(roster_owner_jid) | |
159 try: | 288 try: |
160 return roster[entity.userhostJID()].subscriptionFrom | 289 return roster[entity.userhostJID()].subscriptionFrom |
161 except KeyError: | 290 except KeyError: |
162 return False | 291 return False |
163 | |
164 def isSubscribedFrom(self, entity, roster_owner_jid): | |
165 """Check if entity has presence subscription from roster_owner_jid | |
166 | |
167 @param entity(jid.JID): entity to check subscription to | |
168 @param roster_owner_jid(jid.JID): owner of the roster to check | |
169 @return D(bool): True if entity has a subscription from roster_owner_jid | |
170 """ | |
171 d = self.getRoster(roster_owner_jid) | |
172 d.addCallback(self._isSubscribedFrom, entity, roster_owner_jid) | |
173 return d | |
174 | 292 |
175 ## message ## | 293 ## message ## |
176 | 294 |
177 def sendMessage(self, priv_message, to_jid=None): | 295 def sendMessage(self, priv_message, to_jid=None): |
178 """Send privileged message (in the name of the server) | 296 """Send privileged message (in the name of the server) |
255 | 373 |
256 async def onPresence(self, presence_elt: domish.Element) -> None: | 374 async def onPresence(self, presence_elt: domish.Element) -> None: |
257 from_jid = jid.JID(presence_elt['from']) | 375 from_jid = jid.JID(presence_elt['from']) |
258 from_jid_bare = from_jid.userhostJID() | 376 from_jid_bare = from_jid.userhostJID() |
259 if ((jid.JID(from_jid.host) == self.backend.server_jid | 377 if ((jid.JID(from_jid.host) == self.backend.server_jid |
260 and from_jid_bare not in self.roster_cache)): | 378 and ( |
261 roster = await self.getRoster(from_jid_bare) | 379 from_jid_bare not in self.roster_cache |
262 timestamp = time.time() | 380 or time.time()-self.roster_cache[from_jid_bare]["timestamp"]>ROSTER_TTL |
263 self.roster_cache[from_jid_bare] = {'timestamp': timestamp, | 381 ))): |
264 'roster': roster, | 382 roster = await self.getRoster(from_jid) |
265 } | |
266 for roster_jid, roster_item in roster.items(): | |
267 if roster_item.subscriptionFrom: | |
268 # we need to know who is subscribed to our user, to send them | |
269 # notifications when they send presence to us | |
270 self.presence_map.setdefault(roster_jid, set()).add(from_jid_bare) | |
271 if ((roster_item.subscriptionTo | |
272 and jid.JID(roster_jid.host) == self.backend.server_jid)): | |
273 # we also need to know who on this server we are subscribed to, so | |
274 # we can get their notifications even if they didn't connect so far. | |
275 self.presence_map.setdefault(from_jid_bare, set()).add(roster_jid) | |
276 | 383 |
277 presence_type = presence_elt.getAttribute('type') | 384 presence_type = presence_elt.getAttribute('type') |
278 if presence_type == "unavailable": | 385 if presence_type == "unavailable": |
279 self.presences.discard(from_jid) | 386 self.presences.discard(from_jid) |
280 elif from_jid not in self.presences: | 387 elif from_jid not in self.presences: |