comparison sat/plugins/plugin_xep_0420.py @ 3877:00212260f659

plugin XEP-0420: Implementation of Stanza Content Encryption: Includes implementation of XEP-0082 (XMPP date and time profiles) and tests for both new plugins. Everything is type checked, linted, format checked and unit tested. Adds new dependency xmlschema. fix 377
author Syndace <me@syndace.dev>
date Tue, 23 Aug 2022 12:04:11 +0200
parents
children 8289ac1b34f4
comparison
equal deleted inserted replaced
3876:e3c1f4736ab2 3877:00212260f659
1 #!/usr/bin/env python3
2
3 # Libervia plugin for Stanza Content Encryption
4 # Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev)
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 # Type-check with `mypy --strict --disable-error-code no-untyped-call`
20 # Lint with `pylint`
21
22 from abc import ABC, abstractmethod
23 from datetime import datetime
24 import enum
25 import secrets
26 import string
27 from typing import Dict, Iterator, List, NamedTuple, Optional, Set, Tuple, Union, cast
28
29 from lxml import etree
30
31 from sat.core.constants import Const as C
32 from sat.core.i18n import D_
33 from sat.core.log import Logger, getLogger
34 from sat.core.sat_main import SAT
35 from sat.tools.xml_tools import ElementParser
36 from sat.plugins.plugin_xep_0033 import NS_ADDRESS
37 from sat.plugins.plugin_xep_0082 import XEP_0082
38 from sat.plugins.plugin_xep_0334 import NS_HINTS
39 from sat.plugins.plugin_xep_0359 import NS_SID
40 from sat.plugins.plugin_xep_0380 import NS_EME
41 from twisted.words.protocols.jabber import jid
42 from twisted.words.xish import domish
43
44
# Public names of this module. The pylint pragma silences the "unused variable" warning
# for ``__all__`` itself.
__all__ = [  # pylint: disable=unused-variable
    "PLUGIN_INFO",
    "NS_SCE",
    "XEP_0420",
    "ProfileRequirementsNotMet",
    "AffixVerificationFailed",
    "SCECustomAffix",
    "SCEAffixPolicy",
    "SCEProfile",
    "SCEAffixValues",
]


# Module-level logger. The cast only narrows the static type of the returned object.
log = cast(Logger, getLogger(__name__))


# Plugin metadata. The meaning of each key is defined by the ``C.PI_*`` constants of the
# core (not visible from this module).
PLUGIN_INFO = {
    C.PI_NAME: "SCE",
    C.PI_IMPORT_NAME: "XEP-0420",
    C.PI_TYPE: "SEC",
    C.PI_PROTOCOLS: ["XEP-0420"],
    C.PI_DEPENDENCIES: ["XEP-0334", "XEP-0082"],
    C.PI_RECOMMENDATIONS: ["XEP-0045", "XEP-0033", "XEP-0359"],
    C.PI_MAIN: "XEP_0420",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: D_("Implementation of Stanza Content Encryption"),
}


# XML namespace of the SCE envelope element and of all of its affix elements.
NS_SCE = "urn:xmpp:sce:1"
75
76
class ProfileRequirementsNotMet(Exception):
    """
    Error signalling that an unpacked SCE envelope does not fulfil the requirements of
    the SCE profile it was checked against. Raised by :meth:`XEP_0420.unpack_stanza`.
    """
82
83
class AffixVerificationFailed(Exception):
    """
    Error signalling that an affix found in an SCE envelope failed its verification.
    Raised by :meth:`XEP_0420.unpack_stanza`.
    """
88
89
class SCECustomAffix(ABC):
    """
    Abstract base class that SCE profiles implement to define affixes of their own, in
    addition to the rpad, time, to and from affixes handled by :class:`XEP_0420`.
    """

    @property
    @abstractmethod
    def element_name(self) -> str:
        """
        @return: The name of the XML element that carries this affix.
        """

    @property
    @abstractmethod
    def element_schema(self) -> str:
        """
        @return: The ``<xs:element/>`` schema definition describing the XML structure of
            the affix element. It is inserted into the envelope schema, where it is
            referenced using ``<xs:element ref="{element_name}"/>``.
        """

    @abstractmethod
    def create(self, stanza: domish.Element) -> domish.Element:
        """
        Build the affix element for a stanza that is being packed.

        @param stanza: The stanza as left behind by :meth:`XEP_0420.pack_stanza`, i.e.
            the root ``<message/>`` or ``<iq/>`` with only its unencryptable children
            remaining; all encryptable children have already been moved away. Do not
            modify.
        @return: The affix element to add to the envelope. It must be named
            :attr:`element_name` and must validate against :attr:`element_schema`.
        @raise ValueError: if the affix couldn't be built.
        """

    @abstractmethod
    def verify(self, stanza: domish.Element, element: domish.Element) -> None:
        """
        Verify an affix element found in an envelope that is being unpacked.

        @param stanza: The stanza as it looks before :meth:`XEP_0420.unpack_stanza`
            moves the decrypted children back into it, i.e. the root ``<message/>`` or
            ``<iq/>`` with only its unencryptable children. Do not modify.
        @param element: The affix element to verify.
        @raise AffixVerificationFailed: on verification failure.
        """
133
134
@enum.unique
class SCEAffixPolicy(enum.Enum):
    """
    Policy for the presence of an affix in an SCE envelope.
    """

    # Enum members are deliberately left without type annotations: an annotation such as
    # ``REQUIRED: str`` declares the wrong type for the member (it is an instance of the
    # enum, not a ``str``) and is reported as an error by current type checkers.

    # The affix must be present in the envelope.
    REQUIRED = "REQUIRED"
    # The affix may be present in the envelope, but doesn't have to be.
    OPTIONAL = "OPTIONAL"
    # The affix is not needed by the profile.
    NOT_NEEDED = "NOT_NEEDED"
144
145
class SCEProfile(NamedTuple):
    # pylint: disable=invalid-name
    """
    Description of an SCE profile: for each affix, whether an SCE-enabled encryption
    protocol requires it, merely allows it, or doesn't need it at all.
    """

    # Policy for the <rpad/> (random padding) affix.
    rpad_policy: SCEAffixPolicy
    # Policy for the <time/> (timestamp) affix.
    time_policy: SCEAffixPolicy
    # Policy for the <to/> (recipient) affix.
    to_policy: SCEAffixPolicy
    # Policy for the <from/> (sender) affix.
    from_policy: SCEAffixPolicy
    # Policies for profile-specific affixes, keyed by the affix implementation.
    custom_policies: Dict[SCECustomAffix, SCEAffixPolicy]
158
159
class SCEAffixValues(NamedTuple):
    # pylint: disable=invalid-name
    """
    Result of :meth:`XEP_0420.unpack_stanza`: the parsed/processed values of all affixes
    that were included in the envelope. A field is ``None`` if the respective affix was
    not part of the envelope. Custom affixes are returned as their whole affix element.
    """

    # Text content of the <rpad/> affix.
    rpad: Optional[str]
    # Parsed ``stamp`` attribute of the <time/> affix.
    timestamp: Optional[datetime]
    # JID given by the ``jid`` attribute of the <to/> affix.
    recipient: Optional[jid.JID]
    # JID given by the ``jid`` attribute of the <from/> affix.
    sender: Optional[jid.JID]
    # Elements of the custom affixes found in the envelope, keyed by the affix
    # implementation. Affixes that were not present have no entry.
    custom: Dict[SCECustomAffix, domish.Element]
173
174
# XML Schema template used to validate serialized SCE envelopes. It is a ``str.format``
# template with two placeholders:
# - ``custom_affix_references``: ``<xs:element ref="..."/>`` entries allowing the custom
#   affixes of a profile as children of the envelope
# - ``custom_affix_definitions``: the ``<xs:element/>`` definitions of those affixes
#
# The ``stamp`` and ``jid`` attributes are marked ``use="required"``. Attributes are
# optional by default in XML Schema, which would let e.g. ``<time/>`` without a
# ``stamp`` pass validation even though the unpacking code reads that attribute
# unconditionally.
ENVELOPE_SCHEMA = """<?xml version="1.0" encoding="UTF-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
    targetNamespace="urn:xmpp:sce:1"
    xmlns="urn:xmpp:sce:1">

    <xs:element name="envelope">
        <xs:complexType>
            <xs:all>
                <xs:element ref="content"/>
                <xs:element ref="rpad" minOccurs="0"/>
                <xs:element ref="time" minOccurs="0"/>
                <xs:element ref="to" minOccurs="0"/>
                <xs:element ref="from" minOccurs="0"/>
                {custom_affix_references}
            </xs:all>
        </xs:complexType>
    </xs:element>

    <xs:element name="content">
        <xs:complexType>
            <xs:sequence>
                <xs:any minOccurs="0" maxOccurs="unbounded" processContents="skip"/>
            </xs:sequence>
        </xs:complexType>
    </xs:element>

    <xs:element name="rpad" type="xs:string"/>

    <xs:element name="time">
        <xs:complexType>
            <xs:attribute name="stamp" type="xs:dateTime" use="required"/>
        </xs:complexType>
    </xs:element>

    <xs:element name="to">
        <xs:complexType>
            <xs:attribute name="jid" type="xs:string" use="required"/>
        </xs:complexType>
    </xs:element>

    <xs:element name="from">
        <xs:complexType>
            <xs:attribute name="jid" type="xs:string" use="required"/>
        </xs:complexType>
    </xs:element>

    {custom_affix_definitions}
</xs:schema>
"""
224
225
class XEP_0420:  # pylint: disable=invalid-name
    """
    Implementation of XEP-0420: Stanza Content Encryption under namespace
    ``urn:xmpp:sce:1``.

    This is a passive plugin, i.e. it doesn't hook into any triggers to process stanzas
    actively, but offers API for other plugins to use.
    """

    # Set of namespaces whose elements are never allowed to be transferred in an encrypted
    # envelope.
    MUST_BE_PLAINTEXT_NAMESPACES: Set[str] = {
        NS_HINTS,
        NS_SID,  # TODO: Not sure whether this ban applies to both stanza-id and origin-id
        NS_ADDRESS,
        # Not part of the specification (yet), but just doesn't make sense in an encrypted
        # envelope:
        NS_EME
    }

    # Set of (namespace, element name) tuples that define elements which are never allowed
    # to be transferred in an encrypted envelope. If all elements under a certain
    # namespace are forbidden, the namespace can be added to
    # :attr:`MUST_BE_PLAINTEXT_NAMESPACES` instead.
    # Note: only full namespaces are forbidden by the spec for now, the following is for
    # potential future use.
    MUST_BE_PLAINTEXT_ELEMENTS: Set[Tuple[str, str]] = set()

    def __init__(self, sat: SAT) -> None:
        """
        @param sat: The SAT instance. Not used by this plugin, which only offers static
            API and keeps no state.
        """

    @staticmethod
    def _must_stay_plaintext(element: domish.Element) -> bool:
        """Check whether an element is banned from encrypted envelopes.

        @param element: The element to check.
        @return: Whether the namespace of the element is listed in
            :attr:`MUST_BE_PLAINTEXT_NAMESPACES` or its (namespace, name) pair is listed
            in :attr:`MUST_BE_PLAINTEXT_ELEMENTS`.
        """

        return (
            element.uri in XEP_0420.MUST_BE_PLAINTEXT_NAMESPACES
            or (element.uri, element.name) in XEP_0420.MUST_BE_PLAINTEXT_ELEMENTS
        )

    @staticmethod
    def _verify_jid_affix(
        stanza: domish.Element,
        affix_element: Optional[domish.Element],
        attribute: str,
        role: str
    ) -> Optional[jid.JID]:
        """Verify a <to/> or <from/> affix against the respective stanza attribute.

        Only bare JIDs are compared, as per the specification.

        @param stanza: The stanza whose attribute the affix is compared to.
        @param affix_element: The affix element, or ``None`` if the envelope doesn't
            contain the affix.
        @param attribute: The name of the stanza attribute to compare with, i.e. ``"to"``
            or ``"from"``.
        @param role: The word used for the JID in error messages, i.e. ``"recipient"`` or
            ``"sender"``.
        @return: The JID referenced by the affix, or ``None`` if the affix is absent.
        @raise AffixVerificationFailed: if the affix has no (valid) ``jid`` attribute, if
            the stanza lacks the attribute to compare with or if the bare JIDs differ.
        """

        if affix_element is None:
            return None

        # The affix content is controlled by the sender of the envelope. Convert a
        # missing attribute or an unparseable JID into a verification failure rather than
        # leaking KeyError/InvalidFormat to the caller.
        try:
            affix_jid = jid.JID(affix_element["jid"])
        except (KeyError, jid.InvalidFormat) as e:
            raise AffixVerificationFailed(f"Malformed {attribute} affix") from e

        actual = cast(Optional[str], stanza.getAttribute(attribute, None))
        if actual is None:
            raise AffixVerificationFailed(
                f"'{attribute.capitalize()}' affix is included in the envelope, but the"
                f" stanza is lacking a '{attribute}' attribute to compare the value to."
            )

        actual_bare_jid = jid.JID(actual).userhost()
        target_bare_jid = affix_jid.userhost()

        if actual_bare_jid != target_bare_jid:
            raise AffixVerificationFailed(
                f"Mismatch between actual and target {role} bare JIDs:"
                f" {actual_bare_jid} vs {target_bare_jid}."
            )

        return affix_jid

    @staticmethod
    def pack_stanza(profile: SCEProfile, stanza: domish.Element) -> bytes:
        """Pack a stanza according to Stanza Content Encryption.

        Removes all elements from the stanza except for a few exceptions that explicitly
        need to be transferred in plaintext, e.g. because they contain hints/instructions
        for the server on how to process the stanza. Together with the affix elements as
        requested by the profile, the removed elements are added to an envelope XML
        structure that builds the plaintext to be encrypted by the SCE-enabled encryption
        scheme. Optional affixes are always added to the structure, i.e. they are treated
        by the packing code as if they were required.

        Once built, the envelope structure is serialized to a byte string and returned for
        the encryption scheme to encrypt and add to the stanza.

        @param profile: The SCE profile, i.e. the definition of affixes to include in the
            envelope.
        @param stanza: The stanza to process. Will be modified by the call.
        @return: The serialized envelope structure that builds the plaintext for the
            encryption scheme to process.
        @raise ValueError: if the <to/> or <from/> affixes are requested but the stanza
            doesn't have the "to"/"from" attribute set to extract the value from. Can also
            be raised by custom affixes.

        @warning: It is up to the calling code to add a <store/> message processing hint
            if applicable.
        """

        # Prepare the envelope and content elements
        envelope = domish.Element((NS_SCE, "envelope"))
        content = envelope.addElement((NS_SCE, "content"))

        # Note the serialized byte size of the content element before adding any children
        empty_content_byte_size = len(content.toXml().encode("utf-8"))

        # Just for type safety
        stanza_children = cast(List[Union[domish.Element, str]], stanza.children)
        content_children = cast(List[Union[domish.Element, str]], content.children)

        # Move elements that are not explicitly forbidden from being encrypted from the
        # stanza to the content element. Iterate over a copy, since the list of children
        # is modified in the loop.
        for child in list(cast(Iterator[domish.Element], stanza.elements())):
            if XEP_0420._must_stay_plaintext(child):
                continue

            # Remove the child from the stanza
            stanza_children.remove(child)

            # A namespace of ``None`` can be used on domish elements to inherit the
            # namespace from the parent. When moving elements from the stanza root to the
            # content element, however, we don't want elements to inherit the namespace
            # of the content element. Thus, check for elements with ``None`` for their
            # namespace and set the namespace to jabber:client, which is the namespace of
            # the parent element.
            if child.uri is None:
                child.uri = C.NS_CLIENT
                child.defaultUri = C.NS_CLIENT

            # Add the child with corrected namespaces to the content element
            content_children.append(child)

        # Add the affixes requested by the profile
        if profile.rpad_policy is not SCEAffixPolicy.NOT_NEEDED:
            # The specification defines the rpad affix to contain "[...] a randomly
            # generated sequence of random length between 0 and 200 characters." This
            # implementation differs a bit from the specification in that a minimum size
            # other than 0 is chosen depending on the serialized size of the content
            # element. This is to prevent the scenario where the encrypted content is
            # short and the rpad is also randomly chosen to be short, which could allow
            # guessing the content of a short message. To do so, the rpad length is first
            # chosen to pad the content to at least 53 bytes, then afterwards another 0 to
            # 200 characters are added.
            # NOTE: all characters of the alphabet are single-byte in UTF-8, but ``<``,
            # ``>`` and ``&`` are escaped on serialization, so the serialized padding can
            # be longer than ``rpad_length`` bytes (never shorter).
            content_byte_size = len(content.toXml().encode("utf-8"))
            content_byte_size_diff = content_byte_size - empty_content_byte_size
            rpad_length = max(0, 53 - content_byte_size_diff) + secrets.randbelow(201)
            rpad_alphabet = string.digits + string.ascii_letters + string.punctuation
            rpad_content = "".join(
                secrets.choice(rpad_alphabet) for __ in range(rpad_length)
            )
            envelope.addElement((NS_SCE, "rpad"), content=rpad_content)

        if profile.time_policy is not SCEAffixPolicy.NOT_NEEDED:
            time_element = envelope.addElement((NS_SCE, "time"))
            time_element["stamp"] = XEP_0082.format_datetime()

        if profile.to_policy is not SCEAffixPolicy.NOT_NEEDED:
            recipient = cast(Optional[str], stanza.getAttribute("to", None))
            if recipient is None:
                raise ValueError(
                    "<to/> affix requested, but stanza doesn't have the 'to' attribute"
                    " set."
                )

            to_element = envelope.addElement((NS_SCE, "to"))
            to_element["jid"] = jid.JID(recipient).userhost()

        if profile.from_policy is not SCEAffixPolicy.NOT_NEEDED:
            sender = cast(Optional[str], stanza.getAttribute("from", None))
            if sender is None:
                raise ValueError(
                    "<from/> affix requested, but stanza doesn't have the 'from'"
                    " attribute set."
                )

            from_element = envelope.addElement((NS_SCE, "from"))
            from_element["jid"] = jid.JID(sender).userhost()

        for affix, policy in profile.custom_policies.items():
            if policy is not SCEAffixPolicy.NOT_NEEDED:
                envelope.addChild(affix.create(stanza))

        return cast(str, envelope.toXml()).encode("utf-8")

    @staticmethod
    def unpack_stanza(
        profile: SCEProfile,
        stanza: domish.Element,
        envelope_serialized: bytes
    ) -> SCEAffixValues:
        """Unpack a stanza packed according to Stanza Content Encryption.

        Parses the serialized envelope as XML, verifies included affixes and makes sure
        the requirements of the profile are met, and restores the stanza by moving
        decrypted elements from the envelope back to the stanza top level.

        @param profile: The SCE profile, i.e. the definition of affixes that have to/may
            be included in the envelope.
        @param stanza: The stanza to process. Will be modified by the call.
        @param envelope_serialized: The serialized envelope, i.e. the plaintext produced
            by the decryption scheme utilizing SCE.
        @return: The parsed and processed values of all affixes that were present on the
            envelope, notably including the timestamp.
        @raise ValueError: if the serialized envelope element is malformed.
        @raise ProfileRequirementsNotMet: if one or more affixes required by the profile
            are missing from the envelope.
        @raise AffixVerificationFailed: if an affix included in the envelope fails to
            validate, which includes affixes lacking their ``stamp``/``jid`` attribute or
            carrying an unparseable value. It doesn't matter whether the affix is
            required by the profile or not, all affixes included in the envelope are
            validated and cause this exception to be raised on failure.

        @warning: It is up to the calling code to verify the timestamp, if returned, since
            the requirements on the timestamp may vary between SCE-enabled protocols.
        """

        try:
            envelope_serialized_string = envelope_serialized.decode("utf-8")
        except UnicodeError as e:
            raise ValueError("Serialized envelope can't be parsed as utf-8.") from e

        custom_affixes = set(profile.custom_policies.keys())

        # Build the schema for this profile by plugging the references to and the
        # definitions of its custom affixes into the schema template.
        schema_source = ENVELOPE_SCHEMA.format(
            custom_affix_references="".join(
                f'<xs:element ref="{custom_affix.element_name}" minOccurs="0"/>'
                for custom_affix
                in custom_affixes
            ),
            custom_affix_definitions="".join(
                custom_affix.element_schema
                for custom_affix
                in custom_affixes
            )
        )

        # Make sure the envelope adheres to the schema. The envelope is data controlled
        # by a remote entity, thus entity resolution is disabled on the validating
        # parser. The tree built by lxml is only needed for validation and is discarded.
        parser = etree.XMLParser(
            schema=etree.XMLSchema(etree.XML(schema_source.encode("utf-8"))),
            resolve_entities=False
        )

        try:
            etree.fromstring(envelope_serialized_string, parser)
        except etree.XMLSyntaxError as e:
            raise ValueError("Serialized envelope doesn't pass schema validation.") from e

        # Prepare the envelope and content elements
        envelope = cast(domish.Element, ElementParser()(envelope_serialized_string))
        content = cast(domish.Element, next(envelope.elements(NS_SCE, "content")))

        def find_affix(name: str) -> Optional[domish.Element]:
            """
            @param name: The element name of the affix.
            @return: The first child of the envelope with that name under the SCE
                namespace, or ``None`` if there is no such child.
            """

            return cast(
                Optional[domish.Element],
                next(envelope.elements(NS_SCE, name), None)
            )

        # Verify the affixes
        rpad_element = find_affix("rpad")
        time_element = find_affix("time")
        to_element = find_affix("to")
        from_element = find_affix("from")

        # The rpad doesn't need verification.
        rpad_value = None if rpad_element is None else str(rpad_element)

        # The time affix isn't verified other than that the timestamp is present and
        # parseable. A missing ``stamp`` attribute surfaces as KeyError on access.
        timestamp_value: Optional[datetime] = None
        if time_element is not None:
            try:
                timestamp_value = XEP_0082.parse_datetime(time_element["stamp"])
            except (KeyError, ValueError) as e:
                raise AffixVerificationFailed("Malformed time affix") from e

        # The to affix is verified by comparing the to attribute of the stanza with the
        # JID referenced by the affix.
        recipient_value = XEP_0420._verify_jid_affix(
            stanza, to_element, "to", "recipient"
        )

        # The from affix is verified by comparing the from attribute of the stanza with
        # the JID referenced by the affix.
        sender_value = XEP_0420._verify_jid_affix(
            stanza, from_element, "from", "sender"
        )

        # Find and verify custom affixes
        custom_values: Dict[SCECustomAffix, domish.Element] = {}
        for affix in custom_affixes:
            element = find_affix(affix.element_name)
            if element is not None:
                affix.verify(stanza, element)
                custom_values[affix] = element

        # Check whether all affixes required by the profile are present
        rpad_missing = \
            profile.rpad_policy is SCEAffixPolicy.REQUIRED and rpad_element is None
        time_missing = \
            profile.time_policy is SCEAffixPolicy.REQUIRED and time_element is None
        to_missing = \
            profile.to_policy is SCEAffixPolicy.REQUIRED and to_element is None
        from_missing = \
            profile.from_policy is SCEAffixPolicy.REQUIRED and from_element is None
        custom_missing = any(
            affix not in custom_values
            for affix, policy
            in profile.custom_policies.items()
            if policy is SCEAffixPolicy.REQUIRED
        )

        if rpad_missing or time_missing or to_missing or from_missing or custom_missing:
            def presence(element: Optional[domish.Element]) -> str:
                """
                @param element: An affix element or ``None``.
                @return: ``"missing"`` for ``None``, ``"present"`` otherwise.
                """

                return "missing" if element is None else "present"

            # Report the actual presence of each affix in the envelope, independent of
            # whether the profile requires it. The profile itself is part of the message.
            custom_presence = "".join(
                f", [custom]{affix.element_name}={presence(custom_values.get(affix))}"
                for affix
                in custom_affixes
            )

            raise ProfileRequirementsNotMet(
                f"SCE envelope is missing affixes required by the profile {profile}."
                f" Affix presence:"
                f" rpad={presence(rpad_element)}"
                f", time={presence(time_element)}"
                f", to={presence(to_element)}"
                f", from={presence(from_element)}"
                + custom_presence
            )

        # Just for type safety
        content_children = cast(List[Union[domish.Element, str]], content.children)
        stanza_children = cast(List[Union[domish.Element, str]], stanza.children)

        # Move elements that are not explicitly forbidden from being encrypted from the
        # content element to the stanza. Forbidden elements are logged and left behind in
        # the (discarded) envelope.
        for child in list(cast(Iterator[domish.Element], content.elements())):
            if XEP_0420._must_stay_plaintext(child):
                log.warning(
                    f"An element that MUST be transferred in plaintext was found in an"
                    f" SCE envelope: {child.toXml()}"
                )
            else:
                # Remove the child from the content element
                content_children.remove(child)

                # Add the child to the stanza
                stanza_children.append(child)

        return SCEAffixValues(
            rpad_value,
            timestamp_value,
            recipient_value,
            sender_value,
            custom_values
        )