comparison idavoll/test/test_backend.py @ 178:07114105885a

Send last published item on subscription if node is so configured.
author Ralph Meijer <ralphm@ik.nu>
date Thu, 10 Apr 2008 14:02:53 +0000
parents bd86f0c3fd39
children 69cdd8c6a431
comparison
equal deleted inserted replaced
177:faf1c9bc2612 178:07114105885a
13 from wokkel import pubsub 13 from wokkel import pubsub
14 14
15 from idavoll import backend, error 15 from idavoll import backend, error
16 16
17 OWNER = jid.JID('owner@example.com') 17 OWNER = jid.JID('owner@example.com')
18 NS_PUBSUB = 'http://jabber.org/protocol/pubsub'
18 19
19 class BackendTest(unittest.TestCase): 20 class BackendTest(unittest.TestCase):
20 def test_delete_node(self): 21 def test_delete_node(self):
21 class testNode: 22 class testNode:
22 id = 'to-be-deleted' 23 id = 'to-be-deleted'
104 105
105 items = [pubsub.Item()] 106 items = [pubsub.Item()]
106 d = self.backend.publish('node', items, OWNER) 107 d = self.backend.publish('node', items, OWNER)
107 return d 108 return d
108 109
110 def test_notifyOnSubscription(self):
111 """
112 Test notification of last published item on subscription.
113 """
114 ITEM = "<item xmlns='%s' id='1'/>" % NS_PUBSUB
115
116 class testNode:
117 id = 'node'
118 def get_affiliation(self, entity):
119 if entity is OWNER:
120 return defer.succeed('owner')
121 def get_configuration(self):
122 return {'pubsub#deliver_payloads': True,
123 'pubsub#persist_items': False,
124 'pubsub#send_last_published_item': 'on_sub'}
125 def get_items(self, max_items):
126 return [ITEM]
127 def add_subscription(self, subscriber, state):
128 return defer.succeed(None)
129
130 class testStorage:
131 def get_node(self, node_id):
132 return defer.succeed(testNode())
133
134 def cb(data):
135 print [ITEM] == data['items']
136 self.assertEquals('node', data['node_id'])
137 self.assertEquals([ITEM], data['items'])
138 self.assertEquals(OWNER, data['subscriber'])
139
140 self.storage = testStorage()
141 self.backend = backend.BackendService(self.storage)
142 self.storage.backend = self.backend
143
144 d1 = defer.Deferred()
145 d1.addCallback(cb)
146 self.backend.register_notifier(d1.callback)
147 d2 = self.backend.subscribe('node', OWNER, OWNER)
148 return defer.gatherResults([d1, d2])
149
150 test_notifyOnSubscription.timeout = 2
151
109 152
110 class BaseTestBackend(object): 153 class BaseTestBackend(object):
111 """ 154 """
112 Base class for backend stubs. 155 Base class for backend stubs.
113 """ 156 """
127 def register_notifier(self, observerfn, *args, **kwargs): 170 def register_notifier(self, observerfn, *args, **kwargs):
128 return 171 return
129 172
130 def register_pre_delete(self, pre_delete_fn): 173 def register_pre_delete(self, pre_delete_fn):
131 return 174 return
175
132 176
133 class PubSubServiceFromBackendTest(unittest.TestCase): 177 class PubSubServiceFromBackendTest(unittest.TestCase):
134 178
135 def test_unsubscribeNotSubscribed(self): 179 def test_unsubscribeNotSubscribed(self):
136 """ 180 """