comparison sat_pubsub/backend.py @ 405:c56a728412f1

file organisation + setup refactoring: - `/src` has been renamed to `/sat_pubsub`, this is the recommended naming convention - revamped `setup.py` on the basis of SàT's `setup.py` - added a `VERSION` which is the unique place where version number will now be set - use same trick as in SàT to specify dev version (`D` at the end) - use setuptools_scm to retrieve Mercurial hash when in dev version
author Goffi <goffi@goffi.org>
date Fri, 16 Aug 2019 12:00:02 +0200
parents src/backend.py@1dc606612405
children ccb2a22ea0fc
comparison
equal deleted inserted replaced
404:105a0772eedd 405:c56a728412f1
1 #!/usr/bin/python
2 #-*- coding: utf-8 -*-
3 #
4 # Copyright (c) 2012-2019 Jérôme Poisson
5 # Copyright (c) 2013-2016 Adrien Cossa
6 # Copyright (c) 2003-2011 Ralph Meijer
7
8
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU Affero General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU Affero General Public License for more details.
18
19 # You should have received a copy of the GNU Affero General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 # --
22
23 # This program is based on Idavoll (http://idavoll.ik.nu/),
24 # originaly written by Ralph Meijer (http://ralphm.net/blog/)
25 # It is sublicensed under AGPL v3 (or any later version) as allowed by the original
26 # license.
27
28 # --
29
30 # Here is a copy of the original license:
31
32 # Copyright (c) 2003-2011 Ralph Meijer
33
34 # Permission is hereby granted, free of charge, to any person obtaining
35 # a copy of this software and associated documentation files (the
36 # "Software"), to deal in the Software without restriction, including
37 # without limitation the rights to use, copy, modify, merge, publish,
38 # distribute, sublicense, and/or sell copies of the Software, and to
39 # permit persons to whom the Software is furnished to do so, subject to
40 # the following conditions:
41
42 # The above copyright notice and this permission notice shall be
43 # included in all copies or substantial portions of the Software.
44
45 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
46 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
47 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
48 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
49 # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
50 # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
51 # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
52
53
54 """
55 Generic publish-subscribe backend.
56
57 This module implements a generic publish-subscribe backend service with
58 business logic as per
59 U{XEP-0060<http://www.xmpp.org/extensions/xep-0060.html>} that interacts with
60 a given storage facility. It also provides an adapter from the XMPP
61 publish-subscribe protocol.
62 """
63
64 import copy
65 import uuid
66
67 from zope.interface import implements
68
69 from twisted.application import service
70 from twisted.python import components, log
71 from twisted.internet import defer, reactor
72 from twisted.words.protocols.jabber.error import StanzaError
73 # from twisted.words.protocols.jabber.jid import JID, InvalidFormat
74 from twisted.words.xish import domish, utility
75
76 from wokkel import disco
77 from wokkel import data_form
78 from wokkel import rsm
79 from wokkel import iwokkel
80 from wokkel import pubsub
81 from wokkel.subprotocols import XMPPHandler
82
83 from sat_pubsub import error
84 from sat_pubsub import iidavoll
85 from sat_pubsub import const
86 from sat_pubsub import container
87
88
89 def _getAffiliation(node, entity):
90 d = node.getAffiliation(entity)
91 d.addCallback(lambda affiliation: (node, affiliation))
92 return d
93
94
95 def elementCopy(element):
96 """Make a copy of a domish.Element
97
98 The copy will have its own children list, so other elements
99 can be added as direct children without modifying orignal one.
100 Children are not deeply copied, so if an element is added to a child or grandchild,
101 it will also affect original element.
102 @param element(domish.Element): Element to clone
103 """
104 new_elt = domish.Element(
105 (element.uri, element.name),
106 defaultUri = element.defaultUri,
107 attribs = element.attributes,
108 localPrefixes = element.localPrefixes)
109 new_elt.parent = element.parent
110 new_elt.children = element.children[:]
111 return new_elt
112
113
114 def itemDataCopy(item_data):
115 """Make a copy of an item_data
116
117 deep copy every element of the tuple but item
118 do an elementCopy of item_data.item
119 @param item_data(ItemData): item data to copy
120 @return (ItemData): copied data
121 """
122 return container.ItemData(*[elementCopy(item_data.item)]
123 + [copy.deepcopy(d) for d in item_data[1:]])
124
125
126 class BackendService(service.Service, utility.EventDispatcher):
127 """
128 Generic publish-subscribe backend service.
129
130 @cvar nodeOptions: Node configuration form as a mapping from the field
131 name to a dictionary that holds the field's type, label
132 and possible options to choose from.
133 @type nodeOptions: C{dict}.
134 @cvar defaultConfig: The default node configuration.
135 """
136
137 implements(iidavoll.IBackendService)
138
139 nodeOptions = {
140 const.OPT_PERSIST_ITEMS:
141 {"type": "boolean",
142 "label": "Persist items to storage"},
143 const.OPT_DELIVER_PAYLOADS:
144 {"type": "boolean",
145 "label": "Deliver payloads with event notifications"},
146 const.OPT_SEND_LAST_PUBLISHED_ITEM:
147 {"type": "list-single",
148 "label": "When to send the last published item",
149 "options": {
150 "never": "Never",
151 "on_sub": "When a new subscription is processed"}
152 },
153 const.OPT_ACCESS_MODEL:
154 {"type": "list-single",
155 "label": "Who can subscribe to this node",
156 "options": {
157 const.VAL_AMODEL_OPEN: "Public node",
158 const.VAL_AMODEL_PRESENCE: "Node restricted to entites subscribed to owner presence",
159 const.VAL_AMODEL_PUBLISHER_ROSTER: "Node restricted to some groups of publisher's roster",
160 const.VAL_AMODEL_WHITELIST: "Node restricted to some jids",
161 }
162 },
163 const.OPT_ROSTER_GROUPS_ALLOWED:
164 {"type": "list-multi",
165 "label": "Groups of the roster allowed to access the node",
166 },
167 const.OPT_PUBLISH_MODEL:
168 {"type": "list-single",
169 "label": "Who can publish to this node",
170 "options": {
171 const.VAL_PMODEL_OPEN: "Everybody can publish",
172 const.VAL_PMODEL_PUBLISHERS: "Only owner and publishers can publish",
173 const.VAL_PMODEL_SUBSCRIBERS: "Everybody which subscribed to the node",
174 }
175 },
176 const.OPT_SERIAL_IDS:
177 {"type": "boolean",
178 "label": "Use serial ids"},
179 const.OPT_CONSISTENT_PUBLISHER:
180 {"type": "boolean",
181 "label": "Keep publisher on update"},
182 }
183
184 subscriptionOptions = {
185 "pubsub#subscription_type":
186 {"type": "list-single",
187 "options": {
188 "items": "Receive notification of new items only",
189 "nodes": "Receive notification of new nodes only"}
190 },
191 "pubsub#subscription_depth":
192 {"type": "list-single",
193 "options": {
194 "1": "Receive notification from direct child nodes only",
195 "all": "Receive notification from all descendent nodes"}
196 },
197 }
198
199 def __init__(self, storage, config):
200 utility.EventDispatcher.__init__(self)
201 self.storage = storage
202 self._callbackList = []
203 self.config = config
204 self.admins = config[u'admins_jids_list']
205
206 def isAdmin(self, entity_jid):
207 """Return True if an entity is an administrator"""
208 return entity_jid.userhostJID() in self.admins
209
210 def supportsPublishOptions(self):
211 return True
212 def supportsPublisherAffiliation(self):
213 return True
214
215 def supportsGroupBlog(self):
216 return True
217
218 def supportsOutcastAffiliation(self):
219 return True
220
221 def supportsPersistentItems(self):
222 return True
223
224 def supportsPublishModel(self):
225 return True
226
227 def getNodeType(self, nodeIdentifier, pep, recipient=None):
228 # FIXME: manage pep and recipient
229 d = self.storage.getNode(nodeIdentifier, pep, recipient)
230 d.addCallback(lambda node: node.getType())
231 return d
232
233 def _getNodesIds(self, subscribed, pep, recipient):
234 # TODO: filter whitelist nodes
235 # TODO: handle publisher-roster (should probably be renamed to owner-roster for nodes)
236 if not subscribed:
237 allowed_accesses = {'open', 'whitelist'}
238 else:
239 allowed_accesses = {'open', 'presence', 'whitelist'}
240 return self.storage.getNodeIds(pep, recipient, allowed_accesses)
241
242 def getNodes(self, requestor, pep, recipient):
243 if pep:
244 d = self.privilege.isSubscribedFrom(requestor, recipient)
245 d.addCallback(self._getNodesIds, pep, recipient)
246 return d
247 return self.storage.getNodeIds(pep, recipient)
248
249 def getNodeMetaData(self, nodeIdentifier, pep, recipient=None):
250 # FIXME: manage pep and recipient
251 d = self.storage.getNode(nodeIdentifier, pep, recipient)
252 d.addCallback(lambda node: node.getMetaData())
253 d.addCallback(self._makeMetaData)
254 return d
255
256 def _makeMetaData(self, metaData):
257 options = []
258 for key, value in metaData.iteritems():
259 if key in self.nodeOptions:
260 option = {"var": key}
261 option.update(self.nodeOptions[key])
262 option["value"] = value
263 options.append(option)
264
265 return options
266
267 def _checkAuth(self, node, requestor):
268 """ Check authorisation of publishing in node for requestor """
269
270 def check(affiliation):
271 d = defer.succeed((affiliation, node))
272 configuration = node.getConfiguration()
273 publish_model = configuration[const.OPT_PUBLISH_MODEL]
274 if publish_model == const.VAL_PMODEL_PUBLISHERS:
275 if affiliation not in ['owner', 'publisher']:
276 raise error.Forbidden()
277 elif publish_model == const.VAL_PMODEL_SUBSCRIBERS:
278 if affiliation not in ['owner', 'publisher']:
279 # we are in subscribers publish model, we must check that
280 # the requestor is a subscriber to allow him to publish
281
282 def checkSubscription(subscribed):
283 if not subscribed:
284 raise error.Forbidden()
285 return (affiliation, node)
286
287 d.addCallback(lambda ignore: node.isSubscribed(requestor))
288 d.addCallback(checkSubscription)
289 elif publish_model != const.VAL_PMODEL_OPEN:
290 raise ValueError('Unexpected value') # publish_model must be publishers (default), subscribers or open.
291
292 return d
293
294 d = node.getAffiliation(requestor)
295 d.addCallback(check)
296 return d
297
298 def parseItemConfig(self, item):
299 """Get and remove item configuration information
300
301 @param item (domish.Element): item to parse
302 @return (tuple[unicode, dict)): (access_model, item_config)
303 """
304 item_config = None
305 access_model = const.VAL_AMODEL_DEFAULT
306 for idx, elt in enumerate(item.elements()):
307 if elt.uri != data_form.NS_X_DATA or elt.name != 'x':
308 continue
309 form = data_form.Form.fromElement(elt)
310 if form.formNamespace == const.NS_ITEM_CONFIG:
311 item_config = form
312 del item.children[idx] #we need to remove the config from item
313 break
314
315 if item_config:
316 access_model = item_config.get(const.OPT_ACCESS_MODEL, const.VAL_AMODEL_DEFAULT)
317 return (access_model, item_config)
318
319 def parseCategories(self, item_elt):
320 """Check if item contain an atom entry, and parse categories if possible
321
322 @param item_elt (domish.Element): item to parse
323 @return (list): list of found categories
324 """
325 categories = []
326 try:
327 entry_elt = item_elt.elements(const.NS_ATOM, "entry").next()
328 except StopIteration:
329 return categories
330
331 for category_elt in entry_elt.elements(const.NS_ATOM, 'category'):
332 category = category_elt.getAttribute('term')
333 if category:
334 categories.append(category)
335
336 return categories
337
338 def enforceSchema(self, item_elt, schema, affiliation):
339 """modifify item according to element, or refuse publishing
340
341 @param item_elt(domish.Element): item to check/modify
342 @param schema(domish.Eement): schema to enfore
343 @param affiliation(unicode): affiliation of the publisher
344 """
345 try:
346 x_elt = next(item_elt.elements(data_form.NS_X_DATA, 'x'))
347 item_form = data_form.Form.fromElement(x_elt)
348 except (StopIteration, data_form.Error):
349 raise pubsub.BadRequest(text="node has a schema but item has no form")
350 else:
351 item_elt.children.remove(x_elt)
352
353 schema_form = data_form.Form.fromElement(schema)
354
355 # we enforce restrictions
356 for field_elt in schema.elements(data_form.NS_X_DATA, 'field'):
357 var = field_elt['var']
358 for restrict_elt in field_elt.elements(const.NS_SCHEMA_RESTRICT, 'restrict'):
359 write_restriction = restrict_elt.attributes.get('write')
360 if write_restriction is not None:
361 if write_restriction == 'owner':
362 if affiliation != 'owner':
363 # write is not allowed on this field, we use default value
364 # we can safely use Field from schema_form because
365 # we have created this instance only for this method
366 try:
367 item_form.removeField(item_form.fields[var])
368 except KeyError:
369 pass
370 item_form.addField(schema_form.fields[var])
371 else:
372 raise StanzaError('feature-not-implemented', text='unknown write restriction {}'.format(write_restriction))
373
374 # we now remove every field which is not in data schema
375 to_remove = set()
376 for item_var, item_field in item_form.fields.iteritems():
377 if item_var not in schema_form.fields:
378 to_remove.add(item_field)
379
380 for field in to_remove:
381 item_form.removeField(field)
382 item_elt.addChild(item_form.toElement())
383
384 def _checkOverwrite(self, node, itemIdentifiers, publisher):
385 """Check that publisher can overwrite items
386
387 current publisher must correspond to each item publisher
388 """
389 def doCheck(item_pub_map):
390 for item_publisher in item_pub_map.itervalues():
391 if item_publisher.userhost() != publisher.userhost():
392 raise error.ItemForbidden()
393
394 d = node.getItemsPublishers(itemIdentifiers)
395 d.addCallback(doCheck)
396 return d
397
398 def publish(self, nodeIdentifier, items, requestor, pep, recipient):
399 d = self.storage.getNode(nodeIdentifier, pep, recipient)
400 d.addCallback(self._checkAuth, requestor)
401 #FIXME: owner and publisher are not necessarly the same. So far we use only owner to get roster.
402 #FIXME: in addition, there can be several owners: that is not managed yet
403 d.addCallback(self._doPublish, items, requestor, pep, recipient)
404 return d
405
406 @defer.inlineCallbacks
407 def _doPublish(self, result, items, requestor, pep, recipient):
408 affiliation, node = result
409 if node.nodeType == 'collection':
410 raise error.NoPublishing()
411
412 configuration = node.getConfiguration()
413 persistItems = configuration[const.OPT_PERSIST_ITEMS]
414 deliverPayloads = configuration[const.OPT_DELIVER_PAYLOADS]
415
416 if items and not persistItems and not deliverPayloads:
417 raise error.ItemForbidden()
418 elif not items and (persistItems or deliverPayloads):
419 raise error.ItemRequired()
420
421 items_data = []
422 check_overwrite = False
423 ret_payload = None # payload returned, None or domish.Element
424 for item in items:
425 # we enforce publisher (cf XEP-0060 §7.1.2.3)
426 item['publisher'] = requestor.full()
427 if persistItems or deliverPayloads:
428 item.uri = None
429 item.defaultUri = None
430 if not item.getAttribute("id"):
431 item["id"] = yield node.getNextId()
432 new_item = True
433 if ret_payload is None:
434 ret_pubsub_elt = domish.Element((pubsub.NS_PUBSUB, u'pubsub'))
435 ret_publish_elt = ret_pubsub_elt.addElement(u'publish')
436 ret_publish_elt[u'node'] = node.nodeIdentifier
437 ret_payload = ret_pubsub_elt
438 ret_publish_elt = ret_payload.publish
439 ret_item_elt = ret_publish_elt.addElement(u'item')
440 ret_item_elt["id"] = item[u"id"]
441 else:
442 check_overwrite = True
443 new_item = False
444 access_model, item_config = self.parseItemConfig(item)
445 categories = self.parseCategories(item)
446 schema = node.getSchema()
447 if schema is not None:
448 self.enforceSchema(item, schema, affiliation)
449 items_data.append(container.ItemData(item, access_model, item_config, categories, new=new_item))
450
451 if persistItems:
452
453 if check_overwrite:
454 itemIdentifiers = [item['id'] for item in items
455 if item.getAttribute('id')]
456
457 if affiliation == 'owner' or self.isAdmin(requestor):
458 if configuration[const.OPT_CONSISTENT_PUBLISHER]:
459 pub_map = yield node.getItemsPublishers(itemIdentifiers)
460 publishers = set(pub_map.values())
461 if len(publishers) != 1:
462 # TODO: handle multiple items publishing (from several
463 # publishers)
464 raise error.NoPublishing(
465 u"consistent_publisher is currently only possible when "
466 u"publishing items from a single publisher. Try to "
467 u"publish one item at a time")
468 # we replace requestor and new payload's publisher by original
469 # item publisher to keep publisher consistent
470 requestor = publishers.pop()
471 for item in items:
472 item['publisher'] = requestor.full()
473 else:
474 # we don't want a publisher to overwrite the item
475 # of an other publisher
476 yield self._checkOverwrite(node, itemIdentifiers, requestor)
477
478 # TODO: check conflict and recalculate max id if serial_ids is set
479 yield node.storeItems(items_data, requestor)
480
481 yield self._doNotify(node, items_data, deliverPayloads, pep, recipient)
482 defer.returnValue(ret_payload)
483
484 def _doNotify(self, node, items_data, deliverPayloads, pep, recipient):
485 if items_data and not deliverPayloads:
486 for item_data in items_data:
487 item_data.item.children = []
488 self.dispatch({'items_data': items_data, 'node': node, 'pep': pep, 'recipient': recipient},
489 '//event/pubsub/notify')
490
491 def getNotifications(self, node, items_data):
492 """Build a list of subscriber to the node
493
494 subscribers will be associated with subscribed items,
495 and subscription type.
496 """
497
498 def toNotifications(subscriptions, items_data):
499 subsBySubscriber = {}
500 for subscription in subscriptions:
501 if subscription.options.get('pubsub#subscription_type',
502 'items') == 'items':
503 subs = subsBySubscriber.setdefault(subscription.subscriber,
504 set())
505 subs.add(subscription)
506
507 notifications = [(subscriber, subscriptions_, items_data)
508 for subscriber, subscriptions_
509 in subsBySubscriber.iteritems()]
510
511 return notifications
512
513 def rootNotFound(failure):
514 failure.trap(error.NodeNotFound)
515 return []
516
517 d1 = node.getSubscriptions('subscribed')
518 # FIXME: must add root node subscriptions ?
519 # d2 = self.storage.getNode('', False) # FIXME: to check
520 # d2.addCallback(lambda node: node.getSubscriptions('subscribed'))
521 # d2.addErrback(rootNotFound)
522 # d = defer.gatherResults([d1, d2])
523 # d.addCallback(lambda result: result[0] + result[1])
524 d1.addCallback(toNotifications, items_data)
525 return d1
526
527 def registerPublishNotifier(self, observerfn, *args, **kwargs):
528 self.addObserver('//event/pubsub/notify', observerfn, *args, **kwargs)
529
530 def registerRetractNotifier(self, observerfn, *args, **kwargs):
531 self.addObserver('//event/pubsub/retract', observerfn, *args, **kwargs)
532
533 def subscribe(self, nodeIdentifier, subscriber, requestor, pep, recipient):
534 subscriberEntity = subscriber.userhostJID()
535 if subscriberEntity != requestor.userhostJID():
536 return defer.fail(error.Forbidden())
537
538 d = self.storage.getNode(nodeIdentifier, pep, recipient)
539 d.addCallback(_getAffiliation, subscriberEntity)
540 d.addCallback(self._doSubscribe, subscriber, pep, recipient)
541 return d
542
543 def _doSubscribe(self, result, subscriber, pep, recipient):
544 node, affiliation = result
545
546 if affiliation == 'outcast':
547 raise error.Forbidden()
548
549 access_model = node.getAccessModel()
550
551 if access_model == const.VAL_AMODEL_OPEN:
552 d = defer.succeed(None)
553 elif access_model == const.VAL_AMODEL_PRESENCE:
554 d = self.checkPresenceSubscription(node, subscriber)
555 elif access_model == const.VAL_AMODEL_PUBLISHER_ROSTER:
556 d = self.checkRosterGroups(node, subscriber)
557 elif access_model == const.VAL_AMODEL_WHITELIST:
558 d = self.checkNodeAffiliations(node, subscriber)
559 else:
560 raise NotImplementedError
561
562 def trapExists(failure):
563 failure.trap(error.SubscriptionExists)
564 return False
565
566 def cb(sendLast):
567 d = node.getSubscription(subscriber)
568 if sendLast:
569 d.addCallback(self._sendLastPublished, node, pep, recipient)
570 return d
571
572 d.addCallback(lambda _: node.addSubscription(subscriber, 'subscribed', {}))
573 d.addCallbacks(lambda _: True, trapExists)
574 d.addCallback(cb)
575
576 return d
577
578 def _sendLastPublished(self, subscription, node, pep, recipient):
579
580 def notifyItem(items_data):
581 if items_data:
582 reactor.callLater(0, self.dispatch,
583 {'items_data': items_data,
584 'node': node,
585 'pep': pep,
586 'recipient': recipient,
587 'subscription': subscription,
588 },
589 '//event/pubsub/notify')
590
591 config = node.getConfiguration()
592 sendLastPublished = config.get('pubsub#send_last_published_item',
593 'never')
594 if sendLastPublished == 'on_sub' and node.nodeType == 'leaf':
595 entity = subscription.subscriber.userhostJID()
596 d = self.getItemsData(node.nodeIdentifier, entity, recipient, maxItems=1, ext_data={'pep': pep})
597 d.addCallback(notifyItem)
598 d.addErrback(log.err)
599
600 return subscription
601
602 def unsubscribe(self, nodeIdentifier, subscriber, requestor, pep, recipient):
603 if subscriber.userhostJID() != requestor.userhostJID():
604 return defer.fail(error.Forbidden())
605
606 d = self.storage.getNode(nodeIdentifier, pep, recipient)
607 d.addCallback(lambda node: node.removeSubscription(subscriber))
608 return d
609
610 def getSubscriptions(self, requestor, nodeIdentifier, pep, recipient):
611 """retrieve subscriptions of an entity
612
613 @param requestor(jid.JID): entity who want to check subscriptions
614 @param nodeIdentifier(unicode, None): identifier of the node
615 node to get all subscriptions of a service
616 @param pep(bool): True if it's a PEP request
617 @param recipient(jid.JID, None): recipient of the PEP request
618 """
619 return self.storage.getSubscriptions(requestor, nodeIdentifier, pep, recipient)
620
621 def supportsAutoCreate(self):
622 return True
623
624 def supportsCreatorCheck(self):
625 return True
626
627 def supportsInstantNodes(self):
628 return True
629
630 def createNode(self, nodeIdentifier, requestor, options = None, pep=False, recipient=None):
631 if not nodeIdentifier:
632 nodeIdentifier = 'generic/%s' % uuid.uuid4()
633
634 if not options:
635 options = {}
636
637 # if self.supportsCreatorCheck():
638 # groupblog = nodeIdentifier.startswith(const.NS_GROUPBLOG_PREFIX)
639 # try:
640 # nodeIdentifierJID = JID(nodeIdentifier[len(const.NS_GROUPBLOG_PREFIX):] if groupblog else nodeIdentifier)
641 # except InvalidFormat:
642 # is_user_jid = False
643 # else:
644 # is_user_jid = bool(nodeIdentifierJID.user)
645
646 # if is_user_jid and nodeIdentifierJID.userhostJID() != requestor.userhostJID():
647 # #we have an user jid node, but not created by the owner of this jid
648 # print "Wrong creator"
649 # raise error.Forbidden()
650
651 nodeType = 'leaf'
652 config = self.storage.getDefaultConfiguration(nodeType)
653 config['pubsub#node_type'] = nodeType
654 config.update(options)
655
656 # TODO: handle schema on creation
657 d = self.storage.createNode(nodeIdentifier, requestor, config, None, pep, recipient)
658 d.addCallback(lambda _: nodeIdentifier)
659 return d
660
661 def getDefaultConfiguration(self, nodeType):
662 d = defer.succeed(self.storage.getDefaultConfiguration(nodeType))
663 return d
664
665 def getNodeConfiguration(self, nodeIdentifier, pep, recipient):
666 if not nodeIdentifier:
667 return defer.fail(error.NoRootNode())
668
669 d = self.storage.getNode(nodeIdentifier, pep, recipient)
670 d.addCallback(lambda node: node.getConfiguration())
671
672 return d
673
674 def setNodeConfiguration(self, nodeIdentifier, options, requestor, pep, recipient):
675 if not nodeIdentifier:
676 return defer.fail(error.NoRootNode())
677
678 d = self.storage.getNode(nodeIdentifier, pep, recipient)
679 d.addCallback(_getAffiliation, requestor)
680 d.addCallback(self._doSetNodeConfiguration, requestor, options)
681 return d
682
683 def _doSetNodeConfiguration(self, result, requestor, options):
684 node, affiliation = result
685
686 if affiliation != 'owner' and not self.isAdmin(requestor):
687 raise error.Forbidden()
688
689 return node.setConfiguration(options)
690
691 def getNodeSchema(self, nodeIdentifier, pep, recipient):
692 if not nodeIdentifier:
693 return defer.fail(error.NoRootNode())
694
695 d = self.storage.getNode(nodeIdentifier, pep, recipient)
696 d.addCallback(lambda node: node.getSchema())
697
698 return d
699
700 def setNodeSchema(self, nodeIdentifier, schema, requestor, pep, recipient):
701 """set or remove Schema of a node
702
703 @param nodeIdentifier(unicode): identifier of the pubusb node
704 @param schema(domish.Element, None): schema to set
705 None to remove schema
706 @param requestor(jid.JID): entity doing the request
707 @param pep(bool): True if it's a PEP request
708 @param recipient(jid.JID, None): recipient of the PEP request
709 """
710 if not nodeIdentifier:
711 return defer.fail(error.NoRootNode())
712
713 d = self.storage.getNode(nodeIdentifier, pep, recipient)
714 d.addCallback(_getAffiliation, requestor)
715 d.addCallback(self._doSetNodeSchema, requestor, schema)
716 return d
717
718 def _doSetNodeSchema(self, result, requestor, schema):
719 node, affiliation = result
720
721 if affiliation != 'owner' and not self.isAdmin(requestor):
722 raise error.Forbidden()
723
724 return node.setSchema(schema)
725
726 def getAffiliations(self, entity, nodeIdentifier, pep, recipient):
727 return self.storage.getAffiliations(entity, nodeIdentifier, pep, recipient)
728
729 def getAffiliationsOwner(self, nodeIdentifier, requestor, pep, recipient):
730 d = self.storage.getNode(nodeIdentifier, pep, recipient)
731 d.addCallback(_getAffiliation, requestor)
732 d.addCallback(self._doGetAffiliationsOwner, requestor)
733 return d
734
735 def _doGetAffiliationsOwner(self, result, requestor):
736 node, affiliation = result
737
738 if affiliation != 'owner' and not self.isAdmin(requestor):
739 raise error.Forbidden()
740 return node.getAffiliations()
741
742 def setAffiliationsOwner(self, nodeIdentifier, requestor, affiliations, pep, recipient):
743 d = self.storage.getNode(nodeIdentifier, pep, recipient)
744 d.addCallback(_getAffiliation, requestor)
745 d.addCallback(self._doSetAffiliationsOwner, requestor, affiliations)
746 return d
747
748 def _doSetAffiliationsOwner(self, result, requestor, affiliations):
749 # Check that requestor is allowed to set affiliations, and delete entities
750 # with "none" affiliation
751
752 # TODO: return error with failed affiliations in case of failure
753 node, requestor_affiliation = result
754
755 if requestor_affiliation != 'owner' and not self.isAdmin(requestor):
756 raise error.Forbidden()
757
758 # we don't allow requestor to change its own affiliation
759 requestor_bare = requestor.userhostJID()
760 if requestor_bare in affiliations and affiliations[requestor_bare] != 'owner':
761 # FIXME: it may be interesting to allow the owner to ask for ownership removal
762 # if at least one other entity is owner for this node
763 raise error.Forbidden("You can't change your own affiliation")
764
765 to_delete = [jid_ for jid_, affiliation in affiliations.iteritems() if affiliation == 'none']
766 for jid_ in to_delete:
767 del affiliations[jid_]
768
769 if to_delete:
770 d = node.deleteAffiliations(to_delete)
771 if affiliations:
772 d.addCallback(lambda dummy: node.setAffiliations(affiliations))
773 else:
774 d = node.setAffiliations(affiliations)
775
776 return d
777
778 def getSubscriptionsOwner(self, nodeIdentifier, requestor, pep, recipient):
779 d = self.storage.getNode(nodeIdentifier, pep, recipient)
780 d.addCallback(_getAffiliation, requestor)
781 d.addCallback(self._doGetSubscriptionsOwner, requestor)
782 return d
783
784 def _doGetSubscriptionsOwner(self, result, requestor):
785 node, affiliation = result
786
787 if affiliation != 'owner' and not self.isAdmin(requestor):
788 raise error.Forbidden()
789 return node.getSubscriptions()
790
791 def setSubscriptionsOwner(self, nodeIdentifier, requestor, subscriptions, pep, recipient):
792 d = self.storage.getNode(nodeIdentifier, pep, recipient)
793 d.addCallback(_getAffiliation, requestor)
794 d.addCallback(self._doSetSubscriptionsOwner, requestor, subscriptions)
795 return d
796
797 def unwrapFirstError(self, failure):
798 failure.trap(defer.FirstError)
799 return failure.value.subFailure
800
801 def _doSetSubscriptionsOwner(self, result, requestor, subscriptions):
802 # Check that requestor is allowed to set subscriptions, and delete entities
803 # with "none" subscription
804
805 # TODO: return error with failed subscriptions in case of failure
806 node, requestor_affiliation = result
807
808 if requestor_affiliation != 'owner' and not self.isAdmin(requestor):
809 raise error.Forbidden()
810
811 d_list = []
812
813 for subscription in subscriptions.copy():
814 if subscription.state == 'none':
815 subscriptions.remove(subscription)
816 d_list.append(node.removeSubscription(subscription.subscriber))
817
818 if subscriptions:
819 d_list.append(node.setSubscriptions(subscriptions))
820
821 d = defer.gatherResults(d_list, consumeErrors=True)
822 d.addCallback(lambda _: None)
823 d.addErrback(self.unwrapFirstError)
824 return d
825
826 def filterItemsWithSchema(self, items_data, schema, owner):
827 """check schema restriction and remove fields/items if they don't comply
828
829 @param items_data(list[ItemData]): items to filter
830 items in this list will be modified
831 @param schema(domish.Element): node schema
832 @param owner(bool): True is requestor is a owner of the node
833 """
834 fields_to_remove = set()
835 for field_elt in schema.elements(data_form.NS_X_DATA, 'field'):
836 for restrict_elt in field_elt.elements(const.NS_SCHEMA_RESTRICT, 'restrict'):
837 read_restriction = restrict_elt.attributes.get('read')
838 if read_restriction is not None:
839 if read_restriction == 'owner':
840 if not owner:
841 fields_to_remove.add(field_elt['var'])
842 else:
843 raise StanzaError('feature-not-implemented', text='unknown read restriction {}'.format(read_restriction))
844 items_to_remove = []
845 for idx, item_data in enumerate(items_data):
846 item_elt = item_data.item
847 try:
848 x_elt = next(item_elt.elements(data_form.NS_X_DATA, 'x'))
849 except StopIteration:
850 log.msg("WARNING, item {id} has a schema but no form, ignoring it")
851 items_to_remove.append(item_data)
852 continue
853 form = data_form.Form.fromElement(x_elt)
854 # we remove fields which are not visible for this user
855 for field in fields_to_remove:
856 try:
857 form.removeField(form.fields[field])
858 except KeyError:
859 continue
860 item_elt.children.remove(x_elt)
861 item_elt.addChild(form.toElement())
862
863 for item_data in items_to_remove:
864 items_data.remove(item_data)
865
866 def checkPresenceSubscription(self, node, requestor):
867 """check if requestor has presence subscription from node owner
868
869 @param node(Node): node to check
870 @param requestor(jid.JID): entity who want to access node
871 """
872 def gotRoster(roster):
873 if roster is None:
874 raise error.Forbidden()
875
876 if requestor not in roster:
877 raise error.Forbidden()
878
879 if not roster[requestor].subscriptionFrom:
880 raise error.Forbidden()
881
882 d = self.getOwnerRoster(node)
883 d.addCallback(gotRoster)
884 return d
885
886 @defer.inlineCallbacks
887 def checkRosterGroups(self, node, requestor):
888 """check if requestor is in allowed groups of a node
889
890 @param node(Node): node to check
891 @param requestor(jid.JID): entity who want to access node
892 """
893 roster = yield self.getOwnerRoster(node)
894
895 if roster is None:
896 raise error.Forbidden()
897
898 if requestor not in roster:
899 raise error.Forbidden()
900
901 authorized_groups = yield node.getAuthorizedGroups()
902
903 if not roster[requestor].groups.intersection(authorized_groups):
904 # requestor is in roster but not in one of the allowed groups
905 raise error.Forbidden()
906
907 def checkNodeAffiliations(self, node, requestor):
908 """check if requestor is in white list of a node
909
910 @param node(Node): node to check
911 @param requestor(jid.JID): entity who want to access node
912 """
913 def gotAffiliations(affiliations):
914 try:
915 affiliation = affiliations[requestor.userhostJID()]
916 except KeyError:
917 raise error.Forbidden()
918 else:
919 if affiliation not in ('owner', 'publisher', 'member'):
920 raise error.Forbidden()
921
922 d = node.getAffiliations()
923 d.addCallback(gotAffiliations)
924 return d
925
926 @defer.inlineCallbacks
927 def checkNodeAccess(self, node, requestor):
928 """check if a requestor can access data of a node
929
930 @param node(Node): node to check
931 @param requestor(jid.JID): entity who want to access node
932 @return (tuple): permissions data with:
933 - owner(bool): True if requestor is owner of the node
934 - roster(None, ): roster of the requestor
935 None if not needed/available
936 - access_model(str): access model of the node
937 @raise error.Forbidden: access is not granted
938 @raise error.NotLeafNodeError: this node is not a leaf
939 """
940 node, affiliation = yield _getAffiliation(node, requestor)
941
942 if not iidavoll.ILeafNode.providedBy(node):
943 raise error.NotLeafNodeError()
944
945 if affiliation == 'outcast':
946 raise error.Forbidden()
947
948 # node access check
949 owner = affiliation == 'owner'
950 access_model = node.getAccessModel()
951 roster = None
952
953 if access_model == const.VAL_AMODEL_OPEN or owner:
954 pass
955 elif access_model == const.VAL_AMODEL_PRESENCE:
956 yield self.checkPresenceSubscription(node, requestor)
957 elif access_model == const.VAL_AMODEL_PUBLISHER_ROSTER:
958 # FIXME: for node, access should be renamed owner-roster, not publisher
959 yield self.checkRosterGroups(node, requestor)
960 elif access_model == const.VAL_AMODEL_WHITELIST:
961 yield self.checkNodeAffiliations(node, requestor)
962 else:
963 raise Exception(u"Unknown access_model")
964
965 defer.returnValue((affiliation, owner, roster, access_model))
966
967 @defer.inlineCallbacks
968 def getItemsIds(self, nodeIdentifier, requestor, authorized_groups, unrestricted, maxItems=None, ext_data=None, pep=False, recipient=None):
969 # FIXME: items access model are not checked
970 # TODO: check items access model
971 node = yield self.storage.getNode(nodeIdentifier, pep, recipient)
972 affiliation, owner, roster, access_model = yield self.checkNodeAccess(node, requestor)
973 ids = yield node.getItemsIds(authorized_groups,
974 unrestricted,
975 maxItems,
976 ext_data)
977 defer.returnValue(ids)
978
979 def getItems(self, nodeIdentifier, requestor, recipient, maxItems=None,
980 itemIdentifiers=None, ext_data=None):
981 d = self.getItemsData(nodeIdentifier, requestor, recipient, maxItems, itemIdentifiers, ext_data)
982 d.addCallback(lambda items_data: [item_data.item for item_data in items_data])
983 return d
984
985 @defer.inlineCallbacks
986 def getOwnerRoster(self, node, owners=None):
987 # FIXME: roster of publisher, not owner, must be used
988 if owners is None:
989 owners = yield node.getOwners()
990
991 if len(owners) != 1:
992 log.msg('publisher-roster access is not allowed with more than 1 owner')
993 return
994
995 owner_jid = owners[0]
996
997 try:
998 roster = yield self.privilege.getRoster(owner_jid)
999 except Exception as e:
1000 log.msg("Error while getting roster of {owner_jid}: {msg}".format(
1001 owner_jid = owner_jid.full(),
1002 msg = e))
1003 return
1004 defer.returnValue(roster)
1005
1006 @defer.inlineCallbacks
1007 def getItemsData(self, nodeIdentifier, requestor, recipient, maxItems=None,
1008 itemIdentifiers=None, ext_data=None):
1009 """like getItems but return the whole ItemData"""
1010 if maxItems == 0:
1011 log.msg("WARNING: maxItems=0 on items retrieval")
1012 defer.returnValue([])
1013
1014 if ext_data is None:
1015 ext_data = {}
1016 node = yield self.storage.getNode(nodeIdentifier, ext_data.get('pep', False), recipient)
1017 try:
1018 affiliation, owner, roster, access_model = yield self.checkNodeAccess(node, requestor)
1019 except error.NotLeafNodeError:
1020 defer.returnValue([])
1021
1022 # at this point node access is checked
1023
1024 if owner:
1025 # requestor_groups is only used in restricted access
1026 requestor_groups = None
1027 else:
1028 if roster is None:
1029 # FIXME: publisher roster should be used, not owner
1030 roster = yield self.getOwnerRoster(node)
1031 if roster is None:
1032 roster = {}
1033 roster_item = roster.get(requestor.userhostJID())
1034 requestor_groups = tuple(roster_item.groups) if roster_item else tuple()
1035
1036 if itemIdentifiers:
1037 items_data = yield node.getItemsById(requestor_groups, owner, itemIdentifiers)
1038 else:
1039 items_data = yield node.getItems(requestor_groups, owner, maxItems, ext_data)
1040
1041 if owner:
1042 # Add item config data form to items with roster access model
1043 for item_data in items_data:
1044 if item_data.access_model == const.VAL_AMODEL_OPEN:
1045 pass
1046 elif item_data.access_model == const.VAL_AMODEL_PUBLISHER_ROSTER:
1047 form = data_form.Form('submit', formNamespace=const.NS_ITEM_CONFIG)
1048 access = data_form.Field(None, const.OPT_ACCESS_MODEL, value=const.VAL_AMODEL_PUBLISHER_ROSTER)
1049 allowed = data_form.Field(None, const.OPT_ROSTER_GROUPS_ALLOWED, values=item_data.config[const.OPT_ROSTER_GROUPS_ALLOWED])
1050 form.addField(access)
1051 form.addField(allowed)
1052 item_data.item.addChild(form.toElement())
1053 elif access_model == const.VAL_AMODEL_WHITELIST:
1054 #FIXME
1055 raise NotImplementedError
1056 else:
1057 raise error.BadAccessTypeError(access_model)
1058
1059 schema = node.getSchema()
1060 if schema is not None:
1061 self.filterItemsWithSchema(items_data, schema, owner)
1062
1063 yield self._items_rsm(items_data, node, requestor_groups, owner, itemIdentifiers, ext_data)
1064 defer.returnValue(items_data)
1065
1066 def _setCount(self, value, response):
1067 response.count = value
1068
1069 def _setIndex(self, value, response, adjust):
1070 """Set index in RSM response
1071
1072 @param value(int): value of the reference index (i.e. before or after item)
1073 @param response(RSMResponse): response instance to fill
1074 @param adjust(int): adjustement term (i.e. difference between reference index and first item of the result)
1075 """
1076 response.index = value + adjust
1077
1078 def _items_rsm(self, items_data, node, authorized_groups, owner,
1079 itemIdentifiers, ext_data):
1080 # FIXME: move this to a separate module
1081 # TODO: Index can be optimized by keeping a cache of the last RSM request
1082 # An other optimisation would be to look for index first and use it as offset
1083 try:
1084 rsm_request = ext_data['rsm']
1085 except KeyError:
1086 # No RSM in this request, nothing to do
1087 return items_data
1088
1089 if itemIdentifiers:
1090 log.msg("WARNING, itemIdentifiers used with RSM, ignoring the RSM part")
1091 return items_data
1092
1093 response = rsm.RSMResponse()
1094
1095 d_count = node.getItemsCount(authorized_groups, owner, ext_data)
1096 d_count.addCallback(self._setCount, response)
1097 d_list = [d_count]
1098
1099 if items_data:
1100 response.first = items_data[0].item['id']
1101 response.last = items_data[-1].item['id']
1102
1103 # index handling
1104 if rsm_request.index is not None:
1105 response.index = rsm_request.index
1106 elif rsm_request.before:
1107 # The last page case (before == '') is managed in render method
1108 d_index = node.getItemsIndex(rsm_request.before, authorized_groups, owner, ext_data)
1109 d_index.addCallback(self._setIndex, response, -len(items_data))
1110 d_list.append(d_index)
1111 elif rsm_request.after is not None:
1112 d_index = node.getItemsIndex(rsm_request.after, authorized_groups, owner, ext_data)
1113 d_index.addCallback(self._setIndex, response, 1)
1114 d_list.append(d_index)
1115 else:
1116 # the first page was requested
1117 response.index = 0
1118
1119 def render(result):
1120 if rsm_request.before == '':
1121 # the last page was requested
1122 response.index = response.count - len(items_data)
1123 items_data.append(container.ItemData(response.toElement()))
1124 return items_data
1125
1126 return defer.DeferredList(d_list).addCallback(render)
1127
1128 def retractItem(self, nodeIdentifier, itemIdentifiers, requestor, notify, pep, recipient):
1129 d = self.storage.getNode(nodeIdentifier, pep, recipient)
1130 d.addCallback(_getAffiliation, requestor)
1131 d.addCallback(self._doRetract, itemIdentifiers, requestor, notify, pep, recipient)
1132 return d
1133
1134 def _doRetract(self, result, itemIdentifiers, requestor, notify, pep, recipient):
1135 node, affiliation = result
1136 persistItems = node.getConfiguration()[const.OPT_PERSIST_ITEMS]
1137
1138 if not persistItems:
1139 raise error.NodeNotPersistent()
1140
1141 # we need to get the items before removing them, for the notifications
1142
1143 def removeItems(items_data):
1144 """Remove the items and keep only actually removed ones in items_data"""
1145 d = node.removeItems(itemIdentifiers)
1146 d.addCallback(lambda removed: [item_data for item_data in items_data if item_data.item["id"] in removed])
1147 return d
1148
1149 def checkPublishers(publishers_map):
1150 """Called when requestor is neither owner neither publisher of the Node
1151
1152 We check that requestor is publisher of all the items he wants to retract
1153 and raise error.Forbidden if it is not the case
1154 """
1155 # TODO: the behaviour should be configurable (per node ?)
1156 if (any((requestor.userhostJID() != publisher.userhostJID()
1157 for publisher in publishers_map.itervalues()))
1158 and not self.isAdmin(requestor)
1159 ):
1160 raise error.Forbidden()
1161
1162 if affiliation in ['owner', 'publisher']:
1163 # the requestor is owner or publisher of the node
1164 # he can retract what he wants
1165 d = defer.succeed(None)
1166 else:
1167 # the requestor doesn't have right to retract on the whole node
1168 # we check if he is a publisher for all items he wants to retract
1169 # and forbid the retraction else.
1170 d = node.getItemsPublishers(itemIdentifiers)
1171 d.addCallback(checkPublishers)
1172 d.addCallback(lambda dummy: node.getItemsById(None, True, itemIdentifiers))
1173 d.addCallback(removeItems)
1174
1175 if notify:
1176 d.addCallback(self._doNotifyRetraction, node, pep, recipient)
1177 return d
1178
1179 def _doNotifyRetraction(self, items_data, node, pep, recipient):
1180 self.dispatch({'items_data': items_data,
1181 'node': node,
1182 'pep': pep,
1183 'recipient': recipient},
1184 '//event/pubsub/retract')
1185
1186 def purgeNode(self, nodeIdentifier, requestor, pep, recipient):
1187 d = self.storage.getNode(nodeIdentifier, pep, recipient)
1188 d.addCallback(_getAffiliation, requestor)
1189 d.addCallback(self._doPurge, requestor)
1190 return d
1191
1192 def _doPurge(self, result, requestor):
1193 node, affiliation = result
1194 persistItems = node.getConfiguration()[const.OPT_PERSIST_ITEMS]
1195
1196 if affiliation != 'owner' and not self.isAdmin(requestor):
1197 raise error.Forbidden()
1198
1199 if not persistItems:
1200 raise error.NodeNotPersistent()
1201
1202 d = node.purge()
1203 d.addCallback(self._doNotifyPurge, node.nodeIdentifier)
1204 return d
1205
1206 def _doNotifyPurge(self, result, nodeIdentifier):
1207 self.dispatch(nodeIdentifier, '//event/pubsub/purge')
1208
1209 def registerPreDelete(self, preDeleteFn):
1210 self._callbackList.append(preDeleteFn)
1211
1212 def getSubscribers(self, nodeIdentifier, pep, recipient):
1213 def cb(subscriptions):
1214 return [subscription.subscriber for subscription in subscriptions]
1215
1216 d = self.storage.getNode(nodeIdentifier, pep, recipient)
1217 d.addCallback(lambda node: node.getSubscriptions('subscribed'))
1218 d.addCallback(cb)
1219 return d
1220
1221 def deleteNode(self, nodeIdentifier, requestor, pep, recipient, redirectURI=None):
1222 d = self.storage.getNode(nodeIdentifier, pep, recipient)
1223 d.addCallback(_getAffiliation, requestor)
1224 d.addCallback(self._doPreDelete, requestor, redirectURI, pep, recipient)
1225 return d
1226
1227 def _doPreDelete(self, result, requestor, redirectURI, pep, recipient):
1228 node, affiliation = result
1229
1230 if affiliation != 'owner' and not self.isAdmin(requestor):
1231 raise error.Forbidden()
1232
1233 data = {'node': node,
1234 'redirectURI': redirectURI}
1235
1236 d = defer.DeferredList([cb(data, pep, recipient)
1237 for cb in self._callbackList],
1238 consumeErrors=1)
1239 d.addCallback(self._doDelete, node.nodeDbId)
1240
1241 def _doDelete(self, result, nodeDbId):
1242 dl = []
1243 for succeeded, r in result:
1244 if succeeded and r:
1245 dl.extend(r)
1246
1247 d = self.storage.deleteNodeByDbId(nodeDbId)
1248 d.addCallback(self._doNotifyDelete, dl)
1249
1250 return d
1251
1252 def _doNotifyDelete(self, result, dl):
1253 for d in dl:
1254 d.callback(None)
1255
1256
1257 class PubSubResourceFromBackend(pubsub.PubSubResource):
1258 """
1259 Adapts a backend to an xmpp publish-subscribe service.
1260 """
1261
1262 features = [
1263 "config-node",
1264 "create-nodes",
1265 "delete-any",
1266 "delete-nodes",
1267 "item-ids",
1268 "meta-data",
1269 "publish",
1270 "purge-nodes",
1271 "retract-items",
1272 "retrieve-affiliations",
1273 "retrieve-default",
1274 "retrieve-items",
1275 "retrieve-subscriptions",
1276 "subscribe",
1277 ]
1278
1279 discoIdentity = disco.DiscoIdentity('pubsub',
1280 'service',
1281 u'Salut à Toi pubsub service')
1282
1283 pubsubService = None
1284
1285 _errorMap = {
1286 error.NodeNotFound: ('item-not-found', None, None),
1287 error.NodeExists: ('conflict', None, None),
1288 error.Forbidden: ('forbidden', None, None),
1289 error.NotAuthorized: ('not-authorized', None, None),
1290 error.ItemNotFound: ('item-not-found', None, None),
1291 error.ItemForbidden: ('bad-request', 'item-forbidden', None),
1292 error.ItemRequired: ('bad-request', 'item-required', None),
1293 error.NoInstantNodes: ('not-acceptable',
1294 'unsupported',
1295 'instant-nodes'),
1296 error.NotSubscribed: ('unexpected-request', 'not-subscribed', None),
1297 error.InvalidConfigurationOption: ('not-acceptable', None, None),
1298 error.InvalidConfigurationValue: ('not-acceptable', None, None),
1299 error.NodeNotPersistent: ('feature-not-implemented',
1300 'unsupported',
1301 'persistent-node'),
1302 error.NoRootNode: ('bad-request', None, None),
1303 error.NoCollections: ('feature-not-implemented',
1304 'unsupported',
1305 'collections'),
1306 error.NoPublishing: ('feature-not-implemented',
1307 'unsupported',
1308 'publish'),
1309 }
1310
1311 def __init__(self, backend):
1312 pubsub.PubSubResource.__init__(self)
1313
1314 self.backend = backend
1315 self.hideNodes = False
1316
1317 self.backend.registerPublishNotifier(self._notifyPublish)
1318 self.backend.registerRetractNotifier(self._notifyRetract)
1319 self.backend.registerPreDelete(self._preDelete)
1320
1321 # FIXME: to be removed, it's not useful anymore as PEP is now used
1322 # if self.backend.supportsCreatorCheck():
1323 # self.features.append("creator-jid-check") #SàT custom feature: Check that a node (which correspond to
1324 # a jid in this server) is created by the right jid
1325
1326 if self.backend.supportsAutoCreate():
1327 self.features.append("auto-create")
1328
1329 if self.backend.supportsPublishOptions():
1330 self.features.append("publish-options")
1331
1332 if self.backend.supportsInstantNodes():
1333 self.features.append("instant-nodes")
1334
1335 if self.backend.supportsOutcastAffiliation():
1336 self.features.append("outcast-affiliation")
1337
1338 if self.backend.supportsPersistentItems():
1339 self.features.append("persistent-items")
1340
1341 if self.backend.supportsPublisherAffiliation():
1342 self.features.append("publisher-affiliation")
1343
1344 if self.backend.supportsGroupBlog():
1345 self.features.append("groupblog")
1346
1347
1348 # if self.backend.supportsPublishModel(): #XXX: this feature is not really described in XEP-0060, we just can see it in examples
1349 # self.features.append("publish_model") # but it's necessary for microblogging comments (see XEP-0277)
1350
1351 def getFullItem(self, item_data):
1352 """ Attach item configuration to this item
1353
1354 Used to give item configuration back to node's owner (and *only* to owner)
1355 """
1356 # TODO: a test should check that only the owner get the item configuration back
1357
1358 item, item_config = item_data.item, item_data.config
1359 if item_config:
1360 new_item = elementCopy(item)
1361 new_item.addChild(item_config.toElement())
1362 return new_item
1363 else:
1364 return item
1365
1366 @defer.inlineCallbacks
1367 def _notifyPublish(self, data):
1368 items_data = data['items_data']
1369 node = data['node']
1370 pep = data['pep']
1371 recipient = data['recipient']
1372
1373 owners, notifications_filtered = yield self._prepareNotify(items_data, node, data.get('subscription'), pep, recipient)
1374
1375 # we notify the owners
1376 # FIXME: check if this comply with XEP-0060 (option needed ?)
1377 # TODO: item's access model have to be sent back to owner
1378 # TODO: same thing for getItems
1379
1380 for owner_jid in owners:
1381 notifications_filtered.append(
1382 (owner_jid,
1383 {pubsub.Subscription(node.nodeIdentifier,
1384 owner_jid,
1385 'subscribed')},
1386 [self.getFullItem(item_data) for item_data in items_data]))
1387
1388 if pep:
1389 defer.returnValue(self.backend.privilege.notifyPublish(
1390 recipient,
1391 node.nodeIdentifier,
1392 notifications_filtered))
1393
1394 else:
1395 defer.returnValue(self.pubsubService.notifyPublish(
1396 self.serviceJID,
1397 node.nodeIdentifier,
1398 notifications_filtered))
1399
1400 def _notifyRetract(self, data):
1401 items_data = data['items_data']
1402 node = data['node']
1403 pep = data['pep']
1404 recipient = data['recipient']
1405
1406 def afterPrepare(result):
1407 owners, notifications_filtered = result
1408 #we add the owners
1409
1410 for owner_jid in owners:
1411 notifications_filtered.append(
1412 (owner_jid,
1413 {pubsub.Subscription(node.nodeIdentifier,
1414 owner_jid,
1415 'subscribed')},
1416 [item_data.item for item_data in items_data]))
1417
1418 if pep:
1419 return self.backend.privilege.notifyRetract(
1420 recipient,
1421 node.nodeIdentifier,
1422 notifications_filtered)
1423
1424 else:
1425 return self.pubsubService.notifyRetract(
1426 self.serviceJID,
1427 node.nodeIdentifier,
1428 notifications_filtered)
1429
1430 d = self._prepareNotify(items_data, node, data.get('subscription'), pep, recipient)
1431 d.addCallback(afterPrepare)
1432 return d
1433
1434 @defer.inlineCallbacks
1435 def _prepareNotify(self, items_data, node, subscription=None, pep=None, recipient=None):
1436 """Do a bunch of permissions check and filter notifications
1437
1438 The owner is not added to these notifications,
1439 it must be added by the calling method
1440 @param items_data(tuple): must contain:
1441 - item (domish.Element)
1442 - access_model (unicode)
1443 - access_list (dict as returned getItemsById, or item_config)
1444 @param node(LeafNode): node hosting the items
1445 @param subscription(pubsub.Subscription, None): TODO
1446
1447 @return (tuple): will contain:
1448 - notifications_filtered
1449 - node_owner_jid
1450 - items_data
1451 """
1452 if subscription is None:
1453 notifications = yield self.backend.getNotifications(node, items_data)
1454 else:
1455 notifications = [(subscription.subscriber, [subscription], items_data)]
1456
1457 if pep and node.getConfiguration()[const.OPT_ACCESS_MODEL] in ('open', 'presence'):
1458 # for PEP we need to manage automatic subscriptions (cf. XEP-0163 §4)
1459 explicit_subscribers = {subscriber for subscriber, _, _ in notifications}
1460 auto_subscribers = yield self.backend.privilege.getAutoSubscribers(recipient, node.nodeIdentifier, explicit_subscribers)
1461 for sub_jid in auto_subscribers:
1462 sub = pubsub.Subscription(node.nodeIdentifier, sub_jid, 'subscribed')
1463 notifications.append((sub_jid, [sub], items_data))
1464
1465 owners = yield node.getOwners()
1466 owner_roster = None
1467
1468 # now we check access of subscriber for each item, and keep only allowed ones
1469
1470 #we filter items not allowed for the subscribers
1471 notifications_filtered = []
1472 schema = node.getSchema()
1473
1474 for subscriber, subscriptions, items_data in notifications:
1475 subscriber_bare = subscriber.userhostJID()
1476 if subscriber_bare in owners:
1477 # as notification is always sent to owner,
1478 # we ignore owner if he is here
1479 continue
1480 allowed_items = [] #we keep only item which subscriber can access
1481
1482 if schema is not None:
1483 # we have to copy items_data because different subscribers may receive
1484 # different items (e.g. read restriction in schema)
1485 items_data = [itemDataCopy(item_data) for item_data in items_data]
1486 self.backend.filterItemsWithSchema(items_data, schema, False)
1487
1488 for item_data in items_data:
1489 item, access_model = item_data.item, item_data.access_model
1490 access_list = item_data.config
1491 if access_model == const.VAL_AMODEL_OPEN:
1492 allowed_items.append(item)
1493 elif access_model == const.VAL_AMODEL_PUBLISHER_ROSTER:
1494 if owner_roster is None:
1495 # FIXME: publisher roster should be used, not owner
1496 owner_roster= yield self.backend.getOwnerRoster(node, owners)
1497 if owner_roster is None:
1498 owner_roster = {}
1499 if not subscriber_bare in owner_roster:
1500 continue
1501 #the subscriber is known, is he in the right group ?
1502 authorized_groups = access_list[const.OPT_ROSTER_GROUPS_ALLOWED]
1503 if owner_roster[subscriber_bare].groups.intersection(authorized_groups):
1504 allowed_items.append(item)
1505 else: #unknown access_model
1506 # TODO: white list access
1507 raise NotImplementedError
1508
1509 if allowed_items:
1510 notifications_filtered.append((subscriber, subscriptions, allowed_items))
1511
1512 defer.returnValue((owners, notifications_filtered))
1513
1514 def _preDelete(self, data, pep, recipient):
1515 nodeIdentifier = data['node'].nodeIdentifier
1516 redirectURI = data.get('redirectURI', None)
1517 d = self.backend.getSubscribers(nodeIdentifier, pep, recipient)
1518 d.addCallback(lambda subscribers: self.pubsubService.notifyDelete(
1519 self.serviceJID,
1520 nodeIdentifier,
1521 subscribers,
1522 redirectURI))
1523 return d
1524
1525 def _mapErrors(self, failure):
1526 e = failure.trap(*self._errorMap.keys())
1527
1528 condition, pubsubCondition, feature = self._errorMap[e]
1529 msg = failure.value.msg
1530
1531 if pubsubCondition:
1532 exc = pubsub.PubSubError(condition, pubsubCondition, feature, msg)
1533 else:
1534 exc = StanzaError(condition, text=msg)
1535
1536 raise exc
1537
1538 def getInfo(self, requestor, service, nodeIdentifier, pep=None, recipient=None):
1539 return [] # FIXME: disabled for now, need to manage PEP
1540 if not requestor.resource:
1541 # this avoid error when getting a disco request from server during namespace delegation
1542 return []
1543 info = {}
1544
1545 def saveType(result):
1546 info['type'] = result
1547 return nodeIdentifier
1548
1549 def saveMetaData(result):
1550 info['meta-data'] = result
1551 return info
1552
1553 def trapNotFound(failure):
1554 failure.trap(error.NodeNotFound)
1555 return info
1556
1557 d = defer.succeed(nodeIdentifier)
1558 d.addCallback(self.backend.getNodeType)
1559 d.addCallback(saveType)
1560 d.addCallback(self.backend.getNodeMetaData)
1561 d.addCallback(saveMetaData)
1562 d.addErrback(trapNotFound)
1563 d.addErrback(self._mapErrors)
1564 return d
1565
1566 def getNodes(self, requestor, service, nodeIdentifier):
1567 """return nodes for disco#items
1568
1569 Pubsub/PEP nodes will be returned if disco node is not specified
1570 else Pubsub/PEP items will be returned
1571 (according to what requestor can access)
1572 """
1573 try:
1574 pep = service.pep
1575 except AttributeError:
1576 pep = False
1577
1578 if service.resource:
1579 return defer.succeed([])
1580
1581 if nodeIdentifier:
1582 d = self.backend.getItemsIds(nodeIdentifier,
1583 requestor,
1584 [],
1585 requestor.userhostJID() == service,
1586 None,
1587 None,
1588 pep,
1589 service)
1590 # items must be set as name, not node
1591 d.addCallback(lambda items: [(None, item) for item in items])
1592
1593 else:
1594 d = self.backend.getNodes(requestor.userhostJID(),
1595 pep,
1596 service)
1597 return d.addErrback(self._mapErrors)
1598
1599 def getConfigurationOptions(self):
1600 return self.backend.nodeOptions
1601
1602 def _publish_errb(self, failure, request):
1603 if failure.type == error.NodeNotFound and self.backend.supportsAutoCreate():
1604 print "Auto-creating node %s" % (request.nodeIdentifier,)
1605 d = self.backend.createNode(request.nodeIdentifier,
1606 request.sender,
1607 pep=self._isPep(request),
1608 recipient=request.recipient)
1609 d.addCallback(lambda ignore,
1610 request: self.backend.publish(request.nodeIdentifier,
1611 request.items,
1612 request.sender,
1613 self._isPep(request),
1614 request.recipient,
1615 ),
1616 request)
1617 return d
1618
1619 return failure
1620
1621 def _isPep(self, request):
1622 try:
1623 return request.delegated
1624 except AttributeError:
1625 return False
1626
1627 def publish(self, request):
1628 d = self.backend.publish(request.nodeIdentifier,
1629 request.items,
1630 request.sender,
1631 self._isPep(request),
1632 request.recipient)
1633 d.addErrback(self._publish_errb, request)
1634 return d.addErrback(self._mapErrors)
1635
1636 def subscribe(self, request):
1637 d = self.backend.subscribe(request.nodeIdentifier,
1638 request.subscriber,
1639 request.sender,
1640 self._isPep(request),
1641 request.recipient)
1642 return d.addErrback(self._mapErrors)
1643
1644 def unsubscribe(self, request):
1645 d = self.backend.unsubscribe(request.nodeIdentifier,
1646 request.subscriber,
1647 request.sender,
1648 self._isPep(request),
1649 request.recipient)
1650 return d.addErrback(self._mapErrors)
1651
1652 def subscriptions(self, request):
1653 d = self.backend.getSubscriptions(request.sender,
1654 request.nodeIdentifier,
1655 self._isPep(request),
1656 request.recipient)
1657 return d.addErrback(self._mapErrors)
1658
1659 def affiliations(self, request):
1660 """Retrieve affiliation for normal entity (cf. XEP-0060 §5.7)
1661
1662 retrieve all node where this jid is affiliated
1663 """
1664 d = self.backend.getAffiliations(request.sender,
1665 request.nodeIdentifier,
1666 self._isPep(request),
1667 request.recipient)
1668 return d.addErrback(self._mapErrors)
1669
1670 def create(self, request):
1671 d = self.backend.createNode(request.nodeIdentifier,
1672 request.sender, request.options,
1673 self._isPep(request),
1674 request.recipient)
1675 return d.addErrback(self._mapErrors)
1676
1677 def default(self, request):
1678 d = self.backend.getDefaultConfiguration(request.nodeType,
1679 self._isPep(request),
1680 request.sender)
1681 return d.addErrback(self._mapErrors)
1682
1683 def configureGet(self, request):
1684 d = self.backend.getNodeConfiguration(request.nodeIdentifier,
1685 self._isPep(request),
1686 request.recipient)
1687 return d.addErrback(self._mapErrors)
1688
1689 def configureSet(self, request):
1690 d = self.backend.setNodeConfiguration(request.nodeIdentifier,
1691 request.options,
1692 request.sender,
1693 self._isPep(request),
1694 request.recipient)
1695 return d.addErrback(self._mapErrors)
1696
1697 def affiliationsGet(self, request):
1698 """Retrieve affiliations for owner (cf. XEP-0060 §8.9.1)
1699
1700 retrieve all affiliations for a node
1701 """
1702 d = self.backend.getAffiliationsOwner(request.nodeIdentifier,
1703 request.sender,
1704 self._isPep(request),
1705 request.recipient)
1706 return d.addErrback(self._mapErrors)
1707
1708 def affiliationsSet(self, request):
1709 d = self.backend.setAffiliationsOwner(request.nodeIdentifier,
1710 request.sender,
1711 request.affiliations,
1712 self._isPep(request),
1713 request.recipient)
1714 return d.addErrback(self._mapErrors)
1715
1716 def subscriptionsGet(self, request):
1717 """Retrieve subscriptions for owner (cf. XEP-0060 §8.8.1)
1718
1719 retrieve all affiliations for a node
1720 """
1721 d = self.backend.getSubscriptionsOwner(request.nodeIdentifier,
1722 request.sender,
1723 self._isPep(request),
1724 request.recipient)
1725 return d.addErrback(self._mapErrors)
1726
1727 def subscriptionsSet(self, request):
1728 d = self.backend.setSubscriptionsOwner(request.nodeIdentifier,
1729 request.sender,
1730 request.subscriptions,
1731 self._isPep(request),
1732 request.recipient)
1733 return d.addErrback(self._mapErrors)
1734
1735 def items(self, request):
1736 ext_data = {}
1737 if const.FLAG_ENABLE_RSM and request.rsm is not None:
1738 if request.rsm.max < 0:
1739 raise pubsub.BadRequest(text="max can't be negative")
1740 ext_data['rsm'] = request.rsm
1741 try:
1742 ext_data['pep'] = request.delegated
1743 except AttributeError:
1744 pass
1745 ext_data['order_by'] = request.orderBy or []
1746 d = self.backend.getItems(request.nodeIdentifier,
1747 request.sender,
1748 request.recipient,
1749 request.maxItems,
1750 request.itemIdentifiers,
1751 ext_data)
1752 return d.addErrback(self._mapErrors)
1753
1754 def retract(self, request):
1755 d = self.backend.retractItem(request.nodeIdentifier,
1756 request.itemIdentifiers,
1757 request.sender,
1758 request.notify,
1759 self._isPep(request),
1760 request.recipient)
1761 return d.addErrback(self._mapErrors)
1762
1763 def purge(self, request):
1764 d = self.backend.purgeNode(request.nodeIdentifier,
1765 request.sender,
1766 self._isPep(request),
1767 request.recipient)
1768 return d.addErrback(self._mapErrors)
1769
1770 def delete(self, request):
1771 d = self.backend.deleteNode(request.nodeIdentifier,
1772 request.sender,
1773 self._isPep(request),
1774 request.recipient)
1775 return d.addErrback(self._mapErrors)
1776
1777 components.registerAdapter(PubSubResourceFromBackend,
1778 iidavoll.IBackendService,
1779 iwokkel.IPubSubResource)
1780
1781
1782
1783 class ExtraDiscoHandler(XMPPHandler):
1784 implements(iwokkel.IDisco)
1785 # see comment in twisted/plugins/pubsub.py
1786 # FIXME: upstream must be fixed so we can use custom (non pubsub#) disco features
1787
1788 def getDiscoInfo(self, requestor, service, nodeIdentifier=''):
1789 return [disco.DiscoFeature(pubsub.NS_ORDER_BY)]
1790
1791 def getDiscoItems(self, requestor, service, nodeIdentifier=''):
1792 return []