Mercurial > libervia-pubsub
comparison sat_pubsub/pgsql_storage.py @ 454:7f1394bb96db
psql: fix ordering by using windowed function `row_number`
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 02 Aug 2021 21:56:43 +0200 |
parents | 0e6e176cb572 |
children | b52ebc45b8e3 |
comparison
equal
deleted
inserted
replaced
453:1a179ad10125 | 454:7f1394bb96db |
---|---|
466 NATURAL JOIN nodes | 466 NATURAL JOIN nodes |
467 WHERE nodes.pep IN %s | 467 WHERE nodes.pep IN %s |
468 AND node IN %s | 468 AND node IN %s |
469 AND nodes.access_model in %s | 469 AND nodes.access_model in %s |
470 AND items.access_model in %s | 470 AND items.access_model in %s |
471 ORDER BY node_id DESC, items.updated DESC""", | 471 ORDER BY node_id DESC, items.updated DESC, items.item_id DESC""", |
472 (tuple([e.userhost() for e in entities]), | 472 (tuple([e.userhost() for e in entities]), |
473 nodes, | 473 nodes, |
474 node_accesses, | 474 node_accesses, |
475 item_accesses)) | 475 item_accesses)) |
476 d.addCallback(self.formatLastItems) | 476 d.addCallback(self.formatLastItems) |
888 @param direction (unicode): ORDER BY direction (ASC or DESC) | 888 @param direction (unicode): ORDER BY direction (ASC or DESC) |
889 @return (unicode): ORDER BY clause to use | 889 @return (unicode): ORDER BY clause to use |
890 """ | 890 """ |
891 keys = ext_data.get('order_by') | 891 keys = ext_data.get('order_by') |
892 if not keys: | 892 if not keys: |
893 return 'ORDER BY updated ' + direction | 893 return f"ORDER BY updated {direction}, item_id {direction}" |
894 cols_statmnt = [] | 894 cols_statmnt = [] |
895 for key in keys: | 895 for key in keys: |
896 if key == 'creation': | 896 if key == 'creation': |
897 column = 'item_id' # could work with items.created too | 897 column = 'item_id' # could work with items.created too |
898 elif key == 'modification': | 898 elif key == 'modification': |
899 column = 'updated' | 899 column = 'updated' |
900 else: | 900 else: |
901 log.msg("WARNING: Unknown order by key: {key}".format(key=key)) | 901 log.msg("WARNING: Unknown order by key: {key}".format(key=key)) |
902 column = 'updated' | 902 column = 'updated' |
903 cols_statmnt.append(column + ' ' + direction) | 903 cols_statmnt.append(f"{column} {direction}") |
904 | 904 |
905 if len(cols_statmnt) == 1 and column != "item_id": | |
906 cols_statmnt.append(f"item_id {direction}") | |
905 return "ORDER BY " + ",".join([col for col in cols_statmnt]) | 907 return "ORDER BY " + ",".join([col for col in cols_statmnt]) |
906 | 908 |
907 @defer.inlineCallbacks | 909 @defer.inlineCallbacks |
908 def storeItems(self, items_data, publisher): | 910 def storeItems(self, items_data, publisher): |
909 # XXX: runInteraction doesn't seem to work when there are several "insert" | 911 # XXX: runInteraction doesn't seem to work when there are several "insert" |
1085 | 1087 |
1086 arguments query, args, authorized_groups, unrestricted and ext_data are the same as for | 1088 arguments query, args, authorized_groups, unrestricted and ext_data are the same as for |
1087 _getItems | 1089 _getItems |
1088 """ | 1090 """ |
1089 # SOURCES | 1091 # SOURCES |
1090 query.append("FROM nodes INNER JOIN items USING (node_id)") | 1092 query.append("FROM items") |
1091 | 1093 |
1092 if unrestricted: | 1094 if unrestricted: |
1093 query_filters = ["WHERE node_id=%s"] | 1095 query_filters = ["WHERE node_id=%s"] |
1094 args.append(self.nodeDbId) | 1096 args.append(self.nodeDbId) |
1095 else: | 1097 else: |
1153 query = ["SELECT item"] | 1155 query = ["SELECT item"] |
1154 else: | 1156 else: |
1155 query = ["SELECT data::text,items.access_model,item_id,created,updated"] | 1157 query = ["SELECT data::text,items.access_model,item_id,created,updated"] |
1156 | 1158 |
1157 query_order = self._appendSourcesAndFilters(query, args, authorized_groups, unrestricted, ext_data) | 1159 query_order = self._appendSourcesAndFilters(query, args, authorized_groups, unrestricted, ext_data) |
1158 if query_order.startswith("ORDER BY updated"): | |
1159 ref_field = "updated" | |
1160 else: | |
1161 ref_field = "item_id" | |
1162 | 1160 |
1163 if 'rsm' in ext_data: | 1161 if 'rsm' in ext_data: |
1164 rsm = ext_data['rsm'] | 1162 rsm = ext_data['rsm'] |
1165 maxItems = rsm.max | 1163 maxItems = rsm.max |
1166 if rsm.index is not None: | 1164 if rsm.index is not None: |
1178 # now that we have the id, we can use it | 1176 # now that we have the id, we can use it |
1179 query.append("AND item_id<=%s") | 1177 query.append("AND item_id<=%s") |
1180 args.append(item_id) | 1178 args.append(item_id) |
1181 elif rsm.before is not None: | 1179 elif rsm.before is not None: |
1182 if rsm.before != '': | 1180 if rsm.before != '': |
1181 query.insert(0,"SELECT * from (") | |
1182 query[1] += f",row_number() OVER ({query_order}) as row_number" | |
1183 query.append(f"{query_order}) as x") | |
1183 query.append( | 1184 query.append( |
1184 f"AND {ref_field}<(SELECT {ref_field} FROM items WHERE " | 1185 f"WHERE row_number<(SELECT row_number FROM (SELECT row_number() OVER ({query_order}) as row_number,item FROM items WHERE node_id=%s) as row_num_sub WHERE item=%s)" |
1185 "node_id=%s AND item=%s LIMIT 1)" | |
1186 ) | 1186 ) |
1187 args.extend((self.nodeDbId, rsm.before)) | 1187 args.extend((self.nodeDbId, rsm.before)) |
1188 # we need to reverse order in a first query to get the right | 1188 # we need to reverse order in a first query to get the right |
1189 # items | 1189 # items |
1190 query.insert(0,"SELECT * from (") | 1190 query.insert(0,"SELECT * from (") |
1198 ) | 1198 ) |
1199 args.append(10) | 1199 args.append(10) |
1200 else: | 1200 else: |
1201 args.append(maxItems) | 1201 args.append(maxItems) |
1202 elif rsm.after: | 1202 elif rsm.after: |
1203 query.insert(0,"SELECT * from (") | |
1204 query[1] += f",row_number() OVER ({query_order}) as row_number" | |
1205 query.append(f"{query_order}) as x") | |
1203 query.append( | 1206 query.append( |
1204 f"AND {ref_field}>(SELECT {ref_field} FROM items WHERE node_id=%s " | 1207 f"WHERE row_number>(SELECT row_number FROM (SELECT row_number() OVER ({query_order}) as row_number,item FROM items WHERE node_id=%s) as row_num_sub WHERE item=%s)" |
1205 "AND item=%s LIMIT 1)" | |
1206 ) | 1208 ) |
1207 args.extend((self.nodeDbId, rsm.after)) | 1209 args.extend((self.nodeDbId, rsm.after)) |
1208 else: | 1210 else: |
1209 rsm = False | 1211 rsm = False |
1210 | 1212 |