comparison sat_pubsub/backend.py @ 285:a87c155d0fd5

replaced former roster dirty hack by a XEP-0356 first draft implementation, only roster get is implemented so far
author Goffi <goffi@goffi.org>
date Tue, 31 Mar 2015 17:31:56 +0200
parents 002c59dbc23f
children 073161f6f143
comparison
equal deleted inserted replaced
284:dfc47748d8d8 285:a87c155d0fd5
80 from sat_pubsub import error, iidavoll, const 80 from sat_pubsub import error, iidavoll, const
81 from sat_pubsub.iidavoll import IBackendService, ILeafNode 81 from sat_pubsub.iidavoll import IBackendService, ILeafNode
82 82
83 from copy import deepcopy 83 from copy import deepcopy
84 84
85
85 def _getAffiliation(node, entity): 86 def _getAffiliation(node, entity):
86 d = node.getAffiliation(entity) 87 d = node.getAffiliation(entity)
87 d.addCallback(lambda affiliation: (node, affiliation)) 88 d.addCallback(lambda affiliation: (node, affiliation))
88 return d 89 return d
89
90 90
91 91
92 class BackendService(service.Service, utility.EventDispatcher): 92 class BackendService(service.Service, utility.EventDispatcher):
93 """ 93 """
94 Generic publish-subscribe backend service. 94 Generic publish-subscribe backend service.
292 item.uri = None 292 item.uri = None
293 item.defaultUri = None 293 item.defaultUri = None
294 if not item.getAttribute("id"): 294 if not item.getAttribute("id"):
295 item["id"] = str(uuid.uuid4()) 295 item["id"] = str(uuid.uuid4())
296 access_model, item_config = self.parseItemConfig(item) 296 access_model, item_config = self.parseItemConfig(item)
297 parsed_items.append((access_model, item_config, item)) 297 parsed_items.append((access_model, item_config, item))
298 298
299 if persistItems: 299 if persistItems:
300 d = node.storeItems(parsed_items, requestor) 300 d = node.storeItems(parsed_items, requestor)
301 else: 301 else:
302 d = defer.succeed(None) 302 d = defer.succeed(None)
324 'items') == 'items': 324 'items') == 'items':
325 subs = subsBySubscriber.setdefault(subscription.subscriber, 325 subs = subsBySubscriber.setdefault(subscription.subscriber,
326 set()) 326 set())
327 subs.add(subscription) 327 subs.add(subscription)
328 328
329 notifications = [(subscriber, subscriptions, items) 329 notifications = [(subscriber, subscriptions_, items)
330 for subscriber, subscriptions 330 for subscriber, subscriptions_
331 in subsBySubscriber.iteritems()] 331 in subsBySubscriber.iteritems()]
332 332
333 return notifications 333 return notifications
334 334
335 def rootNotFound(failure): 335 def rootNotFound(failure):
344 d = defer.gatherResults([d1, d2]) 344 d = defer.gatherResults([d1, d2])
345 d.addCallback(lambda result: result[0] + result[1]) 345 d.addCallback(lambda result: result[0] + result[1])
346 d.addCallback(toNotifications, nodeIdentifier, items) 346 d.addCallback(toNotifications, nodeIdentifier, items)
347 return d 347 return d
348 348
349
350 def registerNotifier(self, observerfn, *args, **kwargs): 349 def registerNotifier(self, observerfn, *args, **kwargs):
351 self.addObserver('//event/pubsub/notify', observerfn, *args, **kwargs) 350 self.addObserver('//event/pubsub/notify', observerfn, *args, **kwargs)
352
353 351
354 def subscribe(self, nodeIdentifier, subscriber, requestor): 352 def subscribe(self, nodeIdentifier, subscriber, requestor):
355 subscriberEntity = subscriber.userhostJID() 353 subscriberEntity = subscriber.userhostJID()
356 if subscriberEntity != requestor.userhostJID(): 354 if subscriberEntity != requestor.userhostJID():
357 return defer.fail(error.Forbidden()) 355 return defer.fail(error.Forbidden())
443 nodeIdentifierJID = JID(nodeIdentifier[len(const.NS_GROUPBLOG_PREFIX):] if groupblog else nodeIdentifier) 441 nodeIdentifierJID = JID(nodeIdentifier[len(const.NS_GROUPBLOG_PREFIX):] if groupblog else nodeIdentifier)
444 except InvalidFormat: 442 except InvalidFormat:
445 is_user_jid = False 443 is_user_jid = False
446 else: 444 else:
447 is_user_jid = bool(nodeIdentifierJID.user) 445 is_user_jid = bool(nodeIdentifierJID.user)
448 446
449 if is_user_jid and nodeIdentifierJID.userhostJID() != requestor.userhostJID(): 447 if is_user_jid and nodeIdentifierJID.userhostJID() != requestor.userhostJID():
450 #we have an user jid node, but not created by the owner of this jid 448 #we have an user jid node, but not created by the owner of this jid
451 print "Wrong creator" 449 print "Wrong creator"
452 raise error.Forbidden() 450 raise error.Forbidden()
453 451
510 return d 508 return d
511 509
512 def checkGroup(self, roster_groups, entity): 510 def checkGroup(self, roster_groups, entity):
513 """Check that entity is authorized and in roster 511 """Check that entity is authorized and in roster
514 @param roster_group: tuple which 2 items: 512 @param roster_group: tuple which 2 items:
515 - roster: mapping of jid to RosterItem as given by self.roster.getRoster 513 - roster: mapping of jid to RosterItem as given by self.privilege.getRoster
516 - groups: list of authorized groups 514 - groups: list of authorized groups
517 @param entity: entity which must be in group 515 @param entity: entity which must be in group
518 @return: (True, roster) if entity is in roster and authorized 516 @return: (True, roster) if entity is in roster and authorized
519 (False, roster) if entity is in roster but not authorized 517 (False, roster) if entity is in roster but not authorized
520 @raise: error.NotInRoster if entity is not in roster""" 518 @raise: error.NotInRoster if entity is not in roster"""
521 roster, authorized_groups = roster_groups 519 roster, authorized_groups = roster_groups
522 _entity = entity.userhostJID() 520 _entity = entity.userhostJID()
523 521
524 if not _entity in roster: 522 if not _entity in roster:
525 raise error.NotInRoster 523 raise error.NotInRoster
526 return (roster[_entity].groups.intersection(authorized_groups), roster) 524 return (roster[_entity].groups.intersection(authorized_groups), roster)
527 525
528 def _getNodeGroups(self, roster, nodeIdentifier): 526 def _getNodeGroups(self, roster, nodeIdentifier):
529 d = self.storage.getNodeGroups(nodeIdentifier) 527 d = self.storage.getNodeGroups(nodeIdentifier)
530 d.addCallback(lambda groups: (roster, groups)) 528 d.addCallback(lambda groups: (roster, groups))
531 return d 529 return d
530
531 def _rosterEb(self, failure):
532 log.msg("Error while getting roster: {}".format(failure.value))
533 return {}
532 534
533 def _doGetItems(self, result, requestor, maxItems, itemIdentifiers, 535 def _doGetItems(self, result, requestor, maxItems, itemIdentifiers,
534 ext_data): 536 ext_data):
535 node, affiliation = result 537 node, affiliation = result
536 538
550 elif access_model == const.VAL_AMODEL_JID: 552 elif access_model == const.VAL_AMODEL_JID:
551 #FIXME: manage jid 553 #FIXME: manage jid
552 raise NotImplementedError 554 raise NotImplementedError
553 else: 555 else:
554 raise error.BadAccessTypeError(access_model) 556 raise error.BadAccessTypeError(access_model)
555 557
556 ret.append(item) 558 ret.append(item)
557 return ret 559 return ret
558 560
559 def access_checked(access_data): 561 def access_checked(access_data):
560 authorized, roster = access_data 562 authorized, roster = access_data
588 if affiliation == 'outcast': 590 if affiliation == 'outcast':
589 raise error.Forbidden() 591 raise error.Forbidden()
590 592
591 access_model = node.getConfiguration()["pubsub#access_model"] 593 access_model = node.getConfiguration()["pubsub#access_model"]
592 d = node.getNodeOwner() 594 d = node.getNodeOwner()
593 d.addCallback(self.roster.getRoster) 595 d.addCallback(self.privilege.getRoster)
596 d.addErrback(self._rosterEb)
594 597
595 if access_model == 'open' or affiliation == 'owner': 598 if access_model == 'open' or affiliation == 'owner':
596 d.addCallback(lambda roster: (True, roster)) 599 d.addCallback(lambda roster: (True, roster))
597 d.addCallback(access_checked) 600 d.addCallback(access_checked)
598 elif access_model == 'roster': 601 elif access_model == 'roster':
835 self.backend = backend 838 self.backend = backend
836 self.hideNodes = False 839 self.hideNodes = False
837 840
838 self.backend.registerNotifier(self._notify) 841 self.backend.registerNotifier(self._notify)
839 self.backend.registerPreDelete(self._preDelete) 842 self.backend.registerPreDelete(self._preDelete)
840 843
841 if self.backend.supportsCreatorCheck(): 844 if self.backend.supportsCreatorCheck():
842 self.features.append("creator-jid-check") #SàT custom feature: Check that a node (which correspond to 845 self.features.append("creator-jid-check") #SàT custom feature: Check that a node (which correspond to
843 # a jid in this server) is created by the right jid 846 # a jid in this server) is created by the right jid
844 847
845 if self.backend.supportsAutoCreate(): 848 if self.backend.supportsAutoCreate():
864 # self.features.append("publish_model") # but it's necessary for microblogging comments (see XEP-0277) 867 # self.features.append("publish_model") # but it's necessary for microblogging comments (see XEP-0277)
865 868
866 def _notify(self, data): 869 def _notify(self, data):
867 items = data['items'] 870 items = data['items']
868 node = data['node'] 871 node = data['node']
869 872
870 def _notifyAllowed(result): 873 def _notifyAllowed(result):
871 """Check access of subscriber for each item, 874 """Check access of subscriber for each item,
872 and notify only allowed ones""" 875 and notify only allowed ones"""
873 notifications, (owner_jid,roster) = result 876 notifications, (owner_jid,roster) = result
874 877
875 #we filter items not allowed for the subscribers 878 #we filter items not allowed for the subscribers
876 notifications_filtered = [] 879 notifications_filtered = []
877 880
878 for subscriber, subscriptions, _items in notifications: 881 for subscriber, subscriptions, _items in notifications:
879 allowed_items = [] #we keep only item which subscriber can access 882 allowed_items = [] #we keep only item which subscriber can access
887 continue 890 continue
888 #the subscriber is known, is he in the right group ? 891 #the subscriber is known, is he in the right group ?
889 authorized_groups = item_config[const.OPT_ROSTER_GROUPS_ALLOWED] 892 authorized_groups = item_config[const.OPT_ROSTER_GROUPS_ALLOWED]
890 if roster[_subscriber].groups.intersection(authorized_groups): 893 if roster[_subscriber].groups.intersection(authorized_groups):
891 allowed_items.append(item) 894 allowed_items.append(item)
892 895
893 else: #unknown access_model 896 else: #unknown access_model
894 raise NotImplementedError 897 raise NotImplementedError
895 898
896 if allowed_items: 899 if allowed_items:
897 notifications_filtered.append((subscriber, subscriptions, allowed_items)) 900 notifications_filtered.append((subscriber, subscriptions, allowed_items))
898 901
899 #we notify the owner 902 #we notify the owner
900 #FIXME: check if this comply with XEP-0060 (option needed ?) 903 #FIXME: check if this comply with XEP-0060 (option needed ?)
901 #TODO: item's access model have to be sent back to owner 904 #TODO: item's access model have to be sent back to owner
902 #TODO: same thing for getItems 905 #TODO: same thing for getItems
903 906
904 def getFullItem(item_data): 907 def getFullItem(item_data):
905 """ Attach item configuration to this item 908 """ Attach item configuration to this item
906 Used to give item configuration back to node's owner (and *only* to owner) 909 Used to give item configuration back to node's owner (and *only* to owner)
907 """ 910 """
908 #TODO: a test should check that only the owner get the item configuration back 911 #TODO: a test should check that only the owner get the item configuration back
909 912
910 access_model, item_config, item = item_data 913 access_model, item_config, item = item_data
911 new_item = deepcopy(item) 914 new_item = deepcopy(item)
912 if item_config: 915 if item_config:
913 new_item.addChild(item_config.toElement()) 916 new_item.addChild(item_config.toElement())
914 return new_item 917 return new_item
915 918
916 notifications_filtered.append((owner_jid, 919 notifications_filtered.append((owner_jid,
917 set([Subscription(node.nodeIdentifier, 920 set([Subscription(node.nodeIdentifier,
918 owner_jid, 921 owner_jid,
919 'subscribed')]), 922 'subscribed')]),
920 [getFullItem(item_data) for item_data in items])) 923 [getFullItem(item_data) for item_data in items]))
921 924
922 return self.pubsubService.notifyPublish( 925 return self.pubsubService.notifyPublish(
923 self.serviceJID, 926 self.serviceJID,
924 node.nodeIdentifier, 927 node.nodeIdentifier,
925 notifications_filtered) 928 notifications_filtered)
926 929
927 930
928 if 'subscription' not in data: 931 if 'subscription' not in data:
929 d1 = self.backend.getNotifications(node.nodeIdentifier, items) 932 d1 = self.backend.getNotifications(node.nodeIdentifier, items)
930 else: 933 else:
931 subscription = data['subscription'] 934 subscription = data['subscription']
932 d1 = defer.succeed([(subscription.subscriber, [subscription], 935 d1 = defer.succeed([(subscription.subscriber, [subscription],
933 items)]) 936 items)])
934 937
935 def _got_owner(owner_jid): 938 def _got_owner(owner_jid):
936 #return a tuple with owner_jid and roster 939 #return a tuple with owner_jid and roster
937 d = self.backend.roster.getRoster(owner_jid) 940 d = self.backend.privilege.getRoster(owner_jid)
938 return d.addCallback(lambda roster: (owner_jid,roster)) 941 d.addErrback(self._rosterEb)
942 d.addCallback(lambda roster: (owner_jid,roster))
939 943
940 d2 = node.getNodeOwner() 944 d2 = node.getNodeOwner()
941 d2.addCallback(_got_owner) 945 d2.addCallback(_got_owner)
942 946
943 d = defer.gatherResults([d1, d2]) 947 d = defer.gatherResults([d1, d2])
944 d.addCallback(_notifyAllowed) 948 d.addCallback(_notifyAllowed)
945
946 949
947 def _preDelete(self, data): 950 def _preDelete(self, data):
948 nodeIdentifier = data['node'].nodeIdentifier 951 nodeIdentifier = data['node'].nodeIdentifier
949 redirectURI = data.get('redirectURI', None) 952 redirectURI = data.get('redirectURI', None)
950 d = self.backend.getSubscribers(nodeIdentifier) 953 d = self.backend.getSubscribers(nodeIdentifier)