diff libervia/backend/plugins/plugin_xep_0334.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_xep_0334.py@c23cad65ae99
children 3b3cd9453d9b
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/plugins/plugin_xep_0334.py	Fri Jun 02 11:49:51 2023 +0200
@@ -0,0 +1,140 @@
+#!/usr/bin/env python3
+
+
+# SAT plugin for Delayed Delivery (XEP-0334)
+# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
+# Copyright (C) 2013-2016 Adrien Cossa (souliane@mailoo.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from typing import Iterable
+from libervia.backend.core.i18n import _, D_
+from libervia.backend.core.log import getLogger
+
+log = getLogger(__name__)
+from libervia.backend.core.constants import Const as C
+
+from libervia.backend.tools.common import data_format
+
+from wokkel import disco, iwokkel
+
+from twisted.words.protocols.jabber import xmlstream
+from twisted.words.xish import domish
+from zope.interface import implementer
+from textwrap import dedent
+
+
+PLUGIN_INFO = {
+    C.PI_NAME: "Message Processing Hints",
+    C.PI_IMPORT_NAME: "XEP-0334",
+    C.PI_TYPE: "XEP",
+    C.PI_MODES: C.PLUG_MODE_BOTH,
+    C.PI_PROTOCOLS: ["XEP-0334"],
+    C.PI_MAIN: "XEP_0334",
+    C.PI_HANDLER: "yes",
+    C.PI_DESCRIPTION: D_("""Implementation of Message Processing Hints"""),
+    C.PI_USAGE: dedent(
+        D_(
+            """\
+             Frontends can use HINT_* constants in mess_data['extra'] in a serialized 'hints' dict.
+             Internal plugins can use directly add_hint([HINT_* constant]).
+             Will set mess_data['extra']['history'] to 'skipped' when no store is requested and message is not saved in history."""
+        )
+    ),
+}
+
+NS_HINTS = "urn:xmpp:hints"
+
+
+class XEP_0334(object):
+    HINT_NO_PERMANENT_STORE = "no-permanent-store"
+    HINT_NO_STORE = "no-store"
+    HINT_NO_COPY = "no-copy"
+    HINT_STORE = "store"
+    HINTS = (HINT_NO_PERMANENT_STORE, HINT_NO_STORE, HINT_NO_COPY, HINT_STORE)
+
+    def __init__(self, host):
+        log.info(_("Message Processing Hints plugin initialization"))
+        self.host = host
+        host.trigger.add("sendMessage", self.send_message_trigger)
+        host.trigger.add("message_received", self.message_received_trigger, priority=-1000)
+
+    def get_handler(self, client):
+        return XEP_0334_handler()
+
+    def add_hint(self, mess_data, hint):
+        if hint == self.HINT_NO_COPY and not mess_data["to"].resource:
+            log.error(
+                "{hint} can only be used with full jids! Ignoring it.".format(hint=hint)
+            )
+            return
+        hints = mess_data.setdefault("hints", set())
+        if hint in self.HINTS:
+            hints.add(hint)
+        else:
+            log.error("Unknown hint: {}".format(hint))
+
+    def add_hint_elements(self, message_elt: domish.Element, hints: Iterable[str]) -> None:
+        """Add hints elements to message stanza
+
+        @param message_elt: stanza where hints must be added
+        @param hints: hints to add
+        """
+        for hint in hints:
+            if not list(message_elt.elements(NS_HINTS, hint)):
+                message_elt.addElement((NS_HINTS, hint))
+            else:
+                log.debug('Not adding {hint!r} hint: it is already present in <message>')
+
+    def _send_post_xml_treatment(self, mess_data):
+        if "hints" in mess_data:
+            self.add_hint_elements(mess_data["xml"], mess_data["hints"])
+        return mess_data
+
+    def send_message_trigger(
+        self, client, mess_data, pre_xml_treatments, post_xml_treatments
+    ):
+        """Add the hints element to the message to be sent"""
+        if "hints" in mess_data["extra"]:
+            for hint in data_format.dict2iter("hints", mess_data["extra"], pop=True):
+                self.add_hint(hint)
+
+        post_xml_treatments.addCallback(self._send_post_xml_treatment)
+        return True
+
+    def _received_skip_history(self, mess_data):
+        mess_data["history"] = C.HISTORY_SKIP
+        return mess_data
+
+    def message_received_trigger(self, client, message_elt, post_treat):
+        """Check for hints in the received message"""
+        for elt in message_elt.elements():
+            if elt.uri == NS_HINTS and elt.name in (
+                self.HINT_NO_PERMANENT_STORE,
+                self.HINT_NO_STORE,
+            ):
+                log.debug("history will be skipped for this message, as requested")
+                post_treat.addCallback(self._received_skip_history)
+                break
+        return True
+
+
+@implementer(iwokkel.IDisco)
+class XEP_0334_handler(xmlstream.XMPPHandler):
+
+    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
+        return [disco.DiscoFeature(NS_HINTS)]
+
+    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
+        return []