comparison sat_pubsub/pgsql_storage.py @ 430:5a0ada3b61ca

Full-Text Search implementation: /!\ pgsql schema needs to be updated /!\ /!\ Minimum required PostgreSQL version is now 12 /!\ A new option is available to specify the main language of a node. By default a `generic` language is used (which uses the `simple` configuration in PostgreSQL). When a node owner changes the language, the index is rebuilt accordingly. It is possible to have an item-specific language for multilingual nodes (but for the moment the search is done with the node language, so the results won't be good). If an item language is explicitly set in `item_languages`, the FTS configuration won't be affected by the node FTS language setting. Search is parsed with `websearch_to_tsquery` for now, but this parser doesn't handle prefix matching, so it may be replaced in the future. SetConfiguration now only updates the modified values; this avoids triggering the FTS re-indexing on each config change. `_checkNodeExists` is not called anymore, as we can check whether a row has been modified to see if the node exists; this avoids a useless query. Item storing has been slightly improved, with a useless SELECT and condition removed. To avoid 2 schema updates in a row, the `sat_pubsub_update_5_6.sql` file also prepares the implementation of XEP-0346 by updating nodes with a schema and creating the suitable template nodes.
author Goffi <goffi@goffi.org>
date Fri, 11 Dec 2020 17:18:52 +0100
parents 0526073ff2ab
children 920440200570
comparison
equal deleted inserted replaced
429:0526073ff2ab 430:5a0ada3b61ca
77 77
78 # parseXml manage str, but we get unicode 78 # parseXml manage str, but we get unicode
79 parseXml = lambda unicode_data: generic.parseXml(unicode_data.encode('utf-8')) 79 parseXml = lambda unicode_data: generic.parseXml(unicode_data.encode('utf-8'))
80 ITEMS_SEQ_NAME = 'node_{node_id}_seq' 80 ITEMS_SEQ_NAME = 'node_{node_id}_seq'
81 PEP_COL_NAME = 'pep' 81 PEP_COL_NAME = 'pep'
82 CURRENT_VERSION = '5' 82 CURRENT_VERSION = '6'
83 # retrieve the maximum integer item id + 1 83 # retrieve the maximum integer item id + 1
84 NEXT_ITEM_ID_QUERY = r"SELECT COALESCE(max(item::integer)+1,1) as val from items where node_id={node_id} and item ~ E'^\\d+$'" 84 NEXT_ITEM_ID_QUERY = r"SELECT COALESCE(max(item::integer)+1,1) as val from items where node_id={node_id} and item ~ E'^\\d+$'"
85 85
86 86
87 def withPEP(query, values, pep, recipient): 87 def withPEP(query, values, pep, recipient):
103 103
104 104
105 @implementer(iidavoll.IStorage) 105 @implementer(iidavoll.IStorage)
106 class Storage: 106 class Storage:
107 107
108 108 fts_languages = ['generic']
109 defaultConfig = { 109 defaultConfig = {
110 'leaf': { 110 'leaf': {
111 const.OPT_PERSIST_ITEMS: True, 111 const.OPT_PERSIST_ITEMS: True,
112 const.OPT_MAX_ITEMS: 'max', 112 const.OPT_MAX_ITEMS: 'max',
113 const.OPT_DELIVER_PAYLOADS: True, 113 const.OPT_DELIVER_PAYLOADS: True,
114 const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub', 114 const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub',
115 const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT, 115 const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT,
116 const.OPT_PUBLISH_MODEL: const.VAL_PMODEL_DEFAULT, 116 const.OPT_PUBLISH_MODEL: const.VAL_PMODEL_DEFAULT,
117 const.OPT_SERIAL_IDS: False, 117 const.OPT_SERIAL_IDS: False,
118 const.OPT_CONSISTENT_PUBLISHER: False, 118 const.OPT_CONSISTENT_PUBLISHER: False,
119 const.OPT_FTS_LANGUAGE: const.VAL_FTS_GENERIC,
119 }, 120 },
120 'collection': { 121 'collection': {
121 const.OPT_DELIVER_PAYLOADS: True, 122 const.OPT_DELIVER_PAYLOADS: True,
122 const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub', 123 const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub',
123 const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT, 124 const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT,
154 const.OPT_SEND_LAST_PUBLISHED_ITEM: row[6], 155 const.OPT_SEND_LAST_PUBLISHED_ITEM: row[6],
155 const.OPT_ACCESS_MODEL:row[7], 156 const.OPT_ACCESS_MODEL:row[7],
156 const.OPT_PUBLISH_MODEL:row[8], 157 const.OPT_PUBLISH_MODEL:row[8],
157 const.OPT_SERIAL_IDS:row[9], 158 const.OPT_SERIAL_IDS:row[9],
158 const.OPT_CONSISTENT_PUBLISHER:row[10], 159 const.OPT_CONSISTENT_PUBLISHER:row[10],
160 const.OPT_FTS_LANGUAGE: row[11],
159 } 161 }
160 schema = row[11] 162 schema = row[12]
161 if schema is not None: 163 if schema is not None:
162 schema = parseXml(schema) 164 schema = parseXml(schema)
163 node = LeafNode(row[0], row[1], configuration, schema) 165 node = LeafNode(row[0], row[1], configuration, schema)
164 node.dbpool = self.dbpool 166 node.dbpool = self.dbpool
165 return node 167 return node
174 node.dbpool = self.dbpool 176 node.dbpool = self.dbpool
175 return node 177 return node
176 else: 178 else:
177 raise BadRequest(text="Unknown node type !") 179 raise BadRequest(text="Unknown node type !")
178 180
181 def getFTSLanguages(self):
182 """Get list of available languages for full text search"""
183 return self.dbpool.runInteraction(self._getFTSLanguages)
184
185 def _getFTSLanguages(self, cursor):
186 cursor.execute("SELECT cfgname FROM pg_ts_config")
187 result = [r.cfgname for r in cursor.fetchall()]
188 result.remove("simple")
189 result.insert(0, "generic")
190 Node.fts_languages = self.fts_languages = result
191 return result
192
179 def getNodeById(self, nodeDbId): 193 def getNodeById(self, nodeDbId):
180 """Get node using database ID insted of pubsub identifier 194 """Get node using database ID insted of pubsub identifier
181 195
182 @param nodeDbId(unicode): database ID 196 @param nodeDbId(unicode): database ID
183 """ 197 """
193 send_last_published_item, 207 send_last_published_item,
194 access_model, 208 access_model,
195 publish_model, 209 publish_model,
196 serial_ids, 210 serial_ids,
197 consistent_publisher, 211 consistent_publisher,
212 fts_language,
198 schema::text, 213 schema::text,
199 pep 214 pep
200 FROM nodes 215 FROM nodes
201 WHERE node_id=%s""", 216 WHERE node_id=%s""",
202 (nodeDbId,)) 217 (nodeDbId,))
216 send_last_published_item, 231 send_last_published_item,
217 access_model, 232 access_model,
218 publish_model, 233 publish_model,
219 serial_ids, 234 serial_ids,
220 consistent_publisher, 235 consistent_publisher,
236 fts_language,
221 schema::text, 237 schema::text,
222 pep 238 pep
223 FROM nodes 239 FROM nodes
224 WHERE node=%s""", 240 WHERE node=%s""",
225 (nodeIdentifier,), pep, recipient)) 241 (nodeIdentifier,), pep, recipient))
270 send_last_published_item, 286 send_last_published_item,
271 access_model, 287 access_model,
272 publish_model, 288 publish_model,
273 serial_ids, 289 serial_ids,
274 consistent_publisher, 290 consistent_publisher,
291 fts_language,
275 schema, 292 schema,
276 pep) 293 pep)
277 VALUES 294 VALUES
278 (%s, 'leaf', %s, %s, %s, %s, %s, %s, %s, %s, %s)""", 295 (%s, 'leaf', %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)""",
279 (nodeIdentifier, 296 (nodeIdentifier,
280 config['pubsub#persist_items'], 297 config['pubsub#persist_items'],
281 config['pubsub#deliver_payloads'], 298 config['pubsub#deliver_payloads'],
282 config['pubsub#send_last_published_item'], 299 config['pubsub#send_last_published_item'],
283 config[const.OPT_ACCESS_MODEL], 300 config[const.OPT_ACCESS_MODEL],
284 config[const.OPT_PUBLISH_MODEL], 301 config[const.OPT_PUBLISH_MODEL],
285 config[const.OPT_SERIAL_IDS], 302 config[const.OPT_SERIAL_IDS],
286 config[const.OPT_CONSISTENT_PUBLISHER], 303 config[const.OPT_CONSISTENT_PUBLISHER],
304 config[const.OPT_FTS_LANGUAGE],
287 schema, 305 schema,
288 recipient.userhost() if pep else None 306 recipient.userhost() if pep else None
289 ) 307 )
290 ) 308 )
291 except cursor._pool.dbapi.IntegrityError as e: 309 except cursor._pool.dbapi.IntegrityError as e:
502 @param cursor(): current db cursor 520 @param cursor(): current db cursor
503 @param node_id(unicode): database ID of the node 521 @param node_id(unicode): database ID of the node
504 @param old_config(dict): config of the node before the change 522 @param old_config(dict): config of the node before the change
505 @param new_config(dict): new options that will be changed 523 @param new_config(dict): new options that will be changed
506 """ 524 """
525 if const.OPT_SERIAL_IDS not in new_config:
526 return
507 serial_ids = new_config[const.OPT_SERIAL_IDS] 527 serial_ids = new_config[const.OPT_SERIAL_IDS]
508 if serial_ids != old_config[const.OPT_SERIAL_IDS]: 528 if serial_ids != old_config[const.OPT_SERIAL_IDS]:
509 # serial_ids option has been modified, 529 # serial_ids option has been modified,
510 # we need to handle corresponding sequence 530 # we need to handle corresponding sequence
511 531
523 next_val = next_val)) 543 next_val = next_val))
524 else: 544 else:
525 cursor.execute("DROP SEQUENCE IF EXISTS {seq_name}".format(seq_name = seq_name)) 545 cursor.execute("DROP SEQUENCE IF EXISTS {seq_name}".format(seq_name = seq_name))
526 546
527 def setConfiguration(self, options): 547 def setConfiguration(self, options):
528 config = copy.copy(self._config) 548 to_delete = []
529 549 for option, value in options.items():
530 for option in options: 550 try:
531 if option in config: 551 if self._config[option] == value:
532 config[option] = options[option] 552 to_delete.append(option)
533 553 except KeyError:
534 if config[const.OPT_MAX_ITEMS] == "max": 554 raise BadRequest(text=f"Invalid option: {option!r}")
555
556 for option in to_delete:
557 options.remove(option)
558
559 if not options:
560 return
561
562 if options.get(const.OPT_MAX_ITEMS) == "max":
535 # XXX: "max" is default value for config we must convert 563 # XXX: "max" is default value for config we must convert
536 # it to an interger. See backend's _doSetNodeConfiguration comment 564 # it to an interger. See backend's _doSetNodeConfiguration comment
537 config[const.OPT_MAX_ITEMS] = "0" 565 options[const.OPT_MAX_ITEMS] = "0"
538 566
539 d = self.dbpool.runInteraction(self._setConfiguration, config) 567 if ((const.OPT_FTS_LANGUAGE in options
540 d.addCallback(self._setCachedConfiguration, config) 568 and options[const.OPT_FTS_LANGUAGE] not in self.fts_languages)):
569 raise BadRequest(text=
570 f"invalid {const.OPT_FTS_LANGUAGE} value: "
571 f"{options[const.OPT_FTS_LANGUAGE]!r}"
572 )
573
574
575 d = self.dbpool.runInteraction(self._setConfiguration, options)
576 d.addCallback(self._updateCachedConfiguration, options)
541 return d 577 return d
542 578
543 def _setConfiguration(self, cursor, config): 579 def _setConfiguration(self, cursor, options):
544 self._checkNodeExists(cursor) 580 self._configurationTriggers(cursor, self.nodeDbId, self._config, options)
545 self._configurationTriggers(cursor, self.nodeDbId, self._config, config) 581 # options names all follow the scheme "pubsub#{col_name}"
546 cursor.execute("""UPDATE nodes SET persist_items=%s, 582 col_names = (o[7:] for o in options)
547 max_items=%s, 583 values = ','.join(f"{name}=%s" for name in col_names)
548 deliver_payloads=%s, 584 cursor.execute(f"UPDATE nodes SET {values} WHERE node_id=%s",
549 send_last_published_item=%s, 585 (*options.values(), self.nodeDbId))
550 access_model=%s, 586 if cursor.rowcount == 0:
551 publish_model=%s, 587 raise error.NodeNotFound()
552 serial_ids=%s, 588
553 consistent_publisher=%s 589 def _updateCachedConfiguration(self, __, options):
554 WHERE node_id=%s""", 590 self._config.update(options)
555 (config[const.OPT_PERSIST_ITEMS],
556 config[const.OPT_MAX_ITEMS],
557 config[const.OPT_DELIVER_PAYLOADS],
558 config[const.OPT_SEND_LAST_PUBLISHED_ITEM],
559 config[const.OPT_ACCESS_MODEL],
560 config[const.OPT_PUBLISH_MODEL],
561 config[const.OPT_SERIAL_IDS],
562 config[const.OPT_CONSISTENT_PUBLISHER],
563 self.nodeDbId))
564
565 def _setCachedConfiguration(self, void, config):
566 self._config = config
567 591
568 def getSchema(self): 592 def getSchema(self):
569 return self._schema 593 return self._schema
570 594
571 def setSchema(self, schema): 595 def setSchema(self, schema):
578 cursor.execute("""UPDATE nodes SET schema=%s 602 cursor.execute("""UPDATE nodes SET schema=%s
579 WHERE node_id=%s""", 603 WHERE node_id=%s""",
580 (schema.toXml() if schema else None, 604 (schema.toXml() if schema else None,
581 self.nodeDbId)) 605 self.nodeDbId))
582 606
583 def _setCachedSchema(self, void, schema): 607 def _setCachedSchema(self, __, schema):
584 self._schema = schema 608 self._schema = schema
585 609
586 def getMetaData(self): 610 def getMetaData(self):
587 config = copy.copy(self._config) 611 config = copy.copy(self._config)
588 config["pubsub#node_type"] = self.nodeType 612 config["pubsub#node_type"] = self.nodeType
898 # this case should be rare enough to consider this situation acceptable) 922 # this case should be rare enough to consider this situation acceptable)
899 # - if item insertion fail and the item is not new, we do an update 923 # - if item insertion fail and the item is not new, we do an update
900 # - in other cases, exception is raised 924 # - in other cases, exception is raised
901 item, access_model, item_config = item_data.item, item_data.access_model, item_data.config 925 item, access_model, item_config = item_data.item, item_data.access_model, item_data.config
902 data = item.toXml() 926 data = item.toXml()
903 927 data_fts_cfg = self._config[const.OPT_FTS_LANGUAGE]
904 insert_query = """INSERT INTO items (node_id, item, publisher, data, access_model) 928 if data_fts_cfg == const.VAL_FTS_GENERIC:
905 SELECT %s, %s, %s, %s, %s FROM nodes 929 data_fts_cfg = "simple"
906 WHERE node_id=%s 930
907 RETURNING item_id""" 931 insert_query = (
932 "INSERT INTO items(node_id, item, publisher, data, access_model, "
933 "data_fts_cfg) VALUES (%s, %s, %s, %s, %s, %s) RETURNING item_id"
934 )
908 insert_data = [self.nodeDbId, 935 insert_data = [self.nodeDbId,
909 item["id"], 936 item["id"],
910 publisher.full(), 937 publisher.full(),
911 data, 938 data,
912 access_model, 939 access_model,
913 self.nodeDbId] 940 data_fts_cfg,
941 ]
914 942
915 try: 943 try:
916 cursor.execute(insert_query, insert_data) 944 cursor.execute(insert_query, insert_data)
917 except cursor._pool.dbapi.IntegrityError as e: 945 except cursor._pool.dbapi.IntegrityError as e:
918 if e.pgcode != "23505": 946 if e.pgcode != "23505":
939 # but if we have not serial_ids, we have a real problem 967 # but if we have not serial_ids, we have a real problem
940 raise e 968 raise e
941 else: 969 else:
942 # this is an update 970 # this is an update
943 cursor.execute("""UPDATE items SET updated=now(), publisher=%s, data=%s 971 cursor.execute("""UPDATE items SET updated=now(), publisher=%s, data=%s
944 FROM nodes 972 WHERE node_id=%s AND items.item=%s
945 WHERE nodes.node_id = items.node_id AND
946 nodes.node_id = %s and items.item=%s
947 RETURNING item_id""", 973 RETURNING item_id""",
948 (publisher.full(), 974 (publisher.full(),
949 data, 975 data,
950 self.nodeDbId, 976 self.nodeDbId,
951 item["id"])) 977 item["id"]))
1087 query_filters.append("AND publisher=%s") 1113 query_filters.append("AND publisher=%s")
1088 args.append(filter_.value) 1114 args.append(filter_.value)
1089 else: 1115 else:
1090 query_filters.append("AND publisher LIKE %s") 1116 query_filters.append("AND publisher LIKE %s")
1091 args.append("{}%".format(filter_.value)) 1117 args.append("{}%".format(filter_.value))
1118 elif filter_.var == const.MAM_FILTER_FTS:
1119 fts_cfg = self._config[const.OPT_FTS_LANGUAGE]
1120 if fts_cfg == const.VAL_FTS_GENERIC:
1121 fts_cfg = "simple"
1122 query_filters.append(
1123 "AND data_fts @@ websearch_to_tsquery(%s, %s)"
1124 )
1125 args.append(fts_cfg)
1126 args.append(filter_.value)
1092 elif filter_.var == const.MAM_FILTER_CATEGORY: 1127 elif filter_.var == const.MAM_FILTER_CATEGORY:
1093 query.append("LEFT JOIN item_categories USING (item_id)") 1128 query.append("LEFT JOIN item_categories USING (item_id)")
1094 query_filters.append("AND category=%s") 1129 query_filters.append("AND category=%s")
1095 args.append(filter_.value) 1130 args.append(filter_.value)
1096 else: 1131 else: