Mercurial > libervia-backend
comparison sat/memory/memory.py @ 3318:460606155bec
core (memory): `getFileAffiliations` and `setFileAffiliations` implementations:
these methods allow changing file permissions using affiliations similar to the ones
used with PubSub.
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 01 Aug 2020 16:06:12 +0200 |
parents | 83f25da66bec |
children | 3a15e76a694e |
comparison
equal
deleted
inserted
replaced
3317:83f25da66bec | 3318:460606155bec |
---|---|
20 import os.path | 20 import os.path |
21 import copy | 21 import copy |
22 import shortuuid | 22 import shortuuid |
23 import mimetypes | 23 import mimetypes |
24 import time | 24 import time |
25 from typing import Optional, Tuple | 25 from functools import partial |
26 from typing import Optional, Tuple, Dict | |
26 from pathlib import Path | 27 from pathlib import Path |
27 from uuid import uuid4 | 28 from uuid import uuid4 |
28 from collections import namedtuple | 29 from collections import namedtuple |
29 from twisted.python import failure | 30 from twisted.python import failure |
30 from twisted.internet import defer, reactor, error | 31 from twisted.internet import defer, reactor, error |
1307 else: | 1308 else: |
1308 directory = directories[0] | 1309 directory = directories[0] |
1309 self.checkFilePermission(directory, peer_jid, perms_to_check) | 1310 self.checkFilePermission(directory, peer_jid, perms_to_check) |
1310 parent = directory["id"] | 1311 parent = directory["id"] |
1311 defer.returnValue((parent, [])) | 1312 defer.returnValue((parent, [])) |
1313 | |
def getFileAffiliations(self, file_data: dict) -> Dict[jid.JID, str]:
    """Build a pubsub-like affiliations mapping from the access data of a file

    ``file_data["access"]`` is inspected for the read and write permissions.
    Only permissions whose type is a whitelist contribute entries:
        - entities listed in the read whitelist are mapped to "member"
        - entities listed in the write whitelist are mapped to "publisher"
        - the value stored under ``file_data["owner"]``, when present and
          truthy, is mapped to "owner"
    Entries are filled in that order, so a later rule replaces an earlier one
    for the same key.
    """
    access_data = file_data['access']
    # order matters: write whitelist must be handled after the read one so
    # that "publisher" takes precedence over "member"
    perm_to_affiliation = (
        (C.ACCESS_PERM_READ, 'member'),
        (C.ACCESS_PERM_WRITE, 'publisher'),
    )

    affiliations = {}
    for perm, affiliation in perm_to_affiliation:
        perm_data = access_data.get(perm, {})
        if perm_data.get('type') != C.ACCESS_TYPE_WHITELIST:
            # only whitelists can be expressed as affiliations
            continue
        for entity_jid_s in perm_data['jids']:
            affiliations[jid.JID(entity_jid_s)] = affiliation

    owner = file_data.get('owner')
    if owner:
        # owner is set last, it always wins over whitelist derived values
        affiliations[owner] = 'owner'

    return affiliations
1336 | |
def _setFileAffiliationsUpdate(
    self,
    access: dict,
    file_data: dict,
    affiliations: Dict[jid.JID, str]
) -> None:
    """Apply pubsub-like affiliations to an access dict, in place

    Read and write permissions are both switched to whitelist type (their
    "jids" list is created if missing), then each affiliation is applied:
        - "none": entity is removed from read and write whitelists
        - "publisher": entity is added to read and write whitelists
        - "member": entity is added to read whitelist, and removed from write one
        - "owner": not supported, NotImplementedError is raised

    Affiliations are applied one by one: if an exception is raised,
    modifications done for previous entities are already in ``access``.

    access: access data to modify
    file_data: data of the file being updated (not used by this method)
    affiliations: mapping from entity jid to the affiliation to set
    raise NotImplementedError: "owner" affiliation has been requested
    raise ValueError: an unknown affiliation has been requested
    """
    whitelists = []
    for perm in (C.ACCESS_PERM_READ, C.ACCESS_PERM_WRITE):
        perm_data = access.setdefault(perm, {})
        if perm_data.get('type') != C.ACCESS_TYPE_WHITELIST:
            # this must be an assignment: "jids" is only taken into account
            # when the permission type is whitelist
            perm_data['type'] = C.ACCESS_TYPE_WHITELIST
        whitelists.append(perm_data.setdefault('jids', []))
    read_whitelist, write_whitelist = whitelists

    for entity_jid, affiliation in affiliations.items():
        entity_jid_s = entity_jid.full()
        if affiliation == "none":
            try:
                read_whitelist.remove(entity_jid_s)
            except ValueError:
                log.warning(
                    "removing affiliation from an entity without read permission: "
                    f"{entity_jid}"
                )
            try:
                write_whitelist.remove(entity_jid_s)
            except ValueError:
                # entity had no write permission, nothing to remove
                pass
        elif affiliation == "publisher":
            if entity_jid_s not in read_whitelist:
                read_whitelist.append(entity_jid_s)
            if entity_jid_s not in write_whitelist:
                write_whitelist.append(entity_jid_s)
        elif affiliation == "member":
            if entity_jid_s not in read_whitelist:
                read_whitelist.append(entity_jid_s)
            try:
                # a member can only read: a previous write permission is revoked
                write_whitelist.remove(entity_jid_s)
            except ValueError:
                pass
        elif affiliation == "owner":
            raise NotImplementedError('"owner" affiliation can\'t be set')
        else:
            raise ValueError(f"unknown affiliation: {affiliation!r}")
1385 | |
async def setFileAffiliations(
    self,
    client,
    file_data: dict,
    affiliations: Dict[jid.JID, str]
) -> None:
    """Set pubsub-like affiliations on a file

    The "access" column of the file identified by ``file_data["id"]`` is
    updated through ``fileUpdate``, using ``_setFileAffiliationsUpdate`` as
    update callback. Affiliations are translated to whitelist entries:
        - "owner" can't be set (for now)
        - "publisher" gives read and write permissions
        - "member" gives read permission only
        - "none" removes both read and write permissions
    """
    file_id = file_data['id']
    # file_data and affiliations are bound here, the remaining argument
    # is supplied by fileUpdate when it calls the callback
    update_cb = partial(
        self._setFileAffiliationsUpdate,
        file_data=file_data,
        affiliations=affiliations,
    )
    await self.fileUpdate(file_id, 'access', update_cb=update_cb)
1312 | 1411 |
1313 @defer.inlineCallbacks | 1412 @defer.inlineCallbacks |
1314 def getFiles( | 1413 def getFiles( |
1315 self, client, peer_jid, file_id=None, version=None, parent=None, path=None, | 1414 self, client, peer_jid, file_id=None, version=None, parent=None, path=None, |
1316 type_=None, file_hash=None, hash_algo=None, name=None, namespace=None, | 1415 type_=None, file_hash=None, hash_algo=None, name=None, namespace=None, |