comparison sat_pubsub/pgsql_storage.py @ 352:efbdca10f0fb

schema: node schema implementation node schema is an experimental (not standard yet, protoXEP should follow) feature allowing to attach a data schema to a node. This commit implement it and method needed to retrieve/set a schema.
author Goffi <goffi@goffi.org>
date Fri, 08 Sep 2017 08:02:05 +0200
parents 2098295747fd
children 95c83899b5e9
comparison
equal deleted inserted replaced
351:2098295747fd 352:efbdca10f0fb
145 'pubsub#deliver_payloads': row[4], 145 'pubsub#deliver_payloads': row[4],
146 'pubsub#send_last_published_item': row[5], 146 'pubsub#send_last_published_item': row[5],
147 const.OPT_ACCESS_MODEL:row[6], 147 const.OPT_ACCESS_MODEL:row[6],
148 const.OPT_PUBLISH_MODEL:row[7], 148 const.OPT_PUBLISH_MODEL:row[7],
149 } 149 }
150 node = LeafNode(row[0], row[1], configuration) 150 schema = row[8]
151 if schema is not None:
152 schema = parseXml(schema)
153 node = LeafNode(row[0], row[1], configuration, schema)
151 node.dbpool = self.dbpool 154 node.dbpool = self.dbpool
152 return node 155 return node
153 elif row[2] == 'collection': 156 elif row[2] == 'collection':
154 configuration = { 157 configuration = {
155 'pubsub#deliver_payloads': row[4], 158 'pubsub#deliver_payloads': row[4],
156 'pubsub#send_last_published_item': row[5], 159 'pubsub#send_last_published_item': row[5],
157 const.OPT_ACCESS_MODEL: row[6], 160 const.OPT_ACCESS_MODEL: row[6],
158 const.OPT_PUBLISH_MODEL:row[7], 161 const.OPT_PUBLISH_MODEL:row[7],
159 } 162 }
160 node = CollectionNode(row[0], row[1], configuration) 163 node = CollectionNode(row[0], row[1], configuration, None)
161 node.dbpool = self.dbpool 164 node.dbpool = self.dbpool
162 return node 165 return node
163 else: 166 else:
164 raise ValueError("Unknown node type !") 167 raise ValueError("Unknown node type !")
165 168
177 persist_items, 180 persist_items,
178 deliver_payloads, 181 deliver_payloads,
179 send_last_published_item, 182 send_last_published_item,
180 access_model, 183 access_model,
181 publish_model, 184 publish_model,
185 schema::text,
182 pep 186 pep
183 FROM nodes 187 FROM nodes
184 WHERE node_id=%s""", 188 WHERE node_id=%s""",
185 (nodeDbId,)) 189 (nodeDbId,))
186 row = cursor.fetchone() 190 row = cursor.fetchone()
196 persist_items, 200 persist_items,
197 deliver_payloads, 201 deliver_payloads,
198 send_last_published_item, 202 send_last_published_item,
199 access_model, 203 access_model,
200 publish_model, 204 publish_model,
205 schema::text,
201 pep 206 pep
202 FROM nodes 207 FROM nodes
203 WHERE node=%s""", 208 WHERE node=%s""",
204 (nodeIdentifier,), pep, recipient)) 209 (nodeIdentifier,), pep, recipient))
205 row = cursor.fetchone() 210 row = cursor.fetchone()
226 231
227 d = self.dbpool.runQuery(query, values) 232 d = self.dbpool.runQuery(query, values)
228 d.addCallback(lambda results: [r[0] for r in results]) 233 d.addCallback(lambda results: [r[0] for r in results])
229 return d 234 return d
230 235
231 def createNode(self, nodeIdentifier, owner, config, pep, recipient=None): 236 def createNode(self, nodeIdentifier, owner, config, schema, pep, recipient=None):
232 return self.dbpool.runInteraction(self._createNode, nodeIdentifier, 237 return self.dbpool.runInteraction(self._createNode, nodeIdentifier,
233 owner, config, pep, recipient) 238 owner, config, schema, pep, recipient)
234 239
235 def _createNode(self, cursor, nodeIdentifier, owner, config, pep, recipient): 240 def _createNode(self, cursor, nodeIdentifier, owner, config, schema, pep, recipient):
236 if config['pubsub#node_type'] != 'leaf': 241 if config['pubsub#node_type'] != 'leaf':
237 raise error.NoCollections() 242 raise error.NoCollections()
238 243
239 owner = owner.userhost() 244 owner = owner.userhost()
240 245
241 try: 246 try:
242 cursor.execute("""INSERT INTO nodes 247 cursor.execute("""INSERT INTO nodes
243 (node, node_type, persist_items, 248 (node, node_type, persist_items,
244 deliver_payloads, send_last_published_item, access_model, publish_model, pep) 249 deliver_payloads, send_last_published_item, access_model, publish_model, schema, pep)
245 VALUES 250 VALUES
246 (%s, 'leaf', %s, %s, %s, %s, %s, %s)""", 251 (%s, 'leaf', %s, %s, %s, %s, %s, %s, %s)""",
247 (nodeIdentifier, 252 (nodeIdentifier,
248 config['pubsub#persist_items'], 253 config['pubsub#persist_items'],
249 config['pubsub#deliver_payloads'], 254 config['pubsub#deliver_payloads'],
250 config['pubsub#send_last_published_item'], 255 config['pubsub#send_last_published_item'],
251 config[const.OPT_ACCESS_MODEL], 256 config[const.OPT_ACCESS_MODEL],
252 config[const.OPT_PUBLISH_MODEL], 257 config[const.OPT_PUBLISH_MODEL],
258 schema,
253 recipient.userhost() if pep else None 259 recipient.userhost() if pep else None
254 ) 260 )
255 ) 261 )
256 except cursor._pool.dbapi.IntegrityError as e: 262 except cursor._pool.dbapi.IntegrityError as e:
257 if e.pgcode == "23505": 263 if e.pgcode == "23505":
396 402
397 class Node: 403 class Node:
398 404
399 implements(iidavoll.INode) 405 implements(iidavoll.INode)
400 406
401 def __init__(self, nodeDbId, nodeIdentifier, config): 407 def __init__(self, nodeDbId, nodeIdentifier, config, schema):
402 self.nodeDbId = nodeDbId 408 self.nodeDbId = nodeDbId
403 self.nodeIdentifier = nodeIdentifier 409 self.nodeIdentifier = nodeIdentifier
404 self._config = config 410 self._config = config
411 self._schema = schema
405 412
406 def _checkNodeExists(self, cursor): 413 def _checkNodeExists(self, cursor):
407 cursor.execute("""SELECT 1 as exist FROM nodes WHERE node_id=%s""", 414 cursor.execute("""SELECT 1 as exist FROM nodes WHERE node_id=%s""",
408 (self.nodeDbId,)) 415 (self.nodeDbId,))
409 if not cursor.fetchone(): 416 if not cursor.fetchone():
446 config[const.OPT_PUBLISH_MODEL], 453 config[const.OPT_PUBLISH_MODEL],
447 self.nodeDbId)) 454 self.nodeDbId))
448 455
449 def _setCachedConfiguration(self, void, config): 456 def _setCachedConfiguration(self, void, config):
450 self._config = config 457 self._config = config
458
459 def getSchema(self):
460 return self._schema
461
462 def setSchema(self, schema):
463 d = self.dbpool.runInteraction(self._setSchema, schema)
464 d.addCallback(self._setCachedSchema, schema)
465 return d
466
467 def _setSchema(self, cursor, schema):
468 self._checkNodeExists(cursor)
469 cursor.execute("""UPDATE nodes SET schema=%s
470 WHERE node_id=%s""",
471 (schema.toXml() if schema else None,
472 self.nodeDbId))
473
474 def _setCachedSchema(self, void, schema):
475 self._schema = schema
451 476
452 def getMetaData(self): 477 def getMetaData(self):
453 config = copy.copy(self._config) 478 config = copy.copy(self._config)
454 config["pubsub#node_type"] = self.nodeType 479 config["pubsub#node_type"] = self.nodeType
455 return config 480 return config