view sat/plugins/plugin_comp_ap_gateway/pubsub_service.py @ 3737:783d6dc87b80

plugin XEP-0060: specify sender using `client.jid` in `subscribe`: this is necessary to do a subscription with a component managing a virtual JID (i.e. not the JID of the component itself, but the one linked to an account, see `SatXMPPComponent.getVirtualClient`). rel 364
author Goffi <goffi@goffi.org>
date Tue, 22 Mar 2022 17:00:42 +0100
parents 86eea17cafa7
children a8c7e5cef0cb
line wrap: on
line source

#!/usr/bin/env python3

# Libervia ActivityPub Gateway
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import Optional, List

from twisted.words.protocols.jabber import jid, error
from twisted.words.xish import domish
from wokkel import rsm

from sat.core.i18n import _
from sat.core.log import getLogger
from sat.tools.utils import ensure_deferred


# Module-level logger, named after this module.
log = getLogger(__name__)


class APPubsubService(rsm.PubSubService):
    """Pubsub service for XMPP requests

    Item retrieval is delegated to the gateway's ``getAPItems``; publication and
    retraction are not implemented.
    """

    def __init__(self, apg):
        """
        @param apg: gateway instance; this service uses its ``host``,
            ``_m.namespace`` and ``getAPItems`` attributes
        """
        super().__init__()
        self.host = apg.host
        self.apg = apg
        self.discoIdentity = {
            "category": "pubsub",
            "type": "service",
            "name": "Libervia ActivityPub Gateway",
        }

    @ensure_deferred
    async def publish(self, requestor, service, nodeIdentifier, items):
        """Publishing is not implemented

        @raise NotImplementedError: always
        """
        raise NotImplementedError

    @ensure_deferred
    async def items(
        self,
        requestor: jid.JID,
        service: jid.JID,
        node: str,
        maxItems: Optional[int],
        itemIdentifiers: Optional[List[str]],
        rsm_req: Optional[rsm.RSMRequest]
    ) -> List[domish.Element]:
        """Retrieve items for the AP account designated by the service JID

        The AP account is the user part of ``service``, unescaped with the
        XEP-0106 plugin.

        @param requestor: entity doing the request (only used for logging here)
        @param service: JID the request is addressed to
        @param node: requested node, must be ``self.apg._m.namespace``
        @param maxItems: maximum number of items when no RSM request is used
            (20 is used if None)
        @param itemIdentifiers: not used by this implementation
        @param rsm_req: RSM request, translated to ``getAPItems`` keyword arguments
        @return: result of ``self.apg.getAPItems``, or an empty list if ``service``
            has no user part or if the AP account doesn't contain exactly one "@"
        @raise error.StanzaError: "feature-not-implemented" if ``node`` is not the
            supported one, "bad-request" if more than one of ``after``,
            ``before`` and ``index`` is set in ``rsm_req``
        """
        if not service.user:
            return []
        ap_account = self.host.plugins["XEP-0106"].unescape(service.user)
        if ap_account.count("@") != 1:
            log.warning(f"Invalid AP account used by {requestor}: {ap_account!r}")
            return []
        if node != self.apg._m.namespace:
            # the gateway name is taken from disco identity: no ``VERSION`` name
            # is defined in this module, so using it here raised a NameError
            # instead of the intended stanza error
            gateway_name = self.discoIdentity["name"]
            raise error.StanzaError(
                "feature-not-implemented",
                text=f"{gateway_name} only supports {self.apg._m.namespace} "
                "node for now"
            )
        if rsm_req is None:
            if maxItems is None:
                maxItems = 20
            kwargs = {
                "max_items": maxItems,
                "chronological_pagination": False,
            }
        else:
            # "after", "before" and "index" are mutually exclusive
            selectors = (rsm_req.after, rsm_req.before, rsm_req.index)
            if sum(selector is not None for selector in selectors) > 1:
                raise error.StanzaError(
                    "bad-request",
                    text="You can't use after, before and index at the same time"
                )
            kwargs = {"max_items": rsm_req.max}
            if rsm_req.after is not None:
                kwargs["after_id"] = rsm_req.after
            elif rsm_req.before is not None:
                kwargs["chronological_pagination"] = False
                # an empty "before" carries no item id: only the non
                # chronological pagination is requested in this case
                if rsm_req.before != "":
                    kwargs["after_id"] = rsm_req.before
            elif rsm_req.index is not None:
                kwargs["start_index"] = rsm_req.index

        log.info(
            f"No cache found for node {node} at {service} (AP account {ap_account}), "
            "using Collection Paging to RSM translation"
        )
        return await self.apg.getAPItems(ap_account, **kwargs)

    @ensure_deferred
    async def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
        """Retracting is not implemented

        @raise NotImplementedError: always
        """
        raise NotImplementedError

    def getNodeInfo(
        self,
        requestor: jid.JID,
        service: jid.JID,
        nodeIdentifier: str,
        pep: bool = False,
        recipient: Optional[jid.JID] = None
    ) -> Optional[dict]:
        """Return static information on a node

        Only ``nodeIdentifier`` is checked, the other arguments are not used.

        @param nodeIdentifier: name of the node
        @return: None if ``nodeIdentifier`` is empty, otherwise a dict describing a
            leaf node with its meta-data (the same for every node)
        """
        if not nodeIdentifier:
            return None
        info = {
            "type": "leaf",
            "meta-data": [
                {"var": "pubsub#persist_items", "type": "boolean", "value": True},
                {"var": "pubsub#max_items", "value": "max"},
                {"var": "pubsub#access_model", "type": "list-single", "value": "open"},
                {"var": "pubsub#publish_model", "type": "list-single", "value": "open"},
            ]
        }
        return info