view sat/plugins/plugin_comp_ap_gateway/pubsub_service.py @ 3743:54c249ec35ce

core (memory/migration): ignore FTS table when autogenerating script for migration: SQLite Full-Text Search tables are not associated with Python objects and can't be detected by Alembic. To avoid the generation of unwanted drop commands, they are now ignored when autogenerating migration scripts. rel 364
author Goffi <goffi@goffi.org>
date Tue, 22 Mar 2022 17:00:42 +0100
parents 86eea17cafa7
children a8c7e5cef0cb
line wrap: on
line source

#!/usr/bin/env python3

# Libervia ActivityPub Gateway
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import Optional, List

from twisted.words.protocols.jabber import jid, error
from twisted.words.xish import domish
from wokkel import rsm

from sat.core.i18n import _
from sat.core.log import getLogger
from sat.tools.utils import ensure_deferred


# Module-level logger, named after this module.
log = getLogger(__name__)


class APPubsubService(rsm.PubSubService):
    """Pubsub service for XMPP requests

    Only item retrieval (L{items}) and node information (L{getNodeInfo}) are
    implemented; L{publish} and L{retract} raise C{NotImplementedError}.
    """

    # Human readable name of the service, used in the disco identity and in
    # error messages sent back to the requestor.
    _SERVICE_NAME = "Libervia ActivityPub Gateway"

    def __init__(self, apg):
        """
        @param apg: gateway instance; its C{host} attribute is stored as
            C{self.host}, and its C{_m.namespace} and C{getAPItems} are used
            by L{items}
        """
        super().__init__()
        self.host = apg.host
        self.apg = apg
        self.discoIdentity = {
            "category": "pubsub",
            "type": "service",
            "name": self._SERVICE_NAME,
        }

    @ensure_deferred
    async def publish(self, requestor, service, nodeIdentifier, items):
        """Publishing is not implemented"""
        raise NotImplementedError

    @ensure_deferred
    async def items(
        self,
        requestor: jid.JID,
        service: jid.JID,
        node: str,
        maxItems: Optional[int],
        itemIdentifiers: Optional[List[str]],
        rsm_req: Optional[rsm.RSMRequest]
    ) -> List[domish.Element]:
        """Retrieve items of the AP account designated by the service JID

        The local part of C{service} is unescaped with the XEP-0106 plugin to
        get the AP account, then the pagination request is translated to
        keyword arguments for C{self.apg.getAPItems}.

        @param requestor: entity doing the request (only used for logging)
        @param service: JID of the service; its user part is the escaped AP
            account
        @param node: requested node, must be C{self.apg._m.namespace}
        @param maxItems: maximum number of items to return when no RSM request
            is used (20 is used if None)
        @param itemIdentifiers: not used by this implementation
        @param rsm_req: RSM request, if any
        @return: result of C{self.apg.getAPItems}, or an empty list if the
            service JID has no user part or if the AP account is invalid
        @raise error.StanzaError: C{feature-not-implemented} if the node is
            not supported, C{bad-request} if more than one of C{after},
            C{before} and C{index} is set in the RSM request
        """
        if not service.user:
            return []
        ap_account = self.host.plugins["XEP-0106"].unescape(service.user)
        if ap_account.count("@") != 1:
            log.warning(f"Invalid AP account used by {requestor}: {ap_account!r}")
            return []
        if node != self.apg._m.namespace:
            # NOTE: an undefined "VERSION" name was used here before, resulting
            #   in a NameError instead of the expected stanza error
            raise error.StanzaError(
                "feature-not-implemented",
                text=f"{self._SERVICE_NAME} only supports {self.apg._m.namespace} "
                "node for now"
            )
        if rsm_req is None:
            if maxItems is None:
                # no limit requested, we use a default page size
                maxItems = 20
            kwargs = {
                "max_items": maxItems,
                "chronological_pagination": False,
            }
        else:
            # after, before and index are mutually exclusive
            selectors = (rsm_req.after, rsm_req.before, rsm_req.index)
            if sum(value is not None for value in selectors) > 1:
                raise error.StanzaError(
                    "bad-request",
                    text="You can't use after, before and index at the same time"
                )
            kwargs = {"max_items": rsm_req.max}
            if rsm_req.after is not None:
                kwargs["after_id"] = rsm_req.after
            elif rsm_req.before is not None:
                kwargs["chronological_pagination"] = False
                # an empty "before" carries no item id (in XEP-0059 it is used
                # to request the last page), there is no anchor to set then
                if rsm_req.before != "":
                    kwargs["after_id"] = rsm_req.before
            elif rsm_req.index is not None:
                kwargs["start_index"] = rsm_req.index

        log.info(
            f"No cache found for node {node} at {service} (AP account {ap_account}), "
            "using Collection Paging to RSM translation"
        )
        return await self.apg.getAPItems(ap_account, **kwargs)

    @ensure_deferred
    async def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
        """Retraction is not implemented"""
        raise NotImplementedError

    def getNodeInfo(
        self,
        requestor: jid.JID,
        service: jid.JID,
        nodeIdentifier: str,
        pep: bool = False,
        recipient: Optional[jid.JID] = None
    ) -> Optional[dict]:
        """Return static information on a node

        The same data is returned for any non empty node identifier;
        C{requestor}, C{service}, C{pep} and C{recipient} are not used.

        @param nodeIdentifier: name of the requested node
        @return: None if C{nodeIdentifier} is empty, otherwise a dict with the
            node type ("leaf") and its meta-data
        """
        if not nodeIdentifier:
            return None
        info = {
            "type": "leaf",
            "meta-data": [
                {"var": "pubsub#persist_items", "type": "boolean", "value": True},
                {"var": "pubsub#max_items", "value": "max"},
                {"var": "pubsub#access_model", "type": "list-single", "value": "open"},
                {"var": "pubsub#publish_model", "type": "list-single", "value": "open"},
            ]
        }
        return info