Mercurial > libervia-backend
diff sat/plugins/plugin_sec_otr.py @ 2624:56f94936df1e
code style reformatting using black
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 27 Jun 2018 20:14:46 +0200 |
parents | 26edcf3a30eb |
children | 189e38fb11ff |
line wrap: on
line diff
--- a/sat/plugins/plugin_sec_otr.py Wed Jun 27 07:51:29 2018 +0200 +++ b/sat/plugins/plugin_sec_otr.py Wed Jun 27 20:14:46 2018 +0200 @@ -24,6 +24,7 @@ from sat.core.constants import Const as C from sat.core.log import getLogger from sat.core import exceptions + log = getLogger(__name__) from sat.tools import xml_tools from twisted.words.protocols.jabber import jid @@ -44,27 +45,27 @@ C.PI_DEPENDENCIES: [u"XEP-0280", u"XEP-0334"], C.PI_MAIN: u"OTR", C.PI_HANDLER: u"no", - C.PI_DESCRIPTION: _(u"""Implementation of OTR""") + C.PI_DESCRIPTION: _(u"""Implementation of OTR"""), } NS_OTR = "otr_plugin" PRIVATE_KEY = "PRIVATE KEY" -OTR_MENU = D_(u'OTR') -AUTH_TXT = D_(u"To authenticate your correspondent, you need to give your below fingerprint *BY AN EXTERNAL CANAL* (i.e. not in this chat), and check that the one he gives you is the same as below. If there is a mismatch, there can be a spy between you!") -DROP_TXT = D_(u"You private key is used to encrypt messages for your correspondent, nobody except you must know it, if you are in doubt, you should drop it!\n\nAre you sure you want to drop your private key?") +OTR_MENU = D_(u"OTR") +AUTH_TXT = D_( + u"To authenticate your correspondent, you need to give your below fingerprint *BY AN EXTERNAL CANAL* (i.e. not in this chat), and check that the one he gives you is the same as below. If there is a mismatch, there can be a spy between you!" +) +DROP_TXT = D_( + u"You private key is used to encrypt messages for your correspondent, nobody except you must know it, if you are in doubt, you should drop it!\n\nAre you sure you want to drop your private key?" +) # NO_LOG_AND = D_(u"/!\\Your history is not logged anymore, and") # FIXME: not used at the moment NO_ADV_FEATURES = D_(u"Some of advanced features are disabled !") -DEFAULT_POLICY_FLAGS = { - 'ALLOW_V1':False, - 'ALLOW_V2':True, - 'REQUIRE_ENCRYPTION':True, -} +DEFAULT_POLICY_FLAGS = {"ALLOW_V1": False, "ALLOW_V2": True, "REQUIRE_ENCRYPTION": True} -OTR_STATE_TRUSTED = 'trusted' -OTR_STATE_UNTRUSTED = 'untrusted' -OTR_STATE_UNENCRYPTED = 'unencrypted' -OTR_STATE_ENCRYPTED = 'encrypted' +OTR_STATE_TRUSTED = "trusted" +OTR_STATE_UNTRUSTED = "untrusted" +OTR_STATE_UNENCRYPTED = "unencrypted" +OTR_STATE_ENCRYPTED = "encrypted" class Context(potr.context.Context): @@ -89,25 +90,25 @@ message data when an encrypted message is going to be sent """ assert isinstance(self.peer, jid.JID) - msg = msg_str.decode('utf-8') + msg = msg_str.decode("utf-8") client = self.user.client - log.debug(u'injecting encrypted message to {to}'.format(to=self.peer)) + log.debug(u"injecting encrypted message to {to}".format(to=self.peer)) if appdata is None: mess_data = { - 'from': client.jid, - 'to': self.peer, - 'uid': unicode(uuid.uuid4()), - 'message': {'': msg}, - 'subject': {}, - 'type': 'chat', - 'extra': {}, - 'timestamp': time.time(), - } + "from": client.jid, + "to": self.peer, + "uid": unicode(uuid.uuid4()), + "message": {"": msg}, + "subject": {}, + "type": "chat", + "extra": {}, + "timestamp": time.time(), + } client.generateMessageXML(mess_data) - client.send(mess_data['xml']) + client.send(mess_data["xml"]) else: - message_elt = appdata[u'xml'] - assert message_elt.name == u'message' + message_elt = appdata[u"xml"] + assert message_elt.name == u"message" message_elt.addElement("body", content=msg) def setState(self, state): @@ -117,8 +118,12 @@ log.debug(u"setState: %s (old_state=%s)" % (state, old_state)) if state == potr.context.STATE_PLAINTEXT: - feedback = _(u"/!\\ conversation with %(other_jid)s is now UNENCRYPTED") % {'other_jid': self.peer.full()} - self.host.bridge.otrState(OTR_STATE_UNENCRYPTED, self.peer.full(), client.profile) + feedback = _(u"/!\\ conversation with %(other_jid)s is now UNENCRYPTED") % { + "other_jid": self.peer.full() + } + self.host.bridge.otrState( + OTR_STATE_UNENCRYPTED, self.peer.full(), client.profile + ) elif state == potr.context.STATE_ENCRYPTED: try: trusted = self.getCurrentTrust() @@ -127,18 +132,27 @@ trusted_str = _(u"trusted") if trusted else _(u"untrusted") if old_state == potr.context.STATE_ENCRYPTED: - feedback = D_(u"{trusted} OTR conversation with {other_jid} REFRESHED").format( - trusted = trusted_str, - other_jid = self.peer.full()) + feedback = D_( + u"{trusted} OTR conversation with {other_jid} REFRESHED" + ).format(trusted=trusted_str, other_jid=self.peer.full()) else: - feedback = D_(u"{trusted} encrypted OTR conversation started with {other_jid}\n{extra_info}").format( - trusted = trusted_str, - other_jid = self.peer.full(), - extra_info = NO_ADV_FEATURES) - self.host.bridge.otrState(OTR_STATE_ENCRYPTED, self.peer.full(), client.profile) + feedback = D_( + u"{trusted} encrypted OTR conversation started with {other_jid}\n{extra_info}" + ).format( + trusted=trusted_str, + other_jid=self.peer.full(), + extra_info=NO_ADV_FEATURES, + ) + self.host.bridge.otrState( + OTR_STATE_ENCRYPTED, self.peer.full(), client.profile + ) elif state == potr.context.STATE_FINISHED: - feedback = D_(u"OTR conversation with {other_jid} is FINISHED").format(other_jid = self.peer.full()) - self.host.bridge.otrState(OTR_STATE_UNENCRYPTED, self.peer.full(), client.profile) + feedback = D_(u"OTR conversation with {other_jid} is FINISHED").format( + other_jid=self.peer.full() + ) + self.host.bridge.otrState( + OTR_STATE_UNENCRYPTED, self.peer.full(), client.profile + ) else: log.error(D_(u"Unknown OTR state")) return @@ -176,37 +190,42 @@ log.debug(u"savePrivkey") if self.privkey is None: raise exceptions.InternalError(_(u"Save is called but privkey is None !")) - priv_key = self.privkey.serializePrivateKey().encode('hex') + priv_key = self.privkey.serializePrivateKey().encode("hex") d = self.host.memory.encryptValue(priv_key, self.client.profile) + def save_encrypted_key(encrypted_priv_key): self.client._otr_data[PRIVATE_KEY] = encrypted_priv_key + d.addCallback(save_encrypted_key) def loadTrusts(self): - trust_data = self.client._otr_data.get('trust', {}) + trust_data = self.client._otr_data.get("trust", {}) for jid_, jid_data in trust_data.iteritems(): for fingerprint, trust_level in jid_data.iteritems(): - log.debug(u'setting trust for {jid}: [{fingerprint}] = "{trust_level}"'.format(jid=jid_, fingerprint=fingerprint, trust_level=trust_level)) + log.debug( + u'setting trust for {jid}: [{fingerprint}] = "{trust_level}"'.format( + jid=jid_, fingerprint=fingerprint, trust_level=trust_level + ) + ) self.trusts.setdefault(jid.JID(jid_), {})[fingerprint] = trust_level def saveTrusts(self): log.debug(u"saving trusts for {profile}".format(profile=self.client.profile)) - log.debug(u"trusts = {}".format(self.client._otr_data['trust'])) - self.client._otr_data.force('trust') + log.debug(u"trusts = {}".format(self.client._otr_data["trust"])) + self.client._otr_data.force("trust") def setTrust(self, other_jid, fingerprint, trustLevel): try: - trust_data = self.client._otr_data['trust'] + trust_data = self.client._otr_data["trust"] except KeyError: trust_data = {} - self.client._otr_data['trust'] = trust_data + self.client._otr_data["trust"] = trust_data jid_data = trust_data.setdefault(other_jid.full(), {}) jid_data[fingerprint] = trustLevel super(Account, self).setTrust(other_jid, fingerprint, trustLevel) class ContextManager(object): - def __init__(self, host, client): self.host = host self.account = Account(host, client) @@ -214,7 +233,9 @@ def startContext(self, other_jid): assert isinstance(other_jid, jid.JID) - context = self.contexts.setdefault(other_jid, Context(self.host, self.account, other_jid)) + context = self.contexts.setdefault( + other_jid, Context(self.host, self.account, other_jid) + ) return context def getContextForUser(self, other): @@ -225,23 +246,51 @@ class OTR(object): - def __init__(self, host): log.info(_(u"OTR plugin initialization")) self.host = host self.context_managers = {} - self.skipped_profiles = set() # FIXME: OTR should not be skipped per profile, this need to be refactored - self._p_hints = host.plugins[u'XEP-0334'] - self._p_carbons = host.plugins[u'XEP-0280'] + self.skipped_profiles = ( + set() + ) # FIXME: OTR should not be skipped per profile, this need to be refactored + self._p_hints = host.plugins[u"XEP-0334"] + self._p_carbons = host.plugins[u"XEP-0280"] host.trigger.add("MessageReceived", self.MessageReceivedTrigger, priority=100000) host.trigger.add("sendMessage", self.sendMessageTrigger, priority=100000) host.trigger.add("sendMessageData", self._sendMessageDataTrigger) - host.bridge.addMethod("skipOTR", ".plugin", in_sign='s', out_sign='', method=self._skipOTR) # FIXME: must be removed, must be done on per-message basis - host.bridge.addSignal("otrState", ".plugin", signature='sss') # args: state, destinee_jid, profile - host.importMenu((OTR_MENU, D_(u"Start/Refresh")), self._otrStartRefresh, security_limit=0, help_string=D_(u"Start or refresh an OTR session"), type_=C.MENU_SINGLE) - host.importMenu((OTR_MENU, D_(u"End session")), self._otrSessionEnd, security_limit=0, help_string=D_(u"Finish an OTR session"), type_=C.MENU_SINGLE) - host.importMenu((OTR_MENU, D_(u"Authenticate")), self._otrAuthenticate, security_limit=0, help_string=D_(u"Authenticate user/see your fingerprint"), type_=C.MENU_SINGLE) - host.importMenu((OTR_MENU, D_(u"Drop private key")), self._dropPrivKey, security_limit=0, type_=C.MENU_SINGLE) + host.bridge.addMethod( + "skipOTR", ".plugin", in_sign="s", out_sign="", method=self._skipOTR + ) # FIXME: must be removed, must be done on per-message basis + host.bridge.addSignal( + "otrState", ".plugin", signature="sss" + ) # args: state, destinee_jid, profile + host.importMenu( + (OTR_MENU, D_(u"Start/Refresh")), + self._otrStartRefresh, + security_limit=0, + help_string=D_(u"Start or refresh an OTR session"), + type_=C.MENU_SINGLE, + ) + host.importMenu( + (OTR_MENU, D_(u"End session")), + self._otrSessionEnd, + security_limit=0, + help_string=D_(u"Finish an OTR session"), + type_=C.MENU_SINGLE, + ) + host.importMenu( + (OTR_MENU, D_(u"Authenticate")), + self._otrAuthenticate, + security_limit=0, + help_string=D_(u"Authenticate user/see your fingerprint"), + type_=C.MENU_SINGLE, + ) + host.importMenu( + (OTR_MENU, D_(u"Drop private key")), + self._dropPrivKey, + security_limit=0, + type_=C.MENU_SINGLE, + ) host.trigger.add("presenceReceived", self._presenceReceivedTrigger) def _skipOTR(self, profile): @@ -263,8 +312,12 @@ yield client._otr_data.load() encrypted_priv_key = client._otr_data.get(PRIVATE_KEY, None) if encrypted_priv_key is not None: - priv_key = yield self.host.memory.decryptValue(encrypted_priv_key, client.profile) - ctxMng.account.privkey = potr.crypt.PK.parsePrivateKey(priv_key.decode('hex'))[0] + priv_key = yield self.host.memory.decryptValue( + encrypted_priv_key, client.profile + ) + ctxMng.account.privkey = potr.crypt.PK.parsePrivateKey( + priv_key.decode("hex") + )[0] else: ctxMng.account.privkey = None ctxMng.account.loadTrusts() @@ -285,7 +338,7 @@ """ client = self.host.getClient(profile) try: - to_jid = jid.JID(menu_data['jid']) + to_jid = jid.JID(menu_data["jid"]) except KeyError: log.error(_(u"jid key is not present !")) return defer.fail(exceptions.DataError) @@ -298,9 +351,11 @@ @param to_jid(jid.JID): jid to start encrypted session with """ if not to_jid.resource: - to_jid.resource = self.host.memory.getMainResource(client, to_jid) # FIXME: temporary and unsecure, must be changed when frontends are refactored + to_jid.resource = self.host.memory.getMainResource( + client, to_jid + ) # FIXME: temporary and unsecure, must be changed when frontends are refactored otrctx = client._otr_context_manager.getContextForUser(to_jid) - query = otrctx.sendMessage(0, '?OTRv?') + query = otrctx.sendMessage(0, "?OTRv?") otrctx.inject(query) def _otrSessionEnd(self, menu_data, profile): @@ -311,7 +366,7 @@ """ client = self.host.getClient(profile) try: - to_jid = jid.JID(menu_data['jid']) + to_jid = jid.JID(menu_data["jid"]) except KeyError: log.error(_(u"jid key is not present !")) return defer.fail(exceptions.DataError) @@ -321,7 +376,9 @@ def endSession(self, client, to_jid): """End an OTR session""" if not to_jid.resource: - to_jid.resource = self.host.memory.getMainResource(client, to_jid) # FIXME: temporary and unsecure, must be changed when frontends are refactored + to_jid.resource = self.host.memory.getMainResource( + client, to_jid + ) # FIXME: temporary and unsecure, must be changed when frontends are refactored otrctx = client._otr_context_manager.getContextForUser(to_jid) otrctx.disconnect() return {} @@ -334,7 +391,7 @@ """ client = self.host.getClient(profile) try: - to_jid = jid.JID(menu_data['jid']) + to_jid = jid.JID(menu_data["jid"]) except KeyError: log.error(_(u"jid key is not present !")) return defer.fail(exceptions.DataError) @@ -343,65 +400,94 @@ def authenticate(self, client, to_jid): """Authenticate other user and see our own fingerprint""" if not to_jid.resource: - to_jid.resource = self.host.memory.getMainResource(client, to_jid) # FIXME: temporary and unsecure, must be changed when frontends are refactored + to_jid.resource = self.host.memory.getMainResource( + client, to_jid + ) # FIXME: temporary and unsecure, must be changed when frontends are refactored ctxMng = client._otr_context_manager otrctx = ctxMng.getContextForUser(to_jid) priv_key = ctxMng.account.privkey if priv_key is None: # we have no private key yet - dialog = xml_tools.XMLUI(C.XMLUI_DIALOG, - dialog_opt = {C.XMLUI_DATA_TYPE: C.XMLUI_DIALOG_MESSAGE, - C.XMLUI_DATA_MESS: _(u"You have no private key yet, start an OTR conversation to have one"), - C.XMLUI_DATA_LVL: C.XMLUI_DATA_LVL_WARNING - }, - title = _(u"No private key"), - ) - return {'xmlui': dialog.toXml()} + dialog = xml_tools.XMLUI( + C.XMLUI_DIALOG, + dialog_opt={ + C.XMLUI_DATA_TYPE: C.XMLUI_DIALOG_MESSAGE, + C.XMLUI_DATA_MESS: _( + u"You have no private key yet, start an OTR conversation to have one" + ), + C.XMLUI_DATA_LVL: C.XMLUI_DATA_LVL_WARNING, + }, + title=_(u"No private key"), + ) + return {"xmlui": dialog.toXml()} other_fingerprint = otrctx.getCurrentKey() if other_fingerprint is None: # we have a private key, but not the fingerprint of our correspondent - dialog = xml_tools.XMLUI(C.XMLUI_DIALOG, - dialog_opt = {C.XMLUI_DATA_TYPE: C.XMLUI_DIALOG_MESSAGE, - C.XMLUI_DATA_MESS: _(u"Your fingerprint is:\n{fingerprint}\n\nStart an OTR conversation to have your correspondent one.").format(fingerprint=priv_key), - C.XMLUI_DATA_LVL: C.XMLUI_DATA_LVL_INFO - }, - title = _(u"Fingerprint"), - ) - return {'xmlui': dialog.toXml()} + dialog = xml_tools.XMLUI( + C.XMLUI_DIALOG, + dialog_opt={ + C.XMLUI_DATA_TYPE: C.XMLUI_DIALOG_MESSAGE, + C.XMLUI_DATA_MESS: _( + u"Your fingerprint is:\n{fingerprint}\n\nStart an OTR conversation to have your correspondent one." + ).format(fingerprint=priv_key), + C.XMLUI_DATA_LVL: C.XMLUI_DATA_LVL_INFO, + }, + title=_(u"Fingerprint"), + ) + return {"xmlui": dialog.toXml()} def setTrust(raw_data, profile): # This method is called when authentication form is submited data = xml_tools.XMLUIResult2DataFormResult(raw_data) - if data['match'] == 'yes': + if data["match"] == "yes": otrctx.setCurrentTrust(OTR_STATE_TRUSTED) note_msg = _(u"Your correspondent {correspondent} is now TRUSTED") - self.host.bridge.otrState(OTR_STATE_TRUSTED, to_jid.full(), client.profile) + self.host.bridge.otrState( + OTR_STATE_TRUSTED, to_jid.full(), client.profile + ) else: - otrctx.setCurrentTrust('') + otrctx.setCurrentTrust("") note_msg = _(u"Your correspondent {correspondent} is now UNTRUSTED") - self.host.bridge.otrState(OTR_STATE_UNTRUSTED, to_jid.full(), client.profile) - note = xml_tools.XMLUI(C.XMLUI_DIALOG, dialog_opt = { - C.XMLUI_DATA_TYPE: C.XMLUI_DIALOG_NOTE, - C.XMLUI_DATA_MESS: note_msg.format(correspondent=otrctx.peer)} - ) - return {'xmlui': note.toXml()} + self.host.bridge.otrState( + OTR_STATE_UNTRUSTED, to_jid.full(), client.profile + ) + note = xml_tools.XMLUI( + C.XMLUI_DIALOG, + dialog_opt={ + C.XMLUI_DATA_TYPE: C.XMLUI_DIALOG_NOTE, + C.XMLUI_DATA_MESS: note_msg.format(correspondent=otrctx.peer), + }, + ) + return {"xmlui": note.toXml()} submit_id = self.host.registerCallback(setTrust, with_data=True, one_shot=True) trusted = bool(otrctx.getCurrentTrust()) - xmlui = xml_tools.XMLUI(C.XMLUI_FORM, title=_('Authentication (%s)') % to_jid.full(), submit_id=submit_id) + xmlui = xml_tools.XMLUI( + C.XMLUI_FORM, + title=_("Authentication (%s)") % to_jid.full(), + submit_id=submit_id, + ) xmlui.addText(_(AUTH_TXT)) xmlui.addDivider() - xmlui.addText(D_(u"Your own fingerprint is:\n{fingerprint}").format(fingerprint=priv_key)) - xmlui.addText(D_(u"Your correspondent fingerprint should be:\n{fingerprint}").format(fingerprint=other_fingerprint)) - xmlui.addDivider('blank') - xmlui.changeContainer('pairs') - xmlui.addLabel(D_(u'Is your correspondent fingerprint the same as here ?')) - xmlui.addList("match", [('yes', _('yes')),('no', _('no'))], ['yes' if trusted else 'no']) - return {'xmlui': xmlui.toXml()} + xmlui.addText( + D_(u"Your own fingerprint is:\n{fingerprint}").format(fingerprint=priv_key) + ) + xmlui.addText( + D_(u"Your correspondent fingerprint should be:\n{fingerprint}").format( + fingerprint=other_fingerprint + ) + ) + xmlui.addDivider("blank") + xmlui.changeContainer("pairs") + xmlui.addLabel(D_(u"Is your correspondent fingerprint the same as here ?")) + xmlui.addList( + "match", [("yes", _("yes")), ("no", _("no"))], ["yes" if trusted else "no"] + ) + return {"xmlui": xmlui.toXml()} def _dropPrivKey(self, menu_data, profile): """Drop our private Key @@ -411,47 +497,69 @@ """ client = self.host.getClient(profile) try: - to_jid = jid.JID(menu_data['jid']) + to_jid = jid.JID(menu_data["jid"]) if not to_jid.resource: - to_jid.resource = self.host.memory.getMainResource(client, to_jid) # FIXME: temporary and unsecure, must be changed when frontends are refactored + to_jid.resource = self.host.memory.getMainResource( + client, to_jid + ) # FIXME: temporary and unsecure, must be changed when frontends are refactored except KeyError: log.error(_(u"jid key is not present !")) return defer.fail(exceptions.DataError) ctxMng = client._otr_context_manager if ctxMng.account.privkey is None: - return {'xmlui': xml_tools.note(_(u"You don't have a private key yet !")).toXml()} + return { + "xmlui": xml_tools.note(_(u"You don't have a private key yet !")).toXml() + } def dropKey(data, profile): - if C.bool(data['answer']): + if C.bool(data["answer"]): # we end all sessions for context in ctxMng.contexts.values(): context.disconnect() ctxMng.account.privkey = None ctxMng.account.getPrivkey() # as account.privkey is None, getPrivkey will generate a new key, and save it - return {'xmlui': xml_tools.note(D_(u"Your private key has been dropped")).toXml()} + return { + "xmlui": xml_tools.note( + D_(u"Your private key has been dropped") + ).toXml() + } return {} submit_id = self.host.registerCallback(dropKey, with_data=True, one_shot=True) - confirm = xml_tools.XMLUI(C.XMLUI_DIALOG, title=_(u'Confirm private key drop'), dialog_opt = {'type': C.XMLUI_DIALOG_CONFIRM, 'message': _(DROP_TXT)}, submit_id = submit_id) - return {'xmlui': confirm.toXml()} + confirm = xml_tools.XMLUI( + C.XMLUI_DIALOG, + title=_(u"Confirm private key drop"), + dialog_opt={"type": C.XMLUI_DIALOG_CONFIRM, "message": _(DROP_TXT)}, + submit_id=submit_id, + ) + return {"xmlui": confirm.toXml()} def _receivedTreatment(self, data, client): - from_jid = data['from'] + from_jid = data["from"] log.debug(u"_receivedTreatment [from_jid = %s]" % from_jid) otrctx = client._otr_context_manager.getContextForUser(from_jid) try: - message = data['message'].itervalues().next() # FIXME: Q&D fix for message refactoring, message is now a dict - res = otrctx.receiveMessage(message.encode('utf-8')) + message = ( + data["message"].itervalues().next() + ) # FIXME: Q&D fix for message refactoring, message is now a dict + res = otrctx.receiveMessage(message.encode("utf-8")) except potr.context.UnencryptedMessage: encrypted = False if otrctx.state == potr.context.STATE_ENCRYPTED: - log.warning(u"Received unencrypted message in an encrypted context (from {jid})".format( - jid = from_jid.full())) + log.warning( + u"Received unencrypted message in an encrypted context (from {jid})".format( + jid=from_jid.full() + ) + ) - feedback=D_(u"WARNING: received unencrypted data in a supposedly encrypted context"), + feedback = ( + D_( + u"WARNING: received unencrypted data in a supposedly encrypted context" + ), + ) client.feedback(from_jid, feedback) except StopIteration: return data @@ -462,17 +570,25 @@ if res[0] != None: # decrypted messages handling. # receiveMessage() will return a tuple, the first part of which will be the decrypted message - data['message'] = {'':res[0].decode('utf-8')} # FIXME: Q&D fix for message refactoring, message is now a dict + data["message"] = { + "": res[0].decode("utf-8") + } # FIXME: Q&D fix for message refactoring, message is now a dict try: # we want to keep message in history, even if no store is requested in message hints - del data[u'history'] + del data[u"history"] except KeyError: pass # TODO: add skip history as an option, but by default we don't skip it # data[u'history'] = C.HISTORY_SKIP # we send the decrypted message to frontends, but we don't want it in history else: - log.warning(u"An encrypted message was expected, but got {}".format(data['message'])) - raise failure.Failure(exceptions.CancelError('Cancelled by OTR')) # no message at all (no history, no signal) + log.warning( + u"An encrypted message was expected, but got {}".format( + data["message"] + ) + ) + raise failure.Failure( + exceptions.CancelError("Cancelled by OTR") + ) # no message at all (no history, no signal) return data def _receivedTreatmentForSkippedProfiles(self, data): @@ -480,21 +596,23 @@ but we still need to check if the message must be stored in history or not """ - # XXX: FIXME: this should not be done on a per-profile basis, but per-message + # XXX: FIXME: this should not be done on a per-profile basis, but per-message try: - message = data['message'].itervalues().next().encode('utf-8') # FIXME: Q&D fix for message refactoring, message is now a dict + message = ( + data["message"].itervalues().next().encode("utf-8") + ) # FIXME: Q&D fix for message refactoring, message is now a dict except StopIteration: return data if message.startswith(potr.proto.OTRTAG): - # FIXME: it may be better to cancel the message and send it direclty to bridge + # FIXME: it may be better to cancel the message and send it direclty to bridge # this is used by Libervia, but this may send garbage message to other frontends # if they are used at the same time as Libervia. # Hard to avoid with decryption on Libervia though. - data[u'history'] = C.HISTORY_SKIP + data[u"history"] = C.HISTORY_SKIP return data def MessageReceivedTrigger(self, client, message_elt, post_treat): - if message_elt.getAttribute('type') == C.MESS_TYPE_GROUPCHAT: + if message_elt.getAttribute("type") == C.MESS_TYPE_GROUPCHAT: # OTR is not possible in group chats return True if client.profile in self.skipped_profiles: @@ -504,51 +622,65 @@ return True def _sendMessageDataTrigger(self, client, mess_data): - if not 'OTR' in mess_data: + if not "OTR" in mess_data: return - otrctx = mess_data['OTR'] - message_elt = mess_data['xml'] - to_jid = mess_data['to'] + otrctx = mess_data["OTR"] + message_elt = mess_data["xml"] + to_jid = mess_data["to"] if otrctx.state == potr.context.STATE_ENCRYPTED: log.debug(u"encrypting message") body = None for child in list(message_elt.children): - if child.name == 'body': + if child.name == "body": # we remove all unencrypted body, # and will only encrypt the first one if body is None: body = child message_elt.children.remove(child) - elif child.name == 'html': + elif child.name == "html": # we don't want any XHTML-IM element message_elt.children.remove(child) if body is None: log.warning(u"No message found") else: self._p_carbons.setPrivate(message_elt) - otrctx.sendMessage(0, unicode(body).encode('utf-8'), appdata=mess_data) + otrctx.sendMessage(0, unicode(body).encode("utf-8"), appdata=mess_data) else: - feedback = D_(u"Your message was not sent because your correspondent closed the encrypted conversation on his/her side. " - u"Either close your own side, or refresh the session.") + feedback = D_( + u"Your message was not sent because your correspondent closed the encrypted conversation on his/her side. " + u"Either close your own side, or refresh the session." + ) log.warning(_(u"Message discarded because closed encryption channel")) client.feedback(to_jid, feedback) - raise failure.Failure(exceptions.CancelError(u'Cancelled by OTR plugin')) + raise failure.Failure(exceptions.CancelError(u"Cancelled by OTR plugin")) - def sendMessageTrigger(self, client, mess_data, pre_xml_treatments, post_xml_treatments): - if mess_data['type'] == 'groupchat': + def sendMessageTrigger( + self, client, mess_data, pre_xml_treatments, post_xml_treatments + ): + if mess_data["type"] == "groupchat": return True - if client.profile in self.skipped_profiles: # FIXME: should not be done on a per-profile basis + if ( + client.profile in self.skipped_profiles + ): # FIXME: should not be done on a per-profile basis return True - to_jid = copy.copy(mess_data['to']) + to_jid = copy.copy(mess_data["to"]) if not to_jid.resource: - to_jid.resource = self.host.memory.getMainResource(client, to_jid) # FIXME: full jid may not be known + to_jid.resource = self.host.memory.getMainResource( + client, to_jid + ) # FIXME: full jid may not be known otrctx = client._otr_context_manager.getContextForUser(to_jid) if otrctx.state != potr.context.STATE_PLAINTEXT: self._p_hints.addHint(mess_data, self._p_hints.HINT_NO_COPY) self._p_hints.addHint(mess_data, self._p_hints.HINT_NO_PERMANENT_STORE) - mess_data['OTR'] = otrctx # this indicate that encryption is needed in sendMessageData trigger - if not mess_data['to'].resource: # if not resource was given, we force it here - mess_data['to'] = to_jid + mess_data[ + "OTR" + ] = ( + otrctx + ) # this indicate that encryption is needed in sendMessageData trigger + if not mess_data[ + "to" + ].resource: # if not resource was given, we force it here + mess_data["to"] = to_jid return True def _presenceReceivedTrigger(self, entity, show, priority, statuses, profile): @@ -557,7 +689,9 @@ client = self.host.getClient(profile) if not entity.resource: try: - entity.resource = self.host.memory.getMainResource(client, entity) # FIXME: temporary and unsecure, must be changed when frontends are refactored + entity.resource = self.host.memory.getMainResource( + client, entity + ) # FIXME: temporary and unsecure, must be changed when frontends are refactored except exceptions.UnknownEntityError: return True # entity was not connected if entity in client._otr_context_manager.contexts: