1560
|
1 #!/usr/bin/python |
|
2 # -*- coding: utf-8 -*- |
|
3 |
|
4 # SAT plugin for Jingle (XEP-0260) |
|
5 # Copyright (C) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Jérôme Poisson (goffi@goffi.org) |
|
6 |
|
7 # This program is free software: you can redistribute it and/or modify |
|
8 # it under the terms of the GNU Affero General Public License as published by |
|
9 # the Free Software Foundation, either version 3 of the License, or |
|
10 # (at your option) any later version. |
|
11 |
|
12 # This program is distributed in the hope that it will be useful, |
|
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
15 # GNU Affero General Public License for more details. |
|
16 |
|
17 # You should have received a copy of the GNU Affero General Public License |
|
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
19 |
|
20 from sat.core.i18n import _ |
|
21 from sat.core.log import getLogger |
|
22 log = getLogger(__name__) |
|
23 from sat.core import exceptions |
|
24 from wokkel import disco, iwokkel |
|
25 from zope.interface import implements |
|
26 from twisted.words.xish import domish |
|
27 from twisted.words.protocols.jabber import jid |
|
28 from twisted.internet import defer |
|
29 import uuid |
|
30 |
|
31 try: |
|
32 from twisted.words.protocols.xmlstream import XMPPHandler |
|
33 except ImportError: |
|
34 from wokkel.subprotocols import XMPPHandler |
|
35 |
|
36 |
|
37 NS_JINGLE_S5B = 'urn:xmpp:jingle:transports:s5b:1' |
|
38 |
|
39 PLUGIN_INFO = { |
|
40 "name": "Jingle SOCKS5 Bytestreams", |
|
41 "import_name": "XEP-0260", |
|
42 "type": "XEP", |
|
43 "protocols": ["XEP-0260"], |
|
44 "dependencies": ["XEP-0166", "XEP-0065"], |
|
45 "main": "XEP_0260", |
|
46 "handler": "yes", |
|
47 "description": _("""Implementation of Jingle SOCKS5 Bytestreams""") |
|
48 } |
|
49 |
|
50 |
|
51 class XEP_0260(object): |
|
52 # TODO: udp handling |
|
53 |
|
54 def __init__(self, host): |
|
55 log.info(_("plugin Jingle SOCKS5 Bytestreams")) |
|
56 self.host = host |
|
57 self._j = host.plugins["XEP-0166"] # shortcut to access jingle |
|
58 self._s5b = host.plugins["XEP-0065"] # and socks5 bytestream |
|
59 self._j.registerTransport(NS_JINGLE_S5B, self._j.TRANSPORT_STREAMING, self, 100) |
|
60 |
|
61 def getHandler(self, profile): |
|
62 return XEP_0260_handler() |
|
63 |
|
64 def _parseCandidates(self, transport_elt): |
|
65 """Parse <candidate> elements |
|
66 |
|
67 @param transport_elt(domish.Element): parent <transport> element |
|
68 @return (list[plugin_xep_0065.Candidate): list of parsed candidates |
|
69 """ |
|
70 candidates = [] |
|
71 for candidate_elt in transport_elt.elements(NS_JINGLE_S5B, 'candidate'): |
|
72 try: |
|
73 cid = candidate_elt['cid'] |
|
74 host = candidate_elt['host'] |
|
75 jid_= jid.JID(candidate_elt['jid']) |
|
76 port = int(candidate_elt.getAttribute('port', 1080)) |
|
77 priority = int(candidate_elt['priority']) |
|
78 type_ = candidate_elt.getAttribute('type', self._s5b.TYPE_DIRECT) |
|
79 except (KeyError, ValueError): |
|
80 raise exceptions.DataError() |
|
81 candidate = self._s5b.Candidate(host, port, type_, priority, jid_, cid) |
|
82 candidates.append(candidate) |
|
83 # self._s5b.registerCandidate(candidate) |
|
84 return candidates |
|
85 |
|
86 def _buildCandidates(self, session, candidates, sid, session_hash, client, mode=None): |
|
87 """Build <transport> element with candidates |
|
88 |
|
89 @param session(dict): jingle session data |
|
90 @param candidates(iterator[plugin_xep_0065.Candidate]): iterator of candidates to add |
|
91 @param sid(unicode): transport stream id |
|
92 @param client: %(doc_client)s |
|
93 @param mode(str, None): 'tcp' or 'udp', or None to have no attribute |
|
94 @return (domish.Element): parent <transport> element where <candidate> elements must be added |
|
95 """ |
|
96 proxy = next((candidate for candidate in candidates if candidate.type == self._s5b.TYPE_PROXY), None) |
|
97 transport_elt = domish.Element((NS_JINGLE_S5B, "transport")) |
|
98 transport_elt['sid'] = sid |
|
99 if proxy is not None: |
|
100 transport_elt['dstaddr'] = session_hash |
|
101 if mode is not None: |
|
102 transport_elt['mode'] = 'tcp' # XXX: we only manage tcp for now |
|
103 |
|
104 for candidate in candidates: |
|
105 log.debug(u"Adding candidate: {}".format(candidate)) |
|
106 candidate_elt = transport_elt.addElement('candidate', NS_JINGLE_S5B) |
|
107 if candidate.id is None: |
|
108 candidate.id = unicode(uuid.uuid4()) |
|
109 candidate_elt['cid'] = candidate.id |
|
110 candidate_elt['host'] = candidate.host |
|
111 candidate_elt['jid'] = candidate.jid.full() |
|
112 candidate_elt['port'] = unicode(candidate.port) |
|
113 candidate_elt['priority'] = unicode(candidate.priority) |
|
114 candidate_elt['type'] = candidate.type |
|
115 return transport_elt |
|
116 |
|
117 @defer.inlineCallbacks |
|
118 def jingleSessionInit(self, session, content_name, profile): |
|
119 client = self.host.getClient(profile) |
|
120 content_data = session['contents'][content_name] |
|
121 transport_data = content_data['transport_data'] |
|
122 sid = transport_data['sid'] = unicode(uuid.uuid4()) |
|
123 session_hash = transport_data['session_hash'] = self._s5b.getSessionHash(client.jid, session['to_jid'], sid) |
|
124 candidates = transport_data['candidates'] = yield self._s5b.getCandidates(profile) |
|
125 mode = 'tcp' # XXX: we only manage tcp for now |
|
126 transport_elt = self._buildCandidates(session, candidates, sid, session_hash, client, mode) |
|
127 |
|
128 defer.returnValue(transport_elt) |
|
129 |
|
130 def _foundPeerCandidate(self, candidate, session, transport_data, content_name, client): |
|
131 """Called when the best candidate from other peer is found |
|
132 |
|
133 @param candidate(XEP_0065.Candidate, None): selected candidate, |
|
134 or None if no candidate is accessible |
|
135 @param session(dict): session data |
|
136 @param transport_data(dict): transport data |
|
137 @param content_name(dict): name of the current content |
|
138 @param client(unicode): %(doc_client)s |
|
139 """ |
|
140 |
|
141 transport_data['best_candidate'] = candidate |
|
142 # we need to disconnect all non selected candidates before removing them |
|
143 for c in transport_data['peer_candidates']: |
|
144 if c is None or c is candidate: |
|
145 continue |
|
146 c.discard() |
|
147 del transport_data['peer_candidates'] |
|
148 iq_elt, transport_elt = self._j.buildAction(self._j.A_TRANSPORT_INFO, session, content_name, client.profile) |
|
149 if candidate is None: |
|
150 log.warning(u"Can't connect to any peer candidate") |
|
151 candidate_elt = transport_elt.addElement('candidate-error') |
|
152 else: |
|
153 log.info(u"Found best peer candidate: {}".format(unicode(candidate))) |
|
154 candidate_elt = transport_elt.addElement('candidate-used') |
|
155 candidate_elt['cid'] = candidate.id |
|
156 iq_elt.send() # TODO: check result stanza |
|
157 content_data = session['contents'][content_name] |
|
158 self._checkCandidates(session, content_data, transport_data, client) |
|
159 |
|
160 def _checkCandidates(self, session, content_data, transport_data, client): |
|
161 """Called when a candidate has been choosed |
|
162 |
|
163 if we have both candidates, we select one, or fallback to an other transport |
|
164 @param session(dict): session data |
|
165 @param content_data(dict): content data |
|
166 @param transport_data(dict): transport data |
|
167 @param client(unicode): %(doc_client)s |
|
168 """ |
|
169 try: |
|
170 best_candidate = transport_data['best_candidate'] |
|
171 except KeyError: |
|
172 # we have not our best candidate yet |
|
173 return |
|
174 try: |
|
175 peer_best_candidate = transport_data['peer_best_candidate'] |
|
176 except KeyError: |
|
177 # we have not peer best candidate yet |
|
178 return |
|
179 |
|
180 # at this point we have both candidates, it's time to choose one |
|
181 if best_candidate is None or peer_best_candidate is None: |
|
182 choosed_candidate = best_candidate or peer_best_candidate |
|
183 else: |
|
184 if best_candidate.priority == peer_best_candidate.priority: |
|
185 # same priority, we choose initiator one according to XEP-0260 §2.4 #4 |
|
186 log.debug(u"Candidates have same priority, we choose the initiator one") |
|
187 if session['initiator'] == client.jid: |
|
188 choosed_candidate = best_candidate |
|
189 else: |
|
190 choosed_candidate = peer_best_candidate |
|
191 else: |
|
192 choosed_candidate = max(best_candidate, peer_best_candidate, key=lambda c:c.priority) |
|
193 |
|
194 if choosed_candidate is None: |
|
195 log.warning(u"Socks5 negociation failed, we need to fallback to IBB") |
|
196 else: |
|
197 if choosed_candidate==best_candidate: |
|
198 who = u'our' |
|
199 else: |
|
200 who = u'other peer' |
|
201 best_candidate.discard() |
|
202 |
|
203 log.info(u"Socks5 negociation successful, {who} candidate will be used: {candidate}".format( |
|
204 who = who, |
|
205 candidate = choosed_candidate)) |
|
206 del transport_data['best_candidate'] |
|
207 del transport_data['peer_best_candidate'] |
|
208 if content_data['senders'] == session['role']: |
|
209 # we can now start the file transfer |
|
210 choosed_candidate.startTransfer(transport_data['session_hash']) |
|
211 |
|
212 @defer.inlineCallbacks |
|
213 def jingleHandler(self, action, session, content_name, transport_elt, profile): |
|
214 client = self.host.getClient(profile) |
|
215 content_data = session['contents'][content_name] |
|
216 transport_data = content_data['transport_data'] |
|
217 |
|
218 if action in (self._j.A_ACCEPTED_ACK,): |
|
219 pass |
|
220 |
|
221 elif action == self._j.A_SESSION_ACCEPT: |
|
222 # initiator side, we select a candidate in the ones sent by responder |
|
223 assert 'peer_candidates' not in transport_data |
|
224 transport_data['peer_candidates'] = self._parseCandidates(transport_elt) |
|
225 |
|
226 # elif action == self._j.A_START: |
|
227 elif action == self._j.A_START: |
|
228 session_hash = transport_data['session_hash'] |
|
229 peer_candidates = transport_data['peer_candidates'] |
|
230 file_obj = content_data['file_obj'] |
|
231 stream_d = self._s5b.registerHash(session_hash, file_obj, profile) |
|
232 args = [session, content_name, profile] |
|
233 stream_d.addCallbacks(self._streamCb, self._streamEb, args, None, args) |
|
234 d = self._s5b.getBestCandidate(peer_candidates, session_hash, profile) |
|
235 d.addCallback(self._foundPeerCandidate, session, transport_data, content_name, client) |
|
236 |
|
237 elif action == self._j.A_SESSION_INITIATE: |
|
238 # responder side, we select a candidate in the ones sent by initiator |
|
239 # and we give our candidates |
|
240 assert 'peer_candidates' not in transport_data |
|
241 sid = transport_data['sid'] = transport_elt['sid'] |
|
242 session_hash = transport_data['session_hash'] = self._s5b.getSessionHash(session['to_jid'], client.jid, sid) |
|
243 peer_candidates = transport_data['peer_candidates'] = self._parseCandidates(transport_elt) |
|
244 file_obj = content_data['file_obj'] |
|
245 stream_d = self._s5b.registerHash(session_hash, file_obj, profile) |
|
246 args = [session, content_name, profile] |
|
247 stream_d.addCallbacks(self._streamCb, self._streamEb, args, None, args) |
|
248 d = self._s5b.getBestCandidate(peer_candidates, session_hash, profile) |
|
249 d.addCallback(self._foundPeerCandidate, session, transport_data, content_name, client) |
|
250 candidates = yield self._s5b.getCandidates(profile) |
|
251 # we remove duplicate candidates |
|
252 candidates = [candidate for candidate in candidates if candidate not in peer_candidates] |
|
253 |
|
254 transport_data['candidates'] = candidates |
|
255 # we can now build a new <transport> element with our candidates |
|
256 transport_elt = self._buildCandidates(session, candidates, sid, session_hash, client) |
|
257 |
|
258 elif action == self._j.A_TRANSPORT_INFO: |
|
259 # other peer gave us its choosed candidate |
|
260 try: |
|
261 candidate_elt = transport_elt.elements(NS_JINGLE_S5B, 'candidate-used').next() |
|
262 except StopIteration: |
|
263 try: |
|
264 candidate_elt = transport_elt.elements(NS_JINGLE_S5B, 'candidate-error').next() |
|
265 except StopIteration: |
|
266 log.warning(u"Unexpected transport element: {}".format(transport_elt.toXml())) |
|
267 raise exceptions.DataError |
|
268 else: |
|
269 # candidate-error, no candidate worked |
|
270 transport_data['peer_best_candidate'] = None |
|
271 else: |
|
272 # candidate-used, one candidate was choosed |
|
273 try: |
|
274 cid = candidate_elt.attributes['cid'] |
|
275 except KeyError: |
|
276 log.warning(u"No cid found in <candidate-used>") |
|
277 raise exceptions.DataError |
|
278 try: |
|
279 candidate = (c for c in transport_data['candidates'] if c.id == cid).next() |
|
280 except StopIteration: |
|
281 log.warning(u"Given cid doesn't correspond to any known candidate !") |
|
282 raise exceptions.DataError # TODO: send an error to other peer, and use better exception |
|
283 except KeyError: |
|
284 # a transport-info can also be intentionaly sent too early by other peer |
|
285 # but there is little probability |
|
286 log.error(u'"candidates" key doesn\'t exists in transport_data, it should at this point') |
|
287 raise exceptions.InternalError |
|
288 # at this point we have the candidate choosed by other peer |
|
289 transport_data['peer_best_candidate'] = candidate |
|
290 log.info(u"Other peer best candidate: {}".format(candidate)) |
|
291 |
|
292 del transport_data['candidates'] |
|
293 self._checkCandidates(session, content_data, transport_data, client) |
|
294 |
|
295 else: |
|
296 log.warning(u"FIXME: unmanaged action {}".format(action)) |
|
297 |
|
298 defer.returnValue(transport_elt) |
|
299 |
|
300 def _streamCb(self, dummy, session, content_name, profile): |
|
301 self._j.contentTerminate(session, content_name, profile=profile) |
|
302 |
|
303 def _streamEb(self, failure, session, content_name, profile): |
|
304 log.warning(u"Error while streaming through s5b: {}".format(failure)) |
|
305 self._j.contentTerminate(session, content_name, reason=self._j.REASON_FAILED_TRANSPORT, profile=profile) |
|
306 |
|
307 |
|
308 class XEP_0260_handler(XMPPHandler): |
|
309 implements(iwokkel.IDisco) |
|
310 |
|
311 def getDiscoInfo(self, requestor, target, nodeIdentifier=''): |
|
312 return [disco.DiscoFeature(NS_JINGLE_S5B)] |
|
313 |
|
314 def getDiscoItems(self, requestor, target, nodeIdentifier=''): |
|
315 return [] |