comparison sat/memory/memory.py @ 3777:001ea5f4a2f9

core: method to know if a profile/entity is an admin: the new `is_admin` SatXMPPEntity property tells if it's an administrator. Admin JIDs are retrieved on init, so they can be looked up when the profile is not available (notably when a component handles a request and has only a JID available). The new `memory.isAdminJID` method is then used.
author Goffi <goffi@goffi.org>
date Sun, 15 May 2022 14:14:52 +0200
parents 71516731d0aa
children 580df5b557be
comparison
equal deleted inserted replaced
3776:4d333f249625 3777:001ea5f4a2f9
237 self.subscriptions = {} 237 self.subscriptions = {}
238 self.auth_sessions = PasswordSessions() # remember the authenticated profiles 238 self.auth_sessions = PasswordSessions() # remember the authenticated profiles
239 self.disco = Discovery(host) 239 self.disco = Discovery(host)
240 self.config = tools_config.parseMainConf(log_filenames=True) 240 self.config = tools_config.parseMainConf(log_filenames=True)
241 self._cache_path = Path(self.getConfig("", "local_dir"), C.CACHE_DIR) 241 self._cache_path = Path(self.getConfig("", "local_dir"), C.CACHE_DIR)
242 self.admins = self.getConfig("", "admins_list", [])
243 self.admin_jids = set()
244
242 245
243 async def initialise(self): 246 async def initialise(self):
244 self.storage = Storage() 247 self.storage = Storage()
245 await self.storage.initialise() 248 await self.storage.initialise()
246 PersistentDict.storage = self.storage 249 PersistentDict.storage = self.storage
249 self.params.load_default_params() 252 self.params.load_default_params()
250 await self.load() 253 await self.load()
251 self.memory_data = PersistentDict("memory") 254 self.memory_data = PersistentDict("memory")
252 await self.memory_data.load() 255 await self.memory_data.load()
253 await self.disco.load() 256 await self.disco.load()
257 for admin in self.admins:
258 try:
259 admin_jid_s = await self.asyncGetParamA(
260 "JabberID", "Connection", profile_key=admin
261 )
262 except Exception as e:
263 log.warning(f"Can't retrieve jid of admin {admin!r}: {e}")
264 else:
265 if admin_jid_s is not None:
266 try:
267 admin_jid = jid.JID(admin_jid_s).userhostJID()
268 except RuntimeError:
269 log.warning(f"Invalid JID for admin {admin}: {admin_jid_s}")
270 else:
271 self.admin_jids.add(admin_jid)
254 272
255 273
256 ## Configuration ## 274 ## Configuration ##
257 275
258 def getConfig(self, section, name, default=None): 276 def getConfig(self, section, name, default=None):
1847 presence_data = self.getEntityDatum(client, entity_jid, "presence") 1865 presence_data = self.getEntityDatum(client, entity_jid, "presence")
1848 except KeyError: 1866 except KeyError:
1849 log.debug("No presence information for {}".format(entity_jid)) 1867 log.debug("No presence information for {}".format(entity_jid))
1850 return False 1868 return False
1851 return presence_data.show != C.PRESENCE_UNAVAILABLE 1869 return presence_data.show != C.PRESENCE_UNAVAILABLE
1870
1871 def isAdmin(self, profile: str) -> bool:
1872 """Tell if given profile has administrator privileges"""
1873 return profile in self.admins
1874
1875 def isAdminJID(self, entity: jid.JID) -> bool:
1876 """Tells if an entity jid correspond to an admin one
1877
1878 It is sometime not possible to use the profile alone to check if an entity is an
1879 admin (e.g. a request managed by a component). In this case we check if the JID
1880 correspond to an admin profile
1881 """
1882 return entity.userhostJID() in self.admin_jids