comparison sat_pubsub/gateway.py @ 232:923281d4c5bc

renamed idavoll directory to sat_pubsub
author Goffi <goffi@goffi.org>
date Thu, 17 May 2012 12:48:14 +0200
parents idavoll/gateway.py@55b45c7dccb4
children 564ae55219e1
comparison
equal deleted inserted replaced
231:d99047cd90f9 232:923281d4c5bc
1 # -*- test-case-name: idavoll.test.test_gateway -*-
2 #
3 # Copyright (c) 2003-2009 Ralph Meijer
4 # See LICENSE for details.
5
6 """
7 Web resources and client for interacting with pubsub services.
8 """
9
10 import cgi
11 from time import gmtime, strftime
12 import urllib
13 import urlparse
14
15 import simplejson
16
17 from twisted.application import service
18 from twisted.internet import defer, reactor
19 from twisted.python import log
20 from twisted.web import client
21 from twisted.web2 import http, http_headers, resource, responsecode
22 from twisted.web2 import channel, server
23 from twisted.web2.stream import readStream
24 from twisted.words.protocols.jabber.jid import JID
25 from twisted.words.protocols.jabber.error import StanzaError
26 from twisted.words.xish import domish
27
28 from wokkel.pubsub import Item
29 from wokkel.pubsub import PubSubClient
30
31 from idavoll import error
32
# XML namespace of Atom elements; used below to recognise entries in
# received items and to build feed elements.
NS_ATOM = 'http://www.w3.org/2005/Atom'
# Media type the gateway client sends when publishing a single Atom entry.
MIME_ATOM_ENTRY = 'application/atom+xml;type=entry'
# Media type of the JSON request and response bodies used by the gateway.
MIME_JSON = 'application/json'
36
class XMPPURIParseError(ValueError):
    """
    Signals that an XMPP URI could not be parsed.

    The exception message names the URI component that was missing,
    unexpected or invalid.
    """
41
42
43
def getServiceAndNode(uri):
    """
    Split an XMPP URI into a publish-subscribe service JID and a node ID.

    @param uri: The XMPP URI, shaped like C{xmpp:<jid>?<query>}.
    @return: Tuple of the service L{JID} and the node identifier. The node
             identifier is the first C{node} parameter of the query, or the
             empty string when the query has none.
    @raise XMPPURIParseError: If the scheme or query component is missing,
                              the scheme is not C{xmpp}, an authority
                              component is present, the path is empty or
                              the path is not a valid JID.
    """
    scheme, colon, rest = uri.partition(':')
    if not colon:
        raise XMPPURIParseError("No URI scheme component")

    if scheme != 'xmpp':
        raise XMPPURIParseError("Unknown URI scheme")

    if rest.startswith("//"):
        raise XMPPURIParseError("Unexpected URI authority component")

    entity, questionMark, query = rest.partition('?')
    if not questionMark:
        raise XMPPURIParseError("No URI query component")

    if not entity:
        raise XMPPURIParseError("Empty URI path component")

    try:
        service = JID(entity)
    except Exception as e:
        raise XMPPURIParseError("Invalid JID: %s" % e)

    # A query without a node parameter addresses the empty node identifier.
    nodeValues = cgi.parse_qs(query).get('node', [''])
    return service, nodeValues[0]
81
82
83
def getXMPPURI(service, nodeIdentifier):
    """
    Build the XMPP URI that addresses a node at a service.

    @param service: The service JID; its C{full()} string form is used.
    @param nodeIdentifier: The node identifier. A false value (such as
                           C{None}) is rendered as the empty string.
    @return: The URI as a string of the form C{xmpp:<jid>?;node=<node>}.
    """
    if not nodeIdentifier:
        nodeIdentifier = ''
    return "xmpp:%s?;node=%s" % (service.full(), nodeIdentifier)
89
90
91
class WebStreamParser(object):
    """
    Parser that turns a byte stream holding an XML document into an element.

    Data read from the stream is fed to a L{domish} element stream. The
    document's root element is kept in C{self.document} and every element
    reported afterwards is added to it as a child.
    """

    def __init__(self):
        self.done = False

        parser = domish.elementStream()
        parser.DocumentStartEvent = self.docStart
        parser.ElementEvent = self.elem
        parser.DocumentEndEvent = self.docEnd
        self.elementStream = parser


    def docStart(self, elem):
        """
        Remember the root element of the document.
        """
        self.document = elem


    def elem(self, elem):
        """
        Attach a parsed element to the root element.
        """
        self.document.addChild(elem)


    def docEnd(self):
        """
        Record that the end of the document was seen.
        """
        self.done = True


    def parse(self, stream):
        """
        Read C{stream} to its end and parse the data as one XML document.

        @return: A deferred that fires with the root element. It fails with
                 an L{Exception} if the stream ended before the end of the
                 document was seen.
        """
        def streamFinished(_):
            if self.done:
                return self.document
            raise Exception("No more stuff?")

        d = readStream(stream, self.elementStream.parse)
        d.addCallback(streamFinished)
        return d
123
124
125
class CreateResource(resource.Resource):
    """
    Web resource through which a new publish-subscribe node is created.
    """

    def __init__(self, backend, serviceJID, owner):
        self.backend = backend
        self.serviceJID = serviceJID
        self.owner = owner


    # Only POST is implemented here.
    http_GET = None


    def http_POST(self, request):
        """
        Create a new node on behalf of the owner.

        @return: A deferred that fires with a JSON response whose C{uri}
                 member holds the XMPP URI of the created node.
        """
        def buildResponse(nodeIdentifier):
            body = simplejson.dumps(
                {'uri': getXMPPURI(self.serviceJID, nodeIdentifier)})
            headers = {
                'Content-Type': http_headers.MimeType.fromString(MIME_JSON)}
            return http.Response(responsecode.OK, stream=body,
                                 headers=headers)

        # Passing None as node identifier leaves naming to the backend.
        created = self.backend.createNode(None, self.owner)
        created.addCallback(buildResponse)
        return created
153
154
155
class DeleteResource(resource.Resource):
    """
    A resource to delete a publish-subscribe node.
    """

    def __init__(self, backend, serviceJID, owner):
        self.backend = backend
        self.serviceJID = serviceJID
        self.owner = owner


    http_GET = None


    def http_POST(self, request):
        """
        Respond to a POST request to delete a node.

        The node is named by the C{uri} request argument, which holds an
        XMPP URI. The request body may be empty, or hold a JSON object whose
        optional C{redirect_uri} member is handed on to the backend.

        @return: A deferred that fires with the response: No Content on
                 success, Not Found if the backend reports
                 L{error.NodeNotFound}, Bad Request if the XMPP URI is
                 malformed or missing.
        """

        def gotStream(_):
            uris = request.args.get('uri')
            if not uris:
                # StatusResponse takes (code, description), like every other
                # error response in this module; http.Response would take
                # the message as its second positional argument instead.
                raise http.HTTPError(
                    http.StatusResponse(responsecode.BAD_REQUEST,
                                        "No URI given"))

            # Only the node part of the URI is used; the service JID is not.
            _service, nodeIdentifier = getServiceAndNode(uris[0])
            return nodeIdentifier

        def doDelete(nodeIdentifier, data):
            if data:
                # The body was collected in chunks; join before parsing.
                params = simplejson.loads(''.join(data))
                redirectURI = params.get('redirect_uri')
            else:
                redirectURI = None

            return self.backend.deleteNode(nodeIdentifier, self.owner,
                                           redirectURI)

        def respond(result):
            return http.Response(responsecode.NO_CONTENT)

        def trapNotFound(failure):
            failure.trap(error.NodeNotFound)
            return http.StatusResponse(responsecode.NOT_FOUND,
                                       "Node not found")

        def trapXMPPURIParseError(failure):
            failure.trap(XMPPURIParseError)
            return http.StatusResponse(responsecode.BAD_REQUEST,
                    "Malformed XMPP URI: %s" % failure.value)

        data = []
        d = readStream(request.stream, data.append)
        d.addCallback(gotStream)
        d.addCallback(doDelete, data)
        d.addCallback(respond)
        d.addErrback(trapNotFound)
        d.addErrback(trapXMPPURIParseError)
        return d
214
215
216
class PublishResource(resource.Resource):
    """
    Web resource through which an Atom entry is published to a node.
    """

    def __init__(self, backend, serviceJID, owner):
        self.backend = backend
        self.serviceJID = serviceJID
        self.owner = owner


    # Only POST is implemented here.
    http_GET = None


    def checkMediaType(self, request):
        """
        Verify that the request body is declared as a UTF-8 Atom entry.

        @raise http.HTTPError: With a Bad Request response if there is no
                               content type, or an Unsupported Media Type
                               response if it is anything other than
                               C{application/atom+xml;type=entry} in UTF-8.
        """
        ctype = request.headers.getHeader('content-type')

        if not ctype:
            response = http.StatusResponse(responsecode.BAD_REQUEST,
                                           "No specified Media Type")
            raise http.HTTPError(response)

        # An absent charset parameter counts as utf-8.
        acceptable = (ctype.mediaType == 'application' and
                      ctype.mediaSubtype == 'atom+xml' and
                      ctype.params.get('type') == 'entry' and
                      ctype.params.get('charset', 'utf-8') == 'utf-8')

        if not acceptable:
            response = http.StatusResponse(
                responsecode.UNSUPPORTED_MEDIA_TYPE,
                "Unsupported Media Type: %s" %
                    http_headers.generateContentType(ctype))
            raise http.HTTPError(response)


    def parseXMLPayload(self, stream):
        """
        Parse the XML document in C{stream}.

        @return: A deferred as returned by L{WebStreamParser.parse}.
        """
        return WebStreamParser().parse(stream)


    def http_POST(self, request):
        """
        Publish the posted entry as the item with id C{current}.

        If the request has a C{uri} argument, the node it names is published
        to. Otherwise a new node is created first.

        @return: A deferred that fires with a JSON response holding the XMPP
                 URI of the node, a Not Found response if the backend
                 reports L{error.NodeNotFound}, or a Bad Request response if
                 the XMPP URI is malformed.
        """

        def resolveNode():
            uris = request.args.get('uri')
            if not uris:
                return self.backend.createNode(None, self.owner)

            jid, nodeIdentifier = getServiceAndNode(uris[0])
            return defer.succeed(nodeIdentifier)

        def publishTo(nodeIdentifier, payload):
            item = Item(id='current', payload=payload)
            published = self.backend.publish(nodeIdentifier, [item],
                                             self.owner)
            # Hand the node identifier on to the response builder.
            published.addCallback(lambda _: nodeIdentifier)
            return published

        def doPublish(payload):
            node = resolveNode()
            node.addCallback(publishTo, payload)
            return node

        def toResponse(nodeIdentifier):
            body = simplejson.dumps(
                {'uri': getXMPPURI(self.serviceJID, nodeIdentifier)})
            headers = {
                'Content-Type': http_headers.MimeType.fromString(MIME_JSON)}
            return http.Response(responsecode.OK, stream=body,
                                 headers=headers)

        def trapNotFound(failure):
            failure.trap(error.NodeNotFound)
            return http.StatusResponse(responsecode.NOT_FOUND,
                                       "Node not found")

        def trapXMPPURIParseError(failure):
            failure.trap(XMPPURIParseError)
            return http.StatusResponse(responsecode.BAD_REQUEST,
                    "Malformed XMPP URI: %s" % failure.value)

        # Raises synchronously, before any of the body is read.
        self.checkMediaType(request)

        d = self.parseXMLPayload(request.stream)
        d.addCallback(doPublish)
        d.addCallback(toResponse)
        d.addErrback(trapNotFound)
        d.addErrback(trapXMPPURIParseError)
        return d
303
304
305
class ListResource(resource.Resource):
    """
    Web resource that lists the nodes known to a service.
    """

    def __init__(self, service):
        self.service = service


    def render(self, request):
        """
        Respond with the node identifiers of the service as a JSON document.

        @return: A deferred that fires with the response.
        """
        def toResponse(nodeIdentifiers):
            headers = {
                'Content-Type': http_headers.MimeType.fromString(MIME_JSON)}
            return http.Response(responsecode.OK,
                                 stream=simplejson.dumps(nodeIdentifiers),
                                 headers=headers)

        nodes = self.service.getNodes()
        nodes.addCallback(toResponse)
        return nodes
321
322
323
324 # Service for subscribing to remote XMPP Pubsub nodes and web resources
325
def extractAtomEntries(items):
    """
    Collect the Atom entries carried by publish-subscribe items.

    Elements not named C{item} (i.e. retractions) are skipped. Of each
    remaining item, only the first child that is an Atom entry is taken.

    @param items: List of L{domish.Element}s that represent publish-subscribe
                  items.
    @type items: C{list}
    @return: The Atom entry elements found, in the order of C{items}.
    @rtype: C{list}
    """

    def firstAtomEntry(item):
        for child in item.elements():
            if child.uri == NS_ATOM and child.name == 'entry':
                return child
        return None

    found = []

    for item in items:
        if item.name == 'item':
            entry = firstAtomEntry(item)
            if entry:
                found.append(entry)

    return found
353
354
355
def constructFeed(service, nodeIdentifier, entries, title):
    """
    Wrap Atom entries in a new Atom feed element.

    @param service: JID of the service the entries came from.
    @param nodeIdentifier: Identifier of the node the entries came from.
    @param entries: The Atom entry elements to add to the feed.
    @param title: Text for the feed's C{title} element.
    @return: The feed element. Its C{id} is the XMPP URI of the node and its
             C{updated} is the current time in UTC.
    """
    feed = domish.Element((NS_ATOM, 'feed'))

    metadata = (
        ('title', title),
        ('id', getXMPPURI(service, nodeIdentifier)),
        ('updated', strftime("%Y-%m-%dT%H:%M:%SZ", gmtime())),
    )
    for name, text in metadata:
        feed.addElement(name, content=text)

    for entry in entries:
        feed.addChild(entry)

    return feed
370
371
372
class RemoteSubscriptionService(service.Service, PubSubClient):
    """
    Service that subscribes to nodes at remote XMPP pubsub services.

    Each subscription is registered together with a callback HTTP URI.
    Items received in notifications are POSTed to the callback URIs
    registered for the node.
    """

    def __init__(self, jid, storage):
        self.jid = jid
        self.storage = storage


    def trapNotFound(self, failure):
        """
        Errback translating an C{item-not-found} stanza error.

        @raise error.NodeNotFound: If the stanza error condition is
                                   C{item-not-found}. Other stanza errors
                                   are passed on unchanged.
        """
        failure.trap(StanzaError)

        if failure.value.condition != 'item-not-found':
            return failure

        raise error.NodeNotFound()


    def subscribeCallback(self, jid, nodeIdentifier, callback):
        """
        Register a callback URI for a node.

        The callback URI is called whenever a notification for the given
        node is received.

        When no callback was registered for the node yet, the gateway
        subscribes to it. When there already was one, the most recently
        published item is retrieved and, if it holds an Atom entry, posted
        to the new callback. The latter is skipped for an empty node
        identifier.
        """

        def postLastItem(items):
            entries = extractAtomEntries(items)
            if entries:
                self._postTo([callback], jid, nodeIdentifier, entries[0],
                             'application/atom+xml;type=entry')

        def subscribeOrItems(hasCallbacks):
            if not hasCallbacks:
                d = self.subscribe(jid, nodeIdentifier, self.jid)
            elif nodeIdentifier:
                d = self.items(jid, nodeIdentifier, 1)
                d.addCallback(postLastItem)
            else:
                return None

            d.addErrback(self.trapNotFound)
            return d

        def register(_):
            return self.storage.addCallback(jid, nodeIdentifier, callback)

        d = self.storage.hasCallbacks(jid, nodeIdentifier)
        d.addCallback(subscribeOrItems)
        d.addCallback(register)
        return d


    def unsubscribeCallback(self, jid, nodeIdentifier, callback):
        """
        Remove a registered callback URI.

        When storage reports that it was the last callback for the node, the
        gateway unsubscribes from the node.
        """

        def unsubscribeIfLast(last):
            if last:
                return self.unsubscribe(jid, nodeIdentifier, self.jid)

        d = self.storage.removeCallback(jid, nodeIdentifier, callback)
        d.addCallback(unsubscribeIfLast)
        return d


    def itemsReceived(self, event):
        """
        Pass the Atom entries of an items notification to the callbacks.

        A single entry is sent as is, several entries are sent wrapped in a
        feed. The same payload is also sent to the callbacks of every node
        listed in the C{Collection} header of the event.
        """

        entries = extractAtomEntries(event.items)
        sender = event.sender
        node = event.nodeIdentifier
        headers = event.headers

        # Nothing to notify about without Atom entries.
        if not entries:
            return

        if len(entries) > 1:
            contentType = 'application/atom+xml;type=feed'
            payload = constructFeed(sender, node, entries,
                                    title='Received item collection')
        else:
            contentType = 'application/atom+xml;type=entry'
            payload = entries[0]

        self.callCallbacks(sender, node, payload, contentType)

        if 'Collection' in headers:
            for collection in headers['Collection']:
                self.callCallbacks(sender, collection or '', payload,
                                   contentType)


    def deleteReceived(self, event):
        """
        Tell the callbacks of a node that the node was deleted.
        """

        self.callCallbacks(event.sender, event.nodeIdentifier,
                           eventType='DELETED',
                           redirectURI=event.redirectURI)


    def _postTo(self, callbacks, service, nodeIdentifier,
                      payload=None, contentType=None, eventType=None,
                      redirectURI=None):
        """
        Schedule an HTTP POST to each of the given callback URIs.

        Every request carries the XMPP URI of the node in C{Referer} and the
        service JID in C{PubSub-Service}. C{eventType} is sent as the
        C{Event} header and C{redirectURI} as a C{Link} header. Failed
        requests are logged.
        """

        if not callbacks:
            return

        nodeURI = getXMPPURI(service, nodeIdentifier)
        headers = {'Referer': nodeURI.encode('utf-8'),
                   'PubSub-Service': service.full().encode('utf-8')}

        postdata = None
        if payload:
            postdata = payload.toXml().encode('utf-8')
            if contentType:
                headers['Content-Type'] = "%s;charset=utf-8" % contentType

        if eventType:
            headers['Event'] = eventType

        if redirectURI:
            headers['Link'] = '<%s>; rel=alternate' % (
                              redirectURI.encode('utf-8'),
                              )

        def postNotification(callbackURI):
            factory = getPageWithFactory(str(callbackURI),
                                         method='POST',
                                         postdata=postdata,
                                         headers=headers)
            factory.deferred.addErrback(log.err)

        # The requests are started from the reactor, not from this call.
        for callbackURI in callbacks:
            reactor.callLater(0, postNotification, callbackURI)


    def callCallbacks(self, service, nodeIdentifier,
                            payload=None, contentType=None, eventType=None,
                            redirectURI=None):
        """
        Post a notification to all callbacks registered for a node.

        A node without registered callbacks is silently ignored; any other
        failure is logged.
        """

        def ignoreNoCallbacks(failure):
            failure.trap(error.NoCallbacks)

            # No callbacks were registered for this node. Unsubscribe?

        d = self.storage.getCallbacks(service, nodeIdentifier)
        d.addCallback(self._postTo, service, nodeIdentifier, payload,
                                    contentType, eventType, redirectURI)
        d.addErrback(ignoreNoCallbacks)
        d.addErrback(log.err)
547
548
549
class RemoteSubscribeBaseResource(resource.Resource):
    """
    Base resource for remote pubsub node subscription and unsubscription.

    This resource accepts POST request with a JSON document that holds
    a dictionary with the keys C{uri} and C{callback} that respectively map
    to the XMPP URI of the publish-subscribe node and the callback URI.

    This class should be inherited with L{serviceMethod} overridden.

    @cvar serviceMethod: The name of the method to be called with
                         the JID of the pubsub service, the node identifier
                         and the callback URI as received in the HTTP POST
                         request to this resource.
    @cvar errorMap: Maps the exception classes the service method may fail
                    with to the response code and message to answer with.
    """
    serviceMethod = None
    errorMap = {
            error.NodeNotFound:
                (responsecode.FORBIDDEN, "Node not found"),
            error.NotSubscribed:
                (responsecode.FORBIDDEN, "No such subscription found"),
            error.SubscriptionExists:
                (responsecode.FORBIDDEN, "Subscription already exists"),
            }

    def __init__(self, service):
        self.service = service
        # Holds the parameters of the most recently parsed request. Kept for
        # backward compatibility only: request handling itself works on a
        # per-request value, as one resource instance can be handling
        # several requests at a time.
        self.params = None


    http_GET = None


    def http_POST(self, request):
        """
        Call the service method with the parameters in the request body.

        @return: A deferred that fires with the response: No Content on
                 success, the response from L{errorMap} for the errors
                 listed there, or Bad Request for a malformed XMPP URI.
        """
        # The body may arrive in several chunks; collect them all and only
        # parse once the stream has been read completely.
        chunks = []

        def parseParams(_):
            params = simplejson.loads(''.join(chunks))
            self.params = params
            return params

        def gotRequest(params):
            uri = params['uri']
            callback = params['callback']

            jid, nodeIdentifier = getServiceAndNode(uri)
            method = getattr(self.service, self.serviceMethod)
            return method(jid, nodeIdentifier, callback)

        def respond(result):
            return http.Response(responsecode.NO_CONTENT)

        def trapNotFound(failure):
            # trap() returns the class from errorMap that matched.
            err = failure.trap(*self.errorMap.keys())
            code, msg = self.errorMap[err]
            return http.StatusResponse(code, msg)

        def trapXMPPURIParseError(failure):
            failure.trap(XMPPURIParseError)
            return http.StatusResponse(responsecode.BAD_REQUEST,
                    "Malformed XMPP URI: %s" % failure.value)

        d = readStream(request.stream, chunks.append)
        d.addCallback(parseParams)
        d.addCallback(gotRequest)
        d.addCallback(respond)
        d.addErrback(trapNotFound)
        d.addErrback(trapXMPPURIParseError)
        return d
615
616
617
class RemoteSubscribeResource(RemoteSubscribeBaseResource):
    """
    Resource for subscribing to a remote publish-subscribe node.

    In the posted document, C{uri} is the XMPP URI of the node to subscribe
    to and C{callback} is the callback URI. When notifications from the node
    are received, a POST request is performed on the callback URI.
    """
    serviceMethod = 'subscribeCallback'
627
628
629
class RemoteUnsubscribeResource(RemoteSubscribeBaseResource):
    """
    Resource for unsubscribing from a remote publish-subscribe node.

    In the posted document, C{uri} is the XMPP URI of the node to unsubscribe
    from and C{callback} is the callback URI that had been registered for
    that node.
    """
    serviceMethod = 'unsubscribeCallback'
638
639
640
class RemoteItemsResource(resource.Resource):
    """
    Resource for retrieving items from a remote pubsub node.
    """

    def __init__(self, service):
        self.service = service


    def render(self, request):
        """
        Respond with an Atom feed of the items of a remote node.

        The node is named by the C{uri} request argument. The optional
        C{max_items} argument limits the number of items requested; when
        absent or zero, no limit is passed on.

        @return: A Bad Request response for an invalid C{max_items}, a
                 missing C{uri} or a malformed XMPP URI. Otherwise a
                 deferred that fires with the feed response, or with a
                 Not Found response if the remote service answers with an
                 C{item-not-found} stanza error.
        """
        try:
            maxItems = int(request.args.get('max_items', [0])[0]) or None
        except ValueError:
            return http.StatusResponse(responsecode.BAD_REQUEST,
                    "The argument max_items has an invalid value.")

        try:
            uri = request.args['uri'][0]
        except KeyError:
            return http.StatusResponse(responsecode.BAD_REQUEST,
                    "No URI for the remote node provided.")

        try:
            jid, nodeIdentifier = getServiceAndNode(uri)
        except XMPPURIParseError:
            return http.StatusResponse(responsecode.BAD_REQUEST,
                    "Malformed XMPP URI: %s" % uri)

        def respond(items):
            """Create a feed out the retrieved items."""
            contentType = http_headers.MimeType('application',
                                                'atom+xml',
                                                {'type': 'feed'})
            atomEntries = extractAtomEntries(items)
            feed = constructFeed(jid, nodeIdentifier, atomEntries,
                                    "Retrieved item collection")
            payload = feed.toXml().encode('utf-8')
            return http.Response(responsecode.OK, stream=payload,
                                 headers={'Content-Type': contentType})

        def trapNotFound(failure):
            failure.trap(StanzaError)
            if failure.value.condition != 'item-not-found':
                # Return, rather than raise, the failure so that the
                # original stanza error is what propagates down the chain.
                return failure
            return http.StatusResponse(responsecode.NOT_FOUND,
                                       "Node not found")

        d = self.service.items(jid, nodeIdentifier, maxItems)
        d.addCallback(respond)
        d.addErrback(trapNotFound)
        return d
692
693
694
695 # Client side code to interact with a service as provided above
696
def getPageWithFactory(url, contextFactory=None, *args, **kwargs):
    """
    Start an HTTP request and return the client factory performing it.

    The returned factory holds a deferred, which will callback with the
    page (as a string) or errback with a description of the error.
    Responses with status 204 are handled like those with status 200.

    Extra arguments are passed on to HTTPClientFactory.

    @param url: The URL to request.
    @param contextFactory: SSL context factory for C{https} URLs. A default
                           client context factory is used when omitted.
    """
    scheme, host, port, path = client._parse(url)
    factory = client.HTTPClientFactory(url, *args, **kwargs)

    # NOTE(review): this sets the handler on whatever object
    # factory.protocol refers to; if that is a class shared between
    # factories, the change is not limited to this request -- confirm.
    factory.protocol.handleStatus_204 = lambda self: self.handleStatus_200()

    if scheme != 'https':
        reactor.connectTCP(host, port, factory)
        return factory

    # Imported here so that SSL support is only needed for https URLs.
    from twisted.internet import ssl
    if contextFactory is None:
        contextFactory = ssl.ClientContextFactory()
    reactor.connectSSL(host, port, factory, contextFactory)
    return factory
719
720
721
class CallbackResource(resource.Resource):
    """
    Web resource that receives notifications posted by the gateway.

    @ivar callback: Callable invoked for every notification with the parsed
                    payload and the request headers.
    """

    def __init__(self, callback):
        self.callback = callback


    # Only POST is implemented here.
    http_GET = None


    def http_POST(self, request):
        """
        Hand a received notification to the callback.

        Requests carrying an C{Event} header are passed on with C{None} as
        payload, without reading the body. For all others the body is parsed
        as an XML document first.

        @return: A deferred that fires with a No Content response.
        """
        parser = WebStreamParser()

        if request.headers.hasHeader('Event'):
            d = defer.succeed(None)
        else:
            d = parser.parse(request.stream)

        d.addCallback(self.callback, request.headers)
        d.addCallback(lambda _: http.Response(responsecode.NO_CONTENT))
        return d
743
744
745
class GatewayClient(service.Service):
    """
    Client-side service for talking to the HTTP Gateway into Idavoll.

    Requests are sent to the gateway located at C{baseURI}. While running,
    the service also listens on C{callbackPort} and serves a C{callback}
    resource there; notifications posted to it end up in L{callback}.
    """

    agent = "Idavoll HTTP Gateway Client"

    def __init__(self, baseURI, callbackHost=None, callbackPort=None):
        def relay(*args, **kwargs):
            # Look up self.callback on every call, so that a replacement
            # assigned after construction is honoured.
            return self.callback(*args, **kwargs)

        self.baseURI = baseURI
        self.callbackHost = callbackHost or 'localhost'
        self.callbackPort = callbackPort or 8087

        root = resource.Resource()
        root.child_callback = CallbackResource(relay)
        self.site = server.Site(root)


    def startService(self):
        """
        Start listening for notifications on the callback port.
        """
        self.port = reactor.listenTCP(self.callbackPort,
                                      channel.HTTPFactory(self.site))


    def stopService(self):
        """
        Stop listening on the callback port.
        """
        return self.port.stopListening()


    def _makeURI(self, verb, query=None):
        """
        Build the gateway URI for C{verb}.

        The verb is appended to the path of the base URI. C{query}, if
        given, is URL-encoded into the query component.
        """
        base = urlparse.urlparse(self.baseURI)

        if query:
            encodedQuery = urllib.urlencode(query) or ''
        else:
            encodedQuery = ''

        return urlparse.urlunparse((base[0],
                                    base[1],
                                    base[2] + verb,
                                    '',
                                    encodedQuery,
                                    ''))


    def _subscriptionRequest(self, verb, xmppURI):
        """
        POST the JSON document naming a node and this client's callback URI.
        """
        callbackURI = 'http://%s:%s/callback' % (self.callbackHost,
                                                 self.callbackPort)
        params = {'uri': xmppURI,
                  'callback': callbackURI}
        f = getPageWithFactory(self._makeURI(verb),
                               method='POST',
                               postdata=simplejson.dumps(params),
                               headers={'Content-Type': MIME_JSON},
                               agent=self.agent)
        return f.deferred


    def callback(self, data, headers):
        """
        Called with each received notification. Does nothing by default.
        """
        pass


    def ping(self):
        """
        Send a HEAD request to the base URI of the gateway.
        """
        f = getPageWithFactory(self._makeURI(''),
                               method='HEAD',
                               agent=self.agent)
        return f.deferred


    def create(self):
        """
        Ask the gateway to create a node.

        @return: A deferred that fires with the decoded JSON response.
        """
        f = getPageWithFactory(self._makeURI('create'),
                               method='POST',
                               agent=self.agent)
        d = f.deferred
        d.addCallback(simplejson.loads)
        return d


    def delete(self, xmppURI, redirectURI=None):
        """
        Ask the gateway to delete the node at C{xmppURI}.

        If C{redirectURI} is given, it is sent along in a JSON body.
        """
        postdata = None
        headers = None

        if redirectURI:
            postdata = simplejson.dumps({'redirect_uri': redirectURI})
            headers = {'Content-Type': MIME_JSON}

        f = getPageWithFactory(self._makeURI('delete', {'uri': xmppURI}),
                               method='POST',
                               postdata=postdata,
                               headers=headers,
                               agent=self.agent)
        return f.deferred


    def publish(self, entry, xmppURI=None):
        """
        Publish an Atom entry through the gateway.

        @param entry: The entry element; it is sent serialized as UTF-8 XML.
        @param xmppURI: XMPP URI of the node to publish to, sent as the
                        C{uri} query argument when given.
        @return: A deferred that fires with the decoded JSON response.
        """
        query = xmppURI and {'uri': xmppURI}

        f = getPageWithFactory(self._makeURI('publish', query),
                               method='POST',
                               postdata=entry.toXml().encode('utf-8'),
                               headers={'Content-Type': MIME_ATOM_ENTRY},
                               agent=self.agent)
        d = f.deferred
        d.addCallback(simplejson.loads)
        return d


    def listNodes(self):
        """
        Request the list of nodes from the gateway.

        @return: A deferred that fires with the decoded JSON response.
        """
        f = getPageWithFactory(self._makeURI('list'),
                               method='GET',
                               agent=self.agent)
        d = f.deferred
        d.addCallback(simplejson.loads)
        return d


    def subscribe(self, xmppURI):
        """
        Register this client's callback URI for the node at C{xmppURI}.
        """
        return self._subscriptionRequest('subscribe', xmppURI)


    def unsubscribe(self, xmppURI):
        """
        Remove this client's callback URI for the node at C{xmppURI}.
        """
        return self._subscriptionRequest('unsubscribe', xmppURI)


    def items(self, xmppURI, maxItems=None):
        """
        Request the items of the node at C{xmppURI} from the gateway.

        @param maxItems: If given and non-zero, sent as the C{max_items}
                         query argument.
        """
        query = {'uri': xmppURI}
        if maxItems:
            query['max_items'] = int(maxItems)

        f = getPageWithFactory(self._makeURI('items', query),
                               method='GET',
                               agent=self.agent)
        return f.deferred
859
860
861 def items(self, xmppURI, maxItems=None):
862 query = {'uri': xmppURI}
863 if maxItems:
864 query['max_items'] = int(maxItems)
865 f = getPageWithFactory(self._makeURI('items', query),
866 method='GET',
867 agent=self.agent)
868 return f.deferred