Mercurial > libervia-pubsub
comparison src/gateway.py @ 369:dabee42494ac
config file + cleaning:
- SàT Pubsub can now be configured using the same config file as SàT itself (i.e. sat.conf or .sat.conf), in the same locations (/etc, local dir, xdg dir).
Its options must be in the "pubsub" section
- options on command line override config options
- removed tap and http files which are not used anymore
- changed directory structure to put source in src, to be coherent with SàT and Libervia
- changed options name, db* become db_*, secret become xmpp_pwd
- an exception is raised if jid or xmpp_pwd is are not configured
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Mar 2018 12:59:38 +0100 |
parents | sat_pubsub/gateway.py@618a92080812 |
children | aa3a464df605 |
comparison
equal
deleted
inserted
replaced
368:618a92080812 | 369:dabee42494ac |
---|---|
1 #!/usr/bin/python | |
2 #-*- coding: utf-8 -*- | |
3 | |
4 # Copyright (c) 2003-2011 Ralph Meijer | |
5 # Copyright (c) 2012-2018 Jérôme Poisson | |
6 | |
7 | |
8 # This program is free software: you can redistribute it and/or modify | |
9 # it under the terms of the GNU Affero General Public License as published by | |
10 # the Free Software Foundation, either version 3 of the License, or | |
11 # (at your option) any later version. | |
12 | |
13 # This program is distributed in the hope that it will be useful, | |
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
16 # GNU Affero General Public License for more details. | |
17 | |
18 # You should have received a copy of the GNU Affero General Public License | |
19 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
20 # -- | |
21 | |
22 # This program is based on Idavoll (http://idavoll.ik.nu/), | |
23 # originaly written by Ralph Meijer (http://ralphm.net/blog/) | |
24 # It is sublicensed under AGPL v3 (or any later version) as allowed by the original | |
25 # license. | |
26 | |
27 # -- | |
28 | |
29 # Here is a copy of the original license: | |
30 | |
31 # Copyright (c) 2003-2011 Ralph Meijer | |
32 | |
33 # Permission is hereby granted, free of charge, to any person obtaining | |
34 # a copy of this software and associated documentation files (the | |
35 # "Software"), to deal in the Software without restriction, including | |
36 # without limitation the rights to use, copy, modify, merge, publish, | |
37 # distribute, sublicense, and/or sell copies of the Software, and to | |
38 # permit persons to whom the Software is furnished to do so, subject to | |
39 # the following conditions: | |
40 | |
41 # The above copyright notice and this permission notice shall be | |
42 # included in all copies or substantial portions of the Software. | |
43 | |
44 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, | |
45 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF | |
46 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND | |
47 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE | |
48 # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION | |
49 # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION | |
50 # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. | |
51 | |
52 | |
53 """ | |
54 Web resources and client for interacting with pubsub services. | |
55 """ | |
56 | |
57 import mimetools | |
58 from time import gmtime, strftime | |
59 from StringIO import StringIO | |
60 import urllib | |
61 import urlparse | |
62 | |
63 import simplejson | |
64 | |
65 from twisted.application import service | |
66 from twisted.internet import defer, reactor | |
67 from twisted.python import log | |
68 from twisted.web import client, http, resource, server | |
69 from twisted.web.error import Error | |
70 from twisted.words.protocols.jabber.jid import JID | |
71 from twisted.words.protocols.jabber.error import StanzaError | |
72 from twisted.words.xish import domish | |
73 | |
74 from wokkel.generic import parseXml | |
75 from wokkel.pubsub import Item | |
76 from wokkel.pubsub import PubSubClient | |
77 | |
78 from sat_pubsub import error | |
79 | |
80 NS_ATOM = 'http://www.w3.org/2005/Atom' | |
81 MIME_ATOM_ENTRY = b'application/atom+xml;type=entry' | |
82 MIME_ATOM_FEED = b'application/atom+xml;type=feed' | |
83 MIME_JSON = b'application/json' | |
84 | |
85 class XMPPURIParseError(ValueError): | |
86 """ | |
87 Raised when a given XMPP URI couldn't be properly parsed. | |
88 """ | |
89 | |
90 | |
91 | |
92 def getServiceAndNode(uri): | |
93 """ | |
94 Given an XMPP URI, extract the publish subscribe service JID and node ID. | |
95 """ | |
96 | |
97 try: | |
98 scheme, rest = uri.split(':', 1) | |
99 except ValueError: | |
100 raise XMPPURIParseError("No URI scheme component") | |
101 | |
102 if scheme != 'xmpp': | |
103 raise XMPPURIParseError("Unknown URI scheme") | |
104 | |
105 if rest.startswith("//"): | |
106 raise XMPPURIParseError("Unexpected URI authority component") | |
107 | |
108 try: | |
109 entity, query = rest.split('?', 1) | |
110 except ValueError: | |
111 entity, query = rest, '' | |
112 | |
113 if not entity: | |
114 raise XMPPURIParseError("Empty URI path component") | |
115 | |
116 try: | |
117 service = JID(entity) | |
118 except Exception, e: | |
119 raise XMPPURIParseError("Invalid JID: %s" % e) | |
120 | |
121 params = urlparse.parse_qs(query) | |
122 | |
123 try: | |
124 nodeIdentifier = params['node'][0] | |
125 except (KeyError, ValueError): | |
126 nodeIdentifier = '' | |
127 | |
128 return service, nodeIdentifier | |
129 | |
130 | |
131 | |
132 def getXMPPURI(service, nodeIdentifier): | |
133 """ | |
134 Construct an XMPP URI from a service JID and node identifier. | |
135 """ | |
136 return "xmpp:%s?;node=%s" % (service.full(), nodeIdentifier or '') | |
137 | |
138 | |
139 | |
140 def _parseContentType(header): | |
141 """ | |
142 Parse a Content-Type header value to a L{mimetools.Message}. | |
143 | |
144 L{mimetools.Message} parses a Content-Type header and makes the | |
145 components available with its C{getmaintype}, C{getsubtype}, C{gettype}, | |
146 C{getplist} and C{getparam} methods. | |
147 """ | |
148 return mimetools.Message(StringIO(b'Content-Type: ' + header)) | |
149 | |
150 | |
151 | |
152 def _asyncResponse(render): | |
153 """ | |
154 """ | |
155 def wrapped(self, request): | |
156 def eb(failure): | |
157 if failure.check(Error): | |
158 err = failure.value | |
159 else: | |
160 log.err(failure) | |
161 err = Error(500) | |
162 request.setResponseCode(err.status, err.message) | |
163 return err.response | |
164 | |
165 def finish(result): | |
166 if result is server.NOT_DONE_YET: | |
167 return | |
168 | |
169 if result: | |
170 request.write(result) | |
171 request.finish() | |
172 | |
173 d = defer.maybeDeferred(render, self, request) | |
174 d.addErrback(eb) | |
175 d.addCallback(finish) | |
176 | |
177 return server.NOT_DONE_YET | |
178 | |
179 return wrapped | |
180 | |
181 | |
182 | |
183 class CreateResource(resource.Resource): | |
184 """ | |
185 A resource to create a publish-subscribe node. | |
186 """ | |
187 def __init__(self, backend, serviceJID, owner): | |
188 self.backend = backend | |
189 self.serviceJID = serviceJID | |
190 self.owner = owner | |
191 | |
192 | |
193 http_GET = None | |
194 | |
195 | |
196 @_asyncResponse | |
197 def render_POST(self, request): | |
198 """ | |
199 Respond to a POST request to create a new node. | |
200 """ | |
201 | |
202 def toResponse(nodeIdentifier): | |
203 uri = getXMPPURI(self.serviceJID, nodeIdentifier) | |
204 body = simplejson.dumps({'uri': uri}) | |
205 request.setHeader(b'Content-Type', MIME_JSON) | |
206 return body | |
207 | |
208 d = self.backend.createNode(None, self.owner) | |
209 d.addCallback(toResponse) | |
210 return d | |
211 | |
212 | |
213 | |
214 class DeleteResource(resource.Resource): | |
215 """ | |
216 A resource to create a publish-subscribe node. | |
217 """ | |
218 def __init__(self, backend, serviceJID, owner): | |
219 self.backend = backend | |
220 self.serviceJID = serviceJID | |
221 self.owner = owner | |
222 | |
223 | |
224 render_GET = None | |
225 | |
226 | |
227 @_asyncResponse | |
228 def render_POST(self, request): | |
229 """ | |
230 Respond to a POST request to create a new node. | |
231 """ | |
232 def toResponse(result): | |
233 request.setResponseCode(http.NO_CONTENT) | |
234 | |
235 def trapNotFound(failure): | |
236 failure.trap(error.NodeNotFound) | |
237 raise Error(http.NOT_FOUND, "Node not found") | |
238 | |
239 if not request.args.get('uri'): | |
240 raise Error(http.BAD_REQUEST, "No URI given") | |
241 | |
242 try: | |
243 jid, nodeIdentifier = getServiceAndNode(request.args['uri'][0]) | |
244 except XMPPURIParseError, e: | |
245 raise Error(http.BAD_REQUEST, "Malformed XMPP URI: %s" % e) | |
246 | |
247 | |
248 data = request.content.read() | |
249 if data: | |
250 params = simplejson.loads(data) | |
251 redirectURI = params.get('redirect_uri', None) | |
252 else: | |
253 redirectURI = None | |
254 | |
255 d = self.backend.deleteNode(nodeIdentifier, self.owner, | |
256 redirectURI) | |
257 d.addCallback(toResponse) | |
258 d.addErrback(trapNotFound) | |
259 return d | |
260 | |
261 | |
262 | |
263 class PublishResource(resource.Resource): | |
264 """ | |
265 A resource to publish to a publish-subscribe node. | |
266 """ | |
267 | |
268 def __init__(self, backend, serviceJID, owner): | |
269 self.backend = backend | |
270 self.serviceJID = serviceJID | |
271 self.owner = owner | |
272 | |
273 | |
274 render_GET = None | |
275 | |
276 | |
277 def checkMediaType(self, request): | |
278 ctype = request.getHeader(b'content-type') | |
279 | |
280 if not ctype: | |
281 request.setResponseCode(http.BAD_REQUEST) | |
282 | |
283 raise Error(http.BAD_REQUEST, b"No specified Media Type") | |
284 | |
285 message = _parseContentType(ctype) | |
286 if (message.maintype != b'application' or | |
287 message.subtype != b'atom+xml' or | |
288 message.getparam(b'type') != b'entry' or | |
289 (message.getparam(b'charset') or b'utf-8') != b'utf-8'): | |
290 raise Error(http.UNSUPPORTED_MEDIA_TYPE, | |
291 b"Unsupported Media Type: %s" % ctype) | |
292 | |
293 | |
294 @_asyncResponse | |
295 def render_POST(self, request): | |
296 """ | |
297 Respond to a POST request to create a new item. | |
298 """ | |
299 | |
300 def toResponse(nodeIdentifier): | |
301 uri = getXMPPURI(self.serviceJID, nodeIdentifier) | |
302 body = simplejson.dumps({'uri': uri}) | |
303 request.setHeader(b'Content-Type', MIME_JSON) | |
304 return body | |
305 | |
306 def gotNode(nodeIdentifier, payload): | |
307 item = Item(id='current', payload=payload) | |
308 d = self.backend.publish(nodeIdentifier, [item], self.owner) | |
309 d.addCallback(lambda _: nodeIdentifier) | |
310 return d | |
311 | |
312 def getNode(): | |
313 if request.args.get('uri'): | |
314 jid, nodeIdentifier = getServiceAndNode(request.args['uri'][0]) | |
315 return defer.succeed(nodeIdentifier) | |
316 else: | |
317 return self.backend.createNode(None, self.owner) | |
318 | |
319 def trapNotFound(failure): | |
320 failure.trap(error.NodeNotFound) | |
321 raise Error(http.NOT_FOUND, "Node not found") | |
322 | |
323 def trapXMPPURIParseError(failure): | |
324 failure.trap(XMPPURIParseError) | |
325 raise Error(http.BAD_REQUEST, | |
326 "Malformed XMPP URI: %s" % failure.value) | |
327 | |
328 self.checkMediaType(request) | |
329 payload = parseXml(request.content.read()) | |
330 d = getNode() | |
331 d.addCallback(gotNode, payload) | |
332 d.addCallback(toResponse) | |
333 d.addErrback(trapNotFound) | |
334 d.addErrback(trapXMPPURIParseError) | |
335 return d | |
336 | |
337 | |
338 | |
339 class ListResource(resource.Resource): | |
340 def __init__(self, service): | |
341 self.service = service | |
342 | |
343 | |
344 @_asyncResponse | |
345 def render_GET(self, request): | |
346 def responseFromNodes(nodeIdentifiers): | |
347 body = simplejson.dumps(nodeIdentifiers) | |
348 request.setHeader(b'Content-Type', MIME_JSON) | |
349 return body | |
350 | |
351 d = self.service.getNodes() | |
352 d.addCallback(responseFromNodes) | |
353 return d | |
354 | |
355 | |
356 | |
357 # Service for subscribing to remote XMPP Pubsub nodes and web resources | |
358 | |
359 def extractAtomEntries(items): | |
360 """ | |
361 Extract atom entries from a list of publish-subscribe items. | |
362 | |
363 @param items: List of L{domish.Element}s that represent publish-subscribe | |
364 items. | |
365 @type items: C{list} | |
366 """ | |
367 | |
368 atomEntries = [] | |
369 | |
370 for item in items: | |
371 # ignore non-items (i.e. retractions) | |
372 if item.name != 'item': | |
373 continue | |
374 | |
375 atomEntry = None | |
376 for element in item.elements(): | |
377 # extract the first element that is an atom entry | |
378 if element.uri == NS_ATOM and element.name == 'entry': | |
379 atomEntry = element | |
380 break | |
381 | |
382 if atomEntry: | |
383 atomEntries.append(atomEntry) | |
384 | |
385 return atomEntries | |
386 | |
387 | |
388 | |
389 def constructFeed(service, nodeIdentifier, entries, title): | |
390 nodeURI = getXMPPURI(service, nodeIdentifier) | |
391 now = strftime("%Y-%m-%dT%H:%M:%SZ", gmtime()) | |
392 | |
393 # Collect the received entries in a feed | |
394 feed = domish.Element((NS_ATOM, 'feed')) | |
395 feed.addElement('title', content=title) | |
396 feed.addElement('id', content=nodeURI) | |
397 feed.addElement('updated', content=now) | |
398 | |
399 for entry in entries: | |
400 feed.addChild(entry) | |
401 | |
402 return feed | |
403 | |
404 | |
405 | |
406 class RemoteSubscriptionService(service.Service, PubSubClient): | |
407 """ | |
408 Service for subscribing to remote XMPP Publish-Subscribe nodes. | |
409 | |
410 Subscriptions are created with a callback HTTP URI that is POSTed | |
411 to with the received items in notifications. | |
412 """ | |
413 | |
414 def __init__(self, jid, storage): | |
415 self.jid = jid | |
416 self.storage = storage | |
417 | |
418 | |
419 def trapNotFound(self, failure): | |
420 failure.trap(StanzaError) | |
421 | |
422 if failure.value.condition == 'item-not-found': | |
423 raise error.NodeNotFound() | |
424 else: | |
425 return failure | |
426 | |
427 | |
428 def subscribeCallback(self, jid, nodeIdentifier, callback): | |
429 """ | |
430 Subscribe a callback URI. | |
431 | |
432 This registers a callback URI to be called when a notification is | |
433 received for the given node. | |
434 | |
435 If this is the first callback registered for this node, the gateway | |
436 will subscribe to the node. Otherwise, the most recently published item | |
437 for this node is retrieved and, if present, the newly registered | |
438 callback will be called with that item. | |
439 """ | |
440 | |
441 def callbackForLastItem(items): | |
442 atomEntries = extractAtomEntries(items) | |
443 | |
444 if not atomEntries: | |
445 return | |
446 | |
447 self._postTo([callback], jid, nodeIdentifier, atomEntries[0], | |
448 MIME_ATOM_ENTRY) | |
449 | |
450 def subscribeOrItems(hasCallbacks): | |
451 if hasCallbacks: | |
452 if not nodeIdentifier: | |
453 return None | |
454 d = self.items(jid, nodeIdentifier, 1) | |
455 d.addCallback(callbackForLastItem) | |
456 else: | |
457 d = self.subscribe(jid, nodeIdentifier, self.jid) | |
458 | |
459 d.addErrback(self.trapNotFound) | |
460 return d | |
461 | |
462 d = self.storage.hasCallbacks(jid, nodeIdentifier) | |
463 d.addCallback(subscribeOrItems) | |
464 d.addCallback(lambda _: self.storage.addCallback(jid, nodeIdentifier, | |
465 callback)) | |
466 return d | |
467 | |
468 | |
469 def unsubscribeCallback(self, jid, nodeIdentifier, callback): | |
470 """ | |
471 Unsubscribe a callback. | |
472 | |
473 If this was the last registered callback for this node, the | |
474 gateway will unsubscribe from node. | |
475 """ | |
476 | |
477 def cb(last): | |
478 if last: | |
479 return self.unsubscribe(jid, nodeIdentifier, self.jid) | |
480 | |
481 d = self.storage.removeCallback(jid, nodeIdentifier, callback) | |
482 d.addCallback(cb) | |
483 return d | |
484 | |
485 | |
486 def itemsReceived(self, event): | |
487 """ | |
488 Fire up HTTP client to do callback | |
489 """ | |
490 | |
491 atomEntries = extractAtomEntries(event.items) | |
492 service = event.sender | |
493 nodeIdentifier = event.nodeIdentifier | |
494 headers = event.headers | |
495 | |
496 # Don't notify if there are no atom entries | |
497 if not atomEntries: | |
498 return | |
499 | |
500 if len(atomEntries) == 1: | |
501 contentType = MIME_ATOM_ENTRY | |
502 payload = atomEntries[0] | |
503 else: | |
504 contentType = MIME_ATOM_FEED | |
505 payload = constructFeed(service, nodeIdentifier, atomEntries, | |
506 title='Received item collection') | |
507 | |
508 self.callCallbacks(service, nodeIdentifier, payload, contentType) | |
509 | |
510 if 'Collection' in headers: | |
511 for collection in headers['Collection']: | |
512 nodeIdentifier = collection or '' | |
513 self.callCallbacks(service, nodeIdentifier, payload, | |
514 contentType) | |
515 | |
516 | |
517 def deleteReceived(self, event): | |
518 """ | |
519 Fire up HTTP client to do callback | |
520 """ | |
521 | |
522 service = event.sender | |
523 nodeIdentifier = event.nodeIdentifier | |
524 redirectURI = event.redirectURI | |
525 self.callCallbacks(service, nodeIdentifier, eventType='DELETED', | |
526 redirectURI=redirectURI) | |
527 | |
528 | |
529 def _postTo(self, callbacks, service, nodeIdentifier, | |
530 payload=None, contentType=None, eventType=None, | |
531 redirectURI=None): | |
532 | |
533 if not callbacks: | |
534 return | |
535 | |
536 postdata = None | |
537 nodeURI = getXMPPURI(service, nodeIdentifier) | |
538 headers = {'Referer': nodeURI.encode('utf-8'), | |
539 'PubSub-Service': service.full().encode('utf-8')} | |
540 | |
541 if payload: | |
542 postdata = payload.toXml().encode('utf-8') | |
543 if contentType: | |
544 headers['Content-Type'] = "%s;charset=utf-8" % contentType | |
545 | |
546 if eventType: | |
547 headers['Event'] = eventType | |
548 | |
549 if redirectURI: | |
550 headers['Link'] = '<%s>; rel=alternate' % ( | |
551 redirectURI.encode('utf-8'), | |
552 ) | |
553 | |
554 def postNotification(callbackURI): | |
555 f = getPageWithFactory(str(callbackURI), | |
556 method='POST', | |
557 postdata=postdata, | |
558 headers=headers) | |
559 d = f.deferred | |
560 d.addErrback(log.err) | |
561 | |
562 for callbackURI in callbacks: | |
563 reactor.callLater(0, postNotification, callbackURI) | |
564 | |
565 | |
566 def callCallbacks(self, service, nodeIdentifier, | |
567 payload=None, contentType=None, eventType=None, | |
568 redirectURI=None): | |
569 | |
570 def eb(failure): | |
571 failure.trap(error.NoCallbacks) | |
572 | |
573 # No callbacks were registered for this node. Unsubscribe? | |
574 | |
575 d = self.storage.getCallbacks(service, nodeIdentifier) | |
576 d.addCallback(self._postTo, service, nodeIdentifier, payload, | |
577 contentType, eventType, redirectURI) | |
578 d.addErrback(eb) | |
579 d.addErrback(log.err) | |
580 | |
581 | |
582 | |
583 class RemoteSubscribeBaseResource(resource.Resource): | |
584 """ | |
585 Base resource for remote pubsub node subscription and unsubscription. | |
586 | |
587 This resource accepts POST request with a JSON document that holds | |
588 a dictionary with the keys C{uri} and C{callback} that respectively map | |
589 to the XMPP URI of the publish-subscribe node and the callback URI. | |
590 | |
591 This class should be inherited with L{serviceMethod} overridden. | |
592 | |
593 @cvar serviceMethod: The name of the method to be called with | |
594 the JID of the pubsub service, the node identifier | |
595 and the callback URI as received in the HTTP POST | |
596 request to this resource. | |
597 """ | |
598 serviceMethod = None | |
599 errorMap = { | |
600 error.NodeNotFound: | |
601 (http.FORBIDDEN, "Node not found"), | |
602 error.NotSubscribed: | |
603 (http.FORBIDDEN, "No such subscription found"), | |
604 error.SubscriptionExists: | |
605 (http.FORBIDDEN, "Subscription already exists"), | |
606 } | |
607 | |
608 def __init__(self, service): | |
609 self.service = service | |
610 self.params = None | |
611 | |
612 | |
613 render_GET = None | |
614 | |
615 | |
616 @_asyncResponse | |
617 def render_POST(self, request): | |
618 def trapNotFound(failure): | |
619 err = failure.trap(*self.errorMap.keys()) | |
620 status, message = self.errorMap[err] | |
621 raise Error(status, message) | |
622 | |
623 def toResponse(result): | |
624 request.setResponseCode(http.NO_CONTENT) | |
625 return b'' | |
626 | |
627 def trapXMPPURIParseError(failure): | |
628 failure.trap(XMPPURIParseError) | |
629 raise Error(http.BAD_REQUEST, | |
630 "Malformed XMPP URI: %s" % failure.value) | |
631 | |
632 data = request.content.read() | |
633 self.params = simplejson.loads(data) | |
634 | |
635 uri = self.params['uri'] | |
636 callback = self.params['callback'] | |
637 | |
638 jid, nodeIdentifier = getServiceAndNode(uri) | |
639 method = getattr(self.service, self.serviceMethod) | |
640 d = method(jid, nodeIdentifier, callback) | |
641 d.addCallback(toResponse) | |
642 d.addErrback(trapNotFound) | |
643 d.addErrback(trapXMPPURIParseError) | |
644 return d | |
645 | |
646 | |
647 | |
648 class RemoteSubscribeResource(RemoteSubscribeBaseResource): | |
649 """ | |
650 Resource to subscribe to a remote publish-subscribe node. | |
651 | |
652 The passed C{uri} is the XMPP URI of the node to subscribe to and the | |
653 C{callback} is the callback URI. Upon receiving notifications from the | |
654 node, a POST request will be perfomed on the callback URI. | |
655 """ | |
656 serviceMethod = 'subscribeCallback' | |
657 | |
658 | |
659 | |
660 class RemoteUnsubscribeResource(RemoteSubscribeBaseResource): | |
661 """ | |
662 Resource to unsubscribe from a remote publish-subscribe node. | |
663 | |
664 The passed C{uri} is the XMPP URI of the node to unsubscribe from and the | |
665 C{callback} is the callback URI that was registered for it. | |
666 """ | |
667 serviceMethod = 'unsubscribeCallback' | |
668 | |
669 | |
670 | |
671 class RemoteItemsResource(resource.Resource): | |
672 """ | |
673 Resource for retrieving items from a remote pubsub node. | |
674 """ | |
675 | |
676 def __init__(self, service): | |
677 self.service = service | |
678 | |
679 | |
680 @_asyncResponse | |
681 def render_GET(self, request): | |
682 try: | |
683 maxItems = int(request.args.get('max_items', [0])[0]) or None | |
684 except ValueError: | |
685 raise Error(http.BAD_REQUEST, | |
686 "The argument max_items has an invalid value.") | |
687 | |
688 try: | |
689 uri = request.args['uri'][0] | |
690 except KeyError: | |
691 raise Error(http.BAD_REQUEST, | |
692 "No URI for the remote node provided.") | |
693 | |
694 try: | |
695 jid, nodeIdentifier = getServiceAndNode(uri) | |
696 except XMPPURIParseError: | |
697 raise Error(http.BAD_REQUEST, | |
698 "Malformed XMPP URI: %s" % uri) | |
699 | |
700 def toResponse(items): | |
701 """ | |
702 Create a feed out the retrieved items. | |
703 """ | |
704 atomEntries = extractAtomEntries(items) | |
705 feed = constructFeed(jid, nodeIdentifier, atomEntries, | |
706 "Retrieved item collection") | |
707 body = feed.toXml().encode('utf-8') | |
708 request.setHeader(b'Content-Type', MIME_ATOM_FEED) | |
709 return body | |
710 | |
711 def trapNotFound(failure): | |
712 failure.trap(StanzaError) | |
713 if not failure.value.condition == 'item-not-found': | |
714 raise failure | |
715 raise Error(http.NOT_FOUND, "Node not found") | |
716 | |
717 d = self.service.items(jid, nodeIdentifier, maxItems) | |
718 d.addCallback(toResponse) | |
719 d.addErrback(trapNotFound) | |
720 return d | |
721 | |
722 | |
723 | |
724 # Client side code to interact with a service as provided above | |
725 | |
726 def getPageWithFactory(url, contextFactory=None, *args, **kwargs): | |
727 """Download a web page. | |
728 | |
729 Download a page. Return the factory that holds a deferred, which will | |
730 callback with a page (as a string) or errback with a description of the | |
731 error. | |
732 | |
733 See HTTPClientFactory to see what extra args can be passed. | |
734 """ | |
735 | |
736 factory = client.HTTPClientFactory(url, *args, **kwargs) | |
737 factory.protocol.handleStatus_204 = lambda self: self.handleStatus_200() | |
738 | |
739 if factory.scheme == 'https': | |
740 from twisted.internet import ssl | |
741 if contextFactory is None: | |
742 contextFactory = ssl.ClientContextFactory() | |
743 reactor.connectSSL(factory.host, factory.port, factory, contextFactory) | |
744 else: | |
745 reactor.connectTCP(factory.host, factory.port, factory) | |
746 return factory | |
747 | |
748 | |
749 | |
750 class CallbackResource(resource.Resource): | |
751 """ | |
752 Web resource for retrieving gateway notifications. | |
753 """ | |
754 | |
755 def __init__(self, callback): | |
756 self.callback = callback | |
757 | |
758 | |
759 http_GET = None | |
760 | |
761 | |
762 def render_POST(self, request): | |
763 if request.requestHeaders.hasHeader(b'Event'): | |
764 payload = None | |
765 else: | |
766 payload = parseXml(request.content.read()) | |
767 | |
768 self.callback(payload, request.requestHeaders) | |
769 | |
770 request.setResponseCode(http.NO_CONTENT) | |
771 return b'' | |
772 | |
773 | |
774 | |
775 | |
776 class GatewayClient(service.Service): | |
777 """ | |
778 Service that provides client access to the HTTP Gateway into Idavoll. | |
779 """ | |
780 | |
781 agent = "Idavoll HTTP Gateway Client" | |
782 | |
783 def __init__(self, baseURI, callbackHost=None, callbackPort=None): | |
784 self.baseURI = baseURI | |
785 self.callbackHost = callbackHost or 'localhost' | |
786 self.callbackPort = callbackPort or 8087 | |
787 root = resource.Resource() | |
788 root.putChild('callback', CallbackResource( | |
789 lambda *args, **kwargs: self.callback(*args, **kwargs))) | |
790 self.site = server.Site(root) | |
791 | |
792 | |
793 def startService(self): | |
794 self.port = reactor.listenTCP(self.callbackPort, | |
795 self.site) | |
796 | |
797 | |
798 def stopService(self): | |
799 return self.port.stopListening() | |
800 | |
801 | |
802 def _makeURI(self, verb, query=None): | |
803 uriComponents = urlparse.urlparse(self.baseURI) | |
804 uri = urlparse.urlunparse((uriComponents[0], | |
805 uriComponents[1], | |
806 uriComponents[2] + verb, | |
807 '', | |
808 query and urllib.urlencode(query) or '', | |
809 '')) | |
810 return uri | |
811 | |
812 | |
813 def callback(self, data, headers): | |
814 pass | |
815 | |
816 | |
817 def ping(self): | |
818 f = getPageWithFactory(self._makeURI(''), | |
819 method='HEAD', | |
820 agent=self.agent) | |
821 return f.deferred | |
822 | |
823 | |
824 def create(self): | |
825 f = getPageWithFactory(self._makeURI('create'), | |
826 method='POST', | |
827 agent=self.agent) | |
828 return f.deferred.addCallback(simplejson.loads) | |
829 | |
830 | |
831 def delete(self, xmppURI, redirectURI=None): | |
832 query = {'uri': xmppURI} | |
833 | |
834 if redirectURI: | |
835 params = {'redirect_uri': redirectURI} | |
836 postdata = simplejson.dumps(params) | |
837 headers = {'Content-Type': MIME_JSON} | |
838 else: | |
839 postdata = None | |
840 headers = None | |
841 | |
842 f = getPageWithFactory(self._makeURI('delete', query), | |
843 method='POST', | |
844 postdata=postdata, | |
845 headers=headers, | |
846 agent=self.agent) | |
847 return f.deferred | |
848 | |
849 | |
850 def publish(self, entry, xmppURI=None): | |
851 query = xmppURI and {'uri': xmppURI} | |
852 | |
853 f = getPageWithFactory(self._makeURI('publish', query), | |
854 method='POST', | |
855 postdata=entry.toXml().encode('utf-8'), | |
856 headers={'Content-Type': MIME_ATOM_ENTRY}, | |
857 agent=self.agent) | |
858 return f.deferred.addCallback(simplejson.loads) | |
859 | |
860 | |
861 def listNodes(self): | |
862 f = getPageWithFactory(self._makeURI('list'), | |
863 method='GET', | |
864 agent=self.agent) | |
865 return f.deferred.addCallback(simplejson.loads) | |
866 | |
867 | |
868 def subscribe(self, xmppURI): | |
869 params = {'uri': xmppURI, | |
870 'callback': 'http://%s:%s/callback' % (self.callbackHost, | |
871 self.callbackPort)} | |
872 f = getPageWithFactory(self._makeURI('subscribe'), | |
873 method='POST', | |
874 postdata=simplejson.dumps(params), | |
875 headers={'Content-Type': MIME_JSON}, | |
876 agent=self.agent) | |
877 return f.deferred | |
878 | |
879 | |
880 def unsubscribe(self, xmppURI): | |
881 params = {'uri': xmppURI, | |
882 'callback': 'http://%s:%s/callback' % (self.callbackHost, | |
883 self.callbackPort)} | |
884 f = getPageWithFactory(self._makeURI('unsubscribe'), | |
885 method='POST', | |
886 postdata=simplejson.dumps(params), | |
887 headers={'Content-Type': MIME_JSON}, | |
888 agent=self.agent) | |
889 return f.deferred | |
890 | |
891 | |
892 def items(self, xmppURI, maxItems=None): | |
893 query = {'uri': xmppURI} | |
894 if maxItems is not None: | |
895 query['max_items'] = int(maxItems) | |
896 f = getPageWithFactory(self._makeURI('items', query), | |
897 method='GET', | |
898 agent=self.agent) | |
899 return f.deferred |