Mercurial > libervia-pubsub
comparison idavoll/pgsql_storage.py @ 107:d252d793f0ed
Initial revision.
author | Ralph Meijer <ralphm@ik.nu> |
---|---|
date | Fri, 08 Apr 2005 10:15:02 +0000 |
parents | |
children | dfef919aaf1b |
comparison
equal
deleted
inserted
replaced
106:dc36882d2620 | 107:d252d793f0ed |
---|---|
1 import copy | |
2 import storage | |
3 from twisted.enterprise import adbapi | |
4 from twisted.internet import defer | |
5 from twisted.words.protocols.jabber import jid | |
6 from zope.interface import implements | |
7 | |
class Storage:
    """
    Storage backend that keeps nodes, entities, affiliations and
    subscriptions in a database reached through a Twisted adbapi
    connection pool opened with the 'pyPgSQL.PgSQL' driver.

    Public methods hand their work to the pool's C{runInteraction} or
    C{runQuery} and return whatever the pool returns (a Deferred, per the
    Twisted adbapi API).
    """

    implements(storage.IStorage)

    def __init__(self, user, database):
        """
        Create the connection pool.

        @param user: passed to the database driver as C{user}.
        @param database: passed to the database driver as C{database}.
        """
        self._dbpool = adbapi.ConnectionPool('pyPgSQL.PgSQL', user=user,
                                             database=database)

    def get_node(self, node_id):
        """
        Look up a node by its identifier.

        @return: result of running L{_get_node} in a pool interaction.
        """
        return self._dbpool.runInteraction(self._get_node, node_id)

    def _get_node(self, cursor, node_id):
        """
        Read the node's configuration row and wrap it in a L{LeafNode}.

        @raise storage.NodeNotFound: when no row matches C{node_id}
            (C{fetchone()} returns C{None}, which cannot be unpacked).
        """
        # NOTE(review): unlike the other queries in this class, node_id is
        # passed here without .encode('utf8') -- confirm this is intended.
        configuration = {}
        cursor.execute("""SELECT persistent, deliver_payload FROM nodes
                          WHERE node=%s""",
                       (node_id,))
        try:
            (configuration["pubsub#persist_items"],
             configuration["pubsub#deliver_payloads"]) = cursor.fetchone()
        except TypeError:
            raise storage.NodeNotFound
        else:
            node = LeafNode(node_id, configuration)
            # The node issues its own queries through the same pool.
            node._dbpool = self._dbpool
            return node

    def get_node_ids(self):
        """
        @return: Deferred firing with the list of all node identifiers
            (first column of every row in C{nodes}).
        """
        d = self._dbpool.runQuery("""SELECT node from nodes""")
        d.addCallback(lambda results: [r[0] for r in results])
        return d

    def create_node(self, node_id, owner, type='leaf'):
        """
        Create a node and register C{owner} as its owner.

        C{type} is accepted but not used by this implementation.

        @return: result of running L{_create_node} in a pool interaction.
        """
        return self._dbpool.runInteraction(self._create_node, node_id, owner)

    def _create_node(self, cursor, node_id, owner):
        """
        Insert the node, make sure the owner exists in C{entities}, and
        add an 'owner' affiliation linking the two.

        @raise storage.NodeExists: when inserting the node row fails.
        """
        node = node_id.encode('utf8')
        owner_jid = owner.full().encode('utf8')

        # Query parameters are always passed as a sequence: "(value)" is
        # just a parenthesised string, "(value,)" is the one-element tuple
        # the DB-API expects and that the rest of this class already uses.
        try:
            cursor.execute("""INSERT INTO nodes (node) VALUES (%s)""",
                           (node,))
        except Exception:
            # Any failure of the insert is reported as "node exists".
            # NOTE(review): presumably the expected failure is a uniqueness
            # violation on nodes.node -- confirm against the schema.
            raise storage.NodeExists

        cursor.execute("""SELECT 1 from entities where jid=%s""",
                       (owner_jid,))

        if not cursor.fetchone():
            cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
                           (owner_jid,))

        cursor.execute("""INSERT INTO affiliations
                          (node_id, entity_id, affiliation)
                          SELECT n.id, e.id, 'owner' FROM
                          (SELECT id FROM nodes WHERE node=%s) AS n
                          CROSS JOIN
                          (SELECT id FROM entities WHERE jid=%s) AS e""",
                       (node, owner_jid))

    def delete_node(self, node_id):
        """
        Delete a node.

        @return: result of running L{_delete_node} in a pool interaction.
        """
        return self._dbpool.runInteraction(self._delete_node, node_id)

    def _delete_node(self, cursor, node_id):
        """
        Delete the node row.

        @raise storage.NodeNotFound: unless exactly one row was deleted.
        """
        cursor.execute("""DELETE FROM nodes WHERE node=%s""",
                       (node_id.encode('utf-8'),))

        if cursor.rowcount != 1:
            raise storage.NodeNotFound

    def get_affiliations(self, entity):
        """
        @param entity: object whose C{full()} gives the JID to look up.
        @return: Deferred firing with a list of C{(node, affiliation)}
            tuples for that JID.
        """
        d = self._dbpool.runQuery("""SELECT node, affiliation FROM entities
                                     JOIN affiliations ON
                                     (affiliations.entity_id=entities.id)
                                     JOIN nodes ON
                                     (nodes.id=affiliations.node_id)
                                     WHERE jid=%s""",
                                  (entity.full().encode('utf8'),))
        d.addCallback(lambda results: [tuple(r) for r in results])
        return d

    def get_subscriptions(self, entity):
        """
        @param entity: object whose C{userhost()} gives the bare JID to
            look up.
        @return: Deferred firing with a list of
            C{(node, subscriber JID, subscription)} tuples.
        """
        d = self._dbpool.runQuery("""SELECT node, jid, resource, subscription
                                     FROM entities JOIN subscriptions ON
                                     (subscriptions.entity_id=entities.id)
                                     JOIN nodes ON
                                     (nodes.id=subscriptions.node_id)
                                     WHERE jid=%s""",
                                  (entity.userhost().encode('utf8'),))
        d.addCallback(self._convert_subscription_jids)
        return d

    def _convert_subscription_jids(self, subscriptions):
        """
        Turn C{(node, jid, resource, subscription)} rows into
        C{(node, JID, subscription)} tuples, joining the stored jid and
        resource as "jid/resource".
        """
        return [(node, jid.JID('%s/%s' % (subscriber, resource)), subscription)
                for node, subscriber, resource, subscription in subscriptions]
100 | |
class Node:
    """
    Base class for database-backed pubsub nodes.

    Instances expect two attributes that are not set here: C{_dbpool}
    (assigned by the code that creates the node) and C{type} (provided by
    subclasses as a class attribute).
    """

    implements(storage.INode)

    def __init__(self, node_id, config):
        """
        @param node_id: identifier of the node.
        @param config: dictionary of configuration options for the node.
        """
        self.id = node_id
        self._config = config

    def get_type(self):
        """Return the node type (C{self.type})."""
        return self.type

    def get_configuration(self):
        """Return the node's configuration dictionary (not a copy)."""
        return self._config

    def set_configuration(self, options):
        """
        Apply new configuration options and persist them.

        @param options: mapping of option names to new values; names not
            already present in the node's configuration are ignored.
        @return: result of running L{_set_configuration} in a pool
            interaction.
        """
        # The interaction callable must be the helper defined below; the
        # previous name (_set_node_configuration) does not exist on this
        # class and raised AttributeError on every call.
        return self._dbpool.runInteraction(self._set_configuration,
                                           options)

    def _set_configuration(self, cursor, options):
        """
        Update the in-memory configuration with the known options from
        C{options}, then write both persisted settings to the C{nodes} row.
        """
        for option in options:
            if option in self._config:
                self._config[option] = options[option]

        # NOTE(review): assumes both option values are strings (they are
        # .encode()d) -- confirm against what callers and the DB provide.
        cursor.execute("""UPDATE nodes SET persistent=%s, deliver_payload=%s
                          WHERE node=%s""",
                       (self._config["pubsub#persist_items"].encode('utf8'),
                        self._config["pubsub#deliver_payloads"].encode('utf8'),
                        self.id.encode('utf-8')))

    def get_meta_data(self):
        """
        Return a shallow copy of the configuration with the node type
        added under C{"pubsub#node_type"}.
        """
        config = copy.copy(self._config)
        config["pubsub#node_type"] = self.type
        return config

    def get_affiliation(self, entity):
        """
        @param entity: object whose C{full()} gives the JID to look up.
        @return: result of running L{_get_affiliation} in a pool
            interaction.
        """
        return self._dbpool.runInteraction(self._get_affiliation, entity)

    def _get_affiliation(self, cursor, entity):
        """
        Return the affiliation of C{entity} with this node, or C{None}
        when there is no matching row.
        """
        cursor.execute("""SELECT affiliation FROM affiliations
                          JOIN nodes ON (node_id=nodes.id)
                          JOIN entities ON (entity_id=entities.id)
                          WHERE node=%s AND jid=%s""",
                       (self.id.encode('utf8'),
                        entity.full().encode('utf8')))

        try:
            return cursor.fetchone()[0]
        except TypeError:
            # fetchone() returned None: no affiliation recorded.
            return None

    def add_subscription(self, subscriber, state):
        """
        Subscribe C{subscriber} to this node with the given state.

        @return: result of running L{_add_subscription} in a pool
            interaction.
        """
        return self._dbpool.runInteraction(self._add_subscription, subscriber,
                                           state)

    def _add_subscription(self, cursor, subscriber, state):
        """
        Record a subscription for C{subscriber}.

        If inserting the subscription fails, the state of the existing
        subscription row is read back and reported instead.

        @return: dictionary with keys C{'node'}, C{'jid'} and
            C{'subscription'}.
        """
        userhost = subscriber.userhost()
        resource = subscriber.resource or ''

        try:
            # One-element parameter tuple (note the trailing comma).
            cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
                           (userhost.encode('utf8'),))
        except Exception:
            # Deliberate best effort: a failure here is ignored.
            # NOTE(review): presumably the expected failure is "entity
            # already exists" -- confirm against the schema.
            pass

        try:
            cursor.execute("""INSERT INTO subscriptions
                              (node_id, entity_id, resource, subscription)
                              SELECT n.id, e.id, %s, %s FROM
                              (SELECT id FROM nodes WHERE node=%s) AS n
                              CROSS JOIN
                              (SELECT id FROM entities WHERE jid=%s) AS e""",
                           (resource.encode('utf8'),
                            state.encode('utf8'),
                            self.id.encode('utf8'),
                            userhost.encode('utf8')))
        except Exception:
            # Insert failed; fall back to the stored subscription state.
            cursor.execute("""SELECT subscription FROM subscriptions
                              JOIN nodes ON (nodes.id=subscriptions.node_id)
                              JOIN entities ON
                              (entities.id=subscriptions.entity_id)
                              WHERE node=%s AND jid=%s AND resource=%s""",
                           (self.id.encode('utf8'),
                            userhost.encode('utf8'),
                            resource.encode('utf8')))
            state = cursor.fetchone()[0]

        return {'node': self.id,
                'jid': subscriber,
                'subscription': state}

    def remove_subscription(self, subscriber, state):
        """Not implemented yet; does nothing and returns C{None}."""
        pass

    def get_subscribers(self):
        """
        @return: Deferred firing with a list of JIDs whose subscription to
            this node has state 'subscribed'.
        """
        d = self._dbpool.runQuery("""SELECT jid, resource FROM subscriptions
                                     JOIN nodes ON (node_id=nodes.id)
                                     JOIN entities ON (entity_id=entities.id)
                                     WHERE node=%s AND
                                     subscription='subscribed'""",
                                  (self.id.encode('utf8'),))
        d.addCallback(self._convert_to_jids)
        return d

    def _convert_to_jids(self, rows):
        """
        Build a JID from each C{(jid, resource)} row, joined as
        "jid/resource".
        """
        return [jid.JID("%s/%s" % (row[0], row[1])) for row in rows]

    def is_subscribed(self, subscriber):
        """Not implemented yet; does nothing and returns C{None}."""
        pass
209 | |
class LeafNode(Node):
    """
    Leaf node. Item storage is not implemented: C{store_items} reports
    immediate success and the remaining item methods are empty stubs.
    """

    implements(storage.ILeafNode)

    type = 'leaf'

    def store_items(self, items, publisher):
        """Store nothing; return an already-fired Deferred with C{None}."""
        return defer.succeed(None)

    def remove_items(self, item_ids):
        """Stub: does nothing and returns C{None}."""

    def get_items(self, max_items=None):
        """Stub: does nothing and returns C{None}."""

    def get_items_by_id(self, item_ids):
        """Stub: does nothing and returns C{None}."""

    def purge(self):
        """Stub: does nothing and returns C{None}."""