Mercurial > libervia-pubsub
comparison sat_pubsub/pgsql_storage.py @ 367:a772f7dac930
backend, storage(pgsql): creation/update date + serial ids:
/!\ this patch updates pgsql schema /!\
Had to set 2 features in the same patch, to avoid updating 2 times the schema.
1) creation/last modification date:
column keeping the date of creation of items is renamed from "date" to "created"
the date of last modification of items is saved in the new "updated" column
2) serial ids:
this experimental feature allows to have ids in series (i.e. 1, 2, 3, etc.) instead of UUID.
This is a convenience feature and there are some drawbacks:
- PostgreSQL sequences are used, so gaps can happen (see PostgreSQL documentation for more details)
- if somebody creates an item with a future id in the series, the series will adapt, which can have undesired effects, and may lead to item creation failures if several items are created at the same time. For instance, if the next id in the series is "8", and somebody has already created items "8" and "256", the item will be created with the biggest value in items +1 (i.e. 257). If 2 people want to create an item in this situation, the second will fail with a conflict error.
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 04 Nov 2017 21:31:32 +0100 |
parents | 81e6d4a516c3 |
children | 618a92080812 |
comparison
equal
deleted
inserted
replaced
366:81e6d4a516c3 | 367:a772f7dac930 |
---|---|
54 import copy, logging | 54 import copy, logging |
55 | 55 |
56 from zope.interface import implements | 56 from zope.interface import implements |
57 | 57 |
58 from twisted.internet import reactor | 58 from twisted.internet import reactor |
59 from twisted.internet import defer | |
59 from twisted.words.protocols.jabber import jid | 60 from twisted.words.protocols.jabber import jid |
60 from twisted.python import log | 61 from twisted.python import log |
61 | 62 |
62 from wokkel import generic | 63 from wokkel import generic |
63 from wokkel.pubsub import Subscription | 64 from wokkel.pubsub import Subscription |
64 | 65 |
65 from sat_pubsub import error | 66 from sat_pubsub import error |
66 from sat_pubsub import iidavoll | 67 from sat_pubsub import iidavoll |
67 from sat_pubsub import const | 68 from sat_pubsub import const |
68 from sat_pubsub import container | 69 from sat_pubsub import container |
70 from sat_pubsub import exceptions | |
71 import uuid | |
69 import psycopg2 | 72 import psycopg2 |
70 import psycopg2.extensions | 73 import psycopg2.extensions |
71 # we wants psycopg2 to return us unicode, not str | 74 # we wants psycopg2 to return us unicode, not str |
72 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE) | 75 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE) |
73 psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY) | 76 psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY) |
74 | 77 |
75 # parseXml manage str, but we get unicode | 78 # parseXml manage str, but we get unicode |
76 parseXml = lambda unicode_data: generic.parseXml(unicode_data.encode('utf-8')) | 79 parseXml = lambda unicode_data: generic.parseXml(unicode_data.encode('utf-8')) |
80 ITEMS_SEQ_NAME = u'node_{node_id}_seq' | |
77 PEP_COL_NAME = 'pep' | 81 PEP_COL_NAME = 'pep' |
78 CURRENT_VERSION = '3' | 82 CURRENT_VERSION = '4' |
83 # retrieve the maximum integer item id + 1 | |
84 NEXT_ITEM_ID_QUERY = r"SELECT COALESCE(max(item::integer)+1,1) as val from items where node_id={node_id} and item ~ E'^\\d+$'" | |
79 | 85 |
80 | 86 |
81 def withPEP(query, values, pep, recipient): | 87 def withPEP(query, values, pep, recipient): |
82 """Helper method to facilitate PEP management | 88 """Helper method to facilitate PEP management |
83 | 89 |
105 const.OPT_PERSIST_ITEMS: True, | 111 const.OPT_PERSIST_ITEMS: True, |
106 const.OPT_DELIVER_PAYLOADS: True, | 112 const.OPT_DELIVER_PAYLOADS: True, |
107 const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub', | 113 const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub', |
108 const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT, | 114 const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT, |
109 const.OPT_PUBLISH_MODEL: const.VAL_PMODEL_DEFAULT, | 115 const.OPT_PUBLISH_MODEL: const.VAL_PMODEL_DEFAULT, |
116 const.OPT_SERIAL_IDS: False, | |
110 }, | 117 }, |
111 'collection': { | 118 'collection': { |
112 const.OPT_DELIVER_PAYLOADS: True, | 119 const.OPT_DELIVER_PAYLOADS: True, |
113 const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub', | 120 const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub', |
114 const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT, | 121 const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT, |
144 'pubsub#persist_items': row[3], | 151 'pubsub#persist_items': row[3], |
145 'pubsub#deliver_payloads': row[4], | 152 'pubsub#deliver_payloads': row[4], |
146 'pubsub#send_last_published_item': row[5], | 153 'pubsub#send_last_published_item': row[5], |
147 const.OPT_ACCESS_MODEL:row[6], | 154 const.OPT_ACCESS_MODEL:row[6], |
148 const.OPT_PUBLISH_MODEL:row[7], | 155 const.OPT_PUBLISH_MODEL:row[7], |
156 const.OPT_SERIAL_IDS:row[8], | |
149 } | 157 } |
150 schema = row[8] | 158 schema = row[9] |
151 if schema is not None: | 159 if schema is not None: |
152 schema = parseXml(schema) | 160 schema = parseXml(schema) |
153 node = LeafNode(row[0], row[1], configuration, schema) | 161 node = LeafNode(row[0], row[1], configuration, schema) |
154 node.dbpool = self.dbpool | 162 node.dbpool = self.dbpool |
155 return node | 163 return node |
180 persist_items, | 188 persist_items, |
181 deliver_payloads, | 189 deliver_payloads, |
182 send_last_published_item, | 190 send_last_published_item, |
183 access_model, | 191 access_model, |
184 publish_model, | 192 publish_model, |
193 serial_ids, | |
185 schema::text, | 194 schema::text, |
186 pep | 195 pep |
187 FROM nodes | 196 FROM nodes |
188 WHERE node_id=%s""", | 197 WHERE node_id=%s""", |
189 (nodeDbId,)) | 198 (nodeDbId,)) |
200 persist_items, | 209 persist_items, |
201 deliver_payloads, | 210 deliver_payloads, |
202 send_last_published_item, | 211 send_last_published_item, |
203 access_model, | 212 access_model, |
204 publish_model, | 213 publish_model, |
214 serial_ids, | |
205 schema::text, | 215 schema::text, |
206 pep | 216 pep |
207 FROM nodes | 217 FROM nodes |
208 WHERE node=%s""", | 218 WHERE node=%s""", |
209 (nodeIdentifier,), pep, recipient)) | 219 (nodeIdentifier,), pep, recipient)) |
245 | 255 |
246 owner = owner.userhost() | 256 owner = owner.userhost() |
247 | 257 |
248 try: | 258 try: |
249 cursor.execute("""INSERT INTO nodes | 259 cursor.execute("""INSERT INTO nodes |
250 (node, node_type, persist_items, | 260 (node, |
251 deliver_payloads, send_last_published_item, access_model, publish_model, schema, pep) | 261 node_type, |
262 persist_items, | |
263 deliver_payloads, | |
264 send_last_published_item, | |
265 access_model, | |
266 publish_model, | |
267 serial_ids, | |
268 schema, | |
269 pep) | |
252 VALUES | 270 VALUES |
253 (%s, 'leaf', %s, %s, %s, %s, %s, %s, %s)""", | 271 (%s, 'leaf', %s, %s, %s, %s, %s, %s, %s, %s)""", |
254 (nodeIdentifier, | 272 (nodeIdentifier, |
255 config['pubsub#persist_items'], | 273 config['pubsub#persist_items'], |
256 config['pubsub#deliver_payloads'], | 274 config['pubsub#deliver_payloads'], |
257 config['pubsub#send_last_published_item'], | 275 config['pubsub#send_last_published_item'], |
258 config[const.OPT_ACCESS_MODEL], | 276 config[const.OPT_ACCESS_MODEL], |
259 config[const.OPT_PUBLISH_MODEL], | 277 config[const.OPT_PUBLISH_MODEL], |
278 config[const.OPT_SERIAL_IDS], | |
260 schema, | 279 schema, |
261 recipient.userhost() if pep else None | 280 recipient.userhost() if pep else None |
262 ) | 281 ) |
263 ) | 282 ) |
264 except cursor._pool.dbapi.IntegrityError as e: | 283 except cursor._pool.dbapi.IntegrityError as e: |
280 # if the entry exists the next query will leave the database in a corrupted | 299 # if the entry exists the next query will leave the database in a corrupted |
281 # state: the solution is to rollback. I tried with other methods like | 300 # state: the solution is to rollback. I tried with other methods like |
282 # "WHERE NOT EXISTS" but none of them worked, so the following solution | 301 # "WHERE NOT EXISTS" but none of them worked, so the following solution |
283 # looks like the sole - unless you have auto-commit on. More info | 302 # looks like the sole - unless you have auto-commit on. More info |
284 # about this issue: http://cssmay.com/question/tag/tag-psycopg2 | 303 # about this issue: http://cssmay.com/question/tag/tag-psycopg2 |
285 cursor._connection.commit() | 304 cursor.connection.commit() |
286 try: | 305 try: |
287 cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", | 306 cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", |
288 (owner,)) | 307 (owner,)) |
289 except psycopg2.IntegrityError as e: | 308 except psycopg2.IntegrityError as e: |
290 cursor._connection.rollback() | 309 cursor.connection.rollback() |
291 logging.warning("during node creation: %s" % e.message) | 310 logging.warning("during node creation: %s" % e.message) |
292 | 311 |
293 cursor.execute("""INSERT INTO affiliations | 312 cursor.execute("""INSERT INTO affiliations |
294 (node_id, entity_id, affiliation) | 313 (node_id, entity_id, affiliation) |
295 SELECT %s, entity_id, 'owner' FROM | 314 SELECT %s, entity_id, 'owner' FROM |
306 #TODO: check that group are actually in roster | 325 #TODO: check that group are actually in roster |
307 cursor.execute("""INSERT INTO node_groups_authorized (node_id, groupname) | 326 cursor.execute("""INSERT INTO node_groups_authorized (node_id, groupname) |
308 VALUES (%s,%s)""" , (node_id, group)) | 327 VALUES (%s,%s)""" , (node_id, group)) |
309 # XXX: affiliations can't be set on during node creation (at least not with XEP-0060 alone) | 328 # XXX: affiliations can't be set on during node creation (at least not with XEP-0060 alone) |
310 # so whitelist affiliations need to be done afterward | 329 # so whitelist affiliations need to be done afterward |
330 | |
331 # now we may have to do extra things according to config options | |
332 default_conf = self.defaultConfig['leaf'] | |
333 # XXX: trigger works on node creation because OPT_SERIAL_IDS is False in defaultConfig | |
334 # if this value is changed, the _configurationTriggers method should be adapted. | |
335 Node._configurationTriggers(cursor, node_id, default_conf, config) | |
311 | 336 |
312 def deleteNodeByDbId(self, db_id): | 337 def deleteNodeByDbId(self, db_id): |
313 """Delete a node using directly its database id""" | 338 """Delete a node using directly its database id""" |
314 return self.dbpool.runInteraction(self._deleteNodeByDbId, db_id) | 339 return self.dbpool.runInteraction(self._deleteNodeByDbId, db_id) |
315 | 340 |
446 return d | 471 return d |
447 | 472 |
448 def getConfiguration(self): | 473 def getConfiguration(self): |
449 return self._config | 474 return self._config |
450 | 475 |
476 def getNextId(self): | |
477 """return XMPP item id usable for next item to publish | |
478 | |
479 the return value will be next int if serial_ids is set, | |
480 else an UUID will be returned | |
481 """ | |
482 if self._config[const.OPT_SERIAL_IDS]: | |
483 d = self.dbpool.runQuery("SELECT nextval('{seq_name}')".format( | |
484 seq_name = ITEMS_SEQ_NAME.format(node_id=self.nodeDbId))) | |
485 d.addCallback(lambda rows: unicode(rows[0][0])) | |
486 return d | |
487 else: | |
488 return defer.succeed(unicode(uuid.uuid4())) | |
489 | |
490 @staticmethod | |
491 def _configurationTriggers(cursor, node_id, old_config, new_config): | |
492 """trigger database relative actions needed when a config is changed | |
493 | |
494 @param cursor(): current db cursor | |
495 @param node_id(unicode): database ID of the node | |
496 @param old_config(dict): config of the node before the change | |
497 @param new_config(dict): new options that will be changed | |
498 """ | |
499 serial_ids = new_config[const.OPT_SERIAL_IDS] | |
500 if serial_ids != old_config[const.OPT_SERIAL_IDS]: | |
501 # serial_ids option has been modified, | |
502 # we need to handle corresponding sequence | |
503 | |
504 # XXX: we use .format in following queries because values | |
505 # are generated by ourself | |
506 seq_name = ITEMS_SEQ_NAME.format(node_id=node_id) | |
507 if serial_ids: | |
508 # the next query get the max value +1 of all XMPP items ids | |
509 # which are integers, and default to 1 | |
510 cursor.execute(NEXT_ITEM_ID_QUERY.format(node_id=node_id)) | |
511 next_val = cursor.fetchone()[0] | |
512 cursor.execute("DROP SEQUENCE IF EXISTS {seq_name}".format(seq_name = seq_name)) | |
513 cursor.execute("CREATE SEQUENCE {seq_name} START {next_val} OWNED BY nodes.node_id".format( | |
514 seq_name = seq_name, | |
515 next_val = next_val)) | |
516 else: | |
517 cursor.execute("DROP SEQUENCE IF EXISTS {seq_name}".format(seq_name = seq_name)) | |
518 | |
451 def setConfiguration(self, options): | 519 def setConfiguration(self, options): |
452 config = copy.copy(self._config) | 520 config = copy.copy(self._config) |
453 | 521 |
454 for option in options: | 522 for option in options: |
455 if option in config: | 523 if option in config: |
459 d.addCallback(self._setCachedConfiguration, config) | 527 d.addCallback(self._setCachedConfiguration, config) |
460 return d | 528 return d |
461 | 529 |
462 def _setConfiguration(self, cursor, config): | 530 def _setConfiguration(self, cursor, config): |
463 self._checkNodeExists(cursor) | 531 self._checkNodeExists(cursor) |
532 self._configurationTriggers(cursor, self.nodeDbId, self._config, config) | |
464 cursor.execute("""UPDATE nodes SET persist_items=%s, | 533 cursor.execute("""UPDATE nodes SET persist_items=%s, |
465 deliver_payloads=%s, | 534 deliver_payloads=%s, |
466 send_last_published_item=%s, | 535 send_last_published_item=%s, |
467 access_model=%s, | 536 access_model=%s, |
468 publish_model=%s | 537 publish_model=%s, |
538 serial_ids=%s | |
469 WHERE node_id=%s""", | 539 WHERE node_id=%s""", |
470 (config[const.OPT_PERSIST_ITEMS], | 540 (config[const.OPT_PERSIST_ITEMS], |
471 config[const.OPT_DELIVER_PAYLOADS], | 541 config[const.OPT_DELIVER_PAYLOADS], |
472 config[const.OPT_SEND_LAST_PUBLISHED_ITEM], | 542 config[const.OPT_SEND_LAST_PUBLISHED_ITEM], |
473 config[const.OPT_ACCESS_MODEL], | 543 config[const.OPT_ACCESS_MODEL], |
474 config[const.OPT_PUBLISH_MODEL], | 544 config[const.OPT_PUBLISH_MODEL], |
545 config[const.OPT_SERIAL_IDS], | |
475 self.nodeDbId)) | 546 self.nodeDbId)) |
476 | 547 |
477 def _setCachedConfiguration(self, void, config): | 548 def _setCachedConfiguration(self, void, config): |
478 self._config = config | 549 self._config = config |
479 | 550 |
594 | 665 |
595 try: | 666 try: |
596 cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", | 667 cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", |
597 (userhost,)) | 668 (userhost,)) |
598 except cursor._pool.dbapi.IntegrityError: | 669 except cursor._pool.dbapi.IntegrityError: |
599 cursor._connection.rollback() | 670 cursor.connection.rollback() |
600 | 671 |
601 try: | 672 try: |
602 cursor.execute("""INSERT INTO subscriptions | 673 cursor.execute("""INSERT INTO subscriptions |
603 (node_id, entity_id, resource, state, | 674 (node_id, entity_id, resource, state, |
604 subscription_type, subscription_depth) | 675 subscription_type, subscription_depth) |
770 self._checkNodeExists(cursor) | 841 self._checkNodeExists(cursor) |
771 for item_data in items_data: | 842 for item_data in items_data: |
772 self._storeItem(cursor, item_data, publisher) | 843 self._storeItem(cursor, item_data, publisher) |
773 | 844 |
774 def _storeItem(self, cursor, item_data, publisher): | 845 def _storeItem(self, cursor, item_data, publisher): |
846 # first try to insert the item | |
847 # - if it fails (conflict), and the item is new and we have serial_ids options, | |
848 # current id will be recomputed using next item id query (note that is not perfect, as | |
849 # table is not locked and this can fail if two items are added at the same time | |
850 # but this can only happen with serial_ids and if future ids have been set by a client, | |
851 # this case should be rare enough to consider this situation acceptable) | |
852 # - if item insertion fail and the item is not new, we do an update | |
853 # - in other cases, exception is raised | |
775 item, access_model, item_config = item_data.item, item_data.access_model, item_data.config | 854 item, access_model, item_config = item_data.item, item_data.access_model, item_data.config |
776 data = item.toXml() | 855 data = item.toXml() |
777 | 856 |
778 cursor.execute("""UPDATE items SET date=now(), publisher=%s, data=%s | 857 insert_query = """INSERT INTO items (node_id, item, publisher, data, access_model) |
779 FROM nodes | 858 SELECT %s, %s, %s, %s, %s FROM nodes |
780 WHERE nodes.node_id = items.node_id AND | 859 WHERE node_id=%s |
781 nodes.node_id = %s and items.item=%s | 860 RETURNING item_id""" |
782 RETURNING item_id""", | 861 insert_data = [self.nodeDbId, |
783 (publisher.full(), | 862 item["id"], |
784 data, | 863 publisher.full(), |
785 self.nodeDbId, | 864 data, |
786 item["id"])) | 865 access_model, |
787 if cursor.rowcount == 1: | 866 self.nodeDbId] |
788 item_id = cursor.fetchone()[0]; | 867 |
789 self._storeCategories(cursor, item_id, item_data.categories, update=True) | 868 try: |
790 return | 869 cursor.execute(insert_query, insert_data) |
791 | 870 except cursor._pool.dbapi.IntegrityError as e: |
792 cursor.execute("""INSERT INTO items (node_id, item, publisher, data, access_model) | 871 if e.pgcode != "23505": |
793 SELECT %s, %s, %s, %s, %s FROM nodes | 872 # we only handle unique_violation, every other exception must be raised |
794 WHERE node_id=%s | 873 raise e |
795 RETURNING item_id""", | 874 cursor.connection.rollback() |
796 (self.nodeDbId, | 875 # the item already exist |
797 item["id"], | 876 if item_data.new: |
798 publisher.full(), | 877 # the item is new |
799 data, | 878 if self._config[const.OPT_SERIAL_IDS]: |
800 access_model, | 879 # this can happen with serial_ids, if a item has been stored |
801 self.nodeDbId)) | 880 # with a future id (generated by XMPP client) |
881 cursor.execute(NEXT_ITEM_ID_QUERY.format(node_id=self.nodeDbId)) | |
882 next_id = cursor.fetchone()[0] | |
883 # we update the sequence, so we can skip conflicting ids | |
884 cursor.execute(u"SELECT setval('{seq_name}', %s)".format( | |
885 seq_name = ITEMS_SEQ_NAME.format(node_id=self.nodeDbId)), [next_id]) | |
886 # and now we can retry the query with the new id | |
887 item['id'] = insert_data[1] = unicode(next_id) | |
888 # item saved in DB must also be updated with the new id | |
889 insert_data[3] = item.toXml() | |
890 cursor.execute(insert_query, insert_data) | |
891 else: | |
892 # but if we have not serial_ids, we have a real problem | |
893 raise e | |
894 else: | |
895 # this is an update | |
896 cursor.execute("""UPDATE items SET updated=now(), publisher=%s, data=%s | |
897 FROM nodes | |
898 WHERE nodes.node_id = items.node_id AND | |
899 nodes.node_id = %s and items.item=%s | |
900 RETURNING item_id""", | |
901 (publisher.full(), | |
902 data, | |
903 self.nodeDbId, | |
904 item["id"])) | |
905 if cursor.rowcount != 1: | |
906 raise exceptions.InternalError("item has not been updated correctly") | |
907 item_id = cursor.fetchone()[0]; | |
908 self._storeCategories(cursor, item_id, item_data.categories, update=True) | |
909 return | |
802 | 910 |
803 item_id = cursor.fetchone()[0]; | 911 item_id = cursor.fetchone()[0]; |
804 self._storeCategories(cursor, item_id, item_data.categories) | 912 self._storeCategories(cursor, item_id, item_data.categories) |
805 | 913 |
806 if access_model == const.VAL_AMODEL_PUBLISHER_ROSTER: | 914 if access_model == const.VAL_AMODEL_PUBLISHER_ROSTER: |
899 | 1007 |
900 # FILTERS | 1008 # FILTERS |
901 if 'filters' in ext_data: # MAM filters | 1009 if 'filters' in ext_data: # MAM filters |
902 for filter_ in ext_data['filters']: | 1010 for filter_ in ext_data['filters']: |
903 if filter_.var == 'start': | 1011 if filter_.var == 'start': |
904 query_filters.append("AND date>=%s") | 1012 query_filters.append("AND created>=%s") |
905 args.append(filter_.value) | 1013 args.append(filter_.value) |
906 elif filter_.var == 'end': | 1014 elif filter_.var == 'end': |
907 query_filters.append("AND date<=%s") | 1015 query_filters.append("AND created<=%s") |
908 args.append(filter_.value) | 1016 args.append(filter_.value) |
909 elif filter_.var == 'with': | 1017 elif filter_.var == 'with': |
910 jid_s = filter_.value | 1018 jid_s = filter_.value |
911 if '/' in jid_s: | 1019 if '/' in jid_s: |
912 query_filters.append("AND publisher=%s") | 1020 query_filters.append("AND publisher=%s") |
935 | 1043 |
936 # SELECT | 1044 # SELECT |
937 if ids_only: | 1045 if ids_only: |
938 query = ["SELECT item"] | 1046 query = ["SELECT item"] |
939 else: | 1047 else: |
940 query = ["SELECT data::text,items.access_model,item_id,date"] | 1048 query = ["SELECT data::text,items.access_model,item_id,created,updated"] |
941 | 1049 |
942 query_order = self._appendSourcesAndFilters(query, args, authorized_groups, unrestricted, ext_data) | 1050 query_order = self._appendSourcesAndFilters(query, args, authorized_groups, unrestricted, ext_data) |
943 | 1051 |
944 if 'rsm' in ext_data: | 1052 if 'rsm' in ext_data: |
945 rsm = ext_data['rsm'] | 1053 rsm = ext_data['rsm'] |
987 ret = [] | 1095 ret = [] |
988 for item_data in result: | 1096 for item_data in result: |
989 item = generic.stripNamespace(parseXml(item_data.data)) | 1097 item = generic.stripNamespace(parseXml(item_data.data)) |
990 access_model = item_data.access_model | 1098 access_model = item_data.access_model |
991 item_id = item_data.item_id | 1099 item_id = item_data.item_id |
992 date = item_data.date | 1100 created = item_data.created |
1101 updated = item_data.updated | |
993 access_list = {} | 1102 access_list = {} |
994 if access_model == const.VAL_AMODEL_PUBLISHER_ROSTER: | 1103 if access_model == const.VAL_AMODEL_PUBLISHER_ROSTER: |
995 cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,)) | 1104 cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,)) |
996 access_list[const.OPT_ROSTER_GROUPS_ALLOWED] = [r.groupname for r in cursor.fetchall()] | 1105 access_list[const.OPT_ROSTER_GROUPS_ALLOWED] = [r.groupname for r in cursor.fetchall()] |
997 | 1106 |
998 ret.append(container.ItemData(item, access_model, access_list, date=date)) | 1107 ret.append(container.ItemData(item, access_model, access_list, created=created, updated=updated)) |
999 # TODO: whitelist item access model | 1108 # TODO: whitelist item access model |
1000 return ret | 1109 return ret |
1001 | 1110 |
1002 if ids_only: | 1111 if ids_only: |
1003 return [r.item for r in result] | 1112 return [r.item for r in result] |
1004 else: | 1113 else: |
1005 items_data = [container.ItemData(generic.stripNamespace(parseXml(r.data)), r.access_model, date=r.date) for r in result] | 1114 items_data = [container.ItemData(generic.stripNamespace(parseXml(r.data)), r.access_model, created=r.created, updated=r.updated) for r in result] |
1006 return items_data | 1115 return items_data |
1007 | 1116 |
1008 def getItemsById(self, authorized_groups, unrestricted, itemIdentifiers): | 1117 def getItemsById(self, authorized_groups, unrestricted, itemIdentifiers): |
1009 """Get items which are in the given list | 1118 """Get items which are in the given list |
1010 | 1119 |
1020 def _getItemsById(self, cursor, authorized_groups, unrestricted, itemIdentifiers): | 1129 def _getItemsById(self, cursor, authorized_groups, unrestricted, itemIdentifiers): |
1021 self._checkNodeExists(cursor) | 1130 self._checkNodeExists(cursor) |
1022 ret = [] | 1131 ret = [] |
1023 if unrestricted: #we get everything without checking permissions | 1132 if unrestricted: #we get everything without checking permissions |
1024 for itemIdentifier in itemIdentifiers: | 1133 for itemIdentifier in itemIdentifiers: |
1025 cursor.execute("""SELECT data::text,items.access_model,item_id,date FROM nodes | 1134 cursor.execute("""SELECT data::text,items.access_model,item_id,created,updated FROM nodes |
1026 INNER JOIN items USING (node_id) | 1135 INNER JOIN items USING (node_id) |
1027 WHERE node_id=%s AND item=%s""", | 1136 WHERE node_id=%s AND item=%s""", |
1028 (self.nodeDbId, | 1137 (self.nodeDbId, |
1029 itemIdentifier)) | 1138 itemIdentifier)) |
1030 result = cursor.fetchone() | 1139 result = cursor.fetchone() |
1032 raise error.ItemNotFound() | 1141 raise error.ItemNotFound() |
1033 | 1142 |
1034 item = generic.stripNamespace(parseXml(result[0])) | 1143 item = generic.stripNamespace(parseXml(result[0])) |
1035 access_model = result[1] | 1144 access_model = result[1] |
1036 item_id = result[2] | 1145 item_id = result[2] |
1037 date= result[3] | 1146 created= result[3] |
1147 updated= result[4] | |
1038 access_list = {} | 1148 access_list = {} |
1039 if access_model == const.VAL_AMODEL_PUBLISHER_ROSTER: | 1149 if access_model == const.VAL_AMODEL_PUBLISHER_ROSTER: |
1040 cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,)) | 1150 cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,)) |
1041 access_list[const.OPT_ROSTER_GROUPS_ALLOWED] = [r[0] for r in cursor.fetchall()] | 1151 access_list[const.OPT_ROSTER_GROUPS_ALLOWED] = [r[0] for r in cursor.fetchall()] |
1042 #TODO: WHITELIST access_model | 1152 #TODO: WHITELIST access_model |
1043 | 1153 |
1044 ret.append(container.ItemData(item, access_model, access_list, date=date)) | 1154 ret.append(container.ItemData(item, access_model, access_list, created=created, updated=updated)) |
1045 else: #we check permission before returning items | 1155 else: #we check permission before returning items |
1046 for itemIdentifier in itemIdentifiers: | 1156 for itemIdentifier in itemIdentifiers: |
1047 args = [self.nodeDbId, itemIdentifier] | 1157 args = [self.nodeDbId, itemIdentifier] |
1048 if authorized_groups: | 1158 if authorized_groups: |
1049 args.append(authorized_groups) | 1159 args.append(authorized_groups) |
1050 cursor.execute("""SELECT data::text, date FROM nodes | 1160 cursor.execute("""SELECT data::text, created, updated FROM nodes |
1051 INNER JOIN items USING (node_id) | 1161 INNER JOIN items USING (node_id) |
1052 LEFT JOIN item_groups_authorized USING (item_id) | 1162 LEFT JOIN item_groups_authorized USING (item_id) |
1053 WHERE node_id=%s AND item=%s AND | 1163 WHERE node_id=%s AND item=%s AND |
1054 (items.access_model='open' """ + | 1164 (items.access_model='open' """ + |
1055 ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') + ")", | 1165 ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') + ")", |
1056 args) | 1166 args) |
1057 | 1167 |
1058 result = cursor.fetchone() | 1168 result = cursor.fetchone() |
1059 if result: | 1169 if result: |
1060 ret.append(container.ItemData(generic.stripNamespace(parseXml(result[0])), date=result[1])) | 1170 ret.append(container.ItemData(generic.stripNamespace(parseXml(result[0])), created=result[1], updated=result[2])) |
1061 | 1171 |
1062 return ret | 1172 return ret |
1063 | 1173 |
1064 def getItemsCount(self, authorized_groups, unrestricted, ext_data=None): | 1174 def getItemsCount(self, authorized_groups, unrestricted, ext_data=None): |
1065 """Count expected number of items in a getItems query | 1175 """Count expected number of items in a getItems query |