Mercurial > libervia-backend
comparison sat/memory/memory.py @ 3777:001ea5f4a2f9
core: method to know if a profile/entity is an admin:
the new `is_admin` SatXMPPEntity property tells whether the entity is an administrator.
Admin JIDs are retrieved on init, so they can be looked up when the profile is not available
(notably when a component handles a request and has only a JID available). The new
`memory.isAdminJID` method is then used in that case.
author | Goffi <goffi@goffi.org> |
---|---|
date | Sun, 15 May 2022 14:14:52 +0200 |
parents | 71516731d0aa |
children | 580df5b557be |
comparison
equal
deleted
inserted
replaced
3776:4d333f249625 | 3777:001ea5f4a2f9 |
---|---|
237 self.subscriptions = {} | 237 self.subscriptions = {} |
238 self.auth_sessions = PasswordSessions() # remember the authenticated profiles | 238 self.auth_sessions = PasswordSessions() # remember the authenticated profiles |
239 self.disco = Discovery(host) | 239 self.disco = Discovery(host) |
240 self.config = tools_config.parseMainConf(log_filenames=True) | 240 self.config = tools_config.parseMainConf(log_filenames=True) |
241 self._cache_path = Path(self.getConfig("", "local_dir"), C.CACHE_DIR) | 241 self._cache_path = Path(self.getConfig("", "local_dir"), C.CACHE_DIR) |
242 self.admins = self.getConfig("", "admins_list", []) | |
243 self.admin_jids = set() | |
244 | |
242 | 245 |
243 async def initialise(self): | 246 async def initialise(self): |
244 self.storage = Storage() | 247 self.storage = Storage() |
245 await self.storage.initialise() | 248 await self.storage.initialise() |
246 PersistentDict.storage = self.storage | 249 PersistentDict.storage = self.storage |
249 self.params.load_default_params() | 252 self.params.load_default_params() |
250 await self.load() | 253 await self.load() |
251 self.memory_data = PersistentDict("memory") | 254 self.memory_data = PersistentDict("memory") |
252 await self.memory_data.load() | 255 await self.memory_data.load() |
253 await self.disco.load() | 256 await self.disco.load() |
257 for admin in self.admins: | |
258 try: | |
259 admin_jid_s = await self.asyncGetParamA( | |
260 "JabberID", "Connection", profile_key=admin | |
261 ) | |
262 except Exception as e: | |
263 log.warning(f"Can't retrieve jid of admin {admin!r}: {e}") | |
264 else: | |
265 if admin_jid_s is not None: | |
266 try: | |
267 admin_jid = jid.JID(admin_jid_s).userhostJID() | |
268 except RuntimeError: | |
269 log.warning(f"Invalid JID for admin {admin}: {admin_jid_s}") | |
270 else: | |
271 self.admin_jids.add(admin_jid) | |
254 | 272 |
255 | 273 |
256 ## Configuration ## | 274 ## Configuration ## |
257 | 275 |
258 def getConfig(self, section, name, default=None): | 276 def getConfig(self, section, name, default=None): |
1847 presence_data = self.getEntityDatum(client, entity_jid, "presence") | 1865 presence_data = self.getEntityDatum(client, entity_jid, "presence") |
1848 except KeyError: | 1866 except KeyError: |
1849 log.debug("No presence information for {}".format(entity_jid)) | 1867 log.debug("No presence information for {}".format(entity_jid)) |
1850 return False | 1868 return False |
1851 return presence_data.show != C.PRESENCE_UNAVAILABLE | 1869 return presence_data.show != C.PRESENCE_UNAVAILABLE |
1870 | |
1871 def isAdmin(self, profile: str) -> bool: | |
1872 """Tell if given profile has administrator privileges""" | |
1873 return profile in self.admins | |
1874 | |
1875 def isAdminJID(self, entity: jid.JID) -> bool: | |
1876 """Tells if an entity jid correspond to an admin one | |
1877 | |
1878 It is sometime not possible to use the profile alone to check if an entity is an | |
1879 admin (e.g. a request managed by a component). In this case we check if the JID | |
1880 correspond to an admin profile | |
1881 """ | |
1882 return entity.userhostJID() in self.admin_jids |