Mercurial > libervia-pubsub
comparison sat_pubsub/backend.py @ 294:df1edebb0466
PEP implementation, draft (huge patch sorry):
/!\ database schema has changed ! /!\
- whole PEP behaviour is not managed yet
- if the stanza is delegated, PEP is assumed
- fixed potential SQL injection in pgsql_storage
- publish notifications manage PEP
- added retract notifications (if "notify" attribute is present), with PEP handling
- a publisher can no longer replace an item he didn't publish
- /!\ schema has changed, sat_pubsub_update_0_1.sql updates it
- sat_pubsub_update_0_1.sql also fixes bad items coming from a former version of SàT
author | Goffi <goffi@goffi.org> |
---|---|
date | Sun, 16 Aug 2015 01:32:42 +0200 |
parents | 9f612fa19eea |
children | 6ce33757d21e |
comparison
equal
deleted
inserted
replaced
293:b96a4ac25f8b | 294:df1edebb0466 |
---|---|
68 | 68 |
69 from twisted.application import service | 69 from twisted.application import service |
70 from twisted.python import components, log | 70 from twisted.python import components, log |
71 from twisted.internet import defer, reactor | 71 from twisted.internet import defer, reactor |
72 from twisted.words.protocols.jabber.error import StanzaError | 72 from twisted.words.protocols.jabber.error import StanzaError |
73 from twisted.words.protocols.jabber.jid import JID, InvalidFormat | 73 # from twisted.words.protocols.jabber.jid import JID, InvalidFormat |
74 from twisted.words.xish import utility | 74 from twisted.words.xish import utility |
75 | 75 |
76 from wokkel import disco, data_form, rsm | 76 from wokkel import disco, data_form, rsm |
77 from wokkel.iwokkel import IPubSubResource | 77 from wokkel.iwokkel import IPubSubResource |
78 from wokkel.pubsub import PubSubResource, PubSubError, Subscription | 78 from wokkel.pubsub import PubSubResource, PubSubError, Subscription |
101 """ | 101 """ |
102 | 102 |
103 implements(iidavoll.IBackendService) | 103 implements(iidavoll.IBackendService) |
104 | 104 |
105 nodeOptions = { | 105 nodeOptions = { |
106 "pubsub#persist_items": | 106 const.OPT_PERSIST_ITEMS: |
107 {"type": "boolean", | 107 {"type": "boolean", |
108 "label": "Persist items to storage"}, | 108 "label": "Persist items to storage"}, |
109 "pubsub#deliver_payloads": | 109 const.OPT_DELIVER_PAYLOADS: |
110 {"type": "boolean", | 110 {"type": "boolean", |
111 "label": "Deliver payloads with event notifications"}, | 111 "label": "Deliver payloads with event notifications"}, |
112 "pubsub#send_last_published_item": | 112 const.OPT_SEND_LAST_PUBLISHED_ITEM: |
113 {"type": "list-single", | 113 {"type": "list-single", |
114 "label": "When to send the last published item", | 114 "label": "When to send the last published item", |
115 "options": { | 115 "options": { |
116 "never": "Never", | 116 "never": "Never", |
117 "on_sub": "When a new subscription is processed"} | 117 "on_sub": "When a new subscription is processed"} |
179 | 179 |
180 def supportsPublishModel(self): | 180 def supportsPublishModel(self): |
181 return True | 181 return True |
182 | 182 |
183 | 183 |
184 def getNodeType(self, nodeIdentifier): | 184 def getNodeType(self, nodeIdentifier, pep, recipient=None): |
185 d = self.storage.getNode(nodeIdentifier) | 185 # FIXME: manage pep and recipient |
186 d = self.storage.getNode(nodeIdentifier, pep, recipient) | |
186 d.addCallback(lambda node: node.getType()) | 187 d.addCallback(lambda node: node.getType()) |
187 return d | 188 return d |
188 | 189 |
189 | 190 |
190 def getNodes(self): | 191 def getNodes(self, pep): |
191 return self.storage.getNodeIds() | 192 return self.storage.getNodeIds(pep) |
192 | 193 |
193 | 194 |
194 def getNodeMetaData(self, nodeIdentifier): | 195 def getNodeMetaData(self, nodeIdentifier, pep, recipient=None): |
195 d = self.storage.getNode(nodeIdentifier) | 196 # FIXME: manage pep and recipient |
197 d = self.storage.getNode(nodeIdentifier, pep, recipient) | |
196 d.addCallback(lambda node: node.getMetaData()) | 198 d.addCallback(lambda node: node.getMetaData()) |
197 d.addCallback(self._makeMetaData) | 199 d.addCallback(self._makeMetaData) |
198 return d | 200 return d |
199 | 201 |
200 | 202 |
212 | 214 |
213 def _checkAuth(self, node, requestor): | 215 def _checkAuth(self, node, requestor): |
214 """ Check authorisation of publishing in node for requestor """ | 216 """ Check authorisation of publishing in node for requestor """ |
215 | 217 |
216 def check(affiliation): | 218 def check(affiliation): |
217 d = defer.succeed(node) | 219 d = defer.succeed((affiliation, node)) |
218 configuration = node.getConfiguration() | 220 configuration = node.getConfiguration() |
219 publish_model = configuration[const.OPT_PUBLISH_MODEL] | 221 publish_model = configuration[const.OPT_PUBLISH_MODEL] |
220 | 222 if publish_model == const.VAL_PMODEL_PUBLISHERS: |
221 if (publish_model == const.VAL_PMODEL_PUBLISHERS): | |
222 if affiliation not in ['owner', 'publisher']: | 223 if affiliation not in ['owner', 'publisher']: |
223 raise error.Forbidden() | 224 raise error.Forbidden() |
224 elif (publish_model == const.VAL_PMODEL_SUBSCRIBERS): | 225 elif publish_model == const.VAL_PMODEL_SUBSCRIBERS: |
225 if affiliation not in ['owner', 'publisher']: | 226 if affiliation not in ['owner', 'publisher']: |
226 # we are in subscribers publish model, we must check that | 227 # we are in subscribers publish model, we must check that |
227 # the requestor is a subscriber to allow him to publish | 228 # the requestor is a subscriber to allow him to publish |
228 | 229 |
229 def checkSubscription(subscribed): | 230 def checkSubscription(subscribed): |
230 if not subscribed: | 231 if not subscribed: |
231 raise error.Forbidden() | 232 raise error.Forbidden() |
232 return node | 233 return (affiliation, node) |
233 | 234 |
234 d.addCallback(lambda ignore: node.isSubscribed(requestor)) | 235 d.addCallback(lambda ignore: node.isSubscribed(requestor)) |
235 d.addCallback(checkSubscription) | 236 d.addCallback(checkSubscription) |
236 elif publish_model != const.VAL_PMODEL_OPEN: | 237 elif publish_model != const.VAL_PMODEL_OPEN: |
237 raise Exception('Unexpected value') # publish_model must be publishers (default), subscribers or open. | 238 raise ValueError('Unexpected value') # publish_model must be publishers (default), subscribers or open. |
238 | 239 |
239 return d | 240 return d |
240 | 241 |
241 d = node.getAffiliation(requestor) | 242 d = node.getAffiliation(requestor) |
242 d.addCallback(check) | 243 d.addCallback(check) |
244 | 245 |
245 def parseItemConfig(self, item): | 246 def parseItemConfig(self, item): |
246 """Get and remove item configuration information | 247 """Get and remove item configuration information |
247 @param item: | 248 @param item: |
248 """ | 249 """ |
250 # FIXME: dirty ! Need to use elements() | |
249 item_config = None | 251 item_config = None |
250 access_model = const.VAL_AMODEL_DEFAULT | 252 access_model = const.VAL_AMODEL_DEFAULT |
251 for i in range(len(item.children)): | 253 for i in range(len(item.children)): |
252 elt = item.children[i] | 254 elt = item.children[i] |
253 if not (elt.uri,elt.name)==(data_form.NS_X_DATA,'x'): | 255 if not (elt.uri,elt.name)==(data_form.NS_X_DATA,'x'): |
262 access_model = item_config.get(const.OPT_ACCESS_MODEL, const.VAL_AMODEL_DEFAULT) | 264 access_model = item_config.get(const.OPT_ACCESS_MODEL, const.VAL_AMODEL_DEFAULT) |
263 | 265 |
264 return (access_model, item_config) | 266 return (access_model, item_config) |
265 | 267 |
266 | 268 |
267 def publish(self, nodeIdentifier, items, requestor): | 269 def _checkOverwrite(self, node, itemIdentifiers, publisher): |
268 d = self.storage.getNode(nodeIdentifier) | 270 """Check that the itemIdentifiers correspond to items published |
271 by the current publisher""" | |
272 def doCheck(item_pub_map): | |
273 for item_publisher in item_pub_map.iterValues(): | |
274 if item_publisher.userhost() != publisher.userhost(): | |
275 raise error.ItemForbidden() | |
276 | |
277 d = node.getItemsPublishers(itemIdentifiers) | |
278 d.addCallback(doCheck) | |
279 return d | |
280 | |
281 | |
282 def publish(self, nodeIdentifier, items, requestor, pep, recipient): | |
283 d = self.storage.getNode(nodeIdentifier, pep, recipient) | |
269 d.addCallback(self._checkAuth, requestor) | 284 d.addCallback(self._checkAuth, requestor) |
270 #FIXME: owner and publisher are not necessarly the same. So far we use only owner to get roster. | 285 #FIXME: owner and publisher are not necessarly the same. So far we use only owner to get roster. |
271 #FIXME: in addition, there can be several owners: that is not managed yet | 286 #FIXME: in addition, there can be several owners: that is not managed yet |
272 d.addCallback(self._doPublish, items, requestor) | 287 d.addCallback(self._doPublish, items, requestor, pep, recipient) |
273 return d | 288 return d |
274 | 289 |
275 | 290 |
276 def _doPublish(self, node, items, requestor): | 291 def _doPublish(self, result, items, requestor, pep, recipient): |
292 affiliation, node = result | |
277 if node.nodeType == 'collection': | 293 if node.nodeType == 'collection': |
278 raise error.NoPublishing() | 294 raise error.NoPublishing() |
279 | 295 |
280 configuration = node.getConfiguration() | 296 configuration = node.getConfiguration() |
281 persistItems = configuration["pubsub#persist_items"] | 297 persistItems = configuration[const.OPT_PERSIST_ITEMS] |
282 deliverPayloads = configuration["pubsub#deliver_payloads"] | 298 deliverPayloads = configuration[const.OPT_DELIVER_PAYLOADS] |
283 | 299 |
284 if items and not persistItems and not deliverPayloads: | 300 if items and not persistItems and not deliverPayloads: |
285 raise error.ItemForbidden() | 301 raise error.ItemForbidden() |
286 elif not items and (persistItems or deliverPayloads): | 302 elif not items and (persistItems or deliverPayloads): |
287 raise error.ItemRequired() | 303 raise error.ItemRequired() |
288 | 304 |
289 parsed_items = [] | 305 items_data = [] |
306 check_overwrite = False | |
290 for item in items: | 307 for item in items: |
291 if persistItems or deliverPayloads: | 308 if persistItems or deliverPayloads: |
292 item.uri = None | 309 item.uri = None |
293 item.defaultUri = None | 310 item.defaultUri = None |
294 if not item.getAttribute("id"): | 311 if not item.getAttribute("id"): |
295 item["id"] = str(uuid.uuid4()) | 312 item["id"] = str(uuid.uuid4()) |
313 else: | |
314 check_overwrite = True | |
296 access_model, item_config = self.parseItemConfig(item) | 315 access_model, item_config = self.parseItemConfig(item) |
297 parsed_items.append((access_model, item_config, item)) | 316 items_data.append((item, access_model, item_config)) |
298 | 317 |
299 if persistItems: | 318 if persistItems: |
300 d = node.storeItems(parsed_items, requestor) | 319 if check_overwrite and affiliation != 'owner': |
320 # we don't want a publisher to overwrite the item | |
321 # of an other publisher | |
322 d = self._checkOverwrite(node, [item['id'] for item in items if item.getAttribute('id')], requestor) | |
323 d.addCallback(lambda _: node.storeItems(items_data, requestor)) | |
324 else: | |
325 d = node.storeItems(items_data, requestor) | |
301 else: | 326 else: |
302 d = defer.succeed(None) | 327 d = defer.succeed(None) |
303 | 328 |
304 d.addCallback(self._doNotify, node, parsed_items, | 329 d.addCallback(self._doNotify, node, items_data, |
305 deliverPayloads) | 330 deliverPayloads, pep, recipient) |
306 return d | 331 return d |
307 | 332 |
308 | 333 |
309 def _doNotify(self, result, node, items, deliverPayloads): | 334 def _doNotify(self, result, node, items_data, deliverPayloads, pep, recipient): |
310 if items and not deliverPayloads: | 335 if items_data and not deliverPayloads: |
311 for access_model, item_config, item in items: | 336 for access_model, item_config, item in items_data: |
312 item.children = [] | 337 item.children = [] |
313 | 338 self.dispatch({'items_data': items_data, 'node': node, 'pep': pep, 'recipient': recipient}, |
314 self.dispatch({'items': items, 'node': node}, | |
315 '//event/pubsub/notify') | 339 '//event/pubsub/notify') |
316 | 340 |
317 | 341 |
318 def getNotifications(self, nodeIdentifier, items): | 342 def getNotifications(self, nodeDbId, items_data): |
319 | 343 """Build a list of subscriber to the node |
320 def toNotifications(subscriptions, nodeIdentifier, items): | 344 |
345 subscribers will be associated with subscribed items, | |
346 and subscription type. | |
347 """ | |
348 | |
349 def toNotifications(subscriptions, items_data): | |
321 subsBySubscriber = {} | 350 subsBySubscriber = {} |
322 for subscription in subscriptions: | 351 for subscription in subscriptions: |
323 if subscription.options.get('pubsub#subscription_type', | 352 if subscription.options.get('pubsub#subscription_type', |
324 'items') == 'items': | 353 'items') == 'items': |
325 subs = subsBySubscriber.setdefault(subscription.subscriber, | 354 subs = subsBySubscriber.setdefault(subscription.subscriber, |
326 set()) | 355 set()) |
327 subs.add(subscription) | 356 subs.add(subscription) |
328 | 357 |
329 notifications = [(subscriber, subscriptions_, items) | 358 notifications = [(subscriber, subscriptions_, items_data) |
330 for subscriber, subscriptions_ | 359 for subscriber, subscriptions_ |
331 in subsBySubscriber.iteritems()] | 360 in subsBySubscriber.iteritems()] |
332 | 361 |
333 return notifications | 362 return notifications |
334 | 363 |
335 def rootNotFound(failure): | 364 def rootNotFound(failure): |
336 failure.trap(error.NodeNotFound) | 365 failure.trap(error.NodeNotFound) |
337 return [] | 366 return [] |
338 | 367 |
339 d1 = self.storage.getNode(nodeIdentifier) | 368 d1 = self.storage.getNodeById(nodeDbId) |
340 d1.addCallback(lambda node: node.getSubscriptions('subscribed')) | 369 d1.addCallback(lambda node: node.getSubscriptions('subscribed')) |
341 d2 = self.storage.getNode('') | 370 # FIXME: must add root node subscriptions ? |
342 d2.addCallback(lambda node: node.getSubscriptions('subscribed')) | 371 # d2 = self.storage.getNode('', False) # FIXME: to check |
343 d2.addErrback(rootNotFound) | 372 # d2.addCallback(lambda node: node.getSubscriptions('subscribed')) |
344 d = defer.gatherResults([d1, d2]) | 373 # d2.addErrback(rootNotFound) |
345 d.addCallback(lambda result: result[0] + result[1]) | 374 # d = defer.gatherResults([d1, d2]) |
346 d.addCallback(toNotifications, nodeIdentifier, items) | 375 # d.addCallback(lambda result: result[0] + result[1]) |
347 return d | 376 d1.addCallback(toNotifications, items_data) |
348 | 377 return d1 |
349 def registerNotifier(self, observerfn, *args, **kwargs): | 378 |
379 def registerPublishNotifier(self, observerfn, *args, **kwargs): | |
350 self.addObserver('//event/pubsub/notify', observerfn, *args, **kwargs) | 380 self.addObserver('//event/pubsub/notify', observerfn, *args, **kwargs) |
351 | 381 |
352 def subscribe(self, nodeIdentifier, subscriber, requestor): | 382 def registerRetractNotifier(self, observerfn, *args, **kwargs): |
383 self.addObserver('//event/pubsub/retract', observerfn, *args, **kwargs) | |
384 | |
385 def subscribe(self, nodeIdentifier, subscriber, requestor, pep, recipient): | |
353 subscriberEntity = subscriber.userhostJID() | 386 subscriberEntity = subscriber.userhostJID() |
354 if subscriberEntity != requestor.userhostJID(): | 387 if subscriberEntity != requestor.userhostJID(): |
355 return defer.fail(error.Forbidden()) | 388 return defer.fail(error.Forbidden()) |
356 | 389 |
357 d = self.storage.getNode(nodeIdentifier) | 390 d = self.storage.getNode(nodeIdentifier, pep, recipient) |
358 d.addCallback(_getAffiliation, subscriberEntity) | 391 d.addCallback(_getAffiliation, subscriberEntity) |
359 d.addCallback(self._doSubscribe, subscriber) | 392 d.addCallback(self._doSubscribe, subscriber) |
360 return d | 393 return d |
361 | 394 |
362 | 395 |
363 def _doSubscribe(self, result, subscriber): | 396 def _doSubscribe(self, result, subscriber): |
397 # TODO: implement other access models | |
364 node, affiliation = result | 398 node, affiliation = result |
365 #FIXME: must check node's access_model before subscribing | |
366 | 399 |
367 if affiliation == 'outcast': | 400 if affiliation == 'outcast': |
368 raise error.Forbidden() | 401 raise error.Forbidden() |
402 | |
403 access_model = node.getAccessModel() | |
404 | |
405 if access_model != const.VAL_AMODEL_OPEN: | |
406 raise NotImplementedError | |
369 | 407 |
370 def trapExists(failure): | 408 def trapExists(failure): |
371 failure.trap(error.SubscriptionExists) | 409 failure.trap(error.SubscriptionExists) |
372 return False | 410 return False |
373 | 411 |
378 return d | 416 return d |
379 | 417 |
380 d = node.addSubscription(subscriber, 'subscribed', {}) | 418 d = node.addSubscription(subscriber, 'subscribed', {}) |
381 d.addCallbacks(lambda _: True, trapExists) | 419 d.addCallbacks(lambda _: True, trapExists) |
382 d.addCallback(cb) | 420 d.addCallback(cb) |
421 | |
383 return d | 422 return d |
384 | 423 |
385 | 424 |
386 def _sendLastPublished(self, subscription, node): | 425 def _sendLastPublished(self, subscription, node): |
387 | 426 |
404 d.addErrback(log.err) | 443 d.addErrback(log.err) |
405 | 444 |
406 return subscription | 445 return subscription |
407 | 446 |
408 | 447 |
409 def unsubscribe(self, nodeIdentifier, subscriber, requestor): | 448 def unsubscribe(self, nodeIdentifier, subscriber, requestor, pep, recipient): |
410 if subscriber.userhostJID() != requestor.userhostJID(): | 449 if subscriber.userhostJID() != requestor.userhostJID(): |
411 return defer.fail(error.Forbidden()) | 450 return defer.fail(error.Forbidden()) |
412 | 451 |
413 d = self.storage.getNode(nodeIdentifier) | 452 d = self.storage.getNode(nodeIdentifier, pep, recipient) |
414 d.addCallback(lambda node: node.removeSubscription(subscriber)) | 453 d.addCallback(lambda node: node.removeSubscription(subscriber)) |
415 return d | 454 return d |
416 | 455 |
417 | 456 |
418 def getSubscriptions(self, entity): | 457 def getSubscriptions(self, entity): |
426 | 465 |
427 def supportsInstantNodes(self): | 466 def supportsInstantNodes(self): |
428 return True | 467 return True |
429 | 468 |
430 | 469 |
431 def createNode(self, nodeIdentifier, requestor, options = None): | 470 def createNode(self, nodeIdentifier, requestor, options = None, pep=False, recipient=None): |
432 if not nodeIdentifier: | 471 if not nodeIdentifier: |
433 nodeIdentifier = 'generic/%s' % uuid.uuid4() | 472 nodeIdentifier = 'generic/%s' % uuid.uuid4() |
434 | 473 |
435 if not options: | 474 if not options: |
436 options = {} | 475 options = {} |
437 | 476 |
438 if self.supportsCreatorCheck(): | 477 # if self.supportsCreatorCheck(): |
439 groupblog = nodeIdentifier.startswith(const.NS_GROUPBLOG_PREFIX) | 478 # groupblog = nodeIdentifier.startswith(const.NS_GROUPBLOG_PREFIX) |
440 try: | 479 # try: |
441 nodeIdentifierJID = JID(nodeIdentifier[len(const.NS_GROUPBLOG_PREFIX):] if groupblog else nodeIdentifier) | 480 # nodeIdentifierJID = JID(nodeIdentifier[len(const.NS_GROUPBLOG_PREFIX):] if groupblog else nodeIdentifier) |
442 except InvalidFormat: | 481 # except InvalidFormat: |
443 is_user_jid = False | 482 # is_user_jid = False |
444 else: | 483 # else: |
445 is_user_jid = bool(nodeIdentifierJID.user) | 484 # is_user_jid = bool(nodeIdentifierJID.user) |
446 | 485 |
447 if is_user_jid and nodeIdentifierJID.userhostJID() != requestor.userhostJID(): | 486 # if is_user_jid and nodeIdentifierJID.userhostJID() != requestor.userhostJID(): |
448 #we have an user jid node, but not created by the owner of this jid | 487 # #we have an user jid node, but not created by the owner of this jid |
449 print "Wrong creator" | 488 # print "Wrong creator" |
450 raise error.Forbidden() | 489 # raise error.Forbidden() |
451 | 490 |
452 nodeType = 'leaf' | 491 nodeType = 'leaf' |
453 config = self.storage.getDefaultConfiguration(nodeType) | 492 config = self.storage.getDefaultConfiguration(nodeType) |
454 config['pubsub#node_type'] = nodeType | 493 config['pubsub#node_type'] = nodeType |
455 config.update(options) | 494 config.update(options) |
456 | 495 |
457 d = self.storage.createNode(nodeIdentifier, requestor, config) | 496 d = self.storage.createNode(nodeIdentifier, requestor, config, pep, recipient) |
458 d.addCallback(lambda _: nodeIdentifier) | 497 d.addCallback(lambda _: nodeIdentifier) |
459 return d | 498 return d |
460 | 499 |
461 | 500 |
462 def getDefaultConfiguration(self, nodeType): | 501 def getDefaultConfiguration(self, nodeType): |
463 d = defer.succeed(self.storage.getDefaultConfiguration(nodeType)) | 502 d = defer.succeed(self.storage.getDefaultConfiguration(nodeType)) |
464 return d | 503 return d |
465 | 504 |
466 | 505 |
467 def getNodeConfiguration(self, nodeIdentifier): | 506 def getNodeConfiguration(self, nodeIdentifier, pep, recipient): |
468 if not nodeIdentifier: | 507 if not nodeIdentifier: |
469 return defer.fail(error.NoRootNode()) | 508 return defer.fail(error.NoRootNode()) |
470 | 509 |
471 d = self.storage.getNode(nodeIdentifier) | 510 d = self.storage.getNode(nodeIdentifier, pep, recipient) |
472 d.addCallback(lambda node: node.getConfiguration()) | 511 d.addCallback(lambda node: node.getConfiguration()) |
473 | 512 |
474 return d | 513 return d |
475 | 514 |
476 | 515 |
477 def setNodeConfiguration(self, nodeIdentifier, options, requestor): | 516 def setNodeConfiguration(self, nodeIdentifier, options, requestor, pep, recipient): |
478 if not nodeIdentifier: | 517 if not nodeIdentifier: |
479 return defer.fail(error.NoRootNode()) | 518 return defer.fail(error.NoRootNode()) |
480 | 519 |
481 d = self.storage.getNode(nodeIdentifier) | 520 d = self.storage.getNode(nodeIdentifier, pep, recipient) |
482 d.addCallback(_getAffiliation, requestor) | 521 d.addCallback(_getAffiliation, requestor) |
483 d.addCallback(self._doSetNodeConfiguration, options) | 522 d.addCallback(self._doSetNodeConfiguration, options) |
484 return d | 523 return d |
485 | 524 |
486 | 525 |
495 | 534 |
496 def getAffiliations(self, entity): | 535 def getAffiliations(self, entity): |
497 return self.storage.getAffiliations(entity) | 536 return self.storage.getAffiliations(entity) |
498 | 537 |
499 | 538 |
500 def getItems(self, nodeIdentifier, requestor, maxItems=None, | 539 def getItems(self, nodeIdentifier, recipient, maxItems=None, |
501 itemIdentifiers=None, ext_data=None): | 540 itemIdentifiers=None, ext_data=None): |
502 if ext_data is None: | 541 if ext_data is None: |
503 ext_data = {} | 542 ext_data = {} |
504 d = self.storage.getNode(nodeIdentifier) | 543 d = self.storage.getNode(nodeIdentifier, ext_data.get('pep', False), recipient) |
505 d.addCallback(_getAffiliation, requestor) | 544 d.addCallback(_getAffiliation, recipient) |
506 d.addCallback(self._doGetItems, requestor, maxItems, itemIdentifiers, | 545 d.addCallback(self._doGetItems, recipient, maxItems, itemIdentifiers, |
507 ext_data) | 546 ext_data) |
508 return d | 547 return d |
509 | 548 |
510 def checkGroup(self, roster_groups, entity): | 549 def checkGroup(self, roster_groups, entity): |
511 """Check that entity is authorized and in roster | 550 """Check that entity is authorized and in roster |
543 if access_model == const.VAL_AMODEL_OPEN: | 582 if access_model == const.VAL_AMODEL_OPEN: |
544 pass | 583 pass |
545 elif access_model == const.VAL_AMODEL_ROSTER: | 584 elif access_model == const.VAL_AMODEL_ROSTER: |
546 form = data_form.Form('submit', formNamespace=const.NS_ITEM_CONFIG) | 585 form = data_form.Form('submit', formNamespace=const.NS_ITEM_CONFIG) |
547 access = data_form.Field(None, const.OPT_ACCESS_MODEL, value=const.VAL_AMODEL_ROSTER) | 586 access = data_form.Field(None, const.OPT_ACCESS_MODEL, value=const.VAL_AMODEL_ROSTER) |
548 allowed = data_form.Field(None, const.OPT_ROSTER_GROUPS_ALLOWED, values=access_list) | 587 allowed = data_form.Field(None, const.OPT_ROSTER_GROUPS_ALLOWED, values=access_list[const.OPT_ROSTER_GROUPS_ALLOWED]) |
549 form.addField(access) | 588 form.addField(access) |
550 form.addField(allowed) | 589 form.addField(allowed) |
551 item.addChild(form.toElement()) | 590 item.addChild(form.toElement()) |
552 elif access_model == const.VAL_AMODEL_JID: | 591 elif access_model == const.VAL_AMODEL_JID: |
553 #FIXME: manage jid | 592 #FIXME: manage jid |
588 return [] | 627 return [] |
589 | 628 |
590 if affiliation == 'outcast': | 629 if affiliation == 'outcast': |
591 raise error.Forbidden() | 630 raise error.Forbidden() |
592 | 631 |
593 access_model = node.getConfiguration()["pubsub#access_model"] | 632 access_model = node.getAccessModel() |
594 d = node.getNodeOwner() | 633 d = node.getNodeOwner() |
595 d.addCallback(self.privilege.getRoster) | 634 d.addCallback(self.privilege.getRoster) |
596 d.addErrback(self._rosterEb) | 635 d.addErrback(self._rosterEb) |
597 | 636 |
598 if access_model == 'open' or affiliation == 'owner': | 637 if access_model == const.VAL_AMODEL_OPEN or affiliation == 'owner': |
599 d.addCallback(lambda roster: (True, roster)) | 638 d.addCallback(lambda roster: (True, roster)) |
600 d.addCallback(access_checked) | 639 d.addCallback(access_checked) |
601 elif access_model == 'roster': | 640 elif access_model == const.VAL_AMODEL_ROSTER: |
602 d.addCallback(self._getNodeGroups,node.nodeIdentifier) | 641 d.addCallback(self._getNodeGroups,node.nodeIdentifier) |
603 d.addCallback(self.checkGroup, requestor) | 642 d.addCallback(self.checkGroup, requestor) |
604 d.addCallback(access_checked) | 643 d.addCallback(access_checked) |
605 | 644 |
606 return d | 645 return d |
648 elts.append(response.toElement()) | 687 elts.append(response.toElement()) |
649 return elts | 688 return elts |
650 | 689 |
651 return defer.DeferredList(d_list).addCallback(render) | 690 return defer.DeferredList(d_list).addCallback(render) |
652 | 691 |
653 def retractItem(self, nodeIdentifier, itemIdentifiers, requestor): | 692 def retractItem(self, nodeIdentifier, itemIdentifiers, requestor, notify, pep, recipient): |
654 d = self.storage.getNode(nodeIdentifier) | 693 d = self.storage.getNode(nodeIdentifier, pep, recipient) |
655 d.addCallback(_getAffiliation, requestor) | 694 d.addCallback(_getAffiliation, requestor) |
656 if const.FLAG_RETRACT_ALLOW_PUBLISHER: | 695 # FIXME: to be checked |
657 d.addCallback(self._doRetractAllowPublisher, itemIdentifiers, requestor) | 696 # if const.FLAG_RETRACT_ALLOW_PUBLISHER: |
658 else: | 697 # d.addCallback(self._doRetractAllowPublisher, itemIdentifiers, requestor) |
659 d.addCallback(self._doRetract, itemIdentifiers) | 698 # else: |
660 return d | 699 # d.addCallback(self._doRetract, itemIdentifiers) |
661 | 700 d.addCallback(self._doRetract, itemIdentifiers, notify, pep, recipient) |
662 def _doRetractAllowPublisher(self, result, itemIdentifiers, requestor): | 701 return d |
663 """This method has been added to allow the publisher | 702 |
664 of an item to retract it, even if he has no affiliation | 703 # FIXME: to be checked |
665 to that item. For instance, this allows you to delete | 704 # def _doRetractAllowPublisher(self, result, itemIdentifiers, requestor): |
666 an item you posted in a node of "open" publish model. | 705 # """This method has been added to allow the publisher |
667 """ | 706 # of an item to retract it, even if he has no affiliation |
707 # to that item. For instance, this allows you to delete | |
708 # an item you posted in a node of "open" publish model. | |
709 # """ | |
710 # node, affiliation = result | |
711 # if affiliation in ['owner', 'publisher']: | |
712 # return self._doRetract(result, itemIdentifiers) | |
713 # d = node.filterItemsWithPublisher(itemIdentifiers, requestor) | |
714 # def filterCb(filteredItems): | |
715 # if not filteredItems: | |
716 # return self._doRetract(result, itemIdentifiers) | |
717 # # XXX: fake an affiliation that does NOT exist | |
718 # return self._doRetract((node, 'publisher'), filteredItems) | |
719 # d.addCallback(filterCb) | |
720 # return d | |
721 | |
722 def _doRetract(self, result, itemIdentifiers, notify, pep, recipient): | |
668 node, affiliation = result | 723 node, affiliation = result |
669 if affiliation in ['owner', 'publisher']: | 724 persistItems = node.getConfiguration()[const.OPT_PERSIST_ITEMS] |
670 return self._doRetract(result, itemIdentifiers) | |
671 d = node.filterItemsWithPublisher(itemIdentifiers, requestor) | |
672 def filterCb(filteredItems): | |
673 if not filteredItems: | |
674 return self._doRetract(result, itemIdentifiers) | |
675 # XXX: fake an affiliation that does NOT exist | |
676 return self._doRetract((node, 'publisher'), filteredItems) | |
677 d.addCallback(filterCb) | |
678 return d | |
679 | |
680 def _doRetract(self, result, itemIdentifiers): | |
681 node, affiliation = result | |
682 persistItems = node.getConfiguration()["pubsub#persist_items"] | |
683 | 725 |
684 if affiliation not in ['owner', 'publisher']: | 726 if affiliation not in ['owner', 'publisher']: |
685 raise error.Forbidden() | 727 raise error.Forbidden() |
686 | 728 |
687 if not persistItems: | 729 if not persistItems: |
688 raise error.NodeNotPersistent() | 730 raise error.NodeNotPersistent() |
689 | 731 |
690 d = node.removeItems(itemIdentifiers) | 732 # we need to get the items before removing them, for the notifications |
691 d.addCallback(self._doNotifyRetraction, node.nodeIdentifier) | 733 |
692 return d | 734 def removeItems(items_data): |
693 | 735 """Remove the items and keep only actually removed ones in items_data""" |
694 | 736 d = node.removeItems(itemIdentifiers) |
695 def _doNotifyRetraction(self, itemIdentifiers, nodeIdentifier): | 737 d.addCallback(lambda removed: [item_data for item_data in items_data if item_data[0]["id"] in removed]) |
696 self.dispatch({'itemIdentifiers': itemIdentifiers, | 738 return d |
697 'nodeIdentifier': nodeIdentifier }, | 739 |
740 d = node.getItemsById(None, True, itemIdentifiers) | |
741 d.addCallback(removeItems) | |
742 | |
743 if notify: | |
744 d.addCallback(self._doNotifyRetraction, node, pep, recipient) | |
745 return d | |
746 | |
747 | |
748 def _doNotifyRetraction(self, items_data, node, pep, recipient): | |
749 self.dispatch({'items_data': items_data, | |
750 'node': node, | |
751 'pep': pep, | |
752 'recipient': recipient}, | |
698 '//event/pubsub/retract') | 753 '//event/pubsub/retract') |
699 | 754 |
700 | 755 |
701 def purgeNode(self, nodeIdentifier, requestor): | 756 def purgeNode(self, nodeIdentifier, requestor, pep, recipient): |
702 d = self.storage.getNode(nodeIdentifier) | 757 d = self.storage.getNode(nodeIdentifier, pep, recipient) |
703 d.addCallback(_getAffiliation, requestor) | 758 d.addCallback(_getAffiliation, requestor) |
704 d.addCallback(self._doPurge) | 759 d.addCallback(self._doPurge) |
705 return d | 760 return d |
706 | 761 |
707 | 762 |
708 def _doPurge(self, result): | 763 def _doPurge(self, result): |
709 node, affiliation = result | 764 node, affiliation = result |
710 persistItems = node.getConfiguration()["pubsub#persist_items"] | 765 persistItems = node.getConfiguration()[const.OPT_PERSIST_ITEMS] |
711 | 766 |
712 if affiliation != 'owner': | 767 if affiliation != 'owner': |
713 raise error.Forbidden() | 768 raise error.Forbidden() |
714 | 769 |
715 if not persistItems: | 770 if not persistItems: |
726 | 781 |
727 def registerPreDelete(self, preDeleteFn): | 782 def registerPreDelete(self, preDeleteFn): |
728 self._callbackList.append(preDeleteFn) | 783 self._callbackList.append(preDeleteFn) |
729 | 784 |
730 | 785 |
731 def getSubscribers(self, nodeIdentifier): | 786 def getSubscribers(self, nodeIdentifier, pep, recipient): |
732 def cb(subscriptions): | 787 def cb(subscriptions): |
733 return [subscription.subscriber for subscription in subscriptions] | 788 return [subscription.subscriber for subscription in subscriptions] |
734 | 789 |
735 d = self.storage.getNode(nodeIdentifier) | 790 d = self.storage.getNode(nodeIdentifier, pep, recipient) |
736 d.addCallback(lambda node: node.getSubscriptions('subscribed')) | 791 d.addCallback(lambda node: node.getSubscriptions('subscribed')) |
737 d.addCallback(cb) | 792 d.addCallback(cb) |
738 return d | 793 return d |
739 | 794 |
740 | 795 |
741 def deleteNode(self, nodeIdentifier, requestor, redirectURI=None): | 796 def deleteNode(self, nodeIdentifier, requestor, pep, recipient, redirectURI=None): |
742 d = self.storage.getNode(nodeIdentifier) | 797 d = self.storage.getNode(nodeIdentifier, pep, recipient) |
743 d.addCallback(_getAffiliation, requestor) | 798 d.addCallback(_getAffiliation, requestor) |
744 d.addCallback(self._doPreDelete, redirectURI) | 799 d.addCallback(self._doPreDelete, redirectURI, pep, recipient) |
745 return d | 800 return d |
746 | 801 |
747 | 802 |
748 def _doPreDelete(self, result, redirectURI): | 803 def _doPreDelete(self, result, redirectURI, pep, recipient): |
749 node, affiliation = result | 804 node, affiliation = result |
750 | 805 |
751 if affiliation != 'owner': | 806 if affiliation != 'owner': |
752 raise error.Forbidden() | 807 raise error.Forbidden() |
753 | 808 |
754 data = {'node': node, | 809 data = {'node': node, |
755 'redirectURI': redirectURI} | 810 'redirectURI': redirectURI} |
756 | 811 |
757 d = defer.DeferredList([cb(data) | 812 d = defer.DeferredList([cb(data, pep, recipient) |
758 for cb in self._callbackList], | 813 for cb in self._callbackList], |
759 consumeErrors=1) | 814 consumeErrors=1) |
760 d.addCallback(self._doDelete, node.nodeIdentifier) | 815 d.addCallback(self._doDelete, node.nodeDbId) |
761 | 816 |
762 | 817 |
763 def _doDelete(self, result, nodeIdentifier): | 818 def _doDelete(self, result, nodeDbId): |
764 dl = [] | 819 dl = [] |
765 for succeeded, r in result: | 820 for succeeded, r in result: |
766 if succeeded and r: | 821 if succeeded and r: |
767 dl.extend(r) | 822 dl.extend(r) |
768 | 823 |
769 d = self.storage.deleteNode(nodeIdentifier) | 824 d = self.storage.deleteNodeByDbId(nodeDbId) |
770 d.addCallback(self._doNotifyDelete, dl) | 825 d.addCallback(self._doNotifyDelete, dl) |
771 | 826 |
772 return d | 827 return d |
773 | 828 |
774 | 829 |
810 error.NodeNotFound: ('item-not-found', None, None), | 865 error.NodeNotFound: ('item-not-found', None, None), |
811 error.NodeExists: ('conflict', None, None), | 866 error.NodeExists: ('conflict', None, None), |
812 error.Forbidden: ('forbidden', None, None), | 867 error.Forbidden: ('forbidden', None, None), |
813 error.NotAuthorized: ('not-authorized', None, None), | 868 error.NotAuthorized: ('not-authorized', None, None), |
814 error.NotInRoster: ('not-authorized', 'not-in-roster-group', None), | 869 error.NotInRoster: ('not-authorized', 'not-in-roster-group', None), |
870 error.ItemNotFound: ('item-not-found', None, None), | |
815 error.ItemForbidden: ('bad-request', 'item-forbidden', None), | 871 error.ItemForbidden: ('bad-request', 'item-forbidden', None), |
816 error.ItemRequired: ('bad-request', 'item-required', None), | 872 error.ItemRequired: ('bad-request', 'item-required', None), |
817 error.NoInstantNodes: ('not-acceptable', | 873 error.NoInstantNodes: ('not-acceptable', |
818 'unsupported', | 874 'unsupported', |
819 'instant-nodes'), | 875 'instant-nodes'), |
836 PubSubResource.__init__(self) | 892 PubSubResource.__init__(self) |
837 | 893 |
838 self.backend = backend | 894 self.backend = backend |
839 self.hideNodes = False | 895 self.hideNodes = False |
840 | 896 |
841 self.backend.registerNotifier(self._notify) | 897 self.backend.registerPublishNotifier(self._notifyPublish) |
898 self.backend.registerRetractNotifier(self._notifyRetract) | |
842 self.backend.registerPreDelete(self._preDelete) | 899 self.backend.registerPreDelete(self._preDelete) |
843 | 900 |
844 if self.backend.supportsCreatorCheck(): | 901 # FIXME: to be removed, it's not useful anymore as PEP is now used |
845 self.features.append("creator-jid-check") #SàT custom feature: Check that a node (which correspond to | 902 # if self.backend.supportsCreatorCheck(): |
903 # self.features.append("creator-jid-check") #SàT custom feature: Check that a node (which correspond to | |
846 # a jid in this server) is created by the right jid | 904 # a jid in this server) is created by the right jid |
847 | 905 |
848 if self.backend.supportsAutoCreate(): | 906 if self.backend.supportsAutoCreate(): |
849 self.features.append("auto-create") | 907 self.features.append("auto-create") |
850 | 908 |
864 self.features.append("groupblog") | 922 self.features.append("groupblog") |
865 | 923 |
866 # if self.backend.supportsPublishModel(): #XXX: this feature is not really described in XEP-0060, we just can see it in examples | 924 # if self.backend.supportsPublishModel(): #XXX: this feature is not really described in XEP-0060, we just can see it in examples |
867 # self.features.append("publish_model") # but it's necessary for microblogging comments (see XEP-0277) | 925 # self.features.append("publish_model") # but it's necessary for microblogging comments (see XEP-0277) |
868 | 926 |
869 def _notify(self, data): | 927 def _notifyPublish(self, data): |
870 items = data['items'] | 928 items_data = data['items_data'] |
871 node = data['node'] | 929 node = data['node'] |
872 | 930 pep = data['pep'] |
873 def _notifyAllowed(result): | 931 recipient = data['recipient'] |
874 """Check access of subscriber for each item, | 932 |
875 and notify only allowed ones""" | 933 def afterPrepare(result): |
876 notifications, (owner_jid,roster) = result | 934 owner_jid, notifications_filtered = result |
877 | |
878 #we filter items not allowed for the subscribers | |
879 notifications_filtered = [] | |
880 | |
881 for subscriber, subscriptions, _items in notifications: | |
882 allowed_items = [] #we keep only item which subscriber can access | |
883 | |
884 for access_model, item_config, item in _items: | |
885 if access_model == 'open': | |
886 allowed_items.append(item) | |
887 elif access_model == 'roster': | |
888 _subscriber = subscriber.userhostJID() | |
889 if not _subscriber in roster: | |
890 continue | |
891 #the subscriber is known, is he in the right group ? | |
892 authorized_groups = item_config[const.OPT_ROSTER_GROUPS_ALLOWED] | |
893 if roster[_subscriber].groups.intersection(authorized_groups): | |
894 allowed_items.append(item) | |
895 | |
896 else: #unknown access_model | |
897 raise NotImplementedError | |
898 | |
899 if allowed_items: | |
900 notifications_filtered.append((subscriber, subscriptions, allowed_items)) | |
901 | |
902 #we notify the owner | 935 #we notify the owner |
903 #FIXME: check if this comply with XEP-0060 (option needed ?) | 936 #FIXME: check if this comply with XEP-0060 (option needed ?) |
904 #TODO: item's access model have to be sent back to owner | 937 #TODO: item's access model have to be sent back to owner |
905 #TODO: same thing for getItems | 938 #TODO: same thing for getItems |
906 | 939 |
908 """ Attach item configuration to this item | 941 """ Attach item configuration to this item |
909 Used to give item configuration back to node's owner (and *only* to owner) | 942 Used to give item configuration back to node's owner (and *only* to owner) |
910 """ | 943 """ |
911 #TODO: a test should check that only the owner get the item configuration back | 944 #TODO: a test should check that only the owner get the item configuration back |
912 | 945 |
913 access_model, item_config, item = item_data | 946 item, access_model, item_config = item_data |
914 new_item = deepcopy(item) | 947 new_item = deepcopy(item) |
915 if item_config: | 948 if item_config: |
916 new_item.addChild(item_config.toElement()) | 949 new_item.addChild(item_config.toElement()) |
917 return new_item | 950 return new_item |
918 | 951 |
919 notifications_filtered.append((owner_jid, | 952 notifications_filtered.append((owner_jid, |
920 set([Subscription(node.nodeIdentifier, | 953 set([Subscription(node.nodeIdentifier, |
921 owner_jid, | 954 owner_jid, |
922 'subscribed')]), | 955 'subscribed')]), |
923 [getFullItem(item_data) for item_data in items])) | 956 [getFullItem(item_data) for item_data in items_data])) |
924 | 957 |
925 return self.pubsubService.notifyPublish( | 958 if pep: |
926 self.serviceJID, | 959 return self.backend.privilege.notifyPublish( |
927 node.nodeIdentifier, | 960 recipient, |
928 notifications_filtered) | 961 node.nodeIdentifier, |
929 | 962 notifications_filtered) |
930 | 963 |
931 if 'subscription' not in data: | 964 else: |
932 d1 = self.backend.getNotifications(node.nodeIdentifier, items) | 965 return self.pubsubService.notifyPublish( |
966 self.serviceJID, | |
967 node.nodeIdentifier, | |
968 notifications_filtered) | |
969 | |
970 d = self._prepareNotify(items_data, node, data.get('subscription')) | |
971 d.addCallback(afterPrepare) | |
972 return d | |
973 | |
974 def _notifyRetract(self, data): | |
975 items_data = data['items_data'] | |
976 node = data['node'] | |
977 pep = data['pep'] | |
978 recipient = data['recipient'] | |
979 | |
980 def afterPrepare(result): | |
981 owner_jid, notifications_filtered = result | |
982 #we add the owner | |
983 | |
984 notifications_filtered.append((owner_jid, | |
985 set([Subscription(node.nodeIdentifier, | |
986 owner_jid, | |
987 'subscribed')]), | |
988 [item for item, _, _ in items_data])) | |
989 | |
990 if pep: | |
991 return self.backend.privilege.notifyRetract( | |
992 recipient, | |
993 node.nodeIdentifier, | |
994 notifications_filtered) | |
995 | |
996 else: | |
997 return self.pubsubService.notifyRetract( | |
998 self.serviceJID, | |
999 node.nodeIdentifier, | |
1000 notifications_filtered) | |
1001 | |
1002 d = self._prepareNotify(items_data, node, data.get('subscription')) | |
1003 d.addCallback(afterPrepare) | |
1004 return d | |
1005 | |
1006 | |
1007 def _prepareNotify(self, items_data, node, subscription=None): | |
1008 """Do a bunch of permissions check and filter notifications | |
1009 | |
1010 The owner is not added to these notifications, | |
1011 it must be called by the calling method | |
1012 @param items_data(tuple): must contain: | |
1013 - item (domish.Element) | |
1014 - access_model (unicode) | |
1015 - access_list (dict as returned getItemsById, or item_config) | |
1016 @param node(LeafNode): node hosting the items | |
1017 @param subscription(pubsub.Subscription, None): TODO | |
1018 | |
1019 @return (tuple): will contain: | |
1020 - notifications_filtered | |
1021 - node_owner_jid | |
1022 - items_data | |
1023 """ | |
1024 | |
1025 def filterNotifications(result): | |
1026 """Check access of subscriber for each item, and keep only allowed ones""" | |
1027 notifications, (owner_jid,roster) = result | |
1028 | |
1029 #we filter items not allowed for the subscribers | |
1030 notifications_filtered = [] | |
1031 | |
1032 for subscriber, subscriptions, _items_data in notifications: | |
1033 if subscriber == owner_jid: | |
1034 # as notification is always sent to owner, | |
1035 # we ignore owner if he is here | |
1036 continue | |
1037 allowed_items = [] #we keep only item which subscriber can access | |
1038 | |
1039 for item, access_model, access_list in _items_data: | |
1040 if access_model == const.VAL_AMODEL_OPEN: | |
1041 allowed_items.append(item) | |
1042 elif access_model == const.VAL_AMODEL_ROSTER: | |
1043 _subscriber = subscriber.userhostJID() | |
1044 if not _subscriber in roster: | |
1045 continue | |
1046 #the subscriber is known, is he in the right group ? | |
1047 authorized_groups = access_list[const.OPT_ROSTER_GROUPS_ALLOWED] | |
1048 if roster[_subscriber].groups.intersection(authorized_groups): | |
1049 allowed_items.append(item) | |
1050 | |
1051 else: #unknown access_model | |
1052 raise NotImplementedError | |
1053 | |
1054 if allowed_items: | |
1055 notifications_filtered.append((subscriber, subscriptions, allowed_items)) | |
1056 return (owner_jid, notifications_filtered) | |
1057 | |
1058 | |
1059 if subscription is None: | |
1060 d1 = self.backend.getNotifications(node.nodeDbId, items_data) | |
933 else: | 1061 else: |
934 subscription = data['subscription'] | |
935 d1 = defer.succeed([(subscription.subscriber, [subscription], | 1062 d1 = defer.succeed([(subscription.subscriber, [subscription], |
936 items)]) | 1063 items_data)]) |
937 | 1064 |
938 def _got_owner(owner_jid): | 1065 def _got_owner(owner_jid): |
939 #return a tuple with owner_jid and roster | 1066 #return a tuple with owner_jid and roster |
1067 def rosterEb(failure): | |
1068 log.msg("Error while getting roster: {}".format(failure.value)) | |
1069 return (owner_jid, {}) | |
1070 | |
940 d = self.backend.privilege.getRoster(owner_jid) | 1071 d = self.backend.privilege.getRoster(owner_jid) |
941 d.addErrback(self._rosterEb) | 1072 d.addErrback(rosterEb) |
942 d.addCallback(lambda roster: (owner_jid,roster)) | 1073 d.addCallback(lambda roster: (owner_jid,roster)) |
1074 return d | |
943 | 1075 |
944 d2 = node.getNodeOwner() | 1076 d2 = node.getNodeOwner() |
945 d2.addCallback(_got_owner) | 1077 d2.addCallback(_got_owner) |
946 | |
947 d = defer.gatherResults([d1, d2]) | 1078 d = defer.gatherResults([d1, d2]) |
948 d.addCallback(_notifyAllowed) | 1079 d.addCallback(filterNotifications) |
949 | 1080 return d |
950 def _preDelete(self, data): | 1081 |
1082 def _preDelete(self, data, pep, recipient): | |
951 nodeIdentifier = data['node'].nodeIdentifier | 1083 nodeIdentifier = data['node'].nodeIdentifier |
952 redirectURI = data.get('redirectURI', None) | 1084 redirectURI = data.get('redirectURI', None) |
953 d = self.backend.getSubscribers(nodeIdentifier) | 1085 d = self.backend.getSubscribers(nodeIdentifier, pep, recipient) |
954 d.addCallback(lambda subscribers: self.pubsubService.notifyDelete( | 1086 d.addCallback(lambda subscribers: self.pubsubService.notifyDelete( |
955 self.serviceJID, | 1087 self.serviceJID, |
956 nodeIdentifier, | 1088 nodeIdentifier, |
957 subscribers, | 1089 subscribers, |
958 redirectURI)) | 1090 redirectURI)) |
970 else: | 1102 else: |
971 exc = StanzaError(condition, text=msg) | 1103 exc = StanzaError(condition, text=msg) |
972 | 1104 |
973 raise exc | 1105 raise exc |
974 | 1106 |
975 def getInfo(self, requestor, service, nodeIdentifier): | 1107 def getInfo(self, requestor, service, nodeIdentifier, pep=None, recipient=None): |
1108 return [] # FIXME: disabled for now, need to manage PEP | |
976 if not requestor.resource: | 1109 if not requestor.resource: |
977 # this avoid error when getting a disco request from server during namespace delegation | 1110 # this avoid error when getting a disco request from server during namespace delegation |
978 return [] | 1111 return [] |
979 info = {} | 1112 info = {} |
980 | 1113 |
998 d.addErrback(trapNotFound) | 1131 d.addErrback(trapNotFound) |
999 d.addErrback(self._mapErrors) | 1132 d.addErrback(self._mapErrors) |
1000 return d | 1133 return d |
1001 | 1134 |
1002 | 1135 |
1003 def getNodes(self, requestor, service, nodeIdentifier): | 1136 def getNodes(self, requestor, service, nodeIdentifier, pep=None): |
1137 return defer.succeed([]) # FIXME: disabled for now, need to manage PEP | |
1004 if service.resource: | 1138 if service.resource: |
1005 return defer.succeed([]) | 1139 return defer.succeed([]) |
1006 d = self.backend.getNodes() | 1140 d = self.backend.getNodes(pep) |
1007 return d.addErrback(self._mapErrors) | 1141 return d.addErrback(self._mapErrors) |
1008 | 1142 |
1009 | 1143 |
1010 def getConfigurationOptions(self): | 1144 def getConfigurationOptions(self): |
1011 return self.backend.nodeOptions | 1145 return self.backend.nodeOptions |
1012 | 1146 |
1013 def _publish_errb(self, failure, request): | 1147 def _publish_errb(self, failure, request): |
1014 if failure.type == error.NodeNotFound and self.backend.supportsAutoCreate(): | 1148 if failure.type == error.NodeNotFound and self.backend.supportsAutoCreate(): |
1015 print "Auto-creating node %s" % (request.nodeIdentifier,) | 1149 print "Auto-creating node %s" % (request.nodeIdentifier,) |
1016 d = self.backend.createNode(request.nodeIdentifier, | 1150 d = self.backend.createNode(request.nodeIdentifier, |
1017 request.sender) | 1151 request.sender, |
1152 pep=self._isPep(request), | |
1153 recipient=request.recipient) | |
1018 d.addCallback(lambda ignore, | 1154 d.addCallback(lambda ignore, |
1019 request: self.backend.publish(request.nodeIdentifier, | 1155 request: self.backend.publish(request.nodeIdentifier, |
1020 request.items, | 1156 request.items, |
1021 request.sender), | 1157 request.sender, |
1158 self._isPep(request), | |
1159 request.recipient, | |
1160 ), | |
1022 request) | 1161 request) |
1023 return d | 1162 return d |
1024 | 1163 |
1025 return failure | 1164 return failure |
1165 | |
1166 def _isPep(self, request): | |
1167 try: | |
1168 return request.delegated | |
1169 except AttributeError: | |
1170 return False | |
1026 | 1171 |
1027 def publish(self, request): | 1172 def publish(self, request): |
1028 d = self.backend.publish(request.nodeIdentifier, | 1173 d = self.backend.publish(request.nodeIdentifier, |
1029 request.items, | 1174 request.items, |
1030 request.sender) | 1175 request.sender, |
1176 self._isPep(request), | |
1177 request.recipient) | |
1031 d.addErrback(self._publish_errb, request) | 1178 d.addErrback(self._publish_errb, request) |
1032 return d.addErrback(self._mapErrors) | 1179 return d.addErrback(self._mapErrors) |
1033 | 1180 |
1034 | 1181 |
1035 def subscribe(self, request): | 1182 def subscribe(self, request): |
1036 d = self.backend.subscribe(request.nodeIdentifier, | 1183 d = self.backend.subscribe(request.nodeIdentifier, |
1037 request.subscriber, | 1184 request.subscriber, |
1038 request.sender) | 1185 request.sender, |
1186 self._isPep(request), | |
1187 request.recipient) | |
1039 return d.addErrback(self._mapErrors) | 1188 return d.addErrback(self._mapErrors) |
1040 | 1189 |
1041 | 1190 |
1042 def unsubscribe(self, request): | 1191 def unsubscribe(self, request): |
1043 d = self.backend.unsubscribe(request.nodeIdentifier, | 1192 d = self.backend.unsubscribe(request.nodeIdentifier, |
1044 request.subscriber, | 1193 request.subscriber, |
1045 request.sender) | 1194 request.sender, |
1195 self._isPep(request), | |
1196 request.recipient) | |
1046 return d.addErrback(self._mapErrors) | 1197 return d.addErrback(self._mapErrors) |
1047 | 1198 |
1048 | 1199 |
1049 def subscriptions(self, request): | 1200 def subscriptions(self, request): |
1050 d = self.backend.getSubscriptions(request.sender) | 1201 d = self.backend.getSubscriptions(self._isPep(request), |
1202 request.sender) | |
1051 return d.addErrback(self._mapErrors) | 1203 return d.addErrback(self._mapErrors) |
1052 | 1204 |
1053 | 1205 |
1054 def affiliations(self, request): | 1206 def affiliations(self, request): |
1055 d = self.backend.getAffiliations(request.sender) | 1207 d = self.backend.getAffiliations(self._isPep(request), |
1208 request.sender) | |
1056 return d.addErrback(self._mapErrors) | 1209 return d.addErrback(self._mapErrors) |
1057 | 1210 |
1058 | 1211 |
1059 def create(self, request): | 1212 def create(self, request): |
1060 d = self.backend.createNode(request.nodeIdentifier, | 1213 d = self.backend.createNode(request.nodeIdentifier, |
1061 request.sender, request.options) | 1214 request.sender, request.options, |
1215 self._isPep(request), | |
1216 request.recipient) | |
1062 return d.addErrback(self._mapErrors) | 1217 return d.addErrback(self._mapErrors) |
1063 | 1218 |
1064 | 1219 |
1065 def default(self, request): | 1220 def default(self, request): |
1066 d = self.backend.getDefaultConfiguration(request.nodeType) | 1221 d = self.backend.getDefaultConfiguration(request.nodeType, |
1222 self._isPep(request), | |
1223 request.sender) | |
1067 return d.addErrback(self._mapErrors) | 1224 return d.addErrback(self._mapErrors) |
1068 | 1225 |
1069 | 1226 |
1070 def configureGet(self, request): | 1227 def configureGet(self, request): |
1071 d = self.backend.getNodeConfiguration(request.nodeIdentifier) | 1228 d = self.backend.getNodeConfiguration(request.nodeIdentifier, |
1229 self._isPep(request), | |
1230 request.recipient) | |
1072 return d.addErrback(self._mapErrors) | 1231 return d.addErrback(self._mapErrors) |
1073 | 1232 |
1074 | 1233 |
1075 def configureSet(self, request): | 1234 def configureSet(self, request): |
1076 d = self.backend.setNodeConfiguration(request.nodeIdentifier, | 1235 d = self.backend.setNodeConfiguration(request.nodeIdentifier, |
1077 request.options, | 1236 request.options, |
1078 request.sender) | 1237 request.sender, |
1238 self._isPep(request), | |
1239 request.recipient) | |
1079 return d.addErrback(self._mapErrors) | 1240 return d.addErrback(self._mapErrors) |
1080 | 1241 |
1081 | 1242 |
1082 def items(self, request): | 1243 def items(self, request): |
1083 ext_data = {} | 1244 ext_data = {} |
1084 if const.FLAG_ENABLE_RSM: | 1245 if const.FLAG_ENABLE_RSM and request.rsm is not None: |
1085 ext_data['rsm'] = request.rsm | 1246 ext_data['rsm'] = request.rsm |
1247 try: | |
1248 ext_data['pep'] = request.delegated | |
1249 except AttributeError: | |
1250 pass | |
1086 d = self.backend.getItems(request.nodeIdentifier, | 1251 d = self.backend.getItems(request.nodeIdentifier, |
1087 request.sender, | 1252 request.recipient, |
1088 request.maxItems, | 1253 request.maxItems, |
1089 request.itemIdentifiers, | 1254 request.itemIdentifiers, |
1090 ext_data) | 1255 ext_data) |
1091 return d.addErrback(self._mapErrors) | 1256 return d.addErrback(self._mapErrors) |
1092 | 1257 |
1093 def retract(self, request): | 1258 def retract(self, request): |
1094 d = self.backend.retractItem(request.nodeIdentifier, | 1259 d = self.backend.retractItem(request.nodeIdentifier, |
1095 request.itemIdentifiers, | 1260 request.itemIdentifiers, |
1096 request.sender) | 1261 request.sender, |
1262 request.notify, | |
1263 self._isPep(request), | |
1264 request.recipient) | |
1097 return d.addErrback(self._mapErrors) | 1265 return d.addErrback(self._mapErrors) |
1098 | 1266 |
1099 | 1267 |
1100 def purge(self, request): | 1268 def purge(self, request): |
1101 d = self.backend.purgeNode(request.nodeIdentifier, | 1269 d = self.backend.purgeNode(request.nodeIdentifier, |
1102 request.sender) | 1270 request.sender, |
1271 self._isPep(request), | |
1272 request.recipient) | |
1103 return d.addErrback(self._mapErrors) | 1273 return d.addErrback(self._mapErrors) |
1104 | 1274 |
1105 | 1275 |
1106 def delete(self, request): | 1276 def delete(self, request): |
1107 d = self.backend.deleteNode(request.nodeIdentifier, | 1277 d = self.backend.deleteNode(request.nodeIdentifier, |
1108 request.sender) | 1278 request.sender, |
1279 self._isPep(request), | |
1280 request.recipient) | |
1109 return d.addErrback(self._mapErrors) | 1281 return d.addErrback(self._mapErrors) |
1110 | 1282 |
1111 components.registerAdapter(PubSubResourceFromBackend, | 1283 components.registerAdapter(PubSubResourceFromBackend, |
1112 IBackendService, | 1284 IBackendService, |
1113 IPubSubResource) | 1285 IPubSubResource) |