Mercurial > libervia-backend
comparison sat/plugins/plugin_xep_0260.py @ 2562:26edcf3a30eb
core, setup: huge cleaning:
- moved directories from src and frontends/src to sat and sat_frontends, which is the recommanded naming convention
- move twisted directory to root
- removed all hacks from setup.py, and added missing dependencies, it is now clean
- use https URL for website in setup.py
- removed "Environment :: X11 Applications :: GTK", as wix is deprecated and removed
- renamed sat.sh to sat and fixed its installation
- added python_requires to specify Python version needed
- replaced glib2reactor which use deprecated code by gtk3reactor
sat can now be installed directly from virtualenv without using --system-site-packages anymore \o/
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 02 Apr 2018 19:44:50 +0200 |
parents | src/plugins/plugin_xep_0260.py@67cc54b01a12 |
children | 56f94936df1e |
comparison
equal
deleted
inserted
replaced
2561:bd30dc3ffe5a | 2562:26edcf3a30eb |
---|---|
1 #!/usr/bin/env python2 | |
2 # -*- coding: utf-8 -*- | |
3 | |
4 # SAT plugin for Jingle (XEP-0260) | |
5 # Copyright (C) 2009-2018 Jérôme Poisson (goffi@goffi.org) | |
6 | |
7 # This program is free software: you can redistribute it and/or modify | |
8 # it under the terms of the GNU Affero General Public License as published by | |
9 # the Free Software Foundation, either version 3 of the License, or | |
10 # (at your option) any later version. | |
11 | |
12 # This program is distributed in the hope that it will be useful, | |
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
15 # GNU Affero General Public License for more details. | |
16 | |
17 # You should have received a copy of the GNU Affero General Public License | |
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
19 | |
20 from sat.core.i18n import _ | |
21 from sat.core.constants import Const as C | |
22 from sat.core.log import getLogger | |
23 log = getLogger(__name__) | |
24 from sat.core import exceptions | |
25 from wokkel import disco, iwokkel | |
26 from zope.interface import implements | |
27 from twisted.words.xish import domish | |
28 from twisted.words.protocols.jabber import jid | |
29 from twisted.internet import defer | |
30 import uuid | |
31 | |
32 try: | |
33 from twisted.words.protocols.xmlstream import XMPPHandler | |
34 except ImportError: | |
35 from wokkel.subprotocols import XMPPHandler | |
36 | |
37 | |
38 NS_JINGLE_S5B = 'urn:xmpp:jingle:transports:s5b:1' | |
39 | |
40 PLUGIN_INFO = { | |
41 C.PI_NAME: "Jingle SOCKS5 Bytestreams", | |
42 C.PI_IMPORT_NAME: "XEP-0260", | |
43 C.PI_TYPE: "XEP", | |
44 C.PI_MODES: C.PLUG_MODE_BOTH, | |
45 C.PI_PROTOCOLS: ["XEP-0260"], | |
46 C.PI_DEPENDENCIES: ["XEP-0166", "XEP-0065"], | |
47 C.PI_RECOMMENDATIONS: ["XEP-0261"], # needed for fallback | |
48 C.PI_MAIN: "XEP_0260", | |
49 C.PI_HANDLER: "yes", | |
50 C.PI_DESCRIPTION: _("""Implementation of Jingle SOCKS5 Bytestreams""") | |
51 } | |
52 | |
53 | |
54 class ProxyError(Exception): | |
55 | |
56 def __str__(self): | |
57 return "an error happened while trying to use the proxy" | |
58 | |
59 | |
60 class XEP_0260(object): | |
61 # TODO: udp handling | |
62 | |
63 def __init__(self, host): | |
64 log.info(_("plugin Jingle SOCKS5 Bytestreams")) | |
65 self.host = host | |
66 self._j = host.plugins["XEP-0166"] # shortcut to access jingle | |
67 self._s5b = host.plugins["XEP-0065"] # and socks5 bytestream | |
68 try: | |
69 self._jingle_ibb = host.plugins["XEP-0261"] | |
70 except KeyError: | |
71 self._jingle_ibb = None | |
72 self._j.registerTransport(NS_JINGLE_S5B, self._j.TRANSPORT_STREAMING, self, 100) | |
73 | |
74 def getHandler(self, client): | |
75 return XEP_0260_handler() | |
76 | |
77 def _parseCandidates(self, transport_elt): | |
78 """Parse <candidate> elements | |
79 | |
80 @param transport_elt(domish.Element): parent <transport> element | |
81 @return (list[plugin_xep_0065.Candidate): list of parsed candidates | |
82 """ | |
83 candidates = [] | |
84 for candidate_elt in transport_elt.elements(NS_JINGLE_S5B, 'candidate'): | |
85 try: | |
86 cid = candidate_elt['cid'] | |
87 host = candidate_elt['host'] | |
88 jid_= jid.JID(candidate_elt['jid']) | |
89 port = int(candidate_elt.getAttribute('port', 1080)) | |
90 priority = int(candidate_elt['priority']) | |
91 type_ = candidate_elt.getAttribute('type', self._s5b.TYPE_DIRECT) | |
92 except (KeyError, ValueError): | |
93 raise exceptions.DataError() | |
94 candidate = self._s5b.Candidate(host, port, type_, priority, jid_, cid) | |
95 candidates.append(candidate) | |
96 # self._s5b.registerCandidate(candidate) | |
97 return candidates | |
98 | |
99 def _buildCandidates(self, session, candidates, sid, session_hash, client, mode=None): | |
100 """Build <transport> element with candidates | |
101 | |
102 @param session(dict): jingle session data | |
103 @param candidates(iterator[plugin_xep_0065.Candidate]): iterator of candidates to add | |
104 @param sid(unicode): transport stream id | |
105 @param client: %(doc_client)s | |
106 @param mode(str, None): 'tcp' or 'udp', or None to have no attribute | |
107 @return (domish.Element): parent <transport> element where <candidate> elements must be added | |
108 """ | |
109 proxy = next((candidate for candidate in candidates if candidate.type == self._s5b.TYPE_PROXY), None) | |
110 transport_elt = domish.Element((NS_JINGLE_S5B, "transport")) | |
111 transport_elt['sid'] = sid | |
112 if proxy is not None: | |
113 transport_elt['dstaddr'] = session_hash | |
114 if mode is not None: | |
115 transport_elt['mode'] = 'tcp' # XXX: we only manage tcp for now | |
116 | |
117 for candidate in candidates: | |
118 log.debug(u"Adding candidate: {}".format(candidate)) | |
119 candidate_elt = transport_elt.addElement('candidate', NS_JINGLE_S5B) | |
120 if candidate.id is None: | |
121 candidate.id = unicode(uuid.uuid4()) | |
122 candidate_elt['cid'] = candidate.id | |
123 candidate_elt['host'] = candidate.host | |
124 candidate_elt['jid'] = candidate.jid.full() | |
125 candidate_elt['port'] = unicode(candidate.port) | |
126 candidate_elt['priority'] = unicode(candidate.priority) | |
127 candidate_elt['type'] = candidate.type | |
128 return transport_elt | |
129 | |
130 @defer.inlineCallbacks | |
131 def jingleSessionInit(self, client, session, content_name): | |
132 content_data = session['contents'][content_name] | |
133 transport_data = content_data['transport_data'] | |
134 sid = transport_data['sid'] = unicode(uuid.uuid4()) | |
135 session_hash = transport_data['session_hash'] = self._s5b.getSessionHash(client.jid, session['peer_jid'], sid) | |
136 transport_data['peer_session_hash'] = self._s5b.getSessionHash(session['peer_jid'], client.jid, sid) # requester and target are inversed for peer candidates | |
137 transport_data['stream_d'] = self._s5b.registerHash(client, session_hash, None) | |
138 candidates = transport_data['candidates'] = yield self._s5b.getCandidates(client) | |
139 mode = 'tcp' # XXX: we only manage tcp for now | |
140 transport_elt = self._buildCandidates(session, candidates, sid, session_hash, client, mode) | |
141 | |
142 defer.returnValue(transport_elt) | |
143 | |
144 def _proxyActivatedCb(self, iq_result_elt, client, candidate, session, content_name): | |
145 """Called when activation confirmation has been received from proxy | |
146 | |
147 cf XEP-0260 § 2.4 | |
148 """ | |
149 # now that the proxy is activated, we have to inform other peer | |
150 iq_elt, transport_elt = self._j.buildAction(client, self._j.A_TRANSPORT_INFO, session, content_name) | |
151 activated_elt = transport_elt.addElement('activated') | |
152 activated_elt['cid'] = candidate.id | |
153 iq_elt.send() | |
154 | |
155 def _proxyActivatedEb(self, stanza_error, client, candidate, session, content_name): | |
156 """Called when activation error has been received from proxy | |
157 | |
158 cf XEP-0260 § 2.4 | |
159 """ | |
160 # TODO: fallback to IBB | |
161 # now that the proxy is activated, we have to inform other peer | |
162 iq_elt, transport_elt = self._j.buildAction(client, self._j.A_TRANSPORT_INFO, session, content_name) | |
163 transport_elt.addElement('proxy-error') | |
164 iq_elt.send() | |
165 log.warning(u"Can't activate proxy, we need to fallback to IBB: {reason}" | |
166 .format(reason = stanza_error.value.condition)) | |
167 self.doFallback(session, content_name, client) | |
168 | |
169 def _foundPeerCandidate(self, candidate, session, transport_data, content_name, client): | |
170 """Called when the best candidate from other peer is found | |
171 | |
172 @param candidate(XEP_0065.Candidate, None): selected candidate, | |
173 or None if no candidate is accessible | |
174 @param session(dict): session data | |
175 @param transport_data(dict): transport data | |
176 @param content_name(unicode): name of the current content | |
177 @param client(unicode): %(doc_client)s | |
178 """ | |
179 | |
180 transport_data['best_candidate'] = candidate | |
181 # we need to disconnect all non selected candidates before removing them | |
182 for c in transport_data['peer_candidates']: | |
183 if c is None or c is candidate: | |
184 continue | |
185 c.discard() | |
186 del transport_data['peer_candidates'] | |
187 iq_elt, transport_elt = self._j.buildAction(client, self._j.A_TRANSPORT_INFO, session, content_name) | |
188 if candidate is None: | |
189 log.warning(u"Can't connect to any peer candidate") | |
190 candidate_elt = transport_elt.addElement('candidate-error') | |
191 else: | |
192 log.info(u"Found best peer candidate: {}".format(unicode(candidate))) | |
193 candidate_elt = transport_elt.addElement('candidate-used') | |
194 candidate_elt['cid'] = candidate.id | |
195 iq_elt.send() # TODO: check result stanza | |
196 self._checkCandidates(session, content_name, transport_data, client) | |
197 | |
198 def _checkCandidates(self, session, content_name, transport_data, client): | |
199 """Called when a candidate has been choosed | |
200 | |
201 if we have both candidates, we select one, or fallback to an other transport | |
202 @param session(dict): session data | |
203 @param content_name(unicode): name of the current content | |
204 @param transport_data(dict): transport data | |
205 @param client(unicode): %(doc_client)s | |
206 """ | |
207 content_data = session['contents'][content_name] | |
208 try: | |
209 best_candidate = transport_data['best_candidate'] | |
210 except KeyError: | |
211 # we have not our best candidate yet | |
212 return | |
213 try: | |
214 peer_best_candidate = transport_data['peer_best_candidate'] | |
215 except KeyError: | |
216 # we have not peer best candidate yet | |
217 return | |
218 | |
219 # at this point we have both candidates, it's time to choose one | |
220 if best_candidate is None or peer_best_candidate is None: | |
221 choosed_candidate = best_candidate or peer_best_candidate | |
222 else: | |
223 if best_candidate.priority == peer_best_candidate.priority: | |
224 # same priority, we choose initiator one according to XEP-0260 §2.4 #4 | |
225 log.debug(u"Candidates have same priority, we select the one choosed by initiator") | |
226 if session['initiator'] == client.jid: | |
227 choosed_candidate = best_candidate | |
228 else: | |
229 choosed_candidate = peer_best_candidate | |
230 else: | |
231 choosed_candidate = max(best_candidate, peer_best_candidate, key=lambda c:c.priority) | |
232 | |
233 if choosed_candidate is None: | |
234 log.warning(u"Socks5 negociation failed, we need to fallback to IBB") | |
235 self.doFallback(session, content_name, client) | |
236 else: | |
237 if choosed_candidate == peer_best_candidate: | |
238 # peer_best_candidate was choosed from the candidates we have sent | |
239 # so our_candidate is true if choosed_candidate is peer_best_candidate | |
240 our_candidate = True | |
241 # than also mean that best_candidate must be discarded ! | |
242 try: | |
243 best_candidate.discard() | |
244 except AttributeError: # but it can be None | |
245 pass | |
246 else: | |
247 our_candidate = False | |
248 | |
249 log.info(u"Socks5 negociation successful, {who} candidate will be used: {candidate}".format( | |
250 who = u'our' if our_candidate else u'other peer', | |
251 candidate = choosed_candidate)) | |
252 del transport_data['best_candidate'] | |
253 del transport_data['peer_best_candidate'] | |
254 | |
255 if choosed_candidate.type == self._s5b.TYPE_PROXY: | |
256 # the stream transfer need to wait for proxy activation | |
257 # (see XEP-0260 § 2.4) | |
258 if our_candidate: | |
259 d = self._s5b.connectCandidate(client, choosed_candidate, transport_data['session_hash']) | |
260 d.addCallback(lambda dummy: choosed_candidate.activate(transport_data['sid'], session['peer_jid'], client)) | |
261 args = [client, choosed_candidate, session, content_name] | |
262 d.addCallbacks(self._proxyActivatedCb, self._proxyActivatedEb, args, None, args) | |
263 else: | |
264 # this Deferred will be called when we'll receive activation confirmation from other peer | |
265 d = transport_data['activation_d'] = defer.Deferred() | |
266 else: | |
267 d = defer.succeed(None) | |
268 | |
269 if content_data['senders'] == session['role']: | |
270 # we can now start the stream transfer (or start it after proxy activation) | |
271 d.addCallback(lambda dummy: choosed_candidate.startTransfer(transport_data['session_hash'])) | |
272 d.addErrback(self._startEb, session, content_name, client) | |
273 | |
274 def _startEb(self, fail, session, content_name, client): | |
275 """Called when it's not possible to start the transfer | |
276 | |
277 Will try to fallback to IBB | |
278 """ | |
279 try: | |
280 reason = unicode(fail.value) | |
281 except AttributeError: | |
282 reason = unicode(fail) | |
283 log.warning(u"Cant start transfert, we'll try fallback method: {}".format(reason)) | |
284 self.doFallback(session, content_name, client) | |
285 | |
286 def _candidateInfo(self, candidate_elt, session, content_name, transport_data, client): | |
287 """Called when best candidate has been received from peer (or if none is working) | |
288 | |
289 @param candidate_elt(domish.Element): candidate-used or candidate-error element | |
290 (see XEP-0260 §2.3) | |
291 @param session(dict): session data | |
292 @param content_name(unicode): name of the current content | |
293 @param transport_data(dict): transport data | |
294 @param client(unicode): %(doc_client)s | |
295 """ | |
296 if candidate_elt.name == 'candidate-error': | |
297 # candidate-error, no candidate worked | |
298 transport_data['peer_best_candidate'] = None | |
299 else: | |
300 # candidate-used, one candidate was choosed | |
301 try: | |
302 cid = candidate_elt.attributes['cid'] | |
303 except KeyError: | |
304 log.warning(u"No cid found in <candidate-used>") | |
305 raise exceptions.DataError | |
306 try: | |
307 candidate = (c for c in transport_data['candidates'] if c.id == cid).next() | |
308 except StopIteration: | |
309 log.warning(u"Given cid doesn't correspond to any known candidate !") | |
310 raise exceptions.DataError # TODO: send an error to other peer, and use better exception | |
311 except KeyError: | |
312 # a transport-info can also be intentionaly sent too early by other peer | |
313 # but there is little probability | |
314 log.error(u'"candidates" key doesn\'t exists in transport_data, it should at this point') | |
315 raise exceptions.InternalError | |
316 # at this point we have the candidate choosed by other peer | |
317 transport_data['peer_best_candidate'] = candidate | |
318 log.info(u"Other peer best candidate: {}".format(candidate)) | |
319 | |
320 del transport_data['candidates'] | |
321 self._checkCandidates(session, content_name, transport_data, client) | |
322 | |
323 def _proxyActivationInfo(self, proxy_elt, session, content_name, transport_data, client): | |
324 """Called when proxy has been activated (or has sent an error) | |
325 | |
326 @param proxy_elt(domish.Element): <activated/> or <proxy-error/> element | |
327 (see XEP-0260 §2.4) | |
328 @param session(dict): session data | |
329 @param content_name(unicode): name of the current content | |
330 @param transport_data(dict): transport data | |
331 @param client(unicode): %(doc_client)s | |
332 """ | |
333 try: | |
334 activation_d = transport_data.pop('activation_d') | |
335 except KeyError: | |
336 log.warning(u"Received unexpected transport-info for proxy activation") | |
337 | |
338 if proxy_elt.name == 'activated': | |
339 activation_d.callback(None) | |
340 else: | |
341 activation_d.errback(ProxyError()) | |
342 | |
343 @defer.inlineCallbacks | |
344 def jingleHandler(self, client, action, session, content_name, transport_elt): | |
345 content_data = session['contents'][content_name] | |
346 transport_data = content_data['transport_data'] | |
347 | |
348 if action in (self._j.A_ACCEPTED_ACK, self._j.A_PREPARE_RESPONDER): | |
349 pass | |
350 | |
351 elif action == self._j.A_SESSION_ACCEPT: | |
352 # initiator side, we select a candidate in the ones sent by responder | |
353 assert 'peer_candidates' not in transport_data | |
354 transport_data['peer_candidates'] = self._parseCandidates(transport_elt) | |
355 | |
356 elif action == self._j.A_START: | |
357 session_hash = transport_data['session_hash'] | |
358 peer_candidates = transport_data['peer_candidates'] | |
359 stream_object = content_data['stream_object'] | |
360 self._s5b.associateStreamObject(client, session_hash, stream_object) | |
361 stream_d = transport_data.pop('stream_d') | |
362 stream_d.chainDeferred(content_data['finished_d']) | |
363 peer_session_hash = transport_data['peer_session_hash'] | |
364 d = self._s5b.getBestCandidate(client, peer_candidates, session_hash, peer_session_hash) | |
365 d.addCallback(self._foundPeerCandidate, session, transport_data, content_name, client) | |
366 | |
367 elif action == self._j.A_SESSION_INITIATE: | |
368 # responder side, we select a candidate in the ones sent by initiator | |
369 # and we give our candidates | |
370 assert 'peer_candidates' not in transport_data | |
371 sid = transport_data['sid'] = transport_elt['sid'] | |
372 session_hash = transport_data['session_hash'] = self._s5b.getSessionHash(client.jid, session['peer_jid'], sid) | |
373 peer_session_hash = transport_data['peer_session_hash'] = self._s5b.getSessionHash(session['peer_jid'], client.jid, sid) # requester and target are inversed for peer candidates | |
374 peer_candidates = transport_data['peer_candidates'] = self._parseCandidates(transport_elt) | |
375 stream_object = content_data['stream_object'] | |
376 stream_d = self._s5b.registerHash(client, session_hash, stream_object) | |
377 stream_d.chainDeferred(content_data['finished_d']) | |
378 d = self._s5b.getBestCandidate(client, peer_candidates, session_hash, peer_session_hash) | |
379 d.addCallback(self._foundPeerCandidate, session, transport_data, content_name, client) | |
380 candidates = yield self._s5b.getCandidates(client) | |
381 # we remove duplicate candidates | |
382 candidates = [candidate for candidate in candidates if candidate not in peer_candidates] | |
383 | |
384 transport_data['candidates'] = candidates | |
385 # we can now build a new <transport> element with our candidates | |
386 transport_elt = self._buildCandidates(session, candidates, sid, session_hash, client) | |
387 | |
388 elif action == self._j.A_TRANSPORT_INFO: | |
389 # transport-info can be about candidate or proxy activation | |
390 candidate_elt = None | |
391 | |
392 for method, names in ((self._candidateInfo, ('candidate-used', 'candidate-error')), | |
393 (self._proxyActivationInfo, ('activated', 'proxy-error'))): | |
394 for name in names: | |
395 try: | |
396 candidate_elt = transport_elt.elements(NS_JINGLE_S5B, name).next() | |
397 except StopIteration: | |
398 continue | |
399 else: | |
400 method(candidate_elt, session, content_name, transport_data, client) | |
401 break | |
402 | |
403 if candidate_elt is None: | |
404 log.warning(u"Unexpected transport element: {}".format(transport_elt.toXml())) | |
405 elif action == self._j.A_DESTROY: | |
406 # the transport is replaced (fallback ?), We need mainly to kill XEP-0065 session. | |
407 # note that sid argument is not necessary for sessions created by this plugin | |
408 self._s5b.killSession(None, transport_data['session_hash'], None, client) | |
409 else: | |
410 log.warning(u"FIXME: unmanaged action {}".format(action)) | |
411 | |
412 defer.returnValue(transport_elt) | |
413 | |
414 def jingleTerminate(self, client, action, session, content_name, reason_elt): | |
415 if reason_elt.decline: | |
416 log.debug(u"Session declined, deleting S5B session") | |
417 # we just need to clean the S5B session if it is declined | |
418 content_data = session['contents'][content_name] | |
419 transport_data = content_data['transport_data'] | |
420 self._s5b.killSession(None, transport_data['session_hash'], None, client) | |
421 | |
422 def _doFallback(self, feature_checked, session, content_name, client): | |
423 """Do the fallback, method called once feature is checked | |
424 | |
425 @param feature_checked(bool): True if other peer can do IBB | |
426 """ | |
427 if not feature_checked: | |
428 log.warning(u"Other peer can't manage jingle IBB, be have to terminate the session") | |
429 self._j.terminate(client, self._j.REASON_CONNECTIVITY_ERROR, session) | |
430 else: | |
431 self._j.transportReplace(client, self._jingle_ibb.NAMESPACE, session, content_name) | |
432 | |
433 def doFallback(self, session, content_name, client): | |
434 """Fallback to IBB transport, used in last resort | |
435 | |
436 @param session(dict): session data | |
437 @param content_name(unicode): name of the current content | |
438 @param client(unicode): %(doc_client)s | |
439 """ | |
440 if session['role'] != self._j.ROLE_INITIATOR: | |
441 # only initiator must do the fallback, see XEP-0260 §3 | |
442 return | |
443 if self._jingle_ibb is None: | |
444 log.warning(u"Jingle IBB (XEP-0261) plugin is not available, we have to close the session") | |
445 self._j.terminate(client, self._j.REASON_CONNECTIVITY_ERROR, session) | |
446 else: | |
447 d = self.host.hasFeature(client, self._jingle_ibb.NAMESPACE, session['peer_jid']) | |
448 d.addCallback(self._doFallback, session, content_name, client) | |
449 return d | |
450 | |
451 | |
452 class XEP_0260_handler(XMPPHandler): | |
453 implements(iwokkel.IDisco) | |
454 | |
455 def getDiscoInfo(self, requestor, target, nodeIdentifier=''): | |
456 return [disco.DiscoFeature(NS_JINGLE_S5B)] | |
457 | |
458 def getDiscoItems(self, requestor, target, nodeIdentifier=''): | |
459 return [] |