# HG changeset patch # User Ralph Meijer # Date 1217857630 0 # Node ID 274a45d2a5ab51c1ed4aa7e3df40668e15721d21 # Parent e6b710bf2b24c1fd197cd3c5bd8bb93b2738a485 Implement root collection that includes all leaf nodes. diff -r e6b710bf2b24 -r 274a45d2a5ab db/gateway.sql --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/db/gateway.sql Mon Aug 04 13:47:10 2008 +0000 @@ -0,0 +1,7 @@ +create table callbacks ( + service text not null, + node text not null, + uri text not null, + PRIMARY KEY (service, node, uri) +); + diff -r e6b710bf2b24 -r 274a45d2a5ab db/pubsub.sql --- a/db/pubsub.sql Mon Aug 04 07:10:45 2008 +0000 +++ b/db/pubsub.sql Mon Aug 04 13:47:10 2008 +0000 @@ -1,42 +1,49 @@ -create table entities ( - id serial primary key, - jid text not null unique +CREATE TABLE entities ( + entity_id serial PRIMARY KEY, + jid text NOT NULL UNIQUE ); -create table nodes ( - id serial primary key, - node text not null unique, - persistent boolean not null default true, - deliver_payload boolean not null default true, - send_last_published_item text not null default 'on_sub' - check (send_last_published_item in ('never', 'on_sub')) +CREATE TABLE nodes ( + node_id serial PRIMARY KEY, + node text NOT NULL UNIQUE, + node_type text NOT NULL DEFAULT 'leaf' + CHECK (node_type IN ('leaf', 'collection')), + persist_items boolean, + deliver_payloads boolean NOT NULL DEFAULT TRUE, + send_last_published_item text NOT NULL DEFAULT 'on_sub' + CHECK (send_last_published_item IN ('never', 'on_sub')) ); -create table affiliations ( - id serial primary key, - entity_id integer not null references entities on delete cascade, - node_id integer not null references nodes on delete cascade, - affiliation text not null - check (affiliation in ('outcast', 'publisher', 'owner')), - unique (entity_id, node_id) +INSERT INTO nodes (node, node_type) values ('', 'collection'); + +CREATE TABLE affiliations ( + affiliation_id serial PRIMARY KEY, + entity_id integer NOT NULL REFERENCES entities ON DELETE CASCADE, + node_id integer NOT NULL references nodes ON DELETE CASCADE, + affiliation text NOT NULL + CHECK (affiliation IN ('outcast', 'publisher', 'owner')), + UNIQUE (entity_id, node_id) ); -create table subscriptions ( - id serial primary key, - entity_id integer not null references entities on delete cascade, +CREATE TABLE subscriptions ( + subscription_id serial PRIMARY KEY, + entity_id integer NOT NULL REFERENCES entities ON DELETE CASCADE, resource text, - node_id integer not null references nodes on delete cascade, - subscription text not null default 'subscribed' check - (subscription in ('subscribed', 'pending', 'unconfigured')), - unique (entity_id, resource, node_id) -); + node_id integer NOT NULL REFERENCES nodes ON delete CASCADE, + state text NOT NULL DEFAULT 'subscribed' + CHECK (state IN ('subscribed', 'pending', 'unconfigured')), + subscription_type text + CHECK (subscription_type IN (NULL, 'items', 'nodes')), + subscription_depth text + CHECK (subscription_depth IN (NULL, '1', 'all')), + UNIQUE (entity_id, resource, node_id)); -create table items ( - id serial primary key, - node_id integer not null references nodes on delete cascade, - item text not null, - publisher text not null, +CREATE TABLE items ( + item_id serial PRIMARY KEY, + node_id integer NOT NULL REFERENCES nodes ON DELETE CASCADE, + item text NOT NULL, + publisher text NOT NULL, data text, - date timestamp with time zone not null default now(), - unique (node_id, item) + date timestamp with time zone NOT NULL DEFAULT now(), + UNIQUE (node_id, item) ); diff -r e6b710bf2b24 -r 274a45d2a5ab db/to_idavoll_0.8.sql --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/db/to_idavoll_0.8.sql Mon Aug 04 13:47:10 2008 +0000 @@ -0,0 +1,25 @@ +ALTER TABLE affiliations RENAME id TO affiliation_id; + +ALTER TABLE entities RENAME id TO entity_id; + +ALTER TABLE items RENAME id TO item_id; + +ALTER TABLE nodes RENAME id TO node_id; +ALTER TABLE nodes RENAME persistent to persist_items; +ALTER TABLE nodes RENAME deliver_payload to deliver_payloads; +ALTER TABLE nodes ADD COLUMN node_type text; +ALTER TABLE nodes ALTER COLUMN node_type SET DEFAULT 'leaf'; +UPDATE nodes SET node_type = 'leaf'; +ALTER TABLE nodes ALTER COLUMN node_type SET NOT NULL; +ALTER TABLE nodes ADD CHECK (node_type IN ('leaf', 'collection')); +ALTER TABLE nodes ALTER COLUMN persistent DROP NOT NULL; +ALTER TABLE nodes ALTER COLUMN persistent DROP DEFAULT; + +ALTER TABLE subscriptions RENAME id TO subscription_id; +ALTER TABLE subscriptions RENAME subscription TO state; +ALTER TABLE subscriptions ADD COLUMN subscription_type text + CHECK (subscription_type IN (NULL, 'items', 'nodes')); +ALTER TABLE subscriptions ADD COLUMN subscription_depth text + CHECK (subscription_depth IN (NULL, '1', 'all')); + +INSERT INTO nodes (node, node_type) values ('', 'collection'); diff -r e6b710bf2b24 -r 274a45d2a5ab idavoll/backend.py --- a/idavoll/backend.py Mon Aug 04 07:10:45 2008 +0000 +++ b/idavoll/backend.py Mon Aug 04 13:47:10 2008 +0000 @@ -18,7 +18,7 @@ from zope.interface import implements from twisted.application import service -from twisted.python import components +from twisted.python import components, log from twisted.internet import defer, reactor from twisted.words.protocols.jabber.error import StanzaError from twisted.words.xish import domish, utility @@ -27,7 +27,7 @@ from wokkel.pubsub import PubSubService, PubSubError from idavoll import error, iidavoll -from idavoll.iidavoll import IBackendService +from idavoll.iidavoll import IBackendService, ILeafNode def _getAffiliation(node, entity): d = node.getAffiliation(entity) @@ -40,35 +40,45 @@ """ Generic publish-subscribe backend service. - @cvar options: Node configuration form as a mapping from the field - name to a dictionary that holds the field's type, - label and possible options to choose from. - @type options: C{dict}. + @cvar nodeOptions: Node configuration form as a mapping from the field + name to a dictionary that holds the field's type, label + and possible options to choose from. + @type nodeOptions: C{dict}. @cvar defaultConfig: The default node configuration. """ implements(iidavoll.IBackendService) - options = {"pubsub#persist_items": - {"type": "boolean", - "label": "Persist items to storage"}, - "pubsub#deliver_payloads": - {"type": "boolean", - "label": "Deliver payloads with event notifications"}, - "pubsub#send_last_published_item": - {"type": "list-single", - "label": "When to send the last published item", - "options": { - "never": "Never", - "on_sub": "When a new subscription is processed", - } - }, - } + nodeOptions = { + "pubsub#persist_items": + {"type": "boolean", + "label": "Persist items to storage"}, + "pubsub#deliver_payloads": + {"type": "boolean", + "label": "Deliver payloads with event notifications"}, + "pubsub#send_last_published_item": + {"type": "list-single", + "label": "When to send the last published item", + "options": { + "never": "Never", + "on_sub": "When a new subscription is processed"} + }, + } - defaultConfig = {"pubsub#persist_items": True, - "pubsub#deliver_payloads": True, - "pubsub#send_last_published_item": 'on_sub', - } + subscriptionOptions = { + "pubsub#subscription_type": + {"type": "list-single", + "options": { + "items": "Receive notification of new items only", + "nodes": "Receive notification of new nodes only"} + }, + "pubsub#subscription_depth": + {"type": "list-single", + "options": { + "1": "Receive notification from direct child nodes only", + "all": "Receive notification from all descendent nodes"} + }, + } def __init__(self, storage): utility.EventDispatcher.__init__(self) @@ -108,9 +118,9 @@ def _makeMetaData(self, metaData): options = [] for key, value in metaData.iteritems(): - if self.options.has_key(key): + if key in self.nodeOptions: option = {"var": key} - option.update(self.options[key]) + option.update(self.nodeOptions[key]) option["value"] = value options.append(option) @@ -136,6 +146,9 @@ def _doPublish(self, node, items, requestor): + if node.nodeType == 'collection': + raise error.NoPublishing() + configuration = node.getConfiguration() persistItems = configuration["pubsub#persist_items"] deliverPayloads = configuration["pubsub#deliver_payloads"] @@ -147,6 +160,8 @@ if persistItems or deliverPayloads: for item in items: + item.uri = None + item.defaultUri = None if not item.getAttribute("id"): item["id"] = str(uuid.uuid4()) @@ -169,18 +184,36 @@ '//event/pubsub/notify') - def getNotificationList(self, nodeIdentifier, items): - d = self.storage.getNode(nodeIdentifier) - d.addCallback(lambda node: node.getSubscribers()) - d.addCallback(self._magicFilter, nodeIdentifier, items) - return d + def getNotifications(self, nodeIdentifier, items): + + def toNotifications(subscriptions, nodeIdentifier, items): + subsBySubscriber = {} + for subscription in subscriptions: + if subscription.options.get('pubsub#subscription_type', + 'items') == 'items': + subs = subsBySubscriber.setdefault(subscription.subscriber, + set()) + subs.add(subscription) + + notifications = [(subscriber, subscriptions, items) + for subscriber, subscriptions + in subsBySubscriber.iteritems()] + return notifications - def _magicFilter(self, subscribers, nodeIdentifier, items): - list = [] - for subscriber in subscribers: - list.append((subscriber, items)) - return list + def rootNotFound(failure): + failure.trap(error.NodeNotFound) + return [] + + d1 = self.storage.getNode(nodeIdentifier) + d1.addCallback(lambda node: node.getSubscriptions('subscribed')) + d2 = self.storage.getNode('') + d2.addCallback(lambda node: node.getSubscriptions('subscribed')) + d2.addErrback(rootNotFound) + d = defer.gatherResults([d1, d2]) + d.addCallback(lambda result: result[0] + result[1]) + d.addCallback(toNotifications, nodeIdentifier, items) + return d def registerNotifier(self, observerfn, *args, **kwargs): @@ -204,41 +237,42 @@ if affiliation == 'outcast': raise error.Forbidden() - d = node.addSubscription(subscriber, 'subscribed') - d.addCallback(lambda _: self._sendLastPublished(node, subscriber)) - d.addCallback(lambda _: 'subscribed') - d.addErrback(self._getSubscription, node, subscriber) - d.addCallback(self._returnSubscription, node.nodeIdentifier) + def trapExists(failure): + failure.trap(error.SubscriptionExists) + return False + + def cb(sendLast): + d = node.getSubscription(subscriber) + if sendLast: + d.addCallback(self._sendLastPublished, node) + return d + + d = node.addSubscription(subscriber, 'subscribed', {}) + d.addCallbacks(lambda _: True, trapExists) + d.addCallback(cb) return d - def _getSubscription(self, failure, node, subscriber): - failure.trap(error.SubscriptionExists) - return node.getSubscription(subscriber) - - - def _returnSubscription(self, result, nodeIdentifier): - return nodeIdentifier, result - - - def _sendLastPublished(self, node, subscriber): + def _sendLastPublished(self, subscription, node): def notifyItem(items): - if not items: - return - - reactor.callLater(0, self.dispatch, - {'items': items, - 'nodeIdentifier': node.nodeIdentifier, - 'subscriber': subscriber}, - '//event/pubsub/notify') + if items: + reactor.callLater(0, self.dispatch, + {'items': items, + 'nodeIdentifier': node.nodeIdentifier, + 'subscription': subscription}, + '//event/pubsub/notify') config = node.getConfiguration() - if config.get("pubsub#send_last_published_item", 'never') != 'on_sub': - return + sendLastPublished = config.get('pubsub#send_last_published_item', + 'never') + if sendLastPublished == 'on_sub' and node.nodeType == 'leaf': + entity = subscription.subscriber.userhostJID() + d = self.getItems(node.nodeIdentifier, entity, 1) + d.addCallback(notifyItem) + d.addErrback(log.err) - d = self.getItems(node.nodeIdentifier, subscriber.userhostJID(), 1) - d.addCallback(notifyItem) + return subscription def unsubscribe(self, nodeIdentifier, subscriber, requestor): @@ -261,13 +295,18 @@ def createNode(self, nodeIdentifier, requestor): if not nodeIdentifier: nodeIdentifier = 'generic/%s' % uuid.uuid4() - d = self.storage.createNode(nodeIdentifier, requestor) + + nodeType = 'leaf' + config = self.storage.getDefaultConfiguration(nodeType) + config['pubsub#node_type'] = nodeType + + d = self.storage.createNode(nodeIdentifier, requestor, config) d.addCallback(lambda _: nodeIdentifier) return d - def getDefaultConfiguration(self): - d = defer.succeed(self.defaultConfig) + def getDefaultConfiguration(self, nodeType): + d = defer.succeed(self.storage.getDefaultConfiguration(nodeType)) return d @@ -277,6 +316,7 @@ d = self.storage.getNode(nodeIdentifier) d.addCallback(lambda node: node.getConfiguration()) + return d @@ -314,6 +354,9 @@ def _doGetItems(self, result, maxItems, itemIdentifiers): node, affiliation = result + if not ILeafNode.providedBy(node): + return [] + if affiliation == 'outcast': raise error.Forbidden() @@ -382,8 +425,12 @@ def getSubscribers(self, nodeIdentifier): + def cb(subscriptions): + return [subscription.subscriber for subscription in subscriptions] + d = self.storage.getNode(nodeIdentifier) - d.addCallback(lambda node: node.getSubscribers()) + d.addCallback(lambda node: node.getSubscriptions('subscribed')) + d.addCallback(cb) return d @@ -447,6 +494,12 @@ 'unsupported', 'persistent-node'), error.NoRootNode: ('bad-request', None, None), + error.NoCollections: ('feature-not-implemented', + 'unsupported', + 'collections'), + error.NoPublishing: ('feature-not-implemented', + 'unsupported', + 'publish'), } def __init__(self, backend): @@ -497,10 +550,12 @@ def _notify(self, data): items = data['items'] nodeIdentifier = data['nodeIdentifier'] - if 'subscriber' not in data: - d = self.backend.getNotificationList(nodeIdentifier, items) + if 'subscription' not in data: + d = self.backend.getNotifications(nodeIdentifier, items) else: - d = defer.succeed([(data['subscriber'], items)]) + subscription = data['subscription'] + d = defer.succeed([(subscription.subscriber, [subscription], + items)]) d.addCallback(lambda notifications: self.notifyPublish(self.serviceJID, nodeIdentifier, notifications)) @@ -539,11 +594,16 @@ info['meta-data'] = result return info + def trapNotFound(failure): + failure.trap(error.NodeNotFound) + return info + d = defer.succeed(nodeIdentifier) d.addCallback(self.backend.getNodeType) d.addCallback(saveType) d.addCallback(self.backend.getNodeMetaData) d.addCallback(saveMetaData) + d.addErrback(trapNotFound) d.addErrback(self._mapErrors) return d @@ -586,11 +646,11 @@ def getConfigurationOptions(self): - return self.backend.options + return self.backend.nodeOptions - def getDefaultConfiguration(self, requestor, service): - d = self.backend.getDefaultConfiguration() + def getDefaultConfiguration(self, requestor, service, nodeType): + d = self.backend.getDefaultConfiguration(nodeType) return d.addErrback(self._mapErrors) diff -r e6b710bf2b24 -r 274a45d2a5ab idavoll/error.py --- a/idavoll/error.py Mon Aug 04 07:10:45 2008 +0000 +++ b/idavoll/error.py Mon Aug 04 13:47:10 2008 +0000 @@ -8,10 +8,12 @@ return self.msg + class NodeNotFound(Error): pass + class NodeExists(Error): pass @@ -35,30 +37,37 @@ pass + class ItemForbidden(Error): pass + class ItemRequired(Error): pass + class NoInstantNodes(Error): pass + class InvalidConfigurationOption(Error): msg = 'Invalid configuration option' + class InvalidConfigurationValue(Error): msg = 'Bad configuration value' + class NodeNotPersistent(Error): pass + class NoRootNode(Error): pass @@ -68,3 +77,15 @@ """ There are no callbacks for this node. """ + + + +class NoCollections(Error): + pass + + + +class NoPublishing(Error): + """ + This node does not support publishing. + """ diff -r e6b710bf2b24 -r 274a45d2a5ab idavoll/gateway.py --- a/idavoll/gateway.py Mon Aug 04 07:10:45 2008 +0000 +++ b/idavoll/gateway.py Mon Aug 04 13:47:10 2008 +0000 @@ -66,7 +66,7 @@ raise XMPPURIParseError("Empty URI path component") try: - jid = JID(entity) + service = JID(entity) except Exception, e: raise XMPPURIParseError("Invalid JID: %s" % e.message) @@ -75,9 +75,17 @@ try: nodeIdentifier = params['node'][0] except (KeyError, ValueError): - raise XMPPURIParseError("No node in query component of URI") + nodeIdentifier = '' + + return service, nodeIdentifier + + - return jid, nodeIdentifier +def getXMPPURI(service, nodeIdentifier): + """ + Construct an XMPP URI from a service JID and node identifier. + """ + return "xmpp:%s?;node=%s" % (service.full(), nodeIdentifier or '') @@ -134,10 +142,11 @@ """ def toResponse(nodeIdentifier): - uri = 'xmpp:%s?;node=%s' % (self.serviceJID.full(), nodeIdentifier) + uri = getXMPPURI(self.serviceJID, nodeIdentifier) stream = simplejson.dumps({'uri': uri}) - return http.Response(responsecode.OK, stream=stream) - + contentType = http_headers.MimeType.fromString(MIME_JSON) + return http.Response(responsecode.OK, stream=stream, + headers={'Content-Type': contentType}) d = self.backend.createNode(None, self.owner) d.addCallback(toResponse) return d @@ -237,9 +246,11 @@ """ def toResponse(nodeIdentifier): - uri = 'xmpp:%s?;node=%s' % (self.serviceJID.full(), nodeIdentifier) + uri = getXMPPURI(self.serviceJID, nodeIdentifier) stream = simplejson.dumps({'uri': uri}) - return http.Response(responsecode.OK, stream=stream) + contentType = http_headers.MimeType.fromString(MIME_JSON) + return http.Response(responsecode.OK, stream=stream, + headers={'Content-Type': contentType}) def gotNode(nodeIdentifier, payload): item = Item(id='current', payload=payload) @@ -286,8 +297,10 @@ def render(self, request): def responseFromNodes(nodeIdentifiers): - return http.Response(responsecode.OK, - stream=simplejson.dumps(nodeIdentifiers)) + stream = simplejson.dumps(nodeIdentifiers) + contentType = http_headers.MimeType.fromString(MIME_JSON) + return http.Response(responsecode.OK, stream=stream, + headers={'Content-Type': contentType}) d = self.service.getNodes() d.addCallback(responseFromNodes) @@ -328,7 +341,7 @@ def constructFeed(service, nodeIdentifier, entries, title): - nodeURI = 'xmpp:%s?;node=%s' % (service.full(), nodeIdentifier) + nodeURI = getXMPPURI(service, nodeIdentifier) now = strftime("%Y-%m-%dT%H:%M:%SZ", gmtime()) # Collect the received entries in a feed @@ -428,6 +441,9 @@ """ atomEntries = extractAtomEntries(event.items) + service = event.sender + nodeIdentifier = event.nodeIdentifier + headers = event.headers # Don't notify if there are no atom entries if not atomEntries: @@ -438,12 +454,16 @@ payload = atomEntries[0] else: contentType = 'application/atom+xml;type=feed' - payload = constructFeed(event.sender, event.nodeIdentifier, - atomEntries, + payload = constructFeed(service, nodeIdentifier, atomEntries, title='Received item collection') - self.callCallbacks(event.sender, event.nodeIdentifier, payload, - contentType) + self.callCallbacks(service, nodeIdentifier, payload, contentType) + + if 'Collection' in headers: + for collection in headers['Collection']: + nodeIdentifier = collection or '' + self.callCallbacks(service, nodeIdentifier, payload, + contentType) def deleteReceived(self, event): @@ -451,8 +471,9 @@ Fire up HTTP client to do callback """ - self.callCallbacks(event.sender, event.nodeIdentifier, - eventType='DELETED') + service = event.sender + nodeIdentifier = event.nodeIdentifier + self.callCallbacks(service, nodeIdentifier, eventType='DELETED') def _postTo(self, callbacks, service, nodeIdentifier, @@ -462,7 +483,7 @@ return postdata = None - nodeURI = 'xmpp:%s?;node=%s' % (service.full(), nodeIdentifier) + nodeURI = getXMPPURI(service, nodeIdentifier) headers = {'Referer': nodeURI.encode('utf-8'), 'PubSub-Service': service.full().encode('utf-8')} @@ -491,9 +512,7 @@ def eb(failure): failure.trap(error.NoCallbacks) - # No callbacks were registered for this node. Unsubscribe. - d = self.unsubscribe(service, nodeIdentifier, self.jid) - return d + # No callbacks were registered for this node. Unsubscribe? d = self.storage.getCallbacks(service, nodeIdentifier) d.addCallback(self._postTo, service, nodeIdentifier, payload, @@ -524,6 +543,8 @@ (responsecode.FORBIDDEN, "Node not found"), error.NotSubscribed: (responsecode.FORBIDDEN, "No such subscription found"), + error.SubscriptionExists: + (responsecode.FORBIDDEN, "Subscription already exists"), } def __init__(self, service): @@ -734,6 +755,13 @@ pass + def ping(self): + f = getPageWithFactory(self._makeURI(''), + method='HEAD', + agent=self.agent) + return f.deferred + + def create(self): f = getPageWithFactory(self._makeURI('create'), method='POST', diff -r e6b710bf2b24 -r 274a45d2a5ab idavoll/iidavoll.py --- a/idavoll/iidavoll.py Mon Aug 04 07:10:45 2008 +0000 +++ b/idavoll/iidavoll.py Mon Aug 04 13:47:10 2008 +0000 @@ -13,7 +13,7 @@ def __init__(storage): """ - @param storage: L{storage} object. + @param storage: Object providing L{IStorage}. """ @@ -164,9 +164,38 @@ """ Register callback which is called for notification. """ - def getNotificationList(nodeIdentifier, items): + def getNotifications(nodeIdentifier, items): """ - Get list of entities to notify. + Get notification list. + + This method is called to discover which entities should receive + notifications for the given items that have just been published to the + given node. + + The notification list contains tuples (subscriber, subscriptions, + items) to result in one notification per tuple: the given subscriptions + yielded the given items to be notified to this subscriber. This + structure is needed allow for letting the subscriber know which + subscriptions yielded which notifications, while catering for + collection nodes and content-based subscriptions. + + To minimize the amount of notifications per entity, implementers + should take care that if all items in C{items} were yielded + by the same set of subscriptions, exactly one tuple is for this + subscriber is returned, so that the subscriber would get exactly one + notification. Alternatively, one tuple per subscription combination. + + @param nodeIdentifier: The identifier of the node the items were + published to. + @type nodeIdentifier: C{unicode}. + @param items: The list of published items as + L{Element}s. + @type items: C{list} + @return: The notification list as tuples of + (L{JID}, + C{list} of L{Subscription}, + C{list} of L{Element}. + @rtype: C{list} """ @@ -197,8 +226,8 @@ Get Node. @param nodeIdentifier: NodeID of the desired node. - @type nodeIdentifier: L{str} - @return: deferred that returns a L{Node} object. + @type nodeIdentifier: C{str} + @return: deferred that returns a L{INode} providing object. """ @@ -206,22 +235,26 @@ """ Return all NodeIDs. - @return: deferred that returns a list of NodeIDs (L{str}). + @return: deferred that returns a list of NodeIDs (C{unicode}). """ - def createNode(nodeIdentifier, owner, config=None): + def createNode(nodeIdentifier, owner, config): """ Create new node. The implementation should make sure, the passed owner JID is stripped - of the resource (e.g. using C{owner.userhostJID()}). + of the resource (e.g. using C{owner.userhostJID()}). The passed config + is expected to have values for the fields returned by + L{getDefaultConfiguration}, as well as a value for + C{'pubsub#node_type'}. @param nodeIdentifier: NodeID of the new node. - @type nodeIdentifier: L{str} + @type nodeIdentifier: C{unicode} @param owner: JID of the new nodes's owner. - @type owner: L{jid.JID} - @param config: Configuration + @type owner: L{JID} + @param config: Node configuration. + @type config: C{dict} @return: deferred that fires on creation. """ @@ -231,7 +264,7 @@ Delete a node. @param nodeIdentifier: NodeID of the new node. - @type nodeIdentifier: L{str} + @type nodeIdentifier: C{unicode} @return: deferred that fires on deletion. """ @@ -244,11 +277,11 @@ of the resource (e.g. using C{owner.userhostJID()}). @param entity: JID of the entity. - @type entity: L{jid.JID} - @return: deferred that returns a L{list} of tuples of the form + @type entity: L{JID} + @return: deferred that returns a C{list} of tuples of the form C{(nodeIdentifier, affiliation)}, where C{nodeIdentifier} is - of the type L{str} and C{affiliation} is one of C{'owner'}, - C{'publisher'} and C{'outcast'}. + of the type L{unicode} and C{affiliation} is one of + C{'owner'}, C{'publisher'} and C{'outcast'}. """ @@ -260,12 +293,26 @@ of the resource (e.g. using C{owner.userhostJID()}). @param entity: JID of the entity. - @type entity: L{jid.JID} - @return: deferred that returns a L{list} of tuples of the form + @type entity: L{JID} + @return: deferred that returns a C{list} of tuples of the form C{(nodeIdentifier, subscriber, state)}, where - C{nodeIdentifier} is of the type L{str}, C{subscriber} of the - type {jid.JID}, and C{state} is C{'subscribed'} or - C{'pending'}. + C{nodeIdentifier} is of the type C{unicode}, C{subscriber} of + the type J{JID}, and + C{state} is C{'subscribed'}, C{'pending'} or + C{'unconfigured'}. + """ + + + def getDefaultConfiguration(nodeType): + """ + Get the default configuration for the given node type. + + @param nodeType: Either C{'leaf'} or C{'collection'}. + @type nodeType: C{str} + @return: The default configuration. + @rtype: C{dict}. + @raises: L{idavoll.error.NoCollections} if collections are not + supported. """ @@ -295,7 +342,7 @@ The configuration must at least have two options: C{pubsub#persist_items}, and C{pubsub#deliver_payloads}. - @return: L{dict} of configuration options. + @return: C{dict} of configuration options. """ @@ -306,7 +353,7 @@ The meta data must be a superset of the configuration options, and also at least should have a C{pubsub#node_type} entry. - @return: L{dict} of meta data. + @return: C{dict} of meta data. """ @@ -328,7 +375,7 @@ Get affiliation of entity with this node. @param entity: JID of entity. - @type entity: L{jid.JID} + @type entity: L{JID} @return: deferred that returns C{'owner'}, C{'publisher'}, C{'outcast'} or C{None}. """ @@ -339,20 +386,37 @@ Get subscription to this node of subscriber. @param subscriber: JID of the new subscriptions' entity. - @type subscriber: L{jid.JID} + @type subscriber: L{JID} @return: deferred that returns the subscription state (C{'subscribed'}, C{'pending'} or C{None}). """ - def addSubscription(subscriber, state): + def getSubscriptions(state=None): + """ + Get list of subscriptions to this node. + + The optional C{state} argument filters the subscriptions to their + state. + + @param state: Subscription state filter. One of C{'subscribed'}, + C{'pending'}, C{'unconfigured'}. + @type state: C{str} + @return: a deferred that returns a C{list} of + L{wokkel.pubsub.Subscription}s. + """ + + + def addSubscription(subscriber, state, config): """ Add new subscription to this node with given state. @param subscriber: JID of the new subscriptions' entity. - @type subscriber: L{jid.JID} + @type subscriber: L{JID} @param state: C{'subscribed'} or C{'pending'} - @type state: L{str} + @type state: C{str} + @param config: Subscription configuration. + @param config: C{dict} @return: deferred that fires on subscription. """ @@ -362,22 +426,11 @@ Remove subscription to this node. @param subscriber: JID of the subscriptions' entity. - @type subscriber: L{jid.JID} + @type subscriber: L{JID} @return: deferred that fires on removal. """ - def getSubscribers(): - """ - Get list of subscribers to this node. - - Retrieves the list of entities that have a subscription to this - node. That is, having the state C{'subscribed'}. - - @return: a deferred that returns a L{list} of L{jid.JID}s. - """ - - def isSubscribed(entity): """ Returns whether entity has any subscription to this node. @@ -386,8 +439,8 @@ C{'subscribed'} for any subscription that matches the bare JID. @param subscriber: bare JID of the subscriptions' entity. - @type subscriber: L{jid.JID} - @return: deferred that returns a L{bool}. + @type subscriber: L{JID} + @return: deferred that returns a C{bool}. """ @@ -395,8 +448,9 @@ """ Get affiliations of entities with this node. - @return: deferred that returns a L{list} of tuples (jid, affiliation), - where jid is a L(jid.JID) and affiliation is one of C{'owner'}, + @return: deferred that returns a C{list} of tuples (jid, affiliation), + where jid is a L(JID) + and affiliation is one of C{'owner'}, C{'publisher'}, C{'outcast'}. """ @@ -415,9 +469,9 @@ L{domish} representation of the XML fragment as defined for C{} in the C{http://jabber.org/protocol/pubsub} namespace. - @type items: L{list} of {domish.Element} + @type items: C{list} of {domish.Element} @param publisher: JID of the publishing entity. - @type publisher: L{jid.JID} + @type publisher: L{JID} @return: deferred that fires upon success. """ @@ -426,8 +480,8 @@ """ Remove items by id. - @param itemIdentifiers: L{list} of item ids. - @return: deferred that fires with a L{list} of ids of the items that + @param itemIdentifiers: C{list} of item ids. + @return: deferred that fires with a C{list} of ids of the items that were deleted """ @@ -443,7 +497,7 @@ @param maxItems: if given, a natural number (>0) that limits the returned number of items. - @return: deferred that fires with a L{list} of found items. + @return: deferred that fires with a C{list} of found items. """ @@ -455,8 +509,8 @@ represent the XML of the item as it was published, including the item wrapper with item id. - @param itemIdentifiers: L{list} of item ids. - @return: deferred that fires with a L{list} of found items. + @param itemIdentifiers: C{list} of item ids. + @return: deferred that fires with a C{list} of found items. """ diff -r e6b710bf2b24 -r 274a45d2a5ab idavoll/memory_storage.py --- a/idavoll/memory_storage.py Mon Aug 04 07:10:45 2008 +0000 +++ b/idavoll/memory_storage.py Mon Aug 04 13:47:10 2008 +0000 @@ -6,19 +6,30 @@ from twisted.internet import defer from twisted.words.protocols.jabber import jid -from idavoll import error, iidavoll +from wokkel.pubsub import Subscription -defaultConfig = {"pubsub#persist_items": True, - "pubsub#deliver_payloads": True, - "pubsub#send_last_published_item": 'on_sub', - "pubsub#node_type": "leaf"} +from idavoll import error, iidavoll class Storage: implements(iidavoll.IStorage) + defaultConfig = { + 'leaf': { + "pubsub#persist_items": True, + "pubsub#deliver_payloads": True, + "pubsub#send_last_published_item": 'on_sub', + }, + 'collection': { + "pubsub#deliver_payloads": True, + "pubsub#send_last_published_item": 'on_sub', + } + } + def __init__(self): - self._nodes = {} + rootNode = CollectionNode('', jid.JID('localhost'), + copy.copy(self.defaultConfig['collection'])) + self._nodes = {'': rootNode} def getNode(self, nodeIdentifier): @@ -34,15 +45,12 @@ return defer.succeed(self._nodes.keys()) - def createNode(self, nodeIdentifier, owner, config=None): + def createNode(self, nodeIdentifier, owner, config): if nodeIdentifier in self._nodes: return defer.fail(error.NodeExists()) - if not config: - config = copy.copy(defaultConfig) - if config['pubsub#node_type'] != 'leaf': - raise NotImplementedError + raise error.NoCollections() node = LeafNode(nodeIdentifier, owner, config) self._nodes[nodeIdentifier] = node @@ -72,12 +80,17 @@ for subscriber, subscription in node._subscriptions.iteritems(): subscriber = jid.internJID(subscriber) if subscriber.userhostJID() == entity.userhostJID(): - subscriptions.append((node.nodeIdentifier, subscriber, - subscription.state)) + subscriptions.append(subscription) return defer.succeed(subscriptions) + def getDefaultConfiguration(self, nodeType): + if nodeType == 'collection': + raise error.NoCollections() + + return self.defaultConfig[nodeType] + class Node: @@ -120,17 +133,25 @@ try: subscription = self._subscriptions[subscriber.full()] except KeyError: - state = None + return defer.succeed(None) else: - state = subscription.state - return defer.succeed(state) + return defer.succeed(subscription) - def addSubscription(self, subscriber, state): + def getSubscriptions(self, state=None): + return defer.succeed( + [subscription + for subscription in self._subscriptions.itervalues() + if state is None or subscription.state == state]) + + + + def addSubscription(self, subscriber, state, options): if self._subscriptions.get(subscriber.full()): return defer.fail(error.SubscriptionExists()) - subscription = Subscription(state) + subscription = Subscription(self.nodeIdentifier, subscriber, state, + options) self._subscriptions[subscriber.full()] = subscription return defer.succeed(None) @@ -144,14 +165,6 @@ return defer.succeed(None) - def getSubscribers(self): - subscribers = [jid.internJID(subscriber) for subscriber, subscription - in self._subscriptions.iteritems() - if subscription.state == 'subscribed'] - - return defer.succeed(subscribers) - - def isSubscribed(self, entity): for subscriber, subscription in self._subscriptions.iteritems(): if jid.internJID(subscriber).userhost() == entity.userhost() and \ @@ -187,11 +200,14 @@ -class LeafNodeMixin: +class LeafNode(Node): + + implements(iidavoll.ILeafNode) nodeType = 'leaf' - def __init__(self): + def __init__(self, nodeIdentifier, owner, config): + Node.__init__(self, nodeIdentifier, owner, config) self._items = {} self._itemlist = [] @@ -251,21 +267,8 @@ return defer.succeed(None) - -class LeafNode(Node, LeafNodeMixin): - - implements(iidavoll.ILeafNode) - - def __init__(self, nodeIdentifier, owner, config): - Node.__init__(self, nodeIdentifier, owner, config) - LeafNodeMixin.__init__(self) - - - -class Subscription: - - def __init__(self, state): - self.state = state +class CollectionNode(Node): + nodeType = 'collection' diff -r e6b710bf2b24 -r 274a45d2a5ab idavoll/pgsql_storage.py --- a/idavoll/pgsql_storage.py Mon Aug 04 07:10:45 2008 +0000 +++ b/idavoll/pgsql_storage.py Mon Aug 04 13:47:10 2008 +0000 @@ -4,8 +4,12 @@ import copy from zope.interface import implements + +from twisted.enterprise import adbapi from twisted.words.protocols.jabber import jid -from wokkel.generic import parseXml + +from wokkel.generic import parseXml, stripNamespace +from wokkel.pubsub import Subscription from idavoll import error, iidavoll @@ -13,6 +17,17 @@ implements(iidavoll.IStorage) + defaultConfig = { + 'leaf': { + "pubsub#persist_items": True, + "pubsub#deliver_payloads": True, + "pubsub#send_last_published_item": 'on_sub', + }, + 'collection': { + "pubsub#deliver_payloads": True, + "pubsub#send_last_published_item": 'on_sub', + } + } def __init__(self, dbpool): self.dbpool = dbpool @@ -24,22 +39,36 @@ def _getNode(self, cursor, nodeIdentifier): configuration = {} - cursor.execute("""SELECT persistent, deliver_payload, + cursor.execute("""SELECT node_type, + persist_items, + deliver_payloads, send_last_published_item FROM nodes WHERE node=%s""", (nodeIdentifier,)) - try: - (configuration["pubsub#persist_items"], - configuration["pubsub#deliver_payloads"], - configuration["pubsub#send_last_published_item"]) = \ - cursor.fetchone() - except TypeError: + row = cursor.fetchone() + + if not row: raise error.NodeNotFound() - else: + + if row.node_type == 'leaf': + configuration = { + 'pubsub#persist_items': row.persist_items, + 'pubsub#deliver_payloads': row.deliver_payloads, + 'pubsub#send_last_published_item': + row.send_last_published_item} node = LeafNode(nodeIdentifier, configuration) node.dbpool = self.dbpool return node + elif row.node_type == 'collection': + configuration = { + 'pubsub#deliver_payloads': row.deliver_payloads, + 'pubsub#send_last_published_item': + row.send_last_published_item} + node = CollectionNode(nodeIdentifier, configuration) + node.dbpool = self.dbpool + return node + def getNodeIds(self): @@ -48,16 +77,27 @@ return d - def createNode(self, nodeIdentifier, owner, config=None): + def createNode(self, nodeIdentifier, owner, config): return self.dbpool.runInteraction(self._createNode, nodeIdentifier, - owner) + owner, config) - def _createNode(self, cursor, nodeIdentifier, owner): + def _createNode(self, cursor, nodeIdentifier, owner, config): + if config['pubsub#node_type'] != 'leaf': + raise error.NoCollections() + owner = owner.userhost() try: - cursor.execute("""INSERT INTO nodes (node) VALUES (%s)""", - (nodeIdentifier)) + cursor.execute("""INSERT INTO nodes + (node, node_type, persist_items, + deliver_payloads, send_last_published_item) + VALUES + (%s, 'leaf', %s, %s, %s)""", + (nodeIdentifier, + config['pubsub#persist_items'], + config['pubsub#deliver_payloads'], + config['pubsub#send_last_published_item']) + ) except cursor._pool.dbapi.OperationalError: raise error.NodeExists() @@ -70,10 +110,11 @@ cursor.execute("""INSERT INTO affiliations (node_id, entity_id, affiliation) - SELECT n.id, e.id, 'owner' FROM - (SELECT id FROM nodes WHERE node=%s) AS n + SELECT node_id, entity_id, 'owner' FROM + (SELECT node_id FROM nodes WHERE node=%s) as n CROSS JOIN - (SELECT id FROM entities WHERE jid=%s) AS e""", + (SELECT entity_id FROM entities + WHERE jid=%s) as e""", (nodeIdentifier, owner)) @@ -91,10 +132,8 @@ def getAffiliations(self, entity): d = self.dbpool.runQuery("""SELECT node, affiliation FROM entities - JOIN affiliations ON - (affiliations.entity_id=entities.id) - JOIN nodes ON - (nodes.id=affiliations.node_id) + NATURAL JOIN affiliations + NATURAL JOIN nodes WHERE jid=%s""", (entity.userhost(),)) d.addCallback(lambda results: [tuple(r) for r in results]) @@ -102,22 +141,27 @@ def getSubscriptions(self, entity): - d = self.dbpool.runQuery("""SELECT node, jid, resource, subscription - FROM entities JOIN subscriptions ON - (subscriptions.entity_id=entities.id) - JOIN nodes ON - (nodes.id=subscriptions.node_id) + def toSubscriptions(rows): + subscriptions = [] + for row in rows: + subscriber = jid.internJID('%s/%s' % (row.jid, + row.resource)) + subscription = Subscription(row.node, subscriber, row.state) + subscriptions.append(subscription) + return subscriptions + + d = self.dbpool.runQuery("""SELECT node, jid, resource, state + FROM entities + NATURAL JOIN subscriptions + NATURAL JOIN nodes WHERE jid=%s""", (entity.userhost(),)) - d.addCallback(self._convertSubscriptionJIDs) + d.addCallback(toSubscriptions) return d - def _convertSubscriptionJIDs(self, subscriptions): - return [(node, - jid.internJID('%s/%s' % (subscriber, resource)), - subscription) - for node, subscriber, resource, subscription in subscriptions] + def getDefaultConfiguration(self, nodeType): + return self.defaultConfig[nodeType] @@ -131,7 +175,7 @@ def _checkNodeExists(self, cursor): - cursor.execute("""SELECT id FROM nodes WHERE node=%s""", + cursor.execute("""SELECT node_id FROM nodes WHERE node=%s""", (self.nodeIdentifier)) if not cursor.fetchone(): raise error.NodeNotFound() @@ -159,7 +203,8 @@ def _setConfiguration(self, cursor, config): self._checkNodeExists(cursor) - cursor.execute("""UPDATE nodes SET persistent=%s, deliver_payload=%s, + cursor.execute("""UPDATE nodes SET persist_items=%s, + deliver_payloads=%s, send_last_published_item=%s WHERE node=%s""", (config["pubsub#persist_items"], @@ -185,8 +230,8 @@ def _getAffiliation(self, cursor, entity): self._checkNodeExists(cursor) cursor.execute("""SELECT affiliation FROM affiliations - JOIN nodes ON (node_id=nodes.id) - JOIN entities ON (entity_id=entities.id) + NATURAL JOIN nodes + NATURAL JOIN entities WHERE node=%s AND jid=%s""", (self.nodeIdentifier, entity.userhost())) @@ -207,31 +252,72 @@ userhost = subscriber.userhost() resource = subscriber.resource or '' - cursor.execute("""SELECT subscription FROM subscriptions - JOIN nodes ON (nodes.id=subscriptions.node_id) - JOIN entities ON - (entities.id=subscriptions.entity_id) + cursor.execute("""SELECT state FROM subscriptions + NATURAL JOIN nodes + NATURAL JOIN entities WHERE node=%s AND jid=%s AND resource=%s""", (self.nodeIdentifier, userhost, resource)) - try: - return cursor.fetchone()[0] - except TypeError: + row = cursor.fetchone() + if not row: return None + else: + return Subscription(self.nodeIdentifier, subscriber, row.state) + + + def getSubscriptions(self, state=None): + return self.dbpool.runInteraction(self._getSubscriptions, state) - def addSubscription(self, subscriber, state): - return self.dbpool.runInteraction(self._addSubscription, subscriber, - state) + def _getSubscriptions(self, cursor, state): + self._checkNodeExists(cursor) + + query = """SELECT jid, resource, state, + subscription_type, subscription_depth + FROM subscriptions + NATURAL JOIN nodes + NATURAL JOIN entities + WHERE node=%s"""; + values = [self.nodeIdentifier] + + if state: + query += " AND state=%s" + values.append(state) + + cursor.execute(query, values); + rows = cursor.fetchall() + + subscriptions = [] + for row in rows: + subscriber = jid.JID('%s/%s' % (row.jid, row.resource)) + + options = {} + if row.subscription_type: + options['pubsub#subscription_type'] = row.subscription_type; + if row.subscription_depth: + options['pubsub#subscription_depth'] = row.subscription_depth; + + subscriptions.append(Subscription(self.nodeIdentifier, subscriber, + row.state, options)) + + return subscriptions - def _addSubscription(self, cursor, subscriber, state): + def addSubscription(self, subscriber, state, config): + return self.dbpool.runInteraction(self._addSubscription, subscriber, + state, config) + + + def _addSubscription(self, cursor, subscriber, state, config): self._checkNodeExists(cursor) userhost = subscriber.userhost() resource = subscriber.resource or '' + subscription_type = config.get('pubsub#subscription_type') + subscription_depth = config.get('pubsub#subscription_depth') + try: cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", (userhost)) @@ -240,13 +326,18 @@ try: cursor.execute("""INSERT INTO subscriptions - (node_id, entity_id, resource, subscription) - SELECT n.id, e.id, %s, %s FROM - (SELECT id FROM nodes WHERE node=%s) AS n + (node_id, entity_id, resource, state, + subscription_type, subscription_depth) + SELECT node_id, entity_id, %s, %s, %s, %s FROM + (SELECT node_id FROM nodes + WHERE node=%s) as n CROSS JOIN - (SELECT id FROM entities WHERE jid=%s) AS e""", + (SELECT entity_id FROM entities + WHERE jid=%s) as e""", (resource, state, + subscription_type, + subscription_depth, self.nodeIdentifier, userhost)) except cursor._pool.dbapi.OperationalError: @@ -265,9 +356,11 @@ resource = subscriber.resource or '' cursor.execute("""DELETE FROM subscriptions WHERE - node_id=(SELECT id FROM nodes WHERE node=%s) AND - entity_id=(SELECT id FROM entities WHERE jid=%s) - AND resource=%s""", + node_id=(SELECT node_id FROM nodes + WHERE node=%s) AND + entity_id=(SELECT entity_id FROM entities + WHERE jid=%s) AND + resource=%s""", (self.nodeIdentifier, userhost, resource)) @@ -277,27 +370,6 @@ return None - def getSubscribers(self): - d = self.dbpool.runInteraction(self._getSubscribers) - d.addCallback(self._convertToJIDs) - return d - - - def _getSubscribers(self, cursor): - self._checkNodeExists(cursor) - cursor.execute("""SELECT jid, resource FROM subscriptions - JOIN nodes ON (node_id=nodes.id) - JOIN entities ON (entity_id=entities.id) - WHERE node=%s AND - subscription='subscribed'""", - (self.nodeIdentifier,)) - return cursor.fetchall() - - - def _convertToJIDs(self, list): - return [jid.internJID("%s/%s" % (l[0], l[1])) for l in list] - - def isSubscribed(self, entity): return self.dbpool.runInteraction(self._isSubscribed, entity) @@ -306,12 +378,10 @@ self._checkNodeExists(cursor) cursor.execute("""SELECT 1 FROM entities - JOIN subscriptions ON - (entities.id=subscriptions.entity_id) - JOIN nodes ON - (nodes.id=subscriptions.node_id) + NATURAL JOIN subscriptions + NATURAL JOIN nodes WHERE entities.jid=%s - AND node=%s AND subscription='subscribed'""", + AND node=%s AND state='subscribed'""", (entity.userhost(), self.nodeIdentifier)) @@ -326,10 +396,8 @@ self._checkNodeExists(cursor) cursor.execute("""SELECT jid, affiliation FROM nodes - JOIN affiliations ON - (nodes.id = affiliations.node_id) - JOIN entities ON - (affiliations.entity_id = entities.id) + NATURAL JOIN affiliations + NATURAL JOIN entities WHERE node=%s""", self.nodeIdentifier) result = cursor.fetchall() @@ -338,7 +406,9 @@ -class LeafNodeMixin: +class LeafNode(Node): + + implements(iidavoll.ILeafNode) nodeType = 'leaf' @@ -356,7 +426,7 @@ data = item.toXml() cursor.execute("""UPDATE items SET date=now(), publisher=%s, data=%s FROM nodes - WHERE nodes.id = items.node_id AND + WHERE nodes.node_id = items.node_id AND nodes.node = %s and items.item=%s""", (publisher.full(), data, @@ -366,7 +436,8 @@ return cursor.execute("""INSERT INTO items (node_id, item, publisher, data) - SELECT id, %s, %s, %s FROM nodes WHERE node=%s""", + SELECT node_id, %s, %s, %s FROM nodes + WHERE node=%s""", (item["id"], publisher.full(), data, @@ -384,7 +455,8 @@ for itemIdentifier in itemIdentifiers: cursor.execute("""DELETE FROM items WHERE - node_id=(SELECT id FROM nodes WHERE node=%s) AND + node_id=(SELECT node_id FROM nodes + WHERE node=%s) AND item=%s""", (self.nodeIdentifier, itemIdentifier)) @@ -401,8 +473,8 @@ def _getItems(self, cursor, maxItems): self._checkNodeExists(cursor) - query = """SELECT data FROM nodes JOIN items ON - (nodes.id=items.node_id) + query = """SELECT data FROM nodes + NATURAL JOIN items WHERE node=%s ORDER BY date DESC""" if maxItems: cursor.execute(query + " LIMIT %s", @@ -412,7 +484,8 @@ cursor.execute(query, (self.nodeIdentifier)) result = cursor.fetchall() - return [parseXml(r[0]) for r in result] + items = [stripNamespace(parseXml(r[0])) for r in result] + return items def getItemsById(self, itemIdentifiers): @@ -423,8 +496,8 @@ self._checkNodeExists(cursor) items = [] for itemIdentifier in itemIdentifiers: - cursor.execute("""SELECT data FROM nodes JOIN items ON - (nodes.id=items.node_id) + cursor.execute("""SELECT data FROM nodes + NATURAL JOIN items WHERE node=%s AND item=%s""", (self.nodeIdentifier, itemIdentifier)) @@ -442,14 +515,13 @@ self._checkNodeExists(cursor) cursor.execute("""DELETE FROM items WHERE - node_id=(SELECT id FROM nodes WHERE node=%s)""", + node_id=(SELECT node_id FROM nodes WHERE node=%s)""", (self.nodeIdentifier,)) +class CollectionNode(Node): -class LeafNode(Node, LeafNodeMixin): - - implements(iidavoll.ILeafNode) + nodeType = 'collection' @@ -482,7 +554,7 @@ nodeIdentifier, callback) if cursor.fetchall(): - raise error.SubscriptionExists() + return cursor.execute("""INSERT INTO callbacks (service, node, uri) VALUES diff -r e6b710bf2b24 -r 274a45d2a5ab idavoll/tap_http.py --- a/idavoll/tap_http.py Mon Aug 04 07:10:45 2008 +0000 +++ b/idavoll/tap_http.py Mon Aug 04 13:47:10 2008 +0000 @@ -1,10 +1,11 @@ # Copyright (c) 2003-2008 Ralph Meijer # See LICENSE for details. -from twisted.application import internet, strports +from twisted.application import internet, service, strports from twisted.conch import manhole, manhole_ssh from twisted.cred import portal, checkers -from twisted.web2 import channel, resource, server +from twisted.web2 import channel, log, resource, server +from twisted.web2.tap import Web2Service from idavoll import gateway, tap from idavoll.gateway import RemoteSubscriptionService @@ -15,6 +16,7 @@ ] + def getManholeFactory(namespace, **passwords): def getManHole(_): return manhole.Manhole(namespace) @@ -28,6 +30,7 @@ return f + def makeService(config): s = tap.makeService(config) @@ -67,13 +70,22 @@ site = server.Site(root) w = internet.TCPServer(int(config['webport']), channel.HTTPFactory(site)) + + if config["verbose"]: + logObserver = log.DefaultCommonAccessLoggingObserver() + w2s = Web2Service(logObserver) + w.setServiceParent(w2s) + w = w2s + w.setServiceParent(s) # Set up a manhole namespace = {'service': s, 'component': cs, - 'backend': bs} + 'backend': bs, + 'root': root} + f = getManholeFactory(namespace, admin='admin') manholeService = strports.service('2222', f) manholeService.setServiceParent(s) diff -r e6b710bf2b24 -r 274a45d2a5ab idavoll/test/test_backend.py --- a/idavoll/test/test_backend.py Mon Aug 04 07:10:45 2008 +0000 +++ b/idavoll/test/test_backend.py Mon Aug 04 13:47:10 2008 +0000 @@ -5,6 +5,9 @@ Tests for L{idavoll.backend}. """ +from zope.interface import implements +from zope.interface.verify import verifyObject + from twisted.internet import defer from twisted.trial import unittest from twisted.words.protocols.jabber import jid @@ -12,22 +15,28 @@ from wokkel import pubsub -from idavoll import backend, error +from idavoll import backend, error, iidavoll OWNER = jid.JID('owner@example.com') NS_PUBSUB = 'http://jabber.org/protocol/pubsub' class BackendTest(unittest.TestCase): + + def test_interfaceIBackend(self): + self.assertTrue(verifyObject(iidavoll.IBackendService, + backend.BackendService(None))) + + def test_deleteNode(self): - class testNode: + class TestNode: nodeIdentifier = 'to-be-deleted' def getAffiliation(self, entity): if entity is OWNER: return defer.succeed('owner') - class testStorage: + class TestStorage: def getNode(self, nodeIdentifier): - return defer.succeed(testNode()) + return defer.succeed(TestNode()) def deleteNode(self, nodeIdentifier): if nodeIdentifier in ['to-be-deleted']: @@ -44,7 +53,7 @@ self.assertTrue(self.preDeleteCalled) self.assertTrue(self.storage.deleteCalled) - self.storage = testStorage() + self.storage = TestStorage() self.backend = backend.BackendService(self.storage) self.storage.backend = self.backend @@ -61,12 +70,15 @@ """ Test creation of a node without a given node identifier. """ - class testStorage: - def createNode(self, nodeIdentifier, requestor): + class TestStorage: + def getDefaultConfiguration(self, nodeType): + return {} + + def createNode(self, nodeIdentifier, requestor, config): self.nodeIdentifier = nodeIdentifier return defer.succeed(None) - self.storage = testStorage() + self.storage = TestStorage() self.backend = backend.BackendService(self.storage) self.storage.backend = self.backend @@ -78,6 +90,112 @@ d.addCallback(checkID) return d + class NodeStore: + """ + I just store nodes to pose as an L{IStorage} implementation. + """ + def __init__(self, nodes): + self.nodes = nodes + + def getNode(self, nodeIdentifier): + try: + return defer.succeed(self.nodes[nodeIdentifier]) + except KeyError: + return defer.fail(error.NodeNotFound()) + + + def test_getNotifications(self): + """ + Ensure subscribers show up in the notification list. + """ + item = pubsub.Item() + sub = pubsub.Subscription('test', OWNER, 'subscribed') + + class TestNode: + def getSubscriptions(self, state=None): + return [sub] + + def cb(result): + self.assertEquals(1, len(result)) + subscriber, subscriptions, items = result[-1] + + self.assertEquals(OWNER, subscriber) + self.assertEquals(set([sub]), subscriptions) + self.assertEquals([item], items) + + self.storage = self.NodeStore({'test': TestNode()}) + self.backend = backend.BackendService(self.storage) + d = self.backend.getNotifications('test', [item]) + d.addCallback(cb) + return d + + def test_getNotificationsRoot(self): + """ + Ensure subscribers to the root node show up in the notification list + for leaf nodes. + + This assumes a flat node relationship model with exactly one collection + node: the root node. Each leaf node is automatically a child node + of the root node. + """ + item = pubsub.Item() + subRoot = pubsub.Subscription('', OWNER, 'subscribed') + + class TestNode: + def getSubscriptions(self, state=None): + return [] + + class TestRootNode: + def getSubscriptions(self, state=None): + return [subRoot] + + def cb(result): + self.assertEquals(1, len(result)) + subscriber, subscriptions, items = result[-1] + self.assertEquals(OWNER, subscriber) + self.assertEquals(set([subRoot]), subscriptions) + self.assertEquals([item], items) + + self.storage = self.NodeStore({'test': TestNode(), + '': TestRootNode()}) + self.backend = backend.BackendService(self.storage) + d = self.backend.getNotifications('test', [item]) + d.addCallback(cb) + return d + + + def test_getNotificationsMultipleNodes(self): + """ + Ensure that entities that subscribe to a leaf node as well as the + root node get exactly one notification. + """ + item = pubsub.Item() + sub = pubsub.Subscription('test', OWNER, 'subscribed') + subRoot = pubsub.Subscription('', OWNER, 'subscribed') + + class TestNode: + def getSubscriptions(self, state=None): + return [sub] + + class TestRootNode: + def getSubscriptions(self, state=None): + return [subRoot] + + def cb(result): + self.assertEquals(1, len(result)) + subscriber, subscriptions, items = result[-1] + + self.assertEquals(OWNER, subscriber) + self.assertEquals(set([sub, subRoot]), subscriptions) + self.assertEquals([item], items) + + self.storage = self.NodeStore({'test': TestNode(), + '': TestRootNode()}) + self.backend = backend.BackendService(self.storage) + d = self.backend.getNotifications('test', [item]) + d.addCallback(cb) + return d + def test_getDefaultConfiguration(self): """ @@ -85,12 +203,18 @@ a deferred that fires a dictionary with configuration values. """ + class TestStorage: + def getDefaultConfiguration(self, nodeType): + return { + "pubsub#persist_items": True, + "pubsub#deliver_payloads": True} + def cb(options): self.assertIn("pubsub#persist_items", options) self.assertEqual(True, options["pubsub#persist_items"]) - self.backend = backend.BackendService(None) - d = self.backend.getDefaultConfiguration() + self.backend = backend.BackendService(TestStorage()) + d = self.backend.getDefaultConfiguration('leaf') d.addCallback(cb) return d @@ -164,7 +288,8 @@ """ Test publish request with an item without a node identifier. """ - class testNode: + class TestNode: + nodeType = 'leaf' nodeIdentifier = 'node' def getAffiliation(self, entity): if entity is OWNER: @@ -173,14 +298,14 @@ return {'pubsub#deliver_payloads': True, 'pubsub#persist_items': False} - class testStorage: + class TestStorage: def getNode(self, nodeIdentifier): - return defer.succeed(testNode()) + return defer.succeed(TestNode()) def checkID(notification): self.assertNotIdentical(None, notification['items'][0]['id']) - self.storage = testStorage() + self.storage = TestStorage() self.backend = backend.BackendService(self.storage) self.storage.backend = self.backend @@ -197,8 +322,10 @@ """ ITEM = "" % NS_PUBSUB - class testNode: + class TestNode: + implements(iidavoll.ILeafNode) nodeIdentifier = 'node' + nodeType = 'leaf' def getAffiliation(self, entity): if entity is OWNER: return defer.succeed('owner') @@ -208,19 +335,23 @@ 'pubsub#send_last_published_item': 'on_sub'} def getItems(self, maxItems): return [ITEM] - def addSubscription(self, subscriber, state): + def addSubscription(self, subscriber, state, options): + self.subscription = pubsub.Subscription('node', subscriber, + state, options) return defer.succeed(None) + def getSubscription(self, subscriber): + return defer.succeed(self.subscription) - class testStorage: + class TestStorage: def getNode(self, nodeIdentifier): - return defer.succeed(testNode()) + return defer.succeed(TestNode()) def cb(data): self.assertEquals('node', data['nodeIdentifier']) self.assertEquals([ITEM], data['items']) - self.assertEquals(OWNER, data['subscriber']) + self.assertEquals(OWNER, data['subscription'].subscriber) - self.storage = testStorage() + self.storage = TestStorage() self.backend = backend.BackendService(self.storage) self.storage.backend = self.backend @@ -311,7 +442,7 @@ def test_getConfigurationOptions(self): class TestBackend(BaseTestBackend): - options = { + nodeOptions = { "pubsub#persist_items": {"type": "boolean", "label": "Persist items to storage"}, @@ -327,7 +458,7 @@ def test_getDefaultConfiguration(self): class TestBackend(BaseTestBackend): - def getDefaultConfiguration(self): + def getDefaultConfiguration(self, nodeType): options = {"pubsub#persist_items": True, "pubsub#deliver_payloads": True, "pubsub#send_last_published_item": 'on_sub', @@ -338,6 +469,6 @@ self.assertEquals(True, options["pubsub#persist_items"]) s = backend.PubSubServiceFromBackend(TestBackend()) - d = s.getDefaultConfiguration(OWNER, 'test.example.org') + d = s.getDefaultConfiguration(OWNER, 'test.example.org', 'leaf') d.addCallback(cb) return d diff -r e6b710bf2b24 -r 274a45d2a5ab idavoll/test/test_gateway.py --- a/idavoll/test/test_gateway.py Mon Aug 04 07:10:45 2008 +0000 +++ b/idavoll/test/test_gateway.py Mon Aug 04 13:47:10 2008 +0000 @@ -18,11 +18,12 @@ AGENT = "Idavoll Test Script" NS_ATOM = "http://www.w3.org/2005/Atom" -entry = domish.Element((NS_ATOM, 'entry')) -entry.addElement("id", content="urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a") -entry.addElement("title", content="Atom-Powered Robots Run Amok") -entry.addElement("author").addElement("name", content="John Doe") -entry.addElement("content", content="Some text.") +TEST_ENTRY = domish.Element((NS_ATOM, 'entry')) +TEST_ENTRY.addElement("id", + content="urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a") +TEST_ENTRY.addElement("title", content="Atom-Powered Robots Run Amok") +TEST_ENTRY.addElement("author").addElement("name", content="John Doe") +TEST_ENTRY.addElement("content", content="Some text.") baseURI = "http://localhost:8086/" componentJID = "pubsub" @@ -33,10 +34,27 @@ def setUp(self): self.client = gateway.GatewayClient(baseURI) self.client.startService() + self.addCleanup(self.client.stopService) + + def trapConnectionRefused(failure): + from twisted.internet.error import ConnectionRefusedError + failure.trap(ConnectionRefusedError) + raise unittest.SkipTest("Gateway to test against is not available") + + def trapNotFound(failure): + from twisted.web.error import Error + failure.trap(Error) + + d = self.client.ping() + d.addErrback(trapConnectionRefused) + d.addErrback(trapNotFound) + return d + def tearDown(self): return self.client.stopService() + def test_create(self): def cb(response): @@ -51,7 +69,7 @@ def cb(response): self.assertIn('uri', response) - d = self.client.publish(entry) + d = self.client.publish(TEST_ENTRY) d.addCallback(cb) return d @@ -62,7 +80,7 @@ def cb1(response): xmppURI = response['uri'] - d = self.client.publish(entry, xmppURI) + d = self.client.publish(TEST_ENTRY, xmppURI) d.addCallback(cb2, xmppURI) return d @@ -74,7 +92,7 @@ def cb(err): self.assertEqual('404', err.status) - d = self.client.publish(entry, 'xmpp:%s?node=test' % componentJID) + d = self.client.publish(TEST_ENTRY, 'xmpp:%s?node=test' % componentJID) self.assertFailure(d, error.Error) d.addCallback(cb) return d @@ -93,7 +111,6 @@ d.addCallback(cb) return d - def test_subscribeGetNotification(self): def onNotification(data, headers): @@ -106,7 +123,7 @@ return d def cb2(xmppURI): - d = self.client.publish(entry, xmppURI) + d = self.client.publish(TEST_ENTRY, xmppURI) return d @@ -140,7 +157,7 @@ return d def cb3(xmppURI): - d = self.client.publish(entry, xmppURI) + d = self.client.publish(TEST_ENTRY, xmppURI) return d @@ -169,7 +186,7 @@ def cb(response): xmppURI = response['uri'] self.assertNot(self.client.deferred.called) - d = self.client.publish(entry, xmppURI) + d = self.client.publish(TEST_ENTRY, xmppURI) d.addCallback(lambda _: xmppURI) return d @@ -202,7 +219,7 @@ xmppURI = response['uri'] self.assertNot(client1.deferred.called) self.assertNot(client2.deferred.called) - d = self.client.publish(entry, xmppURI) + d = self.client.publish(TEST_ENTRY, xmppURI) d.addCallback(lambda _: xmppURI) return d @@ -224,6 +241,7 @@ client2.callback = onNotification2 client2.deferred = defer.Deferred() + d = self.client.create() d.addCallback(cb) d.addCallback(cb2) @@ -242,6 +260,32 @@ return d + def test_subscribeRootGetNotification(self): + + def onNotification(data, headers): + self.client.deferred.callback(None) + + def cb(response): + xmppURI = response['uri'] + jid, nodeIdentifier = gateway.getServiceAndNode(xmppURI) + rootNode = gateway.getXMPPURI(jid, '') + + d = self.client.subscribe(rootNode) + d.addCallback(lambda _: xmppURI) + return d + + def cb2(xmppURI): + return self.client.publish(TEST_ENTRY, xmppURI) + + + self.client.callback = onNotification + self.client.deferred = defer.Deferred() + d = self.client.create() + d.addCallback(cb) + d.addCallback(cb2) + return defer.gatherResults([d, self.client.deferred]) + + def test_unsubscribeNonExisting(self): def cb(err): self.assertEqual('403', err.status) @@ -258,7 +302,7 @@ d = self.client.items(xmppURI) return d - d = self.client.publish(entry) + d = self.client.publish(TEST_ENTRY) d.addCallback(cb) return d @@ -269,6 +313,6 @@ d = self.client.items(xmppURI, 2) return d - d = self.client.publish(entry) + d = self.client.publish(TEST_ENTRY) d.addCallback(cb) return d diff -r e6b710bf2b24 -r 274a45d2a5ab idavoll/test/test_storage.py --- a/idavoll/test/test_storage.py Mon Aug 04 07:10:45 2008 +0000 +++ b/idavoll/test/test_storage.py Mon Aug 04 13:47:10 2008 +0000 @@ -5,6 +5,7 @@ Tests for L{idavoll.memory_storage} and L{idavoll.pgsql_storage}. """ +from zope.interface.verify import verifyObject from twisted.trial import unittest from twisted.words.protocols.jabber import jid from twisted.internet import defer @@ -12,7 +13,7 @@ from wokkel import pubsub -from idavoll import error +from idavoll import error, iidavoll OWNER = jid.JID('owner@example.com') SUBSCRIBER = jid.JID('subscriber@example.com/Home') @@ -20,17 +21,16 @@ SUBSCRIBER_TO_BE_DELETED = jid.JID('to_be_deleted@example.com/Home') SUBSCRIBER_PENDING = jid.JID('pending@example.com/Home') PUBLISHER = jid.JID('publisher@example.com') -ITEM = domish.Element((pubsub.NS_PUBSUB, 'item'), pubsub.NS_PUBSUB) +ITEM = domish.Element((None, 'item')) ITEM['id'] = 'current' ITEM.addElement(('testns', 'test'), content=u'Test \u2083 item') -ITEM_NEW = domish.Element((pubsub.NS_PUBSUB, 'item'), pubsub.NS_PUBSUB) +ITEM_NEW = domish.Element((None, 'item')) ITEM_NEW['id'] = 'new' ITEM_NEW.addElement(('testns', 'test'), content=u'Test \u2083 item') -ITEM_UPDATED = domish.Element((pubsub.NS_PUBSUB, 'item'), pubsub.NS_PUBSUB) +ITEM_UPDATED = domish.Element((None, 'item')) ITEM_UPDATED['id'] = 'current' ITEM_UPDATED.addElement(('testns', 'test'), content=u'Test \u2084 item') -ITEM_TO_BE_DELETED = domish.Element((pubsub.NS_PUBSUB, 'item'), - pubsub.NS_PUBSUB) +ITEM_TO_BE_DELETED = domish.Element((None, 'item')) ITEM_TO_BE_DELETED['id'] = 'to-be-deleted' ITEM_TO_BE_DELETED.addElement(('testns', 'test'), content=u'Test \u2083 item') @@ -53,6 +53,18 @@ return d + def test_interfaceIStorage(self): + self.assertTrue(verifyObject(iidavoll.IStorage, self.s)) + + + def test_interfaceINode(self): + self.assertTrue(verifyObject(iidavoll.INode, self.node)) + + + def test_interfaceILeafNode(self): + self.assertTrue(verifyObject(iidavoll.ILeafNode, self.node)) + + def test_getNode(self): return self.s.getNode('pre-existing') @@ -72,7 +84,9 @@ def test_createExistingNode(self): - d = self.s.createNode('pre-existing', OWNER) + config = self.s.getDefaultConfiguration('leaf') + config['pubsub#node_type'] = 'leaf' + d = self.s.createNode('pre-existing', OWNER, config) self.assertFailure(d, error.NodeExists) return d @@ -82,7 +96,9 @@ d = self.s.getNode('new 1') return d - d = self.s.createNode('new 1', OWNER) + config = self.s.getDefaultConfiguration('leaf') + config['pubsub#node_type'] = 'leaf' + d = self.s.createNode('new 1', OWNER, config) d.addCallback(cb) return d @@ -115,7 +131,13 @@ def test_getSubscriptions(self): def cb(subscriptions): - self.assertIn(('pre-existing', SUBSCRIBER, 'subscribed'), subscriptions) + found = False + for subscription in subscriptions: + if (subscription.nodeIdentifier == 'pre-existing' and + subscription.subscriber == SUBSCRIBER and + subscription.state == 'subscribed'): + found = True + self.assertTrue(found) d = self.s.getSubscriptions(SUBSCRIBER) d.addCallback(cb) @@ -192,30 +214,30 @@ def cb1(void): return self.node.getSubscription(SUBSCRIBER_NEW) - def cb2(state): - self.assertEqual(state, 'pending') + def cb2(subscription): + self.assertEqual(subscription.state, 'pending') - d = self.node.addSubscription(SUBSCRIBER_NEW, 'pending') + d = self.node.addSubscription(SUBSCRIBER_NEW, 'pending', {}) d.addCallback(cb1) d.addCallback(cb2) return d def test_addExistingSubscription(self): - d = self.node.addSubscription(SUBSCRIBER, 'pending') + d = self.node.addSubscription(SUBSCRIBER, 'pending', {}) self.assertFailure(d, error.SubscriptionExists) return d def test_getSubscription(self): def cb(subscriptions): - self.assertEquals(subscriptions[0][1], 'subscribed') - self.assertEquals(subscriptions[1][1], 'pending') - self.assertEquals(subscriptions[2][1], None) + self.assertEquals(subscriptions[0].state, 'subscribed') + self.assertEquals(subscriptions[1].state, 'pending') + self.assertEquals(subscriptions[2], None) - d = defer.DeferredList([self.node.getSubscription(SUBSCRIBER), - self.node.getSubscription(SUBSCRIBER_PENDING), - self.node.getSubscription(OWNER)]) + d = defer.gatherResults([self.node.getSubscription(SUBSCRIBER), + self.node.getSubscription(SUBSCRIBER_PENDING), + self.node.getSubscription(OWNER)]) d.addCallback(cb) return d @@ -230,13 +252,17 @@ return d - def test_getSubscribers(self): + def test_getNodeSubscriptions(self): + def extractSubscribers(subscriptions): + return [subscription.subscriber for subscription in subscriptions] + def cb(subscribers): self.assertIn(SUBSCRIBER, subscribers) self.assertNotIn(SUBSCRIBER_PENDING, subscribers) self.assertNotIn(OWNER, subscribers) - d = self.node.getSubscribers() + d = self.node.getSubscriptions('subscribed') + d.addCallback(extractSubscribers) d.addCallback(cb) return d @@ -381,7 +407,9 @@ def setUp(self): from idavoll.memory_storage import Storage, PublishedItem, LeafNode - from idavoll.memory_storage import Subscription, defaultConfig + from idavoll.memory_storage import Subscription + + defaultConfig = Storage.defaultConfig['leaf'] self.s = Storage() self.s._nodes['pre-existing'] = \ @@ -394,11 +422,15 @@ LeafNode('to-be-purged', OWNER, None) subscriptions = self.s._nodes['pre-existing']._subscriptions - subscriptions[SUBSCRIBER.full()] = Subscription('subscribed') + subscriptions[SUBSCRIBER.full()] = Subscription('pre-existing', + SUBSCRIBER, + 'subscribed') subscriptions[SUBSCRIBER_TO_BE_DELETED.full()] = \ - Subscription('subscribed') + Subscription('pre-existing', SUBSCRIBER_TO_BE_DELETED, + 'subscribed') subscriptions[SUBSCRIBER_PENDING.full()] = \ - Subscription('pending') + Subscription('pre-existing', SUBSCRIBER_PENDING, + 'pending') item = PublishedItem(ITEM_TO_BE_DELETED, PUBLISHER) self.s._nodes['pre-existing']._items['to-be-deleted'] = item @@ -436,7 +468,9 @@ def init(self, cursor): self.cleandb(cursor) - cursor.execute("""INSERT INTO nodes (node) VALUES ('pre-existing')""") + cursor.execute("""INSERT INTO nodes + (node, node_type, persist_items) + VALUES ('pre-existing', 'leaf', TRUE)""") cursor.execute("""INSERT INTO nodes (node) VALUES ('to-be-deleted')""") cursor.execute("""INSERT INTO nodes (node) VALUES ('to-be-reconfigured')""") cursor.execute("""INSERT INTO nodes (node) VALUES ('to-be-purged')""") @@ -444,15 +478,15 @@ OWNER.userhost()) cursor.execute("""INSERT INTO affiliations (node_id, entity_id, affiliation) - SELECT nodes.id, entities.id, 'owner' + SELECT node_id, entity_id, 'owner' FROM nodes, entities WHERE node='pre-existing' AND jid=%s""", OWNER.userhost()) cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", SUBSCRIBER.userhost()) cursor.execute("""INSERT INTO subscriptions - (node_id, entity_id, resource, subscription) - SELECT nodes.id, entities.id, %s, 'subscribed' + (node_id, entity_id, resource, state) + SELECT node_id, entity_id, %s, 'subscribed' FROM nodes, entities WHERE node='pre-existing' AND jid=%s""", (SUBSCRIBER.resource, @@ -460,8 +494,8 @@ cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", SUBSCRIBER_TO_BE_DELETED.userhost()) cursor.execute("""INSERT INTO subscriptions - (node_id, entity_id, resource, subscription) - SELECT nodes.id, entities.id, %s, 'subscribed' + (node_id, entity_id, resource, state) + SELECT node_id, entity_id, %s, 'subscribed' FROM nodes, entities WHERE node='pre-existing' AND jid=%s""", (SUBSCRIBER_TO_BE_DELETED.resource, @@ -469,8 +503,8 @@ cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", SUBSCRIBER_PENDING.userhost()) cursor.execute("""INSERT INTO subscriptions - (node_id, entity_id, resource, subscription) - SELECT nodes.id, entities.id, %s, 'pending' + (node_id, entity_id, resource, state) + SELECT node_id, entity_id, %s, 'pending' FROM nodes, entities WHERE node='pre-existing' AND jid=%s""", (SUBSCRIBER_PENDING.resource, @@ -479,20 +513,20 @@ PUBLISHER.userhost()) cursor.execute("""INSERT INTO items (node_id, publisher, item, data, date) - SELECT nodes.id, %s, 'to-be-deleted', %s, + SELECT node_id, %s, 'to-be-deleted', %s, now() - interval '1 day' FROM nodes WHERE node='pre-existing'""", (PUBLISHER.userhost(), ITEM_TO_BE_DELETED.toXml())) cursor.execute("""INSERT INTO items (node_id, publisher, item, data) - SELECT nodes.id, %s, 'to-be-deleted', %s + SELECT node_id, %s, 'to-be-deleted', %s FROM nodes WHERE node='to-be-purged'""", (PUBLISHER.userhost(), ITEM_TO_BE_DELETED.toXml())) cursor.execute("""INSERT INTO items (node_id, publisher, item, data) - SELECT nodes.id, %s, 'current', %s + SELECT node_id, %s, 'current', %s FROM nodes WHERE node='pre-existing'""", (PUBLISHER.userhost(), diff -r e6b710bf2b24 -r 274a45d2a5ab setup.py --- a/setup.py Mon Aug 04 07:10:45 2008 +0000 +++ b/setup.py Mon Aug 04 13:47:10 2008 +0000 @@ -28,6 +28,8 @@ ], package_data={'twisted.plugins': ['twisted/plugins/idavoll.py', 'twisted/plugins/idavoll_http.py']}, + data_files=[('share/idavoll', ['db/pubsub.sql', + 'db/to_idavoll_0.8.sql'])], zip_safe=False, install_requires=install_requires, )