diff libervia/backend/plugins/plugin_xep_0300.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_xep_0300.py@524856bd7b19
children 0d7bb4df2343
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/plugins/plugin_xep_0300.py	Fri Jun 02 11:49:51 2023 +0200
@@ -0,0 +1,228 @@
+#!/usr/bin/env python3
+
+
+# SAT plugin for Hash functions (XEP-0300)
+# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from typing import Tuple
+import base64
+from collections import OrderedDict
+import hashlib
+
+from twisted.internet import threads
+from twisted.internet import defer
+from twisted.words.protocols.jabber.xmlstream import XMPPHandler
+from twisted.words.xish import domish
+from wokkel import disco, iwokkel
+from zope.interface import implementer
+
+from libervia.backend.core import exceptions
+from libervia.backend.core.constants import Const as C
+from libervia.backend.core.i18n import _
+from libervia.backend.core.log import getLogger
+
+log = getLogger(__name__)
+
+
+PLUGIN_INFO = {
+    C.PI_NAME: "Cryptographic Hash Functions",
+    C.PI_IMPORT_NAME: "XEP-0300",
+    C.PI_TYPE: "XEP",
+    C.PI_MODES: C.PLUG_MODE_BOTH,
+    C.PI_PROTOCOLS: ["XEP-0300"],
+    C.PI_MAIN: "XEP_0300",
+    C.PI_HANDLER: "yes",
+    C.PI_DESCRIPTION: _("""Management of cryptographic hashes"""),
+}
+
+NS_HASHES = "urn:xmpp:hashes:2"
+NS_HASHES_FUNCTIONS = "urn:xmpp:hash-function-text-names:{}"
+BUFFER_SIZE = 2 ** 12
+ALGO_DEFAULT = "sha-256"
+
+
+class XEP_0300(object):
+    # TODO: add blake after moving to Python 3
+    ALGOS = OrderedDict(
+        (
+            ("md5", hashlib.md5),
+            ("sha-1", hashlib.sha1),
+            ("sha-256", hashlib.sha256),
+            ("sha-512", hashlib.sha512),
+        )
+    )
+    ALGO_DEFAULT = ALGO_DEFAULT
+
+    def __init__(self, host):
+        log.info(_("plugin Hashes initialization"))
+        host.register_namespace("hashes", NS_HASHES)
+
+    def get_handler(self, client):
+        return XEP_0300_handler()
+
+    def get_hasher(self, algo=ALGO_DEFAULT):
+        """Return hasher instance
+
+        @param algo(unicode): one of the XEP_300.ALGOS keys
+        @return (hash object): same object s in hashlib.
+           update method need to be called for each chunh
+           diget or hexdigest can be used at the end
+        """
+        return self.ALGOS[algo]()
+
+    def get_default_algo(self):
+        return ALGO_DEFAULT
+
+    @defer.inlineCallbacks
+    def get_best_peer_algo(self, to_jid, profile):
+        """Return the best available hashing algorith of other peer
+
+         @param to_jid(jid.JID): peer jid
+         @parm profile: %(doc_profile)s
+         @return (D(unicode, None)): best available algorithm,
+            or None if hashing is not possible
+        """
+        client = self.host.get_client(profile)
+        for algo in reversed(XEP_0300.ALGOS):
+            has_feature = yield self.host.hasFeature(
+                client, NS_HASHES_FUNCTIONS.format(algo), to_jid
+            )
+            if has_feature:
+                log.debug(
+                    "Best hashing algorithm found for {jid}: {algo}".format(
+                        jid=to_jid.full(), algo=algo
+                    )
+                )
+                defer.returnValue(algo)
+
+    def _calculate_hash_blocking(self, file_obj, hasher):
+        """Calculate hash in a blocking way
+
+        /!\\ blocking method, please use calculate_hash instead
+        @param file_obj(file): a file-like object
+        @param hasher(hash object): the method to call to initialise hash object
+        @return (str): the hex digest of the hash
+        """
+        while True:
+            buf = file_obj.read(BUFFER_SIZE)
+            if not buf:
+                break
+            hasher.update(buf)
+        return hasher.hexdigest()
+
+    def calculate_hash(self, file_obj, hasher):
+        return threads.deferToThread(self._calculate_hash_blocking, file_obj, hasher)
+
+    def calculate_hash_elt(self, file_obj=None, algo=ALGO_DEFAULT):
+        """Compute hash and build hash element
+
+        @param file_obj(file, None): file-like object to use to calculate the hash
+        @param algo(unicode): algorithme to use, must be a key of XEP_0300.ALGOS
+        @return (D(domish.Element)): hash element
+        """
+
+        def hash_calculated(hash_):
+            return self.build_hash_elt(hash_, algo)
+
+        hasher = self.get_hasher(algo)
+        hash_d = self.calculate_hash(file_obj, hasher)
+        hash_d.addCallback(hash_calculated)
+        return hash_d
+
+    def build_hash_used_elt(self, algo=ALGO_DEFAULT):
+        hash_used_elt = domish.Element((NS_HASHES, "hash-used"))
+        hash_used_elt["algo"] = algo
+        return hash_used_elt
+
+    def parse_hash_used_elt(self, parent):
+        """Find and parse a hash-used element
+
+        @param (domish.Element): parent of <hash/> element
+        @return (unicode): hash algorithm used
+        @raise exceptions.NotFound: the element is not present
+        @raise exceptions.DataError: the element is invalid
+        """
+        try:
+            hash_used_elt = next(parent.elements(NS_HASHES, "hash-used"))
+        except StopIteration:
+            raise exceptions.NotFound
+        algo = hash_used_elt["algo"]
+        if not algo:
+            raise exceptions.DataError
+        return algo
+
+    def build_hash_elt(self, hash_, algo=ALGO_DEFAULT):
+        """Compute hash and build hash element
+
+        @param hash_(str): hash to use
+        @param algo(unicode): algorithme to use, must be a key of XEP_0300.ALGOS
+        @return (domish.Element): computed hash
+        """
+        assert hash_
+        assert algo
+        hash_elt = domish.Element((NS_HASHES, "hash"))
+        if hash_ is not None:
+            b64_hash = base64.b64encode(hash_.encode('utf-8')).decode('utf-8')
+            hash_elt.addContent(b64_hash)
+        hash_elt["algo"] = algo
+        return hash_elt
+
+    def parse_hash_elt(self, parent: domish.Element) -> Tuple[str, bytes]:
+        """Find and parse a hash element
+
+        if multiple elements are found, the strongest managed one is returned
+        @param parent: parent of <hash/> element
+        @return: (algo, hash) tuple
+            both values can be None if <hash/> is empty
+        @raise exceptions.NotFound: the element is not present
+        @raise exceptions.DataError: the element is invalid
+        """
+        algos = list(XEP_0300.ALGOS.keys())
+        hash_elt = None
+        best_algo = None
+        best_value = None
+        for hash_elt in parent.elements(NS_HASHES, "hash"):
+            algo = hash_elt.getAttribute("algo")
+            try:
+                idx = algos.index(algo)
+            except ValueError:
+                log.warning(f"Proposed {algo} algorithm is not managed")
+                algo = None
+                continue
+
+            if best_algo is None or algos.index(best_algo) < idx:
+                best_algo = algo
+                best_value = base64.b64decode(str(hash_elt)).decode('utf-8')
+
+        if not hash_elt:
+            raise exceptions.NotFound
+        if not best_algo or not best_value:
+            raise exceptions.DataError
+        return best_algo, best_value
+
+
+@implementer(iwokkel.IDisco)
+class XEP_0300_handler(XMPPHandler):
+
+    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
+        hash_functions_names = [
+            disco.DiscoFeature(NS_HASHES_FUNCTIONS.format(algo))
+            for algo in XEP_0300.ALGOS
+        ]
+        return [disco.DiscoFeature(NS_HASHES)] + hash_functions_names
+
+    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
+        return []