Mercurial > libervia-backend
comparison src/plugins/plugin_xep_0065.py @ 538:2c4016921403
core, frontends, bridgen plugins: fixed methods which were unproperly managing multi-profiles
- added profile argument to askConfirmation, actionResult, actionResultExt, entityDataUpdated, confirmationAnswer, getProgress
- core, frontends: fixed calls/signals according to new bridge API
- user of proper profile namespace for progression indicators and dialogs
- memory: getParam* now return bool when param type is bool
- memory: added getStringParam* to return string instead of typed value
- core, memory, storage, quick_frontend: getHistory now manage properly multi-profiles
- plugins XEP-0047, XEP-0054, XEP-0065, XEP-0077, XEP-0096; multi-profiles proper handling
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 10 Nov 2012 16:38:16 +0100 |
parents | a31abb97310d |
children | ca13633d3b6b |
comparison
equal
deleted
inserted
replaced
537:28cddc96c4ed | 538:2c4016921403 |
---|---|
56 """ | 56 """ |
57 | 57 |
58 from logging import debug, info, warning, error | 58 from logging import debug, info, warning, error |
59 from twisted.internet import protocol, reactor | 59 from twisted.internet import protocol, reactor |
60 from twisted.internet import error as jab_error | 60 from twisted.internet import error as jab_error |
61 from twisted.words.protocols.jabber import client, jid | 61 from twisted.words.protocols.jabber import jid, client as jabber_client |
62 from twisted.protocols.basic import FileSender | 62 from twisted.protocols.basic import FileSender |
63 from twisted.words.xish import domish | 63 from twisted.words.xish import domish |
64 from twisted.web.client import getPage | 64 from twisted.web.client import getPage |
65 from sat.core.exceptions import ProfileNotInCacheError | |
65 import struct | 66 import struct |
66 import urllib | 67 import hashlib |
67 import hashlib, pdb | |
68 | 68 |
69 from zope.interface import implements | 69 from zope.interface import implements |
70 | 70 |
71 try: | 71 try: |
72 from twisted.words.protocols.xmlstream import XMPPHandler | 72 from twisted.words.protocols.xmlstream import XMPPHandler |
296 self.loseConnection() | 296 self.loseConnection() |
297 return | 297 return |
298 | 298 |
299 if self.factory.proxy: | 299 if self.factory.proxy: |
300 self.state = STATE_READY | 300 self.state = STATE_READY |
301 self.factory.activateCb(self.sid, self.factory.iq_id, self.startTransfer) | 301 self.factory.activateCb(self.sid, self.factory.iq_id, self.startTransfer, self.profile) |
302 else: | 302 else: |
303 self.state = STATE_TARGET_READY | 303 self.state = STATE_TARGET_READY |
304 self.factory.activateCb(self.sid, self.factory.iq_id) | 304 self.factory.activateCb(self.sid, self.factory.iq_id, self.profile) |
305 | 305 |
306 except struct.error, why: | 306 except struct.error, why: |
307 return None | 307 return None |
308 | 308 |
309 def connectionMade(self): | 309 def connectionMade(self): |
310 debug("connectionMade (mode = %s)" % "requester" if isinstance(self.factory, Socks5ServerFactory) else "target") | 310 debug("connectionMade (mode = %s)" % "requester" if isinstance(self.factory, Socks5ServerFactory) else "target") |
311 | 311 |
312 if isinstance(self.factory, Socks5ClientFactory): | 312 if isinstance(self.factory, Socks5ClientFactory): |
313 self.sid = self.factory.sid | 313 self.sid = self.factory.sid |
314 self.profile = self.factory.profile | |
314 self.data = self.factory.data | 315 self.data = self.factory.data |
315 self.state = STATE_TARGET_INITIAL | 316 self.state = STATE_TARGET_INITIAL |
316 self._startNegotiation() | 317 self._startNegotiation() |
317 | 318 |
318 def connectRequested(self, addr, port): | 319 def connectRequested(self, addr, port): |
319 debug("connectRequested") | 320 debug("connectRequested") |
320 | 321 |
321 # Check that this session if expected | 322 # Check that this session is expected |
322 if not self.factory.hash_sid_map.has_key(addr): | 323 if not self.factory.hash_sid_map.has_key(addr): |
323 #no: we refuse it | 324 #no: we refuse it |
324 self.sendErrorReply(socks5.REPLY_CONN_REFUSED) | 325 self.sendErrorReply(REPLY_CONN_REFUSED) |
325 return | 326 return |
326 self.sid = self.factory.hash_sid_map[addr] | 327 self.sid, self.profile = self.factory.hash_sid_map[addr] |
327 self.factory.current_stream[self.sid]["start_transfer_cb"] = self.startTransfer | 328 client = self.factory.host.getClient(self.profile) |
329 if not client: | |
330 raise ProfileNotInCacheError | |
331 client.xep_0065_current_stream[self.sid]["start_transfer_cb"] = self.startTransfer | |
328 self.connectCompleted(addr, 0) | 332 self.connectCompleted(addr, 0) |
329 self.transport.stopReading() | 333 self.transport.stopReading() |
330 | 334 |
331 def startTransfer(self, file_obj): | 335 def startTransfer(self, file_obj): |
332 """Callback called when the result iq is received""" | 336 """Callback called when the result iq is received""" |
334 d.addCallback(self.fileTransfered) | 338 d.addCallback(self.fileTransfered) |
335 | 339 |
336 def fileTransfered(self, d): | 340 def fileTransfered(self, d): |
337 info(_("File transfer completed, closing connection")) | 341 info(_("File transfer completed, closing connection")) |
338 self.transport.loseConnection() | 342 self.transport.loseConnection() |
339 self.factory.finishedCb(self.sid, True) | 343 self.factory.finishedCb(self.sid, True, self.profile) |
340 | 344 |
341 def connectCompleted(self, remotehost, remoteport): | 345 def connectCompleted(self, remotehost, remoteport): |
342 debug("connectCompleted") | 346 debug("connectCompleted") |
343 if self.addressType == ADDR_IPV4: | 347 if self.addressType == ADDR_IPV4: |
344 result = struct.pack('!BBBBIH', SOCKS5_VER, REPLY_SUCCESS, 0, 1, remotehost, remoteport) | 348 result = struct.pack('!BBBBIH', SOCKS5_VER, REPLY_SUCCESS, 0, 1, remotehost, remoteport) |
393 | 397 |
394 | 398 |
395 class Socks5ServerFactory(protocol.ServerFactory): | 399 class Socks5ServerFactory(protocol.ServerFactory): |
396 protocol = SOCKSv5 | 400 protocol = SOCKSv5 |
397 | 401 |
398 def __init__(self, current_stream, hash_sid_map, finishedCb): | 402 def __init__(self, host, hash_sid_map, finishedCb): |
399 self.current_stream = current_stream | 403 self.host = host |
400 self.hash_sid_map = hash_sid_map | 404 self.hash_sid_map = hash_sid_map |
401 self.finishedCb = finishedCb | 405 self.finishedCb = finishedCb |
402 | 406 |
403 def startedConnecting(self, connector): | 407 def startedConnecting(self, connector): |
404 debug (_("Socks 5 server connection started")) | 408 debug (_("Socks 5 server connection started")) |
407 debug (_("Socks 5 server connection lost (reason: %s)"), reason) | 411 debug (_("Socks 5 server connection lost (reason: %s)"), reason) |
408 | 412 |
409 class Socks5ClientFactory(protocol.ClientFactory): | 413 class Socks5ClientFactory(protocol.ClientFactory): |
410 protocol = SOCKSv5 | 414 protocol = SOCKSv5 |
411 | 415 |
412 def __init__(self, current_stream, sid, iq_id, activateCb, finishedCb, proxy=False): | 416 def __init__(self, current_stream, sid, iq_id, activateCb, finishedCb, proxy=False, profile=None): |
413 """Init the Client Factory | 417 """Init the Client Factory |
414 @param current_stream: current streams data | 418 @param current_stream: current streams data |
415 @param sid: Session ID | 419 @param sid: Session ID |
416 @param iq_id: iq id used to initiate the stream | 420 @param iq_id: iq id used to initiate the stream |
417 @param activateCb: method to call to activate the stream | 421 @param activateCb: method to call to activate the stream |
418 @param finishedCb: method to call when the stream session is finished | 422 @param finishedCb: method to call when the stream session is finished |
419 @param proxy: True if we are connecting throught a proxy (and we are a requester)""" | 423 @param proxy: True if we are connecting throught a proxy (and we are a requester) |
424 @param profile: %(doc_profile)s""" | |
425 assert(profile) | |
420 self.data = current_stream[sid] | 426 self.data = current_stream[sid] |
421 self.sid = sid | 427 self.sid = sid |
422 self.iq_id = iq_id | 428 self.iq_id = iq_id |
423 self.activateCb = activateCb | 429 self.activateCb = activateCb |
424 self.finishedCb = finishedCb | 430 self.finishedCb = finishedCb |
425 self.proxy = proxy | 431 self.proxy = proxy |
432 self.profile = profile | |
426 | 433 |
427 def startedConnecting(self, connector): | 434 def startedConnecting(self, connector): |
428 debug (_("Socks 5 client connection started")) | 435 debug (_("Socks 5 client connection started")) |
429 | 436 |
430 def clientConnectionLost(self, connector, reason): | 437 def clientConnectionLost(self, connector, reason): |
431 debug (_("Socks 5 client connection lost")) | 438 debug (_("Socks 5 client connection lost (reason: %s)"), reason) |
432 self.finishedCb(self.sid, reason.type == jab_error.ConnectionDone) #TODO: really check if the state is actually successful | 439 self.finishedCb(self.sid, reason.type == jab_error.ConnectionDone, self.profile) #TODO: really check if the state is actually successful |
433 | 440 |
434 | 441 |
435 class XEP_0065(): | 442 class XEP_0065(): |
436 | 443 |
437 NAMESPACE = NS_BS | 444 NAMESPACE = NS_BS |
456 | 463 |
457 def __init__(self, host): | 464 def __init__(self, host): |
458 info(_("Plugin XEP_0065 initialization")) | 465 info(_("Plugin XEP_0065 initialization")) |
459 | 466 |
460 #session data | 467 #session data |
461 self.current_stream = {} #key: stream_id, value: data(dict) | 468 self.hash_sid_map = {} #key: hash of the transfer session, value: (session id, profile) |
462 self.hash_sid_map = {} #key: hash of the transfer session, value: session id | |
463 | 469 |
464 self.host = host | 470 self.host = host |
465 debug(_("registering")) | 471 debug(_("registering")) |
466 self.server_factory = Socks5ServerFactory(self.current_stream, self.hash_sid_map, self._killId) | 472 self.server_factory = Socks5ServerFactory(host, self.hash_sid_map, lambda sid, success, profile: self._killId(sid, success, profile=profile)) |
467 | 473 |
468 #parameters | 474 #parameters |
469 host.memory.importParams(XEP_0065.params) | 475 host.memory.importParams(XEP_0065.params) |
470 host.memory.setDefault("IP", "File Transfer", self.getExternalIP) | 476 host.memory.setDefault("IP", "File Transfer", self.getExternalIP) |
471 port = int(self.host.memory.getParamA("Port", "File Transfer")) | 477 port = int(self.host.memory.getParamA("Port", "File Transfer")) |
474 reactor.listenTCP(port, self.server_factory) | 480 reactor.listenTCP(port, self.server_factory) |
475 | 481 |
476 def getHandler(self, profile): | 482 def getHandler(self, profile): |
477 return XEP_0065_handler(self) | 483 return XEP_0065_handler(self) |
478 | 484 |
485 def profileConnected(self, profile): | |
486 client = self.host.getClient(profile) | |
487 if not client: | |
488 raise ProfileNotInCacheError | |
489 client.xep_0065_current_stream = {} #key: stream_id, value: data(dict) | |
490 | |
479 def getExternalIP(self): | 491 def getExternalIP(self): |
480 """Return IP visible from outside, by asking to a website""" | 492 """Return IP visible from outside, by asking to a website""" |
481 return getPage("http://www.goffi.org/sat_tools/get_ip.php") | 493 return getPage("http://www.goffi.org/sat_tools/get_ip.php") |
482 | 494 |
483 def getProgress(self, sid, data): | 495 def getProgress(self, sid, data, profile): |
484 """Fill data with position of current transfer""" | 496 """Fill data with position of current transfer""" |
497 client = self.host.getClient(profile) | |
498 if not client: | |
499 raise ProfileNotInCacheError | |
485 try: | 500 try: |
486 file_obj = self.current_stream[sid]["file_obj"] | 501 file_obj = client.xep_0065_current_stream[sid]["file_obj"] |
487 data["position"] = str(file_obj.tell()) | 502 data["position"] = str(file_obj.tell()) |
488 data["size"] = str(self.current_stream[sid]["size"]) | 503 data["size"] = str(client.xep_0065_current_stream[sid]["size"]) |
489 except: | 504 except: |
490 pass | 505 pass |
491 | 506 |
492 def _timeOut(self, sid): | 507 def _timeOut(self, sid, profile): |
493 """Delecte current_stream id, called after timeout | 508 """Delecte current_stream id, called after timeout |
494 @param id: id of self.current_stream""" | 509 @param id: id of client.xep_0065_current_stream""" |
495 info(_("Socks5 Bytestream: TimeOut reached for id %s") % sid); | 510 info(_("Socks5 Bytestream: TimeOut reached for id %s [%s]") % (sid, profile)); |
496 self._killId(sid, False, "TIMEOUT") | 511 self._killId(sid, False, "TIMEOUT", profile) |
497 | 512 |
498 def _killId(self, sid, success=False, failure_reason="UNKNOWN"): | 513 def _killId(self, sid, success=False, failure_reason="UNKNOWN", profile=None): |
499 """Delete an current_stream id, clean up associated observers | 514 """Delete an current_stream id, clean up associated observers |
500 @param sid: id of self.current_stream""" | 515 @param sid: id of client.xep_0065_current_stream""" |
501 if not self.current_stream.has_key(sid): | 516 assert(profile) |
517 client = self.host.getClient(profile) | |
518 if not client: | |
519 warning(_("Client no more in cache")) | |
520 return | |
521 if not client.xep_0065_current_stream.has_key(sid): | |
502 warning(_("kill id called on a non existant id")) | 522 warning(_("kill id called on a non existant id")) |
503 return | 523 return |
504 if self.current_stream[sid].has_key("observer_cb"): | 524 if client.xep_0065_current_stream[sid].has_key("observer_cb"): |
505 xmlstream = self.current_stream[sid]["xmlstream"] | 525 xmlstream = client.xep_0065_current_stream[sid]["xmlstream"] |
506 xmlstream.removeObserver(self.current_stream[sid]["event_data"], self.current_stream[sid]["observer_cb"]) | 526 xmlstream.removeObserver(client.xep_0065_current_stream[sid]["event_data"], client.xep_0065_current_stream[sid]["observer_cb"]) |
507 if self.current_stream[sid]['timer'].active(): | 527 if client.xep_0065_current_stream[sid]['timer'].active(): |
508 self.current_stream[sid]['timer'].cancel() | 528 client.xep_0065_current_stream[sid]['timer'].cancel() |
509 if self.current_stream[sid].has_key("size"): | 529 if client.xep_0065_current_stream[sid].has_key("size"): |
510 self.host.removeProgressCB(sid) | 530 self.host.removeProgressCB(sid, profile) |
511 | 531 |
512 file_obj = self.current_stream[sid]['file_obj'] | 532 file_obj = client.xep_0065_current_stream[sid]['file_obj'] |
513 success_cb = self.current_stream[sid]['success_cb'] | 533 success_cb = client.xep_0065_current_stream[sid]['success_cb'] |
514 failure_cb = self.current_stream[sid]['failure_cb'] | 534 failure_cb = client.xep_0065_current_stream[sid]['failure_cb'] |
515 | 535 |
516 del self.current_stream[sid] | 536 session_hash = client.xep_0065_current_stream[sid].get('hash') |
517 if self.hash_sid_map.has_key(sid): | 537 del client.xep_0065_current_stream[sid] |
518 del self.hash_sid_map[sid] | 538 if session_hash in self.hash_sid_map: |
539 #FIXME: check that self.hash_sid_map is correctly cleaned in all cases (timeout, normal flow, etc). | |
540 del self.hash_sid_map[session_hash] | |
519 | 541 |
520 if success: | 542 if success: |
521 success_cb(sid, file_obj, NS_BS) | 543 success_cb(sid, file_obj, NS_BS, profile) |
522 else: | 544 else: |
523 failure_cb(sid, file_obj, NS_BS, failure_reason) | 545 failure_cb(sid, file_obj, NS_BS, failure_reason, profile) |
524 | 546 |
525 def startStream(self, file_obj, to_jid, sid, length, successCb, failureCb, size = None, profile='@NONE@'): | 547 def startStream(self, file_obj, to_jid, sid, length, successCb, failureCb, size = None, profile=None): |
526 """Launch the stream workflow | 548 """Launch the stream workflow |
527 @param file_obj: file_obj to send | 549 @param file_obj: file_obj to send |
528 @param to_jid: JID of the recipient | 550 @param to_jid: JID of the recipient |
529 @param sid: Stream session id | 551 @param sid: Stream session id |
530 @param length: number of byte to send, or None to send until the end | 552 @param length: number of byte to send, or None to send until the end |
531 @param successCb: method to call when stream successfuly finished | 553 @param successCb: method to call when stream successfuly finished |
532 @param failureCb: method to call when something goes wrong | 554 @param failureCb: method to call when something goes wrong |
533 @param profile: %(doc_profile)s""" | 555 @param profile: %(doc_profile)s""" |
556 assert(profile) | |
557 client = self.host.getClient(profile) | |
558 if not client: | |
559 error(_("Unknown profile, this should not happen")) | |
560 raise ProfileNotInCacheError | |
561 | |
534 if length != None: | 562 if length != None: |
535 error(_('stream length not managed yet')) | 563 error(_('stream length not managed yet')) |
536 return; | 564 return; |
537 profile_jid, xmlstream = self.host.getJidNStream(profile) | 565 |
538 if not profile_jid or not xmlstream: | 566 profile_jid = client.jid |
539 error(_("Unknown profile, this should not happen")) | 567 xmlstream = client.xmlstream |
540 return; | 568 |
541 data = self.current_stream[sid] = {} | 569 data = client.xep_0065_current_stream[sid] = {} |
542 data["profile"] = profile | 570 data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid, profile) |
543 data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid) | |
544 data["file_obj"] = file_obj | 571 data["file_obj"] = file_obj |
545 data["from"] = profile_jid | 572 data["from"] = profile_jid |
546 data["to"] = to_jid | 573 data["to"] = to_jid |
547 data["success_cb"] = successCb | 574 data["success_cb"] = successCb |
548 data["failure_cb"] = failureCb | 575 data["failure_cb"] = failureCb |
549 data["xmlstream"] = xmlstream | 576 data["xmlstream"] = xmlstream |
550 data["hash"] = calculateHash(profile_jid, to_jid, sid) | 577 data["hash"] = calculateHash(profile_jid, to_jid, sid) |
551 self.hash_sid_map[data["hash"]] = sid | 578 self.hash_sid_map[data["hash"]] = (sid, profile) |
552 if size: | 579 if size: |
553 data["size"] = size | 580 data["size"] = size |
554 self.host.registerProgressCB(sid, self.getProgress) | 581 self.host.registerProgressCB(sid, self.getProgress, profile) |
555 iq_elt = client.IQ(xmlstream,'set') | 582 iq_elt = jabber_client.IQ(xmlstream,'set') |
556 iq_elt["from"] = profile_jid.full() | 583 iq_elt["from"] = profile_jid.full() |
557 iq_elt["to"] = to_jid.full() | 584 iq_elt["to"] = to_jid.full() |
558 query_elt = iq_elt.addElement('query', NS_BS) | 585 query_elt = iq_elt.addElement('query', NS_BS) |
559 query_elt['mode'] = 'tcp' | 586 query_elt['mode'] = 'tcp' |
560 query_elt['sid'] = sid | 587 query_elt['sid'] = sid |
568 streamhost = query_elt.addElement('streamhost') | 595 streamhost = query_elt.addElement('streamhost') |
569 streamhost['host'] = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=profile) | 596 streamhost['host'] = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=profile) |
570 streamhost['port'] = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile) | 597 streamhost['port'] = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile) |
571 streamhost['jid'] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile) | 598 streamhost['jid'] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile) |
572 | 599 |
573 iq_elt.addCallback(self.iqResult, sid) | 600 iq_elt.addCallback(self.iqResult, sid, profile) |
574 iq_elt.send() | 601 iq_elt.send() |
575 | 602 |
576 def iqResult(self, sid, iq_elt): | 603 def iqResult(self, sid, profile, iq_elt): |
577 """Called when the result of open iq is received""" | 604 """Called when the result of open iq is received""" |
578 if iq_elt["type"] == "error": | 605 if iq_elt["type"] == "error": |
579 warning(_("Transfer failed")) | 606 warning(_("Transfer failed")) |
580 return | 607 return |
581 | 608 client = self.host.getClient(profile) |
609 if not client: | |
610 raise ProfileNotInCacheError | |
582 try: | 611 try: |
583 data = self.current_stream[sid] | 612 data = client.xep_0065_current_stream[sid] |
584 file_obj = data["file_obj"] | 613 file_obj = data["file_obj"] |
585 timer = data["timer"] | 614 timer = data["timer"] |
586 profile = data["profile"] | |
587 except KeyError: | 615 except KeyError: |
588 error(_("Internal error, can't do transfer")) | 616 error(_("Internal error, can't do transfer")) |
589 return | 617 return |
590 | 618 |
591 if timer.active(): | 619 if timer.active(): |
605 proxy_port = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile) | 633 proxy_port = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile) |
606 proxy_jid = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile) | 634 proxy_jid = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile) |
607 if proxy_jid != streamhost_jid: | 635 if proxy_jid != streamhost_jid: |
608 warning(_("Proxy jid is not the same as in parameters, this should not happen")) | 636 warning(_("Proxy jid is not the same as in parameters, this should not happen")) |
609 return | 637 return |
610 factory = Socks5ClientFactory(self.current_stream, sid, None, self.activateProxyStream, self._killId, True) | 638 factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, None, self.activateProxyStream, lambda sid, success, profile: self._killId(sid, success, profile=profile), True, profile) |
611 reactor.connectTCP(proxy_host, int(proxy_port), factory) | 639 reactor.connectTCP(proxy_host, int(proxy_port), factory) |
612 else: | 640 else: |
613 data["start_transfer_cb"](file_obj) #We now activate the stream | 641 data["start_transfer_cb"](file_obj) #We now activate the stream |
614 | 642 |
615 def activateProxyStream(self, sid, iq_id, start_transfer_cb): | 643 def activateProxyStream(self, sid, iq_id, start_transfer_cb, profile): |
616 debug(_("activating stream")) | 644 debug(_("activating stream")) |
617 data = self.current_stream[sid] | 645 client = self.host.getClient(profile) |
618 profile = data['profile'] | 646 if not client: |
647 raise ProfileNotInCacheError | |
648 data = client.xep_0065_current_stream[sid] | |
619 profile_jid, xmlstream = self.host.getJidNStream(profile) | 649 profile_jid, xmlstream = self.host.getJidNStream(profile) |
620 | 650 |
621 iq_elt = client.IQ(xmlstream,'set') | 651 iq_elt = client.IQ(xmlstream,'set') |
622 iq_elt["from"] = profile_jid.full() | 652 iq_elt["from"] = profile_jid.full() |
623 iq_elt["to"] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile) | 653 iq_elt["to"] = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile) |
632 warning(_("Can't activate the proxy stream")) | 662 warning(_("Can't activate the proxy stream")) |
633 return | 663 return |
634 else: | 664 else: |
635 start_transfer_cb(file_obj) | 665 start_transfer_cb(file_obj) |
636 | 666 |
637 def prepareToReceive(self, from_jid, sid, file_obj, size, success_cb, failure_cb): | 667 def prepareToReceive(self, from_jid, sid, file_obj, size, success_cb, failure_cb, profile): |
638 """Called when a bytestream is imminent | 668 """Called when a bytestream is imminent |
639 @param from_jid: jid of the sender | 669 @param from_jid: jid of the sender |
640 @param sid: Stream id | 670 @param sid: Stream id |
641 @param file_obj: File object where data will be written | 671 @param file_obj: File object where data will be written |
642 @param size: full size of the data, or None if unknown | 672 @param size: full size of the data, or None if unknown |
643 @param success_cb: method to call when successfuly finished | 673 @param success_cb: method to call when successfuly finished |
644 @param failure_cb: method to call when something goes wrong""" | 674 @param failure_cb: method to call when something goes wrong |
645 data = self.current_stream[sid] = {} | 675 @param profile: %(doc_profile)s""" |
676 client = self.host.getClient(profile) | |
677 if not client: | |
678 raise ProfileNotInCacheError | |
679 data = client.xep_0065_current_stream[sid] = {} | |
646 data["from"] = from_jid | 680 data["from"] = from_jid |
647 data["file_obj"] = file_obj | 681 data["file_obj"] = file_obj |
648 data["seq"] = -1 | 682 data["seq"] = -1 |
649 if size: | 683 if size: |
650 data["size"] = size | 684 data["size"] = size |
651 self.host.registerProgressCB(sid, self.getProgress) | 685 self.host.registerProgressCB(sid, self.getProgress, profile) |
652 data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid) | 686 data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid, profile) |
653 data["success_cb"] = success_cb | 687 data["success_cb"] = success_cb |
654 data["failure_cb"] = failure_cb | 688 data["failure_cb"] = failure_cb |
655 | 689 |
656 | 690 |
657 def streamQuery(self, iq_elt, profile): | 691 def streamQuery(self, iq_elt, profile): |
658 """Get file using byte stream""" | 692 """Get file using byte stream""" |
659 debug(_("BS stream query")) | 693 debug(_("BS stream query")) |
660 profile_jid, xmlstream = self.host.getJidNStream(profile) | 694 client = self.host.getClient(profile) |
695 | |
696 if not client: | |
697 raise ProfileNotInCacheError | |
698 | |
699 xmlstream = client.xmlstream | |
700 | |
661 iq_elt.handled = True | 701 iq_elt.handled = True |
662 query_elt = iq_elt.firstChildElement() | 702 query_elt = iq_elt.firstChildElement() |
663 sid = query_elt.getAttribute("sid") | 703 sid = query_elt.getAttribute("sid") |
664 streamhost_elts = filter(lambda elt: elt.name == 'streamhost', query_elt.elements()) | 704 streamhost_elts = filter(lambda elt: elt.name == 'streamhost', query_elt.elements()) |
665 | 705 |
666 if not sid in self.current_stream: | 706 if not sid in client.xep_0065_current_stream: |
667 warning(_("Ignoring unexpected BS transfer: %s" % sid)) | 707 warning(_("Ignoring unexpected BS transfer: %s" % sid)) |
668 self.sendNotAcceptableError(iq_elt['id'], iq_elt['from'], xmlstream) | 708 self.sendNotAcceptableError(iq_elt['id'], iq_elt['from'], xmlstream) |
669 return | 709 return |
670 | 710 |
671 self.current_stream[sid]['timer'].cancel() | 711 client.xep_0065_current_stream[sid]['timer'].cancel() |
672 self.current_stream[sid]["to"] = jid.JID(iq_elt["to"]) | 712 client.xep_0065_current_stream[sid]["to"] = jid.JID(iq_elt["to"]) |
673 self.current_stream[sid]["xmlstream"] = xmlstream | 713 client.xep_0065_current_stream[sid]["xmlstream"] = xmlstream |
674 | 714 |
675 if not streamhost_elts: | 715 if not streamhost_elts: |
676 warning(_("No streamhost found in stream query %s" % sid)) | 716 warning(_("No streamhost found in stream query %s" % sid)) |
677 self.sendBadRequestError(iq_elt['id'], iq_elt['from'], xmlstream) | 717 self.sendBadRequestError(iq_elt['id'], iq_elt['from'], xmlstream) |
678 return | 718 return |
684 if not sh_host or not sh_port or not sh_jid: | 724 if not sh_host or not sh_port or not sh_jid: |
685 warning(_("incomplete streamhost element")) | 725 warning(_("incomplete streamhost element")) |
686 self.sendBadRequestError(iq_elt['id'], iq_elt['from'], xmlstream) | 726 self.sendBadRequestError(iq_elt['id'], iq_elt['from'], xmlstream) |
687 return | 727 return |
688 | 728 |
689 self.current_stream[sid]["streamhost"] = (sh_host, sh_port, sh_jid) | 729 client.xep_0065_current_stream[sid]["streamhost"] = (sh_host, sh_port, sh_jid) |
690 | 730 |
691 info (_("Stream proposed: host=[%(host)s] port=[%(port)s]") % {'host':sh_host, 'port':sh_port}) | 731 info (_("Stream proposed: host=[%(host)s] port=[%(port)s]") % {'host':sh_host, 'port':sh_port}) |
692 factory = Socks5ClientFactory(self.current_stream, sid, iq_elt["id"], self.activateStream, self._killId) | 732 factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, iq_elt["id"], self.activateStream, lambda sid, success, profile: self._killId(sid, success, profile=profile), profile=profile) |
693 reactor.connectTCP(sh_host, int(sh_port), factory) | 733 reactor.connectTCP(sh_host, int(sh_port), factory) |
694 | 734 |
695 def activateStream(self, sid, iq_id): | 735 def activateStream(self, sid, iq_id, profile): |
736 client = self.host.getClient(profile) | |
737 if not client: | |
738 raise ProfileNotInCacheError | |
696 debug(_("activating stream")) | 739 debug(_("activating stream")) |
697 result = domish.Element((None, 'iq')) | 740 result = domish.Element((None, 'iq')) |
698 data = self.current_stream[sid] | 741 data = client.xep_0065_current_stream[sid] |
699 result['type'] = 'result' | 742 result['type'] = 'result' |
700 result['id'] = iq_id | 743 result['id'] = iq_id |
701 result['from'] = data["to"].full() | 744 result['from'] = data["to"].full() |
702 result['to'] = data["from"].full() | 745 result['to'] = data["from"].full() |
703 query = result.addElement('query', NS_BS) | 746 query = result.addElement('query', NS_BS) |
767 def after_init(ignore): | 810 def after_init(ignore): |
768 proxy_ent = self.host.memory.getServerServiceEntity("proxy", "bytestreams", self.parent.profile) | 811 proxy_ent = self.host.memory.getServerServiceEntity("proxy", "bytestreams", self.parent.profile) |
769 if not proxy_ent: | 812 if not proxy_ent: |
770 debug(_("No proxy found on this server")) | 813 debug(_("No proxy found on this server")) |
771 return | 814 return |
772 iq_elt = client.IQ(self.parent.xmlstream,'get') | 815 iq_elt = jabber_client.IQ(self.parent.xmlstream,'get') |
773 iq_elt["to"] = proxy_ent.full() | 816 iq_elt["to"] = proxy_ent.full() |
774 query_elt = iq_elt.addElement('query', NS_BS) | 817 query_elt = iq_elt.addElement('query', NS_BS) |
775 iq_elt.addCallback(self._proxyDataResult) | 818 iq_elt.addCallback(self._proxyDataResult) |
776 iq_elt.send() | 819 iq_elt.send() |
777 | 820 |