Mercurial > libervia-backend
changeset 2489:e2a7bb875957
plugin pipe/stream, file transfert: refactoring and improvments:
this is a big patch as things had to be changed at the same time.
- changed methods using profile argument to use client instead
- move SatFile in a new tools.stream module, has it should be part of core, not a plugin
- new IStreamProducer interface, to handler starting a pull producer
- new FileStreamObject which create a stream producer/consumer from a SatFile
- plugin pipe is no more using unix named pipe, as it complicate the thing,
special care need to be taken to not block, and it's generally not necessary.
Instead a socket is now used, so the plugin has been renomed to jingle stream.
- bad connection/error should be better handler in jingle stream plugin, and code should not block anymore
- jp pipe commands have been updated accordingly
fix bug 237
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 08 Feb 2018 00:37:42 +0100 |
parents | 78c7992a26ed |
children | b4bf282d6354 |
files | frontends/src/jp/cmd_pipe.py src/plugins/plugin_exp_jingle_stream.py src/plugins/plugin_exp_pipe.py src/plugins/plugin_misc_file.py src/plugins/plugin_misc_ip.py src/plugins/plugin_xep_0047.py src/plugins/plugin_xep_0065.py src/plugins/plugin_xep_0095.py src/plugins/plugin_xep_0096.py src/plugins/plugin_xep_0166.py src/plugins/plugin_xep_0234.py src/plugins/plugin_xep_0260.py src/plugins/plugin_xep_0261.py src/plugins/plugin_xep_0363.py src/tools/stream.py |
diffstat | 15 files changed, 869 insertions(+), 669 deletions(-) [+] |
line wrap: on
line diff
--- a/frontends/src/jp/cmd_pipe.py Thu Feb 01 07:24:34 2018 +0100 +++ b/frontends/src/jp/cmd_pipe.py Thu Feb 08 00:37:42 2018 +0100 @@ -19,18 +19,19 @@ from sat_frontends.jp import base -import tempfile +from sat_frontends.jp.constants import Const as C import sys -import os -import os.path -import shutil from sat.core.i18n import _ from sat_frontends.tools import jid import xml.etree.ElementTree as ET # FIXME: used temporarily to manage XMLUI - +from functools import partial +import socket +import SocketServer +import errno __commands__ = ["Pipe"] +START_PORT = 9999 class PipeOut(base.CommandBase): @@ -41,31 +42,64 @@ def add_parser_options(self): self.parser.add_argument("jid", type=base.unicode_decoder, help=_("the destination jid")) + def streamOutCb(self, port): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect(('127.0.0.1', int(port))) + while True: + buf = sys.stdin.read(4096) + if not buf: + break + try: + s.sendall(buf) + except socket.error as e: + if e.errno == errno.EPIPE: + sys.stderr.write(str(e) + '\n') + self.host.quit(1) + else: + raise e + self.host.quit() + def start(self): """ Create named pipe, and send stdin to it """ - # TODO: check_jids - tmp_dir = tempfile.mkdtemp() - fifopath = os.path.join(tmp_dir,"pipe_out") - os.mkfifo(fifopath) - self.host.bridge.pipeOut(self.host.get_full_jid(self.args.jid), fifopath, self.profile) - with open(fifopath, 'w') as f: - shutil.copyfileobj(sys.stdin, f) - shutil.rmtree(tmp_dir) - self.host.quit() + self.host.bridge.streamOut( + self.host.get_full_jid(self.args.jid), + self.profile, + callback=self.streamOutCb, + errback=partial(self.errback, + msg=_(u"can't start stream: {}"), + exit_code=C.EXIT_BRIDGE_ERRBACK)) + + +class StreamServer(SocketServer.BaseRequestHandler): + + def handle(self): + while True: + data = self.request.recv(4096) + if not data: + break + sys.stdout.write(data) + try: + sys.stdout.flush() + except IOError as e: + sys.stderr.write(str(e) + '\n') + break + # calling shutdown will do a deadlock as we don't use separate thread + # this is a workaround (cf. https://stackoverflow.com/a/36017741) + self.server._BaseServer__shutdown_request = True class PipeIn(base.CommandAnswering): def __init__(self, host): super(PipeIn, self).__init__(host, 'in', help=_('receive a pipe stream')) - self.action_callbacks = {"PIPE": self.onPipeAction} + self.action_callbacks = {"STREAM": self.onStreamAction} def add_parser_options(self): self.parser.add_argument("jids", type=base.unicode_decoder, nargs="*", help=_('Jids accepted (none means "accept everything")')) def getXmluiId(self, action_data): # FIXME: we temporarily use ElementTree, but a real XMLUI managing module - # should be available in the futur + # should be available in the future # TODO: XMLUI module try: xml_ui = action_data['xmlui'] @@ -78,7 +112,7 @@ self.disp(_(u"Invalid XMLUI received"), error=True) return xmlui_id - def onPipeAction(self, action_data, action_id, security_limit, profile): + def onStreamAction(self, action_data, action_id, security_limit, profile): xmlui_id = self.getXmluiId(action_data) if xmlui_id is None: return self.host.quitFromSignal(1) @@ -89,16 +123,22 @@ return if not self.bare_jids or from_jid.bare in self.bare_jids: - tmp_dir = tempfile.mkdtemp() - fifopath = os.path.join(tmp_dir,"pipe_in") - os.mkfifo(fifopath) - xmlui_data = {'path': fifopath} + host, port = "localhost", START_PORT + while True: + try: + server = SocketServer.TCPServer((host, port), StreamServer) + except socket.error as e: + if e.errno == errno.EADDRINUSE: + port += 1 + else: + raise e + else: + break + xmlui_data = {'answer': C.BOOL_TRUE, + 'port': unicode(port)} self.host.bridge.launchAction(xmlui_id, xmlui_data, profile_key=profile) - - with open(fifopath, 'r') as f: - shutil.copyfileobj(f, sys.stdout) - shutil.rmtree(tmp_dir) - self.host.quit() + server.serve_forever() + self.host.quitFromSignal() def start(self): self.bare_jids = [jid.JID(jid_).bare for jid_ in self.args.jids]
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/plugins/plugin_exp_jingle_stream.py Thu Feb 08 00:37:42 2018 +0100 @@ -0,0 +1,281 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- + +# SAT plugin for managing pipes (experimental) +# Copyright (C) 2009-2018 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from sat.core.i18n import _, D_ +from sat.core.constants import Const as C +from sat.core import exceptions +from sat.core.log import getLogger +log = getLogger(__name__) +from sat.tools import xml_tools +from sat.tools import stream +from twisted.words.xish import domish +from twisted.words.protocols.jabber import jid +from twisted.internet import defer +from twisted.internet import protocol +from twisted.internet import endpoints +from twisted.internet import reactor +from twisted.internet import error +from twisted.internet import interfaces +from zope import interface +import errno + +NS_STREAM = 'http://salut-a-toi.org/protocol/stream' +SECURITY_LIMIT=30 +START_PORT = 8888 + +PLUGIN_INFO = { + C.PI_NAME: "Jingle Stream Plugin", + C.PI_IMPORT_NAME: "STREAM", + C.PI_TYPE: "EXP", + C.PI_PROTOCOLS: [], + C.PI_DEPENDENCIES: ["XEP-0166"], + C.PI_MAIN: "JingleStream", + C.PI_HANDLER: "no", + C.PI_DESCRIPTION: _("""Jingle Stream plugin""") +} + +CONFIRM = D_(u"{peer} wants to send you a stream, do you accept ?") +CONFIRM_TITLE = D_(u"Stream Request") + + +class StreamProtocol(protocol.Protocol): + + def __init__(self): + self.pause = False + + def setPause(self, paused): + # in Python 2.x, Twisted classes are old style + # so we can use property and setter + if paused: + if not self.pause: + self.transport.pauseProducing() + self.pause = True + else: + if self.pause: + self.transport.resumeProducing() + self.pause = False + + def disconnect(self): + self.transport.loseConnection() + + def connectionMade(self): + if self.factory.client_conn is not None: + self.transport.loseConnection() + self.factory.setClientConn(self) + + def dataReceived(self, data): + self.factory.writeToConsumer(data) + + def sendData(self, data): + self.transport.write(data) + + def connectionLost(self, reason): + if self.factory.client_conn != self: + # only the first connected client_conn is relevant + return + + if reason.type == error.ConnectionDone: + self.factory.streamFinished() + else: + self.factory.streamFailed(reason) + + +@interface.implementer(stream.IStreamProducer) +@interface.implementer(interfaces.IPushProducer) +@interface.implementer(interfaces.IConsumer) +class StreamFactory(protocol.Factory): + protocol = StreamProtocol + consumer = None + producer = None + deferred = None + + def __init__(self): + self.client_conn = None + + def setClientConn(self, stream_protocol): + # in Python 2.x, Twisted classes are old style + # so we can use property and setter + assert self.client_conn is None + self.client_conn = stream_protocol + if self.consumer is None: + self.client_conn.setPause(True) + + def startStream(self, consumer): + if self.consumer is not None: + raise exceptions.InternalError(_(u"stream can't be used with multiple consumers")) + assert self.deferred is None + self.consumer = consumer + consumer.registerProducer(self, True) + self.deferred = defer.Deferred() + if self.client_conn is not None: + self.client_conn.setPause(False) + return self.deferred + + def streamFinished(self): + self.client_conn = None + if self.consumer: + self.consumer.unregisterProducer() + self.port_listening.stopListening() + self.deferred.callback(None) + + def streamFailed(self, failure_): + self.client_conn = None + if self.consumer: + self.consumer.unregisterProducer() + self.port_listening.stopListening() + self.deferred.errback(failure_) + elif self.producer: + self.producer.stopProducing() + + def stopStream(self): + if self.client_conn is not None: + self.client_conn.disconnect() + + def registerProducer(self, producer, streaming): + self.producer = producer + + def pauseProducing(self): + self.client_conn.setPause(True) + + def resumeProducing(self): + self.client_conn.setPause(False) + + def stopProducing(self): + if self.client_conn: + self.client_conn.disconnect() + + def write(self, data): + try: + self.client_conn.sendData(data) + except AttributeError: + log.warning(_(u"No client connected, can't send data")) + + def writeToConsumer(self, data): + self.consumer.write(data) + + +class JingleStream(object): + """This non standard jingle application send byte stream""" + + def __init__(self, host): + log.info(_("Plugin Stream initialization")) + self.host = host + self._j = host.plugins["XEP-0166"] # shortcut to access jingle + self._j.registerApplication(NS_STREAM, self) + host.bridge.addMethod("streamOut", ".plugin", in_sign='ss', out_sign='s', method=self._streamOut, async=True) + # jingle callbacks + + def _streamOut(self, to_jid_s, profile_key): + client = self.host.getClient(profile_key) + return self.streamOut(client, jid.JID(to_jid_s)) + + @defer.inlineCallbacks + def streamOut(self, client, to_jid): + """send a stream + + @param peer_jid(jid.JID): recipient + @return: an unique id to identify the transfer + """ + port = START_PORT + factory = StreamFactory() + while True: + endpoint = endpoints.TCP4ServerEndpoint(reactor, port) + try: + port_listening = yield endpoint.listen(factory) + except error.CannotListenError as e: + if e.socketError.errno == errno.EADDRINUSE: + port += 1 + else: + raise e + else: + factory.port_listening = port_listening + break + self._j.initiate(client, + to_jid, + [{'app_ns': NS_STREAM, + 'senders': self._j.ROLE_INITIATOR, + 'app_kwargs': {'stream_object': factory}, + }]) + defer.returnValue(unicode(port)) + + def jingleSessionInit(self, client, session, content_name, stream_object): + content_data = session['contents'][content_name] + application_data = content_data['application_data'] + assert 'stream_object' not in application_data + application_data['stream_object'] = stream_object + desc_elt = domish.Element((NS_STREAM, 'description')) + return desc_elt + + @defer.inlineCallbacks + def jingleRequestConfirmation(self, client, action, session, content_name, desc_elt): + """This method request confirmation for a jingle session""" + content_data = session['contents'][content_name] + if content_data['senders'] not in (self._j.ROLE_INITIATOR, self._j.ROLE_RESPONDER): + log.warning(u"Bad sender, assuming initiator") + content_data['senders'] = self._j.ROLE_INITIATOR + + confirm_data = yield xml_tools.deferDialog(self.host, + _(CONFIRM).format(peer=session['peer_jid'].full()), + _(CONFIRM_TITLE), + type_=C.XMLUI_DIALOG_CONFIRM, + action_extra={'meta_from_jid': session['peer_jid'].full(), + 'meta_type': "STREAM", + }, + security_limit=SECURITY_LIMIT, + profile=client.profile) + + if not C.bool(confirm_data['answer']): + defer.returnValue(False) + try: + port = int(confirm_data['port']) + except (ValueError, KeyError): + raise exceptions.DataError(_(u'given port is invalid')) + endpoint = endpoints.TCP4ClientEndpoint(reactor, 'localhost', port) + factory = StreamFactory() + yield endpoint.connect(factory) + content_data['stream_object'] = factory + finished_d = content_data['finished_d'] = defer.Deferred() + args = [client, session, content_name, content_data] + finished_d.addCallbacks(self._finishedCb, self._finishedEb, args, None, args) + defer.returnValue(True) + + def jingleHandler(self, client, action, session, content_name, desc_elt): + content_data = session['contents'][content_name] + application_data = content_data['application_data'] + if action in (self._j.A_ACCEPTED_ACK, self._j.A_SESSION_INITIATE): + pass + elif action == self._j.A_SESSION_ACCEPT: + assert not 'stream_object' in content_data + content_data['stream_object'] = application_data['stream_object'] + finished_d = content_data['finished_d'] = defer.Deferred() + args = [client, session, content_name, content_data] + finished_d.addCallbacks(self._finishedCb, self._finishedEb, args, None, args) + else: + log.warning(u"FIXME: unmanaged action {}".format(action)) + return desc_elt + + def _finishedCb(self, dummy, client, session, content_name, content_data): + log.info(u"Pipe transfer completed") + self._j.contentTerminate(client, session, content_name) + content_data['stream_object'].stopStream() + + def _finishedEb(self, failure, client, session, content_name, content_data): + log.warning(u"Error while streaming pipe: {}".format(failure)) + self._j.contentTerminate(client, session, content_name, reason=self._j.REASON_FAILED_TRANSPORT) + content_data['stream_object'].stopStream()
--- a/src/plugins/plugin_exp_pipe.py Thu Feb 01 07:24:34 2018 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,142 +0,0 @@ -#!/usr/bin/env python2 -# -*- coding: utf-8 -*- - -# SAT plugin for managing pipes (experimental) -# Copyright (C) 2009-2018 Jérôme Poisson (goffi@goffi.org) - -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. - -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -from sat.core.i18n import _, D_ -from sat.core.constants import Const as C -from sat.core.log import getLogger -log = getLogger(__name__) -from sat.tools import xml_tools -from twisted.words.xish import domish -from twisted.words.protocols.jabber import jid -from twisted.internet import defer - -NS_PIPE = 'http://salut-a-toi.org/protocol/pipe' -SECURITY_LIMIT=30 - -PLUGIN_INFO = { - C.PI_NAME: "Pipe Plugin", - C.PI_IMPORT_NAME: "EXP-PIPE", - C.PI_TYPE: "EXP", - C.PI_PROTOCOLS: ["EXP-PIPE"], - C.PI_DEPENDENCIES: ["XEP-0166"], - C.PI_MAIN: "Exp_Pipe", - C.PI_HANDLER: "no", - C.PI_DESCRIPTION: _("""Jingle Pipe Transfer experimental plugin""") -} - -CONFIRM = D_(u"{peer} wants to send you a pipe stream, do you accept ?") -CONFIRM_TITLE = D_(u"Pipe stream") - -class Exp_Pipe(object): - """This non standard jingle application works with named pipes""" - - def __init__(self, host): - log.info(_("Plugin Pipe initialization")) - self.host = host - self._j = host.plugins["XEP-0166"] # shortcut to access jingle - self._j.registerApplication(NS_PIPE, self) - host.bridge.addMethod("pipeOut", ".plugin", in_sign='sss', out_sign='', method=self._pipeOut) - - # jingle callbacks - - def _pipeOut(self, peer_jid_s, filepath, profile_key=C.PROF_KEY_NONE): - profile = self.host.memory.getProfileName(profile_key) - self.pipeOut(jid.JID(peer_jid_s), filepath, profile) - - def pipeOut(self, peer_jid, filepath, profile): - """send a file using EXP-PIPE - - @param peer_jid(jid.JID): recipient - @param filepath(unicode): absolute path to the named pipe to send - @param profile_key: %(doc_profile_key)s - @return: an unique id to identify the transfer - """ - self._j.initiate(peer_jid, - [{'app_ns': NS_PIPE, - 'senders': self._j.ROLE_INITIATOR, - 'app_kwargs': {'filepath': filepath, - }, - }], - profile=profile) - - def jingleSessionInit(self, session, content_name, filepath, profile=C.PROF_KEY_NONE): - content_data = session['contents'][content_name] - application_data = content_data['application_data'] - assert 'file_path' not in application_data - application_data['file_path'] = filepath - desc_elt = domish.Element((NS_PIPE, 'description')) - return desc_elt - - def jingleRequestConfirmation(self, action, session, content_name, desc_elt, profile): - """This method request confirmation for a jingle session""" - content_data = session['contents'][content_name] - if content_data['senders'] not in (self._j.ROLE_INITIATOR, self._j.ROLE_RESPONDER): - log.warning(u"Bad sender, assuming initiator") - content_data['senders'] = self._j.ROLE_INITIATOR - - def gotConfirmation(data): - if data.get('cancelled', False): - return False - application_data = content_data['application_data'] - dest_path = application_data['file_path'] = data['path'] - content_data['file_obj'] = open(dest_path, 'w+') - finished_d = content_data['finished_d'] = defer.Deferred() - args = [session, content_name, content_data, profile] - finished_d.addCallbacks(self._finishedCb, self._finishedEb, args, None, args) - return True - - d = xml_tools.deferDialog(self.host, - _(CONFIRM).format(peer=session['peer_jid'].full()), - _(CONFIRM_TITLE), - type_=C.XMLUI_DIALOG_FILE, - options={C.XMLUI_DATA_FILETYPE: C.XMLUI_DATA_FILETYPE_DIR}, - action_extra={'meta_from_jid': session['peer_jid'].full(), - 'meta_type': "PIPE", - }, - security_limit=SECURITY_LIMIT, - profile=profile) - - d.addCallback(gotConfirmation) - return d - - def jingleHandler(self, action, session, content_name, desc_elt, profile): - content_data = session['contents'][content_name] - application_data = content_data['application_data'] - if action in (self._j.A_ACCEPTED_ACK, self._j.A_SESSION_INITIATE): - pass - elif action == self._j.A_SESSION_ACCEPT: - assert not 'file_obj' in content_data - filepath = application_data['file_path'] - content_data['file_obj'] = open(filepath, 'r') # XXX: we have to be sure that filepath is well opened, as reading can block it - finished_d = content_data['finished_d'] = defer.Deferred() - args = [session, content_name, content_data, profile] - finished_d.addCallbacks(self._finishedCb, self._finishedEb, args, None, args) - else: - log.warning(u"FIXME: unmanaged action {}".format(action)) - return desc_elt - - def _finishedCb(self, dummy, session, content_name, content_data, profile): - log.info(u"Pipe transfer completed") - self._j.contentTerminate(session, content_name, profile=profile) - content_data['file_obj'].close() - - def _finishedEb(self, failure, session, content_name, content_data, profile): - log.warning(u"Error while streaming pipe: {}".format(failure)) - content_data['file_obj'].close() - self._j.contentTerminate(session, content_name, reason=self._j.REASON_FAILED_TRANSPORT, profile=profile)
--- a/src/plugins/plugin_misc_file.py Thu Feb 01 07:24:34 2018 +0100 +++ b/src/plugins/plugin_misc_file.py Thu Feb 08 00:37:42 2018 +0100 @@ -23,11 +23,11 @@ log = getLogger(__name__) from sat.core import exceptions from sat.tools import xml_tools +from sat.tools import stream from twisted.internet import defer from twisted.words.protocols.jabber import jid import os import os.path -import uuid PLUGIN_INFO = { @@ -52,143 +52,8 @@ PROGRESS_ID_KEY = 'progress_id' -class SatFile(object): - """A file-like object to have high level files manipulation""" - # TODO: manage "with" statement - - def __init__(self, host, path, mode='rb', uid=None, size=None, data_cb=None, auto_end_signals=True, profile=C.PROF_KEY_NONE): - """ - @param host: %(doc_host)s - @param path(str): path of the file to get - @param mode(str): same as for built-in "open" function - @param uid(unicode, None): unique id identifing this progressing element - This uid will be used with self.host.progressGet - will be automaticaly generated if None - @param size(None, int): size of the file - @param data_cb(None, callable): method to call on each data read/write - mainly useful to do things like calculating hash - @param auto_end_signals(bool): if True, progressFinished and progressError signals are automatically sent - if False, you'll have to call self.progressFinished and self.progressError yourself - progressStarted signal is always sent automatically - """ - self.host = host - self.uid = uid or unicode(uuid.uuid4()) - self._file = open(path, mode) - self.size = size - self.data_cb = data_cb - self.profile = profile - self.auto_end_signals = auto_end_signals - metadata = self.getProgressMetadata() - self.host.registerProgressCb(self.uid, self.getProgress, metadata, profile=profile) - self.host.bridge.progressStarted(self.uid, metadata, self.profile) - - def checkSize(self): - """Check that current size correspond to given size - - must be used when the transfer is supposed to be finished - @return (bool): True if the position is the same as given size - @raise exceptions.NotFound: size has not be specified - """ - position = self._file.tell() - if self.size is None: - raise exceptions.NotFound - return position == self.size - - - def close(self, progress_metadata=None, error=None): - """Close the current file - - @param progress_metadata(None, dict): metadata to send with _onProgressFinished message - @param error(None, unicode): set to an error message if progress was not successful - mutually exclusive with progress_metadata - error can happen even if error is None, if current size differ from given size - """ - if self._file.closed: - return # avoid double close (which is allowed) error - if error is None: - try: - size_ok = self.checkSize() - except exceptions.NotFound: - size_ok = True - if not size_ok: - error = u'declared and actual size mismatch' - log.warning(error) - progress_metadata = None - - self._file.close() - - if self.auto_end_signals: - if error is None: - self.progressFinished(progress_metadata) - else: - assert progress_metadata is None - self.progressError(error) - - self.host.removeProgressCb(self.uid, self.profile) - - def progressFinished(self, metadata=None): - if metadata is None: - metadata = {} - self.host.bridge.progressFinished(self.uid, metadata, self.profile) - - def progressError(self, error): - self.host.bridge.progressError(self.uid, error, self.profile) - - def flush(self): - self._file.flush() - - def write(self, buf): - self._file.write(buf) - if self.data_cb is not None: - return self.data_cb(buf) - - def read(self, size=-1): - read = self._file.read(size) - if self.data_cb is not None and read: - self.data_cb(read) - return read - - def seek(self, offset, whence=os.SEEK_SET): - self._file.seek(offset, whence) - - def tell(self): - return self._file.tell() - - def mode(self): - return self._file.mode() - - def getProgressMetadata(self): - """Return progression metadata as given to progressStarted - - @return (dict): metadata (check bridge for documentation) - """ - metadata = {'type': C.META_TYPE_FILE} - - mode = self._file.mode - if '+' in mode: - pass # we have no direction in read/write modes - elif mode in ('r', 'rb'): - metadata['direction'] = 'out' - elif mode in ('w', 'wb'): - metadata['direction'] = 'in' - elif 'U' in mode: - metadata['direction'] = 'out' - else: - raise exceptions.InternalError - - metadata['name'] = self._file.name - - return metadata - - def getProgress(self, progress_id, profile): - ret = {'position': self._file.tell()} - if self.size: - ret['size'] = self.size - return ret - - class FilePlugin(object): - File=SatFile + File=stream.SatFile def __init__(self, host): log.info(_("plugin File initialization")) @@ -198,10 +63,11 @@ host.importMenu((D_("Action"), D_("send file")), self._fileSendMenu, security_limit=10, help_string=D_("Send a file"), type_=C.MENU_SINGLE) def _fileSend(self, peer_jid_s, filepath, name="", file_desc="", profile=C.PROF_KEY_NONE): - return self.fileSend(jid.JID(peer_jid_s), filepath, name or None, file_desc or None, profile) + client = self.host.getClient(profile) + return self.fileSend(client, jid.JID(peer_jid_s), filepath, name or None, file_desc or None) @defer.inlineCallbacks - def fileSend(self, peer_jid, filepath, filename=None, file_desc=None, profile=C.PROF_KEY_NONE): + def fileSend(self, client, peer_jid, filepath, filename=None, file_desc=None): """Send a file using best available method @param peer_jid(jid.JID): jid of the destinee @@ -211,7 +77,6 @@ @param profile: %(doc_profile)s @return (dict): action dictionary, with progress id in case of success, else xmlui message """ - client = self.host.getClient(profile) if not os.path.isfile(filepath): raise exceptions.DataError(u"The given path doesn't link to a file") if not filename: @@ -220,18 +85,18 @@ has_feature = yield self.host.hasFeature(client, namespace, peer_jid) if has_feature: log.info(u"{name} method will be used to send the file".format(name=method_name)) - progress_id = yield callback(peer_jid, filepath, filename, file_desc, profile) + progress_id = yield callback(client, peer_jid, filepath, filename, file_desc) defer.returnValue({'progress': progress_id}) msg = u"Can't find any method to send file to {jid}".format(jid=peer_jid.full()) log.warning(msg) defer.returnValue({'xmlui': xml_tools.note(u"Can't transfer file", msg, C.XMLUI_DATA_LVL_WARNING).toXml()}) - def _onFileChoosed(self, peer_jid, data, profile): + def _onFileChoosed(self, client, peer_jid, data): cancelled = C.bool(data.get("cancelled", C.BOOL_FALSE)) if cancelled: return path=data['path'] - return self.fileSend(peer_jid, path, profile=profile) + return self.fileSend(client, peer_jid, path) def _fileSendMenu(self, data, profile): """ XMLUI activated by menu: return file sending UI @@ -243,7 +108,7 @@ except RuntimeError: raise exceptions.DataError(_("Invalid JID")) - file_choosed_id = self.host.registerCallback(lambda data, profile: self._onFileChoosed(jid_, data, profile), with_data=True, one_shot=True) + file_choosed_id = self.host.registerCallback(lambda data, profile: self._onFileChoosed(self.host.getClient(profile), jid_, data), with_data=True, one_shot=True) xml_ui = xml_tools.XMLUI( C.XMLUI_DIALOG, dialog_opt = { @@ -278,25 +143,37 @@ # Dialogs with user # the overwrite check is done here - def _openFileWrite(self, file_path, transfer_data, file_data, profile): - assert 'file_obj' not in transfer_data - transfer_data['file_obj'] = SatFile( - self.host, - file_path, - 'wb', - uid=file_data[PROGRESS_ID_KEY], - size=file_data['size'], - data_cb = file_data.get('data_cb'), - profile=profile, - ) + def _openFileWrite(self, client, file_path, transfer_data, file_data, stream_object): + if stream_object: + assert 'stream_object' not in transfer_data + transfer_data['stream_object'] = stream.FileStreamObject( + self.host, + client, + file_path, + mode='wb', + uid=file_data[PROGRESS_ID_KEY], + size=file_data['size'], + data_cb = file_data.get('data_cb'), + ) + else: + assert 'file_obj' not in transfer_data + transfer_data['file_obj'] = stream.SatFile( + self.host, + client, + file_path, + mode='wb', + uid=file_data[PROGRESS_ID_KEY], + size=file_data['size'], + data_cb = file_data.get('data_cb'), + ) - def _gotConfirmation(self, data, peer_jid, transfer_data, file_data, profile): + def _gotConfirmation(self, data, client, peer_jid, transfer_data, file_data, stream_object): """Called when the permission and dest path have been received @param peer_jid(jid.JID): jid of the file sender @param transfer_data(dict): same as for [self.getDestDir] @param file_data(dict): same as for [self.getDestDir] - @param profile: %(doc_profile)s + @param stream_object(bool): same as for [self.getDestDir] return (bool): True if copy is wanted and OK False if user wants to cancel if file exists ask confirmation and call again self._getDestDir if needed @@ -311,10 +188,10 @@ if os.path.exists(file_path): def check_overwrite(overwrite): if overwrite: - self._openFileWrite(file_path, transfer_data, file_data, profile) + self._openFileWrite(client, file_path, transfer_data, file_data, stream_object) return True else: - return self.getDestDir(peer_jid, transfer_data, file_data, profile) + return self.getDestDir(client, peer_jid, transfer_data, file_data) exists_d = xml_tools.deferConfirm( self.host, @@ -325,14 +202,14 @@ 'meta_progress_id': file_data[PROGRESS_ID_KEY] }, security_limit=SECURITY_LIMIT, - profile=profile) + profile=client.profile) exists_d.addCallback(check_overwrite) return exists_d - self._openFileWrite(file_path, transfer_data, file_data, profile) + self._openFileWrite(client, file_path, transfer_data, file_data, stream_object) return True - def getDestDir(self, peer_jid, transfer_data, file_data, profile): + def getDestDir(self, client, peer_jid, transfer_data, file_data, stream_object=False): """Request confirmation and destination dir to user Overwrite confirmation is managed. @@ -341,7 +218,7 @@ @param filename(unicode): name of the file @param transfer_data(dict): data of the transfer session, it will be only used to store the file_obj. - "file_obj" key *MUST NOT* exist before using getDestDir + "file_obj" (or "stream_object") key *MUST NOT* exist before using getDestDir @param file_data(dict): information about the file to be transfered It MUST contain the following keys: - peer_jid (jid.JID): other peer jid @@ -355,7 +232,8 @@ - data_cb (callable): method called on each data read/write "file_path" will be added to this dict once destination selected "size_human" will also be added with human readable file size - @param profile: %(doc_profile)s + @param stream_object(bool): if True, a stream_object will be used instead of file_obj + a stream.FileStreamObject will be used return (defer.Deferred): True if transfer is accepted """ filename = file_data['name'] @@ -373,6 +251,6 @@ 'meta_progress_id': file_data[PROGRESS_ID_KEY] }, security_limit=SECURITY_LIMIT, - profile=profile) - d.addCallback(self._gotConfirmation, peer_jid, transfer_data, file_data, profile) + profile=client.profile) + d.addCallback(self._gotConfirmation, client, peer_jid, transfer_data, file_data, stream_object) return d
--- a/src/plugins/plugin_misc_ip.py Thu Feb 01 07:24:34 2018 +0100 +++ b/src/plugins/plugin_misc_ip.py Thu Feb 08 00:37:42 2018 +0100 @@ -111,11 +111,10 @@ self._external_ip_cache = None self._local_ip_cache = None - def _externalAllowed(self, profile): + def _externalAllowed(self, client): """Return value of parameter with autorisation of user to do external requests if parameter is not set, a dialog is shown to use to get its confirmation, and parameted is set according to answer - @param profile: %(doc_profile)s @return (defer.Deferred[bool]): True if external request is autorised """ allow_get_ip = self.host.memory.params.getParamA(GET_IP_NAME, GET_IP_CATEGORY, use_default=False) @@ -127,7 +126,7 @@ # need to be fixed when params will be refactored self.host.memory.setParam(GET_IP_NAME, C.boolConst(allowed), GET_IP_CATEGORY) return allowed - d = xml_tools.deferConfirm(self.host, _(GET_IP_CONFIRM), _(GET_IP_CONFIRM_TITLE), profile=profile) + d = xml_tools.deferConfirm(self.host, _(GET_IP_CONFIRM), _(GET_IP_CONFIRM_TITLE), profile=client.profile) d.addCallback(setParam) return d @@ -185,10 +184,9 @@ return d @defer.inlineCallbacks - def getLocalIPs(self, profile): + def getLocalIPs(self, client): """Try do discover local area network IPs - @param profile): %(doc_profile)s @return (deferred): list of lan IP addresses if there are several addresses, the one used with the server is put first if no address is found, localhost IP will be in the list @@ -196,7 +194,6 @@ # TODO: manage permission requesting (e.g. for UMTS link) if self._local_ip_cache is not None: defer.returnValue(self._local_ip_cache) - client = self.host.getClient(profile) addresses = [] localhost = ['127.0.0.1'] @@ -231,7 +228,7 @@ defer.returnValue(addresses) # still not luck, we need to contact external website - allow_get_ip = yield self._externalAllowed(profile) + allow_get_ip = yield self._externalAllowed(client) if not allow_get_ip: defer.returnValue(addresses or localhost) @@ -245,22 +242,19 @@ defer.returnValue(addresses) @defer.inlineCallbacks - def getExternalIP(self, profile): + def getExternalIP(self, client): """Try to discover external IP - @param profile: %(doc_profile)s @return (deferred): external IP address or None if it can't be discovered """ if self._external_ip_cache is not None: defer.returnValue(self._external_ip_cache) - client = self.host.getClient(profile) # we first try with XEP-0279 ip_check = yield self.host.hasFeature(client, NS_IP_CHECK) if ip_check: log.debug(u"Server IP Check available, we use it to retrieve our IP") - client = self.host.getClient(profile) iq_elt = client.IQ("get") iq_elt.addElement((NS_IP_CHECK, 'address')) try: @@ -287,7 +281,7 @@ defer.returnValue(nat_ip) # and finally by requesting external website - allow_get_ip = yield self._externalAllowed(profile) + allow_get_ip = yield self._externalAllowed(client) try: ip = (yield webclient.getPage(GET_IP_PAGE)) if allow_get_ip else None except (internet_error.DNSLookupError, internet_error.TimeoutError):
--- a/src/plugins/plugin_xep_0047.py Thu Feb 01 07:24:34 2018 +0100 +++ b/src/plugins/plugin_xep_0047.py Thu Feb 08 00:37:42 2018 +0100 @@ -126,38 +126,34 @@ """ return self._createSession(*args, **kwargs)[DEFER_KEY] - def _createSession(self, file_obj, to_jid, sid, profile): + def _createSession(self, client, stream_object, to_jid, sid): """Called when a bytestream is imminent - @param file_obj(file): File object where data will be written + @param stream_object(IConsumer): stream object where data will be written @param to_jid(jid.JId): jid of the other peer @param sid(unicode): session id - @param profile: %(doc_profile)s @return (dict): session data """ - client = self.host.getClient(profile) if sid in client.xep_0047_current_stream: raise exceptions.ConflictError(u'A session with this id already exists !') session_data = client.xep_0047_current_stream[sid] = \ {'id': sid, DEFER_KEY: defer.Deferred(), 'to': to_jid, - 'file_obj': file_obj, + 'stream_object': stream_object, 'seq': -1, 'timer': reactor.callLater(TIMEOUT, self._timeOut, sid, client), } return session_data - def _onIBBOpen(self, iq_elt, profile): + def _onIBBOpen(self, iq_elt, client): """"Called when an IBB <open> element is received @param iq_elt(domish.Element): the whole <iq> stanza - @param profile: %(doc_profile)s """ log.debug(_(u"IBB stream opening")) iq_elt.handled = True - client = self.host.getClient(profile) open_elt = iq_elt.elements(NS_IBB, 'open').next() block_size = open_elt.getAttribute('block-size') sid = open_elt.getAttribute('sid') @@ -184,20 +180,18 @@ # we now set the stream observer to look after data packet # FIXME: if we never get the events, the observers stay. # would be better to have generic observer and check id once triggered - client.xmlstream.addObserver(event_data, observer_cb, profile=profile) - client.xmlstream.addOnetimeObserver(event_close, self._onIBBClose, profile=profile) + client.xmlstream.addObserver(event_data, observer_cb, client=client) + client.xmlstream.addOnetimeObserver(event_close, self._onIBBClose, client=client) # finally, we send the accept stanza iq_result_elt = xmlstream.toResponse(iq_elt, 'result') client.send(iq_result_elt) - def _onIBBClose(self, iq_elt, profile): + def _onIBBClose(self, iq_elt, client): """"Called when an IBB <close> element is received @param iq_elt(domish.Element): the whole <iq> stanza - @param profile: %(doc_profile)s """ iq_elt.handled = True - client = self.host.getClient(profile) log.debug(_("IBB stream closing")) close_elt = iq_elt.elements(NS_IBB, 'close').next() # XXX: this observer is only triggered on valid sid, so we don't need to check it @@ -207,15 +201,13 @@ client.send(iq_result_elt) self._killSession(sid, client) - def _onIBBData(self, element, profile): + def _onIBBData(self, element, client): """Observer called on <iq> or <message> stanzas with data element - Manage the data elelement (check validity and write to the file_obj) + Manage the data elelement (check validity and write to the stream_object) @param element(domish.Element): <iq> or <message> stanza - @param profile: %(doc_profile)s """ element.handled = True - client = self.host.getClient(profile) data_elt = element.elements(NS_IBB, 'data').next() sid = data_elt['sid'] @@ -226,7 +218,7 @@ return self._sendError('item-not-found', None, element, client) from_jid = session_data["to"] - file_obj = session_data["file_obj"] + stream_object = session_data["stream_object"] if from_jid.full() != element['from']: log.warning(_(u"sended jid inconsistency (man in the middle attack attempt ?)\ninitial={initial}\ngiven={given}").format(initial=from_jid, given=element['from'])) @@ -248,7 +240,7 @@ # we can now decode the data try: - file_obj.write(base64.b64decode(str(data_elt))) + stream_object.write(base64.b64decode(str(data_elt))) except TypeError: # The base64 data is invalid log.warning(_(u"Invalid base64 data")) @@ -276,17 +268,15 @@ self._killSession(sid, client, error_condition) client.send(iq_elt) - def startStream(self, file_obj, to_jid, sid, block_size=None, profile=C.PROF_KEY_NONE): + def startStream(self, client, stream_object, to_jid, sid, block_size=None): """Launch the stream workflow - @param file_obj(file): file_obj to send + @param stream_object(ifaces.IStreamProducer): stream object to send @param to_jid(jid.JID): JID of the recipient @param sid(unicode): Stream session id @param block_size(int, None): size of the block (or None for default) - @param profile: %(doc_profile)s """ - session_data = self._createSession(file_obj, to_jid, sid, profile) - client = self.host.getClient(profile) + session_data = self._createSession(client, stream_object, to_jid, sid) if block_size is None: block_size = XEP_0047.BLOCK_SIZE @@ -313,7 +303,7 @@ """ session_data["timer"].reset(TIMEOUT) - buffer_ = session_data["file_obj"].read(session_data["block_size"]) + buffer_ = session_data["stream_object"].read(session_data["block_size"]) if buffer_: next_iq_elt = client.IQ() next_iq_elt['to'] = session_data["to"].full() @@ -357,7 +347,7 @@ self.plugin_parent = parent def connectionInitialized(self): - self.xmlstream.addObserver(IBB_OPEN, self.plugin_parent._onIBBOpen, profile=self.parent.profile) + self.xmlstream.addObserver(IBB_OPEN, self.plugin_parent._onIBBOpen, client=self.parent) def getDiscoInfo(self, requestor, target, nodeIdentifier=''): return [disco.DiscoFeature(NS_IBB)]
--- a/src/plugins/plugin_xep_0065.py Thu Feb 01 07:24:34 2018 +0100 +++ b/src/plugins/plugin_xep_0065.py Thu Feb 08 00:37:42 2018 +0100 @@ -66,7 +66,6 @@ from twisted.words.protocols.jabber import error as jabber_error from twisted.words.protocols.jabber import jid from twisted.words.protocols.jabber import xmlstream -from twisted.protocols.basic import FileSender from twisted.internet import defer from collections import namedtuple import struct @@ -294,7 +293,7 @@ return hashlib.sha1((sid + requester_jid.full() + target_jid.full()).encode('utf-8')).hexdigest() -class SOCKSv5(protocol.Protocol, FileSender): +class SOCKSv5(protocol.Protocol): CHUNK_SIZE = 2**16 def __init__(self, session_hash=None): @@ -317,15 +316,17 @@ self.peersock = None self.addressType = 0 self.requestType = 0 - self._file_obj = None + self._stream_object = None self.active = False # set to True when protocol is actually used for transfer # used by factories to know when the finished Deferred can be triggered @property - def file_obj(self): - if self._file_obj is None: - self._file_obj = self.getSession()['file'] - return self._file_obj + def stream_object(self): + if self._stream_object is None: + self._stream_object = self.getSession()['stream_object'] + if self.server_mode: + self._stream_object.registerProducer(self.transport, True) + return self._stream_object def getSession(self): """Return session associated with this candidate @@ -508,10 +509,10 @@ if chunk_size is not None: self.CHUNK_SIZE = chunk_size log.debug(u"Starting file transfer") - d = self.beginFileTransfer(self.file_obj, self.transport) - d.addCallback(self.fileTransfered) + d = self.stream_object.startStream(self.transport) + d.addCallback(self.streamFinished) - def fileTransfered(self, d): + def streamFinished(self, d): log.info(_("File transfer completed, closing connection")) self.transport.loseConnection() @@ -535,7 +536,7 @@ def dataReceived(self, buf): if self.state == STATE_READY: # Everything is set, we just have to write the incoming data - self.file_obj.write(buf) + self.stream_object.write(buf) if not self.active: self.active = True self.getSession()[TIMER_KEY].cancel() @@ -576,7 +577,7 @@ self.parent = parent def getSession(self, session_hash): - return self.parent.getSession(session_hash, None) + return self.parent.getSession(None, session_hash) def startTransfer(self, session_hash, chunk_size=None): session = self.getSession(session_hash) @@ -630,17 +631,16 @@ class Socks5ClientFactory(protocol.ClientFactory): protocol = SOCKSv5 - def __init__(self, parent, session, session_hash, profile): + def __init__(self, client, parent, session, session_hash): """Init the Client Factory @param session(dict): session data @param session_hash(unicode): hash used for peer_connection hash is the same as hostname computed in XEP-0065 § 5.3.2 #1 - @param profile(unciode): %(doc_profile)s """ self.session = session self.session_hash = session_hash - self.profile = profile + self.client = client self.connection = defer.Deferred() self._protocol_instance = None self.connector = None @@ -696,7 +696,7 @@ self.host = host # session data - self.hash_profiles_map = {} # key: hash of the transfer session, value: session data + self.hash_clients_map = {} # key: hash of the transfer session, value: session data self._cache_proxies = {} # key: server jid, value: proxy data # misc data @@ -751,11 +751,10 @@ return self._server_factory @defer.inlineCallbacks - def getProxy(self, profile): + def getProxy(self, client): """Return the proxy available for this profile - cache is used between profiles using the same server - @param profile: %(doc_profile)s + cache is used between clients using the same server @return ((D)(ProxyInfos, None)): Found proxy infos, or None if not acceptable proxy is found """ @@ -763,7 +762,6 @@ log.info(u"No proxy found on this server") self._cache_proxies[server] = None defer.returnValue(None) - client = self.host.getClient(profile) server = client.jid.host try: defer.returnValue(self._cache_proxies[server]) @@ -810,8 +808,8 @@ """ self.getSocks5ServerFactory() local_port = self._server_factory_port - external_ip = yield self._ip.getExternalIP(client.profile) - local_ips = yield self._ip.getLocalIPs(client.profile) + external_ip = yield self._ip.getExternalIP(client) + local_ips = yield self._ip.getLocalIPs(client) if external_ip is not None and self._external_port is None: if external_ip != local_ips[0]: @@ -828,16 +826,14 @@ defer.returnValue((local_port, self._external_port, local_ips, external_ip)) @defer.inlineCallbacks - def getCandidates(self, profile): + def getCandidates(self, client): """Return a list of our stream candidates - @param profile: %(doc_profile)s @return (D(list[Candidate])): list of candidates, ordered by priority """ - client = self.host.getClient(profile) server_factory = yield self.getSocks5ServerFactory() local_port, ext_port, local_ips, external_ip = yield self._getNetworkData(client) - proxy = yield self.getProxy(profile) + proxy = yield self.getProxy(client) # its time to gather the candidates candidates = [] @@ -873,7 +869,7 @@ candidate.factory.connector = connector return candidate.factory.connection - def connectCandidate(self, candidate, session_hash, peer_session_hash=None, delay=None, profile=C.PROF_KEY_NONE): + def connectCandidate(self, client, candidate, session_hash, peer_session_hash=None, delay=None): """Connect to a candidate Connection will be done with a Socks5ClientFactory @@ -887,14 +883,13 @@ - when a peer connect to a proxy *he proposed himself* in practice, peer_session_hash is only used by tryCandidates @param delay(None, float): optional delay to wait before connection, in seconds - @param profile: %(doc_profile)s @return (D): Deferred launched when TCP connection + Socks5 connection is done """ if peer_session_hash is None: # for XEP-0065, only one hash is needed peer_session_hash = session_hash - session = self.getSession(session_hash, profile) - factory = Socks5ClientFactory(self, session, peer_session_hash, profile) + session = self.getSession(client, session_hash) + factory = Socks5ClientFactory(client, self, session, peer_session_hash) candidate.factory = factory if delay is None: d = defer.succeed(candidate.host) @@ -904,23 +899,23 @@ d.addCallback(self._addConnector, candidate) return d - def tryCandidates(self, candidates, session_hash, peer_session_hash, connection_cb=None, connection_eb=None, profile=C.PROF_KEY_NONE): + def tryCandidates(self, client, candidates, session_hash, peer_session_hash, connection_cb=None, connection_eb=None): defers_list = [] for candidate in candidates: delay = CANDIDATE_DELAY * len(defers_list) if candidate.type == XEP_0065.TYPE_PROXY: delay += CANDIDATE_DELAY_PROXY - d = self.connectCandidate(candidate, session_hash, peer_session_hash, delay, profile) + d = self.connectCandidate(client, candidate, session_hash, peer_session_hash, delay) if connection_cb is not None: - d.addCallback(lambda dummy, candidate=candidate, profile=profile: connection_cb(candidate, profile)) + d.addCallback(lambda dummy, candidate=candidate, client=client: connection_cb(client, candidate)) if connection_eb is not None: - d.addErrback(connection_eb, candidate, profile) + d.addErrback(connection_eb, client, candidate) defers_list.append(d) return defers_list - def getBestCandidate(self, candidates, session_hash, peer_session_hash=None, profile=C.PROF_KEY_NONE): + def getBestCandidate(self, client, candidates, session_hash, peer_session_hash=None): """Get best candidate (according to priority) which can connect @param candidates(iterable[Candidate]): candidates to test @@ -928,12 +923,11 @@ hash is the same as hostname computed in XEP-0065 § 5.3.2 #1 @param peer_session_hash(unicode, None): hash of the other peer only useful for XEP-0260, must be None for XEP-0065 streamhost candidates - @param profile: %(doc_profile)s @return (D(None, Candidate)): best candidate or None if none can connect """ defer_candidates = None - def connectionCb(candidate, profile): + def connectionCb(client, candidate): log.info(u"Connection of {} successful".format(unicode(candidate))) for idx, other_candidate in enumerate(candidates): try: @@ -943,7 +937,7 @@ except AttributeError: assert other_candidate is None - def connectionEb(failure, candidate, profile): + def connectionEb(failure, client, candidate): if failure.check(defer.CancelledError): log.debug(u"Connection of {} has been cancelled".format(candidate)) else: @@ -957,7 +951,7 @@ good_candidates = [c for c in candidates if c] return good_candidates[0] if good_candidates else None - defer_candidates = self.tryCandidates(candidates, session_hash, peer_session_hash, connectionCb, connectionEb, profile) + defer_candidates = self.tryCandidates(client, candidates, session_hash, peer_session_hash, connectionCb, connectionEb) d_list = defer.DeferredList(defer_candidates) d_list.addCallback(allTested) return d_list @@ -969,7 +963,7 @@ @param client: %(doc_client)s """ log.info(u"Socks5 Bytestream: TimeOut reached") - session = self.getSession(session_hash, client.profile) + session = self.getSession(client, session_hash) session[DEFER_KEY].errback(exceptions.TimeOutError) def killSession(self, reason, session_hash, sid, client): @@ -989,8 +983,8 @@ )) try: - assert self.hash_profiles_map[session_hash] == client.profile - del self.hash_profiles_map[session_hash] + assert self.hash_clients_map[session_hash] == client + del self.hash_clients_map[session_hash] except KeyError: pass @@ -1015,19 +1009,17 @@ return reason - def startStream(self, file_obj, to_jid, sid, profile=C.PROF_KEY_NONE): + def startStream(self, client, stream_object, to_jid, sid): """Launch the stream workflow - @param file_obj: file_obj to send + @param streamProducer: stream_object to use @param to_jid: JID of the recipient @param sid: Stream session id @param successCb: method to call when stream successfuly finished @param failureCb: method to call when something goes wrong - @param profile: %(doc_profile)s @return (D): Deferred fired when session is finished """ - client = self.host.getClient(profile) - session_data = self._createSession(file_obj, to_jid, sid, True, client.profile) + session_data = self._createSession(client, stream_object, to_jid, sid, True) session_data[client] = client @@ -1051,7 +1043,7 @@ args = [session_data, client] d.addCallbacks(self._IQNegotiationCb, self._IQNegotiationEb, args, None, args) - self.getCandidates(profile).addCallback(gotCandidates) + self.getCandidates(client).addCallback(gotCandidates) return session_data[DEFER_KEY] def _IQNegotiationCb(self, iq_elt, session_data, client): @@ -1080,7 +1072,7 @@ if candidate.type == XEP_0065.TYPE_PROXY: log.info(u"A Socks5 proxy is used") - d = self.connectCandidate(candidate, session_data['hash'], profile=client.profile) + d = self.connectCandidate(client, candidate, session_data['hash']) d.addCallback(lambda dummy: candidate.activate(session_data['id'], session_data['peer_jid'], client)) d.addErrback(self._activationEb) else: @@ -1102,22 +1094,20 @@ """ return self._createSession(*args, **kwargs)[DEFER_KEY] - def _createSession(self, file_obj, to_jid, sid, requester=False, profile=C.PROF_KEY_NONE): + def _createSession(self, client, stream_object, to_jid, sid, requester=False): """Called when a bytestream is imminent - @param file_obj(file): File object where data will be written + @param stream_object(iface.IStreamProducer): File object where data will be written @param to_jid(jid.JId): jid of the other peer @param sid(unicode): session id @param initiator(bool): if True, this session is create by initiator - @param profile: %(doc_profile)s @return (dict): session data """ - client = self.host.getClient(profile) if sid in client.xep_0065_sid_session: raise exceptions.ConflictError(u'A session with this id already exists !') if requester: session_hash = getSessionHash(client.jid, to_jid, sid) - session_data = self._registerHash(session_hash, file_obj, profile) + session_data = self._registerHash(client, session_hash, stream_object) else: session_hash = getSessionHash(to_jid, client.jid, sid) session_d = defer.Deferred() @@ -1130,31 +1120,30 @@ session_data.update( {'id': sid, 'peer_jid': to_jid, - 'file': file_obj, + 'stream_object': stream_object, 'hash': session_hash, }) return session_data - def getSession(self, session_hash, profile): + def getSession(self, client, session_hash): """Return session data @param session_hash(unicode): hash of the session hash is the same as hostname computed in XEP-0065 § 5.3.2 #1 - @param profile(None, unicode): profile of the peer - None is used only if profile is unknown (this is only the case + @param client(None, SatXMPPClient): client of the peer + None is used only if client is unknown (this is only the case for incoming request received by Socks5ServerFactory). None must only be used by Socks5ServerFactory. See comments below for details @return (dict): session data """ - if profile is None: + if client is None: try: - profile = self.hash_profiles_map[session_hash] + client = self.hash_clients_map[session_hash] except KeyError as e: log.warning(u"The requested session doesn't exists !") raise e - client = self.host.getClient(profile) return client._s5b_sessions[session_hash] def registerHash(self, *args, **kwargs): @@ -1163,16 +1152,14 @@ """ return self._registerHash(*args, **kwargs)[DEFER_KEY] - def _registerHash(self, session_hash, file_obj, profile): + def _registerHash(self, client, session_hash, stream_object): """Create a session_data associated to hash @param session_hash(str): hash of the session - @param file_obj(file, None): file-like object + @param stream_object(iface.IStreamProducer, IConsumer, None): file-like object None if it will be filled later - @param profile: %(doc_profile)s return (dict): session data """ - client = self.host.getClient(profile) assert session_hash not in client._s5b_sessions session_d = defer.Deferred() session_d.addBoth(self.killSession, session_hash, None, client) @@ -1181,23 +1168,22 @@ TIMER_KEY: reactor.callLater(TIMEOUT, self._timeOut, session_hash, client), } - if file_obj is not None: - session_data['file'] = file_obj + if stream_object is not None: + session_data['stream_object'] = stream_object - assert session_hash not in self.hash_profiles_map - self.hash_profiles_map[session_hash] = profile + assert session_hash not in self.hash_clients_map + self.hash_clients_map[session_hash] = client return session_data - def associateFileObj(self, session_hash, file_obj, profile): - """Associate a file obj with a session""" - session_data = self.getSession(session_hash, profile) - assert 'file' not in session_data - session_data['file'] = file_obj + def associateStreamObject(self, client, session_hash, stream_object): + """Associate a stream object with a session""" + session_data = self.getSession(client, session_hash) + assert 'stream_object' not in session_data + session_data['stream_object'] = stream_object - def streamQuery(self, iq_elt, profile): + def streamQuery(self, iq_elt, client): log.debug(u"BS stream query") - client = self.host.getClient(profile) iq_elt.handled = True @@ -1238,7 +1224,7 @@ for candidate in candidates: log.info(u"Candidate proposed: {}".format(candidate)) - d = self.getBestCandidate(candidates, session_data['hash'], profile=profile) + d = self.getBestCandidate(client, candidates, session_data['hash']) d.addCallback(self._ackStream, iq_elt, session_data, client) def _ackStream(self, candidate, iq_elt, session_data, client): @@ -1262,7 +1248,7 @@ self.host = plugin_parent.host def connectionInitialized(self): - self.xmlstream.addObserver(BS_REQUEST, self.plugin_parent.streamQuery, profile=self.parent.profile) + self.xmlstream.addObserver(BS_REQUEST, self.plugin_parent.streamQuery, client=self.parent) def getDiscoInfo(self, requestor, target, nodeIdentifier=''): return [disco.DiscoFeature(NS_BS)]
--- a/src/plugins/plugin_xep_0095.py Thu Feb 01 07:24:34 2018 +0100 +++ b/src/plugins/plugin_xep_0095.py Thu Feb 08 00:37:42 2018 +0100 @@ -72,11 +72,11 @@ except KeyError: log.error(u"Trying to unregister SI profile [{}] which was not registered".format(si_profile)) - def streamInit(self, iq_elt, profile): + def streamInit(self, iq_elt, client): """This method is called on stream initiation (XEP-0095 #3.2) @param iq_elt: IQ element - @param profile: %(doc_profile)s""" + """ log.info(_("XEP-0095 Stream initiation")) iq_elt.handled = True si_elt = iq_elt.elements(NS_SI, 'si').next() @@ -86,19 +86,17 @@ si_profile_key = si_profile[len(SI_PROFILE_HEADER):] if si_profile.startswith(SI_PROFILE_HEADER) else si_profile if si_profile_key in self.si_profiles: #We know this SI profile, we call the callback - self.si_profiles[si_profile_key](iq_elt, si_id, si_mime_type, si_elt, profile) + self.si_profiles[si_profile_key](client, iq_elt, si_id, si_mime_type, si_elt) else: #We don't know this profile, we send an error - self.sendError(iq_elt, 'bad-profile', profile) + self.sendError(client, iq_elt, 'bad-profile') - def sendError(self, request, condition, profile): + def sendError(self, client, request, condition): """Send IQ error as a result @param request(domish.Element): original IQ request @param condition(str): error condition - @param profile: %(doc_profile)s """ - client = self.host.getClient(profile) if condition in SI_ERROR_CONDITIONS: si_condition = condition condition = 'bad-request' @@ -111,18 +109,16 @@ client.send(iq_error_elt) - def acceptStream(self, iq_elt, feature_elt, misc_elts=None, profile=C.PROF_KEY_NONE): + def acceptStream(self, client, iq_elt, feature_elt, misc_elts=None): """Send the accept stream initiation answer @param iq_elt(domish.Element): initial SI request @param feature_elt(domish.Element): 'feature' element containing stream method to use @param misc_elts(list[domish.Element]): list of elements to add - @param profile: %(doc_profile)s """ log.info(_("sending stream initiation accept answer")) if misc_elts is None: misc_elts = [] - client = self.host.getClient(profile) result_elt = xmlstream.toResponse(iq_elt, 'result') si_elt = result_elt.addElement((NS_SI, 'si')) si_elt.addChild(feature_elt) @@ -139,7 +135,7 @@ return (iq_elt, si_elt) - def proposeStream(self, to_jid, si_profile, feature_elt, misc_elts, mime_type='application/octet-stream', profile=C.PROF_KEY_NONE): + def proposeStream(self, client, to_jid, si_profile, feature_elt, misc_elts, mime_type='application/octet-stream'): """Propose a stream initiation @param to_jid(jid.JID): recipient @@ -147,13 +143,11 @@ @param feature_elt(domish.Element): feature element, according to XEP-0020 @param misc_elts(list[domish.Element]): list of elements to add @param mime_type(unicode): stream mime type - @param profile: %(doc_profile)s @return (tuple): tuple with: - session id (unicode) - (D(domish_elt, domish_elt): offer deferred which returl a tuple with iq_elt and si_elt """ - client = self.host.getClient(profile) offer = client.IQ() sid = str(uuid.uuid4()) log.debug(_(u"Stream Session ID: %s") % offer["id"]) @@ -181,7 +175,7 @@ self.host = plugin_parent.host def connectionInitialized(self): - self.xmlstream.addObserver(SI_REQUEST, self.plugin_parent.streamInit, profile=self.parent.profile) + self.xmlstream.addObserver(SI_REQUEST, self.plugin_parent.streamInit, client=self.parent) def getDiscoInfo(self, requestor, target, nodeIdentifier=''): return [disco.DiscoFeature(NS_SI)] + [disco.DiscoFeature(u"http://jabber.org/protocol/si/profile/{}".format(profile_name)) for profile_name in self.plugin_parent.si_profiles]
--- a/src/plugins/plugin_xep_0096.py Thu Feb 01 07:24:34 2018 +0100 +++ b/src/plugins/plugin_xep_0096.py Thu Feb 08 00:37:42 2018 +0100 @@ -23,6 +23,7 @@ log = getLogger(__name__) from sat.core import exceptions from sat.tools import xml_tools +from sat.tools import stream from twisted.words.xish import domish from twisted.words.protocols.jabber import jid from twisted.words.protocols.jabber import error @@ -63,16 +64,15 @@ def unload(self): self._si.unregisterSIProfile(SI_PROFILE_NAME) - def _badRequest(self, iq_elt, message=None, profile=C.PROF_KEY_NONE): + def _badRequest(self, client, iq_elt, message=None): """Send a bad-request error @param iq_elt(domish.Element): initial <IQ> element of the SI request @param message(None, unicode): informational message to display in the logs - @param profile: %(doc_profile)s """ if message is not None: log.warning(message) - self._si.sendError(iq_elt, 'bad-request', profile) + self._si.sendError(client, iq_elt, 'bad-request') def _parseRange(self, parent_elt, file_size): """find and parse <range/> element @@ -107,14 +107,13 @@ return range_, range_offset, range_length - def _transferRequest(self, iq_elt, si_id, si_mime_type, si_elt, profile): + def _transferRequest(self, client, iq_elt, si_id, si_mime_type, si_elt): """Called when a file transfer is requested @param iq_elt(domish.Element): initial <IQ> element of the SI request @param si_id(unicode): Stream Initiation session id @param si_mime_type("unicode"): Mime type of the file (or default "application/octet-stream" if unknown) @param si_elt(domish.Element): request - @param profile: %(doc_profile)s """ log.info(_("XEP-0096 file transfer requested")) peer_jid = jid.JID(iq_elt['from']) @@ -122,18 +121,18 @@ try: file_elt = si_elt.elements(NS_SI_FT, "file").next() except StopIteration: - return self._badRequest(iq_elt, "No <file/> element found in SI File Transfer request", profile) + return self._badRequest(client, iq_elt, "No <file/> element found in SI File Transfer request") try: feature_elt = self.host.plugins["XEP-0020"].getFeatureElt(si_elt) except exceptions.NotFound: - return self._badRequest(iq_elt, "No <feature/> element found in SI File Transfer request", profile) + return self._badRequest(client, iq_elt, "No <feature/> element found in SI File Transfer request") try: filename = file_elt["name"] file_size = int(file_elt["size"]) except (KeyError, ValueError): - return self._badRequest(iq_elt, "Malformed SI File Transfer request", profile) + return self._badRequest(client, iq_elt, "Malformed SI File Transfer request") file_date = file_elt.getAttribute("date") file_hash = file_elt.getAttribute("hash") @@ -148,12 +147,12 @@ try: range_, range_offset, range_length = self._parseRange(file_elt, file_size) except ValueError: - return self._badRequest(iq_elt, "Malformed SI File Transfer request", profile) + return self._badRequest(client, iq_elt, "Malformed SI File Transfer request") try: stream_method = self.host.plugins["XEP-0020"].negotiate(feature_elt, 'stream-method', self.managed_stream_m, namespace=None) except KeyError: - return self._badRequest(iq_elt, "No stream method found", profile) + return self._badRequest(client, iq_elt, "No stream method found") if stream_method: if stream_method == self.host.plugins["XEP-0065"].NAMESPACE: @@ -164,7 +163,7 @@ log.error(u"Unknown stream method, this should not happen at this stage, cancelling transfer") else: log.warning(u"Can't find a valid stream method") - self._si.sendError(iq_elt, 'not-acceptable', profile) + self._si.sendError(client, iq_elt, 'not-acceptable') return #if we are here, the transfer can start, we just need user's agreement @@ -172,27 +171,19 @@ "range": range_, "range_offset": range_offset, "range_length": range_length, "si_id": si_id, "progress_id": si_id, "stream_method": stream_method, "stream_plugin": plugin} - d = self._f.getDestDir(peer_jid, data, data, profile) - d.addCallback(self.confirmationCb, iq_elt, data, profile) + d = self._f.getDestDir(client, peer_jid, data, data, stream_object=True) + d.addCallback(self.confirmationCb, client, iq_elt, data) - def _getFileObject(self, dest_path, can_range=False): - """Open file, put file pointer to the end if the file if needed - @param dest_path: path of the destination file - @param can_range: True if the file pointer can be moved - @return: File Object""" - return open(dest_path, "ab" if can_range else "wb") - - def confirmationCb(self, accepted, iq_elt, data, profile): + def confirmationCb(self, accepted, client, iq_elt, data): """Called on confirmation answer @param accepted(bool): True if file transfer is accepted @param iq_elt(domish.Element): initial SI request @param data(dict): session data - @param profile: %(doc_profile)s """ if not accepted: log.info(u"File transfer declined") - self._si.sendError(iq_elt, 'forbidden', profile) + self._si.sendError(client, iq_elt, 'forbidden') return # data, timeout, stream_method, failed_methods = client._xep_0096_waiting_for_approval[sid] # can_range = data['can_range'] == "True" @@ -216,9 +207,9 @@ # file_obj = self._getFileObject(dest_path, can_range) # range_offset = file_obj.tell() - d = data['stream_plugin'].createSession(data['file_obj'], data['peer_jid'], data['si_id'], profile=profile) - d.addCallback(self._transferCb, data, profile) - d.addErrback(self._transferEb, data, profile) + d = data['stream_plugin'].createSession(client, data['stream_object'], data['peer_jid'], data['si_id']) + d.addCallback(self._transferCb, client, data) + d.addErrback(self._transferEb, client, data) #we can send the iq result feature_elt = self.host.plugins["XEP-0020"].chooseOption({'stream-method': data['stream_method']}, namespace=None) @@ -229,32 +220,31 @@ # range_elt['offset'] = str(range_offset) # #TODO: manage range length # misc_elts.append(range_elt) - self._si.acceptStream(iq_elt, feature_elt, misc_elts, profile) + self._si.acceptStream(client, iq_elt, feature_elt, misc_elts) - def _transferCb(self, dummy, data, profile): + def _transferCb(self, dummy, client, data): """Called by the stream method when transfer successfuly finished @param data: session data - @param profile: %(doc_profile)s """ #TODO: check hash - data['file_obj'].close() + data['stream_object'].close() log.info(u'Transfer {si_id} successfuly finished'.format(**data)) - def _transferEb(self, failure, data, profile): + def _transferEb(self, failure, client, data): """Called when something went wrong with the transfer @param id: stream id @param data: session data - @param profile: %(doc_profile)s """ log.warning(u'Transfer {si_id} failed: {reason}'.format(reason=unicode(failure.value), **data)) - data['file_obj'].close() + data['stream_object'].close() def _sendFile(self, peer_jid_s, filepath, name, desc, profile=C.PROF_KEY_NONE): - return self.sendFile(jid.JID(peer_jid_s), filepath, name or None, desc or None, profile) + client = self.host.getClient(profile) + return self.sendFile(client, jid.JID(peer_jid_s), filepath, name or None, desc or None) - def sendFile(self, peer_jid, filepath, name=None, desc=None, profile=C.PROF_KEY_NONE): + def sendFile(self, client, peer_jid, filepath, name=None, desc=None): """Send a file using XEP-0096 @param peer_jid(jid.JID): recipient @@ -265,7 +255,6 @@ @param profile: %(doc_profile)s @return: an unique id to identify the transfer """ - client = self.host.getClient(profile) feature_elt = self.host.plugins["XEP-0020"].proposeFeatures({'stream-method': self.managed_stream_m}, namespace=None) file_transfer_elts = [] @@ -282,7 +271,7 @@ file_transfer_elts.append(domish.Element((None, 'range'))) - sid, offer_d = self._si.proposeStream(peer_jid, SI_PROFILE, feature_elt, file_transfer_elts, profile=client.profile) + sid, offer_d = self._si.proposeStream(client, peer_jid, SI_PROFILE, feature_elt, file_transfer_elts) args = [filepath, sid, size, client] offer_d.addCallbacks(self._fileCb, self._fileEb, args, None, args) return sid @@ -318,15 +307,15 @@ log.warning(u"Invalid stream method received") return - file_obj = self._f.File(self.host, - filepath, - uid=sid, - size=size, - profile=client.profile - ) - d = plugin.startStream(file_obj, jid.JID(iq_elt['from']), sid, profile=client.profile) - d.addCallback(self._sendCb, sid, file_obj, client.profile) - d.addErrback(self._sendEb, sid, file_obj, client.profile) + stream_object = stream.FileStreamObject(self.host, + client, + filepath, + uid=sid, + size=size, + ) + d = plugin.startStream(client, stream_object, jid.JID(iq_elt['from']), sid) + d.addCallback(self._sendCb, client, sid, stream_object) + d.addErrback(self._sendEb, client, sid, stream_object) def _fileEb(self, failure, filepath, sid, size, client): if failure.check(error.StanzaError): @@ -347,16 +336,16 @@ else: log.error(u'Error while proposing stream: {}'.format(failure)) - def _sendCb(self, dummy, sid, file_obj, profile): + def _sendCb(self, dummy, client, sid, stream_object): log.info(_(u'transfer {sid} successfuly finished [{profile}]').format( sid=sid, - profile=profile)) - file_obj.close() + profile=client.profile)) + stream_object.close() - def _sendEb(self, failure, sid, file_obj, profile): + def _sendEb(self, failure, client, sid, stream_object): log.warning(_(u'transfer {sid} failed [{profile}]: {reason}').format( sid=sid, - profile=profile, + profile=client.profile, reason=unicode(failure.value), )) - file_obj.close() + stream_object.close()
--- a/src/plugins/plugin_xep_0166.py Thu Feb 01 07:24:34 2018 +0100 +++ b/src/plugins/plugin_xep_0166.py Thu Feb 08 00:37:42 2018 +0100 @@ -122,16 +122,14 @@ jingle_elt['action'] = action return iq_elt, jingle_elt - def sendError(self, error_condition, sid, request, jingle_condition=None, profile=C.PROF_KEY_NONE): + def sendError(self, client, error_condition, sid, request, jingle_condition=None): """Send error stanza @param error_condition: one of twisted.words.protocols.jabber.error.STANZA_CONDITIONS keys @param sid(unicode,None): jingle session id, or None, if session must not be destroyed @param request(domish.Element): original request @param jingle_condition(None, unicode): if not None, additional jingle-specific error information - @param profile: %(doc_profile)s """ - client = self.host.getClient(profile) iq_elt = error.StanzaError(error_condition).toResponse(request) if jingle_condition is not None: iq_elt.error.addElement((NS_JINGLE_ERROR, jingle_condition)) @@ -143,16 +141,14 @@ def _terminateEb(self, failure_): log.warning(_(u"Error while terminating session: {msg}").format(msg=failure_)) - def terminate(self, reason, session, profile): + def terminate(self, client, reason, session): """Terminate the session send the session-terminate action, and delete the session data @param reason(unicode, list[domish.Element]): if unicode, will be transformed to an element if a list of element, add them as children of the <reason/> element @param session(dict): data of the session - @param profile: %(doc_profile)s """ - client = self.host.getClient(profile) iq_elt, jingle_elt = self._buildJingleElt(client, session, XEP_0166.A_SESSION_TERMINATE) reason_elt = jingle_elt.addElement('reason') if isinstance(reason, basestring): @@ -172,7 +168,6 @@ @param failure_(failure.Failure): the exceptions raised @param sid(unicode): jingle session id - @param profile: %(doc_client)s """ log.warning(u"Error while sending jingle <iq/> stanza: {failure_}".format(failure_=failure_.value)) self._delSession(client, sid) @@ -189,7 +184,7 @@ """ log.warning("Error while processing jingle request") if isinstance(fail, exceptions.DataError): - self.sendError('bad-request', sid, request, profile=client.profile) + self.sendError(client, 'bad-request', sid, request) else: log.error("Unmanaged jingle exception") self._delSession(client, sid) @@ -208,8 +203,8 @@ - if it return True the session is accepted, else rejected. A Deferred can be returned - if not present, a generic accept dialog will be used - - jingleSessionInit(self, session, content_name[, *args, **kwargs], profile): must return the domish.Element used for initial content - - jingleHandler(self, action, session, content_name, transport_elt, profile): + - jingleSessionInit(client, self, session, content_name[, *args, **kwargs]): must return the domish.Element used for initial content + - jingleHandler(client, self, action, session, content_name, transport_elt): called on several action to negociate the application or transport - jingleTerminate: called on session terminate, with reason_elt May be used to clean session @@ -226,8 +221,8 @@ @param transport_type(unicode): type of transport to use (see XEP-0166 §8) @param handler(object): instance of a class which manage the application. Must have the following methods: - - jingleSessionInit(self, session, content_name[, *args, **kwargs], profile): must return the domish.Element used for initial content - - jingleHandler(self, action, session, content_name, transport_elt, profile): + - jingleSessionInit(client, self, session, content_name[, *args, **kwargs]): must return the domish.Element used for initial content + - jingleHandler(client, self, action, session, content_name, transport_elt): called on several action to negociate the application or transport @param priority(int): priority of this transport """ @@ -241,25 +236,23 @@ log.debug(u"new jingle transport registered") @defer.inlineCallbacks - def transportReplace(self, transport_ns, session, content_name, profile=C.PROF_KEY_NONE): + def transportReplace(self, client, transport_ns, session, content_name): """Replace a transport @param transport_ns(unicode): namespace of the new transport to use @param session(dict): jingle session data @param content_name(unicode): name of the content - @param profile: %(doc_profile)s """ # XXX: for now we replace the transport before receiving confirmation from other peer # this is acceptable because we terminate the session if transport is rejected. # this behavious may change in the future. - client = self.host.getClient(profile) content_data= session['contents'][content_name] transport_data = content_data['transport_data'] try: transport = self._transports[transport_ns] except KeyError: raise exceptions.InternalError(u"Unkown transport") - yield content_data['transport'].handler.jingleHandler(XEP_0166.A_DESTROY, session, content_name, None, profile) + yield content_data['transport'].handler.jingleHandler(client, XEP_0166.A_DESTROY, session, content_name, None) content_data['transport'] = transport transport_data.clear() @@ -268,11 +261,11 @@ content_elt['name'] = content_name content_elt['creator'] = content_data['creator'] - transport_elt = transport.handler.jingleSessionInit(session, content_name, profile) + transport_elt = transport.handler.jingleSessionInit(client, session, content_name) content_elt.addChild(transport_elt) iq_elt.send() - def buildAction(self, action, session, content_name, profile=C.PROF_KEY_NONE): + def buildAction(self, client, action, session, content_name): """Build an element according to requested action @param action(unicode): a jingle action (see XEP-0166 §7.2), @@ -280,11 +273,8 @@ transport-replace is managed in the dedicated [transportReplace] method @param session(dict): jingle session data @param content_name(unicode): name of the content - @param profile: %(doc_profile)s @return (tuple[domish.Element, domish.Element]): parent <iq> element, <transport> or <description> element, according to action """ - client = self.host.getClient(profile) - # we first build iq, jingle and content element which are the same in every cases iq_elt, jingle_elt = self._buildJingleElt(client, session, action) # FIXME: XEP-0260 § 2.3 Ex 5 has an initiator attribute, but it should not according to XEP-0166 §7.1 table 1, must be checked @@ -301,18 +291,16 @@ return iq_elt, context_elt - def buildSessionInfo(self, session, profile=C.PROF_KEY_NONE): + def buildSessionInfo(self, client, session): """Build a session-info action @param session(dict): jingle session data - @param profile: %(doc_profile)s @return (tuple[domish.Element, domish.Element]): parent <iq> element, <jingle> element """ - client = self.host.getClient(profile) return self._buildJingleElt(client, session, XEP_0166.A_SESSION_INFO) @defer.inlineCallbacks - def initiate(self, peer_jid, contents, profile=C.PROF_KEY_NONE): + def initiate(self, client, peer_jid, contents): """Send a session initiation request @param peer_jid(jid.JID): jid to establith session with @@ -327,11 +315,9 @@ default to BOTH (see XEP-0166 §7.3) - app_args(list): args to pass to the application plugin - app_kwargs(dict): keyword args to pass to the application plugin - @param profile: %(doc_profile)s @return D(unicode): jingle session id """ assert contents # there must be at least one content - client = self.host.getClient(profile) initiator = client.jid sid = unicode(uuid.uuid4()) # TODO: session cleaning after timeout ? @@ -392,12 +378,11 @@ # then the description element app_args = content.get('app_args', []) app_kwargs = content.get('app_kwargs', {}) - app_kwargs['profile'] = profile - desc_elt = yield application.handler.jingleSessionInit(session, content_name, *app_args, **app_kwargs) + desc_elt = yield application.handler.jingleSessionInit(client, session, content_name, *app_args, **app_kwargs) content_elt.addChild(desc_elt) # and the transport one - transport_elt = yield transport.handler.jingleSessionInit(session, content_name, profile) + transport_elt = yield transport.handler.jingleSessionInit(client, session, content_name) content_elt.addChild(transport_elt) d = iq_elt.send() @@ -411,38 +396,35 @@ """ reactor.callLater(0, self.contentTerminate, *args, **kwargs) - def contentTerminate(self, session, content_name, reason=REASON_SUCCESS, profile=C.PROF_KEY_NONE): + def contentTerminate(self, client, session, content_name, reason=REASON_SUCCESS): """Terminate and remove a content if there is no more content, then session is terminated @param session(dict): jingle session @param content_name(unicode): name of the content terminated @param reason(unicode): reason of the termination - @param profile: %(doc_profile)s """ contents = session['contents'] del contents[content_name] if not contents: - self.terminate(reason, session, profile) + self.terminate(client, reason, session) ## defaults methods called when plugin doesn't have them ## - def jingleRequestConfirmationDefault(self, action, session, content_name, desc_elt, profile): + def jingleRequestConfirmationDefault(self, client, action, session, content_name, desc_elt): """This method request confirmation for a jingle session""" log.debug(u"Using generic jingle confirmation method") - return xml_tools.deferConfirm(self.host, _(CONFIRM_TXT).format(entity=session['peer_jid'].full()), _('Confirm Jingle session'), profile=profile) + return xml_tools.deferConfirm(self.host, _(CONFIRM_TXT).format(entity=session['peer_jid'].full()), _('Confirm Jingle session'), profile=client.profile) ## jingle events ## - def _onJingleRequest(self, request, profile): + def _onJingleRequest(self, request, client): """Called when any jingle request is received - The request will the be dispatched to appropriate method + The request will then be dispatched to appropriate method according to current state @param request(domish.Element): received IQ request - @para profile: %(doc_profile)s """ - client = self.host.getClient(profile) request.handled = True jingle_elt = request.elements(NS_JINGLE, 'jingle').next() @@ -453,7 +435,7 @@ raise KeyError except KeyError: log.warning(u"Received jingle request has no sid attribute") - self.sendError('bad-request', None, request, profile=profile) + self.sendError(client, 'bad-request', None, request) return # then the action @@ -463,7 +445,7 @@ raise KeyError except KeyError: log.warning(u"Received jingle request has no action") - self.sendError('bad-request', None, request, profile=profile) + self.sendError(client, 'bad-request', None, request) return peer_jid = jid.JID(request['from']) @@ -472,9 +454,18 @@ try: session = client.jingle_sessions[sid] except KeyError: - if action != XEP_0166.A_SESSION_INITIATE: - log.warning(u"Received request for an unknown session id: {}".format(sid)) - self.sendError('item-not-found', None, request, 'unknown-session', profile=profile) + if action == XEP_0166.A_SESSION_INITIATE: + pass + elif action == XEP_0166.A_SESSION_TERMINATE: + log.debug(u"ignoring session terminate action (inexisting session id): {request_id} [{profile}]".format( + request_id=sid, + profile = client.profile)) + return + else: + log.warning(u"Received request for an unknown session id: {request_id} [{profile}]".format( + request_id=sid, + profile = client.profile)) + self.sendError(client, 'item-not-found', None, request, 'unknown-session') return session = client.jingle_sessions[sid] = {'id': sid, @@ -487,11 +478,11 @@ else: if session['peer_jid'] != peer_jid: log.warning(u"sid conflict ({}), the jid doesn't match. Can be a collision, a hack attempt, or a bad sid generation".format(sid)) - self.sendError('service-unavailable', sid, request, profile=profile) + self.sendError(client, 'service-unavailable', sid, request) return if session['id'] != sid: log.error(u"session id doesn't match") - self.sendError('service-unavailable', sid, request, profile=profile) + self.sendError(client, 'service-unavailable', sid, request) raise exceptions.InternalError if action == XEP_0166.A_SESSION_INITIATE: @@ -539,7 +530,7 @@ if new: # the content must not exist, we check it if not name or name in contents_dict: - self.sendError('bad-request', session['id'], request, profile=client.profile) + self.sendError(client, 'bad-request', session['id'], request) raise exceptions.CancelError content_data = contents_dict[name] = {'creator': creator, 'senders': content_elt.attributes.get('senders', 'both'), @@ -550,28 +541,28 @@ content_data = contents_dict[name] except KeyError: log.warning(u"Other peer try to access an unknown content") - self.sendError('bad-request', session['id'], request, profile=client.profile) + self.sendError(client, 'bad-request', session['id'], request) raise exceptions.CancelError # application if with_application: desc_elt = content_elt.description if not desc_elt: - self.sendError('bad-request', session['id'], request, profile=client.profile) + self.sendError(client, 'bad-request', session['id'], request) raise exceptions.CancelError if new: # the content is new, we need to check and link the application app_ns = desc_elt.uri if not app_ns or app_ns == NS_JINGLE: - self.sendError('bad-request', session['id'], request, profile=client.profile) + self.sendError(client, 'bad-request', session['id'], request) raise exceptions.CancelError try: application = self._applications[app_ns] except KeyError: log.warning(u"Unmanaged application namespace [{}]".format(app_ns)) - self.sendError('service-unavailable', session['id'], request, profile=client.profile) + self.sendError(client, 'service-unavailable', session['id'], request) raise exceptions.CancelError content_data['application'] = application @@ -587,14 +578,14 @@ if with_transport: transport_elt = content_elt.transport if not transport_elt: - self.sendError('bad-request', session['id'], request, profile=client.profile) + self.sendError(client, 'bad-request', session['id'], request) raise exceptions.CancelError if new: # the content is new, we need to check and link the transport transport_ns = transport_elt.uri if not app_ns or app_ns == NS_JINGLE: - self.sendError('bad-request', session['id'], request, profile=client.profile) + self.sendError(client, 'bad-request', session['id'], request) raise exceptions.CancelError try: @@ -610,16 +601,17 @@ content_data['transport_elt'] = transport_elt - def _ignore(self, action, session, content_name, elt, profile): + def _ignore(self, client, action, session, content_name, elt): """Dummy method used when not exception must be raised if a method is not implemented in _callPlugins must be used as app_default_cb and/or transp_default_cb """ return elt - def _callPlugins(self, action, session, app_method_name='jingleHandler', transp_method_name='jingleHandler', + def _callPlugins(self, client, action, session, app_method_name='jingleHandler', + transp_method_name='jingleHandler', app_default_cb=None, transp_default_cb=None, delete=True, - elements=True, force_element=None, profile=C.PROF_KEY_NONE): + elements=True, force_element=None): """Call application and transport plugin methods for all contents @param action(unicode): jingle action name @@ -635,10 +627,10 @@ @param delete(bool): if True, remove desc_elt and transport_elt from session ignored if elements is False @param elements(bool): True if elements(desc_elt and tranport_elt) must be managed - must be True if _callPlugins is used in a request, and False if it used after a request (i.e. on <iq> result or error) + must be True if _callPlugins is used in a request, and False if it used after a request + (i.e. on <iq> result or error) @param force_element(None, domish.Element, object): if elements is False, it is used as element parameter else it is ignored - @param profile(unicode): %(doc_profile)s @return (list[defer.Deferred]): list of launched Deferred @raise exceptions.NotFound: method is not implemented """ @@ -663,7 +655,7 @@ elt = content_data.pop(elt_name) if delete else content_data[elt_name] else: elt = force_element - d = defer.maybeDeferred(method, action, session, content_name, elt, profile) + d = defer.maybeDeferred(method, client, action, session, content_name, elt) defers_list.append(d) return defers_list @@ -691,7 +683,7 @@ if not contents_dict: # there MUST be at least one content - self.sendError('bad-request', session['id'], request, profile=client.profile) + self.sendError(client, 'bad-request', session['id'], request) return # at this point we can send the <iq/> result to confirm reception of the request @@ -699,7 +691,7 @@ # we now request each application plugin confirmation # and if all are accepted, we can accept the session - confirm_defers = self._callPlugins(XEP_0166.A_SESSION_INITIATE, session, 'jingleRequestConfirmation', None, self.jingleRequestConfirmationDefault, delete=False, profile=client.profile) + confirm_defers = self._callPlugins(client, XEP_0166.A_SESSION_INITIATE, session, 'jingleRequestConfirmation', None, self.jingleRequestConfirmationDefault, delete=False) confirm_dlist = defer.gatherResults(confirm_defers) confirm_dlist.addCallback(self._confirmationCb, session, jingle_elt, client) @@ -716,7 +708,7 @@ """ confirmed = all(confirm_results) if not confirmed: - return self.terminate(XEP_0166.REASON_DECLINE, session, client.profile) + return self.terminate(client, XEP_0166.REASON_DECLINE, session) iq_elt, jingle_elt = self._buildJingleElt(client, session, XEP_0166.A_SESSION_ACCEPT) jingle_elt['responder'] = client.jid.full() @@ -736,27 +728,27 @@ application = content_data['application'] app_session_accept_cb = application.handler.jingleHandler - app_d = defer.maybeDeferred(app_session_accept_cb, - XEP_0166.A_SESSION_INITIATE, session, content_name, content_data.pop('desc_elt'), client.profile) + app_d = defer.maybeDeferred(app_session_accept_cb, client, + XEP_0166.A_SESSION_INITIATE, session, content_name, content_data.pop('desc_elt')) app_d.addCallback(addElement, content_elt) defers_list.append(app_d) transport = content_data['transport'] transport_session_accept_cb = transport.handler.jingleHandler - transport_d = defer.maybeDeferred(transport_session_accept_cb, - XEP_0166.A_SESSION_INITIATE, session, content_name, content_data.pop('transport_elt'), client.profile) + transport_d = defer.maybeDeferred(transport_session_accept_cb, client, + XEP_0166.A_SESSION_INITIATE, session, content_name, content_data.pop('transport_elt')) transport_d.addCallback(addElement, content_elt) defers_list.append(transport_d) d_list = defer.DeferredList(defers_list) - d_list.addCallback(lambda dummy: self._callPlugins(XEP_0166.A_PREPARE_RESPONDER, session, app_method_name=None, elements=False, profile=client.profile)) + d_list.addCallback(lambda dummy: self._callPlugins(client, XEP_0166.A_PREPARE_RESPONDER, session, app_method_name=None, elements=False)) d_list.addCallback(lambda dummy: iq_elt.send()) def changeState(dummy, session): session['state'] = STATE_ACTIVE d_list.addCallback(changeState, session) - d_list.addCallback(lambda dummy: self._callPlugins(XEP_0166.A_ACCEPTED_ACK, session, elements=False, profile=client.profile)) + d_list.addCallback(lambda dummy: self._callPlugins(client, XEP_0166.A_ACCEPTED_ACK, session, elements=False)) d_list.addErrback(self._iqError, session['id'], client) return d_list @@ -769,7 +761,7 @@ log.warning(u"Not reason given for session termination") reason_elt = jingle_elt.addElement('reason') - terminate_defers = self._callPlugins(XEP_0166.A_SESSION_TERMINATE, session, 'jingleTerminate', 'jingleTerminate', self._ignore, self._ignore, elements=False, force_element=reason_elt, profile=client.profile) + terminate_defers = self._callPlugins(client, XEP_0166.A_SESSION_TERMINATE, session, 'jingleTerminate', 'jingleTerminate', self._ignore, self._ignore, elements=False, force_element=reason_elt) terminate_dlist = defer.DeferredList(terminate_defers) terminate_dlist.addCallback(lambda dummy: self._delSession(client, session['id'])) @@ -797,12 +789,12 @@ session['state'] = STATE_ACTIVE negociate_defers = [] - negociate_defers = self._callPlugins(XEP_0166.A_SESSION_ACCEPT, session, profile=client.profile) + negociate_defers = self._callPlugins(client, XEP_0166.A_SESSION_ACCEPT, session) negociate_dlist = defer.DeferredList(negociate_defers) # after negociations we start the transfer - negociate_dlist.addCallback(lambda dummy: self._callPlugins(XEP_0166.A_START, session, app_method_name=None, elements=False, profile=client.profile)) + negociate_dlist.addCallback(lambda dummy: self._callPlugins(client, XEP_0166.A_START, session, app_method_name=None, elements=False)) def _onSessionCb(self, result, client, request, jingle_elt, session): client.send(xmlstream.toResponse(request, 'result')) @@ -810,7 +802,7 @@ def _onSessionEb(self, failure_, client, request, jingle_elt, session): log.error(u"Error while handling onSessionInfo: {}".format(failure_.value)) # XXX: only error managed so far, maybe some applications/transports need more - self.sendError('feature-not-implemented', None, request, 'unsupported-info', client.profile) + self.sendError(client, 'feature-not-implemented', None, request, 'unsupported-info') def onSessionInfo(self, client, request, jingle_elt, session): """Method called when a session-info action is received from other peer @@ -829,8 +821,8 @@ try: # XXX: session-info is most likely only used for application, so we don't call transport plugins # if a future transport use it, this behaviour must be adapted - defers = self._callPlugins(XEP_0166.A_SESSION_INFO, session, 'jingleSessionInfo', None, - elements=False, force_element=jingle_elt, profile=client.profile) + defers = self._callPlugins(client, XEP_0166.A_SESSION_INFO, session, 'jingleSessionInfo', None, + elements=False, force_element=jingle_elt) except exceptions.NotFound as e: self._onSessionEb(failure.Failure(e), client, request, jingle_elt, session) return @@ -888,7 +880,7 @@ iq_elt, accept_jingle_elt = self._buildJingleElt(client, session, XEP_0166.A_TRANSPORT_ACCEPT) for content_name, content_data, transport, transport_elt in to_replace: # we can now actually replace the transport - yield content_data['transport'].handler.jingleHandler(XEP_0166.A_DESTROY, session, content_name, None, client.profile) + yield content_data['transport'].handler.jingleHandler(client, XEP_0166.A_DESTROY, session, content_name, None) content_data['transport'] = transport content_data['transport_data'].clear() # and build the element @@ -896,10 +888,10 @@ content_elt['name'] = content_name content_elt['creator'] = content_data['creator'] # we notify the transport and insert its <transport/> in the answer - accept_transport_elt = yield transport.handler.jingleHandler(XEP_0166.A_TRANSPORT_REPLACE, session, content_name, transport_elt, client.profile) + accept_transport_elt = yield transport.handler.jingleHandler(client, XEP_0166.A_TRANSPORT_REPLACE, session, content_name, transport_elt) content_elt.addChild(accept_transport_elt) # there is no confirmation needed here, so we can directly prepare it - yield transport.handler.jingleHandler(XEP_0166.A_PREPARE_RESPONDER, session, content_name, None, client.profile) + yield transport.handler.jingleHandler(client, XEP_0166.A_PREPARE_RESPONDER, session, content_name, None) iq_elt.send() @@ -922,12 +914,12 @@ client.send(xmlstream.toResponse(request, 'result')) negociate_defers = [] - negociate_defers = self._callPlugins(XEP_0166.A_TRANSPORT_ACCEPT, session, app_method_name=None, profile=client.profile) + negociate_defers = self._callPlugins(client, XEP_0166.A_TRANSPORT_ACCEPT, session, app_method_name=None) negociate_dlist = defer.DeferredList(negociate_defers) # after negociations we start the transfer - negociate_dlist.addCallback(lambda dummy: self._callPlugins(XEP_0166.A_START, session, app_method_name=None, elements=False, profile=client.profile)) + negociate_dlist.addCallback(lambda dummy: self._callPlugins(client, XEP_0166.A_START, session, app_method_name=None, elements=False)) def onTransportReject(self, client, request, jingle_elt, session): """Method called when a transport replacement is refused @@ -939,7 +931,7 @@ """ # XXX: for now, we terminate the session in case of transport-reject # this behaviour may change in the future - self.terminate('failed-transport', session, client.profile) + self.terminate(client, 'failed-transport', session) def onTransportInfo(self, client, request, jingle_elt, session): """Method called when a transport-info action is received from other peer @@ -966,7 +958,7 @@ except KeyError: continue else: - content_data['transport'].handler.jingleHandler(XEP_0166.A_TRANSPORT_INFO, session, content_name, transport_elt, client.profile) + content_data['transport'].handler.jingleHandler(client, XEP_0166.A_TRANSPORT_INFO, session, content_name, transport_elt) class XEP_0166_handler(xmlstream.XMPPHandler): @@ -976,7 +968,7 @@ self.plugin_parent = plugin_parent def connectionInitialized(self): - self.xmlstream.addObserver(JINGLE_REQUEST, self.plugin_parent._onJingleRequest, profile=self.parent.profile) + self.xmlstream.addObserver(JINGLE_REQUEST, self.plugin_parent._onJingleRequest, client=self.parent) def getDiscoInfo(self, requestor, target, nodeIdentifier=''): return [disco.DiscoFeature(NS_JINGLE)]
--- a/src/plugins/plugin_xep_0234.py Thu Feb 01 07:24:34 2018 +0100 +++ b/src/plugins/plugin_xep_0234.py Thu Feb 08 00:37:42 2018 +0100 @@ -25,6 +25,7 @@ from wokkel import disco, iwokkel from zope.interface import implements from sat.tools import utils +from sat.tools import stream import os.path from twisted.words.xish import domish from twisted.words.protocols.jabber import jid @@ -76,33 +77,33 @@ return u'{}_{}'.format(session['id'], content_name) def _fileJingleSend(self, peer_jid, filepath, name="", file_desc="", profile=C.PROF_KEY_NONE): - return self.fileJingleSend(jid.JID(peer_jid), filepath, name or None, file_desc or None, profile) + client = self.host.getClient(profile) + return self.fileJingleSend(client, jid.JID(peer_jid), filepath, name or None, file_desc or None) - def fileJingleSend(self, peer_jid, filepath, name, file_desc=None, profile=C.PROF_KEY_NONE): + def fileJingleSend(self, client, peer_jid, filepath, name, file_desc=None): """Send a file using jingle file transfer @param peer_jid(jid.JID): destinee jid @param filepath(str): absolute path of the file @param name(unicode, None): name of the file @param file_desc(unicode, None): description of the file - @param profile: %(doc_profile)s @return (D(unicode)): progress id """ progress_id_d = defer.Deferred() - self._j.initiate(peer_jid, + self._j.initiate(client, + peer_jid, [{'app_ns': NS_JINGLE_FT, 'senders': self._j.ROLE_INITIATOR, 'app_kwargs': {'filepath': filepath, 'name': name, 'file_desc': file_desc, 'progress_id_d': progress_id_d}, - }], - profile=profile) + }]) return progress_id_d # jingle callbacks - def jingleSessionInit(self, session, content_name, filepath, name, file_desc, progress_id_d, profile=C.PROF_KEY_NONE): + def jingleSessionInit(self, client, session, content_name, filepath, name, file_desc, progress_id_d): progress_id_d.callback(self._getProgressId(session, content_name)) content_data = session['contents'][content_name] application_data = content_data['application_data'] @@ -122,7 +123,7 @@ file_elt.addChild(self._hash.buildHashElt()) return desc_elt - def jingleRequestConfirmation(self, action, session, content_name, desc_elt, profile): + def jingleRequestConfirmation(self, client, action, session, content_name, desc_elt): """This method request confirmation for a jingle session""" content_data = session['contents'][content_name] if content_data['senders'] not in (self._j.ROLE_INITIATOR, self._j.ROLE_RESPONDER): @@ -166,15 +167,15 @@ def gotConfirmation(confirmed): if confirmed: finished_d = content_data['finished_d'] = defer.Deferred() - args = [session, content_name, content_data, profile] + args = [client, session, content_name, content_data] finished_d.addCallbacks(self._finishedCb, self._finishedEb, args, None, args) return confirmed - d = self._f.getDestDir(session['peer_jid'], content_data, file_data, profile) + d = self._f.getDestDir(client, session['peer_jid'], content_data, file_data, stream_object=True) d.addCallback(gotConfirmation) return d - def jingleHandler(self, action, session, content_name, desc_elt, profile): + def jingleHandler(self, client, action, session, content_name, desc_elt): content_data = session['contents'][content_name] application_data = content_data['application_data'] if action in (self._j.A_ACCEPTED_ACK,): @@ -188,27 +189,28 @@ log.debug("adding <range> element") file_elt.addElement('range') elif action == self._j.A_SESSION_ACCEPT: - assert not 'file_obj' in content_data + assert not 'stream_object' in content_data file_data = application_data['file_data'] file_path = application_data['file_path'] size = file_data['size'] # XXX: hash security is not critical here, so we just take the higher mandatory one hasher = file_data['hash_hasher'] = self._hash.getHasher('sha-256') - content_data['file_obj'] = self._f.File(self.host, - file_path, - uid=self._getProgressId(session, content_name), - size=size, - data_cb=lambda data: hasher.update(data), - profile=profile - ) + content_data['stream_object'] = stream.FileStreamObject( + self.host, + client, + file_path, + uid=self._getProgressId(session, content_name), + size=size, + data_cb=lambda data: hasher.update(data), + ) finished_d = content_data['finished_d'] = defer.Deferred() - args = [session, content_name, content_data, profile] + args = [client, session, content_name, content_data] finished_d.addCallbacks(self._finishedCb, self._finishedEb, args, None, args) else: log.warning(u"FIXME: unmanaged action {}".format(action)) return desc_elt - def jingleSessionInfo(self, action, session, content_name, jingle_elt, profile): + def jingleSessionInfo(self, client, action, session, content_name, jingle_elt): """Called on session-info action manage checksum, and ignore <received/> element @@ -240,17 +242,17 @@ log.warning(u"Hash algorithm used in given hash ({peer_algo}) doesn't correspond to the one we have used ({our_algo})" .format(peer_algo=algo, our_algo=file_data.get('hash_algo'))) else: - self._receiverTryTerminate(session, content_name, content_data, profile=profile) + self._receiverTryTerminate(client, session, content_name, content_data) else: raise NotImplementedError - def _sendCheckSum(self, session, content_name, content_data, profile): + def _sendCheckSum(self, client, session, content_name, content_data): """Send the session-info with the hash checksum""" file_data = content_data['application_data']['file_data'] hasher = file_data['hash_hasher'] hash_ = hasher.hexdigest() log.debug(u"Calculated hash: {}".format(hash_)) - iq_elt, jingle_elt = self._j.buildSessionInfo(session, profile) + iq_elt, jingle_elt = self._j.buildSessionInfo(client, session) checksum_elt = jingle_elt.addElement((NS_JINGLE_FT, 'checksum')) checksum_elt['creator'] = content_data['creator'] checksum_elt['name'] = content_name @@ -258,7 +260,7 @@ file_elt.addChild(self._hash.buildHashElt(hash_)) iq_elt.send() - def _receiverTryTerminate(self, session, content_name, content_data, last_try=False, profile=C.PROF_KEY_NONE): + def _receiverTryTerminate(self, client, session, content_name, content_data, last_try=False): """Try to terminate the session This method must only be used by the receiver. @@ -274,8 +276,8 @@ if hash_given is None: if last_try: log.warning(u"sender didn't sent hash checksum, we can't check the file") - self._j.delayedContentTerminate(session, content_name, profile=profile) - content_data['file_obj'].close() + self._j.delayedContentTerminate(client, session, content_name) + content_data['stream_object'].close() return True return False hasher = file_data['hash_hasher'] @@ -296,8 +298,8 @@ given = hash_given, our = hash_) - self._j.delayedContentTerminate(session, content_name, profile=profile) - content_data['file_obj'].close(progress_metadata, error) + self._j.delayedContentTerminate(client, session, content_name) + content_data['stream_object'].close(progress_metadata, error) # we may have the last_try timer still active, so we try to cancel it try: content_data['last_try_timer'].cancel() @@ -305,25 +307,25 @@ pass return True - def _finishedCb(self, dummy, session, content_name, content_data, profile): + def _finishedCb(self, dummy, client, session, content_name, content_data): log.info(u"File transfer terminated") if content_data['senders'] != session['role']: # we terminate the session only if we are the receiver, # as recommanded in XEP-0234 §2 (after example 6) content_data['transfer_finished'] = True - if not self._receiverTryTerminate(session, content_name, content_data, profile=profile): + if not self._receiverTryTerminate(client, session, content_name, content_data): # we have not received the hash yet, we wait 5 more seconds content_data['last_try_timer'] = reactor.callLater( - 5, self._receiverTryTerminate, session, content_name, content_data, last_try=True, profile=profile) + 5, self._receiverTryTerminate, client, session, content_name, content_data, last_try=True) else: # we are the sender, we send the checksum - self._sendCheckSum(session, content_name, content_data, profile) - content_data['file_obj'].close() + self._sendCheckSum(client, session, content_name, content_data) + content_data['stream_object'].close() - def _finishedEb(self, failure, session, content_name, content_data, profile): + def _finishedEb(self, failure, client, session, content_name, content_data): log.warning(u"Error while streaming file: {}".format(failure)) - content_data['file_obj'].close() - self._j.contentTerminate(session, content_name, reason=self._j.REASON_FAILED_TRANSPORT, profile=profile) + content_data['stream_object'].close() + self._j.contentTerminate(client, session, content_name, reason=self._j.REASON_FAILED_TRANSPORT) class XEP_0234_handler(XMPPHandler):
--- a/src/plugins/plugin_xep_0260.py Thu Feb 01 07:24:34 2018 +0100 +++ b/src/plugins/plugin_xep_0260.py Thu Feb 08 00:37:42 2018 +0100 @@ -127,44 +127,42 @@ return transport_elt @defer.inlineCallbacks - def jingleSessionInit(self, session, content_name, profile): - client = self.host.getClient(profile) + def jingleSessionInit(self, client, session, content_name): content_data = session['contents'][content_name] transport_data = content_data['transport_data'] sid = transport_data['sid'] = unicode(uuid.uuid4()) session_hash = transport_data['session_hash'] = self._s5b.getSessionHash(client.jid, session['peer_jid'], sid) transport_data['peer_session_hash'] = self._s5b.getSessionHash(session['peer_jid'], client.jid, sid) # requester and target are inversed for peer candidates - transport_data['stream_d'] = self._s5b.registerHash(session_hash, None, profile) - candidates = transport_data['candidates'] = yield self._s5b.getCandidates(profile) + transport_data['stream_d'] = self._s5b.registerHash(client, session_hash, None) + candidates = transport_data['candidates'] = yield self._s5b.getCandidates(client) mode = 'tcp' # XXX: we only manage tcp for now transport_elt = self._buildCandidates(session, candidates, sid, session_hash, client, mode) defer.returnValue(transport_elt) - def _proxyActivatedCb(self, iq_result_elt, candidate, session, content_name, profile): + def _proxyActivatedCb(self, iq_result_elt, client, candidate, session, content_name): """Called when activation confirmation has been received from proxy cf XEP-0260 § 2.4 """ # now that the proxy is activated, we have to inform other peer - iq_elt, transport_elt = self._j.buildAction(self._j.A_TRANSPORT_INFO, session, content_name, profile) + iq_elt, transport_elt = self._j.buildAction(client, self._j.A_TRANSPORT_INFO, session, content_name) activated_elt = transport_elt.addElement('activated') activated_elt['cid'] = candidate.id iq_elt.send() - def _proxyActivatedEb(self, stanza_error, candidate, session, content_name, profile): + def _proxyActivatedEb(self, stanza_error, client, candidate, session, content_name): """Called when activation error has been received from proxy cf XEP-0260 § 2.4 """ # TODO: fallback to IBB # now that the proxy is activated, we have to inform other peer - iq_elt, transport_elt = self._j.buildAction(self._j.A_TRANSPORT_INFO, session, content_name, profile) + iq_elt, transport_elt = self._j.buildAction(client, self._j.A_TRANSPORT_INFO, session, content_name) transport_elt.addElement('proxy-error') iq_elt.send() log.warning(u"Can't activate proxy, we need to fallback to IBB: {reason}" .format(reason = stanza_error.value.condition)) - client = self.host.getClient(profile) self.doFallback(session, content_name, client) def _foundPeerCandidate(self, candidate, session, transport_data, content_name, client): @@ -185,7 +183,7 @@ continue c.discard() del transport_data['peer_candidates'] - iq_elt, transport_elt = self._j.buildAction(self._j.A_TRANSPORT_INFO, session, content_name, client.profile) + iq_elt, transport_elt = self._j.buildAction(client, self._j.A_TRANSPORT_INFO, session, content_name) if candidate is None: log.warning(u"Can't connect to any peer candidate") candidate_elt = transport_elt.addElement('candidate-error') @@ -254,12 +252,12 @@ del transport_data['peer_best_candidate'] if choosed_candidate.type == self._s5b.TYPE_PROXY: - # the file transfer need to wait for proxy activation + # the stream transfer need to wait for proxy activation # (see XEP-0260 § 2.4) if our_candidate: - d = self._s5b.connectCandidate(choosed_candidate, transport_data['session_hash'], profile=client.profile) + d = self._s5b.connectCandidate(client, choosed_candidate, transport_data['session_hash']) d.addCallback(lambda dummy: choosed_candidate.activate(transport_data['sid'], session['peer_jid'], client)) - args = [choosed_candidate, session, content_name, client.profile] + args = [client, choosed_candidate, session, content_name] d.addCallbacks(self._proxyActivatedCb, self._proxyActivatedEb, args, None, args) else: # this Deferred will be called when we'll receive activation confirmation from other peer @@ -268,7 +266,7 @@ d = defer.succeed(None) if content_data['senders'] == session['role']: - # we can now start the file transfer (or start it after proxy activation) + # we can now start the stream transfer (or start it after proxy activation) d.addCallback(lambda dummy: choosed_candidate.startTransfer(transport_data['session_hash'])) d.addErrback(self._startEb, session, content_name, client) @@ -342,8 +340,7 @@ activation_d.errback(ProxyError()) @defer.inlineCallbacks - def jingleHandler(self, action, session, content_name, transport_elt, profile): - client = self.host.getClient(profile) + def jingleHandler(self, client, action, session, content_name, transport_elt): content_data = session['contents'][content_name] transport_data = content_data['transport_data'] @@ -358,12 +355,12 @@ elif action == self._j.A_START: session_hash = transport_data['session_hash'] peer_candidates = transport_data['peer_candidates'] - file_obj = content_data['file_obj'] - self._s5b.associateFileObj(session_hash, file_obj, profile) + stream_object = content_data['stream_object'] + self._s5b.associateStreamObject(client, session_hash, stream_object) stream_d = transport_data.pop('stream_d') stream_d.chainDeferred(content_data['finished_d']) peer_session_hash = transport_data['peer_session_hash'] - d = self._s5b.getBestCandidate(peer_candidates, session_hash, peer_session_hash, profile) + d = self._s5b.getBestCandidate(client, peer_candidates, session_hash, peer_session_hash) d.addCallback(self._foundPeerCandidate, session, transport_data, content_name, client) elif action == self._j.A_SESSION_INITIATE: @@ -374,12 +371,12 @@ session_hash = transport_data['session_hash'] = self._s5b.getSessionHash(client.jid, session['peer_jid'], sid) peer_session_hash = transport_data['peer_session_hash'] = self._s5b.getSessionHash(session['peer_jid'], client.jid, sid) # requester and target are inversed for peer candidates peer_candidates = transport_data['peer_candidates'] = self._parseCandidates(transport_elt) - file_obj = content_data['file_obj'] - stream_d = self._s5b.registerHash(session_hash, file_obj, profile) + stream_object = content_data['stream_object'] + stream_d = self._s5b.registerHash(client, session_hash, stream_object) stream_d.chainDeferred(content_data['finished_d']) - d = self._s5b.getBestCandidate(peer_candidates, session_hash, peer_session_hash, profile) + d = self._s5b.getBestCandidate(client, peer_candidates, session_hash, peer_session_hash) d.addCallback(self._foundPeerCandidate, session, transport_data, content_name, client) - candidates = yield self._s5b.getCandidates(profile) + candidates = yield self._s5b.getCandidates(client) # we remove duplicate candidates candidates = [candidate for candidate in candidates if candidate not in peer_candidates] @@ -413,11 +410,10 @@ defer.returnValue(transport_elt) - def jingleTerminate(self, action, session, content_name, reason_elt, profile): + def jingleTerminate(self, client, action, session, content_name, reason_elt): if reason_elt.decline: log.debug(u"Session declined, deleting S5B session") # we just need to clean the S5B session if it is declined - client = self.host.getClient(profile) content_data = session['contents'][content_name] transport_data = content_data['transport_data'] self._s5b.killSession(None, transport_data['session_hash'], None, client) @@ -429,9 +425,9 @@ """ if not feature_checked: log.warning(u"Other peer can't manage jingle IBB, be have to terminate the session") - self._j.terminate(self._j.REASON_CONNECTIVITY_ERROR, session, client.profile) + self._j.terminate(client, self._j.REASON_CONNECTIVITY_ERROR, session) else: - self._j.transportReplace(self._jingle_ibb.NAMESPACE, session, content_name, client.profile) + self._j.transportReplace(client, self._jingle_ibb.NAMESPACE, session, content_name) def doFallback(self, session, content_name, client): """Fallback to IBB transport, used in last resort @@ -445,7 +441,7 @@ return if self._jingle_ibb is None: log.warning(u"Jingle IBB (XEP-0261) plugin is not available, we have to close the session") - self._j.terminate(self._j.REASON_CONNECTIVITY_ERROR, session, client.profile) + self._j.terminate(client, self._j.REASON_CONNECTIVITY_ERROR, session) else: d = self.host.hasFeature(client, self._jingle_ibb.NAMESPACE, session['peer_jid']) d.addCallback(self._doFallback, session, content_name, client)
--- a/src/plugins/plugin_xep_0261.py Thu Feb 01 07:24:34 2018 +0100 +++ b/src/plugins/plugin_xep_0261.py Thu Feb 08 00:37:42 2018 +0100 @@ -59,7 +59,7 @@ def getHandler(self, client): return XEP_0261_handler() - def jingleSessionInit(self, session, content_name, profile): + def jingleSessionInit(self, client, session, content_name): transport_elt = domish.Element((NS_JINGLE_IBB, "transport")) content_data = session['contents'][content_name] transport_data = content_data['transport_data'] @@ -68,7 +68,7 @@ transport_elt['sid'] = transport_data['sid'] = unicode(uuid.uuid4()) return transport_elt - def jingleHandler(self, action, session, content_name, transport_elt, profile): + def jingleHandler(self, client, action, session, content_name, transport_elt): content_data = session['contents'][content_name] transport_data = content_data['transport_data'] if action in (self._j.A_SESSION_ACCEPT, @@ -80,13 +80,13 @@ elif action in (self._j.A_START, self._j.A_PREPARE_RESPONDER): peer_jid = session['peer_jid'] sid = transport_data['sid'] - file_obj = content_data['file_obj'] + stream_object = content_data['stream_object'] if action == self._j.A_START: block_size = transport_data['block_size'] - d = self._ibb.startStream(file_obj, peer_jid, sid, block_size, profile) + d = self._ibb.startStream(client, stream_object, peer_jid, sid, block_size) d.chainDeferred(content_data['finished_d']) else: - d = self._ibb.createSession(file_obj, peer_jid, sid, profile) + d = self._ibb.createSession(client, stream_object, peer_jid, sid) d.chainDeferred(content_data['finished_d']) else: log.warning(u"FIXME: unmanaged action {}".format(action))
--- a/src/plugins/plugin_xep_0363.py Thu Feb 01 07:24:34 2018 +0100 +++ b/src/plugins/plugin_xep_0363.py Thu Feb 08 00:37:42 2018 +0100 @@ -169,7 +169,7 @@ @return (tuple """ log.debug(u"Got upload slot: {}".format(slot)) - sat_file = self.host.plugins['FILE'].File(self.host, path, size=size, auto_end_signals=False, profile=client.profile) + sat_file = self.host.plugins['FILE'].File(self.host, client, path, size=size, auto_end_signals=False) progress_id_d.callback(sat_file.uid) file_producer = http_client.FileBodyProducer(sat_file) if ignore_tls_errors:
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/tools/stream.py Thu Feb 08 00:37:42 2018 +0100 @@ -0,0 +1,200 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- + +# SAT: a jabber client +# Copyright (C) 2009-2018 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +""" interfaces """ + +from sat.core import exceptions +from sat.core.constants import Const as C +from sat.core.log import getLogger +from twisted.protocols import basic +from twisted.internet import interfaces +from zope import interface +import uuid +import os + +log = getLogger(__name__) + + +class IStreamProducer(interface.Interface): + + def startStream(consumer): + """start producing the stream + + @return (D): deferred fired when stream is finished + """ + + +class SatFile(object): + """A file-like object to have high level files manipulation""" + # TODO: manage "with" statement + + def __init__(self, host, client, path, mode='rb', uid=None, size=None, data_cb=None, auto_end_signals=True): + """ + @param host: %(doc_host)s + @param path(str): path of the file to get + @param mode(str): same as for built-in "open" function + @param uid(unicode, None): unique id identifing this progressing element + This uid will be used with self.host.progressGet + will be automaticaly generated if None + @param size(None, int): size of the file (when known in advance) + @param data_cb(None, callable): method to call on each data read/write + mainly useful to do things like calculating hash + @param auto_end_signals(bool): if True, progressFinished and progressError signals are automatically sent + if False, you'll have to call self.progressFinished and self.progressError yourself + progressStarted signal is always sent automatically + """ + self.host = host + self.profile = client.profile + self.uid = uid or unicode(uuid.uuid4()) + self._file = open(path, mode) + self.size = size + self.data_cb = data_cb + self.auto_end_signals = auto_end_signals + metadata = self.getProgressMetadata() + self.host.registerProgressCb(self.uid, self.getProgress, metadata, profile=client.profile) + self.host.bridge.progressStarted(self.uid, metadata, client.profile) + + def checkSize(self): + """Check that current size correspond to given size + + must be used when the transfer is supposed to be finished + @return (bool): True if the position is the same as given size + @raise exceptions.NotFound: size has not be specified + """ + position = self._file.tell() + if self.size is None: + raise exceptions.NotFound + return position == self.size + + def close(self, progress_metadata=None, error=None): + """Close the current file + + @param progress_metadata(None, dict): metadata to send with _onProgressFinished message + @param error(None, unicode): set to an error message if progress was not successful + mutually exclusive with progress_metadata + error can happen even if error is None, if current size differ from given size + """ + if self._file.closed: + return # avoid double close (which is allowed) error + if error is None: + try: + size_ok = self.checkSize() + except exceptions.NotFound: + size_ok = True + if not size_ok: + error = u'declared and actual size mismatch' + log.warning(error) + progress_metadata = None + + self._file.close() + + if self.auto_end_signals: + if error is None: + self.progressFinished(progress_metadata) + else: + assert progress_metadata is None + self.progressError(error) + + self.host.removeProgressCb(self.uid, self.profile) + + def progressFinished(self, metadata=None): + if metadata is None: + metadata = {} + self.host.bridge.progressFinished(self.uid, metadata, self.profile) + + def progressError(self, error): + self.host.bridge.progressError(self.uid, error, self.profile) + + def flush(self): + self._file.flush() + + def write(self, buf): + self._file.write(buf) + if self.data_cb is not None: + return self.data_cb(buf) + + def read(self, size=-1): + read = self._file.read(size) + if self.data_cb is not None and read: + self.data_cb(read) + return read + + def seek(self, offset, whence=os.SEEK_SET): + self._file.seek(offset, whence) + + def tell(self): + return self._file.tell() + + def mode(self): + return self._file.mode() + + def getProgressMetadata(self): + """Return progression metadata as given to progressStarted + + @return (dict): metadata (check bridge for documentation) + """ + metadata = {'type': C.META_TYPE_FILE} + + mode = self._file.mode + if '+' in mode: + pass # we have no direction in read/write modes + elif mode in ('r', 'rb'): + metadata['direction'] = 'out' + elif mode in ('w', 'wb'): + metadata['direction'] = 'in' + elif 'U' in mode: + metadata['direction'] = 'out' + else: + raise exceptions.InternalError + + metadata['name'] = self._file.name + + return metadata + + def getProgress(self, progress_id, profile): + ret = {'position': self._file.tell()} + if self.size: + ret['size'] = self.size + return ret + + +@interface.implementer(IStreamProducer) +@interface.implementer(interfaces.IConsumer) +class FileStreamObject(basic.FileSender): + + def __init__(self, host, client, path, **kwargs): + """ + + A SatFile will be created and put in self.file_obj + @param path(unicode): path to the file + @param **kwargs: kw arguments to pass to SatFile + """ + self.file_obj = SatFile(host, client, path, **kwargs) + + def registerProducer(self, producer, streaming): + pass + + def startStream(self, consumer): + return self.beginFileTransfer(self.file_obj, consumer) + + def write(self, data): + self.file_obj.write(data) + + def close(self, *args, **kwargs): + self.file_obj.close(*args, **kwargs)