Mercurial > libervia-pubsub
diff sat_pubsub/pgsql_storage.py @ 294:df1edebb0466
PEP implementation, draft (huge patch sorry):
/!\ database schema has changed ! /!\
- whole PEP behaviour is not managed yet
- if the stanza is delegated, PEP is assumed
- fixed potential SQL injection in pgsql_storage
- publish notifications manage PEP
- added retract notifications (if "notify" attribute is present), with PEP handling
- a publisher can't replace an item he didn't publised anymore
- /!\ schema has changed, sat_pubsub_update_0_1.sql update it
- sat_pubsub_update_0_1.sql also fixes bad items coming from former version of SàT
author | Goffi <goffi@goffi.org> |
---|---|
date | Sun, 16 Aug 2015 01:32:42 +0200 |
parents | 002c59dbc23f |
children | 4115999d85e9 |
line wrap: on
line diff
--- a/sat_pubsub/pgsql_storage.py Sun Aug 16 01:15:13 2015 +0200 +++ b/sat_pubsub/pgsql_storage.py Sun Aug 16 01:32:42 2015 +0200 @@ -72,21 +72,44 @@ # parseXml manage str, but we get unicode parseXml = lambda unicode_data: generic.parseXml(unicode_data.encode('utf-8')) + +def withPEP(query, values, pep, recipient, pep_table=None): + """Helper method to facilitate PEP management + + @param query: SQL query basis + @param values: current values to replace in query + @param pep: True if we are in PEP mode + @param recipient: jid of the recipient + @param pep_table: added before pep if table need to be specified + @return: query + PEP AND check, + recipient's bare jid is added to value if needed + """ + pep_col_name = "{}pep".format( + '' if pep_table is None + else ".{}".format(pep_table)) + if pep: + pep_check="AND {}=%s".format(pep_col_name) + values=list(values) + [recipient.userhost()] + else: + pep_check="AND {} IS NULL".format(pep_col_name) + return "{} {}".format(query, pep_check), values + + class Storage: implements(iidavoll.IStorage) defaultConfig = { 'leaf': { - "pubsub#persist_items": True, - "pubsub#deliver_payloads": True, - "pubsub#send_last_published_item": 'on_sub', + const.OPT_PERSIST_ITEMS: True, + const.OPT_DELIVER_PAYLOADS: True, + const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub', const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT, const.OPT_PUBLISH_MODEL: const.VAL_PMODEL_DEFAULT, }, 'collection': { - "pubsub#deliver_payloads": True, - "pubsub#send_last_published_item": 'on_sub', + const.OPT_DELIVER_PAYLOADS: True, + const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub', const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT, const.OPT_PUBLISH_MODEL: const.VAL_PMODEL_DEFAULT, } @@ -95,83 +118,119 @@ def __init__(self, dbpool): self.dbpool = dbpool - def getNode(self, nodeIdentifier): - return self.dbpool.runInteraction(self._getNode, nodeIdentifier) - - def _getNode(self, cursor, nodeIdentifier): + def _buildNode(self, row): + """Build a note class from database result row""" configuration = {} - cursor.execute("""SELECT node_type, - persist_items, - deliver_payloads, - send_last_published_item, - access_model, - publish_model - FROM nodes - WHERE node=%s""", - (nodeIdentifier,)) - row = cursor.fetchone() if not row: raise error.NodeNotFound() - if row[0] == 'leaf': + if row[2] == 'leaf': configuration = { - 'pubsub#persist_items': row[1], - 'pubsub#deliver_payloads': row[2], - 'pubsub#send_last_published_item': row[3], - const.OPT_ACCESS_MODEL:row[4], - const.OPT_PUBLISH_MODEL:row[5], + 'pubsub#persist_items': row[3], + 'pubsub#deliver_payloads': row[4], + 'pubsub#send_last_published_item': row[5], + const.OPT_ACCESS_MODEL:row[6], + const.OPT_PUBLISH_MODEL:row[7], } - node = LeafNode(nodeIdentifier, configuration) + node = LeafNode(row[0], row[1], configuration) + node.dbpool = self.dbpool + return node + elif row[2] == 'collection': + configuration = { + 'pubsub#deliver_payloads': row[4], + 'pubsub#send_last_published_item': row[5], + const.OPT_ACCESS_MODEL: row[6], + const.OPT_PUBLISH_MODEL:row[7], + } + node = CollectionNode(row[0], row[1], configuration) node.dbpool = self.dbpool return node - elif row[0] == 'collection': - configuration = { - 'pubsub#deliver_payloads': row[2], - 'pubsub#send_last_published_item': row[3], - const.OPT_ACCESS_MODEL: row[4], - const.OPT_PUBLISH_MODEL:row[5], - } - node = CollectionNode(nodeIdentifier, configuration) - node.dbpool = self.dbpool - return node + else: + raise ValueError("Unknown node type !") + + def getNodeById(self, nodeDbId): + """Get node using database ID insted of pubsub identifier + + @param nodeDbId(unicode): database ID + """ + return self.dbpool.runInteraction(self._getNodeById, nodeDbId) + def _getNodeById(self, cursor, nodeDbId): + cursor.execute("""SELECT node_id, + node, + node_type, + persist_items, + deliver_payloads, + send_last_published_item, + access_model, + publish_model, + pep + FROM nodes + WHERE node_id=%s""", + (nodeDbId,)) + row = cursor.fetchone() + return self._buildNode(row) - def getNodeIds(self): - d = self.dbpool.runQuery("""SELECT node from nodes""") + def getNode(self, nodeIdentifier, pep, recipient=None): + return self.dbpool.runInteraction(self._getNode, nodeIdentifier, pep, recipient) + + + def _getNode(self, cursor, nodeIdentifier, pep, recipient): + cursor.execute(*withPEP("""SELECT node_id, + node, + node_type, + persist_items, + deliver_payloads, + send_last_published_item, + access_model, + publish_model, + pep + FROM nodes + WHERE node=%s""", + (nodeIdentifier,), pep, recipient)) + row = cursor.fetchone() + return self._buildNode(row) + + def getNodeIds(self, pep): + d = self.dbpool.runQuery("""SELECT node from nodes WHERE pep is {}NULL""" + .format("NOT " if pep else "")) d.addCallback(lambda results: [r[0] for r in results]) return d - def createNode(self, nodeIdentifier, owner, config): + def createNode(self, nodeIdentifier, owner, config, pep, recipient=None): return self.dbpool.runInteraction(self._createNode, nodeIdentifier, - owner, config) + owner, config, pep, recipient) - def _createNode(self, cursor, nodeIdentifier, owner, config): + def _createNode(self, cursor, nodeIdentifier, owner, config, pep, recipient): if config['pubsub#node_type'] != 'leaf': raise error.NoCollections() owner = owner.userhost() + try: cursor.execute("""INSERT INTO nodes (node, node_type, persist_items, - deliver_payloads, send_last_published_item, access_model, publish_model) + deliver_payloads, send_last_published_item, access_model, publish_model, pep) VALUES - (%s, 'leaf', %s, %s, %s, %s, %s)""", + (%s, 'leaf', %s, %s, %s, %s, %s, %s)""", (nodeIdentifier, config['pubsub#persist_items'], config['pubsub#deliver_payloads'], config['pubsub#send_last_published_item'], config[const.OPT_ACCESS_MODEL], config[const.OPT_PUBLISH_MODEL], + recipient.userhost() if pep else None ) ) except cursor._pool.dbapi.IntegrityError: raise error.NodeExists() - cursor.execute("""SELECT node_id FROM nodes WHERE node=%s""", (nodeIdentifier,)); + cursor.execute(*withPEP("""SELECT node_id FROM nodes WHERE node=%s""", + (nodeIdentifier,), pep, recipient)); node_id = cursor.fetchone()[0] cursor.execute("""SELECT 1 as bool from entities where jid=%s""", @@ -210,39 +269,49 @@ cursor.execute("""INSERT INTO node_groups_authorized (node_id, groupname) VALUES (%s,%s)""" , (node_id, group)) + def deleteNodeByDbId(self, db_id): + """Delete a node using directly its database id""" + return self.dbpool.runInteraction(self._deleteNodeByDbId, db_id) - def deleteNode(self, nodeIdentifier): - return self.dbpool.runInteraction(self._deleteNode, nodeIdentifier) + def _deleteNodeByDbId(self, cursor, db_id): + cursor.execute("""DELETE FROM nodes WHERE node_id=%s""", + (db_id,)) + + if cursor.rowcount != 1: + raise error.NodeNotFound() + + def deleteNode(self, nodeIdentifier, pep, recipient=None): + return self.dbpool.runInteraction(self._deleteNode, nodeIdentifier, pep, recipient) - def _deleteNode(self, cursor, nodeIdentifier): - cursor.execute("""DELETE FROM nodes WHERE node=%s""", - (nodeIdentifier,)) + def _deleteNode(self, cursor, nodeIdentifier, pep, recipient): + cursor.execute(*withPEP("""DELETE FROM nodes WHERE node=%s""", + (nodeIdentifier,), pep, recipient)) if cursor.rowcount != 1: raise error.NodeNotFound() - def getNodeGroups(self, nodeIdentifier): - return self.dbpool.runInteraction(self._getNodeGroups, nodeIdentifier) + def getNodeGroups(self, nodeIdentifier, pep, recipient=None): + return self.dbpool.runInteraction(self._getNodeGroups, nodeIdentifier, pep, recipient) - def _getNodeGroups(self, cursor, nodeIdentifier): - cursor.execute("SELECT groupname FROM node_groups_authorized NATURAL JOIN nodes WHERE node=%s", - (nodeIdentifier,)) + def _getNodeGroups(self, cursor, nodeIdentifier, pep, recipient): + cursor.execute(*withPEP("SELECT groupname FROM node_groups_authorized NATURAL JOIN nodes WHERE node=%s", + (nodeIdentifier,), pep, recipient)) rows = cursor.fetchall() return [row[0] for row in rows] - def getAffiliations(self, entity): - d = self.dbpool.runQuery("""SELECT node, affiliation FROM entities + def getAffiliations(self, entity, pep, recipient=None): + d = self.dbpool.runQuery(*withPEP("""SELECT node, affiliation FROM entities NATURAL JOIN affiliations NATURAL JOIN nodes WHERE jid=%s""", - (entity.userhost(),)) + (entity.userhost(),), pep, recipient, 'nodes')) d.addCallback(lambda results: [tuple(r) for r in results]) return d - def getSubscriptions(self, entity): + def getSubscriptions(self, entity, pep, recipient=None): def toSubscriptions(rows): subscriptions = [] for row in rows: @@ -256,8 +325,8 @@ FROM entities NATURAL JOIN subscriptions NATURAL JOIN nodes - WHERE jid=%s""", - (entity.userhost(),)) + WHERE jid=%s AND nodes.pep=%s""", + (entity.userhost(), recipient.userhost() if pep else None)) d.addCallback(toSubscriptions) return d @@ -271,15 +340,16 @@ implements(iidavoll.INode) - def __init__(self, nodeIdentifier, config): + def __init__(self, nodeDbId, nodeIdentifier, config): + self.nodeDbId = nodeDbId self.nodeIdentifier = nodeIdentifier self._config = config self.owner = None; def _checkNodeExists(self, cursor): - cursor.execute("""SELECT node_id FROM nodes WHERE node=%s""", - (self.nodeIdentifier,)) + cursor.execute("""SELECT 1 as exist FROM nodes WHERE node_id=%s""", + (self.nodeDbId,)) if not cursor.fetchone(): raise error.NodeNotFound() @@ -290,7 +360,7 @@ def getNodeOwner(self): if self.owner: return defer.succeed(self.owner) - d = self.dbpool.runQuery("""SELECT jid FROM nodes NATURAL JOIN affiliations NATURAL JOIN entities WHERE node=%s""", (self.nodeIdentifier,)) + d = self.dbpool.runQuery("""SELECT jid FROM nodes NATURAL JOIN affiliations NATURAL JOIN entities WHERE node_id=%s""", (self.nodeDbId,)) d.addCallback(lambda result: jid.JID(result[0][0])) return d @@ -315,12 +385,16 @@ self._checkNodeExists(cursor) cursor.execute("""UPDATE nodes SET persist_items=%s, deliver_payloads=%s, - send_last_published_item=%s - WHERE node=%s""", - (config["pubsub#persist_items"], - config["pubsub#deliver_payloads"], - config["pubsub#send_last_published_item"], - self.nodeIdentifier)) + send_last_published_item=%s, + access_model=%s, + publish_model=%s + WHERE node_id=%s""", + (config[const.OPT_PERSIST_ITEMS], + config[const.OPT_DELIVER_PAYLOADS], + config[const.OPT_SEND_LAST_PUBLISHED_ITEM], + config[const.OPT_ACCESS_MODEL], + config[const.OPT_PUBLISH_MODEL], + self.nodeDbId)) def _setCachedConfiguration(self, void, config): @@ -342,8 +416,8 @@ cursor.execute("""SELECT affiliation FROM affiliations NATURAL JOIN nodes NATURAL JOIN entities - WHERE node=%s AND jid=%s""", - (self.nodeIdentifier, + WHERE node_id=%s AND jid=%s""", + (self.nodeDbId, entity.userhost())) try: @@ -351,19 +425,9 @@ except TypeError: return None + def getAccessModel(self): - return self.dbpool.runInteraction(self._getAccessModel) - - def _getAccessModel(self, cursor, entity): - self._checkNodeExists(cursor) - cursor.execute("""SELECT access_model FROM nodes - WHERE node=%s""", - (self.nodeIdentifier,)) - - try: - return cursor.fetchone()[0] - except TypeError: - return None + return self._config[const.OPT_ACCESS_MODEL] def getSubscription(self, subscriber): @@ -379,8 +443,8 @@ cursor.execute("""SELECT state FROM subscriptions NATURAL JOIN nodes NATURAL JOIN entities - WHERE node=%s AND jid=%s AND resource=%s""", - (self.nodeIdentifier, + WHERE node_id=%s AND jid=%s AND resource=%s""", + (self.nodeDbId, userhost, resource)) @@ -398,13 +462,13 @@ def _getSubscriptions(self, cursor, state): self._checkNodeExists(cursor) - query = """SELECT jid, resource, state, + query = """SELECT node, jid, resource, state, subscription_type, subscription_depth FROM subscriptions NATURAL JOIN nodes NATURAL JOIN entities - WHERE node=%s""" - values = [self.nodeIdentifier] + WHERE node_id=%s""" + values = [self.nodeDbId] if state: query += " AND state=%s" @@ -415,16 +479,16 @@ subscriptions = [] for row in rows: - subscriber = jid.JID(u'%s/%s' % (row[0], row[1])) + subscriber = jid.JID(u'%s/%s' % (row[1], row[2])) options = {} - if row[3]: - options['pubsub#subscription_type'] = row[3]; if row[4]: - options['pubsub#subscription_depth'] = row[4]; + options['pubsub#subscription_type'] = row[4]; + if row[5]: + options['pubsub#subscription_depth'] = row[5]; - subscriptions.append(Subscription(self.nodeIdentifier, subscriber, - row[2], options)) + subscriptions.append(Subscription(row[0], subscriber, + row[3], options)) return subscriptions @@ -453,17 +517,14 @@ cursor.execute("""INSERT INTO subscriptions (node_id, entity_id, resource, state, subscription_type, subscription_depth) - SELECT node_id, entity_id, %s, %s, %s, %s FROM - (SELECT node_id FROM nodes - WHERE node=%s) as n - CROSS JOIN + SELECT %s, entity_id, %s, %s, %s, %s FROM (SELECT entity_id FROM entities - WHERE jid=%s) as e""", - (resource, + WHERE jid=%s) AS ent_id""", + (self.nodeDbId, + resource, state, subscription_type, subscription_depth, - self.nodeIdentifier, userhost)) except cursor._pool.dbapi.IntegrityError: raise error.SubscriptionExists() @@ -481,12 +542,11 @@ resource = subscriber.resource or '' cursor.execute("""DELETE FROM subscriptions WHERE - node_id=(SELECT node_id FROM nodes - WHERE node=%s) AND + node_id=%s AND entity_id=(SELECT entity_id FROM entities WHERE jid=%s) AND resource=%s""", - (self.nodeIdentifier, + (self.nodeDbId, userhost, resource)) if cursor.rowcount != 1: @@ -506,9 +566,9 @@ NATURAL JOIN subscriptions NATURAL JOIN nodes WHERE entities.jid=%s - AND node=%s AND state='subscribed'""", + AND node_id=%s AND state='subscribed'""", (entity.userhost(), - self.nodeIdentifier)) + self.nodeDbId)) return cursor.fetchone() is not None @@ -523,8 +583,8 @@ cursor.execute("""SELECT jid, affiliation FROM nodes NATURAL JOIN affiliations NATURAL JOIN entities - WHERE node=%s""", - (self.nodeIdentifier,)) + WHERE node_id=%s""", + (self.nodeDbId,)) result = cursor.fetchall() return [(jid.internJID(r[0]), r[1]) for r in result] @@ -541,36 +601,37 @@ return self.dbpool.runInteraction(self._storeItems, item_data, publisher) - def _storeItems(self, cursor, item_data, publisher): + def _storeItems(self, cursor, items_data, publisher): self._checkNodeExists(cursor) - for item_datum in item_data: - self._storeItem(cursor, item_datum, publisher) + for item_data in items_data: + self._storeItem(cursor, item_data, publisher) - def _storeItem(self, cursor, item_datum, publisher): - access_model, item_config, item = item_datum + def _storeItem(self, cursor, item_data, publisher): + item, access_model, item_config = item_data data = item.toXml() cursor.execute("""UPDATE items SET date=now(), publisher=%s, data=%s FROM nodes WHERE nodes.node_id = items.node_id AND - nodes.node = %s and items.item=%s""", + nodes.node_id = %s and items.item=%s""", (publisher.full(), data, - self.nodeIdentifier, + self.nodeDbId, item["id"])) if cursor.rowcount == 1: return cursor.execute("""INSERT INTO items (node_id, item, publisher, data, access_model) - SELECT node_id, %s, %s, %s, %s FROM nodes - WHERE node=%s + SELECT %s, %s, %s, %s, %s FROM nodes + WHERE node_id=%s RETURNING item_id""", - (item["id"], + (self.nodeDbId, + item["id"], publisher.full(), data, access_model, - self.nodeIdentifier)) + self.nodeDbId)) if access_model == const.VAL_AMODEL_ROSTER: item_id = cursor.fetchone()[0]; @@ -596,10 +657,9 @@ for itemIdentifier in itemIdentifiers: cursor.execute("""DELETE FROM items WHERE - node_id=(SELECT node_id FROM nodes - WHERE node=%s) AND + node_id=%s AND item=%s""", - (self.nodeIdentifier, + (self.nodeDbId, itemIdentifier)) if cursor.rowcount: @@ -623,40 +683,44 @@ return self.dbpool.runInteraction(self._getItems, authorized_groups, unrestricted, maxItems, ext_data) def _getItems(self, cursor, authorized_groups, unrestricted, maxItems, ext_data): + # FIXME: simplify the query construction self._checkNodeExists(cursor) if unrestricted: query = ["SELECT data,items.access_model,item_id"] source = """FROM nodes INNER JOIN items USING (node_id) - WHERE node=%s""" - args = [self.nodeIdentifier] + WHERE node_id=%s""" + args = [self.nodeDbId] else: query = ["SELECT data"] groups = " or (items.access_model='roster' and groupname in %s)" if authorized_groups else "" source = """FROM nodes INNER JOIN items USING (node_id) LEFT JOIN item_groups_authorized USING (item_id) - WHERE node=%s AND + WHERE node_id=%s AND (items.access_model='open'""" + groups + ")" - args = [self.nodeIdentifier] + args = [self.nodeDbId] if authorized_groups: args.append(authorized_groups) if 'filters' in ext_data: # MAM filters for filter_ in ext_data['filters']: if filter_.var == 'start': - source += " AND date>='{date}'".format(date=filter_.value) + source += " AND date>=%s" + args.append(filter_.value) if filter_.var == 'end': - source += " AND date<='{date}'".format(date=filter_.value) + source += " AND date<=%s" + args.append(filter_.value) if filter_.var == 'with': jid_s = filter_.value if '/' in jid_s: - source += " AND publisher='{pub}'".format(pub=filter_.value) - else: # assume the publisher field in DB is always a full JID - # XXX: need to escape the % with itself to avoid formatting error - source += " AND publisher LIKE '{pub}/%%'".format(pub=filter_.value) + source += " AND publisher=%s" + args.append(filter_.value) + else: + source += " AND publisher LIKE %s" + args.append(u"{}%".format(filter_.value)) query.append(source) order = "DESC" @@ -666,7 +730,9 @@ maxItems = rsm.max if rsm.index is not None: query.append("AND date<=(SELECT date " + source + " ORDER BY date DESC LIMIT 1 OFFSET %s)") - args.append(self.nodeIdentifier) + # FIXME: change the request so source is not used 2 times + # there is already a placeholder in source with node_id=%s, so we need to add self.noDbId in args + args.append(self.nodeDbId) if authorized_groups: args.append(authorized_groups) args.append(rsm.index) @@ -694,11 +760,10 @@ item = generic.stripNamespace(parseXml(data[0])) access_model = data[1] item_id = data[2] - if access_model == 'roster': #TODO: jid access_model + access_list = {} + if access_model == const.VAL_AMODEL_ROSTER: #TODO: jid access_model cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,)) - access_list = [r[0] for r in cursor.fetchall()] - else: - access_list = None + access_list[const.OPT_ROSTER_GROUPS_ALLOWED] = [r[0] for r in cursor.fetchall()] ret.append((item, access_model, access_list)) return ret @@ -720,18 +785,18 @@ if unrestricted: query = ["""SELECT count(item_id) FROM nodes INNER JOIN items USING (node_id) - WHERE node=%s"""] - args = [self.nodeIdentifier] + WHERE node_id=%s"""] + args = [self.nodeDbId] else: query = ["""SELECT count(item_id) FROM nodes INNER JOIN items USING (node_id) LEFT JOIN item_groups_authorized USING (item_id) - WHERE node=%s AND + WHERE node_id=%s AND (items.access_model='open' """ + ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') + ")"] - args = [self.nodeIdentifier] + args = [self.nodeDbId] if authorized_groups: args.append(authorized_groups) @@ -755,22 +820,22 @@ query = ["""SELECT row_number FROM ( SELECT row_number() OVER (ORDER BY date DESC), item FROM nodes INNER JOIN items USING (node_id) - WHERE node=%s + WHERE node_id=%s ) as x WHERE item=%s LIMIT 1"""] - args = [self.nodeIdentifier] + args = [self.nodeDbId] else: query = ["""SELECT row_number FROM ( SELECT row_number() OVER (ORDER BY date DESC), item FROM nodes INNER JOIN items USING (node_id) LEFT JOIN item_groups_authorized USING (item_id) - WHERE node=%s AND + WHERE node_id=%s AND (items.access_model='open' """ + ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') + """)) as x WHERE item=%s LIMIT 1"""] - args = [self.nodeIdentifier] + args = [self.nodeDbId] if authorized_groups: args.append(authorized_groups) @@ -784,7 +849,8 @@ @param authorized_groups: we want to get items that these groups can access @param unrestricted: if true, don't check permissions @param itemIdentifiers: list of ids of the items we want to get - @return: list of (item, access_model, access_model) if unrestricted is True, else list of items + @return: list of (item, access_model, access_list) if unrestricted is True, else list of items + access_list is managed as a dictionnary with same key as for item_config """ return self.dbpool.runInteraction(self._getItemsById, authorized_groups, unrestricted, itemIdentifiers) @@ -796,41 +862,66 @@ for itemIdentifier in itemIdentifiers: cursor.execute("""SELECT data,items.access_model,item_id FROM nodes INNER JOIN items USING (node_id) - WHERE node=%s AND item=%s""", - (self.nodeIdentifier, + WHERE node_id=%s AND item=%s""", + (self.nodeDbId, itemIdentifier)) result = cursor.fetchone() - if result: - for data in result: - item = generic.stripNamespace(parseXml(data[0])) - access_model = data[1] - item_id = data[2] - if access_model == 'roster': #TODO: jid access_model - cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,)) - access_list = [r[0] for r in cursor.fetchall()] - else: - access_list = None + if not result: + raise error.ItemNotFound() - ret.append((item, access_model, access_list)) + item = generic.stripNamespace(parseXml(result[0])) + access_model = result[1] + item_id = result[2] + access_list = {} + if access_model == const.VAL_AMODEL_ROSTER: #TODO: jid access_model + cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,)) + access_list[const.OPT_ROSTER_GROUPS_ALLOWED] = [r[0] for r in cursor.fetchall()] + + ret.append((item, access_model, access_list)) else: #we check permission before returning items for itemIdentifier in itemIdentifiers: - args = [self.nodeIdentifier, itemIdentifier] + args = [self.nodeDbId, itemIdentifier] if authorized_groups: args.append(authorized_groups) cursor.execute("""SELECT data FROM nodes INNER JOIN items USING (node_id) LEFT JOIN item_groups_authorized USING (item_id) - WHERE node=%s AND item=%s AND + WHERE node_id=%s AND item=%s AND (items.access_model='open' """ + ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') + ")", args) result = cursor.fetchone() if result: - ret.append(parseXml(result[0])) + ret.append(generic.stripNamespace(parseXml(result[0]))) return ret + + def getItemsPublishers(self, itemIdentifiers): + """Get the publishers for all given identifiers + + @return (dict): map of itemIdentifiers to publisher + """ + return self.dbpool.runInteraction(self._getItemsPublishers, itemIdentifiers) + + + def _getItemsPublishers(self, cursor, itemIdentifiers): + self._checkNodeExists(cursor) + ret = {} + for itemIdentifier in itemIdentifiers: + cursor.execute("""SELECT publisher FROM items + WHERE item=%s""", + (itemIdentifier,)) + result = cursor.fetchone() + if not result: + # We have an internal error, that's why we use ValueError + # and not error.ItemNotFound() + raise ValueError() # itemIdentifier must exists + ret[itemIdentifier] = jid.JID(result[0]) + return ret + + def purge(self): return self.dbpool.runInteraction(self._purge) @@ -839,23 +930,23 @@ self._checkNodeExists(cursor) cursor.execute("""DELETE FROM items WHERE - node_id=(SELECT node_id FROM nodes WHERE node=%s)""", - (self.nodeIdentifier,)) + node_id=%s""", + (self.nodeDbId,)) - - def filterItemsWithPublisher(self, itemIdentifiers, requestor): - return self.dbpool.runInteraction(self._filterItemsWithPublisher, itemIdentifiers, requestor) + # FIXME: to be checked + # def filterItemsWithPublisher(self, itemIdentifiers, recipient): + # return self.dbpool.runInteraction(self._filterItemsWithPublisher, itemIdentifiers, recipient) - def _filterItemsWithPublisher(self, cursor, itemIdentifiers, requestor): - self._checkNodeExists(cursor) - ret = [] - for itemIdentifier in itemIdentifiers: - args = ["%s/%%" % requestor.userhost(), itemIdentifier] - cursor.execute("""SELECT item FROM items WHERE publisher LIKE %s AND item=%s""", args) - result = cursor.fetchone() - if result: - ret.append(result[0]) - return ret + # def _filterItemsWithPublisher(self, cursor, itemIdentifiers, requestor): + # self._checkNodeExists(cursor) + # ret = [] + # for itemIdentifier in itemIdentifiers: + # args = ["%s/%%" % requestor.userhost(), itemIdentifier] + # cursor.execute("""SELECT item FROM items WHERE publisher LIKE %s AND item=%s""", args) + # result = cursor.fetchone() + # if result: + # ret.append(result[0]) + # return ret class CollectionNode(Node):