comparison src/plugins/plugin_xep_0065.py @ 398:cb0285372818

File transfer: - proxy managed in XEP-0065 (Socks5 bytestream) - bug: fixed a bad id used during stream negociation
author Goffi <goffi@goffi.org>
date Wed, 05 Oct 2011 16:49:57 +0200
parents 8f3551ceee17
children c513328ade9d
comparison
equal deleted inserted replaced
397:ccfd69d090c3 398:cb0285372818
294 # Ensure reply is OK 294 # Ensure reply is OK
295 if rep != REPLY_SUCCESS: 295 if rep != REPLY_SUCCESS:
296 self.loseConnection() 296 self.loseConnection()
297 return 297 return
298 298
299 self.state = STATE_TARGET_READY 299 if self.factory.proxy:
300 self.factory.activateCb(self.sid, self.factory.iq_id) 300 self.state = STATE_READY
301 self.factory.activateCb(self.sid, self.factory.iq_id, self.startTransfer)
302 else:
303 self.state = STATE_TARGET_READY
304 self.factory.activateCb(self.sid, self.factory.iq_id)
301 305
302 except struct.error, why: 306 except struct.error, why:
303 return None 307 return None
304 308
305 def connectionMade(self): 309 def connectionMade(self):
403 debug (_("Socks 5 server connection lost (reason: %s)"), reason) 407 debug (_("Socks 5 server connection lost (reason: %s)"), reason)
404 408
405 class Socks5ClientFactory(protocol.ClientFactory): 409 class Socks5ClientFactory(protocol.ClientFactory):
406 protocol = SOCKSv5 410 protocol = SOCKSv5
407 411
408 def __init__(self, current_stream, sid, iq_id, activateCb, finishedCb): 412 def __init__(self, current_stream, sid, iq_id, activateCb, finishedCb, proxy=False):
413 """Init the Client Factory
414 @param current_stream: current streams data
415 @param sid: Session ID
416 @param iq_id: iq id used to initiate the stream
417 @param activateCb: method to call to activate the stream
418 @param finishedCb: method to call when the stream session is finished
419 @param proxy: True if we are connecting throught a proxy (and we are a requester)"""
409 self.data = current_stream[sid] 420 self.data = current_stream[sid]
410 self.sid = sid 421 self.sid = sid
411 self.iq_id = iq_id 422 self.iq_id = iq_id
412 self.activateCb = activateCb 423 self.activateCb = activateCb
413 self.finishedCb = finishedCb 424 self.finishedCb = finishedCb
425 self.proxy = proxy
414 426
415 def startedConnecting(self, connector): 427 def startedConnecting(self, connector):
416 debug (_("Socks 5 client connection started")) 428 debug (_("Socks 5 client connection started"))
417 429
418 def clientConnectionLost(self, connector, reason): 430 def clientConnectionLost(self, connector, reason):
430 <category name="File Transfer"> 442 <category name="File Transfer">
431 <param name="IP" value='0.0.0.0' default_cb='yes' type="string" /> 443 <param name="IP" value='0.0.0.0' default_cb='yes' type="string" />
432 <param name="Port" value="28915" type="string" /> 444 <param name="Port" value="28915" type="string" />
433 </category> 445 </category>
434 </general> 446 </general>
447 <individual>
448 <category name="File Transfer">
449 <param name="Proxy" value="" type="string" />
450 <param name="Proxy host" value="" type="string" />
451 <param name="Proxy port" value="" type="string" />
452 </category>
453 </individual>
435 </params> 454 </params>
436 """ 455 """
437 456
438 def __init__(self, host): 457 def __init__(self, host):
439 info(_("Plugin XEP_0065 initialization")) 458 info(_("Plugin XEP_0065 initialization"))
500 519
501 if success: 520 if success:
502 success_cb(sid, file_obj, NS_BS) 521 success_cb(sid, file_obj, NS_BS)
503 else: 522 else:
504 failure_cb(sid, file_obj, NS_BS, failure_reason) 523 failure_cb(sid, file_obj, NS_BS, failure_reason)
505
506 def setData(self, data, id):
507 self.data = data
508 self.transfer_id = id
509
510 def sendFile(self, id, filepath, size):
511 #lauching socks5 requester
512 debug(_("Launching socks5 requester"))
513 self.server_factory.protocol.mode = "requester"
514 self.server_factory.protocol.filepath = filepath
515 self.server_factory.protocol.filesize = size
516 self.server_factory.protocol.transfer_id = id
517
518 524
519 def startStream(self, file_obj, to_jid, sid, length, successCb, failureCb, size = None, profile='@NONE@'): 525 def startStream(self, file_obj, to_jid, sid, length, successCb, failureCb, size = None, profile='@NONE@'):
520 """Launch the stream workflow 526 """Launch the stream workflow
521 @param file_obj: file_obj to send 527 @param file_obj: file_obj to send
522 @param to_jid: JID of the recipient 528 @param to_jid: JID of the recipient
527 @param profile: %(doc_profile)s""" 533 @param profile: %(doc_profile)s"""
528 if length != None: 534 if length != None:
529 error(_('stream length not managed yet')) 535 error(_('stream length not managed yet'))
530 return; 536 return;
531 profile_jid, xmlstream = self.host.getJidNStream(profile) 537 profile_jid, xmlstream = self.host.getJidNStream(profile)
538 if not profile_jid or not xmlstream:
539 error(_("Unknown profile, this should not happen"))
540 return;
532 data = self.current_stream[sid] = {} 541 data = self.current_stream[sid] = {}
542 data["profile"] = profile
533 data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid) 543 data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid)
534 data["file_obj"] = file_obj 544 data["file_obj"] = file_obj
545 data["from"] = profile_jid
535 data["to"] = to_jid 546 data["to"] = to_jid
536 data["success_cb"] = successCb 547 data["success_cb"] = successCb
537 data["failure_cb"] = failureCb 548 data["failure_cb"] = failureCb
538 data["xmlstream"] = xmlstream 549 data["xmlstream"] = xmlstream
539 data["hash"] = calculateHash(profile_jid, to_jid, sid) 550 data["hash"] = calculateHash(profile_jid, to_jid, sid)
545 iq_elt["from"] = profile_jid.full() 556 iq_elt["from"] = profile_jid.full()
546 iq_elt["to"] = to_jid.full() 557 iq_elt["to"] = to_jid.full()
547 query_elt = iq_elt.addElement('query', NS_BS) 558 query_elt = iq_elt.addElement('query', NS_BS)
548 query_elt['mode'] = 'tcp' 559 query_elt['mode'] = 'tcp'
549 query_elt['sid'] = sid 560 query_elt['sid'] = sid
561 #first streamhost: direct connection
550 streamhost = query_elt.addElement('streamhost') 562 streamhost = query_elt.addElement('streamhost')
551 streamhost['host'] = "127.0.0.1" #self.host.memory.getParamA("IP", "File Transfer") 563 streamhost['host'] = self.host.memory.getParamA("IP", "File Transfer")
552 streamhost['port'] = self.host.memory.getParamA("Port", "File Transfer") 564 streamhost['port'] = self.host.memory.getParamA("Port", "File Transfer")
553 streamhost['jid'] = profile_jid.full() 565 streamhost['jid'] = profile_jid.full()
566
567 #second streamhost: mediated connection, using proxy
568 streamhost = query_elt.addElement('streamhost')
569 streamhost['host'] = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=profile)
570 streamhost['port'] = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile)
571 streamhost['jid'] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile)
572
554 iq_elt.addCallback(self.iqResult, sid) 573 iq_elt.addCallback(self.iqResult, sid)
555 iq_elt.send() 574 iq_elt.send()
556 575
557 def iqResult(self, sid, iq_elt): 576 def iqResult(self, sid, iq_elt):
558 """Called when the result of open iq is received""" 577 """Called when the result of open iq is received"""
559 if iq_elt["type"] == "error": 578 if iq_elt["type"] == "error":
560 warning(_("Transfer failed")) 579 warning(_("Transfer failed"))
561 return 580 return
562 581
563 try: 582 try:
564 data = self.current_stream[sid] 583 data = self.current_stream[sid]
565 callback = data["start_transfer_cb"]
566 file_obj = data["file_obj"] 584 file_obj = data["file_obj"]
567 timer = data["timer"] 585 timer = data["timer"]
586 profile = data["profile"]
568 except KeyError: 587 except KeyError:
569 error(_("Internal error, can't do transfer")) 588 error(_("Internal error, can't do transfer"))
570 return 589 return
571 590
572 if timer.active(): 591 if timer.active():
573 timer.cancel() 592 timer.cancel()
574 593
575 callback(file_obj) 594 profile_jid, xmlstream = self.host.getJidNStream(profile)
576 595 query_elt = iq_elt.firstChildElement()
596 streamhost_elts = filter(lambda elt: elt.name == 'streamhost-used', query_elt.elements())
597 if not streamhost_elts:
598 warning(_("No streamhost found in stream query"))
599 return
600
601 streamhost_jid = streamhost_elts[0]['jid']
602 if streamhost_jid != profile_jid.full():
603 debug(_("A proxy server is used"))
604 proxy_host = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=profile)
605 proxy_port = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile)
606 proxy_jid = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile)
607 if proxy_jid != streamhost_jid:
608 warning(_("Proxy jid is not the same as in parameters, this should not happen"))
609 return
610 factory = Socks5ClientFactory(self.current_stream, sid, None, self.activateProxyStream, self._killId, True)
611 reactor.connectTCP(proxy_host, int(proxy_port), factory)
612 else:
613 data["start_transfer_cb"](file_obj) #We now activate the stream
614
615 def activateProxyStream(self, sid, iq_id, start_transfer_cb):
616 debug(_("activating stream"))
617 data = self.current_stream[sid]
618 profile = data['profile']
619 profile_jid, xmlstream = self.host.getJidNStream(profile)
620
621 iq_elt = client.IQ(xmlstream,'set')
622 iq_elt["from"] = profile_jid.full()
623 iq_elt["to"] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile)
624 query_elt = iq_elt.addElement('query', NS_BS)
625 query_elt['sid'] = sid
626 query_elt.addElement('activate', content=data['to'].full())
627 iq_elt.addCallback(self.proxyResult, sid, start_transfer_cb, data['file_obj'])
628 iq_elt.send()
629
630 def proxyResult(self, sid, start_transfer_cb, file_obj, iq_elt):
631 if iq_elt['type'] == 'error':
632 warning(_("Can't activate the proxy stream"))
633 return
634 else:
635 start_transfer_cb(file_obj)
577 636
578 def prepareToReceive(self, from_jid, sid, file_obj, size, success_cb, failure_cb): 637 def prepareToReceive(self, from_jid, sid, file_obj, size, success_cb, failure_cb):
579 """Called when a bytestream is imminent 638 """Called when a bytestream is imminent
580 @param from_jid: jid of the sender 639 @param from_jid: jid of the sender
581 @param sid: Stream id 640 @param sid: Stream id
678 implements(iwokkel.IDisco) 737 implements(iwokkel.IDisco)
679 738
680 def __init__(self, plugin_parent): 739 def __init__(self, plugin_parent):
681 self.plugin_parent = plugin_parent 740 self.plugin_parent = plugin_parent
682 self.host = plugin_parent.host 741 self.host = plugin_parent.host
742
743 def _proxyDataResult(self, iq_elt):
744 """Called with the informations about proxy according to XEP-0065 #4
745 Params should be filled with these infos"""
746 if iq_elt["type"] == "error":
747 warning(_("Can't determine proxy informations"))
748 return
749 query_elt = iq_elt.firstChildElement()
750 if query_elt.name != "query":
751 warning(_("Bad answer received from proxy"))
752 return
753 streamhost_elts = filter(lambda elt: elt.name == 'streamhost', query_elt.elements())
754 if not streamhost_elts:
755 warning(_("No streamhost found in stream query"))
756 return
757 if len(streamhost_elts) != 1:
758 warning(_("Multiple streamhost elements in proxy not managed, keeping only the first one"))
759 streamhost_elt = streamhost_elts[0]
760 proxy = self.host.memory.setParam("Proxy", streamhost_elt.getAttribute("jid",""), "File Transfer", self.parent.profile)
761 proxy = self.host.memory.setParam("Proxy host", streamhost_elt.getAttribute("host",""), "File Transfer", self.parent.profile)
762 proxy = self.host.memory.setParam("Proxy port", streamhost_elt.getAttribute("port",""), "File Transfer", self.parent.profile)
763
683 764
684 def connectionInitialized(self): 765 def connectionInitialized(self):
766 def after_init(ignore):
767 proxy_ent = self.host.memory.getServerServiceEntity("proxy", "bytestreams", self.parent.profile)
768 if not proxy_ent:
769 debug(_("No proxy found on this server"))
770 return
771 iq_elt = client.IQ(self.parent.xmlstream,'get')
772 iq_elt["to"] = proxy_ent.full()
773 query_elt = iq_elt.addElement('query', NS_BS)
774 iq_elt.addCallback(self._proxyDataResult)
775 iq_elt.send()
776
777
685 self.xmlstream.addObserver(BS_REQUEST, self.plugin_parent.streamQuery, profile = self.parent.profile) 778 self.xmlstream.addObserver(BS_REQUEST, self.plugin_parent.streamQuery, profile = self.parent.profile)
779 proxy = self.host.memory.getParamA("Proxy", "File Transfer", profile_key = self.parent.profile)
780 if not proxy:
781 self.parent.client_initialized.addCallback(after_init)
782
686 783
687 784
688 def getDiscoInfo(self, requestor, target, nodeIdentifier=''): 785 def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
689 return [disco.DiscoFeature(NS_BS)] 786 return [disco.DiscoFeature(NS_BS)]
690 787