Mercurial > libervia-backend
comparison src/plugins/plugin_xep_0065.py @ 993:301b342c697a
core: use of the new core.log module:
/!\ this is a massive refactoring and was largely automated, it probably did bring some bugs /!\
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 19 Apr 2014 19:19:19 +0200 |
parents | 132de9d487ac |
children | 9a85836f0d45 |
comparison
equal
deleted
inserted
replaced
992:f51a1895275c | 993:301b342c697a |
---|---|
53 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | 53 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
54 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | 54 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
55 # THE SOFTWARE. | 55 # THE SOFTWARE. |
56 | 56 |
57 from sat.core.i18n import _ | 57 from sat.core.i18n import _ |
58 from logging import debug, info, warning, error | 58 from sat.core.log import getLogger |
59 log = getLogger(__name__) | |
59 from twisted.internet import protocol, reactor | 60 from twisted.internet import protocol, reactor |
60 from twisted.internet import error as jab_error | 61 from twisted.internet import error |
61 from twisted.words.protocols.jabber import jid, client as jabber_client | 62 from twisted.words.protocols.jabber import jid, client as jabber_client |
62 from twisted.protocols.basic import FileSender | 63 from twisted.protocols.basic import FileSender |
63 from twisted.words.xish import domish | 64 from twisted.words.xish import domish |
64 from twisted.web.client import getPage | 65 from twisted.web.client import getPage |
65 from sat.core.exceptions import ProfileNotInCacheError | 66 from sat.core.exceptions import ProfileNotInCacheError |
137 return hashlib.sha1((sid + from_jid.full() + to_jid.full()).encode('utf-8')).hexdigest() | 138 return hashlib.sha1((sid + from_jid.full() + to_jid.full()).encode('utf-8')).hexdigest() |
138 | 139 |
139 | 140 |
140 class SOCKSv5(protocol.Protocol, FileSender): | 141 class SOCKSv5(protocol.Protocol, FileSender): |
141 def __init__(self): | 142 def __init__(self): |
142 debug(_("Protocol init")) | 143 log.debug(_("Protocol init")) |
143 self.state = STATE_INITIAL | 144 self.state = STATE_INITIAL |
144 self.buf = "" | 145 self.buf = "" |
145 self.supportedAuthMechs = [AUTHMECH_ANON] | 146 self.supportedAuthMechs = [AUTHMECH_ANON] |
146 self.supportedAddrs = [ADDR_DOMAINNAME] | 147 self.supportedAddrs = [ADDR_DOMAINNAME] |
147 self.enabledCommands = [CMD_CONNECT] | 148 self.enabledCommands = [CMD_CONNECT] |
148 self.peersock = None | 149 self.peersock = None |
149 self.addressType = 0 | 150 self.addressType = 0 |
150 self.requestType = 0 | 151 self.requestType = 0 |
151 | 152 |
152 def _startNegotiation(self): | 153 def _startNegotiation(self): |
153 debug("_startNegotiation") | 154 log.debug("_startNegotiation") |
154 self.state = STATE_TARGET_AUTH | 155 self.state = STATE_TARGET_AUTH |
155 self.transport.write(struct.pack('!3B', SOCKS5_VER, 1, AUTHMECH_ANON)) | 156 self.transport.write(struct.pack('!3B', SOCKS5_VER, 1, AUTHMECH_ANON)) |
156 | 157 |
157 def _parseNegotiation(self): | 158 def _parseNegotiation(self): |
158 debug("_parseNegotiation") | 159 log.debug("_parseNegotiation") |
159 try: | 160 try: |
160 # Parse out data | 161 # Parse out data |
161 ver, nmethod = struct.unpack('!BB', self.buf[:2]) | 162 ver, nmethod = struct.unpack('!BB', self.buf[:2]) |
162 methods = struct.unpack('%dB' % nmethod, self.buf[2:nmethod + 2]) | 163 methods = struct.unpack('%dB' % nmethod, self.buf[2:nmethod + 2]) |
163 | 164 |
187 self.transport.loseConnection() | 188 self.transport.loseConnection() |
188 except struct.error: | 189 except struct.error: |
189 pass | 190 pass |
190 | 191 |
191 def _parseUserPass(self): | 192 def _parseUserPass(self): |
192 debug("_parseUserPass") | 193 log.debug("_parseUserPass") |
193 try: | 194 try: |
194 # Parse out data | 195 # Parse out data |
195 ver, ulen = struct.unpack('BB', self.buf[:2]) | 196 ver, ulen = struct.unpack('BB', self.buf[:2]) |
196 uname, = struct.unpack('%ds' % ulen, self.buf[2:ulen + 2]) | 197 uname, = struct.unpack('%ds' % ulen, self.buf[2:ulen + 2]) |
197 plen, = struct.unpack('B', self.buf[ulen + 2]) | 198 plen, = struct.unpack('B', self.buf[ulen + 2]) |
209 self.transport.loseConnection() | 210 self.transport.loseConnection() |
210 except struct.error: | 211 except struct.error: |
211 pass | 212 pass |
212 | 213 |
213 def sendErrorReply(self, errorcode): | 214 def sendErrorReply(self, errorcode): |
214 debug("sendErrorReply") | 215 log.debug("sendErrorReply") |
215 # Any other address types are not supported | 216 # Any other address types are not supported |
216 result = struct.pack('!BBBBIH', SOCKS5_VER, errorcode, 0, 1, 0, 0) | 217 result = struct.pack('!BBBBIH', SOCKS5_VER, errorcode, 0, 1, 0, 0) |
217 self.transport.write(result) | 218 self.transport.write(result) |
218 self.transport.loseConnection() | 219 self.transport.loseConnection() |
219 | 220 |
220 def _parseRequest(self): | 221 def _parseRequest(self): |
221 debug("_parseRequest") | 222 log.debug("_parseRequest") |
222 try: | 223 try: |
223 # Parse out data and trim buffer accordingly | 224 # Parse out data and trim buffer accordingly |
224 ver, cmd, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4]) | 225 ver, cmd, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4]) |
225 | 226 |
226 # Ensure we actually support the requested address type | 227 # Ensure we actually support the requested address type |
258 | 259 |
259 except struct.error: | 260 except struct.error: |
260 return None | 261 return None |
261 | 262 |
262 def _makeRequest(self): | 263 def _makeRequest(self): |
263 debug("_makeRequest") | 264 log.debug("_makeRequest") |
264 self.state = STATE_TARGET_REQUEST | 265 self.state = STATE_TARGET_REQUEST |
265 sha1 = calculateHash(self.data["from"], self.data["to"], self.sid) | 266 sha1 = calculateHash(self.data["from"], self.data["to"], self.sid) |
266 request = struct.pack('!5B%dsH' % len(sha1), SOCKS5_VER, CMD_CONNECT, 0, ADDR_DOMAINNAME, len(sha1), sha1, 0) | 267 request = struct.pack('!5B%dsH' % len(sha1), SOCKS5_VER, CMD_CONNECT, 0, ADDR_DOMAINNAME, len(sha1), sha1, 0) |
267 self.transport.write(request) | 268 self.transport.write(request) |
268 | 269 |
269 def _parseRequestReply(self): | 270 def _parseRequestReply(self): |
270 debug("_parseRequestReply") | 271 log.debug("_parseRequestReply") |
271 try: | 272 try: |
272 ver, rep, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4]) | 273 ver, rep, rsvd, self.addressType = struct.unpack('!BBBB', self.buf[:4]) |
273 # Ensure we actually support the requested address type | 274 # Ensure we actually support the requested address type |
274 if self.addressType not in self.supportedAddrs: | 275 if self.addressType not in self.supportedAddrs: |
275 self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED) | 276 self.sendErrorReply(REPLY_ADDR_NOT_SUPPORTED) |
302 | 303 |
303 except struct.error: | 304 except struct.error: |
304 return None | 305 return None |
305 | 306 |
306 def connectionMade(self): | 307 def connectionMade(self): |
307 debug("connectionMade (mode = %s)" % "requester" if isinstance(self.factory, Socks5ServerFactory) else "target") | 308 log.debug("connectionMade (mode = %s)" % "requester" if isinstance(self.factory, Socks5ServerFactory) else "target") |
308 | 309 |
309 if isinstance(self.factory, Socks5ClientFactory): | 310 if isinstance(self.factory, Socks5ClientFactory): |
310 self.sid = self.factory.sid | 311 self.sid = self.factory.sid |
311 self.profile = self.factory.profile | 312 self.profile = self.factory.profile |
312 self.data = self.factory.data | 313 self.data = self.factory.data |
313 self.state = STATE_TARGET_INITIAL | 314 self.state = STATE_TARGET_INITIAL |
314 self._startNegotiation() | 315 self._startNegotiation() |
315 | 316 |
316 def connectRequested(self, addr, port): | 317 def connectRequested(self, addr, port): |
317 debug("connectRequested") | 318 log.debug("connectRequested") |
318 | 319 |
319 # Check that this session is expected | 320 # Check that this session is expected |
320 if addr not in self.factory.hash_sid_map: | 321 if addr not in self.factory.hash_sid_map: |
321 #no: we refuse it | 322 #no: we refuse it |
322 self.sendErrorReply(REPLY_CONN_REFUSED) | 323 self.sendErrorReply(REPLY_CONN_REFUSED) |
331 """Callback called when the result iq is received""" | 332 """Callback called when the result iq is received""" |
332 d = self.beginFileTransfer(file_obj, self.transport) | 333 d = self.beginFileTransfer(file_obj, self.transport) |
333 d.addCallback(self.fileTransfered) | 334 d.addCallback(self.fileTransfered) |
334 | 335 |
335 def fileTransfered(self, d): | 336 def fileTransfered(self, d): |
336 info(_("File transfer completed, closing connection")) | 337 log.info(_("File transfer completed, closing connection")) |
337 self.transport.loseConnection() | 338 self.transport.loseConnection() |
338 self.factory.finishedCb(self.sid, True, self.profile) | 339 self.factory.finishedCb(self.sid, True, self.profile) |
339 | 340 |
340 def connectCompleted(self, remotehost, remoteport): | 341 def connectCompleted(self, remotehost, remoteport): |
341 debug("connectCompleted") | 342 log.debug("connectCompleted") |
342 if self.addressType == ADDR_IPV4: | 343 if self.addressType == ADDR_IPV4: |
343 result = struct.pack('!BBBBIH', SOCKS5_VER, REPLY_SUCCESS, 0, 1, remotehost, remoteport) | 344 result = struct.pack('!BBBBIH', SOCKS5_VER, REPLY_SUCCESS, 0, 1, remotehost, remoteport) |
344 elif self.addressType == ADDR_DOMAINNAME: | 345 elif self.addressType == ADDR_DOMAINNAME: |
345 result = struct.pack('!BBBBB%dsH' % len(remotehost), SOCKS5_VER, REPLY_SUCCESS, 0, | 346 result = struct.pack('!BBBBB%dsH' % len(remotehost), SOCKS5_VER, REPLY_SUCCESS, 0, |
346 ADDR_DOMAINNAME, len(remotehost), remotehost, remoteport) | 347 ADDR_DOMAINNAME, len(remotehost), remotehost, remoteport) |
349 | 350 |
350 def bindRequested(self, addr, port): | 351 def bindRequested(self, addr, port): |
351 pass | 352 pass |
352 | 353 |
353 def authenticateUserPass(self, user, passwd): | 354 def authenticateUserPass(self, user, passwd): |
354 debug("User/pass: %s/%s", user, passwd) | 355 log.debug("User/pass: %s/%s", user, passwd) |
355 return True | 356 return True |
356 | 357 |
357 def dataReceived(self, buf): | 358 def dataReceived(self, buf): |
358 if self.state == STATE_TARGET_READY: | 359 if self.state == STATE_TARGET_READY: |
359 self.data["file_obj"].write(buf) | 360 self.data["file_obj"].write(buf) |
375 self._makeRequest() | 376 self._makeRequest() |
376 if self.state == STATE_TARGET_REQUEST: | 377 if self.state == STATE_TARGET_REQUEST: |
377 self._parseRequestReply() | 378 self._parseRequestReply() |
378 | 379 |
379 def clientConnectionLost(self, reason): | 380 def clientConnectionLost(self, reason): |
380 debug("clientConnectionLost") | 381 log.debug("clientConnectionLost") |
381 self.transport.loseConnection() | 382 self.transport.loseConnection() |
382 | 383 |
383 def connectionLost(self, reason): | 384 def connectionLost(self, reason): |
384 debug("connectionLost") | 385 log.debug("connectionLost") |
385 if self.state != STATE_CONNECT_PENDING: | 386 if self.state != STATE_CONNECT_PENDING: |
386 self.transport.unregisterProducer() | 387 self.transport.unregisterProducer() |
387 if self.peersock is not None: | 388 if self.peersock is not None: |
388 self.peersock.peersock = None | 389 self.peersock.peersock = None |
389 self.peersock.transport.unregisterProducer() | 390 self.peersock.transport.unregisterProducer() |
397 self.host = host | 398 self.host = host |
398 self.hash_sid_map = hash_sid_map | 399 self.hash_sid_map = hash_sid_map |
399 self.finishedCb = finishedCb | 400 self.finishedCb = finishedCb |
400 | 401 |
401 def startedConnecting(self, connector): | 402 def startedConnecting(self, connector): |
402 debug(_("Socks 5 server connection started")) | 403 log.debug(_("Socks 5 server connection started")) |
403 | 404 |
404 def clientConnectionLost(self, connector, reason): | 405 def clientConnectionLost(self, connector, reason): |
405 debug(_("Socks 5 server connection lost (reason: %s)"), reason) | 406 log.debug(_("Socks 5 server connection lost (reason: %s)"), reason) |
406 | 407 |
407 | 408 |
408 class Socks5ClientFactory(protocol.ClientFactory): | 409 class Socks5ClientFactory(protocol.ClientFactory): |
409 protocol = SOCKSv5 | 410 protocol = SOCKSv5 |
410 | 411 |
425 self.finishedCb = finishedCb | 426 self.finishedCb = finishedCb |
426 self.proxy = proxy | 427 self.proxy = proxy |
427 self.profile = profile | 428 self.profile = profile |
428 | 429 |
429 def startedConnecting(self, connector): | 430 def startedConnecting(self, connector): |
430 debug(_("Socks 5 client connection started")) | 431 log.debug(_("Socks 5 client connection started")) |
431 | 432 |
432 def clientConnectionLost(self, connector, reason): | 433 def clientConnectionLost(self, connector, reason): |
433 debug(_("Socks 5 client connection lost (reason: %s)"), reason) | 434 log.debug(_("Socks 5 client connection lost (reason: %s)"), reason) |
434 self.finishedCb(self.sid, reason.type == jab_error.ConnectionDone, self.profile) # TODO: really check if the state is actually successful | 435 self.finishedCb(self.sid, reason.type == error.ConnectionDone, self.profile) # TODO: really check if the state is actually successful |
435 | 436 |
436 | 437 |
437 class XEP_0065(object): | 438 class XEP_0065(object): |
438 | 439 |
439 NAMESPACE = NS_BS | 440 NAMESPACE = NS_BS |
455 </individual> | 456 </individual> |
456 </params> | 457 </params> |
457 """ | 458 """ |
458 | 459 |
459 def __init__(self, host): | 460 def __init__(self, host): |
460 info(_("Plugin XEP_0065 initialization")) | 461 log.info(_("Plugin XEP_0065 initialization")) |
461 | 462 |
462 #session data | 463 #session data |
463 self.hash_sid_map = {} # key: hash of the transfer session, value: (session id, profile) | 464 self.hash_sid_map = {} # key: hash of the transfer session, value: (session id, profile) |
464 | 465 |
465 self.host = host | 466 self.host = host |
466 debug(_("registering")) | 467 log.debug(_("registering")) |
467 self.server_factory = Socks5ServerFactory(host, self.hash_sid_map, lambda sid, success, profile: self._killId(sid, success, profile=profile)) | 468 self.server_factory = Socks5ServerFactory(host, self.hash_sid_map, lambda sid, success, profile: self._killId(sid, success, profile=profile)) |
468 | 469 |
469 #parameters | 470 #parameters |
470 host.memory.updateParams(XEP_0065.params) | 471 host.memory.updateParams(XEP_0065.params) |
471 host.memory.setDefault("IP", "File Transfer", self.getExternalIP) | 472 host.memory.setDefault("IP", "File Transfer", self.getExternalIP) |
472 port = int(self.host.memory.getParamA("Port", "File Transfer")) | 473 port = int(self.host.memory.getParamA("Port", "File Transfer")) |
473 | 474 |
474 info(_("Launching Socks5 Stream server on port %d"), port) | 475 log.info(_("Launching Socks5 Stream server on port %d") % port) |
475 reactor.listenTCP(port, self.server_factory) | 476 reactor.listenTCP(port, self.server_factory) |
476 | 477 |
477 def getHandler(self, profile): | 478 def getHandler(self, profile): |
478 return XEP_0065_handler(self) | 479 return XEP_0065_handler(self) |
479 | 480 |
496 pass | 497 pass |
497 | 498 |
498 def _timeOut(self, sid, profile): | 499 def _timeOut(self, sid, profile): |
499 """Delecte current_stream id, called after timeout | 500 """Delecte current_stream id, called after timeout |
500 @param id: id of client.xep_0065_current_stream""" | 501 @param id: id of client.xep_0065_current_stream""" |
501 info(_("Socks5 Bytestream: TimeOut reached for id %(sid)s [%(profile)s]") | 502 log.info(_("Socks5 Bytestream: TimeOut reached for id %(sid)s [%(profile)s]") |
502 % {"sid": sid, "profile": profile}) | 503 % {"sid": sid, "profile": profile}) |
503 self._killId(sid, False, "TIMEOUT", profile) | 504 self._killId(sid, False, "TIMEOUT", profile) |
504 | 505 |
505 def _killId(self, sid, success=False, failure_reason="UNKNOWN", profile=None): | 506 def _killId(self, sid, success=False, failure_reason="UNKNOWN", profile=None): |
506 """Delete an current_stream id, clean up associated observers | 507 """Delete an current_stream id, clean up associated observers |
507 @param sid: id of client.xep_0065_current_stream""" | 508 @param sid: id of client.xep_0065_current_stream""" |
508 assert(profile) | 509 assert(profile) |
509 client = self.host.getClient(profile) | 510 client = self.host.getClient(profile) |
510 if sid not in client.xep_0065_current_stream: | 511 if sid not in client.xep_0065_current_stream: |
511 warning(_("kill id called on a non existant id")) | 512 log.warning(_("kill id called on a non existant id")) |
512 return | 513 return |
513 if "observer_cb" in client.xep_0065_current_stream[sid]: | 514 if "observer_cb" in client.xep_0065_current_stream[sid]: |
514 xmlstream = client.xep_0065_current_stream[sid]["xmlstream"] | 515 xmlstream = client.xep_0065_current_stream[sid]["xmlstream"] |
515 xmlstream.removeObserver(client.xep_0065_current_stream[sid]["event_data"], client.xep_0065_current_stream[sid]["observer_cb"]) | 516 xmlstream.removeObserver(client.xep_0065_current_stream[sid]["event_data"], client.xep_0065_current_stream[sid]["observer_cb"]) |
516 if client.xep_0065_current_stream[sid]['timer'].active(): | 517 if client.xep_0065_current_stream[sid]['timer'].active(): |
544 @param profile: %(doc_profile)s""" | 545 @param profile: %(doc_profile)s""" |
545 assert(profile) | 546 assert(profile) |
546 client = self.host.getClient(profile) | 547 client = self.host.getClient(profile) |
547 | 548 |
548 if length is not None: | 549 if length is not None: |
549 error(_('stream length not managed yet')) | 550 log.error(_('stream length not managed yet')) |
550 return | 551 return |
551 | 552 |
552 profile_jid = client.jid | 553 profile_jid = client.jid |
553 xmlstream = client.xmlstream | 554 xmlstream = client.xmlstream |
554 | 555 |
587 iq_elt.send() | 588 iq_elt.send() |
588 | 589 |
589 def iqResult(self, sid, profile, iq_elt): | 590 def iqResult(self, sid, profile, iq_elt): |
590 """Called when the result of open iq is received""" | 591 """Called when the result of open iq is received""" |
591 if iq_elt["type"] == "error": | 592 if iq_elt["type"] == "error": |
592 warning(_("Transfer failed")) | 593 log.warning(_("Transfer failed")) |
593 return | 594 return |
594 client = self.host.getClient(profile) | 595 client = self.host.getClient(profile) |
595 try: | 596 try: |
596 data = client.xep_0065_current_stream[sid] | 597 data = client.xep_0065_current_stream[sid] |
597 file_obj = data["file_obj"] | 598 file_obj = data["file_obj"] |
598 timer = data["timer"] | 599 timer = data["timer"] |
599 except KeyError: | 600 except KeyError: |
600 error(_("Internal error, can't do transfer")) | 601 log.error(_("Internal error, can't do transfer")) |
601 return | 602 return |
602 | 603 |
603 if timer.active(): | 604 if timer.active(): |
604 timer.cancel() | 605 timer.cancel() |
605 | 606 |
606 profile_jid, xmlstream = self.host.getJidNStream(profile) | 607 profile_jid, xmlstream = self.host.getJidNStream(profile) |
607 query_elt = iq_elt.firstChildElement() | 608 query_elt = iq_elt.firstChildElement() |
608 streamhost_elts = filter(lambda elt: elt.name == 'streamhost-used', query_elt.elements()) | 609 streamhost_elts = filter(lambda elt: elt.name == 'streamhost-used', query_elt.elements()) |
609 if not streamhost_elts: | 610 if not streamhost_elts: |
610 warning(_("No streamhost found in stream query")) | 611 log.warning(_("No streamhost found in stream query")) |
611 return | 612 return |
612 | 613 |
613 streamhost_jid = streamhost_elts[0]['jid'] | 614 streamhost_jid = streamhost_elts[0]['jid'] |
614 if streamhost_jid != profile_jid.full(): | 615 if streamhost_jid != profile_jid.full(): |
615 debug(_("A proxy server is used")) | 616 log.debug(_("A proxy server is used")) |
616 proxy_host = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=profile) | 617 proxy_host = self.host.memory.getParamA("Proxy host", "File Transfer", profile_key=profile) |
617 proxy_port = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile) | 618 proxy_port = self.host.memory.getParamA("Proxy port", "File Transfer", profile_key=profile) |
618 proxy_jid = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile) | 619 proxy_jid = self.host.memory.getParamA("Proxy", "File Transfer", profile_key=profile) |
619 if proxy_jid != streamhost_jid: | 620 if proxy_jid != streamhost_jid: |
620 warning(_("Proxy jid is not the same as in parameters, this should not happen")) | 621 log.warning(_("Proxy jid is not the same as in parameters, this should not happen")) |
621 return | 622 return |
622 factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, None, self.activateProxyStream, lambda sid, success, profile: self._killId(sid, success, profile=profile), True, profile) | 623 factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, None, self.activateProxyStream, lambda sid, success, profile: self._killId(sid, success, profile=profile), True, profile) |
623 reactor.connectTCP(proxy_host, int(proxy_port), factory) | 624 reactor.connectTCP(proxy_host, int(proxy_port), factory) |
624 else: | 625 else: |
625 data["start_transfer_cb"](file_obj) # We now activate the stream | 626 data["start_transfer_cb"](file_obj) # We now activate the stream |
626 | 627 |
627 def activateProxyStream(self, sid, iq_id, start_transfer_cb, profile): | 628 def activateProxyStream(self, sid, iq_id, start_transfer_cb, profile): |
628 debug(_("activating stream")) | 629 log.debug(_("activating stream")) |
629 client = self.host.getClient(profile) | 630 client = self.host.getClient(profile) |
630 data = client.xep_0065_current_stream[sid] | 631 data = client.xep_0065_current_stream[sid] |
631 profile_jid, xmlstream = self.host.getJidNStream(profile) | 632 profile_jid, xmlstream = self.host.getJidNStream(profile) |
632 | 633 |
633 iq_elt = client.IQ(xmlstream, 'set') | 634 iq_elt = client.IQ(xmlstream, 'set') |
639 iq_elt.addCallback(self.proxyResult, sid, start_transfer_cb, data['file_obj']) | 640 iq_elt.addCallback(self.proxyResult, sid, start_transfer_cb, data['file_obj']) |
640 iq_elt.send() | 641 iq_elt.send() |
641 | 642 |
642 def proxyResult(self, sid, start_transfer_cb, file_obj, iq_elt): | 643 def proxyResult(self, sid, start_transfer_cb, file_obj, iq_elt): |
643 if iq_elt['type'] == 'error': | 644 if iq_elt['type'] == 'error': |
644 warning(_("Can't activate the proxy stream")) | 645 log.warning(_("Can't activate the proxy stream")) |
645 return | 646 return |
646 else: | 647 else: |
647 start_transfer_cb(file_obj) | 648 start_transfer_cb(file_obj) |
648 | 649 |
649 def prepareToReceive(self, from_jid, sid, file_obj, size, success_cb, failure_cb, profile): | 650 def prepareToReceive(self, from_jid, sid, file_obj, size, success_cb, failure_cb, profile): |
667 data["success_cb"] = success_cb | 668 data["success_cb"] = success_cb |
668 data["failure_cb"] = failure_cb | 669 data["failure_cb"] = failure_cb |
669 | 670 |
670 def streamQuery(self, iq_elt, profile): | 671 def streamQuery(self, iq_elt, profile): |
671 """Get file using byte stream""" | 672 """Get file using byte stream""" |
672 debug(_("BS stream query")) | 673 log.debug(_("BS stream query")) |
673 client = self.host.getClient(profile) | 674 client = self.host.getClient(profile) |
674 | 675 |
675 if not client: | 676 if not client: |
676 raise ProfileNotInCacheError | 677 raise ProfileNotInCacheError |
677 | 678 |
681 query_elt = iq_elt.firstChildElement() | 682 query_elt = iq_elt.firstChildElement() |
682 sid = query_elt.getAttribute("sid") | 683 sid = query_elt.getAttribute("sid") |
683 streamhost_elts = filter(lambda elt: elt.name == 'streamhost', query_elt.elements()) | 684 streamhost_elts = filter(lambda elt: elt.name == 'streamhost', query_elt.elements()) |
684 | 685 |
685 if not sid in client.xep_0065_current_stream: | 686 if not sid in client.xep_0065_current_stream: |
686 warning(_("Ignoring unexpected BS transfer: %s" % sid)) | 687 log.warning(_("Ignoring unexpected BS transfer: %s" % sid)) |
687 self.sendNotAcceptableError(iq_elt['id'], iq_elt['from'], xmlstream) | 688 self.sendNotAcceptableError(iq_elt['id'], iq_elt['from'], xmlstream) |
688 return | 689 return |
689 | 690 |
690 client.xep_0065_current_stream[sid]['timer'].cancel() | 691 client.xep_0065_current_stream[sid]['timer'].cancel() |
691 client.xep_0065_current_stream[sid]["to"] = jid.JID(iq_elt["to"]) | 692 client.xep_0065_current_stream[sid]["to"] = jid.JID(iq_elt["to"]) |
692 client.xep_0065_current_stream[sid]["xmlstream"] = xmlstream | 693 client.xep_0065_current_stream[sid]["xmlstream"] = xmlstream |
693 | 694 |
694 if not streamhost_elts: | 695 if not streamhost_elts: |
695 warning(_("No streamhost found in stream query %s" % sid)) | 696 log.warning(_("No streamhost found in stream query %s" % sid)) |
696 self.sendBadRequestError(iq_elt['id'], iq_elt['from'], xmlstream) | 697 self.sendBadRequestError(iq_elt['id'], iq_elt['from'], xmlstream) |
697 return | 698 return |
698 | 699 |
699 streamhost_elt = streamhost_elts[0] # TODO: manage several streamhost elements case | 700 streamhost_elt = streamhost_elts[0] # TODO: manage several streamhost elements case |
700 sh_host = streamhost_elt.getAttribute("host") | 701 sh_host = streamhost_elt.getAttribute("host") |
701 sh_port = streamhost_elt.getAttribute("port") | 702 sh_port = streamhost_elt.getAttribute("port") |
702 sh_jid = streamhost_elt.getAttribute("jid") | 703 sh_jid = streamhost_elt.getAttribute("jid") |
703 if not sh_host or not sh_port or not sh_jid: | 704 if not sh_host or not sh_port or not sh_jid: |
704 warning(_("incomplete streamhost element")) | 705 log.warning(_("incomplete streamhost element")) |
705 self.sendBadRequestError(iq_elt['id'], iq_elt['from'], xmlstream) | 706 self.sendBadRequestError(iq_elt['id'], iq_elt['from'], xmlstream) |
706 return | 707 return |
707 | 708 |
708 client.xep_0065_current_stream[sid]["streamhost"] = (sh_host, sh_port, sh_jid) | 709 client.xep_0065_current_stream[sid]["streamhost"] = (sh_host, sh_port, sh_jid) |
709 | 710 |
710 info(_("Stream proposed: host=[%(host)s] port=[%(port)s]") % {'host': sh_host, 'port': sh_port}) | 711 log.info(_("Stream proposed: host=[%(host)s] port=[%(port)s]") % {'host': sh_host, 'port': sh_port}) |
711 factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, iq_elt["id"], self.activateStream, lambda sid, success, profile: self._killId(sid, success, profile=profile), profile=profile) | 712 factory = Socks5ClientFactory(client.xep_0065_current_stream, sid, iq_elt["id"], self.activateStream, lambda sid, success, profile: self._killId(sid, success, profile=profile), profile=profile) |
712 reactor.connectTCP(sh_host, int(sh_port), factory) | 713 reactor.connectTCP(sh_host, int(sh_port), factory) |
713 | 714 |
714 def activateStream(self, sid, iq_id, profile): | 715 def activateStream(self, sid, iq_id, profile): |
715 client = self.host.getClient(profile) | 716 client = self.host.getClient(profile) |
716 debug(_("activating stream")) | 717 log.debug(_("activating stream")) |
717 result = domish.Element((None, 'iq')) | 718 result = domish.Element((None, 'iq')) |
718 data = client.xep_0065_current_stream[sid] | 719 data = client.xep_0065_current_stream[sid] |
719 result['type'] = 'result' | 720 result['type'] = 'result' |
720 result['id'] = iq_id | 721 result['id'] = iq_id |
721 result['from'] = data["to"].full() | 722 result['from'] = data["to"].full() |
764 | 765 |
765 def _proxyDataResult(self, iq_elt): | 766 def _proxyDataResult(self, iq_elt): |
766 """Called with the informations about proxy according to XEP-0065 #4 | 767 """Called with the informations about proxy according to XEP-0065 #4 |
767 Params should be filled with these infos""" | 768 Params should be filled with these infos""" |
768 if iq_elt["type"] == "error": | 769 if iq_elt["type"] == "error": |
769 warning(_("Can't determine proxy informations")) | 770 log.warning(_("Can't determine proxy informations")) |
770 return | 771 return |
771 query_elt = iq_elt.firstChildElement() | 772 query_elt = iq_elt.firstChildElement() |
772 if query_elt.name != "query": | 773 if query_elt.name != "query": |
773 warning(_("Bad answer received from proxy")) | 774 log.warning(_("Bad answer received from proxy")) |
774 return | 775 return |
775 streamhost_elts = filter(lambda elt: elt.name == 'streamhost', query_elt.elements()) | 776 streamhost_elts = filter(lambda elt: elt.name == 'streamhost', query_elt.elements()) |
776 if not streamhost_elts: | 777 if not streamhost_elts: |
777 warning(_("No streamhost found in stream query")) | 778 log.warning(_("No streamhost found in stream query")) |
778 return | 779 return |
779 if len(streamhost_elts) != 1: | 780 if len(streamhost_elts) != 1: |
780 warning(_("Multiple streamhost elements in proxy not managed, keeping only the first one")) | 781 log.warning(_("Multiple streamhost elements in proxy not managed, keeping only the first one")) |
781 streamhost_elt = streamhost_elts[0] | 782 streamhost_elt = streamhost_elts[0] |
782 self.host.memory.setParam("Proxy", streamhost_elt.getAttribute("jid", ""), | 783 self.host.memory.setParam("Proxy", streamhost_elt.getAttribute("jid", ""), |
783 "File Transfer", profile_key=self.parent.profile) | 784 "File Transfer", profile_key=self.parent.profile) |
784 self.host.memory.setParam("Proxy host", streamhost_elt.getAttribute("host", ""), | 785 self.host.memory.setParam("Proxy host", streamhost_elt.getAttribute("host", ""), |
785 "File Transfer", profile_key=self.parent.profile) | 786 "File Transfer", profile_key=self.parent.profile) |
793 if not proxy: | 794 if not proxy: |
794 def proxiesFound(entities): | 795 def proxiesFound(entities): |
795 try: | 796 try: |
796 proxy_ent = entities.pop() | 797 proxy_ent = entities.pop() |
797 except KeyError: | 798 except KeyError: |
798 info(_("No proxy found on this server")) | 799 log.info(_("No proxy found on this server")) |
799 return | 800 return |
800 iq_elt = jabber_client.IQ(self.parent.xmlstream, 'get') | 801 iq_elt = jabber_client.IQ(self.parent.xmlstream, 'get') |
801 iq_elt["to"] = proxy_ent.full() | 802 iq_elt["to"] = proxy_ent.full() |
802 iq_elt.addElement('query', NS_BS) | 803 iq_elt.addElement('query', NS_BS) |
803 iq_elt.addCallback(self._proxyDataResult) | 804 iq_elt.addCallback(self._proxyDataResult) |