Mercurial > libervia-pubsub
comparison sat_pubsub/pgsql_storage.py @ 294:df1edebb0466
PEP implementation, draft (huge patch sorry):
/!\ database schema has changed ! /!\
- whole PEP behaviour is not managed yet
- if the stanza is delegated, PEP is assumed
- fixed potential SQL injection in pgsql_storage
- publish notifications manage PEP
- added retract notifications (if "notify" attribute is present), with PEP handling
- a publisher can no longer replace an item they did not publish
- /!\ schema has changed, sat_pubsub_update_0_1.sql updates it
- sat_pubsub_update_0_1.sql also fixes bad items coming from former version of SàT
author | Goffi <goffi@goffi.org> |
---|---|
date | Sun, 16 Aug 2015 01:32:42 +0200 |
parents | 002c59dbc23f |
children | 4115999d85e9 |
comparison
equal
deleted
inserted
replaced
293:b96a4ac25f8b | 294:df1edebb0466 |
---|---|
70 psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY) | 70 psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY) |
71 | 71 |
72 # parseXml manage str, but we get unicode | 72 # parseXml manage str, but we get unicode |
73 parseXml = lambda unicode_data: generic.parseXml(unicode_data.encode('utf-8')) | 73 parseXml = lambda unicode_data: generic.parseXml(unicode_data.encode('utf-8')) |
74 | 74 |
75 | |
def withPEP(query, values, pep, recipient, pep_table=None):
    """Helper to append the PEP restriction to a SQL query

    The restriction is appended as an extra "AND ..." condition, so the
    query given must already end with its WHERE clause.

    @param query: SQL query basis
    @param values: current values to replace in query
    @param pep: True if we are in PEP mode
    @param recipient: jid of the recipient (only used when pep is True)
    @param pep_table: name of the table owning the pep column, to use when
        the column must be qualified (e.g. in a query joining several tables)
    @return: tuple (query + PEP AND check, values);
        when pep is True, values is a new list with the recipient's
        bare jid appended, else values is returned as given
    """
    # the qualified form is "<table>.pep": the table name goes before the dot
    if pep_table is None:
        pep_col_name = "pep"
    else:
        pep_col_name = "{}.pep".format(pep_table)

    if pep:
        pep_check = "AND {}=%s".format(pep_col_name)
        # copy into a list so the caller's sequence (often a tuple) is not mutated
        values = list(values) + [recipient.userhost()]
    else:
        pep_check = "AND {} IS NULL".format(pep_col_name)
    return "{} {}".format(query, pep_check), values
96 | |
97 | |
75 class Storage: | 98 class Storage: |
76 | 99 |
77 implements(iidavoll.IStorage) | 100 implements(iidavoll.IStorage) |
78 | 101 |
79 defaultConfig = { | 102 defaultConfig = { |
80 'leaf': { | 103 'leaf': { |
81 "pubsub#persist_items": True, | 104 const.OPT_PERSIST_ITEMS: True, |
82 "pubsub#deliver_payloads": True, | 105 const.OPT_DELIVER_PAYLOADS: True, |
83 "pubsub#send_last_published_item": 'on_sub', | 106 const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub', |
84 const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT, | 107 const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT, |
85 const.OPT_PUBLISH_MODEL: const.VAL_PMODEL_DEFAULT, | 108 const.OPT_PUBLISH_MODEL: const.VAL_PMODEL_DEFAULT, |
86 }, | 109 }, |
87 'collection': { | 110 'collection': { |
88 "pubsub#deliver_payloads": True, | 111 const.OPT_DELIVER_PAYLOADS: True, |
89 "pubsub#send_last_published_item": 'on_sub', | 112 const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub', |
90 const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT, | 113 const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT, |
91 const.OPT_PUBLISH_MODEL: const.VAL_PMODEL_DEFAULT, | 114 const.OPT_PUBLISH_MODEL: const.VAL_PMODEL_DEFAULT, |
92 } | 115 } |
93 } | 116 } |
94 | 117 |
95 def __init__(self, dbpool): | 118 def __init__(self, dbpool): |
96 self.dbpool = dbpool | 119 self.dbpool = dbpool |
97 | 120 |
98 def getNode(self, nodeIdentifier): | 121 def _buildNode(self, row): |
99 return self.dbpool.runInteraction(self._getNode, nodeIdentifier) | 122 """Build a note class from database result row""" |
100 | |
101 def _getNode(self, cursor, nodeIdentifier): | |
102 configuration = {} | 123 configuration = {} |
103 cursor.execute("""SELECT node_type, | 124 |
125 if not row: | |
126 raise error.NodeNotFound() | |
127 | |
128 if row[2] == 'leaf': | |
129 configuration = { | |
130 'pubsub#persist_items': row[3], | |
131 'pubsub#deliver_payloads': row[4], | |
132 'pubsub#send_last_published_item': row[5], | |
133 const.OPT_ACCESS_MODEL:row[6], | |
134 const.OPT_PUBLISH_MODEL:row[7], | |
135 } | |
136 node = LeafNode(row[0], row[1], configuration) | |
137 node.dbpool = self.dbpool | |
138 return node | |
139 elif row[2] == 'collection': | |
140 configuration = { | |
141 'pubsub#deliver_payloads': row[4], | |
142 'pubsub#send_last_published_item': row[5], | |
143 const.OPT_ACCESS_MODEL: row[6], | |
144 const.OPT_PUBLISH_MODEL:row[7], | |
145 } | |
146 node = CollectionNode(row[0], row[1], configuration) | |
147 node.dbpool = self.dbpool | |
148 return node | |
149 else: | |
150 raise ValueError("Unknown node type !") | |
151 | |
152 def getNodeById(self, nodeDbId): | |
153 """Get node using database ID insted of pubsub identifier | |
154 | |
155 @param nodeDbId(unicode): database ID | |
156 """ | |
157 return self.dbpool.runInteraction(self._getNodeById, nodeDbId) | |
158 | |
159 | |
160 def _getNodeById(self, cursor, nodeDbId): | |
161 cursor.execute("""SELECT node_id, | |
162 node, | |
163 node_type, | |
104 persist_items, | 164 persist_items, |
105 deliver_payloads, | 165 deliver_payloads, |
106 send_last_published_item, | 166 send_last_published_item, |
107 access_model, | 167 access_model, |
108 publish_model | 168 publish_model, |
109 FROM nodes | 169 pep |
110 WHERE node=%s""", | 170 FROM nodes |
111 (nodeIdentifier,)) | 171 WHERE node_id=%s""", |
172 (nodeDbId,)) | |
112 row = cursor.fetchone() | 173 row = cursor.fetchone() |
113 | 174 return self._buildNode(row) |
114 if not row: | 175 |
115 raise error.NodeNotFound() | 176 def getNode(self, nodeIdentifier, pep, recipient=None): |
116 | 177 return self.dbpool.runInteraction(self._getNode, nodeIdentifier, pep, recipient) |
117 if row[0] == 'leaf': | 178 |
118 configuration = { | 179 |
119 'pubsub#persist_items': row[1], | 180 def _getNode(self, cursor, nodeIdentifier, pep, recipient): |
120 'pubsub#deliver_payloads': row[2], | 181 cursor.execute(*withPEP("""SELECT node_id, |
121 'pubsub#send_last_published_item': row[3], | 182 node, |
122 const.OPT_ACCESS_MODEL:row[4], | 183 node_type, |
123 const.OPT_PUBLISH_MODEL:row[5], | 184 persist_items, |
124 } | 185 deliver_payloads, |
125 node = LeafNode(nodeIdentifier, configuration) | 186 send_last_published_item, |
126 node.dbpool = self.dbpool | 187 access_model, |
127 return node | 188 publish_model, |
128 elif row[0] == 'collection': | 189 pep |
129 configuration = { | 190 FROM nodes |
130 'pubsub#deliver_payloads': row[2], | 191 WHERE node=%s""", |
131 'pubsub#send_last_published_item': row[3], | 192 (nodeIdentifier,), pep, recipient)) |
132 const.OPT_ACCESS_MODEL: row[4], | 193 row = cursor.fetchone() |
133 const.OPT_PUBLISH_MODEL:row[5], | 194 return self._buildNode(row) |
134 } | 195 |
135 node = CollectionNode(nodeIdentifier, configuration) | 196 def getNodeIds(self, pep): |
136 node.dbpool = self.dbpool | 197 d = self.dbpool.runQuery("""SELECT node from nodes WHERE pep is {}NULL""" |
137 return node | 198 .format("NOT " if pep else "")) |
138 | |
139 | |
140 | |
141 def getNodeIds(self): | |
142 d = self.dbpool.runQuery("""SELECT node from nodes""") | |
143 d.addCallback(lambda results: [r[0] for r in results]) | 199 d.addCallback(lambda results: [r[0] for r in results]) |
144 return d | 200 return d |
145 | 201 |
146 | 202 |
147 def createNode(self, nodeIdentifier, owner, config): | 203 def createNode(self, nodeIdentifier, owner, config, pep, recipient=None): |
148 return self.dbpool.runInteraction(self._createNode, nodeIdentifier, | 204 return self.dbpool.runInteraction(self._createNode, nodeIdentifier, |
149 owner, config) | 205 owner, config, pep, recipient) |
150 | 206 |
151 | 207 |
152 def _createNode(self, cursor, nodeIdentifier, owner, config): | 208 def _createNode(self, cursor, nodeIdentifier, owner, config, pep, recipient): |
153 if config['pubsub#node_type'] != 'leaf': | 209 if config['pubsub#node_type'] != 'leaf': |
154 raise error.NoCollections() | 210 raise error.NoCollections() |
155 | 211 |
156 owner = owner.userhost() | 212 owner = owner.userhost() |
213 | |
157 try: | 214 try: |
158 cursor.execute("""INSERT INTO nodes | 215 cursor.execute("""INSERT INTO nodes |
159 (node, node_type, persist_items, | 216 (node, node_type, persist_items, |
160 deliver_payloads, send_last_published_item, access_model, publish_model) | 217 deliver_payloads, send_last_published_item, access_model, publish_model, pep) |
161 VALUES | 218 VALUES |
162 (%s, 'leaf', %s, %s, %s, %s, %s)""", | 219 (%s, 'leaf', %s, %s, %s, %s, %s, %s)""", |
163 (nodeIdentifier, | 220 (nodeIdentifier, |
164 config['pubsub#persist_items'], | 221 config['pubsub#persist_items'], |
165 config['pubsub#deliver_payloads'], | 222 config['pubsub#deliver_payloads'], |
166 config['pubsub#send_last_published_item'], | 223 config['pubsub#send_last_published_item'], |
167 config[const.OPT_ACCESS_MODEL], | 224 config[const.OPT_ACCESS_MODEL], |
168 config[const.OPT_PUBLISH_MODEL], | 225 config[const.OPT_PUBLISH_MODEL], |
226 recipient.userhost() if pep else None | |
169 ) | 227 ) |
170 ) | 228 ) |
171 except cursor._pool.dbapi.IntegrityError: | 229 except cursor._pool.dbapi.IntegrityError: |
172 raise error.NodeExists() | 230 raise error.NodeExists() |
173 | 231 |
174 cursor.execute("""SELECT node_id FROM nodes WHERE node=%s""", (nodeIdentifier,)); | 232 cursor.execute(*withPEP("""SELECT node_id FROM nodes WHERE node=%s""", |
233 (nodeIdentifier,), pep, recipient)); | |
175 node_id = cursor.fetchone()[0] | 234 node_id = cursor.fetchone()[0] |
176 | 235 |
177 cursor.execute("""SELECT 1 as bool from entities where jid=%s""", | 236 cursor.execute("""SELECT 1 as bool from entities where jid=%s""", |
178 (owner,)) | 237 (owner,)) |
179 | 238 |
208 for group in allowed_groups: | 267 for group in allowed_groups: |
209 #TODO: check that group are actually in roster | 268 #TODO: check that group are actually in roster |
210 cursor.execute("""INSERT INTO node_groups_authorized (node_id, groupname) | 269 cursor.execute("""INSERT INTO node_groups_authorized (node_id, groupname) |
211 VALUES (%s,%s)""" , (node_id, group)) | 270 VALUES (%s,%s)""" , (node_id, group)) |
212 | 271 |
213 | 272 def deleteNodeByDbId(self, db_id): |
214 def deleteNode(self, nodeIdentifier): | 273 """Delete a node using directly its database id""" |
215 return self.dbpool.runInteraction(self._deleteNode, nodeIdentifier) | 274 return self.dbpool.runInteraction(self._deleteNodeByDbId, db_id) |
216 | 275 |
217 | 276 def _deleteNodeByDbId(self, cursor, db_id): |
218 def _deleteNode(self, cursor, nodeIdentifier): | 277 cursor.execute("""DELETE FROM nodes WHERE node_id=%s""", |
219 cursor.execute("""DELETE FROM nodes WHERE node=%s""", | 278 (db_id,)) |
220 (nodeIdentifier,)) | |
221 | 279 |
222 if cursor.rowcount != 1: | 280 if cursor.rowcount != 1: |
223 raise error.NodeNotFound() | 281 raise error.NodeNotFound() |
224 | 282 |
225 def getNodeGroups(self, nodeIdentifier): | 283 def deleteNode(self, nodeIdentifier, pep, recipient=None): |
226 return self.dbpool.runInteraction(self._getNodeGroups, nodeIdentifier) | 284 return self.dbpool.runInteraction(self._deleteNode, nodeIdentifier, pep, recipient) |
227 | 285 |
228 def _getNodeGroups(self, cursor, nodeIdentifier): | 286 |
229 cursor.execute("SELECT groupname FROM node_groups_authorized NATURAL JOIN nodes WHERE node=%s", | 287 def _deleteNode(self, cursor, nodeIdentifier, pep, recipient): |
230 (nodeIdentifier,)) | 288 cursor.execute(*withPEP("""DELETE FROM nodes WHERE node=%s""", |
289 (nodeIdentifier,), pep, recipient)) | |
290 | |
291 if cursor.rowcount != 1: | |
292 raise error.NodeNotFound() | |
293 | |
294 def getNodeGroups(self, nodeIdentifier, pep, recipient=None): | |
295 return self.dbpool.runInteraction(self._getNodeGroups, nodeIdentifier, pep, recipient) | |
296 | |
297 def _getNodeGroups(self, cursor, nodeIdentifier, pep, recipient): | |
298 cursor.execute(*withPEP("SELECT groupname FROM node_groups_authorized NATURAL JOIN nodes WHERE node=%s", | |
299 (nodeIdentifier,), pep, recipient)) | |
231 rows = cursor.fetchall() | 300 rows = cursor.fetchall() |
232 | 301 |
233 return [row[0] for row in rows] | 302 return [row[0] for row in rows] |
234 | 303 |
235 def getAffiliations(self, entity): | 304 def getAffiliations(self, entity, pep, recipient=None): |
236 d = self.dbpool.runQuery("""SELECT node, affiliation FROM entities | 305 d = self.dbpool.runQuery(*withPEP("""SELECT node, affiliation FROM entities |
237 NATURAL JOIN affiliations | 306 NATURAL JOIN affiliations |
238 NATURAL JOIN nodes | 307 NATURAL JOIN nodes |
239 WHERE jid=%s""", | 308 WHERE jid=%s""", |
240 (entity.userhost(),)) | 309 (entity.userhost(),), pep, recipient, 'nodes')) |
241 d.addCallback(lambda results: [tuple(r) for r in results]) | 310 d.addCallback(lambda results: [tuple(r) for r in results]) |
242 return d | 311 return d |
243 | 312 |
244 | 313 |
245 def getSubscriptions(self, entity): | 314 def getSubscriptions(self, entity, pep, recipient=None): |
246 def toSubscriptions(rows): | 315 def toSubscriptions(rows): |
247 subscriptions = [] | 316 subscriptions = [] |
248 for row in rows: | 317 for row in rows: |
249 subscriber = jid.internJID('%s/%s' % (row[1], | 318 subscriber = jid.internJID('%s/%s' % (row[1], |
250 row[2])) | 319 row[2])) |
254 | 323 |
255 d = self.dbpool.runQuery("""SELECT node, jid, resource, state | 324 d = self.dbpool.runQuery("""SELECT node, jid, resource, state |
256 FROM entities | 325 FROM entities |
257 NATURAL JOIN subscriptions | 326 NATURAL JOIN subscriptions |
258 NATURAL JOIN nodes | 327 NATURAL JOIN nodes |
259 WHERE jid=%s""", | 328 WHERE jid=%s AND nodes.pep=%s""", |
260 (entity.userhost(),)) | 329 (entity.userhost(), recipient.userhost() if pep else None)) |
261 d.addCallback(toSubscriptions) | 330 d.addCallback(toSubscriptions) |
262 return d | 331 return d |
263 | 332 |
264 | 333 |
265 def getDefaultConfiguration(self, nodeType): | 334 def getDefaultConfiguration(self, nodeType): |
269 | 338 |
270 class Node: | 339 class Node: |
271 | 340 |
272 implements(iidavoll.INode) | 341 implements(iidavoll.INode) |
273 | 342 |
274 def __init__(self, nodeIdentifier, config): | 343 def __init__(self, nodeDbId, nodeIdentifier, config): |
344 self.nodeDbId = nodeDbId | |
275 self.nodeIdentifier = nodeIdentifier | 345 self.nodeIdentifier = nodeIdentifier |
276 self._config = config | 346 self._config = config |
277 self.owner = None; | 347 self.owner = None; |
278 | 348 |
279 | 349 |
280 def _checkNodeExists(self, cursor): | 350 def _checkNodeExists(self, cursor): |
281 cursor.execute("""SELECT node_id FROM nodes WHERE node=%s""", | 351 cursor.execute("""SELECT 1 as exist FROM nodes WHERE node_id=%s""", |
282 (self.nodeIdentifier,)) | 352 (self.nodeDbId,)) |
283 if not cursor.fetchone(): | 353 if not cursor.fetchone(): |
284 raise error.NodeNotFound() | 354 raise error.NodeNotFound() |
285 | 355 |
286 | 356 |
287 def getType(self): | 357 def getType(self): |
288 return self.nodeType | 358 return self.nodeType |
289 | 359 |
290 def getNodeOwner(self): | 360 def getNodeOwner(self): |
291 if self.owner: | 361 if self.owner: |
292 return defer.succeed(self.owner) | 362 return defer.succeed(self.owner) |
293 d = self.dbpool.runQuery("""SELECT jid FROM nodes NATURAL JOIN affiliations NATURAL JOIN entities WHERE node=%s""", (self.nodeIdentifier,)) | 363 d = self.dbpool.runQuery("""SELECT jid FROM nodes NATURAL JOIN affiliations NATURAL JOIN entities WHERE node_id=%s""", (self.nodeDbId,)) |
294 d.addCallback(lambda result: jid.JID(result[0][0])) | 364 d.addCallback(lambda result: jid.JID(result[0][0])) |
295 return d | 365 return d |
296 | 366 |
297 | 367 |
298 def getConfiguration(self): | 368 def getConfiguration(self): |
313 | 383 |
314 def _setConfiguration(self, cursor, config): | 384 def _setConfiguration(self, cursor, config): |
315 self._checkNodeExists(cursor) | 385 self._checkNodeExists(cursor) |
316 cursor.execute("""UPDATE nodes SET persist_items=%s, | 386 cursor.execute("""UPDATE nodes SET persist_items=%s, |
317 deliver_payloads=%s, | 387 deliver_payloads=%s, |
318 send_last_published_item=%s | 388 send_last_published_item=%s, |
319 WHERE node=%s""", | 389 access_model=%s, |
320 (config["pubsub#persist_items"], | 390 publish_model=%s |
321 config["pubsub#deliver_payloads"], | 391 WHERE node_id=%s""", |
322 config["pubsub#send_last_published_item"], | 392 (config[const.OPT_PERSIST_ITEMS], |
323 self.nodeIdentifier)) | 393 config[const.OPT_DELIVER_PAYLOADS], |
394 config[const.OPT_SEND_LAST_PUBLISHED_ITEM], | |
395 config[const.OPT_ACCESS_MODEL], | |
396 config[const.OPT_PUBLISH_MODEL], | |
397 self.nodeDbId)) | |
324 | 398 |
325 | 399 |
326 def _setCachedConfiguration(self, void, config): | 400 def _setCachedConfiguration(self, void, config): |
327 self._config = config | 401 self._config = config |
328 | 402 |
340 def _getAffiliation(self, cursor, entity): | 414 def _getAffiliation(self, cursor, entity): |
341 self._checkNodeExists(cursor) | 415 self._checkNodeExists(cursor) |
342 cursor.execute("""SELECT affiliation FROM affiliations | 416 cursor.execute("""SELECT affiliation FROM affiliations |
343 NATURAL JOIN nodes | 417 NATURAL JOIN nodes |
344 NATURAL JOIN entities | 418 NATURAL JOIN entities |
345 WHERE node=%s AND jid=%s""", | 419 WHERE node_id=%s AND jid=%s""", |
346 (self.nodeIdentifier, | 420 (self.nodeDbId, |
347 entity.userhost())) | 421 entity.userhost())) |
348 | 422 |
349 try: | 423 try: |
350 return cursor.fetchone()[0] | 424 return cursor.fetchone()[0] |
351 except TypeError: | 425 except TypeError: |
352 return None | 426 return None |
353 | 427 |
428 | |
354 def getAccessModel(self): | 429 def getAccessModel(self): |
355 return self.dbpool.runInteraction(self._getAccessModel) | 430 return self._config[const.OPT_ACCESS_MODEL] |
356 | |
357 def _getAccessModel(self, cursor, entity): | |
358 self._checkNodeExists(cursor) | |
359 cursor.execute("""SELECT access_model FROM nodes | |
360 WHERE node=%s""", | |
361 (self.nodeIdentifier,)) | |
362 | |
363 try: | |
364 return cursor.fetchone()[0] | |
365 except TypeError: | |
366 return None | |
367 | 431 |
368 | 432 |
369 def getSubscription(self, subscriber): | 433 def getSubscription(self, subscriber): |
370 return self.dbpool.runInteraction(self._getSubscription, subscriber) | 434 return self.dbpool.runInteraction(self._getSubscription, subscriber) |
371 | 435 |
377 resource = subscriber.resource or '' | 441 resource = subscriber.resource or '' |
378 | 442 |
379 cursor.execute("""SELECT state FROM subscriptions | 443 cursor.execute("""SELECT state FROM subscriptions |
380 NATURAL JOIN nodes | 444 NATURAL JOIN nodes |
381 NATURAL JOIN entities | 445 NATURAL JOIN entities |
382 WHERE node=%s AND jid=%s AND resource=%s""", | 446 WHERE node_id=%s AND jid=%s AND resource=%s""", |
383 (self.nodeIdentifier, | 447 (self.nodeDbId, |
384 userhost, | 448 userhost, |
385 resource)) | 449 resource)) |
386 | 450 |
387 row = cursor.fetchone() | 451 row = cursor.fetchone() |
388 if not row: | 452 if not row: |
396 | 460 |
397 | 461 |
398 def _getSubscriptions(self, cursor, state): | 462 def _getSubscriptions(self, cursor, state): |
399 self._checkNodeExists(cursor) | 463 self._checkNodeExists(cursor) |
400 | 464 |
401 query = """SELECT jid, resource, state, | 465 query = """SELECT node, jid, resource, state, |
402 subscription_type, subscription_depth | 466 subscription_type, subscription_depth |
403 FROM subscriptions | 467 FROM subscriptions |
404 NATURAL JOIN nodes | 468 NATURAL JOIN nodes |
405 NATURAL JOIN entities | 469 NATURAL JOIN entities |
406 WHERE node=%s""" | 470 WHERE node_id=%s""" |
407 values = [self.nodeIdentifier] | 471 values = [self.nodeDbId] |
408 | 472 |
409 if state: | 473 if state: |
410 query += " AND state=%s" | 474 query += " AND state=%s" |
411 values.append(state) | 475 values.append(state) |
412 | 476 |
413 cursor.execute(query, values) | 477 cursor.execute(query, values) |
414 rows = cursor.fetchall() | 478 rows = cursor.fetchall() |
415 | 479 |
416 subscriptions = [] | 480 subscriptions = [] |
417 for row in rows: | 481 for row in rows: |
418 subscriber = jid.JID(u'%s/%s' % (row[0], row[1])) | 482 subscriber = jid.JID(u'%s/%s' % (row[1], row[2])) |
419 | 483 |
420 options = {} | 484 options = {} |
421 if row[3]: | |
422 options['pubsub#subscription_type'] = row[3]; | |
423 if row[4]: | 485 if row[4]: |
424 options['pubsub#subscription_depth'] = row[4]; | 486 options['pubsub#subscription_type'] = row[4]; |
425 | 487 if row[5]: |
426 subscriptions.append(Subscription(self.nodeIdentifier, subscriber, | 488 options['pubsub#subscription_depth'] = row[5]; |
427 row[2], options)) | 489 |
490 subscriptions.append(Subscription(row[0], subscriber, | |
491 row[3], options)) | |
428 | 492 |
429 return subscriptions | 493 return subscriptions |
430 | 494 |
431 | 495 |
432 def addSubscription(self, subscriber, state, config): | 496 def addSubscription(self, subscriber, state, config): |
451 | 515 |
452 try: | 516 try: |
453 cursor.execute("""INSERT INTO subscriptions | 517 cursor.execute("""INSERT INTO subscriptions |
454 (node_id, entity_id, resource, state, | 518 (node_id, entity_id, resource, state, |
455 subscription_type, subscription_depth) | 519 subscription_type, subscription_depth) |
456 SELECT node_id, entity_id, %s, %s, %s, %s FROM | 520 SELECT %s, entity_id, %s, %s, %s, %s FROM |
457 (SELECT node_id FROM nodes | |
458 WHERE node=%s) as n | |
459 CROSS JOIN | |
460 (SELECT entity_id FROM entities | 521 (SELECT entity_id FROM entities |
461 WHERE jid=%s) as e""", | 522 WHERE jid=%s) AS ent_id""", |
462 (resource, | 523 (self.nodeDbId, |
524 resource, | |
463 state, | 525 state, |
464 subscription_type, | 526 subscription_type, |
465 subscription_depth, | 527 subscription_depth, |
466 self.nodeIdentifier, | |
467 userhost)) | 528 userhost)) |
468 except cursor._pool.dbapi.IntegrityError: | 529 except cursor._pool.dbapi.IntegrityError: |
469 raise error.SubscriptionExists() | 530 raise error.SubscriptionExists() |
470 | 531 |
471 | 532 |
479 | 540 |
480 userhost = subscriber.userhost() | 541 userhost = subscriber.userhost() |
481 resource = subscriber.resource or '' | 542 resource = subscriber.resource or '' |
482 | 543 |
483 cursor.execute("""DELETE FROM subscriptions WHERE | 544 cursor.execute("""DELETE FROM subscriptions WHERE |
484 node_id=(SELECT node_id FROM nodes | 545 node_id=%s AND |
485 WHERE node=%s) AND | |
486 entity_id=(SELECT entity_id FROM entities | 546 entity_id=(SELECT entity_id FROM entities |
487 WHERE jid=%s) AND | 547 WHERE jid=%s) AND |
488 resource=%s""", | 548 resource=%s""", |
489 (self.nodeIdentifier, | 549 (self.nodeDbId, |
490 userhost, | 550 userhost, |
491 resource)) | 551 resource)) |
492 if cursor.rowcount != 1: | 552 if cursor.rowcount != 1: |
493 raise error.NotSubscribed() | 553 raise error.NotSubscribed() |
494 | 554 |
504 | 564 |
505 cursor.execute("""SELECT 1 as bool FROM entities | 565 cursor.execute("""SELECT 1 as bool FROM entities |
506 NATURAL JOIN subscriptions | 566 NATURAL JOIN subscriptions |
507 NATURAL JOIN nodes | 567 NATURAL JOIN nodes |
508 WHERE entities.jid=%s | 568 WHERE entities.jid=%s |
509 AND node=%s AND state='subscribed'""", | 569 AND node_id=%s AND state='subscribed'""", |
510 (entity.userhost(), | 570 (entity.userhost(), |
511 self.nodeIdentifier)) | 571 self.nodeDbId)) |
512 | 572 |
513 return cursor.fetchone() is not None | 573 return cursor.fetchone() is not None |
514 | 574 |
515 | 575 |
516 def getAffiliations(self): | 576 def getAffiliations(self): |
521 self._checkNodeExists(cursor) | 581 self._checkNodeExists(cursor) |
522 | 582 |
523 cursor.execute("""SELECT jid, affiliation FROM nodes | 583 cursor.execute("""SELECT jid, affiliation FROM nodes |
524 NATURAL JOIN affiliations | 584 NATURAL JOIN affiliations |
525 NATURAL JOIN entities | 585 NATURAL JOIN entities |
526 WHERE node=%s""", | 586 WHERE node_id=%s""", |
527 (self.nodeIdentifier,)) | 587 (self.nodeDbId,)) |
528 result = cursor.fetchall() | 588 result = cursor.fetchall() |
529 | 589 |
530 return [(jid.internJID(r[0]), r[1]) for r in result] | 590 return [(jid.internJID(r[0]), r[1]) for r in result] |
531 | 591 |
532 | 592 |
539 | 599 |
540 def storeItems(self, item_data, publisher): | 600 def storeItems(self, item_data, publisher): |
541 return self.dbpool.runInteraction(self._storeItems, item_data, publisher) | 601 return self.dbpool.runInteraction(self._storeItems, item_data, publisher) |
542 | 602 |
543 | 603 |
544 def _storeItems(self, cursor, item_data, publisher): | 604 def _storeItems(self, cursor, items_data, publisher): |
545 self._checkNodeExists(cursor) | 605 self._checkNodeExists(cursor) |
546 for item_datum in item_data: | 606 for item_data in items_data: |
547 self._storeItem(cursor, item_datum, publisher) | 607 self._storeItem(cursor, item_data, publisher) |
548 | 608 |
549 | 609 |
550 def _storeItem(self, cursor, item_datum, publisher): | 610 def _storeItem(self, cursor, item_data, publisher): |
551 access_model, item_config, item = item_datum | 611 item, access_model, item_config = item_data |
552 data = item.toXml() | 612 data = item.toXml() |
553 | 613 |
554 cursor.execute("""UPDATE items SET date=now(), publisher=%s, data=%s | 614 cursor.execute("""UPDATE items SET date=now(), publisher=%s, data=%s |
555 FROM nodes | 615 FROM nodes |
556 WHERE nodes.node_id = items.node_id AND | 616 WHERE nodes.node_id = items.node_id AND |
557 nodes.node = %s and items.item=%s""", | 617 nodes.node_id = %s and items.item=%s""", |
558 (publisher.full(), | 618 (publisher.full(), |
559 data, | 619 data, |
560 self.nodeIdentifier, | 620 self.nodeDbId, |
561 item["id"])) | 621 item["id"])) |
562 if cursor.rowcount == 1: | 622 if cursor.rowcount == 1: |
563 return | 623 return |
564 | 624 |
565 cursor.execute("""INSERT INTO items (node_id, item, publisher, data, access_model) | 625 cursor.execute("""INSERT INTO items (node_id, item, publisher, data, access_model) |
566 SELECT node_id, %s, %s, %s, %s FROM nodes | 626 SELECT %s, %s, %s, %s, %s FROM nodes |
567 WHERE node=%s | 627 WHERE node_id=%s |
568 RETURNING item_id""", | 628 RETURNING item_id""", |
569 (item["id"], | 629 (self.nodeDbId, |
630 item["id"], | |
570 publisher.full(), | 631 publisher.full(), |
571 data, | 632 data, |
572 access_model, | 633 access_model, |
573 self.nodeIdentifier)) | 634 self.nodeDbId)) |
574 | 635 |
575 if access_model == const.VAL_AMODEL_ROSTER: | 636 if access_model == const.VAL_AMODEL_ROSTER: |
576 item_id = cursor.fetchone()[0]; | 637 item_id = cursor.fetchone()[0]; |
577 if const.OPT_ROSTER_GROUPS_ALLOWED in item_config: | 638 if const.OPT_ROSTER_GROUPS_ALLOWED in item_config: |
578 item_config.fields[const.OPT_ROSTER_GROUPS_ALLOWED].fieldType='list-multi' #XXX: needed to force list if there is only one value | 639 item_config.fields[const.OPT_ROSTER_GROUPS_ALLOWED].fieldType='list-multi' #XXX: needed to force list if there is only one value |
594 | 655 |
595 deleted = [] | 656 deleted = [] |
596 | 657 |
597 for itemIdentifier in itemIdentifiers: | 658 for itemIdentifier in itemIdentifiers: |
598 cursor.execute("""DELETE FROM items WHERE | 659 cursor.execute("""DELETE FROM items WHERE |
599 node_id=(SELECT node_id FROM nodes | 660 node_id=%s AND |
600 WHERE node=%s) AND | |
601 item=%s""", | 661 item=%s""", |
602 (self.nodeIdentifier, | 662 (self.nodeDbId, |
603 itemIdentifier)) | 663 itemIdentifier)) |
604 | 664 |
605 if cursor.rowcount: | 665 if cursor.rowcount: |
606 deleted.append(itemIdentifier) | 666 deleted.append(itemIdentifier) |
607 | 667 |
621 if ext_data is None: | 681 if ext_data is None: |
622 ext_data = {} | 682 ext_data = {} |
623 return self.dbpool.runInteraction(self._getItems, authorized_groups, unrestricted, maxItems, ext_data) | 683 return self.dbpool.runInteraction(self._getItems, authorized_groups, unrestricted, maxItems, ext_data) |
624 | 684 |
625 def _getItems(self, cursor, authorized_groups, unrestricted, maxItems, ext_data): | 685 def _getItems(self, cursor, authorized_groups, unrestricted, maxItems, ext_data): |
686 # FIXME: simplify the query construction | |
626 self._checkNodeExists(cursor) | 687 self._checkNodeExists(cursor) |
627 | 688 |
628 if unrestricted: | 689 if unrestricted: |
629 query = ["SELECT data,items.access_model,item_id"] | 690 query = ["SELECT data,items.access_model,item_id"] |
630 source = """FROM nodes | 691 source = """FROM nodes |
631 INNER JOIN items USING (node_id) | 692 INNER JOIN items USING (node_id) |
632 WHERE node=%s""" | 693 WHERE node_id=%s""" |
633 args = [self.nodeIdentifier] | 694 args = [self.nodeDbId] |
634 else: | 695 else: |
635 query = ["SELECT data"] | 696 query = ["SELECT data"] |
636 groups = " or (items.access_model='roster' and groupname in %s)" if authorized_groups else "" | 697 groups = " or (items.access_model='roster' and groupname in %s)" if authorized_groups else "" |
637 source = """FROM nodes | 698 source = """FROM nodes |
638 INNER JOIN items USING (node_id) | 699 INNER JOIN items USING (node_id) |
639 LEFT JOIN item_groups_authorized USING (item_id) | 700 LEFT JOIN item_groups_authorized USING (item_id) |
640 WHERE node=%s AND | 701 WHERE node_id=%s AND |
641 (items.access_model='open'""" + groups + ")" | 702 (items.access_model='open'""" + groups + ")" |
642 | 703 |
643 args = [self.nodeIdentifier] | 704 args = [self.nodeDbId] |
644 if authorized_groups: | 705 if authorized_groups: |
645 args.append(authorized_groups) | 706 args.append(authorized_groups) |
646 | 707 |
647 if 'filters' in ext_data: # MAM filters | 708 if 'filters' in ext_data: # MAM filters |
648 for filter_ in ext_data['filters']: | 709 for filter_ in ext_data['filters']: |
649 if filter_.var == 'start': | 710 if filter_.var == 'start': |
650 source += " AND date>='{date}'".format(date=filter_.value) | 711 source += " AND date>=%s" |
712 args.append(filter_.value) | |
651 if filter_.var == 'end': | 713 if filter_.var == 'end': |
652 source += " AND date<='{date}'".format(date=filter_.value) | 714 source += " AND date<=%s" |
715 args.append(filter_.value) | |
653 if filter_.var == 'with': | 716 if filter_.var == 'with': |
654 jid_s = filter_.value | 717 jid_s = filter_.value |
655 if '/' in jid_s: | 718 if '/' in jid_s: |
656 source += " AND publisher='{pub}'".format(pub=filter_.value) | 719 source += " AND publisher=%s" |
657 else: # assume the publisher field in DB is always a full JID | 720 args.append(filter_.value) |
658 # XXX: need to escape the % with itself to avoid formatting error | 721 else: |
659 source += " AND publisher LIKE '{pub}/%%'".format(pub=filter_.value) | 722 source += " AND publisher LIKE %s" |
723 args.append(u"{}%".format(filter_.value)) | |
660 | 724 |
661 query.append(source) | 725 query.append(source) |
662 order = "DESC" | 726 order = "DESC" |
663 | 727 |
664 if 'rsm' in ext_data: | 728 if 'rsm' in ext_data: |
665 rsm = ext_data['rsm'] | 729 rsm = ext_data['rsm'] |
666 maxItems = rsm.max | 730 maxItems = rsm.max |
667 if rsm.index is not None: | 731 if rsm.index is not None: |
668 query.append("AND date<=(SELECT date " + source + " ORDER BY date DESC LIMIT 1 OFFSET %s)") | 732 query.append("AND date<=(SELECT date " + source + " ORDER BY date DESC LIMIT 1 OFFSET %s)") |
669 args.append(self.nodeIdentifier) | 733 # FIXME: change the request so source is not used 2 times |
734 # there is already a placeholder in source with node_id=%s, so we need to add self.noDbId in args | |
735 args.append(self.nodeDbId) | |
670 if authorized_groups: | 736 if authorized_groups: |
671 args.append(authorized_groups) | 737 args.append(authorized_groups) |
672 args.append(rsm.index) | 738 args.append(rsm.index) |
673 elif rsm.before is not None: | 739 elif rsm.before is not None: |
674 order = "ASC" | 740 order = "ASC" |
692 ret = [] | 758 ret = [] |
693 for data in result: | 759 for data in result: |
694 item = generic.stripNamespace(parseXml(data[0])) | 760 item = generic.stripNamespace(parseXml(data[0])) |
695 access_model = data[1] | 761 access_model = data[1] |
696 item_id = data[2] | 762 item_id = data[2] |
697 if access_model == 'roster': #TODO: jid access_model | 763 access_list = {} |
764 if access_model == const.VAL_AMODEL_ROSTER: #TODO: jid access_model | |
698 cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,)) | 765 cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,)) |
699 access_list = [r[0] for r in cursor.fetchall()] | 766 access_list[const.OPT_ROSTER_GROUPS_ALLOWED] = [r[0] for r in cursor.fetchall()] |
700 else: | |
701 access_list = None | |
702 | 767 |
703 ret.append((item, access_model, access_list)) | 768 ret.append((item, access_model, access_list)) |
704 return ret | 769 return ret |
705 items = [generic.stripNamespace(parseXml(r[0])) for r in result] | 770 items = [generic.stripNamespace(parseXml(r[0])) for r in result] |
706 return items | 771 return items |
718 self._checkNodeExists(cursor) | 783 self._checkNodeExists(cursor) |
719 | 784 |
720 if unrestricted: | 785 if unrestricted: |
721 query = ["""SELECT count(item_id) FROM nodes | 786 query = ["""SELECT count(item_id) FROM nodes |
722 INNER JOIN items USING (node_id) | 787 INNER JOIN items USING (node_id) |
723 WHERE node=%s"""] | 788 WHERE node_id=%s"""] |
724 args = [self.nodeIdentifier] | 789 args = [self.nodeDbId] |
725 else: | 790 else: |
726 query = ["""SELECT count(item_id) FROM nodes | 791 query = ["""SELECT count(item_id) FROM nodes |
727 INNER JOIN items USING (node_id) | 792 INNER JOIN items USING (node_id) |
728 LEFT JOIN item_groups_authorized USING (item_id) | 793 LEFT JOIN item_groups_authorized USING (item_id) |
729 WHERE node=%s AND | 794 WHERE node_id=%s AND |
730 (items.access_model='open' """ + | 795 (items.access_model='open' """ + |
731 ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') + | 796 ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') + |
732 ")"] | 797 ")"] |
733 | 798 |
734 args = [self.nodeIdentifier] | 799 args = [self.nodeDbId] |
735 if authorized_groups: | 800 if authorized_groups: |
736 args.append(authorized_groups) | 801 args.append(authorized_groups) |
737 | 802 |
738 cursor.execute(' '.join(query), args) | 803 cursor.execute(' '.join(query), args) |
739 return cursor.fetchall()[0][0] | 804 return cursor.fetchall()[0][0] |
753 | 818 |
754 if unrestricted: | 819 if unrestricted: |
755 query = ["""SELECT row_number FROM ( | 820 query = ["""SELECT row_number FROM ( |
756 SELECT row_number() OVER (ORDER BY date DESC), item | 821 SELECT row_number() OVER (ORDER BY date DESC), item |
757 FROM nodes INNER JOIN items USING (node_id) | 822 FROM nodes INNER JOIN items USING (node_id) |
758 WHERE node=%s | 823 WHERE node_id=%s |
759 ) as x | 824 ) as x |
760 WHERE item=%s LIMIT 1"""] | 825 WHERE item=%s LIMIT 1"""] |
761 args = [self.nodeIdentifier] | 826 args = [self.nodeDbId] |
762 else: | 827 else: |
763 query = ["""SELECT row_number FROM ( | 828 query = ["""SELECT row_number FROM ( |
764 SELECT row_number() OVER (ORDER BY date DESC), item | 829 SELECT row_number() OVER (ORDER BY date DESC), item |
765 FROM nodes INNER JOIN items USING (node_id) | 830 FROM nodes INNER JOIN items USING (node_id) |
766 LEFT JOIN item_groups_authorized USING (item_id) | 831 LEFT JOIN item_groups_authorized USING (item_id) |
767 WHERE node=%s AND | 832 WHERE node_id=%s AND |
768 (items.access_model='open' """ + | 833 (items.access_model='open' """ + |
769 ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') + | 834 ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') + |
770 """)) as x | 835 """)) as x |
771 WHERE item=%s LIMIT 1"""] | 836 WHERE item=%s LIMIT 1"""] |
772 | 837 |
773 args = [self.nodeIdentifier] | 838 args = [self.nodeDbId] |
774 if authorized_groups: | 839 if authorized_groups: |
775 args.append(authorized_groups) | 840 args.append(authorized_groups) |
776 | 841 |
777 args.append(item) | 842 args.append(item) |
778 cursor.execute(' '.join(query), args) | 843 cursor.execute(' '.join(query), args) |
782 def getItemsById(self, authorized_groups, unrestricted, itemIdentifiers): | 847 def getItemsById(self, authorized_groups, unrestricted, itemIdentifiers): |
783 """ Get items which are in the given list | 848 """ Get items which are in the given list |
784 @param authorized_groups: we want to get items that these groups can access | 849 @param authorized_groups: we want to get items that these groups can access |
785 @param unrestricted: if true, don't check permissions | 850 @param unrestricted: if true, don't check permissions |
786 @param itemIdentifiers: list of ids of the items we want to get | 851 @param itemIdentifiers: list of ids of the items we want to get |
787 @return: list of (item, access_model, access_model) if unrestricted is True, else list of items | 852 @return: list of (item, access_model, access_list) if unrestricted is True, else list of items |
853 access_list is managed as a dictionary with the same keys as for item_config | |
788 """ | 854 """ |
789 return self.dbpool.runInteraction(self._getItemsById, authorized_groups, unrestricted, itemIdentifiers) | 855 return self.dbpool.runInteraction(self._getItemsById, authorized_groups, unrestricted, itemIdentifiers) |
790 | 856 |
791 | 857 |
792 def _getItemsById(self, cursor, authorized_groups, unrestricted, itemIdentifiers): | 858 def _getItemsById(self, cursor, authorized_groups, unrestricted, itemIdentifiers): |
794 ret = [] | 860 ret = [] |
795 if unrestricted: #we get everything without checking permissions | 861 if unrestricted: #we get everything without checking permissions |
796 for itemIdentifier in itemIdentifiers: | 862 for itemIdentifier in itemIdentifiers: |
797 cursor.execute("""SELECT data,items.access_model,item_id FROM nodes | 863 cursor.execute("""SELECT data,items.access_model,item_id FROM nodes |
798 INNER JOIN items USING (node_id) | 864 INNER JOIN items USING (node_id) |
799 WHERE node=%s AND item=%s""", | 865 WHERE node_id=%s AND item=%s""", |
800 (self.nodeIdentifier, | 866 (self.nodeDbId, |
801 itemIdentifier)) | 867 itemIdentifier)) |
802 result = cursor.fetchone() | 868 result = cursor.fetchone() |
803 if result: | 869 if not result: |
804 for data in result: | 870 raise error.ItemNotFound() |
805 item = generic.stripNamespace(parseXml(data[0])) | 871 |
806 access_model = data[1] | 872 item = generic.stripNamespace(parseXml(result[0])) |
807 item_id = data[2] | 873 access_model = result[1] |
808 if access_model == 'roster': #TODO: jid access_model | 874 item_id = result[2] |
809 cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,)) | 875 access_list = {} |
810 access_list = [r[0] for r in cursor.fetchall()] | 876 if access_model == const.VAL_AMODEL_ROSTER: #TODO: jid access_model |
811 else: | 877 cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,)) |
812 access_list = None | 878 access_list[const.OPT_ROSTER_GROUPS_ALLOWED] = [r[0] for r in cursor.fetchall()] |
813 | 879 |
814 ret.append((item, access_model, access_list)) | 880 ret.append((item, access_model, access_list)) |
815 else: #we check permission before returning items | 881 else: #we check permission before returning items |
816 for itemIdentifier in itemIdentifiers: | 882 for itemIdentifier in itemIdentifiers: |
817 args = [self.nodeIdentifier, itemIdentifier] | 883 args = [self.nodeDbId, itemIdentifier] |
818 if authorized_groups: | 884 if authorized_groups: |
819 args.append(authorized_groups) | 885 args.append(authorized_groups) |
820 cursor.execute("""SELECT data FROM nodes | 886 cursor.execute("""SELECT data FROM nodes |
821 INNER JOIN items USING (node_id) | 887 INNER JOIN items USING (node_id) |
822 LEFT JOIN item_groups_authorized USING (item_id) | 888 LEFT JOIN item_groups_authorized USING (item_id) |
823 WHERE node=%s AND item=%s AND | 889 WHERE node_id=%s AND item=%s AND |
824 (items.access_model='open' """ + | 890 (items.access_model='open' """ + |
825 ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') + ")", | 891 ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') + ")", |
826 args) | 892 args) |
827 | 893 |
828 result = cursor.fetchone() | 894 result = cursor.fetchone() |
829 if result: | 895 if result: |
830 ret.append(parseXml(result[0])) | 896 ret.append(generic.stripNamespace(parseXml(result[0]))) |
831 | 897 |
832 return ret | 898 return ret |
899 | |
900 | |
901 def getItemsPublishers(self, itemIdentifiers): | |
902 """Get the publishers for all given identifiers | |
903 | |
904 @return (dict): map of itemIdentifiers to publisher | |
905 """ | |
906 return self.dbpool.runInteraction(self._getItemsPublishers, itemIdentifiers) | |
907 | |
908 | |
909 def _getItemsPublishers(self, cursor, itemIdentifiers): | |
910 self._checkNodeExists(cursor) | |
911 ret = {} | |
912 for itemIdentifier in itemIdentifiers: | |
913 cursor.execute("""SELECT publisher FROM items | |
914 WHERE item=%s""", | |
915 (itemIdentifier,)) | |
916 result = cursor.fetchone() | |
917 if not result: | |
918 # We have an internal error, that's why we use ValueError | |
919 # and not error.ItemNotFound() | |
920 raise ValueError() # itemIdentifier must exists | |
921 ret[itemIdentifier] = jid.JID(result[0]) | |
922 return ret | |
923 | |
833 | 924 |
834 def purge(self): | 925 def purge(self): |
835 return self.dbpool.runInteraction(self._purge) | 926 return self.dbpool.runInteraction(self._purge) |
836 | 927 |
837 | 928 |
838 def _purge(self, cursor): | 929 def _purge(self, cursor): |
839 self._checkNodeExists(cursor) | 930 self._checkNodeExists(cursor) |
840 | 931 |
841 cursor.execute("""DELETE FROM items WHERE | 932 cursor.execute("""DELETE FROM items WHERE |
842 node_id=(SELECT node_id FROM nodes WHERE node=%s)""", | 933 node_id=%s""", |
843 (self.nodeIdentifier,)) | 934 (self.nodeDbId,)) |
844 | 935 |
845 | 936 # FIXME: to be checked |
846 def filterItemsWithPublisher(self, itemIdentifiers, requestor): | 937 # def filterItemsWithPublisher(self, itemIdentifiers, recipient): |
847 return self.dbpool.runInteraction(self._filterItemsWithPublisher, itemIdentifiers, requestor) | 938 # return self.dbpool.runInteraction(self._filterItemsWithPublisher, itemIdentifiers, recipient) |
848 | 939 |
849 def _filterItemsWithPublisher(self, cursor, itemIdentifiers, requestor): | 940 # def _filterItemsWithPublisher(self, cursor, itemIdentifiers, requestor): |
850 self._checkNodeExists(cursor) | 941 # self._checkNodeExists(cursor) |
851 ret = [] | 942 # ret = [] |
852 for itemIdentifier in itemIdentifiers: | 943 # for itemIdentifier in itemIdentifiers: |
853 args = ["%s/%%" % requestor.userhost(), itemIdentifier] | 944 # args = ["%s/%%" % requestor.userhost(), itemIdentifier] |
854 cursor.execute("""SELECT item FROM items WHERE publisher LIKE %s AND item=%s""", args) | 945 # cursor.execute("""SELECT item FROM items WHERE publisher LIKE %s AND item=%s""", args) |
855 result = cursor.fetchone() | 946 # result = cursor.fetchone() |
856 if result: | 947 # if result: |
857 ret.append(result[0]) | 948 # ret.append(result[0]) |
858 return ret | 949 # return ret |
859 | 950 |
860 class CollectionNode(Node): | 951 class CollectionNode(Node): |
861 | 952 |
862 nodeType = 'collection' | 953 nodeType = 'collection' |
863 | 954 |