diff libervia/backend/plugins/plugin_comp_ap_gateway/http_server.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author   | Goffi <goffi@goffi.org>
date     | Fri, 02 Jun 2023 11:49:51 +0200
parents  | sat/plugins/plugin_comp_ap_gateway/http_server.py@524856bd7b19
children | 13b1079c27ec
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/plugins/plugin_comp_ap_gateway/http_server.py	Fri Jun 02 11:49:51 2023 +0200
@@ -0,0 +1,1328 @@
+#!/usr/bin/env python3
+
+# Libervia ActivityPub Gateway
+# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import time
+import html
+from typing import Optional, Dict, List, Any
+import json
+from urllib import parse
+from collections import deque
+import unicodedata
+
+from twisted.web import http, resource as web_resource, server
+from twisted.web import static
+from twisted.web import util as web_util
+from twisted.python import failure
+from twisted.internet import defer
+from twisted.words.protocols.jabber import jid, error
+from wokkel import pubsub, rsm
+
+from libervia.backend.core import exceptions
+from libervia.backend.core.constants import Const as C
+from libervia.backend.core.i18n import _
+from libervia.backend.core.core_types import SatXMPPEntity
+from libervia.backend.core.log import getLogger
+from libervia.backend.tools.common import date_utils, uri
+from libervia.backend.memory.sqla_mapping import SubscriptionState
+
+from .constants import (
+    NS_AP, MEDIA_TYPE_AP, CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_SHARED_INBOX,
+    TYPE_OUTBOX, TYPE_EVENT, AP_REQUEST_TYPES, PAGE_SIZE, ACTIVITY_TYPES_LOWER,
+    ACTIVIY_NO_ACCOUNT_ALLOWED, SIGN_HEADERS, HS2019, SIGN_EXP, TYPE_FOLLOWERS,
+    TYPE_FOLLOWING, TYPE_ITEM, TYPE_LIKE, TYPE_REACTION, ST_AP_CACHE
+)
+from .regex import RE_SIG_PARAM
+
+
+log = getLogger(__name__)
+
+VERSION = unicodedata.normalize(
+    'NFKD',
+    f"{C.APP_NAME} ActivityPub Gateway {C.APP_VERSION}"
+)
+
+
+class HTTPAPGServer(web_resource.Resource):
+    """HTTP Server handling ActivityPub S2S protocol"""
+    isLeaf = True
+
+    def __init__(self, ap_gateway):
+        self.apg = ap_gateway
+        self._seen_digest = deque(maxlen=50)
+        super().__init__()
+
+    def response_code(
+        self,
+        request: "HTTPRequest",
+        http_code: int,
+        msg: Optional[str] = None
+    ) -> None:
+        """Log and set HTTP return code and associated message"""
+        if msg is not None:
+            log.warning(msg)
+        request.setResponseCode(http_code, None if msg is None else msg.encode())
+
+    def _on_request_error(self, failure_: failure.Failure, request: "HTTPRequest") -> None:
+        exc = failure_.value
+        if isinstance(exc, exceptions.NotFound):
+            self.response_code(
+                request,
+                http.NOT_FOUND,
+                str(exc)
+            )
+        else:
+            log.exception(f"Internal error: {failure_.value}")
+            self.response_code(
+                request,
+                http.INTERNAL_SERVER_ERROR,
+                f"internal error: {failure_.value}"
+            )
+            request.finish()
+            raise failure_
+
+        request.finish()
+
+    async def webfinger(self, request):
+        url_parsed = parse.urlparse(request.uri.decode())
+        query = parse.parse_qs(url_parsed.query)
+        resource = query.get("resource", [""])[0]
+        account = resource[5:].strip()
+        if not resource.startswith("acct:") or not account:
+            return web_resource.ErrorPage(
+                http.BAD_REQUEST, "Bad Request", "Invalid webfinger resource"
+            ).render(request)
+
+        actor_url = self.apg.build_apurl(TYPE_ACTOR, account)
+
+        resp = {
+            "aliases": [actor_url],
+            "subject": resource,
+            "links": [
+                {
+                    "rel": "self",
+                    "type": "application/activity+json",
+                    "href": actor_url
+                }
+            ]
+        }
+        request.setHeader("content-type", CONTENT_TYPE_AP)
+        request.write(json.dumps(resp).encode())
+        request.finish()
+
+    async def handle_undo_activity(
+        self,
+        request: "HTTPRequest",
+        data: dict,
+        account_jid: jid.JID,
+        node: Optional[str],
+        ap_account: str,
+        ap_url: str,
+        signing_actor: str
+    ) -> None:
+        if node is None:
+            node = self.apg._m.namespace
+        client = await self.apg.get_virtual_client(signing_actor)
+        object_ = data.get("object")
+        if isinstance(object_, str):
+            # we check first if it's not a cached object
+            ap_cache_key = f"{ST_AP_CACHE}{object_}"
+            value = await self.apg.client._ap_storage.get(ap_cache_key)
+        else:
+            value = None
+        if value is not None:
+            objects = [value]
+            # because we'll undo the activity, we can remove it from cache
+            await self.apg.client._ap_storage.remove(ap_cache_key)
+        else:
+            objects = await self.apg.ap_get_list(data, "object")
+        for obj in objects:
+            type_ = obj.get("type")
+            actor = await self.apg.ap_get_sender_actor(obj)
+            if actor != signing_actor:
+                log.warning(f"ignoring object not attributed to signing actor: {data}")
+                continue
+
+            if type_ == "Follow":
+                try:
+                    target_account = obj["object"]
+                except KeyError:
+                    log.warning(f'ignoring invalid object, missing "object" key: {data}')
+                    continue
+                if not self.apg.is_local_url(target_account):
+                    log.warning(f"ignoring unfollow request to non local actor: {data}")
+                    continue
+                await self.apg._p.unsubscribe(
+                    client,
+                    account_jid,
+                    node,
+                    sender=client.jid,
+                )
+            elif type_ == "Announce":
+                # we can use directly the Announce object, as only the "id" field is
+                # needed
+                await self.apg.new_ap_delete_item(client, None, node, obj)
+            elif type_ == TYPE_LIKE:
+                await self.handle_attachment_item(client, obj, {"noticed": False})
+            elif type_ == TYPE_REACTION:
+                await self.handle_attachment_item(client, obj, {
+                    "reactions": {"operation": "update", "remove": [obj["content"]]}
+                })
+            else:
+                log.warning(f"Unmanaged undo type: {type_!r}")
+
+    async def handle_follow_activity(
+        self,
+        request: "HTTPRequest",
+        data: dict,
+        account_jid: jid.JID,
+        node: Optional[str],
+        ap_account: str,
+        ap_url: str,
+        signing_actor: str
+    ) -> None:
+        if node is None:
+            node = self.apg._m.namespace
+        client = await self.apg.get_virtual_client(signing_actor)
+        try:
+            subscription = await self.apg._p.subscribe(
+                client,
+                account_jid,
+                node,
+                # subscriptions from AP are always public
+                options=self.apg._pps.set_public_opt()
+            )
+        except pubsub.SubscriptionPending:
+            log.info(f"subscription to node {node!r} of {account_jid} is pending")
+            # TODO: manage SubscriptionUnconfigured
+        else:
+            if subscription.state != "subscribed":
+                # other states should raise an Exception
+                raise exceptions.InternalError('"subscribed" state was expected')
+            inbox = await self.apg.get_ap_inbox_from_id(signing_actor, use_shared=False)
+            actor_id = self.apg.build_apurl(TYPE_ACTOR, ap_account)
+            accept_data = self.apg.create_activity(
+                "Accept", actor_id, object_=data
+            )
+            await self.apg.sign_and_post(inbox, actor_id, accept_data)
+        await self.apg._c.synchronise(client, account_jid, node, resync=False)
+
+    async def handle_accept_activity(
+        self,
+        request: "HTTPRequest",
+        data: dict,
+        account_jid: jid.JID,
+        node: Optional[str],
+        ap_account: str,
+        ap_url: str,
+        signing_actor: str
+    ) -> None:
+        if node is None:
+            node = self.apg._m.namespace
+        client = await self.apg.get_virtual_client(signing_actor)
+        objects = await self.apg.ap_get_list(data, "object")
+        for obj in objects:
+            type_ = obj.get("type")
+            if type_ == "Follow":
+                follow_node = await self.apg.host.memory.storage.get_pubsub_node(
+                    client, client.jid, node, with_subscriptions=True
+                )
+                if follow_node is None:
+                    log.warning(
+                        f"Received a follow accept on an unknown node: {node!r} at "
+                        f"{client.jid}. Ignoring it"
+                    )
+                    continue
+                try:
+                    sub = next(
+                        s for s in follow_node.subscriptions
+                        if s.subscriber == account_jid
+                    )
+                except StopIteration:
+                    log.warning(
+                        "Received a follow accept on a node without subscription: "
+                        f"{node!r} at {client.jid}. Ignoring it"
+                    )
+                else:
+                    if sub.state == SubscriptionState.SUBSCRIBED:
+                        log.warning(f"Already subscribed to {node!r} at {client.jid}")
+                    elif sub.state == SubscriptionState.PENDING:
+                        follow_node.subscribed = True
+                        sub.state = SubscriptionState.SUBSCRIBED
+                        await self.apg.host.memory.storage.add(follow_node)
+                    else:
+                        raise exceptions.InternalError(
+                            f"Unhandled subscription state {sub.state!r}"
+                        )
+            else:
+                log.warning(f"Unmanaged accept type: {type_!r}")
+
+    async def handle_delete_activity(
+        self,
+        request: "HTTPRequest",
+        data: dict,
+        account_jid: Optional[jid.JID],
+        node: Optional[str],
+        ap_account: Optional[str],
+        ap_url: str,
+        signing_actor: str
+    ):
+        if node is None:
+            node = self.apg._m.namespace
+        client = await self.apg.get_virtual_client(signing_actor)
+        objects = await self.apg.ap_get_list(data, "object")
+        for obj in objects:
+            await self.apg.new_ap_delete_item(client, account_jid, node, obj)
+
+    async def handle_new_ap_items(
+        self,
+        request: "HTTPRequest",
+        data: dict,
+        account_jid: Optional[jid.JID],
+        node: Optional[str],
+        signing_actor: str,
+        repeated: bool = False,
+    ):
+        """Helper method to handle workflow for new AP items
+
+        accepts globally the same parameters as handle_create_activity
+        @param repeated: if True, the item is an item republished from somewhere else
+        """
+        if "_repeated" in data:
+            log.error(
+                '"_repeated" field already present in given AP item, this should not '
+                f"happen. Ignoring object from {signing_actor}\n{data}"
+            )
+            raise exceptions.DataError("unexpected field in item")
+        client = await self.apg.get_virtual_client(signing_actor)
+        objects = await self.apg.ap_get_list(data, "object")
+        for obj in objects:
+            if node is None:
+                if obj.get("type") == TYPE_EVENT:
+                    node = self.apg._events.namespace
+                else:
+                    node = self.apg._m.namespace
+            sender = await self.apg.ap_get_sender_actor(obj)
+            if repeated:
+                # we don't check sender when item is repeated, as it should be different
+                # from post author in this case
+                sender_jid = await self.apg.get_jid_from_id(sender)
+                repeater_jid = await self.apg.get_jid_from_id(signing_actor)
+                repeated_item_id = obj["id"]
+                if self.apg.is_local_url(repeated_item_id):
+                    # the repeated object is from XMPP, we need to parse the URL to find
+                    # the right ID
+                    url_type, url_args = self.apg.parse_apurl(repeated_item_id)
+                    if url_type != "item":
+                        raise exceptions.DataError(
+                            f"local URI is not an item: {repeated_item_id}"
+                        )
+                    try:
+                        url_account, url_item_id = url_args
+                        if not url_account or not url_item_id:
+                            raise ValueError
+                    except (RuntimeError, ValueError):
+                        raise exceptions.DataError(
+                            f"local URI is invalid: {repeated_item_id}"
+                        )
+                    else:
+                        url_jid, url_node = await self.apg.get_jid_and_node(url_account)
+                        if ((url_jid != sender_jid
+                             or url_node and url_node != self.apg._m.namespace)):
+                            raise exceptions.DataError(
+                                f"announced ID doesn't match sender ({sender}): "
+                                f"{repeated_item_id}"
+                            )
+
+                        repeated_item_id = url_item_id
+
+                obj["_repeated"] = {
+                    "by": repeater_jid.full(),
+                    "at": data.get("published"),
+                    "uri": uri.build_xmpp_uri(
+                        "pubsub",
+                        path=sender_jid.full(),
+                        node=self.apg._m.namespace,
+                        item=repeated_item_id
+                    )
+                }
+                # we must use activity's id and targets, not the original item ones
+                for field in ("id", "to", "bto", "cc", "bcc"):
+                    obj[field] = data.get(field)
+            else:
+                if sender != signing_actor:
+                    log.warning(
+                        f"Ignoring object not attributed to signing actor: {obj}"
+                    )
+                    continue
+
+            await self.apg.new_ap_item(client, account_jid, node, obj)
+
+    async def handle_create_activity(
+        self,
+        request: "HTTPRequest",
+        data: dict,
+        account_jid: Optional[jid.JID],
+        node: Optional[str],
+        ap_account: Optional[str],
+        ap_url: str,
+        signing_actor: str
+    ):
+        await self.handle_new_ap_items(request, data, account_jid, node, signing_actor)
+
+    async def handle_update_activity(
+        self,
+        request: "HTTPRequest",
+        data: dict,
+        account_jid: Optional[jid.JID],
+        node: Optional[str],
+        ap_account: Optional[str],
+        ap_url: str,
+        signing_actor: str
+    ):
+        # Update is the same as create: the item ID stays the same, thus the item will be
+        # overwritten
+        await self.handle_new_ap_items(request, data, account_jid, node, signing_actor)
+
+    async def handle_announce_activity(
+        self,
+        request: "HTTPRequest",
+        data: dict,
+        account_jid: Optional[jid.JID],
+        node: Optional[str],
+        ap_account: Optional[str],
+        ap_url: str,
+        signing_actor: str
+    ):
+        # we create a new item
+        await self.handle_new_ap_items(
+            request,
+            data,
+            account_jid,
+            node,
+            signing_actor,
+            repeated=True
+        )
+
+    async def handle_attachment_item(
+        self,
+        client: SatXMPPEntity,
+        data: dict,
+        attachment_data: dict
+    ) -> None:
+        target_ids = data.get("object")
+        if not target_ids:
+            raise exceptions.DataError("object should be set")
+        elif isinstance(target_ids, list):
+            try:
+                target_ids = [o["id"] for o in target_ids]
+            except (KeyError, TypeError):
+                raise exceptions.DataError(f"invalid object: {target_ids!r}")
+        elif isinstance(target_ids, dict):
+            obj_id = target_ids.get("id")
+            if not obj_id or not isinstance(obj_id, str):
+                raise exceptions.DataError(f"invalid object: {target_ids!r}")
+            target_ids = [obj_id]
+        elif isinstance(target_ids, str):
+            target_ids = [target_ids]
+
+        # XXX: we have to cache AP items because some implementations (Pleroma notably)
+        #   don't keep objects accessible, and we need to be able to retrieve them for
+        #   UNDO. Current implementation will grow, we need to add a way to flush it
+        #   after a while.
+        # TODO: add a way to flush old cached AP items.
+        await client._ap_storage.aset(f"{ST_AP_CACHE}{data['id']}", data)
+
+        for target_id in target_ids:
+            if not self.apg.is_local_url(target_id):
+                log.debug(f"ignoring non local target ID: {target_id}")
+                continue
+            url_type, url_args = self.apg.parse_apurl(target_id)
+            if url_type != TYPE_ITEM:
+                log.warning(f"unexpected local URL for attachment on item {target_id}")
+                continue
+            try:
+                account, item_id = url_args
+            except ValueError:
+                raise ValueError(f"invalid URL: {target_id}")
+            author_jid, item_node = await self.apg.get_jid_and_node(account)
+            if item_node is None:
+                item_node = self.apg._m.namespace
+            attachment_node = self.apg._pa.get_attachment_node_name(
+                author_jid, item_node, item_id
+            )
+            cached_node = await self.apg.host.memory.storage.get_pubsub_node(
+                client,
+                author_jid,
+                attachment_node,
+                with_subscriptions=True,
+                create=True
+            )
+            found_items, __ = await self.apg.host.memory.storage.get_items(
+                cached_node, item_ids=[client.jid.userhost()]
+            )
+            if not found_items:
+                old_item_elt = None
+            else:
+                found_item = found_items[0]
+                old_item_elt = found_item.data
+
+            item_elt = await self.apg._pa.apply_set_handler(
+                client,
+                {"extra": attachment_data},
+                old_item_elt,
+                None
+            )
+            # we reparse the element, as there can be other attachments
+            attachments_data = self.apg._pa.items_2_attachment_data(client, [item_elt])
+            # and we update the cache
+            await self.apg.host.memory.storage.cache_pubsub_items(
+                client,
+                cached_node,
+                [item_elt],
+                attachments_data or [{}]
+            )
+
+            if self.apg.is_virtual_jid(author_jid):
+                # the attachment is on a virtual pubsub service (linking to an AP item),
+                # we notify all subscribers
+                for subscription in cached_node.subscriptions:
+                    if subscription.state != SubscriptionState.SUBSCRIBED:
+                        continue
+                    self.apg.pubsub_service.notifyPublish(
+                        author_jid,
+                        attachment_node,
+                        [(subscription.subscriber, None, [item_elt])]
+                    )
+            else:
+                # the attachment is on an XMPP item, we publish it to the attachment node
+                await self.apg._p.send_items(
+                    client, author_jid, attachment_node, [item_elt]
+                )
+
+    async def handle_like_activity(
+        self,
+        request: "HTTPRequest",
+        data: dict,
+        account_jid: Optional[jid.JID],
+        node: Optional[str],
+        ap_account: Optional[str],
+        ap_url: str,
+        signing_actor: str
+    ) -> None:
+        client = await self.apg.get_virtual_client(signing_actor)
+        await self.handle_attachment_item(client, data, {"noticed": True})
+
+    async def handle_emojireact_activity(
+        self,
+        request: "HTTPRequest",
+        data: dict,
+        account_jid: Optional[jid.JID],
+        node: Optional[str],
+        ap_account: Optional[str],
+        ap_url: str,
+        signing_actor: str
+    ) -> None:
+        client = await self.apg.get_virtual_client(signing_actor)
+        await self.handle_attachment_item(client, data, {
+            "reactions": {"operation": "update", "add": [data["content"]]}
+        })
+
+    async def handle_join_activity(
+        self,
+        request: "HTTPRequest",
+        data: dict,
+        account_jid: Optional[jid.JID],
+        node: Optional[str],
+        ap_account: Optional[str],
+        ap_url: str,
+        signing_actor: str
+    ) -> None:
+        client = await self.apg.get_virtual_client(signing_actor)
+        await self.handle_attachment_item(client, data, {"rsvp": {"attending": "yes"}})
+
+    async def handle_leave_activity(
+        self,
+        request: "HTTPRequest",
+        data: dict,
+        account_jid: Optional[jid.JID],
+        node: Optional[str],
+        ap_account: Optional[str],
+        ap_url: str,
+        signing_actor: str
+    ) -> None:
+        client = await self.apg.get_virtual_client(signing_actor)
+        await self.handle_attachment_item(client, data, {"rsvp": {"attending": "no"}})
+
+    async def ap_actor_request(
+        self,
+        request: "HTTPRequest",
+        data: Optional[dict],
+        account_jid: jid.JID,
+        node: Optional[str],
+        ap_account: str,
+        ap_url: str,
+        signing_actor: Optional[str]
+    ) -> dict:
+        inbox = self.apg.build_apurl(TYPE_INBOX, ap_account)
+        shared_inbox = self.apg.build_apurl(TYPE_SHARED_INBOX)
+        outbox = self.apg.build_apurl(TYPE_OUTBOX, ap_account)
+        followers = self.apg.build_apurl(TYPE_FOLLOWERS, ap_account)
+        following = self.apg.build_apurl(TYPE_FOLLOWING, ap_account)
+
+        # we have to use AP account as preferredUsername because it is used to retrieve
+        # actor handle (see https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196)
+        preferred_username = ap_account.split("@", 1)[0]
+
+        identity_data = await self.apg._i.get_identity(self.apg.client, account_jid)
+        if node and node.startswith(self.apg._events.namespace):
+            events = outbox
+        else:
+            events_account = await self.apg.get_ap_account_from_jid_and_node(
+                account_jid, self.apg._events.namespace
+            )
+            events = self.apg.build_apurl(TYPE_OUTBOX, events_account)
+
+        actor_data = {
+            "@context": [
+                "https://www.w3.org/ns/activitystreams",
+                "https://w3id.org/security/v1"
+            ],
+
+            # XXX: Mastodon doesn't like percent-encoded arobas, so we have to unescape
+            #   it if it is escaped
+            "id": ap_url.replace("%40", "@"),
+            "type": "Person",
+            "preferredUsername": preferred_username,
+            "inbox": inbox,
+            "outbox": outbox,
+            "events": events,
+            "followers": followers,
+            "following": following,
+            "publicKey": {
+                "id": f"{ap_url}#main-key",
+                "owner": ap_url,
+                "publicKeyPem": self.apg.public_key_pem
+            },
+            "endpoints": {
+                "sharedInbox": shared_inbox,
+                "events": events,
+            },
+        }
+
+        if identity_data.get("nicknames"):
+            actor_data["name"] = identity_data["nicknames"][0]
+        if identity_data.get("description"):
+            # description is plain text while summary expects HTML
+            actor_data["summary"] = html.escape(identity_data["description"])
+        if identity_data.get("avatar"):
+            avatar_data = identity_data["avatar"]
+            try:
+                filename = avatar_data["filename"]
+                media_type = avatar_data["media_type"]
+            except KeyError:
+                log.error(f"incomplete avatar data: {identity_data!r}")
+            else:
+                avatar_url = self.apg.build_apurl("avatar", filename)
+                actor_data["icon"] = {
+                    "type": "Image",
+                    "url": avatar_url,
+                    "mediaType": media_type
+                }
+
+        return actor_data
+
+    def get_canonical_url(self, request: "HTTPRequest") -> str:
+        return parse.urljoin(
+            f"https://{self.apg.public_url}",
+            request.path.decode().rstrip("/")
+            # we unescape "@" for the same reason as in [ap_actor_request]
+        ).replace("%40", "@")
+
+    def query_data_2_rsm_request(
+        self,
+        query_data: Dict[str, List[str]]
+    ) -> rsm.RSMRequest:
+        """Get RSM kwargs to use with RSMRequest from query data"""
+        page = query_data.get("page")
+
+        if page == ["first"]:
+            return rsm.RSMRequest(max_=PAGE_SIZE, before="")
+        elif page == ["last"]:
+            return rsm.RSMRequest(max_=PAGE_SIZE)
+        else:
+            for query_key in ("index", "before", "after"):
+                try:
+                    kwargs = {query_key: query_data[query_key][0], "max_": PAGE_SIZE}
+                except (KeyError, IndexError, ValueError):
+                    pass
+                else:
+                    return rsm.RSMRequest(**kwargs)
+        raise ValueError(f"Invalid query data: {query_data!r}")
+
+    async def ap_outbox_page_request(
+        self,
+        request: "HTTPRequest",
+        data: Optional[dict],
+        account_jid: jid.JID,
+        node: Optional[str],
+        ap_account: str,
+        ap_url: str,
+        query_data: Dict[str, List[str]]
+    ) -> dict:
+        if node is None:
+            node = self.apg._m.namespace
+        # we only keep useful keys, and sort to have a consistent URL which can
+        # be used as ID
+        url_keys = sorted(set(query_data) & {"page", "index", "before", "after"})
+        query_data = {k: query_data[k] for k in url_keys}
+        try:
+            items, metadata = await self.apg._p.get_items(
+                client=self.apg.client,
+                service=account_jid,
+                node=node,
+                rsm_request=self.query_data_2_rsm_request(query_data),
+                extra={C.KEY_USE_CACHE: False}
+            )
+        except error.StanzaError as e:
+            log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}")
+            return {}
+
+        base_url = self.get_canonical_url(request)
+        url = f"{base_url}?{parse.urlencode(query_data, True)}"
+        if node and node.startswith(self.apg._events.namespace):
+            ordered_items = [
+                await self.apg.ap_events.event_data_2_ap_item(
+                    self.apg._events.event_elt_2_event_data(item),
+                    account_jid
+                )
+                for item in reversed(items)
+            ]
+        else:
+            ordered_items = [
+                await self.apg.mb_data_2_ap_item(
+                    self.apg.client,
+                    await self.apg._m.item_2_mb_data(
+                        self.apg.client,
+                        item,
+                        account_jid,
+                        node
+                    )
+                )
+                for item in reversed(items)
+            ]
+        ret_data = {
+            "@context": ["https://www.w3.org/ns/activitystreams"],
+            "id": url,
+            "type": "OrderedCollectionPage",
+            "partOf": base_url,
+            "orderedItems": ordered_items
+        }
+
+        if "rsm" not in metadata:
+            # no RSM available, we return what we have
+            return ret_data
+
+        # AP OrderedCollection must be in reversed chronological order, thus the opposite
+        # of what we get with RSM (at least with Libervia Pubsub)
+        if not metadata["complete"]:
+            try:
+                last = metadata["rsm"]["last"]
+            except KeyError:
+                last = None
+            ret_data["prev"] = f"{base_url}?{parse.urlencode({'after': last})}"
+        if metadata["rsm"]["index"] != 0:
+            try:
+                first = metadata["rsm"]["first"]
+            except KeyError:
+                first = None
+            ret_data["next"] = f"{base_url}?{parse.urlencode({'before': first})}"
+
+        return ret_data
+
+    async def ap_outbox_request(
+        self,
+        request: "HTTPRequest",
+        data: Optional[dict],
+        account_jid: jid.JID,
+        node: Optional[str],
+        ap_account: str,
+        ap_url: str,
+        signing_actor: Optional[str]
+    ) -> dict:
+        if node is None:
+            node = self.apg._m.namespace
+
+        parsed_url = parse.urlparse(request.uri.decode())
+        query_data = parse.parse_qs(parsed_url.query)
+        if query_data:
+            return await self.ap_outbox_page_request(
+                request, data, account_jid, node, ap_account, ap_url, query_data
+            )
+
+        # XXX: we can't use disco#info here because this request won't work on a bare jid
+        #   due to security considerations of XEP-0030 (we don't have presence
+        #   subscription).
+        #   The current workaround is to do a request as if RSM was available, and
+        #   actually check its availability according to the result.
+        try:
+            __, metadata = await self.apg._p.get_items(
+                client=self.apg.client,
+                service=account_jid,
+                node=node,
+                max_items=0,
+                rsm_request=rsm.RSMRequest(max_=0),
+                extra={C.KEY_USE_CACHE: False}
+            )
+        except error.StanzaError as e:
+            log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}")
+            return {}
+        try:
+            items_count = metadata["rsm"]["count"]
+        except KeyError:
+            log.warning(
+                f"No RSM metadata found when requesting pubsub node {node} at "
+                f"{account_jid}, defaulting to items_count=20"
+            )
+            items_count = 20
+
+        url = self.get_canonical_url(request)
+        url_first_page = f"{url}?{parse.urlencode({'page': 'first'})}"
+        url_last_page = f"{url}?{parse.urlencode({'page': 'last'})}"
+        return {
+            "@context": ["https://www.w3.org/ns/activitystreams"],
+            "id": url,
+            "totalItems": items_count,
+            "type": "OrderedCollection",
+            "first": url_first_page,
+            "last": url_last_page,
+        }
+
+    async def ap_inbox_request(
+        self,
+        request: "HTTPRequest",
+        data: Optional[dict],
+        account_jid: Optional[jid.JID],
+        node: Optional[str],
+        ap_account: Optional[str],
+        ap_url: str,
+        signing_actor: Optional[str]
+    ) -> None:
+        assert data is not None
+        if signing_actor is None:
+            raise exceptions.InternalError("signing_actor must be set for inbox requests")
+        await self.check_signing_actor(data, signing_actor)
+        activity_type = (data.get("type") or "").lower()
+        if activity_type not in ACTIVITY_TYPES_LOWER:
+            return self.response_code(
+                request,
+                http.UNSUPPORTED_MEDIA_TYPE,
+                "request is not an activity, ignoring"
+            )
+
+        if account_jid is None and activity_type not in ACTIVIY_NO_ACCOUNT_ALLOWED:
+            return self.response_code(
+                request,
+                http.UNSUPPORTED_MEDIA_TYPE,
+                f"{activity_type.title()!r} activity must target an account"
+            )
+
+        try:
+            method = getattr(self, f"handle_{activity_type}_activity")
+        except AttributeError:
+            return self.response_code(
+                request,
+                http.UNSUPPORTED_MEDIA_TYPE,
+                f"{activity_type.title()} activity is not yet supported"
+            )
+        else:
+            await method(
+                request, data, account_jid, node, ap_account, ap_url, signing_actor
+            )
+
+    async def ap_followers_request(
+        self,
+        request: "HTTPRequest",
+        data: Optional[dict],
+        account_jid: jid.JID,
+        node: Optional[str],
+        ap_account: Optional[str],
+        ap_url: str,
+        signing_actor: Optional[str]
+    ) -> dict:
+        if node is None:
+            node = self.apg._m.namespace
+        client = self.apg.client
+        subscribers = await self.apg._pps.get_public_node_subscriptions(
+            client, account_jid, node
+        )
+        followers = []
+        for subscriber in subscribers.keys():
+            if self.apg.is_virtual_jid(subscriber):
+                # the subscriber is an AP user subscribed with this gateway
+                ap_account = self.apg._e.unescape(subscriber.user)
+            else:
+                # regular XMPP user
+                ap_account = await self.apg.get_ap_account_from_jid_and_node(
+                    subscriber, node
+                )
+            followers.append(ap_account)
+
+        url = self.get_canonical_url(request)
+        return {
+            "@context": ["https://www.w3.org/ns/activitystreams"],
+            "type": "OrderedCollection",
+            "id": url,
+            "totalItems": len(subscribers),
+            "first": {
+                "type": "OrderedCollectionPage",
+                "id": url,
+                "orderedItems": followers
+            }
+        }
+
+    async def ap_following_request(
+        self,
+        request: "HTTPRequest",
+        data: Optional[dict],
+        account_jid: jid.JID,
+        node: Optional[str],
+        ap_account: Optional[str],
+        ap_url: str,
+        signing_actor: Optional[str]
+    ) -> dict[str, Any]:
+        client = self.apg.client
+        subscriptions = await self.apg._pps.subscriptions(
+            client, account_jid, node
+        )
+        following = []
+        for sub_dict in subscriptions:
+            service = jid.JID(sub_dict["service"])
+            if self.apg.is_virtual_jid(service):
+                # the subscription is to an AP actor with this gateway
+                ap_account = self.apg._e.unescape(service.user)
+            else:
+                # regular XMPP user
+                ap_account = await self.apg.get_ap_account_from_jid_and_node(
+                    service, sub_dict["node"]
+                )
+            following.append(ap_account)
+
+        url = self.get_canonical_url(request)
+        return {
+            "@context": ["https://www.w3.org/ns/activitystreams"],
+            "type": "OrderedCollection",
+            "id": url,
+            "totalItems": len(subscriptions),
+            "first": {
+                "type": "OrderedCollectionPage",
+                "id": url,
+                "orderedItems": following
+            }
+        }
+
+    def _get_to_log(
+        self,
+        request: "HTTPRequest",
+        data: Optional[dict] = None,
+    ) -> List[str]:
+        """Get base data to log in verbose mode"""
+        from pprint import pformat
+        to_log = [
+            "",
+            f"<<< got {request.method.decode()} request - {request.uri.decode()}"
+        ]
+        if data is not None:
+            to_log.append(pformat(data))
+        if self.apg.verbose >= 3:
+            headers = "\n".join(
+                f" {k.decode()}: {v.decode()}"
+                for k, v in request.getAllHeaders().items()
+            )
+            to_log.append(f" headers:\n{headers}")
+        return to_log
+
+    async def ap_request(
+        self,
+        request: "HTTPRequest",
+        data: Optional[dict] = None,
+        signing_actor: Optional[str] = None
+    ) -> None:
+        if self.apg.verbose:
+            to_log = self._get_to_log(request, data)
+
+        path = request.path.decode()
+        ap_url = parse.urljoin(
+            f"https://{self.apg.public_url}",
+            path
+        )
+        request_type, extra_args = self.apg.parse_apurl(ap_url)
+        if ((MEDIA_TYPE_AP not in (request.getHeader("accept") or "")
+             and request_type in self.apg.html_redirect)):
+            # this is not an AP request, and we have a redirection for it
+            kw = {}
+            if extra_args:
+                kw["jid"], kw["node"] = await self.apg.get_jid_and_node(extra_args[0])
+                kw["jid_user"] = kw["jid"].user
+                if kw["node"] is None:
+                    kw["node"] = self.apg._m.namespace
+                if len(extra_args) > 1:
+                    kw["item"] = extra_args[1]
+                else:
+                    kw["item"] = ""
+            else:
+                kw["jid"], kw["jid_user"], kw["node"], kw["item"] = "", "", "", ""
+
+            redirections = self.apg.html_redirect[request_type]
+            for redirection in redirections:
+                filters = redirection["filters"]
+                if not filters:
+                    break
+                # if we have filters, they must all match
+                elif all(v in kw[k] for k, v in filters.items()):
+                    break
+            else:
+                # no redirection is matching
+                redirection = None
+
+            if redirection is not None:
+                kw = {k: parse.quote(str(v), safe="") for k, v in kw.items()}
+                target_url = redirection["url"].format(**kw)
+                content = web_util.redirectTo(target_url.encode(), request)
+                request.write(content)
+                request.finish()
+                return
+
+        if len(extra_args) == 0:
+            if request_type != "shared_inbox":
+                raise exceptions.DataError(f"Invalid request type: {request_type!r}")
+            ret_data = await self.ap_inbox_request(
+                request, data, None, None, None, ap_url, signing_actor
+            )
+        elif request_type == "avatar":
+            if len(extra_args) != 1:
+                raise exceptions.DataError("avatar argument expected in URL")
+            avatar_filename = extra_args[0]
+            avatar_path = self.apg.host.common_cache.getPath(avatar_filename)
+            return static.File(str(avatar_path)).render(request)
+        elif request_type == "item":
+            ret_data = await self.apg.ap_get_local_object(ap_url)
+            if "@context" not in ret_data:
+                ret_data["@context"] = [NS_AP]
+        else:
+            if len(extra_args) > 1:
+                log.warning(f"unexpected extra arguments: {extra_args!r}")
+            ap_account = extra_args[0]
+            account_jid, node = await self.apg.get_jid_and_node(ap_account)
+            if request_type not in AP_REQUEST_TYPES.get(
+                request.method.decode().upper(), []
+            ):
+                raise exceptions.DataError(f"Invalid request type: {request_type!r}")
+            method = getattr(self, f"ap_{request_type}_request")
+            ret_data = await method(
+                request, data, account_jid, node, ap_account, ap_url, signing_actor
+            )
+        if ret_data is not None:
+            request.setHeader("content-type", CONTENT_TYPE_AP)
+            request.write(json.dumps(ret_data).encode())
+        if self.apg.verbose:
+            to_log.append(f"--- RET (code: {request.code})---")
+            if self.apg.verbose >= 2:
+                if ret_data is not None:
+                    from pprint import pformat
+                    to_log.append(f"{pformat(ret_data)}")
+            to_log.append("---")
+            log.info("\n".join(to_log))
+        request.finish()
+
+    async def ap_post_request(self, request: "HTTPRequest") -> None:
+        try:
+            data = json.load(request.content)
+            if not isinstance(data, dict):
+                log.warning(f"JSON data should be an object (uri={request.uri.decode()})")
+                self.response_code(
+                    request,
+                    http.BAD_REQUEST,
+                    "invalid body, was expecting a JSON object"
+                )
+                request.finish()
+                return
+        except (json.JSONDecodeError, ValueError) as e:
+            self.response_code(
+                request,
+                http.BAD_REQUEST,
+                f"invalid json in inbox request: {e}"
+            )
+            request.finish()
+            return
+        else:
+            request.content.seek(0)
+
+        try:
+            if data["type"] == "Delete" and data["actor"] == data["object"]:
+                # we don't handle actor deletion
+                request.setResponseCode(http.ACCEPTED)
+                log.debug(f"ignoring actor deletion ({data['actor']})")
+                # TODO: clean data in cache coming from this actor, maybe with a
+                #   tombstone
+                request.finish()
+                return
+        except KeyError:
+            pass
+
+        try:
+            signing_actor = await self.check_signature(request)
+        except exceptions.EncryptionError as e:
+            if self.apg.verbose:
+                to_log = self._get_to_log(request)
+                to_log.append(f" body: {request.content.read()!r}")
+                request.content.seek(0)
+                log.info("\n".join(to_log))
+            self.response_code(
+                request,
+                http.FORBIDDEN,
+                f"invalid signature: {e}"
+            )
+            request.finish()
+            return
+        except Exception as e:
+            self.response_code(
+                request,
+                http.INTERNAL_SERVER_ERROR,
+                f"Can't check signature: {e}"
+            )
+            request.finish()
+            return
+
+        request.setResponseCode(http.ACCEPTED)
+
+        digest = request.getHeader("digest")
+        if digest in self._seen_digest:
+            log.debug(f"Ignoring duplicated request (digest: {digest!r})")
+            request.finish()
+            return
+        self._seen_digest.append(digest)
+
+        # default response code, may be changed, e.g. in case of exception
+        try:
+            return await self.ap_request(request, data, signing_actor)
+        except Exception as e:
+            self._on_request_error(failure.Failure(e), request)
+
+    async def check_signing_actor(self, data: dict, signing_actor: str) -> None:
+        """Check that the signing actor corresponds to the actor declared in data
+
+        @param data: request payload
+        @param signing_actor: actor ID of the signing entity, as returned by
+            check_signature
+        @raise exceptions.NotFound: no actor found in data
+        @raise exceptions.EncryptionError: signing actor doesn't match actor in data
+        """
+        actor = await self.apg.ap_get_sender_actor(data)
+
+        if signing_actor != actor:
+            raise exceptions.EncryptionError(
+                f"signing actor ({signing_actor}) doesn't match actor in data ({actor})"
+            )
+
+    async def check_signature(self, request: "HTTPRequest") -> str:
+        """Check and validate HTTP signature
+
+        @return: id of the signing actor
+
+        @raise exceptions.EncryptionError: signature is not present or doesn't match
+        """
+        signature = request.getHeader("Signature")
+        if signature is None:
+            raise exceptions.EncryptionError("No signature found")
+        sign_data = {
+            m["key"]: m["uq_value"] or m["quoted_value"][1:-1]
+            for m in RE_SIG_PARAM.finditer(signature)
+        }
+        try:
+            key_id = sign_data["keyId"]
+        except KeyError:
+            raise exceptions.EncryptionError('"keyId" is missing from signature')
+        algorithm = sign_data.get("algorithm", HS2019)
+        signed_headers = sign_data.get(
+            "headers",
+            "(created)" if algorithm == HS2019 else "date"
+        ).lower().split()
+        try:
+            headers_to_check = SIGN_HEADERS[None] + SIGN_HEADERS[request.method]
+        except KeyError:
+            raise exceptions.InternalError(
+                f"there should be a list of headers for {request.method} method"
+            )
+        if not headers_to_check:
+            raise exceptions.InternalError("headers_to_check must not be empty")
+
+        for header in headers_to_check:
+            if isinstance(header, tuple):
+                if len(set(header).intersection(signed_headers)) == 0:
+                    raise exceptions.EncryptionError(
+                        f"at least one of the following headers must be signed: {header}"
+                    )
+            elif header not in signed_headers:
+                raise exceptions.EncryptionError(
+                    f"the {header!r} header must be signed"
+                )
+
+        body = request.content.read()
+        request.content.seek(0)
+        headers = {}
+        for to_sign in signed_headers:
+            if to_sign == "(request-target)":
+                method = request.method.decode().lower()
+                uri = request.uri.decode()
+                headers[to_sign] = f"{method} /{uri.lstrip('/')}"
+            elif to_sign in ("(created)", "(expires)"):
+                if algorithm != HS2019:
+                    raise exceptions.EncryptionError(
+                        f"{to_sign!r} pseudo-header can only be used with {HS2019} "
+                        "algorithm"
+                    )
+                key = to_sign[1:-1]
+                value = sign_data.get(key)
+                if not value:
+                    raise exceptions.EncryptionError(
                        f"{key!r} parameter is missing from signature"
+                    )
+                try:
+                    if float(value) < 0:
+                        raise ValueError
+                except ValueError:
+                    raise exceptions.EncryptionError(
+                        f"{to_sign} must be a Unix timestamp"
+                    )
+                headers[to_sign] = value
+            else:
+                value = request.getHeader(to_sign)
+                if not value:
+                    raise exceptions.EncryptionError(
+                        f"value of header {to_sign!r} is missing!"
+                    )
+                elif to_sign == "host":
+                    # we check Forwarded/X-Forwarded-Host headers
+                    # as we need the original host if a proxy has modified the header
+                    forwarded = request.getHeader("forwarded")
+                    if forwarded is not None:
+                        try:
+                            host = [
+                                f[5:] for f in forwarded.split(";")
+                                if f.startswith("host=")
+                            ][0] or None
+                        except IndexError:
+                            host = None
+                    else:
+                        host = None
+                    if host is None:
+                        host = request.getHeader("x-forwarded-host")
+                    if host:
+                        value = host
+                elif to_sign == "digest":
+                    hashes = {
+                        algo.lower(): hash_ for algo, hash_ in (
+                            digest.split("=", 1) for digest in value.split(",")
+                        )
+                    }
+                    try:
+                        given_digest = hashes["sha-256"]
+                    except KeyError:
+                        raise exceptions.EncryptionError(
+                            "Only SHA-256 algorithm is currently supported for digest"
+                        )
+                    __, computed_digest = self.apg.get_digest(body)
+                    if given_digest != computed_digest:
+                        raise exceptions.EncryptionError(
+                            f"SHA-256 given and computed digest differ:\n"
+                            f"given: {given_digest!r}\ncomputed: {computed_digest!r}"
+                        )
+                headers[to_sign] = value
+
+        # date check
+        limit_ts = time.time() + SIGN_EXP
+        if "(created)" in headers:
+            created = float(headers["(created)"])
+        else:
+            created = date_utils.date_parse(headers["date"])
+
+        try:
+            expires = float(headers["(expires)"])
+        except KeyError:
+            pass
+        else:
+            if expires < created:
+                log.warning(
+                    f"(expires) [{expires}] set in the past of (created) [{created}], "
+                    "ignoring it according to specs"
+                )
+            else:
+                limit_ts = min(limit_ts, expires)
+
+        if created > limit_ts:
+            raise exceptions.EncryptionError("Signature has expired")
+
+        try:
+            return await self.apg.check_signature(
+                sign_data["signature"],
+                key_id,
+                headers
+            )
+        except exceptions.EncryptionError:
+            method, url = headers["(request-target)"].rsplit(' ', 1)
+            headers["(request-target)"] = f"{method} {parse.unquote(url)}"
+            log.debug(
+                "Using workaround for (request-target) encoding bug in signature, "
+                "see https://github.com/mastodon/mastodon/issues/18871"
+            )
+            return await self.apg.check_signature(
+                sign_data["signature"],
+                key_id,
+                headers
+            )
+
+    def render(self, request):
+        request.setHeader("server", VERSION)
+        return super().render(request)
+
+    def render_GET(self, request):
+        path = request.path.decode().lstrip("/")
+        if path.startswith(".well-known/webfinger"):
+            defer.ensureDeferred(self.webfinger(request))
+            return server.NOT_DONE_YET
+        elif path.startswith(self.apg.ap_path):
+            d = defer.ensureDeferred(self.ap_request(request))
+            d.addErrback(self._on_request_error, request)
+            return server.NOT_DONE_YET
+
+        return web_resource.NoResource().render(request)
+
+    def render_POST(self, request):
+        path = request.path.decode().lstrip("/")
+        if not path.startswith(self.apg.ap_path):
+            return web_resource.NoResource().render(request)
+        defer.ensureDeferred(self.ap_post_request(request))
+        return server.NOT_DONE_YET
+
+
+class HTTPRequest(server.Request):
+    pass
+
+
+class HTTPServer(server.Site):
+    requestFactory = HTTPRequest
+
+    def __init__(self, ap_gateway):
+        super().__init__(HTTPAPGServer(ap_gateway))