107
|
1 import copy |
|
2 from zope.interface import implements |
|
3 from twisted.internet import defer |
|
4 from twisted.words.protocols.jabber import jid |
|
5 import storage |
|
6 |
|
7 default_config = {"pubsub#persist_items": False, |
|
8 "pubsub#deliver_payloads": False} |
|
9 |
|
10 class Storage: |
|
11 |
|
12 implements(storage.IStorage) |
|
13 |
|
14 def __init__(self): |
|
15 self._nodes = {} |
|
16 |
|
17 def get_node(self, node_id): |
|
18 try: |
|
19 node = self._nodes[node_id] |
|
20 except KeyError: |
|
21 return defer.fail(storage.NodeNotFound()) |
|
22 |
|
23 return defer.succeed(node) |
|
24 |
|
25 def get_node_ids(self): |
|
26 return defer.succeed(self._nodes.keys()) |
|
27 |
|
28 def create_node(self, node_id, owner, config = None, type='leaf'): |
|
29 if node_id in self._nodes: |
|
30 return defer.fail(storage.NodeExists()) |
|
31 |
|
32 if not config: |
|
33 config = copy.copy(default_config) |
|
34 |
|
35 if type != 'leaf': |
|
36 raise NotImplementedError |
|
37 |
|
38 node = LeafNode(node_id, owner, config) |
|
39 self._nodes[node_id] = node |
|
40 |
|
41 return defer.succeed(None) |
|
42 |
|
43 def delete_node(self, node_id): |
|
44 try: |
|
45 del self._nodes[node_id] |
|
46 except KeyError: |
|
47 return defer.fail(storage.NodeNotFound()) |
|
48 |
|
49 return defer.succeed(None) |
|
50 |
|
51 def get_affiliations(self, entity): |
|
52 entity_full = entity.full() |
|
53 return defer.succeed([(node.id, node._affiliations[entity_full]) |
|
54 for name, node in self._nodes.iteritems() |
|
55 if entity_full in node._affiliations]) |
|
56 |
|
57 def get_subscriptions(self, entity): |
|
58 subscriptions = [] |
|
59 for node in self._nodes.itervalues(): |
|
60 for subscriber, subscription in node._subscriptions.iteritems(): |
|
61 subscriber = jid.JID(subscriber) |
|
62 if subscriber.userhostJID() == entity: |
|
63 subscriptions.append((node.id, subscriber, |
|
64 subscription.state)) |
|
65 |
|
66 return defer.succeed(subscriptions) |
|
67 |
|
68 class Node: |
|
69 |
|
70 implements(storage.INode) |
|
71 |
|
72 def __init__(self, node_id, owner, config): |
|
73 self.id = node_id |
|
74 self._affiliations = {owner.full(): 'owner'} |
|
75 self._subscriptions = {} |
|
76 self._config = config |
|
77 |
|
78 def get_type(self): |
|
79 return self.type |
|
80 |
|
81 def get_configuration(self): |
|
82 return self._config |
|
83 |
|
84 def get_meta_data(self): |
|
85 config = copy.copy(self._config) |
|
86 config["pubsub#node_type"] = self.type |
|
87 return config |
|
88 |
|
89 def set_configuration(self, options): |
|
90 for option in options: |
|
91 if option in self._config: |
|
92 self._config[option] = options[option] |
|
93 |
|
94 return defer.succeed(None) |
|
95 |
|
96 def get_affiliation(self, entity): |
|
97 return defer.succeed(self._affiliations.get(entity.full())) |
|
98 |
|
99 def add_subscription(self, subscriber, state): |
|
100 try: |
|
101 subscription = self._subscriptions[subscriber.full()] |
|
102 except: |
|
103 subscription = Subscription(state) |
|
104 self._subscriptions[subscriber.full()] = subscription |
|
105 |
|
106 return defer.succeed({'node': self.id, |
|
107 'jid': subscriber, |
|
108 'subscription': subscription.state}) |
|
109 |
|
110 def remove_subscription(self, subscriber): |
|
111 del self._subscriptions[subscriber.full()] |
|
112 |
|
113 return defer.succeed(None) |
|
114 |
|
115 def get_subscribers(self): |
|
116 subscribers = [jid.JID(subscriber) for subscriber, subscription |
|
117 in self._subscriptions.iteritems() |
|
118 if subscription.state == 'subscribed'] |
|
119 |
|
120 return defer.succeed(subscribers) |
|
121 |
|
122 def is_subscribed(self, subscriber): |
|
123 try: |
|
124 subscription = self._subscriptions[subscriber.full()] |
|
125 except KeyError: |
|
126 return defer.succeed(False) |
|
127 |
|
128 return defer.succeed(subscription.state == 'subscribed') |
|
129 |
|
130 class LeafNode(Node): |
|
131 |
|
132 implements(storage.ILeafNode) |
|
133 type = 'leaf' |
|
134 |
|
135 def __init__(self, node_id, owner, config): |
|
136 Node.__init__(self, node_id, owner, config) |
|
137 self._items = {} |
|
138 self._itemlist = [] |
|
139 |
|
140 def store_items(self, items, publisher): |
|
141 for data in items: |
|
142 id = data["id"] |
|
143 item = (data.toXml(), publisher) |
|
144 if id in self._items: |
|
145 self._itemlist.remove(self._items[id]) |
|
146 self._items[id] = item |
|
147 self._itemlist.append(item) |
|
148 |
|
149 return defer.succeed(None) |
|
150 |
|
151 def remove_items(self, item_ids): |
|
152 deleted = [] |
|
153 |
|
154 for item_id in item_ids: |
|
155 try: |
|
156 item = self._items[item_id] |
|
157 self._itemlist.remove(item) |
|
158 del self._items[item_id] |
|
159 deleted.append(item_id) |
|
160 except KeyError: |
|
161 pass |
|
162 |
|
163 return defer.succeed(deleted) |
|
164 |
|
165 def get_items(self, max_items=None): |
|
166 if max_items: |
|
167 list = self._itemlist[-max_items:] |
|
168 else: |
|
169 list = self._itemlist |
|
170 return defer.succeed([item[0] for item in list]) |
|
171 |
|
172 def get_items_by_id(self, item_ids): |
|
173 items = [] |
|
174 for item_id in item_ids: |
|
175 try: |
|
176 item = self._items[item_id] |
|
177 except KeyError: |
|
178 pass |
|
179 else: |
|
180 items.append(item[0]) |
|
181 return defer.succeed(items) |
|
182 |
|
183 def purge(self): |
|
184 self._items = {} |
|
185 self._itemlist = [] |
|
186 |
|
187 return defer.succeed(None) |
|
188 |
|
189 class Subscription: |
|
190 |
|
191 implements(storage.ISubscription) |
|
192 |
|
193 def __init__(self, state): |
|
194 self.state = state |