comparison sat_pubsub/pgsql_storage.py @ 262:7b821432d012

fixed node auto-create feature
author souliane <souliane@mailoo.org>
date Fri, 06 Dec 2013 00:37:08 +0100
parents f0cd02c032b3
children 9dfd3890e646
comparison
equal deleted inserted replaced
261:65d4fed44edf 262:7b821432d012
50 OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION 50 OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
51 WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 51 WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
52 52
53 """ 53 """
54 54
55 import copy 55 import copy, logging
56 56
57 from zope.interface import implements 57 from zope.interface import implements
58 58
59 from twisted.internet import defer 59 from twisted.internet import defer
60 from twisted.words.protocols.jabber import jid 60 from twisted.words.protocols.jabber import jid
61 61
62 from wokkel.generic import parseXml, stripNamespace 62 from wokkel.generic import parseXml, stripNamespace
63 from wokkel.pubsub import Subscription 63 from wokkel.pubsub import Subscription
64 64
65 from sat_pubsub import error, iidavoll, const 65 from sat_pubsub import error, iidavoll, const
66 import psycopg2
66 67
67 class Storage: 68 class Storage:
68 69
69 implements(iidavoll.IStorage) 70 implements(iidavoll.IStorage)
70 71
169 170
170 cursor.execute("""SELECT 1 from entities where jid=%s""", 171 cursor.execute("""SELECT 1 from entities where jid=%s""",
171 (owner,)) 172 (owner,))
172 173
173 if not cursor.fetchone(): 174 if not cursor.fetchone():
174 cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", 175 # XXX: we can NOT rely on the previous query! Commit is needed now because
175 (owner,)) 176 # if the entry exists the next query will leave the database in a corrupted
177 # state: the solution is to roll back. I tried other methods like
178 # "WHERE NOT EXISTS" but none of them worked, so the following solution
179 # looks like the only one - unless you have auto-commit on. More info
180 # about this issue: http://cssmay.com/question/tag/tag-psycopg2
181 cursor._connection.commit()
182 try:
183 cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
184 (owner,))
185 except psycopg2.IntegrityError as e:
186 cursor._connection.rollback()
187 logging.warning("during node creation: %s" % e.message)
176 188
177 cursor.execute("""INSERT INTO affiliations 189 cursor.execute("""INSERT INTO affiliations
178 (node_id, entity_id, affiliation) 190 (node_id, entity_id, affiliation)
179 SELECT %s, entity_id, 'owner' FROM 191 SELECT %s, entity_id, 'owner' FROM
180 (SELECT entity_id FROM entities 192 (SELECT entity_id FROM entities
529 541
530 542
531 def _storeItem(self, cursor, item_datum, publisher): 543 def _storeItem(self, cursor, item_datum, publisher):
532 access_model, item_config, item = item_datum 544 access_model, item_config, item = item_datum
533 data = item.toXml() 545 data = item.toXml()
534 546
535 cursor.execute("""UPDATE items SET date=now(), publisher=%s, data=%s 547 cursor.execute("""UPDATE items SET date=now(), publisher=%s, data=%s
536 FROM nodes 548 FROM nodes
537 WHERE nodes.node_id = items.node_id AND 549 WHERE nodes.node_id = items.node_id AND
538 nodes.node = %s and items.item=%s""", 550 nodes.node = %s and items.item=%s""",
539 (publisher.full(), 551 (publisher.full(),
609 else: 621 else:
610 query = ["""SELECT data FROM nodes 622 query = ["""SELECT data FROM nodes
611 INNER JOIN items USING (node_id) 623 INNER JOIN items USING (node_id)
612 LEFT JOIN item_groups_authorized USING (item_id) 624 LEFT JOIN item_groups_authorized USING (item_id)
613 WHERE node=%s AND 625 WHERE node=%s AND
614 (items.access_model='open' """ + 626 (items.access_model='open' """ +
615 ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') + 627 ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') +
616 """) 628 """)
617 ORDER BY date DESC"""] 629 ORDER BY date DESC"""]
618 args = [self.nodeIdentifier] 630 args = [self.nodeIdentifier]
619 if authorized_groups: 631 if authorized_groups:
620 args.append(authorized_groups) 632 args.append(authorized_groups)
621 633
622 if maxItems: 634 if maxItems:
623 query.append("LIMIT %s") 635 query.append("LIMIT %s")
624 args.append(maxItems) 636 args.append(maxItems)
625 637
626 cursor.execute(' '.join(query), args) 638 cursor.execute(' '.join(query), args)
627 639
628 result = cursor.fetchall() 640 result = cursor.fetchall()
629 if unrestricted: 641 if unrestricted:
630 ret = [] 642 ret = []
632 item = stripNamespace(parseXml(data[0])) 644 item = stripNamespace(parseXml(data[0]))
633 access_model = data[1] 645 access_model = data[1]
634 item_id = data[2] 646 item_id = data[2]
635 if access_model == 'roster': #TODO: jid access_model 647 if access_model == 'roster': #TODO: jid access_model
636 cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,)) 648 cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,))
637 access_list = [r[0] for r in cursor.fetchall()] 649 access_list = [r[0] for r in cursor.fetchall()]
638 else: 650 else:
639 access_list = None 651 access_list = None
640 652
641 ret.append((item, access_model, access_list)) 653 ret.append((item, access_model, access_list))
642 return ret 654 return ret
670 item = stripNamespace(parseXml(data[0])) 682 item = stripNamespace(parseXml(data[0]))
671 access_model = data[1] 683 access_model = data[1]
672 item_id = data[2] 684 item_id = data[2]
673 if access_model == 'roster': #TODO: jid access_model 685 if access_model == 'roster': #TODO: jid access_model
674 cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,)) 686 cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,))
675 access_list = [r[0] for r in cursor.fetchall()] 687 access_list = [r[0] for r in cursor.fetchall()]
676 else: 688 else:
677 access_list = None 689 access_list = None
678 690
679 ret.append((item, access_model, access_list)) 691 ret.append((item, access_model, access_list))
680 else: #we check permission before returning items 692 else: #we check permission before returning items
684 args.append(authorized_groups) 696 args.append(authorized_groups)
685 cursor.execute("""SELECT data FROM nodes 697 cursor.execute("""SELECT data FROM nodes
686 INNER JOIN items USING (node_id) 698 INNER JOIN items USING (node_id)
687 LEFT JOIN item_groups_authorized USING (item_id) 699 LEFT JOIN item_groups_authorized USING (item_id)
688 WHERE node=%s AND item=%s AND 700 WHERE node=%s AND item=%s AND
689 (items.access_model='open' """ + 701 (items.access_model='open' """ +
690 ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') + ")", 702 ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') + ")",
691 args) 703 args)
692 704
693 result = cursor.fetchone() 705 result = cursor.fetchone()
694 if result: 706 if result:
695 ret.append(parseXml(result[0])) 707 ret.append(parseXml(result[0]))
696 708
697 return ret 709 return ret
698 710
699 def purge(self): 711 def purge(self):
700 return self.dbpool.runInteraction(self._purge) 712 return self.dbpool.runInteraction(self._purge)
701 713