Mercurial > libervia-backend
view libervia/backend/plugins/plugin_comp_ap_gateway/http_server.py @ 4141:ba8ddfdd334f
cli (loops): run GLib loop in same thread as asyncio:
use the new `install_glib_asyncio_iteration` to run GLib in the same thread as asyncio.
rel 426
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 01 Nov 2023 14:05:53 +0100 |
parents | 7067b0d73183 |
children | 5f2d496c633f |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia ActivityPub Gateway # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import time import html from typing import Optional, Dict, List, Any import json from urllib import parse from collections import deque import unicodedata from twisted.web import http, resource as web_resource, server from twisted.web import static from twisted.web import util as web_util from twisted.python import failure from twisted.internet import defer from twisted.words.protocols.jabber import jid, error from wokkel import pubsub, rsm from libervia.backend.core import exceptions from libervia.backend.core.constants import Const as C from libervia.backend.core.i18n import _ from libervia.backend.core.core_types import SatXMPPEntity from libervia.backend.core.log import getLogger from libervia.backend.tools.common import date_utils, uri from libervia.backend.memory.sqla_mapping import SubscriptionState from .constants import ( NS_AP, MEDIA_TYPE_AP, MEDIA_TYPE_AP_ALT, CONTENT_TYPE_WEBFINGER, CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_SHARED_INBOX, TYPE_OUTBOX, TYPE_EVENT, AP_REQUEST_TYPES, PAGE_SIZE, ACTIVITY_TYPES_LOWER, ACTIVIY_NO_ACCOUNT_ALLOWED, SIGN_HEADERS, HS2019, SIGN_EXP, TYPE_FOLLOWERS, TYPE_FOLLOWING, TYPE_ITEM, TYPE_LIKE, TYPE_REACTION, ST_AP_CACHE ) from .regex import RE_SIG_PARAM log = getLogger(__name__) VERSION = unicodedata.normalize( 'NFKD', f"{C.APP_NAME} ActivityPub Gateway {C.APP_VERSION}" ) class HTTPAPGServer(web_resource.Resource): """HTTP Server handling ActivityPub S2S protocol""" isLeaf = True def __init__(self, ap_gateway): self.apg = ap_gateway self._seen_digest = deque(maxlen=50) super().__init__() def response_code( self, request: "HTTPRequest", http_code: int, msg: Optional[str] = None ) -> None: """Log and set HTTP return code and associated message""" if msg is not None: log.warning(msg) request.setResponseCode(http_code, None if msg is None else msg.encode()) def _on_request_error(self, failure_: failure.Failure, request: "HTTPRequest") -> None: exc = failure_.value if isinstance(exc, exceptions.NotFound): self.response_code( request, http.NOT_FOUND, str(exc) ) else: log.exception(f"Internal error: {failure_.value}") self.response_code( request, http.INTERNAL_SERVER_ERROR, f"internal error: {failure_.value}" ) request.finish() raise failure_ request.finish() async def webfinger(self, request): url_parsed = parse.urlparse(request.uri.decode()) query = parse.parse_qs(url_parsed.query) resource = query.get("resource", [""])[0] account = resource[5:].strip() if not resource.startswith("acct:") or not account: return web_resource.ErrorPage( http.BAD_REQUEST, "Bad Request" , "Invalid webfinger resource" ).render(request) actor_url = self.apg.build_apurl(TYPE_ACTOR, account) resp = { "aliases": [actor_url], "subject": resource, "links": [ { "rel": "self", "type": "application/activity+json", "href": actor_url } ] } request.setHeader("content-type", CONTENT_TYPE_WEBFINGER) request.write(json.dumps(resp).encode()) request.finish() async def handle_undo_activity( self, request: "HTTPRequest", data: dict, account_jid: jid.JID, node: Optional[str], ap_account: str, ap_url: str, signing_actor: str ) -> None: if node is None: node = self.apg._m.namespace client = await self.apg.get_virtual_client(signing_actor) object_ = data.get("object") if isinstance(object_, str): # we check first if it's not a cached object ap_cache_key = f"{ST_AP_CACHE}{object_}" value = await self.apg.client._ap_storage.get(ap_cache_key) else: value = None if value is not None: objects = [value] # because we'll undo the activity, we can remove it from cache await self.apg.client._ap_storage.remove(ap_cache_key) else: objects = await self.apg.ap_get_list(data, "object") for obj in objects: type_ = obj.get("type") actor = await self.apg.ap_get_sender_actor(obj) if actor != signing_actor: log.warning(f"ignoring object not attributed to signing actor: {data}") continue if type_ == "Follow": try: target_account = obj["object"] except KeyError: log.warning(f'ignoring invalid object, missing "object" key: {data}') continue if not self.apg.is_local_url(target_account): log.warning(f"ignoring unfollow request to non local actor: {data}") continue await self.apg._p.unsubscribe( client, account_jid, node, sender=client.jid, ) elif type_ == "Announce": # we can use directly the Announce object, as only the "id" field is # needed await self.apg.new_ap_delete_item(client, None, node, obj) elif type_ == TYPE_LIKE: await self.handle_attachment_item(client, obj, {"noticed": False}) elif type_ == TYPE_REACTION: await self.handle_attachment_item(client, obj, { "reactions": {"operation": "update", "remove": [obj["content"]]} }) else: log.warning(f"Unmanaged undo type: {type_!r}") async def handle_follow_activity( self, request: "HTTPRequest", data: dict, account_jid: jid.JID, node: Optional[str], ap_account: str, ap_url: str, signing_actor: str ) -> None: if node is None: node = self.apg._m.namespace client = await self.apg.get_virtual_client(signing_actor) try: subscription = await self.apg._p.subscribe( client, account_jid, node, # subscriptions from AP are always public options=self.apg._pps.set_public_opt() ) except pubsub.SubscriptionPending: log.info(f"subscription to node {node!r} of {account_jid} is pending") # TODO: manage SubscriptionUnconfigured else: if subscription.state != "subscribed": # other states should raise an Exception raise exceptions.InternalError('"subscribed" state was expected') inbox = await self.apg.get_ap_inbox_from_id(signing_actor, use_shared=False) actor_id = self.apg.build_apurl(TYPE_ACTOR, ap_account) accept_data = self.apg.create_activity( "Accept", actor_id, object_=data ) await self.apg.sign_and_post(inbox, actor_id, accept_data) await self.apg._c.synchronise(client, account_jid, node, resync=False) async def handle_accept_activity( self, request: "HTTPRequest", data: dict, account_jid: jid.JID, node: Optional[str], ap_account: str, ap_url: str, signing_actor: str ) -> None: if node is None: node = self.apg._m.namespace client = await self.apg.get_virtual_client(signing_actor) objects = await self.apg.ap_get_list(data, "object") for obj in objects: type_ = obj.get("type") if type_ == "Follow": follow_node = await self.apg.host.memory.storage.get_pubsub_node( client, client.jid, node, with_subscriptions=True ) if follow_node is None: log.warning( f"Received a follow accept on an unknown node: {node!r} at " f"{client.jid}. Ignoring it" ) continue try: sub = next( s for s in follow_node.subscriptions if s.subscriber==account_jid ) except StopIteration: log.warning( "Received a follow accept on a node without subscription: " f"{node!r} at {client.jid}. Ignoring it" ) else: if sub.state == SubscriptionState.SUBSCRIBED: log.warning(f"Already subscribed to {node!r} at {client.jid}") elif sub.state == SubscriptionState.PENDING: follow_node.subscribed = True sub.state = SubscriptionState.SUBSCRIBED await self.apg.host.memory.storage.add(follow_node) else: raise exceptions.InternalError( f"Unhandled subscription state {sub.state!r}" ) else: log.warning(f"Unmanaged accept type: {type_!r}") async def handle_delete_activity( self, request: "HTTPRequest", data: dict, account_jid: Optional[jid.JID], node: Optional[str], ap_account: Optional[str], ap_url: str, signing_actor: str ): if node is None: node = self.apg._m.namespace client = await self.apg.get_virtual_client(signing_actor) objects = await self.apg.ap_get_list(data, "object") for obj in objects: await self.apg.new_ap_delete_item(client, account_jid, node, obj) async def handle_new_ap_items( self, request: "HTTPRequest", data: dict, account_jid: Optional[jid.JID], node: Optional[str], signing_actor: str, repeated: bool = False, ): """Helper method to handle workflow for new AP items accept globally the same parameter as for handle_create_activity @param repeated: if True, the item is an item republished from somewhere else """ if "_repeated" in data: log.error( '"_repeated" field already present in given AP item, this should not ' f"happen. Ignoring object from {signing_actor}\n{data}" ) raise exceptions.DataError("unexpected field in item") client = await self.apg.get_virtual_client(signing_actor) objects = await self.apg.ap_get_list(data, "object") for obj in objects: if node is None: if obj.get("type") == TYPE_EVENT: node = self.apg._events.namespace else: node = self.apg._m.namespace sender = await self.apg.ap_get_sender_actor(obj) if repeated: # we don't check sender when item is repeated, as it should be different # from post author in this case sender_jid = await self.apg.get_jid_from_id(sender) repeater_jid = await self.apg.get_jid_from_id(signing_actor) repeated_item_id = obj["id"] if self.apg.is_local_url(repeated_item_id): # the repeated object is from XMPP, we need to parse the URL to find # the right ID url_type, url_args = self.apg.parse_apurl(repeated_item_id) if url_type != "item": raise exceptions.DataError( "local URI is not an item: {repeated_id}" ) try: url_account, url_item_id = url_args if not url_account or not url_item_id: raise ValueError except (RuntimeError, ValueError): raise exceptions.DataError( "local URI is invalid: {repeated_id}" ) else: url_jid, url_node = await self.apg.get_jid_and_node(url_account) if ((url_jid != sender_jid or url_node and url_node != self.apg._m.namespace)): raise exceptions.DataError( "announced ID doesn't match sender ({sender}): " f"[repeated_item_id]" ) repeated_item_id = url_item_id obj["_repeated"] = { "by": repeater_jid.full(), "at": data.get("published"), "uri": uri.build_xmpp_uri( "pubsub", path=sender_jid.full(), node=self.apg._m.namespace, item=repeated_item_id ) } # we must use activity's id and targets, not the original item ones for field in ("id", "to", "bto", "cc", "bcc"): obj[field] = data.get(field) else: if sender != signing_actor: log.warning( "Ignoring object not attributed to signing actor: {obj}" ) continue await self.apg.new_ap_item(client, account_jid, node, obj) async def handle_create_activity( self, request: "HTTPRequest", data: dict, account_jid: Optional[jid.JID], node: Optional[str], ap_account: Optional[str], ap_url: str, signing_actor: str ): await self.handle_new_ap_items(request, data, account_jid, node, signing_actor) async def handle_update_activity( self, request: "HTTPRequest", data: dict, account_jid: Optional[jid.JID], node: Optional[str], ap_account: Optional[str], ap_url: str, signing_actor: str ): # Update is the same as create: the item ID stays the same, thus the item will be # overwritten await self.handle_new_ap_items(request, data, account_jid, node, signing_actor) async def handle_announce_activity( self, request: "HTTPRequest", data: dict, account_jid: Optional[jid.JID], node: Optional[str], ap_account: Optional[str], ap_url: str, signing_actor: str ): # we create a new item await self.handle_new_ap_items( request, data, account_jid, node, signing_actor, repeated=True ) async def handle_attachment_item( self, client: SatXMPPEntity, data: dict, attachment_data: dict ) -> None: target_ids = data.get("object") if not target_ids: raise exceptions.DataError("object should be set") elif isinstance(target_ids, list): try: target_ids = [o["id"] for o in target_ids] except (KeyError, TypeError): raise exceptions.DataError(f"invalid object: {target_ids!r}") elif isinstance(target_ids, dict): obj_id = target_ids.get("id") if not obj_id or not isinstance(obj_id, str): raise exceptions.DataError(f"invalid object: {target_ids!r}") target_ids = [obj_id] elif isinstance(target_ids, str): target_ids = [target_ids] # XXX: we have to cache AP items because some implementation (Pleroma notably) # don't keep object accessible, and we need to be able to retrieve them for # UNDO. Current implementation will grow, we need to add a way to flush it after # a while. # TODO: add a way to flush old cached AP items. await client._ap_storage.aset(f"{ST_AP_CACHE}{data['id']}", data) for target_id in target_ids: if not self.apg.is_local_url(target_id): log.debug(f"ignoring non local target ID: {target_id}") continue url_type, url_args = self.apg.parse_apurl(target_id) if url_type != TYPE_ITEM: log.warning(f"unexpected local URL for attachment on item {target_id}") continue try: account, item_id = url_args except ValueError: raise ValueError(f"invalid URL: {target_id}") author_jid, item_node = await self.apg.get_jid_and_node(account) if item_node is None: item_node = self.apg._m.namespace attachment_node = self.apg._pa.get_attachment_node_name( author_jid, item_node, item_id ) cached_node = await self.apg.host.memory.storage.get_pubsub_node( client, author_jid, attachment_node, with_subscriptions=True, create=True ) found_items, __ = await self.apg.host.memory.storage.get_items( cached_node, item_ids=[client.jid.userhost()] ) if not found_items: old_item_elt = None else: found_item = found_items[0] old_item_elt = found_item.data item_elt = await self.apg._pa.apply_set_handler( client, {"extra": attachment_data}, old_item_elt, None ) # we reparse the element, as there can be other attachments attachments_data = self.apg._pa.items_2_attachment_data(client, [item_elt]) # and we update the cache await self.apg.host.memory.storage.cache_pubsub_items( client, cached_node, [item_elt], attachments_data or [{}] ) if self.apg.is_virtual_jid(author_jid): # the attachment is on t a virtual pubsub service (linking to an AP item), # we notify all subscribers for subscription in cached_node.subscriptions: if subscription.state != SubscriptionState.SUBSCRIBED: continue self.apg.pubsub_service.notifyPublish( author_jid, attachment_node, [(subscription.subscriber, None, [item_elt])] ) else: # the attachment is on an XMPP item, we publish it to the attachment node await self.apg._p.send_items( client, author_jid, attachment_node, [item_elt] ) async def handle_like_activity( self, request: "HTTPRequest", data: dict, account_jid: Optional[jid.JID], node: Optional[str], ap_account: Optional[str], ap_url: str, signing_actor: str ) -> None: client = await self.apg.get_virtual_client(signing_actor) await self.handle_attachment_item(client, data, {"noticed": True}) async def handle_emojireact_activity( self, request: "HTTPRequest", data: dict, account_jid: Optional[jid.JID], node: Optional[str], ap_account: Optional[str], ap_url: str, signing_actor: str ) -> None: client = await self.apg.get_virtual_client(signing_actor) await self.handle_attachment_item(client, data, { "reactions": {"operation": "update", "add": [data["content"]]} }) async def handle_join_activity( self, request: "HTTPRequest", data: dict, account_jid: Optional[jid.JID], node: Optional[str], ap_account: Optional[str], ap_url: str, signing_actor: str ) -> None: client = await self.apg.get_virtual_client(signing_actor) await self.handle_attachment_item(client, data, {"rsvp": {"attending": "yes"}}) async def handle_leave_activity( self, request: "HTTPRequest", data: dict, account_jid: Optional[jid.JID], node: Optional[str], ap_account: Optional[str], ap_url: str, signing_actor: str ) -> None: client = await self.apg.get_virtual_client(signing_actor) await self.handle_attachment_item(client, data, {"rsvp": {"attending": "no"}}) async def ap_actor_request( self, request: "HTTPRequest", data: Optional[dict], account_jid: jid.JID, node: Optional[str], ap_account: str, ap_url: str, signing_actor: Optional[str] ) -> dict: inbox = self.apg.build_apurl(TYPE_INBOX, ap_account) shared_inbox = self.apg.build_apurl(TYPE_SHARED_INBOX) outbox = self.apg.build_apurl(TYPE_OUTBOX, ap_account) followers = self.apg.build_apurl(TYPE_FOLLOWERS, ap_account) following = self.apg.build_apurl(TYPE_FOLLOWING, ap_account) # we have to use AP account as preferredUsername because it is used to retrieve # actor handle (see https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196) preferred_username = ap_account.split("@", 1)[0] identity_data = await self.apg._i.get_identity(self.apg.client, account_jid) if node and node.startswith(self.apg._events.namespace): events = outbox else: events_account = await self.apg.get_ap_account_from_jid_and_node( account_jid, self.apg._events.namespace ) events = self.apg.build_apurl(TYPE_OUTBOX, events_account) actor_data = { "@context": [ "https://www.w3.org/ns/activitystreams", "https://w3id.org/security/v1" ], # XXX: Mastodon doesn't like percent-encode arobas, so we have to unescape it # if it is escaped "id": ap_url.replace("%40", "@"), "type": "Person", "preferredUsername": preferred_username, "inbox": inbox, "outbox": outbox, "events": events, "followers": followers, "following": following, "publicKey": { "id": f"{ap_url}#main-key", "owner": ap_url, "publicKeyPem": self.apg.public_key_pem }, "endpoints": { "sharedInbox": shared_inbox, "events": events, }, } if identity_data.get("nicknames"): actor_data["name"] = identity_data["nicknames"][0] if identity_data.get("description"): # description is plain text while summary expects HTML actor_data["summary"] = html.escape(identity_data["description"]) if identity_data.get("avatar"): avatar_data = identity_data["avatar"] try: filename = avatar_data["filename"] media_type = avatar_data["media_type"] except KeyError: log.error(f"incomplete avatar data: {identity_data!r}") else: avatar_url = self.apg.build_apurl("avatar", filename) actor_data["icon"] = { "type": "Image", "url": avatar_url, "mediaType": media_type } return actor_data def get_canonical_url(self, request: "HTTPRequest") -> str: return parse.urljoin( f"https://{self.apg.public_url}", request.path.decode().rstrip("/") # we unescape "@" for the same reason as in [ap_actor_request] ).replace("%40", "@") def query_data_2_rsm_request( self, query_data: Dict[str, List[str]] ) -> rsm.RSMRequest: """Get RSM kwargs to use with RSMRequest from query data""" page = query_data.get("page") if page == ["first"]: return rsm.RSMRequest(max_=PAGE_SIZE, before="") elif page == ["last"]: return rsm.RSMRequest(max_=PAGE_SIZE) else: for query_key in ("index", "before", "after"): try: kwargs={query_key: query_data[query_key][0], "max_": PAGE_SIZE} except (KeyError, IndexError, ValueError): pass else: return rsm.RSMRequest(**kwargs) raise ValueError(f"Invalid query data: {query_data!r}") async def ap_outbox_page_request( self, request: "HTTPRequest", data: Optional[dict], account_jid: jid.JID, node: Optional[str], ap_account: str, ap_url: str, query_data: Dict[str, List[str]] ) -> dict: if node is None: node = self.apg._m.namespace # we only keep useful keys, and sort to have consistent URL which can # be used as ID url_keys = sorted(set(query_data) & {"page", "index", "before", "after"}) query_data = {k: query_data[k] for k in url_keys} try: items, metadata = await self.apg._p.get_items( client=self.apg.client, service=account_jid, node=node, rsm_request=self.query_data_2_rsm_request(query_data), extra = {C.KEY_USE_CACHE: False} ) except error.StanzaError as e: log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}") return {} base_url = self.get_canonical_url(request) url = f"{base_url}?{parse.urlencode(query_data, True)}" if node and node.startswith(self.apg._events.namespace): ordered_items = [ await self.apg.ap_events.event_data_2_ap_item( self.apg._events.event_elt_2_event_data(item), account_jid ) for item in reversed(items) ] else: ordered_items = [ await self.apg.mb_data_2_ap_item( self.apg.client, await self.apg._m.item_2_mb_data( self.apg.client, item, account_jid, node ) ) for item in reversed(items) ] ret_data = { "@context": ["https://www.w3.org/ns/activitystreams"], "id": url, "type": "OrderedCollectionPage", "partOf": base_url, "orderedItems": ordered_items } if "rsm" not in metadata: # no RSM available, we return what we have return ret_data # AP OrderedCollection must be in reversed chronological order, thus the opposite # of what we get with RSM (at least with Libervia Pubsub) if not metadata["complete"]: try: last= metadata["rsm"]["last"] except KeyError: last = None ret_data["prev"] = f"{base_url}?{parse.urlencode({'after': last})}" if metadata["rsm"]["index"] != 0: try: first= metadata["rsm"]["first"] except KeyError: first = None ret_data["next"] = f"{base_url}?{parse.urlencode({'before': first})}" return ret_data async def ap_outbox_request( self, request: "HTTPRequest", data: Optional[dict], account_jid: jid.JID, node: Optional[str], ap_account: str, ap_url: str, signing_actor: Optional[str] ) -> dict: if node is None: node = self.apg._m.namespace parsed_url = parse.urlparse(request.uri.decode()) query_data = parse.parse_qs(parsed_url.query) if query_data: return await self.ap_outbox_page_request( request, data, account_jid, node, ap_account, ap_url, query_data ) # XXX: we can't use disco#info here because this request won't work on a bare jid # due to security considerations of XEP-0030 (we don't have presence # subscription). # The current workaround is to do a request as if RSM was available, and actually # check its availability according to result. try: __, metadata = await self.apg._p.get_items( client=self.apg.client, service=account_jid, node=node, max_items=0, rsm_request=rsm.RSMRequest(max_=0), extra = {C.KEY_USE_CACHE: False} ) except error.StanzaError as e: log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}") return {} try: items_count = metadata["rsm"]["count"] except KeyError: log.warning( f"No RSM metadata found when requesting pubsub node {node} at " f"{account_jid}, defaulting to items_count=20" ) items_count = 20 url = self.get_canonical_url(request) url_first_page = f"{url}?{parse.urlencode({'page': 'first'})}" url_last_page = f"{url}?{parse.urlencode({'page': 'last'})}" return { "@context": ["https://www.w3.org/ns/activitystreams"], "id": url, "totalItems": items_count, "type": "OrderedCollection", "first": url_first_page, "last": url_last_page, } async def ap_inbox_request( self, request: "HTTPRequest", data: Optional[dict], account_jid: Optional[jid.JID], node: Optional[str], ap_account: Optional[str], ap_url: str, signing_actor: Optional[str] ) -> None: assert data is not None if signing_actor is None: raise exceptions.InternalError("signing_actor must be set for inbox requests") await self.check_signing_actor(data, signing_actor) activity_type = (data.get("type") or "").lower() if not activity_type in ACTIVITY_TYPES_LOWER: return self.response_code( request, http.UNSUPPORTED_MEDIA_TYPE, f"request is not an activity, ignoring" ) if account_jid is None and activity_type not in ACTIVIY_NO_ACCOUNT_ALLOWED: return self.response_code( request, http.UNSUPPORTED_MEDIA_TYPE, f"{activity_type.title()!r} activity must target an account" ) try: method = getattr(self, f"handle_{activity_type}_activity") except AttributeError: return self.response_code( request, http.UNSUPPORTED_MEDIA_TYPE, f"{activity_type.title()} activity is not yet supported" ) else: await method( request, data, account_jid, node, ap_account, ap_url, signing_actor ) async def ap_followers_request( self, request: "HTTPRequest", data: Optional[dict], account_jid: jid.JID, node: Optional[str], ap_account: Optional[str], ap_url: str, signing_actor: Optional[str] ) -> dict: if node is None: node = self.apg._m.namespace client = self.apg.client subscribers = await self.apg._pps.get_public_node_subscriptions( client, account_jid, node ) followers = [] for subscriber in subscribers.keys(): if self.apg.is_virtual_jid(subscriber): # the subscriber is an AP user subscribed with this gateway ap_account = self.apg._e.unescape(subscriber.user) else: # regular XMPP user ap_account = await self.apg.get_ap_account_from_jid_and_node(subscriber, node) followers.append(ap_account) url = self.get_canonical_url(request) return { "@context": ["https://www.w3.org/ns/activitystreams"], "type": "OrderedCollection", "id": url, "totalItems": len(subscribers), "first": { "type": "OrderedCollectionPage", "id": url, "orderedItems": followers } } async def ap_following_request( self, request: "HTTPRequest", data: Optional[dict], account_jid: jid.JID, node: Optional[str], ap_account: Optional[str], ap_url: str, signing_actor: Optional[str] ) -> dict[str, Any]: client = self.apg.client subscriptions = await self.apg._pps.subscriptions( client, account_jid, node ) following = [] for sub_dict in subscriptions: service = jid.JID(sub_dict["service"]) if self.apg.is_virtual_jid(service): # the subscription is to an AP actor with this gateway ap_account = self.apg._e.unescape(service.user) else: # regular XMPP user ap_account = await self.apg.get_ap_account_from_jid_and_node( service, sub_dict["node"] ) following.append(ap_account) url = self.get_canonical_url(request) return { "@context": ["https://www.w3.org/ns/activitystreams"], "type": "OrderedCollection", "id": url, "totalItems": len(subscriptions), "first": { "type": "OrderedCollectionPage", "id": url, "orderedItems": following } } def _get_to_log( self, request: "HTTPRequest", data: Optional[dict] = None, ) -> List[str]: """Get base data to logs in verbose mode""" from pprint import pformat to_log = [ "", f"<<< got {request.method.decode()} request - {request.uri.decode()}" ] if data is not None: to_log.append(pformat(data)) if self.apg.verbose>=3: headers = "\n".join( f" {k.decode()}: {v.decode()}" for k,v in request.getAllHeaders().items() ) to_log.append(f" headers:\n{headers}") return to_log async def ap_request( self, request: "HTTPRequest", data: Optional[dict] = None, signing_actor: Optional[str] = None ) -> None: if self.apg.verbose: to_log = self._get_to_log(request, data) path = request.path.decode() ap_url = parse.urljoin( f"https://{self.apg.public_url}", path ) request_type, extra_args = self.apg.parse_apurl(ap_url) header_accept = request.getHeader("accept") or "" if ((MEDIA_TYPE_AP not in header_accept and MEDIA_TYPE_AP_ALT not in header_accept and request_type in self.apg.html_redirect)): # this is not a AP request, and we have a redirections for it kw = {} if extra_args: kw["jid"], kw["node"] = await self.apg.get_jid_and_node(extra_args[0]) kw["jid_user"] = kw["jid"].user if kw["node"] is None: kw["node"] = self.apg._m.namespace if len(extra_args) > 1: kw["item"] = extra_args[1] else: kw["item"] = "" else: kw["jid"], kw["jid_user"], kw["node"], kw["item"] = "", "", "", "" redirections = self.apg.html_redirect[request_type] for redirection in redirections: filters = redirection["filters"] if not filters: break # if we have filter, they must all match elif all(v in kw[k] for k,v in filters.items()): break else: # no redirection is matching redirection = None if redirection is not None: kw = {k: parse.quote(str(v), safe="") for k,v in kw.items()} target_url = redirection["url"].format(**kw) content = web_util.redirectTo(target_url.encode(), request) request.write(content) request.finish() return if len(extra_args) == 0: if request_type != "shared_inbox": raise exceptions.DataError(f"Invalid request type: {request_type!r}") ret_data = await self.ap_inbox_request( request, data, None, None, None, ap_url, signing_actor ) elif request_type == "avatar": if len(extra_args) != 1: raise exceptions.DataError("avatar argument expected in URL") avatar_filename = extra_args[0] avatar_path = self.apg.host.common_cache.getPath(avatar_filename) return static.File(str(avatar_path)).render(request) elif request_type == "item": ret_data = await self.apg.ap_get_local_object(ap_url) if "@context" not in ret_data: ret_data["@context"] = [NS_AP] else: if len(extra_args) > 1: log.warning(f"unexpected extra arguments: {extra_args!r}") ap_account = extra_args[0] account_jid, node = await self.apg.get_jid_and_node(ap_account) if request_type not in AP_REQUEST_TYPES.get( request.method.decode().upper(), [] ): raise exceptions.DataError(f"Invalid request type: {request_type!r}") method = getattr(self, f"ap_{request_type}_request") ret_data = await method( request, data, account_jid, node, ap_account, ap_url, signing_actor ) if ret_data is not None: request.setHeader("content-type", CONTENT_TYPE_AP) request.write(json.dumps(ret_data).encode()) if self.apg.verbose: to_log.append(f"--- RET (code: {request.code})---") if self.apg.verbose>=2: if ret_data is not None: from pprint import pformat to_log.append(f"{pformat(ret_data)}") to_log.append("---") log.info("\n".join(to_log)) request.finish() async def ap_post_request(self, request: "HTTPRequest") -> None: try: data = json.load(request.content) if not isinstance(data, dict): log.warning(f"JSON data should be an object (uri={request.uri.decode()})") self.response_code( request, http.BAD_REQUEST, f"invalid body, was expecting a JSON object" ) request.finish() return except (json.JSONDecodeError, ValueError) as e: self.response_code( request, http.BAD_REQUEST, f"invalid json in inbox request: {e}" ) request.finish() return else: request.content.seek(0) try: if data["type"] == "Delete" and data["actor"] == data["object"]: # we don't handle actor deletion request.setResponseCode(http.ACCEPTED) log.debug(f"ignoring actor deletion ({data['actor']})") # TODO: clean data in cache coming from this actor, maybe with a tombstone request.finish() return except KeyError: pass try: signing_actor = await self.check_signature(request) except exceptions.EncryptionError as e: if self.apg.verbose: to_log = self._get_to_log(request) to_log.append(f" body: {request.content.read()!r}") request.content.seek(0) log.info("\n".join(to_log)) self.response_code( request, http.FORBIDDEN, f"invalid signature: {e}" ) request.finish() return except Exception as e: self.response_code( request, http.INTERNAL_SERVER_ERROR, f"Can't check signature: {e}" ) request.finish() return request.setResponseCode(http.ACCEPTED) digest = request.getHeader("digest") if digest in self._seen_digest: log.debug(f"Ignoring duplicated request (digest: {digest!r})") request.finish() return self._seen_digest.append(digest) # default response code, may be changed, e.g. in case of exception try: return await self.ap_request(request, data, signing_actor) except Exception as e: self._on_request_error(failure.Failure(e), request) async def check_signing_actor(self, data: dict, signing_actor: str) -> None: """That that signing actor correspond to actor declared in data @param data: request payload @param signing_actor: actor ID of the signing entity, as returned by check_signature @raise exceptions.NotFound: no actor found in data @raise exceptions.EncryptionError: signing actor doesn't match actor in data """ actor = await self.apg.ap_get_sender_actor(data) if signing_actor != actor: raise exceptions.EncryptionError( f"signing actor ({signing_actor}) doesn't match actor in data ({actor})" ) async def check_signature(self, request: "HTTPRequest") -> str: """Check and validate HTTP signature @return: id of the signing actor @raise exceptions.EncryptionError: signature is not present or doesn't match """ signature = request.getHeader("Signature") if signature is None: raise exceptions.EncryptionError("No signature found") sign_data = { m["key"]: m["uq_value"] or m["quoted_value"][1:-1] for m in RE_SIG_PARAM.finditer(signature) } try: key_id = sign_data["keyId"] except KeyError: raise exceptions.EncryptionError('"keyId" is missing from signature') algorithm = sign_data.get("algorithm", HS2019) signed_headers = sign_data.get( "headers", "(created)" if algorithm==HS2019 else "date" ).lower().split() try: headers_to_check = SIGN_HEADERS[None] + SIGN_HEADERS[request.method] except KeyError: raise exceptions.InternalError( f"there should be a list of headers for {request.method} method" ) if not headers_to_check: raise exceptions.InternalError("headers_to_check must not be empty") for header in headers_to_check: if isinstance(header, tuple): if len(set(header).intersection(signed_headers)) == 0: raise exceptions.EncryptionError( f"at least one of following header must be signed: {header}" ) elif header not in signed_headers: raise exceptions.EncryptionError( f"the {header!r} header must be signed" ) body = request.content.read() request.content.seek(0) headers = {} for to_sign in signed_headers: if to_sign == "(request-target)": method = request.method.decode().lower() uri = request.uri.decode() headers[to_sign] = f"{method} /{uri.lstrip('/')}" elif to_sign in ("(created)", "(expires)"): if algorithm != HS2019: raise exceptions.EncryptionError( f"{to_sign!r} pseudo-header can only be used with {HS2019} " "algorithm" ) key = to_sign[1:-1] value = sign_data.get(key) if not value: raise exceptions.EncryptionError( "{key!r} parameter is missing from signature" ) try: if float(value) < 0: raise ValueError except ValueError: raise exceptions.EncryptionError( f"{to_sign} must be a Unix timestamp" ) headers[to_sign] = value else: value = request.getHeader(to_sign) if not value: raise exceptions.EncryptionError( f"value of header {to_sign!r} is missing!" ) elif to_sign == "host": # we check Forwarded/X-Forwarded-Host headers # as we need original host if a proxy has modified the header forwarded = request.getHeader("forwarded") if forwarded is not None: try: host = [ f[5:] for f in forwarded.split(";") if f.startswith("host=") ][0] or None except IndexError: host = None else: host = None if host is None: host = request.getHeader("x-forwarded-host") if host: value = host elif to_sign == "digest": hashes = { algo.lower(): hash_ for algo, hash_ in ( digest.split("=", 1) for digest in value.split(",") ) } try: given_digest = hashes["sha-256"] except KeyError: raise exceptions.EncryptionError( "Only SHA-256 algorithm is currently supported for digest" ) __, computed_digest = self.apg.get_digest(body) if given_digest != computed_digest: raise exceptions.EncryptionError( f"SHA-256 given and computed digest differ:\n" f"given: {given_digest!r}\ncomputed: {computed_digest!r}" ) headers[to_sign] = value # date check limit_ts = time.time() + SIGN_EXP if "(created)" in headers: created = float(headers["created"]) else: created = date_utils.date_parse(headers["date"]) try: expires = float(headers["expires"]) except KeyError: pass else: if expires < created: log.warning( f"(expires) [{expires}] set in the past of (created) [{created}] " "ignoring it according to specs" ) else: limit_ts = min(limit_ts, expires) if created > limit_ts: raise exceptions.EncryptionError("Signature has expired") try: return await self.apg.check_signature( sign_data["signature"], key_id, headers ) except exceptions.EncryptionError: method, url = headers["(request-target)"].rsplit(' ', 1) headers["(request-target)"] = f"{method} {parse.unquote(url)}" log.debug( "Using workaround for (request-target) encoding bug in signature, " "see https://github.com/mastodon/mastodon/issues/18871" ) return await self.apg.check_signature( sign_data["signature"], key_id, headers ) def render(self, request): request.setHeader("server", VERSION) return super().render(request) def render_GET(self, request): path = request.path.decode().lstrip("/") if path.startswith(".well-known/webfinger"): defer.ensureDeferred(self.webfinger(request)) return server.NOT_DONE_YET elif path.startswith(self.apg.ap_path): d = defer.ensureDeferred(self.ap_request(request)) d.addErrback(self._on_request_error, request) return server.NOT_DONE_YET return web_resource.NoResource().render(request) def render_POST(self, request): path = request.path.decode().lstrip("/") if not path.startswith(self.apg.ap_path): return web_resource.NoResource().render(request) defer.ensureDeferred(self.ap_post_request(request)) return server.NOT_DONE_YET class HTTPRequest(server.Request): pass class HTTPServer(server.Site): requestFactory = HTTPRequest def __init__(self, ap_gateway): super().__init__(HTTPAPGServer(ap_gateway))