Mercurial > libervia-backend
diff sat/plugins/plugin_sec_otr.py @ 4037:524856bd7b19
massive refactoring to switch from camelCase to snake_case:
historically, Libervia (SàT before) was using camelCase as allowed by PEP8 when using a
pre-PEP8 code, to use the same coding style as in Twisted.
However, snake_case is more readable and it's better to follow PEP8 best practices, so it
has been decided to move on full snake_case. Because Libervia has a huge codebase, this
ended with a ugly mix of camelCase and snake_case.
To fix that, this patch does a big refactoring by renaming every function and method
(including bridge) that are not coming from Twisted or Wokkel, to use fully snake_case.
This is a massive change, and may result in some bugs.
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 08 Apr 2023 13:54:42 +0200 |
parents | 967a8e109cda |
children | c23cad65ae99 |
line wrap: on
line diff
--- a/sat/plugins/plugin_sec_otr.py Fri Apr 07 15:18:39 2023 +0200 +++ b/sat/plugins/plugin_sec_otr.py Sat Apr 08 13:54:42 2023 +0200 @@ -92,7 +92,7 @@ def _p_carbons(self): return self.context_manager.parent._p_carbons - def getPolicy(self, key): + def get_policy(self, key): if key in DEFAULT_POLICY_FLAGS: return DEFAULT_POLICY_FLAGS[key] else: @@ -123,10 +123,10 @@ "extra": {}, "timestamp": time.time(), } - client.generateMessageXML(mess_data) + client.generate_message_xml(mess_data) xml = mess_data['xml'] - self._p_carbons.setPrivate(xml) - self._p_hints.addHintElements(xml, [ + self._p_carbons.set_private(xml) + self._p_hints.add_hint_elements(xml, [ self._p_hints.HINT_NO_COPY, self._p_hints.HINT_NO_PERMANENT_STORE]) client.send(mess_data["xml"]) @@ -135,19 +135,19 @@ assert message_elt.name == "message" message_elt.addElement("body", content=msg) - def stopCb(self, __, feedback): + def stop_cb(self, __, feedback): client = self.user.client - self.host.bridge.otrState( + self.host.bridge.otr_state( OTR_STATE_UNENCRYPTED, self.peer.full(), client.profile ) client.feedback(self.peer, feedback) - def stopEb(self, failure_): + def stop_eb(self, failure_): # encryption may be already stopped in case of manual stop if not failure_.check(exceptions.NotFound): log.error("Error while stopping OTR encryption: {msg}".format(msg=failure_)) - def isTrusted(self): + def is_trusted(self): # we have to check value because potr code says that a 2-tuples should be # returned while in practice it's either None or u"trusted" trusted = self.getCurrentTrust() @@ -160,24 +160,24 @@ value=trusted)) return False - def setState(self, state): + def set_state(self, state): client = self.user.client old_state = self.state - super(Context, self).setState(state) - log.debug("setState: %s (old_state=%s)" % (state, old_state)) + super(Context, self).set_state(state) + log.debug("set_state: %s (old_state=%s)" % (state, old_state)) if state == potr.context.STATE_PLAINTEXT: feedback = _("/!\\ conversation with %(other_jid)s is now UNENCRYPTED") % { "other_jid": self.peer.full() } d = defer.ensureDeferred(client.encryption.stop(self.peer, NS_OTR)) - d.addCallback(self.stopCb, feedback=feedback) - d.addErrback(self.stopEb) + d.addCallback(self.stop_cb, feedback=feedback) + d.addErrback(self.stop_eb) return elif state == potr.context.STATE_ENCRYPTED: defer.ensureDeferred(client.encryption.start(self.peer, NS_OTR)) try: - trusted = self.isTrusted() + trusted = self.is_trusted() except TypeError: trusted = False trusted_str = _("trusted") if trusted else _("untrusted") @@ -195,7 +195,7 @@ other_jid=self.peer.full(), extra_info=NO_ADV_FEATURES, ) - self.host.bridge.otrState( + self.host.bridge.otr_state( OTR_STATE_ENCRYPTED, self.peer.full(), client.profile ) elif state == potr.context.STATE_FINISHED: @@ -203,8 +203,8 @@ other_jid=self.peer.full() ) d = defer.ensureDeferred(client.encryption.stop(self.peer, NS_OTR)) - d.addCallback(self.stopCb, feedback=feedback) - d.addErrback(self.stopEb) + d.addCallback(self.stop_cb, feedback=feedback) + d.addErrback(self.stop_eb) return else: log.error(D_("Unknown OTR state")) @@ -240,19 +240,19 @@ self.host = host self.client = client - def loadPrivkey(self): - log.debug("loadPrivkey") + def load_privkey(self): + log.debug("load_privkey") return self.privkey - def savePrivkey(self): - log.debug("savePrivkey") + def save_privkey(self): + log.debug("save_privkey") if self.privkey is None: raise exceptions.InternalError(_("Save is called but privkey is None !")) priv_key = hexlify(self.privkey.serializePrivateKey()) - encrypted_priv_key = self.host.memory.encryptValue(priv_key, self.client.profile) + encrypted_priv_key = self.host.memory.encrypt_value(priv_key, self.client.profile) self.client._otr_data[PRIVATE_KEY] = encrypted_priv_key - def loadTrusts(self): + def load_trusts(self): trust_data = self.client._otr_data.get("trust", {}) for jid_, jid_data in trust_data.items(): for fingerprint, trust_level in jid_data.items(): @@ -263,12 +263,12 @@ ) self.trusts.setdefault(jid.JID(jid_), {})[fingerprint] = trust_level - def saveTrusts(self): + def save_trusts(self): log.debug("saving trusts for {profile}".format(profile=self.client.profile)) log.debug("trusts = {}".format(self.client._otr_data["trust"])) self.client._otr_data.force("trust") - def setTrust(self, other_jid, fingerprint, trustLevel): + def set_trust(self, other_jid, fingerprint, trustLevel): try: trust_data = self.client._otr_data["trust"] except KeyError: @@ -276,7 +276,7 @@ self.client._otr_data["trust"] = trust_data jid_data = trust_data.setdefault(other_jid.full(), {}) jid_data[fingerprint] = trustLevel - super(Account, self).setTrust(other_jid, fingerprint, trustLevel) + super(Account, self).set_trust(other_jid, fingerprint, trustLevel) class ContextManager(object): @@ -289,18 +289,18 @@ def host(self): return self.parent.host - def startContext(self, other_jid): + def start_context(self, other_jid): assert isinstance(other_jid, jid.JID) context = self.contexts.setdefault( other_jid, Context(self, other_jid) ) return context - def getContextForUser(self, other): - log.debug("getContextForUser [%s]" % other) + def get_context_for_user(self, other): + log.debug("get_context_for_user [%s]" % other) if not other.resource: - log.warning("getContextForUser called with a bare jid: %s" % other.full()) - return self.startContext(other) + log.warning("get_context_for_user called with a bare jid: %s" % other.full()) + return self.start_context(other) class OTR(object): @@ -314,48 +314,48 @@ ) # FIXME: OTR should not be skipped per profile, this need to be refactored self._p_hints = host.plugins["XEP-0334"] self._p_carbons = host.plugins["XEP-0280"] - host.trigger.add("messageReceived", self.messageReceivedTrigger, priority=100000) - host.trigger.add("sendMessage", self.sendMessageTrigger, priority=100000) - host.trigger.add("sendMessageData", self._sendMessageDataTrigger) - host.bridge.addMethod( - "skipOTR", ".plugin", in_sign="s", out_sign="", method=self._skipOTR + host.trigger.add("messageReceived", self.message_received_trigger, priority=100000) + host.trigger.add("sendMessage", self.send_message_trigger, priority=100000) + host.trigger.add("send_message_data", self._send_message_data_trigger) + host.bridge.add_method( + "skip_otr", ".plugin", in_sign="s", out_sign="", method=self._skip_otr ) # FIXME: must be removed, must be done on per-message basis - host.bridge.addSignal( - "otrState", ".plugin", signature="sss" + host.bridge.add_signal( + "otr_state", ".plugin", signature="sss" ) # args: state, destinee_jid, profile # XXX: menus are disabled in favor to the new more generic encryption menu # there are let here commented for a little while as a reference - # host.importMenu( + # host.import_menu( # (OTR_MENU, D_(u"Start/Refresh")), - # self._otrStartRefresh, + # self._otr_start_refresh, # security_limit=0, # help_string=D_(u"Start or refresh an OTR session"), # type_=C.MENU_SINGLE, # ) - # host.importMenu( + # host.import_menu( # (OTR_MENU, D_(u"End session")), - # self._otrSessionEnd, + # self._otr_session_end, # security_limit=0, # help_string=D_(u"Finish an OTR session"), # type_=C.MENU_SINGLE, # ) - # host.importMenu( + # host.import_menu( # (OTR_MENU, D_(u"Authenticate")), - # self._otrAuthenticate, + # self._otr_authenticate, # security_limit=0, # help_string=D_(u"Authenticate user/see your fingerprint"), # type_=C.MENU_SINGLE, # ) - # host.importMenu( + # host.import_menu( # (OTR_MENU, D_(u"Drop private key")), - # self._dropPrivKey, + # self._drop_priv_key, # security_limit=0, # type_=C.MENU_SINGLE, # ) - host.trigger.add("presence_received", self._presenceReceivedTrigger) - self.host.registerEncryptionPlugin(self, "OTR", NS_OTR, directed=True) + host.trigger.add("presence_received", self._presence_received_trigger) + self.host.register_encryption_plugin(self, "OTR", NS_OTR, directed=True) - def _skipOTR(self, profile): + def _skip_otr(self, profile): """Tell the backend to not handle OTR for this profile. @param profile (str): %(doc_profile)s @@ -366,7 +366,7 @@ self.skipped_profiles.add(profile) @defer.inlineCallbacks - def profileConnecting(self, client): + def profile_connecting(self, client): if client.profile in self.skipped_profiles: return ctxMng = client._otr_context_manager = ContextManager(self, client) @@ -374,7 +374,7 @@ yield client._otr_data.load() encrypted_priv_key = client._otr_data.get(PRIVATE_KEY, None) if encrypted_priv_key is not None: - priv_key = self.host.memory.decryptValue( + priv_key = self.host.memory.decrypt_value( encrypted_priv_key, client.profile ) ctxMng.account.privkey = potr.crypt.PK.parsePrivateKey( @@ -382,9 +382,9 @@ )[0] else: ctxMng.account.privkey = None - ctxMng.account.loadTrusts() + ctxMng.account.load_trusts() - def profileDisconnected(self, client): + def profile_disconnected(self, client): if client.profile in self.skipped_profiles: self.skipped_profiles.remove(client.profile) return @@ -394,20 +394,20 @@ # encryption plugin methods - def startEncryption(self, client, entity_jid): - self.startRefresh(client, entity_jid) + def start_encryption(self, client, entity_jid): + self.start_refresh(client, entity_jid) - def stopEncryption(self, client, entity_jid): - self.endSession(client, entity_jid) + def stop_encryption(self, client, entity_jid): + self.end_session(client, entity_jid) - def getTrustUI(self, client, entity_jid): + def get_trust_ui(self, client, entity_jid): if not entity_jid.resource: - entity_jid.resource = self.host.memory.getMainResource( + entity_jid.resource = self.host.memory.main_resource_get( client, entity_jid ) # FIXME: temporary and unsecure, must be changed when frontends # are refactored ctxMng = client._otr_context_manager - otrctx = ctxMng.getContextForUser(entity_jid) + otrctx = ctxMng.get_context_for_user(entity_jid) priv_key = ctxMng.account.privkey if priv_key is None: @@ -444,21 +444,21 @@ ) return dialog - def setTrust(raw_data, profile): - if xml_tools.isXMLUICancelled(raw_data): + def set_trust(raw_data, profile): + if xml_tools.is_xmlui_cancelled(raw_data): return {} # This method is called when authentication form is submited - data = xml_tools.XMLUIResult2DataFormResult(raw_data) + data = xml_tools.xmlui_result_2_data_form_result(raw_data) if data["match"] == "yes": otrctx.setCurrentTrust(OTR_STATE_TRUSTED) note_msg = _("Your correspondent {correspondent} is now TRUSTED") - self.host.bridge.otrState( + self.host.bridge.otr_state( OTR_STATE_TRUSTED, entity_jid.full(), client.profile ) else: otrctx.setCurrentTrust("") note_msg = _("Your correspondent {correspondent} is now UNTRUSTED") - self.host.bridge.otrState( + self.host.bridge.otr_state( OTR_STATE_UNTRUSTED, entity_jid.full(), client.profile ) note = xml_tools.XMLUI( @@ -470,8 +470,8 @@ ) return {"xmlui": note.toXml()} - submit_id = self.host.registerCallback(setTrust, with_data=True, one_shot=True) - trusted = otrctx.isTrusted() + submit_id = self.host.register_callback(set_trust, with_data=True, one_shot=True) + trusted = otrctx.is_trusted() xmlui = xml_tools.XMLUI( C.XMLUI_FORM, @@ -489,29 +489,29 @@ ) ) xmlui.addDivider("blank") - xmlui.changeContainer("pairs") + xmlui.change_container("pairs") xmlui.addLabel(D_("Is your correspondent fingerprint the same as here ?")) xmlui.addList( "match", [("yes", _("yes")), ("no", _("no"))], ["yes" if trusted else "no"] ) return xmlui - def _otrStartRefresh(self, menu_data, profile): + def _otr_start_refresh(self, menu_data, profile): """Start or refresh an OTR session @param menu_data: %(menu_data)s @param profile: %(doc_profile)s """ - client = self.host.getClient(profile) + client = self.host.get_client(profile) try: to_jid = jid.JID(menu_data["jid"]) except KeyError: log.error(_("jid key is not present !")) return defer.fail(exceptions.DataError) - self.startRefresh(client, to_jid) + self.start_refresh(client, to_jid) return {} - def startRefresh(self, client, to_jid): + def start_refresh(self, client, to_jid): """Start or refresh an OTR session @param to_jid(jid.JID): jid to start encrypted session with @@ -522,47 +522,47 @@ "Can't start an OTR session, there is already an encrypted session " "with {name}").format(name=encrypted_session['plugin'].name)) if not to_jid.resource: - to_jid.resource = self.host.memory.getMainResource( + to_jid.resource = self.host.memory.main_resource_get( client, to_jid ) # FIXME: temporary and unsecure, must be changed when frontends # are refactored - otrctx = client._otr_context_manager.getContextForUser(to_jid) + otrctx = client._otr_context_manager.get_context_for_user(to_jid) query = otrctx.sendMessage(0, b"?OTRv?") otrctx.inject(query) - def _otrSessionEnd(self, menu_data, profile): + def _otr_session_end(self, menu_data, profile): """End an OTR session @param menu_data: %(menu_data)s @param profile: %(doc_profile)s """ - client = self.host.getClient(profile) + client = self.host.get_client(profile) try: to_jid = jid.JID(menu_data["jid"]) except KeyError: log.error(_("jid key is not present !")) return defer.fail(exceptions.DataError) - self.endSession(client, to_jid) + self.end_session(client, to_jid) return {} - def endSession(self, client, to_jid): + def end_session(self, client, to_jid): """End an OTR session""" if not to_jid.resource: - to_jid.resource = self.host.memory.getMainResource( + to_jid.resource = self.host.memory.main_resource_get( client, to_jid ) # FIXME: temporary and unsecure, must be changed when frontends # are refactored - otrctx = client._otr_context_manager.getContextForUser(to_jid) + otrctx = client._otr_context_manager.get_context_for_user(to_jid) otrctx.disconnect() return {} - def _otrAuthenticate(self, menu_data, profile): + def _otr_authenticate(self, menu_data, profile): """End an OTR session @param menu_data: %(menu_data)s @param profile: %(doc_profile)s """ - client = self.host.getClient(profile) + client = self.host.get_client(profile) try: to_jid = jid.JID(menu_data["jid"]) except KeyError: @@ -572,20 +572,20 @@ def authenticate(self, client, to_jid): """Authenticate other user and see our own fingerprint""" - xmlui = self.getTrustUI(client, to_jid) + xmlui = self.get_trust_ui(client, to_jid) return {"xmlui": xmlui.toXml()} - def _dropPrivKey(self, menu_data, profile): + def _drop_priv_key(self, menu_data, profile): """Drop our private Key @param menu_data: %(menu_data)s @param profile: %(doc_profile)s """ - client = self.host.getClient(profile) + client = self.host.get_client(profile) try: to_jid = jid.JID(menu_data["jid"]) if not to_jid.resource: - to_jid.resource = self.host.memory.getMainResource( + to_jid.resource = self.host.memory.main_resource_get( client, to_jid ) # FIXME: temporary and unsecure, must be changed when frontends # are refactored @@ -599,7 +599,7 @@ "xmlui": xml_tools.note(_("You don't have a private key yet !")).toXml() } - def dropKey(data, profile): + def drop_key(data, profile): if C.bool(data["answer"]): # we end all sessions for context in list(ctxMng.contexts.values()): @@ -614,7 +614,7 @@ } return {} - submit_id = self.host.registerCallback(dropKey, with_data=True, one_shot=True) + submit_id = self.host.register_callback(drop_key, with_data=True, one_shot=True) confirm = xml_tools.XMLUI( C.XMLUI_DIALOG, @@ -624,10 +624,10 @@ ) return {"xmlui": confirm.toXml()} - def _receivedTreatment(self, data, client): + def _received_treatment(self, data, client): from_jid = data["from"] - log.debug("_receivedTreatment [from_jid = %s]" % from_jid) - otrctx = client._otr_context_manager.getContextForUser(from_jid) + log.debug("_received_treatment [from_jid = %s]" % from_jid) + otrctx = client._otr_context_manager.get_context_for_user(from_jid) try: message = ( @@ -699,17 +699,17 @@ exceptions.CancelError("Cancelled by OTR") ) # no message at all (no history, no signal) - client.encryption.markAsEncrypted(data, namespace=NS_OTR) - trusted = otrctx.isTrusted() + client.encryption.mark_as_encrypted(data, namespace=NS_OTR) + trusted = otrctx.is_trusted() if trusted: - client.encryption.markAsTrusted(data) + client.encryption.mark_as_trusted(data) else: - client.encryption.markAsUntrusted(data) + client.encryption.mark_as_untrusted(data) return data - def _receivedTreatmentForSkippedProfiles(self, data): + def _received_treatment_for_skipped_profiles(self, data): """This profile must be skipped because the frontend manages OTR itself, but we still need to check if the message must be stored in history or not @@ -731,7 +731,7 @@ data["history"] = C.HISTORY_SKIP return data - def messageReceivedTrigger(self, client, message_elt, post_treat): + def message_received_trigger(self, client, message_elt, post_treat): if client.is_component: return True if message_elt.getAttribute("type") == C.MESS_TYPE_GROUPCHAT: @@ -742,12 +742,12 @@ # OTR is only usable when resources are present return True if client.profile in self.skipped_profiles: - post_treat.addCallback(self._receivedTreatmentForSkippedProfiles) + post_treat.addCallback(self._received_treatment_for_skipped_profiles) else: - post_treat.addCallback(self._receivedTreatment, client) + post_treat.addCallback(self._received_treatment, client) return True - def _sendMessageDataTrigger(self, client, mess_data): + def _send_message_data_trigger(self, client, mess_data): if client.is_component: return True encryption = mess_data.get(C.MESS_KEY_ENCRYPTION) @@ -755,10 +755,10 @@ return to_jid = mess_data['to'] if not to_jid.resource: - to_jid.resource = self.host.memory.getMainResource( + to_jid.resource = self.host.memory.main_resource_get( client, to_jid ) # FIXME: temporary and unsecure, must be changed when frontends - otrctx = client._otr_context_manager.getContextForUser(to_jid) + otrctx = client._otr_context_manager.get_context_for_user(to_jid) message_elt = mess_data["xml"] if otrctx.state == potr.context.STATE_ENCRYPTED: log.debug("encrypting message") @@ -776,8 +776,8 @@ if body is None: log.warning("No message found") else: - self._p_carbons.setPrivate(message_elt) - self._p_hints.addHintElements(message_elt, [ + self._p_carbons.set_private(message_elt) + self._p_hints.add_hint_elements(message_elt, [ self._p_hints.HINT_NO_COPY, self._p_hints.HINT_NO_PERMANENT_STORE]) otrctx.sendMessage(0, str(body).encode("utf-8"), appdata=mess_data) @@ -791,7 +791,7 @@ client.feedback(to_jid, feedback) raise failure.Failure(exceptions.CancelError("Cancelled by OTR plugin")) - def sendMessageTrigger(self, client, mess_data, pre_xml_treatments, + def send_message_trigger(self, client, mess_data, pre_xml_treatments, post_xml_treatments): if client.is_component: return True @@ -808,32 +808,32 @@ return True if not to_jid.resource: - to_jid.resource = self.host.memory.getMainResource( + to_jid.resource = self.host.memory.main_resource_get( client, to_jid ) # FIXME: full jid may not be known - otrctx = client._otr_context_manager.getContextForUser(to_jid) + otrctx = client._otr_context_manager.get_context_for_user(to_jid) if otrctx.state != potr.context.STATE_PLAINTEXT: defer.ensureDeferred(client.encryption.start(to_jid, NS_OTR)) - client.encryption.setEncryptionFlag(mess_data) + client.encryption.set_encryption_flag(mess_data) if not mess_data["to"].resource: # if not resource was given, we force it here mess_data["to"] = to_jid return True - def _presenceReceivedTrigger(self, client, entity, show, priority, statuses): + def _presence_received_trigger(self, client, entity, show, priority, statuses): if show != C.PRESENCE_UNAVAILABLE: return True if not entity.resource: try: - entity.resource = self.host.memory.getMainResource( + entity.resource = self.host.memory.main_resource_get( client, entity ) # FIXME: temporary and unsecure, must be changed when frontends # are refactored except exceptions.UnknownEntityError: return True # entity was not connected if entity in client._otr_context_manager.contexts: - otrctx = client._otr_context_manager.getContextForUser(entity) + otrctx = client._otr_context_manager.get_context_for_user(entity) otrctx.disconnect() return True