Mercurial > libervia-pubsub
comparison sat_pubsub/pgsql_storage.py @ 262:7b821432d012
fixed node auto-create feature
author | souliane <souliane@mailoo.org> |
---|---|
date | Fri, 06 Dec 2013 00:37:08 +0100 |
parents | f0cd02c032b3 |
children | 9dfd3890e646 |
Comparison legend: equal | deleted | inserted | replaced
261:65d4fed44edf | 262:7b821432d012 |
---|---|
50 OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION | 50 OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION |
51 WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. | 51 WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. |
52 | 52 |
53 """ | 53 """ |
54 | 54 |
55 import copy | 55 import copy, logging |
56 | 56 |
57 from zope.interface import implements | 57 from zope.interface import implements |
58 | 58 |
59 from twisted.internet import defer | 59 from twisted.internet import defer |
60 from twisted.words.protocols.jabber import jid | 60 from twisted.words.protocols.jabber import jid |
61 | 61 |
62 from wokkel.generic import parseXml, stripNamespace | 62 from wokkel.generic import parseXml, stripNamespace |
63 from wokkel.pubsub import Subscription | 63 from wokkel.pubsub import Subscription |
64 | 64 |
65 from sat_pubsub import error, iidavoll, const | 65 from sat_pubsub import error, iidavoll, const |
66 import psycopg2 | |
66 | 67 |
67 class Storage: | 68 class Storage: |
68 | 69 |
69 implements(iidavoll.IStorage) | 70 implements(iidavoll.IStorage) |
70 | 71 |
169 | 170 |
170 cursor.execute("""SELECT 1 from entities where jid=%s""", | 171 cursor.execute("""SELECT 1 from entities where jid=%s""", |
171 (owner,)) | 172 (owner,)) |
172 | 173 |
173 if not cursor.fetchone(): | 174 if not cursor.fetchone(): |
174 cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", | 175 # XXX: we can NOT rely on the previous query! Commit is needed now because |
175 (owner,)) | 176 # if the entry exists the next query will leave the database in a corrupted |
177 # state: the solution is to rollback. I tried with other methods like | |
178 # "WHERE NOT EXISTS" but none of them worked, so the following solution | |
179 # looks like the sole - unless you have auto-commit on. More info | |
180 # about this issue: http://cssmay.com/question/tag/tag-psycopg2 | |
181 cursor._connection.commit() | |
182 try: | |
183 cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", | |
184 (owner,)) | |
185 except psycopg2.IntegrityError as e: | |
186 cursor._connection.rollback() | |
187 logging.warning("during node creation: %s" % e.message) | |
176 | 188 |
177 cursor.execute("""INSERT INTO affiliations | 189 cursor.execute("""INSERT INTO affiliations |
178 (node_id, entity_id, affiliation) | 190 (node_id, entity_id, affiliation) |
179 SELECT %s, entity_id, 'owner' FROM | 191 SELECT %s, entity_id, 'owner' FROM |
180 (SELECT entity_id FROM entities | 192 (SELECT entity_id FROM entities |
529 | 541 |
530 | 542 |
531 def _storeItem(self, cursor, item_datum, publisher): | 543 def _storeItem(self, cursor, item_datum, publisher): |
532 access_model, item_config, item = item_datum | 544 access_model, item_config, item = item_datum |
533 data = item.toXml() | 545 data = item.toXml() |
534 | 546 |
535 cursor.execute("""UPDATE items SET date=now(), publisher=%s, data=%s | 547 cursor.execute("""UPDATE items SET date=now(), publisher=%s, data=%s |
536 FROM nodes | 548 FROM nodes |
537 WHERE nodes.node_id = items.node_id AND | 549 WHERE nodes.node_id = items.node_id AND |
538 nodes.node = %s and items.item=%s""", | 550 nodes.node = %s and items.item=%s""", |
539 (publisher.full(), | 551 (publisher.full(), |
609 else: | 621 else: |
610 query = ["""SELECT data FROM nodes | 622 query = ["""SELECT data FROM nodes |
611 INNER JOIN items USING (node_id) | 623 INNER JOIN items USING (node_id) |
612 LEFT JOIN item_groups_authorized USING (item_id) | 624 LEFT JOIN item_groups_authorized USING (item_id) |
613 WHERE node=%s AND | 625 WHERE node=%s AND |
614 (items.access_model='open' """ + | 626 (items.access_model='open' """ + |
615 ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') + | 627 ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') + |
616 """) | 628 """) |
617 ORDER BY date DESC"""] | 629 ORDER BY date DESC"""] |
618 args = [self.nodeIdentifier] | 630 args = [self.nodeIdentifier] |
619 if authorized_groups: | 631 if authorized_groups: |
620 args.append(authorized_groups) | 632 args.append(authorized_groups) |
621 | 633 |
622 if maxItems: | 634 if maxItems: |
623 query.append("LIMIT %s") | 635 query.append("LIMIT %s") |
624 args.append(maxItems) | 636 args.append(maxItems) |
625 | 637 |
626 cursor.execute(' '.join(query), args) | 638 cursor.execute(' '.join(query), args) |
627 | 639 |
628 result = cursor.fetchall() | 640 result = cursor.fetchall() |
629 if unrestricted: | 641 if unrestricted: |
630 ret = [] | 642 ret = [] |
632 item = stripNamespace(parseXml(data[0])) | 644 item = stripNamespace(parseXml(data[0])) |
633 access_model = data[1] | 645 access_model = data[1] |
634 item_id = data[2] | 646 item_id = data[2] |
635 if access_model == 'roster': #TODO: jid access_model | 647 if access_model == 'roster': #TODO: jid access_model |
636 cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,)) | 648 cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,)) |
637 access_list = [r[0] for r in cursor.fetchall()] | 649 access_list = [r[0] for r in cursor.fetchall()] |
638 else: | 650 else: |
639 access_list = None | 651 access_list = None |
640 | 652 |
641 ret.append((item, access_model, access_list)) | 653 ret.append((item, access_model, access_list)) |
642 return ret | 654 return ret |
670 item = stripNamespace(parseXml(data[0])) | 682 item = stripNamespace(parseXml(data[0])) |
671 access_model = data[1] | 683 access_model = data[1] |
672 item_id = data[2] | 684 item_id = data[2] |
673 if access_model == 'roster': #TODO: jid access_model | 685 if access_model == 'roster': #TODO: jid access_model |
674 cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,)) | 686 cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,)) |
675 access_list = [r[0] for r in cursor.fetchall()] | 687 access_list = [r[0] for r in cursor.fetchall()] |
676 else: | 688 else: |
677 access_list = None | 689 access_list = None |
678 | 690 |
679 ret.append((item, access_model, access_list)) | 691 ret.append((item, access_model, access_list)) |
680 else: #we check permission before returning items | 692 else: #we check permission before returning items |
684 args.append(authorized_groups) | 696 args.append(authorized_groups) |
685 cursor.execute("""SELECT data FROM nodes | 697 cursor.execute("""SELECT data FROM nodes |
686 INNER JOIN items USING (node_id) | 698 INNER JOIN items USING (node_id) |
687 LEFT JOIN item_groups_authorized USING (item_id) | 699 LEFT JOIN item_groups_authorized USING (item_id) |
688 WHERE node=%s AND item=%s AND | 700 WHERE node=%s AND item=%s AND |
689 (items.access_model='open' """ + | 701 (items.access_model='open' """ + |
690 ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') + ")", | 702 ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') + ")", |
691 args) | 703 args) |
692 | 704 |
693 result = cursor.fetchone() | 705 result = cursor.fetchone() |
694 if result: | 706 if result: |
695 ret.append(parseXml(result[0])) | 707 ret.append(parseXml(result[0])) |
696 | 708 |
697 return ret | 709 return ret |
698 | 710 |
699 def purge(self): | 711 def purge(self): |
700 return self.dbpool.runInteraction(self._purge) | 712 return self.dbpool.runInteraction(self._purge) |
701 | 713 |