Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_xep_0448.py @ 4334:111dce64dcb5
plugins XEP-0300, XEP-0446, XEP-0447, XEP-0448 and others: Refactoring to use Pydantic:
Pydantic models are used more and more in Libervia, for the bridge API, and also to
convert `domish.Element` to internal representation.
Type hints have also been added in many places.
rel 453
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 03 Dec 2024 00:12:38 +0100 |
parents | 0d7bb4df2343 |
children |
comparison
equal
deleted
inserted
replaced
4333:e94799a0908f | 4334:111dce64dcb5 |
---|---|
19 import base64 | 19 import base64 |
20 from functools import partial | 20 from functools import partial |
21 from pathlib import Path | 21 from pathlib import Path |
22 import secrets | 22 import secrets |
23 from textwrap import dedent | 23 from textwrap import dedent |
24 from typing import Any, Dict, Optional, Tuple, Union | 24 from typing import Any, Dict, Optional, Self, Tuple, Union, cast |
25 | 25 |
26 from cryptography.exceptions import AlreadyFinalized | 26 from cryptography.exceptions import AlreadyFinalized |
27 from cryptography.hazmat import backends | 27 from cryptography.hazmat import backends |
28 from cryptography.hazmat.primitives import ciphers | 28 from cryptography.hazmat.primitives import ciphers |
29 from cryptography.hazmat.primitives.ciphers import CipherContext, modes | 29 from cryptography.hazmat.primitives.ciphers import CipherContext, modes |
30 from cryptography.hazmat.primitives.padding import PKCS7, PaddingContext | 30 from cryptography.hazmat.primitives.padding import PKCS7, PaddingContext |
31 from pydantic import BaseModel, ValidationError | |
31 import treq | 32 import treq |
32 from twisted.internet import defer | 33 from twisted.internet import defer |
33 from twisted.words.protocols.jabber.xmlstream import XMPPHandler | 34 from twisted.words.protocols.jabber.xmlstream import XMPPHandler |
34 from twisted.words.xish import domish | 35 from twisted.words.xish import domish |
35 from wokkel import disco, iwokkel | 36 from wokkel import disco, iwokkel |
38 from libervia.backend.core import exceptions | 39 from libervia.backend.core import exceptions |
39 from libervia.backend.core.constants import Const as C | 40 from libervia.backend.core.constants import Const as C |
40 from libervia.backend.core.core_types import SatXMPPEntity | 41 from libervia.backend.core.core_types import SatXMPPEntity |
41 from libervia.backend.core.i18n import _ | 42 from libervia.backend.core.i18n import _ |
42 from libervia.backend.core.log import getLogger | 43 from libervia.backend.core.log import getLogger |
44 from libervia.backend.plugins.plugin_misc_download import DownloadPlugin | |
45 from libervia.backend.plugins.plugin_xep_0103 import XEP_0103 | |
46 from libervia.backend.plugins.plugin_xep_0300 import NS_HASHES, XEP_0300, Hash | |
47 from libervia.backend.plugins.plugin_xep_0447 import XEP_0447, Source | |
43 from libervia.backend.tools import stream | 48 from libervia.backend.tools import stream |
44 from libervia.backend.tools.web import treq_client_no_ssl | 49 from libervia.backend.tools.web import treq_client_no_ssl |
45 | 50 |
46 log = getLogger(__name__) | 51 log = getLogger(__name__) |
47 | 52 |
77 NS_AES_128_GCM = "urn:xmpp:ciphers:aes-128-gcm-nopadding:0" | 82 NS_AES_128_GCM = "urn:xmpp:ciphers:aes-128-gcm-nopadding:0" |
78 NS_AES_256_GCM = "urn:xmpp:ciphers:aes-256-gcm-nopadding:0" | 83 NS_AES_256_GCM = "urn:xmpp:ciphers:aes-256-gcm-nopadding:0" |
79 NS_AES_256_CBC = "urn:xmpp:ciphers:aes-256-cbc-pkcs7:0" | 84 NS_AES_256_CBC = "urn:xmpp:ciphers:aes-256-cbc-pkcs7:0" |
80 | 85 |
81 | 86 |
87 class EncryptedSource(Source): | |
88 type = "encrypted" | |
89 encrypted = True | |
90 cipher: str | |
91 key: str | |
92 iv: str | |
93 hashes: list[Hash] | |
94 sources: list[Source] | |
95 _hash: XEP_0300 | None = None | |
96 _sfs: XEP_0447 | None = None | |
97 | |
98 @classmethod | |
99 def from_element(cls, element: domish.Element) -> Self: | |
100 """Parse an <encrypted> element and return corresponding EncryptedData model | |
101 | |
102 @param encrypted_elt: element to parse | |
103 @raise exceptions.DataError: the element is invalid | |
104 | |
105 """ | |
106 assert cls._hash is not None, "_hash attribute is not set" | |
107 assert cls._sfs is not None, "_sfs attribute is not set" | |
108 try: | |
109 cipher = element["cipher"] | |
110 key = str(next(element.elements(NS_ESFS, "key"))) | |
111 iv = str(next(element.elements(NS_ESFS, "iv"))) | |
112 except (KeyError, StopIteration): | |
113 raise exceptions.DataError( | |
114 "invalid <encrypted/> element: {encrypted_elt.toXml()}" | |
115 ) | |
116 sources = cls._sfs.parse_sources_elt(element) | |
117 if not sources: | |
118 raise exceptions.DataError(f"Sources are missing in {element.toXml()}") | |
119 | |
120 if any(isinstance(source, cls) for source in sources): | |
121 raise exceptions.DataError( | |
122 f"EncryptedData is used as a source of another EncryptedData" | |
123 ) | |
124 | |
125 encrypted_data = { | |
126 "cipher": cipher, | |
127 "key": key, | |
128 "iv": iv, | |
129 "hashes": Hash.from_parent(element), | |
130 "sources": sources, | |
131 } | |
132 | |
133 return cls(**encrypted_data) | |
134 | |
135 def to_element(self) -> domish.Element: | |
136 """Convert EncryptedData model to an <encrypted> element | |
137 | |
138 @return: domish.Element representing the encrypted data | |
139 | |
140 """ | |
141 assert self._hash is not None, "_hash attribute is not set" | |
142 encrypted_elt = domish.Element((NS_ESFS, "encrypted")) | |
143 encrypted_elt["cipher"] = self.cipher | |
144 encrypted_elt.addElement("key").addContent(self.key) | |
145 encrypted_elt.addElement("iv").addContent(self.iv) | |
146 for hash_ in self.hashes: | |
147 encrypted_elt.addChild(hash_.to_element()) | |
148 | |
149 return encrypted_elt | |
150 | |
151 | |
82 class XEP_0448: | 152 class XEP_0448: |
83 | 153 |
84 def __init__(self, host): | 154 def __init__(self, host): |
85 self.host = host | 155 self.host = host |
86 log.info(_("XEP_0448 plugin initialization")) | 156 log.info(_("XEP_0448 plugin initialization")) |
87 host.register_namespace("esfs", NS_ESFS) | 157 host.register_namespace("esfs", NS_ESFS) |
88 self._u = host.plugins["XEP-0103"] | 158 self._u = cast(XEP_0103, host.plugins["XEP-0103"]) |
89 self._h = host.plugins["XEP-0300"] | 159 self._h = cast(XEP_0300, host.plugins["XEP-0300"]) |
90 self._hints = host.plugins["XEP-0334"] | 160 self._hints = host.plugins["XEP-0334"] |
91 self._http_upload = host.plugins["XEP-0363"] | 161 self._http_upload = host.plugins["XEP-0363"] |
92 self._o = host.plugins["XEP-0384"] | 162 self._o = host.plugins["XEP-0384"] |
93 self._sfs = host.plugins["XEP-0447"] | 163 self._sfs = cast(XEP_0447, host.plugins["XEP-0447"]) |
94 self._sfs.register_source_handler( | 164 self._sfs.register_source(NS_ESFS, "encrypted", EncryptedSource) |
95 NS_ESFS, "encrypted", self.parse_encrypted_elt, encrypted=True | |
96 ) | |
97 self._attach = host.plugins["ATTACH"] | 165 self._attach = host.plugins["ATTACH"] |
98 self._attach.register( | 166 self._attach.register( |
99 self.can_handle_attachment, self.attach, encrypted=True, priority=1000 | 167 self.can_handle_attachment, self.attach, encrypted=True, priority=1000 |
100 ) | 168 ) |
101 host.plugins["DOWNLOAD"].register_download_handler(NS_ESFS, self.download) | 169 EncryptedSource._hash = self._h |
170 EncryptedSource._sfs = self._sfs | |
171 download = cast(DownloadPlugin, host.plugins["DOWNLOAD"]) | |
172 download.register_download_handler(NS_ESFS, self.download) | |
102 host.trigger.add("XEP-0363_upload_pre_slot", self._upload_pre_slot) | 173 host.trigger.add("XEP-0363_upload_pre_slot", self._upload_pre_slot) |
103 host.trigger.add("XEP-0363_upload", self._upload_trigger) | 174 host.trigger.add("XEP-0363_upload", self._upload_trigger) |
104 | 175 |
105 def get_handler(self, client): | 176 def get_handler(self, client): |
106 return XEP0448Handler() | 177 return XEP0448Handler() |
107 | |
108 def parse_encrypted_elt(self, encrypted_elt: domish.Element) -> Dict[str, Any]: | |
109 """Parse an <encrypted> element and return corresponding source data | |
110 | |
111 @param encrypted_elt: element to parse | |
112 @raise exceptions.DataError: the element is invalid | |
113 | |
114 """ | |
115 sources = self._sfs.parse_sources_elt(encrypted_elt) | |
116 if not sources: | |
117 raise exceptions.NotFound("sources are missing in {encrypted_elt.toXml()}") | |
118 if len(sources) > 1: | |
119 log.debug( | |
120 "more that one sources has been found, this is not expected, only the " | |
121 "first one will be used" | |
122 ) | |
123 source = sources[0] | |
124 source["type"] = NS_ESFS | |
125 try: | |
126 encrypted_data = source["encrypted_data"] = { | |
127 "cipher": encrypted_elt["cipher"], | |
128 "key": str(next(encrypted_elt.elements(NS_ESFS, "key"))), | |
129 "iv": str(next(encrypted_elt.elements(NS_ESFS, "iv"))), | |
130 } | |
131 except (KeyError, StopIteration): | |
132 raise exceptions.DataError( | |
133 "invalid <encrypted/> element: {encrypted_elt.toXml()}" | |
134 ) | |
135 try: | |
136 hash_algo, hash_value = self._h.parse_hash_elt(encrypted_elt) | |
137 except exceptions.NotFound: | |
138 pass | |
139 else: | |
140 encrypted_data["hash_algo"] = hash_algo | |
141 encrypted_data["hash"] = base64.b64encode(hash_value.encode()).decode() | |
142 return source | |
143 | 178 |
144 async def download( | 179 async def download( |
145 self, | 180 self, |
146 client: SatXMPPEntity, | 181 client: SatXMPPEntity, |
147 attachment: Dict[str, Any], | 182 attachment: dict[str, Any], |
148 source: Dict[str, Any], | 183 source: dict[str, Any], |
149 dest_path: Union[Path, str], | 184 dest_path: Union[Path, str], |
150 extra: Optional[Dict[str, Any]] = None, | 185 extra: dict[str, Any] | None = None, |
151 ) -> Tuple[str, defer.Deferred]: | 186 ) -> tuple[str, defer.Deferred]: |
152 # TODO: check hash | 187 # TODO: check hash |
153 if extra is None: | 188 if extra is None: |
154 extra = {} | 189 extra = {} |
190 assert source["type"] == "encrypted" | |
155 try: | 191 try: |
156 encrypted_data = source["encrypted_data"] | 192 cipher = source["cipher"] |
157 cipher = encrypted_data["cipher"] | 193 iv = base64.b64decode(source["iv"]) |
158 iv = base64.b64decode(encrypted_data["iv"]) | 194 key = base64.b64decode(source["key"]) |
159 key = base64.b64decode(encrypted_data["key"]) | |
160 except KeyError as e: | 195 except KeyError as e: |
161 raise ValueError(f"{source} has incomplete encryption data: {e}") | 196 raise ValueError(f"{source} has incomplete encryption data: {e}") from e |
197 | |
162 try: | 198 try: |
163 download_url = source["url"] | 199 download_url = source["sources"][0]["url"] |
164 except KeyError: | 200 except (IndexError, KeyError) as e: |
165 raise ValueError(f"{source} has missing URL") | 201 raise ValueError(f"{source} has missing URL") from e |
166 | 202 |
167 if extra.get("ignore_tls_errors", False): | 203 if extra.get("ignore_tls_errors", False): |
168 log.warning("TLS certificate check disabled, this is highly insecure") | 204 log.warning("TLS certificate check disabled, this is highly insecure") |
169 treq_client = treq_client_no_ssl | 205 treq_client = treq_client_no_ssl |
170 else: | 206 else: |
292 [], | 328 [], |
293 name=attachment["filename"], | 329 name=attachment["filename"], |
294 size=attachment["size"], | 330 size=attachment["size"], |
295 file_hash=file_hash, | 331 file_hash=file_hash, |
296 ) | 332 ) |
297 encrypted_elt = file_sharing_elt.sources.addElement( | 333 sources_elt = file_sharing_elt.sources |
298 (NS_ESFS, "encrypted") | 334 assert sources_elt is not None |
299 ) | 335 encrypted_elt = sources_elt.addElement((NS_ESFS, "encrypted")) |
300 encrypted_elt["cipher"] = NS_AES_256_GCM | 336 encrypted_elt["cipher"] = NS_AES_256_GCM |
301 encrypted_elt.addElement( | 337 encrypted_elt.addElement( |
302 "key", content=base64.b64encode(encryption_data["key"]).decode() | 338 "key", content=base64.b64encode(encryption_data["key"]).decode() |
303 ) | 339 ) |
304 encrypted_elt.addElement( | 340 encrypted_elt.addElement( |
309 attachment["encrypted_hash"], attachment["encrypted_hash_algo"] | 345 attachment["encrypted_hash"], attachment["encrypted_hash_algo"] |
310 ) | 346 ) |
311 ) | 347 ) |
312 encrypted_elt.addChild( | 348 encrypted_elt.addChild( |
313 self._sfs.get_sources_elt( | 349 self._sfs.get_sources_elt( |
314 [self._u.get_url_data_elt(attachment["url"])] | 350 [self._u.generate_url_data(attachment["url"]).to_element()] |
315 ) | 351 ) |
316 ) | 352 ) |
317 data["xml"].addChild(file_sharing_elt) | 353 data["xml"].addChild(file_sharing_elt) |
318 | 354 |
319 for attachment in extra_attachments: | 355 for attachment in extra_attachments: |