view sat_pubsub/pgsql_storage.py @ 306:ee89e4ed47fd

Added tag 0.2.0 for changeset 642dffb9d6f1
author Goffi <goffi@goffi.org>
date Mon, 07 Dec 2015 19:20:27 +0100
parents 05c875a13a62
children 087b705493a6
line wrap: on
line source

#!/usr/bin/python
#-*- coding: utf-8 -*-

"""
Copyright (c) 2003-2011 Ralph Meijer
Copyright (c) 2012, 2013, 2014, 2015 Jérôme Poisson


This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program.  If not, see <http://www.gnu.org/licenses/>.
--

This program is based on Idavoll (http://idavoll.ik.nu/),
originaly written by Ralph Meijer (http://ralphm.net/blog/)
It is sublicensed under AGPL v3 (or any later version) as allowed by the original
license.

--

Here is a copy of the original license:

Copyright (c) 2003-2011 Ralph Meijer

Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:

The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

"""

import copy, logging

from zope.interface import implements

from twisted.internet import defer
from twisted.words.protocols.jabber import jid

from wokkel import generic
from wokkel.pubsub import Subscription

from sat_pubsub import error, iidavoll, const
import psycopg2
import psycopg2.extensions
# we wants psycopg2 to return us unicode, not str
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY)

# parseXml manage str, but we get unicode
parseXml = lambda unicode_data: generic.parseXml(unicode_data.encode('utf-8'))


def withPEP(query, values, pep, recipient, pep_table=None):
    """Helper method to facilitate PEP management

    @param query: SQL query basis
    @param values: current values to replace in query
    @param pep: True if we are in PEP mode
    @param recipient: jid of the recipient
    @param pep_table: added before pep if table need to be specified
    @return: query + PEP AND check,
        recipient's bare jid is added to value if needed
    """
    pep_col_name = "{}pep".format(
                   '' if pep_table is None
                   else ".{}".format(pep_table))
    if pep:
        pep_check="AND {}=%s".format(pep_col_name)
        values=list(values) + [recipient.userhost()]
    else:
        pep_check="AND {} IS NULL".format(pep_col_name)
    return "{} {}".format(query, pep_check), values


class Storage:

    implements(iidavoll.IStorage)

    defaultConfig = {
            'leaf': {
                const.OPT_PERSIST_ITEMS: True,
                const.OPT_DELIVER_PAYLOADS: True,
                const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub',
                const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT,
                const.OPT_PUBLISH_MODEL: const.VAL_PMODEL_DEFAULT,
            },
            'collection': {
                const.OPT_DELIVER_PAYLOADS: True,
                const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub',
                const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT,
                const.OPT_PUBLISH_MODEL: const.VAL_PMODEL_DEFAULT,
            }
    }

    def __init__(self, dbpool):
        self.dbpool = dbpool

    def _buildNode(self, row):
        """Build a note class from database result row"""
        configuration = {}

        if not row:
            raise error.NodeNotFound()

        if row[2] == 'leaf':
            configuration = {
                    'pubsub#persist_items': row[3],
                    'pubsub#deliver_payloads': row[4],
                    'pubsub#send_last_published_item': row[5],
                    const.OPT_ACCESS_MODEL:row[6],
                    const.OPT_PUBLISH_MODEL:row[7],
                    }
            node = LeafNode(row[0], row[1], configuration)
            node.dbpool = self.dbpool
            return node
        elif row[2] == 'collection':
            configuration = {
                    'pubsub#deliver_payloads': row[4],
                    'pubsub#send_last_published_item': row[5],
                    const.OPT_ACCESS_MODEL: row[6],
                    const.OPT_PUBLISH_MODEL:row[7],
                    }
            node = CollectionNode(row[0], row[1], configuration)
            node.dbpool = self.dbpool
            return node
        else:
            raise ValueError("Unknown node type !")

    def getNodeById(self, nodeDbId):
        """Get node using database ID insted of pubsub identifier

        @param nodeDbId(unicode): database ID
        """
        return self.dbpool.runInteraction(self._getNodeById, nodeDbId)


    def _getNodeById(self, cursor, nodeDbId):
        cursor.execute("""SELECT node_id,
                                 node,
                                 node_type,
                                 persist_items,
                                 deliver_payloads,
                                 send_last_published_item,
                                 access_model,
                                 publish_model,
                                 pep
                            FROM nodes
                            WHERE node_id=%s""",
                       (nodeDbId,))
        row = cursor.fetchone()
        return self._buildNode(row)

    def getNode(self, nodeIdentifier, pep, recipient=None):
        return self.dbpool.runInteraction(self._getNode, nodeIdentifier, pep, recipient)


    def _getNode(self, cursor, nodeIdentifier, pep, recipient):
        cursor.execute(*withPEP("""SELECT node_id,
                                          node,
                                          node_type,
                                          persist_items,
                                          deliver_payloads,
                                          send_last_published_item,
                                          access_model,
                                          publish_model,
                                          pep
                                   FROM nodes
                                   WHERE node=%s""",
                              (nodeIdentifier,), pep, recipient))
        row = cursor.fetchone()
        return self._buildNode(row)

    def getNodeIds(self, pep):
        d = self.dbpool.runQuery("""SELECT node from nodes WHERE pep is {}NULL"""
                                    .format("NOT " if pep else ""))
        d.addCallback(lambda results: [r[0] for r in results])
        return d


    def createNode(self, nodeIdentifier, owner, config, pep, recipient=None):
        return self.dbpool.runInteraction(self._createNode, nodeIdentifier,
                                           owner, config, pep, recipient)


    def _createNode(self, cursor, nodeIdentifier, owner, config, pep, recipient):
        if config['pubsub#node_type'] != 'leaf':
            raise error.NoCollections()

        owner = owner.userhost()

        try:
            cursor.execute("""INSERT INTO nodes
                              (node, node_type, persist_items,
                               deliver_payloads, send_last_published_item, access_model, publish_model, pep)
                              VALUES
                              (%s, 'leaf', %s, %s, %s, %s, %s, %s)""",
                           (nodeIdentifier,
                            config['pubsub#persist_items'],
                            config['pubsub#deliver_payloads'],
                            config['pubsub#send_last_published_item'],
                            config[const.OPT_ACCESS_MODEL],
                            config[const.OPT_PUBLISH_MODEL],
                            recipient.userhost() if pep else None
                            )
                           )
        except cursor._pool.dbapi.IntegrityError:
            raise error.NodeExists()

        cursor.execute(*withPEP("""SELECT node_id FROM nodes WHERE node=%s""",
                                (nodeIdentifier,), pep, recipient));
        node_id = cursor.fetchone()[0]

        cursor.execute("""SELECT 1 as bool from entities where jid=%s""",
                       (owner,))

        if not cursor.fetchone():
            # XXX: we can NOT rely on the previous query! Commit is needed now because
            # if the entry exists the next query will leave the database in a corrupted
            # state: the solution is to rollback. I tried with other methods like
            # "WHERE NOT EXISTS" but none of them worked, so the following solution
            # looks like the sole - unless you have auto-commit on. More info
            # about this issue: http://cssmay.com/question/tag/tag-psycopg2
            cursor._connection.commit()
            try:
                cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
                               (owner,))
            except psycopg2.IntegrityError as e:
                cursor._connection.rollback()
                logging.warning("during node creation: %s" % e.message)

        cursor.execute("""INSERT INTO affiliations
                          (node_id, entity_id, affiliation)
                          SELECT %s, entity_id, 'owner' FROM
                          (SELECT entity_id FROM entities
                                            WHERE jid=%s) as e""",
                       (node_id, owner))

        #TODO: manage JID access
        if config[const.OPT_ACCESS_MODEL] == const.VAL_AMODEL_ROSTER:
            if const.OPT_ROSTER_GROUPS_ALLOWED in config:
                allowed_groups = config[const.OPT_ROSTER_GROUPS_ALLOWED]
            else:
                allowed_groups = []
            for group in allowed_groups:
                #TODO: check that group are actually in roster
                cursor.execute("""INSERT INTO node_groups_authorized (node_id, groupname)
                                  VALUES (%s,%s)""" , (node_id, group))

    def deleteNodeByDbId(self, db_id):
        """Delete a node using directly its database id"""
        return self.dbpool.runInteraction(self._deleteNodeByDbId, db_id)

    def _deleteNodeByDbId(self, cursor, db_id):
        cursor.execute("""DELETE FROM nodes WHERE node_id=%s""",
                       (db_id,))

        if cursor.rowcount != 1:
            raise error.NodeNotFound()

    def deleteNode(self, nodeIdentifier, pep, recipient=None):
        return self.dbpool.runInteraction(self._deleteNode, nodeIdentifier, pep, recipient)


    def _deleteNode(self, cursor, nodeIdentifier, pep, recipient):
        cursor.execute(*withPEP("""DELETE FROM nodes WHERE node=%s""",
                                (nodeIdentifier,), pep, recipient))

        if cursor.rowcount != 1:
            raise error.NodeNotFound()

    def getNodeGroups(self, nodeIdentifier, pep, recipient=None):
        return self.dbpool.runInteraction(self._getNodeGroups, nodeIdentifier, pep, recipient)

    def _getNodeGroups(self, cursor, nodeIdentifier, pep, recipient):
        cursor.execute(*withPEP("SELECT groupname FROM node_groups_authorized NATURAL JOIN nodes WHERE node=%s",
                                (nodeIdentifier,), pep, recipient))
        rows = cursor.fetchall()

        return [row[0] for row in rows]

    def getAffiliations(self, entity, pep, recipient=None):
        d = self.dbpool.runQuery(*withPEP("""SELECT node, affiliation FROM entities
                                        NATURAL JOIN affiliations
                                        NATURAL JOIN nodes
                                        WHERE jid=%s""",
                                     (entity.userhost(),), pep, recipient, 'nodes'))
        d.addCallback(lambda results: [tuple(r) for r in results])
        return d


    def getSubscriptions(self, entity, pep, recipient=None):
        def toSubscriptions(rows):
            subscriptions = []
            for row in rows:
                subscriber = jid.internJID('%s/%s' % (row[1],
                                                      row[2]))
                subscription = Subscription(row[0], subscriber, row[3])
                subscriptions.append(subscription)
            return subscriptions

        d = self.dbpool.runQuery("""SELECT node, jid, resource, state
                                     FROM entities
                                     NATURAL JOIN subscriptions
                                     NATURAL JOIN nodes
                                     WHERE jid=%s AND nodes.pep=%s""",
                                  (entity.userhost(), recipient.userhost() if pep else None))
        d.addCallback(toSubscriptions)
        return d


    def getDefaultConfiguration(self, nodeType):
        return self.defaultConfig[nodeType]



class Node:

    implements(iidavoll.INode)

    def __init__(self, nodeDbId, nodeIdentifier, config):
        self.nodeDbId = nodeDbId
        self.nodeIdentifier = nodeIdentifier
        self._config = config
        self.owner = None;


    def _checkNodeExists(self, cursor):
        cursor.execute("""SELECT 1 as exist FROM nodes WHERE node_id=%s""",
                       (self.nodeDbId,))
        if not cursor.fetchone():
            raise error.NodeNotFound()


    def getType(self):
        return self.nodeType

    def getNodeOwner(self):
        if self.owner:
            return defer.succeed(self.owner)
        d = self.dbpool.runQuery("""SELECT jid FROM nodes NATURAL JOIN affiliations NATURAL JOIN entities WHERE node_id=%s""", (self.nodeDbId,))
        d.addCallback(lambda result: jid.JID(result[0][0]))
        return d


    def getConfiguration(self):
        return self._config


    def setConfiguration(self, options):
        config = copy.copy(self._config)

        for option in options:
            if option in config:
                config[option] = options[option]

        d = self.dbpool.runInteraction(self._setConfiguration, config)
        d.addCallback(self._setCachedConfiguration, config)
        return d


    def _setConfiguration(self, cursor, config):
        self._checkNodeExists(cursor)
        cursor.execute("""UPDATE nodes SET persist_items=%s,
                                           deliver_payloads=%s,
                                           send_last_published_item=%s,
                                           access_model=%s,
                                           publish_model=%s
                          WHERE node_id=%s""",
                       (config[const.OPT_PERSIST_ITEMS],
                        config[const.OPT_DELIVER_PAYLOADS],
                        config[const.OPT_SEND_LAST_PUBLISHED_ITEM],
                        config[const.OPT_ACCESS_MODEL],
                        config[const.OPT_PUBLISH_MODEL],
                        self.nodeDbId))


    def _setCachedConfiguration(self, void, config):
        self._config = config


    def getMetaData(self):
        config = copy.copy(self._config)
        config["pubsub#node_type"] = self.nodeType
        return config


    def getAffiliation(self, entity):
        return self.dbpool.runInteraction(self._getAffiliation, entity)


    def _getAffiliation(self, cursor, entity):
        self._checkNodeExists(cursor)
        cursor.execute("""SELECT affiliation FROM affiliations
                          NATURAL JOIN nodes
                          NATURAL JOIN entities
                          WHERE node_id=%s AND jid=%s""",
                       (self.nodeDbId,
                        entity.userhost()))

        try:
            return cursor.fetchone()[0]
        except TypeError:
            return None


    def getAccessModel(self):
        return self._config[const.OPT_ACCESS_MODEL]


    def getSubscription(self, subscriber):
        return self.dbpool.runInteraction(self._getSubscription, subscriber)


    def _getSubscription(self, cursor, subscriber):
        self._checkNodeExists(cursor)

        userhost = subscriber.userhost()
        resource = subscriber.resource or ''

        cursor.execute("""SELECT state FROM subscriptions
                          NATURAL JOIN nodes
                          NATURAL JOIN entities
                          WHERE node_id=%s AND jid=%s AND resource=%s""",
                       (self.nodeDbId,
                        userhost,
                        resource))

        row = cursor.fetchone()
        if not row:
            return None
        else:
            return Subscription(self.nodeIdentifier, subscriber, row[0])


    def getSubscriptions(self, state=None):
        return self.dbpool.runInteraction(self._getSubscriptions, state)


    def _getSubscriptions(self, cursor, state):
        self._checkNodeExists(cursor)

        query = """SELECT node, jid, resource, state,
                          subscription_type, subscription_depth
                   FROM subscriptions
                   NATURAL JOIN nodes
                   NATURAL JOIN entities
                   WHERE node_id=%s"""
        values = [self.nodeDbId]

        if state:
            query += " AND state=%s"
            values.append(state)

        cursor.execute(query, values)
        rows = cursor.fetchall()

        subscriptions = []
        for row in rows:
            subscriber = jid.JID(u'%s/%s' % (row[1], row[2]))

            options = {}
            if row[4]:
                options['pubsub#subscription_type'] = row[4];
            if row[5]:
                options['pubsub#subscription_depth'] = row[5];

            subscriptions.append(Subscription(row[0], subscriber,
                                              row[3], options))

        return subscriptions


    def addSubscription(self, subscriber, state, config):
        return self.dbpool.runInteraction(self._addSubscription, subscriber,
                                          state, config)


    def _addSubscription(self, cursor, subscriber, state, config):
        self._checkNodeExists(cursor)

        userhost = subscriber.userhost()
        resource = subscriber.resource or ''

        subscription_type = config.get('pubsub#subscription_type')
        subscription_depth = config.get('pubsub#subscription_depth')

        try:
            cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
                           (userhost,))
        except cursor._pool.dbapi.IntegrityError:
            cursor._connection.rollback()

        try:
            cursor.execute("""INSERT INTO subscriptions
                              (node_id, entity_id, resource, state,
                               subscription_type, subscription_depth)
                              SELECT %s, entity_id, %s, %s, %s, %s FROM
                              (SELECT entity_id FROM entities
                                                WHERE jid=%s) AS ent_id""",
                           (self.nodeDbId,
                            resource,
                            state,
                            subscription_type,
                            subscription_depth,
                            userhost))
        except cursor._pool.dbapi.IntegrityError:
            raise error.SubscriptionExists()


    def removeSubscription(self, subscriber):
        return self.dbpool.runInteraction(self._removeSubscription,
                                           subscriber)


    def _removeSubscription(self, cursor, subscriber):
        self._checkNodeExists(cursor)

        userhost = subscriber.userhost()
        resource = subscriber.resource or ''

        cursor.execute("""DELETE FROM subscriptions WHERE
                          node_id=%s AND
                          entity_id=(SELECT entity_id FROM entities
                                                      WHERE jid=%s) AND
                          resource=%s""",
                       (self.nodeDbId,
                        userhost,
                        resource))
        if cursor.rowcount != 1:
            raise error.NotSubscribed()

        return None


    def isSubscribed(self, entity):
        return self.dbpool.runInteraction(self._isSubscribed, entity)


    def _isSubscribed(self, cursor, entity):
        self._checkNodeExists(cursor)

        cursor.execute("""SELECT 1 as bool FROM entities
                          NATURAL JOIN subscriptions
                          NATURAL JOIN nodes
                          WHERE entities.jid=%s
                          AND node_id=%s AND state='subscribed'""",
                       (entity.userhost(),
                       self.nodeDbId))

        return cursor.fetchone() is not None


    def getAffiliations(self):
        return self.dbpool.runInteraction(self._getAffiliations)


    def _getAffiliations(self, cursor):
        self._checkNodeExists(cursor)

        cursor.execute("""SELECT jid, affiliation FROM nodes
                          NATURAL JOIN affiliations
                          NATURAL JOIN entities
                          WHERE node_id=%s""",
                       (self.nodeDbId,))
        result = cursor.fetchall()

        return [(jid.internJID(r[0]), r[1]) for r in result]



class LeafNode(Node):

    implements(iidavoll.ILeafNode)

    nodeType = 'leaf'

    def storeItems(self, item_data, publisher):
        return self.dbpool.runInteraction(self._storeItems, item_data, publisher)


    def _storeItems(self, cursor, items_data, publisher):
        self._checkNodeExists(cursor)
        for item_data in items_data:
            self._storeItem(cursor, item_data, publisher)


    def _storeItem(self, cursor, item_data, publisher):
        item, access_model, item_config = item_data.item, item_data.access_model, item_data.config
        data = item.toXml()

        cursor.execute("""UPDATE items SET date=now(), publisher=%s, data=%s
                          FROM nodes
                          WHERE nodes.node_id = items.node_id AND
                                nodes.node_id = %s and items.item=%s
                          RETURNING item_id""",
                       (publisher.full(),
                        data,
                        self.nodeDbId,
                        item["id"]))
        if cursor.rowcount == 1:
            item_id = cursor.fetchone()[0];
            self._storeCategories(cursor, item_id, item_data.categories, update=True)
            return

        cursor.execute("""INSERT INTO items (node_id, item, publisher, data, access_model)
                          SELECT %s, %s, %s, %s, %s FROM nodes
                                                     WHERE node_id=%s
                                                     RETURNING item_id""",
                       (self.nodeDbId,
                        item["id"],
                        publisher.full(),
                        data,
                        access_model,
                        self.nodeDbId))

        item_id = cursor.fetchone()[0];
        self._storeCategories(cursor, item_id, item_data.categories)

        if access_model == const.VAL_AMODEL_ROSTER:
            if const.OPT_ROSTER_GROUPS_ALLOWED in item_config:
                item_config.fields[const.OPT_ROSTER_GROUPS_ALLOWED].fieldType='list-multi' #XXX: needed to force list if there is only one value
                allowed_groups = item_config[const.OPT_ROSTER_GROUPS_ALLOWED]
            else:
                allowed_groups = []
            for group in allowed_groups:
                #TODO: check that group are actually in roster
                cursor.execute("""INSERT INTO item_groups_authorized (item_id, groupname)
                                  VALUES (%s,%s)""" , (item_id, group))

    def _storeCategories(self, cursor, item_id, categories, update=False):
        # TODO: handle canonical form
        if update:
            cursor.execute("""DELETE FROM item_categories
                              WHERE item_id=%s""", (item_id,))

        for category in categories:
            cursor.execute("""INSERT INTO item_categories (item_id, category)
                              VALUES (%s, %s)""", (item_id, category))

    def removeItems(self, itemIdentifiers):
        return self.dbpool.runInteraction(self._removeItems, itemIdentifiers)


    def _removeItems(self, cursor, itemIdentifiers):
        self._checkNodeExists(cursor)

        deleted = []

        for itemIdentifier in itemIdentifiers:
            cursor.execute("""DELETE FROM items WHERE
                              node_id=%s AND
                              item=%s""",
                           (self.nodeDbId,
                            itemIdentifier))

            if cursor.rowcount:
                deleted.append(itemIdentifier)

        return deleted


    def getItems(self, authorized_groups, unrestricted, maxItems=None, ext_data=None):
        """ Get all authorised items
        @param authorized_groups: we want to get items that these groups can access
        @param unrestricted: if true, don't check permissions (i.e.: get all items)
        @param maxItems: nb of items we want to tget
        @param rsm_data: options for RSM feature handling (XEP-0059) as a
                         dictionnary of C{unicode} to C{unicode}.

        @return: list of (item, access_model, id) if unrestricted is True, else list of items
        """
        if ext_data is None:
            ext_data = {}
        return self.dbpool.runInteraction(self._getItems, authorized_groups, unrestricted, maxItems, ext_data)

    def _getItems(self, cursor, authorized_groups, unrestricted, maxItems, ext_data):
        #  FIXME: simplify the query construction
        self._checkNodeExists(cursor)

        if unrestricted:
            query = ["SELECT data,items.access_model,item_id"]
            source = """FROM nodes
                       INNER JOIN items USING (node_id)
                       WHERE node_id=%s"""
            args = [self.nodeDbId]
        else:
            query = ["SELECT data"]
            groups = " or (items.access_model='roster' and groupname in %s)" if authorized_groups else ""
            source = """FROM nodes
                       INNER JOIN items USING (node_id)
                       LEFT JOIN item_groups_authorized USING (item_id)
                       WHERE node_id=%s AND
                       (items.access_model='open'""" + groups + ")"

            args = [self.nodeDbId]
            if authorized_groups:
                args.append(authorized_groups)

        if 'filters' in ext_data:  # MAM filters
            for filter_ in ext_data['filters']:
                if filter_.var == 'start':
                    source += " AND date>=%s"
                    args.append(filter_.value)
                if filter_.var == 'end':
                    source += " AND date<=%s"
                    args.append(filter_.value)
                if filter_.var == 'with':
                    jid_s = filter_.value
                    if '/' in jid_s:
                        source += " AND publisher=%s"
                        args.append(filter_.value)
                    else:
                        source += " AND publisher LIKE %s"
                        args.append(u"{}%".format(filter_.value))

        query.append(source)
        order = "DESC"

        if 'rsm' in ext_data:
            rsm = ext_data['rsm']
            maxItems = rsm.max
            if rsm.index is not None:
                query.append("AND date<=(SELECT date " + source + " ORDER BY date DESC LIMIT 1 OFFSET %s)")
                # FIXME: change the request so source is not used 2 times
                # there is already a placeholder in source with node_id=%s, so we need to add self.noDbId in args
                args.append(self.nodeDbId)
                if authorized_groups:
                    args.append(authorized_groups)
                args.append(rsm.index)
            elif rsm.before is not None:
                order = "ASC"
                if rsm.before != '':
                    query.append("AND date>(SELECT date FROM items WHERE item=%s LIMIT 1)")
                    args.append(rsm.before)
            elif rsm.after:
                query.append("AND date<(SELECT date FROM items WHERE item=%s LIMIT 1)")
                args.append(rsm.after)

        query.append("ORDER BY date %s" % order)

        if maxItems is not None:
            query.append("LIMIT %s")
            args.append(maxItems)

        cursor.execute(' '.join(query), args)

        result = cursor.fetchall()
        if unrestricted:
            ret = []
            for data in result:
                item = generic.stripNamespace(parseXml(data[0]))
                access_model = data[1]
                item_id = data[2]
                access_list = {}
                if access_model == const.VAL_AMODEL_ROSTER: #TODO: jid access_model
                    cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,))
                    access_list[const.OPT_ROSTER_GROUPS_ALLOWED] = [r[0] for r in cursor.fetchall()]

                ret.append((item, access_model, access_list))
            return ret
        items = [generic.stripNamespace(parseXml(r[0])) for r in result]
        return items

    def countItems(self, authorized_groups, unrestricted):
        """ Count the accessible items.

        @param authorized_groups: we want to get items that these groups can access.
        @param unrestricted: if true, don't check permissions (i.e.: get all items).
        @return: deferred that fires a C{int}.
        """
        return self.dbpool.runInteraction(self._countItems, authorized_groups, unrestricted)

    def _countItems(self, cursor, authorized_groups, unrestricted):
        self._checkNodeExists(cursor)

        if unrestricted:
            query = ["""SELECT count(item_id) FROM nodes
                       INNER JOIN items USING (node_id)
                       WHERE node_id=%s"""]
            args = [self.nodeDbId]
        else:
            query = ["""SELECT count(item_id) FROM nodes
                       INNER  JOIN items USING (node_id)
                       LEFT JOIN item_groups_authorized USING (item_id)
                       WHERE node_id=%s AND
                       (items.access_model='open' """ +
                       ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') +
                       ")"]

            args = [self.nodeDbId]
            if authorized_groups:
                args.append(authorized_groups)

        cursor.execute(' '.join(query), args)
        return cursor.fetchall()[0][0]

    def getIndex(self, authorized_groups, unrestricted, item):
        """ Retrieve the index of the given item within the accessible window.

        @param authorized_groups: we want to get items that these groups can access.
        @param unrestricted: if true, don't check permissions (i.e.: get all items).
        @param item: item identifier.
        @return: deferred that fires a C{int}.
        """
        return self.dbpool.runInteraction(self._getIndex, authorized_groups, unrestricted, item)

    def _getIndex(self, cursor, authorized_groups, unrestricted, item):
        self._checkNodeExists(cursor)

        if unrestricted:
            query = ["""SELECT row_number FROM (
                       SELECT row_number() OVER (ORDER BY date DESC), item
                       FROM nodes INNER JOIN items USING (node_id)
                       WHERE node_id=%s
                       ) as x
                       WHERE item=%s LIMIT 1"""]
            args = [self.nodeDbId]
        else:
            query = ["""SELECT row_number FROM (
                       SELECT row_number() OVER (ORDER BY date DESC), item
                       FROM nodes INNER JOIN items USING (node_id)
                       LEFT JOIN item_groups_authorized USING (item_id)
                       WHERE node_id=%s AND
                       (items.access_model='open' """ +
                       ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') +
                       """)) as x
                       WHERE item=%s LIMIT 1"""]

            args = [self.nodeDbId]
            if authorized_groups:
                args.append(authorized_groups)

        args.append(item)
        cursor.execute(' '.join(query), args)

        return cursor.fetchall()[0][0]

    def getItemsById(self, authorized_groups, unrestricted, itemIdentifiers):
        """ Get items which are in the given list
        @param authorized_groups: we want to get items that these groups can access
        @param unrestricted: if true, don't check permissions
        @param itemIdentifiers: list of ids of the items we want to get
        @return: list of (item, access_model, access_list) if unrestricted is True, else list of items
            access_list is managed as a dictionnary with same key as for item_config
        """
        return self.dbpool.runInteraction(self._getItemsById, authorized_groups, unrestricted, itemIdentifiers)


    def _getItemsById(self, cursor, authorized_groups, unrestricted, itemIdentifiers):
        self._checkNodeExists(cursor)
        ret = []
        if unrestricted: #we get everything without checking permissions
            for itemIdentifier in itemIdentifiers:
                cursor.execute("""SELECT data,items.access_model,item_id FROM nodes
                                  INNER JOIN items USING (node_id)
                                  WHERE node_id=%s AND item=%s""",
                               (self.nodeDbId,
                                itemIdentifier))
                result = cursor.fetchone()
                if not result:
                    raise error.ItemNotFound()

                item = generic.stripNamespace(parseXml(result[0]))
                access_model = result[1]
                item_id = result[2]
                access_list = {}
                if access_model == const.VAL_AMODEL_ROSTER: #TODO: jid access_model
                    cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,))
                    access_list[const.OPT_ROSTER_GROUPS_ALLOWED] = [r[0] for r in cursor.fetchall()]

                ret.append((item, access_model, access_list))
        else: #we check permission before returning items
            for itemIdentifier in itemIdentifiers:
                args = [self.nodeDbId, itemIdentifier]
                if authorized_groups:
                    args.append(authorized_groups)
                cursor.execute("""SELECT data FROM nodes
                           INNER  JOIN items USING (node_id)
                           LEFT JOIN item_groups_authorized USING (item_id)
                           WHERE node_id=%s AND item=%s AND
                           (items.access_model='open' """ +
                           ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') + ")",
                           args)

                result = cursor.fetchone()
                if result:
                    ret.append(generic.stripNamespace(parseXml(result[0])))

        return ret


    def getItemsPublishers(self, itemIdentifiers):
        """Get the publishers for all given identifiers

        @return (dict): map of itemIdentifiers to publisher
        """
        return self.dbpool.runInteraction(self._getItemsPublishers, itemIdentifiers)


    def _getItemsPublishers(self, cursor, itemIdentifiers):
        self._checkNodeExists(cursor)
        ret = {}
        for itemIdentifier in itemIdentifiers:
            cursor.execute("""SELECT publisher FROM items
                              WHERE item=%s""",
                            (itemIdentifier,))
            result = cursor.fetchone()
            if not result:
                # We have an internal error, that's why we use ValueError
                # and not error.ItemNotFound()
                raise ValueError() # itemIdentifier must exists
            ret[itemIdentifier] = jid.JID(result[0])
        return ret


    def purge(self):
        return self.dbpool.runInteraction(self._purge)


    def _purge(self, cursor):
        self._checkNodeExists(cursor)

        cursor.execute("""DELETE FROM items WHERE
                          node_id=%s""",
                       (self.nodeDbId,))

   # FIXME: to be checked
   #  def filterItemsWithPublisher(self, itemIdentifiers, recipient):
   #      return self.dbpool.runInteraction(self._filterItemsWithPublisher, itemIdentifiers, recipient)

   #  def _filterItemsWithPublisher(self, cursor, itemIdentifiers, requestor):
   #      self._checkNodeExists(cursor)
   #      ret = []
   #      for itemIdentifier in itemIdentifiers:
   #          args = ["%s/%%" % requestor.userhost(), itemIdentifier]
   #          cursor.execute("""SELECT item FROM items WHERE publisher LIKE %s AND item=%s""", args)
   #          result = cursor.fetchone()
   #          if result:
   #              ret.append(result[0])
   #      return ret

class CollectionNode(Node):

    nodeType = 'collection'



class GatewayStorage(object):
    """
    Memory based storage facility for the XMPP-HTTP gateway.
    """

    def __init__(self, dbpool):
        self.dbpool = dbpool


    def _countCallbacks(self, cursor, service, nodeIdentifier):
        """
        Count number of callbacks registered for a node.
        """
        cursor.execute("""SELECT count(*) FROM callbacks
                          WHERE service=%s and node=%s""",
                       (service.full(),
                        nodeIdentifier))
        results = cursor.fetchall()
        return results[0][0]


    def addCallback(self, service, nodeIdentifier, callback):
        def interaction(cursor):
            cursor.execute("""SELECT 1 as bool FROM callbacks
                              WHERE service=%s and node=%s and uri=%s""",
                           (service.full(),
                           nodeIdentifier,
                           callback))
            if cursor.fetchall():
                return

            cursor.execute("""INSERT INTO callbacks
                              (service, node, uri) VALUES
                              (%s, %s, %s)""",
                           (service.full(),
                           nodeIdentifier,
                           callback))

        return self.dbpool.runInteraction(interaction)


    def removeCallback(self, service, nodeIdentifier, callback):
        def interaction(cursor):
            cursor.execute("""DELETE FROM callbacks
                              WHERE service=%s and node=%s and uri=%s""",
                           (service.full(),
                            nodeIdentifier,
                            callback))

            if cursor.rowcount != 1:
                raise error.NotSubscribed()

            last = not self._countCallbacks(cursor, service, nodeIdentifier)
            return last

        return self.dbpool.runInteraction(interaction)

    def getCallbacks(self, service, nodeIdentifier):
        def interaction(cursor):
            cursor.execute("""SELECT uri FROM callbacks
                              WHERE service=%s and node=%s""",
                           (service.full(),
                            nodeIdentifier))
            results = cursor.fetchall()

            if not results:
                raise error.NoCallbacks()

            return [result[0] for result in results]

        return self.dbpool.runInteraction(interaction)


    def hasCallbacks(self, service, nodeIdentifier):
        def interaction(cursor):
            return bool(self._countCallbacks(cursor, service, nodeIdentifier))

        return self.dbpool.runInteraction(interaction)