view sat/plugins/plugin_xep_0372.py @ 3833:381340b9a9ee

component AP gateway: convert XMPP mentions to AP: When a XEP-0372 mention is received, the linked pubsub item is looked after in cache, and if found, it is send to mentioned entity with `mention` tag added. However, this doesn't work in some cases (see incoming doc for details). To work around that, `@user@server.tld` type mention are also scanned in body, and mentions are added when found (this can be disabled with `auto_mentions` setting). Mention are only scanned in "public" messages, i.e. for pubsub items, and not direct messages. rel 369
author Goffi <goffi@goffi.org>
date Sun, 10 Jul 2022 16:15:06 +0200
parents 68a11b95a7d3
children 524856bd7b19
line wrap: on
line source

#!/usr/bin/env python3

# Libervia plugin for XEP-0372
# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import Optional, Dict, Union
from textwrap import dedent
from sat.core import exceptions
from sat.tools.common import uri as xmpp_uri

from twisted.internet import defer
from twisted.words.protocols.jabber import jid
from twisted.words.protocols.jabber.xmlstream import XMPPHandler
from twisted.words.xish import domish
from zope.interface import implementer
from wokkel import disco, iwokkel

from sat.core.constants import Const as C
from sat.core.i18n import _
from sat.core.log import getLogger
from sat.core.core_types import SatXMPPEntity
from sat.tools.common import data_format


log = getLogger(__name__)

PLUGIN_INFO = {
    C.PI_NAME: "References",
    C.PI_IMPORT_NAME: "XEP-0372",
    C.PI_TYPE: C.PLUG_TYPE_XEP,
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0372"],
    C.PI_DEPENDENCIES: ["XEP-0334"],
    C.PI_MAIN: "XEP_0372",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _(dedent("""\
        XEP-0372 (References) implementation

        This plugin implement generic references and mentions.
    """)),
}

NS_REFS = "urn:xmpp:reference:0"
ALLOWED_TYPES = ("mention", "data")


class XEP_0372:
    namespace = NS_REFS

    def __init__(self, host):
        log.info(_("References plugin initialization"))
        host.registerNamespace("refs", NS_REFS)
        self.host = host
        self._h = host.plugins["XEP-0334"]
        host.trigger.add("messageReceived", self._messageReceivedTrigger)
        host.bridge.addMethod(
            "referenceSend",
            ".plugin",
            in_sign="sssss",
            out_sign="",
            method=self._sendReference,
            async_=False,
        )

    def getHandler(self, client):
        return XEP_0372_Handler()

    def refElementToRefData(
        self,
        reference_elt: domish.Element
    ) -> Dict[str, Union[str, int, dict]]:
        ref_data: Dict[str, Union[str, int, dict]] = {
            "uri": reference_elt["uri"],
            "type": reference_elt["type"]
        }

        if ref_data["uri"].startswith("xmpp:"):
            ref_data["parsed_uri"] = xmpp_uri.parseXMPPUri(ref_data["uri"])

        for attr in ("begin", "end"):
            try:
                ref_data[attr] = int(reference_elt[attr])
            except (KeyError, ValueError, TypeError):
                continue

        anchor = reference_elt.getAttribute("anchor")
        if anchor is not None:
            ref_data["anchor"] = anchor
            if anchor.startswith("xmpp:"):
                ref_data["parsed_anchor"] = xmpp_uri.parseXMPPUri(anchor)
        return ref_data

    async def _messageReceivedTrigger(
        self,
        client: SatXMPPEntity,
        message_elt: domish.Element,
        post_treat: defer.Deferred
    ) -> bool:
        """Check if a direct invitation is in the message, and handle it"""
        reference_elt = next(message_elt.elements(NS_REFS, "reference"), None)
        if reference_elt is None:
            return True
        try:
            ref_data = self.refElementToRefData(reference_elt)
        except KeyError:
            log.warning("invalid <reference> element: {reference_elt.toXml}")
            return True

        if not await self.host.trigger.asyncPoint(
            "XEP-0372_ref_received", client, message_elt, ref_data
        ):
            return False
        return True

    def buildRefElement(
        self,
        uri: str,
        type_: str = "mention",
        begin: Optional[int] = None,
        end: Optional[int] = None,
        anchor: Optional[str] = None,
    ) -> domish.Element:
        """Build and return the <reference> element"""
        if type_ not in ALLOWED_TYPES:
            raise ValueError(f"Unknown type: {type_!r}")
        reference_elt = domish.Element(
            (NS_REFS, "reference"),
            attribs={"uri": uri, "type": type_}
        )
        if begin is not None:
            reference_elt["begin"] = str(begin)
        if end is not None:
            reference_elt["end"] = str(end)
        if anchor is not None:
            reference_elt["anchor"] = anchor
        return reference_elt

    def _sendReference(
        self,
        recipient: str,
        anchor: str,
        type_: str,
        extra_s: str,
        profile_key: str
    ) -> defer.Deferred:
        recipient_jid = jid.JID(recipient)
        client = self.host.getClient(profile_key)
        extra: dict = data_format.deserialise(extra_s, default={})
        self.sendReference(
            client,
            uri=extra.get("uri"),
            type_=type_,
            anchor=anchor,
            to_jid=recipient_jid
        )

    def sendReference(
        self,
        client: "SatXMPPEntity",
        uri: Optional[str] = None,
        type_: str = "mention",
        begin: Optional[int] = None,
        end: Optional[int] = None,
        anchor: Optional[str] = None,
        message_elt: Optional[domish.Element] = None,
        to_jid: Optional[jid.JID] = None
    ) -> None:
        """Build and send a reference_elt

        @param uri: URI pointing to referenced object (XMPP entity, Pubsub Item, etc)
            if not set, "to_jid" will be used to build an URI to the entity
        @param type_: type of reference
            one of [ALLOWED_TYPES]
        @param begin: optional begin index
        @param end: optional end index
        @param anchor: URI of refering object (message, pubsub item), when the refence
            is not already in the wrapping message element. In other words, it's the
            object where the reference appears.
        @param message_elt: wrapping <message> element, if not set a new one will be
            generated
        @param to_jid: destinee of the reference. If not specified, "to" attribute of
            message_elt will be used.

        """
        if uri is None:
            if to_jid is None:
                raise exceptions.InternalError(
                    '"to_jid" must be set if "uri is None"'
                )
            uri = xmpp_uri.buildXMPPUri(path=to_jid.full())
        if message_elt is None:
            message_elt = domish.Element((None, "message"))

        if to_jid is not None:
            message_elt["to"] = to_jid.full()
        else:
            try:
                to_jid = jid.JID(message_elt["to"])
            except (KeyError, RuntimeError):
                raise exceptions.InternalError(
                    'invalid "to" attribute in given message element: '
                    '{message_elt.toXml()}'
                )

        message_elt.addChild(self.buildRefElement(uri, type_, begin, end, anchor))
        self._h.addHintElements(message_elt, [self._h.HINT_STORE])
        client.send(message_elt)


@implementer(iwokkel.IDisco)
class XEP_0372_Handler(XMPPHandler):

    def getDiscoInfo(self, requestor, service, nodeIdentifier=""):
        return [disco.DiscoFeature(NS_REFS)]

    def getDiscoItems(self, requestor, service, nodeIdentifier=""):
        return []