Mercurial > libervia-backend
diff libervia/backend/plugins/plugin_xep_0300.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/plugins/plugin_xep_0300.py@524856bd7b19 |
children | 0d7bb4df2343 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/backend/plugins/plugin_xep_0300.py Fri Jun 02 11:49:51 2023 +0200 @@ -0,0 +1,228 @@ +#!/usr/bin/env python3 + + +# SAT plugin for Hash functions (XEP-0300) +# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from typing import Tuple +import base64 +from collections import OrderedDict +import hashlib + +from twisted.internet import threads +from twisted.internet import defer +from twisted.words.protocols.jabber.xmlstream import XMPPHandler +from twisted.words.xish import domish +from wokkel import disco, iwokkel +from zope.interface import implementer + +from libervia.backend.core import exceptions +from libervia.backend.core.constants import Const as C +from libervia.backend.core.i18n import _ +from libervia.backend.core.log import getLogger + +log = getLogger(__name__) + + +PLUGIN_INFO = { + C.PI_NAME: "Cryptographic Hash Functions", + C.PI_IMPORT_NAME: "XEP-0300", + C.PI_TYPE: "XEP", + C.PI_MODES: C.PLUG_MODE_BOTH, + C.PI_PROTOCOLS: ["XEP-0300"], + C.PI_MAIN: "XEP_0300", + C.PI_HANDLER: "yes", + C.PI_DESCRIPTION: _("""Management of cryptographic hashes"""), +} + +NS_HASHES = "urn:xmpp:hashes:2" +NS_HASHES_FUNCTIONS = "urn:xmpp:hash-function-text-names:{}" +BUFFER_SIZE = 2 ** 12 +ALGO_DEFAULT = "sha-256" + + +class XEP_0300(object): + # TODO: add blake after moving to Python 3 + ALGOS = OrderedDict( + ( + ("md5", hashlib.md5), + ("sha-1", hashlib.sha1), + ("sha-256", hashlib.sha256), + ("sha-512", hashlib.sha512), + ) + ) + ALGO_DEFAULT = ALGO_DEFAULT + + def __init__(self, host): + log.info(_("plugin Hashes initialization")) + host.register_namespace("hashes", NS_HASHES) + + def get_handler(self, client): + return XEP_0300_handler() + + def get_hasher(self, algo=ALGO_DEFAULT): + """Return hasher instance + + @param algo(unicode): one of the XEP_300.ALGOS keys + @return (hash object): same object s in hashlib. + update method need to be called for each chunh + diget or hexdigest can be used at the end + """ + return self.ALGOS[algo]() + + def get_default_algo(self): + return ALGO_DEFAULT + + @defer.inlineCallbacks + def get_best_peer_algo(self, to_jid, profile): + """Return the best available hashing algorith of other peer + + @param to_jid(jid.JID): peer jid + @parm profile: %(doc_profile)s + @return (D(unicode, None)): best available algorithm, + or None if hashing is not possible + """ + client = self.host.get_client(profile) + for algo in reversed(XEP_0300.ALGOS): + has_feature = yield self.host.hasFeature( + client, NS_HASHES_FUNCTIONS.format(algo), to_jid + ) + if has_feature: + log.debug( + "Best hashing algorithm found for {jid}: {algo}".format( + jid=to_jid.full(), algo=algo + ) + ) + defer.returnValue(algo) + + def _calculate_hash_blocking(self, file_obj, hasher): + """Calculate hash in a blocking way + + /!\\ blocking method, please use calculate_hash instead + @param file_obj(file): a file-like object + @param hasher(hash object): the method to call to initialise hash object + @return (str): the hex digest of the hash + """ + while True: + buf = file_obj.read(BUFFER_SIZE) + if not buf: + break + hasher.update(buf) + return hasher.hexdigest() + + def calculate_hash(self, file_obj, hasher): + return threads.deferToThread(self._calculate_hash_blocking, file_obj, hasher) + + def calculate_hash_elt(self, file_obj=None, algo=ALGO_DEFAULT): + """Compute hash and build hash element + + @param file_obj(file, None): file-like object to use to calculate the hash + @param algo(unicode): algorithme to use, must be a key of XEP_0300.ALGOS + @return (D(domish.Element)): hash element + """ + + def hash_calculated(hash_): + return self.build_hash_elt(hash_, algo) + + hasher = self.get_hasher(algo) + hash_d = self.calculate_hash(file_obj, hasher) + hash_d.addCallback(hash_calculated) + return hash_d + + def build_hash_used_elt(self, algo=ALGO_DEFAULT): + hash_used_elt = domish.Element((NS_HASHES, "hash-used")) + hash_used_elt["algo"] = algo + return hash_used_elt + + def parse_hash_used_elt(self, parent): + """Find and parse a hash-used element + + @param (domish.Element): parent of <hash/> element + @return (unicode): hash algorithm used + @raise exceptions.NotFound: the element is not present + @raise exceptions.DataError: the element is invalid + """ + try: + hash_used_elt = next(parent.elements(NS_HASHES, "hash-used")) + except StopIteration: + raise exceptions.NotFound + algo = hash_used_elt["algo"] + if not algo: + raise exceptions.DataError + return algo + + def build_hash_elt(self, hash_, algo=ALGO_DEFAULT): + """Compute hash and build hash element + + @param hash_(str): hash to use + @param algo(unicode): algorithme to use, must be a key of XEP_0300.ALGOS + @return (domish.Element): computed hash + """ + assert hash_ + assert algo + hash_elt = domish.Element((NS_HASHES, "hash")) + if hash_ is not None: + b64_hash = base64.b64encode(hash_.encode('utf-8')).decode('utf-8') + hash_elt.addContent(b64_hash) + hash_elt["algo"] = algo + return hash_elt + + def parse_hash_elt(self, parent: domish.Element) -> Tuple[str, bytes]: + """Find and parse a hash element + + if multiple elements are found, the strongest managed one is returned + @param parent: parent of <hash/> element + @return: (algo, hash) tuple + both values can be None if <hash/> is empty + @raise exceptions.NotFound: the element is not present + @raise exceptions.DataError: the element is invalid + """ + algos = list(XEP_0300.ALGOS.keys()) + hash_elt = None + best_algo = None + best_value = None + for hash_elt in parent.elements(NS_HASHES, "hash"): + algo = hash_elt.getAttribute("algo") + try: + idx = algos.index(algo) + except ValueError: + log.warning(f"Proposed {algo} algorithm is not managed") + algo = None + continue + + if best_algo is None or algos.index(best_algo) < idx: + best_algo = algo + best_value = base64.b64decode(str(hash_elt)).decode('utf-8') + + if not hash_elt: + raise exceptions.NotFound + if not best_algo or not best_value: + raise exceptions.DataError + return best_algo, best_value + + +@implementer(iwokkel.IDisco) +class XEP_0300_handler(XMPPHandler): + + def getDiscoInfo(self, requestor, target, nodeIdentifier=""): + hash_functions_names = [ + disco.DiscoFeature(NS_HASHES_FUNCTIONS.format(algo)) + for algo in XEP_0300.ALGOS + ] + return [disco.DiscoFeature(NS_HASHES)] + hash_functions_names + + def getDiscoItems(self, requestor, target, nodeIdentifier=""): + return []