Mercurial > libervia-backend
comparison sat/plugins/plugin_xep_0420.py @ 3877:00212260f659
plugin XEP-0420: Implementation of Stanza Content Encryption:
Includes implementation of XEP-0082 (XMPP date and time profiles) and tests for both new plugins.
Everything is type checked, linted, format checked and unit tested.
Adds new dependency xmlschema.
fix 377
author | Syndace <me@syndace.dev> |
---|---|
date | Tue, 23 Aug 2022 12:04:11 +0200 |
parents | |
children | 8289ac1b34f4 |
comparison
equal
deleted
inserted
replaced
3876:e3c1f4736ab2 | 3877:00212260f659 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # Libervia plugin for Stanza Content Encryption | |
4 # Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev) | |
5 | |
6 # This program is free software: you can redistribute it and/or modify | |
7 # it under the terms of the GNU Affero General Public License as published by | |
8 # the Free Software Foundation, either version 3 of the License, or | |
9 # (at your option) any later version. | |
10 | |
11 # This program is distributed in the hope that it will be useful, | |
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 # GNU Affero General Public License for more details. | |
15 | |
16 # You should have received a copy of the GNU Affero General Public License | |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
18 | |
19 # Type-check with `mypy --strict --disable-error-code no-untyped-call` | |
20 # Lint with `pylint` | |
21 | |
22 from abc import ABC, abstractmethod | |
23 from datetime import datetime | |
24 import enum | |
25 import secrets | |
26 import string | |
27 from typing import Dict, Iterator, List, NamedTuple, Optional, Set, Tuple, Union, cast | |
28 | |
29 from lxml import etree | |
30 | |
31 from sat.core.constants import Const as C | |
32 from sat.core.i18n import D_ | |
33 from sat.core.log import Logger, getLogger | |
34 from sat.core.sat_main import SAT | |
35 from sat.tools.xml_tools import ElementParser | |
36 from sat.plugins.plugin_xep_0033 import NS_ADDRESS | |
37 from sat.plugins.plugin_xep_0082 import XEP_0082 | |
38 from sat.plugins.plugin_xep_0334 import NS_HINTS | |
39 from sat.plugins.plugin_xep_0359 import NS_SID | |
40 from sat.plugins.plugin_xep_0380 import NS_EME | |
41 from twisted.words.protocols.jabber import jid | |
42 from twisted.words.xish import domish | |
43 | |
44 | |
45 __all__ = [ # pylint: disable=unused-variable | |
46 "PLUGIN_INFO", | |
47 "NS_SCE", | |
48 "XEP_0420", | |
49 "ProfileRequirementsNotMet", | |
50 "AffixVerificationFailed", | |
51 "SCECustomAffix", | |
52 "SCEAffixPolicy", | |
53 "SCEProfile", | |
54 "SCEAffixValues" | |
55 ] | |
56 | |
57 | |
58 log = cast(Logger, getLogger(__name__)) | |
59 | |
60 | |
61 PLUGIN_INFO = { | |
62 C.PI_NAME: "SCE", | |
63 C.PI_IMPORT_NAME: "XEP-0420", | |
64 C.PI_TYPE: "SEC", | |
65 C.PI_PROTOCOLS: [ "XEP-0420" ], | |
66 C.PI_DEPENDENCIES: [ "XEP-0334", "XEP-0082" ], | |
67 C.PI_RECOMMENDATIONS: [ "XEP-0045", "XEP-0033", "XEP-0359" ], | |
68 C.PI_MAIN: "XEP_0420", | |
69 C.PI_HANDLER: "no", | |
70 C.PI_DESCRIPTION: D_("Implementation of Stanza Content Encryption"), | |
71 } | |
72 | |
73 | |
74 NS_SCE = "urn:xmpp:sce:1" | |
75 | |
76 | |
77 class ProfileRequirementsNotMet(Exception): | |
78 """ | |
79 Raised by :meth:`XEP_0420.unpack_stanza` in case the requirements formulated by the | |
80 profile are not met. | |
81 """ | |
82 | |
83 | |
84 class AffixVerificationFailed(Exception): | |
85 """ | |
86 Raised by :meth:`XEP_0420.unpack_stanza` in case of affix verification failure. | |
87 """ | |
88 | |
89 | |
90 class SCECustomAffix(ABC): | |
91 """ | |
92 Interface for custom affixes of SCE profiles. | |
93 """ | |
94 | |
95 @property | |
96 @abstractmethod | |
97 def element_name(self) -> str: | |
98 """ | |
99 @return: The name of the affix's XML element. | |
100 """ | |
101 | |
102 @property | |
103 @abstractmethod | |
104 def element_schema(self) -> str: | |
105 """ | |
106 @return: The XML schema definition of the affix element's XML structure, i.e. the | |
107 ``<xs:element/>`` schema element. This element will be referenced using | |
108 ``<xs:element ref="{element_name}"/>``. | |
109 """ | |
110 | |
111 @abstractmethod | |
112 def create(self, stanza: domish.Element) -> domish.Element: | |
113 """ | |
114 @param stanza: The stanza element which has been processed by | |
115 :meth:`XEP_0420.pack_stanza`, i.e. all encryptable children have been removed | |
116 and only the root ``<message/>`` or ``<iq/>`` and unencryptable children | |
117 remain. Do not modify. | |
118 @return: An affix element to include in the envelope. The element must have the | |
119 name :attr:`element_name` and must validate using :attr:`element_schema`. | |
120 @raise ValueError: if the affix couldn't be built. | |
121 """ | |
122 | |
123 @abstractmethod | |
124 def verify(self, stanza: domish.Element, element: domish.Element) -> None: | |
125 """ | |
126 @param stanza: The stanza element before being processed by | |
127 :meth:`XEP_0420.unpack_stanza`, i.e. all encryptable children have been | |
128 removed and only the root ``<message/>`` or ``<iq/>`` and unencryptable | |
129 children remain. Do not modify. | |
130 @param element: The affix element to verify. | |
131 @raise AffixVerificationFailed: on verification failure. | |
132 """ | |
133 | |
134 | |
135 @enum.unique | |
136 class SCEAffixPolicy(enum.Enum): | |
137 """ | |
138 Policy for the presence of an affix in an SCE envelope. | |
139 """ | |
140 | |
141 REQUIRED: str = "REQUIRED" | |
142 OPTIONAL: str = "OPTIONAL" | |
143 NOT_NEEDED: str = "NOT_NEEDED" | |
144 | |
145 | |
146 class SCEProfile(NamedTuple): | |
147 # pylint: disable=invalid-name | |
148 """ | |
149 An SCE profile, i.e. the definition which affixes are required, optional or not needed | |
150 at all by an SCE-enabled encryption protocol. | |
151 """ | |
152 | |
153 rpad_policy: SCEAffixPolicy | |
154 time_policy: SCEAffixPolicy | |
155 to_policy: SCEAffixPolicy | |
156 from_policy: SCEAffixPolicy | |
157 custom_policies: Dict[SCECustomAffix, SCEAffixPolicy] | |
158 | |
159 | |
160 class SCEAffixValues(NamedTuple): | |
161 # pylint: disable=invalid-name | |
162 """ | |
163 Structure returned by :meth:`XEP_0420.unpack_stanza` with the parsed/processes values | |
164 of all affixes included in the envelope. For custom affixes, the whole affix element | |
165 is returned. | |
166 """ | |
167 | |
168 rpad: Optional[str] | |
169 timestamp: Optional[datetime] | |
170 recipient: Optional[jid.JID] | |
171 sender: Optional[jid.JID] | |
172 custom: Dict[SCECustomAffix, domish.Element] | |
173 | |
174 | |
175 ENVELOPE_SCHEMA = """<?xml version="1.0" encoding="utf8"?> | |
176 <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" | |
177 targetNamespace="urn:xmpp:sce:1" | |
178 xmlns="urn:xmpp:sce:1"> | |
179 | |
180 <xs:element name="envelope"> | |
181 <xs:complexType> | |
182 <xs:all> | |
183 <xs:element ref="content"/> | |
184 <xs:element ref="rpad" minOccurs="0"/> | |
185 <xs:element ref="time" minOccurs="0"/> | |
186 <xs:element ref="to" minOccurs="0"/> | |
187 <xs:element ref="from" minOccurs="0"/> | |
188 {custom_affix_references} | |
189 </xs:all> | |
190 </xs:complexType> | |
191 </xs:element> | |
192 | |
193 <xs:element name="content"> | |
194 <xs:complexType> | |
195 <xs:sequence> | |
196 <xs:any minOccurs="0" maxOccurs="unbounded" processContents="skip"/> | |
197 </xs:sequence> | |
198 </xs:complexType> | |
199 </xs:element> | |
200 | |
201 <xs:element name="rpad" type="xs:string"/> | |
202 | |
203 <xs:element name="time"> | |
204 <xs:complexType> | |
205 <xs:attribute name="stamp" type="xs:dateTime"/> | |
206 </xs:complexType> | |
207 </xs:element> | |
208 | |
209 <xs:element name="to"> | |
210 <xs:complexType> | |
211 <xs:attribute name="jid" type="xs:string"/> | |
212 </xs:complexType> | |
213 </xs:element> | |
214 | |
215 <xs:element name="from"> | |
216 <xs:complexType> | |
217 <xs:attribute name="jid" type="xs:string"/> | |
218 </xs:complexType> | |
219 </xs:element> | |
220 | |
221 {custom_affix_definitions} | |
222 </xs:schema> | |
223 """ | |
224 | |
225 | |
226 class XEP_0420: # pylint: disable=invalid-name | |
227 """ | |
228 Implementation of XEP-0420: Stanza Content Encryption under namespace | |
229 ``urn:xmpp:sce:1``. | |
230 | |
231 This is a passive plugin, i.e. it doesn't hook into any triggers to process stanzas | |
232 actively, but offers API for other plugins to use. | |
233 """ | |
234 | |
235 # Set of namespaces whose elements are never allowed to be transferred in an encrypted | |
236 # envelope. | |
237 MUST_BE_PLAINTEXT_NAMESPACES: Set[str] = { | |
238 NS_HINTS, | |
239 NS_SID, # TODO: Not sure whether this ban applies to both stanza-id and origin-id | |
240 NS_ADDRESS, | |
241 # Not part of the specification (yet), but just doesn't make sense in an encrypted | |
242 # envelope: | |
243 NS_EME | |
244 } | |
245 | |
246 # Set of (namespace, element name) tuples that define elements which are never allowed | |
247 # to be transferred in an encrypted envelope. If all elements under a certain | |
248 # namespace are forbidden, the namespace can be added to | |
249 # :attr:`MUST_BE_PLAINTEXT_NAMESPACES` instead. | |
250 # Note: only full namespaces are forbidden by the spec for now, the following is for | |
251 # potential future use. | |
252 MUST_BE_PLAINTEXT_ELEMENTS: Set[Tuple[str, str]] = set() | |
253 | |
254 def __init__(self, sat: SAT) -> None: | |
255 """ | |
256 @param sat: The SAT instance. | |
257 """ | |
258 | |
259 @staticmethod | |
260 def pack_stanza(profile: SCEProfile, stanza: domish.Element) -> bytes: | |
261 """Pack a stanza according to Stanza Content Encryption. | |
262 | |
263 Removes all elements from the stanza except for a few exceptions that explicitly | |
264 need to be transferred in plaintext, e.g. because they contain hints/instructions | |
265 for the server on how to process the stanza. Together with the affix elements as | |
266 requested by the profile, the removed elements are added to an envelope XML | |
267 structure that builds the plaintext to be encrypted by the SCE-enabled encryption | |
268 scheme. Optional affixes are always added to the structure, i.e. they are treated | |
269 by the packing code as if they were required. | |
270 | |
271 Once built, the envelope structure is serialized to a byte string and returned for | |
272 the encryption scheme to encrypt and add to the stanza. | |
273 | |
274 @param profile: The SCE profile, i.e. the definition of affixes to include in the | |
275 envelope. | |
276 @param stanza: The stanza to process. Will be modified by the call. | |
277 @return: The serialized envelope structure that builds the plaintext for the | |
278 encryption scheme to process. | |
279 @raise ValueError: if the <to/> or <from/> affixes are requested but the stanza | |
280 doesn't have the "to"/"from" attribute set to extract the value from. Can also | |
281 be raised by custom affixes. | |
282 | |
283 @warning: It is up to the calling code to add a <store/> message processing hint | |
284 if applicable. | |
285 """ | |
286 | |
287 # Prepare the envelope and content elements | |
288 envelope = domish.Element((NS_SCE, "envelope")) | |
289 content = envelope.addElement((NS_SCE, "content")) | |
290 | |
291 # Note the serialized byte size of the content element before adding any children | |
292 empty_content_byte_size = len(content.toXml().encode("utf-8")) | |
293 | |
294 # Just for type safety | |
295 stanza_children = cast(List[Union[domish.Element, str]], stanza.children) | |
296 content_children = cast(List[Union[domish.Element, str]], content.children) | |
297 | |
298 # Move elements that are not explicitly forbidden from being encrypted from the | |
299 # stanza to the content element. | |
300 for child in list(cast(Iterator[domish.Element], stanza.elements())): | |
301 if ( | |
302 child.uri not in XEP_0420.MUST_BE_PLAINTEXT_NAMESPACES | |
303 and (child.uri, child.name) not in XEP_0420.MUST_BE_PLAINTEXT_ELEMENTS | |
304 ): | |
305 # Remove the child from the stanza | |
306 stanza_children.remove(child) | |
307 | |
308 # A namespace of ``None`` can be used on domish elements to inherit the | |
309 # namespace from the parent. When moving elements from the stanza root to | |
310 # the content element, however, we don't want elements to inherit the | |
311 # namespace of the content element. Thus, check for elements with ``None`` | |
312 # for their namespace and set the namespace to jabber:client, which is the | |
313 # namespace of the parent element. | |
314 if child.uri is None: | |
315 child.uri = C.NS_CLIENT | |
316 child.defaultUri = C.NS_CLIENT | |
317 | |
318 # Add the child with corrected namespaces to the content element | |
319 content_children.append(child) | |
320 | |
321 # Add the affixes requested by the profile | |
322 if profile.rpad_policy is not SCEAffixPolicy.NOT_NEEDED: | |
323 # The specification defines the rpad affix to contain "[...] a randomly | |
324 # generated sequence of random length between 0 and 200 characters." This | |
325 # implementation differs a bit from the specification in that a minimum size | |
326 # other than 0 is chosen depending on the serialized size of the content | |
327 # element. This is to prevent the scenario where the encrypted content is | |
328 # short and the rpad is also randomly chosen to be short, which could allow | |
329 # guessing the content of a short message. To do so, the rpad length is first | |
330 # chosen to pad the content to at least 53 bytes, then afterwards another 0 to | |
331 # 200 bytes are added. Note that single-byte characters are used by this | |
332 # implementation, thus the number of characters equals the number of bytes. | |
333 content_byte_size = len(content.toXml().encode("utf-8")) | |
334 content_byte_size_diff = content_byte_size - empty_content_byte_size | |
335 rpad_length = max(0, 53 - content_byte_size_diff) + secrets.randbelow(201) | |
336 rpad_content = "".join( | |
337 secrets.choice(string.digits + string.ascii_letters + string.punctuation) | |
338 for __ | |
339 in range(rpad_length) | |
340 ) | |
341 envelope.addElement((NS_SCE, "rpad"), content=rpad_content) | |
342 | |
343 if profile.time_policy is not SCEAffixPolicy.NOT_NEEDED: | |
344 time_element = envelope.addElement((NS_SCE, "time")) | |
345 time_element["stamp"] = XEP_0082.format_datetime() | |
346 | |
347 if profile.to_policy is not SCEAffixPolicy.NOT_NEEDED: | |
348 recipient = cast(Optional[str], stanza.getAttribute("to", None)) | |
349 if recipient is None: | |
350 raise ValueError( | |
351 "<to/> affix requested, but stanza doesn't have the 'to' attribute" | |
352 " set." | |
353 ) | |
354 | |
355 to_element = envelope.addElement((NS_SCE, "to")) | |
356 to_element["jid"] = jid.JID(recipient).userhost() | |
357 | |
358 if profile.from_policy is not SCEAffixPolicy.NOT_NEEDED: | |
359 sender = cast(Optional[str], stanza.getAttribute("from", None)) | |
360 if sender is None: | |
361 raise ValueError( | |
362 "<from/> affix requested, but stanza doesn't have the 'from'" | |
363 " attribute set." | |
364 ) | |
365 | |
366 from_element = envelope.addElement((NS_SCE, "from")) | |
367 from_element["jid"] = jid.JID(sender).userhost() | |
368 | |
369 for affix, policy in profile.custom_policies.items(): | |
370 if policy is not SCEAffixPolicy.NOT_NEEDED: | |
371 envelope.addChild(affix.create(stanza)) | |
372 | |
373 return cast(str, envelope.toXml()).encode("utf-8") | |
374 | |
375 @staticmethod | |
376 def unpack_stanza( | |
377 profile: SCEProfile, | |
378 stanza: domish.Element, | |
379 envelope_serialized: bytes | |
380 ) -> SCEAffixValues: | |
381 """Unpack a stanza packed according to Stanza Content Encryption. | |
382 | |
383 Parses the serialized envelope as XML, verifies included affixes and makes sure | |
384 the requirements of the profile are met, and restores the stanza by moving | |
385 decrypted elements from the envelope back to the stanza top level. | |
386 | |
387 @param profile: The SCE profile, i.e. the definition of affixes that have to/may | |
388 be included in the envelope. | |
389 @param stanza: The stanza to process. Will be modified by the call. | |
390 @param envelope_serialized: The serialized envelope, i.e. the plaintext produced | |
391 by the decryption scheme utilizing SCE. | |
392 @return: The parsed and processed values of all affixes that were present on the | |
393 envelope, notably including the timestamp. | |
394 @raise ValueError: if the serialized envelope element is malformed. | |
395 @raise ProfileRequirementsNotMet: if one or more affixes required by the profile | |
396 are missing from the envelope. | |
397 @raise AffixVerificationFailed: if an affix included in the envelope fails to | |
398 validate. It doesn't matter whether the affix is required by the profile or | |
399 not, all affixes included in the envelope are validated and cause this | |
400 exception to be raised on failure. | |
401 | |
402 @warning: It is up to the calling code to verify the timestamp, if returned, since | |
403 the requirements on the timestamp may vary between SCE-enabled protocols. | |
404 """ | |
405 | |
406 try: | |
407 envelope_serialized_string = envelope_serialized.decode("utf-8") | |
408 except UnicodeError as e: | |
409 raise ValueError("Serialized envelope can't bare parsed as utf-8.") from e | |
410 | |
411 custom_affixes = set(profile.custom_policies.keys()) | |
412 | |
413 # Make sure the envelope adheres to the schema | |
414 parser = etree.XMLParser(schema=etree.XMLSchema(etree.XML(ENVELOPE_SCHEMA.format( | |
415 custom_affix_references="".join( | |
416 f'<xs:element ref="{custom_affix.element_name}" minOccurs="0"/>' | |
417 for custom_affix | |
418 in custom_affixes | |
419 ), | |
420 custom_affix_definitions="".join( | |
421 custom_affix.element_schema | |
422 for custom_affix | |
423 in custom_affixes | |
424 ) | |
425 ).encode("utf-8")))) | |
426 | |
427 try: | |
428 etree.fromstring(envelope_serialized_string, parser) | |
429 except etree.XMLSyntaxError as e: | |
430 raise ValueError("Serialized envelope doesn't pass schema validation.") from e | |
431 | |
432 # Prepare the envelope and content elements | |
433 envelope = cast(domish.Element, ElementParser()(envelope_serialized_string)) | |
434 content = cast(domish.Element, next(envelope.elements(NS_SCE, "content"))) | |
435 | |
436 # Verify the affixes | |
437 rpad_element = cast( | |
438 Optional[domish.Element], | |
439 next(envelope.elements(NS_SCE, "rpad"), None) | |
440 ) | |
441 time_element = cast( | |
442 Optional[domish.Element], | |
443 next(envelope.elements(NS_SCE, "time"), None) | |
444 ) | |
445 to_element = cast( | |
446 Optional[domish.Element], | |
447 next(envelope.elements(NS_SCE, "to"), None) | |
448 ) | |
449 from_element = cast( | |
450 Optional[domish.Element], | |
451 next(envelope.elements(NS_SCE, "from"), None) | |
452 ) | |
453 | |
454 # The rpad doesn't need verification. | |
455 rpad_value = None if rpad_element is None else str(rpad_element) | |
456 | |
457 # The time affix isn't verified other than that the timestamp is parseable. | |
458 try: | |
459 timestamp_value = None if time_element is None else \ | |
460 XEP_0082.parse_datetime(time_element["stamp"]) | |
461 except ValueError as e: | |
462 raise AffixVerificationFailed("Malformed time affix") from e | |
463 | |
464 # The to affix is verified by comparing the to attribute of the stanza with the | |
465 # JID referenced by the affix. Note that only bare JIDs are compared as per the | |
466 # specification. | |
467 recipient_value: Optional[jid.JID] = None | |
468 if to_element is not None: | |
469 recipient_value = jid.JID(to_element["jid"]) | |
470 | |
471 recipient_actual = cast(Optional[str], stanza.getAttribute("to", None)) | |
472 if recipient_actual is None: | |
473 raise AffixVerificationFailed( | |
474 "'To' affix is included in the envelope, but the stanza is lacking a" | |
475 " 'to' attribute to compare the value to." | |
476 ) | |
477 | |
478 recipient_actual_bare_jid = jid.JID(recipient_actual).userhost() | |
479 recipient_target_bare_jid = recipient_value.userhost() | |
480 | |
481 if recipient_actual_bare_jid != recipient_target_bare_jid: | |
482 raise AffixVerificationFailed( | |
483 f"Mismatch between actual and target recipient bare JIDs:" | |
484 f" {recipient_actual_bare_jid} vs {recipient_target_bare_jid}." | |
485 ) | |
486 | |
487 # The from affix is verified by comparing the from attribute of the stanza with | |
488 # the JID referenced by the affix. Note that only bare JIDs are compared as per | |
489 # the specification. | |
490 sender_value: Optional[jid.JID] = None | |
491 if from_element is not None: | |
492 sender_value = jid.JID(from_element["jid"]) | |
493 | |
494 sender_actual = cast(Optional[str], stanza.getAttribute("from", None)) | |
495 if sender_actual is None: | |
496 raise AffixVerificationFailed( | |
497 "'From' affix is included in the envelope, but the stanza is lacking" | |
498 " a 'from' attribute to compare the value to." | |
499 ) | |
500 | |
501 sender_actual_bare_jid = jid.JID(sender_actual).userhost() | |
502 sender_target_bare_jid = sender_value.userhost() | |
503 | |
504 if sender_actual_bare_jid != sender_target_bare_jid: | |
505 raise AffixVerificationFailed( | |
506 f"Mismatch between actual and target sender bare JIDs:" | |
507 f" {sender_actual_bare_jid} vs {sender_target_bare_jid}." | |
508 ) | |
509 | |
510 # Find and verify custom affixes | |
511 custom_values: Dict[SCECustomAffix, domish.Element] = {} | |
512 for affix in custom_affixes: | |
513 element_name = affix.element_name | |
514 element = cast( | |
515 Optional[domish.Element], | |
516 next(envelope.elements(NS_SCE, element_name), None) | |
517 ) | |
518 if element is not None: | |
519 affix.verify(stanza, element) | |
520 custom_values[affix] = element | |
521 | |
522 # Check whether all affixes required by the profile are present | |
523 rpad_missing = \ | |
524 profile.rpad_policy is SCEAffixPolicy.REQUIRED and rpad_element is None | |
525 time_missing = \ | |
526 profile.time_policy is SCEAffixPolicy.REQUIRED and time_element is None | |
527 to_missing = \ | |
528 profile.to_policy is SCEAffixPolicy.REQUIRED and to_element is None | |
529 from_missing = \ | |
530 profile.from_policy is SCEAffixPolicy.REQUIRED and from_element is None | |
531 custom_missing = any( | |
532 affix not in custom_values | |
533 for affix, policy | |
534 in profile.custom_policies.items() | |
535 if policy is SCEAffixPolicy.REQUIRED | |
536 ) | |
537 | |
538 if rpad_missing or time_missing or to_missing or from_missing or custom_missing: | |
539 custom_missing_string = "" | |
540 for custom_affix in custom_affixes: | |
541 value = "present" if custom_affix in custom_values else "missing" | |
542 custom_missing_string += f", [custom]{custom_affix.element_name}={value}" | |
543 | |
544 raise ProfileRequirementsNotMet( | |
545 f"SCE envelope is missing affixes required by the profile {profile}." | |
546 f" Affix presence:" | |
547 f" rpad={'missing' if rpad_missing else 'present'}" | |
548 f", time={'missing' if time_missing else 'present'}" | |
549 f", to={'missing' if to_missing else 'present'}" | |
550 f", from={'missing' if from_missing else 'present'}" | |
551 + custom_missing_string | |
552 ) | |
553 | |
554 # Just for type safety | |
555 content_children = cast(List[Union[domish.Element, str]], content.children) | |
556 stanza_children = cast(List[Union[domish.Element, str]], stanza.children) | |
557 | |
558 # Move elements that are not explicitly forbidden from being encrypted from the | |
559 # content element to the stanza. | |
560 for child in list(cast(Iterator[domish.Element], content.elements())): | |
561 if ( | |
562 child.uri in XEP_0420.MUST_BE_PLAINTEXT_NAMESPACES | |
563 or (child.uri, child.name) in XEP_0420.MUST_BE_PLAINTEXT_ELEMENTS | |
564 ): | |
565 log.warning( | |
566 f"An element that MUST be transferred in plaintext was found in an" | |
567 f" SCE envelope: {child.toXml()}" | |
568 ) | |
569 else: | |
570 # Remove the child from the content element | |
571 content_children.remove(child) | |
572 | |
573 # Add the child to the stanza | |
574 stanza_children.append(child) | |
575 | |
576 return SCEAffixValues( | |
577 rpad_value, | |
578 timestamp_value, | |
579 recipient_value, | |
580 sender_value, | |
581 custom_values | |
582 ) |