Mercurial > libervia-backend
changeset 4334:111dce64dcb5
plugins XEP-0300, XEP-0446, XEP-0447, XEP0448 and others: Refactoring to use Pydantic:
Pydantic models are used more and more in Libervia, for the bridge API, and also to
convert `domish.Element` to internal representation.
Type hints have also been added in many places.
rel 453
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 03 Dec 2024 00:12:38 +0100 |
parents | e94799a0908f |
children | 430d5d99a740 |
files | libervia/backend/plugins/plugin_misc_download.py libervia/backend/plugins/plugin_misc_text_syntaxes.py libervia/backend/plugins/plugin_misc_uri_finder.py libervia/backend/plugins/plugin_xep_0048.py libervia/backend/plugins/plugin_xep_0060.py libervia/backend/plugins/plugin_xep_0103.py libervia/backend/plugins/plugin_xep_0234.py libervia/backend/plugins/plugin_xep_0300.py libervia/backend/plugins/plugin_xep_0373.py libervia/backend/plugins/plugin_xep_0446.py libervia/backend/plugins/plugin_xep_0447.py libervia/backend/plugins/plugin_xep_0448.py |
diffstat | 12 files changed, 694 insertions(+), 324 deletions(-) [+] |
line wrap: on
line diff
--- a/libervia/backend/plugins/plugin_misc_download.py Tue Dec 03 00:11:00 2024 +0100 +++ b/libervia/backend/plugins/plugin_misc_download.py Tue Dec 03 00:12:38 2024 +0100 @@ -277,12 +277,12 @@ callback: Callable[ [ SatXMPPEntity, - Dict[str, Any], - Dict[str, Any], + dict[str, Any], + dict[str, Any], Union[str, Path], - Dict[str, Any], + dict[str, Any], ], - Tuple[str, defer.Deferred], + tuple[str, defer.Deferred], ], ) -> None: """Register a handler to manage a type of attachment source @@ -295,11 +295,11 @@ source used. The callabke must return a tuple with: - progress ID - - a Deferred which fire whant the file is fully downloaded + - a Deferred which fire when the file is fully downloaded """ - if source_type is self._download_callbacks: + if source_type in self._download_callbacks: raise exceptions.ConflictError( - f"The is already a callback registered for source type {source_type!r}" + f"There is already a callback registered for source type {source_type!r}" ) self._download_callbacks[source_type] = callback
--- a/libervia/backend/plugins/plugin_misc_text_syntaxes.py Tue Dec 03 00:11:00 2024 +0100 +++ b/libervia/backend/plugins/plugin_misc_text_syntaxes.py Tue Dec 03 00:12:38 2024 +0100 @@ -43,7 +43,7 @@ except ImportError: raise exceptions.MissingModule( 'Missing module "lxml_html_clean". Please download and install it from ' - 'http://lxml.de/. Note that this is a separate package to install in addition ' + "http://lxml.de/. Note that this is a separate package to install in addition " 'to "lxml".' )
--- a/libervia/backend/plugins/plugin_misc_uri_finder.py Tue Dec 03 00:11:00 2024 +0100 +++ b/libervia/backend/plugins/plugin_misc_uri_finder.py Tue Dec 03 00:12:38 2024 +0100 @@ -74,12 +74,12 @@ @return (dict[unicode, unicode]): map from key to found uri """ keys_re = "|".join(keys) - label_re = r'\"(?P<label>[^\"]+)\"' + label_re = r"\"(?P<label>[^\"]+)\"" uri_re = re.compile( r"(?P<key>{keys_re})[ :]? +(?P<uri>xmpp:\S+)(?:.*use {label_re} label)?".format( keys_re=keys_re, label_re=label_re ), - re.IGNORECASE + re.IGNORECASE, ) path = os.path.normpath(path) if not os.path.isdir(path) or not os.path.isabs(path):
--- a/libervia/backend/plugins/plugin_xep_0048.py Tue Dec 03 00:11:00 2024 +0100 +++ b/libervia/backend/plugins/plugin_xep_0048.py Tue Dec 03 00:12:38 2024 +0100 @@ -422,10 +422,7 @@ raise NotImplementedError def bookmarks_list( - self, - type_: str, - storage_location: str, - profile_key: str = C.PROF_KEY_NONE + self, type_: str, storage_location: str, profile_key: str = C.PROF_KEY_NONE ) -> defer.Deferred[dict]: """Return stored bookmarks
--- a/libervia/backend/plugins/plugin_xep_0060.py Tue Dec 03 00:11:00 2024 +0100 +++ b/libervia/backend/plugins/plugin_xep_0060.py Tue Dec 03 00:12:38 2024 +0100 @@ -711,10 +711,10 @@ client: SatXMPPEntity, service: jid.JID, nodeIdentifier: str, - items: list[pubsub.Item]|None = None, - options: dict|None = None, - sender: jid.JID|None = None, - extra: dict[str, Any]|None = None, + items: list[pubsub.Item] | None = None, + options: dict | None = None, + sender: jid.JID | None = None, + extra: dict[str, Any] | None = None, ) -> domish.Element: """Publish pubsub items @@ -807,13 +807,13 @@ async def get_items( self, client: SatXMPPEntity, - service: jid.JID|None, + service: jid.JID | None, node: str, - max_items: int|None = None, - item_ids: list[str]|None = None, - sub_id: str|None = None, - rsm_request: rsm.RSMRequest|None = None, - extra: dict|None = None, + max_items: int | None = None, + item_ids: list[str] | None = None, + sub_id: str | None = None, + rsm_request: rsm.RSMRequest | None = None, + extra: dict | None = None, ) -> tuple[list[domish.Element], dict]: """Retrieve pubsub items from a node.
--- a/libervia/backend/plugins/plugin_xep_0103.py Tue Dec 03 00:11:00 2024 +0100 +++ b/libervia/backend/plugins/plugin_xep_0103.py Tue Dec 03 00:12:38 2024 +0100 @@ -15,18 +15,18 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -from typing import Dict, Any +from typing import Self +from pydantic import BaseModel, Field from twisted.words.xish import domish +from libervia.backend.core import exceptions from libervia.backend.core.constants import Const as C from libervia.backend.core.i18n import _ from libervia.backend.core.log import getLogger -from libervia.backend.core import exceptions log = getLogger(__name__) - PLUGIN_INFO = { C.PI_NAME: "URL Address Information", C.PI_IMPORT_NAME: "XEP-0103", @@ -37,45 +37,87 @@ C.PI_HANDLER: "no", C.PI_DESCRIPTION: _("""Implementation of XEP-0103 (URL Address Information)"""), } +NS_URL_DATA = "http://jabber.org/protocol/url-data" -NS_URL_DATA = "http://jabber.org/protocol/url-data" + +class Desc(BaseModel): + """ + Model for the <desc/> element. + """ + + content: str + xml_lang: str | None = None + + +class URLData(BaseModel): + """ + Model for the <url-data/> element. + """ + + url: str + desc: list[Desc] = Field(default_factory=list) + + @classmethod + def from_element(cls, element: domish.Element) -> Self: + """Create a URLData instance from a <url-data> element or its parent. + + @param url_data_elt: The <url-data> element or a parent element. + @return: URLData instance. + @raise exceptions.NotFound: If the <url-data> element is not found. + """ + if element.uri != NS_URL_DATA or element.name != "url-data": + child_url_data_elt = next(element.elements(NS_URL_DATA, "url-data"), None) + if child_url_data_elt is None: + raise exceptions.NotFound("<url-data> element not found") + else: + element = child_url_data_elt + kwargs = { + "url": element["target"], + "desc": [ + Desc(content=str(desc_elt), xml_lang=desc_elt.getAttribute("xml:lang")) + for desc_elt in element.elements(NS_URL_DATA, "desc") + ], + } + return cls(**kwargs) + + def to_element(self) -> domish.Element: + """Build the <url-data> element from this instance's data. + + @return: <url-data> element. + """ + url_data_elt = domish.Element((NS_URL_DATA, "url-data")) + url_data_elt["target"] = self.url + for desc in self.desc: + desc_elt = url_data_elt.addElement((NS_URL_DATA, "desc")) + if desc.xml_lang: + desc_elt["xml:lang"] = desc.xml_lang + desc_elt.addContent(desc.content) + return url_data_elt class XEP_0103: namespace = NS_URL_DATA def __init__(self, host): - log.info(_("XEP-0103 (URL Address Information) plugin initialization")) + log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization") host.register_namespace("url-data", NS_URL_DATA) - def get_url_data_elt(self, url: str, **kwargs) -> domish.Element: + def generate_url_data(self, url: str, **kwargs) -> URLData: """Generate the element describing the URL @param url: URL to use @param extra: extra metadata describing how to access the URL @return: ``<url-data/>`` element """ - url_data_elt = domish.Element((NS_URL_DATA, "url-data")) - url_data_elt["target"] = url - return url_data_elt + url_data = URLData(url=url, **kwargs) + return url_data - def parse_url_data_elt(self, url_data_elt: domish.Element) -> Dict[str, Any]: + def parse_url_data_elt(self, url_data_elt: domish.Element) -> URLData: """Parse <url-data/> element @param url_data_elt: <url-data/> element a parent element can also be used - @return: url-data data. It's a dict whose keys correspond to - [get_url_data_elt] parameters + @return: URLData instance @raise exceptions.NotFound: no <url-data/> element has been found """ - if url_data_elt.name != "url-data": - try: - url_data_elt = next(url_data_elt.elements(NS_URL_DATA, "url-data")) - except StopIteration: - raise exceptions.NotFound - try: - data: Dict[str, Any] = {"url": url_data_elt["target"]} - except KeyError: - raise ValueError(f'"target" attribute is missing: {url_data_elt.toXml}') - - return data + return URLData.from_element(url_data_elt)
--- a/libervia/backend/plugins/plugin_xep_0234.py Tue Dec 03 00:11:00 2024 +0100 +++ b/libervia/backend/plugins/plugin_xep_0234.py Tue Dec 03 00:12:38 2024 +0100 @@ -437,14 +437,14 @@ def _file_jingle_request( self, - peer_jid, - filepath, - name="", - file_hash="", - hash_algo="", - extra=None, - profile=C.PROF_KEY_NONE, - ): + peer_jid: str, + filepath: str, + name: str = "", + file_hash: str = "", + hash_algo: str = "", + extra: dict | None = None, + profile: str = C.PROF_KEY_NONE, + ) -> defer.Deferred[str]: client = self.host.get_client(profile) return defer.ensureDeferred( self.file_jingle_request( @@ -460,21 +460,22 @@ async def file_jingle_request( self, - client, - peer_jid, - filepath, - name=None, - file_hash=None, - hash_algo=None, - extra=None, - ): + client: SatXMPPEntity, + peer_jid: jid.JID, + filepath: str, + name: str | None = None, + file_hash: str | None = None, + hash_algo: str | None = None, + extra: dict | None = None, + ) -> str: """Request a file using jingle file transfer - @param peer_jid(jid.JID): destinee jid - @param filepath(str): absolute path where the file will be downloaded - @param name(unicode, None): name of the file - @param file_hash(unicode, None): hash of the file - @return (D(unicode)): progress id + @param client: Profile session. + @param peer_jid: destinee jid + @param filepath: absolute path where the file will be downloaded + @param name: name of the file + @param file_hash: hash of the file + @return: progress id """ progress_id_d = defer.Deferred() if extra is None:
--- a/libervia/backend/plugins/plugin_xep_0300.py Tue Dec 03 00:11:00 2024 +0100 +++ b/libervia/backend/plugins/plugin_xep_0300.py Tue Dec 03 00:12:38 2024 +0100 @@ -1,8 +1,7 @@ #!/usr/bin/env python3 - -# SAT plugin for Hash functions (XEP-0300) -# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) +# Libervia plugin for Hash functions (XEP-0300) +# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -17,13 +16,16 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -from typing import Tuple import base64 from collections import OrderedDict import hashlib +from typing import BinaryIO, Callable, Self, TYPE_CHECKING +from _hashlib import HASH +from pydantic import BaseModel, Field from twisted.internet import threads from twisted.internet import defer +from twisted.words.protocols.jabber import jid from twisted.words.protocols.jabber.xmlstream import XMPPHandler from twisted.words.xish import domish from wokkel import disco, iwokkel @@ -31,9 +33,13 @@ from libervia.backend.core import exceptions from libervia.backend.core.constants import Const as C +from libervia.backend.core.core_types import SatXMPPEntity from libervia.backend.core.i18n import _ from libervia.backend.core.log import getLogger +if TYPE_CHECKING: + from libervia.backend.core.main import LiberviaBackend + log = getLogger(__name__) @@ -54,9 +60,91 @@ ALGO_DEFAULT = "sha-256" -class XEP_0300(object): +class Hash(BaseModel): + """ + Model for hash data. + """ + + algo: str = Field(description="The algorithm used for hashing.") + hash_: str = Field(min_length=16, description="The base64-encoded hash value.") + + @classmethod + def from_element(cls, hash_elt: domish.Element) -> Self: + """ + Create a HashModel instance from a <hash> element. + + @param hash_elt: The <hash> element. + @return: HashModel instance. + @raise exceptions.NotFound: If the <hash> element is not found. + """ + if hash_elt.uri != NS_HASHES or hash_elt.name != "hash": + raise exceptions.NotFound("<hash> element not found") + algo = hash_elt.getAttribute("algo") + hash_value = str(hash_elt) + return cls(algo=algo, hash_=hash_value) + + @classmethod + def from_parent(cls, parent_elt: domish.Element) -> list[Self]: + """Find and return child <hash> element in given parent. + + @param parent_elt: Element which may content child <hash> elements. + @return: list of Hash corresponding to found elements + """ + return [ + cls.from_element(hash_elt) + for hash_elt in parent_elt.elements(NS_HASHES, "hash") + ] + + def to_element(self) -> domish.Element: + """Build the <hash> element from this instance's data. + + @return: <hash> element. + """ + hash_elt = domish.Element((NS_HASHES, "hash")) + hash_elt["algo"] = self.algo + hash_elt.addContent(self.hash_) + return hash_elt + + +class HashUsed(BaseModel): + """ + Model for hash-used data. + """ + + algo: str = Field(description="The algorithm used for hashing.") + + @classmethod + def from_element(cls, hash_used_elt: domish.Element) -> Self: + """Create a HashUsedModel instance from a <hash-used> element. + + @param hash_used_elt: The <hash-used> element. + @return: HashUsedModel instance. + @raise exceptions.NotFound: If the <hash-used> element is not found. + """ + if hash_used_elt.uri != NS_HASHES or hash_used_elt.name != "hash-used": + child_hash_used_elt = next( + hash_used_elt.elements(NS_HASHES, "hash-used"), None + ) + if child_hash_used_elt is None: + raise exceptions.NotFound("<hash-used> element not found") + else: + hash_used_elt = child_hash_used_elt + algo = hash_used_elt.getAttribute("algo") + return cls(algo=algo) + + def to_element(self) -> domish.Element: + """Build the <hash-used> element from this instance's data. + + @return: <hash-used> element. + """ + hash_used_elt = domish.Element((NS_HASHES, "hash-used")) + hash_used_elt["algo"] = self.algo + return hash_used_elt + + +class XEP_0300: # TODO: add blake after moving to Python 3 - ALGOS = OrderedDict( + ALGOS: OrderedDict[str, Callable] = OrderedDict( ( ("md5", hashlib.md5), ("sha-1", hashlib.sha1), @@ -66,38 +154,38 @@ ) ALGO_DEFAULT = ALGO_DEFAULT - def __init__(self, host): + def __init__(self, host: "LiberviaBackend"): log.info(_("plugin Hashes initialization")) host.register_namespace("hashes", NS_HASHES) + self.host = host - def get_handler(self, client): + def get_handler(self, client: SatXMPPEntity) -> XMPPHandler: return XEP_0300_handler() - def get_hasher(self, algo=ALGO_DEFAULT): + def get_hasher(self, algo: str = ALGO_DEFAULT) -> Callable: """Return hasher instance - @param algo(unicode): one of the XEP_300.ALGOS keys - @return (hash object): same object s in hashlib. - update method need to be called for each chunh - diget or hexdigest can be used at the end + @param algo: one of the XEP_300.ALGOS keys + @return: same object s in hashlib. + update method need to be called for each chunk + digest or hexdigest can be used at the end """ return self.ALGOS[algo]() - def get_default_algo(self): + def get_default_algo(self) -> str: return ALGO_DEFAULT - @defer.inlineCallbacks - def get_best_peer_algo(self, to_jid, profile): - """Return the best available hashing algorith of other peer + async def get_best_peer_algo(self, to_jid: jid.JID, profile: str) -> str | None: + """Return the best available hashing algorithm of other peer - @param to_jid(jid.JID): peer jid - @parm profile: %(doc_profile)s - @return (D(unicode, None)): best available algorithm, + @param to_jid: peer jid + @param profile: %(doc_profile)s + @return: best available algorithm, or None if hashing is not possible """ client = self.host.get_client(profile) for algo in reversed(XEP_0300.ALGOS): - has_feature = yield self.host.hasFeature( + has_feature = await self.host.hasFeature( client, NS_HASHES_FUNCTIONS.format(algo), to_jid ) if has_feature: @@ -106,15 +194,15 @@ jid=to_jid.full(), algo=algo ) ) - defer.returnValue(algo) + return algo - def _calculate_hash_blocking(self, file_obj, hasher): + def _calculate_hash_blocking(self, file_obj: BinaryIO, hasher: HASH) -> str: """Calculate hash in a blocking way /!\\ blocking method, please use calculate_hash instead - @param file_obj(file): a file-like object - @param hasher(hash object): the method to call to initialise hash object - @return (str): the hex digest of the hash + @param file_obj: a file-like object + @param hasher: the method to call to initialise hash object + @return: the hex digest of the hash """ while True: buf = file_obj.read(BUFFER_SIZE) @@ -123,64 +211,49 @@ hasher.update(buf) return hasher.hexdigest() - def calculate_hash(self, file_obj, hasher): + def calculate_hash(self, file_obj: BinaryIO, hasher: HASH) -> defer.Deferred[str]: return threads.deferToThread(self._calculate_hash_blocking, file_obj, hasher) - def calculate_hash_elt(self, file_obj=None, algo=ALGO_DEFAULT): + async def calculate_hash_elt( + self, file_obj: BinaryIO, algo: str = ALGO_DEFAULT + ) -> domish.Element: """Compute hash and build hash element - @param file_obj(file, None): file-like object to use to calculate the hash - @param algo(unicode): algorithme to use, must be a key of XEP_0300.ALGOS - @return (D(domish.Element)): hash element + @param file_obj: file-like object to use to calculate the hash + @param algo: algorithm to use, must be a key of XEP_0300.ALGOS + @return: hash element """ - - def hash_calculated(hash_): - return self.build_hash_elt(hash_, algo) - hasher = self.get_hasher(algo) - hash_d = self.calculate_hash(file_obj, hasher) - hash_d.addCallback(hash_calculated) - return hash_d + hash_ = await self.calculate_hash(file_obj, hasher) + return self.build_hash_elt(hash_, algo) - def build_hash_used_elt(self, algo=ALGO_DEFAULT): - hash_used_elt = domish.Element((NS_HASHES, "hash-used")) - hash_used_elt["algo"] = algo - return hash_used_elt + def build_hash_used_elt(self, algo: str = ALGO_DEFAULT) -> domish.Element: + hash_used_model = HashUsed(algo=algo) + return hash_used_model.to_element() - def parse_hash_used_elt(self, parent): + def parse_hash_used_elt(self, parent_elt: domish.Element) -> str: """Find and parse a hash-used element - @param (domish.Element): parent of <hash/> element - @return (unicode): hash algorithm used + @param parent: parent of <hash-used/> element + @return: hash algorithm used @raise exceptions.NotFound: the element is not present @raise exceptions.DataError: the element is invalid """ - try: - hash_used_elt = next(parent.elements(NS_HASHES, "hash-used")) - except StopIteration: - raise exceptions.NotFound - algo = hash_used_elt["algo"] - if not algo: - raise exceptions.DataError - return algo + hash_used_model = HashUsed.from_element(parent_elt) + return hash_used_model.algo - def build_hash_elt(self, hash_, algo=ALGO_DEFAULT): + def build_hash_elt(self, hash_hex: str, algo: str = ALGO_DEFAULT) -> domish.Element: """Compute hash and build hash element - @param hash_(str): hash to use - @param algo(unicode): algorithme to use, must be a key of XEP_0300.ALGOS - @return (domish.Element): computed hash + @param hash_: Hexadecimal representation of hash to use. + @param algo: Algorithm to use, must be a key of XEP_0300.ALGOS. + @return: <hash> element """ - assert hash_ - assert algo - hash_elt = domish.Element((NS_HASHES, "hash")) - if hash_ is not None: - b64_hash = base64.b64encode(hash_.encode("utf-8")).decode("utf-8") - hash_elt.addContent(b64_hash) - hash_elt["algo"] = algo - return hash_elt + b64_hash = base64.b64encode(hash_hex.encode()).decode() + hash_model = Hash(algo=algo, hash_=b64_hash) + return hash_model.to_element() - def parse_hash_elt(self, parent: domish.Element) -> Tuple[str, bytes]: + def parse_hash_elt(self, parent: domish.Element) -> tuple[str, str]: """Find and parse a hash element if multiple elements are found, the strongest managed one is returned @@ -195,7 +268,8 @@ best_algo = None best_value = None for hash_elt in parent.elements(NS_HASHES, "hash"): - algo = hash_elt.getAttribute("algo") + hash_model = Hash.from_element(hash_elt) + algo = hash_model.algo try: idx = algos.index(algo) except ValueError: @@ -205,7 +279,7 @@ if best_algo is None or algos.index(best_algo) < idx: best_algo = algo - best_value = base64.b64decode(str(hash_elt)).decode("utf-8") + best_value = base64.b64decode(hash_model.hash_).decode() if not hash_elt: raise exceptions.NotFound @@ -217,12 +291,16 @@ @implementer(iwokkel.IDisco) class XEP_0300_handler(XMPPHandler): - def getDiscoInfo(self, requestor, target, nodeIdentifier=""): + def getDiscoInfo( + self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = "" + ) -> list[disco.DiscoFeature]: hash_functions_names = [ disco.DiscoFeature(NS_HASHES_FUNCTIONS.format(algo)) for algo in XEP_0300.ALGOS ] return [disco.DiscoFeature(NS_HASHES)] + hash_functions_names - def getDiscoItems(self, requestor, target, nodeIdentifier=""): + def getDiscoItems( + self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = "" + ) -> list[disco.DiscoItem]: return []
--- a/libervia/backend/plugins/plugin_xep_0373.py Tue Dec 03 00:11:00 2024 +0100 +++ b/libervia/backend/plugins/plugin_xep_0373.py Tue Dec 03 00:12:38 2024 +0100 @@ -787,23 +787,17 @@ algorithm=self.ALGORITHM, expires=False, certify=True, - force=True + force=True, ) primary_key_obj = c.get_key(primary_create_result.fpr, secret=True) c.create_subkey( - primary_key_obj, - algorithm=self.ALGORITHM, - expires=False, - sign=True + primary_key_obj, algorithm=self.ALGORITHM, expires=False, sign=True ) c.create_subkey( - primary_key_obj, - algorithm=self.ALGORITHM, - expires=False, - encrypt=True + primary_key_obj, algorithm=self.ALGORITHM, expires=False, encrypt=True ) except gpg.errors.GPGMEError as e: raise GPGProviderError("Internal GPGME error") from e
--- a/libervia/backend/plugins/plugin_xep_0446.py Tue Dec 03 00:11:00 2024 +0100 +++ b/libervia/backend/plugins/plugin_xep_0446.py Tue Dec 03 00:12:38 2024 +0100 @@ -15,16 +15,17 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -from logging import exception -from typing import Optional, Union, Tuple, Dict, Any from pathlib import Path +from typing import Self, cast +from pydantic import BaseModel, Field from twisted.words.xish import domish +from libervia.backend.core import exceptions from libervia.backend.core.constants import Const as C from libervia.backend.core.i18n import _ from libervia.backend.core.log import getLogger -from libervia.backend.core import exceptions +from libervia.backend.plugins.plugin_xep_0300 import XEP_0300 from libervia.backend.tools import utils log = getLogger(__name__) @@ -45,26 +46,171 @@ NS_FILE_METADATA = "urn:xmpp:file:metadata:0" +class FileMetadata(BaseModel): + """ + Model for file metadata. + """ + + name: str | None = Field(None, description="Name of the file.") + media_type: str | None = Field(None, description="Media type of the file.") + desc: str | None = Field(None, description="Description of the file.") + size: int | None = Field(None, description="Size of the file in bytes.") + file_hash: tuple[str, str] | None = Field( + None, description="File hash as a tuple of (algo, hash)." + ) + date: float | int | None = Field( + None, description="Timestamp of the last modification datetime." + ) + width: int | None = Field(None, description="Image or video width in pixels.") + height: int | None = Field(None, description="Image or video height in pixels.") + length: int | None = Field(None, description="Video or audio length in milliseconds.") + thumbnail: str | None = Field(None, description="URL to a thumbnail.") + + _hash: XEP_0300 | None = None + + @classmethod + def from_element(cls, file_metadata_elt: domish.Element) -> Self: + """Create a FileMetadata instance from a <file> element or its parent. + + @param file_metadata_elt: The <file> element or a parent element. + @return: FileMetadata instance. + @raise exceptions.NotFound: If the <file> element is not found. + """ + assert cls._hash is not None, "_hash attribute is not set" + + if file_metadata_elt.uri != NS_FILE_METADATA or file_metadata_elt.name != "file": + child_file_metadata_elt = next( + file_metadata_elt.elements(NS_FILE_METADATA, "file"), None + ) + if child_file_metadata_elt is None: + raise exceptions.NotFound("<file> element not found") + else: + file_metadata_elt = child_file_metadata_elt + + kwargs = {} + for key in ( + "name", + "media_type", + "desc", + "size", + "date", + "width", + "height", + "length", + ): + elt = next(file_metadata_elt.elements(NS_FILE_METADATA, key), None) + if elt is not None: + if key in ("name", "media_type", "desc"): + content = str(elt) + if key == "name": + content = Path(content).name + kwargs[key] = content + elif key in ("size", "width", "height", "length"): + kwargs[key] = int(str(elt)) + elif key == "date": + kwargs[key] = utils.parse_xmpp_date(str(elt)) + + hash_elt = next(file_metadata_elt.elements(NS_FILE_METADATA, "hash"), None) + if hash_elt: + try: + algo, hash_ = cls._hash.parse_hash_elt(hash_elt) + except exceptions.NotFound: + pass + except exceptions.DataError: + from libervia.backend.tools.xml_tools import p_fmt_elt + + log.warning(f"invalid <hash/> element:\n{p_fmt_elt(file_metadata_elt)}") + else: + kwargs["file_hash"] = (algo, hash_) + + return cls(**kwargs) + + def to_element(self) -> domish.Element: + """Build the <file> element from this instance's data. + + @return: <file> element. + """ + assert self._hash is not None, "_hash attribute is not set" + + file_elt = domish.Element((NS_FILE_METADATA, "file")) + + for key, value in ( + ("name", self.name), + ("media-type", self.media_type), + ("desc", self.desc), + ("size", self.size), + ("width", self.width), + ("height", self.height), + ("length", self.length), + ): + if value is not None: + file_elt.addElement(key, content=str(value)) + + if self.file_hash is not None: + hash_algo, hash_ = self.file_hash + file_elt.addChild(self._hash.build_hash_elt(hash_, hash_algo)) + + if self.date is not None: + file_elt.addElement("date", content=utils.xmpp_date(self.date)) + + if self.thumbnail is not None: + log.warning("thumbnail is not implemented yet") + + return file_elt + + @classmethod + def from_filedata_dict(cls, file_data: dict) -> Self: + """Create an instance of FileMetadata from data dict as returned by ``memory``. + + @param data: A filedata dict as returned by ``memory.get_files`` + @return: An instance of FileMetadata. + """ + + # Extracting relevant fields + name = file_data["name"] + media_type = f'{file_data["media_type"]}/{file_data["media_subtype"]}' + desc = None # TODO + size = file_data["size"] + file_hash = (file_data["hash_algo"], file_data["file_hash"]) + date = file_data.get("modified") or file_data["created"] + width = None # TODO + height = None # TODO + length = None # TODO + thumbnail = None # TODO + + return cls( + name=name, + media_type=media_type, + desc=desc, + size=size, + file_hash=file_hash, + date=date, + width=width, + height=height, + length=length, + thumbnail=thumbnail, + ) + + class XEP_0446: - def __init__(self, host): log.info(_("XEP-0446 (File Metadata Element) plugin initialization")) host.register_namespace("file-metadata", NS_FILE_METADATA) - self._hash = host.plugins["XEP-0300"] + FileMetadata._hash = cast(XEP_0300, host.plugins["XEP-0300"]) - def get_file_metadata_elt( + def generate_file_metadata( self, - name: Optional[str] = None, - media_type: Optional[str] = None, - desc: Optional[str] = None, - size: Optional[int] = None, - file_hash: Optional[Tuple[str, str]] = None, - date: Optional[Union[float, int]] = None, - width: Optional[int] = None, - height: Optional[int] = None, - length: Optional[int] = None, - thumbnail: Optional[str] = None, - ) -> domish.Element: + name: str | None = None, + media_type: str | None = None, + desc: str | None = None, + size: int | None = None, + file_hash: tuple[str, str] | None = None, + date: float | int | None = None, + width: int | None = None, + height: int | None = None, + length: int | None = None, + thumbnail: str | None = None, + ) -> FileMetadata: """Generate the element describing a file @param name: name of the file @@ -79,87 +225,25 @@ @param thumbnail: URL to a thumbnail @return: ``<file/>`` element """ - if name: - name = Path(name).name - file_elt = domish.Element((NS_FILE_METADATA, "file")) - for name, value in ( - ("name", name), - ("media-type", media_type), - ("desc", desc), - ("size", size), - ("width", width), - ("height", height), - ("length", length), - ): - if value is not None: - file_elt.addElement(name, content=str(value)) - if file_hash is not None: - hash_algo, hash_ = file_hash - file_elt.addChild(self._hash.build_hash_elt(hash_, hash_algo)) - if date is not None: - file_elt.addElement("date", utils.xmpp_date(date)) - if thumbnail is not None: - # TODO: implement thumbnails - log.warning("thumbnail is not implemented yet") - return file_elt + return FileMetadata( + name=name, + media_type=media_type, + desc=desc, + size=size, + file_hash=file_hash, + date=date, + width=width, + height=height, + length=length, + thumbnail=thumbnail, + ) - def parse_file_metadata_elt( - self, file_metadata_elt: domish.Element - ) -> Dict[str, Any]: + def parse_file_metadata_elt(self, file_metadata_elt: domish.Element) -> FileMetadata: """Parse <file/> element @param file_metadata_elt: <file/> element a parent element can also be used - @return: file metadata. It's a dict whose keys correspond to - [get_file_metadata_elt] parameters + @return: FileMetadata instance. @raise exceptions.NotFound: no <file/> element has been found """ - - if file_metadata_elt.name != "file": - try: - file_metadata_elt = next( - file_metadata_elt.elements(NS_FILE_METADATA, "file") - ) - except StopIteration: - raise exceptions.NotFound - data: Dict[str, Any] = {} - - for key, type_ in ( - ("name", str), - ("media-type", str), - ("desc", str), - ("size", int), - ("date", "timestamp"), - ("width", int), - ("height", int), - ("length", int), - ): - elt = next(file_metadata_elt.elements(NS_FILE_METADATA, key), None) - if elt is not None: - if type_ in (str, int): - content = str(elt) - if key == "name": - # we avoid malformed names or names containing path elements - content = Path(content).name - elif key == "media-type": - key = "media_type" - data[key] = type_(content) - elif type == "timestamp": - data[key] = utils.parse_xmpp_date(str(elt)) - else: - raise exceptions.InternalError - - try: - algo, hash_ = self._hash.parse_hash_elt(file_metadata_elt) - except exceptions.NotFound: - pass - except exceptions.DataError: - from libervia.backend.tools.xml_tools import p_fmt_elt - - log.warning("invalid <hash/> element:\n{p_fmt_elt(file_metadata_elt)}") - else: - data["file_hash"] = (algo, hash_) - - # TODO: thumbnails - - return data + return FileMetadata.from_element(file_metadata_elt)
--- a/libervia/backend/plugins/plugin_xep_0447.py Tue Dec 03 00:11:00 2024 +0100 +++ b/libervia/backend/plugins/plugin_xep_0447.py Tue Dec 03 00:12:38 2024 +0100 @@ -15,12 +15,28 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +from abc import ABC, abstractmethod from collections import namedtuple from functools import partial import mimetypes from pathlib import Path -from typing import Any, Callable, Dict, List, Optional, Tuple, Union +from typing import ( + Any, + Callable, + ClassVar, + Dict, + Final, + List, + Literal, + NamedTuple, + Optional, + Self, + Tuple, + Union, + cast, +) +from pydantic import BaseModel, Field import treq from twisted.internet import defer from twisted.words.xish import domish @@ -30,6 +46,8 @@ from libervia.backend.core.core_types import SatXMPPEntity from libervia.backend.core.i18n import _ from libervia.backend.core.log import getLogger +from libervia.backend.plugins.plugin_xep_0103 import URLData, XEP_0103 +from libervia.backend.plugins.plugin_xep_0446 import FileMetadata, XEP_0446 from libervia.backend.tools import stream from libervia.backend.tools.web import treq_client_no_ssl @@ -42,7 +60,13 @@ C.PI_TYPE: "XEP", C.PI_MODES: C.PLUG_MODE_BOTH, C.PI_PROTOCOLS: ["XEP-0447"], - C.PI_DEPENDENCIES: ["XEP-0103", "XEP-0334", "XEP-0446", "ATTACH", "DOWNLOAD"], + C.PI_DEPENDENCIES: [ + "XEP-0103", + "XEP-0334", + "XEP-0446", + "ATTACH", + "DOWNLOAD", + ], C.PI_RECOMMENDATIONS: ["XEP-0363"], C.PI_MAIN: "XEP_0447", C.PI_HANDLER: "no", @@ -50,7 +74,116 @@ } NS_SFS = "urn:xmpp:sfs:0" -SourceHandler = namedtuple("SourceHandler", ["callback", "encrypted"]) + + +class Source(ABC, BaseModel): + + type: ClassVar[str] + encrypted: ClassVar[bool] = False + + def __init_subclass__(cls) -> None: + super().__init_subclass__() + if not hasattr(cls, "type"): + raise TypeError( + f'Can\'t instantiate {cls.__name__} without "type" class attribute.' + ) + + @classmethod + @abstractmethod + def from_element(cls, element: domish.Element) -> Self: + """Parse an element and return corresponding model + + @param element: element to parse + @raise exceptions.DataError: the element is invalid + """ + + @abstractmethod + def to_element(self) -> domish.Element: + """Convert model to an element + + @return: domish.Element representing the model + """ + + +class FileSharing(BaseModel): + """ + Model for handling XEP-0447 <file-sharing> element. + """ + + file: FileMetadata + sources: list[Source] + disposition: str | None = Field( + default=None, + description="Disposition of the file, either 'attachment' or 'inline'.", + ) + id: str | None = Field( + default=None, description="Unique identifier for the file-sharing element." + ) + _sfs: "XEP_0447 | None" = None + + def to_element(self) -> domish.Element: + """Build the <file-sharing> element from this instance's data. + + @return: <file-sharing> element. + """ + file_sharing_elt = domish.Element((NS_SFS, "file-sharing")) + + if self.disposition: + file_sharing_elt["disposition"] = self.disposition + + if self.id: + file_sharing_elt["id"] = self.id + + file_sharing_elt.addChild(self.file.to_element()) + + sources_elt = file_sharing_elt.addElement("sources") + for source in self.sources: + sources_elt.addChild(source.to_element()) + + return file_sharing_elt + + @classmethod + def from_element(cls, file_sharing_elt: domish.Element) -> Self: + """Create a FileSharing instance from a <file-sharing> element or its parent. + + @param file_sharing_elt: The <file-sharing> element or a parent element. + @return: FileSharing instance. + @raise exceptions.NotFound: If the <file-sharing> element is not found. + """ + assert cls._sfs is not None + if file_sharing_elt.uri != NS_SFS or file_sharing_elt.name != "file-sharing": + child_file_sharing_elt = next( + file_sharing_elt.elements(NS_SFS, "file-sharing"), None + ) + if child_file_sharing_elt is None: + raise exceptions.NotFound("<file-sharing> element not found") + else: + file_sharing_elt = child_file_sharing_elt + + kwargs = {} + disposition = file_sharing_elt.getAttribute("disposition") + if disposition: + kwargs["disposition"] = disposition + + file_id = file_sharing_elt.getAttribute("id") + if file_id: + kwargs["id"] = file_id + + kwargs["file"] = FileMetadata.from_element(file_sharing_elt) + kwargs["sources"] = cls._sfs.parse_sources_elt(file_sharing_elt) + + return cls(**kwargs) + + +class URLDataSource(URLData, Source): + type = "url" + + @classmethod + def from_element(cls, element: domish.Element) -> Self: + return super().from_element(element) + + def to_element(self) -> domish.Element: + return super().to_element() class XEP_0447: @@ -60,27 +193,34 @@ self.host = host log.info(_("XEP-0447 (Stateless File Sharing) plugin initialization")) host.register_namespace("sfs", NS_SFS) - self._sources_handlers = {} - self._u = host.plugins["XEP-0103"] + FileSharing._sfs = self + self._sources_handlers: dict[tuple[str, str], type[Source]] = {} + self._u = cast(XEP_0103, host.plugins["XEP-0103"]) self._hints = host.plugins["XEP-0334"] - self._m = host.plugins["XEP-0446"] + self._m = cast(XEP_0446, host.plugins["XEP-0446"]) self._http_upload = host.plugins.get("XEP-0363") self._attach = host.plugins["ATTACH"] self._attach.register(self.can_handle_attachment, self.attach, priority=1000) - self.register_source_handler( - self._u.namespace, "url-data", self._u.parse_url_data_elt + self.register_source( + self._u.namespace, + "url-data", + URLDataSource, + ) + self.register_source( + self._jp.namespace, + "jinglepub", + JinglePubSource, ) host.plugins["DOWNLOAD"].register_download_handler( self._u.namespace, self.download ) host.trigger.add("message_received", self._message_received_trigger) - def register_source_handler( + def register_source( self, namespace: str, element_name: str, - callback: Callable[[domish.Element], Dict[str, Any]], - encrypted: bool = False, + source: type[Source], ) -> None: """Register a handler for file source @@ -97,7 +237,7 @@ f"There is already a resource handler for namespace {namespace!r} and " f"name {element_name!r}" ) - self._sources_handlers[key] = SourceHandler(callback, encrypted) + self._sources_handlers[key] = source async def download( self, @@ -193,7 +333,7 @@ if media_type is None and name: media_type = mimetypes.guess_type(name, strict=False)[0] file_sharing_elt.addChild( - self._m.get_file_metadata_elt( + self._m.generate_file_metadata( name=name, media_type=media_type, desc=desc, @@ -204,19 +344,21 @@ height=height, length=length, thumbnail=thumbnail, - ) + ).to_element() ) sources_elt = self.get_sources_elt() file_sharing_elt.addChild(sources_elt) for source_data in sources: if "url" in source_data: - sources_elt.addChild(self._u.get_url_data_elt(**source_data)) + sources_elt.addChild( + self._u.generate_url_data(**source_data).to_element() + ) else: raise NotImplementedError(f"source data not implemented: {source_data}") return file_sharing_elt - def parse_sources_elt(self, sources_elt: domish.Element) -> List[Dict[str, Any]]: + def parse_sources_elt(self, sources_elt: domish.Element) -> List[Source]: """Parse <sources/> element @param sources_elt: <sources/> element, or a direct parent element @@ -242,12 +384,8 @@ log.warning(f"unmanaged file sharing element: {elt.toXml}") continue else: - source_data = source_handler.callback(elt) - if source_handler.encrypted: - source_data[C.MESS_KEY_ENCRYPTED] = True - if "type" not in source_data: - source_data["type"] = elt.uri - sources.append(source_data) + source = source_handler.from_element(elt) + sources.append(source) return sources def parse_file_sharing_elt(self, file_sharing_elt: domish.Element) -> Dict[str, Any]: @@ -263,7 +401,7 @@ except StopIteration: raise exceptions.NotFound try: - data = self._m.parse_file_metadata_elt(file_sharing_elt) + data = self._m.parse_file_metadata_elt(file_sharing_elt).model_dump() except exceptions.NotFound: data = {} disposition = file_sharing_elt.getAttribute("disposition")
--- a/libervia/backend/plugins/plugin_xep_0448.py Tue Dec 03 00:11:00 2024 +0100 +++ b/libervia/backend/plugins/plugin_xep_0448.py Tue Dec 03 00:12:38 2024 +0100 @@ -21,13 +21,14 @@ from pathlib import Path import secrets from textwrap import dedent -from typing import Any, Dict, Optional, Tuple, Union +from typing import Any, Dict, Optional, Self, Tuple, Union, cast from cryptography.exceptions import AlreadyFinalized from cryptography.hazmat import backends from cryptography.hazmat.primitives import ciphers from cryptography.hazmat.primitives.ciphers import CipherContext, modes from cryptography.hazmat.primitives.padding import PKCS7, PaddingContext +from pydantic import BaseModel, ValidationError import treq from twisted.internet import defer from twisted.words.protocols.jabber.xmlstream import XMPPHandler @@ -40,6 +41,10 @@ from libervia.backend.core.core_types import SatXMPPEntity from libervia.backend.core.i18n import _ from libervia.backend.core.log import getLogger +from libervia.backend.plugins.plugin_misc_download import DownloadPlugin +from libervia.backend.plugins.plugin_xep_0103 import XEP_0103 +from libervia.backend.plugins.plugin_xep_0300 import NS_HASHES, XEP_0300, Hash +from libervia.backend.plugins.plugin_xep_0447 import XEP_0447, Source from libervia.backend.tools import stream from libervia.backend.tools.web import treq_client_no_ssl @@ -79,90 +84,121 @@ NS_AES_256_CBC = "urn:xmpp:ciphers:aes-256-cbc-pkcs7:0" +class EncryptedSource(Source): + type = "encrypted" + encrypted = True + cipher: str + key: str + iv: str + hashes: list[Hash] + sources: list[Source] + _hash: XEP_0300 | None = None + _sfs: XEP_0447 | None = None + + @classmethod + def from_element(cls, element: domish.Element) -> Self: + """Parse an <encrypted> element and return corresponding EncryptedData model + + @param encrypted_elt: element to parse + @raise exceptions.DataError: the element is invalid + + """ + assert cls._hash is not None, "_hash attribute is not set" + assert cls._sfs is not None, "_sfs attribute is not set" + try: + cipher = element["cipher"] + key = str(next(element.elements(NS_ESFS, "key"))) + iv = str(next(element.elements(NS_ESFS, "iv"))) + except (KeyError, StopIteration): + raise exceptions.DataError( + "invalid <encrypted/> element: {encrypted_elt.toXml()}" + ) + sources = cls._sfs.parse_sources_elt(element) + if not sources: + raise exceptions.DataError(f"Sources are missing in {element.toXml()}") + + if any(isinstance(source, cls) for source in sources): + raise exceptions.DataError( + f"EncryptedData is used as a source of another EncryptedData" + ) + + encrypted_data = { + "cipher": cipher, + "key": key, + "iv": iv, + "hashes": Hash.from_parent(element), + "sources": sources, + } + + return cls(**encrypted_data) + + def to_element(self) -> domish.Element: + """Convert EncryptedData model to an <encrypted> element + + @return: domish.Element representing the encrypted data + + """ + assert self._hash is not None, "_hash attribute is not set" + encrypted_elt = domish.Element((NS_ESFS, "encrypted")) + encrypted_elt["cipher"] = self.cipher + encrypted_elt.addElement("key").addContent(self.key) + encrypted_elt.addElement("iv").addContent(self.iv) + for hash_ in self.hashes: + encrypted_elt.addChild(hash_.to_element()) + + return encrypted_elt + + class XEP_0448: def __init__(self, host): self.host = host log.info(_("XEP_0448 plugin initialization")) host.register_namespace("esfs", NS_ESFS) - self._u = host.plugins["XEP-0103"] - self._h = host.plugins["XEP-0300"] + self._u = cast(XEP_0103, host.plugins["XEP-0103"]) + self._h = cast(XEP_0300, host.plugins["XEP-0300"]) self._hints = host.plugins["XEP-0334"] self._http_upload = host.plugins["XEP-0363"] self._o = host.plugins["XEP-0384"] - self._sfs = host.plugins["XEP-0447"] - self._sfs.register_source_handler( - NS_ESFS, "encrypted", self.parse_encrypted_elt, encrypted=True - ) + self._sfs = cast(XEP_0447, host.plugins["XEP-0447"]) + self._sfs.register_source(NS_ESFS, "encrypted", EncryptedSource) self._attach = host.plugins["ATTACH"] self._attach.register( self.can_handle_attachment, self.attach, encrypted=True, priority=1000 ) - host.plugins["DOWNLOAD"].register_download_handler(NS_ESFS, self.download) + EncryptedSource._hash = self._h + EncryptedSource._sfs = self._sfs + download = cast(DownloadPlugin, host.plugins["DOWNLOAD"]) + download.register_download_handler(NS_ESFS, self.download) host.trigger.add("XEP-0363_upload_pre_slot", self._upload_pre_slot) host.trigger.add("XEP-0363_upload", self._upload_trigger) def get_handler(self, client): return XEP0448Handler() - def parse_encrypted_elt(self, encrypted_elt: domish.Element) -> Dict[str, Any]: - """Parse an <encrypted> element and return corresponding source data - - @param encrypted_elt: element to parse - @raise exceptions.DataError: the element is invalid - - """ - sources = self._sfs.parse_sources_elt(encrypted_elt) - if not sources: - raise exceptions.NotFound("sources are missing in {encrypted_elt.toXml()}") - if len(sources) > 1: - log.debug( - "more that one sources has been found, this is not expected, only the " - "first one will be used" - ) - source = sources[0] - source["type"] = NS_ESFS - try: - encrypted_data = source["encrypted_data"] = { - "cipher": encrypted_elt["cipher"], - "key": str(next(encrypted_elt.elements(NS_ESFS, "key"))), - "iv": str(next(encrypted_elt.elements(NS_ESFS, "iv"))), - } - except (KeyError, StopIteration): - raise exceptions.DataError( - "invalid <encrypted/> element: {encrypted_elt.toXml()}" - ) - try: - hash_algo, hash_value = self._h.parse_hash_elt(encrypted_elt) - except exceptions.NotFound: - pass - else: - encrypted_data["hash_algo"] = hash_algo - encrypted_data["hash"] = base64.b64encode(hash_value.encode()).decode() - return source - async def download( self, client: SatXMPPEntity, - attachment: Dict[str, Any], - source: Dict[str, Any], + attachment: dict[str, Any], + source: dict[str, Any], dest_path: Union[Path, str], - extra: Optional[Dict[str, Any]] = None, - ) -> Tuple[str, defer.Deferred]: + extra: dict[str, Any] | None = None, + ) -> tuple[str, defer.Deferred]: # TODO: check hash if extra is None: extra = {} + assert source["type"] == "encrypted" try: - encrypted_data = source["encrypted_data"] - cipher = encrypted_data["cipher"] - iv = base64.b64decode(encrypted_data["iv"]) - key = base64.b64decode(encrypted_data["key"]) + cipher = source["cipher"] + iv = base64.b64decode(source["iv"]) + key = base64.b64decode(source["key"]) except KeyError as e: - raise ValueError(f"{source} has incomplete encryption data: {e}") + raise ValueError(f"{source} has incomplete encryption data: {e}") from e + try: - download_url = source["url"] - except KeyError: - raise ValueError(f"{source} has missing URL") + download_url = source["sources"][0]["url"] + except (IndexError, KeyError) as e: + raise ValueError(f"{source} has missing URL") from e if extra.get("ignore_tls_errors", False): log.warning("TLS certificate check disabled, this is highly insecure") @@ -294,9 +330,9 @@ size=attachment["size"], file_hash=file_hash, ) - encrypted_elt = file_sharing_elt.sources.addElement( - (NS_ESFS, "encrypted") - ) + sources_elt = file_sharing_elt.sources + assert sources_elt is not None + encrypted_elt = sources_elt.addElement((NS_ESFS, "encrypted")) encrypted_elt["cipher"] = NS_AES_256_GCM encrypted_elt.addElement( "key", content=base64.b64encode(encryption_data["key"]).decode() @@ -311,7 +347,7 @@ ) encrypted_elt.addChild( self._sfs.get_sources_elt( - [self._u.get_url_data_elt(attachment["url"])] + [self._u.generate_url_data(attachment["url"]).to_element()] ) ) data["xml"].addChild(file_sharing_elt)