Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_xep_0373.py @ 4346:62746042e6d9
plugin gre encrypter: implement GRE Encrypter: OpenPGP:
rel 455
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 13 Jan 2025 01:23:22 +0100 |
parents | 111dce64dcb5 |
children |
comparison
equal
deleted
inserted
replaced
4345:07e87adb2f65 | 4346:62746042e6d9 |
---|---|
18 | 18 |
19 from abc import ABC, abstractmethod | 19 from abc import ABC, abstractmethod |
20 import base64 | 20 import base64 |
21 from datetime import datetime, timezone | 21 from datetime import datetime, timezone |
22 import enum | 22 import enum |
23 import json | |
24 import secrets | 23 import secrets |
25 import string | 24 import string |
26 from typing import Any, Dict, Iterable, List, Literal, Optional, Set, Tuple, cast | 25 from typing import Any, Dict, Iterable, List, Literal, Optional, Set, Tuple, cast |
27 from xml.sax.saxutils import quoteattr | 26 from xml.sax.saxutils import quoteattr |
28 | 27 |
72 "GPGProviderError", | 71 "GPGProviderError", |
73 "GPGPublicKey", | 72 "GPGPublicKey", |
74 "GPGSecretKey", | 73 "GPGSecretKey", |
75 "GPGProvider", | 74 "GPGProvider", |
76 "PublicKeyMetadata", | 75 "PublicKeyMetadata", |
77 "gpg_provider", | |
78 "TrustLevel", | 76 "TrustLevel", |
79 ] | 77 ] |
80 | 78 |
81 | 79 |
82 log = cast(Logger, getLogger(__name__)) # type: ignore[no-untyped-call] | 80 log = cast(Logger, getLogger(__name__)) # type: ignore[no-untyped-call] |
99 | 97 |
100 | 98 |
101 PARAM_CATEGORY = "Security" | 99 PARAM_CATEGORY = "Security" |
102 PARAM_NAME = "ox_policy" | 100 PARAM_NAME = "ox_policy" |
103 STR_KEY_PUBLIC_KEYS_METADATA = "/public-keys-metadata/{}" | 101 STR_KEY_PUBLIC_KEYS_METADATA = "/public-keys-metadata/{}" |
102 | |
103 | |
104 def crc24(data: bytes) -> int: | |
105 """Compute the CRC-24 checksum for the given data. | |
106 | |
107 @param data: The binary data to compute the checksum for. | |
108 @return: The 24-bit CRC checksum. | |
109 """ | |
110 crc = 0xB704CE | |
111 for byte in data: | |
112 crc ^= byte << 16 | |
113 for _ in range(8): | |
114 crc <<= 1 | |
115 if crc & 0x1000000: | |
116 crc ^= 0x1864CFB | |
117 return crc & 0xFFFFFF | |
118 | |
119 | |
120 def binary_to_ascii_armor( | |
121 data: bytes, | |
122 armor_type: str = "MESSAGE", | |
123 headers: dict|None = None, | |
124 line_length: int = 76 | |
125 ) -> str: | |
126 """Convert binary data to OpenPGP ASCII Armor format. | |
127 | |
128 @param data: The binary data to encode. | |
129 @param armor_type: The type of armor (e.g., "MESSAGE", "PUBLIC KEY BLOCK"). | |
130 @param headers: Optional dictionary of headers to include in the armor. | |
131 @param line_length: Maximum length of each line. | |
132 @return: The ASCII Armor encoded string. | |
133 """ | |
134 encoded_data = base64.b64encode(data).decode('ascii') | |
135 | |
136 encoded_lines = [ | |
137 encoded_data[i:i+line_length] for i in range(0, len(encoded_data), line_length) | |
138 ] | |
139 | |
140 checksum = crc24(data) | |
141 checksum_str = base64.b64encode(checksum.to_bytes(3, 'big')).decode('ascii') | |
142 | |
143 header_lines = [] | |
144 if headers: | |
145 for key, value in headers.items(): | |
146 header_lines.append(f"{key}: {value}") | |
147 | |
148 armor = [] | |
149 armor.append(f"-----BEGIN PGP {armor_type}-----") | |
150 armor.extend(header_lines) | |
151 armor.append("") | |
152 armor.extend(encoded_lines) | |
153 armor.append(f"={checksum_str}") | |
154 armor.append(f"-----END PGP {armor_type}-----") | |
155 | |
156 return '\n'.join(armor) | |
104 | 157 |
105 | 158 |
106 class VerificationError(Exception): | 159 class VerificationError(Exception): |
107 """ | 160 """ |
108 Raised by verifying methods of :class:`XEP_0373` on semantical verification errors. | 161 Raised by verifying methods of :class:`XEP_0373` on semantical verification errors. |
686 | 739 |
687 signers.append(secret_key.public_key.key_obj) | 740 signers.append(secret_key.public_key.key_obj) |
688 | 741 |
689 sign = signing_keys is not None | 742 sign = signing_keys is not None |
690 | 743 |
691 with gpg.Context(home_dir=self.__home_dir, signers=signers) as c: | 744 kwargs = {"home_dir": self.__home_dir, "signers": signers} |
745 | |
746 with gpg.Context(**kwargs) as c: | |
692 try: | 747 try: |
693 ciphertext, __, __ = c.encrypt( | 748 ciphertext, __, __ = c.encrypt( |
694 plaintext, | 749 plaintext, |
695 recipients=recipients, | 750 recipients=recipients, |
696 sign=sign, | 751 sign=sign, |
995 </individual> | 1050 </individual> |
996 </params> | 1051 </params> |
997 """ | 1052 """ |
998 | 1053 |
999 | 1054 |
1000 def get_gpg_provider(sat: LiberviaBackend, client: SatXMPPClient) -> GPGProvider: | 1055 def get_gpg_provider(sat: LiberviaBackend, client: SatXMPPEntity) -> GPGProvider: |
1001 """Get the GPG provider for a client. | 1056 """Get the GPG provider for a client. |
1002 | 1057 |
1003 @param sat: The SAT instance. | 1058 @param sat: The SAT instance. |
1004 @param client: The client. | 1059 @param client: The client. |
1005 @return: The GPG provider specifically for that client. | 1060 @return: The GPG provider specifically for that client. |