Mercurial > libervia-pubsub
comparison idavoll/generic_backend.py @ 107:d252d793f0ed
Initial revision.
author | Ralph Meijer <ralphm@ik.nu> |
---|---|
date | Fri, 08 Apr 2005 10:15:02 +0000 |
parents | |
children | 7d83fe9bdb65 |
comparison
equal
deleted
inserted
replaced
106:dc36882d2620 | 107:d252d793f0ed |
---|---|
1 import sha | |
2 import time | |
3 from twisted.words.protocols.jabber import jid | |
4 from twisted.application import service | |
5 from twisted.xish import utility | |
6 from twisted.internet import defer | |
7 from zope.interface import implements | |
8 import backend | |
9 | |
10 def _get_affiliation(node, entity): | |
11 d = node.get_affiliation(entity) | |
12 d.addCallback(lambda affiliation: (node, affiliation)) | |
13 return d | |
14 | |
15 class BackendService(service.MultiService, utility.EventDispatcher): | |
16 | |
17 implements(backend.IBackendService) | |
18 | |
19 options = {"pubsub#persist_items": | |
20 {"type": "boolean", | |
21 "label": "Persist items to storage"}, | |
22 "pubsub#deliver_payloads": | |
23 {"type": "boolean", | |
24 "label": "Deliver payloads with event notifications"}, | |
25 } | |
26 | |
27 default_config = {"pubsub#persist_items": True, | |
28 "pubsub#deliver_payloads": True, | |
29 } | |
30 | |
31 def __init__(self, storage): | |
32 service.MultiService.__init__(self) | |
33 utility.EventDispatcher.__init__(self) | |
34 self.storage = storage | |
35 | |
36 def supports_publisher_affiliation(self): | |
37 return True | |
38 | |
39 def supports_outcast_affiliation(self): | |
40 return True | |
41 | |
42 def supports_persistent_items(self): | |
43 return True | |
44 | |
45 def get_node_type(self, node_id): | |
46 d = self.storage.get_node(node_id) | |
47 d.addCallback(lambda node: node.get_type()) | |
48 return d | |
49 | |
50 def get_nodes(self): | |
51 return self.storage.get_node_ids() | |
52 | |
53 def get_node_meta_data(self, node_id): | |
54 d = self.storage.get_node(node_id) | |
55 d.addCallback(lambda node: node.get_meta_data()) | |
56 d.addCallback(self._make_meta_data) | |
57 return d | |
58 | |
59 def _make_meta_data(self, meta_data): | |
60 options = [] | |
61 for key, value in meta_data.iteritems(): | |
62 if self.options.has_key(key): | |
63 option = {"var": key} | |
64 option.update(self.options[key]) | |
65 option["value"] = value | |
66 options.append(option) | |
67 | |
68 return options | |
69 | |
70 class PublishService(service.Service): | |
71 | |
72 implements(backend.IPublishService) | |
73 | |
74 def publish(self, node_id, items, requestor): | |
75 d = self.parent.storage.get_node(node_id) | |
76 d.addCallback(_get_affiliation, requestor) | |
77 d.addCallback(self._do_publish, items, requestor) | |
78 return d | |
79 | |
80 def _do_publish(self, result, items, requestor): | |
81 node, affiliation = result | |
82 configuration = node.get_configuration() | |
83 persist_items = configuration["pubsub#persist_items"] | |
84 deliver_payloads = configuration["pubsub#deliver_payloads"] | |
85 | |
86 if affiliation not in ['owner', 'publisher']: | |
87 raise backend.NotAuthorized | |
88 | |
89 if items and not persist_items and not deliver_payloads: | |
90 raise backend.NoPayloadAllowed | |
91 elif not items and (persist_items or deliver_payloads): | |
92 raise backend.PayloadExpected | |
93 | |
94 if persist_items or deliver_payloads: | |
95 for item in items: | |
96 if not item.getAttribute("id"): | |
97 item["id"] = sha.new(str(time.time()) + | |
98 requestor.full()).hexdigest() | |
99 | |
100 if persist_items: | |
101 d = node.store_items(items, requestor) | |
102 else: | |
103 d = defer.succeed(None) | |
104 | |
105 d.addCallback(self._do_notify, node.id, items, deliver_payloads) | |
106 | |
107 def _do_notify(self, result, node_id, items, deliver_payloads): | |
108 if items and not deliver_payloads: | |
109 for item in items: | |
110 item.children = [] | |
111 | |
112 self.parent.dispatch({ 'items': items, 'node_id': node_id }, | |
113 '//event/pubsub/notify') | |
114 | |
115 class NotificationService(service.Service): | |
116 | |
117 implements(backend.INotificationService) | |
118 | |
119 def get_notification_list(self, node_id, items): | |
120 d = self.parent.storage.get_node(node_id) | |
121 d.addCallback(lambda node: node.get_subscribers()) | |
122 d.addCallback(self._magic_filter, node_id, items) | |
123 return d | |
124 | |
125 def _magic_filter(self, subscribers, node_id, items): | |
126 list = [] | |
127 for subscriber in subscribers: | |
128 list.append((subscriber, items)) | |
129 return list | |
130 | |
131 def register_notifier(self, observerfn, *args, **kwargs): | |
132 self.parent.addObserver('//event/pubsub/notify', observerfn, | |
133 *args, **kwargs) | |
134 | |
135 class SubscriptionService(service.Service): | |
136 | |
137 implements(backend.ISubscriptionService) | |
138 | |
139 def subscribe(self, node_id, subscriber, requestor): | |
140 subscriber_entity = subscriber.userhostJID() | |
141 if subscriber_entity != requestor: | |
142 return defer.fail(backend.NotAuthorized) | |
143 | |
144 d = self.parent.storage.get_node(node_id) | |
145 d.addCallback(_get_affiliation, subscriber_entity) | |
146 d.addCallback(self._do_subscribe, subscriber) | |
147 return d | |
148 | |
149 def _do_subscribe(self, result, subscriber): | |
150 node, affiliation = result | |
151 | |
152 if affiliation == 'outcast': | |
153 raise backend.NotAuthorized | |
154 | |
155 d = node.add_subscription(subscriber, 'subscribed') | |
156 d.addCallback(self._return_subscription, affiliation) | |
157 return d | |
158 | |
159 def _return_subscription(self, result, affiliation): | |
160 result['affiliation'] = affiliation | |
161 return result | |
162 | |
163 def unsubscribe(self, node_id, subscriber, requestor): | |
164 if subscriber.userhostJID() != requestor: | |
165 raise backend.NotAuthorized | |
166 | |
167 d = self.parent.storage.get_node(node_id) | |
168 d.addCallback(lambda node: node.remove_subscription(subscriber)) | |
169 return d | |
170 | |
171 class NodeCreationService(service.Service): | |
172 | |
173 implements(backend.INodeCreationService) | |
174 | |
175 def supports_instant_nodes(self): | |
176 return True | |
177 | |
178 def create_node(self, node_id, requestor): | |
179 if not node_id: | |
180 node_id = 'generic/%s' % sha.new(str(time.time()) + | |
181 requestor.full()).hexdigest() | |
182 | |
183 d = self.parent.storage.create_node(node_id, requestor) | |
184 d.addCallback(lambda _: node_id) | |
185 return d | |
186 | |
187 def get_node_configuration(self, node_id): | |
188 if node_id: | |
189 d = self.parent.storage.get_node(node_id) | |
190 d.addCallback(lambda node: node.get_configuration()) | |
191 else: | |
192 # XXX: this is disabled in pubsub.py | |
193 d = defer.succeed(self.parent.default_config) | |
194 | |
195 d.addCallback(self._make_config) | |
196 return d | |
197 | |
198 def _make_config(self, config): | |
199 options = [] | |
200 for key, value in self.parent.options.iteritems(): | |
201 option = {"var": key} | |
202 option.update(value) | |
203 if config.has_key(key): | |
204 option["value"] = config[key] | |
205 options.append(option) | |
206 | |
207 return options | |
208 | |
209 def set_node_configuration(self, node_id, options, requestor): | |
210 for key, value in options.iteritems(): | |
211 if not self.parent.options.has_key(key): | |
212 raise backend.InvalidConfigurationOption | |
213 if self.parent.options[key]["type"] == 'boolean': | |
214 try: | |
215 options[key] = bool(int(value)) | |
216 except ValueError: | |
217 raise backend.InvalidConfigurationValue | |
218 | |
219 d = self.parent.storage.get_node(node_id) | |
220 d.addCallback(_get_affiliation, requestor) | |
221 d.addCallback(self._do_set_node_configuration, options) | |
222 return d | |
223 | |
224 def _do_set_node_configuration(self, result, options): | |
225 node, affiliation = result | |
226 | |
227 if affiliation != 'owner': | |
228 raise backend.NotAuthorized | |
229 | |
230 return node.set_configuration(options) | |
231 | |
232 class AffiliationsService(service.Service): | |
233 | |
234 implements(backend.IAffiliationsService) | |
235 | |
236 def get_affiliations(self, entity): | |
237 d1 = self.parent.storage.get_affiliations(entity) | |
238 d2 = self.parent.storage.get_subscriptions(entity) | |
239 d = defer.DeferredList([d1, d2], fireOnOneErrback=1, consumeErrors=1) | |
240 d.addErrback(lambda x: x.value[0]) | |
241 d.addCallback(self._affiliations_result, entity) | |
242 return d | |
243 | |
244 def _affiliations_result(self, result, entity): | |
245 affiliations = result[0][1] | |
246 subscriptions = result[1][1] | |
247 | |
248 new_affiliations = {} | |
249 | |
250 for node, affiliation in affiliations: | |
251 new_affiliations[(node, entity.full())] = {'node': node, | |
252 'jid': entity, | |
253 'affiliation': affiliation, | |
254 'subscription': None | |
255 } | |
256 | |
257 for node, subscriber, subscription in subscriptions: | |
258 key = node, subscriber.full() | |
259 if new_affiliations.has_key(key): | |
260 new_affiliations[key]['subscription'] = subscription | |
261 else: | |
262 new_affiliations[key] = {'node': node, | |
263 'jid': subscriber, | |
264 'affiliation': None, | |
265 'subscription': subscription} | |
266 | |
267 return new_affiliations.values() | |
268 | |
269 class ItemRetrievalService(service.Service): | |
270 | |
271 implements(backend.IItemRetrievalService) | |
272 | |
273 def get_items(self, node_id, requestor, max_items=None, item_ids=[]): | |
274 d = self.parent.storage.get_node(node_id) | |
275 d.addCallback(self._is_subscribed, requestor) | |
276 d.addCallback(self._do_get_items, max_items, item_ids) | |
277 return d | |
278 | |
279 def _is_subscribed(self, node, subscriber): | |
280 d = node.is_subscribed(subscriber) | |
281 d.addCallback(lambda subscribed: (node, subscribed)) | |
282 return d | |
283 | |
284 def _do_get_items(self, result, max_items, item_ids): | |
285 node, subscribed = result | |
286 | |
287 if not subscribed: | |
288 raise backend.NotAuthorized | |
289 | |
290 if item_ids: | |
291 return node.get_items_by_id(item_ids) | |
292 else: | |
293 return node.get_items(max_items) | |
294 | |
295 class RetractionService(service.Service): | |
296 | |
297 implements(backend.IRetractionService) | |
298 | |
299 def retract_item(self, node_id, item_ids, requestor): | |
300 d = self.parent.storage.get_node(node_id) | |
301 d.addCallback(_get_affiliation, requestor) | |
302 d.addCallback(self._do_retract, item_ids) | |
303 return d | |
304 | |
305 def _do_retract(self, result, item_ids): | |
306 node, affiliation = result | |
307 persist_items = node.get_configuration()["pubsub#persist_items"] | |
308 | |
309 if affiliation not in ['owner', 'publisher']: | |
310 raise backend.NotAuthorized | |
311 | |
312 if not persist_items: | |
313 raise backend.NodeNotPersistent | |
314 | |
315 d = node.remove_items(item_ids) | |
316 d.addCallback(self._do_notify_retraction, node.id) | |
317 return d | |
318 | |
319 def _do_notify_retraction(self, result, node_id): | |
320 self.parent.dispatch({ 'item_ids': result, 'node_id': node_id }, | |
321 '//event/pubsub/retract') | |
322 | |
323 def purge_node(self, node_id, requestor): | |
324 d = self.parent.storage.get_node(node_id) | |
325 d.addCallback(_get_affiliation, requestor) | |
326 d.addCallback(self._do_purge) | |
327 return d | |
328 | |
329 def _do_purge(self, result): | |
330 node, affiliation = result | |
331 persist_items = node.get_configuration()["pubsub#persist_items"] | |
332 | |
333 if affiliation != 'owner': | |
334 raise backend.NotAuthorized | |
335 | |
336 if not persist_items: | |
337 raise backend.NodeNotPersistent | |
338 | |
339 d = node.purge() | |
340 d.addCallback(self._do_notify_purge, node.id) | |
341 return d | |
342 | |
343 def _do_notify_purge(self, result, node_id): | |
344 self.parent.dispatch(node_id, '//event/pubsub/purge') | |
345 | |
346 class NodeDeletionService(service.Service): | |
347 | |
348 implements(backend.INodeDeletionService) | |
349 | |
350 def __init__(self): | |
351 self._callback_list = [] | |
352 | |
353 def register_pre_delete(self, pre_delete_fn): | |
354 self._callback_list.append(pre_delete_fn) | |
355 | |
356 def get_subscribers(self, node_id): | |
357 d = self.parent.storage.get_node(node_id) | |
358 d.addCallback(lambda node: node.get_subscribers()) | |
359 return d | |
360 | |
361 def delete_node(self, node_id, requestor): | |
362 d = self.parent.storage.get_node(node_id) | |
363 d.addCallback(_get_affiliation, requestor) | |
364 d.addCallback(self._do_pre_delete) | |
365 return d | |
366 | |
367 def _do_pre_delete(self, result): | |
368 node, affiliation = result | |
369 | |
370 if affiliation != 'owner': | |
371 raise backend.NotAuthorized | |
372 | |
373 d = defer.DeferredList([cb(node_id) for cb in self._callback_list], | |
374 consumeErrors=1) | |
375 d.addCallback(self._do_delete, node.id) | |
376 | |
377 def _do_delete(self, result, node_id): | |
378 dl = [] | |
379 for succeeded, r in result: | |
380 if succeeded and r: | |
381 dl.extend(r) | |
382 | |
383 d = self.parent.storage.delete_node(node_id) | |
384 d.addCallback(self._do_notify_delete, dl) | |
385 | |
386 return d | |
387 | |
388 def _do_notify_delete(self, result, dl): | |
389 for d in dl: | |
390 d.callback(None) |