comparison sat/plugins/plugin_exp_jingle_stream.py @ 2624:56f94936df1e

code style reformatting using black
author Goffi <goffi@goffi.org>
date Wed, 27 Jun 2018 20:14:46 +0200
parents 26edcf3a30eb
children 378188abe941
comparison
equal deleted inserted replaced
2623:49533de4540b 2624:56f94936df1e
19 19
20 from sat.core.i18n import _, D_ 20 from sat.core.i18n import _, D_
21 from sat.core.constants import Const as C 21 from sat.core.constants import Const as C
22 from sat.core import exceptions 22 from sat.core import exceptions
23 from sat.core.log import getLogger 23 from sat.core.log import getLogger
24
24 log = getLogger(__name__) 25 log = getLogger(__name__)
25 from sat.tools import xml_tools 26 from sat.tools import xml_tools
26 from sat.tools import stream 27 from sat.tools import stream
27 from twisted.words.xish import domish 28 from twisted.words.xish import domish
28 from twisted.words.protocols.jabber import jid 29 from twisted.words.protocols.jabber import jid
33 from twisted.internet import error 34 from twisted.internet import error
34 from twisted.internet import interfaces 35 from twisted.internet import interfaces
35 from zope import interface 36 from zope import interface
36 import errno 37 import errno
37 38
38 NS_STREAM = 'http://salut-a-toi.org/protocol/stream' 39 NS_STREAM = "http://salut-a-toi.org/protocol/stream"
39 SECURITY_LIMIT=30 40 SECURITY_LIMIT = 30
40 START_PORT = 8888 41 START_PORT = 8888
41 42
42 PLUGIN_INFO = { 43 PLUGIN_INFO = {
43 C.PI_NAME: "Jingle Stream Plugin", 44 C.PI_NAME: "Jingle Stream Plugin",
44 C.PI_IMPORT_NAME: "STREAM", 45 C.PI_IMPORT_NAME: "STREAM",
45 C.PI_TYPE: "EXP", 46 C.PI_TYPE: "EXP",
46 C.PI_PROTOCOLS: [], 47 C.PI_PROTOCOLS: [],
47 C.PI_DEPENDENCIES: ["XEP-0166"], 48 C.PI_DEPENDENCIES: ["XEP-0166"],
48 C.PI_MAIN: "JingleStream", 49 C.PI_MAIN: "JingleStream",
49 C.PI_HANDLER: "no", 50 C.PI_HANDLER: "no",
50 C.PI_DESCRIPTION: _("""Jingle Stream plugin""") 51 C.PI_DESCRIPTION: _("""Jingle Stream plugin"""),
51 } 52 }
52 53
53 CONFIRM = D_(u"{peer} wants to send you a stream, do you accept ?") 54 CONFIRM = D_(u"{peer} wants to send you a stream, do you accept ?")
54 CONFIRM_TITLE = D_(u"Stream Request") 55 CONFIRM_TITLE = D_(u"Stream Request")
55 56
56 57
57 class StreamProtocol(protocol.Protocol): 58 class StreamProtocol(protocol.Protocol):
58
59 def __init__(self): 59 def __init__(self):
60 self.pause = False 60 self.pause = False
61 61
62 def setPause(self, paused): 62 def setPause(self, paused):
63 # in Python 2.x, Twisted classes are old style 63 # in Python 2.x, Twisted classes are old style
116 if self.consumer is None: 116 if self.consumer is None:
117 self.client_conn.setPause(True) 117 self.client_conn.setPause(True)
118 118
119 def startStream(self, consumer): 119 def startStream(self, consumer):
120 if self.consumer is not None: 120 if self.consumer is not None:
121 raise exceptions.InternalError(_(u"stream can't be used with multiple consumers")) 121 raise exceptions.InternalError(
122 _(u"stream can't be used with multiple consumers")
123 )
122 assert self.deferred is None 124 assert self.deferred is None
123 self.consumer = consumer 125 self.consumer = consumer
124 consumer.registerProducer(self, True) 126 consumer.registerProducer(self, True)
125 self.deferred = defer.Deferred() 127 self.deferred = defer.Deferred()
126 if self.client_conn is not None: 128 if self.client_conn is not None:
174 """This non standard jingle application send byte stream""" 176 """This non standard jingle application send byte stream"""
175 177
176 def __init__(self, host): 178 def __init__(self, host):
177 log.info(_("Plugin Stream initialization")) 179 log.info(_("Plugin Stream initialization"))
178 self.host = host 180 self.host = host
179 self._j = host.plugins["XEP-0166"] # shortcut to access jingle 181 self._j = host.plugins["XEP-0166"] # shortcut to access jingle
180 self._j.registerApplication(NS_STREAM, self) 182 self._j.registerApplication(NS_STREAM, self)
181 host.bridge.addMethod("streamOut", ".plugin", in_sign='ss', out_sign='s', method=self._streamOut, async=True) 183 host.bridge.addMethod(
184 "streamOut",
185 ".plugin",
186 in_sign="ss",
187 out_sign="s",
188 method=self._streamOut,
189 async=True,
190 )
191
182 # jingle callbacks 192 # jingle callbacks
183 193
184 def _streamOut(self, to_jid_s, profile_key): 194 def _streamOut(self, to_jid_s, profile_key):
185 client = self.host.getClient(profile_key) 195 client = self.host.getClient(profile_key)
186 return self.streamOut(client, jid.JID(to_jid_s)) 196 return self.streamOut(client, jid.JID(to_jid_s))
204 else: 214 else:
205 raise e 215 raise e
206 else: 216 else:
207 factory.port_listening = port_listening 217 factory.port_listening = port_listening
208 break 218 break
209 self._j.initiate(client, 219 self._j.initiate(
210 to_jid, 220 client,
211 [{'app_ns': NS_STREAM, 221 to_jid,
212 'senders': self._j.ROLE_INITIATOR, 222 [
213 'app_kwargs': {'stream_object': factory}, 223 {
214 }]) 224 "app_ns": NS_STREAM,
225 "senders": self._j.ROLE_INITIATOR,
226 "app_kwargs": {"stream_object": factory},
227 }
228 ],
229 )
215 defer.returnValue(unicode(port)) 230 defer.returnValue(unicode(port))
216 231
217 def jingleSessionInit(self, client, session, content_name, stream_object): 232 def jingleSessionInit(self, client, session, content_name, stream_object):
218 content_data = session['contents'][content_name] 233 content_data = session["contents"][content_name]
219 application_data = content_data['application_data'] 234 application_data = content_data["application_data"]
220 assert 'stream_object' not in application_data 235 assert "stream_object" not in application_data
221 application_data['stream_object'] = stream_object 236 application_data["stream_object"] = stream_object
222 desc_elt = domish.Element((NS_STREAM, 'description')) 237 desc_elt = domish.Element((NS_STREAM, "description"))
223 return desc_elt 238 return desc_elt
224 239
225 @defer.inlineCallbacks 240 @defer.inlineCallbacks
226 def jingleRequestConfirmation(self, client, action, session, content_name, desc_elt): 241 def jingleRequestConfirmation(self, client, action, session, content_name, desc_elt):
227 """This method request confirmation for a jingle session""" 242 """This method request confirmation for a jingle session"""
228 content_data = session['contents'][content_name] 243 content_data = session["contents"][content_name]
229 if content_data['senders'] not in (self._j.ROLE_INITIATOR, self._j.ROLE_RESPONDER): 244 if content_data["senders"] not in (
245 self._j.ROLE_INITIATOR,
246 self._j.ROLE_RESPONDER,
247 ):
230 log.warning(u"Bad sender, assuming initiator") 248 log.warning(u"Bad sender, assuming initiator")
231 content_data['senders'] = self._j.ROLE_INITIATOR 249 content_data["senders"] = self._j.ROLE_INITIATOR
232 250
233 confirm_data = yield xml_tools.deferDialog(self.host, 251 confirm_data = yield xml_tools.deferDialog(
234 _(CONFIRM).format(peer=session['peer_jid'].full()), 252 self.host,
253 _(CONFIRM).format(peer=session["peer_jid"].full()),
235 _(CONFIRM_TITLE), 254 _(CONFIRM_TITLE),
236 type_=C.XMLUI_DIALOG_CONFIRM, 255 type_=C.XMLUI_DIALOG_CONFIRM,
237 action_extra={'meta_from_jid': session['peer_jid'].full(), 256 action_extra={
238 'meta_type': "STREAM", 257 "meta_from_jid": session["peer_jid"].full(),
239 }, 258 "meta_type": "STREAM",
259 },
240 security_limit=SECURITY_LIMIT, 260 security_limit=SECURITY_LIMIT,
241 profile=client.profile) 261 profile=client.profile,
242 262 )
243 if not C.bool(confirm_data['answer']): 263
264 if not C.bool(confirm_data["answer"]):
244 defer.returnValue(False) 265 defer.returnValue(False)
245 try: 266 try:
246 port = int(confirm_data['port']) 267 port = int(confirm_data["port"])
247 except (ValueError, KeyError): 268 except (ValueError, KeyError):
248 raise exceptions.DataError(_(u'given port is invalid')) 269 raise exceptions.DataError(_(u"given port is invalid"))
249 endpoint = endpoints.TCP4ClientEndpoint(reactor, 'localhost', port) 270 endpoint = endpoints.TCP4ClientEndpoint(reactor, "localhost", port)
250 factory = StreamFactory() 271 factory = StreamFactory()
251 yield endpoint.connect(factory) 272 yield endpoint.connect(factory)
252 content_data['stream_object'] = factory 273 content_data["stream_object"] = factory
253 finished_d = content_data['finished_d'] = defer.Deferred() 274 finished_d = content_data["finished_d"] = defer.Deferred()
254 args = [client, session, content_name, content_data] 275 args = [client, session, content_name, content_data]
255 finished_d.addCallbacks(self._finishedCb, self._finishedEb, args, None, args) 276 finished_d.addCallbacks(self._finishedCb, self._finishedEb, args, None, args)
256 defer.returnValue(True) 277 defer.returnValue(True)
257 278
258 def jingleHandler(self, client, action, session, content_name, desc_elt): 279 def jingleHandler(self, client, action, session, content_name, desc_elt):
259 content_data = session['contents'][content_name] 280 content_data = session["contents"][content_name]
260 application_data = content_data['application_data'] 281 application_data = content_data["application_data"]
261 if action in (self._j.A_ACCEPTED_ACK, self._j.A_SESSION_INITIATE): 282 if action in (self._j.A_ACCEPTED_ACK, self._j.A_SESSION_INITIATE):
262 pass 283 pass
263 elif action == self._j.A_SESSION_ACCEPT: 284 elif action == self._j.A_SESSION_ACCEPT:
264 assert not 'stream_object' in content_data 285 assert not "stream_object" in content_data
265 content_data['stream_object'] = application_data['stream_object'] 286 content_data["stream_object"] = application_data["stream_object"]
266 finished_d = content_data['finished_d'] = defer.Deferred() 287 finished_d = content_data["finished_d"] = defer.Deferred()
267 args = [client, session, content_name, content_data] 288 args = [client, session, content_name, content_data]
268 finished_d.addCallbacks(self._finishedCb, self._finishedEb, args, None, args) 289 finished_d.addCallbacks(self._finishedCb, self._finishedEb, args, None, args)
269 else: 290 else:
270 log.warning(u"FIXME: unmanaged action {}".format(action)) 291 log.warning(u"FIXME: unmanaged action {}".format(action))
271 return desc_elt 292 return desc_elt
272 293
273 def _finishedCb(self, dummy, client, session, content_name, content_data): 294 def _finishedCb(self, dummy, client, session, content_name, content_data):
274 log.info(u"Pipe transfer completed") 295 log.info(u"Pipe transfer completed")
275 self._j.contentTerminate(client, session, content_name) 296 self._j.contentTerminate(client, session, content_name)
276 content_data['stream_object'].stopStream() 297 content_data["stream_object"].stopStream()
277 298
278 def _finishedEb(self, failure, client, session, content_name, content_data): 299 def _finishedEb(self, failure, client, session, content_name, content_data):
279 log.warning(u"Error while streaming pipe: {}".format(failure)) 300 log.warning(u"Error while streaming pipe: {}".format(failure))
280 self._j.contentTerminate(client, session, content_name, reason=self._j.REASON_FAILED_TRANSPORT) 301 self._j.contentTerminate(
281 content_data['stream_object'].stopStream() 302 client, session, content_name, reason=self._j.REASON_FAILED_TRANSPORT
303 )
304 content_data["stream_object"].stopStream()