# HG changeset patch # User Goffi # Date 1667185774 -3600 # Node ID 8e7d5796fb23d4f15251f7a27a7b1fa2749fd8a6 # Parent 0dd79c6cc1d2bb8ebcfa058294c872f9ac288621 plugin XEP-0391: implement XEP-0391 (Jingle Encrypted Transports) + XEP-0396 (JET-OMEMO): rel 378 diff -r 0dd79c6cc1d2 -r 8e7d5796fb23 sat/plugins/plugin_xep_0166.py --- a/sat/plugins/plugin_xep_0166.py Mon Oct 31 04:04:32 2022 +0100 +++ b/sat/plugins/plugin_xep_0166.py Mon Oct 31 04:09:34 2022 +0100 @@ -17,21 +17,25 @@ # along with this program. If not, see . -import uuid +from collections import namedtuple import time -from typing import Tuple -from collections import namedtuple -from zope.interface import implementer -from twisted.words.protocols.jabber import jid +from typing import Any, Dict, Tuple +import uuid + from twisted.internet import defer from twisted.internet import reactor +from twisted.python import failure +from twisted.words.protocols.jabber import jid from twisted.words.protocols.jabber import error from twisted.words.protocols.jabber import xmlstream -from twisted.python import failure +from twisted.words.xish import domish from wokkel import disco, iwokkel +from zope.interface import implementer + from sat.core import exceptions -from sat.core.i18n import _, D_ from sat.core.constants import Const as C +from sat.core.core_types import SatXMPPEntity +from sat.core.i18n import D_, _ from sat.core.log import getLogger from sat.tools import xml_tools from sat.tools import utils @@ -65,7 +69,8 @@ TransportData = namedtuple("TransportData", ("namespace", "handler", "priority")) -class XEP_0166(object): +class XEP_0166: + namespace = NS_JINGLE ROLE_INITIATOR = "initiator" ROLE_RESPONDER = "responder" TRANSPORT_DATAGRAM = "UDP" @@ -372,7 +377,7 @@ content_name = content["name"] = str(uuid.uuid4()) return application, app_args, app_kwargs, content_name - async def initiate(self, client, peer_jid, contents): + async def initiate(self, client, peer_jid, contents, encrypted=False): """Send a session initiation request @param peer_jid(jid.JID): jid to establith session with @@ -387,6 +392,8 @@ default to BOTH (see XEP-0166 §7.3) - app_args(list): args to pass to the application plugin - app_kwargs(dict): keyword args to pass to the application plugin + @param encrypted: if True, session must be encrypted and "encryption" must be set + to all content data of session @return (unicode): jingle session id """ assert contents # there must be at least one content @@ -471,6 +478,18 @@ ) content_elt.addChild(transport_elt) + if not await self.host.trigger.asyncPoint( + "XEP-0166_initiate_elt_built", + client, session, iq_elt, jingle_elt + ): + return + if encrypted: + for content in session["contents"].values(): + if "encryption" not in content: + raise exceptions.EncryptionError( + "Encryption is requested, but no encryption has been set" + ) + try: await iq_elt.send() except Exception as e: @@ -514,7 +533,14 @@ ## jingle events ## - def _onJingleRequest(self, request, client): + def _on_jingle_request(self, request: domish.Element, client: SatXMPPEntity) -> None: + defer.ensureDeferred(self.on_jingle_request(client, request)) + + async def on_jingle_request( + self, + client: SatXMPPEntity, + request: domish.Element + ) -> None: """Called when any jingle request is received The request will then be dispatched to appropriate method @@ -594,7 +620,7 @@ raise exceptions.InternalError if action == XEP_0166.A_SESSION_INITIATE: - self.onSessionInitiate(client, request, jingle_elt, session) + await self.onSessionInitiate(client, request, jingle_elt, session) elif action == XEP_0166.A_SESSION_TERMINATE: self.onSessionTerminate(client, request, jingle_elt, session) elif action == XEP_0166.A_SESSION_ACCEPT: @@ -796,7 +822,13 @@ return defers_list - def onSessionInitiate(self, client, request, jingle_elt, session): + async def onSessionInitiate( + self, + client: SatXMPPEntity, + request: domish.Element, + jingle_elt: domish.Element, + session: Dict[str, Any] + ) -> None: """Called on session-initiate action The "jingleRequestConfirmation" method of each application will be called @@ -829,6 +861,13 @@ # at this point we can send the result to confirm reception of the request client.send(xmlstream.toResponse(request, "result")) + + if not await self.host.trigger.asyncPoint( + "XEP-0166_on_session_initiate", + client, session, request, jingle_elt + ): + return + # we now request each application plugin confirmation # and if all are accepted, we can accept the session confirm_defers = self._callPlugins( @@ -1199,7 +1238,7 @@ def connectionInitialized(self): self.xmlstream.addObserver( - JINGLE_REQUEST, self.plugin_parent._onJingleRequest, client=self.parent + JINGLE_REQUEST, self.plugin_parent._on_jingle_request, client=self.parent ) def getDiscoInfo(self, requestor, target, nodeIdentifier=""): diff -r 0dd79c6cc1d2 -r 8e7d5796fb23 sat/plugins/plugin_xep_0234.py --- a/sat/plugins/plugin_xep_0234.py Mon Oct 31 04:04:32 2022 +0100 +++ b/sat/plugins/plugin_xep_0234.py Mon Oct 31 04:09:34 2022 +0100 @@ -16,22 +16,24 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -import os.path +from collections import namedtuple import mimetypes -from collections import namedtuple -from zope.interface import implementer -from twisted.words.xish import domish -from twisted.words.protocols.jabber import jid -from twisted.python import failure -from twisted.words.protocols.jabber.xmlstream import XMPPHandler +import os.path + from twisted.internet import defer from twisted.internet import reactor from twisted.internet import error as internet_error +from twisted.python import failure +from twisted.words.protocols.jabber import jid +from twisted.words.protocols.jabber.xmlstream import XMPPHandler +from twisted.words.xish import domish from wokkel import disco, iwokkel -from sat.core.i18n import _, D_ +from zope.interface import implementer + +from sat.core import exceptions from sat.core.constants import Const as C +from sat.core.i18n import D_, _ from sat.core.log import getLogger -from sat.core import exceptions from sat.tools import utils from sat.tools import stream from sat.tools.common import date_utils @@ -329,6 +331,7 @@ extra = {} if file_desc is not None: extra["file_desc"] = file_desc + encrypted = extra.pop("encrypted", False) await self._j.initiate( client, peer_jid, @@ -344,6 +347,7 @@ }, } ], + encrypted = encrypted ) return await progress_id_d @@ -490,7 +494,7 @@ ) else: # we receive the file - return await self._fileReceivingRequestConf( + return await self._file_receiving_request_conf( client, session, content_data, content_name, file_data, file_elt ) @@ -524,7 +528,7 @@ log.warning(_("File continue is not implemented yet")) return False - async def _fileReceivingRequestConf( + async def _file_receiving_request_conf( self, client, session, content_data, content_name, file_data, file_elt ): """parse file_elt, and handle user permission/file opening""" @@ -564,14 +568,17 @@ client, session["peer_jid"], content_data, file_data, stream_object=True ) if confirmed: + await self.host.trigger.asyncPoint( + "XEP-0234_file_receiving_request_conf", + client, session, content_data, file_elt + ) args = [client, session, content_name, content_data] finished_d.addCallbacks( self._finishedCb, self._finishedEb, args, None, args ) return confirmed - @defer.inlineCallbacks - def jingleHandler(self, client, action, session, content_name, desc_elt): + async def jingleHandler(self, client, action, session, content_name, desc_elt): content_data = session["contents"][content_name] application_data = content_data["application_data"] if action in (self._j.A_ACCEPTED_ACK,): @@ -617,7 +624,7 @@ self.host.bridge.progressError( progress_id, C.PROGRESS_ERROR_FAILED, client.profile ) - yield self._j.terminate( + await self._j.terminate( client, self._j.REASON_FAILED_APPLICATION, session) raise e else: @@ -637,9 +644,13 @@ finished_d = content_data["finished_d"] = defer.Deferred() args = [client, session, content_name, content_data] finished_d.addCallbacks(self._finishedCb, self._finishedEb, args, None, args) + await self.host.trigger.asyncPoint( + "XEP-0234_jingle_handler", + client, session, content_data, desc_elt + ) else: log.warning("FIXME: unmanaged action {}".format(action)) - defer.returnValue(desc_elt) + return desc_elt def jingleSessionInfo(self, client, action, session, content_name, jingle_elt): """Called on session-info action diff -r 0dd79c6cc1d2 -r 8e7d5796fb23 sat/plugins/plugin_xep_0384.py --- a/sat/plugins/plugin_xep_0384.py Mon Oct 31 04:04:32 2022 +0100 +++ b/sat/plugins/plugin_xep_0384.py Mon Oct 31 04:09:34 2022 +0100 @@ -1630,7 +1630,9 @@ @param client: The client. """ - await self.__prepare_for_profile(cast(str, client.profile)) + await self.get_session_manager( + cast(str, client.profile) + ) async def cmd_omemo_reset( self, @@ -1665,7 +1667,7 @@ bare_jid = mess_data["to"].userhost() - session_manager = await self.__prepare_for_profile(client.profile) + session_manager = await self.get_session_manager(client.profile) devices = await session_manager.get_device_information(bare_jid) for device in devices: @@ -1701,7 +1703,7 @@ else: bare_jids = { entity.userhost() } - session_manager = await self.__prepare_for_profile(client.profile) + session_manager = await self.get_session_manager(client.profile) # At least sort the devices by bare JID such that they aren't listed completely # random @@ -1896,7 +1898,7 @@ return bare_jids - async def __prepare_for_profile(self, profile: str) -> omemo.SessionManager: + async def get_session_manager(self, profile: str) -> omemo.SessionManager: """ @param profile: The profile to prepare for. @return: A session manager instance for this profile. Creates a new instance if @@ -2198,7 +2200,7 @@ )) try: - session_manager = await self.__prepare_for_profile(cast(str, client.profile)) + session_manager = await self.get_session_manager(cast(str, client.profile)) except Exception as e: log.error(f"error while preparing profile for {client.profile}: {e}") # we don't want to block the workflow @@ -2265,7 +2267,7 @@ device_information, __ = await session_manager.get_own_device_information() else: try: - plaintext, device_information = await session_manager.decrypt(message) + plaintext, device_information, __ = await session_manager.decrypt(message) except omemo.MessageNotForUs: # The difference between this being a debug or a warning is whether there # is a body included in the message. Without a body, we can assume that @@ -2584,7 +2586,7 @@ log.debug(f"Plaintext to encrypt: {plaintext}") - session_manager = await self.__prepare_for_profile(client.profile) + session_manager = await self.get_session_manager(client.profile) try: messages, encryption_errors = await session_manager.encrypt( @@ -2702,7 +2704,7 @@ ) return - session_manager = await self.__prepare_for_profile(profile) + session_manager = await self.get_session_manager(profile) await session_manager.update_device_list( namespace, diff -r 0dd79c6cc1d2 -r 8e7d5796fb23 sat/plugins/plugin_xep_0391.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat/plugins/plugin_xep_0391.py Mon Oct 31 04:09:34 2022 +0100 @@ -0,0 +1,295 @@ +#!/usr/bin/env python3 + +# Libervia plugin for Jingle Encrypted Transports +# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from base64 import b64encode +from functools import partial +import io +from typing import Any, Callable, Dict, List, Optional, Tuple, Union + +from twisted.words.protocols.jabber import error, jid, xmlstream +from twisted.words.xish import domish +from wokkel import disco, iwokkel +from zope.interface import implementer +from cryptography.exceptions import AlreadyFinalized +from cryptography.hazmat import backends +from cryptography.hazmat.primitives import ciphers +from cryptography.hazmat.primitives.ciphers import Cipher, CipherContext, modes +from cryptography.hazmat.primitives.padding import PKCS7, PaddingContext + +from sat.core import exceptions +from sat.core.constants import Const as C +from sat.core.core_types import SatXMPPEntity +from sat.core.i18n import _ +from sat.core.log import getLogger +from sat.tools import xml_tools + +try: + import oldmemo + import oldmemo.etree +except ImportError as import_error: + raise exceptions.MissingModule( + "You are missing one or more package required by the OMEMO plugin. Please" + " download/install the pip packages 'oldmemo'." + ) from import_error + + +log = getLogger(__name__) + +IMPORT_NAME = "XEP-0391" + +PLUGIN_INFO = { + C.PI_NAME: "Jingle Encrypted Transports", + C.PI_IMPORT_NAME: IMPORT_NAME, + C.PI_TYPE: C.PLUG_TYPE_XEP, + C.PI_MODES: C.PLUG_MODE_BOTH, + C.PI_PROTOCOLS: ["XEP-0391", "XEP-0396"], + C.PI_DEPENDENCIES: ["XEP-0166", "XEP-0384"], + C.PI_MAIN: "JET", + C.PI_HANDLER: "yes", + C.PI_DESCRIPTION: _("""End-to-end encryption of Jingle transports"""), +} + +NS_JET = "urn:xmpp:jingle:jet:0" +NS_JET_OMEMO = "urn:xmpp:jingle:jet-omemo:0" + + +class JET: + namespace = NS_JET + + def __init__(self, host): + log.info(_("XEP-0391 (Pubsub Attachments) plugin initialization")) + host.registerNamespace("jet", NS_JET) + self.host = host + self._o = host.plugins["XEP-0384"] + self._j = host.plugins["XEP-0166"] + host.trigger.add( + "XEP-0166_initiate_elt_built", + self._on_initiate_elt_build + ) + host.trigger.add( + "XEP-0166_on_session_initiate", + self._on_session_initiate + ) + host.trigger.add( + "XEP-0234_jingle_handler", + self._add_encryption_filter + ) + host.trigger.add( + "XEP-0234_file_receiving_request_conf", + self._add_encryption_filter + ) + + def getHandler(self, client): + return JET_Handler() + + async def _on_initiate_elt_build( + self, + client: SatXMPPEntity, + session: Dict[str, Any], + iq_elt: domish.Element, + jingle_elt: domish.Element + ) -> bool: + if client.encryption.get_namespace( + session["peer_jid"].userhostJID() + ) != self._o.NS_OLDMEMO: + return True + for content_elt in jingle_elt.elements(self._j.namespace, "content"): + content_data = session["contents"][content_elt["name"]] + security_elt = content_elt.addElement((NS_JET, "security")) + security_elt["name"] = content_elt["name"] + # XXX: for now only OLDMEMO is supported, thus we do it directly here. If some + # other are supported in the future, a plugin registering mechanism will be + # implemented. + cipher = "urn:xmpp:ciphers:aes-128-gcm-nopadding" + enc_type = "eu.siacs.conversations.axolotl" + security_elt["cipher"] = cipher + security_elt["type"] = enc_type + encryption_data = content_data["encryption"] = { + "cipher": cipher, + "type": enc_type + } + session_manager = await self._o.get_session_manager(client.profile) + try: + messages, encryption_errors = await session_manager.encrypt( + frozenset({session["peer_jid"].userhost()}), + # the value seems to be the commonly used value + { self._o.NS_OLDMEMO: b" " }, + backend_priority_order=[ self._o.NS_OLDMEMO ], + identifier = client.jid.userhost() + ) + except Exception as e: + log.error("Can't generate IV and keys: {e}") + raise e + message, plain_key_material = next(iter(messages.items())) + iv, key = message.content.initialization_vector, plain_key_material.key + content_data["encryption"].update({ + "iv": iv, + "key": key + }) + encrypted_elt = xml_tools.et_elt_2_domish_elt( + oldmemo.etree.serialize_message(message) + ) + security_elt.addChild(encrypted_elt) + return True + + async def _on_session_initiate( + self, + client: SatXMPPEntity, + session: Dict[str, Any], + iq_elt: domish.Element, + jingle_elt: domish.Element + ) -> bool: + if client.encryption.get_namespace( + session["peer_jid"].userhostJID() + ) != self._o.NS_OLDMEMO: + return True + for content_elt in jingle_elt.elements(self._j.namespace, "content"): + content_data = session["contents"][content_elt["name"]] + security_elt = next(content_elt.elements(NS_JET, "security"), None) + if security_elt is None: + continue + encrypted_elt = next( + security_elt.elements(self._o.NS_OLDMEMO, "encrypted"), None + ) + if encrypted_elt is None: + log.warning( + "missing element, can't decrypt: {security_elt.toXml()}" + ) + continue + session_manager = await self._o.get_session_manager(client.profile) + try: + message = await oldmemo.etree.parse_message( + xml_tools.domish_elt_2_et_elt(encrypted_elt, False), + session["peer_jid"].userhost(), + client.jid.userhost(), + session_manager + ) + __, __, plain_key_material = await session_manager.decrypt(message) + except Exception as e: + log.warning(f"Can't get IV and key: {e}\n{security_elt.toXml()}") + continue + try: + content_data["encryption"] = { + "cipher": security_elt["cipher"], + "type": security_elt["type"], + "iv": message.content.initialization_vector, + "key": plain_key_material.key + } + except KeyError as e: + log.warning(f"missing data, can't decrypt: {e}") + continue + + return True + + def __encrypt( + self, + data: bytes, + encryptor: CipherContext, + data_cb: Callable + ) -> bytes: + data_cb(data) + if data: + return encryptor.update(data) + else: + try: + return encryptor.finalize() + encryptor.tag + except AlreadyFinalized: + return b'' + + def __decrypt( + self, + data: bytes, + buffer: list[bytes], + decryptor: CipherContext, + data_cb: Callable + ) -> bytes: + buffer.append(data) + data = b''.join(buffer) + buffer.clear() + if len(data) > 16: + decrypted = decryptor.update(data[:-16]) + data_cb(decrypted) + else: + decrypted = b'' + buffer.append(data[-16:]) + return decrypted + + def __decrypt_finalize( + self, + file_obj: io.BytesIO, + buffer: list[bytes], + decryptor: CipherContext, + ) -> None: + tag = b''.join(buffer) + file_obj.write(decryptor.finalize_with_tag(tag)) + + async def _add_encryption_filter( + self, + client: SatXMPPEntity, + session: Dict[str, Any], + content_data: Dict[str, Any], + elt: domish.Element + ) -> bool: + file_obj = content_data["stream_object"].file_obj + try: + encryption_data=content_data["encryption"] + except KeyError: + return True + cipher = ciphers.Cipher( + ciphers.algorithms.AES(encryption_data["key"]), + modes.GCM(encryption_data["iv"]), + backend=backends.default_backend(), + ) + if file_obj.mode == "wb": + # we are receiving a file + buffer = [] + decryptor = cipher.decryptor() + file_obj.pre_close_cb = partial( + self.__decrypt_finalize, + file_obj=file_obj, + buffer=buffer, + decryptor=decryptor + ) + file_obj.data_cb = partial( + self.__decrypt, + buffer=buffer, + decryptor=decryptor, + data_cb=file_obj.data_cb + ) + else: + # we are sending a file + file_obj.data_cb = partial( + self.__encrypt, + encryptor=cipher.encryptor(), + data_cb=file_obj.data_cb + ) + + return True + + +@implementer(iwokkel.IDisco) +class JET_Handler(xmlstream.XMPPHandler): + + def getDiscoInfo(self, requestor, service, nodeIdentifier=""): + return [ + disco.DiscoFeature(NS_JET), + disco.DiscoFeature(NS_JET_OMEMO), + ] + + def getDiscoItems(self, requestor, service, nodeIdentifier=""): + return []