comparison src/plugins/plugin_xep_0065.py @ 587:952322b1d490

Remove trailing whitespaces.
author Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
date Fri, 18 Jan 2013 17:55:34 +0100
parents 9902ec2d8d9b
children beaf6bec2fcd
comparison
equal deleted inserted replaced
586:6a718ede8be1 587:952322b1d490
30 30
31 -- 31 --
32 32
33 Here is a copy of the original license: 33 Here is a copy of the original license:
34 34
35 Copyright (C) 35 Copyright (C)
36 2002-2004 Dave Smith (dizzyd@jabber.org) 36 2002-2004 Dave Smith (dizzyd@jabber.org)
37 2007-2008 Fabio Forno (xmpp:ff@jabber.bluendo.com) 37 2007-2008 Fabio Forno (xmpp:ff@jabber.bluendo.com)
38 38
39 Permission is hereby granted, free of charge, to any person obtaining a copy 39 Permission is hereby granted, free of charge, to any person obtaining a copy
40 of this software and associated documentation files (the "Software"), to deal 40 of this software and associated documentation files (the "Software"), to deal
170 self.transport.loseConnection() 170 self.transport.loseConnection()
171 return 171 return
172 172
173 # Trim off front of the buffer 173 # Trim off front of the buffer
174 self.buf = self.buf[nmethod+2:] 174 self.buf = self.buf[nmethod+2:]
175 175
176 # Check for supported auth mechs 176 # Check for supported auth mechs
177 for m in self.supportedAuthMechs: 177 for m in self.supportedAuthMechs:
178 if m in methods: 178 if m in methods:
179 # Update internal state, according to selected method 179 # Update internal state, according to selected method
180 if m == AUTHMECH_ANON: 180 if m == AUTHMECH_ANON:
217 debug("sendErrorReply") 217 debug("sendErrorReply")
218 # Any other address types are not supported 218 # Any other address types are not supported
219 result = struct.pack('!BBBBIH', SOCKS5_VER, errorcode, 0, 1, 0, 0) 219 result = struct.pack('!BBBBIH', SOCKS5_VER, errorcode, 0, 1, 0, 0)
220 self.transport.write(result) 220 self.transport.write(result)
221 self.transport.loseConnection() 221 self.transport.loseConnection()
222 222
223 def _parseRequest(self): 223 def _parseRequest(self):
224 debug("_parseRequest") 224 debug("_parseRequest")
225 try: 225 try:
226 # Parse out data and trim buffer accordingly 226 # Parse out data and trim buffer accordingly
227 ver, cmd, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4]) 227 ver, cmd, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4])
233 233
234 # Deal with addresses 234 # Deal with addresses
235 if self.addressType == ADDR_IPV4: 235 if self.addressType == ADDR_IPV4:
236 addr, port = struct.unpack('!IH', self.buf[4:10]) 236 addr, port = struct.unpack('!IH', self.buf[4:10])
237 self.buf = self.buf[10:] 237 self.buf = self.buf[10:]
238 elif self.addressType == ADDR_DOMAINNAME: 238 elif self.addressType == ADDR_DOMAINNAME:
239 nlen = ord(self.buf[4]) 239 nlen = ord(self.buf[4])
240 addr, port = struct.unpack('!%dsH' % nlen, self.buf[5:]) 240 addr, port = struct.unpack('!%dsH' % nlen, self.buf[5:])
241 self.buf = self.buf[7 + len(addr):] 241 self.buf = self.buf[7 + len(addr):]
242 else: 242 else:
243 # Any other address types are not supported 243 # Any other address types are not supported
280 280
281 # Deal with addresses 281 # Deal with addresses
282 if self.addressType == ADDR_IPV4: 282 if self.addressType == ADDR_IPV4:
283 addr, port = struct.unpack('!IH', self.buf[4:10]) 283 addr, port = struct.unpack('!IH', self.buf[4:10])
284 self.buf = self.buf[10:] 284 self.buf = self.buf[10:]
285 elif self.addressType == ADDR_DOMAINNAME: 285 elif self.addressType == ADDR_DOMAINNAME:
286 nlen = ord(self.buf[4]) 286 nlen = ord(self.buf[4])
287 addr, port = struct.unpack('!%dsH' % nlen, self.buf[5:]) 287 addr, port = struct.unpack('!%dsH' % nlen, self.buf[5:])
288 self.buf = self.buf[7 + len(addr):] 288 self.buf = self.buf[7 + len(addr):]
289 else: 289 else:
290 # Any other address types are not supported 290 # Any other address types are not supported
306 except struct.error, why: 306 except struct.error, why:
307 return None 307 return None
308 308
309 def connectionMade(self): 309 def connectionMade(self):
310 debug("connectionMade (mode = %s)" % "requester" if isinstance(self.factory, Socks5ServerFactory) else "target") 310 debug("connectionMade (mode = %s)" % "requester" if isinstance(self.factory, Socks5ServerFactory) else "target")
311 311
312 if isinstance(self.factory, Socks5ClientFactory): 312 if isinstance(self.factory, Socks5ClientFactory):
313 self.sid = self.factory.sid 313 self.sid = self.factory.sid
314 self.profile = self.factory.profile 314 self.profile = self.factory.profile
315 self.data = self.factory.data 315 self.data = self.factory.data
316 self.state = STATE_TARGET_INITIAL 316 self.state = STATE_TARGET_INITIAL
317 self._startNegotiation() 317 self._startNegotiation()
318 318
319 def connectRequested(self, addr, port): 319 def connectRequested(self, addr, port):
320 debug("connectRequested") 320 debug("connectRequested")
321 321
322 # Check that this session is expected 322 # Check that this session is expected
323 if not self.factory.hash_sid_map.has_key(addr): 323 if not self.factory.hash_sid_map.has_key(addr):
324 #no: we refuse it 324 #no: we refuse it
325 self.sendErrorReply(REPLY_CONN_REFUSED) 325 self.sendErrorReply(REPLY_CONN_REFUSED)
326 return 326 return
334 334
335 def startTransfer(self, file_obj): 335 def startTransfer(self, file_obj):
336 """Callback called when the result iq is received""" 336 """Callback called when the result iq is received"""
337 d = self.beginFileTransfer(file_obj, self.transport) 337 d = self.beginFileTransfer(file_obj, self.transport)
338 d.addCallback(self.fileTransfered) 338 d.addCallback(self.fileTransfered)
339 339
340 def fileTransfered(self, d): 340 def fileTransfered(self, d):
341 info(_("File transfer completed, closing connection")) 341 info(_("File transfer completed, closing connection"))
342 self.transport.loseConnection() 342 self.transport.loseConnection()
343 self.factory.finishedCb(self.sid, True, self.profile) 343 self.factory.finishedCb(self.sid, True, self.profile)
344 344
349 elif self.addressType == ADDR_DOMAINNAME: 349 elif self.addressType == ADDR_DOMAINNAME:
350 result = struct.pack('!BBBBB%dsH' % len(remotehost), SOCKS5_VER, REPLY_SUCCESS, 0, 350 result = struct.pack('!BBBBB%dsH' % len(remotehost), SOCKS5_VER, REPLY_SUCCESS, 0,
351 ADDR_DOMAINNAME, len(remotehost), remotehost, remoteport) 351 ADDR_DOMAINNAME, len(remotehost), remotehost, remoteport)
352 self.transport.write(result) 352 self.transport.write(result)
353 self.state = STATE_READY 353 self.state = STATE_READY
354 354
355 def bindRequested(self, addr, port): 355 def bindRequested(self, addr, port):
356 pass 356 pass
357 357
358 def authenticateUserPass(self, user, passwd): 358 def authenticateUserPass(self, user, passwd):
359 debug("User/pass: %s/%s", user, passwd) 359 debug("User/pass: %s/%s", user, passwd)
360 return True 360 return True
361 361
362 def dataReceived(self, buf): 362 def dataReceived(self, buf):
438 debug (_("Socks 5 client connection lost (reason: %s)"), reason) 438 debug (_("Socks 5 client connection lost (reason: %s)"), reason)
439 self.finishedCb(self.sid, reason.type == jab_error.ConnectionDone, self.profile) #TODO: really check if the state is actually successful 439 self.finishedCb(self.sid, reason.type == jab_error.ConnectionDone, self.profile) #TODO: really check if the state is actually successful
440 440
441 441
442 class XEP_0065(): 442 class XEP_0065():
443 443
444 NAMESPACE = NS_BS 444 NAMESPACE = NS_BS
445 445
446 params = """ 446 params = """
447 <params> 447 <params>
448 <general> 448 <general>
461 </params> 461 </params>
462 """ 462 """
463 463
464 def __init__(self, host): 464 def __init__(self, host):
465 info(_("Plugin XEP_0065 initialization")) 465 info(_("Plugin XEP_0065 initialization"))
466 466
467 #session data 467 #session data
468 self.hash_sid_map = {} #key: hash of the transfer session, value: (session id, profile) 468 self.hash_sid_map = {} #key: hash of the transfer session, value: (session id, profile)
469 469
470 self.host = host 470 self.host = host
471 debug(_("registering")) 471 debug(_("registering"))
472 self.server_factory = Socks5ServerFactory(host, self.hash_sid_map, lambda sid, success, profile: self._killId(sid, success, profile=profile)) 472 self.server_factory = Socks5ServerFactory(host, self.hash_sid_map, lambda sid, success, profile: self._killId(sid, success, profile=profile))
473 473
474 #parameters 474 #parameters
475 host.memory.importParams(XEP_0065.params) 475 host.memory.importParams(XEP_0065.params)
476 host.memory.setDefault("IP", "File Transfer", self.getExternalIP) 476 host.memory.setDefault("IP", "File Transfer", self.getExternalIP)
477 port = int(self.host.memory.getParamA("Port", "File Transfer")) 477 port = int(self.host.memory.getParamA("Port", "File Transfer"))
478 478
479 info(_("Launching Socks5 Stream server on port %d"), port) 479 info(_("Launching Socks5 Stream server on port %d"), port)
480 reactor.listenTCP(port, self.server_factory) 480 reactor.listenTCP(port, self.server_factory)
481 481
482 def getHandler(self, profile): 482 def getHandler(self, profile):
483 return XEP_0065_handler(self) 483 return XEP_0065_handler(self)
484 484
485 def profileConnected(self, profile): 485 def profileConnected(self, profile):
486 client = self.host.getClient(profile) 486 client = self.host.getClient(profile)
487 if not client: 487 if not client:
488 raise ProfileNotInCacheError 488 raise ProfileNotInCacheError
489 client.xep_0065_current_stream = {} #key: stream_id, value: data(dict) 489 client.xep_0065_current_stream = {} #key: stream_id, value: data(dict)
501 file_obj = client.xep_0065_current_stream[sid]["file_obj"] 501 file_obj = client.xep_0065_current_stream[sid]["file_obj"]
502 data["position"] = str(file_obj.tell()) 502 data["position"] = str(file_obj.tell())
503 data["size"] = str(client.xep_0065_current_stream[sid]["size"]) 503 data["size"] = str(client.xep_0065_current_stream[sid]["size"])
504 except: 504 except:
505 pass 505 pass
506 506
507 def _timeOut(self, sid, profile): 507 def _timeOut(self, sid, profile):
508 """Delecte current_stream id, called after timeout 508 """Delecte current_stream id, called after timeout
509 @param id: id of client.xep_0065_current_stream""" 509 @param id: id of client.xep_0065_current_stream"""
510 info(_("Socks5 Bytestream: TimeOut reached for id %s [%s]") % (sid, profile)) 510 info(_("Socks5 Bytestream: TimeOut reached for id %s [%s]") % (sid, profile))
511 self._killId(sid, False, "TIMEOUT", profile) 511 self._killId(sid, False, "TIMEOUT", profile)
512 512
513 def _killId(self, sid, success=False, failure_reason="UNKNOWN", profile=None): 513 def _killId(self, sid, success=False, failure_reason="UNKNOWN", profile=None):
514 """Delete an current_stream id, clean up associated observers 514 """Delete an current_stream id, clean up associated observers
515 @param sid: id of client.xep_0065_current_stream""" 515 @param sid: id of client.xep_0065_current_stream"""
516 assert(profile) 516 assert(profile)
517 client = self.host.getClient(profile) 517 client = self.host.getClient(profile)
526 xmlstream.removeObserver(client.xep_0065_current_stream[sid]["event_data"], client.xep_0065_current_stream[sid]["observer_cb"]) 526 xmlstream.removeObserver(client.xep_0065_current_stream[sid]["event_data"], client.xep_0065_current_stream[sid]["observer_cb"])
527 if client.xep_0065_current_stream[sid]['timer'].active(): 527 if client.xep_0065_current_stream[sid]['timer'].active():
528 client.xep_0065_current_stream[sid]['timer'].cancel() 528 client.xep_0065_current_stream[sid]['timer'].cancel()
529 if client.xep_0065_current_stream[sid].has_key("size"): 529 if client.xep_0065_current_stream[sid].has_key("size"):
530 self.host.removeProgressCB(sid, profile) 530 self.host.removeProgressCB(sid, profile)
531 531
532 file_obj = client.xep_0065_current_stream[sid]['file_obj'] 532 file_obj = client.xep_0065_current_stream[sid]['file_obj']
533 success_cb = client.xep_0065_current_stream[sid]['success_cb'] 533 success_cb = client.xep_0065_current_stream[sid]['success_cb']
534 failure_cb = client.xep_0065_current_stream[sid]['failure_cb'] 534 failure_cb = client.xep_0065_current_stream[sid]['failure_cb']
535 535
536 session_hash = client.xep_0065_current_stream[sid].get('hash') 536 session_hash = client.xep_0065_current_stream[sid].get('hash')
537 del client.xep_0065_current_stream[sid] 537 del client.xep_0065_current_stream[sid]
538 if session_hash in self.hash_sid_map: 538 if session_hash in self.hash_sid_map:
539 #FIXME: check that self.hash_sid_map is correctly cleaned in all cases (timeout, normal flow, etc). 539 #FIXME: check that self.hash_sid_map is correctly cleaned in all cases (timeout, normal flow, etc).
540 del self.hash_sid_map[session_hash] 540 del self.hash_sid_map[session_hash]
556 assert(profile) 556 assert(profile)
557 client = self.host.getClient(profile) 557 client = self.host.getClient(profile)
558 if not client: 558 if not client:
559 error(_("Unknown profile, this should not happen")) 559 error(_("Unknown profile, this should not happen"))
560 raise ProfileNotInCacheError 560 raise ProfileNotInCacheError
561 561
562 if length != None: 562 if length != None:
563 error(_('stream length not managed yet')) 563 error(_('stream length not managed yet'))
564 return 564 return
565 565
566 profile_jid = client.jid 566 profile_jid = client.jid
567 xmlstream = client.xmlstream 567 xmlstream = client.xmlstream
568 568
569 data = client.xep_0065_current_stream[sid] = {} 569 data = client.xep_0065_current_stream[sid] = {}
570 data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid, profile) 570 data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid, profile)
571 data["file_obj"] = file_obj 571 data["file_obj"] = file_obj
572 data["from"] = profile_jid 572 data["from"] = profile_jid
573 data["to"] = to_jid 573 data["to"] = to_jid
613 file_obj = data["file_obj"] 613 file_obj = data["file_obj"]
614 timer = data["timer"] 614 timer = data["timer"]
615 except KeyError: 615 except KeyError:
616 error(_("Internal error, can't do transfer")) 616 error(_("Internal error, can't do transfer"))
617 return 617 return
618 618
619 if timer.active(): 619 if timer.active():
620 timer.cancel() 620 timer.cancel()
621 621
622 profile_jid, xmlstream = self.host.getJidNStream(profile) 622 profile_jid, xmlstream = self.host.getJidNStream(profile)
623 query_elt = iq_elt.firstChildElement() 623 query_elt = iq_elt.firstChildElement()
624 streamhost_elts = filter(lambda elt: elt.name == 'streamhost-used', query_elt.elements()) 624 streamhost_elts = filter(lambda elt: elt.name == 'streamhost-used', query_elt.elements())
625 if not streamhost_elts: 625 if not streamhost_elts:
626 warning(_("No streamhost found in stream query")) 626 warning(_("No streamhost found in stream query"))
684 data["size"] = size 684 data["size"] = size
685 self.host.registerProgressCB(sid, self.getProgress, profile) 685 self.host.registerProgressCB(sid, self.getProgress, profile)
686 data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid, profile) 686 data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid, profile)
687 data["success_cb"] = success_cb 687 data["success_cb"] = success_cb
688 data["failure_cb"] = failure_cb 688 data["failure_cb"] = failure_cb
689 689
690 690
691 def streamQuery(self, iq_elt, profile): 691 def streamQuery(self, iq_elt, profile):
692 """Get file using byte stream""" 692 """Get file using byte stream"""
693 debug(_("BS stream query")) 693 debug(_("BS stream query"))
694 client = self.host.getClient(profile) 694 client = self.host.getClient(profile)
695 695
696 if not client: 696 if not client:
697 raise ProfileNotInCacheError 697 raise ProfileNotInCacheError
698 698
699 xmlstream = client.xmlstream 699 xmlstream = client.xmlstream
700 700
701 iq_elt.handled = True 701 iq_elt.handled = True
702 query_elt = iq_elt.firstChildElement() 702 query_elt = iq_elt.firstChildElement()
703 sid = query_elt.getAttribute("sid") 703 sid = query_elt.getAttribute("sid")
704 streamhost_elts = filter(lambda elt: elt.name == 'streamhost', query_elt.elements()) 704 streamhost_elts = filter(lambda elt: elt.name == 'streamhost', query_elt.elements())
705 705
706 if not sid in client.xep_0065_current_stream: 706 if not sid in client.xep_0065_current_stream:
707 warning(_("Ignoring unexpected BS transfer: %s" % sid)) 707 warning(_("Ignoring unexpected BS transfer: %s" % sid))
708 self.sendNotAcceptableError(iq_elt['id'], iq_elt['from'], xmlstream) 708 self.sendNotAcceptableError(iq_elt['id'], iq_elt['from'], xmlstream)
709 return 709 return
710 710
729 client.xep_0065_current_stream[sid]["streamhost"] = (sh_host, sh_port, sh_jid) 729 client.xep_0065_current_stream[sid]["streamhost"] = (sh_host, sh_port, sh_jid)
730 730
731 info (_("Stream proposed: host=[%(host)s] port=[%(port)s]") % {'host':sh_host, 'port':sh_port}) 731 info (_("Stream proposed: host=[%(host)s] port=[%(port)s]") % {'host':sh_host, 'port':sh_port})
732 factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, iq_elt["id"], self.activateStream, lambda sid, success, profile: self._killId(sid, success, profile=profile), profile=profile) 732 factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, iq_elt["id"], self.activateStream, lambda sid, success, profile: self._killId(sid, success, profile=profile), profile=profile)
733 reactor.connectTCP(sh_host, int(sh_port), factory) 733 reactor.connectTCP(sh_host, int(sh_port), factory)
734 734
735 def activateStream(self, sid, iq_id, profile): 735 def activateStream(self, sid, iq_id, profile):
736 client = self.host.getClient(profile) 736 client = self.host.getClient(profile)
737 if not client: 737 if not client:
738 raise ProfileNotInCacheError 738 raise ProfileNotInCacheError
739 debug(_("activating stream")) 739 debug(_("activating stream"))
755 @param to_jid: addressee 755 @param to_jid: addressee
756 @param xmlstream: XML stream to use to send the error""" 756 @param xmlstream: XML stream to use to send the error"""
757 result = domish.Element((None, 'iq')) 757 result = domish.Element((None, 'iq'))
758 result['type'] = 'result' 758 result['type'] = 'result'
759 result['id'] = iq_id 759 result['id'] = iq_id
760 result['to'] = to_jid 760 result['to'] = to_jid
761 error_el = result.addElement('error') 761 error_el = result.addElement('error')
762 error_el['type'] = 'modify' 762 error_el['type'] = 'modify'
763 error_el.addElement(('urn:ietf:params:xml:ns:xmpp-stanzas','not-acceptable')) 763 error_el.addElement(('urn:ietf:params:xml:ns:xmpp-stanzas','not-acceptable'))
764 xmlstream.send(result) 764 xmlstream.send(result)
765 765
769 @param to_jid: addressee 769 @param to_jid: addressee
770 @param xmlstream: XML stream to use to send the error""" 770 @param xmlstream: XML stream to use to send the error"""
771 result = domish.Element((None, 'iq')) 771 result = domish.Element((None, 'iq'))
772 result['type'] = 'result' 772 result['type'] = 'result'
773 result['id'] = iq_id 773 result['id'] = iq_id
774 result['to'] = to_jid 774 result['to'] = to_jid
775 error_el = result.addElement('error') 775 error_el = result.addElement('error')
776 error_el['type'] = 'cancel' 776 error_el['type'] = 'cancel'
777 error_el.addElement(('urn:ietf:params:xml:ns:xmpp-stanzas','bad-request')) 777 error_el.addElement(('urn:ietf:params:xml:ns:xmpp-stanzas','bad-request'))
778 xmlstream.send(result) 778 xmlstream.send(result)
779 779
780 class XEP_0065_handler(XMPPHandler): 780 class XEP_0065_handler(XMPPHandler):
781 implements(iwokkel.IDisco) 781 implements(iwokkel.IDisco)
782 782
783 def __init__(self, plugin_parent): 783 def __init__(self, plugin_parent):
784 self.plugin_parent = plugin_parent 784 self.plugin_parent = plugin_parent
785 self.host = plugin_parent.host 785 self.host = plugin_parent.host
786 786
787 def _proxyDataResult(self, iq_elt): 787 def _proxyDataResult(self, iq_elt):
803 streamhost_elt = streamhost_elts[0] 803 streamhost_elt = streamhost_elts[0]
804 proxy = self.host.memory.setParam("Proxy", streamhost_elt.getAttribute("jid",""), "File Transfer", self.parent.profile) 804 proxy = self.host.memory.setParam("Proxy", streamhost_elt.getAttribute("jid",""), "File Transfer", self.parent.profile)
805 proxy = self.host.memory.setParam("Proxy host", streamhost_elt.getAttribute("host",""), "File Transfer", self.parent.profile) 805 proxy = self.host.memory.setParam("Proxy host", streamhost_elt.getAttribute("host",""), "File Transfer", self.parent.profile)
806 proxy = self.host.memory.setParam("Proxy port", streamhost_elt.getAttribute("port",""), "File Transfer", self.parent.profile) 806 proxy = self.host.memory.setParam("Proxy port", streamhost_elt.getAttribute("port",""), "File Transfer", self.parent.profile)
807 807
808 808
809 def connectionInitialized(self): 809 def connectionInitialized(self):
810 def after_init(ignore): 810 def after_init(ignore):
811 proxy_ent = self.host.memory.getServerServiceEntity("proxy", "bytestreams", self.parent.profile) 811 proxy_ent = self.host.memory.getServerServiceEntity("proxy", "bytestreams", self.parent.profile)
812 if not proxy_ent: 812 if not proxy_ent:
813 debug(_("No proxy found on this server")) 813 debug(_("No proxy found on this server"))
821 821
822 self.xmlstream.addObserver(BS_REQUEST, self.plugin_parent.streamQuery, profile = self.parent.profile) 822 self.xmlstream.addObserver(BS_REQUEST, self.plugin_parent.streamQuery, profile = self.parent.profile)
823 proxy = self.host.memory.getParamA("Proxy", "File Transfer", profile_key = self.parent.profile) 823 proxy = self.host.memory.getParamA("Proxy", "File Transfer", profile_key = self.parent.profile)
824 if not proxy: 824 if not proxy:
825 self.parent.client_initialized.addCallback(after_init) 825 self.parent.client_initialized.addCallback(after_init)
826 826
827 827
828 828
829 def getDiscoInfo(self, requestor, target, nodeIdentifier=''): 829 def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
830 return [disco.DiscoFeature(NS_BS)] 830 return [disco.DiscoFeature(NS_BS)]
831 831