Mercurial > libervia-pubsub
comparison sat_pubsub/gateway.py @ 232:923281d4c5bc
renamed idavoll directory to sat_pubsub
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 17 May 2012 12:48:14 +0200 |
parents | idavoll/gateway.py@55b45c7dccb4 |
children | 564ae55219e1 |
comparison
equal
deleted
inserted
replaced
231:d99047cd90f9 | 232:923281d4c5bc |
---|---|
1 # -*- test-case-name: idavoll.test.test_gateway -*- | |
2 # | |
3 # Copyright (c) 2003-2009 Ralph Meijer | |
4 # See LICENSE for details. | |
5 | |
6 """ | |
7 Web resources and client for interacting with pubsub services. | |
8 """ | |
9 | |
10 import cgi | |
11 from time import gmtime, strftime | |
12 import urllib | |
13 import urlparse | |
14 | |
15 import simplejson | |
16 | |
17 from twisted.application import service | |
18 from twisted.internet import defer, reactor | |
19 from twisted.python import log | |
20 from twisted.web import client | |
21 from twisted.web2 import http, http_headers, resource, responsecode | |
22 from twisted.web2 import channel, server | |
23 from twisted.web2.stream import readStream | |
24 from twisted.words.protocols.jabber.jid import JID | |
25 from twisted.words.protocols.jabber.error import StanzaError | |
26 from twisted.words.xish import domish | |
27 | |
28 from wokkel.pubsub import Item | |
29 from wokkel.pubsub import PubSubClient | |
30 | |
31 from idavoll import error | |
32 | |
# XML namespace used to recognise and create Atom elements.
NS_ATOM = 'http://www.w3.org/2005/Atom'
# Media type sent by the gateway client when publishing a single Atom entry.
MIME_ATOM_ENTRY = 'application/atom+xml;type=entry'
# Media type of the JSON documents exchanged with the gateway resources.
MIME_JSON = 'application/json'
36 | |
class XMPPURIParseError(ValueError):
    """
    Raised when a given XMPP URI couldn't be properly parsed.

    L{getServiceAndNode} raises this for a missing or unknown scheme, an
    unexpected authority component, a missing query component, an empty
    path component or an invalid JID.
    """
41 | |
42 | |
43 | |
def getServiceAndNode(uri):
    """
    Given an XMPP URI, extract the publish subscribe service JID and node ID.

    @param uri: The XMPP URI to parse. It must use the C{xmpp} scheme, have
        no authority component (no leading C{//}) and carry a query
        component.
    @return: Tuple of the service L{JID} and the node identifier. The node
        identifier is the empty string when the query has no C{node}
        parameter.
    @raise XMPPURIParseError: If the scheme is missing or not C{xmpp}, an
        authority component is present, the query component is missing,
        the path is empty or the path is not a valid JID.
    """

    try:
        scheme, rest = uri.split(':', 1)
    except ValueError:
        raise XMPPURIParseError("No URI scheme component")

    if scheme != 'xmpp':
        raise XMPPURIParseError("Unknown URI scheme")

    if rest.startswith("//"):
        raise XMPPURIParseError("Unexpected URI authority component")

    try:
        entity, query = rest.split('?', 1)
    except ValueError:
        raise XMPPURIParseError("No URI query component")

    if not entity:
        raise XMPPURIParseError("Empty URI path component")

    try:
        service = JID(entity)
    except Exception as e:
        # 'as' form instead of the Python-2-only comma form; any failure to
        # build the JID is reported as a parse error of the URI.
        raise XMPPURIParseError("Invalid JID: %s" % e)

    params = cgi.parse_qs(query)

    try:
        nodeIdentifier = params['node'][0]
    except (KeyError, ValueError):
        # No node parameter: address the service itself.
        nodeIdentifier = ''

    return service, nodeIdentifier
81 | |
82 | |
83 | |
def getXMPPURI(service, nodeIdentifier):
    """
    Build the XMPP URI that addresses a node at a pubsub service.

    @param service: Object whose C{full()} result is used as the URI path.
    @param nodeIdentifier: Node identifier; a false value (C{None} or the
        empty string) yields an empty C{node} parameter.
    @return: The URI as a string.
    """
    if not nodeIdentifier:
        nodeIdentifier = ''
    entity = service.full()
    return "xmpp:%s?;node=%s" % (entity, nodeIdentifier)
89 | |
90 | |
91 | |
class WebStreamParser(object):
    """
    Parse an XML document that is read chunk by chunk from a stream.

    The document element is kept in C{document}; every element event
    reported by the parser is added to it as a child. C{done} becomes
    C{True} once the end of the document has been reported.
    """

    def __init__(self):
        parser = domish.elementStream()
        parser.DocumentStartEvent = self.docStart
        parser.ElementEvent = self.elem
        parser.DocumentEndEvent = self.docEnd
        self.elementStream = parser
        self.done = False


    def docStart(self, elem):
        # Remember the document element so later elements can be attached.
        self.document = elem


    def elem(self, elem):
        self.document.addChild(elem)


    def docEnd(self):
        self.done = True


    def parse(self, stream):
        """
        Feed all data of C{stream} to the parser.

        @return: Deferred that fires with the document element, or fails
            with an L{Exception} when the stream ended before the document
            was complete.
        """
        def checkComplete(_):
            if self.done:
                return self.document
            raise Exception("No more stuff?")

        finished = readStream(stream, self.elementStream.parse)
        finished.addCallback(checkComplete)
        return finished
123 | |
124 | |
125 | |
class CreateResource(resource.Resource):
    """
    Web resource that creates a new publish-subscribe node on POST.
    """

    def __init__(self, backend, serviceJID, owner):
        self.owner = owner
        self.serviceJID = serviceJID
        self.backend = backend


    http_GET = None


    def http_POST(self, request):
        """
        Create a node and answer with its XMPP URI.

        The node is created through C{backend.createNode} with C{None} as
        node identifier and C{owner} as requestor. The response is a JSON
        object with the single key C{uri}.
        """

        def nodeCreated(nodeIdentifier):
            body = simplejson.dumps(
                {'uri': getXMPPURI(self.serviceJID, nodeIdentifier)})
            headers = {
                'Content-Type': http_headers.MimeType.fromString(MIME_JSON)}
            return http.Response(responsecode.OK, stream=body,
                                 headers=headers)

        creating = self.backend.createNode(None, self.owner)
        creating.addCallback(nodeCreated)
        return creating
153 | |
154 | |
155 | |
class DeleteResource(resource.Resource):
    """
    A resource to delete a publish-subscribe node.
    """

    def __init__(self, backend, serviceJID, owner):
        self.backend = backend
        self.serviceJID = serviceJID
        self.owner = owner


    http_GET = None


    def http_POST(self, request):
        """
        Respond to a POST request to delete a node.

        The node is named by the C{uri} request argument, an XMPP URI. An
        optional JSON request body may carry a C{redirect_uri} that is
        passed on to C{backend.deleteNode}.

        @return: Deferred firing with a C{NO_CONTENT} response on success,
            C{NOT_FOUND} if the backend reports L{error.NodeNotFound}, or
            C{BAD_REQUEST} if the URI is missing or malformed.
        """

        def gotStream(_):
            if request.args.get('uri'):
                nodeIdentifier = getServiceAndNode(request.args['uri'][0])[1]
                return defer.succeed(nodeIdentifier)
            else:
                # StatusResponse takes (code, description). http.Response
                # takes headers as its second argument, so the message must
                # not be passed to it positionally.
                raise http.HTTPError(http.StatusResponse(
                    responsecode.BAD_REQUEST, "No URI given"))

        def doDelete(nodeIdentifier, data):
            if data:
                # data holds the body chunks collected by readStream.
                params = simplejson.loads(''.join(data))
                redirectURI = params.get('redirect_uri')
            else:
                redirectURI = None

            return self.backend.deleteNode(nodeIdentifier, self.owner,
                                           redirectURI)

        def respond(result):
            return http.Response(responsecode.NO_CONTENT)


        def trapNotFound(failure):
            failure.trap(error.NodeNotFound)
            return http.StatusResponse(responsecode.NOT_FOUND,
                                       "Node not found")

        def trapXMPPURIParseError(failure):
            failure.trap(XMPPURIParseError)
            return http.StatusResponse(responsecode.BAD_REQUEST,
                    "Malformed XMPP URI: %s" % failure.value)

        data = []
        d = readStream(request.stream, data.append)
        d.addCallback(gotStream)
        d.addCallback(doDelete, data)
        d.addCallback(respond)
        d.addErrback(trapNotFound)
        d.addErrback(trapXMPPURIParseError)
        return d
214 | |
215 | |
216 | |
class PublishResource(resource.Resource):
    """
    Web resource that publishes a posted Atom entry to a pubsub node.
    """

    def __init__(self, backend, serviceJID, owner):
        self.owner = owner
        self.serviceJID = serviceJID
        self.backend = backend


    http_GET = None


    def checkMediaType(self, request):
        """
        Verify that the request carries a UTF-8 Atom entry.

        @raise http.HTTPError: With C{BAD_REQUEST} when no content type is
            given, or C{UNSUPPORTED_MEDIA_TYPE} when it is not
            C{application/atom+xml} with C{type=entry} and, if present, a
            C{utf-8} charset.
        """
        ctype = request.headers.getHeader('content-type')

        if not ctype:
            missing = http.StatusResponse(responsecode.BAD_REQUEST,
                                          "No specified Media Type")
            raise http.HTTPError(missing)

        isAtomEntry = (ctype.mediaType == 'application' and
                       ctype.mediaSubtype == 'atom+xml' and
                       ctype.params.get('type') == 'entry' and
                       ctype.params.get('charset', 'utf-8') == 'utf-8')
        if isAtomEntry:
            return

        unsupported = http.StatusResponse(
            responsecode.UNSUPPORTED_MEDIA_TYPE,
            "Unsupported Media Type: %s" %
                http_headers.generateContentType(ctype))
        raise http.HTTPError(unsupported)


    def parseXMLPayload(self, stream):
        """
        Parse the request body; returns the Deferred of the stream parser.
        """
        return WebStreamParser().parse(stream)


    def http_POST(self, request):
        """
        Publish the posted entry as the item with id C{current}.

        When the C{uri} request argument is given, its node is published
        to; otherwise a new node is created first. The response is a JSON
        object holding the XMPP URI of the node under the key C{uri}.
        """

        def toResponse(nodeIdentifier):
            body = simplejson.dumps(
                {'uri': getXMPPURI(self.serviceJID, nodeIdentifier)})
            headers = {
                'Content-Type': http_headers.MimeType.fromString(MIME_JSON)}
            return http.Response(responsecode.OK, stream=body,
                                 headers=headers)

        def publishTo(nodeIdentifier, payload):
            entry = Item(id='current', payload=payload)
            published = self.backend.publish(nodeIdentifier, [entry],
                                             self.owner)
            # Pass the node identifier on to the response builder.
            published.addCallback(lambda _: nodeIdentifier)
            return published

        def resolveNode():
            uris = request.args.get('uri')
            if not uris:
                return self.backend.createNode(None, self.owner)
            _, nodeIdentifier = getServiceAndNode(uris[0])
            return defer.succeed(nodeIdentifier)

        def doPublish(payload):
            resolving = resolveNode()
            resolving.addCallback(publishTo, payload)
            return resolving

        def trapNotFound(failure):
            failure.trap(error.NodeNotFound)
            return http.StatusResponse(responsecode.NOT_FOUND,
                                       "Node not found")

        def trapXMPPURIParseError(failure):
            failure.trap(XMPPURIParseError)
            return http.StatusResponse(responsecode.BAD_REQUEST,
                    "Malformed XMPP URI: %s" % failure.value)

        self.checkMediaType(request)
        d = self.parseXMLPayload(request.stream)
        d.addCallback(doPublish)
        d.addCallback(toResponse)
        d.addErrback(trapNotFound)
        d.addErrback(trapXMPPURIParseError)
        return d
303 | |
304 | |
305 | |
class ListResource(resource.Resource):
    """
    Web resource that renders the result of C{service.getNodes()} as JSON.
    """

    def __init__(self, service):
        self.service = service


    def render(self, request):
        def toJSONResponse(nodeIdentifiers):
            body = simplejson.dumps(nodeIdentifiers)
            headers = {
                'Content-Type': http_headers.MimeType.fromString(MIME_JSON)}
            return http.Response(responsecode.OK, stream=body,
                                 headers=headers)

        listing = self.service.getNodes()
        listing.addCallback(toJSONResponse)
        return listing
321 | |
322 | |
323 | |
324 # Service for subscribing to remote XMPP Pubsub nodes and web resources | |
325 | |
def extractAtomEntries(items):
    """
    Collect the Atom entries carried by publish-subscribe items.

    Elements not named C{item} (i.e. retractions) are skipped. For each
    item only the first child that is an Atom C{entry} is taken.

    @param items: List of L{domish.Element}s that represent publish-subscribe
                  items.
    @type items: C{list}
    @return: The extracted entry elements, in item order.
    """

    def firstAtomEntry(item):
        for child in item.elements():
            if child.uri == NS_ATOM and child.name == 'entry':
                return child
        return None

    candidates = [firstAtomEntry(item)
                  for item in items
                  if item.name == 'item']
    return [entry for entry in candidates if entry]
353 | |
354 | |
355 | |
def constructFeed(service, nodeIdentifier, entries, title):
    """
    Wrap Atom entries in an Atom feed element.

    The feed gets the given title, the XMPP URI of the node as its id and
    the current UTC time as its update timestamp.

    @return: The feed as a L{domish.Element}.
    """
    feedId = getXMPPURI(service, nodeIdentifier)
    updated = strftime("%Y-%m-%dT%H:%M:%SZ", gmtime())

    feed = domish.Element((NS_ATOM, 'feed'))
    metadata = (('title', title), ('id', feedId), ('updated', updated))
    for name, content in metadata:
        feed.addElement(name, content=content)

    # Append the received entries after the feed metadata
    for entry in entries:
        feed.addChild(entry)

    return feed
370 | |
371 | |
372 | |
class RemoteSubscriptionService(service.Service, PubSubClient):
    """
    Service for subscribing to remote XMPP Publish-Subscribe nodes.

    Subscriptions are created with a callback HTTP URI that is POSTed
    to with the received items in notifications.

    @ivar jid: JID passed as the last argument when subscribing to and
        unsubscribing from remote nodes.
    @ivar storage: Store of registered callback URIs. This class calls its
        C{hasCallbacks}, C{addCallback}, C{removeCallback} and
        C{getCallbacks} methods, all of which are expected to return
        Deferreds.
    """

    def __init__(self, jid, storage):
        self.jid = jid
        self.storage = storage


    def trapNotFound(self, failure):
        """
        Turn an C{item-not-found} stanza error into L{error.NodeNotFound}.

        Stanza errors with another condition are passed on unchanged.
        """
        failure.trap(StanzaError)

        if failure.value.condition == 'item-not-found':
            raise error.NodeNotFound()
        else:
            return failure


    def subscribeCallback(self, jid, nodeIdentifier, callback):
        """
        Subscribe a callback URI.

        This registers a callback URI to be called when a notification is
        received for the given node.

        If this is the first callback registered for this node, the gateway
        will subscribe to the node. Otherwise, the most recently published item
        for this node is retrieved and, if present, the newly registered
        callback will be called with that item.

        @param jid: JID of the remote pubsub service.
        @param nodeIdentifier: Identifier of the node at that service.
        @param callback: The callback URI to register.
        @return: Deferred that fires once the callback has been stored.
        """

        def callbackForLastItem(items):
            atomEntries = extractAtomEntries(items)

            if not atomEntries:
                return

            self._postTo([callback], jid, nodeIdentifier, atomEntries[0],
                         'application/atom+xml;type=entry')

        def subscribeOrItems(hasCallbacks):
            if hasCallbacks:
                # Already subscribed. Without a node identifier no last
                # item is requested.
                if not nodeIdentifier:
                    return None
                d = self.items(jid, nodeIdentifier, 1)
                d.addCallback(callbackForLastItem)
            else:
                d = self.subscribe(jid, nodeIdentifier, self.jid)

            d.addErrback(self.trapNotFound)
            return d

        d = self.storage.hasCallbacks(jid, nodeIdentifier)
        d.addCallback(subscribeOrItems)
        # Only store the callback after subscribing or fetching succeeded.
        d.addCallback(lambda _: self.storage.addCallback(jid, nodeIdentifier,
                                                         callback))
        return d


    def unsubscribeCallback(self, jid, nodeIdentifier, callback):
        """
        Unsubscribe a callback.

        If this was the last registered callback for this node, the
        gateway will unsubscribe from node.

        @return: Deferred that fires when the callback has been removed
            and, if needed, the unsubscription has completed.
        """

        def cb(last):
            # last is the result of storage.removeCallback; a true value
            # triggers the unsubscription.
            if last:
                return self.unsubscribe(jid, nodeIdentifier, self.jid)

        d = self.storage.removeCallback(jid, nodeIdentifier, callback)
        d.addCallback(cb)
        return d


    def itemsReceived(self, event):
        """
        Forward the Atom entries of a received notification to the callbacks.

        A single entry is sent as is; several entries are wrapped in a feed
        built by L{constructFeed}. Notifications without Atom entries are
        ignored. When the event headers contain C{Collection}, the callbacks
        registered for each listed collection are called with the same
        payload.
        """

        atomEntries = extractAtomEntries(event.items)
        service = event.sender
        nodeIdentifier = event.nodeIdentifier
        headers = event.headers

        # Don't notify if there are no atom entries
        if not atomEntries:
            return

        if len(atomEntries) == 1:
            contentType = 'application/atom+xml;type=entry'
            payload = atomEntries[0]
        else:
            contentType = 'application/atom+xml;type=feed'
            payload = constructFeed(service, nodeIdentifier, atomEntries,
                                    title='Received item collection')

        self.callCallbacks(service, nodeIdentifier, payload, contentType)

        if 'Collection' in headers:
            for collection in headers['Collection']:
                # A false collection value addresses the node with the
                # empty identifier.
                nodeIdentifier = collection or ''
                self.callCallbacks(service, nodeIdentifier, payload,
                                   contentType)


    def deleteReceived(self, event):
        """
        Notify the callbacks of a node that the node has been deleted.

        The callbacks are called with the event type C{DELETED} and the
        redirect URI of the event.
        """

        service = event.sender
        nodeIdentifier = event.nodeIdentifier
        redirectURI = event.redirectURI
        self.callCallbacks(service, nodeIdentifier, eventType='DELETED',
                           redirectURI=redirectURI)


    def _postTo(self, callbacks, service, nodeIdentifier,
                      payload=None, contentType=None, eventType=None,
                      redirectURI=None):
        """
        Send an HTTP POST request to each of the given callback URIs.

        The XMPP URI of the node is sent in the C{Referer} header and the
        service JID in C{PubSub-Service}. A payload is serialized to UTF-8
        XML as the request body; C{eventType} is sent in the C{Event}
        header and C{redirectURI} in a C{Link} header. Requests are
        scheduled through the reactor; request failures are logged.
        """

        if not callbacks:
            return

        postdata = None
        nodeURI = getXMPPURI(service, nodeIdentifier)
        headers = {'Referer': nodeURI.encode('utf-8'),
                   'PubSub-Service': service.full().encode('utf-8')}

        if payload:
            postdata = payload.toXml().encode('utf-8')
            if contentType:
                headers['Content-Type'] = "%s;charset=utf-8" % contentType

        if eventType:
            headers['Event'] = eventType

        if redirectURI:
            headers['Link'] = '<%s>; rel=alternate' % (
                              redirectURI.encode('utf-8'),
                              )

        def postNotification(callbackURI):
            f = getPageWithFactory(str(callbackURI),
                                   method='POST',
                                   postdata=postdata,
                                   headers=headers)
            d = f.deferred
            d.addErrback(log.err)

        for callbackURI in callbacks:
            reactor.callLater(0, postNotification, callbackURI)


    def callCallbacks(self, service, nodeIdentifier,
                            payload=None, contentType=None, eventType=None,
                            redirectURI=None):
        """
        Look up the callbacks registered for a node and post to them.

        A L{error.NoCallbacks} failure from the storage is silently
        ignored; any other failure is logged.
        """

        def eb(failure):
            failure.trap(error.NoCallbacks)

            # No callbacks were registered for this node. Unsubscribe?

        d = self.storage.getCallbacks(service, nodeIdentifier)
        d.addCallback(self._postTo, service, nodeIdentifier, payload,
                                    contentType, eventType, redirectURI)
        d.addErrback(eb)
        d.addErrback(log.err)
547 | |
548 | |
549 | |
class RemoteSubscribeBaseResource(resource.Resource):
    """
    Base resource for remote pubsub node subscription and unsubscription.

    This resource accepts POST request with a JSON document that holds
    a dictionary with the keys C{uri} and C{callback} that respectively map
    to the XMPP URI of the publish-subscribe node and the callback URI.

    This class should be inherited with L{serviceMethod} overridden.

    @cvar serviceMethod: The name of the method to be called with
                         the JID of the pubsub service, the node identifier
                         and the callback URI as received in the HTTP POST
                         request to this resource.
    @cvar errorMap: Maps exception classes raised by the service method to
                    the HTTP response code and message sent back.
    """
    serviceMethod = None
    errorMap = {
            error.NodeNotFound:
                (responsecode.FORBIDDEN, "Node not found"),
            error.NotSubscribed:
                (responsecode.FORBIDDEN, "No such subscription found"),
            error.SubscriptionExists:
                (responsecode.FORBIDDEN, "Subscription already exists"),
            }

    def __init__(self, service):
        self.service = service
        self.params = None


    http_GET = None


    def http_POST(self, request):
        """
        Call the service method for the node and callback in the request.

        @return: Deferred firing with a C{NO_CONTENT} response on success,
            the response from L{errorMap} for known service errors, or
            C{BAD_REQUEST} for a missing parameter or a malformed XMPP URI.
        """

        def trapNotFound(failure):
            err = failure.trap(*self.errorMap.keys())
            code, msg = self.errorMap[err]
            return http.StatusResponse(code, msg)

        def respond(result):
            return http.Response(responsecode.NO_CONTENT)

        def gotRequest(result):
            # Parse the complete body once, after the whole stream has been
            # read, and keep the result local to this request. The resource
            # instance is shared, so request state must not live on self.
            params = simplejson.loads(''.join(chunks))
            # Still recorded for code that inspects the attribute; request
            # handling below uses the local only.
            self.params = params

            for key in ('uri', 'callback'):
                if key not in params:
                    raise http.HTTPError(http.StatusResponse(
                        responsecode.BAD_REQUEST,
                        "Missing parameter: %s" % key))

            jid, nodeIdentifier = getServiceAndNode(params['uri'])
            method = getattr(self.service, self.serviceMethod)
            d = method(jid, nodeIdentifier, params['callback'])
            return d

        def trapXMPPURIParseError(failure):
            failure.trap(XMPPURIParseError)
            return http.StatusResponse(responsecode.BAD_REQUEST,
                    "Malformed XMPP URI: %s" % failure.value)

        # readStream calls its consumer once per chunk, so collect them.
        chunks = []
        d = readStream(request.stream, chunks.append)
        d.addCallback(gotRequest)
        d.addCallback(respond)
        d.addErrback(trapNotFound)
        d.addErrback(trapXMPPURIParseError)
        return d
615 | |
616 | |
617 | |
class RemoteSubscribeResource(RemoteSubscribeBaseResource):
    """
    Resource to subscribe to a remote publish-subscribe node.

    The passed C{uri} is the XMPP URI of the node to subscribe to and the
    C{callback} is the callback URI. Upon receiving notifications from the
    node, a POST request will be performed on the callback URI.
    """
    serviceMethod = 'subscribeCallback'
627 | |
628 | |
629 | |
class RemoteUnsubscribeResource(RemoteSubscribeBaseResource):
    """
    Resource to unsubscribe from a remote publish-subscribe node.

    The passed C{uri} is the XMPP URI of the node to unsubscribe from and the
    C{callback} is the callback URI that was registered for it. The request
    is handled by the service method named in C{serviceMethod}.
    """
    serviceMethod = 'unsubscribeCallback'
638 | |
639 | |
640 | |
class RemoteItemsResource(resource.Resource):
    """
    Resource for retrieving items from a remote pubsub node.
    """

    def __init__(self, service):
        self.service = service


    def render(self, request):
        """
        Render the items of a remote node as an Atom feed.

        The node is named by the C{uri} request argument. The optional
        C{max_items} argument limits the number of items; C{0} or absence
        means no limit is passed to the service.

        @return: A C{BAD_REQUEST} response for invalid arguments, otherwise
            a Deferred firing with the feed response, or C{NOT_FOUND} when
            the remote service reports C{item-not-found}.
        """
        try:
            maxItems = int(request.args.get('max_items', [0])[0]) or None
        except ValueError:
            return http.StatusResponse(responsecode.BAD_REQUEST,
                    "The argument max_items has an invalid value.")

        try:
            uri = request.args['uri'][0]
        except KeyError:
            return http.StatusResponse(responsecode.BAD_REQUEST,
                    "No URI for the remote node provided.")

        try:
            jid, nodeIdentifier = getServiceAndNode(uri)
        except XMPPURIParseError:
            return http.StatusResponse(responsecode.BAD_REQUEST,
                    "Malformed XMPP URI: %s" % uri)

        def respond(items):
            """Create a feed out the retrieved items."""
            contentType = http_headers.MimeType('application',
                                                'atom+xml',
                                                {'type': 'feed'})
            atomEntries = extractAtomEntries(items)
            feed = constructFeed(jid, nodeIdentifier, atomEntries,
                                    "Retrieved item collection")
            payload = feed.toXml().encode('utf-8')
            return http.Response(responsecode.OK, stream=payload,
                                 headers={'Content-Type': contentType})

        def trapNotFound(failure):
            failure.trap(StanzaError)
            if failure.value.condition != 'item-not-found':
                # Return the failure to keep it on the errback chain;
                # a Failure is not meant to be raised directly.
                return failure
            return http.StatusResponse(responsecode.NOT_FOUND,
                                       "Node not found")

        d = self.service.items(jid, nodeIdentifier, maxItems)
        d.addCallback(respond)
        d.addErrback(trapNotFound)
        return d
692 | |
693 | |
694 | |
695 # Client side code to interact with a service as provided above | |
696 | |
def getPageWithFactory(url, contextFactory=None, *args, **kwargs):
    """Download a web page and return the client factory used for it.

    The returned factory holds a deferred, which will callback with a page
    (as a string) or errback with a description of the error.

    A 204 response is handled the same way as a 200 response. Note that the
    handler is assigned on C{factory.protocol}.

    See HTTPClientFactory to see what extra args can be passed.
    """

    scheme, host, port, path = client._parse(url)
    factory = client.HTTPClientFactory(url, *args, **kwargs)
    factory.protocol.handleStatus_204 = lambda self: self.handleStatus_200()

    if scheme != 'https':
        reactor.connectTCP(host, port, factory)
        return factory

    # The SSL support is only imported when an https URL is requested.
    from twisted.internet import ssl
    if contextFactory is None:
        contextFactory = ssl.ClientContextFactory()
    reactor.connectSSL(host, port, factory, contextFactory)
    return factory
719 | |
720 | |
721 | |
class CallbackResource(resource.Resource):
    """
    Web resource that receives the notifications posted by a gateway.

    @ivar callback: Callable invoked for every POST with the parsed payload
        (or C{None}) and the request headers.
    """

    def __init__(self, callback):
        self.callback = callback


    http_GET = None


    def http_POST(self, request):
        """
        Hand a posted notification to the callback.

        Requests with an C{Event} header are passed on with C{None} as
        payload; for all others the body is parsed as XML first. The
        response is C{NO_CONTENT}.
        """
        parser = WebStreamParser()
        if request.headers.hasHeader('Event'):
            received = defer.succeed(None)
        else:
            received = parser.parse(request.stream)

        received.addCallback(self.callback, request.headers)
        received.addCallback(
            lambda _: http.Response(responsecode.NO_CONTENT))
        return received
743 | |
744 | |
745 | |
class GatewayClient(service.Service):
    """
    Service that provides client access to the HTTP Gateway into Idavoll.

    While running, the service listens on C{callbackPort} for notifications
    that the gateway posts to C{/callback}; these are passed to
    L{callback}, which subclasses or users may override.

    @ivar baseURI: Base URI of the gateway; the verb of each request is
        appended to its path.
    @ivar callbackHost: Host name put in the callback URI, C{localhost}
        by default.
    @ivar callbackPort: Port to listen on for callbacks, C{8087} by default.
    """

    agent = "Idavoll HTTP Gateway Client"

    def __init__(self, baseURI, callbackHost=None, callbackPort=None):
        self.baseURI = baseURI
        self.callbackHost = callbackHost or 'localhost'
        self.callbackPort = callbackPort or 8087
        root = resource.Resource()
        # Look up self.callback at call time so that it can be replaced
        # after construction.
        root.child_callback = CallbackResource(
                lambda *args, **kwargs: self.callback(*args, **kwargs))
        self.site = server.Site(root)


    def startService(self):
        # Upcall so that the service records that it is running.
        service.Service.startService(self)
        self.port = reactor.listenTCP(self.callbackPort,
                                      channel.HTTPFactory(self.site))


    def stopService(self):
        service.Service.stopService(self)
        return self.port.stopListening()


    def _makeURI(self, verb, query=None):
        """
        Build the gateway URI for a verb with an optional query mapping.
        """
        uriComponents = urlparse.urlparse(self.baseURI)
        if query:
            encodedQuery = urllib.urlencode(query)
        else:
            encodedQuery = ''
        uri = urlparse.urlunparse((uriComponents[0],
                                   uriComponents[1],
                                   uriComponents[2] + verb,
                                   '',
                                   encodedQuery,
                                   ''))
        return uri


    def _callbackURI(self):
        """
        Return the URI at which this client receives notifications.
        """
        return 'http://%s:%s/callback' % (self.callbackHost,
                                          self.callbackPort)


    def _postCallback(self, verb, xmppURI):
        """
        POST the node URI and this client's callback URI to a gateway verb.
        """
        params = {'uri': xmppURI,
                  'callback': self._callbackURI()}
        f = getPageWithFactory(self._makeURI(verb),
                               method='POST',
                               postdata=simplejson.dumps(params),
                               headers={'Content-Type': MIME_JSON},
                               agent=self.agent)
        return f.deferred


    def callback(self, data, headers):
        """
        Called for each received notification; does nothing by default.
        """
        pass


    def ping(self):
        """
        Send a HEAD request to the base URI of the gateway.
        """
        f = getPageWithFactory(self._makeURI(''),
                               method='HEAD',
                               agent=self.agent)
        return f.deferred


    def create(self):
        """
        Request a new node; the Deferred fires with the decoded JSON reply.
        """
        f = getPageWithFactory(self._makeURI('create'),
                    method='POST',
                    agent=self.agent)
        return f.deferred.addCallback(simplejson.loads)


    def delete(self, xmppURI, redirectURI=None):
        """
        Request deletion of a node, optionally passing a redirect URI.
        """
        query = {'uri': xmppURI}

        if redirectURI:
            params = {'redirect_uri': redirectURI}
            postdata = simplejson.dumps(params)
            headers = {'Content-Type': MIME_JSON}
        else:
            postdata = None
            headers = None

        f = getPageWithFactory(self._makeURI('delete', query),
                    method='POST',
                    postdata=postdata,
                    headers=headers,
                    agent=self.agent)
        return f.deferred


    def publish(self, entry, xmppURI=None):
        """
        Publish an Atom entry, to the given node if C{xmppURI} is passed.

        The Deferred fires with the decoded JSON reply.
        """
        query = xmppURI and {'uri': xmppURI}

        f = getPageWithFactory(self._makeURI('publish', query),
                    method='POST',
                    postdata=entry.toXml().encode('utf-8'),
                    headers={'Content-Type': MIME_ATOM_ENTRY},
                    agent=self.agent)
        return f.deferred.addCallback(simplejson.loads)


    def listNodes(self):
        """
        Request the node list; the Deferred fires with the decoded JSON.
        """
        f = getPageWithFactory(self._makeURI('list'),
                    method='GET',
                    agent=self.agent)
        return f.deferred.addCallback(simplejson.loads)


    def subscribe(self, xmppURI):
        """
        Register this client's callback URI for the given node.
        """
        return self._postCallback('subscribe', xmppURI)


    def unsubscribe(self, xmppURI):
        """
        Remove this client's callback URI for the given node.
        """
        return self._postCallback('unsubscribe', xmppURI)


    def items(self, xmppURI, maxItems=None):
        """
        Retrieve the items of a node, limited to C{maxItems} if given.
        """
        query = {'uri': xmppURI}
        if maxItems:
            query['max_items'] = int(maxItems)
        f = getPageWithFactory(self._makeURI('items', query),
                    method='GET',
                    agent=self.agent)
        return f.deferred
854 method='POST', | |
855 postdata=simplejson.dumps(params), | |
856 headers={'Content-Type': MIME_JSON}, | |
857 agent=self.agent) | |
858 return f.deferred | |
859 | |
860 | |
861 def items(self, xmppURI, maxItems=None): | |
862 query = {'uri': xmppURI} | |
863 if maxItems: | |
864 query['max_items'] = int(maxItems) | |
865 f = getPageWithFactory(self._makeURI('items', query), | |
866 method='GET', | |
867 agent=self.agent) | |
868 return f.deferred |