changeset 1630:c25f63215632

plugin XEP-0166: transport replacement: - actions transport-replace, transport-accept and transport-reject are now handled - transportReplace can be used from an other plugin - new A_DESTROY action, used to clean a transport before its replacement
author Goffi <goffi@goffi.org>
date Thu, 19 Nov 2015 18:15:35 +0100
parents a34d7f621944
children 25906c0dbc63
files src/plugins/plugin_xep_0166.py
diffstat 1 files changed, 150 insertions(+), 4 deletions(-) [+]
line wrap: on
line diff
--- a/src/plugins/plugin_xep_0166.py	Thu Nov 19 18:15:35 2015 +0100
+++ b/src/plugins/plugin_xep_0166.py	Thu Nov 19 18:15:35 2015 +0100
@@ -71,16 +71,21 @@
     REASON_DECLINE='decline'
     REASON_FAILED_APPLICATION='failed-application'
     REASON_FAILED_TRANSPORT='failed-transport'
+    REASON_CONNECTIVITY_ERROR='connectivity-error'
     A_SESSION_INITIATE = "session-initiate"
     A_SESSION_ACCEPT = "session-accept"
     A_SESSION_TERMINATE = "session-terminate"
     A_SESSION_INFO = "session-info"
+    A_TRANSPORT_REPLACE = "transport-replace"
+    A_TRANSPORT_ACCEPT = "transport-accept"
+    A_TRANSPORT_REJECT = "transport-reject"
     A_TRANSPORT_INFO = "transport-info"
     # non standard actions
     A_PREPARE_INITIATOR = "prepare-initiator" # initiator must prepare tranfer
     A_PREPARE_RESPONDER = "prepare-responder" # responder must prepare tranfer
     A_ACCEPTED_ACK = "accepted-ack" # session accepted ack has been received from initiator
     A_START = "start" # application can start
+    A_DESTROY = "destroy" # called when a transport is destroyed (e.g. because it is remplaced). Used to do cleaning operations
 
     def __init__(self, host):
         log.info(_("plugin Jingle initialization"))
@@ -230,11 +235,44 @@
         self._transports[namespace] = transport_data
         log.debug(u"new jingle transport registered")
 
+    @defer.inlineCallbacks
+    def transportReplace(self, transport_ns, session, content_name, profile=C.PROF_KEY_NONE):
+        """Replace a transport
+
+        @param transport_ns(unicode): namespace of the new transport to use
+        @param session(dict): jingle session data
+        @param content_name(unicode): name of the content
+        @param profile: %(doc_profile)s
+        """
+        # XXX: for now we replace the transport before receiving confirmation from other peer
+        #      this is acceptable because we terminate the session if transport is rejected.
+        #      this behavious may change in the future.
+        client = self.host.getClient(profile)
+        content_data= session['contents'][content_name]
+        transport_data = content_data['transport_data']
+        try:
+            transport = self._transports[transport_ns]
+        except KeyError:
+            raise exceptions.InternalError(u"Unkown transport")
+        yield content_data['transport'].handler.jingleHandler(XEP_0166.A_DESTROY, session, content_name, None, profile)
+        content_data['transport'] = transport
+        transport_data.clear()
+
+        iq_elt, jingle_elt = self._buildJingleElt(client, session, XEP_0166.A_TRANSPORT_REPLACE)
+        content_elt = jingle_elt.addElement('content')
+        content_elt['name'] = content_name
+        content_elt['creator'] = content_data['creator']
+
+        transport_elt = transport.handler.jingleSessionInit(session, content_name, profile)
+        content_elt.addChild(transport_elt)
+        iq_elt.send()
+
     def buildAction(self, action, session, content_name, profile=C.PROF_KEY_NONE):
         """Build an element according to requested action
 
         @param action(unicode): a jingle action (see XEP-0166 §7.2),
             session-* actions are not managed here
+            transport-replace is managed in the dedicated [transportReplace] method
         @param session(dict): jingle session data
         @param content_name(unicode): name of the content
         @param profile: %(doc_profile)s
@@ -431,7 +469,7 @@
         except KeyError:
             if action != XEP_0166.A_SESSION_INITIATE:
                 log.warning(u"Received request for an unknown session id: {}".format(sid))
-                self.sendError('not-acceptable', None, request, profile=profile)
+                self.sendError('item-not-found', None, request, 'unknown-session', profile=profile)
                 return
 
             session = client.jingle_sessions[sid] = {'id': sid,
@@ -461,6 +499,12 @@
             self.onSessionInfo(client, request, jingle_elt, session)
         elif action == XEP_0166.A_TRANSPORT_INFO:
             self.onTransportInfo(client, request, jingle_elt, session)
+        elif action == XEP_0166.A_TRANSPORT_REPLACE:
+            self.onTransportReplace(client, request, jingle_elt, session)
+        elif action == XEP_0166.A_TRANSPORT_ACCEPT:
+            self.onTransportAccept(client, request, jingle_elt, session)
+        elif action == XEP_0166.A_TRANSPORT_REJECT:
+            self.onTransportReject(client, request, jingle_elt, session)
         else:
             raise exceptions.InternalError(u"Unknown action {}".format(action))
 
@@ -474,12 +518,12 @@
         @param session(dict): session data
         @param request(domish.Element): the whole request
         @param client: %(doc_client)s
-        @param new(bool): if new the content is new and must be created,
+        @param new(bool): True if the content is new and must be created,
             else the content must exists, and session data will be filled
         @param creator(unicode): only used if new is True: creating pear (see § 7.3)
         @param with_application(bool): if True, raise an error if there is no <description> element else ignore it
         @param with_transport(bool): if True, raise an error if there is no <transport> element else ignore it
-        @raise exceptions.CancelError: the error is treated the calling method can cancel the treatment (i.e. return)
+        @raise exceptions.CancelError: the error is treated and the calling method can cancel the treatment (i.e. return)
         """
         contents_dict = session['contents']
         content_elts = jingle_elt.elements(NS_JINGLE, 'content')
@@ -557,7 +601,7 @@
                 else:
                     # the content exists, we check that we have not a former transport_elt
                     if 'transport_elt' in content_data:
-                        raise exceptions.InternalError(u"desc_elt should not exist at this point")
+                        raise exceptions.InternalError(u"transport_elt should not exist at this point")
 
                 content_data['transport_elt'] = transport_elt
 
@@ -774,6 +818,108 @@
         dlist.addCallback(self._onSessionCb, client, request, jingle_elt, session)
         dlist.addErrback(self._onSessionCb, client, request, jingle_elt, session)
 
+    @defer.inlineCallbacks
+    def onTransportReplace(self, client, request, jingle_elt, session):
+        """A transport change is requested
+
+        The request is parsed, and jingleHandler is called on concerned transport plugin(s)
+        @param client: %(doc_client)s
+        @param request(domish.Element): full <iq> request
+        @param jingle_elt(domish.Element): the <jingle> element
+        @param session(dict): session data
+        """
+        log.debug(u"Other peer want to replace the transport")
+        try:
+            self._parseElements(jingle_elt, session, request, client, with_application=False)
+        except exceptions.CancelError:
+            defer.returnValue(None)
+
+        client.xmlstream.send(xmlstream.toResponse(request, 'result'))
+
+        content_name = None
+        to_replace = []
+
+        for content_name, content_data in session['contents'].iteritems():
+            try:
+                transport_elt = content_data.pop('transport_elt')
+            except KeyError:
+                continue
+            transport_ns = transport_elt.uri
+            try:
+                transport = self._transports[transport_ns]
+            except KeyError:
+                log.warning(u"Other peer want to replace current transport with an unknown one: {}".format(transport_ns))
+                content_name = None
+                break
+            to_replace.append((content_name, content_data, transport, transport_elt))
+
+        if content_name is None:
+            # wa can't accept the replacement
+            iq_elt, reject_jingle_elt = self._buildJingleElt(client, session, XEP_0166.A_TRANSPORT_REJECT)
+            for child in jingle_elt.children:
+                reject_jingle_elt.addChild(child)
+
+            iq_elt.send()
+            defer.returnValue(None)
+
+        # at this point, everything is alright and we can replace the transport(s)
+        # this is similar to an session-accept action, but for transports only
+        iq_elt, accept_jingle_elt = self._buildJingleElt(client, session, XEP_0166.A_TRANSPORT_ACCEPT)
+        for content_name, content_data, transport, transport_elt in to_replace:
+            # we can now actually replace the transport
+            yield content_data['transport'].handler.jingleHandler(XEP_0166.A_DESTROY, session, content_name, None, client.profile)
+            content_data['transport'] = transport
+            content_data['transport_data'].clear()
+            # and build the element
+            content_elt = accept_jingle_elt.addElement('content')
+            content_elt['name'] = content_name
+            content_elt['creator'] = content_data['creator']
+            # we notify the transport and insert its <transport/> in the answer
+            accept_transport_elt = yield transport.handler.jingleHandler(XEP_0166.A_TRANSPORT_REPLACE, session, content_name, transport_elt, client.profile)
+            content_elt.addChild(accept_transport_elt)
+            # there is no confirmation needed here, so we can directly prepare it
+            yield transport.handler.jingleHandler(XEP_0166.A_PREPARE_RESPONDER, session, content_name, None, client.profile)
+
+        iq_elt.send()
+
+    def onTransportAccept(self, client, request, jingle_elt, session):
+        """Method called once transport replacement is accepted
+
+        @param client: %(doc_client)s
+        @param request(domish.Element): full <iq> request
+        @param jingle_elt(domish.Element): the <jingle> element
+        @param session(dict): session data
+        """
+        log.debug(u"new transport has been accepted")
+
+        try:
+            self._parseElements(jingle_elt, session, request, client, with_application=False)
+        except exceptions.CancelError:
+            return
+
+        # at this point we can send the <iq/> result to confirm reception of the request
+        client.xmlstream.send(xmlstream.toResponse(request, 'result'))
+
+        negociate_defers = []
+        negociate_defers = self._callPlugins(XEP_0166.A_TRANSPORT_ACCEPT, session, app_method_name=None, profile=client.profile)
+
+        negociate_dlist = defer.DeferredList(negociate_defers)
+
+        # after negociations we start the transfer
+        negociate_dlist.addCallback(lambda dummy: self._callPlugins(XEP_0166.A_START, session, app_method_name=None, elements=False, profile=client.profile))
+
+    def onTransportReject(self, client, request, jingle_elt, session):
+        """Method called when a transport replacement is refused
+
+        @param client: %(doc_client)s
+        @param request(domish.Element): full <iq> request
+        @param jingle_elt(domish.Element): the <jingle> element
+        @param session(dict): session data
+        """
+        # XXX: for now, we terminate the session in case of transport-reject
+        #      this behaviour may change in the future
+        self.terminate('failed-transport', session, client.profile)
+
     def onTransportInfo(self, client, request, jingle_elt, session):
         """Method called when a transport-info action is received from other peer