Mercurial > libervia-pubsub
comparison sat_pubsub/backend.py @ 463:f520ac3164b0
privilege: improvement on last message sending on presence with `+notify`:
- local entities subscribed to the presence of another local entity which is connecting
are now added to the presence map. This helps them get their notifications even if they
didn't connect recently
- nodes with `presence` access model are now also used for `+notify`
- notifications are no longer sent on a status change if the resource was already
present.
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 15 Oct 2021 13:40:56 +0200 |
parents | c9238fca1fb3 |
children | 391aa65f72b2 |
comparison
equal
deleted
inserted
replaced
462:a017af61a32b | 463:f520ac3164b0 |
---|---|
60 publish-subscribe protocol. | 60 publish-subscribe protocol. |
61 """ | 61 """ |
62 | 62 |
63 import copy | 63 import copy |
64 import uuid | 64 import uuid |
65 from typing import Optional | 65 from typing import Optional, List, Tuple |
66 | 66 |
67 from zope.interface import implementer | 67 from zope.interface import implementer |
68 | 68 |
69 from twisted.application import service | 69 from twisted.application import service |
70 from twisted.python import components, failure, log | 70 from twisted.python import components, failure, log |
275 allowed_accesses = {'open', 'whitelist'} | 275 allowed_accesses = {'open', 'whitelist'} |
276 else: | 276 else: |
277 allowed_accesses = {'open', 'presence', 'whitelist'} | 277 allowed_accesses = {'open', 'presence', 'whitelist'} |
278 return self.storage.getNodeIds(pep, recipient, allowed_accesses) | 278 return self.storage.getNodeIds(pep, recipient, allowed_accesses) |
279 | 279 |
280 def getNodes(self, requestor, pep, recipient): | 280 async def getNodes(self, requestor, pep, recipient): |
281 if pep: | 281 if pep: |
282 d = self.privilege.isSubscribedFrom(requestor, recipient) | 282 subscribed = await self.privilege.isSubscribedFrom(requestor, recipient) |
283 d.addCallback(self._getNodesIds, pep, recipient) | 283 return await self._getNodesIds(subscribed, pep, recipient) |
284 return d | 284 return await self.storage.getNodeIds(pep, recipient) |
285 return self.storage.getNodeIds(pep, recipient) | |
286 | 285 |
287 def getNodeMetaData(self, nodeIdentifier, pep, recipient=None): | 286 def getNodeMetaData(self, nodeIdentifier, pep, recipient=None): |
288 # FIXME: manage pep and recipient | 287 # FIXME: manage pep and recipient |
289 d = self.storage.getNode(nodeIdentifier, pep, recipient) | 288 d = self.storage.getNode(nodeIdentifier, pep, recipient) |
290 d.addCallback(lambda node: node.getMetaData()) | 289 d.addCallback(lambda node: node.getMetaData()) |
1039 else: | 1038 else: |
1040 raise Exception("Unknown access_model") | 1039 raise Exception("Unknown access_model") |
1041 | 1040 |
1042 defer.returnValue((affiliation, owner, roster, access_model)) | 1041 defer.returnValue((affiliation, owner, roster, access_model)) |
1043 | 1042 |
1044 @defer.inlineCallbacks | 1043 async def getItemsIds( |
1045 def getItemsIds(self, nodeIdentifier, requestor, authorized_groups, unrestricted, maxItems=None, ext_data=None, pep=False, recipient=None): | 1044 self, |
1045 nodeIdentifier: str, | |
1046 requestor: jid.JID, | |
1047 authorized_groups: Optional[List[str]], | |
1048 unrestricted: bool, | |
1049 maxItems: Optional[int] = None, | |
1050 ext_data: Optional[dict] = None, | |
1051 pep: bool = False, | |
1052 recipient: Optional[jid.JID] = None | |
1053 ) -> List[str]: | |
1054 """Retrieve IDs of items from a nodeIdentifier | |
1055 | |
1056 Requestor permission is checked before retrieving items IDs | |
1057 """ | |
1046 # FIXME: items access model are not checked | 1058 # FIXME: items access model are not checked |
1047 # TODO: check items access model | 1059 # TODO: check items access model |
1048 node = yield self.storage.getNode(nodeIdentifier, pep, recipient) | 1060 node = await self.storage.getNode(nodeIdentifier, pep, recipient) |
1049 affiliation, owner, roster, access_model = yield self.checkNodeAccess(node, requestor) | 1061 await self.checkNodeAccess(node, requestor) |
1050 ids = yield node.getItemsIds(authorized_groups, | 1062 ids = await node.getItemsIds( |
1051 unrestricted, | 1063 authorized_groups, |
1052 maxItems, | 1064 unrestricted, |
1053 ext_data) | 1065 maxItems, |
1054 defer.returnValue(ids) | 1066 ext_data |
1067 ) | |
1068 return ids | |
1055 | 1069 |
1056 def getItems(self, nodeIdentifier, requestor, recipient, maxItems=None, | 1070 def getItems(self, nodeIdentifier, requestor, recipient, maxItems=None, |
1057 itemIdentifiers=None, ext_data=None): | 1071 itemIdentifiers=None, ext_data=None): |
1058 d = defer.ensureDeferred( | 1072 d = defer.ensureDeferred( |
1059 self.getItemsData( | 1073 self.getItemsData( |
1453 new_item.addChild(item_config.toElement()) | 1467 new_item.addChild(item_config.toElement()) |
1454 return new_item | 1468 return new_item |
1455 else: | 1469 else: |
1456 return item | 1470 return item |
1457 | 1471 |
1458 @defer.inlineCallbacks | 1472 def _notifyPublish(self, data: dict) -> defer.Deferred: |
1459 def _notifyPublish(self, data): | 1473 return defer.ensureDeferred(self.notifyPublish(data)) |
1474 | |
1475 async def notifyPublish(self, data: dict) -> None: | |
1460 items_data = data['items_data'] | 1476 items_data = data['items_data'] |
1461 node = data['node'] | 1477 node = data['node'] |
1462 pep = data['pep'] | 1478 pep = data['pep'] |
1463 recipient = data['recipient'] | 1479 recipient = data['recipient'] |
1464 | 1480 |
1465 owners, notifications_filtered = yield self._prepareNotify(items_data, node, data.get('subscription'), pep, recipient) | 1481 owners, notifications_filtered = await self._prepareNotify( |
1482 items_data, node, data.get('subscription'), pep, recipient | |
1483 ) | |
1466 | 1484 |
1467 # we notify the owners | 1485 # we notify the owners |
1468 # FIXME: check if this comply with XEP-0060 (option needed ?) | 1486 # FIXME: check if this comply with XEP-0060 (option needed ?) |
1469 # TODO: item's access model have to be sent back to owner | 1487 # TODO: item's access model have to be sent back to owner |
1470 # TODO: same thing for getItems | 1488 # TODO: same thing for getItems |
1476 owner_jid, | 1494 owner_jid, |
1477 'subscribed')}, | 1495 'subscribed')}, |
1478 [self.getFullItem(item_data) for item_data in items_data])) | 1496 [self.getFullItem(item_data) for item_data in items_data])) |
1479 | 1497 |
1480 if pep: | 1498 if pep: |
1481 defer.returnValue(self.backend.privilege.notifyPublish( | 1499 self.backend.privilege.notifyPublish( |
1482 recipient, | 1500 recipient, |
1483 node.nodeIdentifier, | 1501 node.nodeIdentifier, |
1484 notifications_filtered)) | 1502 notifications_filtered |
1503 ) | |
1485 | 1504 |
1486 else: | 1505 else: |
1487 defer.returnValue(self.pubsubService.notifyPublish( | 1506 self.pubsubService.notifyPublish( |
1488 self.serviceJID, | 1507 self.serviceJID, |
1489 node.nodeIdentifier, | 1508 node.nodeIdentifier, |
1490 notifications_filtered)) | 1509 notifications_filtered) |
1491 | 1510 |
1492 def _notifyRetract(self, data): | 1511 def _notifyRetract(self, data: dict) -> defer.Deferred: |
1512 return defer.ensureDeferred(self.notifyRetract(data)) | |
1513 | |
1514 async def notifyRetract(self, data: dict) -> None: | |
1493 items_data = data['items_data'] | 1515 items_data = data['items_data'] |
1494 node = data['node'] | 1516 node = data['node'] |
1495 pep = data['pep'] | 1517 pep = data['pep'] |
1496 recipient = data['recipient'] | 1518 recipient = data['recipient'] |
1497 | 1519 |
1498 def afterPrepare(result): | 1520 owners, notifications_filtered = await self._prepareNotify( |
1499 owners, notifications_filtered = result | 1521 items_data, node, data.get('subscription'), pep, recipient |
1500 #we add the owners | 1522 ) |
1501 | 1523 |
1502 for owner_jid in owners: | 1524 #we add the owners |
1503 notifications_filtered.append( | 1525 |
1504 (owner_jid, | 1526 for owner_jid in owners: |
1505 {pubsub.Subscription(node.nodeIdentifier, | 1527 notifications_filtered.append( |
1506 owner_jid, | 1528 (owner_jid, |
1507 'subscribed')}, | 1529 {pubsub.Subscription(node.nodeIdentifier, |
1508 [item_data.item for item_data in items_data])) | 1530 owner_jid, |
1509 | 1531 'subscribed')}, |
1510 if pep: | 1532 [item_data.item for item_data in items_data])) |
1511 return self.backend.privilege.notifyRetract( | 1533 |
1512 recipient, | 1534 if pep: |
1513 node.nodeIdentifier, | 1535 return self.backend.privilege.notifyRetract( |
1514 notifications_filtered) | 1536 recipient, |
1515 | 1537 node.nodeIdentifier, |
1516 else: | 1538 notifications_filtered) |
1517 return self.pubsubService.notifyRetract( | 1539 |
1518 self.serviceJID, | 1540 else: |
1519 node.nodeIdentifier, | 1541 return self.pubsubService.notifyRetract( |
1520 notifications_filtered) | 1542 self.serviceJID, |
1521 | 1543 node.nodeIdentifier, |
1522 d = self._prepareNotify(items_data, node, data.get('subscription'), pep, recipient) | 1544 notifications_filtered) |
1523 d.addCallback(afterPrepare) | 1545 |
1524 return d | 1546 async def _prepareNotify( |
1525 | 1547 self, |
1526 @defer.inlineCallbacks | 1548 items_data: Tuple[domish.Element, str, dict], |
1527 def _prepareNotify(self, items_data, node, subscription=None, pep=None, recipient=None): | 1549 node, |
1550 subscription: Optional[pubsub.Subscription] = None, | |
1551 pep: bool = False, | |
1552 recipient: Optional[jid.JID] = None | |
1553 ) -> Tuple: | |
1528 """Do a bunch of permissions check and filter notifications | 1554 """Do a bunch of permissions check and filter notifications |
1529 | 1555 |
1530 The owner is not added to these notifications, | 1556 The owner is not added to these notifications, |
1531 it must be added by the calling method | 1557 it must be added by the calling method |
1532 @param items_data(tuple): must contain: | 1558 @param items_data: must contain: |
1533 - item (domish.Element) | 1559 - item |
1534 - access_model (unicode) | 1560 - access_model |
1535 - access_list (dict as returned getItemsById, or item_config) | 1561 - access_list (dict as returned getItemsById, or item_config) |
1536 @param node(LeafNode): node hosting the items | 1562 @param node(LeafNode): node hosting the items |
1537 @param subscription(pubsub.Subscription, None): TODO | 1563 @param subscription: TODO |
1538 | 1564 |
1539 @return (tuple): will contain: | 1565 @return: will contain: |
1540 - notifications_filtered | 1566 - notifications_filtered |
1541 - node_owner_jid | 1567 - node_owner_jid |
1542 - items_data | 1568 - items_data |
1543 """ | 1569 """ |
1544 if subscription is None: | 1570 if subscription is None: |
1545 notifications = yield self.backend.getNotifications(node, items_data) | 1571 notifications = await self.backend.getNotifications(node, items_data) |
1546 else: | 1572 else: |
1547 notifications = [(subscription.subscriber, [subscription], items_data)] | 1573 notifications = [(subscription.subscriber, [subscription], items_data)] |
1548 | 1574 |
1549 if pep and node.getConfiguration()[const.OPT_ACCESS_MODEL] in ('open', 'presence'): | 1575 if ((pep |
1576 and node.getConfiguration()[const.OPT_ACCESS_MODEL] in ('open', 'presence') | |
1577 )): | |
1550 # for PEP we need to manage automatic subscriptions (cf. XEP-0163 §4) | 1578 # for PEP we need to manage automatic subscriptions (cf. XEP-0163 §4) |
1551 explicit_subscribers = {subscriber for subscriber, _, _ in notifications} | 1579 explicit_subscribers = {subscriber for subscriber, _, _ in notifications} |
1552 auto_subscribers = yield self.backend.privilege.getAutoSubscribers(recipient, node.nodeIdentifier, explicit_subscribers) | 1580 auto_subscribers = await self.backend.privilege.getAutoSubscribers( |
1581 recipient, node.nodeIdentifier, explicit_subscribers | |
1582 ) | |
1553 for sub_jid in auto_subscribers: | 1583 for sub_jid in auto_subscribers: |
1554 sub = pubsub.Subscription(node.nodeIdentifier, sub_jid, 'subscribed') | 1584 sub = pubsub.Subscription(node.nodeIdentifier, sub_jid, 'subscribed') |
1555 notifications.append((sub_jid, [sub], items_data)) | 1585 notifications.append((sub_jid, [sub], items_data)) |
1556 | 1586 |
1557 owners = yield node.getOwners() | 1587 owners = await node.getOwners() |
1558 owner_roster = None | 1588 owner_roster = None |
1559 | 1589 |
1560 # now we check access of subscriber for each item, and keep only allowed ones | 1590 # now we check access of subscriber for each item, and keep only allowed ones |
1561 | 1591 |
1562 #we filter items not allowed for the subscribers | 1592 #we filter items not allowed for the subscribers |
1583 if access_model == const.VAL_AMODEL_OPEN: | 1613 if access_model == const.VAL_AMODEL_OPEN: |
1584 allowed_items.append(item) | 1614 allowed_items.append(item) |
1585 elif access_model == const.VAL_AMODEL_PUBLISHER_ROSTER: | 1615 elif access_model == const.VAL_AMODEL_PUBLISHER_ROSTER: |
1586 if owner_roster is None: | 1616 if owner_roster is None: |
1587 # FIXME: publisher roster should be used, not owner | 1617 # FIXME: publisher roster should be used, not owner |
1588 owner_roster= yield self.backend.getOwnerRoster(node, owners) | 1618 owner_roster= await self.backend.getOwnerRoster(node, owners) |
1589 if owner_roster is None: | 1619 if owner_roster is None: |
1590 owner_roster = {} | 1620 owner_roster = {} |
1591 if not subscriber_bare in owner_roster: | 1621 if not subscriber_bare in owner_roster: |
1592 continue | 1622 continue |
1593 #the subscriber is known, is he in the right group ? | 1623 #the subscriber is known, is he in the right group ? |
1599 raise NotImplementedError | 1629 raise NotImplementedError |
1600 | 1630 |
1601 if allowed_items: | 1631 if allowed_items: |
1602 notifications_filtered.append((subscriber, subscriptions, allowed_items)) | 1632 notifications_filtered.append((subscriber, subscriptions, allowed_items)) |
1603 | 1633 |
1604 defer.returnValue((owners, notifications_filtered)) | 1634 return (owners, notifications_filtered) |
1605 | 1635 |
1606 async def _aNotifyDelete(self, data): | 1636 async def _aNotifyDelete(self, data): |
1607 nodeIdentifier = data['node'].nodeIdentifier | 1637 nodeIdentifier = data['node'].nodeIdentifier |
1608 pep = data['pep'] | 1638 pep = data['pep'] |
1609 recipient = data['recipient'] | 1639 recipient = data['recipient'] |
1751 def getConfigurationOptions(self): | 1781 def getConfigurationOptions(self): |
1752 return self.backend.nodeOptions | 1782 return self.backend.nodeOptions |
1753 | 1783 |
1754 def _publish_errb(self, failure, request): | 1784 def _publish_errb(self, failure, request): |
1755 if failure.type == error.NodeNotFound and self.backend.supportsAutoCreate(): | 1785 if failure.type == error.NodeNotFound and self.backend.supportsAutoCreate(): |
1756 print("Auto-creating node %s" % (request.nodeIdentifier,)) | 1786 print(f"Auto-creating node {request.nodeIdentifier}") |
1757 d = self.backend.createNode(request.nodeIdentifier, | 1787 d = self.backend.createNode(request.nodeIdentifier, |
1758 request.sender, | 1788 request.sender, |
1759 request.options, | 1789 request.options, |
1760 pep=self._isPep(request), | 1790 pep=self._isPep(request), |
1761 recipient=request.recipient) | 1791 recipient=request.recipient) |