view libervia/backend/plugins/plugin_xep_0372.py @ 4303:a7ec325246fb

component email-gateway: first draft: Initial implementation of the Email Gateway. This component uses XEP-0100 for registration. Upon registration and subsequent startups, a connection is made to registered IMAP services, and incoming emails (in `INBOX` mailboxes) are immediately forwarded as XMPP messages. In the opposite direction, an SMTP connection is established to send emails on incoming XMPP messages. rel 449
author Goffi <goffi@goffi.org>
date Fri, 06 Sep 2024 18:07:17 +0200
parents 0d7bb4df2343
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia plugin for XEP-0372
# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import Optional, Dict, Union
from textwrap import dedent
from libervia.backend.core import exceptions
from libervia.backend.tools.common import uri as xmpp_uri

from twisted.internet import defer
from twisted.words.protocols.jabber import jid
from twisted.words.protocols.jabber.xmlstream import XMPPHandler
from twisted.words.xish import domish
from zope.interface import implementer
from wokkel import disco, iwokkel

from libervia.backend.core.constants import Const as C
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.tools.common import data_format


# Module-level logger, named after this module.
log = getLogger(__name__)

# Plugin metadata. It declares the plugin name and import name, its type and modes,
# the protocol it implements, the plugins it depends on, the name of the main class
# and a human readable (translatable) description.
# "XEP-0334" is a hard dependency: XEP_0372.__init__ reads host.plugins["XEP-0334"],
# and it is used to add a hint element to the messages sent by this plugin.
PLUGIN_INFO = {
    C.PI_NAME: "References",
    C.PI_IMPORT_NAME: "XEP-0372",
    C.PI_TYPE: C.PLUG_TYPE_XEP,
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0372"],
    C.PI_DEPENDENCIES: ["XEP-0334"],
    C.PI_MAIN: "XEP_0372",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _(
        dedent(
            """\
        XEP-0372 (References) implementation

        This plugin implement generic references and mentions.
    """
        )
    ),
}

# XML namespace of the <reference> element; also advertised as a disco feature.
NS_REFS = "urn:xmpp:reference:0"
# Values accepted for the "type" attribute when building a <reference> element
# (any other value makes XEP_0372.build_ref_element raise ValueError).
ALLOWED_TYPES = ("mention", "data")


class XEP_0372:
    """XEP-0372 (References) plugin.

    On reception, the first <reference> element of a message is parsed and exposed to
    other plugins through the "XEP-0372_ref_received" trigger point.
    For sending, <reference> elements can be built with [build_ref_element] and sent
    with [send_reference] (also exposed on the bridge as "reference_send").
    """

    namespace = NS_REFS

    def __init__(self, host):
        log.info(_("References plugin initialization"))
        host.register_namespace("refs", NS_REFS)
        self.host = host
        # XEP-0334 plugin, used to add a hint element to the messages we send
        self._h = host.plugins["XEP-0334"]
        host.trigger.add("message_received", self._message_received_trigger)
        host.bridge.add_method(
            "reference_send",
            ".plugin",
            in_sign="sssss",
            out_sign="",
            method=self._send_reference,
            async_=False,
        )

    def get_handler(self, client):
        """Return the handler advertising the references namespace"""
        return XEP_0372_Handler()

    def ref_element_to_ref_data(
        self, reference_elt: domish.Element
    ) -> Dict[str, Union[str, int, dict]]:
        """Convert a <reference> element to a dictionary

        @param reference_elt: <reference> element to parse
        @return: reference data, with the following keys:
            - uri: value of the "uri" attribute
            - type: value of the "type" attribute
            - parsed_uri: result of parse_xmpp_uri on "uri" (only set if "uri" starts
              with "xmpp:")
            - begin, end: integer value of the attribute of the same name (only set
              if the attribute is present and is a valid integer)
            - anchor: value of the "anchor" attribute (only set if present)
            - parsed_anchor: result of parse_xmpp_uri on "anchor" (only set if
              "anchor" starts with "xmpp:")
        @raise KeyError: "uri" or "type" attribute is missing
        """
        uri = reference_elt["uri"]
        ref_data: Dict[str, Union[str, int, dict]] = {
            "uri": uri,
            "type": reference_elt["type"],
        }

        if uri.startswith("xmpp:"):
            ref_data["parsed_uri"] = xmpp_uri.parse_xmpp_uri(uri)

        # "begin" and "end" are optional: a missing or non integer value is ignored
        for attr in ("begin", "end"):
            try:
                ref_data[attr] = int(reference_elt[attr])
            except (KeyError, ValueError, TypeError):
                continue

        anchor = reference_elt.getAttribute("anchor")
        if anchor is not None:
            ref_data["anchor"] = anchor
            if anchor.startswith("xmpp:"):
                ref_data["parsed_anchor"] = xmpp_uri.parse_xmpp_uri(anchor)
        return ref_data

    async def _message_received_trigger(
        self,
        client: SatXMPPEntity,
        message_elt: domish.Element,
        post_treat: defer.Deferred,
    ) -> bool:
        """Check if a reference is in the message, and notify other plugins about it

        Only the first <reference> child of the message is handled.

        @param client: client which has received the message
        @param message_elt: received <message> element
        @param post_treat: not used by this trigger
        @return: False if the "XEP-0372_ref_received" trigger point returned a falsy
            value, True in all other cases (no reference, invalid reference, or
            trigger point returned a truthy value)
        """
        reference_elt = next(message_elt.elements(NS_REFS, "reference"), None)
        if reference_elt is None:
            return True
        try:
            ref_data = self.ref_element_to_ref_data(reference_elt)
        except KeyError:
            # a mandatory attribute ("uri" or "type") is missing
            log.warning(f"invalid <reference> element: {reference_elt.toXml()}")
            return True

        if not await self.host.trigger.async_point(
            "XEP-0372_ref_received", client, message_elt, ref_data
        ):
            return False
        return True

    def build_ref_element(
        self,
        uri: str,
        type_: str = "mention",
        begin: Optional[int] = None,
        end: Optional[int] = None,
        anchor: Optional[str] = None,
    ) -> domish.Element:
        """Build and return the <reference> element

        @param uri: URI of the referenced object
        @param type_: type of reference
            one of [ALLOWED_TYPES]
        @param begin: optional begin index
        @param end: optional end index
        @param anchor: optional URI of the object where the reference appears
        @return: the new <reference> element
        @raise ValueError: type_ is not one of [ALLOWED_TYPES]
        """
        if type_ not in ALLOWED_TYPES:
            raise ValueError(f"Unknown type: {type_!r}")
        reference_elt = domish.Element(
            (NS_REFS, "reference"), attribs={"uri": uri, "type": type_}
        )
        if begin is not None:
            reference_elt["begin"] = str(begin)
        if end is not None:
            reference_elt["end"] = str(end)
        if anchor is not None:
            reference_elt["anchor"] = anchor
        return reference_elt

    def _send_reference(
        self, recipient: str, anchor: str, type_: str, extra_s: str, profile_key: str
    ) -> None:
        """Bridge method for [send_reference]

        @param recipient: JID of the entity to send the reference to
        @param anchor: URI of the object where the reference appears
            empty string if there is none
        @param type_: type of reference
            one of [ALLOWED_TYPES]
        @param extra_s: serialised extra data, the optional "uri" key is the URI of
            the referenced object
        @param profile_key: key of the profile to use to send the reference
        """
        recipient_jid = jid.JID(recipient)
        client = self.host.get_client(profile_key)
        extra: dict = data_format.deserialise(extra_s, default={})
        self.send_reference(
            client,
            uri=extra.get("uri"),
            type_=type_,
            # bridge strings can't be None: an empty anchor means "no anchor", and
            # must not end up as an empty "anchor" attribute
            anchor=anchor or None,
            to_jid=recipient_jid,
        )

    def send_reference(
        self,
        client: "SatXMPPEntity",
        uri: Optional[str] = None,
        type_: str = "mention",
        begin: Optional[int] = None,
        end: Optional[int] = None,
        anchor: Optional[str] = None,
        message_elt: Optional[domish.Element] = None,
        to_jid: Optional[jid.JID] = None,
    ) -> None:
        """Build and send a reference_elt

        @param client: client used to send the message
        @param uri: URI pointing to referenced object (XMPP entity, Pubsub Item, etc)
            if not set, "to_jid" will be used to build an URI to the entity
        @param type_: type of reference
            one of [ALLOWED_TYPES]
        @param begin: optional begin index
        @param end: optional end index
        @param anchor: URI of referring object (message, pubsub item), when the
            reference is not already in the wrapping message element. In other words,
            it's the object where the reference appears.
        @param message_elt: wrapping <message> element, if not set a new one will be
            generated
        @param to_jid: recipient of the reference. If not specified, "to" attribute of
            message_elt will be used.
        @raise exceptions.InternalError: neither "uri" nor "to_jid" is set, or
            "to_jid" is not set and message_elt has no valid "to" attribute
        @raise ValueError: type_ is not one of [ALLOWED_TYPES]
        """
        if uri is None:
            if to_jid is None:
                raise exceptions.InternalError('"to_jid" must be set if "uri is None"')
            uri = xmpp_uri.build_xmpp_uri(path=to_jid.full())
        if message_elt is None:
            message_elt = domish.Element((None, "message"))

        if to_jid is not None:
            message_elt["to"] = to_jid.full()
        else:
            # no explicit recipient: the "to" attribute of the given element is kept
            # as is, we only check that it is present and is a valid JID.
            try:
                jid.JID(message_elt["to"])
            except (KeyError, RuntimeError, jid.InvalidFormat) as e:
                raise exceptions.InternalError(
                    'invalid "to" attribute in given message element: '
                    f"{message_elt.toXml()}"
                ) from e

        message_elt.addChild(self.build_ref_element(uri, type_, begin, end, anchor))
        self._h.add_hint_elements(message_elt, [self._h.HINT_STORE])
        client.send(message_elt)


@implementer(iwokkel.IDisco)
class XEP_0372_Handler(XMPPHandler):
    """Service discovery handler advertising the references namespace"""

    def getDiscoInfo(self, requestor, service, nodeIdentifier=""):
        """Return the disco info of this plugin: a single NS_REFS feature"""
        features = [disco.DiscoFeature(NS_REFS)]
        return features

    def getDiscoItems(self, requestor, service, nodeIdentifier=""):
        """Return the disco items of this plugin: always an empty list"""
        items = []
        return items