view src/plugins/plugin_exp_pipe.py @ 1198:16ce9a6580a3

misc (install): Lower default setuptools version From 0d607b6ed49eab758fd9b272e148f032e65fb2e2 Mon Sep 17 00:00:00 2001 python-setuptools 5.7 is not yet in Debian, so we need to set the default version to 5.5 (the current version in sid) to prevent the newer version from being downloaded from PyPI.
author Matteo Cypriani <mcy@lm7.fr>
date Tue, 09 Sep 2014 22:09:51 -0400
parents 301b342c697a
children 069ad98b360d
line wrap: on
line source

#!/usr/bin/python
# -*- coding: utf-8 -*-

# SAT plugin for managing pipes (experimental)
# Copyright (C) 2009, 2010, 2011, 2012, 2013, 2014 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from sat.core.i18n import _
from sat.core.constants import Const as C
from sat.core.log import getLogger
log = getLogger(__name__)
from twisted.words.xish import domish
from twisted.words.protocols.jabber import jid
from twisted.words.protocols import jabber
from twisted.internet import reactor

from wokkel import data_form

# Query string matching <iq/> stanzas of type "set"
# (not referenced by the code visible in this module)
IQ_SET = '/iq[@type="set"]'

# Stream Initiation profile advertised by this plugin: short name and full namespace
PROFILE_NAME = "pipe-transfer"
PROFILE = "".join(("http://jabber.org/protocol/si/profile/", PROFILE_NAME))

# Plugin metadata: "main" names the class defined below, "dependencies" lists
# the plugins whose entries are looked up in host.plugins by that class
PLUGIN_INFO = {
    "name": "Pipe Plugin",
    "import_name": "EXP-PIPE",
    "type": "EXP",
    "protocols": ["EXP-PIPE"],
    "dependencies": [
        "XEP-0020",
        "XEP-0095",
        "XEP-0065",
        "XEP-0047",
    ],
    "main": "Exp_Pipe",
    "handler": "no",
    "description": _("""Implementation of SI Pipe Transfer"""),
}


class Exp_Pipe(object):
    """This is a modified version of XEP-0096 to work with named pipes instead of files

    Incoming requests are stored per profile in client._pipe_waiting_for_approval
    until the user answers (or until APPROVAL_TIMEOUT is reached), then the data
    are received with one of the managed stream methods (XEP-0065 or XEP-0047).
    Outgoing pipes are proposed with pipeOut.
    """

    # delay (in seconds) given to the user to answer an incoming request
    APPROVAL_TIMEOUT = 300

    def __init__(self, host):
        """Register the SI profile and the "pipeOut" bridge method

        @param host: host instance giving access to the plugins and the bridge"""
        log.info(_("Plugin Pipe initialization"))
        self.host = host
        self.managed_stream_m = [self.host.plugins["XEP-0065"].NAMESPACE,
                                 self.host.plugins["XEP-0047"].NAMESPACE]  # Stream methods managed
        self.host.plugins["XEP-0095"].registerSIProfile(PROFILE_NAME, self.transferRequest)
        host.bridge.addMethod("pipeOut", ".plugin", in_sign='ssa{ss}s', out_sign='s', method=self.pipeOut)

    def profileConnected(self, profile):
        """Create the storage of pending requests for the given profile

        @param profile: profile which has just been connected"""
        client = self.host.getClient(profile)
        # key = id, value = [transfer data, IDelayedCall reactor timeout,
        #                    current stream method, [failed stream methods], profile]
        client._pipe_waiting_for_approval = {}

    def _getStreamPlugin(self, stream_method):
        """Return the plugin managing a stream method
        @param stream_method: namespace of the negotiated stream method
        @return: the XEP-0065 or XEP-0047 plugin instance, or None if the method is not managed"""
        for plugin_name in ("XEP-0065", "XEP-0047"):
            plugin = self.host.plugins[plugin_name]
            if stream_method == plugin.NAMESPACE:
                return plugin
        return None

    def _kill_id(self, approval_id, profile):
        """Delete a waiting_for_approval id, called after timeout
        @param approval_id: id of _pipe_waiting_for_approval
        @param profile: profile owning the pending request"""
        log.info(_("SI Pipe Transfer: TimeOut reached for id %s") % approval_id)
        try:
            client = self.host.getClient(profile)
            del client._pipe_waiting_for_approval[approval_id]
        except KeyError:
            log.warning(_("kill id called on a non existant approval id"))

    def transferRequest(self, iq_id, from_jid, si_id, si_mime_type, si_el, profile):
        """Called when a pipe transfer is requested
        @param iq_id: id of the iq request
        @param from_jid: jid of the sender
        @param si_id: Stream Initiation session id
        @param si_mime_type: Mime type of the pipe (or default "application/octet-stream" if unknown)
        @param si_el: domish.Element of the request
        @param profile: %(doc_profile)s"""
        log.info(_("EXP-PIPE file transfer requested"))
        log.debug(si_el.toXml())
        client = self.host.getClient(profile)
        si_plugin = self.host.plugins["XEP-0095"]
        # a real list is built, so the emptiness test below is reliable
        pipe_elts = [elt for elt in si_el.elements() if elt.name == 'pipe']
        feature_elts = self.host.plugins["XEP-0020"].getFeatureElt(si_el)

        if not pipe_elts:
            log.warning(_("No pipe element found"))
            si_plugin.sendBadRequestError(iq_id, from_jid, profile)
            return

        if not feature_elts:
            log.warning(_("No feature element found"))
            si_plugin.sendBadRequestError(iq_id, from_jid, profile)
            return

        feature_el = feature_elts[0]
        # NOTE(review): the parsed form is discarded; presumably this call is only
        # kept so that a malformed form raises here - confirm
        data_form.Form.fromElement(feature_el.firstChildElement())
        try:
            stream_method = self.host.plugins["XEP-0020"].negociate(feature_el, 'stream-method', self.managed_stream_m)
        except KeyError:
            log.warning(_("No stream method found"))
            si_plugin.sendBadRequestError(iq_id, from_jid, profile)
            return
        if not stream_method:
            log.warning(_("Can't find a valid stream method"))
            si_plugin.sendFailedError(iq_id, from_jid, profile)
            return

        #if we are here, the transfer can start, we just need user's agreement
        data = {"id": iq_id, "from": from_jid}
        # _kill_id needs the profile to find the pending request, it must be given to callLater
        timeout = reactor.callLater(self.APPROVAL_TIMEOUT, self._kill_id, si_id, profile)
        client._pipe_waiting_for_approval[si_id] = [data, timeout, stream_method, [], profile]

        self.host.askConfirmation(si_id, "PIPE_TRANSFER", data, self.confirmationCB, profile)

    def confirmationCB(self, sid, accepted, frontend_data, profile):
        """Called on confirmation answer
        @param sid: file transfer session id
        @param accepted: True if file transfer is accepted
        @param frontend_data: data sent by frontend, 'dest_path' is needed when the transfer is accepted
        @param profile: profile which received the request"""
        client = self.host.getClient(profile)
        try:
            data, timeout, stream_method, _failed_methods, profile = client._pipe_waiting_for_approval[sid]
        except KeyError:
            # the request is not pending any more, e.g. _kill_id removed it after the timeout
            log.warning(_("Confirmation received for an unknown or expired pipe transfer [%s]") % sid)
            return

        # the user has answered (whatever the answer is): the approval timeout must not fire any more
        if timeout.active():
            timeout.cancel()

        if not accepted:
            log.debug(_("Transfer [%s] refused") % sid)
            self.host.plugins["XEP-0095"].sendRejectedError(data["id"], data['from'], profile=profile)
            del client._pipe_waiting_for_approval[sid]
            return

        try:
            dest_path = frontend_data['dest_path']
        except KeyError:
            log.error(_('dest path not found in frontend_data'))
            del client._pipe_waiting_for_approval[sid]
            return

        stream_plugin = self._getStreamPlugin(stream_method)
        if stream_plugin is None:
            log.error(_("Unknown stream method, this should not happen at this stage, cancelling transfer"))
            del client._pipe_waiting_for_approval[sid]
            return

        try:
            file_obj = open(dest_path, 'w+')
        except (IOError, OSError) as e:
            # without this, the pending request would never be removed and the peer never answered
            log.error(_("Can't open destination path [%(path)s]: %(error)s") % {'path': dest_path, 'error': e})
            self.host.plugins["XEP-0095"].sendFailedError(data["id"], data['from'], profile)
            del client._pipe_waiting_for_approval[sid]
            return

        try:
            stream_plugin.prepareToReceive(jid.JID(data['from']), sid, file_obj, None, self._transferSucceeded, self._transferFailed, profile)
        except Exception:
            # the stream plugin did not take the file in charge, don't leak it
            file_obj.close()
            raise

        #we can send the iq result
        feature_elt = self.host.plugins["XEP-0020"].chooseOption({'stream-method': stream_method})
        # NOTE(review): the element is named "file" while this profile advertises "pipe" elements,
        # possibly a leftover from XEP-0096 - confirm the expected name before changing it
        misc_elts = [domish.Element((PROFILE, "file"))]
        self.host.plugins["XEP-0095"].acceptStream(data["id"], data['from'], feature_elt, misc_elts, profile)

    def _transferSucceeded(self, sid, file_obj, stream_method, profile):
        """Called by the stream method when transfer successfuly finished
        @param sid: stream id
        @param file_obj: file object where the data were written, closed here
        @param stream_method: stream method given by the caller (unused)
        @param profile: profile which received the data"""
        client = self.host.getClient(profile)
        file_obj.close()
        log.info(_('Transfer %s successfuly finished') % sid)
        del client._pipe_waiting_for_approval[sid]

    def _transferFailed(self, sid, file_obj, stream_method, reason, profile):
        """Called when something went wrong with the transfer
        @param sid: stream id
        @param file_obj: file object where the data were written, closed here
        @param stream_method: stream method given by the caller (the stored one is logged)
        @param reason: can be TIMEOUT, IO_ERROR, PROTOCOL_ERROR
        @param profile: profile which was receiving the data"""
        # the file is closed first, so it is released even if the session can't be found
        file_obj.close()
        client = self.host.getClient(profile)
        stored_method = client._pipe_waiting_for_approval[sid][2]
        log.warning(_('Transfer %(id)s failed with stream method %(s_method)s') % {'id': sid,
                                                                               's_method': stored_method})
        #TODO: session remember (within a time limit) when a stream method fails, and avoid that stream method with full jid for the rest of the session
        log.warning(_("All stream methods failed, can't transfer the file"))
        del client._pipe_waiting_for_approval[sid]

    def pipeCb(self, filepath, sid, profile, IQ):
        """Called with the answer to the stream proposal sent by pipeOut

        Start the stream with the method chosen by the other party,
        or alert the frontends if the proposal has been refused or has failed.
        @param filepath: absolute path to the named pipe to send
        @param sid: Stream Initiation session id
        @param profile: profile sending the pipe
        @param IQ: answer stanza"""
        if IQ['type'] == "error":
            stanza_err = jabber.error.exceptionFromStanza(IQ)
            if stanza_err.code == '403' and stanza_err.condition == 'forbidden':
                log.debug(_("Pipe transfer refused by %s") % IQ['from'])
                self.host.bridge.newAlert(_("The contact %s refused your pipe stream") % IQ['from'], _("Pipe stream refused"), "INFO", profile)
            else:
                log.warning(_("Error during pipe stream transfer with %s") % IQ['from'])
                self.host.bridge.newAlert(_("Something went wrong during the pipe stream session intialisation with %s") % IQ['from'], _("Pipe stream error"), "ERROR", profile)
            return

        si_elt = IQ.firstChildElement()

        if IQ['type'] != "result" or not si_elt or si_elt.name != "si":
            log.error(_("Protocol error during file transfer"))
            return

        feature_elts = self.host.plugins["XEP-0020"].getFeatureElt(si_elt)
        if not feature_elts:
            log.warning(_("No feature element"))
            return

        choosed_options = self.host.plugins["XEP-0020"].getChoosedOptions(feature_elts[0])
        try:
            stream_method = choosed_options["stream-method"]
        except KeyError:
            log.warning(_("No stream method choosed"))
            return

        stream_plugin = self._getStreamPlugin(stream_method)
        if stream_plugin is None:
            log.warning(_("Invalid stream method received"))
            return

        # XXX: the pipe is opened in blocking mode on purpose: non blocking opening
        #      (os.O_NONBLOCK) causes issues with XEP-0065's FileSender
        # XXX: we have to be sure that filepath is well opened, as reading can block it
        file_obj = open(filepath, 'r')
        try:
            stream_plugin.startStream(file_obj, jid.JID(IQ['from']), sid, None, self.sendSuccessCb, self.sendFailureCb, None, profile)
        except Exception:
            # the stream plugin did not take the file in charge, don't leak it
            file_obj.close()
            raise

    def pipeOut(self, to_jid, filepath, data=None, profile_key=C.PROF_KEY_NONE):
        """send a file using EXP-PIPE
        @param to_jid: recipient
        @param filepath: absolute path to the named pipe to send
        @param data: dictionary with the optional data (currently unused)
        @param profile_key: %(doc_profile_key)s
        @return: an unique id to identify the transfer, or an empty string if the profile is unknown
        """
        profile = self.host.memory.getProfileName(profile_key)
        if not profile:
            log.warning(_("Trying to send a file from an unknown profile"))
            return ""
        feature_elt = self.host.plugins["XEP-0020"].proposeFeatures({'stream-method': self.managed_stream_m})

        pipe_transfer_elts = [domish.Element((PROFILE, 'pipe'))]

        sid, offer = self.host.plugins["XEP-0095"].proposeStream(jid.JID(to_jid), PROFILE, feature_elt, pipe_transfer_elts, profile_key=profile)
        offer.addCallback(self.pipeCb, filepath, sid, profile)
        return sid

    def sendSuccessCb(self, sid, file_obj, stream_method, profile):
        """Called by the stream method when the pipe has been successfully sent
        @param sid: stream id
        @param file_obj: file object which was read, closed here
        @param stream_method: stream method used (unused)
        @param profile: profile which sent the pipe (unused)"""
        log.info(_('Transfer %s successfuly finished') % sid)
        file_obj.close()

    def sendFailureCb(self, sid, file_obj, stream_method, reason, profile):
        """Called by the stream method when the sending has failed
        @param sid: stream id
        @param file_obj: file object which was read, closed here
        @param stream_method: stream method which failed
        @param reason: reason of the failure (not logged)
        @param profile: profile which was sending the pipe"""
        file_obj.close()
        log.warning(_('Transfer %(id)s failed with stream method %(s_method)s %(profile)s') % {'id': sid, "s_method": stream_method, "profile": profile})