view sat/plugins/plugin_comp_file_sharing.py @ 3292:84f77f04aa08

tools (common/utils): new OrderedSet class
author Goffi <goffi@goffi.org>
date Mon, 08 Jun 2020 08:54:53 +0200
parents 449dfbfcdbcc
children da443cf946ad
line wrap: on
line source

#!/usr/bin/env python3

# SAT plugin for parrot mode (experimental)
# Copyright (C) 2009-2020 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import os
import os.path
import mimetypes
from functools import partial
import shortuuid
import unicodedata
from urllib.parse import urljoin, urlparse, quote, unquote
from dataclasses import dataclass
from pathlib import Path
from sat.core.i18n import _
from sat.core.constants import Const as C
from sat.core import exceptions
from sat.core.log import getLogger
from sat.tools.common import regex
from sat.tools.common import uri
from sat.tools.common import files_utils
from sat.tools.common import tls
from sat.tools import stream
from twisted.internet import defer, reactor
from twisted.words.protocols.jabber import error
from twisted.web import server, resource, static
from wokkel import pubsub
from wokkel import generic


log = getLogger(__name__)


PLUGIN_INFO = {
    C.PI_NAME: "File sharing component",
    C.PI_IMPORT_NAME: "file_sharing",
    C.PI_MODES: [C.PLUG_MODE_COMPONENT],
    C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: [
        "FILE",
        "FILE_SHARING_MANAGEMENT",
        "XEP-0231",
        "XEP-0234",
        "XEP-0260",
        "XEP-0261",
        "XEP-0264",
        "XEP-0329",
        "XEP-0363",
    ],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "FileSharing",
    C.PI_HANDLER: C.BOOL_TRUE,
    C.PI_DESCRIPTION: _("""Component hosting and sharing files"""),
}

HASH_ALGO = "sha-256"
NS_COMMENTS = "org.salut-a-toi.comments"
COMMENT_NODE_PREFIX = "org.salut-a-toi.file_comments/"
# Directory used to buffer request body (i.e. file in case of PUT) we use more than one @
# there, to be sure than it's not conflicting with a JID
TMP_BUFFER_DIR = "@@tmp@@"

server.version = unicodedata.normalize(
    'NFKD',
    f"{C.APP_NAME} file sharing {C.APP_VERSION}"
).encode('ascii','ignore')


class HTTPFileServer(resource.Resource):
    isLeaf = True

    def errorPage(self, request, code):
        request.setResponseCode(code)
        if code == 400:
            brief = 'Bad Request'
            details = "Your request is invalid"
        elif code == 403:
            brief = 'Forbidden'
            details = "You're not allowed to use this resource"
        elif code == 404:
            brief = 'Not Found'
            details = "No resource found at this URL"
        else:
            brief = 'Error'
            details = "This resource can't be used"
            log.error(f"Unexpected return code used: {code}")
        log.warning(
            f'Error returned while trying to access url {request.uri.decode()}: '
            f'"{brief}" ({code}): {details}'
        )

        return resource.ErrorPage(code, brief, details).render(request)

    def getDispositionType(self, media_type, media_subtype):
        if media_type in ('image', 'video'):
            return 'inline'
        elif media_type == 'application' and media_subtype == 'pdf':
            return 'inline'
        else:
            return 'attachment'

    def render_GET(self, request):
        try:
            request.upload_data
        except exceptions.DataError:
            return self.errorPage(request, 404)

        defer.ensureDeferred(self.renderGet(request))
        return server.NOT_DONE_YET

    async def renderGet(self, request):
        try:
            upload_id, filename = request.upload_data
        except exceptions.DataError:
            request.write(self.errorPage(request, 403))
            request.finish()
            return
        found_files = await request.file_sharing.host.memory.getFiles(
            client=None, peer_jid=None, perms_to_check=None, public_id=upload_id)
        if not found_files:
            request.write(self.errorPage(request, 404))
            request.finish()
            return
        if len(found_files) > 1:
            log.error(f"more that one files found for public id {upload_id!r}")

        found_file = found_files[0]
        file_path = request.file_sharing.files_path/found_file['file_hash']
        file_res = static.File(file_path)
        file_res.type = f'{found_file["media_type"]}/{found_file["media_subtype"]}'
        file_res.encoding = file_res.contentEncodings.get(Path(found_file['name']).suffix)
        disp_type = self.getDispositionType(
            found_file['media_type'], found_file['media_subtype'])
        # the URL is percent encoded, and not all browsers/tools unquote the file name,
        # thus we add a content disposition header
        request.setHeader(
            'Content-Disposition',
            f"{disp_type}; filename*=UTF-8''{quote(found_file['name'])}"
        )
        ret = file_res.render(request)
        if ret != server.NOT_DONE_YET:
            # HEAD returns directly the result (while GET use a produced)
            request.write(ret)
            request.finish()

    def render_PUT(self, request):
        defer.ensureDeferred(self.renderPut(request))
        return server.NOT_DONE_YET

    async def renderPut(self, request):
        try:
            client, upload_request = request.upload_request_data
            upload_id, filename = request.upload_data
        except AttributeError:
            request.write(self.errorPage(request, 400))
            request.finish()
            return

        # at this point request is checked and file is buffered, we can store it
        # we close the content here, before registering the file
        request.content.close()
        tmp_file_path = Path(request.content.name)
        request.content = None

        file_data = {
            "name": unquote(upload_request.filename),
            "mime_type": upload_request.content_type,
            "size": upload_request.size,
            "path": "/uploads"
        }

        await request.file_sharing.registerReceivedFile(
            client, upload_request.from_, file_data, tmp_file_path,
            public_id=upload_id,
        )

        request.setResponseCode(201)
        request.finish()


class FileSharingRequest(server.Request):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._upload_data = None

    @property
    def upload_data(self):
        """A tuple with upload_id and filename retrieve from requested path"""
        if self._upload_data is not None:
            return self._upload_data

        # self.path is not available if we are easly in the request (e.g. when gotLength
        # is called), in which case channel._path must be used. On the other hand, when
        # render_[VERB] is called, only self.path is available
        path = self.channel._path if self.path is None else self.path
        # we normalise the path
        path = urlparse(path.decode()).path
        try:
            __, upload_id, filename = path.split('/')
        except ValueError:
            raise exceptions.DataError("no enought path elements")
        if len(upload_id) < 10:
            raise exceptions.DataError(f"invalid upload ID received for a PUT: {upload_id!r}")

        self._upload_data = (upload_id, filename)
        return self._upload_data

    @property
    def file_sharing(self):
        return self.channel.site.file_sharing

    @property
    def file_tmp_dir(self):
        return self.channel.site.file_tmp_dir

    def refuseRequest(self):
        if self.content is not None:
            self.content.close()
        self.content = open(os.devnull, 'w+b')
        self.channel._respondToBadRequestAndDisconnect()

    def gotLength(self, length):
        if self.channel._command.decode().upper() == 'PUT':
            # for PUT we check early if upload_id is fine, to avoid buffering a file we'll refuse
            # we buffer the file in component's TMP_BUFFER_DIR, so we just have to rename it at the end
            try:
                upload_id, filename = self.upload_data
            except exceptions.DataError as e:
                log.warning("Invalid PUT request, we stop here: {e}")
                return self.refuseRequest()
            try:
                client, upload_request, timer = self.file_sharing.expected_uploads.pop(upload_id)
            except KeyError:
                log.warning(f"unknown (expired?) upload ID received for a PUT: {upload_id!r}")
                return self.refuseRequest()

            if not timer.active:
                log.warning(f"upload id {upload_id!r} used for a PUT, but it is expired")
                return self.refuseRequest()

            timer.cancel()

            if upload_request.filename != filename:
                log.warning(
                    f"invalid filename for PUT (upload id: {upload_id!r}, URL: {self.channel._path.decode()}). Original "
                    f"{upload_request.filename!r} doesn't match {filename!r}"
                )
                return self.refuseRequest()

            self.upload_request_data = (client, upload_request)

            file_tmp_path = files_utils.get_unique_name(
                self.file_tmp_dir/upload_id)

            self.content = open(file_tmp_path, 'w+b')
        else:
            return super().gotLength(length)


class FileSharingSite(server.Site):
    requestFactory = FileSharingRequest

    def __init__(self, file_sharing):
        self.file_sharing = file_sharing
        self.file_tmp_dir = file_sharing.host.getLocalPath(
            None, C.FILES_TMP_DIR, TMP_BUFFER_DIR, component=True, profile=False
        )
        for old_file in self.file_tmp_dir.iterdir():
            log.debug(f"purging old buffer file at {old_file}")
            old_file.unlink()
        super().__init__(HTTPFileServer())

    def getContentFile(self, length):
        file_tmp_path = self.file_tmp_dir/shortuuid.uuid()
        return open(file_tmp_path, 'w+b')


class FileSharing:

    def __init__(self, host):
        self.host = host
        self.initialised = False

    def init(self):
        # we init once on first component connection,
        # there is not need to init this plugin if not component use it
        # TODO: this plugin should not be loaded at all if no component uses it
        #   and should be loaded dynamically as soon as a suitable profile is created
        if self.initialised:
            return
        self.initialised = True
        log.info(_("File Sharing initialization"))
        self._f = self.host.plugins["FILE"]
        self._jf = self.host.plugins["XEP-0234"]
        self._h = self.host.plugins["XEP-0300"]
        self._t = self.host.plugins["XEP-0264"]
        self.host.plugins["XEP-0363"].registerHandler(self._onHTTPUpload)
        self.host.trigger.add("FILE_getDestDir", self._getDestDirTrigger)
        self.host.trigger.add(
            "XEP-0234_fileSendingRequest", self._fileSendingRequestTrigger, priority=1000
        )
        self.host.trigger.add("XEP-0234_buildFileElement", self._addFileComments)
        self.host.trigger.add("XEP-0234_parseFileElement", self._getFileComments)
        self.host.trigger.add("XEP-0329_compGetFilesFromNode", self._addCommentsData)
        self.files_path = self.host.getLocalPath(None, C.FILES_DIR, profile=False)
        self.http_port = self.host.memory.getConfig(
            'component file_sharing', 'http_upload_port', 8888)
        connection_type = self.host.memory.getConfig(
            'component file_sharing', 'http_upload_connection_type', 'https')
        if connection_type not in ('http', 'https'):
            raise exceptions.ConfigError(
                f'bad http_upload_connection_type, you must use one of "http" or "https"'
            )
        self.server = FileSharingSite(self)
        self.expected_uploads = {}
        if connection_type == 'http':
            reactor.listenTCP(self.http_port, self.server)
        else:
            options = tls.getOptionsFromConfig(self.host.memory.config, "component file_sharing")
            tls.TLSOptionsCheck(options)
            context_factory = tls.getTLSContextFactory(options)
            reactor.listenSSL(self.http_port, self.server, context_factory)


    def getHandler(self, client):
        return Comments_handler(self)

    def profileConnecting(self, client):
        self.init()
        public_base_url = self.host.memory.getConfig(
            'component file_sharing', 'http_upload_public_facing_url')
        if public_base_url is None:
            client._file_sharing_base_url = f"https://{client.host}:{self.http_port}"
        else:
            client._file_sharing_base_url = public_base_url
        client._file_sharing_allowed_hosts = self.host.memory.getConfig(
            'component file_sharing', 'http_upload_allowed_hosts_list') or [client.host]
        path = client.file_tmp_dir = os.path.join(
            self.host.memory.getConfig("", "local_dir"),
            C.FILES_TMP_DIR,
            regex.pathEscape(client.profile),
        )
        if not os.path.exists(path):
            os.makedirs(path)

    async def registerReceivedFile(
            self, client, peer_jid, file_data, file_path, public_id=None, extra=None):
        """Post file reception tasks

        once file is received, this method create hash/thumbnails if necessary
        move the file to the right location, and create metadata entry in database
        """
        name = file_data["name"]
        if extra is None:
            extra = {}

        if file_data.get("hash_algo") == HASH_ALGO:
            log.debug(_("Reusing already generated hash"))
            file_hash = file_data["hash_hasher"].hexdigest()
        else:
            hasher = self._h.getHasher(HASH_ALGO)
            with file_path.open('rb') as f:
                file_hash = await self._h.calculateHash(f, hasher)
        final_path = self.files_path/file_hash

        if final_path.is_file():
            log.debug(
                "file [{file_hash}] already exists, we can remove temporary one".format(
                    file_hash=file_hash
                )
            )
            file_path.unlink()
        else:
            file_path.rename(final_path)
            log.debug(
                "file [{file_hash}] moved to {files_path}".format(
                    file_hash=file_hash, files_path=self.files_path
                )
            )

        mime_type = file_data.get("mime_type")
        if not mime_type or mime_type == "application/octet-stream":
            mime_type = mimetypes.guess_type(name)[0]

        if mime_type is not None and mime_type.startswith("image"):
            thumbnails = extra.setdefault(C.KEY_THUMBNAILS, [])
            for max_thumb_size in (self._t.SIZE_SMALL, self._t.SIZE_MEDIUM):
                try:
                    thumb_size, thumb_id = await self._t.generateThumbnail(
                        final_path,
                        max_thumb_size,
                        #  we keep thumbnails for 6 months
                        60 * 60 * 24 * 31 * 6,
                    )
                except Exception as e:
                    log.warning(_("Can't create thumbnail: {reason}").format(reason=e))
                    break
                thumbnails.append({"id": thumb_id, "size": thumb_size})

        self.host.memory.setFile(
            client,
            name=name,
            version="",
            file_hash=file_hash,
            hash_algo=HASH_ALGO,
            size=file_data["size"],
            path=file_data.get("path"),
            namespace=file_data.get("namespace"),
            mime_type=mime_type,
            public_id=public_id,
            owner=peer_jid,
            extra=extra,
        )

    def _getDestDirTrigger(
        self, client, peer_jid, transfer_data, file_data, stream_object
    ):
        """This trigger accept file sending request, and store file locally"""
        if not client.is_component:
            return True, None
        assert stream_object
        assert "stream_object" not in transfer_data
        assert C.KEY_PROGRESS_ID in file_data
        filename = file_data["name"]
        assert filename and not "/" in filename
        file_tmp_dir = self.host.getLocalPath(
            None, C.FILES_TMP_DIR, peer_jid.userhost(), component=True, profile=False
        )
        file_tmp_path = file_data['file_path'] = files_utils.get_unique_name(
            file_tmp_dir/filename)

        transfer_data["finished_d"].addCallback(
            lambda __: defer.ensureDeferred(
                self.registerReceivedFile(client, peer_jid, file_data, file_tmp_path)
            )
        )

        self._f.openFileWrite(
            client, file_tmp_path, transfer_data, file_data, stream_object
        )
        return False, defer.succeed(True)

    @defer.inlineCallbacks
    def _retrieveFiles(
        self, client, session, content_data, content_name, file_data, file_elt
    ):
        """This method retrieve a file on request, and send if after checking permissions"""
        peer_jid = session["peer_jid"]
        try:
            found_files = yield self.host.memory.getFiles(
                client,
                peer_jid=peer_jid,
                name=file_data.get("name"),
                file_hash=file_data.get("file_hash"),
                hash_algo=file_data.get("hash_algo"),
                path=file_data.get("path"),
                namespace=file_data.get("namespace"),
            )
        except exceptions.NotFound:
            found_files = None
        except exceptions.PermissionError:
            log.warning(
                _("{peer_jid} is trying to access an unauthorized file: {name}").format(
                    peer_jid=peer_jid, name=file_data.get("name")
                )
            )
            defer.returnValue(False)

        if not found_files:
            log.warning(
                _("no matching file found ({file_data})").format(file_data=file_data)
            )
            defer.returnValue(False)

        # we only use the first found file
        found_file = found_files[0]
        if found_file['type'] != C.FILE_TYPE_FILE:
            raise TypeError("a file was expected, type is {type_}".format(
                type_=found_file['type']))
        file_hash = found_file["file_hash"]
        file_path = os.path.join(self.files_path, file_hash)
        file_data["hash_hasher"] = hasher = self._h.getHasher(found_file["hash_algo"])
        size = file_data["size"] = found_file["size"]
        file_data["file_hash"] = file_hash
        file_data["hash_algo"] = found_file["hash_algo"]

        # we complete file_elt so peer can have some details on the file
        if "name" not in file_data:
            file_elt.addElement("name", content=found_file["name"])
        file_elt.addElement("size", content=str(size))
        content_data["stream_object"] = stream.FileStreamObject(
            self.host,
            client,
            file_path,
            uid=self._jf.getProgressId(session, content_name),
            size=size,
            data_cb=lambda data: hasher.update(data),
        )
        defer.returnValue(True)

    def _fileSendingRequestTrigger(
        self, client, session, content_data, content_name, file_data, file_elt
    ):
        if not client.is_component:
            return True, None
        else:
            return (
                False,
                self._retrieveFiles(
                    client, session, content_data, content_name, file_data, file_elt
                ),
            )

    ## HTTP Upload ##

    def _purge_slot(self, upload_id):
        try:
            del self.expected_uploads[upload_id]
        except KeyError:
            log.error(f"trying to purge an inexisting upload slot ({upload_id})")

    def _onHTTPUpload(self, client, request):
        # filename should be already cleaned, but it's better to double check
        assert '/' not in request.filename
        if request.from_.host not in client._file_sharing_allowed_hosts:
            raise error.StanzaError("forbidden")

        upload_id = shortuuid.ShortUUID().random(length=30)
        assert '/' not in upload_id
        timer = reactor.callLater(30, self._purge_slot, upload_id)
        self.expected_uploads[upload_id] = (client, request, timer)
        url = urljoin(client._file_sharing_base_url, f"{upload_id}/{request.filename}")
        slot = self.host.plugins["XEP-0363"].Slot(
            put=url,
            get=url,
            headers=[],
        )
        return slot

    ## comments triggers ##

    def _addFileComments(self, file_elt, extra_args):
        try:
            comments_url = extra_args.pop("comments_url")
        except KeyError:
            return

        comment_elt = file_elt.addElement((NS_COMMENTS, "comments"), content=comments_url)

        try:
            count = len(extra_args["extra"]["comments"])
        except KeyError:
            count = 0

        comment_elt["count"] = str(count)
        return True

    def _getFileComments(self, file_elt, file_data):
        try:
            comments_elt = next(file_elt.elements(NS_COMMENTS, "comments"))
        except StopIteration:
            return
        file_data["comments_url"] = str(comments_elt)
        file_data["comments_count"] = comments_elt["count"]
        return True

    def _addCommentsData(self, client, iq_elt, owner, node_path, files_data):
        for file_data in files_data:
            file_data["comments_url"] = uri.buildXMPPUri(
                "pubsub",
                path=client.jid.full(),
                node=COMMENT_NODE_PREFIX + file_data["id"],
            )
        return True


class Comments_handler(pubsub.PubSubService):
    """This class is a minimal Pubsub service handling virtual nodes for comments"""

    def __init__(self, plugin_parent):
        super(Comments_handler, self).__init__()  # PubsubVirtualResource())
        self.host = plugin_parent.host
        self.plugin_parent = plugin_parent
        self.discoIdentity = {
            "category": "pubsub",
            "type": "virtual",  # FIXME: non standard, here to avoid this service being considered as main pubsub one
            "name": "files commenting service",
        }

    def _getFileId(self, nodeIdentifier):
        if not nodeIdentifier.startswith(COMMENT_NODE_PREFIX):
            raise error.StanzaError("item-not-found")
        file_id = nodeIdentifier[len(COMMENT_NODE_PREFIX) :]
        if not file_id:
            raise error.StanzaError("item-not-found")
        return file_id

    @defer.inlineCallbacks
    def getFileData(self, requestor, nodeIdentifier):
        file_id = self._getFileId(nodeIdentifier)
        try:
            files = yield self.host.memory.getFiles(self.parent, requestor, file_id)
        except (exceptions.NotFound, exceptions.PermissionError):
            # we don't differenciate between NotFound and PermissionError
            # to avoid leaking information on existing files
            raise error.StanzaError("item-not-found")
        if not files:
            raise error.StanzaError("item-not-found")
        if len(files) > 1:
            raise error.InternalError("there should be only one file")
        defer.returnValue(files[0])

    def commentsUpdate(self, extra, new_comments, peer_jid):
        """update comments (replace or insert new_comments)

        @param extra(dict): extra data to update
        @param new_comments(list[tuple(unicode, unicode, unicode)]): comments to update or insert
        @param peer_jid(unicode, None): bare jid of the requestor, or None if request is done by owner
        """
        current_comments = extra.setdefault("comments", [])
        new_comments_by_id = {c[0]: c for c in new_comments}
        updated = []
        # we now check every current comment, to see if one id in new ones
        # exist, in which case we must update
        for idx, comment in enumerate(current_comments):
            comment_id = comment[0]
            if comment_id in new_comments_by_id:
                # a new comment has an existing id, update is requested
                if peer_jid and comment[1] != peer_jid:
                    # requestor has not the right to modify the comment
                    raise exceptions.PermissionError
                # we replace old_comment with updated one
                new_comment = new_comments_by_id[comment_id]
                current_comments[idx] = new_comment
                updated.append(new_comment)

        # we now remove every updated comments, to only keep
        # the ones to insert
        for comment in updated:
            new_comments.remove(comment)

        current_comments.extend(new_comments)

    def commentsDelete(self, extra, comments):
        try:
            comments_dict = extra["comments"]
        except KeyError:
            return
        for comment in comments:
            try:
                comments_dict.remove(comment)
            except ValueError:
                continue

    def _getFrom(self, item_elt):
        """retrieve published of an item

        @param item_elt(domish.element): <item> element
        @return (unicode): full jid as string
        """
        iq_elt = item_elt
        while iq_elt.parent != None:
            iq_elt = iq_elt.parent
        return iq_elt["from"]

    @defer.inlineCallbacks
    def publish(self, requestor, service, nodeIdentifier, items):
        #  we retrieve file a first time to check authorisations
        file_data = yield self.getFileData(requestor, nodeIdentifier)
        file_id = file_data["id"]
        comments = [(item["id"], self._getFrom(item), item.toXml()) for item in items]
        if requestor.userhostJID() == file_data["owner"]:
            peer_jid = None
        else:
            peer_jid = requestor.userhost()
        update_cb = partial(self.commentsUpdate, new_comments=comments, peer_jid=peer_jid)
        try:
            yield self.host.memory.fileUpdate(file_id, "extra", update_cb)
        except exceptions.PermissionError:
            raise error.StanzaError("not-authorized")

    @defer.inlineCallbacks
    def items(self, requestor, service, nodeIdentifier, maxItems, itemIdentifiers):
        file_data = yield self.getFileData(requestor, nodeIdentifier)
        comments = file_data["extra"].get("comments", [])
        if itemIdentifiers:
            defer.returnValue(
                [generic.parseXml(c[2]) for c in comments if c[0] in itemIdentifiers]
            )
        else:
            defer.returnValue([generic.parseXml(c[2]) for c in comments])

    @defer.inlineCallbacks
    def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
        file_data = yield self.getFileData(requestor, nodeIdentifier)
        file_id = file_data["id"]
        try:
            comments = file_data["extra"]["comments"]
        except KeyError:
            raise error.StanzaError("item-not-found")

        to_remove = []
        for comment in comments:
            comment_id = comment[0]
            if comment_id in itemIdentifiers:
                to_remove.append(comment)
                itemIdentifiers.remove(comment_id)
                if not itemIdentifiers:
                    break

        if itemIdentifiers:
            # not all items have been to_remove, we can't continue
            raise error.StanzaError("item-not-found")

        if requestor.userhostJID() != file_data["owner"]:
            if not all([c[1] == requestor.userhost() for c in to_remove]):
                raise error.StanzaError("not-authorized")

        remove_cb = partial(self.commentsDelete, comments=to_remove)
        yield self.host.memory.fileUpdate(file_id, "extra", remove_cb)