Mercurial > libervia-backend
diff sat/plugins/plugin_comp_file_sharing.py @ 3289:9057713ab124
plugin comp file sharing: files can now be uploaded/downloaded via HTTP:
plugin XEP-0363 can now be used by components, and file sharing uses it.
The new `public_id` file metadata is used to serve files.
Files uploaded are put in the `/uploads` path.
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 29 May 2020 21:55:45 +0200 |
parents | 9d0df638c8b4 |
children | 449dfbfcdbcc |
line wrap: on
line diff
--- a/sat/plugins/plugin_comp_file_sharing.py Fri May 29 21:50:49 2020 +0200 +++ b/sat/plugins/plugin_comp_file_sharing.py Fri May 29 21:55:45 2020 +0200 @@ -16,21 +16,29 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +import os +import os.path +import mimetypes +from functools import partial +import shortuuid +import unicodedata +from urllib.parse import urljoin, urlparse, quote, unquote +from dataclasses import dataclass +from pathlib import Path from sat.core.i18n import _ from sat.core.constants import Const as C from sat.core import exceptions from sat.core.log import getLogger from sat.tools.common import regex from sat.tools.common import uri +from sat.tools.common import files_utils +from sat.tools.common import tls from sat.tools import stream -from twisted.internet import defer +from twisted.internet import defer, reactor from twisted.words.protocols.jabber import error +from twisted.web import server, resource, static from wokkel import pubsub from wokkel import generic -from functools import partial -import os -import os.path -import mimetypes log = getLogger(__name__) @@ -51,6 +59,7 @@ "XEP-0261", "XEP-0264", "XEP-0329", + "XEP-0363", ], C.PI_RECOMMENDATIONS: [], C.PI_MAIN: "FileSharing", @@ -61,9 +70,228 @@ HASH_ALGO = "sha-256" NS_COMMENTS = "org.salut-a-toi.comments" COMMENT_NODE_PREFIX = "org.salut-a-toi.file_comments/" +# Directory used to buffer request body (i.e. file in case of PUT) we use more than one @ +# there, to be sure than it's not conflicting with a JID +TMP_BUFFER_DIR = "@@tmp@@" + +server.version = unicodedata.normalize( + 'NFKD', + f"{C.APP_NAME} file sharing {C.APP_VERSION}" +).encode('ascii','ignore') -class FileSharing(object): +class HTTPFileServer(resource.Resource): + isLeaf = True + + def errorPage(self, request, code): + request.setResponseCode(code) + if code == 400: + brief = 'Bad Request' + details = "Your request is invalid" + elif code == 403: + brief = 'Forbidden' + details = "You're not allowed to use this resource" + elif code == 404: + brief = 'Not Found' + details = "No resource found at this URL" + else: + brief = 'Error' + details = "This resource can't be used" + log.error(f"Unexpected return code used: {code}") + log.warning( + f'Error returned while trying to access url {request.uri.decode()}: ' + f'"{brief}" ({code}): {details}' + ) + + return resource.ErrorPage(code, brief, details).render(request) + + def getDispositionType(self, media_type, media_subtype): + if media_type in ('image', 'video'): + return 'inline' + elif media_type == 'application' and media_subtype == 'pdf': + return 'inline' + else: + return 'attachment' + + def render_GET(self, request): + try: + request.upload_data + except exceptions.DataError: + return self.errorPage(request, 404) + + defer.ensureDeferred(self.renderGet(request)) + return server.NOT_DONE_YET + + async def renderGet(self, request): + try: + upload_id, filename = request.upload_data + except exceptions.DataError: + request.write(self.errorPage(request, 403)) + request.finish() + return + found_files = await request.file_sharing.host.memory.getFiles( + client=None, peer_jid=None, perms_to_check=None, public_id=upload_id) + if not found_files: + request.write(self.errorPage(request, 404)) + request.finish() + return + if len(found_files) > 1: + log.error(f"more that one files found for public id {upload_id!r}") + + found_file = found_files[0] + file_path = request.file_sharing.files_path/found_file['file_hash'] + file_res = static.File(file_path) + file_res.type = f'{found_file["media_type"]}/{found_file["media_subtype"]}' + file_res.encoding = file_res.contentEncodings.get(Path(found_file['name']).suffix) + disp_type = self.getDispositionType( + found_file['media_type'], found_file['media_subtype']) + # the URL is percent encoded, and not all browsers/tools unquote the file name, + # thus we add a content disposition header + request.setHeader( + 'Content-Disposition', + f"{disp_type}; filename*=UTF-8''{quote(found_file['name'])}" + ) + ret = file_res.render(request) + if ret != server.NOT_DONE_YET: + # HEAD returns directly the result (while GET use a produced) + request.write(ret) + request.finish() + + def render_PUT(self, request): + defer.ensureDeferred(self.renderPut(request)) + return server.NOT_DONE_YET + + async def renderPut(self, request): + try: + client, upload_request = request.upload_request_data + upload_id, filename = request.upload_data + except AttributeError: + request.write(self.errorPage(request, 400)) + request.finish() + return + + # at this point request is checked and file is buffered, we can store it + # we close the content here, before registering the file + request.content.close() + tmp_file_path = Path(request.content.name) + request.content = None + + file_data = { + "name": unquote(upload_request.filename), + "mime_type": upload_request.content_type, + "size": upload_request.size, + "path": "/uploads" + } + + await request.file_sharing.registerReceivedFile( + client, upload_request.from_, file_data, tmp_file_path, + public_id=upload_id, + ) + + request.setResponseCode(201) + request.finish() + + +class FileSharingRequest(server.Request): + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._upload_data = None + + @property + def upload_data(self): + """A tuple with upload_id and filename retrieve from requested path""" + if self._upload_data is not None: + return self._upload_data + + # self.path is not available if we are easly in the request (e.g. when gotLength + # is called), in which case channel._path must be used. On the other hand, when + # render_[VERB] is called, only self.path is available + path = self.channel._path if self.path is None else self.path + # we normalise the path + path = urlparse(path.decode()).path + try: + __, upload_id, filename = path.split('/') + except ValueError: + raise exceptions.DataError("no enought path elements") + if len(upload_id) < 10: + raise exceptions.DataError(f"invalid upload ID received for a PUT: {upload_id!r}") + + self._upload_data = (upload_id, filename) + return self._upload_data + + @property + def file_sharing(self): + return self.channel.site.file_sharing + + @property + def file_tmp_dir(self): + return self.channel.site.file_tmp_dir + + def refuseRequest(self): + if self.content is not None: + self.content.close() + self.content = open(os.devnull, 'w+b') + self.channel._respondToBadRequestAndDisconnect() + + def gotLength(self, length): + if self.channel._command.decode().upper() == 'PUT': + # for PUT we check early if upload_id is fine, to avoid buffering a file we'll refuse + # we buffer the file in component's TMP_BUFFER_DIR, so we just have to rename it at the end + try: + upload_id, filename = self.upload_data + except exceptions.DataError as e: + log.warning("Invalid PUT request, we stop here: {e}") + return self.refuseRequest() + try: + client, upload_request, timer = self.file_sharing.expected_uploads.pop(upload_id) + except KeyError: + log.warning(f"unknown (expired?) upload ID received for a PUT: {upload_id!r}") + return self.refuseRequest() + + if not timer.active: + log.warning(f"upload id {upload_id!r} used for a PUT, but it is expired") + return self.refuseRequest() + + timer.cancel() + + if upload_request.filename != filename: + log.warning( + f"invalid filename for PUT (upload id: {upload_id!r}, URL: {self.channel._path.decode()}). Original " + f"{upload_request.filename!r} doesn't match {filename!r}" + ) + return self.refuseRequest() + + self.upload_request_data = (client, upload_request) + + file_tmp_path = files_utils.get_unique_name( + self.file_tmp_dir/upload_id) + + self.content = open(file_tmp_path, 'w+b') + else: + return super().gotLength(length) + + +class FileSharingSite(server.Site): + requestFactory = FileSharingRequest + + def __init__(self, file_sharing): + self.file_sharing = file_sharing + self.file_tmp_dir = file_sharing.host.getLocalPath( + None, C.FILES_TMP_DIR, TMP_BUFFER_DIR, component=True, profile=False + ) + for old_file in self.file_tmp_dir.iterdir(): + log.debug(f"purging old buffer file at {old_file}") + old_file.unlink() + super().__init__(HTTPFileServer()) + + def getContentFile(self, length): + file_tmp_path = self.file_tmp_dir/shortuuid.uuid() + return open(file_tmp_path, 'w+b') + + +class FileSharing: + def __init__(self, host): log.info(_("File Sharing initialization")) self.host = host @@ -71,6 +299,7 @@ self._jf = host.plugins["XEP-0234"] self._h = host.plugins["XEP-0300"] self._t = host.plugins["XEP-0264"] + host.plugins["XEP-0363"].registerHandler(self._onHTTPUpload) host.trigger.add("FILE_getDestDir", self._getDestDirTrigger) host.trigger.add( "XEP-0234_fileSendingRequest", self._fileSendingRequestTrigger, priority=1000 @@ -79,11 +308,37 @@ host.trigger.add("XEP-0234_parseFileElement", self._getFileComments) host.trigger.add("XEP-0329_compGetFilesFromNode", self._addCommentsData) self.files_path = host.getLocalPath(None, C.FILES_DIR, profile=False) + self.http_port = host.memory.getConfig( + 'component file_sharing', 'http_upload_port', 8888) + connection_type = host.memory.getConfig( + 'component file_sharing', 'http_upload_connection_type', 'https') + if connection_type not in ('http', 'https'): + raise exceptions.ConfigError( + f'bad http_upload_connection_type, you must use one of "http" or "https"' + ) + self.server = FileSharingSite(self) + self.expected_uploads = {} + if connection_type == 'http': + reactor.listenTCP(self.http_port, self.server) + else: + options = tls.getOptionsFromConfig(host.memory.config, "component file_sharing") + tls.TLSOptionsCheck(options) + context_factory = tls.getTLSContextFactory(options) + reactor.listenSSL(self.http_port, self.server, context_factory) + def getHandler(self, client): return Comments_handler(self) - def profileConnected(self, client): + def profileConnecting(self, client): + public_base_url = self.host.memory.getConfig( + 'component file_sharing', 'http_upload_public_facing_url') + if public_base_url is None: + client._file_sharing_base_url = f"https://{client.host}:{self.http_port}" + else: + client._file_sharing_base_url = public_base_url + client._file_sharing_allowed_hosts = self.host.memory.getConfig( + 'component file_sharing', 'http_upload_allowed_hosts_list') or [client.host] path = client.file_tmp_dir = os.path.join( self.host.memory.getConfig("", "local_dir"), C.FILES_TMP_DIR, @@ -92,34 +347,35 @@ if not os.path.exists(path): os.makedirs(path) - @defer.inlineCallbacks - def _fileTransferedCb(self, __, client, peer_jid, file_data, file_path): - """post file reception tasks + async def registerReceivedFile( + self, client, peer_jid, file_data, file_path, public_id=None, extra=None): + """Post file reception tasks - on file is received, this method create hash/thumbnails if necessary + once file is received, this method create hash/thumbnails if necessary move the file to the right location, and create metadata entry in database """ name = file_data["name"] - extra = {} + if extra is None: + extra = {} - if file_data["hash_algo"] == HASH_ALGO: + if file_data.get("hash_algo") == HASH_ALGO: log.debug(_("Reusing already generated hash")) file_hash = file_data["hash_hasher"].hexdigest() else: hasher = self._h.getHasher(HASH_ALGO) - with open("file_path") as f: - file_hash = yield self._h.calculateHash(f, hasher) - final_path = os.path.join(self.files_path, file_hash) + with file_path.open('rb') as f: + file_hash = await self._h.calculateHash(f, hasher) + final_path = self.files_path/file_hash - if os.path.isfile(final_path): + if final_path.is_file(): log.debug( "file [{file_hash}] already exists, we can remove temporary one".format( file_hash=file_hash ) ) - os.unlink(file_path) + file_path.unlink() else: - os.rename(file_path, final_path) + file_path.rename(final_path) log.debug( "file [{file_hash}] moved to {files_path}".format( file_hash=file_hash, files_path=self.files_path @@ -134,7 +390,7 @@ thumbnails = extra.setdefault(C.KEY_THUMBNAILS, []) for max_thumb_size in (self._t.SIZE_SMALL, self._t.SIZE_MEDIUM): try: - thumb_size, thumb_id = yield self._t.generateThumbnail( + thumb_size, thumb_id = await self._t.generateThumbnail( final_path, max_thumb_size, # we keep thumbnails for 6 months @@ -155,6 +411,7 @@ path=file_data.get("path"), namespace=file_data.get("namespace"), mime_type=mime_type, + public_id=public_id, owner=peer_jid, extra=extra, ) @@ -171,14 +428,15 @@ filename = file_data["name"] assert filename and not "/" in filename file_tmp_dir = self.host.getLocalPath( - client, C.FILES_TMP_DIR, peer_jid.userhost(), component=True, profile=False + None, C.FILES_TMP_DIR, peer_jid.userhost(), component=True, profile=False ) - file_tmp_path = file_data["file_path"] = os.path.join( - file_tmp_dir, file_data["name"] - ) + file_tmp_path = file_data['file_path'] = files_utils.get_unique_name( + file_tmp_dir/filename) transfer_data["finished_d"].addCallback( - self._fileTransferedCb, client, peer_jid, file_data, file_tmp_path + lambda __: defer.ensureDeferred( + self.registerReceivedFile(client, peer_jid, file_data, file_tmp_path) + ) ) self._f.openFileWrite( @@ -257,6 +515,32 @@ ), ) + ## HTTP Upload ## + + def _purge_slot(self, upload_id): + try: + del self.expected_uploads[upload_id] + except KeyError: + log.error(f"trying to purge an inexisting upload slot ({upload_id})") + + def _onHTTPUpload(self, client, request): + # filename should be already cleaned, but it's better to double check + assert '/' not in request.filename + if request.from_.host not in client._file_sharing_allowed_hosts: + raise error.StanzaError("forbidden") + + upload_id = shortuuid.ShortUUID().random(length=30) + assert '/' not in upload_id + timer = reactor.callLater(30, self._purge_slot, upload_id) + self.expected_uploads[upload_id] = (client, request, timer) + url = urljoin(client._file_sharing_base_url, f"{upload_id}/{request.filename}") + slot = self.host.plugins["XEP-0363"].Slot( + put=url, + get=url, + headers=[], + ) + return slot + ## comments triggers ## def _addFileComments(self, file_elt, extra_args):