Mercurial > libervia-backend
comparison src/plugins/plugin_xep_0060.py @ 1420:7c0acb966fd6
plugins groupblog, xep-0060: first pass of simplification
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 22 Apr 2015 20:21:55 +0200 |
parents | 069ad98b360d |
children | e8c8e467964b |
comparison
equal
deleted
inserted
replaced
1419:be2df1ddea8e | 1420:7c0acb966fd6 |
---|---|
56 | 56 |
57 def __init__(self, host): | 57 def __init__(self, host): |
58 log.info(_(u"PubSub plugin initialization")) | 58 log.info(_(u"PubSub plugin initialization")) |
59 self.host = host | 59 self.host = host |
60 self.managedNodes = [] | 60 self.managedNodes = [] |
61 self.clients = {} | |
62 self.node_cache = Sessions(timeout=60, resettable_timeout=False) | 61 self.node_cache = Sessions(timeout=60, resettable_timeout=False) |
63 | 62 |
64 def getHandler(self, profile): | 63 def getHandler(self, profile): |
65 self.clients[profile] = SatPubSubClient(self.host, self) | 64 client = self.host.getClient(profile) |
66 return self.clients[profile] | 65 client.pubsub_client = SatPubSubClient(self.host, self) |
67 | 66 return client.pubsub_client |
68 def profileDisconnected(self, profile): | |
69 try: | |
70 del self.clients[profile] | |
71 except KeyError: | |
72 pass | |
73 | 67 |
74 def addManagedNode(self, node_name, callback): | 68 def addManagedNode(self, node_name, callback): |
75 """Add a handler for a namespace | 69 """Add a handler for a namespace |
70 | |
76 @param namespace: NS of the handler (will appear in disco info) | 71 @param namespace: NS of the handler (will appear in disco info) |
77 @param callback: method to call when the handler is found | 72 @param callback: method to call when the handler is found |
78 @param profile: profile which manage this handler""" | 73 @param profile: profile which manage this handler""" |
79 self.managedNodes.append((node_name, callback)) | 74 self.managedNodes.append((node_name, callback)) |
80 | |
81 def __getClientNProfile(self, profile_key, action='do pusbsub'): | |
82 """Return a tuple of (client, profile) | |
83 raise error when the profile doesn't exists | |
84 @param profile_key: as usual :) | |
85 @param action: text of action to show in case of error""" | |
86 profile = self.host.memory.getProfileName(profile_key) | |
87 if not profile: | |
88 err_mess = _('Trying to %(action)s with an unknown profile key [%(profile_key)s]') % { | |
89 'action': action, | |
90 'profile_key': profile_key} | |
91 log.error(err_mess) | |
92 raise Exception(err_mess) | |
93 try: | |
94 client = self.clients[profile] | |
95 except KeyError: | |
96 err_mess = _('INTERNAL ERROR: no handler for required profile') | |
97 log.error(err_mess) | |
98 raise Exception(err_mess) | |
99 return profile, client | |
100 | 75 |
101 def _getDeferredNodeCache(self, session_id, init, profile): | 76 def _getDeferredNodeCache(self, session_id, init, profile): |
102 """Manage a node cache with deferred initialisation and concurrent access. | 77 """Manage a node cache with deferred initialisation and concurrent access. |
103 | 78 |
104 @param session_id (string): node cache session ID | 79 @param session_id (string): node cache session ID |
144 | 119 |
145 @param service (JID): target service | 120 @param service (JID): target service |
146 @param nodeIdentifier (str): the parent node name (leave empty to retrieve all subscriptions) | 121 @param nodeIdentifier (str): the parent node name (leave empty to retrieve all subscriptions) |
147 @param filter_ (str): filter the result according to the given subscription type: | 122 @param filter_ (str): filter the result according to the given subscription type: |
148 - None: do not filter | 123 - None: do not filter |
149 - 'pending': subscription has been approved yet by the node owner | 124 - 'pending': subscription has not been approved yet by the node owner |
150 - 'unconfigured': subscription options have not been configured yet | 125 - 'unconfigured': subscription options have not been configured yet |
151 - 'subscribed': subscription is complete | 126 - 'subscribed': subscription is complete |
152 @param profile (str): %(doc_profile)s | 127 @param profile (str): %(doc_profile)s |
153 @return: Deferred list[str] | 128 @return: Deferred list[str] |
154 """ | 129 """ |
156 d = self.subscriptions(service, nodeIdentifier, profile_key=profile) | 131 d = self.subscriptions(service, nodeIdentifier, profile_key=profile) |
157 d.addCallback(lambda subs: [sub.getAttribute('node') for sub in subs if sub.getAttribute('subscription') == filter_]) | 132 d.addCallback(lambda subs: [sub.getAttribute('node') for sub in subs if sub.getAttribute('subscription') == filter_]) |
158 return self._getDeferredNodeCache(session_id, d, profile) | 133 return self._getDeferredNodeCache(session_id, d, profile) |
159 | 134 |
160 def publish(self, service, nodeIdentifier, items=None, profile_key=C.PROF_KEY_NONE): | 135 def publish(self, service, nodeIdentifier, items=None, profile_key=C.PROF_KEY_NONE): |
161 profile, client = self.__getClientNProfile(profile_key, 'publish item') | 136 client = self.host.getClient(profile_key) |
162 return client.publish(service, nodeIdentifier, items, client.parent.jid) | 137 return client.pubsub_client.publish(service, nodeIdentifier, items, client.pubsub_client.parent.jid) |
163 | 138 |
164 def getItems(self, service, node, max_items=None, item_ids=None, sub_id=None, rsm=None, profile_key=C.PROF_KEY_NONE): | 139 def getItems(self, service, node, max_items=None, item_ids=None, sub_id=None, rsm=None, profile_key=C.PROF_KEY_NONE): |
165 """Retrieve pubsub items from a node. | 140 """Retrieve pubsub items from a node. |
166 | 141 |
167 @param service (JID): target service. | 142 @param service (JID): target service. |
173 @param profile_key (str): %(doc_profile_key)s | 148 @param profile_key (str): %(doc_profile_key)s |
174 @return: a deferred couple (list[dict], dict) containing: | 149 @return: a deferred couple (list[dict], dict) containing: |
175 - list of items | 150 - list of items |
176 - RSM response data | 151 - RSM response data |
177 """ | 152 """ |
178 profile, client = self.__getClientNProfile(profile_key, 'get items') | 153 client = self.host.getClient(profile_key) |
179 ext_data = {'id': unicode(uuid.uuid4()), 'rsm': rsm} if rsm else None | 154 ext_data = {'id': unicode(uuid.uuid4()), 'rsm': rsm} if rsm else None |
180 d = client.items(service, node, max_items, item_ids, sub_id, client.parent.jid, ext_data) | 155 d = client.pubsub_client.items(service, node, max_items, item_ids, sub_id, client.pubsub_client.parent.jid, ext_data) |
181 d.addCallback(lambda items: (items, client.getRSMResponse(ext_data['id']) if rsm else {})) | 156 d.addCallback(lambda items: (items, client.pubsub_client.getRSMResponse(ext_data['id']) if rsm else {})) |
182 return d | 157 return d |
183 | 158 |
184 @defer.inlineCallbacks | 159 @defer.inlineCallbacks |
185 def getItemsFromMany(self, service, data, max_items=None, sub_id=None, rsm=None, profile_key=C.PROF_KEY_NONE): | 160 def getItemsFromMany(self, service, data, max_items=None, sub_id=None, rsm=None, profile_key=C.PROF_KEY_NONE): |
186 """Massively retrieve pubsub items from many nodes. | 161 """Massively retrieve pubsub items from many nodes. |
195 - key: a value in (a subset of) data.keys() | 170 - key: a value in (a subset of) data.keys() |
196 - couple (list[dict], dict) containing: | 171 - couple (list[dict], dict) containing: |
197 - list of items | 172 - list of items |
198 - RSM response data | 173 - RSM response data |
199 """ | 174 """ |
200 profile, client = self.__getClientNProfile(profile_key, 'get items') | 175 client = self.host.getClient(profile_key) |
201 found_nodes = yield self.listNodes(service, profile=profile) | 176 found_nodes = yield self.listNodes(service, profile=client.profile) |
202 d_dict = {} | 177 d_dict = {} |
203 for publisher, node in data.items(): | 178 for publisher, node in data.items(): |
204 if node not in found_nodes: | 179 if node not in found_nodes: |
205 log.debug(u"Skip the items retrieval for [{node}]: node doesn't exist".format(node=node)) | 180 log.debug(u"Skip the items retrieval for [{node}]: node doesn't exist".format(node=node)) |
206 continue # avoid pubsub "item-not-found" error | 181 continue # avoid pubsub "item-not-found" error |
207 d_dict[publisher] = self.getItems(service, node, max_items, None, sub_id, rsm, profile) | 182 d_dict[publisher] = self.getItems(service, node, max_items, None, sub_id, rsm, client.profile) |
208 defer.returnValue(d_dict) | 183 defer.returnValue(d_dict) |
209 | 184 |
210 def getOptions(self, service, nodeIdentifier, subscriber, subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE): | 185 def getOptions(self, service, nodeIdentifier, subscriber, subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE): |
211 profile, client = self.__getClientNProfile(profile_key, 'get options') | 186 client = self.host.getClient(profile_key) |
212 return client.getOptions(service, nodeIdentifier, subscriber, subscriptionIdentifier) | 187 return client.pubsub_client.getOptions(service, nodeIdentifier, subscriber, subscriptionIdentifier) |
213 | 188 |
214 def setOptions(self, service, nodeIdentifier, subscriber, options, subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE): | 189 def setOptions(self, service, nodeIdentifier, subscriber, options, subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE): |
215 profile, client = self.__getClientNProfile(profile_key, 'set options') | 190 client = self.host.getClient(profile_key) |
216 return client.setOptions(service, nodeIdentifier, subscriber, options, subscriptionIdentifier) | 191 return client.pubsub_client.setOptions(service, nodeIdentifier, subscriber, options, subscriptionIdentifier) |
217 | 192 |
218 def createNode(self, service, nodeIdentifier, options, profile_key=C.PROF_KEY_NONE): | 193 def createNode(self, service, nodeIdentifier, options, profile_key=C.PROF_KEY_NONE): |
219 profile, client = self.__getClientNProfile(profile_key, 'create node') | 194 client = self.host.getClient(profile_key) |
220 return client.createNode(service, nodeIdentifier, options) | 195 return client.pubsub_client.createNode(service, nodeIdentifier, options) |
221 | 196 |
222 def deleteNode(self, service, nodeIdentifier, profile_key=C.PROF_KEY_NONE): | 197 def deleteNode(self, service, nodeIdentifier, profile_key=C.PROF_KEY_NONE): |
223 profile, client = self.__getClientNProfile(profile_key, 'delete node') | 198 client = self.host.getClient(profile_key) |
224 return client.deleteNode(service, nodeIdentifier) | 199 return client.pubsub_client.deleteNode(service, nodeIdentifier) |
225 | 200 |
226 def retractItems(self, service, nodeIdentifier, itemIdentifiers, profile_key=C.PROF_KEY_NONE): | 201 def retractItems(self, service, nodeIdentifier, itemIdentifiers, profile_key=C.PROF_KEY_NONE): |
227 profile, client = self.__getClientNProfile(profile_key, 'retract items') | 202 client = self.host.getClient(profile_key) |
228 return client.retractItems(service, nodeIdentifier, itemIdentifiers) | 203 return client.pubsub_client.retractItems(service, nodeIdentifier, itemIdentifiers) |
229 | 204 |
230 def subscribe(self, service, nodeIdentifier, sub_jid=None, options=None, profile_key=C.PROF_KEY_NONE): | 205 def subscribe(self, service, nodeIdentifier, sub_jid=None, options=None, profile_key=C.PROF_KEY_NONE): |
231 profile, client = self.__getClientNProfile(profile_key, 'subscribe node') | 206 client = self.host.getClient(profile_key) |
232 return client.subscribe(service, nodeIdentifier, sub_jid or client.parent.jid.userhostJID(), options=options) | 207 return client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options) |
233 | 208 |
234 @defer.inlineCallbacks | 209 @defer.inlineCallbacks |
235 def subscribeToMany(self, service, nodeIdentifiers, sub_jid=None, options=None, profile_key=C.PROF_KEY_NONE): | 210 def subscribeToMany(self, service, nodeIdentifiers, sub_jid=None, options=None, profile_key=C.PROF_KEY_NONE): |
236 """Massively subscribe to many nodes. | 211 """Massively subscribe to many nodes. |
237 | 212 |
240 @param sub_id (str): optional subscription identifier. | 215 @param sub_id (str): optional subscription identifier. |
241 @param options (list): optional list of subscription options | 216 @param options (list): optional list of subscription options |
242 @param profile_key (str): %(doc_profile_key)s | 217 @param profile_key (str): %(doc_profile_key)s |
243 @return: list of Deferred instances. | 218 @return: list of Deferred instances. |
244 """ | 219 """ |
245 profile, client = self.__getClientNProfile(profile_key, 'subscribe nodes') | 220 client = self.host.getClient(profile_key) |
246 found_nodes = yield self.listNodes(service, profile=profile) | 221 found_nodes = yield self.listNodes(service, profile=client.profile) |
247 subscribed_nodes = yield self.listSubscribedNodes(service, profile=profile) | 222 subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile) |
248 d_list = [] | 223 d_list = [] |
249 for nodeIdentifier in (set(nodeIdentifiers) - set(subscribed_nodes)): | 224 for nodeIdentifier in (set(nodeIdentifiers) - set(subscribed_nodes)): |
250 if nodeIdentifier not in found_nodes: | 225 if nodeIdentifier not in found_nodes: |
251 log.debug(u"Skip the subscription to [{node}]: node doesn't exist".format(node=nodeIdentifier)) | 226 log.debug(u"Skip the subscription to [{node}]: node doesn't exist".format(node=nodeIdentifier)) |
252 continue # avoid sat-pubsub "SubscriptionExists" error | 227 continue # avoid sat-pubsub "SubscriptionExists" error |
253 d_list.append(client.subscribe(service, nodeIdentifier, sub_jid or client.parent.jid.userhostJID(), options=options)) | 228 d_list.append(client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options)) |
254 defer.returnValue(d_list) | 229 defer.returnValue(d_list) |
255 | 230 |
256 def subscriptions(self, service, nodeIdentifier='', profile_key=C.PROF_KEY_NONE): | 231 def subscriptions(self, service, nodeIdentifier='', profile_key=C.PROF_KEY_NONE): |
257 profile, client = self.__getClientNProfile(profile_key, 'retrieve subscriptions') | 232 client = self.host.getClient(profile_key) |
258 return client.subscriptions(service, nodeIdentifier) | 233 return client.pubsub_client.subscriptions(service, nodeIdentifier) |
259 | 234 |
260 | 235 |
261 class SatPubSubClient(rsm.PubSubClient): | 236 class SatPubSubClient(rsm.PubSubClient): |
262 implements(disco.IDisco) | 237 implements(disco.IDisco) |
263 | 238 |