Mercurial > libervia-backend
comparison sat/plugins/plugin_xep_0065.py @ 4037:524856bd7b19
massive refactoring to switch from camelCase to snake_case:
Historically, Libervia (formerly SàT) used camelCase, as allowed by PEP8 for
pre-PEP8 code, in order to keep the same coding style as Twisted.
However, snake_case is more readable and it's better to follow PEP8 best practices, so it
has been decided to move to full snake_case. Because Libervia has a huge codebase, this
ended up as an ugly mix of camelCase and snake_case.
To fix that, this patch does a big refactoring by renaming every function and method
(including those of the bridge) that does not come from Twisted or Wokkel, to fully use snake_case.
This is a massive change, and may result in some bugs.
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 08 Apr 2023 13:54:42 +0200 |
parents | be6d91572633 |
children |
comparison
equal
deleted
inserted
replaced
4036:c4464d7ae97b | 4037:524856bd7b19 |
---|---|
184 assert isinstance(jid_, jid.JID) | 184 assert isinstance(jid_, jid.JID) |
185 self.host, self.port, self.type, self.jid = (host, int(port), type_, jid_) | 185 self.host, self.port, self.type, self.jid = (host, int(port), type_, jid_) |
186 self.id = id_ if id_ is not None else str(uuid.uuid4()) | 186 self.id = id_ if id_ is not None else str(uuid.uuid4()) |
187 if priority_local: | 187 if priority_local: |
188 self._local_priority = int(priority) | 188 self._local_priority = int(priority) |
189 self._priority = self.calculatePriority() | 189 self._priority = self.calculate_priority() |
190 else: | 190 else: |
191 self._local_priority = 0 | 191 self._local_priority = 0 |
192 self._priority = int(priority) | 192 self._priority = int(priority) |
193 self.factory = factory | 193 self.factory = factory |
194 | 194 |
229 return False | 229 return False |
230 | 230 |
231 def __ne__(self, other): | 231 def __ne__(self, other): |
232 return not self.__eq__(other) | 232 return not self.__eq__(other) |
233 | 233 |
234 def calculatePriority(self): | 234 def calculate_priority(self): |
235 """Calculate candidate priority according to XEP-0260 §2.2 | 235 """Calculate candidate priority according to XEP-0260 §2.2 |
236 | 236 |
237 | 237 |
238 @return (int): priority | 238 @return (int): priority |
239 """ | 239 """ |
252 def activate(self, client, sid, peer_jid, local_jid): | 252 def activate(self, client, sid, peer_jid, local_jid): |
253 """Activate the proxy candidate | 253 """Activate the proxy candidate |
254 | 254 |
255 Send activation request as explained in XEP-0065 § 6.3.5 | 255 Send activation request as explained in XEP-0065 § 6.3.5 |
256 Must only be used with proxy candidates | 256 Must only be used with proxy candidates |
257 @param sid(unicode): session id (same as for getSessionHash) | 257 @param sid(unicode): session id (same as for get_session_hash) |
258 @param peer_jid(jid.JID): jid of the other peer | 258 @param peer_jid(jid.JID): jid of the other peer |
259 @return (D(domish.Element)): IQ result (or error) | 259 @return (D(domish.Element)): IQ result (or error) |
260 """ | 260 """ |
261 assert self.type == XEP_0065.TYPE_PROXY | 261 assert self.type == XEP_0065.TYPE_PROXY |
262 iq_elt = client.IQ() | 262 iq_elt = client.IQ() |
265 query_elt = iq_elt.addElement((NS_BS, "query")) | 265 query_elt = iq_elt.addElement((NS_BS, "query")) |
266 query_elt["sid"] = sid | 266 query_elt["sid"] = sid |
267 query_elt.addElement("activate", content=peer_jid.full()) | 267 query_elt.addElement("activate", content=peer_jid.full()) |
268 return iq_elt.send() | 268 return iq_elt.send() |
269 | 269 |
270 def startTransfer(self, session_hash=None): | 270 def start_transfer(self, session_hash=None): |
271 if self.type == XEP_0065.TYPE_PROXY: | 271 if self.type == XEP_0065.TYPE_PROXY: |
272 chunk_size = 4096 # Prosody's proxy reject bigger chunks by default | 272 chunk_size = 4096 # Prosody's proxy reject bigger chunks by default |
273 else: | 273 else: |
274 chunk_size = None | 274 chunk_size = None |
275 self.factory.startTransfer(session_hash, chunk_size=chunk_size) | 275 self.factory.start_transfer(session_hash, chunk_size=chunk_size) |
276 | 276 |
277 | 277 |
278 def getSessionHash(requester_jid, target_jid, sid): | 278 def get_session_hash(requester_jid, target_jid, sid): |
279 """Calculate SHA1 Hash according to XEP-0065 §5.3.2 | 279 """Calculate SHA1 Hash according to XEP-0065 §5.3.2 |
280 | 280 |
281 @param requester_jid(jid.JID): jid of the requester (the one which activate the proxy) | 281 @param requester_jid(jid.JID): jid of the requester (the one which activate the proxy) |
282 @param target_jid(jid.JID): jid of the target | 282 @param target_jid(jid.JID): jid of the target |
283 @param sid(unicode): session id | 283 @param sid(unicode): session id |
332 if self.server_mode: | 332 if self.server_mode: |
333 return self.factory.getSession(self._session_hash) | 333 return self.factory.getSession(self._session_hash) |
334 else: | 334 else: |
335 return self.factory.getSession() | 335 return self.factory.getSession() |
336 | 336 |
337 def _startNegotiation(self): | 337 def _start_negotiation(self): |
338 log.debug("starting negotiation (client mode)") | 338 log.debug("starting negotiation (client mode)") |
339 self.state = STATE_CLIENT_AUTH | 339 self.state = STATE_CLIENT_AUTH |
340 self.transport.write(struct.pack("!3B", SOCKS5_VER, 1, AUTHMECH_ANON)) | 340 self.transport.write(struct.pack("!3B", SOCKS5_VER, 1, AUTHMECH_ANON)) |
341 | 341 |
342 def _parseNegotiation(self): | 342 def _parse_negotiation(self): |
343 try: | 343 try: |
344 # Parse out data | 344 # Parse out data |
345 ver, nmethod = struct.unpack("!BB", self.buf[:2]) | 345 ver, nmethod = struct.unpack("!BB", self.buf[:2]) |
346 methods = struct.unpack("%dB" % nmethod, self.buf[2 : nmethod + 2]) | 346 methods = struct.unpack("%dB" % nmethod, self.buf[2 : nmethod + 2]) |
347 | 347 |
371 self.transport.write(struct.pack("!BB", SOCKS5_VER, AUTHMECH_INVALID)) | 371 self.transport.write(struct.pack("!BB", SOCKS5_VER, AUTHMECH_INVALID)) |
372 self.transport.loseConnection() | 372 self.transport.loseConnection() |
373 except struct.error: | 373 except struct.error: |
374 pass | 374 pass |
375 | 375 |
376 def _parseUserPass(self): | 376 def _parse_user_pass(self): |
377 try: | 377 try: |
378 # Parse out data | 378 # Parse out data |
379 ver, ulen = struct.unpack("BB", self.buf[:2]) | 379 ver, ulen = struct.unpack("BB", self.buf[:2]) |
380 uname, = struct.unpack("%ds" % ulen, self.buf[2 : ulen + 2]) | 380 uname, = struct.unpack("%ds" % ulen, self.buf[2 : ulen + 2]) |
381 plen, = struct.unpack("B", self.buf[ulen + 2]) | 381 plen, = struct.unpack("B", self.buf[ulen + 2]) |
382 password, = struct.unpack("%ds" % plen, self.buf[ulen + 3 : ulen + 3 + plen]) | 382 password, = struct.unpack("%ds" % plen, self.buf[ulen + 3 : ulen + 3 + plen]) |
383 # Trim off fron of the buffer | 383 # Trim off fron of the buffer |
384 self.buf = self.buf[3 + ulen + plen :] | 384 self.buf = self.buf[3 + ulen + plen :] |
385 # Fire event to authenticate user | 385 # Fire event to authenticate user |
386 if self.authenticateUserPass(uname, password): | 386 if self.authenticate_user_pass(uname, password): |
387 # Signal success | 387 # Signal success |
388 self.state = STATE_REQUEST | 388 self.state = STATE_REQUEST |
389 self.transport.write(struct.pack("!BB", SOCKS5_VER, 0x00)) | 389 self.transport.write(struct.pack("!BB", SOCKS5_VER, 0x00)) |
390 else: | 390 else: |
391 # Signal failure | 391 # Signal failure |
392 self.transport.write(struct.pack("!BB", SOCKS5_VER, 0x01)) | 392 self.transport.write(struct.pack("!BB", SOCKS5_VER, 0x01)) |
393 self.transport.loseConnection() | 393 self.transport.loseConnection() |
394 except struct.error: | 394 except struct.error: |
395 pass | 395 pass |
396 | 396 |
397 def sendErrorReply(self, errorcode): | 397 def send_error_reply(self, errorcode): |
398 # Any other address types are not supported | 398 # Any other address types are not supported |
399 result = struct.pack("!BBBBIH", SOCKS5_VER, errorcode, 0, 1, 0, 0) | 399 result = struct.pack("!BBBBIH", SOCKS5_VER, errorcode, 0, 1, 0, 0) |
400 self.transport.write(result) | 400 self.transport.write(result) |
401 self.transport.loseConnection() | 401 self.transport.loseConnection() |
402 | 402 |
405 # Parse out data and trim buffer accordingly | 405 # Parse out data and trim buffer accordingly |
406 ver, cmd, rsvd, self.addressType = struct.unpack("!BBBB", self.buf[:4]) | 406 ver, cmd, rsvd, self.addressType = struct.unpack("!BBBB", self.buf[:4]) |
407 | 407 |
408 # Ensure we actually support the requested address type | 408 # Ensure we actually support the requested address type |
409 if self.addressType not in self.supportedAddrs: | 409 if self.addressType not in self.supportedAddrs: |
410 self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED) | 410 self.send_error_reply(REPLY_ADDR_NOT_SUPPORTED) |
411 return | 411 return |
412 | 412 |
413 # Deal with addresses | 413 # Deal with addresses |
414 if self.addressType == ADDR_IPV4: | 414 if self.addressType == ADDR_IPV4: |
415 addr, port = struct.unpack("!IH", self.buf[4:10]) | 415 addr, port = struct.unpack("!IH", self.buf[4:10]) |
418 nlen = self.buf[4] | 418 nlen = self.buf[4] |
419 addr, port = struct.unpack("!%dsH" % nlen, self.buf[5:]) | 419 addr, port = struct.unpack("!%dsH" % nlen, self.buf[5:]) |
420 self.buf = self.buf[7 + len(addr) :] | 420 self.buf = self.buf[7 + len(addr) :] |
421 else: | 421 else: |
422 # Any other address types are not supported | 422 # Any other address types are not supported |
423 self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED) | 423 self.send_error_reply(REPLY_ADDR_NOT_SUPPORTED) |
424 return | 424 return |
425 | 425 |
426 # Ensure command is supported | 426 # Ensure command is supported |
427 if cmd not in self.enabledCommands: | 427 if cmd not in self.enabledCommands: |
428 # Send a not supported error | 428 # Send a not supported error |
429 self.sendErrorReply(REPLY_CMD_NOT_SUPPORTED) | 429 self.send_error_reply(REPLY_CMD_NOT_SUPPORTED) |
430 return | 430 return |
431 | 431 |
432 # Process the command | 432 # Process the command |
433 if cmd == CMD_CONNECT: | 433 if cmd == CMD_CONNECT: |
434 self.connectRequested(addr, port) | 434 self.connect_requested(addr, port) |
435 elif cmd == CMD_BIND: | 435 elif cmd == CMD_BIND: |
436 self.bindRequested(addr, port) | 436 self.bind_requested(addr, port) |
437 else: | 437 else: |
438 # Any other command is not supported | 438 # Any other command is not supported |
439 self.sendErrorReply(REPLY_CMD_NOT_SUPPORTED) | 439 self.send_error_reply(REPLY_CMD_NOT_SUPPORTED) |
440 | 440 |
441 except struct.error: | 441 except struct.error: |
442 # The buffer is probably not complete, we need to wait more | 442 # The buffer is probably not complete, we need to wait more |
443 return None | 443 return None |
444 | 444 |
445 def _makeRequest(self): | 445 def _make_request(self): |
446 hash_ = self._session_hash.encode('utf-8') | 446 hash_ = self._session_hash.encode('utf-8') |
447 request = struct.pack( | 447 request = struct.pack( |
448 "!5B%dsH" % len(hash_), | 448 "!5B%dsH" % len(hash_), |
449 SOCKS5_VER, | 449 SOCKS5_VER, |
450 CMD_CONNECT, | 450 CMD_CONNECT, |
455 0, | 455 0, |
456 ) | 456 ) |
457 self.transport.write(request) | 457 self.transport.write(request) |
458 self.state = STATE_CLIENT_REQUEST | 458 self.state = STATE_CLIENT_REQUEST |
459 | 459 |
460 def _parseRequestReply(self): | 460 def _parse_request_reply(self): |
461 try: | 461 try: |
462 ver, rep, rsvd, self.addressType = struct.unpack("!BBBB", self.buf[:4]) | 462 ver, rep, rsvd, self.addressType = struct.unpack("!BBBB", self.buf[:4]) |
463 # Ensure we actually support the requested address type | 463 # Ensure we actually support the requested address type |
464 if self.addressType not in self.supportedAddrs: | 464 if self.addressType not in self.supportedAddrs: |
465 self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED) | 465 self.send_error_reply(REPLY_ADDR_NOT_SUPPORTED) |
466 return | 466 return |
467 | 467 |
468 # Deal with addresses | 468 # Deal with addresses |
469 if self.addressType == ADDR_IPV4: | 469 if self.addressType == ADDR_IPV4: |
470 addr, port = struct.unpack("!IH", self.buf[4:10]) | 470 addr, port = struct.unpack("!IH", self.buf[4:10]) |
473 nlen = self.buf[4] | 473 nlen = self.buf[4] |
474 addr, port = struct.unpack("!%dsH" % nlen, self.buf[5:]) | 474 addr, port = struct.unpack("!%dsH" % nlen, self.buf[5:]) |
475 self.buf = self.buf[7 + len(addr) :] | 475 self.buf = self.buf[7 + len(addr) :] |
476 else: | 476 else: |
477 # Any other address types are not supported | 477 # Any other address types are not supported |
478 self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED) | 478 self.send_error_reply(REPLY_ADDR_NOT_SUPPORTED) |
479 return | 479 return |
480 | 480 |
481 # Ensure reply is OK | 481 # Ensure reply is OK |
482 if rep != REPLY_SUCCESS: | 482 if rep != REPLY_SUCCESS: |
483 self.loseConnection() | 483 self.loseConnection() |
495 "Socks5 connectionMade (mode = {})".format( | 495 "Socks5 connectionMade (mode = {})".format( |
496 "server" if self.state == STATE_INITIAL else "client" | 496 "server" if self.state == STATE_INITIAL else "client" |
497 ) | 497 ) |
498 ) | 498 ) |
499 if self.state == STATE_CLIENT_INITIAL: | 499 if self.state == STATE_CLIENT_INITIAL: |
500 self._startNegotiation() | 500 self._start_negotiation() |
501 | 501 |
502 def connectRequested(self, addr, port): | 502 def connect_requested(self, addr, port): |
503 # Check that this session is expected | 503 # Check that this session is expected |
504 if not self.factory.addToSession(addr.decode('utf-8'), self): | 504 if not self.factory.add_to_session(addr.decode('utf-8'), self): |
505 log.warning( | 505 log.warning( |
506 "Unexpected connection request received from {host}".format( | 506 "Unexpected connection request received from {host}".format( |
507 host=self.transport.getPeer().host | 507 host=self.transport.getPeer().host |
508 ) | 508 ) |
509 ) | 509 ) |
510 self.sendErrorReply(REPLY_CONN_REFUSED) | 510 self.send_error_reply(REPLY_CONN_REFUSED) |
511 return | 511 return |
512 self._session_hash = addr.decode('utf-8') | 512 self._session_hash = addr.decode('utf-8') |
513 self.connectCompleted(addr, 0) | 513 self.connect_completed(addr, 0) |
514 | 514 |
515 def startTransfer(self, chunk_size): | 515 def start_transfer(self, chunk_size): |
516 """Callback called when the result iq is received | 516 """Callback called when the result iq is received |
517 | 517 |
518 @param chunk_size(None, int): size of the buffer, or None for default | 518 @param chunk_size(None, int): size of the buffer, or None for default |
519 """ | 519 """ |
520 self.active = True | 520 self.active = True |
521 if chunk_size is not None: | 521 if chunk_size is not None: |
522 self.CHUNK_SIZE = chunk_size | 522 self.CHUNK_SIZE = chunk_size |
523 log.debug("Starting file transfer") | 523 log.debug("Starting file transfer") |
524 d = self.stream_object.startStream(self.transport) | 524 d = self.stream_object.start_stream(self.transport) |
525 d.addCallback(self.streamFinished) | 525 d.addCallback(self.stream_finished) |
526 | 526 |
527 def streamFinished(self, d): | 527 def stream_finished(self, d): |
528 log.info(_("File transfer completed, closing connection")) | 528 log.info(_("File transfer completed, closing connection")) |
529 self.transport.loseConnection() | 529 self.transport.loseConnection() |
530 | 530 |
531 def connectCompleted(self, remotehost, remoteport): | 531 def connect_completed(self, remotehost, remoteport): |
532 if self.addressType == ADDR_IPV4: | 532 if self.addressType == ADDR_IPV4: |
533 result = struct.pack( | 533 result = struct.pack( |
534 "!BBBBIH", SOCKS5_VER, REPLY_SUCCESS, 0, 1, remotehost, remoteport | 534 "!BBBBIH", SOCKS5_VER, REPLY_SUCCESS, 0, 1, remotehost, remoteport |
535 ) | 535 ) |
536 elif self.addressType == ADDR_DOMAINNAME: | 536 elif self.addressType == ADDR_DOMAINNAME: |
545 remoteport, | 545 remoteport, |
546 ) | 546 ) |
547 self.transport.write(result) | 547 self.transport.write(result) |
548 self.state = STATE_READY | 548 self.state = STATE_READY |
549 | 549 |
550 def bindRequested(self, addr, port): | 550 def bind_requested(self, addr, port): |
551 pass | 551 pass |
552 | 552 |
553 def authenticateUserPass(self, user, passwd): | 553 def authenticate_user_pass(self, user, passwd): |
554 # FIXME: implement authentication and remove the debug printing a password | 554 # FIXME: implement authentication and remove the debug printing a password |
555 log.debug("User/pass: %s/%s" % (user, passwd)) | 555 log.debug("User/pass: %s/%s" % (user, passwd)) |
556 return True | 556 return True |
557 | 557 |
558 def dataReceived(self, buf): | 558 def dataReceived(self, buf): |
564 self.getSession()[TIMER_KEY].cancel() | 564 self.getSession()[TIMER_KEY].cancel() |
565 return | 565 return |
566 | 566 |
567 self.buf = self.buf + buf | 567 self.buf = self.buf + buf |
568 if self.state == STATE_INITIAL: | 568 if self.state == STATE_INITIAL: |
569 self._parseNegotiation() | 569 self._parse_negotiation() |
570 if self.state == STATE_AUTH_USERPASS: | 570 if self.state == STATE_AUTH_USERPASS: |
571 self._parseUserPass() | 571 self._parse_user_pass() |
572 if self.state == STATE_REQUEST: | 572 if self.state == STATE_REQUEST: |
573 self._parseRequest() | 573 self._parseRequest() |
574 if self.state == STATE_CLIENT_REQUEST: | 574 if self.state == STATE_CLIENT_REQUEST: |
575 self._parseRequestReply() | 575 self._parse_request_reply() |
576 if self.state == STATE_CLIENT_AUTH: | 576 if self.state == STATE_CLIENT_AUTH: |
577 ver, method = struct.unpack("!BB", buf) | 577 ver, method = struct.unpack("!BB", buf) |
578 self.buf = self.buf[2:] | 578 self.buf = self.buf[2:] |
579 if ver != SOCKS5_VER or method != AUTHMECH_ANON: | 579 if ver != SOCKS5_VER or method != AUTHMECH_ANON: |
580 self.transport.loseConnection() | 580 self.transport.loseConnection() |
581 else: | 581 else: |
582 self._makeRequest() | 582 self._make_request() |
583 | 583 |
584 def connectionLost(self, reason): | 584 def connectionLost(self, reason): |
585 log.debug("Socks5 connection lost: {}".format(reason.value)) | 585 log.debug("Socks5 connection lost: {}".format(reason.value)) |
586 if self.state != STATE_READY: | 586 if self.state != STATE_READY: |
587 self.connection.errback(reason) | 587 self.connection.errback(reason) |
589 try: | 589 try: |
590 session_hash = self._session_hash | 590 session_hash = self._session_hash |
591 except AttributeError: | 591 except AttributeError: |
592 log.debug("no session has been received yet") | 592 log.debug("no session has been received yet") |
593 else: | 593 else: |
594 self.factory.removeFromSession(session_hash, self, reason) | 594 self.factory.remove_from_session(session_hash, self, reason) |
595 | 595 |
596 | 596 |
597 class Socks5ServerFactory(protocol.ServerFactory): | 597 class Socks5ServerFactory(protocol.ServerFactory): |
598 protocol = SOCKSv5 | 598 protocol = SOCKSv5 |
599 | 599 |
604 self.parent = parent | 604 self.parent = parent |
605 | 605 |
606 def getSession(self, session_hash): | 606 def getSession(self, session_hash): |
607 return self.parent.getSession(None, session_hash) | 607 return self.parent.getSession(None, session_hash) |
608 | 608 |
609 def startTransfer(self, session_hash, chunk_size=None): | 609 def start_transfer(self, session_hash, chunk_size=None): |
610 session = self.getSession(session_hash) | 610 session = self.getSession(session_hash) |
611 try: | 611 try: |
612 protocol = session["protocols"][0] | 612 protocol = session["protocols"][0] |
613 except (KeyError, IndexError): | 613 except (KeyError, IndexError): |
614 log.error("Can't start file transfer, can't find protocol") | 614 log.error("Can't start file transfer, can't find protocol") |
615 else: | 615 else: |
616 session[TIMER_KEY].cancel() | 616 session[TIMER_KEY].cancel() |
617 protocol.startTransfer(chunk_size) | 617 protocol.start_transfer(chunk_size) |
618 | 618 |
619 def addToSession(self, session_hash, protocol): | 619 def add_to_session(self, session_hash, protocol): |
620 """Check is session_hash is valid, and associate protocol with it | 620 """Check is session_hash is valid, and associate protocol with it |
621 | 621 |
622 the session will be associated to the corresponding candidate | 622 the session will be associated to the corresponding candidate |
623 @param session_hash(str): hash of the session | 623 @param session_hash(str): hash of the session |
624 @param protocol(SOCKSv5): protocol instance | 624 @param protocol(SOCKSv5): protocol instance |
631 return False | 631 return False |
632 else: | 632 else: |
633 session_data.setdefault("protocols", []).append(protocol) | 633 session_data.setdefault("protocols", []).append(protocol) |
634 return True | 634 return True |
635 | 635 |
636 def removeFromSession(self, session_hash, protocol, reason): | 636 def remove_from_session(self, session_hash, protocol, reason): |
637 """Remove a protocol from session_data | 637 """Remove a protocol from session_data |
638 | 638 |
639 There can be several protocol instances while candidates are tried, they | 639 There can be several protocol instances while candidates are tried, they |
640 have removed when candidate connection is closed | 640 have removed when candidate connection is closed |
641 @param session_hash(str): hash of the session | 641 @param session_hash(str): hash of the session |
681 self.connector.disconnect() | 681 self.connector.disconnect() |
682 | 682 |
683 def getSession(self): | 683 def getSession(self): |
684 return self.session | 684 return self.session |
685 | 685 |
686 def startTransfer(self, __=None, chunk_size=None): | 686 def start_transfer(self, __=None, chunk_size=None): |
687 self.session[TIMER_KEY].cancel() | 687 self.session[TIMER_KEY].cancel() |
688 self._protocol_instance.startTransfer(chunk_size) | 688 self._protocol_instance.start_transfer(chunk_size) |
689 | 689 |
690 def clientConnectionFailed(self, connector, reason): | 690 def clientConnectionFailed(self, connector, reason): |
691 log.debug("Connection failed") | 691 log.debug("Connection failed") |
692 self.connection.errback(reason) | 692 self.connection.errback(reason) |
693 | 693 |
739 log.debug("NAT Port plugin not available") | 739 log.debug("NAT Port plugin not available") |
740 self._np = None | 740 self._np = None |
741 | 741 |
742 # parameters | 742 # parameters |
743 # XXX: params are not used for now, but they may be used in the futur to force proxy/IP | 743 # XXX: params are not used for now, but they may be used in the futur to force proxy/IP |
744 # host.memory.updateParams(PARAMS) | 744 # host.memory.update_params(PARAMS) |
745 | 745 |
746 def getHandler(self, client): | 746 def get_handler(self, client): |
747 return XEP_0065_handler(self) | 747 return XEP_0065_handler(self) |
748 | 748 |
749 def profileConnected(self, client): | 749 def profile_connected(self, client): |
750 client.xep_0065_sid_session = {} # key: stream_id, value: session_data(dict) | 750 client.xep_0065_sid_session = {} # key: stream_id, value: session_data(dict) |
751 client._s5b_sessions = {} | 751 client._s5b_sessions = {} |
752 | 752 |
753 def getSessionHash(self, from_jid, to_jid, sid): | 753 def get_session_hash(self, from_jid, to_jid, sid): |
754 return getSessionHash(from_jid, to_jid, sid) | 754 return get_session_hash(from_jid, to_jid, sid) |
755 | 755 |
756 def getSocks5ServerFactory(self): | 756 def get_socks_5_server_factory(self): |
757 """Return server factory | 757 """Return server factory |
758 | 758 |
759 The server is created if it doesn't exists yet | 759 The server is created if it doesn't exists yet |
760 self._server_factory_port is set on server creation | 760 self._server_factory_port is set on server creation |
761 """ | 761 """ |
783 ) | 783 ) |
784 ) | 784 ) |
785 return self._server_factory | 785 return self._server_factory |
786 | 786 |
787 @defer.inlineCallbacks | 787 @defer.inlineCallbacks |
788 def getProxy(self, client, local_jid): | 788 def get_proxy(self, client, local_jid): |
789 """Return the proxy available for this profile | 789 """Return the proxy available for this profile |
790 | 790 |
791 cache is used between clients using the same server | 791 cache is used between clients using the same server |
792 @param local_jid(jid.JID): same as for [getCandidates] | 792 @param local_jid(jid.JID): same as for [get_candidates] |
793 @return ((D)(ProxyInfos, None)): Found proxy infos, | 793 @return ((D)(ProxyInfos, None)): Found proxy infos, |
794 or None if not acceptable proxy is found | 794 or None if not acceptable proxy is found |
795 @raise exceptions.NotFound: no Proxy found | 795 @raise exceptions.NotFound: no Proxy found |
796 """ | 796 """ |
797 | 797 |
805 defer.returnValue(self._cache_proxies[server]) | 805 defer.returnValue(self._cache_proxies[server]) |
806 except KeyError: | 806 except KeyError: |
807 pass | 807 pass |
808 try: | 808 try: |
809 proxy = ( | 809 proxy = ( |
810 yield self.host.findServiceEntities(client, "proxy", "bytestreams") | 810 yield self.host.find_service_entities(client, "proxy", "bytestreams") |
811 ).pop() | 811 ).pop() |
812 except (defer.CancelledError, StopIteration, KeyError): | 812 except (defer.CancelledError, StopIteration, KeyError): |
813 notFound(server) | 813 notFound(server) |
814 iq_elt = client.IQ("get") | 814 iq_elt = client.IQ("get") |
815 iq_elt["from"] = local_jid.full() | 815 iq_elt["from"] = local_jid.full() |
842 proxy_infos = self._cache_proxies[server] = ProxyInfos(host, jid_, port) | 842 proxy_infos = self._cache_proxies[server] = ProxyInfos(host, jid_, port) |
843 log.info("Proxy found: {}".format(proxy_infos)) | 843 log.info("Proxy found: {}".format(proxy_infos)) |
844 defer.returnValue(proxy_infos) | 844 defer.returnValue(proxy_infos) |
845 | 845 |
846 @defer.inlineCallbacks | 846 @defer.inlineCallbacks |
847 def _getNetworkData(self, client): | 847 def _get_network_data(self, client): |
848 """Retrieve information about network | 848 """Retrieve information about network |
849 | 849 |
850 @param client: %(doc_client)s | 850 @param client: %(doc_client)s |
851 @return (D(tuple[local_port, external_port, local_ips, external_ip])): network data | 851 @return (D(tuple[local_port, external_port, local_ips, external_ip])): network data |
852 """ | 852 """ |
853 self.getSocks5ServerFactory() | 853 self.get_socks_5_server_factory() |
854 local_port = self._server_factory_port | 854 local_port = self._server_factory_port |
855 external_ip = yield self._ip.getExternalIP(client) | 855 external_ip = yield self._ip.get_external_ip(client) |
856 local_ips = yield self._ip.getLocalIPs(client) | 856 local_ips = yield self._ip.get_local_i_ps(client) |
857 | 857 |
858 if external_ip is not None and self._external_port is None: | 858 if external_ip is not None and self._external_port is None: |
859 if external_ip != local_ips[0]: | 859 if external_ip != local_ips[0]: |
860 log.info("We are probably behind a NAT") | 860 log.info("We are probably behind a NAT") |
861 if self._np is None: | 861 if self._np is None: |
862 log.warning("NAT port plugin not available, we can't map port") | 862 log.warning("NAT port plugin not available, we can't map port") |
863 else: | 863 else: |
864 ext_port = yield self._np.mapPort( | 864 ext_port = yield self._np.map_port( |
865 local_port, desc="SaT socks5 stream" | 865 local_port, desc="SaT socks5 stream" |
866 ) | 866 ) |
867 if ext_port is None: | 867 if ext_port is None: |
868 log.warning("Can't map NAT port") | 868 log.warning("Can't map NAT port") |
869 else: | 869 else: |
870 self._external_port = ext_port | 870 self._external_port = ext_port |
871 | 871 |
872 defer.returnValue((local_port, self._external_port, local_ips, external_ip)) | 872 defer.returnValue((local_port, self._external_port, local_ips, external_ip)) |
873 | 873 |
874 @defer.inlineCallbacks | 874 @defer.inlineCallbacks |
875 def getCandidates(self, client, local_jid): | 875 def get_candidates(self, client, local_jid): |
876 """Return a list of our stream candidates | 876 """Return a list of our stream candidates |
877 | 877 |
878 @param local_jid(jid.JID): jid to use as local jid | 878 @param local_jid(jid.JID): jid to use as local jid |
879 This is needed for client which can be addressed with a different jid than | 879 This is needed for client which can be addressed with a different jid than |
880 client.jid if a local part is used (e.g. piotr@file.example.net where | 880 client.jid if a local part is used (e.g. piotr@file.example.net where |
881 client.jid would be file.example.net) | 881 client.jid would be file.example.net) |
882 @return (D(list[Candidate])): list of candidates, ordered by priority | 882 @return (D(list[Candidate])): list of candidates, ordered by priority |
883 """ | 883 """ |
884 server_factory = yield self.getSocks5ServerFactory() | 884 server_factory = yield self.get_socks_5_server_factory() |
885 local_port, ext_port, local_ips, external_ip = yield self._getNetworkData(client) | 885 local_port, ext_port, local_ips, external_ip = yield self._get_network_data(client) |
886 try: | 886 try: |
887 proxy = yield self.getProxy(client, local_jid) | 887 proxy = yield self.get_proxy(client, local_jid) |
888 except exceptions.NotFound: | 888 except exceptions.NotFound: |
889 proxy = None | 889 proxy = None |
890 | 890 |
891 # its time to gather the candidates | 891 # its time to gather the candidates |
892 candidates = [] | 892 candidates = [] |
948 | 948 |
949 # should be already sorted, but just in case the priorities get weird | 949 # should be already sorted, but just in case the priorities get weird |
950 candidates.sort(key=lambda c: c.priority, reverse=True) | 950 candidates.sort(key=lambda c: c.priority, reverse=True) |
951 defer.returnValue(candidates) | 951 defer.returnValue(candidates) |
952 | 952 |
953 def _addConnector(self, connector, candidate): | 953 def _add_connector(self, connector, candidate): |
954 """Add connector used to connect to candidate, and return client factory's connection Deferred | 954 """Add connector used to connect to candidate, and return client factory's connection Deferred |
955 | 955 |
956 the connector can be used to disconnect the candidate, and returning the factory's connection Deferred allow to wait for connection completion | 956 the connector can be used to disconnect the candidate, and returning the factory's connection Deferred allow to wait for connection completion |
957 @param connector: a connector implementing IConnector | 957 @param connector: a connector implementing IConnector |
958 @param candidate(Candidate): candidate linked to the connector | 958 @param candidate(Candidate): candidate linked to the connector |
959 @return (D): Deferred fired when factory connection is done or has failed | 959 @return (D): Deferred fired when factory connection is done or has failed |
960 """ | 960 """ |
961 candidate.factory.connector = connector | 961 candidate.factory.connector = connector |
962 return candidate.factory.connection | 962 return candidate.factory.connection |
963 | 963 |
964 def connectCandidate( | 964 def connect_candidate( |
965 self, client, candidate, session_hash, peer_session_hash=None, delay=None | 965 self, client, candidate, session_hash, peer_session_hash=None, delay=None |
966 ): | 966 ): |
967 """Connect to a candidate | 967 """Connect to a candidate |
968 | 968 |
969 Connection will be done with a Socks5ClientFactory | 969 Connection will be done with a Socks5ClientFactory |
973 @param peer_session_hash(unicode, None): hash used with the peer | 973 @param peer_session_hash(unicode, None): hash used with the peer |
974 None to use session_hash. | 974 None to use session_hash. |
975 None must be used in 2 cases: | 975 None must be used in 2 cases: |
976 - when XEP-0065 is used with XEP-0096 | 976 - when XEP-0065 is used with XEP-0096 |
977 - when a peer connects to a proxy *he proposed himself* | 977 - when a peer connects to a proxy *he proposed himself* |
978 in practice, peer_session_hash is only used by tryCandidates | 978 in practice, peer_session_hash is only used by try_candidates |
979 @param delay(None, float): optional delay to wait before connection, in seconds | 979 @param delay(None, float): optional delay to wait before connection, in seconds |
980 @return (D): Deferred launched when TCP connection + Socks5 connection is done | 980 @return (D): Deferred launched when TCP connection + Socks5 connection is done |
981 """ | 981 """ |
982 if peer_session_hash is None: | 982 if peer_session_hash is None: |
983 # for XEP-0065, only one hash is needed | 983 # for XEP-0065, only one hash is needed |
988 if delay is None: | 988 if delay is None: |
989 d = defer.succeed(candidate.host) | 989 d = defer.succeed(candidate.host) |
990 else: | 990 else: |
991 d = sat_defer.DelayedDeferred(delay, candidate.host) | 991 d = sat_defer.DelayedDeferred(delay, candidate.host) |
992 d.addCallback(reactor.connectTCP, candidate.port, factory) | 992 d.addCallback(reactor.connectTCP, candidate.port, factory) |
993 d.addCallback(self._addConnector, candidate) | 993 d.addCallback(self._add_connector, candidate) |
994 return d | 994 return d |
995 | 995 |
996 def tryCandidates( | 996 def try_candidates( |
997 self, | 997 self, |
998 client, | 998 client, |
999 candidates, | 999 candidates, |
1000 session_hash, | 1000 session_hash, |
1001 peer_session_hash, | 1001 peer_session_hash, |
1006 | 1006 |
1007 for candidate in candidates: | 1007 for candidate in candidates: |
1008 delay = CANDIDATE_DELAY * len(defers_list) | 1008 delay = CANDIDATE_DELAY * len(defers_list) |
1009 if candidate.type == XEP_0065.TYPE_PROXY: | 1009 if candidate.type == XEP_0065.TYPE_PROXY: |
1010 delay += CANDIDATE_DELAY_PROXY | 1010 delay += CANDIDATE_DELAY_PROXY |
1011 d = self.connectCandidate( | 1011 d = self.connect_candidate( |
1012 client, candidate, session_hash, peer_session_hash, delay | 1012 client, candidate, session_hash, peer_session_hash, delay |
1013 ) | 1013 ) |
1014 if connection_cb is not None: | 1014 if connection_cb is not None: |
1015 d.addCallback( | 1015 d.addCallback( |
1016 lambda __, candidate=candidate, client=client: connection_cb( | 1016 lambda __, candidate=candidate, client=client: connection_cb( |
1021 d.addErrback(connection_eb, client, candidate) | 1021 d.addErrback(connection_eb, client, candidate) |
1022 defers_list.append(d) | 1022 defers_list.append(d) |
1023 | 1023 |
1024 return defers_list | 1024 return defers_list |
1025 | 1025 |
1026 def getBestCandidate(self, client, candidates, session_hash, peer_session_hash=None): | 1026 def get_best_candidate(self, client, candidates, session_hash, peer_session_hash=None): |
1027 """Get best candidate (according to priority) which can connect | 1027 """Get best candidate (according to priority) which can connect |
1028 | 1028 |
1029 @param candidates(iterable[Candidate]): candidates to test | 1029 @param candidates(iterable[Candidate]): candidates to test |
1030 @param session_hash(unicode): hash of the session | 1030 @param session_hash(unicode): hash of the session |
1031 hash is the same as hostname computed in XEP-0065 § 5.3.2 #1 | 1031 hash is the same as hostname computed in XEP-0065 § 5.3.2 #1 |
1033 only useful for XEP-0260, must be None for XEP-0065 streamhost candidates | 1033 only useful for XEP-0260, must be None for XEP-0065 streamhost candidates |
1034 @return (D(None, Candidate)): best candidate or None if none can connect | 1034 @return (D(None, Candidate)): best candidate or None if none can connect |
1035 """ | 1035 """ |
1036 defer_candidates = None | 1036 defer_candidates = None |
1037 | 1037 |
1038 def connectionCb(client, candidate): | 1038 def connection_cb(client, candidate): |
1039 log.info("Connection of {} successful".format(str(candidate))) | 1039 log.info("Connection of {} successful".format(str(candidate))) |
1040 for idx, other_candidate in enumerate(candidates): | 1040 for idx, other_candidate in enumerate(candidates): |
1041 try: | 1041 try: |
1042 if other_candidate.priority < candidate.priority: | 1042 if other_candidate.priority < candidate.priority: |
1043 log.debug("Cancelling {}".format(other_candidate)) | 1043 log.debug("Cancelling {}".format(other_candidate)) |
1044 defer_candidates[idx].cancel() | 1044 defer_candidates[idx].cancel() |
1045 except AttributeError: | 1045 except AttributeError: |
1046 assert other_candidate is None | 1046 assert other_candidate is None |
1047 | 1047 |
1048 def connectionEb(failure, client, candidate): | 1048 def connection_eb(failure, client, candidate): |
1049 if failure.check(defer.CancelledError): | 1049 if failure.check(defer.CancelledError): |
1050 log.debug("Connection of {} has been cancelled".format(candidate)) | 1050 log.debug("Connection of {} has been cancelled".format(candidate)) |
1051 else: | 1051 else: |
1052 log.info( | 1052 log.info( |
1053 "Connection of {candidate} Failed: {error}".format( | 1053 "Connection of {candidate} Failed: {error}".format( |
1054 candidate=candidate, error=failure.value | 1054 candidate=candidate, error=failure.value |
1055 ) | 1055 ) |
1056 ) | 1056 ) |
1057 candidates[candidates.index(candidate)] = None | 1057 candidates[candidates.index(candidate)] = None |
1058 | 1058 |
1059 def allTested(__): | 1059 def all_tested(__): |
1060 log.debug("All candidates have been tested") | 1060 log.debug("All candidates have been tested") |
1061 good_candidates = [c for c in candidates if c] | 1061 good_candidates = [c for c in candidates if c] |
1062 return good_candidates[0] if good_candidates else None | 1062 return good_candidates[0] if good_candidates else None |
1063 | 1063 |
1064 defer_candidates = self.tryCandidates( | 1064 defer_candidates = self.try_candidates( |
1065 client, | 1065 client, |
1066 candidates, | 1066 candidates, |
1067 session_hash, | 1067 session_hash, |
1068 peer_session_hash, | 1068 peer_session_hash, |
1069 connectionCb, | 1069 connection_cb, |
1070 connectionEb, | 1070 connection_eb, |
1071 ) | 1071 ) |
1072 d_list = defer.DeferredList(defer_candidates) | 1072 d_list = defer.DeferredList(defer_candidates) |
1073 d_list.addCallback(allTested) | 1073 d_list.addCallback(all_tested) |
1074 return d_list | 1074 return d_list |
1075 | 1075 |
1076 def _timeOut(self, session_hash, client): | 1076 def _time_out(self, session_hash, client): |
1077 """Called when stream was not started quickly enough | 1077 """Called when stream was not started quickly enough |
1078 | 1078 |
1079 @param session_hash(str): hash as returned by getSessionHash | 1079 @param session_hash(str): hash as returned by get_session_hash |
1080 @param client: %(doc_client)s | 1080 @param client: %(doc_client)s |
1081 """ | 1081 """ |
1082 log.info("Socks5 Bytestream: TimeOut reached") | 1082 log.info("Socks5 Bytestream: TimeOut reached") |
1083 session = self.getSession(client, session_hash) | 1083 session = self.getSession(client, session_hash) |
1084 session[DEFER_KEY].errback(exceptions.TimeOutError()) | 1084 session[DEFER_KEY].errback(exceptions.TimeOutError()) |
1085 | 1085 |
1086 def killSession(self, failure_, session_hash, sid, client): | 1086 def kill_session(self, failure_, session_hash, sid, client): |
1087 """Clean the current session | 1087 """Clean the current session |
1088 | 1088 |
1089 @param session_hash(str): hash as returned by getSessionHash | 1089 @param session_hash(str): hash as returned by get_session_hash |
1090 @param sid(None, unicode): session id | 1090 @param sid(None, unicode): session id |
1091 or None if self.xep_0065_sid_session was not used | 1091 or None if self.xep_0065_sid_session was not used |
1092 @param client: %(doc_client)s | 1092 @param client: %(doc_client)s |
1093 @param failure_(None, failure.Failure): None if everything was fine, a failure otherwise | 1093 @param failure_(None, failure.Failure): None if everything was fine, a failure otherwise |
1094 @return (None, failure.Failure): failure_ is returned | 1094 @return (None, failure.Failure): failure_ is returned |
1126 except (internet_error.AlreadyCalled, internet_error.AlreadyCancelled): | 1126 except (internet_error.AlreadyCalled, internet_error.AlreadyCancelled): |
1127 pass | 1127 pass |
1128 | 1128 |
1129 return failure_ | 1129 return failure_ |
1130 | 1130 |
1131 def startStream(self, client, stream_object, local_jid, to_jid, sid): | 1131 def start_stream(self, client, stream_object, local_jid, to_jid, sid): |
1132 """Launch the stream workflow | 1132 """Launch the stream workflow |
1133 | 1133 |
1134 @param stream_object: stream object to use | 1134 @param stream_object: stream object to use |
1135 @param local_jid(jid.JID): same as for [getCandidates] | 1135 @param local_jid(jid.JID): same as for [get_candidates] |
1136 @param to_jid: JID of the recipient | 1136 @param to_jid: JID of the recipient |
1137 @param sid: Stream session id | 1137 @param sid: Stream session id |
1138 @param successCb: method to call when stream successfully finished | 1138 @param successCb: method to call when stream successfully finished |
1139 @param failureCb: method to call when something goes wrong | 1139 @param failureCb: method to call when something goes wrong |
1140 @return (D): Deferred fired when session is finished | 1140 @return (D): Deferred fired when session is finished |
1141 """ | 1141 """ |
1142 session_data = self._createSession( | 1142 session_data = self._create_session( |
1143 client, stream_object, local_jid, to_jid, sid, True) | 1143 client, stream_object, local_jid, to_jid, sid, True) |
1144 | 1144 |
1145 session_data[client] = client | 1145 session_data[client] = client |
1146 | 1146 |
1147 def gotCandidates(candidates): | 1147 def got_candidates(candidates): |
1148 session_data["candidates"] = candidates | 1148 session_data["candidates"] = candidates |
1149 iq_elt = client.IQ() | 1149 iq_elt = client.IQ() |
1150 iq_elt["from"] = local_jid.full() | 1150 iq_elt["from"] = local_jid.full() |
1151 iq_elt["to"] = to_jid.full() | 1151 iq_elt["to"] = to_jid.full() |
1152 query_elt = iq_elt.addElement((NS_BS, "query")) | 1152 query_elt = iq_elt.addElement((NS_BS, "query")) |
1160 streamhost["jid"] = candidate.jid.full() | 1160 streamhost["jid"] = candidate.jid.full() |
1161 log.debug("Candidate proposed: {}".format(candidate)) | 1161 log.debug("Candidate proposed: {}".format(candidate)) |
1162 | 1162 |
1163 d = iq_elt.send() | 1163 d = iq_elt.send() |
1164 args = [client, session_data, local_jid] | 1164 args = [client, session_data, local_jid] |
1165 d.addCallbacks(self._IQNegotiationCb, self._IQNegotiationEb, args, None, args) | 1165 d.addCallbacks(self._iq_negotiation_cb, self._iq_negotiation_eb, args, None, args) |
1166 | 1166 |
1167 self.getCandidates(client, local_jid).addCallback(gotCandidates) | 1167 self.get_candidates(client, local_jid).addCallback(got_candidates) |
1168 return session_data[DEFER_KEY] | 1168 return session_data[DEFER_KEY] |
1169 | 1169 |
1170 def _IQNegotiationCb(self, iq_elt, client, session_data, local_jid): | 1170 def _iq_negotiation_cb(self, iq_elt, client, session_data, local_jid): |
1171 """Called when the result of open iq is received | 1171 """Called when the result of open iq is received |
1172 | 1172 |
1173 @param session_data(dict): data of the session | 1173 @param session_data(dict): data of the session |
1174 @param client: %(doc_client)s | 1174 @param client: %(doc_client)s |
1175 @param iq_elt(domish.Element): <iq> result | 1175 @param iq_elt(domish.Element): <iq> result |
1195 else: | 1195 else: |
1196 log.info("Candidate choosed by target: {}".format(candidate)) | 1196 log.info("Candidate choosed by target: {}".format(candidate)) |
1197 | 1197 |
1198 if candidate.type == XEP_0065.TYPE_PROXY: | 1198 if candidate.type == XEP_0065.TYPE_PROXY: |
1199 log.info("A Socks5 proxy is used") | 1199 log.info("A Socks5 proxy is used") |
1200 d = self.connectCandidate(client, candidate, session_data["hash"]) | 1200 d = self.connect_candidate(client, candidate, session_data["hash"]) |
1201 d.addCallback( | 1201 d.addCallback( |
1202 lambda __: candidate.activate( | 1202 lambda __: candidate.activate( |
1203 client, session_data["id"], session_data["peer_jid"], local_jid | 1203 client, session_data["id"], session_data["peer_jid"], local_jid |
1204 ) | 1204 ) |
1205 ) | 1205 ) |
1206 d.addErrback(self._activationEb) | 1206 d.addErrback(self._activation_eb) |
1207 else: | 1207 else: |
1208 d = defer.succeed(None) | 1208 d = defer.succeed(None) |
1209 | 1209 |
1210 d.addCallback(lambda __: candidate.startTransfer(session_data["hash"])) | 1210 d.addCallback(lambda __: candidate.start_transfer(session_data["hash"])) |
1211 | 1211 |
1212 def _activationEb(self, failure): | 1212 def _activation_eb(self, failure): |
1213 log.warning("Proxy activation error: {}".format(failure.value)) | 1213 log.warning("Proxy activation error: {}".format(failure.value)) |
1214 | 1214 |
1215 def _IQNegotiationEb(self, stanza_err, client, session_data, local_jid): | 1215 def _iq_negotiation_eb(self, stanza_err, client, session_data, local_jid): |
1216 log.warning("Socks5 transfer failed: {}".format(stanza_err.value)) | 1216 log.warning("Socks5 transfer failed: {}".format(stanza_err.value)) |
1217 # FIXME: must clean session | 1217 # FIXME: must clean session |
1218 | 1218 |
1219 def createSession(self, *args, **kwargs): | 1219 def create_session(self, *args, **kwargs): |
1220 """like [_createSession] but return the session deferred instead of the whole session | 1220 """like [_create_session] but return the session deferred instead of the whole session |
1221 | 1221 |
1222 session deferred is fired when transfer is finished | 1222 session deferred is fired when transfer is finished |
1223 """ | 1223 """ |
1224 return self._createSession(*args, **kwargs)[DEFER_KEY] | 1224 return self._create_session(*args, **kwargs)[DEFER_KEY] |
1225 | 1225 |
1226 def _createSession(self, client, stream_object, local_jid, to_jid, sid, | 1226 def _create_session(self, client, stream_object, local_jid, to_jid, sid, |
1227 requester=False): | 1227 requester=False): |
1228 """Called when a bytestream is imminent | 1228 """Called when a bytestream is imminent |
1229 | 1229 |
1230 @param stream_object(iface.IStreamProducer): File object where data will be | 1230 @param stream_object(iface.IStreamProducer): File object where data will be |
1231 written | 1231 written |
1235 @return (dict): session data | 1235 @return (dict): session data |
1236 """ | 1236 """ |
1237 if sid in client.xep_0065_sid_session: | 1237 if sid in client.xep_0065_sid_session: |
1238 raise exceptions.ConflictError("A session with this id already exists !") | 1238 raise exceptions.ConflictError("A session with this id already exists !") |
1239 if requester: | 1239 if requester: |
1240 session_hash = getSessionHash(local_jid, to_jid, sid) | 1240 session_hash = get_session_hash(local_jid, to_jid, sid) |
1241 session_data = self._registerHash(client, session_hash, stream_object) | 1241 session_data = self._register_hash(client, session_hash, stream_object) |
1242 else: | 1242 else: |
1243 session_hash = getSessionHash(to_jid, local_jid, sid) | 1243 session_hash = get_session_hash(to_jid, local_jid, sid) |
1244 session_d = defer.Deferred() | 1244 session_d = defer.Deferred() |
1245 session_d.addBoth(self.killSession, session_hash, sid, client) | 1245 session_d.addBoth(self.kill_session, session_hash, sid, client) |
1246 session_data = client._s5b_sessions[session_hash] = { | 1246 session_data = client._s5b_sessions[session_hash] = { |
1247 DEFER_KEY: session_d, | 1247 DEFER_KEY: session_d, |
1248 TIMER_KEY: reactor.callLater( | 1248 TIMER_KEY: reactor.callLater( |
1249 TIMEOUT, self._timeOut, session_hash, client | 1249 TIMEOUT, self._time_out, session_hash, client |
1250 ), | 1250 ), |
1251 } | 1251 } |
1252 client.xep_0065_sid_session[sid] = session_data | 1252 client.xep_0065_sid_session[sid] = session_data |
1253 session_data.update( | 1253 session_data.update( |
1254 { | 1254 { |
1281 except KeyError as e: | 1281 except KeyError as e: |
1282 log.warning("The requested session doesn't exists !") | 1282 log.warning("The requested session doesn't exists !") |
1283 raise e | 1283 raise e |
1284 return client._s5b_sessions[session_hash] | 1284 return client._s5b_sessions[session_hash] |
1285 | 1285 |
1286 def registerHash(self, *args, **kwargs): | 1286 def register_hash(self, *args, **kwargs): |
1287 """like [_registerHash] but return the session deferred instead of the whole session | 1287 """like [_register_hash] but return the session deferred instead of the whole session |
1288 session deferred is fired when transfer is finished | 1288 session deferred is fired when transfer is finished |
1289 """ | 1289 """ |
1290 return self._registerHash(*args, **kwargs)[DEFER_KEY] | 1290 return self._register_hash(*args, **kwargs)[DEFER_KEY] |
1291 | 1291 |
1292 def _registerHash(self, client, session_hash, stream_object): | 1292 def _register_hash(self, client, session_hash, stream_object): |
1293 """Create a session_data associated to hash | 1293 """Create a session_data associated to hash |
1294 | 1294 |
1295 @param session_hash(str): hash of the session | 1295 @param session_hash(str): hash of the session |
1296 @param stream_object(iface.IStreamProducer, IConsumer, None): file-like object | 1296 @param stream_object(iface.IStreamProducer, IConsumer, None): file-like object |
1297 None if it will be filled later | 1297 None if it will be filled later |
1298 @return (dict): session data | 1298 @return (dict): session data |
1299 """ | 1299 """ |
1300 assert session_hash not in client._s5b_sessions | 1300 assert session_hash not in client._s5b_sessions |
1301 session_d = defer.Deferred() | 1301 session_d = defer.Deferred() |
1302 session_d.addBoth(self.killSession, session_hash, None, client) | 1302 session_d.addBoth(self.kill_session, session_hash, None, client) |
1303 session_data = client._s5b_sessions[session_hash] = { | 1303 session_data = client._s5b_sessions[session_hash] = { |
1304 DEFER_KEY: session_d, | 1304 DEFER_KEY: session_d, |
1305 TIMER_KEY: reactor.callLater(TIMEOUT, self._timeOut, session_hash, client), | 1305 TIMER_KEY: reactor.callLater(TIMEOUT, self._time_out, session_hash, client), |
1306 } | 1306 } |
1307 | 1307 |
1308 if stream_object is not None: | 1308 if stream_object is not None: |
1309 session_data["stream_object"] = stream_object | 1309 session_data["stream_object"] = stream_object |
1310 | 1310 |
1311 assert session_hash not in self.hash_clients_map | 1311 assert session_hash not in self.hash_clients_map |
1312 self.hash_clients_map[session_hash] = client | 1312 self.hash_clients_map[session_hash] = client |
1313 | 1313 |
1314 return session_data | 1314 return session_data |
1315 | 1315 |
1316 def associateStreamObject(self, client, session_hash, stream_object): | 1316 def associate_stream_object(self, client, session_hash, stream_object): |
1317 """Associate a stream object with a session""" | 1317 """Associate a stream object with a session""" |
1318 session_data = self.getSession(client, session_hash) | 1318 session_data = self.getSession(client, session_hash) |
1319 assert "stream_object" not in session_data | 1319 assert "stream_object" not in session_data |
1320 session_data["stream_object"] = stream_object | 1320 session_data["stream_object"] = stream_object |
1321 | 1321 |
1322 def streamQuery(self, iq_elt, client): | 1322 def stream_query(self, iq_elt, client): |
1323 log.debug("BS stream query") | 1323 log.debug("BS stream query") |
1324 | 1324 |
1325 iq_elt.handled = True | 1325 iq_elt.handled = True |
1326 | 1326 |
1327 query_elt = next(iq_elt.elements(NS_BS, "query")) | 1327 query_elt = next(iq_elt.elements(NS_BS, "query")) |
1359 candidates.append(Candidate(host, port, type_, priority, jid_)) | 1359 candidates.append(Candidate(host, port, type_, priority, jid_)) |
1360 | 1360 |
1361 for candidate in candidates: | 1361 for candidate in candidates: |
1362 log.info("Candidate proposed: {}".format(candidate)) | 1362 log.info("Candidate proposed: {}".format(candidate)) |
1363 | 1363 |
1364 d = self.getBestCandidate(client, candidates, session_data["hash"]) | 1364 d = self.get_best_candidate(client, candidates, session_data["hash"]) |
1365 d.addCallback(self._ackStream, iq_elt, session_data, client) | 1365 d.addCallback(self._ack_stream, iq_elt, session_data, client) |
1366 | 1366 |
1367 def _ackStream(self, candidate, iq_elt, session_data, client): | 1367 def _ack_stream(self, candidate, iq_elt, session_data, client): |
1368 if candidate is None: | 1368 if candidate is None: |
1369 log.info("No streamhost candidate worked, we have to end negotiation") | 1369 log.info("No streamhost candidate worked, we have to end negotiation") |
1370 return client.sendError(iq_elt, "item-not-found") | 1370 return client.sendError(iq_elt, "item-not-found") |
1371 log.info("We choose: {}".format(candidate)) | 1371 log.info("We choose: {}".format(candidate)) |
1372 result_elt = xmlstream.toResponse(iq_elt, "result") | 1372 result_elt = xmlstream.toResponse(iq_elt, "result") |
1384 self.plugin_parent = plugin_parent | 1384 self.plugin_parent = plugin_parent |
1385 self.host = plugin_parent.host | 1385 self.host = plugin_parent.host |
1386 | 1386 |
1387 def connectionInitialized(self): | 1387 def connectionInitialized(self): |
1388 self.xmlstream.addObserver( | 1388 self.xmlstream.addObserver( |
1389 BS_REQUEST, self.plugin_parent.streamQuery, client=self.parent | 1389 BS_REQUEST, self.plugin_parent.stream_query, client=self.parent |
1390 ) | 1390 ) |
1391 | 1391 |
1392 def getDiscoInfo(self, requestor, target, nodeIdentifier=""): | 1392 def getDiscoInfo(self, requestor, target, nodeIdentifier=""): |
1393 return [disco.DiscoFeature(NS_BS)] | 1393 return [disco.DiscoFeature(NS_BS)] |
1394 | 1394 |