Mercurial > libervia-pubsub
changeset 294:df1edebb0466
PEP implementation, draft (huge patch sorry):
/!\ database schema has changed ! /!\
- whole PEP behaviour is not managed yet
- if the stanza is delegated, PEP is assumed
- fixed potential SQL injection in pgsql_storage
- publish notifications manage PEP
- added retract notifications (if "notify" attribute is present), with PEP handling
- a publisher can't replace an item he didn't publised anymore
- /!\ schema has changed, sat_pubsub_update_0_1.sql update it
- sat_pubsub_update_0_1.sql also fixes bad items coming from former version of SàT
author | Goffi <goffi@goffi.org> |
---|---|
date | Sun, 16 Aug 2015 01:32:42 +0200 |
parents | b96a4ac25f8b |
children | bed30cef11a8 |
files | db/pubsub.sql db/sat_pubsub_update_0_1.sql sat_pubsub/__init__.py sat_pubsub/backend.py sat_pubsub/const.py sat_pubsub/error.py sat_pubsub/pgsql_storage.py |
diffstat | 7 files changed, 681 insertions(+), 371 deletions(-) [+] |
line wrap: on
line diff
--- a/db/pubsub.sql Sun Aug 16 01:15:13 2015 +0200 +++ b/db/pubsub.sql Sun Aug 16 01:32:42 2015 +0200 @@ -5,7 +5,8 @@ CREATE TABLE nodes ( node_id serial PRIMARY KEY, - node text NOT NULL UNIQUE, + node text NOT NULL, + pep text, node_type text NOT NULL DEFAULT 'leaf' CHECK (node_type IN ('leaf', 'collection')), access_model text NOT NULL DEFAULT 'open' @@ -15,9 +16,15 @@ send_last_published_item text NOT NULL DEFAULT 'on_sub' CHECK (send_last_published_item IN ('never', 'on_sub')), publish_model text NOT NULL DEFAULT 'publishers' - CHECK (publish_model IN ('publishers', 'subscribers', 'open')) + CHECK (publish_model IN ('publishers', 'subscribers', 'open')), + UNIQUE (node, pep) WHERE pep IS NOT NULL, + UNIQUE (node) WHERE pep IS NULL ); +/* we need 2 partial indexes to manage NULL value for PEP */ +CREATE UNIQUE INDEX nodes_node_pep_key_not_null ON nodes(node, pep) WHERE pep IS NOT NULL; +CREATE UNIQUE INDEX nodes_node_pep_key_null ON nodes(node) WHERE pep IS NULL; + INSERT INTO nodes (node, node_type) values ('', 'collection'); CREATE TABLE affiliations ( @@ -68,3 +75,9 @@ UNIQUE (item_id,groupname) ); +CREATE TABLE metadata ( + key text PRIMARY KEY, + value text +); + +INSERT INTO metadata VALUES ('version', '1');
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/db/sat_pubsub_update_0_1.sql Sun Aug 16 01:32:42 2015 +0200 @@ -0,0 +1,25 @@ +ALTER TABLE nodes ADD COLUMN pep text; + +ALTER TABLE nodes DROP CONSTRAINT nodes_node_key; +/* we need 2 partial indexes to manage NULL value for PEP */ +CREATE UNIQUE INDEX nodes_node_pep_key_not_null ON nodes(node, pep) WHERE pep IS NOT NULL; +CREATE UNIQUE INDEX nodes_node_pep_key_null ON nodes(node) WHERE pep IS NULL; + +CREATE TABLE metadata ( + key text PRIMARY KEY, + value text +); + +INSERT INTO metadata VALUES ('version', '1'); + +UPDATE nodes SET node='urn:xmpp:microblog:0', pep=substring(node from 20) WHERE node LIKE 'urn:xmpp:groupblog:_%'; + +/* This is to update namespaces, SàT was bugguy before 0.6 and didn't set the atom namespace in <entry/> */ +/* But yeah, this is a crazy query */ +UPDATE items SET data = xmlelement(name item, xmlattributes((xpath('/item/@id', data::xml))[1] as id), + XMLPARSE(CONTENT NULLIF(array_to_string(xpath('/item/entry/preceding-sibling::*', data::xml)::text[],''),'')), + xmlelement(name entry, xmlattributes('http://www.w3.org/2005/Atom' as xmlns), array_to_string(xpath('/item/entry/*', data::xml)::text[], '')::xml), + XMLPARSE(CONTENT NULLIF(array_to_string(xpath('/item/entry/following-sibling::*', data::xml)::text[],''),''))) + FROM nodes WHERE nodes.node_id = items.node_id + AND node = 'urn:xmpp:microblog:0' + AND XMLEXISTS('/item/entry' PASSING (data::xml));
--- a/sat_pubsub/__init__.py Sun Aug 16 01:15:13 2015 +0200 +++ b/sat_pubsub/__init__.py Sun Aug 16 01:32:42 2015 +0200 @@ -60,6 +60,7 @@ # TODO: remove this when RSM and MAM are in wokkel import wokkel +from wokkel import pubsub, delay from sat.tmp.wokkel import delay as tmp_delay, pubsub as tmp_pubsub, rsm as tmp_rsm, mam as tmp_mam wokkel.delay = tmp_delay wokkel.pubsub = tmp_pubsub
--- a/sat_pubsub/backend.py Sun Aug 16 01:15:13 2015 +0200 +++ b/sat_pubsub/backend.py Sun Aug 16 01:32:42 2015 +0200 @@ -70,7 +70,7 @@ from twisted.python import components, log from twisted.internet import defer, reactor from twisted.words.protocols.jabber.error import StanzaError -from twisted.words.protocols.jabber.jid import JID, InvalidFormat +# from twisted.words.protocols.jabber.jid import JID, InvalidFormat from twisted.words.xish import utility from wokkel import disco, data_form, rsm @@ -103,13 +103,13 @@ implements(iidavoll.IBackendService) nodeOptions = { - "pubsub#persist_items": + const.OPT_PERSIST_ITEMS: {"type": "boolean", "label": "Persist items to storage"}, - "pubsub#deliver_payloads": + const.OPT_DELIVER_PAYLOADS: {"type": "boolean", "label": "Deliver payloads with event notifications"}, - "pubsub#send_last_published_item": + const.OPT_SEND_LAST_PUBLISHED_ITEM: {"type": "list-single", "label": "When to send the last published item", "options": { @@ -181,18 +181,20 @@ return True - def getNodeType(self, nodeIdentifier): - d = self.storage.getNode(nodeIdentifier) + def getNodeType(self, nodeIdentifier, pep, recipient=None): + # FIXME: manage pep and recipient + d = self.storage.getNode(nodeIdentifier, pep, recipient) d.addCallback(lambda node: node.getType()) return d - def getNodes(self): - return self.storage.getNodeIds() + def getNodes(self, pep): + return self.storage.getNodeIds(pep) - def getNodeMetaData(self, nodeIdentifier): - d = self.storage.getNode(nodeIdentifier) + def getNodeMetaData(self, nodeIdentifier, pep, recipient=None): + # FIXME: manage pep and recipient + d = self.storage.getNode(nodeIdentifier, pep, recipient) d.addCallback(lambda node: node.getMetaData()) d.addCallback(self._makeMetaData) return d @@ -214,14 +216,13 @@ """ Check authorisation of publishing in node for requestor """ def check(affiliation): - d = defer.succeed(node) + d = defer.succeed((affiliation, node)) configuration = node.getConfiguration() publish_model = configuration[const.OPT_PUBLISH_MODEL] - - if (publish_model == const.VAL_PMODEL_PUBLISHERS): + if publish_model == const.VAL_PMODEL_PUBLISHERS: if affiliation not in ['owner', 'publisher']: raise error.Forbidden() - elif (publish_model == const.VAL_PMODEL_SUBSCRIBERS): + elif publish_model == const.VAL_PMODEL_SUBSCRIBERS: if affiliation not in ['owner', 'publisher']: # we are in subscribers publish model, we must check that # the requestor is a subscriber to allow him to publish @@ -229,12 +230,12 @@ def checkSubscription(subscribed): if not subscribed: raise error.Forbidden() - return node + return (affiliation, node) d.addCallback(lambda ignore: node.isSubscribed(requestor)) d.addCallback(checkSubscription) elif publish_model != const.VAL_PMODEL_OPEN: - raise Exception('Unexpected value') # publish_model must be publishers (default), subscribers or open. + raise ValueError('Unexpected value') # publish_model must be publishers (default), subscribers or open. return d @@ -246,6 +247,7 @@ """Get and remove item configuration information @param item: """ + # FIXME: dirty ! Need to use elements() item_config = None access_model = const.VAL_AMODEL_DEFAULT for i in range(len(item.children)): @@ -264,60 +266,87 @@ return (access_model, item_config) - def publish(self, nodeIdentifier, items, requestor): - d = self.storage.getNode(nodeIdentifier) + def _checkOverwrite(self, node, itemIdentifiers, publisher): + """Check that the itemIdentifiers correspond to items published + by the current publisher""" + def doCheck(item_pub_map): + for item_publisher in item_pub_map.iterValues(): + if item_publisher.userhost() != publisher.userhost(): + raise error.ItemForbidden() + + d = node.getItemsPublishers(itemIdentifiers) + d.addCallback(doCheck) + return d + + + def publish(self, nodeIdentifier, items, requestor, pep, recipient): + d = self.storage.getNode(nodeIdentifier, pep, recipient) d.addCallback(self._checkAuth, requestor) #FIXME: owner and publisher are not necessarly the same. So far we use only owner to get roster. #FIXME: in addition, there can be several owners: that is not managed yet - d.addCallback(self._doPublish, items, requestor) + d.addCallback(self._doPublish, items, requestor, pep, recipient) return d - def _doPublish(self, node, items, requestor): + def _doPublish(self, result, items, requestor, pep, recipient): + affiliation, node = result if node.nodeType == 'collection': raise error.NoPublishing() configuration = node.getConfiguration() - persistItems = configuration["pubsub#persist_items"] - deliverPayloads = configuration["pubsub#deliver_payloads"] + persistItems = configuration[const.OPT_PERSIST_ITEMS] + deliverPayloads = configuration[const.OPT_DELIVER_PAYLOADS] if items and not persistItems and not deliverPayloads: raise error.ItemForbidden() elif not items and (persistItems or deliverPayloads): raise error.ItemRequired() - parsed_items = [] + items_data = [] + check_overwrite = False for item in items: if persistItems or deliverPayloads: item.uri = None item.defaultUri = None if not item.getAttribute("id"): item["id"] = str(uuid.uuid4()) + else: + check_overwrite = True access_model, item_config = self.parseItemConfig(item) - parsed_items.append((access_model, item_config, item)) + items_data.append((item, access_model, item_config)) if persistItems: - d = node.storeItems(parsed_items, requestor) + if check_overwrite and affiliation != 'owner': + # we don't want a publisher to overwrite the item + # of an other publisher + d = self._checkOverwrite(node, [item['id'] for item in items if item.getAttribute('id')], requestor) + d.addCallback(lambda _: node.storeItems(items_data, requestor)) + else: + d = node.storeItems(items_data, requestor) else: d = defer.succeed(None) - d.addCallback(self._doNotify, node, parsed_items, - deliverPayloads) + d.addCallback(self._doNotify, node, items_data, + deliverPayloads, pep, recipient) return d - def _doNotify(self, result, node, items, deliverPayloads): - if items and not deliverPayloads: - for access_model, item_config, item in items: + def _doNotify(self, result, node, items_data, deliverPayloads, pep, recipient): + if items_data and not deliverPayloads: + for access_model, item_config, item in items_data: item.children = [] - - self.dispatch({'items': items, 'node': node}, + self.dispatch({'items_data': items_data, 'node': node, 'pep': pep, 'recipient': recipient}, '//event/pubsub/notify') - def getNotifications(self, nodeIdentifier, items): + def getNotifications(self, nodeDbId, items_data): + """Build a list of subscriber to the node - def toNotifications(subscriptions, nodeIdentifier, items): + subscribers will be associated with subscribed items, + and subscription type. + """ + + def toNotifications(subscriptions, items_data): subsBySubscriber = {} for subscription in subscriptions: if subscription.options.get('pubsub#subscription_type', @@ -326,7 +355,7 @@ set()) subs.add(subscription) - notifications = [(subscriber, subscriptions_, items) + notifications = [(subscriber, subscriptions_, items_data) for subscriber, subscriptions_ in subsBySubscriber.iteritems()] @@ -336,37 +365,46 @@ failure.trap(error.NodeNotFound) return [] - d1 = self.storage.getNode(nodeIdentifier) + d1 = self.storage.getNodeById(nodeDbId) d1.addCallback(lambda node: node.getSubscriptions('subscribed')) - d2 = self.storage.getNode('') - d2.addCallback(lambda node: node.getSubscriptions('subscribed')) - d2.addErrback(rootNotFound) - d = defer.gatherResults([d1, d2]) - d.addCallback(lambda result: result[0] + result[1]) - d.addCallback(toNotifications, nodeIdentifier, items) - return d + # FIXME: must add root node subscriptions ? + # d2 = self.storage.getNode('', False) # FIXME: to check + # d2.addCallback(lambda node: node.getSubscriptions('subscribed')) + # d2.addErrback(rootNotFound) + # d = defer.gatherResults([d1, d2]) + # d.addCallback(lambda result: result[0] + result[1]) + d1.addCallback(toNotifications, items_data) + return d1 - def registerNotifier(self, observerfn, *args, **kwargs): + def registerPublishNotifier(self, observerfn, *args, **kwargs): self.addObserver('//event/pubsub/notify', observerfn, *args, **kwargs) - def subscribe(self, nodeIdentifier, subscriber, requestor): + def registerRetractNotifier(self, observerfn, *args, **kwargs): + self.addObserver('//event/pubsub/retract', observerfn, *args, **kwargs) + + def subscribe(self, nodeIdentifier, subscriber, requestor, pep, recipient): subscriberEntity = subscriber.userhostJID() if subscriberEntity != requestor.userhostJID(): return defer.fail(error.Forbidden()) - d = self.storage.getNode(nodeIdentifier) + d = self.storage.getNode(nodeIdentifier, pep, recipient) d.addCallback(_getAffiliation, subscriberEntity) d.addCallback(self._doSubscribe, subscriber) return d def _doSubscribe(self, result, subscriber): + # TODO: implement other access models node, affiliation = result - #FIXME: must check node's access_model before subscribing if affiliation == 'outcast': raise error.Forbidden() + access_model = node.getAccessModel() + + if access_model != const.VAL_AMODEL_OPEN: + raise NotImplementedError + def trapExists(failure): failure.trap(error.SubscriptionExists) return False @@ -380,6 +418,7 @@ d = node.addSubscription(subscriber, 'subscribed', {}) d.addCallbacks(lambda _: True, trapExists) d.addCallback(cb) + return d @@ -406,11 +445,11 @@ return subscription - def unsubscribe(self, nodeIdentifier, subscriber, requestor): + def unsubscribe(self, nodeIdentifier, subscriber, requestor, pep, recipient): if subscriber.userhostJID() != requestor.userhostJID(): return defer.fail(error.Forbidden()) - d = self.storage.getNode(nodeIdentifier) + d = self.storage.getNode(nodeIdentifier, pep, recipient) d.addCallback(lambda node: node.removeSubscription(subscriber)) return d @@ -428,33 +467,33 @@ return True - def createNode(self, nodeIdentifier, requestor, options = None): + def createNode(self, nodeIdentifier, requestor, options = None, pep=False, recipient=None): if not nodeIdentifier: nodeIdentifier = 'generic/%s' % uuid.uuid4() if not options: options = {} - if self.supportsCreatorCheck(): - groupblog = nodeIdentifier.startswith(const.NS_GROUPBLOG_PREFIX) - try: - nodeIdentifierJID = JID(nodeIdentifier[len(const.NS_GROUPBLOG_PREFIX):] if groupblog else nodeIdentifier) - except InvalidFormat: - is_user_jid = False - else: - is_user_jid = bool(nodeIdentifierJID.user) + # if self.supportsCreatorCheck(): + # groupblog = nodeIdentifier.startswith(const.NS_GROUPBLOG_PREFIX) + # try: + # nodeIdentifierJID = JID(nodeIdentifier[len(const.NS_GROUPBLOG_PREFIX):] if groupblog else nodeIdentifier) + # except InvalidFormat: + # is_user_jid = False + # else: + # is_user_jid = bool(nodeIdentifierJID.user) - if is_user_jid and nodeIdentifierJID.userhostJID() != requestor.userhostJID(): - #we have an user jid node, but not created by the owner of this jid - print "Wrong creator" - raise error.Forbidden() + # if is_user_jid and nodeIdentifierJID.userhostJID() != requestor.userhostJID(): + # #we have an user jid node, but not created by the owner of this jid + # print "Wrong creator" + # raise error.Forbidden() nodeType = 'leaf' config = self.storage.getDefaultConfiguration(nodeType) config['pubsub#node_type'] = nodeType config.update(options) - d = self.storage.createNode(nodeIdentifier, requestor, config) + d = self.storage.createNode(nodeIdentifier, requestor, config, pep, recipient) d.addCallback(lambda _: nodeIdentifier) return d @@ -464,21 +503,21 @@ return d - def getNodeConfiguration(self, nodeIdentifier): + def getNodeConfiguration(self, nodeIdentifier, pep, recipient): if not nodeIdentifier: return defer.fail(error.NoRootNode()) - d = self.storage.getNode(nodeIdentifier) + d = self.storage.getNode(nodeIdentifier, pep, recipient) d.addCallback(lambda node: node.getConfiguration()) return d - def setNodeConfiguration(self, nodeIdentifier, options, requestor): + def setNodeConfiguration(self, nodeIdentifier, options, requestor, pep, recipient): if not nodeIdentifier: return defer.fail(error.NoRootNode()) - d = self.storage.getNode(nodeIdentifier) + d = self.storage.getNode(nodeIdentifier, pep, recipient) d.addCallback(_getAffiliation, requestor) d.addCallback(self._doSetNodeConfiguration, options) return d @@ -497,13 +536,13 @@ return self.storage.getAffiliations(entity) - def getItems(self, nodeIdentifier, requestor, maxItems=None, + def getItems(self, nodeIdentifier, recipient, maxItems=None, itemIdentifiers=None, ext_data=None): if ext_data is None: ext_data = {} - d = self.storage.getNode(nodeIdentifier) - d.addCallback(_getAffiliation, requestor) - d.addCallback(self._doGetItems, requestor, maxItems, itemIdentifiers, + d = self.storage.getNode(nodeIdentifier, ext_data.get('pep', False), recipient) + d.addCallback(_getAffiliation, recipient) + d.addCallback(self._doGetItems, recipient, maxItems, itemIdentifiers, ext_data) return d @@ -545,7 +584,7 @@ elif access_model == const.VAL_AMODEL_ROSTER: form = data_form.Form('submit', formNamespace=const.NS_ITEM_CONFIG) access = data_form.Field(None, const.OPT_ACCESS_MODEL, value=const.VAL_AMODEL_ROSTER) - allowed = data_form.Field(None, const.OPT_ROSTER_GROUPS_ALLOWED, values=access_list) + allowed = data_form.Field(None, const.OPT_ROSTER_GROUPS_ALLOWED, values=access_list[const.OPT_ROSTER_GROUPS_ALLOWED]) form.addField(access) form.addField(allowed) item.addChild(form.toElement()) @@ -590,15 +629,15 @@ if affiliation == 'outcast': raise error.Forbidden() - access_model = node.getConfiguration()["pubsub#access_model"] + access_model = node.getAccessModel() d = node.getNodeOwner() d.addCallback(self.privilege.getRoster) d.addErrback(self._rosterEb) - if access_model == 'open' or affiliation == 'owner': + if access_model == const.VAL_AMODEL_OPEN or affiliation == 'owner': d.addCallback(lambda roster: (True, roster)) d.addCallback(access_checked) - elif access_model == 'roster': + elif access_model == const.VAL_AMODEL_ROSTER: d.addCallback(self._getNodeGroups,node.nodeIdentifier) d.addCallback(self.checkGroup, requestor) d.addCallback(access_checked) @@ -650,36 +689,39 @@ return defer.DeferredList(d_list).addCallback(render) - def retractItem(self, nodeIdentifier, itemIdentifiers, requestor): - d = self.storage.getNode(nodeIdentifier) + def retractItem(self, nodeIdentifier, itemIdentifiers, requestor, notify, pep, recipient): + d = self.storage.getNode(nodeIdentifier, pep, recipient) d.addCallback(_getAffiliation, requestor) - if const.FLAG_RETRACT_ALLOW_PUBLISHER: - d.addCallback(self._doRetractAllowPublisher, itemIdentifiers, requestor) - else: - d.addCallback(self._doRetract, itemIdentifiers) + # FIXME: to be checked + # if const.FLAG_RETRACT_ALLOW_PUBLISHER: + # d.addCallback(self._doRetractAllowPublisher, itemIdentifiers, requestor) + # else: + # d.addCallback(self._doRetract, itemIdentifiers) + d.addCallback(self._doRetract, itemIdentifiers, notify, pep, recipient) return d - def _doRetractAllowPublisher(self, result, itemIdentifiers, requestor): - """This method has been added to allow the publisher - of an item to retract it, even if he has no affiliation - to that item. For instance, this allows you to delete - an item you posted in a node of "open" publish model. - """ + # FIXME: to be checked + # def _doRetractAllowPublisher(self, result, itemIdentifiers, requestor): + # """This method has been added to allow the publisher + # of an item to retract it, even if he has no affiliation + # to that item. For instance, this allows you to delete + # an item you posted in a node of "open" publish model. + # """ + # node, affiliation = result + # if affiliation in ['owner', 'publisher']: + # return self._doRetract(result, itemIdentifiers) + # d = node.filterItemsWithPublisher(itemIdentifiers, requestor) + # def filterCb(filteredItems): + # if not filteredItems: + # return self._doRetract(result, itemIdentifiers) + # # XXX: fake an affiliation that does NOT exist + # return self._doRetract((node, 'publisher'), filteredItems) + # d.addCallback(filterCb) + # return d + + def _doRetract(self, result, itemIdentifiers, notify, pep, recipient): node, affiliation = result - if affiliation in ['owner', 'publisher']: - return self._doRetract(result, itemIdentifiers) - d = node.filterItemsWithPublisher(itemIdentifiers, requestor) - def filterCb(filteredItems): - if not filteredItems: - return self._doRetract(result, itemIdentifiers) - # XXX: fake an affiliation that does NOT exist - return self._doRetract((node, 'publisher'), filteredItems) - d.addCallback(filterCb) - return d - - def _doRetract(self, result, itemIdentifiers): - node, affiliation = result - persistItems = node.getConfiguration()["pubsub#persist_items"] + persistItems = node.getConfiguration()[const.OPT_PERSIST_ITEMS] if affiliation not in ['owner', 'publisher']: raise error.Forbidden() @@ -687,19 +729,32 @@ if not persistItems: raise error.NodeNotPersistent() - d = node.removeItems(itemIdentifiers) - d.addCallback(self._doNotifyRetraction, node.nodeIdentifier) + # we need to get the items before removing them, for the notifications + + def removeItems(items_data): + """Remove the items and keep only actually removed ones in items_data""" + d = node.removeItems(itemIdentifiers) + d.addCallback(lambda removed: [item_data for item_data in items_data if item_data[0]["id"] in removed]) + return d + + d = node.getItemsById(None, True, itemIdentifiers) + d.addCallback(removeItems) + + if notify: + d.addCallback(self._doNotifyRetraction, node, pep, recipient) return d - def _doNotifyRetraction(self, itemIdentifiers, nodeIdentifier): - self.dispatch({'itemIdentifiers': itemIdentifiers, - 'nodeIdentifier': nodeIdentifier }, + def _doNotifyRetraction(self, items_data, node, pep, recipient): + self.dispatch({'items_data': items_data, + 'node': node, + 'pep': pep, + 'recipient': recipient}, '//event/pubsub/retract') - def purgeNode(self, nodeIdentifier, requestor): - d = self.storage.getNode(nodeIdentifier) + def purgeNode(self, nodeIdentifier, requestor, pep, recipient): + d = self.storage.getNode(nodeIdentifier, pep, recipient) d.addCallback(_getAffiliation, requestor) d.addCallback(self._doPurge) return d @@ -707,7 +762,7 @@ def _doPurge(self, result): node, affiliation = result - persistItems = node.getConfiguration()["pubsub#persist_items"] + persistItems = node.getConfiguration()[const.OPT_PERSIST_ITEMS] if affiliation != 'owner': raise error.Forbidden() @@ -728,24 +783,24 @@ self._callbackList.append(preDeleteFn) - def getSubscribers(self, nodeIdentifier): + def getSubscribers(self, nodeIdentifier, pep, recipient): def cb(subscriptions): return [subscription.subscriber for subscription in subscriptions] - d = self.storage.getNode(nodeIdentifier) + d = self.storage.getNode(nodeIdentifier, pep, recipient) d.addCallback(lambda node: node.getSubscriptions('subscribed')) d.addCallback(cb) return d - def deleteNode(self, nodeIdentifier, requestor, redirectURI=None): - d = self.storage.getNode(nodeIdentifier) + def deleteNode(self, nodeIdentifier, requestor, pep, recipient, redirectURI=None): + d = self.storage.getNode(nodeIdentifier, pep, recipient) d.addCallback(_getAffiliation, requestor) - d.addCallback(self._doPreDelete, redirectURI) + d.addCallback(self._doPreDelete, redirectURI, pep, recipient) return d - def _doPreDelete(self, result, redirectURI): + def _doPreDelete(self, result, redirectURI, pep, recipient): node, affiliation = result if affiliation != 'owner': @@ -754,19 +809,19 @@ data = {'node': node, 'redirectURI': redirectURI} - d = defer.DeferredList([cb(data) + d = defer.DeferredList([cb(data, pep, recipient) for cb in self._callbackList], consumeErrors=1) - d.addCallback(self._doDelete, node.nodeIdentifier) + d.addCallback(self._doDelete, node.nodeDbId) - def _doDelete(self, result, nodeIdentifier): + def _doDelete(self, result, nodeDbId): dl = [] for succeeded, r in result: if succeeded and r: dl.extend(r) - d = self.storage.deleteNode(nodeIdentifier) + d = self.storage.deleteNodeByDbId(nodeDbId) d.addCallback(self._doNotifyDelete, dl) return d @@ -812,6 +867,7 @@ error.Forbidden: ('forbidden', None, None), error.NotAuthorized: ('not-authorized', None, None), error.NotInRoster: ('not-authorized', 'not-in-roster-group', None), + error.ItemNotFound: ('item-not-found', None, None), error.ItemForbidden: ('bad-request', 'item-forbidden', None), error.ItemRequired: ('bad-request', 'item-required', None), error.NoInstantNodes: ('not-acceptable', @@ -838,11 +894,13 @@ self.backend = backend self.hideNodes = False - self.backend.registerNotifier(self._notify) + self.backend.registerPublishNotifier(self._notifyPublish) + self.backend.registerRetractNotifier(self._notifyRetract) self.backend.registerPreDelete(self._preDelete) - if self.backend.supportsCreatorCheck(): - self.features.append("creator-jid-check") #SàT custom feature: Check that a node (which correspond to + # FIXME: to be removed, it's not useful anymore as PEP is now used + # if self.backend.supportsCreatorCheck(): + # self.features.append("creator-jid-check") #SàT custom feature: Check that a node (which correspond to # a jid in this server) is created by the right jid if self.backend.supportsAutoCreate(): @@ -866,39 +924,14 @@ # if self.backend.supportsPublishModel(): #XXX: this feature is not really described in XEP-0060, we just can see it in examples # self.features.append("publish_model") # but it's necessary for microblogging comments (see XEP-0277) - def _notify(self, data): - items = data['items'] + def _notifyPublish(self, data): + items_data = data['items_data'] node = data['node'] - - def _notifyAllowed(result): - """Check access of subscriber for each item, - and notify only allowed ones""" - notifications, (owner_jid,roster) = result - - #we filter items not allowed for the subscribers - notifications_filtered = [] - - for subscriber, subscriptions, _items in notifications: - allowed_items = [] #we keep only item which subscriber can access + pep = data['pep'] + recipient = data['recipient'] - for access_model, item_config, item in _items: - if access_model == 'open': - allowed_items.append(item) - elif access_model == 'roster': - _subscriber = subscriber.userhostJID() - if not _subscriber in roster: - continue - #the subscriber is known, is he in the right group ? - authorized_groups = item_config[const.OPT_ROSTER_GROUPS_ALLOWED] - if roster[_subscriber].groups.intersection(authorized_groups): - allowed_items.append(item) - - else: #unknown access_model - raise NotImplementedError - - if allowed_items: - notifications_filtered.append((subscriber, subscriptions, allowed_items)) - + def afterPrepare(result): + owner_jid, notifications_filtered = result #we notify the owner #FIXME: check if this comply with XEP-0060 (option needed ?) #TODO: item's access model have to be sent back to owner @@ -910,7 +943,7 @@ """ #TODO: a test should check that only the owner get the item configuration back - access_model, item_config, item = item_data + item, access_model, item_config = item_data new_item = deepcopy(item) if item_config: new_item.addChild(item_config.toElement()) @@ -920,37 +953,136 @@ set([Subscription(node.nodeIdentifier, owner_jid, 'subscribed')]), - [getFullItem(item_data) for item_data in items])) + [getFullItem(item_data) for item_data in items_data])) + + if pep: + return self.backend.privilege.notifyPublish( + recipient, + node.nodeIdentifier, + notifications_filtered) + + else: + return self.pubsubService.notifyPublish( + self.serviceJID, + node.nodeIdentifier, + notifications_filtered) + + d = self._prepareNotify(items_data, node, data.get('subscription')) + d.addCallback(afterPrepare) + return d + + def _notifyRetract(self, data): + items_data = data['items_data'] + node = data['node'] + pep = data['pep'] + recipient = data['recipient'] - return self.pubsubService.notifyPublish( - self.serviceJID, - node.nodeIdentifier, - notifications_filtered) + def afterPrepare(result): + owner_jid, notifications_filtered = result + #we add the owner + + notifications_filtered.append((owner_jid, + set([Subscription(node.nodeIdentifier, + owner_jid, + 'subscribed')]), + [item for item, _, _ in items_data])) + + if pep: + return self.backend.privilege.notifyRetract( + recipient, + node.nodeIdentifier, + notifications_filtered) + + else: + return self.pubsubService.notifyRetract( + self.serviceJID, + node.nodeIdentifier, + notifications_filtered) + + d = self._prepareNotify(items_data, node, data.get('subscription')) + d.addCallback(afterPrepare) + return d - if 'subscription' not in data: - d1 = self.backend.getNotifications(node.nodeIdentifier, items) + def _prepareNotify(self, items_data, node, subscription=None): + """Do a bunch of permissions check and filter notifications + + The owner is not added to these notifications, + it must be called by the calling method + @param items_data(tuple): must contain: + - item (domish.Element) + - access_model (unicode) + - access_list (dict as returned getItemsById, or item_config) + @param node(LeafNode): node hosting the items + @param subscription(pubsub.Subscription, None): TODO + + @return (tuple): will contain: + - notifications_filtered + - node_owner_jid + - items_data + """ + + def filterNotifications(result): + """Check access of subscriber for each item, and keep only allowed ones""" + notifications, (owner_jid,roster) = result + + #we filter items not allowed for the subscribers + notifications_filtered = [] + + for subscriber, subscriptions, _items_data in notifications: + if subscriber == owner_jid: + # as notification is always sent to owner, + # we ignore owner if he is here + continue + allowed_items = [] #we keep only item which subscriber can access + + for item, access_model, access_list in _items_data: + if access_model == const.VAL_AMODEL_OPEN: + allowed_items.append(item) + elif access_model == const.VAL_AMODEL_ROSTER: + _subscriber = subscriber.userhostJID() + if not _subscriber in roster: + continue + #the subscriber is known, is he in the right group ? + authorized_groups = access_list[const.OPT_ROSTER_GROUPS_ALLOWED] + if roster[_subscriber].groups.intersection(authorized_groups): + allowed_items.append(item) + + else: #unknown access_model + raise NotImplementedError + + if allowed_items: + notifications_filtered.append((subscriber, subscriptions, allowed_items)) + return (owner_jid, notifications_filtered) + + + if subscription is None: + d1 = self.backend.getNotifications(node.nodeDbId, items_data) else: - subscription = data['subscription'] d1 = defer.succeed([(subscription.subscriber, [subscription], - items)]) + items_data)]) def _got_owner(owner_jid): #return a tuple with owner_jid and roster + def rosterEb(failure): + log.msg("Error while getting roster: {}".format(failure.value)) + return (owner_jid, {}) + d = self.backend.privilege.getRoster(owner_jid) - d.addErrback(self._rosterEb) + d.addErrback(rosterEb) d.addCallback(lambda roster: (owner_jid,roster)) + return d d2 = node.getNodeOwner() d2.addCallback(_got_owner) + d = defer.gatherResults([d1, d2]) + d.addCallback(filterNotifications) + return d - d = defer.gatherResults([d1, d2]) - d.addCallback(_notifyAllowed) - - def _preDelete(self, data): + def _preDelete(self, data, pep, recipient): nodeIdentifier = data['node'].nodeIdentifier redirectURI = data.get('redirectURI', None) - d = self.backend.getSubscribers(nodeIdentifier) + d = self.backend.getSubscribers(nodeIdentifier, pep, recipient) d.addCallback(lambda subscribers: self.pubsubService.notifyDelete( self.serviceJID, nodeIdentifier, @@ -972,7 +1104,8 @@ raise exc - def getInfo(self, requestor, service, nodeIdentifier): + def getInfo(self, requestor, service, nodeIdentifier, pep=None, recipient=None): + return [] # FIXME: disabled for now, need to manage PEP if not requestor.resource: # this avoid error when getting a disco request from server during namespace delegation return [] @@ -1000,10 +1133,11 @@ return d - def getNodes(self, requestor, service, nodeIdentifier): + def getNodes(self, requestor, service, nodeIdentifier, pep=None): + return defer.succeed([]) # FIXME: disabled for now, need to manage PEP if service.resource: return defer.succeed([]) - d = self.backend.getNodes() + d = self.backend.getNodes(pep) return d.addErrback(self._mapErrors) @@ -1014,20 +1148,33 @@ if failure.type == error.NodeNotFound and self.backend.supportsAutoCreate(): print "Auto-creating node %s" % (request.nodeIdentifier,) d = self.backend.createNode(request.nodeIdentifier, - request.sender) + request.sender, + pep=self._isPep(request), + recipient=request.recipient) d.addCallback(lambda ignore, request: self.backend.publish(request.nodeIdentifier, request.items, - request.sender), + request.sender, + self._isPep(request), + request.recipient, + ), request) return d return failure + def _isPep(self, request): + try: + return request.delegated + except AttributeError: + return False + def publish(self, request): d = self.backend.publish(request.nodeIdentifier, request.items, - request.sender) + request.sender, + self._isPep(request), + request.recipient) d.addErrback(self._publish_errb, request) return d.addErrback(self._mapErrors) @@ -1035,56 +1182,74 @@ def subscribe(self, request): d = self.backend.subscribe(request.nodeIdentifier, request.subscriber, - request.sender) + request.sender, + self._isPep(request), + request.recipient) return d.addErrback(self._mapErrors) def unsubscribe(self, request): d = self.backend.unsubscribe(request.nodeIdentifier, request.subscriber, - request.sender) + request.sender, + self._isPep(request), + request.recipient) return d.addErrback(self._mapErrors) def subscriptions(self, request): - d = self.backend.getSubscriptions(request.sender) + d = self.backend.getSubscriptions(self._isPep(request), + request.sender) return d.addErrback(self._mapErrors) def affiliations(self, request): - d = self.backend.getAffiliations(request.sender) + d = self.backend.getAffiliations(self._isPep(request), + request.sender) return d.addErrback(self._mapErrors) def create(self, request): d = self.backend.createNode(request.nodeIdentifier, - request.sender, request.options) + request.sender, request.options, + self._isPep(request), + request.recipient) return d.addErrback(self._mapErrors) def default(self, request): - d = self.backend.getDefaultConfiguration(request.nodeType) + d = self.backend.getDefaultConfiguration(request.nodeType, + self._isPep(request), + request.sender) return d.addErrback(self._mapErrors) def configureGet(self, request): - d = self.backend.getNodeConfiguration(request.nodeIdentifier) + d = self.backend.getNodeConfiguration(request.nodeIdentifier, + self._isPep(request), + request.recipient) return d.addErrback(self._mapErrors) def configureSet(self, request): d = self.backend.setNodeConfiguration(request.nodeIdentifier, request.options, - request.sender) + request.sender, + self._isPep(request), + request.recipient) return d.addErrback(self._mapErrors) def items(self, request): ext_data = {} - if const.FLAG_ENABLE_RSM: + if const.FLAG_ENABLE_RSM and request.rsm is not None: ext_data['rsm'] = request.rsm + try: + ext_data['pep'] = request.delegated + except AttributeError: + pass d = self.backend.getItems(request.nodeIdentifier, - request.sender, + request.recipient, request.maxItems, request.itemIdentifiers, ext_data) @@ -1093,19 +1258,26 @@ def retract(self, request): d = self.backend.retractItem(request.nodeIdentifier, request.itemIdentifiers, - request.sender) + request.sender, + request.notify, + self._isPep(request), + request.recipient) return d.addErrback(self._mapErrors) def purge(self, request): d = self.backend.purgeNode(request.nodeIdentifier, - request.sender) + request.sender, + self._isPep(request), + request.recipient) return d.addErrback(self._mapErrors) def delete(self, request): d = self.backend.deleteNode(request.nodeIdentifier, - request.sender) + request.sender, + self._isPep(request), + request.recipient) return d.addErrback(self._mapErrors) components.registerAdapter(PubSubResourceFromBackend,
--- a/sat_pubsub/const.py Sun Aug 16 01:15:13 2015 +0200 +++ b/sat_pubsub/const.py Sun Aug 16 01:32:42 2015 +0200 @@ -57,6 +57,9 @@ NS_ITEM_CONFIG = "http://jabber.org/protocol/pubsub#item-config" OPT_ACCESS_MODEL = 'pubsub#access_model' OPT_ROSTER_GROUPS_ALLOWED = 'pubsub#roster_groups_allowed' +OPT_PERSIST_ITEMS = "pubsub#persist_items" +OPT_DELIVER_PAYLOADS = "pubsub#deliver_payloads" +OPT_SEND_LAST_PUBLISHED_ITEM = "pubsub#send_last_published_item" OPT_PUBLISH_MODEL = 'pubsub#publish_model' VAL_AMODEL_OPEN = 'open' VAL_AMODEL_ROSTER = 'roster' @@ -67,7 +70,7 @@ VAL_PMODEL_OPEN = 'open' VAL_PMODEL_DEFAULT = VAL_PMODEL_PUBLISHERS -FLAG_RETRACT_ALLOW_PUBLISHER = True # XXX: see the method BackendService._doRetractAllowPublisher +# FLAG_RETRACT_ALLOW_PUBLISHER = True # XXX: see the method BackendService._doRetractAllowPublisher FLAG_ENABLE_RSM = True VAL_RSM_MAX_DEFAULT = 10 FLAG_ENABLE_MAM = True
--- a/sat_pubsub/error.py Sun Aug 16 01:15:13 2015 +0200 +++ b/sat_pubsub/error.py Sun Aug 16 01:32:42 2015 +0200 @@ -97,12 +97,17 @@ class NotAuthorized(Error): pass - + class NotInRoster(Error): pass - + + + +class ItemNotFound(Error): + pass + class ItemForbidden(Error):
--- a/sat_pubsub/pgsql_storage.py Sun Aug 16 01:15:13 2015 +0200 +++ b/sat_pubsub/pgsql_storage.py Sun Aug 16 01:32:42 2015 +0200 @@ -72,21 +72,44 @@ # parseXml manage str, but we get unicode parseXml = lambda unicode_data: generic.parseXml(unicode_data.encode('utf-8')) + +def withPEP(query, values, pep, recipient, pep_table=None): + """Helper method to facilitate PEP management + + @param query: SQL query basis + @param values: current values to replace in query + @param pep: True if we are in PEP mode + @param recipient: jid of the recipient + @param pep_table: added before pep if table need to be specified + @return: query + PEP AND check, + recipient's bare jid is added to value if needed + """ + pep_col_name = "{}pep".format( + '' if pep_table is None + else ".{}".format(pep_table)) + if pep: + pep_check="AND {}=%s".format(pep_col_name) + values=list(values) + [recipient.userhost()] + else: + pep_check="AND {} IS NULL".format(pep_col_name) + return "{} {}".format(query, pep_check), values + + class Storage: implements(iidavoll.IStorage) defaultConfig = { 'leaf': { - "pubsub#persist_items": True, - "pubsub#deliver_payloads": True, - "pubsub#send_last_published_item": 'on_sub', + const.OPT_PERSIST_ITEMS: True, + const.OPT_DELIVER_PAYLOADS: True, + const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub', const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT, const.OPT_PUBLISH_MODEL: const.VAL_PMODEL_DEFAULT, }, 'collection': { - "pubsub#deliver_payloads": True, - "pubsub#send_last_published_item": 'on_sub', + const.OPT_DELIVER_PAYLOADS: True, + const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub', const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT, const.OPT_PUBLISH_MODEL: const.VAL_PMODEL_DEFAULT, } @@ -95,83 +118,119 @@ def __init__(self, dbpool): self.dbpool = dbpool - def getNode(self, nodeIdentifier): - return self.dbpool.runInteraction(self._getNode, nodeIdentifier) - - def _getNode(self, cursor, nodeIdentifier): + def _buildNode(self, row): + """Build a note class from database result row""" configuration = {} - cursor.execute("""SELECT node_type, - persist_items, - deliver_payloads, - send_last_published_item, - access_model, - publish_model - FROM nodes - WHERE node=%s""", - (nodeIdentifier,)) - row = cursor.fetchone() if not row: raise error.NodeNotFound() - if row[0] == 'leaf': + if row[2] == 'leaf': configuration = { - 'pubsub#persist_items': row[1], - 'pubsub#deliver_payloads': row[2], - 'pubsub#send_last_published_item': row[3], - const.OPT_ACCESS_MODEL:row[4], - const.OPT_PUBLISH_MODEL:row[5], + 'pubsub#persist_items': row[3], + 'pubsub#deliver_payloads': row[4], + 'pubsub#send_last_published_item': row[5], + const.OPT_ACCESS_MODEL:row[6], + const.OPT_PUBLISH_MODEL:row[7], } - node = LeafNode(nodeIdentifier, configuration) + node = LeafNode(row[0], row[1], configuration) + node.dbpool = self.dbpool + return node + elif row[2] == 'collection': + configuration = { + 'pubsub#deliver_payloads': row[4], + 'pubsub#send_last_published_item': row[5], + const.OPT_ACCESS_MODEL: row[6], + const.OPT_PUBLISH_MODEL:row[7], + } + node = CollectionNode(row[0], row[1], configuration) node.dbpool = self.dbpool return node - elif row[0] == 'collection': - configuration = { - 'pubsub#deliver_payloads': row[2], - 'pubsub#send_last_published_item': row[3], - const.OPT_ACCESS_MODEL: row[4], - const.OPT_PUBLISH_MODEL:row[5], - } - node = CollectionNode(nodeIdentifier, configuration) - node.dbpool = self.dbpool - return node + else: + raise ValueError("Unknown node type !") + + def getNodeById(self, nodeDbId): + """Get node using database ID insted of pubsub identifier + + @param nodeDbId(unicode): database ID + """ + return self.dbpool.runInteraction(self._getNodeById, nodeDbId) + def _getNodeById(self, cursor, nodeDbId): + cursor.execute("""SELECT node_id, + node, + node_type, + persist_items, + deliver_payloads, + send_last_published_item, + access_model, + publish_model, + pep + FROM nodes + WHERE node_id=%s""", + (nodeDbId,)) + row = cursor.fetchone() + return self._buildNode(row) - def getNodeIds(self): - d = self.dbpool.runQuery("""SELECT node from nodes""") + def getNode(self, nodeIdentifier, pep, recipient=None): + return self.dbpool.runInteraction(self._getNode, nodeIdentifier, pep, recipient) + + + def _getNode(self, cursor, nodeIdentifier, pep, recipient): + cursor.execute(*withPEP("""SELECT node_id, + node, + node_type, + persist_items, + deliver_payloads, + send_last_published_item, + access_model, + publish_model, + pep + FROM nodes + WHERE node=%s""", + (nodeIdentifier,), pep, recipient)) + row = cursor.fetchone() + return self._buildNode(row) + + def getNodeIds(self, pep): + d = self.dbpool.runQuery("""SELECT node from nodes WHERE pep is {}NULL""" + .format("NOT " if pep else "")) d.addCallback(lambda results: [r[0] for r in results]) return d - def createNode(self, nodeIdentifier, owner, config): + def createNode(self, nodeIdentifier, owner, config, pep, recipient=None): return self.dbpool.runInteraction(self._createNode, nodeIdentifier, - owner, config) + owner, config, pep, recipient) - def _createNode(self, cursor, nodeIdentifier, owner, config): + def _createNode(self, cursor, nodeIdentifier, owner, config, pep, recipient): if config['pubsub#node_type'] != 'leaf': raise error.NoCollections() owner = owner.userhost() + try: cursor.execute("""INSERT INTO nodes (node, node_type, persist_items, - deliver_payloads, send_last_published_item, access_model, publish_model) + deliver_payloads, send_last_published_item, access_model, publish_model, pep) VALUES - (%s, 'leaf', %s, %s, %s, %s, %s)""", + (%s, 'leaf', %s, %s, %s, %s, %s, %s)""", (nodeIdentifier, config['pubsub#persist_items'], config['pubsub#deliver_payloads'], config['pubsub#send_last_published_item'], config[const.OPT_ACCESS_MODEL], config[const.OPT_PUBLISH_MODEL], + recipient.userhost() if pep else None ) ) except cursor._pool.dbapi.IntegrityError: raise error.NodeExists() - cursor.execute("""SELECT node_id FROM nodes WHERE node=%s""", (nodeIdentifier,)); + cursor.execute(*withPEP("""SELECT node_id FROM nodes WHERE node=%s""", + (nodeIdentifier,), pep, recipient)); node_id = cursor.fetchone()[0] cursor.execute("""SELECT 1 as bool from entities where jid=%s""", @@ -210,39 +269,49 @@ cursor.execute("""INSERT INTO node_groups_authorized (node_id, groupname) VALUES (%s,%s)""" , (node_id, group)) + def deleteNodeByDbId(self, db_id): + """Delete a node using directly its database id""" + return self.dbpool.runInteraction(self._deleteNodeByDbId, db_id) - def deleteNode(self, nodeIdentifier): - return self.dbpool.runInteraction(self._deleteNode, nodeIdentifier) + def _deleteNodeByDbId(self, cursor, db_id): + cursor.execute("""DELETE FROM nodes WHERE node_id=%s""", + (db_id,)) + + if cursor.rowcount != 1: + raise error.NodeNotFound() + + def deleteNode(self, nodeIdentifier, pep, recipient=None): + return self.dbpool.runInteraction(self._deleteNode, nodeIdentifier, pep, recipient) - def _deleteNode(self, cursor, nodeIdentifier): - cursor.execute("""DELETE FROM nodes WHERE node=%s""", - (nodeIdentifier,)) + def _deleteNode(self, cursor, nodeIdentifier, pep, recipient): + cursor.execute(*withPEP("""DELETE FROM nodes WHERE node=%s""", + (nodeIdentifier,), pep, recipient)) if cursor.rowcount != 1: raise error.NodeNotFound() - def getNodeGroups(self, nodeIdentifier): - return self.dbpool.runInteraction(self._getNodeGroups, nodeIdentifier) + def getNodeGroups(self, nodeIdentifier, pep, recipient=None): + return self.dbpool.runInteraction(self._getNodeGroups, nodeIdentifier, pep, recipient) - def _getNodeGroups(self, cursor, nodeIdentifier): - cursor.execute("SELECT groupname FROM node_groups_authorized NATURAL JOIN nodes WHERE node=%s", - (nodeIdentifier,)) + def _getNodeGroups(self, cursor, nodeIdentifier, pep, recipient): + cursor.execute(*withPEP("SELECT groupname FROM node_groups_authorized NATURAL JOIN nodes WHERE node=%s", + (nodeIdentifier,), pep, recipient)) rows = cursor.fetchall() return [row[0] for row in rows] - def getAffiliations(self, entity): - d = self.dbpool.runQuery("""SELECT node, affiliation FROM entities + def getAffiliations(self, entity, pep, recipient=None): + d = self.dbpool.runQuery(*withPEP("""SELECT node, affiliation FROM entities NATURAL JOIN affiliations NATURAL JOIN nodes WHERE jid=%s""", - (entity.userhost(),)) + (entity.userhost(),), pep, recipient, 'nodes')) d.addCallback(lambda results: [tuple(r) for r in results]) return d - def getSubscriptions(self, entity): + def getSubscriptions(self, entity, pep, recipient=None): def toSubscriptions(rows): subscriptions = [] for row in rows: @@ -256,8 +325,8 @@ FROM entities NATURAL JOIN subscriptions NATURAL JOIN nodes - WHERE jid=%s""", - (entity.userhost(),)) + WHERE jid=%s AND nodes.pep=%s""", + (entity.userhost(), recipient.userhost() if pep else None)) d.addCallback(toSubscriptions) return d @@ -271,15 +340,16 @@ implements(iidavoll.INode) - def __init__(self, nodeIdentifier, config): + def __init__(self, nodeDbId, nodeIdentifier, config): + self.nodeDbId = nodeDbId self.nodeIdentifier = nodeIdentifier self._config = config self.owner = None; def _checkNodeExists(self, cursor): - cursor.execute("""SELECT node_id FROM nodes WHERE node=%s""", - (self.nodeIdentifier,)) + cursor.execute("""SELECT 1 as exist FROM nodes WHERE node_id=%s""", + (self.nodeDbId,)) if not cursor.fetchone(): raise error.NodeNotFound() @@ -290,7 +360,7 @@ def getNodeOwner(self): if self.owner: return defer.succeed(self.owner) - d = self.dbpool.runQuery("""SELECT jid FROM nodes NATURAL JOIN affiliations NATURAL JOIN entities WHERE node=%s""", (self.nodeIdentifier,)) + d = self.dbpool.runQuery("""SELECT jid FROM nodes NATURAL JOIN affiliations NATURAL JOIN entities WHERE node_id=%s""", (self.nodeDbId,)) d.addCallback(lambda result: jid.JID(result[0][0])) return d @@ -315,12 +385,16 @@ self._checkNodeExists(cursor) cursor.execute("""UPDATE nodes SET persist_items=%s, deliver_payloads=%s, - send_last_published_item=%s - WHERE node=%s""", - (config["pubsub#persist_items"], - config["pubsub#deliver_payloads"], - config["pubsub#send_last_published_item"], - self.nodeIdentifier)) + send_last_published_item=%s, + access_model=%s, + publish_model=%s + WHERE node_id=%s""", + (config[const.OPT_PERSIST_ITEMS], + config[const.OPT_DELIVER_PAYLOADS], + config[const.OPT_SEND_LAST_PUBLISHED_ITEM], + config[const.OPT_ACCESS_MODEL], + config[const.OPT_PUBLISH_MODEL], + self.nodeDbId)) def _setCachedConfiguration(self, void, config): @@ -342,8 +416,8 @@ cursor.execute("""SELECT affiliation FROM affiliations NATURAL JOIN nodes NATURAL JOIN entities - WHERE node=%s AND jid=%s""", - (self.nodeIdentifier, + WHERE node_id=%s AND jid=%s""", + (self.nodeDbId, entity.userhost())) try: @@ -351,19 +425,9 @@ except TypeError: return None + def getAccessModel(self): - return self.dbpool.runInteraction(self._getAccessModel) - - def _getAccessModel(self, cursor, entity): - self._checkNodeExists(cursor) - cursor.execute("""SELECT access_model FROM nodes - WHERE node=%s""", - (self.nodeIdentifier,)) - - try: - return cursor.fetchone()[0] - except TypeError: - return None + return self._config[const.OPT_ACCESS_MODEL] def getSubscription(self, subscriber): @@ -379,8 +443,8 @@ cursor.execute("""SELECT state FROM subscriptions NATURAL JOIN nodes NATURAL JOIN entities - WHERE node=%s AND jid=%s AND resource=%s""", - (self.nodeIdentifier, + WHERE node_id=%s AND jid=%s AND resource=%s""", + (self.nodeDbId, userhost, resource)) @@ -398,13 +462,13 @@ def _getSubscriptions(self, cursor, state): self._checkNodeExists(cursor) - query = """SELECT jid, resource, state, + query = """SELECT node, jid, resource, state, subscription_type, subscription_depth FROM subscriptions NATURAL JOIN nodes NATURAL JOIN entities - WHERE node=%s""" - values = [self.nodeIdentifier] + WHERE node_id=%s""" + values = [self.nodeDbId] if state: query += " AND state=%s" @@ -415,16 +479,16 @@ subscriptions = [] for row in rows: - subscriber = jid.JID(u'%s/%s' % (row[0], row[1])) + subscriber = jid.JID(u'%s/%s' % (row[1], row[2])) options = {} - if row[3]: - options['pubsub#subscription_type'] = row[3]; if row[4]: - options['pubsub#subscription_depth'] = row[4]; + options['pubsub#subscription_type'] = row[4]; + if row[5]: + options['pubsub#subscription_depth'] = row[5]; - subscriptions.append(Subscription(self.nodeIdentifier, subscriber, - row[2], options)) + subscriptions.append(Subscription(row[0], subscriber, + row[3], options)) return subscriptions @@ -453,17 +517,14 @@ cursor.execute("""INSERT INTO subscriptions (node_id, entity_id, resource, state, subscription_type, subscription_depth) - SELECT node_id, entity_id, %s, %s, %s, %s FROM - (SELECT node_id FROM nodes - WHERE node=%s) as n - CROSS JOIN + SELECT %s, entity_id, %s, %s, %s, %s FROM (SELECT entity_id FROM entities - WHERE jid=%s) as e""", - (resource, + WHERE jid=%s) AS ent_id""", + (self.nodeDbId, + resource, state, subscription_type, subscription_depth, - self.nodeIdentifier, userhost)) except cursor._pool.dbapi.IntegrityError: raise error.SubscriptionExists() @@ -481,12 +542,11 @@ resource = subscriber.resource or '' cursor.execute("""DELETE FROM subscriptions WHERE - node_id=(SELECT node_id FROM nodes - WHERE node=%s) AND + node_id=%s AND entity_id=(SELECT entity_id FROM entities WHERE jid=%s) AND resource=%s""", - (self.nodeIdentifier, + (self.nodeDbId, userhost, resource)) if cursor.rowcount != 1: @@ -506,9 +566,9 @@ NATURAL JOIN subscriptions NATURAL JOIN nodes WHERE entities.jid=%s - AND node=%s AND state='subscribed'""", + AND node_id=%s AND state='subscribed'""", (entity.userhost(), - self.nodeIdentifier)) + self.nodeDbId)) return cursor.fetchone() is not None @@ -523,8 +583,8 @@ cursor.execute("""SELECT jid, affiliation FROM nodes NATURAL JOIN affiliations NATURAL JOIN entities - WHERE node=%s""", - (self.nodeIdentifier,)) + WHERE node_id=%s""", + (self.nodeDbId,)) result = cursor.fetchall() return [(jid.internJID(r[0]), r[1]) for r in result] @@ -541,36 +601,37 @@ return self.dbpool.runInteraction(self._storeItems, item_data, publisher) - def _storeItems(self, cursor, item_data, publisher): + def _storeItems(self, cursor, items_data, publisher): self._checkNodeExists(cursor) - for item_datum in item_data: - self._storeItem(cursor, item_datum, publisher) + for item_data in items_data: + self._storeItem(cursor, item_data, publisher) - def _storeItem(self, cursor, item_datum, publisher): - access_model, item_config, item = item_datum + def _storeItem(self, cursor, item_data, publisher): + item, access_model, item_config = item_data data = item.toXml() cursor.execute("""UPDATE items SET date=now(), publisher=%s, data=%s FROM nodes WHERE nodes.node_id = items.node_id AND - nodes.node = %s and items.item=%s""", + nodes.node_id = %s and items.item=%s""", (publisher.full(), data, - self.nodeIdentifier, + self.nodeDbId, item["id"])) if cursor.rowcount == 1: return cursor.execute("""INSERT INTO items (node_id, item, publisher, data, access_model) - SELECT node_id, %s, %s, %s, %s FROM nodes - WHERE node=%s + SELECT %s, %s, %s, %s, %s FROM nodes + WHERE node_id=%s RETURNING item_id""", - (item["id"], + (self.nodeDbId, + item["id"], publisher.full(), data, access_model, - self.nodeIdentifier)) + self.nodeDbId)) if access_model == const.VAL_AMODEL_ROSTER: item_id = cursor.fetchone()[0]; @@ -596,10 +657,9 @@ for itemIdentifier in itemIdentifiers: cursor.execute("""DELETE FROM items WHERE - node_id=(SELECT node_id FROM nodes - WHERE node=%s) AND + node_id=%s AND item=%s""", - (self.nodeIdentifier, + (self.nodeDbId, itemIdentifier)) if cursor.rowcount: @@ -623,40 +683,44 @@ return self.dbpool.runInteraction(self._getItems, authorized_groups, unrestricted, maxItems, ext_data) def _getItems(self, cursor, authorized_groups, unrestricted, maxItems, ext_data): + # FIXME: simplify the query construction self._checkNodeExists(cursor) if unrestricted: query = ["SELECT data,items.access_model,item_id"] source = """FROM nodes INNER JOIN items USING (node_id) - WHERE node=%s""" - args = [self.nodeIdentifier] + WHERE node_id=%s""" + args = [self.nodeDbId] else: query = ["SELECT data"] groups = " or (items.access_model='roster' and groupname in %s)" if authorized_groups else "" source = """FROM nodes INNER JOIN items USING (node_id) LEFT JOIN item_groups_authorized USING (item_id) - WHERE node=%s AND + WHERE node_id=%s AND (items.access_model='open'""" + groups + ")" - args = [self.nodeIdentifier] + args = [self.nodeDbId] if authorized_groups: args.append(authorized_groups) if 'filters' in ext_data: # MAM filters for filter_ in ext_data['filters']: if filter_.var == 'start': - source += " AND date>='{date}'".format(date=filter_.value) + source += " AND date>=%s" + args.append(filter_.value) if filter_.var == 'end': - source += " AND date<='{date}'".format(date=filter_.value) + source += " AND date<=%s" + args.append(filter_.value) if filter_.var == 'with': jid_s = filter_.value if '/' in jid_s: - source += " AND publisher='{pub}'".format(pub=filter_.value) - else: # assume the publisher field in DB is always a full JID - # XXX: need to escape the % with itself to avoid formatting error - source += " AND publisher LIKE '{pub}/%%'".format(pub=filter_.value) + source += " AND publisher=%s" + args.append(filter_.value) + else: + source += " AND publisher LIKE %s" + args.append(u"{}%".format(filter_.value)) query.append(source) order = "DESC" @@ -666,7 +730,9 @@ maxItems = rsm.max if rsm.index is not None: query.append("AND date<=(SELECT date " + source + " ORDER BY date DESC LIMIT 1 OFFSET %s)") - args.append(self.nodeIdentifier) + # FIXME: change the request so source is not used 2 times + # there is already a placeholder in source with node_id=%s, so we need to add self.noDbId in args + args.append(self.nodeDbId) if authorized_groups: args.append(authorized_groups) args.append(rsm.index) @@ -694,11 +760,10 @@ item = generic.stripNamespace(parseXml(data[0])) access_model = data[1] item_id = data[2] - if access_model == 'roster': #TODO: jid access_model + access_list = {} + if access_model == const.VAL_AMODEL_ROSTER: #TODO: jid access_model cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,)) - access_list = [r[0] for r in cursor.fetchall()] - else: - access_list = None + access_list[const.OPT_ROSTER_GROUPS_ALLOWED] = [r[0] for r in cursor.fetchall()] ret.append((item, access_model, access_list)) return ret @@ -720,18 +785,18 @@ if unrestricted: query = ["""SELECT count(item_id) FROM nodes INNER JOIN items USING (node_id) - WHERE node=%s"""] - args = [self.nodeIdentifier] + WHERE node_id=%s"""] + args = [self.nodeDbId] else: query = ["""SELECT count(item_id) FROM nodes INNER JOIN items USING (node_id) LEFT JOIN item_groups_authorized USING (item_id) - WHERE node=%s AND + WHERE node_id=%s AND (items.access_model='open' """ + ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') + ")"] - args = [self.nodeIdentifier] + args = [self.nodeDbId] if authorized_groups: args.append(authorized_groups) @@ -755,22 +820,22 @@ query = ["""SELECT row_number FROM ( SELECT row_number() OVER (ORDER BY date DESC), item FROM nodes INNER JOIN items USING (node_id) - WHERE node=%s + WHERE node_id=%s ) as x WHERE item=%s LIMIT 1"""] - args = [self.nodeIdentifier] + args = [self.nodeDbId] else: query = ["""SELECT row_number FROM ( SELECT row_number() OVER (ORDER BY date DESC), item FROM nodes INNER JOIN items USING (node_id) LEFT JOIN item_groups_authorized USING (item_id) - WHERE node=%s AND + WHERE node_id=%s AND (items.access_model='open' """ + ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') + """)) as x WHERE item=%s LIMIT 1"""] - args = [self.nodeIdentifier] + args = [self.nodeDbId] if authorized_groups: args.append(authorized_groups) @@ -784,7 +849,8 @@ @param authorized_groups: we want to get items that these groups can access @param unrestricted: if true, don't check permissions @param itemIdentifiers: list of ids of the items we want to get - @return: list of (item, access_model, access_model) if unrestricted is True, else list of items + @return: list of (item, access_model, access_list) if unrestricted is True, else list of items + access_list is managed as a dictionnary with same key as for item_config """ return self.dbpool.runInteraction(self._getItemsById, authorized_groups, unrestricted, itemIdentifiers) @@ -796,41 +862,66 @@ for itemIdentifier in itemIdentifiers: cursor.execute("""SELECT data,items.access_model,item_id FROM nodes INNER JOIN items USING (node_id) - WHERE node=%s AND item=%s""", - (self.nodeIdentifier, + WHERE node_id=%s AND item=%s""", + (self.nodeDbId, itemIdentifier)) result = cursor.fetchone() - if result: - for data in result: - item = generic.stripNamespace(parseXml(data[0])) - access_model = data[1] - item_id = data[2] - if access_model == 'roster': #TODO: jid access_model - cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,)) - access_list = [r[0] for r in cursor.fetchall()] - else: - access_list = None + if not result: + raise error.ItemNotFound() - ret.append((item, access_model, access_list)) + item = generic.stripNamespace(parseXml(result[0])) + access_model = result[1] + item_id = result[2] + access_list = {} + if access_model == const.VAL_AMODEL_ROSTER: #TODO: jid access_model + cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,)) + access_list[const.OPT_ROSTER_GROUPS_ALLOWED] = [r[0] for r in cursor.fetchall()] + + ret.append((item, access_model, access_list)) else: #we check permission before returning items for itemIdentifier in itemIdentifiers: - args = [self.nodeIdentifier, itemIdentifier] + args = [self.nodeDbId, itemIdentifier] if authorized_groups: args.append(authorized_groups) cursor.execute("""SELECT data FROM nodes INNER JOIN items USING (node_id) LEFT JOIN item_groups_authorized USING (item_id) - WHERE node=%s AND item=%s AND + WHERE node_id=%s AND item=%s AND (items.access_model='open' """ + ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') + ")", args) result = cursor.fetchone() if result: - ret.append(parseXml(result[0])) + ret.append(generic.stripNamespace(parseXml(result[0]))) return ret + + def getItemsPublishers(self, itemIdentifiers): + """Get the publishers for all given identifiers + + @return (dict): map of itemIdentifiers to publisher + """ + return self.dbpool.runInteraction(self._getItemsPublishers, itemIdentifiers) + + + def _getItemsPublishers(self, cursor, itemIdentifiers): + self._checkNodeExists(cursor) + ret = {} + for itemIdentifier in itemIdentifiers: + cursor.execute("""SELECT publisher FROM items + WHERE item=%s""", + (itemIdentifier,)) + result = cursor.fetchone() + if not result: + # We have an internal error, that's why we use ValueError + # and not error.ItemNotFound() + raise ValueError() # itemIdentifier must exists + ret[itemIdentifier] = jid.JID(result[0]) + return ret + + def purge(self): return self.dbpool.runInteraction(self._purge) @@ -839,23 +930,23 @@ self._checkNodeExists(cursor) cursor.execute("""DELETE FROM items WHERE - node_id=(SELECT node_id FROM nodes WHERE node=%s)""", - (self.nodeIdentifier,)) + node_id=%s""", + (self.nodeDbId,)) - - def filterItemsWithPublisher(self, itemIdentifiers, requestor): - return self.dbpool.runInteraction(self._filterItemsWithPublisher, itemIdentifiers, requestor) + # FIXME: to be checked + # def filterItemsWithPublisher(self, itemIdentifiers, recipient): + # return self.dbpool.runInteraction(self._filterItemsWithPublisher, itemIdentifiers, recipient) - def _filterItemsWithPublisher(self, cursor, itemIdentifiers, requestor): - self._checkNodeExists(cursor) - ret = [] - for itemIdentifier in itemIdentifiers: - args = ["%s/%%" % requestor.userhost(), itemIdentifier] - cursor.execute("""SELECT item FROM items WHERE publisher LIKE %s AND item=%s""", args) - result = cursor.fetchone() - if result: - ret.append(result[0]) - return ret + # def _filterItemsWithPublisher(self, cursor, itemIdentifiers, requestor): + # self._checkNodeExists(cursor) + # ret = [] + # for itemIdentifier in itemIdentifiers: + # args = ["%s/%%" % requestor.userhost(), itemIdentifier] + # cursor.execute("""SELECT item FROM items WHERE publisher LIKE %s AND item=%s""", args) + # result = cursor.fetchone() + # if result: + # ret.append(result[0]) + # return ret class CollectionNode(Node):