view libervia/backend/plugins/plugin_xep_0300.py @ 4336:6e0918e638ee

plugin XEP-0498: "Pubsub File Sharing" implementation: Partial implementation of XEP-0498, necessary to implement the service part in email gateway. rel 453
author Goffi <goffi@goffi.org>
date Tue, 03 Dec 2024 00:13:23 +0100
parents 111dce64dcb5
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia plugin for Hash functions (XEP-0300)
# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import base64
from collections import OrderedDict
import hashlib
from typing import BinaryIO, Callable, Self, TYPE_CHECKING

from _hashlib import HASH
from pydantic import BaseModel, Field
from twisted.internet import threads
from twisted.internet import defer
from twisted.words.protocols.jabber import jid
from twisted.words.protocols.jabber.xmlstream import XMPPHandler
from twisted.words.xish import domish
from wokkel import disco, iwokkel
from zope.interface import implementer

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger

if TYPE_CHECKING:
    from libervia.backend.core.main import LiberviaBackend

log = getLogger(__name__)


PLUGIN_INFO = {
    C.PI_NAME: "Cryptographic Hash Functions",
    C.PI_IMPORT_NAME: "XEP-0300",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0300"],
    C.PI_MAIN: "XEP_0300",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Management of cryptographic hashes"""),
}

NS_HASHES = "urn:xmpp:hashes:2"
NS_HASHES_FUNCTIONS = "urn:xmpp:hash-function-text-names:{}"
BUFFER_SIZE = 2**12
ALGO_DEFAULT = "sha-256"


class Hash(BaseModel):
    """
    Model for hash data.
    """

    algo: str = Field(description="The algorithm used for hashing.")
    hash_: str = Field(min_length=16, description="The base64-encoded hash value.")

    @classmethod
    def from_element(cls, hash_elt: domish.Element) -> Self:
        """
        Create a HashModel instance from a <hash> element.

        @param hash_elt: The <hash> element.
        @return: HashModel instance.
        @raise exceptions.NotFound: If the <hash> element is not found.
        """
        if hash_elt.uri != NS_HASHES or hash_elt.name != "hash":
            raise exceptions.NotFound("<hash> element not found")
        algo = hash_elt.getAttribute("algo")
        hash_value = str(hash_elt)
        return cls(algo=algo, hash_=hash_value)

    @classmethod
    def from_parent(cls, parent_elt: domish.Element) -> list[Self]:
        """Find and return child <hash> element in given parent.

        @param parent_elt: Element which may content child <hash> elements.
        @return: list of Hash corresponding to found elements
        """
        return [
            cls.from_element(hash_elt)
            for hash_elt in parent_elt.elements(NS_HASHES, "hash")
        ]

    def to_element(self) -> domish.Element:
        """Build the <hash> element from this instance's data.

        @return: <hash> element.
        """
        hash_elt = domish.Element((NS_HASHES, "hash"))
        hash_elt["algo"] = self.algo
        hash_elt.addContent(self.hash_)
        return hash_elt


class HashUsed(BaseModel):
    """
    Model for hash-used data.
    """

    algo: str = Field(description="The algorithm used for hashing.")

    @classmethod
    def from_element(cls, hash_used_elt: domish.Element) -> Self:
        """Create a HashUsedModel instance from a <hash-used> element.

        @param hash_used_elt: The <hash-used> element.
        @return: HashUsedModel instance.
        @raise exceptions.NotFound: If the <hash-used> element is not found.
        """
        if hash_used_elt.uri != NS_HASHES or hash_used_elt.name != "hash-used":
            child_hash_used_elt = next(
                hash_used_elt.elements(NS_HASHES, "hash-used"), None
            )
            if child_hash_used_elt is None:
                raise exceptions.NotFound("<hash-used> element not found")
            else:
                hash_used_elt = child_hash_used_elt
        algo = hash_used_elt.getAttribute("algo")
        return cls(algo=algo)

    def to_element(self) -> domish.Element:
        """Build the <hash-used> element from this instance's data.

        @return: <hash-used> element.
        """
        hash_used_elt = domish.Element((NS_HASHES, "hash-used"))
        hash_used_elt["algo"] = self.algo
        return hash_used_elt


class XEP_0300:
    # TODO: add blake after moving to Python 3
    ALGOS: OrderedDict[str, Callable] = OrderedDict(
        (
            ("md5", hashlib.md5),
            ("sha-1", hashlib.sha1),
            ("sha-256", hashlib.sha256),
            ("sha-512", hashlib.sha512),
        )
    )
    ALGO_DEFAULT = ALGO_DEFAULT

    def __init__(self, host: "LiberviaBackend"):
        log.info(_("plugin Hashes initialization"))
        host.register_namespace("hashes", NS_HASHES)
        self.host = host

    def get_handler(self, client: SatXMPPEntity) -> XMPPHandler:
        return XEP_0300_handler()

    def get_hasher(self, algo: str = ALGO_DEFAULT) -> Callable:
        """Return hasher instance

        @param algo: one of the XEP_300.ALGOS keys
        @return: same object s in hashlib.
           update method need to be called for each chunk
           digest or hexdigest can be used at the end
        """
        return self.ALGOS[algo]()

    def get_default_algo(self) -> str:
        return ALGO_DEFAULT

    async def get_best_peer_algo(self, to_jid: jid.JID, profile: str) -> str | None:
        """Return the best available hashing algorithm of other peer

        @param to_jid: peer jid
        @param profile: %(doc_profile)s
        @return: best available algorithm,
           or None if hashing is not possible
        """
        client = self.host.get_client(profile)
        for algo in reversed(XEP_0300.ALGOS):
            has_feature = await self.host.hasFeature(
                client, NS_HASHES_FUNCTIONS.format(algo), to_jid
            )
            if has_feature:
                log.debug(
                    "Best hashing algorithm found for {jid}: {algo}".format(
                        jid=to_jid.full(), algo=algo
                    )
                )
                return algo

    def _calculate_hash_blocking(self, file_obj: BinaryIO, hasher: HASH) -> str:
        """Calculate hash in a blocking way

        /!\\ blocking method, please use calculate_hash instead
        @param file_obj: a file-like object
        @param hasher: the method to call to initialise hash object
        @return: the hex digest of the hash
        """
        while True:
            buf = file_obj.read(BUFFER_SIZE)
            if not buf:
                break
            hasher.update(buf)
        return hasher.hexdigest()

    def calculate_hash(self, file_obj: BinaryIO, hasher: HASH) -> defer.Deferred[str]:
        return threads.deferToThread(self._calculate_hash_blocking, file_obj, hasher)

    async def calculate_hash_elt(
        self, file_obj: BinaryIO, algo: str = ALGO_DEFAULT
    ) -> domish.Element:
        """Compute hash and build hash element

        @param file_obj: file-like object to use to calculate the hash
        @param algo: algorithm to use, must be a key of XEP_0300.ALGOS
        @return: hash element
        """
        hasher = self.get_hasher(algo)
        hash_ = await self.calculate_hash(file_obj, hasher)
        return self.build_hash_elt(hash_, algo)

    def build_hash_used_elt(self, algo: str = ALGO_DEFAULT) -> domish.Element:
        hash_used_model = HashUsed(algo=algo)
        return hash_used_model.to_element()

    def parse_hash_used_elt(self, parent_elt: domish.Element) -> str:
        """Find and parse a hash-used element

        @param parent: parent of <hash-used/> element
        @return: hash algorithm used
        @raise exceptions.NotFound: the element is not present
        @raise exceptions.DataError: the element is invalid
        """
        hash_used_model = HashUsed.from_element(parent_elt)
        return hash_used_model.algo

    def build_hash_elt(self, hash_hex: str, algo: str = ALGO_DEFAULT) -> domish.Element:
        """Compute hash and build hash element

        @param hash_: Hexadecimal representation of hash to use.
        @param algo: Algorithm to use, must be a key of XEP_0300.ALGOS.
        @return: <hash> element
        """
        b64_hash = base64.b64encode(hash_hex.encode()).decode()
        hash_model = Hash(algo=algo, hash_=b64_hash)
        return hash_model.to_element()

    def parse_hash_elt(self, parent: domish.Element) -> tuple[str, str]:
        """Find and parse a hash element

        if multiple elements are found, the strongest managed one is returned
        @param parent: parent of <hash/> element
        @return: (algo, hash) tuple
            both values can be None if <hash/> is empty
        @raise exceptions.NotFound: the element is not present
        @raise exceptions.DataError: the element is invalid
        """
        algos = list(XEP_0300.ALGOS.keys())
        hash_elt = None
        best_algo = None
        best_value = None
        for hash_elt in parent.elements(NS_HASHES, "hash"):
            hash_model = Hash.from_element(hash_elt)
            algo = hash_model.algo
            try:
                idx = algos.index(algo)
            except ValueError:
                log.warning(f"Proposed {algo} algorithm is not managed")
                algo = None
                continue

            if best_algo is None or algos.index(best_algo) < idx:
                best_algo = algo
                best_value = base64.b64decode(hash_model.hash_).decode()

        if not hash_elt:
            raise exceptions.NotFound
        if not best_algo or not best_value:
            raise exceptions.DataError
        return best_algo, best_value


@implementer(iwokkel.IDisco)
class XEP_0300_handler(XMPPHandler):

    def getDiscoInfo(
        self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = ""
    ) -> list[disco.DiscoFeature]:
        hash_functions_names = [
            disco.DiscoFeature(NS_HASHES_FUNCTIONS.format(algo))
            for algo in XEP_0300.ALGOS
        ]
        return [disco.DiscoFeature(NS_HASHES)] + hash_functions_names

    def getDiscoItems(
        self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = ""
    ) -> list[disco.DiscoItem]:
        return []