Mercurial > libervia-backend
view sat/plugins/plugin_comp_ap_gateway/http_server.py @ 3888:aa7197b67c26
component AP gateway: AP <=> XMPP reactions conversions:
- Pubsub Attachments plugin has been renamed to XEP-0470 following publication
- XEP-0470 has been updated to follow 0.2 changes
- AP reactions (as implemented in Pleroma) are converted to XEP-0470
- XEP-0470 events are converted to AP reactions (again, using "EmojiReact" from Pleroma)
- AP activities related to attachments (likes/reactions) are cached in Libervia because
it's not possible to retrieve them from Pleroma instances once they have been emitted
(doing an HTTP GET on their ID returns a 404). For now this cache is never flushed; this
should be improved in the future.
- `sharedInbox` is used when available. Pleroma returns a 500 HTTP error when ``to`` or
``cc`` are used in a direct inbox.
- reactions and likes are not currently used for direct messages, because they can't be
emitted from Pleroma in this case; thus there is no point in implementing them for the
moment.
rel 371
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 31 Aug 2022 17:07:03 +0200 |
parents | cea52400623d |
children | 0aa7023dcd08 |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia ActivityPub Gateway
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import time
import html
from typing import Optional, Dict, List, Any
import json
from urllib import parse
from collections import deque
import unicodedata

from twisted.web import http, resource as web_resource, server
from twisted.web import static
from twisted.python import failure
from twisted.internet import defer
from twisted.words.protocols.jabber import jid, error
from wokkel import pubsub, rsm

from sat.core import exceptions
from sat.core.constants import Const as C
from sat.core.i18n import _
from sat.core.core_types import SatXMPPEntity
from sat.core.log import getLogger
from sat.tools.common import date_utils, uri
from sat.memory.sqla_mapping import SubscriptionState

from .constants import (
    NS_AP, CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_SHARED_INBOX, TYPE_OUTBOX,
    AP_REQUEST_TYPES, PAGE_SIZE, ACTIVITY_TYPES_LOWER, ACTIVIY_NO_ACCOUNT_ALLOWED,
    SIGN_HEADERS, HS2019, SIGN_EXP, TYPE_FOLLOWERS, TYPE_FOLLOWING, TYPE_ITEM, TYPE_LIKE,
    TYPE_REACTION, ST_AP_CACHE
)
from .regex import RE_SIG_PARAM


log = getLogger(__name__)

VERSION = unicodedata.normalize(
    'NFKD',
    f"{C.APP_NAME} ActivityPub Gateway {C.APP_VERSION}"
)


class HTTPAPGServer(web_resource.Resource):
    """HTTP Server handling ActivityPub S2S protocol

    ``handle<Type>Activity`` methods are looked up dynamically by ``APInboxRequest``
    and ``AP<Type>Request`` methods are looked up dynamically by ``APRequest``; both
    families therefore share a fixed positional signature.
    """
    isLeaf = True

    def __init__(self, ap_gateway):
        """
        @param ap_gateway: gateway plugin instance, used for all AP <=> XMPP operations
        """
        self.apg = ap_gateway
        # digests of the latest POST requests, used to ignore duplicated deliveries
        self._seen_digest = deque(maxlen=50)
        super().__init__()

    def responseCode(
        self,
        request: "HTTPRequest",
        http_code: int,
        msg: Optional[str] = None
    ) -> None:
        """Log and set HTTP return code and associated message"""
        if msg is not None:
            log.warning(msg)
        request.setResponseCode(http_code, None if msg is None else msg.encode())

    def _onRequestError(self, failure_: failure.Failure, request: "HTTPRequest") -> None:
        """Errback: log the failure, answer with a 500 code and finish the request"""
        log.error(f"Internal error: {failure_.value}")
        self.responseCode(
            request,
            http.INTERNAL_SERVER_ERROR,
            f"internal error: {failure_.value}"
        )
        request.finish()

    async def webfinger(self, request):
        """Answer a webfinger query for an ``acct:`` resource

        The request is always written and finished by this method, as it is run
        outside of the synchronous rendering flow (``render_GET`` returns
        ``NOT_DONE_YET``).
        """
        url_parsed = parse.urlparse(request.uri.decode())
        query = parse.parse_qs(url_parsed.query)
        resource = query.get("resource", [""])[0]
        # 5 is the length of the "acct:" prefix
        account = resource[5:].strip()
        if not resource.startswith("acct:") or not account:
            error_page = web_resource.ErrorPage(
                http.BAD_REQUEST, "Bad Request" , "Invalid webfinger resource"
            )
            # the rendered body must be written explicitly: the value returned by this
            # coroutine is discarded by the caller
            request.write(error_page.render(request))
            request.finish()
            return

        actor_url = self.apg.buildAPURL(TYPE_ACTOR, account)

        resp = {
            "aliases": [actor_url],
            "subject": resource,
            "links": [
                {
                    "rel": "self",
                    "type": "application/activity+json",
                    "href": actor_url
                }
            ]
        }
        request.setHeader("content-type", CONTENT_TYPE_AP)
        request.write(json.dumps(resp).encode())
        request.finish()

    async def handleUndoActivity(
        self,
        request: "HTTPRequest",
        data: dict,
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: str,
        ap_url: str,
        signing_actor: str
    ) -> None:
        """Revert a previous Follow, Announce, like or reaction activity

        The undone object is first looked up in the AP cache (and removed from it if
        found), otherwise it is retrieved with ``apGetList``.
        Objects not attributed to the signing actor are ignored.
        """
        if node is None:
            node = self.apg._m.namespace
        client = await self.apg.getVirtualClient(signing_actor)
        object_ = data.get("object")
        if isinstance(object_, str):
            # we check first if it's not a cached object
            ap_cache_key = f"{ST_AP_CACHE}{object_}"
            value = await self.apg.client._ap_storage.get(ap_cache_key)
        else:
            value = None
        if value is not None:
            objects = [value]
            # because we'll undo the activity, we can remove it from cache
            await self.apg.client._ap_storage.remove(ap_cache_key)
        else:
            objects = await self.apg.apGetList(data, "object")
        for obj in objects:
            type_ = obj.get("type")
            actor = await self.apg.apGetSenderActor(obj)
            if actor != signing_actor:
                log.warning(f"ignoring object not attributed to signing actor: {data}")
                continue

            if type_ == "Follow":
                try:
                    target_account = obj["object"]
                except KeyError:
                    log.warning(f'ignoring invalid object, missing "object" key: {data}')
                    continue
                if not self.apg.isLocalURL(target_account):
                    log.warning(f"ignoring unfollow request to non local actor: {data}")
                    continue
                await self.apg._p.unsubscribe(
                    client,
                    account_jid,
                    node,
                    sender=client.jid,
                )
            elif type_ == "Announce":
                # we can use directly the Announce object, as only the "id" field is
                # needed
                await self.apg.newAPDeleteItem(client, None, node, obj)
            elif type_ == TYPE_LIKE:
                await self.handleAttachmentItem(client, obj, {"noticed": False})
            elif type_ == TYPE_REACTION:
                await self.handleAttachmentItem(client, obj, {
                    "reactions": {"operation": "update", "remove": [obj["content"]]}
                })
            else:
                log.warning(f"Unmanaged undo type: {type_!r}")

    async def handleFollowActivity(
        self,
        request: "HTTPRequest",
        data: dict,
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: str,
        ap_url: str,
        signing_actor: str
    ) -> None:
        """Subscribe the signing actor to the node, and send "Accept" when subscribed

        @raise exceptions.InternalError: subscription succeeded with an unexpected state
        """
        if node is None:
            node = self.apg._m.namespace
        client = await self.apg.getVirtualClient(signing_actor)
        try:
            subscription = await self.apg._p.subscribe(
                client,
                account_jid,
                node,
                # subscriptions from AP are always public
                options=self.apg._pps.setPublicOpt()
            )
        except pubsub.SubscriptionPending:
            log.info(f"subscription to node {node!r} of {account_jid} is pending")
        # TODO: manage SubscriptionUnconfigured
        else:
            if subscription.state != "subscribed":
                # other states should raise an Exception
                raise exceptions.InternalError('"subscribed" state was expected')
            inbox = await self.apg.getAPInboxFromId(signing_actor, use_shared=False)
            actor_id = self.apg.buildAPURL(TYPE_ACTOR, ap_account)
            accept_data = self.apg.createActivity(
                "Accept", actor_id, object_=data
            )
            await self.apg.signAndPost(inbox, actor_id, accept_data)
        await self.apg._c.synchronise(client, account_jid, node, resync=False)

    async def handleAcceptActivity(
        self,
        request: "HTTPRequest",
        data: dict,
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: str,
        ap_url: str,
        signing_actor: str
    ) -> None:
        """Mark a pending subscription as subscribed when a "Follow" is accepted

        @raise exceptions.InternalError: the stored subscription is neither pending nor
            subscribed
        """
        if node is None:
            node = self.apg._m.namespace
        client = await self.apg.getVirtualClient(signing_actor)
        objects = await self.apg.apGetList(data, "object")
        for obj in objects:
            type_ = obj.get("type")
            if type_ == "Follow":
                follow_node = await self.apg.host.memory.storage.getPubsubNode(
                    client, client.jid, node, with_subscriptions=True
                )
                if follow_node is None:
                    log.warning(
                        f"Received a follow accept on an unknown node: {node!r} at "
                        f"{client.jid}. Ignoring it"
                    )
                    continue
                try:
                    sub = next(
                        s for s in follow_node.subscriptions
                        if s.subscriber == account_jid
                    )
                except StopIteration:
                    log.warning(
                        "Received a follow accept on a node without subscription: "
                        f"{node!r} at {client.jid}. Ignoring it"
                    )
                else:
                    if sub.state == SubscriptionState.SUBSCRIBED:
                        log.warning(f"Already subscribed to {node!r} at {client.jid}")
                    elif sub.state == SubscriptionState.PENDING:
                        follow_node.subscribed = True
                        sub.state = SubscriptionState.SUBSCRIBED
                        await self.apg.host.memory.storage.add(follow_node)
                    else:
                        raise exceptions.InternalError(
                            f"Unhandled subscription state {sub.state!r}"
                        )
            else:
                log.warning(f"Unmanaged accept type: {type_!r}")

    async def handleDeleteActivity(
        self,
        request: "HTTPRequest",
        data: dict,
        account_jid: Optional[jid.JID],
        node: Optional[str],
        ap_account: Optional[str],
        ap_url: str,
        signing_actor: str
    ):
        """Forward each deleted object to ``newAPDeleteItem``"""
        if node is None:
            node = self.apg._m.namespace
        client = await self.apg.getVirtualClient(signing_actor)
        objects = await self.apg.apGetList(data, "object")
        for obj in objects:
            await self.apg.newAPDeleteItem(client, account_jid, node, obj)

    async def handleNewAPItems(
        self,
        request: "HTTPRequest",
        data: dict,
        account_jid: Optional[jid.JID],
        node: Optional[str],
        signing_actor: str,
        repeated: bool = False,
    ):
        """Helper method to handle workflow for new AP items

        accept globally the same parameter as for handleCreateActivity
        @param repeated: if True, the item is an item republished from somewhere else
        @raise exceptions.DataError: "_repeated" is already set in data, or a repeated
            local item has an invalid URL or doesn't match its sender
        """
        if "_repeated" in data:
            log.error(
                '"_repeated" field already present in given AP item, this should not '
                f"happen. Ignoring object from {signing_actor}\n{data}"
            )
            raise exceptions.DataError("unexpected field in item")
        if node is None:
            node = self.apg._m.namespace
        client = await self.apg.getVirtualClient(signing_actor)
        objects = await self.apg.apGetList(data, "object")
        for obj in objects:
            sender = await self.apg.apGetSenderActor(obj)
            if repeated:
                # we don't check sender when item is repeated, as it should be different
                # from post author in this case
                sender_jid = await self.apg.getJIDFromId(sender)
                repeater_jid = await self.apg.getJIDFromId(signing_actor)
                repeated_item_id = obj["id"]
                if self.apg.isLocalURL(repeated_item_id):
                    # the repeated object is from XMPP, we need to parse the URL to find
                    # the right ID
                    url_type, url_args = self.apg.parseAPURL(repeated_item_id)
                    if url_type != "item":
                        raise exceptions.DataError(
                            f"local URI is not an item: {repeated_item_id}"
                        )
                    try:
                        url_account, url_item_id = url_args
                        if not url_account or not url_item_id:
                            raise ValueError
                    except (RuntimeError, ValueError):
                        raise exceptions.DataError(
                            f"local URI is invalid: {repeated_item_id}"
                        )
                    else:
                        url_jid, url_node = await self.apg.getJIDAndNode(url_account)
                        if ((url_jid != sender_jid
                             or url_node and url_node != self.apg._m.namespace)):
                            raise exceptions.DataError(
                                f"announced ID doesn't match sender ({sender}): "
                                f"[{repeated_item_id}]"
                            )

                        repeated_item_id = url_item_id

                obj["_repeated"] = {
                    "by": repeater_jid.full(),
                    "at": data.get("published"),
                    "uri": uri.buildXMPPUri(
                        "pubsub",
                        path=sender_jid.full(),
                        node=self.apg._m.namespace,
                        item=repeated_item_id
                    )
                }
                # we must use activity's id and targets, not the original item ones
                for field in ("id", "to", "bto", "cc", "bcc"):
                    obj[field] = data.get(field)
            else:
                if sender != signing_actor:
                    log.warning(
                        f"Ignoring object not attributed to signing actor: {obj}"
                    )
                    continue

            await self.apg.newAPItem(client, account_jid, node, obj)

    async def handleCreateActivity(
        self,
        request: "HTTPRequest",
        data: dict,
        account_jid: Optional[jid.JID],
        node: Optional[str],
        ap_account: Optional[str],
        ap_url: str,
        signing_actor: str
    ):
        """Handle a "Create" activity, see ``handleNewAPItems``"""
        await self.handleNewAPItems(request, data, account_jid, node, signing_actor)

    async def handleAnnounceActivity(
        self,
        request: "HTTPRequest",
        data: dict,
        account_jid: Optional[jid.JID],
        node: Optional[str],
        ap_account: Optional[str],
        ap_url: str,
        signing_actor: str
    ):
        """Handle an "Announce" activity as a repeated item, see ``handleNewAPItems``"""
        # we create a new item
        await self.handleNewAPItems(
            request,
            data,
            account_jid,
            node,
            signing_actor,
            repeated=True
        )

    async def handleAttachmentItem(
        self,
        client: SatXMPPEntity,
        data: dict,
        attachment_data: dict
    ) -> None:
        """Apply attachment data to the attachment node of each local targeted item

        @param client: virtual client of the AP actor emitting the attachment
        @param data: AP activity, its "object" field holds the targeted item(s), either
            as ID(s) or as object(s) with an "id" field
        @param attachment_data: data given to the attachments plugin as "extra"
        @raise exceptions.DataError: "object" field is missing or invalid
        @raise ValueError: a local item URL doesn't have the expected arguments
        """
        target_ids = data.get("object")
        if not target_ids:
            raise exceptions.DataError("object should be set")
        elif isinstance(target_ids, list):
            try:
                target_ids = [o["id"] for o in target_ids]
            except (KeyError, TypeError):
                raise exceptions.DataError(f"invalid object: {target_ids!r}")
        elif isinstance(target_ids, dict):
            obj_id = target_ids.get("id")
            if not obj_id or not isinstance(obj_id, str):
                raise exceptions.DataError(f"invalid object: {target_ids!r}")
            target_ids = [obj_id]
        elif isinstance(target_ids, str):
            target_ids = [target_ids]
        else:
            # any other type can't be iterated as a list of IDs
            raise exceptions.DataError(f"invalid object: {target_ids!r}")

        # XXX: we have to cache AP items because some implementation (Pleroma notably)
        #   don't keep object accessible, and we need to be able to retrieve them for
        #   UNDO. Current implementation will grow, we need to add a way to flush it after
        #   a while.
        # TODO: add a way to flush old cached AP items.
        await client._ap_storage.aset(f"{ST_AP_CACHE}{data['id']}", data)

        for target_id in target_ids:
            if not self.apg.isLocalURL(target_id):
                log.debug(f"ignoring non local target ID: {target_id}")
                continue
            url_type, url_args = self.apg.parseAPURL(target_id)
            if url_type != TYPE_ITEM:
                log.warning(f"unexpected local URL for attachment on item {target_id}")
                continue
            try:
                account, item_id = url_args
            except ValueError:
                raise ValueError(f"invalid URL: {target_id}")
            author_jid, item_node = await self.apg.getJIDAndNode(account)
            if item_node is None:
                item_node = self.apg._m.namespace
            attachment_node = self.apg._pa.getAttachmentNodeName(
                author_jid, item_node, item_id
            )
            cached_node = await self.apg.host.memory.storage.getPubsubNode(
                client,
                author_jid,
                attachment_node,
                with_subscriptions=True,
                create=True
            )
            # the attachment item is looked up with the bare jid of the emitter as ID
            found_items, __ = await self.apg.host.memory.storage.getItems(
                cached_node, item_ids=[client.jid.userhost()]
            )
            if not found_items:
                old_item_elt = None
            else:
                found_item = found_items[0]
                old_item_elt = found_item.data

            item_elt = self.apg._pa.applySetHandler(
                client,
                {"extra": attachment_data},
                old_item_elt,
                None
            )
            # we reparse the element, as there can be other attachments
            attachments_data = self.apg._pa.items2attachmentData(client, [item_elt])
            # and we update the cache
            await self.apg.host.memory.storage.cachePubsubItems(
                client,
                cached_node,
                [item_elt],
                attachments_data or [{}]
            )

            if self.apg.isVirtualJID(author_jid):
                # the attachment is on a virtual pubsub service (linking to an AP item),
                # we notify all subscribers
                for subscription in cached_node.subscriptions:
                    if subscription.state != SubscriptionState.SUBSCRIBED:
                        continue
                    self.apg.pubsub_service.notifyPublish(
                        author_jid,
                        attachment_node,
                        [(subscription.subscriber, None, [item_elt])]
                    )
            else:
                # the attachment is on an XMPP item, we publish it to the attachment node
                await self.apg._p.sendItems(
                    client, author_jid, attachment_node, [item_elt]
                )

    async def handleLikeActivity(
        self,
        request: "HTTPRequest",
        data: dict,
        account_jid: Optional[jid.JID],
        node: Optional[str],
        ap_account: Optional[str],
        ap_url: str,
        signing_actor: str
    ) -> None:
        """Convert an AP like to a "noticed" attachment"""
        client = await self.apg.getVirtualClient(signing_actor)
        await self.handleAttachmentItem(client, data, {"noticed": True})

    async def handleEmojireactActivity(
        self,
        request: "HTTPRequest",
        data: dict,
        account_jid: Optional[jid.JID],
        node: Optional[str],
        ap_account: Optional[str],
        ap_url: str,
        signing_actor: str
    ) -> None:
        """Convert an AP "EmojiReact" to a reaction attachment"""
        client = await self.apg.getVirtualClient(signing_actor)
        await self.handleAttachmentItem(client, data, {
            "reactions": {"operation": "update", "add": [data["content"]]}
        })

    async def APActorRequest(
        self,
        request: "HTTPRequest",
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: str,
        ap_url: str,
        signing_actor: Optional[str]
    ) -> dict:
        """Build the AP actor object of an account

        @return: actor data, with name, summary and icon filled from the identity of
            the account when available
        """
        inbox = self.apg.buildAPURL(TYPE_INBOX, ap_account)
        shared_inbox = self.apg.buildAPURL(TYPE_SHARED_INBOX)
        outbox = self.apg.buildAPURL(TYPE_OUTBOX, ap_account)
        followers = self.apg.buildAPURL(TYPE_FOLLOWERS, ap_account)
        following = self.apg.buildAPURL(TYPE_FOLLOWING, ap_account)

        # we have to use AP account as preferredUsername because it is used to retrieve
        # actor handle (see https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196)
        preferred_username = ap_account.split("@", 1)[0]

        identity_data = await self.apg._i.getIdentity(self.apg.client, account_jid)

        actor_data = {
            "@context": [
                "https://www.w3.org/ns/activitystreams",
                "https://w3id.org/security/v1"
            ],

            "id": ap_url,
            "type": "Person",
            "preferredUsername": preferred_username,
            "inbox": inbox,
            "outbox": outbox,
            "followers": followers,
            "following": following,
            "publicKey": {
                "id": f"{ap_url}#main-key",
                "owner": ap_url,
                "publicKeyPem": self.apg.public_key_pem
            },
            "endpoints": {
                "sharedInbox": shared_inbox
            },
        }

        if identity_data.get("nicknames"):
            actor_data["name"] = identity_data["nicknames"][0]
        if identity_data.get("description"):
            # description is plain text while summary expects HTML
            actor_data["summary"] = html.escape(identity_data["description"])
        if identity_data.get("avatar"):
            avatar_data = identity_data["avatar"]
            try:
                filename = avatar_data["filename"]
                media_type = avatar_data["media_type"]
            except KeyError:
                log.error(f"incomplete avatar data: {identity_data!r}")
            else:
                avatar_url = self.apg.buildAPURL("avatar", filename)
                actor_data["icon"] = {
                    "type": "Image",
                    "url": avatar_url,
                    "mediaType": media_type
                }

        return actor_data

    def getCanonicalURL(self, request: "HTTPRequest") -> str:
        """Get public URL of the request path, without trailing slash nor query"""
        return parse.urljoin(
            f"https://{self.apg.public_url}",
            request.path.decode().rstrip("/")
        )

    def queryData2RSMRequest(
        self,
        query_data: Dict[str, List[str]]
    ) -> rsm.RSMRequest:
        """Get RSM kwargs to use with RSMRequest from query data

        @param query_data: parsed query string
        @raise ValueError: no usable pagination key has been found
        """
        page = query_data.get("page")

        if page == ["first"]:
            return rsm.RSMRequest(max_=PAGE_SIZE, before="")
        elif page == ["last"]:
            return rsm.RSMRequest(max_=PAGE_SIZE)
        else:
            # first key found wins
            for query_key in ("index", "before", "after"):
                try:
                    kwargs = {query_key: query_data[query_key][0], "max_": PAGE_SIZE}
                except (KeyError, IndexError, ValueError):
                    pass
                else:
                    return rsm.RSMRequest(**kwargs)
        raise ValueError(f"Invalid query data: {query_data!r}")

    async def APOutboxPageRequest(
        self,
        request: "HTTPRequest",
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: str,
        ap_url: str,
        query_data: Dict[str, List[str]]
    ) -> dict:
        """Build an "OrderedCollectionPage" of the outbox

        @param query_data: parsed query string, used to build the RSM request
        @return: page data, or an empty dict if the pubsub node can't be retrieved
        """
        if node is None:
            node = self.apg._m.namespace
        # we only keep useful keys, and sort to have consistent URL which can
        # be used as ID
        url_keys = sorted(set(query_data) & {"page", "index", "before", "after"})
        query_data = {k: query_data[k] for k in url_keys}
        try:
            items, metadata = await self.apg._p.getItems(
                client=self.apg.client,
                service=account_jid,
                node=node,
                rsm_request=self.queryData2RSMRequest(query_data),
                extra = {C.KEY_USE_CACHE: False}
            )
        except error.StanzaError as e:
            log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}")
            return {}

        base_url = self.getCanonicalURL(request)
        url = f"{base_url}?{parse.urlencode(query_data, True)}"
        data = {
            "@context": "https://www.w3.org/ns/activitystreams",
            "id": url,
            "type": "OrderedCollectionPage",
            "partOf": base_url,
            "orderedItems" : [
                await self.apg.mbdata2APitem(
                    self.apg.client,
                    await self.apg._m.item2mbdata(
                        self.apg.client,
                        item,
                        account_jid,
                        node
                    )
                )
                for item in reversed(items)
            ]
        }

        # AP OrderedCollection must be in reversed chronological order, thus the opposite
        # of what we get with RSM (at least with Libervia Pubsub)
        if not metadata["complete"]:
            try:
                last = metadata["rsm"]["last"]
            except KeyError:
                last = None
            data["prev"] = f"{base_url}?{parse.urlencode({'after': last})}"
        if metadata["rsm"]["index"] != 0:
            try:
                first = metadata["rsm"]["first"]
            except KeyError:
                first = None
            data["next"] = f"{base_url}?{parse.urlencode({'before': first})}"

        return data

    async def APOutboxRequest(
        self,
        request: "HTTPRequest",
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: str,
        ap_url: str,
        signing_actor: Optional[str]
    ) -> dict:
        """Build the outbox "OrderedCollection", or one of its pages

        A page is returned as soon as the request has a query string.
        @return: collection data, or an empty dict if the pubsub node can't be retrieved
        """
        if node is None:
            node = self.apg._m.namespace

        parsed_url = parse.urlparse(request.uri.decode())
        query_data = parse.parse_qs(parsed_url.query)
        if query_data:
            return await self.APOutboxPageRequest(
                request, account_jid, node, ap_account, ap_url, query_data
            )

        # XXX: we can't use disco#info here because this request won't work on a bare jid
        # due to security considerations of XEP-0030 (we don't have presence
        # subscription).
        # The current workaround is to do a request as if RSM was available, and actually
        # check its availability according to result.
        try:
            __, metadata = await self.apg._p.getItems(
                client=self.apg.client,
                service=account_jid,
                node=node,
                max_items=0,
                rsm_request=rsm.RSMRequest(max_=0),
                extra = {C.KEY_USE_CACHE: False}
            )
        except error.StanzaError as e:
            log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}")
            return {}
        try:
            items_count = metadata["rsm"]["count"]
        except KeyError:
            log.warning(
                f"No RSM metadata found when requesting pubsub node {node} at "
                f"{account_jid}, defaulting to items_count=20"
            )
            items_count = 20

        url = self.getCanonicalURL(request)
        url_first_page = f"{url}?{parse.urlencode({'page': 'first'})}"
        url_last_page = f"{url}?{parse.urlencode({'page': 'last'})}"
        return {
            "@context": "https://www.w3.org/ns/activitystreams",
            "id": url,
            "totalItems": items_count,
            "type": "OrderedCollection",
            "first": url_first_page,
            "last": url_last_page,
        }

    async def APInboxRequest(
        self,
        request: "HTTPRequest",
        account_jid: Optional[jid.JID],
        node: Optional[str],
        ap_account: Optional[str],
        ap_url: str,
        signing_actor: Optional[str]
    ) -> None:
        """Parse an activity posted to an inbox and dispatch it to its handler

        The handler is the ``handle<Type>Activity`` method matching the activity type.
        @raise exceptions.InternalError: signing_actor is not set
        """
        if signing_actor is None:
            raise exceptions.InternalError("signing_actor must be set for inbox requests")
        if node is None:
            node = self.apg._m.namespace
        try:
            data = json.load(request.content)
            if not isinstance(data, dict):
                raise ValueError("data should be an object")
        except (json.JSONDecodeError, ValueError) as e:
            return self.responseCode(
                request,
                http.BAD_REQUEST,
                f"invalid json in inbox request: {e}"
            )
        await self.checkSigningActor(data, signing_actor)
        activity_type = (data.get("type") or "").lower()
        if activity_type not in ACTIVITY_TYPES_LOWER:
            return self.responseCode(
                request,
                http.UNSUPPORTED_MEDIA_TYPE,
                "request is not an activity, ignoring"
            )

        if account_jid is None and activity_type not in ACTIVIY_NO_ACCOUNT_ALLOWED:
            return self.responseCode(
                request,
                http.UNSUPPORTED_MEDIA_TYPE,
                f"{activity_type.title()!r} activity must target an account"
            )

        try:
            method = getattr(self, f"handle{activity_type.title()}Activity")
        except AttributeError:
            return self.responseCode(
                request,
                http.UNSUPPORTED_MEDIA_TYPE,
                f"{activity_type.title()} activity is not yet supported"
            )
        else:
            await method(
                request, data, account_jid, node, ap_account, ap_url, signing_actor
            )

    async def APFollowersRequest(
        self,
        request: "HTTPRequest",
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: Optional[str],
        ap_url: str,
        signing_actor: Optional[str]
    ) -> dict:
        """Build the "OrderedCollection" of public subscribers of the node"""
        if node is None:
            node = self.apg._m.namespace
        client = self.apg.client
        subscribers = await self.apg._pps.getPublicNodeSubscriptions(
            client, account_jid, node
        )
        followers = []
        for subscriber in subscribers.keys():
            if self.apg.isVirtualJID(subscriber):
                # the subscriber is an AP user subscribed with this gateway
                ap_account = self.apg._e.unescape(subscriber.user)
            else:
                # regular XMPP user
                ap_account = await self.apg.getAPAccountFromJidAndNode(subscriber, node)
            followers.append(ap_account)

        url = self.getCanonicalURL(request)
        return {
          "@context": "https://www.w3.org/ns/activitystreams",
          "type": "OrderedCollection",
          "id": url,
          "totalItems": len(subscribers),
          "first": {
            "type": "OrderedCollectionPage",
            "id": url,
            "orderedItems": followers
          }
        }

    async def APFollowingRequest(
        self,
        request: "HTTPRequest",
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: Optional[str],
        ap_url: str,
        signing_actor: Optional[str]
    ) -> dict[str, Any]:
        """Build the "OrderedCollection" of subscriptions of the account"""
        client = self.apg.client
        subscriptions = await self.apg._pps.subscriptions(
            client, account_jid, node
        )
        following = []
        for sub_dict in subscriptions:
            service = jid.JID(sub_dict["service"])
            if self.apg.isVirtualJID(service):
                # the subscription is to an AP actor with this gateway
                ap_account = self.apg._e.unescape(service.user)
            else:
                # regular XMPP user
                ap_account = await self.apg.getAPAccountFromJidAndNode(
                    service, sub_dict["node"]
                )
            following.append(ap_account)

        url = self.getCanonicalURL(request)
        return {
          "@context": "https://www.w3.org/ns/activitystreams",
          "type": "OrderedCollection",
          "id": url,
          "totalItems": len(subscriptions),
          "first": {
            "type": "OrderedCollectionPage",
            "id": url,
            "orderedItems": following
          }
        }

    async def APRequest(
        self,
        request: "HTTPRequest",
        signing_actor: Optional[str] = None
    ) -> None:
        """Dispatch a request on an AP URL, write the result and finish the request

        @param signing_actor: ID of the actor which signed the request, if any
        @raise exceptions.DataError: the request type is invalid for this URL or method
        """
        if self.apg.verbose:
            from pprint import pformat
            to_log = [
                "",
                f"<<< got {request.method.decode()} request - {request.uri.decode()}"
            ]
            try:
                data = json.load(request.content)
            except (json.JSONDecodeError, ValueError):
                pass
            else:
                to_log.append(pformat(data))
            finally:
                # the content will be read again by the request handlers
                request.content.seek(0)
            if self.apg.verbose>=3:
                headers = "\n".join(
                    f" {k.decode()}: {v.decode()}"
                    for k,v in request.getAllHeaders().items()
                )
                to_log.append(f" headers:\n{headers}")
        path = request.path.decode()
        ap_url = parse.urljoin(
            f"https://{self.apg.public_url}",
            path
        )
        request_type, extra_args = self.apg.parseAPURL(ap_url)
        if len(extra_args) == 0:
            if request_type != "shared_inbox":
                raise exceptions.DataError(f"Invalid request type: {request_type!r}")
            ret_data = await self.APInboxRequest(
                request, None, None, None, ap_url, signing_actor
            )
        elif request_type == "avatar":
            if len(extra_args) != 1:
                raise exceptions.DataError("avatar argument expected in URL")
            avatar_filename = extra_args[0]
            avatar_path = self.apg.host.common_cache.getPath(avatar_filename)
            return static.File(str(avatar_path)).render(request)
        elif request_type == "item":
            ret_data = await self.apg.apGetLocalObject(ap_url)
            ret_data["@context"] = NS_AP
        else:
            if len(extra_args) > 1:
                log.warning(f"unexpected extra arguments: {extra_args!r}")
            ap_account = extra_args[0]
            account_jid, node = await self.apg.getJIDAndNode(ap_account)
            if request_type not in AP_REQUEST_TYPES.get(
                    request.method.decode().upper(), []
            ):
                raise exceptions.DataError(f"Invalid request type: {request_type!r}")
            method = getattr(self, f"AP{request_type.title()}Request")
            ret_data = await method(
                request, account_jid, node, ap_account, ap_url, signing_actor
            )
        if ret_data is not None:
            request.setHeader("content-type", CONTENT_TYPE_AP)
            request.write(json.dumps(ret_data).encode())
        if self.apg.verbose:
            to_log.append(f"--- RET (code: {request.code})---")
            if self.apg.verbose>=2:
                if ret_data is not None:
                    to_log.append(f"{pformat(ret_data)}")
                    to_log.append("---")
            log.info("\n".join(to_log))
        request.finish()

    async def APPostRequest(self, request: "HTTPRequest"):
        """Check signature of a POST request, then handle it with ``APRequest``

        Requests with a "digest" header already seen recently are ignored.
        """
        try:
            signing_actor = await self.checkSignature(request)
        except exceptions.EncryptionError as e:
            self.responseCode(
                request,
                http.FORBIDDEN,
                f"invalid signature: {e}"
            )
            request.finish()
            return

        # default response code, may be changed, e.g. in case of exception
        request.setResponseCode(http.ACCEPTED)

        digest = request.getHeader("digest")
        if digest in self._seen_digest:
            log.debug(f"Ignoring duplicated request (digest: {digest!r})")
            request.finish()
            return
        self._seen_digest.append(digest)

        try:
            return await self.APRequest(request, signing_actor)
        except Exception as e:
            self._onRequestError(failure.Failure(e), request)

    async def checkSigningActor(self, data: dict, signing_actor: str) -> None:
        """Check that signing actor corresponds to actor declared in data

        @param data: request payload
        @param signing_actor: actor ID of the signing entity, as returned by
            checkSignature
        @raise exceptions.NotFound: no actor found in data
        @raise exceptions.EncryptionError: signing actor doesn't match actor in data
        """
        actor = await self.apg.apGetSenderActor(data)

        if signing_actor != actor:
            raise exceptions.EncryptionError(
                f"signing actor ({signing_actor}) doesn't match actor in data ({actor})"
            )

    async def checkSignature(self, request: "HTTPRequest") -> str:
        """Check and validate HTTP signature

        @return: id of the signing actor

        @raise exceptions.EncryptionError: signature is not present or doesn't match
        @raise exceptions.InternalError: no list of headers to check is known for the
            method of the request
        """
        signature = request.getHeader("Signature")
        if signature is None:
            raise exceptions.EncryptionError("No signature found")
        sign_data = {
            m["key"]: m["uq_value"] or m["quoted_value"][1:-1]
            for m in RE_SIG_PARAM.finditer(signature)
        }
        try:
            key_id = sign_data["keyId"]
        except KeyError:
            raise exceptions.EncryptionError('"keyId" is missing from signature')
        algorithm = sign_data.get("algorithm", HS2019)
        signed_headers = sign_data.get(
            "headers",
            "(created)" if algorithm==HS2019 else "date"
        ).lower().split()
        try:
            headers_to_check = SIGN_HEADERS[None] + SIGN_HEADERS[request.method]
        except KeyError:
            raise exceptions.InternalError(
                f"there should be a list of headers for {request.method} method"
            )
        if not headers_to_check:
            raise exceptions.InternalError("headers_to_check must not be empty")

        for header in headers_to_check:
            if isinstance(header, tuple):
                # a tuple is a set of alternatives, at least one must be signed
                if not set(header).intersection(signed_headers):
                    raise exceptions.EncryptionError(
                        f"at least one of following header must be signed: {header}"
                    )
            elif header not in signed_headers:
                raise exceptions.EncryptionError(
                    f"the {header!r} header must be signed"
                )

        body = request.content.read()
        request.content.seek(0)
        # values of signed (pseudo-)headers, the keys are the names used in signature
        headers = {}
        for to_sign in signed_headers:
            if to_sign == "(request-target)":
                method = request.method.decode().lower()
                request_uri = request.uri.decode()
                headers[to_sign] = f"{method} /{request_uri.lstrip('/')}"
            elif to_sign in ("(created)", "(expires)"):
                if algorithm != HS2019:
                    raise exceptions.EncryptionError(
                        f"{to_sign!r} pseudo-header can only be used with {HS2019} "
                        "algorithm"
                    )
                # the value comes from the signature parameter of the same name
                key = to_sign[1:-1]
                value = sign_data.get(key)
                if not value:
                    raise exceptions.EncryptionError(
                        f"{key!r} parameter is missing from signature"
                    )
                try:
                    if float(value) < 0:
                        raise ValueError
                except ValueError:
                    raise exceptions.EncryptionError(
                        f"{to_sign} must be a Unix timestamp"
                    )
                headers[to_sign] = value
            else:
                value = request.getHeader(to_sign)
                if not value:
                    raise exceptions.EncryptionError(
                        f"value of header {to_sign!r} is missing!"
                    )
                elif to_sign == "host":
                    # we check Forwarded/X-Forwarded-Host headers
                    # as we need original host if a proxy has modified the header
                    forwarded = request.getHeader("forwarded")
                    if forwarded is not None:
                        try:
                            host = [
                                f[5:] for f in forwarded.split(";")
                                if f.startswith("host=")
                            ][0] or None
                        except IndexError:
                            host = None
                    else:
                        host = None
                    if host is None:
                        host = request.getHeader("x-forwarded-host")
                    if host:
                        value = host
                elif to_sign == "digest":
                    hashes = {
                        algo.lower(): hash_
                        for algo, hash_ in (
                            digest.split("=", 1) for digest in value.split(",")
                        )
                    }
                    try:
                        given_digest = hashes["sha-256"]
                    except KeyError:
                        raise exceptions.EncryptionError(
                            "Only SHA-256 algorithm is currently supported for digest"
                        )
                    __, computed_digest = self.apg.getDigest(body)
                    if given_digest != computed_digest:
                        raise exceptions.EncryptionError(
                            f"SHA-256 given and computed digest differ:\n"
                            f"given: {given_digest!r}\ncomputed: {computed_digest!r}"
                        )
                headers[to_sign] = value

        # date check
        # NOTE(review): "created" is only compared to a limit in the future, the
        #   current time is never compared to "(expires)"; confirm that this is the
        #   expected behaviour.
        limit_ts = time.time() + SIGN_EXP
        if "(created)" in headers:
            # pseudo-headers are stored with their parentheses
            created = float(headers["(created)"])
        else:
            try:
                created = date_utils.date_parse(headers["date"])
            except KeyError:
                raise exceptions.EncryptionError(
                    'either "(created)" or "date" must be signed'
                )

        try:
            expires = float(headers["(expires)"])
        except KeyError:
            pass
        else:
            if expires < created:
                log.warning(
                    f"(expires) [{expires}] set in the past of (created) [{created}] "
                    "ignoring it according to specs"
                )
            else:
                limit_ts = min(limit_ts, expires)

        if created > limit_ts:
            raise exceptions.EncryptionError("Signature has expired")

        try:
            return await self.apg.checkSignature(
                sign_data["signature"],
                key_id,
                headers
            )
        except exceptions.EncryptionError:
            if "(request-target)" not in headers:
                # the workaround below only applies to "(request-target)"
                raise
            method, url = headers["(request-target)"].rsplit(' ', 1)
            headers["(request-target)"] = f"{method} {parse.unquote(url)}"
            log.debug(
                "Using workaround for (request-target) encoding bug in signature, "
                "see https://github.com/mastodon/mastodon/issues/18871"
            )
            return await self.apg.checkSignature(
                sign_data["signature"],
                key_id,
                headers
            )

    def render(self, request):
        """Set the "server" header, then render with the default workflow"""
        request.setHeader("server", VERSION)
        return super().render(request)

    def render_GET(self, request):
        """Handle webfinger and AP GET requests asynchronously"""
        path = request.path.decode().lstrip("/")
        if path.startswith(".well-known/webfinger"):
            d = defer.ensureDeferred(self.webfinger(request))
            # without errback, the request would never be finished on error
            d.addErrback(self._onRequestError, request)
            return server.NOT_DONE_YET
        elif path.startswith(self.apg.ap_path):
            d = defer.ensureDeferred(self.APRequest(request))
            d.addErrback(self._onRequestError, request)
            return server.NOT_DONE_YET

        return web_resource.NoResource().render(request)

    def render_POST(self, request):
        """Handle AP POST requests asynchronously"""
        path = request.path.decode().lstrip("/")
        if not path.startswith(self.apg.ap_path):
            return web_resource.NoResource().render(request)
        d = defer.ensureDeferred(self.APPostRequest(request))
        # APPostRequest only handles EncryptionError while checking the signature, any
        # other exception raised there must still finish the request
        d.addErrback(self._onRequestError, request)
        return server.NOT_DONE_YET


class HTTPRequest(server.Request):
    pass


class HTTPServer(server.Site):
    requestFactory = HTTPRequest

    def __init__(self, ap_gateway):
        super().__init__(HTTPAPGServer(ap_gateway))