Mercurial > libervia-backend
diff sat/plugins/plugin_comp_ap_gateway/http_server.py @ 3729:86eea17cafa7
component AP gateway: split plugin in several files:
constants, HTTP server and Pubsub service have been put in separated files.
rel: 363
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 31 Jan 2022 18:35:49 +0100 |
parents | sat/plugins/plugin_comp_ap_gateway.py@b15644cae50d |
children | a8c7e5cef0cb |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat/plugins/plugin_comp_ap_gateway/http_server.py Mon Jan 31 18:35:49 2022 +0100 @@ -0,0 +1,298 @@ +#!/usr/bin/env python3 + +# Libervia ActivityPub Gateway +# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from typing import Optional, Dict, List +import json +from urllib import parse +import re +import unicodedata + +from twisted.web import http, resource as web_resource, server +from twisted.internet import defer +from twisted.words.protocols.jabber import jid, error +from wokkel import pubsub, rsm + +from sat.core import exceptions +from sat.core.constants import Const as C +from sat.core.i18n import _ +from sat.core.log import getLogger + +from .constants import (CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_OUTBOX, + AP_REQUEST_TYPES, PAGE_SIZE) + + +log = getLogger(__name__) + +VERSION = unicodedata.normalize( + 'NFKD', + f"{C.APP_NAME} ActivityPub Gateway {C.APP_VERSION}" +) + + +class HTTPAPGServer(web_resource.Resource): + """HTTP Server handling ActivityPub S2S protocol""" + isLeaf = True + + def __init__(self, ap_gateway): + self.apg = ap_gateway + super().__init__() + + async def webfinger(self, request): + url_parsed = parse.urlparse(request.uri.decode()) + query = parse.parse_qs(url_parsed.query) + resource = query.get("resource", [""])[0] + account = resource[5:].strip() + if not resource.startswith("acct:") or not account: + return web_resource.ErrorPage( + http.BAD_REQUEST, "Bad Request" , "Invalid webfinger resource" + ).render(request) + + actor_url = self.apg.buildAPURL(TYPE_ACTOR, account) + + resp = { + "subject": resource, + "links": [ + { + "rel": "self", + "type": "application/activity+json", + "href": actor_url + } + ] + } + request.setHeader("content-type", CONTENT_TYPE_AP) + request.write(json.dumps(resp).encode()) + request.finish() + + async def APActorRequest( + self, + request: "HTTPRequest", + account_jid: jid.JID, + node: Optional[str], + ap_account: str, + actor_url: str + ) -> dict: + inbox_url = self.apg.buildAPURL(TYPE_INBOX, ap_account) + outbox_url = self.apg.buildAPURL(TYPE_OUTBOX, ap_account) + + # we have to use AP account as preferredUsername because it is used to retrieve + # actor handle (see https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196) + preferred_username = ap_account.split("@", 1)[0] + return { + "@context": [ + "https://www.w3.org/ns/activitystreams", + "https://w3id.org/security/v1" + ], + + "id": actor_url, + "type": "Person", + "preferredUsername": preferred_username, + "inbox": inbox_url, + "outbox": outbox_url, + "publicKey": { + "id": f"{actor_url}#main-key", + "owner": actor_url, + "publicKeyPem": self.apg.public_key_pem + } + } + + def getCanonicalURL(self, request: "HTTPRequest") -> str: + return parse.urljoin( + f"https://{self.apg.public_url}", + request.path.decode().rstrip("/") + ) + + def queryData2RSMRequest( + self, + query_data: Dict[str, List[str]] + ) -> rsm.RSMRequest: + """Get RSM kwargs to use with RSMRequest from query data""" + page = query_data.get("page") + + if page == ["first"]: + return rsm.RSMRequest(max_=PAGE_SIZE, before="") + elif page == ["last"]: + return rsm.RSMRequest(max_=PAGE_SIZE) + else: + for query_key in ("index", "before", "after"): + try: + kwargs={query_key: query_data[query_key][0], "max_": PAGE_SIZE} + except (KeyError, IndexError, ValueError): + pass + else: + return rsm.RSMRequest(**kwargs) + raise ValueError(f"Invalid query data: {query_data!r}") + + async def APOutboxPageRequest( + self, + request: "HTTPRequest", + account_jid: jid.JID, + node: Optional[str], + ap_account: str, + ap_url: str, + query_data: Dict[str, List[str]] + ) -> dict: + # we only keep useful keys, and sort to have consistent URL which can + # be used as ID + url_keys = sorted(set(query_data) & {"page", "index", "before", "after"}) + query_data = {k: query_data[k] for k in url_keys} + try: + items, metadata = await self.apg._p.getItems( + client=self.apg.client, + service=account_jid, + node=node, + rsm_request=self.queryData2RSMRequest(query_data), + extra = {C.KEY_USE_CACHE: False} + ) + except error.StanzaError as e: + log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}") + return {} + + base_url = self.getCanonicalURL(request) + url = f"{base_url}?{parse.urlencode(query_data, True)}" + data = { + "@context": "https://www.w3.org/ns/activitystreams", + "id": url, + "type": "OrderedCollectionPage", + "partOf": base_url, + "orderedItems" : [ + await self.apg.mbdata2APitem( + self.apg.client, + await self.apg._m.item2mbdata( + self.apg.client, + item, + account_jid, + node + ) + ) + for item in reversed(items) + ] + } + + # AP OrderedCollection must be in reversed chronological order, thus the opposite + # of what we get with RSM (at least with Libervia Pubsub) + if not metadata["complete"]: + try: + last= metadata["rsm"]["last"] + except KeyError: + last = None + data["prev"] = f"{base_url}?{parse.urlencode({'after': last})}" + if metadata["rsm"]["index"] != 0: + try: + first= metadata["rsm"]["first"] + except KeyError: + first = None + data["next"] = f"{base_url}?{parse.urlencode({'before': first})}" + + return data + + async def APOutboxRequest( + self, + request: "HTTPRequest", + account_jid: jid.JID, + node: Optional[str], + ap_account: str, + ap_url: str + ) -> dict: + if node is None: + node = self.apg._m.namespace + + parsed_url = parse.urlparse(request.uri.decode()) + query_data = parse.parse_qs(parsed_url.query) + if query_data: + return await self.APOutboxPageRequest( + request, account_jid, node, ap_account, ap_url, query_data + ) + + # XXX: we can't use disco#info here because this request won't work on a bare jid + # due to security considerations of XEP-0030 (we don't have presence + # subscription). + # The current workaround is to do a request as if RSM was available, and actually + # check its availability according to result. + try: + __, metadata = await self.apg._p.getItems( + client=self.apg.client, + service=account_jid, + node=node, + max_items=0, + rsm_request=rsm.RSMRequest(max_=0) + ) + except error.StanzaError as e: + log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}") + return {} + try: + items_count = metadata["rsm"]["count"] + except KeyError: + log.warning( + f"No RSM metadata found when requesting pubsub node {node} at " + f"{account_jid}, defaulting to items_count=20" + ) + items_count = 20 + + url = self.getCanonicalURL(request) + url_first_page = f"{url}?{parse.urlencode({'page': 'first'})}" + url_last_page = f"{url}?{parse.urlencode({'page': 'last'})}" + return { + "@context": "https://www.w3.org/ns/activitystreams", + "id": url, + "totalItems": items_count, + "type": "OrderedCollection", + "first": url_first_page, + "last": url_last_page, + } + + async def APRequest(self, request): + path = request.path.decode() + ap_url = parse.urljoin( + f"https://{self.apg.public_url}", + path + ) + request_type, ap_account = self.apg.parseAPURL(ap_url) + account_jid, node = await self.apg.getJIDAndNode(ap_account) + if request_type not in AP_REQUEST_TYPES: + raise exceptions.DataError(f"Invalid request type: {request_type!r}") + method = getattr(self, f"AP{request_type.title()}Request") + ret_data = await method(request, account_jid, node, ap_account, ap_url) + request.setHeader("content-type", CONTENT_TYPE_AP) + request.write(json.dumps(ret_data).encode()) + request.finish() + + def render(self, request): + request.setHeader("server", VERSION) + return super().render(request) + + def render_GET(self, request): + path = request.path.decode().lstrip("/") + if path.startswith(".well-known/webfinger"): + defer.ensureDeferred(self.webfinger(request)) + return server.NOT_DONE_YET + elif path.startswith(self.apg.ap_path): + defer.ensureDeferred(self.APRequest(request)) + return server.NOT_DONE_YET + + return web_resource.NoResource().render(request) + + +class HTTPRequest(server.Request): + pass + + +class HTTPServer(server.Site): + requestFactory = HTTPRequest + + def __init__(self, ap_gateway): + super().__init__(HTTPAPGServer(ap_gateway))