Mercurial > libervia-pubsub
comparison idavoll/pgsql_storage.py @ 136:327de183f48d
Discover client_encoding parameter to pyPgSQL, removing all encode() calls.
author | Ralph Meijer <ralphm@ik.nu> |
---|---|
date | Mon, 25 Apr 2005 12:50:54 +0000 |
parents | d3689da18ed2 |
children | 812300cdbc22 |
comparison
equal
deleted
inserted
replaced
135:49acdc6a2be4 | 136:327de183f48d |
---|---|
8 class Storage: | 8 class Storage: |
9 | 9 |
10 implements(storage.IStorage) | 10 implements(storage.IStorage) |
11 | 11 |
12 def __init__(self, user, database): | 12 def __init__(self, user, database): |
13 self._dbpool = adbapi.ConnectionPool('pyPgSQL.PgSQL', user=user, | 13 self._dbpool = adbapi.ConnectionPool('pyPgSQL.PgSQL', |
14 database=database) | 14 user=user, |
 | 15 database=database, |
 | 16 client_encoding='utf-8' |
 | 17 ) |
15 | 18 |
16 def get_node(self, node_id): | 19 def get_node(self, node_id): |
17 return self._dbpool.runInteraction(self._get_node, node_id) | 20 return self._dbpool.runInteraction(self._get_node, node_id) |
18 | 21 |
19 def _get_node(self, cursor, node_id): | 22 def _get_node(self, cursor, node_id): |
38 | 41 |
39 def create_node(self, node_id, owner, type='leaf'): | 42 def create_node(self, node_id, owner, type='leaf'): |
40 return self._dbpool.runInteraction(self._create_node, node_id, owner) | 43 return self._dbpool.runInteraction(self._create_node, node_id, owner) |
41 | 44 |
42 def _create_node(self, cursor, node_id, owner): | 45 def _create_node(self, cursor, node_id, owner): |
43 node_id = node_id.encode('utf-8') | 46 node_id = node_id |
44 owner = owner.userhost().encode('utf-8') | 47 owner = owner.userhost() |
45 try: | 48 try: |
46 cursor.execute("""INSERT INTO nodes (node) VALUES (%s)""", | 49 cursor.execute("""INSERT INTO nodes (node) VALUES (%s)""", |
47 (node_id)) | 50 (node_id)) |
48 except cursor._pool.dbapi.OperationalError: | 51 except cursor._pool.dbapi.OperationalError: |
49 raise storage.NodeExists | 52 raise storage.NodeExists |
66 def delete_node(self, node_id): | 69 def delete_node(self, node_id): |
67 return self._dbpool.runInteraction(self._delete_node, node_id) | 70 return self._dbpool.runInteraction(self._delete_node, node_id) |
68 | 71 |
69 def _delete_node(self, cursor, node_id): | 72 def _delete_node(self, cursor, node_id): |
70 cursor.execute("""DELETE FROM nodes WHERE node=%s""", | 73 cursor.execute("""DELETE FROM nodes WHERE node=%s""", |
71 (node_id.encode('utf-8'),)) | 74 (node_id,)) |
72 | 75 |
73 if cursor.rowcount != 1: | 76 if cursor.rowcount != 1: |
74 raise storage.NodeNotFound | 77 raise storage.NodeNotFound |
75 | 78 |
76 def get_affiliations(self, entity): | 79 def get_affiliations(self, entity): |
78 JOIN affiliations ON | 81 JOIN affiliations ON |
79 (affiliations.entity_id=entities.id) | 82 (affiliations.entity_id=entities.id) |
80 JOIN nodes ON | 83 JOIN nodes ON |
81 (nodes.id=affiliations.node_id) | 84 (nodes.id=affiliations.node_id) |
82 WHERE jid=%s""", | 85 WHERE jid=%s""", |
83 (entity.userhost().encode('utf8'),)) | 86 (entity.userhost(),)) |
84 d.addCallback(lambda results: [tuple(r) for r in results]) | 87 d.addCallback(lambda results: [tuple(r) for r in results]) |
85 return d | 88 return d |
86 | 89 |
87 def get_subscriptions(self, entity): | 90 def get_subscriptions(self, entity): |
88 d = self._dbpool.runQuery("""SELECT node, jid, resource, subscription | 91 d = self._dbpool.runQuery("""SELECT node, jid, resource, subscription |
89 FROM entities JOIN subscriptions ON | 92 FROM entities JOIN subscriptions ON |
90 (subscriptions.entity_id=entities.id) | 93 (subscriptions.entity_id=entities.id) |
91 JOIN nodes ON | 94 JOIN nodes ON |
92 (nodes.id=subscriptions.node_id) | 95 (nodes.id=subscriptions.node_id) |
93 WHERE jid=%s""", | 96 WHERE jid=%s""", |
94 (entity.userhost().encode('utf8'),)) | 97 (entity.userhost(),)) |
95 d.addCallback(self._convert_subscription_jids) | 98 d.addCallback(self._convert_subscription_jids) |
96 return d | 99 return d |
97 | 100 |
98 def _convert_subscription_jids(self, subscriptions): | 101 def _convert_subscription_jids(self, subscriptions): |
99 return [(node, jid.JID('%s/%s' % (subscriber, resource)), subscription) | 102 return [(node, jid.JID('%s/%s' % (subscriber, resource)), subscription) |
107 self.id = node_id | 110 self.id = node_id |
108 self._config = config | 111 self._config = config |
109 | 112 |
110 def _check_node_exists(self, cursor): | 113 def _check_node_exists(self, cursor): |
111 cursor.execute("""SELECT id FROM nodes WHERE node=%s""", | 114 cursor.execute("""SELECT id FROM nodes WHERE node=%s""", |
112 (self.id.encode('utf8'))) | 115 (self.id)) |
113 if not cursor.fetchone(): | 116 if not cursor.fetchone(): |
114 raise backend.NodeNotFound | 117 raise backend.NodeNotFound |
115 | 118 |
116 def get_type(self): | 119 def get_type(self): |
117 return self.type | 120 return self.type |
134 self._check_node_exists(cursor) | 137 self._check_node_exists(cursor) |
135 cursor.execute("""UPDATE nodes SET persistent=%s, deliver_payload=%s | 138 cursor.execute("""UPDATE nodes SET persistent=%s, deliver_payload=%s |
136 WHERE node=%s""", | 139 WHERE node=%s""", |
137 (config["pubsub#persist_items"], | 140 (config["pubsub#persist_items"], |
138 config["pubsub#deliver_payloads"], | 141 config["pubsub#deliver_payloads"], |
139 self.id.encode('utf-8'))) | 142 self.id)) |
140 | 143 |
141 def _set_cached_configuration(self, void, config): | 144 def _set_cached_configuration(self, void, config): |
142 self._config = config | 145 self._config = config |
143 | 146 |
144 def get_meta_data(self): | 147 def get_meta_data(self): |
153 self._check_node_exists(cursor) | 156 self._check_node_exists(cursor) |
154 cursor.execute("""SELECT affiliation FROM affiliations | 157 cursor.execute("""SELECT affiliation FROM affiliations |
155 JOIN nodes ON (node_id=nodes.id) | 158 JOIN nodes ON (node_id=nodes.id) |
156 JOIN entities ON (entity_id=entities.id) | 159 JOIN entities ON (entity_id=entities.id) |
157 WHERE node=%s AND jid=%s""", | 160 WHERE node=%s AND jid=%s""", |
158 (self.id.encode('utf8'), | 161 (self.id, |
159 entity.userhost().encode('utf8'))) | 162 entity.userhost())) |
160 | 163 |
161 try: | 164 try: |
162 return cursor.fetchone()[0] | 165 return cursor.fetchone()[0] |
163 except TypeError: | 166 except TypeError: |
164 return None | 167 return None |
175 cursor.execute("""SELECT subscription FROM subscriptions | 178 cursor.execute("""SELECT subscription FROM subscriptions |
176 JOIN nodes ON (nodes.id=subscriptions.node_id) | 179 JOIN nodes ON (nodes.id=subscriptions.node_id) |
177 JOIN entities ON | 180 JOIN entities ON |
178 (entities.id=subscriptions.entity_id) | 181 (entities.id=subscriptions.entity_id) |
179 WHERE node=%s AND jid=%s AND resource=%s""", | 182 WHERE node=%s AND jid=%s AND resource=%s""", |
180 (self.id.encode('utf8'), | 183 (self.id, |
181 userhost.encode('utf8'), | 184 userhost, |
182 resource.encode('utf8'))) | 185 resource)) |
183 try: | 186 try: |
184 return cursor.fetchone()[0] | 187 return cursor.fetchone()[0] |
185 except TypeError: | 188 except TypeError: |
186 return None | 189 return None |
187 | 190 |
195 userhost = subscriber.userhost() | 198 userhost = subscriber.userhost() |
196 resource = subscriber.resource or '' | 199 resource = subscriber.resource or '' |
197 | 200 |
198 try: | 201 try: |
199 cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", | 202 cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", |
200 (userhost.encode('utf8'))) | 203 (userhost)) |
201 except cursor._pool.dbapi.OperationalError: | 204 except cursor._pool.dbapi.OperationalError: |
202 pass | 205 pass |
203 | 206 |
204 try: | 207 try: |
205 cursor.execute("""INSERT INTO subscriptions | 208 cursor.execute("""INSERT INTO subscriptions |
206 (node_id, entity_id, resource, subscription) | 209 (node_id, entity_id, resource, subscription) |
207 SELECT n.id, e.id, %s, %s FROM | 210 SELECT n.id, e.id, %s, %s FROM |
208 (SELECT id FROM nodes WHERE node=%s) AS n | 211 (SELECT id FROM nodes WHERE node=%s) AS n |
209 CROSS JOIN | 212 CROSS JOIN |
210 (SELECT id FROM entities WHERE jid=%s) AS e""", | 213 (SELECT id FROM entities WHERE jid=%s) AS e""", |
211 (resource.encode('utf8'), | 214 (resource, |
212 state.encode('utf8'), | 215 state, |
213 self.id.encode('utf8'), | 216 self.id, |
214 userhost.encode('utf8'))) | 217 userhost)) |
215 except cursor._pool.dbapi.OperationalError: | 218 except cursor._pool.dbapi.OperationalError: |
216 raise storage.SubscriptionExists | 219 raise storage.SubscriptionExists |
217 | 220 |
218 def remove_subscription(self, subscriber): | 221 def remove_subscription(self, subscriber): |
219 return self._dbpool.runInteraction(self._remove_subscription, | 222 return self._dbpool.runInteraction(self._remove_subscription, |
227 | 230 |
228 cursor.execute("""DELETE FROM subscriptions WHERE | 231 cursor.execute("""DELETE FROM subscriptions WHERE |
229 node_id=(SELECT id FROM nodes WHERE node=%s) AND | 232 node_id=(SELECT id FROM nodes WHERE node=%s) AND |
230 entity_id=(SELECT id FROM entities WHERE jid=%s) | 233 entity_id=(SELECT id FROM entities WHERE jid=%s) |
231 AND resource=%s""", | 234 AND resource=%s""", |
232 (self.id.encode('utf8'), | 235 (self.id, |
233 userhost.encode('utf8'), | 236 userhost, |
234 resource.encode('utf8'))) | 237 resource)) |
235 if cursor.rowcount != 1: | 238 if cursor.rowcount != 1: |
236 raise storage.SubscriptionNotFound | 239 raise storage.SubscriptionNotFound |
237 | 240 |
238 return None | 241 return None |
239 | 242 |
247 cursor.execute("""SELECT jid, resource FROM subscriptions | 250 cursor.execute("""SELECT jid, resource FROM subscriptions |
248 JOIN nodes ON (node_id=nodes.id) | 251 JOIN nodes ON (node_id=nodes.id) |
249 JOIN entities ON (entity_id=entities.id) | 252 JOIN entities ON (entity_id=entities.id) |
250 WHERE node=%s AND | 253 WHERE node=%s AND |
251 subscription='subscribed'""", | 254 subscription='subscribed'""", |
252 (self.id.encode('utf8'),)) | 255 (self.id,)) |
253 return cursor.fetchall() | 256 return cursor.fetchall() |
254 | 257 |
255 def _convert_to_jids(self, list): | 258 def _convert_to_jids(self, list): |
256 return [jid.JID("%s/%s" % (l[0], l[1])) for l in list] | 259 return [jid.JID("%s/%s" % (l[0], l[1])) for l in list] |
257 | 260 |
269 (entities.id=subscriptions.entity_id) | 272 (entities.id=subscriptions.entity_id) |
270 JOIN nodes ON | 273 JOIN nodes ON |
271 (nodes.id=subscriptions.node_id) | 274 (nodes.id=subscriptions.node_id) |
272 WHERE entities.jid=%s AND resource=%s | 275 WHERE entities.jid=%s AND resource=%s |
273 AND node=%s AND subscription='subscribed'""", | 276 AND node=%s AND subscription='subscribed'""", |
274 (userhost.encode('utf8'), | 277 (userhost, |
275 resource.encode('utf8'), | 278 resource, |
276 self.id.encode('utf8'))) | 279 self.id)) |
277 | 280 |
278 return cursor.fetchone() is not None | 281 return cursor.fetchone() is not None |
279 | 282 |
280 class LeafNode(Node): | 283 class LeafNode(Node): |
281 | 284 |
295 data = item.toXml() | 298 data = item.toXml() |
296 cursor.execute("""UPDATE items SET date=now(), publisher=%s, data=%s | 299 cursor.execute("""UPDATE items SET date=now(), publisher=%s, data=%s |
297 FROM nodes | 300 FROM nodes |
298 WHERE nodes.id = items.node_id AND | 301 WHERE nodes.id = items.node_id AND |
299 nodes.node = %s and items.item=%s""", | 302 nodes.node = %s and items.item=%s""", |
300 (publisher.full().encode('utf8'), | 303 (publisher.full(), |
301 data, | 304 data, |
302 self.id.encode('utf8'), | 305 self.id, |
303 item["id"].encode('utf8'))) | 306 item["id"])) |
304 if cursor.rowcount == 1: | 307 if cursor.rowcount == 1: |
305 return | 308 return |
306 | 309 |
307 cursor.execute("""INSERT INTO items (node_id, item, publisher, data) | 310 cursor.execute("""INSERT INTO items (node_id, item, publisher, data) |
308 SELECT id, %s, %s, %s FROM nodes WHERE node=%s""", | 311 SELECT id, %s, %s, %s FROM nodes WHERE node=%s""", |
309 (item["id"].encode('utf8'), | 312 (item["id"], |
310 publisher.full().encode('utf8'), | 313 publisher.full(), |
311 data, | 314 data, |
312 self.id.encode('utf8'))) | 315 self.id)) |
313 | 316 |
314 def remove_items(self, item_ids): | 317 def remove_items(self, item_ids): |
315 return self._dbpool.runInteraction(self._remove_items, item_ids) | 318 return self._dbpool.runInteraction(self._remove_items, item_ids) |
316 | 319 |
317 def _remove_items(self, cursor, item_ids): | 320 def _remove_items(self, cursor, item_ids): |
321 | 324 |
322 for item_id in item_ids: | 325 for item_id in item_ids: |
323 cursor.execute("""DELETE FROM items WHERE | 326 cursor.execute("""DELETE FROM items WHERE |
324 node_id=(SELECT id FROM nodes WHERE node=%s) AND | 327 node_id=(SELECT id FROM nodes WHERE node=%s) AND |
325 item=%s""", | 328 item=%s""", |
326 (self.id.encode('utf-8'), | 329 (self.id, |
327 item_id.encode('utf-8'))) | 330 item_id)) |
328 | 331 |
329 if not cursor.rowcount: | 332 if not cursor.rowcount: |
330 raise storage.ItemNotFound | 333 raise storage.ItemNotFound |
331 | 334 |
332 def get_items(self, max_items=None): | 335 def get_items(self, max_items=None): |
337 query = """SELECT data FROM nodes JOIN items ON | 340 query = """SELECT data FROM nodes JOIN items ON |
338 (nodes.id=items.node_id) | 341 (nodes.id=items.node_id) |
339 WHERE node=%s ORDER BY date DESC""" | 342 WHERE node=%s ORDER BY date DESC""" |
340 if max_items: | 343 if max_items: |
341 cursor.execute(query + " LIMIT %s", | 344 cursor.execute(query + " LIMIT %s", |
342 (self.id.encode('utf8'), | 345 (self.id, |
343 max_items)) | 346 max_items)) |
344 else: | 347 else: |
345 cursor.execute(query, (self.id.encode('utf8'))) | 348 cursor.execute(query, (self.id)) |
346 | 349 |
347 result = cursor.fetchall() | 350 result = cursor.fetchall() |
348 return [unicode(r[0], 'utf8') for r in result] | 351 return [unicode(r[0], 'utf-8') for r in result] |
349 | 352 |
350 def get_items_by_id(self, item_ids): | 353 def get_items_by_id(self, item_ids): |
351 return self._dbpool.runInteraction(self._get_items_by_id, item_ids) | 354 return self._dbpool.runInteraction(self._get_items_by_id, item_ids) |
352 | 355 |
353 def _get_items_by_id(self, cursor, item_ids): | 356 def _get_items_by_id(self, cursor, item_ids): |
355 items = [] | 358 items = [] |
356 for item_id in item_ids: | 359 for item_id in item_ids: |
357 cursor.execute("""SELECT data FROM nodes JOIN items ON | 360 cursor.execute("""SELECT data FROM nodes JOIN items ON |
358 (nodes.id=items.node_id) | 361 (nodes.id=items.node_id) |
359 WHERE node=%s AND item=%s""", | 362 WHERE node=%s AND item=%s""", |
360 (self.id.encode('utf8'), | 363 (self.id, |
361 item_id.encode('utf8'))) | 364 item_id)) |
362 result = cursor.fetchone() | 365 result = cursor.fetchone() |
363 if result: | 366 if result: |
364 items.append(unicode(result[0], 'utf8')) | 367 items.append(unicode(result[0], 'utf-8')) |
365 return items | 368 return items |
366 | 369 |
367 def purge(self): | 370 def purge(self): |
368 return self._dbpool.runInteraction(self._purge) | 371 return self._dbpool.runInteraction(self._purge) |
369 | 372 |
370 def _purge(self, cursor): | 373 def _purge(self, cursor): |
371 self._check_node_exists(cursor) | 374 self._check_node_exists(cursor) |
372 | 375 |
373 cursor.execute("""DELETE FROM items WHERE | 376 cursor.execute("""DELETE FROM items WHERE |
374 node_id=(SELECT id FROM nodes WHERE node=%s)""", | 377 node_id=(SELECT id FROM nodes WHERE node=%s)""", |
375 (self.id.encode('utf-8'),)) | 378 (self.id,)) |
376 | 379 |