comparison libervia/backend/plugins/plugin_xep_0260.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_xep_0260.py@3900626bc100
children b86912d3fd33
comparison
equal deleted inserted replaced
4070:d10748475025 4071:4b842c1fb686
1 #!/usr/bin/env python3
2
3
4 # SAT plugin for Jingle (XEP-0260)
5 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
6
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU Affero General Public License for more details.
16
17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20 from libervia.backend.core.i18n import _
21 from libervia.backend.core.constants import Const as C
22 from libervia.backend.core.log import getLogger
23
24 log = getLogger(__name__)
25 from libervia.backend.core import exceptions
26 from wokkel import disco, iwokkel
27 from zope.interface import implementer
28 from twisted.words.xish import domish
29 from twisted.words.protocols.jabber import jid
30 from twisted.internet import defer
31 import uuid
32
33 try:
34 from twisted.words.protocols.xmlstream import XMPPHandler
35 except ImportError:
36 from wokkel.subprotocols import XMPPHandler
37
38
39 NS_JINGLE_S5B = "urn:xmpp:jingle:transports:s5b:1"
40
41 PLUGIN_INFO = {
42 C.PI_NAME: "Jingle SOCKS5 Bytestreams",
43 C.PI_IMPORT_NAME: "XEP-0260",
44 C.PI_TYPE: "XEP",
45 C.PI_MODES: C.PLUG_MODE_BOTH,
46 C.PI_PROTOCOLS: ["XEP-0260"],
47 C.PI_DEPENDENCIES: ["XEP-0166", "XEP-0065"],
48 C.PI_RECOMMENDATIONS: ["XEP-0261"], # needed for fallback
49 C.PI_MAIN: "XEP_0260",
50 C.PI_HANDLER: "yes",
51 C.PI_DESCRIPTION: _("""Implementation of Jingle SOCKS5 Bytestreams"""),
52 }
53
54
55 class ProxyError(Exception):
56 def __str__(self):
57 return "an error happened while trying to use the proxy"
58
59
60 class XEP_0260(object):
61 # TODO: udp handling
62
63 def __init__(self, host):
64 log.info(_("plugin Jingle SOCKS5 Bytestreams"))
65 self.host = host
66 self._j = host.plugins["XEP-0166"] # shortcut to access jingle
67 self._s5b = host.plugins["XEP-0065"] # and socks5 bytestream
68 try:
69 self._jingle_ibb = host.plugins["XEP-0261"]
70 except KeyError:
71 self._jingle_ibb = None
72 self._j.register_transport(NS_JINGLE_S5B, self._j.TRANSPORT_STREAMING, self, 100)
73
74 def get_handler(self, client):
75 return XEP_0260_handler()
76
77 def _parse_candidates(self, transport_elt):
78 """Parse <candidate> elements
79
80 @param transport_elt(domish.Element): parent <transport> element
81 @return (list[plugin_xep_0065.Candidate): list of parsed candidates
82 """
83 candidates = []
84 for candidate_elt in transport_elt.elements(NS_JINGLE_S5B, "candidate"):
85 try:
86 cid = candidate_elt["cid"]
87 host = candidate_elt["host"]
88 jid_ = jid.JID(candidate_elt["jid"])
89 port = int(candidate_elt.getAttribute("port", 1080))
90 priority = int(candidate_elt["priority"])
91 type_ = candidate_elt.getAttribute("type", self._s5b.TYPE_DIRECT)
92 except (KeyError, ValueError):
93 raise exceptions.DataError()
94 candidate = self._s5b.Candidate(host, port, type_, priority, jid_, cid)
95 candidates.append(candidate)
96 # self._s5b.registerCandidate(candidate)
97 return candidates
98
99 def _build_candidates(self, session, candidates, sid, session_hash, client, mode=None):
100 """Build <transport> element with candidates
101
102 @param session(dict): jingle session data
103 @param candidates(iterator[plugin_xep_0065.Candidate]): iterator of candidates to add
104 @param sid(unicode): transport stream id
105 @param client: %(doc_client)s
106 @param mode(str, None): 'tcp' or 'udp', or None to have no attribute
107 @return (domish.Element): parent <transport> element where <candidate> elements must be added
108 """
109 proxy = next(
110 (
111 candidate
112 for candidate in candidates
113 if candidate.type == self._s5b.TYPE_PROXY
114 ),
115 None,
116 )
117 transport_elt = domish.Element((NS_JINGLE_S5B, "transport"))
118 transport_elt["sid"] = sid
119 if proxy is not None:
120 transport_elt["dstaddr"] = session_hash
121 if mode is not None:
122 transport_elt["mode"] = "tcp" # XXX: we only manage tcp for now
123
124 for candidate in candidates:
125 log.debug("Adding candidate: {}".format(candidate))
126 candidate_elt = transport_elt.addElement("candidate", NS_JINGLE_S5B)
127 if candidate.id is None:
128 candidate.id = str(uuid.uuid4())
129 candidate_elt["cid"] = candidate.id
130 candidate_elt["host"] = candidate.host
131 candidate_elt["jid"] = candidate.jid.full()
132 candidate_elt["port"] = str(candidate.port)
133 candidate_elt["priority"] = str(candidate.priority)
134 candidate_elt["type"] = candidate.type
135 return transport_elt
136
137 @defer.inlineCallbacks
138 def jingle_session_init(self, client, session, content_name):
139 content_data = session["contents"][content_name]
140 transport_data = content_data["transport_data"]
141 sid = transport_data["sid"] = str(uuid.uuid4())
142 session_hash = transport_data["session_hash"] = self._s5b.get_session_hash(
143 session["local_jid"], session["peer_jid"], sid
144 )
145 transport_data["peer_session_hash"] = self._s5b.get_session_hash(
146 session["peer_jid"], session["local_jid"], sid
147 ) # requester and target are inversed for peer candidates
148 transport_data["stream_d"] = self._s5b.register_hash(client, session_hash, None)
149 candidates = transport_data["candidates"] = yield self._s5b.get_candidates(
150 client, session["local_jid"])
151 mode = "tcp" # XXX: we only manage tcp for now
152 transport_elt = self._build_candidates(
153 session, candidates, sid, session_hash, client, mode
154 )
155
156 defer.returnValue(transport_elt)
157
158 def _proxy_activated_cb(self, iq_result_elt, client, candidate, session, content_name):
159 """Called when activation confirmation has been received from proxy
160
161 cf XEP-0260 § 2.4
162 """
163 # now that the proxy is activated, we have to inform other peer
164 content_data = session["contents"][content_name]
165 iq_elt, transport_elt = self._j.build_action(
166 client, self._j.A_TRANSPORT_INFO, session, content_name
167 )
168 transport_elt["sid"] = content_data["transport_data"]["sid"]
169 activated_elt = transport_elt.addElement("activated")
170 activated_elt["cid"] = candidate.id
171 iq_elt.send()
172
173 def _proxy_activated_eb(self, stanza_error, client, candidate, session, content_name):
174 """Called when activation error has been received from proxy
175
176 cf XEP-0260 § 2.4
177 """
178 # TODO: fallback to IBB
179 # now that the proxy is activated, we have to inform other peer
180 content_data = session["contents"][content_name]
181 iq_elt, transport_elt = self._j.build_action(
182 client, self._j.A_TRANSPORT_INFO, session, content_name
183 )
184 transport_elt["sid"] = content_data["transport_data"]["sid"]
185 transport_elt.addElement("proxy-error")
186 iq_elt.send()
187 log.warning(
188 "Can't activate proxy, we need to fallback to IBB: {reason}".format(
189 reason=stanza_error.value.condition
190 )
191 )
192 self.do_fallback(session, content_name, client)
193
194 def _found_peer_candidate(
195 self, candidate, session, transport_data, content_name, client
196 ):
197 """Called when the best candidate from other peer is found
198
199 @param candidate(XEP_0065.Candidate, None): selected candidate,
200 or None if no candidate is accessible
201 @param session(dict): session data
202 @param transport_data(dict): transport data
203 @param content_name(unicode): name of the current content
204 @param client(unicode): %(doc_client)s
205 """
206
207 content_data = session["contents"][content_name]
208 transport_data["best_candidate"] = candidate
209 # we need to disconnect all non selected candidates before removing them
210 for c in transport_data["peer_candidates"]:
211 if c is None or c is candidate:
212 continue
213 c.discard()
214 del transport_data["peer_candidates"]
215 iq_elt, transport_elt = self._j.build_action(
216 client, self._j.A_TRANSPORT_INFO, session, content_name
217 )
218 transport_elt["sid"] = content_data["transport_data"]["sid"]
219 if candidate is None:
220 log.warning("Can't connect to any peer candidate")
221 candidate_elt = transport_elt.addElement("candidate-error")
222 else:
223 log.info("Found best peer candidate: {}".format(str(candidate)))
224 candidate_elt = transport_elt.addElement("candidate-used")
225 candidate_elt["cid"] = candidate.id
226 iq_elt.send() # TODO: check result stanza
227 self._check_candidates(session, content_name, transport_data, client)
228
229 def _check_candidates(self, session, content_name, transport_data, client):
230 """Called when a candidate has been choosed
231
232 if we have both candidates, we select one, or fallback to an other transport
233 @param session(dict): session data
234 @param content_name(unicode): name of the current content
235 @param transport_data(dict): transport data
236 @param client(unicode): %(doc_client)s
237 """
238 content_data = session["contents"][content_name]
239 try:
240 best_candidate = transport_data["best_candidate"]
241 except KeyError:
242 # we have not our best candidate yet
243 return
244 try:
245 peer_best_candidate = transport_data["peer_best_candidate"]
246 except KeyError:
247 # we have not peer best candidate yet
248 return
249
250 # at this point we have both candidates, it's time to choose one
251 if best_candidate is None or peer_best_candidate is None:
252 choosed_candidate = best_candidate or peer_best_candidate
253 else:
254 if best_candidate.priority == peer_best_candidate.priority:
255 # same priority, we choose initiator one according to XEP-0260 §2.4 #4
256 log.debug(
257 "Candidates have same priority, we select the one choosed by initiator"
258 )
259 if session["initiator"] == session["local_jid"]:
260 choosed_candidate = best_candidate
261 else:
262 choosed_candidate = peer_best_candidate
263 else:
264 choosed_candidate = max(
265 best_candidate, peer_best_candidate, key=lambda c: c.priority
266 )
267
268 if choosed_candidate is None:
269 log.warning("Socks5 negociation failed, we need to fallback to IBB")
270 self.do_fallback(session, content_name, client)
271 else:
272 if choosed_candidate == peer_best_candidate:
273 # peer_best_candidate was choosed from the candidates we have sent
274 # so our_candidate is true if choosed_candidate is peer_best_candidate
275 our_candidate = True
276 # than also mean that best_candidate must be discarded !
277 try:
278 best_candidate.discard()
279 except AttributeError: # but it can be None
280 pass
281 else:
282 our_candidate = False
283
284 log.info(
285 "Socks5 negociation successful, {who} candidate will be used: {candidate}".format(
286 who="our" if our_candidate else "other peer",
287 candidate=choosed_candidate,
288 )
289 )
290 del transport_data["best_candidate"]
291 del transport_data["peer_best_candidate"]
292
293 if choosed_candidate.type == self._s5b.TYPE_PROXY:
294 # the stream transfer need to wait for proxy activation
295 # (see XEP-0260 § 2.4)
296 if our_candidate:
297 d = self._s5b.connect_candidate(
298 client, choosed_candidate, transport_data["session_hash"]
299 )
300 d.addCallback(
301 lambda __: choosed_candidate.activate(
302 transport_data["sid"], session["peer_jid"], client
303 )
304 )
305 args = [client, choosed_candidate, session, content_name]
306 d.addCallbacks(
307 self._proxy_activated_cb, self._proxy_activated_eb, args, None, args
308 )
309 else:
310 # this Deferred will be called when we'll receive activation confirmation from other peer
311 d = transport_data["activation_d"] = defer.Deferred()
312 else:
313 d = defer.succeed(None)
314
315 if content_data["senders"] == session["role"]:
316 # we can now start the stream transfer (or start it after proxy activation)
317 d.addCallback(
318 lambda __: choosed_candidate.start_transfer(
319 transport_data["session_hash"]
320 )
321 )
322 d.addErrback(self._start_eb, session, content_name, client)
323
324 def _start_eb(self, fail, session, content_name, client):
325 """Called when it's not possible to start the transfer
326
327 Will try to fallback to IBB
328 """
329 try:
330 reason = str(fail.value)
331 except AttributeError:
332 reason = str(fail)
333 log.warning("Cant start transfert, we'll try fallback method: {}".format(reason))
334 self.do_fallback(session, content_name, client)
335
336 def _candidate_info(
337 self, candidate_elt, session, content_name, transport_data, client
338 ):
339 """Called when best candidate has been received from peer (or if none is working)
340
341 @param candidate_elt(domish.Element): candidate-used or candidate-error element
342 (see XEP-0260 §2.3)
343 @param session(dict): session data
344 @param content_name(unicode): name of the current content
345 @param transport_data(dict): transport data
346 @param client(unicode): %(doc_client)s
347 """
348 if candidate_elt.name == "candidate-error":
349 # candidate-error, no candidate worked
350 transport_data["peer_best_candidate"] = None
351 else:
352 # candidate-used, one candidate was choosed
353 try:
354 cid = candidate_elt.attributes["cid"]
355 except KeyError:
356 log.warning("No cid found in <candidate-used>")
357 raise exceptions.DataError
358 try:
359 candidate = next((
360 c for c in transport_data["candidates"] if c.id == cid
361 ))
362 except StopIteration:
363 log.warning("Given cid doesn't correspond to any known candidate !")
364 raise exceptions.DataError # TODO: send an error to other peer, and use better exception
365 except KeyError:
366 # a transport-info can also be intentionaly sent too early by other peer
367 # but there is little probability
368 log.error(
369 '"candidates" key doesn\'t exists in transport_data, it should at this point'
370 )
371 raise exceptions.InternalError
372 # at this point we have the candidate choosed by other peer
373 transport_data["peer_best_candidate"] = candidate
374 log.info("Other peer best candidate: {}".format(candidate))
375
376 del transport_data["candidates"]
377 self._check_candidates(session, content_name, transport_data, client)
378
379 def _proxy_activation_info(
380 self, proxy_elt, session, content_name, transport_data, client
381 ):
382 """Called when proxy has been activated (or has sent an error)
383
384 @param proxy_elt(domish.Element): <activated/> or <proxy-error/> element
385 (see XEP-0260 §2.4)
386 @param session(dict): session data
387 @param content_name(unicode): name of the current content
388 @param transport_data(dict): transport data
389 @param client(unicode): %(doc_client)s
390 """
391 try:
392 activation_d = transport_data.pop("activation_d")
393 except KeyError:
394 log.warning("Received unexpected transport-info for proxy activation")
395
396 if proxy_elt.name == "activated":
397 activation_d.callback(None)
398 else:
399 activation_d.errback(ProxyError())
400
401 @defer.inlineCallbacks
402 def jingle_handler(self, client, action, session, content_name, transport_elt):
403 content_data = session["contents"][content_name]
404 transport_data = content_data["transport_data"]
405
406 if action in (self._j.A_ACCEPTED_ACK, self._j.A_PREPARE_RESPONDER):
407 pass
408
409 elif action == self._j.A_SESSION_ACCEPT:
410 # initiator side, we select a candidate in the ones sent by responder
411 assert "peer_candidates" not in transport_data
412 transport_data["peer_candidates"] = self._parse_candidates(transport_elt)
413
414 elif action == self._j.A_START:
415 session_hash = transport_data["session_hash"]
416 peer_candidates = transport_data["peer_candidates"]
417 stream_object = content_data["stream_object"]
418 self._s5b.associate_stream_object(client, session_hash, stream_object)
419 stream_d = transport_data.pop("stream_d")
420 stream_d.chainDeferred(content_data["finished_d"])
421 peer_session_hash = transport_data["peer_session_hash"]
422 d = self._s5b.get_best_candidate(
423 client, peer_candidates, session_hash, peer_session_hash
424 )
425 d.addCallback(
426 self._found_peer_candidate, session, transport_data, content_name, client
427 )
428
429 elif action == self._j.A_SESSION_INITIATE:
430 # responder side, we select a candidate in the ones sent by initiator
431 # and we give our candidates
432 assert "peer_candidates" not in transport_data
433 sid = transport_data["sid"] = transport_elt["sid"]
434 session_hash = transport_data["session_hash"] = self._s5b.get_session_hash(
435 session["local_jid"], session["peer_jid"], sid
436 )
437 peer_session_hash = transport_data[
438 "peer_session_hash"
439 ] = self._s5b.get_session_hash(
440 session["peer_jid"], session["local_jid"], sid
441 ) # requester and target are inversed for peer candidates
442 peer_candidates = transport_data["peer_candidates"] = self._parse_candidates(
443 transport_elt
444 )
445 stream_object = content_data["stream_object"]
446 stream_d = self._s5b.register_hash(client, session_hash, stream_object)
447 stream_d.chainDeferred(content_data["finished_d"])
448 d = self._s5b.get_best_candidate(
449 client, peer_candidates, session_hash, peer_session_hash
450 )
451 d.addCallback(
452 self._found_peer_candidate, session, transport_data, content_name, client
453 )
454 candidates = yield self._s5b.get_candidates(client, session["local_jid"])
455 # we remove duplicate candidates
456 candidates = [
457 candidate for candidate in candidates if candidate not in peer_candidates
458 ]
459
460 transport_data["candidates"] = candidates
461 # we can now build a new <transport> element with our candidates
462 transport_elt = self._build_candidates(
463 session, candidates, sid, session_hash, client
464 )
465
466 elif action == self._j.A_TRANSPORT_INFO:
467 # transport-info can be about candidate or proxy activation
468 candidate_elt = None
469
470 for method, names in (
471 (self._candidate_info, ("candidate-used", "candidate-error")),
472 (self._proxy_activation_info, ("activated", "proxy-error")),
473 ):
474 for name in names:
475 try:
476 candidate_elt = next(transport_elt.elements(NS_JINGLE_S5B, name))
477 except StopIteration:
478 continue
479 else:
480 method(
481 candidate_elt, session, content_name, transport_data, client
482 )
483 break
484
485 if candidate_elt is None:
486 log.warning(
487 "Unexpected transport element: {}".format(transport_elt.toXml())
488 )
489 elif action == self._j.A_DESTROY:
490 # the transport is replaced (fallback ?), We need mainly to kill XEP-0065 session.
491 # note that sid argument is not necessary for sessions created by this plugin
492 self._s5b.kill_session(None, transport_data["session_hash"], None, client)
493 else:
494 log.warning("FIXME: unmanaged action {}".format(action))
495
496 defer.returnValue(transport_elt)
497
498 def jingle_terminate(self, client, action, session, content_name, reason_elt):
499 if reason_elt.decline:
500 log.debug("Session declined, deleting S5B session")
501 # we just need to clean the S5B session if it is declined
502 content_data = session["contents"][content_name]
503 transport_data = content_data["transport_data"]
504 self._s5b.kill_session(None, transport_data["session_hash"], None, client)
505
506 def _do_fallback(self, feature_checked, session, content_name, client):
507 """Do the fallback, method called once feature is checked
508
509 @param feature_checked(bool): True if other peer can do IBB
510 """
511 if not feature_checked:
512 log.warning(
513 "Other peer can't manage jingle IBB, be have to terminate the session"
514 )
515 self._j.terminate(client, self._j.REASON_CONNECTIVITY_ERROR, session)
516 else:
517 self._j.transport_replace(
518 client, self._jingle_ibb.NAMESPACE, session, content_name
519 )
520
521 def do_fallback(self, session, content_name, client):
522 """Fallback to IBB transport, used in last resort
523
524 @param session(dict): session data
525 @param content_name(unicode): name of the current content
526 @param client(unicode): %(doc_client)s
527 """
528 if session["role"] != self._j.ROLE_INITIATOR:
529 # only initiator must do the fallback, see XEP-0260 §3
530 return
531 if self._jingle_ibb is None:
532 log.warning(
533 "Jingle IBB (XEP-0261) plugin is not available, we have to close the session"
534 )
535 self._j.terminate(client, self._j.REASON_CONNECTIVITY_ERROR, session)
536 else:
537 d = self.host.hasFeature(
538 client, self._jingle_ibb.NAMESPACE, session["peer_jid"]
539 )
540 d.addCallback(self._do_fallback, session, content_name, client)
541 return d
542
543
544 @implementer(iwokkel.IDisco)
545 class XEP_0260_handler(XMPPHandler):
546
547 def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
548 return [disco.DiscoFeature(NS_JINGLE_S5B)]
549
550 def getDiscoItems(self, requestor, target, nodeIdentifier=""):
551 return []