comparison idavoll/backend.py @ 167:ef22e4150caa

Move protocol implementations (pubsub, disco, forms) to and depend on wokkel. Author: ralphm Fixes: #4
author Ralph Meijer <ralphm@ik.nu>
date Wed, 03 Oct 2007 12:41:43 +0000
parents 6fe78048baf9
children e2c2c2baf483
comparison
equal deleted inserted replaced
166:5abb017bd687 167:ef22e4150caa
1 # Copyright (c) 2003-2006 Ralph Meijer 1 # -*- test-case-name: idavoll.test.test_backend -*-
2 #
3 # Copyright (c) 2003-2007 Ralph Meijer
2 # See LICENSE for details. 4 # See LICENSE for details.
3 5
4 from zope.interface import Interface 6 import uuid
5 import storage 7
6 8 from zope.interface import implements
7 class Error(Exception): 9
8 msg = '' 10 from twisted.application import service
9 11 from twisted.python import components
10 def __str__(self): 12 from twisted.internet import defer
11 return self.msg 13 from twisted.words.protocols.jabber.error import StanzaError
12 14 from twisted.words.xish import utility
13 class Forbidden(Error): 15
14 pass 16 from wokkel.iwokkel import IDisco, IPubSubService
15 17 from wokkel.pubsub import PubSubService, PubSubError
16 class ItemForbidden(Error): 18
17 pass 19 from idavoll import error, iidavoll
18 20 from idavoll.iidavoll import IBackendService
19 class ItemRequired(Error): 21
20 pass 22 def _get_affiliation(node, entity):
21 23 d = node.get_affiliation(entity)
22 class NoInstantNodes(Error): 24 d.addCallback(lambda affiliation: (node, affiliation))
23 pass 25 return d
24 26
25 class NotSubscribed(Error): 27 class BackendService(service.Service, utility.EventDispatcher):
26 pass 28
27 29 implements(iidavoll.IBackendService)
28 class InvalidConfigurationOption(Error): 30
29 msg = 'Invalid configuration option' 31 options = {"pubsub#persist_items":
30 32 {"type": "boolean",
31 class InvalidConfigurationValue(Error): 33 "label": "Persist items to storage"},
32 msg = 'Bad configuration value' 34 "pubsub#deliver_payloads":
33 35 {"type": "boolean",
34 class NodeNotPersistent(Error): 36 "label": "Deliver payloads with event notifications"},
35 pass 37 }
36 38
37 class NoRootNode(Error): 39 default_config = {"pubsub#persist_items": True,
38 pass 40 "pubsub#deliver_payloads": True,
39 41 }
40 class IBackendService(Interface): 42
41 """ Interface to a backend service of a pubsub service. """ 43 def __init__(self, storage):
42 44 utility.EventDispatcher.__init__(self)
43 def __init__(storage): 45 self.storage = storage
44 """ 46 self._callback_list = []
45 @param storage: L{storage} object.
46 """
47 47
48 def supports_publisher_affiliation(self): 48 def supports_publisher_affiliation(self):
49 """ Reports if the backend supports the publisher affiliation. 49 return True
50
51 @rtype: C{bool}
52 """
53 50
54 def supports_outcast_affiliation(self): 51 def supports_outcast_affiliation(self):
55 """ Reports if the backend supports the publisher affiliation. 52 return True
56
57 @rtype: C{bool}
58 """
59 53
60 def supports_persistent_items(self): 54 def supports_persistent_items(self):
61 """ Reports if the backend supports persistent items. 55 return True
62 56
63 @rtype: C{bool} 57 def get_node_type(self, node_id):
64 """ 58 d = self.storage.get_node(node_id)
65 59 d.addCallback(lambda node: node.get_type())
66 def get_node_type(node_id): 60 return d
67 """ Return type of a node.
68
69 @return: a deferred that returns either 'leaf' or 'collection'
70 """
71 61
72 def get_nodes(self): 62 def get_nodes(self):
73 """ Returns list of all nodes. 63 return self.storage.get_node_ids()
74 64
75 @return: a deferred that returns a C{list} of node ids. 65 def get_node_meta_data(self, node_id):
76 """ 66 d = self.storage.get_node(node_id)
77 67 d.addCallback(lambda node: node.get_meta_data())
78 def get_node_meta_data(node_id): 68 d.addCallback(self._make_meta_data)
79 """ Return meta data for a node. 69 return d
80 70
81 @return: a deferred that returns a C{list} of C{dict}s with the 71 def _make_meta_data(self, meta_data):
82 metadata. 72 options = []
83 """ 73 for key, value in meta_data.iteritems():
84 74 if self.options.has_key(key):
85 class INodeCreationService(Interface): 75 option = {"var": key}
86 """ A service for creating nodes """ 76 option.update(self.options[key])
87 77 option["value"] = value
88 def create_node(node_id, requestor): 78 options.append(option)
89 """ Create a node. 79
90 80 return options
91 @return: a deferred that fires when the node has been created. 81
92 """ 82 def _check_auth(self, node, requestor):
93 83 def check(affiliation, node):
94 class INodeDeletionService(Interface): 84 if affiliation not in ['owner', 'publisher']:
95 """ A service for deleting nodes. """ 85 raise error.Forbidden()
96 86 return node
97 def register_pre_delete(pre_delete_fn): 87
98 """ Register a callback that is called just before a node deletion. 88 d = node.get_affiliation(requestor)
99 89 d.addCallback(check, node)
100 The function C{pre_deleted_fn} is added to a list of functions 90 return d
101 to be called just before deletion of a node. The callback 91
102 C{pre_delete_fn} is called with the C{node_id} that is about to be 92 def publish(self, node_id, items, requestor):
103 deleted and should return a deferred that returns a list of deferreds 93 d = self.storage.get_node(node_id)
104 that are to be fired after deletion. The backend collects the lists 94 d.addCallback(self._check_auth, requestor)
105 from all these callbacks before actually deleting the node in question. 95 d.addCallback(self._do_publish, items, requestor)
106 After deletion all collected deferreds are fired to do post-processing. 96 return d
107 97
108 The idea is that you want to be able to collect data from the 98 def _do_publish(self, node, items, requestor):
109 node before deleting it, for example to get a list of subscribers 99 configuration = node.get_configuration()
110 that have to be notified after the node has been deleted. To do this, 100 persist_items = configuration["pubsub#persist_items"]
111 C{pre_delete_fn} fetches the subscriber list and passes this 101 deliver_payloads = configuration["pubsub#deliver_payloads"]
112 list to a callback attached to a deferred that it sets up. This 102
113 deferred is returned in the list of deferreds. 103 if items and not persist_items and not deliver_payloads:
114 """ 104 raise error.ItemForbidden()
115 105 elif not items and (persist_items or deliver_payloads):
116 def get_subscribers(node_id): 106 raise error.ItemRequired()
117 """ Get node subscriber list. 107
118 108 if persist_items or deliver_payloads:
119 @return: a deferred that fires with the list of subscribers. 109 for item in items:
120 """ 110 if not item.getAttribute("id"):
121 111 item["id"] = uuid.generate()
122 def delete_node(node_id, requestor): 112
123 """ Delete a node. 113 if persist_items:
124 114 d = node.store_items(items, requestor)
125 @return: a deferred that fires when the node has been deleted. 115 else:
126 """ 116 d = defer.succeed(None)
127 117
128 class IPublishService(Interface): 118 d.addCallback(self._do_notify, node.id, items, deliver_payloads)
129 """ A service for publishing items to a node. """ 119 return d
130 120
131 def publish(node_id, items, requestor): 121 def _do_notify(self, result, node_id, items, deliver_payloads):
132 """ Publish items to a pubsub node. 122 if items and not deliver_payloads:
133 123 for item in items:
134 @return: a deferred that fires when the items have been published. 124 item.children = []
135 """ 125
136 class INotificationService(Interface): 126 self.dispatch({'items': items, 'node_id': node_id},
137 """ A service for notification of published items. """ 127 '//event/pubsub/notify')
138 128
139 def register_notifier(observerfn, *args, **kwargs): 129 def get_notification_list(self, node_id, items):
140 """ Register callback which is called for notification. """ 130 d = self.storage.get_node(node_id)
141 131 d.addCallback(lambda node: node.get_subscribers())
142 def get_notification_list(node_id, items): 132 d.addCallback(self._magic_filter, node_id, items)
143 pass 133 return d
144 134
145 class ISubscriptionService(Interface): 135 def _magic_filter(self, subscribers, node_id, items):
146 """ A service for managing subscriptions. """ 136 list = []
147 137 for subscriber in subscribers:
148 def subscribe(node_id, subscriber, requestor): 138 list.append((subscriber, items))
149 """ Request the subscription of an entity to a pubsub node. 139 return list
150 140
151 Depending on the node's configuration and possible business rules, the 141 def register_notifier(self, observerfn, *args, **kwargs):
152 C{subscriber} is added to the list of subscriptions of the node with id 142 self.addObserver('//event/pubsub/notify', observerfn, *args, **kwargs)
153 C{node_id}. The C{subscriber} might be different from the C{requestor}, 143
154 and if the C{requestor} is not allowed to subscribe this entity an 144 def subscribe(self, node_id, subscriber, requestor):
155 exception should be raised. 145 subscriber_entity = subscriber.userhostJID()
156 146 if subscriber_entity != requestor:
157 @return: a deferred that returns the subscription state 147 return defer.fail(error.Forbidden())
158 """ 148
159 149 d = self.storage.get_node(node_id)
160 def unsubscribe(node_id, subscriber, requestor): 150 d.addCallback(_get_affiliation, subscriber_entity)
161 """ Cancel the subscription of an entity to a pubsub node. 151 d.addCallback(self._do_subscribe, subscriber)
162 152 return d
163 The subscription of C{subscriber} is removed from the list of 153
164 subscriptions of the node with id C{node_id}. If the C{requestor} 154 def _do_subscribe(self, result, subscriber):
165 is not allowed to unsubscribe C{subscriber}, an an exception should 155 node, affiliation = result
166 be raised. 156
167 157 if affiliation == 'outcast':
168 @return: a deferred that fires when unsubscription is complete. 158 raise error.Forbidden()
169 """ 159
170 160 d = node.add_subscription(subscriber, 'subscribed')
171 def get_subscriptions(entity): 161 d.addCallback(lambda _: 'subscribed')
172 """ Report the list of current subscriptions with this pubsub service. 162 d.addErrback(self._get_subscription, node, subscriber)
173 163 d.addCallback(self._return_subscription, node.id)
174 Report the list of the current subscriptions with all nodes within this 164 return d
175 pubsub service, for the C{entity}. 165
176 166 def _get_subscription(self, failure, node, subscriber):
177 @return: a deferred that returns the list of all current subscriptions 167 failure.trap(error.SubscriptionExists)
178 as tuples C{(node_id, subscriber, subscription)}. 168 return node.get_subscription(subscriber)
179 """ 169
180 170 def _return_subscription(self, result, node_id):
181 class IAffiliationsService(Interface): 171 return node_id, result
182 """ A service for retrieving the affiliations with this pubsub service. """ 172
183 173 def unsubscribe(self, node_id, subscriber, requestor):
184 def get_affiliations(entity): 174 if subscriber.userhostJID() != requestor:
185 """ Report the list of current affiliations with this pubsub service. 175 return defer.fail(error.Forbidden())
186 176
187 Report the list of the current affiliations with all nodes within this 177 d = self.storage.get_node(node_id)
188 pubsub service, for the C{entity}. 178 d.addCallback(lambda node: node.remove_subscription(subscriber))
189 179 return d
190 @return: a deferred that returns the list of all current affiliations 180
191 as tuples C{(node_id, affiliation)}. 181 def get_subscriptions(self, entity):
192 """ 182 return self.storage.get_subscriptions(entity)
193 183
194 class IRetractionService(Interface): 184 def supports_instant_nodes(self):
195 """ A service for retracting published items """ 185 return True
196 186
197 def retract_item(node_id, item_id, requestor): 187 def create_node(self, node_id, requestor):
198 """ Removes item in node from persistent storage """ 188 if not node_id:
199 189 node_id = 'generic/%s' % uuid.generate()
200 def purge_node(node_id, requestor): 190 d = self.storage.create_node(node_id, requestor)
201 """ Removes all items in node from persistent storage """ 191 d.addCallback(lambda _: node_id)
202 192 return d
203 class IItemRetrievalService(Interface): 193
204 """ A service for retrieving previously published items. """ 194 def get_default_configuration(self):
205 195 d = defer.succeed(self.default_config)
206 def get_items(node_id, requestor, max_items=None, item_ids=[]): 196 d.addCallback(self._make_config)
207 """ Retrieve items from persistent storage 197 return d
208 198
209 If C{max_items} is given, return the C{max_items} last published 199 def get_node_configuration(self, node_id):
210 items, else if C{item_ids} is not empty, return the items requested. 200 if not node_id:
211 If neither is given, return all items. 201 raise error.NoRootNode()
212 202
213 @return: a deferred that returns the requested items 203 d = self.storage.get_node(node_id)
214 """ 204 d.addCallback(lambda node: node.get_configuration())
205
206 d.addCallback(self._make_config)
207 return d
208
209 def _make_config(self, config):
210 options = []
211 for key, value in self.options.iteritems():
212 option = {"var": key}
213 option.update(value)
214 if config.has_key(key):
215 option["value"] = config[key]
216 options.append(option)
217
218 return options
219
220 def set_node_configuration(self, node_id, options, requestor):
221 if not node_id:
222 raise error.NoRootNode()
223
224 for key, value in options.iteritems():
225 if not self.options.has_key(key):
226 raise error.InvalidConfigurationOption()
227 if self.options[key]["type"] == 'boolean':
228 try:
229 options[key] = bool(int(value))
230 except ValueError:
231 raise error.InvalidConfigurationValue()
232
233 d = self.storage.get_node(node_id)
234 d.addCallback(_get_affiliation, requestor)
235 d.addCallback(self._do_set_node_configuration, options)
236 return d
237
238 def _do_set_node_configuration(self, result, options):
239 node, affiliation = result
240
241 if affiliation != 'owner':
242 raise error.Forbidden()
243
244 return node.set_configuration(options)
245
246 def get_affiliations(self, entity):
247 return self.storage.get_affiliations(entity)
248
249 def get_items(self, node_id, requestor, max_items=None, item_ids=[]):
250 d = self.storage.get_node(node_id)
251 d.addCallback(_get_affiliation, requestor)
252 d.addCallback(self._do_get_items, max_items, item_ids)
253 return d
254
255 def _do_get_items(self, result, max_items, item_ids):
256 node, affiliation = result
257
258 if affiliation == 'outcast':
259 raise error.Forbidden()
260
261 if item_ids:
262 return node.get_items_by_id(item_ids)
263 else:
264 return node.get_items(max_items)
265
266 def retract_item(self, node_id, item_ids, requestor):
267 d = self.storage.get_node(node_id)
268 d.addCallback(_get_affiliation, requestor)
269 d.addCallback(self._do_retract, item_ids)
270 return d
271
272 def _do_retract(self, result, item_ids):
273 node, affiliation = result
274 persist_items = node.get_configuration()["pubsub#persist_items"]
275
276 if affiliation not in ['owner', 'publisher']:
277 raise error.Forbidden()
278
279 if not persist_items:
280 raise error.NodeNotPersistent()
281
282 d = node.remove_items(item_ids)
283 d.addCallback(self._do_notify_retraction, node.id)
284 return d
285
286 def _do_notify_retraction(self, item_ids, node_id):
287 self.dispatch({ 'item_ids': item_ids, 'node_id': node_id },
288 '//event/pubsub/retract')
289
290 def purge_node(self, node_id, requestor):
291 d = self.storage.get_node(node_id)
292 d.addCallback(_get_affiliation, requestor)
293 d.addCallback(self._do_purge)
294 return d
295
296 def _do_purge(self, result):
297 node, affiliation = result
298 persist_items = node.get_configuration()["pubsub#persist_items"]
299
300 if affiliation != 'owner':
301 raise error.Forbidden()
302
303 if not persist_items:
304 raise error.NodeNotPersistent()
305
306 d = node.purge()
307 d.addCallback(self._do_notify_purge, node.id)
308 return d
309
310 def _do_notify_purge(self, result, node_id):
311 self.dispatch(node_id, '//event/pubsub/purge')
312
313 def register_pre_delete(self, pre_delete_fn):
314 self._callback_list.append(pre_delete_fn)
315
316 def get_subscribers(self, node_id):
317 d = self.storage.get_node(node_id)
318 d.addCallback(lambda node: node.get_subscribers())
319 return d
320
321 def delete_node(self, node_id, requestor):
322 d = self.storage.get_node(node_id)
323 d.addCallback(_get_affiliation, requestor)
324 d.addCallback(self._do_pre_delete)
325 return d
326
327 def _do_pre_delete(self, result):
328 node, affiliation = result
329
330 if affiliation != 'owner':
331 raise error.Forbidden()
332
333 d = defer.DeferredList([cb(node.id) for cb in self._callback_list],
334 consumeErrors=1)
335 d.addCallback(self._do_delete, node.id)
336
337 def _do_delete(self, result, node_id):
338 dl = []
339 for succeeded, r in result:
340 if succeeded and r:
341 dl.extend(r)
342
343 d = self.storage.delete_node(node_id)
344 d.addCallback(self._do_notify_delete, dl)
345
346 return d
347
348 def _do_notify_delete(self, result, dl):
349 for d in dl:
350 d.callback(None)
351
352
353 class PubSubServiceFromBackend(PubSubService):
354 """
355 Adapts a backend to an xmpp publish-subscribe service.
356 """
357
358 implements(IDisco)
359
360 _errorMap = {
361 error.NodeNotFound: ('item-not-found', None, None),
362 error.NodeExists: ('conflict', None, None),
363 error.SubscriptionNotFound: ('not-authorized',
364 'not-subscribed',
365 None),
366 error.Forbidden: ('forbidden', None, None),
367 error.ItemForbidden: ('bad-request', 'item-forbidden', None),
368 error.ItemRequired: ('bad-request', 'item-required', None),
369 error.NoInstantNodes: ('not-acceptable',
370 'unsupported',
371 'instant-nodes'),
372 error.NotSubscribed: ('not-authorized', 'not-subscribed', None),
373 error.InvalidConfigurationOption: ('not-acceptable', None, None),
374 error.InvalidConfigurationValue: ('not-acceptable', None, None),
375 error.NodeNotPersistent: ('feature-not-implemented',
376 'unsupported',
377 'persistent-node'),
378 error.NoRootNode: ('bad-request', None, None),
379 }
380
381 def __init__(self, backend):
382 PubSubService.__init__(self)
383
384 self.backend = backend
385 self.hideNodes = False
386
387 self.pubSubFeatures = self._getPubSubFeatures()
388
389 self.backend.register_notifier(self._notify)
390
391 def _getPubSubFeatures(self):
392 features = [
393 "config-node",
394 "create-nodes",
395 "delete-any",
396 "delete-nodes",
397 "item-ids",
398 "meta-data",
399 "publish",
400 "purge-nodes",
401 "retract-items",
402 "retrieve-affiliations",
403 "retrieve-default",
404 "retrieve-items",
405 "retrieve-subscriptions",
406 "subscribe",
407 ]
408
409 if self.backend.supports_instant_nodes():
410 features.append("instant-nodes")
411
412 if self.backend.supports_outcast_affiliation():
413 features.append("outcast-affiliation")
414
415 if self.backend.supports_persistent_items():
416 features.append("persistent-items")
417
418 if self.backend.supports_publisher_affiliation():
419 features.append("publisher-affiliation")
420
421 return features
422
423 def _notify(self, data):
424 items = data['items']
425 nodeIdentifier = data['node_id']
426 d = self.backend.get_notification_list(nodeIdentifier, items)
427 d.addCallback(lambda notifications: self.notifyPublish(self.serviceJID,
428 nodeIdentifier,
429 notifications))
430
431 def _mapErrors(self, failure):
432 e = failure.trap(*self._errorMap.keys())
433
434 condition, pubsubCondition, feature = self._errorMap[e]
435 msg = failure.value.msg
436
437 if pubsubCondition:
438 exc = PubSubError(condition, pubsubCondition, feature, msg)
439 else:
440 exc = StanzaError(condition, text=msg)
441
442 raise exc
443
444 def getNodeInfo(self, requestor, service, nodeIdentifier):
445 info = {}
446
447 def saveType(result):
448 info['type'] = result
449 return nodeIdentifier
450
451 def saveMetaData(result):
452 info['meta-data'] = result
453 return info
454
455 d = defer.succeed(nodeIdentifier)
456 d.addCallback(self.backend.get_node_type)
457 d.addCallback(saveType)
458 d.addCallback(self.backend.get_node_meta_data)
459 d.addCallback(saveMetaData)
460 d.errback(self._mapErrors)
461 return d
462
463 def getNodes(self, requestor, service):
464 d = self.backend.get_nodes()
465 return d.addErrback(self._mapErrors)
466
467 def publish(self, requestor, service, nodeIdentifier, items):
468 d = self.backend.publish(nodeIdentifier, items, requestor)
469 return d.addErrback(self._mapErrors)
470
471 def subscribe(self, requestor, service, nodeIdentifier, subscriber):
472 d = self.backend.subscribe(nodeIdentifier, subscriber, requestor)
473 return d.addErrback(self._mapErrors)
474
475 def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
476 d = self.backend.unsubscribe(nodeIdentifier, subscriber, requestor)
477 return d.addErrback(self._mapErrors)
478
479 def subscriptions(self, requestor, service):
480 d = self.backend.get_subscriptions(requestor)
481 return d.addErrback(self._mapErrors)
482
483 def affiliations(self, requestor, service):
484 d = self.backend.get_affiliations(requestor)
485 return d.addErrback(self._mapErrors)
486
487 def create(self, requestor, service, nodeIdentifier):
488 d = self.backend.create_node(nodeIdentifier, requestor)
489 return d.addErrback(self._mapErrors)
490
491 def getDefaultConfiguration(self, requestor, service):
492 d = self.backend.get_default_configuration()
493 return d.addErrback(self._mapErrors)
494
495 def getConfiguration(self, requestor, service, nodeIdentifier):
496 d = self.backend.get_node_configuration(nodeIdentifier)
497 return d.addErrback(self._mapErrors)
498
499 def setConfiguration(self, requestor, service, nodeIdentifier, options):
500 d = self.backend.set_node_configuration(nodeIdentifier, options,
501 requestor)
502 return d.addErrback(self._mapErrors)
503
504 def items(self, requestor, service, nodeIdentifier, maxItems, itemIdentifiers):
505 d = self.backend.get_items(nodeIdentifier, requestor, maxItems,
506 itemIdentifiers)
507 return d.addErrback(self._mapErrors)
508
509 def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
510 d = self.backend.retract_item(nodeIdentifier, itemIdentifiers,
511 requestor)
512 return d.addErrback(self._mapErrors)
513
514 def purge(self, requestor, service, nodeIdentifier):
515 d = self.backend.purge_node(nodeIdentifier, requestor)
516 return d.addErrback(self._mapErrors)
517
518 def delete(self, requestor, service, nodeIdentifier):
519 d = self.backend.delete_node(nodeIdentifier, requestor)
520 return d.addErrback(self._mapErrors)
521
522 components.registerAdapter(PubSubServiceFromBackend,
523 IBackendService,
524 IPubSubService)