view sat/plugins/plugin_xep_0447.py @ 3917:626629781a53

plugin XEP-0420: fix exception on missing `from` or `to`: fix exception raised on missing `from` or `to` attribute when the policy doesn't require them. Patch made by Syndace <me@syndace.dev>
author Goffi <goffi@goffi.org>
date Thu, 06 Oct 2022 16:02:00 +0200
parents 4b7106eede0c
children 0ff265725489
line wrap: on
line source

#!/usr/bin/env python3

# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import Optional, Dict, List, Tuple, Union, Any

from twisted.words.xish import domish

from sat.core.constants import Const as C
from sat.core.i18n import _
from sat.core.log import getLogger
from sat.core import exceptions

# Module-level logger, named after this module.
log = getLogger(__name__)


# Plugin metadata: display name, import name, type, supported modes, implemented
# protocol, the plugins this one depends on (both are looked up in
# ``host.plugins`` by ``XEP_0447.__init__``) and the name of the main class.
# NOTE(review): presumably consumed by the host's plugin loader — confirm there.
PLUGIN_INFO = {
    C.PI_NAME: "Stateless File Sharing",
    C.PI_IMPORT_NAME: "XEP-0447",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0447"],
    C.PI_DEPENDENCIES: ["XEP-0103", "XEP-0446"],
    C.PI_MAIN: "XEP_0447",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _("""Implementation of XEP-0447 (Stateless File Sharing)"""),
}

# XML namespace of the <file-sharing/> and <sources/> elements built and parsed
# by this plugin; also registered on the host under the "sfs" key.
NS_SFS = "urn:xmpp:sfs:0"


class XEP_0447:
    """Build and parse XEP-0447 ``<file-sharing/>`` elements

    File metadata handling is delegated to the XEP-0446 plugin and URL sources
    handling to the XEP-0103 plugin.
    """
    namespace = NS_SFS

    def __init__(self, host):
        """Register the namespace and keep references to the plugins used

        @param host: host instance; must expose ``registerNamespace`` and a
            ``plugins`` mapping containing "XEP-0103" and "XEP-0446"
        """
        log.info(_("XEP-0447 (Stateless File Sharing) plugin initialization"))
        host.registerNamespace("sfs", NS_SFS)
        # XEP-0103 plugin: builds/parses <url-data/> source elements
        self._u = host.plugins["XEP-0103"]
        # XEP-0446 plugin: builds/parses the file metadata element
        self._m = host.plugins["XEP-0446"]

    def get_file_sharing_elt(
        self,
        sources: List[Dict[str, Any]],
        disposition: Optional[str] = None,
        name: Optional[str] = None,
        media_type: Optional[str] = None,
        desc: Optional[str] = None,
        size: Optional[int] = None,
        file_hash: Optional[Tuple[str, str]] = None,
        date: Optional[Union[float, int]] = None,
        width: Optional[int] = None,
        height: Optional[int] = None,
        length: Optional[int] = None,
        thumbnail: Optional[str] = None,
        **kwargs,
    ) -> domish.Element:
        """Generate the <file-sharing/> element

        @param sources: sources where the file can be retrieved. Each item is a dict
            which must contain an "url" key; the whole dict is forwarded as keyword
            arguments to the XEP-0103 plugin's ``get_url_data_elt``
        @param disposition: if not None, set as the ``disposition`` attribute of
            the <file-sharing/> element
        @param name: forwarded to the XEP-0446 plugin's ``get_file_metadata_elt``
        @param media_type: forwarded to ``get_file_metadata_elt``
        @param desc: forwarded to ``get_file_metadata_elt``
        @param size: forwarded to ``get_file_metadata_elt``
        @param file_hash: forwarded to ``get_file_metadata_elt``
        @param date: forwarded to ``get_file_metadata_elt``
        @param width: forwarded to ``get_file_metadata_elt``
        @param height: forwarded to ``get_file_metadata_elt``
        @param length: forwarded to ``get_file_metadata_elt``
        @param thumbnail: forwarded to ``get_file_metadata_elt``
        @param kwargs: extra keyword arguments are accepted and ignored
        @return: ``<file-sharing/>`` element
        @raise NotImplementedError: a source has no "url" key
        """
        file_sharing_elt = domish.Element((NS_SFS, "file-sharing"))
        if disposition is not None:
            file_sharing_elt["disposition"] = disposition
        file_sharing_elt.addChild(
            self._m.get_file_metadata_elt(
                name=name,
                media_type=media_type,
                desc=desc,
                size=size,
                file_hash=file_hash,
                date=date,
                width=width,
                height=height,
                length=length,
                thumbnail=thumbnail,
            )
        )
        sources_elt = file_sharing_elt.addElement("sources")
        for source_data in sources:
            # only URL based sources are handled for now
            if "url" not in source_data:
                raise NotImplementedError(
                    f"source data not implemented: {source_data}"
                )
            sources_elt.addChild(self._u.get_url_data_elt(**source_data))

        return file_sharing_elt

    def parse_file_sharing_elt(
        self,
        file_sharing_elt: domish.Element
    ) -> Dict[str, Any]:
        """Parse <file-sharing/> element and return file-sharing data

        @param file_sharing_elt: <file-sharing/> element, or an element having a
            <file-sharing/> child (the first matching child is then used)
        @return: file-sharing data. It is a dict whose keys correspond to
            [get_file_sharing_elt] parameters. The "sources" key is always set,
            "disposition" is only set when the attribute is present
        @raise exceptions.NotFound: the given element is not named "file-sharing" and
            has no <file-sharing/> child
        @raise ValueError: the <sources/> element is missing
        """
        if file_sharing_elt.name != "file-sharing":
            try:
                file_sharing_elt = next(
                    file_sharing_elt.elements(NS_SFS, "file-sharing")
                )
            except StopIteration:
                raise exceptions.NotFound
        try:
            data = self._m.parse_file_metadata_elt(file_sharing_elt)
        except exceptions.NotFound:
            # metadata can't be found, we start with an empty mapping
            data = {}
        disposition = file_sharing_elt.getAttribute("disposition")
        if disposition is not None:
            data["disposition"] = disposition
        sources = data["sources"] = []
        try:
            sources_elt = next(file_sharing_elt.elements(NS_SFS, "sources"))
        except StopIteration:
            raise ValueError(f"<sources/> element is missing: {file_sharing_elt.toXml()}")
        for elt in sources_elt.elements():
            if elt.name == "url-data" and elt.uri == self._u.namespace:
                sources.append(self._u.parse_url_data_elt(elt))
            else:
                # ``toXml`` must be called, otherwise the bound method's repr is
                # logged instead of the element's XML
                log.warning(f"unmanaged file sharing element: {elt.toXml()}")

        return data