comparison sat_pubsub/privilege.py @ 467:d86e0f8a1405

privilege: store roster cache in database: - rosters are now stored in the database and restored on startup. This way, the presence map can be restored without needing to wait for all contacts to send presence again - roster versions are checked: if a new version is received, the presence map is updated accordingly - rosters are not retrieved again if presences are received within too short a delay (see ROSTER_TTL), to avoid using too many resources if a client connects/disconnects a lot. The current behaviour works around XEP-0356 limitations. An update of the XEP will be needed to get roster pushes and roster versions.
author Goffi <goffi@goffi.org>
date Fri, 15 Oct 2021 15:30:18 +0200
parents f520ac3164b0
children a549c8e17827
comparison
equal deleted inserted replaced
466:0d38c3529972 467:d86e0f8a1405
18 18
19 "This module implements XEP-0356 (Privileged Entity) to manage rosters, messages and " 19 "This module implements XEP-0356 (Privileged Entity) to manage rosters, messages and "
20 "presences" 20 "presences"
21 21
22 import time 22 import time
23 from typing import List, Set 23 from typing import Optional, Dict, List, Set
24 from datetime import datetime, timezone
24 from wokkel import xmppim 25 from wokkel import xmppim
25 from wokkel.compat import IQ 26 from wokkel.compat import IQ
26 from wokkel import pubsub 27 from wokkel import pubsub
27 from wokkel import disco 28 from wokkel import disco
28 from wokkel.iwokkel import IPubSubService 29 from wokkel.iwokkel import IPubSubService
46 PERM_ROSTER:ALLOWED_ROSTER, 47 PERM_ROSTER:ALLOWED_ROSTER,
47 PERM_MESSAGE:ALLOWED_MESSAGE, 48 PERM_MESSAGE:ALLOWED_MESSAGE,
48 PERM_PRESENCE:ALLOWED_PRESENCE 49 PERM_PRESENCE:ALLOWED_PRESENCE
49 } 50 }
50 51
52 # Number of seconds before a roster cache is not considered valid anymore.
53 # We keep this delay to avoid requesting roster too much in a row if an entity is
54 # connecting/disconnecting often in a short time.
55 ROSTER_TTL = 3600
56
57
58 Roster = Dict[jid.JID, xmppim.RosterItem]
59
51 60
52 class InvalidStanza(Exception): 61 class InvalidStanza(Exception):
53 pass 62 pass
54 63
55 class NotAllowedError(Exception): 64 class NotAllowedError(Exception):
68 self._pubsub_service = None 77 self._pubsub_service = None
69 self.caps_map = {} # key: bare jid, value: dict of resources with caps hash 78 self.caps_map = {} # key: bare jid, value: dict of resources with caps hash
70 # key: (hash,version), value: dict with DiscoInfo instance (infos) and nodes to 79 # key: (hash,version), value: dict with DiscoInfo instance (infos) and nodes to
71 # notify (notify) 80 # notify (notify)
72 self.hash_map = {} 81 self.hash_map = {}
73 self.roster_cache = {} # key: jid, value: dict with "timestamp" and "roster" 82 # dict which will be filled from database once connection is initialized,
83 # key: jid, value: dict with "timestamp" and "roster"
84 self.roster_cache = None
74 # key: jid, value: set of entities who need to receive a notification when we 85 # key: jid, value: set of entities who need to receive a notification when we
75 # get a presence from them. All entities in value have a presence subscription 86 # get a presence from them. All entities in value have a presence subscription
76 # to the key entity. 87 # to the key entity.
77 self.presence_map = {} 88 self.presence_map = {}
78 # resource currently online 89 # resource currently online
79 self.presences = set() 90 self.presences = set()
80 91
81 @property 92 @property
82 def permissions(self): 93 def permissions(self):
83 return self._permissions 94 return self._permissions
95
96 async def getRosterCacheFromDB(self):
97 rows = await self.backend.storage.getRosterCache()
98 for __, owner_jid, version, timestamp, roster_elt in rows:
99 roster = self.getRosterFromElement(roster_elt)
100 self.roster_cache[owner_jid] = {
101 "timestamp": timestamp,
102 "roster": roster,
103 "version": version
104 }
105 self.updatePresenceMap(owner_jid, roster, None)
84 106
85 def connectionInitialized(self): 107 def connectionInitialized(self):
86 for handler in self.parent.handlers: 108 for handler in self.parent.handlers:
87 if IPubSubService.providedBy(handler): 109 if IPubSubService.providedBy(handler):
88 self._pubsub_service = handler 110 self._pubsub_service = handler
89 break 111 break
90 self.backend = self.parent.parent.getServiceNamed('backend') 112 self.backend = self.parent.parent.getServiceNamed('backend')
91 self.xmlstream.addObserver(PRIV_ENT_ADV_XPATH, self.onAdvertise) 113 self.xmlstream.addObserver(PRIV_ENT_ADV_XPATH, self.onAdvertise)
92 self.xmlstream.addObserver('/presence', self._onPresence) 114 self.xmlstream.addObserver('/presence', self._onPresence)
115 if self.roster_cache is None:
116 self.roster_cache = {}
117 defer.ensureDeferred(self.getRosterCacheFromDB())
93 118
94 def onAdvertise(self, message): 119 def onAdvertise(self, message):
95 """Managage the <message/> advertising privileges 120 """Managage the <message/> advertising privileges
96 121
97 self._permissions will be updated according to advertised privileged 122 self._permissions will be updated according to advertised privileged
126 .format(**self._permissions) 151 .format(**self._permissions)
127 ) 152 )
128 153
129 ## roster ## 154 ## roster ##
130 155
131 def getRoster(self, to_jid): 156 def updatePresenceMap(
132 """ 157 self,
133 Retrieve contact list. 158 owner_jid: jid.JID,
134 159 roster: Roster,
135 @return: Roster as a mapping from L{JID} to L{RosterItem}. 160 old_roster: Optional[Roster]
136 @rtype: L{twisted.internet.defer.Deferred} 161 ) -> None:
137 """ 162 """Update ``self.presence_map`` from roster
138 # TODO: cache results 163
164 @param owner_jid: jid of the owner of the roster
165 @param roster: roster dict as returned by self.getRoster
166 @param old_roster: previously cached roster if any
167 """
168 if old_roster is not None:
169 # we check if presence subscription have not been removed and update
170 # presence_map accordingly
171 for roster_jid, roster_item in old_roster.items():
172 if ((roster_item.subscriptionFrom
173 and (roster_jid not in roster
174 or not roster[roster_jid].subscriptionFrom)
175 )):
176 try:
177 self.presence_map[roster_jid].discard(owner_jid)
178 except KeyError:
179 pass
180 if ((roster_item.subscriptionTo
181 and (roster_jid not in roster
182 or not roster[roster_jid].subscriptionTo)
183 )):
184 try:
185 self.presence_map[owner_jid].discard(roster_jid)
186 except KeyError:
187 pass
188
189 for roster_jid, roster_item in roster.items():
190 if roster_item.subscriptionFrom:
191 # we need to know who is subscribed to our user, to send them
192 # notifications when they send presence to us
193 self.presence_map.setdefault(roster_jid, set()).add(owner_jid)
194 if ((roster_item.subscriptionTo
195 and jid.JID(roster_jid.host) == self.backend.server_jid)):
196 # we also need to know who on this server we are subscribed to, so
197 # we can get their notifications even if they didn't connect so far.
198 self.presence_map.setdefault(owner_jid, set()).add(roster_jid)
199
200 def serialiseRoster(
201 self,
202 roster: Roster,
203 version: Optional[str] = None
204 ) -> domish.Element:
205 """Reconstruct Query element of the roster"""
206 roster_elt = domish.Element((ROSTER_NS, "query"))
207 if version:
208 roster_elt["ver"] = version
209 for item in roster.values():
210 roster_elt.addChild(item.toElement())
211 return roster_elt
212
213 async def updateRosterCache(
214 self,
215 owner_jid: jid.JID,
216 roster: Roster,
217 version: str
218 ) -> None:
219 """Update local roster cache and database"""
220 now = time.time()
221 self.roster_cache[owner_jid] = {
222 'timestamp': now,
223 'roster': roster,
224 'version': version
225 }
226 roster_elt = self.serialiseRoster(roster, version)
227 await self.backend.storage.setRosterCache(
228 owner_jid, version, now, roster_elt
229 )
230
231 def getRosterFromElement(self, query_elt: domish.Element) -> Roster:
232 """Parse roster query result payload to get a Roster dict"""
233 roster = {}
234 for element in query_elt.elements(ROSTER_NS, 'item'):
235 item = xmppim.RosterItem.fromElement(element)
236 roster[item.entity] = item
237 return roster
238
239 async def getRoster(self, to_jid: jid.JID) -> Roster:
240 """Retrieve contact list.
241
242 @param to_jid: jid of the entity owning the roster
243 @return: roster data
244 """
139 if self._permissions[PERM_ROSTER] not in ('get', 'both'): 245 if self._permissions[PERM_ROSTER] not in ('get', 'both'):
140 log.msg("WARNING: permission not allowed to get roster") 246 log.msg("WARNING: permission not allowed to get roster")
141 raise failure.Failure(NotAllowedError('roster get is not allowed')) 247 raise failure.Failure(NotAllowedError('roster get is not allowed'))
142 248
143 def processRoster(result):
144 roster = {}
145 for element in result.query.elements(ROSTER_NS, 'item'):
146 item = xmppim.RosterItem.fromElement(element)
147 roster[item.entity] = item
148
149 return roster
150
151 iq = IQ(self.xmlstream, 'get') 249 iq = IQ(self.xmlstream, 'get')
152 iq.addElement((ROSTER_NS, 'query')) 250 iq.addElement((ROSTER_NS, 'query'))
153 iq["to"] = to_jid.userhost() 251 iq["to"] = to_jid.userhost()
154 d = iq.send() 252 iq_result = await iq.send()
155 d.addCallback(processRoster) 253 roster = self.getRosterFromElement(iq_result.query)
156 return d 254
157 255 version = iq_result.query.getAttribute('ver')
158 def _isSubscribedFrom(self, roster, entity, roster_owner_jid): 256 cached_roster = self.roster_cache.get("to_jid")
257 if not cached_roster:
258 self.updatePresenceMap(to_jid, roster, None)
259 await self.updateRosterCache(to_jid, roster, version)
260 else:
261 # we already have a roster in cache, we have to check it if the new one is
262 # modified, and update presence_map and database
263 if version:
264 if cached_roster["version"] != version:
265 self.updatePresenceMap(to_jid, roster, cached_roster["roster"])
266 await self.updateRosterCache(to_jid, roster, version)
267 else:
268 cached_roster["timestamp"] = time.time()
269 else:
270 # no version available, we have to compare the whole XML
271 if ((self.serialiseRoster(cached_roster["roster"]).toXml() !=
272 self.serialiseRoster(roster))):
273 self.updatePresenceMap(to_jid, roster, cached_roster["roster"])
274 await self.updateRosterCache(to_jid, roster, version)
275 else:
276 cached_roster["timestamp"] = time.time()
277
278 return roster
279
280 async def isSubscribedFrom(self, entity: jid.JID, roster_owner_jid: jid.JID) -> bool:
281 """Check if entity has presence subscription from roster_owner_jid
282
283 @param entity: entity to check subscription to
284 @param roster_owner_jid: owner of the roster to check
285 @return: True if entity has a subscription from roster_owner_jid
286 """
287 roster = await self.getRoster(roster_owner_jid)
159 try: 288 try:
160 return roster[entity.userhostJID()].subscriptionFrom 289 return roster[entity.userhostJID()].subscriptionFrom
161 except KeyError: 290 except KeyError:
162 return False 291 return False
163
164 def isSubscribedFrom(self, entity, roster_owner_jid):
165 """Check if entity has presence subscription from roster_owner_jid
166
167 @param entity(jid.JID): entity to check subscription to
168 @param roster_owner_jid(jid.JID): owner of the roster to check
169 @return D(bool): True if entity has a subscription from roster_owner_jid
170 """
171 d = self.getRoster(roster_owner_jid)
172 d.addCallback(self._isSubscribedFrom, entity, roster_owner_jid)
173 return d
174 292
175 ## message ## 293 ## message ##
176 294
177 def sendMessage(self, priv_message, to_jid=None): 295 def sendMessage(self, priv_message, to_jid=None):
178 """Send privileged message (in the name of the server) 296 """Send privileged message (in the name of the server)
255 373
256 async def onPresence(self, presence_elt: domish.Element) -> None: 374 async def onPresence(self, presence_elt: domish.Element) -> None:
257 from_jid = jid.JID(presence_elt['from']) 375 from_jid = jid.JID(presence_elt['from'])
258 from_jid_bare = from_jid.userhostJID() 376 from_jid_bare = from_jid.userhostJID()
259 if ((jid.JID(from_jid.host) == self.backend.server_jid 377 if ((jid.JID(from_jid.host) == self.backend.server_jid
260 and from_jid_bare not in self.roster_cache)): 378 and (
261 roster = await self.getRoster(from_jid_bare) 379 from_jid_bare not in self.roster_cache
262 timestamp = time.time() 380 or time.time()-self.roster_cache[from_jid_bare]["timestamp"]>ROSTER_TTL
263 self.roster_cache[from_jid_bare] = {'timestamp': timestamp, 381 ))):
264 'roster': roster, 382 roster = await self.getRoster(from_jid)
265 }
266 for roster_jid, roster_item in roster.items():
267 if roster_item.subscriptionFrom:
268 # we need to know who is subscribed to our user, to send them
269 # notifications when they send presence to us
270 self.presence_map.setdefault(roster_jid, set()).add(from_jid_bare)
271 if ((roster_item.subscriptionTo
272 and jid.JID(roster_jid.host) == self.backend.server_jid)):
273 # we also need to know who on this server we are subscribed to, so
274 # we can get their notifications even if they didn't connect so far.
275 self.presence_map.setdefault(from_jid_bare, set()).add(roster_jid)
276 383
277 presence_type = presence_elt.getAttribute('type') 384 presence_type = presence_elt.getAttribute('type')
278 if presence_type == "unavailable": 385 if presence_type == "unavailable":
279 self.presences.discard(from_jid) 386 self.presences.discard(from_jid)
280 elif from_jid not in self.presences: 387 elif from_jid not in self.presences: