Mercurial > libervia-pubsub
comparison idavoll/pgsql_backend.py @ 51:40ac06941edc
Added node existance checks.
Added Storage support for adding and removing subscriptions.
Added SubscriptionService as subclass of backend.SubscriptionService.
author | Ralph Meijer <ralphm@ik.nu> |
---|---|
date | Wed, 03 Nov 2004 17:27:23 +0000 |
parents | 4447b3c5b857 |
children | 7c4dfef5d964 |
comparison
equal
deleted
inserted
replaced
50:64f0986d8b35 | 51:40ac06941edc |
---|---|
6 | 6 |
7 class Storage: | 7 class Storage: |
8 def __init__(self, user, database): | 8 def __init__(self, user, database): |
9 self.dbpool = adbapi.ConnectionPool('pyPgSQL.PgSQL', user=user, | 9 self.dbpool = adbapi.ConnectionPool('pyPgSQL.PgSQL', user=user, |
10 database=database) | 10 database=database) |
11 | |
12 def _check_node_exists(self, cursor, node_id): | |
13 cursor.execute("""SELECT id FROM nodes WHERE node=%s""", | |
14 (node_id.encode('utf8'))) | |
15 if not cursor.fetchone(): | |
16 raise backend.NodeNotFound | |
17 else: | |
18 return | |
11 | 19 |
12 def _get_node_configuration(self, cursor, node_id): | 20 def _get_node_configuration(self, cursor, node_id): |
13 configuration = {} | 21 configuration = {} |
14 cursor.execute("""SELECT persistent, deliver_payload FROM nodes | 22 cursor.execute("""SELECT persistent, deliver_payload FROM nodes |
15 WHERE node=%s""", | 23 WHERE node=%s""", |
23 | 31 |
24 def get_node_configuration(self, node_id): | 32 def get_node_configuration(self, node_id): |
25 return self.dbpool.runInteraction(self._get_node_configuration, node_id) | 33 return self.dbpool.runInteraction(self._get_node_configuration, node_id) |
26 | 34 |
27 def _get_affiliation(self, cursor, node_id, entity): | 35 def _get_affiliation(self, cursor, node_id, entity): |
36 self._check_node_exists(cursor, node_id) | |
28 cursor.execute("""SELECT affiliation FROM affiliations | 37 cursor.execute("""SELECT affiliation FROM affiliations |
29 JOIN nodes ON (node_id=nodes.id) | 38 JOIN nodes ON (node_id=nodes.id) |
30 JOIN entities ON (entity_id=entities.id) | 39 JOIN entities ON (entity_id=entities.id) |
31 WHERE node=%s AND jid=%s""", | 40 WHERE node=%s AND jid=%s""", |
32 (node_id.encode('utf8'), entity.encode('utf8'))) | 41 (node_id.encode('utf8'), entity.encode('utf8'))) |
39 def get_affiliation(self, node_id, entity): | 48 def get_affiliation(self, node_id, entity): |
40 return self.dbpool.runInteraction(self._get_affiliation, node_id, | 49 return self.dbpool.runInteraction(self._get_affiliation, node_id, |
41 entity) | 50 entity) |
42 | 51 |
43 def get_subscribers(self, node_id): | 52 def get_subscribers(self, node_id): |
53 self._check_node_exists(cursor, node_id) | |
44 d = self.dbpool.runQuery("""SELECT jid, resource FROM subscriptions | 54 d = self.dbpool.runQuery("""SELECT jid, resource FROM subscriptions |
45 JOIN nodes ON (node_id=nodes.id) | 55 JOIN nodes ON (node_id=nodes.id) |
46 JOIN entities ON (entity_id=entities.id) | 56 JOIN entities ON (entity_id=entities.id) |
47 WHERE node=%s AND | 57 WHERE node=%s AND |
48 subscription='subscribed'""", | 58 subscription='subscribed'""", |
56 def store_items(self, node_id, items, publisher): | 66 def store_items(self, node_id, items, publisher): |
57 return self.dbpool.runInteraction(self._store_items, node_id, items, | 67 return self.dbpool.runInteraction(self._store_items, node_id, items, |
58 publisher) | 68 publisher) |
59 | 69 |
60 def _store_items(self, cursor, node_id, items, publisher): | 70 def _store_items(self, cursor, node_id, items, publisher): |
71 self._check_node_exists(cursor, node_id) | |
61 for item in items: | 72 for item in items: |
62 self._store_item(cursor, node_id, item, publisher) | 73 self._store_item(cursor, node_id, item, publisher) |
63 | 74 |
64 def _store_item(self, cursor, node_id, item, publisher): | 75 def _store_item(self, cursor, node_id, item, publisher): |
65 data = item.toXml() | 76 data = item.toXml() |
79 (item["id"].encode('utf8'), | 90 (item["id"].encode('utf8'), |
80 publisher.encode('utf8'), | 91 publisher.encode('utf8'), |
81 data.encode('utf8'), | 92 data.encode('utf8'), |
82 node_id.encode('utf8'))) | 93 node_id.encode('utf8'))) |
83 | 94 |
95 def add_subscription(self, node_id, subscriber, state): | |
96 return self.dbpool.runInteraction(self._add_subscription, node_id, | |
97 subscriber, state) | |
98 | |
99 def _add_subscription(self, cursor, node_id, subscriber, state): | |
100 self._check_node_exists(cursor, node_id) | |
101 subscriber = jid.JID(subscriber) | |
102 userhost = subscriber.userhost() | |
103 resource = subscriber.resource or '' | |
104 | |
105 try: | |
106 cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", | |
107 (userhost.encode('utf8'))) | |
108 except: | |
109 pass | |
110 | |
111 try: | |
112 cursor.execute("""INSERT INTO subscriptions | |
113 (node_id, entity_id, resource, subscription) | |
114 SELECT n.id, e.id, %s, %s FROM | |
115 (SELECT id FROM nodes WHERE node=%s) AS n | |
116 CROSS JOIN | |
117 (SELECT id FROM entities WHERE jid=%s) AS e""", | |
118 (resource.encode('utf8'), | |
119 state.encode('utf8'), | |
120 node_id.encode('utf8'), | |
121 userhost.encode('utf8'))) | |
122 except: | |
123 cursor.execute("""SELECT subscription FROM subscriptions | |
124 JOIN nodes ON (nodes.id=subscriptions.node_id) | |
125 JOIN entities ON | |
126 (entities.id=subscriptions.entity_id) | |
127 WHERE node=%s AND jid=%s AND resource=%s""", | |
128 (node_id.encode('utf8'), | |
129 userhost.encode('utf8'), | |
130 resource.encode('utf8'))) | |
131 state = cursor.fetchone()[0] | |
132 | |
133 return {'node': node_id, | |
134 'jid': subscriber.full(), | |
135 'subscription': state} | |
136 | |
137 def remove_subscription(self, node_id, subscriber): | |
138 return self.dbpool.runInteraction(self._remove_subscription, node_id, | |
139 subscriber) | |
140 | |
141 def _remove_subscription(self, cursor, node_id, subscriber): | |
142 self._check_node_exists(cursor, node_id) | |
143 subscriber = jid.JID(subscriber) | |
144 userhost = subscriber.userhost() | |
145 resource = subscriber.resource or '' | |
146 | |
147 cursor.execute("""DELETE FROM subscriptions WHERE | |
148 node_id=(SELECT id FROM nodes WHERE node=%s) AND | |
149 entity_id=(SELECT id FROM entities WHERE jid=%s) | |
150 AND resource=%s""", | |
151 (node_id.encode('utf8'), | |
152 userhost.encode('utf8'), | |
153 resource.encode('utf8'))) | |
154 if cursor.rowcount != 1: | |
155 raise backend.NotSubscribed | |
156 | |
157 return None | |
158 | |
84 class BackendService(backend.BackendService): | 159 class BackendService(backend.BackendService): |
85 """ PostgreSQL backend Service for a JEP-0060 pubsub service """ | 160 """ PostgreSQL backend Service for a JEP-0060 pubsub service """ |
86 | 161 |
87 class PublishService(backend.PublishService): | 162 class PublishService(backend.PublishService): |
88 pass | 163 pass |
89 | 164 |
90 class NotificationService(backend.NotificationService): | 165 class NotificationService(backend.NotificationService): |
91 pass | 166 pass |
167 | |
168 class SubscriptionService(backend.SubscriptionService): | |
169 pass |