Mercurial > libervia-pubsub
comparison src/backend.py @ 369:dabee42494ac
config file + cleaning:
- SàT Pubsub can now be configured using the same config file as SàT itself (i.e. sat.conf or .sat.conf), in the same locations (/etc, local dir, xdg dir).
Its options must be in the "pubsub" section
- options on command line override config options
- removed tap and http files which are not used anymore
- changed directory structure to put source in src, to be coherent with SàT and Libervia
- changed options name, db* become db_*, secret become xmpp_pwd
- an exception is raised if jid or xmpp_pwd are not configured
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Mar 2018 12:59:38 +0100 |
parents | sat_pubsub/backend.py@618a92080812 |
children | 9a787881b824 |
comparison
equal
deleted
inserted
replaced
368:618a92080812 | 369:dabee42494ac |
---|---|
1 #!/usr/bin/python | |
2 #-*- coding: utf-8 -*- | |
3 # | |
4 # Copyright (c) 2012-2018 Jérôme Poisson | |
5 # Copyright (c) 2013-2016 Adrien Cossa | |
6 # Copyright (c) 2003-2011 Ralph Meijer | |
7 | |
8 | |
9 # This program is free software: you can redistribute it and/or modify | |
10 # it under the terms of the GNU Affero General Public License as published by | |
11 # the Free Software Foundation, either version 3 of the License, or | |
12 # (at your option) any later version. | |
13 | |
14 # This program is distributed in the hope that it will be useful, | |
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
17 # GNU Affero General Public License for more details. | |
18 | |
19 # You should have received a copy of the GNU Affero General Public License | |
20 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
21 # -- | |
22 | |
23 # This program is based on Idavoll (http://idavoll.ik.nu/), | |
24 # originaly written by Ralph Meijer (http://ralphm.net/blog/) | |
25 # It is sublicensed under AGPL v3 (or any later version) as allowed by the original | |
26 # license. | |
27 | |
28 # -- | |
29 | |
30 # Here is a copy of the original license: | |
31 | |
32 # Copyright (c) 2003-2011 Ralph Meijer | |
33 | |
34 # Permission is hereby granted, free of charge, to any person obtaining | |
35 # a copy of this software and associated documentation files (the | |
36 # "Software"), to deal in the Software without restriction, including | |
37 # without limitation the rights to use, copy, modify, merge, publish, | |
38 # distribute, sublicense, and/or sell copies of the Software, and to | |
39 # permit persons to whom the Software is furnished to do so, subject to | |
40 # the following conditions: | |
41 | |
42 # The above copyright notice and this permission notice shall be | |
43 # included in all copies or substantial portions of the Software. | |
44 | |
45 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, | |
46 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF | |
47 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND | |
48 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE | |
49 # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION | |
50 # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION | |
51 # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. | |
52 | |
53 | |
54 """ | |
55 Generic publish-subscribe backend. | |
56 | |
57 This module implements a generic publish-subscribe backend service with | |
58 business logic as per | |
59 U{XEP-0060<http://www.xmpp.org/extensions/xep-0060.html>} that interacts with | |
60 a given storage facility. It also provides an adapter from the XMPP | |
61 publish-subscribe protocol. | |
62 """ | |
63 | |
64 import uuid | |
65 | |
66 from zope.interface import implements | |
67 | |
68 from twisted.application import service | |
69 from twisted.python import components, log | |
70 from twisted.internet import defer, reactor | |
71 from twisted.words.protocols.jabber.error import StanzaError | |
72 # from twisted.words.protocols.jabber.jid import JID, InvalidFormat | |
73 from twisted.words.xish import utility | |
74 | |
75 from wokkel import disco | |
76 from wokkel import data_form | |
77 from wokkel import rsm | |
78 from wokkel import iwokkel | |
79 from wokkel import pubsub | |
80 | |
81 from sat_pubsub import error | |
82 from sat_pubsub import iidavoll | |
83 from sat_pubsub import const | |
84 from sat_pubsub import container | |
85 | |
86 from copy import deepcopy | |
87 | |
88 | |
89 def _getAffiliation(node, entity): | |
90 d = node.getAffiliation(entity) | |
91 d.addCallback(lambda affiliation: (node, affiliation)) | |
92 return d | |
93 | |
94 | |
95 class BackendService(service.Service, utility.EventDispatcher): | |
96 """ | |
97 Generic publish-subscribe backend service. | |
98 | |
99 @cvar nodeOptions: Node configuration form as a mapping from the field | |
100 name to a dictionary that holds the field's type, label | |
101 and possible options to choose from. | |
102 @type nodeOptions: C{dict}. | |
103 @cvar defaultConfig: The default node configuration. | |
104 """ | |
105 | |
106 implements(iidavoll.IBackendService) | |
107 | |
108 nodeOptions = { | |
109 const.OPT_PERSIST_ITEMS: | |
110 {"type": "boolean", | |
111 "label": "Persist items to storage"}, | |
112 const.OPT_DELIVER_PAYLOADS: | |
113 {"type": "boolean", | |
114 "label": "Deliver payloads with event notifications"}, | |
115 const.OPT_SEND_LAST_PUBLISHED_ITEM: | |
116 {"type": "list-single", | |
117 "label": "When to send the last published item", | |
118 "options": { | |
119 "never": "Never", | |
120 "on_sub": "When a new subscription is processed"} | |
121 }, | |
122 const.OPT_ACCESS_MODEL: | |
123 {"type": "list-single", | |
124 "label": "Who can subscribe to this node", | |
125 "options": { | |
126 const.VAL_AMODEL_OPEN: "Public node", | |
127 const.VAL_AMODEL_PRESENCE: "Node restricted to entites subscribed to owner presence", | |
128 const.VAL_AMODEL_PUBLISHER_ROSTER: "Node restricted to some groups of publisher's roster", | |
129 const.VAL_AMODEL_WHITELIST: "Node restricted to some jids", | |
130 } | |
131 }, | |
132 const.OPT_ROSTER_GROUPS_ALLOWED: | |
133 {"type": "list-multi", | |
134 "label": "Groups of the roster allowed to access the node", | |
135 }, | |
136 const.OPT_PUBLISH_MODEL: | |
137 {"type": "list-single", | |
138 "label": "Who can publish to this node", | |
139 "options": { | |
140 const.VAL_PMODEL_OPEN: "Everybody can publish", | |
141 const.VAL_PMODEL_PUBLISHERS: "Only owner and publishers can publish", | |
142 const.VAL_PMODEL_SUBSCRIBERS: "Everybody which subscribed to the node", | |
143 } | |
144 }, | |
145 const.OPT_SERIAL_IDS: | |
146 {"type": "boolean", | |
147 "label": "Use serial ids"}, | |
148 } | |
149 | |
150 subscriptionOptions = { | |
151 "pubsub#subscription_type": | |
152 {"type": "list-single", | |
153 "options": { | |
154 "items": "Receive notification of new items only", | |
155 "nodes": "Receive notification of new nodes only"} | |
156 }, | |
157 "pubsub#subscription_depth": | |
158 {"type": "list-single", | |
159 "options": { | |
160 "1": "Receive notification from direct child nodes only", | |
161 "all": "Receive notification from all descendent nodes"} | |
162 }, | |
163 } | |
164 | |
def __init__(self, storage):
    """Initialise the event dispatcher and keep a reference to the storage

    @param storage: storage facility the service delegates to
    """
    utility.EventDispatcher.__init__(self)
    self._callbackList = []
    self.storage = storage
169 | |
def supportsPublishOptions(self):
    """Tell whether publish options are supported (always True)"""
    return True
def supportsPublisherAffiliation(self):
    """Tell whether the "publisher" affiliation is supported (always True)"""
    return True
174 | |
def supportsGroupBlog(self):
    """Tell whether group blog is supported (always True)"""
    return True
177 | |
def supportsOutcastAffiliation(self):
    """Tell whether the "outcast" affiliation is supported (always True)"""
    return True
180 | |
def supportsPersistentItems(self):
    """Tell whether persistent items are supported (always True)"""
    return True
183 | |
def supportsPublishModel(self):
    """Tell whether the publish model option is supported (always True)"""
    return True
186 | |
def getNodeType(self, nodeIdentifier, pep, recipient=None):
    """Get the type of a node

    @param nodeIdentifier: identifier of the node to look up
    @param pep: forwarded unchanged to storage.getNode
    @param recipient: forwarded unchanged to storage.getNode
    @return: result of storage.getNode with a callback added which maps
        the node to the value of its getType() method
    """
    # FIXME: manage pep and recipient
    def extractType(node):
        return node.getType()

    node_d = self.storage.getNode(nodeIdentifier, pep, recipient)
    node_d.addCallback(extractType)
    return node_d
192 | |
193 def _getNodesIds(self, subscribed, pep, recipient): | |
194 # TODO: filter whitelist nodes | |
195 # TODO: handle publisher-roster (should probably be renamed to owner-roster for nodes) | |
196 if not subscribed: | |
197 allowed_accesses = {'open', 'whitelist'} | |
198 else: | |
199 allowed_accesses = {'open', 'presence', 'whitelist'} | |
200 return self.storage.getNodeIds(pep, recipient, allowed_accesses) | |
201 | |
def getNodes(self, requestor, pep, recipient):
    """List node identifiers

    When pep is falsy the storage is queried directly. Otherwise
    privilege.isSubscribedFrom(requestor, recipient) is called first and
    its result is passed on to _getNodesIds.

    @param requestor: entity asking for the node list
    @param pep: truthy to go through the subscription check
    @param recipient: forwarded to the privilege check and to the storage
    """
    if not pep:
        return self.storage.getNodeIds(pep, recipient)

    subscribed_d = self.privilege.isSubscribedFrom(requestor, recipient)
    subscribed_d.addCallback(self._getNodesIds, pep, recipient)
    return subscribed_d
208 | |
def getNodeMetaData(self, nodeIdentifier, pep, recipient=None):
    """Get the metadata of a node as a list of option dictionaries

    @param nodeIdentifier: identifier of the node to look up
    @param pep: forwarded unchanged to storage.getNode
    @param recipient: forwarded unchanged to storage.getNode
    @return: result of storage.getNode with callbacks added which call
        node.getMetaData() and then convert the result with _makeMetaData
    """
    # FIXME: manage pep and recipient
    def fetchMetaData(node):
        return node.getMetaData()

    node_d = self.storage.getNode(nodeIdentifier, pep, recipient)
    node_d.addCallback(fetchMetaData)
    node_d.addCallback(self._makeMetaData)
    return node_d
215 | |
216 def _makeMetaData(self, metaData): | |
217 options = [] | |
218 for key, value in metaData.iteritems(): | |
219 if key in self.nodeOptions: | |
220 option = {"var": key} | |
221 option.update(self.nodeOptions[key]) | |
222 option["value"] = value | |
223 options.append(option) | |
224 | |
225 return options | |
226 | |
def _checkAuth(self, node, requestor):
    """Check that requestor is allowed to publish to node

    The node's publish model decides who may publish:
        - publishers: only "owner" and "publisher" affiliations
        - subscribers: owners, publishers, and entities for which
          node.isSubscribed(requestor) gives a true result
        - open: everybody

    @param node: node the requestor wants to publish to
    @param requestor: entity willing to publish
    @return: result of node.getAffiliation with a callback added, ending
        with an (affiliation, node) tuple when publishing is allowed
    @raise error.Forbidden: (in the callback chain) publishing is refused
    @raise ValueError: (in the callback chain) unknown publish model
    """
    privileged = ('owner', 'publisher')

    def check(affiliation):
        allowed_d = defer.succeed((affiliation, node))
        publish_model = node.getConfiguration()[const.OPT_PUBLISH_MODEL]

        if publish_model == const.VAL_PMODEL_PUBLISHERS:
            if affiliation not in privileged:
                raise error.Forbidden()
            return allowed_d

        if publish_model == const.VAL_PMODEL_SUBSCRIBERS:
            if affiliation in privileged:
                return allowed_d

            # we are in subscribers publish model, we must check that
            # the requestor is a subscriber to allow him to publish
            def ensureSubscribed(subscribed):
                if subscribed:
                    return (affiliation, node)
                raise error.Forbidden()

            allowed_d.addCallback(lambda ignore: node.isSubscribed(requestor))
            allowed_d.addCallback(ensureSubscribed)
            return allowed_d

        if publish_model != const.VAL_PMODEL_OPEN:
            # publish_model must be publishers (default), subscribers or open.
            raise ValueError('Unexpected value')

        return allowed_d

    affiliation_d = node.getAffiliation(requestor)
    affiliation_d.addCallback(check)
    return affiliation_d
257 | |
def parseItemConfig(self, item):
    """Get and remove item configuration information

    The first data form child of the item whose form namespace is
    const.NS_ITEM_CONFIG is parsed and removed from the item.

    @param item (domish.Element): item to parse
    @return (tuple[unicode, data_form.Form|None]): (access_model, item_config)
        item_config is None when the item holds no configuration form, in
        which case access_model is const.VAL_AMODEL_DEFAULT
    """
    item_config = None
    access_model = const.VAL_AMODEL_DEFAULT
    for elt in item.elements():
        # compare with the namespace constant itself: a quoted
        # 'data_form.NS_X_DATA' never matches any real namespace
        if elt.uri != data_form.NS_X_DATA or elt.name != 'x':
            continue
        form = data_form.Form.fromElement(elt)
        if form.formNamespace == const.NS_ITEM_CONFIG:
            item_config = form
            # we need to remove the config from item; the element is removed
            # by identity because a position among item.elements() is not
            # necessarily its position in item.children
            item.children.remove(elt)
            break

    if item_config:
        access_model = item_config.get(const.OPT_ACCESS_MODEL, const.VAL_AMODEL_DEFAULT)
    return (access_model, item_config)
278 | |
def parseCategories(self, item_elt):
    """Extract categories from the atom entry of an item, if any

    Only the first atom entry child is inspected, and category elements
    without a "term" value are skipped.

    @param item_elt (domish.Element): item to parse
    @return (list): terms of the categories found, empty if the item has
        no atom entry
    """
    entry_elt = next(item_elt.elements(const.NS_ATOM, "entry"), None)
    if entry_elt is None:
        return []

    terms = (category_elt.getAttribute('term')
             for category_elt in entry_elt.elements(const.NS_ATOM, 'category'))
    return [term for term in terms if term]
297 | |
def enforceSchema(self, item_elt, schema, affiliation):
    """Modify item according to the node schema, or refuse publishing

    The data form of the item is replaced by a cleaned one: fields which
    are write restricted to owner are reset to the schema field when the
    publisher is not an owner, and fields unknown to the schema are dropped.

    @param item_elt(domish.Element): item to check/modify
    @param schema(domish.Element): schema to enforce
    @param affiliation(unicode): affiliation of the publisher
    @raise pubsub.BadRequest: the item has no (valid) data form
    @raise StanzaError: the schema uses an unknown write restriction
    """
    try:
        x_elt = next(item_elt.elements(data_form.NS_X_DATA, 'x'))
        item_form = data_form.Form.fromElement(x_elt)
    except (StopIteration, data_form.Error):
        raise pubsub.BadRequest(text="node has a schema but item has no form")
    item_elt.children.remove(x_elt)

    schema_form = data_form.Form.fromElement(schema)

    # we enforce restrictions
    for field_elt in schema.elements(data_form.NS_X_DATA, 'field'):
        var = field_elt['var']
        for restrict_elt in field_elt.elements(const.NS_SCHEMA_RESTRICT, 'restrict'):
            write_restriction = restrict_elt.attributes.get('write')
            if write_restriction is None:
                continue
            if write_restriction != 'owner':
                raise StanzaError('feature-not-implemented', text='unknown write restriction {}'.format(write_restriction))
            if affiliation == 'owner':
                continue
            # write is not allowed on this field, we use default value
            # we can safely use Field from schema_form because
            # we have created this instance only for this method
            try:
                item_form.removeField(item_form.fields[var])
            except KeyError:
                pass
            item_form.addField(schema_form.fields[var])

    # we now remove every field which is not in data schema
    unknown_fields = {field for name, field in item_form.fields.iteritems()
                      if name not in schema_form.fields}
    for field in unknown_fields:
        item_form.removeField(field)

    item_elt.addChild(item_form.toElement())
343 | |
def _checkOverwrite(self, node, itemIdentifiers, publisher):
    """Check that existing items were published by the current publisher

    @param node: node holding the items
    @param itemIdentifiers: identifiers of the items about to be published
    @param publisher: entity publishing, compared using userhost()
    @return: result of node.getItemsPublishers with the check added as a
        callback
    @raise error.ItemForbidden: (in the callback chain) one of the items
        was published by somebody else
    """
    def rejectForeignItems(publishers_by_item):
        if any(previous.userhost() != publisher.userhost()
               for previous in publishers_by_item.itervalues()):
            raise error.ItemForbidden()

    publishers_d = node.getItemsPublishers(itemIdentifiers)
    publishers_d.addCallback(rejectForeignItems)
    return publishers_d
355 | |
def publish(self, nodeIdentifier, items, requestor, pep, recipient):
    """Publish items to a node

    The node is retrieved from storage, authorisation is verified with
    _checkAuth, then the items are handed to _doPublish.

    @param nodeIdentifier: identifier of the node to publish to
    @param items: items to publish
    @param requestor: entity publishing the items
    @param pep: forwarded to the storage lookup and to _doPublish
    @param recipient: forwarded to the storage lookup and to _doPublish
    """
    publish_d = self.storage.getNode(nodeIdentifier, pep, recipient)
    publish_d.addCallback(self._checkAuth, requestor)
    # FIXME: owner and publisher are not necessarily the same. So far we use only owner to get roster.
    # FIXME: in addition, there can be several owners: that is not managed yet
    publish_d.addCallback(self._doPublish, items, requestor, pep, recipient)
    return publish_d
363 | |
@defer.inlineCallbacks
def _doPublish(self, result, items, requestor, pep, recipient):
    """Prepare, store and notify published items

    @param result (tuple): (affiliation, node) as produced by _checkAuth
    @param items: items to publish; they are modified in place
    @param requestor: entity publishing, set as publisher of each item
    @param pep: forwarded to _doNotify
    @param recipient: forwarded to _doNotify
    @raise error.NoPublishing: the node is a collection
    @raise error.ItemForbidden: items were sent but the node neither
        persists items nor delivers payloads
    @raise error.ItemRequired: no item was sent but the node persists
        items or delivers payloads
    """
    affiliation, node = result
    if node.nodeType == 'collection':
        raise error.NoPublishing()

    configuration = node.getConfiguration()
    persistItems = configuration[const.OPT_PERSIST_ITEMS]
    deliverPayloads = configuration[const.OPT_DELIVER_PAYLOADS]

    if items and not persistItems and not deliverPayloads:
        raise error.ItemForbidden()
    elif not items and (persistItems or deliverPayloads):
        raise error.ItemRequired()

    items_data = []
    # set as soon as one item comes with an id chosen by the publisher
    check_overwrite = False
    for item in items:
        # we enforce publisher (cf XEP-0060 §7.1.2.3)
        item['publisher'] = requestor.full()
        if persistItems or deliverPayloads:
            item.uri = None
            item.defaultUri = None
            if not item.getAttribute("id"):
                # no id given: the node provides the next one
                item["id"] = yield node.getNextId()
                new_item = True
            else:
                check_overwrite = True
                new_item = False
        access_model, item_config = self.parseItemConfig(item)
        categories = self.parseCategories(item)
        schema = node.getSchema()
        if schema is not None:
            self.enforceSchema(item, schema, affiliation)
        items_data.append(container.ItemData(item, access_model, item_config, categories, new=new_item))

    if persistItems:

        if check_overwrite and affiliation != 'owner':
            # we don't want a publisher to overwrite the item
            # of an other publisher
            yield self._checkOverwrite(node, [item['id'] for item in items if item.getAttribute('id')], requestor)

        # TODO: check conflict and recalculate max id if serial_ids is set
        yield node.storeItems(items_data, requestor)

    yield self._doNotify(node, items_data, deliverPayloads, pep, recipient)
411 | |
412 def _doNotify(self, node, items_data, deliverPayloads, pep, recipient): | |
413 if items_data and not deliverPayloads: | |
414 for item_data in items_data: | |
415 item_data.item.children = [] | |
416 self.dispatch({'items_data': items_data, 'node': node, 'pep': pep, 'recipient': recipient}, | |
417 '//event/pubsub/notify') | |
418 | |
def getNotifications(self, node, items_data):
    """Build the list of notifications to send for a node

    Subscriptions in "subscribed" state are grouped by subscriber; only
    those whose subscription type is "items" (the default) are kept.

    @param node: node for which notifications are built
    @param items_data: items to notify
    @return: result of node.getSubscriptions with a callback added, ending
        with a list of (subscriber, subscriptions, items_data) tuples
    """

    def toNotifications(subscriptions, items_data):
        by_subscriber = {}
        for subscription in subscriptions:
            sub_type = subscription.options.get('pubsub#subscription_type',
                                                'items')
            if sub_type != 'items':
                continue
            by_subscriber.setdefault(subscription.subscriber,
                                     set()).add(subscription)

        return [(subscriber, subscriber_subs, items_data)
                for subscriber, subscriber_subs
                in by_subscriber.iteritems()]

    def rootNotFound(failure):
        # only used by the disabled root node handling below
        failure.trap(error.NodeNotFound)
        return []

    subscriptions_d = node.getSubscriptions('subscribed')
    # FIXME: must add root node subscriptions ?
    # d2 = self.storage.getNode('', False) # FIXME: to check
    # d2.addCallback(lambda node: node.getSubscriptions('subscribed'))
    # d2.addErrback(rootNotFound)
    # d = defer.gatherResults([d1, d2])
    # d.addCallback(lambda result: result[0] + result[1])
    subscriptions_d.addCallback(toNotifications, items_data)
    return subscriptions_d
454 | |
def registerPublishNotifier(self, observerfn, *args, **kwargs):
    """Register an observer of the publish notification event

    @param observerfn: callable to register
    Extra positional and keyword arguments are forwarded to addObserver.
    """
    event = '//event/pubsub/notify'
    self.addObserver(event, observerfn, *args, **kwargs)
457 | |
def registerRetractNotifier(self, observerfn, *args, **kwargs):
    """Register an observer of the retract notification event

    @param observerfn: callable to register
    Extra positional and keyword arguments are forwarded to addObserver.
    """
    event = '//event/pubsub/retract'
    self.addObserver(event, observerfn, *args, **kwargs)
460 | |
def subscribe(self, nodeIdentifier, subscriber, requestor, pep, recipient):
    """Subscribe an entity to a node

    An entity can only subscribe itself: the bare jids of subscriber and
    requestor must be equal, otherwise a failed Deferred wrapping
    error.Forbidden is returned.

    @param nodeIdentifier: identifier of the node to subscribe to
    @param subscriber: jid to subscribe
    @param requestor: entity doing the request
    @param pep: forwarded to the storage lookup and to _doSubscribe
    @param recipient: forwarded to the storage lookup and to _doSubscribe
    """
    bare_subscriber = subscriber.userhostJID()
    if bare_subscriber != requestor.userhostJID():
        return defer.fail(error.Forbidden())

    subscribe_d = self.storage.getNode(nodeIdentifier, pep, recipient)
    subscribe_d.addCallback(_getAffiliation, bare_subscriber)
    subscribe_d.addCallback(self._doSubscribe, subscriber, pep, recipient)
    return subscribe_d
470 | |
def _doSubscribe(self, result, subscriber, pep, recipient):
    """Check access to the node, then create the subscription

    @param result (tuple): (node, affiliation) as produced by _getAffiliation
    @param subscriber: jid to subscribe
    @param pep: forwarded to _sendLastPublished
    @param recipient: forwarded to _sendLastPublished
    @return: Deferred-like ending with the result of node.getSubscription
    @raise error.Forbidden: the subscriber is an outcast
    @raise NotImplementedError: the access model of the node is unknown
    """
    node, affiliation = result

    if affiliation == 'outcast':
        raise error.Forbidden()

    access_model = node.getAccessModel()

    if access_model == const.VAL_AMODEL_OPEN:
        access_d = defer.succeed(None)
    elif access_model == const.VAL_AMODEL_PRESENCE:
        access_d = self.checkPresenceSubscription(node, subscriber)
    elif access_model == const.VAL_AMODEL_PUBLISHER_ROSTER:
        access_d = self.checkRosterGroups(node, subscriber)
    elif access_model == const.VAL_AMODEL_WHITELIST:
        access_d = self.checkNodeAffiliations(node, subscriber)
    else:
        raise NotImplementedError

    def addSubscription(_):
        return node.addSubscription(subscriber, 'subscribed', {})

    def alreadySubscribed(failure):
        # an existing subscription is not an error, but the last published
        # item is not sent again in this case
        failure.trap(error.SubscriptionExists)
        return False

    def fetchSubscription(send_last):
        subscription_d = node.getSubscription(subscriber)
        if send_last:
            subscription_d.addCallback(self._sendLastPublished, node, pep, recipient)
        return subscription_d

    access_d.addCallback(addSubscription)
    access_d.addCallbacks(lambda _: True, alreadySubscribed)
    access_d.addCallback(fetchSubscription)

    return access_d
505 | |
def _sendLastPublished(self, subscription, node, pep, recipient):
    """Schedule a notification with the last published item, if configured

    Nothing is scheduled unless the node is a leaf whose
    "pubsub#send_last_published_item" option is "on_sub". Errors while
    getting the item are logged with log.err.

    @param subscription: subscription which has just been processed
    @param node: node subscribed to
    @param pep: put in the dispatched event and in ext_data
    @param recipient: put in the dispatched event
    @return: the subscription, unchanged
    """
    def notifyItem(items_data):
        if not items_data:
            return
        event = {'items_data': items_data,
                 'node': node,
                 'pep': pep,
                 'recipient': recipient,
                 'subscription': subscription,
                 }
        reactor.callLater(0, self.dispatch, event, '//event/pubsub/notify')

    send_policy = node.getConfiguration().get('pubsub#send_last_published_item',
                                              'never')
    if send_policy != 'on_sub' or node.nodeType != 'leaf':
        return subscription

    entity = subscription.subscriber.userhostJID()
    last_item_d = self.getItemsData(node.nodeIdentifier, entity, recipient, maxItems=1, ext_data={'pep': pep})
    last_item_d.addCallback(notifyItem)
    last_item_d.addErrback(log.err)

    return subscription
529 | |
def unsubscribe(self, nodeIdentifier, subscriber, requestor, pep, recipient):
    """Remove the subscription of an entity to a node

    An entity can only unsubscribe itself: the bare jids of subscriber
    and requestor must be equal, otherwise a failed Deferred wrapping
    error.Forbidden is returned.

    @param nodeIdentifier: identifier of the node
    @param subscriber: jid to unsubscribe
    @param requestor: entity doing the request
    @param pep: forwarded unchanged to storage.getNode
    @param recipient: forwarded unchanged to storage.getNode
    """
    if subscriber.userhostJID() != requestor.userhostJID():
        return defer.fail(error.Forbidden())

    def removeSubscription(node):
        return node.removeSubscription(subscriber)

    node_d = self.storage.getNode(nodeIdentifier, pep, recipient)
    node_d.addCallback(removeSubscription)
    return node_d
537 | |
def getSubscriptions(self, requestor, nodeIdentifier, pep, recipient):
    """Retrieve subscriptions of an entity

    This is a plain delegation to storage.getSubscriptions.

    @param requestor(jid.JID): entity who wants to check subscriptions
    @param nodeIdentifier(unicode, None): identifier of the node
        None to get all subscriptions of a service
    @param pep(bool): True if it's a PEP request
    @param recipient(jid.JID, None): recipient of the PEP request
    @return: whatever storage.getSubscriptions returns
    """
    storage = self.storage
    return storage.getSubscriptions(requestor, nodeIdentifier, pep, recipient)
548 | |
def supportsAutoCreate(self):
    """Tell whether automatic node creation is supported (always True)"""
    return True
551 | |
def supportsCreatorCheck(self):
    """Tell whether the creator check is supported (always True)"""
    return True
554 | |
def supportsInstantNodes(self):
    """Tell whether instant nodes are supported (always True)"""
    return True
557 | |
def createNode(self, nodeIdentifier, requestor, options = None, pep=False, recipient=None):
    """Create a leaf node

    @param nodeIdentifier: identifier of the node to create; when empty,
        an identifier of the form "generic/<uuid4>" is generated
    @param requestor: entity creating the node
    @param options: configuration values overriding the storage defaults
    @param pep: forwarded unchanged to storage.createNode
    @param recipient: forwarded unchanged to storage.createNode
    @return: result of storage.createNode with a callback added which
        replaces its result by the identifier of the node
    """
    if not nodeIdentifier:
        nodeIdentifier = 'generic/%s' % uuid.uuid4()

    options = options or {}

    # if self.supportsCreatorCheck():
    #     groupblog = nodeIdentifier.startswith(const.NS_GROUPBLOG_PREFIX)
    #     try:
    #         nodeIdentifierJID = JID(nodeIdentifier[len(const.NS_GROUPBLOG_PREFIX):] if groupblog else nodeIdentifier)
    #     except InvalidFormat:
    #         is_user_jid = False
    #     else:
    #         is_user_jid = bool(nodeIdentifierJID.user)

    #     if is_user_jid and nodeIdentifierJID.userhostJID() != requestor.userhostJID():
    #         #we have an user jid node, but not created by the owner of this jid
    #         print "Wrong creator"
    #         raise error.Forbidden()

    nodeType = 'leaf'
    node_config = self.storage.getDefaultConfiguration(nodeType)
    node_config['pubsub#node_type'] = nodeType
    node_config.update(options)

    def returnIdentifier(_):
        return nodeIdentifier

    # TODO: handle schema on creation
    create_d = self.storage.createNode(nodeIdentifier, requestor, node_config, None, pep, recipient)
    create_d.addCallback(returnIdentifier)
    return create_d
588 | |
def getDefaultConfiguration(self, nodeType):
    """Get the default configuration for a node type

    @param nodeType: type of node, forwarded to the storage
    @return: defer.succeed() of storage.getDefaultConfiguration(nodeType)
    """
    default_config = self.storage.getDefaultConfiguration(nodeType)
    return defer.succeed(default_config)
592 | |
def getNodeConfiguration(self, nodeIdentifier, pep, recipient):
    """Get the configuration of a node

    @param nodeIdentifier: identifier of the node; when empty, a failed
        Deferred wrapping error.NoRootNode is returned
    @param pep: forwarded unchanged to storage.getNode
    @param recipient: forwarded unchanged to storage.getNode
    """
    if not nodeIdentifier:
        return defer.fail(error.NoRootNode())

    def fetchConfiguration(node):
        return node.getConfiguration()

    node_d = self.storage.getNode(nodeIdentifier, pep, recipient)
    node_d.addCallback(fetchConfiguration)
    return node_d
601 | |
def setNodeConfiguration(self, nodeIdentifier, options, requestor, pep, recipient):
    """Change the configuration of a node

    The affiliation of the requestor is retrieved and checked by
    _doSetNodeConfiguration before anything is changed.

    @param nodeIdentifier: identifier of the node; when empty, a failed
        Deferred wrapping error.NoRootNode is returned
    @param options: new configuration values
    @param requestor: entity doing the request
    @param pep: forwarded unchanged to storage.getNode
    @param recipient: forwarded unchanged to storage.getNode
    """
    if not nodeIdentifier:
        return defer.fail(error.NoRootNode())

    config_d = self.storage.getNode(nodeIdentifier, pep, recipient)
    config_d.addCallback(_getAffiliation, requestor)
    config_d.addCallback(self._doSetNodeConfiguration, options)
    return config_d
610 | |
611 def _doSetNodeConfiguration(self, result, options): | |
612 node, affiliation = result | |
613 | |
614 if affiliation != 'owner': | |
615 raise error.Forbidden() | |
616 | |
617 return node.setConfiguration(options) | |
618 | |
def getNodeSchema(self, nodeIdentifier, pep, recipient):
    """Get the schema of a node

    @param nodeIdentifier: identifier of the node; when empty, a failed
        Deferred wrapping error.NoRootNode is returned
    @param pep: forwarded unchanged to storage.getNode
    @param recipient: forwarded unchanged to storage.getNode
    """
    if not nodeIdentifier:
        return defer.fail(error.NoRootNode())

    def fetchSchema(node):
        return node.getSchema()

    node_d = self.storage.getNode(nodeIdentifier, pep, recipient)
    node_d.addCallback(fetchSchema)
    return node_d
627 | |
def setNodeSchema(self, nodeIdentifier, schema, requestor, pep, recipient):
    """Set or remove the schema of a node

    The affiliation of the requestor is retrieved and checked by
    _doSetNodeSchema before anything is changed.

    @param nodeIdentifier(unicode): identifier of the pubsub node
        when empty, a failed Deferred wrapping error.NoRootNode is returned
    @param schema(domish.Element, None): schema to set
        None to remove schema
    @param requestor(jid.JID): entity doing the request
    @param pep(bool): True if it's a PEP request
    @param recipient(jid.JID, None): recipient of the PEP request
    """
    if not nodeIdentifier:
        return defer.fail(error.NoRootNode())

    schema_d = self.storage.getNode(nodeIdentifier, pep, recipient)
    schema_d.addCallback(_getAffiliation, requestor)
    schema_d.addCallback(self._doSetNodeSchema, schema)
    return schema_d
645 | |
646 def _doSetNodeSchema(self, result, schema): | |
647 node, affiliation = result | |
648 | |
649 if affiliation != 'owner': | |
650 raise error.Forbidden() | |
651 | |
652 return node.setSchema(schema) | |
653 | |
def getAffiliations(self, entity, nodeIdentifier, pep, recipient):
    """Retrieve affiliations of an entity

    This is a plain delegation to storage.getAffiliations.

    @return: whatever storage.getAffiliations returns
    """
    storage = self.storage
    return storage.getAffiliations(entity, nodeIdentifier, pep, recipient)
656 | |
def getAffiliationsOwner(self, nodeIdentifier, requestor, pep, recipient):
    """Retrieve all affiliations of a node, on behalf of an owner

    The affiliation of the requestor is retrieved and checked by
    _doGetAffiliationsOwner.

    @param nodeIdentifier: identifier of the node
    @param requestor: entity doing the request
    @param pep: forwarded unchanged to storage.getNode
    @param recipient: forwarded unchanged to storage.getNode
    """
    affiliations_d = self.storage.getNode(nodeIdentifier, pep, recipient)
    affiliations_d.addCallback(_getAffiliation, requestor)
    affiliations_d.addCallback(self._doGetAffiliationsOwner)
    return affiliations_d
662 | |
663 def _doGetAffiliationsOwner(self, result): | |
664 node, affiliation = result | |
665 | |
666 if affiliation != 'owner': | |
667 raise error.Forbidden() | |
668 return node.getAffiliations() | |
669 | |
def setAffiliationsOwner(self, nodeIdentifier, requestor, affiliations, pep, recipient):
    """Change affiliations of a node, on behalf of an owner

    The affiliation of the requestor is retrieved and checked by
    _doSetAffiliationsOwner before anything is changed.

    @param nodeIdentifier: identifier of the node
    @param requestor: entity doing the request
    @param affiliations: mapping of jid to the affiliation to set
    @param pep: forwarded unchanged to storage.getNode
    @param recipient: forwarded unchanged to storage.getNode
    """
    affiliations_d = self.storage.getNode(nodeIdentifier, pep, recipient)
    affiliations_d.addCallback(_getAffiliation, requestor)
    affiliations_d.addCallback(self._doSetAffiliationsOwner, requestor, affiliations)
    return affiliations_d
675 | |
676 def _doSetAffiliationsOwner(self, result, requestor, affiliations): | |
677 # Check that requestor is allowed to set affiliations, and delete entities | |
678 # with "none" affiliation | |
679 | |
680 # TODO: return error with failed affiliations in case of failure | |
681 node, requestor_affiliation = result | |
682 | |
683 if requestor_affiliation != 'owner': | |
684 raise error.Forbidden() | |
685 | |
686 # we don't allow requestor to change its own affiliation | |
687 requestor_bare = requestor.userhostJID() | |
688 if requestor_bare in affiliations and affiliations[requestor_bare] != 'owner': | |
689 # FIXME: it may be interesting to allow the owner to ask for ownership removal | |
690 # if at least one other entity is owner for this node | |
691 raise error.Forbidden("You can't change your own affiliation") | |
692 | |
693 to_delete = [jid_ for jid_, affiliation in affiliations.iteritems() if affiliation == 'none'] | |
694 for jid_ in to_delete: | |
695 del affiliations[jid_] | |
696 | |
697 if to_delete: | |
698 d = node.deleteAffiliations(to_delete) | |
699 if affiliations: | |
700 d.addCallback(lambda dummy: node.setAffiliations(affiliations)) | |
701 else: | |
702 d = node.setAffiliations(affiliations) | |
703 | |
704 return d | |
705 | |
def getSubscriptionsOwner(self, nodeIdentifier, requestor, pep, recipient):
    """Retrieve all subscriptions of a node, on behalf of an owner

    The affiliation of the requestor is retrieved and checked by
    _doGetSubscriptionsOwner.

    @param nodeIdentifier: identifier of the node
    @param requestor: entity doing the request
    @param pep: forwarded unchanged to storage.getNode
    @param recipient: forwarded unchanged to storage.getNode
    """
    subscriptions_d = self.storage.getNode(nodeIdentifier, pep, recipient)
    subscriptions_d.addCallback(_getAffiliation, requestor)
    subscriptions_d.addCallback(self._doGetSubscriptionsOwner)
    return subscriptions_d
711 | |
712 def _doGetSubscriptionsOwner(self, result): | |
713 node, affiliation = result | |
714 | |
715 if affiliation != 'owner': | |
716 raise error.Forbidden() | |
717 return node.getSubscriptions() | |
718 | |
def setSubscriptionsOwner(self, nodeIdentifier, requestor, subscriptions, pep, recipient):
    """Change subscriptions of a node, on behalf of an owner

    The affiliation of the requestor is retrieved and checked by
    _doSetSubscriptionsOwner before anything is changed.

    @param nodeIdentifier: identifier of the node
    @param requestor: entity doing the request
    @param subscriptions: subscriptions to set or remove
    @param pep: forwarded unchanged to storage.getNode
    @param recipient: forwarded unchanged to storage.getNode
    """
    subscriptions_d = self.storage.getNode(nodeIdentifier, pep, recipient)
    subscriptions_d.addCallback(_getAffiliation, requestor)
    subscriptions_d.addCallback(self._doSetSubscriptionsOwner, requestor, subscriptions)
    return subscriptions_d
724 | |
def unwrapFirstError(self, failure):
    """Replace a defer.FirstError failure by the failure it wraps

    Any other failure is re-raised by failure.trap.

    @param failure: failure to unwrap
    @return: the subFailure of the trapped defer.FirstError
    """
    failure.trap(defer.FirstError)
    first_error = failure.value
    return first_error.subFailure
728 | |
def _doSetSubscriptionsOwner(self, result, requestor, subscriptions):
    """Apply subscription changes if the requestor owns the node

    Subscriptions in "none" state are taken out of the subscriptions
    collection (which is modified in place) and removed from the node;
    the remaining ones are set on the node.

    @param result (tuple): (node, affiliation) as produced by _getAffiliation
    @param requestor: entity doing the request
    @param subscriptions: subscriptions to set or remove
    @return: Deferred firing with None once every change is done; on
        failure the first error is unwrapped with unwrapFirstError
    @raise error.Forbidden: the requestor is not an owner of the node
    """
    # TODO: return error with failed subscriptions in case of failure
    node, requestor_affiliation = result

    if requestor_affiliation != 'owner':
        raise error.Forbidden()

    pending = []

    # we loop on a copy because the collection is modified on the way
    for subscription in subscriptions.copy():
        if subscription.state != 'none':
            continue
        subscriptions.remove(subscription)
        pending.append(node.removeSubscription(subscription.subscriber))

    if subscriptions:
        pending.append(node.setSubscriptions(subscriptions))

    done_d = defer.gatherResults(pending, consumeErrors=True)
    done_d.addCallback(lambda _: None)
    done_d.addErrback(self.unwrapFirstError)
    return done_d
753 | |
def filterItemsWithSchema(self, items_data, schema, owner):
    """Check schema restriction and remove fields/items if they don't comply

    Fields which are read restricted to owner are removed from the form of
    every item when the requestor is not an owner. Items without a data
    form are removed from items_data.

    @param items_data(list[ItemData]): items to filter
        items in this list will be modified
    @param schema(domish.Element): node schema
    @param owner(bool): True if requestor is an owner of the node
    @raise StanzaError: the schema uses an unknown read restriction
    """
    fields_to_remove = set()
    for field_elt in schema.elements(data_form.NS_X_DATA, 'field'):
        for restrict_elt in field_elt.elements(const.NS_SCHEMA_RESTRICT, 'restrict'):
            read_restriction = restrict_elt.attributes.get('read')
            if read_restriction is not None:
                if read_restriction == 'owner':
                    if not owner:
                        fields_to_remove.add(field_elt['var'])
                else:
                    raise StanzaError('feature-not-implemented', text='unknown read restriction {}'.format(read_restriction))

    items_to_remove = []
    for item_data in items_data:
        item_elt = item_data.item
        try:
            x_elt = next(item_elt.elements(data_form.NS_X_DATA, 'x'))
        except StopIteration:
            # the placeholder must be filled, otherwise "{id}" is logged
            # verbatim and the faulty item can't be identified
            log.msg("WARNING, item {id} has a schema but no form, ignoring it".format(
                id=item_elt.getAttribute('id')))
            items_to_remove.append(item_data)
            continue
        form = data_form.Form.fromElement(x_elt)
        # we remove fields which are not visible for this user
        for field in fields_to_remove:
            try:
                form.removeField(form.fields[field])
            except KeyError:
                continue
        item_elt.children.remove(x_elt)
        item_elt.addChild(form.toElement())

    # removal is done afterwards to avoid modifying the list while looping on it
    for item_data in items_to_remove:
        items_data.remove(item_data)
793 | |
def checkPresenceSubscription(self, node, requestor):
    """Check that requestor has a presence subscription from the node owner

    @param node(Node): node to check
    @param requestor(jid.JID): entity who wants to access the node
    @return (defer.Deferred): fires with None when access is allowed
    @raise error.Forbidden: the owner roster is not available, or requestor
        is not in it, or has no "from" subscription
    """
    def gotRoster(roster):
        # the conditions are evaluated left to right, so the roster is only
        # indexed once we know it exists and contains the requestor
        if (roster is None
                or requestor not in roster
                or not roster[requestor].subscriptionFrom):
            raise error.Forbidden()

    roster_d = self.getOwnerRoster(node)
    roster_d.addCallback(gotRoster)
    return roster_d
813 | |
@defer.inlineCallbacks
def checkRosterGroups(self, node, requestor):
    """Check that requestor belongs to one of the groups allowed on a node

    @param node(Node): node to check
    @param requestor(jid.JID): entity who wants to access the node
    @raise error.Forbidden: the owner roster is not available, requestor is
        not in it, or shares no group with the authorized ones
    """
    owner_roster = yield self.getOwnerRoster(node)

    if owner_roster is None or requestor not in owner_roster:
        raise error.Forbidden()

    allowed_groups = yield node.getAuthorizedGroups()
    common_groups = owner_roster[requestor].groups.intersection(allowed_groups)

    if not common_groups:
        # requestor is in roster but not in one of the allowed groups
        raise error.Forbidden()
834 | |
def checkNodeAffiliations(self, node, requestor):
    """Check that requestor is in the white list of a node

    @param node(Node): node to check
    @param requestor(jid.JID): entity who wants to access the node
    @return (defer.Deferred): fires with None when access is allowed
    @raise error.Forbidden: requestor has no affiliation with the node,
        or one which is not owner, publisher or member
    """
    def gotAffiliations(affiliations):
        # affiliations are looked up with the bare jid
        try:
            requestor_affiliation = affiliations[requestor.userhostJID()]
        except KeyError:
            raise error.Forbidden()
        if requestor_affiliation not in ('owner', 'publisher', 'member'):
            raise error.Forbidden()

    affiliations_d = node.getAffiliations()
    affiliations_d.addCallback(gotAffiliations)
    return affiliations_d
853 | |
@defer.inlineCallbacks
def checkNodeAccess(self, node, requestor):
    """Check if a requestor can access data of a node

    @param node(Node): node to check
    @param requestor(jid.JID): entity who wants to access the node
    @return (tuple): permissions data with:
        - affiliation: affiliation of the requestor, as given by _getAffiliation
        - owner(bool): True if requestor is owner of the node
        - roster(None): always None here, the roster is not retrieved by this method
        - access_model(str): access model of the node
    @raise error.Forbidden: access is not granted
    @raise error.NotLeafNodeError: this node is not a leaf
    """
    node, affiliation = yield _getAffiliation(node, requestor)

    if not iidavoll.ILeafNode.providedBy(node):
        raise error.NotLeafNodeError()

    if affiliation == 'outcast':
        raise error.Forbidden()

    owner = affiliation == 'owner'
    access_model = node.getAccessModel()
    roster = None

    # select the check matching the access model;
    # open nodes and owners need no further check
    if access_model == const.VAL_AMODEL_OPEN or owner:
        access_check = None
    elif access_model == const.VAL_AMODEL_PRESENCE:
        access_check = self.checkPresenceSubscription
    elif access_model == const.VAL_AMODEL_PUBLISHER_ROSTER:
        # FIXME: for node, access should be renamed owner-roster, not publisher
        access_check = self.checkRosterGroups
    elif access_model == const.VAL_AMODEL_WHITELIST:
        access_check = self.checkNodeAffiliations
    else:
        raise Exception(u"Unknown access_model")

    if access_check is not None:
        # the check raises error.Forbidden when access is refused
        yield access_check(node, requestor)

    defer.returnValue((affiliation, owner, roster, access_model))
894 | |
@defer.inlineCallbacks
def getItemsIds(self, nodeIdentifier, requestor, authorized_groups, unrestricted, maxItems=None, ext_data=None, pep=False, recipient=None):
    """Retrieve the ids of the items of a node

    The node access is checked for requestor before the ids are retrieved.
    authorized_groups, unrestricted, maxItems and ext_data are forwarded
    unchanged to node.getItemsIds.

    @return (defer.Deferred): fires with the result of node.getItemsIds
    """
    # FIXME: items access model are not checked
    # TODO: check items access model
    node = yield self.storage.getNode(nodeIdentifier, pep, recipient)
    # only done for the access check: it raises if requestor can't access the node
    yield self.checkNodeAccess(node, requestor)
    items_ids = yield node.getItemsIds(
        authorized_groups, unrestricted, maxItems, ext_data)
    defer.returnValue(items_ids)
906 | |
def getItems(self, nodeIdentifier, requestor, recipient, maxItems=None,
             itemIdentifiers=None, ext_data=None):
    """Like getItemsData, but only the "item" attribute of each ItemData is returned

    All arguments are forwarded unchanged to getItemsData.
    @return (defer.Deferred): fires with the list of items
    """
    def extractItems(items_data):
        return [item_data.item for item_data in items_data]

    items_d = self.getItemsData(nodeIdentifier, requestor, recipient, maxItems, itemIdentifiers, ext_data)
    items_d.addCallback(extractItems)
    return items_d
912 | |
@defer.inlineCallbacks
def getOwnerRoster(self, node, owners=None):
    """Retrieve the roster of the owner of a node

    @param node: node whose owner roster is wanted
    @param owners(list, None): owners of the node
        None to retrieve them with node.getOwners
    @return (defer.Deferred): fires with the roster returned by
        privilege.getRoster, or with None if the node doesn't have exactly
        one owner or if the roster can't be retrieved
    """
    # FIXME: roster of publisher, not owner, must be used
    if owners is None:
        owners = yield node.getOwners()

    if len(owners) != 1:
        log.msg('publisher-roster access is not allowed with more than 1 owner')
        return

    sole_owner = owners[0]

    try:
        owner_roster = yield self.privilege.getRoster(sole_owner)
    except Exception as e:
        # best effort: the failure is logged and the caller gets None
        log.msg("Error while getting roster of {owner_jid}: {msg}".format(
            owner_jid=sole_owner.full(),
            msg=e))
        return
    else:
        defer.returnValue(owner_roster)
933 | |
@defer.inlineCallbacks
def getItemsData(self, nodeIdentifier, requestor, recipient, maxItems=None,
                 itemIdentifiers=None, ext_data=None):
    """Like getItems but return the whole ItemData

    @param nodeIdentifier: identifier of the node to read
    @param requestor(jid.JID): entity requesting the items
    @param recipient: forwarded to storage.getNode
    @param maxItems(int, None): forwarded to node.getItems
        0 results in an empty list without any storage access
    @param itemIdentifiers(list, None): if set (and not empty), only these
        items are retrieved
    @param ext_data(dict, None): extra request data
        the "pep" key is used to get the node, the whole dict is forwarded
        to node.getItems and to the RSM handling
    @return (defer.Deferred): fires with a list of ItemData
        the list is empty if the node is not a leaf
    @raise error.Forbidden: access is not granted
    @raise error.BadAccessTypeError: an item has an unknown access model
    """
    if maxItems == 0:
        log.msg("WARNING: maxItems=0 on items retrieval")
        defer.returnValue([])

    if ext_data is None:
        ext_data = {}
    node = yield self.storage.getNode(nodeIdentifier, ext_data.get('pep', False), recipient)
    try:
        affiliation, owner, roster, access_model = yield self.checkNodeAccess(node, requestor)
    except error.NotLeafNodeError:
        defer.returnValue([])

    # at this point node access is checked

    if owner:
        # requestor_groups is only used in restricted access
        requestor_groups = None
    else:
        if roster is None:
            # FIXME: publisher roster should be used, not owner
            roster = yield self.getOwnerRoster(node)
            if roster is None:
                roster = {}
        roster_item = roster.get(requestor.userhostJID())
        requestor_groups = tuple(roster_item.groups) if roster_item else tuple()

    if itemIdentifiers:
        items_data = yield node.getItemsById(requestor_groups, owner, itemIdentifiers)
    else:
        items_data = yield node.getItems(requestor_groups, owner, maxItems, ext_data)

    if owner:
        # Add item config data form to items with roster access model
        for item_data in items_data:
            # every branch tests the access model of the *item*: the node access
            # model has already been handled by checkNodeAccess
            item_access_model = item_data.access_model
            if item_access_model == const.VAL_AMODEL_OPEN:
                pass
            elif item_access_model == const.VAL_AMODEL_PUBLISHER_ROSTER:
                form = data_form.Form('submit', formNamespace=const.NS_ITEM_CONFIG)
                access = data_form.Field(None, const.OPT_ACCESS_MODEL, value=const.VAL_AMODEL_PUBLISHER_ROSTER)
                allowed = data_form.Field(None, const.OPT_ROSTER_GROUPS_ALLOWED, values=item_data.config[const.OPT_ROSTER_GROUPS_ALLOWED])
                form.addField(access)
                form.addField(allowed)
                item_data.item.addChild(form.toElement())
            elif item_access_model == const.VAL_AMODEL_WHITELIST:
                #FIXME
                raise NotImplementedError
            else:
                raise error.BadAccessTypeError(item_access_model)

    schema = node.getSchema()
    if schema is not None:
        self.filterItemsWithSchema(items_data, schema, owner)

    # _items_rsm returns either items_data or a Deferred, yield handles both
    yield self._items_rsm(items_data, node, requestor_groups, owner, itemIdentifiers, ext_data)
    defer.returnValue(items_data)
993 | |
def _setCount(self, value, response):
    """Set the count attribute of a RSM response

    @param value: value to store
    @param response(RSMResponse): response instance to fill
    """
    response.count = value
996 | |
def _setIndex(self, value, response, adjust):
    """Set index in RSM response

    @param value(int): value of the reference index (i.e. before or after item)
    @param response(RSMResponse): response instance to fill
    @param adjust(int): adjustment term (i.e. difference between reference index and first item of the result)
    """
    response.index = value + adjust
1005 | |
def _items_rsm(self, items_data, node, authorized_groups, owner,
               itemIdentifiers, ext_data):
    """Append a RSM response to the items when the request used RSM

    @param items_data(list[ItemData]): items of the page being returned
        when RSM applies, an ItemData wrapping the RSM response element is
        appended to this very list
    @param node: node the items come from, queried for items count and index
    @param authorized_groups: forwarded to node.getItemsCount/getItemsIndex
    @param owner: forwarded to node.getItemsCount/getItemsIndex
    @param itemIdentifiers: explicitly requested item ids
        the RSM part is ignored when this is set
    @param ext_data(dict): extra request data, the RSM request is read from its "rsm" key
    @return (list[ItemData], defer.Deferred): items_data itself when there is
        nothing to do, else a Deferred firing with items_data once the RSM
        response has been appended
    """
    # FIXME: move this to a separate module
    # TODO: Index can be optimized by keeping a cache of the last RSM request
    #       An other optimisation would be to look for index first and use it as offset
    try:
        rsm_request = ext_data['rsm']
    except KeyError:
        # No RSM in this request, nothing to do
        return items_data

    if itemIdentifiers:
        log.msg("WARNING, itemIdentifiers used with RSM, ignoring the RSM part")
        return items_data

    response = rsm.RSMResponse()

    # the total count is always requested, it is stored in the response when available
    d_count = node.getItemsCount(authorized_groups, owner, ext_data)
    d_count.addCallback(self._setCount, response)
    d_list = [d_count]

    if items_data:
        response.first = items_data[0].item['id']
        response.last = items_data[-1].item['id']

        # index handling
        if rsm_request.index is not None:
            response.index = rsm_request.index
        elif rsm_request.before:
            # The last page case (before == '') is managed in render method
            d_index = node.getItemsIndex(rsm_request.before, authorized_groups, owner, ext_data)
            # the page ends just before the reference item
            d_index.addCallback(self._setIndex, response, -len(items_data))
            d_list.append(d_index)
        elif rsm_request.after is not None:
            d_index = node.getItemsIndex(rsm_request.after, authorized_groups, owner, ext_data)
            # the page starts just after the reference item
            d_index.addCallback(self._setIndex, response, 1)
            d_list.append(d_index)
        else:
            # the first page was requested
            response.index = 0

    def render(result):
        # called once count (and index, if requested) have been stored in response
        if rsm_request.before == '':
            # the last page was requested
            response.index = response.count - len(items_data)
        items_data.append(container.ItemData(response.toElement()))
        return items_data

    return defer.DeferredList(d_list).addCallback(render)
1055 | |
def retractItem(self, nodeIdentifier, itemIdentifiers, requestor, notify, pep, recipient):
    """Retract items from a node

    The node is retrieved from storage, the affiliation of requestor is
    resolved, then the actual work is done by _doRetract.

    @return (defer.Deferred): fires with the result of _doRetract
    """
    retract_d = self.storage.getNode(nodeIdentifier, pep, recipient)
    for callback, args in (
            (_getAffiliation, (requestor,)),
            (self._doRetract, (itemIdentifiers, requestor, notify, pep, recipient))):
        retract_d.addCallback(callback, *args)
    return retract_d
1061 | |
def _doRetract(self, result, itemIdentifiers, requestor, notify, pep, recipient):
    """Check permissions, remove the items and dispatch the notification if requested

    @param result(tuple): (node, affiliation of the requestor)
    @param itemIdentifiers: ids of the items to retract
    @param requestor(jid.JID): entity requesting the retraction
    @param notify: if it evaluates to True, the retraction is dispatched
        to the retract notifiers
    @return (defer.Deferred): Deferred chain doing the retraction
    @raise error.NodeNotPersistent: the node doesn't persist its items
    @raise error.Forbidden: requestor is neither owner nor publisher of the
        node, and is not the publisher of all the items to retract
    """
    node, affiliation = result

    if not node.getConfiguration()[const.OPT_PERSIST_ITEMS]:
        raise error.NodeNotPersistent()

    def checkPublishers(publishers_map):
        """Called when requestor is neither owner nor publisher of the node

        Raise error.Forbidden unless requestor is the publisher of every item
        he wants to retract
        """
        # TODO: the behaviour should be configurable (per node ?)
        for publisher in publishers_map.itervalues():
            if requestor.userhostJID() != publisher.userhostJID():
                raise error.Forbidden()

    def removeItems(items_data):
        """Remove the items and keep only actually removed ones in items_data"""
        def keepRemoved(removed):
            return [item_data for item_data in items_data if item_data.item["id"] in removed]

        remove_d = node.removeItems(itemIdentifiers)
        remove_d.addCallback(keepRemoved)
        return remove_d

    if affiliation in ('owner', 'publisher'):
        # owner and publisher of the node can retract what they want
        d = defer.succeed(None)
    else:
        # no right on the whole node: the retraction is only allowed
        # if requestor is the publisher of all the items
        d = node.getItemsPublishers(itemIdentifiers)
        d.addCallback(checkPublishers)

    # the items are needed for the notifications, so they are retrieved
    # before being removed
    d.addCallback(lambda dummy: node.getItemsById(None, True, itemIdentifiers))
    d.addCallback(removeItems)

    if notify:
        d.addCallback(self._doNotifyRetraction, node, pep, recipient)
    return d
1103 | |
def _doNotifyRetraction(self, items_data, node, pep, recipient):
    """Dispatch the retraction of items as a '//event/pubsub/retract' event"""
    event_data = {
        'items_data': items_data,
        'node': node,
        'pep': pep,
        'recipient': recipient,
    }
    self.dispatch(event_data, '//event/pubsub/retract')
1110 | |
def purgeNode(self, nodeIdentifier, requestor, pep, recipient):
    """Purge a node

    The node is retrieved from storage, the affiliation of requestor is
    resolved, then the actual work is done by _doPurge.

    @return (defer.Deferred): fires with the result of _doPurge
    """
    purge_d = self.storage.getNode(nodeIdentifier, pep, recipient)
    purge_d.addCallback(_getAffiliation, requestor)
    purge_d.addCallback(self._doPurge)
    return purge_d
1116 | |
def _doPurge(self, result):
    """Purge the node if the requestor is allowed to, then dispatch the purge event

    @param result(tuple): (node, affiliation of the requestor)
    @return (defer.Deferred): fires once the purge has been dispatched
    @raise error.Forbidden: requestor is not owner of the node
    @raise error.NodeNotPersistent: the node doesn't persist its items
    """
    node, affiliation = result
    # the configuration is read first, as in the checks order below
    persist_items = node.getConfiguration()[const.OPT_PERSIST_ITEMS]

    if affiliation != 'owner':
        raise error.Forbidden()
    if not persist_items:
        raise error.NodeNotPersistent()

    purge_d = node.purge()
    purge_d.addCallback(self._doNotifyPurge, node.nodeIdentifier)
    return purge_d
1130 | |
def _doNotifyPurge(self, result, nodeIdentifier):
    """Dispatch the purge of a node as a '//event/pubsub/purge' event

    @param result: result of the previous callback, not used
    @param nodeIdentifier: identifier of the purged node
    """
    self.dispatch(nodeIdentifier, '//event/pubsub/purge')
1133 | |
def registerPreDelete(self, preDeleteFn):
    """Register a callable to be called before a node is deleted

    @param preDeleteFn(callable): added to the callbacks list used by _doPreDelete
    """
    self._callbackList.append(preDeleteFn)
1136 | |
def getSubscribers(self, nodeIdentifier, pep, recipient):
    """Retrieve the subscribers of a node

    Only subscriptions in the 'subscribed' state are requested from the node.

    @return (defer.Deferred): fires with the list of "subscriber" attributes
        of these subscriptions
    """
    def getSubscribed(node):
        return node.getSubscriptions('subscribed')

    def extractSubscribers(subscriptions):
        return [subscription.subscriber for subscription in subscriptions]

    subscribers_d = self.storage.getNode(nodeIdentifier, pep, recipient)
    subscribers_d.addCallback(getSubscribed)
    subscribers_d.addCallback(extractSubscribers)
    return subscribers_d
1145 | |
def deleteNode(self, nodeIdentifier, requestor, pep, recipient, redirectURI=None):
    """Delete a node

    The node is retrieved from storage, the affiliation of requestor is
    resolved, then the actual work is started by _doPreDelete.

    @param redirectURI: forwarded to the pre-delete callbacks
    @return (defer.Deferred): fires with the result of _doPreDelete
    """
    delete_d = self.storage.getNode(nodeIdentifier, pep, recipient)
    delete_d.addCallback(_getAffiliation, requestor)
    delete_d.addCallback(self._doPreDelete, redirectURI, pep, recipient)
    return delete_d
1151 | |
def _doPreDelete(self, result, redirectURI, pep, recipient):
    """Run the registered pre-delete callbacks, then delete the node

    @param result(tuple): (node, affiliation of the requestor)
    @param redirectURI: given to the pre-delete callbacks in their data
    @param pep: forwarded to the pre-delete callbacks
    @param recipient: forwarded to the pre-delete callbacks
    @return (defer.Deferred): fires once _doDelete is done
    @raise error.Forbidden: requestor is not owner of the node
    """
    node, affiliation = result

    if affiliation != 'owner':
        raise error.Forbidden()

    data = {'node': node,
            'redirectURI': redirectURI}

    # with consumeErrors, a failing pre-delete callback is reported in the
    # results list handed to _doDelete instead of being left unhandled
    d = defer.DeferredList([cb(data, pep, recipient)
                            for cb in self._callbackList],
                           consumeErrors=1)
    d.addCallback(self._doDelete, node.nodeDbId)
    # the Deferred must be returned so the caller's chain waits for the
    # deletion and receives its failures
    return d
1165 | |
def _doDelete(self, result, nodeDbId):
    """Delete the node from storage once pre-delete callbacks have been run

    @param result(list[tuple]): (succeeded, value) pairs
        values of succeeded entries are gathered, empty/None ones are skipped
    @param nodeDbId: id of the node, given to storage.deleteNodeByDbId
    @return (defer.Deferred): fires once _doNotifyDelete has been called
        with the gathered values
    """
    pending = [value
               for succeeded, values in result if succeeded and values
               for value in values]

    delete_d = self.storage.deleteNodeByDbId(nodeDbId)
    delete_d.addCallback(self._doNotifyDelete, pending)
    return delete_d
1176 | |
def _doNotifyDelete(self, result, dl):
    """Fire every Deferred of dl with None

    @param result: result of the previous callback, not used
    @param dl(iterable): objects whose callback method is called with None
    """
    for pending_d in dl:
        pending_d.callback(None)
1180 | |
1181 | |
1182 class PubSubResourceFromBackend(pubsub.PubSubResource): | |
1183 """ | |
1184 Adapts a backend to an xmpp publish-subscribe service. | |
1185 """ | |
1186 | |
# features always advertised; __init__ appends the optional ones
# according to what the backend supports
features = [
    "config-node",
    "create-nodes",
    "delete-any",
    "delete-nodes",
    "item-ids",
    "meta-data",
    "publish",
    "purge-nodes",
    "retract-items",
    "retrieve-affiliations",
    "retrieve-default",
    "retrieve-items",
    "retrieve-subscriptions",
    "subscribe",
    ]

discoIdentity = disco.DiscoIdentity('pubsub',
                                    'service',
                                    u'Salut à Toi pubsub service')

pubsubService = None

# backend exception class => (stanza error condition, pubsub condition, feature)
# used by _mapErrors: when the pubsub condition is set a pubsub.PubSubError
# is raised, else a plain StanzaError
_errorMap = {
    error.NodeNotFound: ('item-not-found', None, None),
    error.NodeExists: ('conflict', None, None),
    error.Forbidden: ('forbidden', None, None),
    error.NotAuthorized: ('not-authorized', None, None),
    error.ItemNotFound: ('item-not-found', None, None),
    error.ItemForbidden: ('bad-request', 'item-forbidden', None),
    error.ItemRequired: ('bad-request', 'item-required', None),
    error.NoInstantNodes: ('not-acceptable',
                           'unsupported',
                           'instant-nodes'),
    error.NotSubscribed: ('unexpected-request', 'not-subscribed', None),
    error.InvalidConfigurationOption: ('not-acceptable', None, None),
    error.InvalidConfigurationValue: ('not-acceptable', None, None),
    error.NodeNotPersistent: ('feature-not-implemented',
                              'unsupported',
                              'persistent-node'),
    error.NoRootNode: ('bad-request', None, None),
    error.NoCollections: ('feature-not-implemented',
                          'unsupported',
                          'collections'),
    error.NoPublishing: ('feature-not-implemented',
                         'unsupported',
                         'publish'),
}
1235 | |
def __init__(self, backend):
    """Bind the resource to a backend and advertise the features it supports

    @param backend: backend to adapt
        publish/retract notifiers and the pre-delete callback are registered on it
    """
    pubsub.PubSubResource.__init__(self)

    self.backend = backend
    self.hideNodes = False
    # optional features are appended to a per-instance copy: the class-level
    # list is shared, so mutating it would accumulate duplicates at each
    # instantiation and leak features between instances
    self.features = list(self.features)

    self.backend.registerPublishNotifier(self._notifyPublish)
    self.backend.registerRetractNotifier(self._notifyRetract)
    self.backend.registerPreDelete(self._preDelete)

    # FIXME: to be removed, it's not useful anymore as PEP is now used
    # if self.backend.supportsCreatorCheck():
    #     self.features.append("creator-jid-check")  #SàT custom feature: Check that a node (which correspond to
    #                                                 a jid in this server) is created by the right jid

    if self.backend.supportsAutoCreate():
        self.features.append("auto-create")

    if self.backend.supportsPublishOptions():
        self.features.append("publish-options")

    if self.backend.supportsInstantNodes():
        self.features.append("instant-nodes")

    if self.backend.supportsOutcastAffiliation():
        self.features.append("outcast-affiliation")

    if self.backend.supportsPersistentItems():
        self.features.append("persistent-items")

    if self.backend.supportsPublisherAffiliation():
        self.features.append("publisher-affiliation")

    if self.backend.supportsGroupBlog():
        self.features.append("groupblog")

    # if self.backend.supportsPublishModel():       #XXX: this feature is not really described in XEP-0060, we just can see it in examples
    #     self.features.append("publish_model")     #     but it's necessary for microblogging comments (see XEP-0277)
1275 | |
def getFullItem(self, item_data):
    """Build a copy of an item with its configuration attached

    Used to give item configuration back to node's owner (and *only* to owner)

    @param item_data(ItemData): its "item" and "config" attributes are used
    @return: deep copy of the item, with the configuration element added
        as a child when there is a configuration
    """
    # TODO: a test should check that only the owner get the item configuration back
    full_item = deepcopy(item_data.item)
    config = item_data.config
    if config:
        full_item.addChild(config.toElement())
    return full_item
1288 | |
@defer.inlineCallbacks
def _notifyPublish(self, data):
    """Send publish notifications to the subscribers and to the owners of a node

    @param data(dict): event data with the "items_data", "node", "pep" and
        "recipient" keys, and optionally "subscription"
    @return (defer.Deferred): fires with the result of the notifyPublish
        method of backend.privilege (PEP) or of pubsubService
    """
    items_data = data['items_data']
    node = data['node']
    pep = data['pep']
    recipient = data['recipient']

    owners, notifications_filtered = yield self._prepareNotify(items_data, node, data.get('subscription'), pep, recipient)

    # we notify the owners
    # FIXME: check if this comply with XEP-0060 (option needed ?)
    # TODO: item's access model have to be sent back to owner
    # TODO: same thing for getItems
    for owner_jid in owners:
        owner_subscription = pubsub.Subscription(node.nodeIdentifier,
                                                 owner_jid,
                                                 'subscribed')
        # owners get the items with their configuration attached
        full_items = [self.getFullItem(item_data) for item_data in items_data]
        notifications_filtered.append((owner_jid, {owner_subscription}, full_items))

    if pep:
        notifier, sender = self.backend.privilege.notifyPublish, recipient
    else:
        notifier, sender = self.pubsubService.notifyPublish, self.serviceJID

    defer.returnValue(notifier(sender, node.nodeIdentifier, notifications_filtered))
1322 | |
def _notifyRetract(self, data):
    """Send retract notifications to the subscribers and to the owners of a node

    @param data(dict): event data with the "items_data", "node", "pep" and
        "recipient" keys, and optionally "subscription"
    @return (defer.Deferred): fires with the result of the notifyRetract
        method of backend.privilege (PEP) or of pubsubService
    """
    items_data = data['items_data']
    node = data['node']
    pep = data['pep']
    recipient = data['recipient']

    def afterPrepare(result):
        owners, notifications_filtered = result

        # the owners are not in the filtered notifications, we add them
        for owner_jid in owners:
            owner_subscription = pubsub.Subscription(node.nodeIdentifier,
                                                     owner_jid,
                                                     'subscribed')
            retracted = [item_data.item for item_data in items_data]
            notifications_filtered.append((owner_jid, {owner_subscription}, retracted))

        if pep:
            notifier, sender = self.backend.privilege.notifyRetract, recipient
        else:
            notifier, sender = self.pubsubService.notifyRetract, self.serviceJID
        return notifier(sender, node.nodeIdentifier, notifications_filtered)

    prepare_d = self._prepareNotify(items_data, node, data.get('subscription'), pep, recipient)
    prepare_d.addCallback(afterPrepare)
    return prepare_d
1356 | |
@defer.inlineCallbacks
def _prepareNotify(self, items_data, node, subscription=None, pep=None, recipient=None):
    """Do a bunch of permissions check and filter notifications

    The owner is not added to these notifications,
    it must be added by the calling method
    @param items_data(list[ItemData]): items to notify, for each of them
        the following attributes are used:
        - item (domish.Element)
        - access_model (unicode)
        - config (item configuration, holding the allowed roster groups)
    @param node(LeafNode): node hosting the items
    @param subscription(pubsub.Subscription, None): if set, only the
        subscriber of this subscription is notified, else the notifications
        are retrieved with backend.getNotifications
    @param pep: if it evaluates to True, automatic PEP subscribers are added
        for nodes with "open" or "presence" access model
    @param recipient: forwarded to backend.privilege.getAutoSubscribers

    @return (tuple): will contain:
        - owners: owners of the node, as returned by node.getOwners
        - notifications_filtered(list[tuple]): (subscriber, subscriptions,
          allowed items) for each subscriber with at least one allowed item
    """
    if subscription is None:
        notifications = yield self.backend.getNotifications(node, items_data)
    else:
        notifications = [(subscription.subscriber, [subscription], items_data)]

    if pep and node.getConfiguration()[const.OPT_ACCESS_MODEL] in ('open', 'presence'):
        # for PEP we need to manage automatic subscriptions (cf. XEP-0163 §4)
        explicit_subscribers = {subscriber for subscriber, _, _ in notifications}
        auto_subscribers = yield self.backend.privilege.getAutoSubscribers(recipient, node.nodeIdentifier, explicit_subscribers)
        for sub_jid in auto_subscribers:
            sub = pubsub.Subscription(node.nodeIdentifier, sub_jid, 'subscribed')
            notifications.append((sub_jid, [sub], items_data))

    owners = yield node.getOwners()
    # the owner roster is only retrieved if an item needs it, then reused
    owner_roster = None

    # now we check access of subscriber for each item, and keep only allowed ones

    # we filter items not allowed for the subscribers
    notifications_filtered = []
    schema = node.getSchema()

    # NOTE: the loop variable below shadows the items_data argument
    for subscriber, subscriptions, items_data in notifications:
        subscriber_bare = subscriber.userhostJID()
        if subscriber_bare in owners:
            # as notification is always sent to owner,
            # we ignore owner if he is here
            continue
        allowed_items = []  # we keep only item which subscriber can access

        if schema is not None:
            # we have to deepcopy items because different subscribers may receive
            # different items (e.g. read restriction in schema)
            items_data = deepcopy(items_data)
            self.backend.filterItemsWithSchema(items_data, schema, False)

        for item_data in items_data:
            item, access_model = item_data.item, item_data.access_model
            access_list = item_data.config
            if access_model == const.VAL_AMODEL_OPEN:
                allowed_items.append(item)
            elif access_model == const.VAL_AMODEL_PUBLISHER_ROSTER:
                if owner_roster is None:
                    # FIXME: publisher roster should be used, not owner
                    # NOTE(review): getOwnerRoster is called on self here, while
                    #   the definition visible in this file is a backend method;
                    #   confirm this class provides it, else self.backend is meant
                    owner_roster= yield self.getOwnerRoster(node, owners)
                if owner_roster is None:
                    owner_roster = {}
                if not subscriber_bare in owner_roster:
                    # unknown subscriber: this item is skipped
                    continue
                # the subscriber is known, is he in the right group ?
                authorized_groups = access_list[const.OPT_ROSTER_GROUPS_ALLOWED]
                if owner_roster[subscriber_bare].groups.intersection(authorized_groups):
                    allowed_items.append(item)
            else:  # unknown access_model
                # TODO: white list access
                raise NotImplementedError

        if allowed_items:
            notifications_filtered.append((subscriber, subscriptions, allowed_items))

    defer.returnValue((owners, notifications_filtered))
1436 | |
def _preDelete(self, data, pep, recipient):
    """Notify the subscribers of a node that it is about to be deleted

    @param data(dict): pre-delete data, with the node in "node" and
        optionally a "redirectURI"
    @return (defer.Deferred): fires with the result of pubsubService.notifyDelete
    """
    nodeIdentifier = data['node'].nodeIdentifier
    redirectURI = data.get('redirectURI', None)

    def notifySubscribers(subscribers):
        return self.pubsubService.notifyDelete(
            self.serviceJID, nodeIdentifier, subscribers, redirectURI)

    subscribers_d = self.backend.getSubscribers(nodeIdentifier, pep, recipient)
    subscribers_d.addCallback(notifySubscribers)
    return subscribers_d
1447 | |
def _mapErrors(self, failure):
    """Convert a backend failure to the matching stanza error

    Failures which are not in _errorMap are re-raised by failure.trap.

    @param failure(failure.Failure): failure to convert
    @raise pubsub.PubSubError: the mapping has a pubsub specific condition
    @raise StanzaError: the mapping only has a stanza error condition
    """
    error_class = failure.trap(*self._errorMap.keys())
    condition, pubsubCondition, feature = self._errorMap[error_class]
    msg = failure.value.msg

    if not pubsubCondition:
        raise StanzaError(condition, text=msg)
    raise pubsub.PubSubError(condition, pubsubCondition, feature, msg)
1460 | |
def getInfo(self, requestor, service, nodeIdentifier, pep=None, recipient=None):
    """Return disco info of a node

    This is currently disabled: an empty list is always returned, and the
    code following the first return statement is never reached.
    """
    return [] # FIXME: disabled for now, need to manage PEP
    # --- unreachable from here, kept until PEP is managed ---
    if not requestor.resource:
        # this avoid error when getting a disco request from server during namespace delegation
        return []
    info = {}

    def saveType(result):
        info['type'] = result
        return nodeIdentifier

    def saveMetaData(result):
        info['meta-data'] = result
        return info

    def trapNotFound(failure):
        failure.trap(error.NodeNotFound)
        return info

    d = defer.succeed(nodeIdentifier)
    d.addCallback(self.backend.getNodeType)
    d.addCallback(saveType)
    d.addCallback(self.backend.getNodeMetaData)
    d.addCallback(saveMetaData)
    d.addErrback(trapNotFound)
    d.addErrback(self._mapErrors)
    return d
1488 | |
def getNodes(self, requestor, service, nodeIdentifier):
    """Return nodes for disco#items

    Pubsub/PEP nodes will be returned if disco node is not specified
    else Pubsub/PEP items will be returned
    (according to what requestor can access)

    @return (defer.Deferred): fires with the nodes, or with a list of
        (None, item id) tuples when a disco node is specified
        the list is empty if service has a resource
    """
    # service only has a "pep" attribute in some cases, it defaults to False
    pep = getattr(service, 'pep', False)

    if service.resource:
        return defer.succeed([])

    if not nodeIdentifier:
        nodes_d = self.backend.getNodes(requestor.userhostJID(),
                                        pep,
                                        service)
    else:
        unrestricted = requestor.userhostJID() == service
        nodes_d = self.backend.getItemsIds(nodeIdentifier,
                                           requestor,
                                           [],
                                           unrestricted,
                                           None,
                                           None,
                                           pep,
                                           service)
        # items must be set as name, not node
        nodes_d.addCallback(lambda items: [(None, item) for item in items])

    return nodes_d.addErrback(self._mapErrors)
1521 | |
def getConfigurationOptions(self):
    """Return the node configuration options of the backend (its nodeOptions attribute)"""
    return self.backend.nodeOptions
1524 | |
def _publish_errb(self, failure, request):
    """Create the node and publish again if publication failed on a missing node

    This only happens when the backend supports auto-creation, in all other
    cases the failure is passed through.

    @param failure(failure.Failure): failure of the publication
    @param request: publish request which has failed
    @return (defer.Deferred, failure.Failure): Deferred of the new
        publication, or the unhandled failure
    """
    if failure.type == error.NodeNotFound and self.backend.supportsAutoCreate():
        # single-argument parenthesised form: same output with the Python 2
        # print statement, and valid syntax for the Python 3 print function
        print("Auto-creating node %s" % (request.nodeIdentifier,))

        def republish(ignore):
            return self.backend.publish(request.nodeIdentifier,
                                        request.items,
                                        request.sender,
                                        self._isPep(request),
                                        request.recipient,
                                        )

        d = self.backend.createNode(request.nodeIdentifier,
                                    request.sender,
                                    pep=self._isPep(request),
                                    recipient=request.recipient)
        d.addCallback(republish)
        return d

    return failure
1543 | |
def _isPep(self, request):
    """Return the "delegated" attribute of a request, or False if it has none"""
    return getattr(request, 'delegated', False)
1549 | |
def publish(self, request):
    """Publish the items of a request with the backend

    A failure is first handled by _publish_errb, then mapped to a
    stanza error by _mapErrors.
    """
    publish_d = self.backend.publish(
        request.nodeIdentifier,
        request.items,
        request.sender,
        self._isPep(request),
        request.recipient)
    publish_d.addErrback(self._publish_errb, request)
    return publish_d.addErrback(self._mapErrors)
1558 | |
def subscribe(self, request):
    """Forward a subscription request to the backend, mapping its errors to stanza errors"""
    subscribe_d = self.backend.subscribe(
        request.nodeIdentifier,
        request.subscriber,
        request.sender,
        self._isPep(request),
        request.recipient)
    return subscribe_d.addErrback(self._mapErrors)
1566 | |
def unsubscribe(self, request):
    """Forward an unsubscription request to the backend, mapping its errors to stanza errors"""
    unsubscribe_d = self.backend.unsubscribe(
        request.nodeIdentifier,
        request.subscriber,
        request.sender,
        self._isPep(request),
        request.recipient)
    return unsubscribe_d.addErrback(self._mapErrors)
1574 | |
def subscriptions(self, request):
    """Retrieve the subscriptions of the request sender from the backend

    Backend errors are mapped to stanza errors.
    """
    subscriptions_d = self.backend.getSubscriptions(
        request.sender,
        request.nodeIdentifier,
        self._isPep(request),
        request.recipient)
    return subscriptions_d.addErrback(self._mapErrors)
1581 | |
def affiliations(self, request):
    """Retrieve affiliations for a normal entity (cf. XEP-0060 §5.7)

    Retrieve all the nodes where the request sender is affiliated.
    Backend errors are mapped to stanza errors.
    """
    affiliations_d = self.backend.getAffiliations(
        request.sender,
        request.nodeIdentifier,
        self._isPep(request),
        request.recipient)
    return affiliations_d.addErrback(self._mapErrors)
1592 | |
def create(self, request):
    """Create a node with the backend, using the options of the request

    Backend errors are mapped to stanza errors.
    """
    create_d = self.backend.createNode(
        request.nodeIdentifier,
        request.sender,
        request.options,
        self._isPep(request),
        request.recipient)
    return create_d.addErrback(self._mapErrors)
1599 | |
def default(self, request):
    """Retrieve the default configuration for the node type of the request

    Backend errors are mapped to stanza errors.
    """
    default_d = self.backend.getDefaultConfiguration(
        request.nodeType,
        self._isPep(request),
        request.sender)
    return default_d.addErrback(self._mapErrors)
1605 | |
def configureGet(self, request):
    """Retrieve the configuration of a node from the backend

    Backend errors are mapped to stanza errors.
    """
    config_d = self.backend.getNodeConfiguration(
        request.nodeIdentifier,
        self._isPep(request),
        request.recipient)
    return config_d.addErrback(self._mapErrors)
1611 | |
def configureSet(self, request):
    """Set the configuration of a node with the options of the request

    Backend errors are mapped to stanza errors.
    """
    config_d = self.backend.setNodeConfiguration(
        request.nodeIdentifier,
        request.options,
        request.sender,
        self._isPep(request),
        request.recipient)
    return config_d.addErrback(self._mapErrors)
1619 | |
def affiliationsGet(self, request):
    """Retrieve affiliations for owner (cf. XEP-0060 §8.9.1)

    Retrieve all the affiliations of a node.
    Backend errors are mapped to stanza errors.
    """
    affiliations_d = self.backend.getAffiliationsOwner(
        request.nodeIdentifier,
        request.sender,
        self._isPep(request),
        request.recipient)
    return affiliations_d.addErrback(self._mapErrors)
1630 | |
def affiliationsSet(self, request):
    """Forward an owner's affiliations modification request to the backend

    @param request: request object; its nodeIdentifier, sender,
        affiliations and recipient attributes are read
    @return: value returned by addErrback(self._mapErrors) on the result
        of backend.setAffiliationsOwner
    """
    update = self.backend.setAffiliationsOwner
    pending = update(
        request.nodeIdentifier,
        request.sender,
        request.affiliations,
        self._isPep(request),
        request.recipient,
    )
    return pending.addErrback(self._mapErrors)
1638 | |
def subscriptionsGet(self, request):
    """Retrieve subscriptions for an owner (cf. XEP-0060 §8.8.1)

    Retrieve all subscriptions for a node.

    @param request: request object; its nodeIdentifier, sender and
        recipient attributes are read
    @return: value returned by addErrback(self._mapErrors) on the result
        of backend.getSubscriptionsOwner
    """
    backend_call = self.backend.getSubscriptionsOwner
    node = request.nodeIdentifier
    sender = request.sender
    pep = self._isPep(request)
    recipient = request.recipient
    return backend_call(node, sender, pep, recipient).addErrback(
        self._mapErrors)
1649 | |
def subscriptionsSet(self, request):
    """Forward an owner's subscriptions modification request to the backend

    @param request: request object; its nodeIdentifier, sender,
        subscriptions and recipient attributes are read
    @return: value returned by addErrback(self._mapErrors) on the result
        of backend.setSubscriptionsOwner
    """
    update = self.backend.setSubscriptionsOwner
    pending = update(
        request.nodeIdentifier,
        request.sender,
        request.subscriptions,
        self._isPep(request),
        request.recipient,
    )
    return pending.addErrback(self._mapErrors)
1657 | |
def items(self, request):
    """Forward an items retrieval request to the backend

    An extra data dict is built and passed as last argument:
        - 'rsm' is set to request.rsm when const.FLAG_ENABLE_RSM is true
          and request.rsm is not None
        - 'pep' is set to request.delegated when the request has such an
          attribute

    @param request: request object; its nodeIdentifier, sender,
        recipient, maxItems and itemIdentifiers attributes are read
    @return: value returned by addErrback(self._mapErrors) on the result
        of backend.getItems
    """
    extra = {}

    if const.FLAG_ENABLE_RSM:
        if request.rsm is not None:
            extra['rsm'] = request.rsm

    # "delegated" is optional on the request: the key is simply left out
    # when the attribute is missing
    try:
        delegated = request.delegated
    except AttributeError:
        pass
    else:
        extra['pep'] = delegated

    get_items = self.backend.getItems
    pending = get_items(
        request.nodeIdentifier,
        request.sender,
        request.recipient,
        request.maxItems,
        request.itemIdentifiers,
        extra,
    )
    return pending.addErrback(self._mapErrors)
1673 | |
def retract(self, request):
    """Forward an item retraction request to the backend

    @param request: request object; its nodeIdentifier, itemIdentifiers,
        sender, notify and recipient attributes are read
    @return: value returned by addErrback(self._mapErrors) on the result
        of backend.retractItem
    """
    retract_item = self.backend.retractItem
    pending = retract_item(
        request.nodeIdentifier,
        request.itemIdentifiers,
        request.sender,
        request.notify,
        self._isPep(request),
        request.recipient,
    )
    return pending.addErrback(self._mapErrors)
1682 | |
def purge(self, request):
    """Forward a node purge request to the backend

    @param request: request object; its nodeIdentifier, sender and
        recipient attributes are read
    @return: value returned by addErrback(self._mapErrors) on the result
        of backend.purgeNode
    """
    backend_call = self.backend.purgeNode
    node = request.nodeIdentifier
    sender = request.sender
    pep = self._isPep(request)
    recipient = request.recipient
    return backend_call(node, sender, pep, recipient).addErrback(
        self._mapErrors)
1689 | |
def delete(self, request):
    """Forward a node deletion request to the backend

    @param request: request object; its nodeIdentifier, sender and
        recipient attributes are read
    @return: value returned by addErrback(self._mapErrors) on the result
        of backend.deleteNode
    """
    remove_node = self.backend.deleteNode
    pending = remove_node(
        request.nodeIdentifier,
        request.sender,
        self._isPep(request),
        request.recipient,
    )
    return pending.addErrback(self._mapErrors)
1696 | |
# Register PubSubResourceFromBackend as the adapter from
# iidavoll.IBackendService to iwokkel.IPubSubResource.
components.registerAdapter(
    PubSubResourceFromBackend,
    iidavoll.IBackendService,
    iwokkel.IPubSubResource,
)