Mercurial > libervia-pubsub
comparison idavoll/gateway.py @ 177:faf1c9bc2612
Add HTTP gateway in a separate plugin.
Author: ralphm
Fixes #8.
author | Ralph Meijer <ralphm@ik.nu> |
---|---|
date | Thu, 10 Apr 2008 11:18:29 +0000 |
parents | |
children | c21b986cff30 |
comparison
equal
deleted
inserted
replaced
176:17fc5dd77158 | 177:faf1c9bc2612 |
---|---|
1 # Copyright (c) 2003-2008 Ralph Meijer | |
2 # See LICENSE for details. | |
3 | |
4 import cgi | |
5 from time import gmtime, strftime | |
6 | |
7 import simplejson | |
8 | |
9 from twisted.application import service | |
10 from twisted.internet import defer, reactor | |
11 from twisted.web import client | |
12 from twisted.web2 import http, http_headers, resource, responsecode | |
13 from twisted.web2.stream import readStream | |
14 from twisted.words.protocols.jabber.jid import JID | |
15 from twisted.words.protocols.jabber.error import StanzaError | |
16 from twisted.words.xish import domish | |
17 | |
18 from wokkel.pubsub import Item | |
19 from wokkel.pubsub import PubSubClient | |
20 | |
21 from idavoll import error | |
22 | |
23 NS_ATOM = 'http://www.w3.org/2005/Atom' | |
24 | |
25 | |
26 class RemoteSubscriptionService(service.Service, PubSubClient): | |
27 | |
28 def __init__(self, jid): | |
29 self.jid = jid | |
30 | |
31 def startService(self): | |
32 self.callbacks = {} | |
33 | |
34 def trapNotFound(self, failure): | |
35 failure.trap(StanzaError) | |
36 if not failure.value.condition == 'item-not-found': | |
37 raise failure | |
38 raise error.NodeNotFound | |
39 | |
40 def subscribeCallback(self, jid, nodeIdentifier, callback): | |
41 | |
42 def newCallbackList(result): | |
43 callbackList = set() | |
44 self.callbacks[jid, nodeIdentifier] = callbackList | |
45 return callbackList | |
46 | |
47 try: | |
48 callbackList = self.callbacks[jid, nodeIdentifier] | |
49 except KeyError: | |
50 d = self.subscribe(jid, nodeIdentifier, self.jid) | |
51 d.addCallback(newCallbackList) | |
52 else: | |
53 d = defer.succeed(callbackList) | |
54 | |
55 d.addCallback(lambda callbackList: callbackList.add(callback)) | |
56 d.addErrback(self.trapNotFound) | |
57 return d | |
58 | |
59 def unsubscribeCallback(self, jid, nodeIdentifier, callback): | |
60 try: | |
61 callbackList = self.callbacks[jid, nodeIdentifier] | |
62 callbackList.remove(callback) | |
63 except KeyError: | |
64 return defer.fail(error.NotSubscribed()) | |
65 | |
66 if not callbackList: | |
67 self.unsubscribe(jid, nodeIdentifier, self.jid) | |
68 | |
69 return defer.succeed(None) | |
70 | |
71 def itemsReceived(self, recipient, service, nodeIdentifier, items): | |
72 """ | |
73 Fire up HTTP client to do callback | |
74 """ | |
75 | |
76 # Collect atom entries | |
77 atomEntries = [] | |
78 for item in items: | |
79 # ignore non-items (i.e. retractions) | |
80 if item.name != 'item': | |
81 continue | |
82 | |
83 atomEntry = None | |
84 for element in item.elements(): | |
85 # extract the first element that is an atom entry | |
86 if element.uri == NS_ATOM and element.name == 'entry': | |
87 atomEntry = element | |
88 break | |
89 | |
90 if atomEntry: | |
91 atomEntries.append(atomEntry) | |
92 | |
93 # Don't notify if there are no atom entries | |
94 if not atomEntries: | |
95 return | |
96 | |
97 if len(atomEntries) == 1: | |
98 contentType = 'application/atom+xml;type=entry' | |
99 payload = atomEntries[0] | |
100 else: | |
101 contentType = 'application/atom+xml;type=feed' | |
102 nodeURI = 'xmpp:%s?;node=%s' % (service.full(), nodeIdentifier) | |
103 now = strftime("%Y-%m-%dT%H:%M:%SZ", gmtime()) | |
104 | |
105 # Collect the received entries in a feed | |
106 payload = domish.Element((NS_ATOM, 'feed')) | |
107 payload.addElement('title', content='Received item collection') | |
108 payload.addElement('id', content=nodeURI) | |
109 payload.addElement('updated', content=now) | |
110 for atomEntry in atomEntries: | |
111 payload.addChild(atomEntry) | |
112 | |
113 self.callCallbacks(recipient, service, nodeIdentifier, payload, | |
114 contentType) | |
115 | |
116 def deleteReceived(self, recipient, service, nodeIdentifier): | |
117 """ | |
118 Fire up HTTP client to do callback | |
119 """ | |
120 | |
121 self.callCallbacks(recipient, service, nodeIdentifier, | |
122 eventType='DELETED') | |
123 | |
124 def callCallbacks(self, recipient, service, nodeIdentifier, | |
125 payload=None, contentType=None, eventType=None): | |
126 try: | |
127 callbacks = self.callbacks[service, nodeIdentifier] | |
128 except KeyError: | |
129 return | |
130 | |
131 postdata = None | |
132 nodeURI = 'xmpp:%s?;node=%s' % (service.full(), nodeIdentifier) | |
133 headers = {'Referer': nodeURI.encode('utf-8'), | |
134 'PubSub-Service': service.full().encode('utf-8')} | |
135 | |
136 if payload: | |
137 postdata = payload.toXml().encode('utf-8') | |
138 if contentType: | |
139 headers['Content-Type'] = "%s;charset=utf-8" % contentType | |
140 | |
141 if eventType: | |
142 headers['Event'] = eventType | |
143 | |
144 def postNotification(callbackURI): | |
145 return client.getPage(str(callbackURI), | |
146 method='POST', | |
147 postdata=postdata, | |
148 headers=headers) | |
149 | |
150 for callbackURI in callbacks: | |
151 reactor.callLater(0, postNotification, callbackURI) | |
152 | |
153 | |
154 class WebStreamParser(object): | |
155 def __init__(self): | |
156 self.elementStream = domish.elementStream() | |
157 self.elementStream.DocumentStartEvent = self.docStart | |
158 self.elementStream.ElementEvent = self.elem | |
159 self.elementStream.DocumentEndEvent = self.docEnd | |
160 self.done = False | |
161 | |
162 def docStart(self, elem): | |
163 self.document = elem | |
164 | |
165 def elem(self, elem): | |
166 self.document.addChild(elem) | |
167 | |
168 def docEnd(self): | |
169 self.done = True | |
170 | |
171 def parse(self, stream): | |
172 def endOfStream(result): | |
173 if not self.done: | |
174 raise Exception("No more stuff?") | |
175 else: | |
176 return self.document | |
177 | |
178 d = readStream(stream, self.elementStream.parse) | |
179 d.addCallback(endOfStream) | |
180 return d | |
181 | |
182 | |
183 class XMPPURIParseError(ValueError): | |
184 """ | |
185 Raised when a given XMPP URI couldn't be properly parsed. | |
186 """ | |
187 | |
188 | |
189 def getServiceAndNode(uri): | |
190 """ | |
191 Given an XMPP URI, extract the publish subscribe service JID and node ID. | |
192 """ | |
193 | |
194 try: | |
195 scheme, rest = uri.split(':', 1) | |
196 except ValueError: | |
197 raise XMPPURIParseError("No URI scheme component") | |
198 | |
199 if scheme != 'xmpp': | |
200 raise XMPPURIParseError("Unknown URI scheme") | |
201 | |
202 if rest.startswith("//"): | |
203 raise XMPPURIParseError("Unexpected URI authority component") | |
204 | |
205 try: | |
206 entity, query = rest.split('?', 1) | |
207 except ValueError: | |
208 raise XMPPURIParseError("No URI query component") | |
209 | |
210 if not entity: | |
211 raise XMPPURIParseError("Empty URI path component") | |
212 | |
213 try: | |
214 jid = JID(entity) | |
215 except Exception, e: | |
216 raise XMPPURIParseError("Invalid JID: %s" % e.message) | |
217 | |
218 params = cgi.parse_qs(query) | |
219 | |
220 try: | |
221 nodeIdentifier = params['node'][0] | |
222 except (KeyError, ValueError): | |
223 raise XMPPURIParseError("No node in query component of URI") | |
224 | |
225 return jid, nodeIdentifier | |
226 | |
227 | |
228 class CreateResource(resource.Resource): | |
229 """ | |
230 A resource to create a publish-subscribe node. | |
231 """ | |
232 def __init__(self, backend, serviceJID, owner): | |
233 self.backend = backend | |
234 self.serviceJID = serviceJID | |
235 self.owner = owner | |
236 | |
237 http_GET = None | |
238 | |
239 def http_POST(self, request): | |
240 """ | |
241 Respond to a POST request to create a new node. | |
242 """ | |
243 | |
244 def toResponse(nodeIdentifier): | |
245 uri = 'xmpp:%s?;node=%s' % (self.serviceJID.full(), nodeIdentifier) | |
246 stream = simplejson.dumps({'uri': uri}) | |
247 return http.Response(responsecode.OK, stream=stream) | |
248 | |
249 d = self.backend.create_node(None, self.owner) | |
250 d.addCallback(toResponse) | |
251 return d | |
252 | |
253 | |
254 class DeleteResource(resource.Resource): | |
255 """ | |
256 A resource to create a publish-subscribe node. | |
257 """ | |
258 def __init__(self, backend, serviceJID, owner): | |
259 self.backend = backend | |
260 self.serviceJID = serviceJID | |
261 self.owner = owner | |
262 | |
263 http_GET = None | |
264 | |
265 def http_POST(self, request): | |
266 """ | |
267 Respond to a POST request to create a new node. | |
268 """ | |
269 | |
270 def respond(result): | |
271 return http.Response(responsecode.NO_CONTENT) | |
272 | |
273 def getNode(): | |
274 if request.args.get('uri'): | |
275 jid, nodeIdentifier = getServiceAndNode(request.args['uri'][0]) | |
276 return defer.succeed(nodeIdentifier) | |
277 else: | |
278 raise http.HTTPError(http.Response(responsecode.BAD_REQUEST, | |
279 "No URI given")) | |
280 | |
281 def trapNotFound(failure): | |
282 failure.trap(error.NodeNotFound) | |
283 return http.StatusResponse(responsecode.NOT_FOUND, | |
284 "Node not found") | |
285 | |
286 def trapXMPPURIParseError(failure): | |
287 failure.trap(XMPPURIParseError) | |
288 return http.StatusResponse(responsecode.BAD_REQUEST, | |
289 "Malformed XMPP URI: %s" % failure.value.message) | |
290 | |
291 d = getNode() | |
292 d.addCallback(self.backend.delete_node, self.owner) | |
293 d.addCallback(respond) | |
294 d.addErrback(trapNotFound) | |
295 d.addErrback(trapXMPPURIParseError) | |
296 return d | |
297 | |
298 | |
299 class PublishResource(resource.Resource): | |
300 """ | |
301 A resource to publish to a publish-subscribe node. | |
302 """ | |
303 | |
304 def __init__(self, backend, serviceJID, owner): | |
305 self.backend = backend | |
306 self.serviceJID = serviceJID | |
307 self.owner = owner | |
308 | |
309 http_GET = None | |
310 | |
311 def checkMediaType(self, request): | |
312 ctype = request.headers.getHeader('content-type') | |
313 | |
314 if not ctype: | |
315 raise http.HTTPError( | |
316 http.StatusResponse( | |
317 responsecode.BAD_REQUEST, | |
318 "No specified Media Type")) | |
319 | |
320 if (ctype.mediaType != 'application' or | |
321 ctype.mediaSubtype != 'atom+xml' or | |
322 ctype.params.get('type') != 'entry' or | |
323 ctype.params.get('charset', 'utf-8') != 'utf-8'): | |
324 raise http.HTTPError( | |
325 http.StatusResponse( | |
326 responsecode.UNSUPPORTED_MEDIA_TYPE, | |
327 "Unsupported Media Type: %s" % | |
328 http_headers.generateContentType(ctype))) | |
329 | |
330 def parseXMLPayload(self, stream): | |
331 if not stream: | |
332 print "Stream is empty", repr(stream) | |
333 elif not stream.length: | |
334 print "Stream length is", repr(stream.length) | |
335 p = WebStreamParser() | |
336 return p.parse(stream) | |
337 | |
338 def http_POST(self, request): | |
339 """ | |
340 Respond to a POST request to create a new item. | |
341 """ | |
342 | |
343 def toResponse(nodeIdentifier): | |
344 uri = 'xmpp:%s?;node=%s' % (self.serviceJID.full(), nodeIdentifier) | |
345 stream = simplejson.dumps({'uri': uri}) | |
346 return http.Response(responsecode.OK, stream=stream) | |
347 | |
348 def gotNode(nodeIdentifier, payload): | |
349 item = Item(id='current', payload=payload) | |
350 d = self.backend.publish(nodeIdentifier, [item], self.owner) | |
351 d.addCallback(lambda _: nodeIdentifier) | |
352 return d | |
353 | |
354 def getNode(): | |
355 if request.args.get('uri'): | |
356 jid, nodeIdentifier = getServiceAndNode(request.args['uri'][0]) | |
357 return defer.succeed(nodeIdentifier) | |
358 else: | |
359 return self.backend.create_node(None, self.owner) | |
360 | |
361 def doPublish(payload): | |
362 d = getNode() | |
363 d.addCallback(gotNode, payload) | |
364 return d | |
365 | |
366 def trapNotFound(failure): | |
367 failure.trap(error.NodeNotFound) | |
368 return http.StatusResponse(responsecode.NOT_FOUND, | |
369 "Node not found") | |
370 | |
371 def trapXMPPURIParseError(failure): | |
372 failure.trap(XMPPURIParseError) | |
373 return http.StatusResponse(responsecode.BAD_REQUEST, | |
374 "Malformed XMPP URI: %s" % failure.value.message) | |
375 | |
376 self.checkMediaType(request) | |
377 d = self.parseXMLPayload(request.stream) | |
378 d.addCallback(doPublish) | |
379 d.addCallback(toResponse) | |
380 d.addErrback(trapNotFound) | |
381 d.addErrback(trapXMPPURIParseError) | |
382 return d | |
383 | |
384 | |
385 class SubscribeBaseResource(resource.Resource): | |
386 """ | |
387 Base resource for remote pubsub node subscription and unsubscription. | |
388 | |
389 This resource accepts POST request with a JSON document that holds | |
390 a dictionary with the keys C{uri} and C{callback} that respectively map | |
391 to the XMPP URI of the publish-subscribe node and the callback URI. | |
392 | |
393 This class should be inherited with L{serviceMethod} overridden. | |
394 | |
395 @cvar serviceMethod: The name of the method to be called with | |
396 the JID of the pubsub service, the node identifier | |
397 and the callback URI as received in the HTTP POST | |
398 request to this resource. | |
399 """ | |
400 serviceMethod = None | |
401 | |
402 def __init__(self, service): | |
403 self.service = service | |
404 self.params = None | |
405 | |
406 http_GET = None | |
407 | |
408 def http_POST(self, request): | |
409 def trapNotFound(failure): | |
410 failure.trap(error.NodeNotFound) | |
411 return http.StatusResponse(responsecode.NOT_FOUND, | |
412 "Node not found") | |
413 | |
414 def respond(result): | |
415 return http.Response(responsecode.NO_CONTENT) | |
416 | |
417 def gotRequest(result): | |
418 uri = self.params['uri'] | |
419 callback = self.params['callback'] | |
420 | |
421 jid, nodeIdentifier = getServiceAndNode(uri) | |
422 method = getattr(self.service, self.serviceMethod) | |
423 d = method(jid, nodeIdentifier, callback) | |
424 return d | |
425 | |
426 def storeParams(data): | |
427 self.params = simplejson.loads(data) | |
428 | |
429 def trapXMPPURIParseError(failure): | |
430 failure.trap(XMPPURIParseError) | |
431 return http.StatusResponse(responsecode.BAD_REQUEST, | |
432 "Malformed XMPP URI: %s" % failure.value.message) | |
433 | |
434 d = readStream(request.stream, storeParams) | |
435 d.addCallback(gotRequest) | |
436 d.addCallback(respond) | |
437 d.addErrback(trapNotFound) | |
438 d.addErrback(trapXMPPURIParseError) | |
439 return d | |
440 | |
441 | |
442 class SubscribeResource(SubscribeBaseResource): | |
443 """ | |
444 Resource to subscribe to a remote publish-subscribe node. | |
445 | |
446 The passed C{uri} is the XMPP URI of the node to subscribe to and the | |
447 C{callback} is the callback URI. Upon receiving notifications from the | |
448 node, a POST request will be perfomed on the callback URI. | |
449 """ | |
450 serviceMethod = 'subscribeCallback' | |
451 | |
452 | |
453 class UnsubscribeResource(SubscribeBaseResource): | |
454 """ | |
455 Resource to unsubscribe from a remote publish-subscribe node. | |
456 | |
457 The passed C{uri} is the XMPP URI of the node to unsubscribe from and the | |
458 C{callback} is the callback URI that was registered for it. | |
459 """ | |
460 serviceMethod = 'unsubscribeCallback' | |
461 | |
462 | |
463 class ListResource(resource.Resource): | |
464 def __init__(self, service): | |
465 self.service = service | |
466 | |
467 def render(self, request): | |
468 def responseFromNodes(nodeIdentifiers): | |
469 import pprint | |
470 return http.Response(responsecode.OK, stream=pprint.pformat(nodeIdentifiers)) | |
471 | |
472 d = self.service.get_nodes() | |
473 d.addCallback(responseFromNodes) | |
474 return d |