Mercurial > libervia-backend
comparison src/plugins/plugin_xep_0060.py @ 1217:318eab3f93f8
plugin XEP-0060, groupblog: avoid unnecessary pubsub errors while doing massive requests:
- don't try to retrieve items from non accessible nodes
- don't try to subscribe to non accessible or already subscribed nodes
author | souliane <souliane@mailoo.org> |
---|---|
date | Mon, 22 Sep 2014 20:49:13 +0200 |
parents | 301b342c697a |
children | 16484ebb695b |
comparison
equal
deleted
inserted
replaced
1216:8ad37c3d58a9 | 1217:318eab3f93f8 |
---|---|
19 | 19 |
20 from sat.core.i18n import _ | 20 from sat.core.i18n import _ |
21 from sat.core.constants import Const as C | 21 from sat.core.constants import Const as C |
22 from sat.core.log import getLogger | 22 from sat.core.log import getLogger |
23 log = getLogger(__name__) | 23 log = getLogger(__name__) |
24 from wokkel.pubsub import PubSubRequest | 24 from sat.memory.memory import Sessions |
25 | |
25 from wokkel import disco, pubsub | 26 from wokkel import disco, pubsub |
27 from wokkel.pubsub import PubSubRequest, NS_PUBSUB | |
26 from zope.interface import implements | 28 from zope.interface import implements |
29 from twisted.internet import defer | |
30 | |
27 | 31 |
28 PLUGIN_INFO = { | 32 PLUGIN_INFO = { |
29 "name": "Publish-Subscribe", | 33 "name": "Publish-Subscribe", |
30 "import_name": "XEP-0060", | 34 "import_name": "XEP-0060", |
31 "type": "XEP", | 35 "type": "XEP", |
52 def __init__(self, host): | 56 def __init__(self, host): |
53 log.info(_("PubSub plugin initialization")) | 57 log.info(_("PubSub plugin initialization")) |
54 self.host = host | 58 self.host = host |
55 self.managedNodes = [] | 59 self.managedNodes = [] |
56 self.clients = {} | 60 self.clients = {} |
57 """host.bridge.addMethod("getItems", ".plugin", in_sign='ssa{ss}s', out_sign='as', method=self.getItems, | 61 self.node_cache = Sessions(timeout=30, resettable_timeout=False) |
58 async = True, | |
59 doc = { 'summary':'retrieve items', | |
60 'param_0':'service: pubsub service', | |
61 'param_1':'node: node identifier', | |
62 'param_2':'\n'.join(['options: can be:', | |
63 '- max_items: see XEP-0060 #6.5.7', | |
64 '- sub_id: subscription identifier, see XEP-0060 #7.2.2.2']), | |
65 'param_3':'%(doc_profile)s', | |
66 'return':'array of raw XML (content of the items)' | |
67 })""" | |
68 | 62 |
69 def getHandler(self, profile): | 63 def getHandler(self, profile): |
70 self.clients[profile] = SatPubSubClient(self.host, self) | 64 self.clients[profile] = SatPubSubClient(self.host, self) |
71 return self.clients[profile] | 65 return self.clients[profile] |
72 | 66 |
95 err_mess = _('INTERNAL ERROR: no handler for required profile') | 89 err_mess = _('INTERNAL ERROR: no handler for required profile') |
96 log.error(err_mess) | 90 log.error(err_mess) |
97 raise Exception(err_mess) | 91 raise Exception(err_mess) |
98 return profile, client | 92 return profile, client |
99 | 93 |
94 @defer.inlineCallbacks | |
95 def listNodes(self, service, nodeIdentifier='', profile=C.PROF_KEY_NONE): | |
96 """Retrieve the name of the nodes that are accessible on the target service. | |
97 | |
98 @param service (JID): target service | |
99 @param nodeIdentifier (str): the parent node name (leave empty to retrieve first-level nodes) | |
100 @param profile (str): %(doc_profile)s | |
101 @return: list[str] | |
102 """ | |
103 session_id = profile + "@found@" + service.userhost() | |
104 # FIXME: this can be called from self.subscribeToMany before the cache has been built by self.getItemsFromMany | |
105 if session_id in self.node_cache: | |
106 cache = self.node_cache.profileGet(session_id, profile) | |
107 else: | |
108 # FIXME: we arrive here while the cache is already being built... | |
109 result = yield self.getDiscoItems(service, nodeIdentifier, profile_key=profile) | |
110 node_names = [item.getAttribute('node') for item in result.toElement().children if item.hasAttribute('node')] | |
111 dummy, cache = self.node_cache.newSession(node_names, session_id=session_id, profile=profile) | |
112 defer.returnValue(cache) | |
113 | |
114 @defer.inlineCallbacks | |
115 def listSubscribedNodes(self, service, nodeIdentifier='', filter='subscribed', profile=C.PROF_KEY_NONE): | |
116 """Retrieve the name of the nodes to which the profile is subscribed on the target service. | |
117 | |
118 @param service (JID): target service | |
119 @param nodeIdentifier (str): the parent node name (leave empty to retrieve all subscriptions) | |
120 @param filter (str): filter the result according to the given subscription type: | |
121 - None: do not filter | |
122 - 'pending': subscription has not been approved yet by the node owner | |
123 - 'unconfigured': subscription options have not been configured yet | |
124 - 'subscribed': subscription is complete | |
125 @param profile (str): %(doc_profile)s | |
126 @return: list[str] | |
127 """ | |
128 session_id = profile + "@subscriptions@" + service.userhost() | |
129 if session_id in self.node_cache: | |
130 cache = self.node_cache.profileGet(session_id, profile) | |
131 else: | |
132 subs = yield self.subscriptions(service, nodeIdentifier, profile_key=profile) | |
133 node_names = [sub.getAttribute('node') for sub in subs if sub.getAttribute('subscription') == filter] | |
134 dummy, cache = self.node_cache.newSession(node_names, session_id=session_id, profile=profile) | |
135 defer.returnValue(cache) | |
136 | |
100 def publish(self, service, nodeIdentifier, items=None, profile_key=C.PROF_KEY_NONE): | 137 def publish(self, service, nodeIdentifier, items=None, profile_key=C.PROF_KEY_NONE): |
101 profile, client = self.__getClientNProfile(profile_key, 'publish item') | 138 profile, client = self.__getClientNProfile(profile_key, 'publish item') |
102 return client.publish(service, nodeIdentifier, items, client.parent.jid) | 139 return client.publish(service, nodeIdentifier, items, client.parent.jid) |
103 | 140 |
104 def getItems(self, service, node, max_items=None, item_ids=None, sub_id=None, profile_key=C.PROF_KEY_NONE): | 141 def getItems(self, service, node, max_items=None, item_ids=None, sub_id=None, profile_key=C.PROF_KEY_NONE): |
105 profile, client = self.__getClientNProfile(profile_key, 'get items') | 142 profile, client = self.__getClientNProfile(profile_key, 'get items') |
106 return client.items(service, node, max_items, item_ids, sub_id, client.parent.jid) | 143 return client.items(service, node, max_items, item_ids, sub_id, client.parent.jid) |
107 | 144 |
145 @defer.inlineCallbacks | |
146 def getItemsFromMany(self, service, data, max_items=None, item_ids=None, sub_id=None, profile_key=C.PROF_KEY_NONE): | |
147 """Massively retrieve pubsub items from many nodes. | |
148 | |
149 @param service (JID): target service. | |
150 @param data (dict): dictionary binding some arbitrary keys to the node identifiers. | |
151 @param max_items (int): optional limit on the number of retrieved items *per node*. | |
152 @param item_ids (list[str]): identifiers of the items to be retrieved (should not be used). | |
153 @param sub_id (str): optional subscription identifier. | |
154 @param profile_key (str): %(doc_profile_key)s | |
155 @return: dict binding a subset of the keys of data to Deferred instances. | |
156 """ | |
157 profile, client = self.__getClientNProfile(profile_key, 'get items') | |
158 found_nodes = yield self.listNodes(service, profile=profile) | |
159 d_dict = {} | |
160 for publisher, node in data.items(): | |
161 if node not in found_nodes: | |
162 log.info("Skip the items retrieval for [{node}]: node doesn't exist".format(node=node)) | |
163 continue # avoid pubsub "item-not-found" error | |
164 d_dict[publisher] = client.items(service, node, max_items, item_ids, sub_id, client.parent.jid) | |
165 defer.returnValue(d_dict) | |
166 | |
108 def getOptions(self, service, nodeIdentifier, subscriber, subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE): | 167 def getOptions(self, service, nodeIdentifier, subscriber, subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE): |
109 profile, client = self.__getClientNProfile(profile_key, 'get options') | 168 profile, client = self.__getClientNProfile(profile_key, 'get options') |
110 return client.getOptions(service, nodeIdentifier, subscriber, subscriptionIdentifier) | 169 return client.getOptions(service, nodeIdentifier, subscriber, subscriptionIdentifier) |
111 | 170 |
112 def setOptions(self, service, nodeIdentifier, subscriber, options, subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE): | 171 def setOptions(self, service, nodeIdentifier, subscriber, options, subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE): |
126 return client.retractItems(service, nodeIdentifier, itemIdentifiers) | 185 return client.retractItems(service, nodeIdentifier, itemIdentifiers) |
127 | 186 |
128 def subscribe(self, service, nodeIdentifier, sub_jid=None, options=None, profile_key=C.PROF_KEY_NONE): | 187 def subscribe(self, service, nodeIdentifier, sub_jid=None, options=None, profile_key=C.PROF_KEY_NONE): |
129 profile, client = self.__getClientNProfile(profile_key, 'subscribe node') | 188 profile, client = self.__getClientNProfile(profile_key, 'subscribe node') |
130 return client.subscribe(service, nodeIdentifier, sub_jid or client.parent.jid.userhostJID(), options=options) | 189 return client.subscribe(service, nodeIdentifier, sub_jid or client.parent.jid.userhostJID(), options=options) |
190 | |
191 @defer.inlineCallbacks | |
192 def subscribeToMany(self, service, nodeIdentifiers, sub_jid=None, options=None, profile_key=C.PROF_KEY_NONE): | |
193 """Massively subscribe to many nodes. | |
194 | |
195 @param service (JID): target service. | |
196 @param nodeIdentifiers (list): the list of node identifiers to subscribe to. | |
197 @param sub_jid (JID): optional subscriber JID; defaults to the profile's bare JID. | |
198 @param options (list): optional list of subscription options | |
199 @param profile_key (str): %(doc_profile_key)s | |
200 @return: list of Deferred instances, one per subscription request actually sent. | |
201 """ | |
202 profile, client = self.__getClientNProfile(profile_key, 'subscribe nodes') | |
203 found_nodes = yield self.listNodes(service, profile=profile) | |
204 subscribed_nodes = yield self.listSubscribedNodes(service, profile=profile) | |
205 d_list = [] | |
206 for nodeIdentifier in (set(nodeIdentifiers) - set(subscribed_nodes)): | |
207 if nodeIdentifier not in found_nodes: | |
208 log.info("Skip the subscription to [{node}]: node doesn't exist".format(node=nodeIdentifier)) | |
209 continue # avoid sat-pubsub "SubscriptionExists" error | |
210 d_list.append(client.subscribe(service, nodeIdentifier, sub_jid or client.parent.jid.userhostJID(), options=options)) | |
211 defer.returnValue(d_list) | |
212 | |
213 def subscriptions(self, service, nodeIdentifier='', profile_key=C.PROF_KEY_NONE): | |
214 profile, client = self.__getClientNProfile(profile_key, 'retrieve subscriptions') | |
215 return client.subscriptions(service, nodeIdentifier) | |
216 | |
217 def getDiscoItems(self, service, nodeIdentifier='', profile_key=C.PROF_KEY_NONE): | |
218 profile, client = self.__getClientNProfile(profile_key, 'disco items') | |
219 return client.getDiscoItems(None, service, nodeIdentifier) | |
131 | 220 |
132 | 221 |
133 class SatPubSubClient(pubsub.PubSubClient): | 222 class SatPubSubClient(pubsub.PubSubClient): |
134 implements(disco.IDisco) | 223 implements(disco.IDisco) |
135 | 224 |
156 @type nodeIdentifier: C{unicode} | 245 @type nodeIdentifier: C{unicode} |
157 | 246 |
158 @param maxItems: Optional limit on the number of retrieved items. | 247 @param maxItems: Optional limit on the number of retrieved items. |
159 @type maxItems: C{int} | 248 @type maxItems: C{int} |
160 | 249 |
161 @param itemIdentifiers: Identifiers of the items to be retracted. | 250 @param itemIdentifiers: Identifiers of the items to be retrieved. |
162 @type itemIdentifiers: C{set} | 251 @type itemIdentifiers: C{set} |
163 | 252 |
164 @param subscriptionIdentifier: Optional subscription identifier. In | 253 @param subscriptionIdentifier: Optional subscription identifier. In |
165 case the node has been subscribed to multiple times, this narrows | 254 case the node has been subscribed to multiple times, this narrows |
166 the results to the specific subscription. | 255 the results to the specific subscription. |
167 @type subscriptionIdentifier: C{unicode} | 256 @type subscriptionIdentifier: C{unicode} |
168 """ | 257 """ |
169 NS_PUBSUB = 'http://jabber.org/protocol/pubsub' | |
170 | |
171 request = PubSubRequest('items') | 258 request = PubSubRequest('items') |
172 request.recipient = service | 259 request.recipient = service |
173 request.nodeIdentifier = nodeIdentifier | 260 request.nodeIdentifier = nodeIdentifier |
174 if maxItems: | 261 if maxItems: |
175 request.maxItems = str(int(maxItems)) | 262 request.maxItems = str(int(maxItems)) |
218 #TODO: manage delete event | 305 #TODO: manage delete event |
219 log.debug(_("Publish node deleted")) | 306 log.debug(_("Publish node deleted")) |
220 | 307 |
221 # def purgeReceived(self, event): | 308 # def purgeReceived(self, event): |
222 | 309 |
223 | 310 @defer.inlineCallbacks |
224 | 311 def subscriptions(self, service, nodeIdentifier, sender=None): |
225 def getDiscoInfo(self, requestor, target, nodeIdentifier=''): | 312 """Return the list of subscriptions to the given service and node. |
226 _disco_info = [] | 313 |
227 self.host.trigger.point("PubSub Disco Info", _disco_info, self.parent.profile) | 314 @param service: The publish subscribe service to retrieve the subscriptions from. |
228 return _disco_info | 315 @type service: L{JID<twisted.words.protocols.jabber.jid.JID>} |
229 | 316 @param nodeIdentifier: The identifier of the node (leave empty to retrieve all subscriptions). |
230 def getDiscoItems(self, requestor, target, nodeIdentifier=''): | 317 @type nodeIdentifier: C{unicode} |
231 return [] | 318 """ |
319 request = PubSubRequest('subscriptions') | |
320 request.recipient = service | |
321 request.nodeIdentifier = nodeIdentifier | |
322 request.sender = sender | |
323 iq = yield request.send(self.xmlstream) | |
324 defer.returnValue([sub for sub in iq.pubsub.subscriptions.elements() if | |
325 (sub.uri == NS_PUBSUB and sub.name == 'subscription')]) | |
326 | |
327 def getDiscoInfo(self, requestor, service, nodeIdentifier=''): | |
328 disco_info = [] | |
329 self.host.trigger.point("PubSub Disco Info", disco_info, self.parent.profile) | |
330 return disco_info | |
331 | |
332 def getDiscoItems(self, requestor, service, nodeIdentifier=''): | |
333 return self.host.getDiscoItems(service, nodeIdentifier, self.parent.profile) |