view src/plugins/plugin_xep_0096.py @ 1005:b4af31a8a4f2

core (logs): added formatting, name filter and outputs management: - formatting is inspired by, and uses when possible, standard logging. "message", "levelname", and "name" are the only formats managed; depending on the backend, more can be managed (standard backend formats are specified in the official Python logging doc) - name filter uses regular expressions. It's possible to log only plugins with SAT_LOG_LOGGER="^sat.plugins". To log only XEPs 96 & 65, we can use SAT_LOG_LOGGER='(xep_0095|xep_0065)' - output management uses a particular syntax: - output handlers are named with "//", so far there are "//default" (most of the time stderr), "//memory" and "//file" - options can be specified in parentheses, e.g. "//memory(50)" means a 50 lines memory buffer (50 is the current default, so that's equivalent to "//memory") - several handlers can be specified: "//file(/tmp/sat.log)//default" will use the default logging plus the /tmp/sat.log file - if there is only one handler, it uses the file handler: "/tmp/sat.log" is the same as "//file(/tmp/sat.log)" - not finished, needs more work for twisted and basic backends
author Goffi <goffi@goffi.org>
date Mon, 05 May 2014 18:58:34 +0200
parents 4dbe8e57ff51
children 9a85836f0d45
line wrap: on
line source

#!/usr/bin/python
# -*- coding: utf-8 -*-

# SAT plugin for managing xep-0096
# Copyright (C) 2009, 2010, 2011, 2012, 2013, 2014 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from sat.core.i18n import _
from sat.core.constants import Const as C
from sat.core.log import getLogger
log = getLogger(__name__)
from twisted.words.xish import domish
from twisted.words.protocols.jabber import jid
from twisted.words.protocols import jabber
import os
from twisted.internet import reactor

from wokkel import data_form

# Query string matching <iq/> stanzas whose type is "set"
# (not referenced anywhere in the visible part of this file)
IQ_SET = '/iq[@type="set"]'
# Short name of the Stream Initiation profile handled by this plugin,
# and the full profile namespace built from it
PROFILE_NAME = "file-transfer"
PROFILE = "http://jabber.org/protocol/si/profile/" + PROFILE_NAME

# Plugin metadata (presumably read by the plugin loader -- TODO confirm):
# "main" names the class defined below, "dependencies" lists the import names
# of the plugins this one looks up in host.plugins
PLUGIN_INFO = {
    "name": "XEP-0096 Plugin",
    "import_name": "XEP-0096",
    "type": "XEP",
    "protocols": ["XEP-0096"],
    "dependencies": ["XEP-0020", "XEP-0095", "XEP-0065", "XEP-0047"],
    "main": "XEP_0096",
    "handler": "no",
    "description": _("""Implementation of SI File Transfer""")
}


class XEP_0096(object):

    def __init__(self, host):
        """Plug the file transfer profile into the host

        Stores the host, records the namespaces of the managed stream methods,
        registers transferRequest as handler of the SI profile and exposes
        sendFile on the bridge.
        @param host: host instance giving access to "plugins" and "bridge" """
        log.info(_("Plugin XEP_0096 initialization"))
        self.host = host
        plugins = self.host.plugins
        # Stream methods managed (XEP-0065 first, then XEP-0047)
        self.managed_stream_m = [plugins[import_name].NAMESPACE
                                 for import_name in ("XEP-0065", "XEP-0047")]
        plugins["XEP-0095"].registerSIProfile(PROFILE_NAME, self.transferRequest)
        host.bridge.addMethod("sendFile", ".plugin", in_sign='ssa{ss}s', out_sign='s', method=self.sendFile)

    def profileConnected(self, profile):
        client = self.host.getClient(profile)
        client._xep_0096_waiting_for_approval = {}  # key = id, value = [transfer data, IdelayedCall Reactor timeout,
                                        # current stream method, [failed stream methods], profile]

    def _kill_id(self, approval_id, profile):
        """Forget a pending transfer, used as the approval timeout callback

        A missing id is only reported with a warning.
        @param approval_id: key of the entry in _xep_0096_waiting_for_approval
        @param profile: profile whose client holds the entry"""
        log.info(_("SI File Transfer: TimeOut reached for id %s") % approval_id)
        try:
            pending = self.host.getClient(profile)._xep_0096_waiting_for_approval
            del pending[approval_id]
        except KeyError:
            log.warning(_("kill id called on a non existant approval id"))

    def transferRequest(self, iq_id, from_jid, si_id, si_mime_type, si_el, profile):
        """Called when a file transfer is requested

        The request is checked (file element, mandatory attributes, stream
        method negotiation); if it is valid, the session is stored with a
        300 seconds approval timeout and the user is asked for confirmation.
        Invalid requests are answered with an error and dropped.
        @param iq_id: id of the iq request
        @param from_jid: jid of the sender
        @param si_id: Stream Initiation session id
        @param si_mime_type: Mime type of the file (or default "application/octet-stream" if unknown)
        @param si_el: domish.Element of the request
        @param profile: %(doc_profile)s"""
        log.info(_("XEP-0096 file transfer requested"))
        log.debug(si_el.toXml())
        client = self.host.getClient(profile)
        xep_0095 = self.host.plugins["XEP-0095"]
        file_desc = ""
        can_range = False
        file_elts = [elt for elt in si_el.elements() if elt.name == 'file']
        feature_elts = self.host.plugins["XEP-0020"].getFeatureElt(si_el)

        if not file_elts:
            log.warning(_("No file element found"))
            xep_0095.sendBadRequestError(iq_id, from_jid, profile)
            return

        file_el = file_elts[0]
        try:
            # "name" and "size" come from the remote entity: a missing
            # attribute or a size which is not a positive integer must result
            # in a bad request, not in an unhandled exception here or later
            # (the size is converted with int() once the transfer is accepted)
            filename = file_el["name"]
            file_size = file_el["size"]
            if int(file_size) < 0:
                raise ValueError("negative file size")
        except (KeyError, ValueError):
            log.warning(_("Invalid file element: name is missing or size is invalid"))
            xep_0095.sendBadRequestError(iq_id, from_jid, profile)
            return
        file_date = file_el.getAttribute("date", "")
        file_hash = file_el.getAttribute("hash", "")
        log.info(_("File proposed: name=[%(name)s] size=%(size)s") % {'name': filename, 'size': file_size})
        for file_child_el in file_el.elements():
            if file_child_el.name == "desc":
                file_desc = unicode(file_child_el)
            elif file_child_el.name == "range":
                can_range = True

        if not feature_elts:
            log.warning(_("No feature element found"))
            xep_0095.sendBadRequestError(iq_id, from_jid, profile)
            return

        feature_el = feature_elts[0]
        # NOTE(review): the parsed form is not used, the call is kept as in the
        # original code (it may be there to fail early on a malformed form)
        data_form.Form.fromElement(feature_el.firstChildElement())
        try:
            stream_method = self.host.plugins["XEP-0020"].negociate(feature_el, 'stream-method', self.managed_stream_m)
        except KeyError:
            log.warning(_("No stream method found"))
            xep_0095.sendBadRequestError(iq_id, from_jid, profile)
            return
        if not stream_method:
            log.warning(_("Can't find a valid stream method"))
            xep_0095.sendFailedError(iq_id, from_jid, profile)
            return

        #if we are here, the transfer can start, we just need user's agreement
        data = {"filename": filename, "id": iq_id, "from": from_jid, "size": file_size, "date": file_date, "hash": file_hash, "desc": file_desc, "can_range": str(can_range)}
        # stored value: [transfer data, approval timeout, stream method, failed stream methods]
        client._xep_0096_waiting_for_approval[si_id] = [data, reactor.callLater(300, self._kill_id, si_id, profile), stream_method, []]

        self.host.askConfirmation(si_id, "FILE_TRANSFER", data, self.confirmationCB, profile)

    def _getFileObject(self, dest_path, can_range=False):
        """Open file, put file pointer to the end if the file if needed
        @param dest_path: path of the destination file
        @param can_range: True if the file pointer can be moved
        @return: File Object"""
        return open(dest_path, "ab" if can_range else "wb")

    def confirmationCB(self, sid, accepted, frontend_data, profile):
        """Called on confirmation answer

        If the transfer is accepted, the destination file is opened, the
        negotiated stream method is prepared to receive the data and the
        stream initiation is accepted. Else the request is rejected.
        @param sid: file transfer session id
        @param accepted: True if file transfer is accepted
        @param frontend_data: data sent by frontend, "dest_path" is needed
            when the transfer is accepted
        @param profile: profile which received the transfer request"""
        client = self.host.getClient(profile)
        waiting = client._xep_0096_waiting_for_approval
        try:
            data, timeout, stream_method, _failed_methods = waiting[sid]
        except KeyError:
            # the approval timeout (_kill_id) already removed the session
            log.warning(_("Confirmation received for an unknown or expired transfer [%s]") % sid)
            return

        # the user has answered: whatever the answer is, the approval timeout
        # must not fire anymore for this session
        if timeout.active():
            timeout.cancel()

        if not accepted:
            log.debug(_("Transfer [%s] refused") % sid)
            self.host.plugins["XEP-0095"].sendRejectedError(data["id"], data['from'], profile=profile)
            del waiting[sid]
            return

        can_range = data['can_range'] == "True"
        try:
            dest_path = frontend_data['dest_path']
        except KeyError:
            log.error(_('dest path not found in frontend_data'))
            del waiting[sid]
            return

        # find the plugin managing the negotiated stream method
        for import_name in ("XEP-0065", "XEP-0047"):
            stream_plugin = self.host.plugins[import_name]
            if stream_method == stream_plugin.NAMESPACE:
                break
        else:
            log.error(_("Unknown stream method, this should not happen at this stage, cancelling transfer"))
            del waiting[sid]
            return

        # the size is converted before opening the file, so an invalid value
        # can't leave an opened (and possibly truncated) file behind
        try:
            file_size = int(data["size"])
        except ValueError:
            log.error(_("Invalid file size, cancelling transfer"))
            del waiting[sid]
            return

        file_obj = self._getFileObject(dest_path, can_range)
        # position of the opened file, sent back as range offset below
        range_offset = file_obj.tell()
        stream_plugin.prepareToReceive(jid.JID(data['from']), sid, file_obj, file_size, self._transferSucceeded, self._transferFailed, profile)

        #we can send the iq result
        feature_elt = self.host.plugins["XEP-0020"].chooseOption({'stream-method': stream_method})
        misc_elts = [domish.Element((PROFILE, "file"))]
        if can_range:
            range_elt = domish.Element((None, "range"))
            range_elt['offset'] = str(range_offset)
            #TODO: manage range length
            misc_elts.append(range_elt)
        self.host.plugins["XEP-0095"].acceptStream(data["id"], data['from'], feature_elt, misc_elts, profile)

    def _transferSucceeded(self, sid, file_obj, stream_method, profile):
        """Success callback given to the stream method when receiving a file

        Closes the received file and forgets the session.
        @param sid: stream id
        @param file_obj: File Object the data were written to
        @param stream_method: not used here
        @param profile: profile which received the file"""
        session_owner = self.host.getClient(profile)
        file_obj.close()
        log.info(_('Transfer %s successfuly finished') % sid)
        session_owner._xep_0096_waiting_for_approval.pop(sid)

    def _transferFailed(self, sid, file_obj, stream_method, reason, profile):
        """Called when something went wrong with the transfer

        The file is closed and deleted, and the session is forgotten. The
        cleanup is done even if the session is not registered anymore or if
        the file can't be deleted.
        @param sid: stream id
        @param file_obj: File Object the data were written to
        @param stream_method: stream method which failed
        @param reason: can be TIMEOUT, IO_ERROR, PROTOCOL_ERROR
        @param profile: profile which was receiving the file"""
        client = self.host.getClient(profile)
        log.warning(_('Transfer %(id)s failed with stream method %(s_method)s: %(reason)s') % {
            'id': sid,
            's_method': stream_method,
            'reason': reason})
        filepath = file_obj.name
        file_obj.close()
        try:
            os.remove(filepath)
        except OSError as e:
            # a file which can't be deleted must not prevent the session cleanup
            log.warning(_("Can't remove the partially received file %(path)s: %(error)s") % {
                'path': filepath,
                'error': e})
        #TODO: session remenber (within a time limit) when a stream method fail, and avoid that stream method with full jid for the rest of the session
        log.warning(_("All stream methods failed, can't transfer the file"))
        # pop with a default: the session may already have been removed
        client._xep_0096_waiting_for_approval.pop(sid, None)

    def fileCb(self, filepath, sid, size, profile, IQ):
        """Handle the answer to a file transfer offer

        Registered by sendFile as callback of the offer; the answer stanza is
        received as last argument. On a positive answer the file is opened
        and given to the stream method chosen by the peer.
        @param filepath: path of the file to send
        @param sid: Stream Initiation session id
        @param size: size of the file, as given by sendFile
        @param profile: profile sending the file
        @param IQ: answer stanza"""
        if IQ['type'] == "error":
            stanza_err = jabber.error.exceptionFromStanza(IQ)
            if stanza_err.code == '403' and stanza_err.condition == 'forbidden':
                log.debug(_("File transfer refused by %s") % IQ['from'])
                self.host.bridge.newAlert(_("The contact %s refused your file") % IQ['from'], _("File refused"), "INFO", profile)
            else:
                log.warning(_("Error during file transfer with %s") % IQ['from'])
                self.host.bridge.newAlert(_("Something went wrong during the file transfer session intialisation with %s") % IQ['from'], _("File transfer error"), "ERROR", profile)
            return

        si_elt = IQ.firstChildElement()

        if IQ['type'] != "result" or not si_elt or si_elt.name != "si":
            log.error(_("Protocol error during file transfer"))
            return

        xep_0020 = self.host.plugins["XEP-0020"]
        feature_elts = xep_0020.getFeatureElt(si_elt)
        if not feature_elts:
            log.warning(_("No feature element"))
            return

        choosed_options = xep_0020.getChoosedOptions(feature_elts[0])
        try:
            stream_method = choosed_options["stream-method"]
        except KeyError:
            log.warning(_("No stream method choosed"))
            return

        range_offset = 0
        range_length = None
        range_elts = [elt for elt in si_elt.elements() if elt.name == 'range']
        if not range_elts:
            # XEP-0096 defines <range/> as a child of <file/>: peers following
            # it don't put the element directly in <si/>
            for file_elt in si_elt.elements():
                if file_elt.name == 'file':
                    range_elts.extend([elt for elt in file_elt.elements() if elt.name == 'range'])
        if range_elts:
            range_elt = range_elts[0]
            try:
                # attributes are strings, while seek() needs an integer
                range_offset = int(range_elt.getAttribute("offset", 0))
                raw_length = range_elt.getAttribute("length")
                if raw_length is not None:
                    range_length = int(raw_length)
            except ValueError:
                log.warning(_("Invalid range received"))
                return

        # find the plugin managing the stream method chosen by the peer
        for import_name in ("XEP-0065", "XEP-0047"):
            stream_plugin = self.host.plugins[import_name]
            if stream_method == stream_plugin.NAMESPACE:
                break
        else:
            log.warning(_("Invalid stream method received"))
            return

        # binary mode: the content must be sent untouched on every platform
        file_obj = open(filepath, 'rb')
        if range_offset:
            file_obj.seek(range_offset)
        stream_plugin.startStream(file_obj, jid.JID(IQ['from']), sid, range_length, self.sendSuccessCb, self.sendFailureCb, size, profile)

    def sendFile(self, to_jid, filepath, data={}, profile_key=C.PROF_KEY_NONE):
        """send a file using XEP-0096
        @to_jid: recipient
        @filepath: absolute path to the file to send
        @data: dictionnary with the optional following keys:
               - "description": description of the file
               (the default dictionary is only read, never modified)
        @param profile_key: %(doc_profile_key)s
        @return: an unique id to identify the transfer, or an empty string if
                 the profile is unknown
        """
        profile = self.host.memory.getProfileName(profile_key)
        if not profile:
            log.warning(_("Trying to send a file from an unknown profile"))
            return ""
        feature_elt = self.host.plugins["XEP-0020"].proposeFeatures({'stream-method': self.managed_stream_m})

        statinfo = os.stat(filepath)
        file_elt = domish.Element((PROFILE, 'file'))
        file_elt['name'] = os.path.basename(filepath)
        size = file_elt['size'] = str(statinfo.st_size)
        # the documented "description" key is sent as <desc/> child of <file/>
        description = data.get("description")
        if description:
            file_elt.addElement('desc', content=description)

        # NOTE(review): XEP-0096 defines <range/> as a child of <file/>; it is
        # sent here next to <file/>, where transferRequest (which only looks at
        # the children of <file/>) doesn't detect it. Left as is because the
        # handling of the answer in this plugin goes with this layout.
        file_transfer_elts = [file_elt, domish.Element((None, 'range'))]

        sid, offer = self.host.plugins["XEP-0095"].proposeStream(jid.JID(to_jid), PROFILE, feature_elt, file_transfer_elts, profile_key=profile)
        # fileCb is called with the answer of the peer as last argument
        offer.addCallback(self.fileCb, filepath, sid, size, profile)
        return sid

    def sendSuccessCb(self, sid, file_obj, stream_method, profile):
        """Success callback given to the stream method when sending a file

        Logs the end of the transfer, then closes the sent file.
        @param sid: stream id
        @param file_obj: File Object which was sent
        @param stream_method: not used here
        @param profile: profile which sent the file"""
        log_values = {"sid": sid, "profile": profile}
        log.info(_('Transfer %(sid)s successfuly finished [%(profile)s]') % log_values)
        file_obj.close()

    def sendFailureCb(self, sid, file_obj, stream_method, reason, profile):
        """Failure callback given to the stream method when sending a file

        Closes the file which was being sent, then logs the failure.
        @param sid: stream id
        @param file_obj: File Object which was being sent
        @param stream_method: stream method which failed
        @param reason: reason of the failure, only logged here
        @param profile: profile which was sending the file"""
        file_obj.close()
        log_values = {
            'id': sid,
            "s_method": stream_method,
            'reason': reason,
            'profile': profile,
        }
        log.warning(_('Transfer %(id)s failed with stream method %(s_method)s: %(reason)s [%(profile)s]') % log_values)