Mercurial > libervia-pubsub
comparison idavoll/gateway.py @ 185:9038908dc2f5
Add gateway support for retrieving items from a node. Reorder gateway module.
author | Ralph Meijer <ralphm@ik.nu> |
---|---|
date | Tue, 15 Apr 2008 17:32:56 +0000 |
parents | c21b986cff30 |
children | 365fd3e4daf8 |
comparison
equal
deleted
inserted
replaced
184:bd88658dbca3 | 185:9038908dc2f5 |
---|---|
32 | 32 |
33 NS_ATOM = 'http://www.w3.org/2005/Atom' | 33 NS_ATOM = 'http://www.w3.org/2005/Atom' |
34 MIME_ATOM_ENTRY = 'application/atom+xml;type=entry' | 34 MIME_ATOM_ENTRY = 'application/atom+xml;type=entry' |
35 MIME_JSON = 'application/json' | 35 MIME_JSON = 'application/json' |
36 | 36 |
37 class RemoteSubscriptionService(service.Service, PubSubClient): | 37 class XMPPURIParseError(ValueError): |
38 | 38 """ |
39 def __init__(self, jid): | 39 Raised when a given XMPP URI couldn't be properly parsed. |
40 self.jid = jid | 40 """ |
41 | 41 |
42 def startService(self): | 42 |
43 self.callbacks = {} | 43 |
44 | 44 def getServiceAndNode(uri): |
45 def trapNotFound(self, failure): | 45 """ |
46 failure.trap(StanzaError) | 46 Given an XMPP URI, extract the publish subscribe service JID and node ID. |
47 if not failure.value.condition == 'item-not-found': | 47 """ |
48 raise failure | 48 |
49 raise error.NodeNotFound | 49 try: |
50 | 50 scheme, rest = uri.split(':', 1) |
51 def subscribeCallback(self, jid, nodeIdentifier, callback): | 51 except ValueError: |
52 | 52 raise XMPPURIParseError("No URI scheme component") |
53 def newCallbackList(result): | 53 |
54 callbackList = set() | 54 if scheme != 'xmpp': |
55 self.callbacks[jid, nodeIdentifier] = callbackList | 55 raise XMPPURIParseError("Unknown URI scheme") |
56 return callbackList | 56 |
57 | 57 if rest.startswith("//"): |
58 try: | 58 raise XMPPURIParseError("Unexpected URI authority component") |
59 callbackList = self.callbacks[jid, nodeIdentifier] | 59 |
60 except KeyError: | 60 try: |
61 d = self.subscribe(jid, nodeIdentifier, self.jid) | 61 entity, query = rest.split('?', 1) |
62 d.addCallback(newCallbackList) | 62 except ValueError: |
63 else: | 63 raise XMPPURIParseError("No URI query component") |
64 d = defer.succeed(callbackList) | 64 |
65 | 65 if not entity: |
66 d.addCallback(lambda callbackList: callbackList.add(callback)) | 66 raise XMPPURIParseError("Empty URI path component") |
67 d.addErrback(self.trapNotFound) | 67 |
68 return d | 68 try: |
69 | 69 jid = JID(entity) |
70 def unsubscribeCallback(self, jid, nodeIdentifier, callback): | 70 except Exception, e: |
71 try: | 71 raise XMPPURIParseError("Invalid JID: %s" % e.message) |
72 callbackList = self.callbacks[jid, nodeIdentifier] | 72 |
73 callbackList.remove(callback) | 73 params = cgi.parse_qs(query) |
74 except KeyError: | 74 |
75 return defer.fail(error.NotSubscribed()) | 75 try: |
76 | 76 nodeIdentifier = params['node'][0] |
77 if not callbackList: | 77 except (KeyError, ValueError): |
78 self.unsubscribe(jid, nodeIdentifier, self.jid) | 78 raise XMPPURIParseError("No node in query component of URI") |
79 | 79 |
80 return defer.succeed(None) | 80 return jid, nodeIdentifier |
81 | 81 |
82 def itemsReceived(self, recipient, service, nodeIdentifier, items): | |
83 """ | |
84 Fire up HTTP client to do callback | |
85 """ | |
86 | |
87 # Collect atom entries | |
88 atomEntries = [] | |
89 for item in items: | |
90 # ignore non-items (i.e. retractions) | |
91 if item.name != 'item': | |
92 continue | |
93 | |
94 atomEntry = None | |
95 for element in item.elements(): | |
96 # extract the first element that is an atom entry | |
97 if element.uri == NS_ATOM and element.name == 'entry': | |
98 atomEntry = element | |
99 break | |
100 | |
101 if atomEntry: | |
102 atomEntries.append(atomEntry) | |
103 | |
104 # Don't notify if there are no atom entries | |
105 if not atomEntries: | |
106 return | |
107 | |
108 if len(atomEntries) == 1: | |
109 contentType = 'application/atom+xml;type=entry' | |
110 payload = atomEntries[0] | |
111 else: | |
112 contentType = 'application/atom+xml;type=feed' | |
113 nodeURI = 'xmpp:%s?;node=%s' % (service.full(), nodeIdentifier) | |
114 now = strftime("%Y-%m-%dT%H:%M:%SZ", gmtime()) | |
115 | |
116 # Collect the received entries in a feed | |
117 payload = domish.Element((NS_ATOM, 'feed')) | |
118 payload.addElement('title', content='Received item collection') | |
119 payload.addElement('id', content=nodeURI) | |
120 payload.addElement('updated', content=now) | |
121 for atomEntry in atomEntries: | |
122 payload.addChild(atomEntry) | |
123 | |
124 self.callCallbacks(recipient, service, nodeIdentifier, payload, | |
125 contentType) | |
126 | |
127 def deleteReceived(self, recipient, service, nodeIdentifier): | |
128 """ | |
129 Fire up HTTP client to do callback | |
130 """ | |
131 | |
132 self.callCallbacks(recipient, service, nodeIdentifier, | |
133 eventType='DELETED') | |
134 | |
135 def callCallbacks(self, recipient, service, nodeIdentifier, | |
136 payload=None, contentType=None, eventType=None): | |
137 try: | |
138 callbacks = self.callbacks[service, nodeIdentifier] | |
139 except KeyError: | |
140 return | |
141 | |
142 postdata = None | |
143 nodeURI = 'xmpp:%s?;node=%s' % (service.full(), nodeIdentifier) | |
144 headers = {'Referer': nodeURI.encode('utf-8'), | |
145 'PubSub-Service': service.full().encode('utf-8')} | |
146 | |
147 if payload: | |
148 postdata = payload.toXml().encode('utf-8') | |
149 if contentType: | |
150 headers['Content-Type'] = "%s;charset=utf-8" % contentType | |
151 | |
152 if eventType: | |
153 headers['Event'] = eventType | |
154 | |
155 def postNotification(callbackURI): | |
156 d = client.getPage(str(callbackURI), | |
157 method='POST', | |
158 postdata=postdata, | |
159 headers=headers) | |
160 d.addErrback(log.err) | |
161 | |
162 for callbackURI in callbacks: | |
163 reactor.callLater(0, postNotification, callbackURI) | |
164 | 82 |
165 | 83 |
166 class WebStreamParser(object): | 84 class WebStreamParser(object): |
167 def __init__(self): | 85 def __init__(self): |
168 self.elementStream = domish.elementStream() | 86 self.elementStream = domish.elementStream() |
169 self.elementStream.DocumentStartEvent = self.docStart | 87 self.elementStream.DocumentStartEvent = self.docStart |
170 self.elementStream.ElementEvent = self.elem | 88 self.elementStream.ElementEvent = self.elem |
171 self.elementStream.DocumentEndEvent = self.docEnd | 89 self.elementStream.DocumentEndEvent = self.docEnd |
172 self.done = False | 90 self.done = False |
173 | 91 |
92 | |
174 def docStart(self, elem): | 93 def docStart(self, elem): |
175 self.document = elem | 94 self.document = elem |
176 | 95 |
96 | |
177 def elem(self, elem): | 97 def elem(self, elem): |
178 self.document.addChild(elem) | 98 self.document.addChild(elem) |
179 | 99 |
100 | |
180 def docEnd(self): | 101 def docEnd(self): |
181 self.done = True | 102 self.done = True |
103 | |
182 | 104 |
183 def parse(self, stream): | 105 def parse(self, stream): |
184 def endOfStream(result): | 106 def endOfStream(result): |
185 if not self.done: | 107 if not self.done: |
186 raise Exception("No more stuff?") | 108 raise Exception("No more stuff?") |
190 d = readStream(stream, self.elementStream.parse) | 112 d = readStream(stream, self.elementStream.parse) |
191 d.addCallback(endOfStream) | 113 d.addCallback(endOfStream) |
192 return d | 114 return d |
193 | 115 |
194 | 116 |
195 class XMPPURIParseError(ValueError): | |
196 """ | |
197 Raised when a given XMPP URI couldn't be properly parsed. | |
198 """ | |
199 | |
200 | |
201 def getServiceAndNode(uri): | |
202 """ | |
203 Given an XMPP URI, extract the publish subscribe service JID and node ID. | |
204 """ | |
205 | |
206 try: | |
207 scheme, rest = uri.split(':', 1) | |
208 except ValueError: | |
209 raise XMPPURIParseError("No URI scheme component") | |
210 | |
211 if scheme != 'xmpp': | |
212 raise XMPPURIParseError("Unknown URI scheme") | |
213 | |
214 if rest.startswith("//"): | |
215 raise XMPPURIParseError("Unexpected URI authority component") | |
216 | |
217 try: | |
218 entity, query = rest.split('?', 1) | |
219 except ValueError: | |
220 raise XMPPURIParseError("No URI query component") | |
221 | |
222 if not entity: | |
223 raise XMPPURIParseError("Empty URI path component") | |
224 | |
225 try: | |
226 jid = JID(entity) | |
227 except Exception, e: | |
228 raise XMPPURIParseError("Invalid JID: %s" % e.message) | |
229 | |
230 params = cgi.parse_qs(query) | |
231 | |
232 try: | |
233 nodeIdentifier = params['node'][0] | |
234 except (KeyError, ValueError): | |
235 raise XMPPURIParseError("No node in query component of URI") | |
236 | |
237 return jid, nodeIdentifier | |
238 | |
239 | 117 |
240 class CreateResource(resource.Resource): | 118 class CreateResource(resource.Resource): |
241 """ | 119 """ |
242 A resource to create a publish-subscribe node. | 120 A resource to create a publish-subscribe node. |
243 """ | 121 """ |
244 def __init__(self, backend, serviceJID, owner): | 122 def __init__(self, backend, serviceJID, owner): |
245 self.backend = backend | 123 self.backend = backend |
246 self.serviceJID = serviceJID | 124 self.serviceJID = serviceJID |
247 self.owner = owner | 125 self.owner = owner |
248 | 126 |
127 | |
249 http_GET = None | 128 http_GET = None |
129 | |
250 | 130 |
251 def http_POST(self, request): | 131 def http_POST(self, request): |
252 """ | 132 """ |
253 Respond to a POST request to create a new node. | 133 Respond to a POST request to create a new node. |
254 """ | 134 """ |
261 d = self.backend.create_node(None, self.owner) | 141 d = self.backend.create_node(None, self.owner) |
262 d.addCallback(toResponse) | 142 d.addCallback(toResponse) |
263 return d | 143 return d |
264 | 144 |
265 | 145 |
146 | |
266 class DeleteResource(resource.Resource): | 147 class DeleteResource(resource.Resource): |
267 """ | 148 """ |
268 A resource to create a publish-subscribe node. | 149 A resource to create a publish-subscribe node. |
269 """ | 150 """ |
270 def __init__(self, backend, serviceJID, owner): | 151 def __init__(self, backend, serviceJID, owner): |
271 self.backend = backend | 152 self.backend = backend |
272 self.serviceJID = serviceJID | 153 self.serviceJID = serviceJID |
273 self.owner = owner | 154 self.owner = owner |
274 | 155 |
156 | |
275 http_GET = None | 157 http_GET = None |
158 | |
276 | 159 |
277 def http_POST(self, request): | 160 def http_POST(self, request): |
278 """ | 161 """ |
279 Respond to a POST request to create a new node. | 162 Respond to a POST request to create a new node. |
280 """ | 163 """ |
306 d.addErrback(trapNotFound) | 189 d.addErrback(trapNotFound) |
307 d.addErrback(trapXMPPURIParseError) | 190 d.addErrback(trapXMPPURIParseError) |
308 return d | 191 return d |
309 | 192 |
310 | 193 |
194 | |
311 class PublishResource(resource.Resource): | 195 class PublishResource(resource.Resource): |
312 """ | 196 """ |
313 A resource to publish to a publish-subscribe node. | 197 A resource to publish to a publish-subscribe node. |
314 """ | 198 """ |
315 | 199 |
316 def __init__(self, backend, serviceJID, owner): | 200 def __init__(self, backend, serviceJID, owner): |
317 self.backend = backend | 201 self.backend = backend |
318 self.serviceJID = serviceJID | 202 self.serviceJID = serviceJID |
319 self.owner = owner | 203 self.owner = owner |
320 | 204 |
205 | |
321 http_GET = None | 206 http_GET = None |
207 | |
322 | 208 |
323 def checkMediaType(self, request): | 209 def checkMediaType(self, request): |
324 ctype = request.headers.getHeader('content-type') | 210 ctype = request.headers.getHeader('content-type') |
325 | 211 |
326 if not ctype: | 212 if not ctype: |
337 http.StatusResponse( | 223 http.StatusResponse( |
338 responsecode.UNSUPPORTED_MEDIA_TYPE, | 224 responsecode.UNSUPPORTED_MEDIA_TYPE, |
339 "Unsupported Media Type: %s" % | 225 "Unsupported Media Type: %s" % |
340 http_headers.generateContentType(ctype))) | 226 http_headers.generateContentType(ctype))) |
341 | 227 |
228 | |
342 def parseXMLPayload(self, stream): | 229 def parseXMLPayload(self, stream): |
343 p = WebStreamParser() | 230 p = WebStreamParser() |
344 return p.parse(stream) | 231 return p.parse(stream) |
232 | |
345 | 233 |
346 def http_POST(self, request): | 234 def http_POST(self, request): |
347 """ | 235 """ |
348 Respond to a POST request to create a new item. | 236 Respond to a POST request to create a new item. |
349 """ | 237 """ |
388 d.addErrback(trapNotFound) | 276 d.addErrback(trapNotFound) |
389 d.addErrback(trapXMPPURIParseError) | 277 d.addErrback(trapXMPPURIParseError) |
390 return d | 278 return d |
391 | 279 |
392 | 280 |
393 class SubscribeBaseResource(resource.Resource): | 281 |
282 class ListResource(resource.Resource): | |
283 def __init__(self, service): | |
284 self.service = service | |
285 | |
286 | |
287 def render(self, request): | |
288 def responseFromNodes(nodeIdentifiers): | |
289 return http.Response(responsecode.OK, | |
290 stream=simplejson.dumps(nodeIdentifiers)) | |
291 | |
292 d = self.service.get_nodes() | |
293 d.addCallback(responseFromNodes) | |
294 return d | |
295 | |
296 | |
297 | |
298 # Service for subscribing to remote XMPP Pubsub nodes and web resources | |
299 | |
300 def extractAtomEntries(items): | |
301 """ | |
302 Extract atom entries from a list of publish-subscribe items. | |
303 | |
304 @param items: List of L{domish.Element}s that represent publish-subscribe | |
305 items. | |
306 @type items: C{list} | |
307 """ | |
308 | |
309 atomEntries = [] | |
310 | |
311 for item in items: | |
312 # ignore non-items (i.e. retractions) | |
313 if item.name != 'item': | |
314 continue | |
315 | |
316 atomEntry = None | |
317 for element in item.elements(): | |
318 # extract the first element that is an atom entry | |
319 if element.uri == NS_ATOM and element.name == 'entry': | |
320 atomEntry = element | |
321 break | |
322 | |
323 if atomEntry: | |
324 atomEntries.append(atomEntry) | |
325 | |
326 return atomEntries | |
327 | |
328 def constructFeed(service, nodeIdentifier, entries, title): | |
329 nodeURI = 'xmpp:%s?;node=%s' % (service.full(), nodeIdentifier) | |
330 now = strftime("%Y-%m-%dT%H:%M:%SZ", gmtime()) | |
331 | |
332 # Collect the received entries in a feed | |
333 feed = domish.Element((NS_ATOM, 'feed')) | |
334 feed.addElement('title', content=title) | |
335 feed.addElement('id', content=nodeURI) | |
336 feed.addElement('updated', content=now) | |
337 | |
338 for entry in entries: | |
339 feed.addChild(entry) | |
340 | |
341 return feed | |
342 | |
343 class RemoteSubscriptionService(service.Service, PubSubClient): | |
344 """ | |
345 Service for subscribing to remote XMPP Publish-Subscribe nodes. | |
346 | |
347 Subscriptions are created with a callback HTTP URI that is POSTed | |
348 to with the received items in notifications. | |
349 """ | |
350 | |
351 def __init__(self, jid): | |
352 self.jid = jid | |
353 | |
354 | |
355 def startService(self): | |
356 self.callbacks = {} | |
357 | |
358 | |
359 def trapNotFound(self, failure): | |
360 failure.trap(StanzaError) | |
361 if not failure.value.condition == 'item-not-found': | |
362 raise failure | |
363 raise error.NodeNotFound | |
364 | |
365 | |
366 def subscribeCallback(self, jid, nodeIdentifier, callback): | |
367 | |
368 def newCallbackList(result): | |
369 callbackList = set() | |
370 self.callbacks[jid, nodeIdentifier] = callbackList | |
371 return callbackList | |
372 | |
373 try: | |
374 callbackList = self.callbacks[jid, nodeIdentifier] | |
375 except KeyError: | |
376 d = self.subscribe(jid, nodeIdentifier, self.jid) | |
377 d.addCallback(newCallbackList) | |
378 else: | |
379 d = defer.succeed(callbackList) | |
380 | |
381 d.addCallback(lambda callbackList: callbackList.add(callback)) | |
382 d.addErrback(self.trapNotFound) | |
383 return d | |
384 | |
385 | |
386 def unsubscribeCallback(self, jid, nodeIdentifier, callback): | |
387 try: | |
388 callbackList = self.callbacks[jid, nodeIdentifier] | |
389 callbackList.remove(callback) | |
390 except KeyError: | |
391 return defer.fail(error.NotSubscribed()) | |
392 | |
393 if not callbackList: | |
394 self.unsubscribe(jid, nodeIdentifier, self.jid) | |
395 | |
396 return defer.succeed(None) | |
397 | |
398 | |
399 def itemsReceived(self, recipient, service, nodeIdentifier, items): | |
400 """ | |
401 Fire up HTTP client to do callback | |
402 """ | |
403 | |
404 atomEntries = extractAtomEntries(items) | |
405 | |
406 # Don't notify if there are no atom entries | |
407 if not atomEntries: | |
408 return | |
409 | |
410 if len(atomEntries) == 1: | |
411 contentType = 'application/atom+xml;type=entry' | |
412 payload = atomEntries[0] | |
413 else: | |
414 contentType = 'application/atom+xml;type=feed' | |
415 payload = constructFeed(service, nodeIdentifier, atomEntries, | |
416 title='Received item collection') | |
417 | |
418 self.callCallbacks(recipient, service, nodeIdentifier, payload, | |
419 contentType) | |
420 | |
421 | |
422 def deleteReceived(self, recipient, service, nodeIdentifier): | |
423 """ | |
424 Fire up HTTP client to do callback | |
425 """ | |
426 | |
427 self.callCallbacks(recipient, service, nodeIdentifier, | |
428 eventType='DELETED') | |
429 | |
430 | |
431 def callCallbacks(self, recipient, service, nodeIdentifier, | |
432 payload=None, contentType=None, eventType=None): | |
433 try: | |
434 callbacks = self.callbacks[service, nodeIdentifier] | |
435 except KeyError: | |
436 return | |
437 | |
438 postdata = None | |
439 nodeURI = 'xmpp:%s?;node=%s' % (service.full(), nodeIdentifier) | |
440 headers = {'Referer': nodeURI.encode('utf-8'), | |
441 'PubSub-Service': service.full().encode('utf-8')} | |
442 | |
443 if payload: | |
444 postdata = payload.toXml().encode('utf-8') | |
445 if contentType: | |
446 headers['Content-Type'] = "%s;charset=utf-8" % contentType | |
447 | |
448 if eventType: | |
449 headers['Event'] = eventType | |
450 | |
451 def postNotification(callbackURI): | |
452 d = client.getPage(str(callbackURI), | |
453 method='POST', | |
454 postdata=postdata, | |
455 headers=headers) | |
456 d.addErrback(log.err) | |
457 | |
458 for callbackURI in callbacks: | |
459 reactor.callLater(0, postNotification, callbackURI) | |
460 | |
461 | |
462 | |
463 class RemoteSubscribeBaseResource(resource.Resource): | |
394 """ | 464 """ |
395 Base resource for remote pubsub node subscription and unsubscription. | 465 Base resource for remote pubsub node subscription and unsubscription. |
396 | 466 |
397 This resource accepts POST request with a JSON document that holds | 467 This resource accepts POST request with a JSON document that holds |
398 a dictionary with the keys C{uri} and C{callback} that respectively map | 468 a dictionary with the keys C{uri} and C{callback} that respectively map |
409 | 479 |
410 def __init__(self, service): | 480 def __init__(self, service): |
411 self.service = service | 481 self.service = service |
412 self.params = None | 482 self.params = None |
413 | 483 |
484 | |
414 http_GET = None | 485 http_GET = None |
486 | |
415 | 487 |
416 def http_POST(self, request): | 488 def http_POST(self, request): |
417 def trapNotFound(failure): | 489 def trapNotFound(failure): |
418 failure.trap(error.NodeNotFound) | 490 failure.trap(error.NodeNotFound) |
419 return http.StatusResponse(responsecode.NOT_FOUND, | 491 return http.StatusResponse(responsecode.NOT_FOUND, |
445 d.addErrback(trapNotFound) | 517 d.addErrback(trapNotFound) |
446 d.addErrback(trapXMPPURIParseError) | 518 d.addErrback(trapXMPPURIParseError) |
447 return d | 519 return d |
448 | 520 |
449 | 521 |
450 class SubscribeResource(SubscribeBaseResource): | 522 |
523 class RemoteSubscribeResource(RemoteSubscribeBaseResource): | |
451 """ | 524 """ |
452 Resource to subscribe to a remote publish-subscribe node. | 525 Resource to subscribe to a remote publish-subscribe node. |
453 | 526 |
454 The passed C{uri} is the XMPP URI of the node to subscribe to and the | 527 The passed C{uri} is the XMPP URI of the node to subscribe to and the |
455 C{callback} is the callback URI. Upon receiving notifications from the | 528 C{callback} is the callback URI. Upon receiving notifications from the |
456 node, a POST request will be perfomed on the callback URI. | 529 node, a POST request will be perfomed on the callback URI. |
457 """ | 530 """ |
458 serviceMethod = 'subscribeCallback' | 531 serviceMethod = 'subscribeCallback' |
459 | 532 |
460 | 533 |
461 class UnsubscribeResource(SubscribeBaseResource): | 534 |
535 class RemoteUnsubscribeResource(RemoteSubscribeBaseResource): | |
462 """ | 536 """ |
463 Resource to unsubscribe from a remote publish-subscribe node. | 537 Resource to unsubscribe from a remote publish-subscribe node. |
464 | 538 |
465 The passed C{uri} is the XMPP URI of the node to unsubscribe from and the | 539 The passed C{uri} is the XMPP URI of the node to unsubscribe from and the |
466 C{callback} is the callback URI that was registered for it. | 540 C{callback} is the callback URI that was registered for it. |
467 """ | 541 """ |
468 serviceMethod = 'unsubscribeCallback' | 542 serviceMethod = 'unsubscribeCallback' |
469 | 543 |
470 | 544 |
471 class ListResource(resource.Resource): | 545 |
546 class RemoteItemsResource(resource.Resource): | |
547 """ | |
548 Resource for retrieving items from a remote pubsub node. | |
549 """ | |
550 | |
472 def __init__(self, service): | 551 def __init__(self, service): |
473 self.service = service | 552 self.service = service |
474 | 553 |
475 def render(self, request): | 554 def render(self, request): |
476 def responseFromNodes(nodeIdentifiers): | 555 try: |
477 return http.Response(responsecode.OK, | 556 maxItems = int(request.args.get('max_items', [0])[0]) or None |
478 stream=simplejson.dumps(nodeIdentifiers)) | 557 except ValueError: |
479 | 558 return http.StatusResponse(responsecode.BAD_REQUEST, |
480 d = self.service.get_nodes() | 559 "The argument max_items has an invalid value.") |
481 d.addCallback(responseFromNodes) | 560 |
482 return d | 561 try: |
483 | 562 jid, nodeIdentifier = getServiceAndNode(request.args['uri'][0]) |
563 except KeyError: | |
564 return http.StatusResponse(responsecode.BAD_REQUEST, | |
565 "No URI for the remote node provided.") | |
566 except XMPPURIParseError: | |
567 return http.StatusResponse(responsecode.BAD_REQUEST, | |
568 "Malformed XMPP URI: %s" % failure.value.message) | |
569 | |
570 | |
571 def respond(items): | |
572 """Create a feed out the retrieved items.""" | |
573 contentType = http_headers.MimeType('application', | |
574 'atom+xml', | |
575 {'type': 'feed'}) | |
576 atomEntries = extractAtomEntries(items) | |
577 feed = constructFeed(jid, nodeIdentifier, atomEntries, | |
578 "Retrieved item collection") | |
579 payload = feed.toXml().encode('utf-8') | |
580 return http.Response(responsecode.OK, stream=payload, | |
581 headers={'Content-Type': contentType}) | |
582 | |
583 def trapNotFound(failure): | |
584 failure.trap(StanzaError) | |
585 if not failure.value.condition == 'item-not-found': | |
586 raise failure | |
587 return http.StatusResponse(responsecode.NOT_FOUND, | |
588 "Node not found") | |
589 | |
590 d = self.service.items(jid, nodeIdentifier) | |
591 d.addCallback(respond) | |
592 d.addErrback(trapNotFound) | |
593 return d | |
594 | |
595 | |
596 | |
597 # Client side code to interact with a service as provided above | |
484 | 598 |
485 def getPageWithFactory(url, contextFactory=None, *args, **kwargs): | 599 def getPageWithFactory(url, contextFactory=None, *args, **kwargs): |
486 """Download a web page. | 600 """Download a web page. |
487 | 601 |
488 Download a page. Return the factory that holds a deferred, which will | 602 Download a page. Return the factory that holds a deferred, which will |
504 else: | 618 else: |
505 reactor.connectTCP(host, port, factory) | 619 reactor.connectTCP(host, port, factory) |
506 return factory | 620 return factory |
507 | 621 |
508 | 622 |
623 | |
509 class CallbackResource(resource.Resource): | 624 class CallbackResource(resource.Resource): |
510 """ | 625 """ |
511 Web resource for retrieving gateway notifications. | 626 Web resource for retrieving gateway notifications. |
512 """ | 627 """ |
513 | 628 |
514 def __init__(self, callback): | 629 def __init__(self, callback): |
515 self.callback = callback | 630 self.callback = callback |
516 | 631 |
632 | |
517 http_GET = None | 633 http_GET = None |
634 | |
518 | 635 |
519 def http_POST(self, request): | 636 def http_POST(self, request): |
520 p = WebStreamParser() | 637 p = WebStreamParser() |
521 d = p.parse(request.stream) | 638 d = p.parse(request.stream) |
522 d.addCallback(self.callback, request.headers) | 639 d.addCallback(self.callback, request.headers) |
523 d.addCallback(lambda _: http.Response(responsecode.NO_CONTENT)) | 640 d.addCallback(lambda _: http.Response(responsecode.NO_CONTENT)) |
524 return d | 641 return d |
642 | |
643 | |
525 | 644 |
526 class GatewayClient(service.Service): | 645 class GatewayClient(service.Service): |
527 """ | 646 """ |
528 Service that provides client access to the HTTP Gateway into Idavoll. | 647 Service that provides client access to the HTTP Gateway into Idavoll. |
529 """ | 648 """ |
536 self.callbackPort = callbackPort or 8087 | 655 self.callbackPort = callbackPort or 8087 |
537 root = resource.Resource() | 656 root = resource.Resource() |
538 root.child_callback = CallbackResource(lambda *args, **kwargs: self.callback(*args, **kwargs)) | 657 root.child_callback = CallbackResource(lambda *args, **kwargs: self.callback(*args, **kwargs)) |
539 self.site = server.Site(root) | 658 self.site = server.Site(root) |
540 | 659 |
660 | |
541 def startService(self): | 661 def startService(self): |
542 self.port = reactor.listenTCP(self.callbackPort, | 662 self.port = reactor.listenTCP(self.callbackPort, |
543 channel.HTTPFactory(self.site)) | 663 channel.HTTPFactory(self.site)) |
544 | 664 |
665 | |
545 def stopService(self): | 666 def stopService(self): |
546 return self.port.stopListening() | 667 return self.port.stopListening() |
668 | |
547 | 669 |
548 def _makeURI(self, verb, query=None): | 670 def _makeURI(self, verb, query=None): |
549 uriComponents = urlparse.urlparse(self.baseURI) | 671 uriComponents = urlparse.urlparse(self.baseURI) |
550 uri = urlparse.urlunparse((uriComponents[0], | 672 uri = urlparse.urlunparse((uriComponents[0], |
551 uriComponents[1], | 673 uriComponents[1], |
553 '', | 675 '', |
554 query and urllib.urlencode(query) or '', | 676 query and urllib.urlencode(query) or '', |
555 '')) | 677 '')) |
556 return uri | 678 return uri |
557 | 679 |
680 | |
558 def callback(self, data, headers): | 681 def callback(self, data, headers): |
559 pass | 682 pass |
683 | |
560 | 684 |
561 def create(self): | 685 def create(self): |
562 f = getPageWithFactory(self._makeURI('create'), | 686 f = getPageWithFactory(self._makeURI('create'), |
563 method='POST', | 687 method='POST', |
564 agent=self.agent) | 688 agent=self.agent) |
565 return f.deferred.addCallback(simplejson.loads) | 689 return f.deferred.addCallback(simplejson.loads) |
690 | |
566 | 691 |
567 def publish(self, entry, xmppURI=None): | 692 def publish(self, entry, xmppURI=None): |
568 query = xmppURI and {'uri': xmppURI} | 693 query = xmppURI and {'uri': xmppURI} |
569 | 694 |
570 f = getPageWithFactory(self._makeURI('publish', query), | 695 f = getPageWithFactory(self._makeURI('publish', query), |
572 postdata=entry.toXml().encode('utf-8'), | 697 postdata=entry.toXml().encode('utf-8'), |
573 headers={'Content-Type': MIME_ATOM_ENTRY}, | 698 headers={'Content-Type': MIME_ATOM_ENTRY}, |
574 agent=self.agent) | 699 agent=self.agent) |
575 return f.deferred.addCallback(simplejson.loads) | 700 return f.deferred.addCallback(simplejson.loads) |
576 | 701 |
702 | |
577 def listNodes(self): | 703 def listNodes(self): |
578 f = getPageWithFactory(self._makeURI('list'), | 704 f = getPageWithFactory(self._makeURI('list'), |
579 method='GET', | 705 method='GET', |
580 agent=self.agent) | 706 agent=self.agent) |
581 return f.deferred.addCallback(simplejson.loads) | 707 return f.deferred.addCallback(simplejson.loads) |
708 | |
582 | 709 |
583 def subscribe(self, xmppURI): | 710 def subscribe(self, xmppURI): |
584 params = {'uri': xmppURI, | 711 params = {'uri': xmppURI, |
585 'callback': 'http://%s:%s/callback' % (self.callbackHost, | 712 'callback': 'http://%s:%s/callback' % (self.callbackHost, |
586 self.callbackPort)} | 713 self.callbackPort)} |
588 method='POST', | 715 method='POST', |
589 postdata=simplejson.dumps(params), | 716 postdata=simplejson.dumps(params), |
590 headers={'Content-Type': MIME_JSON}, | 717 headers={'Content-Type': MIME_JSON}, |
591 agent=self.agent) | 718 agent=self.agent) |
592 return f.deferred | 719 return f.deferred |
720 | |
721 | |
722 def items(self, xmppURI, maxItems=None): | |
723 query = {'uri': xmppURI} | |
724 if maxItems: | |
725 query['maxItems'] = int(maxItems) | |
726 f = getPageWithFactory(self._makeURI('items', query), | |
727 method='GET', | |
728 agent=self.agent) | |
729 return f.deferred |