Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_xep_0166/__init__.py @ 4240:79c8a70e1813
backend, frontend: prepare remote control:
This is a series of changes necessary to prepare the implementation of the remote control
feature:
- XEP-0166: add a `priority` attribute to `ApplicationData`: this is needed when several
applications are working in the same session, to know which one must be handled first.
Will be used to make Remote Control have precedence over Call content.
- XEP-0166: `_call_plugins` is now async and is not used with `DeferredList` anymore: the
benefit of having methods called in parallel is very low, and it causes a lot of trouble
as we can't predict order. Methods are now called sequentially so workflow can be
predicted.
- XEP-0167: fix `senders` XMPP attribute <=> SDP mapping
- XEP-0234: preflight acceptance key is now `pre-accepted` instead of `file-accepted`, so
the same key can be used with other jingle applications.
- XEP-0167, XEP-0343: move some methods to XEP-0167
- XEP-0353: use new `priority` feature to call preflight methods of applications according
to it.
- frontend (webrtc): refactor the sources/sink handling with a more flexible mechanism
based on Pydantic models. It is now possible to have as many Data Channels as necessary,
to have them in addition to A/V streams, to specify manually GStreamer sources and
sinks, etc.
- frontend (webrtc): rework of the pipeline to reduce latency.
- frontend: new `portal_desktop` method. Screenshare portal handling has been moved there,
and RemoteDesktop portal has been added.
- frontend (webrtc): fix `extract_ufrag_pwd` method.
rel 436
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 11 May 2024 13:52:41 +0200 |
parents | e11b13418ba6 |
children | 0d7bb4df2343 |
comparison
equal
deleted
inserted
replaced
4239:a38559e6d6e2 | 4240:79c8a70e1813 |
---|---|
325 d.addErrback(self._terminate_eb) | 325 d.addErrback(self._terminate_eb) |
326 return d | 326 return d |
327 | 327 |
328 ## errors which doesn't imply a stanza sending ## | 328 ## errors which doesn't imply a stanza sending ## |
329 | 329 |
330 def _iq_error(self, failure_, sid, client): | 330 def _jingle_error_cb( |
331 """Called when we got an <iq/> error | 331 self, |
332 | 332 failure_: failure.Failure|BaseException, |
333 @param failure_(failure.Failure): the exceptions raised | 333 session: dict, |
334 @param sid(unicode): jingle session id | 334 request: domish.Element, |
335 """ | 335 client: SatXMPPEntity |
336 log.warning( | 336 ) -> defer.Deferred: |
337 "Error while sending jingle <iq/> stanza: {failure_}".format( | |
338 failure_=failure_.value | |
339 ) | |
340 ) | |
341 self.delete_session(client, sid) | |
342 | |
343 def _jingle_error_cb(self, failure_, session, request, client): | |
344 """Called when something is going wrong while parsing jingle request | 337 """Called when something is going wrong while parsing jingle request |
345 | 338 |
346 The error condition depend of the exceptions raised: | 339 The error condition depend of the exceptions raised: |
347 exceptions.DataError raise a bad-request condition | 340 exceptions.DataError raise a bad-request condition |
348 @param fail(failure.Failure): the exceptions raised | 341 @param fail: the exceptions raised |
349 @param session(dict): data of the session | 342 @param session: data of the session |
350 @param request(domsih.Element): jingle request | 343 @param request: jingle request |
351 @param client: %(doc_client)s | 344 @param client: SatXMPPEntity instance |
352 """ | 345 """ |
346 if not isinstance(failure_, failure.Failure): | |
347 failure_ = failure.Failure(failure_) | |
353 del session["jingle_elt"] | 348 del session["jingle_elt"] |
354 log.warning(f"Error while processing jingle request [{client.profile}]") | 349 log.warning(f"Error while processing jingle request [{client.profile}]") |
355 if isinstance(failure_.value, defer.FirstError): | 350 if isinstance(failure_.value, defer.FirstError): |
356 failure_ = failure_.value.subFailure.value | 351 failure_ = failure_.value.subFailure.value |
357 if isinstance(failure_, exceptions.DataError): | 352 if isinstance(failure_, exceptions.DataError): |
367 ## methods used by other plugins ## | 362 ## methods used by other plugins ## |
368 | 363 |
369 def register_application( | 364 def register_application( |
370 self, | 365 self, |
371 namespace: str, | 366 namespace: str, |
372 handler: BaseApplicationHandler | 367 handler: BaseApplicationHandler, |
368 priority: int = 0 | |
373 ) -> None: | 369 ) -> None: |
374 """Register an application plugin | 370 """Register an application plugin |
375 | 371 |
376 @param namespace(unicode): application namespace managed by the plugin | 372 @param namespace(unicode): application namespace managed by the plugin |
377 @param handler(object): instance of a class which manage the application. | 373 @param handler(object): instance of a class which manage the application. |
388 client, self, action, session, content_name, transport_elt | 384 client, self, action, session, content_name, transport_elt |
389 ): | 385 ): |
390 called on several action to negociate the application or transport | 386 called on several action to negociate the application or transport |
391 - jingle_terminate: called on session terminate, with reason_elt | 387 - jingle_terminate: called on session terminate, with reason_elt |
392 May be used to clean session | 388 May be used to clean session |
389 @param priority: Priority of the application. It is used when several contents | |
390 from different applications are used to determine in which order methods must | |
391 be called. An example use case is for remote control: when using remote | |
392 control, contents with call application may be used at the same time, but the | |
393 overall session is a remote control one (and so, a remote control confirmation | |
394 must be requested to the user, not a call one). | |
395 If two applications have the same priority, methods are called in the same | |
396 order as the content appears. | |
393 """ | 397 """ |
394 if namespace in self._applications: | 398 if namespace in self._applications: |
395 raise exceptions.ConflictError( | 399 raise exceptions.ConflictError( |
396 f"Trying to register already registered namespace {namespace}" | 400 f"Trying to register already registered namespace {namespace}" |
397 ) | 401 ) |
398 self._applications[namespace] = ApplicationData( | 402 self._applications[namespace] = ApplicationData( |
399 namespace=namespace, handler=handler | 403 namespace=namespace, handler=handler, priority=priority |
400 ) | 404 ) |
401 log.debug("new jingle application registered") | 405 log.debug("new jingle application registered") |
402 | 406 |
403 def register_transport( | 407 def register_transport( |
404 self, | 408 self, |
685 "Encryption is requested, but no encryption has been set" | 689 "Encryption is requested, but no encryption has been set" |
686 ) | 690 ) |
687 | 691 |
688 try: | 692 try: |
689 await iq_elt.send() | 693 await iq_elt.send() |
690 except Exception as e: | 694 except Exception: |
691 failure_ = failure.Failure(e) | 695 log.exception("Error while sending jingle <iq/> stanza") |
692 self._iq_error(failure_, sid, client) | 696 self.delete_session(client, sid) |
693 raise failure_ | 697 raise |
694 return sid | 698 return sid |
695 | 699 |
696 def delayed_content_terminate(self, *args, **kwargs): | 700 def delayed_content_terminate(self, *args, **kwargs): |
697 """Put content_terminate in queue but don't execute immediately | 701 """Put content_terminate in queue but don't execute immediately |
698 | 702 |
821 elif action == XEP_0166.A_SESSION_TERMINATE: | 825 elif action == XEP_0166.A_SESSION_TERMINATE: |
822 await self.on_session_terminate(client, request, jingle_elt, session) | 826 await self.on_session_terminate(client, request, jingle_elt, session) |
823 elif action == XEP_0166.A_SESSION_ACCEPT: | 827 elif action == XEP_0166.A_SESSION_ACCEPT: |
824 await self.on_session_accept(client, request, jingle_elt, session) | 828 await self.on_session_accept(client, request, jingle_elt, session) |
825 elif action == XEP_0166.A_SESSION_INFO: | 829 elif action == XEP_0166.A_SESSION_INFO: |
826 self.on_session_info(client, request, jingle_elt, session) | 830 await self.on_session_info(client, request, jingle_elt, session) |
827 elif action == XEP_0166.A_TRANSPORT_INFO: | 831 elif action == XEP_0166.A_TRANSPORT_INFO: |
828 self.on_transport_info(client, request, jingle_elt, session) | 832 self.on_transport_info(client, request, jingle_elt, session) |
829 elif action == XEP_0166.A_TRANSPORT_REPLACE: | 833 elif action == XEP_0166.A_TRANSPORT_REPLACE: |
830 await self.on_transport_replace(client, request, jingle_elt, session) | 834 await self.on_transport_replace(client, request, jingle_elt, session) |
831 elif action == XEP_0166.A_TRANSPORT_ACCEPT: | 835 elif action == XEP_0166.A_TRANSPORT_ACCEPT: |
832 self.on_transport_accept(client, request, jingle_elt, session) | 836 await self.on_transport_accept(client, request, jingle_elt, session) |
833 elif action == XEP_0166.A_TRANSPORT_REJECT: | 837 elif action == XEP_0166.A_TRANSPORT_REJECT: |
834 self.on_transport_reject(client, request, jingle_elt, session) | 838 self.on_transport_reject(client, request, jingle_elt, session) |
835 else: | 839 else: |
836 raise exceptions.InternalError(f"Unknown action {action}") | 840 raise exceptions.InternalError(f"Unknown action {action}") |
837 | 841 |
981 | 985 |
982 must be used as app_default_cb and/or transp_default_cb | 986 must be used as app_default_cb and/or transp_default_cb |
983 """ | 987 """ |
984 return elt | 988 return elt |
985 | 989 |
986 def _call_plugins( | 990 async def _call_plugins( |
987 self, | 991 self, |
988 client: SatXMPPEntity, | 992 client: SatXMPPEntity, |
989 action: str, | 993 action: str, |
990 session: dict, | 994 session: dict, |
991 app_method_name: Optional[str] = "jingle_handler", | 995 app_method_name: Optional[str] = "jingle_handler", |
993 app_default_cb: Optional[Callable] = None, | 997 app_default_cb: Optional[Callable] = None, |
994 transp_default_cb: Optional[Callable] = None, | 998 transp_default_cb: Optional[Callable] = None, |
995 delete: bool = True, | 999 delete: bool = True, |
996 elements: bool = True, | 1000 elements: bool = True, |
997 force_element: Optional[domish.Element] = None | 1001 force_element: Optional[domish.Element] = None |
998 ) -> List[defer.Deferred]: | 1002 ) -> list[Any]: |
999 """Call application and transport plugin methods for all contents | 1003 """Call application and transport plugin methods for all contents |
1000 | 1004 |
1001 @param action: jingle action name | 1005 @param action: jingle action name |
1002 @param session: jingle session data | 1006 @param session: jingle session data |
1003 @param app_method_name: name of the method to call for applications | 1007 @param app_method_name: name of the method to call for applications |
1004 None to ignore | 1008 None to ignore |
1005 @param transp_method_name: name of the method to call for transports | 1009 @param transp_method_name: name of the method to call for transports |
1006 None to ignore | 1010 None to ignore |
1007 @param app_default_cb: default callback to use if plugin has not app_method_name | 1011 @param app_default_cb: default callback to use if plugin has not app_method_name |
1008 None to raise an exception instead | 1012 None to raise an exception instead |
1009 @param transp_default_cb: default callback to use if plugin has not transp_method_name | 1013 @param transp_default_cb: default callback to use if plugin has not |
1014 transp_method_name | |
1010 None to raise an exception instead | 1015 None to raise an exception instead |
1011 @param delete: if True, remove desc_elt and transport_elt from session | 1016 @param delete: if True, remove desc_elt and transport_elt from session |
1012 ignored if elements is False | 1017 ignored if elements is False |
1013 @param elements: True if elements(desc_elt and tranport_elt) must be managed | 1018 @param elements: True if elements(desc_elt and tranport_elt) must be managed |
1014 must be True if _call_plugins is used in a request, and False if it is used | 1019 must be True if _call_plugins is used in a request, and False if it is used |
1015 after a request (i.e. on <iq> result or error) | 1020 after a request (i.e. on <iq> result or error) |
1016 @param force_element: if elements is False, it is used as element parameter | 1021 @param force_element: if elements is False, it is used as element parameter |
1017 else it is ignored | 1022 else it is ignored |
1018 @return : list of launched Deferred | 1023 @return : list of launched methods results |
1019 @raise exceptions.NotFound: method is not implemented | 1024 @raise exceptions.NotFound: method is not implemented |
1020 """ | 1025 """ |
1021 contents_dict = session["contents"] | 1026 contents_dict = session["contents"] |
1022 defers_list = [] | 1027 results = [] |
1023 for content_name, content_data in contents_dict.items(): | 1028 for content_name, content_data in contents_dict.items(): |
1024 for method_name, handler_key, default_cb, elt_name in ( | 1029 for method_name, handler_key, default_cb, elt_name in ( |
1025 (app_method_name, "application", app_default_cb, "desc_elt"), | 1030 (app_method_name, "application", app_default_cb, "desc_elt"), |
1026 (transp_method_name, "transport", transp_default_cb, "transport_elt"), | 1031 (transp_method_name, "transport", transp_default_cb, "transport_elt"), |
1027 ): | 1032 ): |
1040 method = default_cb | 1045 method = default_cb |
1041 if elements: | 1046 if elements: |
1042 elt = content_data.pop(elt_name) if delete else content_data[elt_name] | 1047 elt = content_data.pop(elt_name) if delete else content_data[elt_name] |
1043 else: | 1048 else: |
1044 elt = force_element | 1049 elt = force_element |
1045 d = utils.as_deferred( | 1050 result = await utils.as_deferred( |
1046 method, client, action, session, content_name, elt | 1051 method, client, action, session, content_name, elt |
1047 ) | 1052 ) |
1048 defers_list.append(d) | 1053 results.append(result) |
1049 | 1054 |
1050 return defers_list | 1055 return results |
1051 | 1056 |
1052 async def on_session_initiate( | 1057 async def on_session_initiate( |
1053 self, | 1058 self, |
1054 client: SatXMPPEntity, | 1059 client: SatXMPPEntity, |
1055 request: domish.Element, | 1060 request: domish.Element, |
1095 "XEP-0166_on_session_initiate", | 1100 "XEP-0166_on_session_initiate", |
1096 client, session, request, jingle_elt | 1101 client, session, request, jingle_elt |
1097 ): | 1102 ): |
1098 return | 1103 return |
1099 | 1104 |
1100 await defer.DeferredList(self._call_plugins( | 1105 await self._call_plugins( |
1101 client, | 1106 client, |
1102 XEP_0166.A_PREPARE_CONFIRMATION, | 1107 XEP_0166.A_PREPARE_CONFIRMATION, |
1103 session, | 1108 session, |
1104 delete=False | 1109 delete=False |
1105 )) | 1110 ) |
1106 | 1111 |
1107 # we now request each application plugin confirmation | 1112 # we now request each application plugin confirmation |
1108 # and if all are accepted, we can accept the session | 1113 # and if all are accepted, we can accept the session |
1109 confirm_defers = self._call_plugins( | 1114 try: |
1110 client, | 1115 confirmations = await self._call_plugins( |
1111 XEP_0166.A_SESSION_INITIATE, | |
1112 session, | |
1113 "jingle_request_confirmation", | |
1114 None, | |
1115 self.jingle_request_confirmation_default, | |
1116 delete=False, | |
1117 ) | |
1118 | |
1119 confirm_dlist = defer.gatherResults(confirm_defers) | |
1120 confirm_dlist.addCallback(self._confirmation_cb, session, jingle_elt, client) | |
1121 confirm_dlist.addErrback(self._jingle_error_cb, session, request, client) | |
1122 | |
1123 def _confirmation_cb(self, confirm_results, session, jingle_elt, client): | |
1124 """Method called when confirmation from user has been received | |
1125 | |
1126 This method is only called for the responder | |
1127 @param confirm_results(list[bool]): all True if session is accepted | |
1128 @param session(dict): session data | |
1129 @param jingle_elt(domish.Element): jingle data of this session | |
1130 @param client: %(doc_client)s | |
1131 """ | |
1132 del session["jingle_elt"] | |
1133 confirmed = all(confirm_results) | |
1134 if not confirmed: | |
1135 return self.terminate(client, XEP_0166.REASON_DECLINE, session) | |
1136 | |
1137 iq_elt, jingle_elt = self._build_jingle_elt( | |
1138 client, session, XEP_0166.A_SESSION_ACCEPT | |
1139 ) | |
1140 jingle_elt["responder"] = session['local_jid'].full() | |
1141 session["jingle_elt"] = jingle_elt | |
1142 | |
1143 # contents | |
1144 | |
1145 def addElement(domish_elt, content_elt): | |
1146 content_elt.addChild(domish_elt) | |
1147 | |
1148 defers_list = [] | |
1149 | |
1150 for content_name, content_data in session["contents"].items(): | |
1151 content_elt = jingle_elt.addElement("content") | |
1152 content_elt["creator"] = XEP_0166.ROLE_INITIATOR | |
1153 content_elt["name"] = content_name | |
1154 | |
1155 application = content_data["application"] | |
1156 app_session_accept_cb = application.handler.jingle_handler | |
1157 | |
1158 app_d = utils.as_deferred( | |
1159 app_session_accept_cb, | |
1160 client, | 1116 client, |
1161 XEP_0166.A_SESSION_INITIATE, | 1117 XEP_0166.A_SESSION_INITIATE, |
1162 session, | 1118 session, |
1163 content_name, | 1119 "jingle_request_confirmation", |
1164 content_data.pop("desc_elt"), | 1120 None, |
1165 ) | 1121 self.jingle_request_confirmation_default, |
1166 app_d.addCallback(addElement, content_elt) | 1122 delete=False, |
1167 defers_list.append(app_d) | 1123 ) |
1168 | 1124 except Exception as e: |
1169 transport = content_data["transport"] | 1125 await self._jingle_error_cb(e, session, jingle_elt, client) |
1170 transport_session_accept_cb = transport.handler.jingle_handler | 1126 else: |
1171 | 1127 await self._confirmation_cb(confirmations, session, jingle_elt, client) |
1172 transport_d = utils.as_deferred( | 1128 |
1173 transport_session_accept_cb, | 1129 async def _confirmation_cb( |
1174 client, | 1130 self, |
1175 XEP_0166.A_SESSION_INITIATE, | 1131 confirmations: list[bool], |
1176 session, | 1132 session: dict, |
1177 content_name, | 1133 jingle_elt: domish.Element, |
1178 content_data.pop("transport_elt"), | 1134 client: SatXMPPEntity |
1179 ) | 1135 ) -> None: |
1180 transport_d.addCallback(addElement, content_elt) | 1136 """Method called when confirmation from user has been received |
1181 defers_list.append(transport_d) | 1137 |
1182 | 1138 This method is only called for the responder |
1183 d_list = defer.DeferredList(defers_list) | 1139 @param confirm_results: all True if session is accepted |
1184 d_list.addCallback( | 1140 @param session: session data |
1185 lambda __: self._call_plugins( | 1141 @param jingle_elt: jingle data of this session |
1142 @param client: SatXMPPEntity | |
1143 """ | |
1144 del session["jingle_elt"] | |
1145 confirmed = all(confirmations) | |
1146 if not confirmed: | |
1147 await self.terminate(client, XEP_0166.REASON_DECLINE, session) | |
1148 return | |
1149 | |
1150 iq_elt, jingle_elt = self._build_jingle_elt( | |
1151 client, session, XEP_0166.A_SESSION_ACCEPT | |
1152 ) | |
1153 jingle_elt["responder"] = session['local_jid'].full() | |
1154 session["jingle_elt"] = jingle_elt | |
1155 | |
1156 # contents | |
1157 | |
1158 try: | |
1159 for content_name, content_data in session["contents"].items(): | |
1160 content_elt = jingle_elt.addElement("content") | |
1161 content_elt["creator"] = XEP_0166.ROLE_INITIATOR | |
1162 content_elt["name"] = content_name | |
1163 | |
1164 application = content_data["application"] | |
1165 app_session_accept_cb = application.handler.jingle_handler | |
1166 | |
1167 updated_desc_elt = await utils.as_deferred( | |
1168 app_session_accept_cb, | |
1169 client, | |
1170 XEP_0166.A_SESSION_INITIATE, | |
1171 session, | |
1172 content_name, | |
1173 content_data.pop("desc_elt"), | |
1174 ) | |
1175 content_elt.addChild(updated_desc_elt) | |
1176 | |
1177 transport = content_data["transport"] | |
1178 transport_session_accept_cb = transport.handler.jingle_handler | |
1179 | |
1180 updated_transport_elt = await utils.as_deferred( | |
1181 transport_session_accept_cb, | |
1182 client, | |
1183 XEP_0166.A_SESSION_INITIATE, | |
1184 session, | |
1185 content_name, | |
1186 content_data.pop("transport_elt"), | |
1187 ) | |
1188 content_elt.addChild(updated_transport_elt) | |
1189 | |
1190 await self._call_plugins( | |
1186 client, | 1191 client, |
1187 XEP_0166.A_PREPARE_RESPONDER, | 1192 XEP_0166.A_PREPARE_RESPONDER, |
1188 session, | 1193 session, |
1189 app_method_name=None, | 1194 app_method_name=None, |
1190 elements=False, | 1195 elements=False, |
1191 ) | 1196 ) |
1192 ) | 1197 session.pop("jingle_elt") |
1193 d_list.addCallback(lambda __: session.pop("jingle_elt")) | 1198 await iq_elt.send() |
1194 d_list.addCallback(lambda __: iq_elt.send()) | 1199 |
1195 | |
1196 def change_state(__, session): | |
1197 session["state"] = XEP_0166.STATE_ACTIVE | 1200 session["state"] = XEP_0166.STATE_ACTIVE |
1198 | 1201 |
1199 d_list.addCallback(change_state, session) | 1202 await self._call_plugins( |
1200 d_list.addCallback( | |
1201 lambda __: self._call_plugins( | |
1202 client, XEP_0166.A_ACCEPTED_ACK, session, elements=False | 1203 client, XEP_0166.A_ACCEPTED_ACK, session, elements=False |
1203 ) | 1204 ) |
1204 ) | 1205 except Exception: |
1205 d_list.addErrback(self._iq_error, session["id"], client) | 1206 log.exception("Error while sending jingle <iq/> stanza") |
1206 return d_list | 1207 self.delete_session(client, session["id"]) |
1207 | 1208 |
1208 def get_reason_elt(self, parent_elt: domish.Element) -> domish.Element: | 1209 def get_reason_elt(self, parent_elt: domish.Element) -> domish.Element: |
1209 """Find a <reason> element in parent_elt | 1210 """Find a <reason> element in parent_elt |
1210 | 1211 |
1211 if none is found, add an empty one to the element | 1212 if none is found, add an empty one to the element |
1245 ) -> None: | 1246 ) -> None: |
1246 # TODO: check reason, display a message to user if needed | 1247 # TODO: check reason, display a message to user if needed |
1247 log.debug(f"Jingle Session {session['id']} terminated") | 1248 log.debug(f"Jingle Session {session['id']} terminated") |
1248 reason_elt = self.get_reason_elt(jingle_elt) | 1249 reason_elt = self.get_reason_elt(jingle_elt) |
1249 | 1250 |
1250 terminate_defers = self._call_plugins( | 1251 await self._call_plugins( |
1251 client, | 1252 client, |
1252 XEP_0166.A_SESSION_TERMINATE, | 1253 XEP_0166.A_SESSION_TERMINATE, |
1253 session, | 1254 session, |
1254 "jingle_terminate", | 1255 "jingle_terminate", |
1255 "jingle_terminate", | 1256 "jingle_terminate", |
1256 self._ignore, | 1257 self._ignore, |
1257 self._ignore, | 1258 self._ignore, |
1258 elements=False, | 1259 elements=False, |
1259 force_element=reason_elt, | 1260 force_element=reason_elt, |
1260 ) | 1261 ) |
1261 terminate_dlist = defer.DeferredList(terminate_defers) | 1262 |
1262 | 1263 self.delete_session(client, session["id"]) |
1263 terminate_dlist.addCallback(lambda __: self.delete_session(client, session["id"])) | 1264 await client.a_send(xmlstream.toResponse(request, "result")) |
1264 client.send(xmlstream.toResponse(request, "result")) | |
1265 | 1265 |
1266 async def on_session_accept(self, client, request, jingle_elt, session): | 1266 async def on_session_accept(self, client, request, jingle_elt, session): |
1267 """Method called once session is accepted | 1267 """Method called once session is accepted |
1268 | 1268 |
1269 This method is only called for initiator | 1269 This method is only called for initiator |
1283 client.send(xmlstream.toResponse(request, "result")) | 1283 client.send(xmlstream.toResponse(request, "result")) |
1284 # and change the state | 1284 # and change the state |
1285 session["state"] = XEP_0166.STATE_ACTIVE | 1285 session["state"] = XEP_0166.STATE_ACTIVE |
1286 session["jingle_elt"] = jingle_elt | 1286 session["jingle_elt"] = jingle_elt |
1287 | 1287 |
1288 await defer.DeferredList(self._call_plugins( | 1288 await self._call_plugins( |
1289 client, | 1289 client, |
1290 XEP_0166.A_PREPARE_INITIATOR, | 1290 XEP_0166.A_PREPARE_INITIATOR, |
1291 session, | 1291 session, |
1292 delete=False | 1292 delete=False |
1293 )) | 1293 ) |
1294 | 1294 |
1295 negociate_defers = [] | 1295 await self._call_plugins(client, XEP_0166.A_SESSION_ACCEPT, session) |
1296 negociate_defers = self._call_plugins(client, XEP_0166.A_SESSION_ACCEPT, session) | |
1297 | |
1298 negociate_dlist = defer.gatherResults(negociate_defers) | |
1299 | 1296 |
1300 # after negociations we start the transfer | 1297 # after negociations we start the transfer |
1301 negociate_dlist.addCallback( | 1298 await self._call_plugins( |
1302 lambda __: self._call_plugins( | 1299 client, XEP_0166.A_START, session, app_method_name=None, elements=False |
1303 client, XEP_0166.A_START, session, app_method_name=None, elements=False | 1300 ) |
1304 ) | 1301 session.pop("jingle_elt") |
1305 ) | 1302 |
1306 negociate_dlist.addCallback(lambda __: session.pop("jingle_elt")) | 1303 async def on_session_info(self, client, request, jingle_elt, session): |
1307 | |
1308 def _on_session_cb(self, result, client, request, jingle_elt, session): | |
1309 client.send(xmlstream.toResponse(request, "result")) | |
1310 | |
1311 def _on_session_eb(self, failure_, client, request, jingle_elt, session): | |
1312 log.error("Error while handling on_session_info: {}".format(failure_.value)) | |
1313 # XXX: only error managed so far, maybe some applications/transports need more | |
1314 self.sendError( | |
1315 client, "feature-not-implemented", None, request, "unsupported-info" | |
1316 ) | |
1317 | |
1318 def on_session_info(self, client, request, jingle_elt, session): | |
1319 """Method called when a session-info action is received from other peer | 1304 """Method called when a session-info action is received from other peer |
1320 | 1305 |
1321 This method is only called for initiator | 1306 This method is only called for initiator |
1322 @param client: %(doc_client)s | 1307 @param client: %(doc_client)s |
1323 @param request(domish.Element): full <iq> request | 1308 @param request(domish.Element): full <iq> request |
1328 # this is a session ping, see XEP-0166 §6.8 | 1313 # this is a session ping, see XEP-0166 §6.8 |
1329 client.send(xmlstream.toResponse(request, "result")) | 1314 client.send(xmlstream.toResponse(request, "result")) |
1330 return | 1315 return |
1331 | 1316 |
1332 try: | 1317 try: |
1333 # XXX: session-info is most likely only used for application, so we don't call transport plugins | 1318 # XXX: session-info is most likely only used for application, so we don't call |
1334 # if a future transport use it, this behaviour must be adapted | 1319 # transport plugins if a future transport use it, this behaviour must be |
1335 defers = self._call_plugins( | 1320 # adapted |
1321 await self._call_plugins( | |
1336 client, | 1322 client, |
1337 XEP_0166.A_SESSION_INFO, | 1323 XEP_0166.A_SESSION_INFO, |
1338 session, | 1324 session, |
1339 "jingle_session_info", | 1325 "jingle_session_info", |
1340 None, | 1326 None, |
1341 elements=False, | 1327 elements=False, |
1342 force_element=jingle_elt, | 1328 force_element=jingle_elt, |
1343 ) | 1329 ) |
1344 except exceptions.NotFound as e: | 1330 except exceptions.NotFound as e: |
1345 self._on_session_eb(failure.Failure(e), client, request, jingle_elt, session) | 1331 log.exception("Error while handling on_session_info") |
1346 return | 1332 # XXX: only error managed so far, maybe some applications/transports need more |
1347 | 1333 self.sendError( |
1348 dlist = defer.DeferredList(defers, fireOnOneErrback=True) | 1334 client, "feature-not-implemented", None, request, "unsupported-info" |
1349 dlist.addCallback(self._on_session_cb, client, request, jingle_elt, session) | 1335 ) |
1350 dlist.addErrback(self._on_session_cb, client, request, jingle_elt, session) | 1336 except Exception: |
1337 log.exception("Error while managing session info") | |
1338 else: | |
1339 client.send(xmlstream.toResponse(request, "result")) | |
1351 | 1340 |
1352 async def on_transport_replace(self, client, request, jingle_elt, session): | 1341 async def on_transport_replace(self, client, request, jingle_elt, session): |
1353 """A transport change is requested | 1342 """A transport change is requested |
1354 | 1343 |
1355 The request is parsed, and jingle_handler is called on concerned transport plugin(s) | 1344 The request is parsed, and jingle_handler is called on concerned transport plugin(s) |
1429 client, XEP_0166.A_PREPARE_RESPONDER, session, content_name, None | 1418 client, XEP_0166.A_PREPARE_RESPONDER, session, content_name, None |
1430 ) | 1419 ) |
1431 | 1420 |
1432 iq_elt.send() | 1421 iq_elt.send() |
1433 | 1422 |
1434 def on_transport_accept(self, client, request, jingle_elt, session): | 1423 async def on_transport_accept( |
1424 self, | |
1425 client: SatXMPPEntity, | |
1426 request: domish.Element, | |
1427 jingle_elt: domish.Element, | |
1428 session: dict | |
1429 ) -> None: | |
1435 """Method called once transport replacement is accepted | 1430 """Method called once transport replacement is accepted |
1436 | 1431 |
1437 @param client: %(doc_client)s | 1432 @param client: SatXMPPEntity instance |
1438 @param request(domish.Element): full <iq> request | 1433 @param request: full <iq> request |
1439 @param jingle_elt(domish.Element): the <jingle> element | 1434 @param jingle_elt: the <jingle> element |
1440 @param session(dict): session data | 1435 @param session: session data |
1441 """ | 1436 """ |
1442 log.debug("new transport has been accepted") | 1437 log.debug("new transport has been accepted") |
1443 | 1438 |
1444 try: | 1439 try: |
1445 self._parse_elements( | 1440 self._parse_elements( |
1447 ) | 1442 ) |
1448 except exceptions.CancelError: | 1443 except exceptions.CancelError: |
1449 return | 1444 return |
1450 | 1445 |
1451 # at this point we can send the <iq/> result to confirm reception of the request | 1446 # at this point we can send the <iq/> result to confirm reception of the request |
1452 client.send(xmlstream.toResponse(request, "result")) | 1447 await client.a_send(xmlstream.toResponse(request, "result")) |
1453 | 1448 |
1454 negociate_defers = [] | 1449 await self._call_plugins( |
1455 negociate_defers = self._call_plugins( | |
1456 client, XEP_0166.A_TRANSPORT_ACCEPT, session, app_method_name=None | 1450 client, XEP_0166.A_TRANSPORT_ACCEPT, session, app_method_name=None |
1457 ) | 1451 ) |
1458 | 1452 |
1459 negociate_dlist = defer.DeferredList(negociate_defers) | |
1460 | |
1461 # after negociations we start the transfer | 1453 # after negociations we start the transfer |
1462 negociate_dlist.addCallback( | 1454 await self._call_plugins( |
1463 lambda __: self._call_plugins( | 1455 client, XEP_0166.A_START, session, app_method_name=None, elements=False |
1464 client, XEP_0166.A_START, session, app_method_name=None, elements=False | |
1465 ) | |
1466 ) | 1456 ) |
1467 | 1457 |
1468 def on_transport_reject(self, client, request, jingle_elt, session): | 1458 def on_transport_reject(self, client, request, jingle_elt, session): |
1469 """Method called when a transport replacement is refused | 1459 """Method called when a transport replacement is refused |
1470 | 1460 |