Mercurial > libervia-pubsub
comparison idavoll/backend.py @ 108:1c18759d2afb
Moved two errors to storage.py.
Moved generic backend implementation to generic_backend.py.
Added two configuration errors.
author | Ralph Meijer <ralphm@ik.nu> |
---|---|
date | Fri, 08 Apr 2005 10:16:08 +0000 |
parents | dc36882d2620 |
children | 753b8432460f |
comparison
equal
deleted
inserted
replaced
107:d252d793f0ed | 108:1c18759d2afb |
---|---|
1 from twisted.words.protocols.jabber import jid | 1 from zope.interface import Interface |
2 from twisted.application import service | 2 import storage |
3 from twisted.xish import utility | |
4 from twisted.internet import defer | |
5 from zope.interface import Interface, implements | |
6 import sha | |
7 import time | |
8 | 3 |
9 class Error(Exception): | 4 class Error(Exception): |
10 msg = '' | 5 msg = '' |
11 | 6 |
12 def __str__(self): | 7 def __str__(self): |
13 return self.msg | 8 return self.msg |
14 | 9 |
15 class NodeNotFound(Error): | |
16 msg = 'Node not found' | |
17 | |
18 class NotAuthorized(Error): | 10 class NotAuthorized(Error): |
19 pass | 11 pass |
20 | 12 |
21 class PayloadExpected(Error): | 13 class PayloadExpected(Error): |
22 msg = 'Payload expected' | 14 msg = 'Payload expected' |
25 msg = 'No payload allowed' | 17 msg = 'No payload allowed' |
26 | 18 |
27 class NoInstantNodes(Error): | 19 class NoInstantNodes(Error): |
28 pass | 20 pass |
29 | 21 |
30 class NodeExists(Error): | |
31 pass | |
32 | |
33 class NotImplemented(Error): | 22 class NotImplemented(Error): |
34 pass | 23 pass |
35 | 24 |
36 class NotSubscribed(Error): | 25 class NotSubscribed(Error): |
37 pass | 26 pass |
38 | 27 |
28 class InvalidConfigurationOption(Error): | |
29 msg = 'Invalid configuration option' | |
30 | |
31 class InvalidConfigurationValue(Error): | |
32 msg = 'Bad configuration value' | |
33 | |
39 class IBackendService(Interface): | 34 class IBackendService(Interface): |
40 """ Interface to a backend service of a pubsub service. """ | 35 """ Interface to a backend service of a pubsub service. """ |
41 | 36 |
42 def get_supported_affiliations(self): | 37 def __init__(self, storage): |
43 """ Reports the list of supported affiliation types. | 38 """ |
39 @param storage: L{storage} object. | |
40 """ | |
41 | |
42 def supports_publisher_affiliation(self): | |
43 """ Reports if the backend supports the publisher affiliation. | |
44 | 44 |
45 @return: a list of supported affiliation types. | 45 @rtype: C{bool} |
46 """ | |
47 | |
48 def supports_outcast_affiliation(self): | |
49 """ Reports if the backend supports the outcast affiliation. | |
50 | |
51 @rtype: C{bool} | |
52 """ | |
53 | |
54 def supports_persistent_items(self): | |
55 """ Reports if the backend supports persistent items. | |
56 | |
57 @rtype: C{bool} | |
58 """ | |
59 | |
60 def get_node_type(self, node_id): | |
61 """ Return type of a node. | |
62 | |
63 @return: a deferred that returns either 'leaf' or 'collection' | |
64 """ | |
65 | |
66 def get_nodes(self): | |
67 """ Returns list of all nodes. | |
68 | |
69 @return: a deferred that returns a C{list} of node ids. | |
70 """ | |
71 | |
72 def get_node_meta_data(self, node_id): | |
73 """ Return meta data for a node. | |
74 | |
75 @return: a deferred that returns a C{list} of C{dict}s with the | |
76 metadata. | |
46 """ | 77 """ |
47 | 78 |
48 class INodeCreationService(Interface): | 79 class INodeCreationService(Interface): |
49 """ A service for creating nodes """ | 80 """ A service for creating nodes """ |
50 | 81 |
164 items, else if C{item_ids} is not empty, return the items requested. | 195 items, else if C{item_ids} is not empty, return the items requested. |
165 If neither is given, return all items. | 196 If neither is given, return all items. |
166 | 197 |
167 @return: a deferred that returns the requested items | 198 @return: a deferred that returns the requested items |
168 """ | 199 """ |
169 | |
170 class BackendService(service.MultiService, utility.EventDispatcher): | |
171 | |
172 implements(IBackendService) | |
173 | |
174 options = {"pubsub#persist_items": | |
175 {"type": "boolean", | |
176 "label": "Persist items to storage"}, | |
177 "pubsub#deliver_payloads": | |
178 {"type": "boolean", | |
179 "label": "Deliver payloads with event notifications"}, | |
180 } | |
181 | |
182 default_config = {"pubsub#persist_items": True, | |
183 "pubsub#deliver_payloads": True, | |
184 } | |
185 | |
186 def __init__(self, storage): | |
187 service.MultiService.__init__(self) | |
188 utility.EventDispatcher.__init__(self) | |
189 self.storage = storage | |
190 | |
191 def supports_publisher_affiliation(self): | |
192 return True | |
193 | |
194 def supports_outcast_affiliation(self): | |
195 return True | |
196 | |
197 def supports_persistent_items(self): | |
198 return True | |
199 | |
200 def get_node_type(self, node_id): | |
201 return self.storage.get_node_type(node_id) | |
202 | |
203 def get_nodes(self): | |
204 return self.storage.get_nodes() | |
205 | |
206 def get_node_meta_data(self, node_id): | |
207 d = self.storage.get_node_configuration(node_id) | |
208 | |
209 d.addCallback(self._make_meta_data) | |
210 return d | |
211 | |
212 def _make_meta_data(self, meta_data): | |
213 options = [] | |
214 for key, value in meta_data.iteritems(): | |
215 if self.options.has_key(key): | |
216 option = {"var": key} | |
217 option.update(self.options[key]) | |
218 option["value"] = value | |
219 options.append(option) | |
220 | |
221 return options | |
222 | |
223 class PublishService(service.Service): | |
224 | |
225 implements(IPublishService) | |
226 | |
227 def publish(self, node_id, items, requestor): | |
228 d1 = self.parent.storage.get_node_configuration(node_id) | |
229 d2 = self.parent.storage.get_affiliation(node_id, requestor) | |
230 d = defer.DeferredList([d1, d2], fireOnOneErrback=1, consumeErrors=1) | |
231 d.addErrback(lambda x: x.value[0]) | |
232 d.addCallback(self._do_publish, node_id, items, requestor) | |
233 return d | |
234 | |
235 def _do_publish(self, result, node_id, items, requestor): | |
236 configuration = result[0][1] | |
237 persist_items = configuration["pubsub#persist_items"] | |
238 deliver_payloads = configuration["pubsub#deliver_payloads"] | |
239 affiliation = result[1][1] | |
240 | |
241 if affiliation not in ['owner', 'publisher']: | |
242 raise NotAuthorized | |
243 | |
244 if items and not persist_items and not deliver_payloads: | |
245 raise NoPayloadAllowed | |
246 elif not items and (persist_items or deliver_payloads): | |
247 raise PayloadExpected | |
248 | |
249 if persist_items or deliver_payloads: | |
250 for item in items: | |
251 if not item.getAttribute("id"): | |
252 item["id"] = sha.new(str(time.time()) + | |
253 requestor.full()).hexdigest() | |
254 | |
255 if persist_items: | |
256 d = self.parent.storage.store_items(node_id, items, | |
257 requestor) | |
258 else: | |
259 d = defer.succeed(None) | |
260 | |
261 d.addCallback(self._do_notify, node_id, items, deliver_payloads) | |
262 | |
263 def _do_notify(self, result, node_id, items, deliver_payloads): | |
264 if items and not deliver_payloads: | |
265 for item in items: | |
266 item.children = [] | |
267 | |
268 self.parent.dispatch({ 'items': items, 'node_id': node_id }, | |
269 '//event/pubsub/notify') | |
270 | |
271 class NotificationService(service.Service): | |
272 | |
273 implements(INotificationService) | |
274 | |
275 def get_notification_list(self, node_id, items): | |
276 d = self.parent.storage.get_subscribers(node_id) | |
277 d.addCallback(self._magic_filter, node_id, items) | |
278 return d | |
279 | |
280 def _magic_filter(self, subscribers, node_id, items): | |
281 list = {} | |
282 for subscriber in subscribers: | |
283 list[subscriber] = items | |
284 return list | |
285 | |
286 def register_notifier(self, observerfn, *args, **kwargs): | |
287 self.parent.addObserver('//event/pubsub/notify', observerfn, | |
288 *args, **kwargs) | |
289 | |
290 class SubscriptionService(service.Service): | |
291 | |
292 implements(ISubscriptionService) | |
293 | |
294 def subscribe(self, node_id, subscriber, requestor): | |
295 if subscriber.userhostJID() != requestor: | |
296 raise NotAuthorized | |
297 | |
298 d1 = self.parent.storage.get_node_configuration(node_id) | |
299 d2 = self.parent.storage.get_affiliation(node_id, subscriber) | |
300 d = defer.DeferredList([d1, d2], fireOnOneErrback=1, consumeErrors=1) | |
301 d.addErrback(lambda x: x.value[0]) | |
302 d.addCallback(self._do_subscribe, node_id, subscriber) | |
303 return d | |
304 | |
305 def _do_subscribe(self, result, node_id, subscriber): | |
306 configuration = result[0][1] | |
307 affiliation = result[1][1] | |
308 | |
309 if affiliation == 'outcast': | |
310 raise NotAuthorized | |
311 | |
312 d = self.parent.storage.add_subscription(node_id, subscriber, | |
313 'subscribed') | |
314 d.addCallback(self._return_subscription, affiliation) | |
315 return d | |
316 | |
317 def _return_subscription(self, result, affiliation): | |
318 result['affiliation'] = affiliation | |
319 return result | |
320 | |
321 def unsubscribe(self, node_id, subscriber, requestor): | |
322 if subscriber.userhostJID() != requestor: | |
323 raise NotAuthorized | |
324 | |
325 d = self.parent.storage.get_node_configuration(node_id) | |
326 d.addCallback(self._do_unsubscribe, node_id, subscriber) | |
327 return d | |
328 | |
329 def _do_unsubscribe(self, result, node_id, subscriber): | |
330 return self.parent.storage.remove_subscription(node_id, | |
331 subscriber) | |
332 | |
333 class NodeCreationService(service.Service): | |
334 | |
335 implements(INodeCreationService) | |
336 | |
337 def supports_instant_nodes(self): | |
338 return True | |
339 | |
340 def create_node(self, node_id, requestor): | |
341 if not node_id: | |
342 node_id = 'generic/%s' % sha.new(str(time.time()) + | |
343 requestor.full()).hexdigest() | |
344 | |
345 d = self.parent.storage.create_node(node_id, requestor) | |
346 d.addCallback(lambda _: node_id) | |
347 return d | |
348 | |
349 def get_node_configuration(self, node_id): | |
350 if node_id: | |
351 d = self.parent.storage.get_node_configuration(node_id) | |
352 else: | |
353 # XXX: this is disabled in pubsub.py | |
354 d = defer.succeed(self.parent.default_config) | |
355 | |
356 d.addCallback(self._make_config) | |
357 return d | |
358 | |
359 def _make_config(self, config): | |
360 options = [] | |
361 for key, value in self.parent.options.iteritems(): | |
362 option = {"var": key} | |
363 option.update(value) | |
364 if config.has_key(key): | |
365 option["value"] = config[key] | |
366 options.append(option) | |
367 | |
368 return options | |
369 | |
370 def set_node_configuration(self, node_id, options, requestor): | |
371 for key in options.iterkeys(): | |
372 if not self.parent.options.has_key(key): | |
373 raise InvalidConfigurationOption | |
374 | |
375 d = self.parent.storage.get_affiliation(node_id, requestor) | |
376 d.addCallback(self._do_set_node_configuration, node_id, options) | |
377 return d | |
378 | |
379 def _do_set_node_configuration(self, affiliation, node_id, options): | |
380 if affiliation != 'owner': | |
381 raise NotAuthorized | |
382 | |
383 return self.parent.storage.set_node_configuration(node_id, options) | |
384 | |
385 class AffiliationsService(service.Service): | |
386 | |
387 implements(IAffiliationsService) | |
388 | |
389 def get_affiliations(self, entity): | |
390 d1 = self.parent.storage.get_affiliations(entity) | |
391 d2 = self.parent.storage.get_subscriptions(entity) | |
392 d = defer.DeferredList([d1, d2], fireOnOneErrback=1, consumeErrors=1) | |
393 d.addErrback(lambda x: x.value[0]) | |
394 d.addCallback(self._affiliations_result, entity) | |
395 return d | |
396 | |
397 def _affiliations_result(self, result, entity): | |
398 affiliations = result[0][1] | |
399 subscriptions = result[1][1] | |
400 | |
401 new_affiliations = {} | |
402 | |
403 for node, affiliation in affiliations: | |
404 new_affiliations[(node, entity.full())] = {'node': node, | |
405 'jid': entity, | |
406 'affiliation': affiliation, | |
407 'subscription': None | |
408 } | |
409 | |
410 for node, subscriber, subscription in subscriptions: | |
411 key = node, subscriber.full() | |
412 if new_affiliations.has_key(key): | |
413 new_affiliations[key]['subscription'] = subscription | |
414 else: | |
415 new_affiliations[key] = {'node': node, | |
416 'jid': subscriber, | |
417 'affiliation': None, | |
418 'subscription': subscription} | |
419 | |
420 return new_affiliations.values() | |
421 | |
422 class ItemRetrievalService(service.Service): | |
423 | |
424 implements(IItemRetrievalService) | |
425 | |
426 def get_items(self, node_id, requestor, max_items=None, item_ids=[]): | |
427 d = self.parent.storage.is_subscribed(node_id, requestor) | |
428 d.addCallback(self._do_get_items, node_id, max_items, item_ids) | |
429 return d | |
430 | |
431 def _do_get_items(self, result, node_id, max_items, item_ids): | |
432 if not result: | |
433 raise NotAuthorized | |
434 | |
435 if item_ids: | |
436 return self.parent.storage.get_items_by_ids(node_id, item_ids) | |
437 else: | |
438 return self.parent.storage.get_items(node_id, max_items) | |
439 | |
440 class RetractionService(service.Service): | |
441 | |
442 implements(IRetractionService) | |
443 | |
444 def retract_item(self, node_id, item_ids, requestor): | |
445 d1 = self.parent.storage.get_node_configuration(node_id) | |
446 d2 = self.parent.storage.get_affiliation(node_id, requestor) | |
447 d = defer.DeferredList([d1, d2], fireOnOneErrback=1) | |
448 d.addErrback(lambda x: x.value[0]) | |
449 d.addCallback(self._do_retract, node_id, item_ids) | |
450 return d | |
451 | |
452 def _do_retract(self, result, node_id, item_ids): | |
453 configuration = result[0][1] | |
454 persist_items = configuration["persist_items"] | |
455 affiliation = result[1][1] | |
456 | |
457 if affiliation not in ['owner', 'publisher']: | |
458 raise NotAuthorized | |
459 | |
460 if not persist_items: | |
461 raise NodeNotPersistent | |
462 | |
463 d = self.parent.storage.remove_items(node_id, item_ids) | |
464 d.addCallback(self._do_notify_retraction, node_id) | |
465 return d | |
466 | |
467 def _do_notify_retraction(self, result, node_id): | |
468 self.parent.dispatch({ 'item_ids': result, 'node_id': node_id }, | |
469 '//event/pubsub/retract') | |
470 | |
471 def purge_node(self, node_id, requestor): | |
472 d1 = self.parent.storage.get_node_configuration(node_id) | |
473 d2 = self.parent.storage.get_affiliation(node_id, requestor) | |
474 d = defer.DeferredList([d1, d2], fireOnOneErrback=1) | |
475 d.addErrback(lambda x: x.value[0]) | |
476 d.addCallback(self._do_purge, node_id) | |
477 return d | |
478 | |
479 def _do_purge(self, result, node_id): | |
480 configuration = result[0][1] | |
481 persist_items = configuration["persist_items"] | |
482 affiliation = result[1][1] | |
483 | |
484 if affiliation != 'owner': | |
485 raise NotAuthorized | |
486 | |
487 if not persist_items: | |
488 raise NodeNotPersistent | |
489 | |
490 d = self.parent.storage.purge_node(node_id) | |
491 d.addCallback(self._do_notify_purge, node_id) | |
492 return d | |
493 | |
494 def _do_notify_purge(self, result, node_id): | |
495 self.parent.dispatch(node_id, '//event/pubsub/purge') | |
496 | |
497 class NodeDeletionService(service.Service): | |
498 | |
499 implements(INodeDeletionService) | |
500 | |
501 def __init__(self): | |
502 self._callback_list = [] | |
503 | |
504 def register_pre_delete(self, pre_delete_fn): | |
505 self._callback_list.append(pre_delete_fn) | |
506 | |
507 def get_subscribers(self, node_id): | |
508 return self.parent.storage.get_subscribers(node_id) | |
509 | |
510 def delete_node(self, node_id, requestor): | |
511 d1 = self.parent.storage.get_node_configuration(node_id) | |
512 d2 = self.parent.storage.get_affiliation(node_id, requestor) | |
513 d = defer.DeferredList([d1, d2], fireOnOneErrback=1) | |
514 d.addErrback(lambda x: x.value[0]) | |
515 d.addCallback(self._do_pre_delete, node_id) | |
516 return d | |
517 | |
518 def _do_pre_delete(self, result, node_id): | |
519 configuration = result[0][1] | |
520 persist_items = configuration["persist_items"] | |
521 affiliation = result[1][1] | |
522 | |
523 if affiliation != 'owner': | |
524 raise NotAuthorized | |
525 | |
526 d = defer.DeferredList([cb(node_id) for cb in self._callback_list], | |
527 consumeErrors=1) | |
528 d.addCallback(self._do_delete, node_id) | |
529 | |
530 def _do_delete(self, result, node_id): | |
531 dl = [] | |
532 for succeeded, r in result: | |
533 if succeeded and r: | |
534 dl.extend(r) | |
535 | |
536 d = self.parent.storage.delete_node(node_id) | |
537 d.addCallback(self._do_notify_delete, dl) | |
538 | |
539 return d | |
540 | |
541 def _do_notify_delete(self, result, dl): | |
542 for d in dl: | |
543 d.callback(None) |