comparison sat_pubsub/memory_storage.py @ 232:923281d4c5bc

renamed idavoll directory to sat_pubsub
author Goffi <goffi@goffi.org>
date Thu, 17 May 2012 12:48:14 +0200
parents idavoll/memory_storage.py@b7018ec56ee5
children 564ae55219e1
comparison
equal deleted inserted replaced
231:d99047cd90f9 232:923281d4c5bc
1 # Copyright (c) 2003-2010 Ralph Meijer
2 # See LICENSE for details.
3
4 import copy
5 from zope.interface import implements
6 from twisted.internet import defer
7 from twisted.words.protocols.jabber import jid
8
9 from wokkel.pubsub import Subscription
10
11 from idavoll import error, iidavoll
12
13 class Storage:
14
15 implements(iidavoll.IStorage)
16
17 defaultConfig = {
18 'leaf': {
19 "pubsub#persist_items": True,
20 "pubsub#deliver_payloads": True,
21 "pubsub#send_last_published_item": 'on_sub',
22 },
23 'collection': {
24 "pubsub#deliver_payloads": True,
25 "pubsub#send_last_published_item": 'on_sub',
26 }
27 }
28
29 def __init__(self):
30 rootNode = CollectionNode('', jid.JID('localhost'),
31 copy.copy(self.defaultConfig['collection']))
32 self._nodes = {'': rootNode}
33
34
35 def getNode(self, nodeIdentifier):
36 try:
37 node = self._nodes[nodeIdentifier]
38 except KeyError:
39 return defer.fail(error.NodeNotFound())
40
41 return defer.succeed(node)
42
43
44 def getNodeIds(self):
45 return defer.succeed(self._nodes.keys())
46
47
48 def createNode(self, nodeIdentifier, owner, config):
49 if nodeIdentifier in self._nodes:
50 return defer.fail(error.NodeExists())
51
52 if config['pubsub#node_type'] != 'leaf':
53 raise error.NoCollections()
54
55 node = LeafNode(nodeIdentifier, owner, config)
56 self._nodes[nodeIdentifier] = node
57
58 return defer.succeed(None)
59
60
61 def deleteNode(self, nodeIdentifier):
62 try:
63 del self._nodes[nodeIdentifier]
64 except KeyError:
65 return defer.fail(error.NodeNotFound())
66
67 return defer.succeed(None)
68
69
70 def getAffiliations(self, entity):
71 entity = entity.userhost()
72 return defer.succeed([(node.nodeIdentifier, node._affiliations[entity])
73 for name, node in self._nodes.iteritems()
74 if entity in node._affiliations])
75
76
77 def getSubscriptions(self, entity):
78 subscriptions = []
79 for node in self._nodes.itervalues():
80 for subscriber, subscription in node._subscriptions.iteritems():
81 subscriber = jid.internJID(subscriber)
82 if subscriber.userhostJID() == entity.userhostJID():
83 subscriptions.append(subscription)
84
85 return defer.succeed(subscriptions)
86
87
88 def getDefaultConfiguration(self, nodeType):
89 if nodeType == 'collection':
90 raise error.NoCollections()
91
92 return self.defaultConfig[nodeType]
93
94
95 class Node:
96
97 implements(iidavoll.INode)
98
99 def __init__(self, nodeIdentifier, owner, config):
100 self.nodeIdentifier = nodeIdentifier
101 self._affiliations = {owner.userhost(): 'owner'}
102 self._subscriptions = {}
103 self._config = copy.copy(config)
104
105
106 def getType(self):
107 return self.nodeType
108
109
110 def getConfiguration(self):
111 return self._config
112
113
114 def getMetaData(self):
115 config = copy.copy(self._config)
116 config["pubsub#node_type"] = self.nodeType
117 return config
118
119
120 def setConfiguration(self, options):
121 for option in options:
122 if option in self._config:
123 self._config[option] = options[option]
124
125 return defer.succeed(None)
126
127
128 def getAffiliation(self, entity):
129 return defer.succeed(self._affiliations.get(entity.userhost()))
130
131
132 def getSubscription(self, subscriber):
133 try:
134 subscription = self._subscriptions[subscriber.full()]
135 except KeyError:
136 return defer.succeed(None)
137 else:
138 return defer.succeed(subscription)
139
140
141 def getSubscriptions(self, state=None):
142 return defer.succeed(
143 [subscription
144 for subscription in self._subscriptions.itervalues()
145 if state is None or subscription.state == state])
146
147
148
149 def addSubscription(self, subscriber, state, options):
150 if self._subscriptions.get(subscriber.full()):
151 return defer.fail(error.SubscriptionExists())
152
153 subscription = Subscription(self.nodeIdentifier, subscriber, state,
154 options)
155 self._subscriptions[subscriber.full()] = subscription
156 return defer.succeed(None)
157
158
159 def removeSubscription(self, subscriber):
160 try:
161 del self._subscriptions[subscriber.full()]
162 except KeyError:
163 return defer.fail(error.NotSubscribed())
164
165 return defer.succeed(None)
166
167
168 def isSubscribed(self, entity):
169 for subscriber, subscription in self._subscriptions.iteritems():
170 if jid.internJID(subscriber).userhost() == entity.userhost() and \
171 subscription.state == 'subscribed':
172 return defer.succeed(True)
173
174 return defer.succeed(False)
175
176
177 def getAffiliations(self):
178 affiliations = [(jid.internJID(entity), affiliation) for entity, affiliation
179 in self._affiliations.iteritems()]
180
181 return defer.succeed(affiliations)
182
183
184
185 class PublishedItem(object):
186 """
187 A published item.
188
189 This represent an item as it was published by an entity.
190
191 @ivar element: The DOM representation of the item that was published.
192 @type element: L{Element<twisted.words.xish.domish.Element>}
193 @ivar publisher: The entity that published the item.
194 @type publisher: L{JID<twisted.words.protocols.jabber.jid.JID>}
195 """
196
197 def __init__(self, element, publisher):
198 self.element = element
199 self.publisher = publisher
200
201
202
203 class LeafNode(Node):
204
205 implements(iidavoll.ILeafNode)
206
207 nodeType = 'leaf'
208
209 def __init__(self, nodeIdentifier, owner, config):
210 Node.__init__(self, nodeIdentifier, owner, config)
211 self._items = {}
212 self._itemlist = []
213
214
215 def storeItems(self, items, publisher):
216 for element in items:
217 item = PublishedItem(element, publisher)
218 itemIdentifier = element["id"]
219 if itemIdentifier in self._items:
220 self._itemlist.remove(self._items[itemIdentifier])
221 self._items[itemIdentifier] = item
222 self._itemlist.append(item)
223
224 return defer.succeed(None)
225
226
227 def removeItems(self, itemIdentifiers):
228 deleted = []
229
230 for itemIdentifier in itemIdentifiers:
231 try:
232 item = self._items[itemIdentifier]
233 except KeyError:
234 pass
235 else:
236 self._itemlist.remove(item)
237 del self._items[itemIdentifier]
238 deleted.append(itemIdentifier)
239
240 return defer.succeed(deleted)
241
242
243 def getItems(self, maxItems=None):
244 if maxItems:
245 itemList = self._itemlist[-maxItems:]
246 else:
247 itemList = self._itemlist
248 return defer.succeed([item.element for item in itemList])
249
250
251 def getItemsById(self, itemIdentifiers):
252 items = []
253 for itemIdentifier in itemIdentifiers:
254 try:
255 item = self._items[itemIdentifier]
256 except KeyError:
257 pass
258 else:
259 items.append(item.element)
260 return defer.succeed(items)
261
262
263 def purge(self):
264 self._items = {}
265 self._itemlist = []
266
267 return defer.succeed(None)
268
269
270 class CollectionNode(Node):
271 nodeType = 'collection'
272
273
274
275 class GatewayStorage(object):
276 """
277 Memory based storage facility for the XMPP-HTTP gateway.
278 """
279
280 def __init__(self):
281 self.callbacks = {}
282
283
284 def addCallback(self, service, nodeIdentifier, callback):
285 try:
286 callbacks = self.callbacks[service, nodeIdentifier]
287 except KeyError:
288 callbacks = set([callback])
289 self.callbacks[service, nodeIdentifier] = callbacks
290 else:
291 callbacks.add(callback)
292 pass
293
294 return defer.succeed(None)
295
296
297 def removeCallback(self, service, nodeIdentifier, callback):
298 try:
299 callbacks = self.callbacks[service, nodeIdentifier]
300 callbacks.remove(callback)
301 except KeyError:
302 return defer.fail(error.NotSubscribed())
303 else:
304 if not callbacks:
305 del self.callbacks[service, nodeIdentifier]
306
307 return defer.succeed(not callbacks)
308
309
310 def getCallbacks(self, service, nodeIdentifier):
311 try:
312 callbacks = self.callbacks[service, nodeIdentifier]
313 except KeyError:
314 return defer.fail(error.NoCallbacks())
315 else:
316 return defer.succeed(callbacks)
317
318
319 def hasCallbacks(self, service, nodeIdentifier):
320 return defer.succeed((service, nodeIdentifier) in self.callbacks)