diff sat_frontends/bridge/pb.py @ 3039:a1bc34f90fa5

bridge (pb): implemented an asyncio compatible bridge: `pb` bridge can now be used with asyncio by instantiating AIOBridge.
author Goffi <goffi@goffi.org>
date Wed, 25 Sep 2019 08:53:38 +0200
parents ab2696e34d29
children 84bb63e1e4c4
line wrap: on
line diff
--- a/sat_frontends/bridge/pb.py	Wed Sep 25 08:41:36 2019 +0200
+++ b/sat_frontends/bridge/pb.py	Wed Sep 25 08:53:38 2019 +0200
@@ -17,12 +17,16 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
-from sat.core.log import getLogger
+import asyncio
+from functools import partial
+from twisted.spread import pb
+from twisted.internet import reactor, defer
+from twisted.internet.error import ConnectionRefusedError
+from logging import getLogger
+from sat.core import exceptions
+from sat_frontends.bridge.bridge_frontend import BridgeException
 
 log = getLogger(__name__)
-from sat.core import exceptions
-from twisted.spread import pb
-from twisted.internet import reactor
 
 
 class SignalsHandler(pb.Referenceable):
@@ -38,7 +42,7 @@
         log.debug("registering signal {name}".format(name=name))
         method_name = "remote_" + name
         try:
-            self.__getattribute__(self, method_name)
+            self.__getattribute__(method_name)
         except AttributeError:
             pass
         else:
@@ -51,11 +55,12 @@
 
 
 class Bridge(object):
+
     def __init__(self):
         self.signals_handler = SignalsHandler()
 
     def __getattr__(self, name):
-        return lambda *args, **kwargs: self.call(name, args, kwargs)
+        return partial(self.call, name)
 
     def remoteCallback(self, result, callback):
         """call callback with argument or None
@@ -70,7 +75,7 @@
         else:
             callback(result)
 
-    def call(self, name, args, kwargs):
+    def call(self, name, *args, **kwargs):
         """call a remote method
 
         @param name(str): name of the bridge method
@@ -98,8 +103,9 @@
         if errback is not None:
             d.addErrback(errback)
 
-    def _initBridgeEb(self, failure):
-        log.error("Can't init bridge: {msg}".format(msg=failure))
+    def _initBridgeEb(self, failure_):
+        log.error("Can't init bridge: {msg}".format(msg=failure_))
+        return failure_
 
     def _set_root(self, root):
         """set remote root object
@@ -111,16 +117,24 @@
         d.addErrback(self._initBridgeEb)
         return d
 
-    def _generic_errback(self, failure):
-        log.error("bridge failure: {}".format(failure))
+    def getRootObjectEb(self, failure_):
+        """Call errback with appropriate bridge error"""
+        if failure_.check(ConnectionRefusedError):
+            raise exceptions.BridgeExceptionNoService
+        else:
+            raise failure_
 
     def bridgeConnect(self, callback, errback):
         factory = pb.PBClientFactory()
         reactor.connectTCP("localhost", 8789, factory)
         d = factory.getRootObject()
         d.addCallback(self._set_root)
-        d.addCallback(lambda __: callback())
-        d.addErrback(errback)
+        if callback is not None:
+            d.addCallback(lambda __: callback())
+        d.addErrback(self.getRootObjectEb)
+        if errback is not None:
+            d.addErrback(lambda failure_: errback(failure_.value))
+        return d
 
     def register_signal(self, functionName, handler, iface="core"):
         self.signals_handler.register_signal(functionName, handler, iface)
@@ -190,7 +204,7 @@
             errback = self._generic_errback
         d.addErrback(errback)
 
-    def discoInfos(self, entity_jid, node='', use_cache=True, profile_key="@DEFAULT@", callback=None, errback=None):
+    def discoInfos(self, entity_jid, node=u'', use_cache=True, profile_key="@DEFAULT@", callback=None, errback=None):
         d = self.root.callRemote("discoInfos", entity_jid, node, use_cache, profile_key)
         if callback is not None:
             d.addCallback(callback)
@@ -198,7 +212,7 @@
             errback = self._generic_errback
         d.addErrback(errback)
 
-    def discoItems(self, entity_jid, node='', use_cache=True, profile_key="@DEFAULT@", callback=None, errback=None):
+    def discoItems(self, entity_jid, node=u'', use_cache=True, profile_key="@DEFAULT@", callback=None, errback=None):
         d = self.root.callRemote("discoItems", entity_jid, node, use_cache, profile_key)
         if callback is not None:
             d.addCallback(callback)
@@ -581,3 +595,317 @@
         if errback is None:
             errback = self._generic_errback
         d.addErrback(errback)
+
+
+class AIOSignalsHandler(SignalsHandler):
+
+    def register_signal(self, name, handler, iface="core"):
+        async_handler = lambda *args, **kwargs: defer.Deferred.fromFuture(
+            asyncio.ensure_future(handler(*args, **kwargs)))
+        return super().register_signal(name, async_handler, iface)
+
+
+class AIOBridge(Bridge):
+
+    def __init__(self):
+        self.signals_handler = AIOSignalsHandler()
+
+    def _errback(self, failure_):
+        raise BridgeException(
+            name=failure_.type.decode('utf-8'),
+            message=str(failure_.value)
+            )
+
+    def call(self, name, *args, **kwargs):
+        d = self.root.callRemote(name, *args, *kwargs)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    async def bridgeConnect(self):
+        d = super().bridgeConnect(callback=None, errback=None)
+        return await d.asFuture(asyncio.get_event_loop())
+
+    def actionsGet(self, profile_key="@DEFAULT@"):
+        d = self.root.callRemote("actionsGet", profile_key)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def addContact(self, entity_jid, profile_key="@DEFAULT@"):
+        d = self.root.callRemote("addContact", entity_jid, profile_key)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def asyncDeleteProfile(self, profile):
+        d = self.root.callRemote("asyncDeleteProfile", profile)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def asyncGetParamA(self, name, category, attribute="value", security_limit=-1, profile_key="@DEFAULT@"):
+        d = self.root.callRemote("asyncGetParamA", name, category, attribute, security_limit, profile_key)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def asyncGetParamsValuesFromCategory(self, category, security_limit=-1, profile_key="@DEFAULT@"):
+        d = self.root.callRemote("asyncGetParamsValuesFromCategory", category, security_limit, profile_key)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def connect(self, profile_key="@DEFAULT@", password='', options={}):
+        d = self.root.callRemote("connect", profile_key, password, options)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def delContact(self, entity_jid, profile_key="@DEFAULT@"):
+        d = self.root.callRemote("delContact", entity_jid, profile_key)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def discoFindByFeatures(self, namespaces, identities, bare_jid=False, service=True, roster=True, own_jid=True, local_device=False, profile_key="@DEFAULT@"):
+        d = self.root.callRemote("discoFindByFeatures", namespaces, identities, bare_jid, service, roster, own_jid, local_device, profile_key)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def discoInfos(self, entity_jid, node=u'', use_cache=True, profile_key="@DEFAULT@"):
+        d = self.root.callRemote("discoInfos", entity_jid, node, use_cache, profile_key)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def discoItems(self, entity_jid, node=u'', use_cache=True, profile_key="@DEFAULT@"):
+        d = self.root.callRemote("discoItems", entity_jid, node, use_cache, profile_key)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def disconnect(self, profile_key="@DEFAULT@"):
+        d = self.root.callRemote("disconnect", profile_key)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def encryptionNamespaceGet(self, arg_0):
+        d = self.root.callRemote("encryptionNamespaceGet", arg_0)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def encryptionPluginsGet(self):
+        d = self.root.callRemote("encryptionPluginsGet")
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def encryptionTrustUIGet(self, to_jid, namespace, profile_key):
+        d = self.root.callRemote("encryptionTrustUIGet", to_jid, namespace, profile_key)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def getConfig(self, section, name):
+        d = self.root.callRemote("getConfig", section, name)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def getContacts(self, profile_key="@DEFAULT@"):
+        d = self.root.callRemote("getContacts", profile_key)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def getContactsFromGroup(self, group, profile_key="@DEFAULT@"):
+        d = self.root.callRemote("getContactsFromGroup", group, profile_key)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def getEntitiesData(self, jids, keys, profile):
+        d = self.root.callRemote("getEntitiesData", jids, keys, profile)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def getEntityData(self, jid, keys, profile):
+        d = self.root.callRemote("getEntityData", jid, keys, profile)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def getFeatures(self, profile_key):
+        d = self.root.callRemote("getFeatures", profile_key)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def getMainResource(self, contact_jid, profile_key="@DEFAULT@"):
+        d = self.root.callRemote("getMainResource", contact_jid, profile_key)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def getParamA(self, name, category, attribute="value", profile_key="@DEFAULT@"):
+        d = self.root.callRemote("getParamA", name, category, attribute, profile_key)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def getParamsCategories(self):
+        d = self.root.callRemote("getParamsCategories")
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def getParamsUI(self, security_limit=-1, app='', profile_key="@DEFAULT@"):
+        d = self.root.callRemote("getParamsUI", security_limit, app, profile_key)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def getPresenceStatuses(self, profile_key="@DEFAULT@"):
+        d = self.root.callRemote("getPresenceStatuses", profile_key)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def getReady(self):
+        d = self.root.callRemote("getReady")
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def getVersion(self):
+        d = self.root.callRemote("getVersion")
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def getWaitingSub(self, profile_key="@DEFAULT@"):
+        d = self.root.callRemote("getWaitingSub", profile_key)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def historyGet(self, from_jid, to_jid, limit, between=True, filters='', profile="@NONE@"):
+        d = self.root.callRemote("historyGet", from_jid, to_jid, limit, between, filters, profile)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def isConnected(self, profile_key="@DEFAULT@"):
+        d = self.root.callRemote("isConnected", profile_key)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def launchAction(self, callback_id, data, profile_key="@DEFAULT@"):
+        d = self.root.callRemote("launchAction", callback_id, data, profile_key)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def loadParamsTemplate(self, filename):
+        d = self.root.callRemote("loadParamsTemplate", filename)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def menuHelpGet(self, menu_id, language):
+        d = self.root.callRemote("menuHelpGet", menu_id, language)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def menuLaunch(self, menu_type, path, data, security_limit, profile_key):
+        d = self.root.callRemote("menuLaunch", menu_type, path, data, security_limit, profile_key)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def menusGet(self, language, security_limit):
+        d = self.root.callRemote("menusGet", language, security_limit)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def messageEncryptionGet(self, to_jid, profile_key):
+        d = self.root.callRemote("messageEncryptionGet", to_jid, profile_key)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def messageEncryptionStart(self, to_jid, namespace='', replace=False, profile_key="@NONE@"):
+        d = self.root.callRemote("messageEncryptionStart", to_jid, namespace, replace, profile_key)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def messageEncryptionStop(self, to_jid, profile_key):
+        d = self.root.callRemote("messageEncryptionStop", to_jid, profile_key)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def messageSend(self, to_jid, message, subject={}, mess_type="auto", extra={}, profile_key="@NONE@"):
+        d = self.root.callRemote("messageSend", to_jid, message, subject, mess_type, extra, profile_key)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def namespacesGet(self):
+        d = self.root.callRemote("namespacesGet")
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def paramsRegisterApp(self, xml, security_limit=-1, app=''):
+        d = self.root.callRemote("paramsRegisterApp", xml, security_limit, app)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def profileCreate(self, profile, password='', component=''):
+        d = self.root.callRemote("profileCreate", profile, password, component)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def profileIsSessionStarted(self, profile_key="@DEFAULT@"):
+        d = self.root.callRemote("profileIsSessionStarted", profile_key)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def profileNameGet(self, profile_key="@DEFAULT@"):
+        d = self.root.callRemote("profileNameGet", profile_key)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def profileSetDefault(self, profile):
+        d = self.root.callRemote("profileSetDefault", profile)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def profileStartSession(self, password='', profile_key="@DEFAULT@"):
+        d = self.root.callRemote("profileStartSession", password, profile_key)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def profilesListGet(self, clients=True, components=False):
+        d = self.root.callRemote("profilesListGet", clients, components)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def progressGet(self, id, profile):
+        d = self.root.callRemote("progressGet", id, profile)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def progressGetAll(self, profile):
+        d = self.root.callRemote("progressGetAll", profile)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def progressGetAllMetadata(self, profile):
+        d = self.root.callRemote("progressGetAllMetadata", profile)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def rosterResync(self, profile_key="@DEFAULT@"):
+        d = self.root.callRemote("rosterResync", profile_key)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def saveParamsTemplate(self, filename):
+        d = self.root.callRemote("saveParamsTemplate", filename)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def sessionInfosGet(self, profile_key):
+        d = self.root.callRemote("sessionInfosGet", profile_key)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def setParam(self, name, value, category, security_limit=-1, profile_key="@DEFAULT@"):
+        d = self.root.callRemote("setParam", name, value, category, security_limit, profile_key)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def setPresence(self, to_jid='', show='', statuses={}, profile_key="@DEFAULT@"):
+        d = self.root.callRemote("setPresence", to_jid, show, statuses, profile_key)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def subscription(self, sub_type, entity, profile_key="@DEFAULT@"):
+        d = self.root.callRemote("subscription", sub_type, entity, profile_key)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())
+
+    def updateContact(self, entity_jid, name, groups, profile_key="@DEFAULT@"):
+        d = self.root.callRemote("updateContact", entity_jid, name, groups, profile_key)
+        d.addErrback(self._errback)
+        return d.asFuture(asyncio.get_event_loop())