comparison sat_pubsub/pgsql_storage.py @ 232:923281d4c5bc

renamed idavoll directory to sat_pubsub
author Goffi <goffi@goffi.org>
date Thu, 17 May 2012 12:48:14 +0200
parents idavoll/pgsql_storage.py@8540825f85e0
children 564ae55219e1
comparison
equal deleted inserted replaced
231:d99047cd90f9 232:923281d4c5bc
1 # Copyright (c) 2003-2008 Ralph Meijer
2 # See LICENSE for details.
3
4 import copy
5
6 from zope.interface import implements
7
8 from twisted.enterprise import adbapi
9 from twisted.words.protocols.jabber import jid
10
11 from wokkel.generic import parseXml, stripNamespace
12 from wokkel.pubsub import Subscription
13
14 from idavoll import error, iidavoll
15
class Storage:
    """
    Node storage backed by a PostgreSQL database.

    Every query is issued through the connection pool passed to the
    constructor. The public methods that touch the database return the result
    of C{dbpool.runQuery} or C{dbpool.runInteraction}; the underscore-prefixed
    counterparts are the interaction bodies and receive a cursor as their
    first argument.
    """

    implements(iidavoll.IStorage)

    # Default node configuration, keyed by node type.
    defaultConfig = {
            'leaf': {
                "pubsub#persist_items": True,
                "pubsub#deliver_payloads": True,
                "pubsub#send_last_published_item": 'on_sub',
            },
            'collection': {
                "pubsub#deliver_payloads": True,
                "pubsub#send_last_published_item": 'on_sub',
            }
    }

    def __init__(self, dbpool):
        """
        @param dbpool: connection pool providing C{runQuery} and
            C{runInteraction}.
        """
        self.dbpool = dbpool


    def getNode(self, nodeIdentifier):
        """
        Look up a node by identifier.

        @return: the result of running L{_getNode} through the pool.
        """
        return self.dbpool.runInteraction(self._getNode, nodeIdentifier)


    def _getNode(self, cursor, nodeIdentifier):
        """
        Build the node object for the row in C{nodes} named C{nodeIdentifier}.

        @return: a L{LeafNode} or L{CollectionNode} with its C{dbpool}
            attribute set, or C{None} when the stored node type is neither
            C{'leaf'} nor C{'collection'}.
        @raise error.NodeNotFound: if no such row exists.
        """
        cursor.execute("""SELECT node_type,
                                 persist_items,
                                 deliver_payloads,
                                 send_last_published_item
                          FROM nodes
                          WHERE node=%s""",
                       (nodeIdentifier,))
        row = cursor.fetchone()

        if not row:
            raise error.NodeNotFound()

        nodeType = row[0]

        if nodeType == 'leaf':
            node = LeafNode(nodeIdentifier, {
                    'pubsub#persist_items': row[1],
                    'pubsub#deliver_payloads': row[2],
                    'pubsub#send_last_published_item': row[3]})
        elif nodeType == 'collection':
            # Collection nodes carry no persist_items option.
            node = CollectionNode(nodeIdentifier, {
                    'pubsub#deliver_payloads': row[2],
                    'pubsub#send_last_published_item': row[3]})
        else:
            # Unrecognised node type: no node object is produced.
            return None

        node.dbpool = self.dbpool
        return node


    def getNodeIds(self):
        """
        List the identifiers of all stored nodes.

        @return: the C{runQuery} result, with a callback attached that turns
            the rows into a list of their first column.
        """
        d = self.dbpool.runQuery("""SELECT node from nodes""")
        d.addCallback(lambda results: [r[0] for r in results])
        return d


    def createNode(self, nodeIdentifier, owner, config):
        """
        Create a node owned by C{owner}.

        @return: the result of running L{_createNode} through the pool.
        """
        return self.dbpool.runInteraction(self._createNode, nodeIdentifier,
                                          owner, config)


    def _createNode(self, cursor, nodeIdentifier, owner, config):
        """
        Insert the node row, make sure the owner has an entity row and record
        the owner affiliation.

        @raise error.NoCollections: if C{config['pubsub#node_type']} is not
            C{'leaf'}.
        @raise error.NodeExists: if inserting the node row fails with the
            driver's C{IntegrityError}.
        """
        if config['pubsub#node_type'] != 'leaf':
            raise error.NoCollections()

        # Entities are stored by bare JID.
        owner = owner.userhost()
        try:
            cursor.execute("""INSERT INTO nodes
                              (node, node_type, persist_items,
                               deliver_payloads, send_last_published_item)
                              VALUES
                              (%s, 'leaf', %s, %s, %s)""",
                           (nodeIdentifier,
                            config['pubsub#persist_items'],
                            config['pubsub#deliver_payloads'],
                            config['pubsub#send_last_published_item'])
                           )
        except cursor._pool.dbapi.IntegrityError:
            raise error.NodeExists()

        cursor.execute("""SELECT 1 from entities where jid=%s""",
                       (owner,))

        if not cursor.fetchone():
            cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
                           (owner,))

        cursor.execute("""INSERT INTO affiliations
                          (node_id, entity_id, affiliation)
                          SELECT node_id, entity_id, 'owner' FROM
                          (SELECT node_id FROM nodes WHERE node=%s) as n
                          CROSS JOIN
                          (SELECT entity_id FROM entities
                                            WHERE jid=%s) as e""",
                       (nodeIdentifier, owner))


    def deleteNode(self, nodeIdentifier):
        """
        Delete a node.

        @return: the result of running L{_deleteNode} through the pool.
        """
        return self.dbpool.runInteraction(self._deleteNode, nodeIdentifier)


    def _deleteNode(self, cursor, nodeIdentifier):
        """
        Delete the node row.

        @raise error.NodeNotFound: unless exactly one row was deleted.
        """
        cursor.execute("""DELETE FROM nodes WHERE node=%s""",
                       (nodeIdentifier,))

        if cursor.rowcount != 1:
            raise error.NodeNotFound()


    def getAffiliations(self, entity):
        """
        List the affiliations held by the bare JID of C{entity}.

        @return: the C{runQuery} result, with a callback attached that yields
            a list of C{(node, affiliation)} tuples.
        """
        d = self.dbpool.runQuery("""SELECT node, affiliation FROM entities
                                        NATURAL JOIN affiliations
                                        NATURAL JOIN nodes
                                        WHERE jid=%s""",
                                 (entity.userhost(),))
        d.addCallback(lambda results: [tuple(r) for r in results])
        return d


    def getSubscriptions(self, entity):
        """
        List the subscriptions held by the bare JID of C{entity}.

        @return: the C{runQuery} result, with a callback attached that yields
            a list of L{Subscription} objects.
        """
        def toSubscriptions(rows):
            # Row layout: node, jid, resource, state.
            return [Subscription(row[0],
                                 jid.internJID('%s/%s' % (row[1], row[2])),
                                 row[3])
                    for row in rows]

        d = self.dbpool.runQuery("""SELECT node, jid, resource, state
                                     FROM entities
                                     NATURAL JOIN subscriptions
                                     NATURAL JOIN nodes
                                     WHERE jid=%s""",
                                 (entity.userhost(),))
        d.addCallback(toSubscriptions)
        return d


    def getDefaultConfiguration(self, nodeType):
        """
        Return the default configuration for C{nodeType}.

        A shallow copy is returned so that callers can modify the result
        without altering the class-level defaults shared by all instances.

        @raise KeyError: if C{nodeType} has no entry in L{defaultConfig}.
        """
        return dict(self.defaultConfig[nodeType])
165
166
167
class Node:
    """
    Database-backed behaviour common to all node types.

    Methods use C{self.dbpool}, which is not set by the constructor, and
    C{self.nodeType}, which is not defined on this class; both have to be
    supplied from outside (by the creator of the instance and by subclasses
    respectively).
    """

    implements(iidavoll.INode)

    def __init__(self, nodeIdentifier, config):
        """
        @param nodeIdentifier: identifier of the node in the C{nodes} table.
        @param config: mapping of configuration option names to values.
        """
        self.nodeIdentifier = nodeIdentifier
        self._config = config


    def _checkNodeExists(self, cursor):
        """
        Verify that this node still has a row in C{nodes}.

        @raise error.NodeNotFound: if it does not.
        """
        cursor.execute("""SELECT node_id FROM nodes WHERE node=%s""",
                       (self.nodeIdentifier,))
        if cursor.fetchone():
            return
        raise error.NodeNotFound()


    def getType(self):
        """
        Return the C{nodeType} attribute.
        """
        return self.nodeType


    def getConfiguration(self):
        """
        Return the cached configuration mapping (not a copy).
        """
        return self._config


    def setConfiguration(self, options):
        """
        Store new values for already known configuration options.

        Options whose name is not present in the current configuration are
        ignored. The cached configuration is replaced through a callback
        added to the interaction result.

        @return: the result of C{dbpool.runInteraction}.
        """
        updated = copy.copy(self._config)

        for name in options:
            if name not in updated:
                continue
            updated[name] = options[name]

        d = self.dbpool.runInteraction(self._setConfiguration, updated)
        d.addCallback(self._setCachedConfiguration, updated)
        return d


    def _setConfiguration(self, cursor, config):
        """
        Write C{config} to this node's row.

        @raise error.NodeNotFound: if the node no longer exists.
        """
        # NOTE(review): 'pubsub#persist_items' is read unconditionally, but
        # the configuration Storage._getNode builds for collection nodes has
        # no such key -- confirm collection nodes never reach this path.
        self._checkNodeExists(cursor)
        values = (config["pubsub#persist_items"],
                  config["pubsub#deliver_payloads"],
                  config["pubsub#send_last_published_item"],
                  self.nodeIdentifier)
        cursor.execute("""UPDATE nodes SET persist_items=%s,
                                           deliver_payloads=%s,
                                           send_last_published_item=%s
                          WHERE node=%s""",
                       values)


    def _setCachedConfiguration(self, void, config):
        """
        Callback replacing the cached configuration; C{void} is ignored.
        """
        self._config = config


    def getMetaData(self):
        """
        Return a copy of the configuration with C{pubsub#node_type} added.
        """
        metaData = copy.copy(self._config)
        metaData["pubsub#node_type"] = self.nodeType
        return metaData


    def getAffiliation(self, entity):
        """
        Look up the affiliation of C{entity} with this node.

        @return: the result of running L{_getAffiliation} through the pool.
        """
        return self.dbpool.runInteraction(self._getAffiliation, entity)


    def _getAffiliation(self, cursor, entity):
        """
        Fetch the affiliation stored for the bare JID of C{entity}.

        @return: the affiliation value, or C{None} if there is no row.
        @raise error.NodeNotFound: if the node no longer exists.
        """
        self._checkNodeExists(cursor)
        cursor.execute("""SELECT affiliation FROM affiliations
                          NATURAL JOIN nodes
                          NATURAL JOIN entities
                          WHERE node=%s AND jid=%s""",
                       (self.nodeIdentifier,
                        entity.userhost()))

        row = cursor.fetchone()
        if row is None:
            return None
        return row[0]


    def getSubscription(self, subscriber):
        """
        Look up the subscription of C{subscriber} to this node.

        @return: the result of running L{_getSubscription} through the pool.
        """
        return self.dbpool.runInteraction(self._getSubscription, subscriber)


    def _getSubscription(self, cursor, subscriber):
        """
        Fetch the subscription matching the bare JID and resource of
        C{subscriber}; a missing resource is matched as the empty string.

        @return: a L{Subscription}, or C{None} if there is no row.
        @raise error.NodeNotFound: if the node no longer exists.
        """
        self._checkNodeExists(cursor)

        bareJID = subscriber.userhost()
        resourcePart = subscriber.resource or ''

        cursor.execute("""SELECT state FROM subscriptions
                          NATURAL JOIN nodes
                          NATURAL JOIN entities
                          WHERE node=%s AND jid=%s AND resource=%s""",
                       (self.nodeIdentifier,
                        bareJID,
                        resourcePart))
        row = cursor.fetchone()
        if row:
            return Subscription(self.nodeIdentifier, subscriber, row[0])
        return None


    def getSubscriptions(self, state=None):
        """
        List the subscriptions to this node, optionally filtered by state.

        @return: the result of running L{_getSubscriptions} through the pool.
        """
        return self.dbpool.runInteraction(self._getSubscriptions, state)


    def _getSubscriptions(self, cursor, state):
        """
        Fetch this node's subscriptions as a list of L{Subscription} objects.

        The state filter is only applied when C{state} is true. The
        subscription type and depth columns end up in the options of a
        subscription only when they hold a true value.

        @raise error.NodeNotFound: if the node no longer exists.
        """
        self._checkNodeExists(cursor)

        query = """SELECT jid, resource, state,
                          subscription_type, subscription_depth
                   FROM subscriptions
                   NATURAL JOIN nodes
                   NATURAL JOIN entities
                   WHERE node=%s"""
        values = [self.nodeIdentifier]

        if state:
            query += " AND state=%s"
            values.append(state)

        cursor.execute(query, values)

        found = []
        for row in cursor.fetchall():
            options = {}
            if row[3]:
                options['pubsub#subscription_type'] = row[3]
            if row[4]:
                options['pubsub#subscription_depth'] = row[4]

            subscriber = jid.JID('%s/%s' % (row[0], row[1]))
            found.append(Subscription(self.nodeIdentifier, subscriber,
                                      row[2], options))

        return found


    def addSubscription(self, subscriber, state, config):
        """
        Subscribe C{subscriber} to this node.

        @return: the result of running L{_addSubscription} through the pool.
        """
        return self.dbpool.runInteraction(self._addSubscription, subscriber,
                                          state, config)


    def _addSubscription(self, cursor, subscriber, state, config):
        """
        Insert an entity row for the subscriber's bare JID (if needed) and
        the subscription row itself.

        @raise error.NodeNotFound: if the node no longer exists.
        @raise error.SubscriptionExists: if inserting the subscription fails
            with the driver's C{IntegrityError}.
        """
        self._checkNodeExists(cursor)

        bareJID = subscriber.userhost()
        resourcePart = subscriber.resource or ''

        subscriptionType = config.get('pubsub#subscription_type')
        subscriptionDepth = config.get('pubsub#subscription_depth')

        integrityError = cursor._pool.dbapi.IntegrityError

        try:
            cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
                           (bareJID,))
        except integrityError:
            # The entity insert failed on an integrity constraint; the
            # connection is rolled back before carrying on.
            # NOTE(review): this reaches into the cursor's private
            # _connection attribute -- confirm this is still supported by
            # the pool implementation in use.
            cursor._connection.rollback()

        try:
            cursor.execute("""INSERT INTO subscriptions
                              (node_id, entity_id, resource, state,
                               subscription_type, subscription_depth)
                              SELECT node_id, entity_id, %s, %s, %s, %s FROM
                              (SELECT node_id FROM nodes
                                              WHERE node=%s) as n
                              CROSS JOIN
                              (SELECT entity_id FROM entities
                                                WHERE jid=%s) as e""",
                           (resourcePart,
                            state,
                            subscriptionType,
                            subscriptionDepth,
                            self.nodeIdentifier,
                            bareJID))
        except integrityError:
            raise error.SubscriptionExists()


    def removeSubscription(self, subscriber):
        """
        Unsubscribe C{subscriber} from this node.

        @return: the result of running L{_removeSubscription} through the
            pool.
        """
        return self.dbpool.runInteraction(self._removeSubscription,
                                          subscriber)


    def _removeSubscription(self, cursor, subscriber):
        """
        Delete the subscription row matching the subscriber's bare JID and
        resource; a missing resource is matched as the empty string.

        @return: C{None}.
        @raise error.NodeNotFound: if the node no longer exists.
        @raise error.NotSubscribed: unless exactly one row was deleted.
        """
        self._checkNodeExists(cursor)

        bareJID = subscriber.userhost()
        resourcePart = subscriber.resource or ''

        cursor.execute("""DELETE FROM subscriptions WHERE
                          node_id=(SELECT node_id FROM nodes
                                                  WHERE node=%s) AND
                          entity_id=(SELECT entity_id FROM entities
                                                      WHERE jid=%s) AND
                          resource=%s""",
                       (self.nodeIdentifier,
                        bareJID,
                        resourcePart))
        if cursor.rowcount == 1:
            return None

        raise error.NotSubscribed()


    def isSubscribed(self, entity):
        """
        Tell whether C{entity} has a subscription in state C{'subscribed'}.

        @return: the result of running L{_isSubscribed} through the pool.
        """
        return self.dbpool.runInteraction(self._isSubscribed, entity)


    def _isSubscribed(self, cursor, entity):
        """
        Check for a C{'subscribed'} row for the bare JID of C{entity},
        regardless of resource.

        @return: C{True} if such a row exists, C{False} otherwise.
        @raise error.NodeNotFound: if the node no longer exists.
        """
        self._checkNodeExists(cursor)

        cursor.execute("""SELECT 1 FROM entities
                          NATURAL JOIN subscriptions
                          NATURAL JOIN nodes
                          WHERE entities.jid=%s
                          AND node=%s AND state='subscribed'""",
                       (entity.userhost(),
                        self.nodeIdentifier))

        row = cursor.fetchone()
        return row is not None


    def getAffiliations(self):
        """
        List all affiliations with this node.

        @return: the result of running L{_getAffiliations} through the pool.
        """
        return self.dbpool.runInteraction(self._getAffiliations)


    def _getAffiliations(self, cursor):
        """
        Fetch all affiliations as a list of C{(JID, affiliation)} tuples.

        @raise error.NodeNotFound: if the node no longer exists.
        """
        self._checkNodeExists(cursor)

        cursor.execute("""SELECT jid, affiliation FROM nodes
                          NATURAL JOIN affiliations
                          NATURAL JOIN entities
                          WHERE node=%s""",
                       (self.nodeIdentifier,))

        return [(jid.internJID(row[0]), row[1])
                for row in cursor.fetchall()]
406
407
408
class LeafNode(Node):
    """
    Node of type C{'leaf'}, adding storage and retrieval of items.
    """

    implements(iidavoll.ILeafNode)

    nodeType = 'leaf'

    def storeItems(self, items, publisher):
        """
        Store C{items} as published by C{publisher}.

        @return: the result of running L{_storeItems} through the pool.
        """
        return self.dbpool.runInteraction(self._storeItems, items, publisher)


    def _storeItems(self, cursor, items, publisher):
        """
        Store each item in turn after checking the node exists.

        @raise error.NodeNotFound: if the node no longer exists.
        """
        self._checkNodeExists(cursor)
        for entry in items:
            self._storeItem(cursor, entry, publisher)


    def _storeItem(self, cursor, item, publisher):
        """
        Update the stored item with the same id, or insert it when the update
        did not affect exactly one row.
        """
        serialized = item.toXml()
        itemId = item["id"]
        publishedBy = publisher.full()

        cursor.execute("""UPDATE items SET date=now(), publisher=%s, data=%s
                          FROM nodes
                          WHERE nodes.node_id = items.node_id AND
                                nodes.node = %s and items.item=%s""",
                       (publishedBy,
                        serialized,
                        self.nodeIdentifier,
                        itemId))

        if cursor.rowcount != 1:
            cursor.execute("""INSERT INTO items (node_id, item, publisher, data)
                              SELECT node_id, %s, %s, %s FROM nodes
                                                         WHERE node=%s""",
                           (itemId,
                            publishedBy,
                            serialized,
                            self.nodeIdentifier))


    def removeItems(self, itemIdentifiers):
        """
        Remove the items with the given identifiers.

        @return: the result of running L{_removeItems} through the pool.
        """
        return self.dbpool.runInteraction(self._removeItems, itemIdentifiers)


    def _removeItems(self, cursor, itemIdentifiers):
        """
        Delete the items one by one.

        @return: list of the identifiers for which a row was deleted.
        @raise error.NodeNotFound: if the node no longer exists.
        """
        self._checkNodeExists(cursor)

        removed = []

        for identifier in itemIdentifiers:
            cursor.execute("""DELETE FROM items WHERE
                              node_id=(SELECT node_id FROM nodes
                                                      WHERE node=%s) AND
                              item=%s""",
                           (self.nodeIdentifier,
                            identifier))

            if not cursor.rowcount:
                continue
            removed.append(identifier)

        return removed


    def getItems(self, maxItems=None):
        """
        Retrieve this node's items, newest first.

        @return: the result of running L{_getItems} through the pool.
        """
        return self.dbpool.runInteraction(self._getItems, maxItems)


    def _getItems(self, cursor, maxItems):
        """
        Fetch items ordered by descending date, parsed and with namespaces
        stripped.

        A C{LIMIT} is only applied when C{maxItems} is true.

        @raise error.NodeNotFound: if the node no longer exists.
        """
        self._checkNodeExists(cursor)
        query = """SELECT data FROM nodes
                   NATURAL JOIN items
                   WHERE node=%s ORDER BY date DESC"""

        if maxItems:
            query = query + " LIMIT %s"
            params = (self.nodeIdentifier,
                      maxItems)
        else:
            params = (self.nodeIdentifier,)

        cursor.execute(query, params)

        return [stripNamespace(parseXml(row[0]))
                for row in cursor.fetchall()]


    def getItemsById(self, itemIdentifiers):
        """
        Retrieve the items with the given identifiers.

        @return: the result of running L{_getItemsById} through the pool.
        """
        return self.dbpool.runInteraction(self._getItemsById, itemIdentifiers)


    def _getItemsById(self, cursor, itemIdentifiers):
        """
        Fetch and parse each requested item; identifiers without a stored
        item are skipped.

        @raise error.NodeNotFound: if the node no longer exists.
        """
        self._checkNodeExists(cursor)
        found = []
        for identifier in itemIdentifiers:
            cursor.execute("""SELECT data FROM nodes
                              NATURAL JOIN items
                              WHERE node=%s AND item=%s""",
                           (self.nodeIdentifier,
                            identifier))
            row = cursor.fetchone()
            if not row:
                continue
            found.append(parseXml(row[0]))
        return found


    def purge(self):
        """
        Remove all items from this node.

        @return: the result of running L{_purge} through the pool.
        """
        return self.dbpool.runInteraction(self._purge)


    def _purge(self, cursor):
        """
        Delete every item row belonging to this node.

        @raise error.NodeNotFound: if the node no longer exists.
        """
        self._checkNodeExists(cursor)

        cursor.execute("""DELETE FROM items WHERE
                          node_id=(SELECT node_id FROM nodes WHERE node=%s)""",
                       (self.nodeIdentifier,))
520
521
class CollectionNode(Node):
    """
    Node of type C{'collection'}; adds nothing to L{Node} but the type.
    """

    nodeType = 'collection'
525
526
527
class GatewayStorage(object):
    """
    Database based storage facility for the XMPP-HTTP gateway.

    Callbacks are rows in the C{callbacks} table, identified by the full JID
    of the service, the node identifier and the callback URI. All public
    methods return the result of C{dbpool.runInteraction}.
    """

    def __init__(self, dbpool):
        """
        @param dbpool: connection pool providing C{runInteraction}.
        """
        self.dbpool = dbpool


    def _countCallbacks(self, cursor, service, nodeIdentifier):
        """
        Count number of callbacks registered for a node.
        """
        # Bind values are passed as one sequence, as the DB-API prescribes
        # and as the rest of this module does.
        cursor.execute("""SELECT count(*) FROM callbacks
                          WHERE service=%s and node=%s""",
                       (service.full(),
                        nodeIdentifier))
        results = cursor.fetchall()
        return results[0][0]


    def addCallback(self, service, nodeIdentifier, callback):
        """
        Register C{callback} for a node, unless it is registered already.
        """
        def interaction(cursor):
            cursor.execute("""SELECT 1 FROM callbacks
                              WHERE service=%s and node=%s and uri=%s""",
                           (service.full(),
                            nodeIdentifier,
                            callback))
            if cursor.fetchall():
                # Already registered; nothing to do.
                return

            cursor.execute("""INSERT INTO callbacks
                              (service, node, uri) VALUES
                              (%s, %s, %s)""",
                           (service.full(),
                            nodeIdentifier,
                            callback))

        return self.dbpool.runInteraction(interaction)


    def removeCallback(self, service, nodeIdentifier, callback):
        """
        Remove a registered callback.

        The interaction yields C{True} when no callbacks remain for the node
        afterwards, C{False} otherwise.

        @raise error.NotSubscribed: unless exactly one row was deleted.
        """
        def interaction(cursor):
            cursor.execute("""DELETE FROM callbacks
                              WHERE service=%s and node=%s and uri=%s""",
                           (service.full(),
                            nodeIdentifier,
                            callback))

            if cursor.rowcount != 1:
                raise error.NotSubscribed()

            last = not self._countCallbacks(cursor, service, nodeIdentifier)
            return last

        return self.dbpool.runInteraction(interaction)

    def getCallbacks(self, service, nodeIdentifier):
        """
        Retrieve the callback URIs registered for a node.

        The interaction yields a list of URIs.

        @raise error.NoCallbacks: if none are registered.
        """
        def interaction(cursor):
            cursor.execute("""SELECT uri FROM callbacks
                              WHERE service=%s and node=%s""",
                           (service.full(),
                            nodeIdentifier))
            results = cursor.fetchall()

            if not results:
                raise error.NoCallbacks()

            return [result[0] for result in results]

        return self.dbpool.runInteraction(interaction)


    def hasCallbacks(self, service, nodeIdentifier):
        """
        Tell whether any callback is registered for a node.

        The interaction yields a boolean.
        """
        def interaction(cursor):
            return bool(self._countCallbacks(cursor, service, nodeIdentifier))

        return self.dbpool.runInteraction(interaction)