Mercurial > libervia-pubsub
diff sat_pubsub/backend.py @ 294:df1edebb0466
PEP implementation, draft (huge patch sorry):
/!\ database schema has changed ! /!\
- whole PEP behaviour is not managed yet
- if the stanza is delegated, PEP is assumed
- fixed potential SQL injection in pgsql_storage
- publish notifications manage PEP
- added retract notifications (if "notify" attribute is present), with PEP handling
- a publisher can't replace an item he didn't publised anymore
- /!\ schema has changed, sat_pubsub_update_0_1.sql update it
- sat_pubsub_update_0_1.sql also fixes bad items coming from former version of SàT
author | Goffi <goffi@goffi.org> |
---|---|
date | Sun, 16 Aug 2015 01:32:42 +0200 |
parents | 9f612fa19eea |
children | 6ce33757d21e |
line wrap: on
line diff
--- a/sat_pubsub/backend.py Sun Aug 16 01:15:13 2015 +0200 +++ b/sat_pubsub/backend.py Sun Aug 16 01:32:42 2015 +0200 @@ -70,7 +70,7 @@ from twisted.python import components, log from twisted.internet import defer, reactor from twisted.words.protocols.jabber.error import StanzaError -from twisted.words.protocols.jabber.jid import JID, InvalidFormat +# from twisted.words.protocols.jabber.jid import JID, InvalidFormat from twisted.words.xish import utility from wokkel import disco, data_form, rsm @@ -103,13 +103,13 @@ implements(iidavoll.IBackendService) nodeOptions = { - "pubsub#persist_items": + const.OPT_PERSIST_ITEMS: {"type": "boolean", "label": "Persist items to storage"}, - "pubsub#deliver_payloads": + const.OPT_DELIVER_PAYLOADS: {"type": "boolean", "label": "Deliver payloads with event notifications"}, - "pubsub#send_last_published_item": + const.OPT_SEND_LAST_PUBLISHED_ITEM: {"type": "list-single", "label": "When to send the last published item", "options": { @@ -181,18 +181,20 @@ return True - def getNodeType(self, nodeIdentifier): - d = self.storage.getNode(nodeIdentifier) + def getNodeType(self, nodeIdentifier, pep, recipient=None): + # FIXME: manage pep and recipient + d = self.storage.getNode(nodeIdentifier, pep, recipient) d.addCallback(lambda node: node.getType()) return d - def getNodes(self): - return self.storage.getNodeIds() + def getNodes(self, pep): + return self.storage.getNodeIds(pep) - def getNodeMetaData(self, nodeIdentifier): - d = self.storage.getNode(nodeIdentifier) + def getNodeMetaData(self, nodeIdentifier, pep, recipient=None): + # FIXME: manage pep and recipient + d = self.storage.getNode(nodeIdentifier, pep, recipient) d.addCallback(lambda node: node.getMetaData()) d.addCallback(self._makeMetaData) return d @@ -214,14 +216,13 @@ """ Check authorisation of publishing in node for requestor """ def check(affiliation): - d = defer.succeed(node) + d = defer.succeed((affiliation, node)) configuration = node.getConfiguration() publish_model = configuration[const.OPT_PUBLISH_MODEL] - - if (publish_model == const.VAL_PMODEL_PUBLISHERS): + if publish_model == const.VAL_PMODEL_PUBLISHERS: if affiliation not in ['owner', 'publisher']: raise error.Forbidden() - elif (publish_model == const.VAL_PMODEL_SUBSCRIBERS): + elif publish_model == const.VAL_PMODEL_SUBSCRIBERS: if affiliation not in ['owner', 'publisher']: # we are in subscribers publish model, we must check that # the requestor is a subscriber to allow him to publish @@ -229,12 +230,12 @@ def checkSubscription(subscribed): if not subscribed: raise error.Forbidden() - return node + return (affiliation, node) d.addCallback(lambda ignore: node.isSubscribed(requestor)) d.addCallback(checkSubscription) elif publish_model != const.VAL_PMODEL_OPEN: - raise Exception('Unexpected value') # publish_model must be publishers (default), subscribers or open. + raise ValueError('Unexpected value') # publish_model must be publishers (default), subscribers or open. return d @@ -246,6 +247,7 @@ """Get and remove item configuration information @param item: """ + # FIXME: dirty ! Need to use elements() item_config = None access_model = const.VAL_AMODEL_DEFAULT for i in range(len(item.children)): @@ -264,60 +266,87 @@ return (access_model, item_config) - def publish(self, nodeIdentifier, items, requestor): - d = self.storage.getNode(nodeIdentifier) + def _checkOverwrite(self, node, itemIdentifiers, publisher): + """Check that the itemIdentifiers correspond to items published + by the current publisher""" + def doCheck(item_pub_map): + for item_publisher in item_pub_map.iterValues(): + if item_publisher.userhost() != publisher.userhost(): + raise error.ItemForbidden() + + d = node.getItemsPublishers(itemIdentifiers) + d.addCallback(doCheck) + return d + + + def publish(self, nodeIdentifier, items, requestor, pep, recipient): + d = self.storage.getNode(nodeIdentifier, pep, recipient) d.addCallback(self._checkAuth, requestor) #FIXME: owner and publisher are not necessarly the same. So far we use only owner to get roster. #FIXME: in addition, there can be several owners: that is not managed yet - d.addCallback(self._doPublish, items, requestor) + d.addCallback(self._doPublish, items, requestor, pep, recipient) return d - def _doPublish(self, node, items, requestor): + def _doPublish(self, result, items, requestor, pep, recipient): + affiliation, node = result if node.nodeType == 'collection': raise error.NoPublishing() configuration = node.getConfiguration() - persistItems = configuration["pubsub#persist_items"] - deliverPayloads = configuration["pubsub#deliver_payloads"] + persistItems = configuration[const.OPT_PERSIST_ITEMS] + deliverPayloads = configuration[const.OPT_DELIVER_PAYLOADS] if items and not persistItems and not deliverPayloads: raise error.ItemForbidden() elif not items and (persistItems or deliverPayloads): raise error.ItemRequired() - parsed_items = [] + items_data = [] + check_overwrite = False for item in items: if persistItems or deliverPayloads: item.uri = None item.defaultUri = None if not item.getAttribute("id"): item["id"] = str(uuid.uuid4()) + else: + check_overwrite = True access_model, item_config = self.parseItemConfig(item) - parsed_items.append((access_model, item_config, item)) + items_data.append((item, access_model, item_config)) if persistItems: - d = node.storeItems(parsed_items, requestor) + if check_overwrite and affiliation != 'owner': + # we don't want a publisher to overwrite the item + # of an other publisher + d = self._checkOverwrite(node, [item['id'] for item in items if item.getAttribute('id')], requestor) + d.addCallback(lambda _: node.storeItems(items_data, requestor)) + else: + d = node.storeItems(items_data, requestor) else: d = defer.succeed(None) - d.addCallback(self._doNotify, node, parsed_items, - deliverPayloads) + d.addCallback(self._doNotify, node, items_data, + deliverPayloads, pep, recipient) return d - def _doNotify(self, result, node, items, deliverPayloads): - if items and not deliverPayloads: - for access_model, item_config, item in items: + def _doNotify(self, result, node, items_data, deliverPayloads, pep, recipient): + if items_data and not deliverPayloads: + for access_model, item_config, item in items_data: item.children = [] - - self.dispatch({'items': items, 'node': node}, + self.dispatch({'items_data': items_data, 'node': node, 'pep': pep, 'recipient': recipient}, '//event/pubsub/notify') - def getNotifications(self, nodeIdentifier, items): + def getNotifications(self, nodeDbId, items_data): + """Build a list of subscriber to the node - def toNotifications(subscriptions, nodeIdentifier, items): + subscribers will be associated with subscribed items, + and subscription type. + """ + + def toNotifications(subscriptions, items_data): subsBySubscriber = {} for subscription in subscriptions: if subscription.options.get('pubsub#subscription_type', @@ -326,7 +355,7 @@ set()) subs.add(subscription) - notifications = [(subscriber, subscriptions_, items) + notifications = [(subscriber, subscriptions_, items_data) for subscriber, subscriptions_ in subsBySubscriber.iteritems()] @@ -336,37 +365,46 @@ failure.trap(error.NodeNotFound) return [] - d1 = self.storage.getNode(nodeIdentifier) + d1 = self.storage.getNodeById(nodeDbId) d1.addCallback(lambda node: node.getSubscriptions('subscribed')) - d2 = self.storage.getNode('') - d2.addCallback(lambda node: node.getSubscriptions('subscribed')) - d2.addErrback(rootNotFound) - d = defer.gatherResults([d1, d2]) - d.addCallback(lambda result: result[0] + result[1]) - d.addCallback(toNotifications, nodeIdentifier, items) - return d + # FIXME: must add root node subscriptions ? + # d2 = self.storage.getNode('', False) # FIXME: to check + # d2.addCallback(lambda node: node.getSubscriptions('subscribed')) + # d2.addErrback(rootNotFound) + # d = defer.gatherResults([d1, d2]) + # d.addCallback(lambda result: result[0] + result[1]) + d1.addCallback(toNotifications, items_data) + return d1 - def registerNotifier(self, observerfn, *args, **kwargs): + def registerPublishNotifier(self, observerfn, *args, **kwargs): self.addObserver('//event/pubsub/notify', observerfn, *args, **kwargs) - def subscribe(self, nodeIdentifier, subscriber, requestor): + def registerRetractNotifier(self, observerfn, *args, **kwargs): + self.addObserver('//event/pubsub/retract', observerfn, *args, **kwargs) + + def subscribe(self, nodeIdentifier, subscriber, requestor, pep, recipient): subscriberEntity = subscriber.userhostJID() if subscriberEntity != requestor.userhostJID(): return defer.fail(error.Forbidden()) - d = self.storage.getNode(nodeIdentifier) + d = self.storage.getNode(nodeIdentifier, pep, recipient) d.addCallback(_getAffiliation, subscriberEntity) d.addCallback(self._doSubscribe, subscriber) return d def _doSubscribe(self, result, subscriber): + # TODO: implement other access models node, affiliation = result - #FIXME: must check node's access_model before subscribing if affiliation == 'outcast': raise error.Forbidden() + access_model = node.getAccessModel() + + if access_model != const.VAL_AMODEL_OPEN: + raise NotImplementedError + def trapExists(failure): failure.trap(error.SubscriptionExists) return False @@ -380,6 +418,7 @@ d = node.addSubscription(subscriber, 'subscribed', {}) d.addCallbacks(lambda _: True, trapExists) d.addCallback(cb) + return d @@ -406,11 +445,11 @@ return subscription - def unsubscribe(self, nodeIdentifier, subscriber, requestor): + def unsubscribe(self, nodeIdentifier, subscriber, requestor, pep, recipient): if subscriber.userhostJID() != requestor.userhostJID(): return defer.fail(error.Forbidden()) - d = self.storage.getNode(nodeIdentifier) + d = self.storage.getNode(nodeIdentifier, pep, recipient) d.addCallback(lambda node: node.removeSubscription(subscriber)) return d @@ -428,33 +467,33 @@ return True - def createNode(self, nodeIdentifier, requestor, options = None): + def createNode(self, nodeIdentifier, requestor, options = None, pep=False, recipient=None): if not nodeIdentifier: nodeIdentifier = 'generic/%s' % uuid.uuid4() if not options: options = {} - if self.supportsCreatorCheck(): - groupblog = nodeIdentifier.startswith(const.NS_GROUPBLOG_PREFIX) - try: - nodeIdentifierJID = JID(nodeIdentifier[len(const.NS_GROUPBLOG_PREFIX):] if groupblog else nodeIdentifier) - except InvalidFormat: - is_user_jid = False - else: - is_user_jid = bool(nodeIdentifierJID.user) + # if self.supportsCreatorCheck(): + # groupblog = nodeIdentifier.startswith(const.NS_GROUPBLOG_PREFIX) + # try: + # nodeIdentifierJID = JID(nodeIdentifier[len(const.NS_GROUPBLOG_PREFIX):] if groupblog else nodeIdentifier) + # except InvalidFormat: + # is_user_jid = False + # else: + # is_user_jid = bool(nodeIdentifierJID.user) - if is_user_jid and nodeIdentifierJID.userhostJID() != requestor.userhostJID(): - #we have an user jid node, but not created by the owner of this jid - print "Wrong creator" - raise error.Forbidden() + # if is_user_jid and nodeIdentifierJID.userhostJID() != requestor.userhostJID(): + # #we have an user jid node, but not created by the owner of this jid + # print "Wrong creator" + # raise error.Forbidden() nodeType = 'leaf' config = self.storage.getDefaultConfiguration(nodeType) config['pubsub#node_type'] = nodeType config.update(options) - d = self.storage.createNode(nodeIdentifier, requestor, config) + d = self.storage.createNode(nodeIdentifier, requestor, config, pep, recipient) d.addCallback(lambda _: nodeIdentifier) return d @@ -464,21 +503,21 @@ return d - def getNodeConfiguration(self, nodeIdentifier): + def getNodeConfiguration(self, nodeIdentifier, pep, recipient): if not nodeIdentifier: return defer.fail(error.NoRootNode()) - d = self.storage.getNode(nodeIdentifier) + d = self.storage.getNode(nodeIdentifier, pep, recipient) d.addCallback(lambda node: node.getConfiguration()) return d - def setNodeConfiguration(self, nodeIdentifier, options, requestor): + def setNodeConfiguration(self, nodeIdentifier, options, requestor, pep, recipient): if not nodeIdentifier: return defer.fail(error.NoRootNode()) - d = self.storage.getNode(nodeIdentifier) + d = self.storage.getNode(nodeIdentifier, pep, recipient) d.addCallback(_getAffiliation, requestor) d.addCallback(self._doSetNodeConfiguration, options) return d @@ -497,13 +536,13 @@ return self.storage.getAffiliations(entity) - def getItems(self, nodeIdentifier, requestor, maxItems=None, + def getItems(self, nodeIdentifier, recipient, maxItems=None, itemIdentifiers=None, ext_data=None): if ext_data is None: ext_data = {} - d = self.storage.getNode(nodeIdentifier) - d.addCallback(_getAffiliation, requestor) - d.addCallback(self._doGetItems, requestor, maxItems, itemIdentifiers, + d = self.storage.getNode(nodeIdentifier, ext_data.get('pep', False), recipient) + d.addCallback(_getAffiliation, recipient) + d.addCallback(self._doGetItems, recipient, maxItems, itemIdentifiers, ext_data) return d @@ -545,7 +584,7 @@ elif access_model == const.VAL_AMODEL_ROSTER: form = data_form.Form('submit', formNamespace=const.NS_ITEM_CONFIG) access = data_form.Field(None, const.OPT_ACCESS_MODEL, value=const.VAL_AMODEL_ROSTER) - allowed = data_form.Field(None, const.OPT_ROSTER_GROUPS_ALLOWED, values=access_list) + allowed = data_form.Field(None, const.OPT_ROSTER_GROUPS_ALLOWED, values=access_list[const.OPT_ROSTER_GROUPS_ALLOWED]) form.addField(access) form.addField(allowed) item.addChild(form.toElement()) @@ -590,15 +629,15 @@ if affiliation == 'outcast': raise error.Forbidden() - access_model = node.getConfiguration()["pubsub#access_model"] + access_model = node.getAccessModel() d = node.getNodeOwner() d.addCallback(self.privilege.getRoster) d.addErrback(self._rosterEb) - if access_model == 'open' or affiliation == 'owner': + if access_model == const.VAL_AMODEL_OPEN or affiliation == 'owner': d.addCallback(lambda roster: (True, roster)) d.addCallback(access_checked) - elif access_model == 'roster': + elif access_model == const.VAL_AMODEL_ROSTER: d.addCallback(self._getNodeGroups,node.nodeIdentifier) d.addCallback(self.checkGroup, requestor) d.addCallback(access_checked) @@ -650,36 +689,39 @@ return defer.DeferredList(d_list).addCallback(render) - def retractItem(self, nodeIdentifier, itemIdentifiers, requestor): - d = self.storage.getNode(nodeIdentifier) + def retractItem(self, nodeIdentifier, itemIdentifiers, requestor, notify, pep, recipient): + d = self.storage.getNode(nodeIdentifier, pep, recipient) d.addCallback(_getAffiliation, requestor) - if const.FLAG_RETRACT_ALLOW_PUBLISHER: - d.addCallback(self._doRetractAllowPublisher, itemIdentifiers, requestor) - else: - d.addCallback(self._doRetract, itemIdentifiers) + # FIXME: to be checked + # if const.FLAG_RETRACT_ALLOW_PUBLISHER: + # d.addCallback(self._doRetractAllowPublisher, itemIdentifiers, requestor) + # else: + # d.addCallback(self._doRetract, itemIdentifiers) + d.addCallback(self._doRetract, itemIdentifiers, notify, pep, recipient) return d - def _doRetractAllowPublisher(self, result, itemIdentifiers, requestor): - """This method has been added to allow the publisher - of an item to retract it, even if he has no affiliation - to that item. For instance, this allows you to delete - an item you posted in a node of "open" publish model. - """ + # FIXME: to be checked + # def _doRetractAllowPublisher(self, result, itemIdentifiers, requestor): + # """This method has been added to allow the publisher + # of an item to retract it, even if he has no affiliation + # to that item. For instance, this allows you to delete + # an item you posted in a node of "open" publish model. + # """ + # node, affiliation = result + # if affiliation in ['owner', 'publisher']: + # return self._doRetract(result, itemIdentifiers) + # d = node.filterItemsWithPublisher(itemIdentifiers, requestor) + # def filterCb(filteredItems): + # if not filteredItems: + # return self._doRetract(result, itemIdentifiers) + # # XXX: fake an affiliation that does NOT exist + # return self._doRetract((node, 'publisher'), filteredItems) + # d.addCallback(filterCb) + # return d + + def _doRetract(self, result, itemIdentifiers, notify, pep, recipient): node, affiliation = result - if affiliation in ['owner', 'publisher']: - return self._doRetract(result, itemIdentifiers) - d = node.filterItemsWithPublisher(itemIdentifiers, requestor) - def filterCb(filteredItems): - if not filteredItems: - return self._doRetract(result, itemIdentifiers) - # XXX: fake an affiliation that does NOT exist - return self._doRetract((node, 'publisher'), filteredItems) - d.addCallback(filterCb) - return d - - def _doRetract(self, result, itemIdentifiers): - node, affiliation = result - persistItems = node.getConfiguration()["pubsub#persist_items"] + persistItems = node.getConfiguration()[const.OPT_PERSIST_ITEMS] if affiliation not in ['owner', 'publisher']: raise error.Forbidden() @@ -687,19 +729,32 @@ if not persistItems: raise error.NodeNotPersistent() - d = node.removeItems(itemIdentifiers) - d.addCallback(self._doNotifyRetraction, node.nodeIdentifier) + # we need to get the items before removing them, for the notifications + + def removeItems(items_data): + """Remove the items and keep only actually removed ones in items_data""" + d = node.removeItems(itemIdentifiers) + d.addCallback(lambda removed: [item_data for item_data in items_data if item_data[0]["id"] in removed]) + return d + + d = node.getItemsById(None, True, itemIdentifiers) + d.addCallback(removeItems) + + if notify: + d.addCallback(self._doNotifyRetraction, node, pep, recipient) return d - def _doNotifyRetraction(self, itemIdentifiers, nodeIdentifier): - self.dispatch({'itemIdentifiers': itemIdentifiers, - 'nodeIdentifier': nodeIdentifier }, + def _doNotifyRetraction(self, items_data, node, pep, recipient): + self.dispatch({'items_data': items_data, + 'node': node, + 'pep': pep, + 'recipient': recipient}, '//event/pubsub/retract') - def purgeNode(self, nodeIdentifier, requestor): - d = self.storage.getNode(nodeIdentifier) + def purgeNode(self, nodeIdentifier, requestor, pep, recipient): + d = self.storage.getNode(nodeIdentifier, pep, recipient) d.addCallback(_getAffiliation, requestor) d.addCallback(self._doPurge) return d @@ -707,7 +762,7 @@ def _doPurge(self, result): node, affiliation = result - persistItems = node.getConfiguration()["pubsub#persist_items"] + persistItems = node.getConfiguration()[const.OPT_PERSIST_ITEMS] if affiliation != 'owner': raise error.Forbidden() @@ -728,24 +783,24 @@ self._callbackList.append(preDeleteFn) - def getSubscribers(self, nodeIdentifier): + def getSubscribers(self, nodeIdentifier, pep, recipient): def cb(subscriptions): return [subscription.subscriber for subscription in subscriptions] - d = self.storage.getNode(nodeIdentifier) + d = self.storage.getNode(nodeIdentifier, pep, recipient) d.addCallback(lambda node: node.getSubscriptions('subscribed')) d.addCallback(cb) return d - def deleteNode(self, nodeIdentifier, requestor, redirectURI=None): - d = self.storage.getNode(nodeIdentifier) + def deleteNode(self, nodeIdentifier, requestor, pep, recipient, redirectURI=None): + d = self.storage.getNode(nodeIdentifier, pep, recipient) d.addCallback(_getAffiliation, requestor) - d.addCallback(self._doPreDelete, redirectURI) + d.addCallback(self._doPreDelete, redirectURI, pep, recipient) return d - def _doPreDelete(self, result, redirectURI): + def _doPreDelete(self, result, redirectURI, pep, recipient): node, affiliation = result if affiliation != 'owner': @@ -754,19 +809,19 @@ data = {'node': node, 'redirectURI': redirectURI} - d = defer.DeferredList([cb(data) + d = defer.DeferredList([cb(data, pep, recipient) for cb in self._callbackList], consumeErrors=1) - d.addCallback(self._doDelete, node.nodeIdentifier) + d.addCallback(self._doDelete, node.nodeDbId) - def _doDelete(self, result, nodeIdentifier): + def _doDelete(self, result, nodeDbId): dl = [] for succeeded, r in result: if succeeded and r: dl.extend(r) - d = self.storage.deleteNode(nodeIdentifier) + d = self.storage.deleteNodeByDbId(nodeDbId) d.addCallback(self._doNotifyDelete, dl) return d @@ -812,6 +867,7 @@ error.Forbidden: ('forbidden', None, None), error.NotAuthorized: ('not-authorized', None, None), error.NotInRoster: ('not-authorized', 'not-in-roster-group', None), + error.ItemNotFound: ('item-not-found', None, None), error.ItemForbidden: ('bad-request', 'item-forbidden', None), error.ItemRequired: ('bad-request', 'item-required', None), error.NoInstantNodes: ('not-acceptable', @@ -838,11 +894,13 @@ self.backend = backend self.hideNodes = False - self.backend.registerNotifier(self._notify) + self.backend.registerPublishNotifier(self._notifyPublish) + self.backend.registerRetractNotifier(self._notifyRetract) self.backend.registerPreDelete(self._preDelete) - if self.backend.supportsCreatorCheck(): - self.features.append("creator-jid-check") #SàT custom feature: Check that a node (which correspond to + # FIXME: to be removed, it's not useful anymore as PEP is now used + # if self.backend.supportsCreatorCheck(): + # self.features.append("creator-jid-check") #SàT custom feature: Check that a node (which correspond to # a jid in this server) is created by the right jid if self.backend.supportsAutoCreate(): @@ -866,39 +924,14 @@ # if self.backend.supportsPublishModel(): #XXX: this feature is not really described in XEP-0060, we just can see it in examples # self.features.append("publish_model") # but it's necessary for microblogging comments (see XEP-0277) - def _notify(self, data): - items = data['items'] + def _notifyPublish(self, data): + items_data = data['items_data'] node = data['node'] - - def _notifyAllowed(result): - """Check access of subscriber for each item, - and notify only allowed ones""" - notifications, (owner_jid,roster) = result - - #we filter items not allowed for the subscribers - notifications_filtered = [] - - for subscriber, subscriptions, _items in notifications: - allowed_items = [] #we keep only item which subscriber can access + pep = data['pep'] + recipient = data['recipient'] - for access_model, item_config, item in _items: - if access_model == 'open': - allowed_items.append(item) - elif access_model == 'roster': - _subscriber = subscriber.userhostJID() - if not _subscriber in roster: - continue - #the subscriber is known, is he in the right group ? - authorized_groups = item_config[const.OPT_ROSTER_GROUPS_ALLOWED] - if roster[_subscriber].groups.intersection(authorized_groups): - allowed_items.append(item) - - else: #unknown access_model - raise NotImplementedError - - if allowed_items: - notifications_filtered.append((subscriber, subscriptions, allowed_items)) - + def afterPrepare(result): + owner_jid, notifications_filtered = result #we notify the owner #FIXME: check if this comply with XEP-0060 (option needed ?) #TODO: item's access model have to be sent back to owner @@ -910,7 +943,7 @@ """ #TODO: a test should check that only the owner get the item configuration back - access_model, item_config, item = item_data + item, access_model, item_config = item_data new_item = deepcopy(item) if item_config: new_item.addChild(item_config.toElement()) @@ -920,37 +953,136 @@ set([Subscription(node.nodeIdentifier, owner_jid, 'subscribed')]), - [getFullItem(item_data) for item_data in items])) + [getFullItem(item_data) for item_data in items_data])) + + if pep: + return self.backend.privilege.notifyPublish( + recipient, + node.nodeIdentifier, + notifications_filtered) + + else: + return self.pubsubService.notifyPublish( + self.serviceJID, + node.nodeIdentifier, + notifications_filtered) + + d = self._prepareNotify(items_data, node, data.get('subscription')) + d.addCallback(afterPrepare) + return d + + def _notifyRetract(self, data): + items_data = data['items_data'] + node = data['node'] + pep = data['pep'] + recipient = data['recipient'] - return self.pubsubService.notifyPublish( - self.serviceJID, - node.nodeIdentifier, - notifications_filtered) + def afterPrepare(result): + owner_jid, notifications_filtered = result + #we add the owner + + notifications_filtered.append((owner_jid, + set([Subscription(node.nodeIdentifier, + owner_jid, + 'subscribed')]), + [item for item, _, _ in items_data])) + + if pep: + return self.backend.privilege.notifyRetract( + recipient, + node.nodeIdentifier, + notifications_filtered) + + else: + return self.pubsubService.notifyRetract( + self.serviceJID, + node.nodeIdentifier, + notifications_filtered) + + d = self._prepareNotify(items_data, node, data.get('subscription')) + d.addCallback(afterPrepare) + return d - if 'subscription' not in data: - d1 = self.backend.getNotifications(node.nodeIdentifier, items) + def _prepareNotify(self, items_data, node, subscription=None): + """Do a bunch of permissions check and filter notifications + + The owner is not added to these notifications, + it must be called by the calling method + @param items_data(tuple): must contain: + - item (domish.Element) + - access_model (unicode) + - access_list (dict as returned getItemsById, or item_config) + @param node(LeafNode): node hosting the items + @param subscription(pubsub.Subscription, None): TODO + + @return (tuple): will contain: + - notifications_filtered + - node_owner_jid + - items_data + """ + + def filterNotifications(result): + """Check access of subscriber for each item, and keep only allowed ones""" + notifications, (owner_jid,roster) = result + + #we filter items not allowed for the subscribers + notifications_filtered = [] + + for subscriber, subscriptions, _items_data in notifications: + if subscriber == owner_jid: + # as notification is always sent to owner, + # we ignore owner if he is here + continue + allowed_items = [] #we keep only item which subscriber can access + + for item, access_model, access_list in _items_data: + if access_model == const.VAL_AMODEL_OPEN: + allowed_items.append(item) + elif access_model == const.VAL_AMODEL_ROSTER: + _subscriber = subscriber.userhostJID() + if not _subscriber in roster: + continue + #the subscriber is known, is he in the right group ? + authorized_groups = access_list[const.OPT_ROSTER_GROUPS_ALLOWED] + if roster[_subscriber].groups.intersection(authorized_groups): + allowed_items.append(item) + + else: #unknown access_model + raise NotImplementedError + + if allowed_items: + notifications_filtered.append((subscriber, subscriptions, allowed_items)) + return (owner_jid, notifications_filtered) + + + if subscription is None: + d1 = self.backend.getNotifications(node.nodeDbId, items_data) else: - subscription = data['subscription'] d1 = defer.succeed([(subscription.subscriber, [subscription], - items)]) + items_data)]) def _got_owner(owner_jid): #return a tuple with owner_jid and roster + def rosterEb(failure): + log.msg("Error while getting roster: {}".format(failure.value)) + return (owner_jid, {}) + d = self.backend.privilege.getRoster(owner_jid) - d.addErrback(self._rosterEb) + d.addErrback(rosterEb) d.addCallback(lambda roster: (owner_jid,roster)) + return d d2 = node.getNodeOwner() d2.addCallback(_got_owner) + d = defer.gatherResults([d1, d2]) + d.addCallback(filterNotifications) + return d - d = defer.gatherResults([d1, d2]) - d.addCallback(_notifyAllowed) - - def _preDelete(self, data): + def _preDelete(self, data, pep, recipient): nodeIdentifier = data['node'].nodeIdentifier redirectURI = data.get('redirectURI', None) - d = self.backend.getSubscribers(nodeIdentifier) + d = self.backend.getSubscribers(nodeIdentifier, pep, recipient) d.addCallback(lambda subscribers: self.pubsubService.notifyDelete( self.serviceJID, nodeIdentifier, @@ -972,7 +1104,8 @@ raise exc - def getInfo(self, requestor, service, nodeIdentifier): + def getInfo(self, requestor, service, nodeIdentifier, pep=None, recipient=None): + return [] # FIXME: disabled for now, need to manage PEP if not requestor.resource: # this avoid error when getting a disco request from server during namespace delegation return [] @@ -1000,10 +1133,11 @@ return d - def getNodes(self, requestor, service, nodeIdentifier): + def getNodes(self, requestor, service, nodeIdentifier, pep=None): + return defer.succeed([]) # FIXME: disabled for now, need to manage PEP if service.resource: return defer.succeed([]) - d = self.backend.getNodes() + d = self.backend.getNodes(pep) return d.addErrback(self._mapErrors) @@ -1014,20 +1148,33 @@ if failure.type == error.NodeNotFound and self.backend.supportsAutoCreate(): print "Auto-creating node %s" % (request.nodeIdentifier,) d = self.backend.createNode(request.nodeIdentifier, - request.sender) + request.sender, + pep=self._isPep(request), + recipient=request.recipient) d.addCallback(lambda ignore, request: self.backend.publish(request.nodeIdentifier, request.items, - request.sender), + request.sender, + self._isPep(request), + request.recipient, + ), request) return d return failure + def _isPep(self, request): + try: + return request.delegated + except AttributeError: + return False + def publish(self, request): d = self.backend.publish(request.nodeIdentifier, request.items, - request.sender) + request.sender, + self._isPep(request), + request.recipient) d.addErrback(self._publish_errb, request) return d.addErrback(self._mapErrors) @@ -1035,56 +1182,74 @@ def subscribe(self, request): d = self.backend.subscribe(request.nodeIdentifier, request.subscriber, - request.sender) + request.sender, + self._isPep(request), + request.recipient) return d.addErrback(self._mapErrors) def unsubscribe(self, request): d = self.backend.unsubscribe(request.nodeIdentifier, request.subscriber, - request.sender) + request.sender, + self._isPep(request), + request.recipient) return d.addErrback(self._mapErrors) def subscriptions(self, request): - d = self.backend.getSubscriptions(request.sender) + d = self.backend.getSubscriptions(self._isPep(request), + request.sender) return d.addErrback(self._mapErrors) def affiliations(self, request): - d = self.backend.getAffiliations(request.sender) + d = self.backend.getAffiliations(self._isPep(request), + request.sender) return d.addErrback(self._mapErrors) def create(self, request): d = self.backend.createNode(request.nodeIdentifier, - request.sender, request.options) + request.sender, request.options, + self._isPep(request), + request.recipient) return d.addErrback(self._mapErrors) def default(self, request): - d = self.backend.getDefaultConfiguration(request.nodeType) + d = self.backend.getDefaultConfiguration(request.nodeType, + self._isPep(request), + request.sender) return d.addErrback(self._mapErrors) def configureGet(self, request): - d = self.backend.getNodeConfiguration(request.nodeIdentifier) + d = self.backend.getNodeConfiguration(request.nodeIdentifier, + self._isPep(request), + request.recipient) return d.addErrback(self._mapErrors) def configureSet(self, request): d = self.backend.setNodeConfiguration(request.nodeIdentifier, request.options, - request.sender) + request.sender, + self._isPep(request), + request.recipient) return d.addErrback(self._mapErrors) def items(self, request): ext_data = {} - if const.FLAG_ENABLE_RSM: + if const.FLAG_ENABLE_RSM and request.rsm is not None: ext_data['rsm'] = request.rsm + try: + ext_data['pep'] = request.delegated + except AttributeError: + pass d = self.backend.getItems(request.nodeIdentifier, - request.sender, + request.recipient, request.maxItems, request.itemIdentifiers, ext_data) @@ -1093,19 +1258,26 @@ def retract(self, request): d = self.backend.retractItem(request.nodeIdentifier, request.itemIdentifiers, - request.sender) + request.sender, + request.notify, + self._isPep(request), + request.recipient) return d.addErrback(self._mapErrors) def purge(self, request): d = self.backend.purgeNode(request.nodeIdentifier, - request.sender) + request.sender, + self._isPep(request), + request.recipient) return d.addErrback(self._mapErrors) def delete(self, request): d = self.backend.deleteNode(request.nodeIdentifier, - request.sender) + request.sender, + self._isPep(request), + request.recipient) return d.addErrback(self._mapErrors) components.registerAdapter(PubSubResourceFromBackend,