Mercurial > libervia-backend
view src/plugins/plugin_xep_0234.py @ 1557:22f0307864b4
plugin XEP-0234: "senders" handling
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 02 Nov 2015 22:02:41 +0100 |
parents | cbfbe028d099 |
children | ec3848916ee8 |
line wrap: on
line source
#!/usr/bin/python # -*- coding: utf-8 -*- # SAT plugin for Jingle File Transfer (XEP-0234) # Copyright (C) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from sat.core.i18n import _, D_ from sat.core.constants import Const as C from sat.core.log import getLogger log = getLogger(__name__) from sat.core import exceptions from sat.tools import xml_tools from wokkel import disco, iwokkel from zope.interface import implements from sat.tools import utils import os.path from twisted.words.xish import domish from twisted.words.protocols.jabber import jid from twisted.python import failure try: from twisted.words.protocols.xmlstream import XMPPHandler except ImportError: from wokkel.subprotocols import XMPPHandler NS_JINGLE_FT = 'urn:xmpp:jingle:apps:file-transfer:4' CONFIRM = D_(u'{entity} wants to send the file "{name}" to you:\n{desc}\n\nThe file has a size of {size_human}\n\nDo you accept ?') CONFIRM_TITLE = D_(u'Confirm file transfer') CONFIRM_OVERWRITE = D_(u'File {} already exists, are you sure you want to overwrite ?') CONFIRM_OVERWRITE_TITLE = D_(u'File exists') PLUGIN_INFO = { "name": "Jingle File Transfer", "import_name": "XEP-0234", "type": "XEP", "protocols": ["XEP-0234"], "dependencies": ["XEP-0166", "XEP-0300", "FILE"], "main": "XEP_0234", "handler": "yes", "description": _("""Implementation of Jingle File Transfer""") } class XEP_0234(object): def __init__(self, host): log.info(_("plugin Jingle File Transfer initialization")) self.host = host self._j = host.plugins["XEP-0166"] # shortcut to access jingle self._j.registerApplication(NS_JINGLE_FT, self) self._f = host.plugins["FILE"] host.bridge.addMethod("fileJingleSend", ".plugin", in_sign='sssss', out_sign='', method=self._fileJingleSend) def getHandler(self, profile): return XEP_0234_handler() def _fileJingleSend(self, to_jid, filepath, name="", file_desc="", profile=C.PROF_KEY_NONE): return self.fileJingleSend(jid.JID(to_jid), filepath, name or None, file_desc or None, profile) def fileJingleSend(self, to_jid, filepath, name=None, file_desc=None, profile=C.PROF_KEY_NONE): self._j.initiate(to_jid, [{'app_ns': NS_JINGLE_FT, 'senders': self._j.ROLE_INITIATOR, 'app_kwargs': {'filepath': filepath, 'name': name, 'file_desc': file_desc}, }], profile=profile) # Dialogs with user # the overwrite check is done here def _getDestDir(self, session, content_data, profile): """Request confirmation and destination dir to user if transfer is confirmed, session is filled @param session(dict): jingle session data @param content_data(dict): content informations @param profile: %(doc_profile)s return (defer.Deferred): True if transfer is accepted """ application_data = content_data['application_data'] file_data = application_data['file_data'] d = xml_tools.deferDialog(self.host, _(CONFIRM).format(entity=session['to_jid'].full(), **file_data), _(CONFIRM_TITLE), type_=C.XMLUI_DIALOG_FILE, options={C.XMLUI_DATA_FILETYPE: C.XMLUI_DATA_FILETYPE_DIR}, profile=profile) d.addCallback(self._gotConfirmation, session, content_data, application_data, profile) return d def _gotConfirmation(self, data, session, content_data, application_data, profile): """Called when the permission and dest path have been received @param data(dict): xmlui data received from file dialog return (bool): True if copy is wanted and OK False if user wants to cancel if fill exists ask confirmation and call again self._getDestDir if needed """ if data.get('cancelled', False): return False file_data = application_data['file_data'] path = data['path'] file_data['file_path'] = file_path = os.path.join(path, file_data['name']) log.debug(u'destination file path set to {}'.format(file_path)) # we manage case where file already exists if os.path.exists(file_path): def check_overwrite(overwrite): if overwrite: assert 'file_obj' not in content_data content_data['file_obj'] = open(file_path, 'w') return True else: return self._getDestDir(session, content_data, profile) exists_d = xml_tools.deferConfirm( self.host, _(CONFIRM_OVERWRITE).format(file_path), _(CONFIRM_OVERWRITE_TITLE), profile=profile) exists_d.addCallback(check_overwrite) return exists_d assert 'file_obj' not in content_data content_data['file_obj'] = open(file_path, 'w') return True # jingle callbacks def jingleSessionInit(self, session, content_name, filepath, name=None, file_desc=None, profile=C.PROF_KEY_NONE): content_data = session['contents'][content_name] application_data = content_data['application_data'] assert 'file_path' not in application_data application_data['file_path'] = filepath file_data = application_data['file_data'] = {} file_data['date'] = utils.xmpp_date() file_data['desc'] = file_desc or '' file_data['media-type'] = "application/octet-stream" # TODO file_data['name'] = os.path.basename(filepath) if name is None else name file_data['size'] = os.path.getsize(filepath) desc_elt = domish.Element((NS_JINGLE_FT, 'description')) file_elt = desc_elt.addElement("file") for name in ('date', 'desc', 'media-type', 'name', 'size'): file_elt.addElement(name, content=unicode(file_data[name])) file_elt.addElement("range") # TODO file_elt.addChild(self.host.plugins["XEP-0300"].buidHash()) return desc_elt def jingleRequestConfirmation(self, action, session, content_name, desc_elt, profile): """This method request confirmation for a jingle session""" content_data = session['contents'][content_name] if content_data['senders'] not in (self._j.ROLE_INITIATOR, self._j.ROLE_RESPONDER): log.warning(u"Bad sender, assuming initiator") content_data['senders'] = self._j.ROLE_INITIATOR # first we grab file informations try: file_elt = desc_elt.elements(NS_JINGLE_FT, 'file').next() except StopIteration: raise failure.Failure(exceptions.DataError) file_data = {} for name in ('date', 'desc', 'media-type', 'name', 'range', 'size'): try: file_data[name] = unicode(file_elt.elements(NS_JINGLE_FT, name).next()) except StopIteration: file_data[name] = '' try: size = file_data['size'] = int(file_data['size']) except ValueError: raise failure.Failure(exceptions.DataError) else: # human readable size file_data['size_human'] = u'{:.6n} Mio'.format(float(size)/(1024**2)) name = file_data['name'] if '/' in name or '\\' in name: log.warning(u"File name contain path characters, we replace them: {}".format(name)) file_data['name'] = name.replace('/', '_').replace('\\', '_') # TODO: parse hash using plugin XEP-0300 content_data['application_data']['file_data'] = file_data # now we actualy request permission to user return self._getDestDir(session, content_data, profile) def jingleHandler(self, action, session, content_name, desc_elt, profile): content_data = session['contents'][content_name] application_data = content_data['application_data'] if action in (self._j.A_ACCEPTED_ACK,): pass elif action == self._j.A_SESSION_INITIATE: file_elt = desc_elt.elements(NS_JINGLE_FT, 'file').next() try: file_elt.elements(NS_JINGLE_FT, 'range').next() except StopIteration: # initiator doesn't manage <range>, but we do so we advertise it log.debug("adding <range> element") file_elt.addElement('range') elif action == self._j.A_SESSION_ACCEPT: assert not 'file_obj' in content_data file_path = application_data['file_path'] size = application_data['file_data']['size'] file_obj = content_data['file_obj'] = self._f.File(self.host, file_path, size=size, profile=profile ) file_obj.eof.addCallback(lambda dummy: file_obj.close()) else: log.warning(u"FIXME: unmanaged action {}".format(action)) return desc_elt class XEP_0234_handler(XMPPHandler): implements(iwokkel.IDisco) def getDiscoInfo(self, requestor, target, nodeIdentifier=''): return [disco.DiscoFeature(NS_JINGLE_FT)] def getDiscoItems(self, requestor, target, nodeIdentifier=''): return []