Mercurial > libervia-pubsub
comparison sat_pubsub/gateway.py @ 273:6ba0d6def7f5
Use twisted.web instead of web2, initial tests.
author | Ralph Meijer <ralphm@ik.nu> |
---|---|
date | Sun, 20 Jan 2013 13:38:41 +0100 |
parents | d55620ceafed |
children | 6641ea7990ee |
comparison
equal
deleted
inserted
replaced
272:558a43366c9f | 273:6ba0d6def7f5 |
---|---|
54 | 54 |
55 """ | 55 """ |
56 Web resources and client for interacting with pubsub services. | 56 Web resources and client for interacting with pubsub services. |
57 """ | 57 """ |
58 | 58 |
59 import cgi | 59 import mimetools |
60 from time import gmtime, strftime | 60 from time import gmtime, strftime |
61 from StringIO import StringIO | |
61 import urllib | 62 import urllib |
62 import urlparse | 63 import urlparse |
63 | 64 |
64 import simplejson | 65 import simplejson |
65 | 66 |
66 from twisted.application import service | 67 from twisted.application import service |
67 from twisted.internet import defer, reactor | 68 from twisted.internet import defer, reactor |
68 from twisted.python import log | 69 from twisted.python import log |
69 from twisted.web import client | 70 from twisted.web import client, http, resource, server |
70 from twisted.web2 import http, http_headers, resource, responsecode | 71 from twisted.web.error import Error |
71 from twisted.web2 import channel, server | |
72 from twisted.web2.stream import readStream | |
73 from twisted.words.protocols.jabber.jid import JID | 72 from twisted.words.protocols.jabber.jid import JID |
74 from twisted.words.protocols.jabber.error import StanzaError | 73 from twisted.words.protocols.jabber.error import StanzaError |
75 from twisted.words.xish import domish | 74 from twisted.words.xish import domish |
76 | 75 |
76 from wokkel.generic import parseXml | |
77 from wokkel.pubsub import Item | 77 from wokkel.pubsub import Item |
78 from wokkel.pubsub import PubSubClient | 78 from wokkel.pubsub import PubSubClient |
79 | 79 |
80 from sat_pubsub import error | 80 from sat_pubsub import error |
81 | 81 |
82 NS_ATOM = 'http://www.w3.org/2005/Atom' | 82 NS_ATOM = 'http://www.w3.org/2005/Atom' |
83 MIME_ATOM_ENTRY = 'application/atom+xml;type=entry' | 83 MIME_ATOM_ENTRY = b'application/atom+xml;type=entry' |
84 MIME_JSON = 'application/json' | 84 MIME_ATOM_FEED = b'application/atom+xml;type=feed' |
85 MIME_JSON = b'application/json' | |
85 | 86 |
86 class XMPPURIParseError(ValueError): | 87 class XMPPURIParseError(ValueError): |
87 """ | 88 """ |
88 Raised when a given XMPP URI couldn't be properly parsed. | 89 Raised when a given XMPP URI couldn't be properly parsed. |
89 """ | 90 """ |
107 raise XMPPURIParseError("Unexpected URI authority component") | 108 raise XMPPURIParseError("Unexpected URI authority component") |
108 | 109 |
109 try: | 110 try: |
110 entity, query = rest.split('?', 1) | 111 entity, query = rest.split('?', 1) |
111 except ValueError: | 112 except ValueError: |
112 raise XMPPURIParseError("No URI query component") | 113 entity, query = rest, '' |
113 | 114 |
114 if not entity: | 115 if not entity: |
115 raise XMPPURIParseError("Empty URI path component") | 116 raise XMPPURIParseError("Empty URI path component") |
116 | 117 |
117 try: | 118 try: |
118 service = JID(entity) | 119 service = JID(entity) |
119 except Exception, e: | 120 except Exception, e: |
120 raise XMPPURIParseError("Invalid JID: %s" % e) | 121 raise XMPPURIParseError("Invalid JID: %s" % e) |
121 | 122 |
122 params = cgi.parse_qs(query) | 123 params = urlparse.parse_qs(query) |
123 | 124 |
124 try: | 125 try: |
125 nodeIdentifier = params['node'][0] | 126 nodeIdentifier = params['node'][0] |
126 except (KeyError, ValueError): | 127 except (KeyError, ValueError): |
127 nodeIdentifier = '' | 128 nodeIdentifier = '' |
136 """ | 137 """ |
137 return "xmpp:%s?;node=%s" % (service.full(), nodeIdentifier or '') | 138 return "xmpp:%s?;node=%s" % (service.full(), nodeIdentifier or '') |
138 | 139 |
139 | 140 |
140 | 141 |
141 class WebStreamParser(object): | 142 def _parseContentType(header): |
142 def __init__(self): | 143 """ |
143 self.elementStream = domish.elementStream() | 144 Parse a Content-Type header value to a L{mimetools.Message}. |
144 self.elementStream.DocumentStartEvent = self.docStart | 145 |
145 self.elementStream.ElementEvent = self.elem | 146 L{mimetools.Message} parses a Content-Type header and makes the |
146 self.elementStream.DocumentEndEvent = self.docEnd | 147 components available with its C{getmaintype}, C{getsubtype}, C{gettype}, |
147 self.done = False | 148 C{getplist} and C{getparam} methods. |
148 | 149 """ |
149 | 150 return mimetools.Message(StringIO(b'Content-Type: ' + header)) |
150 def docStart(self, elem): | 151 |
151 self.document = elem | 152 |
152 | 153 |
153 | 154 def _asyncResponse(render): |
154 def elem(self, elem): | 155 """ |
155 self.document.addChild(elem) | 156 """ |
156 | 157 def wrapped(self, request): |
157 | 158 def eb(failure): |
158 def docEnd(self): | 159 if failure.check(Error): |
159 self.done = True | 160 err = failure.value |
160 | |
161 | |
162 def parse(self, stream): | |
163 def endOfStream(result): | |
164 if not self.done: | |
165 raise Exception("No more stuff?") | |
166 else: | 161 else: |
167 return self.document | 162 log.err(failure) |
168 | 163 err = Error(500) |
169 d = readStream(stream, self.elementStream.parse) | 164 request.setResponseCode(err.status, err.message) |
170 d.addCallback(endOfStream) | 165 return err.response |
171 return d | 166 |
167 def finish(result): | |
168 if result is server.NOT_DONE_YET: | |
169 return | |
170 | |
171 if result: | |
172 request.write(result) | |
173 request.finish() | |
174 | |
175 d = defer.maybeDeferred(render, self, request) | |
176 d.addErrback(eb) | |
177 d.addCallback(finish) | |
178 | |
179 return server.NOT_DONE_YET | |
180 | |
181 return wrapped | |
172 | 182 |
173 | 183 |
174 | 184 |
175 class CreateResource(resource.Resource): | 185 class CreateResource(resource.Resource): |
176 """ | 186 """ |
183 | 193 |
184 | 194 |
185 http_GET = None | 195 http_GET = None |
186 | 196 |
187 | 197 |
188 def http_POST(self, request): | 198 @_asyncResponse |
199 def render_POST(self, request): | |
189 """ | 200 """ |
190 Respond to a POST request to create a new node. | 201 Respond to a POST request to create a new node. |
191 """ | 202 """ |
192 | 203 |
193 def toResponse(nodeIdentifier): | 204 def toResponse(nodeIdentifier): |
194 uri = getXMPPURI(self.serviceJID, nodeIdentifier) | 205 uri = getXMPPURI(self.serviceJID, nodeIdentifier) |
195 stream = simplejson.dumps({'uri': uri}) | 206 body = simplejson.dumps({'uri': uri}) |
196 contentType = http_headers.MimeType.fromString(MIME_JSON) | 207 request.setHeader(b'Content-Type', MIME_JSON) |
197 return http.Response(responsecode.OK, stream=stream, | 208 return body |
198 headers={'Content-Type': contentType}) | 209 |
199 d = self.backend.createNode(None, self.owner) | 210 d = self.backend.createNode(None, self.owner) |
200 d.addCallback(toResponse) | 211 d.addCallback(toResponse) |
201 return d | 212 return d |
202 | 213 |
203 | 214 |
210 self.backend = backend | 221 self.backend = backend |
211 self.serviceJID = serviceJID | 222 self.serviceJID = serviceJID |
212 self.owner = owner | 223 self.owner = owner |
213 | 224 |
214 | 225 |
215 http_GET = None | 226 render_GET = None |
216 | 227 |
217 | 228 |
218 def http_POST(self, request): | 229 @_asyncResponse |
230 def render_POST(self, request): | |
219 """ | 231 """ |
220 Respond to a POST request to create a new node. | 232 Respond to a POST request to create a new node. |
221 """ | 233 """ |
222 | 234 def toResponse(result): |
223 def gotStream(_): | 235 request.setResponseCode(http.NO_CONTENT) |
224 if request.args.get('uri'): | |
225 jid, nodeIdentifier = getServiceAndNode(request.args['uri'][0]) | |
226 return defer.succeed(nodeIdentifier) | |
227 else: | |
228 raise http.HTTPError(http.Response(responsecode.BAD_REQUEST, | |
229 "No URI given")) | |
230 | |
231 def doDelete(nodeIdentifier, data): | |
232 if data: | |
233 params = simplejson.loads(''.join(data)) | |
234 redirectURI = params.get('redirect_uri') | |
235 else: | |
236 redirectURI = None | |
237 | |
238 return self.backend.deleteNode(nodeIdentifier, self.owner, | |
239 redirectURI) | |
240 | |
241 def respond(result): | |
242 return http.Response(responsecode.NO_CONTENT) | |
243 | |
244 | 236 |
245 def trapNotFound(failure): | 237 def trapNotFound(failure): |
246 failure.trap(error.NodeNotFound) | 238 failure.trap(error.NodeNotFound) |
247 return http.StatusResponse(responsecode.NOT_FOUND, | 239 raise Error(http.NOT_FOUND, "Node not found") |
248 "Node not found") | |
249 | 240 |
250 def trapXMPPURIParseError(failure): | 241 def trapXMPPURIParseError(failure): |
251 failure.trap(XMPPURIParseError) | 242 failure.trap(XMPPURIParseError) |
252 return http.StatusResponse(responsecode.BAD_REQUEST, | 243 raise Error(http.BAD_REQUEST, |
253 "Malformed XMPP URI: %s" % failure.value) | 244 "Malformed XMPP URI: %s" % failure.value) |
254 | 245 |
255 data = [] | 246 if not request.args.get('uri'): |
256 d = readStream(request.stream, data.append) | 247 raise Error(http.BAD_REQUEST, "No URI given") |
257 d.addCallback(gotStream) | 248 |
258 d.addCallback(doDelete, data) | 249 jid, nodeIdentifier = getServiceAndNode(request.args['uri'][0]) |
259 d.addCallback(respond) | 250 |
251 data = request.content.read() | |
252 if data: | |
253 params = simplejson.loads(data) | |
254 redirectURI = params.get('redirect_uri', None) | |
255 else: | |
256 redirectURI = None | |
257 | |
258 d = self.backend.deleteNode(nodeIdentifier, self.owner, | |
259 redirectURI) | |
260 d.addCallback(toResponse) | |
260 d.addErrback(trapNotFound) | 261 d.addErrback(trapNotFound) |
261 d.addErrback(trapXMPPURIParseError) | 262 d.addErrback(trapXMPPURIParseError) |
262 return d | 263 return d |
263 | 264 |
264 | 265 |
272 self.backend = backend | 273 self.backend = backend |
273 self.serviceJID = serviceJID | 274 self.serviceJID = serviceJID |
274 self.owner = owner | 275 self.owner = owner |
275 | 276 |
276 | 277 |
277 http_GET = None | 278 render_GET = None |
278 | 279 |
279 | 280 |
280 def checkMediaType(self, request): | 281 def checkMediaType(self, request): |
281 ctype = request.headers.getHeader('content-type') | 282 ctype = request.getHeader(b'content-type') |
282 | 283 |
283 if not ctype: | 284 if not ctype: |
284 raise http.HTTPError( | 285 request.setResponseCode(http.BAD_REQUEST) |
285 http.StatusResponse( | 286 |
286 responsecode.BAD_REQUEST, | 287 raise Error(http.BAD_REQUEST, b"No specified Media Type") |
287 "No specified Media Type")) | 288 |
288 | 289 message = _parseContentType(ctype) |
289 if (ctype.mediaType != 'application' or | 290 if (message.maintype != b'application' or |
290 ctype.mediaSubtype != 'atom+xml' or | 291 message.subtype != b'atom+xml' or |
291 ctype.params.get('type') != 'entry' or | 292 message.getparam(b'type') != b'entry' or |
292 ctype.params.get('charset', 'utf-8') != 'utf-8'): | 293 (message.getparam(b'charset') or b'utf-8') != b'utf-8'): |
293 raise http.HTTPError( | 294 raise Error(http.UNSUPPORTED_MEDIA_TYPE, |
294 http.StatusResponse( | 295 b"Unsupported Media Type: %s" % ctype) |
295 responsecode.UNSUPPORTED_MEDIA_TYPE, | 296 |
296 "Unsupported Media Type: %s" % | 297 |
297 http_headers.generateContentType(ctype))) | 298 @_asyncResponse |
298 | 299 def render_POST(self, request): |
299 | |
300 def parseXMLPayload(self, stream): | |
301 p = WebStreamParser() | |
302 return p.parse(stream) | |
303 | |
304 | |
305 def http_POST(self, request): | |
306 """ | 300 """ |
307 Respond to a POST request to create a new item. | 301 Respond to a POST request to create a new item. |
308 """ | 302 """ |
309 | 303 |
310 def toResponse(nodeIdentifier): | 304 def toResponse(nodeIdentifier): |
311 uri = getXMPPURI(self.serviceJID, nodeIdentifier) | 305 uri = getXMPPURI(self.serviceJID, nodeIdentifier) |
312 stream = simplejson.dumps({'uri': uri}) | 306 body = simplejson.dumps({'uri': uri}) |
313 contentType = http_headers.MimeType.fromString(MIME_JSON) | 307 request.setHeader(b'Content-Type', MIME_JSON) |
314 return http.Response(responsecode.OK, stream=stream, | 308 return body |
315 headers={'Content-Type': contentType}) | |
316 | 309 |
317 def gotNode(nodeIdentifier, payload): | 310 def gotNode(nodeIdentifier, payload): |
318 item = Item(id='current', payload=payload) | 311 item = Item(id='current', payload=payload) |
319 d = self.backend.publish(nodeIdentifier, [item], self.owner) | 312 d = self.backend.publish(nodeIdentifier, [item], self.owner) |
320 d.addCallback(lambda _: nodeIdentifier) | 313 d.addCallback(lambda _: nodeIdentifier) |
325 jid, nodeIdentifier = getServiceAndNode(request.args['uri'][0]) | 318 jid, nodeIdentifier = getServiceAndNode(request.args['uri'][0]) |
326 return defer.succeed(nodeIdentifier) | 319 return defer.succeed(nodeIdentifier) |
327 else: | 320 else: |
328 return self.backend.createNode(None, self.owner) | 321 return self.backend.createNode(None, self.owner) |
329 | 322 |
330 def doPublish(payload): | |
331 d = getNode() | |
332 d.addCallback(gotNode, payload) | |
333 return d | |
334 | |
335 def trapNotFound(failure): | 323 def trapNotFound(failure): |
336 failure.trap(error.NodeNotFound) | 324 failure.trap(error.NodeNotFound) |
337 return http.StatusResponse(responsecode.NOT_FOUND, | 325 raise Error(http.NOT_FOUND, "Node not found") |
338 "Node not found") | |
339 | 326 |
340 def trapXMPPURIParseError(failure): | 327 def trapXMPPURIParseError(failure): |
341 failure.trap(XMPPURIParseError) | 328 failure.trap(XMPPURIParseError) |
342 return http.StatusResponse(responsecode.BAD_REQUEST, | 329 raise Error(http.BAD_REQUEST, |
343 "Malformed XMPP URI: %s" % failure.value) | 330 "Malformed XMPP URI: %s" % failure.value) |
344 | 331 |
345 self.checkMediaType(request) | 332 self.checkMediaType(request) |
346 d = self.parseXMLPayload(request.stream) | 333 payload = parseXml(request.content.read()) |
347 d.addCallback(doPublish) | 334 d = getNode() |
335 d.addCallback(gotNode, payload) | |
348 d.addCallback(toResponse) | 336 d.addCallback(toResponse) |
349 d.addErrback(trapNotFound) | 337 d.addErrback(trapNotFound) |
350 d.addErrback(trapXMPPURIParseError) | 338 d.addErrback(trapXMPPURIParseError) |
351 return d | 339 return d |
352 | 340 |
355 class ListResource(resource.Resource): | 343 class ListResource(resource.Resource): |
356 def __init__(self, service): | 344 def __init__(self, service): |
357 self.service = service | 345 self.service = service |
358 | 346 |
359 | 347 |
360 def render(self, request): | 348 @_asyncResponse |
349 def render_GET(self, request): | |
361 def responseFromNodes(nodeIdentifiers): | 350 def responseFromNodes(nodeIdentifiers): |
362 stream = simplejson.dumps(nodeIdentifiers) | 351 body = simplejson.dumps(nodeIdentifiers) |
363 contentType = http_headers.MimeType.fromString(MIME_JSON) | 352 request.setHeader(b'Content-Type', MIME_JSON) |
364 return http.Response(responsecode.OK, stream=stream, | 353 return body |
365 headers={'Content-Type': contentType}) | |
366 | 354 |
367 d = self.service.getNodes() | 355 d = self.service.getNodes() |
368 d.addCallback(responseFromNodes) | 356 d.addCallback(responseFromNodes) |
369 return d | 357 return d |
370 | 358 |
375 def extractAtomEntries(items): | 363 def extractAtomEntries(items): |
376 """ | 364 """ |
377 Extract atom entries from a list of publish-subscribe items. | 365 Extract atom entries from a list of publish-subscribe items. |
378 | 366 |
379 @param items: List of L{domish.Element}s that represent publish-subscribe | 367 @param items: List of L{domish.Element}s that represent publish-subscribe |
380 items. | 368 items. |
381 @type items: C{list} | 369 @type items: C{list} |
382 """ | 370 """ |
383 | 371 |
384 atomEntries = [] | 372 atomEntries = [] |
385 | 373 |
459 | 447 |
460 if not atomEntries: | 448 if not atomEntries: |
461 return | 449 return |
462 | 450 |
463 self._postTo([callback], jid, nodeIdentifier, atomEntries[0], | 451 self._postTo([callback], jid, nodeIdentifier, atomEntries[0], |
464 'application/atom+xml;type=entry') | 452 MIME_ATOM_ENTRY) |
465 | 453 |
466 def subscribeOrItems(hasCallbacks): | 454 def subscribeOrItems(hasCallbacks): |
467 if hasCallbacks: | 455 if hasCallbacks: |
468 if not nodeIdentifier: | 456 if not nodeIdentifier: |
469 return None | 457 return None |
512 # Don't notify if there are no atom entries | 500 # Don't notify if there are no atom entries |
513 if not atomEntries: | 501 if not atomEntries: |
514 return | 502 return |
515 | 503 |
516 if len(atomEntries) == 1: | 504 if len(atomEntries) == 1: |
517 contentType = 'application/atom+xml;type=entry' | 505 contentType = MIME_ATOM_ENTRY |
518 payload = atomEntries[0] | 506 payload = atomEntries[0] |
519 else: | 507 else: |
520 contentType = 'application/atom+xml;type=feed' | 508 contentType = MIME_ATOM_FEED |
521 payload = constructFeed(service, nodeIdentifier, atomEntries, | 509 payload = constructFeed(service, nodeIdentifier, atomEntries, |
522 title='Received item collection') | 510 title='Received item collection') |
523 | 511 |
524 self.callCallbacks(service, nodeIdentifier, payload, contentType) | 512 self.callCallbacks(service, nodeIdentifier, payload, contentType) |
525 | 513 |
612 request to this resource. | 600 request to this resource. |
613 """ | 601 """ |
614 serviceMethod = None | 602 serviceMethod = None |
615 errorMap = { | 603 errorMap = { |
616 error.NodeNotFound: | 604 error.NodeNotFound: |
617 (responsecode.FORBIDDEN, "Node not found"), | 605 (http.FORBIDDEN, "Node not found"), |
618 error.NotSubscribed: | 606 error.NotSubscribed: |
619 (responsecode.FORBIDDEN, "No such subscription found"), | 607 (http.FORBIDDEN, "No such subscription found"), |
620 error.SubscriptionExists: | 608 error.SubscriptionExists: |
621 (responsecode.FORBIDDEN, "Subscription already exists"), | 609 (http.FORBIDDEN, "Subscription already exists"), |
622 } | 610 } |
623 | 611 |
624 def __init__(self, service): | 612 def __init__(self, service): |
625 self.service = service | 613 self.service = service |
626 self.params = None | 614 self.params = None |
627 | 615 |
628 | 616 |
629 http_GET = None | 617 render_GET = None |
630 | 618 |
631 | 619 |
632 def http_POST(self, request): | 620 @_asyncResponse |
621 def render_POST(self, request): | |
633 def trapNotFound(failure): | 622 def trapNotFound(failure): |
634 err = failure.trap(*self.errorMap.keys()) | 623 err = failure.trap(*self.errorMap.keys()) |
635 code, msg = self.errorMap[err] | 624 status, message = self.errorMap[err] |
636 return http.StatusResponse(code, msg) | 625 raise Error(status, message) |
637 | 626 |
638 def respond(result): | 627 def toResponse(result): |
639 return http.Response(responsecode.NO_CONTENT) | 628 request.setResponseCode(http.NO_CONTENT) |
640 | 629 return b'' |
641 def gotRequest(result): | |
642 uri = self.params['uri'] | |
643 callback = self.params['callback'] | |
644 | |
645 jid, nodeIdentifier = getServiceAndNode(uri) | |
646 method = getattr(self.service, self.serviceMethod) | |
647 d = method(jid, nodeIdentifier, callback) | |
648 return d | |
649 | |
650 def storeParams(data): | |
651 self.params = simplejson.loads(data) | |
652 | 630 |
653 def trapXMPPURIParseError(failure): | 631 def trapXMPPURIParseError(failure): |
654 failure.trap(XMPPURIParseError) | 632 failure.trap(XMPPURIParseError) |
655 return http.StatusResponse(responsecode.BAD_REQUEST, | 633 raise Error(http.BAD_REQUEST, |
656 "Malformed XMPP URI: %s" % failure.value) | 634 "Malformed XMPP URI: %s" % failure.value) |
657 | 635 |
658 d = readStream(request.stream, storeParams) | 636 data = request.content.read() |
659 d.addCallback(gotRequest) | 637 self.params = simplejson.loads(data) |
660 d.addCallback(respond) | 638 |
639 uri = self.params['uri'] | |
640 callback = self.params['callback'] | |
641 | |
642 jid, nodeIdentifier = getServiceAndNode(uri) | |
643 method = getattr(self.service, self.serviceMethod) | |
644 d = method(jid, nodeIdentifier, callback) | |
645 d.addCallback(toResponse) | |
661 d.addErrback(trapNotFound) | 646 d.addErrback(trapNotFound) |
662 d.addErrback(trapXMPPURIParseError) | 647 d.addErrback(trapXMPPURIParseError) |
663 return d | 648 return d |
664 | 649 |
665 | 650 |
694 | 679 |
695 def __init__(self, service): | 680 def __init__(self, service): |
696 self.service = service | 681 self.service = service |
697 | 682 |
698 | 683 |
699 def render(self, request): | 684 @_asyncResponse |
685 def render_GET(self, request): | |
700 try: | 686 try: |
701 maxItems = int(request.args.get('max_items', [0])[0]) or None | 687 maxItems = int(request.args.get('max_items', [0])[0]) or None |
702 except ValueError: | 688 except ValueError: |
703 return http.StatusResponse(responsecode.BAD_REQUEST, | 689 raise Error(http.BAD_REQUEST, |
704 "The argument max_items has an invalid value.") | 690 "The argument max_items has an invalid value.") |
705 | 691 |
706 try: | 692 try: |
707 uri = request.args['uri'][0] | 693 uri = request.args['uri'][0] |
708 except KeyError: | 694 except KeyError: |
709 return http.StatusResponse(responsecode.BAD_REQUEST, | 695 raise Error(http.BAD_REQUEST, |
710 "No URI for the remote node provided.") | 696 "No URI for the remote node provided.") |
711 | 697 |
712 try: | 698 try: |
713 jid, nodeIdentifier = getServiceAndNode(uri) | 699 jid, nodeIdentifier = getServiceAndNode(uri) |
714 except XMPPURIParseError: | 700 except XMPPURIParseError: |
715 return http.StatusResponse(responsecode.BAD_REQUEST, | 701 raise Error(http.BAD_REQUEST, |
716 "Malformed XMPP URI: %s" % uri) | 702 "Malformed XMPP URI: %s" % uri) |
717 | 703 |
718 def respond(items): | 704 def toResponse(items): |
719 """Create a feed out the retrieved items.""" | 705 """ |
720 contentType = http_headers.MimeType('application', | 706 Create a feed out the retrieved items. |
721 'atom+xml', | 707 """ |
722 {'type': 'feed'}) | |
723 atomEntries = extractAtomEntries(items) | 708 atomEntries = extractAtomEntries(items) |
724 feed = constructFeed(jid, nodeIdentifier, atomEntries, | 709 feed = constructFeed(jid, nodeIdentifier, atomEntries, |
725 "Retrieved item collection") | 710 "Retrieved item collection") |
726 payload = feed.toXml().encode('utf-8') | 711 body = feed.toXml().encode('utf-8') |
727 return http.Response(responsecode.OK, stream=payload, | 712 request.setHeader(b'Content-Type', MIME_ATOM_FEED) |
728 headers={'Content-Type': contentType}) | 713 return body |
729 | 714 |
730 def trapNotFound(failure): | 715 def trapNotFound(failure): |
731 failure.trap(StanzaError) | 716 failure.trap(StanzaError) |
732 if not failure.value.condition == 'item-not-found': | 717 if not failure.value.condition == 'item-not-found': |
733 raise failure | 718 raise failure |
734 return http.StatusResponse(responsecode.NOT_FOUND, | 719 raise Error(http.NOT_FOUND, "Node not found") |
735 "Node not found") | |
736 | 720 |
737 d = self.service.items(jid, nodeIdentifier, maxItems) | 721 d = self.service.items(jid, nodeIdentifier, maxItems) |
738 d.addCallback(respond) | 722 d.addCallback(toResponse) |
739 d.addErrback(trapNotFound) | 723 d.addErrback(trapNotFound) |
740 return d | 724 return d |
741 | 725 |
742 | 726 |
743 | 727 |
778 | 762 |
779 | 763 |
780 http_GET = None | 764 http_GET = None |
781 | 765 |
782 | 766 |
783 def http_POST(self, request): | 767 def render_POST(self, request): |
784 p = WebStreamParser() | 768 if request.requestHeaders.hasHeader(b'Event'): |
785 if not request.headers.hasHeader('Event'): | 769 payload = None |
786 d = p.parse(request.stream) | |
787 else: | 770 else: |
788 d = defer.succeed(None) | 771 payload = parseXml(request.content.read()) |
789 d.addCallback(self.callback, request.headers) | 772 |
790 d.addCallback(lambda _: http.Response(responsecode.NO_CONTENT)) | 773 self.callback(payload, request.requestHeaders) |
791 return d | 774 |
775 request.setResponseCode(http.NO_CONTENT) | |
776 return b'' | |
777 | |
792 | 778 |
793 | 779 |
794 | 780 |
795 class GatewayClient(service.Service): | 781 class GatewayClient(service.Service): |
796 """ | 782 """ |
802 def __init__(self, baseURI, callbackHost=None, callbackPort=None): | 788 def __init__(self, baseURI, callbackHost=None, callbackPort=None): |
803 self.baseURI = baseURI | 789 self.baseURI = baseURI |
804 self.callbackHost = callbackHost or 'localhost' | 790 self.callbackHost = callbackHost or 'localhost' |
805 self.callbackPort = callbackPort or 8087 | 791 self.callbackPort = callbackPort or 8087 |
806 root = resource.Resource() | 792 root = resource.Resource() |
807 root.child_callback = CallbackResource(lambda *args, **kwargs: self.callback(*args, **kwargs)) | 793 root.putChild('callback', CallbackResource( |
794 lambda *args, **kwargs: self.callback(*args, **kwargs))) | |
808 self.site = server.Site(root) | 795 self.site = server.Site(root) |
809 | 796 |
810 | 797 |
811 def startService(self): | 798 def startService(self): |
812 self.port = reactor.listenTCP(self.callbackPort, | 799 self.port = reactor.listenTCP(self.callbackPort, |
813 channel.HTTPFactory(self.site)) | 800 self.site) |
814 | 801 |
815 | 802 |
816 def stopService(self): | 803 def stopService(self): |
817 return self.port.stopListening() | 804 return self.port.stopListening() |
818 | 805 |