comparison sat_pubsub/pgsql_storage.py @ 367:a772f7dac930

backend, storage(pgsql): creation/update date + serial ids: /!\ this patch updates the pgsql schema /!\ Two features had to be put in the same patch, to avoid updating the schema twice. 1) creation/last modification date: the column keeping the creation date of items is renamed from "date" to "created"; the date of last modification of items is saved in the new "updated" column. 2) serial ids: this experimental feature allows ids to be in series (i.e. 1, 2, 3, etc.) instead of UUIDs. This is a convenience feature and there are some drawbacks: - PostgreSQL sequences are used, so gaps can happen (see PostgreSQL documentation for more details) - if somebody creates an item with a future id in the series, the series will adapt, which can have undesired effects, and may lead to item creation failure if several items are created at the same time. For instance, if the next id in the series is "8", and somebody has already created items "8" and "256", the item will be created with the biggest value in items + 1 (i.e. 257). If 2 people want to create an item in this situation, the second will fail with a conflict error.
author Goffi <goffi@goffi.org>
date Sat, 04 Nov 2017 21:31:32 +0100
parents 81e6d4a516c3
children 618a92080812
comparison
equal deleted inserted replaced
366:81e6d4a516c3 367:a772f7dac930
54 import copy, logging 54 import copy, logging
55 55
56 from zope.interface import implements 56 from zope.interface import implements
57 57
58 from twisted.internet import reactor 58 from twisted.internet import reactor
59 from twisted.internet import defer
59 from twisted.words.protocols.jabber import jid 60 from twisted.words.protocols.jabber import jid
60 from twisted.python import log 61 from twisted.python import log
61 62
62 from wokkel import generic 63 from wokkel import generic
63 from wokkel.pubsub import Subscription 64 from wokkel.pubsub import Subscription
64 65
65 from sat_pubsub import error 66 from sat_pubsub import error
66 from sat_pubsub import iidavoll 67 from sat_pubsub import iidavoll
67 from sat_pubsub import const 68 from sat_pubsub import const
68 from sat_pubsub import container 69 from sat_pubsub import container
70 from sat_pubsub import exceptions
71 import uuid
69 import psycopg2 72 import psycopg2
70 import psycopg2.extensions 73 import psycopg2.extensions
71 # we wants psycopg2 to return us unicode, not str 74 # we wants psycopg2 to return us unicode, not str
72 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE) 75 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
73 psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY) 76 psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY)
74 77
75 # parseXml manage str, but we get unicode 78 # parseXml manage str, but we get unicode
76 parseXml = lambda unicode_data: generic.parseXml(unicode_data.encode('utf-8')) 79 parseXml = lambda unicode_data: generic.parseXml(unicode_data.encode('utf-8'))
80 ITEMS_SEQ_NAME = u'node_{node_id}_seq'
77 PEP_COL_NAME = 'pep' 81 PEP_COL_NAME = 'pep'
78 CURRENT_VERSION = '3' 82 CURRENT_VERSION = '4'
83 # retrieve the maximum integer item id + 1
84 NEXT_ITEM_ID_QUERY = r"SELECT COALESCE(max(item::integer)+1,1) as val from items where node_id={node_id} and item ~ E'^\\d+$'"
79 85
80 86
81 def withPEP(query, values, pep, recipient): 87 def withPEP(query, values, pep, recipient):
82 """Helper method to facilitate PEP management 88 """Helper method to facilitate PEP management
83 89
105 const.OPT_PERSIST_ITEMS: True, 111 const.OPT_PERSIST_ITEMS: True,
106 const.OPT_DELIVER_PAYLOADS: True, 112 const.OPT_DELIVER_PAYLOADS: True,
107 const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub', 113 const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub',
108 const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT, 114 const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT,
109 const.OPT_PUBLISH_MODEL: const.VAL_PMODEL_DEFAULT, 115 const.OPT_PUBLISH_MODEL: const.VAL_PMODEL_DEFAULT,
116 const.OPT_SERIAL_IDS: False,
110 }, 117 },
111 'collection': { 118 'collection': {
112 const.OPT_DELIVER_PAYLOADS: True, 119 const.OPT_DELIVER_PAYLOADS: True,
113 const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub', 120 const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub',
114 const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT, 121 const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT,
144 'pubsub#persist_items': row[3], 151 'pubsub#persist_items': row[3],
145 'pubsub#deliver_payloads': row[4], 152 'pubsub#deliver_payloads': row[4],
146 'pubsub#send_last_published_item': row[5], 153 'pubsub#send_last_published_item': row[5],
147 const.OPT_ACCESS_MODEL:row[6], 154 const.OPT_ACCESS_MODEL:row[6],
148 const.OPT_PUBLISH_MODEL:row[7], 155 const.OPT_PUBLISH_MODEL:row[7],
156 const.OPT_SERIAL_IDS:row[8],
149 } 157 }
150 schema = row[8] 158 schema = row[9]
151 if schema is not None: 159 if schema is not None:
152 schema = parseXml(schema) 160 schema = parseXml(schema)
153 node = LeafNode(row[0], row[1], configuration, schema) 161 node = LeafNode(row[0], row[1], configuration, schema)
154 node.dbpool = self.dbpool 162 node.dbpool = self.dbpool
155 return node 163 return node
180 persist_items, 188 persist_items,
181 deliver_payloads, 189 deliver_payloads,
182 send_last_published_item, 190 send_last_published_item,
183 access_model, 191 access_model,
184 publish_model, 192 publish_model,
193 serial_ids,
185 schema::text, 194 schema::text,
186 pep 195 pep
187 FROM nodes 196 FROM nodes
188 WHERE node_id=%s""", 197 WHERE node_id=%s""",
189 (nodeDbId,)) 198 (nodeDbId,))
200 persist_items, 209 persist_items,
201 deliver_payloads, 210 deliver_payloads,
202 send_last_published_item, 211 send_last_published_item,
203 access_model, 212 access_model,
204 publish_model, 213 publish_model,
214 serial_ids,
205 schema::text, 215 schema::text,
206 pep 216 pep
207 FROM nodes 217 FROM nodes
208 WHERE node=%s""", 218 WHERE node=%s""",
209 (nodeIdentifier,), pep, recipient)) 219 (nodeIdentifier,), pep, recipient))
245 255
246 owner = owner.userhost() 256 owner = owner.userhost()
247 257
248 try: 258 try:
249 cursor.execute("""INSERT INTO nodes 259 cursor.execute("""INSERT INTO nodes
250 (node, node_type, persist_items, 260 (node,
251 deliver_payloads, send_last_published_item, access_model, publish_model, schema, pep) 261 node_type,
262 persist_items,
263 deliver_payloads,
264 send_last_published_item,
265 access_model,
266 publish_model,
267 serial_ids,
268 schema,
269 pep)
252 VALUES 270 VALUES
253 (%s, 'leaf', %s, %s, %s, %s, %s, %s, %s)""", 271 (%s, 'leaf', %s, %s, %s, %s, %s, %s, %s, %s)""",
254 (nodeIdentifier, 272 (nodeIdentifier,
255 config['pubsub#persist_items'], 273 config['pubsub#persist_items'],
256 config['pubsub#deliver_payloads'], 274 config['pubsub#deliver_payloads'],
257 config['pubsub#send_last_published_item'], 275 config['pubsub#send_last_published_item'],
258 config[const.OPT_ACCESS_MODEL], 276 config[const.OPT_ACCESS_MODEL],
259 config[const.OPT_PUBLISH_MODEL], 277 config[const.OPT_PUBLISH_MODEL],
278 config[const.OPT_SERIAL_IDS],
260 schema, 279 schema,
261 recipient.userhost() if pep else None 280 recipient.userhost() if pep else None
262 ) 281 )
263 ) 282 )
264 except cursor._pool.dbapi.IntegrityError as e: 283 except cursor._pool.dbapi.IntegrityError as e:
280 # if the entry exists the next query will leave the database in a corrupted 299 # if the entry exists the next query will leave the database in a corrupted
281 # state: the solution is to rollback. I tried with other methods like 300 # state: the solution is to rollback. I tried with other methods like
282 # "WHERE NOT EXISTS" but none of them worked, so the following solution 301 # "WHERE NOT EXISTS" but none of them worked, so the following solution
283 # looks like the sole - unless you have auto-commit on. More info 302 # looks like the sole - unless you have auto-commit on. More info
284 # about this issue: http://cssmay.com/question/tag/tag-psycopg2 303 # about this issue: http://cssmay.com/question/tag/tag-psycopg2
285 cursor._connection.commit() 304 cursor.connection.commit()
286 try: 305 try:
287 cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", 306 cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
288 (owner,)) 307 (owner,))
289 except psycopg2.IntegrityError as e: 308 except psycopg2.IntegrityError as e:
290 cursor._connection.rollback() 309 cursor.connection.rollback()
291 logging.warning("during node creation: %s" % e.message) 310 logging.warning("during node creation: %s" % e.message)
292 311
293 cursor.execute("""INSERT INTO affiliations 312 cursor.execute("""INSERT INTO affiliations
294 (node_id, entity_id, affiliation) 313 (node_id, entity_id, affiliation)
295 SELECT %s, entity_id, 'owner' FROM 314 SELECT %s, entity_id, 'owner' FROM
306 #TODO: check that group are actually in roster 325 #TODO: check that group are actually in roster
307 cursor.execute("""INSERT INTO node_groups_authorized (node_id, groupname) 326 cursor.execute("""INSERT INTO node_groups_authorized (node_id, groupname)
308 VALUES (%s,%s)""" , (node_id, group)) 327 VALUES (%s,%s)""" , (node_id, group))
309 # XXX: affiliations can't be set on during node creation (at least not with XEP-0060 alone) 328 # XXX: affiliations can't be set on during node creation (at least not with XEP-0060 alone)
310 # so whitelist affiliations need to be done afterward 329 # so whitelist affiliations need to be done afterward
330
331 # no we may have to do extra things according to config options
332 default_conf = self.defaultConfig['leaf']
333 # XXX: trigger works on node creation because OPT_SERIAL_IDS is False in defaultConfig
334 # if this value is changed, the _configurationTriggers method should be adapted.
335 Node._configurationTriggers(cursor, node_id, default_conf, config)
311 336
312 def deleteNodeByDbId(self, db_id): 337 def deleteNodeByDbId(self, db_id):
313 """Delete a node using directly its database id""" 338 """Delete a node using directly its database id"""
314 return self.dbpool.runInteraction(self._deleteNodeByDbId, db_id) 339 return self.dbpool.runInteraction(self._deleteNodeByDbId, db_id)
315 340
446 return d 471 return d
447 472
448 def getConfiguration(self): 473 def getConfiguration(self):
449 return self._config 474 return self._config
450 475
476 def getNextId(self):
477 """return XMPP item id usable for next item to publish
478
479 the return value will be next int if serila_ids is set,
480 else an UUID will be returned
481 """
482 if self._config[const.OPT_SERIAL_IDS]:
483 d = self.dbpool.runQuery("SELECT nextval('{seq_name}')".format(
484 seq_name = ITEMS_SEQ_NAME.format(node_id=self.nodeDbId)))
485 d.addCallback(lambda rows: unicode(rows[0][0]))
486 return d
487 else:
488 return defer.succeed(unicode(uuid.uuid4()))
489
490 @staticmethod
491 def _configurationTriggers(cursor, node_id, old_config, new_config):
492 """trigger database relative actions needed when a config is changed
493
494 @param cursor(): current db cursor
495 @param node_id(unicode): database ID of the node
496 @param old_config(dict): config of the node before the change
497 @param new_config(dict): new options that will be changed
498 """
499 serial_ids = new_config[const.OPT_SERIAL_IDS]
500 if serial_ids != old_config[const.OPT_SERIAL_IDS]:
501 # serial_ids option has been modified,
502 # we need to handle corresponding sequence
503
504 # XXX: we use .format in following queries because values
505 # are generated by ourself
506 seq_name = ITEMS_SEQ_NAME.format(node_id=node_id)
507 if serial_ids:
508 # the next query get the max value +1 of all XMPP items ids
509 # which are integers, and default to 1
510 cursor.execute(NEXT_ITEM_ID_QUERY.format(node_id=node_id))
511 next_val = cursor.fetchone()[0]
512 cursor.execute("DROP SEQUENCE IF EXISTS {seq_name}".format(seq_name = seq_name))
513 cursor.execute("CREATE SEQUENCE {seq_name} START {next_val} OWNED BY nodes.node_id".format(
514 seq_name = seq_name,
515 next_val = next_val))
516 else:
517 cursor.execute("DROP SEQUENCE IF EXISTS {seq_name}".format(seq_name = seq_name))
518
451 def setConfiguration(self, options): 519 def setConfiguration(self, options):
452 config = copy.copy(self._config) 520 config = copy.copy(self._config)
453 521
454 for option in options: 522 for option in options:
455 if option in config: 523 if option in config:
459 d.addCallback(self._setCachedConfiguration, config) 527 d.addCallback(self._setCachedConfiguration, config)
460 return d 528 return d
461 529
462 def _setConfiguration(self, cursor, config): 530 def _setConfiguration(self, cursor, config):
463 self._checkNodeExists(cursor) 531 self._checkNodeExists(cursor)
532 self._configurationTriggers(cursor, self.nodeDbId, self._config, config)
464 cursor.execute("""UPDATE nodes SET persist_items=%s, 533 cursor.execute("""UPDATE nodes SET persist_items=%s,
465 deliver_payloads=%s, 534 deliver_payloads=%s,
466 send_last_published_item=%s, 535 send_last_published_item=%s,
467 access_model=%s, 536 access_model=%s,
468 publish_model=%s 537 publish_model=%s,
538 serial_ids=%s
469 WHERE node_id=%s""", 539 WHERE node_id=%s""",
470 (config[const.OPT_PERSIST_ITEMS], 540 (config[const.OPT_PERSIST_ITEMS],
471 config[const.OPT_DELIVER_PAYLOADS], 541 config[const.OPT_DELIVER_PAYLOADS],
472 config[const.OPT_SEND_LAST_PUBLISHED_ITEM], 542 config[const.OPT_SEND_LAST_PUBLISHED_ITEM],
473 config[const.OPT_ACCESS_MODEL], 543 config[const.OPT_ACCESS_MODEL],
474 config[const.OPT_PUBLISH_MODEL], 544 config[const.OPT_PUBLISH_MODEL],
545 config[const.OPT_SERIAL_IDS],
475 self.nodeDbId)) 546 self.nodeDbId))
476 547
477 def _setCachedConfiguration(self, void, config): 548 def _setCachedConfiguration(self, void, config):
478 self._config = config 549 self._config = config
479 550
594 665
595 try: 666 try:
596 cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", 667 cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
597 (userhost,)) 668 (userhost,))
598 except cursor._pool.dbapi.IntegrityError: 669 except cursor._pool.dbapi.IntegrityError:
599 cursor._connection.rollback() 670 cursor.connection.rollback()
600 671
601 try: 672 try:
602 cursor.execute("""INSERT INTO subscriptions 673 cursor.execute("""INSERT INTO subscriptions
603 (node_id, entity_id, resource, state, 674 (node_id, entity_id, resource, state,
604 subscription_type, subscription_depth) 675 subscription_type, subscription_depth)
770 self._checkNodeExists(cursor) 841 self._checkNodeExists(cursor)
771 for item_data in items_data: 842 for item_data in items_data:
772 self._storeItem(cursor, item_data, publisher) 843 self._storeItem(cursor, item_data, publisher)
773 844
774 def _storeItem(self, cursor, item_data, publisher): 845 def _storeItem(self, cursor, item_data, publisher):
846 # first try to insert the item
847 # - if it fails (conflict), and the item is new and we have serial_ids options,
848 # current id will be recomputed using next item id query (note that is not perfect, as
849 # table is not locked and this can fail if two items are added at the same time
850 # but this can only happen with serial_ids and if future ids have been set by a client,
851 # this case should be rare enough to consider this situation acceptable)
852 # - if item insertion fail and the item is not new, we do an update
853 # - in other cases, exception is raised
775 item, access_model, item_config = item_data.item, item_data.access_model, item_data.config 854 item, access_model, item_config = item_data.item, item_data.access_model, item_data.config
776 data = item.toXml() 855 data = item.toXml()
777 856
778 cursor.execute("""UPDATE items SET date=now(), publisher=%s, data=%s 857 insert_query = """INSERT INTO items (node_id, item, publisher, data, access_model)
779 FROM nodes 858 SELECT %s, %s, %s, %s, %s FROM nodes
780 WHERE nodes.node_id = items.node_id AND 859 WHERE node_id=%s
781 nodes.node_id = %s and items.item=%s 860 RETURNING item_id"""
782 RETURNING item_id""", 861 insert_data = [self.nodeDbId,
783 (publisher.full(), 862 item["id"],
784 data, 863 publisher.full(),
785 self.nodeDbId, 864 data,
786 item["id"])) 865 access_model,
787 if cursor.rowcount == 1: 866 self.nodeDbId]
788 item_id = cursor.fetchone()[0]; 867
789 self._storeCategories(cursor, item_id, item_data.categories, update=True) 868 try:
790 return 869 cursor.execute(insert_query, insert_data)
791 870 except cursor._pool.dbapi.IntegrityError as e:
792 cursor.execute("""INSERT INTO items (node_id, item, publisher, data, access_model) 871 if e.pgcode != "23505":
793 SELECT %s, %s, %s, %s, %s FROM nodes 872 # we only handle unique_violation, every other exception must be raised
794 WHERE node_id=%s 873 raise e
795 RETURNING item_id""", 874 cursor.connection.rollback()
796 (self.nodeDbId, 875 # the item already exist
797 item["id"], 876 if item_data.new:
798 publisher.full(), 877 # the item is new
799 data, 878 if self._config[const.OPT_SERIAL_IDS]:
800 access_model, 879 # this can happen with serial_ids, if a item has been stored
801 self.nodeDbId)) 880 # with a future id (generated by XMPP client)
881 cursor.execute(NEXT_ITEM_ID_QUERY.format(node_id=self.nodeDbId))
882 next_id = cursor.fetchone()[0]
883 # we update the sequence, so we can skip conflicting ids
884 cursor.execute(u"SELECT setval('{seq_name}', %s)".format(
885 seq_name = ITEMS_SEQ_NAME.format(node_id=self.nodeDbId)), [next_id])
886 # and now we can retry the query with the new id
887 item['id'] = insert_data[1] = unicode(next_id)
888 # item saved in DB must also be updated with the new id
889 insert_data[3] = item.toXml()
890 cursor.execute(insert_query, insert_data)
891 else:
892 # but if we have not serial_ids, we have a real problem
893 raise e
894 else:
895 # this is an update
896 cursor.execute("""UPDATE items SET updated=now(), publisher=%s, data=%s
897 FROM nodes
898 WHERE nodes.node_id = items.node_id AND
899 nodes.node_id = %s and items.item=%s
900 RETURNING item_id""",
901 (publisher.full(),
902 data,
903 self.nodeDbId,
904 item["id"]))
905 if cursor.rowcount != 1:
906 raise exceptions.InternalError("item has not been updated correctly")
907 item_id = cursor.fetchone()[0];
908 self._storeCategories(cursor, item_id, item_data.categories, update=True)
909 return
802 910
803 item_id = cursor.fetchone()[0]; 911 item_id = cursor.fetchone()[0];
804 self._storeCategories(cursor, item_id, item_data.categories) 912 self._storeCategories(cursor, item_id, item_data.categories)
805 913
806 if access_model == const.VAL_AMODEL_PUBLISHER_ROSTER: 914 if access_model == const.VAL_AMODEL_PUBLISHER_ROSTER:
899 1007
900 # FILTERS 1008 # FILTERS
901 if 'filters' in ext_data: # MAM filters 1009 if 'filters' in ext_data: # MAM filters
902 for filter_ in ext_data['filters']: 1010 for filter_ in ext_data['filters']:
903 if filter_.var == 'start': 1011 if filter_.var == 'start':
904 query_filters.append("AND date>=%s") 1012 query_filters.append("AND created>=%s")
905 args.append(filter_.value) 1013 args.append(filter_.value)
906 elif filter_.var == 'end': 1014 elif filter_.var == 'end':
907 query_filters.append("AND date<=%s") 1015 query_filters.append("AND created<=%s")
908 args.append(filter_.value) 1016 args.append(filter_.value)
909 elif filter_.var == 'with': 1017 elif filter_.var == 'with':
910 jid_s = filter_.value 1018 jid_s = filter_.value
911 if '/' in jid_s: 1019 if '/' in jid_s:
912 query_filters.append("AND publisher=%s") 1020 query_filters.append("AND publisher=%s")
935 1043
936 # SELECT 1044 # SELECT
937 if ids_only: 1045 if ids_only:
938 query = ["SELECT item"] 1046 query = ["SELECT item"]
939 else: 1047 else:
940 query = ["SELECT data::text,items.access_model,item_id,date"] 1048 query = ["SELECT data::text,items.access_model,item_id,created,updated"]
941 1049
942 query_order = self._appendSourcesAndFilters(query, args, authorized_groups, unrestricted, ext_data) 1050 query_order = self._appendSourcesAndFilters(query, args, authorized_groups, unrestricted, ext_data)
943 1051
944 if 'rsm' in ext_data: 1052 if 'rsm' in ext_data:
945 rsm = ext_data['rsm'] 1053 rsm = ext_data['rsm']
987 ret = [] 1095 ret = []
988 for item_data in result: 1096 for item_data in result:
989 item = generic.stripNamespace(parseXml(item_data.data)) 1097 item = generic.stripNamespace(parseXml(item_data.data))
990 access_model = item_data.access_model 1098 access_model = item_data.access_model
991 item_id = item_data.item_id 1099 item_id = item_data.item_id
992 date = item_data.date 1100 created = item_data.created
1101 updated = item_data.updated
993 access_list = {} 1102 access_list = {}
994 if access_model == const.VAL_AMODEL_PUBLISHER_ROSTER: 1103 if access_model == const.VAL_AMODEL_PUBLISHER_ROSTER:
995 cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,)) 1104 cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,))
996 access_list[const.OPT_ROSTER_GROUPS_ALLOWED] = [r.groupname for r in cursor.fetchall()] 1105 access_list[const.OPT_ROSTER_GROUPS_ALLOWED] = [r.groupname for r in cursor.fetchall()]
997 1106
998 ret.append(container.ItemData(item, access_model, access_list, date=date)) 1107 ret.append(container.ItemData(item, access_model, access_list, created=created, updated=updated))
999 # TODO: whitelist item access model 1108 # TODO: whitelist item access model
1000 return ret 1109 return ret
1001 1110
1002 if ids_only: 1111 if ids_only:
1003 return [r.item for r in result] 1112 return [r.item for r in result]
1004 else: 1113 else:
1005 items_data = [container.ItemData(generic.stripNamespace(parseXml(r.data)), r.access_model, date=r.date) for r in result] 1114 items_data = [container.ItemData(generic.stripNamespace(parseXml(r.data)), r.access_model, created=r.created, updated=r.updated) for r in result]
1006 return items_data 1115 return items_data
1007 1116
1008 def getItemsById(self, authorized_groups, unrestricted, itemIdentifiers): 1117 def getItemsById(self, authorized_groups, unrestricted, itemIdentifiers):
1009 """Get items which are in the given list 1118 """Get items which are in the given list
1010 1119
1020 def _getItemsById(self, cursor, authorized_groups, unrestricted, itemIdentifiers): 1129 def _getItemsById(self, cursor, authorized_groups, unrestricted, itemIdentifiers):
1021 self._checkNodeExists(cursor) 1130 self._checkNodeExists(cursor)
1022 ret = [] 1131 ret = []
1023 if unrestricted: #we get everything without checking permissions 1132 if unrestricted: #we get everything without checking permissions
1024 for itemIdentifier in itemIdentifiers: 1133 for itemIdentifier in itemIdentifiers:
1025 cursor.execute("""SELECT data::text,items.access_model,item_id,date FROM nodes 1134 cursor.execute("""SELECT data::text,items.access_model,item_id,created,updated FROM nodes
1026 INNER JOIN items USING (node_id) 1135 INNER JOIN items USING (node_id)
1027 WHERE node_id=%s AND item=%s""", 1136 WHERE node_id=%s AND item=%s""",
1028 (self.nodeDbId, 1137 (self.nodeDbId,
1029 itemIdentifier)) 1138 itemIdentifier))
1030 result = cursor.fetchone() 1139 result = cursor.fetchone()
1032 raise error.ItemNotFound() 1141 raise error.ItemNotFound()
1033 1142
1034 item = generic.stripNamespace(parseXml(result[0])) 1143 item = generic.stripNamespace(parseXml(result[0]))
1035 access_model = result[1] 1144 access_model = result[1]
1036 item_id = result[2] 1145 item_id = result[2]
1037 date= result[3] 1146 created= result[3]
1147 updated= result[4]
1038 access_list = {} 1148 access_list = {}
1039 if access_model == const.VAL_AMODEL_PUBLISHER_ROSTER: 1149 if access_model == const.VAL_AMODEL_PUBLISHER_ROSTER:
1040 cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,)) 1150 cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,))
1041 access_list[const.OPT_ROSTER_GROUPS_ALLOWED] = [r[0] for r in cursor.fetchall()] 1151 access_list[const.OPT_ROSTER_GROUPS_ALLOWED] = [r[0] for r in cursor.fetchall()]
1042 #TODO: WHITELIST access_model 1152 #TODO: WHITELIST access_model
1043 1153
1044 ret.append(container.ItemData(item, access_model, access_list, date=date)) 1154 ret.append(container.ItemData(item, access_model, access_list, created=created, updated=updated))
1045 else: #we check permission before returning items 1155 else: #we check permission before returning items
1046 for itemIdentifier in itemIdentifiers: 1156 for itemIdentifier in itemIdentifiers:
1047 args = [self.nodeDbId, itemIdentifier] 1157 args = [self.nodeDbId, itemIdentifier]
1048 if authorized_groups: 1158 if authorized_groups:
1049 args.append(authorized_groups) 1159 args.append(authorized_groups)
1050 cursor.execute("""SELECT data::text, date FROM nodes 1160 cursor.execute("""SELECT data::text, created, updated FROM nodes
1051 INNER JOIN items USING (node_id) 1161 INNER JOIN items USING (node_id)
1052 LEFT JOIN item_groups_authorized USING (item_id) 1162 LEFT JOIN item_groups_authorized USING (item_id)
1053 WHERE node_id=%s AND item=%s AND 1163 WHERE node_id=%s AND item=%s AND
1054 (items.access_model='open' """ + 1164 (items.access_model='open' """ +
1055 ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') + ")", 1165 ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') + ")",
1056 args) 1166 args)
1057 1167
1058 result = cursor.fetchone() 1168 result = cursor.fetchone()
1059 if result: 1169 if result:
1060 ret.append(container.ItemData(generic.stripNamespace(parseXml(result[0])), date=result[1])) 1170 ret.append(container.ItemData(generic.stripNamespace(parseXml(result[0])), created=result[1], updated=result[2]))
1061 1171
1062 return ret 1172 return ret
1063 1173
1064 def getItemsCount(self, authorized_groups, unrestricted, ext_data=None): 1174 def getItemsCount(self, authorized_groups, unrestricted, ext_data=None):
1065 """Count expected number of items in a getItems query 1175 """Count expected number of items in a getItems query