Mercurial > libervia-backend
view libervia/backend/plugins/plugin_comp_email_gateway/cleaning.py @ 4401:ae26233b655f default tip
doc (components): Add message cleaning section to email gateway doc:
fix 464
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 11 Sep 2025 21:17:51 +0200 |
parents | fe09446a09ce |
children |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia Email Gateway Component # Copyright (C) 2009-2025 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from html import escape import re from lxml import etree, html class TextToHtmlConverter: """Convert plain text to semantic HTML with proper quote and list handling.""" def __init__(self) -> None: """Initialize converter with patterns.""" self.ordered_list_pattern = re.compile(r"^\s*(\d+[\.\)])\s+(.+)$") self.unordered_list_pattern = re.compile(r"^\s*([*\-\+])\s+(.+)$") def text_to_html(self, text: str) -> str: """Convert plain text to HTML. Plain text with formatting often seen in emails is converted to HTML to make text-only email consistent with HTML ones. @param text: Plain text to convert. @return: HTML version. """ lines = text.split("\n") html_parts = [] current_paragraph_lines = [] in_ordered_list = False in_unordered_list = False ordered_list_items = [] unordered_list_items = [] i = 0 while i < len(lines): line = lines[i] # Match line type and handle accordingly. line_type = self._classify_line(line) # If we're in a list and encounter a non-list item, close the list. if in_ordered_list and line_type != "ordered_list": html_parts.append(self._create_ordered_list(ordered_list_items)) ordered_list_items.clear() in_ordered_list = False if in_unordered_list and line_type != "unordered_list": html_parts.append(self._create_unordered_list(unordered_list_items)) unordered_list_items.clear() in_unordered_list = False match line_type: case "empty": if current_paragraph_lines: paragraph_content = "<br />".join( escape(line) for line in current_paragraph_lines ) html_parts.append(f"<p>{paragraph_content}</p>") current_paragraph_lines = [] # Handle consecutive empty lines. if not html_parts or html_parts[-1] != "<br />": html_parts.append("<br />") case "blockquote": if current_paragraph_lines: paragraph_content = "<br />".join( escape(line) for line in current_paragraph_lines ) html_parts.append(f"<p>{paragraph_content}</p>") current_paragraph_lines = [] quoted_lines, next_index = self._collect_quoted_lines(lines, i) html_parts.append(self._create_blockquote_html(quoted_lines)) # Adjust for loop increment. i = next_index - 1 case "ordered_list": if current_paragraph_lines: paragraph_content = "<br />".join( escape(line) for line in current_paragraph_lines ) html_parts.append(f"<p>{paragraph_content}</p>") current_paragraph_lines = [] match = self.ordered_list_pattern.match(line) if match: ordered_list_items.append(match.group(2)) in_ordered_list = True case "unordered_list": if current_paragraph_lines: paragraph_content = "<br />".join( escape(line) for line in current_paragraph_lines ) html_parts.append(f"<p>{paragraph_content}</p>") current_paragraph_lines = [] match = self.unordered_list_pattern.match(line) if match: unordered_list_items.append(match.group(2)) in_unordered_list = True case "regular": if in_ordered_list: html_parts.append(self._create_ordered_list(ordered_list_items)) ordered_list_items = [] in_ordered_list = False if in_unordered_list: html_parts.append( self._create_unordered_list(unordered_list_items) ) unordered_list_items = [] in_unordered_list = False current_paragraph_lines.append(line) i += 1 # We now handle remaining paragraphs and lists. if current_paragraph_lines: paragraph_content = "<br />".join( escape(line) for line in current_paragraph_lines ) html_parts.append(f"<p>{paragraph_content}</p>") if in_ordered_list: html_parts.append(self._create_ordered_list(ordered_list_items)) if in_unordered_list: html_parts.append(self._create_unordered_list(unordered_list_items)) # Remove trailing <br /> tags. while html_parts and html_parts[-1] == "<br />": html_parts.pop() return "\n".join(html_parts) def _classify_line(self, line: str) -> str: """Classify a line type for processing. @param line: Line to classify. @return: Line type classification. """ stripped = line.strip() if not stripped: return "empty" if line.lstrip().startswith(">"): return "blockquote" if self.ordered_list_pattern.match(line): return "ordered_list" if self.unordered_list_pattern.match(line): return "unordered_list" return "regular" def _collect_quoted_lines( self, lines: list[str], start_index: int ) -> tuple[list[str], int]: """Collect consecutive quoted lines. @param lines: All lines. @param start_index: Starting index for collection. @return: Tuple of (quoted_lines, next_index). """ quoted_lines = [] i = start_index while i < len(lines) and lines[i].lstrip().startswith(">"): quoted_lines.append(lines[i]) i += 1 return quoted_lines, i def _create_blockquote_html(self, quoted_lines: list[str]) -> str: """Create properly nested blockquote HTML. @param quoted_lines: Lines to convert to blockquotes. @return: HTML with nested blockquote elements. """ if not quoted_lines: return "" # Parse lines to determine nesting structure. parsed_lines = [] for line in quoted_lines: level = 0 content = line.lstrip() # Count and remove quote markers. while content.startswith(">"): level += 1 # Remove first '>'. content = content[1:] content = content.lstrip() parsed_lines.append((level, content)) return self._build_nested_blockquotes(parsed_lines) def _build_nested_blockquotes(self, parsed_lines: list[tuple[int, str]]) -> str: """Build properly nested blockquote elements. @param parsed_lines: List of (level, content) tuples. @return: Nested blockquote HTML. """ if not parsed_lines: return "" html_parts = [] current_level = 0 for level, content in parsed_lines: # Close blockquotes if we're going to a lower level. while current_level > level: html_parts.append("</blockquote>") current_level -= 1 # Open new blockquotes if we're going to a higher level. while current_level < level: html_parts.append("<blockquote>") current_level += 1 # Add the content as a paragraph if it's not empty. if content.strip(): # Handle line breaks within quote content. escaped_content = escape(content) html_parts.append(f"<p>{escaped_content}</p>") else: html_parts.append("<br />") # Close remaining blockquotes. while current_level > 0: html_parts.append("</blockquote>") current_level -= 1 return "".join(html_parts) def _create_ordered_list(self, items: list[str]) -> str: """Create an ordered list from items.""" if not items: return "" list_items = "\n".join(f"<li>{escape(item)}</li>" for item in items) return f"<ol>\n{list_items}\n</ol>" def _create_unordered_list(self, items: list[str]) -> str: """Create an unordered list from items.""" if not items: return "" list_items = "\n".join(f"<li>{escape(item)}</li>" for item in items) return f"<ul>\n{list_items}\n</ul>" class MailingListHtmlCleaner: """Clean mailing list content by converting to HTML and adding semantic classes.""" def __init__(self) -> None: """Initialize the cleaner.""" self.converter = TextToHtmlConverter() def clean_message(self, text: str, is_html: bool = False) -> str: """ Convert text to HTML (if needed) and clean it with noise classification. @param text: The message content (text or HTML). @param is_html: Whether the input is already HTML. @return: Cleaned HTML with semantic noise classification. """ if is_html: html_content = text else: html_content = self.converter.text_to_html(text) return self.clean_html(html_content) def clean_html(self, html_content: str) -> str: """Clean HTML by adding noise classification classes. @param html: HTML to clean. @return: HTML with noise classification classes added. """ # Parse the HTML with lxml. try: # Try to parse as XHTML first. doc = etree.fromstring(f"<div>{html_content}</div>") except etree.XMLSyntaxError: # If that fails, parse as HTML and convert to XHTML. doc = html.fromstring(html_content) # Convert to XHTML string and re-parse. xhtml = html.tostring(doc, encoding="unicode", method="xml") doc = etree.fromstring(f"<div>{xhtml}</div>") # Detect and mark noise elements. self._detect_and_mark_noise(doc) # Convert back to string. result = etree.tostring(doc, encoding="unicode", method="xml") # Remove the wrapper div. if result.startswith("<div>") and result.endswith("</div>"): result = result[5:-6] return result def _detect_and_mark_noise(self, doc: etree.Element) -> None: """Detect noise elements in the document and add appropriate classes. @param doc: Parsed HTML document. """ # Detect long blockquotes at top or bottom. Those are often the result of user. # pressing the "reply" button of their client, and not removing the old message. self._detect_long_blockquotes(doc) # Detect "On XXX YYY wrote" patterns (reply context). self._detect_reply_context(doc) # Detect signatures. self._detect_signatures(doc) def _detect_long_blockquotes(self, doc: etree.Element) -> None: """Detect long blockquotes at beginning or end of message.""" blockquotes = doc.xpath("//blockquote") if not blockquotes: return # Check if first element is a blockquote. first_element = None for child in doc: if child.tag in ["p", "blockquote", "div"]: first_element = child break # Check if last element is a blockquote. last_element = None for child in reversed(list(doc)): if child.tag in ["p", "blockquote", "div"]: last_element = child break for blockquote in blockquotes: # Count the text content length. text_content = "".join(blockquote.itertext()) # Only mark as noise if it's a long blockquote at the beginning or end. if len(text_content) > 500 and ( blockquote is first_element or blockquote is last_element ): self._add_class(blockquote, "noise-old-quote") def _detect_reply_context(self, doc: etree.Element) -> None: """Detect reply context patterns like "On XXX YYY wrote".""" # Look for paragraphs that might contain reply context. paragraphs = doc.xpath("//p") reply_pattern = re.compile( r"On\s+.+?\s+(?:wrote|said|ecrit|a écrit).*?:\s*$", re.IGNORECASE ) # Check if first element is a paragraph with reply context. first_element = None for child in doc: if child.tag in ["p", "blockquote", "div"]: first_element = child break # Check if last element is a paragraph with reply context. last_element = None for child in reversed(list(doc)): if child.tag in ["p", "blockquote", "div"]: last_element = child break for p in paragraphs: text = "".join(p.itertext()).strip() if reply_pattern.search(text): # Only mark as noise if it's at the beginning or end of the message. if p is first_element or p is last_element: self._add_class(p, "noise-reply-quote") # Also check for next sibling blockquote. next_sibling = p.getnext() if next_sibling is not None and next_sibling.tag == "blockquote": self._add_class(next_sibling, "noise-reply-quote") def _detect_signatures(self, doc: etree.Element) -> None: """Detect mailing list signatures.""" # Only consider elements at the end of the document as potential signatures. # Find the last few elements that could be signatures. potential_signature_elements = [] for child in reversed(list(doc)): if child.tag in ["p", "div"]: potential_signature_elements.append(child) # Check last 8 elements max. if len(potential_signature_elements) >= 8: break # Put them back in order. potential_signature_elements.reverse() # More specific pattern for mailing list signatures (unsubscribe links, etc.). mailing_list_signature_pattern = re.compile( r"(?:^--\s*$)" r"|(?:^__+\s*$)" r"|(?:^==+\s*$)" r"|(?:.*(?:" r"unsubscribe" r"|subscribe" r"|list info" r"|mailing list" r"|archives?" r"|digest" # We must have an URL or an email after one of the previous keywords r").*(?:https?:|@).+\..+" r")", re.IGNORECASE, ) for element in potential_signature_elements: text = " ".join(element.itertext()).strip() # Check for mailing list signature patterns. if mailing_list_signature_pattern.search(text): self._add_class(element, "noise-signature") def _add_class(self, element: etree.Element, class_name: str) -> None: """Add a CSS class to an element.""" current_class = element.get("class", "") if current_class: if class_name not in current_class: element.set("class", f"{current_class} noise {class_name}") else: element.set("class", f"noise {class_name}") def convert_to_html_and_detect_noise(text: str, is_html: bool = False) -> str: """Convert content to HTML and mark "noise" elements. "Noise" elements are elements which make the message more difficult to read; elements such as forgotten blockquoted old messages, mailing list generic signatures, etc. Elements detected as "noise" will have the "noise" class added, plus a generic "noise-*" class according to the type of noise detected (old quote, signature). @param text: Text or HTML content to clean. @param is_html: True if the content is HTML, False for plain text. @return: Cleaned HTML with semantic noise classification. """ cleaner = MailingListHtmlCleaner() return cleaner.clean_message(text, is_html)