# HG changeset patch # User Goffi # Date 1652616892 -7200 # Node ID 001ea5f4a2f99529625d73e2d9ea42deec09c197 # Parent 4d333f24962579d5ecbb48f9e68af0bde1037d93 core: method to know if a profile/entity is an admin: the new `is_admin` SatXMPPEntity property tell if it's an administrator. Admin JIDs are retrieve on init, so they can be looked after when profle is not available (notably when a component handle a request and has only a JID available). The new `memory.isAdminJID` method is then used. diff -r 4d333f249625 -r 001ea5f4a2f9 sat/core/sat_main.py --- a/sat/core/sat_main.py Sat May 14 23:00:35 2022 +0200 +++ b/sat/core/sat_main.py Sun May 15 14:14:52 2022 +0200 @@ -939,6 +939,10 @@ return False return self.profiles[profile].isConnected() + def isAdmin(self, client: xmpp.SatXMPPEntity) -> bool: + """Tells if a client is an administrator with extra privileges""" + return client.profile in self.memory.admins + ## Encryption ## def registerEncryptionPlugin(self, *args, **kwargs): diff -r 4d333f249625 -r 001ea5f4a2f9 sat/core/xmpp.py --- a/sat/core/xmpp.py Sat May 14 23:00:35 2022 +0200 +++ b/sat/core/xmpp.py Sun May 15 14:14:52 2022 +0200 @@ -600,6 +600,11 @@ post_xml_treatments.callback(data) return data + @property + def is_admin(self) -> bool: + """True if a client is an administrator with extra privileges""" + return self.host_app.memory.isAdmin(self.profile) + def addPostXmlCallbacks(self, post_xml_treatments): """Used to add class level callbacks at the end of the workflow @@ -1009,6 +1014,10 @@ # FIXME: not the best way to get server jid, maybe use config option? return jid.JID(self.jid.host.split(".", 1)[-1]) + @property + def is_admin(self) -> bool: + return False + def _buildDependencies(self, current, plugins, required=True): """build recursively dependencies needed for a plugin diff -r 4d333f249625 -r 001ea5f4a2f9 sat/memory/memory.py --- a/sat/memory/memory.py Sat May 14 23:00:35 2022 +0200 +++ b/sat/memory/memory.py Sun May 15 14:14:52 2022 +0200 @@ -239,6 +239,9 @@ self.disco = Discovery(host) self.config = tools_config.parseMainConf(log_filenames=True) self._cache_path = Path(self.getConfig("", "local_dir"), C.CACHE_DIR) + self.admins = self.getConfig("", "admins_list", []) + self.admin_jids = set() + async def initialise(self): self.storage = Storage() @@ -251,6 +254,21 @@ self.memory_data = PersistentDict("memory") await self.memory_data.load() await self.disco.load() + for admin in self.admins: + try: + admin_jid_s = await self.asyncGetParamA( + "JabberID", "Connection", profile_key=admin + ) + except Exception as e: + log.warning(f"Can't retrieve jid of admin {admin!r}: {e}") + else: + if admin_jid_s is not None: + try: + admin_jid = jid.JID(admin_jid_s).userhostJID() + except RuntimeError: + log.warning(f"Invalid JID for admin {admin}: {admin_jid_s}") + else: + self.admin_jids.add(admin_jid) ## Configuration ## @@ -1849,3 +1867,16 @@ log.debug("No presence information for {}".format(entity_jid)) return False return presence_data.show != C.PRESENCE_UNAVAILABLE + + def isAdmin(self, profile: str) -> bool: + """Tell if given profile has administrator privileges""" + return profile in self.admins + + def isAdminJID(self, entity: jid.JID) -> bool: + """Tells if an entity jid correspond to an admin one + + It is sometime not possible to use the profile alone to check if an entity is an + admin (e.g. a request managed by a component). In this case we check if the JID + correspond to an admin profile + """ + return entity.userhostJID() in self.admin_jids