Mercurial > libervia-backend
comparison frontends/src/jp/cmd_pipe.py @ 2489:e2a7bb875957
plugin pipe/stream, file transfert: refactoring and improvments:
this is a big patch as things had to be changed at the same time.
- changed methods using profile argument to use client instead
- move SatFile in a new tools.stream module, has it should be part of core, not a plugin
- new IStreamProducer interface, to handler starting a pull producer
- new FileStreamObject which create a stream producer/consumer from a SatFile
- plugin pipe is no more using unix named pipe, as it complicate the thing,
special care need to be taken to not block, and it's generally not necessary.
Instead a socket is now used, so the plugin has been renomed to jingle stream.
- bad connection/error should be better handler in jingle stream plugin, and code should not block anymore
- jp pipe commands have been updated accordingly
fix bug 237
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 08 Feb 2018 00:37:42 +0100 |
parents | 0046283a285d |
children |
comparison
equal
deleted
inserted
replaced
2488:78c7992a26ed | 2489:e2a7bb875957 |
---|---|
17 # You should have received a copy of the GNU Affero General Public License | 17 # You should have received a copy of the GNU Affero General Public License |
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. | 18 # along with this program. If not, see <http://www.gnu.org/licenses/>. |
19 | 19 |
20 from sat_frontends.jp import base | 20 from sat_frontends.jp import base |
21 | 21 |
22 import tempfile | 22 from sat_frontends.jp.constants import Const as C |
23 import sys | 23 import sys |
24 import os | |
25 import os.path | |
26 import shutil | |
27 from sat.core.i18n import _ | 24 from sat.core.i18n import _ |
28 from sat_frontends.tools import jid | 25 from sat_frontends.tools import jid |
29 import xml.etree.ElementTree as ET # FIXME: used temporarily to manage XMLUI | 26 import xml.etree.ElementTree as ET # FIXME: used temporarily to manage XMLUI |
30 | 27 from functools import partial |
28 import socket | |
29 import SocketServer | |
30 import errno | |
31 | 31 |
32 __commands__ = ["Pipe"] | 32 __commands__ = ["Pipe"] |
33 | 33 |
34 START_PORT = 9999 | |
34 | 35 |
35 class PipeOut(base.CommandBase): | 36 class PipeOut(base.CommandBase): |
36 | 37 |
37 def __init__(self, host): | 38 def __init__(self, host): |
38 super(PipeOut, self).__init__(host, 'out', help=_('send a pipe a stream')) | 39 super(PipeOut, self).__init__(host, 'out', help=_('send a pipe a stream')) |
39 self.need_loop = True | 40 self.need_loop = True |
40 | 41 |
41 def add_parser_options(self): | 42 def add_parser_options(self): |
42 self.parser.add_argument("jid", type=base.unicode_decoder, help=_("the destination jid")) | 43 self.parser.add_argument("jid", type=base.unicode_decoder, help=_("the destination jid")) |
43 | 44 |
45 def streamOutCb(self, port): | |
46 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
47 s.connect(('127.0.0.1', int(port))) | |
48 while True: | |
49 buf = sys.stdin.read(4096) | |
50 if not buf: | |
51 break | |
52 try: | |
53 s.sendall(buf) | |
54 except socket.error as e: | |
55 if e.errno == errno.EPIPE: | |
56 sys.stderr.write(str(e) + '\n') | |
57 self.host.quit(1) | |
58 else: | |
59 raise e | |
60 self.host.quit() | |
61 | |
44 def start(self): | 62 def start(self): |
45 """ Create named pipe, and send stdin to it """ | 63 """ Create named pipe, and send stdin to it """ |
46 # TODO: check_jids | 64 self.host.bridge.streamOut( |
47 tmp_dir = tempfile.mkdtemp() | 65 self.host.get_full_jid(self.args.jid), |
48 fifopath = os.path.join(tmp_dir,"pipe_out") | 66 self.profile, |
49 os.mkfifo(fifopath) | 67 callback=self.streamOutCb, |
50 self.host.bridge.pipeOut(self.host.get_full_jid(self.args.jid), fifopath, self.profile) | 68 errback=partial(self.errback, |
51 with open(fifopath, 'w') as f: | 69 msg=_(u"can't start stream: {}"), |
52 shutil.copyfileobj(sys.stdin, f) | 70 exit_code=C.EXIT_BRIDGE_ERRBACK)) |
53 shutil.rmtree(tmp_dir) | 71 |
54 self.host.quit() | 72 |
73 class StreamServer(SocketServer.BaseRequestHandler): | |
74 | |
75 def handle(self): | |
76 while True: | |
77 data = self.request.recv(4096) | |
78 if not data: | |
79 break | |
80 sys.stdout.write(data) | |
81 try: | |
82 sys.stdout.flush() | |
83 except IOError as e: | |
84 sys.stderr.write(str(e) + '\n') | |
85 break | |
86 # calling shutdown will do a deadlock as we don't use separate thread | |
87 # this is a workaround (cf. https://stackoverflow.com/a/36017741) | |
88 self.server._BaseServer__shutdown_request = True | |
55 | 89 |
56 | 90 |
57 class PipeIn(base.CommandAnswering): | 91 class PipeIn(base.CommandAnswering): |
58 | 92 |
59 def __init__(self, host): | 93 def __init__(self, host): |
60 super(PipeIn, self).__init__(host, 'in', help=_('receive a pipe stream')) | 94 super(PipeIn, self).__init__(host, 'in', help=_('receive a pipe stream')) |
61 self.action_callbacks = {"PIPE": self.onPipeAction} | 95 self.action_callbacks = {"STREAM": self.onStreamAction} |
62 | 96 |
63 def add_parser_options(self): | 97 def add_parser_options(self): |
64 self.parser.add_argument("jids", type=base.unicode_decoder, nargs="*", help=_('Jids accepted (none means "accept everything")')) | 98 self.parser.add_argument("jids", type=base.unicode_decoder, nargs="*", help=_('Jids accepted (none means "accept everything")')) |
65 | 99 |
66 def getXmluiId(self, action_data): | 100 def getXmluiId(self, action_data): |
67 # FIXME: we temporarily use ElementTree, but a real XMLUI managing module | 101 # FIXME: we temporarily use ElementTree, but a real XMLUI managing module |
68 # should be available in the futur | 102 # should be available in the future |
69 # TODO: XMLUI module | 103 # TODO: XMLUI module |
70 try: | 104 try: |
71 xml_ui = action_data['xmlui'] | 105 xml_ui = action_data['xmlui'] |
72 except KeyError: | 106 except KeyError: |
73 self.disp(_(u"Action has no XMLUI"), 1) | 107 self.disp(_(u"Action has no XMLUI"), 1) |
76 xmlui_id = ui.get('submit') | 110 xmlui_id = ui.get('submit') |
77 if not xmlui_id: | 111 if not xmlui_id: |
78 self.disp(_(u"Invalid XMLUI received"), error=True) | 112 self.disp(_(u"Invalid XMLUI received"), error=True) |
79 return xmlui_id | 113 return xmlui_id |
80 | 114 |
81 def onPipeAction(self, action_data, action_id, security_limit, profile): | 115 def onStreamAction(self, action_data, action_id, security_limit, profile): |
82 xmlui_id = self.getXmluiId(action_data) | 116 xmlui_id = self.getXmluiId(action_data) |
83 if xmlui_id is None: | 117 if xmlui_id is None: |
84 return self.host.quitFromSignal(1) | 118 return self.host.quitFromSignal(1) |
85 try: | 119 try: |
86 from_jid = jid.JID(action_data['meta_from_jid']) | 120 from_jid = jid.JID(action_data['meta_from_jid']) |
87 except KeyError: | 121 except KeyError: |
88 self.disp(_(u"Ignoring action without from_jid data"), 1) | 122 self.disp(_(u"Ignoring action without from_jid data"), 1) |
89 return | 123 return |
90 | 124 |
91 if not self.bare_jids or from_jid.bare in self.bare_jids: | 125 if not self.bare_jids or from_jid.bare in self.bare_jids: |
92 tmp_dir = tempfile.mkdtemp() | 126 host, port = "localhost", START_PORT |
93 fifopath = os.path.join(tmp_dir,"pipe_in") | 127 while True: |
94 os.mkfifo(fifopath) | 128 try: |
95 xmlui_data = {'path': fifopath} | 129 server = SocketServer.TCPServer((host, port), StreamServer) |
130 except socket.error as e: | |
131 if e.errno == errno.EADDRINUSE: | |
132 port += 1 | |
133 else: | |
134 raise e | |
135 else: | |
136 break | |
137 xmlui_data = {'answer': C.BOOL_TRUE, | |
138 'port': unicode(port)} | |
96 self.host.bridge.launchAction(xmlui_id, xmlui_data, profile_key=profile) | 139 self.host.bridge.launchAction(xmlui_id, xmlui_data, profile_key=profile) |
97 | 140 server.serve_forever() |
98 with open(fifopath, 'r') as f: | 141 self.host.quitFromSignal() |
99 shutil.copyfileobj(f, sys.stdout) | |
100 shutil.rmtree(tmp_dir) | |
101 self.host.quit() | |
102 | 142 |
103 def start(self): | 143 def start(self): |
104 self.bare_jids = [jid.JID(jid_).bare for jid_ in self.args.jids] | 144 self.bare_jids = [jid.JID(jid_).bare for jid_ in self.args.jids] |
105 | 145 |
106 | 146 |