comparison sat_pubsub/pgsql_storage.py @ 422:c21f31355ab9

configuration: "max_items" option: "max_items" is implemented using a text-single field, as it is done in the XEP-0060 example (there is no real formal description). When changing the node configuration, the max_items can't be set to a number lower than the total number of items in the node (the configuration will then be rejected); this is to avoid accidental deletion of items.
author Goffi <goffi@goffi.org>
date Tue, 10 Mar 2020 11:11:38 +0100
parents ccb2a22ea0fc
children 3fce48c0a44d
comparison
equal deleted inserted replaced
421:f124ed5ea78b 422:c21f31355ab9
107 107
108 108
109 defaultConfig = { 109 defaultConfig = {
110 'leaf': { 110 'leaf': {
111 const.OPT_PERSIST_ITEMS: True, 111 const.OPT_PERSIST_ITEMS: True,
112 const.OPT_MAX_ITEMS: 'max',
112 const.OPT_DELIVER_PAYLOADS: True, 113 const.OPT_DELIVER_PAYLOADS: True,
113 const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub', 114 const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub',
114 const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT, 115 const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT,
115 const.OPT_PUBLISH_MODEL: const.VAL_PMODEL_DEFAULT, 116 const.OPT_PUBLISH_MODEL: const.VAL_PMODEL_DEFAULT,
116 const.OPT_SERIAL_IDS: False, 117 const.OPT_SERIAL_IDS: False,
140 logging.error("Can't check schema version: {reason}".format(reason=failure)) 141 logging.error("Can't check schema version: {reason}".format(reason=failure))
141 reactor.stop() 142 reactor.stop()
142 143
143 def _buildNode(self, row): 144 def _buildNode(self, row):
144 """Build a note class from database result row""" 145 """Build a note class from database result row"""
145 configuration = {}
146
147 if not row: 146 if not row:
148 raise error.NodeNotFound() 147 raise error.NodeNotFound()
149 148
150 if row[2] == 'leaf': 149 if row[2] == 'leaf':
151 configuration = { 150 configuration = {
152 'pubsub#persist_items': row[3], 151 const.OPT_PERSIST_ITEMS: row[3],
153 'pubsub#deliver_payloads': row[4], 152 const.OPT_MAX_ITEMS: 'max' if row[4] == 0 else str(row[4]),
154 'pubsub#send_last_published_item': row[5], 153 const.OPT_DELIVER_PAYLOADS: row[5],
155 const.OPT_ACCESS_MODEL:row[6], 154 const.OPT_SEND_LAST_PUBLISHED_ITEM: row[6],
156 const.OPT_PUBLISH_MODEL:row[7], 155 const.OPT_ACCESS_MODEL:row[7],
157 const.OPT_SERIAL_IDS:row[8], 156 const.OPT_PUBLISH_MODEL:row[8],
158 const.OPT_CONSISTENT_PUBLISHER:row[9], 157 const.OPT_SERIAL_IDS:row[9],
158 const.OPT_CONSISTENT_PUBLISHER:row[10],
159 } 159 }
160 schema = row[10] 160 schema = row[11]
161 if schema is not None: 161 if schema is not None:
162 schema = parseXml(schema) 162 schema = parseXml(schema)
163 node = LeafNode(row[0], row[1], configuration, schema) 163 node = LeafNode(row[0], row[1], configuration, schema)
164 node.dbpool = self.dbpool 164 node.dbpool = self.dbpool
165 return node 165 return node
166 elif row[2] == 'collection': 166 elif row[2] == 'collection':
167 configuration = { 167 configuration = {
168 'pubsub#deliver_payloads': row[4], 168 const.OPT_DELIVER_PAYLOADS: row[5],
169 'pubsub#send_last_published_item': row[5], 169 const.OPT_SEND_LAST_PUBLISHED_ITEM: row[6],
170 const.OPT_ACCESS_MODEL: row[6], 170 const.OPT_ACCESS_MODEL: row[7],
171 const.OPT_PUBLISH_MODEL:row[7], 171 const.OPT_PUBLISH_MODEL:row[8],
172 } 172 }
173 node = CollectionNode(row[0], row[1], configuration, None) 173 node = CollectionNode(row[0], row[1], configuration, None)
174 node.dbpool = self.dbpool 174 node.dbpool = self.dbpool
175 return node 175 return node
176 else: 176 else:
186 def _getNodeById(self, cursor, nodeDbId): 186 def _getNodeById(self, cursor, nodeDbId):
187 cursor.execute("""SELECT node_id, 187 cursor.execute("""SELECT node_id,
188 node, 188 node,
189 node_type, 189 node_type,
190 persist_items, 190 persist_items,
191 max_items,
191 deliver_payloads, 192 deliver_payloads,
192 send_last_published_item, 193 send_last_published_item,
193 access_model, 194 access_model,
194 publish_model, 195 publish_model,
195 serial_ids, 196 serial_ids,
208 def _getNode(self, cursor, nodeIdentifier, pep, recipient): 209 def _getNode(self, cursor, nodeIdentifier, pep, recipient):
209 cursor.execute(*withPEP("""SELECT node_id, 210 cursor.execute(*withPEP("""SELECT node_id,
210 node, 211 node,
211 node_type, 212 node_type,
212 persist_items, 213 persist_items,
214 max_items,
213 deliver_payloads, 215 deliver_payloads,
214 send_last_published_item, 216 send_last_published_item,
215 access_model, 217 access_model,
216 publish_model, 218 publish_model,
217 serial_ids, 219 serial_ids,
535 537
536 def _setConfiguration(self, cursor, config): 538 def _setConfiguration(self, cursor, config):
537 self._checkNodeExists(cursor) 539 self._checkNodeExists(cursor)
538 self._configurationTriggers(cursor, self.nodeDbId, self._config, config) 540 self._configurationTriggers(cursor, self.nodeDbId, self._config, config)
539 cursor.execute("""UPDATE nodes SET persist_items=%s, 541 cursor.execute("""UPDATE nodes SET persist_items=%s,
542 max_items=%s,
540 deliver_payloads=%s, 543 deliver_payloads=%s,
541 send_last_published_item=%s, 544 send_last_published_item=%s,
542 access_model=%s, 545 access_model=%s,
543 publish_model=%s, 546 publish_model=%s,
544 serial_ids=%s, 547 serial_ids=%s,
545 consistent_publisher=%s 548 consistent_publisher=%s
546 WHERE node_id=%s""", 549 WHERE node_id=%s""",
547 (config[const.OPT_PERSIST_ITEMS], 550 (config[const.OPT_PERSIST_ITEMS],
551 config[const.OPT_MAX_ITEMS],
548 config[const.OPT_DELIVER_PAYLOADS], 552 config[const.OPT_DELIVER_PAYLOADS],
549 config[const.OPT_SEND_LAST_PUBLISHED_ITEM], 553 config[const.OPT_SEND_LAST_PUBLISHED_ITEM],
550 config[const.OPT_ACCESS_MODEL], 554 config[const.OPT_ACCESS_MODEL],
551 config[const.OPT_PUBLISH_MODEL], 555 config[const.OPT_PUBLISH_MODEL],
552 config[const.OPT_SERIAL_IDS], 556 config[const.OPT_SERIAL_IDS],
949 item_id = cursor.fetchone()[0]; 953 item_id = cursor.fetchone()[0];
950 self._storeCategories(cursor, item_id, item_data.categories) 954 self._storeCategories(cursor, item_id, item_data.categories)
951 955
952 if access_model == const.VAL_AMODEL_PUBLISHER_ROSTER: 956 if access_model == const.VAL_AMODEL_PUBLISHER_ROSTER:
953 if const.OPT_ROSTER_GROUPS_ALLOWED in item_config: 957 if const.OPT_ROSTER_GROUPS_ALLOWED in item_config:
954 item_config.fields[const.OPT_ROSTER_GROUPS_ALLOWED].fieldType='list-multi' #XXX: needed to force list if there is only one value 958 # XXX: needed to force list if there is only one value
959 item_config.fields[const.OPT_ROSTER_GROUPS_ALLOWED].fieldType='list-multi'
955 allowed_groups = item_config[const.OPT_ROSTER_GROUPS_ALLOWED] 960 allowed_groups = item_config[const.OPT_ROSTER_GROUPS_ALLOWED]
956 else: 961 else:
957 allowed_groups = [] 962 allowed_groups = []
958 for group in allowed_groups: 963 for group in allowed_groups:
959 #TODO: check that group are actually in roster 964 #TODO: check that group are actually in roster
960 cursor.execute("""INSERT INTO item_groups_authorized (item_id, groupname) 965 cursor.execute("""INSERT INTO item_groups_authorized (item_id, groupname)
961 VALUES (%s,%s)""" , (item_id, group)) 966 VALUES (%s,%s)""" , (item_id, group))
962 # TODO: whitelist access model 967 # TODO: whitelist access model
968
969 max_items = self._config.get(const.OPT_MAX_ITEMS, 'max')
970
971 if max_items != 'max':
972 try:
973 max_items = int(self._config[const.OPT_MAX_ITEMS])
974 except ValueError:
975 log.err(f"Invalid max_items value: {max_items!r}")
976 else:
977 if max_items > 0:
978 # we delete all items above the requested max
979 cursor.execute(
980 "DELETE FROM items WHERE node_id=%s and item_id in (SELECT "
981 "item_id FROM items WHERE node_id=%s ORDER BY items.updated "
982 "DESC, items.item_id DESC OFFSET %s)" ,
983 (self.nodeDbId, self.nodeDbId, max_items)
984 )
985
963 986
964 def _storeCategories(self, cursor, item_id, categories, update=False): 987 def _storeCategories(self, cursor, item_id, categories, update=False):
965 # TODO: handle canonical form 988 # TODO: handle canonical form
966 if update: 989 if update:
967 cursor.execute("""DELETE FROM item_categories 990 cursor.execute("""DELETE FROM item_categories