Mercurial > libervia-backend
comparison src/plugins/plugin_xep_0166.py @ 1630:c25f63215632
plugin XEP-0166: transport replacement:
- actions transport-replace, transport-accept and transport-reject are now handled
- transportReplace can be used from an other plugin
- new A_DESTROY action, used to clean a transport before its replacement
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 19 Nov 2015 18:15:35 +0100 |
parents | d05f9179fe22 |
children | f4e9f2f7fe0f |
comparison
equal
deleted
inserted
replaced
1629:a34d7f621944 | 1630:c25f63215632 |
---|---|
69 TRANSPORT_STREAMING='TCP' | 69 TRANSPORT_STREAMING='TCP' |
70 REASON_SUCCESS='success' | 70 REASON_SUCCESS='success' |
71 REASON_DECLINE='decline' | 71 REASON_DECLINE='decline' |
72 REASON_FAILED_APPLICATION='failed-application' | 72 REASON_FAILED_APPLICATION='failed-application' |
73 REASON_FAILED_TRANSPORT='failed-transport' | 73 REASON_FAILED_TRANSPORT='failed-transport' |
74 REASON_CONNECTIVITY_ERROR='connectivity-error' | |
74 A_SESSION_INITIATE = "session-initiate" | 75 A_SESSION_INITIATE = "session-initiate" |
75 A_SESSION_ACCEPT = "session-accept" | 76 A_SESSION_ACCEPT = "session-accept" |
76 A_SESSION_TERMINATE = "session-terminate" | 77 A_SESSION_TERMINATE = "session-terminate" |
77 A_SESSION_INFO = "session-info" | 78 A_SESSION_INFO = "session-info" |
79 A_TRANSPORT_REPLACE = "transport-replace" | |
80 A_TRANSPORT_ACCEPT = "transport-accept" | |
81 A_TRANSPORT_REJECT = "transport-reject" | |
78 A_TRANSPORT_INFO = "transport-info" | 82 A_TRANSPORT_INFO = "transport-info" |
79 # non standard actions | 83 # non standard actions |
80 A_PREPARE_INITIATOR = "prepare-initiator" # initiator must prepare tranfer | 84 A_PREPARE_INITIATOR = "prepare-initiator" # initiator must prepare tranfer |
81 A_PREPARE_RESPONDER = "prepare-responder" # responder must prepare tranfer | 85 A_PREPARE_RESPONDER = "prepare-responder" # responder must prepare tranfer |
82 A_ACCEPTED_ACK = "accepted-ack" # session accepted ack has been received from initiator | 86 A_ACCEPTED_ACK = "accepted-ack" # session accepted ack has been received from initiator |
83 A_START = "start" # application can start | 87 A_START = "start" # application can start |
88 A_DESTROY = "destroy" # called when a transport is destroyed (e.g. because it is remplaced). Used to do cleaning operations | |
84 | 89 |
85 def __init__(self, host): | 90 def __init__(self, host): |
86 log.info(_("plugin Jingle initialization")) | 91 log.info(_("plugin Jingle initialization")) |
87 self.host = host | 92 self.host = host |
88 self._applications = {} # key: namespace, value: application data | 93 self._applications = {} # key: namespace, value: application data |
228 self._type_transports[transport_type].append(transport_data) | 233 self._type_transports[transport_type].append(transport_data) |
229 self._type_transports[transport_type].sort(key=lambda transport_data: transport_data.priority, reverse=True) | 234 self._type_transports[transport_type].sort(key=lambda transport_data: transport_data.priority, reverse=True) |
230 self._transports[namespace] = transport_data | 235 self._transports[namespace] = transport_data |
231 log.debug(u"new jingle transport registered") | 236 log.debug(u"new jingle transport registered") |
232 | 237 |
238 @defer.inlineCallbacks | |
239 def transportReplace(self, transport_ns, session, content_name, profile=C.PROF_KEY_NONE): | |
240 """Replace a transport | |
241 | |
242 @param transport_ns(unicode): namespace of the new transport to use | |
243 @param session(dict): jingle session data | |
244 @param content_name(unicode): name of the content | |
245 @param profile: %(doc_profile)s | |
246 """ | |
247 # XXX: for now we replace the transport before receiving confirmation from other peer | |
248 # this is acceptable because we terminate the session if transport is rejected. | |
249 # this behavious may change in the future. | |
250 client = self.host.getClient(profile) | |
251 content_data= session['contents'][content_name] | |
252 transport_data = content_data['transport_data'] | |
253 try: | |
254 transport = self._transports[transport_ns] | |
255 except KeyError: | |
256 raise exceptions.InternalError(u"Unkown transport") | |
257 yield content_data['transport'].handler.jingleHandler(XEP_0166.A_DESTROY, session, content_name, None, profile) | |
258 content_data['transport'] = transport | |
259 transport_data.clear() | |
260 | |
261 iq_elt, jingle_elt = self._buildJingleElt(client, session, XEP_0166.A_TRANSPORT_REPLACE) | |
262 content_elt = jingle_elt.addElement('content') | |
263 content_elt['name'] = content_name | |
264 content_elt['creator'] = content_data['creator'] | |
265 | |
266 transport_elt = transport.handler.jingleSessionInit(session, content_name, profile) | |
267 content_elt.addChild(transport_elt) | |
268 iq_elt.send() | |
269 | |
233 def buildAction(self, action, session, content_name, profile=C.PROF_KEY_NONE): | 270 def buildAction(self, action, session, content_name, profile=C.PROF_KEY_NONE): |
234 """Build an element according to requested action | 271 """Build an element according to requested action |
235 | 272 |
236 @param action(unicode): a jingle action (see XEP-0166 §7.2), | 273 @param action(unicode): a jingle action (see XEP-0166 §7.2), |
237 session-* actions are not managed here | 274 session-* actions are not managed here |
275 transport-replace is managed in the dedicated [transportReplace] method | |
238 @param session(dict): jingle session data | 276 @param session(dict): jingle session data |
239 @param content_name(unicode): name of the content | 277 @param content_name(unicode): name of the content |
240 @param profile: %(doc_profile)s | 278 @param profile: %(doc_profile)s |
241 @return (tuple[domish.Element, domish.Element]): parent <iq> element, <transport> or <description> element, according to action | 279 @return (tuple[domish.Element, domish.Element]): parent <iq> element, <transport> or <description> element, according to action |
242 """ | 280 """ |
429 try: | 467 try: |
430 session = client.jingle_sessions[sid] | 468 session = client.jingle_sessions[sid] |
431 except KeyError: | 469 except KeyError: |
432 if action != XEP_0166.A_SESSION_INITIATE: | 470 if action != XEP_0166.A_SESSION_INITIATE: |
433 log.warning(u"Received request for an unknown session id: {}".format(sid)) | 471 log.warning(u"Received request for an unknown session id: {}".format(sid)) |
434 self.sendError('not-acceptable', None, request, profile=profile) | 472 self.sendError('item-not-found', None, request, 'unknown-session', profile=profile) |
435 return | 473 return |
436 | 474 |
437 session = client.jingle_sessions[sid] = {'id': sid, | 475 session = client.jingle_sessions[sid] = {'id': sid, |
438 'state': STATE_PENDING, | 476 'state': STATE_PENDING, |
439 'initiator': peer_jid, | 477 'initiator': peer_jid, |
459 self.onSessionAccept(client, request, jingle_elt, session) | 497 self.onSessionAccept(client, request, jingle_elt, session) |
460 elif action == XEP_0166.A_SESSION_INFO: | 498 elif action == XEP_0166.A_SESSION_INFO: |
461 self.onSessionInfo(client, request, jingle_elt, session) | 499 self.onSessionInfo(client, request, jingle_elt, session) |
462 elif action == XEP_0166.A_TRANSPORT_INFO: | 500 elif action == XEP_0166.A_TRANSPORT_INFO: |
463 self.onTransportInfo(client, request, jingle_elt, session) | 501 self.onTransportInfo(client, request, jingle_elt, session) |
502 elif action == XEP_0166.A_TRANSPORT_REPLACE: | |
503 self.onTransportReplace(client, request, jingle_elt, session) | |
504 elif action == XEP_0166.A_TRANSPORT_ACCEPT: | |
505 self.onTransportAccept(client, request, jingle_elt, session) | |
506 elif action == XEP_0166.A_TRANSPORT_REJECT: | |
507 self.onTransportReject(client, request, jingle_elt, session) | |
464 else: | 508 else: |
465 raise exceptions.InternalError(u"Unknown action {}".format(action)) | 509 raise exceptions.InternalError(u"Unknown action {}".format(action)) |
466 | 510 |
467 ## Actions callbacks ## | 511 ## Actions callbacks ## |
468 | 512 |
472 after the parsing, contents_dict will containt handlers, "desc_elt" and "transport_elt" | 516 after the parsing, contents_dict will containt handlers, "desc_elt" and "transport_elt" |
473 @param jingle_elt(domish.Element): parent <jingle> element, containing one or more <content> | 517 @param jingle_elt(domish.Element): parent <jingle> element, containing one or more <content> |
474 @param session(dict): session data | 518 @param session(dict): session data |
475 @param request(domish.Element): the whole request | 519 @param request(domish.Element): the whole request |
476 @param client: %(doc_client)s | 520 @param client: %(doc_client)s |
477 @param new(bool): if new the content is new and must be created, | 521 @param new(bool): True if the content is new and must be created, |
478 else the content must exists, and session data will be filled | 522 else the content must exists, and session data will be filled |
479 @param creator(unicode): only used if new is True: creating pear (see § 7.3) | 523 @param creator(unicode): only used if new is True: creating pear (see § 7.3) |
480 @param with_application(bool): if True, raise an error if there is no <description> element else ignore it | 524 @param with_application(bool): if True, raise an error if there is no <description> element else ignore it |
481 @param with_transport(bool): if True, raise an error if there is no <transport> element else ignore it | 525 @param with_transport(bool): if True, raise an error if there is no <transport> element else ignore it |
482 @raise exceptions.CancelError: the error is treated the calling method can cancel the treatment (i.e. return) | 526 @raise exceptions.CancelError: the error is treated and the calling method can cancel the treatment (i.e. return) |
483 """ | 527 """ |
484 contents_dict = session['contents'] | 528 contents_dict = session['contents'] |
485 content_elts = jingle_elt.elements(NS_JINGLE, 'content') | 529 content_elts = jingle_elt.elements(NS_JINGLE, 'content') |
486 | 530 |
487 for content_elt in content_elts: | 531 for content_elt in content_elts: |
555 content_data['transport'] = transport | 599 content_data['transport'] = transport |
556 content_data['transport_data'] = {} | 600 content_data['transport_data'] = {} |
557 else: | 601 else: |
558 # the content exists, we check that we have not a former transport_elt | 602 # the content exists, we check that we have not a former transport_elt |
559 if 'transport_elt' in content_data: | 603 if 'transport_elt' in content_data: |
560 raise exceptions.InternalError(u"desc_elt should not exist at this point") | 604 raise exceptions.InternalError(u"transport_elt should not exist at this point") |
561 | 605 |
562 content_data['transport_elt'] = transport_elt | 606 content_data['transport_elt'] = transport_elt |
563 | 607 |
564 def _callPlugins(self, action, session, app_method_name='jingleHandler', transp_method_name='jingleHandler', | 608 def _callPlugins(self, action, session, app_method_name='jingleHandler', transp_method_name='jingleHandler', |
565 app_default_cb=None, transp_default_cb=None, delete=True, | 609 app_default_cb=None, transp_default_cb=None, delete=True, |
772 | 816 |
773 dlist = defer.DeferredList(defers, fireOnOneErrback=True) | 817 dlist = defer.DeferredList(defers, fireOnOneErrback=True) |
774 dlist.addCallback(self._onSessionCb, client, request, jingle_elt, session) | 818 dlist.addCallback(self._onSessionCb, client, request, jingle_elt, session) |
775 dlist.addErrback(self._onSessionCb, client, request, jingle_elt, session) | 819 dlist.addErrback(self._onSessionCb, client, request, jingle_elt, session) |
776 | 820 |
821 @defer.inlineCallbacks | |
822 def onTransportReplace(self, client, request, jingle_elt, session): | |
823 """A transport change is requested | |
824 | |
825 The request is parsed, and jingleHandler is called on concerned transport plugin(s) | |
826 @param client: %(doc_client)s | |
827 @param request(domish.Element): full <iq> request | |
828 @param jingle_elt(domish.Element): the <jingle> element | |
829 @param session(dict): session data | |
830 """ | |
831 log.debug(u"Other peer want to replace the transport") | |
832 try: | |
833 self._parseElements(jingle_elt, session, request, client, with_application=False) | |
834 except exceptions.CancelError: | |
835 defer.returnValue(None) | |
836 | |
837 client.xmlstream.send(xmlstream.toResponse(request, 'result')) | |
838 | |
839 content_name = None | |
840 to_replace = [] | |
841 | |
842 for content_name, content_data in session['contents'].iteritems(): | |
843 try: | |
844 transport_elt = content_data.pop('transport_elt') | |
845 except KeyError: | |
846 continue | |
847 transport_ns = transport_elt.uri | |
848 try: | |
849 transport = self._transports[transport_ns] | |
850 except KeyError: | |
851 log.warning(u"Other peer want to replace current transport with an unknown one: {}".format(transport_ns)) | |
852 content_name = None | |
853 break | |
854 to_replace.append((content_name, content_data, transport, transport_elt)) | |
855 | |
856 if content_name is None: | |
857 # wa can't accept the replacement | |
858 iq_elt, reject_jingle_elt = self._buildJingleElt(client, session, XEP_0166.A_TRANSPORT_REJECT) | |
859 for child in jingle_elt.children: | |
860 reject_jingle_elt.addChild(child) | |
861 | |
862 iq_elt.send() | |
863 defer.returnValue(None) | |
864 | |
865 # at this point, everything is alright and we can replace the transport(s) | |
866 # this is similar to an session-accept action, but for transports only | |
867 iq_elt, accept_jingle_elt = self._buildJingleElt(client, session, XEP_0166.A_TRANSPORT_ACCEPT) | |
868 for content_name, content_data, transport, transport_elt in to_replace: | |
869 # we can now actually replace the transport | |
870 yield content_data['transport'].handler.jingleHandler(XEP_0166.A_DESTROY, session, content_name, None, client.profile) | |
871 content_data['transport'] = transport | |
872 content_data['transport_data'].clear() | |
873 # and build the element | |
874 content_elt = accept_jingle_elt.addElement('content') | |
875 content_elt['name'] = content_name | |
876 content_elt['creator'] = content_data['creator'] | |
877 # we notify the transport and insert its <transport/> in the answer | |
878 accept_transport_elt = yield transport.handler.jingleHandler(XEP_0166.A_TRANSPORT_REPLACE, session, content_name, transport_elt, client.profile) | |
879 content_elt.addChild(accept_transport_elt) | |
880 # there is no confirmation needed here, so we can directly prepare it | |
881 yield transport.handler.jingleHandler(XEP_0166.A_PREPARE_RESPONDER, session, content_name, None, client.profile) | |
882 | |
883 iq_elt.send() | |
884 | |
885 def onTransportAccept(self, client, request, jingle_elt, session): | |
886 """Method called once transport replacement is accepted | |
887 | |
888 @param client: %(doc_client)s | |
889 @param request(domish.Element): full <iq> request | |
890 @param jingle_elt(domish.Element): the <jingle> element | |
891 @param session(dict): session data | |
892 """ | |
893 log.debug(u"new transport has been accepted") | |
894 | |
895 try: | |
896 self._parseElements(jingle_elt, session, request, client, with_application=False) | |
897 except exceptions.CancelError: | |
898 return | |
899 | |
900 # at this point we can send the <iq/> result to confirm reception of the request | |
901 client.xmlstream.send(xmlstream.toResponse(request, 'result')) | |
902 | |
903 negociate_defers = [] | |
904 negociate_defers = self._callPlugins(XEP_0166.A_TRANSPORT_ACCEPT, session, app_method_name=None, profile=client.profile) | |
905 | |
906 negociate_dlist = defer.DeferredList(negociate_defers) | |
907 | |
908 # after negociations we start the transfer | |
909 negociate_dlist.addCallback(lambda dummy: self._callPlugins(XEP_0166.A_START, session, app_method_name=None, elements=False, profile=client.profile)) | |
910 | |
911 def onTransportReject(self, client, request, jingle_elt, session): | |
912 """Method called when a transport replacement is refused | |
913 | |
914 @param client: %(doc_client)s | |
915 @param request(domish.Element): full <iq> request | |
916 @param jingle_elt(domish.Element): the <jingle> element | |
917 @param session(dict): session data | |
918 """ | |
919 # XXX: for now, we terminate the session in case of transport-reject | |
920 # this behaviour may change in the future | |
921 self.terminate('failed-transport', session, client.profile) | |
922 | |
777 def onTransportInfo(self, client, request, jingle_elt, session): | 923 def onTransportInfo(self, client, request, jingle_elt, session): |
778 """Method called when a transport-info action is received from other peer | 924 """Method called when a transport-info action is received from other peer |
779 | 925 |
780 The request is parsed, and jingleHandler is called on concerned transport plugin(s) | 926 The request is parsed, and jingleHandler is called on concerned transport plugin(s) |
781 @param client: %(doc_client)s | 927 @param client: %(doc_client)s |