Mercurial > libervia-pubsub
comparison idavoll/pgsql_storage.py @ 204:b4bf0a5ce50d
Implement storage facilities for the HTTP gateway.
Author: ralphm.
Fixes #12.
One of the storage facilities is PostgreSQL-based and provides persistence.
author | Ralph Meijer <ralphm@ik.nu> |
---|---|
date | Wed, 16 Jul 2008 06:38:32 +0000 |
parents | 77c61e2b8c75 |
children | 274a45d2a5ab |
comparison
equal
deleted
inserted
replaced
203:2c46e6664680 | 204:b4bf0a5ce50d |
---|---|
2 # See LICENSE for details. | 2 # See LICENSE for details. |
3 | 3 |
4 import copy | 4 import copy |
5 | 5 |
6 from zope.interface import implements | 6 from zope.interface import implements |
7 | |
8 from twisted.enterprise import adbapi | |
9 from twisted.words.protocols.jabber import jid | 7 from twisted.words.protocols.jabber import jid |
10 | |
11 from wokkel.generic import parseXml | 8 from wokkel.generic import parseXml |
12 | 9 |
13 from idavoll import error, iidavoll | 10 from idavoll import error, iidavoll |
14 | 11 |
15 class Storage: | 12 class Storage: |
16 | 13 |
17 implements(iidavoll.IStorage) | 14 implements(iidavoll.IStorage) |
18 | 15 |
19 def __init__(self, user, database, password=None, host=None, port=None): | 16 |
20 self._dbpool = adbapi.ConnectionPool('pyPgSQL.PgSQL', | 17 def __init__(self, dbpool): |
21 user=user, | 18 self.dbpool = dbpool |
22 password=password, | |
23 database=database, | |
24 host=host, | |
25 port=port, | |
26 cp_reconnect=True, | |
27 client_encoding='utf-8' | |
28 ) | |
29 | 19 |
30 | 20 |
31 def getNode(self, nodeIdentifier): | 21 def getNode(self, nodeIdentifier): |
32 return self._dbpool.runInteraction(self._getNode, nodeIdentifier) | 22 return self.dbpool.runInteraction(self._getNode, nodeIdentifier) |
33 | 23 |
34 | 24 |
35 def _getNode(self, cursor, nodeIdentifier): | 25 def _getNode(self, cursor, nodeIdentifier): |
36 configuration = {} | 26 configuration = {} |
37 cursor.execute("""SELECT persistent, deliver_payload, | 27 cursor.execute("""SELECT persistent, deliver_payload, |
46 cursor.fetchone() | 36 cursor.fetchone() |
47 except TypeError: | 37 except TypeError: |
48 raise error.NodeNotFound() | 38 raise error.NodeNotFound() |
49 else: | 39 else: |
50 node = LeafNode(nodeIdentifier, configuration) | 40 node = LeafNode(nodeIdentifier, configuration) |
51 node._dbpool = self._dbpool | 41 node.dbpool = self.dbpool |
52 return node | 42 return node |
53 | 43 |
54 | 44 |
55 def getNodeIds(self): | 45 def getNodeIds(self): |
56 d = self._dbpool.runQuery("""SELECT node from nodes""") | 46 d = self.dbpool.runQuery("""SELECT node from nodes""") |
57 d.addCallback(lambda results: [r[0] for r in results]) | 47 d.addCallback(lambda results: [r[0] for r in results]) |
58 return d | 48 return d |
59 | 49 |
60 | 50 |
61 def createNode(self, nodeIdentifier, owner, config=None): | 51 def createNode(self, nodeIdentifier, owner, config=None): |
62 return self._dbpool.runInteraction(self._createNode, nodeIdentifier, | 52 return self.dbpool.runInteraction(self._createNode, nodeIdentifier, |
63 owner) | 53 owner) |
64 | 54 |
65 | 55 |
66 def _createNode(self, cursor, nodeIdentifier, owner): | 56 def _createNode(self, cursor, nodeIdentifier, owner): |
67 owner = owner.userhost() | 57 owner = owner.userhost() |
86 (SELECT id FROM entities WHERE jid=%s) AS e""", | 76 (SELECT id FROM entities WHERE jid=%s) AS e""", |
87 (nodeIdentifier, owner)) | 77 (nodeIdentifier, owner)) |
88 | 78 |
89 | 79 |
90 def deleteNode(self, nodeIdentifier): | 80 def deleteNode(self, nodeIdentifier): |
91 return self._dbpool.runInteraction(self._deleteNode, nodeIdentifier) | 81 return self.dbpool.runInteraction(self._deleteNode, nodeIdentifier) |
92 | 82 |
93 | 83 |
94 def _deleteNode(self, cursor, nodeIdentifier): | 84 def _deleteNode(self, cursor, nodeIdentifier): |
95 cursor.execute("""DELETE FROM nodes WHERE node=%s""", | 85 cursor.execute("""DELETE FROM nodes WHERE node=%s""", |
96 (nodeIdentifier,)) | 86 (nodeIdentifier,)) |
98 if cursor.rowcount != 1: | 88 if cursor.rowcount != 1: |
99 raise error.NodeNotFound() | 89 raise error.NodeNotFound() |
100 | 90 |
101 | 91 |
102 def getAffiliations(self, entity): | 92 def getAffiliations(self, entity): |
103 d = self._dbpool.runQuery("""SELECT node, affiliation FROM entities | 93 d = self.dbpool.runQuery("""SELECT node, affiliation FROM entities |
104 JOIN affiliations ON | 94 JOIN affiliations ON |
105 (affiliations.entity_id=entities.id) | 95 (affiliations.entity_id=entities.id) |
106 JOIN nodes ON | 96 JOIN nodes ON |
107 (nodes.id=affiliations.node_id) | 97 (nodes.id=affiliations.node_id) |
108 WHERE jid=%s""", | 98 WHERE jid=%s""", |
110 d.addCallback(lambda results: [tuple(r) for r in results]) | 100 d.addCallback(lambda results: [tuple(r) for r in results]) |
111 return d | 101 return d |
112 | 102 |
113 | 103 |
114 def getSubscriptions(self, entity): | 104 def getSubscriptions(self, entity): |
115 d = self._dbpool.runQuery("""SELECT node, jid, resource, subscription | 105 d = self.dbpool.runQuery("""SELECT node, jid, resource, subscription |
116 FROM entities JOIN subscriptions ON | 106 FROM entities JOIN subscriptions ON |
117 (subscriptions.entity_id=entities.id) | 107 (subscriptions.entity_id=entities.id) |
118 JOIN nodes ON | 108 JOIN nodes ON |
119 (nodes.id=subscriptions.node_id) | 109 (nodes.id=subscriptions.node_id) |
120 WHERE jid=%s""", | 110 WHERE jid=%s""", |
160 | 150 |
161 for option in options: | 151 for option in options: |
162 if option in config: | 152 if option in config: |
163 config[option] = options[option] | 153 config[option] = options[option] |
164 | 154 |
165 d = self._dbpool.runInteraction(self._setConfiguration, config) | 155 d = self.dbpool.runInteraction(self._setConfiguration, config) |
166 d.addCallback(self._setCachedConfiguration, config) | 156 d.addCallback(self._setCachedConfiguration, config) |
167 return d | 157 return d |
168 | 158 |
169 | 159 |
170 def _setConfiguration(self, cursor, config): | 160 def _setConfiguration(self, cursor, config): |
187 config["pubsub#node_type"] = self.nodeType | 177 config["pubsub#node_type"] = self.nodeType |
188 return config | 178 return config |
189 | 179 |
190 | 180 |
191 def getAffiliation(self, entity): | 181 def getAffiliation(self, entity): |
192 return self._dbpool.runInteraction(self._getAffiliation, entity) | 182 return self.dbpool.runInteraction(self._getAffiliation, entity) |
193 | 183 |
194 | 184 |
195 def _getAffiliation(self, cursor, entity): | 185 def _getAffiliation(self, cursor, entity): |
196 self._checkNodeExists(cursor) | 186 self._checkNodeExists(cursor) |
197 cursor.execute("""SELECT affiliation FROM affiliations | 187 cursor.execute("""SELECT affiliation FROM affiliations |
206 except TypeError: | 196 except TypeError: |
207 return None | 197 return None |
208 | 198 |
209 | 199 |
210 def getSubscription(self, subscriber): | 200 def getSubscription(self, subscriber): |
211 return self._dbpool.runInteraction(self._getSubscription, subscriber) | 201 return self.dbpool.runInteraction(self._getSubscription, subscriber) |
212 | 202 |
213 | 203 |
214 def _getSubscription(self, cursor, subscriber): | 204 def _getSubscription(self, cursor, subscriber): |
215 self._checkNodeExists(cursor) | 205 self._checkNodeExists(cursor) |
216 | 206 |
230 except TypeError: | 220 except TypeError: |
231 return None | 221 return None |
232 | 222 |
233 | 223 |
234 def addSubscription(self, subscriber, state): | 224 def addSubscription(self, subscriber, state): |
235 return self._dbpool.runInteraction(self._addSubscription, subscriber, | 225 return self.dbpool.runInteraction(self._addSubscription, subscriber, |
236 state) | 226 state) |
237 | 227 |
238 | 228 |
239 def _addSubscription(self, cursor, subscriber, state): | 229 def _addSubscription(self, cursor, subscriber, state): |
240 self._checkNodeExists(cursor) | 230 self._checkNodeExists(cursor) |
262 except cursor._pool.dbapi.OperationalError: | 252 except cursor._pool.dbapi.OperationalError: |
263 raise error.SubscriptionExists() | 253 raise error.SubscriptionExists() |
264 | 254 |
265 | 255 |
266 def removeSubscription(self, subscriber): | 256 def removeSubscription(self, subscriber): |
267 return self._dbpool.runInteraction(self._removeSubscription, | 257 return self.dbpool.runInteraction(self._removeSubscription, |
268 subscriber) | 258 subscriber) |
269 | 259 |
270 | 260 |
271 def _removeSubscription(self, cursor, subscriber): | 261 def _removeSubscription(self, cursor, subscriber): |
272 self._checkNodeExists(cursor) | 262 self._checkNodeExists(cursor) |
286 | 276 |
287 return None | 277 return None |
288 | 278 |
289 | 279 |
290 def getSubscribers(self): | 280 def getSubscribers(self): |
291 d = self._dbpool.runInteraction(self._getSubscribers) | 281 d = self.dbpool.runInteraction(self._getSubscribers) |
292 d.addCallback(self._convertToJIDs) | 282 d.addCallback(self._convertToJIDs) |
293 return d | 283 return d |
294 | 284 |
295 | 285 |
296 def _getSubscribers(self, cursor): | 286 def _getSubscribers(self, cursor): |
307 def _convertToJIDs(self, list): | 297 def _convertToJIDs(self, list): |
308 return [jid.internJID("%s/%s" % (l[0], l[1])) for l in list] | 298 return [jid.internJID("%s/%s" % (l[0], l[1])) for l in list] |
309 | 299 |
310 | 300 |
311 def isSubscribed(self, entity): | 301 def isSubscribed(self, entity): |
312 return self._dbpool.runInteraction(self._isSubscribed, entity) | 302 return self.dbpool.runInteraction(self._isSubscribed, entity) |
313 | 303 |
314 | 304 |
315 def _isSubscribed(self, cursor, entity): | 305 def _isSubscribed(self, cursor, entity): |
316 self._checkNodeExists(cursor) | 306 self._checkNodeExists(cursor) |
317 | 307 |
327 | 317 |
328 return cursor.fetchone() is not None | 318 return cursor.fetchone() is not None |
329 | 319 |
330 | 320 |
331 def getAffiliations(self): | 321 def getAffiliations(self): |
332 return self._dbpool.runInteraction(self._getAffiliations) | 322 return self.dbpool.runInteraction(self._getAffiliations) |
333 | 323 |
334 | 324 |
335 def _getAffiliations(self, cursor): | 325 def _getAffiliations(self, cursor): |
336 self._checkNodeExists(cursor) | 326 self._checkNodeExists(cursor) |
337 | 327 |
351 class LeafNodeMixin: | 341 class LeafNodeMixin: |
352 | 342 |
353 nodeType = 'leaf' | 343 nodeType = 'leaf' |
354 | 344 |
355 def storeItems(self, items, publisher): | 345 def storeItems(self, items, publisher): |
356 return self._dbpool.runInteraction(self._storeItems, items, publisher) | 346 return self.dbpool.runInteraction(self._storeItems, items, publisher) |
357 | 347 |
358 | 348 |
359 def _storeItems(self, cursor, items, publisher): | 349 def _storeItems(self, cursor, items, publisher): |
360 self._checkNodeExists(cursor) | 350 self._checkNodeExists(cursor) |
361 for item in items: | 351 for item in items: |
382 data, | 372 data, |
383 self.nodeIdentifier)) | 373 self.nodeIdentifier)) |
384 | 374 |
385 | 375 |
386 def removeItems(self, itemIdentifiers): | 376 def removeItems(self, itemIdentifiers): |
387 return self._dbpool.runInteraction(self._removeItems, itemIdentifiers) | 377 return self.dbpool.runInteraction(self._removeItems, itemIdentifiers) |
388 | 378 |
389 | 379 |
390 def _removeItems(self, cursor, itemIdentifiers): | 380 def _removeItems(self, cursor, itemIdentifiers): |
391 self._checkNodeExists(cursor) | 381 self._checkNodeExists(cursor) |
392 | 382 |
404 | 394 |
405 return deleted | 395 return deleted |
406 | 396 |
407 | 397 |
408 def getItems(self, maxItems=None): | 398 def getItems(self, maxItems=None): |
409 return self._dbpool.runInteraction(self._getItems, maxItems) | 399 return self.dbpool.runInteraction(self._getItems, maxItems) |
410 | 400 |
411 | 401 |
412 def _getItems(self, cursor, maxItems): | 402 def _getItems(self, cursor, maxItems): |
413 self._checkNodeExists(cursor) | 403 self._checkNodeExists(cursor) |
414 query = """SELECT data FROM nodes JOIN items ON | 404 query = """SELECT data FROM nodes JOIN items ON |
424 result = cursor.fetchall() | 414 result = cursor.fetchall() |
425 return [parseXml(r[0]) for r in result] | 415 return [parseXml(r[0]) for r in result] |
426 | 416 |
427 | 417 |
428 def getItemsById(self, itemIdentifiers): | 418 def getItemsById(self, itemIdentifiers): |
429 return self._dbpool.runInteraction(self._getItemsById, itemIdentifiers) | 419 return self.dbpool.runInteraction(self._getItemsById, itemIdentifiers) |
430 | 420 |
431 | 421 |
432 def _getItemsById(self, cursor, itemIdentifiers): | 422 def _getItemsById(self, cursor, itemIdentifiers): |
433 self._checkNodeExists(cursor) | 423 self._checkNodeExists(cursor) |
434 items = [] | 424 items = [] |
443 items.append(parseXml(result[0])) | 433 items.append(parseXml(result[0])) |
444 return items | 434 return items |
445 | 435 |
446 | 436 |
447 def purge(self): | 437 def purge(self): |
448 return self._dbpool.runInteraction(self._purge) | 438 return self.dbpool.runInteraction(self._purge) |
449 | 439 |
450 | 440 |
451 def _purge(self, cursor): | 441 def _purge(self, cursor): |
452 self._checkNodeExists(cursor) | 442 self._checkNodeExists(cursor) |
453 | 443 |
458 | 448 |
459 | 449 |
460 class LeafNode(Node, LeafNodeMixin): | 450 class LeafNode(Node, LeafNodeMixin): |
461 | 451 |
462 implements(iidavoll.ILeafNode) | 452 implements(iidavoll.ILeafNode) |
453 | |
454 | |
455 | |
456 class GatewayStorage(object): | |
457 """ | |
458 PostgreSQL-based storage facility for the XMPP-HTTP gateway. | |
459 """ | |
460 | |
461 def __init__(self, dbpool): | |
462 self.dbpool = dbpool | |
463 | |
464 | |
465 def _countCallbacks(self, cursor, service, nodeIdentifier): | |
466 """ | |
467 Count number of callbacks registered for a node. | |
468 """ | |
469 cursor.execute("""SELECT count(*) FROM callbacks | |
470 WHERE service=%s and node=%s""", | |
471 service.full(), | |
472 nodeIdentifier) | |
473 results = cursor.fetchall() | |
474 return results[0][0] | |
475 | |
476 | |
477 def addCallback(self, service, nodeIdentifier, callback): | |
478 def interaction(cursor): | |
479 cursor.execute("""SELECT 1 FROM callbacks | |
480 WHERE service=%s and node=%s and uri=%s""", | |
481 service.full(), | |
482 nodeIdentifier, | |
483 callback) | |
484 if cursor.fetchall(): | |
485 raise error.SubscriptionExists() | |
486 | |
487 cursor.execute("""INSERT INTO callbacks | |
488 (service, node, uri) VALUES | |
489 (%s, %s, %s)""", | |
490 service.full(), | |
491 nodeIdentifier, | |
492 callback) | |
493 | |
494 return self.dbpool.runInteraction(interaction) | |
495 | |
496 | |
497 def removeCallback(self, service, nodeIdentifier, callback): | |
498 def interaction(cursor): | |
499 cursor.execute("""DELETE FROM callbacks | |
500 WHERE service=%s and node=%s and uri=%s""", | |
501 service.full(), | |
502 nodeIdentifier, | |
503 callback) | |
504 | |
505 if cursor.rowcount != 1: | |
506 raise error.NotSubscribed() | |
507 | |
508 last = not self._countCallbacks(cursor, service, nodeIdentifier) | |
509 return last | |
510 | |
511 return self.dbpool.runInteraction(interaction) | |
512 | |
513 def getCallbacks(self, service, nodeIdentifier): | |
514 def interaction(cursor): | |
515 cursor.execute("""SELECT uri FROM callbacks | |
516 WHERE service=%s and node=%s""", | |
517 service.full(), | |
518 nodeIdentifier) | |
519 results = cursor.fetchall() | |
520 | |
521 if not results: | |
522 raise error.NoCallbacks() | |
523 | |
524 return [result[0] for result in results] | |
525 | |
526 return self.dbpool.runInteraction(interaction) | |
527 | |
528 | |
529 def hasCallbacks(self, service, nodeIdentifier): | |
530 def interaction(cursor): | |
531 return bool(self._countCallbacks(cursor, service, nodeIdentifier)) | |
532 | |
533 return self.dbpool.runInteraction(interaction) |