comparison libervia/backend/plugins/plugin_xep_0166/__init__.py @ 4240:79c8a70e1813

backend, frontend: prepare remote control: This is a series of changes necessary to prepare the implementation of the remote control feature: - XEP-0166: add a `priority` attribute to `ApplicationData`: this is needed when several applications are working in the same session, to know which one must be handled first. Will be used to make Remote Control have precedence over Call content. - XEP-0166: `_call_plugins` is now async and is not used with `DeferredList` anymore: the benefit of having methods called in parallel is very low, and it causes a lot of trouble as we can't predict the order. Methods are now called sequentially so the workflow can be predicted. - XEP-0167: fix `senders` XMPP attribute <=> SDP mapping - XEP-0234: preflight acceptance key is now `pre-accepted` instead of `file-accepted`, so the same key can be used with other jingle applications. - XEP-0167, XEP-0343: move some methods to XEP-0167 - XEP-0353: use new `priority` feature to call preflight methods of applications according to it. - frontend (webrtc): refactor the sources/sink handling with a more flexible mechanism based on Pydantic models. It is now possible to have as many Data Channels as necessary, to have them in addition to A/V streams, to manually specify GStreamer sources and sinks, etc. - frontend (webrtc): rework of the pipeline to reduce latency. - frontend: new `portal_desktop` method. Screenshare portal handling has been moved there, and RemoteDesktop portal has been added. - frontend (webrtc): fix `extract_ufrag_pwd` method. rel 436
author Goffi <goffi@goffi.org>
date Sat, 11 May 2024 13:52:41 +0200
parents e11b13418ba6
children 0d7bb4df2343
comparison
equal deleted inserted replaced
4239:a38559e6d6e2 4240:79c8a70e1813
325 d.addErrback(self._terminate_eb) 325 d.addErrback(self._terminate_eb)
326 return d 326 return d
327 327
328 ## errors which doesn't imply a stanza sending ## 328 ## errors which doesn't imply a stanza sending ##
329 329
330 def _iq_error(self, failure_, sid, client): 330 def _jingle_error_cb(
331 """Called when we got an <iq/> error 331 self,
332 332 failure_: failure.Failure|BaseException,
333 @param failure_(failure.Failure): the exceptions raised 333 session: dict,
334 @param sid(unicode): jingle session id 334 request: domish.Element,
335 """ 335 client: SatXMPPEntity
336 log.warning( 336 ) -> defer.Deferred:
337 "Error while sending jingle <iq/> stanza: {failure_}".format(
338 failure_=failure_.value
339 )
340 )
341 self.delete_session(client, sid)
342
343 def _jingle_error_cb(self, failure_, session, request, client):
344 """Called when something is going wrong while parsing jingle request 337 """Called when something is going wrong while parsing jingle request
345 338
346 The error condition depend of the exceptions raised: 339 The error condition depend of the exceptions raised:
347 exceptions.DataError raise a bad-request condition 340 exceptions.DataError raise a bad-request condition
348 @param fail(failure.Failure): the exceptions raised 341 @param fail: the exceptions raised
349 @param session(dict): data of the session 342 @param session: data of the session
350 @param request(domsih.Element): jingle request 343 @param request: jingle request
351 @param client: %(doc_client)s 344 @param client: SatXMPPEntity instance
352 """ 345 """
346 if not isinstance(failure_, failure.Failure):
347 failure_ = failure.Failure(failure_)
353 del session["jingle_elt"] 348 del session["jingle_elt"]
354 log.warning(f"Error while processing jingle request [{client.profile}]") 349 log.warning(f"Error while processing jingle request [{client.profile}]")
355 if isinstance(failure_.value, defer.FirstError): 350 if isinstance(failure_.value, defer.FirstError):
356 failure_ = failure_.value.subFailure.value 351 failure_ = failure_.value.subFailure.value
357 if isinstance(failure_, exceptions.DataError): 352 if isinstance(failure_, exceptions.DataError):
367 ## methods used by other plugins ## 362 ## methods used by other plugins ##
368 363
369 def register_application( 364 def register_application(
370 self, 365 self,
371 namespace: str, 366 namespace: str,
372 handler: BaseApplicationHandler 367 handler: BaseApplicationHandler,
368 priority: int = 0
373 ) -> None: 369 ) -> None:
374 """Register an application plugin 370 """Register an application plugin
375 371
376 @param namespace(unicode): application namespace managed by the plugin 372 @param namespace(unicode): application namespace managed by the plugin
377 @param handler(object): instance of a class which manage the application. 373 @param handler(object): instance of a class which manage the application.
388 client, self, action, session, content_name, transport_elt 384 client, self, action, session, content_name, transport_elt
389 ): 385 ):
390 called on several action to negociate the application or transport 386 called on several action to negociate the application or transport
391 - jingle_terminate: called on session terminate, with reason_elt 387 - jingle_terminate: called on session terminate, with reason_elt
392 May be used to clean session 388 May be used to clean session
389 @param priority: Priority of the application. It is used when several contents
390 from different applications are used to determine in which order methods must
391 be called. An example use case is for remote control: when using remote
392 control, contents with call application may be used at the same time, but the
393 overall session is a remote control one (and so, a remote control confirmation
394 must be requested to the user, not a call one).
395 If two applications have the same priority, methods are called in the same
396 order as the content appears.
393 """ 397 """
394 if namespace in self._applications: 398 if namespace in self._applications:
395 raise exceptions.ConflictError( 399 raise exceptions.ConflictError(
396 f"Trying to register already registered namespace {namespace}" 400 f"Trying to register already registered namespace {namespace}"
397 ) 401 )
398 self._applications[namespace] = ApplicationData( 402 self._applications[namespace] = ApplicationData(
399 namespace=namespace, handler=handler 403 namespace=namespace, handler=handler, priority=priority
400 ) 404 )
401 log.debug("new jingle application registered") 405 log.debug("new jingle application registered")
402 406
403 def register_transport( 407 def register_transport(
404 self, 408 self,
685 "Encryption is requested, but no encryption has been set" 689 "Encryption is requested, but no encryption has been set"
686 ) 690 )
687 691
688 try: 692 try:
689 await iq_elt.send() 693 await iq_elt.send()
690 except Exception as e: 694 except Exception:
691 failure_ = failure.Failure(e) 695 log.exception("Error while sending jingle <iq/> stanza")
692 self._iq_error(failure_, sid, client) 696 self.delete_session(client, sid)
693 raise failure_ 697 raise
694 return sid 698 return sid
695 699
696 def delayed_content_terminate(self, *args, **kwargs): 700 def delayed_content_terminate(self, *args, **kwargs):
697 """Put content_terminate in queue but don't execute immediately 701 """Put content_terminate in queue but don't execute immediately
698 702
821 elif action == XEP_0166.A_SESSION_TERMINATE: 825 elif action == XEP_0166.A_SESSION_TERMINATE:
822 await self.on_session_terminate(client, request, jingle_elt, session) 826 await self.on_session_terminate(client, request, jingle_elt, session)
823 elif action == XEP_0166.A_SESSION_ACCEPT: 827 elif action == XEP_0166.A_SESSION_ACCEPT:
824 await self.on_session_accept(client, request, jingle_elt, session) 828 await self.on_session_accept(client, request, jingle_elt, session)
825 elif action == XEP_0166.A_SESSION_INFO: 829 elif action == XEP_0166.A_SESSION_INFO:
826 self.on_session_info(client, request, jingle_elt, session) 830 await self.on_session_info(client, request, jingle_elt, session)
827 elif action == XEP_0166.A_TRANSPORT_INFO: 831 elif action == XEP_0166.A_TRANSPORT_INFO:
828 self.on_transport_info(client, request, jingle_elt, session) 832 self.on_transport_info(client, request, jingle_elt, session)
829 elif action == XEP_0166.A_TRANSPORT_REPLACE: 833 elif action == XEP_0166.A_TRANSPORT_REPLACE:
830 await self.on_transport_replace(client, request, jingle_elt, session) 834 await self.on_transport_replace(client, request, jingle_elt, session)
831 elif action == XEP_0166.A_TRANSPORT_ACCEPT: 835 elif action == XEP_0166.A_TRANSPORT_ACCEPT:
832 self.on_transport_accept(client, request, jingle_elt, session) 836 await self.on_transport_accept(client, request, jingle_elt, session)
833 elif action == XEP_0166.A_TRANSPORT_REJECT: 837 elif action == XEP_0166.A_TRANSPORT_REJECT:
834 self.on_transport_reject(client, request, jingle_elt, session) 838 self.on_transport_reject(client, request, jingle_elt, session)
835 else: 839 else:
836 raise exceptions.InternalError(f"Unknown action {action}") 840 raise exceptions.InternalError(f"Unknown action {action}")
837 841
981 985
982 must be used as app_default_cb and/or transp_default_cb 986 must be used as app_default_cb and/or transp_default_cb
983 """ 987 """
984 return elt 988 return elt
985 989
986 def _call_plugins( 990 async def _call_plugins(
987 self, 991 self,
988 client: SatXMPPEntity, 992 client: SatXMPPEntity,
989 action: str, 993 action: str,
990 session: dict, 994 session: dict,
991 app_method_name: Optional[str] = "jingle_handler", 995 app_method_name: Optional[str] = "jingle_handler",
993 app_default_cb: Optional[Callable] = None, 997 app_default_cb: Optional[Callable] = None,
994 transp_default_cb: Optional[Callable] = None, 998 transp_default_cb: Optional[Callable] = None,
995 delete: bool = True, 999 delete: bool = True,
996 elements: bool = True, 1000 elements: bool = True,
997 force_element: Optional[domish.Element] = None 1001 force_element: Optional[domish.Element] = None
998 ) -> List[defer.Deferred]: 1002 ) -> list[Any]:
999 """Call application and transport plugin methods for all contents 1003 """Call application and transport plugin methods for all contents
1000 1004
1001 @param action: jingle action name 1005 @param action: jingle action name
1002 @param session: jingle session data 1006 @param session: jingle session data
1003 @param app_method_name: name of the method to call for applications 1007 @param app_method_name: name of the method to call for applications
1004 None to ignore 1008 None to ignore
1005 @param transp_method_name: name of the method to call for transports 1009 @param transp_method_name: name of the method to call for transports
1006 None to ignore 1010 None to ignore
1007 @param app_default_cb: default callback to use if plugin has not app_method_name 1011 @param app_default_cb: default callback to use if plugin has not app_method_name
1008 None to raise an exception instead 1012 None to raise an exception instead
1009 @param transp_default_cb: default callback to use if plugin has not transp_method_name 1013 @param transp_default_cb: default callback to use if plugin has not
1014 transp_method_name
1010 None to raise an exception instead 1015 None to raise an exception instead
1011 @param delete: if True, remove desc_elt and transport_elt from session 1016 @param delete: if True, remove desc_elt and transport_elt from session
1012 ignored if elements is False 1017 ignored if elements is False
1013 @param elements: True if elements(desc_elt and tranport_elt) must be managed 1018 @param elements: True if elements(desc_elt and tranport_elt) must be managed
1014 must be True if _call_plugins is used in a request, and False if it is used 1019 must be True if _call_plugins is used in a request, and False if it is used
1015 after a request (i.e. on <iq> result or error) 1020 after a request (i.e. on <iq> result or error)
1016 @param force_element: if elements is False, it is used as element parameter 1021 @param force_element: if elements is False, it is used as element parameter
1017 else it is ignored 1022 else it is ignored
1018 @return : list of launched Deferred 1023 @return : list of launched methods results
1019 @raise exceptions.NotFound: method is not implemented 1024 @raise exceptions.NotFound: method is not implemented
1020 """ 1025 """
1021 contents_dict = session["contents"] 1026 contents_dict = session["contents"]
1022 defers_list = [] 1027 results = []
1023 for content_name, content_data in contents_dict.items(): 1028 for content_name, content_data in contents_dict.items():
1024 for method_name, handler_key, default_cb, elt_name in ( 1029 for method_name, handler_key, default_cb, elt_name in (
1025 (app_method_name, "application", app_default_cb, "desc_elt"), 1030 (app_method_name, "application", app_default_cb, "desc_elt"),
1026 (transp_method_name, "transport", transp_default_cb, "transport_elt"), 1031 (transp_method_name, "transport", transp_default_cb, "transport_elt"),
1027 ): 1032 ):
1040 method = default_cb 1045 method = default_cb
1041 if elements: 1046 if elements:
1042 elt = content_data.pop(elt_name) if delete else content_data[elt_name] 1047 elt = content_data.pop(elt_name) if delete else content_data[elt_name]
1043 else: 1048 else:
1044 elt = force_element 1049 elt = force_element
1045 d = utils.as_deferred( 1050 result = await utils.as_deferred(
1046 method, client, action, session, content_name, elt 1051 method, client, action, session, content_name, elt
1047 ) 1052 )
1048 defers_list.append(d) 1053 results.append(result)
1049 1054
1050 return defers_list 1055 return results
1051 1056
1052 async def on_session_initiate( 1057 async def on_session_initiate(
1053 self, 1058 self,
1054 client: SatXMPPEntity, 1059 client: SatXMPPEntity,
1055 request: domish.Element, 1060 request: domish.Element,
1095 "XEP-0166_on_session_initiate", 1100 "XEP-0166_on_session_initiate",
1096 client, session, request, jingle_elt 1101 client, session, request, jingle_elt
1097 ): 1102 ):
1098 return 1103 return
1099 1104
1100 await defer.DeferredList(self._call_plugins( 1105 await self._call_plugins(
1101 client, 1106 client,
1102 XEP_0166.A_PREPARE_CONFIRMATION, 1107 XEP_0166.A_PREPARE_CONFIRMATION,
1103 session, 1108 session,
1104 delete=False 1109 delete=False
1105 )) 1110 )
1106 1111
1107 # we now request each application plugin confirmation 1112 # we now request each application plugin confirmation
1108 # and if all are accepted, we can accept the session 1113 # and if all are accepted, we can accept the session
1109 confirm_defers = self._call_plugins( 1114 try:
1110 client, 1115 confirmations = await self._call_plugins(
1111 XEP_0166.A_SESSION_INITIATE,
1112 session,
1113 "jingle_request_confirmation",
1114 None,
1115 self.jingle_request_confirmation_default,
1116 delete=False,
1117 )
1118
1119 confirm_dlist = defer.gatherResults(confirm_defers)
1120 confirm_dlist.addCallback(self._confirmation_cb, session, jingle_elt, client)
1121 confirm_dlist.addErrback(self._jingle_error_cb, session, request, client)
1122
1123 def _confirmation_cb(self, confirm_results, session, jingle_elt, client):
1124 """Method called when confirmation from user has been received
1125
1126 This method is only called for the responder
1127 @param confirm_results(list[bool]): all True if session is accepted
1128 @param session(dict): session data
1129 @param jingle_elt(domish.Element): jingle data of this session
1130 @param client: %(doc_client)s
1131 """
1132 del session["jingle_elt"]
1133 confirmed = all(confirm_results)
1134 if not confirmed:
1135 return self.terminate(client, XEP_0166.REASON_DECLINE, session)
1136
1137 iq_elt, jingle_elt = self._build_jingle_elt(
1138 client, session, XEP_0166.A_SESSION_ACCEPT
1139 )
1140 jingle_elt["responder"] = session['local_jid'].full()
1141 session["jingle_elt"] = jingle_elt
1142
1143 # contents
1144
1145 def addElement(domish_elt, content_elt):
1146 content_elt.addChild(domish_elt)
1147
1148 defers_list = []
1149
1150 for content_name, content_data in session["contents"].items():
1151 content_elt = jingle_elt.addElement("content")
1152 content_elt["creator"] = XEP_0166.ROLE_INITIATOR
1153 content_elt["name"] = content_name
1154
1155 application = content_data["application"]
1156 app_session_accept_cb = application.handler.jingle_handler
1157
1158 app_d = utils.as_deferred(
1159 app_session_accept_cb,
1160 client, 1116 client,
1161 XEP_0166.A_SESSION_INITIATE, 1117 XEP_0166.A_SESSION_INITIATE,
1162 session, 1118 session,
1163 content_name, 1119 "jingle_request_confirmation",
1164 content_data.pop("desc_elt"), 1120 None,
1165 ) 1121 self.jingle_request_confirmation_default,
1166 app_d.addCallback(addElement, content_elt) 1122 delete=False,
1167 defers_list.append(app_d) 1123 )
1168 1124 except Exception as e:
1169 transport = content_data["transport"] 1125 await self._jingle_error_cb(e, session, jingle_elt, client)
1170 transport_session_accept_cb = transport.handler.jingle_handler 1126 else:
1171 1127 await self._confirmation_cb(confirmations, session, jingle_elt, client)
1172 transport_d = utils.as_deferred( 1128
1173 transport_session_accept_cb, 1129 async def _confirmation_cb(
1174 client, 1130 self,
1175 XEP_0166.A_SESSION_INITIATE, 1131 confirmations: list[bool],
1176 session, 1132 session: dict,
1177 content_name, 1133 jingle_elt: domish.Element,
1178 content_data.pop("transport_elt"), 1134 client: SatXMPPEntity
1179 ) 1135 ) -> None:
1180 transport_d.addCallback(addElement, content_elt) 1136 """Method called when confirmation from user has been received
1181 defers_list.append(transport_d) 1137
1182 1138 This method is only called for the responder
1183 d_list = defer.DeferredList(defers_list) 1139 @param confirm_results: all True if session is accepted
1184 d_list.addCallback( 1140 @param session: session data
1185 lambda __: self._call_plugins( 1141 @param jingle_elt: jingle data of this session
1142 @param client: SatXMPPEntity
1143 """
1144 del session["jingle_elt"]
1145 confirmed = all(confirmations)
1146 if not confirmed:
1147 await self.terminate(client, XEP_0166.REASON_DECLINE, session)
1148 return
1149
1150 iq_elt, jingle_elt = self._build_jingle_elt(
1151 client, session, XEP_0166.A_SESSION_ACCEPT
1152 )
1153 jingle_elt["responder"] = session['local_jid'].full()
1154 session["jingle_elt"] = jingle_elt
1155
1156 # contents
1157
1158 try:
1159 for content_name, content_data in session["contents"].items():
1160 content_elt = jingle_elt.addElement("content")
1161 content_elt["creator"] = XEP_0166.ROLE_INITIATOR
1162 content_elt["name"] = content_name
1163
1164 application = content_data["application"]
1165 app_session_accept_cb = application.handler.jingle_handler
1166
1167 updated_desc_elt = await utils.as_deferred(
1168 app_session_accept_cb,
1169 client,
1170 XEP_0166.A_SESSION_INITIATE,
1171 session,
1172 content_name,
1173 content_data.pop("desc_elt"),
1174 )
1175 content_elt.addChild(updated_desc_elt)
1176
1177 transport = content_data["transport"]
1178 transport_session_accept_cb = transport.handler.jingle_handler
1179
1180 updated_transport_elt = await utils.as_deferred(
1181 transport_session_accept_cb,
1182 client,
1183 XEP_0166.A_SESSION_INITIATE,
1184 session,
1185 content_name,
1186 content_data.pop("transport_elt"),
1187 )
1188 content_elt.addChild(updated_transport_elt)
1189
1190 await self._call_plugins(
1186 client, 1191 client,
1187 XEP_0166.A_PREPARE_RESPONDER, 1192 XEP_0166.A_PREPARE_RESPONDER,
1188 session, 1193 session,
1189 app_method_name=None, 1194 app_method_name=None,
1190 elements=False, 1195 elements=False,
1191 ) 1196 )
1192 ) 1197 session.pop("jingle_elt")
1193 d_list.addCallback(lambda __: session.pop("jingle_elt")) 1198 await iq_elt.send()
1194 d_list.addCallback(lambda __: iq_elt.send()) 1199
1195
1196 def change_state(__, session):
1197 session["state"] = XEP_0166.STATE_ACTIVE 1200 session["state"] = XEP_0166.STATE_ACTIVE
1198 1201
1199 d_list.addCallback(change_state, session) 1202 await self._call_plugins(
1200 d_list.addCallback(
1201 lambda __: self._call_plugins(
1202 client, XEP_0166.A_ACCEPTED_ACK, session, elements=False 1203 client, XEP_0166.A_ACCEPTED_ACK, session, elements=False
1203 ) 1204 )
1204 ) 1205 except Exception:
1205 d_list.addErrback(self._iq_error, session["id"], client) 1206 log.exception("Error while sending jingle <iq/> stanza")
1206 return d_list 1207 self.delete_session(client, session["id"])
1207 1208
1208 def get_reason_elt(self, parent_elt: domish.Element) -> domish.Element: 1209 def get_reason_elt(self, parent_elt: domish.Element) -> domish.Element:
1209 """Find a <reason> element in parent_elt 1210 """Find a <reason> element in parent_elt
1210 1211
1211 if none is found, add an empty one to the element 1212 if none is found, add an empty one to the element
1245 ) -> None: 1246 ) -> None:
1246 # TODO: check reason, display a message to user if needed 1247 # TODO: check reason, display a message to user if needed
1247 log.debug(f"Jingle Session {session['id']} terminated") 1248 log.debug(f"Jingle Session {session['id']} terminated")
1248 reason_elt = self.get_reason_elt(jingle_elt) 1249 reason_elt = self.get_reason_elt(jingle_elt)
1249 1250
1250 terminate_defers = self._call_plugins( 1251 await self._call_plugins(
1251 client, 1252 client,
1252 XEP_0166.A_SESSION_TERMINATE, 1253 XEP_0166.A_SESSION_TERMINATE,
1253 session, 1254 session,
1254 "jingle_terminate", 1255 "jingle_terminate",
1255 "jingle_terminate", 1256 "jingle_terminate",
1256 self._ignore, 1257 self._ignore,
1257 self._ignore, 1258 self._ignore,
1258 elements=False, 1259 elements=False,
1259 force_element=reason_elt, 1260 force_element=reason_elt,
1260 ) 1261 )
1261 terminate_dlist = defer.DeferredList(terminate_defers) 1262
1262 1263 self.delete_session(client, session["id"])
1263 terminate_dlist.addCallback(lambda __: self.delete_session(client, session["id"])) 1264 await client.a_send(xmlstream.toResponse(request, "result"))
1264 client.send(xmlstream.toResponse(request, "result"))
1265 1265
1266 async def on_session_accept(self, client, request, jingle_elt, session): 1266 async def on_session_accept(self, client, request, jingle_elt, session):
1267 """Method called once session is accepted 1267 """Method called once session is accepted
1268 1268
1269 This method is only called for initiator 1269 This method is only called for initiator
1283 client.send(xmlstream.toResponse(request, "result")) 1283 client.send(xmlstream.toResponse(request, "result"))
1284 # and change the state 1284 # and change the state
1285 session["state"] = XEP_0166.STATE_ACTIVE 1285 session["state"] = XEP_0166.STATE_ACTIVE
1286 session["jingle_elt"] = jingle_elt 1286 session["jingle_elt"] = jingle_elt
1287 1287
1288 await defer.DeferredList(self._call_plugins( 1288 await self._call_plugins(
1289 client, 1289 client,
1290 XEP_0166.A_PREPARE_INITIATOR, 1290 XEP_0166.A_PREPARE_INITIATOR,
1291 session, 1291 session,
1292 delete=False 1292 delete=False
1293 )) 1293 )
1294 1294
1295 negociate_defers = [] 1295 await self._call_plugins(client, XEP_0166.A_SESSION_ACCEPT, session)
1296 negociate_defers = self._call_plugins(client, XEP_0166.A_SESSION_ACCEPT, session)
1297
1298 negociate_dlist = defer.gatherResults(negociate_defers)
1299 1296
1300 # after negociations we start the transfer 1297 # after negociations we start the transfer
1301 negociate_dlist.addCallback( 1298 await self._call_plugins(
1302 lambda __: self._call_plugins( 1299 client, XEP_0166.A_START, session, app_method_name=None, elements=False
1303 client, XEP_0166.A_START, session, app_method_name=None, elements=False 1300 )
1304 ) 1301 session.pop("jingle_elt")
1305 ) 1302
1306 negociate_dlist.addCallback(lambda __: session.pop("jingle_elt")) 1303 async def on_session_info(self, client, request, jingle_elt, session):
1307
1308 def _on_session_cb(self, result, client, request, jingle_elt, session):
1309 client.send(xmlstream.toResponse(request, "result"))
1310
1311 def _on_session_eb(self, failure_, client, request, jingle_elt, session):
1312 log.error("Error while handling on_session_info: {}".format(failure_.value))
1313 # XXX: only error managed so far, maybe some applications/transports need more
1314 self.sendError(
1315 client, "feature-not-implemented", None, request, "unsupported-info"
1316 )
1317
1318 def on_session_info(self, client, request, jingle_elt, session):
1319 """Method called when a session-info action is received from other peer 1304 """Method called when a session-info action is received from other peer
1320 1305
1321 This method is only called for initiator 1306 This method is only called for initiator
1322 @param client: %(doc_client)s 1307 @param client: %(doc_client)s
1323 @param request(domish.Element): full <iq> request 1308 @param request(domish.Element): full <iq> request
1328 # this is a session ping, see XEP-0166 §6.8 1313 # this is a session ping, see XEP-0166 §6.8
1329 client.send(xmlstream.toResponse(request, "result")) 1314 client.send(xmlstream.toResponse(request, "result"))
1330 return 1315 return
1331 1316
1332 try: 1317 try:
1333 # XXX: session-info is most likely only used for application, so we don't call transport plugins 1318 # XXX: session-info is most likely only used for application, so we don't call
1334 # if a future transport use it, this behaviour must be adapted 1319 # transport plugins if a future transport use it, this behaviour must be
1335 defers = self._call_plugins( 1320 # adapted
1321 await self._call_plugins(
1336 client, 1322 client,
1337 XEP_0166.A_SESSION_INFO, 1323 XEP_0166.A_SESSION_INFO,
1338 session, 1324 session,
1339 "jingle_session_info", 1325 "jingle_session_info",
1340 None, 1326 None,
1341 elements=False, 1327 elements=False,
1342 force_element=jingle_elt, 1328 force_element=jingle_elt,
1343 ) 1329 )
1344 except exceptions.NotFound as e: 1330 except exceptions.NotFound as e:
1345 self._on_session_eb(failure.Failure(e), client, request, jingle_elt, session) 1331 log.exception("Error while handling on_session_info")
1346 return 1332 # XXX: only error managed so far, maybe some applications/transports need more
1347 1333 self.sendError(
1348 dlist = defer.DeferredList(defers, fireOnOneErrback=True) 1334 client, "feature-not-implemented", None, request, "unsupported-info"
1349 dlist.addCallback(self._on_session_cb, client, request, jingle_elt, session) 1335 )
1350 dlist.addErrback(self._on_session_cb, client, request, jingle_elt, session) 1336 except Exception:
1337 log.exception("Error while managing session info")
1338 else:
1339 client.send(xmlstream.toResponse(request, "result"))
1351 1340
1352 async def on_transport_replace(self, client, request, jingle_elt, session): 1341 async def on_transport_replace(self, client, request, jingle_elt, session):
1353 """A transport change is requested 1342 """A transport change is requested
1354 1343
1355 The request is parsed, and jingle_handler is called on concerned transport plugin(s) 1344 The request is parsed, and jingle_handler is called on concerned transport plugin(s)
1429 client, XEP_0166.A_PREPARE_RESPONDER, session, content_name, None 1418 client, XEP_0166.A_PREPARE_RESPONDER, session, content_name, None
1430 ) 1419 )
1431 1420
1432 iq_elt.send() 1421 iq_elt.send()
1433 1422
1434 def on_transport_accept(self, client, request, jingle_elt, session): 1423 async def on_transport_accept(
1424 self,
1425 client: SatXMPPEntity,
1426 request: domish.Element,
1427 jingle_elt: domish.Element,
1428 session: dict
1429 ) -> None:
1435 """Method called once transport replacement is accepted 1430 """Method called once transport replacement is accepted
1436 1431
1437 @param client: %(doc_client)s 1432 @param client: SatXMPPEntity instance
1438 @param request(domish.Element): full <iq> request 1433 @param request: full <iq> request
1439 @param jingle_elt(domish.Element): the <jingle> element 1434 @param jingle_elt: the <jingle> element
1440 @param session(dict): session data 1435 @param session: session data
1441 """ 1436 """
1442 log.debug("new transport has been accepted") 1437 log.debug("new transport has been accepted")
1443 1438
1444 try: 1439 try:
1445 self._parse_elements( 1440 self._parse_elements(
1447 ) 1442 )
1448 except exceptions.CancelError: 1443 except exceptions.CancelError:
1449 return 1444 return
1450 1445
1451 # at this point we can send the <iq/> result to confirm reception of the request 1446 # at this point we can send the <iq/> result to confirm reception of the request
1452 client.send(xmlstream.toResponse(request, "result")) 1447 await client.a_send(xmlstream.toResponse(request, "result"))
1453 1448
1454 negociate_defers = [] 1449 await self._call_plugins(
1455 negociate_defers = self._call_plugins(
1456 client, XEP_0166.A_TRANSPORT_ACCEPT, session, app_method_name=None 1450 client, XEP_0166.A_TRANSPORT_ACCEPT, session, app_method_name=None
1457 ) 1451 )
1458 1452
1459 negociate_dlist = defer.DeferredList(negociate_defers)
1460
1461 # after negociations we start the transfer 1453 # after negociations we start the transfer
1462 negociate_dlist.addCallback( 1454 await self._call_plugins(
1463 lambda __: self._call_plugins( 1455 client, XEP_0166.A_START, session, app_method_name=None, elements=False
1464 client, XEP_0166.A_START, session, app_method_name=None, elements=False
1465 )
1466 ) 1456 )
1467 1457
1468 def on_transport_reject(self, client, request, jingle_elt, session): 1458 def on_transport_reject(self, client, request, jingle_elt, session):
1469 """Method called when a transport replacement is refused 1459 """Method called when a transport replacement is refused
1470 1460