comparison idavoll/generic_backend.py @ 107:d252d793f0ed

Initial revision.
author Ralph Meijer <ralphm@ik.nu>
date Fri, 08 Apr 2005 10:15:02 +0000
parents
children 7d83fe9bdb65
comparison
equal deleted inserted replaced
106:dc36882d2620 107:d252d793f0ed
1 import sha
2 import time
3 from twisted.words.protocols.jabber import jid
4 from twisted.application import service
5 from twisted.xish import utility
6 from twisted.internet import defer
7 from zope.interface import implements
8 import backend
9
10 def _get_affiliation(node, entity):
11 d = node.get_affiliation(entity)
12 d.addCallback(lambda affiliation: (node, affiliation))
13 return d
14
class BackendService(service.MultiService, utility.EventDispatcher):
    """
    Backend service that answers node queries from a storage object.

    The instance is both a C{MultiService} and an C{EventDispatcher}; the
    storage object passed to the constructor is kept as C{self.storage}.
    """

    implements(backend.IBackendService)

    # Known node configuration options, keyed by field name. Each entry
    # holds the field's type and a human readable label.
    options = {
        "pubsub#persist_items": {
            "type": "boolean",
            "label": "Persist items to storage",
        },
        "pubsub#deliver_payloads": {
            "type": "boolean",
            "label": "Deliver payloads with event notifications",
        },
    }

    # Values handed out when a configuration is requested without a node
    # (see NodeCreationService.get_node_configuration in the original file).
    default_config = {
        "pubsub#persist_items": True,
        "pubsub#deliver_payloads": True,
    }

    def __init__(self, storage):
        """
        Initialise both base classes and remember C{storage}.
        """
        service.MultiService.__init__(self)
        utility.EventDispatcher.__init__(self)
        self.storage = storage

    def supports_publisher_affiliation(self):
        """
        Always returns C{True}.
        """
        return True

    def supports_outcast_affiliation(self):
        """
        Always returns C{True}.
        """
        return True

    def supports_persistent_items(self):
        """
        Always returns C{True}.
        """
        return True

    def get_node_type(self, node_id):
        """
        Return a deferred that fires with C{node.get_type()} for the node
        that storage returns for C{node_id}.
        """
        def ask_type(node):
            return node.get_type()

        lookup = self.storage.get_node(node_id)
        lookup.addCallback(ask_type)
        return lookup

    def get_nodes(self):
        """
        Return whatever C{self.storage.get_node_ids()} returns.
        """
        return self.storage.get_node_ids()

    def get_node_meta_data(self, node_id):
        """
        Return a deferred that fires with the node's meta data, rendered
        as a list of option dictionaries by L{_make_meta_data}.
        """
        def ask_meta_data(node):
            return node.get_meta_data()

        lookup = self.storage.get_node(node_id)
        lookup.addCallback(ask_meta_data)
        lookup.addCallback(self._make_meta_data)
        return lookup

    def _make_meta_data(self, meta_data):
        """
        Turn a meta data mapping into a list of option dictionaries.

        Only keys that appear in C{self.options} are kept. Each resulting
        dictionary has C{"var"} set to the key, the entries of the matching
        option description, and C{"value"} set to the meta data value.
        """
        fields = []
        for var, value in meta_data.iteritems():
            if var not in self.options:
                # Not a known option: leave it out of the result.
                continue

            field = {"var": var}
            field.update(self.options[var])
            field["value"] = value
            fields.append(field)

        return fields
69
class PublishService(service.Service):
    """
    Service that publishes items to a node and dispatches notifications.

    Expects C{self.parent} to provide C{storage} and C{dispatch}.
    """

    implements(backend.IPublishService)

    def publish(self, node_id, items, requestor):
        """
        Publish C{items} to the node C{node_id} on behalf of C{requestor}.

        Returns a deferred that fires once the items have been stored (when
        the node persists items) and the notify event has been dispatched.
        It fails with C{backend.NotAuthorized}, C{backend.NoPayloadAllowed}
        or C{backend.PayloadExpected} as raised by L{_do_publish}.
        """
        d = self.parent.storage.get_node(node_id)
        d.addCallback(_get_affiliation, requestor)
        d.addCallback(self._do_publish, items, requestor)
        return d

    def _do_publish(self, result, items, requestor):
        """
        Check permissions and payload rules, then store and notify.

        C{result} is the C{(node, affiliation)} tuple produced by
        C{_get_affiliation}.

        Raises C{backend.NotAuthorized} when the affiliation is neither
        C{'owner'} nor C{'publisher'}, C{backend.NoPayloadAllowed} when
        items are given but the node neither persists nor delivers them,
        and C{backend.PayloadExpected} when no items are given although the
        node persists or delivers them.
        """
        node, affiliation = result
        configuration = node.get_configuration()
        persist_items = configuration["pubsub#persist_items"]
        deliver_payloads = configuration["pubsub#deliver_payloads"]

        if affiliation not in ['owner', 'publisher']:
            raise backend.NotAuthorized

        if items and not persist_items and not deliver_payloads:
            raise backend.NoPayloadAllowed
        elif not items and (persist_items or deliver_payloads):
            raise backend.PayloadExpected

        if persist_items or deliver_payloads:
            # Give every item without an id a generated one. The position
            # in the batch is mixed into the hash because time.time() can
            # return the same value for consecutive calls, which would
            # otherwise hand the same id to several items of one publish.
            position = 0
            for item in items:
                if not item.getAttribute("id"):
                    item["id"] = sha.new(str(time.time()) +
                                         requestor.full() +
                                         str(position)).hexdigest()
                position += 1

        if persist_items:
            d = node.store_items(items, requestor)
        else:
            d = defer.succeed(None)

        d.addCallback(self._do_notify, node.id, items, deliver_payloads)
        # Return the deferred so that publish() only fires after storing
        # and notifying, and so that storage failures reach the caller.
        return d

    def _do_notify(self, result, node_id, items, deliver_payloads):
        """
        Dispatch the C{'//event/pubsub/notify'} event for C{items}.

        When payloads are not to be delivered, the children of every item
        are removed first.
        """
        if items and not deliver_payloads:
            for item in items:
                item.children = []

        self.parent.dispatch({ 'items': items, 'node_id': node_id },
                             '//event/pubsub/notify')
114
class NotificationService(service.Service):
    """
    Service that works out who gets notified and registers notifiers.

    Expects C{self.parent} to provide C{storage} and C{addObserver}.
    """

    implements(backend.INotificationService)

    def get_notification_list(self, node_id, items):
        """
        Return a deferred that fires with a list of C{(subscriber, items)}
        tuples, one for each subscriber of the node C{node_id}.
        """
        def ask_subscribers(node):
            return node.get_subscribers()

        lookup = self.parent.storage.get_node(node_id)
        lookup.addCallback(ask_subscribers)
        lookup.addCallback(self._magic_filter, node_id, items)
        return lookup

    def _magic_filter(self, subscribers, node_id, items):
        """
        Pair every subscriber with the complete C{items} sequence.

        C{node_id} is accepted but not used.
        """
        return [(subscriber, items) for subscriber in subscribers]

    def register_notifier(self, observerfn, *args, **kwargs):
        """
        Register C{observerfn} as observer of the
        C{'//event/pubsub/notify'} event on the parent, passing any extra
        arguments along.
        """
        event = '//event/pubsub/notify'
        self.parent.addObserver(event, observerfn, *args, **kwargs)
134
class SubscriptionService(service.Service):
    """
    Service that subscribes entities to nodes and unsubscribes them.

    Expects C{self.parent} to provide C{storage}. Authorization problems
    are always reported through a failed deferred, never by raising
    synchronously, so callers can handle them with a single errback.
    """

    implements(backend.ISubscriptionService)

    def subscribe(self, node_id, subscriber, requestor):
        """
        Subscribe C{subscriber} to the node C{node_id}.

        The request is only accepted when C{subscriber.userhostJID()}
        equals C{requestor}. Returns a deferred that fires with the result
        of C{node.add_subscription}, extended with an C{'affiliation'}
        entry, or fails with C{backend.NotAuthorized}.
        """
        subscriber_entity = subscriber.userhostJID()
        if subscriber_entity != requestor:
            # Pass an exception instance: a Failure built from the bare
            # class would record the class's metatype as its type and
            # could not be trapped as NotAuthorized.
            return defer.fail(backend.NotAuthorized())

        d = self.parent.storage.get_node(node_id)
        d.addCallback(_get_affiliation, subscriber_entity)
        d.addCallback(self._do_subscribe, subscriber)
        return d

    def _do_subscribe(self, result, subscriber):
        """
        Add the subscription unless the subscriber is an outcast.

        C{result} is the C{(node, affiliation)} tuple produced by
        C{_get_affiliation}. Raises C{backend.NotAuthorized} when the
        affiliation is C{'outcast'}.
        """
        node, affiliation = result

        if affiliation == 'outcast':
            raise backend.NotAuthorized

        d = node.add_subscription(subscriber, 'subscribed')
        d.addCallback(self._return_subscription, affiliation)
        return d

    def _return_subscription(self, result, affiliation):
        """
        Store C{affiliation} under the C{'affiliation'} key of C{result}
        and return C{result}.
        """
        result['affiliation'] = affiliation
        return result

    def unsubscribe(self, node_id, subscriber, requestor):
        """
        Remove the subscription of C{subscriber} to the node C{node_id}.

        The request is only accepted when C{subscriber.userhostJID()}
        equals C{requestor}. Returns a deferred that fires with the result
        of C{node.remove_subscription}, or fails with
        C{backend.NotAuthorized}.
        """
        if subscriber.userhostJID() != requestor:
            # Report the error the same way subscribe() does.
            return defer.fail(backend.NotAuthorized())

        d = self.parent.storage.get_node(node_id)
        d.addCallback(lambda node: node.remove_subscription(subscriber))
        return d
170
class NodeCreationService(service.Service):
    """
    Service that creates nodes and reads or changes their configuration.

    Expects C{self.parent} to provide C{storage}, C{options} and
    C{default_config}.
    """

    implements(backend.INodeCreationService)

    def supports_instant_nodes(self):
        """
        Always returns C{True}.
        """
        return True

    def create_node(self, node_id, requestor):
        """
        Create a node owned by C{requestor}.

        When C{node_id} is empty, an id of the form C{'generic/<hex>'} is
        generated from the current time and C{requestor.full()}. Returns a
        deferred that fires with the id of the created node.
        """
        if not node_id:
            seed = str(time.time()) + requestor.full()
            node_id = 'generic/%s' % sha.new(seed).hexdigest()

        def report_node_id(_):
            return node_id

        creation = self.parent.storage.create_node(node_id, requestor)
        creation.addCallback(report_node_id)
        return creation

    def get_node_configuration(self, node_id):
        """
        Return a deferred that fires with a list of option dictionaries.

        With a C{node_id} the values come from the node's configuration,
        without one they come from the parent's C{default_config}.
        """
        if not node_id:
            # XXX: this is disabled in pubsub.py
            config = defer.succeed(self.parent.default_config)
        else:
            config = self.parent.storage.get_node(node_id)
            config.addCallback(lambda node: node.get_configuration())

        config.addCallback(self._make_config)
        return config

    def _make_config(self, config):
        """
        Build one option dictionary for every option the parent knows.

        Each dictionary has C{"var"} set to the option name plus the
        entries of the option description. C{"value"} is only set when
        C{config} has an entry for that option.
        """
        fields = []
        for var, description in self.parent.options.iteritems():
            field = {"var": var}
            field.update(description)
            if var in config:
                field["value"] = config[var]
            fields.append(field)

        return fields

    def set_node_configuration(self, node_id, options, requestor):
        """
        Validate C{options} and apply them to the node C{node_id}.

        Raises C{backend.InvalidConfigurationOption} for an option the
        parent does not know, and C{backend.InvalidConfigurationValue} when
        the value of a boolean option cannot be converted with C{int}.
        Boolean values are replaced in C{options} itself by real booleans.

        Returns a deferred that fails with C{backend.NotAuthorized} when
        C{requestor} is not the owner of the node.
        """
        for key, value in options.iteritems():
            if key not in self.parent.options:
                raise backend.InvalidConfigurationOption

            if self.parent.options[key]["type"] != 'boolean':
                continue

            try:
                options[key] = bool(int(value))
            except ValueError:
                raise backend.InvalidConfigurationValue

        lookup = self.parent.storage.get_node(node_id)
        lookup.addCallback(_get_affiliation, requestor)
        lookup.addCallback(self._do_set_node_configuration, options)
        return lookup

    def _do_set_node_configuration(self, result, options):
        """
        Apply C{options} to the node when the affiliation is C{'owner'}.

        C{result} is the C{(node, affiliation)} tuple produced by
        C{_get_affiliation}. Raises C{backend.NotAuthorized} for any other
        affiliation.
        """
        node, affiliation = result

        if affiliation == 'owner':
            return node.set_configuration(options)

        raise backend.NotAuthorized
231
class AffiliationsService(service.Service):
    """
    Service that reports the affiliations and subscriptions of an entity.

    Expects C{self.parent} to provide C{storage}.
    """

    implements(backend.IAffiliationsService)

    def get_affiliations(self, entity):
        """
        Return a deferred that fires with a list of dictionaries that
        combine the affiliations and subscriptions storage holds for
        C{entity}.
        """
        def unwrap_first_error(failure):
            # The DeferredList fires its errback on the first error; hand
            # on the first element of the wrapped value.
            return failure.value[0]

        storage = self.parent.storage
        pending = [storage.get_affiliations(entity),
                   storage.get_subscriptions(entity)]

        combined = defer.DeferredList(pending, fireOnOneErrback=1,
                                      consumeErrors=1)
        combined.addErrback(unwrap_first_error)
        combined.addCallback(self._affiliations_result, entity)
        return combined

    def _affiliations_result(self, result, entity):
        """
        Merge affiliations and subscriptions into one list.

        C{result} is the DeferredList result: the second element of its
        first entry holds C{(node, affiliation)} tuples, the second element
        of its second entry holds C{(node, subscriber, subscription)}
        tuples. Entries are keyed on node and full JID, so a subscription
        for a key that already has an affiliation is merged into that
        entry.
        """
        affiliations = result[0][1]
        subscriptions = result[1][1]

        merged = {}

        for node, affiliation in affiliations:
            merged[(node, entity.full())] = {
                'node': node,
                'jid': entity,
                'affiliation': affiliation,
                'subscription': None,
            }

        for node, subscriber, subscription in subscriptions:
            key = (node, subscriber.full())
            entry = merged.get(key)
            if entry is None:
                merged[key] = {
                    'node': node,
                    'jid': subscriber,
                    'affiliation': None,
                    'subscription': subscription,
                }
            else:
                entry['subscription'] = subscription

        return merged.values()
268
class ItemRetrievalService(service.Service):
    """
    Service that retrieves items from a node for subscribed entities.

    Expects C{self.parent} to provide C{storage}.
    """

    implements(backend.IItemRetrievalService)

    def get_items(self, node_id, requestor, max_items=None, item_ids=None):
        """
        Retrieve items from the node C{node_id} for C{requestor}.

        When C{item_ids} is non-empty, those items are requested with
        C{node.get_items_by_id}; otherwise C{node.get_items(max_items)} is
        used. Returns a deferred that fails with C{backend.NotAuthorized}
        when C{requestor} is not subscribed to the node.
        """
        # None is used as default instead of a shared mutable list; an
        # omitted argument still means "no specific items".
        if item_ids is None:
            item_ids = []

        d = self.parent.storage.get_node(node_id)
        d.addCallback(self._is_subscribed, requestor)
        d.addCallback(self._do_get_items, max_items, item_ids)
        return d

    def _is_subscribed(self, node, subscriber):
        """
        Return a deferred that fires with C{(node, subscribed)}, where
        C{subscribed} is the result of C{node.is_subscribed(subscriber)}.
        """
        d = node.is_subscribed(subscriber)
        d.addCallback(lambda subscribed: (node, subscribed))
        return d

    def _do_get_items(self, result, max_items, item_ids):
        """
        Fetch the items once the subscription check has passed.

        C{result} is the C{(node, subscribed)} tuple from
        L{_is_subscribed}. Raises C{backend.NotAuthorized} when
        C{subscribed} is false.
        """
        node, subscribed = result

        if not subscribed:
            raise backend.NotAuthorized

        if item_ids:
            return node.get_items_by_id(item_ids)
        else:
            return node.get_items(max_items)
294
class RetractionService(service.Service):
    """
    Service that retracts single items from a node or purges all of them.

    Expects C{self.parent} to provide C{storage} and C{dispatch}.
    """

    implements(backend.IRetractionService)

    def retract_item(self, node_id, item_ids, requestor):
        """
        Remove the items C{item_ids} from the node C{node_id}.

        Returns a deferred that fails with C{backend.NotAuthorized} or
        C{backend.NodeNotPersistent} as raised by L{_do_retract}.
        """
        lookup = self.parent.storage.get_node(node_id)
        lookup.addCallback(_get_affiliation, requestor)
        lookup.addCallback(self._do_retract, item_ids)
        return lookup

    def _do_retract(self, result, item_ids):
        """
        Remove the items and dispatch the retraction event afterwards.

        C{result} is the C{(node, affiliation)} tuple produced by
        C{_get_affiliation}. Raises C{backend.NotAuthorized} unless the
        affiliation is C{'owner'} or C{'publisher'}, and
        C{backend.NodeNotPersistent} when the node does not persist items.
        """
        node, affiliation = result
        configuration = node.get_configuration()
        persistent = configuration["pubsub#persist_items"]

        if affiliation not in ['owner', 'publisher']:
            raise backend.NotAuthorized

        if not persistent:
            raise backend.NodeNotPersistent

        removal = node.remove_items(item_ids)
        removal.addCallback(self._do_notify_retraction, node.id)
        return removal

    def _do_notify_retraction(self, result, node_id):
        """
        Dispatch the C{'//event/pubsub/retract'} event, passing C{result}
        as the retracted item ids.
        """
        event = { 'item_ids': result, 'node_id': node_id }
        self.parent.dispatch(event, '//event/pubsub/retract')

    def purge_node(self, node_id, requestor):
        """
        Remove all items from the node C{node_id}.

        Returns a deferred that fails with C{backend.NotAuthorized} or
        C{backend.NodeNotPersistent} as raised by L{_do_purge}.
        """
        lookup = self.parent.storage.get_node(node_id)
        lookup.addCallback(_get_affiliation, requestor)
        lookup.addCallback(self._do_purge)
        return lookup

    def _do_purge(self, result):
        """
        Purge the node and dispatch the purge event afterwards.

        C{result} is the C{(node, affiliation)} tuple produced by
        C{_get_affiliation}. Raises C{backend.NotAuthorized} unless the
        affiliation is C{'owner'}, and C{backend.NodeNotPersistent} when
        the node does not persist items.
        """
        node, affiliation = result
        configuration = node.get_configuration()
        persistent = configuration["pubsub#persist_items"]

        if affiliation != 'owner':
            raise backend.NotAuthorized

        if not persistent:
            raise backend.NodeNotPersistent

        purge = node.purge()
        purge.addCallback(self._do_notify_purge, node.id)
        return purge

    def _do_notify_purge(self, result, node_id):
        """
        Dispatch the C{'//event/pubsub/purge'} event with C{node_id}.
        """
        self.parent.dispatch(node_id, '//event/pubsub/purge')
345
class NodeDeletionService(service.Service):
    """
    Service that deletes nodes, running registered hooks beforehand.

    Expects C{self.parent} to provide C{storage}.
    """

    implements(backend.INodeDeletionService)

    def __init__(self):
        # Functions to call with the node id before a node is deleted.
        self._callback_list = []

    def register_pre_delete(self, pre_delete_fn):
        """
        Register C{pre_delete_fn} to be called with the node id before a
        node is deleted.

        The function's result is put in a C{DeferredList}. When it succeeds
        with a non-empty result, that result is treated as a sequence of
        deferreds which are fired with C{None} after the deletion.
        """
        self._callback_list.append(pre_delete_fn)

    def get_subscribers(self, node_id):
        """
        Return a deferred that fires with C{node.get_subscribers()} for
        the node C{node_id}.
        """
        d = self.parent.storage.get_node(node_id)
        d.addCallback(lambda node: node.get_subscribers())
        return d

    def delete_node(self, node_id, requestor):
        """
        Delete the node C{node_id} on behalf of C{requestor}.

        Returns a deferred that fires after the pre-delete hooks have run,
        the node has been deleted from storage and the deferreds collected
        from the hooks have been fired. It fails with
        C{backend.NotAuthorized} when C{requestor} is not the owner.
        """
        d = self.parent.storage.get_node(node_id)
        d.addCallback(_get_affiliation, requestor)
        d.addCallback(self._do_pre_delete)
        return d

    def _do_pre_delete(self, result):
        """
        Check ownership and run all registered pre-delete functions.

        C{result} is the C{(node, affiliation)} tuple produced by
        C{_get_affiliation}. Raises C{backend.NotAuthorized} unless the
        affiliation is C{'owner'}.
        """
        node, affiliation = result

        if affiliation != 'owner':
            raise backend.NotAuthorized

        # The only node identifier available here is node.id; there is no
        # local name node_id in this method.
        d = defer.DeferredList([cb(node.id) for cb in self._callback_list],
                               consumeErrors=1)
        d.addCallback(self._do_delete, node.id)
        # Return the deferred so delete_node() waits for the deletion and
        # passes on its errors.
        return d

    def _do_delete(self, result, node_id):
        """
        Delete the node from storage and fire the collected deferreds.

        C{result} is the C{DeferredList} result of the pre-delete
        functions. Results of failed functions and empty results are
        skipped.
        """
        dl = []
        for succeeded, r in result:
            if succeeded and r:
                dl.extend(r)

        d = self.parent.storage.delete_node(node_id)
        d.addCallback(self._do_notify_delete, dl)

        return d

    def _do_notify_delete(self, result, dl):
        """
        Fire every deferred in C{dl} with C{None}.
        """
        for d in dl:
            d.callback(None)