changeset 3799:60724ff3f273

plugin XEP-0422: Message Fastening implementation: rel 367
author Goffi <goffi@goffi.org>
date Fri, 17 Jun 2022 14:15:23 +0200
parents b5013bada4b6
children 2033fa3c5b85
files sat/plugins/plugin_xep_0422.py
diffstat 1 files changed, 161 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sat/plugins/plugin_xep_0422.py	Fri Jun 17 14:15:23 2022 +0200
@@ -0,0 +1,161 @@
+#!/usr/bin/env python3
+
+# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from typing import Optional, List, Tuple, Union, NamedTuple
+from collections import namedtuple
+
+from twisted.words.protocols.jabber import xmlstream
+from twisted.words.xish import domish
+from wokkel import disco
+from zope.interface import implementer
+
+from sat.core.constants import Const as C
+from sat.core.i18n import _
+from sat.core.log import getLogger
+from sat.core.core_types import SatXMPPEntity
+from sat.memory.sqla_mapping import History
+from sat.tools.common.async_utils import async_lru
+
+log = getLogger(__name__)
+
+
+PLUGIN_INFO = {
+    C.PI_NAME: "Message Fastening",
+    C.PI_IMPORT_NAME: "XEP-0422",
+    C.PI_TYPE: "XEP",
+    C.PI_MODES: C.PLUG_MODE_BOTH,
+    C.PI_PROTOCOLS: ["XEP-0359", "XEP-0422"],
+    C.PI_MAIN: "XEP_0422",
+    C.PI_HANDLER: "yes",
+    C.PI_DESCRIPTION: _("""Implementation Message Fastening"""),
+}
+
+NS_FASTEN = "urn:xmpp:fasten:0"
+
+
+class FastenMetadata(NamedTuple):
+    elements: List[domish.Element]
+    id: str
+    history: Optional[History]
+    clear: bool
+    shell: bool
+
+
+class XEP_0422(object):
+
+    def __init__(self, host):
+        log.info(_("XEP-0422 (Message Fastening) plugin initialization"))
+        self.host = host
+        host.registerNamespace("fasten", NS_FASTEN)
+
+    def getHandler(self, __):
+        return XEP_0422_handler()
+
+    def applyToElt(
+        self,
+        message_elt: domish.Element,
+        origin_id: str,
+        clear: Optional[bool] = None,
+        shell: Optional[bool] = None,
+        children: Optional[List[domish.Element]] = None,
+        external: Optional[List[Union[str, Tuple[str, str]]]] = None
+    ) -> domish.Element:
+        """Generate, add and return <apply-to> element
+
+        @param message_elt: wrapping <message> element
+        @param origin_id: origin ID of the target message
+        @param clear: set to True to remove a fastening
+        @param shell: set to True when using e2ee shell
+            cf. https://xmpp.org/extensions/xep-0422.html#encryption
+        @param children: element to fasten to the target message
+            <apply-to> element is returned, thus children can also easily be added
+            afterwards
+        @param external: <external> element to add
+            cf. https://xmpp.org/extensions/xep-0422.html#external-payloads
+            the list items can either be a str with only the element name,
+            or a tuple which must then be (namespace, name)
+        @return: <apply-to> element, which is already added to the wrapping message_elt
+        """
+        apply_to_elt = message_elt.addElement((NS_FASTEN, "apply-to"))
+        apply_to_elt["id"] = origin_id
+        if clear is not None:
+            apply_to_elt["clear"] = C.boolConst(clear)
+        if shell is not None:
+            apply_to_elt["shell"] = C.boolConst(shell)
+        if children is not None:
+            for child in children:
+                apply_to_elt.addChild(child)
+        if external is not None:
+            for ext in external:
+                external_elt = apply_to_elt.addElement("external")
+                if isinstance(ext, str):
+                    external_elt["name"] = ext
+                else:
+                    ns, name = ext
+                    external_elt["name"] = name
+                    external_elt["element-namespace"] = ns
+        return apply_to_elt
+
+    @async_lru(maxsize=5)
+    async def getFastenedElts(
+        self,
+        client: SatXMPPEntity,
+        message_elt: domish.Element
+    ) -> Optional[FastenMetadata]:
+        """Get fastened elements
+
+        if the message contains no <apply-to> element, None is returned
+        """
+        try:
+            apply_to_elt = next(message_elt.elements(NS_FASTEN, "apply-to"))
+        except StopIteration:
+            return None
+        else:
+            origin_id = apply_to_elt.getAttribute("id")
+            if not origin_id:
+                log.warning(
+                    f"Received invalid fastening message: {message_elt.toXml()}"
+                )
+                return None
+            elements = apply_to_elt.children
+            if not elements:
+                log.warning(f"No element to fasten: {message_elt.toXml()}")
+                return None
+            history = await self.host.memory.storage.get(
+                client,
+                History,
+                History.origin_id,
+                origin_id,
+                (History.messages, History.subjects, History.thread)
+            )
+            return FastenMetadata(
+                elements,
+                origin_id,
+                history,
+                C.bool(apply_to_elt.getAttribute("clear", C.BOOL_FALSE)),
+                C.bool(apply_to_elt.getAttribute("shell", C.BOOL_FALSE)),
+            )
+
+
+@implementer(disco.IDisco)
+class XEP_0422_handler(xmlstream.XMPPHandler):
+
+    def getDiscoInfo(self, __, target, nodeIdentifier=""):
+        return [disco.DiscoFeature(NS_FASTEN)]
+
+    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
+        return []