comparison idavoll/pgsql_storage.py @ 136:327de183f48d

Discover client_encoding parameter to pyPgSQL, removing all encode() calls.
author Ralph Meijer <ralphm@ik.nu>
date Mon, 25 Apr 2005 12:50:54 +0000
parents d3689da18ed2
children 812300cdbc22
comparison
equal deleted inserted replaced
135:49acdc6a2be4 136:327de183f48d
8 class Storage: 8 class Storage:
9 9
10 implements(storage.IStorage) 10 implements(storage.IStorage)
11 11
12 def __init__(self, user, database): 12 def __init__(self, user, database):
13 self._dbpool = adbapi.ConnectionPool('pyPgSQL.PgSQL', user=user, 13 self._dbpool = adbapi.ConnectionPool('pyPgSQL.PgSQL',
14 database=database) 14 user=user,
15 database=database,
16 client_encoding='utf-8'
17 )
15 18
16 def get_node(self, node_id): 19 def get_node(self, node_id):
17 return self._dbpool.runInteraction(self._get_node, node_id) 20 return self._dbpool.runInteraction(self._get_node, node_id)
18 21
19 def _get_node(self, cursor, node_id): 22 def _get_node(self, cursor, node_id):
38 41
39 def create_node(self, node_id, owner, type='leaf'): 42 def create_node(self, node_id, owner, type='leaf'):
40 return self._dbpool.runInteraction(self._create_node, node_id, owner) 43 return self._dbpool.runInteraction(self._create_node, node_id, owner)
41 44
42 def _create_node(self, cursor, node_id, owner): 45 def _create_node(self, cursor, node_id, owner):
43 node_id = node_id.encode('utf-8') 46 node_id = node_id
44 owner = owner.userhost().encode('utf-8') 47 owner = owner.userhost()
45 try: 48 try:
46 cursor.execute("""INSERT INTO nodes (node) VALUES (%s)""", 49 cursor.execute("""INSERT INTO nodes (node) VALUES (%s)""",
47 (node_id)) 50 (node_id))
48 except cursor._pool.dbapi.OperationalError: 51 except cursor._pool.dbapi.OperationalError:
49 raise storage.NodeExists 52 raise storage.NodeExists
66 def delete_node(self, node_id): 69 def delete_node(self, node_id):
67 return self._dbpool.runInteraction(self._delete_node, node_id) 70 return self._dbpool.runInteraction(self._delete_node, node_id)
68 71
69 def _delete_node(self, cursor, node_id): 72 def _delete_node(self, cursor, node_id):
70 cursor.execute("""DELETE FROM nodes WHERE node=%s""", 73 cursor.execute("""DELETE FROM nodes WHERE node=%s""",
71 (node_id.encode('utf-8'),)) 74 (node_id,))
72 75
73 if cursor.rowcount != 1: 76 if cursor.rowcount != 1:
74 raise storage.NodeNotFound 77 raise storage.NodeNotFound
75 78
76 def get_affiliations(self, entity): 79 def get_affiliations(self, entity):
78 JOIN affiliations ON 81 JOIN affiliations ON
79 (affiliations.entity_id=entities.id) 82 (affiliations.entity_id=entities.id)
80 JOIN nodes ON 83 JOIN nodes ON
81 (nodes.id=affiliations.node_id) 84 (nodes.id=affiliations.node_id)
82 WHERE jid=%s""", 85 WHERE jid=%s""",
83 (entity.userhost().encode('utf8'),)) 86 (entity.userhost(),))
84 d.addCallback(lambda results: [tuple(r) for r in results]) 87 d.addCallback(lambda results: [tuple(r) for r in results])
85 return d 88 return d
86 89
87 def get_subscriptions(self, entity): 90 def get_subscriptions(self, entity):
88 d = self._dbpool.runQuery("""SELECT node, jid, resource, subscription 91 d = self._dbpool.runQuery("""SELECT node, jid, resource, subscription
89 FROM entities JOIN subscriptions ON 92 FROM entities JOIN subscriptions ON
90 (subscriptions.entity_id=entities.id) 93 (subscriptions.entity_id=entities.id)
91 JOIN nodes ON 94 JOIN nodes ON
92 (nodes.id=subscriptions.node_id) 95 (nodes.id=subscriptions.node_id)
93 WHERE jid=%s""", 96 WHERE jid=%s""",
94 (entity.userhost().encode('utf8'),)) 97 (entity.userhost(),))
95 d.addCallback(self._convert_subscription_jids) 98 d.addCallback(self._convert_subscription_jids)
96 return d 99 return d
97 100
98 def _convert_subscription_jids(self, subscriptions): 101 def _convert_subscription_jids(self, subscriptions):
99 return [(node, jid.JID('%s/%s' % (subscriber, resource)), subscription) 102 return [(node, jid.JID('%s/%s' % (subscriber, resource)), subscription)
107 self.id = node_id 110 self.id = node_id
108 self._config = config 111 self._config = config
109 112
110 def _check_node_exists(self, cursor): 113 def _check_node_exists(self, cursor):
111 cursor.execute("""SELECT id FROM nodes WHERE node=%s""", 114 cursor.execute("""SELECT id FROM nodes WHERE node=%s""",
112 (self.id.encode('utf8'))) 115 (self.id))
113 if not cursor.fetchone(): 116 if not cursor.fetchone():
114 raise backend.NodeNotFound 117 raise backend.NodeNotFound
115 118
116 def get_type(self): 119 def get_type(self):
117 return self.type 120 return self.type
134 self._check_node_exists(cursor) 137 self._check_node_exists(cursor)
135 cursor.execute("""UPDATE nodes SET persistent=%s, deliver_payload=%s 138 cursor.execute("""UPDATE nodes SET persistent=%s, deliver_payload=%s
136 WHERE node=%s""", 139 WHERE node=%s""",
137 (config["pubsub#persist_items"], 140 (config["pubsub#persist_items"],
138 config["pubsub#deliver_payloads"], 141 config["pubsub#deliver_payloads"],
139 self.id.encode('utf-8'))) 142 self.id))
140 143
141 def _set_cached_configuration(self, void, config): 144 def _set_cached_configuration(self, void, config):
142 self._config = config 145 self._config = config
143 146
144 def get_meta_data(self): 147 def get_meta_data(self):
153 self._check_node_exists(cursor) 156 self._check_node_exists(cursor)
154 cursor.execute("""SELECT affiliation FROM affiliations 157 cursor.execute("""SELECT affiliation FROM affiliations
155 JOIN nodes ON (node_id=nodes.id) 158 JOIN nodes ON (node_id=nodes.id)
156 JOIN entities ON (entity_id=entities.id) 159 JOIN entities ON (entity_id=entities.id)
157 WHERE node=%s AND jid=%s""", 160 WHERE node=%s AND jid=%s""",
158 (self.id.encode('utf8'), 161 (self.id,
159 entity.userhost().encode('utf8'))) 162 entity.userhost()))
160 163
161 try: 164 try:
162 return cursor.fetchone()[0] 165 return cursor.fetchone()[0]
163 except TypeError: 166 except TypeError:
164 return None 167 return None
175 cursor.execute("""SELECT subscription FROM subscriptions 178 cursor.execute("""SELECT subscription FROM subscriptions
176 JOIN nodes ON (nodes.id=subscriptions.node_id) 179 JOIN nodes ON (nodes.id=subscriptions.node_id)
177 JOIN entities ON 180 JOIN entities ON
178 (entities.id=subscriptions.entity_id) 181 (entities.id=subscriptions.entity_id)
179 WHERE node=%s AND jid=%s AND resource=%s""", 182 WHERE node=%s AND jid=%s AND resource=%s""",
180 (self.id.encode('utf8'), 183 (self.id,
181 userhost.encode('utf8'), 184 userhost,
182 resource.encode('utf8'))) 185 resource))
183 try: 186 try:
184 return cursor.fetchone()[0] 187 return cursor.fetchone()[0]
185 except TypeError: 188 except TypeError:
186 return None 189 return None
187 190
195 userhost = subscriber.userhost() 198 userhost = subscriber.userhost()
196 resource = subscriber.resource or '' 199 resource = subscriber.resource or ''
197 200
198 try: 201 try:
199 cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", 202 cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
200 (userhost.encode('utf8'))) 203 (userhost))
201 except cursor._pool.dbapi.OperationalError: 204 except cursor._pool.dbapi.OperationalError:
202 pass 205 pass
203 206
204 try: 207 try:
205 cursor.execute("""INSERT INTO subscriptions 208 cursor.execute("""INSERT INTO subscriptions
206 (node_id, entity_id, resource, subscription) 209 (node_id, entity_id, resource, subscription)
207 SELECT n.id, e.id, %s, %s FROM 210 SELECT n.id, e.id, %s, %s FROM
208 (SELECT id FROM nodes WHERE node=%s) AS n 211 (SELECT id FROM nodes WHERE node=%s) AS n
209 CROSS JOIN 212 CROSS JOIN
210 (SELECT id FROM entities WHERE jid=%s) AS e""", 213 (SELECT id FROM entities WHERE jid=%s) AS e""",
211 (resource.encode('utf8'), 214 (resource,
212 state.encode('utf8'), 215 state,
213 self.id.encode('utf8'), 216 self.id,
214 userhost.encode('utf8'))) 217 userhost))
215 except cursor._pool.dbapi.OperationalError: 218 except cursor._pool.dbapi.OperationalError:
216 raise storage.SubscriptionExists 219 raise storage.SubscriptionExists
217 220
218 def remove_subscription(self, subscriber): 221 def remove_subscription(self, subscriber):
219 return self._dbpool.runInteraction(self._remove_subscription, 222 return self._dbpool.runInteraction(self._remove_subscription,
227 230
228 cursor.execute("""DELETE FROM subscriptions WHERE 231 cursor.execute("""DELETE FROM subscriptions WHERE
229 node_id=(SELECT id FROM nodes WHERE node=%s) AND 232 node_id=(SELECT id FROM nodes WHERE node=%s) AND
230 entity_id=(SELECT id FROM entities WHERE jid=%s) 233 entity_id=(SELECT id FROM entities WHERE jid=%s)
231 AND resource=%s""", 234 AND resource=%s""",
232 (self.id.encode('utf8'), 235 (self.id,
233 userhost.encode('utf8'), 236 userhost,
234 resource.encode('utf8'))) 237 resource))
235 if cursor.rowcount != 1: 238 if cursor.rowcount != 1:
236 raise storage.SubscriptionNotFound 239 raise storage.SubscriptionNotFound
237 240
238 return None 241 return None
239 242
247 cursor.execute("""SELECT jid, resource FROM subscriptions 250 cursor.execute("""SELECT jid, resource FROM subscriptions
248 JOIN nodes ON (node_id=nodes.id) 251 JOIN nodes ON (node_id=nodes.id)
249 JOIN entities ON (entity_id=entities.id) 252 JOIN entities ON (entity_id=entities.id)
250 WHERE node=%s AND 253 WHERE node=%s AND
251 subscription='subscribed'""", 254 subscription='subscribed'""",
252 (self.id.encode('utf8'),)) 255 (self.id,))
253 return cursor.fetchall() 256 return cursor.fetchall()
254 257
255 def _convert_to_jids(self, list): 258 def _convert_to_jids(self, list):
256 return [jid.JID("%s/%s" % (l[0], l[1])) for l in list] 259 return [jid.JID("%s/%s" % (l[0], l[1])) for l in list]
257 260
269 (entities.id=subscriptions.entity_id) 272 (entities.id=subscriptions.entity_id)
270 JOIN nodes ON 273 JOIN nodes ON
271 (nodes.id=subscriptions.node_id) 274 (nodes.id=subscriptions.node_id)
272 WHERE entities.jid=%s AND resource=%s 275 WHERE entities.jid=%s AND resource=%s
273 AND node=%s AND subscription='subscribed'""", 276 AND node=%s AND subscription='subscribed'""",
274 (userhost.encode('utf8'), 277 (userhost,
275 resource.encode('utf8'), 278 resource,
276 self.id.encode('utf8'))) 279 self.id))
277 280
278 return cursor.fetchone() is not None 281 return cursor.fetchone() is not None
279 282
280 class LeafNode(Node): 283 class LeafNode(Node):
281 284
295 data = item.toXml() 298 data = item.toXml()
296 cursor.execute("""UPDATE items SET date=now(), publisher=%s, data=%s 299 cursor.execute("""UPDATE items SET date=now(), publisher=%s, data=%s
297 FROM nodes 300 FROM nodes
298 WHERE nodes.id = items.node_id AND 301 WHERE nodes.id = items.node_id AND
299 nodes.node = %s and items.item=%s""", 302 nodes.node = %s and items.item=%s""",
300 (publisher.full().encode('utf8'), 303 (publisher.full(),
301 data, 304 data,
302 self.id.encode('utf8'), 305 self.id,
303 item["id"].encode('utf8'))) 306 item["id"]))
304 if cursor.rowcount == 1: 307 if cursor.rowcount == 1:
305 return 308 return
306 309
307 cursor.execute("""INSERT INTO items (node_id, item, publisher, data) 310 cursor.execute("""INSERT INTO items (node_id, item, publisher, data)
308 SELECT id, %s, %s, %s FROM nodes WHERE node=%s""", 311 SELECT id, %s, %s, %s FROM nodes WHERE node=%s""",
309 (item["id"].encode('utf8'), 312 (item["id"],
310 publisher.full().encode('utf8'), 313 publisher.full(),
311 data, 314 data,
312 self.id.encode('utf8'))) 315 self.id))
313 316
314 def remove_items(self, item_ids): 317 def remove_items(self, item_ids):
315 return self._dbpool.runInteraction(self._remove_items, item_ids) 318 return self._dbpool.runInteraction(self._remove_items, item_ids)
316 319
317 def _remove_items(self, cursor, item_ids): 320 def _remove_items(self, cursor, item_ids):
321 324
322 for item_id in item_ids: 325 for item_id in item_ids:
323 cursor.execute("""DELETE FROM items WHERE 326 cursor.execute("""DELETE FROM items WHERE
324 node_id=(SELECT id FROM nodes WHERE node=%s) AND 327 node_id=(SELECT id FROM nodes WHERE node=%s) AND
325 item=%s""", 328 item=%s""",
326 (self.id.encode('utf-8'), 329 (self.id,
327 item_id.encode('utf-8'))) 330 item_id))
328 331
329 if not cursor.rowcount: 332 if not cursor.rowcount:
330 raise storage.ItemNotFound 333 raise storage.ItemNotFound
331 334
332 def get_items(self, max_items=None): 335 def get_items(self, max_items=None):
337 query = """SELECT data FROM nodes JOIN items ON 340 query = """SELECT data FROM nodes JOIN items ON
338 (nodes.id=items.node_id) 341 (nodes.id=items.node_id)
339 WHERE node=%s ORDER BY date DESC""" 342 WHERE node=%s ORDER BY date DESC"""
340 if max_items: 343 if max_items:
341 cursor.execute(query + " LIMIT %s", 344 cursor.execute(query + " LIMIT %s",
342 (self.id.encode('utf8'), 345 (self.id,
343 max_items)) 346 max_items))
344 else: 347 else:
345 cursor.execute(query, (self.id.encode('utf8'))) 348 cursor.execute(query, (self.id))
346 349
347 result = cursor.fetchall() 350 result = cursor.fetchall()
348 return [unicode(r[0], 'utf8') for r in result] 351 return [unicode(r[0], 'utf-8') for r in result]
349 352
350 def get_items_by_id(self, item_ids): 353 def get_items_by_id(self, item_ids):
351 return self._dbpool.runInteraction(self._get_items_by_id, item_ids) 354 return self._dbpool.runInteraction(self._get_items_by_id, item_ids)
352 355
353 def _get_items_by_id(self, cursor, item_ids): 356 def _get_items_by_id(self, cursor, item_ids):
355 items = [] 358 items = []
356 for item_id in item_ids: 359 for item_id in item_ids:
357 cursor.execute("""SELECT data FROM nodes JOIN items ON 360 cursor.execute("""SELECT data FROM nodes JOIN items ON
358 (nodes.id=items.node_id) 361 (nodes.id=items.node_id)
359 WHERE node=%s AND item=%s""", 362 WHERE node=%s AND item=%s""",
360 (self.id.encode('utf8'), 363 (self.id,
361 item_id.encode('utf8'))) 364 item_id))
362 result = cursor.fetchone() 365 result = cursor.fetchone()
363 if result: 366 if result:
364 items.append(unicode(result[0], 'utf8')) 367 items.append(unicode(result[0], 'utf-8'))
365 return items 368 return items
366 369
367 def purge(self): 370 def purge(self):
368 return self._dbpool.runInteraction(self._purge) 371 return self._dbpool.runInteraction(self._purge)
369 372
370 def _purge(self, cursor): 373 def _purge(self, cursor):
371 self._check_node_exists(cursor) 374 self._check_node_exists(cursor)
372 375
373 cursor.execute("""DELETE FROM items WHERE 376 cursor.execute("""DELETE FROM items WHERE
374 node_id=(SELECT id FROM nodes WHERE node=%s)""", 377 node_id=(SELECT id FROM nodes WHERE node=%s)""",
375 (self.id.encode('utf-8'),)) 378 (self.id,))
376 379