comparison src/plugins/plugin_xep_0060.py @ 1242:f584fbda4773

plugin XEP-0060: fixes listing the nodes on the pubsub service
author souliane <souliane@mailoo.org>
date Sat, 18 Oct 2014 20:23:12 +0200
parents 16484ebb695b
children b4a264915ea9
comparison
equal deleted inserted replaced
1241:e56dfe0378a1 1242:f584fbda4773
58 def __init__(self, host): 58 def __init__(self, host):
59 log.info(_("PubSub plugin initialization")) 59 log.info(_("PubSub plugin initialization"))
60 self.host = host 60 self.host = host
61 self.managedNodes = [] 61 self.managedNodes = []
62 self.clients = {} 62 self.clients = {}
63 self.node_cache = Sessions(timeout=30, resettable_timeout=False) 63 self.node_cache = Sessions(timeout=60, resettable_timeout=False)
64 64
65 def getHandler(self, profile): 65 def getHandler(self, profile):
66 self.clients[profile] = SatPubSubClient(self.host, self) 66 self.clients[profile] = SatPubSubClient(self.host, self)
67 return self.clients[profile] 67 return self.clients[profile]
68 68
91 err_mess = _('INTERNAL ERROR: no handler for required profile') 91 err_mess = _('INTERNAL ERROR: no handler for required profile')
92 log.error(err_mess) 92 log.error(err_mess)
93 raise Exception(err_mess) 93 raise Exception(err_mess)
94 return profile, client 94 return profile, client
95 95
96 @defer.inlineCallbacks 96 def _getDeferredNodeCache(self, session_id, init, profile):
97 """Manage a node cache with deferred initialisation and concurrent access.
98
99 @param session_id (string): node cache session ID
100 @param init (Deferred): deferred list of strings to initialise the cache.
101 @param profile (str): %(doc_profile)s
102 @return: Deferred list[str]
103 """
104 if session_id in self.node_cache:
105 cache = self.node_cache.profileGet(session_id, profile)
106 if cache['nodes'] is None:
107 # init is still running
108 d = defer.Deferred()
109 cache['waiting'].append(d)
110 return d
111 return defer.succeed(cache['nodes'])
112
113 cache = {'init': init, 'waiting': [], 'nodes': None}
114 self.node_cache.newSession(cache, session_id=session_id, profile=profile)
115
116 def cb(nodes):
117 cache['nodes'] = nodes
118 for d in cache['waiting']:
119 d.callback(nodes)
120 return nodes
121
122 return init.addCallback(cb)
123
97 def listNodes(self, service, nodeIdentifier='', profile=C.PROF_KEY_NONE): 124 def listNodes(self, service, nodeIdentifier='', profile=C.PROF_KEY_NONE):
98 """Retrieve the name of the nodes that are accesible on the target service. 125 """Retrieve the name of the nodes that are accessible on the target service.
99 126
100 @param service (JID): target service 127 @param service (JID): target service
101 @param nodeIdentifier (str): the parent node name (leave empty to retrieve first-level nodes) 128 @param nodeIdentifier (str): the parent node name (leave empty to retrieve first-level nodes)
102 @param profile (str): %(doc_profile)s 129 @param profile (str): %(doc_profile)s
103 @return: list[str] 130 @return: Deferred list[str]
104 """ 131 """
105 session_id = profile + "@found@" + service.userhost() 132 session_id = profile + '@found@' + service.userhost()
106 # FIXME: this can be called from self.subscribeToMany before the cache has been built by self.getItemsFromMany 133 d = self.getDiscoItems(service, nodeIdentifier, profile_key=profile)
107 if session_id in self.node_cache: 134 d.addCallback(lambda result: [item.getAttribute('node') for item in result.toElement().children if item.hasAttribute('node')])
108 cache = self.node_cache.profileGet(session_id, profile) 135 return self._getDeferredNodeCache(session_id, d, profile)
109 else: 136
110 # FIXME: we arrive here while the cache is already being built... 137 def listSubscribedNodes(self, service, nodeIdentifier='', filter_='subscribed', profile=C.PROF_KEY_NONE):
111 result = yield self.getDiscoItems(service, nodeIdentifier, profile_key=profile)
112 node_names = [item.getAttribute('node') for item in result.toElement().children if item.hasAttribute('node')]
113 dummy, cache = self.node_cache.newSession(node_names, session_id=session_id, profile=profile)
114 defer.returnValue(cache)
115
116 @defer.inlineCallbacks
117 def listSubscribedNodes(self, service, nodeIdentifier='', filter='subscribed', profile=C.PROF_KEY_NONE):
118 """Retrieve the name of the nodes to which the profile is subscribed on the target service. 138 """Retrieve the name of the nodes to which the profile is subscribed on the target service.
119 139
120 @param service (JID): target service 140 @param service (JID): target service
121 @param nodeIdentifier (str): the parent node name (leave empty to retrieve all subscriptions) 141 @param nodeIdentifier (str): the parent node name (leave empty to retrieve all subscriptions)
122 @param filter (str): filter the result according to the given subscription type: 142 @param filter_ (str): filter the result according to the given subscription type:
123 - None: do not filter 143 - None: do not filter
124 - 'pending': subscription has been approved yet by the node owner 144 - 'pending': subscription has been approved yet by the node owner
125 - 'unconfigured': subscription options have not been configured yet 145 - 'unconfigured': subscription options have not been configured yet
126 - 'subscribed': subscription is complete 146 - 'subscribed': subscription is complete
127 @param profile (str): %(doc_profile)s 147 @param profile (str): %(doc_profile)s
128 @return: list[str] 148 @return: Deferred list[str]
129 """ 149 """
130 session_id = profile + "@subscriptions@" + service.userhost() 150 session_id = profile + '@subscriptions@' + service.userhost()
131 if session_id in self.node_cache: 151 d = self.subscriptions(service, nodeIdentifier, profile_key=profile)
132 cache = self.node_cache.profileGet(session_id, profile) 152 d.addCallback(lambda subs: [sub.getAttribute('node') for sub in subs if sub.getAttribute('subscription') == filter_])
133 else: 153 return self._getDeferredNodeCache(session_id, d, profile)
134 subs = yield self.subscriptions(service, nodeIdentifier, profile_key=profile)
135 node_names = [sub.getAttribute('node') for sub in subs if sub.getAttribute('subscription') == filter]
136 dummy, cache = self.node_cache.newSession(node_names, session_id=session_id, profile=profile)
137 defer.returnValue(cache)
138 154
139 def publish(self, service, nodeIdentifier, items=None, profile_key=C.PROF_KEY_NONE): 155 def publish(self, service, nodeIdentifier, items=None, profile_key=C.PROF_KEY_NONE):
140 profile, client = self.__getClientNProfile(profile_key, 'publish item') 156 profile, client = self.__getClientNProfile(profile_key, 'publish item')
141 return client.publish(service, nodeIdentifier, items, client.parent.jid) 157 return client.publish(service, nodeIdentifier, items, client.parent.jid)
142 158