Mercurial > libervia-pubsub
comparison sat_pubsub/pgsql_storage.py @ 430:5a0ada3b61ca
Full-Text Search implementation:
/!\ pgsql schema needs to be updated /!\
/!\ Minimal PostgreSQL required version is now 12 /!\
A new option is available to specify the main language of a node. By default a `generic`
language is used (which uses the `simple` configuration in PostgreSQL). When a node owner
changes the language, the index is rebuilt accordingly. It is possible to have an item-
specific language for multilingual nodes (but for the moment the search is done with node
language, so the results won't be good). If an item language is explicitly set in
`item_languages`, the FTS configuration won't be affected by node FTS language setting.
Search is parsed with `websearch_to_tsquery` for now, but this parser doesn't handle
prefix matching, so it may be replaced in the future.
SetConfiguration now only updates the modified values; this avoids triggering the FTS
re-indexing on each config change. `_checkNodeExists` is not called anymore as we can
check if a row has been modified to see if the node exists; this avoids a useless query.
Item storing has been slightly improved by removing a useless SELECT and condition.
To avoid 2 schema updates in a row, the `sat_pubsub_update_5_6.sql` file also prepares the
implementation of XEP-0346 by updating nodes with a schema and creating the suitable
template nodes.
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 11 Dec 2020 17:18:52 +0100 |
parents | 0526073ff2ab |
children | 920440200570 |
comparison
equal
deleted
inserted
replaced
429:0526073ff2ab | 430:5a0ada3b61ca |
---|---|
77 | 77 |
78 # parseXml manage str, but we get unicode | 78 # parseXml manage str, but we get unicode |
79 parseXml = lambda unicode_data: generic.parseXml(unicode_data.encode('utf-8')) | 79 parseXml = lambda unicode_data: generic.parseXml(unicode_data.encode('utf-8')) |
80 ITEMS_SEQ_NAME = 'node_{node_id}_seq' | 80 ITEMS_SEQ_NAME = 'node_{node_id}_seq' |
81 PEP_COL_NAME = 'pep' | 81 PEP_COL_NAME = 'pep' |
82 CURRENT_VERSION = '5' | 82 CURRENT_VERSION = '6' |
83 # retrieve the maximum integer item id + 1 | 83 # retrieve the maximum integer item id + 1 |
84 NEXT_ITEM_ID_QUERY = r"SELECT COALESCE(max(item::integer)+1,1) as val from items where node_id={node_id} and item ~ E'^\\d+$'" | 84 NEXT_ITEM_ID_QUERY = r"SELECT COALESCE(max(item::integer)+1,1) as val from items where node_id={node_id} and item ~ E'^\\d+$'" |
85 | 85 |
86 | 86 |
87 def withPEP(query, values, pep, recipient): | 87 def withPEP(query, values, pep, recipient): |
103 | 103 |
104 | 104 |
105 @implementer(iidavoll.IStorage) | 105 @implementer(iidavoll.IStorage) |
106 class Storage: | 106 class Storage: |
107 | 107 |
108 | 108 fts_languages = ['generic'] |
109 defaultConfig = { | 109 defaultConfig = { |
110 'leaf': { | 110 'leaf': { |
111 const.OPT_PERSIST_ITEMS: True, | 111 const.OPT_PERSIST_ITEMS: True, |
112 const.OPT_MAX_ITEMS: 'max', | 112 const.OPT_MAX_ITEMS: 'max', |
113 const.OPT_DELIVER_PAYLOADS: True, | 113 const.OPT_DELIVER_PAYLOADS: True, |
114 const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub', | 114 const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub', |
115 const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT, | 115 const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT, |
116 const.OPT_PUBLISH_MODEL: const.VAL_PMODEL_DEFAULT, | 116 const.OPT_PUBLISH_MODEL: const.VAL_PMODEL_DEFAULT, |
117 const.OPT_SERIAL_IDS: False, | 117 const.OPT_SERIAL_IDS: False, |
118 const.OPT_CONSISTENT_PUBLISHER: False, | 118 const.OPT_CONSISTENT_PUBLISHER: False, |
119 const.OPT_FTS_LANGUAGE: const.VAL_FTS_GENERIC, | |
119 }, | 120 }, |
120 'collection': { | 121 'collection': { |
121 const.OPT_DELIVER_PAYLOADS: True, | 122 const.OPT_DELIVER_PAYLOADS: True, |
122 const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub', | 123 const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub', |
123 const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT, | 124 const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT, |
154 const.OPT_SEND_LAST_PUBLISHED_ITEM: row[6], | 155 const.OPT_SEND_LAST_PUBLISHED_ITEM: row[6], |
155 const.OPT_ACCESS_MODEL:row[7], | 156 const.OPT_ACCESS_MODEL:row[7], |
156 const.OPT_PUBLISH_MODEL:row[8], | 157 const.OPT_PUBLISH_MODEL:row[8], |
157 const.OPT_SERIAL_IDS:row[9], | 158 const.OPT_SERIAL_IDS:row[9], |
158 const.OPT_CONSISTENT_PUBLISHER:row[10], | 159 const.OPT_CONSISTENT_PUBLISHER:row[10], |
160 const.OPT_FTS_LANGUAGE: row[11], | |
159 } | 161 } |
160 schema = row[11] | 162 schema = row[12] |
161 if schema is not None: | 163 if schema is not None: |
162 schema = parseXml(schema) | 164 schema = parseXml(schema) |
163 node = LeafNode(row[0], row[1], configuration, schema) | 165 node = LeafNode(row[0], row[1], configuration, schema) |
164 node.dbpool = self.dbpool | 166 node.dbpool = self.dbpool |
165 return node | 167 return node |
174 node.dbpool = self.dbpool | 176 node.dbpool = self.dbpool |
175 return node | 177 return node |
176 else: | 178 else: |
177 raise BadRequest(text="Unknown node type !") | 179 raise BadRequest(text="Unknown node type !") |
178 | 180 |
181 def getFTSLanguages(self): | |
182 """Get list of available languages for full text search""" | |
183 return self.dbpool.runInteraction(self._getFTSLanguages) | |
184 | |
185 def _getFTSLanguages(self, cursor): | |
186 cursor.execute("SELECT cfgname FROM pg_ts_config") | |
187 result = [r.cfgname for r in cursor.fetchall()] | |
188 result.remove("simple") | |
189 result.insert(0, "generic") | |
190 Node.fts_languages = self.fts_languages = result | |
191 return result | |
192 | |
179 def getNodeById(self, nodeDbId): | 193 def getNodeById(self, nodeDbId): |
180 """Get node using database ID insted of pubsub identifier | 194 """Get node using database ID insted of pubsub identifier |
181 | 195 |
182 @param nodeDbId(unicode): database ID | 196 @param nodeDbId(unicode): database ID |
183 """ | 197 """ |
193 send_last_published_item, | 207 send_last_published_item, |
194 access_model, | 208 access_model, |
195 publish_model, | 209 publish_model, |
196 serial_ids, | 210 serial_ids, |
197 consistent_publisher, | 211 consistent_publisher, |
212 fts_language, | |
198 schema::text, | 213 schema::text, |
199 pep | 214 pep |
200 FROM nodes | 215 FROM nodes |
201 WHERE node_id=%s""", | 216 WHERE node_id=%s""", |
202 (nodeDbId,)) | 217 (nodeDbId,)) |
216 send_last_published_item, | 231 send_last_published_item, |
217 access_model, | 232 access_model, |
218 publish_model, | 233 publish_model, |
219 serial_ids, | 234 serial_ids, |
220 consistent_publisher, | 235 consistent_publisher, |
236 fts_language, | |
221 schema::text, | 237 schema::text, |
222 pep | 238 pep |
223 FROM nodes | 239 FROM nodes |
224 WHERE node=%s""", | 240 WHERE node=%s""", |
225 (nodeIdentifier,), pep, recipient)) | 241 (nodeIdentifier,), pep, recipient)) |
270 send_last_published_item, | 286 send_last_published_item, |
271 access_model, | 287 access_model, |
272 publish_model, | 288 publish_model, |
273 serial_ids, | 289 serial_ids, |
274 consistent_publisher, | 290 consistent_publisher, |
291 fts_language, | |
275 schema, | 292 schema, |
276 pep) | 293 pep) |
277 VALUES | 294 VALUES |
278 (%s, 'leaf', %s, %s, %s, %s, %s, %s, %s, %s, %s)""", | 295 (%s, 'leaf', %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)""", |
279 (nodeIdentifier, | 296 (nodeIdentifier, |
280 config['pubsub#persist_items'], | 297 config['pubsub#persist_items'], |
281 config['pubsub#deliver_payloads'], | 298 config['pubsub#deliver_payloads'], |
282 config['pubsub#send_last_published_item'], | 299 config['pubsub#send_last_published_item'], |
283 config[const.OPT_ACCESS_MODEL], | 300 config[const.OPT_ACCESS_MODEL], |
284 config[const.OPT_PUBLISH_MODEL], | 301 config[const.OPT_PUBLISH_MODEL], |
285 config[const.OPT_SERIAL_IDS], | 302 config[const.OPT_SERIAL_IDS], |
286 config[const.OPT_CONSISTENT_PUBLISHER], | 303 config[const.OPT_CONSISTENT_PUBLISHER], |
304 config[const.OPT_FTS_LANGUAGE], | |
287 schema, | 305 schema, |
288 recipient.userhost() if pep else None | 306 recipient.userhost() if pep else None |
289 ) | 307 ) |
290 ) | 308 ) |
291 except cursor._pool.dbapi.IntegrityError as e: | 309 except cursor._pool.dbapi.IntegrityError as e: |
502 @param cursor(): current db cursor | 520 @param cursor(): current db cursor |
503 @param node_id(unicode): database ID of the node | 521 @param node_id(unicode): database ID of the node |
504 @param old_config(dict): config of the node before the change | 522 @param old_config(dict): config of the node before the change |
505 @param new_config(dict): new options that will be changed | 523 @param new_config(dict): new options that will be changed |
506 """ | 524 """ |
525 if const.OPT_SERIAL_IDS not in new_config: | |
526 return | |
507 serial_ids = new_config[const.OPT_SERIAL_IDS] | 527 serial_ids = new_config[const.OPT_SERIAL_IDS] |
508 if serial_ids != old_config[const.OPT_SERIAL_IDS]: | 528 if serial_ids != old_config[const.OPT_SERIAL_IDS]: |
509 # serial_ids option has been modified, | 529 # serial_ids option has been modified, |
510 # we need to handle corresponding sequence | 530 # we need to handle corresponding sequence |
511 | 531 |
523 next_val = next_val)) | 543 next_val = next_val)) |
524 else: | 544 else: |
525 cursor.execute("DROP SEQUENCE IF EXISTS {seq_name}".format(seq_name = seq_name)) | 545 cursor.execute("DROP SEQUENCE IF EXISTS {seq_name}".format(seq_name = seq_name)) |
526 | 546 |
527 def setConfiguration(self, options): | 547 def setConfiguration(self, options): |
528 config = copy.copy(self._config) | 548 to_delete = [] |
529 | 549 for option, value in options.items(): |
530 for option in options: | 550 try: |
531 if option in config: | 551 if self._config[option] == value: |
532 config[option] = options[option] | 552 to_delete.append(option) |
533 | 553 except KeyError: |
534 if config[const.OPT_MAX_ITEMS] == "max": | 554 raise BadRequest(text=f"Invalid option: {option!r}") |
555 | |
556 for option in to_delete: | |
557 options.remove(option) | |
558 | |
559 if not options: | |
560 return | |
561 | |
562 if options.get(const.OPT_MAX_ITEMS) == "max": | |
535 # XXX: "max" is default value for config we must convert | 563 # XXX: "max" is default value for config we must convert |
536 # it to an interger. See backend's _doSetNodeConfiguration comment | 564 # it to an interger. See backend's _doSetNodeConfiguration comment |
537 config[const.OPT_MAX_ITEMS] = "0" | 565 options[const.OPT_MAX_ITEMS] = "0" |
538 | 566 |
539 d = self.dbpool.runInteraction(self._setConfiguration, config) | 567 if ((const.OPT_FTS_LANGUAGE in options |
540 d.addCallback(self._setCachedConfiguration, config) | 568 and options[const.OPT_FTS_LANGUAGE] not in self.fts_languages)): |
569 raise BadRequest(text= | |
570 f"invalid {const.OPT_FTS_LANGUAGE} value: " | |
571 f"{options[const.OPT_FTS_LANGUAGE]!r}" | |
572 ) | |
573 | |
574 | |
575 d = self.dbpool.runInteraction(self._setConfiguration, options) | |
576 d.addCallback(self._updateCachedConfiguration, options) | |
541 return d | 577 return d |
542 | 578 |
543 def _setConfiguration(self, cursor, config): | 579 def _setConfiguration(self, cursor, options): |
544 self._checkNodeExists(cursor) | 580 self._configurationTriggers(cursor, self.nodeDbId, self._config, options) |
545 self._configurationTriggers(cursor, self.nodeDbId, self._config, config) | 581 # options names all follow the scheme "pubsub#{col_name}" |
546 cursor.execute("""UPDATE nodes SET persist_items=%s, | 582 col_names = (o[7:] for o in options) |
547 max_items=%s, | 583 values = ','.join(f"{name}=%s" for name in col_names) |
548 deliver_payloads=%s, | 584 cursor.execute(f"UPDATE nodes SET {values} WHERE node_id=%s", |
549 send_last_published_item=%s, | 585 (*options.values(), self.nodeDbId)) |
550 access_model=%s, | 586 if cursor.rowcount == 0: |
551 publish_model=%s, | 587 raise error.NodeNotFound() |
552 serial_ids=%s, | 588 |
553 consistent_publisher=%s | 589 def _updateCachedConfiguration(self, __, options): |
554 WHERE node_id=%s""", | 590 self._config.update(options) |
555 (config[const.OPT_PERSIST_ITEMS], | |
556 config[const.OPT_MAX_ITEMS], | |
557 config[const.OPT_DELIVER_PAYLOADS], | |
558 config[const.OPT_SEND_LAST_PUBLISHED_ITEM], | |
559 config[const.OPT_ACCESS_MODEL], | |
560 config[const.OPT_PUBLISH_MODEL], | |
561 config[const.OPT_SERIAL_IDS], | |
562 config[const.OPT_CONSISTENT_PUBLISHER], | |
563 self.nodeDbId)) | |
564 | |
565 def _setCachedConfiguration(self, void, config): | |
566 self._config = config | |
567 | 591 |
568 def getSchema(self): | 592 def getSchema(self): |
569 return self._schema | 593 return self._schema |
570 | 594 |
571 def setSchema(self, schema): | 595 def setSchema(self, schema): |
578 cursor.execute("""UPDATE nodes SET schema=%s | 602 cursor.execute("""UPDATE nodes SET schema=%s |
579 WHERE node_id=%s""", | 603 WHERE node_id=%s""", |
580 (schema.toXml() if schema else None, | 604 (schema.toXml() if schema else None, |
581 self.nodeDbId)) | 605 self.nodeDbId)) |
582 | 606 |
583 def _setCachedSchema(self, void, schema): | 607 def _setCachedSchema(self, __, schema): |
584 self._schema = schema | 608 self._schema = schema |
585 | 609 |
586 def getMetaData(self): | 610 def getMetaData(self): |
587 config = copy.copy(self._config) | 611 config = copy.copy(self._config) |
588 config["pubsub#node_type"] = self.nodeType | 612 config["pubsub#node_type"] = self.nodeType |
898 # this case should be rare enough to consider this situation acceptable) | 922 # this case should be rare enough to consider this situation acceptable) |
899 # - if item insertion fail and the item is not new, we do an update | 923 # - if item insertion fail and the item is not new, we do an update |
900 # - in other cases, exception is raised | 924 # - in other cases, exception is raised |
901 item, access_model, item_config = item_data.item, item_data.access_model, item_data.config | 925 item, access_model, item_config = item_data.item, item_data.access_model, item_data.config |
902 data = item.toXml() | 926 data = item.toXml() |
903 | 927 data_fts_cfg = self._config[const.OPT_FTS_LANGUAGE] |
904 insert_query = """INSERT INTO items (node_id, item, publisher, data, access_model) | 928 if data_fts_cfg == const.VAL_FTS_GENERIC: |
905 SELECT %s, %s, %s, %s, %s FROM nodes | 929 data_fts_cfg = "simple" |
906 WHERE node_id=%s | 930 |
907 RETURNING item_id""" | 931 insert_query = ( |
932 "INSERT INTO items(node_id, item, publisher, data, access_model, " | |
933 "data_fts_cfg) VALUES (%s, %s, %s, %s, %s, %s) RETURNING item_id" | |
934 ) | |
908 insert_data = [self.nodeDbId, | 935 insert_data = [self.nodeDbId, |
909 item["id"], | 936 item["id"], |
910 publisher.full(), | 937 publisher.full(), |
911 data, | 938 data, |
912 access_model, | 939 access_model, |
913 self.nodeDbId] | 940 data_fts_cfg, |
941 ] | |
914 | 942 |
915 try: | 943 try: |
916 cursor.execute(insert_query, insert_data) | 944 cursor.execute(insert_query, insert_data) |
917 except cursor._pool.dbapi.IntegrityError as e: | 945 except cursor._pool.dbapi.IntegrityError as e: |
918 if e.pgcode != "23505": | 946 if e.pgcode != "23505": |
939 # but if we have not serial_ids, we have a real problem | 967 # but if we have not serial_ids, we have a real problem |
940 raise e | 968 raise e |
941 else: | 969 else: |
942 # this is an update | 970 # this is an update |
943 cursor.execute("""UPDATE items SET updated=now(), publisher=%s, data=%s | 971 cursor.execute("""UPDATE items SET updated=now(), publisher=%s, data=%s |
944 FROM nodes | 972 WHERE node_id=%s AND items.item=%s |
945 WHERE nodes.node_id = items.node_id AND | |
946 nodes.node_id = %s and items.item=%s | |
947 RETURNING item_id""", | 973 RETURNING item_id""", |
948 (publisher.full(), | 974 (publisher.full(), |
949 data, | 975 data, |
950 self.nodeDbId, | 976 self.nodeDbId, |
951 item["id"])) | 977 item["id"])) |
1087 query_filters.append("AND publisher=%s") | 1113 query_filters.append("AND publisher=%s") |
1088 args.append(filter_.value) | 1114 args.append(filter_.value) |
1089 else: | 1115 else: |
1090 query_filters.append("AND publisher LIKE %s") | 1116 query_filters.append("AND publisher LIKE %s") |
1091 args.append("{}%".format(filter_.value)) | 1117 args.append("{}%".format(filter_.value)) |
1118 elif filter_.var == const.MAM_FILTER_FTS: | |
1119 fts_cfg = self._config[const.OPT_FTS_LANGUAGE] | |
1120 if fts_cfg == const.VAL_FTS_GENERIC: | |
1121 fts_cfg = "simple" | |
1122 query_filters.append( | |
1123 "AND data_fts @@ websearch_to_tsquery(%s, %s)" | |
1124 ) | |
1125 args.append(fts_cfg) | |
1126 args.append(filter_.value) | |
1092 elif filter_.var == const.MAM_FILTER_CATEGORY: | 1127 elif filter_.var == const.MAM_FILTER_CATEGORY: |
1093 query.append("LEFT JOIN item_categories USING (item_id)") | 1128 query.append("LEFT JOIN item_categories USING (item_id)") |
1094 query_filters.append("AND category=%s") | 1129 query_filters.append("AND category=%s") |
1095 args.append(filter_.value) | 1130 args.append(filter_.value) |
1096 else: | 1131 else: |