view sat/core/patches.py @ 2687:e9cd473a2f46

core (xmpp): server certificate validation: XMPP server certificate is now checked, and connection is refused (by default) if it's not valid. Certificate check can be disabled in the new parameter "Configuration/check_certificate". If certificate checking is disabled, a warning note is sent on every new connection. Twisted and Wokkel are temporarily monkey patched in sat.core.tls_patches module, until modifications are merged upstream.
author Goffi <goffi@goffi.org>
date Sat, 10 Nov 2018 10:16:35 +0100
parents
children 1ecceac3df96
line wrap: on
line source

from twisted.words.protocols.jabber import xmlstream
from twisted.internet import ssl
from wokkel import client

"""This module applies monkey patches to Twisted and Wokkel to handle certificate validation
   during XMPP connection"""


class TLSInitiatingInitializer(xmlstream.TLSInitiatingInitializer):
    """TLS initializer with optional (enabled by default) certificate validation.

    Set ``check_certificate`` to False on an instance to accept any server
    certificate without validation.
    """
    # True: the server certificate must be valid for the XMPP domain.
    # False: no validation at all is performed on the certificate.
    check_certificate = True

    def onProceed(self, obj):
        """Switch the transport to TLS and restart the stream.

        @param obj: object given to the observer callback (not used here)
        """
        self.xmlstream.removeObserver('/failure', self.onFailure)
        if self.check_certificate:
            # CertificateOptions(trustRoot=...) alone only validates the
            # certificate chain: a certificate issued for any other host by a
            # trusted CA would be accepted. optionsForClientTLS validates the
            # chain against the platform trust roots AND checks that the
            # certificate matches the given hostname.
            # NOTE(review): otherEntity is expected to be the JID of the
            # server, set by the connect authenticator before TLS negotiation
            # starts -- confirm against the Twisted version in use.
            ctx = ssl.optionsForClientTLS(self.xmlstream.otherEntity.host)
        else:
            # no trust root: the certificate is not verified
            ctx = ssl.CertificateOptions(trustRoot=None)
        self.xmlstream.transport.startTLS(ctx)
        # the stream must be restarted once the transport is encrypted
        self.xmlstream.reset()
        self.xmlstream.sendHeader()
        self._deferred.callback(xmlstream.Reset)


class XMPPClient(client.XMPPClient):
    """Wokkel XMPP client whose stream factory knows about certificate checking."""

    def __init__(self, jid, password, host=None, port=5222, check_certificate=True):
        """Store connection settings and set up the stream manager.

        The parent ``client.XMPPClient.__init__`` is not called: attributes are
        set here and ``client.StreamManager`` is initialised directly, so that
        the factory can be built with the ``check_certificate`` flag.

        @param jid: JID of the account; its ``host`` gives the XMPP domain
        @param password: password handed over to the stream factory
        @param host: stored as ``self.host`` (None by default)
        @param port: stored as ``self.port``
        @param check_certificate: forwarded to HybridClientFactory
        """
        self.jid = jid
        self.domain = jid.host.encode('idna')
        self.host, self.port = host, port

        client.StreamManager.__init__(
            self, HybridClientFactory(jid, password, check_certificate))


def HybridClientFactory(jid, password, check_certificate=True):
    """Build an XmlStreamFactory driven by this module's HybridAuthenticator.

    @param jid: JID given to the authenticator
    @param password: password given to the authenticator
    @param check_certificate: certificate checking flag given to the authenticator
    @return: a new ``xmlstream.XmlStreamFactory`` wrapping the authenticator
    """
    return xmlstream.XmlStreamFactory(
        HybridAuthenticator(jid, password, check_certificate))


class HybridAuthenticator(client.HybridAuthenticator):
    """Authenticator passing the certificate checking flag to the TLS initializer."""

    def __init__(self, jid, password, check_certificate=True):
        """Initialise the authenticator.

        @param jid: JID of the account; ``jid.host`` is given to
            ``xmlstream.ConnectAuthenticator``
        @param password: password of the account
        @param check_certificate: value copied to the ``check_certificate``
            attribute of the TLS initializer (True by default, as in
            XMPPClient and HybridClientFactory)
        """
        xmlstream.ConnectAuthenticator.__init__(self, jid.host)
        self.jid = jid
        self.password = password
        self.check_certificate = check_certificate

    def associateWithStream(self, xs):
        """Associate with the stream and install the initializers.

        @param xs: XML stream to set up
        """
        xmlstream.ConnectAuthenticator.associateWithStream(self, xs)

        # Use this module's initializer explicitly: looking it up through
        # xmlstream.TLSInitiatingInitializer only gives the patched class once
        # apply() has been called; otherwise check_certificate would be set on
        # an initializer which ignores it.
        tlsInit = TLSInitiatingInitializer(xs)
        tlsInit.check_certificate = self.check_certificate
        # client.client is the ``client`` name found in wokkel.client's
        # namespace (presumably twisted.words.protocols.jabber.client)
        xs.initializers = [client.client.CheckVersionInitializer(xs),
                           tlsInit,
                           client.CheckAuthInitializer(xs)]


def apply():
    """Replace the Twisted and Wokkel names with the versions of this module."""
    replacements = (
        (xmlstream, 'TLSInitiatingInitializer', TLSInitiatingInitializer),
        (client, 'XMPPClient', XMPPClient),
        (client, 'HybridClientFactory', HybridClientFactory),
        (client, 'HybridAuthenticator', HybridAuthenticator),
    )
    for module, name, patched in replacements:
        setattr(module, name, patched)