Mercurial > libervia-backend
diff frontends/src/jp/cmd_pipe.py @ 2489:e2a7bb875957
plugin pipe/stream, file transfert: refactoring and improvments:
this is a big patch as things had to be changed at the same time.
- changed methods using profile argument to use client instead
- move SatFile in a new tools.stream module, has it should be part of core, not a plugin
- new IStreamProducer interface, to handler starting a pull producer
- new FileStreamObject which create a stream producer/consumer from a SatFile
- plugin pipe is no more using unix named pipe, as it complicate the thing,
special care need to be taken to not block, and it's generally not necessary.
Instead a socket is now used, so the plugin has been renomed to jingle stream.
- bad connection/error should be better handler in jingle stream plugin, and code should not block anymore
- jp pipe commands have been updated accordingly
fix bug 237
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 08 Feb 2018 00:37:42 +0100 |
parents | 0046283a285d |
children |
line wrap: on
line diff
--- a/frontends/src/jp/cmd_pipe.py Thu Feb 01 07:24:34 2018 +0100 +++ b/frontends/src/jp/cmd_pipe.py Thu Feb 08 00:37:42 2018 +0100 @@ -19,18 +19,19 @@ from sat_frontends.jp import base -import tempfile +from sat_frontends.jp.constants import Const as C import sys -import os -import os.path -import shutil from sat.core.i18n import _ from sat_frontends.tools import jid import xml.etree.ElementTree as ET # FIXME: used temporarily to manage XMLUI - +from functools import partial +import socket +import SocketServer +import errno __commands__ = ["Pipe"] +START_PORT = 9999 class PipeOut(base.CommandBase): @@ -41,31 +42,64 @@ def add_parser_options(self): self.parser.add_argument("jid", type=base.unicode_decoder, help=_("the destination jid")) + def streamOutCb(self, port): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect(('127.0.0.1', int(port))) + while True: + buf = sys.stdin.read(4096) + if not buf: + break + try: + s.sendall(buf) + except socket.error as e: + if e.errno == errno.EPIPE: + sys.stderr.write(str(e) + '\n') + self.host.quit(1) + else: + raise e + self.host.quit() + def start(self): """ Create named pipe, and send stdin to it """ - # TODO: check_jids - tmp_dir = tempfile.mkdtemp() - fifopath = os.path.join(tmp_dir,"pipe_out") - os.mkfifo(fifopath) - self.host.bridge.pipeOut(self.host.get_full_jid(self.args.jid), fifopath, self.profile) - with open(fifopath, 'w') as f: - shutil.copyfileobj(sys.stdin, f) - shutil.rmtree(tmp_dir) - self.host.quit() + self.host.bridge.streamOut( + self.host.get_full_jid(self.args.jid), + self.profile, + callback=self.streamOutCb, + errback=partial(self.errback, + msg=_(u"can't start stream: {}"), + exit_code=C.EXIT_BRIDGE_ERRBACK)) + + +class StreamServer(SocketServer.BaseRequestHandler): + + def handle(self): + while True: + data = self.request.recv(4096) + if not data: + break + sys.stdout.write(data) + try: + sys.stdout.flush() + except IOError as e: + sys.stderr.write(str(e) + '\n') + break + # calling shutdown will do a deadlock as we don't use separate thread + # this is a workaround (cf. https://stackoverflow.com/a/36017741) + self.server._BaseServer__shutdown_request = True class PipeIn(base.CommandAnswering): def __init__(self, host): super(PipeIn, self).__init__(host, 'in', help=_('receive a pipe stream')) - self.action_callbacks = {"PIPE": self.onPipeAction} + self.action_callbacks = {"STREAM": self.onStreamAction} def add_parser_options(self): self.parser.add_argument("jids", type=base.unicode_decoder, nargs="*", help=_('Jids accepted (none means "accept everything")')) def getXmluiId(self, action_data): # FIXME: we temporarily use ElementTree, but a real XMLUI managing module - # should be available in the futur + # should be available in the future # TODO: XMLUI module try: xml_ui = action_data['xmlui'] @@ -78,7 +112,7 @@ self.disp(_(u"Invalid XMLUI received"), error=True) return xmlui_id - def onPipeAction(self, action_data, action_id, security_limit, profile): + def onStreamAction(self, action_data, action_id, security_limit, profile): xmlui_id = self.getXmluiId(action_data) if xmlui_id is None: return self.host.quitFromSignal(1) @@ -89,16 +123,22 @@ return if not self.bare_jids or from_jid.bare in self.bare_jids: - tmp_dir = tempfile.mkdtemp() - fifopath = os.path.join(tmp_dir,"pipe_in") - os.mkfifo(fifopath) - xmlui_data = {'path': fifopath} + host, port = "localhost", START_PORT + while True: + try: + server = SocketServer.TCPServer((host, port), StreamServer) + except socket.error as e: + if e.errno == errno.EADDRINUSE: + port += 1 + else: + raise e + else: + break + xmlui_data = {'answer': C.BOOL_TRUE, + 'port': unicode(port)} self.host.bridge.launchAction(xmlui_id, xmlui_data, profile_key=profile) - - with open(fifopath, 'r') as f: - shutil.copyfileobj(f, sys.stdout) - shutil.rmtree(tmp_dir) - self.host.quit() + server.serve_forever() + self.host.quitFromSignal() def start(self): self.bare_jids = [jid.JID(jid_).bare for jid_ in self.args.jids]