comparison idavoll/pgsql_storage.py @ 206:274a45d2a5ab

Implement root collection that includes all leaf nodes.
author Ralph Meijer <ralphm@ik.nu>
date Mon, 04 Aug 2008 13:47:10 +0000
parents b4bf0a5ce50d
children 8540825f85e0
comparison
equal deleted inserted replaced
205:e6b710bf2b24 206:274a45d2a5ab
2 # See LICENSE for details. 2 # See LICENSE for details.
3 3
4 import copy 4 import copy
5 5
6 from zope.interface import implements 6 from zope.interface import implements
7
8 from twisted.enterprise import adbapi
7 from twisted.words.protocols.jabber import jid 9 from twisted.words.protocols.jabber import jid
8 from wokkel.generic import parseXml 10
11 from wokkel.generic import parseXml, stripNamespace
12 from wokkel.pubsub import Subscription
9 13
10 from idavoll import error, iidavoll 14 from idavoll import error, iidavoll
11 15
12 class Storage: 16 class Storage:
13 17
14 implements(iidavoll.IStorage) 18 implements(iidavoll.IStorage)
15 19
20 defaultConfig = {
21 'leaf': {
22 "pubsub#persist_items": True,
23 "pubsub#deliver_payloads": True,
24 "pubsub#send_last_published_item": 'on_sub',
25 },
26 'collection': {
27 "pubsub#deliver_payloads": True,
28 "pubsub#send_last_published_item": 'on_sub',
29 }
30 }
16 31
17 def __init__(self, dbpool): 32 def __init__(self, dbpool):
18 self.dbpool = dbpool 33 self.dbpool = dbpool
19 34
20 35
22 return self.dbpool.runInteraction(self._getNode, nodeIdentifier) 37 return self.dbpool.runInteraction(self._getNode, nodeIdentifier)
23 38
24 39
25 def _getNode(self, cursor, nodeIdentifier): 40 def _getNode(self, cursor, nodeIdentifier):
26 configuration = {} 41 configuration = {}
27 cursor.execute("""SELECT persistent, deliver_payload, 42 cursor.execute("""SELECT node_type,
43 persist_items,
44 deliver_payloads,
28 send_last_published_item 45 send_last_published_item
29 FROM nodes 46 FROM nodes
30 WHERE node=%s""", 47 WHERE node=%s""",
31 (nodeIdentifier,)) 48 (nodeIdentifier,))
32 try: 49 row = cursor.fetchone()
33 (configuration["pubsub#persist_items"], 50
34 configuration["pubsub#deliver_payloads"], 51 if not row:
35 configuration["pubsub#send_last_published_item"]) = \
36 cursor.fetchone()
37 except TypeError:
38 raise error.NodeNotFound() 52 raise error.NodeNotFound()
39 else: 53
54 if row.node_type == 'leaf':
55 configuration = {
56 'pubsub#persist_items': row.persist_items,
57 'pubsub#deliver_payloads': row.deliver_payloads,
58 'pubsub#send_last_published_item':
59 row.send_last_published_item}
40 node = LeafNode(nodeIdentifier, configuration) 60 node = LeafNode(nodeIdentifier, configuration)
41 node.dbpool = self.dbpool 61 node.dbpool = self.dbpool
42 return node 62 return node
63 elif row.node_type == 'collection':
64 configuration = {
65 'pubsub#deliver_payloads': row.deliver_payloads,
66 'pubsub#send_last_published_item':
67 row.send_last_published_item}
68 node = CollectionNode(nodeIdentifier, configuration)
69 node.dbpool = self.dbpool
70 return node
71
43 72
44 73
45 def getNodeIds(self): 74 def getNodeIds(self):
46 d = self.dbpool.runQuery("""SELECT node from nodes""") 75 d = self.dbpool.runQuery("""SELECT node from nodes""")
47 d.addCallback(lambda results: [r[0] for r in results]) 76 d.addCallback(lambda results: [r[0] for r in results])
48 return d 77 return d
49 78
50 79
51 def createNode(self, nodeIdentifier, owner, config=None): 80 def createNode(self, nodeIdentifier, owner, config):
52 return self.dbpool.runInteraction(self._createNode, nodeIdentifier, 81 return self.dbpool.runInteraction(self._createNode, nodeIdentifier,
53 owner) 82 owner, config)
54 83
55 84
56 def _createNode(self, cursor, nodeIdentifier, owner): 85 def _createNode(self, cursor, nodeIdentifier, owner, config):
86 if config['pubsub#node_type'] != 'leaf':
87 raise error.NoCollections()
88
57 owner = owner.userhost() 89 owner = owner.userhost()
58 try: 90 try:
59 cursor.execute("""INSERT INTO nodes (node) VALUES (%s)""", 91 cursor.execute("""INSERT INTO nodes
60 (nodeIdentifier)) 92 (node, node_type, persist_items,
93 deliver_payloads, send_last_published_item)
94 VALUES
95 (%s, 'leaf', %s, %s, %s)""",
96 (nodeIdentifier,
97 config['pubsub#persist_items'],
98 config['pubsub#deliver_payloads'],
99 config['pubsub#send_last_published_item'])
100 )
61 except cursor._pool.dbapi.OperationalError: 101 except cursor._pool.dbapi.OperationalError:
62 raise error.NodeExists() 102 raise error.NodeExists()
63 103
64 cursor.execute("""SELECT 1 from entities where jid=%s""", 104 cursor.execute("""SELECT 1 from entities where jid=%s""",
65 (owner)) 105 (owner))
68 cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", 108 cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
69 (owner)) 109 (owner))
70 110
71 cursor.execute("""INSERT INTO affiliations 111 cursor.execute("""INSERT INTO affiliations
72 (node_id, entity_id, affiliation) 112 (node_id, entity_id, affiliation)
73 SELECT n.id, e.id, 'owner' FROM 113 SELECT node_id, entity_id, 'owner' FROM
74 (SELECT id FROM nodes WHERE node=%s) AS n 114 (SELECT node_id FROM nodes WHERE node=%s) as n
75 CROSS JOIN 115 CROSS JOIN
76 (SELECT id FROM entities WHERE jid=%s) AS e""", 116 (SELECT entity_id FROM entities
117 WHERE jid=%s) as e""",
77 (nodeIdentifier, owner)) 118 (nodeIdentifier, owner))
78 119
79 120
80 def deleteNode(self, nodeIdentifier): 121 def deleteNode(self, nodeIdentifier):
81 return self.dbpool.runInteraction(self._deleteNode, nodeIdentifier) 122 return self.dbpool.runInteraction(self._deleteNode, nodeIdentifier)
89 raise error.NodeNotFound() 130 raise error.NodeNotFound()
90 131
91 132
92 def getAffiliations(self, entity): 133 def getAffiliations(self, entity):
93 d = self.dbpool.runQuery("""SELECT node, affiliation FROM entities 134 d = self.dbpool.runQuery("""SELECT node, affiliation FROM entities
94 JOIN affiliations ON 135 NATURAL JOIN affiliations
95 (affiliations.entity_id=entities.id) 136 NATURAL JOIN nodes
96 JOIN nodes ON
97 (nodes.id=affiliations.node_id)
98 WHERE jid=%s""", 137 WHERE jid=%s""",
99 (entity.userhost(),)) 138 (entity.userhost(),))
100 d.addCallback(lambda results: [tuple(r) for r in results]) 139 d.addCallback(lambda results: [tuple(r) for r in results])
101 return d 140 return d
102 141
103 142
104 def getSubscriptions(self, entity): 143 def getSubscriptions(self, entity):
105 d = self.dbpool.runQuery("""SELECT node, jid, resource, subscription 144 def toSubscriptions(rows):
106 FROM entities JOIN subscriptions ON 145 subscriptions = []
107 (subscriptions.entity_id=entities.id) 146 for row in rows:
108 JOIN nodes ON 147 subscriber = jid.internJID('%s/%s' % (row.jid,
109 (nodes.id=subscriptions.node_id) 148 row.resource))
149 subscription = Subscription(row.node, subscriber, row.state)
150 subscriptions.append(subscription)
151 return subscriptions
152
153 d = self.dbpool.runQuery("""SELECT node, jid, resource, state
154 FROM entities
155 NATURAL JOIN subscriptions
156 NATURAL JOIN nodes
110 WHERE jid=%s""", 157 WHERE jid=%s""",
111 (entity.userhost(),)) 158 (entity.userhost(),))
112 d.addCallback(self._convertSubscriptionJIDs) 159 d.addCallback(toSubscriptions)
113 return d 160 return d
114 161
115 162
116 def _convertSubscriptionJIDs(self, subscriptions): 163 def getDefaultConfiguration(self, nodeType):
117 return [(node, 164 return self.defaultConfig[nodeType]
118 jid.internJID('%s/%s' % (subscriber, resource)),
119 subscription)
120 for node, subscriber, resource, subscription in subscriptions]
121 165
122 166
123 167
124 class Node: 168 class Node:
125 169
129 self.nodeIdentifier = nodeIdentifier 173 self.nodeIdentifier = nodeIdentifier
130 self._config = config 174 self._config = config
131 175
132 176
133 def _checkNodeExists(self, cursor): 177 def _checkNodeExists(self, cursor):
134 cursor.execute("""SELECT id FROM nodes WHERE node=%s""", 178 cursor.execute("""SELECT node_id FROM nodes WHERE node=%s""",
135 (self.nodeIdentifier)) 179 (self.nodeIdentifier))
136 if not cursor.fetchone(): 180 if not cursor.fetchone():
137 raise error.NodeNotFound() 181 raise error.NodeNotFound()
138 182
139 183
157 return d 201 return d
158 202
159 203
160 def _setConfiguration(self, cursor, config): 204 def _setConfiguration(self, cursor, config):
161 self._checkNodeExists(cursor) 205 self._checkNodeExists(cursor)
162 cursor.execute("""UPDATE nodes SET persistent=%s, deliver_payload=%s, 206 cursor.execute("""UPDATE nodes SET persist_items=%s,
207 deliver_payloads=%s,
163 send_last_published_item=%s 208 send_last_published_item=%s
164 WHERE node=%s""", 209 WHERE node=%s""",
165 (config["pubsub#persist_items"], 210 (config["pubsub#persist_items"],
166 config["pubsub#deliver_payloads"], 211 config["pubsub#deliver_payloads"],
167 config["pubsub#send_last_published_item"], 212 config["pubsub#send_last_published_item"],
183 228
184 229
185 def _getAffiliation(self, cursor, entity): 230 def _getAffiliation(self, cursor, entity):
186 self._checkNodeExists(cursor) 231 self._checkNodeExists(cursor)
187 cursor.execute("""SELECT affiliation FROM affiliations 232 cursor.execute("""SELECT affiliation FROM affiliations
188 JOIN nodes ON (node_id=nodes.id) 233 NATURAL JOIN nodes
189 JOIN entities ON (entity_id=entities.id) 234 NATURAL JOIN entities
190 WHERE node=%s AND jid=%s""", 235 WHERE node=%s AND jid=%s""",
191 (self.nodeIdentifier, 236 (self.nodeIdentifier,
192 entity.userhost())) 237 entity.userhost()))
193 238
194 try: 239 try:
205 self._checkNodeExists(cursor) 250 self._checkNodeExists(cursor)
206 251
207 userhost = subscriber.userhost() 252 userhost = subscriber.userhost()
208 resource = subscriber.resource or '' 253 resource = subscriber.resource or ''
209 254
210 cursor.execute("""SELECT subscription FROM subscriptions 255 cursor.execute("""SELECT state FROM subscriptions
211 JOIN nodes ON (nodes.id=subscriptions.node_id) 256 NATURAL JOIN nodes
212 JOIN entities ON 257 NATURAL JOIN entities
213 (entities.id=subscriptions.entity_id)
214 WHERE node=%s AND jid=%s AND resource=%s""", 258 WHERE node=%s AND jid=%s AND resource=%s""",
215 (self.nodeIdentifier, 259 (self.nodeIdentifier,
216 userhost, 260 userhost,
217 resource)) 261 resource))
218 try: 262 row = cursor.fetchone()
219 return cursor.fetchone()[0] 263 if not row:
220 except TypeError:
221 return None 264 return None
222 265 else:
223 266 return Subscription(self.nodeIdentifier, subscriber, row.state)
224 def addSubscription(self, subscriber, state): 267
268
269 def getSubscriptions(self, state=None):
270 return self.dbpool.runInteraction(self._getSubscriptions, state)
271
272
273 def _getSubscriptions(self, cursor, state):
274 self._checkNodeExists(cursor)
275
276 query = """SELECT jid, resource, state,
277 subscription_type, subscription_depth
278 FROM subscriptions
279 NATURAL JOIN nodes
280 NATURAL JOIN entities
281 WHERE node=%s""";
282 values = [self.nodeIdentifier]
283
284 if state:
285 query += " AND state=%s"
286 values.append(state)
287
288 cursor.execute(query, values);
289 rows = cursor.fetchall()
290
291 subscriptions = []
292 for row in rows:
293 subscriber = jid.JID('%s/%s' % (row.jid, row.resource))
294
295 options = {}
296 if row.subscription_type:
297 options['pubsub#subscription_type'] = row.subscription_type;
298 if row.subscription_depth:
299 options['pubsub#subscription_depth'] = row.subscription_depth;
300
301 subscriptions.append(Subscription(self.nodeIdentifier, subscriber,
302 row.state, options))
303
304 return subscriptions
305
306
307 def addSubscription(self, subscriber, state, config):
225 return self.dbpool.runInteraction(self._addSubscription, subscriber, 308 return self.dbpool.runInteraction(self._addSubscription, subscriber,
226 state) 309 state, config)
227 310
228 311
229 def _addSubscription(self, cursor, subscriber, state): 312 def _addSubscription(self, cursor, subscriber, state, config):
230 self._checkNodeExists(cursor) 313 self._checkNodeExists(cursor)
231 314
232 userhost = subscriber.userhost() 315 userhost = subscriber.userhost()
233 resource = subscriber.resource or '' 316 resource = subscriber.resource or ''
317
318 subscription_type = config.get('pubsub#subscription_type')
319 subscription_depth = config.get('pubsub#subscription_depth')
234 320
235 try: 321 try:
236 cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", 322 cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
237 (userhost)) 323 (userhost))
238 except cursor._pool.dbapi.OperationalError: 324 except cursor._pool.dbapi.OperationalError:
239 pass 325 pass
240 326
241 try: 327 try:
242 cursor.execute("""INSERT INTO subscriptions 328 cursor.execute("""INSERT INTO subscriptions
243 (node_id, entity_id, resource, subscription) 329 (node_id, entity_id, resource, state,
244 SELECT n.id, e.id, %s, %s FROM 330 subscription_type, subscription_depth)
245 (SELECT id FROM nodes WHERE node=%s) AS n 331 SELECT node_id, entity_id, %s, %s, %s, %s FROM
332 (SELECT node_id FROM nodes
333 WHERE node=%s) as n
246 CROSS JOIN 334 CROSS JOIN
247 (SELECT id FROM entities WHERE jid=%s) AS e""", 335 (SELECT entity_id FROM entities
336 WHERE jid=%s) as e""",
248 (resource, 337 (resource,
249 state, 338 state,
339 subscription_type,
340 subscription_depth,
250 self.nodeIdentifier, 341 self.nodeIdentifier,
251 userhost)) 342 userhost))
252 except cursor._pool.dbapi.OperationalError: 343 except cursor._pool.dbapi.OperationalError:
253 raise error.SubscriptionExists() 344 raise error.SubscriptionExists()
254 345
263 354
264 userhost = subscriber.userhost() 355 userhost = subscriber.userhost()
265 resource = subscriber.resource or '' 356 resource = subscriber.resource or ''
266 357
267 cursor.execute("""DELETE FROM subscriptions WHERE 358 cursor.execute("""DELETE FROM subscriptions WHERE
268 node_id=(SELECT id FROM nodes WHERE node=%s) AND 359 node_id=(SELECT node_id FROM nodes
269 entity_id=(SELECT id FROM entities WHERE jid=%s) 360 WHERE node=%s) AND
270 AND resource=%s""", 361 entity_id=(SELECT entity_id FROM entities
362 WHERE jid=%s) AND
363 resource=%s""",
271 (self.nodeIdentifier, 364 (self.nodeIdentifier,
272 userhost, 365 userhost,
273 resource)) 366 resource))
274 if cursor.rowcount != 1: 367 if cursor.rowcount != 1:
275 raise error.NotSubscribed() 368 raise error.NotSubscribed()
276 369
277 return None 370 return None
278 371
279 372
280 def getSubscribers(self):
281 d = self.dbpool.runInteraction(self._getSubscribers)
282 d.addCallback(self._convertToJIDs)
283 return d
284
285
286 def _getSubscribers(self, cursor):
287 self._checkNodeExists(cursor)
288 cursor.execute("""SELECT jid, resource FROM subscriptions
289 JOIN nodes ON (node_id=nodes.id)
290 JOIN entities ON (entity_id=entities.id)
291 WHERE node=%s AND
292 subscription='subscribed'""",
293 (self.nodeIdentifier,))
294 return cursor.fetchall()
295
296
297 def _convertToJIDs(self, list):
298 return [jid.internJID("%s/%s" % (l[0], l[1])) for l in list]
299
300
301 def isSubscribed(self, entity): 373 def isSubscribed(self, entity):
302 return self.dbpool.runInteraction(self._isSubscribed, entity) 374 return self.dbpool.runInteraction(self._isSubscribed, entity)
303 375
304 376
305 def _isSubscribed(self, cursor, entity): 377 def _isSubscribed(self, cursor, entity):
306 self._checkNodeExists(cursor) 378 self._checkNodeExists(cursor)
307 379
308 cursor.execute("""SELECT 1 FROM entities 380 cursor.execute("""SELECT 1 FROM entities
309 JOIN subscriptions ON 381 NATURAL JOIN subscriptions
310 (entities.id=subscriptions.entity_id) 382 NATURAL JOIN nodes
311 JOIN nodes ON
312 (nodes.id=subscriptions.node_id)
313 WHERE entities.jid=%s 383 WHERE entities.jid=%s
314 AND node=%s AND subscription='subscribed'""", 384 AND node=%s AND state='subscribed'""",
315 (entity.userhost(), 385 (entity.userhost(),
316 self.nodeIdentifier)) 386 self.nodeIdentifier))
317 387
318 return cursor.fetchone() is not None 388 return cursor.fetchone() is not None
319 389
324 394
325 def _getAffiliations(self, cursor): 395 def _getAffiliations(self, cursor):
326 self._checkNodeExists(cursor) 396 self._checkNodeExists(cursor)
327 397
328 cursor.execute("""SELECT jid, affiliation FROM nodes 398 cursor.execute("""SELECT jid, affiliation FROM nodes
329 JOIN affiliations ON 399 NATURAL JOIN affiliations
330 (nodes.id = affiliations.node_id) 400 NATURAL JOIN entities
331 JOIN entities ON
332 (affiliations.entity_id = entities.id)
333 WHERE node=%s""", 401 WHERE node=%s""",
334 self.nodeIdentifier) 402 self.nodeIdentifier)
335 result = cursor.fetchall() 403 result = cursor.fetchall()
336 404
337 return [(jid.internJID(r[0]), r[1]) for r in result] 405 return [(jid.internJID(r[0]), r[1]) for r in result]
338 406
339 407
340 408
341 class LeafNodeMixin: 409 class LeafNode(Node):
410
411 implements(iidavoll.ILeafNode)
342 412
343 nodeType = 'leaf' 413 nodeType = 'leaf'
344 414
345 def storeItems(self, items, publisher): 415 def storeItems(self, items, publisher):
346 return self.dbpool.runInteraction(self._storeItems, items, publisher) 416 return self.dbpool.runInteraction(self._storeItems, items, publisher)
354 424
355 def _storeItem(self, cursor, item, publisher): 425 def _storeItem(self, cursor, item, publisher):
356 data = item.toXml() 426 data = item.toXml()
357 cursor.execute("""UPDATE items SET date=now(), publisher=%s, data=%s 427 cursor.execute("""UPDATE items SET date=now(), publisher=%s, data=%s
358 FROM nodes 428 FROM nodes
359 WHERE nodes.id = items.node_id AND 429 WHERE nodes.node_id = items.node_id AND
360 nodes.node = %s and items.item=%s""", 430 nodes.node = %s and items.item=%s""",
361 (publisher.full(), 431 (publisher.full(),
362 data, 432 data,
363 self.nodeIdentifier, 433 self.nodeIdentifier,
364 item["id"])) 434 item["id"]))
365 if cursor.rowcount == 1: 435 if cursor.rowcount == 1:
366 return 436 return
367 437
368 cursor.execute("""INSERT INTO items (node_id, item, publisher, data) 438 cursor.execute("""INSERT INTO items (node_id, item, publisher, data)
369 SELECT id, %s, %s, %s FROM nodes WHERE node=%s""", 439 SELECT node_id, %s, %s, %s FROM nodes
440 WHERE node=%s""",
370 (item["id"], 441 (item["id"],
371 publisher.full(), 442 publisher.full(),
372 data, 443 data,
373 self.nodeIdentifier)) 444 self.nodeIdentifier))
374 445
382 453
383 deleted = [] 454 deleted = []
384 455
385 for itemIdentifier in itemIdentifiers: 456 for itemIdentifier in itemIdentifiers:
386 cursor.execute("""DELETE FROM items WHERE 457 cursor.execute("""DELETE FROM items WHERE
387 node_id=(SELECT id FROM nodes WHERE node=%s) AND 458 node_id=(SELECT node_id FROM nodes
459 WHERE node=%s) AND
388 item=%s""", 460 item=%s""",
389 (self.nodeIdentifier, 461 (self.nodeIdentifier,
390 itemIdentifier)) 462 itemIdentifier))
391 463
392 if cursor.rowcount: 464 if cursor.rowcount:
399 return self.dbpool.runInteraction(self._getItems, maxItems) 471 return self.dbpool.runInteraction(self._getItems, maxItems)
400 472
401 473
402 def _getItems(self, cursor, maxItems): 474 def _getItems(self, cursor, maxItems):
403 self._checkNodeExists(cursor) 475 self._checkNodeExists(cursor)
404 query = """SELECT data FROM nodes JOIN items ON 476 query = """SELECT data FROM nodes
405 (nodes.id=items.node_id) 477 NATURAL JOIN items
406 WHERE node=%s ORDER BY date DESC""" 478 WHERE node=%s ORDER BY date DESC"""
407 if maxItems: 479 if maxItems:
408 cursor.execute(query + " LIMIT %s", 480 cursor.execute(query + " LIMIT %s",
409 (self.nodeIdentifier, 481 (self.nodeIdentifier,
410 maxItems)) 482 maxItems))
411 else: 483 else:
412 cursor.execute(query, (self.nodeIdentifier)) 484 cursor.execute(query, (self.nodeIdentifier))
413 485
414 result = cursor.fetchall() 486 result = cursor.fetchall()
415 return [parseXml(r[0]) for r in result] 487 items = [stripNamespace(parseXml(r[0])) for r in result]
488 return items
416 489
417 490
418 def getItemsById(self, itemIdentifiers): 491 def getItemsById(self, itemIdentifiers):
419 return self.dbpool.runInteraction(self._getItemsById, itemIdentifiers) 492 return self.dbpool.runInteraction(self._getItemsById, itemIdentifiers)
420 493
421 494
422 def _getItemsById(self, cursor, itemIdentifiers): 495 def _getItemsById(self, cursor, itemIdentifiers):
423 self._checkNodeExists(cursor) 496 self._checkNodeExists(cursor)
424 items = [] 497 items = []
425 for itemIdentifier in itemIdentifiers: 498 for itemIdentifier in itemIdentifiers:
426 cursor.execute("""SELECT data FROM nodes JOIN items ON 499 cursor.execute("""SELECT data FROM nodes
427 (nodes.id=items.node_id) 500 NATURAL JOIN items
428 WHERE node=%s AND item=%s""", 501 WHERE node=%s AND item=%s""",
429 (self.nodeIdentifier, 502 (self.nodeIdentifier,
430 itemIdentifier)) 503 itemIdentifier))
431 result = cursor.fetchone() 504 result = cursor.fetchone()
432 if result: 505 if result:
440 513
441 def _purge(self, cursor): 514 def _purge(self, cursor):
442 self._checkNodeExists(cursor) 515 self._checkNodeExists(cursor)
443 516
444 cursor.execute("""DELETE FROM items WHERE 517 cursor.execute("""DELETE FROM items WHERE
445 node_id=(SELECT id FROM nodes WHERE node=%s)""", 518 node_id=(SELECT node_id FROM nodes WHERE node=%s)""",
446 (self.nodeIdentifier,)) 519 (self.nodeIdentifier,))
447 520
448 521
449 522 class CollectionNode(Node):
450 class LeafNode(Node, LeafNodeMixin): 523
451 524 nodeType = 'collection'
452 implements(iidavoll.ILeafNode)
453 525
454 526
455 527
456 class GatewayStorage(object): 528 class GatewayStorage(object):
457 """ 529 """
480 WHERE service=%s and node=%s and uri=%s""", 552 WHERE service=%s and node=%s and uri=%s""",
481 service.full(), 553 service.full(),
482 nodeIdentifier, 554 nodeIdentifier,
483 callback) 555 callback)
484 if cursor.fetchall(): 556 if cursor.fetchall():
485 raise error.SubscriptionExists() 557 return
486 558
487 cursor.execute("""INSERT INTO callbacks 559 cursor.execute("""INSERT INTO callbacks
488 (service, node, uri) VALUES 560 (service, node, uri) VALUES
489 (%s, %s, %s)""", 561 (%s, %s, %s)""",
490 service.full(), 562 service.full(),