diff libervia/backend/plugins/plugin_xep_0131.py @ 4314:6a70fcd93a7a

plugin XEP-0131: Stanza Headers and Internet Metadata implementation: - SHIM is now supported and put in `msg_data["extra"]["headers"]`. - `Keywords` are converted from and to list of string in `msg_data["extra"]["keywords"]` field (if present in headers on message sending, values are merged). - Python minimal version upgraded to 3.11 due to use of `StrEnum`. rel 451
author Goffi <goffi@goffi.org>
date Sat, 28 Sep 2024 15:56:04 +0200
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/plugins/plugin_xep_0131.py	Sat Sep 28 15:56:04 2024 +0200
@@ -0,0 +1,273 @@
+#!/usr/bin/env python3
+
+# Libervia plugin Stanza Headers and Internet Metadata (XEP-0131)
+# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from email.utils import quote, unquote
+from enum import StrEnum, auto
+import re
+from typing import Iterator, List, Literal, Optional, Self
+
+from pydantic import BaseModel, ConfigDict, Field, RootModel
+from twisted.internet import defer
+from twisted.words.protocols.jabber.xmlstream import XMPPHandler
+from twisted.words.xish import domish
+from wokkel import disco
+from zope.interface import implementer
+
+from libervia.backend.core import exceptions
+from libervia.backend.core.constants import Const as C
+from libervia.backend.core.core_types import SatXMPPEntity
+from libervia.backend.core.i18n import _
+from libervia.backend.core.log import getLogger
+from libervia.backend.models.core import MessageData
+
+# Module-level logger, named after this module.
+log = getLogger(__name__)
+
+# Metadata describing this plugin; all keys are constants taken from ``C``.
+# NOTE(review): presumably consumed by the plugin loader of the backend — confirm.
+PLUGIN_INFO = {
+    C.PI_NAME: "Stanza Headers and Internet Metadata Plugin",
+    C.PI_IMPORT_NAME: "XEP-0131",
+    C.PI_TYPE: "XEP",
+    C.PI_MODES: C.PLUG_MODE_BOTH,
+    C.PI_PROTOCOLS: ["XEP-0131"],
+    # This plugin declares no dependency on other plugins.
+    C.PI_DEPENDENCIES: [],
+    # Name of the main plugin class, defined below in this module.
+    C.PI_MAIN: "XEP_0131",
+    # NOTE(review): presumably tells the loader to call ``get_handler`` — confirm.
+    C.PI_HANDLER: "yes",
+    C.PI_DESCRIPTION: _(
+        "Enables the inclusion of non-addressing header information in XMPP stanzas."
+    ),
+}
+
+# XML namespace used for the SHIM ``<headers>`` and ``<header>`` elements.
+NS_SHIM = "http://jabber.org/protocol/shim"
+
+# Regex to match quoted and non-quoted values of a comma-separated header.
+# Whitespace located before a quoted phrase is consumed by the quoted alternative:
+# without it, a list written with ", " separators (e.g. ``foo, "bar, baz"``) would
+# fall back to the non-quoted alternative and be cut at the comma inside the quotes.
+# Matches may thus start with whitespace, users of this regex must strip them.
+RE_QUOTED_VALUES = re.compile(
+    r"""
+    # Optional whitespace before a quoted phrase
+    \s*
+    # Match quoted phrases
+    "
+    (?:
+        # Match any escaped character
+        \\.
+        # Match any character that is not a double quote or a backslash
+        |[^"\\]
+    )*
+    "
+    |
+    # Match non-quoted phrases
+    (?:[^,]+)
+""",
+    re.VERBOSE,
+)
+
+
+class Urgency(StrEnum):
+    """Values accepted by the ``urgency`` field of ``HeadersData``."""
+
+    low = "low"
+    medium = "medium"
+    high = "high"
+
+
+class Priority(StrEnum):
+    """Values accepted by the ``priority`` field of ``HeadersData``.
+
+    ``non_urgent`` is the only member whose value ("non-urgent") differs from its name.
+    """
+
+    non_urgent = "non-urgent"
+    normal = "normal"
+    urgent = "urgent"
+    emergency = "emergency"
+
+
+class HeadersData(BaseModel):
+    """SHIM headers attached to a stanza.
+
+    ``keywords``, ``urgency`` and ``priority`` are declared fields. Any other header is
+    accepted too (``extra="allow"``) and kept in ``__pydantic_extra__``.
+    """
+
+    model_config = ConfigDict(extra="allow")
+
+    __pydantic_extra__: dict[str, str] = Field(init=False)  # type: ignore
+
+    keywords: str | None = None
+    urgency: Urgency | None = None
+    priority: Priority | None = None
+
+    def items(self):
+        """Return the ``(name, value)`` pairs of the extra headers only.
+
+        Declared fields (``keywords``, ``urgency``, ``priority``) are not included.
+        """
+        return self.__pydantic_extra__.items()
+
+    def to_element(self) -> domish.Element:
+        """Build the <headers> element from this instance's data.
+
+        One <header> child is added per header, declared fields first then extra
+        ones. Headers whose value is None are skipped.
+
+        @return: The <headers> element.
+        """
+        headers_elt = domish.Element((NS_SHIM, "headers"))
+        header_values = [
+            (name, getattr(self, name)) for name in type(self).model_fields
+        ]
+        # Extra headers are read from ``__pydantic_extra__`` directly: ``getattr`` would
+        # return an attribute of the model instead of the header value when an extra
+        # header is named like one of them (e.g. "items" or "copy").
+        header_values.extend(self.__pydantic_extra__.items())
+        for name, value in header_values:
+            if value is None:
+                continue
+            header_elt = headers_elt.addElement("header")
+            header_elt["name"] = name
+            header_elt.addContent(value)
+        return headers_elt
+
+    @classmethod
+    def from_element(cls, headers_elt: domish.Element) -> Self:
+        """Create a HeadersData instance from a <headers> element.
+
+        @param headers_elt: Either the <headers> element itself, or an element with a
+            <headers> element as direct child.
+        @return: Instance filled with the content of the <header> children.
+        @raise exceptions.NotFound: No <headers> element has been found.
+        """
+        if headers_elt.uri != NS_SHIM or headers_elt.name != "headers":
+            child_headers_elt = next(headers_elt.elements(NS_SHIM, "headers"), None)
+            if child_headers_elt is None:
+                raise exceptions.NotFound("<headers> element not found")
+            else:
+                headers_elt = child_headers_elt
+
+        headers = {}
+        for header_elt in headers_elt.elements(NS_SHIM, "header"):
+            name = header_elt.getAttribute("name")
+            if not name:
+                # A header without name can't be mapped to any field, and a ``None``
+                # key would make the ``cls(**headers)`` call below raise a TypeError.
+                log.warning(f"Ignoring SHIM header without name: {header_elt.toXml()}")
+                continue
+            headers[name] = str(header_elt)
+        return cls(**headers)
+
+
+class Keywords(RootModel):
+    """List of keywords.
+
+    Length, indexing and iteration are delegated to the underlying ``root`` list.
+    """
+
+    root: list[str]
+
+    def __len__(self) -> int:
+        """Return the number of keywords."""
+        keywords = self.root
+        return len(keywords)
+
+    def __getitem__(self, item) -> str:
+        """Return the keyword(s) found at ``item`` in the underlying list."""
+        keywords = self.root
+        return keywords[item]
+
+    def __iter__(self) -> Iterator[str]:  # type: ignore
+        """Iterate over the keywords, in list order."""
+        keywords = self.root
+        return iter(keywords)
+
+
+class XEP_0131:
+    """Implementation for XEP-0131"""
+
+    def __init__(self, host):
+        """Register the SHIM namespace and the message triggers on ``host``."""
+        log.info(_("Stanza Headers and Internet Metadata plugin initialization"))
+        self.host = host
+        host.register_namespace("shim", NS_SHIM)
+        host.trigger.add("sendMessage", self.send_message_trigger)
+        host.trigger.add("sendMessageComponent", self.send_message_trigger)
+        host.trigger.add("message_received", self.message_received_trigger)
+
+    def quote_value(self, value: str) -> str:
+        """Quote a value if it contains special characters.
+
+        Special characters are space, comma, backslash and double quote.
+
+        @param value: Value to quote if necessary.
+        @return: Value surrounded by double quotes, with backslashes and double quotes
+            escaped, if it contains a special character. Unmodified value otherwise.
+        """
+        # The raw string below holds 4 characters: space, comma, backslash, double quote.
+        if any(c in value for c in r" ,\""):
+            value = f'"{quote(value)}"'
+        return value
+
+    def unquote_values(self, raw_header: str) -> list[str]:
+        """Unquote raw list of values header.
+
+        This is raw header for potentially quoted values separated by commas, like in the
+        "keywords" header.
+
+        @param raw_header: Raw header value, e.g. the content of a Keywords header.
+        @return: A list of unquoted strings. Empty values are discarded.
+        """
+        unquoted_values = []
+
+        for match in RE_QUOTED_VALUES.finditer(raw_header):
+            value = match.group(0).strip()
+
+            # Unquote the value if needed.
+            if value.startswith('"') and value.endswith('"'):
+                value = unquote(value)
+
+            # Unquoting may reveal surrounding whitespace, we strip again.
+            value = value.strip()
+            if value:
+                unquoted_values.append(value)
+
+        return unquoted_values
+
+    def move_keywords_to_headers(self, extra: dict) -> None:
+        """Check if keywords are present in extra, and set them in headers.
+
+        The list of keywords will be converted to a header value and set in the right
+        location. If a ``keywords`` header already exists, the new value is appended to
+        it. The ``keywords`` field of ``extra`` itself is left untouched.
+
+        @param extra: MessageData's ``extra`` field. Will be modified in place by
+            creating/updating the ``headers`` field.
+        """
+        # Keywords can be in a list of strings in extra's "keywords" field.
+        if "keywords" not in extra:
+            return
+        keywords = Keywords(extra["keywords"])
+        if not keywords:
+            return
+        headers = extra.setdefault("headers", {})
+        quoted_kw = ",".join(self.quote_value(kw) for kw in keywords)
+        existing_kw = headers.get("keywords")
+        if existing_kw:
+            # We have also keywords in headers, we merge both.
+            quoted_kw = f"{existing_kw},{quoted_kw}"
+        headers["keywords"] = quoted_kw
+
+    def send_message_trigger(
+        self, client, mess_data, pre_xml_treatments, post_xml_treatments
+    ) -> Literal[True]:
+        """Process the XEP-0131 related data to be sent.
+
+        A callback adding the <headers> element to the message is appended to
+        ``post_xml_treatments``.
+
+        @return: Always True.
+        """
+
+        def add_headers(mess_data: MessageData) -> MessageData:
+            """Add the <headers> element to the message if extra has headers."""
+            extra = mess_data["extra"]
+            self.move_keywords_to_headers(extra)
+            # Now we parse headers, if any.
+            if "headers" in extra:
+                headers_data = HeadersData(**extra["headers"])
+                message_elt = mess_data["xml"]
+                message_elt.addChild(headers_data.to_element())
+            return mess_data
+
+        post_xml_treatments.addCallback(add_headers)
+        return True
+
+    def message_received_trigger(
+        self,
+        client: SatXMPPEntity,
+        message_elt: domish.Element,
+        post_treat: defer.Deferred,
+    ) -> Literal[True]:
+        """Parse headers information and add them to message data.
+
+        Headers which can't be parsed are logged and ignored, the message itself is
+        still handled.
+
+        @return: Always True.
+        """
+        try:
+            headers = HeadersData.from_element(message_elt)
+        except exceptions.NotFound:
+            # No <headers> element in this message, nothing to do.
+            pass
+        except (TypeError, ValueError) as e:
+            # Headers are set by the sending entity, and may be invalid (e.g. unknown
+            # "urgency" value, which raises a pydantic ValidationError, a ValueError
+            # subclass, or a <header> element without "name" attribute). This must not
+            # escape the trigger, thus we ignore the headers.
+            log.warning(f"Ignoring invalid SHIM headers: {e}")
+        else:
+
+            def post_treat_headers(mess_data: MessageData):
+                """Add the headers metadata to the message data"""
+                if headers.keywords:
+                    # We move keywords to a list of string in extra's "keywords" field.
+                    mess_data["extra"]["keywords"] = self.unquote_values(headers.keywords)
+                    headers.keywords = None
+                mess_data["extra"]["headers"] = headers.model_dump(
+                    mode="json", exclude_none=True
+                )
+                return mess_data
+
+            post_treat.addCallback(post_treat_headers)
+        return True
+
+    def get_handler(self, client):
+        """Return a new ``XEP_0131_handler`` instance."""
+        return XEP_0131_handler()
+
+
+@implementer(disco.IDisco)
+class XEP_0131_handler(XMPPHandler):
+    """Disco handler advertising the SHIM namespace as a feature."""
+
+    def getDiscoInfo(
+        self, requestor, target, nodeIdentifier: Optional[str] = ""
+    ) -> List[disco.DiscoFeature]:
+        """Return the SHIM feature, whatever the requestor, target or node."""
+        shim_feature = disco.DiscoFeature(NS_SHIM)
+        return [shim_feature]
+
+    def getDiscoItems(
+        self, requestor, target, nodeIdentifier: Optional[str] = ""
+    ) -> List[disco.DiscoItem]:
+        """Return an empty list: this handler exposes no disco item."""
+        no_items: List[disco.DiscoItem] = []
+        return no_items