changeset 1420:7c0acb966fd6

plugins groupblog, xep-0060: first pass of simplification
author Goffi <goffi@goffi.org>
date Wed, 22 Apr 2015 20:21:55 +0200
parents be2df1ddea8e
children 16b1ba7ccaaa
files src/plugins/plugin_misc_groupblog.py src/plugins/plugin_xep_0060.py
diffstat 2 files changed, 133 insertions(+), 139 deletions(-) [+]
line wrap: on
line diff
--- a/src/plugins/plugin_misc_groupblog.py	Wed Apr 22 18:30:28 2015 +0200
+++ b/src/plugins/plugin_misc_groupblog.py	Wed Apr 22 20:21:55 2015 +0200
@@ -143,6 +143,8 @@
 
         host.trigger.add("PubSubItemsReceived", self.pubSubItemsReceivedTrigger)
 
+    ## plugin management methods ##
+
     def getHandler(self, profile):
         return GroupBlog_handler()
 
@@ -212,6 +214,8 @@
             return False
         return True
 
+    ## internal helping methodes ##
+
     def _handleCommentsItems(self, items, service, node_identifier):
         """ Convert comments items to groupblog data, and send them as signals
 
@@ -268,6 +272,8 @@
         """
         return NS_NODE_PREFIX + publisher.userhost()
 
+    ## publish ##
+
     def _publishMblog(self, service, client, access_type, access_list, message, extra):
         """Actually publish the message on the group blog
 
@@ -291,7 +297,7 @@
 
         if extra.get('allow_comments', 'False').lower() == 'true':
             # XXX: use the item identifier? http://bugs.goffi.org/show_bug.cgi?id=63
-            comments_node = self.__fillCommentsElement(mblog_data, None, node_name, service)
+            comments_node = self._fillCommentsElement(mblog_data, None, node_name, service)
             _options = {P.OPT_ACCESS_MODEL: access_model_value,
                         P.OPT_PERSIST_ITEMS: 1,
                         P.OPT_MAX_ITEMS: -1,
@@ -335,7 +341,7 @@
         entry_d.addCallback(itemCreated)
         return entry_d
 
-    def __fillCommentsElement(self, mblog_data, entry_id, node_name, service_jid):
+    def _fillCommentsElement(self, mblog_data, entry_id, node_name, service_jid):
         """
         @param mblog_data: dict containing the microblog data
         @param entry_id: unique identifier of the entry
@@ -356,6 +362,7 @@
 
     def sendGroupBlog(self, access_type, access_list, message, extra, profile_key=C.PROF_KEY_NONE):
         """Publish a microblog with given item access
+
         @param access_type: one of "PUBLIC", "GROUP", "JID"
         @param access_list: list of authorized entity (empty list for PUBLIC ACCESS,
                             list of groups or list of jids) for this item
@@ -385,80 +392,6 @@
 
         return self._initialise(profile_key).addCallback(initialised)
 
-    def deleteGroupBlog(self, pub_data, comments, profile_key=C.PROF_KEY_NONE):
-        """Delete a microblog item from a node.
-        @param pub_data: a tuple (service, node identifier, item identifier)
-        @param comments: comments node identifier (for main item) or empty string
-        @param profile_key: %(doc_profile_key)s
-        """
-
-        def initialised(result):
-            profile, client = result
-            service, node, item_id = pub_data
-            service_jid = jid.JID(service) if service else client.item_access_pubsub
-            if comments or not node:  # main item
-                node = self.getNodeName(client.jid)
-            if comments:
-                # remove the associated comments node
-                comments_service, comments_node = self.host.plugins["XEP-0277"].parseCommentUrl(comments)
-                d = self.host.plugins["XEP-0060"].deleteNode(comments_service, comments_node, profile_key=profile)
-                d.addErrback(lambda failure: log.error(u"Deletion of node %s failed: %s" % (comments_node, failure.getErrorMessage())))
-            # remove the item itself
-            d = self.host.plugins["XEP-0060"].retractItems(service_jid, node, [item_id], profile_key=profile)
-            d.addErrback(lambda failure: log.error(u"Deletion of item %s from %s failed: %s" % (item_id, node, failure.getErrorMessage())))
-            return d
-
-        def notify(d):
-            # TODO: this works only on the same host, and notifications for item deletion should be
-            # implemented according to http://xmpp.org/extensions/xep-0060.html#publisher-delete-success-notify
-            # instead. The notification mechanism implemented in sat_pubsub and wokkel have apriori
-            # a problem with retrieving the subscriptions, or something else.
-            service, node, item_id = pub_data
-            publisher = self.host.getJidNStream(profile_key)[0]
-            profile = self.host.memory.getProfileName(profile_key)
-            gbdatum = {'id': item_id, 'type': 'main_item' if (comments or not node) else 'comment'}
-            self.host.bridge.personalEvent(publisher.full(), "MICROBLOG_DELETE", gbdatum, profile)
-            return d
-
-        return self._initialise(profile_key).addCallback(initialised).addCallback(notify)
-
-    def updateGroupBlog(self, pub_data, comments, message, extra, profile_key=C.PROF_KEY_NONE):
-        """Modify a microblog node
-        @param pub_data: a tuple (service, node identifier, item identifier)
-        @param comments: comments node identifier (for main item) or empty string
-        @param message: new message
-        @param extra: dict which option name as key, which can be:
-            - allow_comments: True to accept an other level of comments, False else (default: False)
-            - rich: if present, contain rich text in currently selected syntax
-        @param profile_key: %(doc_profile)
-        """
-
-        def initialised(result):
-            profile, client = result
-            mblog_data = {'content': message}
-            for attr in ['content_rich', 'title', 'title_rich']:
-                if attr in extra and extra[attr]:
-                    mblog_data[attr] = extra[attr]
-            service, node, item_id = pub_data
-            service_jid = jid.JID(service) if service else client.item_access_pubsub
-            if comments or not node:  # main item
-                node = self.getNodeName(client.jid)
-            mblog_data['id'] = unicode(item_id)
-            if 'published' in extra:
-                mblog_data['published'] = extra['published']
-            if extra.get('allow_comments', 'False').lower() == 'true':
-                comments_service, comments_node = self.host.plugins["XEP-0277"].parseCommentUrl(comments)
-                # we could use comments_node directly but it's safer to rebuild it
-                # XXX: use the item identifier? http://bugs.goffi.org/show_bug.cgi?id=63
-                entry_id = comments_node.split('_')[1].split('__')[0]
-                self.__fillCommentsElement(mblog_data, entry_id, node, service_jid)
-            entry_d = self.host.plugins["XEP-0277"].data2entry(mblog_data, profile)
-            entry_d.addCallback(lambda mblog_item: self.host.plugins["XEP-0060"].publish(service_jid, node, items=[mblog_item], profile_key=profile))
-            entry_d.addErrback(lambda failure: log.error(u"Modification of %s failed: %s" % (pub_data, failure.getErrorMessage())))
-            return entry_d
-
-        return self._initialise(profile_key).addCallback(initialised)
-
     def sendGroupBlogComment(self, node_url, message, extra, profile_key=C.PROF_KEY_NONE):
         """Publish a comment in the given node
         @param node url: link to the comments node as specified in XEP-0277 and given in microblog data's comments key
@@ -519,6 +452,48 @@
             d_list.append(self.item2gbdata(item).addCallback(cb))
         return defer.DeferredList(d_list, consumeErrors=True).addCallback(lambda result: [value for (success, value) in result if success])
 
+    ## modify ##
+
+    def updateGroupBlog(self, pub_data, comments, message, extra, profile_key=C.PROF_KEY_NONE):
+        """Modify a microblog node
+
+        @param pub_data: a tuple (service, node identifier, item identifier)
+        @param comments: comments node identifier (for main item) or empty string
+        @param message: new message
+        @param extra: dict which option name as key, which can be:
+            - allow_comments: True to accept an other level of comments, False else (default: False)
+            - rich: if present, contain rich text in currently selected syntax
+        @param profile_key: %(doc_profile)
+        """
+
+        def initialised(result):
+            profile, client = result
+            mblog_data = {'content': message}
+            for attr in ['content_rich', 'title', 'title_rich']:
+                if attr in extra and extra[attr]:
+                    mblog_data[attr] = extra[attr]
+            service, node, item_id = pub_data
+            service_jid = jid.JID(service) if service else client.item_access_pubsub
+            if comments or not node:  # main item
+                node = self.getNodeName(client.jid)
+            mblog_data['id'] = unicode(item_id)
+            if 'published' in extra:
+                mblog_data['published'] = extra['published']
+            if extra.get('allow_comments', 'False').lower() == 'true':
+                comments_service, comments_node = self.host.plugins["XEP-0277"].parseCommentUrl(comments)
+                # we could use comments_node directly but it's safer to rebuild it
+                # XXX: use the item identifier? http://bugs.goffi.org/show_bug.cgi?id=63
+                entry_id = comments_node.split('_')[1].split('__')[0]
+                self._fillCommentsElement(mblog_data, entry_id, node, service_jid)
+            entry_d = self.host.plugins["XEP-0277"].data2entry(mblog_data, profile)
+            entry_d.addCallback(lambda mblog_item: self.host.plugins["XEP-0060"].publish(service_jid, node, items=[mblog_item], profile_key=profile))
+            entry_d.addErrback(lambda failure: log.error(u"Modification of %s failed: %s" % (pub_data, failure.getErrorMessage())))
+            return entry_d
+
+        return self._initialise(profile_key).addCallback(initialised)
+
+    ## get ##
+
     def _getOrCountComments(self, items, max=0, profile_key=C.PROF_KEY_NONE):
         """Get and/or count the comments of the given items.
 
@@ -553,7 +528,7 @@
         deferred_list.addCallback(lambda result: [value for (success, value) in result if success])
         return deferred_list
 
-    def __getGroupBlogs(self, pub_jid_s, item_ids=None, rsm=None, max_comments=0, profile_key=C.PROF_KEY_NONE):
+    def _getGroupBlogs(self, pub_jid_s, item_ids=None, rsm=None, max_comments=0, profile_key=C.PROF_KEY_NONE):
         """Retrieve previously published items from a publish subscribe node.
 
         @param pub_jid_s: jid of the publisher
@@ -594,7 +569,7 @@
             - RSM response data
         """
         max_comments = 0 if count_comments else DO_NOT_COUNT_COMMENTS
-        return self.__getGroupBlogs(pub_jid_s, item_ids=item_ids, rsm=rsm, max_comments=max_comments, profile_key=profile_key)
+        return self._getGroupBlogs(pub_jid_s, item_ids=item_ids, rsm=rsm, max_comments=max_comments, profile_key=profile_key)
 
     def getGroupBlogsWithComments(self, pub_jid_s, item_ids=None, rsm=None, max_comments=None, profile_key=C.PROF_KEY_NONE):
         """Get the published microblogs of the specified IDs and their comments. If
@@ -614,7 +589,7 @@
         if max_comments is None:
             max_comments = MAX_COMMENTS
         assert(max_comments > 0)  # otherwise the return signature is not the same
-        return self.__getGroupBlogs(pub_jid_s, item_ids=item_ids, rsm=rsm, max_comments=max_comments, profile_key=profile_key)
+        return self._getGroupBlogs(pub_jid_s, item_ids=item_ids, rsm=rsm, max_comments=max_comments, profile_key=profile_key)
 
     def getGroupBlogsAtom(self, pub_jid_s, rsm=None, profile_key=C.PROF_KEY_NONE):
         """Get the atom feed of the last published microblogs
@@ -719,6 +694,8 @@
         #TODO: we need to use the server corresponding to the host of the jid
         return DeferredItemsFromMany(self, cb, profile_key).get(publishers_type, publishers, rsm=rsm)
 
+    ## subscribe ##
+
     def subscribeGroupBlog(self, pub_jid, profile_key=C.PROF_KEY_NONE):
         def initialised(result):
             profile, client = result
@@ -752,6 +729,46 @@
         yield defer.DeferredList(d_list, consumeErrors=False)
         defer.returnValue(None)
 
+    ## delete ##
+
+    def deleteGroupBlog(self, pub_data, comments, profile_key=C.PROF_KEY_NONE):
+        """Delete a microblog item from a node.
+
+        @param pub_data: a tuple (service, node identifier, item identifier)
+        @param comments: comments node identifier (for main item) or empty string
+        @param profile_key: %(doc_profile_key)s
+        """
+
+        def initialised(result):
+            profile, client = result
+            service, node, item_id = pub_data
+            service_jid = jid.JID(service) if service else client.item_access_pubsub
+            if comments or not node:  # main item
+                node = self.getNodeName(client.jid)
+            if comments:
+                # remove the associated comments node
+                comments_service, comments_node = self.host.plugins["XEP-0277"].parseCommentUrl(comments)
+                d = self.host.plugins["XEP-0060"].deleteNode(comments_service, comments_node, profile_key=profile)
+                d.addErrback(lambda failure: log.error(u"Deletion of node %s failed: %s" % (comments_node, failure.getErrorMessage())))
+            # remove the item itself
+            d = self.host.plugins["XEP-0060"].retractItems(service_jid, node, [item_id], profile_key=profile)
+            d.addErrback(lambda failure: log.error(u"Deletion of item %s from %s failed: %s" % (item_id, node, failure.getErrorMessage())))
+            return d
+
+        def notify(d):
+            # TODO: this works only on the same host, and notifications for item deletion should be
+            # implemented according to http://xmpp.org/extensions/xep-0060.html#publisher-delete-success-notify
+            # instead. The notification mechanism implemented in sat_pubsub and wokkel have apriori
+            # a problem with retrieving the subscriptions, or something else.
+            service, node, item_id = pub_data
+            publisher = self.host.getJidNStream(profile_key)[0]
+            profile = self.host.memory.getProfileName(profile_key)
+            gbdatum = {'id': item_id, 'type': 'main_item' if (comments or not node) else 'comment'}
+            self.host.bridge.personalEvent(publisher.full(), "MICROBLOG_DELETE", gbdatum, profile)
+            return d
+
+        return self._initialise(profile_key).addCallback(initialised).addCallback(notify)
+
     def deleteAllGroupBlogsAndComments(self, profile_key=C.PROF_KEY_NONE):
         """Delete absolutely all the microblog data that the user has posted"""
         calls = [self.deleteAllGroupBlogs(profile_key), self.deleteAllGroupBlogsComments(profile_key)]
@@ -865,9 +882,10 @@
 
         return self._initialise(profile_key).addCallback(initialised)
 
+## helper classes to manipulate items ##
 
 class DeferredItems():
-    """Helper class to retrieve items using XEP-0060"""
+    """Retrieve items using XEP-0060"""
 
     def __init__(self, parent, cb, eb=None, profile_key=C.PROF_KEY_NONE):
         """
@@ -883,6 +901,7 @@
 
     def get(self, node, item_ids=None, sub_id=None, rsm=None):
         """
+
         @param node (str): node identifier.
         @param item_ids (list[str]): list of items identifiers.
         @param sub_id (str): optional subscription identifier.
@@ -898,7 +917,7 @@
             profile, client = result
             rsm_ = wokkel_rsm.RSMRequest(**rsm)
             d = self.parent.host.plugins["XEP-0060"].getItems(client.item_access_pubsub,
-                                                              node, rsm_.max,
+                                                              node, rsm_.max_,
                                                               item_ids, sub_id, rsm_,
                                                               profile_key=profile)
 
@@ -924,7 +943,7 @@
         self.cb = cb
         self.profile_key = profile_key
 
-    def __buildData(self, publishers_type, publishers, client):
+    def _buildData(self, publishers_type, publishers, client):
         jids = self.parent._getPublishersJIDs(publishers_type, publishers, client)
         return {publisher: self.parent.getNodeName(publisher) for publisher in jids}
 
@@ -946,7 +965,7 @@
         def initialised(result):
             profile, client = result
 
-            data = self.__buildData(publishers_type, publishers, client)
+            data = self._buildData(publishers_type, publishers, client)
             rsm_ = wokkel_rsm.RSMRequest(**rsm)
             d = self.parent.host.plugins["XEP-0060"].getItemsFromMany(client.item_access_pubsub,
                                                                       data, rsm_.max, sub_id,
--- a/src/plugins/plugin_xep_0060.py	Wed Apr 22 18:30:28 2015 +0200
+++ b/src/plugins/plugin_xep_0060.py	Wed Apr 22 20:21:55 2015 +0200
@@ -58,46 +58,21 @@
         log.info(_(u"PubSub plugin initialization"))
         self.host = host
         self.managedNodes = []
-        self.clients = {}
         self.node_cache = Sessions(timeout=60, resettable_timeout=False)
 
     def getHandler(self, profile):
-        self.clients[profile] = SatPubSubClient(self.host, self)
-        return self.clients[profile]
-
-    def profileDisconnected(self, profile):
-        try:
-            del self.clients[profile]
-        except KeyError:
-            pass
+        client = self.host.getClient(profile)
+        client.pubsub_client = SatPubSubClient(self.host, self)
+        return client.pubsub_client
 
     def addManagedNode(self, node_name, callback):
         """Add a handler for a namespace
+
         @param namespace: NS of the handler (will appear in disco info)
         @param callback: method to call when the handler is found
         @param profile: profile which manage this handler"""
         self.managedNodes.append((node_name, callback))
 
-    def __getClientNProfile(self, profile_key, action='do pusbsub'):
-        """Return a tuple of (client, profile)
-        raise error when the profile doesn't exists
-        @param profile_key: as usual :)
-        @param action: text of action to show in case of error"""
-        profile = self.host.memory.getProfileName(profile_key)
-        if not profile:
-            err_mess = _('Trying to %(action)s with an unknown profile key [%(profile_key)s]') % {
-                'action': action,
-                'profile_key': profile_key}
-            log.error(err_mess)
-            raise Exception(err_mess)
-        try:
-            client = self.clients[profile]
-        except KeyError:
-            err_mess = _('INTERNAL ERROR: no handler for required profile')
-            log.error(err_mess)
-            raise Exception(err_mess)
-        return profile, client
-
     def _getDeferredNodeCache(self, session_id, init, profile):
         """Manage a node cache with deferred initialisation and concurrent access.
 
@@ -146,7 +121,7 @@
         @param nodeIdentifier (str): the parent node name (leave empty to retrieve all subscriptions)
         @param filter_ (str): filter the result according to the given subscription type:
             - None: do not filter
-            - 'pending': subscription has been approved yet by the node owner
+            - 'pending': subscription has not been approved yet by the node owner
             - 'unconfigured': subscription options have not been configured yet
             - 'subscribed': subscription is complete
         @param profile (str): %(doc_profile)s
@@ -158,8 +133,8 @@
         return self._getDeferredNodeCache(session_id, d, profile)
 
     def publish(self, service, nodeIdentifier, items=None, profile_key=C.PROF_KEY_NONE):
-        profile, client = self.__getClientNProfile(profile_key, 'publish item')
-        return client.publish(service, nodeIdentifier, items, client.parent.jid)
+        client = self.host.getClient(profile_key)
+        return client.pubsub_client.publish(service, nodeIdentifier, items, client.pubsub_client.parent.jid)
 
     def getItems(self, service, node, max_items=None, item_ids=None, sub_id=None, rsm=None, profile_key=C.PROF_KEY_NONE):
         """Retrieve pubsub items from a node.
@@ -175,10 +150,10 @@
             - list of items
             - RSM response data
         """
-        profile, client = self.__getClientNProfile(profile_key, 'get items')
+        client = self.host.getClient(profile_key)
         ext_data = {'id': unicode(uuid.uuid4()), 'rsm': rsm} if rsm else None
-        d = client.items(service, node, max_items, item_ids, sub_id, client.parent.jid, ext_data)
-        d.addCallback(lambda items: (items, client.getRSMResponse(ext_data['id']) if rsm else {}))
+        d = client.pubsub_client.items(service, node, max_items, item_ids, sub_id, client.pubsub_client.parent.jid, ext_data)
+        d.addCallback(lambda items: (items, client.pubsub_client.getRSMResponse(ext_data['id']) if rsm else {}))
         return d
 
     @defer.inlineCallbacks
@@ -197,39 +172,39 @@
                 - list of items
                 - RSM response data
         """
-        profile, client = self.__getClientNProfile(profile_key, 'get items')
-        found_nodes = yield self.listNodes(service, profile=profile)
+        client = self.host.getClient(profile_key)
+        found_nodes = yield self.listNodes(service, profile=client.profile)
         d_dict = {}
         for publisher, node in data.items():
             if node not in found_nodes:
                 log.debug(u"Skip the items retrieval for [{node}]: node doesn't exist".format(node=node))
                 continue  # avoid pubsub "item-not-found" error
-            d_dict[publisher] = self.getItems(service, node, max_items, None, sub_id, rsm, profile)
+            d_dict[publisher] = self.getItems(service, node, max_items, None, sub_id, rsm, client.profile)
         defer.returnValue(d_dict)
 
     def getOptions(self, service, nodeIdentifier, subscriber, subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE):
-        profile, client = self.__getClientNProfile(profile_key, 'get options')
-        return client.getOptions(service, nodeIdentifier, subscriber, subscriptionIdentifier)
+        client = self.host.getClient(profile_key)
+        return client.pubsub_client.getOptions(service, nodeIdentifier, subscriber, subscriptionIdentifier)
 
     def setOptions(self, service, nodeIdentifier, subscriber, options, subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE):
-        profile, client = self.__getClientNProfile(profile_key, 'set options')
-        return client.setOptions(service, nodeIdentifier, subscriber, options, subscriptionIdentifier)
+        client = self.host.getClient(profile_key)
+        return client.pubsub_client.setOptions(service, nodeIdentifier, subscriber, options, subscriptionIdentifier)
 
     def createNode(self, service, nodeIdentifier, options, profile_key=C.PROF_KEY_NONE):
-        profile, client = self.__getClientNProfile(profile_key, 'create node')
-        return client.createNode(service, nodeIdentifier, options)
+        client = self.host.getClient(profile_key)
+        return client.pubsub_client.createNode(service, nodeIdentifier, options)
 
     def deleteNode(self, service, nodeIdentifier, profile_key=C.PROF_KEY_NONE):
-        profile, client = self.__getClientNProfile(profile_key, 'delete node')
-        return client.deleteNode(service, nodeIdentifier)
+        client = self.host.getClient(profile_key)
+        return client.pubsub_client.deleteNode(service, nodeIdentifier)
 
     def retractItems(self, service, nodeIdentifier, itemIdentifiers, profile_key=C.PROF_KEY_NONE):
-        profile, client = self.__getClientNProfile(profile_key, 'retract items')
-        return client.retractItems(service, nodeIdentifier, itemIdentifiers)
+        client = self.host.getClient(profile_key)
+        return client.pubsub_client.retractItems(service, nodeIdentifier, itemIdentifiers)
 
     def subscribe(self, service, nodeIdentifier, sub_jid=None, options=None, profile_key=C.PROF_KEY_NONE):
-        profile, client = self.__getClientNProfile(profile_key, 'subscribe node')
-        return client.subscribe(service, nodeIdentifier, sub_jid or client.parent.jid.userhostJID(), options=options)
+        client = self.host.getClient(profile_key)
+        return client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options)
 
     @defer.inlineCallbacks
     def subscribeToMany(self, service, nodeIdentifiers, sub_jid=None, options=None, profile_key=C.PROF_KEY_NONE):
@@ -242,20 +217,20 @@
         @param profile_key (str): %(doc_profile_key)s
         @return: list of Deferred instances.
         """
-        profile, client = self.__getClientNProfile(profile_key, 'subscribe nodes')
-        found_nodes = yield self.listNodes(service, profile=profile)
-        subscribed_nodes = yield self.listSubscribedNodes(service, profile=profile)
+        client = self.host.getClient(profile_key)
+        found_nodes = yield self.listNodes(service, profile=client.profile)
+        subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile)
         d_list = []
         for nodeIdentifier in (set(nodeIdentifiers) - set(subscribed_nodes)):
             if nodeIdentifier not in found_nodes:
                 log.debug(u"Skip the subscription to [{node}]: node doesn't exist".format(node=nodeIdentifier))
                 continue  # avoid sat-pubsub "SubscriptionExists" error
-            d_list.append(client.subscribe(service, nodeIdentifier, sub_jid or client.parent.jid.userhostJID(), options=options))
+            d_list.append(client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options))
         defer.returnValue(d_list)
 
     def subscriptions(self, service, nodeIdentifier='', profile_key=C.PROF_KEY_NONE):
-        profile, client = self.__getClientNProfile(profile_key, 'retrieve subscriptions')
-        return client.subscriptions(service, nodeIdentifier)
+        client = self.host.getClient(profile_key)
+        return client.pubsub_client.subscriptions(service, nodeIdentifier)
 
 
 class SatPubSubClient(rsm.PubSubClient):