Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_xep_0260.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/plugins/plugin_xep_0260.py@3900626bc100 |
children | b86912d3fd33 |
comparison
equal
deleted
inserted
replaced
4070:d10748475025 | 4071:4b842c1fb686 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 | |
4 # SAT plugin for Jingle (XEP-0260) | |
5 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) | |
6 | |
7 # This program is free software: you can redistribute it and/or modify | |
8 # it under the terms of the GNU Affero General Public License as published by | |
9 # the Free Software Foundation, either version 3 of the License, or | |
10 # (at your option) any later version. | |
11 | |
12 # This program is distributed in the hope that it will be useful, | |
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
15 # GNU Affero General Public License for more details. | |
16 | |
17 # You should have received a copy of the GNU Affero General Public License | |
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
19 | |
20 from libervia.backend.core.i18n import _ | |
21 from libervia.backend.core.constants import Const as C | |
22 from libervia.backend.core.log import getLogger | |
23 | |
24 log = getLogger(__name__) | |
25 from libervia.backend.core import exceptions | |
26 from wokkel import disco, iwokkel | |
27 from zope.interface import implementer | |
28 from twisted.words.xish import domish | |
29 from twisted.words.protocols.jabber import jid | |
30 from twisted.internet import defer | |
31 import uuid | |
32 | |
33 try: | |
34 from twisted.words.protocols.xmlstream import XMPPHandler | |
35 except ImportError: | |
36 from wokkel.subprotocols import XMPPHandler | |
37 | |
38 | |
# XML namespace of the Jingle SOCKS5 Bytestreams transport: it is used below
# for the <transport> and <candidate> elements and as advertised disco feature
NS_JINGLE_S5B = "urn:xmpp:jingle:transports:s5b:1"

# Plugin metadata, keyed by the C.PI_* constants
PLUGIN_INFO = {
    C.PI_NAME: "Jingle SOCKS5 Bytestreams",
    C.PI_IMPORT_NAME: "XEP-0260",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0260"],
    # XEP-0166 (jingle) and XEP-0065 (socks5 bytestreams) are both looked up
    # unconditionally in XEP_0260.__init__
    C.PI_DEPENDENCIES: ["XEP-0166", "XEP-0065"],
    C.PI_RECOMMENDATIONS: ["XEP-0261"],  # needed for fallback
    C.PI_MAIN: "XEP_0260",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Implementation of Jingle SOCKS5 Bytestreams"""),
}
53 | |
54 | |
class ProxyError(Exception):
    """Error used to signal that the proxy could not be used

    The text representation is a fixed message, whatever arguments the
    exception was built with.
    """

    _MESSAGE = "an error happened while trying to use the proxy"

    def __str__(self):
        return self._MESSAGE
58 | |
59 | |
60 class XEP_0260(object): | |
61 # TODO: udp handling | |
62 | |
def __init__(self, host):
    """Keep references to the needed plugins and register this transport

    @param host: object whose ``plugins`` mapping holds the loaded plugins;
        "XEP-0166" and "XEP-0065" must be present, "XEP-0261" is optional
    """
    log.info(_("plugin Jingle SOCKS5 Bytestreams"))
    self.host = host
    plugins = host.plugins
    self._j = plugins["XEP-0166"]  # shortcut to access jingle
    self._s5b = plugins["XEP-0065"]  # and socks5 bytestream
    try:
        jingle_ibb = plugins["XEP-0261"]
    except KeyError:
        # without the IBB plugin, do_fallback can only terminate the session
        jingle_ibb = None
    self._jingle_ibb = jingle_ibb
    # NOTE(review): the last argument (100) is presumably a priority — confirm
    #   against XEP-0166 plugin's register_transport
    self._j.register_transport(NS_JINGLE_S5B, self._j.TRANSPORT_STREAMING, self, 100)
73 | |
def get_handler(self, client):
    """Create the handler advertising this transport

    @param client: not used by this method
    @return (XEP_0260_handler): a new handler instance
    """
    handler = XEP_0260_handler()
    return handler
76 | |
def _parse_candidates(self, transport_elt):
    """Extract the candidates advertised in a <transport> element

    Only <candidate> children in the Jingle S5B namespace are read.
    "port" defaults to 1080 and "type" to the XEP-0065 plugin's TYPE_DIRECT
    when the attribute is not set.

    @param transport_elt(domish.Element): parent <transport> element
    @return (list[plugin_xep_0065.Candidate]): list of parsed candidates
    @raise exceptions.DataError: a KeyError or ValueError happened while
        reading the attributes of a candidate
    """
    parsed = []
    for elt in transport_elt.elements(NS_JINGLE_S5B, "candidate"):
        try:
            candidate_id = elt["cid"]
            address = elt["host"]
            owner_jid = jid.JID(elt["jid"])
            port = int(elt.getAttribute("port", 1080))
            priority = int(elt["priority"])
            candidate_type = elt.getAttribute("type", self._s5b.TYPE_DIRECT)
        except (KeyError, ValueError):
            raise exceptions.DataError()
        parsed.append(
            self._s5b.Candidate(
                address, port, candidate_type, priority, owner_jid, candidate_id
            )
        )
    return parsed
98 | |
def _build_candidates(self, session, candidates, sid, session_hash, client, mode=None):
    """Build <transport> element with candidates

    A candidate without id gets a random UUID assigned (the candidate object
    itself is updated) before being serialised.

    @param session(dict): jingle session data (not used here)
    @param candidates(iterable[plugin_xep_0065.Candidate]): candidates to add,
        a one-shot iterator is accepted
    @param sid(unicode): transport stream id
    @param session_hash(unicode): value used for the "dstaddr" attribute, which
        is only set when at least one candidate is a proxy
    @param client: %(doc_client)s (not used here)
    @param mode(str, None): None to have no "mode" attribute; any other value
        results in "tcp" as it is the only mode managed for now
    @return (domish.Element): <transport> element holding one <candidate>
        child per candidate
    """
    # candidates are walked twice (proxy detection, then serialisation): a
    # one-shot iterator must be materialised or the first pass would eat
    # elements needed by the second one
    candidates = list(candidates)
    has_proxy = any(
        candidate.type == self._s5b.TYPE_PROXY for candidate in candidates
    )

    transport_elt = domish.Element((NS_JINGLE_S5B, "transport"))
    transport_elt["sid"] = sid
    if has_proxy:
        transport_elt["dstaddr"] = session_hash
    if mode is not None:
        transport_elt["mode"] = "tcp"  # XXX: we only manage tcp for now

    for candidate in candidates:
        log.debug("Adding candidate: {}".format(candidate))
        candidate_elt = transport_elt.addElement("candidate", NS_JINGLE_S5B)
        if candidate.id is None:
            candidate.id = str(uuid.uuid4())
        candidate_elt["cid"] = candidate.id
        candidate_elt["host"] = candidate.host
        candidate_elt["jid"] = candidate.jid.full()
        candidate_elt["port"] = str(candidate.port)
        candidate_elt["priority"] = str(candidate.priority)
        candidate_elt["type"] = candidate.type
    return transport_elt
136 | |
@defer.inlineCallbacks
def jingle_session_init(self, client, session, content_name):
    """Prepare transport data and build the <transport> element to send

    Fills the content's "transport_data" with "sid", "session_hash",
    "peer_session_hash", "stream_d" and "candidates".

    @param client: %(doc_client)s
    @param session(dict): jingle session data
    @param content_name(unicode): name of the content using this transport
    @return (Deferred): fires with the <transport> element (domish.Element)
        holding our candidates
    """
    transport_data = session["contents"][content_name]["transport_data"]

    sid = str(uuid.uuid4())
    transport_data["sid"] = sid

    local_jid, peer_jid = session["local_jid"], session["peer_jid"]
    session_hash = self._s5b.get_session_hash(local_jid, peer_jid, sid)
    transport_data["session_hash"] = session_hash
    # requester and target are inversed for peer candidates
    transport_data["peer_session_hash"] = self._s5b.get_session_hash(
        peer_jid, local_jid, sid
    )
    transport_data["stream_d"] = self._s5b.register_hash(client, session_hash, None)

    candidates = yield self._s5b.get_candidates(client, session["local_jid"])
    transport_data["candidates"] = candidates

    # XXX: we only manage tcp for now
    transport_elt = self._build_candidates(
        session, candidates, sid, session_hash, client, "tcp"
    )
    defer.returnValue(transport_elt)
157 | |
def _proxy_activated_cb(self, iq_result_elt, client, candidate, session, content_name):
    """Tell the other peer that the proxy has been activated

    Called when activation confirmation has been received from proxy,
    cf XEP-0260 § 2.4

    @param iq_result_elt: result of the activation request (not used)
    @param client: %(doc_client)s
    @param candidate: the activated proxy candidate, its id is sent as "cid"
    @param session(dict): jingle session data
    @param content_name(unicode): name of the current content
    """
    content_data = session["contents"][content_name]
    iq_elt, transport_elt = self._j.build_action(
        client, self._j.A_TRANSPORT_INFO, session, content_name
    )
    transport_elt["sid"] = content_data["transport_data"]["sid"]
    # <activated cid="..."/> is the notification expected by the other peer
    transport_elt.addElement("activated")["cid"] = candidate.id
    iq_elt.send()
172 | |
def _proxy_activated_eb(self, stanza_error, client, candidate, session, content_name):
    """Called when activation error has been received from proxy

    cf XEP-0260 § 2.4

    A <proxy-error/> transport-info is sent to the other peer, then the
    fallback is attempted.

    @param stanza_error(Failure): failure of the activation chain. Its value
        is expected to have a ``condition`` attribute, but any other error is
        handled too
    @param client: %(doc_client)s
    @param candidate: the proxy candidate which could not be activated (not used)
    @param session(dict): jingle session data
    @param content_name(unicode): name of the current content
    """
    # TODO: fallback to IBB
    # the proxy could not be activated, we have to inform other peer
    content_data = session["contents"][content_name]
    iq_elt, transport_elt = self._j.build_action(
        client, self._j.A_TRANSPORT_INFO, session, content_name
    )
    transport_elt["sid"] = content_data["transport_data"]["sid"]
    transport_elt.addElement("proxy-error")
    iq_elt.send()

    # this errback is also reached when the failure is not a stanza error:
    # such a value has no "condition", and an AttributeError raised here would
    # prevent the fallback below from happening
    error = stanza_error.value
    reason = getattr(error, "condition", error)
    log.warning(
        "Can't activate proxy, we need to fallback to IBB: {reason}".format(
            reason=reason
        )
    )
    self.do_fallback(session, content_name, client)
193 | |
def _found_peer_candidate(
    self, candidate, session, transport_data, content_name, client
):
    """Handle the result of the selection among other peer's candidates

    The selection is stored as "best_candidate", all other peer candidates
    are discarded, and the other peer is notified with a transport-info
    (<candidate-used/> or <candidate-error/>).

    @param candidate(XEP_0065.Candidate, None): selected candidate,
        or None if no candidate is accessible
    @param session(dict): session data
    @param transport_data(dict): transport data
    @param content_name(unicode): name of the current content
    @param client: %(doc_client)s
    """
    content_data = session["contents"][content_name]
    transport_data["best_candidate"] = candidate

    # non selected candidates must be disconnected before we forget them
    for peer_candidate in transport_data["peer_candidates"]:
        if peer_candidate is not None and peer_candidate is not candidate:
            peer_candidate.discard()
    del transport_data["peer_candidates"]

    iq_elt, transport_elt = self._j.build_action(
        client, self._j.A_TRANSPORT_INFO, session, content_name
    )
    transport_elt["sid"] = content_data["transport_data"]["sid"]
    if candidate is not None:
        log.info("Found best peer candidate: {}".format(str(candidate)))
        used_elt = transport_elt.addElement("candidate-used")
        used_elt["cid"] = candidate.id
    else:
        log.warning("Can't connect to any peer candidate")
        transport_elt.addElement("candidate-error")
    iq_elt.send()  # TODO: check result stanza
    self._check_candidates(session, content_name, transport_data, client)
228 | |
def _check_candidates(self, session, content_name, transport_data, client):
    """Called when a candidate has been chosen

    Nothing is done until both "best_candidate" (our selection among peer's
    candidates) and "peer_best_candidate" (peer's selection among ours) are
    set in transport_data. Once both are known, one is selected, or we
    fallback to another transport if both are None.

    @param session(dict): session data
    @param content_name(unicode): name of the current content
    @param transport_data(dict): transport data
    @param client: %(doc_client)s
    """
    content_data = session["contents"][content_name]
    try:
        best_candidate = transport_data["best_candidate"]
    except KeyError:
        # we don't have our best candidate yet
        return
    try:
        peer_best_candidate = transport_data["peer_best_candidate"]
    except KeyError:
        # we don't have peer's best candidate yet
        return

    # at this point we have both candidates, it's time to choose one
    if best_candidate is None or peer_best_candidate is None:
        # at most one side found something: use it (result is None if none did)
        choosed_candidate = best_candidate or peer_best_candidate
    else:
        if best_candidate.priority == peer_best_candidate.priority:
            # same priority, we choose initiator one according to XEP-0260 §2.4 #4
            log.debug(
                "Candidates have same priority, we select the one choosed by initiator"
            )
            if session["initiator"] == session["local_jid"]:
                choosed_candidate = best_candidate
            else:
                choosed_candidate = peer_best_candidate
        else:
            # highest priority wins
            choosed_candidate = max(
                best_candidate, peer_best_candidate, key=lambda c: c.priority
            )

    if choosed_candidate is None:
        log.warning("Socks5 negociation failed, we need to fallback to IBB")
        self.do_fallback(session, content_name, client)
    else:
        if choosed_candidate == peer_best_candidate:
            # peer_best_candidate was chosen from the candidates we have sent
            # so our_candidate is true if choosed_candidate is peer_best_candidate
            our_candidate = True
            # that also means that best_candidate must be discarded!
            try:
                best_candidate.discard()
            except AttributeError:  # but it can be None
                pass
        else:
            our_candidate = False

        log.info(
            "Socks5 negociation successful, {who} candidate will be used: {candidate}".format(
                who="our" if our_candidate else "other peer",
                candidate=choosed_candidate,
            )
        )
        # the negotiation is over: selections are removed from transport data
        del transport_data["best_candidate"]
        del transport_data["peer_best_candidate"]

        if choosed_candidate.type == self._s5b.TYPE_PROXY:
            # the stream transfer needs to wait for proxy activation
            # (see XEP-0260 § 2.4)
            if our_candidate:
                # the proxy is one of our candidates: we connect to it and
                # request the activation ourselves
                d = self._s5b.connect_candidate(
                    client, choosed_candidate, transport_data["session_hash"]
                )
                d.addCallback(
                    lambda __: choosed_candidate.activate(
                        transport_data["sid"], session["peer_jid"], client
                    )
                )
                # same extra arguments for callback and errback
                args = [client, choosed_candidate, session, content_name]
                # NOTE(review): _proxy_activated_eb does not return a failure,
                #   so the start_transfer callback added below appears to be
                #   run even when the activation failed — confirm
                d.addCallbacks(
                    self._proxy_activated_cb, self._proxy_activated_eb, args, None, args
                )
            else:
                # this Deferred will be fired by _proxy_activation_info when
                # we receive the activation notification from other peer
                d = transport_data["activation_d"] = defer.Deferred()
        else:
            # no proxy involved, nothing to wait for
            d = defer.succeed(None)

        if content_data["senders"] == session["role"]:
            # we can now start the stream transfer (or start it after proxy activation)
            d.addCallback(
                lambda __: choosed_candidate.start_transfer(
                    transport_data["session_hash"]
                )
            )
            d.addErrback(self._start_eb, session, content_name, client)
323 | |
def _start_eb(self, fail, session, content_name, client):
    """Errback used when the transfer can't be started

    The reason is logged, then the fallback is attempted.

    @param fail: the error; its ``value`` attribute is used for the log
        message when available, else the object itself
    @param session(dict): session data
    @param content_name(unicode): name of the current content
    @param client: %(doc_client)s
    """
    try:
        reason = str(fail.value)
    except AttributeError:
        reason = str(fail)
    message = "Cant start transfert, we'll try fallback method: {}".format(reason)
    log.warning(message)
    self.do_fallback(session, content_name, client)
335 | |
def _candidate_info(
    self, candidate_elt, session, content_name, transport_data, client
):
    """Called when best candidate has been received from peer (or if none is working)

    The result is stored in transport_data["peer_best_candidate"] (None if
    peer could not use any of our candidates), and our "candidates" list is
    removed from transport_data.

    @param candidate_elt(domish.Element): candidate-used or candidate-error element
        (see XEP-0260 §2.3)
    @param session(dict): session data
    @param content_name(unicode): name of the current content
    @param transport_data(dict): transport data
    @param client: %(doc_client)s
    @raise exceptions.DataError: <candidate-used> has no "cid", or the cid
        doesn't match any candidate we have sent
    @raise exceptions.InternalError: <candidate-used> received while
        transport_data has no "candidates"
    """
    if candidate_elt.name == "candidate-error":
        # candidate-error, no candidate worked
        transport_data["peer_best_candidate"] = None
    else:
        # candidate-used, one candidate was chosen
        try:
            cid = candidate_elt.attributes["cid"]
        except KeyError:
            log.warning("No cid found in <candidate-used>")
            raise exceptions.DataError
        try:
            our_candidates = transport_data["candidates"]
        except KeyError:
            # a transport-info can also be intentionally sent too early by other
            # peer, but there is little probability
            log.error(
                '"candidates" key doesn\'t exists in transport_data, it should at this point'
            )
            raise exceptions.InternalError
        candidate = next((c for c in our_candidates if c.id == cid), None)
        if candidate is None:
            log.warning("Given cid doesn't correspond to any known candidate !")
            # TODO: send an error to other peer, and use better exception
            raise exceptions.DataError
        # at this point we have the candidate chosen by other peer
        transport_data["peer_best_candidate"] = candidate
        log.info("Other peer best candidate: {}".format(candidate))

    # the list is not needed anymore. It may be missing on the candidate-error
    # path (which never reads it): this must not end in an unhandled KeyError
    transport_data.pop("candidates", None)
    self._check_candidates(session, content_name, transport_data, client)
378 | |
def _proxy_activation_info(
    self, proxy_elt, session, content_name, transport_data, client
):
    """Called when proxy has been activated (or has sent an error)

    Fires the "activation_d" Deferred stored in transport_data (and removes
    it): callback for <activated/>, errback with ProxyError otherwise.
    If no Deferred is waiting, the notification is logged and ignored.

    @param proxy_elt(domish.Element): <activated/> or <proxy-error/> element
        (see XEP-0260 §2.4)
    @param session(dict): session data (not used here)
    @param content_name(unicode): name of the current content (not used here)
    @param transport_data(dict): transport data
    @param client: %(doc_client)s (not used here)
    """
    try:
        activation_d = transport_data.pop("activation_d")
    except KeyError:
        log.warning("Received unexpected transport-info for proxy activation")
        # nobody is waiting for this notification: going further would use
        # an unbound activation_d
        return

    if proxy_elt.name == "activated":
        activation_d.callback(None)
    else:
        activation_d.errback(ProxyError())
400 | |
@defer.inlineCallbacks
def jingle_handler(self, client, action, session, content_name, transport_elt):
    """Handle a jingle action for this transport

    - accepted ack / prepare responder: nothing to do
    - session accept: peer's candidates are parsed and stored
    - start: the stream object is associated to our hash, and the best peer
      candidate is looked for
    - session initiate: transport data is filled, the best peer candidate is
      looked for, and a new <transport> element with our candidates is built
    - transport info: dispatched to candidate or proxy activation handling
    - destroy: the XEP-0065 session is killed

    @param client: %(doc_client)s
    @param action: one of the A_* constants of the jingle plugin
    @param session(dict): session data
    @param content_name(unicode): name of the current content
    @param transport_elt(domish.Element): received <transport> element
    @return (Deferred): fires with the received transport_elt, except for
        session initiate where it fires with the newly built element
    """
    content_data = session["contents"][content_name]
    transport_data = content_data["transport_data"]
    jingle = self._j
    s5b = self._s5b

    if action in (jingle.A_ACCEPTED_ACK, jingle.A_PREPARE_RESPONDER):
        pass

    elif action == jingle.A_SESSION_ACCEPT:
        # initiator side, we select a candidate in the ones sent by responder
        assert "peer_candidates" not in transport_data
        transport_data["peer_candidates"] = self._parse_candidates(transport_elt)

    elif action == jingle.A_START:
        session_hash = transport_data["session_hash"]
        peer_candidates = transport_data["peer_candidates"]
        s5b.associate_stream_object(
            client, session_hash, content_data["stream_object"]
        )
        # the end of the stream is propagated to the content
        transport_data.pop("stream_d").chainDeferred(content_data["finished_d"])
        best_candidate_d = s5b.get_best_candidate(
            client,
            peer_candidates,
            session_hash,
            transport_data["peer_session_hash"],
        )
        best_candidate_d.addCallback(
            self._found_peer_candidate, session, transport_data, content_name, client
        )

    elif action == jingle.A_SESSION_INITIATE:
        # responder side, we select a candidate in the ones sent by initiator
        # and we give our candidates
        assert "peer_candidates" not in transport_data
        sid = transport_elt["sid"]
        transport_data["sid"] = sid
        session_hash = s5b.get_session_hash(
            session["local_jid"], session["peer_jid"], sid
        )
        transport_data["session_hash"] = session_hash
        # requester and target are inversed for peer candidates
        peer_session_hash = s5b.get_session_hash(
            session["peer_jid"], session["local_jid"], sid
        )
        transport_data["peer_session_hash"] = peer_session_hash
        peer_candidates = self._parse_candidates(transport_elt)
        transport_data["peer_candidates"] = peer_candidates

        stream_d = s5b.register_hash(
            client, session_hash, content_data["stream_object"]
        )
        stream_d.chainDeferred(content_data["finished_d"])
        best_candidate_d = s5b.get_best_candidate(
            client, peer_candidates, session_hash, peer_session_hash
        )
        best_candidate_d.addCallback(
            self._found_peer_candidate, session, transport_data, content_name, client
        )

        own_candidates = yield s5b.get_candidates(client, session["local_jid"])
        # we remove duplicate candidates
        own_candidates = [c for c in own_candidates if c not in peer_candidates]
        transport_data["candidates"] = own_candidates
        # we can now build a new <transport> element with our candidates
        transport_elt = self._build_candidates(
            session, own_candidates, sid, session_hash, client
        )

    elif action == jingle.A_TRANSPORT_INFO:
        # transport-info can be about candidate or proxy activation
        handlers = (
            (self._candidate_info, ("candidate-used", "candidate-error")),
            (self._proxy_activation_info, ("activated", "proxy-error")),
        )
        handled_elt = None
        for method, names in handlers:
            for name in names:
                info_elt = next(transport_elt.elements(NS_JINGLE_S5B, name), None)
                if info_elt is None:
                    continue
                handled_elt = info_elt
                method(handled_elt, session, content_name, transport_data, client)
                # at most one element is handled per group of names
                break

        if handled_elt is None:
            log.warning(
                "Unexpected transport element: {}".format(transport_elt.toXml())
            )

    elif action == jingle.A_DESTROY:
        # the transport is replaced (fallback ?), We need mainly to kill XEP-0065 session.
        # note that sid argument is not necessary for sessions created by this plugin
        s5b.kill_session(None, transport_data["session_hash"], None, client)

    else:
        log.warning("FIXME: unmanaged action {}".format(action))

    defer.returnValue(transport_elt)
497 | |
def jingle_terminate(self, client, action, session, content_name, reason_elt):
    """Clean the S5B session when the jingle session is declined

    Nothing is done for any other termination reason.

    @param client: %(doc_client)s
    @param action: jingle action (not used here)
    @param session(dict): session data
    @param content_name(unicode): name of the current content
    @param reason_elt(domish.Element): <reason> element, checked for a
        <decline> child
    """
    if not reason_elt.decline:
        return
    log.debug("Session declined, deleting S5B session")
    # we just need to clean the S5B session if it is declined
    transport_data = session["contents"][content_name]["transport_data"]
    session_hash = transport_data["session_hash"]
    self._s5b.kill_session(None, session_hash, None, client)
505 | |
def _do_fallback(self, feature_checked, session, content_name, client):
    """Do the fallback, method called once feature is checked

    The transport is replaced by jingle IBB if other peer can handle it,
    otherwise the session is terminated with a connectivity error.

    @param feature_checked(bool): True if other peer can do IBB
    @param session(dict): session data
    @param content_name(unicode): name of the current content
    @param client: %(doc_client)s
    """
    if feature_checked:
        self._j.transport_replace(
            client, self._jingle_ibb.NAMESPACE, session, content_name
        )
        return

    log.warning(
        "Other peer can't manage jingle IBB, be have to terminate the session"
    )
    self._j.terminate(client, self._j.REASON_CONNECTIVITY_ERROR, session)
520 | |
def do_fallback(self, session, content_name, client):
    """Fallback to IBB transport, used in last resort

    Nothing is done if we are not the initiator. If the jingle IBB plugin is
    not available the session is terminated, else other peer's support is
    checked before doing the actual fallback.

    @param session(dict): session data
    @param content_name(unicode): name of the current content
    @param client: %(doc_client)s
    @return (Deferred, None): Deferred of the feature check followed by the
        fallback, or None when no check is done
    """
    jingle = self._j
    if session["role"] != jingle.ROLE_INITIATOR:
        # only initiator must do the fallback, see XEP-0260 §3
        return

    ibb = self._jingle_ibb
    if ibb is None:
        log.warning(
            "Jingle IBB (XEP-0261) plugin is not available, we have to close the session"
        )
        jingle.terminate(client, jingle.REASON_CONNECTIVITY_ERROR, session)
        return

    feature_d = self.host.hasFeature(client, ibb.NAMESPACE, session["peer_jid"])
    feature_d.addCallback(self._do_fallback, session, content_name, client)
    return feature_d
542 | |
543 | |
@implementer(iwokkel.IDisco)
class XEP_0260_handler(XMPPHandler):
    """Disco handler advertising the Jingle S5B transport feature"""

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        """Return the Jingle S5B namespace as only disco feature

        None of the arguments is used.
        """
        s5b_feature = disco.DiscoFeature(NS_JINGLE_S5B)
        return [s5b_feature]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        """Return an empty list, there is no item to advertise"""
        return list()