Mercurial > libervia-pubsub
comparison sat_pubsub/pgsql_storage.py @ 422:c21f31355ab9
configuration: "max_items" option:
"max_items" is implemented using a text-single field, as it is done in the XEP-0060
example (there is no real formal description). When changing the node configuration, the
max_items can't be set to a number lower than the total number of items in the node (the
configuration will then be rejected), this is to avoid accidental deletion of items.
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 10 Mar 2020 11:11:38 +0100 |
parents | ccb2a22ea0fc |
children | 3fce48c0a44d |
comparison
equal
deleted
inserted
replaced
421:f124ed5ea78b | 422:c21f31355ab9 |
---|---|
107 | 107 |
108 | 108 |
109 defaultConfig = { | 109 defaultConfig = { |
110 'leaf': { | 110 'leaf': { |
111 const.OPT_PERSIST_ITEMS: True, | 111 const.OPT_PERSIST_ITEMS: True, |
112 const.OPT_MAX_ITEMS: 'max', | |
112 const.OPT_DELIVER_PAYLOADS: True, | 113 const.OPT_DELIVER_PAYLOADS: True, |
113 const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub', | 114 const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub', |
114 const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT, | 115 const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT, |
115 const.OPT_PUBLISH_MODEL: const.VAL_PMODEL_DEFAULT, | 116 const.OPT_PUBLISH_MODEL: const.VAL_PMODEL_DEFAULT, |
116 const.OPT_SERIAL_IDS: False, | 117 const.OPT_SERIAL_IDS: False, |
140 logging.error("Can't check schema version: {reason}".format(reason=failure)) | 141 logging.error("Can't check schema version: {reason}".format(reason=failure)) |
141 reactor.stop() | 142 reactor.stop() |
142 | 143 |
143 def _buildNode(self, row): | 144 def _buildNode(self, row): |
144 """Build a note class from database result row""" | 145 """Build a note class from database result row""" |
145 configuration = {} | |
146 | |
147 if not row: | 146 if not row: |
148 raise error.NodeNotFound() | 147 raise error.NodeNotFound() |
149 | 148 |
150 if row[2] == 'leaf': | 149 if row[2] == 'leaf': |
151 configuration = { | 150 configuration = { |
152 'pubsub#persist_items': row[3], | 151 const.OPT_PERSIST_ITEMS: row[3], |
153 'pubsub#deliver_payloads': row[4], | 152 const.OPT_MAX_ITEMS: 'max' if row[4] == 0 else str(row[4]), |
154 'pubsub#send_last_published_item': row[5], | 153 const.OPT_DELIVER_PAYLOADS: row[5], |
155 const.OPT_ACCESS_MODEL:row[6], | 154 const.OPT_SEND_LAST_PUBLISHED_ITEM: row[6], |
156 const.OPT_PUBLISH_MODEL:row[7], | 155 const.OPT_ACCESS_MODEL:row[7], |
157 const.OPT_SERIAL_IDS:row[8], | 156 const.OPT_PUBLISH_MODEL:row[8], |
158 const.OPT_CONSISTENT_PUBLISHER:row[9], | 157 const.OPT_SERIAL_IDS:row[9], |
158 const.OPT_CONSISTENT_PUBLISHER:row[10], | |
159 } | 159 } |
160 schema = row[10] | 160 schema = row[11] |
161 if schema is not None: | 161 if schema is not None: |
162 schema = parseXml(schema) | 162 schema = parseXml(schema) |
163 node = LeafNode(row[0], row[1], configuration, schema) | 163 node = LeafNode(row[0], row[1], configuration, schema) |
164 node.dbpool = self.dbpool | 164 node.dbpool = self.dbpool |
165 return node | 165 return node |
166 elif row[2] == 'collection': | 166 elif row[2] == 'collection': |
167 configuration = { | 167 configuration = { |
168 'pubsub#deliver_payloads': row[4], | 168 const.OPT_DELIVER_PAYLOADS: row[5], |
169 'pubsub#send_last_published_item': row[5], | 169 const.OPT_SEND_LAST_PUBLISHED_ITEM: row[6], |
170 const.OPT_ACCESS_MODEL: row[6], | 170 const.OPT_ACCESS_MODEL: row[7], |
171 const.OPT_PUBLISH_MODEL:row[7], | 171 const.OPT_PUBLISH_MODEL:row[8], |
172 } | 172 } |
173 node = CollectionNode(row[0], row[1], configuration, None) | 173 node = CollectionNode(row[0], row[1], configuration, None) |
174 node.dbpool = self.dbpool | 174 node.dbpool = self.dbpool |
175 return node | 175 return node |
176 else: | 176 else: |
186 def _getNodeById(self, cursor, nodeDbId): | 186 def _getNodeById(self, cursor, nodeDbId): |
187 cursor.execute("""SELECT node_id, | 187 cursor.execute("""SELECT node_id, |
188 node, | 188 node, |
189 node_type, | 189 node_type, |
190 persist_items, | 190 persist_items, |
191 max_items, | |
191 deliver_payloads, | 192 deliver_payloads, |
192 send_last_published_item, | 193 send_last_published_item, |
193 access_model, | 194 access_model, |
194 publish_model, | 195 publish_model, |
195 serial_ids, | 196 serial_ids, |
208 def _getNode(self, cursor, nodeIdentifier, pep, recipient): | 209 def _getNode(self, cursor, nodeIdentifier, pep, recipient): |
209 cursor.execute(*withPEP("""SELECT node_id, | 210 cursor.execute(*withPEP("""SELECT node_id, |
210 node, | 211 node, |
211 node_type, | 212 node_type, |
212 persist_items, | 213 persist_items, |
214 max_items, | |
213 deliver_payloads, | 215 deliver_payloads, |
214 send_last_published_item, | 216 send_last_published_item, |
215 access_model, | 217 access_model, |
216 publish_model, | 218 publish_model, |
217 serial_ids, | 219 serial_ids, |
535 | 537 |
536 def _setConfiguration(self, cursor, config): | 538 def _setConfiguration(self, cursor, config): |
537 self._checkNodeExists(cursor) | 539 self._checkNodeExists(cursor) |
538 self._configurationTriggers(cursor, self.nodeDbId, self._config, config) | 540 self._configurationTriggers(cursor, self.nodeDbId, self._config, config) |
539 cursor.execute("""UPDATE nodes SET persist_items=%s, | 541 cursor.execute("""UPDATE nodes SET persist_items=%s, |
542 max_items=%s, | |
540 deliver_payloads=%s, | 543 deliver_payloads=%s, |
541 send_last_published_item=%s, | 544 send_last_published_item=%s, |
542 access_model=%s, | 545 access_model=%s, |
543 publish_model=%s, | 546 publish_model=%s, |
544 serial_ids=%s, | 547 serial_ids=%s, |
545 consistent_publisher=%s | 548 consistent_publisher=%s |
546 WHERE node_id=%s""", | 549 WHERE node_id=%s""", |
547 (config[const.OPT_PERSIST_ITEMS], | 550 (config[const.OPT_PERSIST_ITEMS], |
551 config[const.OPT_MAX_ITEMS], | |
548 config[const.OPT_DELIVER_PAYLOADS], | 552 config[const.OPT_DELIVER_PAYLOADS], |
549 config[const.OPT_SEND_LAST_PUBLISHED_ITEM], | 553 config[const.OPT_SEND_LAST_PUBLISHED_ITEM], |
550 config[const.OPT_ACCESS_MODEL], | 554 config[const.OPT_ACCESS_MODEL], |
551 config[const.OPT_PUBLISH_MODEL], | 555 config[const.OPT_PUBLISH_MODEL], |
552 config[const.OPT_SERIAL_IDS], | 556 config[const.OPT_SERIAL_IDS], |
949 item_id = cursor.fetchone()[0]; | 953 item_id = cursor.fetchone()[0]; |
950 self._storeCategories(cursor, item_id, item_data.categories) | 954 self._storeCategories(cursor, item_id, item_data.categories) |
951 | 955 |
952 if access_model == const.VAL_AMODEL_PUBLISHER_ROSTER: | 956 if access_model == const.VAL_AMODEL_PUBLISHER_ROSTER: |
953 if const.OPT_ROSTER_GROUPS_ALLOWED in item_config: | 957 if const.OPT_ROSTER_GROUPS_ALLOWED in item_config: |
954 item_config.fields[const.OPT_ROSTER_GROUPS_ALLOWED].fieldType='list-multi' #XXX: needed to force list if there is only one value | 958 # XXX: needed to force list if there is only one value |
959 item_config.fields[const.OPT_ROSTER_GROUPS_ALLOWED].fieldType='list-multi' | |
955 allowed_groups = item_config[const.OPT_ROSTER_GROUPS_ALLOWED] | 960 allowed_groups = item_config[const.OPT_ROSTER_GROUPS_ALLOWED] |
956 else: | 961 else: |
957 allowed_groups = [] | 962 allowed_groups = [] |
958 for group in allowed_groups: | 963 for group in allowed_groups: |
959 #TODO: check that group are actually in roster | 964 #TODO: check that group are actually in roster |
960 cursor.execute("""INSERT INTO item_groups_authorized (item_id, groupname) | 965 cursor.execute("""INSERT INTO item_groups_authorized (item_id, groupname) |
961 VALUES (%s,%s)""" , (item_id, group)) | 966 VALUES (%s,%s)""" , (item_id, group)) |
962 # TODO: whitelist access model | 967 # TODO: whitelist access model |
968 | |
969 max_items = self._config.get(const.OPT_MAX_ITEMS, 'max') | |
970 | |
971 if max_items != 'max': | |
972 try: | |
973 max_items = int(self._config[const.OPT_MAX_ITEMS]) | |
974 except ValueError: | |
975 log.err(f"Invalid max_items value: {max_items!r}") | |
976 else: | |
977 if max_items > 0: | |
978 # we delete all items above the requested max | |
979 cursor.execute( | |
980 "DELETE FROM items WHERE node_id=%s and item_id in (SELECT " | |
981 "item_id FROM items WHERE node_id=%s ORDER BY items.updated " | |
982 "DESC, items.item_id DESC OFFSET %s)" , | |
983 (self.nodeDbId, self.nodeDbId, max_items) | |
984 ) | |
985 | |
963 | 986 |
964 def _storeCategories(self, cursor, item_id, categories, update=False): | 987 def _storeCategories(self, cursor, item_id, categories, update=False): |
965 # TODO: handle canonical form | 988 # TODO: handle canonical form |
966 if update: | 989 if update: |
967 cursor.execute("""DELETE FROM item_categories | 990 cursor.execute("""DELETE FROM item_categories |