view sat/plugins/plugin_comp_ap_gateway/pubsub_service.py @ 3808:39fc2e1b3793

tests (unit/ap gateway): fix `onMessage` call following change in the component: the component now uses the normal message workflow, thus the tests had to be updated rel 367
author Goffi <goffi@goffi.org>
date Fri, 17 Jun 2022 15:50:34 +0200
parents 865167c34b82
children 6329ee6b6df4
line wrap: on
line source

#!/usr/bin/env python3

# Libervia ActivityPub Gateway
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import Optional, Tuple, List, Union

from twisted.internet import defer
from twisted.words.protocols.jabber import jid, error
from twisted.words.xish import domish
from wokkel import rsm, pubsub, disco

from sat.core.i18n import _
from sat.core import exceptions
from sat.core.log import getLogger
from sat.core.constants import Const as C
from sat.tools.utils import ensure_deferred
from sat.memory.sqla_mapping import PubsubSub, SubscriptionState

from .constants import (
    TYPE_ACTOR,
)


log = getLogger(__name__)

# All nodes have the same, static configuration.
# Each entry describes one option: "var" is the option name, "value" is its value,
# and any other key (e.g. "type") is additional metadata about the option.
NODE_CONFIG = [
    {"var": "pubsub#persist_items", "type": "boolean", "value": True},
    {"var": "pubsub#max_items", "value": "max"},
    {"var": "pubsub#access_model", "type": "list-single", "value": "open"},
    {"var": "pubsub#publish_model", "type": "list-single", "value": "open"},
]

# option name => configured value
NODE_CONFIG_VALUES = {field["var"]: field["value"] for field in NODE_CONFIG}
# option name => option metadata (every key of the entry except "var" and "value")
# Built with comprehensions only, so that no loop variable is left behind in the
# module namespace.
NODE_OPTIONS = {
    field["var"]: {k: v for k, v in field.items() if k not in ("var", "value")}
    for field in NODE_CONFIG
}


class APPubsubService(rsm.PubSubService):
    """Pubsub service for XMPP requests"""

    def __init__(self, apg):
        """Attach the pubsub service to the ActivityPub gateway

        @param apg: gateway instance, kept as ``self.apg``; its ``host`` attribute is
            kept as ``self.host``
        """
        super().__init__()
        self.host = apg.host
        self.apg = apg
        # identity advertised by this service
        self.discoIdentity = dict(
            category="pubsub",
            type="service",
            name="Libervia ActivityPub Gateway",
        )

    async def getAPActorIdsAndInbox(
        self,
        requestor: jid.JID,
        recipient: jid.JID,
    ) -> Tuple[str, str, str]:
        """Get AP actor IDs from requestor and recipient JIDs

        @param requestor: XMPP entity doing a request to an AP actor via the gateway
        @param recipient: JID mapping an AP actor via the gateway
        @return: requestor actor ID, recipient actor ID and recipient inbox
        @raise error.StanzaError: "item-not-found" is raised if no user part is
            specified in recipient
        """
        apg = self.apg
        if not recipient.user:
            raise error.StanzaError("item-not-found", text="No user part specified")
        requestor_actor_id = apg.buildAPURL(TYPE_ACTOR, requestor.userhost())
        # the user part of the recipient JID holds the escaped AP account
        recipient_actor_id = await apg.getAPActorIdFromAccount(
            apg._e.unescape(recipient.user)
        )
        recipient_inbox = await apg.getAPInboxFromId(recipient_actor_id)
        return requestor_actor_id, recipient_actor_id, recipient_inbox


    @ensure_deferred
    async def publish(self, requestor, service, nodeIdentifier, items):
        """Convert XMPP pubsub items and post them to an ActivityPub actor

        @param requestor: entity publishing the items
        @param service: JID of the service; its user part must be the (escaped)
            ActivityPub actor account
        @param nodeIdentifier: node where the items are published
        @param items: items to publish
        @raise error.StanzaError: "forbidden" if the gateway is restricted to local
            users and requestor is not local; "bad-request" if service has no user
            part, or if the account doesn't contain exactly one "@"
        """
        # NOTE: the second positional argument of StanzaError is the error *type*,
        #   the human readable message must be given with the "text" keyword.
        if self.apg.local_only and not self.apg.isLocal(requestor):
            raise error.StanzaError(
                "forbidden",
                text="Only local users can publish on this gateway."
            )
        if not service.user:
            raise error.StanzaError(
                "bad-request",
                text="You must specify an ActivityPub actor account in JID user part."
            )
        ap_account = self.apg._e.unescape(service.user)
        if ap_account.count("@") != 1:
            raise error.StanzaError(
                "bad-request",
                text=f"{ap_account!r} is not a valid ActivityPub actor account."
            )

        # the conversion and posting are done on behalf of the requestor
        client = self.apg.client.getVirtualClient(requestor)
        await self.apg.convertAndPostItems(
            client, ap_account, service, nodeIdentifier, items
        )

    async def apFollowing2Elt(self, ap_item: dict) -> domish.Element:
        """Convert actor ID from following collection to XMPP item

        @param ap_item: AP data whose "id" key is the ID of the followed actor
        @return: pubsub item using the actor ID as item ID, and a subscription
            element as payload
        """
        followed_id = ap_item["id"]
        followed_jid = await self.apg.getJIDFromId(followed_id)
        return pubsub.Item(
            id=followed_id,
            payload=self.apg._pps.buildSubscriptionElt(
                self.apg._m.namespace, followed_jid
            ),
        )

    async def apFollower2Elt(self, ap_item: dict) -> domish.Element:
        """Convert actor ID from followers collection to XMPP item

        @param ap_item: AP data whose "id" key is the ID of the follower actor
        @return: pubsub item using the actor ID as item ID, and a subscriber element
            as payload
        """
        follower_id = ap_item["id"]
        follower_jid = await self.apg.getJIDFromId(follower_id)
        return pubsub.Item(
            id=follower_id,
            payload=self.apg._pps.buildSubscriberElt(follower_jid),
        )

    @ensure_deferred
    async def items(
        self,
        requestor: jid.JID,
        service: jid.JID,
        node: str,
        maxItems: Optional[int],
        itemIdentifiers: Optional[List[str]],
        rsm_req: Optional[rsm.RSMRequest]
    ) -> Tuple[List[domish.Element], Optional[rsm.RSMResponse]]:
        """Retrieve items of a node mapped to an ActivityPub collection

        The subscriptions node is mapped to the "following" collection, nodes
        starting with the subscribers prefix to the "followers" collection, and
        other supported nodes to the "outbox" collection (or to the "replies"
        collection of the parent item for comments nodes).

        @param requestor: entity requesting the items
        @param service: JID of the service; its user part must be the (escaped)
            ActivityPub actor account
        @param node: requested node
        @param maxItems: maximum number of items to return when RSM is not used
            (defaults to 20)
        @param itemIdentifiers: if set, only those items are retrieved
        @param rsm_req: RSM request, translated to AP collection paging
        @return: items and RSM response (if any); an empty list and None are
            returned if service has no user part or if the AP account is invalid
        @raise error.StanzaError: "feature-not-implemented" if the node is not
            supported, "bad-request" if several of after, before and index are used
            in the RSM request, "item-not-found" if no collection can be found
        """
        if not service.user:
            return [], None
        ap_account = self.host.plugins["XEP-0106"].unescape(service.user)
        if ap_account.count("@") != 1:
            log.warning(f"Invalid AP account used by {requestor}: {ap_account!r}")
            return [], None

        # extra keyword arguments used when retrieving the AP collection items
        kwargs = {}

        # we select the AP collection and the items parser according to the node
        if node == self.apg._pps.subscriptions_node:
            collection_name = "following"
            parser = self.apFollowing2Elt
            kwargs["only_ids"] = True
            use_cache = False
        elif node.startswith(self.apg._pps.subscribers_node_prefix):
            collection_name = "followers"
            parser = self.apFollower2Elt
            kwargs["only_ids"] = True
            use_cache = False
        else:
            if not node.startswith(self.apg._m.namespace):
                raise error.StanzaError(
                    "feature-not-implemented",
                    text=f"AP Gateway {C.APP_VERSION} only supports "
                    f"{self.apg._m.namespace} node for now"
                )
            collection_name = "outbox"
            parser = self.apg.apItem2Elt
            use_cache = True

        client = self.apg.client
        if use_cache:
            cached_node = await self.host.memory.storage.getPubsubNode(
                client, service, node
            )
            # TODO: check if node is synchronised
            if cached_node is not None:
                # the node is cached, we return items from cache
                log.debug(f"node {node!r} from {service} is in cache")
                pubsub_items, metadata = await self.apg._c.getItemsFromCache(
                    client, cached_node, maxItems, itemIdentifiers, rsm_request=rsm_req
                )
                try:
                    rsm_resp = rsm.RSMResponse(**metadata["rsm"])
                except KeyError:
                    rsm_resp = None
                return [i.data for i in pubsub_items], rsm_resp

        if itemIdentifiers:
            # specific items are requested, we retrieve them one by one
            items = []
            for item_id in itemIdentifiers:
                item_data = await self.apg.apGet(item_id)
                item_elt = await parser(item_data)
                items.append(item_elt)
            return items, None
        else:
            if rsm_req is None:
                if maxItems is None:
                    maxItems = 20
                kwargs.update({
                    "max_items": maxItems,
                    "chronological_pagination": False,
                })
            else:
                # at most one of "after", "before" and "index" can be used
                nb_selectors = sum(
                    v is not None
                    for v in (rsm_req.after, rsm_req.before, rsm_req.index)
                )
                if nb_selectors > 1:
                    raise error.StanzaError(
                        "bad-request",
                        text="You can't use after, before and index at the same time"
                    )
                kwargs["max_items"] = rsm_req.max
                if rsm_req.after is not None:
                    kwargs["after_id"] = rsm_req.after
                elif rsm_req.before is not None:
                    kwargs["chronological_pagination"] = False
                    # an empty "before" only requests the non chronological order
                    if rsm_req.before != "":
                        kwargs["after_id"] = rsm_req.before
                elif rsm_req.index is not None:
                    kwargs["start_index"] = rsm_req.index

            log.info(
                f"No cache found for node {node} at {service} (AP account {ap_account}), "
                "using Collection Paging to RSM translation"
            )
            if self.apg._m.isCommentNode(node):
                # comments are retrieved from the "replies" of the parent item
                parent_item = self.apg._m.getParentItem(node)
                try:
                    parent_data = await self.apg.apGet(parent_item)
                    collection = await self.apg.apGetObject(
                        parent_data.get("object", {}),
                        "replies"
                    )
                except Exception as e:
                    # the error text must be a string to be serialised
                    raise error.StanzaError(
                        "item-not-found",
                        text=str(e)
                    ) from e
            else:
                actor_data = await self.apg.getAPActorDataFromAccount(ap_account)
                collection = await self.apg.apGetObject(actor_data, collection_name)
            if not collection:
                raise error.StanzaError(
                    "item-not-found",
                    text=f"No collection found for node {node!r} (account: {ap_account})"
                )

            kwargs["parser"] = parser
            return await self.apg.getAPItems(collection, **kwargs)

    @ensure_deferred
    async def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
        """Refuse any item retraction request

        None of the arguments is used.

        @raise error.StanzaError: "forbidden" is always raised
        """
        raise error.StanzaError("forbidden")

    @ensure_deferred
    async def subscribe(self, requestor, service, nodeIdentifier, subscriber):
        """Register a subscription and send a "Follow" activity to the AP actor

        The subscription of the requestor bare JID is stored as pending (the node
        is created in storage if necessary), then the "Follow" activity is posted to
        the inbox of the actor mapped by service.

        @param requestor: entity requesting the subscription
        @param service: JID mapping the AP actor to follow
        @param nodeIdentifier: node to subscribe to
        @param subscriber: not used, the requestor bare JID is the subscriber
        @return: subscription with the "subscribed" state
        @raise exceptions.InternalError: the stored subscription is in an unmanaged
            state
        @raise error.StanzaError: "service-unavailable" if the AP server answers
            with a HTTP code >= 400
        """
        # TODO: handle comments nodes
        client = self.apg.client
        storage = self.host.memory.storage
        requestor_bare = requestor.userhostJID()

        node = await storage.getPubsubNode(
            client, service, nodeIdentifier, with_subscriptions=True
        )
        if node is None:
            node = await storage.setPubsubNode(client, service, nodeIdentifier)
            subscription = None
        else:
            subscription = next(
                (s for s in node.subscriptions if s.subscriber == requestor_bare),
                None,
            )

        if subscription is None:
            node.subscriptions.append(
                PubsubSub(subscriber=requestor_bare, state=SubscriptionState.PENDING)
            )
            await storage.add(node)
        elif subscription.state is None:
            subscription.state = SubscriptionState.PENDING
            await storage.add(node)
        elif subscription.state == SubscriptionState.SUBSCRIBED:
            log.info(
                f"{requestor_bare} has already a subscription to {node!r} "
                f"at {service}. Doing the request anyway."
            )
        elif subscription.state == SubscriptionState.PENDING:
            log.info(
                f"{requestor_bare} has already a pending subscription to "
                f"{node!r} at {service}. Doing the request anyway."
            )
        else:
            raise exceptions.InternalError(
                f"unmanaged subscription state: {subscription.state}"
            )

        req_actor_id, recip_actor_id, inbox = await self.getAPActorIdsAndInbox(
            requestor, service
        )
        follow = self.apg.createActivity("Follow", req_actor_id, recip_actor_id)
        resp = await self.apg.signAndPost(inbox, req_actor_id, follow)
        if resp.code >= 400:
            raise error.StanzaError("service-unavailable", text=await resp.text())
        return pubsub.Subscription(nodeIdentifier, requestor, "subscribed")

    @ensure_deferred
    async def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
        """Send an "Undo" of the "Follow" activity to the AP actor

        @param requestor: entity requesting the unsubscription
        @param service: JID mapping the AP actor to unfollow
        @param nodeIdentifier: not used
        @param subscriber: not used
        @raise error.StanzaError: "service-unavailable" if the AP server answers
            with a HTTP code >= 400
        """
        actor_ids_and_inbox = await self.getAPActorIdsAndInbox(requestor, service)
        req_actor_id, recip_actor_id, inbox = actor_ids_and_inbox

        # the activity to undo is the "Follow" one
        follow = self.apg.createActivity("Follow", req_actor_id, recip_actor_id)
        undo = self.apg.createActivity("Undo", req_actor_id, follow)

        resp = await self.apg.signAndPost(inbox, req_actor_id, undo)
        if resp.code >= 400:
            raise error.StanzaError("service-unavailable", text=await resp.text())

    def getConfigurationOptions(self):
        """Get the node configuration options

        @return: the module-level NODE_OPTIONS mapping (option name => option
            metadata), which is the same for all nodes
        """
        return NODE_OPTIONS

    def getConfiguration(
        self,
        requestor: jid.JID,
        service: jid.JID,
        nodeIdentifier: str
    ) -> defer.Deferred:
        """Get the configuration values of a node

        None of the arguments is used: all nodes have the same configuration.

        @return: Deferred already fired with the module-level NODE_CONFIG_VALUES
            mapping (option name => value)
        """
        return defer.succeed(NODE_CONFIG_VALUES)

    def getNodeInfo(
        self,
        requestor: jid.JID,
        service: jid.JID,
        nodeIdentifier: str,
        pep: bool = False,
        recipient: Optional[jid.JID] = None
    ) -> Optional[dict]:
        """Get the information of a node

        Only nodeIdentifier is used: every named node is described as a leaf with
        the configuration shared by all nodes.

        @param nodeIdentifier: node to get information from
        @return: node type and meta-data, or None if nodeIdentifier is empty
        """
        if nodeIdentifier:
            return {"type": "leaf", "meta-data": NODE_CONFIG}
        # no information is given when there is no node identifier
        return None