Mercurial > libervia-pubsub
comparison sat_pubsub/pgsql_storage.py @ 352:efbdca10f0fb
schema: node schema implementation
Node schema is an experimental feature (not yet standardized; a protoXEP should follow) that allows attaching a data schema to a node.
This commit implements it, along with the methods needed to retrieve and set a schema.
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 08 Sep 2017 08:02:05 +0200 |
parents | 2098295747fd |
children | 95c83899b5e9 |
comparison
equal
deleted
inserted
replaced
351:2098295747fd | 352:efbdca10f0fb |
---|---|
145 'pubsub#deliver_payloads': row[4], | 145 'pubsub#deliver_payloads': row[4], |
146 'pubsub#send_last_published_item': row[5], | 146 'pubsub#send_last_published_item': row[5], |
147 const.OPT_ACCESS_MODEL:row[6], | 147 const.OPT_ACCESS_MODEL:row[6], |
148 const.OPT_PUBLISH_MODEL:row[7], | 148 const.OPT_PUBLISH_MODEL:row[7], |
149 } | 149 } |
150 node = LeafNode(row[0], row[1], configuration) | 150 schema = row[8] |
151 if schema is not None: | |
152 schema = parseXml(schema) | |
153 node = LeafNode(row[0], row[1], configuration, schema) | |
151 node.dbpool = self.dbpool | 154 node.dbpool = self.dbpool |
152 return node | 155 return node |
153 elif row[2] == 'collection': | 156 elif row[2] == 'collection': |
154 configuration = { | 157 configuration = { |
155 'pubsub#deliver_payloads': row[4], | 158 'pubsub#deliver_payloads': row[4], |
156 'pubsub#send_last_published_item': row[5], | 159 'pubsub#send_last_published_item': row[5], |
157 const.OPT_ACCESS_MODEL: row[6], | 160 const.OPT_ACCESS_MODEL: row[6], |
158 const.OPT_PUBLISH_MODEL:row[7], | 161 const.OPT_PUBLISH_MODEL:row[7], |
159 } | 162 } |
160 node = CollectionNode(row[0], row[1], configuration) | 163 node = CollectionNode(row[0], row[1], configuration, None) |
161 node.dbpool = self.dbpool | 164 node.dbpool = self.dbpool |
162 return node | 165 return node |
163 else: | 166 else: |
164 raise ValueError("Unknown node type !") | 167 raise ValueError("Unknown node type !") |
165 | 168 |
177 persist_items, | 180 persist_items, |
178 deliver_payloads, | 181 deliver_payloads, |
179 send_last_published_item, | 182 send_last_published_item, |
180 access_model, | 183 access_model, |
181 publish_model, | 184 publish_model, |
185 schema::text, | |
182 pep | 186 pep |
183 FROM nodes | 187 FROM nodes |
184 WHERE node_id=%s""", | 188 WHERE node_id=%s""", |
185 (nodeDbId,)) | 189 (nodeDbId,)) |
186 row = cursor.fetchone() | 190 row = cursor.fetchone() |
196 persist_items, | 200 persist_items, |
197 deliver_payloads, | 201 deliver_payloads, |
198 send_last_published_item, | 202 send_last_published_item, |
199 access_model, | 203 access_model, |
200 publish_model, | 204 publish_model, |
205 schema::text, | |
201 pep | 206 pep |
202 FROM nodes | 207 FROM nodes |
203 WHERE node=%s""", | 208 WHERE node=%s""", |
204 (nodeIdentifier,), pep, recipient)) | 209 (nodeIdentifier,), pep, recipient)) |
205 row = cursor.fetchone() | 210 row = cursor.fetchone() |
226 | 231 |
227 d = self.dbpool.runQuery(query, values) | 232 d = self.dbpool.runQuery(query, values) |
228 d.addCallback(lambda results: [r[0] for r in results]) | 233 d.addCallback(lambda results: [r[0] for r in results]) |
229 return d | 234 return d |
230 | 235 |
231 def createNode(self, nodeIdentifier, owner, config, pep, recipient=None): | 236 def createNode(self, nodeIdentifier, owner, config, schema, pep, recipient=None): |
232 return self.dbpool.runInteraction(self._createNode, nodeIdentifier, | 237 return self.dbpool.runInteraction(self._createNode, nodeIdentifier, |
233 owner, config, pep, recipient) | 238 owner, config, schema, pep, recipient) |
234 | 239 |
235 def _createNode(self, cursor, nodeIdentifier, owner, config, pep, recipient): | 240 def _createNode(self, cursor, nodeIdentifier, owner, config, schema, pep, recipient): |
236 if config['pubsub#node_type'] != 'leaf': | 241 if config['pubsub#node_type'] != 'leaf': |
237 raise error.NoCollections() | 242 raise error.NoCollections() |
238 | 243 |
239 owner = owner.userhost() | 244 owner = owner.userhost() |
240 | 245 |
241 try: | 246 try: |
242 cursor.execute("""INSERT INTO nodes | 247 cursor.execute("""INSERT INTO nodes |
243 (node, node_type, persist_items, | 248 (node, node_type, persist_items, |
244 deliver_payloads, send_last_published_item, access_model, publish_model, pep) | 249 deliver_payloads, send_last_published_item, access_model, publish_model, schema, pep) |
245 VALUES | 250 VALUES |
246 (%s, 'leaf', %s, %s, %s, %s, %s, %s)""", | 251 (%s, 'leaf', %s, %s, %s, %s, %s, %s, %s)""", |
247 (nodeIdentifier, | 252 (nodeIdentifier, |
248 config['pubsub#persist_items'], | 253 config['pubsub#persist_items'], |
249 config['pubsub#deliver_payloads'], | 254 config['pubsub#deliver_payloads'], |
250 config['pubsub#send_last_published_item'], | 255 config['pubsub#send_last_published_item'], |
251 config[const.OPT_ACCESS_MODEL], | 256 config[const.OPT_ACCESS_MODEL], |
252 config[const.OPT_PUBLISH_MODEL], | 257 config[const.OPT_PUBLISH_MODEL], |
258 schema, | |
253 recipient.userhost() if pep else None | 259 recipient.userhost() if pep else None |
254 ) | 260 ) |
255 ) | 261 ) |
256 except cursor._pool.dbapi.IntegrityError as e: | 262 except cursor._pool.dbapi.IntegrityError as e: |
257 if e.pgcode == "23505": | 263 if e.pgcode == "23505": |
396 | 402 |
397 class Node: | 403 class Node: |
398 | 404 |
399 implements(iidavoll.INode) | 405 implements(iidavoll.INode) |
400 | 406 |
401 def __init__(self, nodeDbId, nodeIdentifier, config): | 407 def __init__(self, nodeDbId, nodeIdentifier, config, schema): |
402 self.nodeDbId = nodeDbId | 408 self.nodeDbId = nodeDbId |
403 self.nodeIdentifier = nodeIdentifier | 409 self.nodeIdentifier = nodeIdentifier |
404 self._config = config | 410 self._config = config |
411 self._schema = schema | |
405 | 412 |
406 def _checkNodeExists(self, cursor): | 413 def _checkNodeExists(self, cursor): |
407 cursor.execute("""SELECT 1 as exist FROM nodes WHERE node_id=%s""", | 414 cursor.execute("""SELECT 1 as exist FROM nodes WHERE node_id=%s""", |
408 (self.nodeDbId,)) | 415 (self.nodeDbId,)) |
409 if not cursor.fetchone(): | 416 if not cursor.fetchone(): |
446 config[const.OPT_PUBLISH_MODEL], | 453 config[const.OPT_PUBLISH_MODEL], |
447 self.nodeDbId)) | 454 self.nodeDbId)) |
448 | 455 |
449 def _setCachedConfiguration(self, void, config): | 456 def _setCachedConfiguration(self, void, config): |
450 self._config = config | 457 self._config = config |
458 | |
459 def getSchema(self): | |
460 return self._schema | |
461 | |
462 def setSchema(self, schema): | |
463 d = self.dbpool.runInteraction(self._setSchema, schema) | |
464 d.addCallback(self._setCachedSchema, schema) | |
465 return d | |
466 | |
467 def _setSchema(self, cursor, schema): | |
468 self._checkNodeExists(cursor) | |
469 cursor.execute("""UPDATE nodes SET schema=%s | |
470 WHERE node_id=%s""", | |
471 (schema.toXml() if schema else None, | |
472 self.nodeDbId)) | |
473 | |
474 def _setCachedSchema(self, void, schema): | |
475 self._schema = schema | |
451 | 476 |
452 def getMetaData(self): | 477 def getMetaData(self): |
453 config = copy.copy(self._config) | 478 config = copy.copy(self._config) |
454 config["pubsub#node_type"] = self.nodeType | 479 config["pubsub#node_type"] = self.nodeType |
455 return config | 480 return config |