changeset 3584:edc79cefe968

plugin XEP-0060: `getItem(s)`, `publish` and `(un)subscribe` are now coroutines
author Goffi <goffi@goffi.org>
date Wed, 30 Jun 2021 16:19:14 +0200
parents 16ade4ad63f3
children 31628770a15a
files sat/plugins/plugin_exp_events.py sat/plugins/plugin_exp_invitation_pubsub.py sat/plugins/plugin_exp_list_of_interest.py sat/plugins/plugin_misc_forums.py sat/plugins/plugin_xep_0060.py sat/plugins/plugin_xep_0277.py sat/plugins/plugin_xep_0346.py sat/plugins/plugin_xep_0384.py
diffstat 8 files changed, 165 insertions(+), 161 deletions(-) [+]
line wrap: on
line diff
--- a/sat/plugins/plugin_exp_events.py	Sun Jun 27 00:15:40 2021 +0200
+++ b/sat/plugins/plugin_exp_events.py	Wed Jun 30 16:19:14 2021 +0200
@@ -212,8 +212,7 @@
             data["creator"] = True
         return timestamp, data
 
-    @defer.inlineCallbacks
-    def getEventElement(self, client, service, node, id_):
+    async def getEventElement(self, client, service, node, id_):
         """Retrieve event element
 
         @param service(jid.JID): pubsub service
@@ -224,23 +223,24 @@
         """
         if not id_:
             id_ = NS_EVENT
-        items, metadata = yield self._p.getItems(client, service, node, item_ids=[id_])
+        items, metadata = await self._p.getItems(client, service, node, item_ids=[id_])
         try:
             event_elt = next(items[0].elements(NS_EVENT, "event"))
         except StopIteration:
             raise exceptions.NotFound(_("No event element has been found"))
         except IndexError:
             raise exceptions.NotFound(_("No event with this id has been found"))
-        defer.returnValue(event_elt)
+        return event_elt
 
     def _eventGet(self, service, node, id_="", profile_key=C.PROF_KEY_NONE):
         service = jid.JID(service) if service else None
         node = node if node else NS_EVENT
         client = self.host.getClient(profile_key)
-        return self.eventGet(client, service, node, id_)
+        return defer.ensureDeferred(
+            self.eventGet(client, service, node, id_)
+        )
 
-    @defer.inlineCallbacks
-    def eventGet(self, client, service, node, id_=NS_EVENT):
+    async def eventGet(self, client, service, node, id_=NS_EVENT):
         """Retrieve event data
 
         @param service(unicode, None): PubSub service
@@ -253,9 +253,9 @@
                 image: URL of a picture to use to represent event
                 background-image: URL of a picture to use in background
         """
-        event_elt = yield self.getEventElement(client, service, node, id_)
+        event_elt = await self.getEventElement(client, service, node, id_)
 
-        defer.returnValue(self._parseEventElt(event_elt))
+        return self._parseEventElt(event_elt)
 
     def _eventCreate(
         self, timestamp, data, service, node, id_="", profile_key=C.PROF_KEY_NONE
@@ -436,10 +436,11 @@
         node = node if node else NS_EVENT
         client = self.host.getClient(profile_key)
         invitee_jid = jid.JID(invitee_jid_s) if invitee_jid_s else None
-        return self.eventInviteeGet(client, service, node, invitee_jid)
+        return defer.ensureDeferred(
+            self.eventInviteeGet(client, service, node, invitee_jid)
+        )
 
-    @defer.inlineCallbacks
-    def eventInviteeGet(self, client, service, node, invitee_jid=None):
+    async def eventInviteeGet(self, client, service, node, invitee_jid=None):
         """Retrieve attendance from event node
 
         @param service(unicode, None): PubSub service
@@ -452,28 +453,30 @@
         if invitee_jid is None:
             invitee_jid = client.jid
         try:
-            items, metadata = yield self._p.getItems(
+            items, metadata = await self._p.getItems(
                 client, service, node, item_ids=[invitee_jid.userhost()]
             )
             event_elt = next(items[0].elements(NS_EVENT, "invitee"))
         except (exceptions.NotFound, IndexError):
             # no item found, event data are not set yet
-            defer.returnValue({})
+            return {}
         data = {}
         for key in ("attend", "guests"):
             try:
                 data[key] = event_elt[key]
             except KeyError:
                 continue
-        defer.returnValue(data)
+        return data
 
     def _eventInviteeSet(self, service, node, event_data, profile_key):
         service = jid.JID(service) if service else None
         node = node if node else NS_EVENT
         client = self.host.getClient(profile_key)
-        return self.eventInviteeSet(client, service, node, event_data)
+        return defer.ensureDeferred(
+            self.eventInviteeSet(client, service, node, event_data)
+        )
 
-    def eventInviteeSet(self, client, service, node, data):
+    async def eventInviteeSet(self, client, service, node, data):
         """Set or update attendance data in event node
 
         @param service(unicode, None): PubSub service
@@ -490,16 +493,17 @@
             except KeyError:
                 pass
         item_elt = pubsub.Item(id=client.jid.userhost(), payload=event_elt)
-        return self._p.publish(client, service, node, items=[item_elt])
+        return await self._p.publish(client, service, node, items=[item_elt])
 
     def _eventInviteesList(self, service, node, profile_key):
         service = jid.JID(service) if service else None
         node = node if node else NS_EVENT
         client = self.host.getClient(profile_key)
-        return self.eventInviteesList(client, service, node)
+        return defer.ensureDeferred(
+            self.eventInviteesList(client, service, node)
+        )
 
-    @defer.inlineCallbacks
-    def eventInviteesList(self, client, service, node):
+    async def eventInviteesList(self, client, service, node):
         """Retrieve attendance from event node
 
         @param service(unicode, None): PubSub service
@@ -507,7 +511,7 @@
         @return (dict): a dict with current attendance status,
             an empty dict is returned if nothing has been answered yed
         """
-        items, metadata = yield self._p.getItems(client, service, node)
+        items, metadata = await self._p.getItems(client, service, node)
         invitees = {}
         for item in items:
             try:
@@ -525,7 +529,7 @@
                     except KeyError:
                         continue
                 invitees[item["id"]] = data
-        defer.returnValue(invitees)
+        return invitees
 
     async def invitePreflight(
         self,
--- a/sat/plugins/plugin_exp_invitation_pubsub.py	Sun Jun 27 00:15:40 2021 +0200
+++ b/sat/plugins/plugin_exp_invitation_pubsub.py	Wed Jun 30 16:19:14 2021 +0200
@@ -164,6 +164,6 @@
         if not name:
             name = extra.pop("name", "")
 
-        return self.host.plugins['LIST_INTEREST'].registerPubsub(
+        return await self.host.plugins['LIST_INTEREST'].registerPubsub(
             client, namespace, service, node, item_id, creator,
             name, element, extra)
--- a/sat/plugins/plugin_exp_list_of_interest.py	Sun Jun 27 00:15:40 2021 +0200
+++ b/sat/plugins/plugin_exp_list_of_interest.py	Wed Jun 30 16:19:14 2021 +0200
@@ -99,8 +99,7 @@
             if e.condition == "conflict":
                 log.debug(_("requested node already exists"))
 
-    @defer.inlineCallbacks
-    def registerPubsub(self, client, namespace, service, node, item_id=None,
+    async def registerPubsub(self, client, namespace, service, node, item_id=None,
                        creator=False, name=None, element=None, extra=None):
         """Register an interesting element in personal list
 
@@ -120,7 +119,7 @@
         """
         if extra is None:
             extra = {}
-        yield self.createNode(client)
+        await self.createNode(client)
         interest_elt = domish.Element((NS_LIST_INTEREST, "interest"))
         interest_elt["namespace"] = namespace
         if name is not None:
@@ -146,7 +145,7 @@
         interest_uri = uri.buildXMPPUri("pubsub", **uri_kwargs)
         # we use URI of the interest as item id to avoid duplicates
         item_elt = pubsub.Item(interest_uri, payload=interest_elt)
-        yield self._p.publish(
+        await self._p.publish(
             client, client.jid.userhostJID(), NS_LIST_INTEREST, items=[item_elt]
         )
 
@@ -258,12 +257,11 @@
         node = node or None
         namespace = namespace or None
         client = self.host.getClient(profile)
-        d = self.listInterests(client, service, node, namespace)
+        d = defer.ensureDeferred(self.listInterests(client, service, node, namespace))
         d.addCallback(self._listInterestsSerialise)
         return d
 
-    @defer.inlineCallbacks
-    def listInterests(self, client, service=None, node=None, namespace=None):
+    async def listInterests(self, client, service=None, node=None, namespace=None):
         """Retrieve list of interests
 
         @param service(jid.JID, None): service to use
@@ -277,7 +275,7 @@
         # TODO: if a MAM filter were available, it would improve performances
         if not node:
             node = NS_LIST_INTEREST
-        items, metadata = yield self._p.getItems(client, service, node)
+        items, metadata = await self._p.getItems(client, service, node)
         if namespace is not None:
             filtered_items = []
             for item in items:
@@ -291,7 +289,7 @@
                     filtered_items.append(item)
             items = filtered_items
 
-        defer.returnValue((items, metadata))
+        return (items, metadata)
 
     def _interestRetract(self, service_s, item_id, profile_key):
         d = self._p._retractItem(
--- a/sat/plugins/plugin_misc_forums.py	Sun Jun 27 00:15:40 2021 +0200
+++ b/sat/plugins/plugin_misc_forums.py	Wed Jun 30 16:19:14 2021 +0200
@@ -189,23 +189,22 @@
             service = None
         if not node.strip():
             node = None
-        d=self.get(client, service, node, forums_key or None)
+        d = defer.ensureDeferred(self.get(client, service, node, forums_key or None))
         d.addCallback(lambda data: json.dumps(data))
         return d
 
-    @defer.inlineCallbacks
-    def get(self, client, service=None, node=None, forums_key=None):
+    async def get(self, client, service=None, node=None, forums_key=None):
         if service is None:
             service = client.pubsub_service
         if node is None:
             node = NS_FORUMS
         if forums_key is None:
             forums_key = 'default'
-        items_data = yield self._p.getItems(client, service, node, item_ids=[forums_key])
+        items_data = await self._p.getItems(client, service, node, item_ids=[forums_key])
         item = items_data[0][0]
         # we have the item and need to convert it to json
         forums = self._parseForums(item)
-        defer.returnValue(forums)
+        return forums
 
     def _set(self, forums, service=None, node=None, forums_key=None, profile_key=C.PROF_KEY_NONE):
         client = self.host.getClient(profile_key)
@@ -216,10 +215,11 @@
             service = None
         if not node.strip():
             node = None
-        return self.set(client, forums, service, node, forums_key or None)
+        return defer.ensureDeferred(
+            self.set(client, forums, service, node, forums_key or None)
+        )
 
-    @defer.inlineCallbacks
-    def set(self, client, forums, service=None, node=None, forums_key=None):
+    async def set(self, client, forums, service=None, node=None, forums_key=None):
         """Create or replace forums structure
 
         @param forums(list): list of dictionary as follow:
@@ -242,25 +242,33 @@
             node = NS_FORUMS
         if forums_key is None:
             forums_key = 'default'
-        forums_elt = yield self._createForums(client, forums, service, node)
-        yield self._p.sendItem(client, service, node, forums_elt, item_id=forums_key)
+        forums_elt = await self._createForums(client, forums, service, node)
+        return await self._p.sendItem(
+            client, service, node, forums_elt, item_id=forums_key
+        )
 
     def _getTopics(self, service, node, extra=None, profile_key=C.PROF_KEY_NONE):
         client = self.host.getClient(profile_key)
         extra = self._p.parseExtra(extra)
-        d = self.getTopics(client, jid.JID(service), node, rsm_request=extra.rsm_request, extra=extra.extra)
+        d = defer.ensureDeferred(
+            self.getTopics(
+                client, jid.JID(service), node, rsm_request=extra.rsm_request,
+                extra=extra.extra
+            )
+        )
         d.addCallback(
             lambda topics_data: (topics_data[0], data_format.serialise(topics_data[1]))
         )
         return d
 
-    @defer.inlineCallbacks
-    def getTopics(self, client, service, node, rsm_request=None, extra=None):
+    async def getTopics(self, client, service, node, rsm_request=None, extra=None):
         """Retrieve topics data
 
         Topics are simple microblog URIs with some metadata duplicated from first post
         """
-        topics_data = yield self._p.getItems(client, service, node, rsm_request=rsm_request, extra=extra)
+        topics_data = await self._p.getItems(
+            client, service, node, rsm_request=rsm_request, extra=extra
+        )
         topics = []
         item_elts, metadata = topics_data
         for item_elt in item_elts:
@@ -270,7 +278,7 @@
                      'author': topic_elt['author'],
                      'title': str(title_elt)}
             topics.append(topic)
-        defer.returnValue((topics, metadata))
+        return (topics, metadata)
 
     def _createTopic(self, service, node, mb_data, profile_key):
         client = self.host.getClient(profile_key)
--- a/sat/plugins/plugin_xep_0060.py	Sun Jun 27 00:15:40 2021 +0200
+++ b/sat/plugins/plugin_xep_0060.py	Wed Jun 30 16:19:14 2021 +0200
@@ -17,7 +17,7 @@
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
 
-from typing import Optional
+from typing import Optional, List, Tuple
 from collections import namedtuple
 import urllib.request, urllib.parse, urllib.error
 from functools import reduce
@@ -35,7 +35,7 @@
 from sat.core.i18n import _
 from sat.core.constants import Const as C
 from sat.core.log import getLogger
-from sat.core.xmpp import SatXMPPEntity
+from sat.core.core_types import SatXMPPEntity
 from sat.core import exceptions
 from sat.tools import sat_defer
 from sat.tools import xml_tools
@@ -471,9 +471,9 @@
         service = None if not service else jid.JID(service)
         payload = xml_tools.parse(payload)
         extra = data_format.deserialise(extra_ser)
-        d = self.sendItem(
+        d = defer.ensureDeferred(self.sendItem(
             client, service, nodeIdentifier, payload, item_id or None, extra
-        )
+        ))
         d.addCallback(lambda ret: ret or "")
         return d
 
@@ -487,23 +487,13 @@
             raise exceptions.DataError(_("Can't parse items: {msg}").format(
                 msg=e))
         extra = data_format.deserialise(extra_ser)
-        d = self.sendItems(
+        return defer.ensureDeferred(self.sendItems(
             client, service, nodeIdentifier, items, extra
-        )
-        return d
-
-    def _getPublishedItemId(self, published_ids, original_id):
-        """Return item of published id if found in answer
+        ))
 
-        if not found original_id is returned, which may be None
-        """
-        try:
-            return published_ids[0]
-        except IndexError:
-            return original_id
-
-    def sendItem(self, client, service, nodeIdentifier, payload, item_id=None,
-                 extra=None):
+    async def sendItem(
+        self, client, service, nodeIdentifier, payload, item_id=None, extra=None
+    ):
         """High level method to send one item
 
         @param service(jid.JID, None): service to send the item to
@@ -519,15 +509,17 @@
         if item_id is not None:
             item_elt['id'] = item_id
         item_elt.addChild(payload)
-        d = defer.ensureDeferred(self.sendItems(
+        published_ids = await self.sendItems(
             client,
             service,
             nodeIdentifier,
             [item_elt],
             extra
-        ))
-        d.addCallback(self._getPublishedItemId, item_id)
-        return d
+        )
+        try:
+            return published_ids[0]
+        except IndexError:
+            return item_id
 
     async def sendItems(self, client, service, nodeIdentifier, items, extra=None):
         """High level method to send several items at once
@@ -593,8 +585,8 @@
         except AttributeError:
             return []
 
-    def publish(self, client, service, nodeIdentifier, items=None, options=None):
-        return client.pubsub_client.publish(
+    async def publish(self, client, service, nodeIdentifier, items=None, options=None):
+        return await client.pubsub_client.publish(
             service, nodeIdentifier, items, client.pubsub_client.parent.jid,
             options=options
         )
@@ -630,7 +622,7 @@
         service = jid.JID(service) if service else None
         max_items = None if max_items == C.NO_LIMIT else max_items
         extra = self.parseExtra(extra_dict)
-        d = self.getItems(
+        d = defer.ensureDeferred(self.getItems(
             client,
             service,
             node or None,
@@ -639,13 +631,22 @@
             sub_id or None,
             extra.rsm_request,
             extra.extra,
-        )
+        ))
         d.addCallback(self.transItemsData)
         d.addCallback(self.serialiseItems)
         return d
 
-    def getItems(self, client, service, node, max_items=None, item_ids=None, sub_id=None,
-                 rsm_request=None, extra=None):
+    async def getItems(
+        self,
+        client: SatXMPPEntity,
+        service: Optional[jid.JID],
+        node: str,
+        max_items: Optional[int] = None,
+        item_ids: Optional[List[str]] = None,
+        sub_id: Optional[str] = None,
+        rsm_request: Optional[rsm.RSMRequest] = None,
+        extra: Optional[dict] = None
+    ) -> Tuple[List[dict], dict]:
         """Retrieve pubsub items from a node.
 
         @param service (JID, None): pubsub service.
@@ -682,9 +683,10 @@
                 rsm_request = rsm_request
             )
             # we have no MAM data here, so we add None
-            d.addCallback(lambda data: data + (None,))
             d.addErrback(sat_defer.stanza2NotFound)
             d.addTimeout(TIMEOUT, reactor)
+            items, rsm_response = await d
+            mam_response = None
         else:
             # if mam is requested, we have to do a totally different query
             if self._mam is None:
@@ -706,61 +708,49 @@
                     raise exceptions.DataError(
                         "Conflict between RSM request and MAM's RSM request"
                     )
-            d = self._mam.getArchives(client, mam_query, service, self._unwrapMAMMessage)
+            items, rsm_response, mam_response = await self._mam.getArchives(
+                client, mam_query, service, self._unwrapMAMMessage
+            )
 
         try:
             subscribe = C.bool(extra["subscribe"])
         except KeyError:
             subscribe = False
 
-        def subscribeEb(failure, service, node):
-            failure.trap(error.StanzaError)
-            log.warning(
-                "Could not subscribe to node {} on service {}: {}".format(
-                    node, str(service), str(failure.value)
+        if subscribe:
+            try:
+                await self.subscribe(client, service, node)
+            except error.StanzaError as e:
+                log.warning(
+                    f"Could not subscribe to node {node} on service {service}: {e}"
                 )
-            )
-
-        def doSubscribe(data):
-            self.subscribe(client, service, node).addErrback(
-                subscribeEb, service, node
-            )
-            return data
-
-        if subscribe:
-            d.addCallback(doSubscribe)
 
-        def addMetadata(result):
-            # TODO: handle the third argument (mam_response)
-            items, rsm_response, mam_response = result
-            service_jid = service if service else client.jid.userhostJID()
-            metadata = {
-                "service": service_jid,
-                "node": node,
-                "uri": self.getNodeURI(service_jid, node),
-            }
-            if mam_response is not None:
-                # mam_response is a dict with "complete" and "stable" keys
-                # we can put them directly in metadata
-                metadata.update(mam_response)
-            if rsm_request is not None and rsm_response is not None:
-                metadata['rsm'] = rsm_response.toDict()
-                if mam_response is None:
-                    index = rsm_response.index
-                    count = rsm_response.count
-                    if index is None or count is None:
-                        # we don't have enough information to know if the data is complete
-                        # or not
-                        metadata["complete"] = None
-                    else:
-                        # normally we have a strict equality here but XEP-0059 states
-                        # that index MAY be approximative, so just in case…
-                        metadata["complete"] = index + len(items) >= count
+        # TODO: handle mam_response
+        service_jid = service if service else client.jid.userhostJID()
+        metadata = {
+            "service": service_jid,
+            "node": node,
+            "uri": self.getNodeURI(service_jid, node),
+        }
+        if mam_response is not None:
+            # mam_response is a dict with "complete" and "stable" keys
+            # we can put them directly in metadata
+            metadata.update(mam_response)
+        if rsm_request is not None and rsm_response is not None:
+            metadata['rsm'] = rsm_response.toDict()
+            if mam_response is None:
+                index = rsm_response.index
+                count = rsm_response.count
+                if index is None or count is None:
+                    # we don't have enough information to know if the data is complete
+                    # or not
+                    metadata["complete"] = None
+                else:
+                    # normally we have a strict equality here but XEP-0059 states
+                    # that index MAY be approximative, so just in case…
+                    metadata["complete"] = index + len(items) >= count
 
-            return (items, metadata)
-
-        d.addCallback(addMetadata)
-        return d
+        return (items, metadata)
 
     # @defer.inlineCallbacks
     # def getItemsFromMany(self, service, data, max_items=None, sub_id=None, rsm=None, profile_key=C.PROF_KEY_NONE):
@@ -1100,22 +1090,24 @@
     def _subscribe(self, service, nodeIdentifier, options, profile_key=C.PROF_KEY_NONE):
         client = self.host.getClient(profile_key)
         service = None if not service else jid.JID(service)
-        d = self.subscribe(client, service, nodeIdentifier, options=options or None)
+        d = defer.ensureDeferred(
+            self.subscribe(client, service, nodeIdentifier, options=options or None)
+        )
         d.addCallback(lambda subscription: subscription.subscriptionIdentifier or "")
         return d
 
-    def subscribe(self, client, service, nodeIdentifier, sub_jid=None, options=None):
+    async def subscribe(self, client, service, nodeIdentifier, sub_jid=None, options=None):
         # TODO: reimplement a subscribtion cache, checking that we have not subscription before trying to subscribe
-        return client.pubsub_client.subscribe(
+        return await client.pubsub_client.subscribe(
             service, nodeIdentifier, sub_jid or client.jid.userhostJID(), options=options
         )
 
     def _unsubscribe(self, service, nodeIdentifier, profile_key=C.PROF_KEY_NONE):
         client = self.host.getClient(profile_key)
         service = None if not service else jid.JID(service)
-        return self.unsubscribe(client, service, nodeIdentifier)
+        return defer.ensureDeferred(self.unsubscribe(client, service, nodeIdentifier))
 
-    def unsubscribe(
+    async def unsubscribe(
         self,
         client,
         service,
@@ -1124,7 +1116,7 @@
         subscriptionIdentifier=None,
         sender=None,
     ):
-        return client.pubsub_client.unsubscribe(
+        return await client.pubsub_client.unsubscribe(
             service,
             nodeIdentifier,
             sub_jid or client.jid.userhostJID(),
@@ -1394,8 +1386,10 @@
         client = self.host.getClient(profile_key)
         deferreds = {}
         for service, node in node_data:
-            deferreds[(service, node)] = client.pubsub_client.subscribe(
-                service, node, subscriber, options=options
+            deferreds[(service, node)] = defer.ensureDeferred(
+                client.pubsub_client.subscribe(
+                    service, node, subscriber, options=options
+                )
             )
         return self.rt_sessions.newSession(deferreds, client.profile)
         # found_nodes = yield self.listNodes(service, profile=client.profile)
@@ -1475,9 +1469,9 @@
         client = self.host.getClient(profile_key)
         deferreds = {}
         for service, node in node_data:
-            deferreds[(service, node)] = self.getItems(
+            deferreds[(service, node)] = defer.ensureDeferred(self.getItems(
                 client, service, node, max_item, rsm_request=rsm_request, extra=extra
-            )
+            ))
         return self.rt_sessions.newSession(deferreds, client.profile)
 
 
--- a/sat/plugins/plugin_xep_0277.py	Sun Jun 27 00:15:40 2021 +0200
+++ b/sat/plugins/plugin_xep_0277.py	Wed Jun 30 16:19:14 2021 +0200
@@ -941,13 +941,14 @@
         service = jid.JID(service) if service else None
         max_items = None if max_items == C.NO_LIMIT else max_items
         extra = self._p.parseExtra(extra_dict)
-        d = self.mbGet(client, service, node or None, max_items, item_ids,
+        d = defer.ensureDeferred(
+            self.mbGet(client, service, node or None, max_items, item_ids,
                        extra.rsm_request, extra.extra)
+        )
         d.addCallback(self._mbGetSerialise)
         return d
 
-    @defer.inlineCallbacks
-    def mbGet(self, client, service=None, node=None, max_items=10, item_ids=None,
+    async def mbGet(self, client, service=None, node=None, max_items=10, item_ids=None,
               rsm_request=None, extra=None):
         """Get some microblogs
 
@@ -963,7 +964,7 @@
         """
         if node is None:
             node = NS_MICROBLOG
-        items_data = yield self._p.getItems(
+        items_data = await self._p.getItems(
             client,
             service,
             node,
@@ -972,9 +973,9 @@
             rsm_request=rsm_request,
             extra=extra,
         )
-        mb_data = yield self._p.transItemsDataD(
+        mb_data = await self._p.transItemsDataD(
             items_data, partial(self.item2mbdata, client, service=service, node=node))
-        defer.returnValue(mb_data)
+        return mb_data
 
     def _mbRename(self, service, node, item_id, new_id, profile_key):
         return defer.ensureDeferred(self.mbRename(
@@ -1372,13 +1373,15 @@
                         service = jid.JID(service_s)
                         node = item["{}{}".format(prefix, "_node")]
                         # time to get the comments
-                        d = self._p.getItems(
-                            client,
-                            service,
-                            node,
-                            max_comments,
-                            rsm_request=rsm_comments,
-                            extra=extra_comments,
+                        d = defer.ensureDeferred(
+                            self._p.getItems(
+                                client,
+                                service,
+                                node,
+                                max_comments,
+                                rsm_request=rsm_comments,
+                                extra=extra_comments,
+                            )
                         )
                         # then serialise
                         d.addCallback(
@@ -1420,9 +1423,9 @@
 
         deferreds = {}
         for service, node in node_data:
-            d = deferreds[(service, node)] = self._p.getItems(
+            d = deferreds[(service, node)] = defer.ensureDeferred(self._p.getItems(
                 client, service, node, max_items, rsm_request=rsm_request, extra=extra
-            )
+            ))
             d.addCallback(
                 lambda items_data: self._p.transItemsDataD(
                     items_data,
--- a/sat/plugins/plugin_xep_0346.py	Sun Jun 27 00:15:40 2021 +0200
+++ b/sat/plugins/plugin_xep_0346.py	Wed Jun 30 16:19:14 2021 +0200
@@ -627,8 +627,7 @@
         extra = data_format.deserialise(extra)
         return client, service, node or None, schema, item_id or None, extra
 
-    @defer.inlineCallbacks
-    def copyMissingValues(self, client, service, node, item_id, form_ns, values):
+    async def copyMissingValues(self, client, service, node, item_id, form_ns, values):
         """Retrieve values existing in original item and missing in update
 
         Existing item will be retrieve, and values not already specified in values will
@@ -643,7 +642,7 @@
         """
         try:
             # we get previous item
-            items_data = yield self._p.getItems(
+            items_data = await self._p.getItems(
                 client, service, node, item_ids=[item_id]
             )
             item_elt = items_data[0][0]
--- a/sat/plugins/plugin_xep_0384.py	Sun Jun 27 00:15:40 2021 +0200
+++ b/sat/plugins/plugin_xep_0384.py	Wed Jun 30 16:19:14 2021 +0200
@@ -792,8 +792,7 @@
                     devices.add(device_id)
         return devices
 
-    @defer.inlineCallbacks
-    def getDevices(self, client, entity_jid=None):
+    async def getDevices(self, client, entity_jid=None):
         """Retrieve list of registered OMEMO devices
 
         @param entity_jid(jid.JID, None): get devices from this entity
@@ -803,13 +802,13 @@
         if entity_jid is not None:
             assert not entity_jid.resource
         try:
-            items, metadata = yield self._p.getItems(client, entity_jid, NS_OMEMO_DEVICES)
+            items, metadata = await self._p.getItems(client, entity_jid, NS_OMEMO_DEVICES)
         except exceptions.NotFound:
             log.info(_("there is no node to handle OMEMO devices"))
-            defer.returnValue(set())
+            return set()
 
         devices = self.parseDevices(items)
-        defer.returnValue(devices)
+        return devices
 
     async def setDevices(self, client, devices):
         log.debug(f"setting devices with {', '.join(str(d) for d in devices)}")
@@ -831,8 +830,7 @@
 
     # bundles
 
-    @defer.inlineCallbacks
-    def getBundles(self, client, entity_jid, devices_ids):
+    async def getBundles(self, client, entity_jid, devices_ids):
         """Retrieve public bundles of an entity devices
 
         @param entity_jid(jid.JID): bare jid of entity
@@ -849,7 +847,7 @@
         for device_id in devices_ids:
             node = NS_OMEMO_BUNDLE.format(device_id=device_id)
             try:
-                items, metadata = yield self._p.getItems(client, entity_jid, node)
+                items, metadata = await self._p.getItems(client, entity_jid, node)
             except exceptions.NotFound:
                 log.warning(_("Bundle missing for device {device_id}")
                     .format(device_id=device_id))
@@ -910,7 +908,7 @@
             bundles[device_id] = ExtendedPublicBundle.parse(omemo_backend, ik, spk,
                                                             spkSignature, otpks)
 
-        defer.returnValue((bundles, missing))
+        return (bundles, missing)
 
     async def setBundle(self, client, bundle, device_id):
         """Set public bundle for this device.