view sat/plugins/plugin_comp_ap_gateway/pubsub_service.py @ 3745:a8c7e5cef0cb

comp AP gateway: signature checking, caching and threads management: - HTTP signature is checked for incoming messages - AP actor can now be followed using pubsub subscription. When following is accepted, the node is cached - replies to posts are put in cached pubsub comment nodes, with a `comments_max_depth` option to limit the number of comment nodes for a root message (documentation will come to explain this). ticket 364
author Goffi <goffi@goffi.org>
date Tue, 22 Mar 2022 17:00:42 +0100
parents 86eea17cafa7
children 125c7043b277
line wrap: on
line source

#!/usr/bin/env python3

# Libervia ActivityPub Gateway
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import Optional, Tuple, List, Dict, Any

from twisted.internet import defer
from twisted.words.protocols.jabber import jid, error
from twisted.words.xish import domish
from wokkel import rsm, pubsub, data_form

from sat.core.i18n import _
from sat.core import exceptions
from sat.core.log import getLogger
from sat.core.constants import Const as C
from sat.tools.utils import ensure_deferred
from sat.memory.sqla_mapping import PubsubSub, SubscriptionState

from .constants import (
    TYPE_ACTOR,
)


log = getLogger(__name__)

# All nodes exposed by the gateway share this single, static configuration.
# Each entry describes one configuration field: "var" is the field name, "value"
# its value, and any other key (e.g. "type") is an extra attribute of the field.
NODE_CONFIG: List[Dict[str, Any]] = [
    {"var": "pubsub#persist_items", "type": "boolean", "value": True},
    {"var": "pubsub#max_items", "value": "max"},
    {"var": "pubsub#access_model", "type": "list-single", "value": "open"},
    {"var": "pubsub#publish_model", "type": "list-single", "value": "open"},
]

# field name => configured value
NODE_CONFIG_VALUES: Dict[str, Any] = {
    field["var"]: field["value"] for field in NODE_CONFIG
}

# field name => field attributes other than "var" and "value"
# Built with comprehensions only, so that no loop variable is left behind in the
# module namespace.
NODE_OPTIONS: Dict[str, Dict[str, Any]] = {
    field["var"]: {
        key: value for key, value in field.items() if key not in ("var", "value")
    }
    for field in NODE_CONFIG
}


class APPubsubService(rsm.PubSubService):
    """Pubsub service for XMPP requests

    Pubsub requests are answered from the local cache when the requested node is
    cached, and are otherwise translated to ActivityPub requests done through the
    gateway instance (``apg``).
    """

    def __init__(self, apg):
        """
        @param apg: AP gateway instance, used for every ActivityPub operation;
            its ``host`` attribute is kept as ``self.host``
        """
        super().__init__()
        self.host = apg.host
        self.apg = apg
        self.discoIdentity = {
            "category": "pubsub",
            "type": "service",
            "name": "Libervia ActivityPub Gateway",
        }

    async def getAPActorIdsAndInbox(
        self,
        requestor: jid.JID,
        recipient: jid.JID,
    ) -> Tuple[str, str, str]:
        """Get AP actor IDs from requestor and recipient JIDs

        @param requestor: XMPP entity doing a request to an AP actor via the gateway
        @param recipient: JID mapping an AP actor via the gateway
        @return: requestor actor ID, recipient actor ID and recipient inbox
        @raise error.StanzaError: "item-not-found" is raised if no user part is
            specified in recipient
        """
        if not recipient.user:
            raise error.StanzaError(
                "item-not-found",
                text="No user part specified"
            )
        requestor_actor_id = self.apg.buildAPURL(TYPE_ACTOR, requestor.userhost())
        recipient_account = self.apg._e.unescape(recipient.user)
        recipient_actor_id = await self.apg.getAPActorIdFromAccount(recipient_account)
        inbox = await self.apg.getAPInboxFromId(recipient_actor_id)
        return requestor_actor_id, recipient_actor_id, inbox

    async def _postActivityOrRaise(self, inbox: str, actor_id: str, data) -> None:
        """Sign and POST an activity, mapping HTTP failures to a stanza error

        @param inbox: inbox where the activity is posted
        @param actor_id: ID of the actor on behalf of whom the activity is signed
        @param data: activity to send, as returned by ``apg.createActivity``
        @raise error.StanzaError: "service-unavailable" is raised if the response
            code is 400 or more; the response body is used as error text
        """
        resp = await self.apg.signAndPost(inbox, actor_id, data)
        if resp.code >= 400:
            text = await resp.text()
            raise error.StanzaError("service-unavailable", text=text)

    @staticmethod
    def _rsm2PagingKwargs(
        maxItems: Optional[int],
        rsm_req: Optional[rsm.RSMRequest]
    ) -> Dict[str, Any]:
        """Translate pubsub/RSM paging arguments to ``apg.getAPItems`` keywords

        @param maxItems: maximum number of items requested, only used when there
            is no RSM request (20 is used if it is None)
        @param rsm_req: RSM request, if any
        @return: keyword arguments to use with ``apg.getAPItems``
        @raise error.StanzaError: "bad-request" is raised if more than one of
            ``after``, ``before`` and ``index`` is set in the RSM request
        """
        if rsm_req is None:
            return {
                "max_items": 20 if maxItems is None else maxItems,
                "chronological_pagination": False,
            }

        criteria_count = sum(
            value is not None
            for value in (rsm_req.after, rsm_req.before, rsm_req.index)
        )
        if criteria_count > 1:
            raise error.StanzaError(
                "bad-request",
                text="You can't use after, before and index at the same time"
            )
        kwargs = {"max_items": rsm_req.max}
        if rsm_req.after is not None:
            kwargs["after_id"] = rsm_req.after
        elif rsm_req.before is not None:
            kwargs["chronological_pagination"] = False
            # an empty "before" only asks for the last page, there is then no
            # item to start from
            # NOTE(review): the "before" item id is passed as "after_id" together
            #   with non chronological pagination; presumably getAPItems then
            #   pages backwards from this item — confirm against getAPItems
            if rsm_req.before != "":
                kwargs["after_id"] = rsm_req.before
        elif rsm_req.index is not None:
            kwargs["start_index"] = rsm_req.index
        return kwargs

    @ensure_deferred
    async def publish(self, requestor, service, nodeIdentifier, items):
        """Publishing is not implemented"""
        raise NotImplementedError

    @ensure_deferred
    async def items(
        self,
        requestor: jid.JID,
        service: jid.JID,
        node: str,
        maxItems: Optional[int],
        itemIdentifiers: Optional[List[str]],
        rsm_req: Optional[rsm.RSMRequest]
    ) -> Tuple[List[domish.Element], Optional[rsm.RSMResponse]]:
        """Retrieve items of a node

        Items are returned from cache if the node is cached. Otherwise they are
        retrieved from AP: requested items are fetched one by one if
        ``itemIdentifiers`` is set, else the relevant AP collection is used
        ("replies" of the parent item for a comments node, "outbox" of the actor
        for other nodes).

        @param requestor: entity requesting the items
        @param service: JID mapping the AP actor via the gateway
        @param node: requested pubsub node
        @param maxItems: maximum number of items to return
        @param itemIdentifiers: IDs of the items to retrieve
        @param rsm_req: RSM request, if any
        @return: items and RSM response (if any). An empty list is returned if
            ``service`` has no user part, or if the AP account doesn't contain
            exactly one "@"
        @raise error.StanzaError: "feature-not-implemented" is raised if the node
            doesn't start with the microblog namespace, "bad-request" if the RSM
            request is invalid, "item-not-found" if no collection can be found
        """
        if not service.user:
            return [], None
        ap_account = self.host.plugins["XEP-0106"].unescape(service.user)
        if ap_account.count("@") != 1:
            log.warning(f"Invalid AP account used by {requestor}: {ap_account!r}")
            return [], None
        if not node.startswith(self.apg._m.namespace):
            raise error.StanzaError(
                "feature-not-implemented",
                text=f"AP Gateway {C.APP_VERSION} only supports {self.apg._m.namespace} "
                "node for now"
            )
        client = self.apg.client
        cached_node = await self.host.memory.storage.getPubsubNode(
            client, service, node
        )
        # TODO: check if node is synchronised
        if cached_node is not None:
            # the node is cached, we return items from cache
            log.debug(f"node {node!r} from {service} is in cache")
            pubsub_items, metadata = await self.apg._c.getItemsFromCache(
                client, cached_node, maxItems, itemIdentifiers, rsm_request=rsm_req
            )
            try:
                rsm_resp = rsm.RSMResponse(**metadata["rsm"])
            except KeyError:
                rsm_resp = None
            return [i.data for i in pubsub_items], rsm_resp

        if itemIdentifiers:
            # specific items are requested, item IDs are used to get AP items
            items = []
            for item_id in itemIdentifiers:
                item_data = await self.apg.apGet(item_id)
                item_elt = await self.apg.apItem2Elt(item_data)
                items.append(item_elt)
            return items, None

        # the request is validated before any network access is done
        kwargs = self._rsm2PagingKwargs(maxItems, rsm_req)

        log.info(
            f"No cache found for node {node} at {service} (AP account {ap_account}), "
            "using Collection Paging to RSM translation"
        )
        if self.apg._m.isCommentsNode(node):
            parent_node = self.apg._m.getParentNode(node)
            try:
                parent_data = await self.apg.apGet(parent_node)
                collection = await self.apg.apGetObject(
                    parent_data.get("object", {}),
                    "replies"
                )
            except Exception as e:
                # the error text must be a string to be serialised in the stanza
                raise error.StanzaError(
                    "item-not-found",
                    text=str(e)
                ) from e
        else:
            actor_data = await self.apg.getAPActorDataFromAccount(ap_account)
            collection = await self.apg.apGetObject(actor_data, "outbox")
        if not collection:
            raise error.StanzaError(
                "item-not-found",
                text=f"No collection found for node {node!r} (account: {ap_account})"
            )
        return await self.apg.getAPItems(collection, **kwargs)

    @ensure_deferred
    async def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
        """Retracting is not implemented"""
        raise NotImplementedError

    @ensure_deferred
    async def subscribe(self, requestor, service, nodeIdentifier, subscriber):
        """Subscribe to a node by following the mapped AP actor

        The subscription is stored in cache (in PENDING state when it is new or
        when it has no state), then a "Follow" activity is sent to the inbox of
        the AP actor.

        @param requestor: entity requesting the subscription
        @param service: JID mapping the AP actor via the gateway
        @param nodeIdentifier: node to subscribe to
        @param subscriber: unused, the bare JID of ``requestor`` is used
        @return: subscription of ``requestor`` to the node, in "subscribed" state
        @raise error.StanzaError: "item-not-found" is raised if ``service`` has no
            user part, "service-unavailable" if the "Follow" activity is rejected
        @raise exceptions.InternalError: the cached subscription is in an
            unmanaged state
        """
        # TODO: handle comments nodes
        # the recipient is resolved first, so nothing is written in cache when the
        # request can't be delivered (e.g. no user part in service)
        req_actor_id, recip_actor_id, inbox = await self.getAPActorIdsAndInbox(
            requestor, service
        )

        client = self.apg.client
        requestor_bare = requestor.userhostJID()
        node = await self.host.memory.storage.getPubsubNode(
            client, service, nodeIdentifier, with_subscriptions=True
        )
        if node is None:
            node = await self.host.memory.storage.setPubsubNode(
                client,
                service,
                nodeIdentifier,
            )
            subscription = None
        else:
            subscription = next(
                (s for s in node.subscriptions if s.subscriber == requestor_bare),
                None
            )

        if subscription is None:
            subscription = PubsubSub(
                subscriber=requestor_bare,
                state=SubscriptionState.PENDING
            )
            node.subscriptions.append(subscription)
            await self.host.memory.storage.add(node)
        else:
            if subscription.state is None:
                subscription.state = SubscriptionState.PENDING
                await self.host.memory.storage.add(node)
            elif subscription.state == SubscriptionState.SUBSCRIBED:
                log.info(
                    f"{requestor.userhostJID()} has already a subscription to {node!r} "
                    f"at {service}. Doing the request anyway."
                )
            elif subscription.state == SubscriptionState.PENDING:
                log.info(
                    f"{requestor.userhostJID()} has already a pending subscription to "
                    f"{node!r} at {service}. Doing the request anyway."
                )
            else:
                raise exceptions.InternalError(
                    f"unmanaged subscription state: {subscription.state}"
                )

        data = self.apg.createActivity("Follow", req_actor_id, recip_actor_id)
        await self._postActivityOrRaise(inbox, req_actor_id, data)
        return pubsub.Subscription(nodeIdentifier, requestor, "subscribed")

    @ensure_deferred
    async def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
        """Unsubscribe from a node by undoing the "Follow" of the mapped AP actor

        @param requestor: entity requesting the unsubscription
        @param service: JID mapping the AP actor via the gateway
        @param nodeIdentifier: unused
        @param subscriber: unused
        @raise error.StanzaError: "item-not-found" is raised if ``service`` has no
            user part, "service-unavailable" if the "Undo" activity is rejected
        """
        req_actor_id, recip_actor_id, inbox = await self.getAPActorIdsAndInbox(
            requestor, service
        )
        follow_activity = self.apg.createActivity(
            "Follow",
            req_actor_id,
            recip_actor_id
        )
        data = self.apg.createActivity("Undo", req_actor_id, follow_activity)
        await self._postActivityOrRaise(inbox, req_actor_id, data)

    def getConfigurationOptions(self):
        """Get the attributes of configuration fields

        @return: NODE_OPTIONS, shared by all nodes
        """
        return NODE_OPTIONS

    def getConfiguration(
        self,
        requestor: jid.JID,
        service: jid.JID,
        nodeIdentifier: str
    ) -> defer.Deferred:
        """Get the configuration of a node

        All arguments are unused: every node has the same configuration.

        @return: already fired Deferred with NODE_CONFIG_VALUES as result
        """
        return defer.succeed(NODE_CONFIG_VALUES)

    def getNodeInfo(
        self,
        requestor: jid.JID,
        service: jid.JID,
        nodeIdentifier: str,
        pep: bool = False,
        recipient: Optional[jid.JID] = None
    ) -> Optional[dict]:
        """Get disco info of a node

        Only ``nodeIdentifier`` is used.

        @return: None if ``nodeIdentifier`` is empty, else info describing a leaf
            node with NODE_CONFIG as meta-data
        """
        if not nodeIdentifier:
            return None
        return {
            "type": "leaf",
            "meta-data": NODE_CONFIG
        }