diff sat_pubsub/pgsql_storage.py @ 430:5a0ada3b61ca

Full-Text Search implementation: /!\ pgsql schema needs to be updated /!\ /!\ Minimal PostgreSQL required version is now 12 /!\ A new options is available to specify main language of a node. By default a `generic` language is used (which uses the `simple` configuration in PostgreSQL). When a node owner changes the language, the index is rebuilt accordingly. It is possible to have item specific language for multilingual nodes (but for the moment the search is done with node language, so the results won't be good). If an item language is explicitely set in `item_languages`, the FTS configuration won't be affected by node FTS language setting. Search is parsed with `websearch_to_tsquery` for now, but this parser doesn't handle prefix matching, so it may be replaced in the future. SetConfiguration now only updates the modified values, this avoid triggering the FTS re-indexing on each config change. `_checkNodeExists` is not called anymore as we can check if a row has been modified to see if the node exists, this avoid a useless query. Item storing has been slighly improved with a useless SELECT and condition removed. To avoid 2 schema updates in a row, the `sat_pubsub_update_5_6.sql` file also prepares the implementation of XEP-0346 by updating nodes with a schema and creating the suitable template nodes.
author Goffi <goffi@goffi.org>
date Fri, 11 Dec 2020 17:18:52 +0100
parents 0526073ff2ab
children 920440200570
line wrap: on
line diff
--- a/sat_pubsub/pgsql_storage.py	Thu Dec 10 10:46:34 2020 +0100
+++ b/sat_pubsub/pgsql_storage.py	Fri Dec 11 17:18:52 2020 +0100
@@ -79,7 +79,7 @@
 parseXml = lambda unicode_data: generic.parseXml(unicode_data.encode('utf-8'))
 ITEMS_SEQ_NAME = 'node_{node_id}_seq'
 PEP_COL_NAME = 'pep'
-CURRENT_VERSION = '5'
+CURRENT_VERSION = '6'
 # retrieve the maximum integer item id + 1
 NEXT_ITEM_ID_QUERY = r"SELECT COALESCE(max(item::integer)+1,1) as val from items where node_id={node_id} and item ~ E'^\\d+$'"
 
@@ -105,7 +105,7 @@
 @implementer(iidavoll.IStorage)
 class Storage:
 
-
+    fts_languages = ['generic']
     defaultConfig = {
             'leaf': {
                 const.OPT_PERSIST_ITEMS: True,
@@ -116,6 +116,7 @@
                 const.OPT_PUBLISH_MODEL: const.VAL_PMODEL_DEFAULT,
                 const.OPT_SERIAL_IDS: False,
                 const.OPT_CONSISTENT_PUBLISHER: False,
+                const.OPT_FTS_LANGUAGE: const.VAL_FTS_GENERIC,
             },
             'collection': {
                 const.OPT_DELIVER_PAYLOADS: True,
@@ -156,8 +157,9 @@
                     const.OPT_PUBLISH_MODEL:row[8],
                     const.OPT_SERIAL_IDS:row[9],
                     const.OPT_CONSISTENT_PUBLISHER:row[10],
+                    const.OPT_FTS_LANGUAGE: row[11],
                     }
-            schema = row[11]
+            schema = row[12]
             if schema is not None:
                 schema = parseXml(schema)
             node = LeafNode(row[0], row[1], configuration, schema)
@@ -176,6 +178,18 @@
         else:
             raise BadRequest(text="Unknown node type !")
 
+    def getFTSLanguages(self):
+        """Get list of available languages for full text search"""
+        return self.dbpool.runInteraction(self._getFTSLanguages)
+
+    def _getFTSLanguages(self, cursor):
+        cursor.execute("SELECT cfgname FROM pg_ts_config")
+        result = [r.cfgname for r in cursor.fetchall()]
+        result.remove("simple")
+        result.insert(0, "generic")
+        Node.fts_languages = self.fts_languages = result
+        return result
+
     def getNodeById(self, nodeDbId):
         """Get node using database ID insted of pubsub identifier
 
@@ -195,6 +209,7 @@
                                  publish_model,
                                  serial_ids,
                                  consistent_publisher,
+                                 fts_language,
                                  schema::text,
                                  pep
                             FROM nodes
@@ -218,6 +233,7 @@
                                           publish_model,
                                           serial_ids,
                                           consistent_publisher,
+                                          fts_language,
                                           schema::text,
                                           pep
                                    FROM nodes
@@ -272,10 +288,11 @@
                                publish_model,
                                serial_ids,
                                consistent_publisher,
+                               fts_language,
                                schema,
                                pep)
                               VALUES
-                              (%s, 'leaf', %s, %s, %s, %s, %s, %s, %s, %s, %s)""",
+                              (%s, 'leaf', %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)""",
                            (nodeIdentifier,
                             config['pubsub#persist_items'],
                             config['pubsub#deliver_payloads'],
@@ -284,6 +301,7 @@
                             config[const.OPT_PUBLISH_MODEL],
                             config[const.OPT_SERIAL_IDS],
                             config[const.OPT_CONSISTENT_PUBLISHER],
+                            config[const.OPT_FTS_LANGUAGE],
                             schema,
                             recipient.userhost() if pep else None
                             )
@@ -504,6 +522,8 @@
         @param old_config(dict): config of the node before the change
         @param new_config(dict): new options that will be changed
         """
+        if const.OPT_SERIAL_IDS not in new_config:
+            return
         serial_ids = new_config[const.OPT_SERIAL_IDS]
         if serial_ids != old_config[const.OPT_SERIAL_IDS]:
             # serial_ids option has been modified,
@@ -525,45 +545,49 @@
                 cursor.execute("DROP SEQUENCE IF EXISTS {seq_name}".format(seq_name = seq_name))
 
     def setConfiguration(self, options):
-        config = copy.copy(self._config)
+        to_delete = []
+        for option, value in options.items():
+            try:
+                if self._config[option] == value:
+                    to_delete.append(option)
+            except KeyError:
+                raise BadRequest(text=f"Invalid option: {option!r}")
 
-        for option in options:
-            if option in config:
-                config[option] = options[option]
+        for option in to_delete:
+            options.remove(option)
 
-        if config[const.OPT_MAX_ITEMS] == "max":
+        if not options:
+            return
+
+        if options.get(const.OPT_MAX_ITEMS) == "max":
             # XXX: "max" is default value for config we must convert
             #   it to an interger. See backend's _doSetNodeConfiguration comment
-            config[const.OPT_MAX_ITEMS] = "0"
+            options[const.OPT_MAX_ITEMS] = "0"
 
-        d = self.dbpool.runInteraction(self._setConfiguration, config)
-        d.addCallback(self._setCachedConfiguration, config)
+        if ((const.OPT_FTS_LANGUAGE in options
+             and options[const.OPT_FTS_LANGUAGE] not in self.fts_languages)):
+            raise BadRequest(text=
+                f"invalid {const.OPT_FTS_LANGUAGE} value: "
+                f"{options[const.OPT_FTS_LANGUAGE]!r}"
+            )
+
+
+        d = self.dbpool.runInteraction(self._setConfiguration, options)
+        d.addCallback(self._updateCachedConfiguration, options)
         return d
 
-    def _setConfiguration(self, cursor, config):
-        self._checkNodeExists(cursor)
-        self._configurationTriggers(cursor, self.nodeDbId, self._config, config)
-        cursor.execute("""UPDATE nodes SET persist_items=%s,
-                                           max_items=%s,
-                                           deliver_payloads=%s,
-                                           send_last_published_item=%s,
-                                           access_model=%s,
-                                           publish_model=%s,
-                                           serial_ids=%s,
-                                           consistent_publisher=%s
-                          WHERE node_id=%s""",
-                       (config[const.OPT_PERSIST_ITEMS],
-                        config[const.OPT_MAX_ITEMS],
-                        config[const.OPT_DELIVER_PAYLOADS],
-                        config[const.OPT_SEND_LAST_PUBLISHED_ITEM],
-                        config[const.OPT_ACCESS_MODEL],
-                        config[const.OPT_PUBLISH_MODEL],
-                        config[const.OPT_SERIAL_IDS],
-                        config[const.OPT_CONSISTENT_PUBLISHER],
-                        self.nodeDbId))
+    def _setConfiguration(self, cursor, options):
+        self._configurationTriggers(cursor, self.nodeDbId, self._config, options)
+        # options names all follow the scheme "pubsub#{col_name}"
+        col_names = (o[7:] for o in options)
+        values = ','.join(f"{name}=%s" for name in col_names)
+        cursor.execute(f"UPDATE nodes SET {values} WHERE node_id=%s",
+                       (*options.values(), self.nodeDbId))
+        if cursor.rowcount == 0:
+            raise error.NodeNotFound()
 
-    def _setCachedConfiguration(self, void, config):
-        self._config = config
+    def _updateCachedConfiguration(self, __, options):
+        self._config.update(options)
 
     def getSchema(self):
         return self._schema
@@ -580,7 +604,7 @@
                        (schema.toXml() if schema else None,
                         self.nodeDbId))
 
-    def _setCachedSchema(self, void, schema):
+    def _setCachedSchema(self, __, schema):
         self._schema = schema
 
     def getMetaData(self):
@@ -900,17 +924,21 @@
         # - in other cases, exception is raised
         item, access_model, item_config = item_data.item, item_data.access_model, item_data.config
         data = item.toXml()
+        data_fts_cfg = self._config[const.OPT_FTS_LANGUAGE]
+        if data_fts_cfg == const.VAL_FTS_GENERIC:
+            data_fts_cfg = "simple"
 
-        insert_query = """INSERT INTO items (node_id, item, publisher, data, access_model)
-                                             SELECT %s, %s, %s, %s, %s FROM nodes
-                                                                        WHERE node_id=%s
-                                                                        RETURNING item_id"""
+        insert_query = (
+            "INSERT INTO items(node_id, item, publisher, data, access_model, "
+            "data_fts_cfg) VALUES (%s, %s, %s, %s, %s, %s) RETURNING item_id"
+        )
         insert_data = [self.nodeDbId,
                        item["id"],
                        publisher.full(),
                        data,
                        access_model,
-                       self.nodeDbId]
+                       data_fts_cfg,
+                       ]
 
         try:
             cursor.execute(insert_query, insert_data)
@@ -941,9 +969,7 @@
             else:
                 # this is an update
                 cursor.execute("""UPDATE items SET updated=now(), publisher=%s, data=%s
-                                  FROM nodes
-                                  WHERE nodes.node_id = items.node_id AND
-                                        nodes.node_id = %s and items.item=%s
+                                  WHERE node_id=%s AND items.item=%s
                                   RETURNING item_id""",
                                (publisher.full(),
                                 data,
@@ -1089,6 +1115,15 @@
                     else:
                         query_filters.append("AND publisher LIKE %s")
                         args.append("{}%".format(filter_.value))
+                elif filter_.var == const.MAM_FILTER_FTS:
+                    fts_cfg = self._config[const.OPT_FTS_LANGUAGE]
+                    if fts_cfg == const.VAL_FTS_GENERIC:
+                        fts_cfg = "simple"
+                    query_filters.append(
+                        "AND data_fts @@ websearch_to_tsquery(%s, %s)"
+                    )
+                    args.append(fts_cfg)
+                    args.append(filter_.value)
                 elif filter_.var == const.MAM_FILTER_CATEGORY:
                     query.append("LEFT JOIN item_categories USING (item_id)")
                     query_filters.append("AND category=%s")