comparison src/plugins/plugin_exp_jingle_stream.py @ 2489:e2a7bb875957

plugin pipe/stream, file transfer: refactoring and improvements: this is a big patch as things had to be changed at the same time. - changed methods using profile argument to use client instead - moved SatFile to a new tools.stream module, as it should be part of core, not a plugin - new IStreamProducer interface, to handle starting a pull producer - new FileStreamObject which creates a stream producer/consumer from a SatFile - plugin pipe no longer uses unix named pipes, as it complicates things: special care needs to be taken not to block, and it's generally not necessary. Instead a socket is now used, so the plugin has been renamed to jingle stream. - bad connections/errors should be better handled in jingle stream plugin, and code should not block anymore - jp pipe commands have been updated accordingly fix bug 237
author Goffi <goffi@goffi.org>
date Thu, 08 Feb 2018 00:37:42 +0100
parents src/plugins/plugin_exp_pipe.py@0046283a285d
children
comparison
equal deleted inserted replaced
2488:78c7992a26ed 2489:e2a7bb875957
1 #!/usr/bin/env python2
2 # -*- coding: utf-8 -*-
3
4 # SAT plugin for managing Jingle streams (experimental)
5 # Copyright (C) 2009-2018 Jérôme Poisson (goffi@goffi.org)
6
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU Affero General Public License for more details.
16
17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20 from sat.core.i18n import _, D_
21 from sat.core.constants import Const as C
22 from sat.core import exceptions
23 from sat.core.log import getLogger
24 log = getLogger(__name__)
25 from sat.tools import xml_tools
26 from sat.tools import stream
27 from twisted.words.xish import domish
28 from twisted.words.protocols.jabber import jid
29 from twisted.internet import defer
30 from twisted.internet import protocol
31 from twisted.internet import endpoints
32 from twisted.internet import reactor
33 from twisted.internet import error
34 from twisted.internet import interfaces
35 from zope import interface
36 import errno
37
# XML namespace used for the <description/> element of this jingle application
NS_STREAM = 'http://salut-a-toi.org/protocol/stream'
# security limit given to the confirmation dialog
SECURITY_LIMIT = 30
# first TCP port tried when opening the local listening socket
START_PORT = 8888

# metadata describing this plugin
PLUGIN_INFO = {
    C.PI_NAME: "Jingle Stream Plugin",
    C.PI_IMPORT_NAME: "STREAM",
    C.PI_TYPE: "EXP",
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0166"],
    C.PI_MAIN: "JingleStream",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _("""Jingle Stream plugin"""),
}

# texts of the dialog asking the user to accept an incoming stream
CONFIRM = D_(u"{peer} wants to send you a stream, do you accept ?")
CONFIRM_TITLE = D_(u"Stream Request")
55
56
class StreamProtocol(protocol.Protocol):
    """Protocol of the local socket, relaying bytes between the socket and its factory

    Only one connection is served per factory: the first one registers itself
    with ``factory.setClientConn``, any later one is closed immediately.
    """

    def __init__(self):
        # True while the transport has been asked to stop reading
        self.pause = False

    def setPause(self, paused):
        """Pause or resume reading from the socket

        The transport is only touched when the requested state differs from
        the current one.
        @param paused(bool): True to pause reading, False to resume it
        """
        # in Python 2.x, Twisted classes are old style
        # so we can't use property and setter
        if paused and not self.pause:
            self.transport.pauseProducing()
            self.pause = True
        elif not paused and self.pause:
            self.transport.resumeProducing()
            self.pause = False

    def disconnect(self):
        """Close the socket connection"""
        self.transport.loseConnection()

    def connectionMade(self):
        """Register this connection to the factory, or drop it if one is already set"""
        if self.factory.client_conn is not None:
            # a client is already registered: refuse this one and stop here,
            # else setClientConn would fail on its assertion
            self.transport.loseConnection()
            return
        self.factory.setClientConn(self)

    def dataReceived(self, data):
        """Forward data read from the socket to the factory's consumer"""
        self.factory.writeToConsumer(data)

    def sendData(self, data):
        """Write data to the socket

        @param data(str): bytes to send
        """
        self.transport.write(data)

    def connectionLost(self, reason):
        """Tell the factory that the stream is over

        @param reason(failure.Failure): why the connection has been lost
        """
        if self.factory.client_conn is not self:
            # only the first connected client_conn is relevant
            return

        if reason.check(error.ConnectionDone):
            # clean close
            self.factory.streamFinished()
        else:
            self.factory.streamFailed(reason)
97
98
@interface.implementer(stream.IStreamProducer)
@interface.implementer(interfaces.IPushProducer)
@interface.implementer(interfaces.IConsumer)
class StreamFactory(protocol.Factory):
    """Factory linking one local socket connection to a stream consumer or producer

    It can be used as a producer (data read from the socket are written to the
    consumer given to startStream) or as a consumer (data given to write are
    sent to the socket).
    """
    protocol = StreamProtocol
    consumer = None
    producer = None
    deferred = None
    # listening port to stop once the stream is over; expected to be set by
    # whoever makes this factory listen, stays None otherwise
    port_listening = None

    def __init__(self):
        # the StreamProtocol instance currently served, if any
        self.client_conn = None

    def setClientConn(self, stream_protocol):
        """Register the connected protocol

        Reading is paused until a consumer is available.
        @param stream_protocol(StreamProtocol): protocol of the new connection
        """
        # in Python 2.x, Twisted classes are old style
        # so we can't use property and setter
        assert self.client_conn is None
        self.client_conn = stream_protocol
        if self.consumer is None:
            self.client_conn.setPause(True)

    def startStream(self, consumer):
        """Start sending data received on the socket to consumer

        @param consumer: object where received data will be written
        @return (D): fired with None when the stream is finished,
            or with a failure if the connection is lost on error
        @raise exceptions.InternalError: a consumer is already set
        """
        if self.consumer is not None:
            raise exceptions.InternalError(_(u"stream can't be used with multiple consumers"))
        assert self.deferred is None
        self.consumer = consumer
        consumer.registerProducer(self, True)
        self.deferred = defer.Deferred()
        if self.client_conn is not None:
            # a client connected before we had a consumer, it has been paused
            self.client_conn.setPause(False)
        return self.deferred

    def _stopListening(self):
        """Stop the listening port, if this factory has one"""
        if self.port_listening is not None:
            self.port_listening.stopListening()

    def streamFinished(self):
        """Called when the socket connection has been closed cleanly"""
        self.client_conn = None
        if self.consumer:
            self.consumer.unregisterProducer()
            self._stopListening()
            self.deferred.callback(None)

    def streamFailed(self, failure_):
        """Called when the socket connection has been lost on error

        @param failure_(failure.Failure): reason of the connection loss
        """
        self.client_conn = None
        if self.consumer:
            self.consumer.unregisterProducer()
            self._stopListening()
            self.deferred.errback(failure_)
        elif self.producer:
            self.producer.stopProducing()

    def stopStream(self):
        """Close the socket connection, if any"""
        if self.client_conn is not None:
            self.client_conn.disconnect()

    def registerProducer(self, producer, streaming):
        """Remember the producer feeding this factory (IConsumer)"""
        self.producer = producer

    def unregisterProducer(self):
        """Forget the registered producer (IConsumer)"""
        self.producer = None

    def pauseProducing(self):
        """Stop reading from the socket (IPushProducer)"""
        # nothing is produced while no client is connected
        if self.client_conn is not None:
            self.client_conn.setPause(True)

    def resumeProducing(self):
        """Resume reading from the socket (IPushProducer)"""
        if self.client_conn is not None:
            self.client_conn.setPause(False)

    def stopProducing(self):
        """Close the socket connection, if any (IPushProducer)"""
        if self.client_conn is not None:
            self.client_conn.disconnect()

    def write(self, data):
        """Send data to the connected client (IConsumer)

        Data are dropped with a warning if no client is connected.
        @param data(str): bytes to send
        """
        if self.client_conn is None:
            log.warning(_(u"No client connected, can't send data"))
            return
        self.client_conn.sendData(data)

    def writeToConsumer(self, data):
        """Give data received on the socket to the consumer"""
        self.consumer.write(data)
171
172
class JingleStream(object):
    """This non standard jingle application send byte stream"""

    def __init__(self, host):
        log.info(_("Plugin Stream initialization"))
        self.host = host
        self._j = host.plugins["XEP-0166"]  # shortcut to access jingle
        self._j.registerApplication(NS_STREAM, self)
        # "async" is a reserved word from Python 3.7, giving it through a dict
        # is equivalent and keeps the module parsable everywhere
        host.bridge.addMethod("streamOut", ".plugin", in_sign='ss', out_sign='s',
                              method=self._streamOut, **{'async': True})

    # bridge method

    def _streamOut(self, to_jid_s, profile_key):
        """Bridge frontend of streamOut, working with serialised arguments"""
        client = self.host.getClient(profile_key)
        return self.streamOut(client, jid.JID(to_jid_s))

    @defer.inlineCallbacks
    def streamOut(self, client, to_jid):
        """send a stream

        A local TCP socket is opened on the first free port starting from
        START_PORT, then a jingle session is initiated with the recipient.
        @param to_jid(jid.JID): recipient
        @return (D(unicode)): port where the local socket is listening
        @raise error.CannotListenError: the socket can't be opened for an other
            reason than an address already in use
        """
        port = START_PORT
        factory = StreamFactory()
        while True:
            # NOTE(review): no interface is given, so the socket is presumably
            #   reachable from every network interface - confirm whether
            #   listening on localhost only is wanted
            endpoint = endpoints.TCP4ServerEndpoint(reactor, port)
            try:
                port_listening = yield endpoint.listen(factory)
            except error.CannotListenError as e:
                if e.socketError.errno != errno.EADDRINUSE:
                    # bare raise to keep the original traceback
                    raise
                # port already used, we try the next one
                port += 1
            else:
                factory.port_listening = port_listening
                break
        self._j.initiate(client,
                         to_jid,
                         [{'app_ns': NS_STREAM,
                           'senders': self._j.ROLE_INITIATOR,
                           'app_kwargs': {'stream_object': factory},
                           }])
        defer.returnValue(unicode(port))

    # jingle callbacks

    def jingleSessionInit(self, client, session, content_name, stream_object):
        """Store the stream object in application data and build the description

        @param stream_object: object used to stream the data
        @return (domish.Element): <description/> element of this application
        """
        content_data = session['contents'][content_name]
        application_data = content_data['application_data']
        assert 'stream_object' not in application_data
        application_data['stream_object'] = stream_object
        desc_elt = domish.Element((NS_STREAM, 'description'))
        return desc_elt

    def _watchFinished(self, client, session, content_name, content_data):
        """Create the "finished_d" deferred of a content and attach our callbacks to it"""
        finished_d = content_data['finished_d'] = defer.Deferred()
        args = [client, session, content_name, content_data]
        finished_d.addCallbacks(self._finishedCb, self._finishedEb, args, None, args)

    @defer.inlineCallbacks
    def jingleRequestConfirmation(self, client, action, session, content_name, desc_elt):
        """This method request confirmation for a jingle session

        If the user accepts, a connection is opened to the local port given in
        the dialog answer.
        @return (D(bool)): True if the stream is accepted
        @raise exceptions.DataError: the port given in the answer is missing or invalid
        """
        content_data = session['contents'][content_name]
        if content_data['senders'] not in (self._j.ROLE_INITIATOR, self._j.ROLE_RESPONDER):
            log.warning(u"Bad sender, assuming initiator")
            content_data['senders'] = self._j.ROLE_INITIATOR

        confirm_data = yield xml_tools.deferDialog(self.host,
            _(CONFIRM).format(peer=session['peer_jid'].full()),
            _(CONFIRM_TITLE),
            type_=C.XMLUI_DIALOG_CONFIRM,
            action_extra={'meta_from_jid': session['peer_jid'].full(),
                          'meta_type': "STREAM",
                          },
            security_limit=SECURITY_LIMIT,
            profile=client.profile)

        if not C.bool(confirm_data['answer']):
            defer.returnValue(False)
        try:
            port = int(confirm_data['port'])
        except (ValueError, KeyError):
            raise exceptions.DataError(_(u'given port is invalid'))
        endpoint = endpoints.TCP4ClientEndpoint(reactor, 'localhost', port)
        factory = StreamFactory()
        yield endpoint.connect(factory)
        content_data['stream_object'] = factory
        self._watchFinished(client, session, content_name, content_data)
        defer.returnValue(True)

    def jingleHandler(self, client, action, session, content_name, desc_elt):
        """Handle a jingle action for this application

        On session accept, the stream object stored at session init is
        attached to the content data.
        @return (domish.Element): desc_elt, unchanged
        """
        content_data = session['contents'][content_name]
        application_data = content_data['application_data']
        if action in (self._j.A_ACCEPTED_ACK, self._j.A_SESSION_INITIATE):
            pass
        elif action == self._j.A_SESSION_ACCEPT:
            assert 'stream_object' not in content_data
            content_data['stream_object'] = application_data['stream_object']
            self._watchFinished(client, session, content_name, content_data)
        else:
            log.warning(u"FIXME: unmanaged action {}".format(action))
        return desc_elt

    def _finishedCb(self, dummy, client, session, content_name, content_data):
        """Terminate the content and stop the stream once the transfer is done"""
        log.info(u"Pipe transfer completed")
        self._j.contentTerminate(client, session, content_name)
        content_data['stream_object'].stopStream()

    def _finishedEb(self, failure, client, session, content_name, content_data):
        """Terminate the content with a failure reason and stop the stream"""
        log.warning(u"Error while streaming pipe: {}".format(failure))
        self._j.contentTerminate(client, session, content_name, reason=self._j.REASON_FAILED_TRANSPORT)
        content_data['stream_object'].stopStream()