107
|
1 import copy |
|
2 import storage |
|
3 from twisted.enterprise import adbapi |
|
4 from twisted.internet import defer |
|
5 from twisted.words.protocols.jabber import jid |
|
6 from zope.interface import implements |
|
7 |
|
class Storage:
    """
    Storage backend that keeps nodes, entities, affiliations and
    subscriptions in a SQL database reached through a Twisted
    adbapi connection pool.

    Every public method hands its work to the pool and returns whatever
    runInteraction() / runQuery() returns (a Deferred).
    """

    implements(storage.IStorage)

    def __init__(self, user, database):
        """
        Create the connection pool.

        user and database are passed to the 'pyPgSQL.PgSQL' driver as the
        keyword arguments of the same name.
        """
        self._dbpool = adbapi.ConnectionPool('pyPgSQL.PgSQL', user=user,
                                             database=database)

    def get_node(self, node_id):
        """
        Look up the node called node_id.

        The result is a LeafNode carrying the stored configuration; the
        interaction raises storage.NodeNotFound when no such row exists.
        """
        return self._dbpool.runInteraction(self._get_node, node_id)

    def _get_node(self, cursor, node_id):
        """Transaction body of get_node()."""
        # NOTE(review): node_id is sent as-is here, while _create_node() and
        # _delete_node() encode it to UTF-8 first -- confirm which is wanted.
        cursor.execute("""SELECT persistent, deliver_payload FROM nodes
                          WHERE node=%s""",
                       (node_id,))

        # fetchone() yields None when the query matched nothing; test for
        # that explicitly rather than catching the TypeError that unpacking
        # None would raise, so unrelated TypeErrors are not misreported.
        row = cursor.fetchone()
        if row is None:
            raise storage.NodeNotFound

        configuration = {}
        (configuration["pubsub#persist_items"],
         configuration["pubsub#deliver_payloads"]) = row

        node = LeafNode(node_id, configuration)
        # The node runs its own queries through the same pool.
        node._dbpool = self._dbpool
        return node

    def get_node_ids(self):
        """Return (via the pool) the list of all node identifiers."""
        d = self._dbpool.runQuery("""SELECT node from nodes""")
        # Each result row has a single column; flatten to a plain list.
        d.addCallback(lambda results: [r[0] for r in results])
        return d

    def create_node(self, node_id, owner, type='leaf'):
        """
        Create the node node_id and make owner its owner.

        owner must provide userhost() (e.g. a JID). The type argument is
        accepted but is not forwarded to the database interaction.
        The interaction raises storage.NodeExists when the insert fails
        with the driver's OperationalError.
        """
        return self._dbpool.runInteraction(self._create_node, node_id, owner)

    def _create_node(self, cursor, node_id, owner):
        """Transaction body of create_node()."""
        node_id = node_id.encode('utf-8')
        owner = owner.userhost().encode('utf-8')

        # Query parameters are always passed as a sequence: a bare
        # "(value)" is just the string itself, not a 1-tuple.
        try:
            cursor.execute("""INSERT INTO nodes (node) VALUES (%s)""",
                           (node_id,))
        except cursor._pool.dbapi.OperationalError:
            raise storage.NodeExists

        # Make sure the owner has a row in entities before linking it.
        cursor.execute("""SELECT 1 from entities where jid=%s""",
                       (owner,))

        if not cursor.fetchone():
            cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
                           (owner,))

        cursor.execute("""INSERT INTO affiliations
                          (node_id, entity_id, affiliation)
                          SELECT n.id, e.id, 'owner' FROM
                          (SELECT id FROM nodes WHERE node=%s) AS n
                          CROSS JOIN
                          (SELECT id FROM entities WHERE jid=%s) AS e""",
                       (node_id, owner))

    def delete_node(self, node_id):
        """
        Delete the node called node_id.

        The interaction raises storage.NodeNotFound unless exactly one
        row was removed.
        """
        return self._dbpool.runInteraction(self._delete_node, node_id)

    def _delete_node(self, cursor, node_id):
        """Transaction body of delete_node()."""
        cursor.execute("""DELETE FROM nodes WHERE node=%s""",
                       (node_id.encode('utf-8'),))

        if cursor.rowcount != 1:
            raise storage.NodeNotFound

    def get_affiliations(self, entity):
        """
        Return (via the pool) a list of (node, affiliation) tuples for the
        bare JID of entity, which must provide userhost().
        """
        d = self._dbpool.runQuery("""SELECT node, affiliation FROM entities
                                     JOIN affiliations ON
                                     (affiliations.entity_id=entities.id)
                                     JOIN nodes ON
                                     (nodes.id=affiliations.node_id)
                                     WHERE jid=%s""",
                                  (entity.userhost().encode('utf8'),))
        # Convert driver row objects into plain tuples.
        d.addCallback(lambda results: [tuple(r) for r in results])
        return d

    def get_subscriptions(self, entity):
        """
        Return (via the pool) a list of (node, subscriber JID, subscription)
        tuples for the bare JID of entity, which must provide userhost().
        """
        d = self._dbpool.runQuery("""SELECT node, jid, resource, subscription
                                     FROM entities JOIN subscriptions ON
                                     (subscriptions.entity_id=entities.id)
                                     JOIN nodes ON
                                     (nodes.id=subscriptions.node_id)
                                     WHERE jid=%s""",
                                  (entity.userhost().encode('utf8'),))
        d.addCallback(self._convert_subscription_jids)
        return d

    def _convert_subscription_jids(self, subscriptions):
        """
        Turn (node, jid, resource, subscription) rows into
        (node, JID, subscription) tuples, joining jid and resource with '/'.
        """
        return [(node, jid.JID('%s/%s' % (subscriber, resource)), subscription)
                for node, subscriber, resource, subscription in subscriptions]
|
101 |
|
class Node:
    """
    Database-backed node base class (declares storage.INode).

    __init__ stores only the identifier and configuration. The methods
    below additionally read self._dbpool (a connection pool assigned by
    the code that creates the node) and self.type (a class attribute
    supplied by subclasses); neither is set here.
    """

    implements(storage.INode)

    def __init__(self, node_id, config):
        """Remember the node identifier and its configuration dict."""
        self.id = node_id
        self._config = config

    def get_type(self):
        """Return the node type string (self.type)."""
        return self.type

    def get_configuration(self):
        """Return the configuration dict itself (not a copy)."""
        return self._config

    def set_configuration(self, options):
        """
        Apply options to the node configuration and persist the result.

        Returns whatever the pool's runInteraction() returns (a Deferred).
        """
        # The transaction body is _set_configuration; the previous
        # reference to a non-existent _set_node_configuration attribute
        # made every call fail with AttributeError.
        return self._dbpool.runInteraction(self._set_configuration,
                                           options)

    def _set_configuration(self, cursor, options):
        """
        Transaction body of set_configuration().

        Only options whose key already exists in the current configuration
        are taken over; unknown keys are ignored. The two persisted
        settings are then written to the node's row.
        """
        for option in options:
            if option in self._config:
                self._config[option] = options[option]

        cursor.execute("""UPDATE nodes SET persistent=%s, deliver_payload=%s
                          WHERE node=%s""",
                       (self._config["pubsub#persist_items"].encode('utf8'),
                        self._config["pubsub#deliver_payloads"].encode('utf8'),
                        self.id.encode('utf-8')))

    def get_meta_data(self):
        """
        Return a shallow copy of the configuration with the node type added
        under "pubsub#node_type".
        """
        config = copy.copy(self._config)
        config["pubsub#node_type"] = self.type
        return config

    def get_affiliation(self, entity):
        """
        Look up the affiliation of entity (must provide userhost()) with
        this node; the interaction yields None when there is none.
        """
        return self._dbpool.runInteraction(self._get_affiliation, entity)

    def _get_affiliation(self, cursor, entity):
        """Transaction body of get_affiliation()."""
        cursor.execute("""SELECT affiliation FROM affiliations
                          JOIN nodes ON (node_id=nodes.id)
                          JOIN entities ON (entity_id=entities.id)
                          WHERE node=%s AND jid=%s""",
                       (self.id.encode('utf8'),
                        entity.userhost().encode('utf8')))

        try:
            return cursor.fetchone()[0]
        except TypeError:
            # fetchone() gave None: no matching row.
            return None

    def add_subscription(self, subscriber, state):
        """
        Subscribe subscriber (a JID-like object with userhost() and
        resource) to this node with the given state.

        The interaction yields a dict with keys 'node', 'jid' and
        'subscription'. If the subscription insert fails with the driver's
        OperationalError, the stored subscription state is reported
        instead of the requested one.
        """
        return self._dbpool.runInteraction(self._add_subscription, subscriber,
                                           state)

    def _add_subscription(self, cursor, subscriber, state):
        """Transaction body of add_subscription()."""
        userhost = subscriber.userhost()
        resource = subscriber.resource or ''

        # Best-effort creation of the entity row; a failure is taken to
        # mean the entity already exists.
        # NOTE(review): on PostgreSQL a failed statement may leave the
        # surrounding transaction aborted, which would make the following
        # statements fail too -- confirm how the driver behaves here.
        try:
            cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
                           (userhost.encode('utf8'),))
        except cursor._pool.dbapi.OperationalError:
            pass

        try:
            cursor.execute("""INSERT INTO subscriptions
                              (node_id, entity_id, resource, subscription)
                              SELECT n.id, e.id, %s, %s FROM
                              (SELECT id FROM nodes WHERE node=%s) AS n
                              CROSS JOIN
                              (SELECT id FROM entities WHERE jid=%s) AS e""",
                           (resource.encode('utf8'),
                            state.encode('utf8'),
                            self.id.encode('utf8'),
                            userhost.encode('utf8')))
        except cursor._pool.dbapi.OperationalError:
            # The insert was rejected: report the state already on record.
            cursor.execute("""SELECT subscription FROM subscriptions
                              JOIN nodes ON (nodes.id=subscriptions.node_id)
                              JOIN entities ON
                              (entities.id=subscriptions.entity_id)
                              WHERE node=%s AND jid=%s AND resource=%s""",
                           (self.id.encode('utf8'),
                            userhost.encode('utf8'),
                            resource.encode('utf8')))
            state = cursor.fetchone()[0]

        return {'node': self.id,
                'jid': subscriber,
                'subscription': state}

    def remove_subscription(self, subscriber, state):
        """Not implemented: does nothing and returns None."""
        pass

    def get_subscribers(self):
        """
        Return (via the pool) the list of JIDs whose subscription to this
        node has the state 'subscribed'.
        """
        d = self._dbpool.runQuery("""SELECT jid, resource FROM subscriptions
                                     JOIN nodes ON (node_id=nodes.id)
                                     JOIN entities ON (entity_id=entities.id)
                                     WHERE node=%s AND
                                     subscription='subscribed'""",
                                  (self.id.encode('utf8'),))
        d.addCallback(self._convert_to_jids)
        return d

    def _convert_to_jids(self, list):
        """Turn (jid, resource) rows into JID objects built from 'jid/resource'."""
        return [jid.JID("%s/%s" % (row[0], row[1])) for row in list]

    def is_subscribed(self, subscriber):
        """Not implemented: does nothing and returns None."""
        pass
|
210 |
|
class LeafNode(Node):
    """
    Node of type 'leaf' (declares storage.ILeafNode).

    The item-handling methods are placeholders: store_items() reports
    success without storing anything and the others return None.
    """

    implements(storage.ILeafNode)

    type = 'leaf'

    def store_items(self, items, publisher):
        """Ignore items and publisher; return an already-fired Deferred (None)."""
        result = None
        return defer.succeed(result)

    def remove_items(self, item_ids):
        """Placeholder: performs no work and returns None."""
        return None

    def get_items(self, max_items=None):
        """Placeholder: performs no work and returns None."""
        return None

    def get_items_by_id(self, item_ids):
        """Placeholder: performs no work and returns None."""
        return None

    def purge(self):
        """Placeholder: performs no work and returns None."""
        return None