Mercurial > libervia-backend
diff src/core/sat_main.py @ 2144:1d3f73e065e1
core, jp: component handling + client handling refactoring:
- SàT can now handle components
- plugin have now a "modes" key in PLUGIN_INFO where they declare if they can be used with clients and or components. They default to be client only.
- components are really similar to clients, but with some changes in behaviour:
* component has "entry point", which is a special plugin with a componentStart method, which is called just after component is connected
* trigger end with a different suffixes (e.g. profileConnected vs profileConnectedComponent), so a plugin which manage both clients and components can have different workflow
* for clients, only triggers of plugins handling client mode are launched
* for components, only triggers of plugins needed in dependencies are launched. They all must handle component mode.
* component have a sendHistory attribute (False by default) which can be set to True to allow saving sent messages into history
* for convenience, "client" is still used in method even if it can now be a component
* a new "component" boolean attribute tells if we have a component or a client
* components have to add themselve Message protocol
* roster and presence protocols are not added for components
* component default port is 5347 (which is Prosody's default port)
- asyncCreateProfile has been renamed for profileCreate, both to follow new naming convention and to prepare the transition to fully asynchronous bridge
- createProfile has a new "component" attribute. When used to create a component, it must be set to a component entry point
- jp: added --component argument to profile/create
- disconnect bridge method is now asynchronous, this way frontends can know when disconnection is finished
- new PI_* constants for PLUGIN_INFO values (not used everywhere yet)
- client/component connection workflow has been moved to their classes instead of being a host methods
- host.messageSend is now client.sendMessage, and former client.sendMessage is now client.sendMessageData.
- identities are now handled in client.identities list, so it can be updated dynamically by plugins (in the future, frontends should be able to update them too through bridge)
- profileConnecting* profileConnected* profileDisconnected* and getHandler now all use client instead of profile
author | Goffi <goffi@goffi.org> |
---|---|
date | Sun, 12 Feb 2017 17:55:43 +0100 |
parents | be96beb7ca14 |
children | 1bb9bf1b4150 |
line wrap: on
line diff
--- a/src/core/sat_main.py Tue Feb 07 00:15:03 2017 +0100 +++ b/src/core/sat_main.py Sun Feb 12 17:55:43 2017 +0100 @@ -22,7 +22,6 @@ from twisted.application import service from twisted.internet import defer from twisted.words.protocols.jabber import jid -from twisted.words.xish import domish from twisted.internet import reactor from wokkel.xmppim import RosterItem from sat.core import xmpp @@ -41,7 +40,6 @@ import sys import os.path import uuid -import time try: from collections import OrderedDict # only available from python 2.7 @@ -81,7 +79,7 @@ self.bridge.register_method("getProfilesList", self.memory.getProfilesList) self.bridge.register_method("getEntityData", lambda jid_, keys, profile: self.memory.getEntityData(jid.JID(jid_), keys, profile)) self.bridge.register_method("getEntitiesData", self.memory._getEntitiesData) - self.bridge.register_method("asyncCreateProfile", self.memory.asyncCreateProfile) + self.bridge.register_method("profileCreate", self.memory.createProfile) self.bridge.register_method("asyncDeleteProfile", self.memory.asyncDeleteProfile) self.bridge.register_method("profileStartSession", self.memory.startSession) self.bridge.register_method("profileIsSessionStarted", self.memory._isSessionStarted) @@ -156,8 +154,20 @@ self.initialised.callback(None) log.info(_(u"Backend is ready")) + def _unimport_plugin(self, plugin_path): + """remove a plugin from sys.modules if it is there""" + try: + del sys.modules[plugin_path] + except KeyError: + pass + def _import_plugins(self): """Import all plugins found in plugins directory""" + # FIXME: module imported but cancelled should be deleted + # TODO: make this more generic and reusable in tools.common + # FIXME: should use imp + # TODO: do not import all plugins if no needed: component plugins are not needed if we + # just use a client, and plugin blacklisting should be possible in sat.conf plugins_path = os.path.dirname(sat.plugins.__file__) plugin_glob = "plugin*." + C.PLUGIN_EXT plug_lst = [os.path.splitext(plugin)[0] for plugin in map(os.path.basename, glob(os.path.join(plugins_path, plugin_glob)))] @@ -167,23 +177,34 @@ try: __import__(plugin_path) except exceptions.MissingModule as e: - try: - del sys.modules[plugin_path] - except KeyError: - pass + self._unimport_plugin(plugin_path) log.warning(u"Can't import plugin [{path}] because of an unavailale third party module:\n{msg}".format( path=plugin_path, msg=e)) continue except exceptions.CancelError as e: log.info(u"Plugin [{path}] cancelled its own import: {msg}".format(path=plugin_path, msg=e)) + self._unimport_plugin(plugin_path) continue except Exception as e: import traceback log.error(_(u"Can't import plugin [{path}]:\n{error}").format(path=plugin_path, error=traceback.format_exc())) + self._unimport_plugin(plugin_path) continue mod = sys.modules[plugin_path] plugin_info = mod.PLUGIN_INFO import_name = plugin_info['import_name'] + + plugin_modes = plugin_info[u'modes'] = set(plugin_info.setdefault(u"modes", C.PLUG_MODE_DEFAULT)) + + # if the plugin is an entry point, it must work in component mode + if plugin_info[u'type'] == C.PLUG_TYPE_ENTRY_POINT: + # if plugin is an entrypoint, we cache it + if C.PLUG_MODE_COMPONENT not in plugin_modes: + log.error(_(u"{type} type must be used with {mode} mode, ignoring plugin").format( + type = C.PLUG_TYPE_ENTRY_POINT, mode = C.PLUG_MODE_COMPONENT)) + self._unimport_plugin(plugin_path) + continue + if import_name in plugins_to_import: log.error(_(u"Name conflict for import name [{import_name}], can't import plugin [{name}]").format(**plugin_info)) continue @@ -243,6 +264,8 @@ self.plugins[import_name].is_handler = True else: self.plugins[import_name].is_handler = False + # we keep metadata as a Class attribute + self.plugins[import_name]._info = plugin_info #TODO: test xmppclient presence and register handler parent def pluginsUnload(self): @@ -268,12 +291,14 @@ return self.connect(profile, password, options) def connect(self, profile, password='', options=None, max_retries=C.XMPP_MAX_RETRIES): - """Retrieve the individual parameters, authenticate the profile + """Connect a profile (i.e. connect client.component to XMPP server) + + Retrieve the individual parameters, authenticate the profile and initiate the connection to the associated XMPP server. - @param profile: %(doc_profile)s @param password (string): the SàT profile password - @param options (dict): connection options + @param options (dict): connection options. Key can be: + - @param max_retries (int): max number of connection retries @return (D(bool)): - True if the XMPP connection was already established @@ -282,100 +307,32 @@ """ if options is None: options={} - def connectXMPPClient(dummy=None): + def connectProfile(dummy=None): if self.isConnected(profile): log.info(_("already connected !")) return True - d = self._connectXMPPClient(profile, max_retries) + + if self.memory.isComponent(profile): + d = xmpp.SatXMPPComponent.startConnection(self, profile, max_retries) + else: + d = xmpp.SatXMPPClient.startConnection(self, profile, max_retries) return d.addCallback(lambda dummy: False) d = self.memory.startSession(password, profile) - d.addCallback(connectXMPPClient) + d.addCallback(connectProfile) return d - @defer.inlineCallbacks - def _connectXMPPClient(self, profile, max_retries): - """This part is called from connect when we have loaded individual parameters from memory""" - try: - port = int(self.memory.getParamA(C.FORCE_PORT_PARAM, "Connection", profile_key=profile)) - except ValueError: - log.debug(_("Can't parse port value, using default value")) - port = None # will use default value 5222 or be retrieved from a DNS SRV record - - password = yield self.memory.asyncGetParamA("Password", "Connection", profile_key=profile) - current = self.profiles[profile] = xmpp.SatXMPPClient(self, profile, - jid.JID(self.memory.getParamA("JabberID", "Connection", profile_key=profile)), - password, self.memory.getParamA(C.FORCE_SERVER_PARAM, "Connection", profile_key=profile), - port, max_retries) - - current.messageProt = xmpp.SatMessageProtocol(self) - current.messageProt.setHandlerParent(current) - - current.roster = xmpp.SatRosterProtocol(self) - current.roster.setHandlerParent(current) - - current.presence = xmpp.SatPresenceProtocol(self) - current.presence.setHandlerParent(current) - - current.fallBack = xmpp.SatFallbackHandler(self) - current.fallBack.setHandlerParent(current) - - current.versionHandler = xmpp.SatVersionHandler(C.APP_NAME_FULL, - self.full_version) - current.versionHandler.setHandlerParent(current) - - current.identityHandler = xmpp.SatIdentityHandler() - current.identityHandler.setHandlerParent(current) - - log.debug(_("setting plugins parents")) - - plugin_conn_cb = [] - for plugin in self.plugins.iteritems(): - if plugin[1].is_handler: - plugin[1].getHandler(profile).setHandlerParent(current) - connected_cb = getattr(plugin[1], "profileConnected", None) # profile connected is called after client is ready and roster is got - if connected_cb: - plugin_conn_cb.append((plugin[0], connected_cb)) - try: - yield plugin[1].profileConnecting(profile) # profile connecting is called before actually starting client - except AttributeError: - pass - - current.startService() - - yield current.getConnectionDeferred() - yield current.roster.got_roster # we want to be sure that we got the roster - - # Call profileConnected callback for all plugins, and print error message if any of them fails - conn_cb_list = [] - for dummy, callback in plugin_conn_cb: - conn_cb_list.append(defer.maybeDeferred(callback, profile)) - list_d = defer.DeferredList(conn_cb_list) - - def logPluginResults(results): - all_succeed = all([success for success, result in results]) - if not all_succeed: - log.error(_(u"Plugins initialisation error")) - for idx, (success, result) in enumerate(results): - if not success: - log.error(u"error (plugin %(name)s): %(failure)s" % - {'name': plugin_conn_cb[idx][0], 'failure': result}) - - yield list_d.addCallback(logPluginResults) # FIXME: we should have a timeout here, and a way to know if a plugin freeze - # TODO: mesure launch time of each plugin - def disconnect(self, profile_key): """disconnect from jabber server""" + # FIXME: client should not be deleted if only disconnected + # it shoud be deleted only when session is finished if not self.isConnected(profile_key): - log.info(_("not connected !")) - return - profile = self.memory.getProfileName(profile_key) - log.info(_("Disconnecting...")) - self.profiles[profile].stopService() - for plugin in self.plugins.iteritems(): - disconnected_cb = getattr(plugin[1], "profileDisconnected", None) - if disconnected_cb: - disconnected_cb(profile) + # isConnected is checked here and not on client + # because client is deleted when session is ended + log.info(_(u"not connected !")) + return defer.succeed(None) + client = self.getClient(profile_key) + return client.entityDisconnect() def getFeatures(self, profile_key=C.PROF_KEY_NONE): """Get available features @@ -439,14 +396,17 @@ client = self.getClient(profile_key) return [jid_.full() for jid_ in client.roster.getJidsFromGroup(group)] - def purgeClient(self, profile): - """Remove reference to a profile client and purge cache - the garbage collector can then free the memory""" + def purgeEntity(self, profile): + """Remove reference to a profile client/component and purge cache + + the garbage collector can then free the memory + """ try: del self.profiles[profile] except KeyError: log.error(_("Trying to remove reference to a client not referenced")) - self.memory.purgeProfileSession(profile) + else: + self.memory.purgeProfileSession(profile) def startService(self): log.info(u"Salut à toi ô mon frère !") @@ -515,6 +475,14 @@ """ return unicode(self.memory.getConfig(section, name, '')) + def logErrback(self, failure_): + """generic errback logging + + can be used as last errback to show unexpected error + """ + log.error(_(u"Unexpected error: {}".format(failure_))) + return failure_ + ## Client management ## def setParam(self, name, value, category, security_limit, profile_key): @@ -537,168 +505,11 @@ ## XMPP methods ## - def generateMessageXML(self, data): - """Generate <message/> stanza from message data - - @param data(dict): message data - domish element will be put in data['xml'] - following keys are needed: - - from - - to - - uid: can be set to '' if uid attribute is not wanted - - message - - type - - subject - - extra - @return (dict) message data - """ - data['xml'] = message_elt = domish.Element((None, 'message')) - message_elt["to"] = data["to"].full() - message_elt["from"] = data['from'].full() - message_elt["type"] = data["type"] - if data['uid']: # key must be present but can be set to '' - # by a plugin to avoid id on purpose - message_elt['id'] = data['uid'] - for lang, subject in data["subject"].iteritems(): - subject_elt = message_elt.addElement("subject", content=subject) - if lang: - subject_elt[(C.NS_XML, 'lang')] = lang - for lang, message in data["message"].iteritems(): - body_elt = message_elt.addElement("body", content=message) - if lang: - body_elt[(C.NS_XML, 'lang')] = lang - try: - thread = data['extra']['thread'] - except KeyError: - if 'thread_parent' in data['extra']: - raise exceptions.InternalError(u"thread_parent found while there is not associated thread") - else: - thread_elt = message_elt.addElement("thread", content=thread) - try: - thread_elt["parent"] = data["extra"]["thread_parent"] - except KeyError: - pass - return data - def _messageSend(self, to_jid_s, message, subject=None, mess_type='auto', extra=None, profile_key=C.PROF_KEY_NONE): client = self.getClient(profile_key) to_jid = jid.JID(to_jid_s) #XXX: we need to use the dictionary comprehension because D-Bus return its own types, and pickle can't manage them. TODO: Need to find a better way - return self.messageSend(client, to_jid, message, subject, mess_type, {unicode(key): unicode(value) for key, value in extra.items()}) - - def messageSend(self, client, to_jid, message, subject=None, mess_type='auto', extra=None, uid=None, no_trigger=False): - """Send a message to an entity - - @param to_jid(jid.JID): destinee of the message - @param message(dict): message body, key is the language (use '' when unknown) - @param subject(dict): message subject, key is the language (use '' when unknown) - @param mess_type(str): one of standard message type (cf RFC 6121 §5.2.2) or: - - auto: for automatic type detection - - info: for information ("info_type" can be specified in extra) - @param extra(dict, None): extra data. Key can be: - - info_type: information type, can be - TODO - @param uid(unicode, None): unique id: - should be unique at least in this XMPP session - if None, an uuid will be generated - @param no_trigger (bool): if True, messageSend trigger will no be used - useful when a message need to be sent without any modification - """ - profile = client.profile - if subject is None: - subject = {} - if extra is None: - extra = {} - data = { # dict is similar to the one used in client.onMessage - "from": client.jid, - "to": to_jid, - "uid": uid or unicode(uuid.uuid4()), - "message": message, - "subject": subject, - "type": mess_type, - "extra": extra, - "timestamp": time.time(), - } - pre_xml_treatments = defer.Deferred() # XXX: plugin can add their pre XML treatments to this deferred - post_xml_treatments = defer.Deferred() # XXX: plugin can add their post XML treatments to this deferred - - if data["type"] == "auto": - # we try to guess the type - if data["subject"]: - data["type"] = 'normal' - elif not data["to"].resource: # if to JID has a resource, the type is not 'groupchat' - # we may have a groupchat message, we check if the we know this jid - try: - entity_type = self.memory.getEntityData(data["to"], ['type'], profile)["type"] - #FIXME: should entity_type manage resources ? - except (exceptions.UnknownEntityError, KeyError): - entity_type = "contact" - - if entity_type == "chatroom": - data["type"] = 'groupchat' - else: - data["type"] = 'chat' - else: - data["type"] == 'chat' - data["type"] == "chat" if data["subject"] else "normal" - - # FIXME: send_only is used by libervia's OTR plugin to avoid - # the triggers from frontend, and no_trigger do the same - # thing internally, this could be unified - send_only = data['extra'].get('send_only', False) - - if not no_trigger and not send_only: - if not self.trigger.point("messageSend", client, data, pre_xml_treatments, post_xml_treatments): - return defer.succeed(None) - - log.debug(_(u"Sending message (type {type}, to {to})").format(type=data["type"], to=to_jid.full())) - - pre_xml_treatments.addCallback(lambda dummy: self.generateMessageXML(data)) - pre_xml_treatments.chainDeferred(post_xml_treatments) - post_xml_treatments.addCallback(client.sendMessage) - if send_only: - log.debug(_("Triggers, storage and echo have been inhibited by the 'send_only' parameter")) - else: - post_xml_treatments.addCallback(self.messageAddToHistory, client) - post_xml_treatments.addCallback(self.messageSendToBridge, client) - post_xml_treatments.addErrback(self._cancelErrorTrap) - pre_xml_treatments.callback(data) - return pre_xml_treatments - - def _cancelErrorTrap(self, failure): - """A message sending can be cancelled by a plugin treatment""" - failure.trap(exceptions.CancelError) - - def messageAddToHistory(self, data, client): - """Store message into database (for local history) - - @param data: message data dictionnary - @param client: profile's client - """ - if data[u"type"] != C.MESS_TYPE_GROUPCHAT: - # we don't add groupchat message to history, as we get them back - # and they will be added then - if data[u'message'] or data[u'subject']: # we need a message to store - self.memory.addToHistory(client, data) - else: - log.warning(u"No message found") # empty body should be managed by plugins before this point - return data - - def messageSendToBridge(self, data, client): - """Send message to bridge, so frontends can display it - - @param data: message data dictionnary - @param client: profile's client - """ - if data[u"type"] != C.MESS_TYPE_GROUPCHAT: - # we don't send groupchat message to bridge, as we get them back - # and they will be added the - if data[u'message'] or data[u'subject']: # we need a message to send something - # We send back the message, so all frontends are aware of it - self.bridge.messageNew(data[u'uid'], data[u'timestamp'], data[u'from'].full(), data[u'to'].full(), data[u'message'], data[u'subject'], data[u'type'], data[u'extra'], profile=client.profile) - else: - log.warning(_(u"No message found")) - return data + return client.sendMessage(to_jid, message, subject, mess_type, {unicode(key): unicode(value) for key, value in extra.items()}) def _setPresence(self, to="", show="", statuses=None, profile_key=C.PROF_KEY_NONE): return self.setPresence(jid.JID(to) if to else None, show, statuses, profile_key)