comparison idavoll/backend.py @ 108:1c18759d2afb

Moved two errors to storage.py. Moved generic backend implementation to generic_backend.py. Added two configuration errors.
author Ralph Meijer <ralphm@ik.nu>
date Fri, 08 Apr 2005 10:16:08 +0000
parents dc36882d2620
children 753b8432460f
comparison
equal deleted inserted replaced
107:d252d793f0ed 108:1c18759d2afb
1 from twisted.words.protocols.jabber import jid 1 from zope.interface import Interface
2 from twisted.application import service 2 import storage
3 from twisted.xish import utility
4 from twisted.internet import defer
5 from zope.interface import Interface, implements
6 import sha
7 import time
8 3
9 class Error(Exception): 4 class Error(Exception):
10 msg = '' 5 msg = ''
11 6
12 def __str__(self): 7 def __str__(self):
13 return self.msg 8 return self.msg
14 9
15 class NodeNotFound(Error):
16 msg = 'Node not found'
17
18 class NotAuthorized(Error): 10 class NotAuthorized(Error):
19 pass 11 pass
20 12
21 class PayloadExpected(Error): 13 class PayloadExpected(Error):
22 msg = 'Payload expected' 14 msg = 'Payload expected'
25 msg = 'No payload allowed' 17 msg = 'No payload allowed'
26 18
27 class NoInstantNodes(Error): 19 class NoInstantNodes(Error):
28 pass 20 pass
29 21
30 class NodeExists(Error):
31 pass
32
33 class NotImplemented(Error): 22 class NotImplemented(Error):
34 pass 23 pass
35 24
36 class NotSubscribed(Error): 25 class NotSubscribed(Error):
37 pass 26 pass
38 27
28 class InvalidConfigurationOption(Error):
29 msg = 'Invalid configuration option'
30
31 class InvalidConfigurationValue(Error):
32 msg = 'Bad configuration value'
33
39 class IBackendService(Interface): 34 class IBackendService(Interface):
40 """ Interface to a backend service of a pubsub service. """ 35 """ Interface to a backend service of a pubsub service. """
41 36
42 def get_supported_affiliations(self): 37 def __init__(self, storage):
43 """ Reports the list of supported affiliation types. 38 """
39 @param storage: L{storage} object.
40 """
41
42 def supports_publisher_affiliation(self):
43 """ Reports if the backend supports the publisher affiliation.
44 44
45 @return: a list of supported affiliation types. 45 @rtype: C{bool}
46 """
47
48 def supports_outcast_affiliation(self):
49 """ Reports if the backend supports the publisher affiliation.
50
51 @rtype: C{bool}
52 """
53
54 def supports_persistent_items(self):
55 """ Reports if the backend supports persistent items.
56
57 @rtype: C{bool}
58 """
59
60 def get_node_type(self, node_id):
61 """ Return type of a node.
62
63 @return: a deferred that returns either 'leaf' or 'collection'
64 """
65
66 def get_nodes(self):
67 """ Returns list of all nodes.
68
69 @return: a deferred that returns a C{list} of node ids.
70 """
71
72 def get_node_meta_data(self, node_id):
73 """ Return meta data for a node.
74
75 @return: a deferred that returns a C{list} of C{dict}s with the
76 metadata.
46 """ 77 """
47 78
48 class INodeCreationService(Interface): 79 class INodeCreationService(Interface):
49 """ A service for creating nodes """ 80 """ A service for creating nodes """
50 81
164 items, else if C{item_ids} is not empty, return the items requested. 195 items, else if C{item_ids} is not empty, return the items requested.
165 If neither is given, return all items. 196 If neither is given, return all items.
166 197
167 @return: a deferred that returns the requested items 198 @return: a deferred that returns the requested items
168 """ 199 """
169
170 class BackendService(service.MultiService, utility.EventDispatcher):
171
172 implements(IBackendService)
173
174 options = {"pubsub#persist_items":
175 {"type": "boolean",
176 "label": "Persist items to storage"},
177 "pubsub#deliver_payloads":
178 {"type": "boolean",
179 "label": "Deliver payloads with event notifications"},
180 }
181
182 default_config = {"pubsub#persist_items": True,
183 "pubsub#deliver_payloads": True,
184 }
185
186 def __init__(self, storage):
187 service.MultiService.__init__(self)
188 utility.EventDispatcher.__init__(self)
189 self.storage = storage
190
191 def supports_publisher_affiliation(self):
192 return True
193
194 def supports_outcast_affiliation(self):
195 return True
196
197 def supports_persistent_items(self):
198 return True
199
200 def get_node_type(self, node_id):
201 return self.storage.get_node_type(node_id)
202
203 def get_nodes(self):
204 return self.storage.get_nodes()
205
206 def get_node_meta_data(self, node_id):
207 d = self.storage.get_node_configuration(node_id)
208
209 d.addCallback(self._make_meta_data)
210 return d
211
212 def _make_meta_data(self, meta_data):
213 options = []
214 for key, value in meta_data.iteritems():
215 if self.options.has_key(key):
216 option = {"var": key}
217 option.update(self.options[key])
218 option["value"] = value
219 options.append(option)
220
221 return options
222
223 class PublishService(service.Service):
224
225 implements(IPublishService)
226
227 def publish(self, node_id, items, requestor):
228 d1 = self.parent.storage.get_node_configuration(node_id)
229 d2 = self.parent.storage.get_affiliation(node_id, requestor)
230 d = defer.DeferredList([d1, d2], fireOnOneErrback=1, consumeErrors=1)
231 d.addErrback(lambda x: x.value[0])
232 d.addCallback(self._do_publish, node_id, items, requestor)
233 return d
234
235 def _do_publish(self, result, node_id, items, requestor):
236 configuration = result[0][1]
237 persist_items = configuration["pubsub#persist_items"]
238 deliver_payloads = configuration["pubsub#deliver_payloads"]
239 affiliation = result[1][1]
240
241 if affiliation not in ['owner', 'publisher']:
242 raise NotAuthorized
243
244 if items and not persist_items and not deliver_payloads:
245 raise NoPayloadAllowed
246 elif not items and (persist_items or deliver_payloads):
247 raise PayloadExpected
248
249 if persist_items or deliver_payloads:
250 for item in items:
251 if not item.getAttribute("id"):
252 item["id"] = sha.new(str(time.time()) +
253 requestor.full()).hexdigest()
254
255 if persist_items:
256 d = self.parent.storage.store_items(node_id, items,
257 requestor)
258 else:
259 d = defer.succeed(None)
260
261 d.addCallback(self._do_notify, node_id, items, deliver_payloads)
262
263 def _do_notify(self, result, node_id, items, deliver_payloads):
264 if items and not deliver_payloads:
265 for item in items:
266 item.children = []
267
268 self.parent.dispatch({ 'items': items, 'node_id': node_id },
269 '//event/pubsub/notify')
270
271 class NotificationService(service.Service):
272
273 implements(INotificationService)
274
275 def get_notification_list(self, node_id, items):
276 d = self.parent.storage.get_subscribers(node_id)
277 d.addCallback(self._magic_filter, node_id, items)
278 return d
279
280 def _magic_filter(self, subscribers, node_id, items):
281 list = {}
282 for subscriber in subscribers:
283 list[subscriber] = items
284 return list
285
286 def register_notifier(self, observerfn, *args, **kwargs):
287 self.parent.addObserver('//event/pubsub/notify', observerfn,
288 *args, **kwargs)
289
290 class SubscriptionService(service.Service):
291
292 implements(ISubscriptionService)
293
294 def subscribe(self, node_id, subscriber, requestor):
295 if subscriber.userhostJID() != requestor:
296 raise NotAuthorized
297
298 d1 = self.parent.storage.get_node_configuration(node_id)
299 d2 = self.parent.storage.get_affiliation(node_id, subscriber)
300 d = defer.DeferredList([d1, d2], fireOnOneErrback=1, consumeErrors=1)
301 d.addErrback(lambda x: x.value[0])
302 d.addCallback(self._do_subscribe, node_id, subscriber)
303 return d
304
305 def _do_subscribe(self, result, node_id, subscriber):
306 configuration = result[0][1]
307 affiliation = result[1][1]
308
309 if affiliation == 'outcast':
310 raise NotAuthorized
311
312 d = self.parent.storage.add_subscription(node_id, subscriber,
313 'subscribed')
314 d.addCallback(self._return_subscription, affiliation)
315 return d
316
317 def _return_subscription(self, result, affiliation):
318 result['affiliation'] = affiliation
319 return result
320
321 def unsubscribe(self, node_id, subscriber, requestor):
322 if subscriber.userhostJID() != requestor:
323 raise NotAuthorized
324
325 d = self.parent.storage.get_node_configuration(node_id)
326 d.addCallback(self._do_unsubscribe, node_id, subscriber)
327 return d
328
329 def _do_unsubscribe(self, result, node_id, subscriber):
330 return self.parent.storage.remove_subscription(node_id,
331 subscriber)
332
333 class NodeCreationService(service.Service):
334
335 implements(INodeCreationService)
336
337 def supports_instant_nodes(self):
338 return True
339
340 def create_node(self, node_id, requestor):
341 if not node_id:
342 node_id = 'generic/%s' % sha.new(str(time.time()) +
343 requestor.full()).hexdigest()
344
345 d = self.parent.storage.create_node(node_id, requestor)
346 d.addCallback(lambda _: node_id)
347 return d
348
349 def get_node_configuration(self, node_id):
350 if node_id:
351 d = self.parent.storage.get_node_configuration(node_id)
352 else:
353 # XXX: this is disabled in pubsub.py
354 d = defer.succeed(self.parent.default_config)
355
356 d.addCallback(self._make_config)
357 return d
358
359 def _make_config(self, config):
360 options = []
361 for key, value in self.parent.options.iteritems():
362 option = {"var": key}
363 option.update(value)
364 if config.has_key(key):
365 option["value"] = config[key]
366 options.append(option)
367
368 return options
369
370 def set_node_configuration(self, node_id, options, requestor):
371 for key in options.iterkeys():
372 if not self.parent.options.has_key(key):
373 raise InvalidConfigurationOption
374
375 d = self.parent.storage.get_affiliation(node_id, requestor)
376 d.addCallback(self._do_set_node_configuration, node_id, options)
377 return d
378
379 def _do_set_node_configuration(self, affiliation, node_id, options):
380 if affiliation != 'owner':
381 raise NotAuthorized
382
383 return self.parent.storage.set_node_configuration(node_id, options)
384
385 class AffiliationsService(service.Service):
386
387 implements(IAffiliationsService)
388
389 def get_affiliations(self, entity):
390 d1 = self.parent.storage.get_affiliations(entity)
391 d2 = self.parent.storage.get_subscriptions(entity)
392 d = defer.DeferredList([d1, d2], fireOnOneErrback=1, consumeErrors=1)
393 d.addErrback(lambda x: x.value[0])
394 d.addCallback(self._affiliations_result, entity)
395 return d
396
397 def _affiliations_result(self, result, entity):
398 affiliations = result[0][1]
399 subscriptions = result[1][1]
400
401 new_affiliations = {}
402
403 for node, affiliation in affiliations:
404 new_affiliations[(node, entity.full())] = {'node': node,
405 'jid': entity,
406 'affiliation': affiliation,
407 'subscription': None
408 }
409
410 for node, subscriber, subscription in subscriptions:
411 key = node, subscriber.full()
412 if new_affiliations.has_key(key):
413 new_affiliations[key]['subscription'] = subscription
414 else:
415 new_affiliations[key] = {'node': node,
416 'jid': subscriber,
417 'affiliation': None,
418 'subscription': subscription}
419
420 return new_affiliations.values()
421
422 class ItemRetrievalService(service.Service):
423
424 implements(IItemRetrievalService)
425
426 def get_items(self, node_id, requestor, max_items=None, item_ids=[]):
427 d = self.parent.storage.is_subscribed(node_id, requestor)
428 d.addCallback(self._do_get_items, node_id, max_items, item_ids)
429 return d
430
431 def _do_get_items(self, result, node_id, max_items, item_ids):
432 if not result:
433 raise NotAuthorized
434
435 if item_ids:
436 return self.parent.storage.get_items_by_ids(node_id, item_ids)
437 else:
438 return self.parent.storage.get_items(node_id, max_items)
439
440 class RetractionService(service.Service):
441
442 implements(IRetractionService)
443
444 def retract_item(self, node_id, item_ids, requestor):
445 d1 = self.parent.storage.get_node_configuration(node_id)
446 d2 = self.parent.storage.get_affiliation(node_id, requestor)
447 d = defer.DeferredList([d1, d2], fireOnOneErrback=1)
448 d.addErrback(lambda x: x.value[0])
449 d.addCallback(self._do_retract, node_id, item_ids)
450 return d
451
452 def _do_retract(self, result, node_id, item_ids):
453 configuration = result[0][1]
454 persist_items = configuration["persist_items"]
455 affiliation = result[1][1]
456
457 if affiliation not in ['owner', 'publisher']:
458 raise NotAuthorized
459
460 if not persist_items:
461 raise NodeNotPersistent
462
463 d = self.parent.storage.remove_items(node_id, item_ids)
464 d.addCallback(self._do_notify_retraction, node_id)
465 return d
466
467 def _do_notify_retraction(self, result, node_id):
468 self.parent.dispatch({ 'item_ids': result, 'node_id': node_id },
469 '//event/pubsub/retract')
470
471 def purge_node(self, node_id, requestor):
472 d1 = self.parent.storage.get_node_configuration(node_id)
473 d2 = self.parent.storage.get_affiliation(node_id, requestor)
474 d = defer.DeferredList([d1, d2], fireOnOneErrback=1)
475 d.addErrback(lambda x: x.value[0])
476 d.addCallback(self._do_purge, node_id)
477 return d
478
479 def _do_purge(self, result, node_id):
480 configuration = result[0][1]
481 persist_items = configuration["persist_items"]
482 affiliation = result[1][1]
483
484 if affiliation != 'owner':
485 raise NotAuthorized
486
487 if not persist_items:
488 raise NodeNotPersistent
489
490 d = self.parent.storage.purge_node(node_id)
491 d.addCallback(self._do_notify_purge, node_id)
492 return d
493
494 def _do_notify_purge(self, result, node_id):
495 self.parent.dispatch(node_id, '//event/pubsub/purge')
496
497 class NodeDeletionService(service.Service):
498
499 implements(INodeDeletionService)
500
501 def __init__(self):
502 self._callback_list = []
503
504 def register_pre_delete(self, pre_delete_fn):
505 self._callback_list.append(pre_delete_fn)
506
507 def get_subscribers(self, node_id):
508 return self.parent.storage.get_subscribers(node_id)
509
510 def delete_node(self, node_id, requestor):
511 d1 = self.parent.storage.get_node_configuration(node_id)
512 d2 = self.parent.storage.get_affiliation(node_id, requestor)
513 d = defer.DeferredList([d1, d2], fireOnOneErrback=1)
514 d.addErrback(lambda x: x.value[0])
515 d.addCallback(self._do_pre_delete, node_id)
516 return d
517
518 def _do_pre_delete(self, result, node_id):
519 configuration = result[0][1]
520 persist_items = configuration["persist_items"]
521 affiliation = result[1][1]
522
523 if affiliation != 'owner':
524 raise NotAuthorized
525
526 d = defer.DeferredList([cb(node_id) for cb in self._callback_list],
527 consumeErrors=1)
528 d.addCallback(self._do_delete, node_id)
529
530 def _do_delete(self, result, node_id):
531 dl = []
532 for succeeded, r in result:
533 if succeeded and r:
534 dl.extend(r)
535
536 d = self.parent.storage.delete_node(node_id)
537 d.addCallback(self._do_notify_delete, dl)
538
539 return d
540
541 def _do_notify_delete(self, result, dl):
542 for d in dl:
543 d.callback(None)