# HG changeset patch # User Goffi # Date 1447953335 -3600 # Node ID c25f632156326936d28072e283d2268737e667f7 # Parent a34d7f621944ef9bf9f144ae57b1a7573d84cf2c plugin XEP-0166: transport replacement: - actions transport-replace, transport-accept and transport-reject are now handled - transportReplace can be used from an other plugin - new A_DESTROY action, used to clean a transport before its replacement diff -r a34d7f621944 -r c25f63215632 src/plugins/plugin_xep_0166.py --- a/src/plugins/plugin_xep_0166.py Thu Nov 19 18:15:35 2015 +0100 +++ b/src/plugins/plugin_xep_0166.py Thu Nov 19 18:15:35 2015 +0100 @@ -71,16 +71,21 @@ REASON_DECLINE='decline' REASON_FAILED_APPLICATION='failed-application' REASON_FAILED_TRANSPORT='failed-transport' + REASON_CONNECTIVITY_ERROR='connectivity-error' A_SESSION_INITIATE = "session-initiate" A_SESSION_ACCEPT = "session-accept" A_SESSION_TERMINATE = "session-terminate" A_SESSION_INFO = "session-info" + A_TRANSPORT_REPLACE = "transport-replace" + A_TRANSPORT_ACCEPT = "transport-accept" + A_TRANSPORT_REJECT = "transport-reject" A_TRANSPORT_INFO = "transport-info" # non standard actions A_PREPARE_INITIATOR = "prepare-initiator" # initiator must prepare tranfer A_PREPARE_RESPONDER = "prepare-responder" # responder must prepare tranfer A_ACCEPTED_ACK = "accepted-ack" # session accepted ack has been received from initiator A_START = "start" # application can start + A_DESTROY = "destroy" # called when a transport is destroyed (e.g. because it is remplaced). Used to do cleaning operations def __init__(self, host): log.info(_("plugin Jingle initialization")) @@ -230,11 +235,44 @@ self._transports[namespace] = transport_data log.debug(u"new jingle transport registered") + @defer.inlineCallbacks + def transportReplace(self, transport_ns, session, content_name, profile=C.PROF_KEY_NONE): + """Replace a transport + + @param transport_ns(unicode): namespace of the new transport to use + @param session(dict): jingle session data + @param content_name(unicode): name of the content + @param profile: %(doc_profile)s + """ + # XXX: for now we replace the transport before receiving confirmation from other peer + # this is acceptable because we terminate the session if transport is rejected. + # this behavious may change in the future. + client = self.host.getClient(profile) + content_data= session['contents'][content_name] + transport_data = content_data['transport_data'] + try: + transport = self._transports[transport_ns] + except KeyError: + raise exceptions.InternalError(u"Unkown transport") + yield content_data['transport'].handler.jingleHandler(XEP_0166.A_DESTROY, session, content_name, None, profile) + content_data['transport'] = transport + transport_data.clear() + + iq_elt, jingle_elt = self._buildJingleElt(client, session, XEP_0166.A_TRANSPORT_REPLACE) + content_elt = jingle_elt.addElement('content') + content_elt['name'] = content_name + content_elt['creator'] = content_data['creator'] + + transport_elt = transport.handler.jingleSessionInit(session, content_name, profile) + content_elt.addChild(transport_elt) + iq_elt.send() + def buildAction(self, action, session, content_name, profile=C.PROF_KEY_NONE): """Build an element according to requested action @param action(unicode): a jingle action (see XEP-0166 §7.2), session-* actions are not managed here + transport-replace is managed in the dedicated [transportReplace] method @param session(dict): jingle session data @param content_name(unicode): name of the content @param profile: %(doc_profile)s @@ -431,7 +469,7 @@ except KeyError: if action != XEP_0166.A_SESSION_INITIATE: log.warning(u"Received request for an unknown session id: {}".format(sid)) - self.sendError('not-acceptable', None, request, profile=profile) + self.sendError('item-not-found', None, request, 'unknown-session', profile=profile) return session = client.jingle_sessions[sid] = {'id': sid, @@ -461,6 +499,12 @@ self.onSessionInfo(client, request, jingle_elt, session) elif action == XEP_0166.A_TRANSPORT_INFO: self.onTransportInfo(client, request, jingle_elt, session) + elif action == XEP_0166.A_TRANSPORT_REPLACE: + self.onTransportReplace(client, request, jingle_elt, session) + elif action == XEP_0166.A_TRANSPORT_ACCEPT: + self.onTransportAccept(client, request, jingle_elt, session) + elif action == XEP_0166.A_TRANSPORT_REJECT: + self.onTransportReject(client, request, jingle_elt, session) else: raise exceptions.InternalError(u"Unknown action {}".format(action)) @@ -474,12 +518,12 @@ @param session(dict): session data @param request(domish.Element): the whole request @param client: %(doc_client)s - @param new(bool): if new the content is new and must be created, + @param new(bool): True if the content is new and must be created, else the content must exists, and session data will be filled @param creator(unicode): only used if new is True: creating pear (see § 7.3) @param with_application(bool): if True, raise an error if there is no element else ignore it @param with_transport(bool): if True, raise an error if there is no element else ignore it - @raise exceptions.CancelError: the error is treated the calling method can cancel the treatment (i.e. return) + @raise exceptions.CancelError: the error is treated and the calling method can cancel the treatment (i.e. return) """ contents_dict = session['contents'] content_elts = jingle_elt.elements(NS_JINGLE, 'content') @@ -557,7 +601,7 @@ else: # the content exists, we check that we have not a former transport_elt if 'transport_elt' in content_data: - raise exceptions.InternalError(u"desc_elt should not exist at this point") + raise exceptions.InternalError(u"transport_elt should not exist at this point") content_data['transport_elt'] = transport_elt @@ -774,6 +818,108 @@ dlist.addCallback(self._onSessionCb, client, request, jingle_elt, session) dlist.addErrback(self._onSessionCb, client, request, jingle_elt, session) + @defer.inlineCallbacks + def onTransportReplace(self, client, request, jingle_elt, session): + """A transport change is requested + + The request is parsed, and jingleHandler is called on concerned transport plugin(s) + @param client: %(doc_client)s + @param request(domish.Element): full request + @param jingle_elt(domish.Element): the element + @param session(dict): session data + """ + log.debug(u"Other peer want to replace the transport") + try: + self._parseElements(jingle_elt, session, request, client, with_application=False) + except exceptions.CancelError: + defer.returnValue(None) + + client.xmlstream.send(xmlstream.toResponse(request, 'result')) + + content_name = None + to_replace = [] + + for content_name, content_data in session['contents'].iteritems(): + try: + transport_elt = content_data.pop('transport_elt') + except KeyError: + continue + transport_ns = transport_elt.uri + try: + transport = self._transports[transport_ns] + except KeyError: + log.warning(u"Other peer want to replace current transport with an unknown one: {}".format(transport_ns)) + content_name = None + break + to_replace.append((content_name, content_data, transport, transport_elt)) + + if content_name is None: + # wa can't accept the replacement + iq_elt, reject_jingle_elt = self._buildJingleElt(client, session, XEP_0166.A_TRANSPORT_REJECT) + for child in jingle_elt.children: + reject_jingle_elt.addChild(child) + + iq_elt.send() + defer.returnValue(None) + + # at this point, everything is alright and we can replace the transport(s) + # this is similar to an session-accept action, but for transports only + iq_elt, accept_jingle_elt = self._buildJingleElt(client, session, XEP_0166.A_TRANSPORT_ACCEPT) + for content_name, content_data, transport, transport_elt in to_replace: + # we can now actually replace the transport + yield content_data['transport'].handler.jingleHandler(XEP_0166.A_DESTROY, session, content_name, None, client.profile) + content_data['transport'] = transport + content_data['transport_data'].clear() + # and build the element + content_elt = accept_jingle_elt.addElement('content') + content_elt['name'] = content_name + content_elt['creator'] = content_data['creator'] + # we notify the transport and insert its in the answer + accept_transport_elt = yield transport.handler.jingleHandler(XEP_0166.A_TRANSPORT_REPLACE, session, content_name, transport_elt, client.profile) + content_elt.addChild(accept_transport_elt) + # there is no confirmation needed here, so we can directly prepare it + yield transport.handler.jingleHandler(XEP_0166.A_PREPARE_RESPONDER, session, content_name, None, client.profile) + + iq_elt.send() + + def onTransportAccept(self, client, request, jingle_elt, session): + """Method called once transport replacement is accepted + + @param client: %(doc_client)s + @param request(domish.Element): full request + @param jingle_elt(domish.Element): the element + @param session(dict): session data + """ + log.debug(u"new transport has been accepted") + + try: + self._parseElements(jingle_elt, session, request, client, with_application=False) + except exceptions.CancelError: + return + + # at this point we can send the result to confirm reception of the request + client.xmlstream.send(xmlstream.toResponse(request, 'result')) + + negociate_defers = [] + negociate_defers = self._callPlugins(XEP_0166.A_TRANSPORT_ACCEPT, session, app_method_name=None, profile=client.profile) + + negociate_dlist = defer.DeferredList(negociate_defers) + + # after negociations we start the transfer + negociate_dlist.addCallback(lambda dummy: self._callPlugins(XEP_0166.A_START, session, app_method_name=None, elements=False, profile=client.profile)) + + def onTransportReject(self, client, request, jingle_elt, session): + """Method called when a transport replacement is refused + + @param client: %(doc_client)s + @param request(domish.Element): full request + @param jingle_elt(domish.Element): the element + @param session(dict): session data + """ + # XXX: for now, we terminate the session in case of transport-reject + # this behaviour may change in the future + self.terminate('failed-transport', session, client.profile) + def onTransportInfo(self, client, request, jingle_elt, session): """Method called when a transport-info action is received from other peer