Mercurial > libervia-pubsub
comparison sat_pubsub/pgsql_storage.py @ 341:b49f75a26156
backend, pgsql: implemented subscriptionsGet and subscriptionsSet
author | Goffi <goffi@goffi.org> |
---|---|
date | Sun, 20 Aug 2017 11:56:04 +0200 |
parents | 57a3051ee435 |
children | 8cf1be9572f8 |
comparison
equal
deleted
inserted
replaced
340:567e486bce24 | 341:b49f75a26156 |
---|---|
153 | 153 |
154 @param nodeDbId(unicode): database ID | 154 @param nodeDbId(unicode): database ID |
155 """ | 155 """ |
156 return self.dbpool.runInteraction(self._getNodeById, nodeDbId) | 156 return self.dbpool.runInteraction(self._getNodeById, nodeDbId) |
157 | 157 |
158 | |
159 def _getNodeById(self, cursor, nodeDbId): | 158 def _getNodeById(self, cursor, nodeDbId): |
160 cursor.execute("""SELECT node_id, | 159 cursor.execute("""SELECT node_id, |
161 node, | 160 node, |
162 node_type, | 161 node_type, |
163 persist_items, | 162 persist_items, |
173 return self._buildNode(row) | 172 return self._buildNode(row) |
174 | 173 |
175 def getNode(self, nodeIdentifier, pep, recipient=None): | 174 def getNode(self, nodeIdentifier, pep, recipient=None): |
176 return self.dbpool.runInteraction(self._getNode, nodeIdentifier, pep, recipient) | 175 return self.dbpool.runInteraction(self._getNode, nodeIdentifier, pep, recipient) |
177 | 176 |
178 | |
179 def _getNode(self, cursor, nodeIdentifier, pep, recipient): | 177 def _getNode(self, cursor, nodeIdentifier, pep, recipient): |
180 cursor.execute(*withPEP("""SELECT node_id, | 178 cursor.execute(*withPEP("""SELECT node_id, |
181 node, | 179 node, |
182 node_type, | 180 node_type, |
183 persist_items, | 181 persist_items, |
196 d = self.dbpool.runQuery("""SELECT node from nodes WHERE pep is {}NULL""" | 194 d = self.dbpool.runQuery("""SELECT node from nodes WHERE pep is {}NULL""" |
197 .format("NOT " if pep else "")) | 195 .format("NOT " if pep else "")) |
198 d.addCallback(lambda results: [r[0] for r in results]) | 196 d.addCallback(lambda results: [r[0] for r in results]) |
199 return d | 197 return d |
200 | 198 |
201 | |
202 def createNode(self, nodeIdentifier, owner, config, pep, recipient=None): | 199 def createNode(self, nodeIdentifier, owner, config, pep, recipient=None): |
203 return self.dbpool.runInteraction(self._createNode, nodeIdentifier, | 200 return self.dbpool.runInteraction(self._createNode, nodeIdentifier, |
204 owner, config, pep, recipient) | 201 owner, config, pep, recipient) |
205 | |
206 | 202 |
207 def _createNode(self, cursor, nodeIdentifier, owner, config, pep, recipient): | 203 def _createNode(self, cursor, nodeIdentifier, owner, config, pep, recipient): |
208 if config['pubsub#node_type'] != 'leaf': | 204 if config['pubsub#node_type'] != 'leaf': |
209 raise error.NoCollections() | 205 raise error.NoCollections() |
210 | 206 |
285 raise error.NodeNotFound() | 281 raise error.NodeNotFound() |
286 | 282 |
287 def deleteNode(self, nodeIdentifier, pep, recipient=None): | 283 def deleteNode(self, nodeIdentifier, pep, recipient=None): |
288 return self.dbpool.runInteraction(self._deleteNode, nodeIdentifier, pep, recipient) | 284 return self.dbpool.runInteraction(self._deleteNode, nodeIdentifier, pep, recipient) |
289 | 285 |
290 | |
291 def _deleteNode(self, cursor, nodeIdentifier, pep, recipient): | 286 def _deleteNode(self, cursor, nodeIdentifier, pep, recipient): |
292 cursor.execute(*withPEP("""DELETE FROM nodes WHERE node=%s""", | 287 cursor.execute(*withPEP("""DELETE FROM nodes WHERE node=%s""", |
293 (nodeIdentifier,), pep, recipient)) | 288 (nodeIdentifier,), pep, recipient)) |
294 | 289 |
295 if cursor.rowcount != 1: | 290 if cursor.rowcount != 1: |
310 args.append(nodeIdentifier) | 305 args.append(nodeIdentifier) |
311 | 306 |
312 cursor.execute(*withPEP(' '.join(query), args, pep, recipient)) | 307 cursor.execute(*withPEP(' '.join(query), args, pep, recipient)) |
313 rows = cursor.fetchall() | 308 rows = cursor.fetchall() |
314 return [tuple(r) for r in rows] | 309 return [tuple(r) for r in rows] |
315 | |
316 | 310 |
317 def getSubscriptions(self, entity, pep, recipient=None): | 311 def getSubscriptions(self, entity, pep, recipient=None): |
318 def toSubscriptions(rows): | 312 def toSubscriptions(rows): |
319 subscriptions = [] | 313 subscriptions = [] |
320 for row in rows: | 314 for row in rows: |
331 WHERE jid=%s AND nodes.pep=%s""", | 325 WHERE jid=%s AND nodes.pep=%s""", |
332 (entity.userhost(), recipient.userhost() if pep else None)) | 326 (entity.userhost(), recipient.userhost() if pep else None)) |
333 d.addCallback(toSubscriptions) | 327 d.addCallback(toSubscriptions) |
334 return d | 328 return d |
335 | 329 |
336 | |
337 def getDefaultConfiguration(self, nodeType): | 330 def getDefaultConfiguration(self, nodeType): |
338 return self.defaultConfig[nodeType] | 331 return self.defaultConfig[nodeType] |
339 | |
340 | 332 |
341 def formatLastItems(self, result): | 333 def formatLastItems(self, result): |
342 last_items = [] | 334 last_items = [] |
343 for pep_jid_s, node, data, item_access_model in result: | 335 for pep_jid_s, node, data, item_access_model in result: |
344 pep_jid = jid.JID(pep_jid_s) | 336 pep_jid = jid.JID(pep_jid_s) |
345 item = generic.stripNamespace(parseXml(data)) | 337 item = generic.stripNamespace(parseXml(data)) |
346 last_items.append((pep_jid, node, item, item_access_model)) | 338 last_items.append((pep_jid, node, item, item_access_model)) |
347 return last_items | 339 return last_items |
348 | |
349 | 340 |
350 def getLastItems(self, entities, nodes, node_accesses, item_accesses, pep): | 341 def getLastItems(self, entities, nodes, node_accesses, item_accesses, pep): |
351 """get last item for several nodes and entities in a single request""" | 342 """get last item for several nodes and entities in a single request""" |
352 if not entities or not nodes or not node_accesses or not item_accesses: | 343 if not entities or not nodes or not node_accesses or not item_accesses: |
353 raise ValueError("entities, nodes and accesses must not be empty") | 344 raise ValueError("entities, nodes and accesses must not be empty") |
378 def __init__(self, nodeDbId, nodeIdentifier, config): | 369 def __init__(self, nodeDbId, nodeIdentifier, config): |
379 self.nodeDbId = nodeDbId | 370 self.nodeDbId = nodeDbId |
380 self.nodeIdentifier = nodeIdentifier | 371 self.nodeIdentifier = nodeIdentifier |
381 self._config = config | 372 self._config = config |
382 | 373 |
383 | |
384 def _checkNodeExists(self, cursor): | 374 def _checkNodeExists(self, cursor): |
385 cursor.execute("""SELECT 1 as exist FROM nodes WHERE node_id=%s""", | 375 cursor.execute("""SELECT 1 as exist FROM nodes WHERE node_id=%s""", |
386 (self.nodeDbId,)) | 376 (self.nodeDbId,)) |
387 if not cursor.fetchone(): | 377 if not cursor.fetchone(): |
388 raise error.NodeNotFound() | 378 raise error.NodeNotFound() |
389 | 379 |
390 | |
391 def getType(self): | 380 def getType(self): |
392 return self.nodeType | 381 return self.nodeType |
393 | 382 |
394 def getOwners(self): | 383 def getOwners(self): |
395 d = self.dbpool.runQuery("""SELECT jid FROM nodes NATURAL JOIN affiliations NATURAL JOIN entities WHERE node_id=%s and affiliation='owner'""", (self.nodeDbId,)) | 384 d = self.dbpool.runQuery("""SELECT jid FROM nodes NATURAL JOIN affiliations NATURAL JOIN entities WHERE node_id=%s and affiliation='owner'""", (self.nodeDbId,)) |
396 d.addCallback(lambda rows: [jid.JID(r[0]) for r in rows]) | 385 d.addCallback(lambda rows: [jid.JID(r[0]) for r in rows]) |
397 return d | 386 return d |
398 | 387 |
399 | |
400 def getConfiguration(self): | 388 def getConfiguration(self): |
401 return self._config | 389 return self._config |
402 | |
403 | 390 |
404 def setConfiguration(self, options): | 391 def setConfiguration(self, options): |
405 config = copy.copy(self._config) | 392 config = copy.copy(self._config) |
406 | 393 |
407 for option in options: | 394 for option in options: |
409 config[option] = options[option] | 396 config[option] = options[option] |
410 | 397 |
411 d = self.dbpool.runInteraction(self._setConfiguration, config) | 398 d = self.dbpool.runInteraction(self._setConfiguration, config) |
412 d.addCallback(self._setCachedConfiguration, config) | 399 d.addCallback(self._setCachedConfiguration, config) |
413 return d | 400 return d |
414 | |
415 | 401 |
416 def _setConfiguration(self, cursor, config): | 402 def _setConfiguration(self, cursor, config): |
417 self._checkNodeExists(cursor) | 403 self._checkNodeExists(cursor) |
418 cursor.execute("""UPDATE nodes SET persist_items=%s, | 404 cursor.execute("""UPDATE nodes SET persist_items=%s, |
419 deliver_payloads=%s, | 405 deliver_payloads=%s, |
426 config[const.OPT_SEND_LAST_PUBLISHED_ITEM], | 412 config[const.OPT_SEND_LAST_PUBLISHED_ITEM], |
427 config[const.OPT_ACCESS_MODEL], | 413 config[const.OPT_ACCESS_MODEL], |
428 config[const.OPT_PUBLISH_MODEL], | 414 config[const.OPT_PUBLISH_MODEL], |
429 self.nodeDbId)) | 415 self.nodeDbId)) |
430 | 416 |
431 | |
432 def _setCachedConfiguration(self, void, config): | 417 def _setCachedConfiguration(self, void, config): |
433 self._config = config | 418 self._config = config |
434 | |
435 | 419 |
436 def getMetaData(self): | 420 def getMetaData(self): |
437 config = copy.copy(self._config) | 421 config = copy.copy(self._config) |
438 config["pubsub#node_type"] = self.nodeType | 422 config["pubsub#node_type"] = self.nodeType |
439 return config | 423 return config |
440 | 424 |
441 | |
442 def getAffiliation(self, entity): | 425 def getAffiliation(self, entity): |
443 return self.dbpool.runInteraction(self._getAffiliation, entity) | 426 return self.dbpool.runInteraction(self._getAffiliation, entity) |
444 | |
445 | 427 |
446 def _getAffiliation(self, cursor, entity): | 428 def _getAffiliation(self, cursor, entity): |
447 self._checkNodeExists(cursor) | 429 self._checkNodeExists(cursor) |
448 cursor.execute("""SELECT affiliation FROM affiliations | 430 cursor.execute("""SELECT affiliation FROM affiliations |
449 NATURAL JOIN nodes | 431 NATURAL JOIN nodes |
455 try: | 437 try: |
456 return cursor.fetchone()[0] | 438 return cursor.fetchone()[0] |
457 except TypeError: | 439 except TypeError: |
458 return None | 440 return None |
459 | 441 |
460 | |
461 def getAccessModel(self): | 442 def getAccessModel(self): |
462 return self._config[const.OPT_ACCESS_MODEL] | 443 return self._config[const.OPT_ACCESS_MODEL] |
463 | 444 |
464 | |
465 def getSubscription(self, subscriber): | 445 def getSubscription(self, subscriber): |
466 return self.dbpool.runInteraction(self._getSubscription, subscriber) | 446 return self.dbpool.runInteraction(self._getSubscription, subscriber) |
467 | |
468 | 447 |
469 def _getSubscription(self, cursor, subscriber): | 448 def _getSubscription(self, cursor, subscriber): |
470 self._checkNodeExists(cursor) | 449 self._checkNodeExists(cursor) |
471 | 450 |
472 userhost = subscriber.userhost() | 451 userhost = subscriber.userhost() |
484 if not row: | 463 if not row: |
485 return None | 464 return None |
486 else: | 465 else: |
487 return Subscription(self.nodeIdentifier, subscriber, row[0]) | 466 return Subscription(self.nodeIdentifier, subscriber, row[0]) |
488 | 467 |
489 | |
490 def getSubscriptions(self, state=None): | 468 def getSubscriptions(self, state=None): |
491 return self.dbpool.runInteraction(self._getSubscriptions, state) | 469 return self.dbpool.runInteraction(self._getSubscriptions, state) |
492 | |
493 | 470 |
494 def _getSubscriptions(self, cursor, state): | 471 def _getSubscriptions(self, cursor, state): |
495 self._checkNodeExists(cursor) | 472 self._checkNodeExists(cursor) |
496 | 473 |
497 query = """SELECT node, jid, resource, state, | 474 query = """SELECT node, jid, resource, state, |
522 subscriptions.append(Subscription(row[0], subscriber, | 499 subscriptions.append(Subscription(row[0], subscriber, |
523 row[3], options)) | 500 row[3], options)) |
524 | 501 |
525 return subscriptions | 502 return subscriptions |
526 | 503 |
527 | |
528 def addSubscription(self, subscriber, state, config): | 504 def addSubscription(self, subscriber, state, config): |
529 return self.dbpool.runInteraction(self._addSubscription, subscriber, | 505 return self.dbpool.runInteraction(self._addSubscription, subscriber, |
530 state, config) | 506 state, config) |
531 | |
532 | 507 |
533 def _addSubscription(self, cursor, subscriber, state, config): | 508 def _addSubscription(self, cursor, subscriber, state, config): |
534 self._checkNodeExists(cursor) | 509 self._checkNodeExists(cursor) |
535 | 510 |
536 userhost = subscriber.userhost() | 511 userhost = subscriber.userhost() |
559 subscription_depth, | 534 subscription_depth, |
560 userhost)) | 535 userhost)) |
561 except cursor._pool.dbapi.IntegrityError: | 536 except cursor._pool.dbapi.IntegrityError: |
562 raise error.SubscriptionExists() | 537 raise error.SubscriptionExists() |
563 | 538 |
564 | |
565 def removeSubscription(self, subscriber): | 539 def removeSubscription(self, subscriber): |
566 return self.dbpool.runInteraction(self._removeSubscription, | 540 return self.dbpool.runInteraction(self._removeSubscription, |
567 subscriber) | 541 subscriber) |
568 | |
569 | 542 |
570 def _removeSubscription(self, cursor, subscriber): | 543 def _removeSubscription(self, cursor, subscriber): |
571 self._checkNodeExists(cursor) | 544 self._checkNodeExists(cursor) |
572 | 545 |
573 userhost = subscriber.userhost() | 546 userhost = subscriber.userhost() |
584 if cursor.rowcount != 1: | 557 if cursor.rowcount != 1: |
585 raise error.NotSubscribed() | 558 raise error.NotSubscribed() |
586 | 559 |
587 return None | 560 return None |
588 | 561 |
562 def setSubscriptions(self, subscriptions): | |
563 return self.dbpool.runInteraction(self._setSubscriptions, subscriptions) | |
564 | |
565 def _setSubscriptions(self, cursor, subscriptions): | |
566 self._checkNodeExists(cursor) | |
567 | |
568 entities = self.getOrCreateEntities(cursor, [s.subscriber for s in subscriptions]) | |
569 entities_map = {jid.JID(e.jid): e for e in entities} | |
570 | |
571 # then we construct values for subscriptions update according to entity_id we just got | |
572 placeholders = ','.join(len(subscriptions) * ["%s"]) | |
573 values = [] | |
574 for subscription in subscriptions: | |
575 entity_id = entities_map[subscription.subscriber].entity_id | |
576 resource = subscription.subscriber.resource or u'' | |
577 values.append((self.nodeDbId, entity_id, resource, subscription.state, None, None)) | |
578 # we use upsert so new values are inserted and existing one updated. This feature is only available for PostgreSQL >= 9.5 | |
579 cursor.execute("INSERT INTO subscriptions(node_id, entity_id, resource, state, subscription_type, subscription_depth) VALUES " + placeholders + " ON CONFLICT (entity_id, resource, node_id) DO UPDATE SET state=EXCLUDED.state", [v for v in values]) | |
589 | 580 |
590 def isSubscribed(self, entity): | 581 def isSubscribed(self, entity): |
591 return self.dbpool.runInteraction(self._isSubscribed, entity) | 582 return self.dbpool.runInteraction(self._isSubscribed, entity) |
592 | |
593 | 583 |
594 def _isSubscribed(self, cursor, entity): | 584 def _isSubscribed(self, cursor, entity): |
595 self._checkNodeExists(cursor) | 585 self._checkNodeExists(cursor) |
596 | 586 |
597 cursor.execute("""SELECT 1 as bool FROM entities | 587 cursor.execute("""SELECT 1 as bool FROM entities |
602 (entity.userhost(), | 592 (entity.userhost(), |
603 self.nodeDbId)) | 593 self.nodeDbId)) |
604 | 594 |
605 return cursor.fetchone() is not None | 595 return cursor.fetchone() is not None |
606 | 596 |
607 | |
608 def getAffiliations(self): | 597 def getAffiliations(self): |
609 return self.dbpool.runInteraction(self._getAffiliations) | 598 return self.dbpool.runInteraction(self._getAffiliations) |
610 | |
611 | 599 |
612 def _getAffiliations(self, cursor): | 600 def _getAffiliations(self, cursor): |
613 self._checkNodeExists(cursor) | 601 self._checkNodeExists(cursor) |
614 | 602 |
615 cursor.execute("""SELECT jid, affiliation FROM nodes | 603 cursor.execute("""SELECT jid, affiliation FROM nodes |
624 def getOrCreateEntities(self, cursor, entities_jids): | 612 def getOrCreateEntities(self, cursor, entities_jids): |
625 """Get entity_id from entities in entities table | 613 """Get entity_id from entities in entities table |
626 | 614 |
627 Entities will be inserted if they don't exist | 615 Entities will be inserted if they don't exist |
628 @param entities_jids(list[jid.JID]): entities to get or create | 616 @param entities_jids(list[jid.JID]): entities to get or create |
629 @return list[record(entity_jid,jid)]]: list of entity_id and jid (as plain string) | 617 @return list[record(entity_id,jid)]]: list of entity_id and jid (as plain string) |
630 both existing and inserted entities are returned | 618 both existing and inserted entities are returned |
631 """ | 619 """ |
632 # cf. http://stackoverflow.com/a/35265559 | 620 # cf. http://stackoverflow.com/a/35265559 |
633 placeholders = ','.join(len(entities_jids) * ["(%s)"]) | 621 placeholders = ','.join(len(entities_jids) * ["(%s)"]) |
634 query = ( | 622 query = ( |
653 return cursor.fetchall() | 641 return cursor.fetchall() |
654 | 642 |
655 def setAffiliations(self, affiliations): | 643 def setAffiliations(self, affiliations): |
656 return self.dbpool.runInteraction(self._setAffiliations, affiliations) | 644 return self.dbpool.runInteraction(self._setAffiliations, affiliations) |
657 | 645 |
658 | |
659 def _setAffiliations(self, cursor, affiliations): | 646 def _setAffiliations(self, cursor, affiliations): |
660 self._checkNodeExists(cursor) | 647 self._checkNodeExists(cursor) |
661 | 648 |
662 entities = self.getOrCreateEntities(cursor, affiliations) | 649 entities = self.getOrCreateEntities(cursor, affiliations) |
663 | 650 |
667 map(values.extend, ((e.entity_id, affiliations[jid.JID(e.jid)], self.nodeDbId) for e in entities)) | 654 map(values.extend, ((e.entity_id, affiliations[jid.JID(e.jid)], self.nodeDbId) for e in entities)) |
668 | 655 |
669 # we use upsert so new values are inserted and existing one updated. This feature is only available for PostgreSQL >= 9.5 | 656 # we use upsert so new values are inserted and existing one updated. This feature is only available for PostgreSQL >= 9.5 |
670 cursor.execute("INSERT INTO affiliations(entity_id,affiliation,node_id) VALUES " + placeholders + " ON CONFLICT (entity_id,node_id) DO UPDATE SET affiliation=EXCLUDED.affiliation", values) | 657 cursor.execute("INSERT INTO affiliations(entity_id,affiliation,node_id) VALUES " + placeholders + " ON CONFLICT (entity_id,node_id) DO UPDATE SET affiliation=EXCLUDED.affiliation", values) |
671 | 658 |
672 | |
673 def deleteAffiliations(self, entities): | 659 def deleteAffiliations(self, entities): |
674 return self.dbpool.runInteraction(self._deleteAffiliations, entities) | 660 return self.dbpool.runInteraction(self._deleteAffiliations, entities) |
675 | |
676 | 661 |
677 def _deleteAffiliations(self, cursor, entities): | 662 def _deleteAffiliations(self, cursor, entities): |
678 """delete affiliations and subscriptions for this entity""" | 663 """delete affiliations and subscriptions for this entity""" |
679 self._checkNodeExists(cursor) | 664 self._checkNodeExists(cursor) |
680 placeholders = ','.join(len(entities) * ["%s"]) | 665 placeholders = ','.join(len(entities) * ["%s"]) |
1076 """ | 1061 """ |
1077 | 1062 |
1078 def __init__(self, dbpool): | 1063 def __init__(self, dbpool): |
1079 self.dbpool = dbpool | 1064 self.dbpool = dbpool |
1080 | 1065 |
1081 | |
1082 def _countCallbacks(self, cursor, service, nodeIdentifier): | 1066 def _countCallbacks(self, cursor, service, nodeIdentifier): |
1083 """ | 1067 """ |
1084 Count number of callbacks registered for a node. | 1068 Count number of callbacks registered for a node. |
1085 """ | 1069 """ |
1086 cursor.execute("""SELECT count(*) FROM callbacks | 1070 cursor.execute("""SELECT count(*) FROM callbacks |
1088 (service.full(), | 1072 (service.full(), |
1089 nodeIdentifier)) | 1073 nodeIdentifier)) |
1090 results = cursor.fetchall() | 1074 results = cursor.fetchall() |
1091 return results[0][0] | 1075 return results[0][0] |
1092 | 1076 |
1093 | |
1094 def addCallback(self, service, nodeIdentifier, callback): | 1077 def addCallback(self, service, nodeIdentifier, callback): |
1095 def interaction(cursor): | 1078 def interaction(cursor): |
1096 cursor.execute("""SELECT 1 as bool FROM callbacks | 1079 cursor.execute("""SELECT 1 as bool FROM callbacks |
1097 WHERE service=%s and node=%s and uri=%s""", | 1080 WHERE service=%s and node=%s and uri=%s""", |
1098 (service.full(), | 1081 (service.full(), |
1108 nodeIdentifier, | 1091 nodeIdentifier, |
1109 callback)) | 1092 callback)) |
1110 | 1093 |
1111 return self.dbpool.runInteraction(interaction) | 1094 return self.dbpool.runInteraction(interaction) |
1112 | 1095 |
1113 | |
1114 def removeCallback(self, service, nodeIdentifier, callback): | 1096 def removeCallback(self, service, nodeIdentifier, callback): |
1115 def interaction(cursor): | 1097 def interaction(cursor): |
1116 cursor.execute("""DELETE FROM callbacks | 1098 cursor.execute("""DELETE FROM callbacks |
1117 WHERE service=%s and node=%s and uri=%s""", | 1099 WHERE service=%s and node=%s and uri=%s""", |
1118 (service.full(), | 1100 (service.full(), |
1140 | 1122 |
1141 return [result[0] for result in results] | 1123 return [result[0] for result in results] |
1142 | 1124 |
1143 return self.dbpool.runInteraction(interaction) | 1125 return self.dbpool.runInteraction(interaction) |
1144 | 1126 |
1145 | |
1146 def hasCallbacks(self, service, nodeIdentifier): | 1127 def hasCallbacks(self, service, nodeIdentifier): |
1147 def interaction(cursor): | 1128 def interaction(cursor): |
1148 return bool(self._countCallbacks(cursor, service, nodeIdentifier)) | 1129 return bool(self._countCallbacks(cursor, service, nodeIdentifier)) |
1149 | 1130 |
1150 return self.dbpool.runInteraction(interaction) | 1131 return self.dbpool.runInteraction(interaction) |