Mercurial > libervia-backend
diff libervia/backend/plugins/plugin_comp_file_sharing_management.py @ 4270:0d7bb4df2343
Reformatted code base using black.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 19 Jun 2024 18:44:57 +0200 |
parents | 4b842c1fb686 |
children | e9971a4b0627 |
line wrap: on
line diff
--- a/libervia/backend/plugins/plugin_comp_file_sharing_management.py Tue Jun 18 12:06:45 2024 +0200 +++ b/libervia/backend/plugins/plugin_comp_file_sharing_management.py Wed Jun 19 18:44:57 2024 +0200 @@ -85,22 +85,30 @@ def profile_connected(self, client): self._c.add_ad_hoc_command( - client, self._on_change_file, "Change Permissions of File(s)", + client, + self._on_change_file, + "Change Permissions of File(s)", node=NS_FILE_MANAGEMENT_PERM, allowed_magics=C.ENTITY_ALL, ) self._c.add_ad_hoc_command( - client, self._on_delete_file, "Delete File(s)", + client, + self._on_delete_file, + "Delete File(s)", node=NS_FILE_MANAGEMENT_DELETE, allowed_magics=C.ENTITY_ALL, ) self._c.add_ad_hoc_command( - client, self._on_gen_thumbnails, "Generate Thumbnails", + client, + self._on_gen_thumbnails, + "Generate Thumbnails", node=NS_FILE_MANAGEMENT_THUMB, allowed_magics=C.ENTITY_ALL, ) self._c.add_ad_hoc_command( - client, self._on_quota, "Get Quota", + client, + self._on_quota, + "Get Quota", node=NS_FILE_MANAGEMENT_QUOTA, allowed_magics=C.ENTITY_ALL, ) @@ -108,12 +116,14 @@ def _delete(self, service_jid_s, path, namespace, profile): client = self.host.get_client(profile) service_jid = jid.JID(service_jid_s) if service_jid_s else None - return defer.ensureDeferred(self._c.sequence( - client, - [{"path": path, "namespace": namespace}, {"confirm": True}], - NS_FILE_MANAGEMENT_DELETE, - service_jid, - )) + return defer.ensureDeferred( + self._c.sequence( + client, + [{"path": path, "namespace": namespace}, {"confirm": True}], + NS_FILE_MANAGEMENT_DELETE, + service_jid, + ) + ) def _err(self, reason): """Helper method to get argument to return for error @@ -133,17 +143,14 @@ @return (tuple): arguments to use in defer.returnValue """ status = self._c.STATUS.EXECUTING - form = data_form.Form("form", title="File Management", - formNamespace=NS_FILE_MANAGEMENT) + form = data_form.Form( + "form", title="File Management", formNamespace=NS_FILE_MANAGEMENT + ) - field = data_form.Field( - "text-single", "path", required=True - ) + field = data_form.Field("text-single", "path", required=True) form.addField(field) - field = data_form.Field( - "text-single", "namespace", required=False - ) + field = data_form.Field("text-single", "namespace", required=False) form.addField(field) payload = form.toElement() @@ -159,17 +166,17 @@ """ fields = command_form.fields try: - path = fields['path'].value.strip() - namespace = fields['namespace'].value or None + path = fields["path"].value.strip() + namespace = fields["namespace"].value or None except KeyError: self._c.ad_hoc_error(self._c.ERROR.BAD_PAYLOAD) if not path: self._c.ad_hoc_error(self._c.ERROR.BAD_PAYLOAD) - requestor = session_data['requestor'] + requestor = session_data["requestor"] requestor_bare = requestor.userhostJID() - path = path.rstrip('/') + path = path.rstrip("/") parent_path, basename = os.path.split(path) # TODO: if parent_path and basename are empty, we ask for root directory @@ -177,35 +184,37 @@ try: found_files = await self.host.memory.get_files( - client, requestor_bare, path=parent_path, name=basename, - namespace=namespace) + client, + requestor_bare, + path=parent_path, + name=basename, + namespace=namespace, + ) found_file = found_files[0] except (exceptions.NotFound, IndexError): raise WorkflowError(self._err(_("file not found"))) except exceptions.PermissionError: raise WorkflowError(self._err(_("forbidden"))) - if found_file['owner'] != requestor_bare: + if found_file["owner"] != requestor_bare: # only owner can manage files log.warning(_("Only owner can manage files")) raise WorkflowError(self._err(_("forbidden"))) - session_data['found_file'] = found_file - session_data['namespace'] = namespace + session_data["found_file"] = found_file + session_data["namespace"] = namespace return found_file def _update_read_permission(self, access, allowed_jids): if not allowed_jids: if C.ACCESS_PERM_READ in access: del access[C.ACCESS_PERM_READ] - elif allowed_jids == 'PUBLIC': - access[C.ACCESS_PERM_READ] = { - "type": C.ACCESS_TYPE_PUBLIC - } + elif allowed_jids == "PUBLIC": + access[C.ACCESS_PERM_READ] = {"type": C.ACCESS_TYPE_PUBLIC} else: access[C.ACCESS_PERM_READ] = { "type": C.ACCESS_TYPE_WHITELIST, - "jids": [j.full() for j in allowed_jids] + "jids": [j.full() for j in allowed_jids], } async def _update_dir(self, client, requestor, namespace, file_data, allowed_jids): @@ -214,19 +223,25 @@ @param file_data(dict): metadata of the file @param allowed_jids(list[jid.JID]): list of entities allowed to read the file """ - assert file_data['type'] == C.FILE_TYPE_DIRECTORY + assert file_data["type"] == C.FILE_TYPE_DIRECTORY files_data = await self.host.memory.get_files( - client, requestor, parent=file_data['id'], namespace=namespace) + client, requestor, parent=file_data["id"], namespace=namespace + ) for file_data in files_data: - if not file_data['access'].get(C.ACCESS_PERM_READ, {}): - log.debug("setting {perm} read permission for {name}".format( - perm=allowed_jids, name=file_data['name'])) + if not file_data["access"].get(C.ACCESS_PERM_READ, {}): + log.debug( + "setting {perm} read permission for {name}".format( + perm=allowed_jids, name=file_data["name"] + ) + ) await self.host.memory.file_update( - file_data['id'], 'access', - partial(self._update_read_permission, allowed_jids=allowed_jids)) - if file_data['type'] == C.FILE_TYPE_DIRECTORY: - await self._update_dir(client, requestor, namespace, file_data, 'PUBLIC') + file_data["id"], + "access", + partial(self._update_read_permission, allowed_jids=allowed_jids), + ) + if file_data["type"] == C.FILE_TYPE_DIRECTORY: + await self._update_dir(client, requestor, namespace, file_data, "PUBLIC") async def _on_change_file(self, client, command_elt, session_data, action, node): try: @@ -235,8 +250,8 @@ except StopIteration: command_form = None - found_file = session_data.get('found_file') - requestor = session_data['requestor'] + found_file = session_data.get("found_file") + requestor = session_data["requestor"] requestor_bare = requestor.userhostJID() if command_form is None or len(command_form.fields) == 0: @@ -251,31 +266,39 @@ return e.err_args # management request - if found_file['type'] == C.FILE_TYPE_DIRECTORY: + if found_file["type"] == C.FILE_TYPE_DIRECTORY: instructions = D_("Please select permissions for this directory") else: instructions = D_("Please select permissions for this file") - form = data_form.Form("form", title="File Management", - instructions=[instructions], - formNamespace=NS_FILE_MANAGEMENT) + form = data_form.Form( + "form", + title="File Management", + instructions=[instructions], + formNamespace=NS_FILE_MANAGEMENT, + ) field = data_form.Field( - "text-multi", "read_allowed", required=False, - desc='list of jids allowed to read this file (beside yourself), or ' - '"PUBLIC" to let a public access' + "text-multi", + "read_allowed", + required=False, + desc="list of jids allowed to read this file (beside yourself), or " + '"PUBLIC" to let a public access', ) read_access = found_file["access"].get(C.ACCESS_PERM_READ, {}) - access_type = read_access.get('type', C.ACCESS_TYPE_WHITELIST) + access_type = read_access.get("type", C.ACCESS_TYPE_WHITELIST) if access_type == C.ACCESS_TYPE_PUBLIC: - field.values = ['PUBLIC'] + field.values = ["PUBLIC"] else: - field.values = read_access.get('jids', []) + field.values = read_access.get("jids", []) form.addField(field) - if found_file['type'] == C.FILE_TYPE_DIRECTORY: + if found_file["type"] == C.FILE_TYPE_DIRECTORY: field = data_form.Field( - "boolean", "recursive", value=False, required=False, + "boolean", + "recursive", + value=False, + required=False, desc="Files under it will be made public to follow this dir " - "permission (only if they don't have already a permission set)." + "permission (only if they don't have already a permission set).", ) form.addField(field) @@ -286,42 +309,49 @@ else: # final phase, we'll do permission change here try: - read_allowed = command_form.fields['read_allowed'] + read_allowed = command_form.fields["read_allowed"] except KeyError: self._c.ad_hoc_error(self._c.ERROR.BAD_PAYLOAD) - if read_allowed.value == 'PUBLIC': - allowed_jids = 'PUBLIC' - elif read_allowed.value.strip() == '': + if read_allowed.value == "PUBLIC": + allowed_jids = "PUBLIC" + elif read_allowed.value.strip() == "": allowed_jids = None else: try: - allowed_jids = [jid.JID(v.strip()) for v in read_allowed.values - if v.strip()] + allowed_jids = [ + jid.JID(v.strip()) for v in read_allowed.values if v.strip() + ] except RuntimeError as e: - log.warning(_("Can't use read_allowed values: {reason}").format( - reason=e)) + log.warning( + _("Can't use read_allowed values: {reason}").format(reason=e) + ) self._c.ad_hoc_error(self._c.ERROR.BAD_PAYLOAD) - if found_file['type'] == C.FILE_TYPE_FILE: + if found_file["type"] == C.FILE_TYPE_FILE: await self.host.memory.file_update( - found_file['id'], 'access', - partial(self._update_read_permission, allowed_jids=allowed_jids)) + found_file["id"], + "access", + partial(self._update_read_permission, allowed_jids=allowed_jids), + ) else: try: - recursive = command_form.fields['recursive'] + recursive = command_form.fields["recursive"] except KeyError: self._c.ad_hoc_error(self._c.ERROR.BAD_PAYLOAD) await self.host.memory.file_update( - found_file['id'], 'access', - partial(self._update_read_permission, allowed_jids=allowed_jids)) + found_file["id"], + "access", + partial(self._update_read_permission, allowed_jids=allowed_jids), + ) if recursive: # we set all file under the directory as public (if they haven't # already a permission set), so allowed entities of root directory # can read them. - namespace = session_data['namespace'] + namespace = session_data["namespace"] await self._update_dir( - client, requestor_bare, namespace, found_file, 'PUBLIC') + client, requestor_bare, namespace, found_file, "PUBLIC" + ) # job done, we can end the session status = self._c.STATUS.COMPLETED @@ -336,8 +366,8 @@ except StopIteration: command_form = None - found_file = session_data.get('found_file') - requestor = session_data['requestor'] + found_file = session_data.get("found_file") + requestor = session_data["requestor"] requestor_bare = requestor.userhostJID() if command_form is None or len(command_form.fields) == 0: @@ -350,18 +380,27 @@ found_file = await self._get_file_data(client, session_data, command_form) except WorkflowError as e: return e.err_args - if found_file['type'] == C.FILE_TYPE_DIRECTORY: - msg = D_("Are you sure to delete directory {name} and all files and " - "directories under it?").format(name=found_file['name']) + if found_file["type"] == C.FILE_TYPE_DIRECTORY: + msg = D_( + "Are you sure to delete directory {name} and all files and " + "directories under it?" + ).format(name=found_file["name"]) else: - msg = D_("Are you sure to delete file {name}?" - .format(name=found_file['name'])) - form = data_form.Form("form", title="File Management", - instructions = [msg], - formNamespace=NS_FILE_MANAGEMENT) + msg = D_( + "Are you sure to delete file {name}?".format(name=found_file["name"]) + ) + form = data_form.Form( + "form", + title="File Management", + instructions=[msg], + formNamespace=NS_FILE_MANAGEMENT, + ) field = data_form.Field( - "boolean", "confirm", value=False, required=True, - desc="check this box to confirm" + "boolean", + "confirm", + value=False, + required=True, + desc="check this box to confirm", ) form.addField(field) status = self._c.STATUS.EXECUTING @@ -371,15 +410,16 @@ else: # final phase, we'll do deletion here try: - confirmed = C.bool(command_form.fields['confirm'].value) + confirmed = C.bool(command_form.fields["confirm"].value) except KeyError: self._c.ad_hoc_error(self._c.ERROR.BAD_PAYLOAD) if not confirmed: note = None else: - recursive = found_file['type'] == C.FILE_TYPE_DIRECTORY + recursive = found_file["type"] == C.FILE_TYPE_DIRECTORY await self.host.memory.file_delete( - client, requestor_bare, found_file['id'], recursive) + client, requestor_bare, found_file["id"], recursive + ) note = (self._c.NOTE.INFO, _("file deleted")) status = self._c.STATUS.COMPLETED payload = None @@ -393,16 +433,17 @@ @param file_data(dict): metadata of the file """ - if file_data['type'] == C.FILE_TYPE_DIRECTORY: + if file_data["type"] == C.FILE_TYPE_DIRECTORY: sub_files_data = await self.host.memory.get_files( - client, requestor, parent=file_data['id'], namespace=namespace) + client, requestor, parent=file_data["id"], namespace=namespace + ) for sub_file_data in sub_files_data: await self._gen_thumbs(client, requestor, namespace, sub_file_data) - elif file_data['type'] == C.FILE_TYPE_FILE: - media_type = file_data['media_type'] - file_path = os.path.join(self.files_path, file_data['file_hash']) - if media_type == 'image': + elif file_data["type"] == C.FILE_TYPE_FILE: + media_type = file_data["media_type"] + file_path = os.path.join(self.files_path, file_data["file_hash"]) + if media_type == "image": thumbnails = [] for max_thumb_size in self._t.SIZES: @@ -414,20 +455,26 @@ 60 * 60 * 24 * 31 * 6, ) except Exception as e: - log.warning(_("Can't create thumbnail: {reason}") - .format(reason=e)) + log.warning( + _("Can't create thumbnail: {reason}").format(reason=e) + ) break thumbnails.append({"id": thumb_id, "size": thumb_size}) await self.host.memory.file_update( - file_data['id'], 'extra', - partial(self._update_thumbs, thumbnails=thumbnails)) + file_data["id"], + "extra", + partial(self._update_thumbs, thumbnails=thumbnails), + ) - log.info("thumbnails for [{file_name}] generated" - .format(file_name=file_data['name'])) + log.info( + "thumbnails for [{file_name}] generated".format( + file_name=file_data["name"] + ) + ) else: - log.warning("unmanaged file type: {type_}".format(type_=file_data['type'])) + log.warning("unmanaged file type: {type_}".format(type_=file_data["type"])) async def _on_gen_thumbnails(self, client, command_elt, session_data, action, node): try: @@ -436,8 +483,8 @@ except StopIteration: command_form = None - found_file = session_data.get('found_file') - requestor = session_data['requestor'] + found_file = session_data.get("found_file") + requestor = session_data["requestor"] if command_form is None or len(command_form.fields) == 0: # root request @@ -451,7 +498,7 @@ return e.err_args log.info("Generating thumbnails as requested") - await self._gen_thumbs(client, requestor, found_file['namespace'], found_file) + await self._gen_thumbs(client, requestor, found_file["namespace"], found_file) # job done, we can end the session status = self._c.STATUS.COMPLETED @@ -460,7 +507,7 @@ return (payload, status, None, note) async def _on_quota(self, client, command_elt, session_data, action, node): - requestor = session_data['requestor'] + requestor = session_data["requestor"] quota = self.host.plugins["file_sharing"].get_quota(client, requestor) try: size_used = await self.host.memory.file_get_used_space(client, requestor) @@ -473,11 +520,10 @@ note = ( self._c.NOTE.INFO, _("You are currently using {size_used} on {size_quota}").format( - size_used = utils.get_human_size(size_used), - size_quota = ( - _("unlimited quota") if quota is None - else utils.get_human_size(quota) - ) - ) + size_used=utils.get_human_size(size_used), + size_quota=( + _("unlimited quota") if quota is None else utils.get_human_size(quota) + ), + ), ) return (payload, status, None, note)