Mercurial > libervia-backend
view libervia/backend/plugins/plugin_xep_0300.py @ 4336:6e0918e638ee
plugin XEP-0498: "Pubsub File Sharing" implementation:
Partial implementation of XEP-0498, necessary to implement the service part in email
gateway.
rel 453
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 03 Dec 2024 00:13:23 +0100 |
parents | 111dce64dcb5 |
children |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia plugin for Hash functions (XEP-0300) # Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import base64 from collections import OrderedDict import hashlib from typing import BinaryIO, Callable, Self, TYPE_CHECKING from _hashlib import HASH from pydantic import BaseModel, Field from twisted.internet import threads from twisted.internet import defer from twisted.words.protocols.jabber import jid from twisted.words.protocols.jabber.xmlstream import XMPPHandler from twisted.words.xish import domish from wokkel import disco, iwokkel from zope.interface import implementer from libervia.backend.core import exceptions from libervia.backend.core.constants import Const as C from libervia.backend.core.core_types import SatXMPPEntity from libervia.backend.core.i18n import _ from libervia.backend.core.log import getLogger if TYPE_CHECKING: from libervia.backend.core.main import LiberviaBackend log = getLogger(__name__) PLUGIN_INFO = { C.PI_NAME: "Cryptographic Hash Functions", C.PI_IMPORT_NAME: "XEP-0300", C.PI_TYPE: "XEP", C.PI_MODES: C.PLUG_MODE_BOTH, C.PI_PROTOCOLS: ["XEP-0300"], C.PI_MAIN: "XEP_0300", C.PI_HANDLER: "yes", C.PI_DESCRIPTION: _("""Management of cryptographic hashes"""), } NS_HASHES = "urn:xmpp:hashes:2" NS_HASHES_FUNCTIONS = "urn:xmpp:hash-function-text-names:{}" BUFFER_SIZE = 2**12 ALGO_DEFAULT = "sha-256" class Hash(BaseModel): """ Model for hash data. """ algo: str = Field(description="The algorithm used for hashing.") hash_: str = Field(min_length=16, description="The base64-encoded hash value.") @classmethod def from_element(cls, hash_elt: domish.Element) -> Self: """ Create a HashModel instance from a <hash> element. @param hash_elt: The <hash> element. @return: HashModel instance. @raise exceptions.NotFound: If the <hash> element is not found. """ if hash_elt.uri != NS_HASHES or hash_elt.name != "hash": raise exceptions.NotFound("<hash> element not found") algo = hash_elt.getAttribute("algo") hash_value = str(hash_elt) return cls(algo=algo, hash_=hash_value) @classmethod def from_parent(cls, parent_elt: domish.Element) -> list[Self]: """Find and return child <hash> element in given parent. @param parent_elt: Element which may content child <hash> elements. @return: list of Hash corresponding to found elements """ return [ cls.from_element(hash_elt) for hash_elt in parent_elt.elements(NS_HASHES, "hash") ] def to_element(self) -> domish.Element: """Build the <hash> element from this instance's data. @return: <hash> element. """ hash_elt = domish.Element((NS_HASHES, "hash")) hash_elt["algo"] = self.algo hash_elt.addContent(self.hash_) return hash_elt class HashUsed(BaseModel): """ Model for hash-used data. """ algo: str = Field(description="The algorithm used for hashing.") @classmethod def from_element(cls, hash_used_elt: domish.Element) -> Self: """Create a HashUsedModel instance from a <hash-used> element. @param hash_used_elt: The <hash-used> element. @return: HashUsedModel instance. @raise exceptions.NotFound: If the <hash-used> element is not found. """ if hash_used_elt.uri != NS_HASHES or hash_used_elt.name != "hash-used": child_hash_used_elt = next( hash_used_elt.elements(NS_HASHES, "hash-used"), None ) if child_hash_used_elt is None: raise exceptions.NotFound("<hash-used> element not found") else: hash_used_elt = child_hash_used_elt algo = hash_used_elt.getAttribute("algo") return cls(algo=algo) def to_element(self) -> domish.Element: """Build the <hash-used> element from this instance's data. @return: <hash-used> element. """ hash_used_elt = domish.Element((NS_HASHES, "hash-used")) hash_used_elt["algo"] = self.algo return hash_used_elt class XEP_0300: # TODO: add blake after moving to Python 3 ALGOS: OrderedDict[str, Callable] = OrderedDict( ( ("md5", hashlib.md5), ("sha-1", hashlib.sha1), ("sha-256", hashlib.sha256), ("sha-512", hashlib.sha512), ) ) ALGO_DEFAULT = ALGO_DEFAULT def __init__(self, host: "LiberviaBackend"): log.info(_("plugin Hashes initialization")) host.register_namespace("hashes", NS_HASHES) self.host = host def get_handler(self, client: SatXMPPEntity) -> XMPPHandler: return XEP_0300_handler() def get_hasher(self, algo: str = ALGO_DEFAULT) -> Callable: """Return hasher instance @param algo: one of the XEP_300.ALGOS keys @return: same object s in hashlib. update method need to be called for each chunk digest or hexdigest can be used at the end """ return self.ALGOS[algo]() def get_default_algo(self) -> str: return ALGO_DEFAULT async def get_best_peer_algo(self, to_jid: jid.JID, profile: str) -> str | None: """Return the best available hashing algorithm of other peer @param to_jid: peer jid @param profile: %(doc_profile)s @return: best available algorithm, or None if hashing is not possible """ client = self.host.get_client(profile) for algo in reversed(XEP_0300.ALGOS): has_feature = await self.host.hasFeature( client, NS_HASHES_FUNCTIONS.format(algo), to_jid ) if has_feature: log.debug( "Best hashing algorithm found for {jid}: {algo}".format( jid=to_jid.full(), algo=algo ) ) return algo def _calculate_hash_blocking(self, file_obj: BinaryIO, hasher: HASH) -> str: """Calculate hash in a blocking way /!\\ blocking method, please use calculate_hash instead @param file_obj: a file-like object @param hasher: the method to call to initialise hash object @return: the hex digest of the hash """ while True: buf = file_obj.read(BUFFER_SIZE) if not buf: break hasher.update(buf) return hasher.hexdigest() def calculate_hash(self, file_obj: BinaryIO, hasher: HASH) -> defer.Deferred[str]: return threads.deferToThread(self._calculate_hash_blocking, file_obj, hasher) async def calculate_hash_elt( self, file_obj: BinaryIO, algo: str = ALGO_DEFAULT ) -> domish.Element: """Compute hash and build hash element @param file_obj: file-like object to use to calculate the hash @param algo: algorithm to use, must be a key of XEP_0300.ALGOS @return: hash element """ hasher = self.get_hasher(algo) hash_ = await self.calculate_hash(file_obj, hasher) return self.build_hash_elt(hash_, algo) def build_hash_used_elt(self, algo: str = ALGO_DEFAULT) -> domish.Element: hash_used_model = HashUsed(algo=algo) return hash_used_model.to_element() def parse_hash_used_elt(self, parent_elt: domish.Element) -> str: """Find and parse a hash-used element @param parent: parent of <hash-used/> element @return: hash algorithm used @raise exceptions.NotFound: the element is not present @raise exceptions.DataError: the element is invalid """ hash_used_model = HashUsed.from_element(parent_elt) return hash_used_model.algo def build_hash_elt(self, hash_hex: str, algo: str = ALGO_DEFAULT) -> domish.Element: """Compute hash and build hash element @param hash_: Hexadecimal representation of hash to use. @param algo: Algorithm to use, must be a key of XEP_0300.ALGOS. @return: <hash> element """ b64_hash = base64.b64encode(hash_hex.encode()).decode() hash_model = Hash(algo=algo, hash_=b64_hash) return hash_model.to_element() def parse_hash_elt(self, parent: domish.Element) -> tuple[str, str]: """Find and parse a hash element if multiple elements are found, the strongest managed one is returned @param parent: parent of <hash/> element @return: (algo, hash) tuple both values can be None if <hash/> is empty @raise exceptions.NotFound: the element is not present @raise exceptions.DataError: the element is invalid """ algos = list(XEP_0300.ALGOS.keys()) hash_elt = None best_algo = None best_value = None for hash_elt in parent.elements(NS_HASHES, "hash"): hash_model = Hash.from_element(hash_elt) algo = hash_model.algo try: idx = algos.index(algo) except ValueError: log.warning(f"Proposed {algo} algorithm is not managed") algo = None continue if best_algo is None or algos.index(best_algo) < idx: best_algo = algo best_value = base64.b64decode(hash_model.hash_).decode() if not hash_elt: raise exceptions.NotFound if not best_algo or not best_value: raise exceptions.DataError return best_algo, best_value @implementer(iwokkel.IDisco) class XEP_0300_handler(XMPPHandler): def getDiscoInfo( self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = "" ) -> list[disco.DiscoFeature]: hash_functions_names = [ disco.DiscoFeature(NS_HASHES_FUNCTIONS.format(algo)) for algo in XEP_0300.ALGOS ] return [disco.DiscoFeature(NS_HASHES)] + hash_functions_names def getDiscoItems( self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = "" ) -> list[disco.DiscoItem]: return []