comparison idavoll/pgsql_storage.py @ 121:4f0113adb7ed

Add Node._check_node_exists() calls to all Node methods, because nodes could have been deleted in between calls. Add Node.get_subscription(). Only fire deferred (with None) on success of Node.add_subscription(). Fix Node.set_configuration() to actually work and only update the Node object's configuration when the SQL query has succeeded. Implement Node.remove_subscription(). Implement Node.is_subscribed(). Implement LeafNode methods (unchecked!).
author Ralph Meijer <ralphm@ik.nu>
date Tue, 12 Apr 2005 12:26:05 +0000
parents dfef919aaf1b
children c4ee16bc48e5
comparison
equal deleted inserted replaced
120:8892331314c8 121:4f0113adb7ed
105 105
106 def __init__(self, node_id, config): 106 def __init__(self, node_id, config):
107 self.id = node_id 107 self.id = node_id
108 self._config = config 108 self._config = config
109 109
110 def _check_node_exists(self, cursor):
111 cursor.execute("""SELECT id FROM nodes WHERE node=%s""",
112 (self.id.encode('utf8')))
113 if not cursor.fetchone():
114 raise backend.NodeNotFound
115
110 def get_type(self): 116 def get_type(self):
111 return self.type 117 return self.type
112 118
113 def get_configuration(self): 119 def get_configuration(self):
114 return self._config 120 return self._config
115 121
116 def set_configuration(self, options): 122 def set_configuration(self, options):
117 return self._dbpool.runInteraction(self._set_node_configuration, 123 return self._dbpool.runInteraction(self._set_configuration,
118 options) 124 options)
119 125
120 def _set_configuration(self, cursor, options): 126 def _set_configuration(self, cursor, options):
127 self._check_node_exists(cursor)
128
129 config = copy.copy(self._config)
130
121 for option in options: 131 for option in options:
122 if option in self._config: 132 if option in config:
123 self._config[option] = options[option] 133 config[option] = options[option]
124 134
125 cursor.execute("""UPDATE nodes SET persistent=%s, deliver_payload=%s 135 cursor.execute("""UPDATE nodes SET persistent=%s, deliver_payload=%s
126 WHERE node=%s""", 136 WHERE node=%s""",
127 (self._config["pubsub#persist_items"].encode('utf8'), 137 (config["pubsub#persist_items"],
128 self._config["pubsub#deliver_payloads"].encode('utf8'), 138 config["pubsub#deliver_payloads"],
129 self.id.encode('utf-8'))) 139 self.id.encode('utf-8')))
140
141 self._config = config
130 142
131 def get_meta_data(self): 143 def get_meta_data(self):
132 config = copy.copy(self._config) 144 config = copy.copy(self._config)
133 config["pubsub#node_type"] = self.type 145 config["pubsub#node_type"] = self.type
134 return config 146 return config
135 147
136 def get_affiliation(self, entity): 148 def get_affiliation(self, entity):
137 return self._dbpool.runInteraction(self._get_affiliation, entity) 149 return self._dbpool.runInteraction(self._get_affiliation, entity)
138 150
139 def _get_affiliation(self, cursor, entity): 151 def _get_affiliation(self, cursor, entity):
152 self._check_node_exists(cursor)
140 cursor.execute("""SELECT affiliation FROM affiliations 153 cursor.execute("""SELECT affiliation FROM affiliations
141 JOIN nodes ON (node_id=nodes.id) 154 JOIN nodes ON (node_id=nodes.id)
142 JOIN entities ON (entity_id=entities.id) 155 JOIN entities ON (entity_id=entities.id)
143 WHERE node=%s AND jid=%s""", 156 WHERE node=%s AND jid=%s""",
144 (self.id.encode('utf8'), 157 (self.id.encode('utf8'),
147 try: 160 try:
148 return cursor.fetchone()[0] 161 return cursor.fetchone()[0]
149 except TypeError: 162 except TypeError:
150 return None 163 return None
151 164
165 def get_subscription(self, subscriber):
166 return self._dbpool.runInteraction(self._get_subscription, subscriber)
167
168 def _get_subscription(self, cursor, subscriber):
169 self._check_node_exists(cursor)
170
171 userhost = subscriber.userhost()
172 resource = subscriber.resource or ''
173
174 cursor.execute("""SELECT subscription FROM subscriptions
175 JOIN nodes ON (nodes.id=subscriptions.node_id)
176 JOIN entities ON
177 (entities.id=subscriptions.entity_id)
178 WHERE node=%s AND jid=%s AND resource=%s""",
179 (self.id.encode('utf8'),
180 userhost.encode('utf8'),
181 resource.encode('utf8')))
182 try:
183 return cursor.fetchone()[0]
184 except TypeError:
185 return None
186
152 def add_subscription(self, subscriber, state): 187 def add_subscription(self, subscriber, state):
153 return self._dbpool.runInteraction(self._add_subscription, subscriber, 188 return self._dbpool.runInteraction(self._add_subscription, subscriber,
154 state) 189 state)
155 190
156 def _add_subscription(self, cursor, subscriber, state): 191 def _add_subscription(self, cursor, subscriber, state):
192 self._check_node_exists(cursor)
193
157 userhost = subscriber.userhost() 194 userhost = subscriber.userhost()
158 resource = subscriber.resource or '' 195 resource = subscriber.resource or ''
159 196
160 try: 197 try:
161 cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", 198 cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
173 (resource.encode('utf8'), 210 (resource.encode('utf8'),
174 state.encode('utf8'), 211 state.encode('utf8'),
175 self.id.encode('utf8'), 212 self.id.encode('utf8'),
176 userhost.encode('utf8'))) 213 userhost.encode('utf8')))
177 except cursor._pool.dbapi.OperationalError: 214 except cursor._pool.dbapi.OperationalError:
178 cursor.execute("""SELECT subscription FROM subscriptions 215 raise storage.SubscriptionExists
179 JOIN nodes ON (nodes.id=subscriptions.node_id) 216
180 JOIN entities ON 217 def remove_subscription(self, subscriber):
181 (entities.id=subscriptions.entity_id) 218 return self._dbpool.runInteraction(self._remove_subscription,
182 WHERE node=%s AND jid=%s AND resource=%s""", 219 subscriber)
183 (self.id.encode('utf8'), 220
184 userhost.encode('utf8'), 221 def _remove_subscription(self, cursor, subscriber):
185 resource.encode('utf8'))) 222 self._check_node_exists(cursor)
186 state = cursor.fetchone()[0] 223
187 224 userhost = subscriber.userhost()
188 return {'node': self.id, 225 resource = subscriber.resource or ''
189 'jid': subscriber, 226
190 'subscription': state} 227 cursor.execute("""DELETE FROM subscriptions WHERE
191 228 node_id=(SELECT id FROM nodes WHERE node=%s) AND
192 def remove_subscription(self, subscriber, state): 229 entity_id=(SELECT id FROM entities WHERE jid=%s)
193 pass 230 AND resource=%s""",
231 (self.id.encode('utf8'),
232 userhost.encode('utf8'),
233 resource.encode('utf8')))
234 if cursor.rowcount != 1:
235 raise storage.SubscriptionNotFound
236
237 return None
194 238
195 def get_subscribers(self): 239 def get_subscribers(self):
196 d = self._dbpool.runQuery("""SELECT jid, resource FROM subscriptions 240 d = self._dbpool.runInteraction(self._get_subscribers)
197 JOIN nodes ON (node_id=nodes.id)
198 JOIN entities ON (entity_id=entities.id)
199 WHERE node=%s AND
200 subscription='subscribed'""",
201 (self.id.encode('utf8'),))
202 d.addCallback(self._convert_to_jids) 241 d.addCallback(self._convert_to_jids)
203 return d 242 return d
204 243
244 def _get_subscribers(self, cursor):
245 self._check_node_exists(cursor)
246 cursor.execute("""SELECT jid, resource FROM subscriptions
247 JOIN nodes ON (node_id=nodes.id)
248 JOIN entities ON (entity_id=entities.id)
249 WHERE node=%s AND
250 subscription='subscribed'""",
251 (self.id.encode('utf8'),))
252 return cursor.fetchall()
253
205 def _convert_to_jids(self, list): 254 def _convert_to_jids(self, list):
206 return [jid.JID("%s/%s" % (l[0], l[1])) for l in list] 255 return [jid.JID("%s/%s" % (l[0], l[1])) for l in list]
207 256
208 def is_subscribed(self, subscriber): 257 def is_subscribed(self, subscriber):
209 pass 258 return self._dbpool.runInteraction(self._is_subscribed, subscriber)
259
260 def _is_subscribed(self, cursor, subscriber):
261 self._check_node_exists(cursor)
262
263 userhost = subscriber.userhost()
264 resource = subscriber.resource or ''
265
266 cursor.execute("""SELECT 1 FROM entities
267 JOIN subscriptions ON
268 (entities.id=subscriptions.entity_id)
269 JOIN nodes ON
270 (nodes.id=subscriptions.node_id)
271 WHERE entities.jid=%s AND resource=%s
272 AND node=%s AND subscription='subscribed'""",
273 (userhost.encode('utf8'),
274 resource.encode('utf8'),
275 self.id.encode('utf8')))
276
277 return cursor.fetchone() is not None
210 278
211 class LeafNode(Node): 279 class LeafNode(Node):
212 280
213 implements(storage.ILeafNode) 281 implements(storage.ILeafNode)
214 282
215 type = 'leaf' 283 type = 'leaf'
216 284
217 def store_items(self, items, publisher): 285 def store_items(self, items, publisher):
218 return defer.succeed(None) 286 return self._dbpool.runInteraction(self._store_items, items, publisher)
287
288 def _store_items(self, cursor, items, publisher):
289 self._check_node_exists(cursor)
290 for item in items:
291 self._store_item(cursor, item, publisher)
292
293 def _store_item(self, cursor, item, publisher):
294 data = item.toXml()
295 cursor.execute("""UPDATE items SET date=now(), publisher=%s, data=%s
296 FROM nodes
297 WHERE nodes.id = items.node_id AND
298 nodes.node = %s and items.item=%s""",
299 (publisher.full().encode('utf8'),
300 data.encode('utf8'),
301 self.id.encode('utf8'),
302 item["id"].encode('utf8')))
303 if cursor.rowcount == 1:
304 return
305
306 cursor.execute("""INSERT INTO items (node_id, item, publisher, data)
307 SELECT id, %s, %s, %s FROM nodes WHERE node=%s""",
308 (item["id"].encode('utf8'),
309 publisher.full().encode('utf8'),
310 data.encode('utf8'),
311 self.id.encode('utf8')))
219 312
220 def remove_items(self, item_ids): 313 def remove_items(self, item_ids):
221 pass 314 return self._dbpool.runInteraction(self._remove_items, item_ids)
315
316 def _remove_items(self, cursor, item_ids):
317 self._check_node_exists(cursor)
318
319 deleted = []
320
321 for item_id in item_ids:
322 cursor.execute("""DELETE FROM items WHERE
323 node_id=(SELECT id FROM nodes WHERE node=%s) AND
324 item=%s""",
325 (self.id.encode('utf-8'),
326 item_id.encode('utf-8')))
327
328 if cursor.rowcount:
329 deleted.append(item_id)
330
331 return deleted
222 332
223 def get_items(self, max_items=None): 333 def get_items(self, max_items=None):
224 pass 334 return self._dbpool.runInteraction(self._get_items, max_items)
225 335
226 def get_items_by_id(self, item_ids): 336 def _get_items(self, cursor, max_items):
227 pass 337 self._check_node_exists(cursor)
338 query = """SELECT data FROM nodes JOIN items ON
339 (nodes.id=items.node_id)
340 WHERE node=%s ORDER BY date DESC"""
341 if max_items:
342 cursor.execute(query + " LIMIT %s",
343 (self.id.encode('utf8'),
344 max_items))
345 else:
346 cursor.execute(query, (self.id.encode('utf8')))
347
348 result = cursor.fetchall()
349 return [r[0] for r in result]
350
351 def get_items_by_ids(self, item_ids):
352 return self._dbpool.runInteraction(self._get_items_by_ids, item_ids)
353
354 def _get_items_by_ids(self, cursor, item_ids):
355 self._check_node_exists(cursor)
356 items = []
357 for item_id in item_ids:
358 cursor.execute("""SELECT data FROM nodes JOIN items ON
359 (nodes.id=items.node_id)
360 WHERE node=%s AND item=%s""",
361 (self.id.encode('utf8'),
362 item_id.encode('utf8')))
363 result = cursor.fetchone()
364 if result:
365 items.append(result[0])
366 return items
228 367
229 def purge(self): 368 def purge(self):
230 pass 369 return self._dbpool.runInteraction(self._purge)
370
371 def _purge_node(self, cursor):
372 self._check_node_exists(cursor)
373
374 cursor.execute("""DELETE FROM items WHERE
375 node_id=(SELECT id FROM nodes WHERE node=%s)""",
376 (self.id.encode('utf-8'),))
377