Mercurial > libervia-backend
comparison src/plugins/plugin_xep_0060.py @ 1446:e8c8e467964b
plugins xep-0060, xep-0277: code simplification/cleaning/fix:
- plugin xep-0060: moved rsm data to a more general metadata dict, which will contain all data relative to the node/items set. RSM metadata are prefixed with "rsm_"
- plugin xep-0060: minor docstring fixes
- plugin xep-0060: removed cache to simplify code base
- fixed broken getLastMicroblogs
- added _getLastMicroblogs as wrapper to getLastMicroblogs, for bridge
- removed lxml dependency for this plugin, use native twisted instead
- several improvements/fixes in item2mbdata
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 15 Aug 2015 22:13:27 +0200 |
parents | 7c0acb966fd6 |
children | 389357fd79ce |
comparison
equal
deleted
inserted
replaced
1445:ddc7a39ff9d1 | 1446:e8c8e467964b |
---|---|
19 | 19 |
20 from sat.core.i18n import _ | 20 from sat.core.i18n import _ |
21 from sat.core.constants import Const as C | 21 from sat.core.constants import Const as C |
22 from sat.core.log import getLogger | 22 from sat.core.log import getLogger |
23 log = getLogger(__name__) | 23 log = getLogger(__name__) |
24 from sat.memory.memory import Sessions | |
25 | 24 |
26 from wokkel import disco, pubsub, rsm | 25 from wokkel import disco, pubsub, rsm |
27 from zope.interface import implements | 26 from zope.interface import implements |
28 from twisted.internet import defer | 27 # from twisted.internet import defer |
29 import uuid | 28 import uuid |
30 | 29 |
31 | 30 |
32 PLUGIN_INFO = { | 31 PLUGIN_INFO = { |
33 "name": "Publish-Subscribe", | 32 "name": "Publish-Subscribe", |
56 | 55 |
57 def __init__(self, host): | 56 def __init__(self, host): |
58 log.info(_(u"PubSub plugin initialization")) | 57 log.info(_(u"PubSub plugin initialization")) |
59 self.host = host | 58 self.host = host |
60 self.managedNodes = [] | 59 self.managedNodes = [] |
61 self.node_cache = Sessions(timeout=60, resettable_timeout=False) | |
62 | 60 |
63 def getHandler(self, profile): | 61 def getHandler(self, profile): |
64 client = self.host.getClient(profile) | 62 client = self.host.getClient(profile) |
65 client.pubsub_client = SatPubSubClient(self.host, self) | 63 client.pubsub_client = SatPubSubClient(self.host, self) |
66 return client.pubsub_client | 64 return client.pubsub_client |
71 @param namespace: NS of the handler (will appear in disco info) | 69 @param namespace: NS of the handler (will appear in disco info) |
72 @param callback: method to call when the handler is found | 70 @param callback: method to call when the handler is found |
73 @param profile: profile which manage this handler""" | 71 @param profile: profile which manage this handler""" |
74 self.managedNodes.append((node_name, callback)) | 72 self.managedNodes.append((node_name, callback)) |
75 | 73 |
76 def _getDeferredNodeCache(self, session_id, init, profile): | 74 # def listNodes(self, service, nodeIdentifier='', profile=C.PROF_KEY_NONE): |
77 """Manage a node cache with deferred initialisation and concurrent access. | 75 # """Retrieve the name of the nodes that are accessible on the target service. |
78 | 76 |
79 @param session_id (string): node cache session ID | 77 # @param service (JID): target service |
80 @param init (Deferred): deferred list of strings to initialise the cache. | 78 # @param nodeIdentifier (str): the parent node name (leave empty to retrieve first-level nodes) |
81 @param profile (str): %(doc_profile)s | 79 # @param profile (str): %(doc_profile)s |
82 @return: Deferred list[str] | 80 # @return: deferred which fire a list of nodes |
83 """ | 81 # """ |
84 if session_id in self.node_cache: | 82 # d = self.host.getDiscoItems(service, nodeIdentifier, profile_key=profile) |
85 cache = self.node_cache.profileGet(session_id, profile) | 83 # d.addCallback(lambda result: [item.getAttribute('node') for item in result.toElement().children if item.hasAttribute('node')]) |
86 if cache['nodes'] is None: | 84 # return d |
87 # init is still running | 85 |
88 d = defer.Deferred() | 86 # def listSubscribedNodes(self, service, nodeIdentifier='', filter_='subscribed', profile=C.PROF_KEY_NONE): |
89 cache['waiting'].append(d) | 87 # """Retrieve the name of the nodes to which the profile is subscribed on the target service. |
90 return d | 88 |
91 return defer.succeed(cache['nodes']) | 89 # @param service (JID): target service |
92 | 90 # @param nodeIdentifier (str): the parent node name (leave empty to retrieve all subscriptions) |
93 cache = {'init': init, 'waiting': [], 'nodes': None} | 91 # @param filter_ (str): filter the result according to the given subscription type: |
94 self.node_cache.newSession(cache, session_id=session_id, profile=profile) | 92 # - None: do not filter |
95 | 93 # - 'pending': subscription has not been approved yet by the node owner |
96 def cb(nodes): | 94 # - 'unconfigured': subscription options have not been configured yet |
97 cache['nodes'] = nodes | 95 # - 'subscribed': subscription is complete |
98 for d in cache['waiting']: | 96 # @param profile (str): %(doc_profile)s |
99 d.callback(nodes) | 97 # @return: Deferred list[str] |
100 return nodes | 98 # """ |
101 | 99 # d = self.subscriptions(service, nodeIdentifier, profile_key=profile) |
102 return init.addCallback(cb) | 100 # d.addCallback(lambda subs: [sub.getAttribute('node') for sub in subs if sub.getAttribute('subscription') == filter_]) |
103 | 101 # return d |
104 def listNodes(self, service, nodeIdentifier='', profile=C.PROF_KEY_NONE): | |
105 """Retrieve the name of the nodes that are accessible on the target service. | |
106 | |
107 @param service (JID): target service | |
108 @param nodeIdentifier (str): the parent node name (leave empty to retrieve first-level nodes) | |
109 @param profile (str): %(doc_profile)s | |
110 @return: Deferred list[str] | |
111 """ | |
112 session_id = profile + '@found@' + service.userhost() | |
113 d = self.host.getDiscoItems(service, nodeIdentifier, profile_key=profile) | |
114 d.addCallback(lambda result: [item.getAttribute('node') for item in result.toElement().children if item.hasAttribute('node')]) | |
115 return self._getDeferredNodeCache(session_id, d, profile) | |
116 | |
117 def listSubscribedNodes(self, service, nodeIdentifier='', filter_='subscribed', profile=C.PROF_KEY_NONE): | |
118 """Retrieve the name of the nodes to which the profile is subscribed on the target service. | |
119 | |
120 @param service (JID): target service | |
121 @param nodeIdentifier (str): the parent node name (leave empty to retrieve all subscriptions) | |
122 @param filter_ (str): filter the result according to the given subscription type: | |
123 - None: do not filter | |
124 - 'pending': subscription has not been approved yet by the node owner | |
125 - 'unconfigured': subscription options have not been configured yet | |
126 - 'subscribed': subscription is complete | |
127 @param profile (str): %(doc_profile)s | |
128 @return: Deferred list[str] | |
129 """ | |
130 session_id = profile + '@subscriptions@' + service.userhost() | |
131 d = self.subscriptions(service, nodeIdentifier, profile_key=profile) | |
132 d.addCallback(lambda subs: [sub.getAttribute('node') for sub in subs if sub.getAttribute('subscription') == filter_]) | |
133 return self._getDeferredNodeCache(session_id, d, profile) | |
134 | 102 |
135 def publish(self, service, nodeIdentifier, items=None, profile_key=C.PROF_KEY_NONE): | 103 def publish(self, service, nodeIdentifier, items=None, profile_key=C.PROF_KEY_NONE): |
136 client = self.host.getClient(profile_key) | 104 client = self.host.getClient(profile_key) |
137 return client.pubsub_client.publish(service, nodeIdentifier, items, client.pubsub_client.parent.jid) | 105 return client.pubsub_client.publish(service, nodeIdentifier, items, client.pubsub_client.parent.jid) |
138 | 106 |
140 """Retrieve pubsub items from a node. | 108 """Retrieve pubsub items from a node. |
141 | 109 |
142 @param service (JID): target service. | 110 @param service (JID): target service. |
143 @param node (str): node id. | 111 @param node (str): node id. |
144 @param max_items (int): optional limit on the number of retrieved items. | 112 @param max_items (int): optional limit on the number of retrieved items. |
145 @param item_ids (list[str]): identifiers of the items to be retrieved (should not be used). | 113 @param item_ids (list[str]): identifiers of the items to be retrieved (can't be used with rsm). |
146 @param sub_id (str): optional subscription identifier. | 114 @param sub_id (str): optional subscription identifier. |
147 @param rsm (dict): RSM request data | 115 @param rsm (dict): RSM request data |
148 @param profile_key (str): %(doc_profile_key)s | 116 @param profile_key (str): %(doc_profile_key)s |
149 @return: a deferred couple (list[dict], dict) containing: | 117 @return: a deferred couple (list[dict], dict) containing: |
150 - list of items | 118 - list of items |
151 - RSM response data | 119 - metadata with the following keys: |
120 - rsm_first, rsm_last, rsm_count, rsm_index: first, last, count and index value of RSMResponse | |
152 """ | 121 """ |
122 if rsm and item_ids: | |
123 raise ValueError("items_id can't be used with rsm") | |
153 client = self.host.getClient(profile_key) | 124 client = self.host.getClient(profile_key) |
154 ext_data = {'id': unicode(uuid.uuid4()), 'rsm': rsm} if rsm else None | 125 ext_data = {'id': unicode(uuid.uuid4()), 'rsm': rsm} if rsm else None |
155 d = client.pubsub_client.items(service, node, max_items, item_ids, sub_id, client.pubsub_client.parent.jid, ext_data) | 126 d = client.pubsub_client.items(service, node, max_items, item_ids, sub_id, client.pubsub_client.parent.jid, ext_data) |
156 d.addCallback(lambda items: (items, client.pubsub_client.getRSMResponse(ext_data['id']) if rsm else {})) | 127 def addMetadata(items): |
128 if not rsm: | |
129 metadata = {} | |
130 else: | |
131 rsm_data = client.pubsub_client.getRSMResponse(ext_data['id']) | |
132 metadata = {'rsm_{}'.format(key): value for key, value in rsm_data} | |
133 return (items, metadata) | |
134 | |
135 d.addCallback(addMetadata) | |
157 return d | 136 return d |
158 | 137 |
159 @defer.inlineCallbacks | 138 # @defer.inlineCallbacks |
160 def getItemsFromMany(self, service, data, max_items=None, sub_id=None, rsm=None, profile_key=C.PROF_KEY_NONE): | 139 # def getItemsFromMany(self, service, data, max_items=None, sub_id=None, rsm=None, profile_key=C.PROF_KEY_NONE): |
161 """Massively retrieve pubsub items from many nodes. | 140 # """Massively retrieve pubsub items from many nodes. |
162 | 141 |
163 @param service (JID): target service. | 142 # @param service (JID): target service. |
164 @param data (dict): dictionnary binding some arbitrary keys to the node identifiers. | 143 # @param data (dict): dictionnary binding some arbitrary keys to the node identifiers. |
165 @param max_items (int): optional limit on the number of retrieved items *per node*. | 144 # @param max_items (int): optional limit on the number of retrieved items *per node*. |
166 @param sub_id (str): optional subscription identifier. | 145 # @param sub_id (str): optional subscription identifier. |
167 @param rsm (dict): RSM request data | 146 # @param rsm (dict): RSM request data |
168 @param profile_key (str): %(doc_profile_key)s | 147 # @param profile_key (str): %(doc_profile_key)s |
169 @return: a deferred dict with: | 148 # @return: a deferred dict with: |
170 - key: a value in (a subset of) data.keys() | 149 # - key: a value in (a subset of) data.keys() |
171 - couple (list[dict], dict) containing: | 150 # - couple (list[dict], dict) containing: |
172 - list of items | 151 # - list of items |
173 - RSM response data | 152 # - RSM response data |
174 """ | 153 # """ |
175 client = self.host.getClient(profile_key) | 154 # client = self.host.getClient(profile_key) |
176 found_nodes = yield self.listNodes(service, profile=client.profile) | 155 # found_nodes = yield self.listNodes(service, profile=client.profile) |
177 d_dict = {} | 156 # d_dict = {} |
178 for publisher, node in data.items(): | 157 # for publisher, node in data.items(): |
179 if node not in found_nodes: | 158 # if node not in found_nodes: |
180 log.debug(u"Skip the items retrieval for [{node}]: node doesn't exist".format(node=node)) | 159 # log.debug(u"Skip the items retrieval for [{node}]: node doesn't exist".format(node=node)) |
181 continue # avoid pubsub "item-not-found" error | 160 # continue # avoid pubsub "item-not-found" error |
182 d_dict[publisher] = self.getItems(service, node, max_items, None, sub_id, rsm, client.profile) | 161 # d_dict[publisher] = self.getItems(service, node, max_items, None, sub_id, rsm, client.profile) |
183 defer.returnValue(d_dict) | 162 # defer.returnValue(d_dict) |
184 | 163 |
185 def getOptions(self, service, nodeIdentifier, subscriber, subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE): | 164 def getOptions(self, service, nodeIdentifier, subscriber, subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE): |
186 client = self.host.getClient(profile_key) | 165 client = self.host.getClient(profile_key) |
187 return client.pubsub_client.getOptions(service, nodeIdentifier, subscriber, subscriptionIdentifier) | 166 return client.pubsub_client.getOptions(service, nodeIdentifier, subscriber, subscriptionIdentifier) |
188 | 167 |
204 | 183 |
205 def subscribe(self, service, nodeIdentifier, sub_jid=None, options=None, profile_key=C.PROF_KEY_NONE): | 184 def subscribe(self, service, nodeIdentifier, sub_jid=None, options=None, profile_key=C.PROF_KEY_NONE): |
206 client = self.host.getClient(profile_key) | 185 client = self.host.getClient(profile_key) |
207 return client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options) | 186 return client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options) |
208 | 187 |
209 @defer.inlineCallbacks | 188 # @defer.inlineCallbacks |
210 def subscribeToMany(self, service, nodeIdentifiers, sub_jid=None, options=None, profile_key=C.PROF_KEY_NONE): | 189 # def subscribeToMany(self, service, nodeIdentifiers, sub_jid=None, options=None, profile_key=C.PROF_KEY_NONE): |
211 """Massively subscribe to many nodes. | 190 # """Massively subscribe to many nodes. |
212 | 191 |
213 @param service (JID): target service. | 192 # @param service (JID): target service. |
214 @param nodeIdentifiers (list): the list of node identifiers to subscribe to. | 193 # @param nodeIdentifiers (list): the list of node identifiers to subscribe to. |
215 @param sub_id (str): optional subscription identifier. | 194 # @param sub_id (str): optional subscription identifier. |
216 @param options (list): optional list of subscription options | 195 # @param options (list): optional list of subscription options |
217 @param profile_key (str): %(doc_profile_key)s | 196 # @param profile_key (str): %(doc_profile_key)s |
218 @return: list of Deferred instances. | 197 # @return: list of Deferred instances. |
219 """ | 198 # """ |
220 client = self.host.getClient(profile_key) | 199 # client = self.host.getClient(profile_key) |
221 found_nodes = yield self.listNodes(service, profile=client.profile) | 200 # found_nodes = yield self.listNodes(service, profile=client.profile) |
222 subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile) | 201 # subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile) |
223 d_list = [] | 202 # d_list = [] |
224 for nodeIdentifier in (set(nodeIdentifiers) - set(subscribed_nodes)): | 203 # for nodeIdentifier in (set(nodeIdentifiers) - set(subscribed_nodes)): |
225 if nodeIdentifier not in found_nodes: | 204 # if nodeIdentifier not in found_nodes: |
226 log.debug(u"Skip the subscription to [{node}]: node doesn't exist".format(node=nodeIdentifier)) | 205 # log.debug(u"Skip the subscription to [{node}]: node doesn't exist".format(node=nodeIdentifier)) |
227 continue # avoid sat-pubsub "SubscriptionExists" error | 206 # continue # avoid sat-pubsub "SubscriptionExists" error |
228 d_list.append(client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options)) | 207 # d_list.append(client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options)) |
229 defer.returnValue(d_list) | 208 # defer.returnValue(d_list) |
230 | 209 |
231 def subscriptions(self, service, nodeIdentifier='', profile_key=C.PROF_KEY_NONE): | 210 def subscriptions(self, service, nodeIdentifier='', profile_key=C.PROF_KEY_NONE): |
232 client = self.host.getClient(profile_key) | 211 client = self.host.getClient(profile_key) |
233 return client.pubsub_client.subscriptions(service, nodeIdentifier) | 212 return client.pubsub_client.subscriptions(service, nodeIdentifier) |
234 | 213 |
270 request.nodeIdentifier = nodeIdentifier | 249 request.nodeIdentifier = nodeIdentifier |
271 request.sender = sender | 250 request.sender = sender |
272 d = request.send(self.xmlstream) | 251 d = request.send(self.xmlstream) |
273 | 252 |
274 def cb(iq): | 253 def cb(iq): |
254 # FIXME: to be checked | |
275 return [sub for sub in iq.pubsub.subscriptions.elements() if | 255 return [sub for sub in iq.pubsub.subscriptions.elements() if |
276 (sub.uri == pubsub.NS_PUBSUB and sub.name == 'subscription')] | 256 (sub.uri == pubsub.NS_PUBSUB and sub.name == 'subscription')] |
277 | 257 |
278 return d.addCallback(cb) | 258 return d.addCallback(cb) |
279 | 259 |