comparison idavoll/backend.py @ 206:274a45d2a5ab

Implement root collection that includes all leaf nodes.
author Ralph Meijer <ralphm@ik.nu>
date Mon, 04 Aug 2008 13:47:10 +0000
parents e6b710bf2b24
children 7f3ffb7a1a9e
comparison
equal deleted inserted replaced
205:e6b710bf2b24 206:274a45d2a5ab
16 import uuid 16 import uuid
17 17
18 from zope.interface import implements 18 from zope.interface import implements
19 19
20 from twisted.application import service 20 from twisted.application import service
21 from twisted.python import components 21 from twisted.python import components, log
22 from twisted.internet import defer, reactor 22 from twisted.internet import defer, reactor
23 from twisted.words.protocols.jabber.error import StanzaError 23 from twisted.words.protocols.jabber.error import StanzaError
24 from twisted.words.xish import domish, utility 24 from twisted.words.xish import domish, utility
25 25
26 from wokkel.iwokkel import IDisco, IPubSubService 26 from wokkel.iwokkel import IDisco, IPubSubService
27 from wokkel.pubsub import PubSubService, PubSubError 27 from wokkel.pubsub import PubSubService, PubSubError
28 28
29 from idavoll import error, iidavoll 29 from idavoll import error, iidavoll
30 from idavoll.iidavoll import IBackendService 30 from idavoll.iidavoll import IBackendService, ILeafNode
31 31
32 def _getAffiliation(node, entity): 32 def _getAffiliation(node, entity):
33 d = node.getAffiliation(entity) 33 d = node.getAffiliation(entity)
34 d.addCallback(lambda affiliation: (node, affiliation)) 34 d.addCallback(lambda affiliation: (node, affiliation))
35 return d 35 return d
38 38
39 class BackendService(service.Service, utility.EventDispatcher): 39 class BackendService(service.Service, utility.EventDispatcher):
40 """ 40 """
41 Generic publish-subscribe backend service. 41 Generic publish-subscribe backend service.
42 42
43 @cvar options: Node configuration form as a mapping from the field 43 @cvar nodeOptions: Node configuration form as a mapping from the field
44 name to a dictionary that holds the field's type, 44 name to a dictionary that holds the field's type, label
45 label and possible options to choose from. 45 and possible options to choose from.
46 @type options: C{dict}. 46 @type nodeOptions: C{dict}.
47 @cvar defaultConfig: The default node configuration. 47 @cvar defaultConfig: The default node configuration.
48 """ 48 """
49 49
50 implements(iidavoll.IBackendService) 50 implements(iidavoll.IBackendService)
51 51
52 options = {"pubsub#persist_items": 52 nodeOptions = {
53 {"type": "boolean", 53 "pubsub#persist_items":
54 "label": "Persist items to storage"}, 54 {"type": "boolean",
55 "pubsub#deliver_payloads": 55 "label": "Persist items to storage"},
56 {"type": "boolean", 56 "pubsub#deliver_payloads":
57 "label": "Deliver payloads with event notifications"}, 57 {"type": "boolean",
58 "pubsub#send_last_published_item": 58 "label": "Deliver payloads with event notifications"},
59 {"type": "list-single", 59 "pubsub#send_last_published_item":
60 "label": "When to send the last published item", 60 {"type": "list-single",
61 "options": { 61 "label": "When to send the last published item",
62 "never": "Never", 62 "options": {
63 "on_sub": "When a new subscription is processed", 63 "never": "Never",
64 } 64 "on_sub": "When a new subscription is processed"}
65 }, 65 },
66 } 66 }
67 67
68 defaultConfig = {"pubsub#persist_items": True, 68 subscriptionOptions = {
69 "pubsub#deliver_payloads": True, 69 "pubsub#subscription_type":
70 "pubsub#send_last_published_item": 'on_sub', 70 {"type": "list-single",
71 } 71 "options": {
72 "items": "Receive notification of new items only",
73 "nodes": "Receive notification of new nodes only"}
74 },
75 "pubsub#subscription_depth":
76 {"type": "list-single",
77 "options": {
78 "1": "Receive notification from direct child nodes only",
79 "all": "Receive notification from all descendent nodes"}
80 },
81 }
72 82
73 def __init__(self, storage): 83 def __init__(self, storage):
74 utility.EventDispatcher.__init__(self) 84 utility.EventDispatcher.__init__(self)
75 self.storage = storage 85 self.storage = storage
76 self._callbackList = [] 86 self._callbackList = []
106 116
107 117
108 def _makeMetaData(self, metaData): 118 def _makeMetaData(self, metaData):
109 options = [] 119 options = []
110 for key, value in metaData.iteritems(): 120 for key, value in metaData.iteritems():
111 if self.options.has_key(key): 121 if key in self.nodeOptions:
112 option = {"var": key} 122 option = {"var": key}
113 option.update(self.options[key]) 123 option.update(self.nodeOptions[key])
114 option["value"] = value 124 option["value"] = value
115 options.append(option) 125 options.append(option)
116 126
117 return options 127 return options
118 128
134 d.addCallback(self._doPublish, items, requestor) 144 d.addCallback(self._doPublish, items, requestor)
135 return d 145 return d
136 146
137 147
138 def _doPublish(self, node, items, requestor): 148 def _doPublish(self, node, items, requestor):
149 if node.nodeType == 'collection':
150 raise error.NoPublishing()
151
139 configuration = node.getConfiguration() 152 configuration = node.getConfiguration()
140 persistItems = configuration["pubsub#persist_items"] 153 persistItems = configuration["pubsub#persist_items"]
141 deliverPayloads = configuration["pubsub#deliver_payloads"] 154 deliverPayloads = configuration["pubsub#deliver_payloads"]
142 155
143 if items and not persistItems and not deliverPayloads: 156 if items and not persistItems and not deliverPayloads:
145 elif not items and (persistItems or deliverPayloads): 158 elif not items and (persistItems or deliverPayloads):
146 raise error.ItemRequired() 159 raise error.ItemRequired()
147 160
148 if persistItems or deliverPayloads: 161 if persistItems or deliverPayloads:
149 for item in items: 162 for item in items:
163 item.uri = None
164 item.defaultUri = None
150 if not item.getAttribute("id"): 165 if not item.getAttribute("id"):
151 item["id"] = str(uuid.uuid4()) 166 item["id"] = str(uuid.uuid4())
152 167
153 if persistItems: 168 if persistItems:
154 d = node.storeItems(items, requestor) 169 d = node.storeItems(items, requestor)
167 182
168 self.dispatch({'items': items, 'nodeIdentifier': nodeIdentifier}, 183 self.dispatch({'items': items, 'nodeIdentifier': nodeIdentifier},
169 '//event/pubsub/notify') 184 '//event/pubsub/notify')
170 185
171 186
172 def getNotificationList(self, nodeIdentifier, items): 187 def getNotifications(self, nodeIdentifier, items):
173 d = self.storage.getNode(nodeIdentifier) 188
174 d.addCallback(lambda node: node.getSubscribers()) 189 def toNotifications(subscriptions, nodeIdentifier, items):
175 d.addCallback(self._magicFilter, nodeIdentifier, items) 190 subsBySubscriber = {}
176 return d 191 for subscription in subscriptions:
177 192 if subscription.options.get('pubsub#subscription_type',
178 193 'items') == 'items':
179 def _magicFilter(self, subscribers, nodeIdentifier, items): 194 subs = subsBySubscriber.setdefault(subscription.subscriber,
180 list = [] 195 set())
181 for subscriber in subscribers: 196 subs.add(subscription)
182 list.append((subscriber, items)) 197
183 return list 198 notifications = [(subscriber, subscriptions, items)
199 for subscriber, subscriptions
200 in subsBySubscriber.iteritems()]
201
202 return notifications
203
204 def rootNotFound(failure):
205 failure.trap(error.NodeNotFound)
206 return []
207
208 d1 = self.storage.getNode(nodeIdentifier)
209 d1.addCallback(lambda node: node.getSubscriptions('subscribed'))
210 d2 = self.storage.getNode('')
211 d2.addCallback(lambda node: node.getSubscriptions('subscribed'))
212 d2.addErrback(rootNotFound)
213 d = defer.gatherResults([d1, d2])
214 d.addCallback(lambda result: result[0] + result[1])
215 d.addCallback(toNotifications, nodeIdentifier, items)
216 return d
184 217
185 218
186 def registerNotifier(self, observerfn, *args, **kwargs): 219 def registerNotifier(self, observerfn, *args, **kwargs):
187 self.addObserver('//event/pubsub/notify', observerfn, *args, **kwargs) 220 self.addObserver('//event/pubsub/notify', observerfn, *args, **kwargs)
188 221
202 node, affiliation = result 235 node, affiliation = result
203 236
204 if affiliation == 'outcast': 237 if affiliation == 'outcast':
205 raise error.Forbidden() 238 raise error.Forbidden()
206 239
207 d = node.addSubscription(subscriber, 'subscribed') 240 def trapExists(failure):
208 d.addCallback(lambda _: self._sendLastPublished(node, subscriber)) 241 failure.trap(error.SubscriptionExists)
209 d.addCallback(lambda _: 'subscribed') 242 return False
210 d.addErrback(self._getSubscription, node, subscriber) 243
211 d.addCallback(self._returnSubscription, node.nodeIdentifier) 244 def cb(sendLast):
212 return d 245 d = node.getSubscription(subscriber)
213 246 if sendLast:
214 247 d.addCallback(self._sendLastPublished, node)
215 def _getSubscription(self, failure, node, subscriber): 248 return d
216 failure.trap(error.SubscriptionExists) 249
217 return node.getSubscription(subscriber) 250 d = node.addSubscription(subscriber, 'subscribed', {})
218 251 d.addCallbacks(lambda _: True, trapExists)
219 252 d.addCallback(cb)
220 def _returnSubscription(self, result, nodeIdentifier): 253 return d
221 return nodeIdentifier, result 254
222 255
223 256 def _sendLastPublished(self, subscription, node):
224 def _sendLastPublished(self, node, subscriber):
225 257
226 def notifyItem(items): 258 def notifyItem(items):
227 if not items: 259 if items:
228 return 260 reactor.callLater(0, self.dispatch,
229 261 {'items': items,
230 reactor.callLater(0, self.dispatch, 262 'nodeIdentifier': node.nodeIdentifier,
231 {'items': items, 263 'subscription': subscription},
232 'nodeIdentifier': node.nodeIdentifier, 264 '//event/pubsub/notify')
233 'subscriber': subscriber},
234 '//event/pubsub/notify')
235 265
236 config = node.getConfiguration() 266 config = node.getConfiguration()
237 if config.get("pubsub#send_last_published_item", 'never') != 'on_sub': 267 sendLastPublished = config.get('pubsub#send_last_published_item',
238 return 268 'never')
239 269 if sendLastPublished == 'on_sub' and node.nodeType == 'leaf':
240 d = self.getItems(node.nodeIdentifier, subscriber.userhostJID(), 1) 270 entity = subscription.subscriber.userhostJID()
241 d.addCallback(notifyItem) 271 d = self.getItems(node.nodeIdentifier, entity, 1)
272 d.addCallback(notifyItem)
273 d.addErrback(log.err)
274
275 return subscription
242 276
243 277
244 def unsubscribe(self, nodeIdentifier, subscriber, requestor): 278 def unsubscribe(self, nodeIdentifier, subscriber, requestor):
245 if subscriber.userhostJID() != requestor: 279 if subscriber.userhostJID() != requestor:
246 return defer.fail(error.Forbidden()) 280 return defer.fail(error.Forbidden())
259 293
260 294
261 def createNode(self, nodeIdentifier, requestor): 295 def createNode(self, nodeIdentifier, requestor):
262 if not nodeIdentifier: 296 if not nodeIdentifier:
263 nodeIdentifier = 'generic/%s' % uuid.uuid4() 297 nodeIdentifier = 'generic/%s' % uuid.uuid4()
264 d = self.storage.createNode(nodeIdentifier, requestor) 298
299 nodeType = 'leaf'
300 config = self.storage.getDefaultConfiguration(nodeType)
301 config['pubsub#node_type'] = nodeType
302
303 d = self.storage.createNode(nodeIdentifier, requestor, config)
265 d.addCallback(lambda _: nodeIdentifier) 304 d.addCallback(lambda _: nodeIdentifier)
266 return d 305 return d
267 306
268 307
269 def getDefaultConfiguration(self): 308 def getDefaultConfiguration(self, nodeType):
270 d = defer.succeed(self.defaultConfig) 309 d = defer.succeed(self.storage.getDefaultConfiguration(nodeType))
271 return d 310 return d
272 311
273 312
274 def getNodeConfiguration(self, nodeIdentifier): 313 def getNodeConfiguration(self, nodeIdentifier):
275 if not nodeIdentifier: 314 if not nodeIdentifier:
276 return defer.fail(error.NoRootNode()) 315 return defer.fail(error.NoRootNode())
277 316
278 d = self.storage.getNode(nodeIdentifier) 317 d = self.storage.getNode(nodeIdentifier)
279 d.addCallback(lambda node: node.getConfiguration()) 318 d.addCallback(lambda node: node.getConfiguration())
319
280 return d 320 return d
281 321
282 322
283 def setNodeConfiguration(self, nodeIdentifier, options, requestor): 323 def setNodeConfiguration(self, nodeIdentifier, options, requestor):
284 if not nodeIdentifier: 324 if not nodeIdentifier:
311 return d 351 return d
312 352
313 353
314 def _doGetItems(self, result, maxItems, itemIdentifiers): 354 def _doGetItems(self, result, maxItems, itemIdentifiers):
315 node, affiliation = result 355 node, affiliation = result
356
357 if not ILeafNode.providedBy(node):
358 return []
316 359
317 if affiliation == 'outcast': 360 if affiliation == 'outcast':
318 raise error.Forbidden() 361 raise error.Forbidden()
319 362
320 if itemIdentifiers: 363 if itemIdentifiers:
380 def registerPreDelete(self, preDeleteFn): 423 def registerPreDelete(self, preDeleteFn):
381 self._callbackList.append(preDeleteFn) 424 self._callbackList.append(preDeleteFn)
382 425
383 426
384 def getSubscribers(self, nodeIdentifier): 427 def getSubscribers(self, nodeIdentifier):
385 d = self.storage.getNode(nodeIdentifier) 428 def cb(subscriptions):
386 d.addCallback(lambda node: node.getSubscribers()) 429 return [subscription.subscriber for subscription in subscriptions]
430
431 d = self.storage.getNode(nodeIdentifier)
432 d.addCallback(lambda node: node.getSubscriptions('subscribed'))
433 d.addCallback(cb)
387 return d 434 return d
388 435
389 436
390 def deleteNode(self, nodeIdentifier, requestor): 437 def deleteNode(self, nodeIdentifier, requestor):
391 d = self.storage.getNode(nodeIdentifier) 438 d = self.storage.getNode(nodeIdentifier)
445 error.InvalidConfigurationValue: ('not-acceptable', None, None), 492 error.InvalidConfigurationValue: ('not-acceptable', None, None),
446 error.NodeNotPersistent: ('feature-not-implemented', 493 error.NodeNotPersistent: ('feature-not-implemented',
447 'unsupported', 494 'unsupported',
448 'persistent-node'), 495 'persistent-node'),
449 error.NoRootNode: ('bad-request', None, None), 496 error.NoRootNode: ('bad-request', None, None),
497 error.NoCollections: ('feature-not-implemented',
498 'unsupported',
499 'collections'),
500 error.NoPublishing: ('feature-not-implemented',
501 'unsupported',
502 'publish'),
450 } 503 }
451 504
452 def __init__(self, backend): 505 def __init__(self, backend):
453 PubSubService.__init__(self) 506 PubSubService.__init__(self)
454 507
495 548
496 549
497 def _notify(self, data): 550 def _notify(self, data):
498 items = data['items'] 551 items = data['items']
499 nodeIdentifier = data['nodeIdentifier'] 552 nodeIdentifier = data['nodeIdentifier']
500 if 'subscriber' not in data: 553 if 'subscription' not in data:
501 d = self.backend.getNotificationList(nodeIdentifier, items) 554 d = self.backend.getNotifications(nodeIdentifier, items)
502 else: 555 else:
503 d = defer.succeed([(data['subscriber'], items)]) 556 subscription = data['subscription']
557 d = defer.succeed([(subscription.subscriber, [subscription],
558 items)])
504 d.addCallback(lambda notifications: self.notifyPublish(self.serviceJID, 559 d.addCallback(lambda notifications: self.notifyPublish(self.serviceJID,
505 nodeIdentifier, 560 nodeIdentifier,
506 notifications)) 561 notifications))
507 562
508 563
537 592
538 def saveMetaData(result): 593 def saveMetaData(result):
539 info['meta-data'] = result 594 info['meta-data'] = result
540 return info 595 return info
541 596
597 def trapNotFound(failure):
598 failure.trap(error.NodeNotFound)
599 return info
600
542 d = defer.succeed(nodeIdentifier) 601 d = defer.succeed(nodeIdentifier)
543 d.addCallback(self.backend.getNodeType) 602 d.addCallback(self.backend.getNodeType)
544 d.addCallback(saveType) 603 d.addCallback(saveType)
545 d.addCallback(self.backend.getNodeMetaData) 604 d.addCallback(self.backend.getNodeMetaData)
546 d.addCallback(saveMetaData) 605 d.addCallback(saveMetaData)
606 d.addErrback(trapNotFound)
547 d.addErrback(self._mapErrors) 607 d.addErrback(self._mapErrors)
548 return d 608 return d
549 609
550 610
551 def getNodes(self, requestor, service): 611 def getNodes(self, requestor, service):
584 d = self.backend.createNode(nodeIdentifier, requestor) 644 d = self.backend.createNode(nodeIdentifier, requestor)
585 return d.addErrback(self._mapErrors) 645 return d.addErrback(self._mapErrors)
586 646
587 647
588 def getConfigurationOptions(self): 648 def getConfigurationOptions(self):
589 return self.backend.options 649 return self.backend.nodeOptions
590 650
591 651
592 def getDefaultConfiguration(self, requestor, service): 652 def getDefaultConfiguration(self, requestor, service, nodeType):
593 d = self.backend.getDefaultConfiguration() 653 d = self.backend.getDefaultConfiguration(nodeType)
594 return d.addErrback(self._mapErrors) 654 return d.addErrback(self._mapErrors)
595 655
596 656
597 def getConfiguration(self, requestor, service, nodeIdentifier): 657 def getConfiguration(self, requestor, service, nodeIdentifier):
598 d = self.backend.getNodeConfiguration(nodeIdentifier) 658 d = self.backend.getNodeConfiguration(nodeIdentifier)