comparison idavoll/gateway.py @ 185:9038908dc2f5

Add gateway support for retrieving items from a node. Reorder gateway module.
author Ralph Meijer <ralphm@ik.nu>
date Tue, 15 Apr 2008 17:32:56 +0000
parents c21b986cff30
children 365fd3e4daf8
comparison
equal deleted inserted replaced
184:bd88658dbca3 185:9038908dc2f5
32 32
33 NS_ATOM = 'http://www.w3.org/2005/Atom' 33 NS_ATOM = 'http://www.w3.org/2005/Atom'
34 MIME_ATOM_ENTRY = 'application/atom+xml;type=entry' 34 MIME_ATOM_ENTRY = 'application/atom+xml;type=entry'
35 MIME_JSON = 'application/json' 35 MIME_JSON = 'application/json'
36 36
37 class RemoteSubscriptionService(service.Service, PubSubClient): 37 class XMPPURIParseError(ValueError):
38 38 """
39 def __init__(self, jid): 39 Raised when a given XMPP URI couldn't be properly parsed.
40 self.jid = jid 40 """
41 41
42 def startService(self): 42
43 self.callbacks = {} 43
44 44 def getServiceAndNode(uri):
45 def trapNotFound(self, failure): 45 """
46 failure.trap(StanzaError) 46 Given an XMPP URI, extract the publish subscribe service JID and node ID.
47 if not failure.value.condition == 'item-not-found': 47 """
48 raise failure 48
49 raise error.NodeNotFound 49 try:
50 50 scheme, rest = uri.split(':', 1)
51 def subscribeCallback(self, jid, nodeIdentifier, callback): 51 except ValueError:
52 52 raise XMPPURIParseError("No URI scheme component")
53 def newCallbackList(result): 53
54 callbackList = set() 54 if scheme != 'xmpp':
55 self.callbacks[jid, nodeIdentifier] = callbackList 55 raise XMPPURIParseError("Unknown URI scheme")
56 return callbackList 56
57 57 if rest.startswith("//"):
58 try: 58 raise XMPPURIParseError("Unexpected URI authority component")
59 callbackList = self.callbacks[jid, nodeIdentifier] 59
60 except KeyError: 60 try:
61 d = self.subscribe(jid, nodeIdentifier, self.jid) 61 entity, query = rest.split('?', 1)
62 d.addCallback(newCallbackList) 62 except ValueError:
63 else: 63 raise XMPPURIParseError("No URI query component")
64 d = defer.succeed(callbackList) 64
65 65 if not entity:
66 d.addCallback(lambda callbackList: callbackList.add(callback)) 66 raise XMPPURIParseError("Empty URI path component")
67 d.addErrback(self.trapNotFound) 67
68 return d 68 try:
69 69 jid = JID(entity)
70 def unsubscribeCallback(self, jid, nodeIdentifier, callback): 70 except Exception, e:
71 try: 71 raise XMPPURIParseError("Invalid JID: %s" % e.message)
72 callbackList = self.callbacks[jid, nodeIdentifier] 72
73 callbackList.remove(callback) 73 params = cgi.parse_qs(query)
74 except KeyError: 74
75 return defer.fail(error.NotSubscribed()) 75 try:
76 76 nodeIdentifier = params['node'][0]
77 if not callbackList: 77 except (KeyError, ValueError):
78 self.unsubscribe(jid, nodeIdentifier, self.jid) 78 raise XMPPURIParseError("No node in query component of URI")
79 79
80 return defer.succeed(None) 80 return jid, nodeIdentifier
81 81
82 def itemsReceived(self, recipient, service, nodeIdentifier, items):
83 """
84 Fire up HTTP client to do callback
85 """
86
87 # Collect atom entries
88 atomEntries = []
89 for item in items:
90 # ignore non-items (i.e. retractions)
91 if item.name != 'item':
92 continue
93
94 atomEntry = None
95 for element in item.elements():
96 # extract the first element that is an atom entry
97 if element.uri == NS_ATOM and element.name == 'entry':
98 atomEntry = element
99 break
100
101 if atomEntry:
102 atomEntries.append(atomEntry)
103
104 # Don't notify if there are no atom entries
105 if not atomEntries:
106 return
107
108 if len(atomEntries) == 1:
109 contentType = 'application/atom+xml;type=entry'
110 payload = atomEntries[0]
111 else:
112 contentType = 'application/atom+xml;type=feed'
113 nodeURI = 'xmpp:%s?;node=%s' % (service.full(), nodeIdentifier)
114 now = strftime("%Y-%m-%dT%H:%M:%SZ", gmtime())
115
116 # Collect the received entries in a feed
117 payload = domish.Element((NS_ATOM, 'feed'))
118 payload.addElement('title', content='Received item collection')
119 payload.addElement('id', content=nodeURI)
120 payload.addElement('updated', content=now)
121 for atomEntry in atomEntries:
122 payload.addChild(atomEntry)
123
124 self.callCallbacks(recipient, service, nodeIdentifier, payload,
125 contentType)
126
127 def deleteReceived(self, recipient, service, nodeIdentifier):
128 """
129 Fire up HTTP client to do callback
130 """
131
132 self.callCallbacks(recipient, service, nodeIdentifier,
133 eventType='DELETED')
134
135 def callCallbacks(self, recipient, service, nodeIdentifier,
136 payload=None, contentType=None, eventType=None):
137 try:
138 callbacks = self.callbacks[service, nodeIdentifier]
139 except KeyError:
140 return
141
142 postdata = None
143 nodeURI = 'xmpp:%s?;node=%s' % (service.full(), nodeIdentifier)
144 headers = {'Referer': nodeURI.encode('utf-8'),
145 'PubSub-Service': service.full().encode('utf-8')}
146
147 if payload:
148 postdata = payload.toXml().encode('utf-8')
149 if contentType:
150 headers['Content-Type'] = "%s;charset=utf-8" % contentType
151
152 if eventType:
153 headers['Event'] = eventType
154
155 def postNotification(callbackURI):
156 d = client.getPage(str(callbackURI),
157 method='POST',
158 postdata=postdata,
159 headers=headers)
160 d.addErrback(log.err)
161
162 for callbackURI in callbacks:
163 reactor.callLater(0, postNotification, callbackURI)
164 82
165 83
166 class WebStreamParser(object): 84 class WebStreamParser(object):
167 def __init__(self): 85 def __init__(self):
168 self.elementStream = domish.elementStream() 86 self.elementStream = domish.elementStream()
169 self.elementStream.DocumentStartEvent = self.docStart 87 self.elementStream.DocumentStartEvent = self.docStart
170 self.elementStream.ElementEvent = self.elem 88 self.elementStream.ElementEvent = self.elem
171 self.elementStream.DocumentEndEvent = self.docEnd 89 self.elementStream.DocumentEndEvent = self.docEnd
172 self.done = False 90 self.done = False
173 91
92
174 def docStart(self, elem): 93 def docStart(self, elem):
175 self.document = elem 94 self.document = elem
176 95
96
177 def elem(self, elem): 97 def elem(self, elem):
178 self.document.addChild(elem) 98 self.document.addChild(elem)
179 99
100
180 def docEnd(self): 101 def docEnd(self):
181 self.done = True 102 self.done = True
103
182 104
183 def parse(self, stream): 105 def parse(self, stream):
184 def endOfStream(result): 106 def endOfStream(result):
185 if not self.done: 107 if not self.done:
186 raise Exception("No more stuff?") 108 raise Exception("No more stuff?")
190 d = readStream(stream, self.elementStream.parse) 112 d = readStream(stream, self.elementStream.parse)
191 d.addCallback(endOfStream) 113 d.addCallback(endOfStream)
192 return d 114 return d
193 115
194 116
195 class XMPPURIParseError(ValueError):
196 """
197 Raised when a given XMPP URI couldn't be properly parsed.
198 """
199
200
201 def getServiceAndNode(uri):
202 """
203 Given an XMPP URI, extract the publish subscribe service JID and node ID.
204 """
205
206 try:
207 scheme, rest = uri.split(':', 1)
208 except ValueError:
209 raise XMPPURIParseError("No URI scheme component")
210
211 if scheme != 'xmpp':
212 raise XMPPURIParseError("Unknown URI scheme")
213
214 if rest.startswith("//"):
215 raise XMPPURIParseError("Unexpected URI authority component")
216
217 try:
218 entity, query = rest.split('?', 1)
219 except ValueError:
220 raise XMPPURIParseError("No URI query component")
221
222 if not entity:
223 raise XMPPURIParseError("Empty URI path component")
224
225 try:
226 jid = JID(entity)
227 except Exception, e:
228 raise XMPPURIParseError("Invalid JID: %s" % e.message)
229
230 params = cgi.parse_qs(query)
231
232 try:
233 nodeIdentifier = params['node'][0]
234 except (KeyError, ValueError):
235 raise XMPPURIParseError("No node in query component of URI")
236
237 return jid, nodeIdentifier
238
239 117
240 class CreateResource(resource.Resource): 118 class CreateResource(resource.Resource):
241 """ 119 """
242 A resource to create a publish-subscribe node. 120 A resource to create a publish-subscribe node.
243 """ 121 """
244 def __init__(self, backend, serviceJID, owner): 122 def __init__(self, backend, serviceJID, owner):
245 self.backend = backend 123 self.backend = backend
246 self.serviceJID = serviceJID 124 self.serviceJID = serviceJID
247 self.owner = owner 125 self.owner = owner
248 126
127
249 http_GET = None 128 http_GET = None
129
250 130
251 def http_POST(self, request): 131 def http_POST(self, request):
252 """ 132 """
253 Respond to a POST request to create a new node. 133 Respond to a POST request to create a new node.
254 """ 134 """
261 d = self.backend.create_node(None, self.owner) 141 d = self.backend.create_node(None, self.owner)
262 d.addCallback(toResponse) 142 d.addCallback(toResponse)
263 return d 143 return d
264 144
265 145
146
266 class DeleteResource(resource.Resource): 147 class DeleteResource(resource.Resource):
267 """ 148 """
268 A resource to create a publish-subscribe node. 149 A resource to create a publish-subscribe node.
269 """ 150 """
270 def __init__(self, backend, serviceJID, owner): 151 def __init__(self, backend, serviceJID, owner):
271 self.backend = backend 152 self.backend = backend
272 self.serviceJID = serviceJID 153 self.serviceJID = serviceJID
273 self.owner = owner 154 self.owner = owner
274 155
156
275 http_GET = None 157 http_GET = None
158
276 159
277 def http_POST(self, request): 160 def http_POST(self, request):
278 """ 161 """
279 Respond to a POST request to create a new node. 162 Respond to a POST request to create a new node.
280 """ 163 """
306 d.addErrback(trapNotFound) 189 d.addErrback(trapNotFound)
307 d.addErrback(trapXMPPURIParseError) 190 d.addErrback(trapXMPPURIParseError)
308 return d 191 return d
309 192
310 193
194
311 class PublishResource(resource.Resource): 195 class PublishResource(resource.Resource):
312 """ 196 """
313 A resource to publish to a publish-subscribe node. 197 A resource to publish to a publish-subscribe node.
314 """ 198 """
315 199
316 def __init__(self, backend, serviceJID, owner): 200 def __init__(self, backend, serviceJID, owner):
317 self.backend = backend 201 self.backend = backend
318 self.serviceJID = serviceJID 202 self.serviceJID = serviceJID
319 self.owner = owner 203 self.owner = owner
320 204
205
321 http_GET = None 206 http_GET = None
207
322 208
323 def checkMediaType(self, request): 209 def checkMediaType(self, request):
324 ctype = request.headers.getHeader('content-type') 210 ctype = request.headers.getHeader('content-type')
325 211
326 if not ctype: 212 if not ctype:
337 http.StatusResponse( 223 http.StatusResponse(
338 responsecode.UNSUPPORTED_MEDIA_TYPE, 224 responsecode.UNSUPPORTED_MEDIA_TYPE,
339 "Unsupported Media Type: %s" % 225 "Unsupported Media Type: %s" %
340 http_headers.generateContentType(ctype))) 226 http_headers.generateContentType(ctype)))
341 227
228
342 def parseXMLPayload(self, stream): 229 def parseXMLPayload(self, stream):
343 p = WebStreamParser() 230 p = WebStreamParser()
344 return p.parse(stream) 231 return p.parse(stream)
232
345 233
346 def http_POST(self, request): 234 def http_POST(self, request):
347 """ 235 """
348 Respond to a POST request to create a new item. 236 Respond to a POST request to create a new item.
349 """ 237 """
388 d.addErrback(trapNotFound) 276 d.addErrback(trapNotFound)
389 d.addErrback(trapXMPPURIParseError) 277 d.addErrback(trapXMPPURIParseError)
390 return d 278 return d
391 279
392 280
393 class SubscribeBaseResource(resource.Resource): 281
282 class ListResource(resource.Resource):
283 def __init__(self, service):
284 self.service = service
285
286
287 def render(self, request):
288 def responseFromNodes(nodeIdentifiers):
289 return http.Response(responsecode.OK,
290 stream=simplejson.dumps(nodeIdentifiers))
291
292 d = self.service.get_nodes()
293 d.addCallback(responseFromNodes)
294 return d
295
296
297
298 # Service for subscribing to remote XMPP Pubsub nodes and web resources
299
300 def extractAtomEntries(items):
301 """
302 Extract atom entries from a list of publish-subscribe items.
303
304 @param items: List of L{domish.Element}s that represent publish-subscribe
305 items.
306 @type items: C{list}
307 """
308
309 atomEntries = []
310
311 for item in items:
312 # ignore non-items (i.e. retractions)
313 if item.name != 'item':
314 continue
315
316 atomEntry = None
317 for element in item.elements():
318 # extract the first element that is an atom entry
319 if element.uri == NS_ATOM and element.name == 'entry':
320 atomEntry = element
321 break
322
323 if atomEntry:
324 atomEntries.append(atomEntry)
325
326 return atomEntries
327
328 def constructFeed(service, nodeIdentifier, entries, title):
329 nodeURI = 'xmpp:%s?;node=%s' % (service.full(), nodeIdentifier)
330 now = strftime("%Y-%m-%dT%H:%M:%SZ", gmtime())
331
332 # Collect the received entries in a feed
333 feed = domish.Element((NS_ATOM, 'feed'))
334 feed.addElement('title', content=title)
335 feed.addElement('id', content=nodeURI)
336 feed.addElement('updated', content=now)
337
338 for entry in entries:
339 feed.addChild(entry)
340
341 return feed
342
343 class RemoteSubscriptionService(service.Service, PubSubClient):
344 """
345 Service for subscribing to remote XMPP Publish-Subscribe nodes.
346
347 Subscriptions are created with a callback HTTP URI that is POSTed
348 to with the received items in notifications.
349 """
350
351 def __init__(self, jid):
352 self.jid = jid
353
354
355 def startService(self):
356 self.callbacks = {}
357
358
359 def trapNotFound(self, failure):
360 failure.trap(StanzaError)
361 if not failure.value.condition == 'item-not-found':
362 raise failure
363 raise error.NodeNotFound
364
365
366 def subscribeCallback(self, jid, nodeIdentifier, callback):
367
368 def newCallbackList(result):
369 callbackList = set()
370 self.callbacks[jid, nodeIdentifier] = callbackList
371 return callbackList
372
373 try:
374 callbackList = self.callbacks[jid, nodeIdentifier]
375 except KeyError:
376 d = self.subscribe(jid, nodeIdentifier, self.jid)
377 d.addCallback(newCallbackList)
378 else:
379 d = defer.succeed(callbackList)
380
381 d.addCallback(lambda callbackList: callbackList.add(callback))
382 d.addErrback(self.trapNotFound)
383 return d
384
385
386 def unsubscribeCallback(self, jid, nodeIdentifier, callback):
387 try:
388 callbackList = self.callbacks[jid, nodeIdentifier]
389 callbackList.remove(callback)
390 except KeyError:
391 return defer.fail(error.NotSubscribed())
392
393 if not callbackList:
394 self.unsubscribe(jid, nodeIdentifier, self.jid)
395
396 return defer.succeed(None)
397
398
399 def itemsReceived(self, recipient, service, nodeIdentifier, items):
400 """
401 Fire up HTTP client to do callback
402 """
403
404 atomEntries = extractAtomEntries(items)
405
406 # Don't notify if there are no atom entries
407 if not atomEntries:
408 return
409
410 if len(atomEntries) == 1:
411 contentType = 'application/atom+xml;type=entry'
412 payload = atomEntries[0]
413 else:
414 contentType = 'application/atom+xml;type=feed'
415 payload = constructFeed(service, nodeIdentifier, atomEntries,
416 title='Received item collection')
417
418 self.callCallbacks(recipient, service, nodeIdentifier, payload,
419 contentType)
420
421
422 def deleteReceived(self, recipient, service, nodeIdentifier):
423 """
424 Fire up HTTP client to do callback
425 """
426
427 self.callCallbacks(recipient, service, nodeIdentifier,
428 eventType='DELETED')
429
430
431 def callCallbacks(self, recipient, service, nodeIdentifier,
432 payload=None, contentType=None, eventType=None):
433 try:
434 callbacks = self.callbacks[service, nodeIdentifier]
435 except KeyError:
436 return
437
438 postdata = None
439 nodeURI = 'xmpp:%s?;node=%s' % (service.full(), nodeIdentifier)
440 headers = {'Referer': nodeURI.encode('utf-8'),
441 'PubSub-Service': service.full().encode('utf-8')}
442
443 if payload:
444 postdata = payload.toXml().encode('utf-8')
445 if contentType:
446 headers['Content-Type'] = "%s;charset=utf-8" % contentType
447
448 if eventType:
449 headers['Event'] = eventType
450
451 def postNotification(callbackURI):
452 d = client.getPage(str(callbackURI),
453 method='POST',
454 postdata=postdata,
455 headers=headers)
456 d.addErrback(log.err)
457
458 for callbackURI in callbacks:
459 reactor.callLater(0, postNotification, callbackURI)
460
461
462
463 class RemoteSubscribeBaseResource(resource.Resource):
394 """ 464 """
395 Base resource for remote pubsub node subscription and unsubscription. 465 Base resource for remote pubsub node subscription and unsubscription.
396 466
397 This resource accepts POST request with a JSON document that holds 467 This resource accepts POST request with a JSON document that holds
398 a dictionary with the keys C{uri} and C{callback} that respectively map 468 a dictionary with the keys C{uri} and C{callback} that respectively map
409 479
410 def __init__(self, service): 480 def __init__(self, service):
411 self.service = service 481 self.service = service
412 self.params = None 482 self.params = None
413 483
484
414 http_GET = None 485 http_GET = None
486
415 487
416 def http_POST(self, request): 488 def http_POST(self, request):
417 def trapNotFound(failure): 489 def trapNotFound(failure):
418 failure.trap(error.NodeNotFound) 490 failure.trap(error.NodeNotFound)
419 return http.StatusResponse(responsecode.NOT_FOUND, 491 return http.StatusResponse(responsecode.NOT_FOUND,
445 d.addErrback(trapNotFound) 517 d.addErrback(trapNotFound)
446 d.addErrback(trapXMPPURIParseError) 518 d.addErrback(trapXMPPURIParseError)
447 return d 519 return d
448 520
449 521
450 class SubscribeResource(SubscribeBaseResource): 522
523 class RemoteSubscribeResource(RemoteSubscribeBaseResource):
451 """ 524 """
452 Resource to subscribe to a remote publish-subscribe node. 525 Resource to subscribe to a remote publish-subscribe node.
453 526
454 The passed C{uri} is the XMPP URI of the node to subscribe to and the 527 The passed C{uri} is the XMPP URI of the node to subscribe to and the
455 C{callback} is the callback URI. Upon receiving notifications from the 528 C{callback} is the callback URI. Upon receiving notifications from the
456 node, a POST request will be perfomed on the callback URI. 529 node, a POST request will be perfomed on the callback URI.
457 """ 530 """
458 serviceMethod = 'subscribeCallback' 531 serviceMethod = 'subscribeCallback'
459 532
460 533
461 class UnsubscribeResource(SubscribeBaseResource): 534
535 class RemoteUnsubscribeResource(RemoteSubscribeBaseResource):
462 """ 536 """
463 Resource to unsubscribe from a remote publish-subscribe node. 537 Resource to unsubscribe from a remote publish-subscribe node.
464 538
465 The passed C{uri} is the XMPP URI of the node to unsubscribe from and the 539 The passed C{uri} is the XMPP URI of the node to unsubscribe from and the
466 C{callback} is the callback URI that was registered for it. 540 C{callback} is the callback URI that was registered for it.
467 """ 541 """
468 serviceMethod = 'unsubscribeCallback' 542 serviceMethod = 'unsubscribeCallback'
469 543
470 544
471 class ListResource(resource.Resource): 545
546 class RemoteItemsResource(resource.Resource):
547 """
548 Resource for retrieving items from a remote pubsub node.
549 """
550
472 def __init__(self, service): 551 def __init__(self, service):
473 self.service = service 552 self.service = service
474 553
475 def render(self, request): 554 def render(self, request):
476 def responseFromNodes(nodeIdentifiers): 555 try:
477 return http.Response(responsecode.OK, 556 maxItems = int(request.args.get('max_items', [0])[0]) or None
478 stream=simplejson.dumps(nodeIdentifiers)) 557 except ValueError:
479 558 return http.StatusResponse(responsecode.BAD_REQUEST,
480 d = self.service.get_nodes() 559 "The argument max_items has an invalid value.")
481 d.addCallback(responseFromNodes) 560
482 return d 561 try:
483 562 jid, nodeIdentifier = getServiceAndNode(request.args['uri'][0])
563 except KeyError:
564 return http.StatusResponse(responsecode.BAD_REQUEST,
565 "No URI for the remote node provided.")
566 except XMPPURIParseError:
567 return http.StatusResponse(responsecode.BAD_REQUEST,
568 "Malformed XMPP URI: %s" % failure.value.message)
569
570
571 def respond(items):
572 """Create a feed out the retrieved items."""
573 contentType = http_headers.MimeType('application',
574 'atom+xml',
575 {'type': 'feed'})
576 atomEntries = extractAtomEntries(items)
577 feed = constructFeed(jid, nodeIdentifier, atomEntries,
578 "Retrieved item collection")
579 payload = feed.toXml().encode('utf-8')
580 return http.Response(responsecode.OK, stream=payload,
581 headers={'Content-Type': contentType})
582
583 def trapNotFound(failure):
584 failure.trap(StanzaError)
585 if not failure.value.condition == 'item-not-found':
586 raise failure
587 return http.StatusResponse(responsecode.NOT_FOUND,
588 "Node not found")
589
590 d = self.service.items(jid, nodeIdentifier)
591 d.addCallback(respond)
592 d.addErrback(trapNotFound)
593 return d
594
595
596
597 # Client side code to interact with a service as provided above
484 598
485 def getPageWithFactory(url, contextFactory=None, *args, **kwargs): 599 def getPageWithFactory(url, contextFactory=None, *args, **kwargs):
486 """Download a web page. 600 """Download a web page.
487 601
488 Download a page. Return the factory that holds a deferred, which will 602 Download a page. Return the factory that holds a deferred, which will
504 else: 618 else:
505 reactor.connectTCP(host, port, factory) 619 reactor.connectTCP(host, port, factory)
506 return factory 620 return factory
507 621
508 622
623
509 class CallbackResource(resource.Resource): 624 class CallbackResource(resource.Resource):
510 """ 625 """
511 Web resource for retrieving gateway notifications. 626 Web resource for retrieving gateway notifications.
512 """ 627 """
513 628
514 def __init__(self, callback): 629 def __init__(self, callback):
515 self.callback = callback 630 self.callback = callback
516 631
632
517 http_GET = None 633 http_GET = None
634
518 635
519 def http_POST(self, request): 636 def http_POST(self, request):
520 p = WebStreamParser() 637 p = WebStreamParser()
521 d = p.parse(request.stream) 638 d = p.parse(request.stream)
522 d.addCallback(self.callback, request.headers) 639 d.addCallback(self.callback, request.headers)
523 d.addCallback(lambda _: http.Response(responsecode.NO_CONTENT)) 640 d.addCallback(lambda _: http.Response(responsecode.NO_CONTENT))
524 return d 641 return d
642
643
525 644
526 class GatewayClient(service.Service): 645 class GatewayClient(service.Service):
527 """ 646 """
528 Service that provides client access to the HTTP Gateway into Idavoll. 647 Service that provides client access to the HTTP Gateway into Idavoll.
529 """ 648 """
536 self.callbackPort = callbackPort or 8087 655 self.callbackPort = callbackPort or 8087
537 root = resource.Resource() 656 root = resource.Resource()
538 root.child_callback = CallbackResource(lambda *args, **kwargs: self.callback(*args, **kwargs)) 657 root.child_callback = CallbackResource(lambda *args, **kwargs: self.callback(*args, **kwargs))
539 self.site = server.Site(root) 658 self.site = server.Site(root)
540 659
660
541 def startService(self): 661 def startService(self):
542 self.port = reactor.listenTCP(self.callbackPort, 662 self.port = reactor.listenTCP(self.callbackPort,
543 channel.HTTPFactory(self.site)) 663 channel.HTTPFactory(self.site))
544 664
665
545 def stopService(self): 666 def stopService(self):
546 return self.port.stopListening() 667 return self.port.stopListening()
668
547 669
548 def _makeURI(self, verb, query=None): 670 def _makeURI(self, verb, query=None):
549 uriComponents = urlparse.urlparse(self.baseURI) 671 uriComponents = urlparse.urlparse(self.baseURI)
550 uri = urlparse.urlunparse((uriComponents[0], 672 uri = urlparse.urlunparse((uriComponents[0],
551 uriComponents[1], 673 uriComponents[1],
553 '', 675 '',
554 query and urllib.urlencode(query) or '', 676 query and urllib.urlencode(query) or '',
555 '')) 677 ''))
556 return uri 678 return uri
557 679
680
558 def callback(self, data, headers): 681 def callback(self, data, headers):
559 pass 682 pass
683
560 684
561 def create(self): 685 def create(self):
562 f = getPageWithFactory(self._makeURI('create'), 686 f = getPageWithFactory(self._makeURI('create'),
563 method='POST', 687 method='POST',
564 agent=self.agent) 688 agent=self.agent)
565 return f.deferred.addCallback(simplejson.loads) 689 return f.deferred.addCallback(simplejson.loads)
690
566 691
567 def publish(self, entry, xmppURI=None): 692 def publish(self, entry, xmppURI=None):
568 query = xmppURI and {'uri': xmppURI} 693 query = xmppURI and {'uri': xmppURI}
569 694
570 f = getPageWithFactory(self._makeURI('publish', query), 695 f = getPageWithFactory(self._makeURI('publish', query),
572 postdata=entry.toXml().encode('utf-8'), 697 postdata=entry.toXml().encode('utf-8'),
573 headers={'Content-Type': MIME_ATOM_ENTRY}, 698 headers={'Content-Type': MIME_ATOM_ENTRY},
574 agent=self.agent) 699 agent=self.agent)
575 return f.deferred.addCallback(simplejson.loads) 700 return f.deferred.addCallback(simplejson.loads)
576 701
702
577 def listNodes(self): 703 def listNodes(self):
578 f = getPageWithFactory(self._makeURI('list'), 704 f = getPageWithFactory(self._makeURI('list'),
579 method='GET', 705 method='GET',
580 agent=self.agent) 706 agent=self.agent)
581 return f.deferred.addCallback(simplejson.loads) 707 return f.deferred.addCallback(simplejson.loads)
708
582 709
583 def subscribe(self, xmppURI): 710 def subscribe(self, xmppURI):
584 params = {'uri': xmppURI, 711 params = {'uri': xmppURI,
585 'callback': 'http://%s:%s/callback' % (self.callbackHost, 712 'callback': 'http://%s:%s/callback' % (self.callbackHost,
586 self.callbackPort)} 713 self.callbackPort)}
588 method='POST', 715 method='POST',
589 postdata=simplejson.dumps(params), 716 postdata=simplejson.dumps(params),
590 headers={'Content-Type': MIME_JSON}, 717 headers={'Content-Type': MIME_JSON},
591 agent=self.agent) 718 agent=self.agent)
592 return f.deferred 719 return f.deferred
720
721
722 def items(self, xmppURI, maxItems=None):
723 query = {'uri': xmppURI}
724 if maxItems:
725 query['maxItems'] = int(maxItems)
726 f = getPageWithFactory(self._makeURI('items', query),
727 method='GET',
728 agent=self.agent)
729 return f.deferred