comparison src/plugins/plugin_xep_0065.py @ 394:8f3551ceee17

plugin XEP-0065: refactored and misc stuff fixed. Still not finished plugins XEP-0096: XEP-0065 (Socks5 stream method) managed
author Goffi <goffi@goffi.org>
date Mon, 03 Oct 2011 18:05:15 +0200
parents 7c79d4a8c9e6
children cb0285372818
comparison
equal deleted inserted replaced
393:393b35aa86d2 394:8f3551ceee17
53 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 53 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
54 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 54 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
55 THE SOFTWARE. 55 THE SOFTWARE.
56 """ 56 """
57 57
58 from logging import debug, info, error 58 from logging import debug, info, warning, error
59 from twisted.internet import protocol, reactor 59 from twisted.internet import protocol, reactor
60 from twisted.internet import error as jab_error
61 from twisted.words.protocols.jabber import client, jid
60 from twisted.protocols.basic import FileSender 62 from twisted.protocols.basic import FileSender
61 from twisted.words.xish import domish 63 from twisted.words.xish import domish
62 from twisted.web.client import getPage 64 from twisted.web.client import getPage
63 import struct 65 import struct
64 import urllib 66 import urllib
74 from wokkel import disco, iwokkel 76 from wokkel import disco, iwokkel
75 77
76 IQ_SET = '/iq[@type="set"]' 78 IQ_SET = '/iq[@type="set"]'
77 NS_BS = 'http://jabber.org/protocol/bytestreams' 79 NS_BS = 'http://jabber.org/protocol/bytestreams'
78 BS_REQUEST = IQ_SET + '/query[@xmlns="' + NS_BS + '"]' 80 BS_REQUEST = IQ_SET + '/query[@xmlns="' + NS_BS + '"]'
81 TIMEOUT = 60 #timeout for workflow
79 82
80 83
81 84
82 PLUGIN_INFO = { 85 PLUGIN_INFO = {
83 "name": "XEP 0065 Plugin", 86 "name": "XEP 0065 Plugin",
125 REPLY_TTL_EXPIRED = 0x06 128 REPLY_TTL_EXPIRED = 0x06
126 REPLY_CMD_NOT_SUPPORTED = 0x07 129 REPLY_CMD_NOT_SUPPORTED = 0x07
127 REPLY_ADDR_NOT_SUPPORTED = 0x08 130 REPLY_ADDR_NOT_SUPPORTED = 0x08
128 131
129 132
133 def calculateHash(from_jid, to_jid, sid):
134 """Calculate SHA1 Hash according to XEP-0065
135 @param from_jid: jid of the requester
136 @param to_jid: jid of the target
137 @param sid: session id
138 @return: hash (string)"""
139 return hashlib.sha1((sid + from_jid.full() + to_jid.full()).encode('utf-8')).hexdigest()
130 140
131 141
132 142
133 class SOCKSv5(protocol.Protocol, FileSender): 143 class SOCKSv5(protocol.Protocol, FileSender):
134 def __init__(self): 144 def __init__(self):
139 self.supportedAddrs = [ ADDR_DOMAINNAME ] 149 self.supportedAddrs = [ ADDR_DOMAINNAME ]
140 self.enabledCommands = [ CMD_CONNECT ] 150 self.enabledCommands = [ CMD_CONNECT ]
141 self.peersock = None 151 self.peersock = None
142 self.addressType = 0 152 self.addressType = 0
143 self.requestType = 0 153 self.requestType = 0
144 self.activeConns = {}
145 self.pendingConns = {}
146 self.transfered = 0 #nb of bytes already copied
147 154
148 def _startNegotiation(self): 155 def _startNegotiation(self):
149 debug("_startNegotiation") 156 debug("_startNegotiation")
150 self.state = STATE_TARGET_AUTH 157 self.state = STATE_TARGET_AUTH
151 self.transport.write(struct.pack('!3B', SOCKS5_VER, 1, AUTHMECH_ANON)) 158 self.transport.write(struct.pack('!3B', SOCKS5_VER, 1, AUTHMECH_ANON))
211 # Any other address types are not supported 218 # Any other address types are not supported
212 result = struct.pack('!BBBBIH', SOCKS5_VER, errorcode, 0, 1, 0, 0) 219 result = struct.pack('!BBBBIH', SOCKS5_VER, errorcode, 0, 1, 0, 0)
213 self.transport.write(result) 220 self.transport.write(result)
214 self.transport.loseConnection() 221 self.transport.loseConnection()
215 222
216 def addConnection(self, address, connection):
217 info(_("Adding connection: %(address)s, %(connection)s") % {'address':address, 'connection':connection})
218 olist = self.pendingConns.get(address, [])
219 if len(olist) <= 1:
220 olist.append(connection)
221 self.pendingConns[address] = olist
222 return True
223 else:
224 return False
225
226 def removePendingConnection(self, address, connection):
227 olist = self.pendingConns[address]
228 if len(olist) == 1:
229 del self.pendingConns[address]
230 else:
231 olist.remove(connection)
232 self.pendingConns[address] = olist
233
234 def removeActiveConnection(self, address):
235 del self.activeConns[address]
236
237 def _parseRequest(self): 223 def _parseRequest(self):
238 debug("_parseRequest") 224 debug("_parseRequest")
239 try: 225 try:
240 # Parse out data and trim buffer accordingly 226 # Parse out data and trim buffer accordingly
241 ver, cmd, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4]) 227 ver, cmd, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4])
277 return None 263 return None
278 264
279 def _makeRequest(self): 265 def _makeRequest(self):
280 debug("_makeRequest") 266 debug("_makeRequest")
281 self.state = STATE_TARGET_REQUEST 267 self.state = STATE_TARGET_REQUEST
282 sha1 = hashlib.sha1(self.sid + self.initiator_jid + self.target_jid).hexdigest() 268 sha1 = calculateHash(self.data["from"], self.data["to"], self.sid)
283 request = struct.pack('!5B%dsH' % len(sha1), SOCKS5_VER, CMD_CONNECT, 0, ADDR_DOMAINNAME, len(sha1), sha1, 0) 269 request = struct.pack('!5B%dsH' % len(sha1), SOCKS5_VER, CMD_CONNECT, 0, ADDR_DOMAINNAME, len(sha1), sha1, 0)
284 self.transport.write(request) 270 self.transport.write(request)
285 271
286 def _parseRequestReply(self): 272 def _parseRequestReply(self):
287 debug("_parseRequestReply") 273 debug("_parseRequestReply")
308 # Ensure reply is OK 294 # Ensure reply is OK
309 if rep != REPLY_SUCCESS: 295 if rep != REPLY_SUCCESS:
310 self.loseConnection() 296 self.loseConnection()
311 return 297 return
312 298
313 debug(_("Saving file in %s."), self.data["dest_path"])
314 self.dest_file = open(self.data["dest_path"], 'w')
315 self.state = STATE_TARGET_READY 299 self.state = STATE_TARGET_READY
316 self.activateCB(self.target_jid, self.initiator_jid, self.sid, self.IQ_id, self.xmlstream) 300 self.factory.activateCb(self.sid, self.factory.iq_id)
317
318 301
319 except struct.error, why: 302 except struct.error, why:
320 return None 303 return None
321 304
322 def connectionMade(self): 305 def connectionMade(self):
323 debug("connectionMade (mode = %s)" % self.mode) 306 debug("connectionMade (mode = %s)" % "requester" if isinstance(self.factory, Socks5ServerFactory) else "target")
324 self.host.registerProgressCB(self.transfert_id, self.getProgress) 307
325 308 if isinstance(self.factory, Socks5ClientFactory):
326 if self.mode == "target": 309 self.sid = self.factory.sid
310 self.data = self.factory.data
327 self.state = STATE_TARGET_INITIAL 311 self.state = STATE_TARGET_INITIAL
328 self._startNegotiation() 312 self._startNegotiation()
329 313
330 def connectRequested(self, addr, port): 314 def connectRequested(self, addr, port):
331 debug("connectRequested") 315 debug("connectRequested")
332 # Check for special connect to the namespace -- this signifies that the client 316
333 # is just checking to ensure it can connect to the streamhost 317 # Check that this session if expected
334 if addr == "http://jabber.org/protocol/bytestreams": 318 if not self.factory.hash_sid_map.has_key(addr):
335 self.connectCompleted(addr, 0) 319 #no: we refuse it
336 self.transport.loseConnection()
337 return
338
339 # Save addr, for cleanup
340 self.addr = addr
341
342 # Check to see if the requested address is already
343 # activated -- send an error if so
344 if addr in self.activeConns:
345 self.sendErrorReply(socks5.REPLY_CONN_NOT_ALLOWED)
346 return
347
348 # Add this address to the pending connections
349 if self.addConnection(addr, self):
350 self.connectCompleted(addr, 0)
351 self.transport.stopReading()
352 else:
353 self.sendErrorReply(socks5.REPLY_CONN_REFUSED) 320 self.sendErrorReply(socks5.REPLY_CONN_REFUSED)
354 321 return
355 def getProgress(self, data): 322 self.sid = self.factory.hash_sid_map[addr]
356 """Fill data with position of current transfert""" 323 self.factory.current_stream[self.sid]["start_transfer_cb"] = self.startTransfer
357 try: 324 self.connectCompleted(addr, 0)
358 data["position"] = str(self.dest_file.tell()) 325 self.transport.stopReading()
359 data["size"] = self.filesize 326
360 except (ValueError, AttributeError): 327 def startTransfer(self, file_obj):
361 pass 328 """Callback called when the result iq is received"""
362 329 d = self.beginFileTransfer(file_obj, self.transport)
330 d.addCallback(self.fileTransfered)
331
363 def fileTransfered(self, d): 332 def fileTransfered(self, d):
364 info(_("File transfer completed, closing connection")) 333 info(_("File transfer completed, closing connection"))
365 self.transport.loseConnection() 334 self.transport.loseConnection()
366 try: 335 self.factory.finishedCb(self.sid, True)
367 self.dest_file.close()
368 except:
369 pass
370
371 def updateTransfered(self, data):
372 self.transfered+=len(data)
373 return data
374 336
375 def connectCompleted(self, remotehost, remoteport): 337 def connectCompleted(self, remotehost, remoteport):
376 debug("connectCompleted") 338 debug("connectCompleted")
377 if self.addressType == ADDR_IPV4: 339 if self.addressType == ADDR_IPV4:
378 result = struct.pack('!BBBBIH', SOCKS5_VER, REPLY_SUCCESS, 0, 1, remotehost, remoteport) 340 result = struct.pack('!BBBBIH', SOCKS5_VER, REPLY_SUCCESS, 0, 1, remotehost, remoteport)
379 elif self.addressType == ADDR_DOMAINNAME: 341 elif self.addressType == ADDR_DOMAINNAME:
380 result = struct.pack('!BBBBB%dsH' % len(remotehost), SOCKS5_VER, REPLY_SUCCESS, 0, 342 result = struct.pack('!BBBBB%dsH' % len(remotehost), SOCKS5_VER, REPLY_SUCCESS, 0,
381 ADDR_DOMAINNAME, len(remotehost), remotehost, remoteport) 343 ADDR_DOMAINNAME, len(remotehost), remotehost, remoteport)
382 self.transport.write(result) 344 self.transport.write(result)
383 self.state = STATE_READY 345 self.state = STATE_READY
384 self.dest_file=open(self.filepath)
385 d=self.beginFileTransfer(self.dest_file, self.transport, self.updateTransfered)
386 d.addCallback(self.fileTransfered)
387 346
388 def bindRequested(self, addr, port): 347 def bindRequested(self, addr, port):
389 pass 348 pass
390 349
391 def authenticateUserPass(self, user, passwd): 350 def authenticateUserPass(self, user, passwd):
392 debug("User/pass: %s/%s", user, passwd) 351 debug("User/pass: %s/%s", user, passwd)
393 return True 352 return True
394 353
395 def dataReceived(self, buf): 354 def dataReceived(self, buf):
396 if self.state == STATE_TARGET_READY: 355 if self.state == STATE_TARGET_READY:
397 self.dest_file.write(buf) 356 self.data["file_obj"].write(buf)
398 self.transfered+=len(buf)
399 return 357 return
400 358
401 self.buf = self.buf + buf 359 self.buf = self.buf + buf
402 if self.state == STATE_INITIAL: 360 if self.state == STATE_INITIAL:
403 self._parseNegotiation() 361 self._parseNegotiation()
420 debug("clientConnectionLost") 378 debug("clientConnectionLost")
421 self.transport.loseConnection() 379 self.transport.loseConnection()
422 380
423 def connectionLost(self, reason): 381 def connectionLost(self, reason):
424 debug("connectionLost") 382 debug("connectionLost")
425 self.host.removeProgressCB(self.transfert_id) 383 if self.state != STATE_CONNECT_PENDING:
426 if self.state == STATE_CONNECT_PENDING:
427 self.removePendingConnection(self.addr, self)
428 else:
429 self.transport.unregisterProducer() 384 self.transport.unregisterProducer()
430 if self.peersock != None: 385 if self.peersock != None:
431 self.peersock.peersock = None 386 self.peersock.peersock = None
432 self.peersock.transport.unregisterProducer() 387 self.peersock.transport.unregisterProducer()
433 self.peersock = None 388 self.peersock = None
434 self.removeActiveConnection(self.addr) 389
435 390
436 class Socks5ServerFactory(protocol.ServerFactory): 391 class Socks5ServerFactory(protocol.ServerFactory):
437 protocol = SOCKSv5 392 protocol = SOCKSv5
438 protocol.mode = "initiator" #FIXME: Q&D way, fix it 393
439 394 def __init__(self, current_stream, hash_sid_map, finishedCb):
395 self.current_stream = current_stream
396 self.hash_sid_map = hash_sid_map
397 self.finishedCb = finishedCb
440 398
441 def startedConnecting(self, connector): 399 def startedConnecting(self, connector):
442 debug (_("Socks 5 server connection started")) 400 debug (_("Socks 5 server connection started"))
443 401
444 def clientConnectionLost(self, connector, reason): 402 def clientConnectionLost(self, connector, reason):
445 debug (_("Socks 5 server connection lost (reason: %s)"), reason) 403 debug (_("Socks 5 server connection lost (reason: %s)"), reason)
446 404
447 class Socks5ClientFactory(protocol.ClientFactory): 405 class Socks5ClientFactory(protocol.ClientFactory):
448 protocol = SOCKSv5 406 protocol = SOCKSv5
449 protocol.mode = "target" #FIXME: Q&D way, fix it 407
408 def __init__(self, current_stream, sid, iq_id, activateCb, finishedCb):
409 self.data = current_stream[sid]
410 self.sid = sid
411 self.iq_id = iq_id
412 self.activateCb = activateCb
413 self.finishedCb = finishedCb
450 414
451 def startedConnecting(self, connector): 415 def startedConnecting(self, connector):
452 debug (_("Socks 5 client connection started")) 416 debug (_("Socks 5 client connection started"))
453 417
454 def clientConnectionLost(self, connector, reason): 418 def clientConnectionLost(self, connector, reason):
455 debug (_("Socks 5 client connection lost (reason: %s)"), reason) 419 debug (_("Socks 5 client connection lost"))
420 self.finishedCb(self.sid, reason.type == jab_error.ConnectionDone) #TODO: really check if the state is actually successful
456 421
457 422
458 class XEP_0065(): 423 class XEP_0065():
459 424
425 NAMESPACE = NS_BS
426
460 params = """ 427 params = """
461 <params> 428 <params>
462 <general> 429 <general>
463 <category name="File Transfert"> 430 <category name="File Transfer">
464 <param name="IP" value='0.0.0.0' default_cb='yes' type="string" /> 431 <param name="IP" value='0.0.0.0' default_cb='yes' type="string" />
465 <param name="Port" value="28915" type="string" /> 432 <param name="Port" value="28915" type="string" />
466 </category> 433 </category>
467 </general> 434 </general>
468 </params> 435 </params>
469 """ 436 """
470 437
471 def __init__(self, host): 438 def __init__(self, host):
472 info(_("Plugin XEP_0065 initialization")) 439 info(_("Plugin XEP_0065 initialization"))
440
441 #session data
442 self.current_stream = {} #key: stream_id, value: data(dict)
443 self.hash_sid_map = {} #key: hash of the transfer session, value: session id
444
473 self.host = host 445 self.host = host
474 debug(_("registering")) 446 debug(_("registering"))
475 self.server_factory = Socks5ServerFactory() 447 self.server_factory = Socks5ServerFactory(self.current_stream, self.hash_sid_map, self._killId)
476 self.server_factory.protocol.host = self.host #needed for progress CB
477 self.client_factory = Socks5ClientFactory()
478 448
479 #parameters 449 #parameters
480 host.memory.importParams(XEP_0065.params) 450 host.memory.importParams(XEP_0065.params)
481 host.memory.setDefault("IP", "File Transfert", self.getExternalIP) 451 host.memory.setDefault("IP", "File Transfer", self.getExternalIP)
482 452 port = int(self.host.memory.getParamA("Port", "File Transfer"))
483 port = int(self.host.memory.getParamA("Port", "File Transfert")) 453
484 info(_("Launching Socks5 Stream server on port %d"), port) 454 info(_("Launching Socks5 Stream server on port %d"), port)
485 reactor.listenTCP(port, self.server_factory) 455 reactor.listenTCP(port, self.server_factory)
486 456
487 def getHandler(self, profile): 457 def getHandler(self, profile):
488 return XEP_0065_handler(self) 458 return XEP_0065_handler(self)
489 459
490 def getExternalIP(self): 460 def getExternalIP(self):
491 """Return IP visible from outside, by asking to a website""" 461 """Return IP visible from outside, by asking to a website"""
492 return getPage("http://www.goffi.org/sat_tools/get_ip.php") 462 return getPage("http://www.goffi.org/sat_tools/get_ip.php")
493 463
464 def getProgress(self, sid, data):
465 """Fill data with position of current transfer"""
466 try:
467 file_obj = self.current_stream[sid]["file_obj"]
468 data["position"] = str(file_obj.tell())
469 data["size"] = str(self.current_stream[sid]["size"])
470 except:
471 pass
472
473 def _timeOut(self, sid):
474 """Delecte current_stream id, called after timeout
475 @param id: id of self.current_stream"""
476 info(_("Socks5 Bytestream: TimeOut reached for id %s") % sid);
477 self._killId(sid, False, "TIMEOUT")
478
479 def _killId(self, sid, success=False, failure_reason="UNKNOWN"):
480 """Delete an current_stream id, clean up associated observers
481 @param sid: id of self.current_stream"""
482 if not self.current_stream.has_key(sid):
483 warning(_("kill id called on a non existant id"))
484 return
485 if self.current_stream[sid].has_key("observer_cb"):
486 xmlstream = self.current_stream[sid]["xmlstream"]
487 xmlstream.removeObserver(self.current_stream[sid]["event_data"], self.current_stream[sid]["observer_cb"])
488 if self.current_stream[sid]['timer'].active():
489 self.current_stream[sid]['timer'].cancel()
490 if self.current_stream[sid].has_key("size"):
491 self.host.removeProgressCB(sid)
492
493 file_obj = self.current_stream[sid]['file_obj']
494 success_cb = self.current_stream[sid]['success_cb']
495 failure_cb = self.current_stream[sid]['failure_cb']
496
497 del self.current_stream[sid]
498 if self.hash_sid_map.has_key(sid):
499 del self.hash_sid_map[sid]
500
501 if success:
502 success_cb(sid, file_obj, NS_BS)
503 else:
504 failure_cb(sid, file_obj, NS_BS, failure_reason)
505
494 def setData(self, data, id): 506 def setData(self, data, id):
495 self.data = data 507 self.data = data
496 self.transfert_id = id 508 self.transfer_id = id
497 509
498 def sendFile(self, id, filepath, size): 510 def sendFile(self, id, filepath, size):
499 #lauching socks5 initiator 511 #lauching socks5 requester
500 debug(_("Launching socks5 initiator")) 512 debug(_("Launching socks5 requester"))
501 self.server_factory.protocol.mode = "initiator" 513 self.server_factory.protocol.mode = "requester"
502 self.server_factory.protocol.filepath = filepath 514 self.server_factory.protocol.filepath = filepath
503 self.server_factory.protocol.filesize = size 515 self.server_factory.protocol.filesize = size
504 self.server_factory.protocol.transfert_id = id 516 self.server_factory.protocol.transfer_id = id
505 517
506 def getFile(self, iq, profile_key='@DEFAULT@'): 518
519 def startStream(self, file_obj, to_jid, sid, length, successCb, failureCb, size = None, profile='@NONE@'):
520 """Launch the stream workflow
521 @param file_obj: file_obj to send
522 @param to_jid: JID of the recipient
523 @param sid: Stream session id
524 @param length: number of byte to send, or None to send until the end
525 @param successCb: method to call when stream successfuly finished
526 @param failureCb: method to call when something goes wrong
527 @param profile: %(doc_profile)s"""
528 if length != None:
529 error(_('stream length not managed yet'))
530 return;
531 profile_jid, xmlstream = self.host.getJidNStream(profile)
532 data = self.current_stream[sid] = {}
533 data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid)
534 data["file_obj"] = file_obj
535 data["to"] = to_jid
536 data["success_cb"] = successCb
537 data["failure_cb"] = failureCb
538 data["xmlstream"] = xmlstream
539 data["hash"] = calculateHash(profile_jid, to_jid, sid)
540 self.hash_sid_map[data["hash"]] = sid
541 if size:
542 data["size"] = size
543 self.host.registerProgressCB(sid, self.getProgress)
544 iq_elt = client.IQ(xmlstream,'set')
545 iq_elt["from"] = profile_jid.full()
546 iq_elt["to"] = to_jid.full()
547 query_elt = iq_elt.addElement('query', NS_BS)
548 query_elt['mode'] = 'tcp'
549 query_elt['sid'] = sid
550 streamhost = query_elt.addElement('streamhost')
551 streamhost['host'] = "127.0.0.1" #self.host.memory.getParamA("IP", "File Transfer")
552 streamhost['port'] = self.host.memory.getParamA("Port", "File Transfer")
553 streamhost['jid'] = profile_jid.full()
554 iq_elt.addCallback(self.iqResult, sid)
555 iq_elt.send()
556
557 def iqResult(self, sid, iq_elt):
558 """Called when the result of open iq is received"""
559 if iq_elt["type"] == "error":
560 warning(_("Transfer failed"))
561 return
562
563 try:
564 data = self.current_stream[sid]
565 callback = data["start_transfer_cb"]
566 file_obj = data["file_obj"]
567 timer = data["timer"]
568 except KeyError:
569 error(_("Internal error, can't do transfer"))
570 return
571
572 if timer.active():
573 timer.cancel()
574
575 callback(file_obj)
576
577
578 def prepareToReceive(self, from_jid, sid, file_obj, size, success_cb, failure_cb):
579 """Called when a bytestream is imminent
580 @param from_jid: jid of the sender
581 @param sid: Stream id
582 @param file_obj: File object where data will be written
583 @param size: full size of the data, or None if unknown
584 @param success_cb: method to call when successfuly finished
585 @param failure_cb: method to call when something goes wrong"""
586 data = self.current_stream[sid] = {}
587 data["from"] = from_jid
588 data["file_obj"] = file_obj
589 data["seq"] = -1
590 if size:
591 data["size"] = size
592 self.host.registerProgressCB(sid, self.getProgress)
593 data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid)
594 data["success_cb"] = success_cb
595 data["failure_cb"] = failure_cb
596
597
598 def streamQuery(self, iq_elt, profile):
507 """Get file using byte stream""" 599 """Get file using byte stream"""
508 client = self.host.getClient(profile_key) 600 debug(_("BS stream query"))
509 assert(client) 601 profile_jid, xmlstream = self.host.getJidNStream(profile)
510 iq.handled = True 602 iq_elt.handled = True
511 SI_elem = iq.firstChildElement() 603 query_elt = iq_elt.firstChildElement()
512 IQ_id = iq['id'] 604 sid = query_elt.getAttribute("sid")
513 for element in SI_elem.elements(): 605 streamhost_elts = filter(lambda elt: elt.name == 'streamhost', query_elt.elements())
514 if element.name == "streamhost": 606
515 info (_("Stream proposed: host=[%(host)s] port=[%(port)s]") % {'host':element['host'], 'port':element['port']}) 607 if not sid in self.current_stream:
516 factory = self.client_factory 608 warning(_("Ignoring unexpected BS transfer: %s" % sid))
517 self.server_factory.protocol.mode = "target" 609 self.sendNotAcceptableError(iq_elt['id'], iq_elt['from'], xmlstream)
518 factory.protocol.host = self.host #needed for progress CB 610 return
519 factory.protocol.xmlstream = client.xmlstream 611
520 factory.protocol.data = self.data 612 self.current_stream[sid]["to"] = jid.JID(iq_elt["to"])
521 factory.protocol.transfert_id = self.transfert_id 613 self.current_stream[sid]["xmlstream"] = xmlstream
522 factory.protocol.filesize = self.data["size"] 614
523 factory.protocol.sid = SI_elem['sid'] 615 if not streamhost_elts:
524 factory.protocol.initiator_jid = element['jid'] 616 warning(_("No streamhost found in stream query %s" % sid))
525 factory.protocol.target_jid = client.jid.full() 617 self.sendBadRequestError(iq_elt['id'], iq_elt['from'], xmlstream)
526 factory.protocol.IQ_id = IQ_id 618 return
527 factory.protocol.activateCB = self.activateStream 619
528 reactor.connectTCP(element['host'], int(element['port']), factory) 620 streamhost_elt = streamhost_elts[0] #TODO: manage several streamhost elements case
621 sh_host = streamhost_elt.getAttribute("host")
622 sh_port = streamhost_elt.getAttribute("port")
623 sh_jid = streamhost_elt.getAttribute("jid")
624 if not sh_host or not sh_port or not sh_jid:
625 warning(_("incomplete streamhost element"))
626 self.sendBadRequestError(iq_elt['id'], iq_elt['from'], xmlstream)
627 return
628
629 self.current_stream[sid]["streamhost"] = (sh_host, sh_port, sh_jid)
630
631 info (_("Stream proposed: host=[%(host)s] port=[%(port)s]") % {'host':sh_host, 'port':sh_port})
632 factory = Socks5ClientFactory(self.current_stream, sid, iq_elt["id"], self.activateStream, self._killId)
633 reactor.connectTCP(sh_host, int(sh_port), factory)
529 634
530 def activateStream(self, from_jid, to_jid, sid, IQ_id, xmlstream): 635 def activateStream(self, sid, iq_id):
531 debug(_("activating stream")) 636 debug(_("activating stream"))
532 result = domish.Element(('', 'iq')) 637 result = domish.Element(('', 'iq'))
638 data = self.current_stream[sid]
533 result['type'] = 'result' 639 result['type'] = 'result'
534 result['id'] = IQ_id 640 result['id'] = iq_id
535 result['from'] = from_jid 641 result['from'] = data["to"].full()
536 result['to'] = to_jid 642 result['to'] = data["from"].full()
537 query = result.addElement('query', 'http://jabber.org/protocol/bytestreams') 643 query = result.addElement('query', NS_BS)
538 query['sid'] = sid 644 query['sid'] = sid
539 streamhost = query.addElement('streamhost-used') 645 streamhost = query.addElement('streamhost-used')
540 streamhost['jid'] = to_jid #FIXME: use real streamhost 646 streamhost['jid'] = data["streamhost"][2]
647 data["xmlstream"].send(result)
648
649 def sendNotAcceptableError(self, iq_id, to_jid, xmlstream):
650 """Not acceptable error used when the stream is not expected or something is going wrong
651 @param iq_id: IQ id
652 @param to_jid: addressee
653 @param xmlstream: XML stream to use to send the error"""
654 result = domish.Element(('', 'iq'))
655 result['type'] = 'result'
656 result['id'] = iq_id
657 result['to'] = to_jid
658 error_el = result.addElement('error')
659 error_el['type'] = 'modify'
660 error_el.addElement(('urn:ietf:params:xml:ns:xmpp-stanzas','not-acceptable'))
661 xmlstream.send(result)
662
663 def sendBadRequestError(self, iq_id, to_jid, xmlstream):
664 """Not acceptable error used when the stream is not expected or something is going wrong
665 @param iq_id: IQ id
666 @param to_jid: addressee
667 @param xmlstream: XML stream to use to send the error"""
668 result = domish.Element(('', 'iq'))
669 result['type'] = 'result'
670 result['id'] = iq_id
671 result['to'] = to_jid
672 error_el = result.addElement('error')
673 error_el['type'] = 'cancel'
674 error_el.addElement(('urn:ietf:params:xml:ns:xmpp-stanzas','bad-request'))
541 xmlstream.send(result) 675 xmlstream.send(result)
542 676
543 class XEP_0065_handler(XMPPHandler): 677 class XEP_0065_handler(XMPPHandler):
544 implements(iwokkel.IDisco) 678 implements(iwokkel.IDisco)
545 679
546 def __init__(self, plugin_parent): 680 def __init__(self, plugin_parent):
547 self.plugin_parent = plugin_parent 681 self.plugin_parent = plugin_parent
548 self.host = plugin_parent.host 682 self.host = plugin_parent.host
549 683
550 def connectionInitialized(self): 684 def connectionInitialized(self):
551 self.xmlstream.addObserver(BS_REQUEST, self.plugin_parent.getFile) 685 self.xmlstream.addObserver(BS_REQUEST, self.plugin_parent.streamQuery, profile = self.parent.profile)
552 686
553 687
554 def getDiscoInfo(self, requestor, target, nodeIdentifier=''): 688 def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
555 return [disco.DiscoFeature(NS_BS)] 689 return [disco.DiscoFeature(NS_BS)]
556 690