comparison src/plugins/plugin_xep_0060.py @ 1459:4c4f88d7b156

plugins xep-0060, xep-0163, xep-0277, groupblog: bloging improvments (huge patch, sorry): /!\ not everything is working yet, and specially groupblogs are broken /!\ - renamed bridge api to use prefixed methods (e.g. psSubscribeToMany instead of subscribeToMany in PubSub) - (xep-0060): try to find a default PubSub service, and put it in client.pubsub_service - (xep-0060): extra dictionary can be used in bridge method for RSM and other options - (xep-0060): XEP_0060.addManagedNode and XEP_0060.removeManagedNode allow to easily catch notifications for a specific node - (xep-0060): retractItem manage "notify" attribute - (xep-0060): new signal psEvent will be used to transmit notifications to frontends - (xep-0060, constants): added a bunch of useful constants - (xep-0163): removed personalEvent in favor of psEvent - (xep-0163): addPEPEvent now filter non PEP events for in_callback - (xep-0277): use of new XEP-0060 plugin's addManagedNode - (xep-0277): fixed author handling for incoming blogs: author is the human readable name, author_jid it jid, and author_jid_verified is set to True is the jid is checked - (xep-0277): reworked data2entry with Twisted instead of feed, item_id can now be specified, <content/> is changed to <title/> if there is only content - (xep-0277): comments are now managed here (core removed from groupblog) - (xep-0277): (comments) node is created if needed, default pubsub service is used if available, else PEP - (xep-0277): retract is managed
author Goffi <goffi@goffi.org>
date Sun, 16 Aug 2015 00:39:44 +0200
parents 5116d70ddd1c
children 0f0889028eea
comparison
equal deleted inserted replaced
1458:832846fefe85 1459:4c4f88d7b156
25 from sat.tools import sat_defer 25 from sat.tools import sat_defer
26 26
27 from twisted.words.protocols.jabber import jid 27 from twisted.words.protocols.jabber import jid
28 from twisted.internet import defer 28 from twisted.internet import defer
29 from wokkel import disco 29 from wokkel import disco
30 # XXX: tmp.pubsub is actually use instead of wokkel version
31 # same thing for rsm
30 from wokkel import pubsub 32 from wokkel import pubsub
31 from wokkel import rsm 33 from wokkel import rsm
32 from zope.interface import implements 34 from zope.interface import implements
33 # from twisted.internet import defer 35 from collections import namedtuple
34 import uuid 36 import uuid
35 37
36 UNSPECIFIED = "unspecified error" 38 UNSPECIFIED = "unspecified error"
37 39
38 40
47 "handler": "yes", 49 "handler": "yes",
48 "description": _("""Implementation of PubSub Protocol""") 50 "description": _("""Implementation of PubSub Protocol""")
49 } 51 }
50 52
51 53
54 Extra = namedtuple('Extra', ('rsm_request', 'extra'))
55 # rsm_request is the rsm.RSMRequest build with rsm_ prefixed keys, or None
56 # extra is a potentially empty dict
57
58
52 class XEP_0060(object): 59 class XEP_0060(object):
53 OPT_ACCESS_MODEL = 'pubsub#access_model' 60 OPT_ACCESS_MODEL = 'pubsub#access_model'
54 OPT_PERSIST_ITEMS = 'pubsub#persist_items' 61 OPT_PERSIST_ITEMS = 'pubsub#persist_items'
55 OPT_MAX_ITEMS = 'pubsub#max_items' 62 OPT_MAX_ITEMS = 'pubsub#max_items'
56 OPT_DELIVER_PAYLOADS = 'pubsub#deliver_payloads' 63 OPT_DELIVER_PAYLOADS = 'pubsub#deliver_payloads'
58 OPT_NODE_TYPE = 'pubsub#node_type' 65 OPT_NODE_TYPE = 'pubsub#node_type'
59 OPT_SUBSCRIPTION_TYPE = 'pubsub#subscription_type' 66 OPT_SUBSCRIPTION_TYPE = 'pubsub#subscription_type'
60 OPT_SUBSCRIPTION_DEPTH = 'pubsub#subscription_depth' 67 OPT_SUBSCRIPTION_DEPTH = 'pubsub#subscription_depth'
61 OPT_ROSTER_GROUPS_ALLOWED = 'pubsub#roster_groups_allowed' 68 OPT_ROSTER_GROUPS_ALLOWED = 'pubsub#roster_groups_allowed'
62 OPT_PUBLISH_MODEL = 'pubsub#publish_model' 69 OPT_PUBLISH_MODEL = 'pubsub#publish_model'
70 ACCESS_OPEN = 'open'
71 ACCESS_PRESENCE = 'presence'
72 ACCESS_ROSTER = 'roster'
73 ACCESS_AUTHORIZE = 'authorize'
74 ACCESS_WHITELIST = 'whitelist'
63 75
64 def __init__(self, host): 76 def __init__(self, host):
65 log.info(_(u"PubSub plugin initialization")) 77 log.info(_(u"PubSub plugin initialization"))
66 self.host = host 78 self.host = host
67 self.managedNodes = [] 79 self._node_cb = {} # dictionnary of callbacks for node (key: node, value: list of callbacks)
68 self.rt_sessions = sat_defer.RTDeferredSessions() 80 self.rt_sessions = sat_defer.RTDeferredSessions()
69 host.bridge.addMethod("subscribeToMany", ".plugin", in_sign='a(ss)sa{ss}s', out_sign='s', method=self._subscribeToMany) 81 host.bridge.addMethod("psDeleteNode", ".plugin", in_sign='sss', out_sign='', method=self._deleteNode, async=True)
70 host.bridge.addMethod("getSubscribeRTResult", ".plugin", in_sign='ss', out_sign='(ua(sss))', method=self._manySubscribeRTResult, async=True) 82 host.bridge.addMethod("psRetractItem", ".plugin", in_sign='sssbs', out_sign='', method=self._retractItem, async=True)
71 host.bridge.addMethod("getFromMany", ".plugin", in_sign='a(ss)ia{ss}s', out_sign='s', method=self._getFromMany) 83 host.bridge.addMethod("psRetractItems", ".plugin", in_sign='ssasbs', out_sign='', method=self._retractItems, async=True)
72 host.bridge.addMethod("getFromManyRTResult", ".plugin", in_sign='ss', out_sign='(ua(sssasa{ss}))', method=self._getFromManyRTResult, async=True) 84 host.bridge.addMethod("psSubscribeToMany", ".plugin", in_sign='a(ss)sa{ss}s', out_sign='s', method=self._subscribeToMany)
85 host.bridge.addMethod("psGetSubscribeRTResult", ".plugin", in_sign='ss', out_sign='(ua(sss))', method=self._manySubscribeRTResult, async=True)
86 host.bridge.addMethod("psGetFromMany", ".plugin", in_sign='a(ss)ia{ss}s', out_sign='s', method=self._getFromMany)
87 host.bridge.addMethod("psGetFromManyRTResult", ".plugin", in_sign='ss', out_sign='(ua(sssasa{ss}))', method=self._getFromManyRTResult, async=True)
88 host.bridge.addSignal("psEvent", ".plugin", signature='ssssa{ss}s') # args: category, service(jid), node, type (C.PS_ITEMS, C.PS_DELETE), data, profile
73 89
74 def getHandler(self, profile): 90 def getHandler(self, profile):
75 client = self.host.getClient(profile) 91 client = self.host.getClient(profile)
76 client.pubsub_client = SatPubSubClient(self.host, self) 92 client.pubsub_client = SatPubSubClient(self.host, self)
77 return client.pubsub_client 93 return client.pubsub_client
78 94
79 def addManagedNode(self, node_name, callback): 95 @defer.inlineCallbacks
80 """Add a handler for a namespace 96 def profileConnected(self, profile):
81 97 client = self.host.getClient(profile)
82 @param namespace: NS of the handler (will appear in disco info) 98 pubsub_services = yield self.host.findServiceEntities("pubsub", "service", profile_key = profile)
83 @param callback: method to call when the handler is found 99 if pubsub_services:
84 @param profile: profile which manage this handler""" 100 # we use one of the found services as our default pubsub service
85 self.managedNodes.append((node_name, callback)) 101 client.pubsub_service = pubsub_services.pop()
102 else:
103 client.pubsub_service = None
104
105 def parseExtra(self, extra):
106 """Parse extra dictionnary
107
108 used bridge's extra dictionnaries
109 @param extra(dict): extra data used to configure request
110 @return(Extra): filled Extra instance
111 """
112 if extra is not None:
113 rsm_dict = { key[4:]: value for key, value in extra.iteritems() if key.startswith('rsm_') }
114 if rsm_dict:
115 try:
116 rsm_dict['max_'] = rsm_dict.pop('max')
117 except KeyError:
118 pass
119 rsm_request = rsm.RSMRequest(**rsm_dict)
120 else:
121 rsm_request = None
122 else:
123 rsm_request = None
124 extra = {}
125 return Extra(rsm_request, extra)
126
127 def addManagedNode(self, node, **kwargs):
128 """Add a handler for a node
129
130 @param node(unicode): node to monitor, or None to monitor all
131 @param **kwargs: method(s) to call when the node is found
132 the methode must be named after PubSub constants in lower case
133 and suffixed with "_cb"
134 e.g.: "publish_cb" for C.PS_PUBLISH, "delete_cb" for C.PS_DELETE
135 """
136 assert kwargs
137 callbacks = self._node_cb.setdefault(node, {})
138 for event, cb in kwargs.iteritems():
139 event_name = event[:-3]
140 assert event_name in C.PS_EVENTS
141 callbacks.setdefault(event_name,[]).append(cb)
142
143 def removeManagedNode(self, node, *args):
144 """Add a handler for a node
145
146 @param node(unicode): node to monitor
147 @param *args: callback(s) to remove
148 """
149 assert args
150 try:
151 registred_cb = self._node_cb[node]
152 except KeyError:
153 pass
154 else:
155 for callback in args:
156 for event, cb_list in registred_cb.iteritems():
157 try:
158 cb_list.remove(callback)
159 except ValueError:
160 pass
161 else:
162 log.debug(u"removed callback {cb} for event {event} on node {node}".format(
163 cb=callback, event=event, node=node))
164 if not cb_list:
165 del registred_cb[event]
166 if not registred_cb:
167 del self._node_cb[node]
168 return
169 log.error(u"Trying to remove inexistant callback {cb} for node {node}".format(cb=callback, node=node))
86 170
87 # def listNodes(self, service, nodeIdentifier='', profile=C.PROF_KEY_NONE): 171 # def listNodes(self, service, nodeIdentifier='', profile=C.PROF_KEY_NONE):
88 # """Retrieve the name of the nodes that are accessible on the target service. 172 # """Retrieve the name of the nodes that are accessible on the target service.
89 173
90 # @param service (JID): target service 174 # @param service (JID): target service
115 199
116 def publish(self, service, nodeIdentifier, items=None, profile_key=C.PROF_KEY_NONE): 200 def publish(self, service, nodeIdentifier, items=None, profile_key=C.PROF_KEY_NONE):
117 client = self.host.getClient(profile_key) 201 client = self.host.getClient(profile_key)
118 return client.pubsub_client.publish(service, nodeIdentifier, items, client.pubsub_client.parent.jid) 202 return client.pubsub_client.publish(service, nodeIdentifier, items, client.pubsub_client.parent.jid)
119 203
120 def getItems(self, service, node, max_items=None, item_ids=None, sub_id=None, rsm_request=None, profile_key=C.PROF_KEY_NONE): 204 def getItems(self, service, node, max_items=None, item_ids=None, sub_id=None, rsm_request=None, extra=None, profile_key=C.PROF_KEY_NONE):
121 """Retrieve pubsub items from a node. 205 """Retrieve pubsub items from a node.
122 206
123 @param service (JID): pubsub service. 207 @param service (JID): pubsub service.
124 @param node (str): node id. 208 @param node (str): node id.
125 @param max_items (int): optional limit on the number of retrieved items. 209 @param max_items (int): optional limit on the number of retrieved items.
132 - metadata with the following keys: 216 - metadata with the following keys:
133 - rsm_first, rsm_last, rsm_count, rsm_index: first, last, count and index value of RSMResponse 217 - rsm_first, rsm_last, rsm_count, rsm_index: first, last, count and index value of RSMResponse
134 """ 218 """
135 if rsm_request and item_ids: 219 if rsm_request and item_ids:
136 raise ValueError("items_id can't be used with rsm") 220 raise ValueError("items_id can't be used with rsm")
137 client = self.host.getClient(profile_key) 221 if extra is None:
138 ext_data = {'id': unicode(uuid.uuid4()), 'rsm': rsm_request} if rsm_request else None 222 extra = {}
223 client = self.host.getClient(profile_key)
224 ext_data = {'id': unicode(uuid.uuid4()), 'rsm': rsm_request} if rsm_request is not None else None
139 d = client.pubsub_client.items(service, node, max_items, item_ids, sub_id, client.pubsub_client.parent.jid, ext_data) 225 d = client.pubsub_client.items(service, node, max_items, item_ids, sub_id, client.pubsub_client.parent.jid, ext_data)
226
227 try:
228 subscribe = C.bool(extra['subscribe'])
229 except KeyError:
230 subscribe = False
231
232 def doSubscribe(items):
233 self.subscribe(service, node, profile_key=profile_key)
234 return items
235
236 if subscribe:
237 d.addCallback(doSubscribe)
238
140 def addMetadata(items): 239 def addMetadata(items):
141 metadata = {} 240 metadata = {}
142 if rsm_request: 241 if rsm_request is not None:
143 rsm_data = client.pubsub_client.getRSMResponse(ext_data['id']) 242 rsm_data = client.pubsub_client.getRSMResponse(ext_data['id'])
144 metadata.update({'rsm_{}'.format(key): value for key, value in rsm_data}) 243 metadata.update({'rsm_{}'.format(key): value for key, value in rsm_data})
145 return (items, metadata) 244 return (items, metadata)
146 245
147 d.addCallback(addMetadata) 246 d.addCallback(addMetadata)
183 282
184 def createNode(self, service, nodeIdentifier, options, profile_key=C.PROF_KEY_NONE): 283 def createNode(self, service, nodeIdentifier, options, profile_key=C.PROF_KEY_NONE):
185 client = self.host.getClient(profile_key) 284 client = self.host.getClient(profile_key)
186 return client.pubsub_client.createNode(service, nodeIdentifier, options) 285 return client.pubsub_client.createNode(service, nodeIdentifier, options)
187 286
287 def _deleteNode(self, service_s, nodeIdentifier, profile_key):
288 return self.deleteNode(jid.JID(service_s) if service_s else None, nodeIdentifier, profile_key)
289
188 def deleteNode(self, service, nodeIdentifier, profile_key=C.PROF_KEY_NONE): 290 def deleteNode(self, service, nodeIdentifier, profile_key=C.PROF_KEY_NONE):
189 client = self.host.getClient(profile_key) 291 client = self.host.getClient(profile_key)
190 return client.pubsub_client.deleteNode(service, nodeIdentifier) 292 return client.pubsub_client.deleteNode(service, nodeIdentifier)
191 293
192 def retractItems(self, service, nodeIdentifier, itemIdentifiers, profile_key=C.PROF_KEY_NONE): 294 def _retractItem(self, service_s, nodeIdentifier, itemIdentifier, notify, profile_key):
193 client = self.host.getClient(profile_key) 295 return self._retractItems(service_s, nodeIdentifier, (itemIdentifier,), notify, profile_key)
194 return client.pubsub_client.retractItems(service, nodeIdentifier, itemIdentifiers) 296
297 def _retractItems(self, service_s, nodeIdentifier, itemIdentifiers, notify, profile_key):
298 return self.retractItems(jid.JID(service_s) if service_s else None, nodeIdentifier, itemIdentifiers, notify, profile_key)
299
300 def retractItems(self, service, nodeIdentifier, itemIdentifiers, notify=True, profile_key=C.PROF_KEY_NONE):
301 client = self.host.getClient(profile_key)
302 return client.pubsub_client.retractItems(service, nodeIdentifier, itemIdentifiers, notify=True)
195 303
196 def subscribe(self, service, nodeIdentifier, sub_jid=None, options=None, profile_key=C.PROF_KEY_NONE): 304 def subscribe(self, service, nodeIdentifier, sub_jid=None, options=None, profile_key=C.PROF_KEY_NONE):
305 # TODO: reimplement a subscribtion cache, checking that we have not subscription before trying to subscribe
197 client = self.host.getClient(profile_key) 306 client = self.host.getClient(profile_key)
198 return client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options) 307 return client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options)
199 308
200 def subscriptions(self, service, nodeIdentifier='', profile_key=C.PROF_KEY_NONE): 309 def subscriptions(self, service, nodeIdentifier='', profile_key=C.PROF_KEY_NONE):
201 client = self.host.getClient(profile_key) 310 client = self.host.getClient(profile_key)
329 d.addCallback(lambda ret: (ret[0], 438 d.addCallback(lambda ret: (ret[0],
330 [(service.full(), node, failure, items, metadata) 439 [(service.full(), node, failure, items, metadata)
331 for (service, node), (success, (failure, (items, metadata))) in ret[1].iteritems()])) 440 for (service, node), (success, (failure, (items, metadata))) in ret[1].iteritems()]))
332 return d 441 return d
333 442
334 def _getFromMany(self, node_data, max_item=10, rsm_dict=None, profile_key=C.PROF_KEY_NONE): 443 def _getFromMany(self, node_data, max_item=10, extra_dict=None, profile_key=C.PROF_KEY_NONE):
335 """ 444 """
336 @param max_item(int): maximum number of item to get, C.NO_LIMIT for no limit 445 @param max_item(int): maximum number of item to get, C.NO_LIMIT for no limit
337 """ 446 """
338 max_item = None if max_item == C.NO_LIMIT else max_item 447 max_item = None if max_item == C.NO_LIMIT else max_item
339 return self.getFromMany([(jid.JID(service), unicode(node)) for service, node in node_data], max_item, rsm.RSMRequest(**rsm_dict) if rsm_dict else None, profile_key) 448 extra = self.parseExtra(extra_dict)
340 449 return self.getFromMany([(jid.JID(service), unicode(node)) for service, node in node_data], max_item, extra.rsm_request, extra.extra, profile_key)
341 def getFromMany(self, node_data, max_item=None, rsm_request=None, profile_key=C.PROF_KEY_NONE): 450
451 def getFromMany(self, node_data, max_item=None, rsm_request=None, extra=None, profile_key=C.PROF_KEY_NONE):
342 """Get items from many nodes at once 452 """Get items from many nodes at once
343 @param node_data (iterable[tuple]): iterable of tuple (service, node) where: 453 @param node_data (iterable[tuple]): iterable of tuple (service, node) where:
344 - service (jid.JID) is the pubsub service 454 - service (jid.JID) is the pubsub service
345 - node (unicode) is the node to get items from 455 - node (unicode) is the node to get items from
346 @param max_items (int): optional limit on the number of retrieved items. 456 @param max_items (int): optional limit on the number of retrieved items.
349 @return (str): RT Deferred session id 459 @return (str): RT Deferred session id
350 """ 460 """
351 client = self.host.getClient(profile_key) 461 client = self.host.getClient(profile_key)
352 deferreds = {} 462 deferreds = {}
353 for service, node in node_data: 463 for service, node in node_data:
354 deferreds[(service, node)] = self.getItems(service, node, max_item, rsm_request=rsm_request, profile_key=profile_key) 464 deferreds[(service, node)] = self.getItems(service, node, max_item, rsm_request=rsm_request, extra=extra, profile_key=profile_key)
355 return self.rt_sessions.newSession(deferreds, client.profile) 465 return self.rt_sessions.newSession(deferreds, client.profile)
356 466
357 467
358 class SatPubSubClient(rsm.PubSubClient): 468 class SatPubSubClient(rsm.PubSubClient):
359 implements(disco.IDisco) 469 implements(disco.IDisco)
365 475
366 def connectionInitialized(self): 476 def connectionInitialized(self):
367 rsm.PubSubClient.connectionInitialized(self) 477 rsm.PubSubClient.connectionInitialized(self)
368 478
369 def itemsReceived(self, event): 479 def itemsReceived(self, event):
370 if not self.host.trigger.point("PubSubItemsReceived", event, self.parent.profile): 480 log.debug(u"Pubsub items received")
371 return 481 for node in (event.nodeIdentifier, None):
372 for node in self.parent_plugin.managedNodes: 482 try:
373 if event.nodeIdentifier == node[0]: 483 callbacks = self.parent_plugin._node_cb[node][C.PS_ITEMS]
374 node[1](event, self.parent.profile) 484 except KeyError:
485 pass
486 else:
487 for callback in callbacks:
488 callback(event, self.parent.profile)
375 489
376 def deleteReceived(self, event): 490 def deleteReceived(self, event):
377 #TODO: manage delete event 491 log.debug((u"Publish node deleted"))
378 log.debug(_(u"Publish node deleted")) 492 for node in (event.nodeIdentifier, None):
379 493 try:
380 # def purgeReceived(self, event): 494 callbacks = self.parent_plugin._node_cb[node][C.PS_DELETE]
495 except KeyError:
496 pass
497 else:
498 for callback in callbacks:
499 callback(event, self.parent.profile)
381 500
382 def subscriptions(self, service, nodeIdentifier, sender=None): 501 def subscriptions(self, service, nodeIdentifier, sender=None):
383 """Return the list of subscriptions to the given service and node. 502 """Return the list of subscriptions to the given service and node.
384 503
385 @param service: The publish subscribe service to retrieve the subscriptions from. 504 @param service: The publish subscribe service to retrieve the subscriptions from.