comparison src/pgsql_storage.py @ 403:1dc606612405

implemented experimental "consistent_publisher" option: /!\ pgsql schema needs to be updated /!\ New "consistent_publisher" option has been implemented to allow node owners + admins to modify an item while preserving the original publisher. This way, original publisher can still edit the item. In addition to `consistent_publisher`, `max_items` has been added to PGQSL schema to prepare for future implementation.
author Goffi <goffi@goffi.org>
date Wed, 12 Jun 2019 21:51:50 +0200
parents 724e39d596a9
children
comparison
equal deleted inserted replaced
402:724e39d596a9 403:1dc606612405
77 77
78 # parseXml manage str, but we get unicode 78 # parseXml manage str, but we get unicode
79 parseXml = lambda unicode_data: generic.parseXml(unicode_data.encode('utf-8')) 79 parseXml = lambda unicode_data: generic.parseXml(unicode_data.encode('utf-8'))
80 ITEMS_SEQ_NAME = u'node_{node_id}_seq' 80 ITEMS_SEQ_NAME = u'node_{node_id}_seq'
81 PEP_COL_NAME = 'pep' 81 PEP_COL_NAME = 'pep'
82 CURRENT_VERSION = '4' 82 CURRENT_VERSION = '5'
83 # retrieve the maximum integer item id + 1 83 # retrieve the maximum integer item id + 1
84 NEXT_ITEM_ID_QUERY = r"SELECT COALESCE(max(item::integer)+1,1) as val from items where node_id={node_id} and item ~ E'^\\d+$'" 84 NEXT_ITEM_ID_QUERY = r"SELECT COALESCE(max(item::integer)+1,1) as val from items where node_id={node_id} and item ~ E'^\\d+$'"
85 85
86 86
87 def withPEP(query, values, pep, recipient): 87 def withPEP(query, values, pep, recipient):
112 const.OPT_DELIVER_PAYLOADS: True, 112 const.OPT_DELIVER_PAYLOADS: True,
113 const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub', 113 const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub',
114 const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT, 114 const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT,
115 const.OPT_PUBLISH_MODEL: const.VAL_PMODEL_DEFAULT, 115 const.OPT_PUBLISH_MODEL: const.VAL_PMODEL_DEFAULT,
116 const.OPT_SERIAL_IDS: False, 116 const.OPT_SERIAL_IDS: False,
117 const.OPT_CONSISTENT_PUBLISHER: False,
117 }, 118 },
118 'collection': { 119 'collection': {
119 const.OPT_DELIVER_PAYLOADS: True, 120 const.OPT_DELIVER_PAYLOADS: True,
120 const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub', 121 const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub',
121 const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT, 122 const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT,
152 'pubsub#deliver_payloads': row[4], 153 'pubsub#deliver_payloads': row[4],
153 'pubsub#send_last_published_item': row[5], 154 'pubsub#send_last_published_item': row[5],
154 const.OPT_ACCESS_MODEL:row[6], 155 const.OPT_ACCESS_MODEL:row[6],
155 const.OPT_PUBLISH_MODEL:row[7], 156 const.OPT_PUBLISH_MODEL:row[7],
156 const.OPT_SERIAL_IDS:row[8], 157 const.OPT_SERIAL_IDS:row[8],
158 const.OPT_CONSISTENT_PUBLISHER:row[9],
157 } 159 }
158 schema = row[9] 160 schema = row[10]
159 if schema is not None: 161 if schema is not None:
160 schema = parseXml(schema) 162 schema = parseXml(schema)
161 node = LeafNode(row[0], row[1], configuration, schema) 163 node = LeafNode(row[0], row[1], configuration, schema)
162 node.dbpool = self.dbpool 164 node.dbpool = self.dbpool
163 return node 165 return node
189 deliver_payloads, 191 deliver_payloads,
190 send_last_published_item, 192 send_last_published_item,
191 access_model, 193 access_model,
192 publish_model, 194 publish_model,
193 serial_ids, 195 serial_ids,
196 consistent_publisher,
194 schema::text, 197 schema::text,
195 pep 198 pep
196 FROM nodes 199 FROM nodes
197 WHERE node_id=%s""", 200 WHERE node_id=%s""",
198 (nodeDbId,)) 201 (nodeDbId,))
210 deliver_payloads, 213 deliver_payloads,
211 send_last_published_item, 214 send_last_published_item,
212 access_model, 215 access_model,
213 publish_model, 216 publish_model,
214 serial_ids, 217 serial_ids,
218 consistent_publisher,
215 schema::text, 219 schema::text,
216 pep 220 pep
217 FROM nodes 221 FROM nodes
218 WHERE node=%s""", 222 WHERE node=%s""",
219 (nodeIdentifier,), pep, recipient)) 223 (nodeIdentifier,), pep, recipient))
263 deliver_payloads, 267 deliver_payloads,
264 send_last_published_item, 268 send_last_published_item,
265 access_model, 269 access_model,
266 publish_model, 270 publish_model,
267 serial_ids, 271 serial_ids,
272 consistent_publisher,
268 schema, 273 schema,
269 pep) 274 pep)
270 VALUES 275 VALUES
271 (%s, 'leaf', %s, %s, %s, %s, %s, %s, %s, %s)""", 276 (%s, 'leaf', %s, %s, %s, %s, %s, %s, %s, %s, %s)""",
272 (nodeIdentifier, 277 (nodeIdentifier,
273 config['pubsub#persist_items'], 278 config['pubsub#persist_items'],
274 config['pubsub#deliver_payloads'], 279 config['pubsub#deliver_payloads'],
275 config['pubsub#send_last_published_item'], 280 config['pubsub#send_last_published_item'],
276 config[const.OPT_ACCESS_MODEL], 281 config[const.OPT_ACCESS_MODEL],
277 config[const.OPT_PUBLISH_MODEL], 282 config[const.OPT_PUBLISH_MODEL],
278 config[const.OPT_SERIAL_IDS], 283 config[const.OPT_SERIAL_IDS],
284 config[const.OPT_CONSISTENT_PUBLISHER],
279 schema, 285 schema,
280 recipient.userhost() if pep else None 286 recipient.userhost() if pep else None
281 ) 287 )
282 ) 288 )
283 except cursor._pool.dbapi.IntegrityError as e: 289 except cursor._pool.dbapi.IntegrityError as e:
533 cursor.execute("""UPDATE nodes SET persist_items=%s, 539 cursor.execute("""UPDATE nodes SET persist_items=%s,
534 deliver_payloads=%s, 540 deliver_payloads=%s,
535 send_last_published_item=%s, 541 send_last_published_item=%s,
536 access_model=%s, 542 access_model=%s,
537 publish_model=%s, 543 publish_model=%s,
538 serial_ids=%s 544 serial_ids=%s,
545 consistent_publisher=%s
539 WHERE node_id=%s""", 546 WHERE node_id=%s""",
540 (config[const.OPT_PERSIST_ITEMS], 547 (config[const.OPT_PERSIST_ITEMS],
541 config[const.OPT_DELIVER_PAYLOADS], 548 config[const.OPT_DELIVER_PAYLOADS],
542 config[const.OPT_SEND_LAST_PUBLISHED_ITEM], 549 config[const.OPT_SEND_LAST_PUBLISHED_ITEM],
543 config[const.OPT_ACCESS_MODEL], 550 config[const.OPT_ACCESS_MODEL],
544 config[const.OPT_PUBLISH_MODEL], 551 config[const.OPT_PUBLISH_MODEL],
545 config[const.OPT_SERIAL_IDS], 552 config[const.OPT_SERIAL_IDS],
553 config[const.OPT_CONSISTENT_PUBLISHER],
546 self.nodeDbId)) 554 self.nodeDbId))
547 555
548 def _setCachedConfiguration(self, void, config): 556 def _setCachedConfiguration(self, void, config):
549 self._config = config 557 self._config = config
550 558