changeset 431:5e8b8ef5c862

implentation of XEP-0346 (Form Discovery and Publishing): The former non standard node schema has been replaced by XEP-0346 which uses 2 nodes (one from schema/template and one for submitted values). The implementation is an adapation of the former one, and data validation is still done even if this is not currently specified in any XEP. When the template node is modified, the change is reflected in the node schema.
author Goffi <goffi@goffi.org>
date Fri, 11 Dec 2020 17:19:00 +0100
parents 5a0ada3b61ca
children d9745fe5db46
files sat_pubsub/backend.py sat_pubsub/const.py sat_pubsub/mam.py sat_pubsub/schema.py twisted/plugins/pubsub.py
diffstat 5 files changed, 134 insertions(+), 220 deletions(-) [+]
line wrap: on
line diff
--- a/sat_pubsub/backend.py	Fri Dec 11 17:18:52 2020 +0100
+++ b/sat_pubsub/backend.py	Fri Dec 11 17:19:00 2020 +0100
@@ -62,6 +62,7 @@
 
 import copy
 import uuid
+from typing import Optional
 
 from zope.interface import implementer
 
@@ -69,6 +70,7 @@
 from twisted.python import components, log
 from twisted.internet import defer, reactor
 from twisted.words.protocols.jabber.error import StanzaError
+from twisted.words.protocols.jabber import jid
 from twisted.words.xish import domish, utility
 
 from wokkel import disco
@@ -420,7 +422,27 @@
         d.addCallback(doCheck)
         return d
 
+    def _getFDPSubmittedNode(
+        self,
+        nodeIdentifier: str,
+        pep: bool,
+        recipient: jid.JID,
+    ) -> Optional[defer.Deferred]:
+        """Get submitted forms node for Form Discovery and Publishing
+
+        @param nodeIdentifier: template node (must start with const.FDP_TEMPLATE_PREFIX)
+        @êeturn: node or None if the node doesn't exist
+        """
+        app_ns = nodeIdentifier[len(const.FDP_TEMPLATE_PREFIX):]
+        submitted_node_id = f"{const.FDP_SUBMITTED_PREFIX}{app_ns}"
+        try:
+            return self.storage.getNode(submitted_node_id, pep, recipient)
+        except error.NodeNotFound:
+            return None
+
     async def publish(self, nodeIdentifier, items, requestor, options, pep, recipient):
+        if len(items) == 0:
+            raise pubsub.BadRequest(text="no item to publish")
         node = await self.storage.getNode(nodeIdentifier, pep, recipient)
         affiliation, node = await self._checkAuth(node, requestor)
 
@@ -510,6 +532,17 @@
                     # of an other publisher
                     await self._checkOverwrite(node, itemIdentifiers, requestor)
 
+            if node.nodeIdentifier.startswith(const.FDP_TEMPLATE_PREFIX):
+                schema_item = items_data[-1].item
+                try:
+                    schema = next(schema_item.elements(data_form.NS_X_DATA, 'x'))
+                except StopIteration:
+                    raise pubsub.BadRequest(text="Data form is missing in FDP request")
+                submitted_node = await self._getFDPSubmittedNode(
+                    node.nodeIdentifier, pep, recipient)
+                if submitted_node is not None:
+                    await submitted_node.setSchema(schema)
+
             # TODO: check conflict and recalculate max id if serial_ids is set
             await node.storeItems(items_data, requestor)
 
@@ -628,7 +661,11 @@
                                        'never')
         if sendLastPublished == 'on_sub' and node.nodeType == 'leaf':
             entity = subscription.subscriber.userhostJID()
-            d = self.getItemsData(node.nodeIdentifier, entity, recipient, maxItems=1, ext_data={'pep': pep})
+            d = defer.ensureDeferred(
+                self.getItemsData(
+                    node.nodeIdentifier, entity, recipient, maxItems=1, ext_data={'pep': pep}
+                )
+            )
             d.addCallback(notifyItem)
             d.addErrback(log.err)
 
@@ -734,41 +771,6 @@
 
         return await node.setConfiguration(options)
 
-    def getNodeSchema(self, nodeIdentifier, pep, recipient):
-        if not nodeIdentifier:
-            return defer.fail(error.NoRootNode())
-
-        d = self.storage.getNode(nodeIdentifier, pep, recipient)
-        d.addCallback(lambda node: node.getSchema())
-
-        return d
-
-    def setNodeSchema(self, nodeIdentifier, schema, requestor, pep, recipient):
-        """set or remove Schema of a node
-
-        @param nodeIdentifier(unicode): identifier of the pubsub node
-        @param schema(domish.Element, None): schema to set
-            None to remove schema
-        @param requestor(jid.JID): entity doing the request
-        @param pep(bool): True if it's a PEP request
-        @param recipient(jid.JID, None): recipient of the PEP request
-        """
-        if not nodeIdentifier:
-            return defer.fail(error.NoRootNode())
-
-        d = self.storage.getNode(nodeIdentifier, pep, recipient)
-        d.addCallback(_getAffiliation, requestor)
-        d.addCallback(self._doSetNodeSchema, requestor, schema)
-        return d
-
-    def _doSetNodeSchema(self, result, requestor, schema):
-        node, affiliation = result
-
-        if affiliation != 'owner' and not self.isAdmin(requestor):
-            raise error.Forbidden()
-
-        return node.setSchema(schema)
-
     def getAffiliations(self, entity, nodeIdentifier, pep, recipient):
         return self.storage.getAffiliations(entity, nodeIdentifier, pep, recipient)
 
@@ -870,7 +872,7 @@
         return d
 
     def filterItemsWithSchema(self, items_data, schema, owner):
-        """check schema restriction and remove fields/items if they don't comply
+        """Check schema restriction and remove fields/items if they don't comply
 
         @param items_data(list[ItemData]): items to filter
             items in this list will be modified
@@ -1024,7 +1026,11 @@
 
     def getItems(self, nodeIdentifier, requestor, recipient, maxItems=None,
                        itemIdentifiers=None, ext_data=None):
-        d = self.getItemsData(nodeIdentifier, requestor, recipient, maxItems, itemIdentifiers, ext_data)
+        d = defer.ensureDeferred(
+            self.getItemsData(
+                nodeIdentifier, requestor, recipient, maxItems, itemIdentifiers, ext_data
+            )
+        )
         d.addCallback(lambda items_data: [item_data.item for item_data in items_data])
         return d
 
@@ -1049,21 +1055,20 @@
             return
         defer.returnValue(roster)
 
-    @defer.inlineCallbacks
-    def getItemsData(self, nodeIdentifier, requestor, recipient, maxItems=None,
+    async def getItemsData(self, nodeIdentifier, requestor, recipient, maxItems=None,
                        itemIdentifiers=None, ext_data=None):
         """like getItems but return the whole ItemData"""
         if maxItems == 0:
             log.msg("WARNING: maxItems=0 on items retrieval")
-            defer.returnValue([])
+            return []
 
         if ext_data is None:
             ext_data = {}
-        node = yield self.storage.getNode(nodeIdentifier, ext_data.get('pep', False), recipient)
+        node = await self.storage.getNode(nodeIdentifier, ext_data.get('pep', False), recipient)
         try:
-            affiliation, owner, roster, access_model = yield self.checkNodeAccess(node, requestor)
+            affiliation, owner, roster, access_model = await self.checkNodeAccess(node, requestor)
         except error.NotLeafNodeError:
-            defer.returnValue([])
+            return []
 
         # at this point node access is checked
 
@@ -1073,16 +1078,16 @@
         else:
             if roster is None:
                 # FIXME: publisher roster should be used, not owner
-                roster = yield self.getOwnerRoster(node)
+                roster = await self.getOwnerRoster(node)
                 if roster is None:
                     roster = {}
             roster_item = roster.get(requestor.userhostJID())
             requestor_groups = tuple(roster_item.groups) if roster_item else tuple()
 
         if itemIdentifiers:
-            items_data = yield node.getItemsById(requestor_groups, owner, itemIdentifiers)
+            items_data = await node.getItemsById(requestor_groups, owner, itemIdentifiers)
         else:
-            items_data = yield node.getItems(requestor_groups, owner, maxItems, ext_data)
+            items_data = await node.getItems(requestor_groups, owner, maxItems, ext_data)
 
         if owner:
             # Add item config data form to items with roster access model
@@ -1106,8 +1111,9 @@
         if schema is not None:
             self.filterItemsWithSchema(items_data, schema, owner)
 
-        yield self._items_rsm(items_data, node, requestor_groups, owner, itemIdentifiers, ext_data)
-        defer.returnValue(items_data)
+        await self._items_rsm(
+            items_data, node, requestor_groups, owner, itemIdentifiers, ext_data)
+        return items_data
 
     def _setCount(self, value, response):
         response.count = value
@@ -1121,7 +1127,7 @@
         """
         response.index = value + adjust
 
-    def _items_rsm(self, items_data, node, authorized_groups, owner,
+    async def _items_rsm(self, items_data, node, authorized_groups, owner,
                    itemIdentifiers, ext_data):
         # FIXME: move this to a separate module
         # TODO: Index can be optimized by keeping a cache of the last RSM request
@@ -1162,23 +1168,21 @@
                 # the first page was requested
                 response.index = 0
 
-        def render(result):
-            if rsm_request.before == '':
-                # the last page was requested
-                response.index = response.count - len(items_data)
-            items_data.append(container.ItemData(response.toElement()))
-            return items_data
+
+        await defer.DeferredList(d_list)
 
-        return defer.DeferredList(d_list).addCallback(render)
+        if rsm_request.before == '':
+            # the last page was requested
+            response.index = response.count - len(items_data)
 
-    def retractItem(self, nodeIdentifier, itemIdentifiers, requestor, notify, pep, recipient):
-        d = self.storage.getNode(nodeIdentifier, pep, recipient)
-        d.addCallback(_getAffiliation, requestor)
-        d.addCallback(self._doRetract, itemIdentifiers, requestor, notify, pep, recipient)
-        return d
+        items_data.append(container.ItemData(response.toElement()))
+
+        return items_data
 
-    def _doRetract(self, result, itemIdentifiers, requestor, notify, pep, recipient):
-        node, affiliation = result
+    async def retractItem(self, nodeIdentifier, itemIdentifiers, requestor, notify, pep, recipient):
+        node = await self.storage.getNode(nodeIdentifier, pep, recipient)
+        node, affiliation = await _getAffiliation(node, requestor)
+
         persistItems = node.getConfiguration()[const.OPT_PERSIST_ITEMS]
 
         if not persistItems:
@@ -1186,18 +1190,11 @@
 
         # we need to get the items before removing them, for the notifications
 
-        def removeItems(items_data):
-            """Remove the items and keep only actually removed ones in items_data"""
-            d = node.removeItems(itemIdentifiers)
-            d.addCallback(lambda removed: [item_data for item_data in items_data if item_data.item["id"] in removed])
-            return d
-
-        def checkPublishers(publishers_map):
-            """Called when requestor is neither owner neither publisher of the Node
-
-            We check that requestor is publisher of all the items he wants to retract
-            and raise error.Forbidden if it is not the case
-            """
+        if affiliation not in ['owner', 'publisher']:
+            # the requestor doesn't have right to retract on the whole node
+            # we check if he is a publisher for all items he wants to retract
+            # and forbid the retraction else.
+            publishers_map = await node.getItemsPublishers(itemIdentifiers)
             # TODO: the behaviour should be configurable (per node ?)
             if (any((requestor.userhostJID() != publisher.userhostJID()
                     for publisher in publishers_map.values()))
@@ -1205,22 +1202,39 @@
                ):
                 raise error.Forbidden()
 
-        if affiliation in ['owner', 'publisher']:
-            # the requestor is owner or publisher of the node
-            # he can retract what he wants
-            d = defer.succeed(None)
-        else:
-            # the requestor doesn't have right to retract on the whole node
-            # we check if he is a publisher for all items he wants to retract
-            # and forbid the retraction else.
-            d = node.getItemsPublishers(itemIdentifiers)
-            d.addCallback(checkPublishers)
-        d.addCallback(lambda dummy: node.getItemsById(None, True, itemIdentifiers))
-        d.addCallback(removeItems)
+        items_data = await node.getItemsById(None, True, itemIdentifiers)
+        # Remove the items and keep only actually removed ones in items_data
+        removed = await node.removeItems(itemIdentifiers)
+        retracted_items = [
+            item_data for item_data in items_data if item_data.item["id"] in removed
+        ]
+        if nodeIdentifier.startswith(const.FDP_TEMPLATE_PREFIX):
+            app_ns = nodeIdentifier[len(const.FDP_TEMPLATE_PREFIX):]
+            submitted_node_id = f"{const.FDP_SUBMITTED_PREFIX}{app_ns}"
+            submitted_node = await self.storage.getNode(submitted_node_id, pep, recipient)
+            schema_items = await node.getItems([], True, maxItems=1)
+            if not schema_items:
+                # no more schema, we need to remove it from submitted node
+                submitted_node = await self._getFDPSubmittedNode(
+                    nodeIdentifier, pep, recipient)
+                if submitted_node is not None:
+                    submitted_node.setSchema(None)
+            else:
+                # not all items have been removed from the FDP template node, we check
+                # if the last one correspond to current submitted node schema, and if not,
+                # we update it.
+                current_schema = next(schema_items[0].item.elements(
+                    data_form.NS_X_DATA, 'x'))
+                if current_schema == node.schema:
+                    submitted_node = await self._getFDPSubmittedNode(
+                        nodeIdentifier, pep, recipient)
+                    if submitted_node is not None:
+                        submitted_node.setSchema(current_schema)
 
         if notify:
-            d.addCallback(self._doNotifyRetraction, node, pep, recipient)
-        return d
+            await self._doNotifyRetraction(retracted_items, node, pep, recipient)
+
+        return retracted_items
 
     def _doNotifyRetraction(self, items_data, node, pep, recipient):
         self.dispatch({'items_data': items_data,
@@ -1264,38 +1278,36 @@
         d.addCallback(cb)
         return d
 
-    def deleteNode(self, nodeIdentifier, requestor, pep, recipient, redirectURI=None):
-        d = self.storage.getNode(nodeIdentifier, pep, recipient)
-        d.addCallback(_getAffiliation, requestor)
-        d.addCallback(self._doPreDelete, requestor, redirectURI, pep, recipient)
-        return d
-
-    def _doPreDelete(self, result, requestor, redirectURI, pep, recipient):
-        node, affiliation = result
+    async def deleteNode(self, nodeIdentifier, requestor, pep, recipient, redirectURI=None):
+        node = await self.storage.getNode(nodeIdentifier, pep, recipient)
+        node, affiliation = await _getAffiliation(node, requestor)
 
         if affiliation != 'owner' and not self.isAdmin(requestor):
             raise error.Forbidden()
 
-        data = {'node': node,
-                'redirectURI': redirectURI}
+        data = {
+            'node': node,
+            'redirectURI': redirectURI
+        }
 
         d = defer.DeferredList([cb(data, pep, recipient)
                                 for cb in self._callbackList],
                                consumeErrors=1)
-        d.addCallback(self._doDelete, node.nodeDbId)
-
-    def _doDelete(self, result, nodeDbId):
+        result = await d
         dl = []
         for succeeded, r in result:
             if succeeded and r:
                 dl.extend(r)
 
-        d = self.storage.deleteNodeByDbId(nodeDbId)
-        d.addCallback(self._doNotifyDelete, dl)
+        await self.storage.deleteNodeByDbId(node.nodeDbId)
 
-        return d
+        if node.nodeIdentifier.startswith(const.FDP_TEMPLATE_PREFIX):
+            # we need to delete the associated schema
+            submitted_node = await self._getFDPSubmittedNode(
+                node.nodeIdentifier, pep, recipient)
+            if submitted_node is not None:
+                await submitted_node.setSchema(None)
 
-    def _doNotifyDelete(self, result, dl):
         for d in dl:
             d.callback(None)
 
@@ -1802,12 +1814,14 @@
         return d.addErrback(self._mapErrors)
 
     def retract(self, request):
-        d = self.backend.retractItem(request.nodeIdentifier,
+        d = defer.ensureDeferred(
+            self.backend.retractItem(request.nodeIdentifier,
                                      request.itemIdentifiers,
                                      request.sender,
                                      request.notify,
                                      self._isPep(request),
                                      request.recipient)
+        )
         return d.addErrback(self._mapErrors)
 
     def purge(self, request):
@@ -1818,10 +1832,12 @@
         return d.addErrback(self._mapErrors)
 
     def delete(self, request):
-        d = self.backend.deleteNode(request.nodeIdentifier,
+        d = defer.ensureDeferred(
+            self.backend.deleteNode(request.nodeIdentifier,
                                     request.sender,
                                     self._isPep(request),
                                     request.recipient)
+        )
         return d.addErrback(self._mapErrors)
 
 components.registerAdapter(PubSubResourceFromBackend,
@@ -1836,7 +1852,7 @@
     # FIXME: upstream must be fixed so we can use custom (non pubsub#) disco features
 
     def getDiscoInfo(self, requestor, service, nodeIdentifier=''):
-        return [disco.DiscoFeature(pubsub.NS_ORDER_BY)]
+        return [disco.DiscoFeature(pubsub.NS_ORDER_BY), disco.DiscoFeature(const.NS_FDP)]
 
     def getDiscoItems(self, requestor, service, nodeIdentifier=''):
         return []
--- a/sat_pubsub/const.py	Fri Dec 11 17:18:52 2020 +0100
+++ b/sat_pubsub/const.py	Fri Dec 11 17:19:00 2020 +0100
@@ -56,9 +56,12 @@
 NS_ITEM_CONFIG = "http://jabber.org/protocol/pubsub#item-config"
 NS_ATOM = "http://www.w3.org/2005/Atom"
 NS_FORWARD = 'urn:xmpp:forward:0'
-NS_SCHEMA = 'https://salut-a-toi/protocol/schema:0'
+NS_FDP = 'urn:xmpp:fdp:0'
 NS_SCHEMA_RESTRICT = 'https://salut-a-toi/protocol/schema#restrict:0'
 
+FDP_TEMPLATE_PREFIX = "fdp/template/"
+FDP_SUBMITTED_PREFIX = "fdp/submitted/"
+
 OPT_ACCESS_MODEL = 'pubsub#access_model'
 OPT_ROSTER_GROUPS_ALLOWED = 'pubsub#roster_groups_allowed'
 OPT_PERSIST_ITEMS = "pubsub#persist_items"
--- a/sat_pubsub/mam.py	Fri Dec 11 17:18:52 2020 +0100
+++ b/sat_pubsub/mam.py	Fri Dec 11 17:19:00 2020 +0100
@@ -29,6 +29,7 @@
 
 from twisted.words.xish import domish
 from twisted.python import log
+from twisted.internet import defer
 from twisted.words.protocols.jabber import error
 
 from sat_pubsub import const
@@ -88,8 +89,10 @@
         if mam_request.orderBy:
             ext_data['order_by'] = mam_request.orderBy
 
-        d = self.backend.getItemsData(mam_request.node, mam_request.sender,
+        d = defer.ensureDeferred(
+            self.backend.getItemsData(mam_request.node, mam_request.sender,
                                       mam_request.recipient, None, None, ext_data)
+        )
 
         def make_message(elt):
             # XXX: http://xmpp.org/extensions/xep-0297.html#sect-idp629952 (rule 3)
--- a/sat_pubsub/schema.py	Fri Dec 11 17:18:52 2020 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,104 +0,0 @@
-#!/usr/bin/env python3
-#-*- coding: utf-8 -*-
-#
-# Copyright (c) 2015 Jérôme Poisson
-
-
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-# GNU Affero General Public License for more details.
-
-# You should have received a copy of the GNU Affero General Public License
-# along with this program.  If not, see <http://www.gnu.org/licenses/>.
-
-# ---
-
-# This module implements node schema
-
-from twisted.words.protocols.jabber import jid
-from twisted.words.xish import domish
-from wokkel import disco, iwokkel
-from wokkel.iwokkel import IPubSubService
-from wokkel.subprotocols import XMPPHandler, IQHandlerMixin
-from wokkel import data_form, pubsub
-from zope.interface import implementer
-from sat_pubsub import const
-
-QUERY_SCHEMA = "/pubsub[@xmlns='" + const.NS_SCHEMA + "']"
-
-
-@implementer(iwokkel.IDisco)
-class SchemaHandler(XMPPHandler, IQHandlerMixin):
-    iqHandlers = {"/iq[@type='get']" + QUERY_SCHEMA: 'onSchemaGet',
-                  "/iq[@type='set']" + QUERY_SCHEMA: 'onSchemaSet'}
-
-    def __init__(self):
-        super(SchemaHandler, self).__init__()
-        self.pubsub_service = None
-
-    def connectionInitialized(self):
-        for handler in self.parent.handlers:
-            if IPubSubService.providedBy(handler):
-                self.pubsub_service = handler
-                break
-        self.backend = self.parent.parent.getServiceNamed('backend')
-        self.xmlstream.addObserver("/iq[@type='get' or @type='set']" + QUERY_SCHEMA, self.handleRequest)
-
-    def _getNodeSchemaCb(self, x_elt, nodeIdentifier):
-        schema_elt = domish.Element((const.NS_SCHEMA, 'schema'))
-        schema_elt['node'] = nodeIdentifier
-        if x_elt is not None:
-            assert x_elt.uri == 'jabber:x:data'
-            schema_elt.addChild(x_elt)
-        return schema_elt
-
-    def onSchemaGet(self, iq_elt):
-        try:
-            schema_elt = next(iq_elt.pubsub.elements(const.NS_SCHEMA, 'schema'))
-            nodeIdentifier = schema_elt['node']
-        except StopIteration:
-            raise pubsub.BadRequest(text='missing schema element')
-        except KeyError:
-            raise pubsub.BadRequest(text='missing node')
-        pep = iq_elt.delegated
-        recipient = jid.JID(iq_elt['to'])
-        d = self.backend.getNodeSchema(nodeIdentifier,
-                                       pep,
-                                       recipient)
-        d.addCallback(self._getNodeSchemaCb, nodeIdentifier)
-        return d.addErrback(self.pubsub_service.resource._mapErrors)
-
-    def onSchemaSet(self, iq_elt):
-        try:
-            schema_elt = next(iq_elt.pubsub.elements(const.NS_SCHEMA, 'schema'))
-            nodeIdentifier = schema_elt['node']
-        except StopIteration:
-            raise pubsub.BadRequest(text='missing schema element')
-        except KeyError:
-            raise pubsub.BadRequest(text='missing node')
-        requestor = jid.JID(iq_elt['from'])
-        pep = iq_elt.delegated
-        recipient = jid.JID(iq_elt['to'])
-        try:
-            x_elt = next(schema_elt.elements(data_form.NS_X_DATA, 'x'))
-        except StopIteration:
-            # no schema form has been found
-            x_elt = None
-        d = self.backend.setNodeSchema(nodeIdentifier,
-                                       x_elt,
-                                       requestor,
-                                       pep,
-                                       recipient)
-        return d.addErrback(self.pubsub_service.resource._mapErrors)
-
-    def getDiscoInfo(self, requestor, service, nodeIdentifier=''):
-        return [disco.DiscoFeature(const.NS_SCHEMA)]
-
-    def getDiscoItems(self, requestor, service, nodeIdentifier=''):
-        return []
--- a/twisted/plugins/pubsub.py	Fri Dec 11 17:18:52 2020 +0100
+++ b/twisted/plugins/pubsub.py	Fri Dec 11 17:19:00 2020 +0100
@@ -214,7 +214,6 @@
         from sat_pubsub import mam as pubsub_mam
         from sat_pubsub import pubsub_admin
         from sat_pubsub.backend import BackendService, ExtraDiscoHandler
-        from sat_pubsub.schema import SchemaHandler
         from sat_pubsub.privilege import PrivilegesHandler
         from sat_pubsub.delegation import DelegationsHandler
 
@@ -293,9 +292,6 @@
         pa = pubsub_admin.PubsubAdminHandler(bs)
         pa.setHandlerParent(cs)
 
-        sh = SchemaHandler()
-        sh.setHandlerParent(cs)
-
         # wokkel.pubsub doesn't handle non pubsub# disco
         # and we need to announce other feature, so this is a workaround
         # to add them