Mercurial > libervia-pubsub
annotate idavoll/generic_backend.py @ 118:7d83fe9bdb65
Change storage.INode.add_subscription() to return a Failure when a subscription
already exists.
Added storage.INode.get_subscription() for retrieving the subscription state.
author | Ralph Meijer <ralphm@ik.nu> |
---|---|
date | Tue, 12 Apr 2005 12:15:44 +0000 |
parents | d252d793f0ed |
children | 43102fecb14b |
rev | line source |
---|---|
import sha
import time

from twisted.application import service
from twisted.internet import defer
from twisted.words.protocols.jabber import jid
from twisted.xish import utility
from zope.interface import implements

import backend
import storage
9 | |
10 def _get_affiliation(node, entity): | |
11 d = node.get_affiliation(entity) | |
12 d.addCallback(lambda affiliation: (node, affiliation)) | |
13 return d | |
14 | |
class BackendService(service.MultiService, utility.EventDispatcher):
    """Root backend service.

    Holds the storage object used by the child services and acts as the
    event dispatcher on which they dispatch and observe events.
    """

    implements(backend.IBackendService)

    # Description of every node configuration option known to this backend,
    # keyed by option name.
    options = {
        "pubsub#persist_items": {
            "type": "boolean",
            "label": "Persist items to storage",
        },
        "pubsub#deliver_payloads": {
            "type": "boolean",
            "label": "Deliver payloads with event notifications",
        },
    }

    # Configuration values reported when no specific node is asked for.
    default_config = {
        "pubsub#persist_items": True,
        "pubsub#deliver_payloads": True,
    }

    def __init__(self, storage):
        """Initialise both base classes and remember the storage object."""
        service.MultiService.__init__(self)
        utility.EventDispatcher.__init__(self)
        self.storage = storage

    def supports_publisher_affiliation(self):
        """Report support for the publisher affiliation (always True)."""
        return True

    def supports_outcast_affiliation(self):
        """Report support for the outcast affiliation (always True)."""
        return True

    def supports_persistent_items(self):
        """Report support for persistent items (always True)."""
        return True

    def get_node_type(self, node_id):
        """Return a deferred firing with the result of the node's get_type()."""
        def type_of(node):
            return node.get_type()

        deferred = self.storage.get_node(node_id)
        deferred.addCallback(type_of)
        return deferred

    def get_nodes(self):
        """Return whatever storage.get_node_ids() returns."""
        return self.storage.get_node_ids()

    def get_node_meta_data(self, node_id):
        """Return a deferred firing with the node's meta data as option dicts.

        The node's get_meta_data() result is converted by _make_meta_data().
        """
        def meta_data_of(node):
            return node.get_meta_data()

        deferred = self.storage.get_node(node_id)
        deferred.addCallback(meta_data_of)
        deferred.addCallback(self._make_meta_data)
        return deferred

    def _make_meta_data(self, meta_data):
        """Build a list of option dicts from a meta data mapping.

        Only keys that appear in self.options are kept.  Each resulting dict
        holds the key as "var", the entries of the option's description and
        the meta data value as "value".
        """
        known = self.options
        described = []
        for var, value in meta_data.iteritems():
            if var not in known:
                continue
            entry = {"var": var}
            entry.update(known[var])
            entry["value"] = value
            described.append(entry)

        return described
69 | |
class PublishService(service.Service):
    """Service that publishes items to a node and triggers notifications."""

    implements(backend.IPublishService)

    def publish(self, node_id, items, requestor):
        """Publish items to the node named node_id on behalf of requestor.

        Returns a deferred that fires once the items have been handled by
        _do_publish(), or fails with the error raised there or by storage.
        """
        d = self.parent.storage.get_node(node_id)
        d.addCallback(_get_affiliation, requestor)
        d.addCallback(self._do_publish, items, requestor)
        return d

    def _do_publish(self, result, items, requestor):
        """Check authorization and payload rules, then store and notify.

        result is the (node, affiliation) tuple produced by
        _get_affiliation().

        Raises backend.NotAuthorized when the affiliation is neither 'owner'
        nor 'publisher', backend.NoPayloadAllowed when items are given but
        the node neither persists nor delivers them, and
        backend.PayloadExpected when no items are given although the node
        persists or delivers them.

        Returns the deferred of the store-and-notify chain, so the caller of
        publish() waits for it and sees its failures.
        """
        node, affiliation = result
        configuration = node.get_configuration()
        persist_items = configuration["pubsub#persist_items"]
        deliver_payloads = configuration["pubsub#deliver_payloads"]

        if affiliation not in ['owner', 'publisher']:
            raise backend.NotAuthorized

        if items and not persist_items and not deliver_payloads:
            raise backend.NoPayloadAllowed
        elif not items and (persist_items or deliver_payloads):
            raise backend.PayloadExpected

        if persist_items or deliver_payloads:
            for item in items:
                if not item.getAttribute("id"):
                    # Generate an id from the current time and the requestor.
                    # NOTE(review): several id-less items in one request may
                    # get the same id if time.time() does not advance between
                    # iterations -- confirm whether that matters.
                    item["id"] = sha.new(str(time.time()) +
                                         requestor.full()).hexdigest()

        if persist_items:
            d = node.store_items(items, requestor)
        else:
            d = defer.succeed(None)

        d.addCallback(self._do_notify, node.id, items, deliver_payloads)
        # Return the deferred: without this the publish() chain continued
        # immediately and any storage failure was lost.
        return d

    def _do_notify(self, result, node_id, items, deliver_payloads):
        """Dispatch the '//event/pubsub/notify' event for the given items.

        When payloads are not to be delivered, the children of every item
        are removed first.
        """
        if items and not deliver_payloads:
            for item in items:
                item.children = []

        self.parent.dispatch({ 'items': items, 'node_id': node_id },
                             '//event/pubsub/notify')
114 | |
class NotificationService(service.Service):
    """Service that works out who is notified and registers notifiers."""

    implements(backend.INotificationService)

    def get_notification_list(self, node_id, items):
        """Return a deferred firing with a list of (subscriber, items) tuples.

        The subscribers come from the get_subscribers() method of the node
        named node_id.
        """
        def subscribers_of(node):
            return node.get_subscribers()

        deferred = self.parent.storage.get_node(node_id)
        deferred.addCallback(subscribers_of)
        deferred.addCallback(self._magic_filter, node_id, items)
        return deferred

    def _magic_filter(self, subscribers, node_id, items):
        """Pair every subscriber with the complete list of items."""
        return [(subscriber, items) for subscriber in subscribers]

    def register_notifier(self, observerfn, *args, **kwargs):
        """Register observerfn as observer of '//event/pubsub/notify'.

        Extra positional and keyword arguments are handed to addObserver().
        """
        self.parent.addObserver('//event/pubsub/notify', observerfn,
                                *args, **kwargs)
134 | |
class SubscriptionService(service.Service):
    """Service that handles subscribing to and unsubscribing from nodes."""

    implements(backend.ISubscriptionService)

    def subscribe(self, node_id, subscriber, requestor):
        """Subscribe subscriber to the node named node_id.

        The bare JID of subscriber (subscriber.userhostJID()) must equal
        requestor; otherwise the returned deferred fails with
        backend.NotAuthorized.

        On success the deferred fires with the dict built by
        _return_subscription().
        """
        subscriber_entity = subscriber.userhostJID()
        if subscriber_entity != requestor:
            # Wrap an exception instance: a Failure built from the class
            # itself would not match check()/trap() on backend.NotAuthorized.
            return defer.fail(backend.NotAuthorized())

        d = self.parent.storage.get_node(node_id)
        d.addCallback(_get_affiliation, subscriber_entity)
        d.addCallback(self._do_subscribe, subscriber)
        return d

    def _do_subscribe(self, result, subscriber):
        """Add the subscription, falling back to the existing one.

        result is the (node, affiliation) tuple produced by
        _get_affiliation().  Raises backend.NotAuthorized when the
        affiliation is 'outcast'.
        """
        node, affiliation = result

        if affiliation == 'outcast':
            raise backend.NotAuthorized

        d = node.add_subscription(subscriber, 'subscribed')
        d.addCallback(lambda _: 'subscribed')
        # When the subscription already exists, report its current state
        # instead of failing.
        d.addErrback(self._get_subscription, node, subscriber)
        d.addCallback(self._return_subscription, affiliation, node.id)
        return d

    def _get_subscription(self, failure, node, subscriber):
        """Errback: look up the state of an already existing subscription.

        Failures other than storage.SubscriptionExists are re-raised by
        failure.trap() and continue down the errback chain.
        """
        failure.trap(storage.SubscriptionExists)
        return node.get_subscription(subscriber)

    def _return_subscription(self, result, affiliation, node_id):
        """Build the result dict; result is used as the 'state' value."""
        return {'affiliation': affiliation,
                'node': node_id,
                'state': result}

    def unsubscribe(self, node_id, subscriber, requestor):
        """Remove the subscription of subscriber from the node named node_id.

        Raises backend.NotAuthorized synchronously when the bare JID of
        subscriber differs from requestor.  Otherwise returns a deferred
        that fires with the result of node.remove_subscription(subscriber).
        """
        if subscriber.userhostJID() != requestor:
            raise backend.NotAuthorized

        d = self.parent.storage.get_node(node_id)
        d.addCallback(lambda node: node.remove_subscription(subscriber))
        return d
177 | |
class NodeCreationService(service.Service):
    """Service that creates nodes and reads or changes their configuration."""

    implements(backend.INodeCreationService)

    def supports_instant_nodes(self):
        """Report support for instant nodes (always True)."""
        return True

    def create_node(self, node_id, requestor):
        """Create a node owned by requestor.

        When node_id is empty, an id of the form 'generic/<sha hexdigest>' is
        generated from the current time and the requestor's full JID.
        Returns a deferred that fires with the id of the created node.
        """
        if not node_id:
            digest = sha.new(str(time.time()) + requestor.full()).hexdigest()
            node_id = 'generic/%s' % digest

        def created(_):
            return node_id

        deferred = self.parent.storage.create_node(node_id, requestor)
        deferred.addCallback(created)
        return deferred

    def get_node_configuration(self, node_id):
        """Return a deferred firing with the configuration as option dicts.

        With an empty node_id the backend's default configuration is used
        instead of that of a stored node.
        """
        if not node_id:
            # XXX: this is disabled in pubsub.py
            deferred = defer.succeed(self.parent.default_config)
        else:
            def configuration_of(node):
                return node.get_configuration()

            deferred = self.parent.storage.get_node(node_id)
            deferred.addCallback(configuration_of)

        deferred.addCallback(self._make_config)
        return deferred

    def _make_config(self, config):
        """Describe every known option, adding its value when config has one.

        Each resulting dict holds the option name as "var", the entries of
        the option's description and, if present in config, the "value".
        """
        described = []
        for var, description in self.parent.options.iteritems():
            entry = {"var": var}
            entry.update(description)
            if var in config:
                entry["value"] = config[var]
            described.append(entry)

        return described

    def set_node_configuration(self, node_id, options, requestor):
        """Validate options and apply them to the node named node_id.

        Raises backend.InvalidConfigurationOption for an unknown option and
        backend.InvalidConfigurationValue when a boolean option's value
        cannot be converted with int().  Boolean values are converted in
        place in the options mapping that was passed in.

        Returns a deferred for the result of _do_set_node_configuration().
        """
        known = self.parent.options
        for var, raw in options.iteritems():
            if var not in known:
                raise backend.InvalidConfigurationOption
            if known[var]["type"] != 'boolean':
                continue
            try:
                options[var] = bool(int(raw))
            except ValueError:
                raise backend.InvalidConfigurationValue

        deferred = self.parent.storage.get_node(node_id)
        deferred.addCallback(_get_affiliation, requestor)
        deferred.addCallback(self._do_set_node_configuration, options)
        return deferred

    def _do_set_node_configuration(self, result, options):
        """Store options on the node; only an 'owner' may do so.

        result is the (node, affiliation) tuple produced by
        _get_affiliation().  Raises backend.NotAuthorized for any other
        affiliation.
        """
        node, affiliation = result

        if affiliation == 'owner':
            return node.set_configuration(options)

        raise backend.NotAuthorized
238 | |
class AffiliationsService(service.Service):
    """Service that reports an entity's affiliations and subscriptions."""

    implements(backend.IAffiliationsService)

    def get_affiliations(self, entity):
        """Return a deferred firing with the merged affiliation records.

        Affiliations and subscriptions of entity are requested from storage
        and combined by _affiliations_result().
        """
        pending = [self.parent.storage.get_affiliations(entity),
                   self.parent.storage.get_subscriptions(entity)]
        deferred = defer.DeferredList(pending, fireOnOneErrback=1,
                                      consumeErrors=1)
        # On failure, continue with element 0 of the failure's value
        # (presumably the failure of the request that went wrong -- confirm
        # against the Twisted version in use).
        deferred.addErrback(lambda x: x.value[0])
        deferred.addCallback(self._affiliations_result, entity)
        return deferred

    def _affiliations_result(self, result, entity):
        """Merge affiliations and subscriptions into one list of dicts.

        result is the DeferredList result: entry 0 carries the
        (node, affiliation) pairs, entry 1 the
        (node, subscriber, subscription) triples.  Records are keyed on
        (node, full JID); a subscription for which an affiliation record
        exists is added to that record, otherwise it gets a record of its
        own with affiliation None.
        """
        affiliations = result[0][1]
        subscriptions = result[1][1]

        merged = {}

        for node, affiliation in affiliations:
            record = {'node': node,
                      'jid': entity,
                      'affiliation': affiliation,
                      'subscription': None}
            merged[(node, entity.full())] = record

        for node, subscriber, subscription in subscriptions:
            key = (node, subscriber.full())
            if key not in merged:
                merged[key] = {'node': node,
                               'jid': subscriber,
                               'affiliation': None,
                               'subscription': subscription}
            else:
                merged[key]['subscription'] = subscription

        return merged.values()
275 | |
class ItemRetrievalService(service.Service):
    """Service that retrieves stored items for subscribed entities."""

    implements(backend.IItemRetrievalService)

    def get_items(self, node_id, requestor, max_items=None, item_ids=[]):
        """Return a deferred firing with items of the node named node_id.

        The requestor has to be subscribed to the node.  When item_ids is
        non-empty those items are fetched by id, otherwise up to max_items
        items are requested from the node.
        """
        deferred = self.parent.storage.get_node(node_id)
        deferred.addCallback(self._is_subscribed, requestor)
        deferred.addCallback(self._do_get_items, max_items, item_ids)
        return deferred

    def _is_subscribed(self, node, subscriber):
        """Return a deferred firing with a (node, subscribed) tuple."""
        def pair_with_node(subscribed):
            return (node, subscribed)

        deferred = node.is_subscribed(subscriber)
        deferred.addCallback(pair_with_node)
        return deferred

    def _do_get_items(self, result, max_items, item_ids):
        """Fetch the items; raises backend.NotAuthorized if not subscribed."""
        node, subscribed = result

        if not subscribed:
            raise backend.NotAuthorized

        if not item_ids:
            return node.get_items(max_items)

        return node.get_items_by_id(item_ids)
301 | |
class RetractionService(service.Service):
    """Service that retracts single items or purges all items of a node."""

    implements(backend.IRetractionService)

    def retract_item(self, node_id, item_ids, requestor):
        """Remove the items with the given ids from the node named node_id.

        Returns a deferred for the result of _do_retract().
        """
        deferred = self.parent.storage.get_node(node_id)
        deferred.addCallback(_get_affiliation, requestor)
        deferred.addCallback(self._do_retract, item_ids)
        return deferred

    def _do_retract(self, result, item_ids):
        """Remove the items and announce the retraction.

        result is the (node, affiliation) tuple produced by
        _get_affiliation().  Raises backend.NotAuthorized unless the
        affiliation is 'owner' or 'publisher', and backend.NodeNotPersistent
        when the node does not persist items.
        """
        node, affiliation = result
        persistent = node.get_configuration()["pubsub#persist_items"]

        if affiliation not in ['owner', 'publisher']:
            raise backend.NotAuthorized

        if not persistent:
            raise backend.NodeNotPersistent

        deferred = node.remove_items(item_ids)
        deferred.addCallback(self._do_notify_retraction, node.id)
        return deferred

    def _do_notify_retraction(self, result, node_id):
        """Dispatch '//event/pubsub/retract' with result as the item ids."""
        event = { 'item_ids': result, 'node_id': node_id }
        self.parent.dispatch(event, '//event/pubsub/retract')

    def purge_node(self, node_id, requestor):
        """Purge the node named node_id.

        Returns a deferred for the result of _do_purge().
        """
        deferred = self.parent.storage.get_node(node_id)
        deferred.addCallback(_get_affiliation, requestor)
        deferred.addCallback(self._do_purge)
        return deferred

    def _do_purge(self, result):
        """Purge the node and announce the purge.

        result is the (node, affiliation) tuple produced by
        _get_affiliation().  Raises backend.NotAuthorized unless the
        affiliation is 'owner', and backend.NodeNotPersistent when the node
        does not persist items.
        """
        node, affiliation = result
        persistent = node.get_configuration()["pubsub#persist_items"]

        if affiliation != 'owner':
            raise backend.NotAuthorized

        if not persistent:
            raise backend.NodeNotPersistent

        deferred = node.purge()
        deferred.addCallback(self._do_notify_purge, node.id)
        return deferred

    def _do_notify_purge(self, result, node_id):
        """Dispatch '//event/pubsub/purge' with the node id as event object."""
        self.parent.dispatch(node_id, '//event/pubsub/purge')
352 | |
class NodeDeletionService(service.Service):
    """Service that deletes nodes, running registered pre-delete hooks first."""

    implements(backend.INodeDeletionService)

    def __init__(self):
        # Functions to call with the node id before a node is deleted.
        self._callback_list = []

    def register_pre_delete(self, pre_delete_fn):
        """Register pre_delete_fn to be called before a node is deleted."""
        self._callback_list.append(pre_delete_fn)

    def get_subscribers(self, node_id):
        """Return a deferred firing with the node's get_subscribers() result."""
        d = self.parent.storage.get_node(node_id)
        d.addCallback(lambda node: node.get_subscribers())
        return d

    def delete_node(self, node_id, requestor):
        """Delete the node named node_id on behalf of requestor.

        Returns a deferred that fires when the pre-delete hooks have run and
        the node has been deleted, or fails with backend.NotAuthorized when
        requestor is not the node's owner.
        """
        d = self.parent.storage.get_node(node_id)
        d.addCallback(_get_affiliation, requestor)
        d.addCallback(self._do_pre_delete)
        return d

    def _do_pre_delete(self, result):
        """Run all pre-delete hooks, then continue with the deletion.

        result is the (node, affiliation) tuple produced by
        _get_affiliation().  Raises backend.NotAuthorized unless the
        affiliation is 'owner'.
        """
        node, affiliation = result

        if affiliation != 'owner':
            raise backend.NotAuthorized

        # Pass node.id: there is no node_id name in this scope, so the
        # previous code raised NameError as soon as a hook was registered.
        d = defer.DeferredList([cb(node.id) for cb in self._callback_list],
                               consumeErrors=1)
        d.addCallback(self._do_delete, node.id)
        # Return the deferred so delete_node() waits for the deletion and
        # reports its failures.
        return d

    def _do_delete(self, result, node_id):
        """Delete the node from storage.

        result is the DeferredList result of the pre-delete hooks.  The
        values of hooks that succeeded with a non-empty result are gathered
        into one list (presumably lists of deferreds -- confirm against the
        registered hooks) and handed to _do_notify_delete() afterwards.
        """
        dl = []
        for succeeded, r in result:
            if succeeded and r:
                dl.extend(r)

        d = self.parent.storage.delete_node(node_id)
        d.addCallback(self._do_notify_delete, dl)

        return d

    def _do_notify_delete(self, result, dl):
        """Call callback(None) on every object gathered from the hooks."""
        for d in dl:
            d.callback(None)