view libervia/backend/plugins/plugin_xep_0334.py @ 4180:b86912d3fd33

plugin IP: fix use of legacy URL + coroutine use: An https:/salut-a-toi.org URL was used to retrieve external IP, but it's not valid anymore, resulting in an exception. This feature is currently disabled. Also moved several methods from legacy inline callbacks to coroutines.
author Goffi <goffi@goffi.org>
date Sat, 09 Dec 2023 14:30:54 +0100
parents 3b3cd9453d9b
children 0d7bb4df2343
line wrap: on
line source

#!/usr/bin/env python3


# SAT plugin for Delayed Delivery (XEP-0334)
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
# Copyright (C) 2013-2016 Adrien Cossa (souliane@mailoo.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import Iterable
from libervia.backend.core.i18n import _, D_
from libervia.backend.core.log import getLogger

log = getLogger(__name__)
from libervia.backend.core.constants import Const as C

from libervia.backend.tools.common import data_format

from wokkel import disco, iwokkel

from twisted.words.protocols.jabber import xmlstream
from twisted.words.xish import domish
from zope.interface import implementer
from textwrap import dedent


# Plugin metadata, keyed by the PI_* constants of the backend's Const class.
# Presumably read by the host's plugin loader (not visible in this file):
# C.PI_MAIN names the main class below and C.PI_HANDLER is set to "yes",
# which matches the get_handler() method exposed by that class.
PLUGIN_INFO = {
    C.PI_NAME: "Message Processing Hints",
    C.PI_IMPORT_NAME: "XEP-0334",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0334"],
    C.PI_MAIN: "XEP_0334",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: D_("""Implementation of Message Processing Hints"""),
    # The usage text is written indented for readability in the source; dedent()
    # strips the common leading whitespace of its lines.
    # NOTE(review): the text below mentions mess_data['extra']['history'], while
    # XEP_0334._received_skip_history writes mess_data["history"] — confirm which
    # one is current and update the text if needed.
    C.PI_USAGE: dedent(
        D_(
            """\
             Frontends can use HINT_* constants in mess_data['extra'] in a serialized 'hints' dict.
             Internal plugins can use directly add_hint([HINT_* constant]).
             Will set mess_data['extra']['history'] to 'skipped' when no store is requested and message is not saved in history."""
        )
    ),
}

# XML namespace of the hint elements: used when adding hints to outgoing
# messages, when looking for hints in received ones, and as advertised disco
# feature.
NS_HINTS = "urn:xmpp:hints"


class XEP_0334:
    """Message Processing Hints (XEP-0334) plugin

    For outgoing messages, hints are collected in the ``mess_data["hints"]`` set
    and written as child elements in the ``NS_HINTS`` namespace once the stanza
    is available in ``mess_data["xml"]``.
    For received messages, a "no-permanent-store" or "no-store" hint makes the
    message data be flagged with ``C.HISTORY_SKIP``.
    """

    HINT_NO_PERMANENT_STORE = "no-permanent-store"
    HINT_NO_STORE = "no-store"
    HINT_NO_COPY = "no-copy"
    HINT_STORE = "store"
    # all the hints accepted by add_hint
    HINTS = (HINT_NO_PERMANENT_STORE, HINT_NO_STORE, HINT_NO_COPY, HINT_STORE)

    def __init__(self, host):
        """Register the triggers used to send and to detect hints

        @param host: backend host, must expose a ``trigger`` manager
        """
        log.info(_("Message Processing Hints plugin initialization"))
        self.host = host
        host.trigger.add("sendMessage", self.send_message_trigger)
        host.trigger.add(
            "message_received", self.message_received_trigger, priority=-1000
        )

    def get_handler(self, client):
        """Return a new handler advertising the hints feature

        @param client: unused here
        """
        return XEP_0334_handler()

    def add_hint(self, mess_data, hint):
        """Register a hint for a message which is being prepared

        The hint is stored in the ``mess_data["hints"]`` set, which is created if
        it doesn't exist yet.
        An unknown hint is logged and ignored. HINT_NO_COPY is logged and ignored
        when ``mess_data["to"].resource`` is empty.

        @param mess_data: message data, ``mess_data["to"]`` must have a
            ``resource`` attribute
        @param hint: hint to add, should be one of the HINT_* constants
        """
        if hint == self.HINT_NO_COPY and not mess_data["to"].resource:
            log.error(f"{hint} can only be used with full jids! Ignoring it.")
            return
        hints = mess_data.setdefault("hints", set())
        if hint in self.HINTS:
            hints.add(hint)
        else:
            log.error(f"Unknown hint: {hint}")

    def add_hint_elements(self, message_elt: domish.Element, hints: Iterable[str]) -> None:
        """Add hints elements to message stanza

        A hint which already has an element in the stanza is not added again.

        @param message_elt: stanza where hints must be added
        @param hints: hints to add
        """
        for hint in hints:
            if not list(message_elt.elements(NS_HINTS, hint)):
                message_elt.addElement((NS_HINTS, hint))
            else:
                log.debug(f'Not adding {hint!r} hint: it is already present in <message>')

    def _send_post_xml_treatment(self, mess_data):
        """Write the collected hints in the built stanza

        @param mess_data: message data, with the stanza in ``mess_data["xml"]``
        @return: the same mess_data, so it can be passed to the next callback
        """
        if "hints" in mess_data:
            self.add_hint_elements(mess_data["xml"], mess_data["hints"])
        return mess_data

    def send_message_trigger(
        self, client, mess_data, pre_xml_treatments, post_xml_treatments
    ):
        """Add the hints element to the message to be sent

        Hints serialised in ``mess_data["extra"]`` are registered with add_hint,
        and a callback is added to ``post_xml_treatments`` to write the hints
        elements.

        @param client: unused here
        @param mess_data: data of the message to send
        @param pre_xml_treatments: unused here
        @param post_xml_treatments: object with an ``addCallback`` method, the
            callback added receives mess_data
        @return: always True
        """
        if "hints" in mess_data["extra"]:
            for hint in data_format.dict2iter("hints", mess_data["extra"], pop=True):
                # add_hint stores the hint in mess_data, which must be given as
                # first argument: calling it with the hint only raises a TypeError
                self.add_hint(mess_data, hint)

        post_xml_treatments.addCallback(self._send_post_xml_treatment)
        return True

    def _received_skip_history(self, mess_data):
        """Flag a received message with C.HISTORY_SKIP

        @param mess_data: message data, its "history" key is set
        @return: the same mess_data, so it can be passed to the next callback
        """
        mess_data["history"] = C.HISTORY_SKIP
        return mess_data

    def message_received_trigger(self, client, message_elt, post_treat):
        """Check for hints in the received message

        If a "no-permanent-store" or "no-store" hint is found among the direct
        children of the stanza, _received_skip_history is added (once) to
        ``post_treat``.

        @param client: unused here
        @param message_elt: received stanza
        @param post_treat: object with an ``addCallback`` method, the callback
            added receives the message data
        @return: always True
        """
        skip_hints = (self.HINT_NO_PERMANENT_STORE, self.HINT_NO_STORE)
        if any(
            elt.uri == NS_HINTS and elt.name in skip_hints
            for elt in message_elt.elements()
        ):
            log.debug("history will be skipped for this message, as requested")
            post_treat.addCallback(self._received_skip_history)
        return True


@implementer(iwokkel.IDisco)
class XEP_0334_handler(xmlstream.XMPPHandler):
    """Service discovery handler for Message Processing Hints"""

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        """Return the disco feature for the hints namespace

        The arguments are not used: the same single feature is always returned.
        """
        hints_feature = disco.DiscoFeature(NS_HINTS)
        return [hints_feature]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        """Return an empty list, there is no disco item for this plugin

        The arguments are not used.
        """
        return list()