view sat/plugins/plugin_xep_0334.py @ 4049:b56bf0c6c064

tests (unit): XEP-0320 tests: fix 421
author Goffi <goffi@goffi.org>
date Mon, 15 May 2023 17:02:41 +0200
parents 524856bd7b19
children c23cad65ae99
line wrap: on
line source

#!/usr/bin/env python3


# SAT plugin for Message Processing Hints (XEP-0334)
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
# Copyright (C) 2013-2016 Adrien Cossa (souliane@mailoo.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import Iterable
from sat.core.i18n import _, D_
from sat.core.log import getLogger

log = getLogger(__name__)
from sat.core.constants import Const as C

from sat.tools.common import data_format

from wokkel import disco, iwokkel

from twisted.words.protocols.jabber import xmlstream
from twisted.words.xish import domish
from zope.interface import implementer
from textwrap import dedent


# Plugin metadata, keyed by the C.PI_* constants.
PLUGIN_INFO = {
    C.PI_NAME: "Message Processing Hints",
    C.PI_IMPORT_NAME: "XEP-0334",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0334"],
    # name of the class defined below in this module
    C.PI_MAIN: "XEP_0334",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: D_("""Implementation of Message Processing Hints"""),
    # the usage text is written indented in the source; dedent() strips the
    # common leading whitespace from it
    C.PI_USAGE: dedent(
        D_(
            """\
             Frontends can use HINT_* constants in mess_data['extra'] in a serialized 'hints' dict.
             Internal plugins can use directly add_hint([HINT_* constant]).
             Will set mess_data['extra']['history'] to 'skipped' when no store is requested and message is not saved in history."""
        )
    ),
}

# XML namespace of the hint elements added to / looked up in <message> stanzas,
# also advertised as a disco feature by the handler
NS_HINTS = "urn:xmpp:hints"


class XEP_0334(object):
    """Message Processing Hints (XEP-0334) plugin

    On sending, hints collected in ``mess_data["hints"]`` are added as child
    elements (in the ``NS_HINTS`` namespace) of the outgoing stanza.
    On reception, a ``no-store`` or ``no-permanent-store`` hint causes
    ``mess_data["history"]`` to be set to ``C.HISTORY_SKIP``.
    """

    HINT_NO_PERMANENT_STORE = "no-permanent-store"
    HINT_NO_STORE = "no-store"
    HINT_NO_COPY = "no-copy"
    HINT_STORE = "store"
    # every hint name accepted by add_hint
    HINTS = (HINT_NO_PERMANENT_STORE, HINT_NO_STORE, HINT_NO_COPY, HINT_STORE)

    def __init__(self, host):
        """Register the send and receive triggers on the host

        @param host: host instance, its ``trigger.add`` method is used to hook
            the "sendMessage" and "messageReceived" trigger points
        """
        log.info(_("Message Processing Hints plugin initialization"))
        self.host = host
        host.trigger.add("sendMessage", self.send_message_trigger)
        host.trigger.add("messageReceived", self.message_received_trigger, priority=-1000)

    def get_handler(self, client):
        """Return a new XEP_0334_handler instance

        @param client: unused here
        """
        return XEP_0334_handler()

    def add_hint(self, mess_data, hint):
        """Record a hint to attach to an outgoing message

        Invalid requests are logged as errors and ignored: unknown hints, and
        HINT_NO_COPY when the recipient has no resource.

        @param mess_data: message data, hints are accumulated in the
            ``mess_data["hints"]`` set (created if missing); for HINT_NO_COPY,
            ``mess_data["to"]`` must expose a ``resource`` attribute
        @param hint: one of the HINT_* constants listed in HINTS
        """
        if hint == self.HINT_NO_COPY and not mess_data["to"].resource:
            log.error(
                "{hint} can only be used with full jids! Ignoring it.".format(hint=hint)
            )
            return
        hints = mess_data.setdefault("hints", set())
        if hint in self.HINTS:
            hints.add(hint)
        else:
            log.error("Unknown hint: {}".format(hint))

    def add_hint_elements(self, message_elt: domish.Element, hints: Iterable[str]) -> None:
        """Add hints elements to message stanza

        A hint already present as a child of the stanza is not added twice.

        @param message_elt: stanza where hints must be added
        @param hints: hints to add
        """
        for hint in hints:
            if not list(message_elt.elements(NS_HINTS, hint)):
                message_elt.addElement((NS_HINTS, hint))
            else:
                # f-string so that the hint name really appears in the log
                log.debug(
                    f"Not adding {hint!r} hint: it is already present in <message>"
                )

    def _send_post_xml_treatment(self, mess_data):
        """Add the recorded hints to the generated stanza

        @param mess_data: message data, ``mess_data["xml"]`` is the stanza
            which receives the hints found in ``mess_data["hints"]`` (if any)
        @return: the same mess_data
        """
        if "hints" in mess_data:
            self.add_hint_elements(mess_data["xml"], mess_data["hints"])
        return mess_data

    def send_message_trigger(
        self, client, mess_data, pre_xml_treatments, post_xml_treatments
    ):
        """Add the hints element to the message to be sent

        Hints found in ``mess_data["extra"]`` are popped from it and recorded
        with add_hint; the hint elements themselves are added later by a
        callback chained on post_xml_treatments.

        @param client: unused here
        @param mess_data: message data, ``mess_data["extra"]`` must be present
        @param pre_xml_treatments: unused here
        @param post_xml_treatments: object whose ``addCallback`` is used to
            schedule _send_post_xml_treatment
        @return: always True
        """
        if "hints" in mess_data["extra"]:
            for hint in data_format.dict2iter("hints", mess_data["extra"], pop=True):
                # add_hint needs the message data to store the hint in
                self.add_hint(mess_data, hint)

        post_xml_treatments.addCallback(self._send_post_xml_treatment)
        return True

    def _received_skip_history(self, mess_data):
        """Flag the received message so that its history is skipped

        @param mess_data: message data, its "history" key is set to
            C.HISTORY_SKIP
        @return: the same mess_data
        """
        # NOTE(review): the plugin usage text mentions
        # mess_data['extra']['history'] while the top-level "history" key is
        # set here -- presumably converted by the caller, to be confirmed
        mess_data["history"] = C.HISTORY_SKIP
        return mess_data

    def message_received_trigger(self, client, message_elt, post_treat):
        """Check for hints in the received message

        When the stanza has a direct child in the NS_HINTS namespace named
        "no-permanent-store" or "no-store", _received_skip_history is chained
        (once) on post_treat.

        @param client: unused here
        @param message_elt: received stanza
        @param post_treat: object whose ``addCallback`` is used to schedule
            the history skipping
        @return: always True
        """
        skip_hints = (self.HINT_NO_PERMANENT_STORE, self.HINT_NO_STORE)
        # any() stops at the first matching child, so the callback is added
        # at most once per message
        if any(
            elt.uri == NS_HINTS and elt.name in skip_hints
            for elt in message_elt.elements()
        ):
            log.debug("history will be skipped for this message, as requested")
            post_treat.addCallback(self._received_skip_history)
        return True


@implementer(iwokkel.IDisco)
class XEP_0334_handler(xmlstream.XMPPHandler):
    """Disco handler advertising the hints namespace"""

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        """Return the disco info: a single feature for NS_HINTS

        @param requestor: unused here
        @param target: unused here
        @param nodeIdentifier: unused here
        @return: list holding one disco.DiscoFeature
        """
        hints_feature = disco.DiscoFeature(NS_HINTS)
        return [hints_feature]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        """Return the disco items: always an empty list

        @param requestor: unused here
        @param target: unused here
        @param nodeIdentifier: unused here
        """
        return list()