Mercurial > libervia-pubsub
comparison idavoll/pgsql_storage.py @ 206:274a45d2a5ab
Implement root collection that includes all leaf nodes.
author | Ralph Meijer <ralphm@ik.nu> |
---|---|
date | Mon, 04 Aug 2008 13:47:10 +0000 |
parents | b4bf0a5ce50d |
children | 8540825f85e0 |
comparison
equal
deleted
inserted
replaced
205:e6b710bf2b24 | 206:274a45d2a5ab |
---|---|
2 # See LICENSE for details. | 2 # See LICENSE for details. |
3 | 3 |
4 import copy | 4 import copy |
5 | 5 |
6 from zope.interface import implements | 6 from zope.interface import implements |
7 | |
8 from twisted.enterprise import adbapi | |
7 from twisted.words.protocols.jabber import jid | 9 from twisted.words.protocols.jabber import jid |
8 from wokkel.generic import parseXml | 10 |
11 from wokkel.generic import parseXml, stripNamespace | |
12 from wokkel.pubsub import Subscription | |
9 | 13 |
10 from idavoll import error, iidavoll | 14 from idavoll import error, iidavoll |
11 | 15 |
12 class Storage: | 16 class Storage: |
13 | 17 |
14 implements(iidavoll.IStorage) | 18 implements(iidavoll.IStorage) |
15 | 19 |
20 defaultConfig = { | |
21 'leaf': { | |
22 "pubsub#persist_items": True, | |
23 "pubsub#deliver_payloads": True, | |
24 "pubsub#send_last_published_item": 'on_sub', | |
25 }, | |
26 'collection': { | |
27 "pubsub#deliver_payloads": True, | |
28 "pubsub#send_last_published_item": 'on_sub', | |
29 } | |
30 } | |
16 | 31 |
17 def __init__(self, dbpool): | 32 def __init__(self, dbpool): |
18 self.dbpool = dbpool | 33 self.dbpool = dbpool |
19 | 34 |
20 | 35 |
22 return self.dbpool.runInteraction(self._getNode, nodeIdentifier) | 37 return self.dbpool.runInteraction(self._getNode, nodeIdentifier) |
23 | 38 |
24 | 39 |
25 def _getNode(self, cursor, nodeIdentifier): | 40 def _getNode(self, cursor, nodeIdentifier): |
26 configuration = {} | 41 configuration = {} |
27 cursor.execute("""SELECT persistent, deliver_payload, | 42 cursor.execute("""SELECT node_type, |
43 persist_items, | |
44 deliver_payloads, | |
28 send_last_published_item | 45 send_last_published_item |
29 FROM nodes | 46 FROM nodes |
30 WHERE node=%s""", | 47 WHERE node=%s""", |
31 (nodeIdentifier,)) | 48 (nodeIdentifier,)) |
32 try: | 49 row = cursor.fetchone() |
33 (configuration["pubsub#persist_items"], | 50 |
34 configuration["pubsub#deliver_payloads"], | 51 if not row: |
35 configuration["pubsub#send_last_published_item"]) = \ | |
36 cursor.fetchone() | |
37 except TypeError: | |
38 raise error.NodeNotFound() | 52 raise error.NodeNotFound() |
39 else: | 53 |
54 if row.node_type == 'leaf': | |
55 configuration = { | |
56 'pubsub#persist_items': row.persist_items, | |
57 'pubsub#deliver_payloads': row.deliver_payloads, | |
58 'pubsub#send_last_published_item': | |
59 row.send_last_published_item} | |
40 node = LeafNode(nodeIdentifier, configuration) | 60 node = LeafNode(nodeIdentifier, configuration) |
41 node.dbpool = self.dbpool | 61 node.dbpool = self.dbpool |
42 return node | 62 return node |
63 elif row.node_type == 'collection': | |
64 configuration = { | |
65 'pubsub#deliver_payloads': row.deliver_payloads, | |
66 'pubsub#send_last_published_item': | |
67 row.send_last_published_item} | |
68 node = CollectionNode(nodeIdentifier, configuration) | |
69 node.dbpool = self.dbpool | |
70 return node | |
71 | |
43 | 72 |
44 | 73 |
45 def getNodeIds(self): | 74 def getNodeIds(self): |
46 d = self.dbpool.runQuery("""SELECT node from nodes""") | 75 d = self.dbpool.runQuery("""SELECT node from nodes""") |
47 d.addCallback(lambda results: [r[0] for r in results]) | 76 d.addCallback(lambda results: [r[0] for r in results]) |
48 return d | 77 return d |
49 | 78 |
50 | 79 |
51 def createNode(self, nodeIdentifier, owner, config=None): | 80 def createNode(self, nodeIdentifier, owner, config): |
52 return self.dbpool.runInteraction(self._createNode, nodeIdentifier, | 81 return self.dbpool.runInteraction(self._createNode, nodeIdentifier, |
53 owner) | 82 owner, config) |
54 | 83 |
55 | 84 |
56 def _createNode(self, cursor, nodeIdentifier, owner): | 85 def _createNode(self, cursor, nodeIdentifier, owner, config): |
86 if config['pubsub#node_type'] != 'leaf': | |
87 raise error.NoCollections() | |
88 | |
57 owner = owner.userhost() | 89 owner = owner.userhost() |
58 try: | 90 try: |
59 cursor.execute("""INSERT INTO nodes (node) VALUES (%s)""", | 91 cursor.execute("""INSERT INTO nodes |
60 (nodeIdentifier)) | 92 (node, node_type, persist_items, |
93 deliver_payloads, send_last_published_item) | |
94 VALUES | |
95 (%s, 'leaf', %s, %s, %s)""", | |
96 (nodeIdentifier, | |
97 config['pubsub#persist_items'], | |
98 config['pubsub#deliver_payloads'], | |
99 config['pubsub#send_last_published_item']) | |
100 ) | |
61 except cursor._pool.dbapi.OperationalError: | 101 except cursor._pool.dbapi.OperationalError: |
62 raise error.NodeExists() | 102 raise error.NodeExists() |
63 | 103 |
64 cursor.execute("""SELECT 1 from entities where jid=%s""", | 104 cursor.execute("""SELECT 1 from entities where jid=%s""", |
65 (owner)) | 105 (owner)) |
68 cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", | 108 cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", |
69 (owner)) | 109 (owner)) |
70 | 110 |
71 cursor.execute("""INSERT INTO affiliations | 111 cursor.execute("""INSERT INTO affiliations |
72 (node_id, entity_id, affiliation) | 112 (node_id, entity_id, affiliation) |
73 SELECT n.id, e.id, 'owner' FROM | 113 SELECT node_id, entity_id, 'owner' FROM |
74 (SELECT id FROM nodes WHERE node=%s) AS n | 114 (SELECT node_id FROM nodes WHERE node=%s) as n |
75 CROSS JOIN | 115 CROSS JOIN |
76 (SELECT id FROM entities WHERE jid=%s) AS e""", | 116 (SELECT entity_id FROM entities |
117 WHERE jid=%s) as e""", | |
77 (nodeIdentifier, owner)) | 118 (nodeIdentifier, owner)) |
78 | 119 |
79 | 120 |
80 def deleteNode(self, nodeIdentifier): | 121 def deleteNode(self, nodeIdentifier): |
81 return self.dbpool.runInteraction(self._deleteNode, nodeIdentifier) | 122 return self.dbpool.runInteraction(self._deleteNode, nodeIdentifier) |
89 raise error.NodeNotFound() | 130 raise error.NodeNotFound() |
90 | 131 |
91 | 132 |
92 def getAffiliations(self, entity): | 133 def getAffiliations(self, entity): |
93 d = self.dbpool.runQuery("""SELECT node, affiliation FROM entities | 134 d = self.dbpool.runQuery("""SELECT node, affiliation FROM entities |
94 JOIN affiliations ON | 135 NATURAL JOIN affiliations |
95 (affiliations.entity_id=entities.id) | 136 NATURAL JOIN nodes |
96 JOIN nodes ON | |
97 (nodes.id=affiliations.node_id) | |
98 WHERE jid=%s""", | 137 WHERE jid=%s""", |
99 (entity.userhost(),)) | 138 (entity.userhost(),)) |
100 d.addCallback(lambda results: [tuple(r) for r in results]) | 139 d.addCallback(lambda results: [tuple(r) for r in results]) |
101 return d | 140 return d |
102 | 141 |
103 | 142 |
104 def getSubscriptions(self, entity): | 143 def getSubscriptions(self, entity): |
105 d = self.dbpool.runQuery("""SELECT node, jid, resource, subscription | 144 def toSubscriptions(rows): |
106 FROM entities JOIN subscriptions ON | 145 subscriptions = [] |
107 (subscriptions.entity_id=entities.id) | 146 for row in rows: |
108 JOIN nodes ON | 147 subscriber = jid.internJID('%s/%s' % (row.jid, |
109 (nodes.id=subscriptions.node_id) | 148 row.resource)) |
149 subscription = Subscription(row.node, subscriber, row.state) | |
150 subscriptions.append(subscription) | |
151 return subscriptions | |
152 | |
153 d = self.dbpool.runQuery("""SELECT node, jid, resource, state | |
154 FROM entities | |
155 NATURAL JOIN subscriptions | |
156 NATURAL JOIN nodes | |
110 WHERE jid=%s""", | 157 WHERE jid=%s""", |
111 (entity.userhost(),)) | 158 (entity.userhost(),)) |
112 d.addCallback(self._convertSubscriptionJIDs) | 159 d.addCallback(toSubscriptions) |
113 return d | 160 return d |
114 | 161 |
115 | 162 |
116 def _convertSubscriptionJIDs(self, subscriptions): | 163 def getDefaultConfiguration(self, nodeType): |
117 return [(node, | 164 return self.defaultConfig[nodeType] |
118 jid.internJID('%s/%s' % (subscriber, resource)), | |
119 subscription) | |
120 for node, subscriber, resource, subscription in subscriptions] | |
121 | 165 |
122 | 166 |
123 | 167 |
124 class Node: | 168 class Node: |
125 | 169 |
129 self.nodeIdentifier = nodeIdentifier | 173 self.nodeIdentifier = nodeIdentifier |
130 self._config = config | 174 self._config = config |
131 | 175 |
132 | 176 |
133 def _checkNodeExists(self, cursor): | 177 def _checkNodeExists(self, cursor): |
134 cursor.execute("""SELECT id FROM nodes WHERE node=%s""", | 178 cursor.execute("""SELECT node_id FROM nodes WHERE node=%s""", |
135 (self.nodeIdentifier)) | 179 (self.nodeIdentifier)) |
136 if not cursor.fetchone(): | 180 if not cursor.fetchone(): |
137 raise error.NodeNotFound() | 181 raise error.NodeNotFound() |
138 | 182 |
139 | 183 |
157 return d | 201 return d |
158 | 202 |
159 | 203 |
160 def _setConfiguration(self, cursor, config): | 204 def _setConfiguration(self, cursor, config): |
161 self._checkNodeExists(cursor) | 205 self._checkNodeExists(cursor) |
162 cursor.execute("""UPDATE nodes SET persistent=%s, deliver_payload=%s, | 206 cursor.execute("""UPDATE nodes SET persist_items=%s, |
207 deliver_payloads=%s, | |
163 send_last_published_item=%s | 208 send_last_published_item=%s |
164 WHERE node=%s""", | 209 WHERE node=%s""", |
165 (config["pubsub#persist_items"], | 210 (config["pubsub#persist_items"], |
166 config["pubsub#deliver_payloads"], | 211 config["pubsub#deliver_payloads"], |
167 config["pubsub#send_last_published_item"], | 212 config["pubsub#send_last_published_item"], |
183 | 228 |
184 | 229 |
185 def _getAffiliation(self, cursor, entity): | 230 def _getAffiliation(self, cursor, entity): |
186 self._checkNodeExists(cursor) | 231 self._checkNodeExists(cursor) |
187 cursor.execute("""SELECT affiliation FROM affiliations | 232 cursor.execute("""SELECT affiliation FROM affiliations |
188 JOIN nodes ON (node_id=nodes.id) | 233 NATURAL JOIN nodes |
189 JOIN entities ON (entity_id=entities.id) | 234 NATURAL JOIN entities |
190 WHERE node=%s AND jid=%s""", | 235 WHERE node=%s AND jid=%s""", |
191 (self.nodeIdentifier, | 236 (self.nodeIdentifier, |
192 entity.userhost())) | 237 entity.userhost())) |
193 | 238 |
194 try: | 239 try: |
205 self._checkNodeExists(cursor) | 250 self._checkNodeExists(cursor) |
206 | 251 |
207 userhost = subscriber.userhost() | 252 userhost = subscriber.userhost() |
208 resource = subscriber.resource or '' | 253 resource = subscriber.resource or '' |
209 | 254 |
210 cursor.execute("""SELECT subscription FROM subscriptions | 255 cursor.execute("""SELECT state FROM subscriptions |
211 JOIN nodes ON (nodes.id=subscriptions.node_id) | 256 NATURAL JOIN nodes |
212 JOIN entities ON | 257 NATURAL JOIN entities |
213 (entities.id=subscriptions.entity_id) | |
214 WHERE node=%s AND jid=%s AND resource=%s""", | 258 WHERE node=%s AND jid=%s AND resource=%s""", |
215 (self.nodeIdentifier, | 259 (self.nodeIdentifier, |
216 userhost, | 260 userhost, |
217 resource)) | 261 resource)) |
218 try: | 262 row = cursor.fetchone() |
219 return cursor.fetchone()[0] | 263 if not row: |
220 except TypeError: | |
221 return None | 264 return None |
222 | 265 else: |
223 | 266 return Subscription(self.nodeIdentifier, subscriber, row.state) |
224 def addSubscription(self, subscriber, state): | 267 |
268 | |
269 def getSubscriptions(self, state=None): | |
270 return self.dbpool.runInteraction(self._getSubscriptions, state) | |
271 | |
272 | |
273 def _getSubscriptions(self, cursor, state): | |
274 self._checkNodeExists(cursor) | |
275 | |
276 query = """SELECT jid, resource, state, | |
277 subscription_type, subscription_depth | |
278 FROM subscriptions | |
279 NATURAL JOIN nodes | |
280 NATURAL JOIN entities | |
281 WHERE node=%s"""; | |
282 values = [self.nodeIdentifier] | |
283 | |
284 if state: | |
285 query += " AND state=%s" | |
286 values.append(state) | |
287 | |
288 cursor.execute(query, values); | |
289 rows = cursor.fetchall() | |
290 | |
291 subscriptions = [] | |
292 for row in rows: | |
293 subscriber = jid.JID('%s/%s' % (row.jid, row.resource)) | |
294 | |
295 options = {} | |
296 if row.subscription_type: | |
297 options['pubsub#subscription_type'] = row.subscription_type; | |
298 if row.subscription_depth: | |
299 options['pubsub#subscription_depth'] = row.subscription_depth; | |
300 | |
301 subscriptions.append(Subscription(self.nodeIdentifier, subscriber, | |
302 row.state, options)) | |
303 | |
304 return subscriptions | |
305 | |
306 | |
307 def addSubscription(self, subscriber, state, config): | |
225 return self.dbpool.runInteraction(self._addSubscription, subscriber, | 308 return self.dbpool.runInteraction(self._addSubscription, subscriber, |
226 state) | 309 state, config) |
227 | 310 |
228 | 311 |
229 def _addSubscription(self, cursor, subscriber, state): | 312 def _addSubscription(self, cursor, subscriber, state, config): |
230 self._checkNodeExists(cursor) | 313 self._checkNodeExists(cursor) |
231 | 314 |
232 userhost = subscriber.userhost() | 315 userhost = subscriber.userhost() |
233 resource = subscriber.resource or '' | 316 resource = subscriber.resource or '' |
317 | |
318 subscription_type = config.get('pubsub#subscription_type') | |
319 subscription_depth = config.get('pubsub#subscription_depth') | |
234 | 320 |
235 try: | 321 try: |
236 cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", | 322 cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", |
237 (userhost)) | 323 (userhost)) |
238 except cursor._pool.dbapi.OperationalError: | 324 except cursor._pool.dbapi.OperationalError: |
239 pass | 325 pass |
240 | 326 |
241 try: | 327 try: |
242 cursor.execute("""INSERT INTO subscriptions | 328 cursor.execute("""INSERT INTO subscriptions |
243 (node_id, entity_id, resource, subscription) | 329 (node_id, entity_id, resource, state, |
244 SELECT n.id, e.id, %s, %s FROM | 330 subscription_type, subscription_depth) |
245 (SELECT id FROM nodes WHERE node=%s) AS n | 331 SELECT node_id, entity_id, %s, %s, %s, %s FROM |
332 (SELECT node_id FROM nodes | |
333 WHERE node=%s) as n | |
246 CROSS JOIN | 334 CROSS JOIN |
247 (SELECT id FROM entities WHERE jid=%s) AS e""", | 335 (SELECT entity_id FROM entities |
336 WHERE jid=%s) as e""", | |
248 (resource, | 337 (resource, |
249 state, | 338 state, |
339 subscription_type, | |
340 subscription_depth, | |
250 self.nodeIdentifier, | 341 self.nodeIdentifier, |
251 userhost)) | 342 userhost)) |
252 except cursor._pool.dbapi.OperationalError: | 343 except cursor._pool.dbapi.OperationalError: |
253 raise error.SubscriptionExists() | 344 raise error.SubscriptionExists() |
254 | 345 |
263 | 354 |
264 userhost = subscriber.userhost() | 355 userhost = subscriber.userhost() |
265 resource = subscriber.resource or '' | 356 resource = subscriber.resource or '' |
266 | 357 |
267 cursor.execute("""DELETE FROM subscriptions WHERE | 358 cursor.execute("""DELETE FROM subscriptions WHERE |
268 node_id=(SELECT id FROM nodes WHERE node=%s) AND | 359 node_id=(SELECT node_id FROM nodes |
269 entity_id=(SELECT id FROM entities WHERE jid=%s) | 360 WHERE node=%s) AND |
270 AND resource=%s""", | 361 entity_id=(SELECT entity_id FROM entities |
362 WHERE jid=%s) AND | |
363 resource=%s""", | |
271 (self.nodeIdentifier, | 364 (self.nodeIdentifier, |
272 userhost, | 365 userhost, |
273 resource)) | 366 resource)) |
274 if cursor.rowcount != 1: | 367 if cursor.rowcount != 1: |
275 raise error.NotSubscribed() | 368 raise error.NotSubscribed() |
276 | 369 |
277 return None | 370 return None |
278 | 371 |
279 | 372 |
280 def getSubscribers(self): | |
281 d = self.dbpool.runInteraction(self._getSubscribers) | |
282 d.addCallback(self._convertToJIDs) | |
283 return d | |
284 | |
285 | |
286 def _getSubscribers(self, cursor): | |
287 self._checkNodeExists(cursor) | |
288 cursor.execute("""SELECT jid, resource FROM subscriptions | |
289 JOIN nodes ON (node_id=nodes.id) | |
290 JOIN entities ON (entity_id=entities.id) | |
291 WHERE node=%s AND | |
292 subscription='subscribed'""", | |
293 (self.nodeIdentifier,)) | |
294 return cursor.fetchall() | |
295 | |
296 | |
297 def _convertToJIDs(self, list): | |
298 return [jid.internJID("%s/%s" % (l[0], l[1])) for l in list] | |
299 | |
300 | |
301 def isSubscribed(self, entity): | 373 def isSubscribed(self, entity): |
302 return self.dbpool.runInteraction(self._isSubscribed, entity) | 374 return self.dbpool.runInteraction(self._isSubscribed, entity) |
303 | 375 |
304 | 376 |
305 def _isSubscribed(self, cursor, entity): | 377 def _isSubscribed(self, cursor, entity): |
306 self._checkNodeExists(cursor) | 378 self._checkNodeExists(cursor) |
307 | 379 |
308 cursor.execute("""SELECT 1 FROM entities | 380 cursor.execute("""SELECT 1 FROM entities |
309 JOIN subscriptions ON | 381 NATURAL JOIN subscriptions |
310 (entities.id=subscriptions.entity_id) | 382 NATURAL JOIN nodes |
311 JOIN nodes ON | |
312 (nodes.id=subscriptions.node_id) | |
313 WHERE entities.jid=%s | 383 WHERE entities.jid=%s |
314 AND node=%s AND subscription='subscribed'""", | 384 AND node=%s AND state='subscribed'""", |
315 (entity.userhost(), | 385 (entity.userhost(), |
316 self.nodeIdentifier)) | 386 self.nodeIdentifier)) |
317 | 387 |
318 return cursor.fetchone() is not None | 388 return cursor.fetchone() is not None |
319 | 389 |
324 | 394 |
325 def _getAffiliations(self, cursor): | 395 def _getAffiliations(self, cursor): |
326 self._checkNodeExists(cursor) | 396 self._checkNodeExists(cursor) |
327 | 397 |
328 cursor.execute("""SELECT jid, affiliation FROM nodes | 398 cursor.execute("""SELECT jid, affiliation FROM nodes |
329 JOIN affiliations ON | 399 NATURAL JOIN affiliations |
330 (nodes.id = affiliations.node_id) | 400 NATURAL JOIN entities |
331 JOIN entities ON | |
332 (affiliations.entity_id = entities.id) | |
333 WHERE node=%s""", | 401 WHERE node=%s""", |
334 self.nodeIdentifier) | 402 self.nodeIdentifier) |
335 result = cursor.fetchall() | 403 result = cursor.fetchall() |
336 | 404 |
337 return [(jid.internJID(r[0]), r[1]) for r in result] | 405 return [(jid.internJID(r[0]), r[1]) for r in result] |
338 | 406 |
339 | 407 |
340 | 408 |
341 class LeafNodeMixin: | 409 class LeafNode(Node): |
410 | |
411 implements(iidavoll.ILeafNode) | |
342 | 412 |
343 nodeType = 'leaf' | 413 nodeType = 'leaf' |
344 | 414 |
345 def storeItems(self, items, publisher): | 415 def storeItems(self, items, publisher): |
346 return self.dbpool.runInteraction(self._storeItems, items, publisher) | 416 return self.dbpool.runInteraction(self._storeItems, items, publisher) |
354 | 424 |
355 def _storeItem(self, cursor, item, publisher): | 425 def _storeItem(self, cursor, item, publisher): |
356 data = item.toXml() | 426 data = item.toXml() |
357 cursor.execute("""UPDATE items SET date=now(), publisher=%s, data=%s | 427 cursor.execute("""UPDATE items SET date=now(), publisher=%s, data=%s |
358 FROM nodes | 428 FROM nodes |
359 WHERE nodes.id = items.node_id AND | 429 WHERE nodes.node_id = items.node_id AND |
360 nodes.node = %s and items.item=%s""", | 430 nodes.node = %s and items.item=%s""", |
361 (publisher.full(), | 431 (publisher.full(), |
362 data, | 432 data, |
363 self.nodeIdentifier, | 433 self.nodeIdentifier, |
364 item["id"])) | 434 item["id"])) |
365 if cursor.rowcount == 1: | 435 if cursor.rowcount == 1: |
366 return | 436 return |
367 | 437 |
368 cursor.execute("""INSERT INTO items (node_id, item, publisher, data) | 438 cursor.execute("""INSERT INTO items (node_id, item, publisher, data) |
369 SELECT id, %s, %s, %s FROM nodes WHERE node=%s""", | 439 SELECT node_id, %s, %s, %s FROM nodes |
440 WHERE node=%s""", | |
370 (item["id"], | 441 (item["id"], |
371 publisher.full(), | 442 publisher.full(), |
372 data, | 443 data, |
373 self.nodeIdentifier)) | 444 self.nodeIdentifier)) |
374 | 445 |
382 | 453 |
383 deleted = [] | 454 deleted = [] |
384 | 455 |
385 for itemIdentifier in itemIdentifiers: | 456 for itemIdentifier in itemIdentifiers: |
386 cursor.execute("""DELETE FROM items WHERE | 457 cursor.execute("""DELETE FROM items WHERE |
387 node_id=(SELECT id FROM nodes WHERE node=%s) AND | 458 node_id=(SELECT node_id FROM nodes |
459 WHERE node=%s) AND | |
388 item=%s""", | 460 item=%s""", |
389 (self.nodeIdentifier, | 461 (self.nodeIdentifier, |
390 itemIdentifier)) | 462 itemIdentifier)) |
391 | 463 |
392 if cursor.rowcount: | 464 if cursor.rowcount: |
399 return self.dbpool.runInteraction(self._getItems, maxItems) | 471 return self.dbpool.runInteraction(self._getItems, maxItems) |
400 | 472 |
401 | 473 |
402 def _getItems(self, cursor, maxItems): | 474 def _getItems(self, cursor, maxItems): |
403 self._checkNodeExists(cursor) | 475 self._checkNodeExists(cursor) |
404 query = """SELECT data FROM nodes JOIN items ON | 476 query = """SELECT data FROM nodes |
405 (nodes.id=items.node_id) | 477 NATURAL JOIN items |
406 WHERE node=%s ORDER BY date DESC""" | 478 WHERE node=%s ORDER BY date DESC""" |
407 if maxItems: | 479 if maxItems: |
408 cursor.execute(query + " LIMIT %s", | 480 cursor.execute(query + " LIMIT %s", |
409 (self.nodeIdentifier, | 481 (self.nodeIdentifier, |
410 maxItems)) | 482 maxItems)) |
411 else: | 483 else: |
412 cursor.execute(query, (self.nodeIdentifier)) | 484 cursor.execute(query, (self.nodeIdentifier)) |
413 | 485 |
414 result = cursor.fetchall() | 486 result = cursor.fetchall() |
415 return [parseXml(r[0]) for r in result] | 487 items = [stripNamespace(parseXml(r[0])) for r in result] |
488 return items | |
416 | 489 |
417 | 490 |
418 def getItemsById(self, itemIdentifiers): | 491 def getItemsById(self, itemIdentifiers): |
419 return self.dbpool.runInteraction(self._getItemsById, itemIdentifiers) | 492 return self.dbpool.runInteraction(self._getItemsById, itemIdentifiers) |
420 | 493 |
421 | 494 |
422 def _getItemsById(self, cursor, itemIdentifiers): | 495 def _getItemsById(self, cursor, itemIdentifiers): |
423 self._checkNodeExists(cursor) | 496 self._checkNodeExists(cursor) |
424 items = [] | 497 items = [] |
425 for itemIdentifier in itemIdentifiers: | 498 for itemIdentifier in itemIdentifiers: |
426 cursor.execute("""SELECT data FROM nodes JOIN items ON | 499 cursor.execute("""SELECT data FROM nodes |
427 (nodes.id=items.node_id) | 500 NATURAL JOIN items |
428 WHERE node=%s AND item=%s""", | 501 WHERE node=%s AND item=%s""", |
429 (self.nodeIdentifier, | 502 (self.nodeIdentifier, |
430 itemIdentifier)) | 503 itemIdentifier)) |
431 result = cursor.fetchone() | 504 result = cursor.fetchone() |
432 if result: | 505 if result: |
440 | 513 |
441 def _purge(self, cursor): | 514 def _purge(self, cursor): |
442 self._checkNodeExists(cursor) | 515 self._checkNodeExists(cursor) |
443 | 516 |
444 cursor.execute("""DELETE FROM items WHERE | 517 cursor.execute("""DELETE FROM items WHERE |
445 node_id=(SELECT id FROM nodes WHERE node=%s)""", | 518 node_id=(SELECT node_id FROM nodes WHERE node=%s)""", |
446 (self.nodeIdentifier,)) | 519 (self.nodeIdentifier,)) |
447 | 520 |
448 | 521 |
449 | 522 class CollectionNode(Node): |
450 class LeafNode(Node, LeafNodeMixin): | 523 |
451 | 524 nodeType = 'collection' |
452 implements(iidavoll.ILeafNode) | |
453 | 525 |
454 | 526 |
455 | 527 |
456 class GatewayStorage(object): | 528 class GatewayStorage(object): |
457 """ | 529 """ |
480 WHERE service=%s and node=%s and uri=%s""", | 552 WHERE service=%s and node=%s and uri=%s""", |
481 service.full(), | 553 service.full(), |
482 nodeIdentifier, | 554 nodeIdentifier, |
483 callback) | 555 callback) |
484 if cursor.fetchall(): | 556 if cursor.fetchall(): |
485 raise error.SubscriptionExists() | 557 return |
486 | 558 |
487 cursor.execute("""INSERT INTO callbacks | 559 cursor.execute("""INSERT INTO callbacks |
488 (service, node, uri) VALUES | 560 (service, node, uri) VALUES |
489 (%s, %s, %s)""", | 561 (%s, %s, %s)""", |
490 service.full(), | 562 service.full(), |