comparison sat_pubsub/pgsql_storage.py @ 341:b49f75a26156

backend, pgsql: implemented subscriptionsGet and subscriptionsSet
author Goffi <goffi@goffi.org>
date Sun, 20 Aug 2017 11:56:04 +0200
parents 57a3051ee435
children 8cf1be9572f8
comparison
equal deleted inserted replaced
340:567e486bce24 341:b49f75a26156
153 153
154 @param nodeDbId(unicode): database ID 154 @param nodeDbId(unicode): database ID
155 """ 155 """
156 return self.dbpool.runInteraction(self._getNodeById, nodeDbId) 156 return self.dbpool.runInteraction(self._getNodeById, nodeDbId)
157 157
158
159 def _getNodeById(self, cursor, nodeDbId): 158 def _getNodeById(self, cursor, nodeDbId):
160 cursor.execute("""SELECT node_id, 159 cursor.execute("""SELECT node_id,
161 node, 160 node,
162 node_type, 161 node_type,
163 persist_items, 162 persist_items,
173 return self._buildNode(row) 172 return self._buildNode(row)
174 173
175 def getNode(self, nodeIdentifier, pep, recipient=None): 174 def getNode(self, nodeIdentifier, pep, recipient=None):
176 return self.dbpool.runInteraction(self._getNode, nodeIdentifier, pep, recipient) 175 return self.dbpool.runInteraction(self._getNode, nodeIdentifier, pep, recipient)
177 176
178
179 def _getNode(self, cursor, nodeIdentifier, pep, recipient): 177 def _getNode(self, cursor, nodeIdentifier, pep, recipient):
180 cursor.execute(*withPEP("""SELECT node_id, 178 cursor.execute(*withPEP("""SELECT node_id,
181 node, 179 node,
182 node_type, 180 node_type,
183 persist_items, 181 persist_items,
196 d = self.dbpool.runQuery("""SELECT node from nodes WHERE pep is {}NULL""" 194 d = self.dbpool.runQuery("""SELECT node from nodes WHERE pep is {}NULL"""
197 .format("NOT " if pep else "")) 195 .format("NOT " if pep else ""))
198 d.addCallback(lambda results: [r[0] for r in results]) 196 d.addCallback(lambda results: [r[0] for r in results])
199 return d 197 return d
200 198
201
202 def createNode(self, nodeIdentifier, owner, config, pep, recipient=None): 199 def createNode(self, nodeIdentifier, owner, config, pep, recipient=None):
203 return self.dbpool.runInteraction(self._createNode, nodeIdentifier, 200 return self.dbpool.runInteraction(self._createNode, nodeIdentifier,
204 owner, config, pep, recipient) 201 owner, config, pep, recipient)
205
206 202
207 def _createNode(self, cursor, nodeIdentifier, owner, config, pep, recipient): 203 def _createNode(self, cursor, nodeIdentifier, owner, config, pep, recipient):
208 if config['pubsub#node_type'] != 'leaf': 204 if config['pubsub#node_type'] != 'leaf':
209 raise error.NoCollections() 205 raise error.NoCollections()
210 206
285 raise error.NodeNotFound() 281 raise error.NodeNotFound()
286 282
287 def deleteNode(self, nodeIdentifier, pep, recipient=None): 283 def deleteNode(self, nodeIdentifier, pep, recipient=None):
288 return self.dbpool.runInteraction(self._deleteNode, nodeIdentifier, pep, recipient) 284 return self.dbpool.runInteraction(self._deleteNode, nodeIdentifier, pep, recipient)
289 285
290
291 def _deleteNode(self, cursor, nodeIdentifier, pep, recipient): 286 def _deleteNode(self, cursor, nodeIdentifier, pep, recipient):
292 cursor.execute(*withPEP("""DELETE FROM nodes WHERE node=%s""", 287 cursor.execute(*withPEP("""DELETE FROM nodes WHERE node=%s""",
293 (nodeIdentifier,), pep, recipient)) 288 (nodeIdentifier,), pep, recipient))
294 289
295 if cursor.rowcount != 1: 290 if cursor.rowcount != 1:
310 args.append(nodeIdentifier) 305 args.append(nodeIdentifier)
311 306
312 cursor.execute(*withPEP(' '.join(query), args, pep, recipient)) 307 cursor.execute(*withPEP(' '.join(query), args, pep, recipient))
313 rows = cursor.fetchall() 308 rows = cursor.fetchall()
314 return [tuple(r) for r in rows] 309 return [tuple(r) for r in rows]
315
316 310
317 def getSubscriptions(self, entity, pep, recipient=None): 311 def getSubscriptions(self, entity, pep, recipient=None):
318 def toSubscriptions(rows): 312 def toSubscriptions(rows):
319 subscriptions = [] 313 subscriptions = []
320 for row in rows: 314 for row in rows:
331 WHERE jid=%s AND nodes.pep=%s""", 325 WHERE jid=%s AND nodes.pep=%s""",
332 (entity.userhost(), recipient.userhost() if pep else None)) 326 (entity.userhost(), recipient.userhost() if pep else None))
333 d.addCallback(toSubscriptions) 327 d.addCallback(toSubscriptions)
334 return d 328 return d
335 329
336
337 def getDefaultConfiguration(self, nodeType): 330 def getDefaultConfiguration(self, nodeType):
338 return self.defaultConfig[nodeType] 331 return self.defaultConfig[nodeType]
339
340 332
341 def formatLastItems(self, result): 333 def formatLastItems(self, result):
342 last_items = [] 334 last_items = []
343 for pep_jid_s, node, data, item_access_model in result: 335 for pep_jid_s, node, data, item_access_model in result:
344 pep_jid = jid.JID(pep_jid_s) 336 pep_jid = jid.JID(pep_jid_s)
345 item = generic.stripNamespace(parseXml(data)) 337 item = generic.stripNamespace(parseXml(data))
346 last_items.append((pep_jid, node, item, item_access_model)) 338 last_items.append((pep_jid, node, item, item_access_model))
347 return last_items 339 return last_items
348
349 340
350 def getLastItems(self, entities, nodes, node_accesses, item_accesses, pep): 341 def getLastItems(self, entities, nodes, node_accesses, item_accesses, pep):
351 """get last item for several nodes and entities in a single request""" 342 """get last item for several nodes and entities in a single request"""
352 if not entities or not nodes or not node_accesses or not item_accesses: 343 if not entities or not nodes or not node_accesses or not item_accesses:
353 raise ValueError("entities, nodes and accesses must not be empty") 344 raise ValueError("entities, nodes and accesses must not be empty")
378 def __init__(self, nodeDbId, nodeIdentifier, config): 369 def __init__(self, nodeDbId, nodeIdentifier, config):
379 self.nodeDbId = nodeDbId 370 self.nodeDbId = nodeDbId
380 self.nodeIdentifier = nodeIdentifier 371 self.nodeIdentifier = nodeIdentifier
381 self._config = config 372 self._config = config
382 373
383
384 def _checkNodeExists(self, cursor): 374 def _checkNodeExists(self, cursor):
385 cursor.execute("""SELECT 1 as exist FROM nodes WHERE node_id=%s""", 375 cursor.execute("""SELECT 1 as exist FROM nodes WHERE node_id=%s""",
386 (self.nodeDbId,)) 376 (self.nodeDbId,))
387 if not cursor.fetchone(): 377 if not cursor.fetchone():
388 raise error.NodeNotFound() 378 raise error.NodeNotFound()
389 379
390
391 def getType(self): 380 def getType(self):
392 return self.nodeType 381 return self.nodeType
393 382
394 def getOwners(self): 383 def getOwners(self):
395 d = self.dbpool.runQuery("""SELECT jid FROM nodes NATURAL JOIN affiliations NATURAL JOIN entities WHERE node_id=%s and affiliation='owner'""", (self.nodeDbId,)) 384 d = self.dbpool.runQuery("""SELECT jid FROM nodes NATURAL JOIN affiliations NATURAL JOIN entities WHERE node_id=%s and affiliation='owner'""", (self.nodeDbId,))
396 d.addCallback(lambda rows: [jid.JID(r[0]) for r in rows]) 385 d.addCallback(lambda rows: [jid.JID(r[0]) for r in rows])
397 return d 386 return d
398 387
399
400 def getConfiguration(self): 388 def getConfiguration(self):
401 return self._config 389 return self._config
402
403 390
404 def setConfiguration(self, options): 391 def setConfiguration(self, options):
405 config = copy.copy(self._config) 392 config = copy.copy(self._config)
406 393
407 for option in options: 394 for option in options:
409 config[option] = options[option] 396 config[option] = options[option]
410 397
411 d = self.dbpool.runInteraction(self._setConfiguration, config) 398 d = self.dbpool.runInteraction(self._setConfiguration, config)
412 d.addCallback(self._setCachedConfiguration, config) 399 d.addCallback(self._setCachedConfiguration, config)
413 return d 400 return d
414
415 401
416 def _setConfiguration(self, cursor, config): 402 def _setConfiguration(self, cursor, config):
417 self._checkNodeExists(cursor) 403 self._checkNodeExists(cursor)
418 cursor.execute("""UPDATE nodes SET persist_items=%s, 404 cursor.execute("""UPDATE nodes SET persist_items=%s,
419 deliver_payloads=%s, 405 deliver_payloads=%s,
426 config[const.OPT_SEND_LAST_PUBLISHED_ITEM], 412 config[const.OPT_SEND_LAST_PUBLISHED_ITEM],
427 config[const.OPT_ACCESS_MODEL], 413 config[const.OPT_ACCESS_MODEL],
428 config[const.OPT_PUBLISH_MODEL], 414 config[const.OPT_PUBLISH_MODEL],
429 self.nodeDbId)) 415 self.nodeDbId))
430 416
431
432 def _setCachedConfiguration(self, void, config): 417 def _setCachedConfiguration(self, void, config):
433 self._config = config 418 self._config = config
434
435 419
436 def getMetaData(self): 420 def getMetaData(self):
437 config = copy.copy(self._config) 421 config = copy.copy(self._config)
438 config["pubsub#node_type"] = self.nodeType 422 config["pubsub#node_type"] = self.nodeType
439 return config 423 return config
440 424
441
442 def getAffiliation(self, entity): 425 def getAffiliation(self, entity):
443 return self.dbpool.runInteraction(self._getAffiliation, entity) 426 return self.dbpool.runInteraction(self._getAffiliation, entity)
444
445 427
446 def _getAffiliation(self, cursor, entity): 428 def _getAffiliation(self, cursor, entity):
447 self._checkNodeExists(cursor) 429 self._checkNodeExists(cursor)
448 cursor.execute("""SELECT affiliation FROM affiliations 430 cursor.execute("""SELECT affiliation FROM affiliations
449 NATURAL JOIN nodes 431 NATURAL JOIN nodes
455 try: 437 try:
456 return cursor.fetchone()[0] 438 return cursor.fetchone()[0]
457 except TypeError: 439 except TypeError:
458 return None 440 return None
459 441
460
461 def getAccessModel(self): 442 def getAccessModel(self):
462 return self._config[const.OPT_ACCESS_MODEL] 443 return self._config[const.OPT_ACCESS_MODEL]
463 444
464
465 def getSubscription(self, subscriber): 445 def getSubscription(self, subscriber):
466 return self.dbpool.runInteraction(self._getSubscription, subscriber) 446 return self.dbpool.runInteraction(self._getSubscription, subscriber)
467
468 447
469 def _getSubscription(self, cursor, subscriber): 448 def _getSubscription(self, cursor, subscriber):
470 self._checkNodeExists(cursor) 449 self._checkNodeExists(cursor)
471 450
472 userhost = subscriber.userhost() 451 userhost = subscriber.userhost()
484 if not row: 463 if not row:
485 return None 464 return None
486 else: 465 else:
487 return Subscription(self.nodeIdentifier, subscriber, row[0]) 466 return Subscription(self.nodeIdentifier, subscriber, row[0])
488 467
489
490 def getSubscriptions(self, state=None): 468 def getSubscriptions(self, state=None):
491 return self.dbpool.runInteraction(self._getSubscriptions, state) 469 return self.dbpool.runInteraction(self._getSubscriptions, state)
492
493 470
494 def _getSubscriptions(self, cursor, state): 471 def _getSubscriptions(self, cursor, state):
495 self._checkNodeExists(cursor) 472 self._checkNodeExists(cursor)
496 473
497 query = """SELECT node, jid, resource, state, 474 query = """SELECT node, jid, resource, state,
522 subscriptions.append(Subscription(row[0], subscriber, 499 subscriptions.append(Subscription(row[0], subscriber,
523 row[3], options)) 500 row[3], options))
524 501
525 return subscriptions 502 return subscriptions
526 503
527
528 def addSubscription(self, subscriber, state, config): 504 def addSubscription(self, subscriber, state, config):
529 return self.dbpool.runInteraction(self._addSubscription, subscriber, 505 return self.dbpool.runInteraction(self._addSubscription, subscriber,
530 state, config) 506 state, config)
531
532 507
533 def _addSubscription(self, cursor, subscriber, state, config): 508 def _addSubscription(self, cursor, subscriber, state, config):
534 self._checkNodeExists(cursor) 509 self._checkNodeExists(cursor)
535 510
536 userhost = subscriber.userhost() 511 userhost = subscriber.userhost()
559 subscription_depth, 534 subscription_depth,
560 userhost)) 535 userhost))
561 except cursor._pool.dbapi.IntegrityError: 536 except cursor._pool.dbapi.IntegrityError:
562 raise error.SubscriptionExists() 537 raise error.SubscriptionExists()
563 538
564
565 def removeSubscription(self, subscriber): 539 def removeSubscription(self, subscriber):
566 return self.dbpool.runInteraction(self._removeSubscription, 540 return self.dbpool.runInteraction(self._removeSubscription,
567 subscriber) 541 subscriber)
568
569 542
570 def _removeSubscription(self, cursor, subscriber): 543 def _removeSubscription(self, cursor, subscriber):
571 self._checkNodeExists(cursor) 544 self._checkNodeExists(cursor)
572 545
573 userhost = subscriber.userhost() 546 userhost = subscriber.userhost()
584 if cursor.rowcount != 1: 557 if cursor.rowcount != 1:
585 raise error.NotSubscribed() 558 raise error.NotSubscribed()
586 559
587 return None 560 return None
588 561
562 def setSubscriptions(self, subscriptions):
563 return self.dbpool.runInteraction(self._setSubscriptions, subscriptions)
564
565 def _setSubscriptions(self, cursor, subscriptions):
566 self._checkNodeExists(cursor)
567
568 entities = self.getOrCreateEntities(cursor, [s.subscriber for s in subscriptions])
569 entities_map = {jid.JID(e.jid): e for e in entities}
570
571 # then we construct values for subscriptions update according to entity_id we just got
572 placeholders = ','.join(len(subscriptions) * ["%s"])
573 values = []
574 for subscription in subscriptions:
575 entity_id = entities_map[subscription.subscriber].entity_id
576 resource = subscription.subscriber.resource or u''
577 values.append((self.nodeDbId, entity_id, resource, subscription.state, None, None))
578 # we use upsert so new values are inserted and existing one updated. This feature is only available for PostgreSQL >= 9.5
579 cursor.execute("INSERT INTO subscriptions(node_id, entity_id, resource, state, subscription_type, subscription_depth) VALUES " + placeholders + " ON CONFLICT (entity_id, resource, node_id) DO UPDATE SET state=EXCLUDED.state", [v for v in values])
589 580
590 def isSubscribed(self, entity): 581 def isSubscribed(self, entity):
591 return self.dbpool.runInteraction(self._isSubscribed, entity) 582 return self.dbpool.runInteraction(self._isSubscribed, entity)
592
593 583
594 def _isSubscribed(self, cursor, entity): 584 def _isSubscribed(self, cursor, entity):
595 self._checkNodeExists(cursor) 585 self._checkNodeExists(cursor)
596 586
597 cursor.execute("""SELECT 1 as bool FROM entities 587 cursor.execute("""SELECT 1 as bool FROM entities
602 (entity.userhost(), 592 (entity.userhost(),
603 self.nodeDbId)) 593 self.nodeDbId))
604 594
605 return cursor.fetchone() is not None 595 return cursor.fetchone() is not None
606 596
607
608 def getAffiliations(self): 597 def getAffiliations(self):
609 return self.dbpool.runInteraction(self._getAffiliations) 598 return self.dbpool.runInteraction(self._getAffiliations)
610
611 599
612 def _getAffiliations(self, cursor): 600 def _getAffiliations(self, cursor):
613 self._checkNodeExists(cursor) 601 self._checkNodeExists(cursor)
614 602
615 cursor.execute("""SELECT jid, affiliation FROM nodes 603 cursor.execute("""SELECT jid, affiliation FROM nodes
624 def getOrCreateEntities(self, cursor, entities_jids): 612 def getOrCreateEntities(self, cursor, entities_jids):
625 """Get entity_id from entities in entities table 613 """Get entity_id from entities in entities table
626 614
627 Entities will be inserted if they don't exist 615 Entities will be inserted if they don't exist
628 @param entities_jids(list[jid.JID]): entities to get or create 616 @param entities_jids(list[jid.JID]): entities to get or create
629 @return list[record(entity_jid,jid)]]: list of entity_id and jid (as plain string) 617 @return list[record(entity_id,jid)]]: list of entity_id and jid (as plain string)
630 both existing and inserted entities are returned 618 both existing and inserted entities are returned
631 """ 619 """
632 # cf. http://stackoverflow.com/a/35265559 620 # cf. http://stackoverflow.com/a/35265559
633 placeholders = ','.join(len(entities_jids) * ["(%s)"]) 621 placeholders = ','.join(len(entities_jids) * ["(%s)"])
634 query = ( 622 query = (
653 return cursor.fetchall() 641 return cursor.fetchall()
654 642
655 def setAffiliations(self, affiliations): 643 def setAffiliations(self, affiliations):
656 return self.dbpool.runInteraction(self._setAffiliations, affiliations) 644 return self.dbpool.runInteraction(self._setAffiliations, affiliations)
657 645
658
659 def _setAffiliations(self, cursor, affiliations): 646 def _setAffiliations(self, cursor, affiliations):
660 self._checkNodeExists(cursor) 647 self._checkNodeExists(cursor)
661 648
662 entities = self.getOrCreateEntities(cursor, affiliations) 649 entities = self.getOrCreateEntities(cursor, affiliations)
663 650
667 map(values.extend, ((e.entity_id, affiliations[jid.JID(e.jid)], self.nodeDbId) for e in entities)) 654 map(values.extend, ((e.entity_id, affiliations[jid.JID(e.jid)], self.nodeDbId) for e in entities))
668 655
669 # we use upsert so new values are inserted and existing one updated. This feature is only available for PostgreSQL >= 9.5 656 # we use upsert so new values are inserted and existing one updated. This feature is only available for PostgreSQL >= 9.5
670 cursor.execute("INSERT INTO affiliations(entity_id,affiliation,node_id) VALUES " + placeholders + " ON CONFLICT (entity_id,node_id) DO UPDATE SET affiliation=EXCLUDED.affiliation", values) 657 cursor.execute("INSERT INTO affiliations(entity_id,affiliation,node_id) VALUES " + placeholders + " ON CONFLICT (entity_id,node_id) DO UPDATE SET affiliation=EXCLUDED.affiliation", values)
671 658
672
673 def deleteAffiliations(self, entities): 659 def deleteAffiliations(self, entities):
674 return self.dbpool.runInteraction(self._deleteAffiliations, entities) 660 return self.dbpool.runInteraction(self._deleteAffiliations, entities)
675
676 661
677 def _deleteAffiliations(self, cursor, entities): 662 def _deleteAffiliations(self, cursor, entities):
678 """delete affiliations and subscriptions for these entities""" 663 """delete affiliations and subscriptions for these entities"""
679 self._checkNodeExists(cursor) 664 self._checkNodeExists(cursor)
680 placeholders = ','.join(len(entities) * ["%s"]) 665 placeholders = ','.join(len(entities) * ["%s"])
1076 """ 1061 """
1077 1062
1078 def __init__(self, dbpool): 1063 def __init__(self, dbpool):
1079 self.dbpool = dbpool 1064 self.dbpool = dbpool
1080 1065
1081
1082 def _countCallbacks(self, cursor, service, nodeIdentifier): 1066 def _countCallbacks(self, cursor, service, nodeIdentifier):
1083 """ 1067 """
1084 Count number of callbacks registered for a node. 1068 Count number of callbacks registered for a node.
1085 """ 1069 """
1086 cursor.execute("""SELECT count(*) FROM callbacks 1070 cursor.execute("""SELECT count(*) FROM callbacks
1088 (service.full(), 1072 (service.full(),
1089 nodeIdentifier)) 1073 nodeIdentifier))
1090 results = cursor.fetchall() 1074 results = cursor.fetchall()
1091 return results[0][0] 1075 return results[0][0]
1092 1076
1093
1094 def addCallback(self, service, nodeIdentifier, callback): 1077 def addCallback(self, service, nodeIdentifier, callback):
1095 def interaction(cursor): 1078 def interaction(cursor):
1096 cursor.execute("""SELECT 1 as bool FROM callbacks 1079 cursor.execute("""SELECT 1 as bool FROM callbacks
1097 WHERE service=%s and node=%s and uri=%s""", 1080 WHERE service=%s and node=%s and uri=%s""",
1098 (service.full(), 1081 (service.full(),
1108 nodeIdentifier, 1091 nodeIdentifier,
1109 callback)) 1092 callback))
1110 1093
1111 return self.dbpool.runInteraction(interaction) 1094 return self.dbpool.runInteraction(interaction)
1112 1095
1113
1114 def removeCallback(self, service, nodeIdentifier, callback): 1096 def removeCallback(self, service, nodeIdentifier, callback):
1115 def interaction(cursor): 1097 def interaction(cursor):
1116 cursor.execute("""DELETE FROM callbacks 1098 cursor.execute("""DELETE FROM callbacks
1117 WHERE service=%s and node=%s and uri=%s""", 1099 WHERE service=%s and node=%s and uri=%s""",
1118 (service.full(), 1100 (service.full(),
1140 1122
1141 return [result[0] for result in results] 1123 return [result[0] for result in results]
1142 1124
1143 return self.dbpool.runInteraction(interaction) 1125 return self.dbpool.runInteraction(interaction)
1144 1126
1145
1146 def hasCallbacks(self, service, nodeIdentifier): 1127 def hasCallbacks(self, service, nodeIdentifier):
1147 def interaction(cursor): 1128 def interaction(cursor):
1148 return bool(self._countCallbacks(cursor, service, nodeIdentifier)) 1129 return bool(self._countCallbacks(cursor, service, nodeIdentifier))
1149 1130
1150 return self.dbpool.runInteraction(interaction) 1131 return self.dbpool.runInteraction(interaction)