Mercurial > libervia-pubsub
comparison idavoll/backend.py @ 15:46cd13c68ac0
Redone memory storage of nodes.
Created a few classes for memory storage of nodes.
Implemented basic subscription.
Implemented item storage.
author | Ralph Meijer <ralphm@ik.nu> |
---|---|
date | Sat, 17 Jul 2004 22:05:50 +0000 |
parents | 05a5d412e1b1 |
children | 7937d6fbbe2a |
Comparison legend: equal, deleted, inserted, replaced
14:68e900b46d49 | 15:46cd13c68ac0 |
---|---|
1 from twisted.application import service | 1 from twisted.application import service |
2 from twisted.python import components, failure | 2 from twisted.python import components, failure |
3 from twisted.internet import defer, reactor | 3 from twisted.internet import defer, reactor |
4 from twisted.protocols.jabber import jid | |
4 | 5 |
5 class IBackendService(components.Interface): | 6 class IBackendService(components.Interface): |
6 """ Interface to a backend service of a pubsub service """ | 7 """ Interface to a backend service of a pubsub service """ |
7 | 8 |
8 def do_publish(self, node, publisher, item): | 9 def do_publish(self, node, publisher, item): |
28 | 29 |
29 class NoPayloadAllowed(BackendException): | 30 class NoPayloadAllowed(BackendException): |
30 def __init__(self, msg = 'No payload allowed'): | 31 def __init__(self, msg = 'No payload allowed'): |
31 BackendException.__init__(self, msg) | 32 BackendException.__init__(self, msg) |
32 | 33 |
34 class Subscription: | |
35 def __init__(self, state): | |
36 self.state = state | |
37 | |
38 class NodeConfiguration: | |
39 def __init__(self): | |
40 self.persist_items = False | |
41 self.deliver_payloads = False | |
42 | |
43 class Node: | |
44 def __init__(self, name): | |
45 self.name = name | |
46 self.configuration = NodeConfiguration() | |
47 self.subscriptions = {} | |
48 self.affiliations = {} | |
49 self.items = {} | |
50 | |
33 class MemoryBackendService(service.Service): | 51 class MemoryBackendService(service.Service): |
34 | 52 |
35 __implements__ = IBackendService, | 53 __implements__ = IBackendService, |
36 | 54 |
37 def __init__(self): | 55 def __init__(self): |
38 self.nodes = { | 56 self.nodes = {} |
39 "ralphm/mood/ralphm@ik.nu": { | |
40 "persist_items": True, | |
41 "deliver_payloads": True, | |
42 } | |
43 } | |
44 self.subscribers = { | |
45 "ralphm/mood/ralphm@ik.nu": [ | |
46 "notify@ik.nu/mood_monitor" | |
47 ] | |
48 } | |
49 self.affiliations = { | |
50 "ralphm/mood/ralphm@ik.nu": { | |
51 "ralphm@ik.nu": "owner", | |
52 "ralphm@doe.ik.nu": "publisher" | |
53 } | |
54 } | |
55 | 57 |
56 def do_publish(self, node, publisher, items): | 58 node = Node("ralphm/mood/ralphm@ik.nu") |
59 node.subscriptions["ralphm@doe.ik.nu"] = Subscription("subscribed") | |
60 node.affiliations["ralphm@ik.nu"] = "owner" | |
61 node.affiliations["ralphm@doe.ik.nu"] = "publisher" | |
62 node.configuration.persist_items = True | |
63 node.configuration.deliver_payloads = True | |
64 self.nodes[node.name] = node | |
65 | |
66 def do_publish(self, node_id, publisher, items): | |
57 try: | 67 try: |
58 try: | 68 try: |
59 config = self.nodes[node] | 69 node = self.nodes[node_id] |
60 persist_items = config["persist_items"] | 70 persist_items = node.configuration.persist_items |
61 deliver_payloads = config["deliver_payloads"] | 71 deliver_payloads = node.configuration.deliver_payloads |
62 except KeyError: | 72 except KeyError: |
63 raise NodeNotFound | 73 raise NodeNotFound |
64 | 74 |
65 try: | 75 try: |
66 affiliation = self.affiliations[node][publisher] | 76 if node.affiliations[publisher] not in ['owner', 'publisher']: |
67 if affiliation not in ['owner', 'publisher']: | |
68 raise NotAuthorized | 77 raise NotAuthorized |
69 except KeyError: | 78 except KeyError: |
70 raise NotAuthorized() | 79 raise NotAuthorized() |
71 | 80 |
72 # the following is under the assumption that the publisher | 81 # the following is under the assumption that the publisher |
76 if items and not persist_items and not deliver_payloads: | 85 if items and not persist_items and not deliver_payloads: |
77 raise NoPayloadAllowed | 86 raise NoPayloadAllowed |
78 elif not items and (persist_items or deliver_payloads): | 87 elif not items and (persist_items or deliver_payloads): |
79 raise PayloadExpected | 88 raise PayloadExpected |
80 | 89 |
81 print "publish by %s to %s" % (publisher, node) | 90 print "publish by %s to %s" % (publisher, node_id) |
82 | 91 |
83 if persist_items or deliver_payloads: | 92 if persist_items or deliver_payloads: |
84 for item in items: | 93 for item in items: |
85 if item["id"] is None: | 94 if item["id"] is None: |
86 item["id"] = 'random' | 95 item["id"] = 'random' |
87 | 96 |
88 if persist_items: | 97 if persist_items: |
89 self.storeItems(node, publisher, items) | 98 self.storeItems(node_id, publisher, items) |
90 | 99 |
91 if items and not deliver_payloads: | 100 if items and not deliver_payloads: |
92 for item in items: | 101 for item in items: |
93 item.children = [] | 102 item.children = [] |
94 | 103 |
95 recipients = self.get_subscribers(node) | 104 recipients = self.get_subscribers(node_id) |
96 recipients.addCallback(self.magic_filter, node, items) | 105 recipients.addCallback(self.magic_filter, node_id, items) |
97 recipients.addCallback(self.pubsub_service.do_notification, node) | 106 recipients.addCallback(self.pubsub_service.do_notification, node_id) |
98 | 107 |
99 return defer.succeed(None) | 108 return defer.succeed(None) |
100 except: | 109 except: |
101 f = failure.Failure() | 110 f = failure.Failure() |
102 return defer.fail(f) | 111 return defer.fail(f) |
103 | 112 |
104 def magic_filter(self, subscribers, node, items): | 113 def do_subscribe(self, node_id, subscriber, requestor): |
114 # expect subscriber and requestor to be a jid.JID | |
115 try: | |
116 try: | |
117 node = self.nodes[node_id] | |
118 except KeyError: | |
119 raise NodeNotFound | |
120 | |
121 affiliation = node.affiliations.get(requestor.full(), 'none') | |
122 | |
123 if affiliation == 'banned': | |
124 raise NotAuthorized | |
125 | |
126 print subscriber.full() | |
127 print subscriber.userhostJID().full() | |
128 print requestor.full() | |
129 | |
130 if subscriber.userhostJID() != requestor: | |
131 raise NotAuthorized | |
132 | |
133 try: | |
134 subscription = node.subscriptions[subscriber.full()] | |
135 except KeyError: | |
136 subscription = Subscription('subscribed') | |
137 node.subscriptions[subscriber.full()] = subscription | |
138 | |
139 print node.subscriptions | |
140 | |
141 return defer.succeed({ | |
142 'affiliation': affiliation, | |
143 'node': node_id, | |
144 'jid': subscriber, | |
145 'subscription': subscription.state}) | |
146 except: | |
147 f = failure.Failure() | |
148 return defer.fail(f) | |
149 | |
150 | |
151 def magic_filter(self, subscribers, node_id, items): | |
105 list = {} | 152 list = {} |
106 for subscriber in subscribers: | 153 for subscriber in subscribers: |
107 list[subscriber] = items | 154 list[subscriber] = items |
108 | 155 |
109 return list | 156 return list |
110 | 157 |
111 def get_subscribers(self, node): | 158 def get_subscribers(self, node_id): |
112 d = defer.Deferred() | 159 d = defer.Deferred() |
113 try: | 160 try: |
114 result = self.subscribers[node] | 161 result = self.nodes[node_id].subscriptions.keys() |
115 except: | 162 except: |
116 f = failure.Failure() | 163 f = failure.Failure() |
117 reactor.callLater(0, d.errback, f) | 164 reactor.callLater(0, d.errback, f) |
118 else: | 165 else: |
119 reactor.callLater(0, d.callback, result) | 166 reactor.callLater(0, d.callback, result) |
120 | 167 |
121 return d | 168 return d |
122 | 169 |
123 def storeItems(self, node, publisher, items): | 170 def storeItems(self, node_id, publisher, items): |
124 for item in items: | 171 for item in items: |
125 print "Storing item %s" % item.toXml() | 172 self.nodes[node_id].items[item["id"]] = item |
126 | 173 |
174 print self.nodes[node_id].items |