Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_xep_0166/__init__.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/plugins/plugin_xep_0166/__init__.py@dd39e60ca2aa |
children | a8ac5e1e5848 |
comparison
equal
deleted
inserted
replaced
4070:d10748475025 | 4071:4b842c1fb686 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # Libervia plugin for Jingle (XEP-0166) | |
4 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) | |
5 | |
6 # This program is free software: you can redistribute it and/or modify | |
7 # it under the terms of the GNU Affero General Public License as published by | |
8 # the Free Software Foundation, either version 3 of the License, or | |
9 # (at your option) any later version. | |
10 | |
11 # This program is distributed in the hope that it will be useful, | |
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 # GNU Affero General Public License for more details. | |
15 | |
16 # You should have received a copy of the GNU Affero General Public License | |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
18 | |
19 | |
20 import time | |
21 from typing import Any, Callable, Dict, Final, List, Optional, Tuple | |
22 import uuid | |
23 | |
24 from twisted.internet import defer | |
25 from twisted.internet import reactor | |
26 from twisted.python import failure | |
27 from twisted.words.protocols.jabber import jid | |
28 from twisted.words.protocols.jabber import error | |
29 from twisted.words.protocols.jabber import xmlstream | |
30 from twisted.words.xish import domish | |
31 from wokkel import disco, iwokkel | |
32 from zope.interface import implementer | |
33 | |
34 from libervia.backend.core import exceptions | |
35 from libervia.backend.core.constants import Const as C | |
36 from libervia.backend.core.core_types import SatXMPPEntity | |
37 from libervia.backend.core.i18n import D_, _ | |
38 from libervia.backend.core.log import getLogger | |
39 from libervia.backend.tools import xml_tools | |
40 from libervia.backend.tools import utils | |
41 | |
42 from .models import ( | |
43 ApplicationData, | |
44 BaseApplicationHandler, | |
45 BaseTransportHandler, | |
46 ContentData, | |
47 TransportData, | |
48 ) | |
49 | |
50 | |
51 log = getLogger(__name__) | |
52 | |
53 | |
# stanza matching expressions and Jingle namespaces
IQ_SET: Final = '/iq[@type="set"]'
NS_JINGLE: Final = "urn:xmpp:jingle:1"
NS_JINGLE_ERROR: Final = "urn:xmpp:jingle:errors:1"
# an <iq type="set"> holding a <jingle> child in the Jingle namespace
JINGLE_REQUEST: Final = f'{IQ_SET}/jingle[@xmlns="{NS_JINGLE}"]'

# values stored under the "state" key of a session
STATE_PENDING: Final = "PENDING"
STATE_ACTIVE: Final = "ACTIVE"
STATE_ENDED: Final = "ENDED"

# text of the generic confirmation dialog, "entity" is filled with the peer jid
CONFIRM_TXT: Final = D_(
    "{entity} want to start a jingle session with you, do you accept ?"
)

# plugin metadata
PLUGIN_INFO: Final = {
    C.PI_NAME: "Jingle",
    C.PI_IMPORT_NAME: "XEP-0166",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0166"],
    C.PI_MAIN: "XEP_0166",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Implementation of Jingle"""),
}
75 | |
76 | |
class XEP_0166:
    """Jingle (XEP-0166) plugin"""

    namespace: Final = NS_JINGLE

    # roles of the peers in a session
    ROLE_INITIATOR: Final = "initiator"
    ROLE_RESPONDER: Final = "responder"

    # transport types (keys of the per-type transports registry)
    TRANSPORT_DATAGRAM: Final = "UDP"
    TRANSPORT_STREAMING: Final = "TCP"

    # reasons used when terminating a session
    REASON_SUCCESS: Final = "success"
    REASON_DECLINE: Final = "decline"
    REASON_FAILED_APPLICATION: Final = "failed-application"
    REASON_FAILED_TRANSPORT: Final = "failed-transport"
    REASON_CONNECTIVITY_ERROR: Final = "connectivity-error"

    # standard actions

    A_SESSION_INITIATE: Final = "session-initiate"
    A_SESSION_ACCEPT: Final = "session-accept"
    A_SESSION_TERMINATE: Final = "session-terminate"
    A_SESSION_INFO: Final = "session-info"
    A_TRANSPORT_REPLACE: Final = "transport-replace"
    A_TRANSPORT_ACCEPT: Final = "transport-accept"
    A_TRANSPORT_REJECT: Final = "transport-reject"
    A_TRANSPORT_INFO: Final = "transport-info"

    # non standard actions

    #: called before the confirmation request, first event for responder, useful for
    #: parsing
    A_PREPARE_CONFIRMATION: Final = "prepare-confirmation"
    #: initiator must prepare transfer
    A_PREPARE_INITIATOR: Final = "prepare-initiator"
    #: responder must prepare transfer
    A_PREPARE_RESPONDER: Final = "prepare-responder"
    #: session accepted ack has been received from initiator
    A_ACCEPTED_ACK: Final = "accepted-ack"
    #: application can start
    A_START: Final = "start"
    #: called when a transport is destroyed (e.g. because it is replaced). Used to do
    #: cleaning operations
    A_DESTROY: Final = "destroy"
122 | |
def __init__(self, host):
    """Create the empty registries of Jingle applications and transports

    @param host: host instance, kept as ``self.host``
    """
    log.info(_("plugin Jingle initialization"))
    self.host = host
    # key: namespace, value: application data
    self._applications = {}
    # key: namespace, value: transport data
    self._transports = {}
    # transports are also kept by type, each list is sorted by priority when a
    # transport is registered
    self._type_transports = {
        transport_type: []
        for transport_type in (
            XEP_0166.TRANSPORT_DATAGRAM,
            XEP_0166.TRANSPORT_STREAMING,
        )
    }
133 | |
def profile_connected(self, client):
    """Attach an empty Jingle sessions registry to the client

    @param client: client which gets a fresh ``jingle_sessions`` mapping
    """
    # key: sid, value: session data
    client.jingle_sessions = {}
136 | |
def get_handler(self, client):
    """Build the handler of this plugin

    @param client: not used here
    @return: a new ``XEP_0166_handler`` created with this plugin instance
    """
    handler = XEP_0166_handler(self)
    return handler
139 | |
def get_session(self, client: SatXMPPEntity, session_id: str) -> dict:
    """Look up a Jingle session by its SID

    @param client: client whose ``jingle_sessions`` mapping is searched
    @param session_id: session ID
    @return: data of the session

    @raise exceptions.NotFound: no session with this SID has been found
    """
    known_sessions = client.jingle_sessions
    if session_id not in known_sessions:
        raise exceptions.NotFound(
            f"No session with SID {session_id} found"
        )
    return known_sessions[session_id]
154 | |
155 | |
def _del_session(self, client, sid):
    """Remove a session from the client registry

    An unknown session ID is not an error: it is only logged.
    @param client: client whose ``jingle_sessions`` mapping is updated
    @param sid: ID of the session to delete
    """
    known_sessions = client.jingle_sessions
    if sid in known_sessions:
        del known_sessions[sid]
        log.debug(f"Jingle session id {sid!r} deleted [{client.profile}]")
    else:
        log.debug(
            f"Jingle session id {sid!r} is unknown, nothing to delete "
            f"[{client.profile}]")
165 | |
166 ## helper methods to build stanzas ## | |
167 | |
def _build_jingle_elt(
    self,
    client: SatXMPPEntity,
    session: dict,
    action: str
) -> Tuple[xmlstream.IQ, domish.Element]:
    """Create an <iq type="set"> holding a <jingle> element for the session

    @param client: client used to create the IQ
    @param session: jingle session data, "local_jid", "peer_jid" and "id" are used
    @param action: value of the "action" attribute of the <jingle> element
    @return: the <iq> element and its <jingle> child
    """
    iq_elt = client.IQ("set")
    # addressing comes from the session, not from the client
    for attribute, session_key in (("from", "local_jid"), ("to", "peer_jid")):
        iq_elt[attribute] = session[session_key].full()
    jingle_elt = iq_elt.addElement("jingle", NS_JINGLE)
    jingle_elt["sid"], jingle_elt["action"] = session["id"], action
    return iq_elt, jingle_elt
181 | |
def sendError(self, client, error_condition, sid, request, jingle_condition=None):
    """Answer a request with an error stanza

    The session is deleted when the type of the error condition is "cancel" and
    a session id is given.
    @param error_condition: one of
        twisted.words.protocols.jabber.error.STANZA_CONDITIONS keys
    @param sid(unicode,None): jingle session id, or None, if session must not be
        destroyed
    @param request(domish.Element): original request
    @param jingle_condition(None, unicode): if not None, additional jingle-specific
        error information
    @return: result of ``client.send`` for the error stanza
    """
    iq_elt = error.StanzaError(error_condition).toResponse(request)
    if jingle_condition is not None:
        iq_elt.error.addElement((NS_JINGLE_ERROR, jingle_condition))
    is_cancel = error.STANZA_CONDITIONS[error_condition]["type"] == "cancel"
    if is_cancel and sid:
        self._del_session(client, sid)
        warning_msg = (
            "Error while managing jingle session, cancelling: {condition}".format(
                condition=error_condition
            )
        )
        log.warning(warning_msg)
    return client.send(iq_elt)
201 | |
def _terminate_eb(self, failure_):
    """Errback used by [terminate]: the failure is only logged

    @param failure_: failure got while sending the session-terminate
    """
    message = _("Error while terminating session: {msg}").format(msg=failure_)
    log.warning(message)
204 | |
def terminate(self, client, reason, session, text=None):
    """Terminate the session

    Build the session-terminate action, delete the session data, then send the
    stanza.
    @param reason(unicode, list[domish.Element]): if unicode, will be transformed to
        an element
        if a list of elements, they are added as children of the <reason/> element
    @param session(dict): data of the session
    @param text(unicode, None): if not None, content of a <text/> element added to
        the <reason/> element
    @return: Deferred of the sent stanza, failures are logged by [_terminate_eb]
    """
    iq_elt, jingle_elt = self._build_jingle_elt(
        client, session, XEP_0166.A_SESSION_TERMINATE
    )
    reason_elt = jingle_elt.addElement("reason")
    if not isinstance(reason, str):
        for child_elt in reason:
            reason_elt.addChild(child_elt)
    else:
        reason_elt.addElement(reason)
    if text is not None:
        reason_elt.addElement("text", content=text)
    # the session is forgotten before the stanza is sent
    self._del_session(client, session["id"])
    send_d = iq_elt.send()
    send_d.addErrback(self._terminate_eb)
    return send_d
228 | |
229 ## errors which don't imply sending a stanza ## | |
230 | |
def _iq_error(self, failure_, sid, client):
    """Log an <iq/> error and delete the session

    @param failure_(failure.Failure): the exception raised
    @param sid(unicode): jingle session id
    @param client: client owning the session
    """
    reason = failure_.value
    log.warning(
        "Error while sending jingle <iq/> stanza: {failure_}".format(failure_=reason)
    )
    self._del_session(client, sid)
243 | |
def _jingle_error_cb(self, failure_, session, request, client):
    """Called when something is going wrong while parsing jingle request

    The reaction depends on the exception wrapped by the failure:
        - exceptions.DataError: a bad-request error is sent
        - anything else: the session is terminated with the
          "failed-application" reason, the text being the exception message.
          Exceptions other than error.StanzaError are also logged as errors.
    If the failure wraps a defer.FirstError, the exception of its sub-failure is
    used.
    @param failure_(failure.Failure): the exception raised
    @param session(dict): data of the session
    @param request(domish.Element): jingle request
    @param client: %(doc_client)s
    @return: result of [sendError] or [terminate]
    """
    # the element may be absent if the failure happened before it was stored: an
    # errback must not raise a KeyError which would hide the original error
    session.pop("jingle_elt", None)
    log.warning(f"Error while processing jingle request [{client.profile}]")
    # we always work on the wrapped exception, never on the Failure itself,
    # otherwise the isinstance checks below could never match
    exc = failure_.value
    if isinstance(exc, defer.FirstError):
        exc = exc.subFailure.value
    if isinstance(exc, exceptions.DataError):
        return self.sendError(client, "bad-request", session["id"], request)
    if not isinstance(exc, error.StanzaError):
        log.error(f"Unmanaged jingle exception: {exc}")
    return self.terminate(client, self.REASON_FAILED_APPLICATION, session,
                          text=str(exc))
267 | |
268 ## methods used by other plugins ## | |
269 | |
def register_application(
    self,
    namespace: str,
    handler: BaseApplicationHandler
) -> None:
    """Register an application plugin

    @param namespace: application namespace managed by the plugin
    @param handler: instance of a class which manages the application.
        May have the following methods:
            - request_confirmation(session, desc_elt, client):
                - if present, it is called when the session must be accepted.
                - if it returns True the session is accepted, else rejected.
                    A Deferred can be returned
                - if not present, a generic accept dialog will be used
            - jingle_session_init(
                    client, self, session, content_name[, *args, **kwargs]
                ): must return the domish.Element used for initial content
            - jingle_handler(
                    client, self, action, session, content_name, transport_elt
                ):
                called on several actions to negotiate the application or transport
            - jingle_terminate: called on session terminate, with reason_elt
                May be used to clean session
    @raise exceptions.ConflictError: the namespace is already registered
    """
    if namespace in self._applications:
        raise exceptions.ConflictError(
            f"Trying to register already registered namespace {namespace}"
        )
    application_data = ApplicationData(namespace=namespace, handler=handler)
    self._applications[namespace] = application_data
    log.debug("new jingle application registered")
303 | |
def register_transport(
    self,
    namespace: str,
    transport_type: str,
    handler: BaseTransportHandler,
    priority: int = 0
) -> None:
    """Register a transport plugin

    The transport is stored by namespace, and in the list of its type which is
    kept sorted by decreasing priority.
    @param namespace: the XML namespace used for this transport
    @param transport_type: type of transport to use (see XEP-0166 §8)
    @param handler: instance of a class which manages the transport.
    @param priority: priority of this transport
    @raise exceptions.ConflictError: the namespace is already registered
    """
    assert transport_type in (
        XEP_0166.TRANSPORT_DATAGRAM,
        XEP_0166.TRANSPORT_STREAMING,
    )
    if namespace in self._transports:
        raise exceptions.ConflictError(
            "Trying to register already registered namespace {}".format(namespace)
        )
    new_transport = TransportData(
        namespace=namespace, handler=handler, priority=priority
    )
    same_type_transports = self._type_transports[transport_type]
    same_type_transports.append(new_transport)
    # highest priority first
    same_type_transports.sort(key=lambda t_data: t_data.priority, reverse=True)
    self._transports[namespace] = new_transport
    log.debug("new jingle transport registered")
335 | |
@defer.inlineCallbacks
def transport_replace(self, client, transport_ns, session, content_name):
    """Replace a transport

    The current transport handler is called with the A_DESTROY action, then the
    content is switched to the new transport and a transport-replace is sent.
    @param transport_ns(unicode): namespace of the new transport to use
    @param session(dict): jingle session data
    @param content_name(unicode): name of the content
    @raise exceptions.InternalError: no transport is registered for transport_ns
    """
    # XXX: for now we replace the transport before receiving confirmation from the
    #      other peer. This is acceptable because we terminate the session if the
    #      transport is rejected. This behaviour may change in the future.
    content_data = session["contents"][content_name]
    transport_data = content_data["transport_data"]
    try:
        new_transport = self._transports[transport_ns]
    except KeyError:
        raise exceptions.InternalError("Unkown transport")

    # the former transport can do its cleaning before being replaced
    former_transport = content_data["transport"]
    yield former_transport.handler.jingle_handler(
        client, XEP_0166.A_DESTROY, session, content_name, None
    )
    content_data["transport"] = new_transport
    transport_data.clear()

    iq_elt, jingle_elt = self._build_jingle_elt(
        client, session, XEP_0166.A_TRANSPORT_REPLACE
    )
    content_elt = jingle_elt.addElement("content")
    content_elt["name"] = content_name
    content_elt["creator"] = content_data["creator"]

    content_elt.addChild(
        new_transport.handler.jingle_session_init(client, session, content_name)
    )
    # the answer to the request is not waited for here
    iq_elt.send()
369 | |
def build_action(
    self,
    client: SatXMPPEntity,
    action: str,
    session: dict,
    content_name: str,
    iq_elt: Optional[xmlstream.IQ] = None,
    context_elt: Optional[domish.Element] = None
) -> Tuple[xmlstream.IQ, domish.Element]:
    """Build an element according to requested action

    A <content> element for ``content_name`` is always added to the <jingle>
    element. A provided ``context_elt`` is returned as is, it is not attached
    here.
    @param action: a jingle action (see XEP-0166 §7.2),
        session-* actions are not managed here
        transport-replace is managed in the dedicated [transport_replace] method
    @param session: jingle session data
    @param content_name: name of the content
    @param iq_elt: use this IQ instead of creating a new one if provided
    @param context_elt: use this element instead of creating a new one if provided
    @return: parent <iq> element, <transport> or <description> element, according
        to action
    @raise exceptions.InternalError: the provided <iq> has no <jingle> child, or
        the action is not managed
    """
    # we first get or build the iq and jingle elements
    if iq_elt is None:
        iq_elt, jingle_elt = self._build_jingle_elt(client, session, action)
    else:
        jingle_elt = next(iq_elt.elements(NS_JINGLE, "jingle"), None)
        if jingle_elt is None:
            raise exceptions.InternalError(
                "The <iq> element provided doesn't have a <jingle> element"
            )

    # FIXME: XEP-0260 § 2.3 Ex 5 has an initiator attribute, but it should not
    #        according to XEP-0166 §7.1 table 1, must be checked
    content_data = session["contents"][content_name]
    content_elt = jingle_elt.addElement("content")
    content_elt["name"] = content_name
    content_elt["creator"] = content_data["creator"]

    if context_elt is None:
        if action != XEP_0166.A_TRANSPORT_INFO:
            raise exceptions.InternalError(f"unmanaged action {action}")
        context_elt = content_elt.addElement(
            "transport", content_data["transport"].namespace
        )

    return iq_elt, context_elt
416 | |
def build_session_info(self, client, session):
    """Build a session-info action

    @param client: client used to create the IQ
    @param session(dict): jingle session data
    @return (tuple[domish.Element, domish.Element]): parent <iq> element,
        <jingle> element
    """
    elements = self._build_jingle_elt(client, session, XEP_0166.A_SESSION_INFO)
    return elements
424 | |
def get_application(self, namespace: str) -> ApplicationData:
    """Retrieve the application registered for a namespace

    @param namespace: namespace of the application
    @return: data of the application
    @raise exceptions.NotFound: no application is registered for this namespace
    """
    registered = self._applications
    if namespace not in registered:
        raise exceptions.NotFound(
            f"No application registered for {namespace}"
        )
    return registered[namespace]
436 | |
def get_content_data(self, content: dict) -> ContentData:
    """Retrieve application and its arguments from content

    If the content has no "name", a random one is generated and stored in
    ``content``.
    @param content: content description, "app_ns" is mandatory
    @return: application, its positional and keyword arguments, transport data
        and content name
    @raise exceptions.InternalError: no application is registered for "app_ns"
    """
    app_ns = content["app_ns"]
    try:
        application = self.get_application(app_ns)
    except exceptions.NotFound as e:
        raise exceptions.InternalError(str(e))
    app_args = content.get("app_args", [])
    app_kwargs = content.get("app_kwargs", {})
    transport_data = content.get("transport_data", {})
    if "name" not in content:
        # the generated name is kept in the content given by the caller
        content["name"] = str(uuid.uuid4())
    return ContentData(
        application, app_args, app_kwargs, transport_data, content["name"]
    )
458 | |
async def initiate(
    self,
    client: SatXMPPEntity,
    peer_jid: jid.JID,
    contents: List[dict],
    encrypted: bool = False,
    **extra_data: Any
) -> str:
    """Send a session initiation request

    The session is registered in ``client.jingle_sessions`` before anything is
    sent. If one of the "XEP-0166_initiate" or "XEP-0166_initiate_elt_built"
    triggers returns a false value, the method stops there and returns the
    session id without sending the request.
    @param peer_jid: jid to establish session with
    @param contents: list of contents to use:
        The dict must have the following keys:
            - app_ns(str): namespace of the application
        the following keys are optional:
            - transport_type(str): type of transport to use (see XEP-0166 §8)
                default to TRANSPORT_STREAMING
            - name(str): name of the content
            - senders(str): One of XEP_0166.ROLE_INITIATOR,
                XEP_0166.ROLE_RESPONDER, both or none
                default to BOTH (see XEP-0166 §7.3)
            - app_args(list): args to pass to the application plugin
            - app_kwargs(dict): keyword args to pass to the application plugin
    @param encrypted: if True, session must be encrypted and "encryption" must be
        set to all content data of session
    @param extra_data: additional items stored in the session data
    @return: jingle session id
    @raise ValueError: the peer is the client itself
    @raise exceptions.InternalError: no transport is registered for a requested
        type, or two contents have the same name
    @raise exceptions.EncryptionError: encryption is requested but a content has
        no "encryption" set
    """
    assert contents  # there must be at least one content
    if peer_jid == client.jid or (
        client.is_component and peer_jid.host == client.jid.host
    ):
        raise ValueError(_("You can't do a jingle session with yourself"))
    initiator = client.jid
    sid = str(uuid.uuid4())
    # TODO: session cleaning after timeout ?
    session = {
        "id": sid,
        "state": STATE_PENDING,
        "initiator": initiator,
        "role": XEP_0166.ROLE_INITIATOR,
        "local_jid": client.jid,
        "peer_jid": peer_jid,
        "started": time.time(),
        "contents": {},
        **extra_data,
    }
    client.jingle_sessions[sid] = session

    must_continue = await self.host.trigger.async_point(
        "XEP-0166_initiate",
        client, session, contents
    )
    if not must_continue:
        return sid

    iq_elt, jingle_elt = self._build_jingle_elt(
        client, session, XEP_0166.A_SESSION_INITIATE
    )
    jingle_elt["initiator"] = initiator.full()
    # temporarily stored, it is removed once the elements are built
    session["jingle_elt"] = jingle_elt

    session_contents = session["contents"]

    for content in contents:
        # we get the application plugin
        content_data = self.get_content_data(content)
        content_name = content_data.content_name

        # and the transport plugin: the first registered for the type is used
        transport_type = content.get("transport_type", XEP_0166.TRANSPORT_STREAMING)
        try:
            transport = self._type_transports[transport_type][0]
        except IndexError:
            raise exceptions.InternalError(
                "No transport registered for {}".format(transport_type)
            )

        # we build the session data for this content
        application_data = {}
        transport_data = content_data.transport_data
        session_content = {
            "application": content_data.application,
            "application_data": application_data,
            "transport": transport,
            "transport_data": transport_data,
            "creator": XEP_0166.ROLE_INITIATOR,
            "senders": content.get("senders", "both"),
        }
        if content_name in session_contents:
            raise exceptions.InternalError(
                "There is already a content with this name"
            )
        session_contents[content_name] = session_content

        # we construct the content element
        content_elt = jingle_elt.addElement("content")
        content_elt["creator"] = session_content["creator"]
        content_elt["name"] = content_name
        # "senders" is only set on the element when explicitly requested
        if "senders" in content:
            content_elt["senders"] = content["senders"]

        # then the description element
        desc_elt = await utils.as_deferred(
            content_data.application.handler.jingle_session_init,
            client, session, content_name,
            *content_data.app_args, **content_data.app_kwargs
        )
        application_data["desc_elt"] = desc_elt
        content_elt.addChild(desc_elt)

        # and the transport one
        transport_elt = await utils.as_deferred(
            transport.handler.jingle_session_init,
            client, session, content_name,
        )
        transport_data["transport_elt"] = transport_elt
        content_elt.addChild(transport_elt)

    must_continue = await self.host.trigger.async_point(
        "XEP-0166_initiate_elt_built",
        client, session, iq_elt, jingle_elt
    )
    if not must_continue:
        return sid

    # processing is done, we can remove elements
    for built_content in session_contents.values():
        del built_content["application_data"]["desc_elt"]
        del built_content["transport_data"]["transport_elt"]
    del session["jingle_elt"]

    if encrypted and any(
        "encryption" not in built_content
        for built_content in session["contents"].values()
    ):
        raise exceptions.EncryptionError(
            "Encryption is requested, but no encryption has been set"
        )

    try:
        await iq_elt.send()
    except Exception as e:
        # the session is deleted by _iq_error, then the failure is propagated
        failure_ = failure.Failure(e)
        self._iq_error(failure_, sid, client)
        raise failure_
    return sid
598 | |
def delayed_content_terminate(self, *args, **kwargs):
    """Put content_terminate in queue but don't execute immediately

    This is used to terminate a content inside a handler, to avoid modifying
    contents. Arguments are passed unchanged to [content_terminate], which is
    scheduled with the reactor with a delay of 0.
    """
    delay = 0
    reactor.callLater(delay, self.content_terminate, *args, **kwargs)
605 | |
def content_terminate(self, client, session, content_name, reason=REASON_SUCCESS):
    """Terminate and remove a content

    If there is no more content, then the session is terminated.
    @param session(dict): jingle session
    @param content_name(unicode): name of the content terminated
    @param reason(unicode): reason of the termination
    """
    remaining_contents = session["contents"]
    del remaining_contents[content_name]
    if remaining_contents:
        # other contents are still in use, the session goes on
        return
    self.terminate(client, reason, session)
618 | |
619 ## default methods called when a plugin doesn't have them ## | |
620 | |
def jingle_request_confirmation_default(
    self, client, action, session, content_name, desc_elt
):
    """Request confirmation for a jingle session with a generic dialog

    Only ``client`` and ``session`` are used here.
    @return: result of xml_tools.defer_confirm
    """
    log.debug("Using generic jingle confirmation method")
    peer = session["peer_jid"].full()
    message = _(CONFIRM_TXT).format(entity=peer)
    title = _("Confirm Jingle session")
    return xml_tools.defer_confirm(
        self.host, message, title, profile=client.profile
    )
632 | |
633 ## jingle events ## | |
634 | |
def _on_jingle_request(self, request: domish.Element, client: SatXMPPEntity) -> None:
    """Start [on_jingle_request] for a received request

    The coroutine is wrapped with defer.ensureDeferred, its result is not
    returned.
    @param request: received IQ request
    @param client: client which received the request
    """
    coroutine = self.on_jingle_request(client, request)
    defer.ensureDeferred(coroutine)
637 | |
async def on_jingle_request(
    self,
    client: SatXMPPEntity,
    request: domish.Element
) -> None:
    """Called when any jingle request is received

    The session is retrieved (or created for a session-initiate), then the
    request is dispatched to the method matching its action.
    @param client: client which received the request
    @param request: received IQ request
    @raise exceptions.InternalError: the stored session id doesn't match the
        requested one, or the action is unknown
    """
    request.handled = True
    jingle_elt = next(request.elements(NS_JINGLE, "jingle"))

    # first we need the session id, a missing or empty one is refused
    try:
        sid = jingle_elt["sid"]
    except KeyError:
        sid = None
    if not sid:
        log.warning("Received jingle request has no sid attribute")
        self.sendError(client, "bad-request", None, request)
        return

    # then the action, with the same rule
    try:
        action = jingle_elt["action"]
    except KeyError:
        action = None
    if not action:
        log.warning("Received jingle request has no action")
        self.sendError(client, "bad-request", None, request)
        return

    peer_jid = jid.JID(request["from"])

    # we get or create the session
    known_sessions = client.jingle_sessions
    if sid in known_sessions:
        session = known_sessions[sid]
        if session["peer_jid"] != peer_jid:
            log.warning(
                "sid conflict ({}), the jid doesn't match. Can be a collision, a hack attempt, or a bad sid generation".format(
                    sid
                )
            )
            self.sendError(client, "service-unavailable", sid, request)
            return
        if session["id"] != sid:
            log.error("session id doesn't match")
            self.sendError(client, "service-unavailable", sid, request)
            raise exceptions.InternalError
    elif action == XEP_0166.A_SESSION_INITIATE:
        # only a session-initiate can create a session
        session = known_sessions[sid] = {
            "id": sid,
            "state": STATE_PENDING,
            "initiator": peer_jid,
            "role": XEP_0166.ROLE_RESPONDER,
            # we store local_jid using request['to'] because for a component the jid
            # used may not be client.jid (if a local part is used).
            "local_jid": jid.JID(request['to']),
            "peer_jid": peer_jid,
            "started": time.time(),
        }
    elif action == XEP_0166.A_SESSION_TERMINATE:
        log.debug(
            "ignoring session terminate action (inexisting session id): {request_id} [{profile}]".format(
                request_id=sid, profile=client.profile
            )
        )
        return
    else:
        log.warning(
            "Received request for an unknown session id: {request_id} [{profile}]".format(
                request_id=sid, profile=client.profile
            )
        )
        self.sendError(client, "item-not-found", None, request, "unknown-session")
        return

    # dispatch: only some of the callbacks are awaited
    if action == XEP_0166.A_SESSION_INITIATE:
        await self.on_session_initiate(client, request, jingle_elt, session)
    elif action == XEP_0166.A_SESSION_TERMINATE:
        self.on_session_terminate(client, request, jingle_elt, session)
    elif action == XEP_0166.A_SESSION_ACCEPT:
        await self.on_session_accept(client, request, jingle_elt, session)
    elif action == XEP_0166.A_SESSION_INFO:
        self.on_session_info(client, request, jingle_elt, session)
    elif action == XEP_0166.A_TRANSPORT_INFO:
        self.on_transport_info(client, request, jingle_elt, session)
    elif action == XEP_0166.A_TRANSPORT_REPLACE:
        await self.on_transport_replace(client, request, jingle_elt, session)
    elif action == XEP_0166.A_TRANSPORT_ACCEPT:
        self.on_transport_accept(client, request, jingle_elt, session)
    elif action == XEP_0166.A_TRANSPORT_REJECT:
        self.on_transport_reject(client, request, jingle_elt, session)
    else:
        raise exceptions.InternalError(f"Unknown action {action}")
739 | |
740 ## Actions callbacks ## | |
741 | |
742 def _parse_elements( | |
743 self, | |
744 jingle_elt: domish.Element, | |
745 session: dict, | |
746 request: domish.Element, | |
747 client: SatXMPPEntity, | |
748 new: bool = False, | |
749 creator: str = ROLE_INITIATOR, | |
750 with_application: bool =True, | |
751 with_transport: bool = True, | |
752 store_in_session: bool = True, | |
753 ) -> Dict[str, dict]: | |
754 """Parse contents elements and fill contents_dict accordingly | |
755 | |
756 after the parsing, contents_dict will containt handlers, "desc_elt" and | |
757 "transport_elt" | |
758 @param jingle_elt: parent <jingle> element, containing one or more <content> | |
759 @param session: session data | |
760 @param request: the whole request | |
761 @param client: %(doc_client)s | |
762 @param new: True if the content is new and must be created, | |
763 else the content must exists, and session data will be filled | |
764 @param creator: only used if new is True: creating pear (see § 7.3) | |
765 @param with_application: if True, raise an error if there is no <description> | |
766 element else ignore it | |
767 @param with_transport: if True, raise an error if there is no <transport> element | |
768 else ignore it | |
769 @param store_in_session: if True, the ``session`` contents will be updated with | |
770 the parsed elements. | |
771 Use False when you parse an action which can happen at any time (e.g. | |
772 transport-info) and meaning that a parsed element may already be present in | |
773 the session (e.g. if an authorisation request is waiting for user answer), | |
774 This can't be used when ``new`` is set. | |
775 @return: contents_dict (from session, or a new one if "store_in_session" is False) | |
776 @raise exceptions.CancelError: the error is treated and the calling method can | |
777 cancel the treatment (i.e. return) | |
778 """ | |
779 if store_in_session: | |
780 contents_dict = session["contents"] | |
781 else: | |
782 if new: | |
783 raise exceptions.InternalError( | |
784 '"store_in_session" must not be used when "new" is set' | |
785 ) | |
786 contents_dict = {n: {} for n in session["contents"]} | |
787 content_elts = jingle_elt.elements(NS_JINGLE, "content") | |
788 | |
789 for content_elt in content_elts: | |
790 name = content_elt["name"] | |
791 | |
792 if new: | |
793 # the content must not exist, we check it | |
794 if not name or name in contents_dict: | |
795 self.sendError(client, "bad-request", session["id"], request) | |
796 raise exceptions.CancelError | |
797 content_data = contents_dict[name] = { | |
798 "creator": creator, | |
799 "senders": content_elt.attributes.get("senders", "both"), | |
800 } | |
801 else: | |
802 # the content must exist, we check it | |
803 try: | |
804 content_data = contents_dict[name] | |
805 except KeyError: | |
806 log.warning("Other peer try to access an unknown content") | |
807 self.sendError(client, "bad-request", session["id"], request) | |
808 raise exceptions.CancelError | |
809 | |
810 # application | |
811 if with_application: | |
812 desc_elt = content_elt.description | |
813 if not desc_elt: | |
814 self.sendError(client, "bad-request", session["id"], request) | |
815 raise exceptions.CancelError | |
816 | |
817 if new: | |
818 # the content is new, we need to check and link the application | |
819 app_ns = desc_elt.uri | |
820 if not app_ns or app_ns == NS_JINGLE: | |
821 self.sendError(client, "bad-request", session["id"], request) | |
822 raise exceptions.CancelError | |
823 | |
824 try: | |
825 application = self._applications[app_ns] | |
826 except KeyError: | |
827 log.warning( | |
828 "Unmanaged application namespace [{}]".format(app_ns) | |
829 ) | |
830 self.sendError( | |
831 client, "service-unavailable", session["id"], request | |
832 ) | |
833 raise exceptions.CancelError | |
834 | |
835 content_data["application"] = application | |
836 content_data["application_data"] = {} | |
837 else: | |
838 # the content exists, we check that we have not a former desc_elt | |
839 if "desc_elt" in content_data: | |
840 raise exceptions.InternalError( | |
841 "desc_elt should not exist at this point" | |
842 ) | |
843 | |
844 content_data["desc_elt"] = desc_elt | |
845 | |
846 # transport | |
847 if with_transport: | |
848 transport_elt = content_elt.transport | |
849 if not transport_elt: | |
850 self.sendError(client, "bad-request", session["id"], request) | |
851 raise exceptions.CancelError | |
852 | |
853 if new: | |
854 # the content is new, we need to check and link the transport | |
855 transport_ns = transport_elt.uri | |
856 if not app_ns or app_ns == NS_JINGLE: | |
857 self.sendError(client, "bad-request", session["id"], request) | |
858 raise exceptions.CancelError | |
859 | |
860 try: | |
861 transport = self._transports[transport_ns] | |
862 except KeyError: | |
863 raise exceptions.InternalError( | |
864 "No transport registered for namespace {}".format( | |
865 transport_ns | |
866 ) | |
867 ) | |
868 content_data["transport"] = transport | |
869 content_data["transport_data"] = {} | |
870 else: | |
871 # the content exists, we check that we have not a former transport_elt | |
872 if "transport_elt" in content_data: | |
873 raise exceptions.InternalError( | |
874 "transport_elt should not exist at this point" | |
875 ) | |
876 | |
877 content_data["transport_elt"] = transport_elt | |
878 | |
879 return contents_dict | |
880 | |
881 def _ignore(self, client, action, session, content_name, elt): | |
882 """Dummy method used when not exception must be raised if a method is not implemented in _call_plugins | |
883 | |
884 must be used as app_default_cb and/or transp_default_cb | |
885 """ | |
886 return elt | |
887 | |
def _call_plugins(
    self,
    client: SatXMPPEntity,
    action: str,
    session: dict,
    app_method_name: Optional[str] = "jingle_handler",
    transp_method_name: Optional[str] = "jingle_handler",
    app_default_cb: Optional[Callable] = None,
    transp_default_cb: Optional[Callable] = None,
    delete: bool = True,
    elements: bool = True,
    force_element: Optional[domish.Element] = None
) -> List[defer.Deferred]:
    """Run a method of the application and transport plugins of every content

    For each content of the session, the application plugin is called first, then
    the transport plugin. Each call goes through ``utils.as_deferred`` with the
    arguments ``(client, action, session, content_name, element)``.

    @param client: %(doc_client)s
    @param action: jingle action name
    @param session: jingle session data, its "contents" mapping is iterated
    @param app_method_name: name of the method to call on application handlers
        None to skip applications
    @param transp_method_name: name of the method to call on transport handlers
        None to skip transports
    @param app_default_cb: callback used when an application handler has no
        ``app_method_name`` attribute
        None to raise an exception instead
    @param transp_default_cb: callback used when a transport handler has no
        ``transp_method_name`` attribute
        None to raise an exception instead
    @param delete: if True, "desc_elt" and "transport_elt" are popped from the
        content data when they are used
        ignored if ``elements`` is False
    @param elements: if True, the element given to the method is taken from the
        content data ("desc_elt" for applications, "transport_elt" for transports)
        if False, ``force_element`` is given instead
    @param force_element: element given to every method when ``elements`` is False
        else it is ignored
    @return: list of launched Deferred
    @raise exceptions.NotFound: a handler doesn't implement the method and no
        default callback has been given
    """
    # (method name, plugin key in content data, fallback callback, element key),
    # restricted to the plugin kinds which are actually requested
    targets = [
        target
        for target in (
            (app_method_name, "application", app_default_cb, "desc_elt"),
            (transp_method_name, "transport", transp_default_cb, "transport_elt"),
        )
        if target[0] is not None
    ]

    launched = []
    for content_name, content_data in session["contents"].items():
        for method_name, plugin_key, fallback_cb, elt_key in targets:
            plugin_handler = content_data[plugin_key].handler
            try:
                callback = getattr(plugin_handler, method_name)
            except AttributeError:
                if fallback_cb is None:
                    raise exceptions.NotFound(
                        "{} not implemented !".format(method_name)
                    )
                callback = fallback_cb

            if not elements:
                elt = force_element
            elif delete:
                elt = content_data.pop(elt_key)
            else:
                elt = content_data[elt_key]

            launched.append(
                utils.as_deferred(
                    callback, client, action, session, content_name, elt
                )
            )

    return launched
953 | |
async def on_session_initiate(
    self,
    client: SatXMPPEntity,
    request: domish.Element,
    jingle_elt: domish.Element,
    session: Dict[str, Any]
) -> None:
    """Handle a session-initiate action

    The contents of the request are parsed and stored in the session, the <iq/>
    result is sent, then the "XEP-0166_on_session_initiate" trigger is run. If the
    trigger lets the workflow continue, plugins are called with the
    ``A_PREPARE_CONFIRMATION`` action, then the "jingle_request_confirmation"
    method of each application is called (or
    ``self.jingle_request_confirmation_default`` if the application doesn't
    implement it). The gathered confirmations are handled by
    ``self._confirmation_cb``, and failures by ``self._jingle_error_cb``.
    @param client: %(doc_client)s
    @param request: full request
    @param jingle_elt: <jingle> element
    @param session: session data, must not have a "contents" key yet
    @raise exceptions.InternalError: "contents" is already set in session
    """
    if "contents" in session:
        raise exceptions.InternalError(
            "Contents dict should not already exist at this point"
        )
    contents_dict = {}
    session["contents"] = contents_dict

    try:
        self._parse_elements(
            jingle_elt, session, request, client, True, XEP_0166.ROLE_INITIATOR
        )
    except exceptions.CancelError:
        # the error has already been handled by the parsing
        return

    if not contents_dict:
        # there MUST be at least one content
        self.sendError(client, "bad-request", session["id"], request)
        return

    # the request is valid, we can acknowledge its reception with the <iq/> result
    client.send(xmlstream.toResponse(request, "result"))

    assert "jingle_elt" not in session
    session["jingle_elt"] = jingle_elt
    must_continue = await self.host.trigger.async_point(
        "XEP-0166_on_session_initiate",
        client, session, request, jingle_elt
    )
    if not must_continue:
        return

    prepare_defers = self._call_plugins(
        client,
        XEP_0166.A_PREPARE_CONFIRMATION,
        session,
        delete=False
    )
    await defer.DeferredList(prepare_defers)

    # each application plugin is asked for confirmation (transports are skipped)
    # the results are handled in _confirmation_cb
    confirm_defers = self._call_plugins(
        client,
        XEP_0166.A_SESSION_INITIATE,
        session,
        app_method_name="jingle_request_confirmation",
        transp_method_name=None,
        app_default_cb=self.jingle_request_confirmation_default,
        delete=False,
    )

    confirm_d = defer.gatherResults(confirm_defers)
    confirm_d.addCallback(self._confirmation_cb, session, jingle_elt, client)
    confirm_d.addErrback(self._jingle_error_cb, session, request, client)
1024 | |
def _confirmation_cb(self, confirm_results, session, jingle_elt, client):
    """Handle the confirmation results gathered after a session-initiate

    If at least one result is falsy, the session is terminated with
    ``REASON_DECLINE``. Otherwise a session-accept is built: for each content, the
    ``jingle_handler`` of the application then of the transport is called, and
    what they return is added to the <content/> element. Once all handlers are
    done, transports are called with ``A_PREPARE_RESPONDER``, the <iq/> is sent,
    the session state becomes ``STATE_ACTIVE`` and plugins are called with
    ``A_ACCEPTED_ACK``.
    @param confirm_results(list[bool]): all True if session is accepted
    @param session(dict): session data
    @param jingle_elt(domish.Element): jingle data of this session
        (rebound to the session-accept <jingle/> element once it is built)
    @param client: %(doc_client)s
    @return: result of ``self.terminate`` if the session is declined, else the
        Deferred chain sending the session-accept
    """
    del session["jingle_elt"]
    if not all(confirm_results):
        return self.terminate(client, XEP_0166.REASON_DECLINE, session)

    iq_elt, jingle_elt = self._build_jingle_elt(
        client, session, XEP_0166.A_SESSION_ACCEPT
    )
    jingle_elt["responder"] = session['local_jid'].full()
    session["jingle_elt"] = jingle_elt

    def append_to_content(child_elt, parent_content_elt):
        # put the element returned by a plugin in its <content/>
        parent_content_elt.addChild(child_elt)

    # contents

    pending = []
    for content_name, content_data in session["contents"].items():
        content_elt = jingle_elt.addElement("content")
        content_elt["creator"] = XEP_0166.ROLE_INITIATOR
        content_elt["name"] = content_name

        # application first, then transport
        for plugin_key, elt_key in (
            ("application", "desc_elt"),
            ("transport", "transport_elt"),
        ):
            session_accept_cb = content_data[plugin_key].handler.jingle_handler
            plugin_d = utils.as_deferred(
                session_accept_cb,
                client,
                XEP_0166.A_SESSION_INITIATE,
                session,
                content_name,
                content_data.pop(elt_key),
            )
            plugin_d.addCallback(append_to_content, content_elt)
            pending.append(plugin_d)

    def prepare_responder(__):
        return self._call_plugins(
            client,
            XEP_0166.A_PREPARE_RESPONDER,
            session,
            app_method_name=None,
            elements=False,
        )

    def drop_jingle_elt(__):
        return session.pop("jingle_elt")

    def send_accept(__):
        return iq_elt.send()

    def activate_session(__):
        session["state"] = STATE_ACTIVE

    def notify_accepted_ack(__):
        return self._call_plugins(
            client, XEP_0166.A_ACCEPTED_ACK, session, elements=False
        )

    d_list = defer.DeferredList(pending)
    for step in (
        prepare_responder,
        drop_jingle_elt,
        send_accept,
        activate_session,
        notify_accepted_ack,
    ):
        d_list.addCallback(step)
    d_list.addErrback(self._iq_error, session["id"], client)
    return d_list
1109 | |
def on_session_terminate(self, client, request, jingle_elt, session):
    """Handle a session-terminate action

    The "jingle_terminate" method of every application and transport is called
    with the <reason/> element (handlers without this method are ignored), then
    the session is deleted. The <iq/> result is sent without waiting for the
    plugins.
    @param client: %(doc_client)s
    @param request(domish.Element): full <iq> request
    @param jingle_elt(domish.Element): the <jingle> element
        an empty <reason/> element is added to it if there is none
    @param session(dict): session data
    """
    # TODO: check reason, display a message to user if needed
    log.debug(f"Jingle Session {session['id']} terminated")
    reason_elt = next(jingle_elt.elements(NS_JINGLE, "reason"), None)
    if reason_elt is None:
        log.warning("No reason given for session termination")
        reason_elt = jingle_elt.addElement("reason")

    terminate_defers = self._call_plugins(
        client,
        XEP_0166.A_SESSION_TERMINATE,
        session,
        app_method_name="jingle_terminate",
        transp_method_name="jingle_terminate",
        app_default_cb=self._ignore,
        transp_default_cb=self._ignore,
        elements=False,
        force_element=reason_elt,
    )

    def delete_session(__):
        return self._del_session(client, session["id"])

    defer.DeferredList(terminate_defers).addCallback(delete_session)
    client.send(xmlstream.toResponse(request, "result"))
1134 | |
async def on_session_accept(self, client, request, jingle_elt, session):
    """Handle a session-accept action

    The request is parsed, the <iq/> result is sent and the session state is set
    to ``STATE_ACTIVE``. Plugins are then called with ``A_PREPARE_INITIATOR``,
    then with ``A_SESSION_ACCEPT``, and once this negotiation is done, transports
    are called with ``A_START``.
    @param client: %(doc_client)s
    @param request(domish.Element): full <iq> request
    @param jingle_elt(domish.Element): the <jingle> element
    @param session(dict): session data
    """
    log.debug(f"Jingle session {session['id']} has been accepted")

    try:
        self._parse_elements(jingle_elt, session, request, client)
    except exceptions.CancelError:
        # the error has already been handled by the parsing
        return

    # the request is valid: we acknowledge it, and the session becomes active
    client.send(xmlstream.toResponse(request, "result"))
    session["state"] = STATE_ACTIVE
    session["jingle_elt"] = jingle_elt

    prepare_defers = self._call_plugins(
        client,
        XEP_0166.A_PREPARE_INITIATOR,
        session,
        delete=False
    )
    await defer.DeferredList(prepare_defers)

    negotiation_d = defer.gatherResults(
        self._call_plugins(client, XEP_0166.A_SESSION_ACCEPT, session)
    )

    def start_transfer(__):
        # negotiation is done, transports can start
        return self._call_plugins(
            client, XEP_0166.A_START, session, app_method_name=None, elements=False
        )

    def drop_jingle_elt(__):
        return session.pop("jingle_elt")

    negotiation_d.addCallback(start_transfer)
    negotiation_d.addCallback(drop_jingle_elt)
1176 | |
def _on_session_cb(self, result, client, request, jingle_elt, session):
    """Send the <iq/> result answering ``request``

    @param result: ignored
    @param client: %(doc_client)s
    @param request(domish.Element): full <iq> request to answer
    @param jingle_elt(domish.Element): ignored
    @param session(dict): ignored
    """
    result_iq_elt = xmlstream.toResponse(request, "result")
    client.send(result_iq_elt)
1179 | |
def _on_session_eb(self, failure_, client, request, jingle_elt, session):
    """Log the failure and answer ``request`` with an "unsupported-info" error

    @param failure_(failure.Failure): failure to report, its value is logged
    @param client: %(doc_client)s
    @param request(domish.Element): full <iq> request to answer
    @param jingle_elt(domish.Element): ignored
    @param session(dict): ignored
    """
    error_msg = "Error while handling on_session_info: {}".format(failure_.value)
    log.error(error_msg)
    # XXX: only error managed so far, maybe some applications/transports need more
    self.sendError(
        client, "feature-not-implemented", None, request, "unsupported-info"
    )
1186 | |
def on_session_info(self, client, request, jingle_elt, session):
    """Method called when a session-info action is received from other peer

    A <jingle/> element without any child is a session ping, and is directly
    answered with an <iq/> result. Otherwise the "jingle_session_info" method of
    each application is called with the <jingle/> element: the <iq/> result is
    sent if all of them succeed, and an error is sent if one of them fails or
    doesn't implement the method.
    @param client: %(doc_client)s
    @param request(domish.Element): full <iq> request
    @param jingle_elt(domish.Element): the <jingle> element
    @param session(dict): session data
    """
    if not jingle_elt.children:
        # this is a session ping, see XEP-0166 §6.8
        client.send(xmlstream.toResponse(request, "result"))
        return

    try:
        # XXX: session-info is most likely only used for application, so we don't
        #   call transport plugins
        #   if a future transport use it, this behaviour must be adapted
        defers = self._call_plugins(
            client,
            XEP_0166.A_SESSION_INFO,
            session,
            "jingle_session_info",
            None,
            elements=False,
            force_element=jingle_elt,
        )
    except exceptions.NotFound as e:
        self._on_session_eb(failure.Failure(e), client, request, jingle_elt, session)
        return

    # fireOnOneErrback makes the list fail as soon as one plugin fails
    dlist = defer.DeferredList(defers, fireOnOneErrback=True)
    dlist.addCallback(self._on_session_cb, client, request, jingle_elt, session)
    # a failure must be answered with an error, not with an <iq/> result
    dlist.addErrback(self._on_session_eb, client, request, jingle_elt, session)
1220 | |
async def on_transport_replace(self, client, request, jingle_elt, session):
    """A transport change is requested

    The request is parsed, and jingle_handler is called on concerned transport
    plugin(s). If one of the requested transports is unknown (or if the session
    has no content), a transport-reject with the children of the received
    <jingle/> element is sent. Otherwise the current transport of each concerned
    content is destroyed and replaced, and a transport-accept is sent.
    @param client: %(doc_client)s
    @param request(domish.Element): full <iq> request
    @param jingle_elt(domish.Element): the <jingle> element
    @param session(dict): session data
    """
    log.debug("Other peer wants to replace the transport")
    try:
        self._parse_elements(
            jingle_elt, session, request, client, with_application=False
        )
    except exceptions.CancelError:
        # this is a coroutine: a plain return is used, defer.returnValue is only
        # meant for inlineCallbacks generators
        return

    client.send(xmlstream.toResponse(request, "result"))

    # content_name is None after the loop if the replacement must be rejected
    content_name = None
    to_replace = []

    for content_name, content_data in session["contents"].items():
        try:
            transport_elt = content_data.pop("transport_elt")
        except KeyError:
            # this content is not concerned by the replacement
            continue
        transport_ns = transport_elt.uri
        try:
            transport = self._transports[transport_ns]
        except KeyError:
            log.warning(
                "Other peer want to replace current transport with an unknown one: {}".format(
                    transport_ns
                )
            )
            content_name = None
            break
        to_replace.append((content_name, content_data, transport, transport_elt))

    if content_name is None:
        # we can't accept the replacement
        iq_elt, reject_jingle_elt = self._build_jingle_elt(
            client, session, XEP_0166.A_TRANSPORT_REJECT
        )
        for child in jingle_elt.children:
            reject_jingle_elt.addChild(child)

        iq_elt.send()
        return

    # at this point, everything is alright and we can replace the transport(s)
    # this is similar to an session-accept action, but for transports only
    iq_elt, accept_jingle_elt = self._build_jingle_elt(
        client, session, XEP_0166.A_TRANSPORT_ACCEPT
    )
    for content_name, content_data, transport, transport_elt in to_replace:
        # we can now actually replace the transport
        await utils.as_deferred(
            content_data["transport"].handler.jingle_handler,
            client, XEP_0166.A_DESTROY, session, content_name, None
        )
        content_data["transport"] = transport
        content_data["transport_data"].clear()
        # and build the element
        content_elt = accept_jingle_elt.addElement("content")
        content_elt["name"] = content_name
        content_elt["creator"] = content_data["creator"]
        # we notify the transport and insert its <transport/> in the answer
        accept_transport_elt = await utils.as_deferred(
            transport.handler.jingle_handler,
            client, XEP_0166.A_TRANSPORT_REPLACE, session, content_name, transport_elt
        )
        content_elt.addChild(accept_transport_elt)
        # there is no confirmation needed here, so we can directly prepare it
        await utils.as_deferred(
            transport.handler.jingle_handler,
            client, XEP_0166.A_PREPARE_RESPONDER, session, content_name, None
        )

    iq_elt.send()
1302 | |
def on_transport_accept(self, client, request, jingle_elt, session):
    """Handle a transport-accept action

    The request is parsed (without application) and the <iq/> result is sent.
    Transports are then called with ``A_TRANSPORT_ACCEPT``, and with ``A_START``
    once this is done.
    @param client: %(doc_client)s
    @param request(domish.Element): full <iq> request
    @param jingle_elt(domish.Element): the <jingle> element
    @param session(dict): session data
    """
    log.debug("new transport has been accepted")

    try:
        self._parse_elements(
            jingle_elt, session, request, client, with_application=False
        )
    except exceptions.CancelError:
        # the error has already been handled by the parsing
        return

    # the request is valid, we can acknowledge its reception with the <iq/> result
    client.send(xmlstream.toResponse(request, "result"))

    accept_defers = self._call_plugins(
        client, XEP_0166.A_TRANSPORT_ACCEPT, session, app_method_name=None
    )

    def start_transfer(__):
        # negotiation is done, transports can start
        return self._call_plugins(
            client, XEP_0166.A_START, session, app_method_name=None, elements=False
        )

    defer.DeferredList(accept_defers).addCallback(start_transfer)
1336 | |
def on_transport_reject(self, client, request, jingle_elt, session):
    """Handle a transport-reject action

    The session is terminated with the "failed-transport" reason.
    @param client: %(doc_client)s
    @param request(domish.Element): full <iq> request (unused)
    @param jingle_elt(domish.Element): the <jingle> element (unused)
    @param session(dict): session data
    """
    # XXX: the session is terminated for now when a transport replacement is
    #   refused, this behaviour may change in the future
    terminate_reason = "failed-transport"
    self.terminate(client, terminate_reason, session)
1348 | |
def on_transport_info(
    self,
    client: SatXMPPEntity,
    request: domish.Element,
    jingle_elt: domish.Element,
    session: dict
) -> None:
    """Method called when a transport-info action is received from other peer

    The request is parsed without touching the session contents, and
    jingle_handler is called on concerned transport plugin(s), i.e. the transport
    of each content for which a <transport/> element has been parsed.
    @param client: %(doc_client)s
    @param request: full <iq> request
    @param jingle_elt: the <jingle> element
    @param session: session data
    """
    log.debug(f"Transport info received for Jingle session {session['id']}")

    try:
        # transport-info can be received at any time, parsed elements are kept out
        # of the session to avoid a clash with elements already stored there
        parsed_contents = self._parse_elements(
            jingle_elt, session, request, client, with_application=False,
            store_in_session=False
        )
    except exceptions.CancelError:
        return

    # The parsing was OK, we send the <iq> result
    client.send(xmlstream.toResponse(request, "result"))

    for content_name, content_data in session["contents"].items():
        try:
            transport_elt = parsed_contents[content_name]["transport_elt"]
        except KeyError:
            # no transport info for this content
            continue
        else:
            utils.as_deferred(
                content_data["transport"].handler.jingle_handler,
                client,
                XEP_0166.A_TRANSPORT_INFO,
                session,
                content_name,
                transport_elt,
            )
1392 | |
1393 | |
@implementer(iwokkel.IDisco)
class XEP_0166_handler(xmlstream.XMPPHandler):
    """XMPP handler routing Jingle requests to the plugin and advertising Jingle"""

    def __init__(self, plugin_parent):
        """
        @param plugin_parent: plugin instance, its ``_on_jingle_request`` method is
            used as observer for Jingle requests
        """
        self.plugin_parent = plugin_parent

    def connectionInitialized(self):
        """Register the plugin as observer of ``JINGLE_REQUEST`` on the stream"""
        jingle_request_cb = self.plugin_parent._on_jingle_request
        self.xmlstream.addObserver(
            JINGLE_REQUEST, jingle_request_cb, client=self.parent
        )

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        """Advertise the Jingle namespace as a disco feature"""
        jingle_feature = disco.DiscoFeature(NS_JINGLE)
        return [jingle_feature]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        """Return an empty list: no disco item is exposed by this handler"""
        return list()