Mercurial > libervia-pubsub
comparison sat_pubsub/pgsql_storage.py @ 276:b757c29b20d7
import changes from idavoll changeset 233 (24be3a11ddbc), by Ralph Meijer and based on a patch by Nuno Santos
author | souliane <souliane@mailoo.org> |
---|---|
date | Mon, 13 Oct 2014 15:59:25 +0200 |
parents | 232002e132db |
children | 8a71486c3e95 |
comparison legend:
- equal
- deleted
- inserted
- replaced
275:9c74cd2635f6 | 276:b757c29b20d7 |
---|---|
172 raise error.NodeExists() | 172 raise error.NodeExists() |
173 | 173 |
174 cursor.execute("""SELECT node_id FROM nodes WHERE node=%s""", (nodeIdentifier,)); | 174 cursor.execute("""SELECT node_id FROM nodes WHERE node=%s""", (nodeIdentifier,)); |
175 node_id = cursor.fetchone()[0] | 175 node_id = cursor.fetchone()[0] |
176 | 176 |
177 cursor.execute("""SELECT 1 from entities where jid=%s""", | 177 cursor.execute("""SELECT 1 as bool from entities where jid=%s""", |
178 (owner,)) | 178 (owner,)) |
179 | 179 |
180 if not cursor.fetchone(): | 180 if not cursor.fetchone(): |
181 # XXX: we can NOT rely on the previous query! Commit is needed now because | 181 # XXX: we can NOT rely on the previous query! Commit is needed now because |
182 # if the entry exists the next query will leave the database in a corrupted | 182 # if the entry exists the next query will leave the database in a corrupted |
381 NATURAL JOIN entities | 381 NATURAL JOIN entities |
382 WHERE node=%s AND jid=%s AND resource=%s""", | 382 WHERE node=%s AND jid=%s AND resource=%s""", |
383 (self.nodeIdentifier, | 383 (self.nodeIdentifier, |
384 userhost, | 384 userhost, |
385 resource)) | 385 resource)) |
386 | |
386 row = cursor.fetchone() | 387 row = cursor.fetchone() |
387 if not row: | 388 if not row: |
388 return None | 389 return None |
389 else: | 390 else: |
390 return Subscription(self.nodeIdentifier, subscriber, row[0]) | 391 return Subscription(self.nodeIdentifier, subscriber, row[0]) |
400 query = """SELECT jid, resource, state, | 401 query = """SELECT jid, resource, state, |
401 subscription_type, subscription_depth | 402 subscription_type, subscription_depth |
402 FROM subscriptions | 403 FROM subscriptions |
403 NATURAL JOIN nodes | 404 NATURAL JOIN nodes |
404 NATURAL JOIN entities | 405 NATURAL JOIN entities |
405 WHERE node=%s"""; | 406 WHERE node=%s""" |
406 values = [self.nodeIdentifier] | 407 values = [self.nodeIdentifier] |
407 | 408 |
408 if state: | 409 if state: |
409 query += " AND state=%s" | 410 query += " AND state=%s" |
410 values.append(state) | 411 values.append(state) |
411 | 412 |
412 cursor.execute(query, values); | 413 cursor.execute(query, values) |
413 rows = cursor.fetchall() | 414 rows = cursor.fetchall() |
414 | 415 |
415 subscriptions = [] | 416 subscriptions = [] |
416 for row in rows: | 417 for row in rows: |
417 subscriber = jid.JID(u'%s/%s' % (row[0], row[1])) | 418 subscriber = jid.JID(u'%s/%s' % (row[0], row[1])) |
499 | 500 |
500 | 501 |
501 def _isSubscribed(self, cursor, entity): | 502 def _isSubscribed(self, cursor, entity): |
502 self._checkNodeExists(cursor) | 503 self._checkNodeExists(cursor) |
503 | 504 |
504 cursor.execute("""SELECT 1 FROM entities | 505 cursor.execute("""SELECT 1 as bool FROM entities |
505 NATURAL JOIN subscriptions | 506 NATURAL JOIN subscriptions |
506 NATURAL JOIN nodes | 507 NATURAL JOIN nodes |
507 WHERE entities.jid=%s | 508 WHERE entities.jid=%s |
508 AND node=%s AND state='subscribed'""", | 509 AND node=%s AND state='subscribed'""", |
509 (entity.userhost(), | 510 (entity.userhost(), |
759 """ | 760 """ |
760 Count number of callbacks registered for a node. | 761 Count number of callbacks registered for a node. |
761 """ | 762 """ |
762 cursor.execute("""SELECT count(*) FROM callbacks | 763 cursor.execute("""SELECT count(*) FROM callbacks |
763 WHERE service=%s and node=%s""", | 764 WHERE service=%s and node=%s""", |
764 service.full(), | 765 (service.full(), |
765 nodeIdentifier) | 766 nodeIdentifier)) |
766 results = cursor.fetchall() | 767 results = cursor.fetchall() |
767 return results[0][0] | 768 return results[0][0] |
768 | 769 |
769 | 770 |
770 def addCallback(self, service, nodeIdentifier, callback): | 771 def addCallback(self, service, nodeIdentifier, callback): |
771 def interaction(cursor): | 772 def interaction(cursor): |
772 cursor.execute("""SELECT 1 FROM callbacks | 773 cursor.execute("""SELECT 1 as bool FROM callbacks |
773 WHERE service=%s and node=%s and uri=%s""", | 774 WHERE service=%s and node=%s and uri=%s""", |
774 service.full(), | 775 (service.full(), |
775 nodeIdentifier, | 776 nodeIdentifier, |
776 callback) | 777 callback)) |
777 if cursor.fetchall(): | 778 if cursor.fetchall(): |
778 return | 779 return |
779 | 780 |
780 cursor.execute("""INSERT INTO callbacks | 781 cursor.execute("""INSERT INTO callbacks |
781 (service, node, uri) VALUES | 782 (service, node, uri) VALUES |
782 (%s, %s, %s)""", | 783 (%s, %s, %s)""", |
783 service.full(), | 784 (service.full(), |
784 nodeIdentifier, | 785 nodeIdentifier, |
785 callback) | 786 callback)) |
786 | 787 |
787 return self.dbpool.runInteraction(interaction) | 788 return self.dbpool.runInteraction(interaction) |
788 | 789 |
789 | 790 |
790 def removeCallback(self, service, nodeIdentifier, callback): | 791 def removeCallback(self, service, nodeIdentifier, callback): |
791 def interaction(cursor): | 792 def interaction(cursor): |
792 cursor.execute("""DELETE FROM callbacks | 793 cursor.execute("""DELETE FROM callbacks |
793 WHERE service=%s and node=%s and uri=%s""", | 794 WHERE service=%s and node=%s and uri=%s""", |
794 service.full(), | 795 (service.full(), |
795 nodeIdentifier, | 796 nodeIdentifier, |
796 callback) | 797 callback)) |
797 | 798 |
798 if cursor.rowcount != 1: | 799 if cursor.rowcount != 1: |
799 raise error.NotSubscribed() | 800 raise error.NotSubscribed() |
800 | 801 |
801 last = not self._countCallbacks(cursor, service, nodeIdentifier) | 802 last = not self._countCallbacks(cursor, service, nodeIdentifier) |
805 | 806 |
806 def getCallbacks(self, service, nodeIdentifier): | 807 def getCallbacks(self, service, nodeIdentifier): |
807 def interaction(cursor): | 808 def interaction(cursor): |
808 cursor.execute("""SELECT uri FROM callbacks | 809 cursor.execute("""SELECT uri FROM callbacks |
809 WHERE service=%s and node=%s""", | 810 WHERE service=%s and node=%s""", |
810 service.full(), | 811 (service.full(), |
811 nodeIdentifier) | 812 nodeIdentifier)) |
812 results = cursor.fetchall() | 813 results = cursor.fetchall() |
813 | 814 |
814 if not results: | 815 if not results: |
815 raise error.NoCallbacks() | 816 raise error.NoCallbacks() |
816 | 817 |