# HG changeset patch # User Ralph Meijer # Date 1100011892 0 # Node ID ec354aab39499691df3f84f1bbe264ae266be7aa # Parent c40ccc30dfb7b45af09ed866d92f11b7d5be3b4c Implemented ItemRetrievalService. diff -r c40ccc30dfb7 -r ec354aab3949 idavoll/backend.py --- a/idavoll/backend.py Tue Nov 09 14:50:18 2004 +0000 +++ b/idavoll/backend.py Tue Nov 09 14:51:32 2004 +0000 @@ -121,8 +121,7 @@ class IItemRetrievalService(components.Interface): """ A service for retrieving previously published items. """ - def get_items(self, node_id, max_items=None, item_ids=[], - requestor=None): + def get_items(self, node_id, requestor, max_items=None, item_ids=[]): """ Retrieve items from persistent storage If C{max_items} is given, return the C{max_items} last published @@ -318,3 +317,28 @@ 'subscription': subscription} return new_affiliations.values() + +class ItemRetrievalService(service.Service): + + __implements__ = IItemRetrievalService + + def get_items(self, node_id, requestor, max_items=None, item_ids=[]): + d = self.parent.storage.is_subscribed(node_id, requestor) + d.addCallback(self._do_get_items, node_id, max_items, item_ids) + return d + + def _do_get_items(self, result, node_id, max_items, item_ids): + def q(r): + print r + return r + + if not result: + raise NotAuthorized + + if item_ids: + d = self.parent.storage.get_items_by_ids(node_id, item_ids) + d.addCallback(q) + d.addErrback(q) + return d + else: + return self.parent.storage.get_items(node_id, max_items) diff -r c40ccc30dfb7 -r ec354aab3949 idavoll/pgsql_backend.py --- a/idavoll/pgsql_backend.py Tue Nov 09 14:50:18 2004 +0000 +++ b/idavoll/pgsql_backend.py Tue Nov 09 14:51:32 2004 +0000 @@ -227,6 +227,68 @@ d.addCallback(lambda results: [r[0] for r in results]) return d + def is_subscribed(self, node_id, subscriber): + return self.dbpool.runInteraction(self._is_subscribed, node_id, + subscriber) + + def _is_subscribed(self, cursor, node_id, subscriber): + self._check_node_exists(cursor, node_id) + + userhost = subscriber.userhost() + resource = subscriber.resource or '' + + cursor.execute("""SELECT 1 FROM entities + JOIN subscriptions ON + (entities.id=subscriptions.entity_id) + JOIN nodes ON + (nodes.id=subscriptions.node_id) + WHERE entities.jid=%s AND resource=%s + AND node=%s""", + (userhost.encode('utf8'), + resource.encode('utf8'), + node_id.encode('utf8'))) + + return cursor.fetchone() is not None + + def get_items_by_ids(self, node_id, item_ids): + return self.dbpool.runInteraction(self._get_items_by_ids, node_id, + item_ids) + + def _get_items_by_ids(self, cursor, node_id, item_ids): + self._check_node_exists(cursor, node_id) + items = [] + for item_id in item_ids: + cursor.execute("""SELECT data FROM nodes JOIN items ON + (nodes.id=items.node_id) + WHERE node=%s AND item=%s""", + (node_id.encode('utf8'), + item_id.encode('utf8'))) + result = cursor.fetchone() + if result: + items.append(result[0]) + return items + + def get_items(self, node_id, max_items=None): + return self.dbpool.runInteraction(self._get_items, node_id, max_items) + + def _get_items(self, cursor, node_id, max_items): + self._check_node_exists(cursor, node_id) + query = """SELECT data FROM nodes JOIN items ON + (nodes.id=items.node_id) + WHERE node=%s ORDER BY date DESC""" + try: + if max_items: + cursor.execute(query + " LIMIT %s", + (node_id.encode('utf8'), + max_items)) + else: + cursor.execute(query, (node_id.encode('utf8'))) + except Exception, e: + print e + + result = cursor.fetchall() + return [r[0] for r in result] + class BackendService(backend.BackendService): """ PostgreSQL backend Service for a JEP-0060 pubsub service """ @@ -244,3 +306,6 @@ class AffiliationsService(backend.AffiliationsService): pass + +class ItemRetrievalService(backend.ItemRetrievalService): + pass