comparison src/backend.py @ 369:dabee42494ac

config file + cleaning: - SàT Pubsub can now be configured using the same config file as SàT itself (i.e. sat.conf or .sat.conf), in the same locations (/etc, local dir, xdg dir). Its options must be in the "pubsub" section - options on command line override config options - removed tap and http files which are not used anymore - changed directory structure to put source in src, to be coherent with SàT and Libervia - changed options name, db* become db_*, secret become xmpp_pwd - an exception is raised if jid or xmpp_pwd are not configured
author Goffi <goffi@goffi.org>
date Fri, 02 Mar 2018 12:59:38 +0100
parents sat_pubsub/backend.py@618a92080812
children 9a787881b824
comparison
equal deleted inserted replaced
368:618a92080812 369:dabee42494ac
1 #!/usr/bin/python
2 #-*- coding: utf-8 -*-
3 #
4 # Copyright (c) 2012-2018 Jérôme Poisson
5 # Copyright (c) 2013-2016 Adrien Cossa
6 # Copyright (c) 2003-2011 Ralph Meijer
7
8
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU Affero General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU Affero General Public License for more details.
18
19 # You should have received a copy of the GNU Affero General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 # --
22
23 # This program is based on Idavoll (http://idavoll.ik.nu/),
24 # originally written by Ralph Meijer (http://ralphm.net/blog/)
25 # It is sublicensed under AGPL v3 (or any later version) as allowed by the original
26 # license.
27
28 # --
29
30 # Here is a copy of the original license:
31
32 # Copyright (c) 2003-2011 Ralph Meijer
33
34 # Permission is hereby granted, free of charge, to any person obtaining
35 # a copy of this software and associated documentation files (the
36 # "Software"), to deal in the Software without restriction, including
37 # without limitation the rights to use, copy, modify, merge, publish,
38 # distribute, sublicense, and/or sell copies of the Software, and to
39 # permit persons to whom the Software is furnished to do so, subject to
40 # the following conditions:
41
42 # The above copyright notice and this permission notice shall be
43 # included in all copies or substantial portions of the Software.
44
45 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
46 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
47 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
48 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
49 # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
50 # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
51 # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
52
53
54 """
55 Generic publish-subscribe backend.
56
57 This module implements a generic publish-subscribe backend service with
58 business logic as per
59 U{XEP-0060<http://www.xmpp.org/extensions/xep-0060.html>} that interacts with
60 a given storage facility. It also provides an adapter from the XMPP
61 publish-subscribe protocol.
62 """
63
64 import uuid
65
66 from zope.interface import implements
67
68 from twisted.application import service
69 from twisted.python import components, log
70 from twisted.internet import defer, reactor
71 from twisted.words.protocols.jabber.error import StanzaError
72 # from twisted.words.protocols.jabber.jid import JID, InvalidFormat
73 from twisted.words.xish import utility
74
75 from wokkel import disco
76 from wokkel import data_form
77 from wokkel import rsm
78 from wokkel import iwokkel
79 from wokkel import pubsub
80
81 from sat_pubsub import error
82 from sat_pubsub import iidavoll
83 from sat_pubsub import const
84 from sat_pubsub import container
85
86 from copy import deepcopy
87
88
89 def _getAffiliation(node, entity):
90 d = node.getAffiliation(entity)
91 d.addCallback(lambda affiliation: (node, affiliation))
92 return d
93
94
95 class BackendService(service.Service, utility.EventDispatcher):
96 """
97 Generic publish-subscribe backend service.
98
99 @cvar nodeOptions: Node configuration form as a mapping from the field
100 name to a dictionary that holds the field's type, label
101 and possible options to choose from.
102 @type nodeOptions: C{dict}.
103 @cvar defaultConfig: The default node configuration.
104 """
105
106 implements(iidavoll.IBackendService)
107
# Node configuration form: maps each option name to the type, label and
# (for list fields) the available choices of the corresponding form field.
nodeOptions = {
    const.OPT_PERSIST_ITEMS: {
        "type": "boolean",
        "label": "Persist items to storage",
    },
    const.OPT_DELIVER_PAYLOADS: {
        "type": "boolean",
        "label": "Deliver payloads with event notifications",
    },
    const.OPT_SEND_LAST_PUBLISHED_ITEM: {
        "type": "list-single",
        "label": "When to send the last published item",
        "options": {
            "never": "Never",
            "on_sub": "When a new subscription is processed",
        },
    },
    const.OPT_ACCESS_MODEL: {
        "type": "list-single",
        "label": "Who can subscribe to this node",
        "options": {
            const.VAL_AMODEL_OPEN: "Public node",
            const.VAL_AMODEL_PRESENCE: "Node restricted to entites subscribed to owner presence",
            const.VAL_AMODEL_PUBLISHER_ROSTER: "Node restricted to some groups of publisher's roster",
            const.VAL_AMODEL_WHITELIST: "Node restricted to some jids",
        },
    },
    const.OPT_ROSTER_GROUPS_ALLOWED: {
        "type": "list-multi",
        "label": "Groups of the roster allowed to access the node",
    },
    const.OPT_PUBLISH_MODEL: {
        "type": "list-single",
        "label": "Who can publish to this node",
        "options": {
            const.VAL_PMODEL_OPEN: "Everybody can publish",
            const.VAL_PMODEL_PUBLISHERS: "Only owner and publishers can publish",
            const.VAL_PMODEL_SUBSCRIBERS: "Everybody which subscribed to the node",
        },
    },
    const.OPT_SERIAL_IDS: {
        "type": "boolean",
        "label": "Use serial ids",
    },
}
149
# Subscription options form: maps each option name to the type of the form
# field and the choices it offers.
subscriptionOptions = {
    "pubsub#subscription_type": {
        "type": "list-single",
        "options": {
            "items": "Receive notification of new items only",
            "nodes": "Receive notification of new nodes only",
        },
    },
    "pubsub#subscription_depth": {
        "type": "list-single",
        "options": {
            "1": "Receive notification from direct child nodes only",
            "all": "Receive notification from all descendent nodes",
        },
    },
}
164
def __init__(self, storage):
    """Initialise the event dispatcher and keep a reference to the storage.

    @param storage: storage facility used by the other methods of this
        service (through C{self.storage})
    """
    utility.EventDispatcher.__init__(self)
    self._callbackList = []
    self.storage = storage
169
def supportsPublishOptions(self):
    """Tell that publish options are supported (always C{True})."""
    return True
def supportsPublisherAffiliation(self):
    """Tell that the publisher affiliation is supported (always C{True})."""
    return True
174
def supportsGroupBlog(self):
    """Tell that group blog is supported (always C{True})."""
    return True
177
def supportsOutcastAffiliation(self):
    """Tell that the outcast affiliation is supported (always C{True})."""
    return True
180
def supportsPersistentItems(self):
    """Tell that persistent items are supported (always C{True})."""
    return True
183
def supportsPublishModel(self):
    """Tell that the publish model option is supported (always C{True})."""
    return True
186
def getNodeType(self, nodeIdentifier, pep, recipient=None):
    """Retrieve the type of a node.

    @param nodeIdentifier: identifier of the node, given to the storage
    @param pep: given as is to C{self.storage.getNode}
    @param recipient: given as is to C{self.storage.getNode}
    @return: result of C{self.storage.getNode} with a callback added which
        replaces the node by C{node.getType()}
    """
    # FIXME: manage pep and recipient
    def typeOf(node):
        return node.getType()

    d = self.storage.getNode(nodeIdentifier, pep, recipient)
    d.addCallback(typeOf)
    return d
192
193 def _getNodesIds(self, subscribed, pep, recipient):
194 # TODO: filter whitelist nodes
195 # TODO: handle publisher-roster (should probably be renamed to owner-roster for nodes)
196 if not subscribed:
197 allowed_accesses = {'open', 'whitelist'}
198 else:
199 allowed_accesses = {'open', 'presence', 'whitelist'}
200 return self.storage.getNodeIds(pep, recipient, allowed_accesses)
201
def getNodes(self, requestor, pep, recipient):
    """List node ids.

    Without PEP the storage is queried directly. With PEP,
    C{self.privilege.isSubscribedFrom(requestor, recipient)} is requested
    first and its result is handed to L{_getNodesIds}.

    @param requestor: entity doing the request
    @param pep: true for a PEP request
    @param recipient: recipient of the request
    @return: result of the storage query, or the object returned by
        C{isSubscribedFrom} with L{_getNodesIds} added as callback
    """
    if not pep:
        return self.storage.getNodeIds(pep, recipient)

    d = self.privilege.isSubscribedFrom(requestor, recipient)
    d.addCallback(self._getNodesIds, pep, recipient)
    return d
208
def getNodeMetaData(self, nodeIdentifier, pep, recipient=None):
    """Retrieve the meta data of a node as a list of form options.

    @param nodeIdentifier: identifier of the node, given to the storage
    @param pep: given as is to C{self.storage.getNode}
    @param recipient: given as is to C{self.storage.getNode}
    @return: result of C{self.storage.getNode} with callbacks added: the node
        is replaced by C{node.getMetaData()}, itself converted by
        L{_makeMetaData}
    """
    # FIXME: manage pep and recipient
    def metaDataOf(node):
        return node.getMetaData()

    d = self.storage.getNode(nodeIdentifier, pep, recipient)
    d.addCallback(metaDataOf)
    d.addCallback(self._makeMetaData)
    return d
215
216 def _makeMetaData(self, metaData):
217 options = []
218 for key, value in metaData.iteritems():
219 if key in self.nodeOptions:
220 option = {"var": key}
221 option.update(self.nodeOptions[key])
222 option["value"] = value
223 options.append(option)
224
225 return options
226
def _checkAuth(self, node, requestor):
    """Check authorisation of publishing in node for requestor

    @param node: node where the requestor wants to publish
    @param requestor: entity wanting to publish
    @return: deferred from C{node.getAffiliation}; on success the final result
        is the tuple (affiliation, node)
    @raise error.Forbidden: (in the callback chain) the publish model doesn't
        allow the requestor to publish
    @raise ValueError: (in the callback chain) unknown publish model
    """

    def check(affiliation):
        d = defer.succeed((affiliation, node))
        publish_model = node.getConfiguration()[const.OPT_PUBLISH_MODEL]
        privileged = affiliation in ['owner', 'publisher']

        if publish_model == const.VAL_PMODEL_PUBLISHERS:
            if not privileged:
                raise error.Forbidden()
            return d

        if publish_model == const.VAL_PMODEL_SUBSCRIBERS:
            if not privileged:
                # we are in subscribers publish model, we must check that
                # the requestor is a subscriber to allow him to publish

                def checkSubscription(subscribed):
                    if not subscribed:
                        raise error.Forbidden()
                    return (affiliation, node)

                d.addCallback(lambda ignore: node.isSubscribed(requestor))
                d.addCallback(checkSubscription)
            return d

        if publish_model != const.VAL_PMODEL_OPEN:
            # publish_model must be publishers (default), subscribers or open.
            raise ValueError('Unexpected value')

        return d

    affiliation_d = node.getAffiliation(requestor)
    affiliation_d.addCallback(check)
    return affiliation_d
257
def parseItemConfig(self, item):
    """Get and remove item configuration information

    The first C{<x/>} data form child whose form namespace is
    C{const.NS_ITEM_CONFIG} is detached from C{item} and used as item
    configuration.

    @param item (domish.Element): item to parse, modified in place when a
        configuration form is found
    @return (tuple): (access_model, item_config)
        item_config is the parsed data form, or None if the item has none
        access_model is C{const.VAL_AMODEL_DEFAULT} when there is no item
        configuration or when it doesn't set C{const.OPT_ACCESS_MODEL}
    """
    item_config = None
    access_model = const.VAL_AMODEL_DEFAULT
    for elt in item.elements():
        # the namespace is compared with the constant itself: the quoted text
        # 'data_form.NS_X_DATA' can't be the URI of a data form, so the
        # configuration form was never detected
        if elt.uri != data_form.NS_X_DATA or elt.name != 'x':
            continue
        form = data_form.Form.fromElement(elt)
        if form.formNamespace == const.NS_ITEM_CONFIG:
            item_config = form
            # we need to remove the config from item. The element itself is
            # removed (as enforceSchema does): its position in elements() is
            # not its position in item.children when the item has text nodes
            item.children.remove(elt)
            break

    if item_config:
        # NOTE(review): relies on the form offering a dict-like get() - confirm
        # with the wokkel version in use
        access_model = item_config.get(const.OPT_ACCESS_MODEL, const.VAL_AMODEL_DEFAULT)
    return (access_model, item_config)
278
def parseCategories(self, item_elt):
    """Check if item contain an atom entry, and parse categories if possible

    Only the first atom entry of the item is used.

    @param item_elt (domish.Element): item to parse
    @return (list): "term" attribute of each category element of the entry
        (categories without term are skipped), empty list if the item has no
        atom entry
    """
    entry_elt = next(item_elt.elements(const.NS_ATOM, "entry"), None)
    if entry_elt is None:
        return []

    terms = (category_elt.getAttribute('term')
             for category_elt in entry_elt.elements(const.NS_ATOM, 'category'))
    return [term for term in terms if term]
297
def enforceSchema(self, item_elt, schema, affiliation):
    """Modify item according to the node schema, or refuse publishing

    The data form of the item is detached, fields restricted to owner are
    reset to the schema field for non owners, fields unknown to the schema are
    dropped, then the resulting form is attached back to the item.

    @param item_elt(domish.Element): item to check/modify
    @param schema(domish.Element): schema to enforce
    @param affiliation(unicode): affiliation of the publisher
    @raise pubsub.BadRequest: the item has no (valid) data form
    @raise StanzaError: a write restriction other than "owner" is used
    """
    try:
        x_elt = next(item_elt.elements(data_form.NS_X_DATA, 'x'))
        item_form = data_form.Form.fromElement(x_elt)
    except (StopIteration, data_form.Error):
        raise pubsub.BadRequest(text="node has a schema but item has no form")
    else:
        item_elt.children.remove(x_elt)

    schema_form = data_form.Form.fromElement(schema)

    # we enforce restrictions
    for field_elt in schema.elements(data_form.NS_X_DATA, 'field'):
        var = field_elt['var']
        for restrict_elt in field_elt.elements(const.NS_SCHEMA_RESTRICT, 'restrict'):
            write_restriction = restrict_elt.attributes.get('write')
            if write_restriction is None:
                continue
            if write_restriction != 'owner':
                raise StanzaError('feature-not-implemented', text='unknown write restriction {}'.format(write_restriction))
            if affiliation == 'owner':
                continue
            # write is not allowed on this field, we use default value
            # we can safely use Field from schema_form because
            # we have created this instance only for this method
            try:
                item_form.removeField(item_form.fields[var])
            except KeyError:
                pass
            item_form.addField(schema_form.fields[var])

    # we now remove every field which is not in data schema
    unknown_fields = {item_field
                      for item_var, item_field in item_form.fields.iteritems()
                      if item_var not in schema_form.fields}

    for unknown_field in unknown_fields:
        item_form.removeField(unknown_field)
    item_elt.addChild(item_form.toElement())
343
344 def _checkOverwrite(self, node, itemIdentifiers, publisher):
345 """Check that the itemIdentifiers correspond to items published
346 by the current publisher"""
347 def doCheck(item_pub_map):
348 for item_publisher in item_pub_map.itervalues():
349 if item_publisher.userhost() != publisher.userhost():
350 raise error.ItemForbidden()
351
352 d = node.getItemsPublishers(itemIdentifiers)
353 d.addCallback(doCheck)
354 return d
355
def publish(self, nodeIdentifier, items, requestor, pep, recipient):
    """Publish items to a node.

    The node is retrieved from the storage, then L{_checkAuth} and
    L{_doPublish} are chained as callbacks.

    @param nodeIdentifier: identifier of the node
    @param items: items to publish
    @param requestor: entity publishing the items
    @param pep: given as is to the storage and to L{_doPublish}
    @param recipient: given as is to the storage and to L{_doPublish}
    @return: result of C{self.storage.getNode} with the callbacks added
    """
    publish_d = self.storage.getNode(nodeIdentifier, pep, recipient)
    publish_d.addCallback(self._checkAuth, requestor)
    #FIXME: owner and publisher are not necessarly the same. So far we use only owner to get roster.
    #FIXME: in addition, there can be several owners: that is not managed yet
    publish_d.addCallback(self._doPublish, items, requestor, pep, recipient)
    return publish_d
363
@defer.inlineCallbacks
def _doPublish(self, result, items, requestor, pep, recipient):
    """Prepare, store and notify published items.

    @param result (tuple): (affiliation, node) as produced by L{_checkAuth}
    @param items: items to publish, they are modified in place
    @param requestor: entity publishing, used as "publisher" of the items
    @param pep: given as is to L{_doNotify}
    @param recipient: given as is to L{_doNotify}
    @raise error.NoPublishing: the node is a collection
    @raise error.ItemForbidden: items are sent but the node neither persists
        items nor delivers payloads
    @raise error.ItemRequired: no item is sent but the node persists items or
        delivers payloads
    """
    affiliation, node = result
    if node.nodeType == 'collection':
        raise error.NoPublishing()

    configuration = node.getConfiguration()
    persistItems = configuration[const.OPT_PERSIST_ITEMS]
    deliverPayloads = configuration[const.OPT_DELIVER_PAYLOADS]
    keepPayloads = persistItems or deliverPayloads

    if items and not keepPayloads:
        raise error.ItemForbidden()
    if not items and keepPayloads:
        raise error.ItemRequired()

    items_data = []
    check_overwrite = False
    for item in items:
        # we enforce publisher (cf XEP-0060 §7.1.2.3)
        item['publisher'] = requestor.full()
        if persistItems or deliverPayloads:
            item.uri = None
            item.defaultUri = None
            if item.getAttribute("id"):
                # an id chosen by the publisher may target an existing item
                check_overwrite = True
                new_item = False
            else:
                item["id"] = yield node.getNextId()
                new_item = True
        access_model, item_config = self.parseItemConfig(item)
        categories = self.parseCategories(item)
        schema = node.getSchema()
        if schema is not None:
            self.enforceSchema(item, schema, affiliation)
        items_data.append(container.ItemData(item, access_model, item_config, categories, new=new_item))

    if persistItems:

        if check_overwrite and affiliation != 'owner':
            # we don't want a publisher to overwrite the item
            # of an other publisher
            yield self._checkOverwrite(node, [item['id'] for item in items if item.getAttribute('id')], requestor)

        # TODO: check conflict and recalculate max id if serial_ids is set
        yield node.storeItems(items_data, requestor)

    yield self._doNotify(node, items_data, deliverPayloads, pep, recipient)
411
412 def _doNotify(self, node, items_data, deliverPayloads, pep, recipient):
413 if items_data and not deliverPayloads:
414 for item_data in items_data:
415 item_data.item.children = []
416 self.dispatch({'items_data': items_data, 'node': node, 'pep': pep, 'recipient': recipient},
417 '//event/pubsub/notify')
418
def getNotifications(self, node, items_data):
    """Build a list of subscriber to the node

    subscribers will be associated with subscribed items,
    and subscription type.

    @param node: node exposing C{getSubscriptions}
    @param items_data: items data to associate with each subscriber
    @return: result of C{node.getSubscriptions('subscribed')} with a callback
        added, which produces a list of
        (subscriber, set of subscriptions, items_data) tuples
    """

    def toNotifications(subscriptions, items_data):
        subsBySubscriber = {}
        for subscription in subscriptions:
            # subscriptions without explicit type are "items" subscriptions
            sub_type = subscription.options.get('pubsub#subscription_type',
                                                'items')
            if sub_type != 'items':
                continue
            subsBySubscriber.setdefault(subscription.subscriber,
                                        set()).add(subscription)

        return [(subscriber, subscriptions_, items_data)
                for subscriber, subscriptions_
                in subsBySubscriber.iteritems()]

    def rootNotFound(failure):
        # only needed by the root node handling disabled below
        failure.trap(error.NodeNotFound)
        return []

    subscribed_d = node.getSubscriptions('subscribed')
    # FIXME: must add root node subscriptions ?
    # d2 = self.storage.getNode('', False) # FIXME: to check
    # d2.addCallback(lambda node: node.getSubscriptions('subscribed'))
    # d2.addErrback(rootNotFound)
    # d = defer.gatherResults([d1, d2])
    # d.addCallback(lambda result: result[0] + result[1])
    subscribed_d.addCallback(toNotifications, items_data)
    return subscribed_d
454
def registerPublishNotifier(self, observerfn, *args, **kwargs):
    """Register an observer of the '//event/pubsub/notify' event.

    @param observerfn: observer to register
    @param args: extra positional arguments given to C{addObserver}
    @param kwargs: extra keyword arguments given to C{addObserver}
    """
    event = '//event/pubsub/notify'
    self.addObserver(event, observerfn, *args, **kwargs)
457
def registerRetractNotifier(self, observerfn, *args, **kwargs):
    """Register an observer of the '//event/pubsub/retract' event.

    @param observerfn: observer to register
    @param args: extra positional arguments given to C{addObserver}
    @param kwargs: extra keyword arguments given to C{addObserver}
    """
    event = '//event/pubsub/retract'
    self.addObserver(event, observerfn, *args, **kwargs)
460
def subscribe(self, nodeIdentifier, subscriber, requestor, pep, recipient):
    """Subscribe an entity to a node.

    The bare jid of the subscriber must be the bare jid of the requestor.

    @param nodeIdentifier: identifier of the node
    @param subscriber: entity to subscribe
    @param requestor: entity doing the request
    @param pep: given as is to the storage and to L{_doSubscribe}
    @param recipient: given as is to the storage and to L{_doSubscribe}
    @return: deferred failing with error.Forbidden when subscriber and
        requestor don't match, else result of C{self.storage.getNode} with
        the affiliation retrieval and L{_doSubscribe} added as callbacks
    """
    subscriberEntity = subscriber.userhostJID()
    if subscriberEntity != requestor.userhostJID():
        return defer.fail(error.Forbidden())

    subscribe_d = self.storage.getNode(nodeIdentifier, pep, recipient)
    subscribe_d.addCallback(_getAffiliation, subscriberEntity)
    subscribe_d.addCallback(self._doSubscribe, subscriber, pep, recipient)
    return subscribe_d
470
def _doSubscribe(self, result, subscriber, pep, recipient):
    """Check node access then add the subscription.

    @param result (tuple): (node, affiliation) of the subscriber
    @param subscriber: entity to subscribe
    @param pep: given as is to L{_sendLastPublished}
    @param recipient: given as is to L{_sendLastPublished}
    @return: deferred of the access check, with callbacks added which add the
        subscription then retrieve it with C{node.getSubscription}
    @raise error.Forbidden: the subscriber is an outcast
    @raise NotImplementedError: unknown access model
    """
    node, affiliation = result

    if affiliation == 'outcast':
        raise error.Forbidden()

    access_model = node.getAccessModel()

    if access_model == const.VAL_AMODEL_OPEN:
        access_d = defer.succeed(None)
    elif access_model == const.VAL_AMODEL_PRESENCE:
        access_d = self.checkPresenceSubscription(node, subscriber)
    elif access_model == const.VAL_AMODEL_PUBLISHER_ROSTER:
        access_d = self.checkRosterGroups(node, subscriber)
    elif access_model == const.VAL_AMODEL_WHITELIST:
        access_d = self.checkNodeAffiliations(node, subscriber)
    else:
        raise NotImplementedError

    def addSubscription(_):
        return node.addSubscription(subscriber, 'subscribed', {})

    def trapExists(failure):
        # an already existing subscription is not an error, but the last
        # published item is not sent again in this case
        failure.trap(error.SubscriptionExists)
        return False

    def fetchSubscription(sendLast):
        subscription_d = node.getSubscription(subscriber)
        if sendLast:
            subscription_d.addCallback(self._sendLastPublished, node, pep, recipient)
        return subscription_d

    access_d.addCallback(addSubscription)
    access_d.addCallbacks(lambda _: True, trapExists)
    access_d.addCallback(fetchSubscription)

    return access_d
505
506 def _sendLastPublished(self, subscription, node, pep, recipient):
507
508 def notifyItem(items_data):
509 if items_data:
510 reactor.callLater(0, self.dispatch,
511 {'items_data': items_data,
512 'node': node,
513 'pep': pep,
514 'recipient': recipient,
515 'subscription': subscription,
516 },
517 '//event/pubsub/notify')
518
519 config = node.getConfiguration()
520 sendLastPublished = config.get('pubsub#send_last_published_item',
521 'never')
522 if sendLastPublished == 'on_sub' and node.nodeType == 'leaf':
523 entity = subscription.subscriber.userhostJID()
524 d = self.getItemsData(node.nodeIdentifier, entity, recipient, maxItems=1, ext_data={'pep': pep})
525 d.addCallback(notifyItem)
526 d.addErrback(log.err)
527
528 return subscription
529
def unsubscribe(self, nodeIdentifier, subscriber, requestor, pep, recipient):
    """Remove the subscription of an entity to a node.

    The bare jid of the subscriber must be the bare jid of the requestor.

    @param nodeIdentifier: identifier of the node
    @param subscriber: entity to unsubscribe
    @param requestor: entity doing the request
    @param pep: given as is to C{self.storage.getNode}
    @param recipient: given as is to C{self.storage.getNode}
    @return: deferred failing with error.Forbidden when subscriber and
        requestor don't match, else result of C{self.storage.getNode} with
        the subscription removal added as callback
    """
    if subscriber.userhostJID() != requestor.userhostJID():
        return defer.fail(error.Forbidden())

    def removeSubscription(node):
        return node.removeSubscription(subscriber)

    unsubscribe_d = self.storage.getNode(nodeIdentifier, pep, recipient)
    unsubscribe_d.addCallback(removeSubscription)
    return unsubscribe_d
537
def getSubscriptions(self, requestor, nodeIdentifier, pep, recipient):
    """Retrieve subscriptions of an entity

    The request is delegated to the storage.

    @param requestor(jid.JID): entity who want to check subscriptions
    @param nodeIdentifier(unicode, None): identifier of the node
        None to get all subscriptions of a service
    @param pep(bool): True if it's a PEP request
    @param recipient(jid.JID, None): recipient of the PEP request
    @return: result of C{self.storage.getSubscriptions}
    """
    storage = self.storage
    return storage.getSubscriptions(requestor, nodeIdentifier, pep, recipient)
548
def supportsAutoCreate(self):
    """Tell that automatic node creation is supported (always C{True})."""
    return True
551
def supportsCreatorCheck(self):
    """Tell that the creator check is supported (always C{True})."""
    return True
554
def supportsInstantNodes(self):
    """Tell that instant nodes are supported (always C{True})."""
    return True
557
def createNode(self, nodeIdentifier, requestor, options = None, pep=False, recipient=None):
    """Create a leaf node.

    @param nodeIdentifier: identifier of the node to create
        when empty, a 'generic/<uuid4>' identifier is generated
    @param requestor: entity creating the node
    @param options (dict, None): options overriding the default configuration
        of the storage
    @param pep: given as is to C{self.storage.createNode}
    @param recipient: given as is to C{self.storage.createNode}
    @return: result of C{self.storage.createNode} with a callback added which
        replaces the result by the identifier of the node
    """
    nodeIdentifier = nodeIdentifier or 'generic/%s' % uuid.uuid4()
    options = options or {}

    # if self.supportsCreatorCheck():
    #     groupblog = nodeIdentifier.startswith(const.NS_GROUPBLOG_PREFIX)
    #     try:
    #         nodeIdentifierJID = JID(nodeIdentifier[len(const.NS_GROUPBLOG_PREFIX):] if groupblog else nodeIdentifier)
    #     except InvalidFormat:
    #         is_user_jid = False
    #     else:
    #         is_user_jid = bool(nodeIdentifierJID.user)

    #     if is_user_jid and nodeIdentifierJID.userhostJID() != requestor.userhostJID():
    #         #we have an user jid node, but not created by the owner of this jid
    #         print "Wrong creator"
    #         raise error.Forbidden()

    nodeType = 'leaf'
    config = self.storage.getDefaultConfiguration(nodeType)
    config['pubsub#node_type'] = nodeType
    config.update(options)

    def createdIdentifier(_):
        return nodeIdentifier

    # TODO: handle schema on creation
    create_d = self.storage.createNode(nodeIdentifier, requestor, config, None, pep, recipient)
    create_d.addCallback(createdIdentifier)
    return create_d
588
def getDefaultConfiguration(self, nodeType):
    """Retrieve the default configuration of a node type.

    @param nodeType: type of node, given to the storage
    @return: deferred already fired with the result of
        C{self.storage.getDefaultConfiguration}
    """
    default_config = self.storage.getDefaultConfiguration(nodeType)
    return defer.succeed(default_config)
592
def getNodeConfiguration(self, nodeIdentifier, pep, recipient):
    """Retrieve the configuration of a node.

    @param nodeIdentifier: identifier of the node
    @param pep: given as is to C{self.storage.getNode}
    @param recipient: given as is to C{self.storage.getNode}
    @return: deferred failing with error.NoRootNode when nodeIdentifier is
        empty, else result of C{self.storage.getNode} with a callback added
        which replaces the node by C{node.getConfiguration()}
    """
    if not nodeIdentifier:
        return defer.fail(error.NoRootNode())

    def configurationOf(node):
        return node.getConfiguration()

    config_d = self.storage.getNode(nodeIdentifier, pep, recipient)
    config_d.addCallback(configurationOf)
    return config_d
601
def setNodeConfiguration(self, nodeIdentifier, options, requestor, pep, recipient):
    """Change the configuration of a node.

    @param nodeIdentifier: identifier of the node
    @param options: new options, given to L{_doSetNodeConfiguration}
    @param requestor: entity doing the request, its affiliation is checked
    @param pep: given as is to C{self.storage.getNode}
    @param recipient: given as is to C{self.storage.getNode}
    @return: deferred failing with error.NoRootNode when nodeIdentifier is
        empty, else result of C{self.storage.getNode} with the affiliation
        retrieval and L{_doSetNodeConfiguration} added as callbacks
    """
    if not nodeIdentifier:
        return defer.fail(error.NoRootNode())

    config_d = self.storage.getNode(nodeIdentifier, pep, recipient)
    config_d.addCallback(_getAffiliation, requestor)
    config_d.addCallback(self._doSetNodeConfiguration, options)
    return config_d
610
611 def _doSetNodeConfiguration(self, result, options):
612 node, affiliation = result
613
614 if affiliation != 'owner':
615 raise error.Forbidden()
616
617 return node.setConfiguration(options)
618
def getNodeSchema(self, nodeIdentifier, pep, recipient):
    """Retrieve the schema of a node.

    @param nodeIdentifier: identifier of the node
    @param pep: given as is to C{self.storage.getNode}
    @param recipient: given as is to C{self.storage.getNode}
    @return: deferred failing with error.NoRootNode when nodeIdentifier is
        empty, else result of C{self.storage.getNode} with a callback added
        which replaces the node by C{node.getSchema()}
    """
    if not nodeIdentifier:
        return defer.fail(error.NoRootNode())

    def schemaOf(node):
        return node.getSchema()

    schema_d = self.storage.getNode(nodeIdentifier, pep, recipient)
    schema_d.addCallback(schemaOf)
    return schema_d
627
def setNodeSchema(self, nodeIdentifier, schema, requestor, pep, recipient):
    """Set or remove schema of a node

    @param nodeIdentifier(unicode): identifier of the pubsub node
    @param schema(domish.Element, None): schema to set
        None to remove schema
    @param requestor(jid.JID): entity doing the request
    @param pep(bool): True if it's a PEP request
    @param recipient(jid.JID, None): recipient of the PEP request
    @return: deferred failing with error.NoRootNode when nodeIdentifier is
        empty, else result of C{self.storage.getNode} with the affiliation
        retrieval and L{_doSetNodeSchema} added as callbacks
    """
    if not nodeIdentifier:
        return defer.fail(error.NoRootNode())

    schema_d = self.storage.getNode(nodeIdentifier, pep, recipient)
    schema_d.addCallback(_getAffiliation, requestor)
    schema_d.addCallback(self._doSetNodeSchema, schema)
    return schema_d
645
646 def _doSetNodeSchema(self, result, schema):
647 node, affiliation = result
648
649 if affiliation != 'owner':
650 raise error.Forbidden()
651
652 return node.setSchema(schema)
653
def getAffiliations(self, entity, nodeIdentifier, pep, recipient):
    """Retrieve affiliations of an entity, delegating to the storage.

    @param entity: entity whose affiliations are requested
    @param nodeIdentifier: identifier of the node
    @param pep: given as is to the storage
    @param recipient: given as is to the storage
    @return: result of C{self.storage.getAffiliations}
    """
    storage = self.storage
    return storage.getAffiliations(entity, nodeIdentifier, pep, recipient)
656
def getAffiliationsOwner(self, nodeIdentifier, requestor, pep, recipient):
    """Retrieve all affiliations of a node, on behalf of an owner.

    @param nodeIdentifier: identifier of the node
    @param requestor: entity doing the request, its affiliation is checked
    @param pep: given as is to C{self.storage.getNode}
    @param recipient: given as is to C{self.storage.getNode}
    @return: result of C{self.storage.getNode} with the affiliation retrieval
        and L{_doGetAffiliationsOwner} added as callbacks
    """
    affiliations_d = self.storage.getNode(nodeIdentifier, pep, recipient)
    affiliations_d.addCallback(_getAffiliation, requestor)
    affiliations_d.addCallback(self._doGetAffiliationsOwner)
    return affiliations_d
662
663 def _doGetAffiliationsOwner(self, result):
664 node, affiliation = result
665
666 if affiliation != 'owner':
667 raise error.Forbidden()
668 return node.getAffiliations()
669
def setAffiliationsOwner(self, nodeIdentifier, requestor, affiliations, pep, recipient):
    """Change affiliations of a node, on behalf of an owner.

    @param nodeIdentifier: identifier of the node
    @param requestor: entity doing the request, its affiliation is checked
    @param affiliations: new affiliations, given to
        L{_doSetAffiliationsOwner}
    @param pep: given as is to C{self.storage.getNode}
    @param recipient: given as is to C{self.storage.getNode}
    @return: result of C{self.storage.getNode} with the affiliation retrieval
        and L{_doSetAffiliationsOwner} added as callbacks
    """
    affiliations_d = self.storage.getNode(nodeIdentifier, pep, recipient)
    affiliations_d.addCallback(_getAffiliation, requestor)
    affiliations_d.addCallback(self._doSetAffiliationsOwner, requestor, affiliations)
    return affiliations_d
675
676 def _doSetAffiliationsOwner(self, result, requestor, affiliations):
677 # Check that requestor is allowed to set affiliations, and delete entities
678 # with "none" affiliation
679
680 # TODO: return error with failed affiliations in case of failure
681 node, requestor_affiliation = result
682
683 if requestor_affiliation != 'owner':
684 raise error.Forbidden()
685
686 # we don't allow requestor to change its own affiliation
687 requestor_bare = requestor.userhostJID()
688 if requestor_bare in affiliations and affiliations[requestor_bare] != 'owner':
689 # FIXME: it may be interesting to allow the owner to ask for ownership removal
690 # if at least one other entity is owner for this node
691 raise error.Forbidden("You can't change your own affiliation")
692
693 to_delete = [jid_ for jid_, affiliation in affiliations.iteritems() if affiliation == 'none']
694 for jid_ in to_delete:
695 del affiliations[jid_]
696
697 if to_delete:
698 d = node.deleteAffiliations(to_delete)
699 if affiliations:
700 d.addCallback(lambda dummy: node.setAffiliations(affiliations))
701 else:
702 d = node.setAffiliations(affiliations)
703
704 return d
705
def getSubscriptionsOwner(self, nodeIdentifier, requestor, pep, recipient):
    """Retrieve all subscriptions of a node, on behalf of an owner.

    @param nodeIdentifier: identifier of the node
    @param requestor: entity doing the request, its affiliation is checked
    @param pep: given as is to C{self.storage.getNode}
    @param recipient: given as is to C{self.storage.getNode}
    @return: result of C{self.storage.getNode} with the affiliation retrieval
        and L{_doGetSubscriptionsOwner} added as callbacks
    """
    subscriptions_d = self.storage.getNode(nodeIdentifier, pep, recipient)
    subscriptions_d.addCallback(_getAffiliation, requestor)
    subscriptions_d.addCallback(self._doGetSubscriptionsOwner)
    return subscriptions_d
711
712 def _doGetSubscriptionsOwner(self, result):
713 node, affiliation = result
714
715 if affiliation != 'owner':
716 raise error.Forbidden()
717 return node.getSubscriptions()
718
def setSubscriptionsOwner(self, nodeIdentifier, requestor, subscriptions, pep, recipient):
    """Change subscriptions of a node, on behalf of an owner.

    @param nodeIdentifier: identifier of the node
    @param requestor: entity doing the request, its affiliation is checked
    @param subscriptions: new subscriptions, given to
        L{_doSetSubscriptionsOwner}
    @param pep: given as is to C{self.storage.getNode}
    @param recipient: given as is to C{self.storage.getNode}
    @return: result of C{self.storage.getNode} with the affiliation retrieval
        and L{_doSetSubscriptionsOwner} added as callbacks
    """
    subscriptions_d = self.storage.getNode(nodeIdentifier, pep, recipient)
    subscriptions_d.addCallback(_getAffiliation, requestor)
    subscriptions_d.addCallback(self._doSetSubscriptionsOwner, requestor, subscriptions)
    return subscriptions_d
724
def unwrapFirstError(self, failure):
    """Replace a C{defer.FirstError} failure by the failure it wraps.

    @param failure: failure to unwrap, any failure which is not a
        C{defer.FirstError} is re-raised by C{trap}
    @return: the C{subFailure} of the wrapped error
    """
    failure.trap(defer.FirstError)
    first_error = failure.value
    return first_error.subFailure
728
def _doSetSubscriptionsOwner(self, result, requestor, subscriptions):
    """Apply new subscriptions to a node if the requestor owns it.

    Subscriptions with "none" state are removed from C{subscriptions} (which
    is modified in place) and from the node, other ones are set.

    @param result (tuple): (node, affiliation of the requestor)
    @param requestor: entity doing the request
    @param subscriptions: subscriptions to apply (must provide C{copy} and
        C{remove})
    @return: deferred gathering all the operations, with None as result
        in case of failure, the first error is unwrapped
    @raise error.Forbidden: the requestor is not owner of the node
    """
    # TODO: return error with failed subscriptions in case of failure
    node, requestor_affiliation = result

    if requestor_affiliation != 'owner':
        raise error.Forbidden()

    # the copy is used because subscriptions is modified below
    cancelled = [subscription for subscription in subscriptions.copy()
                 if subscription.state == 'none']

    d_list = []
    for subscription in cancelled:
        subscriptions.remove(subscription)
        d_list.append(node.removeSubscription(subscription.subscriber))

    if subscriptions:
        d_list.append(node.setSubscriptions(subscriptions))

    gathered_d = defer.gatherResults(d_list, consumeErrors=True)
    gathered_d.addCallback(lambda _: None)
    gathered_d.addErrback(self.unwrapFirstError)
    return gathered_d
753
def filterItemsWithSchema(self, items_data, schema, owner):
    """Check schema restriction and remove fields/items if they don't comply

    Fields with a read restriction to owner are removed from the form of each
    item when the requestor is not an owner. Items without data form are
    removed from C{items_data} and a warning is logged.

    @param items_data(list[ItemData]): items to filter
        items in this list will be modified
    @param schema(domish.Element): node schema
    @param owner(bool): True if requestor is an owner of the node
    @raise StanzaError: a read restriction other than "owner" is used
    """
    fields_to_remove = set()
    for field_elt in schema.elements(data_form.NS_X_DATA, 'field'):
        for restrict_elt in field_elt.elements(const.NS_SCHEMA_RESTRICT, 'restrict'):
            read_restriction = restrict_elt.attributes.get('read')
            if read_restriction is not None:
                if read_restriction == 'owner':
                    if not owner:
                        fields_to_remove.add(field_elt['var'])
                else:
                    raise StanzaError('feature-not-implemented', text='unknown read restriction {}'.format(read_restriction))

    items_to_remove = []
    for item_data in items_data:
        item_elt = item_data.item
        try:
            x_elt = next(item_elt.elements(data_form.NS_X_DATA, 'x'))
        except StopIteration:
            # the id placeholder was never filled in, so the log could not tell
            # which item was ignored; repr is used so that any id can be logged
            log.msg("WARNING, item {id!r} has a schema but no form, ignoring it".format(
                id=item_elt.getAttribute('id')))
            items_to_remove.append(item_data)
            continue
        form = data_form.Form.fromElement(x_elt)
        # we remove fields which are not visible for this user
        for field in fields_to_remove:
            try:
                form.removeField(form.fields[field])
            except KeyError:
                continue
        item_elt.children.remove(x_elt)
        item_elt.addChild(form.toElement())

    # removal is done afterwards to avoid modifying the list while iterating it
    for item_data in items_to_remove:
        items_data.remove(item_data)
793
def checkPresenceSubscription(self, node, requestor):
    """Check if requestor has presence subscription from node owner

    @param node(Node): node to check
    @param requestor(jid.JID): entity who want to access node
    @return: result of L{getOwnerRoster} with the check added as callback
    @raise error.Forbidden: (in the callback) the roster is not available,
        the requestor is not in it, or has no presence subscription from the
        owner
    """
    def gotRoster(roster):
        allowed = (roster is not None
                   and requestor in roster
                   and roster[requestor].subscriptionFrom)
        if not allowed:
            raise error.Forbidden()

    roster_d = self.getOwnerRoster(node)
    roster_d.addCallback(gotRoster)
    return roster_d
813
@defer.inlineCallbacks
def checkRosterGroups(self, node, requestor):
    """Check if requestor is in allowed groups of a node

    @param node(Node): node to check
    @param requestor(jid.JID): entity who want to access node
    @raise error.Forbidden: the roster of the owner is not available, the
        requestor is not in it, or shares no group with the authorized ones
    """
    roster = yield self.getOwnerRoster(node)

    if roster is None or requestor not in roster:
        raise error.Forbidden()

    authorized_groups = yield node.getAuthorizedGroups()

    common_groups = roster[requestor].groups.intersection(authorized_groups)
    if not common_groups:
        # requestor is in roster but not in one of the allowed groups
        raise error.Forbidden()
834
def checkNodeAffiliations(self, node, requestor):
    """Check if requestor is in white list of a node

    @param node(Node): node to check
    @param requestor(jid.JID): entity who want to access node
    @return: result of C{node.getAffiliations} with the check added as
        callback
    @raise error.Forbidden: (in the callback) the bare jid of the requestor
        has no affiliation, or is not owner, publisher or member
    """
    def gotAffiliations(affiliations):
        try:
            affiliation = affiliations[requestor.userhostJID()]
        except KeyError:
            # an entity without affiliation is handled like a refused one
            affiliation = None
        if affiliation not in ('owner', 'publisher', 'member'):
            raise error.Forbidden()

    affiliations_d = node.getAffiliations()
    affiliations_d.addCallback(gotAffiliations)
    return affiliations_d
853
@defer.inlineCallbacks
def checkNodeAccess(self, node, requestor):
    """Check if a requestor can access data of a node

    @param node(Node): node to check
    @param requestor(jid.JID): entity who want to access node
    @return (tuple): permissions data with:
        - affiliation: affiliation of the requestor to the node
        - owner(bool): True if requestor is owner of the node
        - roster(None): always None in this method
        - access_model(str): access model of the node
    @raise error.Forbidden: access is not granted
    @raise error.NotLeafNodeError: this node is not a leaf
    @raise Exception: the access model of the node is unknown
    """
    affiliation = yield node.getAffiliation(requestor)

    if not iidavoll.ILeafNode.providedBy(node):
        raise error.NotLeafNodeError()

    if affiliation == 'outcast':
        raise error.Forbidden()

    # node access check
    owner = affiliation == 'owner'
    access_model = node.getAccessModel()
    roster = None

    # owners and open nodes don't need any further check
    if not owner and access_model != const.VAL_AMODEL_OPEN:
        if access_model == const.VAL_AMODEL_PRESENCE:
            yield self.checkPresenceSubscription(node, requestor)
        elif access_model == const.VAL_AMODEL_PUBLISHER_ROSTER:
            # FIXME: for node, access should be renamed owner-roster, not publisher
            yield self.checkRosterGroups(node, requestor)
        elif access_model == const.VAL_AMODEL_WHITELIST:
            yield self.checkNodeAffiliations(node, requestor)
        else:
            raise Exception(u"Unknown access_model")

    defer.returnValue((affiliation, owner, roster, access_model))
894
@defer.inlineCallbacks
def getItemsIds(self, nodeIdentifier, requestor, authorized_groups, unrestricted, maxItems=None, ext_data=None, pep=False, recipient=None):
    """Retrieve the ids of the items of a node, after checking node access.

    The arguments authorized_groups, unrestricted, maxItems and ext_data are
    forwarded unchanged to node.getItemsIds.

    @return (Deferred): fires with the value given by node.getItemsIds
    """
    # FIXME: items access model are not checked
    # TODO: check items access model
    node = yield self.storage.getNode(nodeIdentifier, pep, recipient)
    # called only for the check: it raises if access is refused
    yield self.checkNodeAccess(node, requestor)
    items_ids = yield node.getItemsIds(
        authorized_groups, unrestricted, maxItems, ext_data)
    defer.returnValue(items_ids)
906
def getItems(self, nodeIdentifier, requestor, recipient, maxItems=None,
             itemIdentifiers=None, ext_data=None):
    """Like getItemsData, but keep only the ``item`` attribute of each result.

    @return (Deferred): deferred returned by getItemsData, with the
        extraction attached as callback
    """
    def extractItems(items_data):
        return [item_data.item for item_data in items_data]

    items_d = self.getItemsData(
        nodeIdentifier, requestor, recipient, maxItems, itemIdentifiers, ext_data)
    items_d.addCallback(extractItems)
    return items_d
912
@defer.inlineCallbacks
def getOwnerRoster(self, node, owners=None):
    """Retrieve the roster of the single owner of a node.

    @param node(Node): node whose owner roster is wanted
    @param owners(list, None): owners of the node
        None to retrieve them with node.getOwners
    @return (Deferred): fires with the roster given by self.privilege.getRoster,
        or None if the node doesn't have exactly one owner or if the roster
        retrieval failed (the reason is logged in both cases)
    """
    # FIXME: roster of publisher, not owner, must be used
    if owners is None:
        owners = yield node.getOwners()

    if len(owners) != 1:
        log.msg('publisher-roster access is not allowed with more than 1 owner')
        defer.returnValue(None)

    owner_jid = owners[0]

    try:
        owner_roster = yield self.privilege.getRoster(owner_jid)
    except Exception as e:
        message = "Error while getting roster of {owner_jid}: {msg}".format(
            owner_jid=owner_jid.full(),
            msg=e)
        log.msg(message)
        owner_roster = None
    defer.returnValue(owner_roster)
933
@defer.inlineCallbacks
def getItemsData(self, nodeIdentifier, requestor, recipient, maxItems=None,
                 itemIdentifiers=None, ext_data=None):
    """Like getItems but return the whole ItemData.

    @param nodeIdentifier: identifier of the node to get items from
    @param requestor(jid.JID): entity requesting the items
    @param recipient: forwarded to self.storage.getNode
    @param maxItems(int, None): forwarded to node.getItems
        0 short-circuits the retrieval: an empty list is returned
    @param itemIdentifiers(list, None): if not empty, only these items are
        retrieved (with node.getItemsById) and maxItems is not used
    @param ext_data(dict, None): extra data; the 'pep' key is used to get the
        node, the 'rsm' key is used by _items_rsm
    @return (Deferred): fires with the list of items data
        an empty list is returned if the node is not a leaf
    @raise error.Forbidden: access is not granted (from checkNodeAccess)
    @raise NotImplementedError: an item uses whitelist access model
    @raise error.BadAccessTypeError: an item uses an unknown access model
    """
    if maxItems == 0:
        log.msg("WARNING: maxItems=0 on items retrieval")
        defer.returnValue([])

    if ext_data is None:
        ext_data = {}
    node = yield self.storage.getNode(nodeIdentifier, ext_data.get('pep', False), recipient)
    try:
        _affiliation, owner, roster, _access_model = yield self.checkNodeAccess(node, requestor)
    except error.NotLeafNodeError:
        defer.returnValue([])

    # at this point node access is checked

    if owner:
        # requestor_groups is only used in restricted access
        requestor_groups = None
    else:
        if roster is None:
            # FIXME: publisher roster should be used, not owner
            roster = yield self.getOwnerRoster(node)
            if roster is None:
                roster = {}
        roster_item = roster.get(requestor.userhostJID())
        requestor_groups = tuple(roster_item.groups) if roster_item else tuple()

    if itemIdentifiers:
        items_data = yield node.getItemsById(requestor_groups, owner, itemIdentifiers)
    else:
        items_data = yield node.getItems(requestor_groups, owner, maxItems, ext_data)

    if owner:
        # Add item config data form to items with roster access model
        for item_data in items_data:
            # every branch must test the access model of the *item*: the
            # access model of the node has already been checked above
            item_access_model = item_data.access_model
            if item_access_model == const.VAL_AMODEL_OPEN:
                pass
            elif item_access_model == const.VAL_AMODEL_PUBLISHER_ROSTER:
                form = data_form.Form('submit', formNamespace=const.NS_ITEM_CONFIG)
                access = data_form.Field(None, const.OPT_ACCESS_MODEL, value=const.VAL_AMODEL_PUBLISHER_ROSTER)
                allowed = data_form.Field(None, const.OPT_ROSTER_GROUPS_ALLOWED, values=item_data.config[const.OPT_ROSTER_GROUPS_ALLOWED])
                form.addField(access)
                form.addField(allowed)
                item_data.item.addChild(form.toElement())
            elif item_access_model == const.VAL_AMODEL_WHITELIST:
                # FIXME: whitelist is not handled for items yet
                raise NotImplementedError
            else:
                raise error.BadAccessTypeError(item_access_model)

    schema = node.getSchema()
    if schema is not None:
        self.filterItemsWithSchema(items_data, schema, owner)

    # with RSM, this appends the RSM response element to items_data
    yield self._items_rsm(items_data, node, requestor_groups, owner, itemIdentifiers, ext_data)
    defer.returnValue(items_data)
993
994 def _setCount(self, value, response):
995 response.count = value
996
997 def _setIndex(self, value, response, adjust):
998 """Set index in RSM response
999
1000 @param value(int): value of the reference index (i.e. before or after item)
1001 @param response(RSMResponse): response instance to fill
1002 @param adjust(int): adjustement term (i.e. difference between reference index and first item of the result)
1003 """
1004 response.index = value + adjust
1005
1006 def _items_rsm(self, items_data, node, authorized_groups, owner,
1007 itemIdentifiers, ext_data):
1008 # FIXME: move this to a separate module
1009 # TODO: Index can be optimized by keeping a cache of the last RSM request
1010 # An other optimisation would be to look for index first and use it as offset
1011 try:
1012 rsm_request = ext_data['rsm']
1013 except KeyError:
1014 # No RSM in this request, nothing to do
1015 return items_data
1016
1017 if itemIdentifiers:
1018 log.msg("WARNING, itemIdentifiers used with RSM, ignoring the RSM part")
1019 return items_data
1020
1021 response = rsm.RSMResponse()
1022
1023 d_count = node.getItemsCount(authorized_groups, owner, ext_data)
1024 d_count.addCallback(self._setCount, response)
1025 d_list = [d_count]
1026
1027 if items_data:
1028 response.first = items_data[0].item['id']
1029 response.last = items_data[-1].item['id']
1030
1031 # index handling
1032 if rsm_request.index is not None:
1033 response.index = rsm_request.index
1034 elif rsm_request.before:
1035 # The last page case (before == '') is managed in render method
1036 d_index = node.getItemsIndex(rsm_request.before, authorized_groups, owner, ext_data)
1037 d_index.addCallback(self._setIndex, response, -len(items_data))
1038 d_list.append(d_index)
1039 elif rsm_request.after is not None:
1040 d_index = node.getItemsIndex(rsm_request.after, authorized_groups, owner, ext_data)
1041 d_index.addCallback(self._setIndex, response, 1)
1042 d_list.append(d_index)
1043 else:
1044 # the first page was requested
1045 response.index = 0
1046
1047 def render(result):
1048 if rsm_request.before == '':
1049 # the last page was requested
1050 response.index = response.count - len(items_data)
1051 items_data.append(container.ItemData(response.toElement()))
1052 return items_data
1053
1054 return defer.DeferredList(d_list).addCallback(render)
1055
def retractItem(self, nodeIdentifier, itemIdentifiers, requestor, notify, pep, recipient):
    """Retract items from a node.

    The node is retrieved from storage, the affiliation of the requestor is
    resolved, then the work is done by _doRetract.

    @return (Deferred): the deferred chain described above
    """
    retract_d = self.storage.getNode(nodeIdentifier, pep, recipient)
    for callback, args in (
            (_getAffiliation, (requestor,)),
            (self._doRetract, (itemIdentifiers, requestor, notify, pep, recipient))):
        retract_d.addCallback(callback, *args)
    return retract_d
1061
def _doRetract(self, result, itemIdentifiers, requestor, notify, pep, recipient):
    """Check permissions then remove items from a node.

    @param result(tuple): (node, affiliation of the requestor)
    @param itemIdentifiers: identifiers of the items to retract
    @param requestor(jid.JID): entity asking for the retraction
    @param notify: if true, _doNotifyRetraction is added to the chain
    @return (Deferred): the retraction deferred chain
    @raise error.NodeNotPersistent: the node doesn't persist items
    @raise error.Forbidden: requestor is neither owner nor publisher of the
        node, and is not the publisher of all items to retract
    """
    node, affiliation = result

    if not node.getConfiguration()[const.OPT_PERSIST_ITEMS]:
        raise error.NodeNotPersistent()

    # we need to get the items before removing them, for the notifications

    def keepRemoved(removed, items_data):
        """Keep only actually removed items in items_data"""
        return [item_data for item_data in items_data if item_data.item["id"] in removed]

    def removeItems(items_data):
        """Remove the items and keep only actually removed ones in items_data"""
        removal_d = node.removeItems(itemIdentifiers)
        removal_d.addCallback(keepRemoved, items_data)
        return removal_d

    def checkPublishers(publishers_map):
        """Called when requestor is neither owner neither publisher of the Node

        We check that requestor is publisher of all the items he wants to retract
        and raise error.Forbidden if it is not the case
        """
        # TODO: the behaviour should be configurable (per node ?)
        requestor_bare = requestor.userhostJID()
        for publisher in publishers_map.itervalues():
            if requestor_bare != publisher.userhostJID():
                raise error.Forbidden()

    if affiliation in ('owner', 'publisher'):
        # the requestor is owner or publisher of the node
        # he can retract what he wants
        d = defer.succeed(None)
    else:
        # the requestor doesn't have right to retract on the whole node
        # we check if he is a publisher for all items he wants to retract
        # and forbid the retraction else.
        d = node.getItemsPublishers(itemIdentifiers)
        d.addCallback(checkPublishers)

    d.addCallback(lambda dummy: node.getItemsById(None, True, itemIdentifiers))
    d.addCallback(removeItems)
    if notify:
        d.addCallback(self._doNotifyRetraction, node, pep, recipient)
    return d
1103
1104 def _doNotifyRetraction(self, items_data, node, pep, recipient):
1105 self.dispatch({'items_data': items_data,
1106 'node': node,
1107 'pep': pep,
1108 'recipient': recipient},
1109 '//event/pubsub/retract')
1110
def purgeNode(self, nodeIdentifier, requestor, pep, recipient):
    """Purge a node.

    The node is retrieved from storage, the affiliation of the requestor is
    resolved, then the work is done by _doPurge.

    @return (Deferred): the deferred chain described above
    """
    purge_d = self.storage.getNode(nodeIdentifier, pep, recipient)
    purge_d.addCallback(_getAffiliation, requestor)
    purge_d.addCallback(self._doPurge)
    return purge_d
1116
def _doPurge(self, result):
    """Purge a node after checking that the requestor is allowed to.

    @param result(tuple): (node, affiliation of the requestor)
    @return (Deferred): deferred returned by node.purge, with the purge
        notification attached as callback
    @raise error.Forbidden: requestor is not owner of the node
    @raise error.NodeNotPersistent: the node doesn't persist items
    """
    node, affiliation = result
    # configuration is read before the affiliation check, as it has always been
    persist_items = node.getConfiguration()[const.OPT_PERSIST_ITEMS]

    is_owner = affiliation == 'owner'
    if not is_owner:
        raise error.Forbidden()
    if not persist_items:
        raise error.NodeNotPersistent()

    purge_d = node.purge()
    purge_d.addCallback(self._doNotifyPurge, node.nodeIdentifier)
    return purge_d
1130
1131 def _doNotifyPurge(self, result, nodeIdentifier):
1132 self.dispatch(nodeIdentifier, '//event/pubsub/purge')
1133
def registerPreDelete(self, preDeleteFn):
    """Register a callback to run before a node is deleted.

    @param preDeleteFn(callable): appended to self._callbackList, whose
        entries are called by _doPreDelete
    """
    callbacks = self._callbackList
    callbacks.append(preDeleteFn)
1136
def getSubscribers(self, nodeIdentifier, pep, recipient):
    """Retrieve the subscribers of a node.

    Only subscriptions returned by node.getSubscriptions('subscribed') are used.

    @return (Deferred): fires with the list of the ``subscriber`` attribute of
        each subscription
    """
    def getSubscribed(node):
        return node.getSubscriptions('subscribed')

    def extractSubscribers(subscriptions):
        return [subscription.subscriber for subscription in subscriptions]

    subscribers_d = self.storage.getNode(nodeIdentifier, pep, recipient)
    subscribers_d.addCallback(getSubscribed)
    subscribers_d.addCallback(extractSubscribers)
    return subscribers_d
1145
def deleteNode(self, nodeIdentifier, requestor, pep, recipient, redirectURI=None):
    """Delete a node.

    The node is retrieved from storage, the affiliation of the requestor is
    resolved, then the work is done by _doPreDelete.

    @return (Deferred): the deferred chain described above
    """
    delete_d = self.storage.getNode(nodeIdentifier, pep, recipient)
    delete_d.addCallback(_getAffiliation, requestor)
    delete_d.addCallback(self._doPreDelete, redirectURI, pep, recipient)
    return delete_d
1151
def _doPreDelete(self, result, redirectURI, pep, recipient):
    """Run the registered pre-delete callbacks, then delete the node.

    @param result(tuple): (node, affiliation of the requestor)
    @param redirectURI: stored in the data given to pre-delete callbacks
    @param pep: forwarded to pre-delete callbacks
    @param recipient: forwarded to pre-delete callbacks
    @return (Deferred): fires when the node deletion is done
    @raise error.Forbidden: requestor is not owner of the node
    """
    node, affiliation = result

    if affiliation != 'owner':
        raise error.Forbidden()

    data = {'node': node,
            'redirectURI': redirectURI}

    # consumeErrors: a failing pre-delete callback must not prevent the
    # deletion, _doDelete only uses the successful results
    d = defer.DeferredList([cb(data, pep, recipient)
                            for cb in self._callbackList],
                           consumeErrors=1)
    d.addCallback(self._doDelete, node.nodeDbId)
    # the deferred must be returned, else the caller's chain continues
    # before the deletion is done, and deletion failures are lost
    return d
1165
1166 def _doDelete(self, result, nodeDbId):
1167 dl = []
1168 for succeeded, r in result:
1169 if succeeded and r:
1170 dl.extend(r)
1171
1172 d = self.storage.deleteNodeByDbId(nodeDbId)
1173 d.addCallback(self._doNotifyDelete, dl)
1174
1175 return d
1176
1177 def _doNotifyDelete(self, result, dl):
1178 for d in dl:
1179 d.callback(None)
1180
1181
class PubSubResourceFromBackend(pubsub.PubSubResource):
    """
    Adapts a backend to an xmpp publish-subscribe service.

    Requests are forwarded to the backend given at construction, and backend
    errors listed in _errorMap are converted to stanza errors by _mapErrors.
    """

    # features always advertised; __init__ appends optional ones according
    # to what the backend supports
    features = [
        "config-node",
        "create-nodes",
        "delete-any",
        "delete-nodes",
        "item-ids",
        "meta-data",
        "publish",
        "purge-nodes",
        "retract-items",
        "retrieve-affiliations",
        "retrieve-default",
        "retrieve-items",
        "retrieve-subscriptions",
        "subscribe",
        ]

    discoIdentity = disco.DiscoIdentity('pubsub',
                                        'service',
                                        u'Salut à Toi pubsub service')

    # used to send publish/retract/delete notifications; None at class level
    pubsubService = None

    # backend error class => (condition, pubsubCondition, feature)
    # when pubsubCondition is None, _mapErrors raises a plain StanzaError,
    # else a pubsub.PubSubError
    _errorMap = {
        error.NodeNotFound: ('item-not-found', None, None),
        error.NodeExists: ('conflict', None, None),
        error.Forbidden: ('forbidden', None, None),
        error.NotAuthorized: ('not-authorized', None, None),
        error.ItemNotFound: ('item-not-found', None, None),
        error.ItemForbidden: ('bad-request', 'item-forbidden', None),
        error.ItemRequired: ('bad-request', 'item-required', None),
        error.NoInstantNodes: ('not-acceptable',
                               'unsupported',
                               'instant-nodes'),
        error.NotSubscribed: ('unexpected-request', 'not-subscribed', None),
        error.InvalidConfigurationOption: ('not-acceptable', None, None),
        error.InvalidConfigurationValue: ('not-acceptable', None, None),
        error.NodeNotPersistent: ('feature-not-implemented',
                                  'unsupported',
                                  'persistent-node'),
        error.NoRootNode: ('bad-request', None, None),
        error.NoCollections: ('feature-not-implemented',
                              'unsupported',
                              'collections'),
        error.NoPublishing: ('feature-not-implemented',
                             'unsupported',
                             'publish'),
    }
1235
def __init__(self, backend):
    """Bind the resource to a backend and advertise its optional features.

    The resource registers its publish/retract notifiers and its pre-delete
    callback on the backend.

    @param backend: backend service to adapt
    """
    pubsub.PubSubResource.__init__(self)

    self.backend = backend
    self.hideNodes = False

    self.backend.registerPublishNotifier(self._notifyPublish)
    self.backend.registerRetractNotifier(self._notifyRetract)
    self.backend.registerPreDelete(self._preDelete)

    # ``features`` is a class attribute: we work on a per-instance copy so
    # optional features are not appended to the list shared by all
    # instances (which would duplicate them at each instantiation)
    self.features = list(self.features)

    # FIXME: to be removed, it's not useful anymore as PEP is now used
    # if self.backend.supportsCreatorCheck():
    #     self.features.append("creator-jid-check")  #SàT custom feature: Check that a node (which correspond to
    #                                                #                    a jid in this server) is created by the right jid

    # (backend capability check, feature to advertise), in advertising order
    optional_features = (
        (self.backend.supportsAutoCreate, "auto-create"),
        (self.backend.supportsPublishOptions, "publish-options"),
        (self.backend.supportsInstantNodes, "instant-nodes"),
        (self.backend.supportsOutcastAffiliation, "outcast-affiliation"),
        (self.backend.supportsPersistentItems, "persistent-items"),
        (self.backend.supportsPublisherAffiliation, "publisher-affiliation"),
        (self.backend.supportsGroupBlog, "groupblog"),
    )
    for is_supported, feature in optional_features:
        if is_supported():
            self.features.append(feature)

    # if self.backend.supportsPublishModel():       #XXX: this feature is not really described in XEP-0060, we just can see it in examples
    #     self.features.append("publish_model")     #     but it's necessary for microblogging comments (see XEP-0277)
1275
def getFullItem(self, item_data):
    """Return a copy of an item with its configuration attached.

    Used to give item configuration back to node's owner (and *only* to owner)

    @param item_data: data with the item in ``item`` attribute and its
        configuration (may be empty) in ``config`` attribute
    @return: deep copy of the item; the element of the configuration is added
        as child when there is a configuration
    """
    # TODO: a test should check that only the owner get the item configuration back
    full_item = deepcopy(item_data.item)
    config = item_data.config
    if not config:
        return full_item
    full_item.addChild(config.toElement())
    return full_item
1288
@defer.inlineCallbacks
def _notifyPublish(self, data):
    """Send publish notifications to subscribers and owners of a node.

    @param data(dict): event data with the keys 'items_data', 'node', 'pep',
        'recipient' and optionally 'subscription'
    @return (Deferred): fires with the value returned by the notifyPublish
        method used (privilege one for PEP, pubsubService one else)
    """
    items_data, node = data['items_data'], data['node']
    pep, recipient = data['pep'], data['recipient']

    owners, notifications_filtered = yield self._prepareNotify(
        items_data, node, data.get('subscription'), pep, recipient)

    # we notify the owners
    # FIXME: check if this comply with XEP-0060 (option needed ?)
    # TODO: item's access model have to be sent back to owner
    # TODO: same thing for getItems

    for owner_jid in owners:
        owner_subscription = pubsub.Subscription(
            node.nodeIdentifier, owner_jid, 'subscribed')
        full_items = [self.getFullItem(item_data) for item_data in items_data]
        notifications_filtered.append(
            (owner_jid, {owner_subscription}, full_items))

    if pep:
        notified = self.backend.privilege.notifyPublish(
            recipient,
            node.nodeIdentifier,
            notifications_filtered)
    else:
        notified = self.pubsubService.notifyPublish(
            self.serviceJID,
            node.nodeIdentifier,
            notifications_filtered)
    defer.returnValue(notified)
1322
1323 def _notifyRetract(self, data):
1324 items_data = data['items_data']
1325 node = data['node']
1326 pep = data['pep']
1327 recipient = data['recipient']
1328
1329 def afterPrepare(result):
1330 owners, notifications_filtered = result
1331 #we add the owners
1332
1333 for owner_jid in owners:
1334 notifications_filtered.append(
1335 (owner_jid,
1336 {pubsub.Subscription(node.nodeIdentifier,
1337 owner_jid,
1338 'subscribed')},
1339 [item_data.item for item_data in items_data]))
1340
1341 if pep:
1342 return self.backend.privilege.notifyRetract(
1343 recipient,
1344 node.nodeIdentifier,
1345 notifications_filtered)
1346
1347 else:
1348 return self.pubsubService.notifyRetract(
1349 self.serviceJID,
1350 node.nodeIdentifier,
1351 notifications_filtered)
1352
1353 d = self._prepareNotify(items_data, node, data.get('subscription'), pep, recipient)
1354 d.addCallback(afterPrepare)
1355 return d
1356
@defer.inlineCallbacks
def _prepareNotify(self, items_data, node, subscription=None, pep=None, recipient=None):
    """Do a bunch of permissions check and filter notifications

    The owner is not added to these notifications,
    it must be added by the calling method
    @param items_data(list): data of the items to notify, each one with:
        - item (domish.Element)
        - access_model (unicode)
        - config (dict as returned getItemsById, or item_config)
    @param node(LeafNode): node hosting the items
    @param subscription(pubsub.Subscription, None): if set, only this
        subscription is notified, else notifications are retrieved with
        backend.getNotifications
    @param pep: if true and the node access model is open or presence,
        automatic subscribers are notified too
    @param recipient: forwarded to privilege.getAutoSubscribers

    @return (Deferred): fires with a tuple:
        - owners: owners of the node
        - notifications_filtered(list): (subscriber, subscriptions, items)
            tuples, with only the items that the subscriber can access
    @raise NotImplementedError: an item uses an access model which is not
        handled here (e.g. whitelist)
    """
    if subscription is None:
        notifications = yield self.backend.getNotifications(node, items_data)
    else:
        notifications = [(subscription.subscriber, [subscription], items_data)]

    if pep and node.getConfiguration()[const.OPT_ACCESS_MODEL] in ('open', 'presence'):
        # for PEP we need to manage automatic subscriptions (cf. XEP-0163 §4)
        explicit_subscribers = {subscriber for subscriber, _, _ in notifications}
        auto_subscribers = yield self.backend.privilege.getAutoSubscribers(recipient, node.nodeIdentifier, explicit_subscribers)
        for sub_jid in auto_subscribers:
            sub = pubsub.Subscription(node.nodeIdentifier, sub_jid, 'subscribed')
            notifications.append((sub_jid, [sub], items_data))

    owners = yield node.getOwners()
    # retrieved lazily, only if an item with roster access model is found
    owner_roster = None

    # now we check access of subscriber for each item, and keep only allowed ones

    # we filter items not allowed for the subscribers
    notifications_filtered = []
    schema = node.getSchema()

    for subscriber, subscriptions, sub_items_data in notifications:
        subscriber_bare = subscriber.userhostJID()
        if subscriber_bare in owners:
            # as notification is always sent to owner,
            # we ignore owner if he is here
            continue
        allowed_items = []  # we keep only item which subscriber can access

        if schema is not None:
            # we have to deepcopy items because different subscribers may receive
            # different items (e.g. read restriction in schema)
            sub_items_data = deepcopy(sub_items_data)
            self.backend.filterItemsWithSchema(sub_items_data, schema, False)

        for item_data in sub_items_data:
            item, access_model = item_data.item, item_data.access_model
            access_list = item_data.config
            if access_model == const.VAL_AMODEL_OPEN:
                allowed_items.append(item)
            elif access_model == const.VAL_AMODEL_PUBLISHER_ROSTER:
                if owner_roster is None:
                    # FIXME: publisher roster should be used, not owner
                    # getOwnerRoster is a method of the backend, not of
                    # this resource
                    owner_roster = yield self.backend.getOwnerRoster(node, owners)
                if owner_roster is None:
                    owner_roster = {}
                if subscriber_bare not in owner_roster:
                    continue
                # the subscriber is known, is he in the right group ?
                authorized_groups = access_list[const.OPT_ROSTER_GROUPS_ALLOWED]
                if owner_roster[subscriber_bare].groups.intersection(authorized_groups):
                    allowed_items.append(item)
            else:  # unknown access_model
                # TODO: white list access
                raise NotImplementedError

        if allowed_items:
            notifications_filtered.append((subscriber, subscriptions, allowed_items))

    defer.returnValue((owners, notifications_filtered))
1436
1437 def _preDelete(self, data, pep, recipient):
1438 nodeIdentifier = data['node'].nodeIdentifier
1439 redirectURI = data.get('redirectURI', None)
1440 d = self.backend.getSubscribers(nodeIdentifier, pep, recipient)
1441 d.addCallback(lambda subscribers: self.pubsubService.notifyDelete(
1442 self.serviceJID,
1443 nodeIdentifier,
1444 subscribers,
1445 redirectURI))
1446 return d
1447
1448 def _mapErrors(self, failure):
1449 e = failure.trap(*self._errorMap.keys())
1450
1451 condition, pubsubCondition, feature = self._errorMap[e]
1452 msg = failure.value.msg
1453
1454 if pubsubCondition:
1455 exc = pubsub.PubSubError(condition, pubsubCondition, feature, msg)
1456 else:
1457 exc = StanzaError(condition, text=msg)
1458
1459 raise exc
1460
def getInfo(self, requestor, service, nodeIdentifier, pep=None, recipient=None):
    """Return disco information of a node (currently disabled).

    @return (list): always an empty list for now, the code after the first
        return statement is not reachable
    """
    # FIXME: disabled for now, need to manage PEP
    return []

    # --- disabled implementation, kept until PEP is managed ---
    if not requestor.resource:
        # this avoid error when getting a disco request from server during namespace delegation
        return []
    info = {}

    def saveType(node_type):
        info['type'] = node_type
        return nodeIdentifier

    def saveMetaData(meta_data):
        info['meta-data'] = meta_data
        return info

    def trapNotFound(failure):
        failure.trap(error.NodeNotFound)
        return info

    d = defer.succeed(nodeIdentifier)
    for callback in (self.backend.getNodeType,
                     saveType,
                     self.backend.getNodeMetaData,
                     saveMetaData):
        d.addCallback(callback)
    d.addErrback(trapNotFound)
    d.addErrback(self._mapErrors)
    return d
1488
def getNodes(self, requestor, service, nodeIdentifier):
    """return nodes for disco#items

    Pubsub/PEP nodes will be returned if disco node is not specified
    else Pubsub/PEP items will be returned
    (according to what requestor can access)

    @return (Deferred): fires with an empty list if service has a resource,
        else with the backend result (items ids are converted to
        (None, item id) tuples)
    """
    # service has no ``pep`` attribute when PEP is not used
    pep = getattr(service, 'pep', False)

    if service.resource:
        return defer.succeed([])

    if not nodeIdentifier:
        d = self.backend.getNodes(requestor.userhostJID(), pep, service)
    else:
        unrestricted = requestor.userhostJID() == service
        d = self.backend.getItemsIds(
            nodeIdentifier, requestor, [], unrestricted, None, None, pep, service)
        # items must be set as name, not node
        d.addCallback(lambda items: [(None, item) for item in items])

    return d.addErrback(self._mapErrors)
1521
def getConfigurationOptions(self):
    """Return the node configuration options declared by the backend.

    @return: ``nodeOptions`` attribute of the backend
    """
    backend = self.backend
    return backend.nodeOptions
1524
def _publish_errb(self, failure, request):
    """Create the node then publish again when publication failed on a missing node.

    This is only done if the backend supports node auto-creation.

    @param failure(Failure): failure of the first publication
    @param request: publish request
    @return: deferred of the node creation followed by the new publication,
        or the failure itself if it can't be handled here
    """
    if failure.type == error.NodeNotFound and self.backend.supportsAutoCreate():
        # use the Twisted log like the rest of this module, not stdout
        log.msg("Auto-creating node %s" % (request.nodeIdentifier,))

        def publishAgain(ignore):
            return self.backend.publish(request.nodeIdentifier,
                                        request.items,
                                        request.sender,
                                        self._isPep(request),
                                        request.recipient)

        d = self.backend.createNode(request.nodeIdentifier,
                                    request.sender,
                                    pep=self._isPep(request),
                                    recipient=request.recipient)
        d.addCallback(publishAgain)
        return d

    return failure
1543
1544 def _isPep(self, request):
1545 try:
1546 return request.delegated
1547 except AttributeError:
1548 return False
1549
def publish(self, request):
    """Publish the items of a request with the backend.

    _publish_errb is tried first on failure, then errors are mapped with
    _mapErrors.
    """
    args = (request.nodeIdentifier,
            request.items,
            request.sender,
            self._isPep(request),
            request.recipient)
    d = self.backend.publish(*args)
    d.addErrback(self._publish_errb, request)
    return d.addErrback(self._mapErrors)
1558
def subscribe(self, request):
    """Forward a subscribe request to the backend, errors are mapped with _mapErrors."""
    args = (request.nodeIdentifier,
            request.subscriber,
            request.sender,
            self._isPep(request),
            request.recipient)
    d = self.backend.subscribe(*args)
    return d.addErrback(self._mapErrors)
1566
def unsubscribe(self, request):
    """Forward an unsubscribe request to the backend, errors are mapped with _mapErrors."""
    args = (request.nodeIdentifier,
            request.subscriber,
            request.sender,
            self._isPep(request),
            request.recipient)
    d = self.backend.unsubscribe(*args)
    return d.addErrback(self._mapErrors)
1574
def subscriptions(self, request):
    """Retrieve subscriptions of the sender from the backend, errors are mapped with _mapErrors."""
    args = (request.sender,
            request.nodeIdentifier,
            self._isPep(request),
            request.recipient)
    d = self.backend.getSubscriptions(*args)
    return d.addErrback(self._mapErrors)
1581
def affiliations(self, request):
    """Retrieve affiliation for normal entity (cf. XEP-0060 §5.7)

    retrieve all node where this jid is affiliated
    """
    args = (request.sender,
            request.nodeIdentifier,
            self._isPep(request),
            request.recipient)
    d = self.backend.getAffiliations(*args)
    return d.addErrback(self._mapErrors)
1592
def create(self, request):
    """Forward a node creation request to the backend, errors are mapped with _mapErrors."""
    args = (request.nodeIdentifier,
            request.sender,
            request.options,
            self._isPep(request),
            request.recipient)
    d = self.backend.createNode(*args)
    return d.addErrback(self._mapErrors)
1599
def default(self, request):
    """Retrieve the default configuration from the backend, errors are mapped with _mapErrors."""
    args = (request.nodeType,
            self._isPep(request),
            request.sender)
    d = self.backend.getDefaultConfiguration(*args)
    return d.addErrback(self._mapErrors)
1605
def configureGet(self, request):
    """Retrieve the configuration of a node from the backend, errors are mapped with _mapErrors."""
    args = (request.nodeIdentifier,
            self._isPep(request),
            request.recipient)
    d = self.backend.getNodeConfiguration(*args)
    return d.addErrback(self._mapErrors)
1611
def configureSet(self, request):
    """Forward a node configuration change to the backend, errors are mapped with _mapErrors."""
    args = (request.nodeIdentifier,
            request.options,
            request.sender,
            self._isPep(request),
            request.recipient)
    d = self.backend.setNodeConfiguration(*args)
    return d.addErrback(self._mapErrors)
1619
def affiliationsGet(self, request):
    """Retrieve affiliations for owner (cf. XEP-0060 §8.9.1)

    retrieve all affiliations for a node
    """
    args = (request.nodeIdentifier,
            request.sender,
            self._isPep(request),
            request.recipient)
    d = self.backend.getAffiliationsOwner(*args)
    return d.addErrback(self._mapErrors)
1630
def affiliationsSet(self, request):
    """Forward an affiliations change (made by an owner) to the backend, errors are mapped with _mapErrors."""
    args = (request.nodeIdentifier,
            request.sender,
            request.affiliations,
            self._isPep(request),
            request.recipient)
    d = self.backend.setAffiliationsOwner(*args)
    return d.addErrback(self._mapErrors)
1638
def subscriptionsGet(self, request):
    """Retrieve subscriptions for owner (cf. XEP-0060 §8.8.1)

    retrieve all subscriptions for a node
    """
    args = (request.nodeIdentifier,
            request.sender,
            self._isPep(request),
            request.recipient)
    d = self.backend.getSubscriptionsOwner(*args)
    return d.addErrback(self._mapErrors)
1649
def subscriptionsSet(self, request):
    """Forward a subscriptions change (made by an owner) to the backend, errors are mapped with _mapErrors."""
    args = (request.nodeIdentifier,
            request.sender,
            request.subscriptions,
            self._isPep(request),
            request.recipient)
    d = self.backend.setSubscriptionsOwner(*args)
    return d.addErrback(self._mapErrors)
1657
def items(self, request):
    """Retrieve items of a node from the backend.

    Extra data given to the backend contain the RSM request (if RSM is
    enabled and the request has one) and the ``delegated`` attribute of the
    request (if it exists) in 'pep' key. Errors are mapped with _mapErrors.
    """
    ext_data = {}
    if const.FLAG_ENABLE_RSM and request.rsm is not None:
        ext_data['rsm'] = request.rsm
    try:
        delegated = request.delegated
    except AttributeError:
        # not a delegated request, 'pep' key is left unset
        pass
    else:
        ext_data['pep'] = delegated

    args = (request.nodeIdentifier,
            request.sender,
            request.recipient,
            request.maxItems,
            request.itemIdentifiers,
            ext_data)
    d = self.backend.getItems(*args)
    return d.addErrback(self._mapErrors)
1673
def retract(self, request):
    """Forward an items retraction request to the backend, errors are mapped with _mapErrors."""
    args = (request.nodeIdentifier,
            request.itemIdentifiers,
            request.sender,
            request.notify,
            self._isPep(request),
            request.recipient)
    d = self.backend.retractItem(*args)
    return d.addErrback(self._mapErrors)
1682
def purge(self, request):
    """Forward a node purge request to the backend, errors are mapped with _mapErrors."""
    args = (request.nodeIdentifier,
            request.sender,
            self._isPep(request),
            request.recipient)
    d = self.backend.purgeNode(*args)
    return d.addErrback(self._mapErrors)
1689
def delete(self, request):
    """Forward a node deletion request to the backend, errors are mapped with _mapErrors."""
    args = (request.nodeIdentifier,
            request.sender,
            self._isPep(request),
            request.recipient)
    d = self.backend.deleteNode(*args)
    return d.addErrback(self._mapErrors)
1696
# Register PubSubResourceFromBackend as the adapter from
# iidavoll.IBackendService to iwokkel.IPubSubResource
components.registerAdapter(PubSubResourceFromBackend,
                           iidavoll.IBackendService,
                           iwokkel.IPubSubResource)