# HG changeset patch # User Goffi # Date 1439678384 -7200 # Node ID 4c4f88d7b1564f08dd682afabaf99837c7b23c70 # Parent 832846fefe8586f4b5d61ef12697a046306b9547 plugins xep-0060, xep-0163, xep-0277, groupblog: bloging improvments (huge patch, sorry): /!\ not everything is working yet, and specially groupblogs are broken /!\ - renamed bridge api to use prefixed methods (e.g. psSubscribeToMany instead of subscribeToMany in PubSub) - (xep-0060): try to find a default PubSub service, and put it in client.pubsub_service - (xep-0060): extra dictionary can be used in bridge method for RSM and other options - (xep-0060): XEP_0060.addManagedNode and XEP_0060.removeManagedNode allow to easily catch notifications for a specific node - (xep-0060): retractItem manage "notify" attribute - (xep-0060): new signal psEvent will be used to transmit notifications to frontends - (xep-0060, constants): added a bunch of useful constants - (xep-0163): removed personalEvent in favor of psEvent - (xep-0163): addPEPEvent now filter non PEP events for in_callback - (xep-0277): use of new XEP-0060 plugin's addManagedNode - (xep-0277): fixed author handling for incoming blogs: author is the human readable name, author_jid it jid, and author_jid_verified is set to True is the jid is checked - (xep-0277): reworked data2entry with Twisted instead of feed, item_id can now be specified, is changed to if there is only content - (xep-0277): comments are now managed here (core removed from groupblog) - (xep-0277): (comments) node is created if needed, default pubsub service is used if available, else PEP - (xep-0277): retract is managed diff -r 832846fefe85 -r 4c4f88d7b156 src/core/constants.py --- a/src/core/constants.py Sun Aug 16 00:06:59 2015 +0200 +++ b/src/core/constants.py Sun Aug 16 00:39:44 2015 +0200 @@ -82,7 +82,8 @@ ENTITY_CAP_HASH = 'CAP_HASH' ## Roster jids selection ## - ALL = 'ALL' + PUBLIC = 'PUBLIC' + ALL = 'ALL' # ALL means all known contacts, while PUBLIC means everybody, known or not GROUP = 'GROUP' JID = 'JID' @@ -129,6 +130,18 @@ # names of widely used plugins TEXT_CMDS = 'TEXT-COMMANDS' + # PubSub event categories + PS_PEP = "PEP" + PS_MICROBLOG = "MICROBLOG" + + # PubSub + PS_PUBLISH = "publish" + PS_RETRACT = "retract" # used for items + PS_DELETE = "delete" #used for nodes + PS_ITEM = "item" + PS_ITEMS = "items" # Can contain publish and retract items + PS_EVENTS = (PS_ITEMS, PS_DELETE) + ## XMLUI ## XMLUI_WINDOW = 'window' @@ -230,7 +243,7 @@ def bool(cls, value): """@return (bool): bool value for associated constant""" assert isinstance(value, basestring) - return value.lower() == cls.BOOL_TRUE + return value.lower() in (cls.BOOL_TRUE, "1") @classmethod def boolConst(cls, value): diff -r 832846fefe85 -r 4c4f88d7b156 src/plugins/plugin_misc_groupblog.py --- a/src/plugins/plugin_misc_groupblog.py Sun Aug 16 00:06:59 2015 +0200 +++ b/src/plugins/plugin_misc_groupblog.py Sun Aug 16 00:39:44 2015 +0200 @@ -29,8 +29,7 @@ from wokkel import rsm from zope.interface import implements from feed import date -import uuid -import urllib +# import uuid try: from twisted.words.protocols.xmlstream import XMPPHandler @@ -40,7 +39,6 @@ NS_PUBSUB = 'http://jabber.org/protocol/pubsub' NS_GROUPBLOG = 'http://goffi.org/protocol/groupblog' NS_NODE_PREFIX = 'urn:xmpp:groupblog:' -NS_COMMENT_PREFIX = 'urn:xmpp:comments:' #NS_PUBSUB_EXP = 'http://goffi.org/protocol/pubsub' #for non official features NS_PUBSUB_EXP = NS_PUBSUB # XXX: we can't use custom namespace as Wokkel's PubSubService use official NS NS_PUBSUB_ITEM_ACCESS = NS_PUBSUB_EXP + "#item-access" @@ -137,7 +135,7 @@ # method=self.subscribeGroupBlog, # async=True) - host.trigger.add("PubSubItemsReceived", self.pubSubItemsReceivedTrigger) + # host.trigger.add("PubSubItemsReceived", self.pubSubItemsReceivedTrigger) ## plugin management methods ## @@ -172,43 +170,43 @@ defer.returnValue((profile, client)) - def pubSubItemsReceivedTrigger(self, event, profile): - """"Trigger which catch groupblogs events""" + # def pubSubItemsReceivedTrigger(self, event, profile): + # """"Trigger which catch groupblogs events""" - if event.nodeIdentifier.startswith(NS_NODE_PREFIX): - # Microblog - publisher = jid.JID(event.nodeIdentifier[len(NS_NODE_PREFIX):]) - origin_host = publisher.host.split('.') - event_host = event.sender.host.split('.') - #FIXME: basic origin check, must be improved - #TODO: automatic security test - if (not (origin_host) - or len(event_host) < len(origin_host) - or event_host[-len(origin_host):] != origin_host): - log.warning(u"Host incoherence between %s and %s (hack attempt ?)" % (unicode(event.sender), - unicode(publisher))) - return False + # if event.nodeIdentifier.startswith(NS_NODE_PREFIX): + # # Microblog + # publisher = jid.JID(event.nodeIdentifier[len(NS_NODE_PREFIX):]) + # origin_host = publisher.host.split('.') + # event_host = event.sender.host.split('.') + # #FIXME: basic origin check, must be improved + # #TODO: automatic security test + # if (not (origin_host) + # or len(event_host) < len(origin_host) + # or event_host[-len(origin_host):] != origin_host): + # log.warning(u"Host incoherence between %s and %s (hack attempt ?)" % (unicode(event.sender), + # unicode(publisher))) + # return False - client = self.host.getClient(profile) + # client = self.host.getClient(profile) - def gbdataManagementMicroblog(gbdata): - for gbdatum in gbdata: - self.host.bridge.personalEvent(publisher.full(), "MICROBLOG", gbdatum, profile) + # def gbdataManagementMicroblog(gbdata): + # for gbdatum in gbdata: + # self.host.bridge.personalEvent(publisher.full(), "MICROBLOG", gbdatum, profile) - d = self._itemsConstruction(event.items, publisher, client) - d.addCallback(gbdataManagementMicroblog) - return False + # d = self._itemsConstruction(event.items, publisher, client) + # d.addCallback(gbdataManagementMicroblog) + # return False - elif event.nodeIdentifier.startswith(NS_COMMENT_PREFIX): - # Comment - def gbdataManagementComments(gbdata): - for gbdatum in gbdata: - publisher = None # FIXME: see below (_handleCommentsItems) - self.host.bridge.personalEvent(publisher.full() if publisher else gbdatum["author"], "MICROBLOG", gbdatum, profile) - d = self._handleCommentsItems(event.items, event.sender, event.nodeIdentifier) - d.addCallback(gbdataManagementComments) - return False - return True + # elif event.nodeIdentifier.startswith(NS_COMMENT_PREFIX): + # # Comment + # def gbdataManagementComments(gbdata): + # for gbdatum in gbdata: + # publisher = None # FIXME: see below (_handleCommentsItems) + # self.host.bridge.personalEvent(publisher.full() if publisher else gbdatum["author"], "MICROBLOG", gbdatum, profile) + # d = self._handleCommentsItems(event.items, event.sender, event.nodeIdentifier) + # d.addCallback(gbdataManagementComments) + # return False + # return True ## internal helping methodes ## @@ -338,20 +336,20 @@ entry_d.addCallback(itemCreated) return entry_d - def _fillCommentsElement(self, mblog_data, entry_id, node_name, service_jid): - """ - @param mblog_data: dict containing the microblog data - @param entry_id: unique identifier of the entry - @param node_name: the pubsub node name - @param service_jid: the JID of the pubsub service - @return: the comments node string - """ - if entry_id is None: - entry_id = unicode(uuid.uuid4()) - comments_node = "%s_%s__%s" % (NS_COMMENT_PREFIX, entry_id, node_name) - mblog_data['comments'] = "xmpp:%(service)s?%(query)s" % {'service': service_jid.userhost(), - 'query': urllib.urlencode([('node', comments_node.encode('utf-8'))])} - return comments_node + # def _fillCommentsElement(self, mblog_data, entry_id, node_name, service_jid): + # """ + # @param mblog_data: dict containing the microblog data + # @param entry_id: unique identifier of the entry + # @param node_name: the pubsub node name + # @param service_jid: the JID of the pubsub service + # @return: the comments node string + # """ + # if entry_id is None: + # entry_id = unicode(uuid.uuid4()) + # comments_node = "%s_%s__%s" % (NS_COMMENT_PREFIX, entry_id, node_name) + # mblog_data['comments'] = "xmpp:%(service)s?%(query)s" % {'service': service_jid.userhost(), + # 'query': urllib.urlencode([('node', comments_node.encode('utf-8'))])} + # return comments_node def _mblogPublicationFailed(self, failure): #TODO diff -r 832846fefe85 -r 4c4f88d7b156 src/plugins/plugin_xep_0060.py --- a/src/plugins/plugin_xep_0060.py Sun Aug 16 00:06:59 2015 +0200 +++ b/src/plugins/plugin_xep_0060.py Sun Aug 16 00:39:44 2015 +0200 @@ -27,10 +27,12 @@ from twisted.words.protocols.jabber import jid from twisted.internet import defer from wokkel import disco +# XXX: tmp.pubsub is actually use instead of wokkel version +# same thing for rsm from wokkel import pubsub from wokkel import rsm from zope.interface import implements -# from twisted.internet import defer +from collections import namedtuple import uuid UNSPECIFIED = "unspecified error" @@ -49,6 +51,11 @@ } +Extra = namedtuple('Extra', ('rsm_request', 'extra')) +# rsm_request is the rsm.RSMRequest build with rsm_ prefixed keys, or None +# extra is a potentially empty dict + + class XEP_0060(object): OPT_ACCESS_MODEL = 'pubsub#access_model' OPT_PERSIST_ITEMS = 'pubsub#persist_items' @@ -60,29 +67,106 @@ OPT_SUBSCRIPTION_DEPTH = 'pubsub#subscription_depth' OPT_ROSTER_GROUPS_ALLOWED = 'pubsub#roster_groups_allowed' OPT_PUBLISH_MODEL = 'pubsub#publish_model' + ACCESS_OPEN = 'open' + ACCESS_PRESENCE = 'presence' + ACCESS_ROSTER = 'roster' + ACCESS_AUTHORIZE = 'authorize' + ACCESS_WHITELIST = 'whitelist' def __init__(self, host): log.info(_(u"PubSub plugin initialization")) self.host = host - self.managedNodes = [] + self._node_cb = {} # dictionnary of callbacks for node (key: node, value: list of callbacks) self.rt_sessions = sat_defer.RTDeferredSessions() - host.bridge.addMethod("subscribeToMany", ".plugin", in_sign='a(ss)sa{ss}s', out_sign='s', method=self._subscribeToMany) - host.bridge.addMethod("getSubscribeRTResult", ".plugin", in_sign='ss', out_sign='(ua(sss))', method=self._manySubscribeRTResult, async=True) - host.bridge.addMethod("getFromMany", ".plugin", in_sign='a(ss)ia{ss}s', out_sign='s', method=self._getFromMany) - host.bridge.addMethod("getFromManyRTResult", ".plugin", in_sign='ss', out_sign='(ua(sssasa{ss}))', method=self._getFromManyRTResult, async=True) + host.bridge.addMethod("psDeleteNode", ".plugin", in_sign='sss', out_sign='', method=self._deleteNode, async=True) + host.bridge.addMethod("psRetractItem", ".plugin", in_sign='sssbs', out_sign='', method=self._retractItem, async=True) + host.bridge.addMethod("psRetractItems", ".plugin", in_sign='ssasbs', out_sign='', method=self._retractItems, async=True) + host.bridge.addMethod("psSubscribeToMany", ".plugin", in_sign='a(ss)sa{ss}s', out_sign='s', method=self._subscribeToMany) + host.bridge.addMethod("psGetSubscribeRTResult", ".plugin", in_sign='ss', out_sign='(ua(sss))', method=self._manySubscribeRTResult, async=True) + host.bridge.addMethod("psGetFromMany", ".plugin", in_sign='a(ss)ia{ss}s', out_sign='s', method=self._getFromMany) + host.bridge.addMethod("psGetFromManyRTResult", ".plugin", in_sign='ss', out_sign='(ua(sssasa{ss}))', method=self._getFromManyRTResult, async=True) + host.bridge.addSignal("psEvent", ".plugin", signature='ssssa{ss}s') # args: category, service(jid), node, type (C.PS_ITEMS, C.PS_DELETE), data, profile def getHandler(self, profile): client = self.host.getClient(profile) client.pubsub_client = SatPubSubClient(self.host, self) return client.pubsub_client - def addManagedNode(self, node_name, callback): - """Add a handler for a namespace + @defer.inlineCallbacks + def profileConnected(self, profile): + client = self.host.getClient(profile) + pubsub_services = yield self.host.findServiceEntities("pubsub", "service", profile_key = profile) + if pubsub_services: + # we use one of the found services as our default pubsub service + client.pubsub_service = pubsub_services.pop() + else: + client.pubsub_service = None + + def parseExtra(self, extra): + """Parse extra dictionnary + + used bridge's extra dictionnaries + @param extra(dict): extra data used to configure request + @return(Extra): filled Extra instance + """ + if extra is not None: + rsm_dict = { key[4:]: value for key, value in extra.iteritems() if key.startswith('rsm_') } + if rsm_dict: + try: + rsm_dict['max_'] = rsm_dict.pop('max') + except KeyError: + pass + rsm_request = rsm.RSMRequest(**rsm_dict) + else: + rsm_request = None + else: + rsm_request = None + extra = {} + return Extra(rsm_request, extra) + + def addManagedNode(self, node, **kwargs): + """Add a handler for a node - @param namespace: NS of the handler (will appear in disco info) - @param callback: method to call when the handler is found - @param profile: profile which manage this handler""" - self.managedNodes.append((node_name, callback)) + @param node(unicode): node to monitor, or None to monitor all + @param **kwargs: method(s) to call when the node is found + the methode must be named after PubSub constants in lower case + and suffixed with "_cb" + e.g.: "publish_cb" for C.PS_PUBLISH, "delete_cb" for C.PS_DELETE + """ + assert kwargs + callbacks = self._node_cb.setdefault(node, {}) + for event, cb in kwargs.iteritems(): + event_name = event[:-3] + assert event_name in C.PS_EVENTS + callbacks.setdefault(event_name,[]).append(cb) + + def removeManagedNode(self, node, *args): + """Add a handler for a node + + @param node(unicode): node to monitor + @param *args: callback(s) to remove + """ + assert args + try: + registred_cb = self._node_cb[node] + except KeyError: + pass + else: + for callback in args: + for event, cb_list in registred_cb.iteritems(): + try: + cb_list.remove(callback) + except ValueError: + pass + else: + log.debug(u"removed callback {cb} for event {event} on node {node}".format( + cb=callback, event=event, node=node)) + if not cb_list: + del registred_cb[event] + if not registred_cb: + del self._node_cb[node] + return + log.error(u"Trying to remove inexistant callback {cb} for node {node}".format(cb=callback, node=node)) # def listNodes(self, service, nodeIdentifier='', profile=C.PROF_KEY_NONE): # """Retrieve the name of the nodes that are accessible on the target service. @@ -117,7 +201,7 @@ client = self.host.getClient(profile_key) return client.pubsub_client.publish(service, nodeIdentifier, items, client.pubsub_client.parent.jid) - def getItems(self, service, node, max_items=None, item_ids=None, sub_id=None, rsm_request=None, profile_key=C.PROF_KEY_NONE): + def getItems(self, service, node, max_items=None, item_ids=None, sub_id=None, rsm_request=None, extra=None, profile_key=C.PROF_KEY_NONE): """Retrieve pubsub items from a node. @param service (JID): pubsub service. @@ -134,12 +218,27 @@ """ if rsm_request and item_ids: raise ValueError("items_id can't be used with rsm") + if extra is None: + extra = {} client = self.host.getClient(profile_key) - ext_data = {'id': unicode(uuid.uuid4()), 'rsm': rsm_request} if rsm_request else None + ext_data = {'id': unicode(uuid.uuid4()), 'rsm': rsm_request} if rsm_request is not None else None d = client.pubsub_client.items(service, node, max_items, item_ids, sub_id, client.pubsub_client.parent.jid, ext_data) + + try: + subscribe = C.bool(extra['subscribe']) + except KeyError: + subscribe = False + + def doSubscribe(items): + self.subscribe(service, node, profile_key=profile_key) + return items + + if subscribe: + d.addCallback(doSubscribe) + def addMetadata(items): metadata = {} - if rsm_request: + if rsm_request is not None: rsm_data = client.pubsub_client.getRSMResponse(ext_data['id']) metadata.update({'rsm_{}'.format(key): value for key, value in rsm_data}) return (items, metadata) @@ -185,15 +284,25 @@ client = self.host.getClient(profile_key) return client.pubsub_client.createNode(service, nodeIdentifier, options) + def _deleteNode(self, service_s, nodeIdentifier, profile_key): + return self.deleteNode(jid.JID(service_s) if service_s else None, nodeIdentifier, profile_key) + def deleteNode(self, service, nodeIdentifier, profile_key=C.PROF_KEY_NONE): client = self.host.getClient(profile_key) return client.pubsub_client.deleteNode(service, nodeIdentifier) - def retractItems(self, service, nodeIdentifier, itemIdentifiers, profile_key=C.PROF_KEY_NONE): + def _retractItem(self, service_s, nodeIdentifier, itemIdentifier, notify, profile_key): + return self._retractItems(service_s, nodeIdentifier, (itemIdentifier,), notify, profile_key) + + def _retractItems(self, service_s, nodeIdentifier, itemIdentifiers, notify, profile_key): + return self.retractItems(jid.JID(service_s) if service_s else None, nodeIdentifier, itemIdentifiers, notify, profile_key) + + def retractItems(self, service, nodeIdentifier, itemIdentifiers, notify=True, profile_key=C.PROF_KEY_NONE): client = self.host.getClient(profile_key) - return client.pubsub_client.retractItems(service, nodeIdentifier, itemIdentifiers) + return client.pubsub_client.retractItems(service, nodeIdentifier, itemIdentifiers, notify=True) def subscribe(self, service, nodeIdentifier, sub_jid=None, options=None, profile_key=C.PROF_KEY_NONE): + # TODO: reimplement a subscribtion cache, checking that we have not subscription before trying to subscribe client = self.host.getClient(profile_key) return client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options) @@ -331,14 +440,15 @@ for (service, node), (success, (failure, (items, metadata))) in ret[1].iteritems()])) return d - def _getFromMany(self, node_data, max_item=10, rsm_dict=None, profile_key=C.PROF_KEY_NONE): + def _getFromMany(self, node_data, max_item=10, extra_dict=None, profile_key=C.PROF_KEY_NONE): """ @param max_item(int): maximum number of item to get, C.NO_LIMIT for no limit """ max_item = None if max_item == C.NO_LIMIT else max_item - return self.getFromMany([(jid.JID(service), unicode(node)) for service, node in node_data], max_item, rsm.RSMRequest(**rsm_dict) if rsm_dict else None, profile_key) + extra = self.parseExtra(extra_dict) + return self.getFromMany([(jid.JID(service), unicode(node)) for service, node in node_data], max_item, extra.rsm_request, extra.extra, profile_key) - def getFromMany(self, node_data, max_item=None, rsm_request=None, profile_key=C.PROF_KEY_NONE): + def getFromMany(self, node_data, max_item=None, rsm_request=None, extra=None, profile_key=C.PROF_KEY_NONE): """Get items from many nodes at once @param node_data (iterable[tuple]): iterable of tuple (service, node) where: - service (jid.JID) is the pubsub service @@ -351,7 +461,7 @@ client = self.host.getClient(profile_key) deferreds = {} for service, node in node_data: - deferreds[(service, node)] = self.getItems(service, node, max_item, rsm_request=rsm_request, profile_key=profile_key) + deferreds[(service, node)] = self.getItems(service, node, max_item, rsm_request=rsm_request, extra=extra, profile_key=profile_key) return self.rt_sessions.newSession(deferreds, client.profile) @@ -367,17 +477,26 @@ rsm.PubSubClient.connectionInitialized(self) def itemsReceived(self, event): - if not self.host.trigger.point("PubSubItemsReceived", event, self.parent.profile): - return - for node in self.parent_plugin.managedNodes: - if event.nodeIdentifier == node[0]: - node[1](event, self.parent.profile) + log.debug(u"Pubsub items received") + for node in (event.nodeIdentifier, None): + try: + callbacks = self.parent_plugin._node_cb[node][C.PS_ITEMS] + except KeyError: + pass + else: + for callback in callbacks: + callback(event, self.parent.profile) def deleteReceived(self, event): - #TODO: manage delete event - log.debug(_(u"Publish node deleted")) - - # def purgeReceived(self, event): + log.debug((u"Publish node deleted")) + for node in (event.nodeIdentifier, None): + try: + callbacks = self.parent_plugin._node_cb[node][C.PS_DELETE] + except KeyError: + pass + else: + for callback in callbacks: + callback(event, self.parent.profile) def subscriptions(self, service, nodeIdentifier, sender=None): """Return the list of subscriptions to the given service and node. diff -r 832846fefe85 -r 4c4f88d7b156 src/plugins/plugin_xep_0163.py --- a/src/plugins/plugin_xep_0163.py Sun Aug 16 00:06:59 2015 +0200 +++ b/src/plugins/plugin_xep_0163.py Sun Aug 16 00:39:44 2015 +0200 @@ -49,43 +49,62 @@ self.pep_events = set() self.pep_out_cb = {} host.trigger.add("PubSub Disco Info", self.disoInfoTrigger) - host.bridge.addSignal("personalEvent", ".plugin", signature='ssa{ss}s') # args: from (jid), type(MOOD, TUNE, etc), data, profile - host.bridge.addMethod("sendPersonalEvent", ".plugin", in_sign='sa{ss}s', out_sign='', method=self.sendPersonalEvent, async=True) # args: type(MOOD, TUNE, etc), data, profile_key; + host.bridge.addMethod("PEPSend", ".plugin", in_sign='sa{ss}s', out_sign='', method=self.PEPSend, async=True) # args: type(MOOD, TUNE, etc), data, profile_key; self.addPEPEvent("MOOD", NS_USER_MOOD, self.userMoodCB, self.sendMood) def disoInfoTrigger(self, disco_info, profile): """Add info from managed PEP + @param disco_info: list of disco feature as returned by PubSub, will be filled with PEP features - @param profile: profile we are handling""" + @param profile: profile we are handling + """ disco_info.extend(map(disco.DiscoFeature, self.pep_events)) return True - def addPEPEvent(self, event_type, name, in_callback, out_callback=None, notify=True): + def addPEPEvent(self, event_type, node, in_callback, out_callback=None, notify=True): """Add a Personal Eventing Protocol event manager - @param event_type: type of the event (always uppercase), can be MOOD, TUNE, etc - @param name: namespace of the node (e.g. http://jabber.org/protocol/mood for User Mood) - @param in_callback: method to call when this event occur - @param out_callback: method to call when we want to publish this event (must return a deferred) - @param notify: add autosubscribe (+notify) if True""" + + @param event_type(unicode): type of the event (always uppercase), can be MOOD, TUNE, etc + @param node(unicode): namespace of the node (e.g. http://jabber.org/protocol/mood for User Mood) + @param in_callback(callable): method to call when this event occur + the callable will be called with (itemsEvent, profile) as arguments + @param out_callback(callable,None): method to call when we want to publish this event (must return a deferred) + the callable will be called when sendPEPEvent is called + @param notify(bool): add autosubscribe (+notify) if True + """ if out_callback: self.pep_out_cb[event_type] = out_callback - self.pep_events.add(name) + self.pep_events.add(node) if notify: - self.pep_events.add(name + "+notify") - self.host.plugins["XEP-0060"].addManagedNode(name, in_callback) + self.pep_events.add(node + "+notify") + def filterPEPEvent(itemsEvent, profile): + """Ignore messages which are not coming from PEP (i.e. main server) - def sendPEPEvent(self, namespace, data, profile): + @param itemsEvent(pubsub.ItemsEvent): pubsub event + @param profile(unicode): %(doc_profile)s + """ + if itemsEvent.sender.user or itemsEvent.sender.resource: + log.debug("ignoring non PEP event from {} (profile={})".format(itemsEvent.sender.full(), profile)) + return + in_callback(itemsEvent, profile) + + self.host.plugins["XEP-0060"].addManagedNode(node, items_cb=filterPEPEvent) + + def sendPEPEvent(self, node, data, profile): """Publish the event data - @param namespace: node namespace + + @param node(unicode): node namespace @param data: domish.Element to use as payload - @param profile: profile which send the data""" + @param profile: profile which send the data + """ item = pubsub.Item(payload=data) - return self.host.plugins["XEP-0060"].publish(None, namespace, [item], profile_key=profile) + return self.host.plugins["XEP-0060"].publish(None, node, [item], profile_key=profile) - def sendPersonalEvent(self, event_type, data, profile_key=C.PROF_KEY_NONE): + def PEPSend(self, event_type, data, profile_key=C.PROF_KEY_NONE): """Send personal event after checking the data is alright + @param event_type: type of event (eg: MOOD, TUNE), must be in self.pep_out_cb.keys() @param data: dict of {string:string} of event_type dependant data @param profile_key: profile who send the event @@ -112,10 +131,12 @@ if not mood: log.debug(_("No mood found")) return - self.host.bridge.personalEvent(itemsEvent.sender.full(), "MOOD", {"mood": mood.value or "", "text": mood.text or ""}, profile) + self.host.bridge.psEvent(C.PS_PEP, itemsEvent.sender.full(), itemsEvent.nodeIdentifier, + "MOOD", {"mood": mood.value or "", "text": mood.text or ""}, profile) def sendMood(self, data, profile): """Send XEP-0107's User Mood + @param data: must include mood and text @param profile: profile which send the mood""" try: diff -r 832846fefe85 -r 4c4f88d7b156 src/plugins/plugin_xep_0277.py --- a/src/plugins/plugin_xep_0277.py Sun Aug 16 00:06:59 2015 +0200 +++ b/src/plugins/plugin_xep_0277.py Sun Aug 16 00:39:44 2015 +0200 @@ -21,25 +21,28 @@ from sat.core.constants import Const as C from sat.core.log import getLogger log = getLogger(__name__) -from twisted.words.protocols.jabber import jid +from twisted.words.protocols.jabber import jid, error +from twisted.words.xish import domish from twisted.internet import defer from twisted.python import failure from sat.core import exceptions -from sat.tools.xml_tools import ElementParser +from sat.tools import xml_tools from sat.tools import sat_defer +# XXX: tmp.pubsub is actually use instead of wokkel version from wokkel import pubsub -from wokkel import rsm -from feed import atom, date +from feed.date import rfc3339 import uuid -from time import time +import time import urlparse -from cgi import escape +import urllib NS_MICROBLOG = 'urn:xmpp:microblog:0' NS_ATOM = 'http://www.w3.org/2005/Atom' NS_XHTML = 'http://www.w3.org/1999/xhtml' NS_PUBSUB_EVENT = "{}{}".format(pubsub.NS_PUBSUB, "#event") +NS_COMMENT_PREFIX = '{}:comments/'.format(NS_MICROBLOG) + PLUGIN_INFO = { "name": "Microblogging over XMPP Plugin", @@ -65,52 +68,56 @@ self.host = host self._p = self.host.plugins["XEP-0060"] # this facilitate the access to pubsub plugin self.rt_sessions = sat_defer.RTDeferredSessions() - self.host.plugins["XEP-0163"].addPEPEvent("MICROBLOG", NS_MICROBLOG, self.microblogCB, self.sendMicroblog, notify=False) - host.bridge.addMethod("getLastMicroblogs", ".plugin", - in_sign='sis', out_sign='(aa{ss}a{ss})', - method=self._getLastMicroblogs, - async=True, - doc={'summary': 'retrieve items', - 'param_0': 'jid: publisher of wanted microblog', - 'param_1': 'max_items: see XEP-0060 #6.5.7', - 'param_2': '%(doc_profile)s', - 'return': 'list of microblog data (dict)'}) - host.bridge.addMethod("setMicroblogAccess", ".plugin", in_sign='ss', out_sign='', - method=self.setMicroblogAccess, + self.host.plugins["XEP-0060"].addManagedNode(None, items_cb=self._itemsReceived) + + host.bridge.addMethod("mbSend", ".plugin", + in_sign='ssa{ss}s', out_sign='', + method=self._mbSend, + async=True) + host.bridge.addMethod("mbRetract", ".plugin", + in_sign='ssss', out_sign='', + method=self._mbRetract, + async=True) + host.bridge.addMethod("mbGetLast", ".plugin", + in_sign='ssia{ss}s', out_sign='(aa{ss}a{ss})', + method=self._mbGetLast, async=True) - host.bridge.addMethod("mBSubscribeToMany", ".plugin", in_sign='sass', out_sign='s', - method=self._mBSubscribeToMany) - host.bridge.addMethod("mBGetFromManyRTResult", ".plugin", in_sign='ss', out_sign='(ua(sssaa{ss}a{ss}))', - method=self._mBGetFromManyRTResult, async=True) - host.bridge.addMethod("mBGetFromMany", ".plugin", in_sign='sasia{ss}s', out_sign='s', - method=self._mBGetFromMany) - host.bridge.addMethod("mBGetFromManyWithCommentsRTResult", ".plugin", in_sign='ss', out_sign='(ua(sssa(a{ss}a(sssaa{ss}a{ss}))a{ss}))', - method=self._mBGetFromManyWithCommentsRTResult, async=True) - host.bridge.addMethod("mBGetFromManyWithComments", ".plugin", in_sign='sasiia{ss}a{ss}s', out_sign='s', method=self._mBGetFromManyWithComments) + host.bridge.addMethod("mbSetAccess", ".plugin", in_sign='ss', out_sign='', + method=self.mbSetAccess, + async=True) + host.bridge.addMethod("mbSetAccess", ".plugin", in_sign='ss', out_sign='', + method=self.mbSetAccess, + async=True) + host.bridge.addMethod("mbSubscribeToMany", ".plugin", in_sign='sass', out_sign='s', + method=self._mbSubscribeToMany) + host.bridge.addMethod("mbGetFromManyRTResult", ".plugin", in_sign='ss', out_sign='(ua(sssaa{ss}a{ss}))', + method=self._mbGetFromManyRTResult, async=True) + host.bridge.addMethod("mbGetFromMany", ".plugin", in_sign='sasia{ss}s', out_sign='s', + method=self._mbGetFromMany) + host.bridge.addMethod("mbGetFromManyWithCommentsRTResult", ".plugin", in_sign='ss', out_sign='(ua(sssa(a{ss}a(sssaa{ss}a{ss}))a{ss}))', + method=self._mbGetFromManyWithCommentsRTResult, async=True) + host.bridge.addMethod("mbGetFromManyWithComments", ".plugin", in_sign='sasiia{ss}a{ss}s', out_sign='s', method=self._mbGetFromManyWithComments) ## plugin management methods ## - def microblogCB(self, itemsEvent, profile): - """Callback to "MICROBLOG" PEP event.""" - def manageItem(microblog_data): - self.host.bridge.personalEvent(itemsEvent.sender.full(), "MICROBLOG", microblog_data, profile) + def _itemsReceived(self, itemsEvent, profile): + """Callback which manage items notifications (publish + retract)""" + if not itemsEvent.nodeIdentifier.startswith(NS_MICROBLOG): + return + def manageItem(data, event): + self.host.bridge.psEvent(C.PS_MICROBLOG, itemsEvent.sender.full(), itemsEvent.nodeIdentifier, event, data, profile) for item in itemsEvent.items: - self.item2mbdata(item).addCallbacks(manageItem, lambda failure: None) + if item.name == C.PS_ITEM: + self.item2mbdata(item).addCallbacks(manageItem, lambda failure: None, (C.PS_PUBLISH,)) + elif item.name == C.PS_RETRACT: + manageItem({'id': item['id']}, C.PS_RETRACT) + else: + raise exceptions.InternalError("Invalid event value") + ## data/item transformation ## - def _removeXHTMLMarkups(self, xhtml): - """Remove XHTML markups from the given string. - - @param xhtml: the XHTML string to be cleaned - @return: a Deferred instance for the cleaned string - """ - return self.host.plugins["TEXT-SYNTAXES"].convert(xhtml, - self.host.plugins["TEXT-SYNTAXES"].SYNTAX_XHTML, - self.host.plugins["TEXT-SYNTAXES"].SYNTAX_TEXT, - False) - @defer.inlineCallbacks def item2mbdata(self, item_elt): """Convert an XML Item to microblog data used in bridge API @@ -151,7 +158,7 @@ if data_elt.uri != NS_XHTML: raise failure.Failure(exceptions.DataError(_('Content of type XHTML must declare its namespace!'))) key = check_conflict(u'{}_xhtml'.format(elem.name)) - data = unicode(data_elt) + data = data_elt.toXml() microblog_data[key] = yield self.host.plugins["TEXT-SYNTAXES"].clean_xhtml(data) else: key = check_conflict(elem.name) @@ -196,7 +203,7 @@ # we check that text content is present for key in ('title', 'content'): if key not in microblog_data and ('{}_xhtml'.format(key)) in microblog_data: - log.warning(u"item {id_} provide a {key}_xhtml data but not a text one".format(id_, key)) + log.warning(u"item {id_} provide a {key}_xhtml data but not a text one".format(id_=id_, key=key)) # ... and do the conversion if it's not microblog_data[key] = yield self.host.plugins["TEXT-SYNTAXES"].\ convert(microblog_data['{}_xhtml'.format(key)], @@ -218,13 +225,13 @@ except StopIteration: msg = u'No atom updated element found in the pubsub item {}'.format(id_) raise failure.Failure(exceptions.DataError(msg)) - microblog_data['updated'] = unicode(date.rfc3339.tf_from_timestamp(unicode(updated_elt))) + microblog_data['updated'] = unicode(rfc3339.tf_from_timestamp(unicode(updated_elt))) try: published_elt = entry_elt.elements(NS_ATOM, 'published').next() except StopIteration: microblog_data['published'] = microblog_data['updated'] else: - microblog_data['published'] = unicode(date.rfc3339.tf_from_timestamp(unicode(published_elt))) + microblog_data['published'] = unicode(rfc3339.tf_from_timestamp(unicode(published_elt))) # links for link_elt in entry_elt.elements(NS_ATOM, 'link'): @@ -246,6 +253,7 @@ log.warning(u"Unmanaged link element: rel={rel} title={title} href={href}".format(rel=rel, title=title, href=href)) # author + publisher = item_elt.getAttribute("publisher") try: author_elt = entry_elt.elements(NS_ATOM, 'author').next() except StopIteration: @@ -263,16 +271,24 @@ uri_elt = author_elt.elements(NS_ATOM, 'uri').next() except StopIteration: log.debug("No uri element found in author element of item {}".format(id_)) + if publisher: + microblog_data['author_jid'] = publisher else: uri = unicode(uri_elt) if uri.startswith("xmpp:"): uri = uri[5:] - microblog_data['author_uri'] = uri - if item_elt.getAttribute("publisher") == uri: - microblog_data['author_uri_verified'] = C.BOOL_TRUE + microblog_data['author_jid'] = uri + else: + microblog_data['author_jid'] = item_elt.getAttribute("publisher") or "" + + if not publisher: + log.debug("No publisher attribute, we can't verify author jid") + microblog_data['author_jid_verified'] = C.BOOL_FALSE + elif publisher == uri: + microblog_data['author_jid_verified'] = C.BOOL_TRUE else: log.warning("item atom:uri differ from publisher attribute, spoofing attempt ? atom:uri = {} publisher = {}".format(uri, item_elt.getAttribute("publisher"))) - microblog_data['author_uri_verified'] = C.BOOL_FALSE + microblog_data['author_jid_verified'] = C.BOOL_FALSE # email try: email_elt = author_elt.elements(NS_ATOM, 'email').next() @@ -284,105 +300,216 @@ defer.returnValue(microblog_data) @defer.inlineCallbacks - def data2entry(self, data, profile): + def data2entry(self, data, item_id=None, profile_key=C.PROF_KEY_NONE): """Convert a data dict to en entry usable to create an item @param data: data dict as given by bridge method. + @param item_id(unicode, None): id of the item to use + if None the id will be generated randomly @return: deferred which fire domish.Element """ - #TODO: rewrite this directly with twisted (i.e. without atom / reparsing) - _uuid = unicode(uuid.uuid1()) - _entry = atom.Entry() - _entry.title = '' # reset the default value which is not empty + if item_id is None: + item_id = unicode(uuid.uuid4()) + client = self.host.getClient(profile_key) + entry_elt = domish.Element((NS_ATOM, 'entry')) - elems = {'title': atom.Title, 'content': atom.Content} + ## content and title ## synt = self.host.plugins["TEXT-SYNTAXES"] - # loop on ('title', 'title_rich', 'title_xhtml', 'content', 'content_rich', 'content_xhtml') - for key in elems.keys(): - for type_ in ['', 'rich', 'xhtml']: - attr = "%s_%s" % (key, type_) if type_ else key + for elem_name in ('title', 'content'): + for type_ in ['', '_rich', '_xhtml']: + attr = "{}{}".format(elem_name, type_) if attr in data: + elem = entry_elt.addElement(elem_name) if type_: - if type_ == 'rich': # convert input from current syntax to XHTML - converted = yield synt.convert(data[attr], synt.getCurrentSyntax(profile), "XHTML") + if type_ == '_rich': # convert input from current syntax to XHTML + converted = yield synt.convert(data[attr], synt.getCurrentSyntax(profile_key), "XHTML") + if '{}_xhtml'.format(elem_name) in data: + raise failure.Failure(exceptions.DataError(_("Can't have xhtml and rich content at the same time"))) else: # clean the XHTML input converted = yield synt.clean_xhtml(data[attr]) - elem = elems[key]((u'<div xmlns="%s">%s</div>' % (NS_XHTML, converted)).encode('utf-8')) - elem.attrs['type'] = 'xhtml' - if hasattr(_entry, '%s_xhtml' % key): - raise failure.Failure(exceptions.DataError(_("Can't have xhtml and rich content at the same time"))) - setattr(_entry, '%s_xhtml' % key, elem) + + xml_content = u'<div xmlns="{ns}">{converted}</div>'.format( + ns=NS_XHTML, + converted=converted) + elem.addChild(xml_tools.ElementParser()(xml_content)) + elem['type'] = 'xhtml' + if elem_name not in data: + # there is raw text content, which is mandatory + # so we create one from xhtml content + elem_txt = entry_elt.addElement(elem_name) + text_content = yield self.host.plugins["TEXT-SYNTAXES"].convert(xml_content, + self.host.plugins["TEXT-SYNTAXES"].SYNTAX_XHTML, + self.host.plugins["TEXT-SYNTAXES"].SYNTAX_TEXT, + False) + elem_txt.addContent(text_content) + elem_txt['type'] = 'text' + else: # raw text only needs to be escaped to get HTML-safe sequence - elem = elems[key](escape(data[attr]).encode('utf-8')) - elem.attrs['type'] = 'text' - setattr(_entry, key, elem) - if not getattr(_entry, key).text: - if hasattr(_entry, '%s_xhtml' % key): - text = yield self._removeXHTMLMarkups(getattr(_entry, '%s_xhtml' % key).text) - setattr(_entry, key, text) - if not _entry.title.text: # eventually move the data from content to title - _entry.title = _entry.content.text - _entry.title.attrs['type'] = _entry.content.attrs['type'] - _entry.content.text = '' - _entry.content.attrs['type'] = '' - if hasattr(_entry, 'content_xhtml'): - _entry.title_xhtml = atom.Title(_entry.content_xhtml.text) - _entry.title_xhtml.attrs['type'] = _entry.content_xhtml.attrs['type'] - _entry.content_xhtml.text = '' - _entry.content_xhtml.attrs['type'] = '' + elem.addContent(data[attr]) + elem['type'] = 'text' + + try: + entry_elt.elements(NS_ATOM, 'title').next() + except StopIteration: + # we have no title element which is mandatory + # so we transform content element to title + elems = list(entry_elt.elements(NS_ATOM, 'content')) + if not elems: + raise exceptions.DataError("There must be at least one content or title element") + for elem in elems: + elem.name = 'title' + + ## author ## + author_elt = entry_elt.addElement('author') + try: + author_name = data['author'] + except KeyError: + # FIXME: must use better name + author_name = client.jid.user + author_elt.addElement('name', content=author_name) - _entry.author = atom.Author() - _entry.author.name = data.get('author', self.host.getJidNStream(profile)[0].userhost()).encode('utf-8') - _entry.updated = float(data.get('updated', time())) - _entry.published = float(data.get('published', time())) - entry_id = data.get('id', unicode(_uuid)) - _entry.id = entry_id.encode('utf-8') + try: + author_jid_s = data['author_jid'] + except KeyError: + author_jid_s = client.jid.userhost() + author_elt.addElement('uri', content="xmpp:{}".format(author_jid_s)) + + ## published/updated time ## + current_time = time.time() + entry_elt.addElement('updated', + content=rfc3339.timestamp_from_tf(float(data.get('updated', current_time)))) + entry_elt.addElement('published', + content=rfc3339.timestamp_from_tf(float(data.get('published', current_time)))) + + ## id ## + entry_id = data.get('id', item_id) # FIXME: use a proper id (see XEP-0277 ยง7.1) + entry_elt.addElement('id', content=entry_id) # + + ## comments ## if 'comments' in data: - link = atom.Link() - link.attrs['href'] = data['comments'] - link.attrs['rel'] = 'replies' - link.attrs['title'] = 'comments' - _entry.links.append(link) - _entry_elt = ElementParser()(str(_entry).decode('utf-8')) - item = pubsub.Item(id=entry_id, payload=_entry_elt) - defer.returnValue(item) + link_elt = entry_elt.addElement('link') + link_elt['href'] = data['comments'] + link_elt['rel'] = 'replies' + link_elt['title'] = 'comments' + + ## final item building ## + item_elt = pubsub.Item(id=item_id, payload=entry_elt) + defer.returnValue(item_elt) ## publish ## @defer.inlineCallbacks - def sendMicroblog(self, data, profile): + def _manageComments(self, access, mb_data, service, node, item_id, profile): + """Check comments keys in mb_data and create comments node if necessary + + @param access(unicode): access model + @param mb_data(dict): microblog mb_data + @param service(jid.JID): Pubsub service of the parent item + @param node(unicode): node of the parent item + @param item_id(unicoe): id of the parent item + """ + allow_comments = C.bool(mb_data.pop("allow_comments", "false")) + if not allow_comments: + return + + client = self.host.getClient(profile) + + options = {self._p.OPT_ACCESS_MODEL: access, + self._p.OPT_PERSIST_ITEMS: 1, + self._p.OPT_MAX_ITEMS: -1, + self._p.OPT_DELIVER_PAYLOADS: 1, + self._p.OPT_SEND_ITEM_SUBSCRIBE: 1, + self._p.OPT_PUBLISH_MODEL: "subscribers", # TODO: should be open if *both* node and item access_model are open (public node and item) + } + + comments_node_base = u"{}{}".format(NS_COMMENT_PREFIX, item_id) + comments_node = comments_node_base + + suffix = None + comments_service = client.pubsub_service if client.pubsub_service is not None else service + max_tries = 3 + + for i in xrange(max_tries+1): + try: + yield self._p.createNode(comments_service, comments_node, options, profile_key=profile) + break + except error.StanzaError as e: + if e.condition == 'conflict' and i<max_tries: + log.warning(u"node {} already exists on service {}".format(comments_node, comments_service)) + suffix = 0 if suffix is None else suffix + 1 + comments_node = u"{}_{}".format(comments_node_base, suffix) + else: + raise e + + if comments_service is None: + comments_service = client.jid.userhostJID() + + mb_data['comments'] = "xmpp:%(service)s?%(query)s" % { + 'service': comments_service.userhost(), + 'query': urllib.urlencode([('node', comments_node.encode('utf-8'))]) + } + + def _mbSend(self, service, node, data, profile_key): + service = jid.JID(service) if service else None + node = node if node else NS_MICROBLOG + profile = self.host.memory.getProfileName(profile_key) + return self.send(service, node, data, profile) + + @defer.inlineCallbacks + def send(self, service=None, node=NS_MICROBLOG, data=None, profile=None): """Send XEP-0277's microblog data - @param data: must include content - @param profile: profile which send the mood""" - if 'content' not in data: - log.error("Microblog data must contain at least 'content' key") - raise failure.Failure(exceptions.DataError('no "content" key found')) - content = data['content'] - if not content: - log.error("Microblog data's content value must not be empty") - raise failure.Failure(exceptions.DataError('empty content')) - item = yield self.data2entry(data, profile) - ret = yield self._p.publish(None, NS_MICROBLOG, [item], profile_key=profile) + @param service(jid.JID, None): PubSub service where the microblog must be published + None to publish on profile's PEP + @param node(unicode): PubSub node to use (defaut to microblog NS) + @param data(dict): microblog data (must include at least a "content" or a "title" key). + see http://wiki.goffi.org/wiki/Bridge_API_-_Microblogging/en for details + @param profile: %(doc_profile)s + """ + assert profile is not None + + item_id = data.get('id') or unicode(uuid.uuid4()) + try: + yield self._manageComments(self._p.ACCESS_OPEN, data, service, node, item_id, profile) + except error.StanzaError: + log.warning("Can't create comments node for item {}".format(item_id)) + item = yield self.data2entry(data, item_id, profile) + ret = yield self._p.publish(service, node, [item], profile_key=profile) defer.returnValue(ret) + + ## retract ## + + def _mbRetract(self, service_jid_s, nodeIdentifier, itemIdentifier, profile_key): + """Call self._p._retractItem, but use default node if node is empty""" + return self._p._retractItem(service_jid_s, nodeIdentifier or NS_MICROBLOG, itemIdentifier, True, profile_key) + ## get ## - def _getLastMicroblogs(self, pub_jid_s, max_items=10, profile_key=C.PROF_KEY_NONE): - return self.getLastMicroblogs(jid.JID(pub_jid_s), max_items, profile_key) + def _mbGetLast(self, service_jid_s, node="", max_items=10, extra_dict=None, profile_key=C.PROF_KEY_NONE): + """ + @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit + """ + max_items = None if max_items == C.NO_LIMIT else max_items + extra = self._p.parseExtra(extra_dict) + return self.mbGetLast(jid.JID(service_jid_s), node or None, max_items, extra.rsm_request, extra.extra, profile_key) @defer.inlineCallbacks - def getLastMicroblogs(self, pub_jid, max_items=10, profile_key=C.PROF_KEY_NONE): + def mbGetLast(self, service_jid, node=None, max_items=None, rsm_request=None, extra=None, profile_key=C.PROF_KEY_NONE): """Get the last published microblogs - @param pub_jid(jid.JID): jid of the publisher - @param max_items: how many microblogs we want to get + @param service_jid(jid.JID): jid of the publisher + @param node(unicode, None): node to get (or microblog node if None) + @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit + @param rsm_request (rsm.RSMRequest): RSM request data @param profile_key: profile key @return: a deferred couple with the list of items and metadatas. """ - items_data = yield self._p.getItems(pub_jid, NS_MICROBLOG, max_items=max_items, profile_key=profile_key) + if node is None: + node = NS_MICROBLOG + items_data = yield self._p.getItems(service_jid, node, max_items=max_items, rsm_request=rsm_request, extra=extra, profile_key=profile_key) serialised = yield self._p.serItemsDataD(items_data, self.item2mbdata) defer.returnValue(serialised) @@ -410,7 +537,7 @@ ## configure ## - def setMicroblogAccess(self, access="presence", profile_key=C.PROF_KEY_NONE): + def mbSetAccess(self, access="presence", profile_key=C.PROF_KEY_NONE): """Create a microblog node on PEP with given access If the node already exists, it change options @@ -491,17 +618,19 @@ publishers[:] = [jid.JID(publisher) for publisher in publishers] return publishers_type, publishers + + # subscribe # - def _mBSubscribeToMany(self, publishers_type, publishers, profile_key): + def _mbSubscribeToMany(self, publishers_type, publishers, profile_key): """ @return (str): session id: Use pubsub.getSubscribeRTResult to get the results """ publishers_type, publishers = self._checkPublishers(publishers_type, publishers) - return self.mBSubscribeToMany(publishers_type, publishers, profile_key) + return self.mbSubscribeToMany(publishers_type, publishers, profile_key) - def mBSubscribeToMany(self, publishers_type, publishers, profile_key): + def mbSubscribeToMany(self, publishers_type, publishers, profile_key): """Subscribe microblogs for a list of groups or jids @param publishers_type: type of the list of publishers, one of: @@ -517,8 +646,8 @@ # get # - def _mBGetFromManyRTResult(self, session_id, profile_key=C.PROF_KEY_DEFAULT): - """Get real-time results for mBGetFromMany session + def _mbGetFromManyRTResult(self, session_id, profile_key=C.PROF_KEY_DEFAULT): + """Get real-time results for mbGetFromMany session @param session_id: id of the real-time deferred session @param return (tuple): (remaining, results) where: @@ -527,7 +656,8 @@ - service (unicode): pubsub service - node (unicode): pubsub node - failure (unicode): empty string in case of success, error message else - - items_data(tuple): data tuple as returned by [getLastMicroblogs] + - items_data(list): data as returned by [mbGetLast] + - items_metadata(dict): metadata as returned by [mbGetLast] @param profile_key: %(doc_profile_key)s """ def onSuccess(items_data): @@ -546,35 +676,34 @@ for (service, node), (success, (failure, (items, metadata))) in ret[1].iteritems()])) return d - def _mBGetFromMany(self, publishers_type, publishers, max_item=10, rsm_dict=None, profile_key=C.PROF_KEY_NONE): + def _mbGetFromMany(self, publishers_type, publishers, max_items=10, extra_dict=None, profile_key=C.PROF_KEY_NONE): """ - @param max_item(int): maximum number of item to get, C.NO_LIMIT for no limit + @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit """ - max_item = None if max_item == C.NO_LIMIT else max_item + max_items = None if max_items == C.NO_LIMIT else max_items publishers_type, publishers = self._checkPublishers(publishers_type, publishers) - return self.mBGetFromMany(publishers_type, publishers, max_item, rsm.RSMRequest(**rsm_dict) if rsm_dict else None, profile_key) + extra = self._p.parseExtra(extra_dict) + return self.mbGetFromMany(publishers_type, publishers, max_items, extra.rsm_request, extra.extra, profile_key) - def mBGetFromMany(self, publishers_type, publishers, max_item=None, rsm_data=None, profile_key=C.PROF_KEY_NONE): + def mbGetFromMany(self, publishers_type, publishers, max_items=None, rsm_request=None, extra=None, profile_key=C.PROF_KEY_NONE): """Get the published microblogs for a list of groups or jids @param publishers_type (str): type of the list of publishers (one of "GROUP" or "JID" or "ALL") @param publishers (list): list of publishers, according to publishers_type (list of groups or list of jids) @param max_items (int): optional limit on the number of retrieved items. - @param rsm_data (rsm.RSMRequest): RSM request data, common to all publishers + @param rsm_request (rsm.RSMRequest): RSM request data, common to all publishers + @param extra (dict): Extra data @param profile_key: profile key - @return: a deferred dict with: - - key: publisher (unicode) - - value: couple (list[dict], dict) with: - - the microblogs data - - RSM response data + @return (str): RT Deferred session id """ + # XXX: extra is unused here so far client, node_data = self._getClientAndNodeData(publishers_type, publishers, profile_key) - return self._p.getFromMany(node_data, max_item, rsm_data, profile_key=profile_key) + return self._p.getFromMany(node_data, max_items, rsm_request, profile_key=profile_key) # comments # - def _mBGetFromManyWithCommentsRTResult(self, session_id, profile_key=C.PROF_KEY_DEFAULT): - """Get real-time results for [mBGetFromManyWithComments] session + def _mbGetFromManyWithCommentsRTResult(self, session_id, profile_key=C.PROF_KEY_DEFAULT): + """Get real-time results for [mbGetFromManyWithComments] session @param session_id: id of the real-time deferred session @param return (tuple): (remaining, results) where: @@ -582,7 +711,6 @@ - results is a list of tuple with - service (unicode): pubsub service - node (unicode): pubsub node - - success (bool): True if the getItems was successful - failure (unicode): empty string in case of success, error message else - items(list): list of items with: - item(dict): item microblog data @@ -602,20 +730,24 @@ for (service, node), (success, (failure, (items, metadata))) in ret[1].iteritems()])) return d - def _mBGetFromManyWithComments(self, publishers_type, publishers, max_item=10, max_comments=C.NO_LIMIT, rsm_dict=None, rsm_comments_dict=None, profile_key=C.PROF_KEY_NONE): + def _mbGetFromManyWithComments(self, publishers_type, publishers, max_items=10, max_comments=C.NO_LIMIT, extra_dict=None, extra_comments_dict=None, profile_key=C.PROF_KEY_NONE): """ - @param max_item(int): maximum number of item to get, C.NO_LIMIT for no limit + @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit @param max_comments(int): maximum number of comments to get, C.NO_LIMIT for no limit """ - max_item = None if max_item == C.NO_LIMIT else max_item + max_items = None if max_items == C.NO_LIMIT else max_items max_comments = None if max_comments == C.NO_LIMIT else max_comments publishers_type, publishers = self._checkPublishers(publishers_type, publishers) - return self.mBGetFromManyWithComments(publishers_type, publishers, max_item, max_comments, - rsm.RSMRequest(**rsm_dict) if rsm_dict else None, - rsm.RSMRequest(**rsm_comments_dict) if rsm_comments_dict else None, + extra = self._p.parseExtra(extra_dict) + extra_comments = self._p.parseExtra(extra_comments_dict) + return self.mbGetFromManyWithComments(publishers_type, publishers, max_items, max_comments, + extra.rsm_request, + extra.extra, + extra_comments.rsm_request, + extra_comments.extra, profile_key) - def mBGetFromManyWithComments(self, publishers_type, publishers, max_item=None, max_comments=None, rsm_request=None, rsm_comments=None, profile_key=C.PROF_KEY_NONE): + def mbGetFromManyWithComments(self, publishers_type, publishers, max_items=None, max_comments=None, rsm_request=None, extra=None, rsm_comments=None, extra_comments=None, profile_key=C.PROF_KEY_NONE): """Helper method to get the microblogs and their comments in one shot @param publishers_type (str): type of the list of publishers (one of "GROUP" or "JID" or "ALL") @@ -623,13 +755,11 @@ @param max_items (int): optional limit on the number of retrieved items. @param max_comments (int): maximum number of comments to retrieve @param rsm_request (rsm.RSMRequest): RSM request for initial items only + @param extra (dict): extra configuration for initial items only @param rsm_comments (rsm.RSMRequest): RSM request for comments only + @param extra_comments (dict): extra configuration for comments only @param profile_key: profile key - @return: a deferred dict with: - - key: publisher (unicode) - - value: couple (list[dict], dict) with: - - the microblogs data - - RSM response data + @return (str): RT Deferred session id """ # XXX: this method seems complicated because it do a couple of treatments # to serialise and associate the data, but it make life in frontends side @@ -653,30 +783,30 @@ service_s = value node = item["{}{}".format(prefix, "_node")] # time to get the comments - d = self._p.getItems(jid.JID(service_s), node, max_comments, rsm_request=rsm_comments, profile_key=profile_key) + d = self._p.getItems(jid.JID(service_s), node, max_comments, rsm_request=rsm_comments, extra=extra_comments, profile_key=profile_key) # then serialise d.addCallback(lambda items_data: self._p.serItemsDataD(items_data, self.item2mbdata)) # with failure handling d.addCallback(lambda serialised_items_data: ('',) + serialised_items_data) d.addErrback(lambda failure: (unicode(failure.value), [], {})) # and associate with service/node (needed if there are several comments nodes) - d.addCallback(lambda serialised: (service_s, node) + serialised) + d.addCallback(lambda serialised, service_s=service_s, node=node: (service_s, node) + serialised) dlist.append(d) # we get the comments comments_d = defer.gatherResults(dlist) # and add them to the item data - comments_d.addCallback(lambda comments_data: (item, comments_data)) + comments_d.addCallback(lambda comments_data, item=item: (item, comments_data)) items_dlist.append(comments_d) # we gather the items + comments in a list items_d = defer.gatherResults(items_dlist) # and add the metadata - items_d.addCallback(lambda items: (items, metadata)) + items_d.addCallback(lambda items_completed: (items_completed, metadata)) return items_d client, node_data = self._getClientAndNodeData(publishers_type, publishers, profile_key) deferreds = {} for service, node in node_data: - d = deferreds[(service, node)] = self._p.getItems(service, node, max_item, rsm_request=rsm_request, profile_key=profile_key) + d = deferreds[(service, node)] = self._p.getItems(service, node, max_items, rsm_request=rsm_request, extra=extra, profile_key=profile_key) d.addCallback(lambda items_data: self._p.serItemsDataD(items_data, self.item2mbdata)) d.addCallback(getComments) d.addCallback(lambda items_comments_data: ('', items_comments_data))