changeset 330:82d1259b3e36

backend, pgsql storage: better items/notification handling, various fixes: - replaced const.VAL_AMODEL_ROSTER by const.VAL_AMODEL_PUBLISHER_ROSTER to follow change in pgsql schema - implemented whitelist access model - fixed bad access check during items retrieval (access was checked on recipient instead of requestor/sender) - getItemsData and notification filtering now use inline callbacks: this make these complexe workflows far mor easy to read, and clarity is imperative in these security critical sections. - publisher-roster access model now need to have only one owner, else it will fail. The idea is to use this model only when owner=publisher, else there is ambiguity on the roster to use to check access - replaced getNodeOwner by node.getOwners, as a node can have several owners - notifications filtering has been fixed in a similar way - psql: simplified withPEP method, pep_table argument is actually not needed - removed error.NotInRoster: error.Forbidden is used instead - notifications now notify all the owners, not only the first one
author Goffi <>
date Sun, 26 Mar 2017 20:52:32 +0200
parents 98409ef42c94
children e93a9fd329d9
files sat_pubsub/ sat_pubsub/ sat_pubsub/
diffstat 3 files changed, 177 insertions(+), 189 deletions(-) [+]
line wrap: on
line diff
--- a/sat_pubsub/	Sun Mar 26 20:33:18 2017 +0200
+++ b/sat_pubsub/	Sun Mar 26 20:52:32 2017 +0200
@@ -124,8 +124,8 @@
                  "label": "Who can subscribe to this node",
                  "options": {
                      const.VAL_AMODEL_OPEN: "Public node",
-                     const.VAL_AMODEL_ROSTER: "Node restricted to some roster groups",
-                     const.VAL_AMODEL_JID: "Node restricted to some jids",
+                     const.VAL_AMODEL_PUBLISHER_ROSTER: "Node restricted to some groups of publisher's roster",
+                     const.VAL_AMODEL_WHITELIST: "Node restricted to some jids",
@@ -559,120 +559,123 @@
-    def getItems(self, nodeIdentifier, recipient, maxItems=None,
+    def getItems(self, nodeIdentifier, requestor, recipient, maxItems=None,
                        itemIdentifiers=None, ext_data=None):
-        d = self.getItemsData(nodeIdentifier, recipient, maxItems, itemIdentifiers, ext_data)
+        d = self.getItemsData(nodeIdentifier, requestor, recipient, maxItems, itemIdentifiers, ext_data)
         d.addCallback(lambda items_data: [item_data.item for item_data in items_data])
         return d
-    def getItemsData(self, nodeIdentifier, recipient, maxItems=None,
+    @defer.inlineCallbacks
+    def getOwnerRoster(self, node, owners=None):
+        if owners is None:
+            owners = yield node.getOwners()
+        if len(owners) != 1:
+            log.msg('publisher-roster access is not allowed with more than 1 owner')
+            return
+        owner_jid = owners[0]
+        try:
+            roster = yield self.privilege.getRoster(owner_jid)
+        except Exception as e:
+            log.msg("Error while getting roster of {owner_jid}: {msg}".format(
+                owner_jid = owner_jid.full(),
+                msg = e))
+            return
+        defer.returnValue(roster)
+    @defer.inlineCallbacks
+    def getItemsData(self, nodeIdentifier, requestor, recipient, maxItems=None,
                        itemIdentifiers=None, ext_data=None):
         """like getItems but return the whole ItemData"""
+        if maxItems == 0:
+            log.msg("WARNING: maxItems=0 on items retrieval")
+            defer.returnValue([])
         if ext_data is None:
             ext_data = {}
-        d =, ext_data.get('pep', False), recipient)
-        d.addCallback(_getAffiliation, recipient)
-        d.addCallback(self._doGetItems, recipient, maxItems, itemIdentifiers,
-                      ext_data)
-        return d
+        node = yield, ext_data.get('pep', False), recipient)
+        node, affiliation = yield _getAffiliation(node, requestor)
+        if not iidavoll.ILeafNode.providedBy(node):
+            defer.returnValue([])
+        if affiliation == 'outcast':
+            raise error.Forbidden()
-    def checkGroup(self, roster_groups, entity):
-        """Check that entity is authorized and in roster
-        @param roster_group: tuple which 2 items:
-                        - roster: mapping of jid to RosterItem as given by self.privilege.getRoster
-                        - groups: list of authorized groups
-        @param entity: entity which must be in group
-        @return: (True, roster) if entity is in roster and authorized
-                 (False, roster) if entity is in roster but not authorized
-        @raise: error.NotInRoster if entity is not in roster"""
-        roster, authorized_groups = roster_groups
-        _entity = entity.userhostJID()
+        # node access check
+        owner = affiliation == 'owner'
+        access_model = node.getAccessModel()
+        roster = None
+        if access_model == const.VAL_AMODEL_OPEN or owner:
+            pass
+        elif access_model == const.VAL_AMODEL_PUBLISHER_ROSTER:
+            roster = yield self.getOwnerRoster(node)
+            if roster is None:
+                raise error.Forbidden()
+            if requestor not in roster:
+                raise error.Forbidden()
+            authorized_groups = yield node.getAuthorizedGroups()
-        if not _entity in roster:
-            raise error.NotInRoster
-        return (roster[_entity].groups.intersection(authorized_groups), roster)
+            if not roster[requestor].groups.intersection(authorized_groups):
+                # requestor is in roster but not in one of the allowed groups
+                raise error.Forbidden()
+        elif access_model == const.VAL_AMODEL_WHITELIST:
+            affiliations = yield node.getAffiliations()
+            try:
+                affiliation = affiliations[requestor.userhostJID()]
+            except KeyError:
+                raise error.Forbidden()
+            else:
+                if affiliation not in ('owner', 'publisher', 'member'):
+                    raise error.Forbidden()
+        else:
+            raise Exception(u"Unknown access_model")
-    def _getNodeGroups(self, roster, nodeIdentifier, pep):
-        d =, pep)
-        d.addCallback(lambda groups: (roster, groups))
-        return d
+        # at this point node access is checked
-    def _rosterEb(self, failure, owner_jid):
-        log.msg("Error while getting roster of {}: {}".format(unicode(owner_jid), failure.value))
-        return {}
+        if owner:
+            # requestor_groups is only used in restricted access
+            requestor_groups = None
+        else:
+            if roster is None:
+                roster = yield self.getOwnerRoster(node)
+                if roster is None:
+                    roster = {}
+            roster_item = roster.get(requestor.userhostJID())
+            requestor_groups = tuple(roster_item.groups) if roster_item else tuple()
-    def _doGetItems(self, result, requestor, maxItems, itemIdentifiers,
-                    ext_data):
-        node, affiliation = result
-        if maxItems == 0:
-            log.msg("WARNING: maxItems=0 on items retrieval")
-            return []
+        if itemIdentifiers:
+            items_data = yield node.getItemsById(requestor_groups, owner, itemIdentifiers)
+        else:
+            items_data = yield node.getItems(requestor_groups, owner, maxItems, ext_data)
-        def append_item_config(items_data):
-            """Add item config data form to items with roster access model"""
+        if owner:
+            # Add item config data form to items with roster access model
             for item_data in items_data:
                 if item_data.access_model == const.VAL_AMODEL_OPEN:
-                elif item_data.access_model == const.VAL_AMODEL_ROSTER:
+                elif item_data.access_model == const.VAL_AMODEL_PUBLISHER_ROSTER:
                     form = data_form.Form('submit', formNamespace=const.NS_ITEM_CONFIG)
-                    access = data_form.Field(None, const.OPT_ACCESS_MODEL, value=const.VAL_AMODEL_ROSTER)
+                    access = data_form.Field(None, const.OPT_ACCESS_MODEL, value=const.VAL_AMODEL_PUBLISHER_ROSTER)
                     allowed = data_form.Field(None, const.OPT_ROSTER_GROUPS_ALLOWED, values=item_data.config[const.OPT_ROSTER_GROUPS_ALLOWED])
-                elif access_model == const.VAL_AMODEL_JID:
-                    #FIXME: manage jid
+                elif access_model == const.VAL_AMODEL_WHITELIST:
+                    #FIXME
                     raise NotImplementedError
                     raise error.BadAccessTypeError(access_model)
-            return items_data
-        def access_checked(access_data):
-            authorized, roster = access_data
-            if not authorized:
-                raise error.NotAuthorized()
-            roster_item = roster.get(requestor.userhostJID())
-            authorized_groups = tuple(roster_item.groups) if roster_item else tuple()
-            owner = affiliation == 'owner'
-            if itemIdentifiers:
-                d = node.getItemsById(authorized_groups, owner, itemIdentifiers)
-            else:
-                d = node.getItems(authorized_groups, owner, maxItems, ext_data)
-                if owner:
-                    d.addCallback(append_item_config)
-            d.addCallback(self._items_rsm, node, authorized_groups,
-                          owner, itemIdentifiers,
-                          ext_data)
-            return d
-        if not iidavoll.ILeafNode.providedBy(node):
-            return []
-        if affiliation == 'outcast':
-            raise error.Forbidden()
-        access_model = node.getAccessModel()
-        d = node.getNodeOwner()
-        def gotOwner(owner_jid):
-            d_roster = self.privilege.getRoster(owner_jid)
-            d_roster.addErrback(self._rosterEb, owner_jid)
-            return d_roster
-        d.addCallback(gotOwner)
-        if access_model == const.VAL_AMODEL_OPEN or affiliation == 'owner':
-            d.addCallback(lambda roster: (True, roster))
-            d.addCallback(access_checked)
-        elif access_model == const.VAL_AMODEL_ROSTER:
-            d.addCallback(self._getNodeGroups, node.nodeIdentifier, ext_data.get('pep', False))
-            d.addCallback(self.checkGroup, requestor)
-            d.addCallback(access_checked)
-        return d
+        yield self._items_rsm(items_data, node, requestor_groups, owner, itemIdentifiers, ext_data)
+        defer.returnValue(items_data)
     def _setCount(self, value, response):
         response.count = value
@@ -906,7 +909,6 @@
         error.NodeExists: ('conflict', None, None),
         error.Forbidden: ('forbidden', None, None),
         error.NotAuthorized: ('not-authorized', None, None),
-        error.NotInRoster: ('not-authorized', 'not-in-roster-group', None),
         error.ItemNotFound: ('item-not-found', None, None),
         error.ItemForbidden: ('bad-request', 'item-forbidden', None),
         error.ItemRequired: ('bad-request', 'item-required', None),
@@ -971,8 +973,8 @@
         recipient = data['recipient']
         def afterPrepare(result):
-            owner_jid, notifications_filtered = result
-            #we notify the owner
+            owners, notifications_filtered = result
+            #we notify the owners
             #FIXME: check if this comply with XEP-0060 (option needed ?)
             #TODO: item's access model have to be sent back to owner
             #TODO: same thing for getItems
@@ -989,11 +991,13 @@
                 return new_item
-            notifications_filtered.append((owner_jid,
-                                           set([pubsub.Subscription(node.nodeIdentifier,
-                                                            owner_jid,
-                                                            'subscribed')]),
-                                           [getFullItem(item_data) for item_data in items_data]))
+            for owner_jid in owners:
+                notifications_filtered.append(
+                    (owner_jid,
+                     set([pubsub.Subscription(node.nodeIdentifier,
+                                              owner_jid,
+                                              'subscribed')]),
+                     [getFullItem(item_data) for item_data in items_data]))
             if pep:
                 return self.backend.privilege.notifyPublish(
@@ -1018,14 +1022,16 @@
         recipient = data['recipient']
         def afterPrepare(result):
-            owner_jid, notifications_filtered = result
-            #we add the owner
+            owners, notifications_filtered = result
+            #we add the owners
-            notifications_filtered.append((owner_jid,
-                                           set([pubsub.Subscription(node.nodeIdentifier,
-                                                            owner_jid,
-                                                            'subscribed')]),
-                                           [item_data.item for item_data in items_data]))
+            for owner_jid in owners:
+                notifications_filtered.append(
+                    (owner_jid,
+                     set([pubsub.Subscription(node.nodeIdentifier,
+                                              owner_jid,
+                                              'subscribed')]),
+                     [item_data.item for item_data in items_data]))
             if pep:
                 return self.backend.privilege.notifyRetract(
@@ -1044,6 +1050,7 @@
         return d
+    @defer.inlineCallbacks
     def _prepareNotify(self, items_data, node, subscription=None):
         """Do a bunch of permissions check and filter notifications
@@ -1062,64 +1069,52 @@
             - items_data
-        def filterNotifications(result):
-            """Check access of subscriber for each item, and keep only allowed ones"""
-            notifications, (owner_jid,roster) = result
-            #we filter items not allowed for the subscribers
-            notifications_filtered = []
-            for subscriber, subscriptions, items_data in notifications:
-                if subscriber == owner_jid:
-                    # as notification is always sent to owner,
-                    # we ignore owner if he is here
-                    continue
-                allowed_items = [] #we keep only item which subscriber can access
-                for item_data in items_data:
-                    item, access_model = item_data.item, item_data.access_model
-                    access_list = item_data.config
-                    if access_model == const.VAL_AMODEL_OPEN:
-                        allowed_items.append(item)
-                    elif access_model == const.VAL_AMODEL_ROSTER:
-                        _subscriber = subscriber.userhostJID()
-                        if not _subscriber in roster:
-                            continue
-                        #the subscriber is known, is he in the right group ?
-                        authorized_groups = access_list[const.OPT_ROSTER_GROUPS_ALLOWED]
-                        if roster[_subscriber].groups.intersection(authorized_groups):
-                            allowed_items.append(item)
-                    else: #unknown access_model
-                        raise NotImplementedError
-                if allowed_items:
-                    notifications_filtered.append((subscriber, subscriptions, allowed_items))
-            return (owner_jid, notifications_filtered)
         if subscription is None:
-            d1 = self.backend.getNotifications(node.nodeDbId, items_data)
+            notifications = yield self.backend.getNotifications(node.nodeDbId, items_data)
-            d1 = defer.succeed([(subscription.subscriber, [subscription],
-                                items_data)])
+            notifications = [(subscription.subscriber, [subscription], items_data)]
+        owners = node.getOwners()
+        owner_roster = None
+        # now we check access of subscriber for each item, and keep only allowed ones
-        def _got_owner(owner_jid):
-            #return a tuple with owner_jid and roster
-            def rosterEb(failure):
-                log.msg("Error while getting roster of {}: {}".format(unicode(owner_jid), failure.value))
-                return (owner_jid, {})
+        #we filter items not allowed for the subscribers
+        notifications_filtered = []
+        for subscriber, subscriptions, items_data in notifications:
+            subscriber_bare = subscriber.userhostJID()
+            if subscriber_bare in owners:
+                # as notification is always sent to owner,
+                # we ignore owner if he is here
+                continue
+            allowed_items = [] #we keep only item which subscriber can access
-            d = self.backend.privilege.getRoster(owner_jid)
-            d.addErrback(rosterEb)
-            d.addCallback(lambda roster: (owner_jid,roster))
-            return d
+            for item_data in items_data:
+                item, access_model = item_data.item, item_data.access_model
+                access_list = item_data.config
+                if access_model == const.VAL_AMODEL_OPEN:
+                    allowed_items.append(item)
+                elif access_model == const.VAL_AMODEL_PUBLISHER_ROSTER:
+                    if owner_roster is None:
+                        owner_roster= yield self.getOwnerRoster(node, owners)
+                    if owner_roster is None:
+                        owner_roster = {}
+                    if not subscriber_bare in owner_roster:
+                        continue
+                    #the subscriber is known, is he in the right group ?
+                    authorized_groups = access_list[const.OPT_ROSTER_GROUPS_ALLOWED]
+                    if owner_roster[subscriber_bare].groups.intersection(authorized_groups):
+                        allowed_items.append(item)
+                else: #unknown access_model
+                    # TODO: white list access
+                    raise NotImplementedError
-        d2 = node.getNodeOwner()
-        d2.addCallback(_got_owner)
-        d = defer.gatherResults([d1, d2])
-        d.addCallback(filterNotifications)
-        return d
+            if allowed_items:
+                notifications_filtered.append((subscriber, subscriptions, allowed_items))
+        defer.returnValue((owners, notifications_filtered))
     def _preDelete(self, data, pep, recipient):
         nodeIdentifier = data['node'].nodeIdentifier
@@ -1132,7 +1127,6 @@
         return d
     def _mapErrors(self, failure):
         e = failure.trap(*self._errorMap.keys())
@@ -1291,6 +1285,7 @@
         except AttributeError:
         d = self.backend.getItems(request.nodeIdentifier,
+                                  request.sender,
--- a/sat_pubsub/	Sun Mar 26 20:33:18 2017 +0200
+++ b/sat_pubsub/	Sun Mar 26 20:52:32 2017 +0200
@@ -63,9 +63,12 @@
 OPT_SEND_LAST_PUBLISHED_ITEM = "pubsub#send_last_published_item"
 OPT_PUBLISH_MODEL = 'pubsub#publish_model'
+VAL_AMODEL_PUBLISHER_ROSTER = 'publisher-roster'
+VAL_AMODEL_PUBLISH_ONLY = 'publish-only'
+VAL_AMODEL_SELF_PUBLISHER = 'self-publisher'
--- a/sat_pubsub/	Sun Mar 26 20:33:18 2017 +0200
+++ b/sat_pubsub/	Sun Mar 26 20:52:32 2017 +0200
@@ -55,7 +55,6 @@
 from zope.interface import implements
-from twisted.internet import defer
 from twisted.words.protocols.jabber import jid
 from twisted.python import log
@@ -74,27 +73,24 @@
 # parseXml manage str, but we get unicode
 parseXml = lambda unicode_data: generic.parseXml(unicode_data.encode('utf-8'))
+PEP_COL_NAME = 'pep'
-def withPEP(query, values, pep, recipient, pep_table=None):
+def withPEP(query, values, pep, recipient):
     """Helper method to facilitate PEP management
     @param query: SQL query basis
     @param values: current values to replace in query
     @param pep: True if we are in PEP mode
     @param recipient: jid of the recipient
-    @param pep_table: added before pep if table need to be specified
     @return: query + PEP AND check,
         recipient's bare jid is added to value if needed
-    pep_col_name = "{}pep".format(
-                   '' if pep_table is None
-                   else ".{}".format(pep_table))
     if pep:
-        pep_check="AND {}=%s".format(pep_col_name)
+        pep_check="AND {}=%s".format(PEP_COL_NAME)
         values=list(values) + [recipient.userhost()]
-        pep_check="AND {} IS NULL".format(pep_col_name)
+        pep_check="AND {} IS NULL".format(PEP_COL_NAME)
     return "{} {}".format(query, pep_check), values
@@ -261,8 +257,7 @@
                                             WHERE jid=%s) as e""",
                        (node_id, owner))
-        #TODO: manage JID access
-        if config[const.OPT_ACCESS_MODEL] == const.VAL_AMODEL_ROSTER:
+        if config[const.OPT_ACCESS_MODEL] == const.VAL_AMODEL_PUBLISHER_ROSTER:
             if const.OPT_ROSTER_GROUPS_ALLOWED in config:
                 allowed_groups = config[const.OPT_ROSTER_GROUPS_ALLOWED]
@@ -271,6 +266,8 @@
                 #TODO: check that group are actually in roster
                 cursor.execute("""INSERT INTO node_groups_authorized (node_id, groupname)
                                   VALUES (%s,%s)""" , (node_id, group))
+        # XXX: affiliations can't be set on during node creation (at least not with XEP-0060 alone)
+        #      so whitelist affiliations need to be done afterward
     def deleteNodeByDbId(self, db_id):
         """Delete a node using directly its database id"""
@@ -294,15 +291,7 @@
         if cursor.rowcount != 1:
             raise error.NodeNotFound()
-    def getNodeGroups(self, nodeIdentifier, pep, recipient=None):
-        return self.dbpool.runInteraction(self._getNodeGroups, nodeIdentifier, pep, recipient)
-    def _getNodeGroups(self, cursor, nodeIdentifier, pep, recipient):
-        cursor.execute(*withPEP("SELECT groupname FROM node_groups_authorized NATURAL JOIN nodes WHERE node=%s",
-                                (nodeIdentifier,), pep, recipient))
-        rows = cursor.fetchall()
-        return [row[0] for row in rows]
     def getAffiliations(self, entity, pep, recipient=None):
         d = self.dbpool.runQuery(*withPEP("""SELECT node, affiliation FROM entities
@@ -347,7 +336,6 @@
         self.nodeDbId = nodeDbId
         self.nodeIdentifier = nodeIdentifier
         self._config = config
-        self.owner = None;
     def _checkNodeExists(self, cursor):
@@ -360,11 +348,9 @@
     def getType(self):
         return self.nodeType
-    def getNodeOwner(self):
-        if self.owner:
-            return defer.succeed(self.owner)
-        d = self.dbpool.runQuery("""SELECT jid FROM nodes NATURAL JOIN affiliations NATURAL JOIN entities WHERE node_id=%s""", (self.nodeDbId,))
-        d.addCallback(lambda result: jid.JID(result[0][0]))
+    def getOwners(self):
+        d = self.dbpool.runQuery("""SELECT jid FROM nodes NATURAL JOIN affiliations NATURAL JOIN entities WHERE node_id=%s and affiliation='owner'""", (self.nodeDbId,))
+        d.addCallback(lambda rows: [jid.JID(r[0]) for r in rows])
         return d
@@ -590,7 +576,7 @@
         result = cursor.fetchall()
-        return [(jid.internJID(r[0]), r[1]) for r in result]
+        return {jid.internJID(r[0]): r[1] for r in result}
@@ -640,7 +626,7 @@
         item_id = cursor.fetchone()[0];
         self._storeCategories(cursor, item_id, item_data.categories)
-        if access_model == const.VAL_AMODEL_ROSTER:
+        if access_model == const.VAL_AMODEL_PUBLISHER_ROSTER:
             if const.OPT_ROSTER_GROUPS_ALLOWED in item_config:
                 item_config.fields[const.OPT_ROSTER_GROUPS_ALLOWED].fieldType='list-multi' #XXX: needed to force list if there is only one value
                 allowed_groups = item_config[const.OPT_ROSTER_GROUPS_ALLOWED]
@@ -650,6 +636,7 @@
                 #TODO: check that group are actually in roster
                 cursor.execute("""INSERT INTO item_groups_authorized (item_id, groupname)
                                   VALUES (%s,%s)""" , (item_id, group))
+        # TODO: whitelist access model
     def _storeCategories(self, cursor, item_id, categories, update=False):
         # TODO: handle canonical form
@@ -809,18 +796,20 @@
                 item_id = data[2]
                 date = data[3]
                 access_list = {}
-                if access_model == const.VAL_AMODEL_ROSTER: #TODO: jid access_model
+                if access_model == const.VAL_AMODEL_PUBLISHER_ROSTER:
                     cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,))
                     access_list[const.OPT_ROSTER_GROUPS_ALLOWED] = [r[0] for r in cursor.fetchall()]
                 ret.append(container.ItemData(item, access_model, access_list, date=date))
+                # TODO: whitelist item access model
             return ret
         items_data = [container.ItemData(generic.stripNamespace(parseXml(r[0])), r[1], r[2], date=r[3]) for r in result]
         return items_data
     def getItemsById(self, authorized_groups, unrestricted, itemIdentifiers):
-        """ Get items which are in the given list
+        """Get items which are in the given list
         @param authorized_groups: we want to get items that these groups can access
         @param unrestricted: if true, don't check permissions
         @param itemIdentifiers: list of ids of the items we want to get
@@ -849,9 +838,10 @@
                 item_id = result[2]
                 date= result[3]
                 access_list = {}
-                if access_model == const.VAL_AMODEL_ROSTER: #TODO: jid access_model
+                if access_model == const.VAL_AMODEL_PUBLISHER_ROSTER:
                     cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,))
                     access_list[const.OPT_ROSTER_GROUPS_ALLOWED] = [r[0] for r in cursor.fetchall()]
+                 #TODO: WHITELIST access_model
                 ret.append(container.ItemData(item, access_model, access_list, date=date))
         else: #we check permission before returning items