comparison sat_pubsub/backend.py @ 463:f520ac3164b0

privilege: improvement on last message sending on presence with `+notify`: - local entities subscribed to the presence of another local entity which is connecting are now added to the presence map. This helps them get their notifications even if they didn't connect recently - nodes with `presence` access model are now also used for `+notify` - notifications are not sent anymore in case of status change if the resource was already present.
author Goffi <goffi@goffi.org>
date Fri, 15 Oct 2021 13:40:56 +0200
parents c9238fca1fb3
children 391aa65f72b2
comparison
equal deleted inserted replaced
462:a017af61a32b 463:f520ac3164b0
60 publish-subscribe protocol. 60 publish-subscribe protocol.
61 """ 61 """
62 62
63 import copy 63 import copy
64 import uuid 64 import uuid
65 from typing import Optional 65 from typing import Optional, List, Tuple
66 66
67 from zope.interface import implementer 67 from zope.interface import implementer
68 68
69 from twisted.application import service 69 from twisted.application import service
70 from twisted.python import components, failure, log 70 from twisted.python import components, failure, log
275 allowed_accesses = {'open', 'whitelist'} 275 allowed_accesses = {'open', 'whitelist'}
276 else: 276 else:
277 allowed_accesses = {'open', 'presence', 'whitelist'} 277 allowed_accesses = {'open', 'presence', 'whitelist'}
278 return self.storage.getNodeIds(pep, recipient, allowed_accesses) 278 return self.storage.getNodeIds(pep, recipient, allowed_accesses)
279 279
280 def getNodes(self, requestor, pep, recipient): 280 async def getNodes(self, requestor, pep, recipient):
281 if pep: 281 if pep:
282 d = self.privilege.isSubscribedFrom(requestor, recipient) 282 subscribed = await self.privilege.isSubscribedFrom(requestor, recipient)
283 d.addCallback(self._getNodesIds, pep, recipient) 283 return await self._getNodesIds(subscribed, pep, recipient)
284 return d 284 return await self.storage.getNodeIds(pep, recipient)
285 return self.storage.getNodeIds(pep, recipient)
286 285
287 def getNodeMetaData(self, nodeIdentifier, pep, recipient=None): 286 def getNodeMetaData(self, nodeIdentifier, pep, recipient=None):
288 # FIXME: manage pep and recipient 287 # FIXME: manage pep and recipient
289 d = self.storage.getNode(nodeIdentifier, pep, recipient) 288 d = self.storage.getNode(nodeIdentifier, pep, recipient)
290 d.addCallback(lambda node: node.getMetaData()) 289 d.addCallback(lambda node: node.getMetaData())
1039 else: 1038 else:
1040 raise Exception("Unknown access_model") 1039 raise Exception("Unknown access_model")
1041 1040
1042 defer.returnValue((affiliation, owner, roster, access_model)) 1041 defer.returnValue((affiliation, owner, roster, access_model))
1043 1042
1044 @defer.inlineCallbacks 1043 async def getItemsIds(
1045 def getItemsIds(self, nodeIdentifier, requestor, authorized_groups, unrestricted, maxItems=None, ext_data=None, pep=False, recipient=None): 1044 self,
1045 nodeIdentifier: str,
1046 requestor: jid.JID,
1047 authorized_groups: Optional[List[str]],
1048 unrestricted: bool,
1049 maxItems: Optional[int] = None,
1050 ext_data: Optional[dict] = None,
1051 pep: bool = False,
1052 recipient: Optional[jid.JID] = None
1053 ) -> List[str]:
1054 """Retrieve IDs of items from a nodeIdentifier
1055
1056 Requestor permission is checked before retrieving items IDs
1057 """
1046 # FIXME: items access model are not checked 1058 # FIXME: items access model are not checked
1047 # TODO: check items access model 1059 # TODO: check items access model
1048 node = yield self.storage.getNode(nodeIdentifier, pep, recipient) 1060 node = await self.storage.getNode(nodeIdentifier, pep, recipient)
1049 affiliation, owner, roster, access_model = yield self.checkNodeAccess(node, requestor) 1061 await self.checkNodeAccess(node, requestor)
1050 ids = yield node.getItemsIds(authorized_groups, 1062 ids = await node.getItemsIds(
1051 unrestricted, 1063 authorized_groups,
1052 maxItems, 1064 unrestricted,
1053 ext_data) 1065 maxItems,
1054 defer.returnValue(ids) 1066 ext_data
1067 )
1068 return ids
1055 1069
1056 def getItems(self, nodeIdentifier, requestor, recipient, maxItems=None, 1070 def getItems(self, nodeIdentifier, requestor, recipient, maxItems=None,
1057 itemIdentifiers=None, ext_data=None): 1071 itemIdentifiers=None, ext_data=None):
1058 d = defer.ensureDeferred( 1072 d = defer.ensureDeferred(
1059 self.getItemsData( 1073 self.getItemsData(
1453 new_item.addChild(item_config.toElement()) 1467 new_item.addChild(item_config.toElement())
1454 return new_item 1468 return new_item
1455 else: 1469 else:
1456 return item 1470 return item
1457 1471
1458 @defer.inlineCallbacks 1472 def _notifyPublish(self, data: dict) -> defer.Deferred:
1459 def _notifyPublish(self, data): 1473 return defer.ensureDeferred(self.notifyPublish(data))
1474
1475 async def notifyPublish(self, data: dict) -> None:
1460 items_data = data['items_data'] 1476 items_data = data['items_data']
1461 node = data['node'] 1477 node = data['node']
1462 pep = data['pep'] 1478 pep = data['pep']
1463 recipient = data['recipient'] 1479 recipient = data['recipient']
1464 1480
1465 owners, notifications_filtered = yield self._prepareNotify(items_data, node, data.get('subscription'), pep, recipient) 1481 owners, notifications_filtered = await self._prepareNotify(
1482 items_data, node, data.get('subscription'), pep, recipient
1483 )
1466 1484
1467 # we notify the owners 1485 # we notify the owners
1468 # FIXME: check if this comply with XEP-0060 (option needed ?) 1486 # FIXME: check if this comply with XEP-0060 (option needed ?)
1469 # TODO: item's access model have to be sent back to owner 1487 # TODO: item's access model have to be sent back to owner
1470 # TODO: same thing for getItems 1488 # TODO: same thing for getItems
1476 owner_jid, 1494 owner_jid,
1477 'subscribed')}, 1495 'subscribed')},
1478 [self.getFullItem(item_data) for item_data in items_data])) 1496 [self.getFullItem(item_data) for item_data in items_data]))
1479 1497
1480 if pep: 1498 if pep:
1481 defer.returnValue(self.backend.privilege.notifyPublish( 1499 self.backend.privilege.notifyPublish(
1482 recipient, 1500 recipient,
1483 node.nodeIdentifier, 1501 node.nodeIdentifier,
1484 notifications_filtered)) 1502 notifications_filtered
1503 )
1485 1504
1486 else: 1505 else:
1487 defer.returnValue(self.pubsubService.notifyPublish( 1506 self.pubsubService.notifyPublish(
1488 self.serviceJID, 1507 self.serviceJID,
1489 node.nodeIdentifier, 1508 node.nodeIdentifier,
1490 notifications_filtered)) 1509 notifications_filtered)
1491 1510
1492 def _notifyRetract(self, data): 1511 def _notifyRetract(self, data: dict) -> defer.Deferred:
1512 return defer.ensureDeferred(self.notifyRetract(data))
1513
1514 async def notifyRetract(self, data: dict) -> None:
1493 items_data = data['items_data'] 1515 items_data = data['items_data']
1494 node = data['node'] 1516 node = data['node']
1495 pep = data['pep'] 1517 pep = data['pep']
1496 recipient = data['recipient'] 1518 recipient = data['recipient']
1497 1519
1498 def afterPrepare(result): 1520 owners, notifications_filtered = await self._prepareNotify(
1499 owners, notifications_filtered = result 1521 items_data, node, data.get('subscription'), pep, recipient
1500 #we add the owners 1522 )
1501 1523
1502 for owner_jid in owners: 1524 #we add the owners
1503 notifications_filtered.append( 1525
1504 (owner_jid, 1526 for owner_jid in owners:
1505 {pubsub.Subscription(node.nodeIdentifier, 1527 notifications_filtered.append(
1506 owner_jid, 1528 (owner_jid,
1507 'subscribed')}, 1529 {pubsub.Subscription(node.nodeIdentifier,
1508 [item_data.item for item_data in items_data])) 1530 owner_jid,
1509 1531 'subscribed')},
1510 if pep: 1532 [item_data.item for item_data in items_data]))
1511 return self.backend.privilege.notifyRetract( 1533
1512 recipient, 1534 if pep:
1513 node.nodeIdentifier, 1535 return self.backend.privilege.notifyRetract(
1514 notifications_filtered) 1536 recipient,
1515 1537 node.nodeIdentifier,
1516 else: 1538 notifications_filtered)
1517 return self.pubsubService.notifyRetract( 1539
1518 self.serviceJID, 1540 else:
1519 node.nodeIdentifier, 1541 return self.pubsubService.notifyRetract(
1520 notifications_filtered) 1542 self.serviceJID,
1521 1543 node.nodeIdentifier,
1522 d = self._prepareNotify(items_data, node, data.get('subscription'), pep, recipient) 1544 notifications_filtered)
1523 d.addCallback(afterPrepare) 1545
1524 return d 1546 async def _prepareNotify(
1525 1547 self,
1526 @defer.inlineCallbacks 1548 items_data: Tuple[domish.Element, str, dict],
1527 def _prepareNotify(self, items_data, node, subscription=None, pep=None, recipient=None): 1549 node,
1550 subscription: Optional[pubsub.Subscription] = None,
1551 pep: bool = False,
1552 recipient: Optional[jid.JID] = None
1553 ) -> Tuple:
1528 """Do a bunch of permissions check and filter notifications 1554 """Do a bunch of permissions check and filter notifications
1529 1555
1530 The owner is not added to these notifications, 1556 The owner is not added to these notifications,
1531 it must be added by the calling method 1557 it must be added by the calling method
1532 @param items_data(tuple): must contain: 1558 @param items_data: must contain:
1533 - item (domish.Element) 1559 - item
1534 - access_model (unicode) 1560 - access_model
1535 - access_list (dict as returned getItemsById, or item_config) 1561 - access_list (dict as returned getItemsById, or item_config)
1536 @param node(LeafNode): node hosting the items 1562 @param node(LeafNode): node hosting the items
1537 @param subscription(pubsub.Subscription, None): TODO 1563 @param subscription: TODO
1538 1564
1539 @return (tuple): will contain: 1565 @return: will contain:
1540 - notifications_filtered 1566 - notifications_filtered
1541 - node_owner_jid 1567 - node_owner_jid
1542 - items_data 1568 - items_data
1543 """ 1569 """
1544 if subscription is None: 1570 if subscription is None:
1545 notifications = yield self.backend.getNotifications(node, items_data) 1571 notifications = await self.backend.getNotifications(node, items_data)
1546 else: 1572 else:
1547 notifications = [(subscription.subscriber, [subscription], items_data)] 1573 notifications = [(subscription.subscriber, [subscription], items_data)]
1548 1574
1549 if pep and node.getConfiguration()[const.OPT_ACCESS_MODEL] in ('open', 'presence'): 1575 if ((pep
1576 and node.getConfiguration()[const.OPT_ACCESS_MODEL] in ('open', 'presence')
1577 )):
1550 # for PEP we need to manage automatic subscriptions (cf. XEP-0163 §4) 1578 # for PEP we need to manage automatic subscriptions (cf. XEP-0163 §4)
1551 explicit_subscribers = {subscriber for subscriber, _, _ in notifications} 1579 explicit_subscribers = {subscriber for subscriber, _, _ in notifications}
1552 auto_subscribers = yield self.backend.privilege.getAutoSubscribers(recipient, node.nodeIdentifier, explicit_subscribers) 1580 auto_subscribers = await self.backend.privilege.getAutoSubscribers(
1581 recipient, node.nodeIdentifier, explicit_subscribers
1582 )
1553 for sub_jid in auto_subscribers: 1583 for sub_jid in auto_subscribers:
1554 sub = pubsub.Subscription(node.nodeIdentifier, sub_jid, 'subscribed') 1584 sub = pubsub.Subscription(node.nodeIdentifier, sub_jid, 'subscribed')
1555 notifications.append((sub_jid, [sub], items_data)) 1585 notifications.append((sub_jid, [sub], items_data))
1556 1586
1557 owners = yield node.getOwners() 1587 owners = await node.getOwners()
1558 owner_roster = None 1588 owner_roster = None
1559 1589
1560 # now we check access of subscriber for each item, and keep only allowed ones 1590 # now we check access of subscriber for each item, and keep only allowed ones
1561 1591
1562 #we filter items not allowed for the subscribers 1592 #we filter items not allowed for the subscribers
1583 if access_model == const.VAL_AMODEL_OPEN: 1613 if access_model == const.VAL_AMODEL_OPEN:
1584 allowed_items.append(item) 1614 allowed_items.append(item)
1585 elif access_model == const.VAL_AMODEL_PUBLISHER_ROSTER: 1615 elif access_model == const.VAL_AMODEL_PUBLISHER_ROSTER:
1586 if owner_roster is None: 1616 if owner_roster is None:
1587 # FIXME: publisher roster should be used, not owner 1617 # FIXME: publisher roster should be used, not owner
1588 owner_roster= yield self.backend.getOwnerRoster(node, owners) 1618 owner_roster= await self.backend.getOwnerRoster(node, owners)
1589 if owner_roster is None: 1619 if owner_roster is None:
1590 owner_roster = {} 1620 owner_roster = {}
1591 if not subscriber_bare in owner_roster: 1621 if not subscriber_bare in owner_roster:
1592 continue 1622 continue
1593 #the subscriber is known, is he in the right group ? 1623 #the subscriber is known, is he in the right group ?
1599 raise NotImplementedError 1629 raise NotImplementedError
1600 1630
1601 if allowed_items: 1631 if allowed_items:
1602 notifications_filtered.append((subscriber, subscriptions, allowed_items)) 1632 notifications_filtered.append((subscriber, subscriptions, allowed_items))
1603 1633
1604 defer.returnValue((owners, notifications_filtered)) 1634 return (owners, notifications_filtered)
1605 1635
1606 async def _aNotifyDelete(self, data): 1636 async def _aNotifyDelete(self, data):
1607 nodeIdentifier = data['node'].nodeIdentifier 1637 nodeIdentifier = data['node'].nodeIdentifier
1608 pep = data['pep'] 1638 pep = data['pep']
1609 recipient = data['recipient'] 1639 recipient = data['recipient']
1751 def getConfigurationOptions(self): 1781 def getConfigurationOptions(self):
1752 return self.backend.nodeOptions 1782 return self.backend.nodeOptions
1753 1783
1754 def _publish_errb(self, failure, request): 1784 def _publish_errb(self, failure, request):
1755 if failure.type == error.NodeNotFound and self.backend.supportsAutoCreate(): 1785 if failure.type == error.NodeNotFound and self.backend.supportsAutoCreate():
1756 print("Auto-creating node %s" % (request.nodeIdentifier,)) 1786 print(f"Auto-creating node {request.nodeIdentifier}")
1757 d = self.backend.createNode(request.nodeIdentifier, 1787 d = self.backend.createNode(request.nodeIdentifier,
1758 request.sender, 1788 request.sender,
1759 request.options, 1789 request.options,
1760 pep=self._isPep(request), 1790 pep=self._isPep(request),
1761 recipient=request.recipient) 1791 recipient=request.recipient)