comparison sat_frontends/bridge/pb.py @ 3039:a1bc34f90fa5

bridge (pb): implemented an asyncio compatible bridge: `pb` bridge can now be used with asyncio by instantiating AIOBridge.
author Goffi <goffi@goffi.org>
date Wed, 25 Sep 2019 08:53:38 +0200
parents ab2696e34d29
children 84bb63e1e4c4
comparison
equal deleted inserted replaced
3038:5f3068915686 3039:a1bc34f90fa5
15 # GNU Affero General Public License for more details. 15 # GNU Affero General Public License for more details.
16 16
17 # You should have received a copy of the GNU Affero General Public License 17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. 18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 19
20 from sat.core.log import getLogger 20 import asyncio
21 from functools import partial
22 from twisted.spread import pb
23 from twisted.internet import reactor, defer
24 from twisted.internet.error import ConnectionRefusedError
25 from logging import getLogger
26 from sat.core import exceptions
27 from sat_frontends.bridge.bridge_frontend import BridgeException
21 28
22 log = getLogger(__name__) 29 log = getLogger(__name__)
23 from sat.core import exceptions
24 from twisted.spread import pb
25 from twisted.internet import reactor
26 30
27 31
28 class SignalsHandler(pb.Referenceable): 32 class SignalsHandler(pb.Referenceable):
29 def __getattr__(self, name): 33 def __getattr__(self, name):
30 if name.startswith("remote_"): 34 if name.startswith("remote_"):
36 40
37 def register_signal(self, name, handler, iface="core"): 41 def register_signal(self, name, handler, iface="core"):
38 log.debug("registering signal {name}".format(name=name)) 42 log.debug("registering signal {name}".format(name=name))
39 method_name = "remote_" + name 43 method_name = "remote_" + name
40 try: 44 try:
41 self.__getattribute__(self, method_name) 45 self.__getattribute__(method_name)
42 except AttributeError: 46 except AttributeError:
43 pass 47 pass
44 else: 48 else:
45 raise exceptions.InternalError( 49 raise exceptions.InternalError(
46 "{name} signal handler has been registered twice".format( 50 "{name} signal handler has been registered twice".format(
49 ) 53 )
50 setattr(self, method_name, handler) 54 setattr(self, method_name, handler)
51 55
52 56
53 class Bridge(object): 57 class Bridge(object):
58
54 def __init__(self): 59 def __init__(self):
55 self.signals_handler = SignalsHandler() 60 self.signals_handler = SignalsHandler()
56 61
57 def __getattr__(self, name): 62 def __getattr__(self, name):
58 return lambda *args, **kwargs: self.call(name, args, kwargs) 63 return partial(self.call, name)
59 64
60 def remoteCallback(self, result, callback): 65 def remoteCallback(self, result, callback):
61 """call callback with argument or None 66 """call callback with argument or None
62 67
63 if result is not None not argument is used, 68 if result is not None not argument is used,
68 if result is None: 73 if result is None:
69 callback() 74 callback()
70 else: 75 else:
71 callback(result) 76 callback(result)
72 77
73 def call(self, name, args, kwargs): 78 def call(self, name, *args, **kwargs):
74 """call a remote method 79 """call a remote method
75 80
76 @param name(str): name of the bridge method 81 @param name(str): name of the bridge method
77 @param args(list): arguments 82 @param args(list): arguments
78 may contain callback and errback as last 2 items 83 may contain callback and errback as last 2 items
96 if callback is not None: 101 if callback is not None:
97 d.addCallback(self.remoteCallback, callback) 102 d.addCallback(self.remoteCallback, callback)
98 if errback is not None: 103 if errback is not None:
99 d.addErrback(errback) 104 d.addErrback(errback)
100 105
101 def _initBridgeEb(self, failure): 106 def _initBridgeEb(self, failure_):
102 log.error("Can't init bridge: {msg}".format(msg=failure)) 107 log.error("Can't init bridge: {msg}".format(msg=failure_))
108 return failure_
103 109
104 def _set_root(self, root): 110 def _set_root(self, root):
105 """set remote root object 111 """set remote root object
106 112
107 bridge will then be initialised 113 bridge will then be initialised
109 self.root = root 115 self.root = root
110 d = root.callRemote("initBridge", self.signals_handler) 116 d = root.callRemote("initBridge", self.signals_handler)
111 d.addErrback(self._initBridgeEb) 117 d.addErrback(self._initBridgeEb)
112 return d 118 return d
113 119
114 def _generic_errback(self, failure): 120 def getRootObjectEb(self, failure_):
115 log.error("bridge failure: {}".format(failure)) 121 """Call errback with appropriate bridge error"""
122 if failure_.check(ConnectionRefusedError):
123 raise exceptions.BridgeExceptionNoService
124 else:
125 raise failure_
116 126
117 def bridgeConnect(self, callback, errback): 127 def bridgeConnect(self, callback, errback):
118 factory = pb.PBClientFactory() 128 factory = pb.PBClientFactory()
119 reactor.connectTCP("localhost", 8789, factory) 129 reactor.connectTCP("localhost", 8789, factory)
120 d = factory.getRootObject() 130 d = factory.getRootObject()
121 d.addCallback(self._set_root) 131 d.addCallback(self._set_root)
122 d.addCallback(lambda __: callback()) 132 if callback is not None:
123 d.addErrback(errback) 133 d.addCallback(lambda __: callback())
134 d.addErrback(self.getRootObjectEb)
135 if errback is not None:
136 d.addErrback(lambda failure_: errback(failure_.value))
137 return d
124 138
125 def register_signal(self, functionName, handler, iface="core"): 139 def register_signal(self, functionName, handler, iface="core"):
126 self.signals_handler.register_signal(functionName, handler, iface) 140 self.signals_handler.register_signal(functionName, handler, iface)
127 141
128 142
188 d.addCallback(callback) 202 d.addCallback(callback)
189 if errback is None: 203 if errback is None:
190 errback = self._generic_errback 204 errback = self._generic_errback
191 d.addErrback(errback) 205 d.addErrback(errback)
192 206
193 def discoInfos(self, entity_jid, node='', use_cache=True, profile_key="@DEFAULT@", callback=None, errback=None): 207 def discoInfos(self, entity_jid, node=u'', use_cache=True, profile_key="@DEFAULT@", callback=None, errback=None):
194 d = self.root.callRemote("discoInfos", entity_jid, node, use_cache, profile_key) 208 d = self.root.callRemote("discoInfos", entity_jid, node, use_cache, profile_key)
195 if callback is not None: 209 if callback is not None:
196 d.addCallback(callback) 210 d.addCallback(callback)
197 if errback is None: 211 if errback is None:
198 errback = self._generic_errback 212 errback = self._generic_errback
199 d.addErrback(errback) 213 d.addErrback(errback)
200 214
201 def discoItems(self, entity_jid, node='', use_cache=True, profile_key="@DEFAULT@", callback=None, errback=None): 215 def discoItems(self, entity_jid, node=u'', use_cache=True, profile_key="@DEFAULT@", callback=None, errback=None):
202 d = self.root.callRemote("discoItems", entity_jid, node, use_cache, profile_key) 216 d = self.root.callRemote("discoItems", entity_jid, node, use_cache, profile_key)
203 if callback is not None: 217 if callback is not None:
204 d.addCallback(callback) 218 d.addCallback(callback)
205 if errback is None: 219 if errback is None:
206 errback = self._generic_errback 220 errback = self._generic_errback
579 if callback is not None: 593 if callback is not None:
580 d.addCallback(lambda __: callback()) 594 d.addCallback(lambda __: callback())
581 if errback is None: 595 if errback is None:
582 errback = self._generic_errback 596 errback = self._generic_errback
583 d.addErrback(errback) 597 d.addErrback(errback)
598
599
600 class AIOSignalsHandler(SignalsHandler):
601
602 def register_signal(self, name, handler, iface="core"):
603 async_handler = lambda *args, **kwargs: defer.Deferred.fromFuture(
604 asyncio.ensure_future(handler(*args, **kwargs)))
605 return super().register_signal(name, async_handler, iface)
606
607
608 class AIOBridge(Bridge):
609
610 def __init__(self):
611 self.signals_handler = AIOSignalsHandler()
612
613 def _errback(self, failure_):
614 raise BridgeException(
615 name=failure_.type.decode('utf-8'),
616 message=str(failure_.value)
617 )
618
619 def call(self, name, *args, **kwargs):
620 d = self.root.callRemote(name, *args, *kwargs)
621 d.addErrback(self._errback)
622 return d.asFuture(asyncio.get_event_loop())
623
624 async def bridgeConnect(self):
625 d = super().bridgeConnect(callback=None, errback=None)
626 return await d.asFuture(asyncio.get_event_loop())
627
628 def actionsGet(self, profile_key="@DEFAULT@"):
629 d = self.root.callRemote("actionsGet", profile_key)
630 d.addErrback(self._errback)
631 return d.asFuture(asyncio.get_event_loop())
632
633 def addContact(self, entity_jid, profile_key="@DEFAULT@"):
634 d = self.root.callRemote("addContact", entity_jid, profile_key)
635 d.addErrback(self._errback)
636 return d.asFuture(asyncio.get_event_loop())
637
638 def asyncDeleteProfile(self, profile):
639 d = self.root.callRemote("asyncDeleteProfile", profile)
640 d.addErrback(self._errback)
641 return d.asFuture(asyncio.get_event_loop())
642
643 def asyncGetParamA(self, name, category, attribute="value", security_limit=-1, profile_key="@DEFAULT@"):
644 d = self.root.callRemote("asyncGetParamA", name, category, attribute, security_limit, profile_key)
645 d.addErrback(self._errback)
646 return d.asFuture(asyncio.get_event_loop())
647
648 def asyncGetParamsValuesFromCategory(self, category, security_limit=-1, profile_key="@DEFAULT@"):
649 d = self.root.callRemote("asyncGetParamsValuesFromCategory", category, security_limit, profile_key)
650 d.addErrback(self._errback)
651 return d.asFuture(asyncio.get_event_loop())
652
653 def connect(self, profile_key="@DEFAULT@", password='', options={}):
654 d = self.root.callRemote("connect", profile_key, password, options)
655 d.addErrback(self._errback)
656 return d.asFuture(asyncio.get_event_loop())
657
658 def delContact(self, entity_jid, profile_key="@DEFAULT@"):
659 d = self.root.callRemote("delContact", entity_jid, profile_key)
660 d.addErrback(self._errback)
661 return d.asFuture(asyncio.get_event_loop())
662
663 def discoFindByFeatures(self, namespaces, identities, bare_jid=False, service=True, roster=True, own_jid=True, local_device=False, profile_key="@DEFAULT@"):
664 d = self.root.callRemote("discoFindByFeatures", namespaces, identities, bare_jid, service, roster, own_jid, local_device, profile_key)
665 d.addErrback(self._errback)
666 return d.asFuture(asyncio.get_event_loop())
667
668 def discoInfos(self, entity_jid, node=u'', use_cache=True, profile_key="@DEFAULT@"):
669 d = self.root.callRemote("discoInfos", entity_jid, node, use_cache, profile_key)
670 d.addErrback(self._errback)
671 return d.asFuture(asyncio.get_event_loop())
672
673 def discoItems(self, entity_jid, node=u'', use_cache=True, profile_key="@DEFAULT@"):
674 d = self.root.callRemote("discoItems", entity_jid, node, use_cache, profile_key)
675 d.addErrback(self._errback)
676 return d.asFuture(asyncio.get_event_loop())
677
678 def disconnect(self, profile_key="@DEFAULT@"):
679 d = self.root.callRemote("disconnect", profile_key)
680 d.addErrback(self._errback)
681 return d.asFuture(asyncio.get_event_loop())
682
683 def encryptionNamespaceGet(self, arg_0):
684 d = self.root.callRemote("encryptionNamespaceGet", arg_0)
685 d.addErrback(self._errback)
686 return d.asFuture(asyncio.get_event_loop())
687
688 def encryptionPluginsGet(self):
689 d = self.root.callRemote("encryptionPluginsGet")
690 d.addErrback(self._errback)
691 return d.asFuture(asyncio.get_event_loop())
692
693 def encryptionTrustUIGet(self, to_jid, namespace, profile_key):
694 d = self.root.callRemote("encryptionTrustUIGet", to_jid, namespace, profile_key)
695 d.addErrback(self._errback)
696 return d.asFuture(asyncio.get_event_loop())
697
698 def getConfig(self, section, name):
699 d = self.root.callRemote("getConfig", section, name)
700 d.addErrback(self._errback)
701 return d.asFuture(asyncio.get_event_loop())
702
703 def getContacts(self, profile_key="@DEFAULT@"):
704 d = self.root.callRemote("getContacts", profile_key)
705 d.addErrback(self._errback)
706 return d.asFuture(asyncio.get_event_loop())
707
708 def getContactsFromGroup(self, group, profile_key="@DEFAULT@"):
709 d = self.root.callRemote("getContactsFromGroup", group, profile_key)
710 d.addErrback(self._errback)
711 return d.asFuture(asyncio.get_event_loop())
712
713 def getEntitiesData(self, jids, keys, profile):
714 d = self.root.callRemote("getEntitiesData", jids, keys, profile)
715 d.addErrback(self._errback)
716 return d.asFuture(asyncio.get_event_loop())
717
718 def getEntityData(self, jid, keys, profile):
719 d = self.root.callRemote("getEntityData", jid, keys, profile)
720 d.addErrback(self._errback)
721 return d.asFuture(asyncio.get_event_loop())
722
723 def getFeatures(self, profile_key):
724 d = self.root.callRemote("getFeatures", profile_key)
725 d.addErrback(self._errback)
726 return d.asFuture(asyncio.get_event_loop())
727
728 def getMainResource(self, contact_jid, profile_key="@DEFAULT@"):
729 d = self.root.callRemote("getMainResource", contact_jid, profile_key)
730 d.addErrback(self._errback)
731 return d.asFuture(asyncio.get_event_loop())
732
733 def getParamA(self, name, category, attribute="value", profile_key="@DEFAULT@"):
734 d = self.root.callRemote("getParamA", name, category, attribute, profile_key)
735 d.addErrback(self._errback)
736 return d.asFuture(asyncio.get_event_loop())
737
738 def getParamsCategories(self):
739 d = self.root.callRemote("getParamsCategories")
740 d.addErrback(self._errback)
741 return d.asFuture(asyncio.get_event_loop())
742
743 def getParamsUI(self, security_limit=-1, app='', profile_key="@DEFAULT@"):
744 d = self.root.callRemote("getParamsUI", security_limit, app, profile_key)
745 d.addErrback(self._errback)
746 return d.asFuture(asyncio.get_event_loop())
747
748 def getPresenceStatuses(self, profile_key="@DEFAULT@"):
749 d = self.root.callRemote("getPresenceStatuses", profile_key)
750 d.addErrback(self._errback)
751 return d.asFuture(asyncio.get_event_loop())
752
753 def getReady(self):
754 d = self.root.callRemote("getReady")
755 d.addErrback(self._errback)
756 return d.asFuture(asyncio.get_event_loop())
757
758 def getVersion(self):
759 d = self.root.callRemote("getVersion")
760 d.addErrback(self._errback)
761 return d.asFuture(asyncio.get_event_loop())
762
763 def getWaitingSub(self, profile_key="@DEFAULT@"):
764 d = self.root.callRemote("getWaitingSub", profile_key)
765 d.addErrback(self._errback)
766 return d.asFuture(asyncio.get_event_loop())
767
768 def historyGet(self, from_jid, to_jid, limit, between=True, filters='', profile="@NONE@"):
769 d = self.root.callRemote("historyGet", from_jid, to_jid, limit, between, filters, profile)
770 d.addErrback(self._errback)
771 return d.asFuture(asyncio.get_event_loop())
772
773 def isConnected(self, profile_key="@DEFAULT@"):
774 d = self.root.callRemote("isConnected", profile_key)
775 d.addErrback(self._errback)
776 return d.asFuture(asyncio.get_event_loop())
777
778 def launchAction(self, callback_id, data, profile_key="@DEFAULT@"):
779 d = self.root.callRemote("launchAction", callback_id, data, profile_key)
780 d.addErrback(self._errback)
781 return d.asFuture(asyncio.get_event_loop())
782
783 def loadParamsTemplate(self, filename):
784 d = self.root.callRemote("loadParamsTemplate", filename)
785 d.addErrback(self._errback)
786 return d.asFuture(asyncio.get_event_loop())
787
788 def menuHelpGet(self, menu_id, language):
789 d = self.root.callRemote("menuHelpGet", menu_id, language)
790 d.addErrback(self._errback)
791 return d.asFuture(asyncio.get_event_loop())
792
793 def menuLaunch(self, menu_type, path, data, security_limit, profile_key):
794 d = self.root.callRemote("menuLaunch", menu_type, path, data, security_limit, profile_key)
795 d.addErrback(self._errback)
796 return d.asFuture(asyncio.get_event_loop())
797
798 def menusGet(self, language, security_limit):
799 d = self.root.callRemote("menusGet", language, security_limit)
800 d.addErrback(self._errback)
801 return d.asFuture(asyncio.get_event_loop())
802
803 def messageEncryptionGet(self, to_jid, profile_key):
804 d = self.root.callRemote("messageEncryptionGet", to_jid, profile_key)
805 d.addErrback(self._errback)
806 return d.asFuture(asyncio.get_event_loop())
807
808 def messageEncryptionStart(self, to_jid, namespace='', replace=False, profile_key="@NONE@"):
809 d = self.root.callRemote("messageEncryptionStart", to_jid, namespace, replace, profile_key)
810 d.addErrback(self._errback)
811 return d.asFuture(asyncio.get_event_loop())
812
813 def messageEncryptionStop(self, to_jid, profile_key):
814 d = self.root.callRemote("messageEncryptionStop", to_jid, profile_key)
815 d.addErrback(self._errback)
816 return d.asFuture(asyncio.get_event_loop())
817
818 def messageSend(self, to_jid, message, subject={}, mess_type="auto", extra={}, profile_key="@NONE@"):
819 d = self.root.callRemote("messageSend", to_jid, message, subject, mess_type, extra, profile_key)
820 d.addErrback(self._errback)
821 return d.asFuture(asyncio.get_event_loop())
822
823 def namespacesGet(self):
824 d = self.root.callRemote("namespacesGet")
825 d.addErrback(self._errback)
826 return d.asFuture(asyncio.get_event_loop())
827
828 def paramsRegisterApp(self, xml, security_limit=-1, app=''):
829 d = self.root.callRemote("paramsRegisterApp", xml, security_limit, app)
830 d.addErrback(self._errback)
831 return d.asFuture(asyncio.get_event_loop())
832
833 def profileCreate(self, profile, password='', component=''):
834 d = self.root.callRemote("profileCreate", profile, password, component)
835 d.addErrback(self._errback)
836 return d.asFuture(asyncio.get_event_loop())
837
838 def profileIsSessionStarted(self, profile_key="@DEFAULT@"):
839 d = self.root.callRemote("profileIsSessionStarted", profile_key)
840 d.addErrback(self._errback)
841 return d.asFuture(asyncio.get_event_loop())
842
843 def profileNameGet(self, profile_key="@DEFAULT@"):
844 d = self.root.callRemote("profileNameGet", profile_key)
845 d.addErrback(self._errback)
846 return d.asFuture(asyncio.get_event_loop())
847
848 def profileSetDefault(self, profile):
849 d = self.root.callRemote("profileSetDefault", profile)
850 d.addErrback(self._errback)
851 return d.asFuture(asyncio.get_event_loop())
852
853 def profileStartSession(self, password='', profile_key="@DEFAULT@"):
854 d = self.root.callRemote("profileStartSession", password, profile_key)
855 d.addErrback(self._errback)
856 return d.asFuture(asyncio.get_event_loop())
857
858 def profilesListGet(self, clients=True, components=False):
859 d = self.root.callRemote("profilesListGet", clients, components)
860 d.addErrback(self._errback)
861 return d.asFuture(asyncio.get_event_loop())
862
863 def progressGet(self, id, profile):
864 d = self.root.callRemote("progressGet", id, profile)
865 d.addErrback(self._errback)
866 return d.asFuture(asyncio.get_event_loop())
867
868 def progressGetAll(self, profile):
869 d = self.root.callRemote("progressGetAll", profile)
870 d.addErrback(self._errback)
871 return d.asFuture(asyncio.get_event_loop())
872
873 def progressGetAllMetadata(self, profile):
874 d = self.root.callRemote("progressGetAllMetadata", profile)
875 d.addErrback(self._errback)
876 return d.asFuture(asyncio.get_event_loop())
877
878 def rosterResync(self, profile_key="@DEFAULT@"):
879 d = self.root.callRemote("rosterResync", profile_key)
880 d.addErrback(self._errback)
881 return d.asFuture(asyncio.get_event_loop())
882
883 def saveParamsTemplate(self, filename):
884 d = self.root.callRemote("saveParamsTemplate", filename)
885 d.addErrback(self._errback)
886 return d.asFuture(asyncio.get_event_loop())
887
888 def sessionInfosGet(self, profile_key):
889 d = self.root.callRemote("sessionInfosGet", profile_key)
890 d.addErrback(self._errback)
891 return d.asFuture(asyncio.get_event_loop())
892
893 def setParam(self, name, value, category, security_limit=-1, profile_key="@DEFAULT@"):
894 d = self.root.callRemote("setParam", name, value, category, security_limit, profile_key)
895 d.addErrback(self._errback)
896 return d.asFuture(asyncio.get_event_loop())
897
898 def setPresence(self, to_jid='', show='', statuses={}, profile_key="@DEFAULT@"):
899 d = self.root.callRemote("setPresence", to_jid, show, statuses, profile_key)
900 d.addErrback(self._errback)
901 return d.asFuture(asyncio.get_event_loop())
902
903 def subscription(self, sub_type, entity, profile_key="@DEFAULT@"):
904 d = self.root.callRemote("subscription", sub_type, entity, profile_key)
905 d.addErrback(self._errback)
906 return d.asFuture(asyncio.get_event_loop())
907
908 def updateContact(self, entity_jid, name, groups, profile_key="@DEFAULT@"):
909 d = self.root.callRemote("updateContact", entity_jid, name, groups, profile_key)
910 d.addErrback(self._errback)
911 return d.asFuture(asyncio.get_event_loop())