# HG changeset patch # User Goffi # Date 1426672348 -3600 # Node ID f71a0fc2688655df7229d3b815c469df9c1f3768 # Parent 1e3b1f9ad6e2ae99ef3167f29b7d529d2568e736# Parent 584d45bb36d9b0ab608dad26dfdf9afa42008bd5 merged branch frontends_multi_profiles diff -r 584d45bb36d9 -r f71a0fc26886 CONTRAT_SOCIAL --- a/CONTRAT_SOCIAL Wed Mar 18 10:39:22 2015 +0100 +++ b/CONTRAT_SOCIAL Wed Mar 18 10:52:28 2015 +0100 @@ -24,5 +24,5 @@ - L'idée de Fraternité est essentielle, aussi: - nous ferons notre possible pour aider les utilisateurs, quel que soit leur niveau - - de même, des efforts seront fait quant à l'accessibilité aux personnes victimes d'un handicap + - de même, des efforts seront fait quant à l'accessibilité pour tous - « Salut à Toi », XMPP, et les technologies utilisées facilitent les échanges électroniques, mais nous désirons mettre l'accent sur les rencontres réelles et humaines: nous favoriserons toujours le réel sur le virtuel. diff -r 584d45bb36d9 -r f71a0fc26886 CONTRAT_SOCIAL_en --- a/CONTRAT_SOCIAL_en Wed Mar 18 10:39:22 2015 +0100 +++ b/CONTRAT_SOCIAL_en Wed Mar 18 10:52:28 2015 +0100 @@ -24,6 +24,6 @@ - The idea of Fraternity is essential. This is why: - we will help the users, whatever their computer literacy is, to the extent of what we can - - we will as well commit ourselves to help the accessibility to "Salut à Toi" for disabled people + - we will as well commit ourselves to help the accessibility to "Salut à Toi" for all - "Salut à Toi" , XMPP, and the technologies used help facilitate the electronic exchanges, but we strive to focus on real and human exchanges : we will always favor Real on Virtual. diff -r 584d45bb36d9 -r f71a0fc26886 setup.py --- a/setup.py Wed Mar 18 10:39:22 2015 +0100 +++ b/setup.py Wed Mar 18 10:52:28 2015 +0100 @@ -298,7 +298,8 @@ package_dir={'sat': 'src', 'sat_frontends': 'frontends/src', 'twisted.plugins': 'src/twisted/plugins'}, packages=['sat', 'sat.tools', 'sat.bridge', 'sat.plugins', 'sat.test', 'sat.core', 'sat.memory', 'sat_frontends', 'sat_frontends.bridge', 'sat_frontends.quick_frontend', 'sat_frontends.jp', - 'sat_frontends.primitivus', 'sat_frontends.tools', 'sat.stdui', 'twisted.plugins'], + 'sat_frontends.primitivus', 'sat_frontends.tools', 'sat.stdui','sat.tmp', 'sat.tmp.wokkel', + 'twisted.plugins'], package_data={'sat': ['sat.sh'], }, data_files=[(os.path.join(sys.prefix, 'share/locale/fr/LC_MESSAGES'), ['i18n/fr/LC_MESSAGES/sat.mo']), ('share/doc/%s' % NAME, ['CHANGELOG', 'COPYING', 'INSTALL', 'README', 'README4TRANSLATORS']), diff -r 584d45bb36d9 -r f71a0fc26886 src/core/xmpp.py --- a/src/core/xmpp.py Wed Mar 18 10:39:22 2015 +0100 +++ b/src/core/xmpp.py Wed Mar 18 10:52:28 2015 +0100 @@ -114,6 +114,8 @@ self.host = host def onMessage(self, message): + if not message.hasAttribute('from'): + message['from'] = self.parent.jid.host log.debug(_(u"got message from: %s") % message["from"]) post_treat = defer.Deferred() # XXX: plugin can add their treatments to this deferred diff -r 584d45bb36d9 -r f71a0fc26886 src/plugins/__init__.py --- a/src/plugins/__init__.py Wed Mar 18 10:39:22 2015 +0100 +++ b/src/plugins/__init__.py Wed Mar 18 10:52:28 2015 +0100 @@ -0,0 +1,7 @@ +# TODO: remove this when RSM and MAM are in wokkel +import wokkel +from sat.tmp.wokkel import delay as tmp_delay, pubsub as tmp_pubsub, rsm as tmp_rsm, mam as tmp_mam +wokkel.delay = tmp_delay +wokkel.pubsub = tmp_pubsub +wokkel.rsm = tmp_rsm +wokkel.mam = tmp_mam diff -r 584d45bb36d9 -r f71a0fc26886 src/plugins/plugin_misc_groupblog.py --- a/src/plugins/plugin_misc_groupblog.py Wed Mar 18 10:39:22 2015 +0100 +++ b/src/plugins/plugin_misc_groupblog.py Wed Mar 18 10:52:28 2015 +0100 @@ -26,6 +26,7 @@ from twisted.words.xish.domish import Element, generateElementsNamed from sat.core import exceptions from wokkel import disco, data_form, iwokkel +from wokkel import rsm as wokkel_rsm from zope.interface import implements from feed import date import uuid @@ -52,12 +53,17 @@ 'JID': None, #JID is not yet managed } +MAX_ITEMS = 5 +MAX_COMMENTS = 5 +DO_NOT_COUNT_COMMENTS = -1 # must be lower than 0 + PLUGIN_INFO = { "name": "Group blogging throught collections", "import_name": "GROUPBLOG", "type": "MISC", "protocols": [], "dependencies": ["XEP-0277"], + "recommendations": ["XEP-0059"], "main": "GroupBlog", "handler": "yes", "description": _("""Implementation of microblogging with roster access""") @@ -79,7 +85,6 @@ class UnknownType(Exception): pass - class GroupBlog(object): """This class use a SàT PubSub Service to manage access on microblog""" @@ -104,32 +109,27 @@ async=True) host.bridge.addMethod("getGroupBlogs", ".plugin", - in_sign='sass', out_sign='aa{ss}', + in_sign='sasa{ss}bs', out_sign='(aa{ss}a{ss})', method=self.getGroupBlogs, async=True) host.bridge.addMethod("getGroupBlogsWithComments", ".plugin", - in_sign='sass', out_sign='a(a{ss}aa{ss})', + in_sign='sasa{ss}is', out_sign='(a(a{ss}(aa{ss}a{ss}))a{ss})', method=self.getGroupBlogsWithComments, async=True) - host.bridge.addMethod("getLastGroupBlogs", ".plugin", - in_sign='sis', out_sign='aa{ss}', - method=self.getLastGroupBlogs, + host.bridge.addMethod("getGroupBlogsAtom", ".plugin", + in_sign='sa{ss}s', out_sign='s', + method=self.getGroupBlogsAtom, async=True) - host.bridge.addMethod("getLastGroupBlogsAtom", ".plugin", - in_sign='sis', out_sign='s', - method=self.getLastGroupBlogsAtom, - async=True) - - host.bridge.addMethod("getMassiveLastGroupBlogs", ".plugin", - in_sign='sasis', out_sign='a{saa{ss}}', - method=self._getMassiveLastGroupBlogs, + host.bridge.addMethod("getMassiveGroupBlogs", ".plugin", + in_sign='sasa{ss}s', out_sign='a{s(aa{ss}a{ss})}', + method=self._getMassiveGroupBlogs, async=True) host.bridge.addMethod("getGroupBlogComments", ".plugin", - in_sign='sss', out_sign='aa{ss}', + in_sign='ssa{ss}s', out_sign='(aa{ss}a{ss})', method=self.getGroupBlogComments, async=True) @@ -515,78 +515,108 @@ d_list.append(self.item2gbdata(item).addCallback(cb)) return defer.DeferredList(d_list, consumeErrors=True).addCallback(lambda result: [value for (success, value) in result if success]) - def __getGroupBlogs(self, pub_jid_s, max_items=10, item_ids=None, profile_key=C.PROF_KEY_NONE): + def _getOrCountComments(self, items, max=0, profile_key=C.PROF_KEY_NONE): + """Get and/or count the comments of the given items. + + @param items (list): items to consider. + @param max (int): maximum number of comments to get, if 0 only count + them. The count is set to the item data of key "comments_count". + @param profile_key (str): %(doc_profile_key)s + @return: a deferred list of: + - if max == 0: microblog data + - else: couple (dict, (list[dict], dict)) containing: + - microblog data (main item) + - couple (comments data, RSM response data for the comments) + """ + def comments_cb(comments_data, entry): + entry['comments_count'] = comments_data[1]['count'] + return (entry, comments_data) if max > 0 else entry + + assert(max >= 0) + d_list = [] + for entry in items: + if entry.get('comments', False): + comments_rsm = {'max': max} + d = self.getGroupBlogComments(entry['comments_service'], entry['comments_node'], rsm=comments_rsm, profile_key=profile_key) + d.addCallback(comments_cb, entry) + d_list.append(d) + else: + if max > 0: + d_list.append(defer.succeed((entry, ([], {})))) + else: + d_list.append(defer.succeed(entry)) + deferred_list = defer.DeferredList(d_list) + deferred_list.addCallback(lambda result: [value for (success, value) in result if success]) + return deferred_list + + def __getGroupBlogs(self, pub_jid_s, item_ids=None, rsm=None, max_comments=0, profile_key=C.PROF_KEY_NONE): """Retrieve previously published items from a publish subscribe node. + @param pub_jid_s: jid of the publisher - @param max_items: how many microblogs we want to get (see XEP-0060 #6.5.7) @param item_ids: list of microblogs items IDs - @param profile_key: profile key - @return: list of microblog data (dict) + @param rsm (dict): RSM request data + @param max_comments (int): maximum number of comments to retrieve + @param profile_key (str): %(doc_profile_key)s + @return: a deferred couple (list, dict) containing: + - list of: + - if max_comments == 0: microblog data + - else: couple (dict, (list[dict], dict)) containing: + - microblog data (main item) + - couple (comments data, RSM response data for the comments) + - RSM response data """ pub_jid = jid.JID(pub_jid_s) - def initialised(result): - profile, client = result - d = self.host.plugins["XEP-0060"].getItems(client.item_access_pubsub, self.getNodeName(pub_jid), - max_items=max_items, item_ids=item_ids, profile_key=profile_key) - d.addCallback(self._itemsConstruction, pub_jid, client) - d.addErrback(lambda ignore: {}) # TODO: more complete error management (log !) - return d + def cb(items, client): + d = self._itemsConstruction(items, pub_jid, client) + if max_comments == DO_NOT_COUNT_COMMENTS: + return d + return d.addCallback(self._getOrCountComments, max_comments, profile_key) - #TODO: we need to use the server corresponding the the host of the jid - return self._initialise(profile_key).addCallback(initialised) + return DeferredItems(self, cb, None, profile_key).get(self.getNodeName(pub_jid), item_ids, rsm=rsm) - def getGroupBlogs(self, pub_jid_s, item_ids=None, profile_key=C.PROF_KEY_NONE): + def getGroupBlogs(self, pub_jid_s, item_ids=None, rsm=None, count_comments=True, profile_key=C.PROF_KEY_NONE): """Get the published microblogs of the specified IDs. If item_ids is - None, the result would be the same than calling getLastGroupBlogs + None, the result would be the same than calling getGroupBlogs with the default value for the attribute max_items. - @param pub_jid_s: jid of the publisher - @param item_ids: list of microblogs items IDs - @param profile_key: profile key - @return: list of microblog data (dict) - """ - return self.__getGroupBlogs(pub_jid_s, item_ids=item_ids, profile_key=profile_key) - def getGroupBlogsWithComments(self, pub_jid_s, item_ids=None, profile_key=C.PROF_KEY_NONE): - """Get the published microblogs of the specified IDs and their comments. If - item_ids is None, returns the last published microblogs and their comments. @param pub_jid_s: jid of the publisher @param item_ids: list of microblogs items IDs - @param profile_key: profile key - @return: list of couple (microblog data, list of microblog data) + @param rsm (dict): RSM request data + @param count_comments (bool): also count the comments if True + @param profile_key (str): %(doc_profile_key)s + @return: a deferred couple (list, dict) containing: + - list of microblog data + - RSM response data """ - def get_comments(data): - d_list = [] - for entry in data: - if entry.get('comments', False): - d = self.getGroupBlogComments(entry['comments_service'], entry['comments_node'], profile_key=profile_key) - d.addCallback(lambda data: (entry, data)) - d_list.append(d) - else: - d_list.append(defer.succeed((entry, []))) - deferred_list = defer.DeferredList(d_list) - deferred_list.addCallback(lambda result: [value for (success, value) in result if success]) - return deferred_list + max_comments = 0 if count_comments else DO_NOT_COUNT_COMMENTS + return self.__getGroupBlogs(pub_jid_s, item_ids=item_ids, rsm=rsm, max_comments=max_comments, profile_key=profile_key) + + def getGroupBlogsWithComments(self, pub_jid_s, item_ids=None, rsm=None, max_comments=None, profile_key=C.PROF_KEY_NONE): + """Get the published microblogs of the specified IDs and their comments. If + item_ids is None, returns the last published microblogs and their comments. - d = self.__getGroupBlogs(pub_jid_s, item_ids=item_ids, profile_key=profile_key) - d.addCallback(get_comments) - return d + @param pub_jid_s: jid of the publisher + @param item_ids: list of microblogs items IDs + @param rsm (dict): RSM request data + @param max_comments (int): maximum number of comments to retrieve + @param profile_key (str): %(doc_profile_key)s + @return: a deferred couple (list, dict) containing: + - list of couple (dict, (list[dict], dict)) containing: + - microblog data (main item) + - couple (comments data, RSM response data for the comments) + - RSM response data + """ + if max_comments is None: + max_comments = MAX_COMMENTS + assert(max_comments > 0) # otherwise the return signature is not the same + return self.__getGroupBlogs(pub_jid_s, item_ids=item_ids, rsm=rsm, max_comments=max_comments, profile_key=profile_key) - def getLastGroupBlogs(self, pub_jid_s, max_items=10, profile_key=C.PROF_KEY_NONE): - """Get the last published microblogs - @param pub_jid_s: jid of the publisher - @param max_items: how many microblogs we want to get (see XEP-0060 #6.5.7) - @param profile_key: profile key - @return: list of microblog data (dict) - """ - return self.__getGroupBlogs(pub_jid_s, max_items=max_items, profile_key=profile_key) - - def getLastGroupBlogsAtom(self, pub_jid_s, max_items=10, profile_key=C.PROF_KEY_NONE): + def getGroupBlogsAtom(self, pub_jid_s, rsm=None, profile_key=C.PROF_KEY_NONE): """Get the atom feed of the last published microblogs @param pub_jid: jid of the publisher - @param max_items: how many microblogs we want to get (see XEP-0060 #6.5.7) @param profile_key: profile key - @return: atom XML feed (unicode) + @return: a deferred unicode (atom XML feed) """ pub_jid = jid.JID(pub_jid_s) @@ -617,59 +647,41 @@ feed += " " + entry.toXml() + "\n" return feed + "" - def initialised(result): - profile, client = result - d = self.host.plugins["XEP-0060"].getItems(client.item_access_pubsub, self.getNodeName(pub_jid), - max_items=max_items, profile_key=profile_key) - d.addCallback(items2feed, pub_jid, client) - d.addErrback(lambda ignore: '') # TODO: more complete error management (log !) - return d + def cb(items, client): + return items2feed(items, pub_jid, client) - #TODO: we need to use the server corresponding the the host of the jid - return self._initialise(profile_key).addCallback(initialised) + d = DeferredItems(self, cb, lambda dummy: [''], profile_key).get(self.getNodeName(pub_jid), rsm=rsm) + return d.addCallback(lambda res: res[0]) - def getGroupBlogComments(self, service_s, node, profile_key=C.PROF_KEY_NONE): + def getGroupBlogComments(self, service_s, node, rsm=None, profile_key=C.PROF_KEY_NONE): """Get all comments of given node @param service_s: service hosting the node @param node: comments node @param profile_key: profile key - @return: list of microblog data (dict) + @return: a deferred couple (list, dict) containing: + - list of microblog data + - RSM response data """ service = jid.JID(service_s) - def initialised(result): - profile, client = result - d = self.host.plugins["XEP-0060"].getItems(service, node, - profile_key=profile_key) - d.addCallback(self._handleCommentsItems, service, node) - d.addErrback(lambda ignore: {}) # TODO: more complete error management (log !) - return d + def cb(items, client): + return self._handleCommentsItems(items, service, node) - #TODO: we need to use the server corresponding the the host of the jid - return self._initialise(profile_key).addCallback(initialised) + return DeferredItems(self, cb, None, profile_key).get(node, rsm=rsm) - def _getMassiveLastGroupBlogs(self, publishers_type, publishers, max_items=10, profile_key=C.PROF_KEY_NONE): + def _getMassiveGroupBlogs(self, publishers_type, publishers, rsm=None, profile_key=C.PROF_KEY_NONE): if publishers_type == 'JID': publishers_jids = [jid.JID(publisher) for publisher in publishers] else: publishers_jids = publishers - return self.getMassiveLastGroupBlogs(publishers_type, publishers_jids, max_items, profile_key) + return self.getMassiveGroupBlogs(publishers_type, publishers_jids, rsm, profile_key) - @defer.inlineCallbacks - def getMassiveLastGroupBlogs(self, publishers_type, publishers, max_items=10, profile_key=C.PROF_KEY_NONE): - """Get the last published microblogs for a list of groups or jids - @param publishers_type: type of the list of publishers (one of "GROUP" or "JID" or "ALL") - @param publishers: list of publishers, according to "publishers_type" (list of groups or list of jids) - @param max_items: how many microblogs we want to get - @param profile_key: profile key - """ + def _getPublishersJIDs(self, publishers_type, publishers, client): #TODO: custom exception if publishers_type not in ["GROUP", "JID", "ALL"]: raise Exception("Bad call, unknown publishers_type") if publishers_type == "ALL" and publishers: raise Exception("Publishers list must be empty when getting microblogs for all contacts") - profile, client = yield self._initialise(profile_key) - #TODO: we need to use the server corresponding the the host of the jid if publishers_type == "ALL": contacts = client.roster.getItems() @@ -682,20 +694,26 @@ jids = publishers else: raise UnknownType - - data = {publisher: self.getNodeName(publisher) for publisher in jids} - d_dict = yield self.host.plugins["XEP-0060"].getItemsFromMany(client.item_access_pubsub, data, max_items=max_items, profile_key=profile) + return jids - def cb(jid): - def res(gbdata): - return (jid.full(), gbdata) - return res + def getMassiveGroupBlogs(self, publishers_type, publishers, rsm=None, profile_key=C.PROF_KEY_NONE): + """Get the last published microblogs for a list of groups or jids + @param publishers_type (str): type of the list of publishers (one of "GROUP" or "JID" or "ALL") + @param publishers (list): list of publishers, according to publishers_type (list of groups or list of jids) + @param rsm (dict): RSM request data, common to all publishers + @param profile_key: profile key + @return: a deferred dict with: + - key: publisher (unicode) + - value: couple (list[dict], dict) with: + - the microblogs data + - RSM response data + """ + def cb(items, publisher, client): + d = self._itemsConstruction(items, publisher, client) + return d.addCallback(self._getOrCountComments, False, profile_key) - for publisher, d in d_dict.items(): - d.addCallback(self._itemsConstruction, publisher, client) - d.addCallback(cb(publisher)) - result = yield defer.DeferredList(d_dict.values(), consumeErrors=False) - defer.returnValue({value[0]: value[1] for success, value in result if success}) + #TODO: we need to use the server corresponding to the host of the jid + return DeferredItemsFromMany(self, cb, profile_key).get(publishers_type, publishers, rsm=rsm) def subscribeGroupBlog(self, pub_jid, profile_key=C.PROF_KEY_NONE): def initialised(result): @@ -721,30 +739,14 @@ @param publishers: list of publishers, according to "publishers_type" (list of groups or list of jids) @param profile_key: profile key """ - #TODO: custom exception - if publishers_type not in ["GROUP", "JID", "ALL"]: - raise Exception("Bad call, unknown publishers_type") - if publishers_type == "ALL" and publishers: - raise Exception("Publishers list must be empty when getting microblogs for all contacts") profile, client = yield self._initialise(profile_key) #TODO: we need to use the server corresponding the the host of the jid - if publishers_type == "ALL": - contacts = client.roster.getItems() - jids = [contact.jid.userhostJID() for contact in contacts] - elif publishers_type == "GROUP": - jids = [] - for _group in publishers: - jids.extend(client.roster.getJidsFromGroup(_group)) - elif publishers_type == 'JID': - jids = publishers - else: - raise UnknownType - + jids = self._getPublishersJIDs(publishers_type, publishers, client) node_ids = [self.getNodeName(publisher) for publisher in jids] d_list = yield self.host.plugins["XEP-0060"].subscribeToMany(client.item_access_pubsub, node_ids, profile_key=profile_key) result = yield defer.DeferredList(d_list, consumeErrors=False) - defer.returnValue(result) + defer.returnValue(None) def deleteAllGroupBlogsAndComments(self, profile_key=C.PROF_KEY_NONE): """Delete absolutely all the microblog data that the user has posted""" @@ -752,18 +754,26 @@ return defer.DeferredList(calls) def deleteAllGroupBlogs(self, profile_key=C.PROF_KEY_NONE): - """Delete all the main items and their comments that the user has posted + """Delete all the main items that the user has posted and their comments. """ def initialised(result): profile, client = result service = client.item_access_pubsub jid_ = client.jid + main_node = self.getNodeName(jid_) - main_node = self.getNodeName(jid_) - d = self.host.plugins["XEP-0060"].deleteNode(service, main_node, profile_key=profile) + def cb(nodes): + d_list = [] + for node in [node for node in nodes if node.endswith(main_node)]: + d = self.host.plugins["XEP-0060"].deleteNode(service, node, profile_key=profile) + d.addErrback(lambda failure: log.error(_("Deletion of node %(node)s failed: %(message)s") % + {'node': node, 'message': failure.getErrorMessage()})) + d_list.append(d) + return defer.DeferredList(d_list) + + d = self.host.plugins["XEP-0060"].listNodes(service, profile=profile) + d.addCallback(cb) d.addCallback(lambda dummy: log.info(_("All microblog's main items from %s have been deleted!") % jid_.userhost())) - d.addErrback(lambda failure: log.error(_("Deletion of node %(node)s failed: %(message)s") % - {'node': main_node, 'message': failure.getErrorMessage()})) return d return self._initialise(profile_key).addCallback(initialised) @@ -782,9 +792,11 @@ jids = [contact.jid.userhostJID() for contact in client.roster.getItems()] blogs = [] for jid_ in jids: + if jid_ == client.jid.userhostJID(): + continue # do not remove the comments on our own node main_node = self.getNodeName(jid_) d = self.host.plugins["XEP-0060"].getItems(service, main_node, profile_key=profile) - d.addCallback(getComments, client) + d.addCallback(lambda res: getComments(res[0], client)) d.addErrback(lambda failure, main_node: log.error(_("Retrieval of items for node %(node)s failed: %(message)s") % {'node': main_node, 'message': failure.getErrorMessage()}), main_node) blogs.append(d) @@ -807,7 +819,7 @@ href = link.getAttribute('href') service, node = self.host.plugins['XEP-0277'].parseCommentUrl(href) d = self.host.plugins["XEP-0060"].getItems(service, node, profile_key=profile_key) - d.addCallback(lambda items, service, node: (service, node, items), service, node) + d.addCallback(lambda items: (service, node, items[0])) d.addErrback(lambda failure, node: log.error(_("Retrieval of comments for node %(node)s failed: %(message)s") % {'node': node, 'message': failure.getErrorMessage()}), node) comments.append(d) @@ -850,6 +862,116 @@ return self._initialise(profile_key).addCallback(initialised) +class DeferredItems(): + """Helper class to retrieve items using XEP-0060""" + + def __init__(self, parent, cb, eb=None, profile_key=C.PROF_KEY_NONE): + """ + @param parent (GroupBlog): GroupBlog instance + @param cb (callable): callback method to be applied on items + @param eb (callable): errback method to be applied on items + @param profile_key (str): %(doc_profile_key)s + """ + self.parent = parent + self.cb = cb + self.eb = (lambda dummy: ([], {})) if eb is None else eb + self.profile_key = profile_key + + def get(self, node, item_ids=None, sub_id=None, rsm=None): + """ + @param node (str): node identifier. + @param item_ids (list[str]): list of items identifiers. + @param sub_id (str): optional subscription identifier. + @param rsm (dict): RSM request data + @return: a deferred couple (list, dict) containing: + - list of microblog data + - RSM response data + """ + if rsm is None: + rsm = {'max': (len(item_ids) if item_ids else MAX_ITEMS)} + + def initialised(result): + profile, client = result + rsm_ = wokkel_rsm.RSMRequest(**rsm) + d = self.parent.host.plugins["XEP-0060"].getItems(client.item_access_pubsub, + node, rsm_.max, + item_ids, sub_id, rsm_, + profile_key=profile) + + def cb(result): + d = defer.maybeDeferred(self.cb, result[0], client) + return d.addCallback(lambda items: (items, result[1])) + + d.addCallbacks(cb, self.eb) + return d + + #TODO: we need to use the server corresponding to the host of the jid + return self.parent._initialise(self.profile_key).addCallback(initialised) + + +class DeferredItemsFromMany(): + def __init__(self, parent, cb, profile_key=C.PROF_KEY_NONE): + """ + @param parent (GroupBlog): GroupBlog instance + @param cb (callable): callback method to be applied on items + @param profile_key (str): %(doc_profile_key)s + """ + self.parent = parent + self.cb = cb + self.profile_key = profile_key + + def __buildData(self, publishers_type, publishers, client): + jids = self.parent._getPublishersJIDs(publishers_type, publishers, client) + return {publisher: self.parent.getNodeName(publisher) for publisher in jids} + + def get(self, publishers_type, publishers, sub_id=None, rsm=None): + """ + @param publishers_type (str): type of the list of publishers (one of "GROUP" or "JID" or "ALL") + @param publishers (list): list of publishers, according to publishers_type (list of groups or list of jids) + @param sub_id (str): optional subscription identifier. + @param rsm (dict): RSM request data + @return: a deferred dict with: + - key: publisher (unicode) + - value: couple (list[dict], dict) with: + - the microblogs data + - RSM response data + """ + if rsm is None: + rsm = {'max': MAX_ITEMS} + + def initialised(result): + profile, client = result + + data = self.__buildData(publishers_type, publishers, client) + rsm_ = wokkel_rsm.RSMRequest(**rsm) + d = self.parent.host.plugins["XEP-0060"].getItemsFromMany(client.item_access_pubsub, + data, rsm_.max, sub_id, + rsm_, profile_key=profile) + + def cb(publisher): + def callback(result): + d = defer.maybeDeferred(self.cb, result[0], publisher, client) + d.addCallback(lambda items: (publisher.full(), (items, result[1]))) + return d + return callback + + def cb_list(result): + return {value[0]: value[1] for success, value in result if success} + + def main_cb(result): + d_list = [] + for publisher, d_items in result.items(): + # XXX: trick needed as publisher is a loop variable + d_list.append(d_items.addCallback(cb(publisher))) + return defer.DeferredList(d_list, consumeErrors=False).addCallback(cb_list) + + d.addCallback(main_cb) + return d + + #TODO: we need to use the server corresponding to the host of the jid + return self.parent._initialise(self.profile_key).addCallback(initialised) + + class GroupBlog_handler(XMPPHandler): implements(iwokkel.IDisco) diff -r 584d45bb36d9 -r f71a0fc26886 src/plugins/plugin_xep_0059.py --- a/src/plugins/plugin_xep_0059.py Wed Mar 18 10:39:22 2015 +0100 +++ b/src/plugins/plugin_xep_0059.py Wed Mar 18 10:52:28 2015 +0100 @@ -22,7 +22,6 @@ from sat.core.log import getLogger log = getLogger(__name__) -from twisted.words.xish import domish from wokkel import disco, iwokkel try: from twisted.words.protocols.xmlstream import XMPPHandler @@ -45,93 +44,10 @@ class XEP_0059(object): + # XXX: RSM management is done directly in Wokkel. def __init__(self, host): log.info(_("Result Set Management plugin initialization")) - self.host = host - - def requestPage(self, stanza, limit=10, index=None, after=None, before=None): - """Embed a RSM page request in the given stanza. - - @param stanza (domish.Element): any stanza to which RSM applies - @param limit (int): the maximum number of items in the page - @param index (int): the starting index of the requested page - @param after (str, int): the element immediately preceding the page - @param before (str, int): the element immediately following the page - """ - main_elt = None - try: - main_elt = domish.generateElementsNamed(stanza.elements(), name="query").next() - except StopIteration: - try: - main_elt = domish.generateElementsNamed(stanza.elements(), name="pubsub").next() - except StopIteration: - log.warning("Injection of a RSM element only applies to query or pubsub stanzas") - return - limit = str(int(limit)) - - # in case the service doesn't support RSM, do this at least - main_elt.items.attributes['max_items'] = limit - - set_elt = main_elt.addElement('set', NS_RSM) - set_elt.addElement('max').addContent(limit) - if index: - assert(after is None and before is None) - set_elt.addElement('index').addContent(str(int(index))) - if after: - assert(before is None) # could not specify both at the same time - set_elt.addElement('after').addContent(str(after)) - if before is not None: - if before == '': # request the last page, according to http://xmpp.org/extensions/xep-0059.html#last - set_elt.addElement('before') - else: - set_elt.addElement('before').addContent(str(before)) - - def countItems(self, stanza): - """Count the items without retrieving any of them. - - @param stanza (domish.Element): any stanza to which RSM applies - """ - self.requestPage(stanza, limit=0) - - def extractMetadata(self, stanza): - """Extract the RSM metadata from the given stanza. - - @param stanza (domish.Element, wokkel.pubsub.PubSubRequest): - any stanza to which RSM applies. When used by XEP-0060, - wokkel's PubSubRequest instance is also accepted. - @return: dict containing the page metadata - """ - try: - main_elt = domish.generateElementsNamed(stanza.elements(), name="query").next() - except StopIteration: - try: - main_elt = domish.generateElementsNamed(stanza.elements(), name="pubsub").next() - except StopIteration: - log.warning("Extracting data from a RSM element only applies to query or pubsub stanzas") - return {} - try: - set_elt = domish.generateElementsQNamed(main_elt.elements(), name="set", uri=NS_RSM).next() - except StopIteration: - log.debug("There's no RSM element in the stanza") - return {} - - data = {} - elts = set_elt.elements() - try: - elt = elts.next() - if elt.name == "first": - data["first"] = "".join(elt.children) - data["first_index"] = int(elt.getAttribute("index")) - elif elt.name == "last": - data["last"] = "".join(elt.children) - elif elt.name == "count": - data["count"] = int("".join(elt.children)) - except StopIteration: - pass - if "count" not in data: - log.warning("There's no 'count' element in the RSM element!") - return data class XEP_0059_handler(XMPPHandler): diff -r 584d45bb36d9 -r f71a0fc26886 src/plugins/plugin_xep_0060.py --- a/src/plugins/plugin_xep_0060.py Wed Mar 18 10:39:22 2015 +0100 +++ b/src/plugins/plugin_xep_0060.py Wed Mar 18 10:52:28 2015 +0100 @@ -23,10 +23,10 @@ log = getLogger(__name__) from sat.memory.memory import Sessions -from wokkel.compat import IQ -from wokkel import disco, pubsub +from wokkel import disco, pubsub, rsm from zope.interface import implements from twisted.internet import defer +import uuid PLUGIN_INFO = { @@ -65,6 +65,12 @@ self.clients[profile] = SatPubSubClient(self.host, self) return self.clients[profile] + def profileDisconnected(self, profile): + try: + del self.clients[profile] + except KeyError: + pass + def addManagedNode(self, node_name, callback): """Add a handler for a namespace @param namespace: NS of the handler (will appear in disco info) @@ -155,21 +161,41 @@ profile, client = self.__getClientNProfile(profile_key, 'publish item') return client.publish(service, nodeIdentifier, items, client.parent.jid) - def getItems(self, service, node, max_items=None, item_ids=None, sub_id=None, profile_key=C.PROF_KEY_NONE): + def getItems(self, service, node, max_items=None, item_ids=None, sub_id=None, rsm=None, profile_key=C.PROF_KEY_NONE): + """Retrieve pubsub items from a node. + + @param service (JID): target service. + @param node (str): node id. + @param max_items (int): optional limit on the number of retrieved items. + @param item_ids (list[str]): identifiers of the items to be retrieved (should not be used). + @param sub_id (str): optional subscription identifier. + @param rsm (dict): RSM request data + @param profile_key (str): %(doc_profile_key)s + @return: a deferred couple (list[dict], dict) containing: + - list of items + - RSM response data + """ profile, client = self.__getClientNProfile(profile_key, 'get items') - return client.items(service, node, max_items, item_ids, sub_id, client.parent.jid) + ext_data = {'id': unicode(uuid.uuid4()), 'rsm': rsm} if rsm else None + d = client.items(service, node, max_items, item_ids, sub_id, client.parent.jid, ext_data) + d.addCallback(lambda items: (items, client.getRSMResponse(ext_data['id']) if rsm else {})) + return d @defer.inlineCallbacks - def getItemsFromMany(self, service, data, max_items=None, item_ids=None, sub_id=None, profile_key=C.PROF_KEY_NONE): + def getItemsFromMany(self, service, data, max_items=None, sub_id=None, rsm=None, profile_key=C.PROF_KEY_NONE): """Massively retrieve pubsub items from many nodes. @param service (JID): target service. @param data (dict): dictionnary binding some arbitrary keys to the node identifiers. @param max_items (int): optional limit on the number of retrieved items *per node*. - @param item_ids (list[str]): identifiers of the items to be retrieved (should not be used). @param sub_id (str): optional subscription identifier. + @param rsm (dict): RSM request data @param profile_key (str): %(doc_profile_key)s - @return: dict binding a subset of the keys of data to Deferred instances. + @return: a deferred dict with: + - key: a value in (a subset of) data.keys() + - couple (list[dict], dict) containing: + - list of items + - RSM response data """ profile, client = self.__getClientNProfile(profile_key, 'get items') found_nodes = yield self.listNodes(service, profile=profile) @@ -178,7 +204,7 @@ if node not in found_nodes: log.debug(u"Skip the items retrieval for [{node}]: node doesn't exist".format(node=node)) continue # avoid pubsub "item-not-found" error - d_dict[publisher] = client.items(service, node, max_items, item_ids, sub_id, client.parent.jid) + d_dict[publisher] = self.getItems(service, node, max_items, None, sub_id, rsm, profile) defer.returnValue(d_dict) def getOptions(self, service, nodeIdentifier, subscriber, subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE): @@ -232,82 +258,16 @@ return client.subscriptions(service, nodeIdentifier) -class SatPubSubClient(pubsub.PubSubClient): +class SatPubSubClient(rsm.PubSubClient): implements(disco.IDisco) def __init__(self, host, parent_plugin): self.host = host self.parent_plugin = parent_plugin - pubsub.PubSubClient.__init__(self) + rsm.PubSubClient.__init__(self) def connectionInitialized(self): - pubsub.PubSubClient.connectionInitialized(self) - - # FIXME: we have to temporary override this method here just - # to set the attributes itemIdentifiers which is not used - # in pubsub.PubSubClient.items + use the XEP-0059 - def items(self, service, nodeIdentifier, maxItems=None, itemIdentifiers=None, - subscriptionIdentifier=None, sender=None): - """ - Retrieve previously published items from a publish subscribe node. - - @param service: The publish subscribe service that keeps the node. - @type service: L{JID} - - @param nodeIdentifier: The identifier of the node. - @type nodeIdentifier: C{unicode} - - @param maxItems: Optional limit on the number of retrieved items. - @type maxItems: C{int} - - @param itemIdentifiers: Identifiers of the items to be retrieved. - @type itemIdentifiers: C{set} - - @param subscriptionIdentifier: Optional subscription identifier. In - case the node has been subscribed to multiple times, this narrows - the results to the specific subscription. - @type subscriptionIdentifier: C{unicode} - """ - # TODO: add method attributes for RSM: before, after, index - request = PubSubRequest('items', self.host, {'limit': maxItems} if maxItems else {}) - request.recipient = service - request.nodeIdentifier = nodeIdentifier - if maxItems: - request.maxItems = str(int(maxItems)) - request.subscriptionIdentifier = subscriptionIdentifier - request.sender = sender - request.itemIdentifiers = itemIdentifiers # XXX: this line has been added - - def cb(iq): - items = [] - for element in iq.pubsub.items.elements(): - if element.uri == pubsub.NS_PUBSUB and element.name == 'item': - items.append(element) - # TODO: return (items, self.host.plugins['XEP-0059'].extractMetadata(iq)) ?? - return items - - d = request.send(self.xmlstream) - d.addCallback(cb) - return d - - # FIXME: this should be done in wokkel - def retractItems(self, service, nodeIdentifier, itemIdentifiers, sender=None): - """ - Retract items from a publish subscribe node. - - @param service: The publish subscribe service to delete the node from. - @type service: L{JID} - @param nodeIdentifier: The identifier of the node. - @type nodeIdentifier: C{unicode} - @param itemIdentifiers: Identifiers of the items to be retracted. - @type itemIdentifiers: C{set} - """ - request = PubSubRequest('retract') - request.recipient = service - request.nodeIdentifier = nodeIdentifier - request.itemIdentifiers = itemIdentifiers - request.sender = sender - return request.send(self.xmlstream) + rsm.PubSubClient.connectionInitialized(self) def itemsReceived(self, event): if not self.host.trigger.point("PubSubItemsReceived", event, self.parent.profile): @@ -322,7 +282,6 @@ # def purgeReceived(self, event): - @defer.inlineCallbacks def subscriptions(self, service, nodeIdentifier, sender=None): """Return the list of subscriptions to the given service and node. @@ -331,13 +290,17 @@ @param nodeIdentifier: The identifier of the node (leave empty to retrieve all subscriptions). @type nodeIdentifier: C{unicode} """ - request = PubSubRequest('subscriptions') + request = pubsub.PubSubRequest('subscriptions') request.recipient = service request.nodeIdentifier = nodeIdentifier request.sender = sender - iq = yield request.send(self.xmlstream) - defer.returnValue([sub for sub in iq.pubsub.subscriptions.elements() if - (sub.uri == pubsub.NS_PUBSUB and sub.name == 'subscription')]) + d = request.send(self.xmlstream) + + def cb(iq): + return [sub for sub in iq.pubsub.subscriptions.elements() if + (sub.uri == pubsub.NS_PUBSUB and sub.name == 'subscription')] + + return d.addCallback(cb) def getDiscoInfo(self, requestor, service, nodeIdentifier=''): disco_info = [] @@ -346,64 +309,3 @@ def getDiscoItems(self, requestor, service, nodeIdentifier=''): return [] - - -class PubSubRequest(pubsub.PubSubRequest): - - def __init__(self, verb=None, host=None, page_attrs=None): - """ - @param verb (str): the type of pubsub request - @param host (SAT): the SAT instance - @param page_attrs (dict): options for RSM paging: - - limit (int): the maximum number of items in the page - - index (int): the starting index of the requested page - - after (str, int): the element immediately preceding the page - - before (str, int): the element immediately following the page - """ - self.verb = verb - self.host = host - self.page_attrs = page_attrs - - # FIXME: the redefinition of this wokkel method is the easiest way I found - # to handle RSM. We should find a proper solution, maybe just add in wokkel an - # empty method postProcessMessage, call it before sending and overwrite it here - # instead of overwriting the whole send method. - def send(self, xs): - """ - Send this request to its recipient. - - This renders all of the relevant parameters for this specific - requests into an L{IQ}, and invoke its C{send} method. - This returns a deferred that fires upon reception of a response. See - L{IQ} for details. - - @param xs: The XML stream to send the request on. - @type xs: L{twisted.words.protocols.jabber.xmlstream.XmlStream} - @rtype: L{defer.Deferred}. - """ - - try: - (self.stanzaType, - childURI, - childName) = self._verbRequestMap[self.verb] - except KeyError: - raise NotImplementedError() - - iq = IQ(xs, self.stanzaType) - iq.addElement((childURI, 'pubsub')) - verbElement = iq.pubsub.addElement(childName) - - if self.sender: - iq['from'] = self.sender.full() - if self.recipient: - iq['to'] = self.recipient.full() - - for parameter in self._parameters[self.verb]: - getattr(self, '_render_%s' % parameter)(verbElement) - - # This lines have been added for RSM - if self.host and 'XEP-0059' in self.host.plugins and self.page_attrs: - self.page_attrs['stanza'] = iq - self.host.plugins['XEP-0059'].requestPage(**self.page_attrs) - - return iq.send() diff -r 584d45bb36d9 -r f71a0fc26886 src/plugins/plugin_xep_0203.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/plugins/plugin_xep_0203.py Wed Mar 18 10:52:28 2015 +0100 @@ -0,0 +1,84 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# SAT plugin for Delayed Delivery (XEP-0203) +# Copyright (C) 2009, 2010, 2011, 2012, 2013, 2014 Jérôme Poisson (goffi@goffi.org) +# Copyright (C) 2013, 2014 Adrien Cossa (souliane@mailoo.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from sat.core.i18n import _ +from sat.core.log import getLogger +log = getLogger(__name__) + +from wokkel import disco, iwokkel, delay +try: + from twisted.words.protocols.xmlstream import XMPPHandler +except ImportError: + from wokkel.subprotocols import XMPPHandler +from zope.interface import implements + + +NS_DD = 'urn:xmpp:delay' + +PLUGIN_INFO = { + "name": "Delayed Delivery", + "import_name": "XEP-0203", + "type": "XEP", + "protocols": ["XEP-0203"], + "main": "XEP_0203", + "handler": "yes", + "description": _("""Implementation of Delayed Delivery""") +} + + +class XEP_0203(object): + + def __init__(self, host): + log.info(_("Delayed Delivery plugin initialization")) + self.host = host + + def getHandler(self, profile): + return XEP_0203_handler(self, profile) + + def delay(self, stamp, sender=None, desc='', parent=None): + """Build a delay element, eventually append it to the given parent element. + + @param stamp (datetime): offset-aware timestamp of the original sending. + @param sender (JID): entity that originally sent or delayed the message. + @param desc (unicode): optional natural language description. + @param parent (domish.Element): add the delay element to this element. + @return: the delay element (domish.Element) + """ + elt = delay.Delay(stamp, sender).toElement() + if desc: + elt.addContent(desc) + if parent: + parent.addChild(elt) + return elt + + +class XEP_0203_handler(XMPPHandler): + implements(iwokkel.IDisco) + + def __init__(self, plugin_parent, profile): + self.plugin_parent = plugin_parent + self.host = plugin_parent.host + self.profile = profile + + def getDiscoInfo(self, requestor, target, nodeIdentifier=''): + return [disco.DiscoFeature(NS_DD)] + + def getDiscoItems(self, requestor, target, nodeIdentifier=''): + return [] diff -r 584d45bb36d9 -r f71a0fc26886 src/plugins/plugin_xep_0277.py --- a/src/plugins/plugin_xep_0277.py Wed Mar 18 10:39:22 2015 +0100 +++ b/src/plugins/plugin_xep_0277.py Wed Mar 18 10:52:28 2015 +0100 @@ -42,8 +42,9 @@ "name": "Microblogging over XMPP Plugin", "import_name": "XEP-0277", "type": "XEP", - "protocols": [], + "protocols": ["XEP-0277"], "dependencies": ["XEP-0163", "XEP-0060", "TEXT-SYNTAXES"], + "recommendations": ["XEP-0059"], "main": "XEP_0277", "handler": "no", "description": _("""Implementation of microblogging Protocol""") @@ -77,8 +78,8 @@ def parseCommentUrl(self, node_url): """Determine the fields comments_service and comments_node of a microblog data from the href attribute of an entry's link element. For example this input: - xmpp:sat-pubsub.libervia.org?node=urn%3Axmpp%3Acomments%3A_c5c4a142-2279-4b2a-ba4c-1bc33aa87634__urn%3Axmpp%3Agroupblog%3Asouliane%libervia.org - will return (JID(u'sat-pubsub.libervia.org'), 'urn:xmpp:comments:_c5c4a142-2279-4b2a-ba4c-1bc33aa87634__urn:xmpp:groupblog:souliane%libervia.org') + xmpp:sat-pubsub.libervia.org?node=urn%3Axmpp%3Acomments%3A_c5c4a142-2279-4b2a-ba4c-1bc33aa87634__urn%3Axmpp%3Agroupblog%3Asouliane%40libervia.org + will return (JID(u'sat-pubsub.libervia.org'), 'urn:xmpp:comments:_c5c4a142-2279-4b2a-ba4c-1bc33aa87634__urn:xmpp:groupblog:souliane@libervia.org') @return: a tuple (JID, str) """ parsed_url = urlparse.urlparse(node_url, 'xmpp') @@ -315,10 +316,12 @@ @param pub_jid: jid of the publisher @param max_items: how many microblogs we want to get @param profile_key: profile key + + @return: a deferred couple with the list of items and RSM information. """ d = self.host.plugins["XEP-0060"].getItems(jid.JID(pub_jid), NS_MICROBLOG, max_items=max_items, profile_key=profile_key) - d.addCallback(lambda items: defer.DeferredList(map(self.item2mbdata, items), consumeErrors=True)) - d.addCallback(lambda result: [value for (success, value) in result if success]) + d.addCallback(lambda res: (defer.DeferredList(map(self.item2mbdata, res[0]), consumeErrors=True), res[1])) + d.addCallback(lambda res: ([value for (success, value) in res[0] if success], res[1])) return d def setMicroblogAccess(self, access="presence", profile_key=C.PROF_KEY_NONE): diff -r 584d45bb36d9 -r f71a0fc26886 src/plugins/plugin_xep_0297.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/plugins/plugin_xep_0297.py Wed Mar 18 10:52:28 2015 +0100 @@ -0,0 +1,116 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# SAT plugin for Stanza Forwarding (XEP-0297) +# Copyright (C) 2009, 2010, 2011, 2012, 2013, 2014 Jérôme Poisson (goffi@goffi.org) +# Copyright (C) 2013, 2014 Adrien Cossa (souliane@mailoo.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from sat.core.constants import Const as C +from sat.core.i18n import _ +from sat.core.log import getLogger +log = getLogger(__name__) + +from wokkel import disco, iwokkel +try: + from twisted.words.protocols.xmlstream import XMPPHandler +except ImportError: + from wokkel.subprotocols import XMPPHandler +from zope.interface import implements + +from twisted.words.xish import domish + +NS_SF = 'urn:xmpp:forward:0' + +PLUGIN_INFO = { + "name": "Stanza Forwarding", + "import_name": "XEP-0297", + "type": "XEP", + "protocols": ["XEP-0297"], + "main": "XEP_0297", + "handler": "yes", + "description": _("""Implementation of Stanza Forwarding""") +} + + +class XEP_0297(object): + + def __init__(self, host): + log.info(_("Stanza Forwarding plugin initialization")) + self.host = host + + def getHandler(self, profile): + return XEP_0297_handler(self, profile) + + @classmethod + def updateUri(cls, element, uri): + """Update recursively the element URI. + + @param element (domish.Element): element to update + @param uri (unicode): new URI + """ + # XXX: we need this because changing the URI of an existing element + # containing children doesn't update the children's blank URI. + element.uri = uri + element.defaultUri = uri + for child in element.children: + if isinstance(child, domish.Element) and not child.uri: + XEP_0297.updateUri(child, uri) + + def forward(self, stanza, to_jid, stamp, body='', profile_key=C.PROF_KEY_NONE): + """Forward a message to the given JID. + + @param stanza (domish.Element): original stanza to be forwarded. + @param to_jid (JID): recipient JID. + @param stamp (datetime): offset-aware timestamp of the original reception. + @param body (unicode): optional description. + @param profile_key (unicode): %(doc_profile_key)s + @return: a Deferred when the message has been sent + """ + msg = domish.Element((None, 'message')) + msg['to'] = to_jid.full() + msg['type'] = stanza['type'] + + body_elt = domish.Element((None, 'body')) + if body: + body_elt.addContent(body) + + forwarded_elt = domish.Element((NS_SF, 'forwarded')) + delay_elt = self.host.plugins['XEP-0203'].delay(stamp) + forwarded_elt.addChild(delay_elt) + if not stanza.uri: # None or '' + XEP_0297.updateUri(stanza, 'jabber:client') + forwarded_elt.addChild(stanza) + + msg.addChild(body_elt) + msg.addChild(forwarded_elt) + + client = self.host.getClient(profile_key) + return client.xmlstream.send(msg.toXml()) + + +class XEP_0297_handler(XMPPHandler): + implements(iwokkel.IDisco) + + def __init__(self, plugin_parent, profile): + self.plugin_parent = plugin_parent + self.host = plugin_parent.host + self.profile = profile + + def getDiscoInfo(self, requestor, target, nodeIdentifier=''): + return [disco.DiscoFeature(NS_SF)] + + def getDiscoItems(self, requestor, target, nodeIdentifier=''): + return [] diff -r 584d45bb36d9 -r f71a0fc26886 src/plugins/plugin_xep_0313.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/plugins/plugin_xep_0313.py Wed Mar 18 10:52:28 2015 +0100 @@ -0,0 +1,247 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# SAT plugin for Message Archive Management (XEP-0313) +# Copyright (C) 2009, 2010, 2011, 2012, 2013, 2014 Jérôme Poisson (goffi@goffi.org) +# Copyright (C) 2013, 2014 Adrien Cossa (souliane@mailoo.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from sat.core.constants import Const as C +from sat.core.i18n import _ +from sat.core.log import getLogger +log = getLogger(__name__) + +try: + from twisted.words.protocols.xmlstream import XMPPHandler +except ImportError: + from wokkel.subprotocols import XMPPHandler +from twisted.words.xish import domish +from twisted.words.protocols.jabber import jid + +from zope.interface import implements + +from wokkel import disco, data_form +from wokkel.generic import parseXml +from wokkel.pubsub import NS_PUBSUB_EVENT, ItemsEvent + +# TODO: change this when RSM and MAM are in wokkel +from sat.tmp.wokkel.rsm import RSMRequest +from sat.tmp.wokkel import mam + + +NS_MAM = 'urn:xmpp:mam:0' +NS_SF = 'urn:xmpp:forward:0' +NS_DD = 'urn:xmpp:delay' +NS_CLIENT = 'jabber:client' + +PLUGIN_INFO = { + "name": "Message Archive Management", + "import_name": "XEP-0313", + "type": "XEP", + "protocols": ["XEP-0313"], + "dependencies": ["XEP-0059", "XEP-0297", "XEP-0203"], + "recommendations": ["XEP-0334"], + "main": "XEP_0313", + "handler": "yes", + "description": _("""Implementation of Message Archive Management""") +} + + +class XEP_0313(object): + + def __init__(self, host): + log.info(_("Message Archive Management plugin initialization")) + self.host = host + self.clients = {} # bind profile name to SatMAMClient + host.bridge.addMethod("MAMqueryFields", ".plugin", in_sign='ss', out_sign='s', + method=self._queryFields, + async=True, + doc={}) + host.bridge.addMethod("MAMqueryArchive", ".plugin", in_sign='ssa{ss}ss', out_sign='s', + method=self._queryArchive, + async=True, + doc={}) + host.bridge.addMethod("MAMgetPrefs", ".plugin", in_sign='ss', out_sign='s', + method=self._getPrefs, + async=True, + doc={}) + host.bridge.addMethod("MAMsetPrefs", ".plugin", in_sign='ssasass', out_sign='s', + method=self._setPrefs, + async=True, + doc={}) + host.trigger.add("MessageReceived", self.messageReceivedTrigger) + + def getHandler(self, profile): + self.clients[profile] = SatMAMClient(self, profile) + return self.clients[profile] + + def profileDisconnected(self, profile): + try: + del self.clients[profile] + except KeyError: + pass + + def _queryFields(self, service_s=None, profile_key=C.PROF_KEY_NONE): + service = jid.JID(service_s) if service_s else None + return self.queryFields(service, profile_key) + + def queryFields(self, service=None, profile_key=C.PROF_KEY_NONE): + """Ask the server about additional supported fields. + + @param service: entity offering the MAM service (None for user archives) + @param profile_key (unicode): %(doc_profile_key)s + @return: the server response as a Deferred domish.Element + """ + # http://xmpp.org/extensions/xep-0313.html#query-form + def eb(failure): + # typically StanzaError with condition u'service-unavailable' + log.error(failure.getErrorMessage()) + return '' + + profile = self.host.memory.getProfileName(profile_key) + d = self.clients[profile].queryFields(service) + return d.addCallbacks(lambda elt: elt.toXml(), eb) + + def _queryArchive(self, service_s=None, form_xml=None, rsm_dict=None, node=None, profile_key=C.PROF_KEY_NONE): + service = jid.JID(service_s) if service_s else None + if form_xml: + form = data_form.Form.fromElement(parseXml(form_xml)) + if form.formNamespace != NS_MAM: + log.error(_("Expected a MAM Data Form, got instead: %s") % form.formNamespace) + form = None + else: + form = None + rsm = RSMRequest(**rsm_dict) if rsm_dict else None + return self.queryArchive(service, form, rsm, node, profile_key) + + def queryArchive(self, service=None, form=None, rsm=None, node=None, profile_key=C.PROF_KEY_NONE): + """Query a user, MUC or pubsub archive. + + @param service: entity offering the MAM service (None for user archives) + @param form (Form): data form to filter the request + @param rsm (RSMRequest): RSM request instance + @param node (unicode): pubsub node to query, or None if inappropriate + @param profile_key (unicode): %(doc_profile_key)s + @return: a Deferred when the message has been sent + """ + def eb(failure): + # typically StanzaError with condition u'service-unavailable' + log.error(failure.getErrorMessage()) + return '' + + profile = self.host.memory.getProfileName(profile_key) + d = self.clients[profile].queryArchive(service, form, rsm, node) + return d.addCallbacks(lambda elt: elt.toXml(), eb) + # TODO: add the handler for receiving the final message + + def _getPrefs(self, service_s=None, profile_key=C.PROF_KEY_NONE): + service = jid.JID(service_s) if service_s else None + return self.getPrefs(service, profile_key) + + def getPrefs(self, service=None, profile_key=C.PROF_KEY_NONE): + """Retrieve the current user preferences. + + @param service: entity offering the MAM service (None for user archives) + @param profile_key (unicode): %(doc_profile_key)s + @return: the server response as a Deferred domish.Element + """ + # http://xmpp.org/extensions/xep-0313.html#prefs + def eb(failure): + # typically StanzaError with condition u'service-unavailable' + log.error(failure.getErrorMessage()) + return '' + + profile = self.host.memory.getProfileName(profile_key) + d = self.clients[profile].queryPrefs(service) + return d.addCallbacks(lambda elt: elt.toXml(), eb) + + def _setPrefs(self, service_s=None, default='roster', always=None, never=None, profile_key=C.PROF_KEY_NONE): + service = jid.JID(service_s) if service_s else None + always_jid = [jid.JID(entity) for entity in always] + never_jid = [jid.JID(entity) for entity in never] + #TODO: why not build here a MAMPrefs object instead of passing the args separately? + return self.setPrefs(service, default, always_jid, never_jid, profile_key) + + def setPrefs(self, service=None, default='roster', always=None, never=None, profile_key=C.PROF_KEY_NONE): + """Set news user preferences. + + @param service: entity offering the MAM service (None for user archives) + @param default (unicode): a value in ('always', 'never', 'roster') + @param always (list): a list of JID instances + @param never (list): a list of JID instances + @param profile_key (unicode): %(doc_profile_key)s + @return: the server response as a Deferred domish.Element + """ + # http://xmpp.org/extensions/xep-0313.html#prefs + def eb(failure): + # typically StanzaError with condition u'service-unavailable' + log.error(failure.getErrorMessage()) + return '' + + profile = self.host.memory.getProfileName(profile_key) + d = self.clients[profile].setPrefs(service, default, always, never) + return d.addCallbacks(lambda elt: elt.toXml(), eb) + + def messageReceivedTrigger(self, message, post_treat, profile): + """Check if the message is a MAM result. If so, extract the original + message, stop processing the current message and process the original + message instead. + """ + try: + result = domish.generateElementsQNamed(message.elements(), "result", NS_MAM).next() + except StopIteration: + return True + try: + forwarded = domish.generateElementsQNamed(result.elements(), "forwarded", NS_SF).next() + except StopIteration: + log.error(_("MAM result misses its mandatory element!")) + return False + try: + # TODO: delay is not here for nothing, get benefice of it! + delay = domish.generateElementsQNamed(forwarded.elements(), "delay", NS_DD).next() + msg = domish.generateElementsQNamed(forwarded.elements(), "message", NS_CLIENT).next() + except StopIteration: + log.error(_(" element misses a mandatory child!")) + return False + log.debug(_("MAM found a forwarded message")) + + if msg.event and msg.event.uri == NS_PUBSUB_EVENT: + event = ItemsEvent(jid.JID(message['from']), + jid.JID(message['to']), + msg.event.items['node'], + msg.event.items.elements(), + {}) + self.host.plugins["XEP-0060"].clients[profile].itemsReceived(event) + return False + + client = self.host.getClient(profile) + client.messageProt.onMessage(msg) + return False + + +class SatMAMClient(mam.MAMClient): + implements(disco.IDisco) + + def __init__(self, plugin_parent, profile): + self.plugin_parent = plugin_parent + self.host = plugin_parent.host + self.profile = profile + mam.MAMClient.__init__(self) + + def getDiscoInfo(self, requestor, target, nodeIdentifier=''): + return [disco.DiscoFeature(NS_MAM)] + + def getDiscoItems(self, requestor, target, nodeIdentifier=''): + return [] diff -r 584d45bb36d9 -r f71a0fc26886 src/plugins/plugin_xep_0334.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/plugins/plugin_xep_0334.py Wed Mar 18 10:52:28 2015 +0100 @@ -0,0 +1,111 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# SAT plugin for Delayed Delivery (XEP-0334) +# Copyright (C) 2009, 2010, 2011, 2012, 2013, 2014 Jérôme Poisson (goffi@goffi.org) +# Copyright (C) 2013, 2014 Adrien Cossa (souliane@mailoo.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from sat.core.i18n import _ +from sat.core.log import getLogger +log = getLogger(__name__) + +from sat.core import exceptions + +from wokkel import disco, iwokkel +try: + from twisted.words.protocols.xmlstream import XMPPHandler +except ImportError: + from wokkel.subprotocols import XMPPHandler +from twisted.python import failure +from zope.interface import implements + + +NS_MPH = 'urn:xmpp:hints' + +PLUGIN_INFO = { + "name": "Message Processing Hints", + "import_name": "XEP-0334", + "type": "XEP", + "protocols": ["XEP-0334"], + "main": "XEP_0334", + "handler": "yes", + "description": _("""Implementation of Message Processing Hints""") +} + + +class XEP_0334(object): + + def __init__(self, host): + log.info(_("Message Processing Hints plugin initialization")) + self.host = host + host.trigger.add("sendMessage", self.sendMessageTrigger) + host.trigger.add("MessageReceived", self.messageReceivedTrigger) + + def getHandler(self, profile): + return XEP_0334_handler(self, profile) + + def sendMessageTrigger(self, mess_data, pre_xml_treatments, post_xml_treatments, profile): + """Add the hints element to the message to be sent""" + hints = [] + for key in ('no-permanent-storage', 'no-storage', 'no-copy'): + if mess_data['extra'].get(key, None): + hints.append(key) + + def treatment(mess_data): + message = mess_data['xml'] + for key in hints: + message.addElement((NS_MPH, key)) + if key in ('no-permanent-storage', 'no-storage'): + mess_data['extra']['no_storage'] = True + # TODO: the core doesn't process this 'no_storage' info yet + # it will be added after the frontends refactorization + return mess_data + + if hints: + post_xml_treatments.addCallback(treatment) + return True + + def messageReceivedTrigger(self, message, post_treat, profile): + """Check for hints in the received message""" + hints = [] + for key in ('no-permanent-storage', 'no-storage'): + try: + message.elements(uri=NS_MPH, name=key).next() + hints.append(key) + except StopIteration: + pass + + def post_treat_hints(data): + raise failure.Failure(exceptions.SkipHistory()) + + if hints: + post_treat.addCallback(post_treat_hints) + return True + + +class XEP_0334_handler(XMPPHandler): + implements(iwokkel.IDisco) + + def __init__(self, plugin_parent, profile): + self.plugin_parent = plugin_parent + self.host = plugin_parent.host + self.profile = profile + + def getDiscoInfo(self, requestor, target, nodeIdentifier=''): + return [disco.DiscoFeature(NS_MPH)] + + def getDiscoItems(self, requestor, target, nodeIdentifier=''): + return [] diff -r 584d45bb36d9 -r f71a0fc26886 src/test/constants.py --- a/src/test/constants.py Wed Mar 18 10:39:22 2015 +0100 +++ b/src/test/constants.py Wed Mar 18 10:52:28 2015 +0100 @@ -23,6 +23,8 @@ class Const(object): + PROF_KEY_NONE = '@NONE@' + PROFILE = ['test_profile', 'test_profile2', 'test_profile3', 'test_profile4', 'test_profile5'] JID_STR = [u"test@example.org/SàT", u"sender@example.net/house", u"sender@example.net/work", u"sender@server.net/res", u"xxx@server.net/res"] JID = [jid.JID(jid_s) for jid_s in JID_STR] diff -r 584d45bb36d9 -r f71a0fc26886 src/test/helpers.py --- a/src/test/helpers.py Wed Mar 18 10:39:22 2015 +0100 +++ b/src/test/helpers.py Wed Mar 18 10:52:28 2015 +0100 @@ -23,14 +23,16 @@ log_config.satConfigure() from sat.core import exceptions -from constants import Const +from constants import Const as C from wokkel.xmppim import RosterItem +from wokkel.generic import parseXml from sat.core.xmpp import SatRosterProtocol from sat.memory.memory import Params, Memory from twisted.trial.unittest import FailTest from twisted.trial import unittest from twisted.internet import defer from twisted.words.protocols.jabber.jid import JID +from twisted.words.xish import domish from xml.etree import cElementTree as etree from collections import Counter import re @@ -64,12 +66,15 @@ self.bridge = FakeBridge() self.memory = FakeMemory(self) self.trigger = FakeTriggerManager() - self.init() + self.profiles = {} + self.reinit() - def init(self): + def reinit(self): """This can be called by tests that check for sent and stored messages, uses FakeClient or get/set some other data that need to be cleaned""" - self.sent_messages = [] + for profile in self.profiles: + self.profiles[profile].reinit() + self.memory.reinit() self.stored_messages = [] self.plugins = {} self.profiles = {} @@ -90,7 +95,7 @@ @param mess_data: message data dictionnary @param client: profile's client """ - self.sent_messages.append(mess_data["to"]) + client.xmlstream.send(mess_data['xml']) return mess_data def _storeMessage(self, mess_data, client): @@ -110,6 +115,10 @@ """ return mess_data # TODO + def getProfileName(self, profile_key): + """Get the profile name from the profile_key""" + return profile_key + def getClient(self, profile_key): """Convenient method to get client from profile key @return: client or None if it doesn't exist""" @@ -123,18 +132,23 @@ def getJidNStream(self, profile_key): """Convenient method to get jid and stream from profile key @return: tuple (jid, xmlstream) from profile, can be None""" - return (Const.PROFILE_DICT[profile_key], None) + return (C.PROFILE_DICT[profile_key], None) def isConnected(self, profile): return True - def getSentMessageRaw(self, profile_index): - """Pop and return the sent message in first position (works like a FIFO). - Called by tests. FakeClient instances associated to each profile must have - been previously initialized with the method FakeSAT.getClient. - @return: the sent message for given profile, or None""" + def getSentMessages(self, profile_index): + """Return all the sent messages (in the order they have been sent) and + empty the list. Called by tests. FakeClient instances associated to each + profile must have been previously initialized with the method + FakeSAT.getClient. + + @param profile_index: index of the profile to consider (cf. C.PROFILE) + @return: the sent messages for given profile, or None""" try: - return self.profiles[Const.PROFILE[profile_index]].xmlstream.sent.pop(0) + tmp = self.profiles[C.PROFILE[profile_index]].xmlstream.sent + self.profiles[C.PROFILE[profile_index]].xmlstream.sent = [] + return tmp except IndexError: return None @@ -142,8 +156,20 @@ """Pop and return the sent message in first position (works like a FIFO). Called by tests. FakeClient instances associated to each profile must have been previously initialized with the method FakeSAT.getClient. + + @param profile_index: index of the profile to consider (cf. C.PROFILE) + @return: the sent message for given profile, or None""" + try: + return self.profiles[C.PROFILE[profile_index]].xmlstream.sent.pop(0) + except IndexError: + return None + + def getSentMessageXml(self, profile_index): + """Pop and return the sent message in first position (works like a FIFO). + Called by tests. FakeClient instances associated to each profile must have + been previously initialized with the method FakeSAT.getClient. @return: XML representation of the sent message for given profile, or None""" - entry = self.getSentMessageRaw(profile_index) + entry = self.getSentMessage(profile_index) return entry.toXml() if entry else None def findFeaturesSet(self, features, category=None, type_=None, jid_=None, profile_key=None): @@ -238,7 +264,7 @@ def getProfileName(self, profile_key, return_profile_keys=False): if profile_key == '@DEFAULT@': - return Const.PROFILE[0] + return C.PROFILE[0] elif profile_key == '@NONE@': raise exceptions.ProfileNotSetError else: @@ -258,9 +284,9 @@ self.host = host self.params = FakeParams(host, None) self.config = self.parseMainConf() - self.init() + self.reinit() - def init(self): + def reinit(self): """Tests that manipulate params, entities, features should re-initialise the memory first to not fake the result.""" self.params.load_default_params() @@ -335,21 +361,39 @@ self.sent = [] def send(self, obj): - """Save the sent messages to compare them later""" + """Save the sent messages to compare them later. + + @param obj (domish.Element, str or unicode): message to send + """ + if not isinstance(obj, domish.Element): + assert(isinstance(obj, str) or isinstance(obj, unicode)) + obj = parseXml(obj) + + if obj.name == 'iq': + # IQ request expects an answer, return the request itself so + # you can check if it has been well built by your plugin. + self.iqDeferreds[obj['id']].callback(obj) + self.sent.append(obj) return defer.succeed(None) + def addObserver(self, *argv): + pass + class FakeClient(object): """Tests involving more than one profile need one instance of this class per profile""" def __init__(self, host, profile=None): self.host = host - self.profile = profile if profile else Const.PROFILE[0] - self.jid = Const.PROFILE_DICT[self.profile] + self.profile = profile if profile else C.PROFILE[0] + self.jid = C.PROFILE_DICT[self.profile] self.roster = FakeRosterProtocol(host, self) self.xmlstream = FakeXmlStream() + def reinit(self): + self.xmlstream = FakeXmlStream() + def send(self, obj): return self.xmlstream.send(obj) diff -r 584d45bb36d9 -r f71a0fc26886 src/test/helpers_plugins.py --- a/src/test/helpers_plugins.py Wed Mar 18 10:39:22 2015 +0100 +++ b/src/test/helpers_plugins.py Wed Mar 18 10:52:28 2015 +0100 @@ -20,10 +20,18 @@ """ Helpers class for plugin dependencies """ -from constants import Const +from twisted.internet import defer + +from wokkel.muc import Room, User +from wokkel.generic import parseXml +from wokkel.disco import DiscoItem, DiscoItems + +# temporary until the changes are integrated to Wokkel +from sat.tmp.wokkel.rsm import RSMResponse + +from constants import Const as C from sat.plugins import plugin_xep_0045 -from twisted.internet import defer -from wokkel.muc import Room, User +from collections import OrderedDict class FakeMUCClient(object): @@ -42,13 +50,13 @@ roster = {} # ask the other profiles to fill our roster - for i in xrange(0, len(Const.PROFILE)): - other_profile = Const.PROFILE[i] + for i in xrange(0, len(C.PROFILE)): + other_profile = C.PROFILE[i] if other_profile == profile: continue try: other_room = self.plugin_parent.clients[other_profile].joined_rooms[roomJID.userhost()] - roster.setdefault(other_room.nick, User(other_room.nick, Const.PROFILE_DICT[other_profile])) + roster.setdefault(other_room.nick, User(other_room.nick, C.PROFILE_DICT[other_profile])) for other_nick in other_room.roster: roster.setdefault(other_nick, other_room.roster[other_nick]) except (AttributeError, KeyError): @@ -56,7 +64,7 @@ # rename our nick if it already exists while nick in roster.keys(): - if Const.PROFILE_DICT[profile].userhost() == roster[nick].entity.userhost(): + if C.PROFILE_DICT[profile].userhost() == roster[nick].entity.userhost(): break # same user with different resource --> same nickname nick = nick + "_" @@ -65,13 +73,13 @@ self.joined_rooms[roomJID.userhost()] = room # fill the other rosters with the new entry - for i in xrange(0, len(Const.PROFILE)): - other_profile = Const.PROFILE[i] + for i in xrange(0, len(C.PROFILE)): + other_profile = C.PROFILE[i] if other_profile == profile: continue try: other_room = self.plugin_parent.clients[other_profile].joined_rooms[roomJID.userhost()] - other_room.roster.setdefault(room.nick, User(room.nick, Const.PROFILE_DICT[profile])) + other_room.roster.setdefault(room.nick, User(room.nick, C.PROFILE_DICT[profile])) except (AttributeError, KeyError): pass @@ -85,8 +93,8 @@ """ room = self.joined_rooms[roomJID.userhost()] # remove ourself from the other rosters - for i in xrange(0, len(Const.PROFILE)): - other_profile = Const.PROFILE[i] + for i in xrange(0, len(C.PROFILE)): + other_profile = C.PROFILE[i] if other_profile == profile: continue try: @@ -103,7 +111,7 @@ def __init__(self, host): self.host = host self.clients = {} - for profile in Const.PROFILE: + for profile in C.PROFILE: self.clients[profile] = FakeMUCClient(self) def join(self, room_jid, nick, options={}, profile_key='@DEFAULT@'): @@ -124,9 +132,9 @@ def joinRoom(self, muc_index, user_index): """Called by tests @return: the nickname of the user who joined room""" - muc_jid = Const.MUC[muc_index] - nick = Const.JID[user_index].user - profile = Const.PROFILE[user_index] + muc_jid = C.MUC[muc_index] + nick = C.JID[user_index].user + profile = C.PROFILE[user_index] self.join(muc_jid, nick, profile_key=profile) return self.getNick(muc_index, user_index) @@ -145,17 +153,17 @@ def leaveRoom(self, muc_index, user_index): """Called by tests @return: the nickname of the user who left the room""" - muc_jid = Const.MUC[muc_index] + muc_jid = C.MUC[muc_index] nick = self.getNick(muc_index, user_index) - profile = Const.PROFILE[user_index] + profile = C.PROFILE[user_index] self.leave(muc_jid, profile_key=profile) return nick def getRoom(self, muc_index, user_index): """Called by tests @return: a wokkel.muc.Room instance""" - profile = Const.PROFILE[user_index] - muc_s = Const.MUC_STR[muc_index] + profile = C.PROFILE[user_index] + muc_s = C.MUC_STR[muc_index] try: return self.clients[profile].joined_rooms[muc_s] except (AttributeError, KeyError): @@ -163,14 +171,14 @@ def getNick(self, muc_index, user_index): try: - return self.getRoomNick(Const.MUC_STR[muc_index], Const.PROFILE[user_index]) + return self.getRoomNick(C.MUC_STR[muc_index], C.PROFILE[user_index]) except (KeyError, AttributeError): return '' def getNickOfUser(self, muc_index, user_index, profile_index, secure=True): try: - room = self.clients[Const.PROFILE[profile_index]].joined_rooms[Const.MUC_STR[muc_index]] - return self.getRoomNickOfUser(room, Const.JID_STR[user_index]) + room = self.clients[C.PROFILE[profile_index]].joined_rooms[C.MUC_STR[muc_index]] + return self.getRoomNickOfUser(room, C.JID_STR[user_index]) except (KeyError, AttributeError): return None @@ -190,3 +198,84 @@ @profile_key: %(doc_profile_key)s """ pass + + +class FakeSatPubSubClient(object): + + def __init__(self, host, parent_plugin): + self.host = host + self.parent_plugin = parent_plugin + self.__items = OrderedDict() + self.__rsm_responses = {} + + def createNode(self, service, nodeIdentifier=None, options=None, + sender=None): + return defer.succeed(None) + + def deleteNode(self, service, nodeIdentifier, sender=None): + try: + del self.__items[nodeIdentifier] + except KeyError: + pass + return defer.succeed(None) + + def subscribe(self, service, nodeIdentifier, subscriber, + options=None, sender=None): + return defer.succeed(None) + + def unsubscribe(self, service, nodeIdentifier, subscriber, + subscriptionIdentifier=None, sender=None): + return defer.succeed(None) + + def publish(self, service, nodeIdentifier, items=None, sender=None): + node = self.__items.setdefault(nodeIdentifier, []) + + def replace(item_obj): + index = 0 + for current in node: + if current['id'] == item_obj['id']: + node[index] = item_obj + return True + index += 1 + return False + + for item in items: + item_obj = parseXml(item) if isinstance(item, unicode) else item + if not replace(item_obj): + node.append(item_obj) + return defer.succeed(None) + + def items(self, service, nodeIdentifier, maxItems=None, itemIdentifiers=None, + subscriptionIdentifier=None, sender=None, ext_data=None): + try: + items = self.__items[nodeIdentifier] + except KeyError: + items = [] + if ext_data: + assert('id' in ext_data) + if 'rsm' in ext_data: + args = (0, items[0]['id'], items[-1]['id']) if items else () + self.__rsm_responses[ext_data['id']] = RSMResponse(len(items), *args) + return defer.succeed(items) + + def retractItems(self, service, nodeIdentifier, itemIdentifiers, sender=None): + node = self.__items[nodeIdentifier] + for item in [item for item in node if item['id'] in itemIdentifiers]: + node.remove(item) + return defer.succeed(None) + + def getRSMResponse(self, id): + if id not in self.__rsm_responses: + return {} + result = self.__rsm_responses[id].toDict() + del self.__rsm_responses[id] + return result + + def subscriptions(self, service, nodeIdentifier, sender=None): + return defer.succeed([]) + + def service_getDiscoItems(self, service, nodeIdentifier, profile_key=C.PROF_KEY_NONE): + items = DiscoItems() + for item in self.__items.keys(): + items.append(DiscoItem(service, item)) + return defer.succeed(items) diff -r 584d45bb36d9 -r f71a0fc26886 src/test/test_memory.py --- a/src/test/test_memory.py Wed Mar 18 10:39:22 2015 +0100 +++ b/src/test/test_memory.py Wed Mar 18 10:52:28 2015 +0100 @@ -136,7 +136,7 @@ return self.host.memory.getParams(security_limit, app, profile_key) def test_updateParams(self): - self.host.memory.init() + self.host.memory.reinit() # check if the update works self.host.memory.updateParams(self._getParamXML()) self.assertParamExists() @@ -145,7 +145,7 @@ self.host.memory.updateParams(self._getParamXML()) self.assertEqual(previous.toxml().encode("utf-8"), self.host.memory.params.dom.toxml().encode("utf-8")) - self.host.memory.init() + self.host.memory.reinit() # check successive updates (without intersection) self.host.memory.updateParams(self._getParamXML('1')) self.assertParamExists("1") @@ -156,7 +156,7 @@ previous = self.host.memory.params.dom.cloneNode(True) # save for later - self.host.memory.init() + self.host.memory.reinit() # check successive updates (with intersection) self.host.memory.updateParams(self._getParamXML('1')) self.assertParamExists("1") @@ -168,7 +168,7 @@ # successive updates with or without intersection should have the same result self.assertEqual(previous.toxml().encode("utf-8"), self.host.memory.params.dom.toxml().encode("utf-8")) - self.host.memory.init() + self.host.memory.reinit() # one update with two params in a new category self.host.memory.updateParams(self._getParamXML('12')) self.assertParamExists("1") @@ -177,21 +177,21 @@ def test_getParams(self): # tests with no security level on the parameter (most secure) params = self._getParamXML() - self.host.memory.init() + self.host.memory.reinit() self.host.memory.updateParams(params) self._getParams(Const.NO_SECURITY_LIMIT).addCallback(self.assertParamExists_async) self._getParams(0).addCallback(self.assertParamNotExists_async) self._getParams(1).addCallback(self.assertParamNotExists_async) # tests with security level 0 on the parameter (not secure) params = self._getParamXML(security_level=0) - self.host.memory.init() + self.host.memory.reinit() self.host.memory.updateParams(params) self._getParams(Const.NO_SECURITY_LIMIT).addCallback(self.assertParamExists_async) self._getParams(0).addCallback(self.assertParamExists_async) self._getParams(1).addCallback(self.assertParamExists_async) # tests with security level 1 on the parameter (more secure) params = self._getParamXML(security_level=1) - self.host.memory.init() + self.host.memory.reinit() self.host.memory.updateParams(params) self._getParams(Const.NO_SECURITY_LIMIT).addCallback(self.assertParamExists_async) self._getParams(0).addCallback(self.assertParamNotExists_async) @@ -213,66 +213,66 @@ # tests with no security level on the parameter (most secure) params = self._getParamXML() - self.host.memory.init() + self.host.memory.reinit() register(params, Const.NO_SECURITY_LIMIT, Const.APP_NAME) self.assertParamExists() - self.host.memory.init() + self.host.memory.reinit() register(params, 0, Const.APP_NAME) self.assertParamNotExists() - self.host.memory.init() + self.host.memory.reinit() register(params, 1, Const.APP_NAME) self.assertParamNotExists() # tests with security level 0 on the parameter (not secure) params = self._getParamXML(security_level=0) - self.host.memory.init() + self.host.memory.reinit() register(params, Const.NO_SECURITY_LIMIT, Const.APP_NAME) self.assertParamExists() - self.host.memory.init() + self.host.memory.reinit() register(params, 0, Const.APP_NAME) self.assertParamExists() - self.host.memory.init() + self.host.memory.reinit() register(params, 1, Const.APP_NAME) self.assertParamExists() # tests with security level 1 on the parameter (more secure) params = self._getParamXML(security_level=1) - self.host.memory.init() + self.host.memory.reinit() register(params, Const.NO_SECURITY_LIMIT, Const.APP_NAME) self.assertParamExists() - self.host.memory.init() + self.host.memory.reinit() register(params, 0, Const.APP_NAME) self.assertParamNotExists() - self.host.memory.init() + self.host.memory.reinit() register(params, 1, Const.APP_NAME) self.assertParamExists() # tests with security level 1 and several parameters being registered params = self._getParamXML("12", security_level=1) - self.host.memory.init() + self.host.memory.reinit() register(params, Const.NO_SECURITY_LIMIT, Const.APP_NAME) self.assertParamExists() self.assertParamExists("2") - self.host.memory.init() + self.host.memory.reinit() register(params, 0, Const.APP_NAME) self.assertParamNotExists() self.assertParamNotExists("2") - self.host.memory.init() + self.host.memory.reinit() register(params, 1, Const.APP_NAME) self.assertParamExists() self.assertParamExists("2") # tests with several parameters being registered in an existing category - self.host.memory.init() + self.host.memory.reinit() self.host.memory.updateParams(self._getParamXML("3")) register(self._getParamXML("12"), Const.NO_SECURITY_LIMIT, Const.APP_NAME) self.assertParamExists() self.assertParamExists("2") - self.host.memory.init() + self.host.memory.reinit() def test_paramsRegisterApp_getParams(self): # test retrieving the parameter for a specific frontend - self.host.memory.init() + self.host.memory.reinit() params = self._getParamXML(security_level=1) self.host.memory.paramsRegisterApp(params, 1, Const.APP_NAME) self._getParams(1, '').addCallback(self.assertParamExists_async) @@ -280,7 +280,7 @@ self._getParams(1, 'another_dummy_frontend').addCallback(self.assertParamNotExists_async) # the same with several parameters registered at the same time - self.host.memory.init() + self.host.memory.reinit() params = self._getParamXML('12', security_level=0) self.host.memory.paramsRegisterApp(params, 5, Const.APP_NAME) self._getParams(5, '').addCallback(self.assertParamExists_async) diff -r 584d45bb36d9 -r f71a0fc26886 src/test/test_plugin_misc_groupblog.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/test/test_plugin_misc_groupblog.py Wed Mar 18 10:52:28 2015 +0100 @@ -0,0 +1,418 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# SAT: a jabber client +# Copyright (C) 2009, 2010, 2011, 2012, 2013, 2014 Jérôme Poisson (goffi@goffi.org) +# Copyright (C) 2013, 2014 Adrien Cossa (souliane@mailoo.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" Plugin groupblogs """ + +from constants import Const as C +from sat.test import helpers, helpers_plugins +from sat.plugins import plugin_misc_groupblog +from sat.plugins import plugin_xep_0060 +from sat.plugins import plugin_xep_0277 +from sat.plugins import plugin_xep_0163 +from sat.plugins import plugin_misc_text_syntaxes +from twisted.internet import defer +from twisted.words.protocols.jabber import jid + + +NS_PUBSUB = 'http://jabber.org/protocol/pubsub' + +DO_NOT_COUNT_COMMENTS = -1 + +SERVICE = u'pubsub.example.com' +PUBLISHER = u'test@example.org' +OTHER_PUBLISHER = u'other@xmpp.net' +NODE_ID = u'urn:xmpp:groupblog:{publisher}'.format(publisher=PUBLISHER) +OTHER_NODE_ID = u'urn:xmpp:groupblog:{publisher}'.format(publisher=OTHER_PUBLISHER) +ITEM_ID_1 = u'c745a688-9b02-11e3-a1a3-c0143dd4fe51' +COMMENT_ID_1 = u'd745a688-9b02-11e3-a1a3-c0143dd4fe52' +COMMENT_ID_2 = u'e745a688-9b02-11e3-a1a3-c0143dd4fe53' + + +def COMMENTS_NODE_ID(publisher=PUBLISHER): + return u'urn:xmpp:comments:_{id}__urn:xmpp:groupblog:{publisher}'.format(id=ITEM_ID_1, publisher=publisher) + + +def COMMENTS_NODE_URL(publisher=PUBLISHER): + return u'xmpp:{service}?node={node}'.format(service=SERVICE, id=ITEM_ID_1, + node=COMMENTS_NODE_ID(publisher).replace(':', '%3A').replace('@', '%40')) + + +def ITEM(publisher=PUBLISHER): + return u""" + + + The Uses of This World + {id} + 2003-12-12T17:47:23Z + 2003-12-12T17:47:23Z + + + {publisher} + + + + """.format(ns=NS_PUBSUB, id=ITEM_ID_1, publisher=publisher, comments_node_url=COMMENTS_NODE_URL(publisher)) + + +def COMMENT(id_=COMMENT_ID_1): + return u""" + + + The Uses of This World + {id} + 2003-12-12T17:47:23Z + 2003-12-12T17:47:23Z + + {publisher} + + + + """.format(ns=NS_PUBSUB, id=id_, publisher=PUBLISHER) + + +def ITEM_DATA(id_=ITEM_ID_1, count=0): + res = {'id': ITEM_ID_1, + 'type': 'main_item', + 'content': 'The Uses of This World', + 'author': PUBLISHER, + 'updated': '1071251243.0', + 'published': '1071251243.0', + 'service': SERVICE, + 'comments': COMMENTS_NODE_URL_1, + 'comments_service': SERVICE, + 'comments_node': COMMENTS_NODE_ID_1} + if count != DO_NOT_COUNT_COMMENTS: + res.update({'comments_count': unicode(count)}) + return res + + +def COMMENT_DATA(id_=COMMENT_ID_1): + return {'id': id_, + 'type': 'comment', + 'content': 'The Uses of This World', + 'author': PUBLISHER, + 'updated': '1071251243.0', + 'published': '1071251243.0', + 'service': SERVICE, + 'node': COMMENTS_NODE_ID_1, + 'verified_publisher': 'false'} + + +COMMENTS_NODE_ID_1 = COMMENTS_NODE_ID() +COMMENTS_NODE_ID_2 = COMMENTS_NODE_ID(OTHER_PUBLISHER) +COMMENTS_NODE_URL_1 = COMMENTS_NODE_URL() +COMMENTS_NODE_URL_2 = COMMENTS_NODE_URL(OTHER_PUBLISHER) +ITEM_1 = ITEM() +ITEM_2 = ITEM(OTHER_PUBLISHER) +COMMENT_1 = COMMENT(COMMENT_ID_1) +COMMENT_2 = COMMENT(COMMENT_ID_2) + + +def ITEM_DATA_1(count=0): + return ITEM_DATA(count=count) + +COMMENT_DATA_1 = COMMENT_DATA() +COMMENT_DATA_2 = COMMENT_DATA(COMMENT_ID_2) + + +class XEP_groupblogTest(helpers.SatTestCase): + + def setUp(self): + self.host = helpers.FakeSAT() + self.plugin = plugin_misc_groupblog.GroupBlog(self.host) + self.plugin._initialise = self._initialise + self.host.plugins['XEP-0060'] = plugin_xep_0060.XEP_0060(self.host) + self.host.plugins['XEP-0163'] = plugin_xep_0163.XEP_0163(self.host) + self.host.plugins['TEXT-SYNTAXES'] = plugin_misc_text_syntaxes.TextSyntaxes(self.host) + self.host.plugins['XEP-0277'] = plugin_xep_0277.XEP_0277(self.host) + self.__initialised = False + self._initialise(C.PROFILE[0]) + + def _initialise(self, profile_key): + profile = profile_key + client = self.host.getClient(profile) + if not self.__initialised: + client.item_access_pubsub = jid.JID(SERVICE) + xep_0060 = self.host.plugins['XEP-0060'] + xep_0060.clients[profile] = helpers_plugins.FakeSatPubSubClient(self.host, xep_0060) + xep_0060.clients[profile].parent = client + self.psclient = xep_0060.clients[profile] + helpers.FakeSAT.getDiscoItems = self.psclient.service_getDiscoItems + self.__initialised = True + return defer.succeed((profile, client)) + + def _addItem(self, profile, item, parent_node=None): + self.host.plugins['XEP-0060'].clients[profile]._addItem(item, parent_node) + + def test_sendGroupBlog(self): + self._initialise(C.PROFILE[0]) + d = self.psclient.items(SERVICE, NODE_ID) + d.addCallback(lambda items: self.assertEqual(len(items), 0)) + d.addCallback(lambda dummy: self.plugin.sendGroupBlog('PUBLIC', [], 'test', {}, C.PROFILE[0])) + d.addCallback(lambda dummy: self.psclient.items(SERVICE, NODE_ID)) + return d.addCallback(lambda items: self.assertEqual(len(items), 1)) + + def test_deleteGroupBlog(self): + pub_data = (SERVICE, NODE_ID, ITEM_ID_1) + self.host.bridge.expectCall('personalEvent', C.JID_STR[0], + "MICROBLOG_DELETE", + {'type': 'main_item', 'id': ITEM_ID_1}, + C.PROFILE[0]) + + d = self.psclient.publish(SERVICE, NODE_ID, [ITEM_1]) + d.addCallback(lambda dummy: self.plugin.deleteGroupBlog(pub_data, COMMENTS_NODE_URL_1, profile_key=C.PROFILE[0])) + return d.addCallback(self.assertEqual, None) + + def test_updateGroupBlog(self): + pub_data = (SERVICE, NODE_ID, ITEM_ID_1) + new_text = u"silfu23RFWUP)IWNOEIOEFÖ" + + self._initialise(C.PROFILE[0]) + d = self.psclient.publish(SERVICE, NODE_ID, [ITEM_1]) + d.addCallback(lambda dummy: self.plugin.updateGroupBlog(pub_data, COMMENTS_NODE_URL_1, new_text, {}, profile_key=C.PROFILE[0])) + d.addCallback(lambda dummy: self.psclient.items(SERVICE, NODE_ID)) + return d.addCallback(lambda items: self.assertEqual(''.join(items[0].entry.title.children), new_text)) + + def test_sendGroupBlogComment(self): + self._initialise(C.PROFILE[0]) + d = self.psclient.items(SERVICE, NODE_ID) + d.addCallback(lambda items: self.assertEqual(len(items), 0)) + d.addCallback(lambda dummy: self.plugin.sendGroupBlogComment(COMMENTS_NODE_URL_1, 'test', {}, profile_key=C.PROFILE[0])) + d.addCallback(lambda dummy: self.psclient.items(SERVICE, COMMENTS_NODE_ID_1)) + return d.addCallback(lambda items: self.assertEqual(len(items), 1)) + + def test_getGroupBlogs(self): + self._initialise(C.PROFILE[0]) + d = self.psclient.publish(SERVICE, NODE_ID, [ITEM_1]) + d.addCallback(lambda dummy: self.plugin.getGroupBlogs(PUBLISHER, profile_key=C.PROFILE[0])) + result = ([ITEM_DATA_1()], {'count': '1', 'index': '0', 'first': ITEM_ID_1, 'last': ITEM_ID_1}) + return d.addCallback(self.assertEqual, result) + + def test_getGroupBlogsNoCount(self): + self._initialise(C.PROFILE[0]) + d = self.psclient.publish(SERVICE, NODE_ID, [ITEM_1]) + d.addCallback(lambda dummy: self.plugin.getGroupBlogs(PUBLISHER, count_comments=False, profile_key=C.PROFILE[0])) + result = ([ITEM_DATA_1(DO_NOT_COUNT_COMMENTS)], {'count': '1', 'index': '0', 'first': ITEM_ID_1, 'last': ITEM_ID_1}) + return d.addCallback(self.assertEqual, result) + + def test_getGroupBlogsWithIDs(self): + self._initialise(C.PROFILE[0]) + d = self.psclient.publish(SERVICE, NODE_ID, [ITEM_1]) + d.addCallback(lambda dummy: self.plugin.getGroupBlogs(PUBLISHER, [ITEM_ID_1], profile_key=C.PROFILE[0])) + result = ([ITEM_DATA_1()], {'count': '1', 'index': '0', 'first': ITEM_ID_1, 'last': ITEM_ID_1}) + return d.addCallback(self.assertEqual, result) + + def test_getGroupBlogsWithRSM(self): + self._initialise(C.PROFILE[0]) + d = self.psclient.publish(SERVICE, NODE_ID, [ITEM_1]) + d.addCallback(lambda dummy: self.plugin.getGroupBlogs(PUBLISHER, rsm={'max': 1}, profile_key=C.PROFILE[0])) + result = ([ITEM_DATA_1()], {'count': '1', 'index': '0', 'first': ITEM_ID_1, 'last': ITEM_ID_1}) + return d.addCallback(self.assertEqual, result) + + def test_getGroupBlogsWithComments(self): + self._initialise(C.PROFILE[0]) + d = self.psclient.publish(SERVICE, NODE_ID, [ITEM_1]) + d.addCallback(lambda dummy: self.psclient.publish(SERVICE, COMMENTS_NODE_ID_1, [COMMENT_1])) + d.addCallback(lambda dummy: self.plugin.getGroupBlogsWithComments(PUBLISHER, [], profile_key=C.PROFILE[0])) + result = ([(ITEM_DATA_1(1), ([COMMENT_DATA_1], + {'count': '1', 'index': '0', 'first': COMMENT_ID_1, 'last': COMMENT_ID_1}))], + {'count': '1', 'index': '0', 'first': ITEM_ID_1, 'last': ITEM_ID_1}) + return d.addCallback(self.assertEqual, result) + + def test_getGroupBlogsWithComments2(self): + self._initialise(C.PROFILE[0]) + d = self.psclient.publish(SERVICE, NODE_ID, [ITEM_1]) + d.addCallback(lambda dummy: self.psclient.publish(SERVICE, COMMENTS_NODE_ID_1, [COMMENT_1, COMMENT_2])) + d.addCallback(lambda dummy: self.plugin.getGroupBlogsWithComments(PUBLISHER, [], profile_key=C.PROFILE[0])) + result = ([(ITEM_DATA_1(2), ([COMMENT_DATA_1, COMMENT_DATA_2], + {'count': '2', 'index': '0', 'first': COMMENT_ID_1, 'last': COMMENT_ID_2}))], + {'count': '1', 'index': '0', 'first': ITEM_ID_1, 'last': ITEM_ID_1}) + + return d.addCallback(self.assertEqual, result) + + def test_getGroupBlogsAtom(self): + self._initialise(C.PROFILE[0]) + d = self.psclient.publish(SERVICE, NODE_ID, [ITEM_1]) + d.addCallback(lambda dummy: self.plugin.getGroupBlogsAtom(PUBLISHER, {'max': 1}, profile_key=C.PROFILE[0])) + + def cb(atom): + self.assertIsInstance(atom, unicode) + self.assertTrue(atom.startswith('')) + + return d.addCallback(cb) + + def test_getMassiveGroupBlogs(self): + self._initialise(C.PROFILE[0]) + d = self.psclient.publish(SERVICE, NODE_ID, [ITEM_1]) + d.addCallback(lambda dummy: self.plugin.getMassiveGroupBlogs('JID', [jid.JID(PUBLISHER)], {'max': 1}, profile_key=C.PROFILE[0])) + result = {PUBLISHER: ([ITEM_DATA_1()], {'count': '1', 'index': '0', 'first': ITEM_ID_1, 'last': ITEM_ID_1})} + + def clean(res): + del self.host.plugins['XEP-0060'].node_cache[C.PROFILE[0] + '@found@' + SERVICE] + return res + + d.addCallback(clean) + d.addCallback(self.assertEqual, result) + + def test_getMassiveGroupBlogsWithComments(self): + self._initialise(C.PROFILE[0]) + d = self.psclient.publish(SERVICE, NODE_ID, [ITEM_1]) + d.addCallback(lambda dummy: self.psclient.publish(SERVICE, COMMENTS_NODE_ID_1, [COMMENT_1, COMMENT_2])) + d.addCallback(lambda dummy: self.plugin.getMassiveGroupBlogs('JID', [jid.JID(PUBLISHER)], {'max': 1}, profile_key=C.PROFILE[0])) + result = {PUBLISHER: ([ITEM_DATA_1(2)], {'count': '1', 'index': '0', 'first': ITEM_ID_1, 'last': ITEM_ID_1})} + + def clean(res): + del self.host.plugins['XEP-0060'].node_cache[C.PROFILE[0] + '@found@' + SERVICE] + return res + + d.addCallback(clean) + d.addCallback(self.assertEqual, result) + + def test_getGroupBlogComments(self): + self._initialise(C.PROFILE[0]) + d = self.psclient.publish(SERVICE, NODE_ID, [ITEM_1]) + d.addCallback(lambda dummy: self.psclient.publish(SERVICE, COMMENTS_NODE_ID_1, [COMMENT_1])) + d.addCallback(lambda dummy: self.plugin.getGroupBlogComments(SERVICE, COMMENTS_NODE_ID_1, {'max': 1}, profile_key=C.PROFILE[0])) + result = ([COMMENT_DATA_1], {'count': '1', 'index': '0', 'first': COMMENT_ID_1, 'last': COMMENT_ID_1}) + return d.addCallback(self.assertEqual, result) + + def test_subscribeGroupBlog(self): + self._initialise(C.PROFILE[0]) + d = self.plugin.subscribeGroupBlog(PUBLISHER, profile_key=C.PROFILE[0]) + return d.addCallback(self.assertEqual, None) + + def test_massiveSubscribeGroupBlogs(self): + self._initialise(C.PROFILE[0]) + d = self.plugin.massiveSubscribeGroupBlogs('JID', [jid.JID(PUBLISHER)], profile_key=C.PROFILE[0]) + + def clean(res): + del self.host.plugins['XEP-0060'].node_cache[C.PROFILE[0] + '@found@' + SERVICE] + del self.host.plugins['XEP-0060'].node_cache[C.PROFILE[0] + '@subscriptions@' + SERVICE] + return res + + d.addCallback(clean) + return d.addCallback(self.assertEqual, None) + + def test_deleteAllGroupBlogs(self): + """Delete our main node and associated comments node""" + self._initialise(C.PROFILE[0]) + self.host.profiles[C.PROFILE[0]].roster.addItem(jid.JID(OTHER_PUBLISHER)) + d = self.psclient.publish(SERVICE, NODE_ID, [ITEM_1]) + d.addCallback(lambda dummy: self.psclient.publish(SERVICE, COMMENTS_NODE_ID_1, [COMMENT_1, COMMENT_2])) + d.addCallback(lambda dummy: self.psclient.items(SERVICE, NODE_ID)) + d.addCallback(lambda items: self.assertEqual(len(items), 1)) + d.addCallback(lambda dummy: self.psclient.items(SERVICE, COMMENTS_NODE_ID_1)) + d.addCallback(lambda items: self.assertEqual(len(items), 2)) + + d.addCallback(lambda dummy: self.psclient.publish(SERVICE, OTHER_NODE_ID, [ITEM_2])) + d.addCallback(lambda dummy: self.psclient.publish(SERVICE, COMMENTS_NODE_ID_2, [COMMENT_1, COMMENT_2])) + d.addCallback(lambda dummy: self.psclient.items(SERVICE, OTHER_NODE_ID)) + d.addCallback(lambda items: self.assertEqual(len(items), 1)) + d.addCallback(lambda dummy: self.psclient.items(SERVICE, COMMENTS_NODE_ID_2)) + d.addCallback(lambda items: self.assertEqual(len(items), 2)) + + def clean(res): + del self.host.plugins['XEP-0060'].node_cache[C.PROFILE[0] + '@found@' + SERVICE] + return res + + d.addCallback(lambda dummy: self.plugin.deleteAllGroupBlogs(C.PROFILE[0])) + d.addCallback(clean) + + d.addCallback(lambda dummy: self.psclient.items(SERVICE, NODE_ID)) + d.addCallback(lambda items: self.assertEqual(len(items), 0)) + d.addCallback(lambda dummy: self.psclient.items(SERVICE, COMMENTS_NODE_ID_1)) + d.addCallback(lambda items: self.assertEqual(len(items), 0)) + + d.addCallback(lambda dummy: self.psclient.items(SERVICE, OTHER_NODE_ID)) + d.addCallback(lambda items: self.assertEqual(len(items), 1)) + d.addCallback(lambda dummy: self.psclient.items(SERVICE, COMMENTS_NODE_ID_2)) + d.addCallback(lambda items: self.assertEqual(len(items), 2)) + return d + + def test_deleteAllGroupBlogsComments(self): + """Delete the comments we posted on other node's""" + self._initialise(C.PROFILE[0]) + self.host.profiles[C.PROFILE[0]].roster.addItem(jid.JID(OTHER_PUBLISHER)) + d = self.psclient.publish(SERVICE, NODE_ID, [ITEM_1]) + d.addCallback(lambda dummy: self.psclient.publish(SERVICE, COMMENTS_NODE_ID_1, [COMMENT_1, COMMENT_2])) + d.addCallback(lambda dummy: self.psclient.items(SERVICE, NODE_ID)) + d.addCallback(lambda items: self.assertEqual(len(items), 1)) + d.addCallback(lambda dummy: self.psclient.items(SERVICE, COMMENTS_NODE_ID_1)) + d.addCallback(lambda items: self.assertEqual(len(items), 2)) + + d.addCallback(lambda dummy: self.psclient.publish(SERVICE, OTHER_NODE_ID, [ITEM_2])) + d.addCallback(lambda dummy: self.psclient.publish(SERVICE, COMMENTS_NODE_ID_2, [COMMENT_1, COMMENT_2])) + d.addCallback(lambda dummy: self.psclient.items(SERVICE, OTHER_NODE_ID)) + d.addCallback(lambda items: self.assertEqual(len(items), 1)) + d.addCallback(lambda dummy: self.psclient.items(SERVICE, COMMENTS_NODE_ID_2)) + d.addCallback(lambda items: self.assertEqual(len(items), 2)) + + def clean(res): + del self.host.plugins['XEP-0060'].node_cache[C.PROFILE[0] + '@found@' + SERVICE] + return res + + d.addCallback(lambda dummy: self.plugin.deleteAllGroupBlogsComments(C.PROFILE[0])) + d.addCallback(clean) + + d.addCallback(lambda dummy: self.psclient.items(SERVICE, NODE_ID)) + d.addCallback(lambda items: self.assertEqual(len(items), 1)) + d.addCallback(lambda dummy: self.psclient.items(SERVICE, COMMENTS_NODE_ID_1)) + d.addCallback(lambda items: self.assertEqual(len(items), 2)) + + d.addCallback(lambda dummy: self.psclient.items(SERVICE, OTHER_NODE_ID)) + d.addCallback(lambda items: self.assertEqual(len(items), 1)) + d.addCallback(lambda dummy: self.psclient.items(SERVICE, COMMENTS_NODE_ID_2)) + d.addCallback(lambda items: self.assertEqual(len(items), 0)) + return d + + def test_deleteAllGroupBlogsAndComments(self): + self._initialise(C.PROFILE[0]) + self.host.profiles[C.PROFILE[0]].roster.addItem(jid.JID(OTHER_PUBLISHER)) + d = self.psclient.publish(SERVICE, NODE_ID, [ITEM_1]) + d.addCallback(lambda dummy: self.psclient.publish(SERVICE, COMMENTS_NODE_ID_1, [COMMENT_1, COMMENT_2])) + d.addCallback(lambda dummy: self.psclient.items(SERVICE, NODE_ID)) + d.addCallback(lambda items: self.assertEqual(len(items), 1)) + d.addCallback(lambda dummy: self.psclient.items(SERVICE, COMMENTS_NODE_ID_1)) + d.addCallback(lambda items: self.assertEqual(len(items), 2)) + + d.addCallback(lambda dummy: self.psclient.publish(SERVICE, OTHER_NODE_ID, [ITEM_2])) + d.addCallback(lambda dummy: self.psclient.publish(SERVICE, COMMENTS_NODE_ID_2, [COMMENT_1, COMMENT_2])) + d.addCallback(lambda dummy: self.psclient.items(SERVICE, OTHER_NODE_ID)) + d.addCallback(lambda items: self.assertEqual(len(items), 1)) + d.addCallback(lambda dummy: self.psclient.items(SERVICE, COMMENTS_NODE_ID_2)) + d.addCallback(lambda items: self.assertEqual(len(items), 2)) + + def clean(res): + del self.host.plugins['XEP-0060'].node_cache[C.PROFILE[0] + '@found@' + SERVICE] + return res + + d.addCallback(lambda dummy: self.plugin.deleteAllGroupBlogsAndComments(C.PROFILE[0])) + d.addCallback(clean) + + d.addCallback(lambda dummy: self.psclient.items(SERVICE, NODE_ID)) + d.addCallback(lambda items: self.assertEqual(len(items), 0)) + d.addCallback(lambda dummy: self.psclient.items(SERVICE, COMMENTS_NODE_ID_1)) + d.addCallback(lambda items: self.assertEqual(len(items), 0)) + + d.addCallback(lambda dummy: self.psclient.items(SERVICE, OTHER_NODE_ID)) + d.addCallback(lambda items: self.assertEqual(len(items), 1)) + d.addCallback(lambda dummy: self.psclient.items(SERVICE, COMMENTS_NODE_ID_2)) + d.addCallback(lambda items: self.assertEqual(len(items), 0)) + return d diff -r 584d45bb36d9 -r f71a0fc26886 src/test/test_plugin_misc_radiocol.py --- a/src/test/test_plugin_misc_radiocol.py Wed Mar 18 10:39:22 2015 +0100 +++ b/src/test/test_plugin_misc_radiocol.py Wed Mar 18 10:52:28 2015 +0100 @@ -55,8 +55,8 @@ def setUp(self): self.host = helpers.FakeSAT() - def init(self): - self.host.init() + def reinit(self): + self.host.reinit() self.host.plugins['ROOM-GAME'] = plugin_room_game.RoomGame(self.host) self.plugin = plugin.Radiocol(self.host) # must be init after ROOM-GAME self.plugin.testing = True @@ -109,7 +109,7 @@ """Check if the message "song_rejected" has been sent by the referee and process the command with the profile of the uploader @param profile_index: uploader's profile""" - sent = self.host.getSentMessageRaw(0) + sent = self.host.getSentMessage(0) content = "" self.assertEqualXML(sent.toXml(), self._expectedMessage(ROOM_JID_S + '/' + self.plugin_0045.getNick(0, profile_index), 'normal', content)) self._roomGameCmd(sent, ['radiocolSongRejected', ROOM_JID_S, 'Too many songs in queue']) @@ -117,7 +117,7 @@ def _noUploadCb(self): """Check if the message "no_upload" has been sent by the referee and process the command with the profiles of each room users""" - sent = self.host.getSentMessageRaw(0) + sent = self.host.getSentMessage(0) content = "" self.assertEqualXML(sent.toXml(), self._expectedMessage(ROOM_JID_S, 'groupchat', content)) self._roomGameCmd(sent, ['radiocolNoUpload', ROOM_JID_S]) @@ -125,7 +125,7 @@ def _uploadOkCb(self): """Check if the message "upload_ok" has been sent by the referee and process the command with the profiles of each room users""" - sent = self.host.getSentMessageRaw(0) + sent = self.host.getSentMessage(0) content = "" self.assertEqualXML(sent.toXml(), self._expectedMessage(ROOM_JID_S, 'groupchat', content)) self._roomGameCmd(sent, ['radiocolUploadOk', ROOM_JID_S]) @@ -136,7 +136,7 @@ @param attrs: information dict about the song @param profile_index: profile index of the uploader """ - sent = self.host.getSentMessageRaw(0) + sent = self.host.getSentMessage(0) attrs['sender'] = self.plugin_0045.getNick(0, profile_index) radiocol_elt = domish.generateElementsNamed(sent.elements(), 'radiocol').next() preload_elt = domish.generateElementsNamed(radiocol_elt.elements(), 'preload').next() @@ -150,7 +150,7 @@ def _playNextSongCb(self): """Check if the message "play" has been sent by the referee and process the command with the profiles of each room users""" - sent = self.host.getSentMessageRaw(0) + sent = self.host.getSentMessage(0) filename = self.playlist.pop(0) content = "" % filename self.assertEqualXML(sent.toXml(), self._expectedMessage(ROOM_JID_S, 'groupchat', content)) @@ -181,7 +181,7 @@ self.assertEqual(game_data['to_delete'][attrs['filename']], filepath) content = "" % " ".join(["%s='%s'" % (attr, attrs[attr]) for attr in attrs]) - sent = self.host.getSentMessageRaw(profile_index) + sent = self.host.getSentMessage(profile_index) self.assertEqualXML(sent.toXml(), self._expectedMessage(REFEREE_FULL, 'normal', content)) reject_song = len(game_data['queue']) >= plugin.QUEUE_LIMIT @@ -240,7 +240,7 @@ """ for nick in sync_data: expected = self._expectedMessage(ROOM_JID_S + '/' + nick, 'normal', sync_data[nick]) - sent = self.host.getSentMessageRaw(0) + sent = self.host.getSentMessage(0) self.assertEqualXML(sent.toXml(), expected) for elt in sync_data[nick]: if elt.name == 'preload': @@ -273,7 +273,7 @@ # Check that the message "players" has been sent by the referee expected = self._expectedMessage(to_jid, type_, self._buildPlayers(nicks)) - sent = self.host.getSentMessageRaw(0) + sent = self.host.getSentMessage(0) self.assertEqualXML(sent.toXml(), expected) # Process the command with the profiles of each room users @@ -332,14 +332,14 @@ d.addCallbacks(eb, eb) def test_init(self): - self.init() + self.reinit() self.assertEqual(self.plugin.invite_mode, self.plugin.FROM_PLAYERS) self.assertEqual(self.plugin.wait_mode, self.plugin.FOR_NONE) self.assertEqual(self.plugin.join_mode, self.plugin.INVITED) self.assertEqual(self.plugin.ready_mode, self.plugin.FORCE) def test_game(self): - self.init() + self.reinit() # create game self.plugin.prepareRoom(OTHER_PLAYERS, ROOM_JID_S, PROFILE) @@ -347,7 +347,7 @@ room = self.plugin_0045.getRoom(0, 0) nicks = [self.plugin_0045.getNick(0, 0)] - sent = self.host.getSentMessageRaw(0) + sent = self.host.getSentMessage(0) self.assertEqualXML(sent.toXml(), self._expectedMessage(ROOM_JID_S, 'groupchat', self._buildPlayers(nicks))) self._roomGameCmd(sent, ['radiocolStarted', ROOM_JID_S, REFEREE_FULL, nicks, [plugin.QUEUE_TO_START, plugin.QUEUE_LIMIT]]) diff -r 584d45bb36d9 -r f71a0fc26886 src/test/test_plugin_misc_room_game.py --- a/src/test/test_plugin_misc_room_game.py Wed Mar 18 10:39:22 2015 +0100 +++ b/src/test/test_plugin_misc_room_game.py Wed Mar 18 10:52:28 2015 +0100 @@ -54,8 +54,8 @@ def setUp(self): self.host = helpers.FakeSAT() - def init(self, game_init={}, player_init={}): - self.host.init() + def reinit(self, game_init={}, player_init={}): + self.host.reinit() self.plugin = plugin.RoomGame(self.host) self.plugin._init_(self.host, PLUGIN_INFO, (NAMESERVICE, TAG), game_init, player_init) self.plugin_0045 = self.host.plugins['XEP-0045'] = helpers_plugins.FakeXEP_0045(self.host) @@ -83,20 +83,20 @@ return "<%s xmlns='%s'>%s" % (to, type_, TAG, NAMESERVICE, content) def test_createOrInvite_solo(self): - self.init() + self.reinit() self.plugin_0045.joinRoom(0, 0) self.plugin._createOrInvite(self.plugin_0045.getRoom(0, 0), [], Const.PROFILE[0]) self.assertTrue(self.plugin._gameExists(ROOM_JID_S, True)) def test_createOrInvite_multi_not_waiting(self): - self.init() + self.reinit() self.plugin_0045.joinRoom(0, 0) other_players = [Const.JID_STR[1], Const.JID_STR[2]] self.plugin._createOrInvite(self.plugin_0045.getRoom(0, 0), other_players, Const.PROFILE[0]) self.assertTrue(self.plugin._gameExists(ROOM_JID_S, True)) def test_createOrInvite_multi_waiting(self): - self.init(player_init={'score': 0}) + self.reinit(player_init={'score': 0}) self.plugin_0045.joinRoom(0, 0) other_players = [Const.JID_STR[1], Const.JID_STR[2]] self.plugin._createOrInvite(self.plugin_0045.getRoom(0, 0), other_players, Const.PROFILE[0]) @@ -104,13 +104,13 @@ self.assertFalse(self.plugin._gameExists(ROOM_JID_S, True)) def test_initGame(self): - self.init() + self.reinit() self.initGame(0, 0) self.assertTrue(self.plugin.isReferee(ROOM_JID_S, Const.JID[0].user)) self.assertEqual([], self.plugin.games[ROOM_JID_S]['players']) def test_checkJoinAuth(self): - self.init() + self.reinit() check = lambda value: getattr(self, "assert%s" % value)(self.plugin._checkJoinAuth(ROOM_JID_S, Const.JID_STR[0], Const.JID[0].user)) check(False) # to test the "invited" mode, the referee must be different than the user to test @@ -127,7 +127,7 @@ check(True) def test_updatePlayers(self): - self.init() + self.reinit() self.initGame(0, 0) self.assertEqual(self.plugin.games[ROOM_JID_S]['players'], []) self.plugin._updatePlayers(ROOM_JID_S, [], True, Const.PROFILE[0]) @@ -140,27 +140,27 @@ self.assertEqual(self.plugin.games[ROOM_JID_S]['players'], ["user1", "user2", "user3"]) def test_synchronizeRoom(self): - self.init() + self.reinit() self.initGame(0, 0) self.plugin._synchronizeRoom(ROOM_JID_S, [Const.MUC[0]], Const.PROFILE[0]) - self.assertEqual(self.host.getSentMessage(0), self._expectedMessage(ROOM_JID_S, "groupchat", "players", [])) + self.assertEqual(self.host.getSentMessageXml(0), self._expectedMessage(ROOM_JID_S, "groupchat", "players", [])) self.plugin.games[ROOM_JID_S]['players'].append("test1") self.plugin._synchronizeRoom(ROOM_JID_S, [Const.MUC[0]], Const.PROFILE[0]) - self.assertEqual(self.host.getSentMessage(0), self._expectedMessage(ROOM_JID_S, "groupchat", "players", ["test1"])) + self.assertEqual(self.host.getSentMessageXml(0), self._expectedMessage(ROOM_JID_S, "groupchat", "players", ["test1"])) self.plugin.games[ROOM_JID_S]['started'] = True self.plugin.games[ROOM_JID_S]['players'].append("test2") self.plugin._synchronizeRoom(ROOM_JID_S, [Const.MUC[0]], Const.PROFILE[0]) - self.assertEqual(self.host.getSentMessage(0), self._expectedMessage(ROOM_JID_S, "groupchat", "started", ["test1", "test2"])) + self.assertEqual(self.host.getSentMessageXml(0), self._expectedMessage(ROOM_JID_S, "groupchat", "started", ["test1", "test2"])) self.plugin.games[ROOM_JID_S]['players'].append("test3") self.plugin.games[ROOM_JID_S]['players'].append("test4") user1 = JID(ROOM_JID_S + "/" + Const.JID[0].user) user2 = JID(ROOM_JID_S + "/" + Const.JID[1].user) self.plugin._synchronizeRoom(ROOM_JID_S, [user1, user2], Const.PROFILE[0]) - self.assertEqualXML(self.host.getSentMessage(0), self._expectedMessage(user1.full(), "normal", "started", ["test1", "test2", "test3", "test4"])) - self.assertEqualXML(self.host.getSentMessage(0), self._expectedMessage(user2.full(), "normal", "started", ["test1", "test2", "test3", "test4"])) + self.assertEqualXML(self.host.getSentMessageXml(0), self._expectedMessage(user1.full(), "normal", "started", ["test1", "test2", "test3", "test4"])) + self.assertEqualXML(self.host.getSentMessageXml(0), self._expectedMessage(user2.full(), "normal", "started", ["test1", "test2", "test3", "test4"])) def test_invitePlayers(self): - self.init() + self.reinit() self.initGame(0, 0) self.plugin_0045.joinRoom(0, 1) self.assertEqual(self.plugin.invitations[ROOM_JID_S], []) @@ -181,7 +181,7 @@ nick = self.plugin_0045.getNick(0, index) getattr(self, "assert%s" % value)(self.plugin._checkInviteAuth(ROOM_JID_S, nick)) - self.init() + self.reinit() for mode in [self.plugin.FROM_ALL, self.plugin.FROM_NONE, self.plugin.FROM_REFEREE, self.plugin.FROM_PLAYERS]: self.plugin.invite_mode = mode @@ -205,13 +205,13 @@ check(False, 2) def test_isReferee(self): - self.init() + self.reinit() self.initGame(0, 0) self.assertTrue(self.plugin.isReferee(ROOM_JID_S, self.plugin_0045.getNick(0, 0))) self.assertFalse(self.plugin.isReferee(ROOM_JID_S, self.plugin_0045.getNick(0, 1))) def test_isPlayer(self): - self.init() + self.reinit() self.initGame(0, 0) self.assertTrue(self.plugin.isPlayer(ROOM_JID_S, self.plugin_0045.getNick(0, 0))) user_nick = self.plugin_0045.joinRoom(0, 1) @@ -225,7 +225,7 @@ room = self.plugin_0045.getRoom(0, 0) self.assertEqual((value, confirmed, rest), self.plugin._checkWaitAuth(room, other_players)) - self.init() + self.reinit() self.initGame(0, 0) other_players = [Const.JID[1], Const.JID[3]] self.plugin.wait_mode = self.plugin.FOR_NONE @@ -251,7 +251,7 @@ self.plugin_0045.getNickOfUser(0, 2, 0)], []) def test_prepareRoom_trivial(self): - self.init() + self.reinit() other_players = [] self.plugin.prepareRoom(other_players, ROOM_JID_S, PROFILE) self.assertTrue(self.plugin._gameExists(ROOM_JID_S, True)) @@ -263,7 +263,7 @@ self.assertEqual((False, True), self.plugin._checkCreateGameAndInit(ROOM_JID_S, PROFILE)) def test_prepareRoom_invite(self): - self.init() + self.reinit() other_players = [Const.JID_STR[1], Const.JID_STR[2]] self.plugin.prepareRoom(other_players, ROOM_JID_S, PROFILE) room = self.plugin_0045.getRoom(0, 0) @@ -285,7 +285,7 @@ self.assertEqual((False, False), self.plugin._checkCreateGameAndInit(ROOM_JID_S, Const.PROFILE[1])) def test_prepareRoom_score1(self): - self.init(player_init={'score': 0}) + self.reinit(player_init={'score': 0}) other_players = [Const.JID_STR[1], Const.JID_STR[2]] self.plugin.prepareRoom(other_players, ROOM_JID_S, PROFILE) room = self.plugin_0045.getRoom(0, 0) @@ -311,7 +311,7 @@ self.assertEqual((True, False), self.plugin._checkCreateGameAndInit(ROOM_JID_S, Const.PROFILE[0])) def test_prepareRoom_score2(self): - self.init(player_init={'score': 0}) + self.reinit(player_init={'score': 0}) other_players = [Const.JID_STR[1], Const.JID_STR[4]] self.plugin.prepareRoom(other_players, ROOM_JID_S, PROFILE) room = self.plugin_0045.getRoom(0, 0) @@ -324,56 +324,56 @@ self.assertEqual((False, True), self.plugin._checkCreateGameAndInit(ROOM_JID_S, PROFILE)) def test_userJoinedTrigger(self): - self.init(player_init={"xxx": "xyz"}) + self.reinit(player_init={"xxx": "xyz"}) other_players = [Const.JID_STR[1], Const.JID_STR[3]] self.plugin.prepareRoom(other_players, ROOM_JID_S, PROFILE) nicks = [self.plugin_0045.getNick(0, 0)] - self.assertEqual(self.host.getSentMessage(0), self._expectedMessage(ROOM_JID_S, "groupchat", "players", nicks)) + self.assertEqual(self.host.getSentMessageXml(0), self._expectedMessage(ROOM_JID_S, "groupchat", "players", nicks)) self.assertTrue(len(self.plugin.invitations[ROOM_JID_S]) == 1) # wrong profile user_nick = self.plugin_0045.joinRoom(0, 1) room = self.plugin_0045.getRoom(0, 1) self.plugin.userJoinedTrigger(room, User(user_nick, Const.JID[1]), OTHER_PROFILE) - self.assertEqual(self.host.getSentMessageRaw(0), None) # no new message has been sent + self.assertEqual(self.host.getSentMessage(0), None) # no new message has been sent self.assertFalse(self.plugin._gameExists(ROOM_JID_S, True)) # game not started # referee profile, user is allowed, wait for one more room = self.plugin_0045.getRoom(0, 0) self.plugin.userJoinedTrigger(room, User(user_nick, Const.JID[1]), PROFILE) nicks.append(user_nick) - self.assertEqual(self.host.getSentMessage(0), self._expectedMessage(ROOM_JID_S, "groupchat", "players", nicks)) + self.assertEqual(self.host.getSentMessageXml(0), self._expectedMessage(ROOM_JID_S, "groupchat", "players", nicks)) self.assertFalse(self.plugin._gameExists(ROOM_JID_S, True)) # game not started # referee profile, user is not allowed user_nick = self.plugin_0045.joinRoom(0, 4) self.plugin.userJoinedTrigger(room, User(user_nick, Const.JID[4]), PROFILE) - self.assertEqual(self.host.getSentMessage(0), self._expectedMessage(ROOM_JID_S + '/' + user_nick, "normal", "players", nicks)) + self.assertEqual(self.host.getSentMessageXml(0), self._expectedMessage(ROOM_JID_S + '/' + user_nick, "normal", "players", nicks)) self.assertFalse(self.plugin._gameExists(ROOM_JID_S, True)) # game not started # referee profile, user is allowed, everybody here user_nick = self.plugin_0045.joinRoom(0, 3) self.plugin.userJoinedTrigger(room, User(user_nick, Const.JID[3]), PROFILE) nicks.append(user_nick) - self.assertEqual(self.host.getSentMessage(0), self._expectedMessage(ROOM_JID_S, "groupchat", "started", nicks)) + self.assertEqual(self.host.getSentMessageXml(0), self._expectedMessage(ROOM_JID_S, "groupchat", "started", nicks)) self.assertTrue(self.plugin._gameExists(ROOM_JID_S, True)) # game started self.assertTrue(len(self.plugin.invitations[ROOM_JID_S]) == 0) # wait for none - self.init() + self.reinit() self.plugin.prepareRoom(other_players, ROOM_JID_S, PROFILE) - self.assertNotEqual(self.host.getSentMessageRaw(0), None) # init messages + self.assertNotEqual(self.host.getSentMessage(0), None) # init messages room = self.plugin_0045.getRoom(0, 0) nicks = [self.plugin_0045.getNick(0, 0)] user_nick = self.plugin_0045.joinRoom(0, 3) self.plugin.userJoinedTrigger(room, User(user_nick, Const.JID[3]), PROFILE) nicks.append(user_nick) - self.assertEqual(self.host.getSentMessage(0), self._expectedMessage(ROOM_JID_S, "groupchat", "started", nicks)) + self.assertEqual(self.host.getSentMessageXml(0), self._expectedMessage(ROOM_JID_S, "groupchat", "started", nicks)) self.assertTrue(self.plugin._gameExists(ROOM_JID_S, True)) def test_userLeftTrigger(self): - self.init(player_init={"xxx": "xyz"}) + self.reinit(player_init={"xxx": "xyz"}) other_players = [Const.JID_STR[1], Const.JID_STR[3], Const.JID_STR[4]] self.plugin.prepareRoom(other_players, ROOM_JID_S, PROFILE) room = self.plugin_0045.getRoom(0, 0) @@ -433,7 +433,7 @@ self.assertTrue(len(self.plugin.invitations[ROOM_JID_S]) == 0) def test__checkCreateGameAndInit(self): - self.init() + self.reinit() self.assertEqual((False, False), self.plugin._checkCreateGameAndInit(ROOM_JID_S, PROFILE)) # print internal error nick = self.plugin_0045.joinRoom(0, 0) @@ -453,7 +453,7 @@ def test_createGame(self): - self.init(player_init={"xxx": "xyz"}) + self.reinit(player_init={"xxx": "xyz"}) nicks = [] for i in [0, 1, 3, 4]: nicks.append(self.plugin_0045.joinRoom(0, i)) @@ -462,7 +462,7 @@ self.plugin.createGame(ROOM_JID_S, nicks, PROFILE) self.assertTrue(self.plugin._gameExists(ROOM_JID_S, True)) self.assertEqual(self.plugin.games[ROOM_JID_S]['players'], nicks) - self.assertEqual(self.host.getSentMessage(0), self._expectedMessage(ROOM_JID_S, "groupchat", "started", nicks)) + self.assertEqual(self.host.getSentMessageXml(0), self._expectedMessage(ROOM_JID_S, "groupchat", "started", nicks)) for nick in nicks: self.assertEqual('init', self.plugin.games[ROOM_JID_S]['status'][nick]) self.assertEqual(self.plugin.player_init, self.plugin.games[ROOM_JID_S]['players_data'][nick]) @@ -472,16 +472,16 @@ self.assertEqual(nick, self.plugin.games[ROOM_JID_S]['players_data'][nick]['xxx']) # game exists, current profile is referee - self.init(player_init={"xxx": "xyz"}) + self.reinit(player_init={"xxx": "xyz"}) self.initGame(0, 0) self.plugin.games[ROOM_JID_S]['started'] = True self.plugin.createGame(ROOM_JID_S, nicks, PROFILE) - self.assertEqual(self.host.getSentMessage(0), self._expectedMessage(ROOM_JID_S, "groupchat", "started", nicks)) + self.assertEqual(self.host.getSentMessageXml(0), self._expectedMessage(ROOM_JID_S, "groupchat", "started", nicks)) # game exists, current profile is not referee - self.init(player_init={"xxx": "xyz"}) + self.reinit(player_init={"xxx": "xyz"}) self.initGame(0, 0) self.plugin.games[ROOM_JID_S]['started'] = True self.plugin_0045.joinRoom(0, 1) self.plugin.createGame(ROOM_JID_S, nicks, OTHER_PROFILE) - self.assertEqual(self.host.getSentMessageRaw(0), None) # no sync message has been sent by other_profile + self.assertEqual(self.host.getSentMessage(0), None) # no sync message has been sent by other_profile diff -r 584d45bb36d9 -r f71a0fc26886 src/test/test_plugin_xep_0033.py --- a/src/test/test_plugin_xep_0033.py Wed Mar 18 10:39:22 2015 +0100 +++ b/src/test/test_plugin_xep_0033.py Wed Mar 18 10:52:28 2015 +0100 @@ -25,19 +25,21 @@ from sat.plugins import plugin_xep_0033 as plugin from sat.core.exceptions import CancelError from sat.core.log import getLogger -from copy import deepcopy from twisted.internet import defer from wokkel.generic import parseXml from twisted.words.protocols.jabber.jid import JID from logging import ERROR -PROFILE = Const.PROFILE[0] +PROFILE_INDEX = 0 +PROFILE = Const.PROFILE[PROFILE_INDEX] JID_STR_FROM = Const.JID_STR[1] JID_STR_TO = Const.PROFILE_DICT[PROFILE].host JID_STR_X_TO = Const.JID_STR[0] JID_STR_X_CC = Const.JID_STR[1] JID_STR_X_BCC = Const.JID_STR[2] +ADDRS = ('to', JID_STR_X_TO, 'cc', JID_STR_X_CC, 'bcc', JID_STR_X_BCC) + class XEP_0033Test(helpers.SatTestCase): @@ -46,7 +48,7 @@ self.plugin = plugin.XEP_0033(self.host) def test_messageReceived(self): - self.host.memory.init() + self.host.memory.reinit() xml = u""" test @@ -68,115 +70,122 @@ self.assertEqual(data['extra']['addresses'], '%s:%s\n%s:%s\n%s:%s\n' % expected, msg) treatments.addCallback(cb) - treatments.callback(data) + return treatments.callback(data) - def test_sendMessageTrigger(self): + def _get_mess_data(self): mess_data = {"to": JID(JID_STR_TO), "type": "chat", "message": "content", "extra": {} } - addresses = ('to', JID_STR_X_TO, 'cc', JID_STR_X_CC, 'bcc', JID_STR_X_BCC) - mess_data["extra"]["address"] = '%s:%s\n%s:%s\n%s:%s\n' % addresses + mess_data["extra"]["address"] = '%s:%s\n%s:%s\n%s:%s\n' % ADDRS original_stanza = u""" content """ % (JID_STR_FROM, JID_STR_TO) mess_data['xml'] = parseXml(original_stanza.encode("utf-8")) - expected = deepcopy(mess_data['xml']) + return mess_data + + def _assertAddresses(self, mess_data): + """The mess_data that we got here has been modified by self.plugin.sendMessageTrigger, + check that the addresses element has been added to the stanza.""" + expected = self._get_mess_data()['xml'] addresses_extra = """
- """ % addresses + """ % ADDRS addresses_element = parseXml(addresses_extra.encode('utf-8')) expected.addChild(addresses_element) + self.assertEqualXML(mess_data['xml'].toXml().encode("utf-8"), expected.toXml().encode("utf-8")) - def assertAddresses(mess_data): - """The mess_data that we got here has been modified by self.plugin.sendMessageTrigger, - check that the addresses element has been added to the stanza.""" - self.assertEqualXML(mess_data['xml'].toXml().encode("utf-8"), expected.toXml().encode("utf-8")) - - def sendMessageErrback(failure, exception_class): - """If the failure does encapsulate the expected exception, it will be silently - trapped, otherwise it will be re-raised and will make the test fail""" - failure.trap(exception_class) + def _checkSentAndStored(self): + """Check that all the recipients got their messages and that the history has been filled. + /!\ see the comments in XEP_0033.sendAndStoreMessage""" + sent = [] + stored = [] + d_list = [] - def checkSentAndStored(): - """Check that all the recipients got their messages and that the history has been filled. - /!\ see the comments in XEP_0033.sendAndStoreMessage""" - sent = [] - stored = [] - for to_s in (JID_STR_X_TO, JID_STR_X_CC, JID_STR_X_BCC): - to_jid = JID(to_s) - host = JID(to_jid.host) - logger = getLogger() - level = logger.getEffectiveLevel() - logger.setLevel(ERROR) # remove log.warning pollution - entities = yield self.host.findFeaturesSet([plugin.NS_ADDRESS], jid_=host, profile_key=PROFILE) - if host in entities: - if host not in sent: # send the message to the entity offering the feature - sent.append(host) - stored.append(host) - stored.append(to_jid) # store in history for each recipient - else: # feature not supported, use normal behavior - sent.append(to_jid) - stored.append(to_jid) - logger.setLevel(level) - msg = "/!\ see the comments in XEP_0033.sendAndStoreMessage" - self.assertEqualUnsortedList(self.host.sent_messages, sent, msg) - self.assertEqualUnsortedList(self.host.stored_messages, stored, msg) + def cb(entities, to_jid, logger, level): + if host in entities: + if host not in sent: # send the message to the entity offering the feature + sent.append(host) + stored.append(host) + stored.append(to_jid) # store in history for each recipient + else: # feature not supported, use normal behavior + sent.append(to_jid) + stored.append(to_jid) + logger.setLevel(level) - def trigger(data, exception): - """Execute self.plugin.sendMessageTrigger with a different logging - level to not pollute the output, then check that the plugin did its - job. It should abort sending the message or add the extended - addressing information to the stanza. - @param data: the data to be processed by self.plugin.sendMessageTrigger - @param exception: CancelError - """ + for to_s in (JID_STR_X_TO, JID_STR_X_CC, JID_STR_X_BCC): + to_jid = JID(to_s) + host = JID(to_jid.host) logger = getLogger() level = logger.getEffectiveLevel() logger.setLevel(ERROR) # remove log.warning pollution - pre_treatments = defer.Deferred() - post_treatments = defer.Deferred() - self.plugin.sendMessageTrigger(data, pre_treatments, post_treatments, PROFILE) - post_treatments.callback(data) - logger.setLevel(level) - post_treatments.addCallbacks(assertAddresses, lambda failure: sendMessageErrback(failure, exception)) + d = self.host.findFeaturesSet([plugin.NS_ADDRESS], jid_=host, profile_key=PROFILE) + d.addCallback(cb, to_jid, logger, level) + d_list.append(d) - # feature is not supported, abort the message - self.host.memory.init() - data = deepcopy(mess_data) - trigger(data, CancelError) + def cb_list(dummy): + msg = "/!\ see the comments in XEP_0033.sendAndStoreMessage" + sent_recipients = [JID(elt['to']) for elt in self.host.getSentMessages(PROFILE_INDEX)] + self.assertEqualUnsortedList(sent_recipients, sent, msg) + self.assertEqualUnsortedList(self.host.stored_messages, stored, msg) + + return defer.DeferredList(d_list).addCallback(cb_list) + def _trigger(self, data): + """Execute self.plugin.sendMessageTrigger with a different logging + level to not pollute the output, then check that the plugin did its + job. It should abort sending the message or add the extended + addressing information to the stanza. + @param data: the data to be processed by self.plugin.sendMessageTrigger + """ + logger = getLogger() + level = logger.getEffectiveLevel() + logger.setLevel(ERROR) # remove log.warning pollution + pre_treatments = defer.Deferred() + post_treatments = defer.Deferred() + self.plugin.sendMessageTrigger(data, pre_treatments, post_treatments, PROFILE) + post_treatments.callback(data) + logger.setLevel(level) + post_treatments.addCallbacks(self._assertAddresses, lambda failure: failure.trap(CancelError)) + return post_treatments + + def test_sendMessageTriggerFeatureNotSupported(self): + # feature is not supported, abort the message + self.host.memory.reinit() + data = self._get_mess_data() + return self._trigger(data) + + def test_sendMessageTriggerFeatureSupported(self): # feature is supported by the main target server - self.host.init() - self.host.memory.init() + self.host.reinit() self.host.addFeature(JID(JID_STR_TO), plugin.NS_ADDRESS, PROFILE) - data = deepcopy(mess_data) - trigger(data, CancelError) - checkSentAndStored() + data = self._get_mess_data() + d = self._trigger(data) + return d.addCallback(lambda dummy: self._checkSentAndStored()) + def test_sendMessageTriggerFeatureFullySupported(self): # feature is supported by all target servers - self.host.init() - self.host.memory.init() + self.host.reinit() self.host.addFeature(JID(JID_STR_TO), plugin.NS_ADDRESS, PROFILE) for dest in (JID_STR_X_TO, JID_STR_X_CC, JID_STR_X_BCC): self.host.addFeature(JID(JID(dest).host), plugin.NS_ADDRESS, PROFILE) - data = deepcopy(mess_data) - trigger(data, CancelError) - checkSentAndStored() + data = self._get_mess_data() + d = self._trigger(data) + return d.addCallback(lambda dummy: self._checkSentAndStored()) + def test_sendMessageTriggerFixWrongEntity(self): # check that a wrong recipient entity is fixed by the backend - self.host.init() - self.host.memory.init() + self.host.reinit() self.host.addFeature(JID(JID_STR_TO), plugin.NS_ADDRESS, PROFILE) for dest in (JID_STR_X_TO, JID_STR_X_CC, JID_STR_X_BCC): self.host.addFeature(JID(JID(dest).host), plugin.NS_ADDRESS, PROFILE) - data = deepcopy(mess_data) + data = self._get_mess_data() data["to"] = JID(JID_STR_X_TO) - trigger(data, CancelError) - checkSentAndStored() + d = self._trigger(data) + return d.addCallback(lambda dummy: self._checkSentAndStored()) diff -r 584d45bb36d9 -r f71a0fc26886 src/test/test_plugin_xep_0085.py --- a/src/test/test_plugin_xep_0085.py Wed Mar 18 10:39:22 2015 +0100 +++ b/src/test/test_plugin_xep_0085.py Wed Mar 18 10:52:28 2015 +0100 @@ -27,7 +27,6 @@ from copy import deepcopy from twisted.internet import defer from wokkel.generic import parseXml -from twisted.words.protocols.jabber.jid import JID class XEP_0085Test(helpers.SatTestCase): @@ -35,10 +34,9 @@ def setUp(self): self.host = helpers.FakeSAT() self.plugin = plugin.XEP_0085(self.host) + self.host.memory.setParam(plugin.PARAM_NAME, True, plugin.PARAM_KEY, C.NO_SECURITY_LIMIT, Const.PROFILE[0]) def test_messageReceived(self): - self.host.memory.init() - self.host.memory.setParam(plugin.PARAM_NAME, True, plugin.PARAM_KEY, C.NO_SECURITY_LIMIT, Const.PROFILE[0]) for state in plugin.CHAT_STATES: xml = u""" @@ -51,11 +49,15 @@ state, plugin.NS_CHAT_STATES) stanza = parseXml(xml.encode("utf-8")) self.host.bridge.expectCall("chatStateReceived", Const.JID_STR[1], state, Const.PROFILE[0]) - self.plugin.messageReceivedTrigger(stanza, defer.Deferred(), Const.PROFILE[0]) + self.plugin.messageReceivedTrigger(stanza, None, Const.PROFILE[0]) def test_sendMessageTrigger(self): - self.host.memory.init() - self.host.memory.setParam(plugin.PARAM_NAME, True, plugin.PARAM_KEY, C.NO_SECURITY_LIMIT, Const.PROFILE[0]) + def cb(data): + xml = data['xml'].toXml().encode("utf-8") + self.assertEqualXML(xml, expected.toXml().encode("utf-8")) + + d_list = [] + for state in plugin.CHAT_STATES: mess_data = {"to": Const.JID[0], "type": "chat", @@ -70,9 +72,14 @@ mess_data['xml'] = parseXml(stanza.encode("utf-8")) expected = deepcopy(mess_data['xml']) expected.addElement(state, plugin.NS_CHAT_STATES) - treatments = defer.Deferred() - self.plugin.sendMessageTrigger(mess_data, defer.Deferred(), treatments, Const.PROFILE[0]) - xml = treatments.callbacks[0][0][0](mess_data) - # cancel the timer to not block the process + post_treatments = defer.Deferred() + self.plugin.sendMessageTrigger(mess_data, None, post_treatments, Const.PROFILE[0]) + + post_treatments.addCallback(cb) + post_treatments.callback(mess_data) + d_list.append(post_treatments) + + def cb_list(dummy): # cancel the timer to not block the process self.plugin.map[Const.PROFILE[0]][Const.JID[0]].timer.cancel() - self.assertEqualXML(xml['xml'].toXml().encode("utf-8"), expected.toXml().encode("utf-8")) + + return defer.DeferredList(d_list).addCallback(cb_list) diff -r 584d45bb36d9 -r f71a0fc26886 src/test/test_plugin_xep_0203.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/test/test_plugin_xep_0203.py Wed Mar 18 10:52:28 2015 +0100 @@ -0,0 +1,65 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# SAT: a jabber client +# Copyright (C) 2009, 2010, 2011, 2012, 2013, 2014 Jérôme Poisson (goffi@goffi.org) +# Copyright (C) 2013, 2014 Adrien Cossa (souliane@mailoo.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" Plugin XEP-0203 """ + +from sat.test import helpers +from sat.plugins.plugin_xep_0203 import XEP_0203 +from twisted.words.xish import domish +from twisted.words.protocols.jabber.jid import JID +from dateutil.tz import tzutc +import datetime + +NS_PUBSUB = 'http://jabber.org/protocol/pubsub' + + +class XEP_0203Test(helpers.SatTestCase): + + def setUp(self): + self.host = helpers.FakeSAT() + self.plugin = XEP_0203(self.host) + + def test_delay(self): + delay_xml = """ + + Offline Storage + + """ + message_xml = """ + + text + %s + + """ % delay_xml + + parent = domish.Element((None, 'message')) + parent['from'] = 'romeo@montague.net/orchard' + parent['to'] = 'juliet@capulet.com' + parent['type'] = 'chat' + parent.addElement('body', None, 'text') + stamp = datetime.datetime(2002, 9, 10, 23, 8, 25, tzinfo=tzutc()) + elt = self.plugin.delay(stamp, JID('capulet.com'), 'Offline Storage', parent) + self.assertEqualXML(elt.toXml(), delay_xml, True) + self.assertEqualXML(parent.toXml(), message_xml, True) diff -r 584d45bb36d9 -r f71a0fc26886 src/test/test_plugin_xep_0297.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/test/test_plugin_xep_0297.py Wed Mar 18 10:52:28 2015 +0100 @@ -0,0 +1,78 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# SAT: a jabber client +# Copyright (C) 2009, 2010, 2011, 2012, 2013, 2014 Jérôme Poisson (goffi@goffi.org) +# Copyright (C) 2013, 2014 Adrien Cossa (souliane@mailoo.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" Plugin XEP-0297 """ + +from constants import Const as C +from sat.test import helpers +from sat.plugins.plugin_xep_0203 import XEP_0203 +from sat.plugins.plugin_xep_0297 import XEP_0297 +from twisted.words.protocols.jabber.jid import JID +from dateutil.tz import tzutc +import datetime +from wokkel.generic import parseXml + + +NS_PUBSUB = 'http://jabber.org/protocol/pubsub' + + +class XEP_0297Test(helpers.SatTestCase): + + def setUp(self): + self.host = helpers.FakeSAT() + self.plugin = XEP_0297(self.host) + self.host.plugins['XEP-0203'] = XEP_0203(self.host) + + def test_delay(self): + stanza = parseXml(""" + + Yet I should kill thee with much cherishing. + + + + + """.encode('utf-8')) + output = """ + + A most courteous exposition! + + + + Yet I should kill thee with much cherishing. + + + + + + + """ + stamp = datetime.datetime(2010, 7, 10, 23, 8, 25, tzinfo=tzutc()) + d = self.plugin.forward(stanza, JID('mercutio@verona.lit'), stamp, + body='A most courteous exposition!', + profile_key=C.PROFILE[0]) + d.addCallback(lambda dummy: self.assertEqualXML(self.host.getSentMessageXml(0), output, True)) + return d diff -r 584d45bb36d9 -r f71a0fc26886 src/test/test_plugin_xep_0313.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/test/test_plugin_xep_0313.py Wed Mar 18 10:52:28 2015 +0100 @@ -0,0 +1,250 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# SAT: a jabber client +# Copyright (C) 2009, 2010, 2011, 2012, 2013, 2014 Jérôme Poisson (goffi@goffi.org) +# Copyright (C) 2013, 2014 Adrien Cossa (souliane@mailoo.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" Plugin XEP-0313 """ + +from constants import Const as C +from sat.test import helpers +from sat.plugins.plugin_xep_0313 import XEP_0313 +from twisted.words.protocols.jabber.jid import JID +from twisted.words.xish import domish +from dateutil.tz import tzutc +import datetime + +# TODO: change this when RSM and MAM are in wokkel +from sat.tmp.wokkel.rsm import RSMRequest +from sat.tmp.wokkel.mam import buildForm + +NS_PUBSUB = 'http://jabber.org/protocol/pubsub' +SERVICE = 'sat-pubsub.tazar.int' +SERVICE_JID = JID(SERVICE) + + +class XEP_0313Test(helpers.SatTestCase): + + def setUp(self): + self.host = helpers.FakeSAT() + self.plugin = XEP_0313(self.host) + client = self.plugin.getHandler(C.PROFILE[0]) + client.makeConnection(self.host.getClient(C.PROFILE[0]).xmlstream) + + def test_queryArchive(self): + xml = """ + + + + """ % (("H_%d" % domish.Element._idCounter), SERVICE) + d = self.plugin.queryArchive(SERVICE_JID, profile_key=C.PROFILE[0]) + d.addCallback(lambda dummy: self.assertEqualXML(self.host.getSentMessageXml(0), xml, True)) + return d + + def test_queryArchivePubsub(self): + xml = """ + + + + """ % (("H_%d" % domish.Element._idCounter), SERVICE) + d = self.plugin.queryArchive(SERVICE_JID, node="fdp/submitted/capulet.lit/sonnets", profile_key=C.PROFILE[0]) + d.addCallback(lambda dummy: self.assertEqualXML(self.host.getSentMessageXml(0), xml, True)) + return d + + def test_queryArchiveWith(self): + xml = """ + + + + + urn:xmpp:mam:0 + + + juliet@capulet.lit + + + + + """ % (("H_%d" % domish.Element._idCounter), SERVICE) + form = buildForm(with_jid=JID('juliet@capulet.lit')) + d = self.plugin.queryArchive(SERVICE_JID, form, profile_key=C.PROFILE[0]) + d.addCallback(lambda dummy: self.assertEqualXML(self.host.getSentMessageXml(0), xml, True)) + return d + + def test_queryArchiveStartEnd(self): + xml = """ + + + + + urn:xmpp:mam:0 + + + 2010-06-07T00:00:00Z + + + 2010-07-07T13:23:54Z + + + + + """ % (("H_%d" % domish.Element._idCounter), SERVICE) + start = datetime.datetime(2010, 6, 7, 0, 0, 0, tzinfo=tzutc()) + end = datetime.datetime(2010, 7, 7, 13, 23, 54, tzinfo=tzutc()) + form = buildForm(start=start, end=end) + d = self.plugin.queryArchive(SERVICE_JID, form, profile_key=C.PROFILE[0]) + d.addCallback(lambda dummy: self.assertEqualXML(self.host.getSentMessageXml(0), xml, True)) + return d + + def test_queryArchiveStart(self): + xml = """ + + + + + urn:xmpp:mam:0 + + + 2010-08-07T00:00:00Z + + + + + """ % (("H_%d" % domish.Element._idCounter), SERVICE) + start = datetime.datetime(2010, 8, 7, 0, 0, 0, tzinfo=tzutc()) + form = buildForm(start=start) + d = self.plugin.queryArchive(SERVICE_JID, form, profile_key=C.PROFILE[0]) + d.addCallback(lambda dummy: self.assertEqualXML(self.host.getSentMessageXml(0), xml, True)) + return d + + def test_queryArchiveRSM(self): + xml = """ + + + + + urn:xmpp:mam:0 + + + 2010-08-07T00:00:00Z + + + + 10 + + + + """ % (("H_%d" % domish.Element._idCounter), SERVICE) + start = datetime.datetime(2010, 8, 7, 0, 0, 0, tzinfo=tzutc()) + form = buildForm(start=start) + rsm = RSMRequest(max=10) + d = self.plugin.queryArchive(SERVICE_JID, form=form, rsm=rsm, profile_key=C.PROFILE[0]) + d.addCallback(lambda dummy: self.assertEqualXML(self.host.getSentMessageXml(0), xml, True)) + return d + + def test_queryArchiveRSMPaging(self): + xml = """ + + + + urn:xmpp:mam:0 + 2010-08-07T00:00:00Z + + + 10 + 09af3-cc343-b409f + + + + """ % (("H_%d" % domish.Element._idCounter), SERVICE) + start = datetime.datetime(2010, 8, 7, 0, 0, 0, tzinfo=tzutc()) + form = buildForm(start=start) + rsm = RSMRequest(max=10, after=u'09af3-cc343-b409f') + d = self.plugin.queryArchive(SERVICE_JID, form=form, rsm=rsm, profile_key=C.PROFILE[0]) + d.addCallback(lambda dummy: self.assertEqualXML(self.host.getSentMessageXml(0), xml, True)) + return d + + def test_queryFields(self): + xml = """ + + + + """ % (("H_%d" % domish.Element._idCounter), SERVICE) + d = self.plugin.queryFields(SERVICE_JID, C.PROFILE[0]) + d.addCallback(lambda dummy: self.assertEqualXML(self.host.getSentMessageXml(0), xml, True)) + return d + + def test_queryArchiveFields(self): + xml = """ + + + + + urn:xmpp:mam:0 + + + Where arth thou, my Juliet? + + + {http://jabber.org/protocol/mood}mood/lonely + + + + + """ % (("H_%d" % domish.Element._idCounter), SERVICE) + extra = [{'fieldType': 'text-single', + 'var': 'urn:example:xmpp:free-text-search', + 'value': 'Where arth thou, my Juliet?'}, + {'fieldType': 'text-single', + 'var': 'urn:example:xmpp:stanza-content', + 'value': '{http://jabber.org/protocol/mood}mood/lonely'}] + form = buildForm(extra=extra) + d = self.plugin.queryArchive(SERVICE_JID, form=form, profile_key=C.PROFILE[0]) + d.addCallback(lambda dummy: self.assertEqualXML(self.host.getSentMessageXml(0), xml, True)) + return d + + def test_queryPrefs(self): + xml = """ + + + + + + + """ % (("H_%d" % domish.Element._idCounter), SERVICE) + d = self.plugin.getPrefs(SERVICE_JID, profile_key=C.PROFILE[0]) + d.addCallback(lambda dummy: self.assertEqualXML(self.host.getSentMessageXml(0), xml, True)) + return d + + def test_setPrefs(self): + xml = """ + + + + romeo@montague.lit + + + montague@montague.lit + + + + """ % (("H_%d" % domish.Element._idCounter), SERVICE) + always = [JID('romeo@montague.lit')] + never = [JID('montague@montague.lit')] + d = self.plugin.setPrefs(SERVICE_JID, always=always, never=never, profile_key=C.PROFILE[0]) + d.addCallback(lambda dummy: self.assertEqualXML(self.host.getSentMessageXml(0), xml, True)) + return d diff -r 584d45bb36d9 -r f71a0fc26886 src/test/test_plugin_xep_0334.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/test/test_plugin_xep_0334.py Wed Mar 18 10:52:28 2015 +0100 @@ -0,0 +1,102 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# SAT: a jabber client +# Copyright (C) 2009, 2010, 2011, 2012, 2013, 2014 Jérôme Poisson (goffi@goffi.org) +# Copyright (C) 2013, 2014 Adrien Cossa (souliane@mailoo.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" Plugin XEP-0334 """ + +from constants import Const as C +from sat.test import helpers +from sat.plugins.plugin_xep_0334 import XEP_0334 +from twisted.internet import defer +from wokkel.generic import parseXml +from sat.core import exceptions + +HINTS = ('no-permanent-storage', 'no-storage', 'no-copy') + + +class XEP_0334Test(helpers.SatTestCase): + + def setUp(self): + self.host = helpers.FakeSAT() + self.plugin = XEP_0334(self.host) + + def test_sendMessageTrigger(self): + template_xml = """ + + text + %s + + """ + original_xml = template_xml % '' + + d_list = [] + + def cb(data, expected_xml): + result_xml = data['xml'].toXml().encode("utf-8") + self.assertEqualXML(result_xml, expected_xml, True) + + for key in (HINTS + ('', 'dummy_hint')): + mess_data = {'xml': parseXml(original_xml.encode("utf-8")), + 'extra': {key: True} + } + treatments = defer.Deferred() + self.plugin.sendMessageTrigger(mess_data, defer.Deferred(), treatments, C.PROFILE[0]) + if treatments.callbacks: # the trigger added a callback + expected_xml = template_xml % ('<%s xmlns="urn:xmpp:hints"/>' % key) + treatments.addCallback(cb, expected_xml) + treatments.callback(mess_data) + d_list.append(treatments) + + return defer.DeferredList(d_list) + + def test_messageReceivedTrigger(self): + template_xml = """ + + text + %s + + """ + + def cb(dummy): + raise Exception("Errback should not be ran instead of callback!") + + def eb(failure): + failure.trap(exceptions.SkipHistory) + + d_list = [] + + for key in (HINTS + ('dummy_hint',)): + message = parseXml(template_xml % ('<%s xmlns="urn:xmpp:hints"/>' % key)) + post_treat = defer.Deferred() + self.plugin.messageReceivedTrigger(message, post_treat, C.PROFILE[0]) + if post_treat.callbacks: + assert(key in ('no-permanent-storage', 'no-storage')) + post_treat.addCallbacks(cb, eb) + post_treat.callback(None) + d_list.append(post_treat) + else: + assert(key not in ('no-permanent-storage', 'no-storage')) + + return defer.DeferredList(d_list) diff -r 584d45bb36d9 -r f71a0fc26886 src/tmp/README --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/tmp/README Wed Mar 18 10:52:28 2015 +0100 @@ -0,0 +1,6 @@ +Use this module to temporary store files that need to be integrated to other +projects such as Wokkel. + +For example, the changeset that introduced this folder adds RSM (XEP-0059) +support to Wokkel. The files in sat.tmp.wokkel are imported over the initial +wokkel files from sat.plugins.__init__.py \ No newline at end of file diff -r 584d45bb36d9 -r f71a0fc26886 src/tmp/__init__.py diff -r 584d45bb36d9 -r f71a0fc26886 src/tmp/wokkel/__init__.py diff -r 584d45bb36d9 -r f71a0fc26886 src/tmp/wokkel/delay.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/tmp/wokkel/delay.py Wed Mar 18 10:52:28 2015 +0100 @@ -0,0 +1,117 @@ +# -*- test-case-name: wokkel.test.test_delay -*- +# +# Copyright (c) Ralph Meijer. +# See LICENSE for details. + +""" +Delayed Delivery. + +Support for comunicating Delayed Delivery information as specified by +U{XEP-0203} and its predecessor +U{XEP-0091}. +""" + +from dateutil.parser import parse +from dateutil.tz import tzutc + +from twisted.words.protocols.jabber.jid import InvalidFormat, JID +from twisted.words.xish import domish + +NS_DELAY = 'urn:xmpp:delay' +NS_JABBER_DELAY = 'jabber:x:delay' + +class Delay(object): + """ + Delayed Delivery information. + + Instances of this class represent delayed delivery information that can be + parsed from and rendered into both XEP-0203 and legacy XEP-0091 formats. + + @ivar stamp: The timestamp the stanza was originally sent. + @type stamp: L{datetime.datetime} + @ivar sender: The optional entity that originally sent the stanza or + delayed its delivery. + @type sender: L{JID} + """ + + def __init__(self, stamp, sender=None): + self.stamp = stamp + self.sender = sender + + + def toElement(self, legacy=False): + """ + Render this instance into a domish Element. + + @param legacy: If C{True}, use the legacy XEP-0091 format. + @type legacy: C{bool} + """ + if not self.stamp: + raise ValueError("stamp is required") + if self.stamp.tzinfo is None: + raise ValueError("stamp is not offset-aware") + + if legacy: + element = domish.Element((NS_JABBER_DELAY, 'x')) + stampFormat = '%Y%m%dT%H:%M:%S' + else: + element = domish.Element((NS_DELAY, 'delay')) + stampFormat = '%Y-%m-%dT%H:%M:%SZ' + + stamp = self.stamp.astimezone(tzutc()) + element['stamp'] = stamp.strftime(stampFormat) + + if self.sender: + element['from'] = self.sender.full() + + return element + + + @staticmethod + def fromElement(element): + """ + Create an instance from a domish Element. + """ + try: + stamp = parse(element[u'stamp']) + + # Assume UTC if no timezone was given + if stamp.tzinfo is None: + stamp = stamp.replace(tzinfo=tzutc()) + except (KeyError, ValueError, TypeError): + stamp = None + + try: + sender = JID(element[u'from']) + except (KeyError, InvalidFormat): + sender = None + + delay = Delay(stamp, sender) + return delay + + + +class DelayMixin(object): + """ + Mixin for parsing delayed delivery information from stanzas. + + This can be used as a mixin for subclasses of L{wokkel.generic.Stanza} + for parsing delayed delivery information. If both XEP-0203 and XEP-0091 + formats are present, the former takes precedence. + """ + + delay = None + + childParsers = { + (NS_DELAY, 'delay'): '_childParser_delay', + (NS_JABBER_DELAY, 'x'): '_childParser_legacyDelay', + } + + + def _childParser_delay(self, element): + self.delay = Delay.fromElement(element) + + + def _childParser_legacyDelay(self, element): + if not self.delay: + self.delay = Delay.fromElement(element) diff -r 584d45bb36d9 -r f71a0fc26886 src/tmp/wokkel/mam.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/tmp/wokkel/mam.py Wed Mar 18 10:52:28 2015 +0100 @@ -0,0 +1,586 @@ +# -*- test-case-name: wokkel.test.test_mam -*- +# +# Copyright (c) Adrien Cossa. +# See LICENSE for details. + +""" +XMPP Message Archive Management protocol. + +This protocol is specified in +U{XEP-0313}. +""" + +from dateutil.tz import tzutc + +from zope.interface import Interface, implements + +from twisted.words.protocols.jabber.xmlstream import IQ, toResponse +from twisted.words.xish import domish +from twisted.words.protocols.jabber import jid +from twisted.python import log + +from wokkel.subprotocols import IQHandlerMixin, XMPPHandler +from wokkel import disco, data_form, delay + +import rsm + +NS_MAM = 'urn:xmpp:mam:0' +NS_FORWARD = 'urn:xmpp:forward:0' + +FIELDS_REQUEST = "/iq[@type='get']/query[@xmlns='%s']" % NS_MAM +ARCHIVE_REQUEST = "/iq[@type='set']/query[@xmlns='%s']" % NS_MAM +PREFS_GET_REQUEST = "/iq[@type='get']/prefs[@xmlns='%s']" % NS_MAM +PREFS_SET_REQUEST = "/iq[@type='set']/prefs[@xmlns='%s']" % NS_MAM + +# TODO: add the tests! + + +class MAMError(Exception): + """ + MAM error. + """ + + +class Unsupported(MAMError): + def __init__(self, feature, text=None): + self.feature = feature + MAMError.__init__(self, 'feature-not-implemented', + 'unsupported', + feature, + text) + + def __str__(self): + message = MAMError.__str__(self) + message += ', feature %r' % self.feature + return message + + +class MAMQueryRequest(): + """ + A Message Archive Management request. + + @ivar form: Data Form specifing the filters. + @itype form: L{data_form.Form} + + @ivar rsm: RSM request instance. + @itype rsm: L{rsm.RSMRequest} + + @ivar node: pubsub node id if querying a pubsub node, else None. + @itype form: C{unicode} + """ + + form = None + rsm = None + node = None + + def __init__(self, form=None, rsm=None, node=None): + if form is not None: + assert(form.formType == 'submit') + self.form = form + self.rsm = rsm + self.node = node + + @classmethod + def parse(cls, element): + """Parse the DOM representation of a MAM request. + + @param element: MAM request element. + @type element: L{Element} + + @return: MAMQueryRequest instance. + @rtype: L{MAMQueryRequest} + """ + if element.uri != NS_MAM or element.name != 'query': + raise MAMError('Element provided is not a MAM request') + form = data_form.findForm(element, NS_MAM) + try: + rsm_request = rsm.RSMRequest.parse(element) + except rsm.RSMNotFoundError: + rsm_request = None + node = element['node'] if element.hasAttribute('node') else None + return MAMQueryRequest(form, rsm_request, node) + + def toElement(self): + """ + Return the DOM representation of this RSM request. + + @rtype: L{Element} + """ + mam_elt = domish.Element((NS_MAM, 'query')) + if self.node is not None: + mam_elt['node'] = self.node + if self.form is not None: + mam_elt.addChild(self.form.toElement()) + if self.rsm is not None: + mam_elt.addChild(self.rsm.toElement()) + + return mam_elt + + def render(self, parent): + """Embed the DOM representation of this MAM request in the given element. + + @param parent: parent IQ element. + @type parent: L{Element} + + @return: MAM request element. + @rtype: L{Element} + """ + assert(parent.name == 'iq') + mam_elt = self.toElement() + parent.addChild(mam_elt) + return mam_elt + + +class MAMPrefs(): + """ + A Message Archive Management request. + + @param default: A value in ('always', 'never', 'roster'). + @type : C{unicode} + + @param always (list): A list of JID instances. + @type always: C{list} + + @param never (list): A list of JID instances. + @type never: C{list} + """ + + default = None + always = None + never = None + + def __init__(self, default=None, always=None, never=None): + if default: + assert(default in ('always', 'never', 'roster')) + self.default = default + if always: + assert(isinstance(always, list)) + self.always = always + else: + self.always = [] + if never: + assert(isinstance(never, list)) + self.never = never + else: + self.never = [] + + @classmethod + def parse(cls, element): + """Parse the DOM representation of a MAM request. + + @param element: MAM request element. + @type element: L{Element} + + @return: MAMPrefs instance. + @rtype: L{MAMPrefs} + """ + if element.uri != NS_MAM or element.name != 'prefs': + raise MAMError('Element provided is not a MAM request') + default = element['default'] if element.hasAttribute('default') else None + prefs = {} + for attr in ('always', 'never'): + prefs[attr] = [] + try: + pref = domish.generateElementsNamed(element.elements(), attr).next() + for jid_s in domish.generateElementsNamed(pref.elements(), 'jid'): + prefs[attr].append(jid.JID(jid_s)) + except StopIteration: + pass + return MAMPrefs(default, **prefs) + + def toElement(self): + """ + Return the DOM representation of this RSM request. + + @rtype: L{Element} + """ + mam_elt = domish.Element((NS_MAM, 'prefs')) + if self.default: + mam_elt['default'] = self.default + for attr in ('always', 'never'): + attr_elt = mam_elt.addElement(attr) + jids = getattr(self, attr) + for jid in jids: + attr_elt.addElement('jid', content=jid.full()) + return mam_elt + + def render(self, parent): + """Embed the DOM representation of this MAM request in the given element. + + @param parent: parent IQ element. + @type parent: L{Element} + + @return: MAM request element. + @rtype: L{Element} + """ + assert(parent.name == 'iq') + mam_elt = self.toElement() + parent.addChild(mam_elt) + return mam_elt + + +class MAMClient(XMPPHandler): + """ + MAM client. + + This handler implements the protocol for sending out MAM requests. + """ + + def queryArchive(self, service=None, form=None, rsm=None, node=None, sender=None): + """Query a user, MUC or pubsub archive. + + @param service: Entity offering the MAM service (None for user archives). + @type service: L{JID} + + @param form: Data Form to filter the request. + @type form: L{Form} + + @param rsm: RSM request instance. + @type rsm: L{RSMRequest} + + @param node: Pubsub node to query, or None if inappropriate. + @type node: C{unicode} + + @param sender: Optional sender address. + @type sender: L{JID} + + @return: A deferred that fires upon receiving a response. + @rtype: L{Deferred} + """ + iq = IQ(self.xmlstream, 'set') + MAMQueryRequest(form, rsm, node).render(iq) + if sender is not None: + iq['from'] = unicode(sender) + return iq.send(to=service.full() if service else None) + + def queryFields(self, service=None, sender=None): + """Ask the server about additional supported fields. + + @param service: Entity offering the MAM service (None for user archives). + @type service: L{JID} + + @param sender: Optional sender address. + @type sender: L{JID} + + @return: A deferred that fires upon receiving a response. + @rtype: L{Deferred} + """ + # http://xmpp.org/extensions/xep-0313.html#query-form + iq = IQ(self.xmlstream, 'get') + MAMQueryRequest().render(iq) + if sender is not None: + iq['from'] = unicode(sender) + return iq.send(to=service.full() if service else None) + + def queryPrefs(self, service=None, sender=None): + """Retrieve the current user preferences. + + @param service: Entity offering the MAM service (None for user archives). + @type service: L{JID} + + @param sender: Optional sender address. + @type sender: L{JID} + + @return: A deferred that fires upon receiving a response. + @rtype: L{Deferred} + """ + # http://xmpp.org/extensions/xep-0313.html#prefs + iq = IQ(self.xmlstream, 'get') + MAMPrefs().render(iq) + if sender is not None: + iq['from'] = unicode(sender) + return iq.send(to=service.full() if service else None) + + def setPrefs(self, service=None, default='roster', always=None, never=None, sender=None): + """Set new user preferences. + + @param service: Entity offering the MAM service (None for user archives). + @type service: L{JID} + + @param default: A value in ('always', 'never', 'roster'). + @type : C{unicode} + + @param always (list): A list of JID instances. + @type always: C{list} + + @param never (list): A list of JID instances. + @type never: C{list} + + @param sender: Optional sender address. + @type sender: L{JID} + + @return: A deferred that fires upon receiving a response. + @rtype: L{Deferred} + """ + # http://xmpp.org/extensions/xep-0313.html#prefs + assert(default is not None) + iq = IQ(self.xmlstream, 'set') + MAMPrefs(default, always, never).render(iq) + if sender is not None: + iq['from'] = unicode(sender) + return iq.send(to=service.full() if service else None) + + +class IMAMResource(Interface): + + def onArchiveRequest(self, mam, requestor): + """ + + @param mam: The MAM request. + @type mam: L{MAMQueryReques} + + @param requestor: JID of the requestor. + @type requestor: L{JID} + + @return: The RSM answer. + @rtype: L{RSMResponse} + """ + + def onPrefsGetRequest(self, requestor): + """ + + @param requestor: JID of the requestor. + @type requestor: L{JID} + + @return: The current settings. + @rtype: L{wokkel.mam.MAMPrefs} + """ + + def onPrefsSetRequest(self, prefs, requestor): + """ + + @param prefs: The new settings to set. + @type prefs: L{wokkel.mam.MAMPrefs} + + @param requestor: JID of the requestor. + @type requestor: L{JID} + + @return: The new current settings. + @rtype: L{wokkel.mam.MAMPrefs} + """ + + +class MAMService(XMPPHandler, IQHandlerMixin): + """ + Protocol implementation for a MAM service. + + This handler waits for XMPP Ping requests and sends a response. + """ + + implements(disco.IDisco) + + iqHandlers = {FIELDS_REQUEST: '_onFieldsRequest', + ARCHIVE_REQUEST: '_onArchiveRequest', + PREFS_GET_REQUEST: '_onPrefsGetRequest', + PREFS_SET_REQUEST: '_onPrefsSetRequest' + } + + _legacyFilters = {'start': {'fieldType': 'text-single', + 'var': 'start', + 'label': 'Starting time', + 'desc': 'Starting time a the result period.', + }, + 'end': {'fieldType': 'text-single', + 'var': 'end', + 'label': 'Ending time', + 'desc': 'Ending time of the result period.', + }, + 'with': {'fieldType': 'jid-single', + 'var': 'with', + 'label': 'Entity', + 'desc': 'Entity against which to match message.', + }, + } + + extra_filters = {} + + def __init__(self, resource): + """ + @param resource: instance implementing IMAMResource + @type resource: L{object} + """ + self.resource = resource + + def connectionInitialized(self): + """ + Called when the XML stream has been initialized. + + This sets up an observer for incoming ping requests. + """ + self.xmlstream.addObserver(FIELDS_REQUEST, self.handleRequest) + self.xmlstream.addObserver(ARCHIVE_REQUEST, self.handleRequest) + self.xmlstream.addObserver(PREFS_GET_REQUEST, self.handleRequest) + self.xmlstream.addObserver(PREFS_SET_REQUEST, self.handleRequest) + + def addFilter(self, field): + """ + Add a new filter for querying MAM archive. + + @param field: dictionary specifying the attributes to build a + wokkel.data_form.Field. + @type field: C{dict} + """ + self.extra_filters[field.var] = field + + def _onFieldsRequest(self, iq): + """ + Called when a fields request has been received. + + This immediately replies with a result response. + """ + response = toResponse(iq, 'result') + query = response.addElement((NS_MAM, 'query')) + query.addChild(buildForm('form', extra=self.extra_filters).toElement()) + self.xmlstream.send(response) + iq.handled = True + + def _onArchiveRequest(self, iq): + """ + Called when a message archive request has been received. + + This immediately replies with a result response, followed by the + list of archived message and the finally the message. + """ + response = toResponse(iq, 'result') + self.xmlstream.send(response) + + mam_ = MAMQueryRequest.parse(iq.query) + requestor = jid.JID(iq['from']) + + # remove unsupported filters + unsupported_fields = [] + if mam_.form: + for key, field in mam_.form.fields.iteritems(): + if key not in self._legacyFilters and key not in self.extra_filters: + log.msg('Ignored unsupported MAM filter: %s' % field) + unsupported_fields.append(key) + for key in unsupported_fields: + del mam_.form.fields[key] + + def forward_message(id_, elt, date): + msg = domish.Element((None, 'message')) + msg['to'] = iq['from'] + result = msg.addElement('result', NS_MAM) + if iq.hasAttribute('queryid'): + result['queryid'] = iq.query['queryid'] + result['id'] = id_ + forward = result.addElement('forwarded', NS_FORWARD) + forward.addChild(delay.Delay(date).toElement()) + forward.addChild(elt) + self.xmlstream.send(msg) + + def cb(result): + msg_data, rsm_elt = result + for data in msg_data: + forward_message(*data) + msg = domish.Element((None, 'message')) + msg['to'] = iq['from'] + fin = msg.addElement('fin', NS_MAM) + if iq.hasAttribute('queryid'): + fin['queryid'] = iq.query['queryid'] + if rsm_elt is not None: + fin.addChild(rsm_elt) + self.xmlstream.send(msg) + + self.resource.onArchiveRequest(mam_, requestor).addCallback(cb) + iq.handled = True + + def _onPrefsGetRequest(self, iq): + """ + Called when a prefs get request has been received. + + This immediately replies with a result response. + """ + response = toResponse(iq, 'result') + + requestor = jid.JID(iq['from']) + + def cb(prefs): + response.addChild(prefs.toElement()) + self.xmlstream.send(response) + + self.resource.onPrefsGetRequest(requestor).addCallback(cb) + iq.handled = True + + def _onPrefsSetRequest(self, iq): + """ + Called when a prefs get request has been received. + + This immediately replies with a result response. + """ + response = toResponse(iq, 'result') + + prefs = MAMPrefs.parse(iq.prefs) + requestor = jid.JID(iq['from']) + + def cb(prefs): + response.addChild(prefs.toElement()) + self.xmlstream.send(response) + + self.resource.onPrefsSetRequest(prefs, requestor).addCallback(cb) + iq.handled = True + + def getDiscoInfo(self, requestor, target, nodeIdentifier=''): + return [disco.DiscoFeature(NS_MAM)] + + def getDiscoItems(self, requestor, target, nodeIdentifier=''): + return [] + + +def datetime2utc(datetime): + """Convert a datetime to a XEP-0082 compliant UTC datetime. + + @param datetime: Offset-aware timestamp to convert. + @type datetime: L{datetime} + + @return: The datetime converted to UTC. + @rtype: C{unicode} + """ + stampFormat = '%Y-%m-%dT%H:%M:%SZ' + return datetime.astimezone(tzutc()).strftime(stampFormat) + + +def buildForm(formType='submit', start=None, end=None, with_jid=None, extra=None): + """Prepare a Data Form for MAM. + + @param formType: The type of the Data Form ('submit' or 'form'). + @type formType: C{unicode} + + @param start: Offset-aware timestamp to filter out older messages. + @type start: L{datetime} + + @param end: Offset-aware timestamp to filter out later messages. + @type end: L{datetime} + + @param with_jid: JID against which to match messages. + @type with_jid: L{JID} + + @param extra: list of extra fields that are not defined by the + specification. Each element of the list must be a dictionary + specifying the attributes to build a wokkel.data_form.Field. + @type: C{list} + + @return: XEP-0004 Data Form object. + @rtype: L{Form} + """ + form = data_form.Form(formType, formNamespace=NS_MAM) + filters = [] + + if formType == 'form': + filters.extend(MAMService._legacyFilters.values()) + elif formType == 'submit': + if start: + filters.append({'var': 'start', 'value': datetime2utc(start)}) + if end: + filters.append({'var': 'end', 'value': datetime2utc(end)}) + if with_jid: + # must specify the field type to overwrite default value in Field.__init__ + filters.append({'fieldType': 'jid-single', 'var': 'with', 'value': with_jid.full()}) + + if extra is not None: + filters.extend(extra) + + for field in filters: + form.addField(data_form.Field(**field)) + + return form diff -r 584d45bb36d9 -r f71a0fc26886 src/tmp/wokkel/pubsub.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/tmp/wokkel/pubsub.py Wed Mar 18 10:52:28 2015 +0100 @@ -0,0 +1,1568 @@ +# -*- test-case-name: wokkel.test.test_pubsub -*- +# +# Copyright (c) Ralph Meijer. +# See LICENSE for details. + +""" +XMPP publish-subscribe protocol. + +This protocol is specified in +U{XEP-0060}. +""" + +from zope.interface import implements + +from twisted.internet import defer +from twisted.python import log +from twisted.words.protocols.jabber import jid, error +from twisted.words.xish import domish + +from wokkel import disco, data_form, generic, shim +from wokkel.compat import IQ +from wokkel.subprotocols import IQHandlerMixin, XMPPHandler +from wokkel.iwokkel import IPubSubClient, IPubSubService, IPubSubResource + +# Iq get and set XPath queries +IQ_GET = '/iq[@type="get"]' +IQ_SET = '/iq[@type="set"]' + +# Publish-subscribe namespaces +NS_PUBSUB = 'http://jabber.org/protocol/pubsub' +NS_PUBSUB_EVENT = NS_PUBSUB + '#event' +NS_PUBSUB_ERRORS = NS_PUBSUB + '#errors' +NS_PUBSUB_OWNER = NS_PUBSUB + "#owner" +NS_PUBSUB_NODE_CONFIG = NS_PUBSUB + "#node_config" +NS_PUBSUB_META_DATA = NS_PUBSUB + "#meta-data" +NS_PUBSUB_SUBSCRIBE_OPTIONS = NS_PUBSUB + "#subscribe_options" + +# XPath to match pubsub requests +PUBSUB_REQUEST = '/iq[@type="get" or @type="set"]/' + \ + 'pubsub[@xmlns="' + NS_PUBSUB + '" or ' + \ + '@xmlns="' + NS_PUBSUB_OWNER + '"]' + +class SubscriptionPending(Exception): + """ + Raised when the requested subscription is pending acceptance. + """ + + + +class SubscriptionUnconfigured(Exception): + """ + Raised when the requested subscription needs to be configured before + becoming active. + """ + + + +class PubSubError(error.StanzaError): + """ + Exception with publish-subscribe specific condition. + """ + def __init__(self, condition, pubsubCondition, feature=None, text=None): + appCondition = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition)) + if feature: + appCondition['feature'] = feature + error.StanzaError.__init__(self, condition, + text=text, + appCondition=appCondition) + + + +class BadRequest(error.StanzaError): + """ + Bad request stanza error. + """ + def __init__(self, pubsubCondition=None, text=None): + if pubsubCondition: + appCondition = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition)) + else: + appCondition = None + error.StanzaError.__init__(self, 'bad-request', + text=text, + appCondition=appCondition) + + + +class Unsupported(PubSubError): + def __init__(self, feature, text=None): + self.feature = feature + PubSubError.__init__(self, 'feature-not-implemented', + 'unsupported', + feature, + text) + + def __str__(self): + message = PubSubError.__str__(self) + message += ', feature %r' % self.feature + return message + + +class Subscription(object): + """ + A subscription to a node. + + @ivar nodeIdentifier: The identifier of the node subscribed to. The root + node is denoted by C{None}. + @type nodeIdentifier: C{unicode} + + @ivar subscriber: The subscribing entity. + @type subscriber: L{jid.JID} + + @ivar state: The subscription state. One of C{'subscribed'}, C{'pending'}, + C{'unconfigured'}. + @type state: C{unicode} + + @ivar options: Optional list of subscription options. + @type options: C{dict} + + @ivar subscriptionIdentifier: Optional subscription identifier. + @type subscriptionIdentifier: C{unicode} + """ + + def __init__(self, nodeIdentifier, subscriber, state, options=None, + subscriptionIdentifier=None): + self.nodeIdentifier = nodeIdentifier + self.subscriber = subscriber + self.state = state + self.options = options or {} + self.subscriptionIdentifier = subscriptionIdentifier + + + @staticmethod + def fromElement(element): + return Subscription( + element.getAttribute('node'), + jid.JID(element.getAttribute('jid')), + element.getAttribute('subscription'), + subscriptionIdentifier=element.getAttribute('subid')) + + + def toElement(self, defaultUri=None): + """ + Return the DOM representation of this subscription. + + @rtype: L{domish.Element} + """ + element = domish.Element((defaultUri, 'subscription')) + if self.nodeIdentifier: + element['node'] = self.nodeIdentifier + element['jid'] = unicode(self.subscriber) + element['subscription'] = self.state + if self.subscriptionIdentifier: + element['subid'] = self.subscriptionIdentifier + return element + + + +class Item(domish.Element): + """ + Publish subscribe item. + + This behaves like an object providing L{domish.IElement}. + + Item payload can be added using C{addChild} or C{addRawXml}, or using the + C{payload} keyword argument to C{__init__}. + """ + + def __init__(self, id=None, payload=None): + """ + @param id: optional item identifier + @type id: C{unicode} + @param payload: optional item payload. Either as a domish element, or + as serialized XML. + @type payload: object providing L{domish.IElement} or C{unicode}. + """ + + domish.Element.__init__(self, (None, 'item')) + if id is not None: + self['id'] = id + if payload is not None: + if isinstance(payload, basestring): + self.addRawXml(payload) + else: + self.addChild(payload) + + + +class PubSubRequest(generic.Stanza): + """ + A publish-subscribe request. + + The set of instance variables used depends on the type of request. If + a variable is not applicable or not passed in the request, its value is + C{None}. + + @ivar verb: The type of publish-subscribe request. See C{_requestVerbMap}. + @type verb: C{str}. + + @ivar affiliations: Affiliations to be modified. + @type affiliations: C{set} + + @ivar items: The items to be published, as L{domish.Element}s. + @type items: C{list} + + @ivar itemIdentifiers: Identifiers of the items to be retrieved or + retracted. + @type itemIdentifiers: C{set} + + @ivar maxItems: Maximum number of items to retrieve. + @type maxItems: C{int}. + + @ivar nodeIdentifier: Identifier of the node the request is about. + @type nodeIdentifier: C{unicode} + + @ivar nodeType: The type of node that should be created, or for which the + configuration is retrieved. C{'leaf'} or C{'collection'}. + @type nodeType: C{str} + + @ivar options: Configurations options for nodes, subscriptions and publish + requests. + @type options: L{data_form.Form} + + @ivar subscriber: The subscribing entity. + @type subscriber: L{JID} + + @ivar subscriptionIdentifier: Identifier for a specific subscription. + @type subscriptionIdentifier: C{unicode} + + @ivar subscriptions: Subscriptions to be modified, as a set of + L{Subscription}. + @type subscriptions: C{set} + + @ivar affiliations: Affiliations to be modified, as a dictionary of entity + (L{JID} to affiliation + (C{unicode}). + @type affiliations: C{dict} + """ + + verb = None + + affiliations = None + items = None + itemIdentifiers = None + maxItems = None + nodeIdentifier = None + nodeType = None + options = None + subscriber = None + subscriptionIdentifier = None + subscriptions = None + affiliations = None + + # Map request iq type and subelement name to request verb + _requestVerbMap = { + ('set', NS_PUBSUB, 'publish'): 'publish', + ('set', NS_PUBSUB, 'subscribe'): 'subscribe', + ('set', NS_PUBSUB, 'unsubscribe'): 'unsubscribe', + ('get', NS_PUBSUB, 'options'): 'optionsGet', + ('set', NS_PUBSUB, 'options'): 'optionsSet', + ('get', NS_PUBSUB, 'subscriptions'): 'subscriptions', + ('get', NS_PUBSUB, 'affiliations'): 'affiliations', + ('set', NS_PUBSUB, 'create'): 'create', + ('get', NS_PUBSUB_OWNER, 'default'): 'default', + ('get', NS_PUBSUB_OWNER, 'configure'): 'configureGet', + ('set', NS_PUBSUB_OWNER, 'configure'): 'configureSet', + ('get', NS_PUBSUB, 'items'): 'items', + ('set', NS_PUBSUB, 'retract'): 'retract', + ('set', NS_PUBSUB_OWNER, 'purge'): 'purge', + ('set', NS_PUBSUB_OWNER, 'delete'): 'delete', + ('get', NS_PUBSUB_OWNER, 'affiliations'): 'affiliationsGet', + ('set', NS_PUBSUB_OWNER, 'affiliations'): 'affiliationsSet', + ('get', NS_PUBSUB_OWNER, 'subscriptions'): 'subscriptionsGet', + ('set', NS_PUBSUB_OWNER, 'subscriptions'): 'subscriptionsSet', + } + + # Map request verb to request iq type and subelement name + _verbRequestMap = dict(((v, k) for k, v in _requestVerbMap.iteritems())) + + # Map request verb to parameter handler names + _parameters = { + 'publish': ['node', 'items'], + 'subscribe': ['nodeOrEmpty', 'jid', 'optionsWithSubscribe'], + 'unsubscribe': ['nodeOrEmpty', 'jid', 'subidOrNone'], + 'optionsGet': ['nodeOrEmpty', 'jid', 'subidOrNone'], + 'optionsSet': ['nodeOrEmpty', 'jid', 'options', 'subidOrNone'], + 'subscriptions': [], + 'affiliations': [], + 'create': ['nodeOrNone', 'configureOrNone'], + 'default': ['default'], + 'configureGet': ['nodeOrEmpty'], + 'configureSet': ['nodeOrEmpty', 'configure'], + 'items': ['node', 'maxItems', 'itemIdentifiers', 'subidOrNone'], + 'retract': ['node', 'itemIdentifiers'], + 'purge': ['node'], + 'delete': ['node'], + 'affiliationsGet': ['nodeOrEmpty'], + 'affiliationsSet': ['nodeOrEmpty', 'affiliations'], + 'subscriptionsGet': ['nodeOrEmpty'], + 'subscriptionsSet': [], + } + + def __init__(self, verb=None): + self.verb = verb + + + def _parse_node(self, verbElement): + """ + Parse the required node identifier out of the verbElement. + """ + try: + self.nodeIdentifier = verbElement["node"] + except KeyError: + raise BadRequest('nodeid-required') + + + def _render_node(self, verbElement): + """ + Render the required node identifier on the verbElement. + """ + if not self.nodeIdentifier: + raise Exception("Node identifier is required") + + verbElement['node'] = self.nodeIdentifier + + + def _parse_nodeOrEmpty(self, verbElement): + """ + Parse the node identifier out of the verbElement. May be empty. + """ + self.nodeIdentifier = verbElement.getAttribute("node", '') + + + def _render_nodeOrEmpty(self, verbElement): + """ + Render the node identifier on the verbElement. May be empty. + """ + if self.nodeIdentifier: + verbElement['node'] = self.nodeIdentifier + + + def _parse_nodeOrNone(self, verbElement): + """ + Parse the optional node identifier out of the verbElement. + """ + self.nodeIdentifier = verbElement.getAttribute("node") + + + def _render_nodeOrNone(self, verbElement): + """ + Render the optional node identifier on the verbElement. + """ + if self.nodeIdentifier: + verbElement['node'] = self.nodeIdentifier + + + def _parse_items(self, verbElement): + """ + Parse items out of the verbElement for publish requests. + """ + self.items = [] + for element in verbElement.elements(): + if element.uri == NS_PUBSUB and element.name == 'item': + self.items.append(element) + + + def _render_items(self, verbElement): + """ + Render items into the verbElement for publish requests. + """ + if self.items: + for item in self.items: + item.uri = NS_PUBSUB + verbElement.addChild(item) + + + def _parse_jid(self, verbElement): + """ + Parse subscriber out of the verbElement for un-/subscribe requests. + """ + try: + self.subscriber = jid.internJID(verbElement["jid"]) + except KeyError: + raise BadRequest('jid-required') + + + def _render_jid(self, verbElement): + """ + Render subscriber into the verbElement for un-/subscribe requests. + """ + verbElement['jid'] = self.subscriber.full() + + + def _parse_default(self, verbElement): + """ + Parse node type out of a request for the default node configuration. + """ + form = data_form.findForm(verbElement, NS_PUBSUB_NODE_CONFIG) + if form is not None and form.formType == 'submit': + values = form.getValues() + self.nodeType = values.get('pubsub#node_type', 'leaf') + else: + self.nodeType = 'leaf' + + + def _parse_configure(self, verbElement): + """ + Parse options out of a request for setting the node configuration. + """ + form = data_form.findForm(verbElement, NS_PUBSUB_NODE_CONFIG) + if form is not None: + if form.formType in ('submit', 'cancel'): + self.options = form + else: + raise BadRequest(text=u"Unexpected form type '%s'" % form.formType) + else: + raise BadRequest(text="Missing configuration form") + + + def _parse_configureOrNone(self, verbElement): + """ + Parse optional node configuration form in create request. + """ + for element in verbElement.parent.elements(): + if element.uri == NS_PUBSUB and element.name == 'configure': + form = data_form.findForm(element, NS_PUBSUB_NODE_CONFIG) + if form is not None: + if form.formType != 'submit': + raise BadRequest(text=u"Unexpected form type '%s'" % + form.formType) + else: + form = data_form.Form('submit', + formNamespace=NS_PUBSUB_NODE_CONFIG) + self.options = form + + + def _render_configureOrNone(self, verbElement): + """ + Render optional node configuration form in create request. + """ + if self.options is not None: + configure = verbElement.parent.addElement('configure') + configure.addChild(self.options.toElement()) + + + def _parse_itemIdentifiers(self, verbElement): + """ + Parse item identifiers out of items and retract requests. + """ + self.itemIdentifiers = [] + for element in verbElement.elements(): + if element.uri == NS_PUBSUB and element.name == 'item': + try: + self.itemIdentifiers.append(element["id"]) + except KeyError: + raise BadRequest() + + + def _render_itemIdentifiers(self, verbElement): + """ + Render item identifiers into items and retract requests. + """ + if self.itemIdentifiers: + for itemIdentifier in self.itemIdentifiers: + item = verbElement.addElement('item') + item['id'] = itemIdentifier + + + def _parse_maxItems(self, verbElement): + """ + Parse maximum items out of an items request. + """ + value = verbElement.getAttribute('max_items') + + if value: + try: + self.maxItems = int(value) + except ValueError: + raise BadRequest(text="Field max_items requires a positive " + + "integer value") + + + def _render_maxItems(self, verbElement): + """ + Render maximum items into an items request. + """ + if self.maxItems: + verbElement['max_items'] = unicode(self.maxItems) + + + def _parse_subidOrNone(self, verbElement): + """ + Parse subscription identifier out of a request. + """ + self.subscriptionIdentifier = verbElement.getAttribute("subid") + + + def _render_subidOrNone(self, verbElement): + """ + Render subscription identifier into a request. + """ + if self.subscriptionIdentifier: + verbElement['subid'] = self.subscriptionIdentifier + + + def _parse_options(self, verbElement): + """ + Parse options form out of a subscription options request. + """ + form = data_form.findForm(verbElement, NS_PUBSUB_SUBSCRIBE_OPTIONS) + if form is not None: + if form.formType in ('submit', 'cancel'): + self.options = form + else: + raise BadRequest(text=u"Unexpected form type '%s'" % form.formType) + else: + raise BadRequest(text="Missing options form") + + + + def _render_options(self, verbElement): + verbElement.addChild(self.options.toElement()) + + + def _parse_optionsWithSubscribe(self, verbElement): + for element in verbElement.parent.elements(): + if element.name == 'options' and element.uri == NS_PUBSUB: + form = data_form.findForm(element, + NS_PUBSUB_SUBSCRIBE_OPTIONS) + if form is not None: + if form.formType != 'submit': + raise BadRequest(text=u"Unexpected form type '%s'" % + form.formType) + else: + form = data_form.Form('submit', + formNamespace=NS_PUBSUB_SUBSCRIBE_OPTIONS) + self.options = form + + + def _render_optionsWithSubscribe(self, verbElement): + if self.options is not None: + optionsElement = verbElement.parent.addElement('options') + self._render_options(optionsElement) + + + def _parse_affiliations(self, verbElement): + self.affiliations = {} + for element in verbElement.elements(): + if (element.uri == NS_PUBSUB_OWNER and + element.name == 'affiliation'): + try: + entity = jid.internJID(element['jid']).userhostJID() + except KeyError: + raise BadRequest(text='Missing jid attribute') + + if entity in self.affiliations: + raise BadRequest(text='Multiple affiliations for an entity') + + try: + affiliation = element['affiliation'] + except KeyError: + raise BadRequest(text='Missing affiliation attribute') + + self.affiliations[entity] = affiliation + + + def parseElement(self, element): + """ + Parse the publish-subscribe verb and parameters out of a request. + """ + generic.Stanza.parseElement(self, element) + + verbs = [] + verbElements = [] + for child in element.pubsub.elements(): + key = (self.stanzaType, child.uri, child.name) + try: + verb = self._requestVerbMap[key] + except KeyError: + continue + + verbs.append(verb) + verbElements.append(child) + + if not verbs: + raise NotImplementedError() + + if len(verbs) > 1: + if 'optionsSet' in verbs and 'subscribe' in verbs: + self.verb = 'subscribe' + verbElement = verbElements[verbs.index('subscribe')] + else: + raise NotImplementedError() + else: + self.verb = verbs[0] + verbElement = verbElements[0] + + for parameter in self._parameters[self.verb]: + getattr(self, '_parse_%s' % parameter)(verbElement) + + + + def send(self, xs): + """ + Send this request to its recipient. + + This renders all of the relevant parameters for this specific + requests into an L{IQ}, and invoke its C{send} method. + This returns a deferred that fires upon reception of a response. See + L{IQ} for details. + + @param xs: The XML stream to send the request on. + @type xs: L{twisted.words.protocols.jabber.xmlstream.XmlStream} + @rtype: L{defer.Deferred}. + """ + + try: + (self.stanzaType, + childURI, + childName) = self._verbRequestMap[self.verb] + except KeyError: + raise NotImplementedError() + + iq = IQ(xs, self.stanzaType) + iq.addElement((childURI, 'pubsub')) + verbElement = iq.pubsub.addElement(childName) + + if self.sender: + iq['from'] = self.sender.full() + if self.recipient: + iq['to'] = self.recipient.full() + + for parameter in self._parameters[self.verb]: + getattr(self, '_render_%s' % parameter)(verbElement) + + return iq.send() + + + +class PubSubEvent(object): + """ + A publish subscribe event. + + @param sender: The entity from which the notification was received. + @type sender: L{jid.JID} + @param recipient: The entity to which the notification was sent. + @type recipient: L{wokkel.pubsub.ItemsEvent} + @param nodeIdentifier: Identifier of the node the event pertains to. + @type nodeIdentifier: C{unicode} + @param headers: SHIM headers, see L{wokkel.shim.extractHeaders}. + @type headers: C{dict} + """ + + def __init__(self, sender, recipient, nodeIdentifier, headers): + self.sender = sender + self.recipient = recipient + self.nodeIdentifier = nodeIdentifier + self.headers = headers + + + +class ItemsEvent(PubSubEvent): + """ + A publish-subscribe event that signifies new, updated and retracted items. + + @param items: List of received items as domish elements. + @type items: C{list} of L{domish.Element} + """ + + def __init__(self, sender, recipient, nodeIdentifier, items, headers): + PubSubEvent.__init__(self, sender, recipient, nodeIdentifier, headers) + self.items = items + + + +class DeleteEvent(PubSubEvent): + """ + A publish-subscribe event that signifies the deletion of a node. + """ + + redirectURI = None + + + +class PurgeEvent(PubSubEvent): + """ + A publish-subscribe event that signifies the purging of a node. + """ + + + +class PubSubClient(XMPPHandler): + """ + Publish subscribe client protocol. + """ + + implements(IPubSubClient) + + def connectionInitialized(self): + self.xmlstream.addObserver('/message/event[@xmlns="%s"]' % + NS_PUBSUB_EVENT, self._onEvent) + + + def _onEvent(self, message): + if message.getAttribute('type') == 'error': + return + + try: + sender = jid.JID(message["from"]) + recipient = jid.JID(message["to"]) + except KeyError: + return + + actionElement = None + for element in message.event.elements(): + if element.uri == NS_PUBSUB_EVENT: + actionElement = element + + if not actionElement: + return + + eventHandler = getattr(self, "_onEvent_%s" % actionElement.name, None) + + if eventHandler: + headers = shim.extractHeaders(message) + eventHandler(sender, recipient, actionElement, headers) + message.handled = True + + + def _onEvent_items(self, sender, recipient, action, headers): + nodeIdentifier = action["node"] + + items = [element for element in action.elements() + if element.name in ('item', 'retract')] + + event = ItemsEvent(sender, recipient, nodeIdentifier, items, headers) + self.itemsReceived(event) + + + def _onEvent_delete(self, sender, recipient, action, headers): + nodeIdentifier = action["node"] + event = DeleteEvent(sender, recipient, nodeIdentifier, headers) + if action.redirect: + event.redirectURI = action.redirect.getAttribute('uri') + self.deleteReceived(event) + + + def _onEvent_purge(self, sender, recipient, action, headers): + nodeIdentifier = action["node"] + event = PurgeEvent(sender, recipient, nodeIdentifier, headers) + self.purgeReceived(event) + + + def itemsReceived(self, event): + pass + + + def deleteReceived(self, event): + pass + + + def purgeReceived(self, event): + pass + + + def createNode(self, service, nodeIdentifier=None, options=None, + sender=None): + """ + Create a publish subscribe node. + + @param service: The publish subscribe service to create the node at. + @type service: L{JID} + @param nodeIdentifier: Optional suggestion for the id of the node. + @type nodeIdentifier: C{unicode} + @param options: Optional node configuration options. + @type options: C{dict} + """ + request = PubSubRequest('create') + request.recipient = service + request.nodeIdentifier = nodeIdentifier + request.sender = sender + + if options: + form = data_form.Form(formType='submit', + formNamespace=NS_PUBSUB_NODE_CONFIG) + form.makeFields(options) + request.options = form + + def cb(iq): + try: + new_node = iq.pubsub.create["node"] + except AttributeError: + # the suggested node identifier was accepted + new_node = nodeIdentifier + return new_node + + d = request.send(self.xmlstream) + d.addCallback(cb) + return d + + + def deleteNode(self, service, nodeIdentifier, sender=None): + """ + Delete a publish subscribe node. + + @param service: The publish subscribe service to delete the node from. + @type service: L{JID} + @param nodeIdentifier: The identifier of the node. + @type nodeIdentifier: C{unicode} + """ + request = PubSubRequest('delete') + request.recipient = service + request.nodeIdentifier = nodeIdentifier + request.sender = sender + return request.send(self.xmlstream) + + + def subscribe(self, service, nodeIdentifier, subscriber, + options=None, sender=None): + """ + Subscribe to a publish subscribe node. + + @param service: The publish subscribe service that keeps the node. + @type service: L{JID} + + @param nodeIdentifier: The identifier of the node. + @type nodeIdentifier: C{unicode} + + @param subscriber: The entity to subscribe to the node. This entity + will get notifications of new published items. + @type subscriber: L{JID} + + @param options: Subscription options. + @type options: C{dict} + + @return: Deferred that fires with L{Subscription} or errbacks with + L{SubscriptionPending} or L{SubscriptionUnconfigured}. + @rtype: L{defer.Deferred} + """ + request = PubSubRequest('subscribe') + request.recipient = service + request.nodeIdentifier = nodeIdentifier + request.subscriber = subscriber + request.sender = sender + + if options: + form = data_form.Form(formType='submit', + formNamespace=NS_PUBSUB_SUBSCRIBE_OPTIONS) + form.makeFields(options) + request.options = form + + def cb(iq): + subscription = Subscription.fromElement(iq.pubsub.subscription) + + if subscription.state == 'pending': + raise SubscriptionPending() + elif subscription.state == 'unconfigured': + raise SubscriptionUnconfigured() + else: + # we assume subscription == 'subscribed' + # any other value would be invalid, but that should have + # yielded a stanza error. + return subscription + + d = request.send(self.xmlstream) + d.addCallback(cb) + return d + + + def unsubscribe(self, service, nodeIdentifier, subscriber, + subscriptionIdentifier=None, sender=None): + """ + Unsubscribe from a publish subscribe node. + + @param service: The publish subscribe service that keeps the node. + @type service: L{JID} + + @param nodeIdentifier: The identifier of the node. + @type nodeIdentifier: C{unicode} + + @param subscriber: The entity to unsubscribe from the node. + @type subscriber: L{JID} + + @param subscriptionIdentifier: Optional subscription identifier. + @type subscriptionIdentifier: C{unicode} + """ + request = PubSubRequest('unsubscribe') + request.recipient = service + request.nodeIdentifier = nodeIdentifier + request.subscriber = subscriber + request.subscriptionIdentifier = subscriptionIdentifier + request.sender = sender + return request.send(self.xmlstream) + + + def publish(self, service, nodeIdentifier, items=None, sender=None): + """ + Publish to a publish subscribe node. + + @param service: The publish subscribe service that keeps the node. + @type service: L{JID} + @param nodeIdentifier: The identifier of the node. + @type nodeIdentifier: C{unicode} + @param items: Optional list of L{Item}s to publish. + @type items: C{list} + """ + request = PubSubRequest('publish') + request.recipient = service + request.nodeIdentifier = nodeIdentifier + request.items = items + request.sender = sender + return request.send(self.xmlstream) + + + def items(self, service, nodeIdentifier, maxItems=None, itemIdentifiers=None, + subscriptionIdentifier=None, sender=None): + """ + Retrieve previously published items from a publish subscribe node. + + @param service: The publish subscribe service that keeps the node. + @type service: L{JID} + + @param nodeIdentifier: The identifier of the node. + @type nodeIdentifier: C{unicode} + + @param maxItems: Optional limit on the number of retrieved items. + @type maxItems: C{int} + + @param itemIdentifiers: Identifiers of the items to be retrieved. + @type itemIdentifiers: C{set} + + @param subscriptionIdentifier: Optional subscription identifier. In + case the node has been subscribed to multiple times, this narrows + the results to the specific subscription. + @type subscriptionIdentifier: C{unicode} + """ + request = PubSubRequest('items') + request.recipient = service + request.nodeIdentifier = nodeIdentifier + if maxItems: + request.maxItems = str(int(maxItems)) + request.subscriptionIdentifier = subscriptionIdentifier + request.sender = sender + request.itemIdentifiers = itemIdentifiers + + def cb(iq): + items = [] + for element in iq.pubsub.items.elements(): + if element.uri == NS_PUBSUB and element.name == 'item': + items.append(element) + return items + + d = request.send(self.xmlstream) + d.addCallback(cb) + return d + + def retractItems(self, service, nodeIdentifier, itemIdentifiers, sender=None): + """ + Retract items from a publish subscribe node. + + @param service: The publish subscribe service to delete the node from. + @type service: L{JID} + @param nodeIdentifier: The identifier of the node. + @type nodeIdentifier: C{unicode} + @param itemIdentifiers: Identifiers of the items to be retracted. + @type itemIdentifiers: C{set} + """ + request = PubSubRequest('retract') + request.recipient = service + request.nodeIdentifier = nodeIdentifier + request.itemIdentifiers = itemIdentifiers + request.sender = sender + return request.send(self.xmlstream) + + def getOptions(self, service, nodeIdentifier, subscriber, + subscriptionIdentifier=None, sender=None): + """ + Get subscription options. + + @param service: The publish subscribe service that keeps the node. + @type service: L{JID} + + @param nodeIdentifier: The identifier of the node. + @type nodeIdentifier: C{unicode} + + @param subscriber: The entity subscribed to the node. + @type subscriber: L{JID} + + @param subscriptionIdentifier: Optional subscription identifier. + @type subscriptionIdentifier: C{unicode} + + @rtype: L{data_form.Form} + """ + request = PubSubRequest('optionsGet') + request.recipient = service + request.nodeIdentifier = nodeIdentifier + request.subscriber = subscriber + request.subscriptionIdentifier = subscriptionIdentifier + request.sender = sender + + def cb(iq): + form = data_form.findForm(iq.pubsub.options, + NS_PUBSUB_SUBSCRIBE_OPTIONS) + form.typeCheck() + return form + + d = request.send(self.xmlstream) + d.addCallback(cb) + return d + + + def setOptions(self, service, nodeIdentifier, subscriber, + options, subscriptionIdentifier=None, sender=None): + """ + Set subscription options. + + @param service: The publish subscribe service that keeps the node. + @type service: L{JID} + + @param nodeIdentifier: The identifier of the node. + @type nodeIdentifier: C{unicode} + + @param subscriber: The entity subscribed to the node. + @type subscriber: L{JID} + + @param options: Subscription options. + @type options: C{dict}. + + @param subscriptionIdentifier: Optional subscription identifier. + @type subscriptionIdentifier: C{unicode} + """ + request = PubSubRequest('optionsSet') + request.recipient = service + request.nodeIdentifier = nodeIdentifier + request.subscriber = subscriber + request.subscriptionIdentifier = subscriptionIdentifier + request.sender = sender + + form = data_form.Form(formType='submit', + formNamespace=NS_PUBSUB_SUBSCRIBE_OPTIONS) + form.makeFields(options) + request.options = form + + d = request.send(self.xmlstream) + return d + + + +class PubSubService(XMPPHandler, IQHandlerMixin): + """ + Protocol implementation for a XMPP Publish Subscribe Service. + + The word Service here is used as taken from the Publish Subscribe + specification. It is the party responsible for keeping nodes and their + subscriptions, and sending out notifications. + + Methods from the L{IPubSubService} interface that are called as a result + of an XMPP request may raise exceptions. Alternatively the deferred + returned by these methods may have their errback called. These are handled + as follows: + + - If the exception is an instance of L{error.StanzaError}, an error + response iq is returned. + - Any other exception is reported using L{log.msg}. An error response + with the condition C{internal-server-error} is returned. + + The default implementation of said methods raises an L{Unsupported} + exception and are meant to be overridden. + + @ivar discoIdentity: Service discovery identity as a dictionary with + keys C{'category'}, C{'type'} and C{'name'}. + @ivar pubSubFeatures: List of supported publish-subscribe features for + service discovery, as C{str}. + @type pubSubFeatures: C{list} or C{None} + """ + + implements(IPubSubService, disco.IDisco) + + iqHandlers = { + '/*': '_onPubSubRequest', + } + + _legacyHandlers = { + 'publish': ('publish', ['sender', 'recipient', + 'nodeIdentifier', 'items']), + 'subscribe': ('subscribe', ['sender', 'recipient', + 'nodeIdentifier', 'subscriber']), + 'unsubscribe': ('unsubscribe', ['sender', 'recipient', + 'nodeIdentifier', 'subscriber']), + 'subscriptions': ('subscriptions', ['sender', 'recipient']), + 'affiliations': ('affiliations', ['sender', 'recipient']), + 'create': ('create', ['sender', 'recipient', 'nodeIdentifier']), + 'getConfigurationOptions': ('getConfigurationOptions', []), + 'default': ('getDefaultConfiguration', + ['sender', 'recipient', 'nodeType']), + 'configureGet': ('getConfiguration', ['sender', 'recipient', + 'nodeIdentifier']), + 'configureSet': ('setConfiguration', ['sender', 'recipient', + 'nodeIdentifier', 'options']), + 'items': ('items', ['sender', 'recipient', 'nodeIdentifier', + 'maxItems', 'itemIdentifiers']), + 'retract': ('retract', ['sender', 'recipient', 'nodeIdentifier', + 'itemIdentifiers']), + 'purge': ('purge', ['sender', 'recipient', 'nodeIdentifier']), + 'delete': ('delete', ['sender', 'recipient', 'nodeIdentifier']), + } + + _request_class = PubSubRequest + + hideNodes = False + + def __init__(self, resource=None): + self.resource = resource + self.discoIdentity = {'category': 'pubsub', + 'type': 'service', + 'name': 'Generic Publish-Subscribe Service'} + + self.pubSubFeatures = [] + + + def connectionMade(self): + self.xmlstream.addObserver(PUBSUB_REQUEST, self.handleRequest) + + + def getDiscoInfo(self, requestor, target, nodeIdentifier=''): + def toInfo(nodeInfo): + if not nodeInfo: + return + + (nodeType, metaData) = nodeInfo['type'], nodeInfo['meta-data'] + info.append(disco.DiscoIdentity('pubsub', nodeType)) + if metaData: + form = data_form.Form(formType="result", + formNamespace=NS_PUBSUB_META_DATA) + form.addField( + data_form.Field( + var='pubsub#node_type', + value=nodeType, + label='The type of node (collection or leaf)' + ) + ) + + for metaDatum in metaData: + form.addField(data_form.Field.fromDict(metaDatum)) + + info.append(form) + + return + + info = [] + + request = PubSubRequest('discoInfo') + + if self.resource is not None: + resource = self.resource.locateResource(request) + identity = resource.discoIdentity + features = resource.features + getInfo = resource.getInfo + else: + category = self.discoIdentity['category'] + idType = self.discoIdentity['type'] + name = self.discoIdentity['name'] + identity = disco.DiscoIdentity(category, idType, name) + features = self.pubSubFeatures + getInfo = self.getNodeInfo + + if not nodeIdentifier: + info.append(identity) + info.append(disco.DiscoFeature(disco.NS_DISCO_ITEMS)) + info.extend([disco.DiscoFeature("%s#%s" % (NS_PUBSUB, feature)) + for feature in features]) + + d = defer.maybeDeferred(getInfo, requestor, target, nodeIdentifier or '') + d.addCallback(toInfo) + d.addErrback(log.err) + d.addCallback(lambda _: info) + return d + + + def getDiscoItems(self, requestor, target, nodeIdentifier=''): + if self.hideNodes: + d = defer.succeed([]) + elif self.resource is not None: + request = PubSubRequest('discoInfo') + resource = self.resource.locateResource(request) + d = resource.getNodes(requestor, target, nodeIdentifier) + elif nodeIdentifier: + d = self.getNodes(requestor, target) + else: + d = defer.succeed([]) + + d.addCallback(lambda nodes: [disco.DiscoItem(target, node) + for node in nodes]) + return d + + + def _onPubSubRequest(self, iq): + request = self._request_class.fromElement(iq) + + if self.resource is not None: + resource = self.resource.locateResource(request) + else: + resource = self + + # Preprocess the request, knowing the handling resource + try: + preProcessor = getattr(self, '_preProcess_%s' % request.verb) + except AttributeError: + pass + else: + request = preProcessor(resource, request) + if request is None: + return defer.succeed(None) + + # Process the request itself, + if resource is not self: + try: + handler = getattr(resource, request.verb) + except AttributeError: + text = "Request verb: %s" % request.verb + return defer.fail(Unsupported('', text)) + + d = handler(request) + else: + try: + handlerName, argNames = self._legacyHandlers[request.verb] + except KeyError: + text = "Request verb: %s" % request.verb + return defer.fail(Unsupported('', text)) + + handler = getattr(self, handlerName) + args = [getattr(request, arg) for arg in argNames] + d = handler(*args) + + # If needed, translate the result into a response + try: + cb = getattr(self, '_toResponse_%s' % request.verb) + except AttributeError: + pass + else: + d.addCallback(cb, resource, request) + + return d + + + def _toResponse_subscribe(self, result, resource, request): + response = domish.Element((NS_PUBSUB, "pubsub")) + response.addChild(result.toElement(NS_PUBSUB)) + return response + + + def _toResponse_subscriptions(self, result, resource, request): + response = domish.Element((NS_PUBSUB, 'pubsub')) + subscriptions = response.addElement('subscriptions') + for subscription in result: + subscriptions.addChild(subscription.toElement(NS_PUBSUB)) + return response + + + def _toResponse_affiliations(self, result, resource, request): + response = domish.Element((NS_PUBSUB, 'pubsub')) + affiliations = response.addElement('affiliations') + + for nodeIdentifier, affiliation in result: + item = affiliations.addElement('affiliation') + item['node'] = nodeIdentifier + item['affiliation'] = affiliation + + return response + + + def _toResponse_create(self, result, resource, request): + if not request.nodeIdentifier or request.nodeIdentifier != result: + response = domish.Element((NS_PUBSUB, 'pubsub')) + create = response.addElement('create') + create['node'] = result + return response + else: + return None + + + def _formFromConfiguration(self, resource, values): + fieldDefs = resource.getConfigurationOptions() + form = data_form.Form(formType="form", + formNamespace=NS_PUBSUB_NODE_CONFIG) + form.makeFields(values, fieldDefs) + return form + + + def _checkConfiguration(self, resource, form): + fieldDefs = resource.getConfigurationOptions() + form.typeCheck(fieldDefs, filterUnknown=True) + + + def _preProcess_create(self, resource, request): + if request.options: + self._checkConfiguration(resource, request.options) + return request + + + def _preProcess_default(self, resource, request): + if request.nodeType not in ('leaf', 'collection'): + raise error.StanzaError('not-acceptable') + else: + return request + + + def _toResponse_default(self, options, resource, request): + response = domish.Element((NS_PUBSUB_OWNER, "pubsub")) + default = response.addElement("default") + form = self._formFromConfiguration(resource, options) + default.addChild(form.toElement()) + return response + + + def _toResponse_configureGet(self, options, resource, request): + response = domish.Element((NS_PUBSUB_OWNER, "pubsub")) + configure = response.addElement("configure") + form = self._formFromConfiguration(resource, options) + configure.addChild(form.toElement()) + + if request.nodeIdentifier: + configure["node"] = request.nodeIdentifier + + return response + + + def _preProcess_configureSet(self, resource, request): + if request.options.formType == 'cancel': + return None + else: + self._checkConfiguration(resource, request.options) + return request + + + def _toResponse_items(self, result, resource, request): + response = domish.Element((NS_PUBSUB, 'pubsub')) + items = response.addElement('items') + items["node"] = request.nodeIdentifier + + for item in result: + if item.name == 'item': + item.uri = NS_PUBSUB + items.addChild(item) + + return response + + + def _createNotification(self, eventType, service, nodeIdentifier, + subscriber, subscriptions=None): + headers = [] + + if subscriptions: + for subscription in subscriptions: + if nodeIdentifier != subscription.nodeIdentifier: + headers.append(('Collection', subscription.nodeIdentifier)) + + message = domish.Element((None, "message")) + message["from"] = service.full() + message["to"] = subscriber.full() + event = message.addElement((NS_PUBSUB_EVENT, "event")) + + element = event.addElement(eventType) + element["node"] = nodeIdentifier + + if headers: + message.addChild(shim.Headers(headers)) + + return message + + + def _toResponse_affiliationsGet(self, result, resource, request): + response = domish.Element((NS_PUBSUB_OWNER, 'pubsub')) + affiliations = response.addElement('affiliations') + + if request.nodeIdentifier: + affiliations['node'] = request.nodeIdentifier + + for entity, affiliation in result.iteritems(): + item = affiliations.addElement('affiliation') + item['jid'] = entity.full() + item['affiliation'] = affiliation + + return response + + + # public methods + + def notifyPublish(self, service, nodeIdentifier, notifications): + for subscriber, subscriptions, items in notifications: + message = self._createNotification('items', service, + nodeIdentifier, subscriber, + subscriptions) + for item in items: + item.uri = NS_PUBSUB_EVENT + message.event.items.addChild(item) + self.send(message) + + + def notifyDelete(self, service, nodeIdentifier, subscribers, + redirectURI=None): + for subscriber in subscribers: + message = self._createNotification('delete', service, + nodeIdentifier, + subscriber) + if redirectURI: + redirect = message.event.delete.addElement('redirect') + redirect['uri'] = redirectURI + self.send(message) + + + def getNodeInfo(self, requestor, service, nodeIdentifier): + return None + + + def getNodes(self, requestor, service): + return [] + + + def publish(self, requestor, service, nodeIdentifier, items): + raise Unsupported('publish') + + + def subscribe(self, requestor, service, nodeIdentifier, subscriber): + raise Unsupported('subscribe') + + + def unsubscribe(self, requestor, service, nodeIdentifier, subscriber): + raise Unsupported('subscribe') + + + def subscriptions(self, requestor, service): + raise Unsupported('retrieve-subscriptions') + + + def affiliations(self, requestor, service): + raise Unsupported('retrieve-affiliations') + + + def create(self, requestor, service, nodeIdentifier): + raise Unsupported('create-nodes') + + + def getConfigurationOptions(self): + return {} + + + def getDefaultConfiguration(self, requestor, service, nodeType): + raise Unsupported('retrieve-default') + + + def getConfiguration(self, requestor, service, nodeIdentifier): + raise Unsupported('config-node') + + + def setConfiguration(self, requestor, service, nodeIdentifier, options): + raise Unsupported('config-node') + + + def items(self, requestor, service, nodeIdentifier, maxItems, + itemIdentifiers): + raise Unsupported('retrieve-items') + + + def retract(self, requestor, service, nodeIdentifier, itemIdentifiers): + raise Unsupported('retract-items') + + + def purge(self, requestor, service, nodeIdentifier): + raise Unsupported('purge-nodes') + + + def delete(self, requestor, service, nodeIdentifier): + raise Unsupported('delete-nodes') + + + +class PubSubResource(object): + + implements(IPubSubResource) + + features = [] + discoIdentity = disco.DiscoIdentity('pubsub', + 'service', + 'Publish-Subscribe Service') + + + def locateResource(self, request): + return self + + + def getInfo(self, requestor, service, nodeIdentifier): + return defer.succeed(None) + + + def getNodes(self, requestor, service, nodeIdentifier): + return defer.succeed([]) + + + def getConfigurationOptions(self): + return {} + + + def publish(self, request): + return defer.fail(Unsupported('publish')) + + + def subscribe(self, request): + return defer.fail(Unsupported('subscribe')) + + + def unsubscribe(self, request): + return defer.fail(Unsupported('subscribe')) + + + def subscriptions(self, request): + return defer.fail(Unsupported('retrieve-subscriptions')) + + + def affiliations(self, request): + return defer.fail(Unsupported('retrieve-affiliations')) + + + def create(self, request): + return defer.fail(Unsupported('create-nodes')) + + + def default(self, request): + return defer.fail(Unsupported('retrieve-default')) + + + def configureGet(self, request): + return defer.fail(Unsupported('config-node')) + + + def configureSet(self, request): + return defer.fail(Unsupported('config-node')) + + + def items(self, request): + return defer.fail(Unsupported('retrieve-items')) + + + def retract(self, request): + return defer.fail(Unsupported('retract-items')) + + + def purge(self, request): + return defer.fail(Unsupported('purge-nodes')) + + + def delete(self, request): + return defer.fail(Unsupported('delete-nodes')) + + + def affiliationsGet(self, request): + return defer.fail(Unsupported('modify-affiliations')) + + + def affiliationsSet(self, request): + return defer.fail(Unsupported('modify-affiliations')) + + + def subscriptionsGet(self, request): + return defer.fail(Unsupported('manage-subscriptions')) + + + def subscriptionsSet(self, request): + return defer.fail(Unsupported('manage-subscriptions')) diff -r 584d45bb36d9 -r f71a0fc26886 src/tmp/wokkel/rsm.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/tmp/wokkel/rsm.py Wed Mar 18 10:52:28 2015 +0100 @@ -0,0 +1,378 @@ +# -*- test-case-name: wokkel.test.test_rsm -*- +# +# Copyright (c) Adrien Cossa. +# See LICENSE for details. + +""" +XMPP Result Set Management protocol. + +This protocol is specified in +U{XEP-0059}. +""" + +from twisted.words.xish import domish + +import pubsub +import copy + + +# RSM namespace +NS_RSM = 'http://jabber.org/protocol/rsm' + + +class RSMError(Exception): + """ + RSM error. + """ + + +class RSMNotFoundError(Exception): + """ + An expected RSM element has not been found. + """ + + +class RSMRequest(): + """ + A Result Set Management request. + + @ivar max: limit on the number of retrieved items. + @itype max: C{int} or C{unicode} + + @ivar index: starting index of the requested page. + @itype index: C{int} or C{unicode} + + @ivar after: ID of the element immediately preceding the page. + @itype after: C{unicode} + + @ivar before: ID of the element immediately following the page. + @itype before: C{unicode} + """ + + max = 10 + index = None + after = None + before = None + + def __init__(self, max=None, index=None, after=None, before=None): + if max is not None: + max = int(max) + assert(max >= 0) + self.max = max + + if index is not None: + assert(after is None and before is None) + index = int(index) + assert(index >= 0) + self.index = index + + if after is not None: + assert(before is None) + assert(isinstance(after, unicode)) + self.after = after + + if before is not None: + assert(isinstance(before, unicode)) + self.before = before + + @classmethod + def parse(cls, element): + """Parse the given request element. + + @param element: request containing a set element. + @type element: L{domish.Element} + + @return: RSMRequest instance. + @rtype: L{RSMRequest} + """ + try: + set_elt = domish.generateElementsQNamed(element.elements(), + name="set", + uri=NS_RSM).next() + except StopIteration: + raise RSMNotFoundError() + + request = RSMRequest() + for elt in list(set_elt.elements()): + if elt.name in ('before', 'after'): + setattr(request, elt.name, ''.join(elt.children)) + elif elt.name in ('max', 'index'): + setattr(request, elt.name, int(''.join(elt.children))) + + if request.max is None: + raise RSMError("RSM request is missing its 'max' element") + + return request + + def toElement(self): + """ + Return the DOM representation of this RSM request. + + @rtype: L{domish.Element} + """ + set_elt = domish.Element((NS_RSM, 'set')) + set_elt.addElement('max').addContent(unicode(self.max)) + + if self.index is not None: + set_elt.addElement('index').addContent(unicode(self.index)) + + if self.before is not None: + if self.before == '': # request the last page + set_elt.addElement('before') + else: + set_elt.addElement('before').addContent(self.before) + + if self.after is not None: + set_elt.addElement('after').addContent(self.after) + + return set_elt + + def render(self, element): + """Embed the DOM representation of this RSM request in the given element. + + @param element: Element to contain the RSM request. + @type element: L{domish.Element} + + @return: RSM request element. + @rtype: L{domish.Element} + """ + if element.name == 'pubsub' and hasattr(element, 'items'): + element.items.attributes['max_items'] = unicode(self.max) + + set_elt = self.toElement() + element.addChild(set_elt) + + return set_elt + + +class RSMResponse(): + """ + A Result Set Management response. + + @ivar count: total number of items. + @itype count: C{int} + + @ivar index: starting index of the returned page. + @itype index: C{int} + + @ivar first: ID of the first element of the returned page. + @itype first: C{unicode} + + @ivar last: ID of the last element of the returned page. + @itype last: C{unicode} + """ + + count = 0 + index = None + first = None + last = None + + def __init__(self, count=None, index=None, first=None, last=None): + if count is not None: + assert(isinstance(count, int) and count >= 0) + self.count = count + + if index is not None: + assert(isinstance(index, int) and index >= 0) + self.index = index + assert(isinstance(first, unicode)) + self.first = first + assert(isinstance(last, unicode)) + self.last = last + else: + assert(first is None and last is None) + + @classmethod + def parse(cls, element): + """Parse the given response element. + + @param element: response element. + @type element: L{domish.Element} + + @return: RSMResponse instance. + @rtype: L{RSMResponse} + """ + try: + set_elt = domish.generateElementsQNamed(element.elements(), + name="set", + uri=NS_RSM).next() + except StopIteration: + return RSMNotFoundError() + + response = RSMResponse() + for elt in list(set_elt.elements()): + if elt.name in ('first', 'last'): + setattr(response, elt.name, ''.join(elt.children)) + if elt.name == 'first': + response.index = int(elt.getAttribute("index")) + elif elt.name == 'count': + response.count = int(''.join(elt.children)) + + if response.count is None: + raise RSMError("RSM response is missing its 'count' element") + + return response + + def toElement(self): + """ + Return the DOM representation of this RSM request. + + @rtype: L{domish.Element} + """ + set_elt = domish.Element((NS_RSM, 'set')) + set_elt.addElement('count').addContent(unicode(self.count)) + + if self.index is not None: + first_elt = set_elt.addElement('first') + first_elt.addContent(self.first) + first_elt['index'] = unicode(self.index) + + set_elt.addElement('last').addContent(self.last) + + return set_elt + + def render(self, element): + """Embed the DOM representation of this RSM response in the given element. + + @param element: Element to contain the RSM response. + @type element: L{domish.Element} + + @return: RSM request element. + @rtype: L{domish.Element} + """ + set_elt = self.toElement() + element.addChild(set_elt) + return set_elt + + def toDict(self): + """Return a dict representation of the object. + + @return: a dict of strings. + @rtype: C{dict} binding C{unicode} to C{unicode} + """ + result = {} + for attr in ('count', 'index', 'first', 'last'): + value = getattr(self, attr) + if value is not None: + result[attr] = unicode(value) + return result + + +class PubSubRequest(pubsub.PubSubRequest): + """PubSubRequest extension to handle RSM. + + @ivar rsm: RSM request instance. + @type rsm: L{RSMRequest} + """ + + rsm = None + + def __init__(self, verb=None): + pubsub.PubSubRequest.__init__(self, verb) + self._parameters = copy.deepcopy(pubsub.PubSubRequest._parameters) + self._parameters['items'].append('rsm') + + def _parse_rsm(self, verbElement): + try: + self.rsm = RSMRequest.parse(verbElement.parent) + except RSMNotFoundError: + self.rsm = None + + def _render_rsm(self, verbElement): + if self.rsm: + self.rsm.render(verbElement.parent) + + +class PubSubClient(pubsub.PubSubClient): + """PubSubClient extension to handle RSM.""" + + _rsm_responses = {} + + def items(self, service, nodeIdentifier, maxItems=None, itemIdentifiers=None, + subscriptionIdentifier=None, sender=None, ext_data=None): + """ + Retrieve previously published items from a publish subscribe node. + + @param service: The publish subscribe service that keeps the node. + @type service: L{JID} + + @param nodeIdentifier: The identifier of the node. + @type nodeIdentifier: C{unicode} + + @param maxItems: Optional limit on the number of retrieved items. + @type maxItems: C{int} + + @param itemIdentifiers: Identifiers of the items to be retrieved. + @type itemIdentifiers: C{set} + + @param subscriptionIdentifier: Optional subscription identifier. In + case the node has been subscribed to multiple times, this narrows + the results to the specific subscription. + @type subscriptionIdentifier: C{unicode} + + @param ext_data: extension data. + @type ext_data: L{dict} + + @return: a Deferred that fires a C{list} of L{domish.Element}. + @rtype: L{defer.Deferred} + """ + request = PubSubRequest('items') # that's a rsm.PubSubRequest instance + request.recipient = service + request.nodeIdentifier = nodeIdentifier + if maxItems: + request.maxItems = str(int(maxItems)) + request.subscriptionIdentifier = subscriptionIdentifier + request.sender = sender + request.itemIdentifiers = itemIdentifiers + if ext_data and 'rsm' in ext_data: + request.rsm = ext_data['rsm'] + + def cb(iq): + items = [] + if iq.pubsub.items: + for element in iq.pubsub.items.elements(): + if element.uri == pubsub.NS_PUBSUB and element.name == 'item': + items.append(element) + + if request.rsm: + response = RSMResponse.parse(iq.pubsub) + if response is not None: + self._rsm_responses[ext_data['id']] = response + return items + + d = request.send(self.xmlstream) + d.addCallback(cb) + return d + + def getRSMResponse(self, id): + """ + Post-retrieve the RSM response data after items retrieval is done. + + @param id: extension data ID + @type id: C{unicode} + + @return: dict representation of the RSM response. + @rtype: C{dict} of C{unicode} + """ + # This method exists to not modify the return value of self.items. + if id not in self._rsm_responses: + return {} + result = self._rsm_responses[id].toDict() + del self._rsm_responses[id] + return result + + +class PubSubService(pubsub.PubSubService): + """PubSubService extension to handle RSM.""" + + _request_class = PubSubRequest + + def _toResponse_items(self, result, resource, request): + response = pubsub.PubSubService._toResponse_items(self, result, + resource, request) + set_elts = [elt for elt in result if elt.name == 'set'] + if set_elts: + assert(len(set_elts) == 1) + response.addChild(set_elts[0]) + + return response