Mercurial > libervia-backend
view sat/plugins/plugin_comp_ap_gateway/http_server.py @ 3743:54c249ec35ce
core (memory/migration): ignore FTS table when autogenerating script for migration:
SQLite Full-Text Search tables are not associated with Python objects and can't be detected
by Alembic. To avoid the generation of unwanted drop commands, they are now ignored when
autogenerating migration scripts.
rel 364
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 22 Mar 2022 17:00:42 +0100 |
parents | 86eea17cafa7 |
children | a8c7e5cef0cb |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia ActivityPub Gateway
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from typing import Optional, Dict, List
import json
from urllib import parse
import re
import unicodedata

from twisted.web import http, resource as web_resource, server
from twisted.internet import defer
from twisted.words.protocols.jabber import jid, error
from wokkel import pubsub, rsm

from sat.core import exceptions
from sat.core.constants import Const as C
from sat.core.i18n import _
from sat.core.log import getLogger

from .constants import (CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_OUTBOX,
                        AP_REQUEST_TYPES, PAGE_SIZE)


log = getLogger(__name__)

# value of the "server" HTTP header set on every response (see HTTPAPGServer.render)
VERSION = unicodedata.normalize(
    'NFKD',
    f"{C.APP_NAME} ActivityPub Gateway {C.APP_VERSION}"
)


class HTTPAPGServer(web_resource.Resource):
    """HTTP Server handling ActivityPub S2S protocol"""
    isLeaf = True

    def __init__(self, ap_gateway):
        """
        @param ap_gateway: gateway instance, used to build/parse AP URLs and to
            access the pubsub and microblog helpers
        """
        self.apg = ap_gateway
        super().__init__()

    async def webfinger(self, request):
        """Answer a webfinger query

        The ``resource`` query argument must be of the form ``acct:<account>``; the
        response links the account to the URL of the corresponding AP actor.
        The response is written to ``request``, which is then finished.

        @param request: HTTP request to answer
        """
        url_parsed = parse.urlparse(request.uri.decode())
        query = parse.parse_qs(url_parsed.query)
        resource = query.get("resource", [""])[0]
        account = resource[5:].strip()
        if not resource.startswith("acct:") or not account:
            # ErrorPage.render() only sets status code and headers, and returns the
            # body. As render_GET has already returned NOT_DONE_YET, nothing else
            # will write this body nor finish the request: we have to do it here,
            # otherwise the client would wait until it times out.
            error_body = web_resource.ErrorPage(
                http.BAD_REQUEST, "Bad Request", "Invalid webfinger resource"
            ).render(request)
            request.write(error_body)
            request.finish()
            return

        actor_url = self.apg.buildAPURL(TYPE_ACTOR, account)

        resp = {
            "subject": resource,
            "links": [
                {
                    "rel": "self",
                    "type": "application/activity+json",
                    "href": actor_url
                }
            ]
        }
        request.setHeader("content-type", CONTENT_TYPE_AP)
        request.write(json.dumps(resp).encode())
        request.finish()

    async def APActorRequest(
        self,
        request: "HTTPRequest",
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: str,
        actor_url: str
    ) -> dict:
        """Build the AP actor object of an account

        ``request``, ``account_jid`` and ``node`` are not used here, they are part of
        the signature shared by all ``AP<Type>Request`` methods (see [APRequest]).

        @param ap_account: AP account, used to build inbox and outbox URLs
        @param actor_url: URL of the actor, used as ID and as public key owner
        @return: actor data, ready to be serialised to JSON
        """
        inbox_url = self.apg.buildAPURL(TYPE_INBOX, ap_account)
        outbox_url = self.apg.buildAPURL(TYPE_OUTBOX, ap_account)

        # we have to use AP account as preferredUsername because it is used to retrieve
        # actor handle (see https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196)
        preferred_username = ap_account.split("@", 1)[0]
        return {
            "@context": [
                "https://www.w3.org/ns/activitystreams",
                "https://w3id.org/security/v1"
            ],

            "id": actor_url,
            "type": "Person",
            "preferredUsername": preferred_username,
            "inbox": inbox_url,
            "outbox": outbox_url,
            "publicKey": {
                "id": f"{actor_url}#main-key",
                "owner": actor_url,
                "publicKeyPem": self.apg.public_key_pem
            }
        }

    def getCanonicalURL(self, request: "HTTPRequest") -> str:
        """Get the public HTTPS URL of the requested path

        Query string is not included, and trailing slashes of the path are removed.
        """
        return parse.urljoin(
            f"https://{self.apg.public_url}",
            request.path.decode().rstrip("/")
        )

    def queryData2RSMRequest(
        self,
        query_data: Dict[str, List[str]]
    ) -> rsm.RSMRequest:
        """Get the RSMRequest corresponding to query data

        @param query_data: parsed query string of the page URL
        @return: RSM request limited to PAGE_SIZE items
        @raise ValueError: no usable paging argument has been found
        """
        page = query_data.get("page")

        if page == ["first"]:
            # AP collections are in reverse chronological order, the "first" AP page
            # is requested with an empty "before", which asks for the last RSM page
            # (see XEP-0059)
            return rsm.RSMRequest(max_=PAGE_SIZE, before="")
        elif page == ["last"]:
            return rsm.RSMRequest(max_=PAGE_SIZE)
        else:
            # the first paging argument found is used, others are ignored
            for query_key in ("index", "before", "after"):
                try:
                    value = query_data[query_key][0]
                except (KeyError, IndexError):
                    continue
                return rsm.RSMRequest(**{query_key: value, "max_": PAGE_SIZE})
        raise ValueError(f"Invalid query data: {query_data!r}")

    async def APOutboxPageRequest(
        self,
        request: "HTTPRequest",
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: str,
        ap_url: str,
        query_data: Dict[str, List[str]]
    ) -> dict:
        """Build one page of the outbox of an account

        @param request: HTTP request being answered
        @param account_jid: JID used as pubsub service to retrieve the items
        @param node: pubsub node to retrieve the items from
        @param ap_account: AP account (unused here)
        @param ap_url: AP URL of the request (unused here)
        @param query_data: parsed query string, selecting the page to return
        @return: ``OrderedCollectionPage`` data, or an empty dict if the page can't
            be retrieved (response code is set to 400 if query data are invalid)
        """
        # we only keep useful keys, and sort to have consistent URL which can
        # be used as ID
        url_keys = sorted(set(query_data) & {"page", "index", "before", "after"})
        query_data = {k: query_data[k] for k in url_keys}
        try:
            rsm_request = self.queryData2RSMRequest(query_data)
        except ValueError as e:
            # without this, the exception would be lost in the Deferred and the
            # request would never be finished
            log.warning(f"Invalid outbox page request: {e}")
            request.setResponseCode(http.BAD_REQUEST)
            return {}
        try:
            items, metadata = await self.apg._p.getItems(
                client=self.apg.client,
                service=account_jid,
                node=node,
                rsm_request=rsm_request,
                extra={C.KEY_USE_CACHE: False}
            )
        except error.StanzaError as e:
            log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}")
            return {}

        base_url = self.getCanonicalURL(request)
        url = f"{base_url}?{parse.urlencode(query_data, True)}"
        # AP OrderedCollection must be in reversed chronological order, thus the
        # opposite of what we get with RSM (at least with Libervia Pubsub)
        ordered_items = [
            await self.apg.mbdata2APitem(
                self.apg.client,
                await self.apg._m.item2mbdata(
                    self.apg.client,
                    item,
                    account_jid,
                    node
                )
            )
            for item in reversed(items)
        ]
        data = {
            "@context": "https://www.w3.org/ns/activitystreams",
            "id": url,
            "type": "OrderedCollectionPage",
            "partOf": base_url,
            "orderedItems": ordered_items,
        }

        # RSM metadata may be missing or incomplete (e.g. no "first", "last" nor
        # "index" with an empty page): in this case we don't advertise a link that
        # we can't build, instead of generating "?after=None" or failing.
        rsm_metadata = metadata.get("rsm") or {}

        if not metadata["complete"]:
            last = rsm_metadata.get("last")
            if last is not None:
                data["prev"] = f"{base_url}?{parse.urlencode({'after': last})}"
        if rsm_metadata.get("index") != 0:
            first = rsm_metadata.get("first")
            if first is not None:
                data["next"] = f"{base_url}?{parse.urlencode({'before': first})}"

        return data

    async def APOutboxRequest(
        self,
        request: "HTTPRequest",
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: str,
        ap_url: str
    ) -> dict:
        """Build the outbox collection of an account, or one of its pages

        If the URL has a query string, the request is delegated to
        [APOutboxPageRequest]. Otherwise the collection itself is returned, with
        links to its first and last pages.

        @param request: HTTP request being answered
        @param account_jid: JID used as pubsub service
        @param node: pubsub node to use, None to use the microblog namespace
        @param ap_account: AP account
        @param ap_url: AP URL of the request
        @return: ``OrderedCollection`` (or page) data, or an empty dict if the node
            can't be requested
        """
        if node is None:
            node = self.apg._m.namespace

        parsed_url = parse.urlparse(request.uri.decode())
        query_data = parse.parse_qs(parsed_url.query)
        if query_data:
            return await self.APOutboxPageRequest(
                request, account_jid, node, ap_account, ap_url, query_data
            )

        # XXX: we can't use disco#info here because this request won't work on a bare jid
        # due to security considerations of XEP-0030 (we don't have presence
        # subscription).
        # The current workaround is to do a request as if RSM was available, and actually
        # check its availability according to result.
        try:
            __, metadata = await self.apg._p.getItems(
                client=self.apg.client,
                service=account_jid,
                node=node,
                max_items=0,
                rsm_request=rsm.RSMRequest(max_=0)
            )
        except error.StanzaError as e:
            log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}")
            return {}
        try:
            items_count = metadata["rsm"]["count"]
        except KeyError:
            log.warning(
                f"No RSM metadata found when requesting pubsub node {node} at "
                f"{account_jid}, defaulting to items_count=20"
            )
            items_count = 20

        url = self.getCanonicalURL(request)
        url_first_page = f"{url}?{parse.urlencode({'page': 'first'})}"
        url_last_page = f"{url}?{parse.urlencode({'page': 'last'})}"
        return {
            "@context": "https://www.w3.org/ns/activitystreams",
            "id": url,
            "totalItems": items_count,
            "type": "OrderedCollection",
            "first": url_first_page,
            "last": url_last_page,
        }

    async def APRequest(self, request):
        """Handle a request on an AP URL

        The request type and AP account are parsed from the URL, then the request is
        dispatched to the matching ``AP<Type>Request`` method. Its result is
        serialised to JSON and written to ``request``, which is then finished.

        @param request: HTTP request to answer
        @raise exceptions.DataError: the request type is not in AP_REQUEST_TYPES
        """
        path = request.path.decode()
        ap_url = parse.urljoin(
            f"https://{self.apg.public_url}",
            path
        )
        request_type, ap_account = self.apg.parseAPURL(ap_url)
        account_jid, node = await self.apg.getJIDAndNode(ap_account)
        if request_type not in AP_REQUEST_TYPES:
            raise exceptions.DataError(f"Invalid request type: {request_type!r}")
        # e.g. a request type "outbox" is handled by "APOutboxRequest"
        method = getattr(self, f"AP{request_type.title()}Request")
        ret_data = await method(request, account_jid, node, ap_account, ap_url)
        request.setHeader("content-type", CONTENT_TYPE_AP)
        request.write(json.dumps(ret_data).encode())
        request.finish()

    def render(self, request):
        """Set the "server" header, then render the request as usual"""
        request.setHeader("server", VERSION)
        return super().render(request)

    def render_GET(self, request):
        """Dispatch GET requests to the webfinger or ActivityPub handlers

        Handlers are coroutines writing and finishing the request themselves, thus
        NOT_DONE_YET is returned for them. Any other path gets a "no resource" page.
        """
        path = request.path.decode().lstrip("/")
        if path.startswith(".well-known/webfinger"):
            defer.ensureDeferred(self.webfinger(request))
            return server.NOT_DONE_YET
        elif path.startswith(self.apg.ap_path):
            defer.ensureDeferred(self.APRequest(request))
            return server.NOT_DONE_YET

        return web_resource.NoResource().render(request)


class HTTPRequest(server.Request):
    """Request class used by HTTPServer (no behaviour added to server.Request)"""
    pass


class HTTPServer(server.Site):
    """Site serving an HTTPAPGServer resource"""
    requestFactory = HTTPRequest

    def __init__(self, ap_gateway):
        """
        @param ap_gateway: gateway instance, given to the HTTPAPGServer resource
        """
        super().__init__(HTTPAPGServer(ap_gateway))