Mercurial > libervia-backend
changeset 4386:c055042c01e3
component Email gateway: Convert mailing list to pubsub nodes:
- `Credentials` is now a Pydantic based model.
- Mailing list related emails are now detected hand saved as pubsub items, using XEP-0277
blog items, and creating suitable nodes. The ID of the mailing list is used for root
node.
- Mailing list items are currently restricted to recipient associated JID.
- Words in square bracket in title `[Like][That]` are removed and converted to blog
categories.
- Method to convert between MbData and email (in both directions) have been created.
rel 462
author | Goffi <goffi@goffi.org> |
---|---|
date | Sun, 03 Aug 2025 23:45:48 +0200 |
parents | a1ac33fe6b97 |
children | a6270030968d |
files | libervia/backend/plugins/plugin_comp_ap_gateway/__init__.py libervia/backend/plugins/plugin_comp_ap_gateway/pubsub_service.py libervia/backend/plugins/plugin_comp_email_gateway/__init__.py libervia/backend/plugins/plugin_comp_email_gateway/imap.py libervia/backend/plugins/plugin_comp_email_gateway/models.py libervia/backend/plugins/plugin_comp_email_gateway/pubsub_service.py |
diffstat | 6 files changed, 540 insertions(+), 66 deletions(-) [+] |
line wrap: on
line diff
--- a/libervia/backend/plugins/plugin_comp_ap_gateway/__init__.py Sun Aug 03 23:45:45 2025 +0200 +++ b/libervia/backend/plugins/plugin_comp_ap_gateway/__init__.py Sun Aug 03 23:45:48 2025 +0200 @@ -33,6 +33,7 @@ Set, Tuple, Union, + cast, overload, ) from urllib import parse @@ -59,6 +60,7 @@ from libervia.backend.core.log import getLogger from libervia.backend.memory import persistent from libervia.backend.memory.sqla_mapping import History, SubscriptionState +from libervia.backend.plugins.plugin_xep_0277 import XEP_0277 from libervia.backend.tools import utils from libervia.backend.tools.common import data_format, tls, uri from libervia.backend.tools.common.async_utils import async_lru @@ -157,7 +159,7 @@ self._p = host.plugins["XEP-0060"] self._a = host.plugins["XEP-0084"] self._e = host.plugins["XEP-0106"] - self._m = host.plugins["XEP-0277"] + self._m = cast(XEP_0277, host.plugins["XEP-0277"]) self._v = host.plugins["XEP-0292"] self._refs = host.plugins["XEP-0372"] self._r = host.plugins["XEP-0424"] @@ -526,6 +528,7 @@ for now, only handle XMPP items to convert to AP """ + assert self.client is not None url_type, url_args = self.parse_apurl(url) if url_type == TYPE_ITEM: try: @@ -2228,6 +2231,7 @@ @param public: if True, the activity will be addressed to public namespace @return: actor_id of the entity deleting the item, activity to send """ + assert self.client is not None if node is None: node = self._m.namespace @@ -2763,7 +2767,7 @@ create_kwargs={"subscribed": True}, ) else: - # it is a root item (i.e. not a reply to an other item) + # it is a root item (i.e. not a reply to another item) create = node == self._events.namespace cached_node = await self.host.memory.storage.get_pubsub_node( client, service, node, with_subscriptions=True, create=create
--- a/libervia/backend/plugins/plugin_comp_ap_gateway/pubsub_service.py Sun Aug 03 23:45:45 2025 +0200 +++ b/libervia/backend/plugins/plugin_comp_ap_gateway/pubsub_service.py Sun Aug 03 23:45:48 2025 +0200 @@ -16,7 +16,7 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -from typing import Optional, Tuple, List, Dict, Any, Union +from typing import Optional, Tuple, List, Dict, Any from urllib.parse import urlparse from pathlib import Path from base64 import b64encode @@ -25,7 +25,7 @@ from twisted.internet import defer, threads from twisted.words.protocols.jabber import jid, error from twisted.words.xish import domish -from wokkel import rsm, pubsub, disco +from wokkel import rsm, pubsub from libervia.backend.core.i18n import _ from libervia.backend.core import exceptions @@ -265,7 +265,7 @@ avatar_data = await self.get_avatar_data(client, requestor_actor_id, ap_account) return self.apg._a.build_item_metadata_elt(avatar_data) - def _blocking_b_6_4_encode_avatar(self, avatar_data: Dict[str, Any]) -> None: + def _blocking_b64encode_avatar(self, avatar_data: Dict[str, Any]) -> None: with avatar_data["path"].open("rb") as f: avatar_data["base64"] = b64encode(f.read()).decode() @@ -287,7 +287,7 @@ ) if "base64" not in avatar_data: await threads.deferToThread( - self._blocking_b_6_4_encode_avatar, avatar_data + self._blocking_b64encode_avatar, avatar_data ) else: if len(itemIdentifiers) > 1: @@ -300,7 +300,7 @@ if cache_data is None: raise error.StanzaError("item-not-found") avatar_data = {"cache_uid": item_id, "path": cache_data["path"]} - await threads.deferToThread(self._blocking_b_6_4_encode_avatar, avatar_data) + await threads.deferToThread(self._blocking_b64encode_avatar, avatar_data) return self.apg._a.build_item_data_elt(avatar_data)
--- a/libervia/backend/plugins/plugin_comp_email_gateway/__init__.py Sun Aug 03 23:45:45 2025 +0200 +++ b/libervia/backend/plugins/plugin_comp_email_gateway/__init__.py Sun Aug 03 23:45:48 2025 +0200 @@ -23,7 +23,7 @@ from email.mime.application import MIMEApplication from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText -from email.utils import formataddr, getaddresses, parseaddr +from email.utils import formataddr, formatdate, getaddresses, make_msgid, parseaddr from functools import partial import hashlib from pathlib import Path @@ -44,6 +44,7 @@ from wokkel import data_form, disco, iwokkel from zope.interface import implementer +from libervia.backend import G from libervia.backend.core import exceptions from libervia.backend.core.constants import Const as C from libervia.backend.core.core_types import SatXMPPComponent, SatXMPPEntity @@ -51,12 +52,13 @@ from libervia.backend.core.log import getLogger from libervia.backend.memory.persistent import LazyPersistentBinaryDict from libervia.backend.memory.sqla import select -from libervia.backend.memory.sqla_mapping import PrivateIndBin +from libervia.backend.memory.sqla_mapping import AccessModel, Affiliation, PrivateIndBin, PublishModel, PubsubAffiliation from libervia.backend.models.core import MessageData from libervia.backend.plugins.plugin_comp_email_gateway.pubsub_service import ( EmailGWPubsubService, ) from libervia.backend.plugins.plugin_exp_gre import GRE, GetDataHandler +from libervia.backend.plugins.plugin_misc_text_syntaxes import TextSyntaxes from libervia.backend.plugins.plugin_sec_gre_encrypter_openpgp import NS_GRE_OPENPGP from libervia.backend.plugins.plugin_sec_gre_formatter_mime import NS_GRE_MIME from libervia.backend.plugins.plugin_xep_0033 import ( @@ -67,9 +69,10 @@ from libervia.backend.plugins.plugin_xep_0077 import XEP_0077 from libervia.backend.plugins.plugin_xep_0106 import XEP_0106 from libervia.backend.plugins.plugin_xep_0131 import HeadersData, Urgency, XEP_0131 +from libervia.backend.plugins.plugin_xep_0277 import Comment, MbData, XEP_0277 from libervia.backend.plugins.plugin_xep_0373 import binary_to_ascii_armor from libervia.backend.plugins.plugin_xep_0498 import XEP_0498 -from libervia.backend.tools.common import regex +from libervia.backend.tools.common import date_utils, regex, uri from libervia.backend.tools.utils import aio from .imap import IMAPClientFactory @@ -91,7 +94,8 @@ C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, C.PI_PROTOCOLS: [], C.PI_DEPENDENCIES: [ - "XEP-0033", "XEP-0077", "XEP-0106", "XEP-0498", "GRE", "GRE-MIME", "GRE-OpenPGP" + "XEP-0033", "XEP-0077", "XEP-0106", "XEP-0277", "XEP-0498", "GRE", "GRE-MIME", + "GRE-OpenPGP", "PUBSUB_CACHE", "TEXT_SYNTAXES" ], C.PI_RECOMMENDATIONS: [], C.PI_MAIN: "EmailGatewayComponent", @@ -140,8 +144,10 @@ ) self._e = cast(XEP_0106, host.plugins["XEP-0106"]) self._shim = cast(XEP_0131, host.plugins["XEP-0131"]) + self._mb = cast(XEP_0277, host.plugins["XEP-0277"]) self._pfs = cast(XEP_0498, host.plugins["XEP-0498"]) self._gre = cast(GRE, host.plugins["GRE"]) + self._syntax = cast(TextSyntaxes, host.plugins["TEXT_SYNTAXES"]) # TODO: For the moment, all credentials are kept in cache; we should only keep the # X latest. self.users_data: dict[jid.JID, UserData] = {} @@ -173,7 +179,7 @@ ) result = await session.execute(query) return { - jid.JID(p.key[len(PREFIX_KEY_CREDENTIALS) :]): p.value + jid.JID(p.key[len(PREFIX_KEY_CREDENTIALS) :]): Credentials(**p.value) for p in result.scalars() } @@ -182,7 +188,7 @@ registered_data = await self.get_registered_users() for user_jid, credentials in registered_data.items(): user_data = self.users_data[user_jid] = UserData(credentials=credentials) - if not credentials["imap_success"]: + if not credentials.imap_success: log.warning( f"Ignoring unsuccessful IMAP credentials of {user_jid}. This user " "won't receive message from this gateway." @@ -340,7 +346,7 @@ return mess_data def jid_to_email( - self, client: SatXMPPEntity, address_jid: jid.JID, credentials: dict[str, str] + self, client: SatXMPPEntity, address_jid: jid.JID, credentials: Credentials ) -> str: """Convert a JID to an email address. @@ -354,7 +360,7 @@ if address_jid and address_jid.host.endswith(str(client.jid)): return self._e.unescape(address_jid.user) else: - email_address = credentials["user_email"] + email_address = credentials.user_email if address_jid: email_address = formataddr((f"xmpp:{address_jid}", email_address)) return email_address @@ -404,7 +410,7 @@ assert isinstance(body, bytes) credentials = await self.get_credentials(from_jid) - sender_domain = credentials["user_email"].split("@", 1)[-1] + sender_domain = credentials.user_email.split("@", 1)[-1] recipients = [] if to_email is not None: recipients.append(to_email.encode()) @@ -422,14 +428,14 @@ raise exceptions.InternalError("No recipient found.") await smtp.sendmail( - credentials["smtp_host"].encode(), - credentials["user_email"].encode(), + credentials.smtp_host.encode(), + credentials.user_email.encode(), recipients, body, senderDomainName=sender_domain, - port=int(credentials["smtp_port"]), - username=credentials["smtp_username"].encode(), - password=credentials["smtp_password"].encode(), + port=int(credentials.smtp_port), + username=credentials.smtp_username.encode(), + password=credentials.smtp_password.encode(), requireAuthentication=True, # TODO: only STARTTLS is supported right now, implicit TLS should be supported # too. @@ -468,16 +474,16 @@ if isinstance(body, bytes): assert to_email is not None - sender_domain = credentials["user_email"].split("@", 1)[-1] + sender_domain = credentials.user_email.split("@", 1)[-1] await smtp.sendmail( - credentials["smtp_host"].encode(), - credentials["user_email"].encode(), + credentials.smtp_host.encode(), + credentials.user_email.encode(), [to_email.encode()], body, senderDomainName=sender_domain, - port=int(credentials["smtp_port"]), - username=credentials["smtp_username"].encode(), - password=credentials["smtp_password"].encode(), + port=int(credentials.smtp_port), + username=credentials.smtp_username.encode(), + password=credentials.smtp_password.encode(), requireAuthentication=True, # TODO: only STARTTLS is supported right now, implicit TLS should be supported # too. @@ -489,7 +495,7 @@ if subject is not None: msg["Subject"] = subject msg["From"] = formataddr( - (credentials["user_name"] or None, credentials["user_email"]) + (credentials.user_name or None, credentials.user_email) ) if extra.addresses: assert extra.addresses.to @@ -527,7 +533,7 @@ assert to_email is not None msg["To"] = to_email - sender_domain = credentials["user_email"].split("@", 1)[-1] + sender_domain = credentials.user_email.split("@", 1)[-1] if extra.headers: if extra.headers.keywords: @@ -543,14 +549,14 @@ msg["Autocrypt"] = extra.headers.autocrypt await smtp.sendmail( - credentials["smtp_host"].encode(), - credentials["user_email"].encode(), + credentials.smtp_host.encode(), + credentials.user_email.encode(), [to_email.encode()], msg.as_bytes(), senderDomainName=sender_domain, - port=int(credentials["smtp_port"]), - username=credentials["smtp_username"].encode(), - password=credentials["smtp_password"].encode(), + port=int(credentials.smtp_port), + username=credentials.smtp_username.encode(), + password=credentials.smtp_password.encode(), requireAuthentication=True, # TODO: only STARTTLS is supported right now, implicit TLS should be supported # too. @@ -798,10 +804,10 @@ @param email: Parsed email. """ assert self.client is not None - user_email = user_data.credentials["user_email"] - name, email_addr = parseaddr(email["from"]) - email_addr = email_addr.lower() - from_jid = jid.JID(None, (self._e.escape(email_addr), self.client.jid.host, None)) + user_email = user_data.credentials.user_email + author_name, author_email = parseaddr(email["from"]) + author_email = author_email.lower() + from_jid = jid.JID(None, (self._e.escape(author_email), self.client.jid.host, None)) # Get the email body body_mime = email.get_body(("plain",)) @@ -854,9 +860,9 @@ ] # Set noreply flag - # The is no flag to indicate a no-reply message, so we check common user parts in - # from and reply-to headers. - from_addresses = [email_addr] + # There is no flag to indicate a no-reply message, so we check common user parts + # in from and reply-to headers. + from_addresses = [author_email] if reply_to_addresses: from_addresses.extend( addr for a in reply_to_addresses if (addr := parseaddr(a)[1]) @@ -927,12 +933,382 @@ await self.handle_attachment(part, user_jid) client = self.client.get_virtual_client(from_jid) + # Now that the message is parsed, we check if it's a mailing list. + list_ids = email.get_all("list-id") + if list_ids: + try: + await self.handle_mailing_list( + client, + user_data, + author_name, + author_email, + list_ids[-1], + email, + user_jid, + body, + subject, + extra + ) + except exceptions.DataError as e: + log.warning(f"Can't parse mailing list email: {e}.") + except Exception: + log.exception("Can't parse mailing list email.") + else: + await client.sendMessage( + user_jid, + {"": body}, + {"": subject} if subject else None, + extra=extra, + ) + def parse_references(self, email: EmailMessage) -> list[str]: + """Extract message IDs from the "References" header. - await client.sendMessage( - user_jid, - {"": body}, - {"": subject} if subject else None, - extra=extra, + @param email: The parsed email message. + @returns: Message IDs + """ + references = [] + for header in email.get_all('references', []): + for token in header.split(): + _, address = parseaddr(token) + if address: + references.append(address) + return references + + def email_to_mb_data( + self, + email: EmailMessage, + service: jid.JID, + node: str, + item_id: str, + author_name: str, + author_email: str + ) -> MbData: + """Convert an email to blog data. + + @param email: email to convert. + @return: Blog data. + """ + tags = set() + + # Title + title = email.get('subject', '') + def strip_tags(m): + value = m.group()[1:-1].strip().lower() + if value: + tags.add(value) + return "" + title = re.sub(r"\[.+?\]", strip_tags, title).strip() + + # Dates + date_header = email.get('Date') + if date_header: + try: + published = date_utils.date_parse(date_header) + except ValueError: + published = None + else: + published = None + + # Body + content = None + content_xhtml = None + + if email.is_multipart(): + # We'll collect possible plain and html content parts + plain_parts = [] + html_parts = [] + + for part in email.walk(): + content_type = part.get_content_type() + content_disposition = part.get('Content-Disposition', '').lower() + + # Skip attachments + if 'attachment' in content_disposition: + continue + + payload = part.get_payload(decode=True) + if payload is None: + continue + payload = cast(bytes, payload) + + charset = part.get_content_charset() or 'utf-8' + try: + payload_text = payload.decode(charset, errors='replace') + except (LookupError, TypeError): + payload_text = payload.decode('utf-8', errors='replace') + + if content_type == 'text/plain': + plain_parts.append(payload_text) + elif content_type == 'text/html': + html_parts.append(payload_text) + + # Prefer first valid parts for safety + if plain_parts: + content = plain_parts[0] + if html_parts: + content_xhtml = html_parts[0] + else: + # Single part email + payload = email.get_payload(decode=True) + if payload is not None: + payload = cast(bytes, payload) + charset = email.get_content_charset() or 'utf-8' + + try: + content_text = payload.decode(charset, errors='replace') + except (LookupError, TypeError): + content_text = payload.decode('utf-8', errors='replace') + + content_type = email.get_content_type() + if content_type == 'text/plain': + content = content_text + elif content_type == 'text/html': + content_xhtml = content_text + + if content_xhtml is not None: + content_xhtml = self._syntax.clean_xhtml(content_xhtml) + + return MbData( + service=service, + node=node, + id=item_id, + published=published, + title=title, + content=content, + content_xhtml=content_xhtml, + author=author_name, + author_email=author_email, + tags=list(tags) + ) + + async def mb_data_to_email( + self, + credentials: Credentials, + mb_data: MbData + ) -> EmailMessage: + """Convert blog data to an email message. + + @param mb_data: Blog data to convert. + @return: Email message. + """ + email = EmailMessage() + + # Title + title_parts = [] + tags = [] + + for tag in mb_data.tags: + if stripped_tag := tag.strip(): + title_parts.append(f"[{stripped_tag}]") + tags.append(stripped_tag) + + if mb_data.title: + title_parts.append(mb_data.title) + + email['Subject'] = ' '.join(title_parts) + + if tags: + email['Keywords'] = ', '.join(tags) + + # From + # FIXME: Check email according to sender. + if mb_data.author and mb_data.author_email: + email['From'] = f"{mb_data.author} <{mb_data.author_email}>" + elif mb_data.author: + email['From'] = f"{mb_data.author} <{credentials.user_email}>" + elif mb_data.author_email: + email['From'] = mb_data.author_email + else: + email['From'] = credentials.user_email + + # Message ID + if mb_data.id: + email['Message-ID'] = f"<{mb_data.id}>" + else: + msg_id = make_msgid() + email['Message-ID'] = msg_id + mb_data.id = msg_id[1:-1] + + # In-Reply-To + if mb_data.in_reply_tos: + # Use the first reply-to reference + in_reply_to = mb_data.in_reply_tos[0] + if in_reply_to.ref: + email['In-Reply-To'] = f"<{in_reply_to.ref}>" + + # Dates + date_set = False + if mb_data.published is not None: + try: + email['Date'] = formatdate(mb_data.published, localtime=True) + date_set = True + except Exception: + pass + + if not date_set and mb_data.updated is not None: + try: + email['Date'] = formatdate(mb_data.updated, localtime=True) + except Exception: + pass + + # Content + if mb_data.content_xhtml: + # XHTML content + xhtml_content = mb_data.content_xhtml + if not mb_data.content: + mb_data.content = await self._syntax.convert( + xhtml_content, + self._syntax.SYNTAX_XHTML, + self._syntax.SYNTAX_TEXT, + False, + ) + + email.set_content(mb_data.content) + email.add_alternative(xhtml_content, subtype='html') + elif mb_data.content: + email.set_content(mb_data.content) + else: + email.set_content("") + + return email + + async def handle_mailing_list( + self, + client: SatXMPPEntity, + user_data: UserData, + author_name: str, + author_email: str, + list_id_header: str, + email: EmailMessage, + user_jid: jid.JID, + body: str, + subject: str|None, + extra: dict + ) -> None: + """Handle emails from mailing lists. + + Mailing list emails are converted to pubsub blogs. + """ + assert self.client is not None + pubsub_service = self.client.jid + message_id_list = email.get_all("message-id") + if not message_id_list: + raise exceptions.DataError("Missing message ID.") + message_id = message_id_list[-1].strip() + if message_id.startswith("<") and message_id.endswith(">"): + message_id = message_id[1:-1] + if not message_id: + raise exceptions.DataError("Emtpy message ID.") + + list_name, list_id = parseaddr(list_id_header, strict=False) + list_name = list_name.strip() + list_id = list_id.strip() + if not list_id: + raise exceptions.DataError( + f"Mailing list ID is empty, we can't parse id: {list_id_header=}." + ) + root_node = await G.storage.get_pubsub_node( + client, + pubsub_service, + list_id, + with_subscriptions=True, + create=True, + create_kwargs={ + "access_model": AccessModel.whitelist, + "publish_model": PublishModel.publishers, + "affiliations": [PubsubAffiliation( + entity = user_jid, + affiliation = Affiliation.owner + )], + "subscribed": True + }, + ) + assert root_node is not None + + in_reply_to_value = ''.join(email.get_all("in-reply-to", [])) + __, in_reply_to = parseaddr(in_reply_to_value) + references = self.parse_references(email) + if in_reply_to: + if not references: + log.warning( + '"References" header should not be empty when "In-Reply-To" is set.' + ) + references = [in_reply_to] + else: + if references[-1] != in_reply_to: + log.warning('Last ID in "References" should be "In-Reply-To".') + references.append(in_reply_to) + if references: + # We check that the top message of the thread has a corresponding item. + top_item_id = references[0] + parent_node_name = self._mb.get_comments_node(top_item_id) + parent_node_uri = uri.build_xmpp_uri( + "pubsub", + path=root_node.service.full(), + node=parent_node_name, + ) + top_items = await G.storage.get_items(root_node, item_ids=[top_item_id]) + if not top_items: + # The top item is missing, we make an empty one. + empty_item_data = MbData( + service=pubsub_service, + node=root_node.name, + id=top_item_id, + title="missing item", + content="missing item", + author_jid=pubsub_service, + comments=[ + Comment( + uri=parent_node_uri, + service=pubsub_service, + node=parent_node_name + ) + ] + ) + await G.storage.cache_pubsub_items( + client, + root_node, + [await empty_item_data.to_element(client)] + ) + parent_node = await G.storage.get_pubsub_node( + client, + pubsub_service, + parent_node_name, + with_subscriptions=True, + create=True, + create_kwargs={ + "access_model": AccessModel.whitelist, + "publish_model": PublishModel.publishers, + "subscribed": True, + "affiliations": [PubsubAffiliation( + entity = user_jid, + affiliation = Affiliation.owner + )], + }, + ) + else: + parent_node = root_node + parent_node_name = cast(str, root_node.name) + + mb_data = self.email_to_mb_data( + email, + pubsub_service, + parent_node_name, + message_id, + author_name, + author_email + ) + mb_data.comments.append( + Comment( + service = pubsub_service, + node = self._mb.get_comments_node(message_id), + ) + ) + await G.storage.cache_pubsub_items( + client, + parent_node, + [await mb_data.to_element(client)], + [mb_data.model_dump(mode="json")] ) async def handle_attachment(self, part: EmailMessage, recipient_jid: jid.JID) -> None: @@ -1009,7 +1385,7 @@ connected, ) reactor.connectTCP( - credentials["imap_host"], int(credentials["imap_port"]), factory + credentials.imap_host, int(credentials.imap_port), factory ) await connected @@ -1051,7 +1427,9 @@ return True self.validate_imap_smtp_form(submit_form) - credentials = {key: field.value for key, field in submit_form.fields.items()} + credentials = Credentials(**{ + key: field.value for key, field in submit_form.fields.items() + }) user_data = self.users_data.get(from_jid) if user_data is None: # The user is not in cache, we cache current credentials. @@ -1064,12 +1442,12 @@ await self.connect_imap(from_jid, user_data) except Exception as e: log.warning(f"Can't connect to IMAP server for {from_jid}") - credentials["imap_success"] = False + credentials.imap_success = False await self.storage.aset(key, credentials) raise e else: log.debug(f"Connection successful to IMAP server for {from_jid}") - credentials["imap_success"] = True + credentials.imap_success = True await self.storage.aset(key, credentials) return True @@ -1082,7 +1460,7 @@ ) -> None: from_jid = jid.JID(iq_elt["from"]).userhostJID() credentials = await self.get_credentials(from_jid) - form.addField(data_form.Field(var="sender_id", value=credentials["user_email"])) + form.addField(data_form.Field(var="sender_id", value=credentials.user_email))
--- a/libervia/backend/plugins/plugin_comp_email_gateway/imap.py Sun Aug 03 23:45:45 2025 +0200 +++ b/libervia/backend/plugins/plugin_comp_email_gateway/imap.py Sun Aug 03 23:45:48 2025 +0200 @@ -225,8 +225,8 @@ """ credentials = user_data.credentials self.user_data = user_data - self.username = credentials["imap_username"] - self.password = credentials["imap_password"] + self.username = credentials.imap_username + self.password = credentials.imap_password self.on_new_email = on_new_email self._connected = connected
--- a/libervia/backend/plugins/plugin_comp_email_gateway/models.py Sun Aug 03 23:45:45 2025 +0200 +++ b/libervia/backend/plugins/plugin_comp_email_gateway/models.py Sun Aug 03 23:45:48 2025 +0200 @@ -17,11 +17,24 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. from dataclasses import dataclass -from typing import Any +from pydantic import BaseModel, ConfigDict from twisted.mail import imap4 -Credentials = dict[str, Any] +class Credentials(BaseModel): + model_config = ConfigDict(extra='forbid') + user_name: str + user_email: str + smtp_host: str + smtp_port: int + smtp_username: str + smtp_password: str + imap_success: bool = False + imap_host: str + imap_port: int + imap_username: str + imap_password: str + @dataclass
--- a/libervia/backend/plugins/plugin_comp_email_gateway/pubsub_service.py Sun Aug 03 23:45:45 2025 +0200 +++ b/libervia/backend/plugins/plugin_comp_email_gateway/pubsub_service.py Sun Aug 03 23:45:48 2025 +0200 @@ -16,18 +16,22 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, cast + from twisted.internet import defer -from twisted.words.protocols.jabber import jid, error +from twisted.words.protocols.jabber import error, jid from twisted.words.xish import domish from wokkel import data_form, disco, pubsub, rsm +from libervia.backend import G +from libervia.backend.core import exceptions +from libervia.backend.core.core_types import SatXMPPComponent from libervia.backend.core.i18n import _ -from libervia.backend.core.constants import Const as C from libervia.backend.core.log import getLogger +from libervia.backend.memory.sqla_mapping import AccessModel, Affiliation +from libervia.backend.plugins.plugin_pubsub_cache import PubsubCache from libervia.backend.plugins.plugin_xep_0498 import NodeData from libervia.backend.tools.utils import ensure_deferred - if TYPE_CHECKING: from . import EmailGatewayComponent @@ -57,31 +61,106 @@ self.host = self.gateway.host self.service = service self._pfs = service._pfs + self._ps_cache = cast(PubsubCache, G.host.plugins["PUBSUB_CACHE"]) super().__init__() + @property + def client(self) -> SatXMPPComponent: + client = self.gateway.client + assert client is not None + return client + def getNodes( self, requestor: jid.JID, service: jid.JID, nodeIdentifier: str ) -> defer.Deferred[list[str]]: - return defer.succeed([self._pfs.namespace]) + return defer.ensureDeferred(self.get_nodes(requestor, service, nodeIdentifier)) + + async def get_nodes( + self, + requestor: jid.JID, + service: jid.JID, + node_id: str + ) -> list[str]: + nodes = await G.storage.get_pubsub_nodes(self.client, self.client.jid) + return [self._pfs.namespace] + [cast(str, node.name) for node in nodes] @ensure_deferred async def items( self, request: rsm.PubSubRequest, ) -> tuple[list[domish.Element], rsm.RSMResponse | None]: - client = self.gateway.client - assert client is not None - sender = request.sender.userhostJID() - if not client.is_local(sender): + client = self.client + requestor_jid = request.sender.userhostJID() + if not client.is_local(requestor_jid): raise error.StanzaError("forbidden") if request.nodeIdentifier != self._pfs.namespace: - return [], None + return await self.items_from_mailing_list(request, requestor_jid) - files = await self.host.memory.get_files(client, sender) + files = await self.host.memory.get_files(client, requestor_jid) node_data = NodeData.from_files_data(client.jid, files) return node_data.to_elements(), None + async def items_from_mailing_list( + self, + request: rsm.PubSubRequest, + requestor_jid: jid.JID + ) -> tuple[list[domish.Element], rsm.RSMResponse|None]: + """Handle items coming from mailing lists. + + @param request: Pubsub request. + @param requestor_jid: Bare jid of the requestor. + @return: Items matching request, if allowed, and RSM response. + @raise error.StanzaError: One of: + - ``item-not-found`` if no corresponding node or item if found + - ``forbidden`` if the requestor does not have sufficient privileges + """ + node = request.nodeIdentifier + node = await G.storage.get_pubsub_node( + self.client, + self.client.jid, + node, + with_affiliations=True + ) + if node is None: + raise error.StanzaError("item-not-found") + + match str(node.access_model): + case AccessModel.open: + pass + case AccessModel.whitelist: + for affiliation in node.affiliations: + if ( + affiliation.entity == requestor_jid + and affiliation.affiliation in { + Affiliation.owner, Affiliation.publisher, Affiliation.member + } + ): + break + else: + raise error.StanzaError("forbidden") + case _: + raise exceptions.InternalError( + f"Unmanaged access model: {node.access_model}" + ) + + pubsub_items, metadata = await self._ps_cache.get_items_from_cache( + self.client, + node, + request.maxItems, + request.itemIdentifiers, + request.subscriptionIdentifier, + request.rsm + ) + if rsm_data := metadata.get("rsm"): + rsm_response = rsm.RSMResponse(**rsm_data) + else: + rsm_response = None + return ( + [cast(domish.Element, ps_item.data) for ps_item in pubsub_items], + rsm_response + ) + @ensure_deferred async def retract(self, request: rsm.PubSubRequest) -> None: client = self.gateway.client