comparison src/plugins/plugin_xep_0060.py @ 1217:318eab3f93f8

plugin XEP-0060, groupblog: avoid unnecessary pubsub errors while doing massive requests: - don't try to retrieve items from non-accessible nodes; - don't try to subscribe to non-accessible or already subscribed nodes
author souliane <souliane@mailoo.org>
date Mon, 22 Sep 2014 20:49:13 +0200
parents 301b342c697a
children 16484ebb695b
comparison
equal deleted inserted replaced
1216:8ad37c3d58a9 1217:318eab3f93f8
19 19
20 from sat.core.i18n import _ 20 from sat.core.i18n import _
21 from sat.core.constants import Const as C 21 from sat.core.constants import Const as C
22 from sat.core.log import getLogger 22 from sat.core.log import getLogger
23 log = getLogger(__name__) 23 log = getLogger(__name__)
24 from wokkel.pubsub import PubSubRequest 24 from sat.memory.memory import Sessions
25
25 from wokkel import disco, pubsub 26 from wokkel import disco, pubsub
27 from wokkel.pubsub import PubSubRequest, NS_PUBSUB
26 from zope.interface import implements 28 from zope.interface import implements
29 from twisted.internet import defer
30
27 31
28 PLUGIN_INFO = { 32 PLUGIN_INFO = {
29 "name": "Publish-Subscribe", 33 "name": "Publish-Subscribe",
30 "import_name": "XEP-0060", 34 "import_name": "XEP-0060",
31 "type": "XEP", 35 "type": "XEP",
def __init__(self, host):
    """Set up the state of the PubSub plugin.

    @param host: object kept as self.host and handed to every SatPubSubClient
    """
    log.info(_("PubSub plugin initialization"))
    self.host = host
    # nothing is registered yet: clients are added by getHandler
    self.clients = {}
    self.managedNodes = []
    # short-lived cache of node names, filled by listNodes and listSubscribedNodes
    # (timeout is presumably expressed in seconds -- TODO confirm against Sessions)
    self.node_cache = Sessions(timeout=30, resettable_timeout=False)
68 62
def getHandler(self, profile):
    """Create the pubsub client of a profile, register it and return it.

    @param profile: key under which the new client is stored in self.clients
    @return: the newly created SatPubSubClient instance
    """
    handler = SatPubSubClient(self.host, self)
    self.clients[profile] = handler
    return handler
72 66
95 err_mess = _('INTERNAL ERROR: no handler for required profile') 89 err_mess = _('INTERNAL ERROR: no handler for required profile')
96 log.error(err_mess) 90 log.error(err_mess)
97 raise Exception(err_mess) 91 raise Exception(err_mess)
98 return profile, client 92 return profile, client
99 93
@defer.inlineCallbacks
def listNodes(self, service, nodeIdentifier='', profile=C.PROF_KEY_NONE):
    """Retrieve the names of the nodes that are accessible on the target service.

    The result is kept in self.node_cache, so that successive calls for the
    same profile and service do not send a new disco request each time.

    @param service (JID): target service
    @param nodeIdentifier (str): the parent node name (leave empty to retrieve first-level nodes)
    @param profile (str): %(doc_profile)s
    @return: list[str]
    """
    # NOTE: the cache key is built from the profile and the service only,
    # nodeIdentifier is not part of it
    session_id = "@found@".join((profile, service.userhost()))
    # FIXME: this can be called from self.subscribeToMany before the cache has been built by self.getItemsFromMany
    if session_id not in self.node_cache:
        # FIXME: we arrive here while the cache is already being built...
        disco_result = yield self.getDiscoItems(service, nodeIdentifier, profile_key=profile)
        node_names = []
        for item in disco_result.toElement().children:
            if item.hasAttribute('node'):
                node_names.append(item.getAttribute('node'))
        dummy, cache = self.node_cache.newSession(node_names, session_id=session_id, profile=profile)
    else:
        cache = self.node_cache.profileGet(session_id, profile)
    defer.returnValue(cache)
113
@defer.inlineCallbacks
def listSubscribedNodes(self, service, nodeIdentifier='', filter='subscribed', profile=C.PROF_KEY_NONE):
    """Retrieve the names of the nodes to which the profile is subscribed on the target service.

    The result is kept in self.node_cache and reused by later calls made for
    the same profile and service.

    @param service (JID): target service
    @param nodeIdentifier (str): the parent node name (leave empty to retrieve all subscriptions)
    @param filter (str): filter the result according to the given subscription type:
        - None: do not filter
        - 'pending': subscription has not been approved yet by the node owner
        - 'unconfigured': subscription options have not been configured yet
        - 'subscribed': subscription is complete
    @param profile (str): %(doc_profile)s
    @return: list[str]
    """
    # NOTE(review): the cache key only holds the profile and the service, so a
    # cached list is returned even if nodeIdentifier or filter differ from the
    # call that built it -- confirm that callers always use the same values.
    session_id = profile + "@subscriptions@" + service.userhost()
    if session_id in self.node_cache:
        cache = self.node_cache.profileGet(session_id, profile)
    else:
        subs = yield self.subscriptions(service, nodeIdentifier, profile_key=profile)
        # a None filter keeps every subscription, as documented; comparing the
        # attribute with None would instead reject all the regular entries
        node_names = [sub.getAttribute('node') for sub in subs
                      if filter is None or sub.getAttribute('subscription') == filter]
        dummy, cache = self.node_cache.newSession(node_names, session_id=session_id, profile=profile)
    defer.returnValue(cache)
136
def publish(self, service, nodeIdentifier, items=None, profile_key=C.PROF_KEY_NONE):
    """Publish items on a node through the client bound to profile_key.

    @param service: target service, forwarded as is to the client
    @param nodeIdentifier: node to publish on, forwarded as is to the client
    @param items: items to publish, forwarded as is to the client
    @param profile_key (str): %(doc_profile_key)s
    @return: the value returned by client.publish
    """
    dummy, client = self.__getClientNProfile(profile_key, 'publish item')
    # the request is sent with the JID of the client's parent as sender
    sender = client.parent.jid
    return client.publish(service, nodeIdentifier, items, sender)
103 140
def getItems(self, service, node, max_items=None, item_ids=None, sub_id=None, profile_key=C.PROF_KEY_NONE):
    """Retrieve items from a single node through the client bound to profile_key.

    @param service: target service, forwarded as is to the client
    @param node: node identifier, forwarded as is to the client
    @param max_items: optional limit on the number of retrieved items
    @param item_ids: optional identifiers of the items to be retrieved
    @param sub_id: optional subscription identifier
    @param profile_key (str): %(doc_profile_key)s
    @return: the value returned by client.items
    """
    dummy, client = self.__getClientNProfile(profile_key, 'get items')
    # the request is sent with the JID of the client's parent as sender
    sender = client.parent.jid
    return client.items(service, node, max_items, item_ids, sub_id, sender)
107 144
@defer.inlineCallbacks
def getItemsFromMany(self, service, data, max_items=None, item_ids=None, sub_id=None, profile_key=C.PROF_KEY_NONE):
    """Massively retrieve pubsub items from many nodes.

    Nodes that are not listed by self.listNodes for the service are skipped,
    so their keys are missing from the result.

    @param service (JID): target service.
    @param data (dict): dictionary binding some arbitrary keys to the node identifiers.
    @param max_items (int): optional limit on the number of retrieved items *per node*.
    @param item_ids (list[str]): identifiers of the items to be retrieved (should not be used).
    @param sub_id (str): optional subscription identifier.
    @param profile_key (str): %(doc_profile_key)s
    @return: dict binding a subset of the keys of data to Deferred instances.
    """
    profile, client = self.__getClientNProfile(profile_key, 'get items')
    known_nodes = yield self.listNodes(service, profile=profile)
    requests = {}
    for key, node in data.items():
        if node in known_nodes:
            requests[key] = client.items(service, node, max_items, item_ids, sub_id, client.parent.jid)
        else:
            # asking for this node would only raise a pubsub "item-not-found" error
            log.info("Skip the items retrieval for [{node}]: node doesn't exist".format(node=node))
    defer.returnValue(requests)
166
def getOptions(self, service, nodeIdentifier, subscriber, subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE):
    """Retrieve subscription options through the client bound to profile_key.

    @param service: target service, forwarded as is to the client
    @param nodeIdentifier: node identifier, forwarded as is to the client
    @param subscriber: subscriber, forwarded as is to the client
    @param subscriptionIdentifier: optional subscription identifier
    @param profile_key (str): %(doc_profile_key)s
    @return: the value returned by client.getOptions
    """
    dummy, client = self.__getClientNProfile(profile_key, 'get options')
    fetch_options = client.getOptions
    return fetch_options(service, nodeIdentifier, subscriber, subscriptionIdentifier)
111 170
112 def setOptions(self, service, nodeIdentifier, subscriber, options, subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE): 171 def setOptions(self, service, nodeIdentifier, subscriber, options, subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE):
126 return client.retractItems(service, nodeIdentifier, itemIdentifiers) 185 return client.retractItems(service, nodeIdentifier, itemIdentifiers)
127 186
def subscribe(self, service, nodeIdentifier, sub_jid=None, options=None, profile_key=C.PROF_KEY_NONE):
    """Subscribe to a node through the client bound to profile_key.

    @param service: target service, forwarded as is to the client
    @param nodeIdentifier: node to subscribe to
    @param sub_jid: subscriber; when it is not set, client.parent.jid.userhostJID() is used
    @param options: optional subscription options, forwarded as is to the client
    @param profile_key (str): %(doc_profile_key)s
    @return: the value returned by client.subscribe
    """
    dummy, client = self.__getClientNProfile(profile_key, 'subscribe node')
    if sub_jid:
        subscriber = sub_jid
    else:
        subscriber = client.parent.jid.userhostJID()
    return client.subscribe(service, nodeIdentifier, subscriber, options=options)
190
@defer.inlineCallbacks
def subscribeToMany(self, service, nodeIdentifiers, sub_jid=None, options=None, profile_key=C.PROF_KEY_NONE):
    """Massively subscribe to many nodes.

    Nodes that are already listed by self.listSubscribedNodes, and nodes that
    are not listed by self.listNodes, are left out.

    @param service (JID): target service.
    @param nodeIdentifiers (list): the list of node identifiers to subscribe to.
    @param sub_jid: optional subscriber; when it is not set, client.parent.jid.userhostJID() is used.
    @param options (list): optional list of subscription options
    @param profile_key (str): %(doc_profile_key)s
    @return: list of the Deferred instances returned by client.subscribe, one per requested subscription.
    """
    profile, client = self.__getClientNProfile(profile_key, 'subscribe nodes')
    known_nodes = yield self.listNodes(service, profile=profile)
    already_subscribed = yield self.listSubscribedNodes(service, profile=profile)
    # dropping the subscribed nodes avoids sat-pubsub "SubscriptionExists" error
    candidates = set(nodeIdentifiers) - set(already_subscribed)
    requests = []
    for nodeIdentifier in candidates:
        if nodeIdentifier in known_nodes:
            requests.append(client.subscribe(service, nodeIdentifier, sub_jid or client.parent.jid.userhostJID(), options=options))
        else:
            # there is nothing to subscribe to on the service
            log.info("Skip the subscription to [{node}]: node doesn't exist".format(node=nodeIdentifier))
    defer.returnValue(requests)
212
def subscriptions(self, service, nodeIdentifier='', profile_key=C.PROF_KEY_NONE):
    """Retrieve subscriptions through the client bound to profile_key.

    @param service: target service, forwarded as is to the client
    @param nodeIdentifier: node identifier, forwarded as is to the client (empty by default)
    @param profile_key (str): %(doc_profile_key)s
    @return: the value returned by client.subscriptions
    """
    dummy, client = self.__getClientNProfile(profile_key, 'retrieve subscriptions')
    fetch_subscriptions = client.subscriptions
    return fetch_subscriptions(service, nodeIdentifier)
216
def getDiscoItems(self, service, nodeIdentifier='', profile_key=C.PROF_KEY_NONE):
    """Request the disco items of a service through the client bound to profile_key.

    @param service: target service, forwarded as is to the client
    @param nodeIdentifier: node identifier, forwarded as is to the client (empty by default)
    @param profile_key (str): %(doc_profile_key)s
    @return: the value returned by client.getDiscoItems
    """
    dummy, client = self.__getClientNProfile(profile_key, 'disco items')
    # no requestor is given to the client: None is passed as first argument
    requestor = None
    return client.getDiscoItems(requestor, service, nodeIdentifier)
131 220
132 221
133 class SatPubSubClient(pubsub.PubSubClient): 222 class SatPubSubClient(pubsub.PubSubClient):
134 implements(disco.IDisco) 223 implements(disco.IDisco)
135 224
156 @type nodeIdentifier: C{unicode} 245 @type nodeIdentifier: C{unicode}
157 246
158 @param maxItems: Optional limit on the number of retrieved items. 247 @param maxItems: Optional limit on the number of retrieved items.
159 @type maxItems: C{int} 248 @type maxItems: C{int}
160 249
161 @param itemIdentifiers: Identifiers of the items to be retracted. 250 @param itemIdentifiers: Identifiers of the items to be retrieved.
162 @type itemIdentifiers: C{set} 251 @type itemIdentifiers: C{set}
163 252
164 @param subscriptionIdentifier: Optional subscription identifier. In 253 @param subscriptionIdentifier: Optional subscription identifier. In
165 case the node has been subscribed to multiple times, this narrows 254 case the node has been subscribed to multiple times, this narrows
166 the results to the specific subscription. 255 the results to the specific subscription.
167 @type subscriptionIdentifier: C{unicode} 256 @type subscriptionIdentifier: C{unicode}
168 """ 257 """
169 NS_PUBSUB = 'http://jabber.org/protocol/pubsub'
170
171 request = PubSubRequest('items') 258 request = PubSubRequest('items')
172 request.recipient = service 259 request.recipient = service
173 request.nodeIdentifier = nodeIdentifier 260 request.nodeIdentifier = nodeIdentifier
174 if maxItems: 261 if maxItems:
175 request.maxItems = str(int(maxItems)) 262 request.maxItems = str(int(maxItems))
218 #TODO: manage delete event 305 #TODO: manage delete event
219 log.debug(_("Publish node deleted")) 306 log.debug(_("Publish node deleted"))
220 307
221 # def purgeReceived(self, event): 308 # def purgeReceived(self, event):
222 309
@defer.inlineCallbacks
def subscriptions(self, service, nodeIdentifier, sender=None):
    """Return the list of subscriptions to the given service and node.

    @param service: The publish subscribe service to retrieve the subscriptions from.
    @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}

    @param nodeIdentifier: The identifier of the node (leave empty to retrieve all subscriptions).
    @type nodeIdentifier: C{unicode}

    @param sender: Optional value assigned to the sender of the request.

    @return: the C{subscription} elements of the pubsub namespace found in
        the answer (the Deferred fires with a C{list}).
    """
    request = PubSubRequest('subscriptions')
    request.recipient = service
    request.nodeIdentifier = nodeIdentifier
    request.sender = sender
    answer = yield request.send(self.xmlstream)
    found = []
    for child in answer.pubsub.subscriptions.elements():
        # skip anything which is not a <subscription/> of the pubsub namespace
        if child.uri != NS_PUBSUB or child.name != 'subscription':
            continue
        found.append(child)
    defer.returnValue(found)
326
def getDiscoInfo(self, requestor, service, nodeIdentifier=''):
    """Collect the disco info through the "PubSub Disco Info" trigger point.

    An empty list is handed to the trigger point together with the profile
    of the parent, then returned; requestor, service and nodeIdentifier are
    not used.

    @return: the list handed to the trigger point
    """
    collected = []
    trigger_point = self.host.trigger.point
    trigger_point("PubSub Disco Info", collected, self.parent.profile)
    return collected
331
def getDiscoItems(self, requestor, service, nodeIdentifier=''):
    """Delegate the disco items request to the host.

    requestor is not used; the profile of the parent is passed along with
    service and nodeIdentifier.

    @return: the value returned by self.host.getDiscoItems
    """
    profile = self.parent.profile
    return self.host.getDiscoItems(service, nodeIdentifier, profile)