diff src/gateway.py @ 369:dabee42494ac

config file + cleaning: - SàT Pubsub can now be configured using the same config file as SàT itself (i.e. sat.conf or .sat.conf), in the same locations (/etc, local dir, xdg dir). Its options must be in the "pubsub" section - options on command line override config options - removed tap and http files which are not used anymore - changed directory structure to put source in src, to be coherent with SàT and Libervia - changed options name, db* become db_*, secret become xmpp_pwd - an exception is raised if jid or xmpp_pwd is are not configured
author Goffi <goffi@goffi.org>
date Fri, 02 Mar 2018 12:59:38 +0100
parents sat_pubsub/gateway.py@618a92080812
children aa3a464df605
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/gateway.py	Fri Mar 02 12:59:38 2018 +0100
@@ -0,0 +1,899 @@
+#!/usr/bin/python
+#-*- coding: utf-8 -*-
+
+# Copyright (c) 2003-2011 Ralph Meijer
+# Copyright (c) 2012-2018 Jérôme Poisson
+
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+# --
+
+# This program is based on Idavoll (http://idavoll.ik.nu/),
+# originaly written by Ralph Meijer (http://ralphm.net/blog/)
+# It is sublicensed under AGPL v3 (or any later version) as allowed by the original
+# license.
+
+# --
+
+# Here is a copy of the original license:
+
+# Copyright (c) 2003-2011 Ralph Meijer
+
+# Permission is hereby granted, free of charge, to any person obtaining
+# a copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish,
+# distribute, sublicense, and/or sell copies of the Software, and to
+# permit persons to whom the Software is furnished to do so, subject to
+# the following conditions:
+
+# The above copyright notice and this permission notice shall be
+# included in all copies or substantial portions of the Software.
+
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+
+"""
+Web resources and client for interacting with pubsub services.
+"""
+
+import mimetools
+from time import gmtime, strftime
+from StringIO import StringIO
+import urllib
+import urlparse
+
+import simplejson
+
+from twisted.application import service
+from twisted.internet import defer, reactor
+from twisted.python import log
+from twisted.web import client, http, resource, server
+from twisted.web.error import Error
+from twisted.words.protocols.jabber.jid import JID
+from twisted.words.protocols.jabber.error import StanzaError
+from twisted.words.xish import domish
+
+from wokkel.generic import parseXml
+from wokkel.pubsub import Item
+from wokkel.pubsub import PubSubClient
+
+from sat_pubsub import error
+
+NS_ATOM = 'http://www.w3.org/2005/Atom'
+MIME_ATOM_ENTRY = b'application/atom+xml;type=entry'
+MIME_ATOM_FEED = b'application/atom+xml;type=feed'
+MIME_JSON = b'application/json'
+
+class XMPPURIParseError(ValueError):
+    """
+    Raised when a given XMPP URI couldn't be properly parsed.
+    """
+
+
+
+def getServiceAndNode(uri):
+    """
+    Given an XMPP URI, extract the publish subscribe service JID and node ID.
+    """
+
+    try:
+        scheme, rest = uri.split(':', 1)
+    except ValueError:
+        raise XMPPURIParseError("No URI scheme component")
+
+    if scheme != 'xmpp':
+        raise XMPPURIParseError("Unknown URI scheme")
+
+    if rest.startswith("//"):
+        raise XMPPURIParseError("Unexpected URI authority component")
+
+    try:
+        entity, query = rest.split('?', 1)
+    except ValueError:
+        entity, query = rest, ''
+
+    if not entity:
+        raise XMPPURIParseError("Empty URI path component")
+
+    try:
+        service = JID(entity)
+    except Exception, e:
+        raise XMPPURIParseError("Invalid JID: %s" % e)
+
+    params = urlparse.parse_qs(query)
+
+    try:
+        nodeIdentifier = params['node'][0]
+    except (KeyError, ValueError):
+        nodeIdentifier = ''
+
+    return service, nodeIdentifier
+
+
+
+def getXMPPURI(service, nodeIdentifier):
+    """
+    Construct an XMPP URI from a service JID and node identifier.
+    """
+    return "xmpp:%s?;node=%s" % (service.full(), nodeIdentifier or '')
+
+
+
+def _parseContentType(header):
+    """
+    Parse a Content-Type header value to a L{mimetools.Message}.
+
+    L{mimetools.Message} parses a Content-Type header and makes the
+    components available with its C{getmaintype}, C{getsubtype}, C{gettype},
+    C{getplist} and C{getparam} methods.
+    """
+    return mimetools.Message(StringIO(b'Content-Type: ' + header))
+
+
+
+def _asyncResponse(render):
+    """
+    """
+    def wrapped(self, request):
+        def eb(failure):
+            if failure.check(Error):
+                err = failure.value
+            else:
+                log.err(failure)
+                err = Error(500)
+            request.setResponseCode(err.status, err.message)
+            return err.response
+
+        def finish(result):
+            if result is server.NOT_DONE_YET:
+                return
+
+            if result:
+                request.write(result)
+            request.finish()
+
+        d = defer.maybeDeferred(render, self, request)
+        d.addErrback(eb)
+        d.addCallback(finish)
+
+        return server.NOT_DONE_YET
+
+    return wrapped
+
+
+
+class CreateResource(resource.Resource):
+    """
+    A resource to create a publish-subscribe node.
+    """
+    def __init__(self, backend, serviceJID, owner):
+        self.backend = backend
+        self.serviceJID = serviceJID
+        self.owner = owner
+
+
+    http_GET = None
+
+
+    @_asyncResponse
+    def render_POST(self, request):
+        """
+        Respond to a POST request to create a new node.
+        """
+
+        def toResponse(nodeIdentifier):
+            uri = getXMPPURI(self.serviceJID, nodeIdentifier)
+            body = simplejson.dumps({'uri': uri})
+            request.setHeader(b'Content-Type', MIME_JSON)
+            return body
+
+        d = self.backend.createNode(None, self.owner)
+        d.addCallback(toResponse)
+        return d
+
+
+
+class DeleteResource(resource.Resource):
+    """
+    A resource to create a publish-subscribe node.
+    """
+    def __init__(self, backend, serviceJID, owner):
+        self.backend = backend
+        self.serviceJID = serviceJID
+        self.owner = owner
+
+
+    render_GET = None
+
+
+    @_asyncResponse
+    def render_POST(self, request):
+        """
+        Respond to a POST request to create a new node.
+        """
+        def toResponse(result):
+            request.setResponseCode(http.NO_CONTENT)
+
+        def trapNotFound(failure):
+            failure.trap(error.NodeNotFound)
+            raise Error(http.NOT_FOUND, "Node not found")
+
+        if not request.args.get('uri'):
+            raise Error(http.BAD_REQUEST, "No URI given")
+
+        try:
+            jid, nodeIdentifier = getServiceAndNode(request.args['uri'][0])
+        except XMPPURIParseError, e:
+            raise Error(http.BAD_REQUEST, "Malformed XMPP URI: %s" % e)
+
+
+        data = request.content.read()
+        if data:
+            params = simplejson.loads(data)
+            redirectURI = params.get('redirect_uri', None)
+        else:
+            redirectURI = None
+
+        d = self.backend.deleteNode(nodeIdentifier, self.owner,
+                                    redirectURI)
+        d.addCallback(toResponse)
+        d.addErrback(trapNotFound)
+        return d
+
+
+
+class PublishResource(resource.Resource):
+    """
+    A resource to publish to a publish-subscribe node.
+    """
+
+    def __init__(self, backend, serviceJID, owner):
+        self.backend = backend
+        self.serviceJID = serviceJID
+        self.owner = owner
+
+
+    render_GET = None
+
+
+    def checkMediaType(self, request):
+        ctype = request.getHeader(b'content-type')
+
+        if not ctype:
+            request.setResponseCode(http.BAD_REQUEST)
+
+            raise Error(http.BAD_REQUEST, b"No specified Media Type")
+
+        message = _parseContentType(ctype)
+        if (message.maintype != b'application' or
+            message.subtype != b'atom+xml' or
+            message.getparam(b'type') != b'entry' or
+            (message.getparam(b'charset') or b'utf-8') != b'utf-8'):
+            raise Error(http.UNSUPPORTED_MEDIA_TYPE,
+                              b"Unsupported Media Type: %s" % ctype)
+
+
+    @_asyncResponse
+    def render_POST(self, request):
+        """
+        Respond to a POST request to create a new item.
+        """
+
+        def toResponse(nodeIdentifier):
+            uri = getXMPPURI(self.serviceJID, nodeIdentifier)
+            body = simplejson.dumps({'uri': uri})
+            request.setHeader(b'Content-Type', MIME_JSON)
+            return body
+
+        def gotNode(nodeIdentifier, payload):
+            item = Item(id='current', payload=payload)
+            d = self.backend.publish(nodeIdentifier, [item], self.owner)
+            d.addCallback(lambda _: nodeIdentifier)
+            return d
+
+        def getNode():
+            if request.args.get('uri'):
+                jid, nodeIdentifier = getServiceAndNode(request.args['uri'][0])
+                return defer.succeed(nodeIdentifier)
+            else:
+                return self.backend.createNode(None, self.owner)
+
+        def trapNotFound(failure):
+            failure.trap(error.NodeNotFound)
+            raise Error(http.NOT_FOUND, "Node not found")
+
+        def trapXMPPURIParseError(failure):
+            failure.trap(XMPPURIParseError)
+            raise Error(http.BAD_REQUEST,
+                        "Malformed XMPP URI: %s" % failure.value)
+
+        self.checkMediaType(request)
+        payload = parseXml(request.content.read())
+        d = getNode()
+        d.addCallback(gotNode, payload)
+        d.addCallback(toResponse)
+        d.addErrback(trapNotFound)
+        d.addErrback(trapXMPPURIParseError)
+        return d
+
+
+
+class ListResource(resource.Resource):
+    def __init__(self, service):
+        self.service = service
+
+
+    @_asyncResponse
+    def render_GET(self, request):
+        def responseFromNodes(nodeIdentifiers):
+            body = simplejson.dumps(nodeIdentifiers)
+            request.setHeader(b'Content-Type', MIME_JSON)
+            return body
+
+        d = self.service.getNodes()
+        d.addCallback(responseFromNodes)
+        return d
+
+
+
+# Service for subscribing to remote XMPP Pubsub nodes and web resources
+
+def extractAtomEntries(items):
+    """
+    Extract atom entries from a list of publish-subscribe items.
+
+    @param items: List of L{domish.Element}s that represent publish-subscribe
+        items.
+    @type items: C{list}
+    """
+
+    atomEntries = []
+
+    for item in items:
+        # ignore non-items (i.e. retractions)
+        if item.name != 'item':
+            continue
+
+        atomEntry = None
+        for element in item.elements():
+            # extract the first element that is an atom entry
+            if element.uri == NS_ATOM and element.name == 'entry':
+                atomEntry = element
+                break
+
+        if atomEntry:
+            atomEntries.append(atomEntry)
+
+    return atomEntries
+
+
+
+def constructFeed(service, nodeIdentifier, entries, title):
+    nodeURI = getXMPPURI(service, nodeIdentifier)
+    now = strftime("%Y-%m-%dT%H:%M:%SZ", gmtime())
+
+    # Collect the received entries in a feed
+    feed = domish.Element((NS_ATOM, 'feed'))
+    feed.addElement('title', content=title)
+    feed.addElement('id', content=nodeURI)
+    feed.addElement('updated', content=now)
+
+    for entry in entries:
+        feed.addChild(entry)
+
+    return feed
+
+
+
+class RemoteSubscriptionService(service.Service, PubSubClient):
+    """
+    Service for subscribing to remote XMPP Publish-Subscribe nodes.
+
+    Subscriptions are created with a callback HTTP URI that is POSTed
+    to with the received items in notifications.
+    """
+
+    def __init__(self, jid, storage):
+        self.jid = jid
+        self.storage = storage
+
+
+    def trapNotFound(self, failure):
+        failure.trap(StanzaError)
+
+        if failure.value.condition == 'item-not-found':
+            raise error.NodeNotFound()
+        else:
+            return failure
+
+
+    def subscribeCallback(self, jid, nodeIdentifier, callback):
+        """
+        Subscribe a callback URI.
+
+        This registers a callback URI to be called when a notification is
+        received for the given node.
+
+        If this is the first callback registered for this node, the gateway
+        will subscribe to the node. Otherwise, the most recently published item
+        for this node is retrieved and, if present, the newly registered
+        callback will be called with that item.
+        """
+
+        def callbackForLastItem(items):
+            atomEntries = extractAtomEntries(items)
+
+            if not atomEntries:
+                return
+
+            self._postTo([callback], jid, nodeIdentifier, atomEntries[0],
+                         MIME_ATOM_ENTRY)
+
+        def subscribeOrItems(hasCallbacks):
+            if hasCallbacks:
+                if not nodeIdentifier:
+                    return None
+                d = self.items(jid, nodeIdentifier, 1)
+                d.addCallback(callbackForLastItem)
+            else:
+                d = self.subscribe(jid, nodeIdentifier, self.jid)
+
+            d.addErrback(self.trapNotFound)
+            return d
+
+        d = self.storage.hasCallbacks(jid, nodeIdentifier)
+        d.addCallback(subscribeOrItems)
+        d.addCallback(lambda _: self.storage.addCallback(jid, nodeIdentifier,
+                                                         callback))
+        return d
+
+
+    def unsubscribeCallback(self, jid, nodeIdentifier, callback):
+        """
+        Unsubscribe a callback.
+
+        If this was the last registered callback for this node, the
+        gateway will unsubscribe from node.
+        """
+
+        def cb(last):
+            if last:
+                return self.unsubscribe(jid, nodeIdentifier, self.jid)
+
+        d = self.storage.removeCallback(jid, nodeIdentifier, callback)
+        d.addCallback(cb)
+        return d
+
+
+    def itemsReceived(self, event):
+        """
+        Fire up HTTP client to do callback
+        """
+
+        atomEntries = extractAtomEntries(event.items)
+        service = event.sender
+        nodeIdentifier = event.nodeIdentifier
+        headers = event.headers
+
+        # Don't notify if there are no atom entries
+        if not atomEntries:
+            return
+
+        if len(atomEntries) == 1:
+            contentType = MIME_ATOM_ENTRY
+            payload = atomEntries[0]
+        else:
+            contentType = MIME_ATOM_FEED
+            payload = constructFeed(service, nodeIdentifier, atomEntries,
+                                    title='Received item collection')
+
+        self.callCallbacks(service, nodeIdentifier, payload, contentType)
+
+        if 'Collection' in headers:
+            for collection in headers['Collection']:
+                nodeIdentifier = collection or ''
+                self.callCallbacks(service, nodeIdentifier, payload,
+                                   contentType)
+
+
+    def deleteReceived(self, event):
+        """
+        Fire up HTTP client to do callback
+        """
+
+        service = event.sender
+        nodeIdentifier = event.nodeIdentifier
+        redirectURI = event.redirectURI
+        self.callCallbacks(service, nodeIdentifier, eventType='DELETED',
+                           redirectURI=redirectURI)
+
+
+    def _postTo(self, callbacks, service, nodeIdentifier,
+                      payload=None, contentType=None, eventType=None,
+                      redirectURI=None):
+
+        if not callbacks:
+            return
+
+        postdata = None
+        nodeURI = getXMPPURI(service, nodeIdentifier)
+        headers = {'Referer': nodeURI.encode('utf-8'),
+                   'PubSub-Service': service.full().encode('utf-8')}
+
+        if payload:
+            postdata = payload.toXml().encode('utf-8')
+            if contentType:
+                headers['Content-Type'] = "%s;charset=utf-8" % contentType
+
+        if eventType:
+            headers['Event'] = eventType
+
+        if redirectURI:
+            headers['Link'] = '<%s>; rel=alternate' % (
+                              redirectURI.encode('utf-8'),
+                              )
+
+        def postNotification(callbackURI):
+            f = getPageWithFactory(str(callbackURI),
+                                   method='POST',
+                                   postdata=postdata,
+                                   headers=headers)
+            d = f.deferred
+            d.addErrback(log.err)
+
+        for callbackURI in callbacks:
+            reactor.callLater(0, postNotification, callbackURI)
+
+
+    def callCallbacks(self, service, nodeIdentifier,
+                            payload=None, contentType=None, eventType=None,
+                            redirectURI=None):
+
+        def eb(failure):
+            failure.trap(error.NoCallbacks)
+
+            # No callbacks were registered for this node. Unsubscribe?
+
+        d = self.storage.getCallbacks(service, nodeIdentifier)
+        d.addCallback(self._postTo, service, nodeIdentifier, payload,
+                                    contentType, eventType, redirectURI)
+        d.addErrback(eb)
+        d.addErrback(log.err)
+
+
+
+class RemoteSubscribeBaseResource(resource.Resource):
+    """
+    Base resource for remote pubsub node subscription and unsubscription.
+
+    This resource accepts POST request with a JSON document that holds
+    a dictionary with the keys C{uri} and C{callback} that respectively map
+    to the XMPP URI of the publish-subscribe node and the callback URI.
+
+    This class should be inherited with L{serviceMethod} overridden.
+
+    @cvar serviceMethod: The name of the method to be called with
+                         the JID of the pubsub service, the node identifier
+                         and the callback URI as received in the HTTP POST
+                         request to this resource.
+    """
+    serviceMethod = None
+    errorMap = {
+            error.NodeNotFound:
+                (http.FORBIDDEN, "Node not found"),
+            error.NotSubscribed:
+                (http.FORBIDDEN, "No such subscription found"),
+            error.SubscriptionExists:
+                (http.FORBIDDEN, "Subscription already exists"),
+    }
+
+    def __init__(self, service):
+        self.service = service
+        self.params = None
+
+
+    render_GET = None
+
+
+    @_asyncResponse
+    def render_POST(self, request):
+        def trapNotFound(failure):
+            err = failure.trap(*self.errorMap.keys())
+            status, message = self.errorMap[err]
+            raise Error(status, message)
+
+        def toResponse(result):
+            request.setResponseCode(http.NO_CONTENT)
+            return b''
+
+        def trapXMPPURIParseError(failure):
+            failure.trap(XMPPURIParseError)
+            raise Error(http.BAD_REQUEST,
+                        "Malformed XMPP URI: %s" % failure.value)
+
+        data = request.content.read()
+        self.params = simplejson.loads(data)
+
+        uri = self.params['uri']
+        callback = self.params['callback']
+
+        jid, nodeIdentifier = getServiceAndNode(uri)
+        method = getattr(self.service, self.serviceMethod)
+        d = method(jid, nodeIdentifier, callback)
+        d.addCallback(toResponse)
+        d.addErrback(trapNotFound)
+        d.addErrback(trapXMPPURIParseError)
+        return d
+
+
+
+class RemoteSubscribeResource(RemoteSubscribeBaseResource):
+    """
+    Resource to subscribe to a remote publish-subscribe node.
+
+    The passed C{uri} is the XMPP URI of the node to subscribe to and the
+    C{callback} is the callback URI. Upon receiving notifications from the
+    node, a POST request will be perfomed on the callback URI.
+    """
+    serviceMethod = 'subscribeCallback'
+
+
+
+class RemoteUnsubscribeResource(RemoteSubscribeBaseResource):
+    """
+    Resource to unsubscribe from a remote publish-subscribe node.
+
+    The passed C{uri} is the XMPP URI of the node to unsubscribe from and the
+    C{callback} is the callback URI that was registered for it.
+    """
+    serviceMethod = 'unsubscribeCallback'
+
+
+
+class RemoteItemsResource(resource.Resource):
+    """
+    Resource for retrieving items from a remote pubsub node.
+    """
+
+    def __init__(self, service):
+        self.service = service
+
+
+    @_asyncResponse
+    def render_GET(self, request):
+        try:
+            maxItems = int(request.args.get('max_items', [0])[0]) or None
+        except ValueError:
+            raise Error(http.BAD_REQUEST,
+                        "The argument max_items has an invalid value.")
+
+        try:
+            uri = request.args['uri'][0]
+        except KeyError:
+            raise Error(http.BAD_REQUEST,
+                        "No URI for the remote node provided.")
+
+        try:
+            jid, nodeIdentifier = getServiceAndNode(uri)
+        except XMPPURIParseError:
+            raise Error(http.BAD_REQUEST,
+                        "Malformed XMPP URI: %s" % uri)
+
+        def toResponse(items):
+            """
+            Create a feed out the retrieved items.
+            """
+            atomEntries = extractAtomEntries(items)
+            feed = constructFeed(jid, nodeIdentifier, atomEntries,
+                                    "Retrieved item collection")
+            body = feed.toXml().encode('utf-8')
+            request.setHeader(b'Content-Type', MIME_ATOM_FEED)
+            return body
+
+        def trapNotFound(failure):
+            failure.trap(StanzaError)
+            if not failure.value.condition == 'item-not-found':
+                raise failure
+            raise Error(http.NOT_FOUND, "Node not found")
+
+        d = self.service.items(jid, nodeIdentifier, maxItems)
+        d.addCallback(toResponse)
+        d.addErrback(trapNotFound)
+        return d
+
+
+
+# Client side code to interact with a service as provided above
+
+def getPageWithFactory(url, contextFactory=None, *args, **kwargs):
+    """Download a web page.
+
+    Download a page. Return the factory that holds a deferred, which will
+    callback with a page (as a string) or errback with a description of the
+    error.
+
+    See HTTPClientFactory to see what extra args can be passed.
+    """
+
+    factory = client.HTTPClientFactory(url, *args, **kwargs)
+    factory.protocol.handleStatus_204 = lambda self: self.handleStatus_200()
+
+    if factory.scheme == 'https':
+        from twisted.internet import ssl
+        if contextFactory is None:
+            contextFactory = ssl.ClientContextFactory()
+        reactor.connectSSL(factory.host, factory.port, factory, contextFactory)
+    else:
+        reactor.connectTCP(factory.host, factory.port, factory)
+    return factory
+
+
+
+class CallbackResource(resource.Resource):
+    """
+    Web resource for retrieving gateway notifications.
+    """
+
+    def __init__(self, callback):
+        self.callback = callback
+
+
+    http_GET = None
+
+
+    def render_POST(self, request):
+        if request.requestHeaders.hasHeader(b'Event'):
+            payload = None
+        else:
+            payload = parseXml(request.content.read())
+
+        self.callback(payload, request.requestHeaders)
+
+        request.setResponseCode(http.NO_CONTENT)
+        return b''
+
+
+
+
+class GatewayClient(service.Service):
+    """
+    Service that provides client access to the HTTP Gateway into Idavoll.
+    """
+
+    agent = "Idavoll HTTP Gateway Client"
+
+    def __init__(self, baseURI, callbackHost=None, callbackPort=None):
+        self.baseURI = baseURI
+        self.callbackHost = callbackHost or 'localhost'
+        self.callbackPort = callbackPort or 8087
+        root = resource.Resource()
+        root.putChild('callback', CallbackResource(
+                lambda *args, **kwargs: self.callback(*args, **kwargs)))
+        self.site = server.Site(root)
+
+
+    def startService(self):
+        self.port = reactor.listenTCP(self.callbackPort,
+                                      self.site)
+
+
+    def stopService(self):
+        return self.port.stopListening()
+
+
+    def _makeURI(self, verb, query=None):
+        uriComponents = urlparse.urlparse(self.baseURI)
+        uri = urlparse.urlunparse((uriComponents[0],
+                                   uriComponents[1],
+                                   uriComponents[2] + verb,
+                                   '',
+                                   query and urllib.urlencode(query) or '',
+                                   ''))
+        return uri
+
+
+    def callback(self, data, headers):
+        pass
+
+
+    def ping(self):
+        f = getPageWithFactory(self._makeURI(''),
+                               method='HEAD',
+                               agent=self.agent)
+        return f.deferred
+
+
+    def create(self):
+        f = getPageWithFactory(self._makeURI('create'),
+                    method='POST',
+                    agent=self.agent)
+        return f.deferred.addCallback(simplejson.loads)
+
+
+    def delete(self, xmppURI, redirectURI=None):
+        query = {'uri': xmppURI}
+
+        if redirectURI:
+            params = {'redirect_uri': redirectURI}
+            postdata = simplejson.dumps(params)
+            headers = {'Content-Type': MIME_JSON}
+        else:
+            postdata = None
+            headers = None
+
+        f = getPageWithFactory(self._makeURI('delete', query),
+                    method='POST',
+                    postdata=postdata,
+                    headers=headers,
+                    agent=self.agent)
+        return f.deferred
+
+
+    def publish(self, entry, xmppURI=None):
+        query = xmppURI and {'uri': xmppURI}
+
+        f = getPageWithFactory(self._makeURI('publish', query),
+                    method='POST',
+                    postdata=entry.toXml().encode('utf-8'),
+                    headers={'Content-Type': MIME_ATOM_ENTRY},
+                    agent=self.agent)
+        return f.deferred.addCallback(simplejson.loads)
+
+
+    def listNodes(self):
+        f = getPageWithFactory(self._makeURI('list'),
+                    method='GET',
+                    agent=self.agent)
+        return f.deferred.addCallback(simplejson.loads)
+
+
+    def subscribe(self, xmppURI):
+        params = {'uri': xmppURI,
+                  'callback': 'http://%s:%s/callback' % (self.callbackHost,
+                                                         self.callbackPort)}
+        f = getPageWithFactory(self._makeURI('subscribe'),
+                    method='POST',
+                    postdata=simplejson.dumps(params),
+                    headers={'Content-Type': MIME_JSON},
+                    agent=self.agent)
+        return f.deferred
+
+
+    def unsubscribe(self, xmppURI):
+        params = {'uri': xmppURI,
+                  'callback': 'http://%s:%s/callback' % (self.callbackHost,
+                                                         self.callbackPort)}
+        f = getPageWithFactory(self._makeURI('unsubscribe'),
+                    method='POST',
+                    postdata=simplejson.dumps(params),
+                    headers={'Content-Type': MIME_JSON},
+                    agent=self.agent)
+        return f.deferred
+
+
+    def items(self, xmppURI, maxItems=None):
+        query = {'uri': xmppURI}
+        if maxItems is not None:
+             query['max_items'] = int(maxItems)
+        f = getPageWithFactory(self._makeURI('items', query),
+                    method='GET',
+                    agent=self.agent)
+        return f.deferred