Mercurial > libervia-pubsub
comparison sat_pubsub/pgsql_storage.py @ 232:923281d4c5bc
renamed idavoll directory to sat_pubsub
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 17 May 2012 12:48:14 +0200 |
parents | idavoll/pgsql_storage.py@8540825f85e0 |
children | 564ae55219e1 |
comparison
equal
deleted
inserted
replaced
231:d99047cd90f9 | 232:923281d4c5bc |
---|---|
1 # Copyright (c) 2003-2008 Ralph Meijer | |
2 # See LICENSE for details. | |
3 | |
4 import copy | |
5 | |
6 from zope.interface import implements | |
7 | |
8 from twisted.enterprise import adbapi | |
9 from twisted.words.protocols.jabber import jid | |
10 | |
11 from wokkel.generic import parseXml, stripNamespace | |
12 from wokkel.pubsub import Subscription | |
13 | |
14 from idavoll import error, iidavoll | |
15 | |
16 class Storage: | |
17 | |
18 implements(iidavoll.IStorage) | |
19 | |
20 defaultConfig = { | |
21 'leaf': { | |
22 "pubsub#persist_items": True, | |
23 "pubsub#deliver_payloads": True, | |
24 "pubsub#send_last_published_item": 'on_sub', | |
25 }, | |
26 'collection': { | |
27 "pubsub#deliver_payloads": True, | |
28 "pubsub#send_last_published_item": 'on_sub', | |
29 } | |
30 } | |
31 | |
32 def __init__(self, dbpool): | |
33 self.dbpool = dbpool | |
34 | |
35 | |
36 def getNode(self, nodeIdentifier): | |
37 return self.dbpool.runInteraction(self._getNode, nodeIdentifier) | |
38 | |
39 | |
40 def _getNode(self, cursor, nodeIdentifier): | |
41 configuration = {} | |
42 cursor.execute("""SELECT node_type, | |
43 persist_items, | |
44 deliver_payloads, | |
45 send_last_published_item | |
46 FROM nodes | |
47 WHERE node=%s""", | |
48 (nodeIdentifier,)) | |
49 row = cursor.fetchone() | |
50 | |
51 if not row: | |
52 raise error.NodeNotFound() | |
53 | |
54 if row[0] == 'leaf': | |
55 configuration = { | |
56 'pubsub#persist_items': row[1], | |
57 'pubsub#deliver_payloads': row[2], | |
58 'pubsub#send_last_published_item': | |
59 row[3]} | |
60 node = LeafNode(nodeIdentifier, configuration) | |
61 node.dbpool = self.dbpool | |
62 return node | |
63 elif row[0] == 'collection': | |
64 configuration = { | |
65 'pubsub#deliver_payloads': row[2], | |
66 'pubsub#send_last_published_item': | |
67 row[3]} | |
68 node = CollectionNode(nodeIdentifier, configuration) | |
69 node.dbpool = self.dbpool | |
70 return node | |
71 | |
72 | |
73 | |
74 def getNodeIds(self): | |
75 d = self.dbpool.runQuery("""SELECT node from nodes""") | |
76 d.addCallback(lambda results: [r[0] for r in results]) | |
77 return d | |
78 | |
79 | |
80 def createNode(self, nodeIdentifier, owner, config): | |
81 return self.dbpool.runInteraction(self._createNode, nodeIdentifier, | |
82 owner, config) | |
83 | |
84 | |
85 def _createNode(self, cursor, nodeIdentifier, owner, config): | |
86 if config['pubsub#node_type'] != 'leaf': | |
87 raise error.NoCollections() | |
88 | |
89 owner = owner.userhost() | |
90 try: | |
91 cursor.execute("""INSERT INTO nodes | |
92 (node, node_type, persist_items, | |
93 deliver_payloads, send_last_published_item) | |
94 VALUES | |
95 (%s, 'leaf', %s, %s, %s)""", | |
96 (nodeIdentifier, | |
97 config['pubsub#persist_items'], | |
98 config['pubsub#deliver_payloads'], | |
99 config['pubsub#send_last_published_item']) | |
100 ) | |
101 except cursor._pool.dbapi.IntegrityError: | |
102 raise error.NodeExists() | |
103 | |
104 cursor.execute("""SELECT 1 from entities where jid=%s""", | |
105 (owner,)) | |
106 | |
107 if not cursor.fetchone(): | |
108 cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", | |
109 (owner,)) | |
110 | |
111 cursor.execute("""INSERT INTO affiliations | |
112 (node_id, entity_id, affiliation) | |
113 SELECT node_id, entity_id, 'owner' FROM | |
114 (SELECT node_id FROM nodes WHERE node=%s) as n | |
115 CROSS JOIN | |
116 (SELECT entity_id FROM entities | |
117 WHERE jid=%s) as e""", | |
118 (nodeIdentifier, owner)) | |
119 | |
120 | |
121 def deleteNode(self, nodeIdentifier): | |
122 return self.dbpool.runInteraction(self._deleteNode, nodeIdentifier) | |
123 | |
124 | |
125 def _deleteNode(self, cursor, nodeIdentifier): | |
126 cursor.execute("""DELETE FROM nodes WHERE node=%s""", | |
127 (nodeIdentifier,)) | |
128 | |
129 if cursor.rowcount != 1: | |
130 raise error.NodeNotFound() | |
131 | |
132 | |
133 def getAffiliations(self, entity): | |
134 d = self.dbpool.runQuery("""SELECT node, affiliation FROM entities | |
135 NATURAL JOIN affiliations | |
136 NATURAL JOIN nodes | |
137 WHERE jid=%s""", | |
138 (entity.userhost(),)) | |
139 d.addCallback(lambda results: [tuple(r) for r in results]) | |
140 return d | |
141 | |
142 | |
143 def getSubscriptions(self, entity): | |
144 def toSubscriptions(rows): | |
145 subscriptions = [] | |
146 for row in rows: | |
147 subscriber = jid.internJID('%s/%s' % (row[1], | |
148 row[2])) | |
149 subscription = Subscription(row[0], subscriber, row[3]) | |
150 subscriptions.append(subscription) | |
151 return subscriptions | |
152 | |
153 d = self.dbpool.runQuery("""SELECT node, jid, resource, state | |
154 FROM entities | |
155 NATURAL JOIN subscriptions | |
156 NATURAL JOIN nodes | |
157 WHERE jid=%s""", | |
158 (entity.userhost(),)) | |
159 d.addCallback(toSubscriptions) | |
160 return d | |
161 | |
162 | |
163 def getDefaultConfiguration(self, nodeType): | |
164 return self.defaultConfig[nodeType] | |
165 | |
166 | |
167 | |
168 class Node: | |
169 | |
170 implements(iidavoll.INode) | |
171 | |
172 def __init__(self, nodeIdentifier, config): | |
173 self.nodeIdentifier = nodeIdentifier | |
174 self._config = config | |
175 | |
176 | |
177 def _checkNodeExists(self, cursor): | |
178 cursor.execute("""SELECT node_id FROM nodes WHERE node=%s""", | |
179 (self.nodeIdentifier,)) | |
180 if not cursor.fetchone(): | |
181 raise error.NodeNotFound() | |
182 | |
183 | |
184 def getType(self): | |
185 return self.nodeType | |
186 | |
187 | |
188 def getConfiguration(self): | |
189 return self._config | |
190 | |
191 | |
192 def setConfiguration(self, options): | |
193 config = copy.copy(self._config) | |
194 | |
195 for option in options: | |
196 if option in config: | |
197 config[option] = options[option] | |
198 | |
199 d = self.dbpool.runInteraction(self._setConfiguration, config) | |
200 d.addCallback(self._setCachedConfiguration, config) | |
201 return d | |
202 | |
203 | |
204 def _setConfiguration(self, cursor, config): | |
205 self._checkNodeExists(cursor) | |
206 cursor.execute("""UPDATE nodes SET persist_items=%s, | |
207 deliver_payloads=%s, | |
208 send_last_published_item=%s | |
209 WHERE node=%s""", | |
210 (config["pubsub#persist_items"], | |
211 config["pubsub#deliver_payloads"], | |
212 config["pubsub#send_last_published_item"], | |
213 self.nodeIdentifier)) | |
214 | |
215 | |
216 def _setCachedConfiguration(self, void, config): | |
217 self._config = config | |
218 | |
219 | |
220 def getMetaData(self): | |
221 config = copy.copy(self._config) | |
222 config["pubsub#node_type"] = self.nodeType | |
223 return config | |
224 | |
225 | |
226 def getAffiliation(self, entity): | |
227 return self.dbpool.runInteraction(self._getAffiliation, entity) | |
228 | |
229 | |
230 def _getAffiliation(self, cursor, entity): | |
231 self._checkNodeExists(cursor) | |
232 cursor.execute("""SELECT affiliation FROM affiliations | |
233 NATURAL JOIN nodes | |
234 NATURAL JOIN entities | |
235 WHERE node=%s AND jid=%s""", | |
236 (self.nodeIdentifier, | |
237 entity.userhost())) | |
238 | |
239 try: | |
240 return cursor.fetchone()[0] | |
241 except TypeError: | |
242 return None | |
243 | |
244 | |
245 def getSubscription(self, subscriber): | |
246 return self.dbpool.runInteraction(self._getSubscription, subscriber) | |
247 | |
248 | |
249 def _getSubscription(self, cursor, subscriber): | |
250 self._checkNodeExists(cursor) | |
251 | |
252 userhost = subscriber.userhost() | |
253 resource = subscriber.resource or '' | |
254 | |
255 cursor.execute("""SELECT state FROM subscriptions | |
256 NATURAL JOIN nodes | |
257 NATURAL JOIN entities | |
258 WHERE node=%s AND jid=%s AND resource=%s""", | |
259 (self.nodeIdentifier, | |
260 userhost, | |
261 resource)) | |
262 row = cursor.fetchone() | |
263 if not row: | |
264 return None | |
265 else: | |
266 return Subscription(self.nodeIdentifier, subscriber, row[0]) | |
267 | |
268 | |
269 def getSubscriptions(self, state=None): | |
270 return self.dbpool.runInteraction(self._getSubscriptions, state) | |
271 | |
272 | |
273 def _getSubscriptions(self, cursor, state): | |
274 self._checkNodeExists(cursor) | |
275 | |
276 query = """SELECT jid, resource, state, | |
277 subscription_type, subscription_depth | |
278 FROM subscriptions | |
279 NATURAL JOIN nodes | |
280 NATURAL JOIN entities | |
281 WHERE node=%s"""; | |
282 values = [self.nodeIdentifier] | |
283 | |
284 if state: | |
285 query += " AND state=%s" | |
286 values.append(state) | |
287 | |
288 cursor.execute(query, values); | |
289 rows = cursor.fetchall() | |
290 | |
291 subscriptions = [] | |
292 for row in rows: | |
293 subscriber = jid.JID('%s/%s' % (row[0], row[1])) | |
294 | |
295 options = {} | |
296 if row[3]: | |
297 options['pubsub#subscription_type'] = row[3]; | |
298 if row[4]: | |
299 options['pubsub#subscription_depth'] = row[4]; | |
300 | |
301 subscriptions.append(Subscription(self.nodeIdentifier, subscriber, | |
302 row[2], options)) | |
303 | |
304 return subscriptions | |
305 | |
306 | |
307 def addSubscription(self, subscriber, state, config): | |
308 return self.dbpool.runInteraction(self._addSubscription, subscriber, | |
309 state, config) | |
310 | |
311 | |
312 def _addSubscription(self, cursor, subscriber, state, config): | |
313 self._checkNodeExists(cursor) | |
314 | |
315 userhost = subscriber.userhost() | |
316 resource = subscriber.resource or '' | |
317 | |
318 subscription_type = config.get('pubsub#subscription_type') | |
319 subscription_depth = config.get('pubsub#subscription_depth') | |
320 | |
321 try: | |
322 cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", | |
323 (userhost,)) | |
324 except cursor._pool.dbapi.IntegrityError: | |
325 cursor._connection.rollback() | |
326 | |
327 try: | |
328 cursor.execute("""INSERT INTO subscriptions | |
329 (node_id, entity_id, resource, state, | |
330 subscription_type, subscription_depth) | |
331 SELECT node_id, entity_id, %s, %s, %s, %s FROM | |
332 (SELECT node_id FROM nodes | |
333 WHERE node=%s) as n | |
334 CROSS JOIN | |
335 (SELECT entity_id FROM entities | |
336 WHERE jid=%s) as e""", | |
337 (resource, | |
338 state, | |
339 subscription_type, | |
340 subscription_depth, | |
341 self.nodeIdentifier, | |
342 userhost)) | |
343 except cursor._pool.dbapi.IntegrityError: | |
344 raise error.SubscriptionExists() | |
345 | |
346 | |
347 def removeSubscription(self, subscriber): | |
348 return self.dbpool.runInteraction(self._removeSubscription, | |
349 subscriber) | |
350 | |
351 | |
352 def _removeSubscription(self, cursor, subscriber): | |
353 self._checkNodeExists(cursor) | |
354 | |
355 userhost = subscriber.userhost() | |
356 resource = subscriber.resource or '' | |
357 | |
358 cursor.execute("""DELETE FROM subscriptions WHERE | |
359 node_id=(SELECT node_id FROM nodes | |
360 WHERE node=%s) AND | |
361 entity_id=(SELECT entity_id FROM entities | |
362 WHERE jid=%s) AND | |
363 resource=%s""", | |
364 (self.nodeIdentifier, | |
365 userhost, | |
366 resource)) | |
367 if cursor.rowcount != 1: | |
368 raise error.NotSubscribed() | |
369 | |
370 return None | |
371 | |
372 | |
373 def isSubscribed(self, entity): | |
374 return self.dbpool.runInteraction(self._isSubscribed, entity) | |
375 | |
376 | |
377 def _isSubscribed(self, cursor, entity): | |
378 self._checkNodeExists(cursor) | |
379 | |
380 cursor.execute("""SELECT 1 FROM entities | |
381 NATURAL JOIN subscriptions | |
382 NATURAL JOIN nodes | |
383 WHERE entities.jid=%s | |
384 AND node=%s AND state='subscribed'""", | |
385 (entity.userhost(), | |
386 self.nodeIdentifier)) | |
387 | |
388 return cursor.fetchone() is not None | |
389 | |
390 | |
391 def getAffiliations(self): | |
392 return self.dbpool.runInteraction(self._getAffiliations) | |
393 | |
394 | |
395 def _getAffiliations(self, cursor): | |
396 self._checkNodeExists(cursor) | |
397 | |
398 cursor.execute("""SELECT jid, affiliation FROM nodes | |
399 NATURAL JOIN affiliations | |
400 NATURAL JOIN entities | |
401 WHERE node=%s""", | |
402 (self.nodeIdentifier,)) | |
403 result = cursor.fetchall() | |
404 | |
405 return [(jid.internJID(r[0]), r[1]) for r in result] | |
406 | |
407 | |
408 | |
409 class LeafNode(Node): | |
410 | |
411 implements(iidavoll.ILeafNode) | |
412 | |
413 nodeType = 'leaf' | |
414 | |
415 def storeItems(self, items, publisher): | |
416 return self.dbpool.runInteraction(self._storeItems, items, publisher) | |
417 | |
418 | |
419 def _storeItems(self, cursor, items, publisher): | |
420 self._checkNodeExists(cursor) | |
421 for item in items: | |
422 self._storeItem(cursor, item, publisher) | |
423 | |
424 | |
425 def _storeItem(self, cursor, item, publisher): | |
426 data = item.toXml() | |
427 cursor.execute("""UPDATE items SET date=now(), publisher=%s, data=%s | |
428 FROM nodes | |
429 WHERE nodes.node_id = items.node_id AND | |
430 nodes.node = %s and items.item=%s""", | |
431 (publisher.full(), | |
432 data, | |
433 self.nodeIdentifier, | |
434 item["id"])) | |
435 if cursor.rowcount == 1: | |
436 return | |
437 | |
438 cursor.execute("""INSERT INTO items (node_id, item, publisher, data) | |
439 SELECT node_id, %s, %s, %s FROM nodes | |
440 WHERE node=%s""", | |
441 (item["id"], | |
442 publisher.full(), | |
443 data, | |
444 self.nodeIdentifier)) | |
445 | |
446 | |
447 def removeItems(self, itemIdentifiers): | |
448 return self.dbpool.runInteraction(self._removeItems, itemIdentifiers) | |
449 | |
450 | |
451 def _removeItems(self, cursor, itemIdentifiers): | |
452 self._checkNodeExists(cursor) | |
453 | |
454 deleted = [] | |
455 | |
456 for itemIdentifier in itemIdentifiers: | |
457 cursor.execute("""DELETE FROM items WHERE | |
458 node_id=(SELECT node_id FROM nodes | |
459 WHERE node=%s) AND | |
460 item=%s""", | |
461 (self.nodeIdentifier, | |
462 itemIdentifier)) | |
463 | |
464 if cursor.rowcount: | |
465 deleted.append(itemIdentifier) | |
466 | |
467 return deleted | |
468 | |
469 | |
470 def getItems(self, maxItems=None): | |
471 return self.dbpool.runInteraction(self._getItems, maxItems) | |
472 | |
473 | |
474 def _getItems(self, cursor, maxItems): | |
475 self._checkNodeExists(cursor) | |
476 query = """SELECT data FROM nodes | |
477 NATURAL JOIN items | |
478 WHERE node=%s ORDER BY date DESC""" | |
479 if maxItems: | |
480 cursor.execute(query + " LIMIT %s", | |
481 (self.nodeIdentifier, | |
482 maxItems)) | |
483 else: | |
484 cursor.execute(query, (self.nodeIdentifier,)) | |
485 | |
486 result = cursor.fetchall() | |
487 items = [stripNamespace(parseXml(r[0])) for r in result] | |
488 return items | |
489 | |
490 | |
491 def getItemsById(self, itemIdentifiers): | |
492 return self.dbpool.runInteraction(self._getItemsById, itemIdentifiers) | |
493 | |
494 | |
495 def _getItemsById(self, cursor, itemIdentifiers): | |
496 self._checkNodeExists(cursor) | |
497 items = [] | |
498 for itemIdentifier in itemIdentifiers: | |
499 cursor.execute("""SELECT data FROM nodes | |
500 NATURAL JOIN items | |
501 WHERE node=%s AND item=%s""", | |
502 (self.nodeIdentifier, | |
503 itemIdentifier)) | |
504 result = cursor.fetchone() | |
505 if result: | |
506 items.append(parseXml(result[0])) | |
507 return items | |
508 | |
509 | |
510 def purge(self): | |
511 return self.dbpool.runInteraction(self._purge) | |
512 | |
513 | |
514 def _purge(self, cursor): | |
515 self._checkNodeExists(cursor) | |
516 | |
517 cursor.execute("""DELETE FROM items WHERE | |
518 node_id=(SELECT node_id FROM nodes WHERE node=%s)""", | |
519 (self.nodeIdentifier,)) | |
520 | |
521 | |
522 class CollectionNode(Node): | |
523 | |
524 nodeType = 'collection' | |
525 | |
526 | |
527 | |
528 class GatewayStorage(object): | |
529 """ | |
530 Memory based storage facility for the XMPP-HTTP gateway. | |
531 """ | |
532 | |
533 def __init__(self, dbpool): | |
534 self.dbpool = dbpool | |
535 | |
536 | |
537 def _countCallbacks(self, cursor, service, nodeIdentifier): | |
538 """ | |
539 Count number of callbacks registered for a node. | |
540 """ | |
541 cursor.execute("""SELECT count(*) FROM callbacks | |
542 WHERE service=%s and node=%s""", | |
543 service.full(), | |
544 nodeIdentifier) | |
545 results = cursor.fetchall() | |
546 return results[0][0] | |
547 | |
548 | |
549 def addCallback(self, service, nodeIdentifier, callback): | |
550 def interaction(cursor): | |
551 cursor.execute("""SELECT 1 FROM callbacks | |
552 WHERE service=%s and node=%s and uri=%s""", | |
553 service.full(), | |
554 nodeIdentifier, | |
555 callback) | |
556 if cursor.fetchall(): | |
557 return | |
558 | |
559 cursor.execute("""INSERT INTO callbacks | |
560 (service, node, uri) VALUES | |
561 (%s, %s, %s)""", | |
562 service.full(), | |
563 nodeIdentifier, | |
564 callback) | |
565 | |
566 return self.dbpool.runInteraction(interaction) | |
567 | |
568 | |
569 def removeCallback(self, service, nodeIdentifier, callback): | |
570 def interaction(cursor): | |
571 cursor.execute("""DELETE FROM callbacks | |
572 WHERE service=%s and node=%s and uri=%s""", | |
573 service.full(), | |
574 nodeIdentifier, | |
575 callback) | |
576 | |
577 if cursor.rowcount != 1: | |
578 raise error.NotSubscribed() | |
579 | |
580 last = not self._countCallbacks(cursor, service, nodeIdentifier) | |
581 return last | |
582 | |
583 return self.dbpool.runInteraction(interaction) | |
584 | |
585 def getCallbacks(self, service, nodeIdentifier): | |
586 def interaction(cursor): | |
587 cursor.execute("""SELECT uri FROM callbacks | |
588 WHERE service=%s and node=%s""", | |
589 service.full(), | |
590 nodeIdentifier) | |
591 results = cursor.fetchall() | |
592 | |
593 if not results: | |
594 raise error.NoCallbacks() | |
595 | |
596 return [result[0] for result in results] | |
597 | |
598 return self.dbpool.runInteraction(interaction) | |
599 | |
600 | |
601 def hasCallbacks(self, service, nodeIdentifier): | |
602 def interaction(cursor): | |
603 return bool(self._countCallbacks(cursor, service, nodeIdentifier)) | |
604 | |
605 return self.dbpool.runInteraction(interaction) |