changeset 294:df1edebb0466

PEP implementation, draft (huge patch sorry): /!\ database schema has changed ! /!\ - whole PEP behaviour is not managed yet - if the stanza is delegated, PEP is assumed - fixed potential SQL injection in pgsql_storage - publish notifications manage PEP - added retract notifications (if "notify" attribute is present), with PEP handling - a publisher can't replace an item he didn't publised anymore - /!\ schema has changed, sat_pubsub_update_0_1.sql update it - sat_pubsub_update_0_1.sql also fixes bad items coming from former version of SàT
author Goffi <goffi@goffi.org>
date Sun, 16 Aug 2015 01:32:42 +0200
parents b96a4ac25f8b
children bed30cef11a8
files db/pubsub.sql db/sat_pubsub_update_0_1.sql sat_pubsub/__init__.py sat_pubsub/backend.py sat_pubsub/const.py sat_pubsub/error.py sat_pubsub/pgsql_storage.py
diffstat 7 files changed, 681 insertions(+), 371 deletions(-) [+]
line wrap: on
line diff
--- a/db/pubsub.sql	Sun Aug 16 01:15:13 2015 +0200
+++ b/db/pubsub.sql	Sun Aug 16 01:32:42 2015 +0200
@@ -5,7 +5,8 @@
 
 CREATE TABLE nodes (
     node_id serial PRIMARY KEY,
-    node text NOT NULL UNIQUE,
+    node text NOT NULL,
+	pep text,
     node_type text NOT NULL DEFAULT 'leaf'
         CHECK (node_type IN ('leaf', 'collection')),
     access_model text NOT NULL DEFAULT 'open'
@@ -15,9 +16,15 @@
     send_last_published_item text NOT NULL DEFAULT 'on_sub'
         CHECK (send_last_published_item IN ('never', 'on_sub')),
     publish_model text NOT NULL DEFAULT 'publishers'
-        CHECK (publish_model IN ('publishers', 'subscribers', 'open'))
+        CHECK (publish_model IN ('publishers', 'subscribers', 'open')),
+	UNIQUE (node, pep) WHERE pep IS NOT NULL,
+	UNIQUE (node) WHERE pep IS NULL
 );
 
+/* we need 2 partial indexes to manage NULL value for PEP */
+CREATE UNIQUE INDEX nodes_node_pep_key_not_null ON nodes(node, pep) WHERE pep IS NOT NULL;
+CREATE UNIQUE INDEX nodes_node_pep_key_null ON nodes(node) WHERE pep IS NULL;
+
 INSERT INTO nodes (node, node_type) values ('', 'collection');
 
 CREATE TABLE affiliations (
@@ -68,3 +75,9 @@
     UNIQUE (item_id,groupname)
 );
 
+CREATE TABLE metadata (
+	key text PRIMARY KEY,
+	value text
+);
+
+INSERT INTO metadata VALUES ('version', '1');
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/db/sat_pubsub_update_0_1.sql	Sun Aug 16 01:32:42 2015 +0200
@@ -0,0 +1,25 @@
+ALTER TABLE nodes ADD COLUMN pep text;
+
+ALTER TABLE nodes DROP CONSTRAINT nodes_node_key;
+/* we need 2 partial indexes to manage NULL value for PEP */
+CREATE UNIQUE INDEX nodes_node_pep_key_not_null ON nodes(node, pep) WHERE pep IS NOT NULL;
+CREATE UNIQUE INDEX nodes_node_pep_key_null ON nodes(node) WHERE pep IS NULL;
+
+CREATE TABLE metadata (
+	key text PRIMARY KEY,
+	value text
+);
+
+INSERT INTO metadata VALUES ('version', '1');
+
+UPDATE nodes SET node='urn:xmpp:microblog:0', pep=substring(node from 20) WHERE node LIKE 'urn:xmpp:groupblog:_%';
+
+/* This is to update namespaces, SàT was bugguy before 0.6 and didn't set the atom namespace in <entry/> */
+/* But yeah, this is a crazy query */
+UPDATE items SET data = xmlelement(name item, xmlattributes((xpath('/item/@id', data::xml))[1] as id),
+                        XMLPARSE(CONTENT NULLIF(array_to_string(xpath('/item/entry/preceding-sibling::*', data::xml)::text[],''),'')),
+                        xmlelement(name entry, xmlattributes('http://www.w3.org/2005/Atom' as xmlns), array_to_string(xpath('/item/entry/*', data::xml)::text[], '')::xml),
+                        XMLPARSE(CONTENT NULLIF(array_to_string(xpath('/item/entry/following-sibling::*', data::xml)::text[],''),'')))
+             FROM nodes WHERE nodes.node_id = items.node_id
+             AND node = 'urn:xmpp:microblog:0'
+             AND XMLEXISTS('/item/entry' PASSING (data::xml));
--- a/sat_pubsub/__init__.py	Sun Aug 16 01:15:13 2015 +0200
+++ b/sat_pubsub/__init__.py	Sun Aug 16 01:32:42 2015 +0200
@@ -60,6 +60,7 @@
 
 # TODO: remove this when RSM and MAM are in wokkel
 import wokkel
+from wokkel import pubsub, delay
 from sat.tmp.wokkel import delay as tmp_delay, pubsub as tmp_pubsub, rsm as tmp_rsm, mam as tmp_mam
 wokkel.delay = tmp_delay
 wokkel.pubsub = tmp_pubsub
--- a/sat_pubsub/backend.py	Sun Aug 16 01:15:13 2015 +0200
+++ b/sat_pubsub/backend.py	Sun Aug 16 01:32:42 2015 +0200
@@ -70,7 +70,7 @@
 from twisted.python import components, log
 from twisted.internet import defer, reactor
 from twisted.words.protocols.jabber.error import StanzaError
-from twisted.words.protocols.jabber.jid import JID, InvalidFormat
+# from twisted.words.protocols.jabber.jid import JID, InvalidFormat
 from twisted.words.xish import utility
 
 from wokkel import disco, data_form, rsm
@@ -103,13 +103,13 @@
     implements(iidavoll.IBackendService)
 
     nodeOptions = {
-            "pubsub#persist_items":
+            const.OPT_PERSIST_ITEMS:
                 {"type": "boolean",
                  "label": "Persist items to storage"},
-            "pubsub#deliver_payloads":
+            const.OPT_DELIVER_PAYLOADS:
                 {"type": "boolean",
                  "label": "Deliver payloads with event notifications"},
-            "pubsub#send_last_published_item":
+            const.OPT_SEND_LAST_PUBLISHED_ITEM:
                 {"type": "list-single",
                  "label": "When to send the last published item",
                  "options": {
@@ -181,18 +181,20 @@
         return True
 
 
-    def getNodeType(self, nodeIdentifier):
-        d = self.storage.getNode(nodeIdentifier)
+    def getNodeType(self, nodeIdentifier, pep, recipient=None):
+        # FIXME: manage pep and recipient
+        d = self.storage.getNode(nodeIdentifier, pep, recipient)
         d.addCallback(lambda node: node.getType())
         return d
 
 
-    def getNodes(self):
-        return self.storage.getNodeIds()
+    def getNodes(self, pep):
+        return self.storage.getNodeIds(pep)
 
 
-    def getNodeMetaData(self, nodeIdentifier):
-        d = self.storage.getNode(nodeIdentifier)
+    def getNodeMetaData(self, nodeIdentifier, pep, recipient=None):
+        # FIXME: manage pep and recipient
+        d = self.storage.getNode(nodeIdentifier, pep, recipient)
         d.addCallback(lambda node: node.getMetaData())
         d.addCallback(self._makeMetaData)
         return d
@@ -214,14 +216,13 @@
         """ Check authorisation of publishing in node for requestor """
 
         def check(affiliation):
-            d = defer.succeed(node)
+            d = defer.succeed((affiliation, node))
             configuration = node.getConfiguration()
             publish_model = configuration[const.OPT_PUBLISH_MODEL]
-
-            if (publish_model == const.VAL_PMODEL_PUBLISHERS):
+            if publish_model == const.VAL_PMODEL_PUBLISHERS:
                 if affiliation not in ['owner', 'publisher']:
                     raise error.Forbidden()
-            elif (publish_model == const.VAL_PMODEL_SUBSCRIBERS):
+            elif publish_model == const.VAL_PMODEL_SUBSCRIBERS:
                 if affiliation not in ['owner', 'publisher']:
                     # we are in subscribers publish model, we must check that
                     # the requestor is a subscriber to allow him to publish
@@ -229,12 +230,12 @@
                     def checkSubscription(subscribed):
                         if not subscribed:
                             raise error.Forbidden()
-                        return node
+                        return (affiliation, node)
 
                     d.addCallback(lambda ignore: node.isSubscribed(requestor))
                     d.addCallback(checkSubscription)
             elif publish_model != const.VAL_PMODEL_OPEN:
-                raise Exception('Unexpected value') # publish_model must be publishers (default), subscribers or open.
+                raise ValueError('Unexpected value') # publish_model must be publishers (default), subscribers or open.
 
             return d
 
@@ -246,6 +247,7 @@
         """Get and remove item configuration information
         @param item:
         """
+        # FIXME: dirty ! Need to use elements()
         item_config = None
         access_model = const.VAL_AMODEL_DEFAULT
         for i in range(len(item.children)):
@@ -264,60 +266,87 @@
         return (access_model, item_config)
 
 
-    def publish(self, nodeIdentifier, items, requestor):
-        d = self.storage.getNode(nodeIdentifier)
+    def _checkOverwrite(self, node, itemIdentifiers, publisher):
+        """Check that the itemIdentifiers correspond to items published
+        by the current publisher"""
+        def doCheck(item_pub_map):
+            for item_publisher in item_pub_map.iterValues():
+                if item_publisher.userhost() != publisher.userhost():
+                    raise error.ItemForbidden()
+
+        d = node.getItemsPublishers(itemIdentifiers)
+        d.addCallback(doCheck)
+        return d
+
+
+    def publish(self, nodeIdentifier, items, requestor, pep, recipient):
+        d = self.storage.getNode(nodeIdentifier, pep, recipient)
         d.addCallback(self._checkAuth, requestor)
         #FIXME: owner and publisher are not necessarly the same. So far we use only owner to get roster.
         #FIXME: in addition, there can be several owners: that is not managed yet
-        d.addCallback(self._doPublish, items, requestor)
+        d.addCallback(self._doPublish, items, requestor, pep, recipient)
         return d
 
 
-    def _doPublish(self, node, items, requestor):
+    def _doPublish(self, result, items, requestor, pep, recipient):
+        affiliation, node = result
         if node.nodeType == 'collection':
             raise error.NoPublishing()
 
         configuration = node.getConfiguration()
-        persistItems = configuration["pubsub#persist_items"]
-        deliverPayloads = configuration["pubsub#deliver_payloads"]
+        persistItems = configuration[const.OPT_PERSIST_ITEMS]
+        deliverPayloads = configuration[const.OPT_DELIVER_PAYLOADS]
 
         if items and not persistItems and not deliverPayloads:
             raise error.ItemForbidden()
         elif not items and (persistItems or deliverPayloads):
             raise error.ItemRequired()
 
-        parsed_items = []
+        items_data = []
+        check_overwrite = False
         for item in items:
             if persistItems or deliverPayloads:
                 item.uri = None
                 item.defaultUri = None
                 if not item.getAttribute("id"):
                     item["id"] = str(uuid.uuid4())
+                else:
+                    check_overwrite = True
             access_model, item_config = self.parseItemConfig(item)
-            parsed_items.append((access_model, item_config, item))
+            items_data.append((item, access_model, item_config))
 
         if persistItems:
-            d = node.storeItems(parsed_items, requestor)
+            if check_overwrite and affiliation != 'owner':
+                # we don't want a publisher to overwrite the item
+                # of an other publisher
+                d = self._checkOverwrite(node, [item['id'] for item in items if item.getAttribute('id')], requestor)
+                d.addCallback(lambda _: node.storeItems(items_data, requestor))
+            else:
+                d = node.storeItems(items_data, requestor)
         else:
             d = defer.succeed(None)
 
-        d.addCallback(self._doNotify, node, parsed_items,
-                      deliverPayloads)
+        d.addCallback(self._doNotify, node, items_data,
+                      deliverPayloads, pep, recipient)
         return d
 
 
-    def _doNotify(self, result, node, items, deliverPayloads):
-        if items and not deliverPayloads:
-            for access_model, item_config, item in items:
+    def _doNotify(self, result, node, items_data, deliverPayloads, pep, recipient):
+        if items_data and not deliverPayloads:
+            for access_model, item_config, item in items_data:
                 item.children = []
-
-        self.dispatch({'items': items, 'node': node},
+        self.dispatch({'items_data': items_data, 'node': node, 'pep': pep, 'recipient': recipient},
                       '//event/pubsub/notify')
 
 
-    def getNotifications(self, nodeIdentifier, items):
+    def getNotifications(self, nodeDbId, items_data):
+        """Build a list of subscriber to the node
 
-        def toNotifications(subscriptions, nodeIdentifier, items):
+        subscribers will be associated with subscribed items,
+        and subscription type.
+        """
+
+        def toNotifications(subscriptions, items_data):
             subsBySubscriber = {}
             for subscription in subscriptions:
                 if subscription.options.get('pubsub#subscription_type',
@@ -326,7 +355,7 @@
                                                        set())
                     subs.add(subscription)
 
-            notifications = [(subscriber, subscriptions_, items)
+            notifications = [(subscriber, subscriptions_, items_data)
                              for subscriber, subscriptions_
                              in subsBySubscriber.iteritems()]
 
@@ -336,37 +365,46 @@
             failure.trap(error.NodeNotFound)
             return []
 
-        d1 = self.storage.getNode(nodeIdentifier)
+        d1 = self.storage.getNodeById(nodeDbId)
         d1.addCallback(lambda node: node.getSubscriptions('subscribed'))
-        d2 = self.storage.getNode('')
-        d2.addCallback(lambda node: node.getSubscriptions('subscribed'))
-        d2.addErrback(rootNotFound)
-        d = defer.gatherResults([d1, d2])
-        d.addCallback(lambda result: result[0] + result[1])
-        d.addCallback(toNotifications, nodeIdentifier, items)
-        return d
+        # FIXME: must add root node subscriptions ?
+        # d2 = self.storage.getNode('', False) # FIXME: to check
+        # d2.addCallback(lambda node: node.getSubscriptions('subscribed'))
+        # d2.addErrback(rootNotFound)
+        # d = defer.gatherResults([d1, d2])
+        # d.addCallback(lambda result: result[0] + result[1])
+        d1.addCallback(toNotifications, items_data)
+        return d1
 
-    def registerNotifier(self, observerfn, *args, **kwargs):
+    def registerPublishNotifier(self, observerfn, *args, **kwargs):
         self.addObserver('//event/pubsub/notify', observerfn, *args, **kwargs)
 
-    def subscribe(self, nodeIdentifier, subscriber, requestor):
+    def registerRetractNotifier(self, observerfn, *args, **kwargs):
+        self.addObserver('//event/pubsub/retract', observerfn, *args, **kwargs)
+
+    def subscribe(self, nodeIdentifier, subscriber, requestor, pep, recipient):
         subscriberEntity = subscriber.userhostJID()
         if subscriberEntity != requestor.userhostJID():
             return defer.fail(error.Forbidden())
 
-        d = self.storage.getNode(nodeIdentifier)
+        d = self.storage.getNode(nodeIdentifier, pep, recipient)
         d.addCallback(_getAffiliation, subscriberEntity)
         d.addCallback(self._doSubscribe, subscriber)
         return d
 
 
     def _doSubscribe(self, result, subscriber):
+        # TODO: implement other access models
         node, affiliation = result
-        #FIXME: must check node's access_model before subscribing
 
         if affiliation == 'outcast':
             raise error.Forbidden()
 
+        access_model = node.getAccessModel()
+
+        if access_model != const.VAL_AMODEL_OPEN:
+            raise NotImplementedError
+
         def trapExists(failure):
             failure.trap(error.SubscriptionExists)
             return False
@@ -380,6 +418,7 @@
         d = node.addSubscription(subscriber, 'subscribed', {})
         d.addCallbacks(lambda _: True, trapExists)
         d.addCallback(cb)
+
         return d
 
 
@@ -406,11 +445,11 @@
         return subscription
 
 
-    def unsubscribe(self, nodeIdentifier, subscriber, requestor):
+    def unsubscribe(self, nodeIdentifier, subscriber, requestor, pep, recipient):
         if subscriber.userhostJID() != requestor.userhostJID():
             return defer.fail(error.Forbidden())
 
-        d = self.storage.getNode(nodeIdentifier)
+        d = self.storage.getNode(nodeIdentifier, pep, recipient)
         d.addCallback(lambda node: node.removeSubscription(subscriber))
         return d
 
@@ -428,33 +467,33 @@
         return True
 
 
-    def createNode(self, nodeIdentifier, requestor, options = None):
+    def createNode(self, nodeIdentifier, requestor, options = None, pep=False, recipient=None):
         if not nodeIdentifier:
             nodeIdentifier = 'generic/%s' % uuid.uuid4()
 
         if not options:
             options = {}
 
-        if self.supportsCreatorCheck():
-            groupblog = nodeIdentifier.startswith(const.NS_GROUPBLOG_PREFIX)
-            try:
-                nodeIdentifierJID = JID(nodeIdentifier[len(const.NS_GROUPBLOG_PREFIX):] if groupblog else nodeIdentifier)
-            except InvalidFormat:
-                is_user_jid = False
-            else:
-                is_user_jid = bool(nodeIdentifierJID.user)
+        # if self.supportsCreatorCheck():
+        #     groupblog = nodeIdentifier.startswith(const.NS_GROUPBLOG_PREFIX)
+        #     try:
+        #         nodeIdentifierJID = JID(nodeIdentifier[len(const.NS_GROUPBLOG_PREFIX):] if groupblog else nodeIdentifier)
+        #     except InvalidFormat:
+        #         is_user_jid = False
+        #     else:
+        #         is_user_jid = bool(nodeIdentifierJID.user)
 
-            if is_user_jid and nodeIdentifierJID.userhostJID() != requestor.userhostJID():
-                #we have an user jid node, but not created by the owner of this jid
-                print "Wrong creator"
-                raise error.Forbidden()
+        #     if is_user_jid and nodeIdentifierJID.userhostJID() != requestor.userhostJID():
+        #         #we have an user jid node, but not created by the owner of this jid
+        #         print "Wrong creator"
+        #         raise error.Forbidden()
 
         nodeType = 'leaf'
         config = self.storage.getDefaultConfiguration(nodeType)
         config['pubsub#node_type'] = nodeType
         config.update(options)
 
-        d = self.storage.createNode(nodeIdentifier, requestor, config)
+        d = self.storage.createNode(nodeIdentifier, requestor, config, pep, recipient)
         d.addCallback(lambda _: nodeIdentifier)
         return d
 
@@ -464,21 +503,21 @@
         return d
 
 
-    def getNodeConfiguration(self, nodeIdentifier):
+    def getNodeConfiguration(self, nodeIdentifier, pep, recipient):
         if not nodeIdentifier:
             return defer.fail(error.NoRootNode())
 
-        d = self.storage.getNode(nodeIdentifier)
+        d = self.storage.getNode(nodeIdentifier, pep, recipient)
         d.addCallback(lambda node: node.getConfiguration())
 
         return d
 
 
-    def setNodeConfiguration(self, nodeIdentifier, options, requestor):
+    def setNodeConfiguration(self, nodeIdentifier, options, requestor, pep, recipient):
         if not nodeIdentifier:
             return defer.fail(error.NoRootNode())
 
-        d = self.storage.getNode(nodeIdentifier)
+        d = self.storage.getNode(nodeIdentifier, pep, recipient)
         d.addCallback(_getAffiliation, requestor)
         d.addCallback(self._doSetNodeConfiguration, options)
         return d
@@ -497,13 +536,13 @@
         return self.storage.getAffiliations(entity)
 
 
-    def getItems(self, nodeIdentifier, requestor, maxItems=None,
+    def getItems(self, nodeIdentifier, recipient, maxItems=None,
                        itemIdentifiers=None, ext_data=None):
         if ext_data is None:
             ext_data = {}
-        d = self.storage.getNode(nodeIdentifier)
-        d.addCallback(_getAffiliation, requestor)
-        d.addCallback(self._doGetItems, requestor, maxItems, itemIdentifiers,
+        d = self.storage.getNode(nodeIdentifier, ext_data.get('pep', False), recipient)
+        d.addCallback(_getAffiliation, recipient)
+        d.addCallback(self._doGetItems, recipient, maxItems, itemIdentifiers,
                       ext_data)
         return d
 
@@ -545,7 +584,7 @@
                 elif access_model == const.VAL_AMODEL_ROSTER:
                     form = data_form.Form('submit', formNamespace=const.NS_ITEM_CONFIG)
                     access = data_form.Field(None, const.OPT_ACCESS_MODEL, value=const.VAL_AMODEL_ROSTER)
-                    allowed = data_form.Field(None, const.OPT_ROSTER_GROUPS_ALLOWED, values=access_list)
+                    allowed = data_form.Field(None, const.OPT_ROSTER_GROUPS_ALLOWED, values=access_list[const.OPT_ROSTER_GROUPS_ALLOWED])
                     form.addField(access)
                     form.addField(allowed)
                     item.addChild(form.toElement())
@@ -590,15 +629,15 @@
         if affiliation == 'outcast':
             raise error.Forbidden()
 
-        access_model = node.getConfiguration()["pubsub#access_model"]
+        access_model = node.getAccessModel()
         d = node.getNodeOwner()
         d.addCallback(self.privilege.getRoster)
         d.addErrback(self._rosterEb)
 
-        if access_model == 'open' or affiliation == 'owner':
+        if access_model == const.VAL_AMODEL_OPEN or affiliation == 'owner':
             d.addCallback(lambda roster: (True, roster))
             d.addCallback(access_checked)
-        elif access_model == 'roster':
+        elif access_model == const.VAL_AMODEL_ROSTER:
             d.addCallback(self._getNodeGroups,node.nodeIdentifier)
             d.addCallback(self.checkGroup, requestor)
             d.addCallback(access_checked)
@@ -650,36 +689,39 @@
 
         return defer.DeferredList(d_list).addCallback(render)
 
-    def retractItem(self, nodeIdentifier, itemIdentifiers, requestor):
-        d = self.storage.getNode(nodeIdentifier)
+    def retractItem(self, nodeIdentifier, itemIdentifiers, requestor, notify, pep, recipient):
+        d = self.storage.getNode(nodeIdentifier, pep, recipient)
         d.addCallback(_getAffiliation, requestor)
-        if const.FLAG_RETRACT_ALLOW_PUBLISHER:
-            d.addCallback(self._doRetractAllowPublisher, itemIdentifiers, requestor)
-        else:
-            d.addCallback(self._doRetract, itemIdentifiers)
+        # FIXME: to be checked
+        # if const.FLAG_RETRACT_ALLOW_PUBLISHER:
+        #     d.addCallback(self._doRetractAllowPublisher, itemIdentifiers, requestor)
+        # else:
+        #     d.addCallback(self._doRetract, itemIdentifiers)
+        d.addCallback(self._doRetract, itemIdentifiers, notify, pep, recipient)
         return d
 
-    def _doRetractAllowPublisher(self, result, itemIdentifiers, requestor):
-        """This method has been added to allow the publisher
-        of an item to retract it, even if he has no affiliation
-        to that item. For instance, this allows you to delete
-        an item you posted in a node of "open" publish model.
-        """
+    # FIXME: to be checked
+    # def _doRetractAllowPublisher(self, result, itemIdentifiers, requestor):
+    #     """This method has been added to allow the publisher
+    #     of an item to retract it, even if he has no affiliation
+    #     to that item. For instance, this allows you to delete
+    #     an item you posted in a node of "open" publish model.
+    #     """
+    #     node, affiliation = result
+    #     if affiliation in ['owner', 'publisher']:
+    #         return self._doRetract(result, itemIdentifiers)
+    #     d = node.filterItemsWithPublisher(itemIdentifiers, requestor)
+    #     def filterCb(filteredItems):
+    #         if not filteredItems:
+    #             return self._doRetract(result, itemIdentifiers)
+    #         # XXX: fake an affiliation that does NOT exist
+    #         return self._doRetract((node, 'publisher'), filteredItems)
+    #     d.addCallback(filterCb)
+    #     return d
+
+    def _doRetract(self, result, itemIdentifiers, notify, pep, recipient):
         node, affiliation = result
-        if affiliation in ['owner', 'publisher']:
-            return self._doRetract(result, itemIdentifiers)
-        d = node.filterItemsWithPublisher(itemIdentifiers, requestor)
-        def filterCb(filteredItems):
-            if not filteredItems:
-                return self._doRetract(result, itemIdentifiers)
-            # XXX: fake an affiliation that does NOT exist
-            return self._doRetract((node, 'publisher'), filteredItems)
-        d.addCallback(filterCb)
-        return d
-
-    def _doRetract(self, result, itemIdentifiers):
-        node, affiliation = result
-        persistItems = node.getConfiguration()["pubsub#persist_items"]
+        persistItems = node.getConfiguration()[const.OPT_PERSIST_ITEMS]
 
         if affiliation not in ['owner', 'publisher']:
             raise error.Forbidden()
@@ -687,19 +729,32 @@
         if not persistItems:
             raise error.NodeNotPersistent()
 
-        d = node.removeItems(itemIdentifiers)
-        d.addCallback(self._doNotifyRetraction, node.nodeIdentifier)
+        # we need to get the items before removing them, for the notifications
+
+        def removeItems(items_data):
+            """Remove the items and keep only actually removed ones in items_data"""
+            d = node.removeItems(itemIdentifiers)
+            d.addCallback(lambda removed: [item_data for item_data in items_data if item_data[0]["id"] in removed])
+            return d
+
+        d = node.getItemsById(None, True, itemIdentifiers)
+        d.addCallback(removeItems)
+
+        if notify:
+            d.addCallback(self._doNotifyRetraction, node, pep, recipient)
         return d
 
 
-    def _doNotifyRetraction(self, itemIdentifiers, nodeIdentifier):
-        self.dispatch({'itemIdentifiers': itemIdentifiers,
-                       'nodeIdentifier': nodeIdentifier },
+    def _doNotifyRetraction(self, items_data, node, pep, recipient):
+        self.dispatch({'items_data': items_data,
+                       'node': node,
+                       'pep': pep,
+                       'recipient': recipient},
                       '//event/pubsub/retract')
 
 
-    def purgeNode(self, nodeIdentifier, requestor):
-        d = self.storage.getNode(nodeIdentifier)
+    def purgeNode(self, nodeIdentifier, requestor, pep, recipient):
+        d = self.storage.getNode(nodeIdentifier, pep, recipient)
         d.addCallback(_getAffiliation, requestor)
         d.addCallback(self._doPurge)
         return d
@@ -707,7 +762,7 @@
 
     def _doPurge(self, result):
         node, affiliation = result
-        persistItems = node.getConfiguration()["pubsub#persist_items"]
+        persistItems = node.getConfiguration()[const.OPT_PERSIST_ITEMS]
 
         if affiliation != 'owner':
             raise error.Forbidden()
@@ -728,24 +783,24 @@
         self._callbackList.append(preDeleteFn)
 
 
-    def getSubscribers(self, nodeIdentifier):
+    def getSubscribers(self, nodeIdentifier, pep, recipient):
         def cb(subscriptions):
             return [subscription.subscriber for subscription in subscriptions]
 
-        d = self.storage.getNode(nodeIdentifier)
+        d = self.storage.getNode(nodeIdentifier, pep, recipient)
         d.addCallback(lambda node: node.getSubscriptions('subscribed'))
         d.addCallback(cb)
         return d
 
 
-    def deleteNode(self, nodeIdentifier, requestor, redirectURI=None):
-        d = self.storage.getNode(nodeIdentifier)
+    def deleteNode(self, nodeIdentifier, requestor, pep, recipient, redirectURI=None):
+        d = self.storage.getNode(nodeIdentifier, pep, recipient)
         d.addCallback(_getAffiliation, requestor)
-        d.addCallback(self._doPreDelete, redirectURI)
+        d.addCallback(self._doPreDelete, redirectURI, pep, recipient)
         return d
 
 
-    def _doPreDelete(self, result, redirectURI):
+    def _doPreDelete(self, result, redirectURI, pep, recipient):
         node, affiliation = result
 
         if affiliation != 'owner':
@@ -754,19 +809,19 @@
         data = {'node': node,
                 'redirectURI': redirectURI}
 
-        d = defer.DeferredList([cb(data)
+        d = defer.DeferredList([cb(data, pep, recipient)
                                 for cb in self._callbackList],
                                consumeErrors=1)
-        d.addCallback(self._doDelete, node.nodeIdentifier)
+        d.addCallback(self._doDelete, node.nodeDbId)
 
 
-    def _doDelete(self, result, nodeIdentifier):
+    def _doDelete(self, result, nodeDbId):
         dl = []
         for succeeded, r in result:
             if succeeded and r:
                 dl.extend(r)
 
-        d = self.storage.deleteNode(nodeIdentifier)
+        d = self.storage.deleteNodeByDbId(nodeDbId)
         d.addCallback(self._doNotifyDelete, dl)
 
         return d
@@ -812,6 +867,7 @@
         error.Forbidden: ('forbidden', None, None),
         error.NotAuthorized: ('not-authorized', None, None),
         error.NotInRoster: ('not-authorized', 'not-in-roster-group', None),
+        error.ItemNotFound: ('item-not-found', None, None),
         error.ItemForbidden: ('bad-request', 'item-forbidden', None),
         error.ItemRequired: ('bad-request', 'item-required', None),
         error.NoInstantNodes: ('not-acceptable',
@@ -838,11 +894,13 @@
         self.backend = backend
         self.hideNodes = False
 
-        self.backend.registerNotifier(self._notify)
+        self.backend.registerPublishNotifier(self._notifyPublish)
+        self.backend.registerRetractNotifier(self._notifyRetract)
         self.backend.registerPreDelete(self._preDelete)
 
-        if self.backend.supportsCreatorCheck():
-            self.features.append("creator-jid-check")  #SàT custom feature: Check that a node (which correspond to
+        # FIXME: to be removed, it's not useful anymore as PEP is now used
+        # if self.backend.supportsCreatorCheck():
+        #     self.features.append("creator-jid-check")  #SàT custom feature: Check that a node (which correspond to
                                                        #                    a jid in this server) is created by the right jid
 
         if self.backend.supportsAutoCreate():
@@ -866,39 +924,14 @@
         # if self.backend.supportsPublishModel():       #XXX: this feature is not really described in XEP-0060, we just can see it in examples
         #     self.features.append("publish_model")     #     but it's necessary for microblogging comments (see XEP-0277)
 
-    def _notify(self, data):
-        items = data['items']
+    def _notifyPublish(self, data):
+        items_data = data['items_data']
         node = data['node']
-
-        def _notifyAllowed(result):
-            """Check access of subscriber for each item,
-            and notify only allowed ones"""
-            notifications, (owner_jid,roster) = result
-
-            #we filter items not allowed for the subscribers
-            notifications_filtered = []
-
-            for subscriber, subscriptions, _items in notifications:
-                allowed_items = [] #we keep only item which subscriber can access
+        pep = data['pep']
+        recipient = data['recipient']
 
-                for access_model, item_config, item in _items:
-                    if access_model == 'open':
-                        allowed_items.append(item)
-                    elif access_model == 'roster':
-                        _subscriber = subscriber.userhostJID()
-                        if not _subscriber in roster:
-                            continue
-                        #the subscriber is known, is he in the right group ?
-                        authorized_groups = item_config[const.OPT_ROSTER_GROUPS_ALLOWED]
-                        if roster[_subscriber].groups.intersection(authorized_groups):
-                            allowed_items.append(item)
-
-                    else: #unknown access_model
-                        raise NotImplementedError
-
-                if allowed_items:
-                    notifications_filtered.append((subscriber, subscriptions, allowed_items))
-
+        def afterPrepare(result):
+            owner_jid, notifications_filtered = result
             #we notify the owner
             #FIXME: check if this comply with XEP-0060 (option needed ?)
             #TODO: item's access model have to be sent back to owner
@@ -910,7 +943,7 @@
                 """
                 #TODO: a test should check that only the owner get the item configuration back
 
-                access_model, item_config, item = item_data
+                item, access_model, item_config = item_data
                 new_item = deepcopy(item)
                 if item_config:
                     new_item.addChild(item_config.toElement())
@@ -920,37 +953,136 @@
                                            set([Subscription(node.nodeIdentifier,
                                                             owner_jid,
                                                             'subscribed')]),
-                                           [getFullItem(item_data) for item_data in items]))
+                                           [getFullItem(item_data) for item_data in items_data]))
+
+            if pep:
+                return self.backend.privilege.notifyPublish(
+                    recipient,
+                    node.nodeIdentifier,
+                    notifications_filtered)
+
+            else:
+                return self.pubsubService.notifyPublish(
+                    self.serviceJID,
+                    node.nodeIdentifier,
+                    notifications_filtered)
+
+        d = self._prepareNotify(items_data, node, data.get('subscription'))
+        d.addCallback(afterPrepare)
+        return d
+
+    def _notifyRetract(self, data):
+        items_data = data['items_data']
+        node = data['node']
+        pep = data['pep']
+        recipient = data['recipient']
 
-            return self.pubsubService.notifyPublish(
-                                                self.serviceJID,
-                                                node.nodeIdentifier,
-                                                notifications_filtered)
+        def afterPrepare(result):
+            owner_jid, notifications_filtered = result
+            #we add the owner
+
+            notifications_filtered.append((owner_jid,
+                                           set([Subscription(node.nodeIdentifier,
+                                                            owner_jid,
+                                                            'subscribed')]),
+                                           [item for item, _, _ in items_data]))
+
+            if pep:
+                return self.backend.privilege.notifyRetract(
+                    recipient,
+                    node.nodeIdentifier,
+                    notifications_filtered)
+
+            else:
+                return self.pubsubService.notifyRetract(
+                    self.serviceJID,
+                    node.nodeIdentifier,
+                    notifications_filtered)
+
+        d = self._prepareNotify(items_data, node, data.get('subscription'))
+        d.addCallback(afterPrepare)
+        return d
 
 
-        if 'subscription' not in data:
-            d1 = self.backend.getNotifications(node.nodeIdentifier, items)
+    def _prepareNotify(self, items_data, node, subscription=None):
+        """Do a bunch of permissions check and filter notifications
+
+        The owner is not added to these notifications,
+        it must be called by the calling method
+        @param items_data(tuple): must contain:
+            - item (domish.Element)
+            - access_model (unicode)
+            - access_list (dict as returned getItemsById, or item_config)
+        @param node(LeafNode): node hosting the items
+        @param subscription(pubsub.Subscription, None): TODO
+
+        @return (tuple): will contain:
+            - notifications_filtered
+            - node_owner_jid
+            - items_data
+        """
+
+        def filterNotifications(result):
+            """Check access of subscriber for each item, and keep only allowed ones"""
+            notifications, (owner_jid,roster) = result
+
+            #we filter items not allowed for the subscribers
+            notifications_filtered = []
+
+            for subscriber, subscriptions, _items_data in notifications:
+                if subscriber == owner_jid:
+                    # as notification is always sent to owner,
+                    # we ignore owner if he is here
+                    continue
+                allowed_items = [] #we keep only item which subscriber can access
+
+                for item, access_model, access_list in _items_data:
+                    if access_model == const.VAL_AMODEL_OPEN:
+                        allowed_items.append(item)
+                    elif access_model == const.VAL_AMODEL_ROSTER:
+                        _subscriber = subscriber.userhostJID()
+                        if not _subscriber in roster:
+                            continue
+                        #the subscriber is known, is he in the right group ?
+                        authorized_groups = access_list[const.OPT_ROSTER_GROUPS_ALLOWED]
+                        if roster[_subscriber].groups.intersection(authorized_groups):
+                            allowed_items.append(item)
+
+                    else: #unknown access_model
+                        raise NotImplementedError
+
+                if allowed_items:
+                    notifications_filtered.append((subscriber, subscriptions, allowed_items))
+            return (owner_jid, notifications_filtered)
+
+
+        if subscription is None:
+            d1 = self.backend.getNotifications(node.nodeDbId, items_data)
         else:
-            subscription = data['subscription']
             d1 = defer.succeed([(subscription.subscriber, [subscription],
-                                items)])
+                                items_data)])
 
         def _got_owner(owner_jid):
             #return a tuple with owner_jid and roster
+            def rosterEb(failure):
+                log.msg("Error while getting roster: {}".format(failure.value))
+                return (owner_jid, {})
+
             d = self.backend.privilege.getRoster(owner_jid)
-            d.addErrback(self._rosterEb)
+            d.addErrback(rosterEb)
             d.addCallback(lambda roster: (owner_jid,roster))
+            return d
 
         d2 = node.getNodeOwner()
         d2.addCallback(_got_owner)
+        d = defer.gatherResults([d1, d2])
+        d.addCallback(filterNotifications)
+        return d
 
-        d = defer.gatherResults([d1, d2])
-        d.addCallback(_notifyAllowed)
-
-    def _preDelete(self, data):
+    def _preDelete(self, data, pep, recipient):
         nodeIdentifier = data['node'].nodeIdentifier
         redirectURI = data.get('redirectURI', None)
-        d = self.backend.getSubscribers(nodeIdentifier)
+        d = self.backend.getSubscribers(nodeIdentifier, pep, recipient)
         d.addCallback(lambda subscribers: self.pubsubService.notifyDelete(
                                                 self.serviceJID,
                                                 nodeIdentifier,
@@ -972,7 +1104,8 @@
 
         raise exc
 
-    def getInfo(self, requestor, service, nodeIdentifier):
+    def getInfo(self, requestor, service, nodeIdentifier, pep=None, recipient=None):
+        return [] # FIXME: disabled for now, need to manage PEP
         if not requestor.resource:
             # this avoid error when getting a disco request from server during namespace delegation
             return []
@@ -1000,10 +1133,11 @@
         return d
 
 
-    def getNodes(self, requestor, service, nodeIdentifier):
+    def getNodes(self, requestor, service, nodeIdentifier, pep=None):
+        return defer.succeed([]) # FIXME: disabled for now, need to manage PEP
         if service.resource:
             return defer.succeed([])
-        d = self.backend.getNodes()
+        d = self.backend.getNodes(pep)
         return d.addErrback(self._mapErrors)
 
 
@@ -1014,20 +1148,33 @@
         if failure.type == error.NodeNotFound and self.backend.supportsAutoCreate():
             print "Auto-creating node %s" % (request.nodeIdentifier,)
             d = self.backend.createNode(request.nodeIdentifier,
-                                        request.sender)
+                                        request.sender,
+                                        pep=self._isPep(request),
+                                        recipient=request.recipient)
             d.addCallback(lambda ignore,
                                  request: self.backend.publish(request.nodeIdentifier,
                                                                request.items,
-                                                               request.sender),
+                                                               request.sender,
+                                                               self._isPep(request),
+                                                               request.recipient,
+                                                              ),
                           request)
             return d
 
         return failure
 
+    def _isPep(self, request):
+        try:
+            return request.delegated
+        except AttributeError:
+            return False
+
     def publish(self, request):
         d = self.backend.publish(request.nodeIdentifier,
                                  request.items,
-                                 request.sender)
+                                 request.sender,
+                                 self._isPep(request),
+                                 request.recipient)
         d.addErrback(self._publish_errb, request)
         return d.addErrback(self._mapErrors)
 
@@ -1035,56 +1182,74 @@
     def subscribe(self, request):
         d = self.backend.subscribe(request.nodeIdentifier,
                                    request.subscriber,
-                                   request.sender)
+                                   request.sender,
+                                   self._isPep(request),
+                                   request.recipient)
         return d.addErrback(self._mapErrors)
 
 
     def unsubscribe(self, request):
         d = self.backend.unsubscribe(request.nodeIdentifier,
                                      request.subscriber,
-                                     request.sender)
+                                     request.sender,
+                                     self._isPep(request),
+                                     request.recipient)
         return d.addErrback(self._mapErrors)
 
 
     def subscriptions(self, request):
-        d = self.backend.getSubscriptions(request.sender)
+        d = self.backend.getSubscriptions(self._isPep(request),
+                                          request.sender)
         return d.addErrback(self._mapErrors)
 
 
     def affiliations(self, request):
-        d = self.backend.getAffiliations(request.sender)
+        d = self.backend.getAffiliations(self._isPep(request),
+                                         request.sender)
         return d.addErrback(self._mapErrors)
 
 
     def create(self, request):
         d = self.backend.createNode(request.nodeIdentifier,
-                                    request.sender, request.options)
+                                    request.sender, request.options,
+                                    self._isPep(request),
+                                    request.recipient)
         return d.addErrback(self._mapErrors)
 
 
     def default(self, request):
-        d = self.backend.getDefaultConfiguration(request.nodeType)
+        d = self.backend.getDefaultConfiguration(request.nodeType,
+                                                 self._isPep(request),
+                                                 request.sender)
         return d.addErrback(self._mapErrors)
 
 
     def configureGet(self, request):
-        d = self.backend.getNodeConfiguration(request.nodeIdentifier)
+        d = self.backend.getNodeConfiguration(request.nodeIdentifier,
+                                              self._isPep(request),
+                                              request.recipient)
         return d.addErrback(self._mapErrors)
 
 
     def configureSet(self, request):
         d = self.backend.setNodeConfiguration(request.nodeIdentifier,
                                               request.options,
-                                              request.sender)
+                                              request.sender,
+                                              self._isPep(request),
+                                              request.recipient)
         return d.addErrback(self._mapErrors)
 
 
     def items(self, request):
         ext_data = {}
-        if const.FLAG_ENABLE_RSM:
+        if const.FLAG_ENABLE_RSM and request.rsm is not None:
             ext_data['rsm'] = request.rsm
+        try:
+            ext_data['pep'] = request.delegated
+        except AttributeError:
+            pass
         d = self.backend.getItems(request.nodeIdentifier,
-                                  request.sender,
+                                  request.recipient,
                                   request.maxItems,
                                   request.itemIdentifiers,
                                   ext_data)
@@ -1093,19 +1258,26 @@
     def retract(self, request):
         d = self.backend.retractItem(request.nodeIdentifier,
                                      request.itemIdentifiers,
-                                     request.sender)
+                                     request.sender,
+                                     request.notify,
+                                     self._isPep(request),
+                                     request.recipient)
         return d.addErrback(self._mapErrors)
 
 
     def purge(self, request):
         d = self.backend.purgeNode(request.nodeIdentifier,
-                                   request.sender)
+                                   request.sender,
+                                   self._isPep(request),
+                                   request.recipient)
         return d.addErrback(self._mapErrors)
 
 
     def delete(self, request):
         d = self.backend.deleteNode(request.nodeIdentifier,
-                                    request.sender)
+                                    request.sender,
+                                    self._isPep(request),
+                                    request.recipient)
         return d.addErrback(self._mapErrors)
 
 components.registerAdapter(PubSubResourceFromBackend,
--- a/sat_pubsub/const.py	Sun Aug 16 01:15:13 2015 +0200
+++ b/sat_pubsub/const.py	Sun Aug 16 01:32:42 2015 +0200
@@ -57,6 +57,9 @@
 NS_ITEM_CONFIG = "http://jabber.org/protocol/pubsub#item-config"
 OPT_ACCESS_MODEL = 'pubsub#access_model'
 OPT_ROSTER_GROUPS_ALLOWED = 'pubsub#roster_groups_allowed'
+OPT_PERSIST_ITEMS = "pubsub#persist_items"
+OPT_DELIVER_PAYLOADS = "pubsub#deliver_payloads"
+OPT_SEND_LAST_PUBLISHED_ITEM = "pubsub#send_last_published_item"
 OPT_PUBLISH_MODEL = 'pubsub#publish_model'
 VAL_AMODEL_OPEN = 'open'
 VAL_AMODEL_ROSTER = 'roster'
@@ -67,7 +70,7 @@
 VAL_PMODEL_OPEN = 'open'
 VAL_PMODEL_DEFAULT = VAL_PMODEL_PUBLISHERS
 
-FLAG_RETRACT_ALLOW_PUBLISHER = True  # XXX: see the method BackendService._doRetractAllowPublisher
+# FLAG_RETRACT_ALLOW_PUBLISHER = True  # XXX: see the method BackendService._doRetractAllowPublisher
 FLAG_ENABLE_RSM = True
 VAL_RSM_MAX_DEFAULT = 10
 FLAG_ENABLE_MAM = True
--- a/sat_pubsub/error.py	Sun Aug 16 01:15:13 2015 +0200
+++ b/sat_pubsub/error.py	Sun Aug 16 01:32:42 2015 +0200
@@ -97,12 +97,17 @@
 
 class NotAuthorized(Error):
     pass
- 
+
 
 
 class NotInRoster(Error):
     pass
- 
+
+
+
+class ItemNotFound(Error):
+    pass
+
 
 
 class ItemForbidden(Error):
--- a/sat_pubsub/pgsql_storage.py	Sun Aug 16 01:15:13 2015 +0200
+++ b/sat_pubsub/pgsql_storage.py	Sun Aug 16 01:32:42 2015 +0200
@@ -72,21 +72,44 @@
 # parseXml manage str, but we get unicode
 parseXml = lambda unicode_data: generic.parseXml(unicode_data.encode('utf-8'))
 
+
+def withPEP(query, values, pep, recipient, pep_table=None):
+    """Helper method to facilitate PEP management
+
+    @param query: SQL query basis
+    @param values: current values to replace in query
+    @param pep: True if we are in PEP mode
+    @param recipient: jid of the recipient
+    @param pep_table: added before pep if table need to be specified
+    @return: query + PEP AND check,
+        recipient's bare jid is added to value if needed
+    """
+    pep_col_name = "{}pep".format(
+                   '' if pep_table is None
+                   else ".{}".format(pep_table))
+    if pep:
+        pep_check="AND {}=%s".format(pep_col_name)
+        values=list(values) + [recipient.userhost()]
+    else:
+        pep_check="AND {} IS NULL".format(pep_col_name)
+    return "{} {}".format(query, pep_check), values
+
+
 class Storage:
 
     implements(iidavoll.IStorage)
 
     defaultConfig = {
             'leaf': {
-                "pubsub#persist_items": True,
-                "pubsub#deliver_payloads": True,
-                "pubsub#send_last_published_item": 'on_sub',
+                const.OPT_PERSIST_ITEMS: True,
+                const.OPT_DELIVER_PAYLOADS: True,
+                const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub',
                 const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT,
                 const.OPT_PUBLISH_MODEL: const.VAL_PMODEL_DEFAULT,
             },
             'collection': {
-                "pubsub#deliver_payloads": True,
-                "pubsub#send_last_published_item": 'on_sub',
+                const.OPT_DELIVER_PAYLOADS: True,
+                const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub',
                 const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT,
                 const.OPT_PUBLISH_MODEL: const.VAL_PMODEL_DEFAULT,
             }
@@ -95,83 +118,119 @@
     def __init__(self, dbpool):
         self.dbpool = dbpool
 
-    def getNode(self, nodeIdentifier):
-        return self.dbpool.runInteraction(self._getNode, nodeIdentifier)
-
-    def _getNode(self, cursor, nodeIdentifier):
+    def _buildNode(self, row):
+        """Build a note class from database result row"""
         configuration = {}
-        cursor.execute("""SELECT node_type,
-                                 persist_items,
-                                 deliver_payloads,
-                                 send_last_published_item,
-                                 access_model,
-                                 publish_model
-                          FROM nodes
-                          WHERE node=%s""",
-                       (nodeIdentifier,))
-        row = cursor.fetchone()
 
         if not row:
             raise error.NodeNotFound()
 
-        if row[0] == 'leaf':
+        if row[2] == 'leaf':
             configuration = {
-                    'pubsub#persist_items': row[1],
-                    'pubsub#deliver_payloads': row[2],
-                    'pubsub#send_last_published_item': row[3],
-                    const.OPT_ACCESS_MODEL:row[4],
-                    const.OPT_PUBLISH_MODEL:row[5],
+                    'pubsub#persist_items': row[3],
+                    'pubsub#deliver_payloads': row[4],
+                    'pubsub#send_last_published_item': row[5],
+                    const.OPT_ACCESS_MODEL:row[6],
+                    const.OPT_PUBLISH_MODEL:row[7],
                     }
-            node = LeafNode(nodeIdentifier, configuration)
+            node = LeafNode(row[0], row[1], configuration)
+            node.dbpool = self.dbpool
+            return node
+        elif row[2] == 'collection':
+            configuration = {
+                    'pubsub#deliver_payloads': row[4],
+                    'pubsub#send_last_published_item': row[5],
+                    const.OPT_ACCESS_MODEL: row[6],
+                    const.OPT_PUBLISH_MODEL:row[7],
+                    }
+            node = CollectionNode(row[0], row[1], configuration)
             node.dbpool = self.dbpool
             return node
-        elif row[0] == 'collection':
-            configuration = {
-                    'pubsub#deliver_payloads': row[2],
-                    'pubsub#send_last_published_item': row[3],
-                    const.OPT_ACCESS_MODEL: row[4],
-                    const.OPT_PUBLISH_MODEL:row[5],
-                    }
-            node = CollectionNode(nodeIdentifier, configuration)
-            node.dbpool = self.dbpool
-            return node
+        else:
+            raise ValueError("Unknown node type !")
+
+    def getNodeById(self, nodeDbId):
+        """Get node using database ID insted of pubsub identifier
+
+        @param nodeDbId(unicode): database ID
+        """
+        return self.dbpool.runInteraction(self._getNodeById, nodeDbId)
 
 
+    def _getNodeById(self, cursor, nodeDbId):
+        cursor.execute("""SELECT node_id,
+                                 node,
+                                 node_type,
+                                 persist_items,
+                                 deliver_payloads,
+                                 send_last_published_item,
+                                 access_model,
+                                 publish_model,
+                                 pep
+                            FROM nodes
+                            WHERE node_id=%s""",
+                       (nodeDbId,))
+        row = cursor.fetchone()
+        return self._buildNode(row)
 
-    def getNodeIds(self):
-        d = self.dbpool.runQuery("""SELECT node from nodes""")
+    def getNode(self, nodeIdentifier, pep, recipient=None):
+        return self.dbpool.runInteraction(self._getNode, nodeIdentifier, pep, recipient)
+
+
+    def _getNode(self, cursor, nodeIdentifier, pep, recipient):
+        cursor.execute(*withPEP("""SELECT node_id,
+                                          node,
+                                          node_type,
+                                          persist_items,
+                                          deliver_payloads,
+                                          send_last_published_item,
+                                          access_model,
+                                          publish_model,
+                                          pep
+                                   FROM nodes
+                                   WHERE node=%s""",
+                              (nodeIdentifier,), pep, recipient))
+        row = cursor.fetchone()
+        return self._buildNode(row)
+
+    def getNodeIds(self, pep):
+        d = self.dbpool.runQuery("""SELECT node from nodes WHERE pep is {}NULL"""
+                                    .format("NOT " if pep else ""))
         d.addCallback(lambda results: [r[0] for r in results])
         return d
 
 
-    def createNode(self, nodeIdentifier, owner, config):
+    def createNode(self, nodeIdentifier, owner, config, pep, recipient=None):
         return self.dbpool.runInteraction(self._createNode, nodeIdentifier,
-                                           owner, config)
+                                           owner, config, pep, recipient)
 
 
-    def _createNode(self, cursor, nodeIdentifier, owner, config):
+    def _createNode(self, cursor, nodeIdentifier, owner, config, pep, recipient):
         if config['pubsub#node_type'] != 'leaf':
             raise error.NoCollections()
 
         owner = owner.userhost()
+
         try:
             cursor.execute("""INSERT INTO nodes
                               (node, node_type, persist_items,
-                               deliver_payloads, send_last_published_item, access_model, publish_model)
+                               deliver_payloads, send_last_published_item, access_model, publish_model, pep)
                               VALUES
-                              (%s, 'leaf', %s, %s, %s, %s, %s)""",
+                              (%s, 'leaf', %s, %s, %s, %s, %s, %s)""",
                            (nodeIdentifier,
                             config['pubsub#persist_items'],
                             config['pubsub#deliver_payloads'],
                             config['pubsub#send_last_published_item'],
                             config[const.OPT_ACCESS_MODEL],
                             config[const.OPT_PUBLISH_MODEL],
+                            recipient.userhost() if pep else None
                             )
                            )
         except cursor._pool.dbapi.IntegrityError:
             raise error.NodeExists()
 
-        cursor.execute("""SELECT node_id FROM nodes WHERE node=%s""", (nodeIdentifier,));
+        cursor.execute(*withPEP("""SELECT node_id FROM nodes WHERE node=%s""",
+                                (nodeIdentifier,), pep, recipient));
         node_id = cursor.fetchone()[0]
 
         cursor.execute("""SELECT 1 as bool from entities where jid=%s""",
@@ -210,39 +269,49 @@
                 cursor.execute("""INSERT INTO node_groups_authorized (node_id, groupname)
                                   VALUES (%s,%s)""" , (node_id, group))
 
+    def deleteNodeByDbId(self, db_id):
+        """Delete a node using directly its database id"""
+        return self.dbpool.runInteraction(self._deleteNodeByDbId, db_id)
 
-    def deleteNode(self, nodeIdentifier):
-        return self.dbpool.runInteraction(self._deleteNode, nodeIdentifier)
+    def _deleteNodeByDbId(self, cursor, db_id):
+        cursor.execute("""DELETE FROM nodes WHERE node_id=%s""",
+                       (db_id,))
+
+        if cursor.rowcount != 1:
+            raise error.NodeNotFound()
+
+    def deleteNode(self, nodeIdentifier, pep, recipient=None):
+        return self.dbpool.runInteraction(self._deleteNode, nodeIdentifier, pep, recipient)
 
 
-    def _deleteNode(self, cursor, nodeIdentifier):
-        cursor.execute("""DELETE FROM nodes WHERE node=%s""",
-                       (nodeIdentifier,))
+    def _deleteNode(self, cursor, nodeIdentifier, pep, recipient):
+        cursor.execute(*withPEP("""DELETE FROM nodes WHERE node=%s""",
+                                (nodeIdentifier,), pep, recipient))
 
         if cursor.rowcount != 1:
             raise error.NodeNotFound()
 
-    def getNodeGroups(self, nodeIdentifier):
-        return self.dbpool.runInteraction(self._getNodeGroups, nodeIdentifier)
+    def getNodeGroups(self, nodeIdentifier, pep, recipient=None):
+        return self.dbpool.runInteraction(self._getNodeGroups, nodeIdentifier, pep, recipient)
 
-    def _getNodeGroups(self, cursor, nodeIdentifier):
-        cursor.execute("SELECT groupname FROM node_groups_authorized NATURAL JOIN nodes WHERE node=%s",
-                       (nodeIdentifier,))
+    def _getNodeGroups(self, cursor, nodeIdentifier, pep, recipient):
+        cursor.execute(*withPEP("SELECT groupname FROM node_groups_authorized NATURAL JOIN nodes WHERE node=%s",
+                                (nodeIdentifier,), pep, recipient))
         rows = cursor.fetchall()
 
         return [row[0] for row in rows]
 
-    def getAffiliations(self, entity):
-        d = self.dbpool.runQuery("""SELECT node, affiliation FROM entities
+    def getAffiliations(self, entity, pep, recipient=None):
+        d = self.dbpool.runQuery(*withPEP("""SELECT node, affiliation FROM entities
                                         NATURAL JOIN affiliations
                                         NATURAL JOIN nodes
                                         WHERE jid=%s""",
-                                     (entity.userhost(),))
+                                     (entity.userhost(),), pep, recipient, 'nodes'))
         d.addCallback(lambda results: [tuple(r) for r in results])
         return d
 
 
-    def getSubscriptions(self, entity):
+    def getSubscriptions(self, entity, pep, recipient=None):
         def toSubscriptions(rows):
             subscriptions = []
             for row in rows:
@@ -256,8 +325,8 @@
                                      FROM entities
                                      NATURAL JOIN subscriptions
                                      NATURAL JOIN nodes
-                                     WHERE jid=%s""",
-                                  (entity.userhost(),))
+                                     WHERE jid=%s AND nodes.pep=%s""",
+                                  (entity.userhost(), recipient.userhost() if pep else None))
         d.addCallback(toSubscriptions)
         return d
 
@@ -271,15 +340,16 @@
 
     implements(iidavoll.INode)
 
-    def __init__(self, nodeIdentifier, config):
+    def __init__(self, nodeDbId, nodeIdentifier, config):
+        self.nodeDbId = nodeDbId
         self.nodeIdentifier = nodeIdentifier
         self._config = config
         self.owner = None;
 
 
     def _checkNodeExists(self, cursor):
-        cursor.execute("""SELECT node_id FROM nodes WHERE node=%s""",
-                       (self.nodeIdentifier,))
+        cursor.execute("""SELECT 1 as exist FROM nodes WHERE node_id=%s""",
+                       (self.nodeDbId,))
         if not cursor.fetchone():
             raise error.NodeNotFound()
 
@@ -290,7 +360,7 @@
     def getNodeOwner(self):
         if self.owner:
             return defer.succeed(self.owner)
-        d = self.dbpool.runQuery("""SELECT jid FROM nodes NATURAL JOIN affiliations NATURAL JOIN entities WHERE node=%s""", (self.nodeIdentifier,))
+        d = self.dbpool.runQuery("""SELECT jid FROM nodes NATURAL JOIN affiliations NATURAL JOIN entities WHERE node_id=%s""", (self.nodeDbId,))
         d.addCallback(lambda result: jid.JID(result[0][0]))
         return d
 
@@ -315,12 +385,16 @@
         self._checkNodeExists(cursor)
         cursor.execute("""UPDATE nodes SET persist_items=%s,
                                            deliver_payloads=%s,
-                                           send_last_published_item=%s
-                          WHERE node=%s""",
-                       (config["pubsub#persist_items"],
-                        config["pubsub#deliver_payloads"],
-                        config["pubsub#send_last_published_item"],
-                        self.nodeIdentifier))
+                                           send_last_published_item=%s,
+                                           access_model=%s,
+                                           publish_model=%s
+                          WHERE node_id=%s""",
+                       (config[const.OPT_PERSIST_ITEMS],
+                        config[const.OPT_DELIVER_PAYLOADS],
+                        config[const.OPT_SEND_LAST_PUBLISHED_ITEM],
+                        config[const.OPT_ACCESS_MODEL],
+                        config[const.OPT_PUBLISH_MODEL],
+                        self.nodeDbId))
 
 
     def _setCachedConfiguration(self, void, config):
@@ -342,8 +416,8 @@
         cursor.execute("""SELECT affiliation FROM affiliations
                           NATURAL JOIN nodes
                           NATURAL JOIN entities
-                          WHERE node=%s AND jid=%s""",
-                       (self.nodeIdentifier,
+                          WHERE node_id=%s AND jid=%s""",
+                       (self.nodeDbId,
                         entity.userhost()))
 
         try:
@@ -351,19 +425,9 @@
         except TypeError:
             return None
 
+
     def getAccessModel(self):
-        return self.dbpool.runInteraction(self._getAccessModel)
-
-    def _getAccessModel(self, cursor, entity):
-        self._checkNodeExists(cursor)
-        cursor.execute("""SELECT access_model FROM nodes
-                          WHERE node=%s""",
-                       (self.nodeIdentifier,))
-
-        try:
-            return cursor.fetchone()[0]
-        except TypeError:
-            return None
+        return self._config[const.OPT_ACCESS_MODEL]
 
 
     def getSubscription(self, subscriber):
@@ -379,8 +443,8 @@
         cursor.execute("""SELECT state FROM subscriptions
                           NATURAL JOIN nodes
                           NATURAL JOIN entities
-                          WHERE node=%s AND jid=%s AND resource=%s""",
-                       (self.nodeIdentifier,
+                          WHERE node_id=%s AND jid=%s AND resource=%s""",
+                       (self.nodeDbId,
                         userhost,
                         resource))
 
@@ -398,13 +462,13 @@
     def _getSubscriptions(self, cursor, state):
         self._checkNodeExists(cursor)
 
-        query = """SELECT jid, resource, state,
+        query = """SELECT node, jid, resource, state,
                           subscription_type, subscription_depth
                    FROM subscriptions
                    NATURAL JOIN nodes
                    NATURAL JOIN entities
-                   WHERE node=%s"""
-        values = [self.nodeIdentifier]
+                   WHERE node_id=%s"""
+        values = [self.nodeDbId]
 
         if state:
             query += " AND state=%s"
@@ -415,16 +479,16 @@
 
         subscriptions = []
         for row in rows:
-            subscriber = jid.JID(u'%s/%s' % (row[0], row[1]))
+            subscriber = jid.JID(u'%s/%s' % (row[1], row[2]))
 
             options = {}
-            if row[3]:
-                options['pubsub#subscription_type'] = row[3];
             if row[4]:
-                options['pubsub#subscription_depth'] = row[4];
+                options['pubsub#subscription_type'] = row[4];
+            if row[5]:
+                options['pubsub#subscription_depth'] = row[5];
 
-            subscriptions.append(Subscription(self.nodeIdentifier, subscriber,
-                                              row[2], options))
+            subscriptions.append(Subscription(row[0], subscriber,
+                                              row[3], options))
 
         return subscriptions
 
@@ -453,17 +517,14 @@
             cursor.execute("""INSERT INTO subscriptions
                               (node_id, entity_id, resource, state,
                                subscription_type, subscription_depth)
-                              SELECT node_id, entity_id, %s, %s, %s, %s FROM
-                              (SELECT node_id FROM nodes
-                                              WHERE node=%s) as n
-                              CROSS JOIN
+                              SELECT %s, entity_id, %s, %s, %s, %s FROM
                               (SELECT entity_id FROM entities
-                                                WHERE jid=%s) as e""",
-                           (resource,
+                                                WHERE jid=%s) AS ent_id""",
+                           (self.nodeDbId,
+                            resource,
                             state,
                             subscription_type,
                             subscription_depth,
-                            self.nodeIdentifier,
                             userhost))
         except cursor._pool.dbapi.IntegrityError:
             raise error.SubscriptionExists()
@@ -481,12 +542,11 @@
         resource = subscriber.resource or ''
 
         cursor.execute("""DELETE FROM subscriptions WHERE
-                          node_id=(SELECT node_id FROM nodes
-                                                  WHERE node=%s) AND
+                          node_id=%s AND
                           entity_id=(SELECT entity_id FROM entities
                                                       WHERE jid=%s) AND
                           resource=%s""",
-                       (self.nodeIdentifier,
+                       (self.nodeDbId,
                         userhost,
                         resource))
         if cursor.rowcount != 1:
@@ -506,9 +566,9 @@
                           NATURAL JOIN subscriptions
                           NATURAL JOIN nodes
                           WHERE entities.jid=%s
-                          AND node=%s AND state='subscribed'""",
+                          AND node_id=%s AND state='subscribed'""",
                        (entity.userhost(),
-                       self.nodeIdentifier))
+                       self.nodeDbId))
 
         return cursor.fetchone() is not None
 
@@ -523,8 +583,8 @@
         cursor.execute("""SELECT jid, affiliation FROM nodes
                           NATURAL JOIN affiliations
                           NATURAL JOIN entities
-                          WHERE node=%s""",
-                       (self.nodeIdentifier,))
+                          WHERE node_id=%s""",
+                       (self.nodeDbId,))
         result = cursor.fetchall()
 
         return [(jid.internJID(r[0]), r[1]) for r in result]
@@ -541,36 +601,37 @@
         return self.dbpool.runInteraction(self._storeItems, item_data, publisher)
 
 
-    def _storeItems(self, cursor, item_data, publisher):
+    def _storeItems(self, cursor, items_data, publisher):
         self._checkNodeExists(cursor)
-        for item_datum in item_data:
-            self._storeItem(cursor, item_datum, publisher)
+        for item_data in items_data:
+            self._storeItem(cursor, item_data, publisher)
 
 
-    def _storeItem(self, cursor, item_datum, publisher):
-        access_model, item_config, item = item_datum
+    def _storeItem(self, cursor, item_data, publisher):
+        item, access_model, item_config = item_data
         data = item.toXml()
 
         cursor.execute("""UPDATE items SET date=now(), publisher=%s, data=%s
                           FROM nodes
                           WHERE nodes.node_id = items.node_id AND
-                                nodes.node = %s and items.item=%s""",
+                                nodes.node_id = %s and items.item=%s""",
                        (publisher.full(),
                         data,
-                        self.nodeIdentifier,
+                        self.nodeDbId,
                         item["id"]))
         if cursor.rowcount == 1:
             return
 
         cursor.execute("""INSERT INTO items (node_id, item, publisher, data, access_model)
-                          SELECT node_id, %s, %s, %s, %s FROM nodes
-                                                     WHERE node=%s
+                          SELECT %s, %s, %s, %s, %s FROM nodes
+                                                     WHERE node_id=%s
                                                      RETURNING item_id""",
-                       (item["id"],
+                       (self.nodeDbId,
+                        item["id"],
                         publisher.full(),
                         data,
                         access_model,
-                        self.nodeIdentifier))
+                        self.nodeDbId))
 
         if access_model == const.VAL_AMODEL_ROSTER:
             item_id = cursor.fetchone()[0];
@@ -596,10 +657,9 @@
 
         for itemIdentifier in itemIdentifiers:
             cursor.execute("""DELETE FROM items WHERE
-                              node_id=(SELECT node_id FROM nodes
-                                                      WHERE node=%s) AND
+                              node_id=%s AND
                               item=%s""",
-                           (self.nodeIdentifier,
+                           (self.nodeDbId,
                             itemIdentifier))
 
             if cursor.rowcount:
@@ -623,40 +683,44 @@
         return self.dbpool.runInteraction(self._getItems, authorized_groups, unrestricted, maxItems, ext_data)
 
     def _getItems(self, cursor, authorized_groups, unrestricted, maxItems, ext_data):
+        #  FIXME: simplify the query construction
         self._checkNodeExists(cursor)
 
         if unrestricted:
             query = ["SELECT data,items.access_model,item_id"]
             source = """FROM nodes
                        INNER JOIN items USING (node_id)
-                       WHERE node=%s"""
-            args = [self.nodeIdentifier]
+                       WHERE node_id=%s"""
+            args = [self.nodeDbId]
         else:
             query = ["SELECT data"]
             groups = " or (items.access_model='roster' and groupname in %s)" if authorized_groups else ""
             source = """FROM nodes
                        INNER JOIN items USING (node_id)
                        LEFT JOIN item_groups_authorized USING (item_id)
-                       WHERE node=%s AND
+                       WHERE node_id=%s AND
                        (items.access_model='open'""" + groups + ")"
 
-            args = [self.nodeIdentifier]
+            args = [self.nodeDbId]
             if authorized_groups:
                 args.append(authorized_groups)
 
         if 'filters' in ext_data:  # MAM filters
             for filter_ in ext_data['filters']:
                 if filter_.var == 'start':
-                    source += " AND date>='{date}'".format(date=filter_.value)
+                    source += " AND date>=%s"
+                    args.append(filter_.value)
                 if filter_.var == 'end':
-                    source += " AND date<='{date}'".format(date=filter_.value)
+                    source += " AND date<=%s"
+                    args.append(filter_.value)
                 if filter_.var == 'with':
                     jid_s = filter_.value
                     if '/' in jid_s:
-                        source += " AND publisher='{pub}'".format(pub=filter_.value)
-                    else:  # assume the publisher field in DB is always a full JID
-                        # XXX: need to escape the % with itself to avoid formatting error
-                        source += " AND publisher LIKE '{pub}/%%'".format(pub=filter_.value)
+                        source += " AND publisher=%s"
+                        args.append(filter_.value)
+                    else:
+                        source += " AND publisher LIKE %s"
+                        args.append(u"{}%".format(filter_.value))
 
         query.append(source)
         order = "DESC"
@@ -666,7 +730,9 @@
             maxItems = rsm.max
             if rsm.index is not None:
                 query.append("AND date<=(SELECT date " + source + " ORDER BY date DESC LIMIT 1 OFFSET %s)")
-                args.append(self.nodeIdentifier)
+                # FIXME: change the request so source is not used 2 times
+                # there is already a placeholder in source with node_id=%s, so we need to add self.noDbId in args
+                args.append(self.nodeDbId)
                 if authorized_groups:
                     args.append(authorized_groups)
                 args.append(rsm.index)
@@ -694,11 +760,10 @@
                 item = generic.stripNamespace(parseXml(data[0]))
                 access_model = data[1]
                 item_id = data[2]
-                if access_model == 'roster': #TODO: jid access_model
+                access_list = {}
+                if access_model == const.VAL_AMODEL_ROSTER: #TODO: jid access_model
                     cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,))
-                    access_list = [r[0] for r in cursor.fetchall()]
-                else:
-                    access_list = None
+                    access_list[const.OPT_ROSTER_GROUPS_ALLOWED] = [r[0] for r in cursor.fetchall()]
 
                 ret.append((item, access_model, access_list))
             return ret
@@ -720,18 +785,18 @@
         if unrestricted:
             query = ["""SELECT count(item_id) FROM nodes
                        INNER JOIN items USING (node_id)
-                       WHERE node=%s"""]
-            args = [self.nodeIdentifier]
+                       WHERE node_id=%s"""]
+            args = [self.nodeDbId]
         else:
             query = ["""SELECT count(item_id) FROM nodes
                        INNER  JOIN items USING (node_id)
                        LEFT JOIN item_groups_authorized USING (item_id)
-                       WHERE node=%s AND
+                       WHERE node_id=%s AND
                        (items.access_model='open' """ +
                        ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') +
                        ")"]
 
-            args = [self.nodeIdentifier]
+            args = [self.nodeDbId]
             if authorized_groups:
                 args.append(authorized_groups)
 
@@ -755,22 +820,22 @@
             query = ["""SELECT row_number FROM (
                        SELECT row_number() OVER (ORDER BY date DESC), item
                        FROM nodes INNER JOIN items USING (node_id)
-                       WHERE node=%s
+                       WHERE node_id=%s
                        ) as x
                        WHERE item=%s LIMIT 1"""]
-            args = [self.nodeIdentifier]
+            args = [self.nodeDbId]
         else:
             query = ["""SELECT row_number FROM (
                        SELECT row_number() OVER (ORDER BY date DESC), item
                        FROM nodes INNER JOIN items USING (node_id)
                        LEFT JOIN item_groups_authorized USING (item_id)
-                       WHERE node=%s AND
+                       WHERE node_id=%s AND
                        (items.access_model='open' """ +
                        ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') +
                        """)) as x
                        WHERE item=%s LIMIT 1"""]
 
-            args = [self.nodeIdentifier]
+            args = [self.nodeDbId]
             if authorized_groups:
                 args.append(authorized_groups)
 
@@ -784,7 +849,8 @@
         @param authorized_groups: we want to get items that these groups can access
         @param unrestricted: if true, don't check permissions
         @param itemIdentifiers: list of ids of the items we want to get
-        @return: list of (item, access_model, access_model) if unrestricted is True, else list of items
+        @return: list of (item, access_model, access_list) if unrestricted is True, else list of items
+            access_list is managed as a dictionnary with same key as for item_config
         """
         return self.dbpool.runInteraction(self._getItemsById, authorized_groups, unrestricted, itemIdentifiers)
 
@@ -796,41 +862,66 @@
             for itemIdentifier in itemIdentifiers:
                 cursor.execute("""SELECT data,items.access_model,item_id FROM nodes
                                   INNER JOIN items USING (node_id)
-                                  WHERE node=%s AND item=%s""",
-                               (self.nodeIdentifier,
+                                  WHERE node_id=%s AND item=%s""",
+                               (self.nodeDbId,
                                 itemIdentifier))
                 result = cursor.fetchone()
-                if result:
-                    for data in result:
-                        item = generic.stripNamespace(parseXml(data[0]))
-                        access_model = data[1]
-                        item_id = data[2]
-                        if access_model == 'roster': #TODO: jid access_model
-                            cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,))
-                            access_list = [r[0] for r in cursor.fetchall()]
-                        else:
-                            access_list = None
+                if not result:
+                    raise error.ItemNotFound()
 
-                        ret.append((item, access_model, access_list))
+                item = generic.stripNamespace(parseXml(result[0]))
+                access_model = result[1]
+                item_id = result[2]
+                access_list = {}
+                if access_model == const.VAL_AMODEL_ROSTER: #TODO: jid access_model
+                    cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,))
+                    access_list[const.OPT_ROSTER_GROUPS_ALLOWED] = [r[0] for r in cursor.fetchall()]
+
+                ret.append((item, access_model, access_list))
         else: #we check permission before returning items
             for itemIdentifier in itemIdentifiers:
-                args = [self.nodeIdentifier, itemIdentifier]
+                args = [self.nodeDbId, itemIdentifier]
                 if authorized_groups:
                     args.append(authorized_groups)
                 cursor.execute("""SELECT data FROM nodes
                            INNER  JOIN items USING (node_id)
                            LEFT JOIN item_groups_authorized USING (item_id)
-                           WHERE node=%s AND item=%s AND
+                           WHERE node_id=%s AND item=%s AND
                            (items.access_model='open' """ +
                            ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') + ")",
                            args)
 
                 result = cursor.fetchone()
                 if result:
-                    ret.append(parseXml(result[0]))
+                    ret.append(generic.stripNamespace(parseXml(result[0])))
 
         return ret
 
+
+    def getItemsPublishers(self, itemIdentifiers):
+        """Get the publishers for all given identifiers
+
+        @return (dict): map of itemIdentifiers to publisher
+        """
+        return self.dbpool.runInteraction(self._getItemsPublishers, itemIdentifiers)
+
+
+    def _getItemsPublishers(self, cursor, itemIdentifiers):
+        self._checkNodeExists(cursor)
+        ret = {}
+        for itemIdentifier in itemIdentifiers:
+            cursor.execute("""SELECT publisher FROM items
+                              WHERE item=%s""",
+                            (itemIdentifier,))
+            result = cursor.fetchone()
+            if not result:
+                # We have an internal error, that's why we use ValueError
+                # and not error.ItemNotFound()
+                raise ValueError() # itemIdentifier must exists
+            ret[itemIdentifier] = jid.JID(result[0])
+        return ret
+
+
     def purge(self):
         return self.dbpool.runInteraction(self._purge)
 
@@ -839,23 +930,23 @@
         self._checkNodeExists(cursor)
 
         cursor.execute("""DELETE FROM items WHERE
-                          node_id=(SELECT node_id FROM nodes WHERE node=%s)""",
-                       (self.nodeIdentifier,))
+                          node_id=%s""",
+                       (self.nodeDbId,))
 
-
-    def filterItemsWithPublisher(self, itemIdentifiers, requestor):
-        return self.dbpool.runInteraction(self._filterItemsWithPublisher, itemIdentifiers, requestor)
+   # FIXME: to be checked
+   #  def filterItemsWithPublisher(self, itemIdentifiers, recipient):
+   #      return self.dbpool.runInteraction(self._filterItemsWithPublisher, itemIdentifiers, recipient)
 
-    def _filterItemsWithPublisher(self, cursor, itemIdentifiers, requestor):
-        self._checkNodeExists(cursor)
-        ret = []
-        for itemIdentifier in itemIdentifiers:
-            args = ["%s/%%" % requestor.userhost(), itemIdentifier]
-            cursor.execute("""SELECT item FROM items WHERE publisher LIKE %s AND item=%s""", args)
-            result = cursor.fetchone()
-            if result:
-                ret.append(result[0])
-        return ret
+   #  def _filterItemsWithPublisher(self, cursor, itemIdentifiers, requestor):
+   #      self._checkNodeExists(cursor)
+   #      ret = []
+   #      for itemIdentifier in itemIdentifiers:
+   #          args = ["%s/%%" % requestor.userhost(), itemIdentifier]
+   #          cursor.execute("""SELECT item FROM items WHERE publisher LIKE %s AND item=%s""", args)
+   #          result = cursor.fetchone()
+   #          if result:
+   #              ret.append(result[0])
+   #      return ret
 
 class CollectionNode(Node):