Mercurial > libervia-backend
comparison src/plugins/plugin_xep_0060.py @ 1449:389357fd79ce
plugin XEP-0060: use of new RTDeferredSession to subscribe many nodes at once + subscribeToMany can now subscribe on separate services
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 15 Aug 2015 22:13:27 +0200 |
parents | e8c8e467964b |
children | 9b88b19b1ca8 |
comparison
equal
deleted
inserted
replaced
1448:227856b13d7a | 1449:389357fd79ce |
---|---|
20 from sat.core.i18n import _ | 20 from sat.core.i18n import _ |
21 from sat.core.constants import Const as C | 21 from sat.core.constants import Const as C |
22 from sat.core.log import getLogger | 22 from sat.core.log import getLogger |
23 log = getLogger(__name__) | 23 log = getLogger(__name__) |
24 | 24 |
25 from wokkel import disco, pubsub, rsm | 25 from sat.tools import sat_defer |
26 | |
27 from twisted.words.protocols.jabber import jid | |
28 from wokkel import disco | |
29 from wokkel import pubsub | |
30 from wokkel import rsm | |
26 from zope.interface import implements | 31 from zope.interface import implements |
27 # from twisted.internet import defer | 32 # from twisted.internet import defer |
28 import uuid | 33 import uuid |
34 | |
35 UNSPECIFIED = "unspecified error" | |
29 | 36 |
30 | 37 |
31 PLUGIN_INFO = { | 38 PLUGIN_INFO = { |
32 "name": "Publish-Subscribe", | 39 "name": "Publish-Subscribe", |
33 "import_name": "XEP-0060", | 40 "import_name": "XEP-0060", |
55 | 62 |
56 def __init__(self, host): | 63 def __init__(self, host): |
57 log.info(_(u"PubSub plugin initialization")) | 64 log.info(_(u"PubSub plugin initialization")) |
58 self.host = host | 65 self.host = host |
59 self.managedNodes = [] | 66 self.managedNodes = [] |
67 self.rt_sessions = sat_defer.RTDeferredSessions() | |
68 host.bridge.addMethod("subscribeToMany", ".plugin", in_sign='a(ss)sa{ss}s', out_sign='s', method=self._subscribeToMany) | |
69 host.bridge.addMethod("getSubscribeRTResult", ".plugin", in_sign='ss', out_sign='(ua(sss))', method=self._manySubscribeRTResult, async=True) | |
60 | 70 |
61 def getHandler(self, profile): | 71 def getHandler(self, profile): |
62 client = self.host.getClient(profile) | 72 client = self.host.getClient(profile) |
63 client.pubsub_client = SatPubSubClient(self.host, self) | 73 client.pubsub_client = SatPubSubClient(self.host, self) |
64 return client.pubsub_client | 74 return client.pubsub_client |
183 | 193 |
184 def subscribe(self, service, nodeIdentifier, sub_jid=None, options=None, profile_key=C.PROF_KEY_NONE): | 194 def subscribe(self, service, nodeIdentifier, sub_jid=None, options=None, profile_key=C.PROF_KEY_NONE): |
185 client = self.host.getClient(profile_key) | 195 client = self.host.getClient(profile_key) |
186 return client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options) | 196 return client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options) |
187 | 197 |
188 # @defer.inlineCallbacks | |
189 # def subscribeToMany(self, service, nodeIdentifiers, sub_jid=None, options=None, profile_key=C.PROF_KEY_NONE): | |
190 # """Massively subscribe to many nodes. | |
191 | |
192 # @param service (JID): target service. | |
193 # @param nodeIdentifiers (list): the list of node identifiers to subscribe to. | |
194 # @param sub_id (str): optional subscription identifier. | |
195 # @param options (list): optional list of subscription options | |
196 # @param profile_key (str): %(doc_profile_key)s | |
197 # @return: list of Deferred instances. | |
198 # """ | |
199 # client = self.host.getClient(profile_key) | |
200 # found_nodes = yield self.listNodes(service, profile=client.profile) | |
201 # subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile) | |
202 # d_list = [] | |
203 # for nodeIdentifier in (set(nodeIdentifiers) - set(subscribed_nodes)): | |
204 # if nodeIdentifier not in found_nodes: | |
205 # log.debug(u"Skip the subscription to [{node}]: node doesn't exist".format(node=nodeIdentifier)) | |
206 # continue # avoid sat-pubsub "SubscriptionExists" error | |
207 # d_list.append(client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options)) | |
208 # defer.returnValue(d_list) | |
209 | |
210 def subscriptions(self, service, nodeIdentifier='', profile_key=C.PROF_KEY_NONE): | 198 def subscriptions(self, service, nodeIdentifier='', profile_key=C.PROF_KEY_NONE): |
211 client = self.host.getClient(profile_key) | 199 client = self.host.getClient(profile_key) |
212 return client.pubsub_client.subscriptions(service, nodeIdentifier) | 200 return client.pubsub_client.subscriptions(service, nodeIdentifier) |
201 | |
202 ## methods to manage several stanzas/jids at once ## | |
203 | |
204 # subscribe # | |
205 | |
206 def _manySubscribeRTResult(self, session_id, profile_key=C.PROF_KEY_DEFAULT): | |
207 """Get real-time results for subscribeToMany session | |
208 | |
209 @param session_id: id of the real-time deferred session | |
210 @return (tuple): (remaining, results) where: | |
211 - remaining is the number of still expected results | |
212 - results is a list of tuple(unicode, unicode, unicode) with: | |
213 - service: pubsub service | |
214 - node: pubsub node | |
215 - failure(unicode): empty string in case of success, error message else | |
216 @param profile_key: %(doc_profile_key)s | |
217 """ | |
218 profile = self.host.getClient(profile_key).profile | |
219 d = self.rt_sessions.getResults(session_id, on_success=lambda result:'', on_error=lambda failure:unicode(failure.value), profile=profile) | |
220 # we need to convert jid.JID to unicode with full() to serialise it for the bridge | |
221 d.addCallback(lambda ret: (ret[0], [(service.full(), node, '' if success else failure or UNSPECIFIED) | |
222 for (service, node), (success, failure) in ret[1].iteritems()])) | |
223 return d | |
224 | |
225 def _subscribeToMany(self, node_data, subscriber=None, options=None, profile_key=C.PROF_KEY_NONE): | |
226 return self.subscribeToMany([(jid.JID(service), unicode(node)) for service, node in node_data], jid.JID(subscriber), options, profile_key) | |
227 | |
228 def subscribeToMany(self, node_data, subscriber, options=None, profile_key=C.PROF_KEY_NONE): | |
229 """Subscribe to several nodes at once. | |
230 | |
231 @param node_data (iterable[tuple]): iterable of tuple (service, node) where: | |
232 - service (jid.JID) is the pubsub service | |
233 - node (unicode) is the node to subscribe to | |
234 @param subscriber (jid.JID): JID of the entity to subscribe to the nodes | |
235 @param options (dict): subscription options | |
236 @param profile_key (str): %(doc_profile_key)s | |
237 @return (str): RT Deferred session id | |
238 """ | |
239 client = self.host.getClient(profile_key) | |
240 deferreds = {} | |
241 for service, node in node_data: | |
242 deferreds[(service, node)] = client.pubsub_client.subscribe(service, node, subscriber, options=options) | |
243 return self.rt_sessions.newSession(deferreds, client.profile) | |
244 # found_nodes = yield self.listNodes(service, profile=client.profile) | |
245 # subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile) | |
246 # d_list = [] | |
247 # for nodeIdentifier in (set(nodeIdentifiers) - set(subscribed_nodes)): | |
248 # if nodeIdentifier not in found_nodes: | |
249 # log.debug(u"Skip the subscription to [{node}]: node doesn't exist".format(node=nodeIdentifier)) | |
250 # continue # avoid sat-pubsub "SubscriptionExists" error | |
251 # d_list.append(client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options)) | |
252 # defer.returnValue(d_list) | |
213 | 253 |
214 | 254 |
215 class SatPubSubClient(rsm.PubSubClient): | 255 class SatPubSubClient(rsm.PubSubClient): |
216 implements(disco.IDisco) | 256 implements(disco.IDisco) |
217 | 257 |