Mercurial > libervia-pubsub
comparison idavoll/backend.py @ 198:e404775b12df
Change naming and spacing conventions to match Twisted's.
author | Ralph Meijer <ralphm@ik.nu> |
---|---|
date | Tue, 10 Jun 2008 11:31:49 +0000 |
parents | 00a6dbfbee42 |
children | 2189c663ba44 |
comparison
equal
deleted
inserted
replaced
197:9da5a95d408d | 198:e404775b12df |
---|---|
1 # -*- test-case-name: idavoll.test.test_backend -*- | 1 # -*- test-case-name: idavoll.test.test_backend -*- |
2 # | 2 # |
3 # Copyright (c) 2003-2008 Ralph Meijer | 3 # Copyright (c) 2003-2008 Ralph Meijer |
4 # See LICENSE for details. | 4 # See LICENSE for details. |
5 | |
6 """ | |
7 Generic publish-subscribe backend. | |
8 | |
9 This module implements a generic publish-subscribe backend service with | |
10 business logic as per | |
11 L{XEP-0060<http://www.xmpp.org/extensions/xep-0060.html>)} that interacts with | |
12 a given storage facility. It also provides an adapter from the XMPP | |
13 publish-subscribe protocol. | |
14 """ | |
5 | 15 |
6 import uuid | 16 import uuid |
7 | 17 |
8 from zope.interface import implements | 18 from zope.interface import implements |
9 | 19 |
17 from wokkel.pubsub import PubSubService, PubSubError | 27 from wokkel.pubsub import PubSubService, PubSubError |
18 | 28 |
19 from idavoll import error, iidavoll | 29 from idavoll import error, iidavoll |
20 from idavoll.iidavoll import IBackendService | 30 from idavoll.iidavoll import IBackendService |
21 | 31 |
22 def _get_affiliation(node, entity): | 32 def _getAffiliation(node, entity): |
23 d = node.get_affiliation(entity) | 33 d = node.getAffiliation(entity) |
24 d.addCallback(lambda affiliation: (node, affiliation)) | 34 d.addCallback(lambda affiliation: (node, affiliation)) |
25 return d | 35 return d |
26 | 36 |
27 | 37 |
38 | |
28 class BackendService(service.Service, utility.EventDispatcher): | 39 class BackendService(service.Service, utility.EventDispatcher): |
40 """ | |
41 Generic publish-subscribe backend service. | |
42 | |
43 @cvar options: Node configuration form as a mapping from the field | |
44 name to a dictionary that holds the field's type, | |
45 label and possible options to choose from. | |
46 @type options: C{dict}. | |
47 @cvar defaultConfig: The default node configuration. | |
48 """ | |
29 | 49 |
30 implements(iidavoll.IBackendService) | 50 implements(iidavoll.IBackendService) |
31 | 51 |
32 options = {"pubsub#persist_items": | 52 options = {"pubsub#persist_items": |
33 {"type": "boolean", | 53 {"type": "boolean", |
43 "on_sub": "When a new subscription is processed", | 63 "on_sub": "When a new subscription is processed", |
44 } | 64 } |
45 }, | 65 }, |
46 } | 66 } |
47 | 67 |
48 default_config = {"pubsub#persist_items": True, | 68 defaultConfig = {"pubsub#persist_items": True, |
49 "pubsub#deliver_payloads": True, | 69 "pubsub#deliver_payloads": True, |
50 "pubsub#send_last_published_item": 'on_sub', | 70 "pubsub#send_last_published_item": 'on_sub', |
51 } | 71 } |
52 | 72 |
53 def __init__(self, storage): | 73 def __init__(self, storage): |
54 utility.EventDispatcher.__init__(self) | 74 utility.EventDispatcher.__init__(self) |
55 self.storage = storage | 75 self.storage = storage |
56 self._callback_list = [] | 76 self._callbackList = [] |
57 | 77 |
58 def supports_publisher_affiliation(self): | 78 |
79 def supportsPublisherAffiliation(self): | |
59 return True | 80 return True |
60 | 81 |
61 def supports_outcast_affiliation(self): | 82 |
83 def supportsOutcastAffiliation(self): | |
62 return True | 84 return True |
63 | 85 |
64 def supports_persistent_items(self): | 86 |
87 def supportsPersistentItems(self): | |
65 return True | 88 return True |
66 | 89 |
67 def get_node_type(self, node_id): | 90 |
68 d = self.storage.get_node(node_id) | 91 def getNodeType(self, nodeIdentifier): |
69 d.addCallback(lambda node: node.get_type()) | 92 d = self.storage.getNode(nodeIdentifier) |
70 return d | 93 d.addCallback(lambda node: node.getType()) |
71 | 94 return d |
72 def get_nodes(self): | 95 |
73 return self.storage.get_node_ids() | 96 |
74 | 97 def getNodes(self): |
75 def get_node_meta_data(self, node_id): | 98 return self.storage.getNodeIds() |
76 d = self.storage.get_node(node_id) | 99 |
77 d.addCallback(lambda node: node.get_meta_data()) | 100 |
78 d.addCallback(self._make_meta_data) | 101 def getNodeMetaData(self, nodeIdentifier): |
79 return d | 102 d = self.storage.getNode(nodeIdentifier) |
80 | 103 d.addCallback(lambda node: node.getMetaData()) |
81 def _make_meta_data(self, meta_data): | 104 d.addCallback(self._makeMetaData) |
105 return d | |
106 | |
107 | |
108 def _makeMetaData(self, metaData): | |
82 options = [] | 109 options = [] |
83 for key, value in meta_data.iteritems(): | 110 for key, value in metaData.iteritems(): |
84 if self.options.has_key(key): | 111 if self.options.has_key(key): |
85 option = {"var": key} | 112 option = {"var": key} |
86 option.update(self.options[key]) | 113 option.update(self.options[key]) |
87 option["value"] = value | 114 option["value"] = value |
88 options.append(option) | 115 options.append(option) |
89 | 116 |
90 return options | 117 return options |
91 | 118 |
92 def _check_auth(self, node, requestor): | 119 |
120 def _checkAuth(self, node, requestor): | |
93 def check(affiliation, node): | 121 def check(affiliation, node): |
94 if affiliation not in ['owner', 'publisher']: | 122 if affiliation not in ['owner', 'publisher']: |
95 raise error.Forbidden() | 123 raise error.Forbidden() |
96 return node | 124 return node |
97 | 125 |
98 d = node.get_affiliation(requestor) | 126 d = node.getAffiliation(requestor) |
99 d.addCallback(check, node) | 127 d.addCallback(check, node) |
100 return d | 128 return d |
101 | 129 |
102 def publish(self, node_id, items, requestor): | 130 |
103 d = self.storage.get_node(node_id) | 131 def publish(self, nodeIdentifier, items, requestor): |
104 d.addCallback(self._check_auth, requestor) | 132 d = self.storage.getNode(nodeIdentifier) |
105 d.addCallback(self._do_publish, items, requestor) | 133 d.addCallback(self._checkAuth, requestor) |
106 return d | 134 d.addCallback(self._doPublish, items, requestor) |
107 | 135 return d |
108 def _do_publish(self, node, items, requestor): | 136 |
109 configuration = node.get_configuration() | 137 |
110 persist_items = configuration["pubsub#persist_items"] | 138 def _doPublish(self, node, items, requestor): |
111 deliver_payloads = configuration["pubsub#deliver_payloads"] | 139 configuration = node.getConfiguration() |
112 | 140 persistItems = configuration["pubsub#persist_items"] |
113 if items and not persist_items and not deliver_payloads: | 141 deliverPayloads = configuration["pubsub#deliver_payloads"] |
142 | |
143 if items and not persistItems and not deliverPayloads: | |
114 raise error.ItemForbidden() | 144 raise error.ItemForbidden() |
115 elif not items and (persist_items or deliver_payloads): | 145 elif not items and (persistItems or deliverPayloads): |
116 raise error.ItemRequired() | 146 raise error.ItemRequired() |
117 | 147 |
118 if persist_items or deliver_payloads: | 148 if persistItems or deliverPayloads: |
119 for item in items: | 149 for item in items: |
120 if not item.getAttribute("id"): | 150 if not item.getAttribute("id"): |
121 item["id"] = str(uuid.uuid4()) | 151 item["id"] = str(uuid.uuid4()) |
122 | 152 |
123 if persist_items: | 153 if persistItems: |
124 d = node.store_items(items, requestor) | 154 d = node.storeItems(items, requestor) |
125 else: | 155 else: |
126 d = defer.succeed(None) | 156 d = defer.succeed(None) |
127 | 157 |
128 d.addCallback(self._do_notify, node.id, items, deliver_payloads) | 158 d.addCallback(self._doNotify, node.nodeIdentifier, items, |
129 return d | 159 deliverPayloads) |
130 | 160 return d |
131 def _do_notify(self, result, node_id, items, deliver_payloads): | 161 |
132 if items and not deliver_payloads: | 162 |
163 def _doNotify(self, result, nodeIdentifier, items, deliverPayloads): | |
164 if items and not deliverPayloads: | |
133 for item in items: | 165 for item in items: |
134 item.children = [] | 166 item.children = [] |
135 | 167 |
136 self.dispatch({'items': items, 'node_id': node_id}, | 168 self.dispatch({'items': items, 'nodeIdentifier': nodeIdentifier}, |
137 '//event/pubsub/notify') | 169 '//event/pubsub/notify') |
138 | 170 |
139 def get_notification_list(self, node_id, items): | 171 |
140 d = self.storage.get_node(node_id) | 172 def getNotificationList(self, nodeIdentifier, items): |
141 d.addCallback(lambda node: node.get_subscribers()) | 173 d = self.storage.getNode(nodeIdentifier) |
142 d.addCallback(self._magic_filter, node_id, items) | 174 d.addCallback(lambda node: node.getSubscribers()) |
143 return d | 175 d.addCallback(self._magicFilter, nodeIdentifier, items) |
144 | 176 return d |
145 def _magic_filter(self, subscribers, node_id, items): | 177 |
178 | |
179 def _magicFilter(self, subscribers, nodeIdentifier, items): | |
146 list = [] | 180 list = [] |
147 for subscriber in subscribers: | 181 for subscriber in subscribers: |
148 list.append((subscriber, items)) | 182 list.append((subscriber, items)) |
149 return list | 183 return list |
150 | 184 |
151 def register_notifier(self, observerfn, *args, **kwargs): | 185 |
186 def registerNotifier(self, observerfn, *args, **kwargs): | |
152 self.addObserver('//event/pubsub/notify', observerfn, *args, **kwargs) | 187 self.addObserver('//event/pubsub/notify', observerfn, *args, **kwargs) |
153 | 188 |
154 def subscribe(self, node_id, subscriber, requestor): | 189 |
155 subscriber_entity = subscriber.userhostJID() | 190 def subscribe(self, nodeIdentifier, subscriber, requestor): |
156 if subscriber_entity != requestor: | 191 subscriberEntity = subscriber.userhostJID() |
192 if subscriberEntity != requestor: | |
157 return defer.fail(error.Forbidden()) | 193 return defer.fail(error.Forbidden()) |
158 | 194 |
159 d = self.storage.get_node(node_id) | 195 d = self.storage.getNode(nodeIdentifier) |
160 d.addCallback(_get_affiliation, subscriber_entity) | 196 d.addCallback(_getAffiliation, subscriberEntity) |
161 d.addCallback(self._do_subscribe, subscriber) | 197 d.addCallback(self._doSubscribe, subscriber) |
162 return d | 198 return d |
163 | 199 |
164 def _do_subscribe(self, result, subscriber): | 200 |
201 def _doSubscribe(self, result, subscriber): | |
165 node, affiliation = result | 202 node, affiliation = result |
166 | 203 |
167 if affiliation == 'outcast': | 204 if affiliation == 'outcast': |
168 raise error.Forbidden() | 205 raise error.Forbidden() |
169 | 206 |
170 d = node.add_subscription(subscriber, 'subscribed') | 207 d = node.addSubscription(subscriber, 'subscribed') |
171 d.addCallback(lambda _: self._send_last_published(node, subscriber)) | 208 d.addCallback(lambda _: self._sendLastPublished(node, subscriber)) |
172 d.addCallback(lambda _: 'subscribed') | 209 d.addCallback(lambda _: 'subscribed') |
173 d.addErrback(self._get_subscription, node, subscriber) | 210 d.addErrback(self._getSubscription, node, subscriber) |
174 d.addCallback(self._return_subscription, node.id) | 211 d.addCallback(self._returnSubscription, node.nodeIdentifier) |
175 return d | 212 return d |
176 | 213 |
177 def _get_subscription(self, failure, node, subscriber): | 214 |
215 def _getSubscription(self, failure, node, subscriber): | |
178 failure.trap(error.SubscriptionExists) | 216 failure.trap(error.SubscriptionExists) |
179 return node.get_subscription(subscriber) | 217 return node.getSubscription(subscriber) |
180 | 218 |
181 def _return_subscription(self, result, node_id): | 219 |
182 return node_id, result | 220 def _returnSubscription(self, result, nodeIdentifier): |
183 | 221 return nodeIdentifier, result |
184 def _send_last_published(self, node, subscriber): | 222 |
223 | |
224 def _sendLastPublished(self, node, subscriber): | |
185 class StringParser(object): | 225 class StringParser(object): |
186 def __init__(self): | 226 def __init__(self): |
187 self.elementStream = domish.elementStream() | 227 self.elementStream = domish.elementStream() |
188 self.elementStream.DocumentStartEvent = self.docStart | 228 self.elementStream.DocumentStartEvent = self.docStart |
189 self.elementStream.ElementEvent = self.elem | 229 self.elementStream.ElementEvent = self.elem |
200 | 240 |
201 def parse(self, string): | 241 def parse(self, string): |
202 self.elementStream.parse(string) | 242 self.elementStream.parse(string) |
203 return self.document | 243 return self.document |
204 | 244 |
205 def notify_item(result): | 245 def notifyItem(result): |
206 if not result: | 246 if not result: |
207 return | 247 return |
208 | 248 |
209 items = [domish.SerializedXML(item) for item in result] | 249 items = [domish.SerializedXML(item) for item in result] |
210 | 250 |
211 reactor.callLater(0, self.dispatch, {'items': items, | 251 reactor.callLater(0, self.dispatch, |
212 'node_id': node.id, | 252 {'items': items, |
213 'subscriber': subscriber}, | 253 'nodeIdentifier': node.nodeIdentifier, |
214 '//event/pubsub/notify') | 254 'subscriber': subscriber}, |
215 | 255 '//event/pubsub/notify') |
216 config = node.get_configuration() | 256 |
257 config = node.getConfiguration() | |
217 if config.get("pubsub#send_last_published_item", 'never') != 'on_sub': | 258 if config.get("pubsub#send_last_published_item", 'never') != 'on_sub': |
218 return | 259 return |
219 | 260 |
220 d = self.get_items(node.id, subscriber.userhostJID(), 1) | 261 d = self.getItems(node.nodeIdentifier, subscriber.userhostJID(), 1) |
221 d.addCallback(notify_item) | 262 d.addCallback(notifyItem) |
222 | 263 |
223 def unsubscribe(self, node_id, subscriber, requestor): | 264 |
265 def unsubscribe(self, nodeIdentifier, subscriber, requestor): | |
224 if subscriber.userhostJID() != requestor: | 266 if subscriber.userhostJID() != requestor: |
225 return defer.fail(error.Forbidden()) | 267 return defer.fail(error.Forbidden()) |
226 | 268 |
227 d = self.storage.get_node(node_id) | 269 d = self.storage.getNode(nodeIdentifier) |
228 d.addCallback(lambda node: node.remove_subscription(subscriber)) | 270 d.addCallback(lambda node: node.removeSubscription(subscriber)) |
229 return d | 271 return d |
230 | 272 |
231 def get_subscriptions(self, entity): | 273 |
232 return self.storage.get_subscriptions(entity) | 274 def getSubscriptions(self, entity): |
233 | 275 return self.storage.getSubscriptions(entity) |
234 def supports_instant_nodes(self): | 276 |
277 | |
278 def supportsInstantNodes(self): | |
235 return True | 279 return True |
236 | 280 |
237 def create_node(self, node_id, requestor): | 281 |
238 if not node_id: | 282 def createNode(self, nodeIdentifier, requestor): |
239 node_id = 'generic/%s' % uuid.uuid4() | 283 if not nodeIdentifier: |
240 d = self.storage.create_node(node_id, requestor) | 284 nodeIdentifier = 'generic/%s' % uuid.uuid4() |
241 d.addCallback(lambda _: node_id) | 285 d = self.storage.createNode(nodeIdentifier, requestor) |
242 return d | 286 d.addCallback(lambda _: nodeIdentifier) |
243 | 287 return d |
244 def get_default_configuration(self): | 288 |
245 d = defer.succeed(self.default_config) | 289 |
246 d.addCallback(self._make_config) | 290 def getDefaultConfiguration(self): |
247 return d | 291 d = defer.succeed(self.defaultConfig) |
248 | 292 d.addCallback(self._makeConfig) |
249 def get_node_configuration(self, node_id): | 293 return d |
250 if not node_id: | 294 |
295 | |
296 def getNodeConfiguration(self, nodeIdentifier): | |
297 if not nodeIdentifier: | |
251 return defer.fail(error.NoRootNode()) | 298 return defer.fail(error.NoRootNode()) |
252 | 299 |
253 d = self.storage.get_node(node_id) | 300 d = self.storage.getNode(nodeIdentifier) |
254 d.addCallback(lambda node: node.get_configuration()) | 301 d.addCallback(lambda node: node.getConfiguration()) |
255 | 302 |
256 d.addCallback(self._make_config) | 303 d.addCallback(self._makeConfig) |
257 return d | 304 return d |
258 | 305 |
259 def _make_config(self, config): | 306 |
307 def _makeConfig(self, config): | |
260 options = [] | 308 options = [] |
261 for key, value in self.options.iteritems(): | 309 for key, value in self.options.iteritems(): |
262 option = {"var": key} | 310 option = {"var": key} |
263 option.update(value) | 311 option.update(value) |
264 if config.has_key(key): | 312 if config.has_key(key): |
265 option["value"] = config[key] | 313 option["value"] = config[key] |
266 options.append(option) | 314 options.append(option) |
267 | 315 |
268 return options | 316 return options |
269 | 317 |
270 def set_node_configuration(self, node_id, options, requestor): | 318 |
271 if not node_id: | 319 def setNodeConfiguration(self, nodeIdentifier, options, requestor): |
320 if not nodeIdentifier: | |
272 return defer.fail(error.NoRootNode()) | 321 return defer.fail(error.NoRootNode()) |
273 | 322 |
274 for key, value in options.iteritems(): | 323 for key, value in options.iteritems(): |
275 if not self.options.has_key(key): | 324 if not self.options.has_key(key): |
276 return defer.fail(error.InvalidConfigurationOption()) | 325 return defer.fail(error.InvalidConfigurationOption()) |
278 try: | 327 try: |
279 options[key] = bool(int(value)) | 328 options[key] = bool(int(value)) |
280 except ValueError: | 329 except ValueError: |
281 return defer.fail(error.InvalidConfigurationValue()) | 330 return defer.fail(error.InvalidConfigurationValue()) |
282 | 331 |
283 d = self.storage.get_node(node_id) | 332 d = self.storage.getNode(nodeIdentifier) |
284 d.addCallback(_get_affiliation, requestor) | 333 d.addCallback(_getAffiliation, requestor) |
285 d.addCallback(self._do_set_node_configuration, options) | 334 d.addCallback(self._doSetNodeConfiguration, options) |
286 return d | 335 return d |
287 | 336 |
288 def _do_set_node_configuration(self, result, options): | 337 |
338 def _doSetNodeConfiguration(self, result, options): | |
289 node, affiliation = result | 339 node, affiliation = result |
290 | 340 |
291 if affiliation != 'owner': | 341 if affiliation != 'owner': |
292 raise error.Forbidden() | 342 raise error.Forbidden() |
293 | 343 |
294 return node.set_configuration(options) | 344 return node.setConfiguration(options) |
295 | 345 |
296 def get_affiliations(self, entity): | 346 |
297 return self.storage.get_affiliations(entity) | 347 def getAffiliations(self, entity): |
298 | 348 return self.storage.getAffiliations(entity) |
299 def get_items(self, node_id, requestor, max_items=None, item_ids=[]): | 349 |
300 d = self.storage.get_node(node_id) | 350 |
301 d.addCallback(_get_affiliation, requestor) | 351 def getItems(self, nodeIdentifier, requestor, maxItems=None, |
302 d.addCallback(self._do_get_items, max_items, item_ids) | 352 itemIdentifiers=None): |
303 return d | 353 d = self.storage.getNode(nodeIdentifier) |
304 | 354 d.addCallback(_getAffiliation, requestor) |
305 def _do_get_items(self, result, max_items, item_ids): | 355 d.addCallback(self._doGetItems, maxItems, itemIdentifiers) |
356 return d | |
357 | |
358 | |
359 def _doGetItems(self, result, maxItems, itemIdentifiers): | |
306 node, affiliation = result | 360 node, affiliation = result |
307 | 361 |
308 if affiliation == 'outcast': | 362 if affiliation == 'outcast': |
309 raise error.Forbidden() | 363 raise error.Forbidden() |
310 | 364 |
311 if item_ids: | 365 if itemIdentifiers: |
312 return node.get_items_by_id(item_ids) | 366 return node.getItemsById(itemIdentifiers) |
313 else: | 367 else: |
314 return node.get_items(max_items) | 368 return node.getItems(maxItems) |
315 | 369 |
316 def retract_item(self, node_id, item_ids, requestor): | 370 |
317 d = self.storage.get_node(node_id) | 371 def retractItem(self, nodeIdentifier, itemIdentifiers, requestor): |
318 d.addCallback(_get_affiliation, requestor) | 372 d = self.storage.getNode(nodeIdentifier) |
319 d.addCallback(self._do_retract, item_ids) | 373 d.addCallback(_getAffiliation, requestor) |
320 return d | 374 d.addCallback(self._doRetract, itemIdentifiers) |
321 | 375 return d |
322 def _do_retract(self, result, item_ids): | 376 |
377 | |
378 def _doRetract(self, result, itemIdentifiers): | |
323 node, affiliation = result | 379 node, affiliation = result |
324 persist_items = node.get_configuration()["pubsub#persist_items"] | 380 persistItems = node.getConfiguration()["pubsub#persist_items"] |
325 | 381 |
326 if affiliation not in ['owner', 'publisher']: | 382 if affiliation not in ['owner', 'publisher']: |
327 raise error.Forbidden() | 383 raise error.Forbidden() |
328 | 384 |
329 if not persist_items: | 385 if not persistItems: |
330 raise error.NodeNotPersistent() | 386 raise error.NodeNotPersistent() |
331 | 387 |
332 d = node.remove_items(item_ids) | 388 d = node.removeItems(itemIdentifiers) |
333 d.addCallback(self._do_notify_retraction, node.id) | 389 d.addCallback(self._doNotifyRetraction, node.nodeIdentifier) |
334 return d | 390 return d |
335 | 391 |
336 def _do_notify_retraction(self, item_ids, node_id): | 392 |
337 self.dispatch({ 'item_ids': item_ids, 'node_id': node_id }, | 393 def _doNotifyRetraction(self, itemIdentifiers, nodeIdentifier): |
338 '//event/pubsub/retract') | 394 self.dispatch({'itemIdentifiers': itemIdentifiers, |
339 | 395 'nodeIdentifier': nodeIdentifier }, |
340 def purge_node(self, node_id, requestor): | 396 '//event/pubsub/retract') |
341 d = self.storage.get_node(node_id) | 397 |
342 d.addCallback(_get_affiliation, requestor) | 398 |
343 d.addCallback(self._do_purge) | 399 def purgeNode(self, nodeIdentifier, requestor): |
344 return d | 400 d = self.storage.getNode(nodeIdentifier) |
345 | 401 d.addCallback(_getAffiliation, requestor) |
346 def _do_purge(self, result): | 402 d.addCallback(self._doPurge) |
403 return d | |
404 | |
405 | |
406 def _doPurge(self, result): | |
347 node, affiliation = result | 407 node, affiliation = result |
348 persist_items = node.get_configuration()["pubsub#persist_items"] | 408 persistItems = node.getConfiguration()["pubsub#persist_items"] |
349 | 409 |
350 if affiliation != 'owner': | 410 if affiliation != 'owner': |
351 raise error.Forbidden() | 411 raise error.Forbidden() |
352 | 412 |
353 if not persist_items: | 413 if not persistItems: |
354 raise error.NodeNotPersistent() | 414 raise error.NodeNotPersistent() |
355 | 415 |
356 d = node.purge() | 416 d = node.purge() |
357 d.addCallback(self._do_notify_purge, node.id) | 417 d.addCallback(self._doNotifyPurge, node.nodeIdentifier) |
358 return d | 418 return d |
359 | 419 |
360 def _do_notify_purge(self, result, node_id): | 420 |
361 self.dispatch(node_id, '//event/pubsub/purge') | 421 def _doNotifyPurge(self, result, nodeIdentifier): |
362 | 422 self.dispatch(nodeIdentifier, '//event/pubsub/purge') |
363 def register_pre_delete(self, pre_delete_fn): | 423 |
364 self._callback_list.append(pre_delete_fn) | 424 |
365 | 425 def registerPreDelete(self, preDeleteFn): |
366 def get_subscribers(self, node_id): | 426 self._callbackList.append(preDeleteFn) |
367 d = self.storage.get_node(node_id) | 427 |
368 d.addCallback(lambda node: node.get_subscribers()) | 428 |
369 return d | 429 def getSubscribers(self, nodeIdentifier): |
370 | 430 d = self.storage.getNode(nodeIdentifier) |
371 def delete_node(self, node_id, requestor): | 431 d.addCallback(lambda node: node.getSubscribers()) |
372 d = self.storage.get_node(node_id) | 432 return d |
373 d.addCallback(_get_affiliation, requestor) | 433 |
374 d.addCallback(self._do_pre_delete) | 434 |
375 return d | 435 def deleteNode(self, nodeIdentifier, requestor): |
376 | 436 d = self.storage.getNode(nodeIdentifier) |
377 def _do_pre_delete(self, result): | 437 d.addCallback(_getAffiliation, requestor) |
438 d.addCallback(self._doPreDelete) | |
439 return d | |
440 | |
441 | |
442 def _doPreDelete(self, result): | |
378 node, affiliation = result | 443 node, affiliation = result |
379 | 444 |
380 if affiliation != 'owner': | 445 if affiliation != 'owner': |
381 raise error.Forbidden() | 446 raise error.Forbidden() |
382 | 447 |
383 d = defer.DeferredList([cb(node.id) for cb in self._callback_list], | 448 d = defer.DeferredList([cb(node.nodeIdentifier) |
449 for cb in self._callbackList], | |
384 consumeErrors=1) | 450 consumeErrors=1) |
385 d.addCallback(self._do_delete, node.id) | 451 d.addCallback(self._doDelete, node.nodeIdentifier) |
386 | 452 |
387 def _do_delete(self, result, node_id): | 453 |
454 def _doDelete(self, result, nodeIdentifier): | |
388 dl = [] | 455 dl = [] |
389 for succeeded, r in result: | 456 for succeeded, r in result: |
390 if succeeded and r: | 457 if succeeded and r: |
391 dl.extend(r) | 458 dl.extend(r) |
392 | 459 |
393 d = self.storage.delete_node(node_id) | 460 d = self.storage.deleteNode(nodeIdentifier) |
394 d.addCallback(self._do_notify_delete, dl) | 461 d.addCallback(self._doNotifyDelete, dl) |
395 | 462 |
396 return d | 463 return d |
397 | 464 |
398 def _do_notify_delete(self, result, dl): | 465 |
466 def _doNotifyDelete(self, result, dl): | |
399 for d in dl: | 467 for d in dl: |
400 d.callback(None) | 468 d.callback(None) |
469 | |
401 | 470 |
402 | 471 |
403 class PubSubServiceFromBackend(PubSubService): | 472 class PubSubServiceFromBackend(PubSubService): |
404 """ | 473 """ |
405 Adapts a backend to an xmpp publish-subscribe service. | 474 Adapts a backend to an xmpp publish-subscribe service. |
431 self.backend = backend | 500 self.backend = backend |
432 self.hideNodes = False | 501 self.hideNodes = False |
433 | 502 |
434 self.pubSubFeatures = self._getPubSubFeatures() | 503 self.pubSubFeatures = self._getPubSubFeatures() |
435 | 504 |
436 self.backend.register_notifier(self._notify) | 505 self.backend.registerNotifier(self._notify) |
437 self.backend.register_pre_delete(self._pre_delete) | 506 self.backend.registerPreDelete(self._preDelete) |
507 | |
438 | 508 |
439 def _getPubSubFeatures(self): | 509 def _getPubSubFeatures(self): |
440 features = [ | 510 features = [ |
441 "config-node", | 511 "config-node", |
442 "create-nodes", | 512 "create-nodes", |
452 "retrieve-items", | 522 "retrieve-items", |
453 "retrieve-subscriptions", | 523 "retrieve-subscriptions", |
454 "subscribe", | 524 "subscribe", |
455 ] | 525 ] |
456 | 526 |
457 if self.backend.supports_instant_nodes(): | 527 if self.backend.supportsInstantNodes(): |
458 features.append("instant-nodes") | 528 features.append("instant-nodes") |
459 | 529 |
460 if self.backend.supports_outcast_affiliation(): | 530 if self.backend.supportsOutcastAffiliation(): |
461 features.append("outcast-affiliation") | 531 features.append("outcast-affiliation") |
462 | 532 |
463 if self.backend.supports_persistent_items(): | 533 if self.backend.supportsPersistentItems(): |
464 features.append("persistent-items") | 534 features.append("persistent-items") |
465 | 535 |
466 if self.backend.supports_publisher_affiliation(): | 536 if self.backend.supportsPublisherAffiliation(): |
467 features.append("publisher-affiliation") | 537 features.append("publisher-affiliation") |
468 | 538 |
469 return features | 539 return features |
540 | |
470 | 541 |
471 def _notify(self, data): | 542 def _notify(self, data): |
472 items = data['items'] | 543 items = data['items'] |
473 nodeIdentifier = data['node_id'] | 544 nodeIdentifier = data['nodeIdentifier'] |
474 if 'subscriber' not in data: | 545 if 'subscriber' not in data: |
475 d = self.backend.get_notification_list(nodeIdentifier, items) | 546 d = self.backend.getNotificationList(nodeIdentifier, items) |
476 else: | 547 else: |
477 d = defer.succeed([(data['subscriber'], items)]) | 548 d = defer.succeed([(data['subscriber'], items)]) |
478 d.addCallback(lambda notifications: self.notifyPublish(self.serviceJID, | 549 d.addCallback(lambda notifications: self.notifyPublish(self.serviceJID, |
479 nodeIdentifier, | 550 nodeIdentifier, |
480 notifications)) | 551 notifications)) |
481 | 552 |
482 def _pre_delete(self, nodeIdentifier): | 553 |
483 d = self.backend.get_subscribers(nodeIdentifier) | 554 def _preDelete(self, nodeIdentifier): |
555 d = self.backend.getSubscribers(nodeIdentifier) | |
484 d.addCallback(lambda subscribers: self.notifyDelete(self.serviceJID, | 556 d.addCallback(lambda subscribers: self.notifyDelete(self.serviceJID, |
485 nodeIdentifier, | 557 nodeIdentifier, |
486 subscribers)) | 558 subscribers)) |
487 return d | 559 return d |
560 | |
488 | 561 |
489 def _mapErrors(self, failure): | 562 def _mapErrors(self, failure): |
490 e = failure.trap(*self._errorMap.keys()) | 563 e = failure.trap(*self._errorMap.keys()) |
491 | 564 |
492 condition, pubsubCondition, feature = self._errorMap[e] | 565 condition, pubsubCondition, feature = self._errorMap[e] |
497 else: | 570 else: |
498 exc = StanzaError(condition, text=msg) | 571 exc = StanzaError(condition, text=msg) |
499 | 572 |
500 raise exc | 573 raise exc |
501 | 574 |
575 | |
502 def getNodeInfo(self, requestor, service, nodeIdentifier): | 576 def getNodeInfo(self, requestor, service, nodeIdentifier): |
503 info = {} | 577 info = {} |
504 | 578 |
505 def saveType(result): | 579 def saveType(result): |
506 info['type'] = result | 580 info['type'] = result |
509 def saveMetaData(result): | 583 def saveMetaData(result): |
510 info['meta-data'] = result | 584 info['meta-data'] = result |
511 return info | 585 return info |
512 | 586 |
513 d = defer.succeed(nodeIdentifier) | 587 d = defer.succeed(nodeIdentifier) |
514 d.addCallback(self.backend.get_node_type) | 588 d.addCallback(self.backend.getNodeType) |
515 d.addCallback(saveType) | 589 d.addCallback(saveType) |
516 d.addCallback(self.backend.get_node_meta_data) | 590 d.addCallback(self.backend.getNodeMetaData) |
517 d.addCallback(saveMetaData) | 591 d.addCallback(saveMetaData) |
518 d.addErrback(self._mapErrors) | 592 d.addErrback(self._mapErrors) |
519 return d | 593 return d |
594 | |
520 | 595 |
521 def getNodes(self, requestor, service): | 596 def getNodes(self, requestor, service): |
522 if service.resource: | 597 if service.resource: |
523 return defer.succeed([]) | 598 return defer.succeed([]) |
524 d = self.backend.get_nodes() | 599 d = self.backend.getNodes() |
525 return d.addErrback(self._mapErrors) | 600 return d.addErrback(self._mapErrors) |
601 | |
526 | 602 |
527 def publish(self, requestor, service, nodeIdentifier, items): | 603 def publish(self, requestor, service, nodeIdentifier, items): |
528 d = self.backend.publish(nodeIdentifier, items, requestor) | 604 d = self.backend.publish(nodeIdentifier, items, requestor) |
529 return d.addErrback(self._mapErrors) | 605 return d.addErrback(self._mapErrors) |
530 | 606 |
607 | |
531 def subscribe(self, requestor, service, nodeIdentifier, subscriber): | 608 def subscribe(self, requestor, service, nodeIdentifier, subscriber): |
532 d = self.backend.subscribe(nodeIdentifier, subscriber, requestor) | 609 d = self.backend.subscribe(nodeIdentifier, subscriber, requestor) |
533 return d.addErrback(self._mapErrors) | 610 return d.addErrback(self._mapErrors) |
534 | 611 |
612 | |
535 def unsubscribe(self, requestor, service, nodeIdentifier, subscriber): | 613 def unsubscribe(self, requestor, service, nodeIdentifier, subscriber): |
536 d = self.backend.unsubscribe(nodeIdentifier, subscriber, requestor) | 614 d = self.backend.unsubscribe(nodeIdentifier, subscriber, requestor) |
537 return d.addErrback(self._mapErrors) | 615 return d.addErrback(self._mapErrors) |
538 | 616 |
617 | |
539 def subscriptions(self, requestor, service): | 618 def subscriptions(self, requestor, service): |
540 d = self.backend.get_subscriptions(requestor) | 619 d = self.backend.getSubscriptions(requestor) |
541 return d.addErrback(self._mapErrors) | 620 return d.addErrback(self._mapErrors) |
621 | |
542 | 622 |
543 def affiliations(self, requestor, service): | 623 def affiliations(self, requestor, service): |
544 d = self.backend.get_affiliations(requestor) | 624 d = self.backend.getAffiliations(requestor) |
545 return d.addErrback(self._mapErrors) | 625 return d.addErrback(self._mapErrors) |
626 | |
546 | 627 |
547 def create(self, requestor, service, nodeIdentifier): | 628 def create(self, requestor, service, nodeIdentifier): |
548 d = self.backend.create_node(nodeIdentifier, requestor) | 629 d = self.backend.createNode(nodeIdentifier, requestor) |
549 return d.addErrback(self._mapErrors) | 630 return d.addErrback(self._mapErrors) |
631 | |
550 | 632 |
551 def getDefaultConfiguration(self, requestor, service): | 633 def getDefaultConfiguration(self, requestor, service): |
552 d = self.backend.get_default_configuration() | 634 d = self.backend.getDefaultConfiguration() |
553 return d.addErrback(self._mapErrors) | 635 return d.addErrback(self._mapErrors) |
636 | |
554 | 637 |
555 def getConfiguration(self, requestor, service, nodeIdentifier): | 638 def getConfiguration(self, requestor, service, nodeIdentifier): |
556 d = self.backend.get_node_configuration(nodeIdentifier) | 639 d = self.backend.getNodeConfiguration(nodeIdentifier) |
557 return d.addErrback(self._mapErrors) | 640 return d.addErrback(self._mapErrors) |
641 | |
558 | 642 |
559 def setConfiguration(self, requestor, service, nodeIdentifier, options): | 643 def setConfiguration(self, requestor, service, nodeIdentifier, options): |
560 d = self.backend.set_node_configuration(nodeIdentifier, options, | 644 d = self.backend.setNodeConfiguration(nodeIdentifier, options, |
561 requestor) | 645 requestor) |
562 return d.addErrback(self._mapErrors) | 646 return d.addErrback(self._mapErrors) |
563 | 647 |
564 def items(self, requestor, service, nodeIdentifier, maxItems, itemIdentifiers): | 648 |
565 d = self.backend.get_items(nodeIdentifier, requestor, maxItems, | 649 def items(self, requestor, service, nodeIdentifier, maxItems, |
650 itemIdentifiers): | |
651 d = self.backend.getItems(nodeIdentifier, requestor, maxItems, | |
566 itemIdentifiers) | 652 itemIdentifiers) |
567 return d.addErrback(self._mapErrors) | 653 return d.addErrback(self._mapErrors) |
568 | 654 |
655 | |
569 def retract(self, requestor, service, nodeIdentifier, itemIdentifiers): | 656 def retract(self, requestor, service, nodeIdentifier, itemIdentifiers): |
570 d = self.backend.retract_item(nodeIdentifier, itemIdentifiers, | 657 d = self.backend.retractItem(nodeIdentifier, itemIdentifiers, |
571 requestor) | 658 requestor) |
572 return d.addErrback(self._mapErrors) | 659 return d.addErrback(self._mapErrors) |
573 | 660 |
661 | |
574 def purge(self, requestor, service, nodeIdentifier): | 662 def purge(self, requestor, service, nodeIdentifier): |
575 d = self.backend.purge_node(nodeIdentifier, requestor) | 663 d = self.backend.purgeNode(nodeIdentifier, requestor) |
576 return d.addErrback(self._mapErrors) | 664 return d.addErrback(self._mapErrors) |
665 | |
577 | 666 |
578 def delete(self, requestor, service, nodeIdentifier): | 667 def delete(self, requestor, service, nodeIdentifier): |
579 d = self.backend.delete_node(nodeIdentifier, requestor) | 668 d = self.backend.deleteNode(nodeIdentifier, requestor) |
580 return d.addErrback(self._mapErrors) | 669 return d.addErrback(self._mapErrors) |
581 | 670 |
582 components.registerAdapter(PubSubServiceFromBackend, | 671 components.registerAdapter(PubSubServiceFromBackend, |
583 IBackendService, | 672 IBackendService, |
584 IPubSubService) | 673 IPubSubService) |