Mercurial > libervia-pubsub
comparison idavoll/pgsql_storage.py @ 121:4f0113adb7ed
Add Node._check_node_exists() calls to all Node methods, because nodes could
have been deleted in between calls.
Add Node.get_subscription().
Only fire deferred (with None) on success of Node.add_subscription().
Fix Node.set_configuration() to actually work and only update the Node object's
configuration when the SQL query has succeeded.
Implement Node.remove_subscription().
Implement Node.is_subscribed().
Implement LeafNode methods (unchecked!).
author | Ralph Meijer <ralphm@ik.nu> |
---|---|
date | Tue, 12 Apr 2005 12:26:05 +0000 |
parents | dfef919aaf1b |
children | c4ee16bc48e5 |
comparison
equal
deleted
inserted
replaced
120:8892331314c8 | 121:4f0113adb7ed |
---|---|
105 | 105 |
106 def __init__(self, node_id, config): | 106 def __init__(self, node_id, config): |
107 self.id = node_id | 107 self.id = node_id |
108 self._config = config | 108 self._config = config |
109 | 109 |
110 def _check_node_exists(self, cursor): | |
111 cursor.execute("""SELECT id FROM nodes WHERE node=%s""", | |
112 (self.id.encode('utf8'))) | |
113 if not cursor.fetchone(): | |
114 raise backend.NodeNotFound | |
115 | |
110 def get_type(self): | 116 def get_type(self): |
111 return self.type | 117 return self.type |
112 | 118 |
113 def get_configuration(self): | 119 def get_configuration(self): |
114 return self._config | 120 return self._config |
115 | 121 |
116 def set_configuration(self, options): | 122 def set_configuration(self, options): |
117 return self._dbpool.runInteraction(self._set_node_configuration, | 123 return self._dbpool.runInteraction(self._set_configuration, |
118 options) | 124 options) |
119 | 125 |
120 def _set_configuration(self, cursor, options): | 126 def _set_configuration(self, cursor, options): |
127 self._check_node_exists(cursor) | |
128 | |
129 config = copy.copy(self._config) | |
130 | |
121 for option in options: | 131 for option in options: |
122 if option in self._config: | 132 if option in config: |
123 self._config[option] = options[option] | 133 config[option] = options[option] |
124 | 134 |
125 cursor.execute("""UPDATE nodes SET persistent=%s, deliver_payload=%s | 135 cursor.execute("""UPDATE nodes SET persistent=%s, deliver_payload=%s |
126 WHERE node=%s""", | 136 WHERE node=%s""", |
127 (self._config["pubsub#persist_items"].encode('utf8'), | 137 (config["pubsub#persist_items"], |
128 self._config["pubsub#deliver_payloads"].encode('utf8'), | 138 config["pubsub#deliver_payloads"], |
129 self.id.encode('utf-8'))) | 139 self.id.encode('utf-8'))) |
140 | |
141 self._config = config | |
130 | 142 |
131 def get_meta_data(self): | 143 def get_meta_data(self): |
132 config = copy.copy(self._config) | 144 config = copy.copy(self._config) |
133 config["pubsub#node_type"] = self.type | 145 config["pubsub#node_type"] = self.type |
134 return config | 146 return config |
135 | 147 |
136 def get_affiliation(self, entity): | 148 def get_affiliation(self, entity): |
137 return self._dbpool.runInteraction(self._get_affiliation, entity) | 149 return self._dbpool.runInteraction(self._get_affiliation, entity) |
138 | 150 |
139 def _get_affiliation(self, cursor, entity): | 151 def _get_affiliation(self, cursor, entity): |
152 self._check_node_exists(cursor) | |
140 cursor.execute("""SELECT affiliation FROM affiliations | 153 cursor.execute("""SELECT affiliation FROM affiliations |
141 JOIN nodes ON (node_id=nodes.id) | 154 JOIN nodes ON (node_id=nodes.id) |
142 JOIN entities ON (entity_id=entities.id) | 155 JOIN entities ON (entity_id=entities.id) |
143 WHERE node=%s AND jid=%s""", | 156 WHERE node=%s AND jid=%s""", |
144 (self.id.encode('utf8'), | 157 (self.id.encode('utf8'), |
147 try: | 160 try: |
148 return cursor.fetchone()[0] | 161 return cursor.fetchone()[0] |
149 except TypeError: | 162 except TypeError: |
150 return None | 163 return None |
151 | 164 |
165 def get_subscription(self, subscriber): | |
166 return self._dbpool.runInteraction(self._get_subscription, subscriber) | |
167 | |
168 def _get_subscription(self, cursor, subscriber): | |
169 self._check_node_exists(cursor) | |
170 | |
171 userhost = subscriber.userhost() | |
172 resource = subscriber.resource or '' | |
173 | |
174 cursor.execute("""SELECT subscription FROM subscriptions | |
175 JOIN nodes ON (nodes.id=subscriptions.node_id) | |
176 JOIN entities ON | |
177 (entities.id=subscriptions.entity_id) | |
178 WHERE node=%s AND jid=%s AND resource=%s""", | |
179 (self.id.encode('utf8'), | |
180 userhost.encode('utf8'), | |
181 resource.encode('utf8'))) | |
182 try: | |
183 return cursor.fetchone()[0] | |
184 except TypeError: | |
185 return None | |
186 | |
152 def add_subscription(self, subscriber, state): | 187 def add_subscription(self, subscriber, state): |
153 return self._dbpool.runInteraction(self._add_subscription, subscriber, | 188 return self._dbpool.runInteraction(self._add_subscription, subscriber, |
154 state) | 189 state) |
155 | 190 |
156 def _add_subscription(self, cursor, subscriber, state): | 191 def _add_subscription(self, cursor, subscriber, state): |
192 self._check_node_exists(cursor) | |
193 | |
157 userhost = subscriber.userhost() | 194 userhost = subscriber.userhost() |
158 resource = subscriber.resource or '' | 195 resource = subscriber.resource or '' |
159 | 196 |
160 try: | 197 try: |
161 cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", | 198 cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", |
173 (resource.encode('utf8'), | 210 (resource.encode('utf8'), |
174 state.encode('utf8'), | 211 state.encode('utf8'), |
175 self.id.encode('utf8'), | 212 self.id.encode('utf8'), |
176 userhost.encode('utf8'))) | 213 userhost.encode('utf8'))) |
177 except cursor._pool.dbapi.OperationalError: | 214 except cursor._pool.dbapi.OperationalError: |
178 cursor.execute("""SELECT subscription FROM subscriptions | 215 raise storage.SubscriptionExists |
179 JOIN nodes ON (nodes.id=subscriptions.node_id) | 216 |
180 JOIN entities ON | 217 def remove_subscription(self, subscriber): |
181 (entities.id=subscriptions.entity_id) | 218 return self._dbpool.runInteraction(self._remove_subscription, |
182 WHERE node=%s AND jid=%s AND resource=%s""", | 219 subscriber) |
183 (self.id.encode('utf8'), | 220 |
184 userhost.encode('utf8'), | 221 def _remove_subscription(self, cursor, subscriber): |
185 resource.encode('utf8'))) | 222 self._check_node_exists(cursor) |
186 state = cursor.fetchone()[0] | 223 |
187 | 224 userhost = subscriber.userhost() |
188 return {'node': self.id, | 225 resource = subscriber.resource or '' |
189 'jid': subscriber, | 226 |
190 'subscription': state} | 227 cursor.execute("""DELETE FROM subscriptions WHERE |
191 | 228 node_id=(SELECT id FROM nodes WHERE node=%s) AND |
192 def remove_subscription(self, subscriber, state): | 229 entity_id=(SELECT id FROM entities WHERE jid=%s) |
193 pass | 230 AND resource=%s""", |
231 (self.id.encode('utf8'), | |
232 userhost.encode('utf8'), | |
233 resource.encode('utf8'))) | |
234 if cursor.rowcount != 1: | |
235 raise storage.SubscriptionNotFound | |
236 | |
237 return None | |
194 | 238 |
195 def get_subscribers(self): | 239 def get_subscribers(self): |
196 d = self._dbpool.runQuery("""SELECT jid, resource FROM subscriptions | 240 d = self._dbpool.runInteraction(self._get_subscribers) |
197 JOIN nodes ON (node_id=nodes.id) | |
198 JOIN entities ON (entity_id=entities.id) | |
199 WHERE node=%s AND | |
200 subscription='subscribed'""", | |
201 (self.id.encode('utf8'),)) | |
202 d.addCallback(self._convert_to_jids) | 241 d.addCallback(self._convert_to_jids) |
203 return d | 242 return d |
204 | 243 |
244 def _get_subscribers(self, cursor): | |
245 self._check_node_exists(cursor) | |
246 cursor.execute("""SELECT jid, resource FROM subscriptions | |
247 JOIN nodes ON (node_id=nodes.id) | |
248 JOIN entities ON (entity_id=entities.id) | |
249 WHERE node=%s AND | |
250 subscription='subscribed'""", | |
251 (self.id.encode('utf8'),)) | |
252 return cursor.fetchall() | |
253 | |
205 def _convert_to_jids(self, list): | 254 def _convert_to_jids(self, list): |
206 return [jid.JID("%s/%s" % (l[0], l[1])) for l in list] | 255 return [jid.JID("%s/%s" % (l[0], l[1])) for l in list] |
207 | 256 |
208 def is_subscribed(self, subscriber): | 257 def is_subscribed(self, subscriber): |
209 pass | 258 return self._dbpool.runInteraction(self._is_subscribed, subscriber) |
259 | |
260 def _is_subscribed(self, cursor, subscriber): | |
261 self._check_node_exists(cursor) | |
262 | |
263 userhost = subscriber.userhost() | |
264 resource = subscriber.resource or '' | |
265 | |
266 cursor.execute("""SELECT 1 FROM entities | |
267 JOIN subscriptions ON | |
268 (entities.id=subscriptions.entity_id) | |
269 JOIN nodes ON | |
270 (nodes.id=subscriptions.node_id) | |
271 WHERE entities.jid=%s AND resource=%s | |
272 AND node=%s AND subscription='subscribed'""", | |
273 (userhost.encode('utf8'), | |
274 resource.encode('utf8'), | |
275 self.id.encode('utf8'))) | |
276 | |
277 return cursor.fetchone() is not None | |
210 | 278 |
211 class LeafNode(Node): | 279 class LeafNode(Node): |
212 | 280 |
213 implements(storage.ILeafNode) | 281 implements(storage.ILeafNode) |
214 | 282 |
215 type = 'leaf' | 283 type = 'leaf' |
216 | 284 |
217 def store_items(self, items, publisher): | 285 def store_items(self, items, publisher): |
218 return defer.succeed(None) | 286 return self._dbpool.runInteraction(self._store_items, items, publisher) |
287 | |
288 def _store_items(self, cursor, items, publisher): | |
289 self._check_node_exists(cursor) | |
290 for item in items: | |
291 self._store_item(cursor, item, publisher) | |
292 | |
293 def _store_item(self, cursor, item, publisher): | |
294 data = item.toXml() | |
295 cursor.execute("""UPDATE items SET date=now(), publisher=%s, data=%s | |
296 FROM nodes | |
297 WHERE nodes.id = items.node_id AND | |
298 nodes.node = %s and items.item=%s""", | |
299 (publisher.full().encode('utf8'), | |
300 data.encode('utf8'), | |
301 self.id.encode('utf8'), | |
302 item["id"].encode('utf8'))) | |
303 if cursor.rowcount == 1: | |
304 return | |
305 | |
306 cursor.execute("""INSERT INTO items (node_id, item, publisher, data) | |
307 SELECT id, %s, %s, %s FROM nodes WHERE node=%s""", | |
308 (item["id"].encode('utf8'), | |
309 publisher.full().encode('utf8'), | |
310 data.encode('utf8'), | |
311 self.id.encode('utf8'))) | |
219 | 312 |
220 def remove_items(self, item_ids): | 313 def remove_items(self, item_ids): |
221 pass | 314 return self._dbpool.runInteraction(self._remove_items, item_ids) |
315 | |
316 def _remove_items(self, cursor, item_ids): | |
317 self._check_node_exists(cursor) | |
318 | |
319 deleted = [] | |
320 | |
321 for item_id in item_ids: | |
322 cursor.execute("""DELETE FROM items WHERE | |
323 node_id=(SELECT id FROM nodes WHERE node=%s) AND | |
324 item=%s""", | |
325 (self.id.encode('utf-8'), | |
326 item_id.encode('utf-8'))) | |
327 | |
328 if cursor.rowcount: | |
329 deleted.append(item_id) | |
330 | |
331 return deleted | |
222 | 332 |
223 def get_items(self, max_items=None): | 333 def get_items(self, max_items=None): |
224 pass | 334 return self._dbpool.runInteraction(self._get_items, max_items) |
225 | 335 |
226 def get_items_by_id(self, item_ids): | 336 def _get_items(self, cursor, max_items): |
227 pass | 337 self._check_node_exists(cursor) |
338 query = """SELECT data FROM nodes JOIN items ON | |
339 (nodes.id=items.node_id) | |
340 WHERE node=%s ORDER BY date DESC""" | |
341 if max_items: | |
342 cursor.execute(query + " LIMIT %s", | |
343 (self.id.encode('utf8'), | |
344 max_items)) | |
345 else: | |
346 cursor.execute(query, (self.id.encode('utf8'))) | |
347 | |
348 result = cursor.fetchall() | |
349 return [r[0] for r in result] | |
350 | |
351 def get_items_by_ids(self, item_ids): | |
352 return self._dbpool.runInteraction(self._get_items_by_ids, item_ids) | |
353 | |
354 def _get_items_by_ids(self, cursor, item_ids): | |
355 self._check_node_exists(cursor) | |
356 items = [] | |
357 for item_id in item_ids: | |
358 cursor.execute("""SELECT data FROM nodes JOIN items ON | |
359 (nodes.id=items.node_id) | |
360 WHERE node=%s AND item=%s""", | |
361 (self.id.encode('utf8'), | |
362 item_id.encode('utf8'))) | |
363 result = cursor.fetchone() | |
364 if result: | |
365 items.append(result[0]) | |
366 return items | |
228 | 367 |
229 def purge(self): | 368 def purge(self): |
230 pass | 369 return self._dbpool.runInteraction(self._purge) |
370 | |
371 def _purge_node(self, cursor): | |
372 self._check_node_exists(cursor) | |
373 | |
374 cursor.execute("""DELETE FROM items WHERE | |
375 node_id=(SELECT id FROM nodes WHERE node=%s)""", | |
376 (self.id.encode('utf-8'),)) | |
377 |