Mercurial > libervia-backend
diff libervia/backend/plugins/plugin_xep_0131.py @ 4314:6a70fcd93a7a
plugin XEP-0131: Stanza Headers and Internet Metadata implementation:
- SHIM is now supported and put in `msg_data["extra"]["headers"]`.
- `Keywords` are converted from and to list of string in `msg_data["extra"]["keywords"]`
field (if present in headers on message sending, values are merged).
- Python minimal version upgraded to 3.11 due to use of `StrEnum`.
rel 451
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 28 Sep 2024 15:56:04 +0200 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/backend/plugins/plugin_xep_0131.py Sat Sep 28 15:56:04 2024 +0200 @@ -0,0 +1,273 @@ +#!/usr/bin/env python3 + +# Libervia plugin Stanza Headers and Internet Metadata (XEP-0131) +# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from email.utils import quote, unquote +from enum import StrEnum, auto +import re +from typing import Iterator, List, Literal, Optional, Self + +from pydantic import BaseModel, ConfigDict, Field, RootModel +from twisted.internet import defer +from twisted.words.protocols.jabber.xmlstream import XMPPHandler +from twisted.words.xish import domish +from wokkel import disco +from zope.interface import implementer + +from libervia.backend.core import exceptions +from libervia.backend.core.constants import Const as C +from libervia.backend.core.core_types import SatXMPPEntity +from libervia.backend.core.i18n import _ +from libervia.backend.core.log import getLogger +from libervia.backend.models.core import MessageData + +log = getLogger(__name__) + +PLUGIN_INFO = { + C.PI_NAME: "Stanza Headers and Internet Metadata Plugin", + C.PI_IMPORT_NAME: "XEP-0131", + C.PI_TYPE: "XEP", + C.PI_MODES: C.PLUG_MODE_BOTH, + C.PI_PROTOCOLS: ["XEP-0131"], + C.PI_DEPENDENCIES: [], + C.PI_MAIN: "XEP_0131", + C.PI_HANDLER: "yes", + C.PI_DESCRIPTION: _( + "Enables the inclusion of non-addressing header information in XMPP stanzas." + ), +} + +NS_SHIM = "http://jabber.org/protocol/shim" + +# Regex to match quoted and non-quoted values. +RE_QUOTED_VALUES = re.compile( + r""" + # Match quoted phrases + " + (?: + # Match any escaped character + \\. + # Match any character that is not a double quote or a backslash + |[^"\\] + )* + " + | + # Match non-quoted phrases + (?:[^,]+) +""", + re.VERBOSE, +) + + +class Urgency(StrEnum): + low = auto() + medium = auto() + high = auto() + + +class Priority(StrEnum): + non_urgent = "non-urgent" + normal = auto() + urgent = auto() + emergency = auto() + + +class HeadersData(BaseModel): + model_config = ConfigDict(extra="allow") + + __pydantic_extra__: dict[str, str] = Field(init=False) # type: ignore + + keywords: str | None = None + urgency: Urgency | None = None + priority: Priority | None = None + + def items(self): + return self.__pydantic_extra__.items() + + def to_element(self) -> domish.Element: + """Build the <headers> element from this instance's data.""" + headers_elt = domish.Element((NS_SHIM, "headers")) + header_names = list(self.model_fields.keys()) + list( + self.__pydantic_extra__.keys() + ) + for name in header_names: + value = getattr(self, name) + if value is None: + continue + header_elt = headers_elt.addElement("header") + header_elt["name"] = name + header_elt.addContent(value) + return headers_elt + + @classmethod + def from_element(cls, headers_elt: domish.Element) -> Self: + """Create a HeadersData instance from a <headers> element.""" + if headers_elt.uri != NS_SHIM or headers_elt.name != "headers": + child_headers_elt = next(headers_elt.elements(NS_SHIM, "headers"), None) + if child_headers_elt is None: + raise exceptions.NotFound("<headers> element not found") + else: + headers_elt = child_headers_elt + + headers = {} + for header_elt in headers_elt.elements(NS_SHIM, "header"): + name = header_elt.getAttribute("name") + value = str(header_elt) + headers[name] = value + return cls(**headers) + + +class Keywords(RootModel): + root: list[str] + + def __iter__(self) -> Iterator[str]: # type: ignore + return iter(self.root) + + def __getitem__(self, item) -> str: + return self.root[item] + + def __len__(self) -> int: + return len(self.root) + + +class XEP_0131: + """Implementation for XEP-0131""" + + def __init__(self, host): + log.info(_("Stanza Headers and Internet Metadata plugin initialization")) + self.host = host + host.register_namespace("shim", NS_SHIM) + host.trigger.add("sendMessage", self.send_message_trigger) + host.trigger.add("sendMessageComponent", self.send_message_trigger) + host.trigger.add("message_received", self.message_received_trigger) + + def quote_value(self, value: str) -> str: + """Quote a value if it contain special characters + + @param value: Value to quote if necessary. + @return: Quoted value. + """ + if any(c in value for c in r" ,\""): + value = f'"{quote(value)}"' + return value + + def unquote_values(self, raw_header: str) -> list[str]: + """Unquote raw list of values header. + + This is raw header for potentially quoted values separated by commas, like in the + "keywords" header. + + @param raw_keywords_header: Raw Keywords header. + @return: A list of unquoted strings. + """ + unquoted_values = [] + + for match in RE_QUOTED_VALUES.finditer(raw_header): + value = match.group(0).strip() + + # Unquote the keyword if needed. + if value.startswith('"') and value.endswith('"'): + value = unquote(value) + + value = value.strip() + if value: + unquoted_values.append(value) + + return unquoted_values + + def move_keywords_to_headers(self, extra: dict) -> None: + """Check if keywords are present in extra, and move them to headers. + + The list of keywords will be converted to a header value and set in the right + location. + + @param extra: MessageData's ``extra`` field. Will be modified in place by + creating/updating the ``headers`` field. + """ + # Keywords can be in a list of strings in extra's "keywords" field. + if "keywords" in extra: + keywords = Keywords(extra["keywords"]) + if keywords: + headers = extra.setdefault("headers", {}) + quoted_kw = ",".join(self.quote_value(kw) for kw in keywords) + existing_kw = headers.get("keywords") + if existing_kw: + # We have also keywords in headers, we merge both. + quoted_kw = f"{existing_kw},{quoted_kw}" + headers["keywords"] = quoted_kw + + def send_message_trigger( + self, client, mess_data, pre_xml_treatments, post_xml_treatments + ) -> Literal[True]: + """Process the XEP-0131 related data to be sent""" + + def add_headers(mess_data: MessageData) -> MessageData: + extra = mess_data["extra"] + self.move_keywords_to_headers(extra) + # Now we parse headers, if any. + if "headers" in extra: + headers_data = HeadersData(**extra["headers"]) + message_elt = mess_data["xml"] + message_elt.addChild(headers_data.to_element()) + return mess_data + + post_xml_treatments.addCallback(add_headers) + return True + + def message_received_trigger( + self, + client: SatXMPPEntity, + message_elt: domish.Element, + post_treat: defer.Deferred, + ) -> Literal[True]: + """Parse headers information and add them to message data.""" + try: + headers = HeadersData.from_element(message_elt) + except exceptions.NotFound: + pass + else: + + def post_treat_addr(mess_data: MessageData): + """Add the headers metadata to the message data""" + if headers.keywords: + # We move keywords to a list of string in extra's "keywords" field. + mess_data["extra"]["keywords"] = self.unquote_values(headers.keywords) + headers.keywords = None + mess_data["extra"]["headers"] = headers.model_dump( + mode="json", exclude_none=True + ) + return mess_data + + post_treat.addCallback(post_treat_addr) + return True + + def get_handler(self, client): + return XEP_0131_handler() + + +@implementer(disco.IDisco) +class XEP_0131_handler(XMPPHandler): + + def getDiscoInfo( + self, requestor, target, nodeIdentifier: Optional[str] = "" + ) -> List[disco.DiscoFeature]: + return [disco.DiscoFeature(NS_SHIM)] + + def getDiscoItems( + self, requestor, target, nodeIdentifier: Optional[str] = "" + ) -> List[disco.DiscoItem]: + return []