Mercurial > sat_tmp
comparison wokkel/pubsub.py @ 0:09e7c32a6a00
use sat.tmp.wokkel as a buffer module until the changes are integrated to wokkel
author | souliane <souliane@mailoo.org> |
---|---|
date | Mon, 15 Dec 2014 12:46:58 +0100 |
parents | |
children | dc3a3f454f39 |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:09e7c32a6a00 |
---|---|
1 # -*- test-case-name: wokkel.test.test_pubsub -*- | |
2 # | |
3 # Copyright (c) Ralph Meijer. | |
4 # See LICENSE for details. | |
5 | |
6 """ | |
7 XMPP publish-subscribe protocol. | |
8 | |
9 This protocol is specified in | |
10 U{XEP-0060<http://xmpp.org/extensions/xep-0060.html>}. | |
11 """ | |
12 | |
13 from zope.interface import implements | |
14 | |
15 from twisted.internet import defer | |
16 from twisted.python import log | |
17 from twisted.words.protocols.jabber import jid, error | |
18 from twisted.words.xish import domish | |
19 | |
20 from wokkel import disco, data_form, generic, shim | |
21 from wokkel.compat import IQ | |
22 from wokkel.subprotocols import IQHandlerMixin, XMPPHandler | |
23 from wokkel.iwokkel import IPubSubClient, IPubSubService, IPubSubResource | |
24 | |
# XPath queries selecting iq stanzas of type 'get' and 'set' respectively.
IQ_GET = '/iq[@type="get"]'
IQ_SET = '/iq[@type="set"]'

# XML namespaces used by the publish-subscribe protocol. All of them are
# derived from the base namespace by appending a fragment.
NS_PUBSUB = 'http://jabber.org/protocol/pubsub'
NS_PUBSUB_EVENT = '%s#event' % NS_PUBSUB
NS_PUBSUB_ERRORS = '%s#errors' % NS_PUBSUB
NS_PUBSUB_OWNER = '%s#owner' % NS_PUBSUB
NS_PUBSUB_NODE_CONFIG = '%s#node_config' % NS_PUBSUB
NS_PUBSUB_META_DATA = '%s#meta-data' % NS_PUBSUB
NS_PUBSUB_SUBSCRIBE_OPTIONS = '%s#subscribe_options' % NS_PUBSUB

# XPath query selecting get/set iq stanzas that carry a pubsub child element
# in either the base or the owner namespace.
PUBSUB_REQUEST = (
    '/iq[@type="get" or @type="set"]/'
    'pubsub[@xmlns="%s" or @xmlns="%s"]' % (NS_PUBSUB, NS_PUBSUB_OWNER))
42 | |
class SubscriptionPending(Exception):
    """
    The requested subscription has not been accepted yet.

    L{PubSubClient.subscribe} raises this from its result callback when the
    service reports the subscription state C{'pending'}.
    """
    pass
47 | |
48 | |
49 | |
class SubscriptionUnconfigured(Exception):
    """
    The requested subscription must be configured before it becomes active.

    L{PubSubClient.subscribe} raises this from its result callback when the
    service reports the subscription state C{'unconfigured'}.
    """
    pass
55 | |
56 | |
57 | |
class PubSubError(error.StanzaError):
    """
    Stanza error carrying a publish-subscribe specific error condition.

    The pubsub condition is attached as the application-specific condition
    element, in the pubsub errors namespace.
    """

    def __init__(self, condition, pubsubCondition, feature=None, text=None):
        """
        @param condition: The generic stanza error condition.
        @param pubsubCondition: Name of the pubsub specific condition element.
        @param feature: Optional value for the C{feature} attribute of the
            pubsub condition element.
        @param text: Optional descriptive text for the error.
        """
        conditionElement = domish.Element((NS_PUBSUB_ERRORS,
                                           pubsubCondition))
        if feature:
            conditionElement['feature'] = feature

        error.StanzaError.__init__(self, condition, text=text,
                                   appCondition=conditionElement)
69 | |
70 | |
71 | |
class BadRequest(error.StanzaError):
    """
    Stanza error with the C{'bad-request'} condition.
    """

    def __init__(self, pubsubCondition=None, text=None):
        """
        @param pubsubCondition: Optional name of a pubsub specific condition
            element to attach as application-specific condition.
        @param text: Optional descriptive text for the error.
        """
        # Only build the application condition when one was asked for.
        appCondition = None
        if pubsubCondition:
            appCondition = domish.Element((NS_PUBSUB_ERRORS,
                                           pubsubCondition))

        error.StanzaError.__init__(self, 'bad-request', text=text,
                                   appCondition=appCondition)
84 | |
85 | |
86 | |
class Unsupported(PubSubError):
    """
    Error raised for a publish-subscribe feature that is not supported.

    This is a C{'feature-not-implemented'} stanza error with the pubsub
    condition C{'unsupported'}.

    @ivar feature: The name of the unsupported feature.
    """

    def __init__(self, feature, text=None):
        """
        @param feature: The name of the unsupported feature.
        @param text: Optional descriptive text for the error.
        """
        self.feature = feature
        PubSubError.__init__(self, 'feature-not-implemented', 'unsupported',
                             feature, text)


    def __str__(self):
        """
        Return the base error description followed by the feature name.
        """
        return PubSubError.__str__(self) + ', feature %r' % self.feature
99 | |
100 | |
class Subscription(object):
    """
    A subscription of an entity to a node.

    @ivar nodeIdentifier: The identifier of the node subscribed to. The root
        node is denoted by C{None}.
    @type nodeIdentifier: C{unicode}

    @ivar subscriber: The subscribing entity.
    @type subscriber: L{jid.JID}

    @ivar state: The subscription state. One of C{'subscribed'},
        C{'pending'}, C{'unconfigured'}.
    @type state: C{unicode}

    @ivar options: Optional subscription options. Defaults to an empty
        dictionary.
    @type options: C{dict}

    @ivar subscriptionIdentifier: Optional subscription identifier.
    @type subscriptionIdentifier: C{unicode}
    """

    def __init__(self, nodeIdentifier, subscriber, state, options=None,
                       subscriptionIdentifier=None):
        self.nodeIdentifier = nodeIdentifier
        self.subscriber = subscriber
        self.state = state
        # A falsy value for options is replaced by a fresh dictionary.
        self.options = options if options else {}
        self.subscriptionIdentifier = subscriptionIdentifier


    @staticmethod
    def fromElement(element):
        """
        Create a subscription from a subscription element.

        The C{node}, C{jid}, C{subscription} and C{subid} attributes of the
        element are used; options are not read from the element.

        @param element: The element to read the attributes from.
        @rtype: L{Subscription}
        """
        nodeIdentifier = element.getAttribute('node')
        subscriber = jid.JID(element.getAttribute('jid'))
        state = element.getAttribute('subscription')
        subscriptionIdentifier = element.getAttribute('subid')
        return Subscription(nodeIdentifier, subscriber, state,
                            subscriptionIdentifier=subscriptionIdentifier)


    def toElement(self, defaultUri=None):
        """
        Return the DOM representation of this subscription.

        The C{node} and C{subid} attributes are only set when the respective
        values are not empty.

        @param defaultUri: Namespace URI for the created element.
        @rtype: L{domish.Element}
        """
        result = domish.Element((defaultUri, 'subscription'))

        if self.nodeIdentifier:
            result['node'] = self.nodeIdentifier

        result['jid'] = unicode(self.subscriber)
        result['subscription'] = self.state

        if self.subscriptionIdentifier:
            result['subid'] = self.subscriptionIdentifier

        return result
155 | |
156 | |
157 | |
class Item(domish.Element):
    """
    Publish subscribe item.

    This behaves like an object providing L{domish.IElement}.

    The payload of the item can be passed with the C{payload} keyword
    argument of C{__init__}, or added afterwards using C{addChild} or
    C{addRawXml}.
    """

    def __init__(self, id=None, payload=None):
        """
        @param id: optional item identifier
        @type id: C{unicode}
        @param payload: optional item payload. Either as a domish element, or
            as serialized XML.
        @type payload: object providing L{domish.IElement} or C{unicode}.
        """
        domish.Element.__init__(self, (None, 'item'))

        if id is not None:
            self['id'] = id

        if payload is None:
            return

        # Strings are taken to be serialized XML, anything else is added as
        # a child element.
        if isinstance(payload, basestring):
            self.addRawXml(payload)
        else:
            self.addChild(payload)
185 | |
186 | |
187 | |
class PubSubRequest(generic.Stanza):
    """
    A publish-subscribe request.

    The set of instance variables used depends on the type of request. If
    a variable is not applicable or not passed in the request, its value is
    C{None}.

    Parsing and rendering is driven by C{_parameters}: for each parameter
    name listed for a verb, the method C{_parse_<name>} is called by
    L{parseElement} and C{_render_<name>} by L{send}. This class defines no
    C{_render_*} counterpart for the C{default}, C{configure} and
    C{affiliations} parameters.

    @ivar verb: The type of publish-subscribe request. See C{_requestVerbMap}.
    @type verb: C{str}.

    @ivar affiliations: Affiliations to be modified, as a dictionary of entity
        (L{JID<twisted.words.protocols.jabber.jid.JID>}) to affiliation
        (C{unicode}).
    @type affiliations: C{dict}

    @ivar items: The items to be published, as L{domish.Element}s.
    @type items: C{list}

    @ivar itemIdentifiers: Identifiers of the items to be retrieved or
        retracted. Parsed requests hold them in a C{list}.
    @type itemIdentifiers: C{list}

    @ivar maxItems: Maximum number of items to retrieve.
    @type maxItems: C{int}.

    @ivar nodeIdentifier: Identifier of the node the request is about.
    @type nodeIdentifier: C{unicode}

    @ivar nodeType: The type of node that should be created, or for which the
        configuration is retrieved. C{'leaf'} or C{'collection'}.
    @type nodeType: C{str}

    @ivar options: Configurations options for nodes, subscriptions and publish
        requests.
    @type options: L{data_form.Form}

    @ivar subscriber: The subscribing entity.
    @type subscriber: L{JID<twisted.words.protocols.jabber.jid.JID>}

    @ivar subscriptionIdentifier: Identifier for a specific subscription.
    @type subscriptionIdentifier: C{unicode}

    @ivar subscriptions: Subscriptions to be modified, as a set of
        L{Subscription}.
    @type subscriptions: C{set}
    """

    verb = None

    affiliations = None
    items = None
    itemIdentifiers = None
    maxItems = None
    nodeIdentifier = None
    nodeType = None
    options = None
    subscriber = None
    subscriptionIdentifier = None
    subscriptions = None

    # Map request iq type and subelement name to request verb
    _requestVerbMap = {
        ('set', NS_PUBSUB, 'publish'): 'publish',
        ('set', NS_PUBSUB, 'subscribe'): 'subscribe',
        ('set', NS_PUBSUB, 'unsubscribe'): 'unsubscribe',
        ('get', NS_PUBSUB, 'options'): 'optionsGet',
        ('set', NS_PUBSUB, 'options'): 'optionsSet',
        ('get', NS_PUBSUB, 'subscriptions'): 'subscriptions',
        ('get', NS_PUBSUB, 'affiliations'): 'affiliations',
        ('set', NS_PUBSUB, 'create'): 'create',
        ('get', NS_PUBSUB_OWNER, 'default'): 'default',
        ('get', NS_PUBSUB_OWNER, 'configure'): 'configureGet',
        ('set', NS_PUBSUB_OWNER, 'configure'): 'configureSet',
        ('get', NS_PUBSUB, 'items'): 'items',
        ('set', NS_PUBSUB, 'retract'): 'retract',
        ('set', NS_PUBSUB_OWNER, 'purge'): 'purge',
        ('set', NS_PUBSUB_OWNER, 'delete'): 'delete',
        ('get', NS_PUBSUB_OWNER, 'affiliations'): 'affiliationsGet',
        ('set', NS_PUBSUB_OWNER, 'affiliations'): 'affiliationsSet',
        ('get', NS_PUBSUB_OWNER, 'subscriptions'): 'subscriptionsGet',
        ('set', NS_PUBSUB_OWNER, 'subscriptions'): 'subscriptionsSet',
    }

    # Map request verb to request iq type and subelement name
    _verbRequestMap = dict(((v, k) for k, v in _requestVerbMap.iteritems()))

    # Map request verb to parameter handler names
    _parameters = {
        'publish': ['node', 'items'],
        'subscribe': ['nodeOrEmpty', 'jid', 'optionsWithSubscribe'],
        'unsubscribe': ['nodeOrEmpty', 'jid', 'subidOrNone'],
        'optionsGet': ['nodeOrEmpty', 'jid', 'subidOrNone'],
        'optionsSet': ['nodeOrEmpty', 'jid', 'options', 'subidOrNone'],
        'subscriptions': [],
        'affiliations': [],
        'create': ['nodeOrNone', 'configureOrNone'],
        'default': ['default'],
        'configureGet': ['nodeOrEmpty'],
        'configureSet': ['nodeOrEmpty', 'configure'],
        'items': ['node', 'maxItems', 'itemIdentifiers', 'subidOrNone'],
        'retract': ['node', 'itemIdentifiers'],
        'purge': ['node'],
        'delete': ['node'],
        'affiliationsGet': ['nodeOrEmpty'],
        'affiliationsSet': ['nodeOrEmpty', 'affiliations'],
        'subscriptionsGet': ['nodeOrEmpty'],
        'subscriptionsSet': [],
    }

    def __init__(self, verb=None):
        """
        @param verb: The type of publish-subscribe request, one of the keys
            of C{_parameters}.
        @type verb: C{str}
        """
        self.verb = verb


    def _parse_node(self, verbElement):
        """
        Parse the required node identifier out of the verbElement.

        @raise BadRequest: If the C{node} attribute is missing.
        """
        try:
            self.nodeIdentifier = verbElement["node"]
        except KeyError:
            raise BadRequest('nodeid-required')


    def _render_node(self, verbElement):
        """
        Render the required node identifier on the verbElement.

        @raise Exception: If no node identifier has been set.
        """
        if not self.nodeIdentifier:
            raise Exception("Node identifier is required")

        verbElement['node'] = self.nodeIdentifier


    def _parse_nodeOrEmpty(self, verbElement):
        """
        Parse the node identifier out of the verbElement. May be empty.
        """
        self.nodeIdentifier = verbElement.getAttribute("node", '')


    def _render_nodeOrEmpty(self, verbElement):
        """
        Render the node identifier on the verbElement. May be empty.
        """
        if self.nodeIdentifier:
            verbElement['node'] = self.nodeIdentifier


    def _parse_nodeOrNone(self, verbElement):
        """
        Parse the optional node identifier out of the verbElement.
        """
        self.nodeIdentifier = verbElement.getAttribute("node")


    def _render_nodeOrNone(self, verbElement):
        """
        Render the optional node identifier on the verbElement.
        """
        if self.nodeIdentifier:
            verbElement['node'] = self.nodeIdentifier


    def _parse_items(self, verbElement):
        """
        Parse items out of the verbElement for publish requests.
        """
        self.items = []
        for element in verbElement.elements():
            if element.uri == NS_PUBSUB and element.name == 'item':
                self.items.append(element)


    def _render_items(self, verbElement):
        """
        Render items into the verbElement for publish requests.

        The namespace of each item is set to the pubsub namespace before it
        is added.
        """
        if self.items:
            for item in self.items:
                item.uri = NS_PUBSUB
                verbElement.addChild(item)


    def _parse_jid(self, verbElement):
        """
        Parse subscriber out of the verbElement for un-/subscribe requests.

        @raise BadRequest: If the C{jid} attribute is missing.
        """
        try:
            self.subscriber = jid.internJID(verbElement["jid"])
        except KeyError:
            raise BadRequest('jid-required')


    def _render_jid(self, verbElement):
        """
        Render subscriber into the verbElement for un-/subscribe requests.
        """
        verbElement['jid'] = self.subscriber.full()


    def _parse_default(self, verbElement):
        """
        Parse node type out of a request for the default node configuration.

        The node type defaults to C{'leaf'} when no submitted node
        configuration form is present or the form has no node type field.
        """
        form = data_form.findForm(verbElement, NS_PUBSUB_NODE_CONFIG)
        if form is not None and form.formType == 'submit':
            values = form.getValues()
            self.nodeType = values.get('pubsub#node_type', 'leaf')
        else:
            self.nodeType = 'leaf'


    def _parse_configure(self, verbElement):
        """
        Parse options out of a request for setting the node configuration.

        @raise BadRequest: If the configuration form is missing or is not of
            type C{'submit'} or C{'cancel'}.
        """
        form = data_form.findForm(verbElement, NS_PUBSUB_NODE_CONFIG)
        if form is not None:
            if form.formType in ('submit', 'cancel'):
                self.options = form
            else:
                raise BadRequest(text=u"Unexpected form type '%s'" %
                                      form.formType)
        else:
            raise BadRequest(text="Missing configuration form")


    def _parse_configureOrNone(self, verbElement):
        """
        Parse optional node configuration form in create request.

        The configure element is a sibling of the verb element. When it is
        present without a form, an empty submit form is used instead.

        @raise BadRequest: If the form is not of type C{'submit'}.
        """
        for element in verbElement.parent.elements():
            if element.uri == NS_PUBSUB and element.name == 'configure':
                form = data_form.findForm(element, NS_PUBSUB_NODE_CONFIG)
                if form is not None:
                    if form.formType != 'submit':
                        raise BadRequest(text=u"Unexpected form type '%s'" %
                                              form.formType)
                else:
                    form = data_form.Form('submit',
                                          formNamespace=NS_PUBSUB_NODE_CONFIG)
                self.options = form


    def _render_configureOrNone(self, verbElement):
        """
        Render optional node configuration form in create request.

        The configure element is added to the parent of the verb element.
        """
        if self.options is not None:
            configure = verbElement.parent.addElement('configure')
            configure.addChild(self.options.toElement())


    def _parse_itemIdentifiers(self, verbElement):
        """
        Parse item identifiers out of items and retract requests.

        @raise BadRequest: If an item element lacks the C{id} attribute.
        """
        self.itemIdentifiers = []
        for element in verbElement.elements():
            if element.uri == NS_PUBSUB and element.name == 'item':
                try:
                    self.itemIdentifiers.append(element["id"])
                except KeyError:
                    raise BadRequest()


    def _render_itemIdentifiers(self, verbElement):
        """
        Render item identifiers into items and retract requests.
        """
        if self.itemIdentifiers:
            for itemIdentifier in self.itemIdentifiers:
                item = verbElement.addElement('item')
                item['id'] = itemIdentifier


    def _parse_maxItems(self, verbElement):
        """
        Parse maximum items out of an items request.

        A missing or empty C{max_items} attribute leaves C{maxItems}
        untouched.

        @raise BadRequest: If the value is not a positive integer.
        """
        value = verbElement.getAttribute('max_items')
        if not value:
            return

        try:
            maxItems = int(value)
        except ValueError:
            maxItems = None

        # Zero and negative values parse as integers, but are not positive
        # and must be rejected just like non-numeric values.
        if maxItems is None or maxItems < 1:
            raise BadRequest(text="Field max_items requires a positive " +
                                  "integer value")

        self.maxItems = maxItems


    def _render_maxItems(self, verbElement):
        """
        Render maximum items into an items request.
        """
        if self.maxItems:
            verbElement['max_items'] = unicode(self.maxItems)


    def _parse_subidOrNone(self, verbElement):
        """
        Parse subscription identifier out of a request.
        """
        self.subscriptionIdentifier = verbElement.getAttribute("subid")


    def _render_subidOrNone(self, verbElement):
        """
        Render subscription identifier into a request.
        """
        if self.subscriptionIdentifier:
            verbElement['subid'] = self.subscriptionIdentifier


    def _parse_options(self, verbElement):
        """
        Parse options form out of a subscription options request.

        @raise BadRequest: If the options form is missing or is not of type
            C{'submit'} or C{'cancel'}.
        """
        form = data_form.findForm(verbElement, NS_PUBSUB_SUBSCRIBE_OPTIONS)
        if form is not None:
            if form.formType in ('submit', 'cancel'):
                self.options = form
            else:
                raise BadRequest(text=u"Unexpected form type '%s'" %
                                      form.formType)
        else:
            raise BadRequest(text="Missing options form")


    def _render_options(self, verbElement):
        """
        Render the options form into a subscription options request.
        """
        verbElement.addChild(self.options.toElement())


    def _parse_optionsWithSubscribe(self, verbElement):
        """
        Parse optional subscription options sent along a subscribe request.

        The options element is a sibling of the verb element. When it is
        present without a form, an empty submit form is used instead.

        @raise BadRequest: If the form is not of type C{'submit'}.
        """
        for element in verbElement.parent.elements():
            if element.name == 'options' and element.uri == NS_PUBSUB:
                form = data_form.findForm(element,
                                          NS_PUBSUB_SUBSCRIBE_OPTIONS)
                if form is not None:
                    if form.formType != 'submit':
                        raise BadRequest(text=u"Unexpected form type '%s'" %
                                              form.formType)
                else:
                    form = data_form.Form('submit',
                        formNamespace=NS_PUBSUB_SUBSCRIBE_OPTIONS)
                self.options = form


    def _render_optionsWithSubscribe(self, verbElement):
        """
        Render optional subscription options along a subscribe request.

        The options element is added to the parent of the verb element.
        """
        if self.options is not None:
            optionsElement = verbElement.parent.addElement('options')
            self._render_options(optionsElement)


    def _parse_affiliations(self, verbElement):
        """
        Parse affiliations out of a request for modifying affiliations.

        Entities are stored by their bare JID.

        @raise BadRequest: If an affiliation element lacks the C{jid} or
            C{affiliation} attribute, or an entity occurs more than once.
        """
        self.affiliations = {}
        for element in verbElement.elements():
            if (element.uri == NS_PUBSUB_OWNER and
                element.name == 'affiliation'):
                try:
                    entity = jid.internJID(element['jid']).userhostJID()
                except KeyError:
                    raise BadRequest(text='Missing jid attribute')

                if entity in self.affiliations:
                    raise BadRequest(
                        text='Multiple affiliations for an entity')

                try:
                    affiliation = element['affiliation']
                except KeyError:
                    raise BadRequest(text='Missing affiliation attribute')

                self.affiliations[entity] = affiliation


    def parseElement(self, element):
        """
        Parse the publish-subscribe verb and parameters out of a request.

        @raise NotImplementedError: If the request holds no known verb, or a
            combination of verbs other than subscribe with options.
        """
        generic.Stanza.parseElement(self, element)

        verbs = []
        verbElements = []
        for child in element.pubsub.elements():
            key = (self.stanzaType, child.uri, child.name)
            try:
                verb = self._requestVerbMap[key]
            except KeyError:
                continue

            verbs.append(verb)
            verbElements.append(child)

        if not verbs:
            raise NotImplementedError()

        if len(verbs) > 1:
            # The only supported combination is a subscribe request that
            # carries subscription options along.
            if 'optionsSet' in verbs and 'subscribe' in verbs:
                self.verb = 'subscribe'
                verbElement = verbElements[verbs.index('subscribe')]
            else:
                raise NotImplementedError()
        else:
            self.verb = verbs[0]
            verbElement = verbElements[0]

        for parameter in self._parameters[self.verb]:
            getattr(self, '_parse_%s' % parameter)(verbElement)


    def send(self, xs):
        """
        Send this request to its recipient.

        This renders all of the relevant parameters for this specific
        requests into an L{IQ}, and invoke its C{send} method.
        This returns a deferred that fires upon reception of a response. See
        L{IQ} for details.

        @param xs: The XML stream to send the request on.
        @type xs: L{twisted.words.protocols.jabber.xmlstream.XmlStream}
        @rtype: L{defer.Deferred}.
        @raise NotImplementedError: If C{verb} is not a known request verb.
        """
        try:
            (self.stanzaType,
             childURI,
             childName) = self._verbRequestMap[self.verb]
        except KeyError:
            raise NotImplementedError()

        iq = IQ(xs, self.stanzaType)
        iq.addElement((childURI, 'pubsub'))
        verbElement = iq.pubsub.addElement(childName)

        if self.sender:
            iq['from'] = self.sender.full()
        if self.recipient:
            iq['to'] = self.recipient.full()

        for parameter in self._parameters[self.verb]:
            getattr(self, '_render_%s' % parameter)(verbElement)

        return iq.send()
636 | |
637 | |
638 | |
class PubSubEvent(object):
    """
    A publish subscribe event.

    @param sender: The entity from which the notification was received.
    @type sender: L{jid.JID}
    @param recipient: The entity to which the notification was sent.
    @type recipient: L{jid.JID}
    @param nodeIdentifier: Identifier of the node the event pertains to.
    @type nodeIdentifier: C{unicode}
    @param headers: SHIM headers, see L{wokkel.shim.extractHeaders}.
    @type headers: C{dict}
    """

    def __init__(self, sender, recipient, nodeIdentifier, headers):
        # Addressing of the notification.
        self.sender, self.recipient = sender, recipient
        # What the notification is about.
        self.nodeIdentifier, self.headers = nodeIdentifier, headers
658 | |
659 | |
660 | |
class ItemsEvent(PubSubEvent):
    """
    A publish-subscribe event for new, updated and retracted items.

    @param items: List of received items as domish elements.
    @type items: C{list} of L{domish.Element}
    """

    def __init__(self, sender, recipient, nodeIdentifier, items, headers):
        super(ItemsEvent, self).__init__(sender, recipient, nodeIdentifier,
                                         headers)
        self.items = items
672 | |
673 | |
674 | |
class DeleteEvent(PubSubEvent):
    """
    A publish-subscribe event that signifies the deletion of a node.

    @ivar redirectURI: Optional redirect URI that came with the deletion
        notification. C{None} unless set.
    """

    redirectURI = None
681 | |
682 | |
683 | |
class PurgeEvent(PubSubEvent):
    """
    A publish-subscribe event that signifies the purging of a node.

    This adds nothing to L{PubSubEvent}.
    """
    pass
688 | |
689 | |
690 | |
691 class PubSubClient(XMPPHandler): | |
692 """ | |
693 Publish subscribe client protocol. | |
694 """ | |
695 | |
696 implements(IPubSubClient) | |
697 | |
def connectionInitialized(self):
    """
    Start observing incoming messages that carry a pubsub event.
    """
    eventQuery = '/message/event[@xmlns="%s"]' % NS_PUBSUB_EVENT
    self.xmlstream.addObserver(eventQuery, self._onEvent)
701 | |
702 | |
def _onEvent(self, message):
    """
    Dispatch an incoming event notification to its specific handler.

    Messages of type C{'error'}, messages lacking a C{from} or C{to}
    address, and events without a child in the pubsub event namespace are
    ignored. The handler is looked up as C{_onEvent_<name>}, using the name
    of the last child of the event element that is in the pubsub event
    namespace. The message is marked as handled only if such a handler
    exists.

    @param message: The received message stanza.
    """
    if message.getAttribute('type') == 'error':
        return

    try:
        sender = jid.JID(message["from"])
        recipient = jid.JID(message["to"])
    except KeyError:
        return

    # When several children qualify, the last one wins.
    candidates = [child for child in message.event.elements()
                        if child.uri == NS_PUBSUB_EVENT]
    if not candidates:
        return
    action = candidates[-1]

    handler = getattr(self, "_onEvent_%s" % action.name, None)
    if not handler:
        return

    headers = shim.extractHeaders(message)
    handler(sender, recipient, action, headers)
    message.handled = True
727 | |
728 | |
def _onEvent_items(self, sender, recipient, action, headers):
    """
    Handle an items notification by calling L{itemsReceived}.

    The event holds all C{item} and C{retract} children of the action
    element, in document order.
    """
    node = action["node"]

    received = []
    for child in action.elements():
        if child.name in ('item', 'retract'):
            received.append(child)

    self.itemsReceived(ItemsEvent(sender, recipient, node, received,
                                  headers))
737 | |
738 | |
def _onEvent_delete(self, sender, recipient, action, headers):
    """
    Handle a node deletion notification by calling L{deleteReceived}.

    If the action element has a C{redirect} child, its C{uri} attribute is
    passed along as the redirect URI of the event.
    """
    deletion = DeleteEvent(sender, recipient, action["node"], headers)

    redirect = action.redirect
    if redirect:
        deletion.redirectURI = redirect.getAttribute('uri')

    self.deleteReceived(deletion)
745 | |
746 | |
def _onEvent_purge(self, sender, recipient, action, headers):
    """
    Handle a node purge notification by calling L{purgeReceived}.
    """
    purge = PurgeEvent(sender, recipient, action["node"], headers)
    self.purgeReceived(purge)
751 | |
752 | |
def itemsReceived(self, event):
    """
    Called when an items notification has been received.

    Does nothing by default.

    @param event: The received event.
    @type event: L{ItemsEvent}
    """
    return None
755 | |
756 | |
def deleteReceived(self, event):
    """
    Called when a node deletion notification has been received.

    Does nothing by default.

    @param event: The received event.
    @type event: L{DeleteEvent}
    """
    return None
759 | |
760 | |
def purgeReceived(self, event):
    """
    Called when a node purge notification has been received.

    Does nothing by default.

    @param event: The received event.
    @type event: L{PurgeEvent}
    """
    return None
763 | |
764 | |
def createNode(self, service, nodeIdentifier=None, options=None,
               sender=None):
    """
    Create a publish subscribe node.

    @param service: The publish subscribe service to create the node at.
    @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}
    @param nodeIdentifier: Optional suggestion for the id of the node.
    @type nodeIdentifier: C{unicode}
    @param options: Optional node configuration options.
    @type options: C{dict}
    @param sender: Optional entity to send the request from.
    @type sender: L{JID<twisted.words.protocols.jabber.jid.JID>}

    @return: Deferred that fires with the identifier of the new node. This
        is the identifier reported by the service, or C{nodeIdentifier} if
        the response does not report one.
    @rtype: L{defer.Deferred}
    """
    request = PubSubRequest('create')
    request.recipient = service
    request.nodeIdentifier = nodeIdentifier
    request.sender = sender

    if options:
        form = data_form.Form(formType='submit',
                              formNamespace=NS_PUBSUB_NODE_CONFIG)
        form.makeFields(options)
        request.options = form

    def cb(iq):
        try:
            new_node = iq.pubsub.create["node"]
        except (AttributeError, TypeError, KeyError):
            # The response does not name the node: there is no pubsub
            # element (AttributeError), no create element (TypeError) or
            # no node attribute (KeyError). The suggested node identifier
            # was accepted.
            new_node = nodeIdentifier
        return new_node

    d = request.send(self.xmlstream)
    d.addCallback(cb)
    return d
799 | |
800 | |
def deleteNode(self, service, nodeIdentifier, sender=None):
    """
    Delete a publish subscribe node.

    @param service: The publish subscribe service to delete the node from.
    @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}
    @param nodeIdentifier: The identifier of the node.
    @type nodeIdentifier: C{unicode}
    @param sender: Optional entity to send the request from.
    @type sender: L{JID<twisted.words.protocols.jabber.jid.JID>}

    @return: The deferred returned by sending the request.
    @rtype: L{defer.Deferred}
    """
    deletion = PubSubRequest('delete')
    deletion.sender = sender
    deletion.recipient = service
    deletion.nodeIdentifier = nodeIdentifier

    return deletion.send(self.xmlstream)
815 | |
816 | |
def subscribe(self, service, nodeIdentifier, subscriber,
              options=None, sender=None):
    """
    Subscribe to a publish subscribe node.

    @param service: The publish subscribe service that keeps the node.
    @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}

    @param nodeIdentifier: The identifier of the node.
    @type nodeIdentifier: C{unicode}

    @param subscriber: The entity to subscribe to the node. This entity
        will get notifications of new published items.
    @type subscriber: L{JID<twisted.words.protocols.jabber.jid.JID>}

    @param options: Subscription options.
    @type options: C{dict}

    @param sender: Optional entity to send the request from.
    @type sender: L{JID<twisted.words.protocols.jabber.jid.JID>}

    @return: Deferred that fires with L{Subscription} or errbacks with
        L{SubscriptionPending} or L{SubscriptionUnconfigured}.
    @rtype: L{defer.Deferred}
    """
    request = PubSubRequest('subscribe')
    request.sender = sender
    request.recipient = service
    request.nodeIdentifier = nodeIdentifier
    request.subscriber = subscriber

    if options:
        optionsForm = data_form.Form(
            formType='submit', formNamespace=NS_PUBSUB_SUBSCRIBE_OPTIONS)
        optionsForm.makeFields(options)
        request.options = optionsForm

    def subscribed(iq):
        result = Subscription.fromElement(iq.pubsub.subscription)

        if result.state == 'pending':
            raise SubscriptionPending()
        if result.state == 'unconfigured':
            raise SubscriptionUnconfigured()

        # Any remaining state is taken to be 'subscribed'; other values
        # would be invalid and should have yielded a stanza error.
        return result

    d = request.send(self.xmlstream)
    d.addCallback(subscribed)
    return d
867 | |
868 | |
def unsubscribe(self, service, nodeIdentifier, subscriber,
                subscriptionIdentifier=None, sender=None):
    """
    Unsubscribe from a publish subscribe node.

    @param service: The publish subscribe service that keeps the node.
    @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}

    @param nodeIdentifier: The identifier of the node.
    @type nodeIdentifier: C{unicode}

    @param subscriber: The entity to unsubscribe from the node.
    @type subscriber: L{JID<twisted.words.protocols.jabber.jid.JID>}

    @param subscriptionIdentifier: Optional subscription identifier.
    @type subscriptionIdentifier: C{unicode}

    @param sender: Optional entity to send the request from.
    @type sender: L{JID<twisted.words.protocols.jabber.jid.JID>}

    @return: The deferred returned by sending the request.
    @rtype: L{defer.Deferred}
    """
    cancellation = PubSubRequest('unsubscribe')
    cancellation.sender = sender
    cancellation.recipient = service
    cancellation.nodeIdentifier = nodeIdentifier
    cancellation.subscriber = subscriber
    cancellation.subscriptionIdentifier = subscriptionIdentifier

    return cancellation.send(self.xmlstream)
893 | |
894 | |
def publish(self, service, nodeIdentifier, items=None, sender=None):
    """
    Publish to a publish subscribe node.

    @param service: The publish subscribe service that keeps the node.
    @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}
    @param nodeIdentifier: The identifier of the node.
    @type nodeIdentifier: C{unicode}
    @param items: Optional list of L{Item}s to publish.
    @type items: C{list}
    @param sender: Optional entity to send the request from.
    @type sender: L{JID<twisted.words.protocols.jabber.jid.JID>}

    @return: The deferred returned by sending the request.
    @rtype: L{defer.Deferred}
    """
    publication = PubSubRequest('publish')
    publication.sender = sender
    publication.recipient = service
    publication.nodeIdentifier = nodeIdentifier
    publication.items = items

    return publication.send(self.xmlstream)
912 | |
913 | |
def items(self, service, nodeIdentifier, maxItems=None, itemIdentifiers=None,
          subscriptionIdentifier=None, sender=None):
    """
    Retrieve previously published items from a publish subscribe node.

    @param service: The publish subscribe service that keeps the node.
    @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}

    @param nodeIdentifier: The identifier of the node.
    @type nodeIdentifier: C{unicode}

    @param maxItems: Optional limit on the number of retrieved items.
    @type maxItems: C{int}

    @param itemIdentifiers: Identifiers of the items to be retrieved.
    @type itemIdentifiers: C{set}

    @param subscriptionIdentifier: Optional subscription identifier. In
        case the node has been subscribed to multiple times, this narrows
        the results to the specific subscription.
    @type subscriptionIdentifier: C{unicode}

    @param sender: Optional entity to send the request from.
    @type sender: L{JID<twisted.words.protocols.jabber.jid.JID>}

    @return: Deferred that fires with the list of item elements found in
        the response.
    @rtype: L{defer.Deferred}
    """
    retrieval = PubSubRequest('items')
    retrieval.recipient = service
    retrieval.nodeIdentifier = nodeIdentifier
    if maxItems:
        # Normalize through int, the request holds the value as a string.
        retrieval.maxItems = str(int(maxItems))
    retrieval.subscriptionIdentifier = subscriptionIdentifier
    retrieval.sender = sender
    retrieval.itemIdentifiers = itemIdentifiers

    def extractItems(iq):
        return [child for child in iq.pubsub.items.elements()
                      if child.uri == NS_PUBSUB and child.name == 'item']

    d = retrieval.send(self.xmlstream)
    d.addCallback(extractItems)
    return d
955 | |
def retractItems(self, service, nodeIdentifier, itemIdentifiers, sender=None):
    """
    Send a retract request for items of a publish subscribe node.

    @param service: The publish subscribe service that keeps the node.
    @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}

    @param nodeIdentifier: The identifier of the node.
    @type nodeIdentifier: C{unicode}

    @param itemIdentifiers: Identifiers of the items to be retracted.
    @type itemIdentifiers: C{set}

    @param sender: Optional value recorded as the sender of the request.

    @return: The value returned by L{PubSubRequest.send} for this
        handler's XML stream.
    """
    retractRequest = PubSubRequest('retract')
    for attribute, value in (('recipient', service),
                             ('nodeIdentifier', nodeIdentifier),
                             ('itemIdentifiers', itemIdentifiers),
                             ('sender', sender)):
        setattr(retractRequest, attribute, value)
    return retractRequest.send(self.xmlstream)
973 | |
def getOptions(self, service, nodeIdentifier, subscriber,
               subscriptionIdentifier=None, sender=None):
    """
    Request the options of a subscription.

    @param service: The publish subscribe service that keeps the node.
    @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}

    @param nodeIdentifier: The identifier of the node.
    @type nodeIdentifier: C{unicode}

    @param subscriber: The entity subscribed to the node.
    @type subscriber: L{JID<twisted.words.protocols.jabber.jid.JID>}

    @param subscriptionIdentifier: Optional subscription identifier.
    @type subscriptionIdentifier: C{unicode}

    @param sender: Optional value recorded as the sender of the request.

    @return: The deferred returned by sending the request, with a callback
        attached that extracts and type checks the subscribe options form
        of the response.
    @rtype: L{data_form.Form}
    """
    def extractOptions(iq):
        optionsElement = iq.pubsub.options
        optionsForm = data_form.findForm(optionsElement,
                                         NS_PUBSUB_SUBSCRIBE_OPTIONS)
        optionsForm.typeCheck()
        return optionsForm

    optionsRequest = PubSubRequest('optionsGet')
    optionsRequest.recipient = service
    optionsRequest.nodeIdentifier = nodeIdentifier
    optionsRequest.subscriber = subscriber
    optionsRequest.subscriptionIdentifier = subscriptionIdentifier
    optionsRequest.sender = sender

    pending = optionsRequest.send(self.xmlstream)
    pending.addCallback(extractOptions)
    return pending
1009 | |
1010 | |
def setOptions(self, service, nodeIdentifier, subscriber,
               options, subscriptionIdentifier=None, sender=None):
    """
    Submit new options for a subscription.

    The options are wrapped in a data form of type C{'submit'} in the
    subscribe options namespace before being attached to the request.

    @param service: The publish subscribe service that keeps the node.
    @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}

    @param nodeIdentifier: The identifier of the node.
    @type nodeIdentifier: C{unicode}

    @param subscriber: The entity subscribed to the node.
    @type subscriber: L{JID<twisted.words.protocols.jabber.jid.JID>}

    @param options: Subscription options.
    @type options: C{dict}.

    @param subscriptionIdentifier: Optional subscription identifier.
    @type subscriptionIdentifier: C{unicode}

    @param sender: Optional value recorded as the sender of the request.

    @return: The value returned by L{PubSubRequest.send} for this
        handler's XML stream.
    """
    optionsRequest = PubSubRequest('optionsSet')
    for attribute, value in (
            ('recipient', service),
            ('nodeIdentifier', nodeIdentifier),
            ('subscriber', subscriber),
            ('subscriptionIdentifier', subscriptionIdentifier),
            ('sender', sender)):
        setattr(optionsRequest, attribute, value)

    submitForm = data_form.Form(formType='submit',
                                formNamespace=NS_PUBSUB_SUBSCRIBE_OPTIONS)
    submitForm.makeFields(options)
    optionsRequest.options = submitForm

    return optionsRequest.send(self.xmlstream)
1045 | |
1046 | |
1047 | |
class PubSubService(XMPPHandler, IQHandlerMixin):
    """
    Protocol implementation for a XMPP Publish Subscribe Service.

    The word Service here is used as taken from the Publish Subscribe
    specification. It is the party responsible for keeping nodes and their
    subscriptions, and sending out notifications.

    Methods from the L{IPubSubService} interface that are called as a result
    of an XMPP request may raise exceptions. Alternatively the deferred
    returned by these methods may have their errback called. These are handled
    as follows:

     - If the exception is an instance of L{error.StanzaError}, an error
       response iq is returned.
     - Any other exception is reported using L{log.msg}. An error response
       with the condition C{internal-server-error} is returned.

    The default implementation of said methods raises an L{Unsupported}
    exception and are meant to be overridden.

    @ivar resource: Optional object whose C{locateResource} method is asked
        for the resource that handles each request. When C{None}, requests
        are dispatched to the legacy methods of this class.
    @ivar discoIdentity: Service discovery identity as a dictionary with
        keys C{'category'}, C{'type'} and C{'name'}.
    @ivar pubSubFeatures: List of supported publish-subscribe features for
        service discovery, as C{str}.
    @type pubSubFeatures: C{list} or C{None}
    @ivar hideNodes: When true, L{getDiscoItems} reports no nodes.
    @type hideNodes: C{bool}
    """

    implements(IPubSubService, disco.IDisco)

    # Every iq handed to handleRequest ends up in a single dispatcher.
    iqHandlers = {
            '/*': '_onPubSubRequest',
            }

    # Used when no resource is set: maps a request verb to the name of the
    # method on this class and the request attributes that are passed to it
    # as positional arguments, in order.
    _legacyHandlers = {
        'publish': ('publish', ['sender', 'recipient',
                                'nodeIdentifier', 'items']),
        'subscribe': ('subscribe', ['sender', 'recipient',
                                    'nodeIdentifier', 'subscriber']),
        'unsubscribe': ('unsubscribe', ['sender', 'recipient',
                                        'nodeIdentifier', 'subscriber']),
        'subscriptions': ('subscriptions', ['sender', 'recipient']),
        'affiliations': ('affiliations', ['sender', 'recipient']),
        'create': ('create', ['sender', 'recipient', 'nodeIdentifier']),
        'getConfigurationOptions': ('getConfigurationOptions', []),
        'default': ('getDefaultConfiguration',
                    ['sender', 'recipient', 'nodeType']),
        'configureGet': ('getConfiguration', ['sender', 'recipient',
                                              'nodeIdentifier']),
        'configureSet': ('setConfiguration', ['sender', 'recipient',
                                              'nodeIdentifier', 'options']),
        'items': ('items', ['sender', 'recipient', 'nodeIdentifier',
                            'maxItems', 'itemIdentifiers']),
        'retract': ('retract', ['sender', 'recipient', 'nodeIdentifier',
                                'itemIdentifiers']),
        'purge': ('purge', ['sender', 'recipient', 'nodeIdentifier']),
        'delete': ('delete', ['sender', 'recipient', 'nodeIdentifier']),
        }

    # Class whose fromElement() parses incoming iq stanzas into requests.
    _request_class = PubSubRequest

    hideNodes = False

    def __init__(self, resource=None):
        """
        @param resource: Optional resource locator; see the C{resource}
            instance variable.
        """
        self.resource = resource
        self.discoIdentity = {'category': 'pubsub',
                              'type': 'service',
                              'name': 'Generic Publish-Subscribe Service'}

        self.pubSubFeatures = []


    def connectionMade(self):
        """
        Register L{handleRequest} as observer for pubsub iq requests.
        """
        self.xmlstream.addObserver(PUBSUB_REQUEST, self.handleRequest)


    def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
        """
        Collect service discovery information for the service or a node.

        Without a node identifier, the identity of the service (or located
        resource), the disco items feature and the supported pubsub features
        are reported. In all cases the node information provider is queried
        and, if it yields information, a node identity and optional
        meta-data form are appended.

        Failures of the node information provider are logged with
        L{log.err}; the information gathered up to that point is still
        returned.

        @return: A deferred that fires with the list of collected disco
            information objects.
        @rtype: L{defer.Deferred}
        """
        def toInfo(nodeInfo):
            if not nodeInfo:
                return

            (nodeType, metaData) = nodeInfo['type'], nodeInfo['meta-data']
            info.append(disco.DiscoIdentity('pubsub', nodeType))
            if metaData:
                form = data_form.Form(formType="result",
                                      formNamespace=NS_PUBSUB_META_DATA)
                form.addField(
                    data_form.Field(
                        var='pubsub#node_type',
                        value=nodeType,
                        label='The type of node (collection or leaf)'
                    )
                )

                for metaDatum in metaData:
                    form.addField(data_form.Field.fromDict(metaDatum))

                info.append(form)

            return

        info = []

        request = PubSubRequest('discoInfo')

        if self.resource is not None:
            resource = self.resource.locateResource(request)
            identity = resource.discoIdentity
            features = resource.features
            getInfo = resource.getInfo
        else:
            category = self.discoIdentity['category']
            idType = self.discoIdentity['type']
            name = self.discoIdentity['name']
            identity = disco.DiscoIdentity(category, idType, name)
            features = self.pubSubFeatures
            getInfo = self.getNodeInfo

        if not nodeIdentifier:
            info.append(identity)
            info.append(disco.DiscoFeature(disco.NS_DISCO_ITEMS))
            info.extend([disco.DiscoFeature("%s#%s" % (NS_PUBSUB, feature))
                         for feature in features])

        # getInfo may return a plain value (the legacy getNodeInfo does) or
        # a deferred, hence maybeDeferred.
        d = defer.maybeDeferred(getInfo, requestor, target,
                                nodeIdentifier or '')
        d.addCallback(toInfo)
        d.addErrback(log.err)
        d.addCallback(lambda _: info)
        return d


    def getDiscoItems(self, requestor, target, nodeIdentifier=''):
        """
        Collect the nodes to report as service discovery items.

        @return: A deferred that fires with a list of L{disco.DiscoItem}s,
            one for each node, all addressed at C{target}.
        @rtype: L{defer.Deferred}
        """
        if self.hideNodes:
            d = defer.succeed([])
        elif self.resource is not None:
            request = PubSubRequest('discoInfo')
            resource = self.resource.locateResource(request)
            d = resource.getNodes(requestor, target, nodeIdentifier)
        elif nodeIdentifier:
            # NOTE(review): the legacy getNodes() takes no node identifier,
            # yet it is only consulted when one is given, while a request
            # for the service root yields no items. This looks inverted;
            # confirm the intent before changing it.
            #
            # The default getNodes() returns a plain list rather than a
            # deferred, so the call must go through maybeDeferred for the
            # addCallback below to work.
            d = defer.maybeDeferred(self.getNodes, requestor, target)
        else:
            d = defer.succeed([])

        d.addCallback(lambda nodes: [disco.DiscoItem(target, node)
                                     for node in nodes])
        return d


    def _onPubSubRequest(self, iq):
        """
        Parse a pubsub iq and dispatch it to the handler for its verb.

        The request is optionally passed through a C{_preProcess_<verb>}
        method, then handled by the located resource (or a legacy method of
        this class), and the result is optionally converted by a
        C{_toResponse_<verb>} method.

        @return: A deferred for the response. It fails with L{Unsupported}
            when no handler exists for the verb and fires with C{None} when
            the preprocessor discards the request.
        @rtype: L{defer.Deferred}
        """
        request = self._request_class.fromElement(iq)

        if self.resource is not None:
            resource = self.resource.locateResource(request)
        else:
            resource = self

        # Preprocess the request, knowing the handling resource
        try:
            preProcessor = getattr(self, '_preProcess_%s' % request.verb)
        except AttributeError:
            pass
        else:
            request = preProcessor(resource, request)
            if request is None:
                return defer.succeed(None)

        # Process the request itself,
        if resource is not self:
            try:
                handler = getattr(resource, request.verb)
            except AttributeError:
                text = "Request verb: %s" % request.verb
                return defer.fail(Unsupported('', text))

            d = handler(request)
        else:
            try:
                handlerName, argNames = self._legacyHandlers[request.verb]
            except KeyError:
                text = "Request verb: %s" % request.verb
                return defer.fail(Unsupported('', text))

            handler = getattr(self, handlerName)
            args = [getattr(request, arg) for arg in argNames]
            d = handler(*args)

        # If needed, translate the result into a response
        try:
            cb = getattr(self, '_toResponse_%s' % request.verb)
        except AttributeError:
            pass
        else:
            d.addCallback(cb, resource, request)

        return d


    def _toResponse_subscribe(self, result, resource, request):
        """
        Wrap the element form of a subscription result in a pubsub element.
        """
        response = domish.Element((NS_PUBSUB, "pubsub"))
        response.addChild(result.toElement(NS_PUBSUB))
        return response


    def _toResponse_subscriptions(self, result, resource, request):
        """
        Build a pubsub element listing every subscription in C{result}.
        """
        response = domish.Element((NS_PUBSUB, 'pubsub'))
        subscriptions = response.addElement('subscriptions')
        for subscription in result:
            subscriptions.addChild(subscription.toElement(NS_PUBSUB))
        return response


    def _toResponse_affiliations(self, result, resource, request):
        """
        Build a pubsub element listing affiliations.

        @param result: Iterable of C{(nodeIdentifier, affiliation)} pairs.
        """
        response = domish.Element((NS_PUBSUB, 'pubsub'))
        affiliations = response.addElement('affiliations')

        for nodeIdentifier, affiliation in result:
            item = affiliations.addElement('affiliation')
            item['node'] = nodeIdentifier
            item['affiliation'] = affiliation

        return response


    def _toResponse_create(self, result, resource, request):
        """
        Report the identifier of a created node, if it needs reporting.

        A response payload is only produced when the request carried no
        node identifier or the resulting identifier differs from the
        requested one; otherwise C{None} is returned.
        """
        if not request.nodeIdentifier or request.nodeIdentifier != result:
            response = domish.Element((NS_PUBSUB, 'pubsub'))
            create = response.addElement('create')
            create['node'] = result
            return response
        else:
            return None


    def _formFromConfiguration(self, resource, values):
        """
        Create a node configuration form of type C{'form'} from C{values},
        using the field definitions provided by C{resource}.
        """
        fieldDefs = resource.getConfigurationOptions()
        form = data_form.Form(formType="form",
                              formNamespace=NS_PUBSUB_NODE_CONFIG)
        form.makeFields(values, fieldDefs)
        return form


    def _checkConfiguration(self, resource, form):
        """
        Type check C{form} against the field definitions of C{resource},
        with C{filterUnknown} enabled.
        """
        fieldDefs = resource.getConfigurationOptions()
        form.typeCheck(fieldDefs, filterUnknown=True)


    def _preProcess_create(self, resource, request):
        """
        Check the configuration form of a create request, if it has one.
        """
        if request.options:
            self._checkConfiguration(resource, request.options)
        return request


    def _preProcess_default(self, resource, request):
        """
        Reject default configuration requests for unknown node types.

        @raise error.StanzaError: With condition C{not-acceptable} when the
            node type is neither C{'leaf'} nor C{'collection'}.
        """
        if request.nodeType not in ('leaf', 'collection'):
            raise error.StanzaError('not-acceptable')
        else:
            return request


    def _toResponse_default(self, options, resource, request):
        """
        Build the owner pubsub element holding the default configuration
        form.
        """
        response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
        default = response.addElement("default")
        form = self._formFromConfiguration(resource, options)
        default.addChild(form.toElement())
        return response


    def _toResponse_configureGet(self, options, resource, request):
        """
        Build the owner pubsub element holding a node's configuration form.

        The C{node} attribute is only set when the request named a node.
        """
        response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
        configure = response.addElement("configure")
        form = self._formFromConfiguration(resource, options)
        configure.addChild(form.toElement())

        if request.nodeIdentifier:
            configure["node"] = request.nodeIdentifier

        return response


    def _preProcess_configureSet(self, resource, request):
        """
        Drop cancelled configuration requests and check all others.

        @return: C{None} when the submitted form has type C{'cancel'}, so
            that the request is not processed further; the request itself
            otherwise.
        """
        if request.options.formType == 'cancel':
            return None
        else:
            self._checkConfiguration(resource, request.options)
            return request


    def _toResponse_items(self, result, resource, request):
        """
        Build the pubsub element holding the retrieved items.

        Elements named C{item} get their namespace set to the pubsub
        namespace; the elements in C{result} are modified in place.
        """
        response = domish.Element((NS_PUBSUB, 'pubsub'))
        items = response.addElement('items')
        items["node"] = request.nodeIdentifier

        for item in result:
            if item.name == 'item':
                item.uri = NS_PUBSUB
                items.addChild(item)

        return response


    def _createNotification(self, eventType, service, nodeIdentifier,
                                  subscriber, subscriptions=None):
        """
        Create a notification message with an empty event of the given type.

        For every subscription whose node identifier differs from
        C{nodeIdentifier}, a C{Collection} header naming the subscription's
        node is added to the message.

        @param eventType: Name of the child element of the event element.
        @param service: Entity used as the sender of the message.
        @param nodeIdentifier: Value of the C{node} attribute of the event's
            child element.
        @param subscriber: Entity the message is addressed to.
        @param subscriptions: Optional iterable of subscriptions that led to
            this notification.

        @return: The message element.
        """
        headers = []

        if subscriptions:
            for subscription in subscriptions:
                if nodeIdentifier != subscription.nodeIdentifier:
                    headers.append(('Collection', subscription.nodeIdentifier))

        message = domish.Element((None, "message"))
        message["from"] = service.full()
        message["to"] = subscriber.full()
        event = message.addElement((NS_PUBSUB_EVENT, "event"))

        element = event.addElement(eventType)
        element["node"] = nodeIdentifier

        if headers:
            message.addChild(shim.Headers(headers))

        return message


    def _toResponse_affiliationsGet(self, result, resource, request):
        """
        Build the owner pubsub element listing the affiliations of a node.

        @param result: Mapping of entity to affiliation.
        @type result: C{dict}
        """
        response = domish.Element((NS_PUBSUB_OWNER, 'pubsub'))
        affiliations = response.addElement('affiliations')

        if request.nodeIdentifier:
            affiliations['node'] = request.nodeIdentifier

        for entity, affiliation in result.iteritems():
            item = affiliations.addElement('affiliation')
            item['jid'] = entity.full()
            item['affiliation'] = affiliation

        return response


    # public methods

    def notifyPublish(self, service, nodeIdentifier, notifications):
        """
        Send out item notifications.

        @param notifications: Iterable of C{(subscriber, subscriptions,
            items)} tuples. The namespace of each item is set to the pubsub
            event namespace in place before it is added to the message.
        """
        for subscriber, subscriptions, items in notifications:
            message = self._createNotification('items', service,
                                               nodeIdentifier, subscriber,
                                               subscriptions)
            for item in items:
                item.uri = NS_PUBSUB_EVENT
                message.event.items.addChild(item)
            self.send(message)


    def notifyDelete(self, service, nodeIdentifier, subscribers,
                           redirectURI=None):
        """
        Send out node deletion notifications to each of C{subscribers}.

        @param redirectURI: Optional URI that is put in a C{redirect} child
            of the C{delete} element.
        """
        for subscriber in subscribers:
            message = self._createNotification('delete', service,
                                               nodeIdentifier,
                                               subscriber)
            if redirectURI:
                redirect = message.event.delete.addElement('redirect')
                redirect['uri'] = redirectURI
            self.send(message)


    # Legacy handlers. Unless noted otherwise the defaults below raise
    # Unsupported with the name of the missing feature and are meant to be
    # overridden.

    def getNodeInfo(self, requestor, service, nodeIdentifier):
        """
        Default node information provider: no information (C{None}).
        """
        return None


    def getNodes(self, requestor, service):
        """
        Default node list provider: an empty list.
        """
        return []


    def publish(self, requestor, service, nodeIdentifier, items):
        raise Unsupported('publish')


    def subscribe(self, requestor, service, nodeIdentifier, subscriber):
        raise Unsupported('subscribe')


    def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
        # The feature name is 'subscribe' here as well; presumably because
        # the specification has no separate unsubscribe feature -- confirm
        # against XEP-0060 before changing it.
        raise Unsupported('subscribe')


    def subscriptions(self, requestor, service):
        raise Unsupported('retrieve-subscriptions')


    def affiliations(self, requestor, service):
        raise Unsupported('retrieve-affiliations')


    def create(self, requestor, service, nodeIdentifier):
        raise Unsupported('create-nodes')


    def getConfigurationOptions(self):
        """
        Default node configuration field definitions: none.
        """
        return {}


    def getDefaultConfiguration(self, requestor, service, nodeType):
        raise Unsupported('retrieve-default')


    def getConfiguration(self, requestor, service, nodeIdentifier):
        raise Unsupported('config-node')


    def setConfiguration(self, requestor, service, nodeIdentifier, options):
        raise Unsupported('config-node')


    def items(self, requestor, service, nodeIdentifier, maxItems,
                    itemIdentifiers):
        raise Unsupported('retrieve-items')


    def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
        raise Unsupported('retract-items')


    def purge(self, requestor, service, nodeIdentifier):
        raise Unsupported('purge-nodes')


    def delete(self, requestor, service, nodeIdentifier):
        raise Unsupported('delete-nodes')
1474 | |
1475 | |
1476 | |
class PubSubResource(object):
    """
    Base publish subscribe resource on which every operation is unsupported.

    Each request handler returns a deferred that has already failed with
    L{Unsupported}, naming the missing feature. The discovery related
    methods return deferreds that have already fired with an empty result.
    """

    implements(IPubSubResource)

    features = []
    discoIdentity = disco.DiscoIdentity('pubsub',
                                        'service',
                                        'Publish-Subscribe Service')


    def _unsupported(self, feature):
        """
        Return a deferred that has failed with L{Unsupported} for C{feature}.
        """
        return defer.fail(Unsupported(feature))


    def locateResource(self, request):
        """
        Return the resource handling C{request}: this resource itself.
        """
        return self


    def getInfo(self, requestor, service, nodeIdentifier):
        """
        Return a deferred that has fired with C{None}.
        """
        return defer.succeed(None)


    def getNodes(self, requestor, service, nodeIdentifier):
        """
        Return a deferred that has fired with an empty list.
        """
        return defer.succeed([])


    def getConfigurationOptions(self):
        """
        Return the node configuration field definitions: none.
        """
        return {}


    def publish(self, request):
        return self._unsupported('publish')


    def subscribe(self, request):
        return self._unsupported('subscribe')


    def unsubscribe(self, request):
        # Reported under the 'subscribe' feature name, like subscribe().
        return self._unsupported('subscribe')


    def subscriptions(self, request):
        return self._unsupported('retrieve-subscriptions')


    def affiliations(self, request):
        return self._unsupported('retrieve-affiliations')


    def create(self, request):
        return self._unsupported('create-nodes')


    def default(self, request):
        return self._unsupported('retrieve-default')


    def configureGet(self, request):
        return self._unsupported('config-node')


    def configureSet(self, request):
        return self._unsupported('config-node')


    def items(self, request):
        return self._unsupported('retrieve-items')


    def retract(self, request):
        return self._unsupported('retract-items')


    def purge(self, request):
        return self._unsupported('purge-nodes')


    def delete(self, request):
        return self._unsupported('delete-nodes')


    def affiliationsGet(self, request):
        return self._unsupported('modify-affiliations')


    def affiliationsSet(self, request):
        return self._unsupported('modify-affiliations')


    def subscriptionsGet(self, request):
        return self._unsupported('manage-subscriptions')


    def subscriptionsSet(self, request):
        return self._unsupported('manage-subscriptions')