Mercurial > libervia-backend
view sat/plugins/plugin_comp_ap_gateway/http_server.py @ 3729:86eea17cafa7
component AP gateway: split plugin in several files:
constants, HTTP server and Pubsub service have been moved to separate files.
rel: 363
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 31 Jan 2022 18:35:49 +0100 |
parents | sat/plugins/plugin_comp_ap_gateway.py@b15644cae50d |
children | a8c7e5cef0cb |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia ActivityPub Gateway
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import Optional, Dict, List
import json
from urllib import parse
import re
import unicodedata

from twisted.web import http, resource as web_resource, server
from twisted.internet import defer
from twisted.words.protocols.jabber import jid, error
from wokkel import pubsub, rsm

from sat.core import exceptions
from sat.core.constants import Const as C
from sat.core.i18n import _
from sat.core.log import getLogger

from .constants import (CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_OUTBOX,
                        AP_REQUEST_TYPES, PAGE_SIZE)


log = getLogger(__name__)

# value sent in the "server" HTTP header of every response
VERSION = unicodedata.normalize(
    'NFKD',
    f"{C.APP_NAME} ActivityPub Gateway {C.APP_VERSION}"
)


class HTTPAPGServer(web_resource.Resource):
    """HTTP Server handling ActivityPub S2S protocol

    GET requests are dispatched by ``render_GET`` to one of the coroutines of this
    class, which write the response themselves and finish the request.
    """
    isLeaf = True

    def __init__(self, ap_gateway):
        """
        @param ap_gateway: gateway component instance; this class uses its
            ``buildAPURL``, ``parseAPURL``, ``getJIDAndNode``, ``mbdata2APitem``
            methods and its ``public_url``, ``public_key_pem``, ``ap_path``,
            ``client``, ``_p`` and ``_m`` attributes
        """
        self.apg = ap_gateway
        super().__init__()

    def _sendErrorPage(
        self,
        request: "HTTPRequest",
        code: int,
        brief: str,
        detail: str
    ) -> None:
        """Write an HTML error page to the request and finish it

        ``ErrorPage.render`` only sets the status/headers and returns the body: in an
        asynchronous handler nobody writes that body for us, so it is done here.
        """
        body = web_resource.ErrorPage(code, brief, detail).render(request)
        request.write(body)
        request.finish()

    def _requestFailed(self, failure_, request: "HTTPRequest") -> None:
        """Errback answering a request whose asynchronous handler has failed

        Without this, an exception raised in a handler would leave the request open
        forever (``render_GET`` has already returned ``NOT_DONE_YET``).

        @param failure_: failure raised by the handler
        @param request: request which was being handled
        """
        log.error(
            f"Error while handling request {request.uri!r}: {failure_.value!r}"
        )
        if request.finished:
            # the response has already been completed, nothing more can be sent
            return
        if failure_.check(ValueError, exceptions.DataError):
            # those exceptions are raised by this class on invalid query data or
            # invalid request type, i.e. on client errors
            code, brief, detail = http.BAD_REQUEST, "Bad Request", "Invalid request"
        else:
            code, brief, detail = (
                http.INTERNAL_SERVER_ERROR,
                "Internal Server Error",
                "Error while handling the request"
            )
        try:
            self._sendErrorPage(request, code, brief, detail)
        except RuntimeError as e:
            # Twisted raises RuntimeError from ``finish`` when the connection has
            # been lost in the meantime: there is nobody left to answer to
            log.debug(f"Can't send error page: {e}")

    async def webfinger(self, request):
        """Answer a webfinger query with the ActivityPub actor link of an account

        The account is read from the ``resource`` query argument, which must have the
        form ``acct:<account>``. An HTTP 400 error page is sent otherwise.
        """
        url_parsed = parse.urlparse(request.uri.decode())
        query = parse.parse_qs(url_parsed.query)
        resource = query.get("resource", [""])[0]
        # 5 == len("acct:")
        account = resource[5:].strip()
        if not resource.startswith("acct:") or not account:
            # the page must be explicitly written and the request finished,
            # otherwise the client would wait indefinitely for a response
            self._sendErrorPage(
                request, http.BAD_REQUEST, "Bad Request", "Invalid webfinger resource"
            )
            return

        actor_url = self.apg.buildAPURL(TYPE_ACTOR, account)

        resp = {
            "subject": resource,
            "links": [
                {
                    "rel": "self",
                    "type": "application/activity+json",
                    "href": actor_url
                }
            ]
        }
        request.setHeader("content-type", CONTENT_TYPE_AP)
        request.write(json.dumps(resp).encode())
        request.finish()

    async def APActorRequest(
        self,
        request: "HTTPRequest",
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: str,
        actor_url: str
    ) -> dict:
        """Build the ActivityPub actor object of an account

        @param request: HTTP request being handled (unused here)
        @param account_jid: JID of the entity (unused here)
        @param node: pubsub node (unused here)
        @param ap_account: ActivityPub account, in the form ``user@domain``
        @param actor_url: URL used as ID of the actor
        @return: actor data, ready to be serialised to JSON
        """
        inbox_url = self.apg.buildAPURL(TYPE_INBOX, ap_account)
        outbox_url = self.apg.buildAPURL(TYPE_OUTBOX, ap_account)

        # we have to use AP account as preferredUsername because it is used to retrieve
        # actor handle (see https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196)
        preferred_username = ap_account.split("@", 1)[0]
        return {
            "@context": [
                "https://www.w3.org/ns/activitystreams",
                "https://w3id.org/security/v1"
            ],

            "id": actor_url,
            "type": "Person",
            "preferredUsername": preferred_username,
            "inbox": inbox_url,
            "outbox": outbox_url,
            "publicKey": {
                "id": f"{actor_url}#main-key",
                "owner": actor_url,
                "publicKeyPem": self.apg.public_key_pem
            }
        }

    def getCanonicalURL(self, request: "HTTPRequest") -> str:
        """Get the public URL of the requested path

        The query string is not included and trailing slashes are removed.
        """
        return parse.urljoin(
            f"https://{self.apg.public_url}",
            request.path.decode().rstrip("/")
        )

    def queryData2RSMRequest(
        self,
        query_data: Dict[str, List[str]]
    ) -> rsm.RSMRequest:
        """Build the RSM request matching the query arguments of a page URL

        @param query_data: parsed query string (as returned by ``parse_qs``)
        @return: RSM request to use to retrieve the items of the page
        @raise ValueError: no usable paging argument has been found
        """
        page = query_data.get("page")

        if page == ["first"]:
            # an empty "before" requests the last RSM page; this is the first AP page
            # because AP collections are in reverse chronological order
            return rsm.RSMRequest(max_=PAGE_SIZE, before="")
        elif page == ["last"]:
            return rsm.RSMRequest(max_=PAGE_SIZE)
        else:
            # the first paging argument found is used, the others are ignored
            for query_key in ("index", "before", "after"):
                try:
                    kwargs = {query_key: query_data[query_key][0], "max_": PAGE_SIZE}
                except (KeyError, IndexError, ValueError):
                    pass
                else:
                    return rsm.RSMRequest(**kwargs)
        raise ValueError(f"Invalid query data: {query_data!r}")

    async def APOutboxPageRequest(
        self,
        request: "HTTPRequest",
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: str,
        ap_url: str,
        query_data: Dict[str, List[str]]
    ) -> dict:
        """Build one page (``OrderedCollectionPage``) of an outbox

        @param request: HTTP request being handled
        @param account_jid: pubsub service holding the items
        @param node: pubsub node holding the items
        @param ap_account: ActivityPub account (unused here)
        @param ap_url: URL of the request (unused here)
        @param query_data: parsed query string of the request
        @return: page data, or an empty dict if the items can't be retrieved
        @raise ValueError: query data don't contain any usable paging argument
        """
        # we only keep useful keys, and sort to have consistent URL which can
        # be used as ID
        url_keys = sorted(set(query_data) & {"page", "index", "before", "after"})
        query_data = {k: query_data[k] for k in url_keys}
        try:
            items, metadata = await self.apg._p.getItems(
                client=self.apg.client,
                service=account_jid,
                node=node,
                rsm_request=self.queryData2RSMRequest(query_data),
                extra={C.KEY_USE_CACHE: False}
            )
        except error.StanzaError as e:
            log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}")
            return {}

        base_url = self.getCanonicalURL(request)
        url = f"{base_url}?{parse.urlencode(query_data, True)}"
        # AP OrderedCollection must be in reversed chronological order, thus the opposite
        # of what we get with RSM (at least with Libervia Pubsub)
        ordered_items = [
            await self.apg.mbdata2APitem(
                self.apg.client,
                await self.apg._m.item2mbdata(
                    self.apg.client,
                    item,
                    account_jid,
                    node
                )
            )
            for item in reversed(items)
        ]
        data = {
            "@context": "https://www.w3.org/ns/activitystreams",
            "id": url,
            "type": "OrderedCollectionPage",
            "partOf": base_url,
            "orderedItems": ordered_items
        }

        # RSM metadata may be missing (e.g. RSM not available on the service): in this
        # case no navigation link can be built
        try:
            rsm_metadata = metadata["rsm"]
        except KeyError:
            rsm_metadata = {}

        if not metadata["complete"]:
            try:
                last = rsm_metadata["last"]
            except KeyError:
                last = None
            # a link is only added when there is an actual item ID to page from,
            # otherwise we would advertise an invalid "after=None" URL
            if last is not None:
                data["prev"] = f"{base_url}?{parse.urlencode({'after': last})}"

        try:
            index = rsm_metadata["index"]
        except KeyError:
            index = None
        if index != 0:
            try:
                first = rsm_metadata["first"]
            except KeyError:
                first = None
            if first is not None:
                data["next"] = f"{base_url}?{parse.urlencode({'before': first})}"

        return data

    async def APOutboxRequest(
        self,
        request: "HTTPRequest",
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: str,
        ap_url: str
    ) -> dict:
        """Build the outbox (``OrderedCollection``) of an account

        If the request URL has a query string, the matching page is returned instead
        (see ``APOutboxPageRequest``).

        @param request: HTTP request being handled
        @param account_jid: pubsub service holding the items
        @param node: pubsub node holding the items, None to use the namespace of
            ``self.apg._m``
        @param ap_account: ActivityPub account
        @param ap_url: URL of the request
        @return: collection data, or an empty dict if the node can't be requested
        """
        if node is None:
            node = self.apg._m.namespace

        parsed_url = parse.urlparse(request.uri.decode())
        query_data = parse.parse_qs(parsed_url.query)
        if query_data:
            return await self.APOutboxPageRequest(
                request, account_jid, node, ap_account, ap_url, query_data
            )

        # XXX: we can't use disco#info here because this request won't work on a bare jid
        # due to security considerations of XEP-0030 (we don't have presence
        # subscription).
        # The current workaround is to do a request as if RSM was available, and actually
        # check its availability according to result.
        try:
            __, metadata = await self.apg._p.getItems(
                client=self.apg.client,
                service=account_jid,
                node=node,
                max_items=0,
                rsm_request=rsm.RSMRequest(max_=0)
            )
        except error.StanzaError as e:
            log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}")
            return {}
        try:
            items_count = metadata["rsm"]["count"]
        except KeyError:
            log.warning(
                f"No RSM metadata found when requesting pubsub node {node} at "
                f"{account_jid}, defaulting to items_count=20"
            )
            items_count = 20

        url = self.getCanonicalURL(request)
        url_first_page = f"{url}?{parse.urlencode({'page': 'first'})}"
        url_last_page = f"{url}?{parse.urlencode({'page': 'last'})}"
        return {
            "@context": "https://www.w3.org/ns/activitystreams",
            "id": url,
            "totalItems": items_count,
            "type": "OrderedCollection",
            "first": url_first_page,
            "last": url_last_page,
        }

    async def APRequest(self, request):
        """Handle a request on an ActivityPub URL and send the JSON response

        The request is delegated to the ``AP<Type>Request`` method matching the
        request type found in the URL.

        @raise exceptions.DataError: the request type is not a known one
        """
        path = request.path.decode()
        ap_url = parse.urljoin(
            f"https://{self.apg.public_url}",
            path
        )
        request_type, ap_account = self.apg.parseAPURL(ap_url)
        account_jid, node = await self.apg.getJIDAndNode(ap_account)
        if request_type not in AP_REQUEST_TYPES:
            raise exceptions.DataError(f"Invalid request type: {request_type!r}")
        method = getattr(self, f"AP{request_type.title()}Request")
        ret_data = await method(request, account_jid, node, ap_account, ap_url)
        request.setHeader("content-type", CONTENT_TYPE_AP)
        request.write(json.dumps(ret_data).encode())
        request.finish()

    def render(self, request):
        """Add the "server" header, then render the request normally"""
        request.setHeader("server", VERSION)
        return super().render(request)

    def render_GET(self, request):
        """Dispatch a GET request to the webfinger or ActivityPub handler

        Handlers are coroutines answering asynchronously; a "not found" page is
        returned for any other path.
        """
        path = request.path.decode().lstrip("/")
        if path.startswith(".well-known/webfinger"):
            handler = self.webfinger
        elif path.startswith(self.apg.ap_path):
            handler = self.APRequest
        else:
            return web_resource.NoResource().render(request)

        d = defer.ensureDeferred(handler(request))
        # a failing handler must still answer the client, otherwise the request would
        # never be finished
        d.addErrback(self._requestFailed, request)
        return server.NOT_DONE_YET


class HTTPRequest(server.Request):
    """Request class used by the gateway HTTP server"""


class HTTPServer(server.Site):
    """Site serving the ActivityPub gateway resource"""
    requestFactory = HTTPRequest

    def __init__(self, ap_gateway):
        super().__init__(HTTPAPGServer(ap_gateway))