Mercurial > libervia-backend
comparison src/plugins/plugin_xep_0260.py @ 1560:dcce63810733
plugin XEP-0260: first draft
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 02 Nov 2015 22:02:41 +0100 |
parents | |
children | 268fda4236ca |
comparison
equal
deleted
inserted
replaced
1559:7cc29634b6ef | 1560:dcce63810733 |
---|---|
1 #!/usr/bin/python | |
2 # -*- coding: utf-8 -*- | |
3 | |
4 # SAT plugin for Jingle (XEP-0260) | |
5 # Copyright (C) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Jérôme Poisson (goffi@goffi.org) | |
6 | |
7 # This program is free software: you can redistribute it and/or modify | |
8 # it under the terms of the GNU Affero General Public License as published by | |
9 # the Free Software Foundation, either version 3 of the License, or | |
10 # (at your option) any later version. | |
11 | |
12 # This program is distributed in the hope that it will be useful, | |
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
15 # GNU Affero General Public License for more details. | |
16 | |
17 # You should have received a copy of the GNU Affero General Public License | |
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
19 | |
20 from sat.core.i18n import _ | |
21 from sat.core.log import getLogger | |
22 log = getLogger(__name__) | |
23 from sat.core import exceptions | |
24 from wokkel import disco, iwokkel | |
25 from zope.interface import implements | |
26 from twisted.words.xish import domish | |
27 from twisted.words.protocols.jabber import jid | |
28 from twisted.internet import defer | |
29 import uuid | |
30 | |
31 try: | |
32 from twisted.words.protocols.xmlstream import XMPPHandler | |
33 except ImportError: | |
34 from wokkel.subprotocols import XMPPHandler | |
35 | |
36 | |
# Namespace of the Jingle SOCKS5 Bytestreams transport: it qualifies the
# <transport> and <candidate> elements built/parsed in this module and is
# the feature advertised through service discovery
NS_JINGLE_S5B = "urn:xmpp:jingle:transports:s5b:1"

# Metadata describing this plugin ("main" names the plugin class below)
PLUGIN_INFO = {
    "name": "Jingle SOCKS5 Bytestreams",
    "import_name": "XEP-0260",
    "type": "XEP",
    "protocols": ["XEP-0260"],
    "dependencies": ["XEP-0166", "XEP-0065"],
    "main": "XEP_0260",
    "handler": "yes",
    "description": _("Implementation of Jingle SOCKS5 Bytestreams"),
}
49 | |
50 | |
class XEP_0260(object):
    """Jingle SOCKS5 Bytestreams transport (XEP-0260)

    The instance registers itself as a streaming transport on the Jingle
    plugin (XEP-0166) and delegates candidates handling and streaming to the
    SOCKS5 Bytestreams plugin (XEP-0065).
    """
    # TODO: udp handling

    def __init__(self, host):
        """
        @param host: SàT host, its "plugins" mapping must contain
            "XEP-0166" and "XEP-0065"
        """
        log.info(_("plugin Jingle SOCKS5 Bytestreams"))
        self.host = host
        self._j = host.plugins["XEP-0166"]  # shortcut to access jingle
        self._s5b = host.plugins["XEP-0065"]  # and socks5 bytestream
        self._j.registerTransport(NS_JINGLE_S5B, self._j.TRANSPORT_STREAMING, self, 100)

    def getHandler(self, profile):
        """Return a new handler advertising the transport namespace"""
        return XEP_0260_handler()

    def _parseCandidates(self, transport_elt):
        """Parse <candidate> elements

        @param transport_elt(domish.Element): parent <transport> element
        @return (list[plugin_xep_0065.Candidate]): list of parsed candidates
        @raise exceptions.DataError: a mandatory attribute is missing,
            "port" or "priority" is not an integer, or "jid" is not a valid jid
        """
        candidates = []
        for candidate_elt in transport_elt.elements(NS_JINGLE_S5B, 'candidate'):
            try:
                cid = candidate_elt['cid']
                host = candidate_elt['host']
                jid_ = jid.JID(candidate_elt['jid'])
                port = int(candidate_elt.getAttribute('port', 1080))
                priority = int(candidate_elt['priority'])
                type_ = candidate_elt.getAttribute('type', self._s5b.TYPE_DIRECT)
            except (KeyError, ValueError, jid.InvalidFormat):
                # data come from the other peer: any malformed value is
                # reported the same way
                raise exceptions.DataError()
            candidates.append(self._s5b.Candidate(host, port, type_, priority, jid_, cid))
        return candidates

    def _buildCandidates(self, session, candidates, sid, session_hash, client, mode=None):
        """Build <transport> element with candidates

        Candidates without id get a freshly generated one (the candidate
        object is modified in place).
        @param session(dict): jingle session data
        @param candidates(iterable[plugin_xep_0065.Candidate]): candidates to add
        @param sid(unicode): transport stream id
        @param session_hash: hash of the session (as returned by XEP-0065
            getSessionHash), set as "dstaddr" if a proxy candidate is present
        @param client: %(doc_client)s
        @param mode(str, None): 'tcp' or 'udp', or None to have no attribute
        @return (domish.Element): <transport> element, with one <candidate>
            child per candidate
        """
        # candidates are walked twice (proxy lookup then serialisation), a
        # one-shot iterator would be exhausted by the first pass
        candidates = list(candidates)
        has_proxy = any(candidate.type == self._s5b.TYPE_PROXY for candidate in candidates)

        transport_elt = domish.Element((NS_JINGLE_S5B, "transport"))
        transport_elt['sid'] = sid
        if has_proxy:
            transport_elt['dstaddr'] = session_hash
        if mode is not None:
            transport_elt['mode'] = 'tcp'  # XXX: we only manage tcp for now

        for candidate in candidates:
            log.debug(u"Adding candidate: {}".format(candidate))
            candidate_elt = transport_elt.addElement('candidate', NS_JINGLE_S5B)
            if candidate.id is None:
                candidate.id = unicode(uuid.uuid4())
            candidate_elt['cid'] = candidate.id
            candidate_elt['host'] = candidate.host
            candidate_elt['jid'] = candidate.jid.full()
            candidate_elt['port'] = unicode(candidate.port)
            candidate_elt['priority'] = unicode(candidate.priority)
            candidate_elt['type'] = candidate.type
        return transport_elt

    @defer.inlineCallbacks
    def jingleSessionInit(self, session, content_name, profile):
        """Prepare the transport data and build the initial <transport> element

        "sid", "session_hash" and "candidates" are stored in the transport
        data of the content.
        @param session(dict): jingle session data
        @param content_name(unicode): name of the content using this transport
        @param profile: %(doc_profile)s
        @return (D(domish.Element)): <transport> element with our candidates
        """
        client = self.host.getClient(profile)
        content_data = session['contents'][content_name]
        transport_data = content_data['transport_data']
        sid = transport_data['sid'] = unicode(uuid.uuid4())
        session_hash = transport_data['session_hash'] = self._s5b.getSessionHash(client.jid, session['to_jid'], sid)
        candidates = transport_data['candidates'] = yield self._s5b.getCandidates(profile)
        mode = 'tcp'  # XXX: we only manage tcp for now
        transport_elt = self._buildCandidates(session, candidates, sid, session_hash, client, mode)

        defer.returnValue(transport_elt)

    def _foundPeerCandidate(self, candidate, session, transport_data, content_name, client):
        """Called when the best candidate from other peer is found

        The result is sent to the other peer with a transport-info action
        (<candidate-used> or <candidate-error>).
        @param candidate(XEP_0065.Candidate, None): selected candidate,
            or None if no candidate is accessible
        @param session(dict): session data
        @param transport_data(dict): transport data
        @param content_name(unicode): name of the current content
        @param client: %(doc_client)s
        """
        transport_data['best_candidate'] = candidate
        # we need to disconnect all non selected candidates before removing them
        for c in transport_data['peer_candidates']:
            if c is None or c is candidate:
                continue
            c.discard()
        del transport_data['peer_candidates']
        iq_elt, transport_elt = self._j.buildAction(self._j.A_TRANSPORT_INFO, session, content_name, client.profile)
        if candidate is None:
            log.warning(u"Can't connect to any peer candidate")
            candidate_elt = transport_elt.addElement('candidate-error')
        else:
            log.info(u"Found best peer candidate: {}".format(unicode(candidate)))
            candidate_elt = transport_elt.addElement('candidate-used')
            candidate_elt['cid'] = candidate.id
        iq_elt.send()  # TODO: check result stanza
        content_data = session['contents'][content_name]
        self._checkCandidates(session, content_data, transport_data, client)

    def _checkCandidates(self, session, content_data, transport_data, client):
        """Called when a candidate has been choosed

        if we have both candidates, we select one, or fallback to an other transport
        Nothing is done until both "best_candidate" and "peer_best_candidate"
        are set in transport_data.
        @param session(dict): session data
        @param content_data(dict): content data
        @param transport_data(dict): transport data
        @param client: %(doc_client)s
        """
        try:
            best_candidate = transport_data['best_candidate']
            peer_best_candidate = transport_data['peer_best_candidate']
        except KeyError:
            # one of the two selections is not finished yet
            return

        # at this point we have both candidates, it's time to choose one
        if best_candidate is None:
            choosed_candidate = peer_best_candidate
        elif peer_best_candidate is None:
            choosed_candidate = best_candidate
        elif best_candidate.priority == peer_best_candidate.priority:
            # same priority, we choose initiator one according to XEP-0260 §2.4 #4
            log.debug(u"Candidates have same priority, we choose the initiator one")
            if session['initiator'] == client.jid:
                choosed_candidate = best_candidate
            else:
                choosed_candidate = peer_best_candidate
        else:
            choosed_candidate = max(best_candidate, peer_best_candidate, key=lambda c: c.priority)

        if choosed_candidate is None:
            log.warning(u"Socks5 negociation failed, we need to fallback to IBB")
            return

        if choosed_candidate == best_candidate:
            who = u'our'
        else:
            who = u'other peer'
            # best_candidate is None when we could not reach any candidate
            # of the other peer: there is nothing to discard in this case
            if best_candidate is not None:
                best_candidate.discard()

        log.info(u"Socks5 negociation successful, {who} candidate will be used: {candidate}".format(
            who = who,
            candidate = choosed_candidate))
        del transport_data['best_candidate']
        del transport_data['peer_best_candidate']
        if content_data['senders'] == session['role']:
            # we can now start the file transfer
            choosed_candidate.startTransfer(transport_data['session_hash'])

    def _startPeerCandidateSelection(self, session, content_name, content_data, transport_data, client, profile):
        """Register the session hash and look for the best peer candidate

        "session_hash" and "peer_candidates" must be set in transport_data,
        and "file_obj" in content_data.
        @param session(dict): session data
        @param content_name(unicode): name of the current content
        @param content_data(dict): content data
        @param transport_data(dict): transport data
        @param client: %(doc_client)s
        @param profile: %(doc_profile)s
        """
        session_hash = transport_data['session_hash']
        peer_candidates = transport_data['peer_candidates']
        file_obj = content_data['file_obj']
        stream_d = self._s5b.registerHash(session_hash, file_obj, profile)
        args = [session, content_name, profile]
        stream_d.addCallbacks(self._streamCb, self._streamEb, args, None, args)
        d = self._s5b.getBestCandidate(peer_candidates, session_hash, profile)
        d.addCallback(self._foundPeerCandidate, session, transport_data, content_name, client)

    def _getPeerBestCandidate(self, transport_elt, transport_data):
        """Retrieve the candidate choosed by other peer from a transport-info

        @param transport_elt(domish.Element): received <transport> element
        @param transport_data(dict): transport data
        @return (XEP_0065.Candidate, None): the candidate (one of ours)
            referenced by <candidate-used>, or None on <candidate-error>
        @raise exceptions.DataError: neither <candidate-used> nor
            <candidate-error> is present, "cid" is missing or unknown
        @raise exceptions.InternalError: our candidates are not available
        """
        used_elt = next(transport_elt.elements(NS_JINGLE_S5B, 'candidate-used'), None)
        if used_elt is None:
            error_elt = next(transport_elt.elements(NS_JINGLE_S5B, 'candidate-error'), None)
            if error_elt is None:
                log.warning(u"Unexpected transport element: {}".format(transport_elt.toXml()))
                raise exceptions.DataError
            # candidate-error, no candidate worked
            return None

        # candidate-used, one candidate was choosed
        try:
            cid = used_elt.attributes['cid']
        except KeyError:
            log.warning(u"No cid found in <candidate-used>")
            raise exceptions.DataError
        try:
            candidates = transport_data['candidates']
        except KeyError:
            # a transport-info can also be intentionaly sent too early by other peer
            # but there is little probability
            log.error(u'"candidates" key doesn\'t exists in transport_data, it should at this point')
            raise exceptions.InternalError
        candidate = next((c for c in candidates if c.id == cid), None)
        if candidate is None:
            log.warning(u"Given cid doesn't correspond to any known candidate !")
            raise exceptions.DataError  # TODO: send an error to other peer, and use better exception
        log.info(u"Other peer best candidate: {}".format(candidate))
        return candidate

    @defer.inlineCallbacks
    def jingleHandler(self, action, session, content_name, transport_elt, profile):
        """Handle a jingle action for this transport

        @param action: one of the action constants of the jingle plugin
        @param session(dict): session data
        @param content_name(unicode): name of the current content
        @param transport_elt(domish.Element): <transport> element of the action
        @param profile: %(doc_profile)s
        @return (D(domish.Element)): the received <transport> element, except
            for session-initiate where a new one with our candidates is returned
        @raise exceptions.DataError: invalid data received from other peer
        """
        client = self.host.getClient(profile)
        content_data = session['contents'][content_name]
        transport_data = content_data['transport_data']

        if action in (self._j.A_ACCEPTED_ACK,):
            pass

        elif action == self._j.A_SESSION_ACCEPT:
            # initiator side, we select a candidate in the ones sent by responder
            assert 'peer_candidates' not in transport_data
            transport_data['peer_candidates'] = self._parseCandidates(transport_elt)

        elif action == self._j.A_START:
            self._startPeerCandidateSelection(session, content_name, content_data, transport_data, client, profile)

        elif action == self._j.A_SESSION_INITIATE:
            # responder side, we select a candidate in the ones sent by initiator
            # and we give our candidates
            assert 'peer_candidates' not in transport_data
            sid = transport_data['sid'] = transport_elt['sid']
            session_hash = transport_data['session_hash'] = self._s5b.getSessionHash(session['to_jid'], client.jid, sid)
            peer_candidates = transport_data['peer_candidates'] = self._parseCandidates(transport_elt)
            self._startPeerCandidateSelection(session, content_name, content_data, transport_data, client, profile)
            candidates = yield self._s5b.getCandidates(profile)
            # we remove duplicate candidates
            candidates = [candidate for candidate in candidates if candidate not in peer_candidates]

            transport_data['candidates'] = candidates
            # we can now build a new <transport> element with our candidates
            transport_elt = self._buildCandidates(session, candidates, sid, session_hash, client)

        elif action == self._j.A_TRANSPORT_INFO:
            # other peer gave us its choosed candidate
            transport_data['peer_best_candidate'] = self._getPeerBestCandidate(transport_elt, transport_data)
            del transport_data['candidates']
            self._checkCandidates(session, content_data, transport_data, client)

        else:
            log.warning(u"FIXME: unmanaged action {}".format(action))

        defer.returnValue(transport_elt)

    def _streamCb(self, dummy, session, content_name, profile):
        """Terminate the content once the stream deferred has fired"""
        self._j.contentTerminate(session, content_name, profile=profile)

    def _streamEb(self, failure, session, content_name, profile):
        """Terminate the content with a failed-transport reason on stream error"""
        log.warning(u"Error while streaming through s5b: {}".format(failure))
        self._j.contentTerminate(session, content_name, reason=self._j.REASON_FAILED_TRANSPORT, profile=profile)
306 | |
307 | |
class XEP_0260_handler(XMPPHandler):
    """Service discovery handler advertising the Jingle S5B namespace"""
    implements(iwokkel.IDisco)

    def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
        """Return the disco feature of the Jingle S5B transport

        The arguments are not used: the same feature is always returned.
        """
        s5b_feature = disco.DiscoFeature(NS_JINGLE_S5B)
        return [s5b_feature]

    def getDiscoItems(self, requestor, target, nodeIdentifier=''):
        """Return an empty list, no disco item is exposed"""
        return list()