Mercurial > libervia-backend
comparison src/plugins/plugin_xep_0065.py @ 394:8f3551ceee17
plugin XEP-0065: refactored and misc stuff fixed. Still not finished
plugins XEP-0096: XEP-0065 (Socks5 stream method) managed
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 03 Oct 2011 18:05:15 +0200 |
parents | 7c79d4a8c9e6 |
children | cb0285372818 |
comparison
equal
deleted
inserted
replaced
393:393b35aa86d2 | 394:8f3551ceee17 |
---|---|
53 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | 53 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
54 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | 54 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
55 THE SOFTWARE. | 55 THE SOFTWARE. |
56 """ | 56 """ |
57 | 57 |
58 from logging import debug, info, error | 58 from logging import debug, info, warning, error |
59 from twisted.internet import protocol, reactor | 59 from twisted.internet import protocol, reactor |
60 from twisted.internet import error as jab_error | |
61 from twisted.words.protocols.jabber import client, jid | |
60 from twisted.protocols.basic import FileSender | 62 from twisted.protocols.basic import FileSender |
61 from twisted.words.xish import domish | 63 from twisted.words.xish import domish |
62 from twisted.web.client import getPage | 64 from twisted.web.client import getPage |
63 import struct | 65 import struct |
64 import urllib | 66 import urllib |
74 from wokkel import disco, iwokkel | 76 from wokkel import disco, iwokkel |
75 | 77 |
76 IQ_SET = '/iq[@type="set"]' | 78 IQ_SET = '/iq[@type="set"]' |
77 NS_BS = 'http://jabber.org/protocol/bytestreams' | 79 NS_BS = 'http://jabber.org/protocol/bytestreams' |
78 BS_REQUEST = IQ_SET + '/query[@xmlns="' + NS_BS + '"]' | 80 BS_REQUEST = IQ_SET + '/query[@xmlns="' + NS_BS + '"]' |
81 TIMEOUT = 60 #timeout for workflow | |
79 | 82 |
80 | 83 |
81 | 84 |
82 PLUGIN_INFO = { | 85 PLUGIN_INFO = { |
83 "name": "XEP 0065 Plugin", | 86 "name": "XEP 0065 Plugin", |
125 REPLY_TTL_EXPIRED = 0x06 | 128 REPLY_TTL_EXPIRED = 0x06 |
126 REPLY_CMD_NOT_SUPPORTED = 0x07 | 129 REPLY_CMD_NOT_SUPPORTED = 0x07 |
127 REPLY_ADDR_NOT_SUPPORTED = 0x08 | 130 REPLY_ADDR_NOT_SUPPORTED = 0x08 |
128 | 131 |
129 | 132 |
133 def calculateHash(from_jid, to_jid, sid): | |
134 """Calculate SHA1 Hash according to XEP-0065 | |
135 @param from_jid: jid of the requester | |
136 @param to_jid: jid of the target | |
137 @param sid: session id | |
138 @return: hash (string)""" | |
139 return hashlib.sha1((sid + from_jid.full() + to_jid.full()).encode('utf-8')).hexdigest() | |
130 | 140 |
131 | 141 |
132 | 142 |
133 class SOCKSv5(protocol.Protocol, FileSender): | 143 class SOCKSv5(protocol.Protocol, FileSender): |
134 def __init__(self): | 144 def __init__(self): |
139 self.supportedAddrs = [ ADDR_DOMAINNAME ] | 149 self.supportedAddrs = [ ADDR_DOMAINNAME ] |
140 self.enabledCommands = [ CMD_CONNECT ] | 150 self.enabledCommands = [ CMD_CONNECT ] |
141 self.peersock = None | 151 self.peersock = None |
142 self.addressType = 0 | 152 self.addressType = 0 |
143 self.requestType = 0 | 153 self.requestType = 0 |
144 self.activeConns = {} | |
145 self.pendingConns = {} | |
146 self.transfered = 0 #nb of bytes already copied | |
147 | 154 |
148 def _startNegotiation(self): | 155 def _startNegotiation(self): |
149 debug("_startNegotiation") | 156 debug("_startNegotiation") |
150 self.state = STATE_TARGET_AUTH | 157 self.state = STATE_TARGET_AUTH |
151 self.transport.write(struct.pack('!3B', SOCKS5_VER, 1, AUTHMECH_ANON)) | 158 self.transport.write(struct.pack('!3B', SOCKS5_VER, 1, AUTHMECH_ANON)) |
211 # Any other address types are not supported | 218 # Any other address types are not supported |
212 result = struct.pack('!BBBBIH', SOCKS5_VER, errorcode, 0, 1, 0, 0) | 219 result = struct.pack('!BBBBIH', SOCKS5_VER, errorcode, 0, 1, 0, 0) |
213 self.transport.write(result) | 220 self.transport.write(result) |
214 self.transport.loseConnection() | 221 self.transport.loseConnection() |
215 | 222 |
216 def addConnection(self, address, connection): | |
217 info(_("Adding connection: %(address)s, %(connection)s") % {'address':address, 'connection':connection}) | |
218 olist = self.pendingConns.get(address, []) | |
219 if len(olist) <= 1: | |
220 olist.append(connection) | |
221 self.pendingConns[address] = olist | |
222 return True | |
223 else: | |
224 return False | |
225 | |
226 def removePendingConnection(self, address, connection): | |
227 olist = self.pendingConns[address] | |
228 if len(olist) == 1: | |
229 del self.pendingConns[address] | |
230 else: | |
231 olist.remove(connection) | |
232 self.pendingConns[address] = olist | |
233 | |
234 def removeActiveConnection(self, address): | |
235 del self.activeConns[address] | |
236 | |
237 def _parseRequest(self): | 223 def _parseRequest(self): |
238 debug("_parseRequest") | 224 debug("_parseRequest") |
239 try: | 225 try: |
240 # Parse out data and trim buffer accordingly | 226 # Parse out data and trim buffer accordingly |
241 ver, cmd, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4]) | 227 ver, cmd, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4]) |
277 return None | 263 return None |
278 | 264 |
279 def _makeRequest(self): | 265 def _makeRequest(self): |
280 debug("_makeRequest") | 266 debug("_makeRequest") |
281 self.state = STATE_TARGET_REQUEST | 267 self.state = STATE_TARGET_REQUEST |
282 sha1 = hashlib.sha1(self.sid + self.initiator_jid + self.target_jid).hexdigest() | 268 sha1 = calculateHash(self.data["from"], self.data["to"], self.sid) |
283 request = struct.pack('!5B%dsH' % len(sha1), SOCKS5_VER, CMD_CONNECT, 0, ADDR_DOMAINNAME, len(sha1), sha1, 0) | 269 request = struct.pack('!5B%dsH' % len(sha1), SOCKS5_VER, CMD_CONNECT, 0, ADDR_DOMAINNAME, len(sha1), sha1, 0) |
284 self.transport.write(request) | 270 self.transport.write(request) |
285 | 271 |
286 def _parseRequestReply(self): | 272 def _parseRequestReply(self): |
287 debug("_parseRequestReply") | 273 debug("_parseRequestReply") |
308 # Ensure reply is OK | 294 # Ensure reply is OK |
309 if rep != REPLY_SUCCESS: | 295 if rep != REPLY_SUCCESS: |
310 self.loseConnection() | 296 self.loseConnection() |
311 return | 297 return |
312 | 298 |
313 debug(_("Saving file in %s."), self.data["dest_path"]) | |
314 self.dest_file = open(self.data["dest_path"], 'w') | |
315 self.state = STATE_TARGET_READY | 299 self.state = STATE_TARGET_READY |
316 self.activateCB(self.target_jid, self.initiator_jid, self.sid, self.IQ_id, self.xmlstream) | 300 self.factory.activateCb(self.sid, self.factory.iq_id) |
317 | |
318 | 301 |
319 except struct.error, why: | 302 except struct.error, why: |
320 return None | 303 return None |
321 | 304 |
322 def connectionMade(self): | 305 def connectionMade(self): |
323 debug("connectionMade (mode = %s)" % self.mode) | 306 debug("connectionMade (mode = %s)" % "requester" if isinstance(self.factory, Socks5ServerFactory) else "target") |
324 self.host.registerProgressCB(self.transfert_id, self.getProgress) | 307 |
325 | 308 if isinstance(self.factory, Socks5ClientFactory): |
326 if self.mode == "target": | 309 self.sid = self.factory.sid |
310 self.data = self.factory.data | |
327 self.state = STATE_TARGET_INITIAL | 311 self.state = STATE_TARGET_INITIAL |
328 self._startNegotiation() | 312 self._startNegotiation() |
329 | 313 |
330 def connectRequested(self, addr, port): | 314 def connectRequested(self, addr, port): |
331 debug("connectRequested") | 315 debug("connectRequested") |
332 # Check for special connect to the namespace -- this signifies that the client | 316 |
333 # is just checking to ensure it can connect to the streamhost | 317 # Check that this session if expected |
334 if addr == "http://jabber.org/protocol/bytestreams": | 318 if not self.factory.hash_sid_map.has_key(addr): |
335 self.connectCompleted(addr, 0) | 319 #no: we refuse it |
336 self.transport.loseConnection() | |
337 return | |
338 | |
339 # Save addr, for cleanup | |
340 self.addr = addr | |
341 | |
342 # Check to see if the requested address is already | |
343 # activated -- send an error if so | |
344 if addr in self.activeConns: | |
345 self.sendErrorReply(socks5.REPLY_CONN_NOT_ALLOWED) | |
346 return | |
347 | |
348 # Add this address to the pending connections | |
349 if self.addConnection(addr, self): | |
350 self.connectCompleted(addr, 0) | |
351 self.transport.stopReading() | |
352 else: | |
353 self.sendErrorReply(socks5.REPLY_CONN_REFUSED) | 320 self.sendErrorReply(socks5.REPLY_CONN_REFUSED) |
354 | 321 return |
355 def getProgress(self, data): | 322 self.sid = self.factory.hash_sid_map[addr] |
356 """Fill data with position of current transfert""" | 323 self.factory.current_stream[self.sid]["start_transfer_cb"] = self.startTransfer |
357 try: | 324 self.connectCompleted(addr, 0) |
358 data["position"] = str(self.dest_file.tell()) | 325 self.transport.stopReading() |
359 data["size"] = self.filesize | 326 |
360 except (ValueError, AttributeError): | 327 def startTransfer(self, file_obj): |
361 pass | 328 """Callback called when the result iq is received""" |
362 | 329 d = self.beginFileTransfer(file_obj, self.transport) |
330 d.addCallback(self.fileTransfered) | |
331 | |
363 def fileTransfered(self, d): | 332 def fileTransfered(self, d): |
364 info(_("File transfer completed, closing connection")) | 333 info(_("File transfer completed, closing connection")) |
365 self.transport.loseConnection() | 334 self.transport.loseConnection() |
366 try: | 335 self.factory.finishedCb(self.sid, True) |
367 self.dest_file.close() | |
368 except: | |
369 pass | |
370 | |
371 def updateTransfered(self, data): | |
372 self.transfered+=len(data) | |
373 return data | |
374 | 336 |
375 def connectCompleted(self, remotehost, remoteport): | 337 def connectCompleted(self, remotehost, remoteport): |
376 debug("connectCompleted") | 338 debug("connectCompleted") |
377 if self.addressType == ADDR_IPV4: | 339 if self.addressType == ADDR_IPV4: |
378 result = struct.pack('!BBBBIH', SOCKS5_VER, REPLY_SUCCESS, 0, 1, remotehost, remoteport) | 340 result = struct.pack('!BBBBIH', SOCKS5_VER, REPLY_SUCCESS, 0, 1, remotehost, remoteport) |
379 elif self.addressType == ADDR_DOMAINNAME: | 341 elif self.addressType == ADDR_DOMAINNAME: |
380 result = struct.pack('!BBBBB%dsH' % len(remotehost), SOCKS5_VER, REPLY_SUCCESS, 0, | 342 result = struct.pack('!BBBBB%dsH' % len(remotehost), SOCKS5_VER, REPLY_SUCCESS, 0, |
381 ADDR_DOMAINNAME, len(remotehost), remotehost, remoteport) | 343 ADDR_DOMAINNAME, len(remotehost), remotehost, remoteport) |
382 self.transport.write(result) | 344 self.transport.write(result) |
383 self.state = STATE_READY | 345 self.state = STATE_READY |
384 self.dest_file=open(self.filepath) | |
385 d=self.beginFileTransfer(self.dest_file, self.transport, self.updateTransfered) | |
386 d.addCallback(self.fileTransfered) | |
387 | 346 |
388 def bindRequested(self, addr, port): | 347 def bindRequested(self, addr, port): |
389 pass | 348 pass |
390 | 349 |
391 def authenticateUserPass(self, user, passwd): | 350 def authenticateUserPass(self, user, passwd): |
392 debug("User/pass: %s/%s", user, passwd) | 351 debug("User/pass: %s/%s", user, passwd) |
393 return True | 352 return True |
394 | 353 |
395 def dataReceived(self, buf): | 354 def dataReceived(self, buf): |
396 if self.state == STATE_TARGET_READY: | 355 if self.state == STATE_TARGET_READY: |
397 self.dest_file.write(buf) | 356 self.data["file_obj"].write(buf) |
398 self.transfered+=len(buf) | |
399 return | 357 return |
400 | 358 |
401 self.buf = self.buf + buf | 359 self.buf = self.buf + buf |
402 if self.state == STATE_INITIAL: | 360 if self.state == STATE_INITIAL: |
403 self._parseNegotiation() | 361 self._parseNegotiation() |
420 debug("clientConnectionLost") | 378 debug("clientConnectionLost") |
421 self.transport.loseConnection() | 379 self.transport.loseConnection() |
422 | 380 |
423 def connectionLost(self, reason): | 381 def connectionLost(self, reason): |
424 debug("connectionLost") | 382 debug("connectionLost") |
425 self.host.removeProgressCB(self.transfert_id) | 383 if self.state != STATE_CONNECT_PENDING: |
426 if self.state == STATE_CONNECT_PENDING: | |
427 self.removePendingConnection(self.addr, self) | |
428 else: | |
429 self.transport.unregisterProducer() | 384 self.transport.unregisterProducer() |
430 if self.peersock != None: | 385 if self.peersock != None: |
431 self.peersock.peersock = None | 386 self.peersock.peersock = None |
432 self.peersock.transport.unregisterProducer() | 387 self.peersock.transport.unregisterProducer() |
433 self.peersock = None | 388 self.peersock = None |
434 self.removeActiveConnection(self.addr) | 389 |
435 | 390 |
436 class Socks5ServerFactory(protocol.ServerFactory): | 391 class Socks5ServerFactory(protocol.ServerFactory): |
437 protocol = SOCKSv5 | 392 protocol = SOCKSv5 |
438 protocol.mode = "initiator" #FIXME: Q&D way, fix it | 393 |
439 | 394 def __init__(self, current_stream, hash_sid_map, finishedCb): |
395 self.current_stream = current_stream | |
396 self.hash_sid_map = hash_sid_map | |
397 self.finishedCb = finishedCb | |
440 | 398 |
441 def startedConnecting(self, connector): | 399 def startedConnecting(self, connector): |
442 debug (_("Socks 5 server connection started")) | 400 debug (_("Socks 5 server connection started")) |
443 | 401 |
444 def clientConnectionLost(self, connector, reason): | 402 def clientConnectionLost(self, connector, reason): |
445 debug (_("Socks 5 server connection lost (reason: %s)"), reason) | 403 debug (_("Socks 5 server connection lost (reason: %s)"), reason) |
446 | 404 |
447 class Socks5ClientFactory(protocol.ClientFactory): | 405 class Socks5ClientFactory(protocol.ClientFactory): |
448 protocol = SOCKSv5 | 406 protocol = SOCKSv5 |
449 protocol.mode = "target" #FIXME: Q&D way, fix it | 407 |
408 def __init__(self, current_stream, sid, iq_id, activateCb, finishedCb): | |
409 self.data = current_stream[sid] | |
410 self.sid = sid | |
411 self.iq_id = iq_id | |
412 self.activateCb = activateCb | |
413 self.finishedCb = finishedCb | |
450 | 414 |
451 def startedConnecting(self, connector): | 415 def startedConnecting(self, connector): |
452 debug (_("Socks 5 client connection started")) | 416 debug (_("Socks 5 client connection started")) |
453 | 417 |
454 def clientConnectionLost(self, connector, reason): | 418 def clientConnectionLost(self, connector, reason): |
455 debug (_("Socks 5 client connection lost (reason: %s)"), reason) | 419 debug (_("Socks 5 client connection lost")) |
420 self.finishedCb(self.sid, reason.type == jab_error.ConnectionDone) #TODO: really check if the state is actually successful | |
456 | 421 |
457 | 422 |
458 class XEP_0065(): | 423 class XEP_0065(): |
459 | 424 |
425 NAMESPACE = NS_BS | |
426 | |
460 params = """ | 427 params = """ |
461 <params> | 428 <params> |
462 <general> | 429 <general> |
463 <category name="File Transfert"> | 430 <category name="File Transfer"> |
464 <param name="IP" value='0.0.0.0' default_cb='yes' type="string" /> | 431 <param name="IP" value='0.0.0.0' default_cb='yes' type="string" /> |
465 <param name="Port" value="28915" type="string" /> | 432 <param name="Port" value="28915" type="string" /> |
466 </category> | 433 </category> |
467 </general> | 434 </general> |
468 </params> | 435 </params> |
469 """ | 436 """ |
470 | 437 |
471 def __init__(self, host): | 438 def __init__(self, host): |
472 info(_("Plugin XEP_0065 initialization")) | 439 info(_("Plugin XEP_0065 initialization")) |
440 | |
441 #session data | |
442 self.current_stream = {} #key: stream_id, value: data(dict) | |
443 self.hash_sid_map = {} #key: hash of the transfer session, value: session id | |
444 | |
473 self.host = host | 445 self.host = host |
474 debug(_("registering")) | 446 debug(_("registering")) |
475 self.server_factory = Socks5ServerFactory() | 447 self.server_factory = Socks5ServerFactory(self.current_stream, self.hash_sid_map, self._killId) |
476 self.server_factory.protocol.host = self.host #needed for progress CB | |
477 self.client_factory = Socks5ClientFactory() | |
478 | 448 |
479 #parameters | 449 #parameters |
480 host.memory.importParams(XEP_0065.params) | 450 host.memory.importParams(XEP_0065.params) |
481 host.memory.setDefault("IP", "File Transfert", self.getExternalIP) | 451 host.memory.setDefault("IP", "File Transfer", self.getExternalIP) |
482 | 452 port = int(self.host.memory.getParamA("Port", "File Transfer")) |
483 port = int(self.host.memory.getParamA("Port", "File Transfert")) | 453 |
484 info(_("Launching Socks5 Stream server on port %d"), port) | 454 info(_("Launching Socks5 Stream server on port %d"), port) |
485 reactor.listenTCP(port, self.server_factory) | 455 reactor.listenTCP(port, self.server_factory) |
486 | 456 |
487 def getHandler(self, profile): | 457 def getHandler(self, profile): |
488 return XEP_0065_handler(self) | 458 return XEP_0065_handler(self) |
489 | 459 |
490 def getExternalIP(self): | 460 def getExternalIP(self): |
491 """Return IP visible from outside, by asking to a website""" | 461 """Return IP visible from outside, by asking to a website""" |
492 return getPage("http://www.goffi.org/sat_tools/get_ip.php") | 462 return getPage("http://www.goffi.org/sat_tools/get_ip.php") |
493 | 463 |
464 def getProgress(self, sid, data): | |
465 """Fill data with position of current transfer""" | |
466 try: | |
467 file_obj = self.current_stream[sid]["file_obj"] | |
468 data["position"] = str(file_obj.tell()) | |
469 data["size"] = str(self.current_stream[sid]["size"]) | |
470 except: | |
471 pass | |
472 | |
473 def _timeOut(self, sid): | |
474 """Delecte current_stream id, called after timeout | |
475 @param id: id of self.current_stream""" | |
476 info(_("Socks5 Bytestream: TimeOut reached for id %s") % sid); | |
477 self._killId(sid, False, "TIMEOUT") | |
478 | |
479 def _killId(self, sid, success=False, failure_reason="UNKNOWN"): | |
480 """Delete an current_stream id, clean up associated observers | |
481 @param sid: id of self.current_stream""" | |
482 if not self.current_stream.has_key(sid): | |
483 warning(_("kill id called on a non existant id")) | |
484 return | |
485 if self.current_stream[sid].has_key("observer_cb"): | |
486 xmlstream = self.current_stream[sid]["xmlstream"] | |
487 xmlstream.removeObserver(self.current_stream[sid]["event_data"], self.current_stream[sid]["observer_cb"]) | |
488 if self.current_stream[sid]['timer'].active(): | |
489 self.current_stream[sid]['timer'].cancel() | |
490 if self.current_stream[sid].has_key("size"): | |
491 self.host.removeProgressCB(sid) | |
492 | |
493 file_obj = self.current_stream[sid]['file_obj'] | |
494 success_cb = self.current_stream[sid]['success_cb'] | |
495 failure_cb = self.current_stream[sid]['failure_cb'] | |
496 | |
497 del self.current_stream[sid] | |
498 if self.hash_sid_map.has_key(sid): | |
499 del self.hash_sid_map[sid] | |
500 | |
501 if success: | |
502 success_cb(sid, file_obj, NS_BS) | |
503 else: | |
504 failure_cb(sid, file_obj, NS_BS, failure_reason) | |
505 | |
494 def setData(self, data, id): | 506 def setData(self, data, id): |
495 self.data = data | 507 self.data = data |
496 self.transfert_id = id | 508 self.transfer_id = id |
497 | 509 |
498 def sendFile(self, id, filepath, size): | 510 def sendFile(self, id, filepath, size): |
499 #lauching socks5 initiator | 511 #lauching socks5 requester |
500 debug(_("Launching socks5 initiator")) | 512 debug(_("Launching socks5 requester")) |
501 self.server_factory.protocol.mode = "initiator" | 513 self.server_factory.protocol.mode = "requester" |
502 self.server_factory.protocol.filepath = filepath | 514 self.server_factory.protocol.filepath = filepath |
503 self.server_factory.protocol.filesize = size | 515 self.server_factory.protocol.filesize = size |
504 self.server_factory.protocol.transfert_id = id | 516 self.server_factory.protocol.transfer_id = id |
505 | 517 |
506 def getFile(self, iq, profile_key='@DEFAULT@'): | 518 |
519 def startStream(self, file_obj, to_jid, sid, length, successCb, failureCb, size = None, profile='@NONE@'): | |
520 """Launch the stream workflow | |
521 @param file_obj: file_obj to send | |
522 @param to_jid: JID of the recipient | |
523 @param sid: Stream session id | |
524 @param length: number of byte to send, or None to send until the end | |
525 @param successCb: method to call when stream successfuly finished | |
526 @param failureCb: method to call when something goes wrong | |
527 @param profile: %(doc_profile)s""" | |
528 if length != None: | |
529 error(_('stream length not managed yet')) | |
530 return; | |
531 profile_jid, xmlstream = self.host.getJidNStream(profile) | |
532 data = self.current_stream[sid] = {} | |
533 data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid) | |
534 data["file_obj"] = file_obj | |
535 data["to"] = to_jid | |
536 data["success_cb"] = successCb | |
537 data["failure_cb"] = failureCb | |
538 data["xmlstream"] = xmlstream | |
539 data["hash"] = calculateHash(profile_jid, to_jid, sid) | |
540 self.hash_sid_map[data["hash"]] = sid | |
541 if size: | |
542 data["size"] = size | |
543 self.host.registerProgressCB(sid, self.getProgress) | |
544 iq_elt = client.IQ(xmlstream,'set') | |
545 iq_elt["from"] = profile_jid.full() | |
546 iq_elt["to"] = to_jid.full() | |
547 query_elt = iq_elt.addElement('query', NS_BS) | |
548 query_elt['mode'] = 'tcp' | |
549 query_elt['sid'] = sid | |
550 streamhost = query_elt.addElement('streamhost') | |
551 streamhost['host'] = "127.0.0.1" #self.host.memory.getParamA("IP", "File Transfer") | |
552 streamhost['port'] = self.host.memory.getParamA("Port", "File Transfer") | |
553 streamhost['jid'] = profile_jid.full() | |
554 iq_elt.addCallback(self.iqResult, sid) | |
555 iq_elt.send() | |
556 | |
557 def iqResult(self, sid, iq_elt): | |
558 """Called when the result of open iq is received""" | |
559 if iq_elt["type"] == "error": | |
560 warning(_("Transfer failed")) | |
561 return | |
562 | |
563 try: | |
564 data = self.current_stream[sid] | |
565 callback = data["start_transfer_cb"] | |
566 file_obj = data["file_obj"] | |
567 timer = data["timer"] | |
568 except KeyError: | |
569 error(_("Internal error, can't do transfer")) | |
570 return | |
571 | |
572 if timer.active(): | |
573 timer.cancel() | |
574 | |
575 callback(file_obj) | |
576 | |
577 | |
578 def prepareToReceive(self, from_jid, sid, file_obj, size, success_cb, failure_cb): | |
579 """Called when a bytestream is imminent | |
580 @param from_jid: jid of the sender | |
581 @param sid: Stream id | |
582 @param file_obj: File object where data will be written | |
583 @param size: full size of the data, or None if unknown | |
584 @param success_cb: method to call when successfuly finished | |
585 @param failure_cb: method to call when something goes wrong""" | |
586 data = self.current_stream[sid] = {} | |
587 data["from"] = from_jid | |
588 data["file_obj"] = file_obj | |
589 data["seq"] = -1 | |
590 if size: | |
591 data["size"] = size | |
592 self.host.registerProgressCB(sid, self.getProgress) | |
593 data["timer"] = reactor.callLater(TIMEOUT, self._timeOut, sid) | |
594 data["success_cb"] = success_cb | |
595 data["failure_cb"] = failure_cb | |
596 | |
597 | |
598 def streamQuery(self, iq_elt, profile): | |
507 """Get file using byte stream""" | 599 """Get file using byte stream""" |
508 client = self.host.getClient(profile_key) | 600 debug(_("BS stream query")) |
509 assert(client) | 601 profile_jid, xmlstream = self.host.getJidNStream(profile) |
510 iq.handled = True | 602 iq_elt.handled = True |
511 SI_elem = iq.firstChildElement() | 603 query_elt = iq_elt.firstChildElement() |
512 IQ_id = iq['id'] | 604 sid = query_elt.getAttribute("sid") |
513 for element in SI_elem.elements(): | 605 streamhost_elts = filter(lambda elt: elt.name == 'streamhost', query_elt.elements()) |
514 if element.name == "streamhost": | 606 |
515 info (_("Stream proposed: host=[%(host)s] port=[%(port)s]") % {'host':element['host'], 'port':element['port']}) | 607 if not sid in self.current_stream: |
516 factory = self.client_factory | 608 warning(_("Ignoring unexpected BS transfer: %s" % sid)) |
517 self.server_factory.protocol.mode = "target" | 609 self.sendNotAcceptableError(iq_elt['id'], iq_elt['from'], xmlstream) |
518 factory.protocol.host = self.host #needed for progress CB | 610 return |
519 factory.protocol.xmlstream = client.xmlstream | 611 |
520 factory.protocol.data = self.data | 612 self.current_stream[sid]["to"] = jid.JID(iq_elt["to"]) |
521 factory.protocol.transfert_id = self.transfert_id | 613 self.current_stream[sid]["xmlstream"] = xmlstream |
522 factory.protocol.filesize = self.data["size"] | 614 |
523 factory.protocol.sid = SI_elem['sid'] | 615 if not streamhost_elts: |
524 factory.protocol.initiator_jid = element['jid'] | 616 warning(_("No streamhost found in stream query %s" % sid)) |
525 factory.protocol.target_jid = client.jid.full() | 617 self.sendBadRequestError(iq_elt['id'], iq_elt['from'], xmlstream) |
526 factory.protocol.IQ_id = IQ_id | 618 return |
527 factory.protocol.activateCB = self.activateStream | 619 |
528 reactor.connectTCP(element['host'], int(element['port']), factory) | 620 streamhost_elt = streamhost_elts[0] #TODO: manage several streamhost elements case |
621 sh_host = streamhost_elt.getAttribute("host") | |
622 sh_port = streamhost_elt.getAttribute("port") | |
623 sh_jid = streamhost_elt.getAttribute("jid") | |
624 if not sh_host or not sh_port or not sh_jid: | |
625 warning(_("incomplete streamhost element")) | |
626 self.sendBadRequestError(iq_elt['id'], iq_elt['from'], xmlstream) | |
627 return | |
628 | |
629 self.current_stream[sid]["streamhost"] = (sh_host, sh_port, sh_jid) | |
630 | |
631 info (_("Stream proposed: host=[%(host)s] port=[%(port)s]") % {'host':sh_host, 'port':sh_port}) | |
632 factory = Socks5ClientFactory(self.current_stream, sid, iq_elt["id"], self.activateStream, self._killId) | |
633 reactor.connectTCP(sh_host, int(sh_port), factory) | |
529 | 634 |
530 def activateStream(self, from_jid, to_jid, sid, IQ_id, xmlstream): | 635 def activateStream(self, sid, iq_id): |
531 debug(_("activating stream")) | 636 debug(_("activating stream")) |
532 result = domish.Element(('', 'iq')) | 637 result = domish.Element(('', 'iq')) |
638 data = self.current_stream[sid] | |
533 result['type'] = 'result' | 639 result['type'] = 'result' |
534 result['id'] = IQ_id | 640 result['id'] = iq_id |
535 result['from'] = from_jid | 641 result['from'] = data["to"].full() |
536 result['to'] = to_jid | 642 result['to'] = data["from"].full() |
537 query = result.addElement('query', 'http://jabber.org/protocol/bytestreams') | 643 query = result.addElement('query', NS_BS) |
538 query['sid'] = sid | 644 query['sid'] = sid |
539 streamhost = query.addElement('streamhost-used') | 645 streamhost = query.addElement('streamhost-used') |
540 streamhost['jid'] = to_jid #FIXME: use real streamhost | 646 streamhost['jid'] = data["streamhost"][2] |
647 data["xmlstream"].send(result) | |
648 | |
649 def sendNotAcceptableError(self, iq_id, to_jid, xmlstream): | |
650 """Not acceptable error used when the stream is not expected or something is going wrong | |
651 @param iq_id: IQ id | |
652 @param to_jid: addressee | |
653 @param xmlstream: XML stream to use to send the error""" | |
654 result = domish.Element(('', 'iq')) | |
655 result['type'] = 'result' | |
656 result['id'] = iq_id | |
657 result['to'] = to_jid | |
658 error_el = result.addElement('error') | |
659 error_el['type'] = 'modify' | |
660 error_el.addElement(('urn:ietf:params:xml:ns:xmpp-stanzas','not-acceptable')) | |
661 xmlstream.send(result) | |
662 | |
663 def sendBadRequestError(self, iq_id, to_jid, xmlstream): | |
664 """Not acceptable error used when the stream is not expected or something is going wrong | |
665 @param iq_id: IQ id | |
666 @param to_jid: addressee | |
667 @param xmlstream: XML stream to use to send the error""" | |
668 result = domish.Element(('', 'iq')) | |
669 result['type'] = 'result' | |
670 result['id'] = iq_id | |
671 result['to'] = to_jid | |
672 error_el = result.addElement('error') | |
673 error_el['type'] = 'cancel' | |
674 error_el.addElement(('urn:ietf:params:xml:ns:xmpp-stanzas','bad-request')) | |
541 xmlstream.send(result) | 675 xmlstream.send(result) |
542 | 676 |
543 class XEP_0065_handler(XMPPHandler): | 677 class XEP_0065_handler(XMPPHandler): |
544 implements(iwokkel.IDisco) | 678 implements(iwokkel.IDisco) |
545 | 679 |
546 def __init__(self, plugin_parent): | 680 def __init__(self, plugin_parent): |
547 self.plugin_parent = plugin_parent | 681 self.plugin_parent = plugin_parent |
548 self.host = plugin_parent.host | 682 self.host = plugin_parent.host |
549 | 683 |
550 def connectionInitialized(self): | 684 def connectionInitialized(self): |
551 self.xmlstream.addObserver(BS_REQUEST, self.plugin_parent.getFile) | 685 self.xmlstream.addObserver(BS_REQUEST, self.plugin_parent.streamQuery, profile = self.parent.profile) |
552 | 686 |
553 | 687 |
554 def getDiscoInfo(self, requestor, target, nodeIdentifier=''): | 688 def getDiscoInfo(self, requestor, target, nodeIdentifier=''): |
555 return [disco.DiscoFeature(NS_BS)] | 689 return [disco.DiscoFeature(NS_BS)] |
556 | 690 |