comparison src/plugins/plugin_xep_0260.py @ 1560:dcce63810733

plugin XEP-0260: first draft
author Goffi <goffi@goffi.org>
date Mon, 02 Nov 2015 22:02:41 +0100
parents
children 268fda4236ca
comparison
equal deleted inserted replaced
1559:7cc29634b6ef 1560:dcce63810733
1 #!/usr/bin/python
2 # -*- coding: utf-8 -*-
3
4 # SAT plugin for Jingle (XEP-0260)
5 # Copyright (C) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Jérôme Poisson (goffi@goffi.org)
6
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU Affero General Public License for more details.
16
17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20 from sat.core.i18n import _
21 from sat.core.log import getLogger
22 log = getLogger(__name__)
23 from sat.core import exceptions
24 from wokkel import disco, iwokkel
25 from zope.interface import implements
26 from twisted.words.xish import domish
27 from twisted.words.protocols.jabber import jid
28 from twisted.internet import defer
29 import uuid
30
31 try:
32 from twisted.words.protocols.xmlstream import XMPPHandler
33 except ImportError:
34 from wokkel.subprotocols import XMPPHandler
35
36
# XML namespace of the Jingle SOCKS5 Bytestreams transport
NS_JINGLE_S5B = 'urn:xmpp:jingle:transports:s5b:1'

# Plugin metadata: "main" names the plugin class defined in this module,
# and "dependencies" lists the plugins fetched from host.plugins in __init__.
PLUGIN_INFO = dict(
    name="Jingle SOCKS5 Bytestreams",
    import_name="XEP-0260",
    type="XEP",
    protocols=["XEP-0260"],
    dependencies=["XEP-0166", "XEP-0065"],
    main="XEP_0260",
    handler="yes",
    description=_("""Implementation of Jingle SOCKS5 Bytestreams"""),
)
49
50
class XEP_0260(object):
    """Jingle transport using SOCKS5 Bytestreams (XEP-0260)

    The instance registers itself against the Jingle plugin (XEP-0166) for the
    NS_JINGLE_S5B namespace, and delegates candidates discovery, connection and
    data transfer to the SOCKS5 Bytestreams plugin (XEP-0065).
    """
    # TODO: udp handling

    def __init__(self, host):
        """Keep shortcuts to the needed plugins and register the transport

        @param host: main application object; its "plugins" mapping must
            contain the "XEP-0166" and "XEP-0065" plugins
        """
        log.info(_("plugin Jingle SOCKS5 Bytestreams"))
        self.host = host
        self._j = host.plugins["XEP-0166"]  # shortcut to access jingle
        self._s5b = host.plugins["XEP-0065"]  # and socks5 bytestream
        # NOTE(review): the last argument (100) looks like a transport priority,
        #     confirm against XEP-0166 plugin's registerTransport
        self._j.registerTransport(NS_JINGLE_S5B, self._j.TRANSPORT_STREAMING, self, 100)

    def getHandler(self, profile):
        """Return a new disco handler advertising the transport namespace

        @param profile: profile requesting the handler (not used here)
        @return (XEP_0260_handler): new handler instance
        """
        return XEP_0260_handler()

    def _parseCandidates(self, transport_elt):
        """Parse <candidate> elements

        "port" defaults to 1080 and "type" to the direct type of XEP-0065
        plugin when the attributes are missing.

        @param transport_elt(domish.Element): parent <transport> element
        @return (list[plugin_xep_0065.Candidate]): list of parsed candidates
        @raise exceptions.DataError: a mandatory attribute is missing, "port" or
            "priority" is not an integer, or "jid" is not a valid jid
        """
        candidates = []
        for candidate_elt in transport_elt.elements(NS_JINGLE_S5B, 'candidate'):
            try:
                cid = candidate_elt['cid']
                host = candidate_elt['host']
                jid_ = jid.JID(candidate_elt['jid'])
                port = int(candidate_elt.getAttribute('port', 1080))
                priority = int(candidate_elt['priority'])
                type_ = candidate_elt.getAttribute('type', self._s5b.TYPE_DIRECT)
            except (KeyError, ValueError, jid.InvalidFormat):
                # jid.InvalidFormat is not a ValueError, it must be caught
                # explicitly so a malformed jid is reported like other bad data
                raise exceptions.DataError()
            # NOTE: parsed candidates are not registered in XEP-0065 plugin
            #     (the self._s5b.registerCandidate call was left disabled)
            candidates.append(self._s5b.Candidate(host, port, type_, priority, jid_, cid))
        return candidates

    def _buildCandidates(self, session, candidates, sid, session_hash, client, mode=None):
        """Build <transport> element with candidates

        Candidates without id get a freshly generated one (the candidate
        object itself is modified).

        @param session(dict): jingle session data (not used here)
        @param candidates(iterable[plugin_xep_0065.Candidate]): candidates to add
        @param sid(unicode): transport stream id
        @param session_hash(unicode): hash of the session, used as "dstaddr"
            attribute when at least one proxy candidate is present
        @param client: client of the profile (not used here)
        @param mode(str, None): 'tcp' or 'udp', or None to have no attribute
            (only tcp is managed for now, so 'tcp' is always written)
        @return (domish.Element): <transport> element with its <candidate> children
        """
        # candidates are walked twice (proxy detection, then serialisation),
        # a one-shot iterator would be exhausted by the first pass
        candidates = list(candidates)
        has_proxy = any(c.type == self._s5b.TYPE_PROXY for c in candidates)

        transport_elt = domish.Element((NS_JINGLE_S5B, "transport"))
        transport_elt['sid'] = sid
        if has_proxy:
            transport_elt['dstaddr'] = session_hash
        if mode is not None:
            transport_elt['mode'] = 'tcp'  # XXX: we only manage tcp for now

        for candidate in candidates:
            log.debug(u"Adding candidate: {}".format(candidate))
            candidate_elt = transport_elt.addElement('candidate', NS_JINGLE_S5B)
            if candidate.id is None:
                candidate.id = unicode(uuid.uuid4())
            candidate_elt['cid'] = candidate.id
            candidate_elt['host'] = candidate.host
            candidate_elt['jid'] = candidate.jid.full()
            candidate_elt['port'] = unicode(candidate.port)
            candidate_elt['priority'] = unicode(candidate.priority)
            candidate_elt['type'] = candidate.type
        return transport_elt

    @defer.inlineCallbacks
    def jingleSessionInit(self, session, content_name, profile):
        """Prepare transport data and build the initial <transport> element

        "sid", "session_hash" and "candidates" are stored in the transport data
        of the content.

        @param session(dict): jingle session data
        @param content_name(unicode): name of the content using this transport
        @param profile: profile initiating the session
        @return (D(domish.Element)): <transport> element with our candidates
        """
        client = self.host.getClient(profile)
        content_data = session['contents'][content_name]
        transport_data = content_data['transport_data']
        sid = transport_data['sid'] = unicode(uuid.uuid4())
        session_hash = transport_data['session_hash'] = self._s5b.getSessionHash(client.jid, session['to_jid'], sid)
        candidates = transport_data['candidates'] = yield self._s5b.getCandidates(profile)
        mode = 'tcp'  # XXX: we only manage tcp for now
        transport_elt = self._buildCandidates(session, candidates, sid, session_hash, client, mode)

        defer.returnValue(transport_elt)

    def _foundPeerCandidate(self, candidate, session, transport_data, content_name, client):
        """Called when the best candidate from other peer is found

        The result is stored as "best_candidate", other peer candidates are
        discarded, and the choice is sent to the other peer in a transport-info
        action (<candidate-used> or <candidate-error>).

        @param candidate(XEP_0065.Candidate, None): selected candidate,
            or None if no candidate is accessible
        @param session(dict): session data
        @param transport_data(dict): transport data
        @param content_name(unicode): name of the current content
        @param client: client of the profile (its "profile" attribute is used)
        """
        transport_data['best_candidate'] = candidate
        # we need to disconnect all non selected candidates before removing them
        for c in transport_data['peer_candidates']:
            if c is None or c is candidate:
                continue
            c.discard()
        del transport_data['peer_candidates']
        iq_elt, transport_elt = self._j.buildAction(self._j.A_TRANSPORT_INFO, session, content_name, client.profile)
        if candidate is None:
            log.warning(u"Can't connect to any peer candidate")
            candidate_elt = transport_elt.addElement('candidate-error')
        else:
            log.info(u"Found best peer candidate: {}".format(unicode(candidate)))
            candidate_elt = transport_elt.addElement('candidate-used')
            candidate_elt['cid'] = candidate.id
        iq_elt.send()  # TODO: check result stanza
        content_data = session['contents'][content_name]
        self._checkCandidates(session, content_data, transport_data, client)

    def _checkCandidates(self, session, content_data, transport_data, client):
        """Called when a candidate has been chosen

        if we have both candidates, we select one, or fallback to an other transport
        (the fallback itself is not implemented here, only a warning is logged)

        @param session(dict): session data
        @param content_data(dict): content data
        @param transport_data(dict): transport data
        @param client: client of the profile (its "jid" attribute is used)
        """
        try:
            best_candidate = transport_data['best_candidate']
        except KeyError:
            # we have not our best candidate yet
            return
        try:
            peer_best_candidate = transport_data['peer_best_candidate']
        except KeyError:
            # we have not peer best candidate yet
            return

        # at this point we have both candidates, it's time to choose one
        if best_candidate is None or peer_best_candidate is None:
            # at most one side succeeded, we take it (None if both failed)
            chosen_candidate = best_candidate or peer_best_candidate
        elif best_candidate.priority == peer_best_candidate.priority:
            # same priority, we choose initiator one according to XEP-0260 §2.4 #4
            log.debug(u"Candidates have same priority, we choose the initiator one")
            if session['initiator'] == client.jid:
                chosen_candidate = best_candidate
            else:
                chosen_candidate = peer_best_candidate
        else:
            chosen_candidate = max(best_candidate, peer_best_candidate, key=lambda c: c.priority)

        if chosen_candidate is None:
            log.warning(u"Socks5 negociation failed, we need to fallback to IBB")
            return

        if chosen_candidate == best_candidate:
            who = u'our'
        else:
            who = u'other peer'
            # the candidate we had selected is not used, we drop it; it is None
            # when we could not reach any candidate while the other peer could
            if best_candidate is not None:
                best_candidate.discard()

        log.info(u"Socks5 negociation successful, {who} candidate will be used: {candidate}".format(
            who = who,
            candidate = chosen_candidate))
        del transport_data['best_candidate']
        del transport_data['peer_best_candidate']
        if content_data['senders'] == session['role']:
            # we can now start the file transfer
            chosen_candidate.startTransfer(transport_data['session_hash'])

    def _connectToPeerCandidates(self, session, content_name, content_data, transport_data, client, profile):
        """Register the stream for the session hash and look for the best peer candidate

        "session_hash" and "peer_candidates" must already be set in transport_data,
        and "file_obj" in content_data.
        The content is terminated when the stream ends (see _streamCb/_streamEb),
        and _foundPeerCandidate is called with the result of the candidates selection.

        @param session(dict): session data
        @param content_name(unicode): name of the current content
        @param content_data(dict): content data
        @param transport_data(dict): transport data
        @param client: client of the profile
        @param profile: profile handling the session
        """
        session_hash = transport_data['session_hash']
        peer_candidates = transport_data['peer_candidates']
        file_obj = content_data['file_obj']
        stream_d = self._s5b.registerHash(session_hash, file_obj, profile)
        args = [session, content_name, profile]
        stream_d.addCallbacks(self._streamCb, self._streamEb, args, None, args)
        d = self._s5b.getBestCandidate(peer_candidates, session_hash, profile)
        d.addCallback(self._foundPeerCandidate, session, transport_data, content_name, client)

    @defer.inlineCallbacks
    def jingleHandler(self, action, session, content_name, transport_elt, profile):
        """Handle a jingle action for this transport

        @param action: jingle action, compared to the A_* constants of XEP-0166 plugin
        @param session(dict): session data
        @param content_name(unicode): name of the current content
        @param transport_elt(domish.Element): <transport> element of the action
        @param profile: profile handling the session
        @return (D(domish.Element)): the given <transport> element, except on
            session-initiate where a new one holding our candidates is returned
        @raise exceptions.DataError: invalid data received from other peer
        @raise exceptions.InternalError: a transport-info with <candidate-used> is
            received while our candidates are not known
        """
        client = self.host.getClient(profile)
        content_data = session['contents'][content_name]
        transport_data = content_data['transport_data']

        if action in (self._j.A_ACCEPTED_ACK,):
            pass

        elif action == self._j.A_SESSION_ACCEPT:
            # initiator side, we select a candidate in the ones sent by responder
            assert 'peer_candidates' not in transport_data
            transport_data['peer_candidates'] = self._parseCandidates(transport_elt)

        elif action == self._j.A_START:
            self._connectToPeerCandidates(session, content_name, content_data, transport_data, client, profile)

        elif action == self._j.A_SESSION_INITIATE:
            # responder side, we select a candidate in the ones sent by initiator
            # and we give our candidates
            assert 'peer_candidates' not in transport_data
            sid = transport_data['sid'] = transport_elt['sid']
            session_hash = transport_data['session_hash'] = self._s5b.getSessionHash(session['to_jid'], client.jid, sid)
            peer_candidates = transport_data['peer_candidates'] = self._parseCandidates(transport_elt)
            # peer candidates selection is started before waiting for our own candidates
            self._connectToPeerCandidates(session, content_name, content_data, transport_data, client, profile)
            candidates = yield self._s5b.getCandidates(profile)
            # we remove duplicate candidates
            candidates = [candidate for candidate in candidates if candidate not in peer_candidates]

            transport_data['candidates'] = candidates
            # we can now build a new <transport> element with our candidates
            transport_elt = self._buildCandidates(session, candidates, sid, session_hash, client)

        elif action == self._j.A_TRANSPORT_INFO:
            # other peer gave us its chosen candidate
            candidate_elt = next(transport_elt.elements(NS_JINGLE_S5B, 'candidate-used'), None)
            if candidate_elt is None:
                if next(transport_elt.elements(NS_JINGLE_S5B, 'candidate-error'), None) is None:
                    log.warning(u"Unexpected transport element: {}".format(transport_elt.toXml()))
                    raise exceptions.DataError
                # candidate-error, no candidate worked
                transport_data['peer_best_candidate'] = None
            else:
                # candidate-used, one candidate was chosen
                try:
                    cid = candidate_elt.attributes['cid']
                except KeyError:
                    log.warning(u"No cid found in <candidate-used>")
                    raise exceptions.DataError
                try:
                    own_candidates = transport_data['candidates']
                except KeyError:
                    # a transport-info can also be intentionally sent too early by other peer
                    # but there is little probability
                    log.error(u'"candidates" key doesn\'t exists in transport_data, it should at this point')
                    raise exceptions.InternalError
                candidate = next((c for c in own_candidates if c.id == cid), None)
                if candidate is None:
                    log.warning(u"Given cid doesn't correspond to any known candidate !")
                    raise exceptions.DataError  # TODO: send an error to other peer, and use better exception
                # at this point we have the candidate chosen by other peer
                transport_data['peer_best_candidate'] = candidate
                log.info(u"Other peer best candidate: {}".format(candidate))

            del transport_data['candidates']
            self._checkCandidates(session, content_data, transport_data, client)

        else:
            log.warning(u"FIXME: unmanaged action {}".format(action))

        defer.returnValue(transport_elt)

    def _streamCb(self, dummy, session, content_name, profile):
        """Terminate the content once the stream is finished

        @param dummy: result of the stream deferred (ignored)
        @param session(dict): session data
        @param content_name(unicode): name of the current content
        @param profile: profile handling the session
        """
        self._j.contentTerminate(session, content_name, profile=profile)

    def _streamEb(self, failure, session, content_name, profile):
        """Terminate the content with a "failed transport" reason on stream error

        @param failure: failure of the stream deferred
        @param session(dict): session data
        @param content_name(unicode): name of the current content
        @param profile: profile handling the session
        """
        log.warning(u"Error while streaming through s5b: {}".format(failure))
        self._j.contentTerminate(session, content_name, reason=self._j.REASON_FAILED_TRANSPORT, profile=profile)
306
307
class XEP_0260_handler(XMPPHandler):
    """Service discovery handler advertising the NS_JINGLE_S5B feature"""
    implements(iwokkel.IDisco)

    def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
        """Return the disco features of this plugin

        The arguments are not used: the same single feature is always returned.
        @return (list[disco.DiscoFeature]): feature for NS_JINGLE_S5B
        """
        s5b_feature = disco.DiscoFeature(NS_JINGLE_S5B)
        return [s5b_feature]

    def getDiscoItems(self, requestor, target, nodeIdentifier=''):
        """Return the disco items of this plugin: there is none

        @return (list): always a new empty list
        """
        return list()