comparison libervia/backend/plugins/plugin_xep_0448.py @ 4334:111dce64dcb5

plugins XEP-0300, XEP-0446, XEP-0447, XEP-0448 and others: Refactoring to use Pydantic: Pydantic models are used more and more in Libervia, for the bridge API, and also to convert `domish.Element` to internal representation. Type hints have also been added in many places. rel 453
author Goffi <goffi@goffi.org>
date Tue, 03 Dec 2024 00:12:38 +0100
parents 0d7bb4df2343
children
comparison
equal deleted inserted replaced
4333:e94799a0908f 4334:111dce64dcb5
19 import base64 19 import base64
20 from functools import partial 20 from functools import partial
21 from pathlib import Path 21 from pathlib import Path
22 import secrets 22 import secrets
23 from textwrap import dedent 23 from textwrap import dedent
24 from typing import Any, Dict, Optional, Tuple, Union 24 from typing import Any, Dict, Optional, Self, Tuple, Union, cast
25 25
26 from cryptography.exceptions import AlreadyFinalized 26 from cryptography.exceptions import AlreadyFinalized
27 from cryptography.hazmat import backends 27 from cryptography.hazmat import backends
28 from cryptography.hazmat.primitives import ciphers 28 from cryptography.hazmat.primitives import ciphers
29 from cryptography.hazmat.primitives.ciphers import CipherContext, modes 29 from cryptography.hazmat.primitives.ciphers import CipherContext, modes
30 from cryptography.hazmat.primitives.padding import PKCS7, PaddingContext 30 from cryptography.hazmat.primitives.padding import PKCS7, PaddingContext
31 from pydantic import BaseModel, ValidationError
31 import treq 32 import treq
32 from twisted.internet import defer 33 from twisted.internet import defer
33 from twisted.words.protocols.jabber.xmlstream import XMPPHandler 34 from twisted.words.protocols.jabber.xmlstream import XMPPHandler
34 from twisted.words.xish import domish 35 from twisted.words.xish import domish
35 from wokkel import disco, iwokkel 36 from wokkel import disco, iwokkel
38 from libervia.backend.core import exceptions 39 from libervia.backend.core import exceptions
39 from libervia.backend.core.constants import Const as C 40 from libervia.backend.core.constants import Const as C
40 from libervia.backend.core.core_types import SatXMPPEntity 41 from libervia.backend.core.core_types import SatXMPPEntity
41 from libervia.backend.core.i18n import _ 42 from libervia.backend.core.i18n import _
42 from libervia.backend.core.log import getLogger 43 from libervia.backend.core.log import getLogger
44 from libervia.backend.plugins.plugin_misc_download import DownloadPlugin
45 from libervia.backend.plugins.plugin_xep_0103 import XEP_0103
46 from libervia.backend.plugins.plugin_xep_0300 import NS_HASHES, XEP_0300, Hash
47 from libervia.backend.plugins.plugin_xep_0447 import XEP_0447, Source
43 from libervia.backend.tools import stream 48 from libervia.backend.tools import stream
44 from libervia.backend.tools.web import treq_client_no_ssl 49 from libervia.backend.tools.web import treq_client_no_ssl
45 50
46 log = getLogger(__name__) 51 log = getLogger(__name__)
47 52
77 NS_AES_128_GCM = "urn:xmpp:ciphers:aes-128-gcm-nopadding:0" 82 NS_AES_128_GCM = "urn:xmpp:ciphers:aes-128-gcm-nopadding:0"
78 NS_AES_256_GCM = "urn:xmpp:ciphers:aes-256-gcm-nopadding:0" 83 NS_AES_256_GCM = "urn:xmpp:ciphers:aes-256-gcm-nopadding:0"
79 NS_AES_256_CBC = "urn:xmpp:ciphers:aes-256-cbc-pkcs7:0" 84 NS_AES_256_CBC = "urn:xmpp:ciphers:aes-256-cbc-pkcs7:0"
80 85
81 86
class EncryptedSource(Source):
    """Source model for an ``<encrypted/>`` element (``NS_ESFS`` namespace).

    It holds the cipher identifier, the key and IV text found in the element, the
    hashes attached to the element and the nested sources.
    """

    type = "encrypted"
    encrypted = True
    cipher: str
    key: str
    iv: str
    hashes: list[Hash]
    sources: list[Source]
    # Plugin instances used by the model. They must be assigned on the class before
    # ``from_element`` or ``to_element`` is called: both methods assert on them.
    _hash: XEP_0300 | None = None
    _sfs: XEP_0447 | None = None

    @classmethod
    def from_element(cls, element: domish.Element) -> Self:
        """Parse an <encrypted> element and return the corresponding model

        @param element: <encrypted> element to parse
        @return: model built from the element
        @raise exceptions.DataError: the element is invalid: the "cipher" attribute,
            the <key> or the <iv> child is missing, no source is found, or one of the
            sources is itself an encrypted source

        """
        assert cls._hash is not None, "_hash attribute is not set"
        assert cls._sfs is not None, "_sfs attribute is not set"
        try:
            cipher = element["cipher"]
            key = str(next(element.elements(NS_ESFS, "key")))
            iv = str(next(element.elements(NS_ESFS, "iv")))
        except (KeyError, StopIteration) as e:
            # The message must be an f-string built from ``element``: there is no
            # ``encrypted_elt`` name in this scope.
            raise exceptions.DataError(
                f"invalid <encrypted/> element: {element.toXml()}"
            ) from e
        sources = cls._sfs.parse_sources_elt(element)
        if not sources:
            raise exceptions.DataError(f"Sources are missing in {element.toXml()}")

        # An encrypted source must not be nested in another encrypted source.
        if any(isinstance(source, cls) for source in sources):
            raise exceptions.DataError(
                "EncryptedData is used as a source of another EncryptedData"
            )

        encrypted_data = {
            "cipher": cipher,
            "key": key,
            "iv": iv,
            "hashes": Hash.from_parent(element),
            "sources": sources,
        }

        return cls(**encrypted_data)

    def to_element(self) -> domish.Element:
        """Convert this model to an <encrypted> element

        The element gets the "cipher" attribute, the <key> and <iv> children, and one
        child per hash.

        @return: domish.Element representing the encrypted data

        """
        assert self._hash is not None, "_hash attribute is not set"
        encrypted_elt = domish.Element((NS_ESFS, "encrypted"))
        encrypted_elt["cipher"] = self.cipher
        encrypted_elt.addElement("key").addContent(self.key)
        encrypted_elt.addElement("iv").addContent(self.iv)
        for hash_ in self.hashes:
            encrypted_elt.addChild(hash_.to_element())
        # NOTE(review): ``self.sources`` is not serialised here, so the result cannot
        # be parsed back by ``from_element`` (which requires sources) unless the
        # caller adds them itself -- TODO confirm this is intended.

        return encrypted_elt
150
151
82 class XEP_0448: 152 class XEP_0448:
83 153
def __init__(self, host):
    """Set up the plugin: keep plugin references and register its handlers.

    @param host: backend host; its ``plugins`` mapping, ``register_namespace`` and
        ``trigger`` are used here
    """
    self.host = host
    log.info(_("XEP_0448 plugin initialization"))
    host.register_namespace("esfs", NS_ESFS)

    plugins = host.plugins
    self._u = cast(XEP_0103, plugins["XEP-0103"])
    hash_plugin = cast(XEP_0300, plugins["XEP-0300"])
    self._h = hash_plugin
    self._hints = plugins["XEP-0334"]
    self._http_upload = plugins["XEP-0363"]
    self._o = plugins["XEP-0384"]

    # <encrypted/> elements are parsed through the EncryptedSource model.
    sfs_plugin = cast(XEP_0447, plugins["XEP-0447"])
    self._sfs = sfs_plugin
    sfs_plugin.register_source(NS_ESFS, "encrypted", EncryptedSource)

    attach_plugin = plugins["ATTACH"]
    self._attach = attach_plugin
    attach_plugin.register(
        self.can_handle_attachment, self.attach, encrypted=True, priority=1000
    )

    # EncryptedSource asserts that these class attributes are set before use.
    EncryptedSource._hash = hash_plugin
    EncryptedSource._sfs = sfs_plugin

    download_plugin = cast(DownloadPlugin, plugins["DOWNLOAD"])
    download_plugin.register_download_handler(NS_ESFS, self.download)

    for trigger_name, callback in (
        ("XEP-0363_upload_pre_slot", self._upload_pre_slot),
        ("XEP-0363_upload", self._upload_trigger),
    ):
        host.trigger.add(trigger_name, callback)
104 175
def get_handler(self, client):
    """Return a new XEP0448Handler instance.

    @param client: not used by this method
    @return: a freshly created XEP0448Handler
    """
    handler = XEP0448Handler()
    return handler
107
108 def parse_encrypted_elt(self, encrypted_elt: domish.Element) -> Dict[str, Any]:
109 """Parse an <encrypted> element and return corresponding source data
110
111 @param encrypted_elt: element to parse
112 @raise exceptions.DataError: the element is invalid
113
114 """
115 sources = self._sfs.parse_sources_elt(encrypted_elt)
116 if not sources:
117 raise exceptions.NotFound("sources are missing in {encrypted_elt.toXml()}")
118 if len(sources) > 1:
119 log.debug(
120 "more that one sources has been found, this is not expected, only the "
121 "first one will be used"
122 )
123 source = sources[0]
124 source["type"] = NS_ESFS
125 try:
126 encrypted_data = source["encrypted_data"] = {
127 "cipher": encrypted_elt["cipher"],
128 "key": str(next(encrypted_elt.elements(NS_ESFS, "key"))),
129 "iv": str(next(encrypted_elt.elements(NS_ESFS, "iv"))),
130 }
131 except (KeyError, StopIteration):
132 raise exceptions.DataError(
133 "invalid <encrypted/> element: {encrypted_elt.toXml()}"
134 )
135 try:
136 hash_algo, hash_value = self._h.parse_hash_elt(encrypted_elt)
137 except exceptions.NotFound:
138 pass
139 else:
140 encrypted_data["hash_algo"] = hash_algo
141 encrypted_data["hash"] = base64.b64encode(hash_value.encode()).decode()
142 return source
143 178
144 async def download( 179 async def download(
145 self, 180 self,
146 client: SatXMPPEntity, 181 client: SatXMPPEntity,
147 attachment: Dict[str, Any], 182 attachment: dict[str, Any],
148 source: Dict[str, Any], 183 source: dict[str, Any],
149 dest_path: Union[Path, str], 184 dest_path: Union[Path, str],
150 extra: Optional[Dict[str, Any]] = None, 185 extra: dict[str, Any] | None = None,
151 ) -> Tuple[str, defer.Deferred]: 186 ) -> tuple[str, defer.Deferred]:
152 # TODO: check hash 187 # TODO: check hash
153 if extra is None: 188 if extra is None:
154 extra = {} 189 extra = {}
190 assert source["type"] == "encrypted"
155 try: 191 try:
156 encrypted_data = source["encrypted_data"] 192 cipher = source["cipher"]
157 cipher = encrypted_data["cipher"] 193 iv = base64.b64decode(source["iv"])
158 iv = base64.b64decode(encrypted_data["iv"]) 194 key = base64.b64decode(source["key"])
159 key = base64.b64decode(encrypted_data["key"])
160 except KeyError as e: 195 except KeyError as e:
161 raise ValueError(f"{source} has incomplete encryption data: {e}") 196 raise ValueError(f"{source} has incomplete encryption data: {e}") from e
197
162 try: 198 try:
163 download_url = source["url"] 199 download_url = source["sources"][0]["url"]
164 except KeyError: 200 except (IndexError, KeyError) as e:
165 raise ValueError(f"{source} has missing URL") 201 raise ValueError(f"{source} has missing URL") from e
166 202
167 if extra.get("ignore_tls_errors", False): 203 if extra.get("ignore_tls_errors", False):
168 log.warning("TLS certificate check disabled, this is highly insecure") 204 log.warning("TLS certificate check disabled, this is highly insecure")
169 treq_client = treq_client_no_ssl 205 treq_client = treq_client_no_ssl
170 else: 206 else:
292 [], 328 [],
293 name=attachment["filename"], 329 name=attachment["filename"],
294 size=attachment["size"], 330 size=attachment["size"],
295 file_hash=file_hash, 331 file_hash=file_hash,
296 ) 332 )
297 encrypted_elt = file_sharing_elt.sources.addElement( 333 sources_elt = file_sharing_elt.sources
298 (NS_ESFS, "encrypted") 334 assert sources_elt is not None
299 ) 335 encrypted_elt = sources_elt.addElement((NS_ESFS, "encrypted"))
300 encrypted_elt["cipher"] = NS_AES_256_GCM 336 encrypted_elt["cipher"] = NS_AES_256_GCM
301 encrypted_elt.addElement( 337 encrypted_elt.addElement(
302 "key", content=base64.b64encode(encryption_data["key"]).decode() 338 "key", content=base64.b64encode(encryption_data["key"]).decode()
303 ) 339 )
304 encrypted_elt.addElement( 340 encrypted_elt.addElement(
309 attachment["encrypted_hash"], attachment["encrypted_hash_algo"] 345 attachment["encrypted_hash"], attachment["encrypted_hash_algo"]
310 ) 346 )
311 ) 347 )
312 encrypted_elt.addChild( 348 encrypted_elt.addChild(
313 self._sfs.get_sources_elt( 349 self._sfs.get_sources_elt(
314 [self._u.get_url_data_elt(attachment["url"])] 350 [self._u.generate_url_data(attachment["url"]).to_element()]
315 ) 351 )
316 ) 352 )
317 data["xml"].addChild(file_sharing_elt) 353 data["xml"].addChild(file_sharing_elt)
318 354
319 for attachment in extra_attachments: 355 for attachment in extra_attachments: