changeset 3318:460606155bec

core (memory): `getFileAffiliations` and `setFileAffiliations` implementations: those methods allow to change files permissions using affiliations similar to the ones used with PubSub.
author Goffi <goffi@goffi.org>
date Sat, 01 Aug 2020 16:06:12 +0200
parents 83f25da66bec
children 3a15e76a694e
files sat/memory/memory.py
diffstat 1 files changed, 100 insertions(+), 1 deletions(-) [+]
line wrap: on
line diff
--- a/sat/memory/memory.py	Sat Aug 01 16:02:41 2020 +0200
+++ b/sat/memory/memory.py	Sat Aug 01 16:06:12 2020 +0200
@@ -22,7 +22,8 @@
 import shortuuid
 import mimetypes
 import time
-from typing import Optional, Tuple
+from functools import partial
+from typing import Optional, Tuple, Dict
 from pathlib import Path
 from uuid import uuid4
 from collections import namedtuple
@@ -1310,6 +1311,104 @@
                 parent = directory["id"]
         defer.returnValue((parent, []))
 
+    def getFileAffiliations(self, file_data: dict) -> Dict[jid.JID, str]:
+        """Convert file access to pubsub like affiliations"""
+        affiliations = {}
+        access_data = file_data['access']
+
+        read_data = access_data.get(C.ACCESS_PERM_READ, {})
+        if read_data.get('type') == C.ACCESS_TYPE_WHITELIST:
+            for entity_jid_s in read_data['jids']:
+                entity_jid = jid.JID(entity_jid_s)
+                affiliations[entity_jid] = 'member'
+
+        write_data = access_data.get(C.ACCESS_PERM_WRITE, {})
+        if write_data.get('type') == C.ACCESS_TYPE_WHITELIST:
+            for entity_jid_s in write_data['jids']:
+                entity_jid = jid.JID(entity_jid_s)
+                affiliations[entity_jid] = 'publisher'
+
+        owner = file_data.get('owner')
+        if owner:
+            affiliations[owner] = 'owner'
+
+        return affiliations
+
+    def _setFileAffiliationsUpdate(
+        self,
+        access: dict,
+        file_data: dict,
+        affiliations: Dict[jid.JID, str]
+    ) -> None:
+        read_data = access.setdefault(C.ACCESS_PERM_READ, {})
+        if read_data.get('type') != C.ACCESS_TYPE_WHITELIST:
+            read_data['type'] == C.ACCESS_TYPE_WHITELIST
+            if 'jids' not in read_data:
+                read_data['jids'] = []
+        read_whitelist = read_data['jids']
+        write_data = access.setdefault(C.ACCESS_PERM_WRITE, {})
+        if write_data.get('type') != C.ACCESS_TYPE_WHITELIST:
+            write_data['type'] == C.ACCESS_TYPE_WHITELIST
+            if 'jids' not in write_data:
+                write_data['jids'] = []
+        write_whitelist = write_data['jids']
+        for entity_jid, affiliation in affiliations.items():
+            entity_jid_s = entity_jid.full()
+            if affiliation == "none":
+                try:
+                    read_whitelist.remove(entity_jid_s)
+                except ValueError:
+                    log.warning(
+                        "removing affiliation from an entity without read permission: "
+                        f"{entity_jid}"
+                    )
+                try:
+                    write_whitelist.remove(entity_jid_s)
+                except ValueError:
+                    pass
+            elif affiliation == "publisher":
+                if entity_jid_s not in read_whitelist:
+                    read_whitelist.append(entity_jid_s)
+                if entity_jid_s not in write_whitelist:
+                    write_whitelist.append(entity_jid_s)
+            elif affiliation == "member":
+                if entity_jid_s not in read_whitelist:
+                    read_whitelist.append(entity_jid_s)
+                try:
+                    write_whitelist.remove(entity_jid_s)
+                except ValueError:
+                    pass
+            elif affiliation == "owner":
+                raise NotImplementedError('"owner" affiliation can\'t be set')
+            else:
+                raise ValueError(f"unknown affiliation: {affiliation!r}")
+
+    async def setFileAffiliations(
+        self,
+        client,
+        file_data: dict,
+        affiliations: Dict[jid.JID, str]
+    ) -> None:
+        """Apply pubsub like affiliation to file_data
+
+        Affiliations are converted to access types, then set in a whitelist.
+        Affiliation are mapped as follow:
+            - "owner" can't be set (for now)
+            - "publisher" gives read and write permissions
+            - "member" gives read permission only
+            - "none" removes both read and write permissions
+        """
+        file_id = file_data['id']
+        await self.fileUpdate(
+            file_id,
+            'access',
+            update_cb=partial(
+                self._setFileAffiliationsUpdate,
+                file_data=file_data,
+                affiliations=affiliations
+            ),
+        )
+
     @defer.inlineCallbacks
     def getFiles(
         self, client, peer_jid, file_id=None, version=None, parent=None, path=None,