diff sat/plugins/plugin_comp_ap_gateway/pubsub_service.py @ 3729:86eea17cafa7

component AP gateway: split plugin in several files: constants, HTTP server and Pubsub service have been put in separated files. rel: 363
author Goffi <goffi@goffi.org>
date Mon, 31 Jan 2022 18:35:49 +0100
parents sat/plugins/plugin_comp_ap_gateway.py@b15644cae50d
children a8c7e5cef0cb
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sat/plugins/plugin_comp_ap_gateway/pubsub_service.py	Mon Jan 31 18:35:49 2022 +0100
@@ -0,0 +1,129 @@
+#!/usr/bin/env python3
+
+# Libervia ActivityPub Gateway
+# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from typing import Optional, List
+
+from twisted.words.protocols.jabber import jid, error
+from twisted.words.xish import domish
+from wokkel import rsm
+
+from sat.core.i18n import _
+from sat.core.log import getLogger
+from sat.tools.utils import ensure_deferred
+
+
+log = getLogger(__name__)
+
+
+class APPubsubService(rsm.PubSubService):
+    """Pubsub service for XMPP requests"""
+
+    def __init__(self, apg):
+        super(APPubsubService, self).__init__()
+        self.host = apg.host
+        self.apg = apg
+        self.discoIdentity = {
+            "category": "pubsub",
+            "type": "service",
+            "name": "Libervia ActivityPub Gateway",
+        }
+
+    @ensure_deferred
+    async def publish(self, requestor, service, nodeIdentifier, items):
+        raise NotImplementedError
+
+    @ensure_deferred
+    async def items(
+        self,
+        requestor: jid.JID,
+        service: jid.JID,
+        node: str,
+        maxItems: Optional[int],
+        itemIdentifiers: Optional[List[str]],
+        rsm_req: Optional[rsm.RSMRequest]
+    ) -> List[domish.Element]:
+        if not service.user:
+            return []
+        ap_account = self.host.plugins["XEP-0106"].unescape(service.user)
+        if ap_account.count("@") != 1:
+            log.warning(f"Invalid AP account used by {requestor}: {ap_account!r}")
+            return []
+        if node != self.apg._m.namespace:
+            raise error.StanzaError(
+                "feature-not-implemented",
+                text=f"{VERSION} only supports {self.apg._m.namespace} "
+                "node for now"
+            )
+        if rsm_req is None:
+            if maxItems is None:
+                maxItems = 20
+            kwargs = {
+                "max_items": maxItems,
+                "chronological_pagination": False,
+            }
+        else:
+            if len(
+                [v for v in (rsm_req.after, rsm_req.before, rsm_req.index)
+                 if v is not None]
+            ) > 1:
+                raise error.StanzaError(
+                    "bad-request",
+                    text="You can't use after, before and index at the same time"
+                )
+            kwargs = {"max_items": rsm_req.max}
+            if rsm_req.after is not None:
+                kwargs["after_id"] = rsm_req.after
+            elif rsm_req.before is not None:
+                kwargs["chronological_pagination"] = False
+                if rsm_req.before != "":
+                    kwargs["after_id"] = rsm_req.before
+            elif rsm_req.index is not None:
+                kwargs["start_index"] = rsm_req.index
+
+        log.info(
+            f"No cache found for node {node} at {service} (AP account {ap_account}), "
+            "using Collection Paging to RSM translation"
+        )
+        return await self.apg.getAPItems(ap_account, **kwargs)
+
+    @ensure_deferred
+    async def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
+        raise NotImplementedError
+
+    def getNodeInfo(
+        self,
+        requestor: jid.JID,
+        service: jid.JID,
+        nodeIdentifier: str,
+        pep: bool = False,
+        recipient: Optional[jid.JID] = None
+    ) -> Optional[dict]:
+        if not nodeIdentifier:
+            return None
+        info = {
+            "type": "leaf",
+            "meta-data": [
+                {"var": "pubsub#persist_items", "type": "boolean", "value": True},
+                {"var": "pubsub#max_items", "value": "max"},
+                {"var": "pubsub#access_model", "type": "list-single", "value": "open"},
+                {"var": "pubsub#publish_model", "type": "list-single", "value": "open"},
+
+            ]
+
+        }
+        return info