diff libervia/backend/plugins/plugin_xep_0363.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_xep_0363.py@524856bd7b19
children 4a8b29ab34c0
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/plugins/plugin_xep_0363.py	Fri Jun 02 11:49:51 2023 +0200
@@ -0,0 +1,451 @@
+#!/usr/bin/env python3
+
+# SàT plugin for HTTP File Upload (XEP-0363)
+# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from dataclasses import dataclass
+import mimetypes
+import os.path
+from pathlib import Path
+from typing import Callable, NamedTuple, Optional, Tuple
+from urllib import parse
+
+from twisted.internet import reactor
+from twisted.internet import defer
+from twisted.web import client as http_client
+from twisted.web import http_headers
+from twisted.words.protocols.jabber import error, jid, xmlstream
+from twisted.words.xish import domish
+from wokkel import disco, iwokkel
+from zope.interface import implementer
+
+from libervia.backend.core import exceptions
+from libervia.backend.core.constants import Const as C
+from libervia.backend.core.core_types import SatXMPPEntity
+from libervia.backend.core.i18n import _
+from libervia.backend.core.log import getLogger
+from libervia.backend.core.xmpp import SatXMPPComponent
+from libervia.backend.tools import utils, web as sat_web
+
+
+log = getLogger(__name__)
+
+PLUGIN_INFO = {
+    C.PI_NAME: "HTTP File Upload",
+    C.PI_IMPORT_NAME: "XEP-0363",
+    C.PI_TYPE: "XEP",
+    C.PI_MODES: C.PLUG_MODE_BOTH,
+    C.PI_PROTOCOLS: ["XEP-0363"],
+    C.PI_DEPENDENCIES: ["FILE", "UPLOAD"],
+    C.PI_MAIN: "XEP_0363",
+    C.PI_HANDLER: "yes",
+    C.PI_DESCRIPTION: _("""Implementation of HTTP File Upload"""),
+}
+
+NS_HTTP_UPLOAD = "urn:xmpp:http:upload:0"
+IQ_HTTP_UPLOAD_REQUEST = C.IQ_GET + '/request[@xmlns="' + NS_HTTP_UPLOAD + '"]'
+ALLOWED_HEADERS = ('authorization', 'cookie', 'expires')
+
+
+@dataclass
+class Slot:
+    """Upload slot"""
+    put: str
+    get: str
+    headers: list
+
+
+class UploadRequest(NamedTuple):
+    from_: jid.JID
+    filename: str
+    size: int
+    content_type: Optional[str]
+
+
+class RequestHandler(NamedTuple):
+    callback: Callable[[SatXMPPComponent, UploadRequest], Optional[Slot]]
+    priority: int
+
+
+class XEP_0363:
+    Slot=Slot
+
+    def __init__(self, host):
+        log.info(_("plugin HTTP File Upload initialization"))
+        self.host = host
+        host.bridge.add_method(
+            "file_http_upload",
+            ".plugin",
+            in_sign="sssbs",
+            out_sign="",
+            method=self._file_http_upload,
+        )
+        host.bridge.add_method(
+            "file_http_upload_get_slot",
+            ".plugin",
+            in_sign="sisss",
+            out_sign="(ssaa{ss})",
+            method=self._get_slot,
+            async_=True,
+        )
+        host.plugins["UPLOAD"].register(
+            "HTTP Upload", self.get_http_upload_entity, self.file_http_upload
+        )
+        # list of callbacks used when a request is done to a component
+        self.handlers = []
+        # XXX: there is not yet official short name, so we use "http_upload"
+        host.register_namespace("http_upload", NS_HTTP_UPLOAD)
+
+    def get_handler(self, client):
+        return XEP_0363_handler(self)
+
+    def register_handler(self, callback, priority=0):
+        """Register a request handler
+
+        @param callack: method to call when a request is done
+            the callback must return a Slot if the request is handled,
+            otherwise, other callbacks will be tried.
+            If the callback raises a StanzaError, its condition will be used if no other
+            callback can handle the request.
+        @param priority: handlers with higher priorities will be called first
+        """
+        assert callback not in self.handlers
+        req_handler = RequestHandler(callback, priority)
+        self.handlers.append(req_handler)
+        self.handlers.sort(key=lambda handler: handler.priority, reverse=True)
+
+    def get_file_too_large_elt(self, max_size: int) -> domish.Element:
+        """Generate <file-too-large> app condition for errors"""
+        file_too_large_elt = domish.Element((NS_HTTP_UPLOAD, "file-too-large"))
+        file_too_large_elt.addElement("max-file-size", str(max_size))
+        return file_too_large_elt
+
+    async def get_http_upload_entity(self, client, upload_jid=None):
+        """Get HTTP upload capable entity
+
+         upload_jid is checked, then its components
+         @param upload_jid(None, jid.JID): entity to check
+         @return(D(jid.JID)): first HTTP upload capable entity
+         @raise exceptions.NotFound: no entity found
+         """
+        try:
+            entity = client.http_upload_service
+        except AttributeError:
+            found_entities = await self.host.find_features_set(client, (NS_HTTP_UPLOAD,))
+            try:
+                entity = client.http_upload_service = next(iter(found_entities))
+            except StopIteration:
+                entity = client.http_upload_service = None
+
+        if entity is None:
+            raise exceptions.NotFound("No HTTP upload entity found")
+
+        return entity
+
+    def _file_http_upload(self, filepath, filename="", upload_jid="",
+                        ignore_tls_errors=False, profile=C.PROF_KEY_NONE):
+        assert os.path.isabs(filepath) and os.path.isfile(filepath)
+        client = self.host.get_client(profile)
+        return defer.ensureDeferred(self.file_http_upload(
+            client,
+            filepath,
+            filename or None,
+            jid.JID(upload_jid) if upload_jid else None,
+            {"ignore_tls_errors": ignore_tls_errors},
+        ))
+
+    async def file_http_upload(
+        self,
+        client: SatXMPPEntity,
+        filepath: Path,
+        filename: Optional[str] = None,
+        upload_jid: Optional[jid.JID] = None,
+        extra: Optional[dict] = None
+    ) -> Tuple[str, defer.Deferred]:
+        """Upload a file through HTTP
+
+        @param filepath: absolute path of the file
+        @param filename: name to use for the upload
+            None to use basename of the path
+        @param upload_jid: upload capable entity jid,
+            or None to use autodetected, if possible
+        @param extra: options where key can be:
+            - ignore_tls_errors(bool): if True, SSL certificate will not be checked
+            - attachment(dict): file attachment data
+        @param profile: %(doc_profile)s
+        @return: progress id and Deferred which fire download URL
+        """
+        if extra is None:
+            extra = {}
+        ignore_tls_errors = extra.get("ignore_tls_errors", False)
+        file_metadata = {
+            "filename": filename or os.path.basename(filepath),
+            "filepath": filepath,
+            "size": os.path.getsize(filepath),
+        }
+
+        #: this trigger can be used to modify the filename or size requested when geting
+        #: the slot, it is notably useful with encryption.
+        self.host.trigger.point(
+            "XEP-0363_upload_pre_slot", client, extra, file_metadata,
+            triggers_no_cancel=True
+        )
+        try:
+            slot = await self.get_slot(
+                client, file_metadata["filename"], file_metadata["size"],
+                upload_jid=upload_jid
+            )
+        except Exception as e:
+            log.warning(_("Can't get upload slot: {reason}").format(reason=e))
+            raise e
+        else:
+            log.debug(f"Got upload slot: {slot}")
+            sat_file = self.host.plugins["FILE"].File(
+                self.host, client, filepath, uid=extra.get("progress_id"),
+                size=file_metadata["size"],
+                auto_end_signals=False
+            )
+            progress_id = sat_file.uid
+
+            file_producer = http_client.FileBodyProducer(sat_file)
+
+            if ignore_tls_errors:
+                agent = http_client.Agent(reactor, sat_web.NoCheckContextFactory())
+            else:
+                agent = http_client.Agent(reactor)
+
+            headers = {"User-Agent": [C.APP_NAME.encode("utf-8")]}
+
+            for name, value in slot.headers:
+                name = name.encode('utf-8')
+                value = value.encode('utf-8')
+                headers[name] = value
+
+
+            await self.host.trigger.async_point(
+                "XEP-0363_upload", client, extra, sat_file, file_producer, slot,
+                triggers_no_cancel=True)
+
+            download_d = agent.request(
+                b"PUT",
+                slot.put.encode("utf-8"),
+                http_headers.Headers(headers),
+                file_producer,
+            )
+            download_d.addCallbacks(
+                self._upload_cb,
+                self._upload_eb,
+                (sat_file, slot),
+                None,
+                (sat_file,),
+            )
+
+            return progress_id, download_d
+
+    def _upload_cb(self, __, sat_file, slot):
+        """Called once file is successfully uploaded
+
+        @param sat_file(SatFile): file used for the upload
+            should be closed, but it is needed to send the progress_finished signal
+        @param slot(Slot): put/get urls
+        """
+        log.info(f"HTTP upload finished ({slot.get})")
+        sat_file.progress_finished({"url": slot.get})
+        return slot.get
+
+    def _upload_eb(self, failure_, sat_file):
+        """Called on unsuccessful upload
+
+        @param sat_file(SatFile): file used for the upload
+            should be closed, be is needed to send the progress_error signal
+        """
+        try:
+            wrapped_fail = failure_.value.reasons[0]
+        except (AttributeError, IndexError) as e:
+            log.warning(_("upload failed: {reason}").format(reason=e))
+            sat_file.progress_error(str(failure_))
+        else:
+            if wrapped_fail.check(sat_web.SSLError):
+                msg = "TLS validation error, can't connect to HTTPS server"
+            else:
+                msg = "can't upload file"
+            log.warning(msg + ": " + str(wrapped_fail.value))
+            sat_file.progress_error(msg)
+        raise failure_
+
+    def _get_slot(self, filename, size, content_type, upload_jid,
+                 profile_key=C.PROF_KEY_NONE):
+        """Get an upload slot
+
+        This method can be used when uploading is done by the frontend
+        @param filename(unicode): name of the file to upload
+        @param size(int): size of the file (must be non null)
+        @param upload_jid(str, ''): HTTP upload capable entity
+        @param content_type(unicode, None): MIME type of the content
+            empty string or None to guess automatically
+        """
+        client = self.host.get_client(profile_key)
+        filename = filename.replace("/", "_")
+        d = defer.ensureDeferred(self.get_slot(
+            client, filename, size, content_type or None, jid.JID(upload_jid) or None
+        ))
+        d.addCallback(lambda slot: (slot.get, slot.put, slot.headers))
+        return d
+
+    async def get_slot(self, client, filename, size, content_type=None, upload_jid=None):
+        """Get a slot (i.e. download/upload links)
+
+        @param filename(unicode): name to use for the upload
+        @param size(int): size of the file to upload (must be >0)
+        @param content_type(None, unicode): MIME type of the content
+            None to autodetect
+        @param upload_jid(jid.JID, None): HTTP upload capable upload_jid
+            or None to use the server component (if any)
+        @param client: %(doc_client)s
+        @return (Slot): the upload (put) and download (get) URLs
+        @raise exceptions.NotFound: no HTTP upload capable upload_jid has been found
+        """
+        assert filename and size
+        if content_type is None:
+            # TODO: manage python magic for file guessing (in a dedicated plugin ?)
+            content_type = mimetypes.guess_type(filename, strict=False)[0]
+
+        if upload_jid is None:
+            try:
+                upload_jid = client.http_upload_service
+            except AttributeError:
+                found_entity = await self.get_http_upload_entity(client)
+                return await self.get_slot(
+                    client, filename, size, content_type, found_entity)
+            else:
+                if upload_jid is None:
+                    raise exceptions.NotFound("No HTTP upload entity found")
+
+        iq_elt = client.IQ("get")
+        iq_elt["to"] = upload_jid.full()
+        request_elt = iq_elt.addElement((NS_HTTP_UPLOAD, "request"))
+        request_elt["filename"] = filename
+        request_elt["size"] = str(size)
+        if content_type is not None:
+            request_elt["content-type"] = content_type
+
+        iq_result_elt = await iq_elt.send()
+
+        try:
+            slot_elt = next(iq_result_elt.elements(NS_HTTP_UPLOAD, "slot"))
+            put_elt = next(slot_elt.elements(NS_HTTP_UPLOAD, "put"))
+            put_url = put_elt['url']
+            get_elt = next(slot_elt.elements(NS_HTTP_UPLOAD, "get"))
+            get_url = get_elt['url']
+        except (StopIteration, KeyError):
+            raise exceptions.DataError("Incorrect stanza received from server")
+
+        headers = []
+        for header_elt in put_elt.elements(NS_HTTP_UPLOAD, "header"):
+            try:
+                name = header_elt["name"]
+                value = str(header_elt)
+            except KeyError:
+                log.warning(_("Invalid header element: {xml}").format(
+                    iq_result_elt.toXml()))
+                continue
+            name = name.replace('\n', '')
+            value = value.replace('\n', '')
+            if name.lower() not in ALLOWED_HEADERS:
+                log.warning(_('Ignoring unauthorised header "{name}": {xml}')
+                    .format(name=name, xml = iq_result_elt.toXml()))
+                continue
+            headers.append((name, value))
+
+        return Slot(put=put_url, get=get_url, headers=headers)
+
+    # component
+
+    def on_component_request(self, iq_elt, client):
+        iq_elt.handled=True
+        defer.ensureDeferred(self.handle_component_request(client, iq_elt))
+
+    async def handle_component_request(self, client, iq_elt):
+        try:
+            request_elt = next(iq_elt.elements(NS_HTTP_UPLOAD, "request"))
+            request = UploadRequest(
+                from_=jid.JID(iq_elt['from']),
+                filename=parse.quote(request_elt['filename'].replace('/', '_'), safe=''),
+                size=int(request_elt['size']),
+                content_type=request_elt.getAttribute('content-type')
+            )
+        except (StopIteration, KeyError, ValueError):
+            client.sendError(iq_elt, "bad-request")
+            return
+
+        err = None
+
+        for handler in self.handlers:
+            try:
+                slot = await utils.as_deferred(handler.callback, client, request)
+            except error.StanzaError as e:
+                log.warning(
+                    "a stanza error has been raised while processing HTTP Upload of "
+                    f"request: {e}"
+                )
+                if err is None:
+                    # we keep the first error to return its condition later,
+                    # if no other callback handle the request
+                    err = e
+            else:
+                if slot:
+                    break
+        else:
+            log.warning(
+                _("no service can handle HTTP Upload request: {elt}")
+                .format(elt=iq_elt.toXml()))
+            if err is None:
+                err = error.StanzaError("feature-not-implemented")
+            client.send(err.toResponse(iq_elt))
+            return
+
+        iq_result_elt = xmlstream.toResponse(iq_elt, "result")
+        slot_elt = iq_result_elt.addElement((NS_HTTP_UPLOAD, 'slot'))
+        put_elt = slot_elt.addElement('put')
+        put_elt['url'] = slot.put
+        get_elt = slot_elt.addElement('get')
+        get_elt['url'] = slot.get
+        client.send(iq_result_elt)
+
+
+@implementer(iwokkel.IDisco)
+class XEP_0363_handler(xmlstream.XMPPHandler):
+
+    def __init__(self, plugin_parent):
+        self.plugin_parent = plugin_parent
+
+    def connectionInitialized(self):
+        if ((self.parent.is_component
+             and PLUGIN_INFO[C.PI_IMPORT_NAME] in self.parent.enabled_features)):
+            self.xmlstream.addObserver(
+                IQ_HTTP_UPLOAD_REQUEST, self.plugin_parent.on_component_request,
+                client=self.parent
+            )
+
+    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
+        if ((self.parent.is_component
+             and not PLUGIN_INFO[C.PI_IMPORT_NAME] in self.parent.enabled_features)):
+            return []
+        else:
+            return [disco.DiscoFeature(NS_HTTP_UPLOAD)]
+
+    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
+        return []