view sat/plugins/plugin_xep_0446.py @ 3914:4cb38c8312a1

plugin XEP-0384, xml_tools: avoid `getItems` timeout + fix empty node crash + parsing: - use `max_items` in `getItems` calls for bundles, as otherwise some pubsub service may return full nodes, which may be huge is `max_items=1` is not set on the node, possibly resulting in timeouts. - the plugin was crashing when TWOMEMO devices list node has no items at all. This is not the case anymore. - a naive parsing method has been implemented in `xml_tools` to replace the serialisation/deserialisation method. This should be more efficient and will avoid annoying `ns0:` prefixes in XML logs.
author Goffi <goffi@goffi.org>
date Sat, 24 Sep 2022 16:37:46 +0200
parents dbf0c7faaf49
children 0ff265725489
line wrap: on
line source

#!/usr/bin/env python3

# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from logging import exception
from typing import Optional, Union, Tuple, Dict, Any
from pathlib import Path

from twisted.words.xish import domish

from sat.core.constants import Const as C
from sat.core.i18n import _
from sat.core.log import getLogger
from sat.core import exceptions
from sat.tools import utils

log = getLogger(__name__)


PLUGIN_INFO = {
    C.PI_NAME: "File Metadata Element",
    C.PI_IMPORT_NAME: "XEP-0446",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0446"],
    C.PI_DEPENDENCIES: ["XEP-0300"],
    C.PI_MAIN: "XEP_0446",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _("""Implementation of XEP-0446 (File Metadata Element)"""),
}

NS_FILE_METADATA = "urn:xmpp:file:metadata:0"


class XEP_0446:

    def __init__(self, host):
        log.info(_("XEP-0446 (File Metadata Element) plugin initialization"))
        host.registerNamespace("file-metadata", NS_FILE_METADATA)
        self._hash = host.plugins["XEP-0300"]

    def get_file_metadata_elt(
        self,
        name: Optional[str] = None,
        media_type: Optional[str] = None,
        desc: Optional[str] = None,
        size: Optional[int] = None,
        file_hash: Optional[Tuple[str, str]] = None,
        date: Optional[Union[float, int]] = None,
        width: Optional[int] = None,
        height: Optional[int] = None,
        length: Optional[int] = None,
        thumbnail: Optional[str] = None,
    ) -> domish.Element:
        """Generate the element describing a file

        @param name: name of the file
        @param media_type: media-type
        @param desc: description
        @param size: size in bytes
        @param file_hash: (algo, hash)
        @param date: timestamp of the last modification datetime
        @param width: image width in pixels
        @param height: image height in pixels
        @param length: video length in seconds
        @param thumbnail: URL to a thumbnail
        @return: ``<file/>`` element
        """
        if name:
            name = Path(name).name
        file_elt = domish.Element((NS_FILE_METADATA, "file"))
        for name, value in (
            ("name", name),
            ("media-type", media_type),
            ("desc", desc),
            ("size", size),
            ("width", width),
            ("height", height),
            ("length", length),
        ):
            if value is not None:
                file_elt.addElement(name, content=str(value))
        if file_hash is not None:
            hash_algo, hash_ = file_hash
            file_elt.addChild(self._hash.buildHashElt(hash_, hash_algo))
        if date is not None:
            file_elt.addElement("date", utils.xmpp_date(date))
        if thumbnail is not None:
            # TODO: implement thumbnails
            log.warning("thumbnail is not implemented yet")
        return file_elt

    def parse_file_metadata_elt(
        self,
        file_metadata_elt: domish.Element
    ) -> Dict[str, Any]:
        """Parse <file-metadata/> element

        @param file_metadata_elt: <file-metadata/> element
            a parent element can also be used
        @return: file-metadata data. It's a dict whose keys correspond to
            [get_file_metadata_elt] parameters
        @raise exceptions.NotFound: no <file-metadata/> element has been found
        """

        if file_metadata_elt.name != "file-metadata":
            try:
                file_metadata_elt = next(
                    file_metadata_elt.elements(NS_FILE_METADATA, "file-metadata")
                )
            except StopIteration:
                raise exceptions.NotFound
        data: Dict[str, Any] = {}

        for key, type_ in (
            ("name", str),
            ("media-type", str),
            ("desc", str),
            ("size", int),
            ("date", "timestamp"),
            ("width", int),
            ("height", int),
            ("length", int),
        ):
            elt = next(file_metadata_elt.elements(NS_FILE_METADATA, key), None)
            if elt is not None:
                if type_ in (str, int):
                    content = str(elt)
                    if key == "name":
                        # we avoid malformed names or names containing path elements
                        content = Path(content).name
                    elif key == "media-type":
                        key = "media_type"
                    data[key] = type_(content)
                elif type == "timestamp":
                    data[key] = utils.parse_xmpp_date(str(elt))
                else:
                    raise exceptions.InternalError

        try:
            algo, hash_ = self._hash.parseHashElt(file_metadata_elt)
        except exceptions.NotFound:
            pass
        except exceptions.DataError:
            from sat.tools.xml_tools import pFmtElt
            log.warning("invalid <hash/> element:\n{pFmtElt(file_metadata_elt)}")
        else:
            data["file_hash"] = (algo, hash_.decode())

        # TODO: thumbnails

        return data