Mercurial > libervia-backend
diff sat_frontends/bridge/pb.py @ 3039:a1bc34f90fa5
bridge (pb): implemented an asyncio compatible bridge:
`pb` bridge can now be used with asyncio by instantiating AIOBridge.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 25 Sep 2019 08:53:38 +0200 |
parents | ab2696e34d29 |
children | 84bb63e1e4c4 |
line wrap: on
line diff
--- a/sat_frontends/bridge/pb.py Wed Sep 25 08:41:36 2019 +0200 +++ b/sat_frontends/bridge/pb.py Wed Sep 25 08:53:38 2019 +0200 @@ -17,12 +17,16 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -from sat.core.log import getLogger +import asyncio +from functools import partial +from twisted.spread import pb +from twisted.internet import reactor, defer +from twisted.internet.error import ConnectionRefusedError +from logging import getLogger +from sat.core import exceptions +from sat_frontends.bridge.bridge_frontend import BridgeException log = getLogger(__name__) -from sat.core import exceptions -from twisted.spread import pb -from twisted.internet import reactor class SignalsHandler(pb.Referenceable): @@ -38,7 +42,7 @@ log.debug("registering signal {name}".format(name=name)) method_name = "remote_" + name try: - self.__getattribute__(self, method_name) + self.__getattribute__(method_name) except AttributeError: pass else: @@ -51,11 +55,12 @@ class Bridge(object): + def __init__(self): self.signals_handler = SignalsHandler() def __getattr__(self, name): - return lambda *args, **kwargs: self.call(name, args, kwargs) + return partial(self.call, name) def remoteCallback(self, result, callback): """call callback with argument or None @@ -70,7 +75,7 @@ else: callback(result) - def call(self, name, args, kwargs): + def call(self, name, *args, **kwargs): """call a remote method @param name(str): name of the bridge method @@ -98,8 +103,9 @@ if errback is not None: d.addErrback(errback) - def _initBridgeEb(self, failure): - log.error("Can't init bridge: {msg}".format(msg=failure)) + def _initBridgeEb(self, failure_): + log.error("Can't init bridge: {msg}".format(msg=failure_)) + return failure_ def _set_root(self, root): """set remote root object @@ -111,16 +117,24 @@ d.addErrback(self._initBridgeEb) return d - def _generic_errback(self, failure): - log.error("bridge failure: {}".format(failure)) + def getRootObjectEb(self, failure_): + """Call errback with appropriate bridge error""" + if failure_.check(ConnectionRefusedError): + raise exceptions.BridgeExceptionNoService + else: + raise failure_ def bridgeConnect(self, callback, errback): factory = pb.PBClientFactory() reactor.connectTCP("localhost", 8789, factory) d = factory.getRootObject() d.addCallback(self._set_root) - d.addCallback(lambda __: callback()) - d.addErrback(errback) + if callback is not None: + d.addCallback(lambda __: callback()) + d.addErrback(self.getRootObjectEb) + if errback is not None: + d.addErrback(lambda failure_: errback(failure_.value)) + return d def register_signal(self, functionName, handler, iface="core"): self.signals_handler.register_signal(functionName, handler, iface) @@ -190,7 +204,7 @@ errback = self._generic_errback d.addErrback(errback) - def discoInfos(self, entity_jid, node='', use_cache=True, profile_key="@DEFAULT@", callback=None, errback=None): + def discoInfos(self, entity_jid, node=u'', use_cache=True, profile_key="@DEFAULT@", callback=None, errback=None): d = self.root.callRemote("discoInfos", entity_jid, node, use_cache, profile_key) if callback is not None: d.addCallback(callback) @@ -198,7 +212,7 @@ errback = self._generic_errback d.addErrback(errback) - def discoItems(self, entity_jid, node='', use_cache=True, profile_key="@DEFAULT@", callback=None, errback=None): + def discoItems(self, entity_jid, node=u'', use_cache=True, profile_key="@DEFAULT@", callback=None, errback=None): d = self.root.callRemote("discoItems", entity_jid, node, use_cache, profile_key) if callback is not None: d.addCallback(callback) @@ -581,3 +595,317 @@ if errback is None: errback = self._generic_errback d.addErrback(errback) + + +class AIOSignalsHandler(SignalsHandler): + + def register_signal(self, name, handler, iface="core"): + async_handler = lambda *args, **kwargs: defer.Deferred.fromFuture( + asyncio.ensure_future(handler(*args, **kwargs))) + return super().register_signal(name, async_handler, iface) + + +class AIOBridge(Bridge): + + def __init__(self): + self.signals_handler = AIOSignalsHandler() + + def _errback(self, failure_): + raise BridgeException( + name=failure_.type.decode('utf-8'), + message=str(failure_.value) + ) + + def call(self, name, *args, **kwargs): + d = self.root.callRemote(name, *args, *kwargs) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + async def bridgeConnect(self): + d = super().bridgeConnect(callback=None, errback=None) + return await d.asFuture(asyncio.get_event_loop()) + + def actionsGet(self, profile_key="@DEFAULT@"): + d = self.root.callRemote("actionsGet", profile_key) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def addContact(self, entity_jid, profile_key="@DEFAULT@"): + d = self.root.callRemote("addContact", entity_jid, profile_key) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def asyncDeleteProfile(self, profile): + d = self.root.callRemote("asyncDeleteProfile", profile) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def asyncGetParamA(self, name, category, attribute="value", security_limit=-1, profile_key="@DEFAULT@"): + d = self.root.callRemote("asyncGetParamA", name, category, attribute, security_limit, profile_key) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def asyncGetParamsValuesFromCategory(self, category, security_limit=-1, profile_key="@DEFAULT@"): + d = self.root.callRemote("asyncGetParamsValuesFromCategory", category, security_limit, profile_key) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def connect(self, profile_key="@DEFAULT@", password='', options={}): + d = self.root.callRemote("connect", profile_key, password, options) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def delContact(self, entity_jid, profile_key="@DEFAULT@"): + d = self.root.callRemote("delContact", entity_jid, profile_key) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def discoFindByFeatures(self, namespaces, identities, bare_jid=False, service=True, roster=True, own_jid=True, local_device=False, profile_key="@DEFAULT@"): + d = self.root.callRemote("discoFindByFeatures", namespaces, identities, bare_jid, service, roster, own_jid, local_device, profile_key) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def discoInfos(self, entity_jid, node=u'', use_cache=True, profile_key="@DEFAULT@"): + d = self.root.callRemote("discoInfos", entity_jid, node, use_cache, profile_key) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def discoItems(self, entity_jid, node=u'', use_cache=True, profile_key="@DEFAULT@"): + d = self.root.callRemote("discoItems", entity_jid, node, use_cache, profile_key) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def disconnect(self, profile_key="@DEFAULT@"): + d = self.root.callRemote("disconnect", profile_key) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def encryptionNamespaceGet(self, arg_0): + d = self.root.callRemote("encryptionNamespaceGet", arg_0) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def encryptionPluginsGet(self): + d = self.root.callRemote("encryptionPluginsGet") + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def encryptionTrustUIGet(self, to_jid, namespace, profile_key): + d = self.root.callRemote("encryptionTrustUIGet", to_jid, namespace, profile_key) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def getConfig(self, section, name): + d = self.root.callRemote("getConfig", section, name) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def getContacts(self, profile_key="@DEFAULT@"): + d = self.root.callRemote("getContacts", profile_key) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def getContactsFromGroup(self, group, profile_key="@DEFAULT@"): + d = self.root.callRemote("getContactsFromGroup", group, profile_key) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def getEntitiesData(self, jids, keys, profile): + d = self.root.callRemote("getEntitiesData", jids, keys, profile) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def getEntityData(self, jid, keys, profile): + d = self.root.callRemote("getEntityData", jid, keys, profile) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def getFeatures(self, profile_key): + d = self.root.callRemote("getFeatures", profile_key) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def getMainResource(self, contact_jid, profile_key="@DEFAULT@"): + d = self.root.callRemote("getMainResource", contact_jid, profile_key) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def getParamA(self, name, category, attribute="value", profile_key="@DEFAULT@"): + d = self.root.callRemote("getParamA", name, category, attribute, profile_key) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def getParamsCategories(self): + d = self.root.callRemote("getParamsCategories") + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def getParamsUI(self, security_limit=-1, app='', profile_key="@DEFAULT@"): + d = self.root.callRemote("getParamsUI", security_limit, app, profile_key) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def getPresenceStatuses(self, profile_key="@DEFAULT@"): + d = self.root.callRemote("getPresenceStatuses", profile_key) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def getReady(self): + d = self.root.callRemote("getReady") + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def getVersion(self): + d = self.root.callRemote("getVersion") + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def getWaitingSub(self, profile_key="@DEFAULT@"): + d = self.root.callRemote("getWaitingSub", profile_key) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def historyGet(self, from_jid, to_jid, limit, between=True, filters='', profile="@NONE@"): + d = self.root.callRemote("historyGet", from_jid, to_jid, limit, between, filters, profile) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def isConnected(self, profile_key="@DEFAULT@"): + d = self.root.callRemote("isConnected", profile_key) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def launchAction(self, callback_id, data, profile_key="@DEFAULT@"): + d = self.root.callRemote("launchAction", callback_id, data, profile_key) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def loadParamsTemplate(self, filename): + d = self.root.callRemote("loadParamsTemplate", filename) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def menuHelpGet(self, menu_id, language): + d = self.root.callRemote("menuHelpGet", menu_id, language) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def menuLaunch(self, menu_type, path, data, security_limit, profile_key): + d = self.root.callRemote("menuLaunch", menu_type, path, data, security_limit, profile_key) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def menusGet(self, language, security_limit): + d = self.root.callRemote("menusGet", language, security_limit) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def messageEncryptionGet(self, to_jid, profile_key): + d = self.root.callRemote("messageEncryptionGet", to_jid, profile_key) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def messageEncryptionStart(self, to_jid, namespace='', replace=False, profile_key="@NONE@"): + d = self.root.callRemote("messageEncryptionStart", to_jid, namespace, replace, profile_key) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def messageEncryptionStop(self, to_jid, profile_key): + d = self.root.callRemote("messageEncryptionStop", to_jid, profile_key) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def messageSend(self, to_jid, message, subject={}, mess_type="auto", extra={}, profile_key="@NONE@"): + d = self.root.callRemote("messageSend", to_jid, message, subject, mess_type, extra, profile_key) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def namespacesGet(self): + d = self.root.callRemote("namespacesGet") + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def paramsRegisterApp(self, xml, security_limit=-1, app=''): + d = self.root.callRemote("paramsRegisterApp", xml, security_limit, app) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def profileCreate(self, profile, password='', component=''): + d = self.root.callRemote("profileCreate", profile, password, component) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def profileIsSessionStarted(self, profile_key="@DEFAULT@"): + d = self.root.callRemote("profileIsSessionStarted", profile_key) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def profileNameGet(self, profile_key="@DEFAULT@"): + d = self.root.callRemote("profileNameGet", profile_key) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def profileSetDefault(self, profile): + d = self.root.callRemote("profileSetDefault", profile) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def profileStartSession(self, password='', profile_key="@DEFAULT@"): + d = self.root.callRemote("profileStartSession", password, profile_key) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def profilesListGet(self, clients=True, components=False): + d = self.root.callRemote("profilesListGet", clients, components) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def progressGet(self, id, profile): + d = self.root.callRemote("progressGet", id, profile) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def progressGetAll(self, profile): + d = self.root.callRemote("progressGetAll", profile) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def progressGetAllMetadata(self, profile): + d = self.root.callRemote("progressGetAllMetadata", profile) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def rosterResync(self, profile_key="@DEFAULT@"): + d = self.root.callRemote("rosterResync", profile_key) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def saveParamsTemplate(self, filename): + d = self.root.callRemote("saveParamsTemplate", filename) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def sessionInfosGet(self, profile_key): + d = self.root.callRemote("sessionInfosGet", profile_key) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def setParam(self, name, value, category, security_limit=-1, profile_key="@DEFAULT@"): + d = self.root.callRemote("setParam", name, value, category, security_limit, profile_key) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def setPresence(self, to_jid='', show='', statuses={}, profile_key="@DEFAULT@"): + d = self.root.callRemote("setPresence", to_jid, show, statuses, profile_key) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def subscription(self, sub_type, entity, profile_key="@DEFAULT@"): + d = self.root.callRemote("subscription", sub_type, entity, profile_key) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def updateContact(self, entity_jid, name, groups, profile_key="@DEFAULT@"): + d = self.root.callRemote("updateContact", entity_jid, name, groups, profile_key) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop())