# HG changeset patch # User Goffi # Date 1736727802 -3600 # Node ID 62746042e6d9c5dff8c48d2f22365c0e43ea9752 # Parent 07e87adb2f65552f864f890e94183d8ce18a41ee plugin gre encrypter: implement GRE Encrypter: OpenPGP: rel 455 diff -r 07e87adb2f65 -r 62746042e6d9 libervia/backend/plugins/plugin_sec_gre_encrypter_openpgp.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/backend/plugins/plugin_sec_gre_encrypter_openpgp.py Mon Jan 13 01:23:22 2025 +0100 @@ -0,0 +1,108 @@ +#!/usr/bin/env python3 + +# Libervia plugin +# Copyright (C) 2009-2025 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import base64 +from typing import Final, TYPE_CHECKING, cast + +from twisted.words.protocols.jabber import jid +from twisted.words.protocols.jabber.xmlstream import XMPPHandler +from twisted.words.xish import domish +from wokkel import data_form, disco, iwokkel +from zope.interface import implementer + +from libervia.backend.core import exceptions +from libervia.backend.core.constants import Const as C +from libervia.backend.core.core_types import SatXMPPEntity +from libervia.backend.core.i18n import _ +from libervia.backend.core.log import getLogger +from libervia.backend.plugins import plugin_xep_0373 +from .plugin_exp_gre import Encrypter + +if TYPE_CHECKING: + from libervia.backend.core.main import LiberviaBackend + +log = getLogger(__name__) + + +PLUGIN_INFO = { + C.PI_NAME: "GRE Encrypter: OpenPGP", + C.PI_IMPORT_NAME: "GRE-OpenPGP", + C.PI_TYPE: "XEP", + C.PI_MODES: C.PLUG_MODE_BOTH, + C.PI_PROTOCOLS: [], + C.PI_DEPENDENCIES: [ + "GRE", + ], + C.PI_RECOMMENDATIONS: [], + C.PI_MAIN: "GREEncrypterOpenPGP", + C.PI_HANDLER: "yes", + C.PI_DESCRIPTION: _("Handle MIME formatting for Gateway Relayed Encryption."), +} + +NS_GRE_OPENPGP: Final = "urn:xmpp:gre:encrypter:openpgp:0" + + +class GREEncrypterOpenPGP(Encrypter): + name = "openpgp" + namespace = NS_GRE_OPENPGP + + def __init__(self, host: "LiberviaBackend") -> None: + log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization") + super().__init__(host) + host.register_namespace("gre-openpgp", NS_GRE_OPENPGP) + + def get_handler(self, client: SatXMPPEntity) -> XMPPHandler: + return GREMIMEHandler(self) + + async def encrypt( + self, + client: SatXMPPEntity, + recipient_id: str, + message_elt: domish.Element, + formatted_payload: bytes, + encryption_data_form: data_form.Form, + ) -> str: + gpg_provider = plugin_xep_0373.get_gpg_provider(self.host, client) + public_keys = gpg_provider.list_public_keys(recipient_id) + if not public_keys: + raise exceptions.NotFound( + f"No public keys found for {recipient_id!r}, we can't encrypt." + ) + encrypted_data = gpg_provider.encrypt( + formatted_payload, public_keys + ) + return base64.b64encode(encrypted_data).decode("ASCII") + + +@implementer(iwokkel.IDisco) +class GREMIMEHandler(XMPPHandler): + + def __init__(self, plugin_parent): + self.plugin_parent = plugin_parent + + def getDiscoInfo( + self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = "" + ) -> list[disco.DiscoFeature]: + return [ + disco.DiscoFeature(NS_GRE_OPENPGP), + ] + + def getDiscoItems( + self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = "" + ) -> list[disco.DiscoItems]: + return [] diff -r 07e87adb2f65 -r 62746042e6d9 libervia/backend/plugins/plugin_xep_0373.py --- a/libervia/backend/plugins/plugin_xep_0373.py Mon Jan 13 01:23:22 2025 +0100 +++ b/libervia/backend/plugins/plugin_xep_0373.py Mon Jan 13 01:23:22 2025 +0100 @@ -20,7 +20,6 @@ import base64 from datetime import datetime, timezone import enum -import json import secrets import string from typing import Any, Dict, Iterable, List, Literal, Optional, Set, Tuple, cast @@ -74,7 +73,6 @@ "GPGSecretKey", "GPGProvider", "PublicKeyMetadata", - "gpg_provider", "TrustLevel", ] @@ -103,6 +101,61 @@ STR_KEY_PUBLIC_KEYS_METADATA = "/public-keys-metadata/{}" +def crc24(data: bytes) -> int: + """Compute the CRC-24 checksum for the given data. + + @param data: The binary data to compute the checksum for. + @return: The 24-bit CRC checksum. + """ + crc = 0xB704CE + for byte in data: + crc ^= byte << 16 + for _ in range(8): + crc <<= 1 + if crc & 0x1000000: + crc ^= 0x1864CFB + return crc & 0xFFFFFF + + +def binary_to_ascii_armor( + data: bytes, + armor_type: str = "MESSAGE", + headers: dict|None = None, + line_length: int = 76 +) -> str: + """Convert binary data to OpenPGP ASCII Armor format. + + @param data: The binary data to encode. + @param armor_type: The type of armor (e.g., "MESSAGE", "PUBLIC KEY BLOCK"). + @param headers: Optional dictionary of headers to include in the armor. + @param line_length: Maximum length of each line. + @return: The ASCII Armor encoded string. + """ + encoded_data = base64.b64encode(data).decode('ascii') + + encoded_lines = [ + encoded_data[i:i+line_length] for i in range(0, len(encoded_data), line_length) + ] + + checksum = crc24(data) + checksum_str = base64.b64encode(checksum.to_bytes(3, 'big')).decode('ascii') + + header_lines = [] + if headers: + for key, value in headers.items(): + header_lines.append(f"{key}: {value}") + + armor = [] + armor.append(f"-----BEGIN PGP {armor_type}-----") + armor.extend(header_lines) + armor.append("") + armor.extend(encoded_lines) + armor.append(f"={checksum_str}") + armor.append(f"-----END PGP {armor_type}-----") + + return '\n'.join(armor) + + class VerificationError(Exception): """ Raised by verifying methods of :class:`XEP_0373` on semantical verification errors. @@ -688,7 +741,9 @@ sign = signing_keys is not None - with gpg.Context(home_dir=self.__home_dir, signers=signers) as c: + kwargs = {"home_dir": self.__home_dir, "signers": signers} + + with gpg.Context(**kwargs) as c: try: ciphertext, __, __ = c.encrypt( plaintext, @@ -997,7 +1052,7 @@ """ -def get_gpg_provider(sat: LiberviaBackend, client: SatXMPPClient) -> GPGProvider: +def get_gpg_provider(sat: LiberviaBackend, client: SatXMPPEntity) -> GPGProvider: """Get the GPG provider for a client. @param sat: The SAT instance.