comparison sat_pubsub/pgsql_storage.py @ 276:b757c29b20d7

import changes from idavoll changeset 233 (24be3a11ddbc), by Ralph Meijer and based on a patch by Nuno Santos
author souliane <souliane@mailoo.org>
date Mon, 13 Oct 2014 15:59:25 +0200
parents 232002e132db
children 8a71486c3e95
comparison
equal deleted inserted replaced
275:9c74cd2635f6 276:b757c29b20d7
172 raise error.NodeExists() 172 raise error.NodeExists()
173 173
174 cursor.execute("""SELECT node_id FROM nodes WHERE node=%s""", (nodeIdentifier,)); 174 cursor.execute("""SELECT node_id FROM nodes WHERE node=%s""", (nodeIdentifier,));
175 node_id = cursor.fetchone()[0] 175 node_id = cursor.fetchone()[0]
176 176
177 cursor.execute("""SELECT 1 from entities where jid=%s""", 177 cursor.execute("""SELECT 1 as bool from entities where jid=%s""",
178 (owner,)) 178 (owner,))
179 179
180 if not cursor.fetchone(): 180 if not cursor.fetchone():
181 # XXX: we can NOT rely on the previous query! Commit is needed now because 181 # XXX: we can NOT rely on the previous query! Commit is needed now because
182 # if the entry exists the next query will leave the database in a corrupted 182 # if the entry exists the next query will leave the database in a corrupted
381 NATURAL JOIN entities 381 NATURAL JOIN entities
382 WHERE node=%s AND jid=%s AND resource=%s""", 382 WHERE node=%s AND jid=%s AND resource=%s""",
383 (self.nodeIdentifier, 383 (self.nodeIdentifier,
384 userhost, 384 userhost,
385 resource)) 385 resource))
386
386 row = cursor.fetchone() 387 row = cursor.fetchone()
387 if not row: 388 if not row:
388 return None 389 return None
389 else: 390 else:
390 return Subscription(self.nodeIdentifier, subscriber, row[0]) 391 return Subscription(self.nodeIdentifier, subscriber, row[0])
400 query = """SELECT jid, resource, state, 401 query = """SELECT jid, resource, state,
401 subscription_type, subscription_depth 402 subscription_type, subscription_depth
402 FROM subscriptions 403 FROM subscriptions
403 NATURAL JOIN nodes 404 NATURAL JOIN nodes
404 NATURAL JOIN entities 405 NATURAL JOIN entities
405 WHERE node=%s"""; 406 WHERE node=%s"""
406 values = [self.nodeIdentifier] 407 values = [self.nodeIdentifier]
407 408
408 if state: 409 if state:
409 query += " AND state=%s" 410 query += " AND state=%s"
410 values.append(state) 411 values.append(state)
411 412
412 cursor.execute(query, values); 413 cursor.execute(query, values)
413 rows = cursor.fetchall() 414 rows = cursor.fetchall()
414 415
415 subscriptions = [] 416 subscriptions = []
416 for row in rows: 417 for row in rows:
417 subscriber = jid.JID(u'%s/%s' % (row[0], row[1])) 418 subscriber = jid.JID(u'%s/%s' % (row[0], row[1]))
499 500
500 501
501 def _isSubscribed(self, cursor, entity): 502 def _isSubscribed(self, cursor, entity):
502 self._checkNodeExists(cursor) 503 self._checkNodeExists(cursor)
503 504
504 cursor.execute("""SELECT 1 FROM entities 505 cursor.execute("""SELECT 1 as bool FROM entities
505 NATURAL JOIN subscriptions 506 NATURAL JOIN subscriptions
506 NATURAL JOIN nodes 507 NATURAL JOIN nodes
507 WHERE entities.jid=%s 508 WHERE entities.jid=%s
508 AND node=%s AND state='subscribed'""", 509 AND node=%s AND state='subscribed'""",
509 (entity.userhost(), 510 (entity.userhost(),
759 """ 760 """
760 Count number of callbacks registered for a node. 761 Count number of callbacks registered for a node.
761 """ 762 """
762 cursor.execute("""SELECT count(*) FROM callbacks 763 cursor.execute("""SELECT count(*) FROM callbacks
763 WHERE service=%s and node=%s""", 764 WHERE service=%s and node=%s""",
764 service.full(), 765 (service.full(),
765 nodeIdentifier) 766 nodeIdentifier))
766 results = cursor.fetchall() 767 results = cursor.fetchall()
767 return results[0][0] 768 return results[0][0]
768 769
769 770
770 def addCallback(self, service, nodeIdentifier, callback): 771 def addCallback(self, service, nodeIdentifier, callback):
771 def interaction(cursor): 772 def interaction(cursor):
772 cursor.execute("""SELECT 1 FROM callbacks 773 cursor.execute("""SELECT 1 as bool FROM callbacks
773 WHERE service=%s and node=%s and uri=%s""", 774 WHERE service=%s and node=%s and uri=%s""",
774 service.full(), 775 (service.full(),
775 nodeIdentifier, 776 nodeIdentifier,
776 callback) 777 callback))
777 if cursor.fetchall(): 778 if cursor.fetchall():
778 return 779 return
779 780
780 cursor.execute("""INSERT INTO callbacks 781 cursor.execute("""INSERT INTO callbacks
781 (service, node, uri) VALUES 782 (service, node, uri) VALUES
782 (%s, %s, %s)""", 783 (%s, %s, %s)""",
783 service.full(), 784 (service.full(),
784 nodeIdentifier, 785 nodeIdentifier,
785 callback) 786 callback))
786 787
787 return self.dbpool.runInteraction(interaction) 788 return self.dbpool.runInteraction(interaction)
788 789
789 790
790 def removeCallback(self, service, nodeIdentifier, callback): 791 def removeCallback(self, service, nodeIdentifier, callback):
791 def interaction(cursor): 792 def interaction(cursor):
792 cursor.execute("""DELETE FROM callbacks 793 cursor.execute("""DELETE FROM callbacks
793 WHERE service=%s and node=%s and uri=%s""", 794 WHERE service=%s and node=%s and uri=%s""",
794 service.full(), 795 (service.full(),
795 nodeIdentifier, 796 nodeIdentifier,
796 callback) 797 callback))
797 798
798 if cursor.rowcount != 1: 799 if cursor.rowcount != 1:
799 raise error.NotSubscribed() 800 raise error.NotSubscribed()
800 801
801 last = not self._countCallbacks(cursor, service, nodeIdentifier) 802 last = not self._countCallbacks(cursor, service, nodeIdentifier)
805 806
806 def getCallbacks(self, service, nodeIdentifier): 807 def getCallbacks(self, service, nodeIdentifier):
807 def interaction(cursor): 808 def interaction(cursor):
808 cursor.execute("""SELECT uri FROM callbacks 809 cursor.execute("""SELECT uri FROM callbacks
809 WHERE service=%s and node=%s""", 810 WHERE service=%s and node=%s""",
810 service.full(), 811 (service.full(),
811 nodeIdentifier) 812 nodeIdentifier))
812 results = cursor.fetchall() 813 results = cursor.fetchall()
813 814
814 if not results: 815 if not results:
815 raise error.NoCallbacks() 816 raise error.NoCallbacks()
816 817