Mercurial > libervia-backend
comparison sat/plugins/plugin_xep_0166/__init__.py @ 4044:3900626bc100
plugin XEP-0166: refactoring, and various improvements:
- add models for transport and applications handlers and linked data
- split models into separate file
- some type hints
- some documentation comments
- add actions to prepare confirmation, useful to do initial parsing of all contents
- application arg/kwargs and some transport data can be initialised during Jingle
`initiate` call, this is notably useful when a call is made with transport data (this is
the call for A/V calls where codecs and ICE candidate can be specified when starting a
call)
- session data can be specified during Jingle `initiate` call
- new `store_in_session` argument in `_parse_elements`, which can be used to avoid a
race condition when a context element (<description> or <transport>) is being parsed for
an action while another action happens (like `transport-info`)
- don't set `sid` in `transport_elt` during a `transport-info` action anymore in
`build_action`: this is specific to Jingle File Transfer and has been moved there
rel 419
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 15 May 2023 16:23:11 +0200 |
parents | sat/plugins/plugin_xep_0166.py@524856bd7b19 |
children | dd39e60ca2aa |
comparison
equal
deleted
inserted
replaced
4043:9641ce286e07 | 4044:3900626bc100 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # Libervia plugin for Jingle (XEP-0166) | |
4 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) | |
5 | |
6 # This program is free software: you can redistribute it and/or modify | |
7 # it under the terms of the GNU Affero General Public License as published by | |
8 # the Free Software Foundation, either version 3 of the License, or | |
9 # (at your option) any later version. | |
10 | |
11 # This program is distributed in the hope that it will be useful, | |
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 # GNU Affero General Public License for more details. | |
15 | |
16 # You should have received a copy of the GNU Affero General Public License | |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
18 | |
19 | |
20 import time | |
21 from typing import Any, Callable, Dict, Final, List, Optional, Tuple | |
22 import uuid | |
23 | |
24 from twisted.internet import defer | |
25 from twisted.internet import reactor | |
26 from twisted.python import failure | |
27 from twisted.words.protocols.jabber import jid | |
28 from twisted.words.protocols.jabber import error | |
29 from twisted.words.protocols.jabber import xmlstream | |
30 from twisted.words.xish import domish | |
31 from wokkel import disco, iwokkel | |
32 from zope.interface import implementer | |
33 | |
34 from sat.core import exceptions | |
35 from sat.core.constants import Const as C | |
36 from sat.core.core_types import SatXMPPEntity | |
37 from sat.core.i18n import D_, _ | |
38 from sat.core.log import getLogger | |
39 from sat.tools import xml_tools | |
40 from sat.tools import utils | |
41 | |
42 from .models import ( | |
43 ApplicationData, | |
44 BaseApplicationHandler, | |
45 BaseTransportHandler, | |
46 ContentData, | |
47 TransportData, | |
48 ) | |
49 | |
50 | |
51 log = getLogger(__name__) | |
52 | |
53 | |
# expression matching any <iq/> stanza of type "set"
IQ_SET: Final = '/iq[@type="set"]'

# XML namespaces used for <jingle/> elements and for jingle specific errors
NS_JINGLE: Final = "urn:xmpp:jingle:1"
NS_JINGLE_ERROR: Final = "urn:xmpp:jingle:errors:1"

# expression matching an <iq type="set"/> which holds a <jingle/> child
JINGLE_REQUEST: Final = f'{IQ_SET}/jingle[@xmlns="{NS_JINGLE}"]'

# values stored in the "state" key of a session
STATE_PENDING: Final = "PENDING"
STATE_ACTIVE: Final = "ACTIVE"
STATE_ENDED: Final = "ENDED"

# text of the generic confirmation dialog, translated when the dialog is shown
CONFIRM_TXT: Final = D_(
    "{entity} want to start a jingle session with you, do you accept ?"
)

# plugin metadata
PLUGIN_INFO: Final = {
    C.PI_NAME: "Jingle",
    C.PI_IMPORT_NAME: "XEP-0166",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0166"],
    C.PI_MAIN: "XEP_0166",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("Implementation of Jingle"),
}
75 | |
76 | |
77 class XEP_0166: | |
namespace: Final = NS_JINGLE

# roles
ROLE_INITIATOR: Final = "initiator"
ROLE_RESPONDER: Final = "responder"

# transport types
TRANSPORT_DATAGRAM: Final = "UDP"
TRANSPORT_STREAMING: Final = "TCP"

# termination reasons
REASON_SUCCESS: Final = "success"
REASON_DECLINE: Final = "decline"
REASON_FAILED_APPLICATION: Final = "failed-application"
REASON_FAILED_TRANSPORT: Final = "failed-transport"
REASON_CONNECTIVITY_ERROR: Final = "connectivity-error"

# standard actions

A_SESSION_INITIATE: Final = "session-initiate"
A_SESSION_ACCEPT: Final = "session-accept"
A_SESSION_TERMINATE: Final = "session-terminate"
A_SESSION_INFO: Final = "session-info"
A_TRANSPORT_REPLACE: Final = "transport-replace"
A_TRANSPORT_ACCEPT: Final = "transport-accept"
A_TRANSPORT_REJECT: Final = "transport-reject"
A_TRANSPORT_INFO: Final = "transport-info"

# non standard actions

#: called before the confirmation request, first event for responder, useful for
#: parsing
A_PREPARE_CONFIRMATION: Final = "prepare-confirmation"
#: initiator must prepare transfer
A_PREPARE_INITIATOR: Final = "prepare-initiator"
#: responder must prepare transfer
A_PREPARE_RESPONDER: Final = "prepare-responder"
#: session accepted ack has been received from initiator
A_ACCEPTED_ACK: Final = "accepted-ack"
#: application can start
A_START: Final = "start"
#: called when a transport is destroyed (e.g. because it is replaced). Used to do
#: cleaning operations
A_DESTROY: Final = "destroy"
122 | |
def __init__(self, host):
    """Initialise the registries of applications and transports

    @param host: host instance, kept in ``self.host``
    """
    log.info(_("plugin Jingle initialization"))
    self.host = host
    # key: namespace, value: application data
    self._applications = {}
    # key: namespace, value: transport data
    self._transports = {}
    # transports are also indexed by type; each list is sorted by priority when a
    # transport is registered
    self._type_transports = {
        transport_type: []
        for transport_type in (
            XEP_0166.TRANSPORT_DATAGRAM,
            XEP_0166.TRANSPORT_STREAMING,
        )
    }
133 | |
def profile_connected(self, client):
    """Attach an empty jingle sessions registry to the client

    @param client: client which gets a new ``jingle_sessions`` attribute
    """
    # key = sid, value = session_data
    client.jingle_sessions = dict()
136 | |
def get_handler(self, client):
    """Build the handler linked to this plugin instance

    @param client: unused here
    @return: a new XEP_0166_handler instance
    """
    handler = XEP_0166_handler(self)
    return handler
139 | |
def get_session(self, client: SatXMPPEntity, session_id: str) -> dict:
    """Return the session data registered for a SID

    @param client: client whose ``jingle_sessions`` are looked up
    @param session_id: session ID
    @return: found session

    @raise exceptions.NotFound: no session with this SID has been found
    """
    sessions = client.jingle_sessions
    try:
        session = sessions[session_id]
    except KeyError:
        raise exceptions.NotFound(f"No session with SID {session_id} found")
    return session
154 | |
155 | |
def _del_session(self, client, sid):
    """Remove a session from the client registry, if it exists

    An unknown session ID is not an error, it is only logged.
    @param client: client whose ``jingle_sessions`` are updated
    @param sid: jingle session id
    """
    try:
        client.jingle_sessions.pop(sid)
    except KeyError:
        log.debug(
            f"Jingle session id {sid!r} is unknown, nothing to delete "
            f"[{client.profile}]")
        return
    log.debug(f"Jingle session id {sid!r} deleted [{client.profile}]")
165 | |
166 ## helpers methods to build stanzas ## | |
167 | |
def _build_jingle_elt(
    self,
    client: SatXMPPEntity,
    session: dict,
    action: str
) -> Tuple[xmlstream.IQ, domish.Element]:
    """Create an <iq type="set"/> holding a <jingle/> element for an action

    @param client: client used to create the IQ
    @param session: jingle session data, "local_jid", "peer_jid" and "id" are used
    @param action: value of the "action" attribute
    @return: the <iq/> element and its <jingle/> child
    """
    iq_elt = client.IQ("set")
    for attribute, session_key in (("from", "local_jid"), ("to", "peer_jid")):
        iq_elt[attribute] = session[session_key].full()
    jingle_elt = iq_elt.addElement("jingle", NS_JINGLE)
    jingle_elt["sid"] = session["id"]
    jingle_elt["action"] = action
    return iq_elt, jingle_elt
181 | |
def sendError(self, client, error_condition, sid, request, jingle_condition=None):
    """Answer to a request with an error stanza

    If the error condition is of "cancel" type and a session id is given, the
    session is deleted.
    @param error_condition: one of twisted.words.protocols.jabber.error.STANZA_CONDITIONS keys
    @param sid(unicode,None): jingle session id, or None, if session must not be destroyed
    @param request(domish.Element): original request
    @param jingle_condition(None, unicode): if not None, additional jingle-specific error information
    @return: result of ``client.send``
    """
    iq_elt = error.StanzaError(error_condition).toResponse(request)
    if jingle_condition is not None:
        iq_elt.error.addElement((NS_JINGLE_ERROR, jingle_condition))
    is_cancel = error.STANZA_CONDITIONS[error_condition]["type"] == "cancel"
    if is_cancel and sid:
        self._del_session(client, sid)
        log.warning(
            f"Error while managing jingle session, cancelling: {error_condition}"
        )
    return client.send(iq_elt)
201 | |
def _terminate_eb(self, failure_):
    """Errback logging a failure which happened while terminating a session

    @param failure_: failure to log
    """
    message = _("Error while terminating session: {msg}").format(msg=failure_)
    log.warning(message)
204 | |
def terminate(self, client, reason, session, text=None):
    """Terminate the session

    send the session-terminate action, and delete the session data
    @param reason(unicode, list[domish.Element]): if unicode, will be transformed to an element
        if a list of element, add them as children of the <reason/> element
    @param session(dict): data of the session
    @param text(None, unicode): if not None, added in a <text/> child of <reason/>
    @return: Deferred of the sent IQ, failures are logged
    """
    iq_elt, jingle_elt = self._build_jingle_elt(
        client, session, XEP_0166.A_SESSION_TERMINATE
    )
    reason_elt = jingle_elt.addElement("reason")
    if not isinstance(reason, str):
        for child_elt in reason:
            reason_elt.addChild(child_elt)
    else:
        reason_elt.addElement(reason)
    if text is not None:
        reason_elt.addElement("text", content=text)
    # the session is removed before the stanza is sent
    self._del_session(client, session["id"])
    send_d = iq_elt.send()
    send_d.addErrback(self._terminate_eb)
    return send_d
228 | |
229 ## errors which doesn't imply a stanza sending ## | |
230 | |
def _iq_error(self, failure_, sid, client):
    """Called when we got an <iq/> error

    The error is logged and the session is deleted.
    @param failure_(failure.Failure): the exceptions raised
    @param sid(unicode): jingle session id
    @param client: client owning the session
    """
    log.warning(f"Error while sending jingle <iq/> stanza: {failure_.value}")
    self._del_session(client, sid)
243 | |
def _jingle_error_cb(self, failure_, session, request, client):
    """Called when something is going wrong while parsing jingle request

    The error condition depends on the exception raised:
        - exceptions.DataError results in a bad-request error stanza
        - any other exception terminates the session with a failed-application
          reason, the exception being used as text. Exceptions which are not a
          StanzaError are also logged as errors.
    @param failure_(failure.Failure): failure wrapping the exception raised
    @param session(dict): data of the session
    @param request(domish.Element): jingle request
    @param client: %(doc_client)s
    @return: result of sendError or terminate
    """
    log.warning(f"Error while processing jingle request [{client.profile}]")
    # the checks below must be done on the wrapped exception, never on the Failure
    # itself: a Failure is not an instance of the exception it wraps, so a plain
    # DataError would otherwise never be detected
    exc = failure_.value
    if isinstance(exc, defer.FirstError):
        # FirstError wraps the failure of the first Deferred which has failed
        exc = exc.subFailure.value
    if isinstance(exc, exceptions.DataError):
        return self.sendError(client, "bad-request", session["id"], request)
    if not isinstance(exc, error.StanzaError):
        log.error(f"Unmanaged jingle exception: {exc}")
    return self.terminate(
        client, self.REASON_FAILED_APPLICATION, session, text=str(exc)
    )
266 | |
267 ## methods used by other plugins ## | |
268 | |
def register_application(
    self,
    namespace: str,
    handler: BaseApplicationHandler
) -> None:
    """Register an application plugin

    @param namespace(unicode): application namespace managed by the plugin
    @param handler(object): instance of a class which manage the application.
        May have the following methods:
            - request_confirmation(session, desc_elt, client):
                - if present, it is called on when session must be accepted.
                - if it return True the session is accepted, else rejected.
                    A Deferred can be returned
                - if not present, a generic accept dialog will be used
            - jingle_session_init(
                    client, self, session, content_name[, *args, **kwargs]
                ): must return the domish.Element used for initial content
            - jingle_handler(
                    client, self, action, session, content_name, transport_elt
                ):
                called on several action to negotiate the application or transport
            - jingle_terminate: called on session terminate, with reason_elt
                May be used to clean session
    @raise exceptions.ConflictError: the namespace is already registered
    """
    registered = self._applications
    if namespace in registered:
        raise exceptions.ConflictError(
            f"Trying to register already registered namespace {namespace}"
        )
    application_data = ApplicationData(namespace=namespace, handler=handler)
    registered[namespace] = application_data
    log.debug("new jingle application registered")
302 | |
def register_transport(
    self,
    namespace: str,
    transport_type: str,
    handler: BaseTransportHandler,
    priority: int = 0
) -> None:
    """Register a transport plugin

    @param namespace: the XML namespace used for this transport
    @param transport_type: type of transport to use (see XEP-0166 §8)
    @param handler: instance of a class which manage the transport.
    @param priority: priority of this transport, transports of a same type are
        sorted with the highest priority first
    @raise exceptions.ConflictError: the namespace is already registered
    """
    known_types = (XEP_0166.TRANSPORT_DATAGRAM, XEP_0166.TRANSPORT_STREAMING)
    assert transport_type in known_types
    if namespace in self._transports:
        raise exceptions.ConflictError(
            f"Trying to register already registered namespace {namespace}"
        )
    new_transport = TransportData(
        namespace=namespace, handler=handler, priority=priority
    )
    same_type_transports = self._type_transports[transport_type]
    same_type_transports.append(new_transport)
    same_type_transports.sort(key=lambda t: t.priority, reverse=True)
    self._transports[namespace] = new_transport
    log.debug("new jingle transport registered")
334 | |
@defer.inlineCallbacks
def transport_replace(self, client, transport_ns, session, content_name):
    """Replace a transport

    The current transport handler is called with the A_DESTROY action, then the
    new transport is set in content data and a transport-replace action is sent.
    @param transport_ns(unicode): namespace of the new transport to use
    @param session(dict): jingle session data
    @param content_name(unicode): name of the content
    @raise exceptions.InternalError: no transport is registered for transport_ns
    """
    # XXX: for now we replace the transport before receiving confirmation from other peer
    #      this is acceptable because we terminate the session if transport is rejected.
    #      this behaviour may change in the future.
    content_data = session["contents"][content_name]
    transport_data = content_data["transport_data"]
    try:
        new_transport = self._transports[transport_ns]
    except KeyError:
        raise exceptions.InternalError("Unkown transport")

    # the former transport gets a chance to do its cleaning
    former_handler = content_data["transport"].handler
    yield former_handler.jingle_handler(
        client, XEP_0166.A_DESTROY, session, content_name, None
    )
    content_data["transport"] = new_transport
    transport_data.clear()

    iq_elt, jingle_elt = self._build_jingle_elt(
        client, session, XEP_0166.A_TRANSPORT_REPLACE
    )
    content_elt = jingle_elt.addElement("content")
    content_elt["name"] = content_name
    content_elt["creator"] = content_data["creator"]

    content_elt.addChild(
        new_transport.handler.jingle_session_init(client, session, content_name)
    )
    iq_elt.send()
368 | |
def build_action(
    self,
    client: SatXMPPEntity,
    action: str,
    session: dict,
    content_name: str,
    iq_elt: Optional[xmlstream.IQ] = None,
    context_elt: Optional[domish.Element] = None
) -> Tuple[xmlstream.IQ, domish.Element]:
    """Build an element according to requested action

    @param action: a jingle action (see XEP-0166 §7.2),
        session-* actions are not managed here
        transport-replace is managed in the dedicated [transport_replace] method
    @param session: jingle session data
    @param content_name: name of the content
    @param iq_elt: use this IQ instead of creating a new one if provided
    @param context_elt: use this element instead of creating a new one if provided
    @return: parent <iq> element, <transport> or <description> element, according to action
    @raise exceptions.InternalError: the provided <iq> has no <jingle> child, or the
        action is not managed and no context_elt has been provided
    """
    # iq, jingle and content elements are built the same way for every action
    if iq_elt is None:
        iq_elt, jingle_elt = self._build_jingle_elt(client, session, action)
    else:
        try:
            jingle_elt = next(iq_elt.elements(NS_JINGLE, "jingle"))
        except StopIteration:
            raise exceptions.InternalError(
                "The <iq> element provided doesn't have a <jingle> element"
            )
    # FIXME: XEP-0260 § 2.3 Ex 5 has an initiator attribute, but it should not
    #   according to XEP-0166 §7.1 table 1, must be checked
    content_data = session["contents"][content_name]
    content_elt = jingle_elt.addElement("content")
    content_elt["name"] = content_name
    content_elt["creator"] = content_data["creator"]

    if context_elt is None:
        if action != XEP_0166.A_TRANSPORT_INFO:
            raise exceptions.InternalError(f"unmanaged action {action}")
        context_elt = content_elt.addElement(
            "transport", content_data["transport"].namespace
        )

    return iq_elt, context_elt
415 | |
def build_session_info(self, client, session):
    """Build a session-info action

    @param client: client used to create the IQ
    @param session(dict): jingle session data
    @return (tuple[domish.Element, domish.Element]): parent <iq> element, <jingle> element
    """
    iq_elt, jingle_elt = self._build_jingle_elt(
        client, session, XEP_0166.A_SESSION_INFO
    )
    return iq_elt, jingle_elt
423 | |
def get_application(self, namespace: str) -> ApplicationData:
    """Retrieve application corresponding to a namespace

    @param namespace: namespace of the application
    @return: data of the registered application
    @raise exceptions.NotFound if application can't be found
    """
    try:
        application = self._applications[namespace]
    except KeyError:
        raise exceptions.NotFound(f"No application registered for {namespace}")
    return application
435 | |
def get_content_data(self, content: dict) -> ContentData:
    """Retrieve application and its argument from content

    If the content has no "name" key, a random name is generated and stored in
    ``content``.
    @param content: content as given to [initiate]
    @return: application, its args and kwargs, transport data and content name
    @raise exceptions.InternalError: no application is registered for the "app_ns"
        of the content
    """
    try:
        application = self.get_application(content["app_ns"])
    except exceptions.NotFound as e:
        raise exceptions.InternalError(str(e))
    if "name" not in content:
        content["name"] = str(uuid.uuid4())
    return ContentData(
        application,
        content.get("app_args", []),
        content.get("app_kwargs", {}),
        content.get("transport_data", {}),
        content["name"]
    )
457 | |
async def initiate(
    self,
    client: SatXMPPEntity,
    peer_jid: jid.JID,
    contents: List[dict],
    encrypted: bool = False,
    **extra_data: Any
) -> str:
    """Send a session initiation request

    @param peer_jid: jid to establish session with
    @param contents: list of contents to use:
        The dict must have the following keys:
            - app_ns(str): namespace of the application
        the following keys are optional:
            - transport_type(str): type of transport to use (see XEP-0166 §8)
                default to TRANSPORT_STREAMING
            - name(str): name of the content
            - senders(str): One of XEP_0166.ROLE_INITIATOR, XEP_0166.ROLE_RESPONDER, both or none
                default to BOTH (see XEP-0166 §7.3)
            - app_args(list): args to pass to the application plugin
            - app_kwargs(dict): keyword args to pass to the application plugin
    @param encrypted: if True, session must be encrypted and "encryption" must be set
        to all content data of session
    @param extra_data: additional data stored as is in the session
    @return: jingle session id
    @raise ValueError: the session would be done with ourself
    @raise exceptions.InternalError: no transport is registered for a requested
        type, or two contents have the same name
    @raise exceptions.EncryptionError: encryption is requested but a content has
        no "encryption" set
    """
    assert contents  # there must be at least one content
    if peer_jid == client.jid or (
        client.is_component and peer_jid.host == client.jid.host
    ):
        raise ValueError(_("You can't do a jingle session with yourself"))
    initiator = client.jid
    sid = str(uuid.uuid4())
    # TODO: session cleaning after timeout ?
    session = {
        "id": sid,
        "state": STATE_PENDING,
        "initiator": initiator,
        "role": XEP_0166.ROLE_INITIATOR,
        "local_jid": client.jid,
        "peer_jid": peer_jid,
        "started": time.time(),
        "contents": {},
        **extra_data,
    }
    client.jingle_sessions[sid] = session

    # a trigger may stop the initiation, the session id is still returned
    keep_going = await self.host.trigger.async_point(
        "XEP-0166_initiate",
        client, session, contents
    )
    if not keep_going:
        return sid

    iq_elt, jingle_elt = self._build_jingle_elt(
        client, session, XEP_0166.A_SESSION_INITIATE
    )
    jingle_elt["initiator"] = initiator.full()

    session_contents = session["contents"]

    for content in contents:
        # application plugin and its arguments
        content_data = self.get_content_data(content)
        content_name = content_data.content_name

        # transport plugin: the first registered one for the requested type
        transport_type = content.get("transport_type", XEP_0166.TRANSPORT_STREAMING)
        try:
            transport = self._type_transports[transport_type][0]
        except IndexError:
            raise exceptions.InternalError(
                f"No transport registered for {transport_type}"
            )

        # session data for this content
        session_content = {
            "application": content_data.application,
            "application_data": {},
            "transport": transport,
            "transport_data": content_data.transport_data,
            "creator": XEP_0166.ROLE_INITIATOR,
            "senders": content.get("senders", "both"),
        }
        if content_name in session_contents:
            raise exceptions.InternalError(
                "There is already a content with this name"
            )
        session_contents[content_name] = session_content

        # <content/> element
        content_elt = jingle_elt.addElement("content")
        content_elt["creator"] = session_content["creator"]
        content_elt["name"] = content_name
        # "senders" attribute is only set when explicitly requested
        if "senders" in content:
            content_elt["senders"] = content["senders"]

        # description element, built by the application handler
        desc_elt = await utils.as_deferred(
            content_data.application.handler.jingle_session_init,
            client, session, content_name,
            *content_data.app_args, **content_data.app_kwargs
        )
        content_elt.addChild(desc_elt)

        # transport element, built by the transport handler
        transport_elt = await utils.as_deferred(
            transport.handler.jingle_session_init,
            client, session, content_name,
        )
        content_elt.addChild(transport_elt)

    keep_going = await self.host.trigger.async_point(
        "XEP-0166_initiate_elt_built",
        client, session, iq_elt, jingle_elt
    )
    if not keep_going:
        return sid

    if encrypted and any(
        "encryption" not in session_content
        for session_content in session["contents"].values()
    ):
        raise exceptions.EncryptionError(
            "Encryption is requested, but no encryption has been set"
        )

    try:
        await iq_elt.send()
    except Exception as e:
        # the session is deleted by _iq_error before the failure is raised
        failure_ = failure.Failure(e)
        self._iq_error(failure_, sid, client)
        raise failure_
    return sid
587 | |
def delayed_content_terminate(self, *args, **kwargs):
    """Put content_terminate in queue but don't execute immediately

    This is used to terminate a content inside a handler, to avoid modifying contents
    @param args: positional arguments passed to [content_terminate]
    @param kwargs: keyword arguments passed to [content_terminate]
    """
    # a delay of 0 schedules the call with the reactor instead of calling it now
    delay = 0
    reactor.callLater(delay, self.content_terminate, *args, **kwargs)
594 | |
def content_terminate(self, client, session, content_name, reason=REASON_SUCCESS):
    """Terminate and remove a content

    if there is no more content, then session is terminated
    @param session(dict): jingle session
    @param content_name(unicode): name of the content terminated
    @param reason(unicode): reason of the termination
    """
    remaining_contents = session["contents"]
    del remaining_contents[content_name]
    if remaining_contents:
        return
    self.terminate(client, reason, session)
607 | |
608 ## defaults methods called when plugin doesn't have them ## | |
609 | |
def jingle_request_confirmation_default(
    self, client, action, session, content_name, desc_elt
):
    """Request confirmation for a jingle session with a generic dialog

    @param client: client whose profile is used for the dialog
    @param session: jingle session data, "peer_jid" is shown in the dialog
    @return: result of xml_tools.defer_confirm
    """
    log.debug("Using generic jingle confirmation method")
    message = _(CONFIRM_TXT).format(entity=session["peer_jid"].full())
    title = _("Confirm Jingle session")
    return xml_tools.defer_confirm(
        self.host,
        message,
        title,
        profile=client.profile,
    )
621 | |
622 ## jingle events ## | |
623 | |
def _on_jingle_request(self, request: domish.Element, client: SatXMPPEntity) -> None:
    """Run the [on_jingle_request] coroutine for a received request

    @param request: received IQ request
    @param client: client which has received the request
    """
    request_coro = self.on_jingle_request(client, request)
    defer.ensureDeferred(request_coro)
626 | |
async def on_jingle_request(
    self,
    client: SatXMPPEntity,
    request: domish.Element
) -> None:
    """Called when any jingle request is received

    The request will then be dispatched to appropriate method
    according to current state
    @param client: client which has received the request
    @param request(domish.Element): received IQ request
    @raise exceptions.InternalError: the id stored in the session doesn't match the
        sid, or the action is unknown
    """
    request.handled = True
    jingle_elt = next(request.elements(NS_JINGLE, "jingle"))

    # session id: a missing attribute is handled like an empty one
    try:
        sid = jingle_elt["sid"]
    except KeyError:
        sid = None
    if not sid:
        log.warning("Received jingle request has no sid attribute")
        self.sendError(client, "bad-request", None, request)
        return

    # action: same rule
    try:
        action = jingle_elt["action"]
    except KeyError:
        action = None
    if not action:
        log.warning("Received jingle request has no action")
        self.sendError(client, "bad-request", None, request)
        return

    peer_jid = jid.JID(request["from"])

    # we get or create the session
    try:
        session = client.jingle_sessions[sid]
    except KeyError:
        session = None

    if session is None:
        # only session-initiate is allowed to use an unknown session id
        if action == XEP_0166.A_SESSION_TERMINATE:
            log.debug(
                "ignoring session terminate action (inexisting session id): "
                f"{sid} [{client.profile}]"
            )
            return
        if action != XEP_0166.A_SESSION_INITIATE:
            log.warning(
                "Received request for an unknown session id: "
                f"{sid} [{client.profile}]"
            )
            self.sendError(client, "item-not-found", None, request, "unknown-session")
            return

        session = client.jingle_sessions[sid] = {
            "id": sid,
            "state": STATE_PENDING,
            "initiator": peer_jid,
            "role": XEP_0166.ROLE_RESPONDER,
            # we store local_jid using request['to'] because for a component the jid
            # used may not be client.jid (if a local part is used).
            "local_jid": jid.JID(request['to']),
            "peer_jid": peer_jid,
            "started": time.time(),
        }
    elif session["peer_jid"] != peer_jid:
        log.warning(
            f"sid conflict ({sid}), the jid doesn't match. Can be a collision, "
            "a hack attempt, or a bad sid generation"
        )
        self.sendError(client, "service-unavailable", sid, request)
        return
    elif session["id"] != sid:
        log.error("session id doesn't match")
        self.sendError(client, "service-unavailable", sid, request)
        raise exceptions.InternalError

    # dispatching: some callbacks are coroutines and must be awaited
    handler_args = (client, request, jingle_elt, session)
    if action == XEP_0166.A_SESSION_INITIATE:
        await self.on_session_initiate(*handler_args)
    elif action == XEP_0166.A_SESSION_TERMINATE:
        self.on_session_terminate(*handler_args)
    elif action == XEP_0166.A_SESSION_ACCEPT:
        await self.on_session_accept(*handler_args)
    elif action == XEP_0166.A_SESSION_INFO:
        self.on_session_info(*handler_args)
    elif action == XEP_0166.A_TRANSPORT_INFO:
        self.on_transport_info(*handler_args)
    elif action == XEP_0166.A_TRANSPORT_REPLACE:
        await self.on_transport_replace(*handler_args)
    elif action == XEP_0166.A_TRANSPORT_ACCEPT:
        self.on_transport_accept(*handler_args)
    elif action == XEP_0166.A_TRANSPORT_REJECT:
        self.on_transport_reject(*handler_args)
    else:
        raise exceptions.InternalError(f"Unknown action {action}")
728 | |
729 ## Actions callbacks ## | |
730 | |
731 def _parse_elements( | |
732 self, | |
733 jingle_elt: domish.Element, | |
734 session: dict, | |
735 request: domish.Element, | |
736 client: SatXMPPEntity, | |
737 new: bool = False, | |
738 creator: str = ROLE_INITIATOR, | |
739 with_application: bool =True, | |
740 with_transport: bool = True, | |
741 store_in_session: bool = True, | |
742 ) -> Dict[str, dict]: | |
743 """Parse contents elements and fill contents_dict accordingly | |
744 | |
745 after the parsing, contents_dict will containt handlers, "desc_elt" and "transport_elt" | |
746 @param jingle_elt: parent <jingle> element, containing one or more <content> | |
747 @param session: session data | |
748 @param request: the whole request | |
749 @param client: %(doc_client)s | |
750 @param new: True if the content is new and must be created, | |
751 else the content must exists, and session data will be filled | |
752 @param creator: only used if new is True: creating pear (see § 7.3) | |
753 @param with_application: if True, raise an error if there is no <description> | |
754 element else ignore it | |
755 @param with_transport: if True, raise an error if there is no <transport> element | |
756 else ignore it | |
757 @param store_in_session: if True, the ``session`` contents will be updated with | |
758 the parsed elements. | |
759 Use False when you parse an action which can happen at any time (e.g. | |
760 transport-info) and meaning that a parsed element may already be present in | |
761 the session (e.g. if an authorisation request is waiting for user answer), | |
762 This can't be used when ``new`` is set. | |
763 @return: contents_dict (from session, or a new one if "store_in_session" is False) | |
764 @raise exceptions.CancelError: the error is treated and the calling method can | |
765 cancel the treatment (i.e. return) | |
766 """ | |
767 if store_in_session: | |
768 contents_dict = session["contents"] | |
769 else: | |
770 if new: | |
771 raise exceptions.InternalError( | |
772 '"store_in_session" must not be used when "new" is set' | |
773 ) | |
774 contents_dict = {n: {} for n in session["contents"]} | |
775 content_elts = jingle_elt.elements(NS_JINGLE, "content") | |
776 | |
777 for content_elt in content_elts: | |
778 name = content_elt["name"] | |
779 | |
780 if new: | |
781 # the content must not exist, we check it | |
782 if not name or name in contents_dict: | |
783 self.sendError(client, "bad-request", session["id"], request) | |
784 raise exceptions.CancelError | |
785 content_data = contents_dict[name] = { | |
786 "creator": creator, | |
787 "senders": content_elt.attributes.get("senders", "both"), | |
788 } | |
789 else: | |
790 # the content must exist, we check it | |
791 try: | |
792 content_data = contents_dict[name] | |
793 except KeyError: | |
794 log.warning("Other peer try to access an unknown content") | |
795 self.sendError(client, "bad-request", session["id"], request) | |
796 raise exceptions.CancelError | |
797 | |
798 # application | |
799 if with_application: | |
800 desc_elt = content_elt.description | |
801 if not desc_elt: | |
802 self.sendError(client, "bad-request", session["id"], request) | |
803 raise exceptions.CancelError | |
804 | |
805 if new: | |
806 # the content is new, we need to check and link the application | |
807 app_ns = desc_elt.uri | |
808 if not app_ns or app_ns == NS_JINGLE: | |
809 self.sendError(client, "bad-request", session["id"], request) | |
810 raise exceptions.CancelError | |
811 | |
812 try: | |
813 application = self._applications[app_ns] | |
814 except KeyError: | |
815 log.warning( | |
816 "Unmanaged application namespace [{}]".format(app_ns) | |
817 ) | |
818 self.sendError( | |
819 client, "service-unavailable", session["id"], request | |
820 ) | |
821 raise exceptions.CancelError | |
822 | |
823 content_data["application"] = application | |
824 content_data["application_data"] = {} | |
825 else: | |
826 # the content exists, we check that we have not a former desc_elt | |
827 if "desc_elt" in content_data: | |
828 raise exceptions.InternalError( | |
829 "desc_elt should not exist at this point" | |
830 ) | |
831 | |
832 content_data["desc_elt"] = desc_elt | |
833 | |
834 # transport | |
835 if with_transport: | |
836 transport_elt = content_elt.transport | |
837 if not transport_elt: | |
838 self.sendError(client, "bad-request", session["id"], request) | |
839 raise exceptions.CancelError | |
840 | |
841 if new: | |
842 # the content is new, we need to check and link the transport | |
843 transport_ns = transport_elt.uri | |
844 if not app_ns or app_ns == NS_JINGLE: | |
845 self.sendError(client, "bad-request", session["id"], request) | |
846 raise exceptions.CancelError | |
847 | |
848 try: | |
849 transport = self._transports[transport_ns] | |
850 except KeyError: | |
851 raise exceptions.InternalError( | |
852 "No transport registered for namespace {}".format( | |
853 transport_ns | |
854 ) | |
855 ) | |
856 content_data["transport"] = transport | |
857 content_data["transport_data"] = {} | |
858 else: | |
859 # the content exists, we check that we have not a former transport_elt | |
860 if "transport_elt" in content_data: | |
861 raise exceptions.InternalError( | |
862 "transport_elt should not exist at this point" | |
863 ) | |
864 | |
865 content_data["transport_elt"] = transport_elt | |
866 | |
867 return contents_dict | |
868 | |
869 def _ignore(self, client, action, session, content_name, elt): | |
870 """Dummy method used when not exception must be raised if a method is not implemented in _call_plugins | |
871 | |
872 must be used as app_default_cb and/or transp_default_cb | |
873 """ | |
874 return elt | |
875 | |
def _call_plugins(
    self,
    client: SatXMPPEntity,
    action: str,
    session: dict,
    app_method_name: Optional[str] = "jingle_handler",
    transp_method_name: Optional[str] = "jingle_handler",
    app_default_cb: Optional[Callable] = None,
    transp_default_cb: Optional[Callable] = None,
    delete: bool = True,
    elements: bool = True,
    force_element: Optional[domish.Element] = None
) -> List[defer.Deferred]:
    """Run a method of the application and transport handlers of every content

    For each content of the session, the application handler is called first,
    then the transport handler. Each call is wrapped with utils.as_deferred.
    @param action: jingle action name
    @param session: jingle session data
    @param app_method_name: name of the method to call on application handlers
        None to skip applications
    @param transp_method_name: name of the method to call on transport handlers
        None to skip transports
    @param app_default_cb: callback used when the application handler has no
        ``app_method_name`` attribute
        None to raise an exception instead
    @param transp_default_cb: callback used when the transport handler has no
        ``transp_method_name`` attribute
        None to raise an exception instead
    @param delete: if True, desc_elt and transport_elt are popped from the content
        data, otherwise they are only read
        ignored if elements is False
    @param elements: True if the stored elements (desc_elt and transport_elt) must
        be given to the called methods
        must be True if _call_plugins is used in a request, and False if it is used
        after a request (i.e. on <iq> result or error)
    @param force_element: element given to the called methods when elements is
        False, ignored otherwise
    @return: list of launched Deferred
    @raise exceptions.NotFound: method is not implemented and no default callback
        has been given
    """
    # (method to call, content_data key of the handler, fallback, stored element key)
    targets = (
        (app_method_name, "application", app_default_cb, "desc_elt"),
        (transp_method_name, "transport", transp_default_cb, "transport_elt"),
    )
    launched = []
    for content_name, content_data in session["contents"].items():
        for method_name, handler_key, fallback_cb, elt_key in targets:
            if method_name is None:
                # this kind of handler is explicitly skipped by the caller
                continue

            handler = content_data[handler_key].handler
            try:
                method = getattr(handler, method_name)
            except AttributeError:
                method = fallback_cb
                if method is None:
                    raise exceptions.NotFound(
                        "{} not implemented !".format(method_name)
                    )

            if not elements:
                elt = force_element
            elif delete:
                elt = content_data.pop(elt_key)
            else:
                elt = content_data[elt_key]

            launched.append(
                utils.as_deferred(
                    method, client, action, session, content_name, elt
                )
            )

    return launched
941 | |
async def on_session_initiate(
    self,
    client: SatXMPPEntity,
    request: domish.Element,
    jingle_elt: domish.Element,
    session: Dict[str, Any]
) -> None:
    """Handle a session-initiate action

    The contents are parsed and the <iq/> result is sent, then the
    "XEP-0166_on_session_initiate" trigger is run, handlers are called with the
    A_PREPARE_CONFIRMATION action, and finally the "jingle_request_confirmation"
    method of each application is called (or
    self.jingle_request_confirmation_default if the application doesn't have
    one). The gathered confirmations are handed to self._confirmation_cb.
    The application must manage itself multiple contents scenari (e.g. audio/video).
    @param client: %(doc_client)s
    @param request: full request
    @param jingle_elt: <jingle> element
    @param session: session data
    @raise exceptions.InternalError: the session already has a "contents" dict
    """
    if "contents" in session:
        raise exceptions.InternalError(
            "Contents dict should not already exist at this point"
        )
    contents_dict = {}
    session["contents"] = contents_dict

    try:
        self._parse_elements(
            jingle_elt, session, request, client, True, XEP_0166.ROLE_INITIATOR
        )
    except exceptions.CancelError:
        return

    if not contents_dict:
        # there MUST be at least one content
        self.sendError(client, "bad-request", session["id"], request)
        return

    # the request is well formed, reception can be confirmed with the <iq/> result
    client.send(xmlstream.toResponse(request, "result"))

    must_continue = await self.host.trigger.async_point(
        "XEP-0166_on_session_initiate", client, session, request, jingle_elt
    )
    if not must_continue:
        return

    prepare_defers = self._call_plugins(
        client, XEP_0166.A_PREPARE_CONFIRMATION, session, delete=False
    )
    await defer.DeferredList(prepare_defers)

    # each application is asked for confirmation (transports are skipped),
    # the results are then handled in _confirmation_cb
    confirm_defers = self._call_plugins(
        client,
        XEP_0166.A_SESSION_INITIATE,
        session,
        app_method_name="jingle_request_confirmation",
        transp_method_name=None,
        app_default_cb=self.jingle_request_confirmation_default,
        delete=False,
    )

    confirm_d = defer.gatherResults(confirm_defers)
    confirm_d.addCallback(self._confirmation_cb, session, jingle_elt, client)
    confirm_d.addErrback(self._jingle_error_cb, session, request, client)
1010 | |
def _confirmation_cb(self, confirm_results, session, jingle_elt, client):
    """Build and send the session-accept once confirmations have been received

    This method is only called for the responder.
    If any confirmation is falsy, the session is terminated with
    REASON_DECLINE. Otherwise the jingle_handler of each content's application
    and transport is called, and the elements they return are added to the
    matching <content> of the session-accept.
    @param confirm_results(list[bool]): all True if session is accepted
    @param session(dict): session data
    @param jingle_elt(domish.Element): jingle data of this session
    @param client: %(doc_client)s
    @return: result of self.terminate if the session is declined, else the
        DeferredList on which the sending of the session-accept is chained
    """
    if not all(confirm_results):
        return self.terminate(client, XEP_0166.REASON_DECLINE, session)

    # from here, ``jingle_elt`` is the <jingle> element of our answer
    iq_elt, jingle_elt = self._build_jingle_elt(
        client, session, XEP_0166.A_SESSION_ACCEPT
    )
    jingle_elt["responder"] = session['local_jid'].full()

    def attach_to_content(child_elt, content_elt):
        content_elt.addChild(child_elt)

    # application is handled before transport for every content
    handlers_and_elts = (
        ("application", "desc_elt"),
        ("transport", "transport_elt"),
    )
    pending = []

    for content_name, content_data in session["contents"].items():
        content_elt = jingle_elt.addElement("content")
        content_elt["creator"] = XEP_0166.ROLE_INITIATOR
        content_elt["name"] = content_name

        for handler_key, elt_key in handlers_and_elts:
            session_accept_cb = content_data[handler_key].handler.jingle_handler
            handler_d = utils.as_deferred(
                session_accept_cb,
                client,
                XEP_0166.A_SESSION_INITIATE,
                session,
                content_name,
                content_data.pop(elt_key),
            )
            handler_d.addCallback(attach_to_content, content_elt)
            pending.append(handler_d)

    def prepare_responder(__):
        return self._call_plugins(
            client,
            XEP_0166.A_PREPARE_RESPONDER,
            session,
            app_method_name=None,
            elements=False,
        )

    def send_accept(__):
        return iq_elt.send()

    def change_state(__):
        session["state"] = STATE_ACTIVE

    def notify_accepted(__):
        return self._call_plugins(
            client, XEP_0166.A_ACCEPTED_ACK, session, elements=False
        )

    d_list = defer.DeferredList(pending)
    for step in (prepare_responder, send_accept, change_state, notify_accepted):
        d_list.addCallback(step)
    d_list.addErrback(self._iq_error, session["id"], client)
    return d_list
1092 | |
def on_session_terminate(self, client, request, jingle_elt, session):
    """Handle a session-terminate action

    The "jingle_terminate" method of every application and transport handler
    is called with the <reason> element (handlers without this method are
    ignored), the session is deleted once they are all done, and the <iq/>
    result is sent.
    @param client: %(doc_client)s
    @param request(domish.Element): full <iq> request
    @param jingle_elt(domish.Element): the <jingle> element
    @param session(dict): session data
    """
    # TODO: check reason, display a message to user if needed
    log.debug("Jingle Session {} terminated".format(session["id"]))

    reason_elt = next(jingle_elt.elements(NS_JINGLE, "reason"), None)
    if reason_elt is None:
        log.warning("No reason given for session termination")
        # an empty <reason> is created so handlers always get an element
        reason_elt = jingle_elt.addElement("reason")

    terminate_defers = self._call_plugins(
        client,
        XEP_0166.A_SESSION_TERMINATE,
        session,
        app_method_name="jingle_terminate",
        transp_method_name="jingle_terminate",
        app_default_cb=self._ignore,
        transp_default_cb=self._ignore,
        elements=False,
        force_element=reason_elt,
    )

    def delete_session(__):
        return self._del_session(client, session["id"])

    defer.DeferredList(terminate_defers).addCallback(delete_session)
    client.send(xmlstream.toResponse(request, "result"))
1117 | |
async def on_session_accept(self, client, request, jingle_elt, session):
    """Handle the session-accept sent by the other peer

    This method is only called for initiator.
    Once the elements are parsed, the <iq/> result is sent and the session
    becomes active. Handlers are then called with A_PREPARE_INITIATOR, next
    with A_SESSION_ACCEPT, and finally transports are called with A_START.
    @param client: %(doc_client)s
    @param request(domish.Element): full <iq> request
    @param jingle_elt(domish.Element): the <jingle> element
    @param session(dict): session data
    """
    log.debug(f"Jingle session {session['id']} has been accepted")

    try:
        self._parse_elements(jingle_elt, session, request, client)
    except exceptions.CancelError:
        return

    # reception of the request is confirmed with the <iq/> result…
    client.send(xmlstream.toResponse(request, "result"))
    # …and the session is now active
    session["state"] = STATE_ACTIVE

    prepare_defers = self._call_plugins(
        client, XEP_0166.A_PREPARE_INITIATOR, session, delete=False
    )
    await defer.DeferredList(prepare_defers)

    def start_transports(__):
        return self._call_plugins(
            client, XEP_0166.A_START, session, app_method_name=None, elements=False
        )

    negociate_dlist = defer.gatherResults(
        self._call_plugins(client, XEP_0166.A_SESSION_ACCEPT, session)
    )
    # after negociations we start the transfer
    negociate_dlist.addCallback(start_transports)
1157 | |
def _on_session_cb(self, result, client, request, jingle_elt, session):
    """Answer ``request`` with an <iq/> result

    ``result``, ``jingle_elt`` and ``session`` are not used.
    """
    response_elt = xmlstream.toResponse(request, "result")
    client.send(response_elt)
1160 | |
def _on_session_eb(self, failure_, client, request, jingle_elt, session):
    """Log a session-info failure and answer ``request`` with an error

    The error sent is always "feature-not-implemented" with the
    "unsupported-info" Jingle condition.
    @param failure_: failure whose ``value`` is logged
    """
    error_msg = "Error while handling on_session_info: {}".format(failure_.value)
    log.error(error_msg)
    # XXX: only error managed so far, maybe some applications/transports need more
    self.sendError(
        client, "feature-not-implemented", None, request, "unsupported-info"
    )
1167 | |
def on_session_info(self, client, request, jingle_elt, session):
    """Method called when a session-info action is received from other peer

    A <jingle> element without children is a session ping and is directly
    answered with an <iq/> result. Otherwise the "jingle_session_info" method
    of each content's application handler is called with the <jingle>
    element: an <iq/> result is sent if they all succeed, an error is sent if
    one of them fails or if the method is not implemented.
    @param client: %(doc_client)s
    @param request(domish.Element): full <iq> request
    @param jingle_elt(domish.Element): the <jingle> element
    @param session(dict): session data
    """
    if not jingle_elt.children:
        # this is a session ping, see XEP-0166 §6.8
        client.send(xmlstream.toResponse(request, "result"))
        return

    try:
        # XXX: session-info is most likely only used for application, so we don't
        #   call transport plugins. If a future transport use it, this behaviour
        #   must be adapted
        defers = self._call_plugins(
            client,
            XEP_0166.A_SESSION_INFO,
            session,
            "jingle_session_info",
            None,
            elements=False,
            force_element=jingle_elt,
        )
    except exceptions.NotFound as e:
        self._on_session_eb(failure.Failure(e), client, request, jingle_elt, session)
        return

    # consumeErrors: the failure is handled by the errback below, it must not be
    # reported a second time as an unhandled error of the inner Deferred
    dlist = defer.DeferredList(defers, fireOnOneErrback=True, consumeErrors=True)
    dlist.addCallback(self._on_session_cb, client, request, jingle_elt, session)
    # a failing handler must be answered with an error, not with an <iq/> result
    dlist.addErrback(self._on_session_eb, client, request, jingle_elt, session)
1201 | |
async def on_transport_replace(self, client, request, jingle_elt, session):
    """A transport change is requested

    The request is parsed and acknowledged. If one of the requested transports
    is not registered, a transport-reject is sent back with the children of
    the received <jingle> element. Otherwise, for each content with a new
    <transport>, the current transport handler is called with A_DESTROY, the
    transport is replaced, and the new handler is called with
    A_TRANSPORT_REPLACE then A_PREPARE_RESPONDER; a transport-accept is
    finally sent.
    @param client: %(doc_client)s
    @param request(domish.Element): full <iq> request
    @param jingle_elt(domish.Element): the <jingle> element
    @param session(dict): session data
    """
    log.debug("Other peer wants to replace the transport")
    try:
        self._parse_elements(
            jingle_elt, session, request, client, with_application=False
        )
    except exceptions.CancelError:
        # this is a coroutine: a plain return must be used, defer.returnValue
        # is only meaningful inside inlineCallbacks generators
        return

    client.send(xmlstream.toResponse(request, "result"))

    # content_name doubles as a flag: it stays (or is reset to) None when the
    # replacement can't be accepted
    content_name = None
    to_replace = []

    for content_name, content_data in session["contents"].items():
        try:
            transport_elt = content_data.pop("transport_elt")
        except KeyError:
            # no transport replacement requested for this content
            continue
        transport_ns = transport_elt.uri
        try:
            transport = self._transports[transport_ns]
        except KeyError:
            log.warning(
                "Other peer want to replace current transport with an unknown one: {}".format(
                    transport_ns
                )
            )
            content_name = None
            break
        to_replace.append((content_name, content_data, transport, transport_elt))

    if content_name is None:
        # we can't accept the replacement
        iq_elt, reject_jingle_elt = self._build_jingle_elt(
            client, session, XEP_0166.A_TRANSPORT_REJECT
        )
        for child in jingle_elt.children:
            reject_jingle_elt.addChild(child)

        iq_elt.send()
        return

    # at this point, everything is alright and we can replace the transport(s)
    # this is similar to an session-accept action, but for transports only
    iq_elt, accept_jingle_elt = self._build_jingle_elt(
        client, session, XEP_0166.A_TRANSPORT_ACCEPT
    )
    for content_name, content_data, transport, transport_elt in to_replace:
        # we can now actually replace the transport
        await utils.as_deferred(
            content_data["transport"].handler.jingle_handler,
            client, XEP_0166.A_DESTROY, session, content_name, None
        )
        content_data["transport"] = transport
        content_data["transport_data"].clear()
        # and build the element
        content_elt = accept_jingle_elt.addElement("content")
        content_elt["name"] = content_name
        content_elt["creator"] = content_data["creator"]
        # we notify the transport and insert its <transport/> in the answer
        accept_transport_elt = await utils.as_deferred(
            transport.handler.jingle_handler,
            client, XEP_0166.A_TRANSPORT_REPLACE, session, content_name, transport_elt
        )
        content_elt.addChild(accept_transport_elt)
        # there is no confirmation needed here, so we can directly prepare it
        await utils.as_deferred(
            transport.handler.jingle_handler,
            client, XEP_0166.A_PREPARE_RESPONDER, session, content_name, None
        )

    iq_elt.send()
1283 | |
def on_transport_accept(self, client, request, jingle_elt, session):
    """Handle the acceptance of a transport replacement

    The <transport> elements are parsed and the <iq/> result is sent, then
    transport handlers are called with A_TRANSPORT_ACCEPT and, once done, with
    A_START. Application handlers are not called.
    @param client: %(doc_client)s
    @param request(domish.Element): full <iq> request
    @param jingle_elt(domish.Element): the <jingle> element
    @param session(dict): session data
    """
    log.debug("new transport has been accepted")

    try:
        self._parse_elements(
            jingle_elt, session, request, client, with_application=False
        )
    except exceptions.CancelError:
        return

    # reception of the request is confirmed with the <iq/> result
    client.send(xmlstream.toResponse(request, "result"))

    def start_transports(__):
        return self._call_plugins(
            client, XEP_0166.A_START, session, app_method_name=None, elements=False
        )

    negociate_dlist = defer.DeferredList(
        self._call_plugins(
            client, XEP_0166.A_TRANSPORT_ACCEPT, session, app_method_name=None
        )
    )
    # after negociations we start the transfer
    negociate_dlist.addCallback(start_transports)
1317 | |
def on_transport_reject(self, client, request, jingle_elt, session):
    """Handle the refusal of a transport replacement

    The session is terminated with the "failed-transport" reason; ``request``
    and ``jingle_elt`` are not used.
    @param client: %(doc_client)s
    @param request(domish.Element): full <iq> request
    @param jingle_elt(domish.Element): the <jingle> element
    @param session(dict): session data
    """
    # XXX: for now, we terminate the session in case of transport-reject
    #   this behaviour may change in the future
    reason = "failed-transport"
    self.terminate(client, reason, session)
1329 | |
def on_transport_info(
    self,
    client: SatXMPPEntity,
    request: domish.Element,
    jingle_elt: domish.Element,
    session: dict
) -> None:
    """Method called when a transport-info action is received from other peer

    The request is parsed without storing the elements in the session (another
    action may be using them at the same time), then jingle_handler is called
    on the transport handler of each content for which a <transport> element
    has been received.
    @param client: %(doc_client)s
    @param request: full <iq> request
    @param jingle_elt: the <jingle> element
    @param session: session data
    """
    log.debug(f"Jingle session {session['id']}: transport-info received")

    try:
        parsed_contents = self._parse_elements(
            jingle_elt, session, request, client, with_application=False,
            store_in_session=False
        )
    except exceptions.CancelError:
        return

    # The parsing was OK, we send the <iq> result
    client.send(xmlstream.toResponse(request, "result"))

    for content_name, content_data in session["contents"].items():
        try:
            transport_elt = parsed_contents[content_name]["transport_elt"]
        except KeyError:
            # nothing received for this content
            continue
        else:
            # NOTE: the returned Deferred is not tracked, the handler runs on its
            #   own
            utils.as_deferred(
                content_data["transport"].handler.jingle_handler,
                client,
                XEP_0166.A_TRANSPORT_INFO,
                session,
                content_name,
                transport_elt,
            )
1373 | |
1374 | |
@implementer(iwokkel.IDisco)
class XEP_0166_handler(xmlstream.XMPPHandler):
    """XMPP handler forwarding Jingle requests to the plugin and advertising Jingle"""

    def __init__(self, plugin_parent):
        # plugin instance whose _on_jingle_request is registered as observer
        self.plugin_parent = plugin_parent

    def connectionInitialized(self):
        """Register the plugin's Jingle request observer on the XML stream"""
        on_jingle_request = self.plugin_parent._on_jingle_request
        self.xmlstream.addObserver(
            JINGLE_REQUEST, on_jingle_request, client=self.parent
        )

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        """Advertise the Jingle namespace as a disco feature"""
        jingle_feature = disco.DiscoFeature(NS_JINGLE)
        return [jingle_feature]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        """No disco item is exposed"""
        return []