comparison frontends/src/jp/cmd_pipe.py @ 2489:e2a7bb875957

plugin pipe/stream, file transfert: refactoring and improvments: this is a big patch as things had to be changed at the same time. - changed methods using profile argument to use client instead - move SatFile in a new tools.stream module, has it should be part of core, not a plugin - new IStreamProducer interface, to handler starting a pull producer - new FileStreamObject which create a stream producer/consumer from a SatFile - plugin pipe is no more using unix named pipe, as it complicate the thing, special care need to be taken to not block, and it's generally not necessary. Instead a socket is now used, so the plugin has been renomed to jingle stream. - bad connection/error should be better handler in jingle stream plugin, and code should not block anymore - jp pipe commands have been updated accordingly fix bug 237
author Goffi <goffi@goffi.org>
date Thu, 08 Feb 2018 00:37:42 +0100
parents 0046283a285d
children
comparison
equal deleted inserted replaced
2488:78c7992a26ed 2489:e2a7bb875957
17 # You should have received a copy of the GNU Affero General Public License 17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. 18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 19
20 from sat_frontends.jp import base 20 from sat_frontends.jp import base
21 21
22 import tempfile 22 from sat_frontends.jp.constants import Const as C
23 import sys 23 import sys
24 import os
25 import os.path
26 import shutil
27 from sat.core.i18n import _ 24 from sat.core.i18n import _
28 from sat_frontends.tools import jid 25 from sat_frontends.tools import jid
29 import xml.etree.ElementTree as ET # FIXME: used temporarily to manage XMLUI 26 import xml.etree.ElementTree as ET # FIXME: used temporarily to manage XMLUI
30 27 from functools import partial
28 import socket
29 import SocketServer
30 import errno
31 31
32 __commands__ = ["Pipe"] 32 __commands__ = ["Pipe"]
33 33
34 START_PORT = 9999
34 35
35 class PipeOut(base.CommandBase): 36 class PipeOut(base.CommandBase):
36 37
37 def __init__(self, host): 38 def __init__(self, host):
38 super(PipeOut, self).__init__(host, 'out', help=_('send a pipe a stream')) 39 super(PipeOut, self).__init__(host, 'out', help=_('send a pipe a stream'))
39 self.need_loop = True 40 self.need_loop = True
40 41
41 def add_parser_options(self): 42 def add_parser_options(self):
42 self.parser.add_argument("jid", type=base.unicode_decoder, help=_("the destination jid")) 43 self.parser.add_argument("jid", type=base.unicode_decoder, help=_("the destination jid"))
43 44
45 def streamOutCb(self, port):
46 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
47 s.connect(('127.0.0.1', int(port)))
48 while True:
49 buf = sys.stdin.read(4096)
50 if not buf:
51 break
52 try:
53 s.sendall(buf)
54 except socket.error as e:
55 if e.errno == errno.EPIPE:
56 sys.stderr.write(str(e) + '\n')
57 self.host.quit(1)
58 else:
59 raise e
60 self.host.quit()
61
44 def start(self): 62 def start(self):
45 """ Create named pipe, and send stdin to it """ 63 """ Create named pipe, and send stdin to it """
46 # TODO: check_jids 64 self.host.bridge.streamOut(
47 tmp_dir = tempfile.mkdtemp() 65 self.host.get_full_jid(self.args.jid),
48 fifopath = os.path.join(tmp_dir,"pipe_out") 66 self.profile,
49 os.mkfifo(fifopath) 67 callback=self.streamOutCb,
50 self.host.bridge.pipeOut(self.host.get_full_jid(self.args.jid), fifopath, self.profile) 68 errback=partial(self.errback,
51 with open(fifopath, 'w') as f: 69 msg=_(u"can't start stream: {}"),
52 shutil.copyfileobj(sys.stdin, f) 70 exit_code=C.EXIT_BRIDGE_ERRBACK))
53 shutil.rmtree(tmp_dir) 71
54 self.host.quit() 72
73 class StreamServer(SocketServer.BaseRequestHandler):
74
75 def handle(self):
76 while True:
77 data = self.request.recv(4096)
78 if not data:
79 break
80 sys.stdout.write(data)
81 try:
82 sys.stdout.flush()
83 except IOError as e:
84 sys.stderr.write(str(e) + '\n')
85 break
86 # calling shutdown will do a deadlock as we don't use separate thread
87 # this is a workaround (cf. https://stackoverflow.com/a/36017741)
88 self.server._BaseServer__shutdown_request = True
55 89
56 90
57 class PipeIn(base.CommandAnswering): 91 class PipeIn(base.CommandAnswering):
58 92
59 def __init__(self, host): 93 def __init__(self, host):
60 super(PipeIn, self).__init__(host, 'in', help=_('receive a pipe stream')) 94 super(PipeIn, self).__init__(host, 'in', help=_('receive a pipe stream'))
61 self.action_callbacks = {"PIPE": self.onPipeAction} 95 self.action_callbacks = {"STREAM": self.onStreamAction}
62 96
63 def add_parser_options(self): 97 def add_parser_options(self):
64 self.parser.add_argument("jids", type=base.unicode_decoder, nargs="*", help=_('Jids accepted (none means "accept everything")')) 98 self.parser.add_argument("jids", type=base.unicode_decoder, nargs="*", help=_('Jids accepted (none means "accept everything")'))
65 99
66 def getXmluiId(self, action_data): 100 def getXmluiId(self, action_data):
67 # FIXME: we temporarily use ElementTree, but a real XMLUI managing module 101 # FIXME: we temporarily use ElementTree, but a real XMLUI managing module
68 # should be available in the futur 102 # should be available in the future
69 # TODO: XMLUI module 103 # TODO: XMLUI module
70 try: 104 try:
71 xml_ui = action_data['xmlui'] 105 xml_ui = action_data['xmlui']
72 except KeyError: 106 except KeyError:
73 self.disp(_(u"Action has no XMLUI"), 1) 107 self.disp(_(u"Action has no XMLUI"), 1)
76 xmlui_id = ui.get('submit') 110 xmlui_id = ui.get('submit')
77 if not xmlui_id: 111 if not xmlui_id:
78 self.disp(_(u"Invalid XMLUI received"), error=True) 112 self.disp(_(u"Invalid XMLUI received"), error=True)
79 return xmlui_id 113 return xmlui_id
80 114
81 def onPipeAction(self, action_data, action_id, security_limit, profile): 115 def onStreamAction(self, action_data, action_id, security_limit, profile):
82 xmlui_id = self.getXmluiId(action_data) 116 xmlui_id = self.getXmluiId(action_data)
83 if xmlui_id is None: 117 if xmlui_id is None:
84 return self.host.quitFromSignal(1) 118 return self.host.quitFromSignal(1)
85 try: 119 try:
86 from_jid = jid.JID(action_data['meta_from_jid']) 120 from_jid = jid.JID(action_data['meta_from_jid'])
87 except KeyError: 121 except KeyError:
88 self.disp(_(u"Ignoring action without from_jid data"), 1) 122 self.disp(_(u"Ignoring action without from_jid data"), 1)
89 return 123 return
90 124
91 if not self.bare_jids or from_jid.bare in self.bare_jids: 125 if not self.bare_jids or from_jid.bare in self.bare_jids:
92 tmp_dir = tempfile.mkdtemp() 126 host, port = "localhost", START_PORT
93 fifopath = os.path.join(tmp_dir,"pipe_in") 127 while True:
94 os.mkfifo(fifopath) 128 try:
95 xmlui_data = {'path': fifopath} 129 server = SocketServer.TCPServer((host, port), StreamServer)
130 except socket.error as e:
131 if e.errno == errno.EADDRINUSE:
132 port += 1
133 else:
134 raise e
135 else:
136 break
137 xmlui_data = {'answer': C.BOOL_TRUE,
138 'port': unicode(port)}
96 self.host.bridge.launchAction(xmlui_id, xmlui_data, profile_key=profile) 139 self.host.bridge.launchAction(xmlui_id, xmlui_data, profile_key=profile)
97 140 server.serve_forever()
98 with open(fifopath, 'r') as f: 141 self.host.quitFromSignal()
99 shutil.copyfileobj(f, sys.stdout)
100 shutil.rmtree(tmp_dir)
101 self.host.quit()
102 142
103 def start(self): 143 def start(self):
104 self.bare_jids = [jid.JID(jid_).bare for jid_ in self.args.jids] 144 self.bare_jids = [jid.JID(jid_).bare for jid_ in self.args.jids]
105 145
106 146