comparison src/plugins/plugin_xep_0060.py @ 1420:7c0acb966fd6

plugins groupblog, xep-0060: first pass of simplification
author Goffi <goffi@goffi.org>
date Wed, 22 Apr 2015 20:21:55 +0200
parents 069ad98b360d
children e8c8e467964b
comparison
equal deleted inserted replaced
1419:be2df1ddea8e 1420:7c0acb966fd6
56 56
57 def __init__(self, host): 57 def __init__(self, host):
58 log.info(_(u"PubSub plugin initialization")) 58 log.info(_(u"PubSub plugin initialization"))
59 self.host = host 59 self.host = host
60 self.managedNodes = [] 60 self.managedNodes = []
61 self.clients = {}
62 self.node_cache = Sessions(timeout=60, resettable_timeout=False) 61 self.node_cache = Sessions(timeout=60, resettable_timeout=False)
63 62
64 def getHandler(self, profile): 63 def getHandler(self, profile):
65 self.clients[profile] = SatPubSubClient(self.host, self) 64 client = self.host.getClient(profile)
66 return self.clients[profile] 65 client.pubsub_client = SatPubSubClient(self.host, self)
67 66 return client.pubsub_client
68 def profileDisconnected(self, profile):
69 try:
70 del self.clients[profile]
71 except KeyError:
72 pass
73 67
74 def addManagedNode(self, node_name, callback): 68 def addManagedNode(self, node_name, callback):
75 """Add a handler for a namespace 69 """Add a handler for a namespace
70
76 @param namespace: NS of the handler (will appear in disco info) 71 @param namespace: NS of the handler (will appear in disco info)
77 @param callback: method to call when the handler is found 72 @param callback: method to call when the handler is found
78 @param profile: profile which manage this handler""" 73 @param profile: profile which manage this handler"""
79 self.managedNodes.append((node_name, callback)) 74 self.managedNodes.append((node_name, callback))
80
81 def __getClientNProfile(self, profile_key, action='do pusbsub'):
82 """Return a tuple of (client, profile)
83 raise error when the profile doesn't exists
84 @param profile_key: as usual :)
85 @param action: text of action to show in case of error"""
86 profile = self.host.memory.getProfileName(profile_key)
87 if not profile:
88 err_mess = _('Trying to %(action)s with an unknown profile key [%(profile_key)s]') % {
89 'action': action,
90 'profile_key': profile_key}
91 log.error(err_mess)
92 raise Exception(err_mess)
93 try:
94 client = self.clients[profile]
95 except KeyError:
96 err_mess = _('INTERNAL ERROR: no handler for required profile')
97 log.error(err_mess)
98 raise Exception(err_mess)
99 return profile, client
100 75
101 def _getDeferredNodeCache(self, session_id, init, profile): 76 def _getDeferredNodeCache(self, session_id, init, profile):
102 """Manage a node cache with deferred initialisation and concurrent access. 77 """Manage a node cache with deferred initialisation and concurrent access.
103 78
104 @param session_id (string): node cache session ID 79 @param session_id (string): node cache session ID
144 119
145 @param service (JID): target service 120 @param service (JID): target service
146 @param nodeIdentifier (str): the parent node name (leave empty to retrieve all subscriptions) 121 @param nodeIdentifier (str): the parent node name (leave empty to retrieve all subscriptions)
147 @param filter_ (str): filter the result according to the given subscription type: 122 @param filter_ (str): filter the result according to the given subscription type:
148 - None: do not filter 123 - None: do not filter
149 - 'pending': subscription has been approved yet by the node owner 124 - 'pending': subscription has not been approved yet by the node owner
150 - 'unconfigured': subscription options have not been configured yet 125 - 'unconfigured': subscription options have not been configured yet
151 - 'subscribed': subscription is complete 126 - 'subscribed': subscription is complete
152 @param profile (str): %(doc_profile)s 127 @param profile (str): %(doc_profile)s
153 @return: Deferred list[str] 128 @return: Deferred list[str]
154 """ 129 """
156 d = self.subscriptions(service, nodeIdentifier, profile_key=profile) 131 d = self.subscriptions(service, nodeIdentifier, profile_key=profile)
157 d.addCallback(lambda subs: [sub.getAttribute('node') for sub in subs if sub.getAttribute('subscription') == filter_]) 132 d.addCallback(lambda subs: [sub.getAttribute('node') for sub in subs if sub.getAttribute('subscription') == filter_])
158 return self._getDeferredNodeCache(session_id, d, profile) 133 return self._getDeferredNodeCache(session_id, d, profile)
159 134
160 def publish(self, service, nodeIdentifier, items=None, profile_key=C.PROF_KEY_NONE): 135 def publish(self, service, nodeIdentifier, items=None, profile_key=C.PROF_KEY_NONE):
161 profile, client = self.__getClientNProfile(profile_key, 'publish item') 136 client = self.host.getClient(profile_key)
162 return client.publish(service, nodeIdentifier, items, client.parent.jid) 137 return client.pubsub_client.publish(service, nodeIdentifier, items, client.pubsub_client.parent.jid)
163 138
164 def getItems(self, service, node, max_items=None, item_ids=None, sub_id=None, rsm=None, profile_key=C.PROF_KEY_NONE): 139 def getItems(self, service, node, max_items=None, item_ids=None, sub_id=None, rsm=None, profile_key=C.PROF_KEY_NONE):
165 """Retrieve pubsub items from a node. 140 """Retrieve pubsub items from a node.
166 141
167 @param service (JID): target service. 142 @param service (JID): target service.
173 @param profile_key (str): %(doc_profile_key)s 148 @param profile_key (str): %(doc_profile_key)s
174 @return: a deferred couple (list[dict], dict) containing: 149 @return: a deferred couple (list[dict], dict) containing:
175 - list of items 150 - list of items
176 - RSM response data 151 - RSM response data
177 """ 152 """
178 profile, client = self.__getClientNProfile(profile_key, 'get items') 153 client = self.host.getClient(profile_key)
179 ext_data = {'id': unicode(uuid.uuid4()), 'rsm': rsm} if rsm else None 154 ext_data = {'id': unicode(uuid.uuid4()), 'rsm': rsm} if rsm else None
180 d = client.items(service, node, max_items, item_ids, sub_id, client.parent.jid, ext_data) 155 d = client.pubsub_client.items(service, node, max_items, item_ids, sub_id, client.pubsub_client.parent.jid, ext_data)
181 d.addCallback(lambda items: (items, client.getRSMResponse(ext_data['id']) if rsm else {})) 156 d.addCallback(lambda items: (items, client.pubsub_client.getRSMResponse(ext_data['id']) if rsm else {}))
182 return d 157 return d
183 158
184 @defer.inlineCallbacks 159 @defer.inlineCallbacks
185 def getItemsFromMany(self, service, data, max_items=None, sub_id=None, rsm=None, profile_key=C.PROF_KEY_NONE): 160 def getItemsFromMany(self, service, data, max_items=None, sub_id=None, rsm=None, profile_key=C.PROF_KEY_NONE):
186 """Massively retrieve pubsub items from many nodes. 161 """Massively retrieve pubsub items from many nodes.
195 - key: a value in (a subset of) data.keys() 170 - key: a value in (a subset of) data.keys()
196 - couple (list[dict], dict) containing: 171 - couple (list[dict], dict) containing:
197 - list of items 172 - list of items
198 - RSM response data 173 - RSM response data
199 """ 174 """
200 profile, client = self.__getClientNProfile(profile_key, 'get items') 175 client = self.host.getClient(profile_key)
201 found_nodes = yield self.listNodes(service, profile=profile) 176 found_nodes = yield self.listNodes(service, profile=client.profile)
202 d_dict = {} 177 d_dict = {}
203 for publisher, node in data.items(): 178 for publisher, node in data.items():
204 if node not in found_nodes: 179 if node not in found_nodes:
205 log.debug(u"Skip the items retrieval for [{node}]: node doesn't exist".format(node=node)) 180 log.debug(u"Skip the items retrieval for [{node}]: node doesn't exist".format(node=node))
206 continue # avoid pubsub "item-not-found" error 181 continue # avoid pubsub "item-not-found" error
207 d_dict[publisher] = self.getItems(service, node, max_items, None, sub_id, rsm, profile) 182 d_dict[publisher] = self.getItems(service, node, max_items, None, sub_id, rsm, client.profile)
208 defer.returnValue(d_dict) 183 defer.returnValue(d_dict)
209 184
210 def getOptions(self, service, nodeIdentifier, subscriber, subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE): 185 def getOptions(self, service, nodeIdentifier, subscriber, subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE):
211 profile, client = self.__getClientNProfile(profile_key, 'get options') 186 client = self.host.getClient(profile_key)
212 return client.getOptions(service, nodeIdentifier, subscriber, subscriptionIdentifier) 187 return client.pubsub_client.getOptions(service, nodeIdentifier, subscriber, subscriptionIdentifier)
213 188
214 def setOptions(self, service, nodeIdentifier, subscriber, options, subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE): 189 def setOptions(self, service, nodeIdentifier, subscriber, options, subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE):
215 profile, client = self.__getClientNProfile(profile_key, 'set options') 190 client = self.host.getClient(profile_key)
216 return client.setOptions(service, nodeIdentifier, subscriber, options, subscriptionIdentifier) 191 return client.pubsub_client.setOptions(service, nodeIdentifier, subscriber, options, subscriptionIdentifier)
217 192
218 def createNode(self, service, nodeIdentifier, options, profile_key=C.PROF_KEY_NONE): 193 def createNode(self, service, nodeIdentifier, options, profile_key=C.PROF_KEY_NONE):
219 profile, client = self.__getClientNProfile(profile_key, 'create node') 194 client = self.host.getClient(profile_key)
220 return client.createNode(service, nodeIdentifier, options) 195 return client.pubsub_client.createNode(service, nodeIdentifier, options)
221 196
222 def deleteNode(self, service, nodeIdentifier, profile_key=C.PROF_KEY_NONE): 197 def deleteNode(self, service, nodeIdentifier, profile_key=C.PROF_KEY_NONE):
223 profile, client = self.__getClientNProfile(profile_key, 'delete node') 198 client = self.host.getClient(profile_key)
224 return client.deleteNode(service, nodeIdentifier) 199 return client.pubsub_client.deleteNode(service, nodeIdentifier)
225 200
226 def retractItems(self, service, nodeIdentifier, itemIdentifiers, profile_key=C.PROF_KEY_NONE): 201 def retractItems(self, service, nodeIdentifier, itemIdentifiers, profile_key=C.PROF_KEY_NONE):
227 profile, client = self.__getClientNProfile(profile_key, 'retract items') 202 client = self.host.getClient(profile_key)
228 return client.retractItems(service, nodeIdentifier, itemIdentifiers) 203 return client.pubsub_client.retractItems(service, nodeIdentifier, itemIdentifiers)
229 204
230 def subscribe(self, service, nodeIdentifier, sub_jid=None, options=None, profile_key=C.PROF_KEY_NONE): 205 def subscribe(self, service, nodeIdentifier, sub_jid=None, options=None, profile_key=C.PROF_KEY_NONE):
231 profile, client = self.__getClientNProfile(profile_key, 'subscribe node') 206 client = self.host.getClient(profile_key)
232 return client.subscribe(service, nodeIdentifier, sub_jid or client.parent.jid.userhostJID(), options=options) 207 return client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options)
233 208
234 @defer.inlineCallbacks 209 @defer.inlineCallbacks
235 def subscribeToMany(self, service, nodeIdentifiers, sub_jid=None, options=None, profile_key=C.PROF_KEY_NONE): 210 def subscribeToMany(self, service, nodeIdentifiers, sub_jid=None, options=None, profile_key=C.PROF_KEY_NONE):
236 """Massively subscribe to many nodes. 211 """Massively subscribe to many nodes.
237 212
240 @param sub_id (str): optional subscription identifier. 215 @param sub_id (str): optional subscription identifier.
241 @param options (list): optional list of subscription options 216 @param options (list): optional list of subscription options
242 @param profile_key (str): %(doc_profile_key)s 217 @param profile_key (str): %(doc_profile_key)s
243 @return: list of Deferred instances. 218 @return: list of Deferred instances.
244 """ 219 """
245 profile, client = self.__getClientNProfile(profile_key, 'subscribe nodes') 220 client = self.host.getClient(profile_key)
246 found_nodes = yield self.listNodes(service, profile=profile) 221 found_nodes = yield self.listNodes(service, profile=client.profile)
247 subscribed_nodes = yield self.listSubscribedNodes(service, profile=profile) 222 subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile)
248 d_list = [] 223 d_list = []
249 for nodeIdentifier in (set(nodeIdentifiers) - set(subscribed_nodes)): 224 for nodeIdentifier in (set(nodeIdentifiers) - set(subscribed_nodes)):
250 if nodeIdentifier not in found_nodes: 225 if nodeIdentifier not in found_nodes:
251 log.debug(u"Skip the subscription to [{node}]: node doesn't exist".format(node=nodeIdentifier)) 226 log.debug(u"Skip the subscription to [{node}]: node doesn't exist".format(node=nodeIdentifier))
252 continue # avoid sat-pubsub "SubscriptionExists" error 227 continue # avoid sat-pubsub "SubscriptionExists" error
253 d_list.append(client.subscribe(service, nodeIdentifier, sub_jid or client.parent.jid.userhostJID(), options=options)) 228 d_list.append(client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options))
254 defer.returnValue(d_list) 229 defer.returnValue(d_list)
255 230
256 def subscriptions(self, service, nodeIdentifier='', profile_key=C.PROF_KEY_NONE): 231 def subscriptions(self, service, nodeIdentifier='', profile_key=C.PROF_KEY_NONE):
257 profile, client = self.__getClientNProfile(profile_key, 'retrieve subscriptions') 232 client = self.host.getClient(profile_key)
258 return client.subscriptions(service, nodeIdentifier) 233 return client.pubsub_client.subscriptions(service, nodeIdentifier)
259 234
260 235
261 class SatPubSubClient(rsm.PubSubClient): 236 class SatPubSubClient(rsm.PubSubClient):
262 implements(disco.IDisco) 237 implements(disco.IDisco)
263 238