107
|
1 import sha |
|
2 import time |
|
3 from twisted.words.protocols.jabber import jid |
|
4 from twisted.application import service |
|
5 from twisted.xish import utility |
|
6 from twisted.internet import defer |
|
7 from zope.interface import implements |
|
8 import backend |
|
9 |
|
def _get_affiliation(node, entity):
    """Fetch the affiliation of entity on node, keeping node in the result.

    Calls node.get_affiliation(entity) and returns the deferred it yields,
    extended with a callback that turns the affiliation into the tuple
    (node, affiliation), so that later callbacks have access to both.
    """
    def pair_with_node(affiliation):
        return (node, affiliation)

    lookup = node.get_affiliation(entity)
    lookup.addCallback(pair_with_node)
    return lookup
|
14 |
|
class BackendService(service.MultiService, utility.EventDispatcher):
    """Pubsub backend root: a MultiService that is also an EventDispatcher.

    Holds the storage object and the table of known node configuration
    options. The other services in this module reach these through
    self.parent (presumably an instance of this class -- verify against
    the code that wires the services together).
    """

    implements(backend.IBackendService)

    # Known node configuration options, keyed by option name. Each value
    # describes the option (its type and a human readable label).
    options = {
        "pubsub#persist_items": {
            "type": "boolean",
            "label": "Persist items to storage",
        },
        "pubsub#deliver_payloads": {
            "type": "boolean",
            "label": "Deliver payloads with event notifications",
        },
    }

    # Configuration values reported when no node is specified.
    default_config = {
        "pubsub#persist_items": True,
        "pubsub#deliver_payloads": True,
    }

    def __init__(self, storage):
        """Initialise both base classes and remember the storage object."""
        service.MultiService.__init__(self)
        utility.EventDispatcher.__init__(self)
        self.storage = storage

    def supports_publisher_affiliation(self):
        """Report that the 'publisher' affiliation is supported."""
        return True

    def supports_outcast_affiliation(self):
        """Report that the 'outcast' affiliation is supported."""
        return True

    def supports_persistent_items(self):
        """Report that persistent items are supported."""
        return True

    def get_node_type(self, node_id):
        """Return a deferred firing with get_type() of the node node_id."""
        def type_of(node):
            return node.get_type()

        lookup = self.storage.get_node(node_id)
        lookup.addCallback(type_of)
        return lookup

    def get_nodes(self):
        """Return whatever storage.get_node_ids() returns."""
        return self.storage.get_node_ids()

    def get_node_meta_data(self, node_id):
        """Return a deferred firing with the described meta data of a node.

        The node's get_meta_data() result is passed through
        _make_meta_data before being handed to the caller.
        """
        def meta_data_of(node):
            return node.get_meta_data()

        lookup = self.storage.get_node(node_id)
        lookup.addCallback(meta_data_of)
        lookup.addCallback(self._make_meta_data)
        return lookup

    def _make_meta_data(self, meta_data):
        """Turn a meta data mapping into a list of option descriptions.

        Only keys that appear in self.options are kept. Each resulting
        dictionary holds the option name under "var", the fields of the
        option's description, and the current value under "value".
        """
        described = []
        for name, current in meta_data.iteritems():
            if name not in self.options:
                # Not a known option: leave it out of the result.
                continue
            entry = {"var": name}
            entry.update(self.options[name])
            entry["value"] = current
            described.append(entry)

        return described
|
69 |
|
class PublishService(service.Service):
    """Service that publishes items to a node and dispatches notifications."""

    implements(backend.IPublishService)

    def publish(self, node_id, items, requestor):
        """Publish items to the node node_id on behalf of requestor.

        Returns a deferred that fires once the items have been stored (when
        the node persists items) and the notification event has been
        dispatched. It fails with backend.NotAuthorized,
        backend.NoPayloadAllowed or backend.PayloadExpected when
        _do_publish rejects the request, or with whatever error the storage
        reports.
        """
        d = self.parent.storage.get_node(node_id)
        d.addCallback(_get_affiliation, requestor)
        d.addCallback(self._do_publish, items, requestor)
        return d

    def _do_publish(self, result, items, requestor):
        """Validate the request, store the items and trigger notification.

        result is the (node, affiliation) tuple produced by
        _get_affiliation.

        Raises backend.NotAuthorized when the affiliation is neither
        'owner' nor 'publisher', backend.NoPayloadAllowed when items are
        given but the node neither persists items nor delivers payloads,
        and backend.PayloadExpected when no items are given although the
        node persists items or delivers payloads.
        """
        node, affiliation = result
        configuration = node.get_configuration()
        persist_items = configuration["pubsub#persist_items"]
        deliver_payloads = configuration["pubsub#deliver_payloads"]

        if affiliation not in ['owner', 'publisher']:
            raise backend.NotAuthorized

        if items and not persist_items and not deliver_payloads:
            raise backend.NoPayloadAllowed
        elif not items and (persist_items or deliver_payloads):
            raise backend.PayloadExpected

        if persist_items or deliver_payloads:
            for item in items:
                if not item.getAttribute("id"):
                    # Items published without an id get a generated one,
                    # derived from the current time and the requestor.
                    item["id"] = sha.new(str(time.time()) +
                                         requestor.full()).hexdigest()

        if persist_items:
            d = node.store_items(items, requestor)
        else:
            d = defer.succeed(None)

        d.addCallback(self._do_notify, node.id, items, deliver_payloads)
        # Return the deferred so that publish() only fires after storing
        # and notifying are done, and so that storage failures propagate to
        # the caller instead of being dropped.
        return d

    def _do_notify(self, result, node_id, items, deliver_payloads):
        """Dispatch the '//event/pubsub/notify' event for the items.

        When payloads are not to be delivered, the children of every item
        are removed first, so only the bare items are sent.
        """
        if items and not deliver_payloads:
            for item in items:
                item.children = []

        self.parent.dispatch({'items': items, 'node_id': node_id},
                             '//event/pubsub/notify')
|
114 |
|
class NotificationService(service.Service):
    """Service that works out who gets notified and registers notifiers."""

    implements(backend.INotificationService)

    def get_notification_list(self, node_id, items):
        """Return a deferred firing with (subscriber, items) tuples.

        The subscribers are those returned by get_subscribers() of the
        node node_id.
        """
        def subscribers_of(node):
            return node.get_subscribers()

        lookup = self.parent.storage.get_node(node_id)
        lookup.addCallback(subscribers_of)
        lookup.addCallback(self._magic_filter, node_id, items)
        return lookup

    def _magic_filter(self, subscribers, node_id, items):
        """Pair every subscriber with the complete list of items.

        node_id is accepted but not used.
        """
        return [(subscriber, items) for subscriber in subscribers]

    def register_notifier(self, observerfn, *args, **kwargs):
        """Register observerfn for the '//event/pubsub/notify' event.

        Extra positional and keyword arguments are handed on to the
        parent's addObserver.
        """
        event = '//event/pubsub/notify'
        self.parent.addObserver(event, observerfn, *args, **kwargs)
|
134 |
|
class SubscriptionService(service.Service):
    """Service that handles subscribing to and unsubscribing from nodes."""

    implements(backend.ISubscriptionService)

    def subscribe(self, node_id, subscriber, requestor):
        """Subscribe subscriber to the node node_id.

        Only the entity itself may subscribe: the bare JID of subscriber
        must equal requestor, otherwise the returned deferred fails with
        backend.NotAuthorized. The deferred also fails with
        backend.NotAuthorized when the subscriber is an outcast of the node.
        On success it fires with the result of the node's add_subscription,
        extended with an 'affiliation' entry.
        """
        subscriber_entity = subscriber.userhostJID()
        if subscriber_entity != requestor:
            # Pass an exception instance: a Failure built from the bare
            # class would record the metaclass as its type, so errbacks
            # trapping NotAuthorized would not recognise it.
            return defer.fail(backend.NotAuthorized())

        d = self.parent.storage.get_node(node_id)
        d.addCallback(_get_affiliation, subscriber_entity)
        d.addCallback(self._do_subscribe, subscriber)
        return d

    def _do_subscribe(self, result, subscriber):
        """Add the subscription unless the subscriber is an outcast.

        result is the (node, affiliation) tuple produced by
        _get_affiliation. Raises backend.NotAuthorized for outcasts.
        """
        node, affiliation = result

        if affiliation == 'outcast':
            raise backend.NotAuthorized

        d = node.add_subscription(subscriber, 'subscribed')
        d.addCallback(self._return_subscription, affiliation)
        return d

    def _return_subscription(self, result, affiliation):
        """Store the affiliation in the subscription result and return it."""
        result['affiliation'] = affiliation
        return result

    def unsubscribe(self, node_id, subscriber, requestor):
        """Remove the subscription of subscriber to the node node_id.

        As with subscribe, the bare JID of subscriber must equal requestor;
        otherwise the returned deferred fails with backend.NotAuthorized.
        The error is reported through the deferred rather than raised, so
        both methods of this service signal it the same way.
        """
        if subscriber.userhostJID() != requestor:
            return defer.fail(backend.NotAuthorized())

        d = self.parent.storage.get_node(node_id)
        d.addCallback(lambda node: node.remove_subscription(subscriber))
        return d
|
170 |
|
class NodeCreationService(service.Service):
    """Service that creates nodes and reads or changes their configuration."""

    implements(backend.INodeCreationService)

    def supports_instant_nodes(self):
        """Report that nodes can be created without a caller-chosen id."""
        return True

    def create_node(self, node_id, requestor):
        """Create a node owned by requestor.

        When node_id is empty, an id of the form 'generic/<sha hex digest>'
        is generated from the current time and the requestor. Returns a
        deferred that fires with the id of the created node.
        """
        if not node_id:
            node_id = 'generic/%s' % sha.new(str(time.time()) +
                                             requestor.full()).hexdigest()

        d = self.parent.storage.create_node(node_id, requestor)
        d.addCallback(lambda _: node_id)
        return d

    def get_node_configuration(self, node_id):
        """Return a deferred firing with the described node configuration.

        Without a node_id the parent's default configuration is described
        instead. See _make_config for the shape of the result.
        """
        if node_id:
            d = self.parent.storage.get_node(node_id)
            d.addCallback(lambda node: node.get_configuration())
        else:
            # XXX: this is disabled in pubsub.py
            d = defer.succeed(self.parent.default_config)

        d.addCallback(self._make_config)
        return d

    def _make_config(self, config):
        """Describe every known option, adding its value when config has one.

        Each entry of the returned list is a dictionary with the option
        name under "var", the fields of the option's description and, if
        the option is present in config, the current value under "value".
        """
        options = []
        for key, value in self.parent.options.iteritems():
            option = {"var": key}
            option.update(value)
            if key in config:
                option["value"] = config[key]
            options.append(option)

        return options

    def set_node_configuration(self, node_id, options, requestor):
        """Validate options and apply them to the node node_id.

        Raises backend.InvalidConfigurationOption for an unknown option
        name and backend.InvalidConfigurationValue when the value of a
        boolean option cannot be converted with int(). The returned
        deferred fails with backend.NotAuthorized when requestor is not an
        owner of the node.

        The validated values are collected in a new dictionary, so the
        caller's options mapping is never modified, not even partially
        when validation fails halfway through.
        """
        validated = {}
        for key, value in options.iteritems():
            if key not in self.parent.options:
                raise backend.InvalidConfigurationOption
            if self.parent.options[key]["type"] == 'boolean':
                try:
                    value = bool(int(value))
                except (TypeError, ValueError):
                    # TypeError covers values int() cannot take at all,
                    # such as None.
                    raise backend.InvalidConfigurationValue
            validated[key] = value

        d = self.parent.storage.get_node(node_id)
        d.addCallback(_get_affiliation, requestor)
        d.addCallback(self._do_set_node_configuration, validated)
        return d

    def _do_set_node_configuration(self, result, options):
        """Apply options to the node if the requestor is its owner.

        result is the (node, affiliation) tuple produced by
        _get_affiliation. Raises backend.NotAuthorized for non-owners.
        """
        node, affiliation = result

        if affiliation != 'owner':
            raise backend.NotAuthorized

        return node.set_configuration(options)
|
231 |
|
class AffiliationsService(service.Service):
    """Service that reports the affiliations and subscriptions of an entity."""

    implements(backend.IAffiliationsService)

    def get_affiliations(self, entity):
        """Return a deferred firing with the merged records for entity.

        The storage is asked for both the affiliations and the
        subscriptions of entity; the two answers are combined by
        _affiliations_result.
        """
        lookups = [
            self.parent.storage.get_affiliations(entity),
            self.parent.storage.get_subscriptions(entity),
        ]
        combined = defer.DeferredList(lookups, fireOnOneErrback=1,
                                      consumeErrors=1)
        # On failure, continue the error chain with the first element of
        # the error value instead of the DeferredList's own failure.
        combined.addErrback(lambda x: x.value[0])
        combined.addCallback(self._affiliations_result, entity)
        return combined

    def _affiliations_result(self, result, entity):
        """Merge affiliations and subscriptions into one list of records.

        result is the DeferredList result: the affiliations are taken from
        result[0][1] and the subscriptions from result[1][1]. Records are
        keyed on (node, full JID); each is a dictionary with the keys
        'node', 'jid', 'affiliation' and 'subscription', where the part
        that is not known is None.
        """
        known_affiliations = result[0][1]
        known_subscriptions = result[1][1]

        merged = {}

        for node, affiliation in known_affiliations:
            record = {
                'node': node,
                'jid': entity,
                'affiliation': affiliation,
                'subscription': None,
            }
            merged[(node, entity.full())] = record

        for node, subscriber, subscription in known_subscriptions:
            key = (node, subscriber.full())
            if key in merged:
                # Affiliation already recorded: just add the subscription.
                merged[key]['subscription'] = subscription
                continue
            merged[key] = {
                'node': node,
                'jid': subscriber,
                'affiliation': None,
                'subscription': subscription,
            }

        return merged.values()
|
268 |
|
class ItemRetrievalService(service.Service):
    """Service that retrieves published items for subscribers of a node."""

    implements(backend.IItemRetrievalService)

    def get_items(self, node_id, requestor, max_items=None, item_ids=[]):
        """Return a deferred firing with items of the node node_id.

        The deferred fails with backend.NotAuthorized unless the node
        reports requestor as subscribed. When item_ids is non-empty those
        items are requested by id, otherwise max_items is passed to the
        node's get_items.
        """
        # The default list for item_ids is only tested for truth and
        # never modified here.
        lookup = self.parent.storage.get_node(node_id)
        lookup.addCallback(self._is_subscribed, requestor)
        lookup.addCallback(self._do_get_items, max_items, item_ids)
        return lookup

    def _is_subscribed(self, node, subscriber):
        """Return a deferred firing with the tuple (node, subscribed)."""
        def pair_with_node(subscribed):
            return (node, subscribed)

        check = node.is_subscribed(subscriber)
        check.addCallback(pair_with_node)
        return check

    def _do_get_items(self, result, max_items, item_ids):
        """Fetch the items, by id if ids were given.

        result is the (node, subscribed) tuple from _is_subscribed.
        Raises backend.NotAuthorized when subscribed is false.
        """
        node, subscribed = result

        if not subscribed:
            raise backend.NotAuthorized

        if not item_ids:
            return node.get_items(max_items)

        return node.get_items_by_id(item_ids)
|
294 |
|
class RetractionService(service.Service):
    """Service that retracts single items from a node or purges all of them."""

    implements(backend.IRetractionService)

    def retract_item(self, node_id, item_ids, requestor):
        """Remove the items item_ids from the node node_id.

        The returned deferred fails with backend.NotAuthorized unless
        requestor is owner or publisher of the node, and with
        backend.NodeNotPersistent when the node does not persist items.
        """
        lookup = self.parent.storage.get_node(node_id)
        lookup.addCallback(_get_affiliation, requestor)
        lookup.addCallback(self._do_retract, item_ids)
        return lookup

    def _do_retract(self, result, item_ids):
        """Check permissions and persistence, then remove the items.

        result is the (node, affiliation) tuple from _get_affiliation.
        """
        node, affiliation = result
        configuration = node.get_configuration()
        persistent = configuration["pubsub#persist_items"]

        if affiliation not in ['owner', 'publisher']:
            raise backend.NotAuthorized

        if not persistent:
            raise backend.NodeNotPersistent

        removal = node.remove_items(item_ids)
        removal.addCallback(self._do_notify_retraction, node.id)
        return removal

    def _do_notify_retraction(self, result, node_id):
        """Dispatch '//event/pubsub/retract' with the removal result.

        result, the outcome of the node's remove_items, is sent along
        under the key 'item_ids'.
        """
        event = {'item_ids': result, 'node_id': node_id}
        self.parent.dispatch(event, '//event/pubsub/retract')

    def purge_node(self, node_id, requestor):
        """Remove all items from the node node_id.

        The returned deferred fails with backend.NotAuthorized unless
        requestor is owner of the node, and with backend.NodeNotPersistent
        when the node does not persist items.
        """
        lookup = self.parent.storage.get_node(node_id)
        lookup.addCallback(_get_affiliation, requestor)
        lookup.addCallback(self._do_purge)
        return lookup

    def _do_purge(self, result):
        """Check permissions and persistence, then purge the node.

        result is the (node, affiliation) tuple from _get_affiliation.
        """
        node, affiliation = result
        configuration = node.get_configuration()
        persistent = configuration["pubsub#persist_items"]

        if affiliation != 'owner':
            raise backend.NotAuthorized

        if not persistent:
            raise backend.NodeNotPersistent

        purge = node.purge()
        purge.addCallback(self._do_notify_purge, node.id)
        return purge

    def _do_notify_purge(self, result, node_id):
        """Dispatch '//event/pubsub/purge' carrying the node id."""
        self.parent.dispatch(node_id, '//event/pubsub/purge')
|
345 |
|
class NodeDeletionService(service.Service):
    """Service that deletes nodes, running registered hooks beforehand."""

    implements(backend.INodeDeletionService)

    def __init__(self):
        # Functions to call with the node id before a node is deleted.
        self._callback_list = []

    def register_pre_delete(self, pre_delete_fn):
        """Register pre_delete_fn to be called before a node is deleted.

        The function is called with the node id; its return value is put
        in a DeferredList, so it is expected to return a deferred.
        """
        self._callback_list.append(pre_delete_fn)

    def get_subscribers(self, node_id):
        """Return a deferred firing with the subscribers of node node_id."""
        d = self.parent.storage.get_node(node_id)
        d.addCallback(lambda node: node.get_subscribers())
        return d

    def delete_node(self, node_id, requestor):
        """Delete the node node_id on behalf of requestor.

        Returns a deferred that fires when the pre-delete hooks have run,
        the node has been deleted from storage and the deferreds handed
        back by the hooks have been fired. It fails with
        backend.NotAuthorized when requestor is not an owner of the node.
        """
        d = self.parent.storage.get_node(node_id)
        d.addCallback(_get_affiliation, requestor)
        d.addCallback(self._do_pre_delete)
        return d

    def _do_pre_delete(self, result):
        """Run all pre-delete hooks for the node, then delete it.

        result is the (node, affiliation) tuple from _get_affiliation.
        Raises backend.NotAuthorized for non-owners.
        """
        node, affiliation = result

        if affiliation != 'owner':
            raise backend.NotAuthorized

        # The node id comes from the node itself; no node_id name exists
        # in this scope. Errors of individual hooks are consumed, so a
        # failing hook does not prevent the deletion.
        d = defer.DeferredList([cb(node.id) for cb in self._callback_list],
                               consumeErrors=1)
        d.addCallback(self._do_delete, node.id)
        # Return the deferred so delete_node() waits for the deletion and
        # passes storage failures on to the caller.
        return d

    def _do_delete(self, result, node_id):
        """Delete the node from storage, then fire the collected deferreds.

        result is the DeferredList result of the pre-delete hooks. Every
        successful, non-empty hook result is taken to be a sequence of
        deferreds; these are collected and fired by _do_notify_delete once
        the storage reports the node as deleted.
        """
        dl = []
        for succeeded, r in result:
            if succeeded and r:
                dl.extend(r)

        d = self.parent.storage.delete_node(node_id)
        d.addCallback(self._do_notify_delete, dl)

        return d

    def _do_notify_delete(self, result, dl):
        """Fire every deferred in dl with None."""
        for d in dl:
            d.callback(None)