comparison idavoll/pgsql_backend.py @ 51:40ac06941edc

Added node existence checks. Added Storage support for adding and removing subscriptions. Added SubscriptionService as subclass of backend.SubscriptionService.
author Ralph Meijer <ralphm@ik.nu>
date Wed, 03 Nov 2004 17:27:23 +0000
parents 4447b3c5b857
children 7c4dfef5d964
comparison
equal deleted inserted replaced
50:64f0986d8b35 51:40ac06941edc
6 6
7 class Storage: 7 class Storage:
def __init__(self, user, database):
    """ Set up the database connection pool used by all storage methods.

    user and database are handed as keyword arguments to the
    'pyPgSQL.PgSQL' driver through adbapi.ConnectionPool.
    """
    connect_args = {'user': user, 'database': database}
    self.dbpool = adbapi.ConnectionPool('pyPgSQL.PgSQL', **connect_args)
11
12 def _check_node_exists(self, cursor, node_id):
13 cursor.execute("""SELECT id FROM nodes WHERE node=%s""",
14 (node_id.encode('utf8')))
15 if not cursor.fetchone():
16 raise backend.NodeNotFound
17 else:
18 return
11 19
12 def _get_node_configuration(self, cursor, node_id): 20 def _get_node_configuration(self, cursor, node_id):
13 configuration = {} 21 configuration = {}
14 cursor.execute("""SELECT persistent, deliver_payload FROM nodes 22 cursor.execute("""SELECT persistent, deliver_payload FROM nodes
15 WHERE node=%s""", 23 WHERE node=%s""",
23 31
def get_node_configuration(self, node_id):
    """ Fetch the configuration of a node.

    Schedules _get_node_configuration on the connection pool with node_id
    and returns whatever runInteraction returns for it.
    """
    interaction = self._get_node_configuration
    return self.dbpool.runInteraction(interaction, node_id)
26 34
27 def _get_affiliation(self, cursor, node_id, entity): 35 def _get_affiliation(self, cursor, node_id, entity):
36 self._check_node_exists(cursor, node_id)
28 cursor.execute("""SELECT affiliation FROM affiliations 37 cursor.execute("""SELECT affiliation FROM affiliations
29 JOIN nodes ON (node_id=nodes.id) 38 JOIN nodes ON (node_id=nodes.id)
30 JOIN entities ON (entity_id=entities.id) 39 JOIN entities ON (entity_id=entities.id)
31 WHERE node=%s AND jid=%s""", 40 WHERE node=%s AND jid=%s""",
32 (node_id.encode('utf8'), entity.encode('utf8'))) 41 (node_id.encode('utf8'), entity.encode('utf8')))
def get_affiliation(self, node_id, entity):
    """ Look up the affiliation of an entity with a node.

    Schedules _get_affiliation on the connection pool with node_id and
    entity and returns whatever runInteraction returns for it.
    """
    interaction = self._get_affiliation
    return self.dbpool.runInteraction(interaction, node_id, entity)
42 51
43 def get_subscribers(self, node_id): 52 def get_subscribers(self, node_id):
53 self._check_node_exists(cursor, node_id)
44 d = self.dbpool.runQuery("""SELECT jid, resource FROM subscriptions 54 d = self.dbpool.runQuery("""SELECT jid, resource FROM subscriptions
45 JOIN nodes ON (node_id=nodes.id) 55 JOIN nodes ON (node_id=nodes.id)
46 JOIN entities ON (entity_id=entities.id) 56 JOIN entities ON (entity_id=entities.id)
47 WHERE node=%s AND 57 WHERE node=%s AND
48 subscription='subscribed'""", 58 subscription='subscribed'""",
def store_items(self, node_id, items, publisher):
    """ Store published items for a node.

    Schedules _store_items on the connection pool with node_id, items and
    publisher and returns whatever runInteraction returns for it.
    """
    interaction = self._store_items
    return self.dbpool.runInteraction(interaction, node_id, items, publisher)
59 69
60 def _store_items(self, cursor, node_id, items, publisher): 70 def _store_items(self, cursor, node_id, items, publisher):
71 self._check_node_exists(cursor, node_id)
61 for item in items: 72 for item in items:
62 self._store_item(cursor, node_id, item, publisher) 73 self._store_item(cursor, node_id, item, publisher)
63 74
64 def _store_item(self, cursor, node_id, item, publisher): 75 def _store_item(self, cursor, node_id, item, publisher):
65 data = item.toXml() 76 data = item.toXml()
79 (item["id"].encode('utf8'), 90 (item["id"].encode('utf8'),
80 publisher.encode('utf8'), 91 publisher.encode('utf8'),
81 data.encode('utf8'), 92 data.encode('utf8'),
82 node_id.encode('utf8'))) 93 node_id.encode('utf8')))
83 94
def add_subscription(self, node_id, subscriber, state):
    """ Add a subscription of subscriber to a node.

    Schedules _add_subscription on the connection pool with node_id,
    subscriber and state and returns whatever runInteraction returns
    for it.
    """
    interaction = self._add_subscription
    return self.dbpool.runInteraction(interaction, node_id, subscriber, state)
98
def _add_subscription(self, cursor, node_id, subscriber, state):
    """ Record a subscription of subscriber to a node.

    Meant to be called from within a database interaction, using the
    cursor of that interaction.

    node_id is the node identifier. subscriber is a JID string, parsed
    with jid.JID; its bare JID identifies the entity and its resource
    (or '' when absent) is stored with the subscription. state is the
    subscription state to store for a new subscription.

    Returns a dict with the keys 'node', 'jid' (the full JID) and
    'subscription'. When inserting the subscription fails and a row for
    this node, bare JID and resource is found, 'subscription' holds the
    state stored in that row instead of the requested one.

    Raises backend.NodeNotFound, through _check_node_exists, when the
    node does not exist. When the insert fails and no existing
    subscription row is found, the insert error is re-raised.
    """
    self._check_node_exists(cursor, node_id)
    subscriber = jid.JID(subscriber)
    userhost = subscriber.userhost()
    resource = subscriber.resource or ''

    # Best effort: make sure the bare JID has a row in entities. A failure
    # here is presumed to mean the row is already present and is ignored.
    # NOTE(review): on PostgreSQL a failed statement aborts the enclosing
    # transaction, so statements following a swallowed failure may fail
    # too -- confirm against the driver's transaction mode.
    try:
        cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
                       (userhost.encode('utf8'),))
    except Exception:
        # Narrowed from a bare except so that KeyboardInterrupt and
        # SystemExit are no longer swallowed.
        pass

    try:
        cursor.execute("""INSERT INTO subscriptions
                          (node_id, entity_id, resource, subscription)
                          SELECT n.id, e.id, %s, %s FROM
                          (SELECT id FROM nodes WHERE node=%s) AS n
                          CROSS JOIN
                          (SELECT id FROM entities WHERE jid=%s) AS e""",
                       (resource.encode('utf8'),
                        state.encode('utf8'),
                        node_id.encode('utf8'),
                        userhost.encode('utf8')))
    except Exception:
        # The insert failed; look for an existing subscription and report
        # its stored state instead of the requested one.
        cursor.execute("""SELECT subscription FROM subscriptions
                          JOIN nodes ON (nodes.id=subscriptions.node_id)
                          JOIN entities ON
                               (entities.id=subscriptions.entity_id)
                          WHERE node=%s AND jid=%s AND resource=%s""",
                       (node_id.encode('utf8'),
                        userhost.encode('utf8'),
                        resource.encode('utf8')))
        row = cursor.fetchone()
        if not row:
            # No existing subscription explains the failure: surface the
            # insert error rather than failing on indexing None.
            raise
        state = row[0]

    return {'node': node_id,
            'jid': subscriber.full(),
            'subscription': state}
136
def remove_subscription(self, node_id, subscriber):
    """ Remove the subscription of subscriber to a node.

    Schedules _remove_subscription on the connection pool with node_id
    and subscriber and returns whatever runInteraction returns for it.
    """
    interaction = self._remove_subscription
    return self.dbpool.runInteraction(interaction, node_id, subscriber)
140
def _remove_subscription(self, cursor, node_id, subscriber):
    """ Delete the subscription of subscriber to a node.

    Meant to be called from within a database interaction, using the
    cursor of that interaction. subscriber is a JID string, parsed with
    jid.JID; the row is matched on node, bare JID and resource ('' when
    the JID has none).

    Returns None. Raises backend.NodeNotFound, through
    _check_node_exists, when the node does not exist, and
    backend.NotSubscribed when the delete did not affect exactly one row.
    """
    self._check_node_exists(cursor, node_id)

    parsed = jid.JID(subscriber)
    bare_jid = parsed.userhost()
    resource = parsed.resource or ''
    params = (node_id.encode('utf8'),
              bare_jid.encode('utf8'),
              resource.encode('utf8'))

    cursor.execute("""DELETE FROM subscriptions WHERE
                      node_id=(SELECT id FROM nodes WHERE node=%s) AND
                      entity_id=(SELECT id FROM entities WHERE jid=%s)
                      AND resource=%s""",
                   params)

    if cursor.rowcount == 1:
        return None
    raise backend.NotSubscribed
158
class BackendService(backend.BackendService):
    """ Backend service of a JEP-0060 pubsub service, backed by PostgreSQL.

    Adds nothing to backend.BackendService; all behaviour is inherited.
    """
86 161
class PublishService(backend.PublishService):
    """ Publish service for the PostgreSQL backend.

    Adds nothing to backend.PublishService; all behaviour is inherited.
    """
89 164
class NotificationService(backend.NotificationService):
    """ Notification service for the PostgreSQL backend.

    Adds nothing to backend.NotificationService; all behaviour is
    inherited.
    """
167
class SubscriptionService(backend.SubscriptionService):
    """ Subscription service for the PostgreSQL backend.

    Adds nothing to backend.SubscriptionService; all behaviour is
    inherited.
    """