view libervia/backend/plugins/plugin_xep_0131.py @ 4337:95792a1f26c7

component email gateway: attachments handling: attachments are now stored, and metadata are created in database. rel 453
author Goffi <goffi@goffi.org>
date Tue, 03 Dec 2024 00:13:23 +0100
parents 6a70fcd93a7a
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia plugin Stanza Headers and Internet Metadata (XEP-0131)
# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from email.utils import quote, unquote
from enum import StrEnum, auto
import re
from typing import Iterator, List, Literal, Optional, Self

from pydantic import BaseModel, ConfigDict, Field, RootModel
from twisted.internet import defer
from twisted.words.protocols.jabber.xmlstream import XMPPHandler
from twisted.words.xish import domish
from wokkel import disco
from zope.interface import implementer

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.models.core import MessageData

log = getLogger(__name__)

PLUGIN_INFO = {
    C.PI_NAME: "Stanza Headers and Internet Metadata Plugin",
    C.PI_IMPORT_NAME: "XEP-0131",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0131"],
    C.PI_DEPENDENCIES: [],
    C.PI_MAIN: "XEP_0131",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _(
        "Enables the inclusion of non-addressing header information in XMPP stanzas."
    ),
}

NS_SHIM = "http://jabber.org/protocol/shim"

# Regex to match quoted and non-quoted values.
RE_QUOTED_VALUES = re.compile(
    r"""
    # Match quoted phrases
    "
    (?:
        # Match any escaped character
        \\.
        # Match any character that is not a double quote or a backslash
        |[^"\\]
    )*
    "
    |
    # Match non-quoted phrases
    (?:[^,]+)
""",
    re.VERBOSE,
)


class Urgency(StrEnum):
    low = auto()
    medium = auto()
    high = auto()


class Priority(StrEnum):
    non_urgent = "non-urgent"
    normal = auto()
    urgent = auto()
    emergency = auto()


class HeadersData(BaseModel):
    model_config = ConfigDict(extra="allow")

    __pydantic_extra__: dict[str, str] = Field(init=False)  # type: ignore

    keywords: str | None = None
    urgency: Urgency | None = None
    priority: Priority | None = None

    def items(self):
        return self.__pydantic_extra__.items()

    def to_element(self) -> domish.Element:
        """Build the <headers> element from this instance's data."""
        headers_elt = domish.Element((NS_SHIM, "headers"))
        header_names = list(self.model_fields.keys()) + list(
            self.__pydantic_extra__.keys()
        )
        for name in header_names:
            value = getattr(self, name)
            if value is None:
                continue
            header_elt = headers_elt.addElement("header")
            header_elt["name"] = name
            header_elt.addContent(value)
        return headers_elt

    @classmethod
    def from_element(cls, headers_elt: domish.Element) -> Self:
        """Create a HeadersData instance from a <headers> element."""
        if headers_elt.uri != NS_SHIM or headers_elt.name != "headers":
            child_headers_elt = next(headers_elt.elements(NS_SHIM, "headers"), None)
            if child_headers_elt is None:
                raise exceptions.NotFound("<headers> element not found")
            else:
                headers_elt = child_headers_elt

        headers = {}
        for header_elt in headers_elt.elements(NS_SHIM, "header"):
            name = header_elt.getAttribute("name")
            value = str(header_elt)
            headers[name] = value
        return cls(**headers)


class Keywords(RootModel):
    root: list[str]

    def __iter__(self) -> Iterator[str]:  # type: ignore
        return iter(self.root)

    def __getitem__(self, item) -> str:
        return self.root[item]

    def __len__(self) -> int:
        return len(self.root)


class XEP_0131:
    """Implementation for XEP-0131"""

    def __init__(self, host):
        log.info(_("Stanza Headers and Internet Metadata plugin initialization"))
        self.host = host
        host.register_namespace("shim", NS_SHIM)
        host.trigger.add("sendMessage", self.send_message_trigger)
        host.trigger.add("sendMessageComponent", self.send_message_trigger)
        host.trigger.add("message_received", self.message_received_trigger)

    def quote_value(self, value: str) -> str:
        """Quote a value if it contain special characters

        @param value: Value to quote if necessary.
        @return: Quoted value.
        """
        if any(c in value for c in r" ,\""):
            value = f'"{quote(value)}"'
        return value

    def unquote_values(self, raw_header: str) -> list[str]:
        """Unquote raw list of values header.

        This is raw header for potentially quoted values separated by commas, like in the
        "keywords" header.

        @param raw_keywords_header: Raw Keywords header.
        @return: A list of unquoted strings.
        """
        unquoted_values = []

        for match in RE_QUOTED_VALUES.finditer(raw_header):
            value = match.group(0).strip()

            # Unquote the keyword if needed.
            if value.startswith('"') and value.endswith('"'):
                value = unquote(value)

            value = value.strip()
            if value:
                unquoted_values.append(value)

        return unquoted_values

    def move_keywords_to_headers(self, extra: dict) -> None:
        """Check if keywords are present in extra, and move them to headers.

        The list of keywords will be converted to a header value and set in the right
        location.

        @param extra: MessageData's ``extra`` field. Will be modified in place by
        creating/updating the ``headers`` field.
        """
        # Keywords can be in a list of strings in extra's "keywords" field.
        if "keywords" in extra:
            keywords = Keywords(extra["keywords"])
            if keywords:
                headers = extra.setdefault("headers", {})
                quoted_kw = ",".join(self.quote_value(kw) for kw in keywords)
                existing_kw = headers.get("keywords")
                if existing_kw:
                    # We have also keywords in headers, we merge both.
                    quoted_kw = f"{existing_kw},{quoted_kw}"
                headers["keywords"] = quoted_kw

    def send_message_trigger(
        self, client, mess_data, pre_xml_treatments, post_xml_treatments
    ) -> Literal[True]:
        """Process the XEP-0131 related data to be sent"""

        def add_headers(mess_data: MessageData) -> MessageData:
            extra = mess_data["extra"]
            self.move_keywords_to_headers(extra)
            # Now we parse headers, if any.
            if "headers" in extra:
                headers_data = HeadersData(**extra["headers"])
                message_elt = mess_data["xml"]
                message_elt.addChild(headers_data.to_element())
            return mess_data

        post_xml_treatments.addCallback(add_headers)
        return True

    def message_received_trigger(
        self,
        client: SatXMPPEntity,
        message_elt: domish.Element,
        post_treat: defer.Deferred,
    ) -> Literal[True]:
        """Parse headers information and add them to message data."""
        try:
            headers = HeadersData.from_element(message_elt)
        except exceptions.NotFound:
            pass
        else:

            def post_treat_addr(mess_data: MessageData):
                """Add the headers metadata to the message data"""
                if headers.keywords:
                    # We move keywords to a list of string in extra's "keywords" field.
                    mess_data["extra"]["keywords"] = self.unquote_values(headers.keywords)
                    headers.keywords = None
                mess_data["extra"]["headers"] = headers.model_dump(
                    mode="json", exclude_none=True
                )
                return mess_data

            post_treat.addCallback(post_treat_addr)
        return True

    def get_handler(self, client):
        return XEP_0131_handler()


@implementer(disco.IDisco)
class XEP_0131_handler(XMPPHandler):

    def getDiscoInfo(
        self, requestor, target, nodeIdentifier: Optional[str] = ""
    ) -> List[disco.DiscoFeature]:
        return [disco.DiscoFeature(NS_SHIM)]

    def getDiscoItems(
        self, requestor, target, nodeIdentifier: Optional[str] = ""
    ) -> List[disco.DiscoItem]:
        return []