view libervia/backend/plugins/plugin_comp_ap_gateway/pubsub_service.py @ 4314:6a70fcd93a7a

plugin XEP-0131: Stanza Headers and Internet Metadata implementation: - SHIM is now supported and put in `msg_data["extra"]["headers"]`. - `Keywords` are converted from and to list of string in `msg_data["extra"]["keywords"]` field (if present in headers on message sending, values are merged). - Python minimal version upgraded to 3.11 due to use of `StrEnum`. rel 451
author Goffi <goffi@goffi.org>
date Sat, 28 Sep 2024 15:56:04 +0200
parents 0d7bb4df2343
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia ActivityPub Gateway
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import Optional, Tuple, List, Dict, Any, Union
from urllib.parse import urlparse
from pathlib import Path
from base64 import b64encode
import tempfile

from twisted.internet import defer, threads
from twisted.words.protocols.jabber import jid, error
from twisted.words.xish import domish
from wokkel import rsm, pubsub, disco

from libervia.backend.core.i18n import _
from libervia.backend.core import exceptions
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.log import getLogger
from libervia.backend.core.constants import Const as C
from libervia.backend.tools import image
from libervia.backend.tools.utils import ensure_deferred
from libervia.backend.tools.web import download_file
from libervia.backend.memory.sqla_mapping import PubsubSub, SubscriptionState

from .constants import TYPE_ACTOR, ST_AVATAR, MAX_AVATAR_SIZE


log = getLogger(__name__)

# all nodes have the same config
NODE_CONFIG = [
    {"var": "pubsub#persist_items", "type": "boolean", "value": True},
    {"var": "pubsub#max_items", "value": "max"},
    {"var": "pubsub#access_model", "type": "list-single", "value": "open"},
    {"var": "pubsub#publish_model", "type": "list-single", "value": "open"},
]

NODE_CONFIG_VALUES = {c["var"]: c["value"] for c in NODE_CONFIG}
NODE_OPTIONS = {c["var"]: {} for c in NODE_CONFIG}
for c in NODE_CONFIG:
    NODE_OPTIONS[c["var"]].update(
        {k: v for k, v in c.items() if k not in ("var", "value")}
    )


class APPubsubService(rsm.PubSubService):
    """Pubsub service for XMPP requests"""

    def __init__(self, apg):
        super(APPubsubService, self).__init__()
        self.host = apg.host
        self.apg = apg
        self.discoIdentity = {
            "category": "pubsub",
            "type": "service",
            "name": "Libervia ActivityPub Gateway",
        }

    async def get_ap_actor_ids_and_inbox(
        self,
        requestor: jid.JID,
        recipient: jid.JID,
    ) -> Tuple[str, str, str]:
        """Get AP actor IDs from requestor and destinee JIDs

        @param requestor: XMPP entity doing a request to an AP actor via the gateway
        @param recipient: JID mapping an AP actor via the gateway
        @return: requestor actor ID, recipient actor ID and recipient inbox
        @raise error.StanzaError: "item-not-found" is raised if not user part is specified
            in requestor
        """
        if not recipient.user:
            raise error.StanzaError("item-not-found", text="No user part specified")
        requestor_actor_id = self.apg.build_apurl(TYPE_ACTOR, requestor.userhost())
        recipient_account = self.apg._e.unescape(recipient.user)
        recipient_actor_id = await self.apg.get_ap_actor_id_from_account(
            recipient_account
        )
        inbox = await self.apg.get_ap_inbox_from_id(recipient_actor_id, use_shared=False)
        return requestor_actor_id, recipient_actor_id, inbox

    @ensure_deferred
    async def publish(self, requestor, service, nodeIdentifier, items):
        if self.apg.local_only and not self.apg.is_local(requestor):
            raise error.StanzaError(
                "forbidden", "Only local users can publish on this gateway."
            )
        if not service.user:
            raise error.StanzaError(
                "bad-request",
                "You must specify an ActivityPub actor account in JID user part.",
            )
        ap_account = self.apg._e.unescape(service.user)
        if ap_account.count("@") != 1:
            raise error.StanzaError(
                "bad-request", f"{ap_account!r} is not a valid ActivityPub actor account."
            )

        client = self.apg.client.get_virtual_client(requestor)
        if self.apg._pa.is_attachment_node(nodeIdentifier):
            await self.apg.convert_and_post_attachments(
                client, ap_account, service, nodeIdentifier, items, publisher=requestor
            )
        else:
            await self.apg.convert_and_post_items(
                client, ap_account, service, nodeIdentifier, items
            )
            cached_node = await self.host.memory.storage.get_pubsub_node(
                client, service, nodeIdentifier, with_subscriptions=True, create=True
            )
            await self.host.memory.storage.cache_pubsub_items(client, cached_node, items)
            for subscription in cached_node.subscriptions:
                if subscription.state != SubscriptionState.SUBSCRIBED:
                    continue
                self.notifyPublish(
                    service, nodeIdentifier, [(subscription.subscriber, None, items)]
                )

    async def ap_following_2_elt(
        self, requestor_actor_id: str, ap_item: dict
    ) -> domish.Element:
        """Convert actor ID from following collection to XMPP item

        @param requestor_actor_id: ID of the actor doing the request.
        @param ap_item: AP item from which actor ID must be extracted.
        """
        actor_id = ap_item["id"]
        actor_jid = await self.apg.get_jid_from_id(requestor_actor_id, actor_id)
        subscription_elt = self.apg._pps.build_subscription_elt(
            self.apg._m.namespace, actor_jid
        )
        item_elt = pubsub.Item(id=actor_id, payload=subscription_elt)
        return item_elt

    async def ap_follower_2_elt(
        self, requestor_actor_id: str, ap_item: dict
    ) -> domish.Element:
        """Convert actor ID from followers collection to XMPP item

        @param requestor_actor_id: ID of the actor doing the request.
        @param ap_item: AP item from which actor ID must be extracted.
        """
        actor_id = ap_item["id"]
        actor_jid = await self.apg.get_jid_from_id(requestor_actor_id, actor_id)
        subscriber_elt = self.apg._pps.build_subscriber_elt(actor_jid)
        item_elt = pubsub.Item(id=actor_id, payload=subscriber_elt)
        return item_elt

    async def generate_v_card(
        self, requestor_actor_id: str, ap_account: str
    ) -> domish.Element:
        """Generate vCard4 (XEP-0292) item element from ap_account's metadata

        @param requestor_actor_id: ID of the actor doing the request.
        @param ap_account: AP account from where the vcard must be retrieved.
        @return: <item> with the <vcard> element
        """
        actor_data = await self.apg.get_ap_actor_data_from_account(
            requestor_actor_id, ap_account
        )
        identity_data = {}

        summary = actor_data.get("summary")
        # summary is HTML, we have to convert it to text
        if summary:
            identity_data["description"] = await self.apg._t.convert(
                summary,
                self.apg._t.SYNTAX_XHTML,
                self.apg._t.SYNTAX_TEXT,
                False,
            )

        for field in ("name", "preferredUsername"):
            value = actor_data.get(field)
            if value:
                identity_data.setdefault("nicknames", []).append(value)
        vcard_elt = self.apg._v.dict_2_v_card(identity_data)
        item_elt = domish.Element((pubsub.NS_PUBSUB, "item"))
        item_elt.addChild(vcard_elt)
        item_elt["id"] = self.apg._p.ID_SINGLETON
        return item_elt

    async def get_avatar_data(
        self, client: SatXMPPEntity, requestor_actor_id: str, ap_account: str
    ) -> dict[str, Any]:
        """Retrieve actor's avatar if any, cache it and file actor_data

        @param client: client to use for the request.
        @param requestor_actor_id: ID of the actor doing the request.
        @param ap_account: AP account from where the avatar data must be retrieved.
        @return: Avatar data.
            ``cache_uid``, `path``` and ``media_type`` keys are always filed.
            ``base64`` key is only filled if the file was not already in cache.
        """
        actor_data = await self.apg.get_ap_actor_data_from_account(ap_account)

        for icon in await self.apg.ap_get_list(requestor_actor_id, actor_data, "icon"):
            url = icon.get("url")
            if icon["type"] != "Image" or not url:
                continue
            parsed_url = urlparse(url)
            if not parsed_url.scheme in ("http", "https"):
                log.warning(f"unexpected URL scheme: {url!r}")
                continue
            filename = Path(parsed_url.path).name
            if not filename:
                log.warning(f"ignoring URL with invald path: {url!r}")
                continue
            break
        else:
            raise error.StanzaError("item-not-found")

        key = f"{ST_AVATAR}{url}"
        cache_uid = await client._ap_storage.get(key)

        if cache_uid is None:
            cache = None
        else:
            cache = self.apg.host.common_cache.get_metadata(cache_uid)

        if cache is None:
            with tempfile.TemporaryDirectory() as dir_name:
                dest_path = Path(dir_name, filename)
                await download_file(url, dest_path, max_size=MAX_AVATAR_SIZE)
                avatar_data = {
                    "path": dest_path,
                    "filename": filename,
                    "media_type": image.guess_type(dest_path),
                }

                await self.apg._i.cache_avatar(self.apg.IMPORT_NAME, avatar_data)
        else:
            avatar_data = {
                "cache_uid": cache["uid"],
                "path": cache["path"],
                "media_type": cache["mime_type"],
            }

        return avatar_data

    async def generate_avatar_metadata(
        self, client: SatXMPPEntity, requestor_actor_id: str, ap_account: str
    ) -> domish.Element:
        """Generate the metadata element for user avatar

        @param requestor_actor_id: ID of the actor doing the request.
        @raise StanzaError("item-not-found"): no avatar is present in actor data (in
            ``icon`` field)
        """
        avatar_data = await self.get_avatar_data(client, requestor_actor_id, ap_account)
        return self.apg._a.build_item_metadata_elt(avatar_data)

    def _blocking_b_6_4_encode_avatar(self, avatar_data: Dict[str, Any]) -> None:
        with avatar_data["path"].open("rb") as f:
            avatar_data["base64"] = b64encode(f.read()).decode()

    async def generate_avatar_data(
        self,
        client: SatXMPPEntity,
        requestor_actor_id: str,
        ap_account: str,
        itemIdentifiers: Optional[List[str]],
    ) -> domish.Element:
        """Generate the data element for user avatar

        @param requestor_actor_id: ID of the actor doing the request.
        @raise StanzaError("item-not-found"): no avatar cached with requested ID
        """
        if not itemIdentifiers:
            avatar_data = await self.get_avatar_data(
                client, requestor_actor_id, ap_account
            )
            if "base64" not in avatar_data:
                await threads.deferToThread(
                    self._blocking_b_6_4_encode_avatar, avatar_data
                )
        else:
            if len(itemIdentifiers) > 1:
                # only a single item ID is supported
                raise error.StanzaError("item-not-found")
            item_id = itemIdentifiers[0]
            # just to be sure that that we don't have an empty string
            assert item_id
            cache_data = self.apg.host.common_cache.get_metadata(item_id)
            if cache_data is None:
                raise error.StanzaError("item-not-found")
            avatar_data = {"cache_uid": item_id, "path": cache_data["path"]}
            await threads.deferToThread(self._blocking_b_6_4_encode_avatar, avatar_data)

        return self.apg._a.build_item_data_elt(avatar_data)

    @ensure_deferred
    async def items(
        self,
        requestor: jid.JID,
        service: jid.JID,
        node: str,
        maxItems: Optional[int],
        itemIdentifiers: Optional[List[str]],
        rsm_req: Optional[rsm.RSMRequest],
    ) -> Tuple[List[domish.Element], Optional[rsm.RSMResponse]]:
        if not service.user:
            return [], None
        ap_account = self.host.plugins["XEP-0106"].unescape(service.user)
        if ap_account.count("@") != 1:
            log.warning(f"Invalid AP account used by {requestor}: {ap_account!r}")
            return [], None

        requestor_actor_id = self.apg.build_apurl(
            TYPE_ACTOR, await self.apg.get_ap_account_from_jid_and_node(service, node)
        )

        # cached_node may be pre-filled with some nodes (e.g. attachments nodes),
        # otherwise it is filled when suitable
        cached_node = None
        client = self.apg.client
        kwargs = {}

        if node == self.apg._pps.subscriptions_node:
            collection_name = "following"
            parser = self.ap_following_2_elt
            kwargs["only_ids"] = True
            use_cache = False
        elif node.startswith(self.apg._pps.subscribers_node_prefix):
            collection_name = "followers"
            parser = self.ap_follower_2_elt
            kwargs["only_ids"] = True
            use_cache = False
        elif node == self.apg._v.node:
            # vCard4 request
            item_elt = await self.generate_v_card(requestor_actor_id, ap_account)
            return [item_elt], None
        elif node == self.apg._a.namespace_metadata:
            item_elt = await self.generate_avatar_metadata(
                self.apg.client, requestor_actor_id, ap_account
            )
            return [item_elt], None
        elif node == self.apg._a.namespace_data:
            item_elt = await self.generate_avatar_data(
                self.apg.client, requestor_actor_id, ap_account, itemIdentifiers
            )
            return [item_elt], None
        elif self.apg._pa.is_attachment_node(node):
            use_cache = True
            # we check cache here because we emit an item-not-found error if the node is
            # not in cache, as we are not dealing with real AP items
            cached_node = await self.host.memory.storage.get_pubsub_node(
                client, service, node
            )
            if cached_node is None:
                raise error.StanzaError("item-not-found")
        else:
            if node.startswith(self.apg._m.namespace):
                parser = self.apg.ap_item_2_mb_elt
            elif node.startswith(self.apg._events.namespace):
                parser = self.apg.ap_events.ap_item_2_event_elt
            else:
                raise error.StanzaError(
                    "feature-not-implemented",
                    text=f"AP Gateway {C.APP_VERSION} only supports "
                    f"{self.apg._m.namespace} node for now",
                )
            collection_name = "outbox"
            use_cache = True

        if use_cache:
            if cached_node is None:
                cached_node = await self.host.memory.storage.get_pubsub_node(
                    client, service, node
                )
            # TODO: check if node is synchronised
            if cached_node is not None:
                # the node is cached, we return items from cache
                log.debug(f"node {node!r} from {service} is in cache")
                pubsub_items, metadata = await self.apg._c.get_items_from_cache(
                    client, cached_node, maxItems, itemIdentifiers, rsm_request=rsm_req
                )
                try:
                    rsm_resp = rsm.RSMResponse(**metadata["rsm"])
                except KeyError:
                    rsm_resp = None
                return [i.data for i in pubsub_items], rsm_resp

        if itemIdentifiers:
            items = []
            for item_id in itemIdentifiers:
                item_data = await self.apg.ap_get(item_id, requestor_actor_id)
                item_elt = await parser(requestor_actor_id, item_data)
                items.append(item_elt)
            return items, None
        else:
            if rsm_req is None:
                if maxItems is None:
                    maxItems = 20
                kwargs.update(
                    {
                        "max_items": maxItems,
                        "chronological_pagination": False,
                    }
                )
            else:
                if (
                    len(
                        [
                            v
                            for v in (rsm_req.after, rsm_req.before, rsm_req.index)
                            if v is not None
                        ]
                    )
                    > 1
                ):
                    raise error.StanzaError(
                        "bad-request",
                        text="You can't use after, before and index at the same time",
                    )
                kwargs.update({"max_items": rsm_req.max})
                if rsm_req.after is not None:
                    kwargs["after_id"] = rsm_req.after
                elif rsm_req.before is not None:
                    kwargs["chronological_pagination"] = False
                    if rsm_req.before != "":
                        kwargs["after_id"] = rsm_req.before
                elif rsm_req.index is not None:
                    kwargs["start_index"] = rsm_req.index

            log.info(
                f"No cache found for node {node} at {service} (AP account {ap_account}), "
                "using Collection Paging to RSM translation"
            )
            if self.apg._m.is_comment_node(node):
                parent_item = self.apg._m.get_parent_item(node)
                try:
                    parent_data = await self.apg.ap_get(parent_item, requestor_actor_id)
                    collection = await self.apg.ap_get_object(
                        requestor_actor_id, parent_data.get("object", {}), "replies"
                    )
                except Exception as e:
                    raise error.StanzaError("item-not-found", text=str(e))
            else:
                actor_data = await self.apg.get_ap_actor_data_from_account(
                    requestor_actor_id, ap_account
                )
                collection = await self.apg.ap_get_object(
                    requestor_actor_id, actor_data, collection_name
                )
            if not collection:
                raise error.StanzaError(
                    "item-not-found",
                    text=f"No collection found for node {node!r} (account: {ap_account})",
                )

            kwargs["parser"] = parser
            return await self.apg.get_ap_items(requestor_actor_id, collection, **kwargs)

    @ensure_deferred
    async def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
        raise error.StanzaError("forbidden")

    @ensure_deferred
    async def subscribe(self, requestor, service, nodeIdentifier, subscriber):
        # TODO: handle comments nodes
        client = self.apg.client
        # we use PENDING state for microblog, it will be set to SUBSCRIBED once the Follow
        # is accepted. Other nodes are directly set to subscribed, their subscriptions
        # being internal.
        if nodeIdentifier == self.apg._m.namespace:
            sub_state = SubscriptionState.PENDING
        else:
            sub_state = SubscriptionState.SUBSCRIBED
        node = await self.host.memory.storage.get_pubsub_node(
            client, service, nodeIdentifier, with_subscriptions=True
        )
        if node is None:
            node = await self.host.memory.storage.set_pubsub_node(
                client,
                service,
                nodeIdentifier,
            )
            subscription = None
        else:
            try:
                subscription = next(
                    s
                    for s in node.subscriptions
                    if s.subscriber == requestor.userhostJID()
                )
            except StopIteration:
                subscription = None

        if subscription is None:
            subscription = PubsubSub(subscriber=requestor.userhostJID(), state=sub_state)
            node.subscriptions.append(subscription)
            await self.host.memory.storage.add(node)
        else:
            if subscription.state is None:
                subscription.state = sub_state
                await self.host.memory.storage.add(node)
            elif subscription.state == SubscriptionState.SUBSCRIBED:
                log.info(
                    f"{requestor.userhostJID()} has already a subscription to {node!r} "
                    f"at {service}. Doing the request anyway."
                )
            elif subscription.state == SubscriptionState.PENDING:
                log.info(
                    f"{requestor.userhostJID()} has already a pending subscription to "
                    f"{node!r} at {service}. Doing the request anyway."
                )
                if sub_state != SubscriptionState.PENDING:
                    subscription.state = sub_state
                    await self.host.memory.storage.add(node)
            else:
                raise exceptions.InternalError(
                    f"unmanaged subscription state: {subscription.state}"
                )

        if nodeIdentifier in (self.apg._m.namespace, self.apg._events.namespace):
            # if we subscribe to microblog or events node, we follow the corresponding
            # account
            req_actor_id, recip_actor_id, inbox = await self.get_ap_actor_ids_and_inbox(
                requestor, service
            )

            data = self.apg.create_activity("Follow", req_actor_id, recip_actor_id)

            resp = await self.apg.sign_and_post(inbox, req_actor_id, data)
            if resp.code >= 300:
                text = await resp.text()
                raise error.StanzaError("service-unavailable", text=text)
        return pubsub.Subscription(nodeIdentifier, requestor, "subscribed")

    @ensure_deferred
    async def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
        req_actor_id, recip_actor_id, inbox = await self.get_ap_actor_ids_and_inbox(
            requestor, service
        )
        data = self.apg.create_activity(
            "Undo",
            req_actor_id,
            self.apg.create_activity("Follow", req_actor_id, recip_actor_id),
        )

        resp = await self.apg.sign_and_post(inbox, req_actor_id, data)
        if resp.code >= 300:
            text = await resp.text()
            raise error.StanzaError("service-unavailable", text=text)

    def getConfigurationOptions(self):
        return NODE_OPTIONS

    def getConfiguration(
        self, requestor: jid.JID, service: jid.JID, nodeIdentifier: str
    ) -> defer.Deferred:
        return defer.succeed(NODE_CONFIG_VALUES)

    def getNodeInfo(
        self,
        requestor: jid.JID,
        service: jid.JID,
        nodeIdentifier: str,
        pep: bool = False,
        recipient: Optional[jid.JID] = None,
    ) -> Optional[dict]:
        if not nodeIdentifier:
            return None
        info = {"type": "leaf", "meta-data": NODE_CONFIG}
        return info