view libervia/backend/plugins/plugin_comp_email_gateway/cleaning.py @ 4401:ae26233b655f default tip

doc (components): Add message cleaning section to email gateway doc: fix 464
author Goffi <goffi@goffi.org>
date Thu, 11 Sep 2025 21:17:51 +0200
parents fe09446a09ce
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia Email Gateway Component
# Copyright (C) 2009-2025 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from html import escape
import re

from lxml import etree, html


class TextToHtmlConverter:
    """Convert plain text to semantic HTML with proper quote and list handling."""

    def __init__(self) -> None:
        """Initialize converter with patterns."""
        self.ordered_list_pattern = re.compile(r"^\s*(\d+[\.\)])\s+(.+)$")
        self.unordered_list_pattern = re.compile(r"^\s*([*\-\+])\s+(.+)$")

    def text_to_html(self, text: str) -> str:
        """Convert plain text to HTML.

        Plain text with formatting often seen in emails is converted to HTML to make
        text-only email consistent with HTML ones.

        @param text: Plain text to convert.
        @return: HTML version.
        """
        lines = text.split("\n")
        html_parts = []
        current_paragraph_lines = []
        in_ordered_list = False
        in_unordered_list = False
        ordered_list_items = []
        unordered_list_items = []

        i = 0
        while i < len(lines):
            line = lines[i]

            # Match line type and handle accordingly.
            line_type = self._classify_line(line)

            # If we're in a list and encounter a non-list item, close the list.
            if in_ordered_list and line_type != "ordered_list":
                html_parts.append(self._create_ordered_list(ordered_list_items))
                ordered_list_items.clear()
                in_ordered_list = False

            if in_unordered_list and line_type != "unordered_list":
                html_parts.append(self._create_unordered_list(unordered_list_items))
                unordered_list_items.clear()
                in_unordered_list = False

            match line_type:
                case "empty":
                    if current_paragraph_lines:
                        paragraph_content = "<br />".join(
                            escape(line) for line in current_paragraph_lines
                        )
                        html_parts.append(f"<p>{paragraph_content}</p>")
                        current_paragraph_lines = []

                    # Handle consecutive empty lines.
                    if not html_parts or html_parts[-1] != "<br />":
                        html_parts.append("<br />")

                case "blockquote":
                    if current_paragraph_lines:
                        paragraph_content = "<br />".join(
                            escape(line) for line in current_paragraph_lines
                        )
                        html_parts.append(f"<p>{paragraph_content}</p>")
                        current_paragraph_lines = []

                    quoted_lines, next_index = self._collect_quoted_lines(lines, i)
                    html_parts.append(self._create_blockquote_html(quoted_lines))
                    # Adjust for loop increment.
                    i = next_index - 1

                case "ordered_list":
                    if current_paragraph_lines:
                        paragraph_content = "<br />".join(
                            escape(line) for line in current_paragraph_lines
                        )
                        html_parts.append(f"<p>{paragraph_content}</p>")
                        current_paragraph_lines = []

                    match = self.ordered_list_pattern.match(line)
                    if match:
                        ordered_list_items.append(match.group(2))
                        in_ordered_list = True

                case "unordered_list":
                    if current_paragraph_lines:
                        paragraph_content = "<br />".join(
                            escape(line) for line in current_paragraph_lines
                        )
                        html_parts.append(f"<p>{paragraph_content}</p>")
                        current_paragraph_lines = []

                    match = self.unordered_list_pattern.match(line)
                    if match:
                        unordered_list_items.append(match.group(2))
                        in_unordered_list = True

                case "regular":
                    if in_ordered_list:
                        html_parts.append(self._create_ordered_list(ordered_list_items))
                        ordered_list_items = []
                        in_ordered_list = False

                    if in_unordered_list:
                        html_parts.append(
                            self._create_unordered_list(unordered_list_items)
                        )
                        unordered_list_items = []
                        in_unordered_list = False

                    current_paragraph_lines.append(line)

            i += 1

        # We now handle remaining paragraphs and lists.
        if current_paragraph_lines:
            paragraph_content = "<br />".join(
                escape(line) for line in current_paragraph_lines
            )
            html_parts.append(f"<p>{paragraph_content}</p>")

        if in_ordered_list:
            html_parts.append(self._create_ordered_list(ordered_list_items))

        if in_unordered_list:
            html_parts.append(self._create_unordered_list(unordered_list_items))

        # Remove trailing <br /> tags.
        while html_parts and html_parts[-1] == "<br />":
            html_parts.pop()

        return "\n".join(html_parts)

    def _classify_line(self, line: str) -> str:
        """Classify a line type for processing.

        @param line: Line to classify.
        @return: Line type classification.
        """
        stripped = line.strip()

        if not stripped:
            return "empty"

        if line.lstrip().startswith(">"):
            return "blockquote"

        if self.ordered_list_pattern.match(line):
            return "ordered_list"

        if self.unordered_list_pattern.match(line):
            return "unordered_list"

        return "regular"

    def _collect_quoted_lines(
        self, lines: list[str], start_index: int
    ) -> tuple[list[str], int]:
        """Collect consecutive quoted lines.

        @param lines: All lines.
        @param start_index: Starting index for collection.
        @return: Tuple of (quoted_lines, next_index).
        """
        quoted_lines = []
        i = start_index

        while i < len(lines) and lines[i].lstrip().startswith(">"):
            quoted_lines.append(lines[i])
            i += 1

        return quoted_lines, i

    def _create_blockquote_html(self, quoted_lines: list[str]) -> str:
        """Create properly nested blockquote HTML.

        @param quoted_lines: Lines to convert to blockquotes.
        @return: HTML with nested blockquote elements.
        """
        if not quoted_lines:
            return ""

        # Parse lines to determine nesting structure.
        parsed_lines = []
        for line in quoted_lines:
            level = 0
            content = line.lstrip()

            # Count and remove quote markers.
            while content.startswith(">"):
                level += 1
                # Remove first '>'.
                content = content[1:]
                content = content.lstrip()

            parsed_lines.append((level, content))

        return self._build_nested_blockquotes(parsed_lines)

    def _build_nested_blockquotes(self, parsed_lines: list[tuple[int, str]]) -> str:
        """Build properly nested blockquote elements.

        @param parsed_lines: List of (level, content) tuples.
        @return: Nested blockquote HTML.
        """
        if not parsed_lines:
            return ""

        html_parts = []
        current_level = 0

        for level, content in parsed_lines:
            # Close blockquotes if we're going to a lower level.
            while current_level > level:
                html_parts.append("</blockquote>")
                current_level -= 1

            # Open new blockquotes if we're going to a higher level.
            while current_level < level:
                html_parts.append("<blockquote>")
                current_level += 1

            # Add the content as a paragraph if it's not empty.
            if content.strip():
                # Handle line breaks within quote content.
                escaped_content = escape(content)
                html_parts.append(f"<p>{escaped_content}</p>")
            else:
                html_parts.append("<br />")

        # Close remaining blockquotes.
        while current_level > 0:
            html_parts.append("</blockquote>")
            current_level -= 1

        return "".join(html_parts)

    def _create_ordered_list(self, items: list[str]) -> str:
        """Create an ordered list from items."""
        if not items:
            return ""
        list_items = "\n".join(f"<li>{escape(item)}</li>" for item in items)
        return f"<ol>\n{list_items}\n</ol>"

    def _create_unordered_list(self, items: list[str]) -> str:
        """Create an unordered list from items."""
        if not items:
            return ""
        list_items = "\n".join(f"<li>{escape(item)}</li>" for item in items)
        return f"<ul>\n{list_items}\n</ul>"


class MailingListHtmlCleaner:
    """Clean mailing list content by converting to HTML and adding semantic classes."""

    def __init__(self) -> None:
        """Initialize the cleaner."""
        self.converter = TextToHtmlConverter()

    def clean_message(self, text: str, is_html: bool = False) -> str:
        """
        Convert text to HTML (if needed) and clean it with noise classification.

        @param text: The message content (text or HTML).
        @param is_html: Whether the input is already HTML.
        @return: Cleaned HTML with semantic noise classification.
        """
        if is_html:
            html_content = text
        else:
            html_content = self.converter.text_to_html(text)

        return self.clean_html(html_content)

    def clean_html(self, html_content: str) -> str:
        """Clean HTML by adding noise classification classes.

        @param html: HTML to clean.
        @return: HTML with noise classification classes added.
        """
        # Parse the HTML with lxml.
        try:
            # Try to parse as XHTML first.
            doc = etree.fromstring(f"<div>{html_content}</div>")
        except etree.XMLSyntaxError:
            # If that fails, parse as HTML and convert to XHTML.
            doc = html.fromstring(html_content)
            # Convert to XHTML string and re-parse.
            xhtml = html.tostring(doc, encoding="unicode", method="xml")
            doc = etree.fromstring(f"<div>{xhtml}</div>")

        # Detect and mark noise elements.
        self._detect_and_mark_noise(doc)

        # Convert back to string.
        result = etree.tostring(doc, encoding="unicode", method="xml")
        # Remove the wrapper div.
        if result.startswith("<div>") and result.endswith("</div>"):
            result = result[5:-6]
        return result

    def _detect_and_mark_noise(self, doc: etree.Element) -> None:
        """Detect noise elements in the document and add appropriate classes.

        @param doc: Parsed HTML document.
        """
        # Detect long blockquotes at top or bottom. Those are often the result of user.
        # pressing the "reply" button of their client, and not removing the old message.
        self._detect_long_blockquotes(doc)

        # Detect "On XXX YYY wrote" patterns (reply context).
        self._detect_reply_context(doc)

        # Detect signatures.
        self._detect_signatures(doc)

    def _detect_long_blockquotes(self, doc: etree.Element) -> None:
        """Detect long blockquotes at beginning or end of message."""
        blockquotes = doc.xpath("//blockquote")

        if not blockquotes:
            return

        # Check if first element is a blockquote.
        first_element = None
        for child in doc:
            if child.tag in ["p", "blockquote", "div"]:
                first_element = child
                break

        # Check if last element is a blockquote.
        last_element = None
        for child in reversed(list(doc)):
            if child.tag in ["p", "blockquote", "div"]:
                last_element = child
                break

        for blockquote in blockquotes:
            # Count the text content length.
            text_content = "".join(blockquote.itertext())
            # Only mark as noise if it's a long blockquote at the beginning or end.
            if len(text_content) > 500 and (
                blockquote is first_element or blockquote is last_element
            ):
                self._add_class(blockquote, "noise-old-quote")

    def _detect_reply_context(self, doc: etree.Element) -> None:
        """Detect reply context patterns like "On XXX YYY wrote"."""
        # Look for paragraphs that might contain reply context.
        paragraphs = doc.xpath("//p")
        reply_pattern = re.compile(
            r"On\s+.+?\s+(?:wrote|said|ecrit|a écrit).*?:\s*$", re.IGNORECASE
        )

        # Check if first element is a paragraph with reply context.
        first_element = None
        for child in doc:
            if child.tag in ["p", "blockquote", "div"]:
                first_element = child
                break

        # Check if last element is a paragraph with reply context.
        last_element = None
        for child in reversed(list(doc)):
            if child.tag in ["p", "blockquote", "div"]:
                last_element = child
                break

        for p in paragraphs:
            text = "".join(p.itertext()).strip()
            if reply_pattern.search(text):
                # Only mark as noise if it's at the beginning or end of the message.
                if p is first_element or p is last_element:
                    self._add_class(p, "noise-reply-quote")
                # Also check for next sibling blockquote.
                next_sibling = p.getnext()
                if next_sibling is not None and next_sibling.tag == "blockquote":
                    self._add_class(next_sibling, "noise-reply-quote")

    def _detect_signatures(self, doc: etree.Element) -> None:
        """Detect mailing list signatures."""
        # Only consider elements at the end of the document as potential signatures.
        # Find the last few elements that could be signatures.
        potential_signature_elements = []
        for child in reversed(list(doc)):
            if child.tag in ["p", "div"]:
                potential_signature_elements.append(child)
                # Check last 8 elements max.
                if len(potential_signature_elements) >= 8:
                    break

        # Put them back in order.
        potential_signature_elements.reverse()

        # More specific pattern for mailing list signatures (unsubscribe links, etc.).
        mailing_list_signature_pattern = re.compile(
            r"(?:^--\s*$)"
            r"|(?:^__+\s*$)"
            r"|(?:^==+\s*$)"
            r"|(?:.*(?:"
            r"unsubscribe"
            r"|subscribe"
            r"|list info"
            r"|mailing list"
            r"|archives?"
            r"|digest"
            # We must have an URL or an email after one of the previous keywords
            r").*(?:https?:|@).+\..+" r")",
            re.IGNORECASE,
        )

        for element in potential_signature_elements:
            text = " ".join(element.itertext()).strip()

            # Check for mailing list signature patterns.
            if mailing_list_signature_pattern.search(text):
                self._add_class(element, "noise-signature")

    def _add_class(self, element: etree.Element, class_name: str) -> None:
        """Add a CSS class to an element."""
        current_class = element.get("class", "")
        if current_class:
            if class_name not in current_class:
                element.set("class", f"{current_class} noise {class_name}")
        else:
            element.set("class", f"noise {class_name}")


def convert_to_html_and_detect_noise(text: str, is_html: bool = False) -> str:
    """Return ``text`` as HTML, with "noise" elements flagged by CSS classes.

    "Noise" designates the parts of a message which get in the way of reading it, such
    as an old message left quoted by the mail client, or the generic signature added
    by a mailing list.

    Each element detected as noise gets the "noise" class, plus a "noise-*" class
    telling which kind of noise was detected (old quote, signature).

    @param text: Text or HTML content to clean.
    @param is_html: True if the content is HTML, False for plain text.
    @return: Cleaned HTML with semantic noise classification.
    """
    return MailingListHtmlCleaner().clean_message(text, is_html)