Mercurial > libervia-pubsub
comparison src/pgsql_storage.py @ 403:1dc606612405
implemented experimental "consistent_publisher" option:
/!\ pgsql schema needs to be updated /!\
New "consistent_publisher" option has been implemented to allow node owners + admins to
modify an item while preserving the original publisher. This way, original publisher can
still edit the item.
In addition to `consistent_publisher`, `max_items` has been added to PGQSL schema to
prepare for future implementation.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 12 Jun 2019 21:51:50 +0200 |
parents | 724e39d596a9 |
children |
comparison
equal
deleted
inserted
replaced
402:724e39d596a9 | 403:1dc606612405 |
---|---|
77 | 77 |
78 # parseXml manage str, but we get unicode | 78 # parseXml manage str, but we get unicode |
79 parseXml = lambda unicode_data: generic.parseXml(unicode_data.encode('utf-8')) | 79 parseXml = lambda unicode_data: generic.parseXml(unicode_data.encode('utf-8')) |
80 ITEMS_SEQ_NAME = u'node_{node_id}_seq' | 80 ITEMS_SEQ_NAME = u'node_{node_id}_seq' |
81 PEP_COL_NAME = 'pep' | 81 PEP_COL_NAME = 'pep' |
82 CURRENT_VERSION = '4' | 82 CURRENT_VERSION = '5' |
83 # retrieve the maximum integer item id + 1 | 83 # retrieve the maximum integer item id + 1 |
84 NEXT_ITEM_ID_QUERY = r"SELECT COALESCE(max(item::integer)+1,1) as val from items where node_id={node_id} and item ~ E'^\\d+$'" | 84 NEXT_ITEM_ID_QUERY = r"SELECT COALESCE(max(item::integer)+1,1) as val from items where node_id={node_id} and item ~ E'^\\d+$'" |
85 | 85 |
86 | 86 |
87 def withPEP(query, values, pep, recipient): | 87 def withPEP(query, values, pep, recipient): |
112 const.OPT_DELIVER_PAYLOADS: True, | 112 const.OPT_DELIVER_PAYLOADS: True, |
113 const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub', | 113 const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub', |
114 const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT, | 114 const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT, |
115 const.OPT_PUBLISH_MODEL: const.VAL_PMODEL_DEFAULT, | 115 const.OPT_PUBLISH_MODEL: const.VAL_PMODEL_DEFAULT, |
116 const.OPT_SERIAL_IDS: False, | 116 const.OPT_SERIAL_IDS: False, |
117 const.OPT_CONSISTENT_PUBLISHER: False, | |
117 }, | 118 }, |
118 'collection': { | 119 'collection': { |
119 const.OPT_DELIVER_PAYLOADS: True, | 120 const.OPT_DELIVER_PAYLOADS: True, |
120 const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub', | 121 const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub', |
121 const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT, | 122 const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT, |
152 'pubsub#deliver_payloads': row[4], | 153 'pubsub#deliver_payloads': row[4], |
153 'pubsub#send_last_published_item': row[5], | 154 'pubsub#send_last_published_item': row[5], |
154 const.OPT_ACCESS_MODEL:row[6], | 155 const.OPT_ACCESS_MODEL:row[6], |
155 const.OPT_PUBLISH_MODEL:row[7], | 156 const.OPT_PUBLISH_MODEL:row[7], |
156 const.OPT_SERIAL_IDS:row[8], | 157 const.OPT_SERIAL_IDS:row[8], |
158 const.OPT_CONSISTENT_PUBLISHER:row[9], | |
157 } | 159 } |
158 schema = row[9] | 160 schema = row[10] |
159 if schema is not None: | 161 if schema is not None: |
160 schema = parseXml(schema) | 162 schema = parseXml(schema) |
161 node = LeafNode(row[0], row[1], configuration, schema) | 163 node = LeafNode(row[0], row[1], configuration, schema) |
162 node.dbpool = self.dbpool | 164 node.dbpool = self.dbpool |
163 return node | 165 return node |
189 deliver_payloads, | 191 deliver_payloads, |
190 send_last_published_item, | 192 send_last_published_item, |
191 access_model, | 193 access_model, |
192 publish_model, | 194 publish_model, |
193 serial_ids, | 195 serial_ids, |
196 consistent_publisher, | |
194 schema::text, | 197 schema::text, |
195 pep | 198 pep |
196 FROM nodes | 199 FROM nodes |
197 WHERE node_id=%s""", | 200 WHERE node_id=%s""", |
198 (nodeDbId,)) | 201 (nodeDbId,)) |
210 deliver_payloads, | 213 deliver_payloads, |
211 send_last_published_item, | 214 send_last_published_item, |
212 access_model, | 215 access_model, |
213 publish_model, | 216 publish_model, |
214 serial_ids, | 217 serial_ids, |
218 consistent_publisher, | |
215 schema::text, | 219 schema::text, |
216 pep | 220 pep |
217 FROM nodes | 221 FROM nodes |
218 WHERE node=%s""", | 222 WHERE node=%s""", |
219 (nodeIdentifier,), pep, recipient)) | 223 (nodeIdentifier,), pep, recipient)) |
263 deliver_payloads, | 267 deliver_payloads, |
264 send_last_published_item, | 268 send_last_published_item, |
265 access_model, | 269 access_model, |
266 publish_model, | 270 publish_model, |
267 serial_ids, | 271 serial_ids, |
272 consistent_publisher, | |
268 schema, | 273 schema, |
269 pep) | 274 pep) |
270 VALUES | 275 VALUES |
271 (%s, 'leaf', %s, %s, %s, %s, %s, %s, %s, %s)""", | 276 (%s, 'leaf', %s, %s, %s, %s, %s, %s, %s, %s, %s)""", |
272 (nodeIdentifier, | 277 (nodeIdentifier, |
273 config['pubsub#persist_items'], | 278 config['pubsub#persist_items'], |
274 config['pubsub#deliver_payloads'], | 279 config['pubsub#deliver_payloads'], |
275 config['pubsub#send_last_published_item'], | 280 config['pubsub#send_last_published_item'], |
276 config[const.OPT_ACCESS_MODEL], | 281 config[const.OPT_ACCESS_MODEL], |
277 config[const.OPT_PUBLISH_MODEL], | 282 config[const.OPT_PUBLISH_MODEL], |
278 config[const.OPT_SERIAL_IDS], | 283 config[const.OPT_SERIAL_IDS], |
284 config[const.OPT_CONSISTENT_PUBLISHER], | |
279 schema, | 285 schema, |
280 recipient.userhost() if pep else None | 286 recipient.userhost() if pep else None |
281 ) | 287 ) |
282 ) | 288 ) |
283 except cursor._pool.dbapi.IntegrityError as e: | 289 except cursor._pool.dbapi.IntegrityError as e: |
533 cursor.execute("""UPDATE nodes SET persist_items=%s, | 539 cursor.execute("""UPDATE nodes SET persist_items=%s, |
534 deliver_payloads=%s, | 540 deliver_payloads=%s, |
535 send_last_published_item=%s, | 541 send_last_published_item=%s, |
536 access_model=%s, | 542 access_model=%s, |
537 publish_model=%s, | 543 publish_model=%s, |
538 serial_ids=%s | 544 serial_ids=%s, |
545 consistent_publisher=%s | |
539 WHERE node_id=%s""", | 546 WHERE node_id=%s""", |
540 (config[const.OPT_PERSIST_ITEMS], | 547 (config[const.OPT_PERSIST_ITEMS], |
541 config[const.OPT_DELIVER_PAYLOADS], | 548 config[const.OPT_DELIVER_PAYLOADS], |
542 config[const.OPT_SEND_LAST_PUBLISHED_ITEM], | 549 config[const.OPT_SEND_LAST_PUBLISHED_ITEM], |
543 config[const.OPT_ACCESS_MODEL], | 550 config[const.OPT_ACCESS_MODEL], |
544 config[const.OPT_PUBLISH_MODEL], | 551 config[const.OPT_PUBLISH_MODEL], |
545 config[const.OPT_SERIAL_IDS], | 552 config[const.OPT_SERIAL_IDS], |
553 config[const.OPT_CONSISTENT_PUBLISHER], | |
546 self.nodeDbId)) | 554 self.nodeDbId)) |
547 | 555 |
548 def _setCachedConfiguration(self, void, config): | 556 def _setCachedConfiguration(self, void, config): |
549 self._config = config | 557 self._config = config |
550 | 558 |