Mercurial > libervia-backend
comparison sat/plugins/plugin_exp_jingle_stream.py @ 2624:56f94936df1e
code style reformatting using black
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 27 Jun 2018 20:14:46 +0200 |
parents | 26edcf3a30eb |
children | 378188abe941 |
comparison
equal
deleted
inserted
replaced
2623:49533de4540b | 2624:56f94936df1e |
---|---|
19 | 19 |
20 from sat.core.i18n import _, D_ | 20 from sat.core.i18n import _, D_ |
21 from sat.core.constants import Const as C | 21 from sat.core.constants import Const as C |
22 from sat.core import exceptions | 22 from sat.core import exceptions |
23 from sat.core.log import getLogger | 23 from sat.core.log import getLogger |
24 | |
24 log = getLogger(__name__) | 25 log = getLogger(__name__) |
25 from sat.tools import xml_tools | 26 from sat.tools import xml_tools |
26 from sat.tools import stream | 27 from sat.tools import stream |
27 from twisted.words.xish import domish | 28 from twisted.words.xish import domish |
28 from twisted.words.protocols.jabber import jid | 29 from twisted.words.protocols.jabber import jid |
33 from twisted.internet import error | 34 from twisted.internet import error |
34 from twisted.internet import interfaces | 35 from twisted.internet import interfaces |
35 from zope import interface | 36 from zope import interface |
36 import errno | 37 import errno |
37 | 38 |
38 NS_STREAM = 'http://salut-a-toi.org/protocol/stream' | 39 NS_STREAM = "http://salut-a-toi.org/protocol/stream" |
39 SECURITY_LIMIT=30 | 40 SECURITY_LIMIT = 30 |
40 START_PORT = 8888 | 41 START_PORT = 8888 |
41 | 42 |
42 PLUGIN_INFO = { | 43 PLUGIN_INFO = { |
43 C.PI_NAME: "Jingle Stream Plugin", | 44 C.PI_NAME: "Jingle Stream Plugin", |
44 C.PI_IMPORT_NAME: "STREAM", | 45 C.PI_IMPORT_NAME: "STREAM", |
45 C.PI_TYPE: "EXP", | 46 C.PI_TYPE: "EXP", |
46 C.PI_PROTOCOLS: [], | 47 C.PI_PROTOCOLS: [], |
47 C.PI_DEPENDENCIES: ["XEP-0166"], | 48 C.PI_DEPENDENCIES: ["XEP-0166"], |
48 C.PI_MAIN: "JingleStream", | 49 C.PI_MAIN: "JingleStream", |
49 C.PI_HANDLER: "no", | 50 C.PI_HANDLER: "no", |
50 C.PI_DESCRIPTION: _("""Jingle Stream plugin""") | 51 C.PI_DESCRIPTION: _("""Jingle Stream plugin"""), |
51 } | 52 } |
52 | 53 |
53 CONFIRM = D_(u"{peer} wants to send you a stream, do you accept ?") | 54 CONFIRM = D_(u"{peer} wants to send you a stream, do you accept ?") |
54 CONFIRM_TITLE = D_(u"Stream Request") | 55 CONFIRM_TITLE = D_(u"Stream Request") |
55 | 56 |
56 | 57 |
57 class StreamProtocol(protocol.Protocol): | 58 class StreamProtocol(protocol.Protocol): |
58 | |
59 def __init__(self): | 59 def __init__(self): |
60 self.pause = False | 60 self.pause = False |
61 | 61 |
62 def setPause(self, paused): | 62 def setPause(self, paused): |
63 # in Python 2.x, Twisted classes are old style | 63 # in Python 2.x, Twisted classes are old style |
116 if self.consumer is None: | 116 if self.consumer is None: |
117 self.client_conn.setPause(True) | 117 self.client_conn.setPause(True) |
118 | 118 |
119 def startStream(self, consumer): | 119 def startStream(self, consumer): |
120 if self.consumer is not None: | 120 if self.consumer is not None: |
121 raise exceptions.InternalError(_(u"stream can't be used with multiple consumers")) | 121 raise exceptions.InternalError( |
122 _(u"stream can't be used with multiple consumers") | |
123 ) | |
122 assert self.deferred is None | 124 assert self.deferred is None |
123 self.consumer = consumer | 125 self.consumer = consumer |
124 consumer.registerProducer(self, True) | 126 consumer.registerProducer(self, True) |
125 self.deferred = defer.Deferred() | 127 self.deferred = defer.Deferred() |
126 if self.client_conn is not None: | 128 if self.client_conn is not None: |
174 """This non standard jingle application send byte stream""" | 176 """This non standard jingle application send byte stream""" |
175 | 177 |
176 def __init__(self, host): | 178 def __init__(self, host): |
177 log.info(_("Plugin Stream initialization")) | 179 log.info(_("Plugin Stream initialization")) |
178 self.host = host | 180 self.host = host |
179 self._j = host.plugins["XEP-0166"] # shortcut to access jingle | 181 self._j = host.plugins["XEP-0166"] # shortcut to access jingle |
180 self._j.registerApplication(NS_STREAM, self) | 182 self._j.registerApplication(NS_STREAM, self) |
181 host.bridge.addMethod("streamOut", ".plugin", in_sign='ss', out_sign='s', method=self._streamOut, async=True) | 183 host.bridge.addMethod( |
184 "streamOut", | |
185 ".plugin", | |
186 in_sign="ss", | |
187 out_sign="s", | |
188 method=self._streamOut, | |
189 async=True, | |
190 ) | |
191 | |
182 # jingle callbacks | 192 # jingle callbacks |
183 | 193 |
184 def _streamOut(self, to_jid_s, profile_key): | 194 def _streamOut(self, to_jid_s, profile_key): |
185 client = self.host.getClient(profile_key) | 195 client = self.host.getClient(profile_key) |
186 return self.streamOut(client, jid.JID(to_jid_s)) | 196 return self.streamOut(client, jid.JID(to_jid_s)) |
204 else: | 214 else: |
205 raise e | 215 raise e |
206 else: | 216 else: |
207 factory.port_listening = port_listening | 217 factory.port_listening = port_listening |
208 break | 218 break |
209 self._j.initiate(client, | 219 self._j.initiate( |
210 to_jid, | 220 client, |
211 [{'app_ns': NS_STREAM, | 221 to_jid, |
212 'senders': self._j.ROLE_INITIATOR, | 222 [ |
213 'app_kwargs': {'stream_object': factory}, | 223 { |
214 }]) | 224 "app_ns": NS_STREAM, |
225 "senders": self._j.ROLE_INITIATOR, | |
226 "app_kwargs": {"stream_object": factory}, | |
227 } | |
228 ], | |
229 ) | |
215 defer.returnValue(unicode(port)) | 230 defer.returnValue(unicode(port)) |
216 | 231 |
217 def jingleSessionInit(self, client, session, content_name, stream_object): | 232 def jingleSessionInit(self, client, session, content_name, stream_object): |
218 content_data = session['contents'][content_name] | 233 content_data = session["contents"][content_name] |
219 application_data = content_data['application_data'] | 234 application_data = content_data["application_data"] |
220 assert 'stream_object' not in application_data | 235 assert "stream_object" not in application_data |
221 application_data['stream_object'] = stream_object | 236 application_data["stream_object"] = stream_object |
222 desc_elt = domish.Element((NS_STREAM, 'description')) | 237 desc_elt = domish.Element((NS_STREAM, "description")) |
223 return desc_elt | 238 return desc_elt |
224 | 239 |
225 @defer.inlineCallbacks | 240 @defer.inlineCallbacks |
226 def jingleRequestConfirmation(self, client, action, session, content_name, desc_elt): | 241 def jingleRequestConfirmation(self, client, action, session, content_name, desc_elt): |
227 """This method request confirmation for a jingle session""" | 242 """This method request confirmation for a jingle session""" |
228 content_data = session['contents'][content_name] | 243 content_data = session["contents"][content_name] |
229 if content_data['senders'] not in (self._j.ROLE_INITIATOR, self._j.ROLE_RESPONDER): | 244 if content_data["senders"] not in ( |
245 self._j.ROLE_INITIATOR, | |
246 self._j.ROLE_RESPONDER, | |
247 ): | |
230 log.warning(u"Bad sender, assuming initiator") | 248 log.warning(u"Bad sender, assuming initiator") |
231 content_data['senders'] = self._j.ROLE_INITIATOR | 249 content_data["senders"] = self._j.ROLE_INITIATOR |
232 | 250 |
233 confirm_data = yield xml_tools.deferDialog(self.host, | 251 confirm_data = yield xml_tools.deferDialog( |
234 _(CONFIRM).format(peer=session['peer_jid'].full()), | 252 self.host, |
253 _(CONFIRM).format(peer=session["peer_jid"].full()), | |
235 _(CONFIRM_TITLE), | 254 _(CONFIRM_TITLE), |
236 type_=C.XMLUI_DIALOG_CONFIRM, | 255 type_=C.XMLUI_DIALOG_CONFIRM, |
237 action_extra={'meta_from_jid': session['peer_jid'].full(), | 256 action_extra={ |
238 'meta_type': "STREAM", | 257 "meta_from_jid": session["peer_jid"].full(), |
239 }, | 258 "meta_type": "STREAM", |
259 }, | |
240 security_limit=SECURITY_LIMIT, | 260 security_limit=SECURITY_LIMIT, |
241 profile=client.profile) | 261 profile=client.profile, |
242 | 262 ) |
243 if not C.bool(confirm_data['answer']): | 263 |
264 if not C.bool(confirm_data["answer"]): | |
244 defer.returnValue(False) | 265 defer.returnValue(False) |
245 try: | 266 try: |
246 port = int(confirm_data['port']) | 267 port = int(confirm_data["port"]) |
247 except (ValueError, KeyError): | 268 except (ValueError, KeyError): |
248 raise exceptions.DataError(_(u'given port is invalid')) | 269 raise exceptions.DataError(_(u"given port is invalid")) |
249 endpoint = endpoints.TCP4ClientEndpoint(reactor, 'localhost', port) | 270 endpoint = endpoints.TCP4ClientEndpoint(reactor, "localhost", port) |
250 factory = StreamFactory() | 271 factory = StreamFactory() |
251 yield endpoint.connect(factory) | 272 yield endpoint.connect(factory) |
252 content_data['stream_object'] = factory | 273 content_data["stream_object"] = factory |
253 finished_d = content_data['finished_d'] = defer.Deferred() | 274 finished_d = content_data["finished_d"] = defer.Deferred() |
254 args = [client, session, content_name, content_data] | 275 args = [client, session, content_name, content_data] |
255 finished_d.addCallbacks(self._finishedCb, self._finishedEb, args, None, args) | 276 finished_d.addCallbacks(self._finishedCb, self._finishedEb, args, None, args) |
256 defer.returnValue(True) | 277 defer.returnValue(True) |
257 | 278 |
258 def jingleHandler(self, client, action, session, content_name, desc_elt): | 279 def jingleHandler(self, client, action, session, content_name, desc_elt): |
259 content_data = session['contents'][content_name] | 280 content_data = session["contents"][content_name] |
260 application_data = content_data['application_data'] | 281 application_data = content_data["application_data"] |
261 if action in (self._j.A_ACCEPTED_ACK, self._j.A_SESSION_INITIATE): | 282 if action in (self._j.A_ACCEPTED_ACK, self._j.A_SESSION_INITIATE): |
262 pass | 283 pass |
263 elif action == self._j.A_SESSION_ACCEPT: | 284 elif action == self._j.A_SESSION_ACCEPT: |
264 assert not 'stream_object' in content_data | 285 assert not "stream_object" in content_data |
265 content_data['stream_object'] = application_data['stream_object'] | 286 content_data["stream_object"] = application_data["stream_object"] |
266 finished_d = content_data['finished_d'] = defer.Deferred() | 287 finished_d = content_data["finished_d"] = defer.Deferred() |
267 args = [client, session, content_name, content_data] | 288 args = [client, session, content_name, content_data] |
268 finished_d.addCallbacks(self._finishedCb, self._finishedEb, args, None, args) | 289 finished_d.addCallbacks(self._finishedCb, self._finishedEb, args, None, args) |
269 else: | 290 else: |
270 log.warning(u"FIXME: unmanaged action {}".format(action)) | 291 log.warning(u"FIXME: unmanaged action {}".format(action)) |
271 return desc_elt | 292 return desc_elt |
272 | 293 |
273 def _finishedCb(self, dummy, client, session, content_name, content_data): | 294 def _finishedCb(self, dummy, client, session, content_name, content_data): |
274 log.info(u"Pipe transfer completed") | 295 log.info(u"Pipe transfer completed") |
275 self._j.contentTerminate(client, session, content_name) | 296 self._j.contentTerminate(client, session, content_name) |
276 content_data['stream_object'].stopStream() | 297 content_data["stream_object"].stopStream() |
277 | 298 |
278 def _finishedEb(self, failure, client, session, content_name, content_data): | 299 def _finishedEb(self, failure, client, session, content_name, content_data): |
279 log.warning(u"Error while streaming pipe: {}".format(failure)) | 300 log.warning(u"Error while streaming pipe: {}".format(failure)) |
280 self._j.contentTerminate(client, session, content_name, reason=self._j.REASON_FAILED_TRANSPORT) | 301 self._j.contentTerminate( |
281 content_data['stream_object'].stopStream() | 302 client, session, content_name, reason=self._j.REASON_FAILED_TRANSPORT |
303 ) | |
304 content_data["stream_object"].stopStream() |