Mercurial > libervia-pubsub
comparison sat_pubsub/backend.py @ 285:a87c155d0fd5
replaced former roster dirty hack by a XEP-0356 first draft implementation, only roster get is implemented so far
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 31 Mar 2015 17:31:56 +0200 |
parents | 002c59dbc23f |
children | 073161f6f143 |
comparison
equal
deleted
inserted
replaced
284:dfc47748d8d8 | 285:a87c155d0fd5 |
---|---|
80 from sat_pubsub import error, iidavoll, const | 80 from sat_pubsub import error, iidavoll, const |
81 from sat_pubsub.iidavoll import IBackendService, ILeafNode | 81 from sat_pubsub.iidavoll import IBackendService, ILeafNode |
82 | 82 |
83 from copy import deepcopy | 83 from copy import deepcopy |
84 | 84 |
85 | |
85 def _getAffiliation(node, entity): | 86 def _getAffiliation(node, entity): |
86 d = node.getAffiliation(entity) | 87 d = node.getAffiliation(entity) |
87 d.addCallback(lambda affiliation: (node, affiliation)) | 88 d.addCallback(lambda affiliation: (node, affiliation)) |
88 return d | 89 return d |
89 | |
90 | 90 |
91 | 91 |
92 class BackendService(service.Service, utility.EventDispatcher): | 92 class BackendService(service.Service, utility.EventDispatcher): |
93 """ | 93 """ |
94 Generic publish-subscribe backend service. | 94 Generic publish-subscribe backend service. |
292 item.uri = None | 292 item.uri = None |
293 item.defaultUri = None | 293 item.defaultUri = None |
294 if not item.getAttribute("id"): | 294 if not item.getAttribute("id"): |
295 item["id"] = str(uuid.uuid4()) | 295 item["id"] = str(uuid.uuid4()) |
296 access_model, item_config = self.parseItemConfig(item) | 296 access_model, item_config = self.parseItemConfig(item) |
297 parsed_items.append((access_model, item_config, item)) | 297 parsed_items.append((access_model, item_config, item)) |
298 | 298 |
299 if persistItems: | 299 if persistItems: |
300 d = node.storeItems(parsed_items, requestor) | 300 d = node.storeItems(parsed_items, requestor) |
301 else: | 301 else: |
302 d = defer.succeed(None) | 302 d = defer.succeed(None) |
324 'items') == 'items': | 324 'items') == 'items': |
325 subs = subsBySubscriber.setdefault(subscription.subscriber, | 325 subs = subsBySubscriber.setdefault(subscription.subscriber, |
326 set()) | 326 set()) |
327 subs.add(subscription) | 327 subs.add(subscription) |
328 | 328 |
329 notifications = [(subscriber, subscriptions, items) | 329 notifications = [(subscriber, subscriptions_, items) |
330 for subscriber, subscriptions | 330 for subscriber, subscriptions_ |
331 in subsBySubscriber.iteritems()] | 331 in subsBySubscriber.iteritems()] |
332 | 332 |
333 return notifications | 333 return notifications |
334 | 334 |
335 def rootNotFound(failure): | 335 def rootNotFound(failure): |
344 d = defer.gatherResults([d1, d2]) | 344 d = defer.gatherResults([d1, d2]) |
345 d.addCallback(lambda result: result[0] + result[1]) | 345 d.addCallback(lambda result: result[0] + result[1]) |
346 d.addCallback(toNotifications, nodeIdentifier, items) | 346 d.addCallback(toNotifications, nodeIdentifier, items) |
347 return d | 347 return d |
348 | 348 |
349 | |
350 def registerNotifier(self, observerfn, *args, **kwargs): | 349 def registerNotifier(self, observerfn, *args, **kwargs): |
351 self.addObserver('//event/pubsub/notify', observerfn, *args, **kwargs) | 350 self.addObserver('//event/pubsub/notify', observerfn, *args, **kwargs) |
352 | |
353 | 351 |
354 def subscribe(self, nodeIdentifier, subscriber, requestor): | 352 def subscribe(self, nodeIdentifier, subscriber, requestor): |
355 subscriberEntity = subscriber.userhostJID() | 353 subscriberEntity = subscriber.userhostJID() |
356 if subscriberEntity != requestor.userhostJID(): | 354 if subscriberEntity != requestor.userhostJID(): |
357 return defer.fail(error.Forbidden()) | 355 return defer.fail(error.Forbidden()) |
443 nodeIdentifierJID = JID(nodeIdentifier[len(const.NS_GROUPBLOG_PREFIX):] if groupblog else nodeIdentifier) | 441 nodeIdentifierJID = JID(nodeIdentifier[len(const.NS_GROUPBLOG_PREFIX):] if groupblog else nodeIdentifier) |
444 except InvalidFormat: | 442 except InvalidFormat: |
445 is_user_jid = False | 443 is_user_jid = False |
446 else: | 444 else: |
447 is_user_jid = bool(nodeIdentifierJID.user) | 445 is_user_jid = bool(nodeIdentifierJID.user) |
448 | 446 |
449 if is_user_jid and nodeIdentifierJID.userhostJID() != requestor.userhostJID(): | 447 if is_user_jid and nodeIdentifierJID.userhostJID() != requestor.userhostJID(): |
450 #we have an user jid node, but not created by the owner of this jid | 448 #we have an user jid node, but not created by the owner of this jid |
451 print "Wrong creator" | 449 print "Wrong creator" |
452 raise error.Forbidden() | 450 raise error.Forbidden() |
453 | 451 |
510 return d | 508 return d |
511 | 509 |
512 def checkGroup(self, roster_groups, entity): | 510 def checkGroup(self, roster_groups, entity): |
513 """Check that entity is authorized and in roster | 511 """Check that entity is authorized and in roster |
514 @param roster_group: tuple which 2 items: | 512 @param roster_group: tuple which 2 items: |
515 - roster: mapping of jid to RosterItem as given by self.roster.getRoster | 513 - roster: mapping of jid to RosterItem as given by self.privilege.getRoster |
516 - groups: list of authorized groups | 514 - groups: list of authorized groups |
517 @param entity: entity which must be in group | 515 @param entity: entity which must be in group |
518 @return: (True, roster) if entity is in roster and authorized | 516 @return: (True, roster) if entity is in roster and authorized |
519 (False, roster) if entity is in roster but not authorized | 517 (False, roster) if entity is in roster but not authorized |
520 @raise: error.NotInRoster if entity is not in roster""" | 518 @raise: error.NotInRoster if entity is not in roster""" |
521 roster, authorized_groups = roster_groups | 519 roster, authorized_groups = roster_groups |
522 _entity = entity.userhostJID() | 520 _entity = entity.userhostJID() |
523 | 521 |
524 if not _entity in roster: | 522 if not _entity in roster: |
525 raise error.NotInRoster | 523 raise error.NotInRoster |
526 return (roster[_entity].groups.intersection(authorized_groups), roster) | 524 return (roster[_entity].groups.intersection(authorized_groups), roster) |
527 | 525 |
528 def _getNodeGroups(self, roster, nodeIdentifier): | 526 def _getNodeGroups(self, roster, nodeIdentifier): |
529 d = self.storage.getNodeGroups(nodeIdentifier) | 527 d = self.storage.getNodeGroups(nodeIdentifier) |
530 d.addCallback(lambda groups: (roster, groups)) | 528 d.addCallback(lambda groups: (roster, groups)) |
531 return d | 529 return d |
530 | |
531 def _rosterEb(self, failure): | |
532 log.msg("Error while getting roster: {}".format(failure.value)) | |
533 return {} | |
532 | 534 |
533 def _doGetItems(self, result, requestor, maxItems, itemIdentifiers, | 535 def _doGetItems(self, result, requestor, maxItems, itemIdentifiers, |
534 ext_data): | 536 ext_data): |
535 node, affiliation = result | 537 node, affiliation = result |
536 | 538 |
550 elif access_model == const.VAL_AMODEL_JID: | 552 elif access_model == const.VAL_AMODEL_JID: |
551 #FIXME: manage jid | 553 #FIXME: manage jid |
552 raise NotImplementedError | 554 raise NotImplementedError |
553 else: | 555 else: |
554 raise error.BadAccessTypeError(access_model) | 556 raise error.BadAccessTypeError(access_model) |
555 | 557 |
556 ret.append(item) | 558 ret.append(item) |
557 return ret | 559 return ret |
558 | 560 |
559 def access_checked(access_data): | 561 def access_checked(access_data): |
560 authorized, roster = access_data | 562 authorized, roster = access_data |
588 if affiliation == 'outcast': | 590 if affiliation == 'outcast': |
589 raise error.Forbidden() | 591 raise error.Forbidden() |
590 | 592 |
591 access_model = node.getConfiguration()["pubsub#access_model"] | 593 access_model = node.getConfiguration()["pubsub#access_model"] |
592 d = node.getNodeOwner() | 594 d = node.getNodeOwner() |
593 d.addCallback(self.roster.getRoster) | 595 d.addCallback(self.privilege.getRoster) |
596 d.addErrback(self._rosterEb) | |
594 | 597 |
595 if access_model == 'open' or affiliation == 'owner': | 598 if access_model == 'open' or affiliation == 'owner': |
596 d.addCallback(lambda roster: (True, roster)) | 599 d.addCallback(lambda roster: (True, roster)) |
597 d.addCallback(access_checked) | 600 d.addCallback(access_checked) |
598 elif access_model == 'roster': | 601 elif access_model == 'roster': |
835 self.backend = backend | 838 self.backend = backend |
836 self.hideNodes = False | 839 self.hideNodes = False |
837 | 840 |
838 self.backend.registerNotifier(self._notify) | 841 self.backend.registerNotifier(self._notify) |
839 self.backend.registerPreDelete(self._preDelete) | 842 self.backend.registerPreDelete(self._preDelete) |
840 | 843 |
841 if self.backend.supportsCreatorCheck(): | 844 if self.backend.supportsCreatorCheck(): |
842 self.features.append("creator-jid-check") #SàT custom feature: Check that a node (which correspond to | 845 self.features.append("creator-jid-check") #SàT custom feature: Check that a node (which correspond to |
843 # a jid in this server) is created by the right jid | 846 # a jid in this server) is created by the right jid |
844 | 847 |
845 if self.backend.supportsAutoCreate(): | 848 if self.backend.supportsAutoCreate(): |
864 # self.features.append("publish_model") # but it's necessary for microblogging comments (see XEP-0277) | 867 # self.features.append("publish_model") # but it's necessary for microblogging comments (see XEP-0277) |
865 | 868 |
866 def _notify(self, data): | 869 def _notify(self, data): |
867 items = data['items'] | 870 items = data['items'] |
868 node = data['node'] | 871 node = data['node'] |
869 | 872 |
870 def _notifyAllowed(result): | 873 def _notifyAllowed(result): |
871 """Check access of subscriber for each item, | 874 """Check access of subscriber for each item, |
872 and notify only allowed ones""" | 875 and notify only allowed ones""" |
873 notifications, (owner_jid,roster) = result | 876 notifications, (owner_jid,roster) = result |
874 | 877 |
875 #we filter items not allowed for the subscribers | 878 #we filter items not allowed for the subscribers |
876 notifications_filtered = [] | 879 notifications_filtered = [] |
877 | 880 |
878 for subscriber, subscriptions, _items in notifications: | 881 for subscriber, subscriptions, _items in notifications: |
879 allowed_items = [] #we keep only item which subscriber can access | 882 allowed_items = [] #we keep only item which subscriber can access |
887 continue | 890 continue |
888 #the subscriber is known, is he in the right group ? | 891 #the subscriber is known, is he in the right group ? |
889 authorized_groups = item_config[const.OPT_ROSTER_GROUPS_ALLOWED] | 892 authorized_groups = item_config[const.OPT_ROSTER_GROUPS_ALLOWED] |
890 if roster[_subscriber].groups.intersection(authorized_groups): | 893 if roster[_subscriber].groups.intersection(authorized_groups): |
891 allowed_items.append(item) | 894 allowed_items.append(item) |
892 | 895 |
893 else: #unknown access_model | 896 else: #unknown access_model |
894 raise NotImplementedError | 897 raise NotImplementedError |
895 | 898 |
896 if allowed_items: | 899 if allowed_items: |
897 notifications_filtered.append((subscriber, subscriptions, allowed_items)) | 900 notifications_filtered.append((subscriber, subscriptions, allowed_items)) |
898 | 901 |
899 #we notify the owner | 902 #we notify the owner |
900 #FIXME: check if this comply with XEP-0060 (option needed ?) | 903 #FIXME: check if this comply with XEP-0060 (option needed ?) |
901 #TODO: item's access model have to be sent back to owner | 904 #TODO: item's access model have to be sent back to owner |
902 #TODO: same thing for getItems | 905 #TODO: same thing for getItems |
903 | 906 |
904 def getFullItem(item_data): | 907 def getFullItem(item_data): |
905 """ Attach item configuration to this item | 908 """ Attach item configuration to this item |
906 Used to give item configuration back to node's owner (and *only* to owner) | 909 Used to give item configuration back to node's owner (and *only* to owner) |
907 """ | 910 """ |
908 #TODO: a test should check that only the owner get the item configuration back | 911 #TODO: a test should check that only the owner get the item configuration back |
909 | 912 |
910 access_model, item_config, item = item_data | 913 access_model, item_config, item = item_data |
911 new_item = deepcopy(item) | 914 new_item = deepcopy(item) |
912 if item_config: | 915 if item_config: |
913 new_item.addChild(item_config.toElement()) | 916 new_item.addChild(item_config.toElement()) |
914 return new_item | 917 return new_item |
915 | 918 |
916 notifications_filtered.append((owner_jid, | 919 notifications_filtered.append((owner_jid, |
917 set([Subscription(node.nodeIdentifier, | 920 set([Subscription(node.nodeIdentifier, |
918 owner_jid, | 921 owner_jid, |
919 'subscribed')]), | 922 'subscribed')]), |
920 [getFullItem(item_data) for item_data in items])) | 923 [getFullItem(item_data) for item_data in items])) |
921 | 924 |
922 return self.pubsubService.notifyPublish( | 925 return self.pubsubService.notifyPublish( |
923 self.serviceJID, | 926 self.serviceJID, |
924 node.nodeIdentifier, | 927 node.nodeIdentifier, |
925 notifications_filtered) | 928 notifications_filtered) |
926 | 929 |
927 | 930 |
928 if 'subscription' not in data: | 931 if 'subscription' not in data: |
929 d1 = self.backend.getNotifications(node.nodeIdentifier, items) | 932 d1 = self.backend.getNotifications(node.nodeIdentifier, items) |
930 else: | 933 else: |
931 subscription = data['subscription'] | 934 subscription = data['subscription'] |
932 d1 = defer.succeed([(subscription.subscriber, [subscription], | 935 d1 = defer.succeed([(subscription.subscriber, [subscription], |
933 items)]) | 936 items)]) |
934 | 937 |
935 def _got_owner(owner_jid): | 938 def _got_owner(owner_jid): |
936 #return a tuple with owner_jid and roster | 939 #return a tuple with owner_jid and roster |
937 d = self.backend.roster.getRoster(owner_jid) | 940 d = self.backend.privilege.getRoster(owner_jid) |
938 return d.addCallback(lambda roster: (owner_jid,roster)) | 941 d.addErrback(self._rosterEb) |
942 d.addCallback(lambda roster: (owner_jid,roster)) | |
939 | 943 |
940 d2 = node.getNodeOwner() | 944 d2 = node.getNodeOwner() |
941 d2.addCallback(_got_owner) | 945 d2.addCallback(_got_owner) |
942 | 946 |
943 d = defer.gatherResults([d1, d2]) | 947 d = defer.gatherResults([d1, d2]) |
944 d.addCallback(_notifyAllowed) | 948 d.addCallback(_notifyAllowed) |
945 | |
946 | 949 |
947 def _preDelete(self, data): | 950 def _preDelete(self, data): |
948 nodeIdentifier = data['node'].nodeIdentifier | 951 nodeIdentifier = data['node'].nodeIdentifier |
949 redirectURI = data.get('redirectURI', None) | 952 redirectURI = data.get('redirectURI', None) |
950 d = self.backend.getSubscribers(nodeIdentifier) | 953 d = self.backend.getSubscribers(nodeIdentifier) |