Mercurial > libervia-backend
view libervia/backend/plugins/plugin_xep_0106.py @ 4318:27bb22eace65
tests (unit/email gateway): add test for XEP-0131 handling:
rel 451
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 28 Sep 2024 15:59:48 +0200 |
parents | 060d695ae98e |
children |
line wrap: on
line source
#!/usr/bin/env python3

# SAT plugin for JID Escaping (XEP-0106)
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from libervia.backend.core.i18n import _
from libervia.backend.core.constants import Const as C
from libervia.backend.core.log import getLogger
from twisted.words.protocols.jabber import xmlstream
from zope.interface import implementer
from wokkel import disco


log = getLogger(__name__)


PLUGIN_INFO = {
    C.PI_NAME: "JID Escaping",
    C.PI_IMPORT_NAME: "XEP-0106",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0106"],
    C.PI_DEPENDENCIES: [],
    C.PI_MAIN: "XEP_0106",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""(Un)escape JID to use disallowed chars in local parts"""),
}

# Feature namespace advertised through service discovery.
NS_JID_ESCAPING = r"jid\20escaping"

# Character -> escape sequence. Every sequence is exactly 3 characters long
# (a backslash followed by two hexadecimal digits), which unescape() relies on.
ESCAPE_MAP = {
    " ": r"\20",
    '"': r"\22",
    "&": r"\26",
    "'": r"\27",
    "/": r"\2f",
    ":": r"\3a",
    "<": r"\3c",
    ">": r"\3e",
    "@": r"\40",
    "\\": r"\5c",
}


class XEP_0106(object):
    """Escape and unescape text using the sequences listed in ESCAPE_MAP."""

    def __init__(self, host):
        """Build the reverse lookup table used by [unescape]

        @param host: plugin host, not used by this plugin
        """
        # escape sequence -> original character
        self.reverse_map = {v: k for k, v in ESCAPE_MAP.items()}

    def get_handler(self, client):
        """Return a new handler advertising the JID escaping feature

        @param client: not used, a fresh handler is returned on every call
        @return (XEP_0106_handler): service discovery handler
        """
        return XEP_0106_handler()

    def escape(self, text: str) -> str:
        """Escape text

        Every character found in ESCAPE_MAP is replaced by its escape sequence,
        other characters are kept untouched.

        @param text(unicode): text to escape
        @return (unicode): escaped text
        @raise ValueError: text can't be escaped
            (it is empty, or it starts or ends with a space)
        """
        if not text or text[0] == " " or text[-1] == " ":
            raise ValueError(
                "text must not be empty, or start or end with a whitespace"
            )
        return "".join(ESCAPE_MAP.get(char, char) for char in text)

    def unescape(self, escaped: str) -> str:
        """Unescape text

        Every 3 characters sequence found in the reverse map is replaced by the
        character it stands for, other characters are kept untouched.

        @param escaped(unicode): text to unescape
        @return (unicode): unescaped text
        @raise ValueError: text can't be unescaped
            (it is empty, or it starts or ends with an escaped space)
        """
        # XEP-0106 forbids "\20" (escaped space) as first or last sequence.
        # This mirrors the leading/trailing space check done in [escape].
        if not escaped or escaped.startswith(r"\20") or escaped.endswith(r"\20"):
            raise ValueError(
                "escaped value must not be empty, or start or end with a "
                f"whitespace: rejected value is {escaped!r}"
            )
        unescaped = []
        idx = 0
        while idx < len(escaped):
            # all escape sequences are 3 characters long
            char_seq = escaped[idx : idx + 3]
            if char_seq in self.reverse_map:
                unescaped.append(self.reverse_map[char_seq])
                idx += 3
            else:
                # not a known sequence: keep the character as is
                unescaped.append(escaped[idx])
                idx += 1
        return "".join(unescaped)


@implementer(disco.IDisco)
class XEP_0106_handler(xmlstream.XMPPHandler):
    """Service discovery handler advertising the JID escaping feature."""

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        """Return the JID escaping feature, whatever the arguments are

        @return (list): a single DiscoFeature for NS_JID_ESCAPING
        """
        return [disco.DiscoFeature(NS_JID_ESCAPING)]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        """Return no item, whatever the arguments are

        @return (list): always an empty list
        """
        return []