changeset 3289:9057713ab124

plugin comp file sharing: files can now be uploaded/downloaded via HTTP: plugin XEP-0363 can now be used by components, and file sharing uses it. The new `public_id` file metadata is used to serve files. Files uploaded are put in the `/uploads` path.
author Goffi <goffi@goffi.org>
date Fri, 29 May 2020 21:55:45 +0200
parents 780fb8dd07ef
children 3ff952c042ae
files sat/plugins/plugin_comp_file_sharing.py sat/plugins/plugin_misc_upload.py sat/plugins/plugin_xep_0363.py
diffstat 3 files changed, 409 insertions(+), 30 deletions(-) [+]
line wrap: on
line diff
--- a/sat/plugins/plugin_comp_file_sharing.py	Fri May 29 21:50:49 2020 +0200
+++ b/sat/plugins/plugin_comp_file_sharing.py	Fri May 29 21:55:45 2020 +0200
@@ -16,21 +16,29 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
+import os
+import os.path
+import mimetypes
+from functools import partial
+import shortuuid
+import unicodedata
+from urllib.parse import urljoin, urlparse, quote, unquote
+from dataclasses import dataclass
+from pathlib import Path
 from sat.core.i18n import _
 from sat.core.constants import Const as C
 from sat.core import exceptions
 from sat.core.log import getLogger
 from sat.tools.common import regex
 from sat.tools.common import uri
+from sat.tools.common import files_utils
+from sat.tools.common import tls
 from sat.tools import stream
-from twisted.internet import defer
+from twisted.internet import defer, reactor
 from twisted.words.protocols.jabber import error
+from twisted.web import server, resource, static
 from wokkel import pubsub
 from wokkel import generic
-from functools import partial
-import os
-import os.path
-import mimetypes
 
 
 log = getLogger(__name__)
@@ -51,6 +59,7 @@
         "XEP-0261",
         "XEP-0264",
         "XEP-0329",
+        "XEP-0363",
     ],
     C.PI_RECOMMENDATIONS: [],
     C.PI_MAIN: "FileSharing",
@@ -61,9 +70,228 @@
 HASH_ALGO = "sha-256"
 NS_COMMENTS = "org.salut-a-toi.comments"
 COMMENT_NODE_PREFIX = "org.salut-a-toi.file_comments/"
+# Directory used to buffer request bodies (i.e. the file in case of PUT). We use more than
+# one @ there, to be sure that it's not conflicting with a JID
+TMP_BUFFER_DIR = "@@tmp@@"
+
+server.version = unicodedata.normalize(
+    'NFKD',
+    f"{C.APP_NAME} file sharing {C.APP_VERSION}"
+).encode('ascii','ignore')  # ASCII-only bytes: characters which can't be encoded are dropped
 
 
-class FileSharing(object):
+class HTTPFileServer(resource.Resource):
+    isLeaf = True  # no child resource: the whole path is handled by this resource
+
+    def errorPage(self, request, code):  # set the status code, log it, return the page body
+        request.setResponseCode(code)
+        if code == 400:
+            brief = 'Bad Request'
+            details = "Your request is invalid"
+        elif code == 403:
+            brief = 'Forbidden'
+            details = "You're not allowed to use this resource"
+        elif code == 404:
+            brief = 'Not Found'
+            details = "No resource found at this URL"
+        else:
+            brief = 'Error'
+            details = "This resource can't be used"
+            log.error(f"Unexpected return code used: {code}")
+        log.warning(
+            f'Error returned while trying to access url {request.uri.decode()}: '
+            f'"{brief}" ({code}): {details}'
+        )
+
+        return resource.ErrorPage(code, brief, details).render(request)
+
+    def getDispositionType(self, media_type, media_subtype):  # "inline" or "attachment"
+        if media_type in ('image', 'video'):
+            return 'inline'
+        elif media_type == 'application' and media_subtype == 'pdf':
+            return 'inline'
+        else:
+            return 'attachment'
+
+    def render_GET(self, request):
+        try:
+            request.upload_data  # parse the path now, so an invalid URL gets a 404
+        except exceptions.DataError:
+            return self.errorPage(request, 404)
+
+        defer.ensureDeferred(self.renderGet(request))
+        return server.NOT_DONE_YET
+
+    async def renderGet(self, request):  # serve the file matching the public id of the URL
+        try:
+            upload_id, filename = request.upload_data
+        except exceptions.DataError:
+            request.write(self.errorPage(request, 403))
+            request.finish()
+            return
+        found_files = await request.file_sharing.host.memory.getFiles(
+            client=None, peer_jid=None, perms_to_check=None, public_id=upload_id)
+        if not found_files:
+            request.write(self.errorPage(request, 404))
+            request.finish()
+            return
+        if len(found_files) > 1:
+            log.error(f"more that one files found for public id {upload_id!r}")
+
+        found_file = found_files[0]
+        file_path = request.file_sharing.files_path/found_file['file_hash']
+        file_res = static.File(file_path)
+        file_res.type = f'{found_file["media_type"]}/{found_file["media_subtype"]}'
+        file_res.encoding = file_res.contentEncodings.get(Path(found_file['name']).suffix)
+        disp_type = self.getDispositionType(
+            found_file['media_type'], found_file['media_subtype'])
+        # the URL is percent encoded, and not all browsers/tools unquote the file name,
+        # thus we add a content disposition header
+        request.setHeader(
+            'Content-Disposition',
+            f"{disp_type}; filename*=UTF-8''{quote(found_file['name'])}"
+        )
+        ret = file_res.render(request)
+        if ret != server.NOT_DONE_YET:
+            # HEAD returns the result directly (while GET uses a producer)
+            request.write(ret)
+            request.finish()
+
+    def render_PUT(self, request):
+        defer.ensureDeferred(self.renderPut(request))
+        return server.NOT_DONE_YET
+
+    async def renderPut(self, request):  # register the buffered body as an uploaded file
+        try:
+            client, upload_request = request.upload_request_data
+            upload_id, filename = request.upload_data
+        except AttributeError:  # upload_request_data is only set once gotLength accepted the PUT
+            request.write(self.errorPage(request, 400))
+            request.finish()
+            return
+
+        # at this point request is checked and file is buffered, we can store it
+        # we close the content here, before registering the file
+        request.content.close()
+        tmp_file_path = Path(request.content.name)
+        request.content = None
+
+        file_data = {
+            "name": unquote(upload_request.filename),
+            "mime_type": upload_request.content_type,
+            "size": upload_request.size,
+            "path": "/uploads"
+        }
+
+        await request.file_sharing.registerReceivedFile(
+            client, upload_request.from_, file_data, tmp_file_path,
+            public_id=upload_id,
+        )
+
+        request.setResponseCode(201)
+        request.finish()
+
+
+class FileSharingRequest(server.Request):
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self._upload_data = None
+
+    @property
+    def upload_data(self):
+        """A tuple with upload_id and filename retrieved from the requested path"""
+        if self._upload_data is not None:
+            return self._upload_data
+
+        # self.path is not available if we are early in the request (e.g. when gotLength
+        # is called), in which case channel._path must be used. On the other hand, when
+        # render_[VERB] is called, only self.path is available
+        path = self.channel._path if self.path is None else self.path
+        path = urlparse(path.decode()).path  # we normalise the path
+        try:
+            __, upload_id, filename = path.split('/')
+        except ValueError:
+            raise exceptions.DataError("no enought path elements")
+        if len(upload_id) < 10:
+            raise exceptions.DataError(f"invalid upload ID received for a PUT: {upload_id!r}")
+
+        self._upload_data = (upload_id, filename)
+        return self._upload_data
+
+    @property
+    def file_sharing(self):
+        return self.channel.site.file_sharing
+
+    @property
+    def file_tmp_dir(self):
+        return self.channel.site.file_tmp_dir
+
+    def refuseRequest(self):
+        """Send the body to os.devnull, then answer "bad request" and disconnect"""
+        if self.content is not None:
+            self.content.close()
+        self.content = open(os.devnull, 'w+b')
+        self.channel._respondToBadRequestAndDisconnect()
+
+    def gotLength(self, length):
+        """Choose where the body is buffered; a PUT is validated before any buffering"""
+        if self.channel._command.decode().upper() == 'PUT':
+            # for PUT we check early if upload_id is fine, to avoid buffering a file we'll refuse
+            # we buffer the file in component's TMP_BUFFER_DIR, so we just have to rename it at the end
+            try:
+                upload_id, filename = self.upload_data
+            except exceptions.DataError as e:
+                log.warning(f"Invalid PUT request, we stop here: {e}")
+                return self.refuseRequest()
+            try:
+                client, upload_request, timer = self.file_sharing.expected_uploads.pop(upload_id)
+            except KeyError:
+                log.warning(f"unknown (expired?) upload ID received for a PUT: {upload_id!r}")
+                return self.refuseRequest()
+
+            if not timer.active():  # active is a method: without the call the test is always False
+                log.warning(f"upload id {upload_id!r} used for a PUT, but it is expired")
+                return self.refuseRequest()
+
+            timer.cancel()
+
+            if upload_request.filename != filename:
+                log.warning(
+                    f"invalid filename for PUT (upload id: {upload_id!r}, URL: {self.channel._path.decode()}). Original "
+                    f"{upload_request.filename!r} doesn't match {filename!r}"
+                )
+                return self.refuseRequest()
+
+            self.upload_request_data = (client, upload_request)
+
+            file_tmp_path = files_utils.get_unique_name(self.file_tmp_dir/upload_id)
+
+            self.content = open(file_tmp_path, 'w+b')
+        else:
+            return super().gotLength(length)
+
+
+class FileSharingSite(server.Site):
+    requestFactory = FileSharingRequest  # requests giving access to upload_data and file_sharing
+
+    def __init__(self, file_sharing):
+        self.file_sharing = file_sharing
+        self.file_tmp_dir = file_sharing.host.getLocalPath(
+            None, C.FILES_TMP_DIR, TMP_BUFFER_DIR, component=True, profile=False
+        )
+        for old_file in self.file_tmp_dir.iterdir():  # buffers still there when the site is created
+            log.debug(f"purging old buffer file at {old_file}")
+            old_file.unlink()
+        super().__init__(HTTPFileServer())
+
+    def getContentFile(self, length):  # new uniquely named buffer file in file_tmp_dir
+        file_tmp_path = self.file_tmp_dir/shortuuid.uuid()
+        return open(file_tmp_path, 'w+b')  # NOTE(review): nothing visible here removes it after the request — confirm
+
+
+class FileSharing:
+
     def __init__(self, host):
         log.info(_("File Sharing initialization"))
         self.host = host
@@ -71,6 +299,7 @@
         self._jf = host.plugins["XEP-0234"]
         self._h = host.plugins["XEP-0300"]
         self._t = host.plugins["XEP-0264"]
+        host.plugins["XEP-0363"].registerHandler(self._onHTTPUpload)
         host.trigger.add("FILE_getDestDir", self._getDestDirTrigger)
         host.trigger.add(
             "XEP-0234_fileSendingRequest", self._fileSendingRequestTrigger, priority=1000
@@ -79,11 +308,37 @@
         host.trigger.add("XEP-0234_parseFileElement", self._getFileComments)
         host.trigger.add("XEP-0329_compGetFilesFromNode", self._addCommentsData)
         self.files_path = host.getLocalPath(None, C.FILES_DIR, profile=False)
+        self.http_port = host.memory.getConfig(
+            'component file_sharing', 'http_upload_port', 8888)
+        connection_type = host.memory.getConfig(
+            'component file_sharing', 'http_upload_connection_type', 'https')
+        if connection_type not in ('http', 'https'):
+            raise exceptions.ConfigError(
+                f'bad http_upload_connection_type, you must use one of "http" or "https"'
+            )
+        self.server = FileSharingSite(self)
+        self.expected_uploads = {}
+        if connection_type == 'http':
+            reactor.listenTCP(self.http_port, self.server)
+        else:
+            options = tls.getOptionsFromConfig(host.memory.config, "component file_sharing")
+            tls.TLSOptionsCheck(options)
+            context_factory = tls.getTLSContextFactory(options)
+            reactor.listenSSL(self.http_port, self.server, context_factory)
+
 
     def getHandler(self, client):
         return Comments_handler(self)
 
-    def profileConnected(self, client):
+    def profileConnecting(self, client):
+        public_base_url = self.host.memory.getConfig(
+            'component file_sharing', 'http_upload_public_facing_url')
+        if public_base_url is None:
+            client._file_sharing_base_url = f"https://{client.host}:{self.http_port}"
+        else:
+            client._file_sharing_base_url = public_base_url
+        client._file_sharing_allowed_hosts = self.host.memory.getConfig(
+            'component file_sharing', 'http_upload_allowed_hosts_list') or [client.host]
         path = client.file_tmp_dir = os.path.join(
             self.host.memory.getConfig("", "local_dir"),
             C.FILES_TMP_DIR,
@@ -92,34 +347,35 @@
         if not os.path.exists(path):
             os.makedirs(path)
 
-    @defer.inlineCallbacks
-    def _fileTransferedCb(self, __, client, peer_jid, file_data, file_path):
-        """post file reception tasks
+    async def registerReceivedFile(
+            self, client, peer_jid, file_data, file_path, public_id=None, extra=None):
+        """Post file reception tasks
 
-        on file is received, this method create hash/thumbnails if necessary
+        once file is received, this method create hash/thumbnails if necessary
         move the file to the right location, and create metadata entry in database
         """
         name = file_data["name"]
-        extra = {}
+        if extra is None:
+            extra = {}
 
-        if file_data["hash_algo"] == HASH_ALGO:
+        if file_data.get("hash_algo") == HASH_ALGO:
             log.debug(_("Reusing already generated hash"))
             file_hash = file_data["hash_hasher"].hexdigest()
         else:
             hasher = self._h.getHasher(HASH_ALGO)
-            with open("file_path") as f:
-                file_hash = yield self._h.calculateHash(f, hasher)
-        final_path = os.path.join(self.files_path, file_hash)
+            with file_path.open('rb') as f:
+                file_hash = await self._h.calculateHash(f, hasher)
+        final_path = self.files_path/file_hash
 
-        if os.path.isfile(final_path):
+        if final_path.is_file():
             log.debug(
                 "file [{file_hash}] already exists, we can remove temporary one".format(
                     file_hash=file_hash
                 )
             )
-            os.unlink(file_path)
+            file_path.unlink()
         else:
-            os.rename(file_path, final_path)
+            file_path.rename(final_path)
             log.debug(
                 "file [{file_hash}] moved to {files_path}".format(
                     file_hash=file_hash, files_path=self.files_path
@@ -134,7 +390,7 @@
             thumbnails = extra.setdefault(C.KEY_THUMBNAILS, [])
             for max_thumb_size in (self._t.SIZE_SMALL, self._t.SIZE_MEDIUM):
                 try:
-                    thumb_size, thumb_id = yield self._t.generateThumbnail(
+                    thumb_size, thumb_id = await self._t.generateThumbnail(
                         final_path,
                         max_thumb_size,
                         #  we keep thumbnails for 6 months
@@ -155,6 +411,7 @@
             path=file_data.get("path"),
             namespace=file_data.get("namespace"),
             mime_type=mime_type,
+            public_id=public_id,
             owner=peer_jid,
             extra=extra,
         )
@@ -171,14 +428,15 @@
         filename = file_data["name"]
         assert filename and not "/" in filename
         file_tmp_dir = self.host.getLocalPath(
-            client, C.FILES_TMP_DIR, peer_jid.userhost(), component=True, profile=False
+            None, C.FILES_TMP_DIR, peer_jid.userhost(), component=True, profile=False
         )
-        file_tmp_path = file_data["file_path"] = os.path.join(
-            file_tmp_dir, file_data["name"]
-        )
+        file_tmp_path = file_data['file_path'] = files_utils.get_unique_name(
+            file_tmp_dir/filename)
 
         transfer_data["finished_d"].addCallback(
-            self._fileTransferedCb, client, peer_jid, file_data, file_tmp_path
+            lambda __: defer.ensureDeferred(
+                self.registerReceivedFile(client, peer_jid, file_data, file_tmp_path)
+            )
         )
 
         self._f.openFileWrite(
@@ -257,6 +515,32 @@
                 ),
             )
 
+    ## HTTP Upload ##
+
+    def _purge_slot(self, upload_id):  # timer callback: forget a slot not used in time
+        try:
+            del self.expected_uploads[upload_id]
+        except KeyError:
+            log.error(f"trying to purge an inexisting upload slot ({upload_id})")
+
+    def _onHTTPUpload(self, client, request):
+        """Reserve an upload slot, and return it with the same URL for PUT and GET
+
+        @raise error.StanzaError: the requester's host is not an allowed one
+        """
+        # filename should be already cleaned, but it's better to double check
+        assert '/' not in request.filename
+        if request.from_.host not in client._file_sharing_allowed_hosts:
+            raise error.StanzaError("forbidden")
+
+        upload_id = shortuuid.ShortUUID().random(length=30)
+        assert '/' not in upload_id
+        timer = reactor.callLater(30, self._purge_slot, upload_id)
+        self.expected_uploads[upload_id] = (client, request, timer)
+        base_url = client._file_sharing_base_url.rstrip('/') + '/'  # else urljoin drops the last path segment
+        url = urljoin(base_url, f"{upload_id}/{request.filename}")
+        return self.host.plugins["XEP-0363"].Slot(put=url, get=url, headers=[])
+
     ## comments triggers ##
 
     def _addFileComments(self, file_elt, extra_args):
--- a/sat/plugins/plugin_misc_upload.py	Fri May 29 21:50:49 2020 +0200
+++ b/sat/plugins/plugin_misc_upload.py	Fri May 29 21:55:45 2020 +0200
@@ -35,6 +35,7 @@
     C.PI_NAME: "File Upload",
     C.PI_IMPORT_NAME: "UPLOAD",
     C.PI_TYPE: C.PLUG_TYPE_MISC,
+    C.PI_MODES: C.PLUG_MODE_BOTH,
     C.PI_MAIN: "UploadPlugin",
     C.PI_HANDLER: "no",
     C.PI_DESCRIPTION: _("""File upload management"""),
--- a/sat/plugins/plugin_xep_0363.py	Fri May 29 21:50:49 2020 +0200
+++ b/sat/plugins/plugin_xep_0363.py	Fri May 29 21:55:45 2020 +0200
@@ -18,16 +18,19 @@
 
 import os.path
 import mimetypes
+from typing import NamedTuple, Callable, Optional
 from dataclasses import dataclass
+from urllib import parse
 from wokkel import disco, iwokkel
 from zope.interface import implementer
-from twisted.words.protocols.jabber import jid
-from twisted.words.protocols.jabber.xmlstream import XMPPHandler
+from twisted.words.protocols.jabber import jid, xmlstream, error
+from twisted.words.xish import domish
 from twisted.internet import reactor
 from twisted.internet import defer
 from twisted.web import client as http_client
 from twisted.web import http_headers
 from sat.core.i18n import _
+from sat.core.xmpp import SatXMPPComponent
 from sat.core.constants import Const as C
 from sat.core.log import getLogger
 from sat.core import exceptions
@@ -40,6 +43,7 @@
     C.PI_NAME: "HTTP File Upload",
     C.PI_IMPORT_NAME: "XEP-0363",
     C.PI_TYPE: "XEP",
+    C.PI_MODES: C.PLUG_MODE_BOTH,
     C.PI_PROTOCOLS: ["XEP-0363"],
     C.PI_DEPENDENCIES: ["FILE", "UPLOAD"],
     C.PI_MAIN: "XEP_0363",
@@ -48,6 +52,7 @@
 }
 
 NS_HTTP_UPLOAD = "urn:xmpp:http:upload:0"
+IQ_HTTP_UPLOAD_REQUEST = C.IQ_GET + '/request[@xmlns="' + NS_HTTP_UPLOAD + '"]'
 ALLOWED_HEADERS = ('authorization', 'cookie', 'expires')
 
 
@@ -59,7 +64,21 @@
     headers: list
 
 
-class XEP_0363(object):
+class UploadRequest(NamedTuple):
+    from_: jid.JID  # entity asking for the slot ("from" attribute of the <iq/>)
+    filename: str  # percent-encoded file name, "/" being replaced by "_" beforehand
+    size: int  # "size" attribute of the <request/> element
+    content_type: Optional[str]  # "content-type" attribute of the <request/> element
+
+
+class RequestHandler(NamedTuple):
+    callback: Callable[[SatXMPPComponent, UploadRequest], Optional[Slot]]  # returns a Slot when it handles the request
+    priority: int  # handlers with higher priorities are called first
+
+
+class XEP_0363:
+    Slot=Slot
+
     def __init__(self, host):
         log.info(_("plugin HTTP File Upload initialization"))
         self.host = host
@@ -81,9 +100,26 @@
         host.plugins["UPLOAD"].register(
             "HTTP Upload", self.getHTTPUploadEntity, self.fileHTTPUpload
         )
+        # list of callbacks used when a request is done to a component
+        self.handlers = []
 
     def getHandler(self, client):
-        return XEP_0363_handler()
+        return XEP_0363_handler(self)
+
+    def registerHandler(self, callback, priority=0):
+        """Register a request handler
+
+        @param callback: method to call when a request is done
+            the callback must return a Slot if the request is handled,
+            otherwise, other callbacks will be tried.
+            If the callback raises a StanzaError, its condition will be used if no other
+            callback can handle the request.
+        @param priority: handlers with higher priorities will be called first
+        """
+        # self.handlers holds RequestHandler tuples, we must compare with their callback
+        assert all(handler.callback != callback for handler in self.handlers)
+        self.handlers.append(RequestHandler(callback, priority))
+        self.handlers.sort(key=lambda handler: handler.priority, reverse=True)
 
     async def getHTTPUploadEntity(self, client, upload_jid=None):
         """Get HTTP upload capable entity
@@ -313,9 +349,67 @@
 
         return Slot(put=put_url, get=get_url, headers=headers)
 
+    # component
+
+    def onComponentRequest(self, iq_elt, client):
+        """Answer a slot request received by a component, using registered handlers"""
+        iq_elt.handled = True
+        try:
+            request_elt = next(iq_elt.elements(NS_HTTP_UPLOAD, "request"))
+            request = UploadRequest(
+                from_=jid.JID(iq_elt['from']),
+                filename=parse.quote(request_elt['filename'].replace('/', '_'), safe=''),
+                size=int(request_elt['size']),
+                content_type=request_elt.getAttribute('content-type')
+            )
+        except (StopIteration, KeyError, ValueError):
+            client.sendError(iq_elt, "bad-request")
+            return
+
+        err = None
+
+        for handler in self.handlers:
+            # reset for each handler, so a raising callback can't leave "slot" unbound
+            slot = None
+            try:
+                slot = handler.callback(client, request)
+            except error.StanzaError as e:
+                log.warning(f"a stanza error has been raised while processing HTTP Upload of request: {e}")
+                if err is None:
+                    # we keep the first error to return its condition later,
+                    # if no other callback handle the request
+                    err = e
+            if slot:
+                break
+        else:
+            log.warning(
+                _("no service can handle HTTP Upload request: {elt}")
+                .format(elt=iq_elt.toXml()))
+            condition = "feature-not-implemented" if err is None else err.condition
+            client.sendError(iq_elt, condition)
+            return
+
+        iq_result_elt = xmlstream.toResponse(iq_elt, "result")
+        slot_elt = iq_result_elt.addElement((NS_HTTP_UPLOAD, 'slot'))
+        put_elt = slot_elt.addElement('put')
+        put_elt['url'] = slot.put
+        get_elt = slot_elt.addElement('get')
+        get_elt['url'] = slot.get
+        client.send(iq_result_elt)
+
 
 @implementer(iwokkel.IDisco)
-class XEP_0363_handler(XMPPHandler):
+class XEP_0363_handler(xmlstream.XMPPHandler):
+    def __init__(self, plugin_parent):
+        super().__init__()  # initialise the attributes managed by XMPPHandler
+        self.plugin_parent = plugin_parent
+
+    def connectionInitialized(self):
+        if self.parent.is_component:  # slot requests are only answered by components
+            self.xmlstream.addObserver(
+                IQ_HTTP_UPLOAD_REQUEST, self.plugin_parent.onComponentRequest,
+                client=self.parent
+            )
 
     def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
         return [disco.DiscoFeature(NS_HTTP_UPLOAD)]