Mercurial > libervia-pubsub
comparison src/pgsql_storage.py @ 369:dabee42494ac
config file + cleaning:
- SàT Pubsub can now be configured using the same config file as SàT itself (i.e. sat.conf or .sat.conf), in the same locations (/etc, local dir, xdg dir).
Its options must be in the "pubsub" section
- options on command line override config options
- removed tap and http files which are not used anymore
- changed directory structure to put source in src, to be coherent with SàT and Libervia
- changed options name, db* become db_*, secret become xmpp_pwd
- an exception is raised if jid or xmpp_pwd is are not configured
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Mar 2018 12:59:38 +0100 |
parents | sat_pubsub/pgsql_storage.py@618a92080812 |
children | 40e5edd7ea11 |
comparison
equal
deleted
inserted
replaced
368:618a92080812 | 369:dabee42494ac |
---|---|
1 #!/usr/bin/python | |
2 #-*- coding: utf-8 -*- | |
3 | |
4 # Copyright (c) 2012-2018 Jérôme Poisson | |
5 # Copyright (c) 2013-2016 Adrien Cossa | |
6 # Copyright (c) 2003-2011 Ralph Meijer | |
7 | |
8 | |
9 # This program is free software: you can redistribute it and/or modify | |
10 # it under the terms of the GNU Affero General Public License as published by | |
11 # the Free Software Foundation, either version 3 of the License, or | |
12 # (at your option) any later version. | |
13 | |
14 # This program is distributed in the hope that it will be useful, | |
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
17 # GNU Affero General Public License for more details. | |
18 | |
19 # You should have received a copy of the GNU Affero General Public License | |
20 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
21 # -- | |
22 | |
23 # This program is based on Idavoll (http://idavoll.ik.nu/), | |
24 # originaly written by Ralph Meijer (http://ralphm.net/blog/) | |
25 # It is sublicensed under AGPL v3 (or any later version) as allowed by the original | |
26 # license. | |
27 | |
28 # -- | |
29 | |
30 # Here is a copy of the original license: | |
31 | |
32 # Copyright (c) 2003-2011 Ralph Meijer | |
33 | |
34 # Permission is hereby granted, free of charge, to any person obtaining | |
35 # a copy of this software and associated documentation files (the | |
36 # "Software"), to deal in the Software without restriction, including | |
37 # without limitation the rights to use, copy, modify, merge, publish, | |
38 # distribute, sublicense, and/or sell copies of the Software, and to | |
39 # permit persons to whom the Software is furnished to do so, subject to | |
40 # the following conditions: | |
41 | |
42 # The above copyright notice and this permission notice shall be | |
43 # included in all copies or substantial portions of the Software. | |
44 | |
45 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, | |
46 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF | |
47 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND | |
48 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE | |
49 # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION | |
50 # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION | |
51 # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. | |
52 | |
53 | |
54 import copy, logging | |
55 | |
56 from zope.interface import implements | |
57 | |
58 from twisted.internet import reactor | |
59 from twisted.internet import defer | |
60 from twisted.words.protocols.jabber import jid | |
61 from twisted.python import log | |
62 | |
63 from wokkel import generic | |
64 from wokkel.pubsub import Subscription | |
65 | |
66 from sat_pubsub import error | |
67 from sat_pubsub import iidavoll | |
68 from sat_pubsub import const | |
69 from sat_pubsub import container | |
70 from sat_pubsub import exceptions | |
71 import uuid | |
72 import psycopg2 | |
73 import psycopg2.extensions | |
74 # we wants psycopg2 to return us unicode, not str | |
75 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE) | |
76 psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY) | |
77 | |
78 # parseXml manage str, but we get unicode | |
79 parseXml = lambda unicode_data: generic.parseXml(unicode_data.encode('utf-8')) | |
80 ITEMS_SEQ_NAME = u'node_{node_id}_seq' | |
81 PEP_COL_NAME = 'pep' | |
82 CURRENT_VERSION = '4' | |
83 # retrieve the maximum integer item id + 1 | |
84 NEXT_ITEM_ID_QUERY = r"SELECT COALESCE(max(item::integer)+1,1) as val from items where node_id={node_id} and item ~ E'^\\d+$'" | |
85 | |
86 | |
87 def withPEP(query, values, pep, recipient): | |
88 """Helper method to facilitate PEP management | |
89 | |
90 @param query: SQL query basis | |
91 @param values: current values to replace in query | |
92 @param pep(bool): True if we are in PEP mode | |
93 @param recipient(jid.JID): jid of the recipient | |
94 @return: query + PEP AND check, | |
95 recipient's bare jid is added to value if needed | |
96 """ | |
97 if pep: | |
98 pep_check="AND {}=%s".format(PEP_COL_NAME) | |
99 values=list(values) + [recipient.userhost()] | |
100 else: | |
101 pep_check="AND {} IS NULL".format(PEP_COL_NAME) | |
102 return "{} {}".format(query, pep_check), values | |
103 | |
104 | |
105 class Storage: | |
106 | |
107 implements(iidavoll.IStorage) | |
108 | |
109 defaultConfig = { | |
110 'leaf': { | |
111 const.OPT_PERSIST_ITEMS: True, | |
112 const.OPT_DELIVER_PAYLOADS: True, | |
113 const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub', | |
114 const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT, | |
115 const.OPT_PUBLISH_MODEL: const.VAL_PMODEL_DEFAULT, | |
116 const.OPT_SERIAL_IDS: False, | |
117 }, | |
118 'collection': { | |
119 const.OPT_DELIVER_PAYLOADS: True, | |
120 const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub', | |
121 const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT, | |
122 const.OPT_PUBLISH_MODEL: const.VAL_PMODEL_DEFAULT, | |
123 } | |
124 } | |
125 | |
126 def __init__(self, dbpool): | |
127 self.dbpool = dbpool | |
128 d = self.dbpool.runQuery("SELECT value FROM metadata WHERE key='version'") | |
129 d.addCallbacks(self._checkVersion, self._versionEb) | |
130 | |
131 def _checkVersion(self, row): | |
132 version = row[0].value | |
133 if version != CURRENT_VERSION: | |
134 logging.error("Bad database schema version ({current}), please upgrade to {needed}".format( | |
135 current=version, needed=CURRENT_VERSION)) | |
136 reactor.stop() | |
137 | |
138 def _versionEb(self, failure): | |
139 logging.error("Can't check schema version: {reason}".format(reason=failure)) | |
140 reactor.stop() | |
141 | |
142 def _buildNode(self, row): | |
143 """Build a note class from database result row""" | |
144 configuration = {} | |
145 | |
146 if not row: | |
147 raise error.NodeNotFound() | |
148 | |
149 if row[2] == 'leaf': | |
150 configuration = { | |
151 'pubsub#persist_items': row[3], | |
152 'pubsub#deliver_payloads': row[4], | |
153 'pubsub#send_last_published_item': row[5], | |
154 const.OPT_ACCESS_MODEL:row[6], | |
155 const.OPT_PUBLISH_MODEL:row[7], | |
156 const.OPT_SERIAL_IDS:row[8], | |
157 } | |
158 schema = row[9] | |
159 if schema is not None: | |
160 schema = parseXml(schema) | |
161 node = LeafNode(row[0], row[1], configuration, schema) | |
162 node.dbpool = self.dbpool | |
163 return node | |
164 elif row[2] == 'collection': | |
165 configuration = { | |
166 'pubsub#deliver_payloads': row[4], | |
167 'pubsub#send_last_published_item': row[5], | |
168 const.OPT_ACCESS_MODEL: row[6], | |
169 const.OPT_PUBLISH_MODEL:row[7], | |
170 } | |
171 node = CollectionNode(row[0], row[1], configuration, None) | |
172 node.dbpool = self.dbpool | |
173 return node | |
174 else: | |
175 raise ValueError("Unknown node type !") | |
176 | |
177 def getNodeById(self, nodeDbId): | |
178 """Get node using database ID insted of pubsub identifier | |
179 | |
180 @param nodeDbId(unicode): database ID | |
181 """ | |
182 return self.dbpool.runInteraction(self._getNodeById, nodeDbId) | |
183 | |
184 def _getNodeById(self, cursor, nodeDbId): | |
185 cursor.execute("""SELECT node_id, | |
186 node, | |
187 node_type, | |
188 persist_items, | |
189 deliver_payloads, | |
190 send_last_published_item, | |
191 access_model, | |
192 publish_model, | |
193 serial_ids, | |
194 schema::text, | |
195 pep | |
196 FROM nodes | |
197 WHERE node_id=%s""", | |
198 (nodeDbId,)) | |
199 row = cursor.fetchone() | |
200 return self._buildNode(row) | |
201 | |
202 def getNode(self, nodeIdentifier, pep, recipient=None): | |
203 return self.dbpool.runInteraction(self._getNode, nodeIdentifier, pep, recipient) | |
204 | |
205 def _getNode(self, cursor, nodeIdentifier, pep, recipient): | |
206 cursor.execute(*withPEP("""SELECT node_id, | |
207 node, | |
208 node_type, | |
209 persist_items, | |
210 deliver_payloads, | |
211 send_last_published_item, | |
212 access_model, | |
213 publish_model, | |
214 serial_ids, | |
215 schema::text, | |
216 pep | |
217 FROM nodes | |
218 WHERE node=%s""", | |
219 (nodeIdentifier,), pep, recipient)) | |
220 row = cursor.fetchone() | |
221 return self._buildNode(row) | |
222 | |
223 def getNodeIds(self, pep, recipient, allowed_accesses=None): | |
224 """retrieve ids of existing nodes | |
225 | |
226 @param pep(bool): True if it's a PEP request | |
227 @param recipient(jid.JID, None): recipient of the PEP request | |
228 @param allowed_accesses(None, set): only nodes with access | |
229 in this set will be returned | |
230 None to return all nodes | |
231 @return (list[unicode]): ids of nodes | |
232 """ | |
233 if not pep: | |
234 query = "SELECT node from nodes WHERE pep is NULL" | |
235 values = [] | |
236 else: | |
237 query = "SELECT node from nodes WHERE pep=%s" | |
238 values = [recipient.userhost()] | |
239 | |
240 if allowed_accesses is not None: | |
241 query += "AND access_model IN %s" | |
242 values.append(tuple(allowed_accesses)) | |
243 | |
244 d = self.dbpool.runQuery(query, values) | |
245 d.addCallback(lambda results: [r[0] for r in results]) | |
246 return d | |
247 | |
248 def createNode(self, nodeIdentifier, owner, config, schema, pep, recipient=None): | |
249 return self.dbpool.runInteraction(self._createNode, nodeIdentifier, | |
250 owner, config, schema, pep, recipient) | |
251 | |
252 def _createNode(self, cursor, nodeIdentifier, owner, config, schema, pep, recipient): | |
253 if config['pubsub#node_type'] != 'leaf': | |
254 raise error.NoCollections() | |
255 | |
256 owner = owner.userhost() | |
257 | |
258 try: | |
259 cursor.execute("""INSERT INTO nodes | |
260 (node, | |
261 node_type, | |
262 persist_items, | |
263 deliver_payloads, | |
264 send_last_published_item, | |
265 access_model, | |
266 publish_model, | |
267 serial_ids, | |
268 schema, | |
269 pep) | |
270 VALUES | |
271 (%s, 'leaf', %s, %s, %s, %s, %s, %s, %s, %s)""", | |
272 (nodeIdentifier, | |
273 config['pubsub#persist_items'], | |
274 config['pubsub#deliver_payloads'], | |
275 config['pubsub#send_last_published_item'], | |
276 config[const.OPT_ACCESS_MODEL], | |
277 config[const.OPT_PUBLISH_MODEL], | |
278 config[const.OPT_SERIAL_IDS], | |
279 schema, | |
280 recipient.userhost() if pep else None | |
281 ) | |
282 ) | |
283 except cursor._pool.dbapi.IntegrityError as e: | |
284 if e.pgcode == "23505": | |
285 # unique_violation | |
286 raise error.NodeExists() | |
287 else: | |
288 raise error.InvalidConfigurationOption() | |
289 | |
290 cursor.execute(*withPEP("""SELECT node_id FROM nodes WHERE node=%s""", | |
291 (nodeIdentifier,), pep, recipient)); | |
292 node_id = cursor.fetchone()[0] | |
293 | |
294 cursor.execute("""SELECT 1 as bool from entities where jid=%s""", | |
295 (owner,)) | |
296 | |
297 if not cursor.fetchone(): | |
298 # XXX: we can NOT rely on the previous query! Commit is needed now because | |
299 # if the entry exists the next query will leave the database in a corrupted | |
300 # state: the solution is to rollback. I tried with other methods like | |
301 # "WHERE NOT EXISTS" but none of them worked, so the following solution | |
302 # looks like the sole - unless you have auto-commit on. More info | |
303 # about this issue: http://cssmay.com/question/tag/tag-psycopg2 | |
304 cursor.connection.commit() | |
305 try: | |
306 cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", | |
307 (owner,)) | |
308 except psycopg2.IntegrityError as e: | |
309 cursor.connection.rollback() | |
310 logging.warning("during node creation: %s" % e.message) | |
311 | |
312 cursor.execute("""INSERT INTO affiliations | |
313 (node_id, entity_id, affiliation) | |
314 SELECT %s, entity_id, 'owner' FROM | |
315 (SELECT entity_id FROM entities | |
316 WHERE jid=%s) as e""", | |
317 (node_id, owner)) | |
318 | |
319 if config[const.OPT_ACCESS_MODEL] == const.VAL_AMODEL_PUBLISHER_ROSTER: | |
320 if const.OPT_ROSTER_GROUPS_ALLOWED in config: | |
321 allowed_groups = config[const.OPT_ROSTER_GROUPS_ALLOWED] | |
322 else: | |
323 allowed_groups = [] | |
324 for group in allowed_groups: | |
325 #TODO: check that group are actually in roster | |
326 cursor.execute("""INSERT INTO node_groups_authorized (node_id, groupname) | |
327 VALUES (%s,%s)""" , (node_id, group)) | |
328 # XXX: affiliations can't be set on during node creation (at least not with XEP-0060 alone) | |
329 # so whitelist affiliations need to be done afterward | |
330 | |
331 # no we may have to do extra things according to config options | |
332 default_conf = self.defaultConfig['leaf'] | |
333 # XXX: trigger works on node creation because OPT_SERIAL_IDS is False in defaultConfig | |
334 # if this value is changed, the _configurationTriggers method should be adapted. | |
335 Node._configurationTriggers(cursor, node_id, default_conf, config) | |
336 | |
337 def deleteNodeByDbId(self, db_id): | |
338 """Delete a node using directly its database id""" | |
339 return self.dbpool.runInteraction(self._deleteNodeByDbId, db_id) | |
340 | |
341 def _deleteNodeByDbId(self, cursor, db_id): | |
342 cursor.execute("""DELETE FROM nodes WHERE node_id=%s""", | |
343 (db_id,)) | |
344 | |
345 if cursor.rowcount != 1: | |
346 raise error.NodeNotFound() | |
347 | |
348 def deleteNode(self, nodeIdentifier, pep, recipient=None): | |
349 return self.dbpool.runInteraction(self._deleteNode, nodeIdentifier, pep, recipient) | |
350 | |
351 def _deleteNode(self, cursor, nodeIdentifier, pep, recipient): | |
352 cursor.execute(*withPEP("""DELETE FROM nodes WHERE node=%s""", | |
353 (nodeIdentifier,), pep, recipient)) | |
354 | |
355 if cursor.rowcount != 1: | |
356 raise error.NodeNotFound() | |
357 | |
358 def getAffiliations(self, entity, nodeIdentifier, pep, recipient=None): | |
359 return self.dbpool.runInteraction(self._getAffiliations, entity, nodeIdentifier, pep, recipient) | |
360 | |
361 def _getAffiliations(self, cursor, entity, nodeIdentifier, pep, recipient=None): | |
362 query = ["""SELECT node, affiliation FROM entities | |
363 NATURAL JOIN affiliations | |
364 NATURAL JOIN nodes | |
365 WHERE jid=%s"""] | |
366 args = [entity.userhost()] | |
367 | |
368 if nodeIdentifier is not None: | |
369 query.append("AND node=%s") | |
370 args.append(nodeIdentifier) | |
371 | |
372 cursor.execute(*withPEP(' '.join(query), args, pep, recipient)) | |
373 rows = cursor.fetchall() | |
374 return [tuple(r) for r in rows] | |
375 | |
376 def getSubscriptions(self, entity, nodeIdentifier=None, pep=False, recipient=None): | |
377 """retrieve subscriptions of an entity | |
378 | |
379 @param entity(jid.JID): entity to check | |
380 @param nodeIdentifier(unicode, None): node identifier | |
381 None to retrieve all subscriptions | |
382 @param pep: True if we are in PEP mode | |
383 @param recipient: jid of the recipient | |
384 """ | |
385 | |
386 def toSubscriptions(rows): | |
387 subscriptions = [] | |
388 for row in rows: | |
389 subscriber = jid.internJID('%s/%s' % (row.jid, | |
390 row.resource)) | |
391 subscription = Subscription(row.node, subscriber, row.state) | |
392 subscriptions.append(subscription) | |
393 return subscriptions | |
394 | |
395 query = ["""SELECT node, | |
396 jid, | |
397 resource, | |
398 state | |
399 FROM entities | |
400 NATURAL JOIN subscriptions | |
401 NATURAL JOIN nodes | |
402 WHERE jid=%s"""] | |
403 | |
404 args = [entity.userhost()] | |
405 | |
406 if nodeIdentifier is not None: | |
407 query.append("AND node=%s") | |
408 args.append(nodeIdentifier) | |
409 | |
410 d = self.dbpool.runQuery(*withPEP(' '.join(query), args, pep, recipient)) | |
411 d.addCallback(toSubscriptions) | |
412 return d | |
413 | |
414 def getDefaultConfiguration(self, nodeType): | |
415 return self.defaultConfig[nodeType].copy() | |
416 | |
417 def formatLastItems(self, result): | |
418 last_items = [] | |
419 for pep_jid_s, node, data, item_access_model in result: | |
420 pep_jid = jid.JID(pep_jid_s) | |
421 item = generic.stripNamespace(parseXml(data)) | |
422 last_items.append((pep_jid, node, item, item_access_model)) | |
423 return last_items | |
424 | |
425 def getLastItems(self, entities, nodes, node_accesses, item_accesses, pep): | |
426 """get last item for several nodes and entities in a single request""" | |
427 if not entities or not nodes or not node_accesses or not item_accesses: | |
428 raise ValueError("entities, nodes and accesses must not be empty") | |
429 if node_accesses != ('open',) or item_accesses != ('open',): | |
430 raise NotImplementedError('only "open" access model is handled for now') | |
431 if not pep: | |
432 raise NotImplementedError(u"getLastItems is only implemented for PEP at the moment") | |
433 d = self.dbpool.runQuery("""SELECT DISTINCT ON (node_id) pep, node, data::text, items.access_model | |
434 FROM items | |
435 NATURAL JOIN nodes | |
436 WHERE nodes.pep IN %s | |
437 AND node IN %s | |
438 AND nodes.access_model in %s | |
439 AND items.access_model in %s | |
440 ORDER BY node_id DESC, item_id DESC""", | |
441 (tuple([e.userhost() for e in entities]), | |
442 nodes, | |
443 node_accesses, | |
444 item_accesses)) | |
445 d.addCallback(self.formatLastItems) | |
446 return d | |
447 | |
448 | |
449 class Node: | |
450 | |
451 implements(iidavoll.INode) | |
452 | |
453 def __init__(self, nodeDbId, nodeIdentifier, config, schema): | |
454 self.nodeDbId = nodeDbId | |
455 self.nodeIdentifier = nodeIdentifier | |
456 self._config = config | |
457 self._schema = schema | |
458 | |
459 def _checkNodeExists(self, cursor): | |
460 cursor.execute("""SELECT 1 as exist FROM nodes WHERE node_id=%s""", | |
461 (self.nodeDbId,)) | |
462 if not cursor.fetchone(): | |
463 raise error.NodeNotFound() | |
464 | |
465 def getType(self): | |
466 return self.nodeType | |
467 | |
468 def getOwners(self): | |
469 d = self.dbpool.runQuery("""SELECT jid FROM nodes NATURAL JOIN affiliations NATURAL JOIN entities WHERE node_id=%s and affiliation='owner'""", (self.nodeDbId,)) | |
470 d.addCallback(lambda rows: [jid.JID(r[0]) for r in rows]) | |
471 return d | |
472 | |
473 def getConfiguration(self): | |
474 return self._config | |
475 | |
476 def getNextId(self): | |
477 """return XMPP item id usable for next item to publish | |
478 | |
479 the return value will be next int if serila_ids is set, | |
480 else an UUID will be returned | |
481 """ | |
482 if self._config[const.OPT_SERIAL_IDS]: | |
483 d = self.dbpool.runQuery("SELECT nextval('{seq_name}')".format( | |
484 seq_name = ITEMS_SEQ_NAME.format(node_id=self.nodeDbId))) | |
485 d.addCallback(lambda rows: unicode(rows[0][0])) | |
486 return d | |
487 else: | |
488 return defer.succeed(unicode(uuid.uuid4())) | |
489 | |
490 @staticmethod | |
491 def _configurationTriggers(cursor, node_id, old_config, new_config): | |
492 """trigger database relative actions needed when a config is changed | |
493 | |
494 @param cursor(): current db cursor | |
495 @param node_id(unicode): database ID of the node | |
496 @param old_config(dict): config of the node before the change | |
497 @param new_config(dict): new options that will be changed | |
498 """ | |
499 serial_ids = new_config[const.OPT_SERIAL_IDS] | |
500 if serial_ids != old_config[const.OPT_SERIAL_IDS]: | |
501 # serial_ids option has been modified, | |
502 # we need to handle corresponding sequence | |
503 | |
504 # XXX: we use .format in following queries because values | |
505 # are generated by ourself | |
506 seq_name = ITEMS_SEQ_NAME.format(node_id=node_id) | |
507 if serial_ids: | |
508 # the next query get the max value +1 of all XMPP items ids | |
509 # which are integers, and default to 1 | |
510 cursor.execute(NEXT_ITEM_ID_QUERY.format(node_id=node_id)) | |
511 next_val = cursor.fetchone()[0] | |
512 cursor.execute("DROP SEQUENCE IF EXISTS {seq_name}".format(seq_name = seq_name)) | |
513 cursor.execute("CREATE SEQUENCE {seq_name} START {next_val} OWNED BY nodes.node_id".format( | |
514 seq_name = seq_name, | |
515 next_val = next_val)) | |
516 else: | |
517 cursor.execute("DROP SEQUENCE IF EXISTS {seq_name}".format(seq_name = seq_name)) | |
518 | |
519 def setConfiguration(self, options): | |
520 config = copy.copy(self._config) | |
521 | |
522 for option in options: | |
523 if option in config: | |
524 config[option] = options[option] | |
525 | |
526 d = self.dbpool.runInteraction(self._setConfiguration, config) | |
527 d.addCallback(self._setCachedConfiguration, config) | |
528 return d | |
529 | |
530 def _setConfiguration(self, cursor, config): | |
531 self._checkNodeExists(cursor) | |
532 self._configurationTriggers(cursor, self.nodeDbId, self._config, config) | |
533 cursor.execute("""UPDATE nodes SET persist_items=%s, | |
534 deliver_payloads=%s, | |
535 send_last_published_item=%s, | |
536 access_model=%s, | |
537 publish_model=%s, | |
538 serial_ids=%s | |
539 WHERE node_id=%s""", | |
540 (config[const.OPT_PERSIST_ITEMS], | |
541 config[const.OPT_DELIVER_PAYLOADS], | |
542 config[const.OPT_SEND_LAST_PUBLISHED_ITEM], | |
543 config[const.OPT_ACCESS_MODEL], | |
544 config[const.OPT_PUBLISH_MODEL], | |
545 config[const.OPT_SERIAL_IDS], | |
546 self.nodeDbId)) | |
547 | |
548 def _setCachedConfiguration(self, void, config): | |
549 self._config = config | |
550 | |
551 def getSchema(self): | |
552 return self._schema | |
553 | |
554 def setSchema(self, schema): | |
555 d = self.dbpool.runInteraction(self._setSchema, schema) | |
556 d.addCallback(self._setCachedSchema, schema) | |
557 return d | |
558 | |
559 def _setSchema(self, cursor, schema): | |
560 self._checkNodeExists(cursor) | |
561 cursor.execute("""UPDATE nodes SET schema=%s | |
562 WHERE node_id=%s""", | |
563 (schema.toXml() if schema else None, | |
564 self.nodeDbId)) | |
565 | |
566 def _setCachedSchema(self, void, schema): | |
567 self._schema = schema | |
568 | |
569 def getMetaData(self): | |
570 config = copy.copy(self._config) | |
571 config["pubsub#node_type"] = self.nodeType | |
572 return config | |
573 | |
574 def getAffiliation(self, entity): | |
575 return self.dbpool.runInteraction(self._getAffiliation, entity) | |
576 | |
577 def _getAffiliation(self, cursor, entity): | |
578 self._checkNodeExists(cursor) | |
579 cursor.execute("""SELECT affiliation FROM affiliations | |
580 NATURAL JOIN nodes | |
581 NATURAL JOIN entities | |
582 WHERE node_id=%s AND jid=%s""", | |
583 (self.nodeDbId, | |
584 entity.userhost())) | |
585 | |
586 try: | |
587 return cursor.fetchone()[0] | |
588 except TypeError: | |
589 return None | |
590 | |
591 def getAccessModel(self): | |
592 return self._config[const.OPT_ACCESS_MODEL] | |
593 | |
594 def getSubscription(self, subscriber): | |
595 return self.dbpool.runInteraction(self._getSubscription, subscriber) | |
596 | |
597 def _getSubscription(self, cursor, subscriber): | |
598 self._checkNodeExists(cursor) | |
599 | |
600 userhost = subscriber.userhost() | |
601 resource = subscriber.resource or '' | |
602 | |
603 cursor.execute("""SELECT state FROM subscriptions | |
604 NATURAL JOIN nodes | |
605 NATURAL JOIN entities | |
606 WHERE node_id=%s AND jid=%s AND resource=%s""", | |
607 (self.nodeDbId, | |
608 userhost, | |
609 resource)) | |
610 | |
611 row = cursor.fetchone() | |
612 if not row: | |
613 return None | |
614 else: | |
615 return Subscription(self.nodeIdentifier, subscriber, row[0]) | |
616 | |
617 def getSubscriptions(self, state=None): | |
618 return self.dbpool.runInteraction(self._getSubscriptions, state) | |
619 | |
620 def _getSubscriptions(self, cursor, state): | |
621 self._checkNodeExists(cursor) | |
622 | |
623 query = """SELECT node, jid, resource, state, | |
624 subscription_type, subscription_depth | |
625 FROM subscriptions | |
626 NATURAL JOIN nodes | |
627 NATURAL JOIN entities | |
628 WHERE node_id=%s""" | |
629 values = [self.nodeDbId] | |
630 | |
631 if state: | |
632 query += " AND state=%s" | |
633 values.append(state) | |
634 | |
635 cursor.execute(query, values) | |
636 rows = cursor.fetchall() | |
637 | |
638 subscriptions = [] | |
639 for row in rows: | |
640 subscriber = jid.JID(u'%s/%s' % (row.jid, row.resource)) | |
641 | |
642 options = {} | |
643 if row.subscription_type: | |
644 options['pubsub#subscription_type'] = row.subscription_type; | |
645 if row.subscription_depth: | |
646 options['pubsub#subscription_depth'] = row.subscription_depth; | |
647 | |
648 subscriptions.append(Subscription(row.node, subscriber, | |
649 row.state, options)) | |
650 | |
651 return subscriptions | |
652 | |
653 def addSubscription(self, subscriber, state, config): | |
654 return self.dbpool.runInteraction(self._addSubscription, subscriber, | |
655 state, config) | |
656 | |
657 def _addSubscription(self, cursor, subscriber, state, config): | |
658 self._checkNodeExists(cursor) | |
659 | |
660 userhost = subscriber.userhost() | |
661 resource = subscriber.resource or '' | |
662 | |
663 subscription_type = config.get('pubsub#subscription_type') | |
664 subscription_depth = config.get('pubsub#subscription_depth') | |
665 | |
666 try: | |
667 cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""", | |
668 (userhost,)) | |
669 except cursor._pool.dbapi.IntegrityError: | |
670 cursor.connection.rollback() | |
671 | |
672 try: | |
673 cursor.execute("""INSERT INTO subscriptions | |
674 (node_id, entity_id, resource, state, | |
675 subscription_type, subscription_depth) | |
676 SELECT %s, entity_id, %s, %s, %s, %s FROM | |
677 (SELECT entity_id FROM entities | |
678 WHERE jid=%s) AS ent_id""", | |
679 (self.nodeDbId, | |
680 resource, | |
681 state, | |
682 subscription_type, | |
683 subscription_depth, | |
684 userhost)) | |
685 except cursor._pool.dbapi.IntegrityError: | |
686 raise error.SubscriptionExists() | |
687 | |
688 def removeSubscription(self, subscriber): | |
689 return self.dbpool.runInteraction(self._removeSubscription, | |
690 subscriber) | |
691 | |
692 def _removeSubscription(self, cursor, subscriber): | |
693 self._checkNodeExists(cursor) | |
694 | |
695 userhost = subscriber.userhost() | |
696 resource = subscriber.resource or '' | |
697 | |
698 cursor.execute("""DELETE FROM subscriptions WHERE | |
699 node_id=%s AND | |
700 entity_id=(SELECT entity_id FROM entities | |
701 WHERE jid=%s) AND | |
702 resource=%s""", | |
703 (self.nodeDbId, | |
704 userhost, | |
705 resource)) | |
706 if cursor.rowcount != 1: | |
707 raise error.NotSubscribed() | |
708 | |
709 return None | |
710 | |
711 def setSubscriptions(self, subscriptions): | |
712 return self.dbpool.runInteraction(self._setSubscriptions, subscriptions) | |
713 | |
714 def _setSubscriptions(self, cursor, subscriptions): | |
715 self._checkNodeExists(cursor) | |
716 | |
717 entities = self.getOrCreateEntities(cursor, [s.subscriber for s in subscriptions]) | |
718 entities_map = {jid.JID(e.jid): e for e in entities} | |
719 | |
720 # then we construct values for subscriptions update according to entity_id we just got | |
721 placeholders = ','.join(len(subscriptions) * ["%s"]) | |
722 values = [] | |
723 for subscription in subscriptions: | |
724 entity_id = entities_map[subscription.subscriber].entity_id | |
725 resource = subscription.subscriber.resource or u'' | |
726 values.append((self.nodeDbId, entity_id, resource, subscription.state, None, None)) | |
727 # we use upsert so new values are inserted and existing one updated. This feature is only available for PostgreSQL >= 9.5 | |
728 cursor.execute("INSERT INTO subscriptions(node_id, entity_id, resource, state, subscription_type, subscription_depth) VALUES " + placeholders + " ON CONFLICT (entity_id, resource, node_id) DO UPDATE SET state=EXCLUDED.state", [v for v in values]) | |
729 | |
730 def isSubscribed(self, entity): | |
731 return self.dbpool.runInteraction(self._isSubscribed, entity) | |
732 | |
733 def _isSubscribed(self, cursor, entity): | |
734 self._checkNodeExists(cursor) | |
735 | |
736 cursor.execute("""SELECT 1 as bool FROM entities | |
737 NATURAL JOIN subscriptions | |
738 NATURAL JOIN nodes | |
739 WHERE entities.jid=%s | |
740 AND node_id=%s AND state='subscribed'""", | |
741 (entity.userhost(), | |
742 self.nodeDbId)) | |
743 | |
744 return cursor.fetchone() is not None | |
745 | |
746 def getAffiliations(self): | |
747 return self.dbpool.runInteraction(self._getAffiliations) | |
748 | |
749 def _getAffiliations(self, cursor): | |
750 self._checkNodeExists(cursor) | |
751 | |
752 cursor.execute("""SELECT jid, affiliation FROM nodes | |
753 NATURAL JOIN affiliations | |
754 NATURAL JOIN entities | |
755 WHERE node_id=%s""", | |
756 (self.nodeDbId,)) | |
757 result = cursor.fetchall() | |
758 | |
759 return {jid.internJID(r[0]): r[1] for r in result} | |
760 | |
761 def getOrCreateEntities(self, cursor, entities_jids): | |
762 """Get entity_id from entities in entities table | |
763 | |
764 Entities will be inserted it they don't exist | |
765 @param entities_jid(list[jid.JID]): entities to get or create | |
766 @return list[record(entity_id,jid)]]: list of entity_id and jid (as plain string) | |
767 both existing and inserted entities are returned | |
768 """ | |
769 # cf. http://stackoverflow.com/a/35265559 | |
770 placeholders = ','.join(len(entities_jids) * ["(%s)"]) | |
771 query = ( | |
772 """ | |
773 WITH | |
774 jid_values (jid) AS ( | |
775 VALUES {placeholders} | |
776 ), | |
777 inserted (entity_id, jid) AS ( | |
778 INSERT INTO entities (jid) | |
779 SELECT jid | |
780 FROM jid_values | |
781 ON CONFLICT DO NOTHING | |
782 RETURNING entity_id, jid | |
783 ) | |
784 SELECT e.entity_id, e.jid | |
785 FROM entities e JOIN jid_values jv ON jv.jid = e.jid | |
786 UNION ALL | |
787 SELECT entity_id, jid | |
788 FROM inserted""".format(placeholders=placeholders)) | |
789 cursor.execute(query, [j.userhost() for j in entities_jids]) | |
790 return cursor.fetchall() | |
791 | |
792 def setAffiliations(self, affiliations): | |
793 return self.dbpool.runInteraction(self._setAffiliations, affiliations) | |
794 | |
795 def _setAffiliations(self, cursor, affiliations): | |
796 self._checkNodeExists(cursor) | |
797 | |
798 entities = self.getOrCreateEntities(cursor, affiliations) | |
799 | |
800 # then we construct values for affiliations update according to entity_id we just got | |
801 placeholders = ','.join(len(affiliations) * ["(%s,%s,%s)"]) | |
802 values = [] | |
803 map(values.extend, ((e.entity_id, affiliations[jid.JID(e.jid)], self.nodeDbId) for e in entities)) | |
804 | |
805 # we use upsert so new values are inserted and existing one updated. This feature is only available for PostgreSQL >= 9.5 | |
806 cursor.execute("INSERT INTO affiliations(entity_id,affiliation,node_id) VALUES " + placeholders + " ON CONFLICT (entity_id,node_id) DO UPDATE SET affiliation=EXCLUDED.affiliation", values) | |
807 | |
808 def deleteAffiliations(self, entities): | |
809 return self.dbpool.runInteraction(self._deleteAffiliations, entities) | |
810 | |
811 def _deleteAffiliations(self, cursor, entities): | |
812 """delete affiliations and subscriptions for this entity""" | |
813 self._checkNodeExists(cursor) | |
814 placeholders = ','.join(len(entities) * ["%s"]) | |
815 cursor.execute("DELETE FROM affiliations WHERE node_id=%s AND entity_id in (SELECT entity_id FROM entities WHERE jid IN (" + placeholders + ")) RETURNING entity_id", [self.nodeDbId] + [e.userhost() for e in entities]) | |
816 | |
817 rows = cursor.fetchall() | |
818 placeholders = ','.join(len(rows) * ["%s"]) | |
819 cursor.execute("DELETE FROM subscriptions WHERE node_id=%s AND entity_id in (" + placeholders + ")", [self.nodeDbId] + [r[0] for r in rows]) | |
820 | |
821 def getAuthorizedGroups(self): | |
822 return self.dbpool.runInteraction(self._getNodeGroups) | |
823 | |
824 def _getAuthorizedGroups(self, cursor): | |
825 cursor.execute("SELECT groupname FROM node_groups_authorized NATURAL JOIN nodes WHERE node=%s", | |
826 (self.nodeDbId,)) | |
827 rows = cursor.fetchall() | |
828 return [row[0] for row in rows] | |
829 | |
830 | |
831 class LeafNode(Node): | |
832 | |
833 implements(iidavoll.ILeafNode) | |
834 | |
835 nodeType = 'leaf' | |
836 | |
837 def storeItems(self, item_data, publisher): | |
838 return self.dbpool.runInteraction(self._storeItems, item_data, publisher) | |
839 | |
840 def _storeItems(self, cursor, items_data, publisher): | |
841 self._checkNodeExists(cursor) | |
842 for item_data in items_data: | |
843 self._storeItem(cursor, item_data, publisher) | |
844 | |
845 def _storeItem(self, cursor, item_data, publisher): | |
846 # first try to insert the item | |
847 # - if it fails (conflict), and the item is new and we have serial_ids options, | |
848 # current id will be recomputed using next item id query (note that is not perfect, as | |
849 # table is not locked and this can fail if two items are added at the same time | |
850 # but this can only happen with serial_ids and if future ids have been set by a client, | |
851 # this case should be rare enough to consider this situation acceptable) | |
852 # - if item insertion fail and the item is not new, we do an update | |
853 # - in other cases, exception is raised | |
854 item, access_model, item_config = item_data.item, item_data.access_model, item_data.config | |
855 data = item.toXml() | |
856 | |
857 insert_query = """INSERT INTO items (node_id, item, publisher, data, access_model) | |
858 SELECT %s, %s, %s, %s, %s FROM nodes | |
859 WHERE node_id=%s | |
860 RETURNING item_id""" | |
861 insert_data = [self.nodeDbId, | |
862 item["id"], | |
863 publisher.full(), | |
864 data, | |
865 access_model, | |
866 self.nodeDbId] | |
867 | |
868 try: | |
869 cursor.execute(insert_query, insert_data) | |
870 except cursor._pool.dbapi.IntegrityError as e: | |
871 if e.pgcode != "23505": | |
872 # we only handle unique_violation, every other exception must be raised | |
873 raise e | |
874 cursor.connection.rollback() | |
875 # the item already exist | |
876 if item_data.new: | |
877 # the item is new | |
878 if self._config[const.OPT_SERIAL_IDS]: | |
879 # this can happen with serial_ids, if a item has been stored | |
880 # with a future id (generated by XMPP client) | |
881 cursor.execute(NEXT_ITEM_ID_QUERY.format(node_id=self.nodeDbId)) | |
882 next_id = cursor.fetchone()[0] | |
883 # we update the sequence, so we can skip conflicting ids | |
884 cursor.execute(u"SELECT setval('{seq_name}', %s)".format( | |
885 seq_name = ITEMS_SEQ_NAME.format(node_id=self.nodeDbId)), [next_id]) | |
886 # and now we can retry the query with the new id | |
887 item['id'] = insert_data[1] = unicode(next_id) | |
888 # item saved in DB must also be updated with the new id | |
889 insert_data[3] = item.toXml() | |
890 cursor.execute(insert_query, insert_data) | |
891 else: | |
892 # but if we have not serial_ids, we have a real problem | |
893 raise e | |
894 else: | |
895 # this is an update | |
896 cursor.execute("""UPDATE items SET updated=now(), publisher=%s, data=%s | |
897 FROM nodes | |
898 WHERE nodes.node_id = items.node_id AND | |
899 nodes.node_id = %s and items.item=%s | |
900 RETURNING item_id""", | |
901 (publisher.full(), | |
902 data, | |
903 self.nodeDbId, | |
904 item["id"])) | |
905 if cursor.rowcount != 1: | |
906 raise exceptions.InternalError("item has not been updated correctly") | |
907 item_id = cursor.fetchone()[0]; | |
908 self._storeCategories(cursor, item_id, item_data.categories, update=True) | |
909 return | |
910 | |
911 item_id = cursor.fetchone()[0]; | |
912 self._storeCategories(cursor, item_id, item_data.categories) | |
913 | |
914 if access_model == const.VAL_AMODEL_PUBLISHER_ROSTER: | |
915 if const.OPT_ROSTER_GROUPS_ALLOWED in item_config: | |
916 item_config.fields[const.OPT_ROSTER_GROUPS_ALLOWED].fieldType='list-multi' #XXX: needed to force list if there is only one value | |
917 allowed_groups = item_config[const.OPT_ROSTER_GROUPS_ALLOWED] | |
918 else: | |
919 allowed_groups = [] | |
920 for group in allowed_groups: | |
921 #TODO: check that group are actually in roster | |
922 cursor.execute("""INSERT INTO item_groups_authorized (item_id, groupname) | |
923 VALUES (%s,%s)""" , (item_id, group)) | |
924 # TODO: whitelist access model | |
925 | |
926 def _storeCategories(self, cursor, item_id, categories, update=False): | |
927 # TODO: handle canonical form | |
928 if update: | |
929 cursor.execute("""DELETE FROM item_categories | |
930 WHERE item_id=%s""", (item_id,)) | |
931 | |
932 for category in categories: | |
933 cursor.execute("""INSERT INTO item_categories (item_id, category) | |
934 VALUES (%s, %s)""", (item_id, category)) | |
935 | |
936 def removeItems(self, itemIdentifiers): | |
937 return self.dbpool.runInteraction(self._removeItems, itemIdentifiers) | |
938 | |
939 def _removeItems(self, cursor, itemIdentifiers): | |
940 self._checkNodeExists(cursor) | |
941 | |
942 deleted = [] | |
943 | |
944 for itemIdentifier in itemIdentifiers: | |
945 cursor.execute("""DELETE FROM items WHERE | |
946 node_id=%s AND | |
947 item=%s""", | |
948 (self.nodeDbId, | |
949 itemIdentifier)) | |
950 | |
951 if cursor.rowcount: | |
952 deleted.append(itemIdentifier) | |
953 | |
954 return deleted | |
955 | |
956 def getItems(self, authorized_groups, unrestricted, maxItems=None, ext_data=None): | |
957 """ Get all authorised items | |
958 | |
959 @param authorized_groups: we want to get items that these groups can access | |
960 @param unrestricted: if true, don't check permissions (i.e.: get all items) | |
961 @param maxItems: nb of items we want to get | |
962 @param ext_data: options for extra features like RSM and MAM | |
963 | |
964 @return: list of container.ItemData | |
965 if unrestricted is False, access_model and config will be None | |
966 """ | |
967 if ext_data is None: | |
968 ext_data = {} | |
969 return self.dbpool.runInteraction(self._getItems, authorized_groups, unrestricted, maxItems, ext_data, ids_only=False) | |
970 | |
971 def getItemsIds(self, authorized_groups, unrestricted, maxItems=None, ext_data=None): | |
972 """ Get all authorised items ids | |
973 | |
974 @param authorized_groups: we want to get items that these groups can access | |
975 @param unrestricted: if true, don't check permissions (i.e.: get all items) | |
976 @param maxItems: nb of items we want to get | |
977 @param ext_data: options for extra features like RSM and MAM | |
978 | |
979 @return list(unicode): list of ids | |
980 """ | |
981 if ext_data is None: | |
982 ext_data = {} | |
983 return self.dbpool.runInteraction(self._getItems, authorized_groups, unrestricted, maxItems, ext_data, ids_only=True) | |
984 | |
985 def _appendSourcesAndFilters(self, query, args, authorized_groups, unrestricted, ext_data): | |
986 """append sources and filters to sql query requesting items and return ORDER BY | |
987 | |
988 arguments query, args, authorized_groups, unrestricted and ext_data are the same as for | |
989 _getItems | |
990 """ | |
991 # SOURCES | |
992 query.append("FROM nodes INNER JOIN items USING (node_id)") | |
993 | |
994 if unrestricted: | |
995 query_filters = ["WHERE node_id=%s"] | |
996 args.append(self.nodeDbId) | |
997 else: | |
998 query.append("LEFT JOIN item_groups_authorized USING (item_id)") | |
999 args.append(self.nodeDbId) | |
1000 if authorized_groups: | |
1001 get_groups = " or (items.access_model='roster' and groupname in %s)" | |
1002 args.append(authorized_groups) | |
1003 else: | |
1004 get_groups = "" | |
1005 | |
1006 query_filters = ["WHERE node_id=%s AND (items.access_model='open'" + get_groups + ")"] | |
1007 | |
1008 # FILTERS | |
1009 if 'filters' in ext_data: # MAM filters | |
1010 for filter_ in ext_data['filters']: | |
1011 if filter_.var == 'start': | |
1012 query_filters.append("AND created>=%s") | |
1013 args.append(filter_.value) | |
1014 elif filter_.var == 'end': | |
1015 query_filters.append("AND created<=%s") | |
1016 args.append(filter_.value) | |
1017 elif filter_.var == 'with': | |
1018 jid_s = filter_.value | |
1019 if '/' in jid_s: | |
1020 query_filters.append("AND publisher=%s") | |
1021 args.append(filter_.value) | |
1022 else: | |
1023 query_filters.append("AND publisher LIKE %s") | |
1024 args.append(u"{}%".format(filter_.value)) | |
1025 elif filter_.var == const.MAM_FILTER_CATEGORY: | |
1026 query.append("LEFT JOIN item_categories USING (item_id)") | |
1027 query_filters.append("AND category=%s") | |
1028 args.append(filter_.value) | |
1029 else: | |
1030 log.msg("WARNING: unknown filter: {}".format(filter_.encode('utf-8'))) | |
1031 | |
1032 query.extend(query_filters) | |
1033 | |
1034 return "ORDER BY item_id DESC" | |
1035 | |
1036 def _getItems(self, cursor, authorized_groups, unrestricted, maxItems, ext_data, ids_only): | |
1037 self._checkNodeExists(cursor) | |
1038 | |
1039 if maxItems == 0: | |
1040 return [] | |
1041 | |
1042 args = [] | |
1043 | |
1044 # SELECT | |
1045 if ids_only: | |
1046 query = ["SELECT item"] | |
1047 else: | |
1048 query = ["SELECT data::text,items.access_model,item_id,created,updated"] | |
1049 | |
1050 query_order = self._appendSourcesAndFilters(query, args, authorized_groups, unrestricted, ext_data) | |
1051 | |
1052 if 'rsm' in ext_data: | |
1053 rsm = ext_data['rsm'] | |
1054 maxItems = rsm.max | |
1055 if rsm.index is not None: | |
1056 # We need to know the item_id of corresponding to the index (offset) of the current query | |
1057 # so we execute the query to look for the item_id | |
1058 tmp_query = query[:] | |
1059 tmp_args = args[:] | |
1060 tmp_query[0] = "SELECT item_id" | |
1061 tmp_query.append("{} LIMIT 1 OFFSET %s".format(query_order)) | |
1062 tmp_args.append(rsm.index) | |
1063 cursor.execute(' '.join(query), args) | |
1064 # FIXME: bad index is not managed yet | |
1065 item_id = cursor.fetchall()[0][0] | |
1066 | |
1067 # now that we have the id, we can use it | |
1068 query.append("AND item_id<=%s") | |
1069 args.append(item_id) | |
1070 elif rsm.before is not None: | |
1071 if rsm.before != '': | |
1072 query.append("AND item_id>(SELECT item_id FROM items WHERE item=%s LIMIT 1)") | |
1073 args.append(rsm.before) | |
1074 if maxItems is not None: | |
1075 # if we have maxItems (i.e. a limit), we need to reverse order | |
1076 # in a first query to get the right items | |
1077 query.insert(0,"SELECT * from (") | |
1078 query.append("ORDER BY item_id ASC LIMIT %s) as x") | |
1079 args.append(maxItems) | |
1080 elif rsm.after: | |
1081 query.append("AND item_id<(SELECT item_id FROM items WHERE item=%s LIMIT 1)") | |
1082 args.append(rsm.after) | |
1083 | |
1084 query.append(query_order) | |
1085 | |
1086 if maxItems is not None: | |
1087 query.append("LIMIT %s") | |
1088 args.append(maxItems) | |
1089 | |
1090 cursor.execute(' '.join(query), args) | |
1091 | |
1092 result = cursor.fetchall() | |
1093 if unrestricted and not ids_only: | |
1094 # with unrestricted query, we need to fill the access_list for a roster access items | |
1095 ret = [] | |
1096 for item_data in result: | |
1097 item = generic.stripNamespace(parseXml(item_data.data)) | |
1098 access_model = item_data.access_model | |
1099 item_id = item_data.item_id | |
1100 created = item_data.created | |
1101 updated = item_data.updated | |
1102 access_list = {} | |
1103 if access_model == const.VAL_AMODEL_PUBLISHER_ROSTER: | |
1104 cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,)) | |
1105 access_list[const.OPT_ROSTER_GROUPS_ALLOWED] = [r.groupname for r in cursor.fetchall()] | |
1106 | |
1107 ret.append(container.ItemData(item, access_model, access_list, created=created, updated=updated)) | |
1108 # TODO: whitelist item access model | |
1109 return ret | |
1110 | |
1111 if ids_only: | |
1112 return [r.item for r in result] | |
1113 else: | |
1114 items_data = [container.ItemData(generic.stripNamespace(parseXml(r.data)), r.access_model, created=r.created, updated=r.updated) for r in result] | |
1115 return items_data | |
1116 | |
1117 def getItemsById(self, authorized_groups, unrestricted, itemIdentifiers): | |
1118 """Get items which are in the given list | |
1119 | |
1120 @param authorized_groups: we want to get items that these groups can access | |
1121 @param unrestricted: if true, don't check permissions | |
1122 @param itemIdentifiers: list of ids of the items we want to get | |
1123 @return: list of container.ItemData | |
1124 ItemData.config will contains access_list (managed as a dictionnary with same key as for item_config) | |
1125 if unrestricted is False, access_model and config will be None | |
1126 """ | |
1127 return self.dbpool.runInteraction(self._getItemsById, authorized_groups, unrestricted, itemIdentifiers) | |
1128 | |
1129 def _getItemsById(self, cursor, authorized_groups, unrestricted, itemIdentifiers): | |
1130 self._checkNodeExists(cursor) | |
1131 ret = [] | |
1132 if unrestricted: #we get everything without checking permissions | |
1133 for itemIdentifier in itemIdentifiers: | |
1134 cursor.execute("""SELECT data::text,items.access_model,item_id,created,updated FROM nodes | |
1135 INNER JOIN items USING (node_id) | |
1136 WHERE node_id=%s AND item=%s""", | |
1137 (self.nodeDbId, | |
1138 itemIdentifier)) | |
1139 result = cursor.fetchone() | |
1140 if not result: | |
1141 raise error.ItemNotFound() | |
1142 | |
1143 item = generic.stripNamespace(parseXml(result[0])) | |
1144 access_model = result[1] | |
1145 item_id = result[2] | |
1146 created= result[3] | |
1147 updated= result[4] | |
1148 access_list = {} | |
1149 if access_model == const.VAL_AMODEL_PUBLISHER_ROSTER: | |
1150 cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,)) | |
1151 access_list[const.OPT_ROSTER_GROUPS_ALLOWED] = [r[0] for r in cursor.fetchall()] | |
1152 #TODO: WHITELIST access_model | |
1153 | |
1154 ret.append(container.ItemData(item, access_model, access_list, created=created, updated=updated)) | |
1155 else: #we check permission before returning items | |
1156 for itemIdentifier in itemIdentifiers: | |
1157 args = [self.nodeDbId, itemIdentifier] | |
1158 if authorized_groups: | |
1159 args.append(authorized_groups) | |
1160 cursor.execute("""SELECT data::text, created, updated FROM nodes | |
1161 INNER JOIN items USING (node_id) | |
1162 LEFT JOIN item_groups_authorized USING (item_id) | |
1163 WHERE node_id=%s AND item=%s AND | |
1164 (items.access_model='open' """ + | |
1165 ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') + ")", | |
1166 args) | |
1167 | |
1168 result = cursor.fetchone() | |
1169 if result: | |
1170 ret.append(container.ItemData(generic.stripNamespace(parseXml(result[0])), created=result[1], updated=result[2])) | |
1171 | |
1172 return ret | |
1173 | |
1174 def getItemsCount(self, authorized_groups, unrestricted, ext_data=None): | |
1175 """Count expected number of items in a getItems query | |
1176 | |
1177 @param authorized_groups: we want to get items that these groups can access | |
1178 @param unrestricted: if true, don't check permissions (i.e.: get all items) | |
1179 @param ext_data: options for extra features like RSM and MAM | |
1180 """ | |
1181 if ext_data is None: | |
1182 ext_data = {} | |
1183 return self.dbpool.runInteraction(self._getItemsCount, authorized_groups, unrestricted, ext_data) | |
1184 | |
1185 def _getItemsCount(self, cursor, authorized_groups, unrestricted, ext_data): | |
1186 self._checkNodeExists(cursor) | |
1187 args = [] | |
1188 | |
1189 # SELECT | |
1190 query = ["SELECT count(1)"] | |
1191 | |
1192 self._appendSourcesAndFilters(query, args, authorized_groups, unrestricted, ext_data) | |
1193 | |
1194 cursor.execute(' '.join(query), args) | |
1195 return cursor.fetchall()[0][0] | |
1196 | |
1197 def getItemsIndex(self, item_id, authorized_groups, unrestricted, ext_data=None): | |
1198 """Get expected index of first item in the window of a getItems query | |
1199 | |
1200 @param item_id: id of the item | |
1201 @param authorized_groups: we want to get items that these groups can access | |
1202 @param unrestricted: if true, don't check permissions (i.e.: get all items) | |
1203 @param ext_data: options for extra features like RSM and MAM | |
1204 """ | |
1205 if ext_data is None: | |
1206 ext_data = {} | |
1207 return self.dbpool.runInteraction(self._getItemsIndex, item_id, authorized_groups, unrestricted, ext_data) | |
1208 | |
1209 def _getItemsIndex(self, cursor, item_id, authorized_groups, unrestricted, ext_data): | |
1210 self._checkNodeExists(cursor) | |
1211 args = [] | |
1212 | |
1213 # SELECT | |
1214 query = [] | |
1215 | |
1216 query_order = self._appendSourcesAndFilters(query, args, authorized_groups, unrestricted, ext_data) | |
1217 | |
1218 query_select = "SELECT row_number from (SELECT row_number() OVER ({}), item".format(query_order) | |
1219 query.insert(0, query_select) | |
1220 query.append(") as x WHERE item=%s") | |
1221 args.append(item_id) | |
1222 | |
1223 cursor.execute(' '.join(query), args) | |
1224 # XXX: row_number start at 1, but we want that index start at 0 | |
1225 try: | |
1226 return cursor.fetchall()[0][0] - 1 | |
1227 except IndexError: | |
1228 raise error.NodeNotFound() | |
1229 | |
1230 def getItemsPublishers(self, itemIdentifiers): | |
1231 """Get the publishers for all given identifiers | |
1232 | |
1233 @return (dict[unicode, jid.JID]): map of itemIdentifiers to publisher | |
1234 if item is not found, key is skipped in resulting dict | |
1235 """ | |
1236 return self.dbpool.runInteraction(self._getItemsPublishers, itemIdentifiers) | |
1237 | |
1238 def _getItemsPublishers(self, cursor, itemIdentifiers): | |
1239 self._checkNodeExists(cursor) | |
1240 ret = {} | |
1241 for itemIdentifier in itemIdentifiers: | |
1242 cursor.execute("""SELECT publisher FROM items | |
1243 WHERE item=%s""", | |
1244 (itemIdentifier,)) | |
1245 result = cursor.fetchone() | |
1246 if result: | |
1247 ret[itemIdentifier] = jid.JID(result[0]) | |
1248 return ret | |
1249 | |
1250 def purge(self): | |
1251 return self.dbpool.runInteraction(self._purge) | |
1252 | |
1253 def _purge(self, cursor): | |
1254 self._checkNodeExists(cursor) | |
1255 | |
1256 cursor.execute("""DELETE FROM items WHERE | |
1257 node_id=%s""", | |
1258 (self.nodeDbId,)) | |
1259 | |
1260 | |
1261 class CollectionNode(Node): | |
1262 | |
1263 nodeType = 'collection' | |
1264 | |
1265 | |
1266 | |
1267 class GatewayStorage(object): | |
1268 """ | |
1269 Memory based storage facility for the XMPP-HTTP gateway. | |
1270 """ | |
1271 | |
1272 def __init__(self, dbpool): | |
1273 self.dbpool = dbpool | |
1274 | |
1275 def _countCallbacks(self, cursor, service, nodeIdentifier): | |
1276 """ | |
1277 Count number of callbacks registered for a node. | |
1278 """ | |
1279 cursor.execute("""SELECT count(*) FROM callbacks | |
1280 WHERE service=%s and node=%s""", | |
1281 (service.full(), | |
1282 nodeIdentifier)) | |
1283 results = cursor.fetchall() | |
1284 return results[0][0] | |
1285 | |
1286 def addCallback(self, service, nodeIdentifier, callback): | |
1287 def interaction(cursor): | |
1288 cursor.execute("""SELECT 1 as bool FROM callbacks | |
1289 WHERE service=%s and node=%s and uri=%s""", | |
1290 (service.full(), | |
1291 nodeIdentifier, | |
1292 callback)) | |
1293 if cursor.fetchall(): | |
1294 return | |
1295 | |
1296 cursor.execute("""INSERT INTO callbacks | |
1297 (service, node, uri) VALUES | |
1298 (%s, %s, %s)""", | |
1299 (service.full(), | |
1300 nodeIdentifier, | |
1301 callback)) | |
1302 | |
1303 return self.dbpool.runInteraction(interaction) | |
1304 | |
1305 def removeCallback(self, service, nodeIdentifier, callback): | |
1306 def interaction(cursor): | |
1307 cursor.execute("""DELETE FROM callbacks | |
1308 WHERE service=%s and node=%s and uri=%s""", | |
1309 (service.full(), | |
1310 nodeIdentifier, | |
1311 callback)) | |
1312 | |
1313 if cursor.rowcount != 1: | |
1314 raise error.NotSubscribed() | |
1315 | |
1316 last = not self._countCallbacks(cursor, service, nodeIdentifier) | |
1317 return last | |
1318 | |
1319 return self.dbpool.runInteraction(interaction) | |
1320 | |
1321 def getCallbacks(self, service, nodeIdentifier): | |
1322 def interaction(cursor): | |
1323 cursor.execute("""SELECT uri FROM callbacks | |
1324 WHERE service=%s and node=%s""", | |
1325 (service.full(), | |
1326 nodeIdentifier)) | |
1327 results = cursor.fetchall() | |
1328 | |
1329 if not results: | |
1330 raise error.NoCallbacks() | |
1331 | |
1332 return [result[0] for result in results] | |
1333 | |
1334 return self.dbpool.runInteraction(interaction) | |
1335 | |
1336 def hasCallbacks(self, service, nodeIdentifier): | |
1337 def interaction(cursor): | |
1338 return bool(self._countCallbacks(cursor, service, nodeIdentifier)) | |
1339 | |
1340 return self.dbpool.runInteraction(interaction) |