diff libervia/backend/core/xmpp.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/core/xmpp.py@c23cad65ae99
children bc7d45dedeb0
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/core/xmpp.py	Fri Jun 02 11:49:51 2023 +0200
@@ -0,0 +1,1953 @@
+#!/usr/bin/env python3
+
+# Libervia: an XMPP client
+# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+import calendar
+import copy
+from functools import partial
+import mimetypes
+from pathlib import Path
+import sys
+import time
+from typing import Callable, Dict, Tuple, Optional
+from urllib.parse import unquote, urlparse
+import uuid
+
+import shortuuid
+from twisted.internet import defer, error as internet_error
+from twisted.internet import ssl
+from twisted.python import failure
+from twisted.words.protocols.jabber import xmlstream
+from twisted.words.protocols.jabber import error
+from twisted.words.protocols.jabber import jid
+from twisted.words.protocols.jabber.xmlstream import XMPPHandler
+from twisted.words.xish import domish
+from wokkel import client as wokkel_client, disco, generic, iwokkel, xmppim
+from wokkel import component
+from wokkel import delay
+from zope.interface import implementer
+
+from libervia.backend.core import exceptions
+from libervia.backend.core import core_types
+from libervia.backend.core.constants import Const as C
+from libervia.backend.core.i18n import _
+from libervia.backend.core.log import getLogger
+from libervia.backend.memory import cache
+from libervia.backend.memory import encryption
+from libervia.backend.memory import persistent
+from libervia.backend.tools import xml_tools
+from libervia.backend.tools import utils
+from libervia.backend.tools.common import data_format
+
+log = getLogger(__name__)
+
+
+NS_X_DATA = "jabber:x:data"
+NS_DISCO_INFO = "http://jabber.org/protocol/disco#info"
+NS_XML_ELEMENT = "urn:xmpp:xml-element"
+NS_ROSTER_VER = "urn:xmpp:features:rosterver"
+# we use 2 "@" which is illegal in a jid, to be sure we are not mixing keys
+# with roster jids
+ROSTER_VER_KEY = "@version@"
+
+
+class ClientPluginWrapper:
+    """Use a plugin with default value if plugin is missing"""
+
+    def __init__(self, client, plugin_name, missing):
+        self.client = client
+        self.plugin = client.host_app.plugins.get(plugin_name)
+        if self.plugin is None:
+            self.plugin_name = plugin_name
+        self.missing = missing
+
+    def __getattr__(self, attr):
+        if self.plugin is None:
+            missing = self.missing
+            if isinstance(missing, type) and issubclass(missing, Exception):
+                raise missing(f"plugin {self.plugin_name!r} is not available")
+            elif isinstance(missing, Exception):
+                raise missing
+            else:
+                return lambda *args, **kwargs: missing
+        return partial(getattr(self.plugin, attr), self.client)
+
+
+class SatXMPPEntity(core_types.SatXMPPEntity):
+    """Common code for Client and Component"""
+    # a profile is added here when start_connection begins and removed when it is done
+    profiles_connecting = set()
+
+    def __init__(self, host_app, profile, max_retries):
+        factory = self.factory
+
+        # we monkey patch clientConnectionLost to handle network_enabled/network_disabled
+        # and to allow plugins to tune reconnection mechanism
+        clientConnectionFailed_ori = factory.clientConnectionFailed
+        clientConnectionLost_ori = factory.clientConnectionLost
+        factory.clientConnectionFailed = partial(
+            self.connection_terminated, term_type="failed", cb=clientConnectionFailed_ori)
+        factory.clientConnectionLost = partial(
+            self.connection_terminated, term_type="lost", cb=clientConnectionLost_ori)
+
+        factory.maxRetries = max_retries
+        factory.maxDelay = 30
+        # when self._connected_d is None, we are not connected
+        # else, it's a deferred which fires on disconnection
+        self._connected_d = None
+        self.profile = profile
+        self.host_app = host_app
+        self.cache = cache.Cache(host_app, profile)
+        self.mess_id2uid = {}  # map from message id to uid used in history.
+                               # Key: (full_jid, message_id) Value: uid
+        # this Deferred fire when entity is connected
+        self.conn_deferred = defer.Deferred()
+        self._progress_cb = {}  # callback called when a progress is requested
+                                # (key = progress id)
+        self.actions = {}  # used to keep track of actions for retrieval (key = action_id)
+        self.encryption = encryption.EncryptionHandler(self)
+
+    def __str__(self):
+        return f"Client for profile {self.profile}"
+
+    def __repr__(self):
+        return f"{super().__repr__()} - profile: {self.profile!r}"
+
+    ## initialisation ##
+
+    async def _call_connection_triggers(self, connection_timer):
+        """Call conneting trigger prepare connected trigger
+
+        @param plugins(iterable): plugins to use
+        @return (list[object, callable]): plugin to trigger tuples with:
+            - plugin instance
+            - profile_connected* triggers (to call after connection)
+        """
+        plugin_conn_cb = []
+        for plugin in self._get_plugins_list():
+            # we check if the plugin handles client mode
+            if plugin.is_handler:
+                plugin.get_handler(self).setHandlerParent(self)
+
+            # profile_connecting/profile_connected methods handling
+
+            timer = connection_timer[plugin] = {
+                "total": 0
+            }
+            # profile connecting is called right now (before actually starting client)
+            connecting_cb = getattr(plugin, "profile_connecting", None)
+            if connecting_cb is not None:
+                connecting_start = time.time()
+                await utils.as_deferred(connecting_cb, self)
+                timer["connecting"] = time.time() - connecting_start
+                timer["total"] += timer["connecting"]
+
+            # profile connected is called after client is ready and roster has been received
+            connected_cb = getattr(plugin, "profile_connected", None)
+            if connected_cb is not None:
+                plugin_conn_cb.append((plugin, connected_cb))
+
+        return plugin_conn_cb
+
+    def _get_plugins_list(self):
+        """Return list of plugin to use
+
+        need to be implemented by subclasses
+        this list is used to call profileConnect* triggers
+        @return(iterable[object]): plugins to use
+        """
+        raise NotImplementedError
+
+    def _create_sub_protocols(self):
+        return
+
+    def entity_connected(self):
+        """Called once connection is done
+
+        may return a Deferred, to perform initialisation tasks
+        """
+        return
+
+    @staticmethod
+    async def _run_profile_connected(
+        callback: Callable,
+        entity: "SatXMPPEntity",
+        timer: Dict[str, float]
+    ) -> None:
+        connected_start = time.time()
+        await utils.as_deferred(callback, entity)
+        timer["connected"] = time.time() - connected_start
+        timer["total"] += timer["connected"]
+
+    @classmethod
+    async def start_connection(cls, host, profile, max_retries):
+        """instantiate the entity and start the connection"""
+        # FIXME: reconnection doesn't seem to be handled correctly
+        #        (client is deleted then recreated from scratch)
+        #        most of the methods called here should be called once on first
+        #        connection (e.g. adding subprotocols)
+        #        but client should not be deleted except if session is finished
+        #        (independently of connection/disconnection)
+        if profile in cls.profiles_connecting:
+            raise exceptions.CancelError(f"{profile} is already being connected")
+        cls.profiles_connecting.add(profile)
+        try:
+            try:
+                port = int(
+                    host.memory.param_get_a(
+                        C.FORCE_PORT_PARAM, "Connection", profile_key=profile
+                    )
+                )
+            except ValueError:
+                log.debug(_("Can't parse port value, using default value"))
+                port = (
+                    None
+                )  # will use default value 5222 or be retrieved from a DNS SRV record
+
+            password = await host.memory.param_get_a_async(
+                "Password", "Connection", profile_key=profile
+            )
+
+            entity_jid_s = await host.memory.param_get_a_async(
+                "JabberID", "Connection", profile_key=profile)
+            entity_jid = jid.JID(entity_jid_s)
+
+            if not entity_jid.resource and not cls.is_component and entity_jid.user:
+                # if no resource is specified, we create our own instead of using
+                # server returned one, as it will then stay stable in case of
+                # reconnection. we only do that for client and if there is a user part, to
+                # let server decide for anonymous login
+                resource_dict = await host.memory.storage.get_privates(
+                    "core:xmpp", ["resource"] , profile=profile)
+                try:
+                    resource = resource_dict["resource"]
+                except KeyError:
+                    resource = f"{C.APP_NAME_FILE}.{shortuuid.uuid()}"
+                    await host.memory.storage.set_private_value(
+                        "core:xmpp", "resource", resource, profile=profile)
+
+                log.info(_("We'll use the stable resource {resource}").format(
+                    resource=resource))
+                entity_jid.resource = resource
+
+            if profile in host.profiles:
+                if host.profiles[profile].is_connected():
+                    raise exceptions.InternalError(
+                        f"There is already a connected profile of name {profile!r} in "
+                        f"host")
+                log.debug(
+                    f"removing unconnected profile {profile!r}")
+                del host.profiles[profile]
+            entity = host.profiles[profile] = cls(
+                host, profile, entity_jid, password,
+                host.memory.param_get_a(C.FORCE_SERVER_PARAM, "Connection",
+                                      profile_key=profile) or None,
+                port, max_retries,
+                )
+
+            await entity.encryption.load_sessions()
+
+            entity._create_sub_protocols()
+
+            entity.fallBack = SatFallbackHandler(host)
+            entity.fallBack.setHandlerParent(entity)
+
+            entity.versionHandler = SatVersionHandler(C.APP_NAME, host.full_version)
+            entity.versionHandler.setHandlerParent(entity)
+
+            entity.identityHandler = SatIdentityHandler()
+            entity.identityHandler.setHandlerParent(entity)
+
+            log.debug(_("setting plugins parents"))
+
+            connection_timer: Dict[str, Dict[str, float]] = {}
+            plugin_conn_cb = await entity._call_connection_triggers(connection_timer)
+
+            entity.startService()
+
+            await entity.conn_deferred
+
+            await defer.maybeDeferred(entity.entity_connected)
+
+            # Call profile_connected callback for all plugins,
+            # and print error message if any of them fails
+            conn_cb_list = []
+            for plugin, callback in plugin_conn_cb:
+                conn_cb_list.append(
+                    defer.ensureDeferred(
+                        cls._run_profile_connected(
+                            callback, entity, connection_timer[plugin]
+                        )
+                    )
+                )
+            list_d = defer.DeferredList(conn_cb_list)
+
+            def log_plugin_results(results):
+                if not results:
+                    log.info("no plugin loaded")
+                    return
+                all_succeed = all([success for success, result in results])
+                if not all_succeed:
+                    log.error(_("Plugins initialisation error"))
+                    for idx, (success, result) in enumerate(results):
+                        if not success:
+                            plugin_name = plugin_conn_cb[idx][0]._info["import_name"]
+                            log.error(f"error (plugin {plugin_name}): {result}")
+
+                log.debug(f"Plugin loading time for {profile!r} (longer to shorter):\n")
+                plugins_by_timer = sorted(
+                    connection_timer,
+                    key=lambda p: connection_timer[p]["total"],
+                    reverse=True
+                )
+                # total_plugins is the sum of all connecting and connected times; it
+                # doesn't really reflect the real loading time, as connected callbacks
+                # are launched in a DeferredList
+                total_plugins = 0
+                # total_real sums all connecting times (which happen sequentially) and
+                # the longest connected time (connected callbacks run in parallel, thus
+                # the longest one is roughly the total time for connected)
+                total_real = max(t.get("connected", 0) for t in connection_timer.values())
+
+                for plugin in plugins_by_timer:
+                    name = plugin._info["import_name"]
+                    timer = connection_timer[plugin]
+                    total_plugins += timer["total"]
+                    try:
+                        connecting = f"{timer['connecting']:.2f}s"
+                    except KeyError:
+                        connecting = "n/a"
+                    else:
+                        total_real += timer["connecting"]
+                    try:
+                        connected = f"{timer['connected']:.2f}s"
+                    except KeyError:
+                        connected = "n/a"
+                    log.debug(
+                        f"  - {name}: total={timer['total']:.2f}s "
+                        f"connecting={connecting} connected={connected}"
+                    )
+                log.debug(
+                    f"  Plugins total={total_plugins:.2f}s real={total_real:.2f}s\n"
+                )
+
+            await list_d.addCallback(
+                log_plugin_results
+            )  # FIXME: we should have a timeout here, and a way to know if a plugin freezes
+            # TODO: measure launch time of each plugin
+        finally:
+            cls.profiles_connecting.remove(profile)
+
+    def _disconnection_cb(self, __):
+        self._connected_d = None
+
+    def _disconnection_eb(self, failure_):
+        log.error(_("Error while disconnecting: {}".format(failure_)))
+
+    def _authd(self, xmlstream):
+        super(SatXMPPEntity, self)._authd(xmlstream)
+        log.debug(_("{profile} identified").format(profile=self.profile))
+        self.stream_initialized()
+
+    def _finish_connection(self, __):
+        if self.conn_deferred.called:
+            # can happen in case of forced disconnection by server
+            log.debug(f"{self} has already been connected")
+        else:
+            self.conn_deferred.callback(None)
+
+    def stream_initialized(self):
+        """Called after _authd"""
+        log.debug(_("XML stream is initialized"))
+        if not self.host_app.trigger.point("xml_init", self):
+            return
+        self.post_stream_init()
+
+    def post_stream_init(self):
+        """Workflow after stream initalisation."""
+        log.info(
+            _("********** [{profile}] CONNECTED **********").format(profile=self.profile)
+        )
+
+        # the following Deferred is used to know when we are connected
+        # so we need to set it to None when the connection is lost
+        self._connected_d = defer.Deferred()
+        self._connected_d.addCallback(self._clean_connection)
+        self._connected_d.addCallback(self._disconnection_cb)
+        self._connected_d.addErrback(self._disconnection_eb)
+
+        # we send the signal to the clients
+        self.host_app.bridge.connected(self.jid.full(), self.profile)
+
+        self.disco = SatDiscoProtocol(self)
+        self.disco.setHandlerParent(self)
+        self.discoHandler = disco.DiscoHandler()
+        self.discoHandler.setHandlerParent(self)
+        disco_d = defer.succeed(None)
+
+        if not self.host_app.trigger.point("Disco handled", disco_d, self.profile):
+            return
+
+        disco_d.addCallback(self._finish_connection)
+
+    def initializationFailed(self, reason):
+        log.error(
+            _("ERROR: XMPP connection failed for profile '%(profile)s': %(reason)s")
+            % {"profile": self.profile, "reason": reason}
+        )
+        self.conn_deferred.errback(reason.value)
+        try:
+            super(SatXMPPEntity, self).initializationFailed(reason)
+        except Exception:
+            # we already chained an errback, no need to raise an exception
+            pass
+
+    ## connection ##
+
+    def connection_terminated(self, connector, reason, term_type, cb):
+        """Display disconnection reason, and call factory method
+
+        This method is monkey patched into the factory, allowing plugins to finely
+        handle reconnection through the triggers.
+        @param connector(twisted.internet.base.BaseConnector): current connector
+        @param reason(failure.Failure): why connection has been terminated
+        @param term_type(unicode): one of 'failed' or 'lost'
+        @param cb(callable): original factory method
+
+        @trigger connection_failed(connector, reason): connection can't be established
+        @trigger connection_lost(connector, reason): connection was available but is not
+            anymore
+        """
+        # we save the connector because it may be deleted when the connection is
+        # dropped, if reconnection is disabled
+        self._saved_connector = connector
+        if reason is not None and not isinstance(reason.value,
+                                                 internet_error.ConnectionDone):
+            try:
+                reason_str = str(reason.value)
+            except Exception:
+                # FIXME: workaround for Android where p4a strips docstrings
+                #        while Twisted uses docstrings in __str__
+                # TODO: create a ticket upstream, Twisted should work when optimization
+                #       is used
+                reason_str = str(reason.value.__class__)
+            log.warning(f"[{self.profile}] Connection {term_type}: {reason_str}")
+        if not self.host_app.trigger.point("connection_" + term_type, connector, reason):
+            return
+        return cb(connector, reason)
+
+    def network_disabled(self):
+        """Indicate that network has been completely disabled
+
+        In other words, internet is not available anymore and transport must be stopped.
+        Retrying is disabled too, as it makes no sense to try without network, and it may
+        use resources (notably battery on mobiles).
+        """
+        log.info(_("stopping connection because of network disabled"))
+        self.factory.continueTrying = 0
+        self._network_disabled = True
+        if self.xmlstream is not None:
+            self.xmlstream.transport.abortConnection()
+
+    def network_enabled(self):
+        """Indicate that network has been (re)enabled
+
+        This happens when e.g. the user activates a Wi-Fi connection.
+        """
+        try:
+            connector = self._saved_connector
+            network_disabled = self._network_disabled
+        except AttributeError:
+            # connection has not been stopped by network_disabled
+            # we don't have to restart it
+            log.debug(f"no connection to restart [{self.profile}]")
+            return
+        else:
+            del self._network_disabled
+            if not network_disabled:
+                raise exceptions.InternalError("network_disabled should be True")
+        log.info(_("network is available, trying to connect"))
+        # we want to be sure to start fresh
+        self.factory.resetDelay()
+        # we have a saved connector, meaning the connection has been stopped previously
+        # we can now try to reconnect
+        connector.connect()
+
+    def _connected(self, xs):
+        send_hooks = []
+        receive_hooks = []
+        self.host_app.trigger.point(
+            "stream_hooks", self, receive_hooks, send_hooks)
+        for hook in receive_hooks:
+            xs.add_hook(C.STREAM_HOOK_RECEIVE, hook)
+        for hook in send_hooks:
+            xs.add_hook(C.STREAM_HOOK_SEND, hook)
+        super(SatXMPPEntity, self)._connected(xs)
+
+    def disconnect_profile(self, reason):
+        if self._connected_d is not None:
+            self.host_app.bridge.disconnected(
+                self.profile
+            )  # we send the signal to the clients
+            log.info(
+                _("********** [{profile}] DISCONNECTED **********").format(
+                    profile=self.profile
+                )
+            )
+            # we purge only if no new connection attempt is expected
+            if not self.factory.continueTrying:
+                log.debug("continueTrying not set, purging entity")
+                self._connected_d.callback(None)
+                # and we remove references to this client
+                self.host_app.purge_entity(self.profile)
+
+        if not self.conn_deferred.called:
+            if reason is None:
+                err = error.StreamError("Server unexpectedly closed the connection")
+            else:
+                err = reason
+                try:
+                    if err.value.args[0][0][2] == "certificate verify failed":
+                        err = exceptions.InvalidCertificate(
+                            _("Your server certificate is not valid "
+                              "(its identity can't be checked).\n\n"
+                              "This should never happen and may indicate that "
+                              "somebody is trying to spy on you.\n"
+                              "Please contact your server administrator."))
+                        self.factory.stopTrying()
+                        try:
+                            # with invalid certificate, we should not retry to connect
+                            # so we delete saved connector to avoid reconnection if
+                            # network_enabled is called.
+                            del self._saved_connector
+                        except AttributeError:
+                            pass
+                except (IndexError, TypeError):
+                    pass
+            self.conn_deferred.errback(err)
+
+    def _disconnected(self, reason):
+        super(SatXMPPEntity, self)._disconnected(reason)
+        if not self.host_app.trigger.point("disconnected", self, reason):
+            return
+        self.disconnect_profile(reason)
+
+    @defer.inlineCallbacks
+    def _clean_connection(self, __):
+        """method called on disconnection
+
+        used to call profile_disconnected* triggers
+        """
+        trigger_name = "profile_disconnected"
+        for plugin in self._get_plugins_list():
+            disconnected_cb = getattr(plugin, trigger_name, None)
+            if disconnected_cb is not None:
+                yield disconnected_cb(self)
+
+    def is_connected(self):
+        """Return True is client is fully connected
+
+        client is considered fully connected if transport is started and all plugins
+        are initialised
+        """
+        try:
+            transport_connected = bool(self.xmlstream.transport.connected)
+        except AttributeError:
+            return False
+
+        return self._connected_d is not None and transport_connected
+
+    def entity_disconnect(self):
+        if not self.host_app.trigger.point("disconnecting", self):
+            return
+        log.info(_("Disconnecting..."))
+        self.stopService()
+        if self._connected_d is not None:
+            return self._connected_d
+        else:
+            return defer.succeed(None)
+
+    ## sending ##
+
+    def IQ(self, type_="set", timeout=60):
+        """shortcut to create an IQ element managing deferred
+
+        @param type_(unicode): IQ type ('set' or 'get')
+        @param timeout(None, int): timeout in seconds
+        @return((D)domish.Element: result stanza
+            errback is called if an error stanza is returned
+        """
+        iq_elt = xmlstream.IQ(self.xmlstream, type_)
+        iq_elt.timeout = timeout
+        return iq_elt
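+    # Illustrative usage of IQ() above (hedged sketch, not part of the original code;
+    # "some_jid" and the namespace are hypothetical):
+    #
+    #     iq_elt = client.IQ("get")
+    #     iq_elt["to"] = some_jid.full()
+    #     iq_elt.addElement(("urn:example:ns", "query"))
+    #     d = iq_elt.send()  # Deferred firing with the result stanza (errback on error)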
+
+    def sendError(self, iq_elt, condition, text=None, appCondition=None):
+        """Send error stanza build from iq_elt
+
+        @param iq_elt(domish.Element): initial IQ element
+        @param condition(unicode): error condition
+        """
+        iq_error_elt = error.StanzaError(
+            condition, text=text, appCondition=appCondition
+        ).toResponse(iq_elt)
+        self.xmlstream.send(iq_error_elt)
+
+    def generate_message_xml(
+        self,
+        data: core_types.MessageData,
+        post_xml_treatments: Optional[defer.Deferred] = None
+    ) -> core_types.MessageData:
+        """Generate <message/> stanza from message data
+
+        @param data: message data
+            domish element will be put in data['xml']
+            following keys are needed:
+                - from
+                - to
+                - uid: can be set to '' if uid attribute is not wanted
+                - message
+                - type
+                - subject
+                - extra
+        @param post_xml_treatments: a Deferred which will be called with data once XML is
+            generated
+        @return: message data
+        """
+        data["xml"] = message_elt = domish.Element((None, "message"))
+        message_elt["to"] = data["to"].full()
+        message_elt["from"] = data["from"].full()
+        message_elt["type"] = data["type"]
+        if data["uid"]:  # key must be present but can be set to ''
+            # by a plugin to avoid id on purpose
+            message_elt["id"] = data["uid"]
+        for lang, subject in data["subject"].items():
+            subject_elt = message_elt.addElement("subject", content=subject)
+            if lang:
+                subject_elt[(C.NS_XML, "lang")] = lang
+        for lang, message in data["message"].items():
+            body_elt = message_elt.addElement("body", content=message)
+            if lang:
+                body_elt[(C.NS_XML, "lang")] = lang
+        try:
+            thread = data["extra"]["thread"]
+        except KeyError:
+            if "thread_parent" in data["extra"]:
+                raise exceptions.InternalError(
+                    "thread_parent found while there is not associated thread"
+                )
+        else:
+            thread_elt = message_elt.addElement("thread", content=thread)
+            try:
+                thread_elt["parent"] = data["extra"]["thread_parent"]
+            except KeyError:
+                pass
+
+        if post_xml_treatments is not None:
+            post_xml_treatments.callback(data)
+        return data
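+    # Minimal "data" dict accepted by generate_message_xml above (illustrative sketch,
+    # not part of the original code; the address is hypothetical):
+    #
+    #     data = {
+    #         "from": client.jid,
+    #         "to": jid.JID("louise@example.org"),
+    #         "uid": str(uuid.uuid4()),   # or "" to omit the "id" attribute
+    #         "message": {"": "hello"},   # bodies keyed by language ("" = unknown)
+    #         "subject": {},
+    #         "type": "chat",
+    #         "extra": {},
+    #     }
+    #     client.generate_message_xml(data)
+    #     # data["xml"] now holds the generated <message/> domish.Element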
+
+    @property
+    def is_admin(self) -> bool:
+        """True if a client is an administrator with extra privileges"""
+        return self.host_app.memory.is_admin(self.profile)
+
+    def add_post_xml_callbacks(self, post_xml_treatments):
+        """Used to add class level callbacks at the end of the workflow
+
+        @param post_xml_treatments(D): the same Deferred as in sendMessage trigger
+        """
+        raise NotImplementedError
+
+    async def a_send(self, obj):
+        # the original send method accepts strings,
+        # but we restrict to domish.Element to make trigger treatments easier
+        assert isinstance(obj, domish.Element)
+        # XXX: this trigger is the last one before sending stanza on wire
+        #      it is intended for things like end 2 end encryption.
+        #      *DO NOT* cancel (i.e. return False) without very good reason
+        #      (out of band transmission for instance).
+        #      e2e should have a priority of 0 here, and out of band transmission
+        #      a lower priority
+        if not (await self.host_app.trigger.async_point("send", self, obj)):
+            return
+        super().send(obj)
+
+    def send(self, obj):
+        defer.ensureDeferred(self.a_send(obj))
+
+    async def send_message_data(self, mess_data):
+        """Convenient method to send message data to stream
+
+        This method will send mess_data[u'xml'] to stream, but a trigger is there
+        The trigger can't be cancelled, it's a good place for e2e encryption which
+        don't handle full stanza encryption
+        This trigger can return a Deferred (it's an async_point)
+        @param mess_data(dict): message data as constructed by onMessage workflow
+        @return (dict): mess_data (so it can be used in a deferred chain)
+        """
+        # XXX: This is the last trigger before "send" (last but one globally)
+        #      for sending messages.
+        #      This is intended for e2e encryption which doesn't do full stanza
+        #      encryption (e.g. OTR)
+        #      This trigger point can't cancel the method
+        await self.host_app.trigger.async_point("send_message_data", self, mess_data,
+            triggers_no_cancel=True)
+        await self.a_send(mess_data["xml"])
+        return mess_data
+
+    def sendMessage(
+            self, to_jid, message, subject=None, mess_type="auto", extra=None, uid=None,
+            no_trigger=False):
+        r"""Send a message to an entity
+
+        @param to_jid(jid.JID): recipient of the message
+        @param message(dict): message body, key is the language (use '' when unknown)
+        @param subject(dict): message subject, key is the language (use '' when unknown)
+        @param mess_type(str): one of standard message type (cf RFC 6121 §5.2.2) or:
+            - auto: for automatic type detection
+            - info: for information ("info_type" can be specified in extra)
+        @param extra(dict, None): extra data. Key can be:
+            - info_type: information type, can be
+                TODO
+        @param uid(unicode, None): unique id:
+            should be unique at least in this XMPP session
+            if None, an uuid will be generated
+        @param no_trigger (bool): if True, sendMessage[suffix] trigger will not be used
+            useful when a message needs to be sent without any modification
+            /!\ this will also skip encryption methods!
+        """
+        if subject is None:
+            subject = {}
+        if extra is None:
+            extra = {}
+
+        assert mess_type in C.MESS_TYPE_ALL
+
+        data = {  # dict is similar to the one used in client.onMessage
+            "from": self.jid,
+            "to": to_jid,
+            "uid": uid or str(uuid.uuid4()),
+            "message": message,
+            "subject": subject,
+            "type": mess_type,
+            "extra": extra,
+            "timestamp": time.time(),
+        }
+        # XXX: plugins can add their pre-XML treatments to this deferred
+        pre_xml_treatments = defer.Deferred()
+        # XXX: plugins can add their post-XML treatments to this deferred
+        post_xml_treatments = defer.Deferred()
+
+        if data["type"] == C.MESS_TYPE_AUTO:
+            # we try to guess the type
+            if data["subject"]:
+                data["type"] = C.MESS_TYPE_NORMAL
+            elif not data["to"].resource:
+                # we may have a groupchat message, we check if we know this jid
+                try:
+                    entity_type = self.host_app.memory.get_entity_datum(
+                        self, data["to"], C.ENTITY_TYPE
+                    )
+                    # FIXME: should entity_type manage resources ?
+                except (exceptions.UnknownEntityError, KeyError):
+                    entity_type = "contact"
+
+                if entity_type == C.ENTITY_TYPE_MUC:
+                    data["type"] = C.MESS_TYPE_GROUPCHAT
+                else:
+                    data["type"] = C.MESS_TYPE_CHAT
+            else:
+                data["type"] = C.MESS_TYPE_CHAT
+
+        # FIXME: send_only is used by libervia's OTR plugin to avoid
+        #        the triggers from frontend, and no_trigger does the same
+        #        thing internally, this could be unified
+        send_only = data["extra"].get("send_only", False)
+
+        if not no_trigger and not send_only:
+            # is the session encrypted? If so we indicate it in data
+            self.encryption.set_encryption_flag(data)
+
+            if not self.host_app.trigger.point(
+                "sendMessage" + self.trigger_suffix,
+                self,
+                data,
+                pre_xml_treatments,
+                post_xml_treatments,
+            ):
+                return defer.succeed(None)
+
+        log.debug(_("Sending message (type {type}, to {to})")
+                    .format(type=data["type"], to=to_jid.full()))
+
+        pre_xml_treatments.addCallback(
+            lambda __: self.generate_message_xml(data, post_xml_treatments)
+        )
+        pre_xml_treatments.addCallback(lambda __: post_xml_treatments)
+        pre_xml_treatments.addErrback(self._cancel_error_trap)
+        post_xml_treatments.addCallback(
+            lambda __: defer.ensureDeferred(self.send_message_data(data))
+        )
+        if send_only:
+            log.debug(_("Triggers, storage and echo have been inhibited by the "
+                        "'send_only' parameter"))
+        else:
+            self.add_post_xml_callbacks(post_xml_treatments)
+            post_xml_treatments.addErrback(self._cancel_error_trap)
+            post_xml_treatments.addErrback(self.host_app.log_errback)
+        pre_xml_treatments.callback(data)
+        return pre_xml_treatments
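+    # Illustrative call of sendMessage above (hedged sketch, not part of the original
+    # code; the JID and text are hypothetical):
+    #
+    #     client.sendMessage(
+    #         jid.JID("louise@example.org"),
+    #         {"": "hello"},              # message bodies keyed by language
+    #         mess_type=C.MESS_TYPE_CHAT,
+    #     )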
+
+    def _cancel_error_trap(self, failure):
+        """A message sending can be cancelled by a plugin treatment"""
+        failure.trap(exceptions.CancelError)
+
+    def is_message_printable(self, mess_data):
+        """Return True if a message contain payload to show in frontends"""
+        return (
+            mess_data["message"] or mess_data["subject"]
+            or mess_data["extra"].get(C.KEY_ATTACHMENTS)
+            or mess_data["type"] == C.MESS_TYPE_INFO
+        )
+
+    async def message_add_to_history(self, data):
+        """Store message into database (for local history)
+
+        @param data: message data dictionary
+        """
+        if data["type"] != C.MESS_TYPE_GROUPCHAT:
+            # we don't add groupchat messages to history, as we get them back
+            # and they will be added then
+
+            # we need a message to store
+            if self.is_message_printable(data):
+                await self.host_app.memory.add_to_history(self, data)
+            else:
+                log.warning(
+                    "No message found"
+                )  # empty body should be managed by plugins before this point
+        return data
+
+    def message_get_bridge_args(self, data):
+        """Generate args to use with bridge from data dict"""
+        return (data["uid"], data["timestamp"], data["from"].full(),
+                data["to"].full(), data["message"], data["subject"],
+                data["type"], data_format.serialise(data["extra"]))
+
+
+    def message_send_to_bridge(self, data):
+        """Send message to bridge, so frontends can display it
+
+        @param data: message data dictionary
+        """
+        if data["type"] != C.MESS_TYPE_GROUPCHAT:
+            # we don't send groupchat messages to bridge, as we get them back
+            # and they will be added then
+
+            # we need a message to send something
+            if self.is_message_printable(data):
+
+                # We send back the message, so all frontends are aware of it
+                self.host_app.bridge.message_new(
+                    *self.message_get_bridge_args(data),
+                    profile=self.profile
+                )
+            else:
+                log.warning(_("No message found"))
+        return data
+
+    ## helper methods ##
+
+    def p(self, plugin_name, missing=exceptions.MissingModule):
+        """Get a plugin if available
+
+        @param plugin_name(str): name of the plugin
+        @param missing(object): value to return if plugin is missing
+            if it is a subclass of Exception, it will be raised with a helpful message as
+            argument.
+        @return (object): requested plugin wrapper, or default value
+            The plugin wrapper will return the method with client set as first
+            positional argument
+        """
+        return ClientPluginWrapper(self, plugin_name, missing)
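+    # Illustrative usage of the p() wrapper above (hedged sketch, not part of the
+    # original code; the plugin name "XEP-0999" is hypothetical):
+    #
+    #     # raises exceptions.MissingModule if the plugin is not loaded
+    #     client.p("XEP-0999").some_method(arg)   # == plugin.some_method(client, arg)
+    #     # or return None instead of raising when the plugin is missing
+    #     client.p("XEP-0999", missing=None).some_method(arg)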
+
+
+ExtraDict = dict  # TODO
+
+
+@implementer(iwokkel.IDisco)
+class SatXMPPClient(SatXMPPEntity, wokkel_client.XMPPClient):
+    trigger_suffix = ""
+    is_component = False
+
+    def __init__(self, host_app, profile, user_jid, password, host=None,
+                 port=C.XMPP_C2S_PORT, max_retries=C.XMPP_MAX_RETRIES):
+        # XXX: DNS SRV records are checked when the host is not specified.
+        # If no SRV record is found, the host is directly extracted from the JID.
+        self.started = time.time()
+
+        # Currently, we use "client/pc/Salut à Toi", but as
+        # SàT is multi-frontend and can be used on mobile devices, as a bot,
+        # with a web frontend, etc., we should implement a way to dynamically
+        # update identities through the bridge
+        self.identities = [disco.DiscoIdentity("client", "pc", C.APP_NAME)]
+        if sys.platform == "android":
+            # for now we consider Android devices to be always phones
+            self.identities = [disco.DiscoIdentity("client", "phone", C.APP_NAME)]
+
+        hosts_map = host_app.memory.config_get(None, "hosts_dict", {})
+        if host is None and user_jid.host in hosts_map:
+            host_data = hosts_map[user_jid.host]
+            if isinstance(host_data, str):
+                host = host_data
+            elif isinstance(host_data, dict):
+                if "host" in host_data:
+                    host = host_data["host"]
+                if "port" in host_data:
+                    port = host_data["port"]
+            else:
+                log.warning(
+                    _("invalid data used for host: {data}").format(data=host_data)
+                )
+                host_data = None
+            if host_data is not None:
+                log.info(
+                    "using {host}:{port} for host {host_ori} as requested in config"
+                    .format(host_ori=user_jid.host, host=host, port=port)
+                )
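+        # Illustrative "hosts_dict" value (hedged sketch of the expected structure,
+        # not taken from the original documentation; addresses are hypothetical):
+        #
+        #     {
+        #         "example.org": "xmpp.example.org",                   # host override
+        #         "example.net": {"host": "127.0.0.1", "port": 5223},  # host and port
+        #     }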
+
+        self.check_certificate = host_app.memory.param_get_a(
+            "check_certificate", "Connection", profile_key=profile)
+
+        if self.check_certificate:
+            tls_required, configurationForTLS = True, None
+        else:
+            tls_required = False
+            configurationForTLS = ssl.CertificateOptions(trustRoot=None)
+
+        wokkel_client.XMPPClient.__init__(
+            self, user_jid, password, host or None, port or C.XMPP_C2S_PORT,
+            tls_required=tls_required, configurationForTLS=configurationForTLS
+        )
+        SatXMPPEntity.__init__(self, host_app, profile, max_retries)
+
+        if not self.check_certificate:
+            msg = (_("Certificate validation is deactivated, this is unsecure and "
+                "somebody may be spying on you. If you have no good reason to disable "
+                "certificate validation, please activate \"Check certificate\" in your "
+                "settings in \"Connection\" tab."))
+            xml_tools.quick_note(host_app, self, msg, _("Security notice"),
+                level = C.XMLUI_DATA_LVL_WARNING)
+
+    @property
+    def server_jid(self):
+        return jid.JID(self.jid.host)
+
+    def _get_plugins_list(self):
+        for p in self.host_app.plugins.values():
+            if C.PLUG_MODE_CLIENT in p._info["modes"]:
+                yield p
+
+    def _create_sub_protocols(self):
+        self.messageProt = SatMessageProtocol(self.host_app)
+        self.messageProt.setHandlerParent(self)
+
+        self.roster = SatRosterProtocol(self.host_app)
+        self.roster.setHandlerParent(self)
+
+        self.presence = SatPresenceProtocol(self.host_app)
+        self.presence.setHandlerParent(self)
+
+    @classmethod
+    async def start_connection(cls, host, profile, max_retries):
+        try:
+            await super(SatXMPPClient, cls).start_connection(host, profile, max_retries)
+        except exceptions.CancelError as e:
+            log.warning(f"start_connection cancelled: {e}")
+            return
+        entity = host.profiles[profile]
+        # we finally send our presence
+        entity.presence.available()
+
+    def entity_connected(self):
+        # we want to be sure that we got the roster
+        return self.roster.got_roster
+
+    def add_post_xml_callbacks(self, post_xml_treatments):
+        post_xml_treatments.addCallback(self.messageProt.complete_attachments)
+        post_xml_treatments.addCallback(
+            lambda ret: defer.ensureDeferred(self.message_add_to_history(ret))
+        )
+        post_xml_treatments.addCallback(self.message_send_to_bridge)
+
+    def feedback(
+        self,
+        to_jid: jid.JID,
+        message: str,
+        extra: Optional[ExtraDict] = None
+    ) -> None:
+        """Send message to frontends
+
+        This message will be an info message, not recorded in history.
+        It can be used to give feedback of a command
+        @param to_jid: destination jid
+        @param message: message to send to frontends
+        @param extra: extra data to use in particular, info subtype can be specified with
+            MESS_EXTRA_INFO
+        """
+        if extra is None:
+            extra = {}
+        self.host_app.bridge.message_new(
+            uid=str(uuid.uuid4()),
+            timestamp=time.time(),
+            from_jid=self.jid.full(),
+            to_jid=to_jid.full(),
+            message={"": message},
+            subject={},
+            mess_type=C.MESS_TYPE_INFO,
+            extra=data_format.serialise(extra),
+            profile=self.profile,
+        )
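+    # Illustrative use of feedback() above (hedged sketch, not part of the original
+    # code; the JID and text are hypothetical):
+    #
+    #     client.feedback(jid.JID("louise@example.org"), "file transfer accepted")
+    #     # frontends receive an info message which is not stored in history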
+
+    def _finish_connection(self, __):
+        d = self.roster.request_roster()
+        d.addCallback(lambda __: super(SatXMPPClient, self)._finish_connection(__))
+
+
+@implementer(iwokkel.IDisco)
+class SatXMPPComponent(SatXMPPEntity, component.Component):
+    """XMPP component
+
+    Components are similar but not identical to clients.
+    An entry point plugin is launched after the component is connected.
+    Components need to instantiate MessageProtocol themselves
+    """
+
+    trigger_suffix = (
+        "Component"
+    )  # used to distinguish some trigger points set in SatXMPPEntity
+    is_component = True
+    # XXX: set to True from the entry plugin to keep sent messages in history
+    sendHistory = False
+    # XXX: same as sendHistory but for received messages
+    receiveHistory = False
+
+    def __init__(self, host_app, profile, component_jid, password, host=None, port=None,
+                 max_retries=C.XMPP_MAX_RETRIES):
+        self.started = time.time()
+        if port is None:
+            port = C.XMPP_COMPONENT_PORT
+
+        ## entry point ##
+        entry_point = host_app.memory.get_entry_point(profile)
+        try:
+            self.entry_plugin = host_app.plugins[entry_point]
+        except KeyError:
+            raise exceptions.NotFound(
+                _("The requested entry point ({entry_point}) is not available").format(
+                    entry_point=entry_point
+                )
+            )
+
+        self.enabled_features = set()
+        self.identities = [disco.DiscoIdentity("component", "generic", C.APP_NAME)]
+        # jid is set automatically on bind by Twisted for Client, but not for Component
+        self.jid = component_jid
+        if host is None:
+            try:
+                host = component_jid.host.split(".", 1)[1]
+            except IndexError:
+                raise ValueError("Can't guess host from jid, please specify a host")
+        # XXX: component.Component expects a unicode jid, while Client expects jid.JID.
+        #      this is not consistent, so we use jid.JID for SatXMPP*
+        component.Component.__init__(self, host, port, component_jid.full(), password)
+        SatXMPPEntity.__init__(self, host_app, profile, max_retries)
+
+    @property
+    def server_jid(self):
+        # FIXME: not the best way to get server jid, maybe use config option?
+        return jid.JID(self.jid.host.split(".", 1)[-1])
+
+    @property
+    def is_admin(self) -> bool:
+        return False
+
+    def _create_sub_protocols(self):
+        self.messageProt = SatMessageProtocol(self.host_app)
+        self.messageProt.setHandlerParent(self)
+
+    def _build_dependencies(self, current, plugins, required=True):
+        """build recursively dependencies needed for a plugin
+
+        this method build list of plugin needed for a component and raises
+        errors if they are not available or not allowed for components
+        @param current(object): parent plugin to check
+            use entry_point for first call
+        @param plugins(list): list of validated plugins, will be filled by the method
+            give an empty list for first call
+        @param required(bool): True if plugin is mandatory
+            for recursive calls only, should not be modified by inital caller
+        @raise InternalError: one of the plugin is not handling components
+        @raise KeyError: one plugin should be present in self.host_app.plugins but it
+                         is not
+        """
+        if C.PLUG_MODE_COMPONENT not in current._info["modes"]:
+            if not required:
+                return
+            else:
+                log.error(
+                    _(
+                        "Plugin {current_name} is needed for {entry_name}, "
+                        "but it doesn't handle component mode"
+                    ).format(
+                        current_name=current._info["import_name"],
+                        entry_name=self.entry_plugin._info["import_name"],
+                    )
+                )
+                raise exceptions.InternalError(_("invalid plugin mode"))
+
+        for import_name in current._info.get(C.PI_DEPENDENCIES, []):
+            # plugins are already loaded as dependencies
+            # so we know they are in self.host_app.plugins
+            dep = self.host_app.plugins[import_name]
+            self._build_dependencies(dep, plugins)
+
+        for import_name in current._info.get(C.PI_RECOMMENDATIONS, []):
+            # here plugins are only recommendations,
+            # so they may not exist in self.host_app.plugins
+            try:
+                dep = self.host_app.plugins[import_name]
+            except KeyError:
+                continue
+            self._build_dependencies(dep, plugins, required=False)
+
+        if current not in plugins:
+            # current can be required for several plugins and so
+            # it can already be present in the list
+            plugins.append(current)
+
+    def _get_plugins_list(self):
+        # XXX: for components we don't launch all plugin triggers,
+        #      but only the ones the entry point depends on
+        plugins = []
+        self._build_dependencies(self.entry_plugin, plugins)
+        return plugins
+
+    def entity_connected(self):
+        # we can now launch entry point
+        try:
+            start_cb = self.entry_plugin.componentStart
+        except AttributeError:
+            return
+        else:
+            return start_cb(self)
+
+    def add_post_xml_callbacks(self, post_xml_treatments):
+        if self.sendHistory:
+            post_xml_treatments.addCallback(
+                lambda ret: defer.ensureDeferred(self.message_add_to_history(ret))
+            )
+
+    def get_owner_from_jid(self, to_jid: jid.JID) -> jid.JID:
+        """Retrieve "owner" of a component resource from the destination jid of the request
+
+        This method needs the XEP-0106 plugin for unescaping; if you use it, you must
+        add the plugin to your dependencies.
+        A "user" part must be present in "to_jid" (otherwise, the component itself is addressed)
+        @param to_jid: destination JID of the request
+        """
+        try:
+            unescape = self.host_app.plugins['XEP-0106'].unescape
+        except KeyError:
+            raise exceptions.MissingPlugin("Plugin XEP-0106 is needed to retrieve owner")
+        else:
+            user = unescape(to_jid.user)
+        if '@' in user:
+            # a full jid is specified
+            return jid.JID(user)
+        else:
+            # only user part is specified, we use our own host to build the full jid
+            return jid.JID(None, (user, self.host, None))
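+    # Illustrative mapping done by get_owner_from_jid above (hedged sketch, not part
+    # of the original code; addresses are hypothetical and assume the XEP-0106 plugin
+    # is loaded):
+    #
+    #     louise\40example.org@gateway.example.org  ->  louise@example.org
+    #     louise@gateway.example.org                ->  louise@<component host>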
+
+    def get_owner_and_peer(self, iq_elt: domish.Element) -> Tuple[jid.JID, jid.JID]:
+        """Retrieve owner of a component jid, and the jid of the requesting peer
+
+        "owner" is found by either unescaping full jid from node, or by combining node
+        with our host.
+        Peer jid is the requesting jid from the IQ element
+        @param iq_elt: IQ stanza sent by the requesting peer
+        @return: peer JID and owner JID (in that order)
+        """
+        to_jid = jid.JID(iq_elt['to'])
+        if to_jid.user:
+            owner = self.get_owner_from_jid(to_jid)
+        else:
+            owner = jid.JID(iq_elt["from"]).userhostJID()
+
+        peer_jid = jid.JID(iq_elt["from"])
+        return peer_jid, owner
+
+    def get_virtual_client(self, jid_: jid.JID) -> SatXMPPEntity:
+        """Get client for this component with a specified jid
+
+        This is needed to perform operations with a virtual JID corresponding to a
+        virtual entity (e.g. the identifier of a legacy network account) instead of the
+        JID of the gateway itself.
+        @param jid_: virtual JID to use
+        @return: virtual client
+        """
+        client = copy.copy(self)
+        client.jid = jid_
+        return client
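+    # Illustrative use from a gateway plugin (hedged sketch, not part of the original
+    # code; the JID is hypothetical):
+    #
+    #     virtual_client = client.get_virtual_client(
+    #         jid.JID("louise\\40example.org@gateway.example.org")
+    #     )
+    #     # code using virtual_client.jid (e.g. the "from" of messages built with
+    #     # sendMessage) now uses the virtual JID instead of the gateway's own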
+
+
+class SatMessageProtocol(xmppim.MessageProtocol):
+
+    def __init__(self, host):
+        xmppim.MessageProtocol.__init__(self)
+        self.host = host
+
+    @property
+    def client(self):
+        return self.parent
+
+    def normalize_ns(self, elt: domish.Element, namespace: Optional[str]) -> None:
+        if elt.uri == namespace:
+            elt.defaultUri = elt.uri = C.NS_CLIENT
+        for child in elt.elements():
+            self.normalize_ns(child, namespace)
+
+    def parse_message(self, message_elt):
+        """Parse a message XML and return message_data
+
+        @param message_elt(domish.Element): raw <message> xml
+            (the message "id" is mapped to the internal uid using the parent client)
+        @return(dict): message data
+        """
+        if message_elt.name != "message":
+            log.warning(_(
+                "parse_message used with a non <message/> stanza, ignoring: {xml}"
+                .format(xml=message_elt.toXml())))
+            return {}
+
+        if message_elt.uri is None:
+            # xmlns may be None when wokkel element parsing strips out the root namespace
+            self.normalize_ns(message_elt, None)
+        elif message_elt.uri != C.NS_CLIENT:
+            log.warning(_(
+                "received <message> with a wrong namespace: {xml}"
+                .format(xml=message_elt.toXml())))
+
+        client = self.parent
+
+        if not message_elt.hasAttribute('to'):
+            message_elt['to'] = client.jid.full()
+
+        message = {}
+        subject = {}
+        extra = {}
+        data = {
+            "from": jid.JID(message_elt["from"]),
+            "to": jid.JID(message_elt["to"]),
+            "uid": message_elt.getAttribute(
+                "uid", str(uuid.uuid4())
+            ),  # XXX: uid is not a standard attribute but may be added by plugins
+            "message": message,
+            "subject": subject,
+            "type": message_elt.getAttribute("type", "normal"),
+            "extra": extra,
+        }
+
+        try:
+            message_id = data["extra"]["message_id"] = message_elt["id"]
+        except KeyError:
+            pass
+        else:
+            client.mess_id2uid[(data["from"], message_id)] = data["uid"]
+
+        # message
+        for e in message_elt.elements(C.NS_CLIENT, "body"):
+            message[e.getAttribute((C.NS_XML, "lang"), "")] = str(e)
+
+        # subject
+        for e in message_elt.elements(C.NS_CLIENT, "subject"):
+            subject[e.getAttribute((C.NS_XML, "lang"), "")] = str(e)
+
+        # delay and timestamp
+        try:
+            received_timestamp = message_elt._received_timestamp
+        except AttributeError:
+            # message_elt._received_timestamp should have been set in onMessage
+            # but if parse_message is called directly, it can be missing
+            log.debug("missing received timestamp for {message_elt}".format(
+                message_elt=message_elt))
+            received_timestamp = time.time()
+
+        try:
+            delay_elt = next(message_elt.elements(delay.NS_DELAY, "delay"))
+        except StopIteration:
+            data["timestamp"] = received_timestamp
+        else:
+            parsed_delay = delay.Delay.fromElement(delay_elt)
+            data["timestamp"] = calendar.timegm(parsed_delay.stamp.utctimetuple())
+            data["received_timestamp"] = received_timestamp
+            if parsed_delay.sender:
+                data["delay_sender"] = parsed_delay.sender.full()
+
+        self.host.trigger.point("message_parse", client,  message_elt, data)
+        return data
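+    # Shape of the data returned by parse_message above (illustrative sketch, not part
+    # of the original code; values are hypothetical):
+    #
+    #     {
+    #         "from": jid.JID("louise@example.org/mobile"),
+    #         "to": jid.JID("pierre@example.net"),
+    #         "uid": "...",                   # generated if the stanza has no "uid"
+    #         "message": {"": "hello"},       # bodies keyed by xml:lang
+    #         "subject": {},
+    #         "type": "chat",
+    #         "extra": {"message_id": "..."}, # original "id" attribute, when present
+    #         "timestamp": ...,               # from <delay/> if any, else receipt time
+    #     }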
+
+    def _on_message_start_workflow(self, cont, client, message_elt, post_treat):
+        """Parse message and do post treatments
+
+        It is the first callback called after message_received trigger
+        @param cont(bool): workflow will continue only if this is True
+        @param message_elt(domish.Element): message stanza
+            may have been modified by triggers
+        @param post_treat(defer.Deferred): post parsing treatments
+        """
+        if not cont:
+            return
+        data = self.parse_message(message_elt)
+        post_treat.addCallback(self.complete_attachments)
+        post_treat.addCallback(self.skip_empty_message)
+        if not client.is_component or client.receiveHistory:
+            post_treat.addCallback(
+                lambda ret: defer.ensureDeferred(self.add_to_history(ret))
+            )
+        if not client.is_component:
+            post_treat.addCallback(self.bridge_signal, data)
+        post_treat.addErrback(self.cancel_error_trap)
+        post_treat.callback(data)
+
+    def onMessage(self, message_elt):
+        # TODO: handle threads
+        message_elt._received_timestamp = time.time()
+        client = self.parent
+        if not "from" in message_elt.attributes:
+            message_elt["from"] = client.jid.host
+        log.debug(_("got message from: {from_}").format(from_=message_elt["from"]))
+        if self.client.is_component and message_elt.uri == component.NS_COMPONENT_ACCEPT:
+            # we use client namespace all the time to simplify parsing
+            self.normalize_ns(message_elt, component.NS_COMPONENT_ACCEPT)
+
+        # plugins can add their treatments to this deferred
+        post_treat = defer.Deferred()
+
+        d = self.host.trigger.async_point(
+            "message_received", client, message_elt, post_treat
+        )
+
+        d.addCallback(self._on_message_start_workflow, client, message_elt, post_treat)
+
+    def complete_attachments(self, data):
+        """Complete missing metadata of attachments"""
+        for attachment in data['extra'].get(C.KEY_ATTACHMENTS, []):
+            if "name" not in attachment and "url" in attachment:
+                name = (Path(unquote(urlparse(attachment['url']).path)).name
+                        or C.FILE_DEFAULT_NAME)
+                attachment["name"] = name
+            if ((C.KEY_ATTACHMENTS_MEDIA_TYPE not in attachment
+                 and "name" in attachment)):
+                media_type = mimetypes.guess_type(attachment['name'], strict=False)[0]
+                if media_type:
+                    attachment[C.KEY_ATTACHMENTS_MEDIA_TYPE] = media_type
+
+        return data
+
+    def skip_empty_message(self, data):
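+        """Cancel the workflow if the message has no body, subject nor extra data"""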
+        if not data["message"] and not data["extra"] and not data["subject"]:
+            raise failure.Failure(exceptions.CancelError("Cancelled empty message"))
+        return data
+
+    async def add_to_history(self, data):
+        if data.pop("history", None) == C.HISTORY_SKIP:
+            log.debug("history is skipped as requested")
+            data["extra"]["history"] = C.HISTORY_SKIP
+        else:
+            # we need a message to store
+            if self.parent.is_message_printable(data):
+                return await self.host.memory.add_to_history(self.parent, data)
+            else:
+                log.debug("not storing empty message to history: {data}"
+                    .format(data=data))
+
+    def bridge_signal(self, __, data):
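+        """Send the message_new signal to frontends
+
+        Received timestamp, delay sender and encryption status are first added to
+        extra; empty (non printable) messages are discarded.
+        """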
+        try:
+            data["extra"]["received_timestamp"] = str(data["received_timestamp"])
+            data["extra"]["delay_sender"] = data["delay_sender"]
+        except KeyError:
+            pass
+        if self.client.encryption.isEncrypted(data):
+            data["extra"]["encrypted"] = True
+        if data is not None:
+            if self.parent.is_message_printable(data):
+                self.host.bridge.message_new(
+                    data["uid"],
+                    data["timestamp"],
+                    data["from"].full(),
+                    data["to"].full(),
+                    data["message"],
+                    data["subject"],
+                    data["type"],
+                    data_format.serialise(data["extra"]),
+                    profile=self.parent.profile,
+                )
+            else:
+                log.debug("Discarding bridge signal for empty message: {data}".format(
+                    data=data))
+        return data
+
+    def cancel_error_trap(self, failure_):
+        """A message sending can be cancelled by a plugin treatment"""
+        failure_.trap(exceptions.CancelError)
+
+
+class SatRosterProtocol(xmppim.RosterClientProtocol):
+
+    def __init__(self, host):
+        xmppim.RosterClientProtocol.__init__(self)
+        self.host = host
+        self.got_roster = defer.Deferred()  # called when roster is received and ready
+        # XXX: the two following dicts keep a local copy of the roster
+        self._jids = {}  # map from jids to RosterItem: key=jid value=RosterItem
+        self._groups = {}  # map from groups to jids: key=group value=set of jids
+
+    def __contains__(self, entity_jid):
+        return self.is_jid_in_roster(entity_jid)
+
+    @property
+    def versioning(self):
+        """True if server support roster versioning"""
+        return (NS_ROSTER_VER, 'ver') in self.parent.xmlstream.features
+
+    @property
+    def roster_cache(self):
+        """Cache of roster from storage
+
+        This property returns a new PersistentDict on each call; it must be loaded
+        manually if necessary
+        """
+        return persistent.PersistentDict(NS_ROSTER_VER, self.parent.profile)
+
+    def _register_item(self, item):
+        """Register item in local cache
+
+        item must be already registered in self._jids before this method is called
+        @param item (RosterItem): item added
+        """
+        log.debug("registering item: {}".format(item.entity.full()))
+        if item.entity.resource:
+            log.warning(
+                "Received a roster item with a resource, this is not common but not "
+                "restricted by RFC 6121, this case may be not well tested."
+            )
+        if not item.subscriptionTo:
+            if not item.subscriptionFrom:
+                log.info(
+                    _("There's no subscription between you and [{}]!").format(
+                        item.entity.full()
+                    )
+                )
+            else:
+                log.info(_("You are not subscribed to [{}]!").format(item.entity.full()))
+        if not item.subscriptionFrom:
+            log.info(_("[{}] is not subscribed to you!").format(item.entity.full()))
+
+        for group in item.groups:
+            self._groups.setdefault(group, set()).add(item.entity)
+
+    @defer.inlineCallbacks
+    def _cache_roster(self, version):
+        """Serialise local roster and save it to storage
+
+        @param version(unicode): version of roster in local cache
+        """
+        roster_cache = self.roster_cache
+        yield roster_cache.clear()
+        roster_cache[ROSTER_VER_KEY] = version
+        for roster_jid, roster_item in self._jids.items():
+            roster_jid_s = roster_jid.full()
+            roster_item_elt = roster_item.toElement().toXml()
+            roster_cache[roster_jid_s] = roster_item_elt
+
+    @defer.inlineCallbacks
+    def resync(self):
+        """Ask full roster to resync database
+
+        this should not be necessary, but may be used if user suspsect roster
+        to be somehow corrupted
+        """
+        roster_cache = self.roster_cache
+        yield roster_cache.clear()
+        self._jids.clear()
+        self._groups.clear()
+        yield self.request_roster()
+
+    @defer.inlineCallbacks
+    def request_roster(self):
+        """Ask the server for Roster list """
+        if self.versioning:
+            log.info(_("our server support roster versioning, we use it"))
+            roster_cache = self.roster_cache
+            yield roster_cache.load()
+            try:
+                version = roster_cache[ROSTER_VER_KEY]
+            except KeyError:
+                log.info(_("no roster in cache, we start fresh"))
+                # u"" means we use versioning without valid roster in cache
+                version = ""
+            else:
+                log.info(_("We have roster v{version} in cache").format(version=version))
+                # we deserialise cached roster to our local cache
+                for roster_jid_s, roster_item_elt_s in roster_cache.items():
+                    if roster_jid_s == ROSTER_VER_KEY:
+                        continue
+                    roster_jid = jid.JID(roster_jid_s)
+                    roster_item_elt = generic.parseXml(roster_item_elt_s.encode('utf-8'))
+                    roster_item = xmppim.RosterItem.fromElement(roster_item_elt)
+                    self._jids[roster_jid] = roster_item
+                    self._register_item(roster_item)
+        else:
+            log.warning(_("our server doesn't support roster versioning"))
+            version = None
+
+        log.debug("requesting roster")
+        roster = yield self.getRoster(version=version)
+        if roster is None:
+            log.debug("empty roster result received, we'll get roster item with roster "
+                      "pushes")
+        else:
+            # a full roster is received
+            self._groups.clear()
+            self._jids = roster
+            for item in roster.values():
+                if not item.subscriptionTo and not item.subscriptionFrom and not item.ask:
+                    # XXX: current behaviour: we don't want contacts in our roster list
+                    #      if there is no presence subscription
+                    #      this may change in the future
+                    log.info(
+                        "Removing contact {} from roster because there is no presence "
+                        "subscription".format(item.entity.full())
+                    )
+                    self.removeItem(item.entity)  # FIXME: to be checked
+                else:
+                    self._register_item(item)
+            yield self._cache_roster(roster.version)
+
+        if not self.got_roster.called:
+            # got_roster may already be called if we use resync()
+            self.got_roster.callback(None)
+
+    def removeItem(self, to_jid):
+        """Remove a contact from roster list
+        @param to_jid: a JID instance
+        @return: Deferred
+        """
+        return xmppim.RosterClientProtocol.removeItem(self, to_jid)
+
+    def get_attributes(self, item):
+        """Return dictionary of attributes as used in bridge from a RosterItem
+
+        @param item: RosterItem
+        @return: dictionary of attributes
+        """
+        item_attr = {
+            "to": str(item.subscriptionTo),
+            "from": str(item.subscriptionFrom),
+            "ask": str(item.ask),
+        }
+        if item.name:
+            item_attr["name"] = item.name
+        return item_attr
+
+    def setReceived(self, request):
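+        """Handle a roster push adding or updating an item"""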
+        item = request.item
+        entity = item.entity
+        log.info(_("adding {entity} to roster").format(entity=entity.full()))
+        if request.version is not None:
+            # we update the cache in storage
+            roster_cache = self.roster_cache
+            roster_cache[entity.full()] = item.toElement().toXml()
+            roster_cache[ROSTER_VER_KEY] = request.version
+
+        try:  # update the cache for the groups the contact has been removed from
+            left_groups = set(self._jids[entity].groups).difference(item.groups)
+            for group in left_groups:
+                jids_set = self._groups[group]
+                jids_set.remove(entity)
+                if not jids_set:
+                    del self._groups[group]
+        except KeyError:
+            pass  # no previous item registration (or it's been cleared)
+        self._jids[entity] = item
+        self._register_item(item)
+        self.host.bridge.contact_new(
+            entity.full(), self.get_attributes(item), list(item.groups),
+            self.parent.profile
+        )
+
+    def removeReceived(self, request):
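+        """Handle a roster push removing an item"""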
+        entity = request.item.entity
+        log.info(_("removing {entity} from roster").format(entity=entity.full()))
+        if request.version is not None:
+            # we update the cache in storage
+            roster_cache = self.roster_cache
+            try:
+                del roster_cache[request.item.entity.full()]
+            except KeyError:
+                # because we don't use load(), cache won't have the key, but it
+                # will be deleted from storage anyway
+                pass
+            roster_cache[ROSTER_VER_KEY] = request.version
+
+        # we first remove item from local cache (self._groups and self._jids)
+        try:
+            item = self._jids.pop(entity)
+        except KeyError:
+            log.error(
+                "Received a roster remove event for an item not in cache ({})".format(
+                    entity
+                )
+            )
+            return
+        for group in item.groups:
+            try:
+                jids_set = self._groups[group]
+                jids_set.remove(entity)
+                if not jids_set:
+                    del self._groups[group]
+            except KeyError:
+                log.warning(
+                    f"there is no cache for the group [{group}] of the removed roster "
+                    f"item [{entity}]"
+                )
+
+        # then we send the bridge signal
+        self.host.bridge.contact_deleted(entity.full(), self.parent.profile)
+
+    def get_groups(self):
+        """Return a list of groups"""
+        return list(self._groups.keys())
+
+    def get_item(self, entity_jid):
+        """Return RosterItem for a given jid
+
+        @param entity_jid(jid.JID): jid of the contact
+        @return(RosterItem, None): RosterItem instance
+            None if contact is not in cache
+        """
+        return self._jids.get(entity_jid, None)
+
+    def get_jids(self):
+        """Return all jids of the roster"""
+        return list(self._jids.keys())
+
+    def is_jid_in_roster(self, entity_jid):
+        """Return True if jid is in roster"""
+        if not isinstance(entity_jid, jid.JID):
+            raise exceptions.InternalError(
+                f"a JID is expected, not {type(entity_jid)}: {entity_jid!r}")
+        return entity_jid in self._jids
+
+    def is_subscribed_from(self, entity_jid: jid.JID) -> bool:
+        """Return True if entity is authorised to see our presence"""
+        try:
+            item = self._jids[entity_jid.userhostJID()]
+        except KeyError:
+            return False
+        return item.subscriptionFrom
+
+    def is_subscribed_to(self, entity_jid: jid.JID) -> bool:
+        """Return True if we are subscribed to entity"""
+        try:
+            item = self._jids[entity_jid.userhostJID()]
+        except KeyError:
+            return False
+        return item.subscriptionTo
+
+    def get_items(self):
+        """Return all items of the roster"""
+        return list(self._jids.values())
+
+    def get_jids_from_group(self, group):
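+        """Return the jids of a given group
+
+        @param group(unicode): group to check
+        @return(set(jid.JID)): jids of this group
+        @raise exceptions.UnknownGroupError: the group is not in cache
+        """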
+        try:
+            return self._groups[group]
+        except KeyError:
+            raise exceptions.UnknownGroupError(group)
+
+    def get_jids_set(self, type_, groups=None):
+        """Helper method to get a set of jids
+
+        @param type_(unicode): one of:
+            C.ALL: get all jids from roster
+            C.GROUP: get jids from groups (listed in "groups")
+        @param groups(list[unicode]): list of groups used if type_==C.GROUP
+        @return (set(jid.JID)): set of selected jids
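+        e.g. get_jids_set(C.GROUP, groups=["friends"]) returns all the jids of a
+        hypothetical "friends" group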
+        """
+        if type_ == C.ALL and groups is not None:
+            raise ValueError("groups must not be set for {} type".format(C.ALL))
+
+        if type_ == C.ALL:
+            return set(self.get_jids())
+        elif type_ == C.GROUP:
+            jids = set()
+            for group in groups:
+                jids.update(self.get_jids_from_group(group))
+            return jids
+        else:
+            raise ValueError("Unexpected type_ {}".format(type_))
+
+    def get_nick(self, entity_jid):
+        """Return a nick name for an entity
+
+        return the nick chosen by the user if available
+        else return the user part of entity_jid
+        """
+        item = self.get_item(entity_jid)
+        if item is None:
+            return entity_jid.user
+        else:
+            return item.name or entity_jid.user
+
+
+class SatPresenceProtocol(xmppim.PresenceClientProtocol):
+
+    def __init__(self, host):
+        xmppim.PresenceClientProtocol.__init__(self)
+        self.host = host
+
+    @property
+    def client(self):
+        return self.parent
+
+    def send(self, obj):
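+        """Send a presence stanza
+
+        The "Presence send" trigger is fired first, and may cancel or delay the
+        actual sending.
+        """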
+        presence_d = defer.succeed(None)
+        if not self.host.trigger.point("Presence send", self.parent, obj, presence_d):
+            return
+        presence_d.addCallback(lambda __: super(SatPresenceProtocol, self).send(obj))
+        return presence_d
+
+    def availableReceived(self, entity, show=None, statuses=None, priority=0):
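+        """Handle an available presence
+
+        Presence status is stored in memory and frontends are notified, unless the
+        presence_received trigger cancels the treatment.
+        """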
+        if not statuses:
+            statuses = {}
+
+        if None in statuses:  # we only want string keys
+            statuses[C.PRESENCE_STATUSES_DEFAULT] = statuses.pop(None)
+
+        if not self.host.trigger.point(
+            "presence_received", self.parent, entity, show, priority, statuses
+        ):
+            return
+
+        self.host.memory.set_presence_status(
+            entity, show or "", int(priority), statuses, self.parent.profile
+        )
+
+        # now it's time to notify frontends
+        self.host.bridge.presence_update(
+            entity.full(), show or "", int(priority), statuses, self.parent.profile
+        )
+
+    def unavailableReceived(self, entity, statuses=None):
+        log.debug(
+            _("presence update for [%(entity)s] (unavailable, statuses=%(statuses)s)")
+            % {"entity": entity, C.PRESENCE_STATUSES: statuses}
+        )
+
+        if not statuses:
+            statuses = {}
+
+        if None in statuses:  # we only want string keys
+            statuses[C.PRESENCE_STATUSES_DEFAULT] = statuses.pop(None)
+
+        if not self.host.trigger.point(
+            "presence_received", self.parent, entity, C.PRESENCE_UNAVAILABLE, 0, statuses,
+        ):
+            return
+
+        # now it's time to notify frontends
+        # if the entity is not known yet in this session or is already unavailable,
+        # there is no need to send an unavailable signal
+        try:
+            presence = self.host.memory.get_entity_datum(
+                self.client, entity, "presence"
+            )
+        except (KeyError, exceptions.UnknownEntityError):
+            # the entity has not been seen yet in this session
+            pass
+        else:
+            if presence.show != C.PRESENCE_UNAVAILABLE:
+                self.host.bridge.presence_update(
+                    entity.full(),
+                    C.PRESENCE_UNAVAILABLE,
+                    0,
+                    statuses,
+                    self.parent.profile,
+                )
+
+        self.host.memory.set_presence_status(
+            entity, C.PRESENCE_UNAVAILABLE, 0, statuses, self.parent.profile
+        )
+
+    def available(self, entity=None, show=None, statuses=None, priority=None):
+        """Set a presence and statuses.
+
+        @param entity (jid.JID): entity
+        @param show (unicode): value in ('unavailable', '', 'away', 'xa', 'chat', 'dnd')
+        @param statuses (dict{unicode: unicode}): multilingual statuses, with each
+            key being a 2-letter language code or "default".
+        @param priority (int): presence priority; if None, the value of the
+            "Priority" parameter is used.
+        """
+        if priority is None:
+            try:
+                priority = int(
+                    self.host.memory.param_get_a(
+                        "Priority", "Connection", profile_key=self.parent.profile
+                    )
+                )
+            except ValueError:
+                priority = 0
+
+        if statuses is None:
+            statuses = {}
+
+        # our "default" status key is represented by None in wokkel,
+        # so we must temporarily switch to wokkel's convention...
+        if C.PRESENCE_STATUSES_DEFAULT in statuses:
+            statuses[None] = statuses.pop(C.PRESENCE_STATUSES_DEFAULT)
+
+        presence_elt = xmppim.AvailablePresence(entity, show, statuses, priority)
+
+        # ... before switching back
+        if None in statuses:
+            statuses["default"] = statuses.pop(None)
+
+        if not self.host.trigger.point("presence_available", presence_elt, self.parent):
+            return
+        return self.send(presence_elt)
+
+    @defer.inlineCallbacks
+    def subscribed(self, entity):
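+        """Accept a subscription request from entity, and subscribe back if needed"""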
+        yield self.parent.roster.got_roster
+        xmppim.PresenceClientProtocol.subscribed(self, entity)
+        self.host.memory.del_waiting_sub(entity.userhost(), self.parent.profile)
+        item = self.parent.roster.get_item(entity)
+        if (
+            not item or not item.subscriptionTo
+        ):  # we automatically subscribe to 'to' presence
+            log.debug(_('sending automatic "from" subscription request'))
+            self.subscribe(entity)
+
+    def unsubscribed(self, entity):
+        xmppim.PresenceClientProtocol.unsubscribed(self, entity)
+        self.host.memory.del_waiting_sub(entity.userhost(), self.parent.profile)
+
+    def subscribedReceived(self, entity):
+        log.debug(_("subscription approved for [%s]") % entity.userhost())
+        self.host.bridge.subscribe("subscribed", entity.userhost(), self.parent.profile)
+
+    def unsubscribedReceived(self, entity):
+        log.debug(_("unsubscription confirmed for [%s]") % entity.userhost())
+        self.host.bridge.subscribe("unsubscribed", entity.userhost(), self.parent.profile)
+
+    @defer.inlineCallbacks
+    def subscribeReceived(self, entity):
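+        """Handle an incoming presence subscription request
+
+        The request is automatically accepted if we are already subscribed to the
+        contact's presence, otherwise it is stored and forwarded to frontends.
+        """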
+        log.debug(_("subscription request from [%s]") % entity.userhost())
+        yield self.parent.roster.got_roster
+        item = self.parent.roster.get_item(entity)
+        if item and item.subscriptionTo:
+            # We automatically accept subscription if we are already subscribed to
+            # contact presence
+            log.debug(_("sending automatic subscription acceptance"))
+            self.subscribed(entity)
+        else:
+            self.host.memory.add_waiting_sub(
+                "subscribe", entity.userhost(), self.parent.profile
+            )
+            self.host.bridge.subscribe(
+                "subscribe", entity.userhost(), self.parent.profile
+            )
+
+    @defer.inlineCallbacks
+    def unsubscribeReceived(self, entity):
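+        """Handle an incoming presence unsubscription
+
+        The contact is automatically removed from the roster if they were subscribed
+        to our presence.
+        """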
+        log.debug(_("unsubscription asked for [%s]") % entity.userhost())
+        yield self.parent.roster.got_roster
+        item = self.parent.roster.get_item(entity)
+        if item and item.subscriptionFrom:  # we automatically remove contact
+            log.debug(_("automatic contact deletion"))
+            self.host.contact_del(entity, self.parent.profile)
+        self.host.bridge.subscribe("unsubscribe", entity.userhost(), self.parent.profile)
+
+
+@implementer(iwokkel.IDisco)
+class SatDiscoProtocol(disco.DiscoClientProtocol):
+
+    def __init__(self, host):
+        disco.DiscoClientProtocol.__init__(self)
+
+    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
+        # those features are implemented in Wokkel (or sat_tmp.wokkel)
+        # and thus are always available
+        return [disco.DiscoFeature(NS_X_DATA),
+                disco.DiscoFeature(NS_XML_ELEMENT),
+                disco.DiscoFeature(NS_DISCO_INFO)]
+
+    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
+        return []
+
+
+class SatFallbackHandler(generic.FallbackHandler):
+    def __init__(self, host):
+        generic.FallbackHandler.__init__(self)
+
+    def iqFallback(self, iq):
+        if iq.handled is True:
+            return
+        log.debug("iqFallback: xml = [%s]" % (iq.toXml()))
+        generic.FallbackHandler.iqFallback(self, iq)
+
+
+class SatVersionHandler(generic.VersionHandler):
+
+    def getDiscoInfo(self, requestor, target, node):
+        # XXX: We need to work around wokkel's behaviour (namespace not added if
+        #      there is a node) as it causes issues with XEP-0115 & PEP (XEP-0163):
+        #      there is a node when the server asks for disco info, and not when we
+        #      generate the key, so the hash is used with different disco features,
+        #      and when the server (seen on ejabberd) generates its own hash for
+        #      security checks, it rejects our features (resulting in e.g. no
+        #      notifications on PEP)
+        return generic.VersionHandler.getDiscoInfo(self, requestor, target, None)
+
+
+@implementer(iwokkel.IDisco)
+class SatIdentityHandler(XMPPHandler):
+    """Manage disco Identity of SàT."""
+    # TODO: dynamic identity update (see docstring). Note that an XMPP entity can have
+    #       several identities.
+
+    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
+        return self.parent.identities
+
+    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
+        return []