comparison sat_pubsub/pgsql_storage.py @ 454:7f1394bb96db

psql: fix ordering by using windowed function `row_number`
author Goffi <goffi@goffi.org>
date Mon, 02 Aug 2021 21:56:43 +0200
parents 0e6e176cb572
children b52ebc45b8e3
comparison
equal deleted inserted replaced
453:1a179ad10125 454:7f1394bb96db
466 NATURAL JOIN nodes 466 NATURAL JOIN nodes
467 WHERE nodes.pep IN %s 467 WHERE nodes.pep IN %s
468 AND node IN %s 468 AND node IN %s
469 AND nodes.access_model in %s 469 AND nodes.access_model in %s
470 AND items.access_model in %s 470 AND items.access_model in %s
471 ORDER BY node_id DESC, items.updated DESC""", 471 ORDER BY node_id DESC, items.updated DESC, items.item_id DESC""",
472 (tuple([e.userhost() for e in entities]), 472 (tuple([e.userhost() for e in entities]),
473 nodes, 473 nodes,
474 node_accesses, 474 node_accesses,
475 item_accesses)) 475 item_accesses))
476 d.addCallback(self.formatLastItems) 476 d.addCallback(self.formatLastItems)
888 @param direction (unicode): ORDER BY direction (ASC or DESC) 888 @param direction (unicode): ORDER BY direction (ASC or DESC)
889 @return (unicode): ORDER BY clause to use 889 @return (unicode): ORDER BY clause to use
890 """ 890 """
891 keys = ext_data.get('order_by') 891 keys = ext_data.get('order_by')
892 if not keys: 892 if not keys:
893 return 'ORDER BY updated ' + direction 893 return f"ORDER BY updated {direction}, item_id {direction}"
894 cols_statmnt = [] 894 cols_statmnt = []
895 for key in keys: 895 for key in keys:
896 if key == 'creation': 896 if key == 'creation':
897 column = 'item_id' # could work with items.created too 897 column = 'item_id' # could work with items.created too
898 elif key == 'modification': 898 elif key == 'modification':
899 column = 'updated' 899 column = 'updated'
900 else: 900 else:
901 log.msg("WARNING: Unknown order by key: {key}".format(key=key)) 901 log.msg("WARNING: Unknown order by key: {key}".format(key=key))
902 column = 'updated' 902 column = 'updated'
903 cols_statmnt.append(column + ' ' + direction) 903 cols_statmnt.append(f"{column} {direction}")
904 904
905 if len(cols_statmnt) == 1 and column != "item_id":
906 cols_statmnt.append(f"item_id {direction}")
905 return "ORDER BY " + ",".join([col for col in cols_statmnt]) 907 return "ORDER BY " + ",".join([col for col in cols_statmnt])
906 908
907 @defer.inlineCallbacks 909 @defer.inlineCallbacks
908 def storeItems(self, items_data, publisher): 910 def storeItems(self, items_data, publisher):
909 # XXX: runInteraction doesn't seem to work when there are several "insert" 911 # XXX: runInteraction doesn't seem to work when there are several "insert"
1085 1087
1086 arguments query, args, authorized_groups, unrestricted and ext_data are the same as for 1088 arguments query, args, authorized_groups, unrestricted and ext_data are the same as for
1087 _getItems 1089 _getItems
1088 """ 1090 """
1089 # SOURCES 1091 # SOURCES
1090 query.append("FROM nodes INNER JOIN items USING (node_id)") 1092 query.append("FROM items")
1091 1093
1092 if unrestricted: 1094 if unrestricted:
1093 query_filters = ["WHERE node_id=%s"] 1095 query_filters = ["WHERE node_id=%s"]
1094 args.append(self.nodeDbId) 1096 args.append(self.nodeDbId)
1095 else: 1097 else:
1153 query = ["SELECT item"] 1155 query = ["SELECT item"]
1154 else: 1156 else:
1155 query = ["SELECT data::text,items.access_model,item_id,created,updated"] 1157 query = ["SELECT data::text,items.access_model,item_id,created,updated"]
1156 1158
1157 query_order = self._appendSourcesAndFilters(query, args, authorized_groups, unrestricted, ext_data) 1159 query_order = self._appendSourcesAndFilters(query, args, authorized_groups, unrestricted, ext_data)
1158 if query_order.startswith("ORDER BY updated"):
1159 ref_field = "updated"
1160 else:
1161 ref_field = "item_id"
1162 1160
1163 if 'rsm' in ext_data: 1161 if 'rsm' in ext_data:
1164 rsm = ext_data['rsm'] 1162 rsm = ext_data['rsm']
1165 maxItems = rsm.max 1163 maxItems = rsm.max
1166 if rsm.index is not None: 1164 if rsm.index is not None:
1178 # now that we have the id, we can use it 1176 # now that we have the id, we can use it
1179 query.append("AND item_id<=%s") 1177 query.append("AND item_id<=%s")
1180 args.append(item_id) 1178 args.append(item_id)
1181 elif rsm.before is not None: 1179 elif rsm.before is not None:
1182 if rsm.before != '': 1180 if rsm.before != '':
1181 query.insert(0,"SELECT * from (")
1182 query[1] += f",row_number() OVER ({query_order}) as row_number"
1183 query.append(f"{query_order}) as x")
1183 query.append( 1184 query.append(
1184 f"AND {ref_field}<(SELECT {ref_field} FROM items WHERE " 1185 f"WHERE row_number<(SELECT row_number FROM (SELECT row_number() OVER ({query_order}) as row_number,item FROM items WHERE node_id=%s) as row_num_sub WHERE item=%s)"
1185 "node_id=%s AND item=%s LIMIT 1)"
1186 ) 1186 )
1187 args.extend((self.nodeDbId, rsm.before)) 1187 args.extend((self.nodeDbId, rsm.before))
1188 # we need to reverse order in a first query to get the right 1188 # we need to reverse order in a first query to get the right
1189 # items 1189 # items
1190 query.insert(0,"SELECT * from (") 1190 query.insert(0,"SELECT * from (")
1198 ) 1198 )
1199 args.append(10) 1199 args.append(10)
1200 else: 1200 else:
1201 args.append(maxItems) 1201 args.append(maxItems)
1202 elif rsm.after: 1202 elif rsm.after:
1203 query.insert(0,"SELECT * from (")
1204 query[1] += f",row_number() OVER ({query_order}) as row_number"
1205 query.append(f"{query_order}) as x")
1203 query.append( 1206 query.append(
1204 f"AND {ref_field}>(SELECT {ref_field} FROM items WHERE node_id=%s " 1207 f"WHERE row_number>(SELECT row_number FROM (SELECT row_number() OVER ({query_order}) as row_number,item FROM items WHERE node_id=%s) as row_num_sub WHERE item=%s)"
1205 "AND item=%s LIMIT 1)"
1206 ) 1208 )
1207 args.extend((self.nodeDbId, rsm.after)) 1209 args.extend((self.nodeDbId, rsm.after))
1208 else: 1210 else:
1209 rsm = False 1211 rsm = False
1210 1212