Mercurial > libervia-backend
comparison src/plugins/plugin_xep_0065.py @ 398:cb0285372818
File transfer:
- proxy managed in XEP-0065 (Socks5 bytestream)
- bug: fixed a bad id used during stream negotiation
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 05 Oct 2011 16:49:57 +0200 |
parents | 8f3551ceee17 |
children | c513328ade9d |
comparison
equal
deleted
inserted
replaced
397:ccfd69d090c3 | 398:cb0285372818 |
---|---|
294 # Ensure reply is OK | 294 # Ensure reply is OK |
295 if rep != REPLY_SUCCESS: | 295 if rep != REPLY_SUCCESS: |
296 self.loseConnection() | 296 self.loseConnection() |
297 return | 297 return |
298 | 298 |
299 self.state = STATE_TARGET_READY | 299 if self.factory.proxy: |
300 self.factory.activateCb(self.sid, self.factory.iq_id) | 300 self.state = STATE_READY |
301 self.factory.activateCb(self.sid, self.factory.iq_id, self.startTransfer) | |
302 else: | |
303 self.state = STATE_TARGET_READY | |
304 self.factory.activateCb(self.sid, self.factory.iq_id) | |
301 | 305 |
302 except struct.error, why: | 306 except struct.error, why: |
303 return None | 307 return None |
304 | 308 |
305 def connectionMade(self): | 309 def connectionMade(self): |
403 debug (_("Socks 5 server connection lost (reason: %s)"), reason) | 407 debug (_("Socks 5 server connection lost (reason: %s)"), reason) |
404 | 408 |
405 class Socks5ClientFactory(protocol.ClientFactory): | 409 class Socks5ClientFactory(protocol.ClientFactory): |
406 protocol = SOCKSv5 | 410 protocol = SOCKSv5 |
407 | 411 |
408 def __init__(self, current_stream, sid, iq_id, activateCb, finishedCb): | 412 def __init__(self, current_stream, sid, iq_id, activateCb, finishedCb, proxy=False): |
413 """Init the Client Factory | |
414 @param current_stream: current streams data | |
415 @param sid: Session ID | |
416 @param iq_id: iq id used to initiate the stream | |
417 @param activateCb: method to call to activate the stream | |
418 @param finishedCb: method to call when the stream session is finished | |
419 @param proxy: True if we are connecting through a proxy (and we are a requester)""" | |
409 self.data = current_stream[sid] | 420 self.data = current_stream[sid] |
410 self.sid = sid | 421 self.sid = sid |
411 self.iq_id = iq_id | 422 self.iq_id = iq_id |
412 self.activateCb = activateCb | 423 self.activateCb = activateCb |
413 self.finishedCb = finishedCb | 424 self.finishedCb = finishedCb |
425 self.proxy = proxy | |
414 | 426 |
415 def startedConnecting(self, connector): | 427 def startedConnecting(self, connector): |
416 debug (_("Socks 5 client connection started")) | 428 debug (_("Socks 5 client connection started")) |
417 | 429 |
418 def clientConnectionLost(self, connector, reason): | 430 def clientConnectionLost(self, connector, reason): |
430 <category name="File Transfer"> | 442 <category name="File Transfer"> |
431 <param name="IP" value='0.0.0.0' default_cb='yes' type="string" /> | 443 <param name="IP" value='0.0.0.0' default_cb='yes' type="string" /> |
432 <param name="Port" value="28915" type="string" /> | 444 <param name="Port" value="28915" type="string" /> |
433 </category> | 445 </category> |
434 </general> | 446 </general> |
447 <individual> | |
448 <category name="File Transfer"> | |
449 <param name="Proxy" value="" type="string" /> | |
450 <param name="Proxy host" value="" type="string" /> | |
451 <param name="Proxy port" value="" type="string" /> | |
452 </category> | |
453 </individual> | |
435 </params> | 454 </params> |
436 """ | 455 """ |
437 | 456 |
438 def __init__(self, host): | 457 def __init__(self, host): |
439 info(_("Plugin XEP_0065 initialization")) | 458 info(_("Plugin XEP_0065 initialization")) |
500 | 519 |
501 if success: | 520 if success: |
502 success_cb(sid, file_obj, NS_BS) | 521 success_cb(sid, file_obj, NS_BS) |
503 else: | 522 else: |
504 failure_cb(sid, file_obj, NS_BS, failure_reason) | 523 failure_cb(sid, file_obj, NS_BS, failure_reason) |
505 | |
506 def setData(self, data, id): | |
507 self.data = data | |
508 self.transfer_id = id | |
509 | |
510 def sendFile(self, id, filepath, size): | |
511 #lauching socks5 requester | |
512 debug(_("Launching socks5 requester")) | |
513 self.server_factory.protocol.mode = "requester" | |
514 self.server_factory.protocol.filepath = filepath | |
515 self.server_factory.protocol.filesize = size | |
516 self.server_factory.protocol.transfer_id = id | |
517 | |
518 | 524 |
519 def startStream(self, file_obj, to_jid, sid, length, successCb, failureCb, size = None, profile='@NONE@'): | 525 def startStream(self, file_obj, to_jid, sid, length, successCb, failureCb, size = None, profile='@NONE@'): |
520 """Launch the stream workflow | 526 """Launch the stream workflow |
521 @param file_obj: file_obj to send | 527 @param file_obj: file_obj to send |
522 @param to_jid: JID of the recipient | 528 @param to_jid: JID of the recipient |
527 @param profile: %(doc_profile)s""" | 533 @param profile: %(doc_profile)s""" |
528 if length != None: | 534 if length != None: |
529 error(_('stream length not managed yet')) | 535 error(_('stream length not managed yet')) |
530 return; | 536 return; |
531 profile_jid, xmlstream = self.host.getJidNStream(profile) | 537 profile_jid, xmlstream = self.host.getJidNStream(profile) |
538 if not profile_jid or not xmlstream: | |
539 error(_("Unknown profile, this should not happen")) | |
540 return; | |
532 data = self.current_stream[sid] = {} | 541 data = self.current_stream[sid] = {} |
542 data["profile"] = profile | |
533 data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid) | 543 data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid) |
534 data["file_obj"] = file_obj | 544 data["file_obj"] = file_obj |
545 data["from"] = profile_jid | |
535 data["to"] = to_jid | 546 data["to"] = to_jid |
536 data["success_cb"] = successCb | 547 data["success_cb"] = successCb |
537 data["failure_cb"] = failureCb | 548 data["failure_cb"] = failureCb |
538 data["xmlstream"] = xmlstream | 549 data["xmlstream"] = xmlstream |
539 data["hash"] = calculateHash(profile_jid, to_jid, sid) | 550 data["hash"] = calculateHash(profile_jid, to_jid, sid) |
545 iq_elt["from"] = profile_jid.full() | 556 iq_elt["from"] = profile_jid.full() |
546 iq_elt["to"] = to_jid.full() | 557 iq_elt["to"] = to_jid.full() |
547 query_elt = iq_elt.addElement('query', NS_BS) | 558 query_elt = iq_elt.addElement('query', NS_BS) |
548 query_elt['mode'] = 'tcp' | 559 query_elt['mode'] = 'tcp' |
549 query_elt['sid'] = sid | 560 query_elt['sid'] = sid |
561 #first streamhost: direct connection | |
550 streamhost = query_elt.addElement('streamhost') | 562 streamhost = query_elt.addElement('streamhost') |
551 streamhost['host'] = "127.0.0.1" #self.host.memory.getParamA("IP", "File Transfer") | 563 streamhost['host'] = self.host.memory.getParamA("IP", "File Transfer") |
552 streamhost['port'] = self.host.memory.getParamA("Port", "File Transfer") | 564 streamhost['port'] = self.host.memory.getParamA("Port", "File Transfer") |
553 streamhost['jid'] = profile_jid.full() | 565 streamhost['jid'] = profile_jid.full() |
566 | |
567 #second streamhost: mediated connection, using proxy | |
568 streamhost = query_elt.addElement('streamhost') | |
569 streamhost['host'] = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=profile) | |
570 streamhost['port'] = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile) | |
571 streamhost['jid'] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile) | |
572 | |
554 iq_elt.addCallback(self.iqResult, sid) | 573 iq_elt.addCallback(self.iqResult, sid) |
555 iq_elt.send() | 574 iq_elt.send() |
556 | 575 |
557 def iqResult(self, sid, iq_elt): | 576 def iqResult(self, sid, iq_elt): |
558 """Called when the result of open iq is received""" | 577 """Called when the result of open iq is received""" |
559 if iq_elt["type"] == "error": | 578 if iq_elt["type"] == "error": |
560 warning(_("Transfer failed")) | 579 warning(_("Transfer failed")) |
561 return | 580 return |
562 | 581 |
563 try: | 582 try: |
564 data = self.current_stream[sid] | 583 data = self.current_stream[sid] |
565 callback = data["start_transfer_cb"] | |
566 file_obj = data["file_obj"] | 584 file_obj = data["file_obj"] |
567 timer = data["timer"] | 585 timer = data["timer"] |
586 profile = data["profile"] | |
568 except KeyError: | 587 except KeyError: |
569 error(_("Internal error, can't do transfer")) | 588 error(_("Internal error, can't do transfer")) |
570 return | 589 return |
571 | 590 |
572 if timer.active(): | 591 if timer.active(): |
573 timer.cancel() | 592 timer.cancel() |
574 | 593 |
575 callback(file_obj) | 594 profile_jid, xmlstream = self.host.getJidNStream(profile) |
576 | 595 query_elt = iq_elt.firstChildElement() |
596 streamhost_elts = filter(lambda elt: elt.name == 'streamhost-used', query_elt.elements()) | |
597 if not streamhost_elts: | |
598 warning(_("No streamhost found in stream query")) | |
599 return | |
600 | |
601 streamhost_jid = streamhost_elts[0]['jid'] | |
602 if streamhost_jid != profile_jid.full(): | |
603 debug(_("A proxy server is used")) | |
604 proxy_host = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=profile) | |
605 proxy_port = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile) | |
606 proxy_jid = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile) | |
607 if proxy_jid != streamhost_jid: | |
608 warning(_("Proxy jid is not the same as in parameters, this should not happen")) | |
609 return | |
610 factory = Socks5ClientFactory(self.current_stream, sid, None, self.activateProxyStream, self._killId, True) | |
611 reactor.connectTCP(proxy_host, int(proxy_port), factory) | |
612 else: | |
613 data["start_transfer_cb"](file_obj) #We now activate the stream | |
614 | |
615 def activateProxyStream(self, sid, iq_id, start_transfer_cb): | |
616 debug(_("activating stream")) | |
617 data = self.current_stream[sid] | |
618 profile = data['profile'] | |
619 profile_jid, xmlstream = self.host.getJidNStream(profile) | |
620 | |
621 iq_elt = client.IQ(xmlstream,'set') | |
622 iq_elt["from"] = profile_jid.full() | |
623 iq_elt["to"] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile) | |
624 query_elt = iq_elt.addElement('query', NS_BS) | |
625 query_elt['sid'] = sid | |
626 query_elt.addElement('activate', content=data['to'].full()) | |
627 iq_elt.addCallback(self.proxyResult, sid, start_transfer_cb, data['file_obj']) | |
628 iq_elt.send() | |
629 | |
630 def proxyResult(self, sid, start_transfer_cb, file_obj, iq_elt): | |
631 if iq_elt['type'] == 'error': | |
632 warning(_("Can't activate the proxy stream")) | |
633 return | |
634 else: | |
635 start_transfer_cb(file_obj) | |
577 | 636 |
578 def prepareToReceive(self, from_jid, sid, file_obj, size, success_cb, failure_cb): | 637 def prepareToReceive(self, from_jid, sid, file_obj, size, success_cb, failure_cb): |
579 """Called when a bytestream is imminent | 638 """Called when a bytestream is imminent |
580 @param from_jid: jid of the sender | 639 @param from_jid: jid of the sender |
581 @param sid: Stream id | 640 @param sid: Stream id |
678 implements(iwokkel.IDisco) | 737 implements(iwokkel.IDisco) |
679 | 738 |
680 def __init__(self, plugin_parent): | 739 def __init__(self, plugin_parent): |
681 self.plugin_parent = plugin_parent | 740 self.plugin_parent = plugin_parent |
682 self.host = plugin_parent.host | 741 self.host = plugin_parent.host |
742 | |
743 def _proxyDataResult(self, iq_elt): | |
744 """Called with the informations about proxy according to XEP-0065 #4 | |
745 Params should be filled with these infos""" | |
746 if iq_elt["type"] == "error": | |
747 warning(_("Can't determine proxy informations")) | |
748 return | |
749 query_elt = iq_elt.firstChildElement() | |
750 if query_elt.name != "query": | |
751 warning(_("Bad answer received from proxy")) | |
752 return | |
753 streamhost_elts = filter(lambda elt: elt.name == 'streamhost', query_elt.elements()) | |
754 if not streamhost_elts: | |
755 warning(_("No streamhost found in stream query")) | |
756 return | |
757 if len(streamhost_elts) != 1: | |
758 warning(_("Multiple streamhost elements in proxy not managed, keeping only the first one")) | |
759 streamhost_elt = streamhost_elts[0] | |
760 proxy = self.host.memory.setParam("Proxy", streamhost_elt.getAttribute("jid",""), "File Transfer", self.parent.profile) | |
761 proxy = self.host.memory.setParam("Proxy host", streamhost_elt.getAttribute("host",""), "File Transfer", self.parent.profile) | |
762 proxy = self.host.memory.setParam("Proxy port", streamhost_elt.getAttribute("port",""), "File Transfer", self.parent.profile) | |
763 | |
683 | 764 |
684 def connectionInitialized(self): | 765 def connectionInitialized(self): |
766 def after_init(ignore): | |
767 proxy_ent = self.host.memory.getServerServiceEntity("proxy", "bytestreams", self.parent.profile) | |
768 if not proxy_ent: | |
769 debug(_("No proxy found on this server")) | |
770 return | |
771 iq_elt = client.IQ(self.parent.xmlstream,'get') | |
772 iq_elt["to"] = proxy_ent.full() | |
773 query_elt = iq_elt.addElement('query', NS_BS) | |
774 iq_elt.addCallback(self._proxyDataResult) | |
775 iq_elt.send() | |
776 | |
777 | |
685 self.xmlstream.addObserver(BS_REQUEST, self.plugin_parent.streamQuery, profile = self.parent.profile) | 778 self.xmlstream.addObserver(BS_REQUEST, self.plugin_parent.streamQuery, profile = self.parent.profile) |
779 proxy = self.host.memory.getParamA("Proxy", "File Transfer", profile_key = self.parent.profile) | |
780 if not proxy: | |
781 self.parent.client_initialized.addCallback(after_init) | |
782 | |
686 | 783 |
687 | 784 |
688 def getDiscoInfo(self, requestor, target, nodeIdentifier=''): | 785 def getDiscoInfo(self, requestor, target, nodeIdentifier=''): |
689 return [disco.DiscoFeature(NS_BS)] | 786 return [disco.DiscoFeature(NS_BS)] |
690 | 787 |