comparison sat/plugins/plugin_xep_0260.py @ 2562:26edcf3a30eb

core, setup: huge cleaning: - moved directories from src and frontends/src to sat and sat_frontends, which is the recommanded naming convention - move twisted directory to root - removed all hacks from setup.py, and added missing dependencies, it is now clean - use https URL for website in setup.py - removed "Environment :: X11 Applications :: GTK", as wix is deprecated and removed - renamed sat.sh to sat and fixed its installation - added python_requires to specify Python version needed - replaced glib2reactor which use deprecated code by gtk3reactor sat can now be installed directly from virtualenv without using --system-site-packages anymore \o/
author Goffi <goffi@goffi.org>
date Mon, 02 Apr 2018 19:44:50 +0200
parents src/plugins/plugin_xep_0260.py@67cc54b01a12
children 56f94936df1e
comparison
equal deleted inserted replaced
2561:bd30dc3ffe5a 2562:26edcf3a30eb
1 #!/usr/bin/env python2
2 # -*- coding: utf-8 -*-
3
4 # SAT plugin for Jingle (XEP-0260)
5 # Copyright (C) 2009-2018 Jérôme Poisson (goffi@goffi.org)
6
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU Affero General Public License for more details.
16
17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20 from sat.core.i18n import _
21 from sat.core.constants import Const as C
22 from sat.core.log import getLogger
23 log = getLogger(__name__)
24 from sat.core import exceptions
25 from wokkel import disco, iwokkel
26 from zope.interface import implements
27 from twisted.words.xish import domish
28 from twisted.words.protocols.jabber import jid
29 from twisted.internet import defer
30 import uuid
31
32 try:
33 from twisted.words.protocols.xmlstream import XMPPHandler
34 except ImportError:
35 from wokkel.subprotocols import XMPPHandler
36
37
38 NS_JINGLE_S5B = 'urn:xmpp:jingle:transports:s5b:1'
39
40 PLUGIN_INFO = {
41 C.PI_NAME: "Jingle SOCKS5 Bytestreams",
42 C.PI_IMPORT_NAME: "XEP-0260",
43 C.PI_TYPE: "XEP",
44 C.PI_MODES: C.PLUG_MODE_BOTH,
45 C.PI_PROTOCOLS: ["XEP-0260"],
46 C.PI_DEPENDENCIES: ["XEP-0166", "XEP-0065"],
47 C.PI_RECOMMENDATIONS: ["XEP-0261"], # needed for fallback
48 C.PI_MAIN: "XEP_0260",
49 C.PI_HANDLER: "yes",
50 C.PI_DESCRIPTION: _("""Implementation of Jingle SOCKS5 Bytestreams""")
51 }
52
53
54 class ProxyError(Exception):
55
56 def __str__(self):
57 return "an error happened while trying to use the proxy"
58
59
60 class XEP_0260(object):
61 # TODO: udp handling
62
63 def __init__(self, host):
64 log.info(_("plugin Jingle SOCKS5 Bytestreams"))
65 self.host = host
66 self._j = host.plugins["XEP-0166"] # shortcut to access jingle
67 self._s5b = host.plugins["XEP-0065"] # and socks5 bytestream
68 try:
69 self._jingle_ibb = host.plugins["XEP-0261"]
70 except KeyError:
71 self._jingle_ibb = None
72 self._j.registerTransport(NS_JINGLE_S5B, self._j.TRANSPORT_STREAMING, self, 100)
73
74 def getHandler(self, client):
75 return XEP_0260_handler()
76
77 def _parseCandidates(self, transport_elt):
78 """Parse <candidate> elements
79
80 @param transport_elt(domish.Element): parent <transport> element
81 @return (list[plugin_xep_0065.Candidate): list of parsed candidates
82 """
83 candidates = []
84 for candidate_elt in transport_elt.elements(NS_JINGLE_S5B, 'candidate'):
85 try:
86 cid = candidate_elt['cid']
87 host = candidate_elt['host']
88 jid_= jid.JID(candidate_elt['jid'])
89 port = int(candidate_elt.getAttribute('port', 1080))
90 priority = int(candidate_elt['priority'])
91 type_ = candidate_elt.getAttribute('type', self._s5b.TYPE_DIRECT)
92 except (KeyError, ValueError):
93 raise exceptions.DataError()
94 candidate = self._s5b.Candidate(host, port, type_, priority, jid_, cid)
95 candidates.append(candidate)
96 # self._s5b.registerCandidate(candidate)
97 return candidates
98
99 def _buildCandidates(self, session, candidates, sid, session_hash, client, mode=None):
100 """Build <transport> element with candidates
101
102 @param session(dict): jingle session data
103 @param candidates(iterator[plugin_xep_0065.Candidate]): iterator of candidates to add
104 @param sid(unicode): transport stream id
105 @param client: %(doc_client)s
106 @param mode(str, None): 'tcp' or 'udp', or None to have no attribute
107 @return (domish.Element): parent <transport> element where <candidate> elements must be added
108 """
109 proxy = next((candidate for candidate in candidates if candidate.type == self._s5b.TYPE_PROXY), None)
110 transport_elt = domish.Element((NS_JINGLE_S5B, "transport"))
111 transport_elt['sid'] = sid
112 if proxy is not None:
113 transport_elt['dstaddr'] = session_hash
114 if mode is not None:
115 transport_elt['mode'] = 'tcp' # XXX: we only manage tcp for now
116
117 for candidate in candidates:
118 log.debug(u"Adding candidate: {}".format(candidate))
119 candidate_elt = transport_elt.addElement('candidate', NS_JINGLE_S5B)
120 if candidate.id is None:
121 candidate.id = unicode(uuid.uuid4())
122 candidate_elt['cid'] = candidate.id
123 candidate_elt['host'] = candidate.host
124 candidate_elt['jid'] = candidate.jid.full()
125 candidate_elt['port'] = unicode(candidate.port)
126 candidate_elt['priority'] = unicode(candidate.priority)
127 candidate_elt['type'] = candidate.type
128 return transport_elt
129
130 @defer.inlineCallbacks
131 def jingleSessionInit(self, client, session, content_name):
132 content_data = session['contents'][content_name]
133 transport_data = content_data['transport_data']
134 sid = transport_data['sid'] = unicode(uuid.uuid4())
135 session_hash = transport_data['session_hash'] = self._s5b.getSessionHash(client.jid, session['peer_jid'], sid)
136 transport_data['peer_session_hash'] = self._s5b.getSessionHash(session['peer_jid'], client.jid, sid) # requester and target are inversed for peer candidates
137 transport_data['stream_d'] = self._s5b.registerHash(client, session_hash, None)
138 candidates = transport_data['candidates'] = yield self._s5b.getCandidates(client)
139 mode = 'tcp' # XXX: we only manage tcp for now
140 transport_elt = self._buildCandidates(session, candidates, sid, session_hash, client, mode)
141
142 defer.returnValue(transport_elt)
143
144 def _proxyActivatedCb(self, iq_result_elt, client, candidate, session, content_name):
145 """Called when activation confirmation has been received from proxy
146
147 cf XEP-0260 § 2.4
148 """
149 # now that the proxy is activated, we have to inform other peer
150 iq_elt, transport_elt = self._j.buildAction(client, self._j.A_TRANSPORT_INFO, session, content_name)
151 activated_elt = transport_elt.addElement('activated')
152 activated_elt['cid'] = candidate.id
153 iq_elt.send()
154
155 def _proxyActivatedEb(self, stanza_error, client, candidate, session, content_name):
156 """Called when activation error has been received from proxy
157
158 cf XEP-0260 § 2.4
159 """
160 # TODO: fallback to IBB
161 # now that the proxy is activated, we have to inform other peer
162 iq_elt, transport_elt = self._j.buildAction(client, self._j.A_TRANSPORT_INFO, session, content_name)
163 transport_elt.addElement('proxy-error')
164 iq_elt.send()
165 log.warning(u"Can't activate proxy, we need to fallback to IBB: {reason}"
166 .format(reason = stanza_error.value.condition))
167 self.doFallback(session, content_name, client)
168
169 def _foundPeerCandidate(self, candidate, session, transport_data, content_name, client):
170 """Called when the best candidate from other peer is found
171
172 @param candidate(XEP_0065.Candidate, None): selected candidate,
173 or None if no candidate is accessible
174 @param session(dict): session data
175 @param transport_data(dict): transport data
176 @param content_name(unicode): name of the current content
177 @param client(unicode): %(doc_client)s
178 """
179
180 transport_data['best_candidate'] = candidate
181 # we need to disconnect all non selected candidates before removing them
182 for c in transport_data['peer_candidates']:
183 if c is None or c is candidate:
184 continue
185 c.discard()
186 del transport_data['peer_candidates']
187 iq_elt, transport_elt = self._j.buildAction(client, self._j.A_TRANSPORT_INFO, session, content_name)
188 if candidate is None:
189 log.warning(u"Can't connect to any peer candidate")
190 candidate_elt = transport_elt.addElement('candidate-error')
191 else:
192 log.info(u"Found best peer candidate: {}".format(unicode(candidate)))
193 candidate_elt = transport_elt.addElement('candidate-used')
194 candidate_elt['cid'] = candidate.id
195 iq_elt.send() # TODO: check result stanza
196 self._checkCandidates(session, content_name, transport_data, client)
197
198 def _checkCandidates(self, session, content_name, transport_data, client):
199 """Called when a candidate has been choosed
200
201 if we have both candidates, we select one, or fallback to an other transport
202 @param session(dict): session data
203 @param content_name(unicode): name of the current content
204 @param transport_data(dict): transport data
205 @param client(unicode): %(doc_client)s
206 """
207 content_data = session['contents'][content_name]
208 try:
209 best_candidate = transport_data['best_candidate']
210 except KeyError:
211 # we have not our best candidate yet
212 return
213 try:
214 peer_best_candidate = transport_data['peer_best_candidate']
215 except KeyError:
216 # we have not peer best candidate yet
217 return
218
219 # at this point we have both candidates, it's time to choose one
220 if best_candidate is None or peer_best_candidate is None:
221 choosed_candidate = best_candidate or peer_best_candidate
222 else:
223 if best_candidate.priority == peer_best_candidate.priority:
224 # same priority, we choose initiator one according to XEP-0260 §2.4 #4
225 log.debug(u"Candidates have same priority, we select the one choosed by initiator")
226 if session['initiator'] == client.jid:
227 choosed_candidate = best_candidate
228 else:
229 choosed_candidate = peer_best_candidate
230 else:
231 choosed_candidate = max(best_candidate, peer_best_candidate, key=lambda c:c.priority)
232
233 if choosed_candidate is None:
234 log.warning(u"Socks5 negociation failed, we need to fallback to IBB")
235 self.doFallback(session, content_name, client)
236 else:
237 if choosed_candidate == peer_best_candidate:
238 # peer_best_candidate was choosed from the candidates we have sent
239 # so our_candidate is true if choosed_candidate is peer_best_candidate
240 our_candidate = True
241 # than also mean that best_candidate must be discarded !
242 try:
243 best_candidate.discard()
244 except AttributeError: # but it can be None
245 pass
246 else:
247 our_candidate = False
248
249 log.info(u"Socks5 negociation successful, {who} candidate will be used: {candidate}".format(
250 who = u'our' if our_candidate else u'other peer',
251 candidate = choosed_candidate))
252 del transport_data['best_candidate']
253 del transport_data['peer_best_candidate']
254
255 if choosed_candidate.type == self._s5b.TYPE_PROXY:
256 # the stream transfer need to wait for proxy activation
257 # (see XEP-0260 § 2.4)
258 if our_candidate:
259 d = self._s5b.connectCandidate(client, choosed_candidate, transport_data['session_hash'])
260 d.addCallback(lambda dummy: choosed_candidate.activate(transport_data['sid'], session['peer_jid'], client))
261 args = [client, choosed_candidate, session, content_name]
262 d.addCallbacks(self._proxyActivatedCb, self._proxyActivatedEb, args, None, args)
263 else:
264 # this Deferred will be called when we'll receive activation confirmation from other peer
265 d = transport_data['activation_d'] = defer.Deferred()
266 else:
267 d = defer.succeed(None)
268
269 if content_data['senders'] == session['role']:
270 # we can now start the stream transfer (or start it after proxy activation)
271 d.addCallback(lambda dummy: choosed_candidate.startTransfer(transport_data['session_hash']))
272 d.addErrback(self._startEb, session, content_name, client)
273
274 def _startEb(self, fail, session, content_name, client):
275 """Called when it's not possible to start the transfer
276
277 Will try to fallback to IBB
278 """
279 try:
280 reason = unicode(fail.value)
281 except AttributeError:
282 reason = unicode(fail)
283 log.warning(u"Cant start transfert, we'll try fallback method: {}".format(reason))
284 self.doFallback(session, content_name, client)
285
286 def _candidateInfo(self, candidate_elt, session, content_name, transport_data, client):
287 """Called when best candidate has been received from peer (or if none is working)
288
289 @param candidate_elt(domish.Element): candidate-used or candidate-error element
290 (see XEP-0260 §2.3)
291 @param session(dict): session data
292 @param content_name(unicode): name of the current content
293 @param transport_data(dict): transport data
294 @param client(unicode): %(doc_client)s
295 """
296 if candidate_elt.name == 'candidate-error':
297 # candidate-error, no candidate worked
298 transport_data['peer_best_candidate'] = None
299 else:
300 # candidate-used, one candidate was choosed
301 try:
302 cid = candidate_elt.attributes['cid']
303 except KeyError:
304 log.warning(u"No cid found in <candidate-used>")
305 raise exceptions.DataError
306 try:
307 candidate = (c for c in transport_data['candidates'] if c.id == cid).next()
308 except StopIteration:
309 log.warning(u"Given cid doesn't correspond to any known candidate !")
310 raise exceptions.DataError # TODO: send an error to other peer, and use better exception
311 except KeyError:
312 # a transport-info can also be intentionaly sent too early by other peer
313 # but there is little probability
314 log.error(u'"candidates" key doesn\'t exists in transport_data, it should at this point')
315 raise exceptions.InternalError
316 # at this point we have the candidate choosed by other peer
317 transport_data['peer_best_candidate'] = candidate
318 log.info(u"Other peer best candidate: {}".format(candidate))
319
320 del transport_data['candidates']
321 self._checkCandidates(session, content_name, transport_data, client)
322
323 def _proxyActivationInfo(self, proxy_elt, session, content_name, transport_data, client):
324 """Called when proxy has been activated (or has sent an error)
325
326 @param proxy_elt(domish.Element): <activated/> or <proxy-error/> element
327 (see XEP-0260 §2.4)
328 @param session(dict): session data
329 @param content_name(unicode): name of the current content
330 @param transport_data(dict): transport data
331 @param client(unicode): %(doc_client)s
332 """
333 try:
334 activation_d = transport_data.pop('activation_d')
335 except KeyError:
336 log.warning(u"Received unexpected transport-info for proxy activation")
337
338 if proxy_elt.name == 'activated':
339 activation_d.callback(None)
340 else:
341 activation_d.errback(ProxyError())
342
343 @defer.inlineCallbacks
344 def jingleHandler(self, client, action, session, content_name, transport_elt):
345 content_data = session['contents'][content_name]
346 transport_data = content_data['transport_data']
347
348 if action in (self._j.A_ACCEPTED_ACK, self._j.A_PREPARE_RESPONDER):
349 pass
350
351 elif action == self._j.A_SESSION_ACCEPT:
352 # initiator side, we select a candidate in the ones sent by responder
353 assert 'peer_candidates' not in transport_data
354 transport_data['peer_candidates'] = self._parseCandidates(transport_elt)
355
356 elif action == self._j.A_START:
357 session_hash = transport_data['session_hash']
358 peer_candidates = transport_data['peer_candidates']
359 stream_object = content_data['stream_object']
360 self._s5b.associateStreamObject(client, session_hash, stream_object)
361 stream_d = transport_data.pop('stream_d')
362 stream_d.chainDeferred(content_data['finished_d'])
363 peer_session_hash = transport_data['peer_session_hash']
364 d = self._s5b.getBestCandidate(client, peer_candidates, session_hash, peer_session_hash)
365 d.addCallback(self._foundPeerCandidate, session, transport_data, content_name, client)
366
367 elif action == self._j.A_SESSION_INITIATE:
368 # responder side, we select a candidate in the ones sent by initiator
369 # and we give our candidates
370 assert 'peer_candidates' not in transport_data
371 sid = transport_data['sid'] = transport_elt['sid']
372 session_hash = transport_data['session_hash'] = self._s5b.getSessionHash(client.jid, session['peer_jid'], sid)
373 peer_session_hash = transport_data['peer_session_hash'] = self._s5b.getSessionHash(session['peer_jid'], client.jid, sid) # requester and target are inversed for peer candidates
374 peer_candidates = transport_data['peer_candidates'] = self._parseCandidates(transport_elt)
375 stream_object = content_data['stream_object']
376 stream_d = self._s5b.registerHash(client, session_hash, stream_object)
377 stream_d.chainDeferred(content_data['finished_d'])
378 d = self._s5b.getBestCandidate(client, peer_candidates, session_hash, peer_session_hash)
379 d.addCallback(self._foundPeerCandidate, session, transport_data, content_name, client)
380 candidates = yield self._s5b.getCandidates(client)
381 # we remove duplicate candidates
382 candidates = [candidate for candidate in candidates if candidate not in peer_candidates]
383
384 transport_data['candidates'] = candidates
385 # we can now build a new <transport> element with our candidates
386 transport_elt = self._buildCandidates(session, candidates, sid, session_hash, client)
387
388 elif action == self._j.A_TRANSPORT_INFO:
389 # transport-info can be about candidate or proxy activation
390 candidate_elt = None
391
392 for method, names in ((self._candidateInfo, ('candidate-used', 'candidate-error')),
393 (self._proxyActivationInfo, ('activated', 'proxy-error'))):
394 for name in names:
395 try:
396 candidate_elt = transport_elt.elements(NS_JINGLE_S5B, name).next()
397 except StopIteration:
398 continue
399 else:
400 method(candidate_elt, session, content_name, transport_data, client)
401 break
402
403 if candidate_elt is None:
404 log.warning(u"Unexpected transport element: {}".format(transport_elt.toXml()))
405 elif action == self._j.A_DESTROY:
406 # the transport is replaced (fallback ?), We need mainly to kill XEP-0065 session.
407 # note that sid argument is not necessary for sessions created by this plugin
408 self._s5b.killSession(None, transport_data['session_hash'], None, client)
409 else:
410 log.warning(u"FIXME: unmanaged action {}".format(action))
411
412 defer.returnValue(transport_elt)
413
414 def jingleTerminate(self, client, action, session, content_name, reason_elt):
415 if reason_elt.decline:
416 log.debug(u"Session declined, deleting S5B session")
417 # we just need to clean the S5B session if it is declined
418 content_data = session['contents'][content_name]
419 transport_data = content_data['transport_data']
420 self._s5b.killSession(None, transport_data['session_hash'], None, client)
421
422 def _doFallback(self, feature_checked, session, content_name, client):
423 """Do the fallback, method called once feature is checked
424
425 @param feature_checked(bool): True if other peer can do IBB
426 """
427 if not feature_checked:
428 log.warning(u"Other peer can't manage jingle IBB, be have to terminate the session")
429 self._j.terminate(client, self._j.REASON_CONNECTIVITY_ERROR, session)
430 else:
431 self._j.transportReplace(client, self._jingle_ibb.NAMESPACE, session, content_name)
432
433 def doFallback(self, session, content_name, client):
434 """Fallback to IBB transport, used in last resort
435
436 @param session(dict): session data
437 @param content_name(unicode): name of the current content
438 @param client(unicode): %(doc_client)s
439 """
440 if session['role'] != self._j.ROLE_INITIATOR:
441 # only initiator must do the fallback, see XEP-0260 §3
442 return
443 if self._jingle_ibb is None:
444 log.warning(u"Jingle IBB (XEP-0261) plugin is not available, we have to close the session")
445 self._j.terminate(client, self._j.REASON_CONNECTIVITY_ERROR, session)
446 else:
447 d = self.host.hasFeature(client, self._jingle_ibb.NAMESPACE, session['peer_jid'])
448 d.addCallback(self._doFallback, session, content_name, client)
449 return d
450
451
452 class XEP_0260_handler(XMPPHandler):
453 implements(iwokkel.IDisco)
454
455 def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
456 return [disco.DiscoFeature(NS_JINGLE_S5B)]
457
458 def getDiscoItems(self, requestor, target, nodeIdentifier=''):
459 return []