comparison src/memory_storage.py @ 369:dabee42494ac

config file + cleaning: - SàT Pubsub can now be configured using the same config file as SàT itself (i.e. sat.conf or .sat.conf), in the same locations (/etc, local dir, xdg dir). Its options must be in the "pubsub" section - options on command line override config options - removed tap and http files which are not used anymore - changed directory structure to put source in src, to be coherent with SàT and Libervia - changed options name, db* become db_*, secret become xmpp_pwd - an exception is raised if jid or xmpp_pwd is are not configured
author Goffi <goffi@goffi.org>
date Fri, 02 Mar 2018 12:59:38 +0100
parents sat_pubsub/memory_storage.py@618a92080812
children aa3a464df605
comparison
equal deleted inserted replaced
368:618a92080812 369:dabee42494ac
1 #!/usr/bin/python
2 #-*- coding: utf-8 -*-
3
4 # Copyright (c) 2003-2011 Ralph Meijer
5 # Copyright (c) 2012-2018 Jérôme Poisson
6
7
8 # This program is free software: you can redistribute it and/or modify
9 # it under the terms of the GNU Affero General Public License as published by
10 # the Free Software Foundation, either version 3 of the License, or
11 # (at your option) any later version.
12
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU Affero General Public License for more details.
17
18 # You should have received a copy of the GNU Affero General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 # --
21
22 # This program is based on Idavoll (http://idavoll.ik.nu/),
23 # originaly written by Ralph Meijer (http://ralphm.net/blog/)
24 # It is sublicensed under AGPL v3 (or any later version) as allowed by the original
25 # license.
26
27 # --
28
29 # Here is a copy of the original license:
30
31 # Copyright (c) 2003-2011 Ralph Meijer
32
33 # Permission is hereby granted, free of charge, to any person obtaining
34 # a copy of this software and associated documentation files (the
35 # "Software"), to deal in the Software without restriction, including
36 # without limitation the rights to use, copy, modify, merge, publish,
37 # distribute, sublicense, and/or sell copies of the Software, and to
38 # permit persons to whom the Software is furnished to do so, subject to
39 # the following conditions:
40
41 # The above copyright notice and this permission notice shall be
42 # included in all copies or substantial portions of the Software.
43
44 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
45 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
46 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
47 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
48 # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
49 # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
50 # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
51
52
53 import copy
54 from zope.interface import implements
55 from twisted.internet import defer
56 from twisted.words.protocols.jabber import jid
57
58 from wokkel.pubsub import Subscription
59
60 from sat_pubsub import error, iidavoll
61
62 class Storage:
63
64 implements(iidavoll.IStorage)
65
66 defaultConfig = {
67 'leaf': {
68 "pubsub#persist_items": True,
69 "pubsub#deliver_payloads": True,
70 "pubsub#send_last_published_item": 'on_sub',
71 },
72 'collection': {
73 "pubsub#deliver_payloads": True,
74 "pubsub#send_last_published_item": 'on_sub',
75 }
76 }
77
78 def __init__(self):
79 rootNode = CollectionNode('', jid.JID('localhost'),
80 copy.copy(self.defaultConfig['collection']))
81 self._nodes = {'': rootNode}
82
83
84 def getNode(self, nodeIdentifier):
85 try:
86 node = self._nodes[nodeIdentifier]
87 except KeyError:
88 return defer.fail(error.NodeNotFound())
89
90 return defer.succeed(node)
91
92
93 def getNodeIds(self):
94 return defer.succeed(self._nodes.keys())
95
96
97 def createNode(self, nodeIdentifier, owner, config):
98 if nodeIdentifier in self._nodes:
99 return defer.fail(error.NodeExists())
100
101 if config['pubsub#node_type'] != 'leaf':
102 raise error.NoCollections()
103
104 node = LeafNode(nodeIdentifier, owner, config)
105 self._nodes[nodeIdentifier] = node
106
107 return defer.succeed(None)
108
109
110 def deleteNode(self, nodeIdentifier):
111 try:
112 del self._nodes[nodeIdentifier]
113 except KeyError:
114 return defer.fail(error.NodeNotFound())
115
116 return defer.succeed(None)
117
118
119 def getAffiliations(self, entity):
120 entity = entity.userhost()
121 return defer.succeed([(node.nodeIdentifier, node._affiliations[entity])
122 for name, node in self._nodes.iteritems()
123 if entity in node._affiliations])
124
125
126 def getSubscriptions(self, entity):
127 subscriptions = []
128 for node in self._nodes.itervalues():
129 for subscriber, subscription in node._subscriptions.iteritems():
130 subscriber = jid.internJID(subscriber)
131 if subscriber.userhostJID() == entity.userhostJID():
132 subscriptions.append(subscription)
133
134 return defer.succeed(subscriptions)
135
136
137 def getDefaultConfiguration(self, nodeType):
138 if nodeType == 'collection':
139 raise error.NoCollections()
140
141 return self.defaultConfig[nodeType]
142
143
144 class Node:
145
146 implements(iidavoll.INode)
147
148 def __init__(self, nodeIdentifier, owner, config):
149 self.nodeIdentifier = nodeIdentifier
150 self._affiliations = {owner.userhost(): 'owner'}
151 self._subscriptions = {}
152 self._config = copy.copy(config)
153
154
155 def getType(self):
156 return self.nodeType
157
158
159 def getConfiguration(self):
160 return self._config
161
162
163 def getMetaData(self):
164 config = copy.copy(self._config)
165 config["pubsub#node_type"] = self.nodeType
166 return config
167
168
169 def setConfiguration(self, options):
170 for option in options:
171 if option in self._config:
172 self._config[option] = options[option]
173
174 return defer.succeed(None)
175
176
177 def getAffiliation(self, entity):
178 return defer.succeed(self._affiliations.get(entity.userhost()))
179
180
181 def getSubscription(self, subscriber):
182 try:
183 subscription = self._subscriptions[subscriber.full()]
184 except KeyError:
185 return defer.succeed(None)
186 else:
187 return defer.succeed(subscription)
188
189
190 def getSubscriptions(self, state=None):
191 return defer.succeed(
192 [subscription
193 for subscription in self._subscriptions.itervalues()
194 if state is None or subscription.state == state])
195
196
197
198 def addSubscription(self, subscriber, state, options):
199 if self._subscriptions.get(subscriber.full()):
200 return defer.fail(error.SubscriptionExists())
201
202 subscription = Subscription(self.nodeIdentifier, subscriber, state,
203 options)
204 self._subscriptions[subscriber.full()] = subscription
205 return defer.succeed(None)
206
207
208 def removeSubscription(self, subscriber):
209 try:
210 del self._subscriptions[subscriber.full()]
211 except KeyError:
212 return defer.fail(error.NotSubscribed())
213
214 return defer.succeed(None)
215
216
217 def isSubscribed(self, entity):
218 for subscriber, subscription in self._subscriptions.iteritems():
219 if jid.internJID(subscriber).userhost() == entity.userhost() and \
220 subscription.state == 'subscribed':
221 return defer.succeed(True)
222
223 return defer.succeed(False)
224
225
226 def getAffiliations(self):
227 affiliations = [(jid.internJID(entity), affiliation) for entity, affiliation
228 in self._affiliations.iteritems()]
229
230 return defer.succeed(affiliations)
231
232
233
234 class PublishedItem(object):
235 """
236 A published item.
237
238 This represent an item as it was published by an entity.
239
240 @ivar element: The DOM representation of the item that was published.
241 @type element: L{Element<twisted.words.xish.domish.Element>}
242 @ivar publisher: The entity that published the item.
243 @type publisher: L{JID<twisted.words.protocols.jabber.jid.JID>}
244 """
245
246 def __init__(self, element, publisher):
247 self.element = element
248 self.publisher = publisher
249
250
251
252 class LeafNode(Node):
253
254 implements(iidavoll.ILeafNode)
255
256 nodeType = 'leaf'
257
258 def __init__(self, nodeIdentifier, owner, config):
259 Node.__init__(self, nodeIdentifier, owner, config)
260 self._items = {}
261 self._itemlist = []
262
263
264 def storeItems(self, item_data, publisher):
265 for access_model, item_config, element in item_data:
266 item = PublishedItem(element, publisher)
267 itemIdentifier = element["id"]
268 if itemIdentifier in self._items:
269 self._itemlist.remove(self._items[itemIdentifier])
270 self._items[itemIdentifier] = item
271 self._itemlist.append(item)
272
273 return defer.succeed(None)
274
275
276 def removeItems(self, itemIdentifiers):
277 deleted = []
278
279 for itemIdentifier in itemIdentifiers:
280 try:
281 item = self._items[itemIdentifier]
282 except KeyError:
283 pass
284 else:
285 self._itemlist.remove(item)
286 del self._items[itemIdentifier]
287 deleted.append(itemIdentifier)
288
289 return defer.succeed(deleted)
290
291
292 def getItems(self, authorized_groups, unrestricted, maxItems=None):
293 if maxItems is not None:
294 itemList = self._itemlist[-maxItems:]
295 else:
296 itemList = self._itemlist
297 return defer.succeed([item.element for item in itemList])
298
299
300 def getItemsById(self, authorized_groups, unrestricted, itemIdentifiers):
301 items = []
302 for itemIdentifier in itemIdentifiers:
303 try:
304 item = self._items[itemIdentifier]
305 except KeyError:
306 pass
307 else:
308 items.append(item.element)
309 return defer.succeed(items)
310
311
312 def purge(self):
313 self._items = {}
314 self._itemlist = []
315
316 return defer.succeed(None)
317
318
319 def filterItemsWithPublisher(self, itemIdentifiers, requestor):
320 filteredItems = []
321 for itemIdentifier in itemIdentifiers:
322 try:
323 if self._items[itemIdentifier].publisher.userhost() == requestor.userhost():
324 filteredItems.append(self.items[itemIdentifier])
325 except KeyError, AttributeError:
326 pass
327 return defer.succeed(filteredItems)
328
329
330 class CollectionNode(Node):
331 nodeType = 'collection'
332
333
334
335 class GatewayStorage(object):
336 """
337 Memory based storage facility for the XMPP-HTTP gateway.
338 """
339
340 def __init__(self):
341 self.callbacks = {}
342
343
344 def addCallback(self, service, nodeIdentifier, callback):
345 try:
346 callbacks = self.callbacks[service, nodeIdentifier]
347 except KeyError:
348 callbacks = {callback}
349 self.callbacks[service, nodeIdentifier] = callbacks
350 else:
351 callbacks.add(callback)
352 pass
353
354 return defer.succeed(None)
355
356
357 def removeCallback(self, service, nodeIdentifier, callback):
358 try:
359 callbacks = self.callbacks[service, nodeIdentifier]
360 callbacks.remove(callback)
361 except KeyError:
362 return defer.fail(error.NotSubscribed())
363 else:
364 if not callbacks:
365 del self.callbacks[service, nodeIdentifier]
366
367 return defer.succeed(not callbacks)
368
369
370 def getCallbacks(self, service, nodeIdentifier):
371 try:
372 callbacks = self.callbacks[service, nodeIdentifier]
373 except KeyError:
374 return defer.fail(error.NoCallbacks())
375 else:
376 return defer.succeed(callbacks)
377
378
379 def hasCallbacks(self, service, nodeIdentifier):
380 return defer.succeed((service, nodeIdentifier) in self.callbacks)