libervia-backend: diff sat/plugins/plugin_xep_0065.py @ 4037:524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Historically, Libervia (formerly SàT) used camelCase, as PEP 8 allows for code bases
predating it, in order to stay consistent with Twisted's coding style.
However, snake_case is more readable and following PEP 8 best practices is preferable,
so it was decided to move to full snake_case. Because Libervia has a huge codebase,
the transition had left an ugly mix of camelCase and snake_case.
To fix that, this patch does a big refactoring, renaming every function and method
(including bridge methods) that does not come from Twisted or Wokkel to snake_case.
This is a massive change and may introduce some bugs.
| author | Goffi <goffi@goffi.org> |
| --- | --- |
| date | Sat, 08 Apr 2023 13:54:42 +0200 |
| parents | be6d91572633 |
| children | |
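The rename applied throughout the patch is the usual mechanical camelCase to snake_case conversion, in which runs of capitals are split into separate lowercase words (which is why `getLocalIPs` becomes `get_local_i_ps` in the hunks below). As a purely illustrative sketch, not part of this changeset and not necessarily the tool that was actually used, a regex-based converter in that spirit could look like this (the function name and examples are hypothetical):

```python
# Illustrative sketch only (not part of this changeset): a regex-based
# camelCase -> snake_case converter in the spirit of this refactoring.
import re


def camel_to_snake(name: str) -> str:
    """Convert a camelCase identifier to snake_case."""
    # split an uppercase letter followed by lowercase letters from what precedes it
    name = re.sub(r"(.)([A-Z][a-z]+)", r"\1_\2", name)
    # split a lowercase letter or digit from a following uppercase letter
    name = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", name)
    return name.lower()


assert camel_to_snake("calculatePriority") == "calculate_priority"
assert camel_to_snake("getSessionHash") == "get_session_hash"
assert camel_to_snake("getLocalIPs") == "get_local_i_ps"
```

Names inherited from Twisted or Wokkel (`connectionLost`, `clientConnectionFailed`, `connectionInitialized`, `getDiscoInfo`, etc.) are deliberately left in camelCase, as stated in the commit message, which is why the diff below still contains a mix of both styles.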
--- a/sat/plugins/plugin_xep_0065.py	Fri Apr 07 15:18:39 2023 +0200
+++ b/sat/plugins/plugin_xep_0065.py	Sat Apr 08 13:54:42 2023 +0200
@@ -186,7 +186,7 @@
         self.id = id_ if id_ is not None else str(uuid.uuid4())
         if priority_local:
             self._local_priority = int(priority)
-            self._priority = self.calculatePriority()
+            self._priority = self.calculate_priority()
         else:
             self._local_priority = 0
             self._priority = int(priority)
@@ -231,7 +231,7 @@
     def __ne__(self, other):
         return not self.__eq__(other)

-    def calculatePriority(self):
+    def calculate_priority(self):
         """Calculate candidate priority

         according to XEP-0260 §2.2
@@ -254,7 +254,7 @@

         Send activation request as explained in XEP-0065 § 6.3.5
         Must only be used with proxy candidates
-        @param sid(unicode): session id (same as for getSessionHash)
+        @param sid(unicode): session id (same as for get_session_hash)
         @param peer_jid(jid.JID): jid of the other peer
         @return (D(domish.Element)): IQ result (or error)
         """
@@ -267,15 +267,15 @@
         query_elt.addElement("activate", content=peer_jid.full())
         return iq_elt.send()

-    def startTransfer(self, session_hash=None):
+    def start_transfer(self, session_hash=None):
         if self.type == XEP_0065.TYPE_PROXY:
             chunk_size = 4096  # Prosody's proxy reject bigger chunks by default
         else:
             chunk_size = None
-        self.factory.startTransfer(session_hash, chunk_size=chunk_size)
+        self.factory.start_transfer(session_hash, chunk_size=chunk_size)


-def getSessionHash(requester_jid, target_jid, sid):
+def get_session_hash(requester_jid, target_jid, sid):
     """Calculate SHA1 Hash according to XEP-0065 §5.3.2

     @param requester_jid(jid.JID): jid of the requester (the one which activate the proxy)
@@ -334,12 +334,12 @@
         else:
             return self.factory.getSession()

-    def _startNegotiation(self):
+    def _start_negotiation(self):
         log.debug("starting negotiation (client mode)")
         self.state = STATE_CLIENT_AUTH
         self.transport.write(struct.pack("!3B", SOCKS5_VER, 1, AUTHMECH_ANON))

-    def _parseNegotiation(self):
+    def _parse_negotiation(self):
         try:
             # Parse out data
             ver, nmethod = struct.unpack("!BB", self.buf[:2])
@@ -373,7 +373,7 @@
         except struct.error:
             pass

-    def _parseUserPass(self):
+    def _parse_user_pass(self):
         try:
             # Parse out data
             ver, ulen = struct.unpack("BB", self.buf[:2])
@@ -383,7 +383,7 @@
                 # Trim off fron of the buffer
                 self.buf = self.buf[3 + ulen + plen :]
                 # Fire event to authenticate user
-                if self.authenticateUserPass(uname, password):
+                if self.authenticate_user_pass(uname, password):
                     # Signal success
                     self.state = STATE_REQUEST
                     self.transport.write(struct.pack("!BB", SOCKS5_VER, 0x00))
@@ -394,7 +394,7 @@
         except struct.error:
             pass

-    def sendErrorReply(self, errorcode):
+    def send_error_reply(self, errorcode):
         # Any other address types are not supported
         result = struct.pack("!BBBBIH", SOCKS5_VER, errorcode, 0, 1, 0, 0)
         self.transport.write(result)
@@ -407,7 +407,7 @@

             # Ensure we actually support the requested address type
             if self.addressType not in self.supportedAddrs:
-                self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED)
+                self.send_error_reply(REPLY_ADDR_NOT_SUPPORTED)
                 return

             # Deal with addresses
@@ -420,29 +420,29 @@
                 self.buf = self.buf[7 + len(addr) :]
             else:
                 # Any other address types are not supported
-                self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED)
+                self.send_error_reply(REPLY_ADDR_NOT_SUPPORTED)
                 return

             # Ensure command is supported
             if cmd not in self.enabledCommands:
                 # Send a not supported error
-                self.sendErrorReply(REPLY_CMD_NOT_SUPPORTED)
+                self.send_error_reply(REPLY_CMD_NOT_SUPPORTED)
                 return

             # Process the command
             if cmd == CMD_CONNECT:
-                self.connectRequested(addr, port)
+                self.connect_requested(addr, port)
             elif cmd == CMD_BIND:
-                self.bindRequested(addr, port)
+                self.bind_requested(addr, port)
             else:
                 # Any other command is not supported
-                self.sendErrorReply(REPLY_CMD_NOT_SUPPORTED)
+                self.send_error_reply(REPLY_CMD_NOT_SUPPORTED)

         except struct.error:
             # The buffer is probably not complete, we need to wait more
             return None

-    def _makeRequest(self):
+    def _make_request(self):
         hash_ = self._session_hash.encode('utf-8')
         request = struct.pack(
             "!5B%dsH" % len(hash_),
@@ -457,12 +457,12 @@
         self.transport.write(request)
         self.state = STATE_CLIENT_REQUEST

-    def _parseRequestReply(self):
+    def _parse_request_reply(self):
         try:
             ver, rep, rsvd, self.addressType = struct.unpack("!BBBB", self.buf[:4])
             # Ensure we actually support the requested address type
             if self.addressType not in self.supportedAddrs:
-                self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED)
+                self.send_error_reply(REPLY_ADDR_NOT_SUPPORTED)
                 return

             # Deal with addresses
@@ -475,7 +475,7 @@
                 self.buf = self.buf[7 + len(addr) :]
             else:
                 # Any other address types are not supported
-                self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED)
+                self.send_error_reply(REPLY_ADDR_NOT_SUPPORTED)
                 return

             # Ensure reply is OK
@@ -497,22 +497,22 @@
             )
         )
         if self.state == STATE_CLIENT_INITIAL:
-            self._startNegotiation()
+            self._start_negotiation()

-    def connectRequested(self, addr, port):
+    def connect_requested(self, addr, port):
         # Check that this session is expected
-        if not self.factory.addToSession(addr.decode('utf-8'), self):
+        if not self.factory.add_to_session(addr.decode('utf-8'), self):
             log.warning(
                 "Unexpected connection request received from {host}".format(
                     host=self.transport.getPeer().host
                 )
             )
-            self.sendErrorReply(REPLY_CONN_REFUSED)
+            self.send_error_reply(REPLY_CONN_REFUSED)
             return
         self._session_hash = addr.decode('utf-8')
-        self.connectCompleted(addr, 0)
+        self.connect_completed(addr, 0)

-    def startTransfer(self, chunk_size):
+    def start_transfer(self, chunk_size):
         """Callback called when the result iq is received

         @param chunk_size(None, int): size of the buffer, or None for default
@@ -521,14 +521,14 @@
         if chunk_size is not None:
             self.CHUNK_SIZE = chunk_size
         log.debug("Starting file transfer")
-        d = self.stream_object.startStream(self.transport)
-        d.addCallback(self.streamFinished)
+        d = self.stream_object.start_stream(self.transport)
+        d.addCallback(self.stream_finished)

-    def streamFinished(self, d):
+    def stream_finished(self, d):
         log.info(_("File transfer completed, closing connection"))
         self.transport.loseConnection()

-    def connectCompleted(self, remotehost, remoteport):
+    def connect_completed(self, remotehost, remoteport):
         if self.addressType == ADDR_IPV4:
             result = struct.pack(
                 "!BBBBIH", SOCKS5_VER, REPLY_SUCCESS, 0, 1, remotehost, remoteport
@@ -547,10 +547,10 @@
         self.transport.write(result)
         self.state = STATE_READY

-    def bindRequested(self, addr, port):
+    def bind_requested(self, addr, port):
         pass

-    def authenticateUserPass(self, user, passwd):
+    def authenticate_user_pass(self, user, passwd):
         # FIXME: implement authentication and remove the debug printing a password
         log.debug("User/pass: %s/%s" % (user, passwd))
         return True
@@ -566,20 +566,20 @@

         self.buf = self.buf + buf
         if self.state == STATE_INITIAL:
-            self._parseNegotiation()
+            self._parse_negotiation()
         if self.state == STATE_AUTH_USERPASS:
-            self._parseUserPass()
+            self._parse_user_pass()
         if self.state == STATE_REQUEST:
             self._parseRequest()
         if self.state == STATE_CLIENT_REQUEST:
-            self._parseRequestReply()
+            self._parse_request_reply()
         if self.state == STATE_CLIENT_AUTH:
             ver, method = struct.unpack("!BB", buf)
             self.buf = self.buf[2:]
             if ver != SOCKS5_VER or method != AUTHMECH_ANON:
                 self.transport.loseConnection()
             else:
-                self._makeRequest()
+                self._make_request()

     def connectionLost(self, reason):
         log.debug("Socks5 connection lost: {}".format(reason.value))
@@ -591,7 +591,7 @@
         except AttributeError:
             log.debug("no session has been received yet")
         else:
-            self.factory.removeFromSession(session_hash, self, reason)
+            self.factory.remove_from_session(session_hash, self, reason)


 class Socks5ServerFactory(protocol.ServerFactory):
@@ -606,7 +606,7 @@
     def getSession(self, session_hash):
         return self.parent.getSession(None, session_hash)

-    def startTransfer(self, session_hash, chunk_size=None):
+    def start_transfer(self, session_hash, chunk_size=None):
         session = self.getSession(session_hash)
         try:
             protocol = session["protocols"][0]
@@ -614,9 +614,9 @@
             log.error("Can't start file transfer, can't find protocol")
         else:
             session[TIMER_KEY].cancel()
-            protocol.startTransfer(chunk_size)
+            protocol.start_transfer(chunk_size)

-    def addToSession(self, session_hash, protocol):
+    def add_to_session(self, session_hash, protocol):
         """Check is session_hash is valid, and associate protocol with it

         the session will be associated to the corresponding candidate
@@ -633,7 +633,7 @@
             session_data.setdefault("protocols", []).append(protocol)
             return True

-    def removeFromSession(self, session_hash, protocol, reason):
+    def remove_from_session(self, session_hash, protocol, reason):
         """Remove a protocol from session_data

         There can be several protocol instances while candidates are tried, they
@@ -683,9 +683,9 @@
     def getSession(self):
         return self.session

-    def startTransfer(self, __=None, chunk_size=None):
+    def start_transfer(self, __=None, chunk_size=None):
         self.session[TIMER_KEY].cancel()
-        self._protocol_instance.startTransfer(chunk_size)
+        self._protocol_instance.start_transfer(chunk_size)

     def clientConnectionFailed(self, connector, reason):
         log.debug("Connection failed")
@@ -741,19 +741,19 @@

         # parameters
         # XXX: params are not used for now, but they may be used in the futur to force proxy/IP
-        # host.memory.updateParams(PARAMS)
+        # host.memory.update_params(PARAMS)

-    def getHandler(self, client):
+    def get_handler(self, client):
         return XEP_0065_handler(self)

-    def profileConnected(self, client):
+    def profile_connected(self, client):
         client.xep_0065_sid_session = {}  # key: stream_id, value: session_data(dict)
         client._s5b_sessions = {}

-    def getSessionHash(self, from_jid, to_jid, sid):
-        return getSessionHash(from_jid, to_jid, sid)
+    def get_session_hash(self, from_jid, to_jid, sid):
+        return get_session_hash(from_jid, to_jid, sid)

-    def getSocks5ServerFactory(self):
+    def get_socks_5_server_factory(self):
         """Return server factory

         The server is created if it doesn't exists yet
@@ -785,11 +785,11 @@
         return self._server_factory

     @defer.inlineCallbacks
-    def getProxy(self, client, local_jid):
+    def get_proxy(self, client, local_jid):
         """Return the proxy available for this profile

         cache is used between clients using the same server
-        @param local_jid(jid.JID): same as for [getCandidates]
+        @param local_jid(jid.JID): same as for [get_candidates]
         @return ((D)(ProxyInfos, None)): Found proxy infos, or None if not acceptable
             proxy is found
         @raise exceptions.NotFound: no Proxy found
@@ -807,7 +807,7 @@
             pass
         try:
             proxy = (
-                yield self.host.findServiceEntities(client, "proxy", "bytestreams")
+                yield self.host.find_service_entities(client, "proxy", "bytestreams")
             ).pop()
         except (defer.CancelledError, StopIteration, KeyError):
             notFound(server)
@@ -844,16 +844,16 @@
         defer.returnValue(proxy_infos)

     @defer.inlineCallbacks
-    def _getNetworkData(self, client):
+    def _get_network_data(self, client):
         """Retrieve information about network

         @param client: %(doc_client)s
         @return (D(tuple[local_port, external_port, local_ips, external_ip])): network data
         """
-        self.getSocks5ServerFactory()
+        self.get_socks_5_server_factory()
         local_port = self._server_factory_port
-        external_ip = yield self._ip.getExternalIP(client)
-        local_ips = yield self._ip.getLocalIPs(client)
+        external_ip = yield self._ip.get_external_ip(client)
+        local_ips = yield self._ip.get_local_i_ps(client)

         if external_ip is not None and self._external_port is None:
             if external_ip != local_ips[0]:
@@ -861,7 +861,7 @@
                 if self._np is None:
                     log.warning("NAT port plugin not available, we can't map port")
                 else:
-                    ext_port = yield self._np.mapPort(
+                    ext_port = yield self._np.map_port(
                         local_port, desc="SaT socks5 stream"
                     )
                     if ext_port is None:
@@ -872,7 +872,7 @@
         defer.returnValue((local_port, self._external_port, local_ips, external_ip))

     @defer.inlineCallbacks
-    def getCandidates(self, client, local_jid):
+    def get_candidates(self, client, local_jid):
         """Return a list of our stream candidates

         @param local_jid(jid.JID): jid to use as local jid
@@ -881,10 +881,10 @@
             client.jid would be file.example.net)
         @return (D(list[Candidate])): list of candidates, ordered by priority
         """
-        server_factory = yield self.getSocks5ServerFactory()
-        local_port, ext_port, local_ips, external_ip = yield self._getNetworkData(client)
+        server_factory = yield self.get_socks_5_server_factory()
+        local_port, ext_port, local_ips, external_ip = yield self._get_network_data(client)
         try:
-            proxy = yield self.getProxy(client, local_jid)
+            proxy = yield self.get_proxy(client, local_jid)
         except exceptions.NotFound:
             proxy = None

@@ -950,7 +950,7 @@
         candidates.sort(key=lambda c: c.priority, reverse=True)
         defer.returnValue(candidates)

-    def _addConnector(self, connector, candidate):
+    def _add_connector(self, connector, candidate):
         """Add connector used to connect to candidate, and return client factory's connection Deferred

         the connector can be used to disconnect the candidate, and returning the factory's connection Deferred allow to wait for connection completion
@@ -961,7 +961,7 @@
         candidate.factory.connector = connector
         return candidate.factory.connection

-    def connectCandidate(
+    def connect_candidate(
         self, client, candidate, session_hash, peer_session_hash=None, delay=None
     ):
         """Connect to a candidate
@@ -975,7 +975,7 @@
             None must be used in 2 cases:
                 - when XEP-0065 is used with XEP-0096
                 - when a peer connect to a proxy *he proposed himself*
-            in practice, peer_session_hash is only used by tryCandidates
+            in practice, peer_session_hash is only used by try_candidates
         @param delay(None, float): optional delay to wait before connection, in seconds
         @return (D): Deferred launched when TCP connection + Socks5 connection is done
         """
@@ -990,10 +990,10 @@
         else:
             d = sat_defer.DelayedDeferred(delay, candidate.host)
             d.addCallback(reactor.connectTCP, candidate.port, factory)
-        d.addCallback(self._addConnector, candidate)
+        d.addCallback(self._add_connector, candidate)
         return d

-    def tryCandidates(
+    def try_candidates(
         self,
         client,
         candidates,
@@ -1008,7 +1008,7 @@
             delay = CANDIDATE_DELAY * len(defers_list)
             if candidate.type == XEP_0065.TYPE_PROXY:
                 delay += CANDIDATE_DELAY_PROXY
-            d = self.connectCandidate(
+            d = self.connect_candidate(
                 client, candidate, session_hash, peer_session_hash, delay
             )
             if connection_cb is not None:
@@ -1023,7 +1023,7 @@

         return defers_list

-    def getBestCandidate(self, client, candidates, session_hash, peer_session_hash=None):
+    def get_best_candidate(self, client, candidates, session_hash, peer_session_hash=None):
         """Get best candidate (according to priority) which can connect

         @param candidates(iterable[Candidate]): candidates to test
@@ -1035,7 +1035,7 @@
         """
         defer_candidates = None

-        def connectionCb(client, candidate):
+        def connection_cb(client, candidate):
             log.info("Connection of {} successful".format(str(candidate)))
             for idx, other_candidate in enumerate(candidates):
                 try:
@@ -1045,7 +1045,7 @@
             except AttributeError:
                 assert other_candidate is None

-        def connectionEb(failure, client, candidate):
+        def connection_eb(failure, client, candidate):
             if failure.check(defer.CancelledError):
                 log.debug("Connection of {} has been cancelled".format(candidate))
             else:
@@ -1056,37 +1056,37 @@
                 )
             candidates[candidates.index(candidate)] = None

-        def allTested(__):
+        def all_tested(__):
             log.debug("All candidates have been tested")
             good_candidates = [c for c in candidates if c]
             return good_candidates[0] if good_candidates else None

-        defer_candidates = self.tryCandidates(
+        defer_candidates = self.try_candidates(
             client,
             candidates,
             session_hash,
             peer_session_hash,
-            connectionCb,
-            connectionEb,
+            connection_cb,
+            connection_eb,
         )
         d_list = defer.DeferredList(defer_candidates)
-        d_list.addCallback(allTested)
+        d_list.addCallback(all_tested)
        return d_list

-    def _timeOut(self, session_hash, client):
+    def _time_out(self, session_hash, client):
         """Called when stream was not started quickly enough

-        @param session_hash(str): hash as returned by getSessionHash
+        @param session_hash(str): hash as returned by get_session_hash
         @param client: %(doc_client)s
         """
         log.info("Socks5 Bytestream: TimeOut reached")
         session = self.getSession(client, session_hash)
         session[DEFER_KEY].errback(exceptions.TimeOutError())

-    def killSession(self, failure_, session_hash, sid, client):
+    def kill_session(self, failure_, session_hash, sid, client):
         """Clean the current session

-        @param session_hash(str): hash as returned by getSessionHash
+        @param session_hash(str): hash as returned by get_session_hash
         @param sid(None, unicode): session id or None if self.xep_0065_sid_session
             was not used
         @param client: %(doc_client)s
@@ -1128,23 +1128,23 @@

         return failure_

-    def startStream(self, client, stream_object, local_jid, to_jid, sid):
+    def start_stream(self, client, stream_object, local_jid, to_jid, sid):
         """Launch the stream workflow

         @param streamProducer: stream_object to use
-        @param local_jid(jid.JID): same as for [getCandidates]
+        @param local_jid(jid.JID): same as for [get_candidates]
         @param to_jid: JID of the recipient
         @param sid: Stream session id
         @param successCb: method to call when stream successfuly finished
         @param failureCb: method to call when something goes wrong
         @return (D): Deferred fired when session is finished
         """
-        session_data = self._createSession(
+        session_data = self._create_session(
             client, stream_object, local_jid, to_jid, sid, True)

         session_data[client] = client

-        def gotCandidates(candidates):
+        def got_candidates(candidates):
             session_data["candidates"] = candidates
             iq_elt = client.IQ()
             iq_elt["from"] = local_jid.full()
@@ -1162,12 +1162,12 @@

             d = iq_elt.send()
             args = [client, session_data, local_jid]
-            d.addCallbacks(self._IQNegotiationCb, self._IQNegotiationEb, args, None, args)
+            d.addCallbacks(self._iq_negotiation_cb, self._iq_negotiation_eb, args, None, args)

-        self.getCandidates(client, local_jid).addCallback(gotCandidates)
+        self.get_candidates(client, local_jid).addCallback(got_candidates)
         return session_data[DEFER_KEY]

-    def _IQNegotiationCb(self, iq_elt, client, session_data, local_jid):
+    def _iq_negotiation_cb(self, iq_elt, client, session_data, local_jid):
         """Called when the result of open iq is received

         @param session_data(dict): data of the session
@@ -1197,33 +1197,33 @@

         if candidate.type == XEP_0065.TYPE_PROXY:
             log.info("A Socks5 proxy is used")
-            d = self.connectCandidate(client, candidate, session_data["hash"])
+            d = self.connect_candidate(client, candidate, session_data["hash"])
             d.addCallback(
                 lambda __: candidate.activate(
                     client, session_data["id"], session_data["peer_jid"], local_jid
                 )
             )
-            d.addErrback(self._activationEb)
+            d.addErrback(self._activation_eb)
         else:
             d = defer.succeed(None)

-        d.addCallback(lambda __: candidate.startTransfer(session_data["hash"]))
+        d.addCallback(lambda __: candidate.start_transfer(session_data["hash"]))

-    def _activationEb(self, failure):
+    def _activation_eb(self, failure):
         log.warning("Proxy activation error: {}".format(failure.value))

-    def _IQNegotiationEb(self, stanza_err, client, session_data, local_jid):
+    def _iq_negotiation_eb(self, stanza_err, client, session_data, local_jid):
         log.warning("Socks5 transfer failed: {}".format(stanza_err.value))
         # FIXME: must clean session

-    def createSession(self, *args, **kwargs):
-        """like [_createSession] but return the session deferred instead of the whole session
+    def create_session(self, *args, **kwargs):
+        """like [_create_session] but return the session deferred instead of the whole session

         session deferred is fired when transfer is finished
         """
-        return self._createSession(*args, **kwargs)[DEFER_KEY]
+        return self._create_session(*args, **kwargs)[DEFER_KEY]

-    def _createSession(self, client, stream_object, local_jid, to_jid, sid,
+    def _create_session(self, client, stream_object, local_jid, to_jid, sid,
                        requester=False):
         """Called when a bytestream is imminent

@@ -1237,16 +1237,16 @@
         if sid in client.xep_0065_sid_session:
             raise exceptions.ConflictError("A session with this id already exists !")
         if requester:
-            session_hash = getSessionHash(local_jid, to_jid, sid)
-            session_data = self._registerHash(client, session_hash, stream_object)
+            session_hash = get_session_hash(local_jid, to_jid, sid)
+            session_data = self._register_hash(client, session_hash, stream_object)
         else:
-            session_hash = getSessionHash(to_jid, local_jid, sid)
+            session_hash = get_session_hash(to_jid, local_jid, sid)
             session_d = defer.Deferred()
-            session_d.addBoth(self.killSession, session_hash, sid, client)
+            session_d.addBoth(self.kill_session, session_hash, sid, client)
             session_data = client._s5b_sessions[session_hash] = {
                 DEFER_KEY: session_d,
                 TIMER_KEY: reactor.callLater(
-                    TIMEOUT, self._timeOut, session_hash, client
+                    TIMEOUT, self._time_out, session_hash, client
                 ),
             }
         client.xep_0065_sid_session[sid] = session_data
@@ -1283,13 +1283,13 @@
             raise e
         return client._s5b_sessions[session_hash]

-    def registerHash(self, *args, **kwargs):
-        """like [_registerHash] but return the session deferred instead of the whole session
+    def register_hash(self, *args, **kwargs):
+        """like [_register_hash] but return the session deferred instead of the whole session
         session deferred is fired when transfer is finished
         """
-        return self._registerHash(*args, **kwargs)[DEFER_KEY]
+        return self._register_hash(*args, **kwargs)[DEFER_KEY]

-    def _registerHash(self, client, session_hash, stream_object):
+    def _register_hash(self, client, session_hash, stream_object):
         """Create a session_data associated to hash

         @param session_hash(str): hash of the session
@@ -1299,10 +1299,10 @@
         """
         assert session_hash not in client._s5b_sessions
         session_d = defer.Deferred()
-        session_d.addBoth(self.killSession, session_hash, None, client)
+        session_d.addBoth(self.kill_session, session_hash, None, client)
         session_data = client._s5b_sessions[session_hash] = {
             DEFER_KEY: session_d,
-            TIMER_KEY: reactor.callLater(TIMEOUT, self._timeOut, session_hash, client),
+            TIMER_KEY: reactor.callLater(TIMEOUT, self._time_out, session_hash, client),
         }

         if stream_object is not None:
@@ -1313,13 +1313,13 @@

         return session_data

-    def associateStreamObject(self, client, session_hash, stream_object):
+    def associate_stream_object(self, client, session_hash, stream_object):
         """Associate a stream object with a session"""
         session_data = self.getSession(client, session_hash)
         assert "stream_object" not in session_data
         session_data["stream_object"] = stream_object

-    def streamQuery(self, iq_elt, client):
+    def stream_query(self, iq_elt, client):
         log.debug("BS stream query")

         iq_elt.handled = True
@@ -1361,10 +1361,10 @@
         for candidate in candidates:
             log.info("Candidate proposed: {}".format(candidate))

-        d = self.getBestCandidate(client, candidates, session_data["hash"])
-        d.addCallback(self._ackStream, iq_elt, session_data, client)
+        d = self.get_best_candidate(client, candidates, session_data["hash"])
+        d.addCallback(self._ack_stream, iq_elt, session_data, client)

-    def _ackStream(self, candidate, iq_elt, session_data, client):
+    def _ack_stream(self, candidate, iq_elt, session_data, client):
         if candidate is None:
             log.info("No streamhost candidate worked, we have to end negotiation")
             return client.sendError(iq_elt, "item-not-found")
@@ -1386,7 +1386,7 @@

     def connectionInitialized(self):
         self.xmlstream.addObserver(
-            BS_REQUEST, self.plugin_parent.streamQuery, client=self.parent
+            BS_REQUEST, self.plugin_parent.stream_query, client=self.parent
         )

     def getDiscoInfo(self, requestor, target, nodeIdentifier=""):