Mercurial > libervia-backend
view libervia/backend/plugins/plugin_xep_0131.py @ 4337:95792a1f26c7
component email gateway: attachments handling:
attachments are now stored, and metadata are created in database.
rel 453
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 03 Dec 2024 00:13:23 +0100 |
parents | 6a70fcd93a7a |
children |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia plugin Stanza Headers and Internet Metadata (XEP-0131) # Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from email.utils import quote, unquote from enum import StrEnum, auto import re from typing import Iterator, List, Literal, Optional, Self from pydantic import BaseModel, ConfigDict, Field, RootModel from twisted.internet import defer from twisted.words.protocols.jabber.xmlstream import XMPPHandler from twisted.words.xish import domish from wokkel import disco from zope.interface import implementer from libervia.backend.core import exceptions from libervia.backend.core.constants import Const as C from libervia.backend.core.core_types import SatXMPPEntity from libervia.backend.core.i18n import _ from libervia.backend.core.log import getLogger from libervia.backend.models.core import MessageData log = getLogger(__name__) PLUGIN_INFO = { C.PI_NAME: "Stanza Headers and Internet Metadata Plugin", C.PI_IMPORT_NAME: "XEP-0131", C.PI_TYPE: "XEP", C.PI_MODES: C.PLUG_MODE_BOTH, C.PI_PROTOCOLS: ["XEP-0131"], C.PI_DEPENDENCIES: [], C.PI_MAIN: "XEP_0131", C.PI_HANDLER: "yes", C.PI_DESCRIPTION: _( "Enables the inclusion of non-addressing header information in XMPP stanzas." ), } NS_SHIM = "http://jabber.org/protocol/shim" # Regex to match quoted and non-quoted values. 
# Each match is one comma-separated value. A quoted phrase may be preceded by
# whitespace and may contain commas and backslash-escaped characters.
RE_QUOTED_VALUES = re.compile(
    r"""
    # Match quoted phrases. Leading whitespace is consumed here so that a quoted
    # phrase following a comma and a space is not mistaken for a non-quoted one
    # and split on the commas it contains.
    \s*
    "
    (?:
        # Match any escaped character
        \\.
        # Match any character that is not a double quote or a backslash
        |[^"\\]
    )*
    "
    |
    # Match non-quoted phrases
    (?:[^,]+)
    """,
    re.VERBOSE,
)


class Urgency(StrEnum):
    """Values accepted for the ``urgency`` header."""

    low = auto()
    medium = auto()
    high = auto()


class Priority(StrEnum):
    """Values accepted for the ``priority`` header."""

    non_urgent = "non-urgent"
    normal = auto()
    urgent = auto()
    emergency = auto()


class HeadersData(BaseModel):
    """Headers carried by a SHIM ``<headers>`` element.

    ``keywords``, ``urgency`` and ``priority`` are declared fields. Any other
    header is accepted and kept as an extra field (``extra="allow"``).
    """

    model_config = ConfigDict(extra="allow")

    __pydantic_extra__: dict[str, str] = Field(init=False)  # type: ignore

    keywords: str | None = None
    urgency: Urgency | None = None
    priority: Priority | None = None

    def items(self):
        """Return the (name, value) pairs of the extra (non-declared) headers."""
        return self.__pydantic_extra__.items()

    def to_element(self) -> domish.Element:
        """Build the <headers> element from this instance's data.

        Declared fields which are ``None`` are skipped.

        @return: ``<headers>`` element with one ``<header>`` child per header.
        """
        headers_elt = domish.Element((NS_SHIM, "headers"))
        # ``model_fields`` is read from the class: instance access is deprecated in
        # recent Pydantic versions.
        headers = [(name, getattr(self, name)) for name in type(self).model_fields]
        # Extra headers are read from the extras mapping rather than with
        # ``getattr``: a header whose name matches a class attribute or a method
        # (e.g. "items") would otherwise yield that attribute instead of its value.
        headers.extend(self.__pydantic_extra__.items())
        for name, value in headers:
            if value is None:
                continue
            header_elt = headers_elt.addElement("header")
            header_elt["name"] = name
            header_elt.addContent(value)
        return headers_elt

    @classmethod
    def from_element(cls, headers_elt: domish.Element) -> Self:
        """Create a HeadersData instance from a <headers> element.

        @param headers_elt: The ``<headers>`` element itself, or an element having
            a ``<headers>`` element as direct child.
        @return: Parsed headers. If a name appears several times, the last value
            is kept.
        @raise exceptions.NotFound: No ``<headers>`` element has been found.
        """
        if headers_elt.uri != NS_SHIM or headers_elt.name != "headers":
            child_headers_elt = next(headers_elt.elements(NS_SHIM, "headers"), None)
            if child_headers_elt is None:
                raise exceptions.NotFound("<headers> element not found")
            headers_elt = child_headers_elt

        headers = {}
        for header_elt in headers_elt.elements(NS_SHIM, "header"):
            name = header_elt.getAttribute("name")
            if not name:
                # A missing name would end up as a ``None`` keyword below and raise
                # a TypeError, so the malformed header is skipped instead.
                log.warning(f"Ignoring <header> without name: {header_elt.toXml()}")
                continue
            headers[name] = str(header_elt)
        return cls(**headers)


class Keywords(RootModel):
    """List of keywords, usable like a read-only sequence of strings."""

    root: list[str]

    def __iter__(self) -> Iterator[str]:  # type: ignore
        return iter(self.root)

    def __getitem__(self, item) -> str:
        return self.root[item]

    def __len__(self) -> int:
        return len(self.root)


class XEP_0131:
    """Implementation for XEP-0131"""

    def __init__(self, host):
        log.info(_("Stanza Headers and Internet Metadata plugin initialization"))
        self.host = host
        host.register_namespace("shim", NS_SHIM)
        host.trigger.add("sendMessage", self.send_message_trigger)
        host.trigger.add("sendMessageComponent", self.send_message_trigger)
        host.trigger.add("message_received", self.message_received_trigger)

    def quote_value(self, value: str) -> str:
        """Quote a value if it contains special characters.

        Special characters are space, comma, backslash and double quote.

        @param value: Value to quote if necessary.
        @return: Quoted value, or the value unchanged if no quoting is needed.
        """
        if any(c in value for c in r" ,\""):
            value = f'"{quote(value)}"'
        return value

    def unquote_values(self, raw_header: str) -> list[str]:
        """Unquote raw list of values header.

        This is raw header for potentially quoted values separated by commas, like in
        the "keywords" header.

        @param raw_header: Raw header value.
        @return: A list of unquoted strings. Empty values are dropped.
        """
        unquoted_values = []
        for match in RE_QUOTED_VALUES.finditer(raw_header):
            value = match.group(0).strip()

            # Unquote the value if needed.
            if value.startswith('"') and value.endswith('"'):
                value = unquote(value)

            value = value.strip()
            if value:
                unquoted_values.append(value)

        return unquoted_values

    def move_keywords_to_headers(self, extra: dict) -> None:
        """Check if keywords are present in extra, and add them to headers.

        The list of keywords will be converted to a header value and set in the right
        location.

        @param extra: MessageData's ``extra`` field. Will be modified in place by
            creating/updating the ``headers`` field. The ``keywords`` field itself
            is left untouched.
        """
        # Keywords can be in a list of strings in extra's "keywords" field.
        if "keywords" in extra:
            keywords = Keywords(extra["keywords"])
            if keywords:
                headers = extra.setdefault("headers", {})
                quoted_kw = ",".join(self.quote_value(kw) for kw in keywords)
                existing_kw = headers.get("keywords")
                if existing_kw:
                    # We have also keywords in headers, we merge both.
                    quoted_kw = f"{existing_kw},{quoted_kw}"
                headers["keywords"] = quoted_kw

    def send_message_trigger(
        self, client, mess_data, pre_xml_treatments, post_xml_treatments
    ) -> Literal[True]:
        """Process the XEP-0131 related data to be sent.

        A callback is added to ``post_xml_treatments``; it appends a ``<headers>``
        element to the message when ``extra`` contains headers or keywords.

        @return: Always True.
        """

        def add_headers(mess_data: MessageData) -> MessageData:
            extra = mess_data["extra"]
            self.move_keywords_to_headers(extra)
            # Now we parse headers, if any.
            if "headers" in extra:
                headers_data = HeadersData(**extra["headers"])
                message_elt = mess_data["xml"]
                message_elt.addChild(headers_data.to_element())
            return mess_data

        post_xml_treatments.addCallback(add_headers)
        return True

    def message_received_trigger(
        self,
        client: SatXMPPEntity,
        message_elt: domish.Element,
        post_treat: defer.Deferred,
    ) -> Literal[True]:
        """Parse headers information and add them to message data.

        Headers which are missing or which fail validation are ignored, so that
        invalid headers sent by a remote entity do not prevent the message itself
        from being processed.

        @param message_elt: Received ``<message>`` element.
        @param post_treat: Deferred to which the callback filling ``extra`` is added.
        @return: Always True.
        """
        # Function-scope import: the name is only needed here, and pydantic is
        # already a dependency of this module.
        from pydantic import ValidationError

        try:
            headers = HeadersData.from_element(message_elt)
        except exceptions.NotFound:
            # No <headers> element: nothing to add to the message data.
            return True
        except ValidationError as e:
            # E.g. an "urgency" or "priority" value outside of the accepted ones.
            log.warning(f"Ignoring invalid stanza headers: {e}")
            return True

        def add_headers_to_data(mess_data: MessageData) -> MessageData:
            """Add the headers metadata to the message data"""
            if headers.keywords:
                # We move keywords to a list of string in extra's "keywords" field.
                mess_data["extra"]["keywords"] = self.unquote_values(headers.keywords)
                headers.keywords = None
            mess_data["extra"]["headers"] = headers.model_dump(
                mode="json", exclude_none=True
            )
            return mess_data

        post_treat.addCallback(add_headers_to_data)
        return True

    def get_handler(self, client):
        """Return a new ``XEP_0131_handler`` instance."""
        return XEP_0131_handler()


@implementer(disco.IDisco)
class XEP_0131_handler(XMPPHandler):
    """Service discovery handler advertising the SHIM namespace."""

    def getDiscoInfo(
        self, requestor, target, nodeIdentifier: Optional[str] = ""
    ) -> List[disco.DiscoFeature]:
        """Return the SHIM namespace as the only disco feature."""
        return [disco.DiscoFeature(NS_SHIM)]

    def getDiscoItems(
        self, requestor, target, nodeIdentifier: Optional[str] = ""
    ) -> List[disco.DiscoItem]:
        """Return no disco items."""
        return []