comparison src/pgsql_storage.py @ 369:dabee42494ac

config file + cleaning: - SàT Pubsub can now be configured using the same config file as SàT itself (i.e. sat.conf or .sat.conf), in the same locations (/etc, local dir, xdg dir). Its options must be in the "pubsub" section - options on command line override config options - removed tap and http files which are not used anymore - changed directory structure to put source in src, to be coherent with SàT and Libervia - changed options name, db* become db_*, secret become xmpp_pwd - an exception is raised if jid or xmpp_pwd are not configured
author Goffi <goffi@goffi.org>
date Fri, 02 Mar 2018 12:59:38 +0100
parents sat_pubsub/pgsql_storage.py@618a92080812
children 40e5edd7ea11
comparison
equal deleted inserted replaced
368:618a92080812 369:dabee42494ac
1 #!/usr/bin/python
2 #-*- coding: utf-8 -*-
3
4 # Copyright (c) 2012-2018 Jérôme Poisson
5 # Copyright (c) 2013-2016 Adrien Cossa
6 # Copyright (c) 2003-2011 Ralph Meijer
7
8
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU Affero General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU Affero General Public License for more details.
18
19 # You should have received a copy of the GNU Affero General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 # --
22
23 # This program is based on Idavoll (http://idavoll.ik.nu/),
24 # originally written by Ralph Meijer (http://ralphm.net/blog/)
25 # It is sublicensed under AGPL v3 (or any later version) as allowed by the original
26 # license.
27
28 # --
29
30 # Here is a copy of the original license:
31
32 # Copyright (c) 2003-2011 Ralph Meijer
33
34 # Permission is hereby granted, free of charge, to any person obtaining
35 # a copy of this software and associated documentation files (the
36 # "Software"), to deal in the Software without restriction, including
37 # without limitation the rights to use, copy, modify, merge, publish,
38 # distribute, sublicense, and/or sell copies of the Software, and to
39 # permit persons to whom the Software is furnished to do so, subject to
40 # the following conditions:
41
42 # The above copyright notice and this permission notice shall be
43 # included in all copies or substantial portions of the Software.
44
45 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
46 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
47 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
48 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
49 # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
50 # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
51 # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
52
53
54 import copy, logging
55
56 from zope.interface import implements
57
58 from twisted.internet import reactor
59 from twisted.internet import defer
60 from twisted.words.protocols.jabber import jid
61 from twisted.python import log
62
63 from wokkel import generic
64 from wokkel.pubsub import Subscription
65
66 from sat_pubsub import error
67 from sat_pubsub import iidavoll
68 from sat_pubsub import const
69 from sat_pubsub import container
70 from sat_pubsub import exceptions
71 import uuid
72 import psycopg2
73 import psycopg2.extensions
# we want psycopg2 to return unicode objects instead of str,
# for plain values as well as for arrays
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY)


def parseXml(unicode_data):
    """Parse XML received as unicode

    generic.parseXml manages str but we get unicode from the database,
    so data are encoded to UTF-8 before being parsed.

    @param unicode_data(unicode): serialised XML
    @return: whatever generic.parseXml returns for the encoded data
    """
    return generic.parseXml(unicode_data.encode('utf-8'))


# name template of the sequence used to generate serial item ids of a node
ITEMS_SEQ_NAME = u'node_{node_id}_seq'
# column holding the bare jid of the owner for PEP nodes (NULL otherwise)
PEP_COL_NAME = 'pep'
# schema version expected in the "metadata" table
CURRENT_VERSION = '4'
# retrieve the maximum integer item id + 1 (only items whose id is made of
# digits are taken into account), defaulting to 1
NEXT_ITEM_ID_QUERY = r"SELECT COALESCE(max(item::integer)+1,1) as val from items where node_id={node_id} and item ~ E'^\\d+$'"
85
86
def withPEP(query, values, pep, recipient):
    """Complete a query with the condition on the PEP column

    @param query: SQL query basis, the PEP condition is appended with "AND"
    @param values: current values to replace in query
    @param pep(bool): True if we are in PEP mode
    @param recipient(jid.JID): jid of the recipient, only used in PEP mode
    @return (tuple): query with the PEP check appended, and the values to use:
        - in PEP mode, a new list made of values + recipient's bare jid
        - else values itself, untouched
    """
    if not pep:
        # non PEP nodes are the ones with no owner in the PEP column
        null_check = "AND {} IS NULL".format(PEP_COL_NAME)
        return "{} {}".format(query, null_check), values

    owner_check = "AND {}=%s".format(PEP_COL_NAME)
    pep_values = list(values) + [recipient.userhost()]
    return "{} {}".format(query, owner_check), pep_values
103
104
class Storage:
    """PostgreSQL implementation of iidavoll.IStorage

    All database accesses go through dbpool (runQuery and runInteraction).
    Public methods return what the pool returns, callbacks being chained on it
    when the result needs to be post-processed.
    """

    implements(iidavoll.IStorage)

    defaultConfig = {
        'leaf': {
            const.OPT_PERSIST_ITEMS: True,
            const.OPT_DELIVER_PAYLOADS: True,
            const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub',
            const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT,
            const.OPT_PUBLISH_MODEL: const.VAL_PMODEL_DEFAULT,
            const.OPT_SERIAL_IDS: False,
        },
        'collection': {
            const.OPT_DELIVER_PAYLOADS: True,
            const.OPT_SEND_LAST_PUBLISHED_ITEM: 'on_sub',
            const.OPT_ACCESS_MODEL: const.VAL_AMODEL_DEFAULT,
            const.OPT_PUBLISH_MODEL: const.VAL_PMODEL_DEFAULT,
        }
    }

    # columns are selected in the order _buildNode reads them (row[0] to row[9])
    _NODE_SELECT = """SELECT node_id,
                             node,
                             node_type,
                             persist_items,
                             deliver_payloads,
                             send_last_published_item,
                             access_model,
                             publish_model,
                             serial_ids,
                             schema::text,
                             pep
                      FROM nodes"""

    def __init__(self, dbpool):
        """
        @param dbpool: connection pool used for every query,
            the schema version check is launched immediately
        """
        self.dbpool = dbpool
        d = self.dbpool.runQuery("SELECT value FROM metadata WHERE key='version'")
        d.addCallbacks(self._checkVersion, self._versionEb)

    def _checkVersion(self, row):
        """Stop the reactor if the schema version is not CURRENT_VERSION

        @param row(list): rows returned by the version query
        """
        if not row:
            # without this check, the IndexError would be raised in the callback
            # (not seen by _versionEb) and the reactor would keep running
            logging.error("Can't check schema version: no version found in metadata")
            reactor.stop()
            return

        version = row[0].value
        if version != CURRENT_VERSION:
            logging.error("Bad database schema version ({current}), please upgrade to {needed}".format(
                current=version, needed=CURRENT_VERSION))
            reactor.stop()

    def _versionEb(self, failure):
        """Log the failure of the version query and stop the reactor"""
        logging.error("Can't check schema version: {reason}".format(reason=failure))
        reactor.stop()

    def _buildNode(self, row):
        """Build a node instance from a database result row

        @param row: row with the columns of _NODE_SELECT, or a falsy value
        @return (LeafNode, CollectionNode): node with its dbpool set
        @raise error.NodeNotFound: row is empty
        @raise ValueError: node type is neither "leaf" nor "collection"
        """
        if not row:
            raise error.NodeNotFound()

        node_type = row[2]

        if node_type == 'leaf':
            configuration = {
                'pubsub#persist_items': row[3],
                'pubsub#deliver_payloads': row[4],
                'pubsub#send_last_published_item': row[5],
                const.OPT_ACCESS_MODEL: row[6],
                const.OPT_PUBLISH_MODEL: row[7],
                const.OPT_SERIAL_IDS: row[8],
            }
            schema = row[9]
            if schema is not None:
                schema = parseXml(schema)
            node = LeafNode(row[0], row[1], configuration, schema)
        elif node_type == 'collection':
            configuration = {
                'pubsub#deliver_payloads': row[4],
                'pubsub#send_last_published_item': row[5],
                const.OPT_ACCESS_MODEL: row[6],
                const.OPT_PUBLISH_MODEL: row[7],
            }
            node = CollectionNode(row[0], row[1], configuration, None)
        else:
            raise ValueError("Unknown node type !")

        node.dbpool = self.dbpool
        return node

    def getNodeById(self, nodeDbId):
        """Get node using database ID instead of pubsub identifier

        @param nodeDbId(unicode): database ID
        """
        return self.dbpool.runInteraction(self._getNodeById, nodeDbId)

    def _getNodeById(self, cursor, nodeDbId):
        cursor.execute(self._NODE_SELECT + " WHERE node_id=%s", (nodeDbId,))
        row = cursor.fetchone()
        return self._buildNode(row)

    def getNode(self, nodeIdentifier, pep, recipient=None):
        """Get node using its pubsub identifier

        @param nodeIdentifier(unicode): pubsub identifier of the node
        @param pep(bool): True if we are in PEP mode
        @param recipient(jid.JID, None): jid of the recipient (used in PEP mode)
        """
        return self.dbpool.runInteraction(self._getNode, nodeIdentifier, pep, recipient)

    def _getNode(self, cursor, nodeIdentifier, pep, recipient):
        cursor.execute(*withPEP(self._NODE_SELECT + " WHERE node=%s",
                                (nodeIdentifier,), pep, recipient))
        row = cursor.fetchone()
        return self._buildNode(row)

    def getNodeIds(self, pep, recipient, allowed_accesses=None):
        """retrieve ids of existing nodes

        @param pep(bool): True if it's a PEP request
        @param recipient(jid.JID, None): recipient of the PEP request
        @param allowed_accesses(None, set): only nodes with access
            in this set will be returned
            None to return all nodes
        @return (list[unicode]): ids of nodes
        """
        if pep:
            query = "SELECT node from nodes WHERE pep=%s"
            values = [recipient.userhost()]
        else:
            query = "SELECT node from nodes WHERE pep is NULL"
            values = []

        if allowed_accesses is not None:
            if not allowed_accesses:
                # no access model is allowed so no node can match,
                # and "IN ()" is not valid SQL anyway
                return defer.succeed([])
            # the leading space is mandatory, else the clause is glued
            # to the end of the WHERE condition ("... NULLAND ...")
            query += " AND access_model IN %s"
            values.append(tuple(allowed_accesses))

        d = self.dbpool.runQuery(query, values)
        d.addCallback(lambda results: [r[0] for r in results])
        return d

    def createNode(self, nodeIdentifier, owner, config, schema, pep, recipient=None):
        """Create a leaf node owned by owner

        @param nodeIdentifier(unicode): pubsub identifier of the new node
        @param owner(jid.JID): entity which will get the "owner" affiliation
        @param config: configuration of the node
        @param schema: value stored in the "schema" column
        @param pep(bool): True if we are in PEP mode
        @param recipient(jid.JID, None): jid of the recipient (used in PEP mode)
        """
        return self.dbpool.runInteraction(self._createNode, nodeIdentifier,
                                          owner, config, schema, pep, recipient)

    def _createNode(self, cursor, nodeIdentifier, owner, config, schema, pep, recipient):
        """Insert the node, its owner affiliation and its authorized groups

        @raise error.NoCollections: requested node type is not "leaf"
        @raise error.NodeExists: the node already exists (unique violation)
        @raise error.InvalidConfigurationOption: any other integrity error
            raised while inserting the node
        """
        if config['pubsub#node_type'] != 'leaf':
            raise error.NoCollections()

        owner = owner.userhost()

        try:
            cursor.execute("""INSERT INTO nodes
                              (node,
                               node_type,
                               persist_items,
                               deliver_payloads,
                               send_last_published_item,
                               access_model,
                               publish_model,
                               serial_ids,
                               schema,
                               pep)
                              VALUES
                              (%s, 'leaf', %s, %s, %s, %s, %s, %s, %s, %s)""",
                           (nodeIdentifier,
                            config['pubsub#persist_items'],
                            config['pubsub#deliver_payloads'],
                            config['pubsub#send_last_published_item'],
                            config[const.OPT_ACCESS_MODEL],
                            config[const.OPT_PUBLISH_MODEL],
                            config[const.OPT_SERIAL_IDS],
                            schema,
                            recipient.userhost() if pep else None
                            )
                           )
        except cursor._pool.dbapi.IntegrityError as e:
            if e.pgcode == "23505":
                # unique_violation
                raise error.NodeExists()
            else:
                raise error.InvalidConfigurationOption()

        cursor.execute(*withPEP("""SELECT node_id FROM nodes WHERE node=%s""",
                                (nodeIdentifier,), pep, recipient))
        node_id = cursor.fetchone()[0]

        cursor.execute("""SELECT 1 as bool from entities where jid=%s""",
                       (owner,))

        if not cursor.fetchone():
            # XXX: we can NOT rely on the previous query: the entity may still
            #      exist when the INSERT is executed, in which case the
            #      transaction must be rolled back. The commit is done first,
            #      presumably so the rollback doesn't undo the node insertion.
            cursor.connection.commit()
            try:
                cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
                               (owner,))
            except psycopg2.IntegrityError as e:
                cursor.connection.rollback()
                logging.warning("during node creation: %s" % e.message)

        cursor.execute("""INSERT INTO affiliations
                          (node_id, entity_id, affiliation)
                          SELECT %s, entity_id, 'owner' FROM
                          (SELECT entity_id FROM entities
                                            WHERE jid=%s) as e""",
                       (node_id, owner))

        if config[const.OPT_ACCESS_MODEL] == const.VAL_AMODEL_PUBLISHER_ROSTER:
            if const.OPT_ROSTER_GROUPS_ALLOWED in config:
                allowed_groups = config[const.OPT_ROSTER_GROUPS_ALLOWED]
            else:
                allowed_groups = []
            for group in allowed_groups:
                # TODO: check that group are actually in roster
                cursor.execute("""INSERT INTO node_groups_authorized (node_id, groupname)
                                  VALUES (%s,%s)""", (node_id, group))
        # XXX: affiliations can't be set on during node creation (at least not with XEP-0060 alone)
        #      so whitelist affiliations need to be done afterward

        # now we may have to do extra things according to config options
        default_conf = self.defaultConfig['leaf']
        # XXX: trigger works on node creation because OPT_SERIAL_IDS is False in defaultConfig
        #      if this value is changed, the _configurationTriggers method should be adapted.
        Node._configurationTriggers(cursor, node_id, default_conf, config)

    def deleteNodeByDbId(self, db_id):
        """Delete a node using directly its database id"""
        return self.dbpool.runInteraction(self._deleteNodeByDbId, db_id)

    def _deleteNodeByDbId(self, cursor, db_id):
        """
        @raise error.NodeNotFound: the query didn't delete exactly one row
        """
        cursor.execute("""DELETE FROM nodes WHERE node_id=%s""",
                       (db_id,))

        if cursor.rowcount != 1:
            raise error.NodeNotFound()

    def deleteNode(self, nodeIdentifier, pep, recipient=None):
        """Delete a node using its pubsub identifier

        @param nodeIdentifier(unicode): pubsub identifier of the node
        @param pep(bool): True if we are in PEP mode
        @param recipient(jid.JID, None): jid of the recipient (used in PEP mode)
        """
        return self.dbpool.runInteraction(self._deleteNode, nodeIdentifier, pep, recipient)

    def _deleteNode(self, cursor, nodeIdentifier, pep, recipient):
        """
        @raise error.NodeNotFound: the query didn't delete exactly one row
        """
        cursor.execute(*withPEP("""DELETE FROM nodes WHERE node=%s""",
                                (nodeIdentifier,), pep, recipient))

        if cursor.rowcount != 1:
            raise error.NodeNotFound()

    def getAffiliations(self, entity, nodeIdentifier, pep, recipient=None):
        """retrieve affiliations of an entity

        @param entity(jid.JID): entity to check (its bare jid is used)
        @param nodeIdentifier(unicode, None): node identifier
            None to retrieve affiliations on all nodes
        @param pep(bool): True if we are in PEP mode
        @param recipient(jid.JID, None): jid of the recipient (used in PEP mode)
        """
        return self.dbpool.runInteraction(self._getAffiliations, entity, nodeIdentifier, pep, recipient)

    def _getAffiliations(self, cursor, entity, nodeIdentifier, pep, recipient=None):
        """
        @return (list[tuple]): (node, affiliation) tuples
        """
        query = ["""SELECT node, affiliation FROM entities
                    NATURAL JOIN affiliations
                    NATURAL JOIN nodes
                    WHERE jid=%s"""]
        args = [entity.userhost()]

        if nodeIdentifier is not None:
            query.append("AND node=%s")
            args.append(nodeIdentifier)

        cursor.execute(*withPEP(' '.join(query), args, pep, recipient))
        rows = cursor.fetchall()
        return [tuple(r) for r in rows]

    def getSubscriptions(self, entity, nodeIdentifier=None, pep=False, recipient=None):
        """retrieve subscriptions of an entity

        @param entity(jid.JID): entity to check
        @param nodeIdentifier(unicode, None): node identifier
            None to retrieve all subscriptions
        @param pep: True if we are in PEP mode
        @param recipient: jid of the recipient
        @return: result of runQuery, with a callback converting the rows
            to a list of Subscription
        """

        def toSubscriptions(rows):
            subscriptions = []
            for row in rows:
                subscriber = jid.internJID('%s/%s' % (row.jid,
                                                      row.resource))
                subscription = Subscription(row.node, subscriber, row.state)
                subscriptions.append(subscription)
            return subscriptions

        query = ["""SELECT node,
                           jid,
                           resource,
                           state
                    FROM entities
                    NATURAL JOIN subscriptions
                    NATURAL JOIN nodes
                    WHERE jid=%s"""]

        args = [entity.userhost()]

        if nodeIdentifier is not None:
            query.append("AND node=%s")
            args.append(nodeIdentifier)

        d = self.dbpool.runQuery(*withPEP(' '.join(query), args, pep, recipient))
        d.addCallback(toSubscriptions)
        return d

    def getDefaultConfiguration(self, nodeType):
        """Return a (shallow) copy of the default configuration

        @param nodeType(str): key of defaultConfig ("leaf" or "collection")
        """
        return self.defaultConfig[nodeType].copy()

    def formatLastItems(self, result):
        """Convert the rows of getLastItems query

        @param result: rows of (pep jid, node, item data, item access model)
        @return (list[tuple]): for each row:
            (pep jid as jid.JID, node, parsed item with namespace stripped,
             item access model)
        """
        last_items = []
        for pep_jid_s, node, data, item_access_model in result:
            pep_jid = jid.JID(pep_jid_s)
            item = generic.stripNamespace(parseXml(data))
            last_items.append((pep_jid, node, item, item_access_model))
        return last_items

    def getLastItems(self, entities, nodes, node_accesses, item_accesses, pep):
        """get last item for several nodes and entities in a single request

        @param entities(list[jid.JID]): entities owning the PEP nodes
        @param nodes(tuple): identifiers of the nodes to check
        @param node_accesses(tuple): accepted access models of nodes,
            only ('open',) is handled
        @param item_accesses(tuple): accepted access models of items,
            only ('open',) is handled
        @param pep(bool): must be True
        @raise ValueError: one of entities, nodes or accesses is empty
        @raise NotImplementedError: unhandled access model, or pep is not set
        """
        if not entities or not nodes or not node_accesses or not item_accesses:
            raise ValueError("entities, nodes and accesses must not be empty")
        if node_accesses != ('open',) or item_accesses != ('open',):
            raise NotImplementedError('only "open" access model is handled for now')
        if not pep:
            raise NotImplementedError(u"getLastItems is only implemented for PEP at the moment")
        d = self.dbpool.runQuery("""SELECT DISTINCT ON (node_id) pep, node, data::text, items.access_model
                                    FROM items
                                    NATURAL JOIN nodes
                                    WHERE nodes.pep IN %s
                                    AND node IN %s
                                    AND nodes.access_model in %s
                                    AND items.access_model in %s
                                    ORDER BY node_id DESC, item_id DESC""",
                                 (tuple([e.userhost() for e in entities]),
                                  nodes,
                                  node_accesses,
                                  item_accesses))
        d.addCallback(self.formatLastItems)
        return d
447
448
class Node:
    """Base class of the nodes stored in PostgreSQL (iidavoll.INode)

    dbpool is not set by __init__: it must be assigned on the instance before
    any method using the database is called (Storage._buildNode does it).
    nodeType is expected to be defined by subclasses.
    """

    implements(iidavoll.INode)

    def __init__(self, nodeDbId, nodeIdentifier, config, schema):
        """
        @param nodeDbId: database ID of the node
        @param nodeIdentifier(unicode): pubsub identifier of the node
        @param config(dict): configuration of the node
        @param schema: schema of the node (may be None)
        """
        self.nodeDbId = nodeDbId
        self.nodeIdentifier = nodeIdentifier
        self._config = config
        self._schema = schema

    def _checkNodeExists(self, cursor):
        """
        @raise error.NodeNotFound: no row has this node's database ID
        """
        cursor.execute("""SELECT 1 as exist FROM nodes WHERE node_id=%s""",
                       (self.nodeDbId,))
        if not cursor.fetchone():
            raise error.NodeNotFound()

    def getType(self):
        return self.nodeType

    def getOwners(self):
        """
        @return: result of runQuery, with a callback converting the rows
            to a list of jid.JID of entities with "owner" affiliation
        """
        d = self.dbpool.runQuery("""SELECT jid FROM nodes NATURAL JOIN affiliations NATURAL JOIN entities WHERE node_id=%s and affiliation='owner'""", (self.nodeDbId,))
        d.addCallback(lambda rows: [jid.JID(r[0]) for r in rows])
        return d

    def getConfiguration(self):
        return self._config

    def getNextId(self):
        """return XMPP item id usable for next item to publish

        the return value will be next int if serial_ids is set,
        else an UUID will be returned
        """
        if self._config[const.OPT_SERIAL_IDS]:
            d = self.dbpool.runQuery("SELECT nextval('{seq_name}')".format(
                seq_name=ITEMS_SEQ_NAME.format(node_id=self.nodeDbId)))
            d.addCallback(lambda rows: unicode(rows[0][0]))
            return d
        else:
            return defer.succeed(unicode(uuid.uuid4()))

    @staticmethod
    def _configurationTriggers(cursor, node_id, old_config, new_config):
        """trigger database relative actions needed when a config is changed

        @param cursor(): current db cursor
        @param node_id(unicode): database ID of the node
        @param old_config(dict): config of the node before the change
        @param new_config(dict): new options that will be changed
        """
        serial_ids = new_config[const.OPT_SERIAL_IDS]
        if serial_ids != old_config[const.OPT_SERIAL_IDS]:
            # serial_ids option has been modified,
            # we need to handle corresponding sequence

            # XXX: we use .format in following queries because values
            #      are generated by ourself
            seq_name = ITEMS_SEQ_NAME.format(node_id=node_id)
            if serial_ids:
                # the next query get the max value +1 of all XMPP items ids
                # which are integers, and default to 1
                cursor.execute(NEXT_ITEM_ID_QUERY.format(node_id=node_id))
                next_val = cursor.fetchone()[0]
                cursor.execute("DROP SEQUENCE IF EXISTS {seq_name}".format(seq_name=seq_name))
                cursor.execute("CREATE SEQUENCE {seq_name} START {next_val} OWNED BY nodes.node_id".format(
                    seq_name=seq_name,
                    next_val=next_val))
            else:
                cursor.execute("DROP SEQUENCE IF EXISTS {seq_name}".format(seq_name=seq_name))

    def setConfiguration(self, options):
        """Update node configuration in database, then in the cached config

        @param options(dict): new values, options which are not already
            in the current configuration are ignored
        """
        config = copy.copy(self._config)

        for option in options:
            if option in config:
                config[option] = options[option]

        d = self.dbpool.runInteraction(self._setConfiguration, config)
        d.addCallback(self._setCachedConfiguration, config)
        return d

    def _setConfiguration(self, cursor, config):
        self._checkNodeExists(cursor)
        self._configurationTriggers(cursor, self.nodeDbId, self._config, config)
        cursor.execute("""UPDATE nodes SET persist_items=%s,
                                           deliver_payloads=%s,
                                           send_last_published_item=%s,
                                           access_model=%s,
                                           publish_model=%s,
                                           serial_ids=%s
                          WHERE node_id=%s""",
                       (config[const.OPT_PERSIST_ITEMS],
                        config[const.OPT_DELIVER_PAYLOADS],
                        config[const.OPT_SEND_LAST_PUBLISHED_ITEM],
                        config[const.OPT_ACCESS_MODEL],
                        config[const.OPT_PUBLISH_MODEL],
                        config[const.OPT_SERIAL_IDS],
                        self.nodeDbId))

    def _setCachedConfiguration(self, void, config):
        self._config = config

    def getSchema(self):
        return self._schema

    def setSchema(self, schema):
        """Update node schema in database, then the cached one

        @param schema: new schema, a falsy value stores NULL
        """
        d = self.dbpool.runInteraction(self._setSchema, schema)
        d.addCallback(self._setCachedSchema, schema)
        return d

    def _setSchema(self, cursor, schema):
        self._checkNodeExists(cursor)
        cursor.execute("""UPDATE nodes SET schema=%s
                          WHERE node_id=%s""",
                       (schema.toXml() if schema else None,
                        self.nodeDbId))

    def _setCachedSchema(self, void, schema):
        self._schema = schema

    def getMetaData(self):
        """
        @return (dict): copy of the configuration with "pubsub#node_type" added
        """
        config = copy.copy(self._config)
        config["pubsub#node_type"] = self.nodeType
        return config

    def getAffiliation(self, entity):
        """Get affiliation of an entity on this node

        @param entity(jid.JID): entity to check (its bare jid is used)
        """
        return self.dbpool.runInteraction(self._getAffiliation, entity)

    def _getAffiliation(self, cursor, entity):
        """
        @return: affiliation, or None if the entity has no affiliation
        """
        self._checkNodeExists(cursor)
        cursor.execute("""SELECT affiliation FROM affiliations
                          NATURAL JOIN nodes
                          NATURAL JOIN entities
                          WHERE node_id=%s AND jid=%s""",
                       (self.nodeDbId,
                        entity.userhost()))

        row = cursor.fetchone()
        return None if row is None else row[0]

    def getAccessModel(self):
        return self._config[const.OPT_ACCESS_MODEL]

    def getSubscription(self, subscriber):
        """Get subscription of a subscriber on this node

        @param subscriber(jid.JID): subscriber, a missing resource
            is handled as an empty one
        """
        return self.dbpool.runInteraction(self._getSubscription, subscriber)

    def _getSubscription(self, cursor, subscriber):
        """
        @return (Subscription, None): None if there is no subscription
        """
        self._checkNodeExists(cursor)

        userhost = subscriber.userhost()
        resource = subscriber.resource or ''

        cursor.execute("""SELECT state FROM subscriptions
                          NATURAL JOIN nodes
                          NATURAL JOIN entities
                          WHERE node_id=%s AND jid=%s AND resource=%s""",
                       (self.nodeDbId,
                        userhost,
                        resource))

        row = cursor.fetchone()
        if not row:
            return None
        else:
            return Subscription(self.nodeIdentifier, subscriber, row[0])

    def getSubscriptions(self, state=None):
        """Get subscriptions to this node

        @param state(unicode, None): only return subscriptions in this state
            falsy value to return all subscriptions
        """
        return self.dbpool.runInteraction(self._getSubscriptions, state)

    def _getSubscriptions(self, cursor, state):
        """
        @return (list[Subscription]): subscriptions with their options
            ("pubsub#subscription_type" and "pubsub#subscription_depth",
            only when they are set)
        """
        self._checkNodeExists(cursor)

        query = """SELECT node, jid, resource, state,
                          subscription_type, subscription_depth
                   FROM subscriptions
                   NATURAL JOIN nodes
                   NATURAL JOIN entities
                   WHERE node_id=%s"""
        values = [self.nodeDbId]

        if state:
            query += " AND state=%s"
            values.append(state)

        cursor.execute(query, values)
        rows = cursor.fetchall()

        subscriptions = []
        for row in rows:
            subscriber = jid.JID(u'%s/%s' % (row.jid, row.resource))

            options = {}
            if row.subscription_type:
                options['pubsub#subscription_type'] = row.subscription_type
            if row.subscription_depth:
                options['pubsub#subscription_depth'] = row.subscription_depth

            subscriptions.append(Subscription(row.node, subscriber,
                                              row.state, options))

        return subscriptions

    def addSubscription(self, subscriber, state, config):
        """Subscribe an entity to this node

        @param subscriber(jid.JID): subscriber, a missing resource
            is handled as an empty one
        @param state: state of the subscription
        @param config: subscription options, "pubsub#subscription_type"
            and "pubsub#subscription_depth" are read with config.get
        """
        return self.dbpool.runInteraction(self._addSubscription, subscriber,
                                          state, config)

    def _addSubscription(self, cursor, subscriber, state, config):
        """
        @raise error.SubscriptionExists: integrity error while inserting
            the subscription
        """
        self._checkNodeExists(cursor)

        userhost = subscriber.userhost()
        resource = subscriber.resource or ''

        subscription_type = config.get('pubsub#subscription_type')
        subscription_depth = config.get('pubsub#subscription_depth')

        try:
            cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
                           (userhost,))
        except cursor._pool.dbapi.IntegrityError:
            # the entity already exists, the failed transaction is discarded
            cursor.connection.rollback()

        try:
            cursor.execute("""INSERT INTO subscriptions
                              (node_id, entity_id, resource, state,
                               subscription_type, subscription_depth)
                              SELECT %s, entity_id, %s, %s, %s, %s FROM
                              (SELECT entity_id FROM entities
                                                WHERE jid=%s) AS ent_id""",
                           (self.nodeDbId,
                            resource,
                            state,
                            subscription_type,
                            subscription_depth,
                            userhost))
        except cursor._pool.dbapi.IntegrityError:
            raise error.SubscriptionExists()

    def removeSubscription(self, subscriber):
        """Remove the subscription of an entity

        @param subscriber(jid.JID): subscriber, a missing resource
            is handled as an empty one
        """
        return self.dbpool.runInteraction(self._removeSubscription,
                                          subscriber)

    def _removeSubscription(self, cursor, subscriber):
        """
        @raise error.NotSubscribed: the query didn't delete exactly one row
        """
        self._checkNodeExists(cursor)

        userhost = subscriber.userhost()
        resource = subscriber.resource or ''

        cursor.execute("""DELETE FROM subscriptions WHERE
                          node_id=%s AND
                          entity_id=(SELECT entity_id FROM entities
                                                      WHERE jid=%s) AND
                          resource=%s""",
                       (self.nodeDbId,
                        userhost,
                        resource))
        if cursor.rowcount != 1:
            raise error.NotSubscribed()

        return None

    def setSubscriptions(self, subscriptions):
        """Create or update several subscriptions at once

        @param subscriptions(list[Subscription]): subscriptions to set,
            only subscriber and state are used
        """
        return self.dbpool.runInteraction(self._setSubscriptions, subscriptions)

    def _setSubscriptions(self, cursor, subscriptions):
        self._checkNodeExists(cursor)

        if not subscriptions:
            # nothing to store, and "VALUES" without any row is not valid SQL
            return

        entities = self.getOrCreateEntities(cursor, [s.subscriber for s in subscriptions])
        # entities are stored (and returned) with their bare jid
        entities_map = {jid.JID(e.jid): e for e in entities}

        # then we construct values for subscriptions update according to entity_id we just got
        values = []
        for subscription in subscriptions:
            subscriber = subscription.subscriber
            # the resource must not take part in the lookup, else subscribers
            # with a resource are never found in entities_map
            entity_id = entities_map[subscriber.userhostJID()].entity_id
            resource = subscriber.resource or u''
            values.append((self.nodeDbId, entity_id, resource, subscription.state, None, None))

        # each tuple of values is adapted by psycopg2 to a parenthesised list,
        # so a single placeholder is needed per row
        placeholders = ','.join(len(values) * ["%s"])
        # we use upsert so new values are inserted and existing one updated. This feature is only available for PostgreSQL >= 9.5
        cursor.execute("INSERT INTO subscriptions(node_id, entity_id, resource, state, subscription_type, subscription_depth) VALUES " + placeholders + " ON CONFLICT (entity_id, resource, node_id) DO UPDATE SET state=EXCLUDED.state", values)

    def isSubscribed(self, entity):
        """Tell if an entity has a subscription in "subscribed" state

        @param entity(jid.JID): entity to check (its bare jid is used)
        """
        return self.dbpool.runInteraction(self._isSubscribed, entity)

    def _isSubscribed(self, cursor, entity):
        self._checkNodeExists(cursor)

        cursor.execute("""SELECT 1 as bool FROM entities
                          NATURAL JOIN subscriptions
                          NATURAL JOIN nodes
                          WHERE entities.jid=%s
                          AND node_id=%s AND state='subscribed'""",
                       (entity.userhost(),
                        self.nodeDbId))

        return cursor.fetchone() is not None

    def getAffiliations(self):
        """Get all affiliations of this node"""
        return self.dbpool.runInteraction(self._getAffiliations)

    def _getAffiliations(self, cursor):
        """
        @return (dict): jid.JID of the entity => affiliation
        """
        self._checkNodeExists(cursor)

        cursor.execute("""SELECT jid, affiliation FROM nodes
                          NATURAL JOIN affiliations
                          NATURAL JOIN entities
                          WHERE node_id=%s""",
                       (self.nodeDbId,))
        result = cursor.fetchall()

        return {jid.internJID(r[0]): r[1] for r in result}

    def getOrCreateEntities(self, cursor, entities_jids):
        """Get entity_id from entities in entities table

        Entities will be inserted if they don't exist
        @param entities_jid(list[jid.JID]): entities to get or create,
            only the bare jids are used
        @return list[record(entity_id,jid)]]: list of entity_id and jid (as plain string)
            both existing and inserted entities are returned
        """
        # a bare jid is only sent once, even if several resources are requested
        userhosts = []
        seen = set()
        for entity_jid in entities_jids:
            userhost = entity_jid.userhost()
            if userhost not in seen:
                seen.add(userhost)
                userhosts.append(userhost)

        if not userhosts:
            # "VALUES" without any row is not valid SQL
            return []

        # cf. http://stackoverflow.com/a/35265559
        placeholders = ','.join(len(userhosts) * ["(%s)"])
        query = (
            """
            WITH
            jid_values (jid) AS (
                   VALUES {placeholders}
            ),
            inserted (entity_id, jid) AS (
                INSERT INTO entities (jid)
                SELECT jid
                FROM jid_values
                ON CONFLICT DO NOTHING
                RETURNING entity_id, jid
            )
            SELECT e.entity_id, e.jid
            FROM entities e JOIN jid_values jv ON jv.jid = e.jid
            UNION ALL
            SELECT entity_id, jid
            FROM inserted""".format(placeholders=placeholders))
        cursor.execute(query, userhosts)
        return cursor.fetchall()

    def setAffiliations(self, affiliations):
        """Create or update several affiliations at once

        @param affiliations(dict): jid.JID of the entity => affiliation
        """
        return self.dbpool.runInteraction(self._setAffiliations, affiliations)

    def _setAffiliations(self, cursor, affiliations):
        self._checkNodeExists(cursor)

        if not affiliations:
            # nothing to store, and "VALUES" without any row is not valid SQL
            return

        entities = self.getOrCreateEntities(cursor, affiliations)

        # then we construct values for affiliations update according to entity_id we just got
        values = []
        for entity in entities:
            values.extend((entity.entity_id, affiliations[jid.JID(entity.jid)], self.nodeDbId))
        # placeholders are built from the rows we actually have,
        # so they always match the number of values
        placeholders = ','.join(len(entities) * ["(%s,%s,%s)"])

        # we use upsert so new values are inserted and existing one updated. This feature is only available for PostgreSQL >= 9.5
        cursor.execute("INSERT INTO affiliations(entity_id,affiliation,node_id) VALUES " + placeholders + " ON CONFLICT  (entity_id,node_id) DO UPDATE SET affiliation=EXCLUDED.affiliation", values)

    def deleteAffiliations(self, entities):
        """Delete affiliations and subscriptions of entities on this node

        @param entities(list[jid.JID]): entities to remove (bare jids are used)
        """
        return self.dbpool.runInteraction(self._deleteAffiliations, entities)

    def _deleteAffiliations(self, cursor, entities):
        """delete affiliations and subscriptions for this entity"""
        self._checkNodeExists(cursor)

        if not entities:
            # "IN ()" is not valid SQL, and there is nothing to delete
            return

        placeholders = ','.join(len(entities) * ["%s"])
        cursor.execute("DELETE FROM affiliations WHERE node_id=%s AND entity_id in (SELECT entity_id FROM entities WHERE jid IN (" + placeholders + ")) RETURNING entity_id", [self.nodeDbId] + [e.userhost() for e in entities])

        rows = cursor.fetchall()
        if not rows:
            # no affiliation has been deleted, so no subscription is removed:
            # without this check the next query would use an empty "IN ()"
            return

        placeholders = ','.join(len(rows) * ["%s"])
        cursor.execute("DELETE FROM subscriptions WHERE node_id=%s AND entity_id in (" + placeholders + ")", [self.nodeDbId] + [r[0] for r in rows])

    def getAuthorizedGroups(self):
        """Get roster groups authorized on this node"""
        return self.dbpool.runInteraction(self._getAuthorizedGroups)

    def _getAuthorizedGroups(self, cursor):
        """
        @return (list): names of the authorized groups
        """
        # nodeDbId is the database ID, so it is matched against node_id
        # ("node" column holds the pubsub identifier)
        cursor.execute("SELECT groupname FROM node_groups_authorized NATURAL JOIN nodes WHERE node_id=%s",
                       (self.nodeDbId,))
        rows = cursor.fetchall()
        return [row[0] for row in rows]
829
830
831 class LeafNode(Node):
832
833 implements(iidavoll.ILeafNode)
834
835 nodeType = 'leaf'
836
def storeItems(self, item_data, publisher):
    """Store several items in one database interaction

    @param item_data: iterable of items data, handled one by one by _storeItem
    @param publisher(jid.JID): entity publishing the items
    @return: what dbpool.runInteraction returns
    """
    interaction = self._storeItems
    return self.dbpool.runInteraction(interaction, item_data, publisher)
839
def _storeItems(self, cursor, items_data, publisher):
    """Check that the node exists, then store each item in order

    @param cursor: current db cursor
    @param items_data: iterable of items data
    @param publisher(jid.JID): entity publishing the items
    """
    self._checkNodeExists(cursor)
    store = self._storeItem
    for single_item_data in items_data:
        store(cursor, single_item_data, publisher)
844
def _storeItem(self, cursor, item_data, publisher):
    """Insert an item in database, or update it if it already exists

    The insertion is tried first. If it fails with a unique violation:
        - if the item is not new, this is an update of the existing item
        - if the item is new and serial_ids is set, the id is recomputed
          with the next item id query, the sequence is moved accordingly
          and the insertion is retried. This is not perfect (the table is
          not locked, so it can fail if two items are added at the same
          time), but it can only happen if future ids have been set by
          a client, which should be rare enough to be acceptable
        - in other cases the exception is raised

    @param cursor: current db cursor
    @param item_data: data of the item: item, access_model, config, new and
        categories attributes are used
    @param publisher(jid.JID): entity publishing the item (full jid is stored)
    @raise exceptions.InternalError: an update didn't modify exactly one row
    """
    item = item_data.item
    access_model = item_data.access_model
    item_config = item_data.config
    data = item.toXml()

    insert_query = """INSERT INTO items (node_id, item, publisher, data, access_model)
                                         SELECT %s, %s, %s, %s, %s FROM nodes
                                                                   WHERE node_id=%s
                                                                   RETURNING item_id"""
    insert_data = [
        self.nodeDbId,
        item["id"],
        publisher.full(),
        data,
        access_model,
        self.nodeDbId,
    ]

    try:
        cursor.execute(insert_query, insert_data)
    except cursor._pool.dbapi.IntegrityError as e:
        # only unique_violation is handled, everything else is raised
        if e.pgcode != "23505":
            raise e
        # the item already exists: the failed transaction is discarded
        cursor.connection.rollback()

        if not item_data.new:
            # existing item republished: this is an update
            cursor.execute("""UPDATE items SET updated=now(), publisher=%s, data=%s
                              FROM nodes
                              WHERE nodes.node_id = items.node_id AND
                              nodes.node_id = %s and items.item=%s
                              RETURNING item_id""",
                           (publisher.full(),
                            data,
                            self.nodeDbId,
                            item["id"]))
            if cursor.rowcount != 1:
                raise exceptions.InternalError("item has not been updated correctly")
            updated_id = cursor.fetchone()[0]
            self._storeCategories(cursor, updated_id, item_data.categories, update=True)
            return

        if not self._config[const.OPT_SERIAL_IDS]:
            # a conflict on a new item without serial_ids is a real problem
            raise e

        # with serial_ids, the conflict can happen if an item has been stored
        # with a future id (generated by a XMPP client)
        cursor.execute(NEXT_ITEM_ID_QUERY.format(node_id=self.nodeDbId))
        next_id = cursor.fetchone()[0]
        # the sequence is moved, so conflicting ids are skipped
        seq_name = ITEMS_SEQ_NAME.format(node_id=self.nodeDbId)
        cursor.execute(u"SELECT setval('{seq_name}', %s)".format(seq_name=seq_name), [next_id])
        # the insertion is retried with the new id...
        item['id'] = insert_data[1] = unicode(next_id)
        # ... which must also appear in the serialised item saved in DB
        insert_data[3] = item.toXml()
        cursor.execute(insert_query, insert_data)

    item_id = cursor.fetchone()[0]
    self._storeCategories(cursor, item_id, item_data.categories)

    if access_model != const.VAL_AMODEL_PUBLISHER_ROSTER:
        return
    if const.OPT_ROSTER_GROUPS_ALLOWED not in item_config:
        # no group is allowed, there is nothing to insert
        return

    # XXX: needed to force list if there is only one value
    item_config.fields[const.OPT_ROSTER_GROUPS_ALLOWED].fieldType = 'list-multi'
    for group in item_config[const.OPT_ROSTER_GROUPS_ALLOWED]:
        # TODO: check that group are actually in roster
        cursor.execute("""INSERT INTO item_groups_authorized (item_id, groupname)
                          VALUES (%s,%s)""", (item_id, group))
    # TODO: whitelist access model
925
926 def _storeCategories(self, cursor, item_id, categories, update=False):
927 # TODO: handle canonical form
928 if update:
929 cursor.execute("""DELETE FROM item_categories
930 WHERE item_id=%s""", (item_id,))
931
932 for category in categories:
933 cursor.execute("""INSERT INTO item_categories (item_id, category)
934 VALUES (%s, %s)""", (item_id, category))
935
def removeItems(self, itemIdentifiers):
    """Delete the given items from this node.

    @param itemIdentifiers: ids of the items to delete
    @return: what dbpool.runInteraction returns for _removeItems, which
        itself returns the list of ids actually deleted
    """
    interaction = self._removeItems
    return self.dbpool.runInteraction(interaction, itemIdentifiers)
938
939 def _removeItems(self, cursor, itemIdentifiers):
940 self._checkNodeExists(cursor)
941
942 deleted = []
943
944 for itemIdentifier in itemIdentifiers:
945 cursor.execute("""DELETE FROM items WHERE
946 node_id=%s AND
947 item=%s""",
948 (self.nodeDbId,
949 itemIdentifier))
950
951 if cursor.rowcount:
952 deleted.append(itemIdentifier)
953
954 return deleted
955
def getItems(self, authorized_groups, unrestricted, maxItems=None, ext_data=None):
    """Get all authorised items

    @param authorized_groups: we want to get items that these groups can access
    @param unrestricted: if true, don't check permissions (i.e.: get all items)
    @param maxItems: nb of items we want to get
    @param ext_data: options for extra features like RSM and MAM
        (None is handled as an empty dict)

    @return: what dbpool.runInteraction returns for _getItems, which builds
        a list of container.ItemData
        the access list of an item is only filled when unrestricted is True
    """
    extra = {} if ext_data is None else ext_data
    return self.dbpool.runInteraction(
        self._getItems,
        authorized_groups,
        unrestricted,
        maxItems,
        extra,
        ids_only=False)
970
def getItemsIds(self, authorized_groups, unrestricted, maxItems=None, ext_data=None):
    """Get the ids of all authorised items

    Same query as getItems, but only the ids of the items are retrieved.

    @param authorized_groups: we want to get items that these groups can access
    @param unrestricted: if true, don't check permissions (i.e.: get all items)
    @param maxItems: nb of items we want to get
    @param ext_data: options for extra features like RSM and MAM
        (None is handled as an empty dict)

    @return: what dbpool.runInteraction returns for _getItems, which builds
        a list of ids (list(unicode))
    """
    extra = {} if ext_data is None else ext_data
    return self.dbpool.runInteraction(
        self._getItems,
        authorized_groups,
        unrestricted,
        maxItems,
        extra,
        ids_only=True)
984
985 def _appendSourcesAndFilters(self, query, args, authorized_groups, unrestricted, ext_data):
986 """append sources and filters to sql query requesting items and return ORDER BY
987
988 arguments query, args, authorized_groups, unrestricted and ext_data are the same as for
989 _getItems
990 """
991 # SOURCES
992 query.append("FROM nodes INNER JOIN items USING (node_id)")
993
994 if unrestricted:
995 query_filters = ["WHERE node_id=%s"]
996 args.append(self.nodeDbId)
997 else:
998 query.append("LEFT JOIN item_groups_authorized USING (item_id)")
999 args.append(self.nodeDbId)
1000 if authorized_groups:
1001 get_groups = " or (items.access_model='roster' and groupname in %s)"
1002 args.append(authorized_groups)
1003 else:
1004 get_groups = ""
1005
1006 query_filters = ["WHERE node_id=%s AND (items.access_model='open'" + get_groups + ")"]
1007
1008 # FILTERS
1009 if 'filters' in ext_data: # MAM filters
1010 for filter_ in ext_data['filters']:
1011 if filter_.var == 'start':
1012 query_filters.append("AND created>=%s")
1013 args.append(filter_.value)
1014 elif filter_.var == 'end':
1015 query_filters.append("AND created<=%s")
1016 args.append(filter_.value)
1017 elif filter_.var == 'with':
1018 jid_s = filter_.value
1019 if '/' in jid_s:
1020 query_filters.append("AND publisher=%s")
1021 args.append(filter_.value)
1022 else:
1023 query_filters.append("AND publisher LIKE %s")
1024 args.append(u"{}%".format(filter_.value))
1025 elif filter_.var == const.MAM_FILTER_CATEGORY:
1026 query.append("LEFT JOIN item_categories USING (item_id)")
1027 query_filters.append("AND category=%s")
1028 args.append(filter_.value)
1029 else:
1030 log.msg("WARNING: unknown filter: {}".format(filter_.encode('utf-8')))
1031
1032 query.extend(query_filters)
1033
1034 return "ORDER BY item_id DESC"
1035
1036 def _getItems(self, cursor, authorized_groups, unrestricted, maxItems, ext_data, ids_only):
1037 self._checkNodeExists(cursor)
1038
1039 if maxItems == 0:
1040 return []
1041
1042 args = []
1043
1044 # SELECT
1045 if ids_only:
1046 query = ["SELECT item"]
1047 else:
1048 query = ["SELECT data::text,items.access_model,item_id,created,updated"]
1049
1050 query_order = self._appendSourcesAndFilters(query, args, authorized_groups, unrestricted, ext_data)
1051
1052 if 'rsm' in ext_data:
1053 rsm = ext_data['rsm']
1054 maxItems = rsm.max
1055 if rsm.index is not None:
1056 # We need to know the item_id of corresponding to the index (offset) of the current query
1057 # so we execute the query to look for the item_id
1058 tmp_query = query[:]
1059 tmp_args = args[:]
1060 tmp_query[0] = "SELECT item_id"
1061 tmp_query.append("{} LIMIT 1 OFFSET %s".format(query_order))
1062 tmp_args.append(rsm.index)
1063 cursor.execute(' '.join(query), args)
1064 # FIXME: bad index is not managed yet
1065 item_id = cursor.fetchall()[0][0]
1066
1067 # now that we have the id, we can use it
1068 query.append("AND item_id<=%s")
1069 args.append(item_id)
1070 elif rsm.before is not None:
1071 if rsm.before != '':
1072 query.append("AND item_id>(SELECT item_id FROM items WHERE item=%s LIMIT 1)")
1073 args.append(rsm.before)
1074 if maxItems is not None:
1075 # if we have maxItems (i.e. a limit), we need to reverse order
1076 # in a first query to get the right items
1077 query.insert(0,"SELECT * from (")
1078 query.append("ORDER BY item_id ASC LIMIT %s) as x")
1079 args.append(maxItems)
1080 elif rsm.after:
1081 query.append("AND item_id<(SELECT item_id FROM items WHERE item=%s LIMIT 1)")
1082 args.append(rsm.after)
1083
1084 query.append(query_order)
1085
1086 if maxItems is not None:
1087 query.append("LIMIT %s")
1088 args.append(maxItems)
1089
1090 cursor.execute(' '.join(query), args)
1091
1092 result = cursor.fetchall()
1093 if unrestricted and not ids_only:
1094 # with unrestricted query, we need to fill the access_list for a roster access items
1095 ret = []
1096 for item_data in result:
1097 item = generic.stripNamespace(parseXml(item_data.data))
1098 access_model = item_data.access_model
1099 item_id = item_data.item_id
1100 created = item_data.created
1101 updated = item_data.updated
1102 access_list = {}
1103 if access_model == const.VAL_AMODEL_PUBLISHER_ROSTER:
1104 cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,))
1105 access_list[const.OPT_ROSTER_GROUPS_ALLOWED] = [r.groupname for r in cursor.fetchall()]
1106
1107 ret.append(container.ItemData(item, access_model, access_list, created=created, updated=updated))
1108 # TODO: whitelist item access model
1109 return ret
1110
1111 if ids_only:
1112 return [r.item for r in result]
1113 else:
1114 items_data = [container.ItemData(generic.stripNamespace(parseXml(r.data)), r.access_model, created=r.created, updated=r.updated) for r in result]
1115 return items_data
1116
def getItemsById(self, authorized_groups, unrestricted, itemIdentifiers):
    """Get items which are in the given list

    @param authorized_groups: we want to get items that these groups can access
    @param unrestricted: if true, don't check permissions
    @param itemIdentifiers: list of ids of the items we want to get
    @return: what dbpool.runInteraction returns for _getItemsById, which
        builds a list of container.ItemData
        if unrestricted is True, ItemData gets the access model and the access
        list (a dictionary with the same keys as for item_config)
        if unrestricted is False, none of them is given to ItemData
    """
    interaction_args = (authorized_groups, unrestricted, itemIdentifiers)
    return self.dbpool.runInteraction(self._getItemsById, *interaction_args)
1128
1129 def _getItemsById(self, cursor, authorized_groups, unrestricted, itemIdentifiers):
1130 self._checkNodeExists(cursor)
1131 ret = []
1132 if unrestricted: #we get everything without checking permissions
1133 for itemIdentifier in itemIdentifiers:
1134 cursor.execute("""SELECT data::text,items.access_model,item_id,created,updated FROM nodes
1135 INNER JOIN items USING (node_id)
1136 WHERE node_id=%s AND item=%s""",
1137 (self.nodeDbId,
1138 itemIdentifier))
1139 result = cursor.fetchone()
1140 if not result:
1141 raise error.ItemNotFound()
1142
1143 item = generic.stripNamespace(parseXml(result[0]))
1144 access_model = result[1]
1145 item_id = result[2]
1146 created= result[3]
1147 updated= result[4]
1148 access_list = {}
1149 if access_model == const.VAL_AMODEL_PUBLISHER_ROSTER:
1150 cursor.execute('SELECT groupname FROM item_groups_authorized WHERE item_id=%s', (item_id,))
1151 access_list[const.OPT_ROSTER_GROUPS_ALLOWED] = [r[0] for r in cursor.fetchall()]
1152 #TODO: WHITELIST access_model
1153
1154 ret.append(container.ItemData(item, access_model, access_list, created=created, updated=updated))
1155 else: #we check permission before returning items
1156 for itemIdentifier in itemIdentifiers:
1157 args = [self.nodeDbId, itemIdentifier]
1158 if authorized_groups:
1159 args.append(authorized_groups)
1160 cursor.execute("""SELECT data::text, created, updated FROM nodes
1161 INNER JOIN items USING (node_id)
1162 LEFT JOIN item_groups_authorized USING (item_id)
1163 WHERE node_id=%s AND item=%s AND
1164 (items.access_model='open' """ +
1165 ("or (items.access_model='roster' and groupname in %s)" if authorized_groups else '') + ")",
1166 args)
1167
1168 result = cursor.fetchone()
1169 if result:
1170 ret.append(container.ItemData(generic.stripNamespace(parseXml(result[0])), created=result[1], updated=result[2]))
1171
1172 return ret
1173
def getItemsCount(self, authorized_groups, unrestricted, ext_data=None):
    """Count expected number of items in a getItems query

    @param authorized_groups: we want to get items that these groups can access
    @param unrestricted: if true, don't check permissions (i.e.: get all items)
    @param ext_data: options for extra features like RSM and MAM
        (None is handled as an empty dict)
    @return: what dbpool.runInteraction returns for _getItemsCount, which
        itself returns the number of items
    """
    extra = {} if ext_data is None else ext_data
    return self.dbpool.runInteraction(
        self._getItemsCount,
        authorized_groups,
        unrestricted,
        extra)
1184
1185 def _getItemsCount(self, cursor, authorized_groups, unrestricted, ext_data):
1186 self._checkNodeExists(cursor)
1187 args = []
1188
1189 # SELECT
1190 query = ["SELECT count(1)"]
1191
1192 self._appendSourcesAndFilters(query, args, authorized_groups, unrestricted, ext_data)
1193
1194 cursor.execute(' '.join(query), args)
1195 return cursor.fetchall()[0][0]
1196
def getItemsIndex(self, item_id, authorized_groups, unrestricted, ext_data=None):
    """Get expected index of first item in the window of a getItems query

    @param item_id: id of the item
    @param authorized_groups: we want to get items that these groups can access
    @param unrestricted: if true, don't check permissions (i.e.: get all items)
    @param ext_data: options for extra features like RSM and MAM
        (None is handled as an empty dict)
    @return: what dbpool.runInteraction returns for _getItemsIndex, which
        itself returns the index of the item, starting at 0
    """
    extra = {} if ext_data is None else ext_data
    return self.dbpool.runInteraction(
        self._getItemsIndex,
        item_id,
        authorized_groups,
        unrestricted,
        extra)
1208
1209 def _getItemsIndex(self, cursor, item_id, authorized_groups, unrestricted, ext_data):
1210 self._checkNodeExists(cursor)
1211 args = []
1212
1213 # SELECT
1214 query = []
1215
1216 query_order = self._appendSourcesAndFilters(query, args, authorized_groups, unrestricted, ext_data)
1217
1218 query_select = "SELECT row_number from (SELECT row_number() OVER ({}), item".format(query_order)
1219 query.insert(0, query_select)
1220 query.append(") as x WHERE item=%s")
1221 args.append(item_id)
1222
1223 cursor.execute(' '.join(query), args)
1224 # XXX: row_number start at 1, but we want that index start at 0
1225 try:
1226 return cursor.fetchall()[0][0] - 1
1227 except IndexError:
1228 raise error.NodeNotFound()
1229
def getItemsPublishers(self, itemIdentifiers):
    """Get the publishers for all given identifiers

    @param itemIdentifiers: ids of the items we want the publishers of
    @return: what dbpool.runInteraction returns for _getItemsPublishers,
        which builds a map of itemIdentifiers to publisher
        (dict[unicode, jid.JID])
        if item is not found, key is skipped in resulting dict
    """
    interaction = self._getItemsPublishers
    return self.dbpool.runInteraction(interaction, itemIdentifiers)
1237
1238 def _getItemsPublishers(self, cursor, itemIdentifiers):
1239 self._checkNodeExists(cursor)
1240 ret = {}
1241 for itemIdentifier in itemIdentifiers:
1242 cursor.execute("""SELECT publisher FROM items
1243 WHERE item=%s""",
1244 (itemIdentifier,))
1245 result = cursor.fetchone()
1246 if result:
1247 ret[itemIdentifier] = jid.JID(result[0])
1248 return ret
1249
def purge(self):
    """Delete all the items of this node.

    @return: what dbpool.runInteraction returns for _purge
    """
    interaction = self._purge
    return self.dbpool.runInteraction(interaction)
1252
1253 def _purge(self, cursor):
1254 self._checkNodeExists(cursor)
1255
1256 cursor.execute("""DELETE FROM items WHERE
1257 node_id=%s""",
1258 (self.nodeDbId,))
1259
1260
class CollectionNode(Node):
    """Node of type "collection": everything but nodeType is inherited from Node."""

    nodeType = "collection"
1264
1265
1266
class GatewayStorage(object):
    """
    Storage facility for the XMPP-HTTP gateway.

    Callbacks are kept in the "callbacks" SQL table (service, node, uri), and
    every public method runs its queries through dbpool.runInteraction.
    """

    def __init__(self, dbpool):
        """
        @param dbpool: pool used to run the interactions of this storage
        """
        self.dbpool = dbpool

    def _countCallbacks(self, cursor, service, nodeIdentifier):
        """
        Count number of callbacks registered for a node.

        @param cursor: database cursor of the running interaction
        @param service: service of the node, its full() value is stored
        @param nodeIdentifier: identifier of the node
        @return: number of callbacks found
        """
        node_key = (service.full(), nodeIdentifier)
        cursor.execute("""SELECT count(*) FROM callbacks
                          WHERE service=%s and node=%s""",
                       node_key)
        rows = cursor.fetchall()
        return rows[0][0]

    def addCallback(self, service, nodeIdentifier, callback):
        """
        Register a callback URI for a node, if it is not already registered.

        @param service: service of the node, its full() value is stored
        @param nodeIdentifier: identifier of the node
        @param callback: URI of the callback
        @return: what dbpool.runInteraction returns (the interaction itself
            returns None)
        """
        def registerIfMissing(cursor):
            cursor.execute("""SELECT 1 as bool FROM callbacks
                              WHERE service=%s and node=%s and uri=%s""",
                           (service.full(), nodeIdentifier, callback))
            already_registered = cursor.fetchall()
            if not already_registered:
                cursor.execute("""INSERT INTO callbacks
                                  (service, node, uri) VALUES
                                  (%s, %s, %s)""",
                               (service.full(), nodeIdentifier, callback))

        return self.dbpool.runInteraction(registerIfMissing)

    def removeCallback(self, service, nodeIdentifier, callback):
        """
        Unregister a callback URI from a node.

        @param service: service of the node
        @param nodeIdentifier: identifier of the node
        @param callback: URI of the callback
        @return: what dbpool.runInteraction returns; the interaction returns
            True if no callback is left for the node after the removal
        @raise error.NotSubscribed: raised by the interaction when the
            deletion did not affect exactly one row
        """
        def unregister(cursor):
            cursor.execute("""DELETE FROM callbacks
                              WHERE service=%s and node=%s and uri=%s""",
                           (service.full(), nodeIdentifier, callback))

            if cursor.rowcount != 1:
                raise error.NotSubscribed()

            remaining = self._countCallbacks(cursor, service, nodeIdentifier)
            return not remaining

        return self.dbpool.runInteraction(unregister)

    def getCallbacks(self, service, nodeIdentifier):
        """
        Get the callback URIs registered for a node.

        @param service: service of the node
        @param nodeIdentifier: identifier of the node
        @return: what dbpool.runInteraction returns; the interaction returns
            the list of URIs
        @raise error.NoCallbacks: raised by the interaction when no callback
            is registered for the node
        """
        def listUris(cursor):
            cursor.execute("""SELECT uri FROM callbacks
                              WHERE service=%s and node=%s""",
                           (service.full(), nodeIdentifier))
            rows = cursor.fetchall()

            if rows:
                return [row[0] for row in rows]
            raise error.NoCallbacks()

        return self.dbpool.runInteraction(listUris)

    def hasCallbacks(self, service, nodeIdentifier):
        """
        Tell if at least one callback is registered for a node.

        @param service: service of the node
        @param nodeIdentifier: identifier of the node
        @return: what dbpool.runInteraction returns; the interaction returns
            a bool
        """
        def atLeastOne(cursor):
            count = self._countCallbacks(cursor, service, nodeIdentifier)
            return bool(count)

        return self.dbpool.runInteraction(atLeastOne)