Mercurial > libervia-pubsub
comparison idavoll/pgsql_storage.py @ 227:8540825f85e0
Replaced unmaintained pyPgSQL with Psycopg 2
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 17 May 2012 00:31:36 +0200 |
parents | 274a45d2a5ab |
children |
comparison
equal
deleted
inserted
replaced
226:b7018ec56ee5 | 227:8540825f85e0 |
---|---|
49 row = cursor.fetchone() | 49 row = cursor.fetchone() |
50 | 50 |
51 if not row: | 51 if not row: |
52 raise error.NodeNotFound() | 52 raise error.NodeNotFound() |
53 | 53 |
54 if row.node_type == 'leaf': | 54 if row[0] == 'leaf': |
55 configuration = { | 55 configuration = { |
56 'pubsub#persist_items': row.persist_items, | 56 'pubsub#persist_items': row[1], |
57 'pubsub#deliver_payloads': row.deliver_payloads, | 57 'pubsub#deliver_payloads': row[2], |
58 'pubsub#send_last_published_item': | 58 'pubsub#send_last_published_item': |
59 row.send_last_published_item} | 59 row[3]} |
60 node = LeafNode(nodeIdentifier, configuration) | 60 node = LeafNode(nodeIdentifier, configuration) |
61 node.dbpool = self.dbpool | 61 node.dbpool = self.dbpool |
62 return node | 62 return node |
63 elif row.node_type == 'collection': | 63 elif row[0] == 'collection': |
64 configuration = { | 64 configuration = { |
65 'pubsub#deliver_payloads': row.deliver_payloads, | 65 'pubsub#deliver_payloads': row[2], |
66 'pubsub#send_last_published_item': | 66 'pubsub#send_last_published_item': |
67 row.send_last_published_item} | 67 row[3]} |
68 node = CollectionNode(nodeIdentifier, configuration) | 68 node = CollectionNode(nodeIdentifier, configuration) |
69 node.dbpool = self.dbpool | 69 node.dbpool = self.dbpool |
70 return node | 70 return node |
71 | 71 |
72 | 72 |
96 (nodeIdentifier, | 96 (nodeIdentifier, |
97 config['pubsub#persist_items'], | 97 config['pubsub#persist_items'], |
98 config['pubsub#deliver_payloads'], | 98 config['pubsub#deliver_payloads'], |
99 config['pubsub#send_last_published_item']) | 99 config['pubsub#send_last_published_item']) |
100 ) | 100 ) |
101 except cursor._pool.dbapi.OperationalError: | 101 except cursor._pool.dbapi.IntegrityError: |
102 raise error.NodeExists() | 102 raise error.NodeExists() |
103 | 103 |
104 cursor.execute("""SELECT 1 from entities where jid=%s""", | 104 cursor.execute("""SELECT 1 from entities where jid=%s""", |
105 (owner)) | 105 (owner,)) |
106 | 106 |
107 if not cursor.fetchone(): | 107 if not cursor.fetchone(): |
108 cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", | 108 cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", |
109 (owner)) | 109 (owner,)) |
110 | 110 |
111 cursor.execute("""INSERT INTO affiliations | 111 cursor.execute("""INSERT INTO affiliations |
112 (node_id, entity_id, affiliation) | 112 (node_id, entity_id, affiliation) |
113 SELECT node_id, entity_id, 'owner' FROM | 113 SELECT node_id, entity_id, 'owner' FROM |
114 (SELECT node_id FROM nodes WHERE node=%s) as n | 114 (SELECT node_id FROM nodes WHERE node=%s) as n |
142 | 142 |
143 def getSubscriptions(self, entity): | 143 def getSubscriptions(self, entity): |
144 def toSubscriptions(rows): | 144 def toSubscriptions(rows): |
145 subscriptions = [] | 145 subscriptions = [] |
146 for row in rows: | 146 for row in rows: |
147 subscriber = jid.internJID('%s/%s' % (row.jid, | 147 subscriber = jid.internJID('%s/%s' % (row[1], |
148 row.resource)) | 148 row[2])) |
149 subscription = Subscription(row.node, subscriber, row.state) | 149 subscription = Subscription(row[0], subscriber, row[3]) |
150 subscriptions.append(subscription) | 150 subscriptions.append(subscription) |
151 return subscriptions | 151 return subscriptions |
152 | 152 |
153 d = self.dbpool.runQuery("""SELECT node, jid, resource, state | 153 d = self.dbpool.runQuery("""SELECT node, jid, resource, state |
154 FROM entities | 154 FROM entities |
174 self._config = config | 174 self._config = config |
175 | 175 |
176 | 176 |
177 def _checkNodeExists(self, cursor): | 177 def _checkNodeExists(self, cursor): |
178 cursor.execute("""SELECT node_id FROM nodes WHERE node=%s""", | 178 cursor.execute("""SELECT node_id FROM nodes WHERE node=%s""", |
179 (self.nodeIdentifier)) | 179 (self.nodeIdentifier,)) |
180 if not cursor.fetchone(): | 180 if not cursor.fetchone(): |
181 raise error.NodeNotFound() | 181 raise error.NodeNotFound() |
182 | 182 |
183 | 183 |
184 def getType(self): | 184 def getType(self): |
261 resource)) | 261 resource)) |
262 row = cursor.fetchone() | 262 row = cursor.fetchone() |
263 if not row: | 263 if not row: |
264 return None | 264 return None |
265 else: | 265 else: |
266 return Subscription(self.nodeIdentifier, subscriber, row.state) | 266 return Subscription(self.nodeIdentifier, subscriber, row[0]) |
267 | 267 |
268 | 268 |
269 def getSubscriptions(self, state=None): | 269 def getSubscriptions(self, state=None): |
270 return self.dbpool.runInteraction(self._getSubscriptions, state) | 270 return self.dbpool.runInteraction(self._getSubscriptions, state) |
271 | 271 |
288 cursor.execute(query, values); | 288 cursor.execute(query, values); |
289 rows = cursor.fetchall() | 289 rows = cursor.fetchall() |
290 | 290 |
291 subscriptions = [] | 291 subscriptions = [] |
292 for row in rows: | 292 for row in rows: |
293 subscriber = jid.JID('%s/%s' % (row.jid, row.resource)) | 293 subscriber = jid.JID('%s/%s' % (row[0], row[1])) |
294 | 294 |
295 options = {} | 295 options = {} |
296 if row.subscription_type: | 296 if row[3]: |
297 options['pubsub#subscription_type'] = row.subscription_type; | 297 options['pubsub#subscription_type'] = row[3]; |
298 if row.subscription_depth: | 298 if row[4]: |
299 options['pubsub#subscription_depth'] = row.subscription_depth; | 299 options['pubsub#subscription_depth'] = row[4]; |
300 | 300 |
301 subscriptions.append(Subscription(self.nodeIdentifier, subscriber, | 301 subscriptions.append(Subscription(self.nodeIdentifier, subscriber, |
302 row.state, options)) | 302 row[2], options)) |
303 | 303 |
304 return subscriptions | 304 return subscriptions |
305 | 305 |
306 | 306 |
307 def addSubscription(self, subscriber, state, config): | 307 def addSubscription(self, subscriber, state, config): |
318 subscription_type = config.get('pubsub#subscription_type') | 318 subscription_type = config.get('pubsub#subscription_type') |
319 subscription_depth = config.get('pubsub#subscription_depth') | 319 subscription_depth = config.get('pubsub#subscription_depth') |
320 | 320 |
321 try: | 321 try: |
322 cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", | 322 cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", |
323 (userhost)) | 323 (userhost,)) |
324 except cursor._pool.dbapi.OperationalError: | 324 except cursor._pool.dbapi.IntegrityError: |
325 pass | 325 cursor._connection.rollback() |
326 | 326 |
327 try: | 327 try: |
328 cursor.execute("""INSERT INTO subscriptions | 328 cursor.execute("""INSERT INTO subscriptions |
329 (node_id, entity_id, resource, state, | 329 (node_id, entity_id, resource, state, |
330 subscription_type, subscription_depth) | 330 subscription_type, subscription_depth) |
338 state, | 338 state, |
339 subscription_type, | 339 subscription_type, |
340 subscription_depth, | 340 subscription_depth, |
341 self.nodeIdentifier, | 341 self.nodeIdentifier, |
342 userhost)) | 342 userhost)) |
343 except cursor._pool.dbapi.OperationalError: | 343 except cursor._pool.dbapi.IntegrityError: |
344 raise error.SubscriptionExists() | 344 raise error.SubscriptionExists() |
345 | 345 |
346 | 346 |
347 def removeSubscription(self, subscriber): | 347 def removeSubscription(self, subscriber): |
348 return self.dbpool.runInteraction(self._removeSubscription, | 348 return self.dbpool.runInteraction(self._removeSubscription, |
397 | 397 |
398 cursor.execute("""SELECT jid, affiliation FROM nodes | 398 cursor.execute("""SELECT jid, affiliation FROM nodes |
399 NATURAL JOIN affiliations | 399 NATURAL JOIN affiliations |
400 NATURAL JOIN entities | 400 NATURAL JOIN entities |
401 WHERE node=%s""", | 401 WHERE node=%s""", |
402 self.nodeIdentifier) | 402 (self.nodeIdentifier,)) |
403 result = cursor.fetchall() | 403 result = cursor.fetchall() |
404 | 404 |
405 return [(jid.internJID(r[0]), r[1]) for r in result] | 405 return [(jid.internJID(r[0]), r[1]) for r in result] |
406 | 406 |
407 | 407 |
479 if maxItems: | 479 if maxItems: |
480 cursor.execute(query + " LIMIT %s", | 480 cursor.execute(query + " LIMIT %s", |
481 (self.nodeIdentifier, | 481 (self.nodeIdentifier, |
482 maxItems)) | 482 maxItems)) |
483 else: | 483 else: |
484 cursor.execute(query, (self.nodeIdentifier)) | 484 cursor.execute(query, (self.nodeIdentifier,)) |
485 | 485 |
486 result = cursor.fetchall() | 486 result = cursor.fetchall() |
487 items = [stripNamespace(parseXml(r[0])) for r in result] | 487 items = [stripNamespace(parseXml(r[0])) for r in result] |
488 return items | 488 return items |
489 | 489 |