diff sat_pubsub/backend.py @ 294:df1edebb0466

PEP implementation, draft (huge patch sorry): /!\ database schema has changed ! /!\ - whole PEP behaviour is not managed yet - if the stanza is delegated, PEP is assumed - fixed potential SQL injection in pgsql_storage - publish notifications manage PEP - added retract notifications (if "notify" attribute is present), with PEP handling - a publisher can't replace an item he didn't publised anymore - /!\ schema has changed, sat_pubsub_update_0_1.sql update it - sat_pubsub_update_0_1.sql also fixes bad items coming from former version of SàT
author Goffi <goffi@goffi.org>
date Sun, 16 Aug 2015 01:32:42 +0200
parents 9f612fa19eea
children 6ce33757d21e
line wrap: on
line diff
--- a/sat_pubsub/backend.py	Sun Aug 16 01:15:13 2015 +0200
+++ b/sat_pubsub/backend.py	Sun Aug 16 01:32:42 2015 +0200
@@ -70,7 +70,7 @@
 from twisted.python import components, log
 from twisted.internet import defer, reactor
 from twisted.words.protocols.jabber.error import StanzaError
-from twisted.words.protocols.jabber.jid import JID, InvalidFormat
+# from twisted.words.protocols.jabber.jid import JID, InvalidFormat
 from twisted.words.xish import utility
 
 from wokkel import disco, data_form, rsm
@@ -103,13 +103,13 @@
     implements(iidavoll.IBackendService)
 
     nodeOptions = {
-            "pubsub#persist_items":
+            const.OPT_PERSIST_ITEMS:
                 {"type": "boolean",
                  "label": "Persist items to storage"},
-            "pubsub#deliver_payloads":
+            const.OPT_DELIVER_PAYLOADS:
                 {"type": "boolean",
                  "label": "Deliver payloads with event notifications"},
-            "pubsub#send_last_published_item":
+            const.OPT_SEND_LAST_PUBLISHED_ITEM:
                 {"type": "list-single",
                  "label": "When to send the last published item",
                  "options": {
@@ -181,18 +181,20 @@
         return True
 
 
-    def getNodeType(self, nodeIdentifier):
-        d = self.storage.getNode(nodeIdentifier)
+    def getNodeType(self, nodeIdentifier, pep, recipient=None):
+        # FIXME: manage pep and recipient
+        d = self.storage.getNode(nodeIdentifier, pep, recipient)
         d.addCallback(lambda node: node.getType())
         return d
 
 
-    def getNodes(self):
-        return self.storage.getNodeIds()
+    def getNodes(self, pep):
+        return self.storage.getNodeIds(pep)
 
 
-    def getNodeMetaData(self, nodeIdentifier):
-        d = self.storage.getNode(nodeIdentifier)
+    def getNodeMetaData(self, nodeIdentifier, pep, recipient=None):
+        # FIXME: manage pep and recipient
+        d = self.storage.getNode(nodeIdentifier, pep, recipient)
         d.addCallback(lambda node: node.getMetaData())
         d.addCallback(self._makeMetaData)
         return d
@@ -214,14 +216,13 @@
         """ Check authorisation of publishing in node for requestor """
 
         def check(affiliation):
-            d = defer.succeed(node)
+            d = defer.succeed((affiliation, node))
             configuration = node.getConfiguration()
             publish_model = configuration[const.OPT_PUBLISH_MODEL]
-
-            if (publish_model == const.VAL_PMODEL_PUBLISHERS):
+            if publish_model == const.VAL_PMODEL_PUBLISHERS:
                 if affiliation not in ['owner', 'publisher']:
                     raise error.Forbidden()
-            elif (publish_model == const.VAL_PMODEL_SUBSCRIBERS):
+            elif publish_model == const.VAL_PMODEL_SUBSCRIBERS:
                 if affiliation not in ['owner', 'publisher']:
                     # we are in subscribers publish model, we must check that
                     # the requestor is a subscriber to allow him to publish
@@ -229,12 +230,12 @@
                     def checkSubscription(subscribed):
                         if not subscribed:
                             raise error.Forbidden()
-                        return node
+                        return (affiliation, node)
 
                     d.addCallback(lambda ignore: node.isSubscribed(requestor))
                     d.addCallback(checkSubscription)
             elif publish_model != const.VAL_PMODEL_OPEN:
-                raise Exception('Unexpected value') # publish_model must be publishers (default), subscribers or open.
+                raise ValueError('Unexpected value') # publish_model must be publishers (default), subscribers or open.
 
             return d
 
@@ -246,6 +247,7 @@
         """Get and remove item configuration information
         @param item:
         """
+        # FIXME: dirty ! Need to use elements()
         item_config = None
         access_model = const.VAL_AMODEL_DEFAULT
         for i in range(len(item.children)):
@@ -264,60 +266,87 @@
         return (access_model, item_config)
 
 
-    def publish(self, nodeIdentifier, items, requestor):
-        d = self.storage.getNode(nodeIdentifier)
+    def _checkOverwrite(self, node, itemIdentifiers, publisher):
+        """Check that the itemIdentifiers correspond to items published
+        by the current publisher"""
+        def doCheck(item_pub_map):
+            for item_publisher in item_pub_map.iterValues():
+                if item_publisher.userhost() != publisher.userhost():
+                    raise error.ItemForbidden()
+
+        d = node.getItemsPublishers(itemIdentifiers)
+        d.addCallback(doCheck)
+        return d
+
+
+    def publish(self, nodeIdentifier, items, requestor, pep, recipient):
+        d = self.storage.getNode(nodeIdentifier, pep, recipient)
         d.addCallback(self._checkAuth, requestor)
         #FIXME: owner and publisher are not necessarly the same. So far we use only owner to get roster.
         #FIXME: in addition, there can be several owners: that is not managed yet
-        d.addCallback(self._doPublish, items, requestor)
+        d.addCallback(self._doPublish, items, requestor, pep, recipient)
         return d
 
 
-    def _doPublish(self, node, items, requestor):
+    def _doPublish(self, result, items, requestor, pep, recipient):
+        affiliation, node = result
         if node.nodeType == 'collection':
             raise error.NoPublishing()
 
         configuration = node.getConfiguration()
-        persistItems = configuration["pubsub#persist_items"]
-        deliverPayloads = configuration["pubsub#deliver_payloads"]
+        persistItems = configuration[const.OPT_PERSIST_ITEMS]
+        deliverPayloads = configuration[const.OPT_DELIVER_PAYLOADS]
 
         if items and not persistItems and not deliverPayloads:
             raise error.ItemForbidden()
         elif not items and (persistItems or deliverPayloads):
             raise error.ItemRequired()
 
-        parsed_items = []
+        items_data = []
+        check_overwrite = False
         for item in items:
             if persistItems or deliverPayloads:
                 item.uri = None
                 item.defaultUri = None
                 if not item.getAttribute("id"):
                     item["id"] = str(uuid.uuid4())
+                else:
+                    check_overwrite = True
             access_model, item_config = self.parseItemConfig(item)
-            parsed_items.append((access_model, item_config, item))
+            items_data.append((item, access_model, item_config))
 
         if persistItems:
-            d = node.storeItems(parsed_items, requestor)
+            if check_overwrite and affiliation != 'owner':
+                # we don't want a publisher to overwrite the item
+                # of an other publisher
+                d = self._checkOverwrite(node, [item['id'] for item in items if item.getAttribute('id')], requestor)
+                d.addCallback(lambda _: node.storeItems(items_data, requestor))
+            else:
+                d = node.storeItems(items_data, requestor)
         else:
             d = defer.succeed(None)
 
-        d.addCallback(self._doNotify, node, parsed_items,
-                      deliverPayloads)
+        d.addCallback(self._doNotify, node, items_data,
+                      deliverPayloads, pep, recipient)
         return d
 
 
-    def _doNotify(self, result, node, items, deliverPayloads):
-        if items and not deliverPayloads:
-            for access_model, item_config, item in items:
+    def _doNotify(self, result, node, items_data, deliverPayloads, pep, recipient):
+        if items_data and not deliverPayloads:
+            for access_model, item_config, item in items_data:
                 item.children = []
-
-        self.dispatch({'items': items, 'node': node},
+        self.dispatch({'items_data': items_data, 'node': node, 'pep': pep, 'recipient': recipient},
                       '//event/pubsub/notify')
 
 
-    def getNotifications(self, nodeIdentifier, items):
+    def getNotifications(self, nodeDbId, items_data):
+        """Build a list of subscriber to the node
 
-        def toNotifications(subscriptions, nodeIdentifier, items):
+        subscribers will be associated with subscribed items,
+        and subscription type.
+        """
+
+        def toNotifications(subscriptions, items_data):
             subsBySubscriber = {}
             for subscription in subscriptions:
                 if subscription.options.get('pubsub#subscription_type',
@@ -326,7 +355,7 @@
                                                        set())
                     subs.add(subscription)
 
-            notifications = [(subscriber, subscriptions_, items)
+            notifications = [(subscriber, subscriptions_, items_data)
                              for subscriber, subscriptions_
                              in subsBySubscriber.iteritems()]
 
@@ -336,37 +365,46 @@
             failure.trap(error.NodeNotFound)
             return []
 
-        d1 = self.storage.getNode(nodeIdentifier)
+        d1 = self.storage.getNodeById(nodeDbId)
         d1.addCallback(lambda node: node.getSubscriptions('subscribed'))
-        d2 = self.storage.getNode('')
-        d2.addCallback(lambda node: node.getSubscriptions('subscribed'))
-        d2.addErrback(rootNotFound)
-        d = defer.gatherResults([d1, d2])
-        d.addCallback(lambda result: result[0] + result[1])
-        d.addCallback(toNotifications, nodeIdentifier, items)
-        return d
+        # FIXME: must add root node subscriptions ?
+        # d2 = self.storage.getNode('', False) # FIXME: to check
+        # d2.addCallback(lambda node: node.getSubscriptions('subscribed'))
+        # d2.addErrback(rootNotFound)
+        # d = defer.gatherResults([d1, d2])
+        # d.addCallback(lambda result: result[0] + result[1])
+        d1.addCallback(toNotifications, items_data)
+        return d1
 
-    def registerNotifier(self, observerfn, *args, **kwargs):
+    def registerPublishNotifier(self, observerfn, *args, **kwargs):
         self.addObserver('//event/pubsub/notify', observerfn, *args, **kwargs)
 
-    def subscribe(self, nodeIdentifier, subscriber, requestor):
+    def registerRetractNotifier(self, observerfn, *args, **kwargs):
+        self.addObserver('//event/pubsub/retract', observerfn, *args, **kwargs)
+
+    def subscribe(self, nodeIdentifier, subscriber, requestor, pep, recipient):
         subscriberEntity = subscriber.userhostJID()
         if subscriberEntity != requestor.userhostJID():
             return defer.fail(error.Forbidden())
 
-        d = self.storage.getNode(nodeIdentifier)
+        d = self.storage.getNode(nodeIdentifier, pep, recipient)
         d.addCallback(_getAffiliation, subscriberEntity)
         d.addCallback(self._doSubscribe, subscriber)
         return d
 
 
     def _doSubscribe(self, result, subscriber):
+        # TODO: implement other access models
         node, affiliation = result
-        #FIXME: must check node's access_model before subscribing
 
         if affiliation == 'outcast':
             raise error.Forbidden()
 
+        access_model = node.getAccessModel()
+
+        if access_model != const.VAL_AMODEL_OPEN:
+            raise NotImplementedError
+
         def trapExists(failure):
             failure.trap(error.SubscriptionExists)
             return False
@@ -380,6 +418,7 @@
         d = node.addSubscription(subscriber, 'subscribed', {})
         d.addCallbacks(lambda _: True, trapExists)
         d.addCallback(cb)
+
         return d
 
 
@@ -406,11 +445,11 @@
         return subscription
 
 
-    def unsubscribe(self, nodeIdentifier, subscriber, requestor):
+    def unsubscribe(self, nodeIdentifier, subscriber, requestor, pep, recipient):
         if subscriber.userhostJID() != requestor.userhostJID():
             return defer.fail(error.Forbidden())
 
-        d = self.storage.getNode(nodeIdentifier)
+        d = self.storage.getNode(nodeIdentifier, pep, recipient)
         d.addCallback(lambda node: node.removeSubscription(subscriber))
         return d
 
@@ -428,33 +467,33 @@
         return True
 
 
-    def createNode(self, nodeIdentifier, requestor, options = None):
+    def createNode(self, nodeIdentifier, requestor, options = None, pep=False, recipient=None):
         if not nodeIdentifier:
             nodeIdentifier = 'generic/%s' % uuid.uuid4()
 
         if not options:
             options = {}
 
-        if self.supportsCreatorCheck():
-            groupblog = nodeIdentifier.startswith(const.NS_GROUPBLOG_PREFIX)
-            try:
-                nodeIdentifierJID = JID(nodeIdentifier[len(const.NS_GROUPBLOG_PREFIX):] if groupblog else nodeIdentifier)
-            except InvalidFormat:
-                is_user_jid = False
-            else:
-                is_user_jid = bool(nodeIdentifierJID.user)
+        # if self.supportsCreatorCheck():
+        #     groupblog = nodeIdentifier.startswith(const.NS_GROUPBLOG_PREFIX)
+        #     try:
+        #         nodeIdentifierJID = JID(nodeIdentifier[len(const.NS_GROUPBLOG_PREFIX):] if groupblog else nodeIdentifier)
+        #     except InvalidFormat:
+        #         is_user_jid = False
+        #     else:
+        #         is_user_jid = bool(nodeIdentifierJID.user)
 
-            if is_user_jid and nodeIdentifierJID.userhostJID() != requestor.userhostJID():
-                #we have an user jid node, but not created by the owner of this jid
-                print "Wrong creator"
-                raise error.Forbidden()
+        #     if is_user_jid and nodeIdentifierJID.userhostJID() != requestor.userhostJID():
+        #         #we have an user jid node, but not created by the owner of this jid
+        #         print "Wrong creator"
+        #         raise error.Forbidden()
 
         nodeType = 'leaf'
         config = self.storage.getDefaultConfiguration(nodeType)
         config['pubsub#node_type'] = nodeType
         config.update(options)
 
-        d = self.storage.createNode(nodeIdentifier, requestor, config)
+        d = self.storage.createNode(nodeIdentifier, requestor, config, pep, recipient)
         d.addCallback(lambda _: nodeIdentifier)
         return d
 
@@ -464,21 +503,21 @@
         return d
 
 
-    def getNodeConfiguration(self, nodeIdentifier):
+    def getNodeConfiguration(self, nodeIdentifier, pep, recipient):
         if not nodeIdentifier:
             return defer.fail(error.NoRootNode())
 
-        d = self.storage.getNode(nodeIdentifier)
+        d = self.storage.getNode(nodeIdentifier, pep, recipient)
         d.addCallback(lambda node: node.getConfiguration())
 
         return d
 
 
-    def setNodeConfiguration(self, nodeIdentifier, options, requestor):
+    def setNodeConfiguration(self, nodeIdentifier, options, requestor, pep, recipient):
         if not nodeIdentifier:
             return defer.fail(error.NoRootNode())
 
-        d = self.storage.getNode(nodeIdentifier)
+        d = self.storage.getNode(nodeIdentifier, pep, recipient)
         d.addCallback(_getAffiliation, requestor)
         d.addCallback(self._doSetNodeConfiguration, options)
         return d
@@ -497,13 +536,13 @@
         return self.storage.getAffiliations(entity)
 
 
-    def getItems(self, nodeIdentifier, requestor, maxItems=None,
+    def getItems(self, nodeIdentifier, recipient, maxItems=None,
                        itemIdentifiers=None, ext_data=None):
         if ext_data is None:
             ext_data = {}
-        d = self.storage.getNode(nodeIdentifier)
-        d.addCallback(_getAffiliation, requestor)
-        d.addCallback(self._doGetItems, requestor, maxItems, itemIdentifiers,
+        d = self.storage.getNode(nodeIdentifier, ext_data.get('pep', False), recipient)
+        d.addCallback(_getAffiliation, recipient)
+        d.addCallback(self._doGetItems, recipient, maxItems, itemIdentifiers,
                       ext_data)
         return d
 
@@ -545,7 +584,7 @@
                 elif access_model == const.VAL_AMODEL_ROSTER:
                     form = data_form.Form('submit', formNamespace=const.NS_ITEM_CONFIG)
                     access = data_form.Field(None, const.OPT_ACCESS_MODEL, value=const.VAL_AMODEL_ROSTER)
-                    allowed = data_form.Field(None, const.OPT_ROSTER_GROUPS_ALLOWED, values=access_list)
+                    allowed = data_form.Field(None, const.OPT_ROSTER_GROUPS_ALLOWED, values=access_list[const.OPT_ROSTER_GROUPS_ALLOWED])
                     form.addField(access)
                     form.addField(allowed)
                     item.addChild(form.toElement())
@@ -590,15 +629,15 @@
         if affiliation == 'outcast':
             raise error.Forbidden()
 
-        access_model = node.getConfiguration()["pubsub#access_model"]
+        access_model = node.getAccessModel()
         d = node.getNodeOwner()
         d.addCallback(self.privilege.getRoster)
         d.addErrback(self._rosterEb)
 
-        if access_model == 'open' or affiliation == 'owner':
+        if access_model == const.VAL_AMODEL_OPEN or affiliation == 'owner':
             d.addCallback(lambda roster: (True, roster))
             d.addCallback(access_checked)
-        elif access_model == 'roster':
+        elif access_model == const.VAL_AMODEL_ROSTER:
             d.addCallback(self._getNodeGroups,node.nodeIdentifier)
             d.addCallback(self.checkGroup, requestor)
             d.addCallback(access_checked)
@@ -650,36 +689,39 @@
 
         return defer.DeferredList(d_list).addCallback(render)
 
-    def retractItem(self, nodeIdentifier, itemIdentifiers, requestor):
-        d = self.storage.getNode(nodeIdentifier)
+    def retractItem(self, nodeIdentifier, itemIdentifiers, requestor, notify, pep, recipient):
+        d = self.storage.getNode(nodeIdentifier, pep, recipient)
         d.addCallback(_getAffiliation, requestor)
-        if const.FLAG_RETRACT_ALLOW_PUBLISHER:
-            d.addCallback(self._doRetractAllowPublisher, itemIdentifiers, requestor)
-        else:
-            d.addCallback(self._doRetract, itemIdentifiers)
+        # FIXME: to be checked
+        # if const.FLAG_RETRACT_ALLOW_PUBLISHER:
+        #     d.addCallback(self._doRetractAllowPublisher, itemIdentifiers, requestor)
+        # else:
+        #     d.addCallback(self._doRetract, itemIdentifiers)
+        d.addCallback(self._doRetract, itemIdentifiers, notify, pep, recipient)
         return d
 
-    def _doRetractAllowPublisher(self, result, itemIdentifiers, requestor):
-        """This method has been added to allow the publisher
-        of an item to retract it, even if he has no affiliation
-        to that item. For instance, this allows you to delete
-        an item you posted in a node of "open" publish model.
-        """
+    # FIXME: to be checked
+    # def _doRetractAllowPublisher(self, result, itemIdentifiers, requestor):
+    #     """This method has been added to allow the publisher
+    #     of an item to retract it, even if he has no affiliation
+    #     to that item. For instance, this allows you to delete
+    #     an item you posted in a node of "open" publish model.
+    #     """
+    #     node, affiliation = result
+    #     if affiliation in ['owner', 'publisher']:
+    #         return self._doRetract(result, itemIdentifiers)
+    #     d = node.filterItemsWithPublisher(itemIdentifiers, requestor)
+    #     def filterCb(filteredItems):
+    #         if not filteredItems:
+    #             return self._doRetract(result, itemIdentifiers)
+    #         # XXX: fake an affiliation that does NOT exist
+    #         return self._doRetract((node, 'publisher'), filteredItems)
+    #     d.addCallback(filterCb)
+    #     return d
+
+    def _doRetract(self, result, itemIdentifiers, notify, pep, recipient):
         node, affiliation = result
-        if affiliation in ['owner', 'publisher']:
-            return self._doRetract(result, itemIdentifiers)
-        d = node.filterItemsWithPublisher(itemIdentifiers, requestor)
-        def filterCb(filteredItems):
-            if not filteredItems:
-                return self._doRetract(result, itemIdentifiers)
-            # XXX: fake an affiliation that does NOT exist
-            return self._doRetract((node, 'publisher'), filteredItems)
-        d.addCallback(filterCb)
-        return d
-
-    def _doRetract(self, result, itemIdentifiers):
-        node, affiliation = result
-        persistItems = node.getConfiguration()["pubsub#persist_items"]
+        persistItems = node.getConfiguration()[const.OPT_PERSIST_ITEMS]
 
         if affiliation not in ['owner', 'publisher']:
             raise error.Forbidden()
@@ -687,19 +729,32 @@
         if not persistItems:
             raise error.NodeNotPersistent()
 
-        d = node.removeItems(itemIdentifiers)
-        d.addCallback(self._doNotifyRetraction, node.nodeIdentifier)
+        # we need to get the items before removing them, for the notifications
+
+        def removeItems(items_data):
+            """Remove the items and keep only actually removed ones in items_data"""
+            d = node.removeItems(itemIdentifiers)
+            d.addCallback(lambda removed: [item_data for item_data in items_data if item_data[0]["id"] in removed])
+            return d
+
+        d = node.getItemsById(None, True, itemIdentifiers)
+        d.addCallback(removeItems)
+
+        if notify:
+            d.addCallback(self._doNotifyRetraction, node, pep, recipient)
         return d
 
 
-    def _doNotifyRetraction(self, itemIdentifiers, nodeIdentifier):
-        self.dispatch({'itemIdentifiers': itemIdentifiers,
-                       'nodeIdentifier': nodeIdentifier },
+    def _doNotifyRetraction(self, items_data, node, pep, recipient):
+        self.dispatch({'items_data': items_data,
+                       'node': node,
+                       'pep': pep,
+                       'recipient': recipient},
                       '//event/pubsub/retract')
 
 
-    def purgeNode(self, nodeIdentifier, requestor):
-        d = self.storage.getNode(nodeIdentifier)
+    def purgeNode(self, nodeIdentifier, requestor, pep, recipient):
+        d = self.storage.getNode(nodeIdentifier, pep, recipient)
         d.addCallback(_getAffiliation, requestor)
         d.addCallback(self._doPurge)
         return d
@@ -707,7 +762,7 @@
 
     def _doPurge(self, result):
         node, affiliation = result
-        persistItems = node.getConfiguration()["pubsub#persist_items"]
+        persistItems = node.getConfiguration()[const.OPT_PERSIST_ITEMS]
 
         if affiliation != 'owner':
             raise error.Forbidden()
@@ -728,24 +783,24 @@
         self._callbackList.append(preDeleteFn)
 
 
-    def getSubscribers(self, nodeIdentifier):
+    def getSubscribers(self, nodeIdentifier, pep, recipient):
         def cb(subscriptions):
             return [subscription.subscriber for subscription in subscriptions]
 
-        d = self.storage.getNode(nodeIdentifier)
+        d = self.storage.getNode(nodeIdentifier, pep, recipient)
         d.addCallback(lambda node: node.getSubscriptions('subscribed'))
         d.addCallback(cb)
         return d
 
 
-    def deleteNode(self, nodeIdentifier, requestor, redirectURI=None):
-        d = self.storage.getNode(nodeIdentifier)
+    def deleteNode(self, nodeIdentifier, requestor, pep, recipient, redirectURI=None):
+        d = self.storage.getNode(nodeIdentifier, pep, recipient)
         d.addCallback(_getAffiliation, requestor)
-        d.addCallback(self._doPreDelete, redirectURI)
+        d.addCallback(self._doPreDelete, redirectURI, pep, recipient)
         return d
 
 
-    def _doPreDelete(self, result, redirectURI):
+    def _doPreDelete(self, result, redirectURI, pep, recipient):
         node, affiliation = result
 
         if affiliation != 'owner':
@@ -754,19 +809,19 @@
         data = {'node': node,
                 'redirectURI': redirectURI}
 
-        d = defer.DeferredList([cb(data)
+        d = defer.DeferredList([cb(data, pep, recipient)
                                 for cb in self._callbackList],
                                consumeErrors=1)
-        d.addCallback(self._doDelete, node.nodeIdentifier)
+        d.addCallback(self._doDelete, node.nodeDbId)
 
 
-    def _doDelete(self, result, nodeIdentifier):
+    def _doDelete(self, result, nodeDbId):
         dl = []
         for succeeded, r in result:
             if succeeded and r:
                 dl.extend(r)
 
-        d = self.storage.deleteNode(nodeIdentifier)
+        d = self.storage.deleteNodeByDbId(nodeDbId)
         d.addCallback(self._doNotifyDelete, dl)
 
         return d
@@ -812,6 +867,7 @@
         error.Forbidden: ('forbidden', None, None),
         error.NotAuthorized: ('not-authorized', None, None),
         error.NotInRoster: ('not-authorized', 'not-in-roster-group', None),
+        error.ItemNotFound: ('item-not-found', None, None),
         error.ItemForbidden: ('bad-request', 'item-forbidden', None),
         error.ItemRequired: ('bad-request', 'item-required', None),
         error.NoInstantNodes: ('not-acceptable',
@@ -838,11 +894,13 @@
         self.backend = backend
         self.hideNodes = False
 
-        self.backend.registerNotifier(self._notify)
+        self.backend.registerPublishNotifier(self._notifyPublish)
+        self.backend.registerRetractNotifier(self._notifyRetract)
         self.backend.registerPreDelete(self._preDelete)
 
-        if self.backend.supportsCreatorCheck():
-            self.features.append("creator-jid-check")  #SàT custom feature: Check that a node (which correspond to
+        # FIXME: to be removed, it's not useful anymore as PEP is now used
+        # if self.backend.supportsCreatorCheck():
+        #     self.features.append("creator-jid-check")  #SàT custom feature: Check that a node (which correspond to
                                                        #                    a jid in this server) is created by the right jid
 
         if self.backend.supportsAutoCreate():
@@ -866,39 +924,14 @@
         # if self.backend.supportsPublishModel():       #XXX: this feature is not really described in XEP-0060, we just can see it in examples
         #     self.features.append("publish_model")     #     but it's necessary for microblogging comments (see XEP-0277)
 
-    def _notify(self, data):
-        items = data['items']
+    def _notifyPublish(self, data):
+        items_data = data['items_data']
         node = data['node']
-
-        def _notifyAllowed(result):
-            """Check access of subscriber for each item,
-            and notify only allowed ones"""
-            notifications, (owner_jid,roster) = result
-
-            #we filter items not allowed for the subscribers
-            notifications_filtered = []
-
-            for subscriber, subscriptions, _items in notifications:
-                allowed_items = [] #we keep only item which subscriber can access
+        pep = data['pep']
+        recipient = data['recipient']
 
-                for access_model, item_config, item in _items:
-                    if access_model == 'open':
-                        allowed_items.append(item)
-                    elif access_model == 'roster':
-                        _subscriber = subscriber.userhostJID()
-                        if not _subscriber in roster:
-                            continue
-                        #the subscriber is known, is he in the right group ?
-                        authorized_groups = item_config[const.OPT_ROSTER_GROUPS_ALLOWED]
-                        if roster[_subscriber].groups.intersection(authorized_groups):
-                            allowed_items.append(item)
-
-                    else: #unknown access_model
-                        raise NotImplementedError
-
-                if allowed_items:
-                    notifications_filtered.append((subscriber, subscriptions, allowed_items))
-
+        def afterPrepare(result):
+            owner_jid, notifications_filtered = result
             #we notify the owner
             #FIXME: check if this comply with XEP-0060 (option needed ?)
             #TODO: item's access model have to be sent back to owner
@@ -910,7 +943,7 @@
                 """
                 #TODO: a test should check that only the owner get the item configuration back
 
-                access_model, item_config, item = item_data
+                item, access_model, item_config = item_data
                 new_item = deepcopy(item)
                 if item_config:
                     new_item.addChild(item_config.toElement())
@@ -920,37 +953,136 @@
                                            set([Subscription(node.nodeIdentifier,
                                                             owner_jid,
                                                             'subscribed')]),
-                                           [getFullItem(item_data) for item_data in items]))
+                                           [getFullItem(item_data) for item_data in items_data]))
+
+            if pep:
+                return self.backend.privilege.notifyPublish(
+                    recipient,
+                    node.nodeIdentifier,
+                    notifications_filtered)
+
+            else:
+                return self.pubsubService.notifyPublish(
+                    self.serviceJID,
+                    node.nodeIdentifier,
+                    notifications_filtered)
+
+        d = self._prepareNotify(items_data, node, data.get('subscription'))
+        d.addCallback(afterPrepare)
+        return d
+
+    def _notifyRetract(self, data):
+        items_data = data['items_data']
+        node = data['node']
+        pep = data['pep']
+        recipient = data['recipient']
 
-            return self.pubsubService.notifyPublish(
-                                                self.serviceJID,
-                                                node.nodeIdentifier,
-                                                notifications_filtered)
+        def afterPrepare(result):
+            owner_jid, notifications_filtered = result
+            #we add the owner
+
+            notifications_filtered.append((owner_jid,
+                                           set([Subscription(node.nodeIdentifier,
+                                                            owner_jid,
+                                                            'subscribed')]),
+                                           [item for item, _, _ in items_data]))
+
+            if pep:
+                return self.backend.privilege.notifyRetract(
+                    recipient,
+                    node.nodeIdentifier,
+                    notifications_filtered)
+
+            else:
+                return self.pubsubService.notifyRetract(
+                    self.serviceJID,
+                    node.nodeIdentifier,
+                    notifications_filtered)
+
+        d = self._prepareNotify(items_data, node, data.get('subscription'))
+        d.addCallback(afterPrepare)
+        return d
 
 
-        if 'subscription' not in data:
-            d1 = self.backend.getNotifications(node.nodeIdentifier, items)
+    def _prepareNotify(self, items_data, node, subscription=None):
+        """Do a bunch of permissions check and filter notifications
+
+        The owner is not added to these notifications,
+        it must be called by the calling method
+        @param items_data(tuple): must contain:
+            - item (domish.Element)
+            - access_model (unicode)
+            - access_list (dict as returned getItemsById, or item_config)
+        @param node(LeafNode): node hosting the items
+        @param subscription(pubsub.Subscription, None): TODO
+
+        @return (tuple): will contain:
+            - notifications_filtered
+            - node_owner_jid
+            - items_data
+        """
+
+        def filterNotifications(result):
+            """Check access of subscriber for each item, and keep only allowed ones"""
+            notifications, (owner_jid,roster) = result
+
+            #we filter items not allowed for the subscribers
+            notifications_filtered = []
+
+            for subscriber, subscriptions, _items_data in notifications:
+                if subscriber == owner_jid:
+                    # as notification is always sent to owner,
+                    # we ignore owner if he is here
+                    continue
+                allowed_items = [] #we keep only item which subscriber can access
+
+                for item, access_model, access_list in _items_data:
+                    if access_model == const.VAL_AMODEL_OPEN:
+                        allowed_items.append(item)
+                    elif access_model == const.VAL_AMODEL_ROSTER:
+                        _subscriber = subscriber.userhostJID()
+                        if not _subscriber in roster:
+                            continue
+                        #the subscriber is known, is he in the right group ?
+                        authorized_groups = access_list[const.OPT_ROSTER_GROUPS_ALLOWED]
+                        if roster[_subscriber].groups.intersection(authorized_groups):
+                            allowed_items.append(item)
+
+                    else: #unknown access_model
+                        raise NotImplementedError
+
+                if allowed_items:
+                    notifications_filtered.append((subscriber, subscriptions, allowed_items))
+            return (owner_jid, notifications_filtered)
+
+
+        if subscription is None:
+            d1 = self.backend.getNotifications(node.nodeDbId, items_data)
         else:
-            subscription = data['subscription']
             d1 = defer.succeed([(subscription.subscriber, [subscription],
-                                items)])
+                                items_data)])
 
         def _got_owner(owner_jid):
             #return a tuple with owner_jid and roster
+            def rosterEb(failure):
+                log.msg("Error while getting roster: {}".format(failure.value))
+                return (owner_jid, {})
+
             d = self.backend.privilege.getRoster(owner_jid)
-            d.addErrback(self._rosterEb)
+            d.addErrback(rosterEb)
             d.addCallback(lambda roster: (owner_jid,roster))
+            return d
 
         d2 = node.getNodeOwner()
         d2.addCallback(_got_owner)
+        d = defer.gatherResults([d1, d2])
+        d.addCallback(filterNotifications)
+        return d
 
-        d = defer.gatherResults([d1, d2])
-        d.addCallback(_notifyAllowed)
-
-    def _preDelete(self, data):
+    def _preDelete(self, data, pep, recipient):
         nodeIdentifier = data['node'].nodeIdentifier
         redirectURI = data.get('redirectURI', None)
-        d = self.backend.getSubscribers(nodeIdentifier)
+        d = self.backend.getSubscribers(nodeIdentifier, pep, recipient)
         d.addCallback(lambda subscribers: self.pubsubService.notifyDelete(
                                                 self.serviceJID,
                                                 nodeIdentifier,
@@ -972,7 +1104,8 @@
 
         raise exc
 
-    def getInfo(self, requestor, service, nodeIdentifier):
+    def getInfo(self, requestor, service, nodeIdentifier, pep=None, recipient=None):
+        return [] # FIXME: disabled for now, need to manage PEP
         if not requestor.resource:
             # this avoid error when getting a disco request from server during namespace delegation
             return []
@@ -1000,10 +1133,11 @@
         return d
 
 
-    def getNodes(self, requestor, service, nodeIdentifier):
+    def getNodes(self, requestor, service, nodeIdentifier, pep=None):
+        return defer.succeed([]) # FIXME: disabled for now, need to manage PEP
         if service.resource:
             return defer.succeed([])
-        d = self.backend.getNodes()
+        d = self.backend.getNodes(pep)
         return d.addErrback(self._mapErrors)
 
 
@@ -1014,20 +1148,33 @@
         if failure.type == error.NodeNotFound and self.backend.supportsAutoCreate():
             print "Auto-creating node %s" % (request.nodeIdentifier,)
             d = self.backend.createNode(request.nodeIdentifier,
-                                        request.sender)
+                                        request.sender,
+                                        pep=self._isPep(request),
+                                        recipient=request.recipient)
             d.addCallback(lambda ignore,
                                  request: self.backend.publish(request.nodeIdentifier,
                                                                request.items,
-                                                               request.sender),
+                                                               request.sender,
+                                                               self._isPep(request),
+                                                               request.recipient,
+                                                              ),
                           request)
             return d
 
         return failure
 
+    def _isPep(self, request):
+        try:
+            return request.delegated
+        except AttributeError:
+            return False
+
     def publish(self, request):
         d = self.backend.publish(request.nodeIdentifier,
                                  request.items,
-                                 request.sender)
+                                 request.sender,
+                                 self._isPep(request),
+                                 request.recipient)
         d.addErrback(self._publish_errb, request)
         return d.addErrback(self._mapErrors)
 
@@ -1035,56 +1182,74 @@
     def subscribe(self, request):
         d = self.backend.subscribe(request.nodeIdentifier,
                                    request.subscriber,
-                                   request.sender)
+                                   request.sender,
+                                   self._isPep(request),
+                                   request.recipient)
         return d.addErrback(self._mapErrors)
 
 
     def unsubscribe(self, request):
         d = self.backend.unsubscribe(request.nodeIdentifier,
                                      request.subscriber,
-                                     request.sender)
+                                     request.sender,
+                                     self._isPep(request),
+                                     request.recipient)
         return d.addErrback(self._mapErrors)
 
 
     def subscriptions(self, request):
-        d = self.backend.getSubscriptions(request.sender)
+        d = self.backend.getSubscriptions(self._isPep(request),
+                                          request.sender)
         return d.addErrback(self._mapErrors)
 
 
     def affiliations(self, request):
-        d = self.backend.getAffiliations(request.sender)
+        d = self.backend.getAffiliations(self._isPep(request),
+                                         request.sender)
         return d.addErrback(self._mapErrors)
 
 
     def create(self, request):
         d = self.backend.createNode(request.nodeIdentifier,
-                                    request.sender, request.options)
+                                    request.sender, request.options,
+                                    self._isPep(request),
+                                    request.recipient)
         return d.addErrback(self._mapErrors)
 
 
     def default(self, request):
-        d = self.backend.getDefaultConfiguration(request.nodeType)
+        d = self.backend.getDefaultConfiguration(request.nodeType,
+                                                 self._isPep(request),
+                                                 request.sender)
         return d.addErrback(self._mapErrors)
 
 
     def configureGet(self, request):
-        d = self.backend.getNodeConfiguration(request.nodeIdentifier)
+        d = self.backend.getNodeConfiguration(request.nodeIdentifier,
+                                              self._isPep(request),
+                                              request.recipient)
         return d.addErrback(self._mapErrors)
 
 
     def configureSet(self, request):
         d = self.backend.setNodeConfiguration(request.nodeIdentifier,
                                               request.options,
-                                              request.sender)
+                                              request.sender,
+                                              self._isPep(request),
+                                              request.recipient)
         return d.addErrback(self._mapErrors)
 
 
     def items(self, request):
         ext_data = {}
-        if const.FLAG_ENABLE_RSM:
+        if const.FLAG_ENABLE_RSM and request.rsm is not None:
             ext_data['rsm'] = request.rsm
+        try:
+            ext_data['pep'] = request.delegated
+        except AttributeError:
+            pass
         d = self.backend.getItems(request.nodeIdentifier,
-                                  request.sender,
+                                  request.recipient,
                                   request.maxItems,
                                   request.itemIdentifiers,
                                   ext_data)
@@ -1093,19 +1258,26 @@
     def retract(self, request):
         d = self.backend.retractItem(request.nodeIdentifier,
                                      request.itemIdentifiers,
-                                     request.sender)
+                                     request.sender,
+                                     request.notify,
+                                     self._isPep(request),
+                                     request.recipient)
         return d.addErrback(self._mapErrors)
 
 
     def purge(self, request):
         d = self.backend.purgeNode(request.nodeIdentifier,
-                                   request.sender)
+                                   request.sender,
+                                   self._isPep(request),
+                                   request.recipient)
         return d.addErrback(self._mapErrors)
 
 
     def delete(self, request):
         d = self.backend.deleteNode(request.nodeIdentifier,
-                                    request.sender)
+                                    request.sender,
+                                    self._isPep(request),
+                                    request.recipient)
         return d.addErrback(self._mapErrors)
 
 components.registerAdapter(PubSubResourceFromBackend,