comparison idavoll/gateway.py @ 177:faf1c9bc2612

Add HTTP gateway in a separate plugin. Author: ralphm Fixes #8.
author Ralph Meijer <ralphm@ik.nu>
date Thu, 10 Apr 2008 11:18:29 +0000
parents
children c21b986cff30
comparison
equal deleted inserted replaced
176:17fc5dd77158 177:faf1c9bc2612
1 # Copyright (c) 2003-2008 Ralph Meijer
2 # See LICENSE for details.
3
4 import cgi
5 from time import gmtime, strftime
6
7 import simplejson
8
9 from twisted.application import service
10 from twisted.internet import defer, reactor
11 from twisted.web import client
12 from twisted.web2 import http, http_headers, resource, responsecode
13 from twisted.web2.stream import readStream
14 from twisted.words.protocols.jabber.jid import JID
15 from twisted.words.protocols.jabber.error import StanzaError
16 from twisted.words.xish import domish
17
18 from wokkel.pubsub import Item
19 from wokkel.pubsub import PubSubClient
20
21 from idavoll import error
22
23 NS_ATOM = 'http://www.w3.org/2005/Atom'
24
25
26 class RemoteSubscriptionService(service.Service, PubSubClient):
27
28 def __init__(self, jid):
29 self.jid = jid
30
31 def startService(self):
32 self.callbacks = {}
33
34 def trapNotFound(self, failure):
35 failure.trap(StanzaError)
36 if not failure.value.condition == 'item-not-found':
37 raise failure
38 raise error.NodeNotFound
39
40 def subscribeCallback(self, jid, nodeIdentifier, callback):
41
42 def newCallbackList(result):
43 callbackList = set()
44 self.callbacks[jid, nodeIdentifier] = callbackList
45 return callbackList
46
47 try:
48 callbackList = self.callbacks[jid, nodeIdentifier]
49 except KeyError:
50 d = self.subscribe(jid, nodeIdentifier, self.jid)
51 d.addCallback(newCallbackList)
52 else:
53 d = defer.succeed(callbackList)
54
55 d.addCallback(lambda callbackList: callbackList.add(callback))
56 d.addErrback(self.trapNotFound)
57 return d
58
59 def unsubscribeCallback(self, jid, nodeIdentifier, callback):
60 try:
61 callbackList = self.callbacks[jid, nodeIdentifier]
62 callbackList.remove(callback)
63 except KeyError:
64 return defer.fail(error.NotSubscribed())
65
66 if not callbackList:
67 self.unsubscribe(jid, nodeIdentifier, self.jid)
68
69 return defer.succeed(None)
70
71 def itemsReceived(self, recipient, service, nodeIdentifier, items):
72 """
73 Fire up HTTP client to do callback
74 """
75
76 # Collect atom entries
77 atomEntries = []
78 for item in items:
79 # ignore non-items (i.e. retractions)
80 if item.name != 'item':
81 continue
82
83 atomEntry = None
84 for element in item.elements():
85 # extract the first element that is an atom entry
86 if element.uri == NS_ATOM and element.name == 'entry':
87 atomEntry = element
88 break
89
90 if atomEntry:
91 atomEntries.append(atomEntry)
92
93 # Don't notify if there are no atom entries
94 if not atomEntries:
95 return
96
97 if len(atomEntries) == 1:
98 contentType = 'application/atom+xml;type=entry'
99 payload = atomEntries[0]
100 else:
101 contentType = 'application/atom+xml;type=feed'
102 nodeURI = 'xmpp:%s?;node=%s' % (service.full(), nodeIdentifier)
103 now = strftime("%Y-%m-%dT%H:%M:%SZ", gmtime())
104
105 # Collect the received entries in a feed
106 payload = domish.Element((NS_ATOM, 'feed'))
107 payload.addElement('title', content='Received item collection')
108 payload.addElement('id', content=nodeURI)
109 payload.addElement('updated', content=now)
110 for atomEntry in atomEntries:
111 payload.addChild(atomEntry)
112
113 self.callCallbacks(recipient, service, nodeIdentifier, payload,
114 contentType)
115
116 def deleteReceived(self, recipient, service, nodeIdentifier):
117 """
118 Fire up HTTP client to do callback
119 """
120
121 self.callCallbacks(recipient, service, nodeIdentifier,
122 eventType='DELETED')
123
124 def callCallbacks(self, recipient, service, nodeIdentifier,
125 payload=None, contentType=None, eventType=None):
126 try:
127 callbacks = self.callbacks[service, nodeIdentifier]
128 except KeyError:
129 return
130
131 postdata = None
132 nodeURI = 'xmpp:%s?;node=%s' % (service.full(), nodeIdentifier)
133 headers = {'Referer': nodeURI.encode('utf-8'),
134 'PubSub-Service': service.full().encode('utf-8')}
135
136 if payload:
137 postdata = payload.toXml().encode('utf-8')
138 if contentType:
139 headers['Content-Type'] = "%s;charset=utf-8" % contentType
140
141 if eventType:
142 headers['Event'] = eventType
143
144 def postNotification(callbackURI):
145 return client.getPage(str(callbackURI),
146 method='POST',
147 postdata=postdata,
148 headers=headers)
149
150 for callbackURI in callbacks:
151 reactor.callLater(0, postNotification, callbackURI)
152
153
154 class WebStreamParser(object):
155 def __init__(self):
156 self.elementStream = domish.elementStream()
157 self.elementStream.DocumentStartEvent = self.docStart
158 self.elementStream.ElementEvent = self.elem
159 self.elementStream.DocumentEndEvent = self.docEnd
160 self.done = False
161
162 def docStart(self, elem):
163 self.document = elem
164
165 def elem(self, elem):
166 self.document.addChild(elem)
167
168 def docEnd(self):
169 self.done = True
170
171 def parse(self, stream):
172 def endOfStream(result):
173 if not self.done:
174 raise Exception("No more stuff?")
175 else:
176 return self.document
177
178 d = readStream(stream, self.elementStream.parse)
179 d.addCallback(endOfStream)
180 return d
181
182
183 class XMPPURIParseError(ValueError):
184 """
185 Raised when a given XMPP URI couldn't be properly parsed.
186 """
187
188
189 def getServiceAndNode(uri):
190 """
191 Given an XMPP URI, extract the publish subscribe service JID and node ID.
192 """
193
194 try:
195 scheme, rest = uri.split(':', 1)
196 except ValueError:
197 raise XMPPURIParseError("No URI scheme component")
198
199 if scheme != 'xmpp':
200 raise XMPPURIParseError("Unknown URI scheme")
201
202 if rest.startswith("//"):
203 raise XMPPURIParseError("Unexpected URI authority component")
204
205 try:
206 entity, query = rest.split('?', 1)
207 except ValueError:
208 raise XMPPURIParseError("No URI query component")
209
210 if not entity:
211 raise XMPPURIParseError("Empty URI path component")
212
213 try:
214 jid = JID(entity)
215 except Exception, e:
216 raise XMPPURIParseError("Invalid JID: %s" % e.message)
217
218 params = cgi.parse_qs(query)
219
220 try:
221 nodeIdentifier = params['node'][0]
222 except (KeyError, ValueError):
223 raise XMPPURIParseError("No node in query component of URI")
224
225 return jid, nodeIdentifier
226
227
228 class CreateResource(resource.Resource):
229 """
230 A resource to create a publish-subscribe node.
231 """
232 def __init__(self, backend, serviceJID, owner):
233 self.backend = backend
234 self.serviceJID = serviceJID
235 self.owner = owner
236
237 http_GET = None
238
239 def http_POST(self, request):
240 """
241 Respond to a POST request to create a new node.
242 """
243
244 def toResponse(nodeIdentifier):
245 uri = 'xmpp:%s?;node=%s' % (self.serviceJID.full(), nodeIdentifier)
246 stream = simplejson.dumps({'uri': uri})
247 return http.Response(responsecode.OK, stream=stream)
248
249 d = self.backend.create_node(None, self.owner)
250 d.addCallback(toResponse)
251 return d
252
253
254 class DeleteResource(resource.Resource):
255 """
256 A resource to create a publish-subscribe node.
257 """
258 def __init__(self, backend, serviceJID, owner):
259 self.backend = backend
260 self.serviceJID = serviceJID
261 self.owner = owner
262
263 http_GET = None
264
265 def http_POST(self, request):
266 """
267 Respond to a POST request to create a new node.
268 """
269
270 def respond(result):
271 return http.Response(responsecode.NO_CONTENT)
272
273 def getNode():
274 if request.args.get('uri'):
275 jid, nodeIdentifier = getServiceAndNode(request.args['uri'][0])
276 return defer.succeed(nodeIdentifier)
277 else:
278 raise http.HTTPError(http.Response(responsecode.BAD_REQUEST,
279 "No URI given"))
280
281 def trapNotFound(failure):
282 failure.trap(error.NodeNotFound)
283 return http.StatusResponse(responsecode.NOT_FOUND,
284 "Node not found")
285
286 def trapXMPPURIParseError(failure):
287 failure.trap(XMPPURIParseError)
288 return http.StatusResponse(responsecode.BAD_REQUEST,
289 "Malformed XMPP URI: %s" % failure.value.message)
290
291 d = getNode()
292 d.addCallback(self.backend.delete_node, self.owner)
293 d.addCallback(respond)
294 d.addErrback(trapNotFound)
295 d.addErrback(trapXMPPURIParseError)
296 return d
297
298
299 class PublishResource(resource.Resource):
300 """
301 A resource to publish to a publish-subscribe node.
302 """
303
304 def __init__(self, backend, serviceJID, owner):
305 self.backend = backend
306 self.serviceJID = serviceJID
307 self.owner = owner
308
309 http_GET = None
310
311 def checkMediaType(self, request):
312 ctype = request.headers.getHeader('content-type')
313
314 if not ctype:
315 raise http.HTTPError(
316 http.StatusResponse(
317 responsecode.BAD_REQUEST,
318 "No specified Media Type"))
319
320 if (ctype.mediaType != 'application' or
321 ctype.mediaSubtype != 'atom+xml' or
322 ctype.params.get('type') != 'entry' or
323 ctype.params.get('charset', 'utf-8') != 'utf-8'):
324 raise http.HTTPError(
325 http.StatusResponse(
326 responsecode.UNSUPPORTED_MEDIA_TYPE,
327 "Unsupported Media Type: %s" %
328 http_headers.generateContentType(ctype)))
329
330 def parseXMLPayload(self, stream):
331 if not stream:
332 print "Stream is empty", repr(stream)
333 elif not stream.length:
334 print "Stream length is", repr(stream.length)
335 p = WebStreamParser()
336 return p.parse(stream)
337
338 def http_POST(self, request):
339 """
340 Respond to a POST request to create a new item.
341 """
342
343 def toResponse(nodeIdentifier):
344 uri = 'xmpp:%s?;node=%s' % (self.serviceJID.full(), nodeIdentifier)
345 stream = simplejson.dumps({'uri': uri})
346 return http.Response(responsecode.OK, stream=stream)
347
348 def gotNode(nodeIdentifier, payload):
349 item = Item(id='current', payload=payload)
350 d = self.backend.publish(nodeIdentifier, [item], self.owner)
351 d.addCallback(lambda _: nodeIdentifier)
352 return d
353
354 def getNode():
355 if request.args.get('uri'):
356 jid, nodeIdentifier = getServiceAndNode(request.args['uri'][0])
357 return defer.succeed(nodeIdentifier)
358 else:
359 return self.backend.create_node(None, self.owner)
360
361 def doPublish(payload):
362 d = getNode()
363 d.addCallback(gotNode, payload)
364 return d
365
366 def trapNotFound(failure):
367 failure.trap(error.NodeNotFound)
368 return http.StatusResponse(responsecode.NOT_FOUND,
369 "Node not found")
370
371 def trapXMPPURIParseError(failure):
372 failure.trap(XMPPURIParseError)
373 return http.StatusResponse(responsecode.BAD_REQUEST,
374 "Malformed XMPP URI: %s" % failure.value.message)
375
376 self.checkMediaType(request)
377 d = self.parseXMLPayload(request.stream)
378 d.addCallback(doPublish)
379 d.addCallback(toResponse)
380 d.addErrback(trapNotFound)
381 d.addErrback(trapXMPPURIParseError)
382 return d
383
384
385 class SubscribeBaseResource(resource.Resource):
386 """
387 Base resource for remote pubsub node subscription and unsubscription.
388
389 This resource accepts POST request with a JSON document that holds
390 a dictionary with the keys C{uri} and C{callback} that respectively map
391 to the XMPP URI of the publish-subscribe node and the callback URI.
392
393 This class should be inherited with L{serviceMethod} overridden.
394
395 @cvar serviceMethod: The name of the method to be called with
396 the JID of the pubsub service, the node identifier
397 and the callback URI as received in the HTTP POST
398 request to this resource.
399 """
400 serviceMethod = None
401
402 def __init__(self, service):
403 self.service = service
404 self.params = None
405
406 http_GET = None
407
408 def http_POST(self, request):
409 def trapNotFound(failure):
410 failure.trap(error.NodeNotFound)
411 return http.StatusResponse(responsecode.NOT_FOUND,
412 "Node not found")
413
414 def respond(result):
415 return http.Response(responsecode.NO_CONTENT)
416
417 def gotRequest(result):
418 uri = self.params['uri']
419 callback = self.params['callback']
420
421 jid, nodeIdentifier = getServiceAndNode(uri)
422 method = getattr(self.service, self.serviceMethod)
423 d = method(jid, nodeIdentifier, callback)
424 return d
425
426 def storeParams(data):
427 self.params = simplejson.loads(data)
428
429 def trapXMPPURIParseError(failure):
430 failure.trap(XMPPURIParseError)
431 return http.StatusResponse(responsecode.BAD_REQUEST,
432 "Malformed XMPP URI: %s" % failure.value.message)
433
434 d = readStream(request.stream, storeParams)
435 d.addCallback(gotRequest)
436 d.addCallback(respond)
437 d.addErrback(trapNotFound)
438 d.addErrback(trapXMPPURIParseError)
439 return d
440
441
442 class SubscribeResource(SubscribeBaseResource):
443 """
444 Resource to subscribe to a remote publish-subscribe node.
445
446 The passed C{uri} is the XMPP URI of the node to subscribe to and the
447 C{callback} is the callback URI. Upon receiving notifications from the
448 node, a POST request will be perfomed on the callback URI.
449 """
450 serviceMethod = 'subscribeCallback'
451
452
453 class UnsubscribeResource(SubscribeBaseResource):
454 """
455 Resource to unsubscribe from a remote publish-subscribe node.
456
457 The passed C{uri} is the XMPP URI of the node to unsubscribe from and the
458 C{callback} is the callback URI that was registered for it.
459 """
460 serviceMethod = 'unsubscribeCallback'
461
462
463 class ListResource(resource.Resource):
464 def __init__(self, service):
465 self.service = service
466
467 def render(self, request):
468 def responseFromNodes(nodeIdentifiers):
469 import pprint
470 return http.Response(responsecode.OK, stream=pprint.pformat(nodeIdentifiers))
471
472 d = self.service.get_nodes()
473 d.addCallback(responseFromNodes)
474 return d