comparison src/plugins/plugin_xep_0065.py @ 538:2c4016921403

core, frontends, bridgen plugins: fixed methods which were unproperly managing multi-profiles - added profile argument to askConfirmation, actionResult, actionResultExt, entityDataUpdated, confirmationAnswer, getProgress - core, frontends: fixed calls/signals according to new bridge API - user of proper profile namespace for progression indicators and dialogs - memory: getParam* now return bool when param type is bool - memory: added getStringParam* to return string instead of typed value - core, memory, storage, quick_frontend: getHistory now manage properly multi-profiles - plugins XEP-0047, XEP-0054, XEP-0065, XEP-0077, XEP-0096; multi-profiles proper handling
author Goffi <goffi@goffi.org>
date Sat, 10 Nov 2012 16:38:16 +0100
parents a31abb97310d
children ca13633d3b6b
comparison
equal deleted inserted replaced
537:28cddc96c4ed 538:2c4016921403
56 """ 56 """
57 57
58 from logging import debug, info, warning, error 58 from logging import debug, info, warning, error
59 from twisted.internet import protocol, reactor 59 from twisted.internet import protocol, reactor
60 from twisted.internet import error as jab_error 60 from twisted.internet import error as jab_error
61 from twisted.words.protocols.jabber import client, jid 61 from twisted.words.protocols.jabber import jid, client as jabber_client
62 from twisted.protocols.basic import FileSender 62 from twisted.protocols.basic import FileSender
63 from twisted.words.xish import domish 63 from twisted.words.xish import domish
64 from twisted.web.client import getPage 64 from twisted.web.client import getPage
65 from sat.core.exceptions import ProfileNotInCacheError
65 import struct 66 import struct
66 import urllib 67 import hashlib
67 import hashlib, pdb
68 68
69 from zope.interface import implements 69 from zope.interface import implements
70 70
71 try: 71 try:
72 from twisted.words.protocols.xmlstream import XMPPHandler 72 from twisted.words.protocols.xmlstream import XMPPHandler
296 self.loseConnection() 296 self.loseConnection()
297 return 297 return
298 298
299 if self.factory.proxy: 299 if self.factory.proxy:
300 self.state = STATE_READY 300 self.state = STATE_READY
301 self.factory.activateCb(self.sid, self.factory.iq_id, self.startTransfer) 301 self.factory.activateCb(self.sid, self.factory.iq_id, self.startTransfer, self.profile)
302 else: 302 else:
303 self.state = STATE_TARGET_READY 303 self.state = STATE_TARGET_READY
304 self.factory.activateCb(self.sid, self.factory.iq_id) 304 self.factory.activateCb(self.sid, self.factory.iq_id, self.profile)
305 305
306 except struct.error, why: 306 except struct.error, why:
307 return None 307 return None
308 308
309 def connectionMade(self): 309 def connectionMade(self):
310 debug("connectionMade (mode = %s)" % "requester" if isinstance(self.factory, Socks5ServerFactory) else "target") 310 debug("connectionMade (mode = %s)" % "requester" if isinstance(self.factory, Socks5ServerFactory) else "target")
311 311
312 if isinstance(self.factory, Socks5ClientFactory): 312 if isinstance(self.factory, Socks5ClientFactory):
313 self.sid = self.factory.sid 313 self.sid = self.factory.sid
314 self.profile = self.factory.profile
314 self.data = self.factory.data 315 self.data = self.factory.data
315 self.state = STATE_TARGET_INITIAL 316 self.state = STATE_TARGET_INITIAL
316 self._startNegotiation() 317 self._startNegotiation()
317 318
318 def connectRequested(self, addr, port): 319 def connectRequested(self, addr, port):
319 debug("connectRequested") 320 debug("connectRequested")
320 321
321 # Check that this session if expected 322 # Check that this session is expected
322 if not self.factory.hash_sid_map.has_key(addr): 323 if not self.factory.hash_sid_map.has_key(addr):
323 #no: we refuse it 324 #no: we refuse it
324 self.sendErrorReply(socks5.REPLY_CONN_REFUSED) 325 self.sendErrorReply(REPLY_CONN_REFUSED)
325 return 326 return
326 self.sid = self.factory.hash_sid_map[addr] 327 self.sid, self.profile = self.factory.hash_sid_map[addr]
327 self.factory.current_stream[self.sid]["start_transfer_cb"] = self.startTransfer 328 client = self.factory.host.getClient(self.profile)
329 if not client:
330 raise ProfileNotInCacheError
331 client.xep_0065_current_stream[self.sid]["start_transfer_cb"] = self.startTransfer
328 self.connectCompleted(addr, 0) 332 self.connectCompleted(addr, 0)
329 self.transport.stopReading() 333 self.transport.stopReading()
330 334
331 def startTransfer(self, file_obj): 335 def startTransfer(self, file_obj):
332 """Callback called when the result iq is received""" 336 """Callback called when the result iq is received"""
334 d.addCallback(self.fileTransfered) 338 d.addCallback(self.fileTransfered)
335 339
336 def fileTransfered(self, d): 340 def fileTransfered(self, d):
337 info(_("File transfer completed, closing connection")) 341 info(_("File transfer completed, closing connection"))
338 self.transport.loseConnection() 342 self.transport.loseConnection()
339 self.factory.finishedCb(self.sid, True) 343 self.factory.finishedCb(self.sid, True, self.profile)
340 344
341 def connectCompleted(self, remotehost, remoteport): 345 def connectCompleted(self, remotehost, remoteport):
342 debug("connectCompleted") 346 debug("connectCompleted")
343 if self.addressType == ADDR_IPV4: 347 if self.addressType == ADDR_IPV4:
344 result = struct.pack('!BBBBIH', SOCKS5_VER, REPLY_SUCCESS, 0, 1, remotehost, remoteport) 348 result = struct.pack('!BBBBIH', SOCKS5_VER, REPLY_SUCCESS, 0, 1, remotehost, remoteport)
393 397
394 398
395 class Socks5ServerFactory(protocol.ServerFactory): 399 class Socks5ServerFactory(protocol.ServerFactory):
396 protocol = SOCKSv5 400 protocol = SOCKSv5
397 401
398 def __init__(self, current_stream, hash_sid_map, finishedCb): 402 def __init__(self, host, hash_sid_map, finishedCb):
399 self.current_stream = current_stream 403 self.host = host
400 self.hash_sid_map = hash_sid_map 404 self.hash_sid_map = hash_sid_map
401 self.finishedCb = finishedCb 405 self.finishedCb = finishedCb
402 406
403 def startedConnecting(self, connector): 407 def startedConnecting(self, connector):
404 debug (_("Socks 5 server connection started")) 408 debug (_("Socks 5 server connection started"))
407 debug (_("Socks 5 server connection lost (reason: %s)"), reason) 411 debug (_("Socks 5 server connection lost (reason: %s)"), reason)
408 412
409 class Socks5ClientFactory(protocol.ClientFactory): 413 class Socks5ClientFactory(protocol.ClientFactory):
410 protocol = SOCKSv5 414 protocol = SOCKSv5
411 415
412 def __init__(self, current_stream, sid, iq_id, activateCb, finishedCb, proxy=False): 416 def __init__(self, current_stream, sid, iq_id, activateCb, finishedCb, proxy=False, profile=None):
413 """Init the Client Factory 417 """Init the Client Factory
414 @param current_stream: current streams data 418 @param current_stream: current streams data
415 @param sid: Session ID 419 @param sid: Session ID
416 @param iq_id: iq id used to initiate the stream 420 @param iq_id: iq id used to initiate the stream
417 @param activateCb: method to call to activate the stream 421 @param activateCb: method to call to activate the stream
418 @param finishedCb: method to call when the stream session is finished 422 @param finishedCb: method to call when the stream session is finished
419 @param proxy: True if we are connecting throught a proxy (and we are a requester)""" 423 @param proxy: True if we are connecting throught a proxy (and we are a requester)
424 @param profile: %(doc_profile)s"""
425 assert(profile)
420 self.data = current_stream[sid] 426 self.data = current_stream[sid]
421 self.sid = sid 427 self.sid = sid
422 self.iq_id = iq_id 428 self.iq_id = iq_id
423 self.activateCb = activateCb 429 self.activateCb = activateCb
424 self.finishedCb = finishedCb 430 self.finishedCb = finishedCb
425 self.proxy = proxy 431 self.proxy = proxy
432 self.profile = profile
426 433
427 def startedConnecting(self, connector): 434 def startedConnecting(self, connector):
428 debug (_("Socks 5 client connection started")) 435 debug (_("Socks 5 client connection started"))
429 436
430 def clientConnectionLost(self, connector, reason): 437 def clientConnectionLost(self, connector, reason):
431 debug (_("Socks 5 client connection lost")) 438 debug (_("Socks 5 client connection lost (reason: %s)"), reason)
432 self.finishedCb(self.sid, reason.type == jab_error.ConnectionDone) #TODO: really check if the state is actually successful 439 self.finishedCb(self.sid, reason.type == jab_error.ConnectionDone, self.profile) #TODO: really check if the state is actually successful
433 440
434 441
435 class XEP_0065(): 442 class XEP_0065():
436 443
437 NAMESPACE = NS_BS 444 NAMESPACE = NS_BS
456 463
457 def __init__(self, host): 464 def __init__(self, host):
458 info(_("Plugin XEP_0065 initialization")) 465 info(_("Plugin XEP_0065 initialization"))
459 466
460 #session data 467 #session data
461 self.current_stream = {} #key: stream_id, value: data(dict) 468 self.hash_sid_map = {} #key: hash of the transfer session, value: (session id, profile)
462 self.hash_sid_map = {} #key: hash of the transfer session, value: session id
463 469
464 self.host = host 470 self.host = host
465 debug(_("registering")) 471 debug(_("registering"))
466 self.server_factory = Socks5ServerFactory(self.current_stream, self.hash_sid_map, self._killId) 472 self.server_factory = Socks5ServerFactory(host, self.hash_sid_map, lambda sid, success, profile: self._killId(sid, success, profile=profile))
467 473
468 #parameters 474 #parameters
469 host.memory.importParams(XEP_0065.params) 475 host.memory.importParams(XEP_0065.params)
470 host.memory.setDefault("IP", "File Transfer", self.getExternalIP) 476 host.memory.setDefault("IP", "File Transfer", self.getExternalIP)
471 port = int(self.host.memory.getParamA("Port", "File Transfer")) 477 port = int(self.host.memory.getParamA("Port", "File Transfer"))
474 reactor.listenTCP(port, self.server_factory) 480 reactor.listenTCP(port, self.server_factory)
475 481
476 def getHandler(self, profile): 482 def getHandler(self, profile):
477 return XEP_0065_handler(self) 483 return XEP_0065_handler(self)
478 484
485 def profileConnected(self, profile):
486 client = self.host.getClient(profile)
487 if not client:
488 raise ProfileNotInCacheError
489 client.xep_0065_current_stream = {} #key: stream_id, value: data(dict)
490
479 def getExternalIP(self): 491 def getExternalIP(self):
480 """Return IP visible from outside, by asking to a website""" 492 """Return IP visible from outside, by asking to a website"""
481 return getPage("http://www.goffi.org/sat_tools/get_ip.php") 493 return getPage("http://www.goffi.org/sat_tools/get_ip.php")
482 494
483 def getProgress(self, sid, data): 495 def getProgress(self, sid, data, profile):
484 """Fill data with position of current transfer""" 496 """Fill data with position of current transfer"""
497 client = self.host.getClient(profile)
498 if not client:
499 raise ProfileNotInCacheError
485 try: 500 try:
486 file_obj = self.current_stream[sid]["file_obj"] 501 file_obj = client.xep_0065_current_stream[sid]["file_obj"]
487 data["position"] = str(file_obj.tell()) 502 data["position"] = str(file_obj.tell())
488 data["size"] = str(self.current_stream[sid]["size"]) 503 data["size"] = str(client.xep_0065_current_stream[sid]["size"])
489 except: 504 except:
490 pass 505 pass
491 506
492 def _timeOut(self, sid): 507 def _timeOut(self, sid, profile):
493 """Delecte current_stream id, called after timeout 508 """Delecte current_stream id, called after timeout
494 @param id: id of self.current_stream""" 509 @param id: id of client.xep_0065_current_stream"""
495 info(_("Socks5 Bytestream: TimeOut reached for id %s") % sid); 510 info(_("Socks5 Bytestream: TimeOut reached for id %s [%s]") % (sid, profile));
496 self._killId(sid, False, "TIMEOUT") 511 self._killId(sid, False, "TIMEOUT", profile)
497 512
498 def _killId(self, sid, success=False, failure_reason="UNKNOWN"): 513 def _killId(self, sid, success=False, failure_reason="UNKNOWN", profile=None):
499 """Delete an current_stream id, clean up associated observers 514 """Delete an current_stream id, clean up associated observers
500 @param sid: id of self.current_stream""" 515 @param sid: id of client.xep_0065_current_stream"""
501 if not self.current_stream.has_key(sid): 516 assert(profile)
517 client = self.host.getClient(profile)
518 if not client:
519 warning(_("Client no more in cache"))
520 return
521 if not client.xep_0065_current_stream.has_key(sid):
502 warning(_("kill id called on a non existant id")) 522 warning(_("kill id called on a non existant id"))
503 return 523 return
504 if self.current_stream[sid].has_key("observer_cb"): 524 if client.xep_0065_current_stream[sid].has_key("observer_cb"):
505 xmlstream = self.current_stream[sid]["xmlstream"] 525 xmlstream = client.xep_0065_current_stream[sid]["xmlstream"]
506 xmlstream.removeObserver(self.current_stream[sid]["event_data"], self.current_stream[sid]["observer_cb"]) 526 xmlstream.removeObserver(client.xep_0065_current_stream[sid]["event_data"], client.xep_0065_current_stream[sid]["observer_cb"])
507 if self.current_stream[sid]['timer'].active(): 527 if client.xep_0065_current_stream[sid]['timer'].active():
508 self.current_stream[sid]['timer'].cancel() 528 client.xep_0065_current_stream[sid]['timer'].cancel()
509 if self.current_stream[sid].has_key("size"): 529 if client.xep_0065_current_stream[sid].has_key("size"):
510 self.host.removeProgressCB(sid) 530 self.host.removeProgressCB(sid, profile)
511 531
512 file_obj = self.current_stream[sid]['file_obj'] 532 file_obj = client.xep_0065_current_stream[sid]['file_obj']
513 success_cb = self.current_stream[sid]['success_cb'] 533 success_cb = client.xep_0065_current_stream[sid]['success_cb']
514 failure_cb = self.current_stream[sid]['failure_cb'] 534 failure_cb = client.xep_0065_current_stream[sid]['failure_cb']
515 535
516 del self.current_stream[sid] 536 session_hash = client.xep_0065_current_stream[sid].get('hash')
517 if self.hash_sid_map.has_key(sid): 537 del client.xep_0065_current_stream[sid]
518 del self.hash_sid_map[sid] 538 if session_hash in self.hash_sid_map:
539 #FIXME: check that self.hash_sid_map is correctly cleaned in all cases (timeout, normal flow, etc).
540 del self.hash_sid_map[session_hash]
519 541
520 if success: 542 if success:
521 success_cb(sid, file_obj, NS_BS) 543 success_cb(sid, file_obj, NS_BS, profile)
522 else: 544 else:
523 failure_cb(sid, file_obj, NS_BS, failure_reason) 545 failure_cb(sid, file_obj, NS_BS, failure_reason, profile)
524 546
525 def startStream(self, file_obj, to_jid, sid, length, successCb, failureCb, size = None, profile='@NONE@'): 547 def startStream(self, file_obj, to_jid, sid, length, successCb, failureCb, size = None, profile=None):
526 """Launch the stream workflow 548 """Launch the stream workflow
527 @param file_obj: file_obj to send 549 @param file_obj: file_obj to send
528 @param to_jid: JID of the recipient 550 @param to_jid: JID of the recipient
529 @param sid: Stream session id 551 @param sid: Stream session id
530 @param length: number of byte to send, or None to send until the end 552 @param length: number of byte to send, or None to send until the end
531 @param successCb: method to call when stream successfuly finished 553 @param successCb: method to call when stream successfuly finished
532 @param failureCb: method to call when something goes wrong 554 @param failureCb: method to call when something goes wrong
533 @param profile: %(doc_profile)s""" 555 @param profile: %(doc_profile)s"""
556 assert(profile)
557 client = self.host.getClient(profile)
558 if not client:
559 error(_("Unknown profile, this should not happen"))
560 raise ProfileNotInCacheError
561
534 if length != None: 562 if length != None:
535 error(_('stream length not managed yet')) 563 error(_('stream length not managed yet'))
536 return; 564 return;
537 profile_jid, xmlstream = self.host.getJidNStream(profile) 565
538 if not profile_jid or not xmlstream: 566 profile_jid = client.jid
539 error(_("Unknown profile, this should not happen")) 567 xmlstream = client.xmlstream
540 return; 568
541 data = self.current_stream[sid] = {} 569 data = client.xep_0065_current_stream[sid] = {}
542 data["profile"] = profile 570 data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid, profile)
543 data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid)
544 data["file_obj"] = file_obj 571 data["file_obj"] = file_obj
545 data["from"] = profile_jid 572 data["from"] = profile_jid
546 data["to"] = to_jid 573 data["to"] = to_jid
547 data["success_cb"] = successCb 574 data["success_cb"] = successCb
548 data["failure_cb"] = failureCb 575 data["failure_cb"] = failureCb
549 data["xmlstream"] = xmlstream 576 data["xmlstream"] = xmlstream
550 data["hash"] = calculateHash(profile_jid, to_jid, sid) 577 data["hash"] = calculateHash(profile_jid, to_jid, sid)
551 self.hash_sid_map[data["hash"]] = sid 578 self.hash_sid_map[data["hash"]] = (sid, profile)
552 if size: 579 if size:
553 data["size"] = size 580 data["size"] = size
554 self.host.registerProgressCB(sid, self.getProgress) 581 self.host.registerProgressCB(sid, self.getProgress, profile)
555 iq_elt = client.IQ(xmlstream,'set') 582 iq_elt = jabber_client.IQ(xmlstream,'set')
556 iq_elt["from"] = profile_jid.full() 583 iq_elt["from"] = profile_jid.full()
557 iq_elt["to"] = to_jid.full() 584 iq_elt["to"] = to_jid.full()
558 query_elt = iq_elt.addElement('query', NS_BS) 585 query_elt = iq_elt.addElement('query', NS_BS)
559 query_elt['mode'] = 'tcp' 586 query_elt['mode'] = 'tcp'
560 query_elt['sid'] = sid 587 query_elt['sid'] = sid
568 streamhost = query_elt.addElement('streamhost') 595 streamhost = query_elt.addElement('streamhost')
569 streamhost['host'] = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=profile) 596 streamhost['host'] = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=profile)
570 streamhost['port'] = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile) 597 streamhost['port'] = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile)
571 streamhost['jid'] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile) 598 streamhost['jid'] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile)
572 599
573 iq_elt.addCallback(self.iqResult, sid) 600 iq_elt.addCallback(self.iqResult, sid, profile)
574 iq_elt.send() 601 iq_elt.send()
575 602
576 def iqResult(self, sid, iq_elt): 603 def iqResult(self, sid, profile, iq_elt):
577 """Called when the result of open iq is received""" 604 """Called when the result of open iq is received"""
578 if iq_elt["type"] == "error": 605 if iq_elt["type"] == "error":
579 warning(_("Transfer failed")) 606 warning(_("Transfer failed"))
580 return 607 return
581 608 client = self.host.getClient(profile)
609 if not client:
610 raise ProfileNotInCacheError
582 try: 611 try:
583 data = self.current_stream[sid] 612 data = client.xep_0065_current_stream[sid]
584 file_obj = data["file_obj"] 613 file_obj = data["file_obj"]
585 timer = data["timer"] 614 timer = data["timer"]
586 profile = data["profile"]
587 except KeyError: 615 except KeyError:
588 error(_("Internal error, can't do transfer")) 616 error(_("Internal error, can't do transfer"))
589 return 617 return
590 618
591 if timer.active(): 619 if timer.active():
605 proxy_port = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile) 633 proxy_port = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile)
606 proxy_jid = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile) 634 proxy_jid = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile)
607 if proxy_jid != streamhost_jid: 635 if proxy_jid != streamhost_jid:
608 warning(_("Proxy jid is not the same as in parameters, this should not happen")) 636 warning(_("Proxy jid is not the same as in parameters, this should not happen"))
609 return 637 return
610 factory = Socks5ClientFactory(self.current_stream, sid, None, self.activateProxyStream, self._killId, True) 638 factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, None, self.activateProxyStream, lambda sid, success, profile: self._killId(sid, success, profile=profile), True, profile)
611 reactor.connectTCP(proxy_host, int(proxy_port), factory) 639 reactor.connectTCP(proxy_host, int(proxy_port), factory)
612 else: 640 else:
613 data["start_transfer_cb"](file_obj) #We now activate the stream 641 data["start_transfer_cb"](file_obj) #We now activate the stream
614 642
615 def activateProxyStream(self, sid, iq_id, start_transfer_cb): 643 def activateProxyStream(self, sid, iq_id, start_transfer_cb, profile):
616 debug(_("activating stream")) 644 debug(_("activating stream"))
617 data = self.current_stream[sid] 645 client = self.host.getClient(profile)
618 profile = data['profile'] 646 if not client:
647 raise ProfileNotInCacheError
648 data = client.xep_0065_current_stream[sid]
619 profile_jid, xmlstream = self.host.getJidNStream(profile) 649 profile_jid, xmlstream = self.host.getJidNStream(profile)
620 650
621 iq_elt = client.IQ(xmlstream,'set') 651 iq_elt = client.IQ(xmlstream,'set')
622 iq_elt["from"] = profile_jid.full() 652 iq_elt["from"] = profile_jid.full()
623 iq_elt["to"] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile) 653 iq_elt["to"] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile)
632 warning(_("Can't activate the proxy stream")) 662 warning(_("Can't activate the proxy stream"))
633 return 663 return
634 else: 664 else:
635 start_transfer_cb(file_obj) 665 start_transfer_cb(file_obj)
636 666
637 def prepareToReceive(self, from_jid, sid, file_obj, size, success_cb, failure_cb): 667 def prepareToReceive(self, from_jid, sid, file_obj, size, success_cb, failure_cb, profile):
638 """Called when a bytestream is imminent 668 """Called when a bytestream is imminent
639 @param from_jid: jid of the sender 669 @param from_jid: jid of the sender
640 @param sid: Stream id 670 @param sid: Stream id
641 @param file_obj: File object where data will be written 671 @param file_obj: File object where data will be written
642 @param size: full size of the data, or None if unknown 672 @param size: full size of the data, or None if unknown
643 @param success_cb: method to call when successfuly finished 673 @param success_cb: method to call when successfuly finished
644 @param failure_cb: method to call when something goes wrong""" 674 @param failure_cb: method to call when something goes wrong
645 data = self.current_stream[sid] = {} 675 @param profile: %(doc_profile)s"""
676 client = self.host.getClient(profile)
677 if not client:
678 raise ProfileNotInCacheError
679 data = client.xep_0065_current_stream[sid] = {}
646 data["from"] = from_jid 680 data["from"] = from_jid
647 data["file_obj"] = file_obj 681 data["file_obj"] = file_obj
648 data["seq"] = -1 682 data["seq"] = -1
649 if size: 683 if size:
650 data["size"] = size 684 data["size"] = size
651 self.host.registerProgressCB(sid, self.getProgress) 685 self.host.registerProgressCB(sid, self.getProgress, profile)
652 data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid) 686 data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid, profile)
653 data["success_cb"] = success_cb 687 data["success_cb"] = success_cb
654 data["failure_cb"] = failure_cb 688 data["failure_cb"] = failure_cb
655 689
656 690
657 def streamQuery(self, iq_elt, profile): 691 def streamQuery(self, iq_elt, profile):
658 """Get file using byte stream""" 692 """Get file using byte stream"""
659 debug(_("BS stream query")) 693 debug(_("BS stream query"))
660 profile_jid, xmlstream = self.host.getJidNStream(profile) 694 client = self.host.getClient(profile)
695
696 if not client:
697 raise ProfileNotInCacheError
698
699 xmlstream = client.xmlstream
700
661 iq_elt.handled = True 701 iq_elt.handled = True
662 query_elt = iq_elt.firstChildElement() 702 query_elt = iq_elt.firstChildElement()
663 sid = query_elt.getAttribute("sid") 703 sid = query_elt.getAttribute("sid")
664 streamhost_elts = filter(lambda elt: elt.name == 'streamhost', query_elt.elements()) 704 streamhost_elts = filter(lambda elt: elt.name == 'streamhost', query_elt.elements())
665 705
666 if not sid in self.current_stream: 706 if not sid in client.xep_0065_current_stream:
667 warning(_("Ignoring unexpected BS transfer: %s" % sid)) 707 warning(_("Ignoring unexpected BS transfer: %s" % sid))
668 self.sendNotAcceptableError(iq_elt['id'], iq_elt['from'], xmlstream) 708 self.sendNotAcceptableError(iq_elt['id'], iq_elt['from'], xmlstream)
669 return 709 return
670 710
671 self.current_stream[sid]['timer'].cancel() 711 client.xep_0065_current_stream[sid]['timer'].cancel()
672 self.current_stream[sid]["to"] = jid.JID(iq_elt["to"]) 712 client.xep_0065_current_stream[sid]["to"] = jid.JID(iq_elt["to"])
673 self.current_stream[sid]["xmlstream"] = xmlstream 713 client.xep_0065_current_stream[sid]["xmlstream"] = xmlstream
674 714
675 if not streamhost_elts: 715 if not streamhost_elts:
676 warning(_("No streamhost found in stream query %s" % sid)) 716 warning(_("No streamhost found in stream query %s" % sid))
677 self.sendBadRequestError(iq_elt['id'], iq_elt['from'], xmlstream) 717 self.sendBadRequestError(iq_elt['id'], iq_elt['from'], xmlstream)
678 return 718 return
684 if not sh_host or not sh_port or not sh_jid: 724 if not sh_host or not sh_port or not sh_jid:
685 warning(_("incomplete streamhost element")) 725 warning(_("incomplete streamhost element"))
686 self.sendBadRequestError(iq_elt['id'], iq_elt['from'], xmlstream) 726 self.sendBadRequestError(iq_elt['id'], iq_elt['from'], xmlstream)
687 return 727 return
688 728
689 self.current_stream[sid]["streamhost"] = (sh_host, sh_port, sh_jid) 729 client.xep_0065_current_stream[sid]["streamhost"] = (sh_host, sh_port, sh_jid)
690 730
691 info (_("Stream proposed: host=[%(host)s] port=[%(port)s]") % {'host':sh_host, 'port':sh_port}) 731 info (_("Stream proposed: host=[%(host)s] port=[%(port)s]") % {'host':sh_host, 'port':sh_port})
692 factory = Socks5ClientFactory(self.current_stream, sid, iq_elt["id"], self.activateStream, self._killId) 732 factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, iq_elt["id"], self.activateStream, lambda sid, success, profile: self._killId(sid, success, profile=profile), profile=profile)
693 reactor.connectTCP(sh_host, int(sh_port), factory) 733 reactor.connectTCP(sh_host, int(sh_port), factory)
694 734
695 def activateStream(self, sid, iq_id): 735 def activateStream(self, sid, iq_id, profile):
736 client = self.host.getClient(profile)
737 if not client:
738 raise ProfileNotInCacheError
696 debug(_("activating stream")) 739 debug(_("activating stream"))
697 result = domish.Element((None, 'iq')) 740 result = domish.Element((None, 'iq'))
698 data = self.current_stream[sid] 741 data = client.xep_0065_current_stream[sid]
699 result['type'] = 'result' 742 result['type'] = 'result'
700 result['id'] = iq_id 743 result['id'] = iq_id
701 result['from'] = data["to"].full() 744 result['from'] = data["to"].full()
702 result['to'] = data["from"].full() 745 result['to'] = data["from"].full()
703 query = result.addElement('query', NS_BS) 746 query = result.addElement('query', NS_BS)
767 def after_init(ignore): 810 def after_init(ignore):
768 proxy_ent = self.host.memory.getServerServiceEntity("proxy", "bytestreams", self.parent.profile) 811 proxy_ent = self.host.memory.getServerServiceEntity("proxy", "bytestreams", self.parent.profile)
769 if not proxy_ent: 812 if not proxy_ent:
770 debug(_("No proxy found on this server")) 813 debug(_("No proxy found on this server"))
771 return 814 return
772 iq_elt = client.IQ(self.parent.xmlstream,'get') 815 iq_elt = jabber_client.IQ(self.parent.xmlstream,'get')
773 iq_elt["to"] = proxy_ent.full() 816 iq_elt["to"] = proxy_ent.full()
774 query_elt = iq_elt.addElement('query', NS_BS) 817 query_elt = iq_elt.addElement('query', NS_BS)
775 iq_elt.addCallback(self._proxyDataResult) 818 iq_elt.addCallback(self._proxyDataResult)
776 iq_elt.send() 819 iq_elt.send()
777 820