comparison idavoll/pgsql_storage.py @ 204:b4bf0a5ce50d

Implement storage facilities for the HTTP gateway. Author: ralphm. Fixes #12. One of the storage facilities is PostgreSQL based, providing persistence.
author Ralph Meijer <ralphm@ik.nu>
date Wed, 16 Jul 2008 06:38:32 +0000
parents 77c61e2b8c75
children 274a45d2a5ab
comparison
equal deleted inserted replaced
203:2c46e6664680 204:b4bf0a5ce50d
2 # See LICENSE for details. 2 # See LICENSE for details.
3 3
4 import copy 4 import copy
5 5
6 from zope.interface import implements 6 from zope.interface import implements
7
8 from twisted.enterprise import adbapi
9 from twisted.words.protocols.jabber import jid 7 from twisted.words.protocols.jabber import jid
10
11 from wokkel.generic import parseXml 8 from wokkel.generic import parseXml
12 9
13 from idavoll import error, iidavoll 10 from idavoll import error, iidavoll
14 11
15 class Storage: 12 class Storage:
16 13
17 implements(iidavoll.IStorage) 14 implements(iidavoll.IStorage)
18 15
19 def __init__(self, user, database, password=None, host=None, port=None): 16
20 self._dbpool = adbapi.ConnectionPool('pyPgSQL.PgSQL', 17 def __init__(self, dbpool):
21 user=user, 18 self.dbpool = dbpool
22 password=password,
23 database=database,
24 host=host,
25 port=port,
26 cp_reconnect=True,
27 client_encoding='utf-8'
28 )
29 19
30 20
31 def getNode(self, nodeIdentifier): 21 def getNode(self, nodeIdentifier):
32 return self._dbpool.runInteraction(self._getNode, nodeIdentifier) 22 return self.dbpool.runInteraction(self._getNode, nodeIdentifier)
33 23
34 24
35 def _getNode(self, cursor, nodeIdentifier): 25 def _getNode(self, cursor, nodeIdentifier):
36 configuration = {} 26 configuration = {}
37 cursor.execute("""SELECT persistent, deliver_payload, 27 cursor.execute("""SELECT persistent, deliver_payload,
46 cursor.fetchone() 36 cursor.fetchone()
47 except TypeError: 37 except TypeError:
48 raise error.NodeNotFound() 38 raise error.NodeNotFound()
49 else: 39 else:
50 node = LeafNode(nodeIdentifier, configuration) 40 node = LeafNode(nodeIdentifier, configuration)
51 node._dbpool = self._dbpool 41 node.dbpool = self.dbpool
52 return node 42 return node
53 43
54 44
55 def getNodeIds(self): 45 def getNodeIds(self):
56 d = self._dbpool.runQuery("""SELECT node from nodes""") 46 d = self.dbpool.runQuery("""SELECT node from nodes""")
57 d.addCallback(lambda results: [r[0] for r in results]) 47 d.addCallback(lambda results: [r[0] for r in results])
58 return d 48 return d
59 49
60 50
61 def createNode(self, nodeIdentifier, owner, config=None): 51 def createNode(self, nodeIdentifier, owner, config=None):
62 return self._dbpool.runInteraction(self._createNode, nodeIdentifier, 52 return self.dbpool.runInteraction(self._createNode, nodeIdentifier,
63 owner) 53 owner)
64 54
65 55
66 def _createNode(self, cursor, nodeIdentifier, owner): 56 def _createNode(self, cursor, nodeIdentifier, owner):
67 owner = owner.userhost() 57 owner = owner.userhost()
86 (SELECT id FROM entities WHERE jid=%s) AS e""", 76 (SELECT id FROM entities WHERE jid=%s) AS e""",
87 (nodeIdentifier, owner)) 77 (nodeIdentifier, owner))
88 78
89 79
90 def deleteNode(self, nodeIdentifier): 80 def deleteNode(self, nodeIdentifier):
91 return self._dbpool.runInteraction(self._deleteNode, nodeIdentifier) 81 return self.dbpool.runInteraction(self._deleteNode, nodeIdentifier)
92 82
93 83
94 def _deleteNode(self, cursor, nodeIdentifier): 84 def _deleteNode(self, cursor, nodeIdentifier):
95 cursor.execute("""DELETE FROM nodes WHERE node=%s""", 85 cursor.execute("""DELETE FROM nodes WHERE node=%s""",
96 (nodeIdentifier,)) 86 (nodeIdentifier,))
98 if cursor.rowcount != 1: 88 if cursor.rowcount != 1:
99 raise error.NodeNotFound() 89 raise error.NodeNotFound()
100 90
101 91
102 def getAffiliations(self, entity): 92 def getAffiliations(self, entity):
103 d = self._dbpool.runQuery("""SELECT node, affiliation FROM entities 93 d = self.dbpool.runQuery("""SELECT node, affiliation FROM entities
104 JOIN affiliations ON 94 JOIN affiliations ON
105 (affiliations.entity_id=entities.id) 95 (affiliations.entity_id=entities.id)
106 JOIN nodes ON 96 JOIN nodes ON
107 (nodes.id=affiliations.node_id) 97 (nodes.id=affiliations.node_id)
108 WHERE jid=%s""", 98 WHERE jid=%s""",
110 d.addCallback(lambda results: [tuple(r) for r in results]) 100 d.addCallback(lambda results: [tuple(r) for r in results])
111 return d 101 return d
112 102
113 103
114 def getSubscriptions(self, entity): 104 def getSubscriptions(self, entity):
115 d = self._dbpool.runQuery("""SELECT node, jid, resource, subscription 105 d = self.dbpool.runQuery("""SELECT node, jid, resource, subscription
116 FROM entities JOIN subscriptions ON 106 FROM entities JOIN subscriptions ON
117 (subscriptions.entity_id=entities.id) 107 (subscriptions.entity_id=entities.id)
118 JOIN nodes ON 108 JOIN nodes ON
119 (nodes.id=subscriptions.node_id) 109 (nodes.id=subscriptions.node_id)
120 WHERE jid=%s""", 110 WHERE jid=%s""",
160 150
161 for option in options: 151 for option in options:
162 if option in config: 152 if option in config:
163 config[option] = options[option] 153 config[option] = options[option]
164 154
165 d = self._dbpool.runInteraction(self._setConfiguration, config) 155 d = self.dbpool.runInteraction(self._setConfiguration, config)
166 d.addCallback(self._setCachedConfiguration, config) 156 d.addCallback(self._setCachedConfiguration, config)
167 return d 157 return d
168 158
169 159
170 def _setConfiguration(self, cursor, config): 160 def _setConfiguration(self, cursor, config):
187 config["pubsub#node_type"] = self.nodeType 177 config["pubsub#node_type"] = self.nodeType
188 return config 178 return config
189 179
190 180
191 def getAffiliation(self, entity): 181 def getAffiliation(self, entity):
192 return self._dbpool.runInteraction(self._getAffiliation, entity) 182 return self.dbpool.runInteraction(self._getAffiliation, entity)
193 183
194 184
195 def _getAffiliation(self, cursor, entity): 185 def _getAffiliation(self, cursor, entity):
196 self._checkNodeExists(cursor) 186 self._checkNodeExists(cursor)
197 cursor.execute("""SELECT affiliation FROM affiliations 187 cursor.execute("""SELECT affiliation FROM affiliations
206 except TypeError: 196 except TypeError:
207 return None 197 return None
208 198
209 199
210 def getSubscription(self, subscriber): 200 def getSubscription(self, subscriber):
211 return self._dbpool.runInteraction(self._getSubscription, subscriber) 201 return self.dbpool.runInteraction(self._getSubscription, subscriber)
212 202
213 203
214 def _getSubscription(self, cursor, subscriber): 204 def _getSubscription(self, cursor, subscriber):
215 self._checkNodeExists(cursor) 205 self._checkNodeExists(cursor)
216 206
230 except TypeError: 220 except TypeError:
231 return None 221 return None
232 222
233 223
234 def addSubscription(self, subscriber, state): 224 def addSubscription(self, subscriber, state):
235 return self._dbpool.runInteraction(self._addSubscription, subscriber, 225 return self.dbpool.runInteraction(self._addSubscription, subscriber,
236 state) 226 state)
237 227
238 228
239 def _addSubscription(self, cursor, subscriber, state): 229 def _addSubscription(self, cursor, subscriber, state):
240 self._checkNodeExists(cursor) 230 self._checkNodeExists(cursor)
262 except cursor._pool.dbapi.OperationalError: 252 except cursor._pool.dbapi.OperationalError:
263 raise error.SubscriptionExists() 253 raise error.SubscriptionExists()
264 254
265 255
266 def removeSubscription(self, subscriber): 256 def removeSubscription(self, subscriber):
267 return self._dbpool.runInteraction(self._removeSubscription, 257 return self.dbpool.runInteraction(self._removeSubscription,
268 subscriber) 258 subscriber)
269 259
270 260
271 def _removeSubscription(self, cursor, subscriber): 261 def _removeSubscription(self, cursor, subscriber):
272 self._checkNodeExists(cursor) 262 self._checkNodeExists(cursor)
286 276
287 return None 277 return None
288 278
289 279
290 def getSubscribers(self): 280 def getSubscribers(self):
291 d = self._dbpool.runInteraction(self._getSubscribers) 281 d = self.dbpool.runInteraction(self._getSubscribers)
292 d.addCallback(self._convertToJIDs) 282 d.addCallback(self._convertToJIDs)
293 return d 283 return d
294 284
295 285
296 def _getSubscribers(self, cursor): 286 def _getSubscribers(self, cursor):
307 def _convertToJIDs(self, list): 297 def _convertToJIDs(self, list):
308 return [jid.internJID("%s/%s" % (l[0], l[1])) for l in list] 298 return [jid.internJID("%s/%s" % (l[0], l[1])) for l in list]
309 299
310 300
311 def isSubscribed(self, entity): 301 def isSubscribed(self, entity):
312 return self._dbpool.runInteraction(self._isSubscribed, entity) 302 return self.dbpool.runInteraction(self._isSubscribed, entity)
313 303
314 304
315 def _isSubscribed(self, cursor, entity): 305 def _isSubscribed(self, cursor, entity):
316 self._checkNodeExists(cursor) 306 self._checkNodeExists(cursor)
317 307
327 317
328 return cursor.fetchone() is not None 318 return cursor.fetchone() is not None
329 319
330 320
331 def getAffiliations(self): 321 def getAffiliations(self):
332 return self._dbpool.runInteraction(self._getAffiliations) 322 return self.dbpool.runInteraction(self._getAffiliations)
333 323
334 324
335 def _getAffiliations(self, cursor): 325 def _getAffiliations(self, cursor):
336 self._checkNodeExists(cursor) 326 self._checkNodeExists(cursor)
337 327
351 class LeafNodeMixin: 341 class LeafNodeMixin:
352 342
353 nodeType = 'leaf' 343 nodeType = 'leaf'
354 344
355 def storeItems(self, items, publisher): 345 def storeItems(self, items, publisher):
356 return self._dbpool.runInteraction(self._storeItems, items, publisher) 346 return self.dbpool.runInteraction(self._storeItems, items, publisher)
357 347
358 348
359 def _storeItems(self, cursor, items, publisher): 349 def _storeItems(self, cursor, items, publisher):
360 self._checkNodeExists(cursor) 350 self._checkNodeExists(cursor)
361 for item in items: 351 for item in items:
382 data, 372 data,
383 self.nodeIdentifier)) 373 self.nodeIdentifier))
384 374
385 375
386 def removeItems(self, itemIdentifiers): 376 def removeItems(self, itemIdentifiers):
387 return self._dbpool.runInteraction(self._removeItems, itemIdentifiers) 377 return self.dbpool.runInteraction(self._removeItems, itemIdentifiers)
388 378
389 379
390 def _removeItems(self, cursor, itemIdentifiers): 380 def _removeItems(self, cursor, itemIdentifiers):
391 self._checkNodeExists(cursor) 381 self._checkNodeExists(cursor)
392 382
404 394
405 return deleted 395 return deleted
406 396
407 397
408 def getItems(self, maxItems=None): 398 def getItems(self, maxItems=None):
409 return self._dbpool.runInteraction(self._getItems, maxItems) 399 return self.dbpool.runInteraction(self._getItems, maxItems)
410 400
411 401
412 def _getItems(self, cursor, maxItems): 402 def _getItems(self, cursor, maxItems):
413 self._checkNodeExists(cursor) 403 self._checkNodeExists(cursor)
414 query = """SELECT data FROM nodes JOIN items ON 404 query = """SELECT data FROM nodes JOIN items ON
424 result = cursor.fetchall() 414 result = cursor.fetchall()
425 return [parseXml(r[0]) for r in result] 415 return [parseXml(r[0]) for r in result]
426 416
427 417
428 def getItemsById(self, itemIdentifiers): 418 def getItemsById(self, itemIdentifiers):
429 return self._dbpool.runInteraction(self._getItemsById, itemIdentifiers) 419 return self.dbpool.runInteraction(self._getItemsById, itemIdentifiers)
430 420
431 421
432 def _getItemsById(self, cursor, itemIdentifiers): 422 def _getItemsById(self, cursor, itemIdentifiers):
433 self._checkNodeExists(cursor) 423 self._checkNodeExists(cursor)
434 items = [] 424 items = []
443 items.append(parseXml(result[0])) 433 items.append(parseXml(result[0]))
444 return items 434 return items
445 435
446 436
447 def purge(self): 437 def purge(self):
448 return self._dbpool.runInteraction(self._purge) 438 return self.dbpool.runInteraction(self._purge)
449 439
450 440
451 def _purge(self, cursor): 441 def _purge(self, cursor):
452 self._checkNodeExists(cursor) 442 self._checkNodeExists(cursor)
453 443
458 448
459 449
460 class LeafNode(Node, LeafNodeMixin): 450 class LeafNode(Node, LeafNodeMixin):
461 451
462 implements(iidavoll.ILeafNode) 452 implements(iidavoll.ILeafNode)
453
454
455
class GatewayStorage(object):
    """
    PostgreSQL based storage facility for the XMPP-HTTP gateway.

    Callback URIs are kept in the C{callbacks} table, keyed on the full JID
    of the publish-subscribe service and the node identifier.

    All public methods hand their work to C{dbpool.runInteraction} and
    return whatever that call returns (a deferred when C{dbpool} is a
    C{twisted.enterprise.adbapi.ConnectionPool}).
    """

    def __init__(self, dbpool):
        """
        @param dbpool: Database connection pool; it must provide
                       C{runInteraction}.
        """
        self.dbpool = dbpool


    def _countCallbacks(self, cursor, service, nodeIdentifier):
        """
        Count number of callbacks registered for a node.

        @param cursor: Cursor of the running interaction.
        @param service: Service entity; its C{full()} value is the key.
        @param nodeIdentifier: Identifier of the node.
        @return: The number of matching rows in C{callbacks}.
        """
        # Bind values go in a single sequence, as DB-API 2.0 requires.
        cursor.execute("""SELECT count(*) FROM callbacks
                          WHERE service=%s and node=%s""",
                       (service.full(),
                        nodeIdentifier))
        results = cursor.fetchall()
        return results[0][0]


    def addCallback(self, service, nodeIdentifier, callback):
        """
        Register a callback URI for a node.

        @param service: Service entity; its C{full()} value is the key.
        @param nodeIdentifier: Identifier of the node.
        @param callback: The callback URI to store.
        @raise error.SubscriptionExists: (from within the interaction) if
            the same URI is already registered for this service and node.
        """
        def interaction(cursor):
            params = (service.full(), nodeIdentifier, callback)

            cursor.execute("""SELECT 1 FROM callbacks
                              WHERE service=%s and node=%s and uri=%s""",
                           params)
            if cursor.fetchall():
                raise error.SubscriptionExists()

            cursor.execute("""INSERT INTO callbacks
                              (service, node, uri) VALUES
                              (%s, %s, %s)""",
                           params)

        return self.dbpool.runInteraction(interaction)


    def removeCallback(self, service, nodeIdentifier, callback):
        """
        Remove a callback URI from a node.

        @param service: Service entity; its C{full()} value is the key.
        @param nodeIdentifier: Identifier of the node.
        @param callback: The callback URI to remove.
        @return: Result of the interaction: C{True} if no callbacks remain
            for this service and node after the removal, C{False} otherwise.
        @raise error.NotSubscribed: (from within the interaction) if the
            delete did not affect exactly one row.
        """
        def interaction(cursor):
            cursor.execute("""DELETE FROM callbacks
                              WHERE service=%s and node=%s and uri=%s""",
                           (service.full(),
                            nodeIdentifier,
                            callback))

            if cursor.rowcount != 1:
                raise error.NotSubscribed()

            # Report whether the callback just removed was the last one.
            last = not self._countCallbacks(cursor, service, nodeIdentifier)
            return last

        return self.dbpool.runInteraction(interaction)


    def getCallbacks(self, service, nodeIdentifier):
        """
        Retrieve the callback URIs registered for a node.

        @param service: Service entity; its C{full()} value is the key.
        @param nodeIdentifier: Identifier of the node.
        @return: Result of the interaction: a list of callback URIs.
        @raise error.NoCallbacks: (from within the interaction) if nothing
            is registered for this service and node.
        """
        def interaction(cursor):
            cursor.execute("""SELECT uri FROM callbacks
                              WHERE service=%s and node=%s""",
                           (service.full(),
                            nodeIdentifier))
            results = cursor.fetchall()

            if not results:
                raise error.NoCallbacks()

            return [result[0] for result in results]

        return self.dbpool.runInteraction(interaction)


    def hasCallbacks(self, service, nodeIdentifier):
        """
        Tell whether any callback is registered for a node.

        @param service: Service entity; its C{full()} value is the key.
        @param nodeIdentifier: Identifier of the node.
        @return: Result of the interaction: C{True} if at least one callback
            is registered, C{False} otherwise.
        """
        def interaction(cursor):
            return bool(self._countCallbacks(cursor, service, nodeIdentifier))

        return self.dbpool.runInteraction(interaction)