Mercurial > libervia-pubsub
annotate idavoll/generic_backend.py @ 129:43102fecb14b
Fix some typos.
Don't expect ids of removed items back from storage, but use the list we got
ourselves.
author | Ralph Meijer <ralphm@ik.nu> |
---|---|
date | Sun, 24 Apr 2005 17:24:35 +0000 |
parents | 7d83fe9bdb65 |
children | 812300cdbc22 |
rev | line source |
---|---|
107 | 1 import sha |
2 import time | |
3 from twisted.words.protocols.jabber import jid | |
4 from twisted.application import service | |
5 from twisted.xish import utility | |
6 from twisted.internet import defer | |
7 from zope.interface import implements | |
129 | 8 import backend, storage |
107 | 9 |
10 def _get_affiliation(node, entity): | |
11 d = node.get_affiliation(entity) | |
12 d.addCallback(lambda affiliation: (node, affiliation)) | |
13 return d | |
14 | |
15 class BackendService(service.MultiService, utility.EventDispatcher): | |
16 | |
17 implements(backend.IBackendService) | |
18 | |
19 options = {"pubsub#persist_items": | |
20 {"type": "boolean", | |
21 "label": "Persist items to storage"}, | |
22 "pubsub#deliver_payloads": | |
23 {"type": "boolean", | |
24 "label": "Deliver payloads with event notifications"}, | |
25 } | |
26 | |
27 default_config = {"pubsub#persist_items": True, | |
28 "pubsub#deliver_payloads": True, | |
29 } | |
30 | |
31 def __init__(self, storage): | |
32 service.MultiService.__init__(self) | |
33 utility.EventDispatcher.__init__(self) | |
34 self.storage = storage | |
35 | |
36 def supports_publisher_affiliation(self): | |
37 return True | |
38 | |
39 def supports_outcast_affiliation(self): | |
40 return True | |
41 | |
42 def supports_persistent_items(self): | |
43 return True | |
44 | |
45 def get_node_type(self, node_id): | |
46 d = self.storage.get_node(node_id) | |
47 d.addCallback(lambda node: node.get_type()) | |
48 return d | |
49 | |
50 def get_nodes(self): | |
51 return self.storage.get_node_ids() | |
52 | |
53 def get_node_meta_data(self, node_id): | |
54 d = self.storage.get_node(node_id) | |
55 d.addCallback(lambda node: node.get_meta_data()) | |
56 d.addCallback(self._make_meta_data) | |
57 return d | |
58 | |
59 def _make_meta_data(self, meta_data): | |
60 options = [] | |
61 for key, value in meta_data.iteritems(): | |
62 if self.options.has_key(key): | |
63 option = {"var": key} | |
64 option.update(self.options[key]) | |
65 option["value"] = value | |
66 options.append(option) | |
67 | |
68 return options | |
69 | |
70 class PublishService(service.Service): | |
71 | |
72 implements(backend.IPublishService) | |
73 | |
74 def publish(self, node_id, items, requestor): | |
75 d = self.parent.storage.get_node(node_id) | |
76 d.addCallback(_get_affiliation, requestor) | |
77 d.addCallback(self._do_publish, items, requestor) | |
78 return d | |
79 | |
80 def _do_publish(self, result, items, requestor): | |
81 node, affiliation = result | |
82 configuration = node.get_configuration() | |
83 persist_items = configuration["pubsub#persist_items"] | |
84 deliver_payloads = configuration["pubsub#deliver_payloads"] | |
85 | |
86 if affiliation not in ['owner', 'publisher']: | |
87 raise backend.NotAuthorized | |
88 | |
89 if items and not persist_items and not deliver_payloads: | |
90 raise backend.NoPayloadAllowed | |
91 elif not items and (persist_items or deliver_payloads): | |
92 raise backend.PayloadExpected | |
93 | |
94 if persist_items or deliver_payloads: | |
95 for item in items: | |
96 if not item.getAttribute("id"): | |
97 item["id"] = sha.new(str(time.time()) + | |
98 requestor.full()).hexdigest() | |
99 | |
100 if persist_items: | |
101 d = node.store_items(items, requestor) | |
102 else: | |
103 d = defer.succeed(None) | |
104 | |
105 d.addCallback(self._do_notify, node.id, items, deliver_payloads) | |
129 | 106 return d |
107 | 107 |
108 def _do_notify(self, result, node_id, items, deliver_payloads): | |
109 if items and not deliver_payloads: | |
110 for item in items: | |
111 item.children = [] | |
112 | |
113 self.parent.dispatch({ 'items': items, 'node_id': node_id }, | |
114 '//event/pubsub/notify') | |
115 | |
116 class NotificationService(service.Service): | |
117 | |
118 implements(backend.INotificationService) | |
119 | |
120 def get_notification_list(self, node_id, items): | |
121 d = self.parent.storage.get_node(node_id) | |
122 d.addCallback(lambda node: node.get_subscribers()) | |
123 d.addCallback(self._magic_filter, node_id, items) | |
124 return d | |
125 | |
126 def _magic_filter(self, subscribers, node_id, items): | |
127 list = [] | |
128 for subscriber in subscribers: | |
129 list.append((subscriber, items)) | |
130 return list | |
131 | |
132 def register_notifier(self, observerfn, *args, **kwargs): | |
133 self.parent.addObserver('//event/pubsub/notify', observerfn, | |
134 *args, **kwargs) | |
135 | |
136 class SubscriptionService(service.Service): | |
137 | |
138 implements(backend.ISubscriptionService) | |
139 | |
140 def subscribe(self, node_id, subscriber, requestor): | |
141 subscriber_entity = subscriber.userhostJID() | |
142 if subscriber_entity != requestor: | |
143 return defer.fail(backend.NotAuthorized) | |
144 | |
145 d = self.parent.storage.get_node(node_id) | |
146 d.addCallback(_get_affiliation, subscriber_entity) | |
147 d.addCallback(self._do_subscribe, subscriber) | |
148 return d | |
149 | |
150 def _do_subscribe(self, result, subscriber): | |
151 node, affiliation = result | |
152 | |
153 if affiliation == 'outcast': | |
154 raise backend.NotAuthorized | |
155 | |
156 d = node.add_subscription(subscriber, 'subscribed') | |
118
7d83fe9bdb65
Change storage.INode.add_subscription() to return a Failure when a subscription
Ralph Meijer <ralphm@ik.nu>
parents:
107
diff
changeset
|
157 d.addCallback(lambda _: 'subscribed') |
129 | 158 d.addErrback(self._get_subscription, node, subscriber) |
118
7d83fe9bdb65
Change storage.INode.add_subscription() to return a Failure when a subscription
Ralph Meijer <ralphm@ik.nu>
parents:
107
diff
changeset
|
159 d.addCallback(self._return_subscription, affiliation, node.id) |
107 | 160 return d |
161 | |
129 | 162 def _get_subscription(self, failure, node, subscriber): |
163 failure.trap(storage.SubscriptionExists) | |
118
7d83fe9bdb65
Change storage.INode.add_subscription() to return a Failure when a subscription
Ralph Meijer <ralphm@ik.nu>
parents:
107
diff
changeset
|
164 return node.get_subscription(subscriber) |
7d83fe9bdb65
Change storage.INode.add_subscription() to return a Failure when a subscription
Ralph Meijer <ralphm@ik.nu>
parents:
107
diff
changeset
|
165 |
7d83fe9bdb65
Change storage.INode.add_subscription() to return a Failure when a subscription
Ralph Meijer <ralphm@ik.nu>
parents:
107
diff
changeset
|
166 def _return_subscription(self, result, affiliation, node_id): |
7d83fe9bdb65
Change storage.INode.add_subscription() to return a Failure when a subscription
Ralph Meijer <ralphm@ik.nu>
parents:
107
diff
changeset
|
167 return {'affiliation': affiliation, |
7d83fe9bdb65
Change storage.INode.add_subscription() to return a Failure when a subscription
Ralph Meijer <ralphm@ik.nu>
parents:
107
diff
changeset
|
168 'node': node_id, |
7d83fe9bdb65
Change storage.INode.add_subscription() to return a Failure when a subscription
Ralph Meijer <ralphm@ik.nu>
parents:
107
diff
changeset
|
169 'state': result} |
107 | 170 |
171 def unsubscribe(self, node_id, subscriber, requestor): | |
172 if subscriber.userhostJID() != requestor: | |
173 raise backend.NotAuthorized | |
174 | |
175 d = self.parent.storage.get_node(node_id) | |
176 d.addCallback(lambda node: node.remove_subscription(subscriber)) | |
177 return d | |
178 | |
179 class NodeCreationService(service.Service): | |
180 | |
181 implements(backend.INodeCreationService) | |
182 | |
183 def supports_instant_nodes(self): | |
184 return True | |
185 | |
186 def create_node(self, node_id, requestor): | |
187 if not node_id: | |
188 node_id = 'generic/%s' % sha.new(str(time.time()) + | |
189 requestor.full()).hexdigest() | |
190 | |
191 d = self.parent.storage.create_node(node_id, requestor) | |
192 d.addCallback(lambda _: node_id) | |
193 return d | |
194 | |
195 def get_node_configuration(self, node_id): | |
196 if node_id: | |
197 d = self.parent.storage.get_node(node_id) | |
198 d.addCallback(lambda node: node.get_configuration()) | |
199 else: | |
200 # XXX: this is disabled in pubsub.py | |
201 d = defer.succeed(self.parent.default_config) | |
202 | |
203 d.addCallback(self._make_config) | |
204 return d | |
205 | |
206 def _make_config(self, config): | |
207 options = [] | |
208 for key, value in self.parent.options.iteritems(): | |
209 option = {"var": key} | |
210 option.update(value) | |
211 if config.has_key(key): | |
212 option["value"] = config[key] | |
213 options.append(option) | |
214 | |
215 return options | |
216 | |
217 def set_node_configuration(self, node_id, options, requestor): | |
218 for key, value in options.iteritems(): | |
219 if not self.parent.options.has_key(key): | |
220 raise backend.InvalidConfigurationOption | |
221 if self.parent.options[key]["type"] == 'boolean': | |
222 try: | |
223 options[key] = bool(int(value)) | |
224 except ValueError: | |
225 raise backend.InvalidConfigurationValue | |
226 | |
227 d = self.parent.storage.get_node(node_id) | |
228 d.addCallback(_get_affiliation, requestor) | |
229 d.addCallback(self._do_set_node_configuration, options) | |
230 return d | |
231 | |
232 def _do_set_node_configuration(self, result, options): | |
233 node, affiliation = result | |
234 | |
235 if affiliation != 'owner': | |
236 raise backend.NotAuthorized | |
237 | |
238 return node.set_configuration(options) | |
239 | |
240 class AffiliationsService(service.Service): | |
241 | |
242 implements(backend.IAffiliationsService) | |
243 | |
244 def get_affiliations(self, entity): | |
245 d1 = self.parent.storage.get_affiliations(entity) | |
246 d2 = self.parent.storage.get_subscriptions(entity) | |
247 d = defer.DeferredList([d1, d2], fireOnOneErrback=1, consumeErrors=1) | |
248 d.addErrback(lambda x: x.value[0]) | |
249 d.addCallback(self._affiliations_result, entity) | |
250 return d | |
251 | |
252 def _affiliations_result(self, result, entity): | |
253 affiliations = result[0][1] | |
254 subscriptions = result[1][1] | |
255 | |
256 new_affiliations = {} | |
257 | |
258 for node, affiliation in affiliations: | |
259 new_affiliations[(node, entity.full())] = {'node': node, | |
260 'jid': entity, | |
261 'affiliation': affiliation, | |
262 'subscription': None | |
263 } | |
264 | |
265 for node, subscriber, subscription in subscriptions: | |
266 key = node, subscriber.full() | |
267 if new_affiliations.has_key(key): | |
268 new_affiliations[key]['subscription'] = subscription | |
269 else: | |
270 new_affiliations[key] = {'node': node, | |
271 'jid': subscriber, | |
272 'affiliation': None, | |
273 'subscription': subscription} | |
274 | |
275 return new_affiliations.values() | |
276 | |
277 class ItemRetrievalService(service.Service): | |
278 | |
279 implements(backend.IItemRetrievalService) | |
280 | |
281 def get_items(self, node_id, requestor, max_items=None, item_ids=[]): | |
282 d = self.parent.storage.get_node(node_id) | |
283 d.addCallback(self._is_subscribed, requestor) | |
284 d.addCallback(self._do_get_items, max_items, item_ids) | |
285 return d | |
286 | |
287 def _is_subscribed(self, node, subscriber): | |
288 d = node.is_subscribed(subscriber) | |
289 d.addCallback(lambda subscribed: (node, subscribed)) | |
290 return d | |
291 | |
292 def _do_get_items(self, result, max_items, item_ids): | |
293 node, subscribed = result | |
294 | |
295 if not subscribed: | |
296 raise backend.NotAuthorized | |
297 | |
298 if item_ids: | |
299 return node.get_items_by_id(item_ids) | |
300 else: | |
301 return node.get_items(max_items) | |
302 | |
303 class RetractionService(service.Service): | |
304 | |
305 implements(backend.IRetractionService) | |
306 | |
307 def retract_item(self, node_id, item_ids, requestor): | |
308 d = self.parent.storage.get_node(node_id) | |
309 d.addCallback(_get_affiliation, requestor) | |
310 d.addCallback(self._do_retract, item_ids) | |
311 return d | |
312 | |
313 def _do_retract(self, result, item_ids): | |
314 node, affiliation = result | |
315 persist_items = node.get_configuration()["pubsub#persist_items"] | |
316 | |
317 if affiliation not in ['owner', 'publisher']: | |
318 raise backend.NotAuthorized | |
319 | |
320 if not persist_items: | |
321 raise backend.NodeNotPersistent | |
322 | |
323 d = node.remove_items(item_ids) | |
129 | 324 d.addCallback(self._do_notify_retraction, item_ids, node.id) |
107 | 325 return d |
326 | |
129 | 327 def _do_notify_retraction(self, result, item_ids, node_id): |
328 self.parent.dispatch({ 'item_ids': item_ids, 'node_id': node_id }, | |
107 | 329 '//event/pubsub/retract') |
330 | |
331 def purge_node(self, node_id, requestor): | |
332 d = self.parent.storage.get_node(node_id) | |
333 d.addCallback(_get_affiliation, requestor) | |
334 d.addCallback(self._do_purge) | |
335 return d | |
336 | |
337 def _do_purge(self, result): | |
338 node, affiliation = result | |
339 persist_items = node.get_configuration()["pubsub#persist_items"] | |
340 | |
341 if affiliation != 'owner': | |
342 raise backend.NotAuthorized | |
343 | |
344 if not persist_items: | |
345 raise backend.NodeNotPersistent | |
346 | |
347 d = node.purge() | |
348 d.addCallback(self._do_notify_purge, node.id) | |
349 return d | |
350 | |
351 def _do_notify_purge(self, result, node_id): | |
352 self.parent.dispatch(node_id, '//event/pubsub/purge') | |
353 | |
354 class NodeDeletionService(service.Service): | |
355 | |
356 implements(backend.INodeDeletionService) | |
357 | |
358 def __init__(self): | |
359 self._callback_list = [] | |
360 | |
361 def register_pre_delete(self, pre_delete_fn): | |
362 self._callback_list.append(pre_delete_fn) | |
363 | |
364 def get_subscribers(self, node_id): | |
365 d = self.parent.storage.get_node(node_id) | |
366 d.addCallback(lambda node: node.get_subscribers()) | |
367 return d | |
368 | |
369 def delete_node(self, node_id, requestor): | |
370 d = self.parent.storage.get_node(node_id) | |
371 d.addCallback(_get_affiliation, requestor) | |
372 d.addCallback(self._do_pre_delete) | |
373 return d | |
374 | |
375 def _do_pre_delete(self, result): | |
376 node, affiliation = result | |
377 | |
378 if affiliation != 'owner': | |
379 raise backend.NotAuthorized | |
380 | |
381 d = defer.DeferredList([cb(node_id) for cb in self._callback_list], | |
382 consumeErrors=1) | |
383 d.addCallback(self._do_delete, node.id) | |
384 | |
385 def _do_delete(self, result, node_id): | |
386 dl = [] | |
387 for succeeded, r in result: | |
388 if succeeded and r: | |
389 dl.extend(r) | |
390 | |
391 d = self.parent.storage.delete_node(node_id) | |
392 d.addCallback(self._do_notify_delete, dl) | |
393 | |
394 return d | |
395 | |
396 def _do_notify_delete(self, result, dl): | |
397 for d in dl: | |
398 d.callback(None) |