diff sat_pubsub/pgsql_storage.py @ 294:df1edebb0466

PEP implementation, draft (huge patch sorry): /!\ database schema has changed ! /!\ - whole PEP behaviour is not managed yet - if the stanza is delegated, PEP is assumed - fixed potential SQL injection in pgsql_storage - publish notifications manage PEP - added retract notifications (if "notify" attribute is present), with PEP handling - a publisher can't replace an item he didn't publised anymore - /!\ schema has changed, sat_pubsub_update_0_1.sql update it - sat_pubsub_update_0_1.sql also fixes bad items coming from former version of SàT
author Goffi <goffi@goffi.org>
date Sun, 16 Aug 2015 01:32:42 +0200
parents 002c59dbc23f
children 4115999d85e9
line wrap: on
line diff
--- a/sat_pubsub/pgsql_storage.py	Sun Aug 16 01:15:13 2015 +0200
+++ b/sat_pubsub/pgsql_storage.py	Sun Aug 16 01:32:42 2015 +0200
@@ -72,21 +72,44 @@
 # parseXml manage str, but we get unicode
 parseXml = lambda unicode_data: generic.parseXml(unicode_data.encode('utf-8'))
 
+
+def withPEP(query, values, pep, recipient, pep_table=None):
+    """Helper method to facilitate PEP management
+
+    @param query: SQL query basis
+    @param values: current values to replace in query
+    @param pep: True if we are in PEP mode
+    @param recipient: jid of the recipient
+    @param pep_table: added before pep if table need to be specified
+    @return: query + PEP AND check,
+        recipient's bare jid is added to value if needed
+    """
+    pep_col_name = "{}pep".format(
+                   '' if pep_table is None
+                   else ".{}".format(pep_table))
+    if pep:
+        pep_check="AND {}=%s".format(pep_col_name)
+        values=list(values) + [recipient.userhost()]
+    else:
+        pep_check="AND {} IS NULL".format(pep_col_name)
+    return "{} {}".format(query, pep_check), values
+
+
 class Storage:
 
     implements(iidavoll.IStorage)
 
     defaultConfig = {
             'leaf': {
-                "pubsub#persist_items": True,
-                "pubsub#deliver_payloads": True,
-                "pubsub#send_last_published_item": 'on_sub',
+                const.OPT_PERSIST_ITEMS: True,
+                const.OPT_DELIVER_PAYLOADS: True,
+                const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub',
                 const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT,
                 const.OPT_PUBLISH_MODEL: const.VAL_PMODEL_DEFAULT,
             },
             'collection': {
-                "pubsub#deliver_payloads": True,
-                "pubsub#send_last_published_item": 'on_sub',
+                const.OPT_DELIVER_PAYLOADS: True,
+                const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub',
                 const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT,
                 const.OPT_PUBLISH_MODEL: const.VAL_PMODEL_DEFAULT,
             }
@@ -95,83 +118,119 @@
     def __init__(self, dbpool):
         self.dbpool = dbpool
 
-    def getNode(self, nodeIdentifier):
-        return self.dbpool.runInteraction(self._getNode, nodeIdentifier)
-
-    def _getNode(self, cursor, nodeIdentifier):
+    def _buildNode(self, row):
+        """Build a note class from database result row"""
         configuration = {}
-        cursor.execute("""SELECT node_type,
-                                 persist_items,
-                                 deliver_payloads,
-                                 send_last_published_item,
-                                 access_model,
-                                 publish_model
-                          FROM nodes
-                          WHERE node=%s""",
-                       (nodeIdentifier,))
-        row = cursor.fetchone()
 
         if not row:
             raise error.NodeNotFound()
 
-        if row[0] == 'leaf':
+        if row[2] == 'leaf':
             configuration = {
-                    'pubsub#persist_items': row[1],
-                    'pubsub#deliver_payloads': row[2],
-                    'pubsub#send_last_published_item': row[3],
-                    const.OPT_ACCESS_MODEL:row[4],
-                    const.OPT_PUBLISH_MODEL:row[5],
+                    'pubsub#persist_items': row[3],
+                    'pubsub#deliver_payloads': row[4],
+                    'pubsub#send_last_published_item': row[5],
+                    const.OPT_ACCESS_MODEL:row[6],
+                    const.OPT_PUBLISH_MODEL:row[7],
                     }
-            node = LeafNode(nodeIdentifier, configuration)
+            node = LeafNode(row[0], row[1], configuration)
+            node.dbpool = self.dbpool
+            return node
+        elif row[2] == 'collection':
+            configuration = {
+                    'pubsub#deliver_payloads': row[4],
+                    'pubsub#send_last_published_item': row[5],
+                    const.OPT_ACCESS_MODEL: row[6],
+                    const.OPT_PUBLISH_MODEL:row[7],
+                    }
+            node = CollectionNode(row[0], row[1], configuration)
             node.dbpool = self.dbpool
             return node
-        elif row[0] == 'collection':
-            configuration = {
-                    'pubsub#deliver_payloads': row[2],
-                    'pubsub#send_last_published_item': row[3],
-                    const.OPT_ACCESS_MODEL: row[4],
-                    const.OPT_PUBLISH_MODEL:row[5],
-                    }
-            node = CollectionNode(nodeIdentifier, configuration)
-            node.dbpool = self.dbpool
-            return node
+        else:
+            raise ValueError("Unknown node type !")
+
+    def getNodeById(self, nodeDbId):
+        """Get node using database ID insted of pubsub identifier
+
+        @param nodeDbId(unicode): database ID
+        """
+        return self.dbpool.runInteraction(self._getNodeById, nodeDbId)
 
 
+    def _getNodeById(self, cursor, nodeDbId):
+        cursor.execute("""SELECT node_id,
+                                 node,
+                                 node_type,
+                                 persist_items,
+                                 deliver_payloads,
+                                 send_last_published_item,
+                                 access_model,
+                                 publish_model,
+                                 pep
+                            FROM nodes
+                            WHERE node_id=%s""",
+                       (nodeDbId,))
+        row = cursor.fetchone()
+        return self._buildNode(row)
 
-    def getNodeIds(self):
-        d = self.dbpool.runQuery("""SELECT node from nodes""")
+    def getNode(self, nodeIdentifier, pep, recipient=None):
+        return self.dbpool.runInteraction(self._getNode, nodeIdentifier, pep, recipient)
+
+
+    def _getNode(self, cursor, nodeIdentifier, pep, recipient):
+        cursor.execute(*withPEP("""SELECT node_id,
+                                          node,
+                                          node_type,
+                                          persist_items,
+                                          deliver_payloads,
+                                          send_last_published_item,
+                                          access_model,
+                                          publish_model,
+                                          pep
+                                   FROM nodes
+                                   WHERE node=%s""",
+                              (nodeIdentifier,), pep, recipient))
+        row = cursor.fetchone()
+        return self._buildNode(row)
+
+    def getNodeIds(self, pep):
+        d = self.dbpool.runQuery("""SELECT node from nodes WHERE pep is {}NULL"""
+                                    .format("NOT " if pep else ""))
         d.addCallback(lambda results: [r[0] for r in results])
         return d
 
 
-    def createNode(self, nodeIdentifier, owner, config):
+    def createNode(self, nodeIdentifier, owner, config, pep, recipient=None):
         return self.dbpool.runInteraction(self._createNode, nodeIdentifier,
-                                           owner, config)
+                                           owner, config, pep, recipient)
 
 
-    def _createNode(self, cursor, nodeIdentifier, owner, config):
+    def _createNode(self, cursor, nodeIdentifier, owner, config, pep, recipient):
         if config['pubsub#node_type'] != 'leaf':
             raise error.NoCollections()
 
         owner = owner.userhost()
+
         try:
             cursor.execute("""INSERT INTO nodes
                               (node, node_type, persist_items,
-                               deliver_payloads, send_last_published_item, access_model, publish_model)
+                               deliver_payloads, send_last_published_item, access_model, publish_model, pep)
                               VALUES
-                              (%s, 'leaf', %s, %s, %s, %s, %s)""",
+                              (%s, 'leaf', %s, %s, %s, %s, %s, %s)""",
                            (nodeIdentifier,
                             config['pubsub#persist_items'],
                             config['pubsub#deliver_payloads'],
                             config['pubsub#send_last_published_item'],
                             config[const.OPT_ACCESS_MODEL],
                             config[const.OPT_PUBLISH_MODEL],
+                            recipient.userhost() if pep else None
                             )
                            )
         except cursor._pool.dbapi.IntegrityError:
             raise error.NodeExists()
 
-        cursor.execute("""SELECT node_id FROM nodes WHERE node=%s""", (nodeIdentifier,));
+        cursor.execute(*withPEP("""SELECT node_id FROM nodes WHERE node=%s""",
+                                (nodeIdentifier,), pep, recipient));
         node_id = cursor.fetchone()[0]
 
         cursor.execute("""SELECT 1 as bool from entities where jid=%s""",
@@ -210,39 +269,49 @@
                 cursor.execute("""INSERT INTO node_groups_authorized (node_id, groupname)
                                   VALUES (%s,%s)""" , (node_id, group))
 
+    def deleteNodeByDbId(self, db_id):
+        """Delete a node using directly its database id"""
+        return self.dbpool.runInteraction(self._deleteNodeByDbId, db_id)
 
-    def deleteNode(self, nodeIdentifier):
-        return self.dbpool.runInteraction(self._deleteNode, nodeIdentifier)
+    def _deleteNodeByDbId(self, cursor, db_id):
+        cursor.execute("""DELETE FROM nodes WHERE node_id=%s""",
+                       (db_id,))
+
+        if cursor.rowcount != 1:
+            raise error.NodeNotFound()
+
+    def deleteNode(self, nodeIdentifier, pep, recipient=None):
+        return self.dbpool.runInteraction(self._deleteNode, nodeIdentifier, pep, recipient)
 
 
-    def _deleteNode(self, cursor, nodeIdentifier):
-        cursor.execute("""DELETE FROM nodes WHERE node=%s""",
-                       (nodeIdentifier,))
+    def _deleteNode(self, cursor, nodeIdentifier, pep, recipient):
+        cursor.execute(*withPEP("""DELETE FROM nodes WHERE node=%s""",
+                                (nodeIdentifier,), pep, recipient))
 
         if cursor.rowcount != 1:
             raise error.NodeNotFound()
 
-    def getNodeGroups(self, nodeIdentifier):
-        return self.dbpool.runInteraction(self._getNodeGroups, nodeIdentifier)
+    def getNodeGroups(self, nodeIdentifier, pep, recipient=None):
+        return self.dbpool.runInteraction(self._getNodeGroups, nodeIdentifier, pep, recipient)
 
-    def _getNodeGroups(self, cursor, nodeIdentifier):
-        cursor.execute("SELECT groupname FROM node_groups_authorized NATURAL JOIN nodes WHERE node=%s",
-                       (nodeIdentifier,))
+    def _getNodeGroups(self, cursor, nodeIdentifier, pep, recipient):
+        cursor.execute(*withPEP("SELECT groupname FROM node_groups_authorized NATURAL JOIN nodes WHERE node=%s",
+                                (nodeIdentifier,), pep, recipient))
         rows = cursor.fetchall()
 
         return [row[0] for row in rows]
 
-    def getAffiliations(self, entity):
-        d = self.dbpool.runQuery("""SELECT node, affiliation FROM entities
+    def getAffiliations(self, entity, pep, recipient=None):
+        d = self.dbpool.runQuery(*withPEP("""SELECT node, affiliation FROM entities
                                         NATURAL JOIN affiliations
                                         NATURAL JOIN nodes
                                         WHERE jid=%s""",
-                                     (entity.userhost(),))
+                                     (entity.userhost(),), pep, recipient, 'nodes'))
         d.addCallback(lambda results: [tuple(r) for r in results])
         return d
 
 
-    def getSubscriptions(self, entity):
+    def getSubscriptions(self, entity, pep, recipient=None):
         def toSubscriptions(rows):
             subscriptions = []
             for row in rows:
@@ -256,8 +325,8 @@
                                      FROM entities
                                      NATURAL JOIN subscriptions
                                      NATURAL JOIN nodes
-                                     WHERE jid=%s""",
-                                  (entity.userhost(),))
+                                     WHERE jid=%s AND nodes.pep=%s""",
+                                  (entity.userhost(), recipient.userhost() if pep else None))
         d.addCallback(toSubscriptions)
         return d
 
@@ -271,15 +340,16 @@
 
     implements(iidavoll.INode)
 
-    def __init__(self, nodeIdentifier, config):
+    def __init__(self, nodeDbId, nodeIdentifier, config):
+        self.nodeDbId = nodeDbId
         self.nodeIdentifier = nodeIdentifier
         self._config = config
         self.owner = None;
 
 
     def _checkNodeExists(self, cursor):
-        cursor.execute("""SELECT node_id FROM nodes WHERE node=%s""",
-                       (self.nodeIdentifier,))
+        cursor.execute("""SELECT 1 as exist FROM nodes WHERE node_id=%s""",
+                       (self.nodeDbId,))
         if not cursor.fetchone():
             raise error.NodeNotFound()
 
@@ -290,7 +360,7 @@
     def getNodeOwner(self):
         if self.owner:
             return defer.succeed(self.owner)
-        d = self.dbpool.runQuery("""SELECT jid FROM nodes NATURAL JOIN affiliations NATURAL JOIN entities WHERE node=%s""", (self.nodeIdentifier,))
+        d = self.dbpool.runQuery("""SELECT jid FROM nodes NATURAL JOIN affiliations NATURAL JOIN entities WHERE node_id=%s""", (self.nodeDbId,))
         d.addCallback(lambda result: jid.JID(result[0][0]))
         return d
 
@@ -315,12 +385,16 @@
         self._checkNodeExists(cursor)
         cursor.execute("""UPDATE nodes SET persist_items=%s,
                                            deliver_payloads=%s,
-                                           send_last_published_item=%s
-                          WHERE node=%s""",
-                       (config["pubsub#persist_items"],
-                        config["pubsub#deliver_payloads"],
-                        config["pubsub#send_last_published_item"],
-                        self.nodeIdentifier))
+                                           send_last_published_item=%s,
+                                           access_model=%s,
+                                           publish_model=%s
+                          WHERE node_id=%s""",
+                       (config[const.OPT_PERSIST_ITEMS],
+                        config[const.OPT_DELIVER_PAYLOADS],
+                        config[const.OPT_SEND_LAST_PUBLISHED_ITEM],
+                        config[const.OPT_ACCESS_MODEL],
+                        config[const.OPT_PUBLISH_MODEL],
+                        self.nodeDbId))
 
 
     def _setCachedConfiguration(self, void, config):
@@ -342,8 +416,8 @@
         cursor.execute("""SELECT affiliation FROM affiliations
                           NATURAL JOIN nodes
                           NATURAL JOIN entities
-                          WHERE node=%s AND jid=%s""",
-                       (self.nodeIdentifier,
+                          WHERE node_id=%s AND jid=%s""",
+                       (self.nodeDbId,
                         entity.userhost()))
 
         try:
@@ -351,19 +425,9 @@
         except TypeError:
             return None
 
+
     def getAccessModel(self):
-        return self.dbpool.runInteraction(self._getAccessModel)
-
-    def _getAccessModel(self, cursor, entity):
-        self._checkNodeExists(cursor)
-        cursor.execute("""SELECT access_model FROM nodes
-                          WHERE node=%s""",
-                       (self.nodeIdentifier,))
-
-        try:
-            return cursor.fetchone()[0]
-        except TypeError:
-            return None
+        return self._config[const.OPT_ACCESS_MODEL]
 
 
     def getSubscription(self, subscriber):
@@ -379,8 +443,8 @@
         cursor.execute("""SELECT state FROM subscriptions
                           NATURAL JOIN nodes
                           NATURAL JOIN entities
-                          WHERE node=%s AND jid=%s AND resource=%s""",
-                       (self.nodeIdentifier,
+                          WHERE node_id=%s AND jid=%s AND resource=%s""",
+                       (self.nodeDbId,
                         userhost,
                         resource))
 
@@ -398,13 +462,13 @@
     def _getSubscriptions(self, cursor, state):
         self._checkNodeExists(cursor)
 
-        query = """SELECT jid, resource, state,
+        query = """SELECT node, jid, resource, state,
                           subscription_type, subscription_depth
                    FROM subscriptions
                    NATURAL JOIN nodes
                    NATURAL JOIN entities
-                   WHERE node=%s"""
-        values = [self.nodeIdentifier]
+                   WHERE node_id=%s"""
+        values = [self.nodeDbId]
 
         if state:
             query += " AND state=%s"
@@ -415,16 +479,16 @@
 
         subscriptions = []
         for row in rows:
-            subscriber = jid.JID(u'%s/%s' % (row[0], row[1]))
+            subscriber = jid.JID(u'%s/%s' % (row[1], row[2]))
 
             options = {}
-            if row[3]:
-                options['pubsub#subscription_type'] = row[3];
             if row[4]:
-                options['pubsub#subscription_depth'] = row[4];
+                options['pubsub#subscription_type'] = row[4];
+            if row[5]:
+                options['pubsub#subscription_depth'] = row[5];
 
-            subscriptions.append(Subscription(self.nodeIdentifier, subscriber,
-                                              row[2], options))
+            subscriptions.append(Subscription(row[0], subscriber,
+                                              row[3], options))
 
         return subscriptions
 
@@ -453,17 +517,14 @@
             cursor.execute("""INSERT INTO subscriptions
                               (node_id, entity_id, resource, state,
                                subscription_type, subscription_depth)
-                              SELECT node_id, entity_id, %s, %s, %s, %s FROM
-                              (SELECT node_id FROM nodes
-                                              WHERE node=%s) as n
-                              CROSS JOIN
+                              SELECT %s, entity_id, %s, %s, %s, %s FROM
                               (SELECT entity_id FROM entities
-                                                WHERE jid=%s) as e""",
-                           (resource,
+                                                WHERE jid=%s) AS ent_id""",
+                           (self.nodeDbId,
+                            resource,
                             state,
                             subscription_type,
                             subscription_depth,
-                            self.nodeIdentifier,
                             userhost))
         except cursor._pool.dbapi.IntegrityError:
             raise error.SubscriptionExists()
@@ -481,12 +542,11 @@
         resource = subscriber.resource or ''
 
         cursor.execute("""DELETE FROM subscriptions WHERE
-                          node_id=(SELECT node_id FROM nodes
-                                                  WHERE node=%s) AND
+                          node_id=%s AND
                           entity_id=(SELECT entity_id FROM entities
                                                       WHERE jid=%s) AND
                           resource=%s""",
-                       (self.nodeIdentifier,
+                       (self.nodeDbId,
                         userhost,
                         resource))
         if cursor.rowcount != 1:
@@ -506,9 +566,9 @@
                           NATURAL JOIN subscriptions
                           NATURAL JOIN nodes
                           WHERE entities.jid=%s
-                          AND node=%s AND state='subscribed'""",
+                          AND node_id=%s AND state='subscribed'""",
                        (entity.userhost(),
-                       self.nodeIdentifier))
+                       self.nodeDbId))
 
         return cursor.fetchone() is not None
 
@@ -523,8 +583,8 @@
         cursor.execute("""SELECT jid, affiliation FROM nodes
                           NATURAL JOIN affiliations
                           NATURAL JOIN entities
-                          WHERE node=%s""",
-                       (self.nodeIdentifier,))
+                          WHERE node_id=%s""",
+                       (self.nodeDbId,))
         result = cursor.fetchall()
 
         return [(jid.internJID(r[0]), r[1]) for r in result]
@@ -541,36 +601,37 @@
         return self.dbpool.runInteraction(self._storeItems, item_data, publisher)
 
 
-    def _storeItems(self, cursor, item_data, publisher):
+    def _storeItems(self, cursor, items_data, publisher):
         self._checkNodeExists(cursor)
-        for item_datum in item_data:
-            self._storeItem(cursor, item_datum, publisher)
+        for item_data in items_data:
+            self._storeItem(cursor, item_data, publisher)
 
 
-    def _storeItem(self, cursor, item_datum, publisher):
-        access_model, item_config, item = item_datum
+    def _storeItem(self, cursor, item_data, publisher):
+        item, access_model, item_config = item_data
         data = item.toXml()
 
         cursor.execute("""UPDATE items SET date=now(), publisher=%s, data=%s
                           FROM nodes
                           WHERE nodes.node_id = items.node_id AND
-                                nodes.node = %s and items.item=%s""",
+                                nodes.node_id = %s and items.item=%s""",
                        (publisher.full(),
                         data,
-                        self.nodeIdentifier,
+                        self.nodeDbId,
                         item["id"]))
         if cursor.rowcount == 1:
             return
 
         cursor.execute("""INSERT INTO items (node_id, item, publisher, data, access_model)
-                          SELECT node_id, %s, %s, %s, %s FROM nodes
-                                                     WHERE node=%s
+                          SELECT %s, %s, %s, %s, %s FROM nodes
+                                                     WHERE node_id=%s
                                                      RETURNING item_id""",
-                       (item["id"],
+                       (self.nodeDbId,
+                        item["id"],
                         publisher.full(),
                         data,
                         access_model,
-                        self.nodeIdentifier))
+                        self.nodeDbId))
 
         if access_model == const.VAL_AMODEL_ROSTER:
             item_id = cursor.fetchone()[0];
@@ -596,10 +657,9 @@
 
         for itemIdentifier in itemIdentifiers:
             cursor.execute("""DELETE FROM items WHERE
-                              node_id=(SELECT node_id FROM nodes
-                                                      WHERE node=%s) AND
+                              node_id=%s AND
                               item=%s""",
-                           (self.nodeIdentifier,
+                           (self.nodeDbId,
                             itemIdentifier))
 
             if cursor.rowcount:
@@ -623,40 +683,44 @@
         return self.dbpool.runInteraction(self._getItems, authorized_groups, unrestricted, maxItems, ext_data)
 
     def _getItems(self, cursor, authorized_groups, unrestricted, maxItems, ext_data):
+        #  FIXME: simplify the query construction
         self._checkNodeExists(cursor)
 
         if unrestricted:
             query = ["SELECT data,items.access_model,item_id"]
             source = """FROM nodes
                        INNER JOIN items USING (node_id)
-                       WHERE node=%s"""
-            args = [self.nodeIdentifier]
+                       WHERE node_id=%s"""
+            args = [self.nodeDbId]
         else:
             query = ["SELECT data"]
             groups = " or (items.access_model='roster' and groupname in %s)" if authorized_groups else ""
             source = """FROM nodes
                        INNER JOIN items USING (node_id)
                        LEFT JOIN item_groups_authorized USING (item_id)
-                       WHERE node=%s AND
+                       WHERE node_id=%s AND
                        (items.access_model='open'""" + groups + ")"
 
-            args = [self.nodeIdentifier]
+            args = [self.nodeDbId]
             if authorized_groups:
                 args.append(authorized_groups)
 
         if 'filters' in ext_data:  # MAM filters
             for filter_ in ext_data['filters']:
                 if filter_.var == 'start':
-                    source += " AND date>='{date}'".format(date=filter_.value)
+                    source += " AND date>=%s"
+                    args.append(filter_.value)
                 if filter_.var == 'end':
-                    source += " AND date<='{date}'".format(date=filter_.value)
+                    source += " AND date<=%s"
+                    args.append(filter_.value)
                 if filter_.var == 'with':
                     jid_s = filter_.value
                     if '/' in jid_s:
-                        source += " AND publisher='{pub}'".format(pub=filter_.value)
-                    else:  # assume the publisher field in DB is always a full JID
-                        # XXX: need to escape the % with itself to avoid formatting error
-                        source += " AND publisher LIKE '{pub}/%%'".format(pub=filter_.value)
+                        source += " AND publisher=%s"
+                        args.append(filter_.value)
+                    else:
+                        source += " AND publisher LIKE %s"
+                        args.append(u"{}%".format(filter_.value))
 
         query.append(source)
         order = "DESC"
@@ -666,7 +730,9 @@
             maxItems = rsm.max
             if rsm.index is not None:
                 query.append("AND date<=(SELECT date " + source + " ORDER BY date DESC LIMIT 1 OFFSET %s)")
-                args.append(self.nodeIdentifier)
+                # FIXME: change the request so source is not used 2 times
+                # there is already a placeholder in source with node_id=%s, so we need to add self.noDbId in args
+                args.append(self.nodeDbId)
                 if authorized_groups:
                     args.append(authorized_groups)
                 args.append(rsm.index)
@@ -694,11 +760,10 @@
                 item = generic.stripNamespace(parseXml(data[0]))
                 access_model = data[1]
                 item_id = data[2]
-                if access_model == 'roster': #TODO: jid access_model
+                access_list = {}
+                if access_model == const.VAL_AMODEL_ROSTER: #TODO: jid access_model
                     cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,))
-                    access_list = [r[0] for r in cursor.fetchall()]
-                else:
-                    access_list = None
+                    access_list[const.OPT_ROSTER_GROUPS_ALLOWED] = [r[0] for r in cursor.fetchall()]
 
                 ret.append((item, access_model, access_list))
             return ret
@@ -720,18 +785,18 @@
         if unrestricted:
             query = ["""SELECT count(item_id) FROM nodes
                        INNER JOIN items USING (node_id)
-                       WHERE node=%s"""]
-            args = [self.nodeIdentifier]
+                       WHERE node_id=%s"""]
+            args = [self.nodeDbId]
         else:
             query = ["""SELECT count(item_id) FROM nodes
                        INNER  JOIN items USING (node_id)
                        LEFT JOIN item_groups_authorized USING (item_id)
-                       WHERE node=%s AND
+                       WHERE node_id=%s AND
                        (items.access_model='open' """ +
                        ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') +
                        ")"]
 
-            args = [self.nodeIdentifier]
+            args = [self.nodeDbId]
             if authorized_groups:
                 args.append(authorized_groups)
 
@@ -755,22 +820,22 @@
             query = ["""SELECT row_number FROM (
                        SELECT row_number() OVER (ORDER BY date DESC), item
                        FROM nodes INNER JOIN items USING (node_id)
-                       WHERE node=%s
+                       WHERE node_id=%s
                        ) as x
                        WHERE item=%s LIMIT 1"""]
-            args = [self.nodeIdentifier]
+            args = [self.nodeDbId]
         else:
             query = ["""SELECT row_number FROM (
                        SELECT row_number() OVER (ORDER BY date DESC), item
                        FROM nodes INNER JOIN items USING (node_id)
                        LEFT JOIN item_groups_authorized USING (item_id)
-                       WHERE node=%s AND
+                       WHERE node_id=%s AND
                        (items.access_model='open' """ +
                        ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') +
                        """)) as x
                        WHERE item=%s LIMIT 1"""]
 
-            args = [self.nodeIdentifier]
+            args = [self.nodeDbId]
             if authorized_groups:
                 args.append(authorized_groups)
 
@@ -784,7 +849,8 @@
         @param authorized_groups: we want to get items that these groups can access
         @param unrestricted: if true, don't check permissions
         @param itemIdentifiers: list of ids of the items we want to get
-        @return: list of (item, access_model, access_model) if unrestricted is True, else list of items
+        @return: list of (item, access_model, access_list) if unrestricted is True, else list of items
+            access_list is managed as a dictionnary with same key as for item_config
         """
         return self.dbpool.runInteraction(self._getItemsById, authorized_groups, unrestricted, itemIdentifiers)
 
@@ -796,41 +862,66 @@
             for itemIdentifier in itemIdentifiers:
                 cursor.execute("""SELECT data,items.access_model,item_id FROM nodes
                                   INNER JOIN items USING (node_id)
-                                  WHERE node=%s AND item=%s""",
-                               (self.nodeIdentifier,
+                                  WHERE node_id=%s AND item=%s""",
+                               (self.nodeDbId,
                                 itemIdentifier))
                 result = cursor.fetchone()
-                if result:
-                    for data in result:
-                        item = generic.stripNamespace(parseXml(data[0]))
-                        access_model = data[1]
-                        item_id = data[2]
-                        if access_model == 'roster': #TODO: jid access_model
-                            cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,))
-                            access_list = [r[0] for r in cursor.fetchall()]
-                        else:
-                            access_list = None
+                if not result:
+                    raise error.ItemNotFound()
 
-                        ret.append((item, access_model, access_list))
+                item = generic.stripNamespace(parseXml(result[0]))
+                access_model = result[1]
+                item_id = result[2]
+                access_list = {}
+                if access_model == const.VAL_AMODEL_ROSTER: #TODO: jid access_model
+                    cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,))
+                    access_list[const.OPT_ROSTER_GROUPS_ALLOWED] = [r[0] for r in cursor.fetchall()]
+
+                ret.append((item, access_model, access_list))
         else: #we check permission before returning items
             for itemIdentifier in itemIdentifiers:
-                args = [self.nodeIdentifier, itemIdentifier]
+                args = [self.nodeDbId, itemIdentifier]
                 if authorized_groups:
                     args.append(authorized_groups)
                 cursor.execute("""SELECT data FROM nodes
                            INNER  JOIN items USING (node_id)
                            LEFT JOIN item_groups_authorized USING (item_id)
-                           WHERE node=%s AND item=%s AND
+                           WHERE node_id=%s AND item=%s AND
                            (items.access_model='open' """ +
                            ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') + ")",
                            args)
 
                 result = cursor.fetchone()
                 if result:
-                    ret.append(parseXml(result[0]))
+                    ret.append(generic.stripNamespace(parseXml(result[0])))
 
         return ret
 
+
+    def getItemsPublishers(self, itemIdentifiers):
+        """Get the publishers for all given identifiers
+
+        @return (dict): map of itemIdentifiers to publisher
+        """
+        return self.dbpool.runInteraction(self._getItemsPublishers, itemIdentifiers)
+
+
+    def _getItemsPublishers(self, cursor, itemIdentifiers):
+        self._checkNodeExists(cursor)
+        ret = {}
+        for itemIdentifier in itemIdentifiers:
+            cursor.execute("""SELECT publisher FROM items
+                              WHERE item=%s""",
+                            (itemIdentifier,))
+            result = cursor.fetchone()
+            if not result:
+                # We have an internal error, that's why we use ValueError
+                # and not error.ItemNotFound()
+                raise ValueError() # itemIdentifier must exists
+            ret[itemIdentifier] = jid.JID(result[0])
+        return ret
+
+
     def purge(self):
         return self.dbpool.runInteraction(self._purge)
 
@@ -839,23 +930,23 @@
         self._checkNodeExists(cursor)
 
         cursor.execute("""DELETE FROM items WHERE
-                          node_id=(SELECT node_id FROM nodes WHERE node=%s)""",
-                       (self.nodeIdentifier,))
+                          node_id=%s""",
+                       (self.nodeDbId,))
 
-
-    def filterItemsWithPublisher(self, itemIdentifiers, requestor):
-        return self.dbpool.runInteraction(self._filterItemsWithPublisher, itemIdentifiers, requestor)
+   # FIXME: to be checked
+   #  def filterItemsWithPublisher(self, itemIdentifiers, recipient):
+   #      return self.dbpool.runInteraction(self._filterItemsWithPublisher, itemIdentifiers, recipient)
 
-    def _filterItemsWithPublisher(self, cursor, itemIdentifiers, requestor):
-        self._checkNodeExists(cursor)
-        ret = []
-        for itemIdentifier in itemIdentifiers:
-            args = ["%s/%%" % requestor.userhost(), itemIdentifier]
-            cursor.execute("""SELECT item FROM items WHERE publisher LIKE %s AND item=%s""", args)
-            result = cursor.fetchone()
-            if result:
-                ret.append(result[0])
-        return ret
+   #  def _filterItemsWithPublisher(self, cursor, itemIdentifiers, requestor):
+   #      self._checkNodeExists(cursor)
+   #      ret = []
+   #      for itemIdentifier in itemIdentifiers:
+   #          args = ["%s/%%" % requestor.userhost(), itemIdentifier]
+   #          cursor.execute("""SELECT item FROM items WHERE publisher LIKE %s AND item=%s""", args)
+   #          result = cursor.fetchone()
+   #          if result:
+   #              ret.append(result[0])
+   #      return ret
 
 class CollectionNode(Node):