changeset 3969:8e7d5796fb23

plugin XEP-0391: implement XEP-0391 (Jingle Encrypted Transports) + XEP-0396 (JET-OMEMO): rel 378
author Goffi <goffi@goffi.org>
date Mon, 31 Oct 2022 04:09:34 +0100
parents 0dd79c6cc1d2
children 4c3361e2bf55
files sat/plugins/plugin_xep_0166.py sat/plugins/plugin_xep_0234.py sat/plugins/plugin_xep_0384.py sat/plugins/plugin_xep_0391.py
diffstat 4 files changed, 383 insertions(+), 36 deletions(-) [+]
line wrap: on
line diff
--- a/sat/plugins/plugin_xep_0166.py	Mon Oct 31 04:04:32 2022 +0100
+++ b/sat/plugins/plugin_xep_0166.py	Mon Oct 31 04:09:34 2022 +0100
@@ -17,21 +17,25 @@
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
 
-import uuid
+from collections import namedtuple
 import time
-from typing import Tuple
-from collections import namedtuple
-from zope.interface import implementer
-from twisted.words.protocols.jabber import jid
+from typing import Any, Dict, Tuple
+import uuid
+
 from twisted.internet import defer
 from twisted.internet import reactor
+from twisted.python import failure
+from twisted.words.protocols.jabber import jid
 from twisted.words.protocols.jabber import error
 from twisted.words.protocols.jabber import xmlstream
-from twisted.python import failure
+from twisted.words.xish import domish
 from wokkel import disco, iwokkel
+from zope.interface import implementer
+
 from sat.core import exceptions
-from sat.core.i18n import _, D_
 from sat.core.constants import Const as C
+from sat.core.core_types import SatXMPPEntity
+from sat.core.i18n import D_, _
 from sat.core.log import getLogger
 from sat.tools import xml_tools
 from sat.tools import utils
@@ -65,7 +69,8 @@
 TransportData = namedtuple("TransportData", ("namespace", "handler", "priority"))
 
 
-class XEP_0166(object):
+class XEP_0166:
+    namespace = NS_JINGLE
     ROLE_INITIATOR = "initiator"
     ROLE_RESPONDER = "responder"
     TRANSPORT_DATAGRAM = "UDP"
@@ -372,7 +377,7 @@
             content_name = content["name"] = str(uuid.uuid4())
         return application, app_args, app_kwargs, content_name
 
-    async def initiate(self, client, peer_jid, contents):
+    async def initiate(self, client, peer_jid, contents, encrypted=False):
         """Send a session initiation request
 
         @param peer_jid(jid.JID): jid to establith session with
@@ -387,6 +392,8 @@
                     default to BOTH (see XEP-0166 §7.3)
                 - app_args(list): args to pass to the application plugin
                 - app_kwargs(dict): keyword args to pass to the application plugin
+        @param encrypted: if True, session must be encrypted and "encryption" must be set
+            to all content data of session
         @return (unicode): jingle session id
         """
         assert contents  # there must be at least one content
@@ -471,6 +478,18 @@
             )
             content_elt.addChild(transport_elt)
 
+        if not await self.host.trigger.asyncPoint(
+            "XEP-0166_initiate_elt_built",
+            client, session, iq_elt, jingle_elt
+        ):
+            return
+        if encrypted:
+            for content in session["contents"].values():
+                if "encryption" not in content:
+                    raise exceptions.EncryptionError(
+                        "Encryption is requested, but no encryption has been set"
+                    )
+
         try:
             await iq_elt.send()
         except Exception as e:
@@ -514,7 +533,14 @@
 
     ## jingle events ##
 
-    def _onJingleRequest(self, request, client):
+    def _on_jingle_request(self, request: domish.Element, client: SatXMPPEntity) -> None:
+        defer.ensureDeferred(self.on_jingle_request(client, request))
+
+    async def on_jingle_request(
+        self,
+        client: SatXMPPEntity,
+        request: domish.Element
+    ) -> None:
         """Called when any jingle request is received
 
         The request will then be dispatched to appropriate method
@@ -594,7 +620,7 @@
                 raise exceptions.InternalError
 
         if action == XEP_0166.A_SESSION_INITIATE:
-            self.onSessionInitiate(client, request, jingle_elt, session)
+            await self.onSessionInitiate(client, request, jingle_elt, session)
         elif action == XEP_0166.A_SESSION_TERMINATE:
             self.onSessionTerminate(client, request, jingle_elt, session)
         elif action == XEP_0166.A_SESSION_ACCEPT:
@@ -796,7 +822,13 @@
 
         return defers_list
 
-    def onSessionInitiate(self, client, request, jingle_elt, session):
+    async def onSessionInitiate(
+        self,
+        client: SatXMPPEntity,
+        request: domish.Element,
+        jingle_elt: domish.Element,
+        session: Dict[str, Any]
+    ) -> None:
         """Called on session-initiate action
 
         The "jingleRequestConfirmation" method of each application will be called
@@ -829,6 +861,13 @@
         # at this point we can send the <iq/> result to confirm reception of the request
         client.send(xmlstream.toResponse(request, "result"))
 
+
+        if not await self.host.trigger.asyncPoint(
+            "XEP-0166_on_session_initiate",
+            client, session, request, jingle_elt
+        ):
+            return
+
         # we now request each application plugin confirmation
         # and if all are accepted, we can accept the session
         confirm_defers = self._callPlugins(
@@ -1199,7 +1238,7 @@
 
     def connectionInitialized(self):
         self.xmlstream.addObserver(
-            JINGLE_REQUEST, self.plugin_parent._onJingleRequest, client=self.parent
+            JINGLE_REQUEST, self.plugin_parent._on_jingle_request, client=self.parent
         )
 
     def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
--- a/sat/plugins/plugin_xep_0234.py	Mon Oct 31 04:04:32 2022 +0100
+++ b/sat/plugins/plugin_xep_0234.py	Mon Oct 31 04:09:34 2022 +0100
@@ -16,22 +16,24 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
-import os.path
+from collections import namedtuple
 import mimetypes
-from collections import namedtuple
-from zope.interface import implementer
-from twisted.words.xish import domish
-from twisted.words.protocols.jabber import jid
-from twisted.python import failure
-from twisted.words.protocols.jabber.xmlstream import XMPPHandler
+import os.path
+
 from twisted.internet import defer
 from twisted.internet import reactor
 from twisted.internet import error as internet_error
+from twisted.python import failure
+from twisted.words.protocols.jabber import jid
+from twisted.words.protocols.jabber.xmlstream import XMPPHandler
+from twisted.words.xish import domish
 from wokkel import disco, iwokkel
-from sat.core.i18n import _, D_
+from zope.interface import implementer
+
+from sat.core import exceptions
 from sat.core.constants import Const as C
+from sat.core.i18n import D_, _
 from sat.core.log import getLogger
-from sat.core import exceptions
 from sat.tools import utils
 from sat.tools import stream
 from sat.tools.common import date_utils
@@ -329,6 +331,7 @@
             extra = {}
         if file_desc is not None:
             extra["file_desc"] = file_desc
+        encrypted = extra.pop("encrypted", False)
         await self._j.initiate(
             client,
             peer_jid,
@@ -344,6 +347,7 @@
                     },
                 }
             ],
+            encrypted = encrypted
         )
         return await progress_id_d
 
@@ -490,7 +494,7 @@
             )
         else:
             # we receive the file
-            return await self._fileReceivingRequestConf(
+            return await self._file_receiving_request_conf(
                 client, session, content_data, content_name, file_data, file_elt
             )
 
@@ -524,7 +528,7 @@
         log.warning(_("File continue is not implemented yet"))
         return False
 
-    async def _fileReceivingRequestConf(
+    async def _file_receiving_request_conf(
         self, client, session, content_data, content_name, file_data, file_elt
     ):
         """parse file_elt, and handle user permission/file opening"""
@@ -564,14 +568,17 @@
             client, session["peer_jid"], content_data, file_data, stream_object=True
         )
         if confirmed:
+            await self.host.trigger.asyncPoint(
+                "XEP-0234_file_receiving_request_conf",
+                client, session, content_data, file_elt
+            )
             args = [client, session, content_name, content_data]
             finished_d.addCallbacks(
                 self._finishedCb, self._finishedEb, args, None, args
             )
         return confirmed
 
-    @defer.inlineCallbacks
-    def jingleHandler(self, client, action, session, content_name, desc_elt):
+    async def jingleHandler(self, client, action, session, content_name, desc_elt):
         content_data = session["contents"][content_name]
         application_data = content_data["application_data"]
         if action in (self._j.A_ACCEPTED_ACK,):
@@ -617,7 +624,7 @@
                     self.host.bridge.progressError(
                         progress_id, C.PROGRESS_ERROR_FAILED, client.profile
                     )
-                    yield self._j.terminate(
+                    await self._j.terminate(
                         client, self._j.REASON_FAILED_APPLICATION, session)
                     raise e
             else:
@@ -637,9 +644,13 @@
             finished_d = content_data["finished_d"] = defer.Deferred()
             args = [client, session, content_name, content_data]
             finished_d.addCallbacks(self._finishedCb, self._finishedEb, args, None, args)
+            await self.host.trigger.asyncPoint(
+                "XEP-0234_jingle_handler",
+                client, session, content_data, desc_elt
+            )
         else:
             log.warning("FIXME: unmanaged action {}".format(action))
-        defer.returnValue(desc_elt)
+        return desc_elt
 
     def jingleSessionInfo(self, client, action, session, content_name, jingle_elt):
         """Called on session-info action
--- a/sat/plugins/plugin_xep_0384.py	Mon Oct 31 04:04:32 2022 +0100
+++ b/sat/plugins/plugin_xep_0384.py	Mon Oct 31 04:09:34 2022 +0100
@@ -1630,7 +1630,9 @@
         @param client: The client.
         """
 
-        await self.__prepare_for_profile(cast(str, client.profile))
+        await self.get_session_manager(
+            cast(str, client.profile)
+        )
 
     async def cmd_omemo_reset(
         self,
@@ -1665,7 +1667,7 @@
 
         bare_jid = mess_data["to"].userhost()
 
-        session_manager = await self.__prepare_for_profile(client.profile)
+        session_manager = await self.get_session_manager(client.profile)
         devices = await session_manager.get_device_information(bare_jid)
 
         for device in devices:
@@ -1701,7 +1703,7 @@
         else:
             bare_jids = { entity.userhost() }
 
-        session_manager = await self.__prepare_for_profile(client.profile)
+        session_manager = await self.get_session_manager(client.profile)
 
         # At least sort the devices by bare JID such that they aren't listed completely
         # random
@@ -1896,7 +1898,7 @@
 
         return bare_jids
 
-    async def __prepare_for_profile(self, profile: str) -> omemo.SessionManager:
+    async def get_session_manager(self, profile: str) -> omemo.SessionManager:
         """
         @param profile: The profile to prepare for.
         @return: A session manager instance for this profile. Creates a new instance if
@@ -2198,7 +2200,7 @@
         ))
 
         try:
-            session_manager = await self.__prepare_for_profile(cast(str, client.profile))
+            session_manager = await self.get_session_manager(cast(str, client.profile))
         except Exception as e:
             log.error(f"error while preparing profile for {client.profile}: {e}")
             # we don't want to block the workflow
@@ -2265,7 +2267,7 @@
             device_information, __ = await session_manager.get_own_device_information()
         else:
             try:
-                plaintext, device_information = await session_manager.decrypt(message)
+                plaintext, device_information, __ = await session_manager.decrypt(message)
             except omemo.MessageNotForUs:
                 # The difference between this being a debug or a warning is whether there
                 # is a body included in the message. Without a body, we can assume that
@@ -2584,7 +2586,7 @@
 
         log.debug(f"Plaintext to encrypt: {plaintext}")
 
-        session_manager = await self.__prepare_for_profile(client.profile)
+        session_manager = await self.get_session_manager(client.profile)
 
         try:
             messages, encryption_errors = await session_manager.encrypt(
@@ -2702,7 +2704,7 @@
             )
             return
 
-        session_manager = await self.__prepare_for_profile(profile)
+        session_manager = await self.get_session_manager(profile)
 
         await session_manager.update_device_list(
             namespace,
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sat/plugins/plugin_xep_0391.py	Mon Oct 31 04:09:34 2022 +0100
@@ -0,0 +1,295 @@
+#!/usr/bin/env python3
+
+# Libervia plugin for Jingle Encrypted Transports
+# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from base64 import b64encode
+from functools import partial
+import io
+from typing import Any, Callable, Dict, List, Optional, Tuple, Union
+
+from twisted.words.protocols.jabber import error, jid, xmlstream
+from twisted.words.xish import domish
+from wokkel import disco, iwokkel
+from zope.interface import implementer
+from cryptography.exceptions import AlreadyFinalized
+from cryptography.hazmat import backends
+from cryptography.hazmat.primitives import ciphers
+from cryptography.hazmat.primitives.ciphers import Cipher, CipherContext, modes
+from cryptography.hazmat.primitives.padding import PKCS7, PaddingContext
+
+from sat.core import exceptions
+from sat.core.constants import Const as C
+from sat.core.core_types import SatXMPPEntity
+from sat.core.i18n import _
+from sat.core.log import getLogger
+from sat.tools import xml_tools
+
+try:
+    import oldmemo
+    import oldmemo.etree
+except ImportError as import_error:
+    raise exceptions.MissingModule(
+        "You are missing one or more package required by the OMEMO plugin. Please"
+        " download/install the pip packages 'oldmemo'."
+    ) from import_error
+
+
+log = getLogger(__name__)
+
+IMPORT_NAME = "XEP-0391"
+
+PLUGIN_INFO = {
+    C.PI_NAME: "Jingle Encrypted Transports",
+    C.PI_IMPORT_NAME: IMPORT_NAME,
+    C.PI_TYPE: C.PLUG_TYPE_XEP,
+    C.PI_MODES: C.PLUG_MODE_BOTH,
+    C.PI_PROTOCOLS: ["XEP-0391", "XEP-0396"],
+    C.PI_DEPENDENCIES: ["XEP-0166", "XEP-0384"],
+    C.PI_MAIN: "JET",
+    C.PI_HANDLER: "yes",
+    C.PI_DESCRIPTION: _("""End-to-end encryption of Jingle transports"""),
+}
+
+NS_JET = "urn:xmpp:jingle:jet:0"
+NS_JET_OMEMO = "urn:xmpp:jingle:jet-omemo:0"
+
+
+class JET:
+    namespace = NS_JET
+
+    def __init__(self, host):
+        log.info(_("XEP-0391 (Pubsub Attachments) plugin initialization"))
+        host.registerNamespace("jet", NS_JET)
+        self.host = host
+        self._o = host.plugins["XEP-0384"]
+        self._j = host.plugins["XEP-0166"]
+        host.trigger.add(
+            "XEP-0166_initiate_elt_built",
+            self._on_initiate_elt_build
+        )
+        host.trigger.add(
+            "XEP-0166_on_session_initiate",
+            self._on_session_initiate
+        )
+        host.trigger.add(
+            "XEP-0234_jingle_handler",
+            self._add_encryption_filter
+        )
+        host.trigger.add(
+            "XEP-0234_file_receiving_request_conf",
+            self._add_encryption_filter
+        )
+
+    def getHandler(self, client):
+        return JET_Handler()
+
+    async def _on_initiate_elt_build(
+        self,
+        client: SatXMPPEntity,
+        session: Dict[str, Any],
+        iq_elt: domish.Element,
+        jingle_elt: domish.Element
+    ) -> bool:
+        if client.encryption.get_namespace(
+               session["peer_jid"].userhostJID()
+           ) != self._o.NS_OLDMEMO:
+            return True
+        for content_elt in jingle_elt.elements(self._j.namespace, "content"):
+            content_data = session["contents"][content_elt["name"]]
+            security_elt = content_elt.addElement((NS_JET, "security"))
+            security_elt["name"] = content_elt["name"]
+            # XXX: for now only OLDMEMO is supported, thus we do it directly here. If some
+            #   other are supported in the future, a plugin registering mechanism will be
+            #   implemented.
+            cipher = "urn:xmpp:ciphers:aes-128-gcm-nopadding"
+            enc_type = "eu.siacs.conversations.axolotl"
+            security_elt["cipher"] = cipher
+            security_elt["type"] = enc_type
+            encryption_data = content_data["encryption"] = {
+                "cipher": cipher,
+                "type": enc_type
+            }
+            session_manager = await self._o.get_session_manager(client.profile)
+            try:
+                messages, encryption_errors = await session_manager.encrypt(
+                    frozenset({session["peer_jid"].userhost()}),
+                    # the value seems to be the commonly used value
+                    { self._o.NS_OLDMEMO: b" " },
+                    backend_priority_order=[ self._o.NS_OLDMEMO ],
+                    identifier = client.jid.userhost()
+                )
+            except Exception as e:
+                log.error("Can't generate IV and keys: {e}")
+                raise e
+            message, plain_key_material = next(iter(messages.items()))
+            iv, key = message.content.initialization_vector, plain_key_material.key
+            content_data["encryption"].update({
+                "iv": iv,
+                "key": key
+            })
+            encrypted_elt = xml_tools.et_elt_2_domish_elt(
+                oldmemo.etree.serialize_message(message)
+            )
+            security_elt.addChild(encrypted_elt)
+        return True
+
+    async def _on_session_initiate(
+        self,
+        client: SatXMPPEntity,
+        session: Dict[str, Any],
+        iq_elt: domish.Element,
+        jingle_elt: domish.Element
+    ) -> bool:
+        if client.encryption.get_namespace(
+               session["peer_jid"].userhostJID()
+           ) != self._o.NS_OLDMEMO:
+            return True
+        for content_elt in jingle_elt.elements(self._j.namespace, "content"):
+            content_data = session["contents"][content_elt["name"]]
+            security_elt = next(content_elt.elements(NS_JET, "security"), None)
+            if security_elt is None:
+                continue
+            encrypted_elt = next(
+                security_elt.elements(self._o.NS_OLDMEMO, "encrypted"), None
+            )
+            if encrypted_elt is None:
+                log.warning(
+                    "missing <encrypted> element, can't decrypt: {security_elt.toXml()}"
+                )
+                continue
+            session_manager = await self._o.get_session_manager(client.profile)
+            try:
+                message = await oldmemo.etree.parse_message(
+                    xml_tools.domish_elt_2_et_elt(encrypted_elt, False),
+                    session["peer_jid"].userhost(),
+                    client.jid.userhost(),
+                    session_manager
+                )
+                __, __, plain_key_material = await session_manager.decrypt(message)
+            except Exception as e:
+                log.warning(f"Can't get IV and key: {e}\n{security_elt.toXml()}")
+                continue
+            try:
+                content_data["encryption"] = {
+                    "cipher": security_elt["cipher"],
+                    "type": security_elt["type"],
+                    "iv": message.content.initialization_vector,
+                    "key": plain_key_material.key
+                }
+            except KeyError as e:
+                log.warning(f"missing data, can't decrypt: {e}")
+                continue
+
+        return True
+
+    def __encrypt(
+        self,
+        data: bytes,
+        encryptor: CipherContext,
+        data_cb: Callable
+    ) -> bytes:
+        data_cb(data)
+        if data:
+            return encryptor.update(data)
+        else:
+            try:
+                return encryptor.finalize() + encryptor.tag
+            except AlreadyFinalized:
+                return b''
+
+    def __decrypt(
+        self,
+        data: bytes,
+        buffer: list[bytes],
+        decryptor: CipherContext,
+        data_cb: Callable
+    ) -> bytes:
+        buffer.append(data)
+        data = b''.join(buffer)
+        buffer.clear()
+        if len(data) > 16:
+            decrypted = decryptor.update(data[:-16])
+            data_cb(decrypted)
+        else:
+            decrypted = b''
+        buffer.append(data[-16:])
+        return decrypted
+
+    def __decrypt_finalize(
+        self,
+        file_obj: io.BytesIO,
+        buffer: list[bytes],
+        decryptor: CipherContext,
+    ) -> None:
+        tag = b''.join(buffer)
+        file_obj.write(decryptor.finalize_with_tag(tag))
+
+    async def _add_encryption_filter(
+        self,
+        client: SatXMPPEntity,
+        session: Dict[str, Any],
+        content_data: Dict[str, Any],
+        elt: domish.Element
+    ) -> bool:
+        file_obj = content_data["stream_object"].file_obj
+        try:
+            encryption_data=content_data["encryption"]
+        except KeyError:
+            return True
+        cipher = ciphers.Cipher(
+            ciphers.algorithms.AES(encryption_data["key"]),
+            modes.GCM(encryption_data["iv"]),
+            backend=backends.default_backend(),
+        )
+        if file_obj.mode == "wb":
+            # we are receiving a file
+            buffer = []
+            decryptor = cipher.decryptor()
+            file_obj.pre_close_cb = partial(
+                self.__decrypt_finalize,
+                file_obj=file_obj,
+                buffer=buffer,
+                decryptor=decryptor
+            )
+            file_obj.data_cb = partial(
+                self.__decrypt,
+                buffer=buffer,
+                decryptor=decryptor,
+                data_cb=file_obj.data_cb
+            )
+        else:
+            # we are sending a file
+            file_obj.data_cb = partial(
+                self.__encrypt,
+                encryptor=cipher.encryptor(),
+                data_cb=file_obj.data_cb
+            )
+
+        return True
+
+
+@implementer(iwokkel.IDisco)
+class JET_Handler(xmlstream.XMPPHandler):
+
+    def getDiscoInfo(self, requestor, service, nodeIdentifier=""):
+        return [
+            disco.DiscoFeature(NS_JET),
+            disco.DiscoFeature(NS_JET_OMEMO),
+        ]
+
+    def getDiscoItems(self, requestor, service, nodeIdentifier=""):
+        return []