comparison sat_pubsub/backend.py @ 294:df1edebb0466

PEP implementation, draft (huge patch, sorry): /!\ database schema has changed! /!\ - whole PEP behaviour is not managed yet - if the stanza is delegated, PEP is assumed - fixed potential SQL injection in pgsql_storage - publish notifications manage PEP - added retract notifications (if "notify" attribute is present), with PEP handling - a publisher can no longer replace an item they did not publish - /!\ schema has changed, sat_pubsub_update_0_1.sql updates it - sat_pubsub_update_0_1.sql also fixes bad items coming from a former version of SàT
author Goffi <goffi@goffi.org>
date Sun, 16 Aug 2015 01:32:42 +0200
parents 9f612fa19eea
children 6ce33757d21e
comparison
equal deleted inserted replaced
293:b96a4ac25f8b 294:df1edebb0466
68 68
69 from twisted.application import service 69 from twisted.application import service
70 from twisted.python import components, log 70 from twisted.python import components, log
71 from twisted.internet import defer, reactor 71 from twisted.internet import defer, reactor
72 from twisted.words.protocols.jabber.error import StanzaError 72 from twisted.words.protocols.jabber.error import StanzaError
73 from twisted.words.protocols.jabber.jid import JID, InvalidFormat 73 # from twisted.words.protocols.jabber.jid import JID, InvalidFormat
74 from twisted.words.xish import utility 74 from twisted.words.xish import utility
75 75
76 from wokkel import disco, data_form, rsm 76 from wokkel import disco, data_form, rsm
77 from wokkel.iwokkel import IPubSubResource 77 from wokkel.iwokkel import IPubSubResource
78 from wokkel.pubsub import PubSubResource, PubSubError, Subscription 78 from wokkel.pubsub import PubSubResource, PubSubError, Subscription
101 """ 101 """
102 102
103 implements(iidavoll.IBackendService) 103 implements(iidavoll.IBackendService)
104 104
105 nodeOptions = { 105 nodeOptions = {
106 "pubsub#persist_items": 106 const.OPT_PERSIST_ITEMS:
107 {"type": "boolean", 107 {"type": "boolean",
108 "label": "Persist items to storage"}, 108 "label": "Persist items to storage"},
109 "pubsub#deliver_payloads": 109 const.OPT_DELIVER_PAYLOADS:
110 {"type": "boolean", 110 {"type": "boolean",
111 "label": "Deliver payloads with event notifications"}, 111 "label": "Deliver payloads with event notifications"},
112 "pubsub#send_last_published_item": 112 const.OPT_SEND_LAST_PUBLISHED_ITEM:
113 {"type": "list-single", 113 {"type": "list-single",
114 "label": "When to send the last published item", 114 "label": "When to send the last published item",
115 "options": { 115 "options": {
116 "never": "Never", 116 "never": "Never",
117 "on_sub": "When a new subscription is processed"} 117 "on_sub": "When a new subscription is processed"}
179 179
180 def supportsPublishModel(self): 180 def supportsPublishModel(self):
181 return True 181 return True
182 182
183 183
184 def getNodeType(self, nodeIdentifier): 184 def getNodeType(self, nodeIdentifier, pep, recipient=None):
185 d = self.storage.getNode(nodeIdentifier) 185 # FIXME: manage pep and recipient
186 d = self.storage.getNode(nodeIdentifier, pep, recipient)
186 d.addCallback(lambda node: node.getType()) 187 d.addCallback(lambda node: node.getType())
187 return d 188 return d
188 189
189 190
190 def getNodes(self): 191 def getNodes(self, pep):
191 return self.storage.getNodeIds() 192 return self.storage.getNodeIds(pep)
192 193
193 194
194 def getNodeMetaData(self, nodeIdentifier): 195 def getNodeMetaData(self, nodeIdentifier, pep, recipient=None):
195 d = self.storage.getNode(nodeIdentifier) 196 # FIXME: manage pep and recipient
197 d = self.storage.getNode(nodeIdentifier, pep, recipient)
196 d.addCallback(lambda node: node.getMetaData()) 198 d.addCallback(lambda node: node.getMetaData())
197 d.addCallback(self._makeMetaData) 199 d.addCallback(self._makeMetaData)
198 return d 200 return d
199 201
200 202
212 214
213 def _checkAuth(self, node, requestor): 215 def _checkAuth(self, node, requestor):
214 """ Check authorisation of publishing in node for requestor """ 216 """ Check authorisation of publishing in node for requestor """
215 217
216 def check(affiliation): 218 def check(affiliation):
217 d = defer.succeed(node) 219 d = defer.succeed((affiliation, node))
218 configuration = node.getConfiguration() 220 configuration = node.getConfiguration()
219 publish_model = configuration[const.OPT_PUBLISH_MODEL] 221 publish_model = configuration[const.OPT_PUBLISH_MODEL]
220 222 if publish_model == const.VAL_PMODEL_PUBLISHERS:
221 if (publish_model == const.VAL_PMODEL_PUBLISHERS):
222 if affiliation not in ['owner', 'publisher']: 223 if affiliation not in ['owner', 'publisher']:
223 raise error.Forbidden() 224 raise error.Forbidden()
224 elif (publish_model == const.VAL_PMODEL_SUBSCRIBERS): 225 elif publish_model == const.VAL_PMODEL_SUBSCRIBERS:
225 if affiliation not in ['owner', 'publisher']: 226 if affiliation not in ['owner', 'publisher']:
226 # we are in subscribers publish model, we must check that 227 # we are in subscribers publish model, we must check that
227 # the requestor is a subscriber to allow him to publish 228 # the requestor is a subscriber to allow him to publish
228 229
229 def checkSubscription(subscribed): 230 def checkSubscription(subscribed):
230 if not subscribed: 231 if not subscribed:
231 raise error.Forbidden() 232 raise error.Forbidden()
232 return node 233 return (affiliation, node)
233 234
234 d.addCallback(lambda ignore: node.isSubscribed(requestor)) 235 d.addCallback(lambda ignore: node.isSubscribed(requestor))
235 d.addCallback(checkSubscription) 236 d.addCallback(checkSubscription)
236 elif publish_model != const.VAL_PMODEL_OPEN: 237 elif publish_model != const.VAL_PMODEL_OPEN:
237 raise Exception('Unexpected value') # publish_model must be publishers (default), subscribers or open. 238 raise ValueError('Unexpected value') # publish_model must be publishers (default), subscribers or open.
238 239
239 return d 240 return d
240 241
241 d = node.getAffiliation(requestor) 242 d = node.getAffiliation(requestor)
242 d.addCallback(check) 243 d.addCallback(check)
244 245
245 def parseItemConfig(self, item): 246 def parseItemConfig(self, item):
246 """Get and remove item configuration information 247 """Get and remove item configuration information
247 @param item: 248 @param item:
248 """ 249 """
250 # FIXME: dirty ! Need to use elements()
249 item_config = None 251 item_config = None
250 access_model = const.VAL_AMODEL_DEFAULT 252 access_model = const.VAL_AMODEL_DEFAULT
251 for i in range(len(item.children)): 253 for i in range(len(item.children)):
252 elt = item.children[i] 254 elt = item.children[i]
253 if not (elt.uri,elt.name)==(data_form.NS_X_DATA,'x'): 255 if not (elt.uri,elt.name)==(data_form.NS_X_DATA,'x'):
262 access_model = item_config.get(const.OPT_ACCESS_MODEL, const.VAL_AMODEL_DEFAULT) 264 access_model = item_config.get(const.OPT_ACCESS_MODEL, const.VAL_AMODEL_DEFAULT)
263 265
264 return (access_model, item_config) 266 return (access_model, item_config)
265 267
266 268
267 def publish(self, nodeIdentifier, items, requestor): 269 def _checkOverwrite(self, node, itemIdentifiers, publisher):
268 d = self.storage.getNode(nodeIdentifier) 270 """Check that the itemIdentifiers correspond to items published
271 by the current publisher"""
272 def doCheck(item_pub_map):
273 for item_publisher in item_pub_map.iterValues():
274 if item_publisher.userhost() != publisher.userhost():
275 raise error.ItemForbidden()
276
277 d = node.getItemsPublishers(itemIdentifiers)
278 d.addCallback(doCheck)
279 return d
280
281
282 def publish(self, nodeIdentifier, items, requestor, pep, recipient):
283 d = self.storage.getNode(nodeIdentifier, pep, recipient)
269 d.addCallback(self._checkAuth, requestor) 284 d.addCallback(self._checkAuth, requestor)
270 #FIXME: owner and publisher are not necessarly the same. So far we use only owner to get roster. 285 #FIXME: owner and publisher are not necessarly the same. So far we use only owner to get roster.
271 #FIXME: in addition, there can be several owners: that is not managed yet 286 #FIXME: in addition, there can be several owners: that is not managed yet
272 d.addCallback(self._doPublish, items, requestor) 287 d.addCallback(self._doPublish, items, requestor, pep, recipient)
273 return d 288 return d
274 289
275 290
276 def _doPublish(self, node, items, requestor): 291 def _doPublish(self, result, items, requestor, pep, recipient):
292 affiliation, node = result
277 if node.nodeType == 'collection': 293 if node.nodeType == 'collection':
278 raise error.NoPublishing() 294 raise error.NoPublishing()
279 295
280 configuration = node.getConfiguration() 296 configuration = node.getConfiguration()
281 persistItems = configuration["pubsub#persist_items"] 297 persistItems = configuration[const.OPT_PERSIST_ITEMS]
282 deliverPayloads = configuration["pubsub#deliver_payloads"] 298 deliverPayloads = configuration[const.OPT_DELIVER_PAYLOADS]
283 299
284 if items and not persistItems and not deliverPayloads: 300 if items and not persistItems and not deliverPayloads:
285 raise error.ItemForbidden() 301 raise error.ItemForbidden()
286 elif not items and (persistItems or deliverPayloads): 302 elif not items and (persistItems or deliverPayloads):
287 raise error.ItemRequired() 303 raise error.ItemRequired()
288 304
289 parsed_items = [] 305 items_data = []
306 check_overwrite = False
290 for item in items: 307 for item in items:
291 if persistItems or deliverPayloads: 308 if persistItems or deliverPayloads:
292 item.uri = None 309 item.uri = None
293 item.defaultUri = None 310 item.defaultUri = None
294 if not item.getAttribute("id"): 311 if not item.getAttribute("id"):
295 item["id"] = str(uuid.uuid4()) 312 item["id"] = str(uuid.uuid4())
313 else:
314 check_overwrite = True
296 access_model, item_config = self.parseItemConfig(item) 315 access_model, item_config = self.parseItemConfig(item)
297 parsed_items.append((access_model, item_config, item)) 316 items_data.append((item, access_model, item_config))
298 317
299 if persistItems: 318 if persistItems:
300 d = node.storeItems(parsed_items, requestor) 319 if check_overwrite and affiliation != 'owner':
320 # we don't want a publisher to overwrite the item
321 # of an other publisher
322 d = self._checkOverwrite(node, [item['id'] for item in items if item.getAttribute('id')], requestor)
323 d.addCallback(lambda _: node.storeItems(items_data, requestor))
324 else:
325 d = node.storeItems(items_data, requestor)
301 else: 326 else:
302 d = defer.succeed(None) 327 d = defer.succeed(None)
303 328
304 d.addCallback(self._doNotify, node, parsed_items, 329 d.addCallback(self._doNotify, node, items_data,
305 deliverPayloads) 330 deliverPayloads, pep, recipient)
306 return d 331 return d
307 332
308 333
309 def _doNotify(self, result, node, items, deliverPayloads): 334 def _doNotify(self, result, node, items_data, deliverPayloads, pep, recipient):
310 if items and not deliverPayloads: 335 if items_data and not deliverPayloads:
311 for access_model, item_config, item in items: 336 for access_model, item_config, item in items_data:
312 item.children = [] 337 item.children = []
313 338 self.dispatch({'items_data': items_data, 'node': node, 'pep': pep, 'recipient': recipient},
314 self.dispatch({'items': items, 'node': node},
315 '//event/pubsub/notify') 339 '//event/pubsub/notify')
316 340
317 341
318 def getNotifications(self, nodeIdentifier, items): 342 def getNotifications(self, nodeDbId, items_data):
319 343 """Build a list of subscriber to the node
320 def toNotifications(subscriptions, nodeIdentifier, items): 344
345 subscribers will be associated with subscribed items,
346 and subscription type.
347 """
348
349 def toNotifications(subscriptions, items_data):
321 subsBySubscriber = {} 350 subsBySubscriber = {}
322 for subscription in subscriptions: 351 for subscription in subscriptions:
323 if subscription.options.get('pubsub#subscription_type', 352 if subscription.options.get('pubsub#subscription_type',
324 'items') == 'items': 353 'items') == 'items':
325 subs = subsBySubscriber.setdefault(subscription.subscriber, 354 subs = subsBySubscriber.setdefault(subscription.subscriber,
326 set()) 355 set())
327 subs.add(subscription) 356 subs.add(subscription)
328 357
329 notifications = [(subscriber, subscriptions_, items) 358 notifications = [(subscriber, subscriptions_, items_data)
330 for subscriber, subscriptions_ 359 for subscriber, subscriptions_
331 in subsBySubscriber.iteritems()] 360 in subsBySubscriber.iteritems()]
332 361
333 return notifications 362 return notifications
334 363
335 def rootNotFound(failure): 364 def rootNotFound(failure):
336 failure.trap(error.NodeNotFound) 365 failure.trap(error.NodeNotFound)
337 return [] 366 return []
338 367
339 d1 = self.storage.getNode(nodeIdentifier) 368 d1 = self.storage.getNodeById(nodeDbId)
340 d1.addCallback(lambda node: node.getSubscriptions('subscribed')) 369 d1.addCallback(lambda node: node.getSubscriptions('subscribed'))
341 d2 = self.storage.getNode('') 370 # FIXME: must add root node subscriptions ?
342 d2.addCallback(lambda node: node.getSubscriptions('subscribed')) 371 # d2 = self.storage.getNode('', False) # FIXME: to check
343 d2.addErrback(rootNotFound) 372 # d2.addCallback(lambda node: node.getSubscriptions('subscribed'))
344 d = defer.gatherResults([d1, d2]) 373 # d2.addErrback(rootNotFound)
345 d.addCallback(lambda result: result[0] + result[1]) 374 # d = defer.gatherResults([d1, d2])
346 d.addCallback(toNotifications, nodeIdentifier, items) 375 # d.addCallback(lambda result: result[0] + result[1])
347 return d 376 d1.addCallback(toNotifications, items_data)
348 377 return d1
349 def registerNotifier(self, observerfn, *args, **kwargs): 378
379 def registerPublishNotifier(self, observerfn, *args, **kwargs):
350 self.addObserver('//event/pubsub/notify', observerfn, *args, **kwargs) 380 self.addObserver('//event/pubsub/notify', observerfn, *args, **kwargs)
351 381
352 def subscribe(self, nodeIdentifier, subscriber, requestor): 382 def registerRetractNotifier(self, observerfn, *args, **kwargs):
383 self.addObserver('//event/pubsub/retract', observerfn, *args, **kwargs)
384
385 def subscribe(self, nodeIdentifier, subscriber, requestor, pep, recipient):
353 subscriberEntity = subscriber.userhostJID() 386 subscriberEntity = subscriber.userhostJID()
354 if subscriberEntity != requestor.userhostJID(): 387 if subscriberEntity != requestor.userhostJID():
355 return defer.fail(error.Forbidden()) 388 return defer.fail(error.Forbidden())
356 389
357 d = self.storage.getNode(nodeIdentifier) 390 d = self.storage.getNode(nodeIdentifier, pep, recipient)
358 d.addCallback(_getAffiliation, subscriberEntity) 391 d.addCallback(_getAffiliation, subscriberEntity)
359 d.addCallback(self._doSubscribe, subscriber) 392 d.addCallback(self._doSubscribe, subscriber)
360 return d 393 return d
361 394
362 395
363 def _doSubscribe(self, result, subscriber): 396 def _doSubscribe(self, result, subscriber):
397 # TODO: implement other access models
364 node, affiliation = result 398 node, affiliation = result
365 #FIXME: must check node's access_model before subscribing
366 399
367 if affiliation == 'outcast': 400 if affiliation == 'outcast':
368 raise error.Forbidden() 401 raise error.Forbidden()
402
403 access_model = node.getAccessModel()
404
405 if access_model != const.VAL_AMODEL_OPEN:
406 raise NotImplementedError
369 407
370 def trapExists(failure): 408 def trapExists(failure):
371 failure.trap(error.SubscriptionExists) 409 failure.trap(error.SubscriptionExists)
372 return False 410 return False
373 411
378 return d 416 return d
379 417
380 d = node.addSubscription(subscriber, 'subscribed', {}) 418 d = node.addSubscription(subscriber, 'subscribed', {})
381 d.addCallbacks(lambda _: True, trapExists) 419 d.addCallbacks(lambda _: True, trapExists)
382 d.addCallback(cb) 420 d.addCallback(cb)
421
383 return d 422 return d
384 423
385 424
386 def _sendLastPublished(self, subscription, node): 425 def _sendLastPublished(self, subscription, node):
387 426
404 d.addErrback(log.err) 443 d.addErrback(log.err)
405 444
406 return subscription 445 return subscription
407 446
408 447
409 def unsubscribe(self, nodeIdentifier, subscriber, requestor): 448 def unsubscribe(self, nodeIdentifier, subscriber, requestor, pep, recipient):
410 if subscriber.userhostJID() != requestor.userhostJID(): 449 if subscriber.userhostJID() != requestor.userhostJID():
411 return defer.fail(error.Forbidden()) 450 return defer.fail(error.Forbidden())
412 451
413 d = self.storage.getNode(nodeIdentifier) 452 d = self.storage.getNode(nodeIdentifier, pep, recipient)
414 d.addCallback(lambda node: node.removeSubscription(subscriber)) 453 d.addCallback(lambda node: node.removeSubscription(subscriber))
415 return d 454 return d
416 455
417 456
418 def getSubscriptions(self, entity): 457 def getSubscriptions(self, entity):
426 465
427 def supportsInstantNodes(self): 466 def supportsInstantNodes(self):
428 return True 467 return True
429 468
430 469
431 def createNode(self, nodeIdentifier, requestor, options = None): 470 def createNode(self, nodeIdentifier, requestor, options = None, pep=False, recipient=None):
432 if not nodeIdentifier: 471 if not nodeIdentifier:
433 nodeIdentifier = 'generic/%s' % uuid.uuid4() 472 nodeIdentifier = 'generic/%s' % uuid.uuid4()
434 473
435 if not options: 474 if not options:
436 options = {} 475 options = {}
437 476
438 if self.supportsCreatorCheck(): 477 # if self.supportsCreatorCheck():
439 groupblog = nodeIdentifier.startswith(const.NS_GROUPBLOG_PREFIX) 478 # groupblog = nodeIdentifier.startswith(const.NS_GROUPBLOG_PREFIX)
440 try: 479 # try:
441 nodeIdentifierJID = JID(nodeIdentifier[len(const.NS_GROUPBLOG_PREFIX):] if groupblog else nodeIdentifier) 480 # nodeIdentifierJID = JID(nodeIdentifier[len(const.NS_GROUPBLOG_PREFIX):] if groupblog else nodeIdentifier)
442 except InvalidFormat: 481 # except InvalidFormat:
443 is_user_jid = False 482 # is_user_jid = False
444 else: 483 # else:
445 is_user_jid = bool(nodeIdentifierJID.user) 484 # is_user_jid = bool(nodeIdentifierJID.user)
446 485
447 if is_user_jid and nodeIdentifierJID.userhostJID() != requestor.userhostJID(): 486 # if is_user_jid and nodeIdentifierJID.userhostJID() != requestor.userhostJID():
448 #we have an user jid node, but not created by the owner of this jid 487 # #we have an user jid node, but not created by the owner of this jid
449 print "Wrong creator" 488 # print "Wrong creator"
450 raise error.Forbidden() 489 # raise error.Forbidden()
451 490
452 nodeType = 'leaf' 491 nodeType = 'leaf'
453 config = self.storage.getDefaultConfiguration(nodeType) 492 config = self.storage.getDefaultConfiguration(nodeType)
454 config['pubsub#node_type'] = nodeType 493 config['pubsub#node_type'] = nodeType
455 config.update(options) 494 config.update(options)
456 495
457 d = self.storage.createNode(nodeIdentifier, requestor, config) 496 d = self.storage.createNode(nodeIdentifier, requestor, config, pep, recipient)
458 d.addCallback(lambda _: nodeIdentifier) 497 d.addCallback(lambda _: nodeIdentifier)
459 return d 498 return d
460 499
461 500
462 def getDefaultConfiguration(self, nodeType): 501 def getDefaultConfiguration(self, nodeType):
463 d = defer.succeed(self.storage.getDefaultConfiguration(nodeType)) 502 d = defer.succeed(self.storage.getDefaultConfiguration(nodeType))
464 return d 503 return d
465 504
466 505
467 def getNodeConfiguration(self, nodeIdentifier): 506 def getNodeConfiguration(self, nodeIdentifier, pep, recipient):
468 if not nodeIdentifier: 507 if not nodeIdentifier:
469 return defer.fail(error.NoRootNode()) 508 return defer.fail(error.NoRootNode())
470 509
471 d = self.storage.getNode(nodeIdentifier) 510 d = self.storage.getNode(nodeIdentifier, pep, recipient)
472 d.addCallback(lambda node: node.getConfiguration()) 511 d.addCallback(lambda node: node.getConfiguration())
473 512
474 return d 513 return d
475 514
476 515
477 def setNodeConfiguration(self, nodeIdentifier, options, requestor): 516 def setNodeConfiguration(self, nodeIdentifier, options, requestor, pep, recipient):
478 if not nodeIdentifier: 517 if not nodeIdentifier:
479 return defer.fail(error.NoRootNode()) 518 return defer.fail(error.NoRootNode())
480 519
481 d = self.storage.getNode(nodeIdentifier) 520 d = self.storage.getNode(nodeIdentifier, pep, recipient)
482 d.addCallback(_getAffiliation, requestor) 521 d.addCallback(_getAffiliation, requestor)
483 d.addCallback(self._doSetNodeConfiguration, options) 522 d.addCallback(self._doSetNodeConfiguration, options)
484 return d 523 return d
485 524
486 525
495 534
496 def getAffiliations(self, entity): 535 def getAffiliations(self, entity):
497 return self.storage.getAffiliations(entity) 536 return self.storage.getAffiliations(entity)
498 537
499 538
500 def getItems(self, nodeIdentifier, requestor, maxItems=None, 539 def getItems(self, nodeIdentifier, recipient, maxItems=None,
501 itemIdentifiers=None, ext_data=None): 540 itemIdentifiers=None, ext_data=None):
502 if ext_data is None: 541 if ext_data is None:
503 ext_data = {} 542 ext_data = {}
504 d = self.storage.getNode(nodeIdentifier) 543 d = self.storage.getNode(nodeIdentifier, ext_data.get('pep', False), recipient)
505 d.addCallback(_getAffiliation, requestor) 544 d.addCallback(_getAffiliation, recipient)
506 d.addCallback(self._doGetItems, requestor, maxItems, itemIdentifiers, 545 d.addCallback(self._doGetItems, recipient, maxItems, itemIdentifiers,
507 ext_data) 546 ext_data)
508 return d 547 return d
509 548
510 def checkGroup(self, roster_groups, entity): 549 def checkGroup(self, roster_groups, entity):
511 """Check that entity is authorized and in roster 550 """Check that entity is authorized and in roster
543 if access_model == const.VAL_AMODEL_OPEN: 582 if access_model == const.VAL_AMODEL_OPEN:
544 pass 583 pass
545 elif access_model == const.VAL_AMODEL_ROSTER: 584 elif access_model == const.VAL_AMODEL_ROSTER:
546 form = data_form.Form('submit', formNamespace=const.NS_ITEM_CONFIG) 585 form = data_form.Form('submit', formNamespace=const.NS_ITEM_CONFIG)
547 access = data_form.Field(None, const.OPT_ACCESS_MODEL, value=const.VAL_AMODEL_ROSTER) 586 access = data_form.Field(None, const.OPT_ACCESS_MODEL, value=const.VAL_AMODEL_ROSTER)
548 allowed = data_form.Field(None, const.OPT_ROSTER_GROUPS_ALLOWED, values=access_list) 587 allowed = data_form.Field(None, const.OPT_ROSTER_GROUPS_ALLOWED, values=access_list[const.OPT_ROSTER_GROUPS_ALLOWED])
549 form.addField(access) 588 form.addField(access)
550 form.addField(allowed) 589 form.addField(allowed)
551 item.addChild(form.toElement()) 590 item.addChild(form.toElement())
552 elif access_model == const.VAL_AMODEL_JID: 591 elif access_model == const.VAL_AMODEL_JID:
553 #FIXME: manage jid 592 #FIXME: manage jid
588 return [] 627 return []
589 628
590 if affiliation == 'outcast': 629 if affiliation == 'outcast':
591 raise error.Forbidden() 630 raise error.Forbidden()
592 631
593 access_model = node.getConfiguration()["pubsub#access_model"] 632 access_model = node.getAccessModel()
594 d = node.getNodeOwner() 633 d = node.getNodeOwner()
595 d.addCallback(self.privilege.getRoster) 634 d.addCallback(self.privilege.getRoster)
596 d.addErrback(self._rosterEb) 635 d.addErrback(self._rosterEb)
597 636
598 if access_model == 'open' or affiliation == 'owner': 637 if access_model == const.VAL_AMODEL_OPEN or affiliation == 'owner':
599 d.addCallback(lambda roster: (True, roster)) 638 d.addCallback(lambda roster: (True, roster))
600 d.addCallback(access_checked) 639 d.addCallback(access_checked)
601 elif access_model == 'roster': 640 elif access_model == const.VAL_AMODEL_ROSTER:
602 d.addCallback(self._getNodeGroups,node.nodeIdentifier) 641 d.addCallback(self._getNodeGroups,node.nodeIdentifier)
603 d.addCallback(self.checkGroup, requestor) 642 d.addCallback(self.checkGroup, requestor)
604 d.addCallback(access_checked) 643 d.addCallback(access_checked)
605 644
606 return d 645 return d
648 elts.append(response.toElement()) 687 elts.append(response.toElement())
649 return elts 688 return elts
650 689
651 return defer.DeferredList(d_list).addCallback(render) 690 return defer.DeferredList(d_list).addCallback(render)
652 691
653 def retractItem(self, nodeIdentifier, itemIdentifiers, requestor): 692 def retractItem(self, nodeIdentifier, itemIdentifiers, requestor, notify, pep, recipient):
654 d = self.storage.getNode(nodeIdentifier) 693 d = self.storage.getNode(nodeIdentifier, pep, recipient)
655 d.addCallback(_getAffiliation, requestor) 694 d.addCallback(_getAffiliation, requestor)
656 if const.FLAG_RETRACT_ALLOW_PUBLISHER: 695 # FIXME: to be checked
657 d.addCallback(self._doRetractAllowPublisher, itemIdentifiers, requestor) 696 # if const.FLAG_RETRACT_ALLOW_PUBLISHER:
658 else: 697 # d.addCallback(self._doRetractAllowPublisher, itemIdentifiers, requestor)
659 d.addCallback(self._doRetract, itemIdentifiers) 698 # else:
660 return d 699 # d.addCallback(self._doRetract, itemIdentifiers)
661 700 d.addCallback(self._doRetract, itemIdentifiers, notify, pep, recipient)
662 def _doRetractAllowPublisher(self, result, itemIdentifiers, requestor): 701 return d
663 """This method has been added to allow the publisher 702
664 of an item to retract it, even if he has no affiliation 703 # FIXME: to be checked
665 to that item. For instance, this allows you to delete 704 # def _doRetractAllowPublisher(self, result, itemIdentifiers, requestor):
666 an item you posted in a node of "open" publish model. 705 # """This method has been added to allow the publisher
667 """ 706 # of an item to retract it, even if he has no affiliation
707 # to that item. For instance, this allows you to delete
708 # an item you posted in a node of "open" publish model.
709 # """
710 # node, affiliation = result
711 # if affiliation in ['owner', 'publisher']:
712 # return self._doRetract(result, itemIdentifiers)
713 # d = node.filterItemsWithPublisher(itemIdentifiers, requestor)
714 # def filterCb(filteredItems):
715 # if not filteredItems:
716 # return self._doRetract(result, itemIdentifiers)
717 # # XXX: fake an affiliation that does NOT exist
718 # return self._doRetract((node, 'publisher'), filteredItems)
719 # d.addCallback(filterCb)
720 # return d
721
722 def _doRetract(self, result, itemIdentifiers, notify, pep, recipient):
668 node, affiliation = result 723 node, affiliation = result
669 if affiliation in ['owner', 'publisher']: 724 persistItems = node.getConfiguration()[const.OPT_PERSIST_ITEMS]
670 return self._doRetract(result, itemIdentifiers)
671 d = node.filterItemsWithPublisher(itemIdentifiers, requestor)
672 def filterCb(filteredItems):
673 if not filteredItems:
674 return self._doRetract(result, itemIdentifiers)
675 # XXX: fake an affiliation that does NOT exist
676 return self._doRetract((node, 'publisher'), filteredItems)
677 d.addCallback(filterCb)
678 return d
679
680 def _doRetract(self, result, itemIdentifiers):
681 node, affiliation = result
682 persistItems = node.getConfiguration()["pubsub#persist_items"]
683 725
684 if affiliation not in ['owner', 'publisher']: 726 if affiliation not in ['owner', 'publisher']:
685 raise error.Forbidden() 727 raise error.Forbidden()
686 728
687 if not persistItems: 729 if not persistItems:
688 raise error.NodeNotPersistent() 730 raise error.NodeNotPersistent()
689 731
690 d = node.removeItems(itemIdentifiers) 732 # we need to get the items before removing them, for the notifications
691 d.addCallback(self._doNotifyRetraction, node.nodeIdentifier) 733
692 return d 734 def removeItems(items_data):
693 735 """Remove the items and keep only actually removed ones in items_data"""
694 736 d = node.removeItems(itemIdentifiers)
695 def _doNotifyRetraction(self, itemIdentifiers, nodeIdentifier): 737 d.addCallback(lambda removed: [item_data for item_data in items_data if item_data[0]["id"] in removed])
696 self.dispatch({'itemIdentifiers': itemIdentifiers, 738 return d
697 'nodeIdentifier': nodeIdentifier }, 739
740 d = node.getItemsById(None, True, itemIdentifiers)
741 d.addCallback(removeItems)
742
743 if notify:
744 d.addCallback(self._doNotifyRetraction, node, pep, recipient)
745 return d
746
747
748 def _doNotifyRetraction(self, items_data, node, pep, recipient):
749 self.dispatch({'items_data': items_data,
750 'node': node,
751 'pep': pep,
752 'recipient': recipient},
698 '//event/pubsub/retract') 753 '//event/pubsub/retract')
699 754
700 755
701 def purgeNode(self, nodeIdentifier, requestor): 756 def purgeNode(self, nodeIdentifier, requestor, pep, recipient):
702 d = self.storage.getNode(nodeIdentifier) 757 d = self.storage.getNode(nodeIdentifier, pep, recipient)
703 d.addCallback(_getAffiliation, requestor) 758 d.addCallback(_getAffiliation, requestor)
704 d.addCallback(self._doPurge) 759 d.addCallback(self._doPurge)
705 return d 760 return d
706 761
707 762
708 def _doPurge(self, result): 763 def _doPurge(self, result):
709 node, affiliation = result 764 node, affiliation = result
710 persistItems = node.getConfiguration()["pubsub#persist_items"] 765 persistItems = node.getConfiguration()[const.OPT_PERSIST_ITEMS]
711 766
712 if affiliation != 'owner': 767 if affiliation != 'owner':
713 raise error.Forbidden() 768 raise error.Forbidden()
714 769
715 if not persistItems: 770 if not persistItems:
726 781
727 def registerPreDelete(self, preDeleteFn): 782 def registerPreDelete(self, preDeleteFn):
728 self._callbackList.append(preDeleteFn) 783 self._callbackList.append(preDeleteFn)
729 784
730 785
731 def getSubscribers(self, nodeIdentifier): 786 def getSubscribers(self, nodeIdentifier, pep, recipient):
732 def cb(subscriptions): 787 def cb(subscriptions):
733 return [subscription.subscriber for subscription in subscriptions] 788 return [subscription.subscriber for subscription in subscriptions]
734 789
735 d = self.storage.getNode(nodeIdentifier) 790 d = self.storage.getNode(nodeIdentifier, pep, recipient)
736 d.addCallback(lambda node: node.getSubscriptions('subscribed')) 791 d.addCallback(lambda node: node.getSubscriptions('subscribed'))
737 d.addCallback(cb) 792 d.addCallback(cb)
738 return d 793 return d
739 794
740 795
741 def deleteNode(self, nodeIdentifier, requestor, redirectURI=None): 796 def deleteNode(self, nodeIdentifier, requestor, pep, recipient, redirectURI=None):
742 d = self.storage.getNode(nodeIdentifier) 797 d = self.storage.getNode(nodeIdentifier, pep, recipient)
743 d.addCallback(_getAffiliation, requestor) 798 d.addCallback(_getAffiliation, requestor)
744 d.addCallback(self._doPreDelete, redirectURI) 799 d.addCallback(self._doPreDelete, redirectURI, pep, recipient)
745 return d 800 return d
746 801
747 802
748 def _doPreDelete(self, result, redirectURI): 803 def _doPreDelete(self, result, redirectURI, pep, recipient):
749 node, affiliation = result 804 node, affiliation = result
750 805
751 if affiliation != 'owner': 806 if affiliation != 'owner':
752 raise error.Forbidden() 807 raise error.Forbidden()
753 808
754 data = {'node': node, 809 data = {'node': node,
755 'redirectURI': redirectURI} 810 'redirectURI': redirectURI}
756 811
757 d = defer.DeferredList([cb(data) 812 d = defer.DeferredList([cb(data, pep, recipient)
758 for cb in self._callbackList], 813 for cb in self._callbackList],
759 consumeErrors=1) 814 consumeErrors=1)
760 d.addCallback(self._doDelete, node.nodeIdentifier) 815 d.addCallback(self._doDelete, node.nodeDbId)
761 816
762 817
763 def _doDelete(self, result, nodeIdentifier): 818 def _doDelete(self, result, nodeDbId):
764 dl = [] 819 dl = []
765 for succeeded, r in result: 820 for succeeded, r in result:
766 if succeeded and r: 821 if succeeded and r:
767 dl.extend(r) 822 dl.extend(r)
768 823
769 d = self.storage.deleteNode(nodeIdentifier) 824 d = self.storage.deleteNodeByDbId(nodeDbId)
770 d.addCallback(self._doNotifyDelete, dl) 825 d.addCallback(self._doNotifyDelete, dl)
771 826
772 return d 827 return d
773 828
774 829
810 error.NodeNotFound: ('item-not-found', None, None), 865 error.NodeNotFound: ('item-not-found', None, None),
811 error.NodeExists: ('conflict', None, None), 866 error.NodeExists: ('conflict', None, None),
812 error.Forbidden: ('forbidden', None, None), 867 error.Forbidden: ('forbidden', None, None),
813 error.NotAuthorized: ('not-authorized', None, None), 868 error.NotAuthorized: ('not-authorized', None, None),
814 error.NotInRoster: ('not-authorized', 'not-in-roster-group', None), 869 error.NotInRoster: ('not-authorized', 'not-in-roster-group', None),
870 error.ItemNotFound: ('item-not-found', None, None),
815 error.ItemForbidden: ('bad-request', 'item-forbidden', None), 871 error.ItemForbidden: ('bad-request', 'item-forbidden', None),
816 error.ItemRequired: ('bad-request', 'item-required', None), 872 error.ItemRequired: ('bad-request', 'item-required', None),
817 error.NoInstantNodes: ('not-acceptable', 873 error.NoInstantNodes: ('not-acceptable',
818 'unsupported', 874 'unsupported',
819 'instant-nodes'), 875 'instant-nodes'),
836 PubSubResource.__init__(self) 892 PubSubResource.__init__(self)
837 893
838 self.backend = backend 894 self.backend = backend
839 self.hideNodes = False 895 self.hideNodes = False
840 896
841 self.backend.registerNotifier(self._notify) 897 self.backend.registerPublishNotifier(self._notifyPublish)
898 self.backend.registerRetractNotifier(self._notifyRetract)
842 self.backend.registerPreDelete(self._preDelete) 899 self.backend.registerPreDelete(self._preDelete)
843 900
844 if self.backend.supportsCreatorCheck(): 901 # FIXME: to be removed, it's not useful anymore as PEP is now used
845 self.features.append("creator-jid-check") #SàT custom feature: Check that a node (which correspond to 902 # if self.backend.supportsCreatorCheck():
903 # self.features.append("creator-jid-check") #SàT custom feature: Check that a node (which correspond to
846 # a jid in this server) is created by the right jid 904 # a jid in this server) is created by the right jid
847 905
848 if self.backend.supportsAutoCreate(): 906 if self.backend.supportsAutoCreate():
849 self.features.append("auto-create") 907 self.features.append("auto-create")
850 908
864 self.features.append("groupblog") 922 self.features.append("groupblog")
865 923
866 # if self.backend.supportsPublishModel(): #XXX: this feature is not really described in XEP-0060, we just can see it in examples 924 # if self.backend.supportsPublishModel(): #XXX: this feature is not really described in XEP-0060, we just can see it in examples
867 # self.features.append("publish_model") # but it's necessary for microblogging comments (see XEP-0277) 925 # self.features.append("publish_model") # but it's necessary for microblogging comments (see XEP-0277)
868 926
869 def _notify(self, data): 927 def _notifyPublish(self, data):
870 items = data['items'] 928 items_data = data['items_data']
871 node = data['node'] 929 node = data['node']
872 930 pep = data['pep']
873 def _notifyAllowed(result): 931 recipient = data['recipient']
874 """Check access of subscriber for each item, 932
875 and notify only allowed ones""" 933 def afterPrepare(result):
876 notifications, (owner_jid,roster) = result 934 owner_jid, notifications_filtered = result
877
878 #we filter items not allowed for the subscribers
879 notifications_filtered = []
880
881 for subscriber, subscriptions, _items in notifications:
882 allowed_items = [] #we keep only item which subscriber can access
883
884 for access_model, item_config, item in _items:
885 if access_model == 'open':
886 allowed_items.append(item)
887 elif access_model == 'roster':
888 _subscriber = subscriber.userhostJID()
889 if not _subscriber in roster:
890 continue
891 #the subscriber is known, is he in the right group ?
892 authorized_groups = item_config[const.OPT_ROSTER_GROUPS_ALLOWED]
893 if roster[_subscriber].groups.intersection(authorized_groups):
894 allowed_items.append(item)
895
896 else: #unknown access_model
897 raise NotImplementedError
898
899 if allowed_items:
900 notifications_filtered.append((subscriber, subscriptions, allowed_items))
901
902 #we notify the owner 935 #we notify the owner
903 #FIXME: check if this comply with XEP-0060 (option needed ?) 936 #FIXME: check if this comply with XEP-0060 (option needed ?)
904 #TODO: item's access model have to be sent back to owner 937 #TODO: item's access model have to be sent back to owner
905 #TODO: same thing for getItems 938 #TODO: same thing for getItems
906 939
908 """ Attach item configuration to this item 941 """ Attach item configuration to this item
909 Used to give item configuration back to node's owner (and *only* to owner) 942 Used to give item configuration back to node's owner (and *only* to owner)
910 """ 943 """
911 #TODO: a test should check that only the owner get the item configuration back 944 #TODO: a test should check that only the owner get the item configuration back
912 945
913 access_model, item_config, item = item_data 946 item, access_model, item_config = item_data
914 new_item = deepcopy(item) 947 new_item = deepcopy(item)
915 if item_config: 948 if item_config:
916 new_item.addChild(item_config.toElement()) 949 new_item.addChild(item_config.toElement())
917 return new_item 950 return new_item
918 951
919 notifications_filtered.append((owner_jid, 952 notifications_filtered.append((owner_jid,
920 set([Subscription(node.nodeIdentifier, 953 set([Subscription(node.nodeIdentifier,
921 owner_jid, 954 owner_jid,
922 'subscribed')]), 955 'subscribed')]),
923 [getFullItem(item_data) for item_data in items])) 956 [getFullItem(item_data) for item_data in items_data]))
924 957
925 return self.pubsubService.notifyPublish( 958 if pep:
926 self.serviceJID, 959 return self.backend.privilege.notifyPublish(
927 node.nodeIdentifier, 960 recipient,
928 notifications_filtered) 961 node.nodeIdentifier,
929 962 notifications_filtered)
930 963
931 if 'subscription' not in data: 964 else:
932 d1 = self.backend.getNotifications(node.nodeIdentifier, items) 965 return self.pubsubService.notifyPublish(
966 self.serviceJID,
967 node.nodeIdentifier,
968 notifications_filtered)
969
970 d = self._prepareNotify(items_data, node, data.get('subscription'))
971 d.addCallback(afterPrepare)
972 return d
973
def _notifyRetract(self, data):
    """Send retract notifications for the given items

    @param data(dict): must contain the keys 'items_data', 'node', 'pep'
        and 'recipient'; 'subscription' is optional
    @return: deferred of _prepareNotify, chained with the notification
    """
    items_data = data['items_data']
    node = data['node']
    pep = data['pep']
    recipient = data['recipient']

    def afterPrepare(result):
        owner_jid, notifications_filtered = result

        # the owner is always notified, with every retracted item
        owner_subscription = Subscription(node.nodeIdentifier,
                                          owner_jid,
                                          'subscribed')
        retracted = [item for item, _, _ in items_data]
        notifications_filtered.append((owner_jid,
                                       set([owner_subscription]),
                                       retracted))

        if pep:
            notifier = self.backend.privilege.notifyRetract
            origin = recipient
        else:
            notifier = self.pubsubService.notifyRetract
            origin = self.serviceJID
        return notifier(origin, node.nodeIdentifier, notifications_filtered)

    prepared = self._prepareNotify(items_data, node, data.get('subscription'))
    prepared.addCallback(afterPrepare)
    return prepared
1005
1006
def _prepareNotify(self, items_data, node, subscription=None):
    """Do a bunch of permissions check and filter notifications

    The owner is not added to these notifications,
    he must be added by the calling method
    @param items_data(iterable): tuples which must contain:
        - item (domish.Element)
        - access_model (unicode)
        - access_list (dict as returned getItemsById, or item_config)
    @param node(LeafNode): node hosting the items
    @param subscription(pubsub.Subscription, None): when set, only the
        subscriber of this subscription is considered; when None, the
        notifications are requested from the backend
    @return (defer.Deferred): fires with a tuple which will contain:
        - owner_jid
        - notifications_filtered: list of
          (subscriber, subscriptions, allowed_items) tuples
    @raise NotImplementedError: an item uses an unknown access model
    """

    def filterNotifications(result):
        """Check access of subscriber for each item, and keep only allowed ones"""
        notifications, (owner_jid, roster) = result

        #we filter items not allowed for the subscribers
        notifications_filtered = []

        for subscriber, subscriptions, _items_data in notifications:
            if subscriber == owner_jid:
                # as notification is always sent to owner,
                # we ignore owner if he is here
                continue
            allowed_items = [] #we keep only item which subscriber can access

            for item, access_model, access_list in _items_data:
                if access_model == const.VAL_AMODEL_OPEN:
                    allowed_items.append(item)
                elif access_model == const.VAL_AMODEL_ROSTER:
                    _subscriber = subscriber.userhostJID()
                    if _subscriber not in roster:
                        continue
                    #the subscriber is known, is he in the right group ?
                    authorized_groups = access_list[const.OPT_ROSTER_GROUPS_ALLOWED]
                    if roster[_subscriber].groups.intersection(authorized_groups):
                        allowed_items.append(item)

                else: #unknown access_model
                    raise NotImplementedError

            if allowed_items:
                notifications_filtered.append((subscriber, subscriptions, allowed_items))
        return (owner_jid, notifications_filtered)

    if subscription is None:
        d1 = self.backend.getNotifications(node.nodeDbId, items_data)
    else:
        d1 = defer.succeed([(subscription.subscriber, [subscription],
                             items_data)])

    def _got_owner(owner_jid):
        #return a tuple with owner_jid and roster
        def rosterEb(failure):
            log.msg("Error while getting roster: {}".format(failure.value))
            # only the roster is replaced here: the callback added below
            # pairs it with owner_jid, returning the pair from this errback
            # would nest it as (owner_jid, (owner_jid, {}))
            return {}

        d = self.backend.privilege.getRoster(owner_jid)
        d.addErrback(rosterEb)
        d.addCallback(lambda roster: (owner_jid, roster))
        return d

    d2 = node.getNodeOwner()
    d2.addCallback(_got_owner)
    d = defer.gatherResults([d1, d2])
    d.addCallback(filterNotifications)
    return d
950 def _preDelete(self, data): 1081
1082 def _preDelete(self, data, pep, recipient):
951 nodeIdentifier = data['node'].nodeIdentifier 1083 nodeIdentifier = data['node'].nodeIdentifier
952 redirectURI = data.get('redirectURI', None) 1084 redirectURI = data.get('redirectURI', None)
953 d = self.backend.getSubscribers(nodeIdentifier) 1085 d = self.backend.getSubscribers(nodeIdentifier, pep, recipient)
954 d.addCallback(lambda subscribers: self.pubsubService.notifyDelete( 1086 d.addCallback(lambda subscribers: self.pubsubService.notifyDelete(
955 self.serviceJID, 1087 self.serviceJID,
956 nodeIdentifier, 1088 nodeIdentifier,
957 subscribers, 1089 subscribers,
958 redirectURI)) 1090 redirectURI))
970 else: 1102 else:
971 exc = StanzaError(condition, text=msg) 1103 exc = StanzaError(condition, text=msg)
972 1104
973 raise exc 1105 raise exc
974 1106
975 def getInfo(self, requestor, service, nodeIdentifier): 1107 def getInfo(self, requestor, service, nodeIdentifier, pep=None, recipient=None):
1108 return [] # FIXME: disabled for now, need to manage PEP
976 if not requestor.resource: 1109 if not requestor.resource:
977 # this avoid error when getting a disco request from server during namespace delegation 1110 # this avoid error when getting a disco request from server during namespace delegation
978 return [] 1111 return []
979 info = {} 1112 info = {}
980 1113
998 d.addErrback(trapNotFound) 1131 d.addErrback(trapNotFound)
999 d.addErrback(self._mapErrors) 1132 d.addErrback(self._mapErrors)
1000 return d 1133 return d
1001 1134
1002 1135
def getNodes(self, requestor, service, nodeIdentifier, pep=None):
    """Return the nodes to list for a disco request

    This is currently short-circuited: a deferred firing with an empty
    list is always returned. The statements after the first return are
    unreachable and kept as the implementation to restore.
    @param requestor: not used
    @param service: only read by the unreachable code (service.resource)
    @param nodeIdentifier: not used
    @param pep: only read by the unreachable code (given to backend.getNodes)
    @return (defer.Deferred): fires with an empty list
    """
    return defer.succeed([]) # FIXME: disabled for now, need to manage PEP
    if service.resource:
        return defer.succeed([])
    d = self.backend.getNodes(pep)
    return d.addErrback(self._mapErrors)
1008 1142
1009 1143
def getConfigurationOptions(self):
    """Return the node options declared by the backend (backend.nodeOptions)"""
    backend = self.backend
    return backend.nodeOptions
1012 1146
def _publish_errb(self, failure, request):
    """Errback of publish: create the missing node then publish again

    The node is only created when the failure is error.NodeNotFound and
    the backend supports auto creation; any other failure is passed on.
    @param failure(failure.Failure): failure raised by backend.publish
    @param request: the publish request which failed
    @return: deferred of the creation followed by the new publish attempt,
        or the failure itself when no auto creation is done
    """
    if failure.type == error.NodeNotFound and self.backend.supportsAutoCreate():
        # logged through twisted instead of the print statement,
        # like the other messages of this module
        log.msg("Auto-creating node %s" % (request.nodeIdentifier,))
        pep = self._isPep(request)
        d = self.backend.createNode(request.nodeIdentifier,
                                    request.sender,
                                    pep=pep,
                                    recipient=request.recipient)
        d.addCallback(lambda ignore: self.backend.publish(request.nodeIdentifier,
                                                          request.items,
                                                          request.sender,
                                                          pep,
                                                          request.recipient))
        return d

    return failure
1165
1166 def _isPep(self, request):
1167 try:
1168 return request.delegated
1169 except AttributeError:
1170 return False
1026 1171
def publish(self, request):
    """Publish the items of the request through the backend

    _publish_errb is tried first on failure, then _mapErrors.
    @param request: publish request; nodeIdentifier, items, sender and
        recipient are read from it
    @return: the backend deferred with both errbacks attached
    """
    publishing = self.backend.publish(
        request.nodeIdentifier,
        request.items,
        request.sender,
        self._isPep(request),
        request.recipient)
    publishing.addErrback(self._publish_errb, request)
    return publishing.addErrback(self._mapErrors)
1033 1180
1034 1181
def subscribe(self, request):
    """Forward a subscribe request to the backend

    @param request: subscribe request; nodeIdentifier, subscriber, sender
        and recipient are read from it
    @return: the backend deferred, with _mapErrors as errback
    """
    subscribing = self.backend.subscribe(
        request.nodeIdentifier,
        request.subscriber,
        request.sender,
        self._isPep(request),
        request.recipient)
    return subscribing.addErrback(self._mapErrors)
1040 1189
1041 1190
def unsubscribe(self, request):
    """Forward an unsubscribe request to the backend

    @param request: unsubscribe request; nodeIdentifier, subscriber, sender
        and recipient are read from it
    @return: the backend deferred, with _mapErrors as errback
    """
    unsubscribing = self.backend.unsubscribe(
        request.nodeIdentifier,
        request.subscriber,
        request.sender,
        self._isPep(request),
        request.recipient)
    return unsubscribing.addErrback(self._mapErrors)
1047 1198
1048 1199
def subscriptions(self, request):
    """Ask the backend for the subscriptions of the sender of the request

    @param request: subscriptions request; sender is read from it
    @return: the backend deferred, with _mapErrors as errback
    """
    pending = self.backend.getSubscriptions(
        self._isPep(request),
        request.sender)
    return pending.addErrback(self._mapErrors)
1052 1204
1053 1205
def affiliations(self, request):
    """Ask the backend for the affiliations of the sender of the request

    @param request: affiliations request; sender is read from it
    @return: the backend deferred, with _mapErrors as errback
    """
    pending = self.backend.getAffiliations(
        self._isPep(request),
        request.sender)
    return pending.addErrback(self._mapErrors)
1057 1210
1058 1211
def create(self, request):
    """Forward a node creation request to the backend

    @param request: create request; nodeIdentifier, sender, options and
        recipient are read from it
    @return: the backend deferred, with _mapErrors as errback
    """
    creating = self.backend.createNode(
        request.nodeIdentifier,
        request.sender,
        request.options,
        self._isPep(request),
        request.recipient)
    return creating.addErrback(self._mapErrors)
1063 1218
1064 1219
def default(self, request):
    """Ask the backend for the default configuration of a node type

    @param request: default request; nodeType and sender are read from it
    @return: the backend deferred, with _mapErrors as errback
    """
    # NOTE(review): sender is given here where most other requests give
    # recipient -- confirm against backend.getDefaultConfiguration
    pending = self.backend.getDefaultConfiguration(
        request.nodeType,
        self._isPep(request),
        request.sender)
    return pending.addErrback(self._mapErrors)
1068 1225
1069 1226
def configureGet(self, request):
    """Ask the backend for the configuration of a node

    @param request: configure request; nodeIdentifier and recipient are
        read from it
    @return: the backend deferred, with _mapErrors as errback
    """
    pending = self.backend.getNodeConfiguration(
        request.nodeIdentifier,
        self._isPep(request),
        request.recipient)
    return pending.addErrback(self._mapErrors)
1073 1232
1074 1233
def configureSet(self, request):
    """Forward the new configuration of a node to the backend

    @param request: configure request; nodeIdentifier, options, sender and
        recipient are read from it
    @return: the backend deferred, with _mapErrors as errback
    """
    configuring = self.backend.setNodeConfiguration(
        request.nodeIdentifier,
        request.options,
        request.sender,
        self._isPep(request),
        request.recipient)
    return configuring.addErrback(self._mapErrors)
1080 1241
1081 1242
def items(self, request):
    """Retrieve items of a node through the backend

    Extra data given to the backend:
        - 'rsm': request.rsm, only when const.FLAG_ENABLE_RSM is set and
          the request carries RSM data
        - 'pep': request.delegated, only when the request has this attribute
    @param request: items request; nodeIdentifier, recipient, maxItems and
        itemIdentifiers are read from it
    @return: the backend deferred, with _mapErrors as errback
    """
    extra = {}
    if const.FLAG_ENABLE_RSM and request.rsm is not None:
        extra['rsm'] = request.rsm
    # unlike _isPep, the key is left out when the attribute is missing
    try:
        extra['pep'] = request.delegated
    except AttributeError:
        pass
    fetching = self.backend.getItems(
        request.nodeIdentifier,
        request.recipient,
        request.maxItems,
        request.itemIdentifiers,
        extra)
    return fetching.addErrback(self._mapErrors)
1092 1257
def retract(self, request):
    """Forward an item retraction request to the backend

    @param request: retract request; nodeIdentifier, itemIdentifiers,
        sender, notify and recipient are read from it
    @return: the backend deferred, with _mapErrors as errback
    """
    retracting = self.backend.retractItem(
        request.nodeIdentifier,
        request.itemIdentifiers,
        request.sender,
        request.notify,
        self._isPep(request),
        request.recipient)
    return retracting.addErrback(self._mapErrors)
1098 1266
1099 1267
def purge(self, request):
    """Forward a node purge request to the backend

    @param request: purge request; nodeIdentifier, sender and recipient
        are read from it
    @return: the backend deferred, with _mapErrors as errback
    """
    purging = self.backend.purgeNode(
        request.nodeIdentifier,
        request.sender,
        self._isPep(request),
        request.recipient)
    return purging.addErrback(self._mapErrors)
1104 1274
1105 1275
def delete(self, request):
    """Forward a node deletion request to the backend

    @param request: delete request; nodeIdentifier, sender and recipient
        are read from it
    @return: the backend deferred, with _mapErrors as errback
    """
    deleting = self.backend.deleteNode(
        request.nodeIdentifier,
        request.sender,
        self._isPep(request),
        request.recipient)
    return deleting.addErrback(self._mapErrors)
1110 1282
# Register PubSubResourceFromBackend as the adapter from
# IBackendService to IPubSubResource.
components.registerAdapter(PubSubResourceFromBackend,
                           IBackendService,
                           IPubSubResource)