Mercurial > libervia-backend
comparison src/plugins/plugin_exp_jingle_stream.py @ 2489:e2a7bb875957
plugin pipe/stream, file transfer: refactoring and improvements:
this is a big patch as things had to be changed at the same time.
- changed methods using profile argument to use client instead
- move SatFile in a new tools.stream module, as it should be part of core, not a plugin
- new IStreamProducer interface, to handle starting a pull producer
- new FileStreamObject which creates a stream producer/consumer from a SatFile
- plugin pipe no longer uses unix named pipes, as they complicate things:
special care needs to be taken not to block, and it's generally not necessary.
Instead a socket is now used, so the plugin has been renamed to jingle stream.
- bad connections/errors should be better handled in the jingle stream plugin, and the code should not block anymore
- jp pipe commands have been updated accordingly
fix bug 237
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 08 Feb 2018 00:37:42 +0100 |
parents | src/plugins/plugin_exp_pipe.py@0046283a285d |
children |
comparison
equal
deleted
inserted
replaced
2488:78c7992a26ed | 2489:e2a7bb875957 |
---|---|
1 #!/usr/bin/env python2 | |
2 # -*- coding: utf-8 -*- | |
3 | |
4 # SAT plugin for managing pipes (experimental) | |
5 # Copyright (C) 2009-2018 Jérôme Poisson (goffi@goffi.org) | |
6 | |
7 # This program is free software: you can redistribute it and/or modify | |
8 # it under the terms of the GNU Affero General Public License as published by | |
9 # the Free Software Foundation, either version 3 of the License, or | |
10 # (at your option) any later version. | |
11 | |
12 # This program is distributed in the hope that it will be useful, | |
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
15 # GNU Affero General Public License for more details. | |
16 | |
17 # You should have received a copy of the GNU Affero General Public License | |
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
19 | |
20 from sat.core.i18n import _, D_ | |
21 from sat.core.constants import Const as C | |
22 from sat.core import exceptions | |
23 from sat.core.log import getLogger | |
24 log = getLogger(__name__) | |
25 from sat.tools import xml_tools | |
26 from sat.tools import stream | |
27 from twisted.words.xish import domish | |
28 from twisted.words.protocols.jabber import jid | |
29 from twisted.internet import defer | |
30 from twisted.internet import protocol | |
31 from twisted.internet import endpoints | |
32 from twisted.internet import reactor | |
33 from twisted.internet import error | |
34 from twisted.internet import interfaces | |
35 from zope import interface | |
36 import errno | |
37 | |
# XML namespace under which this application is registered with the jingle plugin
NS_STREAM = 'http://salut-a-toi.org/protocol/stream'
# value passed as ``security_limit`` when asking the user to confirm a stream
SECURITY_LIMIT = 30
# first TCP port tried by streamOut; the next ones are tried while it is in use
START_PORT = 8888

# plugin metadata
# NOTE(review): presumably read by the plugin loader; verify against the core
PLUGIN_INFO = {
    C.PI_NAME: "Jingle Stream Plugin",
    C.PI_IMPORT_NAME: "STREAM",
    C.PI_TYPE: "EXP",
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0166"],
    C.PI_MAIN: "JingleStream",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _("""Jingle Stream plugin"""),
}

# confirmation dialog texts; they are passed through _() when the dialog is built
CONFIRM = D_(u"{peer} wants to send you a stream, do you accept ?")
CONFIRM_TITLE = D_(u"Stream Request")
55 | |
56 | |
class StreamProtocol(protocol.Protocol):
    """Relay bytes between one socket connection and its factory.

    Received data is handed to the factory (``writeToConsumer``), and the
    factory sends data back through ``sendData``. A factory serves a single
    connection: any connection made while another one is registered is closed.
    """

    def __init__(self):
        # True while reading from the transport has been paused by setPause
        self.pause = False

    def setPause(self, paused):
        """Pause or resume reading from the transport.

        The transport is only touched when the state actually changes.
        @param paused(bool): True to pause reading, False to resume it
        """
        # in Python 2.x, Twisted classes are old style
        # so we can't use property and setter
        # (presumably why an explicit setter method is used here)
        if paused:
            if not self.pause:
                self.transport.pauseProducing()
                self.pause = True
        else:
            if self.pause:
                self.transport.resumeProducing()
                self.pause = False

    def disconnect(self):
        """Close the connection."""
        self.transport.loseConnection()

    def connectionMade(self):
        """Register this connection with the factory, or refuse it.

        Only the first connection is registered; later ones are closed.
        """
        if self.factory.client_conn is not None:
            # a connection is already registered: close this one and stop here,
            # setClientConn asserts that no connection is registered yet
            self.transport.loseConnection()
            return
        self.factory.setClientConn(self)

    def dataReceived(self, data):
        """Forward received bytes to the factory's consumer."""
        self.factory.writeToConsumer(data)

    def sendData(self, data):
        """Write bytes to the connection."""
        self.transport.write(data)

    def connectionLost(self, reason):
        """Tell the factory how the stream ended.

        @param reason(failure.Failure): why the connection was lost
        """
        if self.factory.client_conn != self:
            # only the first connected client_conn is relevant
            return

        if reason.type == error.ConnectionDone:
            self.factory.streamFinished()
        else:
            self.factory.streamFailed(reason)
97 | |
98 | |
@interface.implementer(stream.IStreamProducer)
@interface.implementer(interfaces.IPushProducer)
@interface.implementer(interfaces.IConsumer)
class StreamFactory(protocol.Factory):
    """Factory bridging a single socket connection with a consumer or producer.

    Data read from the socket is written to ``consumer`` (set by startStream),
    and data written to this object (``write``) is sent to the socket.
    ``port_listening`` is not set here: it is assigned from outside on
    listening factories (see JingleStream.streamOut).
    """
    protocol = StreamProtocol
    consumer = None  # consumer receiving socket data, set by startStream
    producer = None  # producer registered through registerProducer
    deferred = None  # created by startStream, fired when the stream ends

    def __init__(self):
        # the StreamProtocol instance currently served, if any
        self.client_conn = None

    def setClientConn(self, stream_protocol):
        """Register the connection to serve.

        Reading is paused until a consumer is available.
        @param stream_protocol(StreamProtocol): the new connection
        """
        # in Python 2.x, Twisted classes are old style
        # so we can't use property and setter
        # (presumably why an explicit setter method is used here)
        assert self.client_conn is None
        self.client_conn = stream_protocol
        if self.consumer is None:
            self.client_conn.setPause(True)

    def startStream(self, consumer):
        """Start sending socket data to consumer.

        @param consumer: object this factory registers itself on as a
            streaming producer, and which will receive the socket data
        @return (D): fired when the stream is finished,
            or failing if the connection is lost uncleanly
        @raise exceptions.InternalError: a consumer is already set
        """
        if self.consumer is not None:
            raise exceptions.InternalError(_(u"stream can't be used with multiple consumers"))
        assert self.deferred is None
        self.consumer = consumer
        consumer.registerProducer(self, True)
        self.deferred = defer.Deferred()
        if self.client_conn is not None:
            # data may have been held back while waiting for a consumer
            self.client_conn.setPause(False)
        return self.deferred

    def streamFinished(self):
        """Clean up after the connection has been closed cleanly."""
        self.client_conn = None
        if self.consumer:
            self.consumer.unregisterProducer()
            self.port_listening.stopListening()
        # the deferred only exists once startStream has been called
        if self.deferred is not None:
            self.deferred.callback(None)

    def streamFailed(self, failure_):
        """Clean up after the connection has been lost uncleanly.

        @param failure_(failure.Failure): reason of the connection loss
        """
        self.client_conn = None
        if self.consumer:
            self.consumer.unregisterProducer()
            self.port_listening.stopListening()
            self.deferred.errback(failure_)
        elif self.producer:
            self.producer.stopProducing()

    def stopStream(self):
        """Close the current connection, if any."""
        if self.client_conn is not None:
            self.client_conn.disconnect()

    def registerProducer(self, producer, streaming):
        """Remember the producer writing to this factory.

        ``streaming`` is accepted but not used.
        """
        self.producer = producer

    def pauseProducing(self):
        """Stop reading from the socket (no-op if no connection is registered)."""
        # the consumer is registered in startStream, possibly before any
        # connection is registered, so client_conn may still be None here
        if self.client_conn is not None:
            self.client_conn.setPause(True)

    def resumeProducing(self):
        """Resume reading from the socket (no-op if no connection is registered)."""
        if self.client_conn is not None:
            self.client_conn.setPause(False)

    def stopProducing(self):
        """Close the current connection, if any."""
        if self.client_conn:
            self.client_conn.disconnect()

    def write(self, data):
        """Send data to the connected client.

        Data is dropped, with a warning, if no client is connected.
        """
        try:
            self.client_conn.sendData(data)
        except AttributeError:
            log.warning(_(u"No client connected, can't send data"))

    def writeToConsumer(self, data):
        """Hand data received from the socket to the consumer."""
        self.consumer.write(data)
171 | |
172 | |
173 class JingleStream(object): | |
174 """This non standard jingle application send byte stream""" | |
175 | |
176 def __init__(self, host): | |
177 log.info(_("Plugin Stream initialization")) | |
178 self.host = host | |
179 self._j = host.plugins["XEP-0166"] # shortcut to access jingle | |
180 self._j.registerApplication(NS_STREAM, self) | |
181 host.bridge.addMethod("streamOut", ".plugin", in_sign='ss', out_sign='s', method=self._streamOut, async=True) | |
182 # jingle callbacks | |
183 | |
184 def _streamOut(self, to_jid_s, profile_key): | |
185 client = self.host.getClient(profile_key) | |
186 return self.streamOut(client, jid.JID(to_jid_s)) | |
187 | |
188 @defer.inlineCallbacks | |
189 def streamOut(self, client, to_jid): | |
190 """send a stream | |
191 | |
192 @param peer_jid(jid.JID): recipient | |
193 @return: an unique id to identify the transfer | |
194 """ | |
195 port = START_PORT | |
196 factory = StreamFactory() | |
197 while True: | |
198 endpoint = endpoints.TCP4ServerEndpoint(reactor, port) | |
199 try: | |
200 port_listening = yield endpoint.listen(factory) | |
201 except error.CannotListenError as e: | |
202 if e.socketError.errno == errno.EADDRINUSE: | |
203 port += 1 | |
204 else: | |
205 raise e | |
206 else: | |
207 factory.port_listening = port_listening | |
208 break | |
209 self._j.initiate(client, | |
210 to_jid, | |
211 [{'app_ns': NS_STREAM, | |
212 'senders': self._j.ROLE_INITIATOR, | |
213 'app_kwargs': {'stream_object': factory}, | |
214 }]) | |
215 defer.returnValue(unicode(port)) | |
216 | |
217 def jingleSessionInit(self, client, session, content_name, stream_object): | |
218 content_data = session['contents'][content_name] | |
219 application_data = content_data['application_data'] | |
220 assert 'stream_object' not in application_data | |
221 application_data['stream_object'] = stream_object | |
222 desc_elt = domish.Element((NS_STREAM, 'description')) | |
223 return desc_elt | |
224 | |
225 @defer.inlineCallbacks | |
226 def jingleRequestConfirmation(self, client, action, session, content_name, desc_elt): | |
227 """This method request confirmation for a jingle session""" | |
228 content_data = session['contents'][content_name] | |
229 if content_data['senders'] not in (self._j.ROLE_INITIATOR, self._j.ROLE_RESPONDER): | |
230 log.warning(u"Bad sender, assuming initiator") | |
231 content_data['senders'] = self._j.ROLE_INITIATOR | |
232 | |
233 confirm_data = yield xml_tools.deferDialog(self.host, | |
234 _(CONFIRM).format(peer=session['peer_jid'].full()), | |
235 _(CONFIRM_TITLE), | |
236 type_=C.XMLUI_DIALOG_CONFIRM, | |
237 action_extra={'meta_from_jid': session['peer_jid'].full(), | |
238 'meta_type': "STREAM", | |
239 }, | |
240 security_limit=SECURITY_LIMIT, | |
241 profile=client.profile) | |
242 | |
243 if not C.bool(confirm_data['answer']): | |
244 defer.returnValue(False) | |
245 try: | |
246 port = int(confirm_data['port']) | |
247 except (ValueError, KeyError): | |
248 raise exceptions.DataError(_(u'given port is invalid')) | |
249 endpoint = endpoints.TCP4ClientEndpoint(reactor, 'localhost', port) | |
250 factory = StreamFactory() | |
251 yield endpoint.connect(factory) | |
252 content_data['stream_object'] = factory | |
253 finished_d = content_data['finished_d'] = defer.Deferred() | |
254 args = [client, session, content_name, content_data] | |
255 finished_d.addCallbacks(self._finishedCb, self._finishedEb, args, None, args) | |
256 defer.returnValue(True) | |
257 | |
258 def jingleHandler(self, client, action, session, content_name, desc_elt): | |
259 content_data = session['contents'][content_name] | |
260 application_data = content_data['application_data'] | |
261 if action in (self._j.A_ACCEPTED_ACK, self._j.A_SESSION_INITIATE): | |
262 pass | |
263 elif action == self._j.A_SESSION_ACCEPT: | |
264 assert not 'stream_object' in content_data | |
265 content_data['stream_object'] = application_data['stream_object'] | |
266 finished_d = content_data['finished_d'] = defer.Deferred() | |
267 args = [client, session, content_name, content_data] | |
268 finished_d.addCallbacks(self._finishedCb, self._finishedEb, args, None, args) | |
269 else: | |
270 log.warning(u"FIXME: unmanaged action {}".format(action)) | |
271 return desc_elt | |
272 | |
273 def _finishedCb(self, dummy, client, session, content_name, content_data): | |
274 log.info(u"Pipe transfer completed") | |
275 self._j.contentTerminate(client, session, content_name) | |
276 content_data['stream_object'].stopStream() | |
277 | |
278 def _finishedEb(self, failure, client, session, content_name, content_data): | |
279 log.warning(u"Error while streaming pipe: {}".format(failure)) | |
280 self._j.contentTerminate(client, session, content_name, reason=self._j.REASON_FAILED_TRANSPORT) | |
281 content_data['stream_object'].stopStream() |