comparison src/plugins/plugin_xep_0060.py @ 1446:e8c8e467964b

plugins xep-0060, xep-0277: code simplification/cleaning/fix: - plugin xep-0060: moved rsm data to a more general metadata dict, which will contain all data related to the node/items set. RSM metadata are prefixed with "rsm_" - plugin xep-0060: minor docstring fixes - plugin xep-0060: removed cache to simplify code base - fixed broken getLastMicroblogs - added _getLastMicroblogs as wrapper to getLastMicroblogs, for bridge - removed lxml dependency for this plugin, use native twisted instead - several improvements/fixes in item2mbdata
author Goffi <goffi@goffi.org>
date Sat, 15 Aug 2015 22:13:27 +0200
parents 7c0acb966fd6
children 389357fd79ce
comparison
equal deleted inserted replaced
1445:ddc7a39ff9d1 1446:e8c8e467964b
19 19
20 from sat.core.i18n import _ 20 from sat.core.i18n import _
21 from sat.core.constants import Const as C 21 from sat.core.constants import Const as C
22 from sat.core.log import getLogger 22 from sat.core.log import getLogger
23 log = getLogger(__name__) 23 log = getLogger(__name__)
24 from sat.memory.memory import Sessions
25 24
26 from wokkel import disco, pubsub, rsm 25 from wokkel import disco, pubsub, rsm
27 from zope.interface import implements 26 from zope.interface import implements
28 from twisted.internet import defer 27 # from twisted.internet import defer
29 import uuid 28 import uuid
30 29
31 30
32 PLUGIN_INFO = { 31 PLUGIN_INFO = {
33 "name": "Publish-Subscribe", 32 "name": "Publish-Subscribe",
56 55
57 def __init__(self, host): 56 def __init__(self, host):
58 log.info(_(u"PubSub plugin initialization")) 57 log.info(_(u"PubSub plugin initialization"))
59 self.host = host 58 self.host = host
60 self.managedNodes = [] 59 self.managedNodes = []
61 self.node_cache = Sessions(timeout=60, resettable_timeout=False)
62 60
63 def getHandler(self, profile): 61 def getHandler(self, profile):
64 client = self.host.getClient(profile) 62 client = self.host.getClient(profile)
65 client.pubsub_client = SatPubSubClient(self.host, self) 63 client.pubsub_client = SatPubSubClient(self.host, self)
66 return client.pubsub_client 64 return client.pubsub_client
71 @param namespace: NS of the handler (will appear in disco info) 69 @param namespace: NS of the handler (will appear in disco info)
72 @param callback: method to call when the handler is found 70 @param callback: method to call when the handler is found
73 @param profile: profile which manage this handler""" 71 @param profile: profile which manage this handler"""
74 self.managedNodes.append((node_name, callback)) 72 self.managedNodes.append((node_name, callback))
75 73
76 def _getDeferredNodeCache(self, session_id, init, profile): 74 # def listNodes(self, service, nodeIdentifier='', profile=C.PROF_KEY_NONE):
77 """Manage a node cache with deferred initialisation and concurrent access. 75 # """Retrieve the name of the nodes that are accessible on the target service.
78 76
79 @param session_id (string): node cache session ID 77 # @param service (JID): target service
80 @param init (Deferred): deferred list of strings to initialise the cache. 78 # @param nodeIdentifier (str): the parent node name (leave empty to retrieve first-level nodes)
81 @param profile (str): %(doc_profile)s 79 # @param profile (str): %(doc_profile)s
82 @return: Deferred list[str] 80 # @return: deferred which fire a list of nodes
83 """ 81 # """
84 if session_id in self.node_cache: 82 # d = self.host.getDiscoItems(service, nodeIdentifier, profile_key=profile)
85 cache = self.node_cache.profileGet(session_id, profile) 83 # d.addCallback(lambda result: [item.getAttribute('node') for item in result.toElement().children if item.hasAttribute('node')])
86 if cache['nodes'] is None: 84 # return d
87 # init is still running 85
88 d = defer.Deferred() 86 # def listSubscribedNodes(self, service, nodeIdentifier='', filter_='subscribed', profile=C.PROF_KEY_NONE):
89 cache['waiting'].append(d) 87 # """Retrieve the name of the nodes to which the profile is subscribed on the target service.
90 return d 88
91 return defer.succeed(cache['nodes']) 89 # @param service (JID): target service
92 90 # @param nodeIdentifier (str): the parent node name (leave empty to retrieve all subscriptions)
93 cache = {'init': init, 'waiting': [], 'nodes': None} 91 # @param filter_ (str): filter the result according to the given subscription type:
94 self.node_cache.newSession(cache, session_id=session_id, profile=profile) 92 # - None: do not filter
95 93 # - 'pending': subscription has not been approved yet by the node owner
96 def cb(nodes): 94 # - 'unconfigured': subscription options have not been configured yet
97 cache['nodes'] = nodes 95 # - 'subscribed': subscription is complete
98 for d in cache['waiting']: 96 # @param profile (str): %(doc_profile)s
99 d.callback(nodes) 97 # @return: Deferred list[str]
100 return nodes 98 # """
101 99 # d = self.subscriptions(service, nodeIdentifier, profile_key=profile)
102 return init.addCallback(cb) 100 # d.addCallback(lambda subs: [sub.getAttribute('node') for sub in subs if sub.getAttribute('subscription') == filter_])
103 101 # return d
104 def listNodes(self, service, nodeIdentifier='', profile=C.PROF_KEY_NONE):
105 """Retrieve the name of the nodes that are accessible on the target service.
106
107 @param service (JID): target service
108 @param nodeIdentifier (str): the parent node name (leave empty to retrieve first-level nodes)
109 @param profile (str): %(doc_profile)s
110 @return: Deferred list[str]
111 """
112 session_id = profile + '@found@' + service.userhost()
113 d = self.host.getDiscoItems(service, nodeIdentifier, profile_key=profile)
114 d.addCallback(lambda result: [item.getAttribute('node') for item in result.toElement().children if item.hasAttribute('node')])
115 return self._getDeferredNodeCache(session_id, d, profile)
116
117 def listSubscribedNodes(self, service, nodeIdentifier='', filter_='subscribed', profile=C.PROF_KEY_NONE):
118 """Retrieve the name of the nodes to which the profile is subscribed on the target service.
119
120 @param service (JID): target service
121 @param nodeIdentifier (str): the parent node name (leave empty to retrieve all subscriptions)
122 @param filter_ (str): filter the result according to the given subscription type:
123 - None: do not filter
124 - 'pending': subscription has not been approved yet by the node owner
125 - 'unconfigured': subscription options have not been configured yet
126 - 'subscribed': subscription is complete
127 @param profile (str): %(doc_profile)s
128 @return: Deferred list[str]
129 """
130 session_id = profile + '@subscriptions@' + service.userhost()
131 d = self.subscriptions(service, nodeIdentifier, profile_key=profile)
132 d.addCallback(lambda subs: [sub.getAttribute('node') for sub in subs if sub.getAttribute('subscription') == filter_])
133 return self._getDeferredNodeCache(session_id, d, profile)
134 102
135 def publish(self, service, nodeIdentifier, items=None, profile_key=C.PROF_KEY_NONE): 103 def publish(self, service, nodeIdentifier, items=None, profile_key=C.PROF_KEY_NONE):
136 client = self.host.getClient(profile_key) 104 client = self.host.getClient(profile_key)
137 return client.pubsub_client.publish(service, nodeIdentifier, items, client.pubsub_client.parent.jid) 105 return client.pubsub_client.publish(service, nodeIdentifier, items, client.pubsub_client.parent.jid)
138 106
140 """Retrieve pubsub items from a node. 108 """Retrieve pubsub items from a node.
141 109
142 @param service (JID): target service. 110 @param service (JID): target service.
143 @param node (str): node id. 111 @param node (str): node id.
144 @param max_items (int): optional limit on the number of retrieved items. 112 @param max_items (int): optional limit on the number of retrieved items.
145 @param item_ids (list[str]): identifiers of the items to be retrieved (should not be used). 113 @param item_ids (list[str]): identifiers of the items to be retrieved (can't be used with rsm).
146 @param sub_id (str): optional subscription identifier. 114 @param sub_id (str): optional subscription identifier.
147 @param rsm (dict): RSM request data 115 @param rsm (dict): RSM request data
148 @param profile_key (str): %(doc_profile_key)s 116 @param profile_key (str): %(doc_profile_key)s
149 @return: a deferred couple (list[dict], dict) containing: 117 @return: a deferred couple (list[dict], dict) containing:
150 - list of items 118 - list of items
151 - RSM response data 119 - metadata with the following keys:
120 - rsm_first, rsm_last, rsm_count, rsm_index: first, last, count and index value of RSMResponse
152 """ 121 """
122 if rsm and item_ids:
123 raise ValueError("items_id can't be used with rsm")
153 client = self.host.getClient(profile_key) 124 client = self.host.getClient(profile_key)
154 ext_data = {'id': unicode(uuid.uuid4()), 'rsm': rsm} if rsm else None 125 ext_data = {'id': unicode(uuid.uuid4()), 'rsm': rsm} if rsm else None
155 d = client.pubsub_client.items(service, node, max_items, item_ids, sub_id, client.pubsub_client.parent.jid, ext_data) 126 d = client.pubsub_client.items(service, node, max_items, item_ids, sub_id, client.pubsub_client.parent.jid, ext_data)
156 d.addCallback(lambda items: (items, client.pubsub_client.getRSMResponse(ext_data['id']) if rsm else {})) 127 def addMetadata(items):
128 if not rsm:
129 metadata = {}
130 else:
131 rsm_data = client.pubsub_client.getRSMResponse(ext_data['id'])
132 metadata = {'rsm_{}'.format(key): value for key, value in rsm_data}
133 return (items, metadata)
134
135 d.addCallback(addMetadata)
157 return d 136 return d
158 137
159 @defer.inlineCallbacks 138 # @defer.inlineCallbacks
160 def getItemsFromMany(self, service, data, max_items=None, sub_id=None, rsm=None, profile_key=C.PROF_KEY_NONE): 139 # def getItemsFromMany(self, service, data, max_items=None, sub_id=None, rsm=None, profile_key=C.PROF_KEY_NONE):
161 """Massively retrieve pubsub items from many nodes. 140 # """Massively retrieve pubsub items from many nodes.
162 141
163 @param service (JID): target service. 142 # @param service (JID): target service.
164 @param data (dict): dictionnary binding some arbitrary keys to the node identifiers. 143 # @param data (dict): dictionnary binding some arbitrary keys to the node identifiers.
165 @param max_items (int): optional limit on the number of retrieved items *per node*. 144 # @param max_items (int): optional limit on the number of retrieved items *per node*.
166 @param sub_id (str): optional subscription identifier. 145 # @param sub_id (str): optional subscription identifier.
167 @param rsm (dict): RSM request data 146 # @param rsm (dict): RSM request data
168 @param profile_key (str): %(doc_profile_key)s 147 # @param profile_key (str): %(doc_profile_key)s
169 @return: a deferred dict with: 148 # @return: a deferred dict with:
170 - key: a value in (a subset of) data.keys() 149 # - key: a value in (a subset of) data.keys()
171 - couple (list[dict], dict) containing: 150 # - couple (list[dict], dict) containing:
172 - list of items 151 # - list of items
173 - RSM response data 152 # - RSM response data
174 """ 153 # """
175 client = self.host.getClient(profile_key) 154 # client = self.host.getClient(profile_key)
176 found_nodes = yield self.listNodes(service, profile=client.profile) 155 # found_nodes = yield self.listNodes(service, profile=client.profile)
177 d_dict = {} 156 # d_dict = {}
178 for publisher, node in data.items(): 157 # for publisher, node in data.items():
179 if node not in found_nodes: 158 # if node not in found_nodes:
180 log.debug(u"Skip the items retrieval for [{node}]: node doesn't exist".format(node=node)) 159 # log.debug(u"Skip the items retrieval for [{node}]: node doesn't exist".format(node=node))
181 continue # avoid pubsub "item-not-found" error 160 # continue # avoid pubsub "item-not-found" error
182 d_dict[publisher] = self.getItems(service, node, max_items, None, sub_id, rsm, client.profile) 161 # d_dict[publisher] = self.getItems(service, node, max_items, None, sub_id, rsm, client.profile)
183 defer.returnValue(d_dict) 162 # defer.returnValue(d_dict)
184 163
185 def getOptions(self, service, nodeIdentifier, subscriber, subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE): 164 def getOptions(self, service, nodeIdentifier, subscriber, subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE):
186 client = self.host.getClient(profile_key) 165 client = self.host.getClient(profile_key)
187 return client.pubsub_client.getOptions(service, nodeIdentifier, subscriber, subscriptionIdentifier) 166 return client.pubsub_client.getOptions(service, nodeIdentifier, subscriber, subscriptionIdentifier)
188 167
204 183
205 def subscribe(self, service, nodeIdentifier, sub_jid=None, options=None, profile_key=C.PROF_KEY_NONE): 184 def subscribe(self, service, nodeIdentifier, sub_jid=None, options=None, profile_key=C.PROF_KEY_NONE):
206 client = self.host.getClient(profile_key) 185 client = self.host.getClient(profile_key)
207 return client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options) 186 return client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options)
208 187
209 @defer.inlineCallbacks 188 # @defer.inlineCallbacks
210 def subscribeToMany(self, service, nodeIdentifiers, sub_jid=None, options=None, profile_key=C.PROF_KEY_NONE): 189 # def subscribeToMany(self, service, nodeIdentifiers, sub_jid=None, options=None, profile_key=C.PROF_KEY_NONE):
211 """Massively subscribe to many nodes. 190 # """Massively subscribe to many nodes.
212 191
213 @param service (JID): target service. 192 # @param service (JID): target service.
214 @param nodeIdentifiers (list): the list of node identifiers to subscribe to. 193 # @param nodeIdentifiers (list): the list of node identifiers to subscribe to.
215 @param sub_id (str): optional subscription identifier. 194 # @param sub_id (str): optional subscription identifier.
216 @param options (list): optional list of subscription options 195 # @param options (list): optional list of subscription options
217 @param profile_key (str): %(doc_profile_key)s 196 # @param profile_key (str): %(doc_profile_key)s
218 @return: list of Deferred instances. 197 # @return: list of Deferred instances.
219 """ 198 # """
220 client = self.host.getClient(profile_key) 199 # client = self.host.getClient(profile_key)
221 found_nodes = yield self.listNodes(service, profile=client.profile) 200 # found_nodes = yield self.listNodes(service, profile=client.profile)
222 subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile) 201 # subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile)
223 d_list = [] 202 # d_list = []
224 for nodeIdentifier in (set(nodeIdentifiers) - set(subscribed_nodes)): 203 # for nodeIdentifier in (set(nodeIdentifiers) - set(subscribed_nodes)):
225 if nodeIdentifier not in found_nodes: 204 # if nodeIdentifier not in found_nodes:
226 log.debug(u"Skip the subscription to [{node}]: node doesn't exist".format(node=nodeIdentifier)) 205 # log.debug(u"Skip the subscription to [{node}]: node doesn't exist".format(node=nodeIdentifier))
227 continue # avoid sat-pubsub "SubscriptionExists" error 206 # continue # avoid sat-pubsub "SubscriptionExists" error
228 d_list.append(client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options)) 207 # d_list.append(client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options))
229 defer.returnValue(d_list) 208 # defer.returnValue(d_list)
230 209
231 def subscriptions(self, service, nodeIdentifier='', profile_key=C.PROF_KEY_NONE): 210 def subscriptions(self, service, nodeIdentifier='', profile_key=C.PROF_KEY_NONE):
232 client = self.host.getClient(profile_key) 211 client = self.host.getClient(profile_key)
233 return client.pubsub_client.subscriptions(service, nodeIdentifier) 212 return client.pubsub_client.subscriptions(service, nodeIdentifier)
234 213
270 request.nodeIdentifier = nodeIdentifier 249 request.nodeIdentifier = nodeIdentifier
271 request.sender = sender 250 request.sender = sender
272 d = request.send(self.xmlstream) 251 d = request.send(self.xmlstream)
273 252
274 def cb(iq): 253 def cb(iq):
254 # FIXME: to be checked
275 return [sub for sub in iq.pubsub.subscriptions.elements() if 255 return [sub for sub in iq.pubsub.subscriptions.elements() if
276 (sub.uri == pubsub.NS_PUBSUB and sub.name == 'subscription')] 256 (sub.uri == pubsub.NS_PUBSUB and sub.name == 'subscription')]
277 257
278 return d.addCallback(cb) 258 return d.addCallback(cb)
279 259