changeset 4399:fe09446a09ce

component email message: Mailing list messages cleaning: For mailing-list messages: - convert all text-only messages to XHTML for consistency. - detect and flag "noisy" content, i.e. content which is not useful and is annoying in a forum-like view (full-length quoted previous messages, mailing-list signatures) rel 464
author Goffi <goffi@goffi.org>
date Thu, 11 Sep 2025 21:17:47 +0200
parents 7ef21e3e5ac9
children b591c7dff8ab
files libervia/backend/plugins/plugin_comp_email_gateway/__init__.py libervia/backend/plugins/plugin_comp_email_gateway/cleaning.py libervia/backend/plugins/plugin_misc_text_syntaxes.py
diffstat 3 files changed, 482 insertions(+), 8 deletions(-)
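
A quick, hedged illustration of the first point of the commit message (text-only bodies converted to XHTML), using the public helper added in cleaning.py below. The sample body and the expected markup are illustrative; only the module path comes from this changeset:

# Illustrative sketch: exercises the helper added in cleaning.py (see the new file below).
from libervia.backend.plugins.plugin_comp_email_gateway.cleaning import (
    convert_to_html_and_detect_noise,
)

text_body = "Hello,\n\nPlease review:\n- item one\n- item two"
print(convert_to_html_and_detect_noise(text_body))
# Expected (roughly): <p>Hello,</p> <br /> <p>Please review:</p>
# followed by <ul><li>item one</li><li>item two</li></ul>
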
--- a/libervia/backend/plugins/plugin_comp_email_gateway/__init__.py	Thu Sep 11 21:10:35 2025 +0200
+++ b/libervia/backend/plugins/plugin_comp_email_gateway/__init__.py	Thu Sep 11 21:17:47 2025 +0200
@@ -76,6 +76,7 @@
 from libervia.backend.tools.common import date_utils, regex, uri
 from libervia.backend.tools.utils import aio
 
+from .cleaning import convert_to_html_and_detect_noise
 from .imap import IMAPClientFactory
 from .models import Credentials, UserData
 
@@ -1067,8 +1068,13 @@
                 elif content_type == 'text/html':
                     content_xhtml = content_text
 
-        if content_xhtml is not None:
-            content_xhtml = self._syntax.clean_xhtml(content_xhtml)
+        if content_xhtml is None:
+            assert content is not None
+            content_xhtml = convert_to_html_and_detect_noise(content)
+        else:
+            content_xhtml = convert_to_html_and_detect_noise(content_xhtml, is_html=True)
+
+        content_xhtml = self._syntax.clean_xhtml(content_xhtml)
 
         return MbData(
             service=service,
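
For readability, the dispatch introduced in the hunk above can be summarised outside the component as the sketch below. Only convert_to_html_and_detect_noise comes from the new module; the wrapper name and signature are illustrative, and the real code additionally passes the result through self._syntax.clean_xhtml as shown above:

# Minimal sketch of the new routing, assuming the same content/content_xhtml values
# as in the hunk above; the wrapper name `to_noise_marked_xhtml` is illustrative.
from libervia.backend.plugins.plugin_comp_email_gateway.cleaning import (
    convert_to_html_and_detect_noise,
)


def to_noise_marked_xhtml(content: str | None, content_xhtml: str | None) -> str:
    """Route plain-text or HTML bodies through the noise-marking converter."""
    if content_xhtml is None:
        # Text-only message: convert it to HTML while marking noise.
        assert content is not None
        return convert_to_html_and_detect_noise(content)
    # HTML message: keep the markup, only mark noise.
    return convert_to_html_and_detect_noise(content_xhtml, is_html=True)
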
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/plugins/plugin_comp_email_gateway/cleaning.py	Thu Sep 11 21:17:47 2025 +0200
@@ -0,0 +1,465 @@
+#!/usr/bin/env python3
+
+# Libervia Email Gateway Component
+# Copyright (C) 2009-2025 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from html import escape
+import re
+
+from lxml import etree, html
+
+
+class TextToHtmlConverter:
+    """Convert plain text to semantic HTML with proper quote and list handling."""
+
+    def __init__(self) -> None:
+        """Initialize converter with patterns."""
+        self.ordered_list_pattern = re.compile(r"^\s*(\d+[\.\)])\s+(.+)$")
+        self.unordered_list_pattern = re.compile(r"^\s*([*\-\+])\s+(.+)$")
+
+    def text_to_html(self, text: str) -> str:
+        """Convert plain text to HTML.
+
+        Plain text with formatting often seen in emails is converted to HTML to make
+        text-only emails consistent with HTML ones.
+
+        @param text: Plain text to convert.
+        @return: HTML version.
+        """
+        lines = text.split("\n")
+        html_parts = []
+        current_paragraph_lines = []
+        in_ordered_list = False
+        in_unordered_list = False
+        ordered_list_items = []
+        unordered_list_items = []
+
+        i = 0
+        while i < len(lines):
+            line = lines[i]
+
+            # Match line type and handle accordingly.
+            line_type = self._classify_line(line)
+
+            # If we're in a list and encounter a non-list item, close the list.
+            if in_ordered_list and line_type != "ordered_list":
+                html_parts.append(self._create_ordered_list(ordered_list_items))
+                ordered_list_items.clear()
+                in_ordered_list = False
+
+            if in_unordered_list and line_type != "unordered_list":
+                html_parts.append(self._create_unordered_list(unordered_list_items))
+                unordered_list_items.clear()
+                in_unordered_list = False
+
+            match line_type:
+                case "empty":
+                    if current_paragraph_lines:
+                        paragraph_content = "<br />".join(
+                            escape(line) for line in current_paragraph_lines
+                        )
+                        html_parts.append(f"<p>{paragraph_content}</p>")
+                        current_paragraph_lines = []
+
+                    # Collapse consecutive empty lines into a single <br />.
+                    if not html_parts or html_parts[-1] != "<br />":
+                        html_parts.append("<br />")
+
+                case "blockquote":
+                    if current_paragraph_lines:
+                        paragraph_content = "<br />".join(
+                            escape(line) for line in current_paragraph_lines
+                        )
+                        html_parts.append(f"<p>{paragraph_content}</p>")
+                        current_paragraph_lines = []
+
+                    quoted_lines, next_index = self._collect_quoted_lines(lines, i)
+                    html_parts.append(self._create_blockquote_html(quoted_lines))
+                    # Adjust for loop increment.
+                    i = next_index - 1
+
+                case "ordered_list":
+                    if current_paragraph_lines:
+                        paragraph_content = "<br />".join(
+                            escape(line) for line in current_paragraph_lines
+                        )
+                        html_parts.append(f"<p>{paragraph_content}</p>")
+                        current_paragraph_lines = []
+
+                    match = self.ordered_list_pattern.match(line)
+                    if match:
+                        ordered_list_items.append(match.group(2))
+                        in_ordered_list = True
+
+                case "unordered_list":
+                    if current_paragraph_lines:
+                        paragraph_content = "<br />".join(
+                            escape(line) for line in current_paragraph_lines
+                        )
+                        html_parts.append(f"<p>{paragraph_content}</p>")
+                        current_paragraph_lines = []
+
+                    match = self.unordered_list_pattern.match(line)
+                    if match:
+                        unordered_list_items.append(match.group(2))
+                        in_unordered_list = True
+
+                case "regular":
+                    if in_ordered_list:
+                        html_parts.append(self._create_ordered_list(ordered_list_items))
+                        ordered_list_items = []
+                        in_ordered_list = False
+
+                    if in_unordered_list:
+                        html_parts.append(
+                            self._create_unordered_list(unordered_list_items)
+                        )
+                        unordered_list_items = []
+                        in_unordered_list = False
+
+                    current_paragraph_lines.append(line)
+
+            i += 1
+
+        # We now handle remaining paragraphs and lists.
+        if current_paragraph_lines:
+            paragraph_content = "<br />".join(
+                escape(line) for line in current_paragraph_lines
+            )
+            html_parts.append(f"<p>{paragraph_content}</p>")
+
+        if in_ordered_list:
+            html_parts.append(self._create_ordered_list(ordered_list_items))
+
+        if in_unordered_list:
+            html_parts.append(self._create_unordered_list(unordered_list_items))
+
+        # Remove trailing <br /> tags.
+        while html_parts and html_parts[-1] == "<br />":
+            html_parts.pop()
+
+        return "\n".join(html_parts)
+
+    def _classify_line(self, line: str) -> str:
+        """Classify a line type for processing.
+
+        @param line: Line to classify.
+        @return: Line type classification.
+        """
+        stripped = line.strip()
+
+        if not stripped:
+            return "empty"
+
+        if line.lstrip().startswith(">"):
+            return "blockquote"
+
+        if self.ordered_list_pattern.match(line):
+            return "ordered_list"
+
+        if self.unordered_list_pattern.match(line):
+            return "unordered_list"
+
+        return "regular"
+
+    def _collect_quoted_lines(
+        self, lines: list[str], start_index: int
+    ) -> tuple[list[str], int]:
+        """Collect consecutive quoted lines.
+
+        @param lines: All lines.
+        @param start_index: Starting index for collection.
+        @return: Tuple of (quoted_lines, next_index).
+        """
+        quoted_lines = []
+        i = start_index
+
+        while i < len(lines) and lines[i].lstrip().startswith(">"):
+            quoted_lines.append(lines[i])
+            i += 1
+
+        return quoted_lines, i
+
+    def _create_blockquote_html(self, quoted_lines: list[str]) -> str:
+        """Create properly nested blockquote HTML.
+
+        @param quoted_lines: Lines to convert to blockquotes.
+        @return: HTML with nested blockquote elements.
+        """
+        if not quoted_lines:
+            return ""
+
+        # Parse lines to determine nesting structure.
+        parsed_lines = []
+        for line in quoted_lines:
+            level = 0
+            content = line.lstrip()
+
+            # Count and remove quote markers.
+            while content.startswith(">"):
+                level += 1
+                # Remove first '>'.
+                content = content[1:]
+                content = content.lstrip()
+
+            parsed_lines.append((level, content))
+
+        return self._build_nested_blockquotes(parsed_lines)
+
+    def _build_nested_blockquotes(self, parsed_lines: list[tuple[int, str]]) -> str:
+        """Build properly nested blockquote elements.
+
+        @param parsed_lines: List of (level, content) tuples.
+        @return: Nested blockquote HTML.
+        """
+        if not parsed_lines:
+            return ""
+
+        html_parts = []
+        current_level = 0
+
+        for level, content in parsed_lines:
+            # Close blockquotes if we're going to a lower level.
+            while current_level > level:
+                html_parts.append("</blockquote>")
+                current_level -= 1
+
+            # Open new blockquotes if we're going to a higher level.
+            while current_level < level:
+                html_parts.append("<blockquote>")
+                current_level += 1
+
+            # Add the content as a paragraph if it's not empty.
+            if content.strip():
+                # Handle line breaks within quote content.
+                escaped_content = escape(content)
+                html_parts.append(f"<p>{escaped_content}</p>")
+            else:
+                html_parts.append("<br />")
+
+        # Close remaining blockquotes.
+        while current_level > 0:
+            html_parts.append("</blockquote>")
+            current_level -= 1
+
+        return "".join(html_parts)
+
+    def _create_ordered_list(self, items: list[str]) -> str:
+        """Create an ordered list from items."""
+        if not items:
+            return ""
+        list_items = "\n".join(f"<li>{escape(item)}</li>" for item in items)
+        return f"<ol>\n{list_items}\n</ol>"
+
+    def _create_unordered_list(self, items: list[str]) -> str:
+        """Create an unordered list from items."""
+        if not items:
+            return ""
+        list_items = "\n".join(f"<li>{escape(item)}</li>" for item in items)
+        return f"<ul>\n{list_items}\n</ul>"
+
+
+class MailingListHtmlCleaner:
+    """Clean mailing list content by converting to HTML and adding semantic classes."""
+
+    def __init__(self) -> None:
+        """Initialize the cleaner."""
+        self.converter = TextToHtmlConverter()
+
+    def clean_message(self, text: str, is_html: bool = False) -> str:
+        """
+        Convert text to HTML (if needed) and clean it with noise classification.
+
+        @param text: The message content (text or HTML).
+        @param is_html: Whether the input is already HTML.
+        @return: Cleaned HTML with semantic noise classification.
+        """
+        if is_html:
+            html_content = text
+        else:
+            html_content = self.converter.text_to_html(text)
+
+        return self.clean_html(html_content)
+
+    def clean_html(self, html_content: str) -> str:
+        """Clean HTML by adding noise classification classes.
+
+        @param html_content: HTML to clean.
+        @return: HTML with noise classification classes added.
+        """
+        # Parse the HTML with lxml.
+        try:
+            # Try to parse as XHTML first.
+            doc = etree.fromstring(f"<div>{html_content}</div>")
+        except etree.XMLSyntaxError:
+            # If that fails, parse as HTML and convert to XHTML.
+            doc = html.fromstring(html_content)
+            # Convert to XHTML string and re-parse.
+            xhtml = html.tostring(doc, encoding="unicode", method="xml")
+            doc = etree.fromstring(f"<div>{xhtml}</div>")
+
+        # Detect and mark noise elements.
+        self._detect_and_mark_noise(doc)
+
+        # Convert back to string.
+        result = etree.tostring(doc, encoding="unicode", method="xml")
+        # Remove the wrapper div.
+        if result.startswith("<div>") and result.endswith("</div>"):
+            result = result[5:-6]
+        return result
+
+    def _detect_and_mark_noise(self, doc: etree.Element) -> None:
+        """Detect noise elements in the document and add appropriate classes.
+
+        @param doc: Parsed HTML document.
+        """
+        # Detect long blockquotes at top or bottom. Those often result from the user
+        # pressing the "reply" button of their client and not removing the old message.
+        self._detect_long_blockquotes(doc)
+
+        # Detect "On XXX YYY wrote" patterns (reply context).
+        self._detect_reply_context(doc)
+
+        # Detect signatures.
+        self._detect_signatures(doc)
+
+    def _detect_long_blockquotes(self, doc: etree.Element) -> None:
+        """Detect long blockquotes at beginning or end of message."""
+        blockquotes = doc.xpath("//blockquote")
+
+        if not blockquotes:
+            return
+
+        # Check if first element is a blockquote.
+        first_element = None
+        for child in doc:
+            if child.tag in ["p", "blockquote", "div"]:
+                first_element = child
+                break
+
+        # Check if last element is a blockquote.
+        last_element = None
+        for child in reversed(list(doc)):
+            if child.tag in ["p", "blockquote", "div"]:
+                last_element = child
+                break
+
+        for blockquote in blockquotes:
+            # Count the text content length.
+            text_content = "".join(blockquote.itertext())
+            # Only mark as noise if it's a long blockquote at the beginning or end.
+            if len(text_content) > 500 and (
+                blockquote is first_element or blockquote is last_element
+            ):
+                self._add_class(blockquote, "noise-old-quote")
+
+    def _detect_reply_context(self, doc: etree.Element) -> None:
+        """Detect reply context patterns like "On XXX YYY wrote"."""
+        # Look for paragraphs that might contain reply context.
+        paragraphs = doc.xpath("//p")
+        reply_pattern = re.compile(
+            r"On\s+.+?\s+(?:wrote|said|ecrit|a écrit).*?:\s*$", re.IGNORECASE
+        )
+
+        # Check if first element is a paragraph with reply context.
+        first_element = None
+        for child in doc:
+            if child.tag in ["p", "blockquote", "div"]:
+                first_element = child
+                break
+
+        # Check if last element is a paragraph with reply context.
+        last_element = None
+        for child in reversed(list(doc)):
+            if child.tag in ["p", "blockquote", "div"]:
+                last_element = child
+                break
+
+        for p in paragraphs:
+            text = "".join(p.itertext()).strip()
+            if reply_pattern.search(text):
+                # Only mark as noise if it's at the beginning or end of the message.
+                if p is first_element or p is last_element:
+                    self._add_class(p, "noise-reply-quote")
+                # Also check for next sibling blockquote.
+                next_sibling = p.getnext()
+                if next_sibling is not None and next_sibling.tag == "blockquote":
+                    self._add_class(next_sibling, "noise-reply-quote")
+
+    def _detect_signatures(self, doc: etree.Element) -> None:
+        """Detect mailing list signatures."""
+        # Only consider elements at the end of the document as potential signatures.
+        # Find the last few elements that could be signatures.
+        potential_signature_elements = []
+        for child in reversed(list(doc)):
+            if child.tag in ["p", "div"]:
+                potential_signature_elements.append(child)
+                # Check last 8 elements max.
+                if len(potential_signature_elements) >= 8:
+                    break
+
+        # Put them back in order.
+        potential_signature_elements.reverse()
+
+        # More specific pattern for mailing list signatures (unsubscribe links, etc.).
+        mailing_list_signature_pattern = re.compile(
+            r"(?:^--\s*$)"
+            r"|(?:^__+\s*$)"
+            r"|(?:^==+\s*$)"
+            r"|(?:.*(?:"
+            r"unsubscribe"
+            r"|subscribe"
+            r"|list info"
+            r"|mailing list"
+            r"|archives?"
+            r"|digest"
+            # We must have a URL or an email after one of the previous keywords.
+            r").*(?:https?:|@).+\..+" r")",
+            re.IGNORECASE,
+        )
+
+        for element in potential_signature_elements:
+            text = " ".join(element.itertext()).strip()
+
+            # Check for mailing list signature patterns.
+            if mailing_list_signature_pattern.search(text):
+                self._add_class(element, "noise-signature")
+
+    def _add_class(self, element: etree.Element, class_name: str) -> None:
+        """Add a CSS class to an element."""
+        current_class = element.get("class", "")
+        if current_class:
+            if class_name not in current_class:
+                element.set("class", f"{current_class} noise {class_name}")
+        else:
+            element.set("class", f"noise {class_name}")
+
+
+def convert_to_html_and_detect_noise(text: str, is_html: bool = False) -> str:
+    """Convert content to HTML and mark "noise" elements.
+
+    "Noise" elements are elements which make the message more difficult to read; elements
+    such as forgotten blockquoted old messages, mailing list generic signatures, etc.
+
+    Elements detected as "noise" will have the "noise" class added, plus a generic
+    "noise-*" class according to the type of noise detected (old quote, signature).
+
+    @param text: Text or HTML content to clean.
+    @param is_html: True if the content is HTML, False for plain text.
+    @return: Cleaned HTML with semantic noise classification.
+    """
+    cleaner = MailingListHtmlCleaner()
+    return cleaner.clean_message(text, is_html)
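
A minimal end-to-end sketch of the noise detection implemented above; the sample body and the expected classes are assumptions (they follow from _detect_reply_context and _detect_signatures), not output taken from the changeset:

# Illustrative usage of the public entry point defined above.
from libervia.backend.plugins.plugin_comp_email_gateway.cleaning import (
    convert_to_html_and_detect_noise,
)

body = "\n".join([
    "Thanks for the patch, merging it now.",
    "",
    "On Mon, 1 Sep 2025 someone wrote:",
    "> Here is the patch.",
    "",
    "_______________________________________",
    "Dev mailing list -- unsubscribe: https://example.org/unsubscribe",
])

xhtml = convert_to_html_and_detect_noise(body)
# The quoted reply should come back as
#   <blockquote class="noise noise-reply-quote">...</blockquote>
# and the mailing-list footer as
#   <p class="noise noise-signature">...</p>
# so a forum-like view can collapse or hide anything carrying the "noise" class.
print(xhtml)
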
--- a/libervia/backend/plugins/plugin_misc_text_syntaxes.py	Thu Sep 11 21:10:35 2025 +0200
+++ b/libervia/backend/plugins/plugin_misc_text_syntaxes.py	Thu Sep 11 21:17:47 2025 +0200
@@ -195,16 +195,19 @@
     "vm",
     "w",
     "write",
+    # "noise" class is used by gateways (in particular email gateway) to mark annoying and
+    # not useful content.
+    "noise"
 }
 STYLES_VALUES_REGEX = (
-    r"^("
+    "^("
     + "|".join(
         [
-            "([a-z-]+)",  # alphabetical names
-            "(#[0-9a-f]+)",  # hex value
-            "(\d+(.\d+)? *(|%|em|ex|px|in|cm|mm|pt|pc))",  # values with units (or not)
-            "rgb\( *((\d+(.\d+)?), *){2}(\d+(.\d+)?) *\)",  # rgb function
-            "rgba\( *((\d+(.\d+)?), *){3}(\d+(.\d+)?) *\)",  # rgba function
+            r"([a-z-]+)",  # alphabetical names
+            r"(#[0-9a-f]+)",  # hex value
+            r"(\d+(.\d+)? *(|%|em|ex|px|in|cm|mm|pt|pc))",  # values with units (or not)
+            r"rgb\( *((\d+(.\d+)?), *){2}(\d+(.\d+)?) *\)",  # rgb function
+            r"rgba\( *((\d+(.\d+)?), *){3}(\d+(.\d+)?) *\)",  # rgba function
         ]
     )
     + ") *(!important)?$"