changeset 430:5a0ada3b61ca

Full-Text Search implementation: /!\ pgsql schema needs to be updated /!\ /!\ Minimal PostgreSQL required version is now 12 /!\ A new options is available to specify main language of a node. By default a `generic` language is used (which uses the `simple` configuration in PostgreSQL). When a node owner changes the language, the index is rebuilt accordingly. It is possible to have item specific language for multilingual nodes (but for the moment the search is done with node language, so the results won't be good). If an item language is explicitely set in `item_languages`, the FTS configuration won't be affected by node FTS language setting. Search is parsed with `websearch_to_tsquery` for now, but this parser doesn't handle prefix matching, so it may be replaced in the future. SetConfiguration now only updates the modified values, this avoid triggering the FTS re-indexing on each config change. `_checkNodeExists` is not called anymore as we can check if a row has been modified to see if the node exists, this avoid a useless query. Item storing has been slighly improved with a useless SELECT and condition removed. To avoid 2 schema updates in a row, the `sat_pubsub_update_5_6.sql` file also prepares the implementation of XEP-0346 by updating nodes with a schema and creating the suitable template nodes.
author Goffi <goffi@goffi.org>
date Fri, 11 Dec 2020 17:18:52 +0100
parents 0526073ff2ab
children 5e8b8ef5c862
files db/pubsub.sql db/sat_pubsub_update_5_6.sql sat_pubsub/backend.py sat_pubsub/const.py sat_pubsub/pgsql_storage.py twisted/plugins/pubsub.py
diffstat 6 files changed, 240 insertions(+), 53 deletions(-) [+]
line wrap: on
line diff
--- a/db/pubsub.sql	Thu Dec 10 10:46:34 2020 +0100
+++ b/db/pubsub.sql	Fri Dec 11 17:18:52 2020 +0100
@@ -6,7 +6,7 @@
 CREATE TABLE nodes (
     node_id serial PRIMARY KEY,
     node text NOT NULL,
-	pep text,
+    pep text,
     node_type text NOT NULL DEFAULT 'leaf'
         CHECK (node_type IN ('leaf', 'collection')),
     access_model text NOT NULL DEFAULT 'open'
@@ -14,9 +14,10 @@
     persist_items boolean,
     deliver_payloads boolean NOT NULL DEFAULT TRUE,
     max_items integer NOT NULL DEFAULT 0
-		CHECK (max_items >= 0),
+        CHECK (max_items >= 0),
     serial_ids boolean NOT NULL DEFAULT FALSE,
     consistent_publisher boolean NOT NULL DEFAULT FALSE,
+    fts_language text NOT NULL DEFAULT 'generic',
     send_last_published_item text NOT NULL DEFAULT 'on_sub'
         CHECK (send_last_published_item IN ('never', 'on_sub')),
     publish_model text NOT NULL DEFAULT 'publishers'
@@ -52,11 +53,11 @@
     resource text,
     node_id integer NOT NULL REFERENCES nodes ON delete CASCADE,
     state text NOT NULL DEFAULT 'subscribed'
-    	CHECK (state IN ('subscribed', 'pending', 'unconfigured')),
+        CHECK (state IN ('subscribed', 'pending', 'unconfigured')),
     subscription_type text
-    	CHECK (subscription_type IN (NULL, 'items', 'nodes')),
+        CHECK (subscription_type IN (NULL, 'items', 'nodes')),
     subscription_depth text
-    	CHECK (subscription_depth IN (NULL, '1', 'all')),
+        CHECK (subscription_depth IN (NULL, '1', 'all')),
     UNIQUE (entity_id, resource, node_id));
 
 CREATE TABLE items (
@@ -64,7 +65,9 @@
     node_id integer NOT NULL REFERENCES nodes ON DELETE CASCADE,
     item text NOT NULL,
     publisher text NOT NULL,
-    data xml,
+    data xml NOT NULL,
+    data_fts tsvector GENERATED ALWAYS AS (to_tsvector(data_fts_cfg, data::text)) STORED,
+    data_fts_cfg regconfig NOT NULL DEFAULT 'simple',
     access_model text NOT NULL DEFAULT 'open'
         CHECK (access_model IN ('open', 'publisher-roster', 'whitelist')),
     created timestamp with time zone NOT NULL DEFAULT now(),
@@ -100,9 +103,28 @@
     UNIQUE (item_id,category)
 );
 
+/* Full Text Search */
+CREATE INDEX items_data_fts ON items USING GIN (data_fts);
+
+CREATE OR REPLACE FUNCTION update_data_fts() RETURNS TRIGGER AS
+$$
+BEGIN
+    UPDATE items SET data_fts_cfg=replace(new.fts_language, 'generic', 'simple')::regconfig
+        WHERE items.node_id=new.node_id
+            AND NOT EXISTS(SELECT FROM item_languages AS lang WHERE lang.item_id=items.item_id);
+    RETURN new;
+END;
+$$
+language plpgsql;
+
+CREATE TRIGGER nodes_fts_language_update
+     AFTER UPDATE OF fts_language ON nodes
+     FOR EACH ROW
+     EXECUTE PROCEDURE update_data_fts();
+
 CREATE TABLE metadata (
-	key text PRIMARY KEY,
-	value text
+    key text PRIMARY KEY,
+    value text
 );
 
 INSERT INTO metadata VALUES ('version', '5');
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/db/sat_pubsub_update_5_6.sql	Fri Dec 11 17:18:52 2020 +0100
@@ -0,0 +1,104 @@
+-- we check version of the database before doing anything
+-- and stop execution if not good
+\set ON_ERROR_STOP
+DO $$
+DECLARE ver text;
+BEGIN
+    SELECT value INTO ver FROM metadata WHERE key='version';
+    IF NOT FOUND OR ver!='5' THEN
+        RAISE EXCEPTION 'This update file needs to be applied on database schema version 5, you use version %',ver;
+    END IF;
+END$$;
+\unset ON_ERROR_STOP
+-- end of version check
+
+/* NOT NULL constraint was not applied to items.data */
+ALTER TABLE items ALTER COLUMN data SET NOT NULL;
+
+/* Full Text Search */
+ALTER TABLE nodes ADD COLUMN fts_language text NOT NULL DEFAULT 'generic';
+ALTER TABLE items ADD COLUMN data_fts_cfg regconfig NOT NULL DEFAULT 'simple';
+ALTER TABLE items ADD COLUMN data_fts tsvector GENERATED ALWAYS AS (to_tsvector(data_fts_cfg, data::text)) STORED;
+CREATE INDEX items_data_fts ON items USING GIN (data_fts);
+
+CREATE OR REPLACE FUNCTION update_data_fts() RETURNS TRIGGER AS
+$$
+BEGIN
+    UPDATE items SET data_fts_cfg=replace(new.fts_language, 'generic', 'simple')::regconfig
+        WHERE items.node_id=new.node_id
+            AND NOT EXISTS(SELECT FROM item_languages AS lang WHERE lang.item_id=items.item_id);
+    RETURN new;
+END;
+$$
+language plpgsql;
+
+CREATE TRIGGER nodes_fts_language_update
+     AFTER UPDATE OF fts_language ON nodes
+     FOR EACH ROW
+     EXECUTE PROCEDURE update_data_fts();
+
+/* we update nodes with schema to prepare for XEP-0346 implementation */
+
+INSERT INTO nodes(node, pep, persist_items, publish_model, max_items)
+    SELECT 'fdp/template/'||s.node, s.pep, true, s.publish_model, 1
+    FROM (
+        SELECT node_id, node, pep, publish_model, schema
+        FROM nodes
+        WHERE schema IS NOT NULL
+    ) AS s;
+
+INSERT INTO affiliations(entity_id, node_id, affiliation)
+    SELECT aff.entity_id, tpl.node_id, 'owner'
+    FROM (
+        SELECT node_id, node, pep, publish_model, schema
+        FROM nodes
+        WHERE schema IS NOT NULL AND pep IS NOT NULL
+    ) AS s
+    LEFT JOIN nodes AS tpl ON tpl.node='fdp/template/'||s.node AND tpl.pep=s.pep
+    LEFT JOIN affiliations AS aff ON aff.node_id=s.node_id AND aff.affiliation='owner';
+
+/* we need to do a similar request for non PEP nodes */
+INSERT INTO affiliations(entity_id, node_id, affiliation)
+    SELECT aff.entity_id, tpl.node_id, 'owner'
+    FROM (
+        SELECT node_id, node, pep, publish_model, schema
+        FROM nodes
+        WHERE schema IS NOT NULL AND pep IS NULL
+    ) AS s
+    LEFT JOIN nodes AS tpl ON tpl.node='fdp/template/'||s.node AND tpl.pep IS NULL
+    LEFT JOIN affiliations AS aff ON aff.node_id=s.node_id AND aff.affiliation='owner';
+
+INSERT INTO items(node_id, item, publisher, data)
+    SELECT
+        tpl.node_id,
+        'current',
+        e.jid||'/generated',
+        xmlelement(name item, xmlattributes('current' as id, e.jid||'/generated' as publisher), s.schema)
+    FROM (
+        SELECT node_id, node, pep, publish_model, schema
+        FROM nodes
+        WHERE schema IS NOT NULL AND pep IS NOT NULL
+    ) AS s
+    LEFT JOIN nodes AS tpl ON tpl.node='fdp/template/'||s.node AND tpl.pep=s.pep
+    LEFT JOIN affiliations AS aff ON aff.node_id = tpl.node_id AND aff.affiliation='owner'
+    LEFT JOIN entities AS e ON e.entity_id = aff.entity_id;
+
+/* once again for non PEP nodes */
+INSERT INTO items(node_id, item, publisher, data)
+    SELECT
+        tpl.node_id,
+        'current',
+        e.jid||'/generated',
+        xmlelement(name item, xmlattributes('current' as id, e.jid||'/generated' as publisher), s.schema)
+    FROM (
+        SELECT node_id, node, pep, publish_model, schema
+        FROM nodes
+        WHERE schema IS NOT NULL AND pep IS NULL
+    ) AS s
+    LEFT JOIN nodes AS tpl ON tpl.node='fdp/template/'||s.node AND tpl.pep IS NULL
+    LEFT JOIN affiliations AS aff ON aff.node_id = tpl.node_id AND aff.affiliation='owner'
+    LEFT JOIN entities AS e ON e.entity_id = aff.entity_id;
+
+UPDATE nodes SET node='fdp/submitted/'||node WHERE schema IS NOT NULL;
+
+UPDATE metadata SET value='6' WHERE key='version';
--- a/sat_pubsub/backend.py	Thu Dec 10 10:46:34 2020 +0100
+++ b/sat_pubsub/backend.py	Fri Dec 11 17:18:52 2020 +0100
@@ -83,6 +83,9 @@
 from sat_pubsub import const
 from sat_pubsub import container
 
+# TODO: modernise the code, get rid of legacy Twisted logging, use coroutines,
+#   better formatting
+
 
 def _getAffiliation(node, entity):
     d = node.getAffiliation(entity)
@@ -180,6 +183,15 @@
             const.OPT_CONSISTENT_PUBLISHER:
                 {"type": "boolean",
                  "label": "Keep publisher on update"},
+            const.OPT_FTS_LANGUAGE:
+                {"type": "list-single",
+                 "label": "Default language to use for full text search",
+                 "options": {
+                     const.VAL_FTS_GENERIC: (
+                         "Generic (use if unsure of if your language is not present)"
+                     )
+                 }
+                },
             }
 
     subscriptionOptions = {
@@ -203,6 +215,16 @@
         self._callbackList = []
         self.config = config
         self.admins = config['admins_jids_list']
+        d = self.storage.getFTSLanguages()
+        d.addCallbacks(self._getFTSLanguagesCb, self._getFTSLanguagesEb)
+
+    def _getFTSLanguagesCb(self, languages):
+        # we skip the first one which is always "generic", as we already have it
+        for l in languages[1:]:
+            self.nodeOptions[const.OPT_FTS_LANGUAGE]["options"][l] = l.title()
+
+    def _getFTSLanguagesEb(self, failure_):
+        log.msg(f"WARNING: can get FTS languages: {failure_}")
 
     def isAdmin(self, entity_jid):
         """Return True if an entity is an administrator"""
@@ -724,7 +746,7 @@
     def setNodeSchema(self, nodeIdentifier, schema, requestor, pep, recipient):
         """set or remove Schema of a node
 
-        @param nodeIdentifier(unicode): identifier of the pubusb node
+        @param nodeIdentifier(unicode): identifier of the pubsub node
         @param schema(domish.Element, None): schema to set
             None to remove schema
         @param requestor(jid.JID): entity doing the request
--- a/sat_pubsub/const.py	Thu Dec 10 10:46:34 2020 +0100
+++ b/sat_pubsub/const.py	Fri Dec 11 17:18:52 2020 +0100
@@ -68,6 +68,7 @@
 OPT_PUBLISH_MODEL = 'pubsub#publish_model'
 OPT_SERIAL_IDS = 'pubsub#serial_ids'
 OPT_CONSISTENT_PUBLISHER = 'pubsub#consistent_publisher'
+OPT_FTS_LANGUAGE = 'pubsub#fts_language'
 VAL_AMODEL_OPEN = 'open'
 VAL_AMODEL_PRESENCE = 'presence'
 VAL_AMODEL_PUBLISHER_ROSTER = 'publisher-roster'
@@ -81,6 +82,8 @@
 VAL_PMODEL_OPEN = 'open'
 VAL_PMODEL_DEFAULT = VAL_PMODEL_PUBLISHERS
 VAL_RSM_MAX_DEFAULT = 10 # None for no limit
+VAL_FTS_GENERIC = 'generic'
 FLAG_ENABLE_RSM = True
 FLAG_ENABLE_MAM = True
 MAM_FILTER_CATEGORY = 'http://salut-a-toi.org/protocols/mam_filter_category'
+MAM_FILTER_FTS = 'urn:xmpp:fulltext:0'
--- a/sat_pubsub/pgsql_storage.py	Thu Dec 10 10:46:34 2020 +0100
+++ b/sat_pubsub/pgsql_storage.py	Fri Dec 11 17:18:52 2020 +0100
@@ -79,7 +79,7 @@
 parseXml = lambda unicode_data: generic.parseXml(unicode_data.encode('utf-8'))
 ITEMS_SEQ_NAME = 'node_{node_id}_seq'
 PEP_COL_NAME = 'pep'
-CURRENT_VERSION = '5'
+CURRENT_VERSION = '6'
 # retrieve the maximum integer item id + 1
 NEXT_ITEM_ID_QUERY = r"SELECT COALESCE(max(item::integer)+1,1) as val from items where node_id={node_id} and item ~ E'^\\d+$'"
 
@@ -105,7 +105,7 @@
 @implementer(iidavoll.IStorage)
 class Storage:
 
-
+    fts_languages = ['generic']
     defaultConfig = {
             'leaf': {
                 const.OPT_PERSIST_ITEMS: True,
@@ -116,6 +116,7 @@
                 const.OPT_PUBLISH_MODEL: const.VAL_PMODEL_DEFAULT,
                 const.OPT_SERIAL_IDS: False,
                 const.OPT_CONSISTENT_PUBLISHER: False,
+                const.OPT_FTS_LANGUAGE: const.VAL_FTS_GENERIC,
             },
             'collection': {
                 const.OPT_DELIVER_PAYLOADS: True,
@@ -156,8 +157,9 @@
                     const.OPT_PUBLISH_MODEL:row[8],
                     const.OPT_SERIAL_IDS:row[9],
                     const.OPT_CONSISTENT_PUBLISHER:row[10],
+                    const.OPT_FTS_LANGUAGE: row[11],
                     }
-            schema = row[11]
+            schema = row[12]
             if schema is not None:
                 schema = parseXml(schema)
             node = LeafNode(row[0], row[1], configuration, schema)
@@ -176,6 +178,18 @@
         else:
             raise BadRequest(text="Unknown node type !")
 
+    def getFTSLanguages(self):
+        """Get list of available languages for full text search"""
+        return self.dbpool.runInteraction(self._getFTSLanguages)
+
+    def _getFTSLanguages(self, cursor):
+        cursor.execute("SELECT cfgname FROM pg_ts_config")
+        result = [r.cfgname for r in cursor.fetchall()]
+        result.remove("simple")
+        result.insert(0, "generic")
+        Node.fts_languages = self.fts_languages = result
+        return result
+
     def getNodeById(self, nodeDbId):
         """Get node using database ID insted of pubsub identifier
 
@@ -195,6 +209,7 @@
                                  publish_model,
                                  serial_ids,
                                  consistent_publisher,
+                                 fts_language,
                                  schema::text,
                                  pep
                             FROM nodes
@@ -218,6 +233,7 @@
                                           publish_model,
                                           serial_ids,
                                           consistent_publisher,
+                                          fts_language,
                                           schema::text,
                                           pep
                                    FROM nodes
@@ -272,10 +288,11 @@
                                publish_model,
                                serial_ids,
                                consistent_publisher,
+                               fts_language,
                                schema,
                                pep)
                               VALUES
-                              (%s, 'leaf', %s, %s, %s, %s, %s, %s, %s, %s, %s)""",
+                              (%s, 'leaf', %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)""",
                            (nodeIdentifier,
                             config['pubsub#persist_items'],
                             config['pubsub#deliver_payloads'],
@@ -284,6 +301,7 @@
                             config[const.OPT_PUBLISH_MODEL],
                             config[const.OPT_SERIAL_IDS],
                             config[const.OPT_CONSISTENT_PUBLISHER],
+                            config[const.OPT_FTS_LANGUAGE],
                             schema,
                             recipient.userhost() if pep else None
                             )
@@ -504,6 +522,8 @@
         @param old_config(dict): config of the node before the change
         @param new_config(dict): new options that will be changed
         """
+        if const.OPT_SERIAL_IDS not in new_config:
+            return
         serial_ids = new_config[const.OPT_SERIAL_IDS]
         if serial_ids != old_config[const.OPT_SERIAL_IDS]:
             # serial_ids option has been modified,
@@ -525,45 +545,49 @@
                 cursor.execute("DROP SEQUENCE IF EXISTS {seq_name}".format(seq_name = seq_name))
 
     def setConfiguration(self, options):
-        config = copy.copy(self._config)
+        to_delete = []
+        for option, value in options.items():
+            try:
+                if self._config[option] == value:
+                    to_delete.append(option)
+            except KeyError:
+                raise BadRequest(text=f"Invalid option: {option!r}")
 
-        for option in options:
-            if option in config:
-                config[option] = options[option]
+        for option in to_delete:
+            options.remove(option)
 
-        if config[const.OPT_MAX_ITEMS] == "max":
+        if not options:
+            return
+
+        if options.get(const.OPT_MAX_ITEMS) == "max":
             # XXX: "max" is default value for config we must convert
             #   it to an interger. See backend's _doSetNodeConfiguration comment
-            config[const.OPT_MAX_ITEMS] = "0"
+            options[const.OPT_MAX_ITEMS] = "0"
 
-        d = self.dbpool.runInteraction(self._setConfiguration, config)
-        d.addCallback(self._setCachedConfiguration, config)
+        if ((const.OPT_FTS_LANGUAGE in options
+             and options[const.OPT_FTS_LANGUAGE] not in self.fts_languages)):
+            raise BadRequest(text=
+                f"invalid {const.OPT_FTS_LANGUAGE} value: "
+                f"{options[const.OPT_FTS_LANGUAGE]!r}"
+            )
+
+
+        d = self.dbpool.runInteraction(self._setConfiguration, options)
+        d.addCallback(self._updateCachedConfiguration, options)
         return d
 
-    def _setConfiguration(self, cursor, config):
-        self._checkNodeExists(cursor)
-        self._configurationTriggers(cursor, self.nodeDbId, self._config, config)
-        cursor.execute("""UPDATE nodes SET persist_items=%s,
-                                           max_items=%s,
-                                           deliver_payloads=%s,
-                                           send_last_published_item=%s,
-                                           access_model=%s,
-                                           publish_model=%s,
-                                           serial_ids=%s,
-                                           consistent_publisher=%s
-                          WHERE node_id=%s""",
-                       (config[const.OPT_PERSIST_ITEMS],
-                        config[const.OPT_MAX_ITEMS],
-                        config[const.OPT_DELIVER_PAYLOADS],
-                        config[const.OPT_SEND_LAST_PUBLISHED_ITEM],
-                        config[const.OPT_ACCESS_MODEL],
-                        config[const.OPT_PUBLISH_MODEL],
-                        config[const.OPT_SERIAL_IDS],
-                        config[const.OPT_CONSISTENT_PUBLISHER],
-                        self.nodeDbId))
+    def _setConfiguration(self, cursor, options):
+        self._configurationTriggers(cursor, self.nodeDbId, self._config, options)
+        # options names all follow the scheme "pubsub#{col_name}"
+        col_names = (o[7:] for o in options)
+        values = ','.join(f"{name}=%s" for name in col_names)
+        cursor.execute(f"UPDATE nodes SET {values} WHERE node_id=%s",
+                       (*options.values(), self.nodeDbId))
+        if cursor.rowcount == 0:
+            raise error.NodeNotFound()
 
-    def _setCachedConfiguration(self, void, config):
-        self._config = config
+    def _updateCachedConfiguration(self, __, options):
+        self._config.update(options)
 
     def getSchema(self):
         return self._schema
@@ -580,7 +604,7 @@
                        (schema.toXml() if schema else None,
                         self.nodeDbId))
 
-    def _setCachedSchema(self, void, schema):
+    def _setCachedSchema(self, __, schema):
         self._schema = schema
 
     def getMetaData(self):
@@ -900,17 +924,21 @@
         # - in other cases, exception is raised
         item, access_model, item_config = item_data.item, item_data.access_model, item_data.config
         data = item.toXml()
+        data_fts_cfg = self._config[const.OPT_FTS_LANGUAGE]
+        if data_fts_cfg == const.VAL_FTS_GENERIC:
+            data_fts_cfg = "simple"
 
-        insert_query = """INSERT INTO items (node_id, item, publisher, data, access_model)
-                                             SELECT %s, %s, %s, %s, %s FROM nodes
-                                                                        WHERE node_id=%s
-                                                                        RETURNING item_id"""
+        insert_query = (
+            "INSERT INTO items(node_id, item, publisher, data, access_model, "
+            "data_fts_cfg) VALUES (%s, %s, %s, %s, %s, %s) RETURNING item_id"
+        )
         insert_data = [self.nodeDbId,
                        item["id"],
                        publisher.full(),
                        data,
                        access_model,
-                       self.nodeDbId]
+                       data_fts_cfg,
+                       ]
 
         try:
             cursor.execute(insert_query, insert_data)
@@ -941,9 +969,7 @@
             else:
                 # this is an update
                 cursor.execute("""UPDATE items SET updated=now(), publisher=%s, data=%s
-                                  FROM nodes
-                                  WHERE nodes.node_id = items.node_id AND
-                                        nodes.node_id = %s and items.item=%s
+                                  WHERE node_id=%s AND items.item=%s
                                   RETURNING item_id""",
                                (publisher.full(),
                                 data,
@@ -1089,6 +1115,15 @@
                     else:
                         query_filters.append("AND publisher LIKE %s")
                         args.append("{}%".format(filter_.value))
+                elif filter_.var == const.MAM_FILTER_FTS:
+                    fts_cfg = self._config[const.OPT_FTS_LANGUAGE]
+                    if fts_cfg == const.VAL_FTS_GENERIC:
+                        fts_cfg = "simple"
+                    query_filters.append(
+                        "AND data_fts @@ websearch_to_tsquery(%s, %s)"
+                    )
+                    args.append(fts_cfg)
+                    args.append(filter_.value)
                 elif filter_.var == const.MAM_FILTER_CATEGORY:
                     query.append("LEFT JOIN item_categories USING (item_id)")
                     query_filters.append("AND category=%s")
--- a/twisted/plugins/pubsub.py	Thu Dec 10 10:46:34 2020 +0100
+++ b/twisted/plugins/pubsub.py	Fri Dec 11 17:18:52 2020 +0100
@@ -287,6 +287,7 @@
             mam_resource = pubsub_mam.MAMResource(bs)
             mam_s = mam.MAMService(mam_resource)
             mam_s.addFilter(data_form.Field(var=const.MAM_FILTER_CATEGORY))
+            mam_s.addFilter(data_form.Field(var=const.MAM_FILTER_FTS))
             mam_s.setHandlerParent(cs)
 
         pa = pubsub_admin.PubsubAdminHandler(bs)