Mercurial > libervia-backend
diff sat/plugins/plugin_xep_0047.py @ 2624:56f94936df1e
code style reformatting using black
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 27 Jun 2018 20:14:46 +0200 |
parents | 26edcf3a30eb |
children | 003b8b4b56a7 |
line wrap: on
line diff
--- a/sat/plugins/plugin_xep_0047.py Wed Jun 27 07:51:29 2018 +0200 +++ b/sat/plugins/plugin_xep_0047.py Wed Jun 27 20:14:46 2018 +0200 @@ -19,6 +19,7 @@ from sat.core.i18n import _ from sat.core.log import getLogger + log = getLogger(__name__) from sat.core.constants import Const as C from sat.core import exceptions @@ -40,15 +41,15 @@ except ImportError: from wokkel.subprotocols import XMPPHandler -MESSAGE = '/message' +MESSAGE = "/message" IQ_SET = '/iq[@type="set"]' -NS_IBB = 'http://jabber.org/protocol/ibb' +NS_IBB = "http://jabber.org/protocol/ibb" IBB_OPEN = IQ_SET + '/open[@xmlns="' + NS_IBB + '"]' IBB_CLOSE = IQ_SET + '/close[@xmlns="' + NS_IBB + '" and @sid="{}"]' IBB_IQ_DATA = IQ_SET + '/data[@xmlns="' + NS_IBB + '" and @sid="{}"]' IBB_MESSAGE_DATA = MESSAGE + '/data[@xmlns="' + NS_IBB + '" and @sid="{}"]' TIMEOUT = 120 # timeout for workflow -DEFER_KEY = 'finished' # key of the deferred used to track session end +DEFER_KEY = "finished" # key of the deferred used to track session end PLUGIN_INFO = { C.PI_NAME: "In-Band Bytestream Plugin", @@ -58,7 +59,7 @@ C.PI_PROTOCOLS: ["XEP-0047"], C.PI_MAIN: "XEP_0047", C.PI_HANDLER: "yes", - C.PI_DESCRIPTION: _("""Implementation of In-Band Bytestreams""") + C.PI_DESCRIPTION: _("""Implementation of In-Band Bytestreams"""), } @@ -82,8 +83,11 @@ @param sid(unicode): session id of client.xep_0047_current_stream @param client: %(doc_client)s """ - log.info(u"In-Band Bytestream: TimeOut reached for id {sid} [{profile}]" - .format(sid=sid, profile=client.profile)) + log.info( + u"In-Band Bytestream: TimeOut reached for id {sid} [{profile}]".format( + sid=sid, profile=client.profile + ) + ) self._killSession(sid, client, "TIMEOUT") def _killSession(self, sid, client, failure_reason=None): @@ -101,14 +105,14 @@ return try: - observer_cb = session['observer_cb'] + observer_cb = session["observer_cb"] except KeyError: pass else: client.xmlstream.removeObserver(session["event_data"], observer_cb) - if session['timer'].active(): - session['timer'].cancel() + if session["timer"].active(): + session["timer"].cancel() del client.xep_0047_current_stream[sid] @@ -136,15 +140,15 @@ @return (dict): session data """ if sid in client.xep_0047_current_stream: - raise exceptions.ConflictError(u'A session with this id already exists !') - session_data = client.xep_0047_current_stream[sid] = \ - {'id': sid, - DEFER_KEY: defer.Deferred(), - 'to': to_jid, - 'stream_object': stream_object, - 'seq': -1, - 'timer': reactor.callLater(TIMEOUT, self._timeOut, sid, client), - } + raise exceptions.ConflictError(u"A session with this id already exists !") + session_data = client.xep_0047_current_stream[sid] = { + "id": sid, + DEFER_KEY: defer.Deferred(), + "to": to_jid, + "stream_object": stream_object, + "seq": -1, + "timer": reactor.callLater(TIMEOUT, self._timeOut, sid, client), + } return session_data @@ -155,19 +159,21 @@ """ log.debug(_(u"IBB stream opening")) iq_elt.handled = True - open_elt = iq_elt.elements(NS_IBB, 'open').next() - block_size = open_elt.getAttribute('block-size') - sid = open_elt.getAttribute('sid') - stanza = open_elt.getAttribute('stanza', 'iq') + open_elt = iq_elt.elements(NS_IBB, "open").next() + block_size = open_elt.getAttribute("block-size") + sid = open_elt.getAttribute("sid") + stanza = open_elt.getAttribute("stanza", "iq") if not sid or not block_size or int(block_size) > 65535: - return self._sendError('not-acceptable', sid or None, iq_elt, client) + return self._sendError("not-acceptable", sid or None, iq_elt, client) if not sid in client.xep_0047_current_stream: log.warning(_(u"Ignoring unexpected IBB transfer: %s" % sid)) - return self._sendError('not-acceptable', sid or None, iq_elt, client) + return self._sendError("not-acceptable", sid or None, iq_elt, client) session_data = client.xep_0047_current_stream[sid] - if session_data["to"] != jid.JID(iq_elt['from']): - log.warning(_("sended jid inconsistency (man in the middle attack attempt ?)")) - return self._sendError('not-acceptable', sid, iq_elt, client) + if session_data["to"] != jid.JID(iq_elt["from"]): + log.warning( + _("sended jid inconsistency (man in the middle attack attempt ?)") + ) + return self._sendError("not-acceptable", sid, iq_elt, client) # at this stage, the session looks ok and will be accepted @@ -175,7 +181,9 @@ session_data["timer"].reset(TIMEOUT) # we save the xmlstream, events and observer data to allow observer removal - session_data["event_data"] = event_data = (IBB_MESSAGE_DATA if stanza == 'message' else IBB_IQ_DATA).format(sid) + session_data["event_data"] = event_data = ( + IBB_MESSAGE_DATA if stanza == "message" else IBB_IQ_DATA + ).format(sid) session_data["observer_cb"] = observer_cb = self._onIBBData event_close = IBB_CLOSE.format(sid) # we now set the stream observer to look after data packet @@ -184,7 +192,7 @@ client.xmlstream.addObserver(event_data, observer_cb, client=client) client.xmlstream.addOnetimeObserver(event_close, self._onIBBClose, client=client) # finally, we send the accept stanza - iq_result_elt = xmlstream.toResponse(iq_elt, 'result') + iq_result_elt = xmlstream.toResponse(iq_elt, "result") client.send(iq_result_elt) def _onIBBClose(self, iq_elt, client): @@ -194,11 +202,11 @@ """ iq_elt.handled = True log.debug(_("IBB stream closing")) - close_elt = iq_elt.elements(NS_IBB, 'close').next() + close_elt = iq_elt.elements(NS_IBB, "close").next() # XXX: this observer is only triggered on valid sid, so we don't need to check it - sid = close_elt['sid'] + sid = close_elt["sid"] - iq_result_elt = xmlstream.toResponse(iq_elt, 'result') + iq_result_elt = xmlstream.toResponse(iq_elt, "result") client.send(iq_result_elt) self._killSession(sid, client) @@ -209,29 +217,33 @@ @param element(domish.Element): <iq> or <message> stanza """ element.handled = True - data_elt = element.elements(NS_IBB, 'data').next() - sid = data_elt['sid'] + data_elt = element.elements(NS_IBB, "data").next() + sid = data_elt["sid"] try: session_data = client.xep_0047_current_stream[sid] except KeyError: log.warning(_(u"Received data for an unknown session id")) - return self._sendError('item-not-found', None, element, client) + return self._sendError("item-not-found", None, element, client) from_jid = session_data["to"] stream_object = session_data["stream_object"] - if from_jid.full() != element['from']: - log.warning(_(u"sended jid inconsistency (man in the middle attack attempt ?)\ninitial={initial}\ngiven={given}").format(initial=from_jid, given=element['from'])) - if element.name == 'iq': - self._sendError('not-acceptable', sid, element, client) + if from_jid.full() != element["from"]: + log.warning( + _( + u"sended jid inconsistency (man in the middle attack attempt ?)\ninitial={initial}\ngiven={given}" + ).format(initial=from_jid, given=element["from"]) + ) + if element.name == "iq": + self._sendError("not-acceptable", sid, element, client) return session_data["seq"] = (session_data["seq"] + 1) % 65535 if int(data_elt.getAttribute("seq", -1)) != session_data["seq"]: log.warning(_(u"Sequence error")) - if element.name == 'iq': - reason = 'not-acceptable' + if element.name == "iq": + reason = "not-acceptable" self._sendError(reason, sid, element, client) self.terminateStream(session_data, client, reason) return @@ -245,14 +257,14 @@ except TypeError: # The base64 data is invalid log.warning(_(u"Invalid base64 data")) - if element.name == 'iq': - self._sendError('not-acceptable', sid, element, client) + if element.name == "iq": + self._sendError("not-acceptable", sid, element, client) self.terminateStream(session_data, client, reason) return # we can now ack success - if element.name == 'iq': - iq_result_elt = xmlstream.toResponse(element, 'result') + if element.name == "iq": + iq_result_elt = xmlstream.toResponse(element, "result") client.send(iq_result_elt) def _sendError(self, error_condition, sid, iq_elt, client): @@ -264,7 +276,11 @@ @param client: %(doc_client)s """ iq_elt = error.StanzaError(error_condition).toResponse(iq_elt) - log.warning(u"Error while managing in-band bytestream session, cancelling: {}".format(error_condition)) + log.warning( + u"Error while managing in-band bytestream session, cancelling: {}".format( + error_condition + ) + ) if sid is not None: self._killSession(sid, client, error_condition) client.send(iq_elt) @@ -285,11 +301,11 @@ session_data["block_size"] = block_size iq_elt = client.IQ() - iq_elt['to'] = to_jid.full() - open_elt = iq_elt.addElement((NS_IBB, 'open')) - open_elt['block-size'] = str(block_size) - open_elt['sid'] = sid - open_elt['stanza'] = 'iq' # TODO: manage <message> stanza ? + iq_elt["to"] = to_jid.full() + open_elt = iq_elt.addElement((NS_IBB, "open")) + open_elt["block-size"] = str(block_size) + open_elt["sid"] = sid + open_elt["stanza"] = "iq" # TODO: manage <message> stanza ? args = [session_data, client] d = iq_elt.send() d.addCallbacks(self._IQDataStreamCb, self._IQDataStreamEb, args, None, args) @@ -307,11 +323,11 @@ buffer_ = session_data["stream_object"].read(session_data["block_size"]) if buffer_: next_iq_elt = client.IQ() - next_iq_elt['to'] = session_data["to"].full() - data_elt = next_iq_elt.addElement((NS_IBB, 'data')) - seq = session_data['seq'] = (session_data['seq'] + 1) % 65535 - data_elt['seq'] = unicode(seq) - data_elt['sid'] = session_data['id'] + next_iq_elt["to"] = session_data["to"].full() + data_elt = next_iq_elt.addElement((NS_IBB, "data")) + seq = session_data["seq"] = (session_data["seq"] + 1) % 65535 + data_elt["seq"] = unicode(seq) + data_elt["sid"] = session_data["id"] data_elt.addContent(base64.b64encode(buffer_)) args = [session_data, client] d = next_iq_elt.send() @@ -334,11 +350,11 @@ @param failure_reason(unicode, None): reason of the failure, or None if steam was successful """ iq_elt = client.IQ() - iq_elt['to'] = session_data["to"].full() - close_elt = iq_elt.addElement((NS_IBB, 'close')) - close_elt['sid'] = session_data['id'] + iq_elt["to"] = session_data["to"].full() + close_elt = iq_elt.addElement((NS_IBB, "close")) + close_elt["sid"] = session_data["id"] iq_elt.send() - self._killSession(session_data['id'], client, failure_reason) + self._killSession(session_data["id"], client, failure_reason) class XEP_0047_handler(XMPPHandler): @@ -348,10 +364,12 @@ self.plugin_parent = parent def connectionInitialized(self): - self.xmlstream.addObserver(IBB_OPEN, self.plugin_parent._onIBBOpen, client=self.parent) + self.xmlstream.addObserver( + IBB_OPEN, self.plugin_parent._onIBBOpen, client=self.parent + ) - def getDiscoInfo(self, requestor, target, nodeIdentifier=''): + def getDiscoInfo(self, requestor, target, nodeIdentifier=""): return [disco.DiscoFeature(NS_IBB)] - def getDiscoItems(self, requestor, target, nodeIdentifier=''): + def getDiscoItems(self, requestor, target, nodeIdentifier=""): return []