diff src/plugins/plugin_xep_0065.py @ 2489:e2a7bb875957

plugin pipe/stream, file transfert: refactoring and improvments: this is a big patch as things had to be changed at the same time. - changed methods using profile argument to use client instead - move SatFile in a new tools.stream module, has it should be part of core, not a plugin - new IStreamProducer interface, to handler starting a pull producer - new FileStreamObject which create a stream producer/consumer from a SatFile - plugin pipe is no more using unix named pipe, as it complicate the thing, special care need to be taken to not block, and it's generally not necessary. Instead a socket is now used, so the plugin has been renomed to jingle stream. - bad connection/error should be better handler in jingle stream plugin, and code should not block anymore - jp pipe commands have been updated accordingly fix bug 237
author Goffi <goffi@goffi.org>
date Thu, 08 Feb 2018 00:37:42 +0100
parents 0046283a285d
children 7ad5f2c4e34a
line wrap: on
line diff
--- a/src/plugins/plugin_xep_0065.py	Thu Feb 01 07:24:34 2018 +0100
+++ b/src/plugins/plugin_xep_0065.py	Thu Feb 08 00:37:42 2018 +0100
@@ -66,7 +66,6 @@
 from twisted.words.protocols.jabber import error as jabber_error
 from twisted.words.protocols.jabber import jid
 from twisted.words.protocols.jabber import xmlstream
-from twisted.protocols.basic import FileSender
 from twisted.internet import defer
 from collections import namedtuple
 import struct
@@ -294,7 +293,7 @@
     return hashlib.sha1((sid + requester_jid.full() + target_jid.full()).encode('utf-8')).hexdigest()
 
 
-class SOCKSv5(protocol.Protocol, FileSender):
+class SOCKSv5(protocol.Protocol):
     CHUNK_SIZE = 2**16
 
     def __init__(self, session_hash=None):
@@ -317,15 +316,17 @@
         self.peersock = None
         self.addressType = 0
         self.requestType = 0
-        self._file_obj = None
+        self._stream_object = None
         self.active = False # set to True when protocol is actually used for transfer
                             # used by factories to know when the finished Deferred can be triggered
 
     @property
-    def file_obj(self):
-        if self._file_obj is None:
-            self._file_obj = self.getSession()['file']
-        return self._file_obj
+    def stream_object(self):
+        if self._stream_object is None:
+            self._stream_object = self.getSession()['stream_object']
+            if self.server_mode:
+                self._stream_object.registerProducer(self.transport, True)
+        return self._stream_object
 
     def getSession(self):
         """Return session associated with this candidate
@@ -508,10 +509,10 @@
         if chunk_size is not None:
             self.CHUNK_SIZE = chunk_size
         log.debug(u"Starting file transfer")
-        d = self.beginFileTransfer(self.file_obj, self.transport)
-        d.addCallback(self.fileTransfered)
+        d = self.stream_object.startStream(self.transport)
+        d.addCallback(self.streamFinished)
 
-    def fileTransfered(self, d):
+    def streamFinished(self, d):
         log.info(_("File transfer completed, closing connection"))
         self.transport.loseConnection()
 
@@ -535,7 +536,7 @@
     def dataReceived(self, buf):
         if self.state == STATE_READY:
             # Everything is set, we just have to write the incoming data
-            self.file_obj.write(buf)
+            self.stream_object.write(buf)
             if not self.active:
                 self.active = True
                 self.getSession()[TIMER_KEY].cancel()
@@ -576,7 +577,7 @@
         self.parent = parent
 
     def getSession(self, session_hash):
-        return self.parent.getSession(session_hash, None)
+        return self.parent.getSession(None, session_hash)
 
     def startTransfer(self, session_hash, chunk_size=None):
         session = self.getSession(session_hash)
@@ -630,17 +631,16 @@
 class Socks5ClientFactory(protocol.ClientFactory):
     protocol = SOCKSv5
 
-    def __init__(self, parent, session, session_hash, profile):
+    def __init__(self, client, parent, session, session_hash):
         """Init the Client Factory
 
         @param session(dict): session data
         @param session_hash(unicode): hash used for peer_connection
             hash is the same as hostname computed in XEP-0065 § 5.3.2 #1
-        @param profile(unciode): %(doc_profile)s
         """
         self.session = session
         self.session_hash = session_hash
-        self.profile = profile
+        self.client = client
         self.connection = defer.Deferred()
         self._protocol_instance = None
         self.connector = None
@@ -696,7 +696,7 @@
         self.host = host
 
         # session data
-        self.hash_profiles_map = {}  # key: hash of the transfer session, value: session data
+        self.hash_clients_map = {}  # key: hash of the transfer session, value: session data
         self._cache_proxies = {} # key: server jid, value: proxy data
 
         # misc data
@@ -751,11 +751,10 @@
         return self._server_factory
 
     @defer.inlineCallbacks
-    def getProxy(self, profile):
+    def getProxy(self, client):
         """Return the proxy available for this profile
 
-        cache is used between profiles using the same server
-        @param profile: %(doc_profile)s
+        cache is used between clients using the same server
         @return ((D)(ProxyInfos, None)): Found proxy infos,
             or None if not acceptable proxy is found
         """
@@ -763,7 +762,6 @@
             log.info(u"No proxy found on this server")
             self._cache_proxies[server] = None
             defer.returnValue(None)
-        client = self.host.getClient(profile)
         server = client.jid.host
         try:
             defer.returnValue(self._cache_proxies[server])
@@ -810,8 +808,8 @@
         """
         self.getSocks5ServerFactory()
         local_port = self._server_factory_port
-        external_ip = yield self._ip.getExternalIP(client.profile)
-        local_ips = yield self._ip.getLocalIPs(client.profile)
+        external_ip = yield self._ip.getExternalIP(client)
+        local_ips = yield self._ip.getLocalIPs(client)
 
         if external_ip is not None and self._external_port is None:
             if external_ip != local_ips[0]:
@@ -828,16 +826,14 @@
         defer.returnValue((local_port, self._external_port, local_ips, external_ip))
 
     @defer.inlineCallbacks
-    def getCandidates(self, profile):
+    def getCandidates(self, client):
         """Return a list of our stream candidates
 
-        @param profile: %(doc_profile)s
         @return (D(list[Candidate])): list of candidates, ordered by priority
         """
-        client = self.host.getClient(profile)
         server_factory = yield self.getSocks5ServerFactory()
         local_port, ext_port, local_ips, external_ip = yield self._getNetworkData(client)
-        proxy = yield self.getProxy(profile)
+        proxy = yield self.getProxy(client)
 
         # its time to gather the candidates
         candidates = []
@@ -873,7 +869,7 @@
         candidate.factory.connector = connector
         return candidate.factory.connection
 
-    def connectCandidate(self, candidate, session_hash, peer_session_hash=None, delay=None, profile=C.PROF_KEY_NONE):
+    def connectCandidate(self, client, candidate, session_hash, peer_session_hash=None, delay=None):
         """Connect to a candidate
 
         Connection will be done with a Socks5ClientFactory
@@ -887,14 +883,13 @@
                 - when a peer connect to a proxy *he proposed himself*
             in practice, peer_session_hash is only used by tryCandidates
         @param delay(None, float): optional delay to wait before connection, in seconds
-        @param profile: %(doc_profile)s
         @return (D): Deferred launched when TCP connection + Socks5 connection is done
         """
         if peer_session_hash is None:
             # for XEP-0065, only one hash is needed
             peer_session_hash = session_hash
-        session = self.getSession(session_hash, profile)
-        factory = Socks5ClientFactory(self, session, peer_session_hash, profile)
+        session = self.getSession(client, session_hash)
+        factory = Socks5ClientFactory(client, self, session, peer_session_hash)
         candidate.factory = factory
         if delay is None:
             d = defer.succeed(candidate.host)
@@ -904,23 +899,23 @@
         d.addCallback(self._addConnector, candidate)
         return d
 
-    def tryCandidates(self, candidates, session_hash, peer_session_hash, connection_cb=None, connection_eb=None, profile=C.PROF_KEY_NONE):
+    def tryCandidates(self, client, candidates, session_hash, peer_session_hash, connection_cb=None, connection_eb=None):
         defers_list = []
 
         for candidate in candidates:
             delay = CANDIDATE_DELAY * len(defers_list)
             if candidate.type == XEP_0065.TYPE_PROXY:
                 delay += CANDIDATE_DELAY_PROXY
-            d = self.connectCandidate(candidate, session_hash, peer_session_hash, delay, profile)
+            d = self.connectCandidate(client, candidate, session_hash, peer_session_hash, delay)
             if connection_cb is not None:
-                d.addCallback(lambda dummy, candidate=candidate, profile=profile: connection_cb(candidate, profile))
+                d.addCallback(lambda dummy, candidate=candidate, client=client: connection_cb(client, candidate))
             if connection_eb is not None:
-                d.addErrback(connection_eb, candidate, profile)
+                d.addErrback(connection_eb, client, candidate)
             defers_list.append(d)
 
         return defers_list
 
-    def getBestCandidate(self, candidates, session_hash, peer_session_hash=None, profile=C.PROF_KEY_NONE):
+    def getBestCandidate(self, client, candidates, session_hash, peer_session_hash=None):
         """Get best candidate (according to priority) which can connect
 
         @param candidates(iterable[Candidate]): candidates to test
@@ -928,12 +923,11 @@
             hash is the same as hostname computed in XEP-0065 § 5.3.2 #1
         @param peer_session_hash(unicode, None): hash of the other peer
             only useful for XEP-0260, must be None for XEP-0065 streamhost candidates
-        @param profile: %(doc_profile)s
         @return (D(None, Candidate)): best candidate or None if none can connect
         """
         defer_candidates = None
 
-        def connectionCb(candidate, profile):
+        def connectionCb(client, candidate):
             log.info(u"Connection of {} successful".format(unicode(candidate)))
             for idx, other_candidate in enumerate(candidates):
                 try:
@@ -943,7 +937,7 @@
                 except AttributeError:
                     assert other_candidate is None
 
-        def connectionEb(failure, candidate, profile):
+        def connectionEb(failure, client, candidate):
             if failure.check(defer.CancelledError):
                 log.debug(u"Connection of {} has been cancelled".format(candidate))
             else:
@@ -957,7 +951,7 @@
             good_candidates = [c for c in candidates if c]
             return good_candidates[0] if good_candidates else None
 
-        defer_candidates = self.tryCandidates(candidates, session_hash, peer_session_hash, connectionCb, connectionEb, profile)
+        defer_candidates = self.tryCandidates(client, candidates, session_hash, peer_session_hash, connectionCb, connectionEb)
         d_list = defer.DeferredList(defer_candidates)
         d_list.addCallback(allTested)
         return d_list
@@ -969,7 +963,7 @@
         @param client: %(doc_client)s
         """
         log.info(u"Socks5 Bytestream: TimeOut reached")
-        session = self.getSession(session_hash, client.profile)
+        session = self.getSession(client, session_hash)
         session[DEFER_KEY].errback(exceptions.TimeOutError)
 
     def killSession(self, reason, session_hash, sid, client):
@@ -989,8 +983,8 @@
             ))
 
         try:
-            assert self.hash_profiles_map[session_hash] == client.profile
-            del self.hash_profiles_map[session_hash]
+            assert self.hash_clients_map[session_hash] == client
+            del self.hash_clients_map[session_hash]
         except KeyError:
             pass
 
@@ -1015,19 +1009,17 @@
 
         return reason
 
-    def startStream(self, file_obj, to_jid, sid, profile=C.PROF_KEY_NONE):
+    def startStream(self, client, stream_object, to_jid, sid):
         """Launch the stream workflow
 
-        @param file_obj: file_obj to send
+        @param streamProducer: stream_object to use
         @param to_jid: JID of the recipient
         @param sid: Stream session id
         @param successCb: method to call when stream successfuly finished
         @param failureCb: method to call when something goes wrong
-        @param profile: %(doc_profile)s
         @return (D): Deferred fired when session is finished
         """
-        client = self.host.getClient(profile)
-        session_data = self._createSession(file_obj, to_jid, sid, True, client.profile)
+        session_data = self._createSession(client, stream_object, to_jid, sid, True)
 
         session_data[client] = client
 
@@ -1051,7 +1043,7 @@
             args = [session_data, client]
             d.addCallbacks(self._IQNegotiationCb, self._IQNegotiationEb, args, None, args)
 
-        self.getCandidates(profile).addCallback(gotCandidates)
+        self.getCandidates(client).addCallback(gotCandidates)
         return session_data[DEFER_KEY]
 
     def _IQNegotiationCb(self, iq_elt, session_data, client):
@@ -1080,7 +1072,7 @@
 
         if candidate.type == XEP_0065.TYPE_PROXY:
             log.info(u"A Socks5 proxy is used")
-            d = self.connectCandidate(candidate, session_data['hash'], profile=client.profile)
+            d = self.connectCandidate(client, candidate, session_data['hash'])
             d.addCallback(lambda dummy: candidate.activate(session_data['id'], session_data['peer_jid'], client))
             d.addErrback(self._activationEb)
         else:
@@ -1102,22 +1094,20 @@
         """
         return self._createSession(*args, **kwargs)[DEFER_KEY]
 
-    def _createSession(self, file_obj, to_jid, sid, requester=False, profile=C.PROF_KEY_NONE):
+    def _createSession(self, client, stream_object, to_jid, sid, requester=False):
         """Called when a bytestream is imminent
 
-        @param file_obj(file): File object where data will be written
+        @param stream_object(iface.IStreamProducer): File object where data will be written
         @param to_jid(jid.JId): jid of the other peer
         @param sid(unicode): session id
         @param initiator(bool): if True, this session is create by initiator
-        @param profile: %(doc_profile)s
         @return (dict): session data
         """
-        client = self.host.getClient(profile)
         if sid in client.xep_0065_sid_session:
             raise exceptions.ConflictError(u'A session with this id already exists !')
         if requester:
             session_hash = getSessionHash(client.jid, to_jid, sid)
-            session_data = self._registerHash(session_hash, file_obj, profile)
+            session_data = self._registerHash(client, session_hash, stream_object)
         else:
             session_hash = getSessionHash(to_jid, client.jid, sid)
             session_d = defer.Deferred()
@@ -1130,31 +1120,30 @@
         session_data.update(
             {'id': sid,
              'peer_jid': to_jid,
-             'file': file_obj,
+             'stream_object': stream_object,
              'hash': session_hash,
             })
 
         return session_data
 
-    def getSession(self, session_hash, profile):
+    def getSession(self, client, session_hash):
         """Return session data
 
         @param session_hash(unicode): hash of the session
             hash is the same as hostname computed in XEP-0065 § 5.3.2 #1
-        @param profile(None, unicode): profile of the peer
-            None is used only if profile is unknown (this is only the case
+        @param client(None, SatXMPPClient): client of the peer
+            None is used only if client is unknown (this is only the case
             for incoming request received by Socks5ServerFactory). None must
             only be used by Socks5ServerFactory.
             See comments below for details
         @return (dict): session data
         """
-        if profile is None:
+        if client is None:
             try:
-                profile =  self.hash_profiles_map[session_hash]
+                client =  self.hash_clients_map[session_hash]
             except KeyError as e:
                 log.warning(u"The requested session doesn't exists !")
                 raise e
-        client = self.host.getClient(profile)
         return client._s5b_sessions[session_hash]
 
     def registerHash(self, *args, **kwargs):
@@ -1163,16 +1152,14 @@
         """
         return self._registerHash(*args, **kwargs)[DEFER_KEY]
 
-    def _registerHash(self, session_hash, file_obj, profile):
+    def _registerHash(self, client, session_hash, stream_object):
         """Create a session_data associated to hash
 
         @param session_hash(str): hash of the session
-        @param file_obj(file, None): file-like object
+        @param stream_object(iface.IStreamProducer, IConsumer, None): file-like object
             None if it will be filled later
-        @param profile: %(doc_profile)s
         return (dict): session data
         """
-        client = self.host.getClient(profile)
         assert session_hash not in client._s5b_sessions
         session_d = defer.Deferred()
         session_d.addBoth(self.killSession, session_hash, None, client)
@@ -1181,23 +1168,22 @@
             TIMER_KEY: reactor.callLater(TIMEOUT, self._timeOut, session_hash, client),
             }
 
-        if file_obj is not None:
-            session_data['file'] = file_obj
+        if stream_object is not None:
+            session_data['stream_object'] = stream_object
 
-        assert session_hash not in self.hash_profiles_map
-        self.hash_profiles_map[session_hash] = profile
+        assert session_hash not in self.hash_clients_map
+        self.hash_clients_map[session_hash] = client
 
         return session_data
 
-    def associateFileObj(self, session_hash, file_obj, profile):
-        """Associate a file obj with  a session"""
-        session_data = self.getSession(session_hash, profile)
-        assert 'file' not in session_data
-        session_data['file'] = file_obj
+    def associateStreamObject(self, client, session_hash, stream_object):
+        """Associate a stream object with  a session"""
+        session_data = self.getSession(client, session_hash)
+        assert 'stream_object' not in session_data
+        session_data['stream_object'] = stream_object
 
-    def streamQuery(self, iq_elt, profile):
+    def streamQuery(self, iq_elt, client):
         log.debug(u"BS stream query")
-        client = self.host.getClient(profile)
 
         iq_elt.handled = True
 
@@ -1238,7 +1224,7 @@
         for candidate in candidates:
             log.info(u"Candidate proposed: {}".format(candidate))
 
-        d = self.getBestCandidate(candidates, session_data['hash'], profile=profile)
+        d = self.getBestCandidate(client, candidates, session_data['hash'])
         d.addCallback(self._ackStream, iq_elt, session_data, client)
 
     def _ackStream(self, candidate, iq_elt, session_data, client):
@@ -1262,7 +1248,7 @@
         self.host = plugin_parent.host
 
     def connectionInitialized(self):
-        self.xmlstream.addObserver(BS_REQUEST, self.plugin_parent.streamQuery, profile=self.parent.profile)
+        self.xmlstream.addObserver(BS_REQUEST, self.plugin_parent.streamQuery, client=self.parent)
 
     def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
         return [disco.DiscoFeature(NS_BS)]