comparison src/plugins/plugin_xep_0060.py @ 1449:389357fd79ce

plugin XEP-0060: use of new RTDeferredSession to subscribe many nodes at once + subscribeToMany can now subscribe on separate services
author Goffi <goffi@goffi.org>
date Sat, 15 Aug 2015 22:13:27 +0200
parents e8c8e467964b
children 9b88b19b1ca8
comparison
equal deleted inserted replaced
1448:227856b13d7a 1449:389357fd79ce
20 from sat.core.i18n import _ 20 from sat.core.i18n import _
21 from sat.core.constants import Const as C 21 from sat.core.constants import Const as C
22 from sat.core.log import getLogger 22 from sat.core.log import getLogger
23 log = getLogger(__name__) 23 log = getLogger(__name__)
24 24
25 from wokkel import disco, pubsub, rsm 25 from sat.tools import sat_defer
26
27 from twisted.words.protocols.jabber import jid
28 from wokkel import disco
29 from wokkel import pubsub
30 from wokkel import rsm
26 from zope.interface import implements 31 from zope.interface import implements
27 # from twisted.internet import defer 32 # from twisted.internet import defer
28 import uuid 33 import uuid
34
35 UNSPECIFIED = "unspecified error"
29 36
30 37
31 PLUGIN_INFO = { 38 PLUGIN_INFO = {
32 "name": "Publish-Subscribe", 39 "name": "Publish-Subscribe",
33 "import_name": "XEP-0060", 40 "import_name": "XEP-0060",
55 62
56 def __init__(self, host): 63 def __init__(self, host):
57 log.info(_(u"PubSub plugin initialization")) 64 log.info(_(u"PubSub plugin initialization"))
58 self.host = host 65 self.host = host
59 self.managedNodes = [] 66 self.managedNodes = []
67 self.rt_sessions = sat_defer.RTDeferredSessions()
68 host.bridge.addMethod("subscribeToMany", ".plugin", in_sign='a(ss)sa{ss}s', out_sign='s', method=self._subscribeToMany)
69 host.bridge.addMethod("getSubscribeRTResult", ".plugin", in_sign='ss', out_sign='(ua(sss))', method=self._manySubscribeRTResult, async=True)
60 70
61 def getHandler(self, profile): 71 def getHandler(self, profile):
62 client = self.host.getClient(profile) 72 client = self.host.getClient(profile)
63 client.pubsub_client = SatPubSubClient(self.host, self) 73 client.pubsub_client = SatPubSubClient(self.host, self)
64 return client.pubsub_client 74 return client.pubsub_client
183 193
184 def subscribe(self, service, nodeIdentifier, sub_jid=None, options=None, profile_key=C.PROF_KEY_NONE): 194 def subscribe(self, service, nodeIdentifier, sub_jid=None, options=None, profile_key=C.PROF_KEY_NONE):
185 client = self.host.getClient(profile_key) 195 client = self.host.getClient(profile_key)
186 return client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options) 196 return client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options)
187 197
188 # @defer.inlineCallbacks
189 # def subscribeToMany(self, service, nodeIdentifiers, sub_jid=None, options=None, profile_key=C.PROF_KEY_NONE):
190 # """Massively subscribe to many nodes.
191
192 # @param service (JID): target service.
193 # @param nodeIdentifiers (list): the list of node identifiers to subscribe to.
194 # @param sub_id (str): optional subscription identifier.
195 # @param options (list): optional list of subscription options
196 # @param profile_key (str): %(doc_profile_key)s
197 # @return: list of Deferred instances.
198 # """
199 # client = self.host.getClient(profile_key)
200 # found_nodes = yield self.listNodes(service, profile=client.profile)
201 # subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile)
202 # d_list = []
203 # for nodeIdentifier in (set(nodeIdentifiers) - set(subscribed_nodes)):
204 # if nodeIdentifier not in found_nodes:
205 # log.debug(u"Skip the subscription to [{node}]: node doesn't exist".format(node=nodeIdentifier))
206 # continue # avoid sat-pubsub "SubscriptionExists" error
207 # d_list.append(client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options))
208 # defer.returnValue(d_list)
209
210 def subscriptions(self, service, nodeIdentifier='', profile_key=C.PROF_KEY_NONE): 198 def subscriptions(self, service, nodeIdentifier='', profile_key=C.PROF_KEY_NONE):
211 client = self.host.getClient(profile_key) 199 client = self.host.getClient(profile_key)
212 return client.pubsub_client.subscriptions(service, nodeIdentifier) 200 return client.pubsub_client.subscriptions(service, nodeIdentifier)
201
202 ## methods to manage several stanzas/jids at once ##
203
204 # subscribe #
205
206 def _manySubscribeRTResult(self, session_id, profile_key=C.PROF_KEY_DEFAULT):
207 """Get real-time results for subcribeToManu session
208
209 @param session_id: id of the real-time deferred session
210 @param return (tuple): (remaining, results) where:
211 - remaining is the number of still expected results
212 - results is a list of tuple(unicode, unicode, bool, unicode) with:
213 - service: pubsub service
214 - and node: pubsub node
215 - failure(unicode): empty string in case of success, error message else
216 @param profile_key: %(doc_profile_key)s
217 """
218 profile = self.host.getClient(profile_key).profile
219 d = self.rt_sessions.getResults(session_id, on_success=lambda result:'', on_error=lambda failure:unicode(failure.value), profile=profile)
220 # we need to convert jid.JID to unicode with full() to serialise it for the bridge
221 d.addCallback(lambda ret: (ret[0], [(service.full(), node, '' if success else failure or UNSPECIFIED)
222 for (service, node), (success, failure) in ret[1].iteritems()]))
223 return d
224
225 def _subscribeToMany(self, node_data, subscriber=None, options=None, profile_key=C.PROF_KEY_NONE):
226 return self.subscribeToMany([(jid.JID(service), unicode(node)) for service, node in node_data], jid.JID(subscriber), options, profile_key)
227
228 def subscribeToMany(self, node_data, subscriber, options=None, profile_key=C.PROF_KEY_NONE):
229 """Subscribe to several nodes at once.
230
231 @param node_data (iterable[tuple]): iterable of tuple (service, node) where:
232 - service (jid.JID) is the pubsub service
233 - node (unicode) is the node to subscribe to
234 @param subscriber (jid.JID): optional subscription identifier.
235 @param options (dict): subscription options
236 @param profile_key (str): %(doc_profile_key)s
237 @return (str): RT Deferred session id
238 """
239 client = self.host.getClient(profile_key)
240 deferreds = {}
241 for service, node in node_data:
242 deferreds[(service, node)] = client.pubsub_client.subscribe(service, node, subscriber, options=options)
243 return self.rt_sessions.newSession(deferreds, client.profile)
244 # found_nodes = yield self.listNodes(service, profile=client.profile)
245 # subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile)
246 # d_list = []
247 # for nodeIdentifier in (set(nodeIdentifiers) - set(subscribed_nodes)):
248 # if nodeIdentifier not in found_nodes:
249 # log.debug(u"Skip the subscription to [{node}]: node doesn't exist".format(node=nodeIdentifier))
250 # continue # avoid sat-pubsub "SubscriptionExists" error
251 # d_list.append(client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options))
252 # defer.returnValue(d_list)
213 253
214 254
215 class SatPubSubClient(rsm.PubSubClient): 255 class SatPubSubClient(rsm.PubSubClient):
216 implements(disco.IDisco) 256 implements(disco.IDisco)
217 257