Mercurial > libervia-backend
changeset 3289:9057713ab124
plugin comp file sharing: files can now be uploaded/downloaded via HTTP:
plugin XEP-0363 can now be used by components, and file sharing uses it.
The new `public_id` file metadata is used to serve files.
Files uploaded are put in the `/uploads` path.
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 29 May 2020 21:55:45 +0200 |
parents | 780fb8dd07ef |
children | 3ff952c042ae |
files | sat/plugins/plugin_comp_file_sharing.py sat/plugins/plugin_misc_upload.py sat/plugins/plugin_xep_0363.py |
diffstat | 3 files changed, 409 insertions(+), 30 deletions(-) [+] |
line wrap: on
line diff
--- a/sat/plugins/plugin_comp_file_sharing.py Fri May 29 21:50:49 2020 +0200 +++ b/sat/plugins/plugin_comp_file_sharing.py Fri May 29 21:55:45 2020 +0200 @@ -16,21 +16,29 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +import os +import os.path +import mimetypes +from functools import partial +import shortuuid +import unicodedata +from urllib.parse import urljoin, urlparse, quote, unquote +from dataclasses import dataclass +from pathlib import Path from sat.core.i18n import _ from sat.core.constants import Const as C from sat.core import exceptions from sat.core.log import getLogger from sat.tools.common import regex from sat.tools.common import uri +from sat.tools.common import files_utils +from sat.tools.common import tls from sat.tools import stream -from twisted.internet import defer +from twisted.internet import defer, reactor from twisted.words.protocols.jabber import error +from twisted.web import server, resource, static from wokkel import pubsub from wokkel import generic -from functools import partial -import os -import os.path -import mimetypes log = getLogger(__name__) @@ -51,6 +59,7 @@ "XEP-0261", "XEP-0264", "XEP-0329", + "XEP-0363", ], C.PI_RECOMMENDATIONS: [], C.PI_MAIN: "FileSharing", @@ -61,9 +70,228 @@ HASH_ALGO = "sha-256" NS_COMMENTS = "org.salut-a-toi.comments" COMMENT_NODE_PREFIX = "org.salut-a-toi.file_comments/" +# Directory used to buffer request body (i.e. file in case of PUT) we use more than one @ +# there, to be sure than it's not conflicting with a JID +TMP_BUFFER_DIR = "@@tmp@@" + +server.version = unicodedata.normalize( + 'NFKD', + f"{C.APP_NAME} file sharing {C.APP_VERSION}" +).encode('ascii','ignore') -class FileSharing(object): +class HTTPFileServer(resource.Resource): + isLeaf = True + + def errorPage(self, request, code): + request.setResponseCode(code) + if code == 400: + brief = 'Bad Request' + details = "Your request is invalid" + elif code == 403: + brief = 'Forbidden' + details = "You're not allowed to use this resource" + elif code == 404: + brief = 'Not Found' + details = "No resource found at this URL" + else: + brief = 'Error' + details = "This resource can't be used" + log.error(f"Unexpected return code used: {code}") + log.warning( + f'Error returned while trying to access url {request.uri.decode()}: ' + f'"{brief}" ({code}): {details}' + ) + + return resource.ErrorPage(code, brief, details).render(request) + + def getDispositionType(self, media_type, media_subtype): + if media_type in ('image', 'video'): + return 'inline' + elif media_type == 'application' and media_subtype == 'pdf': + return 'inline' + else: + return 'attachment' + + def render_GET(self, request): + try: + request.upload_data + except exceptions.DataError: + return self.errorPage(request, 404) + + defer.ensureDeferred(self.renderGet(request)) + return server.NOT_DONE_YET + + async def renderGet(self, request): + try: + upload_id, filename = request.upload_data + except exceptions.DataError: + request.write(self.errorPage(request, 403)) + request.finish() + return + found_files = await request.file_sharing.host.memory.getFiles( + client=None, peer_jid=None, perms_to_check=None, public_id=upload_id) + if not found_files: + request.write(self.errorPage(request, 404)) + request.finish() + return + if len(found_files) > 1: + log.error(f"more that one files found for public id {upload_id!r}") + + found_file = found_files[0] + file_path = request.file_sharing.files_path/found_file['file_hash'] + file_res = static.File(file_path) + file_res.type = f'{found_file["media_type"]}/{found_file["media_subtype"]}' + file_res.encoding = file_res.contentEncodings.get(Path(found_file['name']).suffix) + disp_type = self.getDispositionType( + found_file['media_type'], found_file['media_subtype']) + # the URL is percent encoded, and not all browsers/tools unquote the file name, + # thus we add a content disposition header + request.setHeader( + 'Content-Disposition', + f"{disp_type}; filename*=UTF-8''{quote(found_file['name'])}" + ) + ret = file_res.render(request) + if ret != server.NOT_DONE_YET: + # HEAD returns directly the result (while GET use a produced) + request.write(ret) + request.finish() + + def render_PUT(self, request): + defer.ensureDeferred(self.renderPut(request)) + return server.NOT_DONE_YET + + async def renderPut(self, request): + try: + client, upload_request = request.upload_request_data + upload_id, filename = request.upload_data + except AttributeError: + request.write(self.errorPage(request, 400)) + request.finish() + return + + # at this point request is checked and file is buffered, we can store it + # we close the content here, before registering the file + request.content.close() + tmp_file_path = Path(request.content.name) + request.content = None + + file_data = { + "name": unquote(upload_request.filename), + "mime_type": upload_request.content_type, + "size": upload_request.size, + "path": "/uploads" + } + + await request.file_sharing.registerReceivedFile( + client, upload_request.from_, file_data, tmp_file_path, + public_id=upload_id, + ) + + request.setResponseCode(201) + request.finish() + + +class FileSharingRequest(server.Request): + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._upload_data = None + + @property + def upload_data(self): + """A tuple with upload_id and filename retrieve from requested path""" + if self._upload_data is not None: + return self._upload_data + + # self.path is not available if we are easly in the request (e.g. when gotLength + # is called), in which case channel._path must be used. On the other hand, when + # render_[VERB] is called, only self.path is available + path = self.channel._path if self.path is None else self.path + # we normalise the path + path = urlparse(path.decode()).path + try: + __, upload_id, filename = path.split('/') + except ValueError: + raise exceptions.DataError("no enought path elements") + if len(upload_id) < 10: + raise exceptions.DataError(f"invalid upload ID received for a PUT: {upload_id!r}") + + self._upload_data = (upload_id, filename) + return self._upload_data + + @property + def file_sharing(self): + return self.channel.site.file_sharing + + @property + def file_tmp_dir(self): + return self.channel.site.file_tmp_dir + + def refuseRequest(self): + if self.content is not None: + self.content.close() + self.content = open(os.devnull, 'w+b') + self.channel._respondToBadRequestAndDisconnect() + + def gotLength(self, length): + if self.channel._command.decode().upper() == 'PUT': + # for PUT we check early if upload_id is fine, to avoid buffering a file we'll refuse + # we buffer the file in component's TMP_BUFFER_DIR, so we just have to rename it at the end + try: + upload_id, filename = self.upload_data + except exceptions.DataError as e: + log.warning("Invalid PUT request, we stop here: {e}") + return self.refuseRequest() + try: + client, upload_request, timer = self.file_sharing.expected_uploads.pop(upload_id) + except KeyError: + log.warning(f"unknown (expired?) upload ID received for a PUT: {upload_id!r}") + return self.refuseRequest() + + if not timer.active: + log.warning(f"upload id {upload_id!r} used for a PUT, but it is expired") + return self.refuseRequest() + + timer.cancel() + + if upload_request.filename != filename: + log.warning( + f"invalid filename for PUT (upload id: {upload_id!r}, URL: {self.channel._path.decode()}). Original " + f"{upload_request.filename!r} doesn't match {filename!r}" + ) + return self.refuseRequest() + + self.upload_request_data = (client, upload_request) + + file_tmp_path = files_utils.get_unique_name( + self.file_tmp_dir/upload_id) + + self.content = open(file_tmp_path, 'w+b') + else: + return super().gotLength(length) + + +class FileSharingSite(server.Site): + requestFactory = FileSharingRequest + + def __init__(self, file_sharing): + self.file_sharing = file_sharing + self.file_tmp_dir = file_sharing.host.getLocalPath( + None, C.FILES_TMP_DIR, TMP_BUFFER_DIR, component=True, profile=False + ) + for old_file in self.file_tmp_dir.iterdir(): + log.debug(f"purging old buffer file at {old_file}") + old_file.unlink() + super().__init__(HTTPFileServer()) + + def getContentFile(self, length): + file_tmp_path = self.file_tmp_dir/shortuuid.uuid() + return open(file_tmp_path, 'w+b') + + +class FileSharing: + def __init__(self, host): log.info(_("File Sharing initialization")) self.host = host @@ -71,6 +299,7 @@ self._jf = host.plugins["XEP-0234"] self._h = host.plugins["XEP-0300"] self._t = host.plugins["XEP-0264"] + host.plugins["XEP-0363"].registerHandler(self._onHTTPUpload) host.trigger.add("FILE_getDestDir", self._getDestDirTrigger) host.trigger.add( "XEP-0234_fileSendingRequest", self._fileSendingRequestTrigger, priority=1000 @@ -79,11 +308,37 @@ host.trigger.add("XEP-0234_parseFileElement", self._getFileComments) host.trigger.add("XEP-0329_compGetFilesFromNode", self._addCommentsData) self.files_path = host.getLocalPath(None, C.FILES_DIR, profile=False) + self.http_port = host.memory.getConfig( + 'component file_sharing', 'http_upload_port', 8888) + connection_type = host.memory.getConfig( + 'component file_sharing', 'http_upload_connection_type', 'https') + if connection_type not in ('http', 'https'): + raise exceptions.ConfigError( + f'bad http_upload_connection_type, you must use one of "http" or "https"' + ) + self.server = FileSharingSite(self) + self.expected_uploads = {} + if connection_type == 'http': + reactor.listenTCP(self.http_port, self.server) + else: + options = tls.getOptionsFromConfig(host.memory.config, "component file_sharing") + tls.TLSOptionsCheck(options) + context_factory = tls.getTLSContextFactory(options) + reactor.listenSSL(self.http_port, self.server, context_factory) + def getHandler(self, client): return Comments_handler(self) - def profileConnected(self, client): + def profileConnecting(self, client): + public_base_url = self.host.memory.getConfig( + 'component file_sharing', 'http_upload_public_facing_url') + if public_base_url is None: + client._file_sharing_base_url = f"https://{client.host}:{self.http_port}" + else: + client._file_sharing_base_url = public_base_url + client._file_sharing_allowed_hosts = self.host.memory.getConfig( + 'component file_sharing', 'http_upload_allowed_hosts_list') or [client.host] path = client.file_tmp_dir = os.path.join( self.host.memory.getConfig("", "local_dir"), C.FILES_TMP_DIR, @@ -92,34 +347,35 @@ if not os.path.exists(path): os.makedirs(path) - @defer.inlineCallbacks - def _fileTransferedCb(self, __, client, peer_jid, file_data, file_path): - """post file reception tasks + async def registerReceivedFile( + self, client, peer_jid, file_data, file_path, public_id=None, extra=None): + """Post file reception tasks - on file is received, this method create hash/thumbnails if necessary + once file is received, this method create hash/thumbnails if necessary move the file to the right location, and create metadata entry in database """ name = file_data["name"] - extra = {} + if extra is None: + extra = {} - if file_data["hash_algo"] == HASH_ALGO: + if file_data.get("hash_algo") == HASH_ALGO: log.debug(_("Reusing already generated hash")) file_hash = file_data["hash_hasher"].hexdigest() else: hasher = self._h.getHasher(HASH_ALGO) - with open("file_path") as f: - file_hash = yield self._h.calculateHash(f, hasher) - final_path = os.path.join(self.files_path, file_hash) + with file_path.open('rb') as f: + file_hash = await self._h.calculateHash(f, hasher) + final_path = self.files_path/file_hash - if os.path.isfile(final_path): + if final_path.is_file(): log.debug( "file [{file_hash}] already exists, we can remove temporary one".format( file_hash=file_hash ) ) - os.unlink(file_path) + file_path.unlink() else: - os.rename(file_path, final_path) + file_path.rename(final_path) log.debug( "file [{file_hash}] moved to {files_path}".format( file_hash=file_hash, files_path=self.files_path @@ -134,7 +390,7 @@ thumbnails = extra.setdefault(C.KEY_THUMBNAILS, []) for max_thumb_size in (self._t.SIZE_SMALL, self._t.SIZE_MEDIUM): try: - thumb_size, thumb_id = yield self._t.generateThumbnail( + thumb_size, thumb_id = await self._t.generateThumbnail( final_path, max_thumb_size, # we keep thumbnails for 6 months @@ -155,6 +411,7 @@ path=file_data.get("path"), namespace=file_data.get("namespace"), mime_type=mime_type, + public_id=public_id, owner=peer_jid, extra=extra, ) @@ -171,14 +428,15 @@ filename = file_data["name"] assert filename and not "/" in filename file_tmp_dir = self.host.getLocalPath( - client, C.FILES_TMP_DIR, peer_jid.userhost(), component=True, profile=False + None, C.FILES_TMP_DIR, peer_jid.userhost(), component=True, profile=False ) - file_tmp_path = file_data["file_path"] = os.path.join( - file_tmp_dir, file_data["name"] - ) + file_tmp_path = file_data['file_path'] = files_utils.get_unique_name( + file_tmp_dir/filename) transfer_data["finished_d"].addCallback( - self._fileTransferedCb, client, peer_jid, file_data, file_tmp_path + lambda __: defer.ensureDeferred( + self.registerReceivedFile(client, peer_jid, file_data, file_tmp_path) + ) ) self._f.openFileWrite( @@ -257,6 +515,32 @@ ), ) + ## HTTP Upload ## + + def _purge_slot(self, upload_id): + try: + del self.expected_uploads[upload_id] + except KeyError: + log.error(f"trying to purge an inexisting upload slot ({upload_id})") + + def _onHTTPUpload(self, client, request): + # filename should be already cleaned, but it's better to double check + assert '/' not in request.filename + if request.from_.host not in client._file_sharing_allowed_hosts: + raise error.StanzaError("forbidden") + + upload_id = shortuuid.ShortUUID().random(length=30) + assert '/' not in upload_id + timer = reactor.callLater(30, self._purge_slot, upload_id) + self.expected_uploads[upload_id] = (client, request, timer) + url = urljoin(client._file_sharing_base_url, f"{upload_id}/{request.filename}") + slot = self.host.plugins["XEP-0363"].Slot( + put=url, + get=url, + headers=[], + ) + return slot + ## comments triggers ## def _addFileComments(self, file_elt, extra_args):
--- a/sat/plugins/plugin_misc_upload.py Fri May 29 21:50:49 2020 +0200 +++ b/sat/plugins/plugin_misc_upload.py Fri May 29 21:55:45 2020 +0200 @@ -35,6 +35,7 @@ C.PI_NAME: "File Upload", C.PI_IMPORT_NAME: "UPLOAD", C.PI_TYPE: C.PLUG_TYPE_MISC, + C.PI_MODES: C.PLUG_MODE_BOTH, C.PI_MAIN: "UploadPlugin", C.PI_HANDLER: "no", C.PI_DESCRIPTION: _("""File upload management"""),
--- a/sat/plugins/plugin_xep_0363.py Fri May 29 21:50:49 2020 +0200 +++ b/sat/plugins/plugin_xep_0363.py Fri May 29 21:55:45 2020 +0200 @@ -18,16 +18,19 @@ import os.path import mimetypes +from typing import NamedTuple, Callable, Optional from dataclasses import dataclass +from urllib import parse from wokkel import disco, iwokkel from zope.interface import implementer -from twisted.words.protocols.jabber import jid -from twisted.words.protocols.jabber.xmlstream import XMPPHandler +from twisted.words.protocols.jabber import jid, xmlstream, error +from twisted.words.xish import domish from twisted.internet import reactor from twisted.internet import defer from twisted.web import client as http_client from twisted.web import http_headers from sat.core.i18n import _ +from sat.core.xmpp import SatXMPPComponent from sat.core.constants import Const as C from sat.core.log import getLogger from sat.core import exceptions @@ -40,6 +43,7 @@ C.PI_NAME: "HTTP File Upload", C.PI_IMPORT_NAME: "XEP-0363", C.PI_TYPE: "XEP", + C.PI_MODES: C.PLUG_MODE_BOTH, C.PI_PROTOCOLS: ["XEP-0363"], C.PI_DEPENDENCIES: ["FILE", "UPLOAD"], C.PI_MAIN: "XEP_0363", @@ -48,6 +52,7 @@ } NS_HTTP_UPLOAD = "urn:xmpp:http:upload:0" +IQ_HTTP_UPLOAD_REQUEST = C.IQ_GET + '/request[@xmlns="' + NS_HTTP_UPLOAD + '"]' ALLOWED_HEADERS = ('authorization', 'cookie', 'expires') @@ -59,7 +64,21 @@ headers: list -class XEP_0363(object): +class UploadRequest(NamedTuple): + from_: jid.JID + filename: str + size: int + content_type: Optional[str] + + +class RequestHandler(NamedTuple): + callback: Callable[[SatXMPPComponent, UploadRequest], Optional[Slot]] + priority: int + + +class XEP_0363: + Slot=Slot + def __init__(self, host): log.info(_("plugin HTTP File Upload initialization")) self.host = host @@ -81,9 +100,26 @@ host.plugins["UPLOAD"].register( "HTTP Upload", self.getHTTPUploadEntity, self.fileHTTPUpload ) + # list of callbacks used when a request is done to a component + self.handlers = [] def getHandler(self, client): - return XEP_0363_handler() + return XEP_0363_handler(self) + + def registerHandler(self, callback, priority=0): + """Register a request handler + + @param callack: method to call when a request is done + the callback must return a Slot if the request is handled, + otherwise, other callbacks will be tried. + If the callback raises a StanzaError, its condition will be used if no other + callback can handle the request. + @param priority: handlers with higher priorities will be called first + """ + assert callback not in self.handlers + req_handler = RequestHandler(callback, priority) + self.handlers.append(req_handler) + self.handlers.sort(key=lambda handler: handler.priority, reverse=True) async def getHTTPUploadEntity(self, client, upload_jid=None): """Get HTTP upload capable entity @@ -313,9 +349,67 @@ return Slot(put=put_url, get=get_url, headers=headers) + # component + + def onComponentRequest(self, iq_elt, client): + iq_elt.handled=True + try: + request_elt = next(iq_elt.elements(NS_HTTP_UPLOAD, "request")) + request = UploadRequest( + from_=jid.JID(iq_elt['from']), + filename=parse.quote(request_elt['filename'].replace('/', '_'), safe=''), + size=int(request_elt['size']), + content_type=request_elt.getAttribute('content-type') + ) + except (StopIteration, KeyError, ValueError): + client.sendError(iq_elt, "bad-request") + return + + err = None + + for handler in self.handlers: + try: + slot = handler.callback(client, request) + except error.StanzaError as e: + log.warning(f"a stanza error has been raised while processing HTTP Upload of request: {e}") + if err is None: + # we keep the first error to return its condition later, + # if no other callback handle the request + err = e + if slot: + break + else: + log.warning( + _("no service can handle HTTP Upload request: {elt}") + .format(elt=iq_elt.toXml())) + if err is not None: + condition = err.condition + else: + condition = "feature-not-implemented" + client.sendError(iq_elt, condition) + return + + iq_result_elt = xmlstream.toResponse(iq_elt, "result") + slot_elt = iq_result_elt.addElement((NS_HTTP_UPLOAD, 'slot')) + put_elt = slot_elt.addElement('put') + put_elt['url'] = slot.put + get_elt = slot_elt.addElement('get') + get_elt['url'] = slot.get + client.send(iq_result_elt) + @implementer(iwokkel.IDisco) -class XEP_0363_handler(XMPPHandler): +class XEP_0363_handler(xmlstream.XMPPHandler): + + def __init__(self, plugin_parent): + self.plugin_parent = plugin_parent + + def connectionInitialized(self): + if self.parent.is_component: + self.xmlstream.addObserver( + IQ_HTTP_UPLOAD_REQUEST, self.plugin_parent.onComponentRequest, + client=self.parent + ) def getDiscoInfo(self, requestor, target, nodeIdentifier=""): return [disco.DiscoFeature(NS_HTTP_UPLOAD)]