# HG changeset patch # User Goffi # Date 1655468123 -7200 # Node ID 60724ff3f27334c4cb941611a43a3da58a9a003f # Parent b5013bada4b6b4e7d130dc433712e929fa13293a plugin XEP-0422: Message Fastening implementation: rel 367 diff -r b5013bada4b6 -r 60724ff3f273 sat/plugins/plugin_xep_0422.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat/plugins/plugin_xep_0422.py Fri Jun 17 14:15:23 2022 +0200 @@ -0,0 +1,161 @@ +#!/usr/bin/env python3 + +# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from typing import Optional, List, Tuple, Union, NamedTuple +from collections import namedtuple + +from twisted.words.protocols.jabber import xmlstream +from twisted.words.xish import domish +from wokkel import disco +from zope.interface import implementer + +from sat.core.constants import Const as C +from sat.core.i18n import _ +from sat.core.log import getLogger +from sat.core.core_types import SatXMPPEntity +from sat.memory.sqla_mapping import History +from sat.tools.common.async_utils import async_lru + +log = getLogger(__name__) + + +PLUGIN_INFO = { + C.PI_NAME: "Message Fastening", + C.PI_IMPORT_NAME: "XEP-0422", + C.PI_TYPE: "XEP", + C.PI_MODES: C.PLUG_MODE_BOTH, + C.PI_PROTOCOLS: ["XEP-0359", "XEP-0422"], + C.PI_MAIN: "XEP_0422", + C.PI_HANDLER: "yes", + C.PI_DESCRIPTION: _("""Implementation Message Fastening"""), +} + +NS_FASTEN = "urn:xmpp:fasten:0" + + +class FastenMetadata(NamedTuple): + elements: List[domish.Element] + id: str + history: Optional[History] + clear: bool + shell: bool + + +class XEP_0422(object): + + def __init__(self, host): + log.info(_("XEP-0422 (Message Fastening) plugin initialization")) + self.host = host + host.registerNamespace("fasten", NS_FASTEN) + + def getHandler(self, __): + return XEP_0422_handler() + + def applyToElt( + self, + message_elt: domish.Element, + origin_id: str, + clear: Optional[bool] = None, + shell: Optional[bool] = None, + children: Optional[List[domish.Element]] = None, + external: Optional[List[Union[str, Tuple[str, str]]]] = None + ) -> domish.Element: + """Generate, add and return element + + @param message_elt: wrapping element + @param origin_id: origin ID of the target message + @param clear: set to True to remove a fastening + @param shell: set to True when using e2ee shell + cf. https://xmpp.org/extensions/xep-0422.html#encryption + @param children: element to fasten to the target message + element is returned, thus children can also easily be added + afterwards + @param external: element to add + cf. https://xmpp.org/extensions/xep-0422.html#external-payloads + the list items can either be a str with only the element name, + or a tuple which must then be (namespace, name) + @return: element, which is already added to the wrapping message_elt + """ + apply_to_elt = message_elt.addElement((NS_FASTEN, "apply-to")) + apply_to_elt["id"] = origin_id + if clear is not None: + apply_to_elt["clear"] = C.boolConst(clear) + if shell is not None: + apply_to_elt["shell"] = C.boolConst(shell) + if children is not None: + for child in children: + apply_to_elt.addChild(child) + if external is not None: + for ext in external: + external_elt = apply_to_elt.addElement("external") + if isinstance(ext, str): + external_elt["name"] = ext + else: + ns, name = ext + external_elt["name"] = name + external_elt["element-namespace"] = ns + return apply_to_elt + + @async_lru(maxsize=5) + async def getFastenedElts( + self, + client: SatXMPPEntity, + message_elt: domish.Element + ) -> Optional[FastenMetadata]: + """Get fastened elements + + if the message contains no element, None is returned + """ + try: + apply_to_elt = next(message_elt.elements(NS_FASTEN, "apply-to")) + except StopIteration: + return None + else: + origin_id = apply_to_elt.getAttribute("id") + if not origin_id: + log.warning( + f"Received invalid fastening message: {message_elt.toXml()}" + ) + return None + elements = apply_to_elt.children + if not elements: + log.warning(f"No element to fasten: {message_elt.toXml()}") + return None + history = await self.host.memory.storage.get( + client, + History, + History.origin_id, + origin_id, + (History.messages, History.subjects, History.thread) + ) + return FastenMetadata( + elements, + origin_id, + history, + C.bool(apply_to_elt.getAttribute("clear", C.BOOL_FALSE)), + C.bool(apply_to_elt.getAttribute("shell", C.BOOL_FALSE)), + ) + + +@implementer(disco.IDisco) +class XEP_0422_handler(xmlstream.XMPPHandler): + + def getDiscoInfo(self, __, target, nodeIdentifier=""): + return [disco.DiscoFeature(NS_FASTEN)] + + def getDiscoItems(self, requestor, target, nodeIdentifier=""): + return []