view libervia/backend/plugins/plugin_xep_0334.py @ 4095:684ba556a617

core (memory/sqla_mapping): fix legacy pickled values: following the packages refactoring, legacy pickled values could not be unpickled (due to the use of old classes). This temporary workaround fixes it, but the right thing to do will be to move from pickle to JSON at some point.
author Goffi <goffi@goffi.org>
date Mon, 12 Jun 2023 14:57:27 +0200
parents 4b842c1fb686
children 3b3cd9453d9b
line wrap: on
line source

#!/usr/bin/env python3


# SAT plugin for Message Processing Hints (XEP-0334)
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
# Copyright (C) 2013-2016 Adrien Cossa (souliane@mailoo.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import Iterable
from libervia.backend.core.i18n import _, D_
from libervia.backend.core.log import getLogger

log = getLogger(__name__)
from libervia.backend.core.constants import Const as C

from libervia.backend.tools.common import data_format

from wokkel import disco, iwokkel

from twisted.words.protocols.jabber import xmlstream
from twisted.words.xish import domish
from zope.interface import implementer
from textwrap import dedent


# XML namespace qualifying the hint elements handled by this plugin.
NS_HINTS = "urn:xmpp:hints"

# Usage notes referenced from PLUGIN_INFO below; dedent() strips the indentation
# shared by the lines of the literal.
_PLUGIN_USAGE = dedent(
    D_(
        """\
             Frontends can use HINT_* constants in mess_data['extra'] in a serialized 'hints' dict.
             Internal plugins can use directly add_hint([HINT_* constant]).
             Will set mess_data['extra']['history'] to 'skipped' when no store is requested and message is not saved in history."""
    )
)

# Plugin metadata; C.PI_MAIN names the main class defined in this module.
# NOTE(review): presumably consumed by the plugin loader — confirm against host code.
PLUGIN_INFO = {
    C.PI_NAME: "Message Processing Hints",
    C.PI_IMPORT_NAME: "XEP-0334",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0334"],
    C.PI_MAIN: "XEP_0334",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: D_("""Implementation of Message Processing Hints"""),
    C.PI_USAGE: _PLUGIN_USAGE,
}


class XEP_0334(object):
    """Message Processing Hints (XEP-0334) plugin

    Outgoing messages: hints are collected in the ``mess_data["hints"]`` set, then
    added to the stanza as empty elements qualified by NS_HINTS.
    Incoming messages: a "no-store" or "no-permanent-store" hint flags the message
    data with C.HISTORY_SKIP.
    """

    HINT_NO_PERMANENT_STORE = "no-permanent-store"
    HINT_NO_STORE = "no-store"
    HINT_NO_COPY = "no-copy"
    HINT_STORE = "store"
    # every hint accepted by add_hint
    HINTS = (HINT_NO_PERMANENT_STORE, HINT_NO_STORE, HINT_NO_COPY, HINT_STORE)

    def __init__(self, host):
        """Keep a reference to the host and register the message triggers

        @param host: object exposing a ``trigger.add`` method, used to register
            send_message_trigger and message_received_trigger
        """
        log.info(_("Message Processing Hints plugin initialization"))
        self.host = host
        host.trigger.add("sendMessage", self.send_message_trigger)
        host.trigger.add("message_received", self.message_received_trigger, priority=-1000)

    def get_handler(self, client):
        """Return a new XEP_0334_handler instance

        @param client: unused
        """
        return XEP_0334_handler()

    def add_hint(self, mess_data, hint):
        """Register a hint to add to an outgoing message

        The hint is stored in the ``mess_data["hints"]`` set, which is created when
        missing. An unknown hint, or HINT_NO_COPY used while ``mess_data["to"]`` has
        no resource, is logged as an error and ignored.

        @param mess_data: message data; ``mess_data["to"]`` must expose a
            ``resource`` attribute when hint is HINT_NO_COPY
        @param hint: one of the HINT_* constants
        """
        if hint == self.HINT_NO_COPY and not mess_data["to"].resource:
            log.error(
                "{hint} can only be used with full jids! Ignoring it.".format(hint=hint)
            )
            return
        hints = mess_data.setdefault("hints", set())
        if hint in self.HINTS:
            hints.add(hint)
        else:
            log.error("Unknown hint: {}".format(hint))

    def add_hint_elements(self, message_elt: domish.Element, hints: Iterable[str]) -> None:
        """Add hints elements to message stanza

        A hint already present as a child of the stanza is not added a second time.

        @param message_elt: stanza where hints must be added
        @param hints: hints to add
        """
        for hint in hints:
            if not list(message_elt.elements(NS_HINTS, hint)):
                message_elt.addElement((NS_HINTS, hint))
            else:
                # f-string so that the hint name is actually interpolated in the log
                log.debug(f'Not adding {hint!r} hint: it is already present in <message>')

    def _send_post_xml_treatment(self, mess_data):
        """Add the hints registered in mess_data to the generated stanza

        @param mess_data: message data; when it has a "hints" key, the hints are
            added to the element found in ``mess_data["xml"]``
        @return: the same mess_data
        """
        if "hints" in mess_data:
            self.add_hint_elements(mess_data["xml"], mess_data["hints"])
        return mess_data

    def send_message_trigger(
        self, client, mess_data, pre_xml_treatments, post_xml_treatments
    ):
        """Add the hints element to the message to be sent

        Hints found in ``mess_data["extra"]`` are removed from it (``pop=True``) and
        registered with add_hint; the hint elements themselves are added later by a
        callback attached to post_xml_treatments.

        @param client: unused
        @param mess_data: message data, must have an "extra" dict
        @param pre_xml_treatments: unused
        @param post_xml_treatments: object with an ``addCallback`` method
            (presumably a Deferred -- TODO confirm)
        @return: always True
        """
        if "hints" in mess_data["extra"]:
            for hint in data_format.dict2iter("hints", mess_data["extra"], pop=True):
                # add_hint needs the message data to store the hint in
                self.add_hint(mess_data, hint)

        post_xml_treatments.addCallback(self._send_post_xml_treatment)
        return True

    def _received_skip_history(self, mess_data):
        """Flag received message data with C.HISTORY_SKIP

        @param mess_data: message data, its "history" key is set
        @return: the same mess_data
        """
        mess_data["history"] = C.HISTORY_SKIP
        return mess_data

    def message_received_trigger(self, client, message_elt, post_treat):
        """Check for hints in the received message

        When a direct child of the stanza is a "no-store" or "no-permanent-store"
        hint, _received_skip_history is attached once to post_treat.

        @param client: unused
        @param message_elt: received stanza
        @param post_treat: object with an ``addCallback`` method
            (presumably a Deferred -- TODO confirm)
        @return: always True
        """
        for elt in message_elt.elements():
            if elt.uri == NS_HINTS and elt.name in (
                self.HINT_NO_PERMANENT_STORE,
                self.HINT_NO_STORE,
            ):
                log.debug("history will be skipped for this message, as requested")
                post_treat.addCallback(self._received_skip_history)
                # one matching hint is enough, the callback must be added only once
                break
        return True


@implementer(iwokkel.IDisco)
class XEP_0334_handler(xmlstream.XMPPHandler):
    """Service discovery handler declaring the NS_HINTS feature"""

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        """Return the disco features of this plugin

        The arguments are not used: the same single feature is always returned.

        @return: list holding one DiscoFeature built from NS_HINTS
        """
        hints_feature = disco.DiscoFeature(NS_HINTS)
        return [hints_feature]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        """Return the disco items of this plugin

        The arguments are not used.

        @return: always an empty list
        """
        no_items = []
        return no_items