libervia-backend
changeset 4399:fe09446a09ce
component email message: Mailing list messages cleaning:
For mailing-list messages:
- convert all text-only messages to XHTML for consistency.
- detect and flag "noisy" content, i.e. content which is not useful and is annoying in a
  forum-like view (full-length quoted previous messages, mailing-list signatures).
rel 464
author   | Goffi <goffi@goffi.org>
date     | Thu, 11 Sep 2025 21:17:47 +0200
parents  | 7ef21e3e5ac9
children | b591c7dff8ab
files    | libervia/backend/plugins/plugin_comp_email_gateway/__init__.py libervia/backend/plugins/plugin_comp_email_gateway/cleaning.py libervia/backend/plugins/plugin_misc_text_syntaxes.py
diffstat | 3 files changed, 482 insertions(+), 8 deletions(-)
--- a/libervia/backend/plugins/plugin_comp_email_gateway/__init__.py Thu Sep 11 21:10:35 2025 +0200
+++ b/libervia/backend/plugins/plugin_comp_email_gateway/__init__.py Thu Sep 11 21:17:47 2025 +0200
@@ -76,6 +76,7 @@
 from libervia.backend.tools.common import date_utils, regex, uri
 from libervia.backend.tools.utils import aio
 
+from .cleaning import convert_to_html_and_detect_noise
 from .imap import IMAPClientFactory
 from .models import Credentials, UserData
 
@@ -1067,8 +1068,13 @@
         elif content_type == 'text/html':
             content_xhtml = content_text
 
-        if content_xhtml is not None:
-            content_xhtml = self._syntax.clean_xhtml(content_xhtml)
+        if content_xhtml is None:
+            assert content is not None
+            content_xhtml = convert_to_html_and_detect_noise(content)
+        else:
+            content_xhtml = convert_to_html_and_detect_noise(content_xhtml, is_html=True)
+
+        content_xhtml = self._syntax.clean_xhtml(content_xhtml)
 
         return MbData(
             service=service,
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/plugins/plugin_comp_email_gateway/cleaning.py Thu Sep 11 21:17:47 2025 +0200
@@ -0,0 +1,465 @@
+#!/usr/bin/env python3
+
+# Libervia Email Gateway Component
+# Copyright (C) 2009-2025 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+from html import escape
+import re
+
+from lxml import etree, html
+
+
+class TextToHtmlConverter:
+    """Convert plain text to semantic HTML with proper quote and list handling."""
+
+    def __init__(self) -> None:
+        """Initialize converter with patterns."""
+        self.ordered_list_pattern = re.compile(r"^\s*(\d+[\.\)])\s+(.+)$")
+        self.unordered_list_pattern = re.compile(r"^\s*([*\-\+])\s+(.+)$")
+
+    def text_to_html(self, text: str) -> str:
+        """Convert plain text to HTML.
+
+        Plain text with formatting often seen in emails is converted to HTML, to make
+        text-only emails consistent with HTML ones.
+
+        @param text: Plain text to convert.
+        @return: HTML version.
+        """
+        lines = text.split("\n")
+        html_parts = []
+        current_paragraph_lines = []
+        in_ordered_list = False
+        in_unordered_list = False
+        ordered_list_items = []
+        unordered_list_items = []
+
+        i = 0
+        while i < len(lines):
+            line = lines[i]
+
+            # Match line type and handle accordingly.
+            line_type = self._classify_line(line)
+
+            # If we're in a list and encounter a non-list item, close the list.
+            if in_ordered_list and line_type != "ordered_list":
+                html_parts.append(self._create_ordered_list(ordered_list_items))
+                ordered_list_items.clear()
+                in_ordered_list = False
+
+            if in_unordered_list and line_type != "unordered_list":
+                html_parts.append(self._create_unordered_list(unordered_list_items))
+                unordered_list_items.clear()
+                in_unordered_list = False
+
+            match line_type:
+                case "empty":
+                    if current_paragraph_lines:
+                        paragraph_content = "<br />".join(
+                            escape(line) for line in current_paragraph_lines
+                        )
+                        html_parts.append(f"<p>{paragraph_content}</p>")
+                        current_paragraph_lines = []
+
+                    # Handle consecutive empty lines.
+                    if not html_parts or html_parts[-1] != "<br />":
+                        html_parts.append("<br />")
+
+                case "blockquote":
+                    if current_paragraph_lines:
+                        paragraph_content = "<br />".join(
+                            escape(line) for line in current_paragraph_lines
+                        )
+                        html_parts.append(f"<p>{paragraph_content}</p>")
+                        current_paragraph_lines = []
+
+                    quoted_lines, next_index = self._collect_quoted_lines(lines, i)
+                    html_parts.append(self._create_blockquote_html(quoted_lines))
+                    # Adjust for loop increment.
+                    i = next_index - 1
+
+                case "ordered_list":
+                    if current_paragraph_lines:
+                        paragraph_content = "<br />".join(
+                            escape(line) for line in current_paragraph_lines
+                        )
+                        html_parts.append(f"<p>{paragraph_content}</p>")
+                        current_paragraph_lines = []
+
+                    match = self.ordered_list_pattern.match(line)
+                    if match:
+                        ordered_list_items.append(match.group(2))
+                        in_ordered_list = True
+
+                case "unordered_list":
+                    if current_paragraph_lines:
+                        paragraph_content = "<br />".join(
+                            escape(line) for line in current_paragraph_lines
+                        )
+                        html_parts.append(f"<p>{paragraph_content}</p>")
+                        current_paragraph_lines = []
+
+                    match = self.unordered_list_pattern.match(line)
+                    if match:
+                        unordered_list_items.append(match.group(2))
+                        in_unordered_list = True
+
+                case "regular":
+                    if in_ordered_list:
+                        html_parts.append(self._create_ordered_list(ordered_list_items))
+                        ordered_list_items = []
+                        in_ordered_list = False
+
+                    if in_unordered_list:
+                        html_parts.append(
+                            self._create_unordered_list(unordered_list_items)
+                        )
+                        unordered_list_items = []
+                        in_unordered_list = False
+
+                    current_paragraph_lines.append(line)
+
+            i += 1
+
+        # We now handle remaining paragraphs and lists.
+        if current_paragraph_lines:
+            paragraph_content = "<br />".join(
+                escape(line) for line in current_paragraph_lines
+            )
+            html_parts.append(f"<p>{paragraph_content}</p>")
+
+        if in_ordered_list:
+            html_parts.append(self._create_ordered_list(ordered_list_items))
+
+        if in_unordered_list:
+            html_parts.append(self._create_unordered_list(unordered_list_items))
+
+        # Remove trailing <br /> tags.
+        while html_parts and html_parts[-1] == "<br />":
+            html_parts.pop()
+
+        return "\n".join(html_parts)
+
+    def _classify_line(self, line: str) -> str:
+        """Classify a line type for processing.
+
+        @param line: Line to classify.
+        @return: Line type classification.
+        """
+        stripped = line.strip()
+
+        if not stripped:
+            return "empty"
+
+        if line.lstrip().startswith(">"):
+            return "blockquote"
+
+        if self.ordered_list_pattern.match(line):
+            return "ordered_list"
+
+        if self.unordered_list_pattern.match(line):
+            return "unordered_list"
+
+        return "regular"
+
+    def _collect_quoted_lines(
+        self, lines: list[str], start_index: int
+    ) -> tuple[list[str], int]:
+        """Collect consecutive quoted lines.
+
+        @param lines: All lines.
+        @param start_index: Starting index for collection.
+        @return: Tuple of (quoted_lines, next_index).
+        """
+        quoted_lines = []
+        i = start_index
+
+        while i < len(lines) and lines[i].lstrip().startswith(">"):
+            quoted_lines.append(lines[i])
+            i += 1
+
+        return quoted_lines, i
+
+    def _create_blockquote_html(self, quoted_lines: list[str]) -> str:
+        """Create properly nested blockquote HTML.
+
+        @param quoted_lines: Lines to convert to blockquotes.
+        @return: HTML with nested blockquote elements.
+        """
+        if not quoted_lines:
+            return ""
+
+        # Parse lines to determine nesting structure.
+        parsed_lines = []
+        for line in quoted_lines:
+            level = 0
+            content = line.lstrip()
+
+            # Count and remove quote markers.
+            while content.startswith(">"):
+                level += 1
+                # Remove first '>'.
+                content = content[1:]
+                content = content.lstrip()
+
+            parsed_lines.append((level, content))
+
+        return self._build_nested_blockquotes(parsed_lines)
+
+    def _build_nested_blockquotes(self, parsed_lines: list[tuple[int, str]]) -> str:
+        """Build properly nested blockquote elements.
+
+        @param parsed_lines: List of (level, content) tuples.
+        @return: Nested blockquote HTML.
+        """
+        if not parsed_lines:
+            return ""
+
+        html_parts = []
+        current_level = 0
+
+        for level, content in parsed_lines:
+            # Close blockquotes if we're going to a lower level.
+            while current_level > level:
+                html_parts.append("</blockquote>")
+                current_level -= 1
+
+            # Open new blockquotes if we're going to a higher level.
+            while current_level < level:
+                html_parts.append("<blockquote>")
+                current_level += 1
+
+            # Add the content as a paragraph if it's not empty.
+            if content.strip():
+                # Handle line breaks within quote content.
+                escaped_content = escape(content)
+                html_parts.append(f"<p>{escaped_content}</p>")
+            else:
+                html_parts.append("<br />")
+
+        # Close remaining blockquotes.
+        while current_level > 0:
+            html_parts.append("</blockquote>")
+            current_level -= 1
+
+        return "".join(html_parts)
+
+    def _create_ordered_list(self, items: list[str]) -> str:
+        """Create an ordered list from items."""
+        if not items:
+            return ""
+        list_items = "\n".join(f"<li>{escape(item)}</li>" for item in items)
+        return f"<ol>\n{list_items}\n</ol>"
+
+    def _create_unordered_list(self, items: list[str]) -> str:
+        """Create an unordered list from items."""
+        if not items:
+            return ""
+        list_items = "\n".join(f"<li>{escape(item)}</li>" for item in items)
+        return f"<ul>\n{list_items}\n</ul>"
+
+
+class MailingListHtmlCleaner:
+    """Clean mailing list content by converting to HTML and adding semantic classes."""
+
+    def __init__(self) -> None:
+        """Initialize the cleaner."""
+        self.converter = TextToHtmlConverter()
+
+    def clean_message(self, text: str, is_html: bool = False) -> str:
+        """
+        Convert text to HTML (if needed) and clean it with noise classification.
+
+        @param text: The message content (text or HTML).
+        @param is_html: Whether the input is already HTML.
+        @return: Cleaned HTML with semantic noise classification.
+        """
+        if is_html:
+            html_content = text
+        else:
+            html_content = self.converter.text_to_html(text)
+
+        return self.clean_html(html_content)
+
+    def clean_html(self, html_content: str) -> str:
+        """Clean HTML by adding noise classification classes.
+
+        @param html_content: HTML to clean.
+        @return: HTML with noise classification classes added.
+        """
+        # Parse the HTML with lxml.
+        try:
+            # Try to parse as XHTML first.
+            doc = etree.fromstring(f"<div>{html_content}</div>")
+        except etree.XMLSyntaxError:
+            # If that fails, parse as HTML and convert to XHTML.
+            doc = html.fromstring(html_content)
+            # Convert to XHTML string and re-parse.
+            xhtml = html.tostring(doc, encoding="unicode", method="xml")
+            doc = etree.fromstring(f"<div>{xhtml}</div>")
+
+        # Detect and mark noise elements.
+        self._detect_and_mark_noise(doc)
+
+        # Convert back to string.
+        result = etree.tostring(doc, encoding="unicode", method="xml")
+        # Remove the wrapper div.
+        if result.startswith("<div>") and result.endswith("</div>"):
+            result = result[5:-6]
+        return result
+
+    def _detect_and_mark_noise(self, doc: etree.Element) -> None:
+        """Detect noise elements in the document and add appropriate classes.
+
+        @param doc: Parsed HTML document.
+        """
+        # Detect long blockquotes at top or bottom. Those are often the result of the user
+        # pressing the "reply" button of their client and not removing the old message.
+        self._detect_long_blockquotes(doc)
+
+        # Detect "On XXX YYY wrote" patterns (reply context).
+        self._detect_reply_context(doc)
+
+        # Detect signatures.
+        self._detect_signatures(doc)
+
+    def _detect_long_blockquotes(self, doc: etree.Element) -> None:
+        """Detect long blockquotes at beginning or end of message."""
+        blockquotes = doc.xpath("//blockquote")
+
+        if not blockquotes:
+            return
+
+        # Check if first element is a blockquote.
+        first_element = None
+        for child in doc:
+            if child.tag in ["p", "blockquote", "div"]:
+                first_element = child
+                break
+
+        # Check if last element is a blockquote.
+        last_element = None
+        for child in reversed(list(doc)):
+            if child.tag in ["p", "blockquote", "div"]:
+                last_element = child
+                break
+
+        for blockquote in blockquotes:
+            # Count the text content length.
+            text_content = "".join(blockquote.itertext())
+            # Only mark as noise if it's a long blockquote at the beginning or end.
+            if len(text_content) > 500 and (
+                blockquote is first_element or blockquote is last_element
+            ):
+                self._add_class(blockquote, "noise-old-quote")
+
+    def _detect_reply_context(self, doc: etree.Element) -> None:
+        """Detect reply context patterns like "On XXX YYY wrote"."""
+        # Look for paragraphs that might contain reply context.
+        paragraphs = doc.xpath("//p")
+        reply_pattern = re.compile(
+            r"On\s+.+?\s+(?:wrote|said|ecrit|a écrit).*?:\s*$", re.IGNORECASE
+        )
+
+        # Check if first element is a paragraph with reply context.
+        first_element = None
+        for child in doc:
+            if child.tag in ["p", "blockquote", "div"]:
+                first_element = child
+                break
+
+        # Check if last element is a paragraph with reply context.
+        last_element = None
+        for child in reversed(list(doc)):
+            if child.tag in ["p", "blockquote", "div"]:
+                last_element = child
+                break
+
+        for p in paragraphs:
+            text = "".join(p.itertext()).strip()
+            if reply_pattern.search(text):
+                # Only mark as noise if it's at the beginning or end of the message.
+                if p is first_element or p is last_element:
+                    self._add_class(p, "noise-reply-quote")
+                    # Also check for next sibling blockquote.
+                    next_sibling = p.getnext()
+                    if next_sibling is not None and next_sibling.tag == "blockquote":
+                        self._add_class(next_sibling, "noise-reply-quote")
+
+    def _detect_signatures(self, doc: etree.Element) -> None:
+        """Detect mailing list signatures."""
+        # Only consider elements at the end of the document as potential signatures.
+        # Find the last few elements that could be signatures.
+        potential_signature_elements = []
+        for child in reversed(list(doc)):
+            if child.tag in ["p", "div"]:
+                potential_signature_elements.append(child)
+                # Check last 8 elements max.
+                if len(potential_signature_elements) >= 8:
+                    break
+
+        # Put them back in order.
+        potential_signature_elements.reverse()
+
+        # More specific pattern for mailing list signatures (unsubscribe links, etc.).
+        mailing_list_signature_pattern = re.compile(
+            r"(?:^--\s*$)"
+            r"|(?:^__+\s*$)"
+            r"|(?:^==+\s*$)"
+            r"|(?:.*(?:"
+            r"unsubscribe"
+            r"|subscribe"
+            r"|list info"
+            r"|mailing list"
+            r"|archives?"
+            r"|digest"
+            # We must have a URL or an email after one of the previous keywords.
+            r").*(?:https?:|@).+\..+" r")",
+            re.IGNORECASE,
+        )
+
+        for element in potential_signature_elements:
+            text = " ".join(element.itertext()).strip()
+
+            # Check for mailing list signature patterns.
+            if mailing_list_signature_pattern.search(text):
+                self._add_class(element, "noise-signature")
+
+    def _add_class(self, element: etree.Element, class_name: str) -> None:
+        """Add a CSS class to an element."""
+        current_class = element.get("class", "")
+        if current_class:
+            if class_name not in current_class:
+                element.set("class", f"{current_class} noise {class_name}")
+        else:
+            element.set("class", f"noise {class_name}")
+
+
+def convert_to_html_and_detect_noise(text: str, is_html: bool = False) -> str:
+    """Convert content to HTML and mark "noise" elements.
+
+    "Noise" elements are elements which make the message more difficult to read, such as
+    forgotten blockquoted old messages, generic mailing list signatures, etc.
+
+    Elements detected as "noise" will have the "noise" class added, plus a more specific
+    "noise-*" class according to the type of noise detected (old quote, signature).
+
+    @param text: Text or HTML content to clean.
+    @param is_html: True if the content is HTML, False for plain text.
+    @return: Cleaned HTML with semantic noise classification.
+    """
+    cleaner = MailingListHtmlCleaner()
+    return cleaner.clean_message(text, is_html)
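A minimal usage sketch of the new helper, for illustration only: the sample message and the classes mentioned in the comments are assumptions about typical output, not values recorded in this changeset.

    from libervia.backend.plugins.plugin_comp_email_gateway.cleaning import (
        convert_to_html_and_detect_noise,
    )

    # A plain-text reply with a quoted block and a mailing-list footer.
    text = (
        "Thanks, merged!\n"
        "\n"
        "> Here is the previous message, quoted in full by the mail client.\n"
        "\n"
        "--\n"
        "Example mailing list\n"
        "unsubscribe: https://lists.example.org/options\n"
    )

    xhtml = convert_to_html_and_detect_noise(text)
    # The plain text should come back as <p>/<blockquote> markup; elements detected as
    # noise (here the footer paragraph) should carry the "noise" class plus a subtype
    # such as "noise-signature", so a forum-like frontend can collapse or hide them.
    print(xhtml)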
--- a/libervia/backend/plugins/plugin_misc_text_syntaxes.py Thu Sep 11 21:10:35 2025 +0200
+++ b/libervia/backend/plugins/plugin_misc_text_syntaxes.py Thu Sep 11 21:17:47 2025 +0200
@@ -195,16 +195,19 @@
     "vm",
     "w",
     "write",
+    # "noise" class is used by gateways (in particular email gateway) to mark annoying and
+    # not useful content.
+    "noise"
 }
 STYLES_VALUES_REGEX = (
-    r"^("
+    "^("
     + "|".join(
         [
-            "([a-z-]+)",  # alphabetical names
-            "(#[0-9a-f]+)",  # hex value
-            "(\d+(.\d+)? *(|%|em|ex|px|in|cm|mm|pt|pc))",  # values with units (or not)
-            "rgb\( *((\d+(.\d+)?), *){2}(\d+(.\d+)?) *\)",  # rgb function
-            "rgba\( *((\d+(.\d+)?), *){3}(\d+(.\d+)?) *\)",  # rgba function
+            r"([a-z-]+)",  # alphabetical names
+            r"(#[0-9a-f]+)",  # hex value
+            r"(\d+(.\d+)? *(|%|em|ex|px|in|cm|mm|pt|pc))",  # values with units (or not)
+            r"rgb\( *((\d+(.\d+)?), *){2}(\d+(.\d+)?) *\)",  # rgb function
+            r"rgba\( *((\d+(.\d+)?), *){3}(\d+(.\d+)?) *\)",  # rgba function
         ]
     )
     + ") *(!important)?$"
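A quick, hypothetical sanity check of the regex change above: the constant is rebuilt here exactly as in the hunk, and since only the raw-string prefixes moved, the set of accepted style values should be unchanged.

    import re

    STYLES_VALUES_REGEX = (
        "^("
        + "|".join(
            [
                r"([a-z-]+)",  # alphabetical names
                r"(#[0-9a-f]+)",  # hex value
                r"(\d+(.\d+)? *(|%|em|ex|px|in|cm|mm|pt|pc))",  # values with units (or not)
                r"rgb\( *((\d+(.\d+)?), *){2}(\d+(.\d+)?) *\)",  # rgb function
                r"rgba\( *((\d+(.\d+)?), *){3}(\d+(.\d+)?) *\)",  # rgba function
            ]
        )
        + ") *(!important)?$"
    )

    for value in ("12px", "#fff", "rgb(255, 0, 0)", "url(evil)"):
        print(value, bool(re.match(STYLES_VALUES_REGEX, value)))
    # Expected: the first three values match, "url(evil)" does not.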