comparison idavoll/pgsql_storage.py @ 107:d252d793f0ed

Initial revision.
author Ralph Meijer <ralphm@ik.nu>
date Fri, 08 Apr 2005 10:15:02 +0000
parents
children dfef919aaf1b
comparison
equal deleted inserted replaced
106:dc36882d2620 107:d252d793f0ed
1 import copy
2 import storage
3 from twisted.enterprise import adbapi
4 from twisted.internet import defer
5 from twisted.words.protocols.jabber import jid
6 from zope.interface import implements
7
8 class Storage:
9
10 implements(storage.IStorage)
11
12 def __init__(self, user, database):
13 self._dbpool = adbapi.ConnectionPool('pyPgSQL.PgSQL', user=user,
14 database=database)
15
16 def get_node(self, node_id):
17 return self._dbpool.runInteraction(self._get_node, node_id)
18
19 def _get_node(self, cursor, node_id):
20 configuration = {}
21 cursor.execute("""SELECT persistent, deliver_payload FROM nodes
22 WHERE node=%s""",
23 (node_id,))
24 try:
25 (configuration["pubsub#persist_items"],
26 configuration["pubsub#deliver_payloads"]) = cursor.fetchone()
27 except TypeError:
28 raise storage.NodeNotFound
29 else:
30 node = LeafNode(node_id, configuration)
31 node._dbpool = self._dbpool
32 return node
33
34 def get_node_ids(self):
35 d = self._dbpool.runQuery("""SELECT node from nodes""")
36 d.addCallback(lambda results: [r[0] for r in results])
37 return d
38
39 def create_node(self, node_id, owner, type='leaf'):
40 return self._dbpool.runInteraction(self._create_node, node_id, owner)
41
42 def _create_node(self, cursor, node_id, owner):
43 try:
44 cursor.execute("""INSERT INTO nodes (node) VALUES (%s)""",
45 (node_id.encode('utf8')))
46 except:
47 raise storage.NodeExists
48
49 cursor.execute("""SELECT 1 from entities where jid=%s""",
50 (owner.full().encode('utf8')))
51
52 if not cursor.fetchone():
53 cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
54 (owner.full().encode('utf8')))
55
56 cursor.execute("""INSERT INTO affiliations
57 (node_id, entity_id, affiliation)
58 SELECT n.id, e.id, 'owner' FROM
59 (SELECT id FROM nodes WHERE node=%s) AS n
60 CROSS JOIN
61 (SELECT id FROM entities WHERE jid=%s) AS e""",
62 (node_id.encode('utf8'),
63 owner.full().encode('utf8')))
64
65 def delete_node(self, node_id):
66 return self._dbpool.runInteraction(self._delete_node, node_id)
67
68 def _delete_node(self, cursor, node_id):
69 cursor.execute("""DELETE FROM nodes WHERE node=%s""",
70 (node_id.encode('utf-8'),))
71
72 if cursor.rowcount != 1:
73 raise storage.NodeNotFound
74
75 def get_affiliations(self, entity):
76 d = self._dbpool.runQuery("""SELECT node, affiliation FROM entities
77 JOIN affiliations ON
78 (affiliations.entity_id=entities.id)
79 JOIN nodes ON
80 (nodes.id=affiliations.node_id)
81 WHERE jid=%s""",
82 (entity.full().encode('utf8'),))
83 d.addCallback(lambda results: [tuple(r) for r in results])
84 return d
85
86 def get_subscriptions(self, entity):
87 d = self._dbpool.runQuery("""SELECT node, jid, resource, subscription
88 FROM entities JOIN subscriptions ON
89 (subscriptions.entity_id=entities.id)
90 JOIN nodes ON
91 (nodes.id=subscriptions.node_id)
92 WHERE jid=%s""",
93 (entity.userhost().encode('utf8'),))
94 d.addCallback(self._convert_subscription_jids)
95 return d
96
97 def _convert_subscription_jids(self, subscriptions):
98 return [(node, jid.JID('%s/%s' % (subscriber, resource)), subscription)
99 for node, subscriber, resource, subscription in subscriptions]
100
101 class Node:
102
103 implements(storage.INode)
104
105 def __init__(self, node_id, config):
106 self.id = node_id
107 self._config = config
108
109 def get_type(self):
110 return self.type
111
112 def get_configuration(self):
113 return self._config
114
115 def set_configuration(self, options):
116 return self._dbpool.runInteraction(self._set_node_configuration,
117 options)
118
119 def _set_configuration(self, cursor, options):
120 for option in options:
121 if option in self._config:
122 self._config[option] = options[option]
123
124 cursor.execute("""UPDATE nodes SET persistent=%s, deliver_payload=%s
125 WHERE node=%s""",
126 (self._config["pubsub#persist_items"].encode('utf8'),
127 self._config["pubsub#deliver_payloads"].encode('utf8'),
128 self.id.encode('utf-8')))
129
130 def get_meta_data(self):
131 config = copy.copy(self._config)
132 config["pubsub#node_type"] = self.type
133 return config
134
135 def get_affiliation(self, entity):
136 return self._dbpool.runInteraction(self._get_affiliation, entity)
137
138 def _get_affiliation(self, cursor, entity):
139 cursor.execute("""SELECT affiliation FROM affiliations
140 JOIN nodes ON (node_id=nodes.id)
141 JOIN entities ON (entity_id=entities.id)
142 WHERE node=%s AND jid=%s""",
143 (self.id.encode('utf8'),
144 entity.full().encode('utf8')))
145
146 try:
147 return cursor.fetchone()[0]
148 except TypeError:
149 return None
150
151 def add_subscription(self, subscriber, state):
152 return self._dbpool.runInteraction(self._add_subscription, subscriber,
153 state)
154
155 def _add_subscription(self, cursor, subscriber, state):
156 userhost = subscriber.userhost()
157 resource = subscriber.resource or ''
158
159 try:
160 cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
161 (userhost.encode('utf8')))
162 except:
163 pass
164
165 try:
166 cursor.execute("""INSERT INTO subscriptions
167 (node_id, entity_id, resource, subscription)
168 SELECT n.id, e.id, %s, %s FROM
169 (SELECT id FROM nodes WHERE node=%s) AS n
170 CROSS JOIN
171 (SELECT id FROM entities WHERE jid=%s) AS e""",
172 (resource.encode('utf8'),
173 state.encode('utf8'),
174 self.id.encode('utf8'),
175 userhost.encode('utf8')))
176 except:
177 cursor.execute("""SELECT subscription FROM subscriptions
178 JOIN nodes ON (nodes.id=subscriptions.node_id)
179 JOIN entities ON
180 (entities.id=subscriptions.entity_id)
181 WHERE node=%s AND jid=%s AND resource=%s""",
182 (self.id.encode('utf8'),
183 userhost.encode('utf8'),
184 resource.encode('utf8')))
185 state = cursor.fetchone()[0]
186
187 return {'node': self.id,
188 'jid': subscriber,
189 'subscription': state}
190
191 def remove_subscription(self, subscriber, state):
192 pass
193
194 def get_subscribers(self):
195 d = self._dbpool.runQuery("""SELECT jid, resource FROM subscriptions
196 JOIN nodes ON (node_id=nodes.id)
197 JOIN entities ON (entity_id=entities.id)
198 WHERE node=%s AND
199 subscription='subscribed'""",
200 (self.id.encode('utf8'),))
201 d.addCallback(self._convert_to_jids)
202 return d
203
204 def _convert_to_jids(self, list):
205 return [jid.JID("%s/%s" % (l[0], l[1])) for l in list]
206
207 def is_subscribed(self, subscriber):
208 pass
209
210 class LeafNode(Node):
211
212 implements(storage.ILeafNode)
213
214 type = 'leaf'
215
216 def store_items(self, items, publisher):
217 return defer.succeed(None)
218
219 def remove_items(self, item_ids):
220 pass
221
222 def get_items(self, max_items=None):
223 pass
224
225 def get_items_by_id(self, item_ids):
226 pass
227
228 def purge(self):
229 pass