comparison idavoll/backend.py @ 198:e404775b12df

Change naming and spacing conventions to match Twisted's.
author Ralph Meijer <ralphm@ik.nu>
date Tue, 10 Jun 2008 11:31:49 +0000
parents 00a6dbfbee42
children 2189c663ba44
comparison
equal deleted inserted replaced
197:9da5a95d408d 198:e404775b12df
1 # -*- test-case-name: idavoll.test.test_backend -*- 1 # -*- test-case-name: idavoll.test.test_backend -*-
2 # 2 #
3 # Copyright (c) 2003-2008 Ralph Meijer 3 # Copyright (c) 2003-2008 Ralph Meijer
4 # See LICENSE for details. 4 # See LICENSE for details.
5
6 """
7 Generic publish-subscribe backend.
8
9 This module implements a generic publish-subscribe backend service with
10 business logic as per
11 L{XEP-0060<http://www.xmpp.org/extensions/xep-0060.html>} that interacts with
12 a given storage facility. It also provides an adapter from the XMPP
13 publish-subscribe protocol.
14 """
5 15
6 import uuid 16 import uuid
7 17
8 from zope.interface import implements 18 from zope.interface import implements
9 19
17 from wokkel.pubsub import PubSubService, PubSubError 27 from wokkel.pubsub import PubSubService, PubSubError
18 28
19 from idavoll import error, iidavoll 29 from idavoll import error, iidavoll
20 from idavoll.iidavoll import IBackendService 30 from idavoll.iidavoll import IBackendService
21 31
22 def _get_affiliation(node, entity): 32 def _getAffiliation(node, entity):
23 d = node.get_affiliation(entity) 33 d = node.getAffiliation(entity)
24 d.addCallback(lambda affiliation: (node, affiliation)) 34 d.addCallback(lambda affiliation: (node, affiliation))
25 return d 35 return d
26 36
27 37
38
28 class BackendService(service.Service, utility.EventDispatcher): 39 class BackendService(service.Service, utility.EventDispatcher):
40 """
41 Generic publish-subscribe backend service.
42
43 @cvar options: Node configuration form as a mapping from the field
44 name to a dictionary that holds the field's type,
45 label and possible options to choose from.
46 @type options: C{dict}.
47 @cvar defaultConfig: The default node configuration.
48 """
29 49
30 implements(iidavoll.IBackendService) 50 implements(iidavoll.IBackendService)
31 51
32 options = {"pubsub#persist_items": 52 options = {"pubsub#persist_items":
33 {"type": "boolean", 53 {"type": "boolean",
43 "on_sub": "When a new subscription is processed", 63 "on_sub": "When a new subscription is processed",
44 } 64 }
45 }, 65 },
46 } 66 }
47 67
48 default_config = {"pubsub#persist_items": True, 68 defaultConfig = {"pubsub#persist_items": True,
49 "pubsub#deliver_payloads": True, 69 "pubsub#deliver_payloads": True,
50 "pubsub#send_last_published_item": 'on_sub', 70 "pubsub#send_last_published_item": 'on_sub',
51 } 71 }
52 72
53 def __init__(self, storage): 73 def __init__(self, storage):
54 utility.EventDispatcher.__init__(self) 74 utility.EventDispatcher.__init__(self)
55 self.storage = storage 75 self.storage = storage
56 self._callback_list = [] 76 self._callbackList = []
57 77
58 def supports_publisher_affiliation(self): 78
79 def supportsPublisherAffiliation(self):
59 return True 80 return True
60 81
61 def supports_outcast_affiliation(self): 82
83 def supportsOutcastAffiliation(self):
62 return True 84 return True
63 85
64 def supports_persistent_items(self): 86
87 def supportsPersistentItems(self):
65 return True 88 return True
66 89
67 def get_node_type(self, node_id): 90
68 d = self.storage.get_node(node_id) 91 def getNodeType(self, nodeIdentifier):
69 d.addCallback(lambda node: node.get_type()) 92 d = self.storage.getNode(nodeIdentifier)
70 return d 93 d.addCallback(lambda node: node.getType())
71 94 return d
72 def get_nodes(self): 95
73 return self.storage.get_node_ids() 96
74 97 def getNodes(self):
75 def get_node_meta_data(self, node_id): 98 return self.storage.getNodeIds()
76 d = self.storage.get_node(node_id) 99
77 d.addCallback(lambda node: node.get_meta_data()) 100
78 d.addCallback(self._make_meta_data) 101 def getNodeMetaData(self, nodeIdentifier):
79 return d 102 d = self.storage.getNode(nodeIdentifier)
80 103 d.addCallback(lambda node: node.getMetaData())
81 def _make_meta_data(self, meta_data): 104 d.addCallback(self._makeMetaData)
105 return d
106
107
108 def _makeMetaData(self, metaData):
82 options = [] 109 options = []
83 for key, value in meta_data.iteritems(): 110 for key, value in metaData.iteritems():
84 if self.options.has_key(key): 111 if self.options.has_key(key):
85 option = {"var": key} 112 option = {"var": key}
86 option.update(self.options[key]) 113 option.update(self.options[key])
87 option["value"] = value 114 option["value"] = value
88 options.append(option) 115 options.append(option)
89 116
90 return options 117 return options
91 118
92 def _check_auth(self, node, requestor): 119
120 def _checkAuth(self, node, requestor):
93 def check(affiliation, node): 121 def check(affiliation, node):
94 if affiliation not in ['owner', 'publisher']: 122 if affiliation not in ['owner', 'publisher']:
95 raise error.Forbidden() 123 raise error.Forbidden()
96 return node 124 return node
97 125
98 d = node.get_affiliation(requestor) 126 d = node.getAffiliation(requestor)
99 d.addCallback(check, node) 127 d.addCallback(check, node)
100 return d 128 return d
101 129
102 def publish(self, node_id, items, requestor): 130
103 d = self.storage.get_node(node_id) 131 def publish(self, nodeIdentifier, items, requestor):
104 d.addCallback(self._check_auth, requestor) 132 d = self.storage.getNode(nodeIdentifier)
105 d.addCallback(self._do_publish, items, requestor) 133 d.addCallback(self._checkAuth, requestor)
106 return d 134 d.addCallback(self._doPublish, items, requestor)
107 135 return d
108 def _do_publish(self, node, items, requestor): 136
109 configuration = node.get_configuration() 137
110 persist_items = configuration["pubsub#persist_items"] 138 def _doPublish(self, node, items, requestor):
111 deliver_payloads = configuration["pubsub#deliver_payloads"] 139 configuration = node.getConfiguration()
112 140 persistItems = configuration["pubsub#persist_items"]
113 if items and not persist_items and not deliver_payloads: 141 deliverPayloads = configuration["pubsub#deliver_payloads"]
142
143 if items and not persistItems and not deliverPayloads:
114 raise error.ItemForbidden() 144 raise error.ItemForbidden()
115 elif not items and (persist_items or deliver_payloads): 145 elif not items and (persistItems or deliverPayloads):
116 raise error.ItemRequired() 146 raise error.ItemRequired()
117 147
118 if persist_items or deliver_payloads: 148 if persistItems or deliverPayloads:
119 for item in items: 149 for item in items:
120 if not item.getAttribute("id"): 150 if not item.getAttribute("id"):
121 item["id"] = str(uuid.uuid4()) 151 item["id"] = str(uuid.uuid4())
122 152
123 if persist_items: 153 if persistItems:
124 d = node.store_items(items, requestor) 154 d = node.storeItems(items, requestor)
125 else: 155 else:
126 d = defer.succeed(None) 156 d = defer.succeed(None)
127 157
128 d.addCallback(self._do_notify, node.id, items, deliver_payloads) 158 d.addCallback(self._doNotify, node.nodeIdentifier, items,
129 return d 159 deliverPayloads)
130 160 return d
131 def _do_notify(self, result, node_id, items, deliver_payloads): 161
132 if items and not deliver_payloads: 162
163 def _doNotify(self, result, nodeIdentifier, items, deliverPayloads):
164 if items and not deliverPayloads:
133 for item in items: 165 for item in items:
134 item.children = [] 166 item.children = []
135 167
136 self.dispatch({'items': items, 'node_id': node_id}, 168 self.dispatch({'items': items, 'nodeIdentifier': nodeIdentifier},
137 '//event/pubsub/notify') 169 '//event/pubsub/notify')
138 170
139 def get_notification_list(self, node_id, items): 171
140 d = self.storage.get_node(node_id) 172 def getNotificationList(self, nodeIdentifier, items):
141 d.addCallback(lambda node: node.get_subscribers()) 173 d = self.storage.getNode(nodeIdentifier)
142 d.addCallback(self._magic_filter, node_id, items) 174 d.addCallback(lambda node: node.getSubscribers())
143 return d 175 d.addCallback(self._magicFilter, nodeIdentifier, items)
144 176 return d
145 def _magic_filter(self, subscribers, node_id, items): 177
178
179 def _magicFilter(self, subscribers, nodeIdentifier, items):
146 list = [] 180 list = []
147 for subscriber in subscribers: 181 for subscriber in subscribers:
148 list.append((subscriber, items)) 182 list.append((subscriber, items))
149 return list 183 return list
150 184
151 def register_notifier(self, observerfn, *args, **kwargs): 185
186 def registerNotifier(self, observerfn, *args, **kwargs):
152 self.addObserver('//event/pubsub/notify', observerfn, *args, **kwargs) 187 self.addObserver('//event/pubsub/notify', observerfn, *args, **kwargs)
153 188
154 def subscribe(self, node_id, subscriber, requestor): 189
155 subscriber_entity = subscriber.userhostJID() 190 def subscribe(self, nodeIdentifier, subscriber, requestor):
156 if subscriber_entity != requestor: 191 subscriberEntity = subscriber.userhostJID()
192 if subscriberEntity != requestor:
157 return defer.fail(error.Forbidden()) 193 return defer.fail(error.Forbidden())
158 194
159 d = self.storage.get_node(node_id) 195 d = self.storage.getNode(nodeIdentifier)
160 d.addCallback(_get_affiliation, subscriber_entity) 196 d.addCallback(_getAffiliation, subscriberEntity)
161 d.addCallback(self._do_subscribe, subscriber) 197 d.addCallback(self._doSubscribe, subscriber)
162 return d 198 return d
163 199
164 def _do_subscribe(self, result, subscriber): 200
201 def _doSubscribe(self, result, subscriber):
165 node, affiliation = result 202 node, affiliation = result
166 203
167 if affiliation == 'outcast': 204 if affiliation == 'outcast':
168 raise error.Forbidden() 205 raise error.Forbidden()
169 206
170 d = node.add_subscription(subscriber, 'subscribed') 207 d = node.addSubscription(subscriber, 'subscribed')
171 d.addCallback(lambda _: self._send_last_published(node, subscriber)) 208 d.addCallback(lambda _: self._sendLastPublished(node, subscriber))
172 d.addCallback(lambda _: 'subscribed') 209 d.addCallback(lambda _: 'subscribed')
173 d.addErrback(self._get_subscription, node, subscriber) 210 d.addErrback(self._getSubscription, node, subscriber)
174 d.addCallback(self._return_subscription, node.id) 211 d.addCallback(self._returnSubscription, node.nodeIdentifier)
175 return d 212 return d
176 213
177 def _get_subscription(self, failure, node, subscriber): 214
215 def _getSubscription(self, failure, node, subscriber):
178 failure.trap(error.SubscriptionExists) 216 failure.trap(error.SubscriptionExists)
179 return node.get_subscription(subscriber) 217 return node.getSubscription(subscriber)
180 218
181 def _return_subscription(self, result, node_id): 219
182 return node_id, result 220 def _returnSubscription(self, result, nodeIdentifier):
183 221 return nodeIdentifier, result
184 def _send_last_published(self, node, subscriber): 222
223
224 def _sendLastPublished(self, node, subscriber):
185 class StringParser(object): 225 class StringParser(object):
186 def __init__(self): 226 def __init__(self):
187 self.elementStream = domish.elementStream() 227 self.elementStream = domish.elementStream()
188 self.elementStream.DocumentStartEvent = self.docStart 228 self.elementStream.DocumentStartEvent = self.docStart
189 self.elementStream.ElementEvent = self.elem 229 self.elementStream.ElementEvent = self.elem
200 240
201 def parse(self, string): 241 def parse(self, string):
202 self.elementStream.parse(string) 242 self.elementStream.parse(string)
203 return self.document 243 return self.document
204 244
205 def notify_item(result): 245 def notifyItem(result):
206 if not result: 246 if not result:
207 return 247 return
208 248
209 items = [domish.SerializedXML(item) for item in result] 249 items = [domish.SerializedXML(item) for item in result]
210 250
211 reactor.callLater(0, self.dispatch, {'items': items, 251 reactor.callLater(0, self.dispatch,
212 'node_id': node.id, 252 {'items': items,
213 'subscriber': subscriber}, 253 'nodeIdentifier': node.nodeIdentifier,
214 '//event/pubsub/notify') 254 'subscriber': subscriber},
215 255 '//event/pubsub/notify')
216 config = node.get_configuration() 256
257 config = node.getConfiguration()
217 if config.get("pubsub#send_last_published_item", 'never') != 'on_sub': 258 if config.get("pubsub#send_last_published_item", 'never') != 'on_sub':
218 return 259 return
219 260
220 d = self.get_items(node.id, subscriber.userhostJID(), 1) 261 d = self.getItems(node.nodeIdentifier, subscriber.userhostJID(), 1)
221 d.addCallback(notify_item) 262 d.addCallback(notifyItem)
222 263
223 def unsubscribe(self, node_id, subscriber, requestor): 264
265 def unsubscribe(self, nodeIdentifier, subscriber, requestor):
224 if subscriber.userhostJID() != requestor: 266 if subscriber.userhostJID() != requestor:
225 return defer.fail(error.Forbidden()) 267 return defer.fail(error.Forbidden())
226 268
227 d = self.storage.get_node(node_id) 269 d = self.storage.getNode(nodeIdentifier)
228 d.addCallback(lambda node: node.remove_subscription(subscriber)) 270 d.addCallback(lambda node: node.removeSubscription(subscriber))
229 return d 271 return d
230 272
231 def get_subscriptions(self, entity): 273
232 return self.storage.get_subscriptions(entity) 274 def getSubscriptions(self, entity):
233 275 return self.storage.getSubscriptions(entity)
234 def supports_instant_nodes(self): 276
277
278 def supportsInstantNodes(self):
235 return True 279 return True
236 280
237 def create_node(self, node_id, requestor): 281
238 if not node_id: 282 def createNode(self, nodeIdentifier, requestor):
239 node_id = 'generic/%s' % uuid.uuid4() 283 if not nodeIdentifier:
240 d = self.storage.create_node(node_id, requestor) 284 nodeIdentifier = 'generic/%s' % uuid.uuid4()
241 d.addCallback(lambda _: node_id) 285 d = self.storage.createNode(nodeIdentifier, requestor)
242 return d 286 d.addCallback(lambda _: nodeIdentifier)
243 287 return d
244 def get_default_configuration(self): 288
245 d = defer.succeed(self.default_config) 289
246 d.addCallback(self._make_config) 290 def getDefaultConfiguration(self):
247 return d 291 d = defer.succeed(self.defaultConfig)
248 292 d.addCallback(self._makeConfig)
249 def get_node_configuration(self, node_id): 293 return d
250 if not node_id: 294
295
296 def getNodeConfiguration(self, nodeIdentifier):
297 if not nodeIdentifier:
251 return defer.fail(error.NoRootNode()) 298 return defer.fail(error.NoRootNode())
252 299
253 d = self.storage.get_node(node_id) 300 d = self.storage.getNode(nodeIdentifier)
254 d.addCallback(lambda node: node.get_configuration()) 301 d.addCallback(lambda node: node.getConfiguration())
255 302
256 d.addCallback(self._make_config) 303 d.addCallback(self._makeConfig)
257 return d 304 return d
258 305
259 def _make_config(self, config): 306
307 def _makeConfig(self, config):
260 options = [] 308 options = []
261 for key, value in self.options.iteritems(): 309 for key, value in self.options.iteritems():
262 option = {"var": key} 310 option = {"var": key}
263 option.update(value) 311 option.update(value)
264 if config.has_key(key): 312 if config.has_key(key):
265 option["value"] = config[key] 313 option["value"] = config[key]
266 options.append(option) 314 options.append(option)
267 315
268 return options 316 return options
269 317
270 def set_node_configuration(self, node_id, options, requestor): 318
271 if not node_id: 319 def setNodeConfiguration(self, nodeIdentifier, options, requestor):
320 if not nodeIdentifier:
272 return defer.fail(error.NoRootNode()) 321 return defer.fail(error.NoRootNode())
273 322
274 for key, value in options.iteritems(): 323 for key, value in options.iteritems():
275 if not self.options.has_key(key): 324 if not self.options.has_key(key):
276 return defer.fail(error.InvalidConfigurationOption()) 325 return defer.fail(error.InvalidConfigurationOption())
278 try: 327 try:
279 options[key] = bool(int(value)) 328 options[key] = bool(int(value))
280 except ValueError: 329 except ValueError:
281 return defer.fail(error.InvalidConfigurationValue()) 330 return defer.fail(error.InvalidConfigurationValue())
282 331
283 d = self.storage.get_node(node_id) 332 d = self.storage.getNode(nodeIdentifier)
284 d.addCallback(_get_affiliation, requestor) 333 d.addCallback(_getAffiliation, requestor)
285 d.addCallback(self._do_set_node_configuration, options) 334 d.addCallback(self._doSetNodeConfiguration, options)
286 return d 335 return d
287 336
288 def _do_set_node_configuration(self, result, options): 337
338 def _doSetNodeConfiguration(self, result, options):
289 node, affiliation = result 339 node, affiliation = result
290 340
291 if affiliation != 'owner': 341 if affiliation != 'owner':
292 raise error.Forbidden() 342 raise error.Forbidden()
293 343
294 return node.set_configuration(options) 344 return node.setConfiguration(options)
295 345
296 def get_affiliations(self, entity): 346
297 return self.storage.get_affiliations(entity) 347 def getAffiliations(self, entity):
298 348 return self.storage.getAffiliations(entity)
299 def get_items(self, node_id, requestor, max_items=None, item_ids=[]): 349
300 d = self.storage.get_node(node_id) 350
301 d.addCallback(_get_affiliation, requestor) 351 def getItems(self, nodeIdentifier, requestor, maxItems=None,
302 d.addCallback(self._do_get_items, max_items, item_ids) 352 itemIdentifiers=None):
303 return d 353 d = self.storage.getNode(nodeIdentifier)
304 354 d.addCallback(_getAffiliation, requestor)
305 def _do_get_items(self, result, max_items, item_ids): 355 d.addCallback(self._doGetItems, maxItems, itemIdentifiers)
356 return d
357
358
359 def _doGetItems(self, result, maxItems, itemIdentifiers):
306 node, affiliation = result 360 node, affiliation = result
307 361
308 if affiliation == 'outcast': 362 if affiliation == 'outcast':
309 raise error.Forbidden() 363 raise error.Forbidden()
310 364
311 if item_ids: 365 if itemIdentifiers:
312 return node.get_items_by_id(item_ids) 366 return node.getItemsById(itemIdentifiers)
313 else: 367 else:
314 return node.get_items(max_items) 368 return node.getItems(maxItems)
315 369
316 def retract_item(self, node_id, item_ids, requestor): 370
317 d = self.storage.get_node(node_id) 371 def retractItem(self, nodeIdentifier, itemIdentifiers, requestor):
318 d.addCallback(_get_affiliation, requestor) 372 d = self.storage.getNode(nodeIdentifier)
319 d.addCallback(self._do_retract, item_ids) 373 d.addCallback(_getAffiliation, requestor)
320 return d 374 d.addCallback(self._doRetract, itemIdentifiers)
321 375 return d
322 def _do_retract(self, result, item_ids): 376
377
378 def _doRetract(self, result, itemIdentifiers):
323 node, affiliation = result 379 node, affiliation = result
324 persist_items = node.get_configuration()["pubsub#persist_items"] 380 persistItems = node.getConfiguration()["pubsub#persist_items"]
325 381
326 if affiliation not in ['owner', 'publisher']: 382 if affiliation not in ['owner', 'publisher']:
327 raise error.Forbidden() 383 raise error.Forbidden()
328 384
329 if not persist_items: 385 if not persistItems:
330 raise error.NodeNotPersistent() 386 raise error.NodeNotPersistent()
331 387
332 d = node.remove_items(item_ids) 388 d = node.removeItems(itemIdentifiers)
333 d.addCallback(self._do_notify_retraction, node.id) 389 d.addCallback(self._doNotifyRetraction, node.nodeIdentifier)
334 return d 390 return d
335 391
336 def _do_notify_retraction(self, item_ids, node_id): 392
337 self.dispatch({ 'item_ids': item_ids, 'node_id': node_id }, 393 def _doNotifyRetraction(self, itemIdentifiers, nodeIdentifier):
338 '//event/pubsub/retract') 394 self.dispatch({'itemIdentifiers': itemIdentifiers,
339 395 'nodeIdentifier': nodeIdentifier },
340 def purge_node(self, node_id, requestor): 396 '//event/pubsub/retract')
341 d = self.storage.get_node(node_id) 397
342 d.addCallback(_get_affiliation, requestor) 398
343 d.addCallback(self._do_purge) 399 def purgeNode(self, nodeIdentifier, requestor):
344 return d 400 d = self.storage.getNode(nodeIdentifier)
345 401 d.addCallback(_getAffiliation, requestor)
346 def _do_purge(self, result): 402 d.addCallback(self._doPurge)
403 return d
404
405
406 def _doPurge(self, result):
347 node, affiliation = result 407 node, affiliation = result
348 persist_items = node.get_configuration()["pubsub#persist_items"] 408 persistItems = node.getConfiguration()["pubsub#persist_items"]
349 409
350 if affiliation != 'owner': 410 if affiliation != 'owner':
351 raise error.Forbidden() 411 raise error.Forbidden()
352 412
353 if not persist_items: 413 if not persistItems:
354 raise error.NodeNotPersistent() 414 raise error.NodeNotPersistent()
355 415
356 d = node.purge() 416 d = node.purge()
357 d.addCallback(self._do_notify_purge, node.id) 417 d.addCallback(self._doNotifyPurge, node.nodeIdentifier)
358 return d 418 return d
359 419
360 def _do_notify_purge(self, result, node_id): 420
361 self.dispatch(node_id, '//event/pubsub/purge') 421 def _doNotifyPurge(self, result, nodeIdentifier):
362 422 self.dispatch(nodeIdentifier, '//event/pubsub/purge')
363 def register_pre_delete(self, pre_delete_fn): 423
364 self._callback_list.append(pre_delete_fn) 424
365 425 def registerPreDelete(self, preDeleteFn):
366 def get_subscribers(self, node_id): 426 self._callbackList.append(preDeleteFn)
367 d = self.storage.get_node(node_id) 427
368 d.addCallback(lambda node: node.get_subscribers()) 428
369 return d 429 def getSubscribers(self, nodeIdentifier):
370 430 d = self.storage.getNode(nodeIdentifier)
371 def delete_node(self, node_id, requestor): 431 d.addCallback(lambda node: node.getSubscribers())
372 d = self.storage.get_node(node_id) 432 return d
373 d.addCallback(_get_affiliation, requestor) 433
374 d.addCallback(self._do_pre_delete) 434
375 return d 435 def deleteNode(self, nodeIdentifier, requestor):
376 436 d = self.storage.getNode(nodeIdentifier)
377 def _do_pre_delete(self, result): 437 d.addCallback(_getAffiliation, requestor)
438 d.addCallback(self._doPreDelete)
439 return d
440
441
442 def _doPreDelete(self, result):
378 node, affiliation = result 443 node, affiliation = result
379 444
380 if affiliation != 'owner': 445 if affiliation != 'owner':
381 raise error.Forbidden() 446 raise error.Forbidden()
382 447
383 d = defer.DeferredList([cb(node.id) for cb in self._callback_list], 448 d = defer.DeferredList([cb(node.nodeIdentifier)
449 for cb in self._callbackList],
384 consumeErrors=1) 450 consumeErrors=1)
385 d.addCallback(self._do_delete, node.id) 451 d.addCallback(self._doDelete, node.nodeIdentifier)
386 452
387 def _do_delete(self, result, node_id): 453
454 def _doDelete(self, result, nodeIdentifier):
388 dl = [] 455 dl = []
389 for succeeded, r in result: 456 for succeeded, r in result:
390 if succeeded and r: 457 if succeeded and r:
391 dl.extend(r) 458 dl.extend(r)
392 459
393 d = self.storage.delete_node(node_id) 460 d = self.storage.deleteNode(nodeIdentifier)
394 d.addCallback(self._do_notify_delete, dl) 461 d.addCallback(self._doNotifyDelete, dl)
395 462
396 return d 463 return d
397 464
398 def _do_notify_delete(self, result, dl): 465
466 def _doNotifyDelete(self, result, dl):
399 for d in dl: 467 for d in dl:
400 d.callback(None) 468 d.callback(None)
469
401 470
402 471
403 class PubSubServiceFromBackend(PubSubService): 472 class PubSubServiceFromBackend(PubSubService):
404 """ 473 """
405 Adapts a backend to an xmpp publish-subscribe service. 474 Adapts a backend to an xmpp publish-subscribe service.
431 self.backend = backend 500 self.backend = backend
432 self.hideNodes = False 501 self.hideNodes = False
433 502
434 self.pubSubFeatures = self._getPubSubFeatures() 503 self.pubSubFeatures = self._getPubSubFeatures()
435 504
436 self.backend.register_notifier(self._notify) 505 self.backend.registerNotifier(self._notify)
437 self.backend.register_pre_delete(self._pre_delete) 506 self.backend.registerPreDelete(self._preDelete)
507
438 508
439 def _getPubSubFeatures(self): 509 def _getPubSubFeatures(self):
440 features = [ 510 features = [
441 "config-node", 511 "config-node",
442 "create-nodes", 512 "create-nodes",
452 "retrieve-items", 522 "retrieve-items",
453 "retrieve-subscriptions", 523 "retrieve-subscriptions",
454 "subscribe", 524 "subscribe",
455 ] 525 ]
456 526
457 if self.backend.supports_instant_nodes(): 527 if self.backend.supportsInstantNodes():
458 features.append("instant-nodes") 528 features.append("instant-nodes")
459 529
460 if self.backend.supports_outcast_affiliation(): 530 if self.backend.supportsOutcastAffiliation():
461 features.append("outcast-affiliation") 531 features.append("outcast-affiliation")
462 532
463 if self.backend.supports_persistent_items(): 533 if self.backend.supportsPersistentItems():
464 features.append("persistent-items") 534 features.append("persistent-items")
465 535
466 if self.backend.supports_publisher_affiliation(): 536 if self.backend.supportsPublisherAffiliation():
467 features.append("publisher-affiliation") 537 features.append("publisher-affiliation")
468 538
469 return features 539 return features
540
470 541
471 def _notify(self, data): 542 def _notify(self, data):
472 items = data['items'] 543 items = data['items']
473 nodeIdentifier = data['node_id'] 544 nodeIdentifier = data['nodeIdentifier']
474 if 'subscriber' not in data: 545 if 'subscriber' not in data:
475 d = self.backend.get_notification_list(nodeIdentifier, items) 546 d = self.backend.getNotificationList(nodeIdentifier, items)
476 else: 547 else:
477 d = defer.succeed([(data['subscriber'], items)]) 548 d = defer.succeed([(data['subscriber'], items)])
478 d.addCallback(lambda notifications: self.notifyPublish(self.serviceJID, 549 d.addCallback(lambda notifications: self.notifyPublish(self.serviceJID,
479 nodeIdentifier, 550 nodeIdentifier,
480 notifications)) 551 notifications))
481 552
482 def _pre_delete(self, nodeIdentifier): 553
483 d = self.backend.get_subscribers(nodeIdentifier) 554 def _preDelete(self, nodeIdentifier):
555 d = self.backend.getSubscribers(nodeIdentifier)
484 d.addCallback(lambda subscribers: self.notifyDelete(self.serviceJID, 556 d.addCallback(lambda subscribers: self.notifyDelete(self.serviceJID,
485 nodeIdentifier, 557 nodeIdentifier,
486 subscribers)) 558 subscribers))
487 return d 559 return d
560
488 561
489 def _mapErrors(self, failure): 562 def _mapErrors(self, failure):
490 e = failure.trap(*self._errorMap.keys()) 563 e = failure.trap(*self._errorMap.keys())
491 564
492 condition, pubsubCondition, feature = self._errorMap[e] 565 condition, pubsubCondition, feature = self._errorMap[e]
497 else: 570 else:
498 exc = StanzaError(condition, text=msg) 571 exc = StanzaError(condition, text=msg)
499 572
500 raise exc 573 raise exc
501 574
575
502 def getNodeInfo(self, requestor, service, nodeIdentifier): 576 def getNodeInfo(self, requestor, service, nodeIdentifier):
503 info = {} 577 info = {}
504 578
505 def saveType(result): 579 def saveType(result):
506 info['type'] = result 580 info['type'] = result
509 def saveMetaData(result): 583 def saveMetaData(result):
510 info['meta-data'] = result 584 info['meta-data'] = result
511 return info 585 return info
512 586
513 d = defer.succeed(nodeIdentifier) 587 d = defer.succeed(nodeIdentifier)
514 d.addCallback(self.backend.get_node_type) 588 d.addCallback(self.backend.getNodeType)
515 d.addCallback(saveType) 589 d.addCallback(saveType)
516 d.addCallback(self.backend.get_node_meta_data) 590 d.addCallback(self.backend.getNodeMetaData)
517 d.addCallback(saveMetaData) 591 d.addCallback(saveMetaData)
518 d.addErrback(self._mapErrors) 592 d.addErrback(self._mapErrors)
519 return d 593 return d
594
520 595
521 def getNodes(self, requestor, service): 596 def getNodes(self, requestor, service):
522 if service.resource: 597 if service.resource:
523 return defer.succeed([]) 598 return defer.succeed([])
524 d = self.backend.get_nodes() 599 d = self.backend.getNodes()
525 return d.addErrback(self._mapErrors) 600 return d.addErrback(self._mapErrors)
601
526 602
527 def publish(self, requestor, service, nodeIdentifier, items): 603 def publish(self, requestor, service, nodeIdentifier, items):
528 d = self.backend.publish(nodeIdentifier, items, requestor) 604 d = self.backend.publish(nodeIdentifier, items, requestor)
529 return d.addErrback(self._mapErrors) 605 return d.addErrback(self._mapErrors)
530 606
607
531 def subscribe(self, requestor, service, nodeIdentifier, subscriber): 608 def subscribe(self, requestor, service, nodeIdentifier, subscriber):
532 d = self.backend.subscribe(nodeIdentifier, subscriber, requestor) 609 d = self.backend.subscribe(nodeIdentifier, subscriber, requestor)
533 return d.addErrback(self._mapErrors) 610 return d.addErrback(self._mapErrors)
534 611
612
535 def unsubscribe(self, requestor, service, nodeIdentifier, subscriber): 613 def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
536 d = self.backend.unsubscribe(nodeIdentifier, subscriber, requestor) 614 d = self.backend.unsubscribe(nodeIdentifier, subscriber, requestor)
537 return d.addErrback(self._mapErrors) 615 return d.addErrback(self._mapErrors)
538 616
617
539 def subscriptions(self, requestor, service): 618 def subscriptions(self, requestor, service):
540 d = self.backend.get_subscriptions(requestor) 619 d = self.backend.getSubscriptions(requestor)
541 return d.addErrback(self._mapErrors) 620 return d.addErrback(self._mapErrors)
621
542 622
543 def affiliations(self, requestor, service): 623 def affiliations(self, requestor, service):
544 d = self.backend.get_affiliations(requestor) 624 d = self.backend.getAffiliations(requestor)
545 return d.addErrback(self._mapErrors) 625 return d.addErrback(self._mapErrors)
626
546 627
547 def create(self, requestor, service, nodeIdentifier): 628 def create(self, requestor, service, nodeIdentifier):
548 d = self.backend.create_node(nodeIdentifier, requestor) 629 d = self.backend.createNode(nodeIdentifier, requestor)
549 return d.addErrback(self._mapErrors) 630 return d.addErrback(self._mapErrors)
631
550 632
551 def getDefaultConfiguration(self, requestor, service): 633 def getDefaultConfiguration(self, requestor, service):
552 d = self.backend.get_default_configuration() 634 d = self.backend.getDefaultConfiguration()
553 return d.addErrback(self._mapErrors) 635 return d.addErrback(self._mapErrors)
636
554 637
555 def getConfiguration(self, requestor, service, nodeIdentifier): 638 def getConfiguration(self, requestor, service, nodeIdentifier):
556 d = self.backend.get_node_configuration(nodeIdentifier) 639 d = self.backend.getNodeConfiguration(nodeIdentifier)
557 return d.addErrback(self._mapErrors) 640 return d.addErrback(self._mapErrors)
641
558 642
559 def setConfiguration(self, requestor, service, nodeIdentifier, options): 643 def setConfiguration(self, requestor, service, nodeIdentifier, options):
560 d = self.backend.set_node_configuration(nodeIdentifier, options, 644 d = self.backend.setNodeConfiguration(nodeIdentifier, options,
561 requestor) 645 requestor)
562 return d.addErrback(self._mapErrors) 646 return d.addErrback(self._mapErrors)
563 647
564 def items(self, requestor, service, nodeIdentifier, maxItems, itemIdentifiers): 648
565 d = self.backend.get_items(nodeIdentifier, requestor, maxItems, 649 def items(self, requestor, service, nodeIdentifier, maxItems,
650 itemIdentifiers):
651 d = self.backend.getItems(nodeIdentifier, requestor, maxItems,
566 itemIdentifiers) 652 itemIdentifiers)
567 return d.addErrback(self._mapErrors) 653 return d.addErrback(self._mapErrors)
568 654
655
569 def retract(self, requestor, service, nodeIdentifier, itemIdentifiers): 656 def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
570 d = self.backend.retract_item(nodeIdentifier, itemIdentifiers, 657 d = self.backend.retractItem(nodeIdentifier, itemIdentifiers,
571 requestor) 658 requestor)
572 return d.addErrback(self._mapErrors) 659 return d.addErrback(self._mapErrors)
573 660
661
574 def purge(self, requestor, service, nodeIdentifier): 662 def purge(self, requestor, service, nodeIdentifier):
575 d = self.backend.purge_node(nodeIdentifier, requestor) 663 d = self.backend.purgeNode(nodeIdentifier, requestor)
576 return d.addErrback(self._mapErrors) 664 return d.addErrback(self._mapErrors)
665
577 666
578 def delete(self, requestor, service, nodeIdentifier): 667 def delete(self, requestor, service, nodeIdentifier):
579 d = self.backend.delete_node(nodeIdentifier, requestor) 668 d = self.backend.deleteNode(nodeIdentifier, requestor)
580 return d.addErrback(self._mapErrors) 669 return d.addErrback(self._mapErrors)
581 670
582 components.registerAdapter(PubSubServiceFromBackend, 671 components.registerAdapter(PubSubServiceFromBackend,
583 IBackendService, 672 IBackendService,
584 IPubSubService) 673 IPubSubService)