view sat/core/patches.py @ 2689:d715d912afac

plugin XEP-0199: implementation of XMPP Ping
author Goffi <goffi@goffi.org>
date Sat, 10 Nov 2018 10:16:38 +0100
parents e9cd473a2f46
children 1ecceac3df96
line wrap: on
line source

from twisted.words.protocols.jabber import xmlstream
from twisted.internet import ssl
from wokkel import client

"""This module apply monkey patches to Twisted and Wokkel to handle certificate validation
   during XMPP connection"""


class TLSInitiatingInitializer(xmlstream.TLSInitiatingInitializer):
    check_certificate = True

    def onProceed(self, obj):
        self.xmlstream.removeObserver('/failure', self.onFailure)
        trustRoot = ssl.platformTrust() if self.check_certificate else None
        ctx = ssl.CertificateOptions(trustRoot=trustRoot)
        self.xmlstream.transport.startTLS(ctx)
        self.xmlstream.reset()
        self.xmlstream.sendHeader()
        self._deferred.callback(xmlstream.Reset)


class XMPPClient(client.XMPPClient):

    def __init__(self, jid, password, host=None, port=5222, check_certificate=True):
        self.jid = jid
        self.domain = jid.host.encode('idna')
        self.host = host
        self.port = port

        factory = HybridClientFactory(jid, password, check_certificate)

        client.StreamManager.__init__(self, factory)


def HybridClientFactory(jid, password, check_certificate=True):
    a = HybridAuthenticator(jid, password, check_certificate)

    return xmlstream.XmlStreamFactory(a)


class HybridAuthenticator(client.HybridAuthenticator):

    def __init__(self, jid, password, check_certificate):
        xmlstream.ConnectAuthenticator.__init__(self, jid.host)
        self.jid = jid
        self.password = password
        self.check_certificate = check_certificate

    def associateWithStream(self, xs):
        xmlstream.ConnectAuthenticator.associateWithStream(self, xs)

        tlsInit = xmlstream.TLSInitiatingInitializer(xs)
        tlsInit.check_certificate = self.check_certificate
        xs.initializers = [client.client.CheckVersionInitializer(xs),
                           tlsInit,
                           client.CheckAuthInitializer(xs)]


def apply():
    xmlstream.TLSInitiatingInitializer = TLSInitiatingInitializer
    client.XMPPClient = XMPPClient
    client.HybridClientFactory = HybridClientFactory
    client.HybridAuthenticator = HybridAuthenticator