Mercurial > libervia-pubsub
comparison idavoll/backend.py @ 206:274a45d2a5ab
Implement root collection that includes all leaf nodes.
author | Ralph Meijer <ralphm@ik.nu> |
---|---|
date | Mon, 04 Aug 2008 13:47:10 +0000 |
parents | e6b710bf2b24 |
children | 7f3ffb7a1a9e |
comparison
legend: equal, deleted, inserted, replaced
205:e6b710bf2b24 | 206:274a45d2a5ab |
---|---|
16 import uuid | 16 import uuid |
17 | 17 |
18 from zope.interface import implements | 18 from zope.interface import implements |
19 | 19 |
20 from twisted.application import service | 20 from twisted.application import service |
21 from twisted.python import components | 21 from twisted.python import components, log |
22 from twisted.internet import defer, reactor | 22 from twisted.internet import defer, reactor |
23 from twisted.words.protocols.jabber.error import StanzaError | 23 from twisted.words.protocols.jabber.error import StanzaError |
24 from twisted.words.xish import domish, utility | 24 from twisted.words.xish import domish, utility |
25 | 25 |
26 from wokkel.iwokkel import IDisco, IPubSubService | 26 from wokkel.iwokkel import IDisco, IPubSubService |
27 from wokkel.pubsub import PubSubService, PubSubError | 27 from wokkel.pubsub import PubSubService, PubSubError |
28 | 28 |
29 from idavoll import error, iidavoll | 29 from idavoll import error, iidavoll |
30 from idavoll.iidavoll import IBackendService | 30 from idavoll.iidavoll import IBackendService, ILeafNode |
31 | 31 |
32 def _getAffiliation(node, entity): | 32 def _getAffiliation(node, entity): |
33 d = node.getAffiliation(entity) | 33 d = node.getAffiliation(entity) |
34 d.addCallback(lambda affiliation: (node, affiliation)) | 34 d.addCallback(lambda affiliation: (node, affiliation)) |
35 return d | 35 return d |
38 | 38 |
39 class BackendService(service.Service, utility.EventDispatcher): | 39 class BackendService(service.Service, utility.EventDispatcher): |
40 """ | 40 """ |
41 Generic publish-subscribe backend service. | 41 Generic publish-subscribe backend service. |
42 | 42 |
43 @cvar options: Node configuration form as a mapping from the field | 43 @cvar nodeOptions: Node configuration form as a mapping from the field |
44 name to a dictionary that holds the field's type, | 44 name to a dictionary that holds the field's type, label |
45 label and possible options to choose from. | 45 and possible options to choose from. |
46 @type options: C{dict}. | 46 @type nodeOptions: C{dict}. |
47 @cvar defaultConfig: The default node configuration. | 47 @cvar defaultConfig: The default node configuration. |
48 """ | 48 """ |
49 | 49 |
50 implements(iidavoll.IBackendService) | 50 implements(iidavoll.IBackendService) |
51 | 51 |
52 options = {"pubsub#persist_items": | 52 nodeOptions = { |
53 {"type": "boolean", | 53 "pubsub#persist_items": |
54 "label": "Persist items to storage"}, | 54 {"type": "boolean", |
55 "pubsub#deliver_payloads": | 55 "label": "Persist items to storage"}, |
56 {"type": "boolean", | 56 "pubsub#deliver_payloads": |
57 "label": "Deliver payloads with event notifications"}, | 57 {"type": "boolean", |
58 "pubsub#send_last_published_item": | 58 "label": "Deliver payloads with event notifications"}, |
59 {"type": "list-single", | 59 "pubsub#send_last_published_item": |
60 "label": "When to send the last published item", | 60 {"type": "list-single", |
61 "options": { | 61 "label": "When to send the last published item", |
62 "never": "Never", | 62 "options": { |
63 "on_sub": "When a new subscription is processed", | 63 "never": "Never", |
64 } | 64 "on_sub": "When a new subscription is processed"} |
65 }, | 65 }, |
66 } | 66 } |
67 | 67 |
68 defaultConfig = {"pubsub#persist_items": True, | 68 subscriptionOptions = { |
69 "pubsub#deliver_payloads": True, | 69 "pubsub#subscription_type": |
70 "pubsub#send_last_published_item": 'on_sub', | 70 {"type": "list-single", |
71 } | 71 "options": { |
72 "items": "Receive notification of new items only", | |
73 "nodes": "Receive notification of new nodes only"} | |
74 }, | |
75 "pubsub#subscription_depth": | |
76 {"type": "list-single", | |
77 "options": { | |
78 "1": "Receive notification from direct child nodes only", | |
79 "all": "Receive notification from all descendent nodes"} | |
80 }, | |
81 } | |
72 | 82 |
73 def __init__(self, storage): | 83 def __init__(self, storage): |
74 utility.EventDispatcher.__init__(self) | 84 utility.EventDispatcher.__init__(self) |
75 self.storage = storage | 85 self.storage = storage |
76 self._callbackList = [] | 86 self._callbackList = [] |
106 | 116 |
107 | 117 |
108 def _makeMetaData(self, metaData): | 118 def _makeMetaData(self, metaData): |
109 options = [] | 119 options = [] |
110 for key, value in metaData.iteritems(): | 120 for key, value in metaData.iteritems(): |
111 if self.options.has_key(key): | 121 if key in self.nodeOptions: |
112 option = {"var": key} | 122 option = {"var": key} |
113 option.update(self.options[key]) | 123 option.update(self.nodeOptions[key]) |
114 option["value"] = value | 124 option["value"] = value |
115 options.append(option) | 125 options.append(option) |
116 | 126 |
117 return options | 127 return options |
118 | 128 |
134 d.addCallback(self._doPublish, items, requestor) | 144 d.addCallback(self._doPublish, items, requestor) |
135 return d | 145 return d |
136 | 146 |
137 | 147 |
138 def _doPublish(self, node, items, requestor): | 148 def _doPublish(self, node, items, requestor): |
149 if node.nodeType == 'collection': | |
150 raise error.NoPublishing() | |
151 | |
139 configuration = node.getConfiguration() | 152 configuration = node.getConfiguration() |
140 persistItems = configuration["pubsub#persist_items"] | 153 persistItems = configuration["pubsub#persist_items"] |
141 deliverPayloads = configuration["pubsub#deliver_payloads"] | 154 deliverPayloads = configuration["pubsub#deliver_payloads"] |
142 | 155 |
143 if items and not persistItems and not deliverPayloads: | 156 if items and not persistItems and not deliverPayloads: |
145 elif not items and (persistItems or deliverPayloads): | 158 elif not items and (persistItems or deliverPayloads): |
146 raise error.ItemRequired() | 159 raise error.ItemRequired() |
147 | 160 |
148 if persistItems or deliverPayloads: | 161 if persistItems or deliverPayloads: |
149 for item in items: | 162 for item in items: |
163 item.uri = None | |
164 item.defaultUri = None | |
150 if not item.getAttribute("id"): | 165 if not item.getAttribute("id"): |
151 item["id"] = str(uuid.uuid4()) | 166 item["id"] = str(uuid.uuid4()) |
152 | 167 |
153 if persistItems: | 168 if persistItems: |
154 d = node.storeItems(items, requestor) | 169 d = node.storeItems(items, requestor) |
167 | 182 |
168 self.dispatch({'items': items, 'nodeIdentifier': nodeIdentifier}, | 183 self.dispatch({'items': items, 'nodeIdentifier': nodeIdentifier}, |
169 '//event/pubsub/notify') | 184 '//event/pubsub/notify') |
170 | 185 |
171 | 186 |
172 def getNotificationList(self, nodeIdentifier, items): | 187 def getNotifications(self, nodeIdentifier, items): |
173 d = self.storage.getNode(nodeIdentifier) | 188 |
174 d.addCallback(lambda node: node.getSubscribers()) | 189 def toNotifications(subscriptions, nodeIdentifier, items): |
175 d.addCallback(self._magicFilter, nodeIdentifier, items) | 190 subsBySubscriber = {} |
176 return d | 191 for subscription in subscriptions: |
177 | 192 if subscription.options.get('pubsub#subscription_type', |
178 | 193 'items') == 'items': |
179 def _magicFilter(self, subscribers, nodeIdentifier, items): | 194 subs = subsBySubscriber.setdefault(subscription.subscriber, |
180 list = [] | 195 set()) |
181 for subscriber in subscribers: | 196 subs.add(subscription) |
182 list.append((subscriber, items)) | 197 |
183 return list | 198 notifications = [(subscriber, subscriptions, items) |
199 for subscriber, subscriptions | |
200 in subsBySubscriber.iteritems()] | |
201 | |
202 return notifications | |
203 | |
204 def rootNotFound(failure): | |
205 failure.trap(error.NodeNotFound) | |
206 return [] | |
207 | |
208 d1 = self.storage.getNode(nodeIdentifier) | |
209 d1.addCallback(lambda node: node.getSubscriptions('subscribed')) | |
210 d2 = self.storage.getNode('') | |
211 d2.addCallback(lambda node: node.getSubscriptions('subscribed')) | |
212 d2.addErrback(rootNotFound) | |
213 d = defer.gatherResults([d1, d2]) | |
214 d.addCallback(lambda result: result[0] + result[1]) | |
215 d.addCallback(toNotifications, nodeIdentifier, items) | |
216 return d | |
184 | 217 |
185 | 218 |
186 def registerNotifier(self, observerfn, *args, **kwargs): | 219 def registerNotifier(self, observerfn, *args, **kwargs): |
187 self.addObserver('//event/pubsub/notify', observerfn, *args, **kwargs) | 220 self.addObserver('//event/pubsub/notify', observerfn, *args, **kwargs) |
188 | 221 |
202 node, affiliation = result | 235 node, affiliation = result |
203 | 236 |
204 if affiliation == 'outcast': | 237 if affiliation == 'outcast': |
205 raise error.Forbidden() | 238 raise error.Forbidden() |
206 | 239 |
207 d = node.addSubscription(subscriber, 'subscribed') | 240 def trapExists(failure): |
208 d.addCallback(lambda _: self._sendLastPublished(node, subscriber)) | 241 failure.trap(error.SubscriptionExists) |
209 d.addCallback(lambda _: 'subscribed') | 242 return False |
210 d.addErrback(self._getSubscription, node, subscriber) | 243 |
211 d.addCallback(self._returnSubscription, node.nodeIdentifier) | 244 def cb(sendLast): |
212 return d | 245 d = node.getSubscription(subscriber) |
213 | 246 if sendLast: |
214 | 247 d.addCallback(self._sendLastPublished, node) |
215 def _getSubscription(self, failure, node, subscriber): | 248 return d |
216 failure.trap(error.SubscriptionExists) | 249 |
217 return node.getSubscription(subscriber) | 250 d = node.addSubscription(subscriber, 'subscribed', {}) |
218 | 251 d.addCallbacks(lambda _: True, trapExists) |
219 | 252 d.addCallback(cb) |
220 def _returnSubscription(self, result, nodeIdentifier): | 253 return d |
221 return nodeIdentifier, result | 254 |
222 | 255 |
223 | 256 def _sendLastPublished(self, subscription, node): |
224 def _sendLastPublished(self, node, subscriber): | |
225 | 257 |
226 def notifyItem(items): | 258 def notifyItem(items): |
227 if not items: | 259 if items: |
228 return | 260 reactor.callLater(0, self.dispatch, |
229 | 261 {'items': items, |
230 reactor.callLater(0, self.dispatch, | 262 'nodeIdentifier': node.nodeIdentifier, |
231 {'items': items, | 263 'subscription': subscription}, |
232 'nodeIdentifier': node.nodeIdentifier, | 264 '//event/pubsub/notify') |
233 'subscriber': subscriber}, | |
234 '//event/pubsub/notify') | |
235 | 265 |
236 config = node.getConfiguration() | 266 config = node.getConfiguration() |
237 if config.get("pubsub#send_last_published_item", 'never') != 'on_sub': | 267 sendLastPublished = config.get('pubsub#send_last_published_item', |
238 return | 268 'never') |
239 | 269 if sendLastPublished == 'on_sub' and node.nodeType == 'leaf': |
240 d = self.getItems(node.nodeIdentifier, subscriber.userhostJID(), 1) | 270 entity = subscription.subscriber.userhostJID() |
241 d.addCallback(notifyItem) | 271 d = self.getItems(node.nodeIdentifier, entity, 1) |
272 d.addCallback(notifyItem) | |
273 d.addErrback(log.err) | |
274 | |
275 return subscription | |
242 | 276 |
243 | 277 |
244 def unsubscribe(self, nodeIdentifier, subscriber, requestor): | 278 def unsubscribe(self, nodeIdentifier, subscriber, requestor): |
245 if subscriber.userhostJID() != requestor: | 279 if subscriber.userhostJID() != requestor: |
246 return defer.fail(error.Forbidden()) | 280 return defer.fail(error.Forbidden()) |
259 | 293 |
260 | 294 |
261 def createNode(self, nodeIdentifier, requestor): | 295 def createNode(self, nodeIdentifier, requestor): |
262 if not nodeIdentifier: | 296 if not nodeIdentifier: |
263 nodeIdentifier = 'generic/%s' % uuid.uuid4() | 297 nodeIdentifier = 'generic/%s' % uuid.uuid4() |
264 d = self.storage.createNode(nodeIdentifier, requestor) | 298 |
299 nodeType = 'leaf' | |
300 config = self.storage.getDefaultConfiguration(nodeType) | |
301 config['pubsub#node_type'] = nodeType | |
302 | |
303 d = self.storage.createNode(nodeIdentifier, requestor, config) | |
265 d.addCallback(lambda _: nodeIdentifier) | 304 d.addCallback(lambda _: nodeIdentifier) |
266 return d | 305 return d |
267 | 306 |
268 | 307 |
269 def getDefaultConfiguration(self): | 308 def getDefaultConfiguration(self, nodeType): |
270 d = defer.succeed(self.defaultConfig) | 309 d = defer.succeed(self.storage.getDefaultConfiguration(nodeType)) |
271 return d | 310 return d |
272 | 311 |
273 | 312 |
274 def getNodeConfiguration(self, nodeIdentifier): | 313 def getNodeConfiguration(self, nodeIdentifier): |
275 if not nodeIdentifier: | 314 if not nodeIdentifier: |
276 return defer.fail(error.NoRootNode()) | 315 return defer.fail(error.NoRootNode()) |
277 | 316 |
278 d = self.storage.getNode(nodeIdentifier) | 317 d = self.storage.getNode(nodeIdentifier) |
279 d.addCallback(lambda node: node.getConfiguration()) | 318 d.addCallback(lambda node: node.getConfiguration()) |
319 | |
280 return d | 320 return d |
281 | 321 |
282 | 322 |
283 def setNodeConfiguration(self, nodeIdentifier, options, requestor): | 323 def setNodeConfiguration(self, nodeIdentifier, options, requestor): |
284 if not nodeIdentifier: | 324 if not nodeIdentifier: |
311 return d | 351 return d |
312 | 352 |
313 | 353 |
314 def _doGetItems(self, result, maxItems, itemIdentifiers): | 354 def _doGetItems(self, result, maxItems, itemIdentifiers): |
315 node, affiliation = result | 355 node, affiliation = result |
356 | |
357 if not ILeafNode.providedBy(node): | |
358 return [] | |
316 | 359 |
317 if affiliation == 'outcast': | 360 if affiliation == 'outcast': |
318 raise error.Forbidden() | 361 raise error.Forbidden() |
319 | 362 |
320 if itemIdentifiers: | 363 if itemIdentifiers: |
380 def registerPreDelete(self, preDeleteFn): | 423 def registerPreDelete(self, preDeleteFn): |
381 self._callbackList.append(preDeleteFn) | 424 self._callbackList.append(preDeleteFn) |
382 | 425 |
383 | 426 |
384 def getSubscribers(self, nodeIdentifier): | 427 def getSubscribers(self, nodeIdentifier): |
385 d = self.storage.getNode(nodeIdentifier) | 428 def cb(subscriptions): |
386 d.addCallback(lambda node: node.getSubscribers()) | 429 return [subscription.subscriber for subscription in subscriptions] |
430 | |
431 d = self.storage.getNode(nodeIdentifier) | |
432 d.addCallback(lambda node: node.getSubscriptions('subscribed')) | |
433 d.addCallback(cb) | |
387 return d | 434 return d |
388 | 435 |
389 | 436 |
390 def deleteNode(self, nodeIdentifier, requestor): | 437 def deleteNode(self, nodeIdentifier, requestor): |
391 d = self.storage.getNode(nodeIdentifier) | 438 d = self.storage.getNode(nodeIdentifier) |
445 error.InvalidConfigurationValue: ('not-acceptable', None, None), | 492 error.InvalidConfigurationValue: ('not-acceptable', None, None), |
446 error.NodeNotPersistent: ('feature-not-implemented', | 493 error.NodeNotPersistent: ('feature-not-implemented', |
447 'unsupported', | 494 'unsupported', |
448 'persistent-node'), | 495 'persistent-node'), |
449 error.NoRootNode: ('bad-request', None, None), | 496 error.NoRootNode: ('bad-request', None, None), |
497 error.NoCollections: ('feature-not-implemented', | |
498 'unsupported', | |
499 'collections'), | |
500 error.NoPublishing: ('feature-not-implemented', | |
501 'unsupported', | |
502 'publish'), | |
450 } | 503 } |
451 | 504 |
452 def __init__(self, backend): | 505 def __init__(self, backend): |
453 PubSubService.__init__(self) | 506 PubSubService.__init__(self) |
454 | 507 |
495 | 548 |
496 | 549 |
497 def _notify(self, data): | 550 def _notify(self, data): |
498 items = data['items'] | 551 items = data['items'] |
499 nodeIdentifier = data['nodeIdentifier'] | 552 nodeIdentifier = data['nodeIdentifier'] |
500 if 'subscriber' not in data: | 553 if 'subscription' not in data: |
501 d = self.backend.getNotificationList(nodeIdentifier, items) | 554 d = self.backend.getNotifications(nodeIdentifier, items) |
502 else: | 555 else: |
503 d = defer.succeed([(data['subscriber'], items)]) | 556 subscription = data['subscription'] |
557 d = defer.succeed([(subscription.subscriber, [subscription], | |
558 items)]) | |
504 d.addCallback(lambda notifications: self.notifyPublish(self.serviceJID, | 559 d.addCallback(lambda notifications: self.notifyPublish(self.serviceJID, |
505 nodeIdentifier, | 560 nodeIdentifier, |
506 notifications)) | 561 notifications)) |
507 | 562 |
508 | 563 |
537 | 592 |
538 def saveMetaData(result): | 593 def saveMetaData(result): |
539 info['meta-data'] = result | 594 info['meta-data'] = result |
540 return info | 595 return info |
541 | 596 |
597 def trapNotFound(failure): | |
598 failure.trap(error.NodeNotFound) | |
599 return info | |
600 | |
542 d = defer.succeed(nodeIdentifier) | 601 d = defer.succeed(nodeIdentifier) |
543 d.addCallback(self.backend.getNodeType) | 602 d.addCallback(self.backend.getNodeType) |
544 d.addCallback(saveType) | 603 d.addCallback(saveType) |
545 d.addCallback(self.backend.getNodeMetaData) | 604 d.addCallback(self.backend.getNodeMetaData) |
546 d.addCallback(saveMetaData) | 605 d.addCallback(saveMetaData) |
606 d.addErrback(trapNotFound) | |
547 d.addErrback(self._mapErrors) | 607 d.addErrback(self._mapErrors) |
548 return d | 608 return d |
549 | 609 |
550 | 610 |
551 def getNodes(self, requestor, service): | 611 def getNodes(self, requestor, service): |
584 d = self.backend.createNode(nodeIdentifier, requestor) | 644 d = self.backend.createNode(nodeIdentifier, requestor) |
585 return d.addErrback(self._mapErrors) | 645 return d.addErrback(self._mapErrors) |
586 | 646 |
587 | 647 |
588 def getConfigurationOptions(self): | 648 def getConfigurationOptions(self): |
589 return self.backend.options | 649 return self.backend.nodeOptions |
590 | 650 |
591 | 651 |
592 def getDefaultConfiguration(self, requestor, service): | 652 def getDefaultConfiguration(self, requestor, service, nodeType): |
593 d = self.backend.getDefaultConfiguration() | 653 d = self.backend.getDefaultConfiguration(nodeType) |
594 return d.addErrback(self._mapErrors) | 654 return d.addErrback(self._mapErrors) |
595 | 655 |
596 | 656 |
597 def getConfiguration(self, requestor, service, nodeIdentifier): | 657 def getConfiguration(self, requestor, service, nodeIdentifier): |
598 d = self.backend.getNodeConfiguration(nodeIdentifier) | 658 d = self.backend.getNodeConfiguration(nodeIdentifier) |