comparison sat_pubsub/pgsql_storage.py @ 294:df1edebb0466

PEP implementation, draft (huge patch, sorry): /!\ database schema has changed! /!\ - whole PEP behaviour is not managed yet - if the stanza is delegated, PEP is assumed - fixed potential SQL injection in pgsql_storage - publish notifications manage PEP - added retract notifications (if "notify" attribute is present), with PEP handling - a publisher can no longer replace an item he didn't publish - /!\ schema has changed, sat_pubsub_update_0_1.sql updates it - sat_pubsub_update_0_1.sql also fixes bad items coming from former versions of SàT
author Goffi <goffi@goffi.org>
date Sun, 16 Aug 2015 01:32:42 +0200
parents 002c59dbc23f
children 4115999d85e9
comparison
equal deleted inserted replaced
293:b96a4ac25f8b 294:df1edebb0466
70 psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY) 70 psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY)
71 71
72 # parseXml manage str, but we get unicode 72 # parseXml manage str, but we get unicode
73 parseXml = lambda unicode_data: generic.parseXml(unicode_data.encode('utf-8')) 73 parseXml = lambda unicode_data: generic.parseXml(unicode_data.encode('utf-8'))
74 74
75
def withPEP(query, values, pep, recipient, pep_table=None):
    """Helper method to facilitate PEP management

    Append a condition on the "pep" column to an SQL query, so that the
    query only matches PEP rows of the recipient (PEP mode) or only
    rows where the column is NULL (non-PEP mode).

    @param query: SQL query basis (must already contain a WHERE clause,
        as the check is appended with AND)
    @param values: current values to replace in query
    @param pep: True if we are in PEP mode
    @param recipient: jid of the recipient (only used if pep is True)
    @param pep_table: name of the table used to qualify the pep column
        (e.g. 'nodes' gives 'nodes.pep'), None to use the bare column name
    @return: tuple (query + PEP AND check, values),
        recipient's bare jid is added to values if needed
    """
    # the qualified form is "<table>.pep": the table name goes before the dot
    if pep_table is None:
        pep_col_name = "pep"
    else:
        pep_col_name = "{}.pep".format(pep_table)

    if pep:
        pep_check = "AND {}=%s".format(pep_col_name)
        # copy into a new list so the caller's sequence is never mutated
        values = list(values) + [recipient.userhost()]
    else:
        # "col = NULL" never matches in SQL, so IS NULL is required here
        pep_check = "AND {} IS NULL".format(pep_col_name)
    return "{} {}".format(query, pep_check), values
96
97
75 class Storage: 98 class Storage:
76 99
77 implements(iidavoll.IStorage) 100 implements(iidavoll.IStorage)
78 101
79 defaultConfig = { 102 defaultConfig = {
80 'leaf': { 103 'leaf': {
81 "pubsub#persist_items": True, 104 const.OPT_PERSIST_ITEMS: True,
82 "pubsub#deliver_payloads": True, 105 const.OPT_DELIVER_PAYLOADS: True,
83 "pubsub#send_last_published_item": 'on_sub', 106 const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub',
84 const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT, 107 const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT,
85 const.OPT_PUBLISH_MODEL: const.VAL_PMODEL_DEFAULT, 108 const.OPT_PUBLISH_MODEL: const.VAL_PMODEL_DEFAULT,
86 }, 109 },
87 'collection': { 110 'collection': {
88 "pubsub#deliver_payloads": True, 111 const.OPT_DELIVER_PAYLOADS: True,
89 "pubsub#send_last_published_item": 'on_sub', 112 const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub',
90 const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT, 113 const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT,
91 const.OPT_PUBLISH_MODEL: const.VAL_PMODEL_DEFAULT, 114 const.OPT_PUBLISH_MODEL: const.VAL_PMODEL_DEFAULT,
92 } 115 }
93 } 116 }
94 117
95 def __init__(self, dbpool): 118 def __init__(self, dbpool):
96 self.dbpool = dbpool 119 self.dbpool = dbpool
97 120
98 def getNode(self, nodeIdentifier): 121 def _buildNode(self, row):
99 return self.dbpool.runInteraction(self._getNode, nodeIdentifier) 122 """Build a node instance from a database result row"""
100
101 def _getNode(self, cursor, nodeIdentifier):
102 configuration = {} 123 configuration = {}
103 cursor.execute("""SELECT node_type, 124
125 if not row:
126 raise error.NodeNotFound()
127
128 if row[2] == 'leaf':
129 configuration = {
130 'pubsub#persist_items': row[3],
131 'pubsub#deliver_payloads': row[4],
132 'pubsub#send_last_published_item': row[5],
133 const.OPT_ACCESS_MODEL:row[6],
134 const.OPT_PUBLISH_MODEL:row[7],
135 }
136 node = LeafNode(row[0], row[1], configuration)
137 node.dbpool = self.dbpool
138 return node
139 elif row[2] == 'collection':
140 configuration = {
141 'pubsub#deliver_payloads': row[4],
142 'pubsub#send_last_published_item': row[5],
143 const.OPT_ACCESS_MODEL: row[6],
144 const.OPT_PUBLISH_MODEL:row[7],
145 }
146 node = CollectionNode(row[0], row[1], configuration)
147 node.dbpool = self.dbpool
148 return node
149 else:
150 raise ValueError("Unknown node type !")
151
152 def getNodeById(self, nodeDbId):
153 """Get node using database ID instead of pubsub identifier
154
155 @param nodeDbId(unicode): database ID
156 """
157 return self.dbpool.runInteraction(self._getNodeById, nodeDbId)
158
159
160 def _getNodeById(self, cursor, nodeDbId):
161 cursor.execute("""SELECT node_id,
162 node,
163 node_type,
104 persist_items, 164 persist_items,
105 deliver_payloads, 165 deliver_payloads,
106 send_last_published_item, 166 send_last_published_item,
107 access_model, 167 access_model,
108 publish_model 168 publish_model,
109 FROM nodes 169 pep
110 WHERE node=%s""", 170 FROM nodes
111 (nodeIdentifier,)) 171 WHERE node_id=%s""",
172 (nodeDbId,))
112 row = cursor.fetchone() 173 row = cursor.fetchone()
113 174 return self._buildNode(row)
114 if not row: 175
115 raise error.NodeNotFound() 176 def getNode(self, nodeIdentifier, pep, recipient=None):
116 177 return self.dbpool.runInteraction(self._getNode, nodeIdentifier, pep, recipient)
117 if row[0] == 'leaf': 178
118 configuration = { 179
119 'pubsub#persist_items': row[1], 180 def _getNode(self, cursor, nodeIdentifier, pep, recipient):
120 'pubsub#deliver_payloads': row[2], 181 cursor.execute(*withPEP("""SELECT node_id,
121 'pubsub#send_last_published_item': row[3], 182 node,
122 const.OPT_ACCESS_MODEL:row[4], 183 node_type,
123 const.OPT_PUBLISH_MODEL:row[5], 184 persist_items,
124 } 185 deliver_payloads,
125 node = LeafNode(nodeIdentifier, configuration) 186 send_last_published_item,
126 node.dbpool = self.dbpool 187 access_model,
127 return node 188 publish_model,
128 elif row[0] == 'collection': 189 pep
129 configuration = { 190 FROM nodes
130 'pubsub#deliver_payloads': row[2], 191 WHERE node=%s""",
131 'pubsub#send_last_published_item': row[3], 192 (nodeIdentifier,), pep, recipient))
132 const.OPT_ACCESS_MODEL: row[4], 193 row = cursor.fetchone()
133 const.OPT_PUBLISH_MODEL:row[5], 194 return self._buildNode(row)
134 } 195
135 node = CollectionNode(nodeIdentifier, configuration) 196 def getNodeIds(self, pep):
136 node.dbpool = self.dbpool 197 d = self.dbpool.runQuery("""SELECT node from nodes WHERE pep is {}NULL"""
137 return node 198 .format("NOT " if pep else ""))
138
139
140
141 def getNodeIds(self):
142 d = self.dbpool.runQuery("""SELECT node from nodes""")
143 d.addCallback(lambda results: [r[0] for r in results]) 199 d.addCallback(lambda results: [r[0] for r in results])
144 return d 200 return d
145 201
146 202
147 def createNode(self, nodeIdentifier, owner, config): 203 def createNode(self, nodeIdentifier, owner, config, pep, recipient=None):
148 return self.dbpool.runInteraction(self._createNode, nodeIdentifier, 204 return self.dbpool.runInteraction(self._createNode, nodeIdentifier,
149 owner, config) 205 owner, config, pep, recipient)
150 206
151 207
152 def _createNode(self, cursor, nodeIdentifier, owner, config): 208 def _createNode(self, cursor, nodeIdentifier, owner, config, pep, recipient):
153 if config['pubsub#node_type'] != 'leaf': 209 if config['pubsub#node_type'] != 'leaf':
154 raise error.NoCollections() 210 raise error.NoCollections()
155 211
156 owner = owner.userhost() 212 owner = owner.userhost()
213
157 try: 214 try:
158 cursor.execute("""INSERT INTO nodes 215 cursor.execute("""INSERT INTO nodes
159 (node, node_type, persist_items, 216 (node, node_type, persist_items,
160 deliver_payloads, send_last_published_item, access_model, publish_model) 217 deliver_payloads, send_last_published_item, access_model, publish_model, pep)
161 VALUES 218 VALUES
162 (%s, 'leaf', %s, %s, %s, %s, %s)""", 219 (%s, 'leaf', %s, %s, %s, %s, %s, %s)""",
163 (nodeIdentifier, 220 (nodeIdentifier,
164 config['pubsub#persist_items'], 221 config['pubsub#persist_items'],
165 config['pubsub#deliver_payloads'], 222 config['pubsub#deliver_payloads'],
166 config['pubsub#send_last_published_item'], 223 config['pubsub#send_last_published_item'],
167 config[const.OPT_ACCESS_MODEL], 224 config[const.OPT_ACCESS_MODEL],
168 config[const.OPT_PUBLISH_MODEL], 225 config[const.OPT_PUBLISH_MODEL],
226 recipient.userhost() if pep else None
169 ) 227 )
170 ) 228 )
171 except cursor._pool.dbapi.IntegrityError: 229 except cursor._pool.dbapi.IntegrityError:
172 raise error.NodeExists() 230 raise error.NodeExists()
173 231
174 cursor.execute("""SELECT node_id FROM nodes WHERE node=%s""", (nodeIdentifier,)); 232 cursor.execute(*withPEP("""SELECT node_id FROM nodes WHERE node=%s""",
233 (nodeIdentifier,), pep, recipient));
175 node_id = cursor.fetchone()[0] 234 node_id = cursor.fetchone()[0]
176 235
177 cursor.execute("""SELECT 1 as bool from entities where jid=%s""", 236 cursor.execute("""SELECT 1 as bool from entities where jid=%s""",
178 (owner,)) 237 (owner,))
179 238
208 for group in allowed_groups: 267 for group in allowed_groups:
209 #TODO: check that group are actually in roster 268 #TODO: check that group are actually in roster
210 cursor.execute("""INSERT INTO node_groups_authorized (node_id, groupname) 269 cursor.execute("""INSERT INTO node_groups_authorized (node_id, groupname)
211 VALUES (%s,%s)""" , (node_id, group)) 270 VALUES (%s,%s)""" , (node_id, group))
212 271
213 272 def deleteNodeByDbId(self, db_id):
214 def deleteNode(self, nodeIdentifier): 273 """Delete a node using directly its database id"""
215 return self.dbpool.runInteraction(self._deleteNode, nodeIdentifier) 274 return self.dbpool.runInteraction(self._deleteNodeByDbId, db_id)
216 275
217 276 def _deleteNodeByDbId(self, cursor, db_id):
218 def _deleteNode(self, cursor, nodeIdentifier): 277 cursor.execute("""DELETE FROM nodes WHERE node_id=%s""",
219 cursor.execute("""DELETE FROM nodes WHERE node=%s""", 278 (db_id,))
220 (nodeIdentifier,))
221 279
222 if cursor.rowcount != 1: 280 if cursor.rowcount != 1:
223 raise error.NodeNotFound() 281 raise error.NodeNotFound()
224 282
225 def getNodeGroups(self, nodeIdentifier): 283 def deleteNode(self, nodeIdentifier, pep, recipient=None):
226 return self.dbpool.runInteraction(self._getNodeGroups, nodeIdentifier) 284 return self.dbpool.runInteraction(self._deleteNode, nodeIdentifier, pep, recipient)
227 285
228 def _getNodeGroups(self, cursor, nodeIdentifier): 286
229 cursor.execute("SELECT groupname FROM node_groups_authorized NATURAL JOIN nodes WHERE node=%s", 287 def _deleteNode(self, cursor, nodeIdentifier, pep, recipient):
230 (nodeIdentifier,)) 288 cursor.execute(*withPEP("""DELETE FROM nodes WHERE node=%s""",
289 (nodeIdentifier,), pep, recipient))
290
291 if cursor.rowcount != 1:
292 raise error.NodeNotFound()
293
294 def getNodeGroups(self, nodeIdentifier, pep, recipient=None):
295 return self.dbpool.runInteraction(self._getNodeGroups, nodeIdentifier, pep, recipient)
296
297 def _getNodeGroups(self, cursor, nodeIdentifier, pep, recipient):
298 cursor.execute(*withPEP("SELECT groupname FROM node_groups_authorized NATURAL JOIN nodes WHERE node=%s",
299 (nodeIdentifier,), pep, recipient))
231 rows = cursor.fetchall() 300 rows = cursor.fetchall()
232 301
233 return [row[0] for row in rows] 302 return [row[0] for row in rows]
234 303
235 def getAffiliations(self, entity): 304 def getAffiliations(self, entity, pep, recipient=None):
236 d = self.dbpool.runQuery("""SELECT node, affiliation FROM entities 305 d = self.dbpool.runQuery(*withPEP("""SELECT node, affiliation FROM entities
237 NATURAL JOIN affiliations 306 NATURAL JOIN affiliations
238 NATURAL JOIN nodes 307 NATURAL JOIN nodes
239 WHERE jid=%s""", 308 WHERE jid=%s""",
240 (entity.userhost(),)) 309 (entity.userhost(),), pep, recipient, 'nodes'))
241 d.addCallback(lambda results: [tuple(r) for r in results]) 310 d.addCallback(lambda results: [tuple(r) for r in results])
242 return d 311 return d
243 312
244 313
245 def getSubscriptions(self, entity): 314 def getSubscriptions(self, entity, pep, recipient=None):
246 def toSubscriptions(rows): 315 def toSubscriptions(rows):
247 subscriptions = [] 316 subscriptions = []
248 for row in rows: 317 for row in rows:
249 subscriber = jid.internJID('%s/%s' % (row[1], 318 subscriber = jid.internJID('%s/%s' % (row[1],
250 row[2])) 319 row[2]))
254 323
255 d = self.dbpool.runQuery("""SELECT node, jid, resource, state 324 d = self.dbpool.runQuery("""SELECT node, jid, resource, state
256 FROM entities 325 FROM entities
257 NATURAL JOIN subscriptions 326 NATURAL JOIN subscriptions
258 NATURAL JOIN nodes 327 NATURAL JOIN nodes
259 WHERE jid=%s""", 328 WHERE jid=%s AND nodes.pep=%s""",
260 (entity.userhost(),)) 329 (entity.userhost(), recipient.userhost() if pep else None))
261 d.addCallback(toSubscriptions) 330 d.addCallback(toSubscriptions)
262 return d 331 return d
263 332
264 333
265 def getDefaultConfiguration(self, nodeType): 334 def getDefaultConfiguration(self, nodeType):
269 338
270 class Node: 339 class Node:
271 340
272 implements(iidavoll.INode) 341 implements(iidavoll.INode)
273 342
274 def __init__(self, nodeIdentifier, config): 343 def __init__(self, nodeDbId, nodeIdentifier, config):
344 self.nodeDbId = nodeDbId
275 self.nodeIdentifier = nodeIdentifier 345 self.nodeIdentifier = nodeIdentifier
276 self._config = config 346 self._config = config
277 self.owner = None; 347 self.owner = None;
278 348
279 349
280 def _checkNodeExists(self, cursor): 350 def _checkNodeExists(self, cursor):
281 cursor.execute("""SELECT node_id FROM nodes WHERE node=%s""", 351 cursor.execute("""SELECT 1 as exist FROM nodes WHERE node_id=%s""",
282 (self.nodeIdentifier,)) 352 (self.nodeDbId,))
283 if not cursor.fetchone(): 353 if not cursor.fetchone():
284 raise error.NodeNotFound() 354 raise error.NodeNotFound()
285 355
286 356
287 def getType(self): 357 def getType(self):
288 return self.nodeType 358 return self.nodeType
289 359
290 def getNodeOwner(self): 360 def getNodeOwner(self):
291 if self.owner: 361 if self.owner:
292 return defer.succeed(self.owner) 362 return defer.succeed(self.owner)
293 d = self.dbpool.runQuery("""SELECT jid FROM nodes NATURAL JOIN affiliations NATURAL JOIN entities WHERE node=%s""", (self.nodeIdentifier,)) 363 d = self.dbpool.runQuery("""SELECT jid FROM nodes NATURAL JOIN affiliations NATURAL JOIN entities WHERE node_id=%s""", (self.nodeDbId,))
294 d.addCallback(lambda result: jid.JID(result[0][0])) 364 d.addCallback(lambda result: jid.JID(result[0][0]))
295 return d 365 return d
296 366
297 367
298 def getConfiguration(self): 368 def getConfiguration(self):
313 383
314 def _setConfiguration(self, cursor, config): 384 def _setConfiguration(self, cursor, config):
315 self._checkNodeExists(cursor) 385 self._checkNodeExists(cursor)
316 cursor.execute("""UPDATE nodes SET persist_items=%s, 386 cursor.execute("""UPDATE nodes SET persist_items=%s,
317 deliver_payloads=%s, 387 deliver_payloads=%s,
318 send_last_published_item=%s 388 send_last_published_item=%s,
319 WHERE node=%s""", 389 access_model=%s,
320 (config["pubsub#persist_items"], 390 publish_model=%s
321 config["pubsub#deliver_payloads"], 391 WHERE node_id=%s""",
322 config["pubsub#send_last_published_item"], 392 (config[const.OPT_PERSIST_ITEMS],
323 self.nodeIdentifier)) 393 config[const.OPT_DELIVER_PAYLOADS],
394 config[const.OPT_SEND_LAST_PUBLISHED_ITEM],
395 config[const.OPT_ACCESS_MODEL],
396 config[const.OPT_PUBLISH_MODEL],
397 self.nodeDbId))
324 398
325 399
326 def _setCachedConfiguration(self, void, config): 400 def _setCachedConfiguration(self, void, config):
327 self._config = config 401 self._config = config
328 402
340 def _getAffiliation(self, cursor, entity): 414 def _getAffiliation(self, cursor, entity):
341 self._checkNodeExists(cursor) 415 self._checkNodeExists(cursor)
342 cursor.execute("""SELECT affiliation FROM affiliations 416 cursor.execute("""SELECT affiliation FROM affiliations
343 NATURAL JOIN nodes 417 NATURAL JOIN nodes
344 NATURAL JOIN entities 418 NATURAL JOIN entities
345 WHERE node=%s AND jid=%s""", 419 WHERE node_id=%s AND jid=%s""",
346 (self.nodeIdentifier, 420 (self.nodeDbId,
347 entity.userhost())) 421 entity.userhost()))
348 422
349 try: 423 try:
350 return cursor.fetchone()[0] 424 return cursor.fetchone()[0]
351 except TypeError: 425 except TypeError:
352 return None 426 return None
353 427
428
354 def getAccessModel(self): 429 def getAccessModel(self):
355 return self.dbpool.runInteraction(self._getAccessModel) 430 return self._config[const.OPT_ACCESS_MODEL]
356
357 def _getAccessModel(self, cursor, entity):
358 self._checkNodeExists(cursor)
359 cursor.execute("""SELECT access_model FROM nodes
360 WHERE node=%s""",
361 (self.nodeIdentifier,))
362
363 try:
364 return cursor.fetchone()[0]
365 except TypeError:
366 return None
367 431
368 432
369 def getSubscription(self, subscriber): 433 def getSubscription(self, subscriber):
370 return self.dbpool.runInteraction(self._getSubscription, subscriber) 434 return self.dbpool.runInteraction(self._getSubscription, subscriber)
371 435
377 resource = subscriber.resource or '' 441 resource = subscriber.resource or ''
378 442
379 cursor.execute("""SELECT state FROM subscriptions 443 cursor.execute("""SELECT state FROM subscriptions
380 NATURAL JOIN nodes 444 NATURAL JOIN nodes
381 NATURAL JOIN entities 445 NATURAL JOIN entities
382 WHERE node=%s AND jid=%s AND resource=%s""", 446 WHERE node_id=%s AND jid=%s AND resource=%s""",
383 (self.nodeIdentifier, 447 (self.nodeDbId,
384 userhost, 448 userhost,
385 resource)) 449 resource))
386 450
387 row = cursor.fetchone() 451 row = cursor.fetchone()
388 if not row: 452 if not row:
396 460
397 461
398 def _getSubscriptions(self, cursor, state): 462 def _getSubscriptions(self, cursor, state):
399 self._checkNodeExists(cursor) 463 self._checkNodeExists(cursor)
400 464
401 query = """SELECT jid, resource, state, 465 query = """SELECT node, jid, resource, state,
402 subscription_type, subscription_depth 466 subscription_type, subscription_depth
403 FROM subscriptions 467 FROM subscriptions
404 NATURAL JOIN nodes 468 NATURAL JOIN nodes
405 NATURAL JOIN entities 469 NATURAL JOIN entities
406 WHERE node=%s""" 470 WHERE node_id=%s"""
407 values = [self.nodeIdentifier] 471 values = [self.nodeDbId]
408 472
409 if state: 473 if state:
410 query += " AND state=%s" 474 query += " AND state=%s"
411 values.append(state) 475 values.append(state)
412 476
413 cursor.execute(query, values) 477 cursor.execute(query, values)
414 rows = cursor.fetchall() 478 rows = cursor.fetchall()
415 479
416 subscriptions = [] 480 subscriptions = []
417 for row in rows: 481 for row in rows:
418 subscriber = jid.JID(u'%s/%s' % (row[0], row[1])) 482 subscriber = jid.JID(u'%s/%s' % (row[1], row[2]))
419 483
420 options = {} 484 options = {}
421 if row[3]:
422 options['pubsub#subscription_type'] = row[3];
423 if row[4]: 485 if row[4]:
424 options['pubsub#subscription_depth'] = row[4]; 486 options['pubsub#subscription_type'] = row[4];
425 487 if row[5]:
426 subscriptions.append(Subscription(self.nodeIdentifier, subscriber, 488 options['pubsub#subscription_depth'] = row[5];
427 row[2], options)) 489
490 subscriptions.append(Subscription(row[0], subscriber,
491 row[3], options))
428 492
429 return subscriptions 493 return subscriptions
430 494
431 495
432 def addSubscription(self, subscriber, state, config): 496 def addSubscription(self, subscriber, state, config):
451 515
452 try: 516 try:
453 cursor.execute("""INSERT INTO subscriptions 517 cursor.execute("""INSERT INTO subscriptions
454 (node_id, entity_id, resource, state, 518 (node_id, entity_id, resource, state,
455 subscription_type, subscription_depth) 519 subscription_type, subscription_depth)
456 SELECT node_id, entity_id, %s, %s, %s, %s FROM 520 SELECT %s, entity_id, %s, %s, %s, %s FROM
457 (SELECT node_id FROM nodes
458 WHERE node=%s) as n
459 CROSS JOIN
460 (SELECT entity_id FROM entities 521 (SELECT entity_id FROM entities
461 WHERE jid=%s) as e""", 522 WHERE jid=%s) AS ent_id""",
462 (resource, 523 (self.nodeDbId,
524 resource,
463 state, 525 state,
464 subscription_type, 526 subscription_type,
465 subscription_depth, 527 subscription_depth,
466 self.nodeIdentifier,
467 userhost)) 528 userhost))
468 except cursor._pool.dbapi.IntegrityError: 529 except cursor._pool.dbapi.IntegrityError:
469 raise error.SubscriptionExists() 530 raise error.SubscriptionExists()
470 531
471 532
479 540
480 userhost = subscriber.userhost() 541 userhost = subscriber.userhost()
481 resource = subscriber.resource or '' 542 resource = subscriber.resource or ''
482 543
483 cursor.execute("""DELETE FROM subscriptions WHERE 544 cursor.execute("""DELETE FROM subscriptions WHERE
484 node_id=(SELECT node_id FROM nodes 545 node_id=%s AND
485 WHERE node=%s) AND
486 entity_id=(SELECT entity_id FROM entities 546 entity_id=(SELECT entity_id FROM entities
487 WHERE jid=%s) AND 547 WHERE jid=%s) AND
488 resource=%s""", 548 resource=%s""",
489 (self.nodeIdentifier, 549 (self.nodeDbId,
490 userhost, 550 userhost,
491 resource)) 551 resource))
492 if cursor.rowcount != 1: 552 if cursor.rowcount != 1:
493 raise error.NotSubscribed() 553 raise error.NotSubscribed()
494 554
504 564
505 cursor.execute("""SELECT 1 as bool FROM entities 565 cursor.execute("""SELECT 1 as bool FROM entities
506 NATURAL JOIN subscriptions 566 NATURAL JOIN subscriptions
507 NATURAL JOIN nodes 567 NATURAL JOIN nodes
508 WHERE entities.jid=%s 568 WHERE entities.jid=%s
509 AND node=%s AND state='subscribed'""", 569 AND node_id=%s AND state='subscribed'""",
510 (entity.userhost(), 570 (entity.userhost(),
511 self.nodeIdentifier)) 571 self.nodeDbId))
512 572
513 return cursor.fetchone() is not None 573 return cursor.fetchone() is not None
514 574
515 575
516 def getAffiliations(self): 576 def getAffiliations(self):
521 self._checkNodeExists(cursor) 581 self._checkNodeExists(cursor)
522 582
523 cursor.execute("""SELECT jid, affiliation FROM nodes 583 cursor.execute("""SELECT jid, affiliation FROM nodes
524 NATURAL JOIN affiliations 584 NATURAL JOIN affiliations
525 NATURAL JOIN entities 585 NATURAL JOIN entities
526 WHERE node=%s""", 586 WHERE node_id=%s""",
527 (self.nodeIdentifier,)) 587 (self.nodeDbId,))
528 result = cursor.fetchall() 588 result = cursor.fetchall()
529 589
530 return [(jid.internJID(r[0]), r[1]) for r in result] 590 return [(jid.internJID(r[0]), r[1]) for r in result]
531 591
532 592
539 599
540 def storeItems(self, item_data, publisher): 600 def storeItems(self, item_data, publisher):
541 return self.dbpool.runInteraction(self._storeItems, item_data, publisher) 601 return self.dbpool.runInteraction(self._storeItems, item_data, publisher)
542 602
543 603
544 def _storeItems(self, cursor, item_data, publisher): 604 def _storeItems(self, cursor, items_data, publisher):
545 self._checkNodeExists(cursor) 605 self._checkNodeExists(cursor)
546 for item_datum in item_data: 606 for item_data in items_data:
547 self._storeItem(cursor, item_datum, publisher) 607 self._storeItem(cursor, item_data, publisher)
548 608
549 609
550 def _storeItem(self, cursor, item_datum, publisher): 610 def _storeItem(self, cursor, item_data, publisher):
551 access_model, item_config, item = item_datum 611 item, access_model, item_config = item_data
552 data = item.toXml() 612 data = item.toXml()
553 613
554 cursor.execute("""UPDATE items SET date=now(), publisher=%s, data=%s 614 cursor.execute("""UPDATE items SET date=now(), publisher=%s, data=%s
555 FROM nodes 615 FROM nodes
556 WHERE nodes.node_id = items.node_id AND 616 WHERE nodes.node_id = items.node_id AND
557 nodes.node = %s and items.item=%s""", 617 nodes.node_id = %s and items.item=%s""",
558 (publisher.full(), 618 (publisher.full(),
559 data, 619 data,
560 self.nodeIdentifier, 620 self.nodeDbId,
561 item["id"])) 621 item["id"]))
562 if cursor.rowcount == 1: 622 if cursor.rowcount == 1:
563 return 623 return
564 624
565 cursor.execute("""INSERT INTO items (node_id, item, publisher, data, access_model) 625 cursor.execute("""INSERT INTO items (node_id, item, publisher, data, access_model)
566 SELECT node_id, %s, %s, %s, %s FROM nodes 626 SELECT %s, %s, %s, %s, %s FROM nodes
567 WHERE node=%s 627 WHERE node_id=%s
568 RETURNING item_id""", 628 RETURNING item_id""",
569 (item["id"], 629 (self.nodeDbId,
630 item["id"],
570 publisher.full(), 631 publisher.full(),
571 data, 632 data,
572 access_model, 633 access_model,
573 self.nodeIdentifier)) 634 self.nodeDbId))
574 635
575 if access_model == const.VAL_AMODEL_ROSTER: 636 if access_model == const.VAL_AMODEL_ROSTER:
576 item_id = cursor.fetchone()[0]; 637 item_id = cursor.fetchone()[0];
577 if const.OPT_ROSTER_GROUPS_ALLOWED in item_config: 638 if const.OPT_ROSTER_GROUPS_ALLOWED in item_config:
578 item_config.fields[const.OPT_ROSTER_GROUPS_ALLOWED].fieldType='list-multi' #XXX: needed to force list if there is only one value 639 item_config.fields[const.OPT_ROSTER_GROUPS_ALLOWED].fieldType='list-multi' #XXX: needed to force list if there is only one value
594 655
595 deleted = [] 656 deleted = []
596 657
597 for itemIdentifier in itemIdentifiers: 658 for itemIdentifier in itemIdentifiers:
598 cursor.execute("""DELETE FROM items WHERE 659 cursor.execute("""DELETE FROM items WHERE
599 node_id=(SELECT node_id FROM nodes 660 node_id=%s AND
600 WHERE node=%s) AND
601 item=%s""", 661 item=%s""",
602 (self.nodeIdentifier, 662 (self.nodeDbId,
603 itemIdentifier)) 663 itemIdentifier))
604 664
605 if cursor.rowcount: 665 if cursor.rowcount:
606 deleted.append(itemIdentifier) 666 deleted.append(itemIdentifier)
607 667
621 if ext_data is None: 681 if ext_data is None:
622 ext_data = {} 682 ext_data = {}
623 return self.dbpool.runInteraction(self._getItems, authorized_groups, unrestricted, maxItems, ext_data) 683 return self.dbpool.runInteraction(self._getItems, authorized_groups, unrestricted, maxItems, ext_data)
624 684
625 def _getItems(self, cursor, authorized_groups, unrestricted, maxItems, ext_data): 685 def _getItems(self, cursor, authorized_groups, unrestricted, maxItems, ext_data):
686 # FIXME: simplify the query construction
626 self._checkNodeExists(cursor) 687 self._checkNodeExists(cursor)
627 688
628 if unrestricted: 689 if unrestricted:
629 query = ["SELECT data,items.access_model,item_id"] 690 query = ["SELECT data,items.access_model,item_id"]
630 source = """FROM nodes 691 source = """FROM nodes
631 INNER JOIN items USING (node_id) 692 INNER JOIN items USING (node_id)
632 WHERE node=%s""" 693 WHERE node_id=%s"""
633 args = [self.nodeIdentifier] 694 args = [self.nodeDbId]
634 else: 695 else:
635 query = ["SELECT data"] 696 query = ["SELECT data"]
636 groups = " or (items.access_model='roster' and groupname in %s)" if authorized_groups else "" 697 groups = " or (items.access_model='roster' and groupname in %s)" if authorized_groups else ""
637 source = """FROM nodes 698 source = """FROM nodes
638 INNER JOIN items USING (node_id) 699 INNER JOIN items USING (node_id)
639 LEFT JOIN item_groups_authorized USING (item_id) 700 LEFT JOIN item_groups_authorized USING (item_id)
640 WHERE node=%s AND 701 WHERE node_id=%s AND
641 (items.access_model='open'""" + groups + ")" 702 (items.access_model='open'""" + groups + ")"
642 703
643 args = [self.nodeIdentifier] 704 args = [self.nodeDbId]
644 if authorized_groups: 705 if authorized_groups:
645 args.append(authorized_groups) 706 args.append(authorized_groups)
646 707
647 if 'filters' in ext_data: # MAM filters 708 if 'filters' in ext_data: # MAM filters
648 for filter_ in ext_data['filters']: 709 for filter_ in ext_data['filters']:
649 if filter_.var == 'start': 710 if filter_.var == 'start':
650 source += " AND date>='{date}'".format(date=filter_.value) 711 source += " AND date>=%s"
712 args.append(filter_.value)
651 if filter_.var == 'end': 713 if filter_.var == 'end':
652 source += " AND date<='{date}'".format(date=filter_.value) 714 source += " AND date<=%s"
715 args.append(filter_.value)
653 if filter_.var == 'with': 716 if filter_.var == 'with':
654 jid_s = filter_.value 717 jid_s = filter_.value
655 if '/' in jid_s: 718 if '/' in jid_s:
656 source += " AND publisher='{pub}'".format(pub=filter_.value) 719 source += " AND publisher=%s"
657 else: # assume the publisher field in DB is always a full JID 720 args.append(filter_.value)
658 # XXX: need to escape the % with itself to avoid formatting error 721 else:
659 source += " AND publisher LIKE '{pub}/%%'".format(pub=filter_.value) 722 source += " AND publisher LIKE %s"
723 args.append(u"{}%".format(filter_.value))
660 724
661 query.append(source) 725 query.append(source)
662 order = "DESC" 726 order = "DESC"
663 727
664 if 'rsm' in ext_data: 728 if 'rsm' in ext_data:
665 rsm = ext_data['rsm'] 729 rsm = ext_data['rsm']
666 maxItems = rsm.max 730 maxItems = rsm.max
667 if rsm.index is not None: 731 if rsm.index is not None:
668 query.append("AND date<=(SELECT date " + source + " ORDER BY date DESC LIMIT 1 OFFSET %s)") 732 query.append("AND date<=(SELECT date " + source + " ORDER BY date DESC LIMIT 1 OFFSET %s)")
669 args.append(self.nodeIdentifier) 733 # FIXME: change the request so source is not used 2 times
734 # there is already a placeholder in source with node_id=%s, so we need to add self.nodeDbId in args
735 args.append(self.nodeDbId)
670 if authorized_groups: 736 if authorized_groups:
671 args.append(authorized_groups) 737 args.append(authorized_groups)
672 args.append(rsm.index) 738 args.append(rsm.index)
673 elif rsm.before is not None: 739 elif rsm.before is not None:
674 order = "ASC" 740 order = "ASC"
692 ret = [] 758 ret = []
693 for data in result: 759 for data in result:
694 item = generic.stripNamespace(parseXml(data[0])) 760 item = generic.stripNamespace(parseXml(data[0]))
695 access_model = data[1] 761 access_model = data[1]
696 item_id = data[2] 762 item_id = data[2]
697 if access_model == 'roster': #TODO: jid access_model 763 access_list = {}
764 if access_model == const.VAL_AMODEL_ROSTER: #TODO: jid access_model
698 cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,)) 765 cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,))
699 access_list = [r[0] for r in cursor.fetchall()] 766 access_list[const.OPT_ROSTER_GROUPS_ALLOWED] = [r[0] for r in cursor.fetchall()]
700 else:
701 access_list = None
702 767
703 ret.append((item, access_model, access_list)) 768 ret.append((item, access_model, access_list))
704 return ret 769 return ret
705 items = [generic.stripNamespace(parseXml(r[0])) for r in result] 770 items = [generic.stripNamespace(parseXml(r[0])) for r in result]
706 return items 771 return items
718 self._checkNodeExists(cursor) 783 self._checkNodeExists(cursor)
719 784
720 if unrestricted: 785 if unrestricted:
721 query = ["""SELECT count(item_id) FROM nodes 786 query = ["""SELECT count(item_id) FROM nodes
722 INNER JOIN items USING (node_id) 787 INNER JOIN items USING (node_id)
723 WHERE node=%s"""] 788 WHERE node_id=%s"""]
724 args = [self.nodeIdentifier] 789 args = [self.nodeDbId]
725 else: 790 else:
726 query = ["""SELECT count(item_id) FROM nodes 791 query = ["""SELECT count(item_id) FROM nodes
727 INNER JOIN items USING (node_id) 792 INNER JOIN items USING (node_id)
728 LEFT JOIN item_groups_authorized USING (item_id) 793 LEFT JOIN item_groups_authorized USING (item_id)
729 WHERE node=%s AND 794 WHERE node_id=%s AND
730 (items.access_model='open' """ + 795 (items.access_model='open' """ +
731 ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') + 796 ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') +
732 ")"] 797 ")"]
733 798
734 args = [self.nodeIdentifier] 799 args = [self.nodeDbId]
735 if authorized_groups: 800 if authorized_groups:
736 args.append(authorized_groups) 801 args.append(authorized_groups)
737 802
738 cursor.execute(' '.join(query), args) 803 cursor.execute(' '.join(query), args)
739 return cursor.fetchall()[0][0] 804 return cursor.fetchall()[0][0]
753 818
754 if unrestricted: 819 if unrestricted:
755 query = ["""SELECT row_number FROM ( 820 query = ["""SELECT row_number FROM (
756 SELECT row_number() OVER (ORDER BY date DESC), item 821 SELECT row_number() OVER (ORDER BY date DESC), item
757 FROM nodes INNER JOIN items USING (node_id) 822 FROM nodes INNER JOIN items USING (node_id)
758 WHERE node=%s 823 WHERE node_id=%s
759 ) as x 824 ) as x
760 WHERE item=%s LIMIT 1"""] 825 WHERE item=%s LIMIT 1"""]
761 args = [self.nodeIdentifier] 826 args = [self.nodeDbId]
762 else: 827 else:
763 query = ["""SELECT row_number FROM ( 828 query = ["""SELECT row_number FROM (
764 SELECT row_number() OVER (ORDER BY date DESC), item 829 SELECT row_number() OVER (ORDER BY date DESC), item
765 FROM nodes INNER JOIN items USING (node_id) 830 FROM nodes INNER JOIN items USING (node_id)
766 LEFT JOIN item_groups_authorized USING (item_id) 831 LEFT JOIN item_groups_authorized USING (item_id)
767 WHERE node=%s AND 832 WHERE node_id=%s AND
768 (items.access_model='open' """ + 833 (items.access_model='open' """ +
769 ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') + 834 ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') +
770 """)) as x 835 """)) as x
771 WHERE item=%s LIMIT 1"""] 836 WHERE item=%s LIMIT 1"""]
772 837
773 args = [self.nodeIdentifier] 838 args = [self.nodeDbId]
774 if authorized_groups: 839 if authorized_groups:
775 args.append(authorized_groups) 840 args.append(authorized_groups)
776 841
777 args.append(item) 842 args.append(item)
778 cursor.execute(' '.join(query), args) 843 cursor.execute(' '.join(query), args)
def getItemsById(self, authorized_groups, unrestricted, itemIdentifiers):
    """Retrieve the items of this node matching the given identifiers

    The lookup itself is done by self._getItemsById, which is run as a
    database interaction on self.dbpool.

    @param authorized_groups: only items accessible to these groups are wanted
    @param unrestricted: if True, permissions are not checked
    @param itemIdentifiers: list of identifiers of the wanted items
    @return: list of (item, access_model, access_list) if unrestricted is True,
        else list of items
        access_list is handled as a dictionary using the same keys as item_config
    """
    interaction_args = (authorized_groups, unrestricted, itemIdentifiers)
    return self.dbpool.runInteraction(self._getItemsById, *interaction_args)
790 856
791 857
792 def _getItemsById(self, cursor, authorized_groups, unrestricted, itemIdentifiers): 858 def _getItemsById(self, cursor, authorized_groups, unrestricted, itemIdentifiers):
794 ret = [] 860 ret = []
795 if unrestricted: #we get everything without checking permissions 861 if unrestricted: #we get everything without checking permissions
796 for itemIdentifier in itemIdentifiers: 862 for itemIdentifier in itemIdentifiers:
797 cursor.execute("""SELECT data,items.access_model,item_id FROM nodes 863 cursor.execute("""SELECT data,items.access_model,item_id FROM nodes
798 INNER JOIN items USING (node_id) 864 INNER JOIN items USING (node_id)
799 WHERE node=%s AND item=%s""", 865 WHERE node_id=%s AND item=%s""",
800 (self.nodeIdentifier, 866 (self.nodeDbId,
801 itemIdentifier)) 867 itemIdentifier))
802 result = cursor.fetchone() 868 result = cursor.fetchone()
803 if result: 869 if not result:
804 for data in result: 870 raise error.ItemNotFound()
805 item = generic.stripNamespace(parseXml(data[0])) 871
806 access_model = data[1] 872 item = generic.stripNamespace(parseXml(result[0]))
807 item_id = data[2] 873 access_model = result[1]
808 if access_model == 'roster': #TODO: jid access_model 874 item_id = result[2]
809 cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,)) 875 access_list = {}
810 access_list = [r[0] for r in cursor.fetchall()] 876 if access_model == const.VAL_AMODEL_ROSTER: #TODO: jid access_model
811 else: 877 cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,))
812 access_list = None 878 access_list[const.OPT_ROSTER_GROUPS_ALLOWED] = [r[0] for r in cursor.fetchall()]
813 879
814 ret.append((item, access_model, access_list)) 880 ret.append((item, access_model, access_list))
815 else: #we check permission before returning items 881 else: #we check permission before returning items
816 for itemIdentifier in itemIdentifiers: 882 for itemIdentifier in itemIdentifiers:
817 args = [self.nodeIdentifier, itemIdentifier] 883 args = [self.nodeDbId, itemIdentifier]
818 if authorized_groups: 884 if authorized_groups:
819 args.append(authorized_groups) 885 args.append(authorized_groups)
820 cursor.execute("""SELECT data FROM nodes 886 cursor.execute("""SELECT data FROM nodes
821 INNER JOIN items USING (node_id) 887 INNER JOIN items USING (node_id)
822 LEFT JOIN item_groups_authorized USING (item_id) 888 LEFT JOIN item_groups_authorized USING (item_id)
823 WHERE node=%s AND item=%s AND 889 WHERE node_id=%s AND item=%s AND
824 (items.access_model='open' """ + 890 (items.access_model='open' """ +
825 ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') + ")", 891 ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') + ")",
826 args) 892 args)
827 893
828 result = cursor.fetchone() 894 result = cursor.fetchone()
829 if result: 895 if result:
830 ret.append(parseXml(result[0])) 896 ret.append(generic.stripNamespace(parseXml(result[0])))
831 897
832 return ret 898 return ret
899
900
def getItemsPublishers(self, itemIdentifiers):
    """Look up the publisher of each of the given items

    The lookup itself is done by self._getItemsPublishers, which is run as a
    database interaction on self.dbpool.

    @param itemIdentifiers: identifiers of the items to look up
    @return (dict): map of itemIdentifiers to publisher
    """
    lookup = self._getItemsPublishers
    return self.dbpool.runInteraction(lookup, itemIdentifiers)
907
908
def _getItemsPublishers(self, cursor, itemIdentifiers):
    """Database interaction fetching the publisher of each given item

    @param cursor: database cursor used to run the queries
    @param itemIdentifiers: identifiers of the items to look up
    @return (dict): map of item identifier to publisher (jid.JID instance)
    @raise ValueError: one of the identifiers matches no item of this node
    """
    self._checkNodeExists(cursor)
    ret = {}
    for itemIdentifier in itemIdentifiers:
        # Restrict the lookup to this node, like the other item queries of
        # this class do: without the node_id filter, an item using the same
        # identifier in another node could be matched instead.
        cursor.execute("""SELECT publisher FROM items
                          WHERE node_id=%s AND item=%s""",
                       (self.nodeDbId, itemIdentifier))
        result = cursor.fetchone()
        if not result:
            # We have an internal error, that's why we use ValueError
            # and not error.ItemNotFound(): itemIdentifier must exist
            raise ValueError(
                "no item found for identifier {!r}".format(itemIdentifier))
        ret[itemIdentifier] = jid.JID(result[0])
    return ret
923
833 924
def purge(self):
    """Run self._purge as a database interaction on self.dbpool

    @return: whatever self.dbpool.runInteraction returns for that interaction
    """
    interaction = self._purge
    return self.dbpool.runInteraction(interaction)
836 927
837 928
def _purge(self, cursor):
    """Database interaction deleting every item of this node

    The node existence is checked first with self._checkNodeExists, then all
    the rows of the items table attached to self.nodeDbId are deleted.

    @param cursor: database cursor used to run the query
    """
    self._checkNodeExists(cursor)

    delete_query = """DELETE FROM items WHERE
                      node_id=%s"""
    cursor.execute(delete_query, (self.nodeDbId,))
844 935
845 936 # FIXME: to be checked
846 def filterItemsWithPublisher(self, itemIdentifiers, requestor): 937 # def filterItemsWithPublisher(self, itemIdentifiers, recipient):
847 return self.dbpool.runInteraction(self._filterItemsWithPublisher, itemIdentifiers, requestor) 938 # return self.dbpool.runInteraction(self._filterItemsWithPublisher, itemIdentifiers, recipient)
848 939
849 def _filterItemsWithPublisher(self, cursor, itemIdentifiers, requestor): 940 # def _filterItemsWithPublisher(self, cursor, itemIdentifiers, requestor):
850 self._checkNodeExists(cursor) 941 # self._checkNodeExists(cursor)
851 ret = [] 942 # ret = []
852 for itemIdentifier in itemIdentifiers: 943 # for itemIdentifier in itemIdentifiers:
853 args = ["%s/%%" % requestor.userhost(), itemIdentifier] 944 # args = ["%s/%%" % requestor.userhost(), itemIdentifier]
854 cursor.execute("""SELECT item FROM items WHERE publisher LIKE %s AND item=%s""", args) 945 # cursor.execute("""SELECT item FROM items WHERE publisher LIKE %s AND item=%s""", args)
855 result = cursor.fetchone() 946 # result = cursor.fetchone()
856 if result: 947 # if result:
857 ret.append(result[0]) 948 # ret.append(result[0])
858 return ret 949 # return ret
859 950
860 class CollectionNode(Node): 951 class CollectionNode(Node):
861 952
862 nodeType = 'collection' 953 nodeType = 'collection'
863 954