Mercurial > libervia-backend
comparison src/plugins/plugin_xep_0060.py @ 1242:f584fbda4773
plugin XEP-0060: fixes listing the nodes on the pubsub service
author | souliane <souliane@mailoo.org> |
---|---|
date | Sat, 18 Oct 2014 20:23:12 +0200 |
parents | 16484ebb695b |
children | b4a264915ea9 |
comparison
equal
deleted
inserted
replaced
1241:e56dfe0378a1 | 1242:f584fbda4773 |
---|---|
58 def __init__(self, host): | 58 def __init__(self, host): |
59 log.info(_("PubSub plugin initialization")) | 59 log.info(_("PubSub plugin initialization")) |
60 self.host = host | 60 self.host = host |
61 self.managedNodes = [] | 61 self.managedNodes = [] |
62 self.clients = {} | 62 self.clients = {} |
63 self.node_cache = Sessions(timeout=30, resettable_timeout=False) | 63 self.node_cache = Sessions(timeout=60, resettable_timeout=False) |
64 | 64 |
65 def getHandler(self, profile): | 65 def getHandler(self, profile): |
66 self.clients[profile] = SatPubSubClient(self.host, self) | 66 self.clients[profile] = SatPubSubClient(self.host, self) |
67 return self.clients[profile] | 67 return self.clients[profile] |
68 | 68 |
91 err_mess = _('INTERNAL ERROR: no handler for required profile') | 91 err_mess = _('INTERNAL ERROR: no handler for required profile') |
92 log.error(err_mess) | 92 log.error(err_mess) |
93 raise Exception(err_mess) | 93 raise Exception(err_mess) |
94 return profile, client | 94 return profile, client |
95 | 95 |
96 @defer.inlineCallbacks | 96 def _getDeferredNodeCache(self, session_id, init, profile): |
97 """Manage a node cache with deferred initialisation and concurrent access. | |
98 | |
99 @param session_id (string): node cache session ID | |
100 @param init (Deferred): deferred list of strings to initialise the cache. | |
101 @param profile (str): %(doc_profile)s | |
102 @return: Deferred list[str] | |
103 """ | |
104 if session_id in self.node_cache: | |
105 cache = self.node_cache.profileGet(session_id, profile) | |
106 if cache['nodes'] is None: | |
107 # init is still running | |
108 d = defer.Deferred() | |
109 cache['waiting'].append(d) | |
110 return d | |
111 return defer.succeed(cache['nodes']) | |
112 | |
113 cache = {'init': init, 'waiting': [], 'nodes': None} | |
114 self.node_cache.newSession(cache, session_id=session_id, profile=profile) | |
115 | |
116 def cb(nodes): | |
117 cache['nodes'] = nodes | |
118 for d in cache['waiting']: | |
119 d.callback(nodes) | |
120 return nodes | |
121 | |
122 return init.addCallback(cb) | |
123 | |
97 def listNodes(self, service, nodeIdentifier='', profile=C.PROF_KEY_NONE): | 124 def listNodes(self, service, nodeIdentifier='', profile=C.PROF_KEY_NONE): |
98 """Retrieve the name of the nodes that are accesible on the target service. | 125 """Retrieve the name of the nodes that are accessible on the target service. |
99 | 126 |
100 @param service (JID): target service | 127 @param service (JID): target service |
101 @param nodeIdentifier (str): the parent node name (leave empty to retrieve first-level nodes) | 128 @param nodeIdentifier (str): the parent node name (leave empty to retrieve first-level nodes) |
102 @param profile (str): %(doc_profile)s | 129 @param profile (str): %(doc_profile)s |
103 @return: list[str] | 130 @return: Deferred list[str] |
104 """ | 131 """ |
105 session_id = profile + "@found@" + service.userhost() | 132 session_id = profile + '@found@' + service.userhost() |
106 # FIXME: this can be called from self.subscribeToMany before the cache has been built by self.getItemsFromMany | 133 d = self.getDiscoItems(service, nodeIdentifier, profile_key=profile) |
107 if session_id in self.node_cache: | 134 d.addCallback(lambda result: [item.getAttribute('node') for item in result.toElement().children if item.hasAttribute('node')]) |
108 cache = self.node_cache.profileGet(session_id, profile) | 135 return self._getDeferredNodeCache(session_id, d, profile) |
109 else: | 136 |
110 # FIXME: we arrive here while the cache is already being built... | 137 def listSubscribedNodes(self, service, nodeIdentifier='', filter_='subscribed', profile=C.PROF_KEY_NONE): |
111 result = yield self.getDiscoItems(service, nodeIdentifier, profile_key=profile) | |
112 node_names = [item.getAttribute('node') for item in result.toElement().children if item.hasAttribute('node')] | |
113 dummy, cache = self.node_cache.newSession(node_names, session_id=session_id, profile=profile) | |
114 defer.returnValue(cache) | |
115 | |
116 @defer.inlineCallbacks | |
117 def listSubscribedNodes(self, service, nodeIdentifier='', filter='subscribed', profile=C.PROF_KEY_NONE): | |
118 """Retrieve the name of the nodes to which the profile is subscribed on the target service. | 138 """Retrieve the name of the nodes to which the profile is subscribed on the target service. |
119 | 139 |
120 @param service (JID): target service | 140 @param service (JID): target service |
121 @param nodeIdentifier (str): the parent node name (leave empty to retrieve all subscriptions) | 141 @param nodeIdentifier (str): the parent node name (leave empty to retrieve all subscriptions) |
122 @param filter (str): filter the result according to the given subscription type: | 142 @param filter_ (str): filter the result according to the given subscription type: |
123 - None: do not filter | 143 - None: do not filter |
124 - 'pending': subscription has been approved yet by the node owner | 144 - 'pending': subscription has been approved yet by the node owner |
125 - 'unconfigured': subscription options have not been configured yet | 145 - 'unconfigured': subscription options have not been configured yet |
126 - 'subscribed': subscription is complete | 146 - 'subscribed': subscription is complete |
127 @param profile (str): %(doc_profile)s | 147 @param profile (str): %(doc_profile)s |
128 @return: list[str] | 148 @return: Deferred list[str] |
129 """ | 149 """ |
130 session_id = profile + "@subscriptions@" + service.userhost() | 150 session_id = profile + '@subscriptions@' + service.userhost() |
131 if session_id in self.node_cache: | 151 d = self.subscriptions(service, nodeIdentifier, profile_key=profile) |
132 cache = self.node_cache.profileGet(session_id, profile) | 152 d.addCallback(lambda subs: [sub.getAttribute('node') for sub in subs if sub.getAttribute('subscription') == filter_]) |
133 else: | 153 return self._getDeferredNodeCache(session_id, d, profile) |
134 subs = yield self.subscriptions(service, nodeIdentifier, profile_key=profile) | |
135 node_names = [sub.getAttribute('node') for sub in subs if sub.getAttribute('subscription') == filter] | |
136 dummy, cache = self.node_cache.newSession(node_names, session_id=session_id, profile=profile) | |
137 defer.returnValue(cache) | |
138 | 154 |
139 def publish(self, service, nodeIdentifier, items=None, profile_key=C.PROF_KEY_NONE): | 155 def publish(self, service, nodeIdentifier, items=None, profile_key=C.PROF_KEY_NONE): |
140 profile, client = self.__getClientNProfile(profile_key, 'publish item') | 156 profile, client = self.__getClientNProfile(profile_key, 'publish item') |
141 return client.publish(service, nodeIdentifier, items, client.parent.jid) | 157 return client.publish(service, nodeIdentifier, items, client.parent.jid) |
142 | 158 |