comparison libervia/backend/plugins/plugin_xep_0420.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_xep_0420.py@cecf45416403
children 040095a5dc7f
comparison
equal deleted inserted replaced
4070:d10748475025 4071:4b842c1fb686
1 #!/usr/bin/env python3
2
3 # Libervia plugin for Stanza Content Encryption
4 # Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev)
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 from abc import ABC, abstractmethod
20 from datetime import datetime
21 import enum
22 import secrets
23 import string
24 from typing import Dict, NamedTuple, Optional, Set, Tuple, cast
25 from typing_extensions import Final
26
27 from lxml import etree
28 from libervia.backend.core import exceptions
29
30 from libervia.backend.core.constants import Const as C
31 from libervia.backend.core.i18n import D_
32 from libervia.backend.core.log import Logger, getLogger
33 from libervia.backend.core.sat_main import SAT
34 from libervia.backend.tools.xml_tools import ElementParser
35 from libervia.backend.plugins.plugin_xep_0033 import NS_ADDRESS
36 from libervia.backend.plugins.plugin_xep_0082 import XEP_0082
37 from libervia.backend.plugins.plugin_xep_0334 import NS_HINTS
38 from libervia.backend.plugins.plugin_xep_0359 import NS_SID
39 from libervia.backend.plugins.plugin_xep_0380 import NS_EME
40 from twisted.words.protocols.jabber import jid
41 from twisted.words.xish import domish
42
43
# Public API of this module. The pylint directive silences the "unused variable" warning
# that would otherwise be emitted for ``__all__`` itself.
__all__ = [  # pylint: disable=unused-variable
    "PLUGIN_INFO",
    "NS_SCE",
    "XEP_0420",
    "ProfileRequirementsNotMet",
    "AffixVerificationFailed",
    "SCECustomAffix",
    "SCEAffixPolicy",
    "SCEProfile",
    "SCEAffixValues"
]


# Module-level logger named after this module. The cast/ignore pair exists because
# ``getLogger`` is untyped as far as the type checker is concerned.
log = cast(Logger, getLogger(__name__))  # type: ignore[no-untyped-call]


# Plugin metadata, keyed by the ``PI_*`` constants of the backend.
# NOTE(review): presumably consumed by the backend's plugin loader - confirm there how
# dependencies vs. recommendations are treated.
PLUGIN_INFO = {
    C.PI_NAME: "SCE",
    C.PI_IMPORT_NAME: "XEP-0420",
    C.PI_TYPE: "SEC",
    C.PI_PROTOCOLS: [ "XEP-0420" ],
    C.PI_DEPENDENCIES: [ "XEP-0334", "XEP-0082" ],
    C.PI_RECOMMENDATIONS: [ "XEP-0045", "XEP-0033", "XEP-0359" ],
    C.PI_MAIN: "XEP_0420",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: D_("Implementation of Stanza Content Encryption"),
}


# XML namespace of the SCE envelope and of all affix elements built/parsed below.
NS_SCE: Final = "urn:xmpp:sce:1"
74
75
class ProfileRequirementsNotMet(Exception):
    """
    Signals that an SCE envelope lacks at least one affix which the SCE profile marks as
    required. Raised by :meth:`XEP_0420.unpack_stanza`.
    """
81
82
class AffixVerificationFailed(Exception):
    """
    Signals that an affix found in an SCE envelope did not pass verification. Raised by
    :meth:`XEP_0420.unpack_stanza`.
    """
87
88
class SCECustomAffix(ABC):
    """
    Abstract interface to be implemented by affixes which an SCE profile defines in
    addition to the standard ones (rpad, time, to, from).
    """

    @property
    @abstractmethod
    def element_name(self) -> str:
        """
        @return: The tag name of the XML element that represents this affix.
        """

    @property
    @abstractmethod
    def element_schema(self) -> str:
        """
        @return: The ``<xs:element/>`` XML schema definition that describes the structure
            of the affix element. The envelope schema refers to it via
            ``<xs:element ref="{element_name}"/>``.
        """

    @abstractmethod
    def create(self, stanza: domish.Element) -> domish.Element:
        """
        Build the affix element for an outgoing stanza.

        @param stanza: The stanza after :meth:`XEP_0420.pack_stanza` has moved all
            encryptable children out of it, i.e. only the root ``<message/>`` or
            ``<iq/>`` and the children that must stay in plaintext remain. Must not be
            modified.
        @return: The affix element to add to the envelope. It has to be named
            :attr:`element_name` and has to validate against :attr:`element_schema`.
        @raise ValueError: if the stanza lacks information required to build the affix.
        """

    @abstractmethod
    def verify(self, stanza: domish.Element, element: domish.Element) -> None:
        """
        Verify the affix element found in an incoming envelope.

        @param stanza: The stanza as handed to :meth:`XEP_0420.unpack_stanza`, before the
            decrypted children are moved back into it, i.e. only the root ``<message/>``
            or ``<iq/>`` and the plaintext children are present. Must not be modified.
        @param element: The affix element taken from the envelope.
        @raise AffixVerificationFailed: if the affix does not verify.
        """
123 @abstractmethod
124 def verify(self, stanza: domish.Element, element: domish.Element) -> None:
125 """
126 @param stanza: The stanza element before being processed by
127 :meth:`XEP_0420.unpack_stanza`, i.e. all encryptable children have been
128 removed and only the root ``<message/>`` or ``<iq/>`` and unencryptable
129 children remain. Do not modify.
130 @param element: The affix element to verify.
131 @raise AffixVerificationFailed: on verification failure.
132 """
133
134
@enum.unique
class SCEAffixPolicy(enum.Enum):
    """
    Policy for the presence of an affix in an SCE envelope.

    - ``REQUIRED``: the affix is added when packing and must be present when unpacking.
    - ``OPTIONAL``: the affix is added when packing (if possible) and may be absent when
      unpacking.
    - ``NOT_NEEDED``: the affix is not added when packing.
    """

    # Members are deliberately left unannotated: annotating them as ``str`` would declare
    # the wrong type, since each member is an ``SCEAffixPolicy`` instance.
    REQUIRED = "REQUIRED"
    OPTIONAL = "OPTIONAL"
    NOT_NEEDED = "NOT_NEEDED"
144
145
class SCEProfile(NamedTuple):
    # pylint: disable=invalid-name
    """
    Description of the affix requirements of an SCE-enabled encryption protocol: for each
    standard affix and for each custom affix, whether it is required, optional or not
    needed at all.
    """

    # Policy for the random padding affix (<rpad/>).
    rpad_policy: SCEAffixPolicy
    # Policy for the timestamp affix (<time/>).
    time_policy: SCEAffixPolicy
    # Policy for the recipient affix (<to/>).
    to_policy: SCEAffixPolicy
    # Policy for the sender affix (<from/>).
    from_policy: SCEAffixPolicy
    # Policies of the custom affixes, keyed by the affix implementation.
    custom_policies: Dict[SCECustomAffix, SCEAffixPolicy]
158
159
class SCEAffixValues(NamedTuple):
    # pylint: disable=invalid-name
    """
    Result of :meth:`XEP_0420.unpack_stanza`: the parsed/processed values of all affixes
    that were included in the envelope. Standard affixes that were absent are ``None``.
    Custom affixes are returned as their whole affix element.
    """

    # Text content of the <rpad/> affix.
    rpad: Optional[str]
    # Parsed ``stamp`` attribute of the <time/> affix.
    timestamp: Optional[datetime]
    # JID taken from the ``jid`` attribute of the <to/> affix.
    recipient: Optional[jid.JID]
    # JID taken from the ``jid`` attribute of the <from/> affix.
    sender: Optional[jid.JID]
    # Elements of the custom affixes found in the envelope, keyed by affix implementation.
    custom: Dict[SCECustomAffix, domish.Element]
173
174
# XML schema template used to validate decrypted SCE envelopes. It is a ``str.format``
# template with two placeholders:
#   - ``custom_affix_references``: ``<xs:element ref="..."/>`` entries for the custom
#     affixes of the profile, inserted into the ``<xs:all>`` group of the envelope.
#   - ``custom_affix_definitions``: the ``<xs:element/>`` definitions of those affixes.
# Note that the ``stamp`` and ``jid`` attributes are not declared with ``use="required"``,
# i.e. affix elements lacking these attributes pass schema validation.
ENVELOPE_SCHEMA = """<?xml version="1.0" encoding="utf8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
    targetNamespace="urn:xmpp:sce:1"
    xmlns="urn:xmpp:sce:1">

    <xs:element name="envelope">
        <xs:complexType>
            <xs:all>
                <xs:element ref="content"/>
                <xs:element ref="rpad" minOccurs="0"/>
                <xs:element ref="time" minOccurs="0"/>
                <xs:element ref="to" minOccurs="0"/>
                <xs:element ref="from" minOccurs="0"/>
                {custom_affix_references}
            </xs:all>
        </xs:complexType>
    </xs:element>

    <xs:element name="content">
        <xs:complexType>
            <xs:sequence>
                <xs:any minOccurs="0" maxOccurs="unbounded" processContents="skip"/>
            </xs:sequence>
        </xs:complexType>
    </xs:element>

    <xs:element name="rpad" type="xs:string"/>

    <xs:element name="time">
        <xs:complexType>
            <xs:attribute name="stamp" type="xs:dateTime"/>
        </xs:complexType>
    </xs:element>

    <xs:element name="to">
        <xs:complexType>
            <xs:attribute name="jid" type="xs:string"/>
        </xs:complexType>
    </xs:element>

    <xs:element name="from">
        <xs:complexType>
            <xs:attribute name="jid" type="xs:string"/>
        </xs:complexType>
    </xs:element>

    {custom_affix_definitions}
</xs:schema>
"""
224
225
class XEP_0420:  # pylint: disable=invalid-name
    """
    Implementation of XEP-0420: Stanza Content Encryption under namespace
    ``urn:xmpp:sce:1``.

    This is a passive plugin, i.e. it doesn't hook into any triggers to process stanzas
    actively, but offers API for other plugins to use.
    """

    # Set of namespaces whose elements are never allowed to be transferred in an encrypted
    # envelope.
    MUST_BE_PLAINTEXT_NAMESPACES: Set[str] = {
        NS_HINTS,
        NS_SID,  # TODO: Not sure whether this ban applies to both stanza-id and origin-id
        NS_ADDRESS,
        # Not part of the specification (yet), but just doesn't make sense in an encrypted
        # envelope:
        NS_EME
    }

    # Set of (namespace, element name) tuples that define elements which are never allowed
    # to be transferred in an encrypted envelope. If all elements under a certain
    # namespace are forbidden, the namespace can be added to
    # :attr:`MUST_BE_PLAINTEXT_NAMESPACES` instead.
    # Note: only full namespaces are forbidden by the spec for now, the following is for
    # potential future use.
    MUST_BE_PLAINTEXT_ELEMENTS: Set[Tuple[str, str]] = set()

    def __init__(self, sat: SAT) -> None:
        """
        @param sat: The SAT instance. It is not stored, the API of this plugin consists
            of static methods only.
        """

    @staticmethod
    def pack_stanza(profile: SCEProfile, stanza: domish.Element) -> bytes:
        """Pack a stanza according to Stanza Content Encryption.

        Removes all elements from the stanza except for a few exceptions that explicitly
        need to be transferred in plaintext, e.g. because they contain hints/instructions
        for the server on how to process the stanza. Together with the affix elements as
        requested by the profile, the removed elements are added to an envelope XML
        structure that builds the plaintext to be encrypted by the SCE-enabled encryption
        scheme. Optional affixes are always added to the structure, i.e. they are treated
        by the packing code as if they were required. The only exception are optional
        <to/> and <from/> affixes, which are silently omitted if the stanza lacks the
        corresponding attribute.

        Once built, the envelope structure is serialized to a byte string and returned for
        the encryption scheme to encrypt and add to the stanza.

        @param profile: The SCE profile, i.e. the definition of affixes to include in the
            envelope.
        @param stanza: The stanza to process. Will be modified by the call.
        @return: The serialized envelope structure that builds the plaintext for the
            encryption scheme to process.
        @raise ValueError: if the <to/> or <from/> affixes are required but the stanza
            doesn't have the "to"/"from" attribute set to extract the value from. Can also
            be raised by custom affixes.

        @warning: It is up to the calling code to add a <store/> message processing hint
            if applicable.
        """

        # Prepare the envelope and content elements
        envelope = domish.Element((NS_SCE, "envelope"))
        content = envelope.addElement((NS_SCE, "content"))

        # Note the serialized byte size of the content element before adding any children
        empty_content_byte_size = len(content.toXml().encode("utf-8"))

        # Move elements that are not explicitly forbidden from being encrypted from the
        # stanza to the content element. Iterate over a copy, since the list of children
        # is modified while looping.
        for child in list(stanza.elements()):
            if (
                child.uri not in XEP_0420.MUST_BE_PLAINTEXT_NAMESPACES
                and (child.uri, child.name) not in XEP_0420.MUST_BE_PLAINTEXT_ELEMENTS
            ):
                # Remove the child from the stanza
                stanza.children.remove(child)

                # A namespace of ``None`` can be used on domish elements to inherit the
                # namespace from the parent. When moving elements from the stanza root to
                # the content element, however, we don't want elements to inherit the
                # namespace of the content element. Thus, check for elements with ``None``
                # for their namespace and set the namespace to jabber:client, which is the
                # namespace of the parent element.
                if child.uri is None:
                    child.uri = C.NS_CLIENT
                    child.defaultUri = C.NS_CLIENT

                # Add the child with corrected namespaces to the content element
                content.addChild(child)

        # Add the affixes requested by the profile
        if profile.rpad_policy is not SCEAffixPolicy.NOT_NEEDED:
            # The specification defines the rpad affix to contain "[...] a randomly
            # generated sequence of random length between 0 and 200 characters." This
            # implementation differs a bit from the specification in that a minimum size
            # other than 0 is chosen depending on the serialized size of the content
            # element. This is to prevent the scenario where the encrypted content is
            # short and the rpad is also randomly chosen to be short, which could allow
            # guessing the content of a short message. To do so, the rpad length is first
            # chosen to pad the content to at least 53 bytes, then afterwards another 0 to
            # 200 bytes are added. Note that single-byte characters are used by this
            # implementation, thus the number of characters equals the number of bytes.
            content_byte_size = len(content.toXml().encode("utf-8"))
            content_byte_size_diff = content_byte_size - empty_content_byte_size
            rpad_length = max(0, 53 - content_byte_size_diff) + secrets.randbelow(201)
            rpad_alphabet = string.digits + string.ascii_letters + string.punctuation
            rpad_content = "".join(
                secrets.choice(rpad_alphabet)
                for __
                in range(rpad_length)
            )
            envelope.addElement((NS_SCE, "rpad"), content=rpad_content)

        if profile.time_policy is not SCEAffixPolicy.NOT_NEEDED:
            time_element = envelope.addElement((NS_SCE, "time"))
            time_element["stamp"] = XEP_0082.format_datetime()

        if profile.to_policy is not SCEAffixPolicy.NOT_NEEDED:
            recipient = stanza.getAttribute("to", None)
            if recipient is not None:
                to_element = envelope.addElement((NS_SCE, "to"))
                to_element["jid"] = jid.JID(recipient).userhost()
            elif profile.to_policy is SCEAffixPolicy.REQUIRED:
                raise ValueError(
                    "<to/> affix requested, but stanza doesn't have the 'to' attribute"
                    " set."
                )

        if profile.from_policy is not SCEAffixPolicy.NOT_NEEDED:
            sender = stanza.getAttribute("from", None)
            if sender is not None:
                from_element = envelope.addElement((NS_SCE, "from"))
                from_element["jid"] = jid.JID(sender).userhost()
            elif profile.from_policy is SCEAffixPolicy.REQUIRED:
                raise ValueError(
                    "<from/> affix requested, but stanza doesn't have the 'from'"
                    " attribute set."
                )

        for affix, policy in profile.custom_policies.items():
            if policy is not SCEAffixPolicy.NOT_NEEDED:
                envelope.addChild(affix.create(stanza))

        return envelope.toXml().encode("utf-8")

    @staticmethod
    def _find_affix(envelope: domish.Element, name: str) -> Optional[domish.Element]:
        """
        @param envelope: The parsed envelope element.
        @param name: The element name of the affix to look for.
        @return: The first child of the envelope with the given name under the SCE
            namespace, or ``None`` if there is no such child.
        """

        return cast(
            Optional[domish.Element],
            next(envelope.elements(NS_SCE, name), None)
        )

    @staticmethod
    def _verify_jid_affix(
        stanza: domish.Element,
        affix_element: Optional[domish.Element],
        attribute: str,
        role: str
    ) -> Optional[jid.JID]:
        """Verify a <to/> or <from/> affix against the matching stanza attribute.

        Only bare JIDs are compared, as per the specification.

        @param stanza: The stanza carrying the attribute to compare with.
        @param affix_element: The affix element from the envelope, ``None`` if the affix
            is not included.
        @param attribute: The name of the stanza attribute, ``"to"`` or ``"from"``. Also
            the name of the affix.
        @param role: Human-readable role of the JID for error messages, ``"recipient"``
            or ``"sender"``.
        @return: The JID referenced by the affix, ``None`` if the affix is not included.
        @raise AffixVerificationFailed: if the affix lacks a valid JID, if the stanza
            lacks a valid attribute to compare with, or if the bare JIDs differ.
        """

        if affix_element is None:
            return None

        affix_label = attribute.capitalize()

        # The schema doesn't force the jid attribute to be present, and its content is
        # controlled by the remote peer. Report problems as verification failures rather
        # than leaking lookup/parsing errors to the caller.
        affix_jid_text = affix_element.getAttribute("jid", None)
        if not affix_jid_text:
            raise AffixVerificationFailed(
                f"'{affix_label}' affix is lacking the 'jid' attribute."
            )

        try:
            affix_jid = jid.JID(affix_jid_text)
        except jid.InvalidFormat as e:
            raise AffixVerificationFailed(
                f"'{affix_label}' affix references a malformed JID."
            ) from e

        actual = stanza.getAttribute(attribute, None)
        if not actual:
            raise AffixVerificationFailed(
                f"'{affix_label}' affix is included in the envelope, but the stanza is"
                f" lacking a '{attribute}' attribute to compare the value to."
            )

        try:
            actual_bare_jid = jid.JID(actual).userhost()
        except jid.InvalidFormat as e:
            raise AffixVerificationFailed(
                f"The '{attribute}' attribute of the stanza is not a valid JID."
            ) from e

        target_bare_jid = affix_jid.userhost()

        if actual_bare_jid != target_bare_jid:
            raise AffixVerificationFailed(
                f"Mismatch between actual and target {role} bare JIDs:"
                f" {actual_bare_jid} vs {target_bare_jid}."
            )

        return affix_jid

    @staticmethod
    def unpack_stanza(
        profile: SCEProfile,
        stanza: domish.Element,
        envelope_serialized: bytes
    ) -> SCEAffixValues:
        """Unpack a stanza packed according to Stanza Content Encryption.

        Parses the serialized envelope as XML, verifies included affixes and makes sure
        the requirements of the profile are met, and restores the stanza by moving
        decrypted elements from the envelope back to the stanza top level.

        @param profile: The SCE profile, i.e. the definition of affixes that have to/may
            be included in the envelope.
        @param stanza: The stanza to process. Will be modified by the call.
        @param envelope_serialized: The serialized envelope, i.e. the plaintext produced
            by the decryption scheme utilizing SCE.
        @return: The parsed and processed values of all affixes that were present on the
            envelope, notably including the timestamp.
        @raise exceptions.ParsingError: if the serialized envelope element is malformed.
        @raise ProfileRequirementsNotMet: if one or more affixes required by the profile
            are missing from the envelope.
        @raise AffixVerificationFailed: if an affix included in the envelope fails to
            validate, which includes affixes lacking their attribute or carrying a
            malformed value. It doesn't matter whether the affix is required by the
            profile or not, all affixes included in the envelope are validated and cause
            this exception to be raised on failure.

        @warning: It is up to the calling code to verify the timestamp, if returned, since
            the requirements on the timestamp may vary between SCE-enabled protocols.
        """

        try:
            envelope_serialized_string = envelope_serialized.decode("utf-8")
        except UnicodeError as e:
            raise exceptions.ParsingError(
                "Serialized envelope can't be parsed as utf-8."
            ) from e

        custom_affixes = set(profile.custom_policies.keys())

        # Make sure the envelope adheres to the schema
        schema_xml = ENVELOPE_SCHEMA.format(
            custom_affix_references="".join(
                f'<xs:element ref="{custom_affix.element_name}" minOccurs="0"/>'
                for custom_affix
                in custom_affixes
            ),
            custom_affix_definitions="".join(
                custom_affix.element_schema
                for custom_affix
                in custom_affixes
            )
        )
        parser = etree.XMLParser(
            schema=etree.XMLSchema(etree.XML(schema_xml.encode("utf-8")))
        )

        try:
            etree.fromstring(envelope_serialized_string, parser)
        except (etree.XMLSyntaxError, ValueError) as e:
            # Next to syntax/validation errors, lxml raises ValueError when handed a str
            # that carries an XML encoding declaration. Both mean the envelope is not
            # acceptable.
            raise exceptions.ParsingError(
                "Serialized envelope doesn't pass schema validation."
            ) from e

        # Prepare the envelope and content elements
        envelope = cast(domish.Element, ElementParser()(envelope_serialized_string))
        content = next(envelope.elements(NS_SCE, "content"))

        # Find the standard affixes
        rpad_element = XEP_0420._find_affix(envelope, "rpad")
        time_element = XEP_0420._find_affix(envelope, "time")
        to_element = XEP_0420._find_affix(envelope, "to")
        from_element = XEP_0420._find_affix(envelope, "from")

        # The rpad doesn't need verification.
        rpad_value = None if rpad_element is None else str(rpad_element)

        # The time affix isn't verified other than that the timestamp is present and
        # parseable. The schema doesn't force the stamp attribute to be present.
        timestamp_value: Optional[datetime] = None
        if time_element is not None:
            stamp = time_element.getAttribute("stamp", None)
            if stamp is None:
                raise AffixVerificationFailed("Malformed time affix.")

            try:
                timestamp_value = XEP_0082.parse_datetime(stamp)
            except ValueError as e:
                raise AffixVerificationFailed("Malformed time affix.") from e

        # The to/from affixes are verified by comparing the to/from attribute of the
        # stanza with the JID referenced by the affix.
        recipient_value = XEP_0420._verify_jid_affix(
            stanza, to_element, "to", "recipient"
        )
        sender_value = XEP_0420._verify_jid_affix(
            stanza, from_element, "from", "sender"
        )

        # Find and verify custom affixes
        custom_values: Dict[SCECustomAffix, domish.Element] = {}
        for affix in custom_affixes:
            element = XEP_0420._find_affix(envelope, affix.element_name)
            if element is not None:
                affix.verify(stanza, element)
                custom_values[affix] = element

        # Check whether all affixes required by the profile are present
        standard_affixes = (
            ("rpad", profile.rpad_policy, rpad_element),
            ("time", profile.time_policy, time_element),
            ("to", profile.to_policy, to_element),
            ("from", profile.from_policy, from_element)
        )
        standard_missing = any(
            policy is SCEAffixPolicy.REQUIRED and element is None
            for __, policy, element
            in standard_affixes
        )
        custom_missing = any(
            affix not in custom_values
            for affix, policy
            in profile.custom_policies.items()
            if policy is SCEAffixPolicy.REQUIRED
        )

        if standard_missing or custom_missing:
            # Report the actual presence of each affix, such that an absent optional
            # affix is not misreported as present.
            standard_presence = ", ".join(
                f"{name}={'missing' if element is None else 'present'}"
                for name, __, element
                in standard_affixes
            )
            custom_presence = "".join(
                f", [custom]{custom_affix.element_name}="
                f"{'present' if custom_affix in custom_values else 'missing'}"
                for custom_affix
                in custom_affixes
            )

            raise ProfileRequirementsNotMet(
                f"SCE envelope is missing affixes required by the profile {profile}."
                f" Affix presence: {standard_presence}{custom_presence}"
            )

        # Move elements that are not explicitly forbidden from being encrypted from the
        # content element to the stanza. Iterate over a copy, since the list of children
        # is modified while looping.
        for child in list(content.elements()):
            if (
                child.uri in XEP_0420.MUST_BE_PLAINTEXT_NAMESPACES
                or (child.uri, child.name) in XEP_0420.MUST_BE_PLAINTEXT_ELEMENTS
            ):
                # Forbidden elements are dropped, i.e. not restored to the stanza.
                log.warning(
                    f"An element that MUST be transferred in plaintext was found in an"
                    f" SCE envelope: {child.toXml()}"
                )
            else:
                # Remove the child from the content element
                content.children.remove(child)

                # Add the child to the stanza
                stanza.addChild(child)

        return SCEAffixValues(
            rpad_value,
            timestamp_value,
            recipient_value,
            sender_value,
            custom_values
        )