Mercurial > libervia-pubsub
diff idavoll/gateway.py @ 185:9038908dc2f5
Add gateway support for retrieving items from a node. Reorder gateway module.
author | Ralph Meijer <ralphm@ik.nu> |
---|---|
date | Tue, 15 Apr 2008 17:32:56 +0000 |
parents | c21b986cff30 |
children | 365fd3e4daf8 |
line wrap: on
line diff
--- a/idavoll/gateway.py Fri Apr 11 14:48:32 2008 +0000 +++ b/idavoll/gateway.py Tue Apr 15 17:32:56 2008 +0000 @@ -34,170 +34,13 @@ MIME_ATOM_ENTRY = 'application/atom+xml;type=entry' MIME_JSON = 'application/json' -class RemoteSubscriptionService(service.Service, PubSubClient): - - def __init__(self, jid): - self.jid = jid - - def startService(self): - self.callbacks = {} - - def trapNotFound(self, failure): - failure.trap(StanzaError) - if not failure.value.condition == 'item-not-found': - raise failure - raise error.NodeNotFound - - def subscribeCallback(self, jid, nodeIdentifier, callback): - - def newCallbackList(result): - callbackList = set() - self.callbacks[jid, nodeIdentifier] = callbackList - return callbackList - - try: - callbackList = self.callbacks[jid, nodeIdentifier] - except KeyError: - d = self.subscribe(jid, nodeIdentifier, self.jid) - d.addCallback(newCallbackList) - else: - d = defer.succeed(callbackList) - - d.addCallback(lambda callbackList: callbackList.add(callback)) - d.addErrback(self.trapNotFound) - return d - - def unsubscribeCallback(self, jid, nodeIdentifier, callback): - try: - callbackList = self.callbacks[jid, nodeIdentifier] - callbackList.remove(callback) - except KeyError: - return defer.fail(error.NotSubscribed()) - - if not callbackList: - self.unsubscribe(jid, nodeIdentifier, self.jid) - - return defer.succeed(None) - - def itemsReceived(self, recipient, service, nodeIdentifier, items): - """ - Fire up HTTP client to do callback - """ - - # Collect atom entries - atomEntries = [] - for item in items: - # ignore non-items (i.e. retractions) - if item.name != 'item': - continue - - atomEntry = None - for element in item.elements(): - # extract the first element that is an atom entry - if element.uri == NS_ATOM and element.name == 'entry': - atomEntry = element - break - - if atomEntry: - atomEntries.append(atomEntry) - - # Don't notify if there are no atom entries - if not atomEntries: - return - - if len(atomEntries) == 1: - contentType = 'application/atom+xml;type=entry' - payload = atomEntries[0] - else: - contentType = 'application/atom+xml;type=feed' - nodeURI = 'xmpp:%s?;node=%s' % (service.full(), nodeIdentifier) - now = strftime("%Y-%m-%dT%H:%M:%SZ", gmtime()) - - # Collect the received entries in a feed - payload = domish.Element((NS_ATOM, 'feed')) - payload.addElement('title', content='Received item collection') - payload.addElement('id', content=nodeURI) - payload.addElement('updated', content=now) - for atomEntry in atomEntries: - payload.addChild(atomEntry) - - self.callCallbacks(recipient, service, nodeIdentifier, payload, - contentType) - - def deleteReceived(self, recipient, service, nodeIdentifier): - """ - Fire up HTTP client to do callback - """ - - self.callCallbacks(recipient, service, nodeIdentifier, - eventType='DELETED') - - def callCallbacks(self, recipient, service, nodeIdentifier, - payload=None, contentType=None, eventType=None): - try: - callbacks = self.callbacks[service, nodeIdentifier] - except KeyError: - return - - postdata = None - nodeURI = 'xmpp:%s?;node=%s' % (service.full(), nodeIdentifier) - headers = {'Referer': nodeURI.encode('utf-8'), - 'PubSub-Service': service.full().encode('utf-8')} - - if payload: - postdata = payload.toXml().encode('utf-8') - if contentType: - headers['Content-Type'] = "%s;charset=utf-8" % contentType - - if eventType: - headers['Event'] = eventType - - def postNotification(callbackURI): - d = client.getPage(str(callbackURI), - method='POST', - postdata=postdata, - headers=headers) - d.addErrback(log.err) - - for callbackURI in callbacks: - reactor.callLater(0, postNotification, callbackURI) - - -class WebStreamParser(object): - def __init__(self): - self.elementStream = domish.elementStream() - self.elementStream.DocumentStartEvent = self.docStart - self.elementStream.ElementEvent = self.elem - self.elementStream.DocumentEndEvent = self.docEnd - self.done = False - - def docStart(self, elem): - self.document = elem - - def elem(self, elem): - self.document.addChild(elem) - - def docEnd(self): - self.done = True - - def parse(self, stream): - def endOfStream(result): - if not self.done: - raise Exception("No more stuff?") - else: - return self.document - - d = readStream(stream, self.elementStream.parse) - d.addCallback(endOfStream) - return d - - class XMPPURIParseError(ValueError): """ Raised when a given XMPP URI couldn't be properly parsed. """ + def getServiceAndNode(uri): """ Given an XMPP URI, extract the publish subscribe service JID and node ID. @@ -237,6 +80,41 @@ return jid, nodeIdentifier + +class WebStreamParser(object): + def __init__(self): + self.elementStream = domish.elementStream() + self.elementStream.DocumentStartEvent = self.docStart + self.elementStream.ElementEvent = self.elem + self.elementStream.DocumentEndEvent = self.docEnd + self.done = False + + + def docStart(self, elem): + self.document = elem + + + def elem(self, elem): + self.document.addChild(elem) + + + def docEnd(self): + self.done = True + + + def parse(self, stream): + def endOfStream(result): + if not self.done: + raise Exception("No more stuff?") + else: + return self.document + + d = readStream(stream, self.elementStream.parse) + d.addCallback(endOfStream) + return d + + + class CreateResource(resource.Resource): """ A resource to create a publish-subscribe node. @@ -246,8 +124,10 @@ self.serviceJID = serviceJID self.owner = owner + http_GET = None + def http_POST(self, request): """ Respond to a POST request to create a new node. @@ -263,6 +143,7 @@ return d + class DeleteResource(resource.Resource): """ A resource to create a publish-subscribe node. @@ -272,8 +153,10 @@ self.serviceJID = serviceJID self.owner = owner + http_GET = None + def http_POST(self, request): """ Respond to a POST request to create a new node. @@ -308,6 +191,7 @@ return d + class PublishResource(resource.Resource): """ A resource to publish to a publish-subscribe node. @@ -318,8 +202,10 @@ self.serviceJID = serviceJID self.owner = owner + http_GET = None + def checkMediaType(self, request): ctype = request.headers.getHeader('content-type') @@ -339,10 +225,12 @@ "Unsupported Media Type: %s" % http_headers.generateContentType(ctype))) + def parseXMLPayload(self, stream): p = WebStreamParser() return p.parse(stream) + def http_POST(self, request): """ Respond to a POST request to create a new item. @@ -390,7 +278,189 @@ return d -class SubscribeBaseResource(resource.Resource): + +class ListResource(resource.Resource): + def __init__(self, service): + self.service = service + + + def render(self, request): + def responseFromNodes(nodeIdentifiers): + return http.Response(responsecode.OK, + stream=simplejson.dumps(nodeIdentifiers)) + + d = self.service.get_nodes() + d.addCallback(responseFromNodes) + return d + + + +# Service for subscribing to remote XMPP Pubsub nodes and web resources + +def extractAtomEntries(items): + """ + Extract atom entries from a list of publish-subscribe items. + + @param items: List of L{domish.Element}s that represent publish-subscribe + items. + @type items: C{list} + """ + + atomEntries = [] + + for item in items: + # ignore non-items (i.e. retractions) + if item.name != 'item': + continue + + atomEntry = None + for element in item.elements(): + # extract the first element that is an atom entry + if element.uri == NS_ATOM and element.name == 'entry': + atomEntry = element + break + + if atomEntry: + atomEntries.append(atomEntry) + + return atomEntries + +def constructFeed(service, nodeIdentifier, entries, title): + nodeURI = 'xmpp:%s?;node=%s' % (service.full(), nodeIdentifier) + now = strftime("%Y-%m-%dT%H:%M:%SZ", gmtime()) + + # Collect the received entries in a feed + feed = domish.Element((NS_ATOM, 'feed')) + feed.addElement('title', content=title) + feed.addElement('id', content=nodeURI) + feed.addElement('updated', content=now) + + for entry in entries: + feed.addChild(entry) + + return feed + +class RemoteSubscriptionService(service.Service, PubSubClient): + """ + Service for subscribing to remote XMPP Publish-Subscribe nodes. + + Subscriptions are created with a callback HTTP URI that is POSTed + to with the received items in notifications. + """ + + def __init__(self, jid): + self.jid = jid + + + def startService(self): + self.callbacks = {} + + + def trapNotFound(self, failure): + failure.trap(StanzaError) + if not failure.value.condition == 'item-not-found': + raise failure + raise error.NodeNotFound + + + def subscribeCallback(self, jid, nodeIdentifier, callback): + + def newCallbackList(result): + callbackList = set() + self.callbacks[jid, nodeIdentifier] = callbackList + return callbackList + + try: + callbackList = self.callbacks[jid, nodeIdentifier] + except KeyError: + d = self.subscribe(jid, nodeIdentifier, self.jid) + d.addCallback(newCallbackList) + else: + d = defer.succeed(callbackList) + + d.addCallback(lambda callbackList: callbackList.add(callback)) + d.addErrback(self.trapNotFound) + return d + + + def unsubscribeCallback(self, jid, nodeIdentifier, callback): + try: + callbackList = self.callbacks[jid, nodeIdentifier] + callbackList.remove(callback) + except KeyError: + return defer.fail(error.NotSubscribed()) + + if not callbackList: + self.unsubscribe(jid, nodeIdentifier, self.jid) + + return defer.succeed(None) + + + def itemsReceived(self, recipient, service, nodeIdentifier, items): + """ + Fire up HTTP client to do callback + """ + + atomEntries = extractAtomEntries(items) + + # Don't notify if there are no atom entries + if not atomEntries: + return + + if len(atomEntries) == 1: + contentType = 'application/atom+xml;type=entry' + payload = atomEntries[0] + else: + contentType = 'application/atom+xml;type=feed' + payload = constructFeed(service, nodeIdentifier, atomEntries, + title='Received item collection') + + self.callCallbacks(recipient, service, nodeIdentifier, payload, + contentType) + + + def deleteReceived(self, recipient, service, nodeIdentifier): + """ + Fire up HTTP client to do callback + """ + + self.callCallbacks(recipient, service, nodeIdentifier, + eventType='DELETED') + + + def callCallbacks(self, recipient, service, nodeIdentifier, + payload=None, contentType=None, eventType=None): + try: + callbacks = self.callbacks[service, nodeIdentifier] + except KeyError: + return + + postdata = None + nodeURI = 'xmpp:%s?;node=%s' % (service.full(), nodeIdentifier) + headers = {'Referer': nodeURI.encode('utf-8'), + 'PubSub-Service': service.full().encode('utf-8')} + + if payload: + postdata = payload.toXml().encode('utf-8') + if contentType: + headers['Content-Type'] = "%s;charset=utf-8" % contentType + + if eventType: + headers['Event'] = eventType + + def postNotification(callbackURI): + d = client.getPage(str(callbackURI), + method='POST', + postdata=postdata, + headers=headers) + d.addErrback(log.err) + + for callbackURI in callbacks: + reactor.callLater(0, postNotification, callbackURI) + + + +class RemoteSubscribeBaseResource(resource.Resource): """ Base resource for remote pubsub node subscription and unsubscription. @@ -411,8 +481,10 @@ self.service = service self.params = None + http_GET = None + def http_POST(self, request): def trapNotFound(failure): failure.trap(error.NodeNotFound) @@ -447,7 +519,8 @@ return d -class SubscribeResource(SubscribeBaseResource): + +class RemoteSubscribeResource(RemoteSubscribeBaseResource): """ Resource to subscribe to a remote publish-subscribe node. @@ -458,7 +531,8 @@ serviceMethod = 'subscribeCallback' -class UnsubscribeResource(SubscribeBaseResource): + +class RemoteUnsubscribeResource(RemoteSubscribeBaseResource): """ Resource to unsubscribe from a remote publish-subscribe node. @@ -468,20 +542,60 @@ serviceMethod = 'unsubscribeCallback' -class ListResource(resource.Resource): + +class RemoteItemsResource(resource.Resource): + """ + Resource for retrieving items from a remote pubsub node. + """ + def __init__(self, service): self.service = service def render(self, request): - def responseFromNodes(nodeIdentifiers): - return http.Response(responsecode.OK, - stream=simplejson.dumps(nodeIdentifiers)) + try: + maxItems = int(request.args.get('max_items', [0])[0]) or None + except ValueError: + return http.StatusResponse(responsecode.BAD_REQUEST, + "The argument max_items has an invalid value.") + + try: + jid, nodeIdentifier = getServiceAndNode(request.args['uri'][0]) + except KeyError: + return http.StatusResponse(responsecode.BAD_REQUEST, + "No URI for the remote node provided.") + except XMPPURIParseError: + return http.StatusResponse(responsecode.BAD_REQUEST, + "Malformed XMPP URI: %s" % failure.value.message) + - d = self.service.get_nodes() - d.addCallback(responseFromNodes) + def respond(items): + """Create a feed out the retrieved items.""" + contentType = http_headers.MimeType('application', + 'atom+xml', + {'type': 'feed'}) + atomEntries = extractAtomEntries(items) + feed = constructFeed(jid, nodeIdentifier, atomEntries, + "Retrieved item collection") + payload = feed.toXml().encode('utf-8') + return http.Response(responsecode.OK, stream=payload, + headers={'Content-Type': contentType}) + + def trapNotFound(failure): + failure.trap(StanzaError) + if not failure.value.condition == 'item-not-found': + raise failure + return http.StatusResponse(responsecode.NOT_FOUND, + "Node not found") + + d = self.service.items(jid, nodeIdentifier) + d.addCallback(respond) + d.addErrback(trapNotFound) return d + +# Client side code to interact with a service as provided above + def getPageWithFactory(url, contextFactory=None, *args, **kwargs): """Download a web page. @@ -506,6 +620,7 @@ return factory + class CallbackResource(resource.Resource): """ Web resource for retrieving gateway notifications. @@ -514,8 +629,10 @@ def __init__(self, callback): self.callback = callback + http_GET = None + def http_POST(self, request): p = WebStreamParser() d = p.parse(request.stream) @@ -523,6 +640,8 @@ d.addCallback(lambda _: http.Response(responsecode.NO_CONTENT)) return d + + class GatewayClient(service.Service): """ Service that provides client access to the HTTP Gateway into Idavoll. @@ -538,13 +657,16 @@ root.child_callback = CallbackResource(lambda *args, **kwargs: self.callback(*args, **kwargs)) self.site = server.Site(root) + def startService(self): self.port = reactor.listenTCP(self.callbackPort, channel.HTTPFactory(self.site)) + def stopService(self): return self.port.stopListening() + def _makeURI(self, verb, query=None): uriComponents = urlparse.urlparse(self.baseURI) uri = urlparse.urlunparse((uriComponents[0], @@ -555,15 +677,18 @@ '')) return uri + def callback(self, data, headers): pass + def create(self): f = getPageWithFactory(self._makeURI('create'), method='POST', agent=self.agent) return f.deferred.addCallback(simplejson.loads) + def publish(self, entry, xmppURI=None): query = xmppURI and {'uri': xmppURI} @@ -574,12 +699,14 @@ agent=self.agent) return f.deferred.addCallback(simplejson.loads) + def listNodes(self): f = getPageWithFactory(self._makeURI('list'), method='GET', agent=self.agent) return f.deferred.addCallback(simplejson.loads) + def subscribe(self, xmppURI): params = {'uri': xmppURI, 'callback': 'http://%s:%s/callback' % (self.callbackHost, @@ -590,3 +717,13 @@ headers={'Content-Type': MIME_JSON}, agent=self.agent) return f.deferred + + + def items(self, xmppURI, maxItems=None): + query = {'uri': xmppURI} + if maxItems: + query['maxItems'] = int(maxItems) + f = getPageWithFactory(self._makeURI('items', query), + method='GET', + agent=self.agent) + return f.deferred