diff sat/plugins/plugin_comp_ap_gateway/http_server.py @ 3729:86eea17cafa7

component AP gateway: split plugin in several files: constants, HTTP server and Pubsub service have been put in separated files. rel: 363
author Goffi <goffi@goffi.org>
date Mon, 31 Jan 2022 18:35:49 +0100
parents sat/plugins/plugin_comp_ap_gateway.py@b15644cae50d
children a8c7e5cef0cb
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sat/plugins/plugin_comp_ap_gateway/http_server.py	Mon Jan 31 18:35:49 2022 +0100
@@ -0,0 +1,298 @@
+#!/usr/bin/env python3
+
+# Libervia ActivityPub Gateway
+# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from typing import Optional, Dict, List
+import json
+from urllib import parse
+import re
+import unicodedata
+
+from twisted.web import http, resource as web_resource, server
+from twisted.internet import defer
+from twisted.words.protocols.jabber import jid, error
+from wokkel import pubsub, rsm
+
+from sat.core import exceptions
+from sat.core.constants import Const as C
+from sat.core.i18n import _
+from sat.core.log import getLogger
+
+from .constants import (CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_OUTBOX,
+                       AP_REQUEST_TYPES, PAGE_SIZE)
+
+
+log = getLogger(__name__)
+
+VERSION = unicodedata.normalize(
+    'NFKD',
+    f"{C.APP_NAME} ActivityPub Gateway {C.APP_VERSION}"
+)
+
+
+class HTTPAPGServer(web_resource.Resource):
+    """HTTP Server handling ActivityPub S2S protocol"""
+    isLeaf = True
+
+    def __init__(self, ap_gateway):
+        self.apg = ap_gateway
+        super().__init__()
+
+    async def webfinger(self, request):
+        url_parsed = parse.urlparse(request.uri.decode())
+        query = parse.parse_qs(url_parsed.query)
+        resource = query.get("resource", [""])[0]
+        account = resource[5:].strip()
+        if not resource.startswith("acct:") or not account:
+            return web_resource.ErrorPage(
+                http.BAD_REQUEST, "Bad Request" , "Invalid webfinger resource"
+            ).render(request)
+
+        actor_url = self.apg.buildAPURL(TYPE_ACTOR, account)
+
+        resp = {
+            "subject": resource,
+            "links": [
+                {
+                    "rel": "self",
+                    "type": "application/activity+json",
+                    "href": actor_url
+                }
+            ]
+        }
+        request.setHeader("content-type", CONTENT_TYPE_AP)
+        request.write(json.dumps(resp).encode())
+        request.finish()
+
+    async def APActorRequest(
+        self,
+        request: "HTTPRequest",
+        account_jid: jid.JID,
+        node: Optional[str],
+        ap_account: str,
+        actor_url: str
+    ) -> dict:
+        inbox_url = self.apg.buildAPURL(TYPE_INBOX, ap_account)
+        outbox_url = self.apg.buildAPURL(TYPE_OUTBOX, ap_account)
+
+        # we have to use AP account as preferredUsername because it is used to retrieve
+        # actor handle (see https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196)
+        preferred_username = ap_account.split("@", 1)[0]
+        return {
+            "@context": [
+                "https://www.w3.org/ns/activitystreams",
+                "https://w3id.org/security/v1"
+            ],
+
+            "id": actor_url,
+            "type": "Person",
+            "preferredUsername": preferred_username,
+            "inbox": inbox_url,
+            "outbox": outbox_url,
+            "publicKey": {
+                "id": f"{actor_url}#main-key",
+                "owner": actor_url,
+                "publicKeyPem": self.apg.public_key_pem
+            }
+        }
+
+    def getCanonicalURL(self, request: "HTTPRequest") -> str:
+        return parse.urljoin(
+            f"https://{self.apg.public_url}",
+            request.path.decode().rstrip("/")
+        )
+
+    def queryData2RSMRequest(
+        self,
+        query_data: Dict[str, List[str]]
+    ) -> rsm.RSMRequest:
+        """Get RSM kwargs to use with RSMRequest from query data"""
+        page = query_data.get("page")
+
+        if page == ["first"]:
+            return rsm.RSMRequest(max_=PAGE_SIZE, before="")
+        elif page == ["last"]:
+            return rsm.RSMRequest(max_=PAGE_SIZE)
+        else:
+            for query_key in ("index", "before", "after"):
+                try:
+                    kwargs={query_key: query_data[query_key][0], "max_": PAGE_SIZE}
+                except (KeyError, IndexError, ValueError):
+                    pass
+                else:
+                    return rsm.RSMRequest(**kwargs)
+        raise ValueError(f"Invalid query data: {query_data!r}")
+
+    async def APOutboxPageRequest(
+        self,
+        request: "HTTPRequest",
+        account_jid: jid.JID,
+        node: Optional[str],
+        ap_account: str,
+        ap_url: str,
+        query_data: Dict[str, List[str]]
+    ) -> dict:
+        # we only keep useful keys, and sort to have consistent URL which can
+        # be used as ID
+        url_keys = sorted(set(query_data) & {"page", "index", "before", "after"})
+        query_data = {k: query_data[k] for k in url_keys}
+        try:
+            items, metadata = await self.apg._p.getItems(
+                client=self.apg.client,
+                service=account_jid,
+                node=node,
+                rsm_request=self.queryData2RSMRequest(query_data),
+                extra = {C.KEY_USE_CACHE: False}
+            )
+        except error.StanzaError as e:
+            log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}")
+            return {}
+
+        base_url = self.getCanonicalURL(request)
+        url = f"{base_url}?{parse.urlencode(query_data, True)}"
+        data = {
+            "@context": "https://www.w3.org/ns/activitystreams",
+            "id": url,
+            "type": "OrderedCollectionPage",
+            "partOf": base_url,
+            "orderedItems" : [
+                await self.apg.mbdata2APitem(
+                    self.apg.client,
+                    await self.apg._m.item2mbdata(
+                        self.apg.client,
+                        item,
+                        account_jid,
+                        node
+                    )
+                )
+                for item in reversed(items)
+            ]
+        }
+
+        # AP OrderedCollection must be in reversed chronological order, thus the opposite
+        # of what we get with RSM (at least with Libervia Pubsub)
+        if not metadata["complete"]:
+            try:
+                last= metadata["rsm"]["last"]
+            except KeyError:
+                last = None
+            data["prev"] = f"{base_url}?{parse.urlencode({'after': last})}"
+        if metadata["rsm"]["index"] != 0:
+            try:
+                first= metadata["rsm"]["first"]
+            except KeyError:
+                first = None
+            data["next"] = f"{base_url}?{parse.urlencode({'before': first})}"
+
+        return data
+
+    async def APOutboxRequest(
+        self,
+        request: "HTTPRequest",
+        account_jid: jid.JID,
+        node: Optional[str],
+        ap_account: str,
+        ap_url: str
+    ) -> dict:
+        if node is None:
+            node = self.apg._m.namespace
+
+        parsed_url = parse.urlparse(request.uri.decode())
+        query_data = parse.parse_qs(parsed_url.query)
+        if query_data:
+            return await self.APOutboxPageRequest(
+                request, account_jid, node, ap_account, ap_url, query_data
+            )
+
+        # XXX: we can't use disco#info here because this request won't work on a bare jid
+        # due to security considerations of XEP-0030 (we don't have presence
+        # subscription).
+        # The current workaround is to do a request as if RSM was available, and actually
+        # check its availability according to result.
+        try:
+            __, metadata = await self.apg._p.getItems(
+                client=self.apg.client,
+                service=account_jid,
+                node=node,
+                max_items=0,
+                rsm_request=rsm.RSMRequest(max_=0)
+            )
+        except error.StanzaError as e:
+            log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}")
+            return {}
+        try:
+            items_count = metadata["rsm"]["count"]
+        except KeyError:
+            log.warning(
+                f"No RSM metadata found when requesting pubsub node {node} at "
+                f"{account_jid}, defaulting to items_count=20"
+            )
+            items_count = 20
+
+        url = self.getCanonicalURL(request)
+        url_first_page = f"{url}?{parse.urlencode({'page': 'first'})}"
+        url_last_page = f"{url}?{parse.urlencode({'page': 'last'})}"
+        return {
+            "@context": "https://www.w3.org/ns/activitystreams",
+            "id": url,
+            "totalItems": items_count,
+            "type": "OrderedCollection",
+            "first": url_first_page,
+            "last": url_last_page,
+        }
+
+    async def APRequest(self, request):
+        path = request.path.decode()
+        ap_url = parse.urljoin(
+            f"https://{self.apg.public_url}",
+            path
+        )
+        request_type, ap_account = self.apg.parseAPURL(ap_url)
+        account_jid, node = await self.apg.getJIDAndNode(ap_account)
+        if request_type not in AP_REQUEST_TYPES:
+            raise exceptions.DataError(f"Invalid request type: {request_type!r}")
+        method = getattr(self, f"AP{request_type.title()}Request")
+        ret_data = await method(request, account_jid, node, ap_account, ap_url)
+        request.setHeader("content-type", CONTENT_TYPE_AP)
+        request.write(json.dumps(ret_data).encode())
+        request.finish()
+
+    def render(self, request):
+        request.setHeader("server", VERSION)
+        return super().render(request)
+
+    def render_GET(self, request):
+        path = request.path.decode().lstrip("/")
+        if path.startswith(".well-known/webfinger"):
+            defer.ensureDeferred(self.webfinger(request))
+            return server.NOT_DONE_YET
+        elif path.startswith(self.apg.ap_path):
+            defer.ensureDeferred(self.APRequest(request))
+            return server.NOT_DONE_YET
+
+        return web_resource.NoResource().render(request)
+
+
+class HTTPRequest(server.Request):
+    pass
+
+
+class HTTPServer(server.Site):
+    requestFactory = HTTPRequest
+
+    def __init__(self, ap_gateway):
+        super().__init__(HTTPAPGServer(ap_gateway))