comparison src/plugins/plugin_xep_0166.py @ 1630:c25f63215632

plugin XEP-0166: transport replacement: - actions transport-replace, transport-accept and transport-reject are now handled - transportReplace can be used from an other plugin - new A_DESTROY action, used to clean a transport before its replacement
author Goffi <goffi@goffi.org>
date Thu, 19 Nov 2015 18:15:35 +0100
parents d05f9179fe22
children f4e9f2f7fe0f
comparison
equal deleted inserted replaced
1629:a34d7f621944 1630:c25f63215632
69 TRANSPORT_STREAMING='TCP' 69 TRANSPORT_STREAMING='TCP'
70 REASON_SUCCESS='success' 70 REASON_SUCCESS='success'
71 REASON_DECLINE='decline' 71 REASON_DECLINE='decline'
72 REASON_FAILED_APPLICATION='failed-application' 72 REASON_FAILED_APPLICATION='failed-application'
73 REASON_FAILED_TRANSPORT='failed-transport' 73 REASON_FAILED_TRANSPORT='failed-transport'
74 REASON_CONNECTIVITY_ERROR='connectivity-error'
74 A_SESSION_INITIATE = "session-initiate" 75 A_SESSION_INITIATE = "session-initiate"
75 A_SESSION_ACCEPT = "session-accept" 76 A_SESSION_ACCEPT = "session-accept"
76 A_SESSION_TERMINATE = "session-terminate" 77 A_SESSION_TERMINATE = "session-terminate"
77 A_SESSION_INFO = "session-info" 78 A_SESSION_INFO = "session-info"
79 A_TRANSPORT_REPLACE = "transport-replace"
80 A_TRANSPORT_ACCEPT = "transport-accept"
81 A_TRANSPORT_REJECT = "transport-reject"
78 A_TRANSPORT_INFO = "transport-info" 82 A_TRANSPORT_INFO = "transport-info"
79 # non standard actions 83 # non standard actions
80 A_PREPARE_INITIATOR = "prepare-initiator" # initiator must prepare tranfer 84 A_PREPARE_INITIATOR = "prepare-initiator" # initiator must prepare tranfer
81 A_PREPARE_RESPONDER = "prepare-responder" # responder must prepare tranfer 85 A_PREPARE_RESPONDER = "prepare-responder" # responder must prepare tranfer
82 A_ACCEPTED_ACK = "accepted-ack" # session accepted ack has been received from initiator 86 A_ACCEPTED_ACK = "accepted-ack" # session accepted ack has been received from initiator
83 A_START = "start" # application can start 87 A_START = "start" # application can start
88 A_DESTROY = "destroy" # called when a transport is destroyed (e.g. because it is remplaced). Used to do cleaning operations
84 89
85 def __init__(self, host): 90 def __init__(self, host):
86 log.info(_("plugin Jingle initialization")) 91 log.info(_("plugin Jingle initialization"))
87 self.host = host 92 self.host = host
88 self._applications = {} # key: namespace, value: application data 93 self._applications = {} # key: namespace, value: application data
228 self._type_transports[transport_type].append(transport_data) 233 self._type_transports[transport_type].append(transport_data)
229 self._type_transports[transport_type].sort(key=lambda transport_data: transport_data.priority, reverse=True) 234 self._type_transports[transport_type].sort(key=lambda transport_data: transport_data.priority, reverse=True)
230 self._transports[namespace] = transport_data 235 self._transports[namespace] = transport_data
231 log.debug(u"new jingle transport registered") 236 log.debug(u"new jingle transport registered")
232 237
238 @defer.inlineCallbacks
239 def transportReplace(self, transport_ns, session, content_name, profile=C.PROF_KEY_NONE):
240 """Replace a transport
241
242 @param transport_ns(unicode): namespace of the new transport to use
243 @param session(dict): jingle session data
244 @param content_name(unicode): name of the content
245 @param profile: %(doc_profile)s
246 """
247 # XXX: for now we replace the transport before receiving confirmation from other peer
248 # this is acceptable because we terminate the session if transport is rejected.
249 # this behavious may change in the future.
250 client = self.host.getClient(profile)
251 content_data= session['contents'][content_name]
252 transport_data = content_data['transport_data']
253 try:
254 transport = self._transports[transport_ns]
255 except KeyError:
256 raise exceptions.InternalError(u"Unkown transport")
257 yield content_data['transport'].handler.jingleHandler(XEP_0166.A_DESTROY, session, content_name, None, profile)
258 content_data['transport'] = transport
259 transport_data.clear()
260
261 iq_elt, jingle_elt = self._buildJingleElt(client, session, XEP_0166.A_TRANSPORT_REPLACE)
262 content_elt = jingle_elt.addElement('content')
263 content_elt['name'] = content_name
264 content_elt['creator'] = content_data['creator']
265
266 transport_elt = transport.handler.jingleSessionInit(session, content_name, profile)
267 content_elt.addChild(transport_elt)
268 iq_elt.send()
269
233 def buildAction(self, action, session, content_name, profile=C.PROF_KEY_NONE): 270 def buildAction(self, action, session, content_name, profile=C.PROF_KEY_NONE):
234 """Build an element according to requested action 271 """Build an element according to requested action
235 272
236 @param action(unicode): a jingle action (see XEP-0166 §7.2), 273 @param action(unicode): a jingle action (see XEP-0166 §7.2),
237 session-* actions are not managed here 274 session-* actions are not managed here
275 transport-replace is managed in the dedicated [transportReplace] method
238 @param session(dict): jingle session data 276 @param session(dict): jingle session data
239 @param content_name(unicode): name of the content 277 @param content_name(unicode): name of the content
240 @param profile: %(doc_profile)s 278 @param profile: %(doc_profile)s
241 @return (tuple[domish.Element, domish.Element]): parent <iq> element, <transport> or <description> element, according to action 279 @return (tuple[domish.Element, domish.Element]): parent <iq> element, <transport> or <description> element, according to action
242 """ 280 """
429 try: 467 try:
430 session = client.jingle_sessions[sid] 468 session = client.jingle_sessions[sid]
431 except KeyError: 469 except KeyError:
432 if action != XEP_0166.A_SESSION_INITIATE: 470 if action != XEP_0166.A_SESSION_INITIATE:
433 log.warning(u"Received request for an unknown session id: {}".format(sid)) 471 log.warning(u"Received request for an unknown session id: {}".format(sid))
434 self.sendError('not-acceptable', None, request, profile=profile) 472 self.sendError('item-not-found', None, request, 'unknown-session', profile=profile)
435 return 473 return
436 474
437 session = client.jingle_sessions[sid] = {'id': sid, 475 session = client.jingle_sessions[sid] = {'id': sid,
438 'state': STATE_PENDING, 476 'state': STATE_PENDING,
439 'initiator': peer_jid, 477 'initiator': peer_jid,
459 self.onSessionAccept(client, request, jingle_elt, session) 497 self.onSessionAccept(client, request, jingle_elt, session)
460 elif action == XEP_0166.A_SESSION_INFO: 498 elif action == XEP_0166.A_SESSION_INFO:
461 self.onSessionInfo(client, request, jingle_elt, session) 499 self.onSessionInfo(client, request, jingle_elt, session)
462 elif action == XEP_0166.A_TRANSPORT_INFO: 500 elif action == XEP_0166.A_TRANSPORT_INFO:
463 self.onTransportInfo(client, request, jingle_elt, session) 501 self.onTransportInfo(client, request, jingle_elt, session)
502 elif action == XEP_0166.A_TRANSPORT_REPLACE:
503 self.onTransportReplace(client, request, jingle_elt, session)
504 elif action == XEP_0166.A_TRANSPORT_ACCEPT:
505 self.onTransportAccept(client, request, jingle_elt, session)
506 elif action == XEP_0166.A_TRANSPORT_REJECT:
507 self.onTransportReject(client, request, jingle_elt, session)
464 else: 508 else:
465 raise exceptions.InternalError(u"Unknown action {}".format(action)) 509 raise exceptions.InternalError(u"Unknown action {}".format(action))
466 510
467 ## Actions callbacks ## 511 ## Actions callbacks ##
468 512
472 after the parsing, contents_dict will containt handlers, "desc_elt" and "transport_elt" 516 after the parsing, contents_dict will containt handlers, "desc_elt" and "transport_elt"
473 @param jingle_elt(domish.Element): parent <jingle> element, containing one or more <content> 517 @param jingle_elt(domish.Element): parent <jingle> element, containing one or more <content>
474 @param session(dict): session data 518 @param session(dict): session data
475 @param request(domish.Element): the whole request 519 @param request(domish.Element): the whole request
476 @param client: %(doc_client)s 520 @param client: %(doc_client)s
477 @param new(bool): if new the content is new and must be created, 521 @param new(bool): True if the content is new and must be created,
478 else the content must exists, and session data will be filled 522 else the content must exists, and session data will be filled
479 @param creator(unicode): only used if new is True: creating pear (see § 7.3) 523 @param creator(unicode): only used if new is True: creating pear (see § 7.3)
480 @param with_application(bool): if True, raise an error if there is no <description> element else ignore it 524 @param with_application(bool): if True, raise an error if there is no <description> element else ignore it
481 @param with_transport(bool): if True, raise an error if there is no <transport> element else ignore it 525 @param with_transport(bool): if True, raise an error if there is no <transport> element else ignore it
482 @raise exceptions.CancelError: the error is treated the calling method can cancel the treatment (i.e. return) 526 @raise exceptions.CancelError: the error is treated and the calling method can cancel the treatment (i.e. return)
483 """ 527 """
484 contents_dict = session['contents'] 528 contents_dict = session['contents']
485 content_elts = jingle_elt.elements(NS_JINGLE, 'content') 529 content_elts = jingle_elt.elements(NS_JINGLE, 'content')
486 530
487 for content_elt in content_elts: 531 for content_elt in content_elts:
555 content_data['transport'] = transport 599 content_data['transport'] = transport
556 content_data['transport_data'] = {} 600 content_data['transport_data'] = {}
557 else: 601 else:
558 # the content exists, we check that we have not a former transport_elt 602 # the content exists, we check that we have not a former transport_elt
559 if 'transport_elt' in content_data: 603 if 'transport_elt' in content_data:
560 raise exceptions.InternalError(u"desc_elt should not exist at this point") 604 raise exceptions.InternalError(u"transport_elt should not exist at this point")
561 605
562 content_data['transport_elt'] = transport_elt 606 content_data['transport_elt'] = transport_elt
563 607
564 def _callPlugins(self, action, session, app_method_name='jingleHandler', transp_method_name='jingleHandler', 608 def _callPlugins(self, action, session, app_method_name='jingleHandler', transp_method_name='jingleHandler',
565 app_default_cb=None, transp_default_cb=None, delete=True, 609 app_default_cb=None, transp_default_cb=None, delete=True,
772 816
773 dlist = defer.DeferredList(defers, fireOnOneErrback=True) 817 dlist = defer.DeferredList(defers, fireOnOneErrback=True)
774 dlist.addCallback(self._onSessionCb, client, request, jingle_elt, session) 818 dlist.addCallback(self._onSessionCb, client, request, jingle_elt, session)
775 dlist.addErrback(self._onSessionCb, client, request, jingle_elt, session) 819 dlist.addErrback(self._onSessionCb, client, request, jingle_elt, session)
776 820
821 @defer.inlineCallbacks
822 def onTransportReplace(self, client, request, jingle_elt, session):
823 """A transport change is requested
824
825 The request is parsed, and jingleHandler is called on concerned transport plugin(s)
826 @param client: %(doc_client)s
827 @param request(domish.Element): full <iq> request
828 @param jingle_elt(domish.Element): the <jingle> element
829 @param session(dict): session data
830 """
831 log.debug(u"Other peer want to replace the transport")
832 try:
833 self._parseElements(jingle_elt, session, request, client, with_application=False)
834 except exceptions.CancelError:
835 defer.returnValue(None)
836
837 client.xmlstream.send(xmlstream.toResponse(request, 'result'))
838
839 content_name = None
840 to_replace = []
841
842 for content_name, content_data in session['contents'].iteritems():
843 try:
844 transport_elt = content_data.pop('transport_elt')
845 except KeyError:
846 continue
847 transport_ns = transport_elt.uri
848 try:
849 transport = self._transports[transport_ns]
850 except KeyError:
851 log.warning(u"Other peer want to replace current transport with an unknown one: {}".format(transport_ns))
852 content_name = None
853 break
854 to_replace.append((content_name, content_data, transport, transport_elt))
855
856 if content_name is None:
857 # wa can't accept the replacement
858 iq_elt, reject_jingle_elt = self._buildJingleElt(client, session, XEP_0166.A_TRANSPORT_REJECT)
859 for child in jingle_elt.children:
860 reject_jingle_elt.addChild(child)
861
862 iq_elt.send()
863 defer.returnValue(None)
864
865 # at this point, everything is alright and we can replace the transport(s)
866 # this is similar to an session-accept action, but for transports only
867 iq_elt, accept_jingle_elt = self._buildJingleElt(client, session, XEP_0166.A_TRANSPORT_ACCEPT)
868 for content_name, content_data, transport, transport_elt in to_replace:
869 # we can now actually replace the transport
870 yield content_data['transport'].handler.jingleHandler(XEP_0166.A_DESTROY, session, content_name, None, client.profile)
871 content_data['transport'] = transport
872 content_data['transport_data'].clear()
873 # and build the element
874 content_elt = accept_jingle_elt.addElement('content')
875 content_elt['name'] = content_name
876 content_elt['creator'] = content_data['creator']
877 # we notify the transport and insert its <transport/> in the answer
878 accept_transport_elt = yield transport.handler.jingleHandler(XEP_0166.A_TRANSPORT_REPLACE, session, content_name, transport_elt, client.profile)
879 content_elt.addChild(accept_transport_elt)
880 # there is no confirmation needed here, so we can directly prepare it
881 yield transport.handler.jingleHandler(XEP_0166.A_PREPARE_RESPONDER, session, content_name, None, client.profile)
882
883 iq_elt.send()
884
885 def onTransportAccept(self, client, request, jingle_elt, session):
886 """Method called once transport replacement is accepted
887
888 @param client: %(doc_client)s
889 @param request(domish.Element): full <iq> request
890 @param jingle_elt(domish.Element): the <jingle> element
891 @param session(dict): session data
892 """
893 log.debug(u"new transport has been accepted")
894
895 try:
896 self._parseElements(jingle_elt, session, request, client, with_application=False)
897 except exceptions.CancelError:
898 return
899
900 # at this point we can send the <iq/> result to confirm reception of the request
901 client.xmlstream.send(xmlstream.toResponse(request, 'result'))
902
903 negociate_defers = []
904 negociate_defers = self._callPlugins(XEP_0166.A_TRANSPORT_ACCEPT, session, app_method_name=None, profile=client.profile)
905
906 negociate_dlist = defer.DeferredList(negociate_defers)
907
908 # after negociations we start the transfer
909 negociate_dlist.addCallback(lambda dummy: self._callPlugins(XEP_0166.A_START, session, app_method_name=None, elements=False, profile=client.profile))
910
911 def onTransportReject(self, client, request, jingle_elt, session):
912 """Method called when a transport replacement is refused
913
914 @param client: %(doc_client)s
915 @param request(domish.Element): full <iq> request
916 @param jingle_elt(domish.Element): the <jingle> element
917 @param session(dict): session data
918 """
919 # XXX: for now, we terminate the session in case of transport-reject
920 # this behaviour may change in the future
921 self.terminate('failed-transport', session, client.profile)
922
777 def onTransportInfo(self, client, request, jingle_elt, session): 923 def onTransportInfo(self, client, request, jingle_elt, session):
778 """Method called when a transport-info action is received from other peer 924 """Method called when a transport-info action is received from other peer
779 925
780 The request is parsed, and jingleHandler is called on concerned transport plugin(s) 926 The request is parsed, and jingleHandler is called on concerned transport plugin(s)
781 @param client: %(doc_client)s 927 @param client: %(doc_client)s