Mercurial > libervia-backend
diff sat/plugins/plugin_comp_ap_gateway/http_server.py @ 3745:a8c7e5cef0cb
comp AP gateway: signature checking, caching and threads management:
- HTTP signature is checked for incoming messages
- AP actor can now be followed using pubsub subscription. When following is accepted, the
node is cached
- replies to posts are put in cached pubsub comment nodes, with a `comments_max_depth`
option to limit the number of comment nodes for a root message (documentation will come
to explain this).
ticket 364
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 22 Mar 2022 17:00:42 +0100 |
parents | 86eea17cafa7 |
children | 125c7043b277 |
line wrap: on
line diff
--- a/sat/plugins/plugin_comp_ap_gateway/http_server.py Tue Mar 22 17:00:42 2022 +0100 +++ b/sat/plugins/plugin_comp_ap_gateway/http_server.py Tue Mar 22 17:00:42 2022 +0100 @@ -16,14 +16,16 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +import time from typing import Optional, Dict, List import json from urllib import parse -import re +from collections import deque import unicodedata +from pprint import pformat from twisted.web import http, resource as web_resource, server -from twisted.internet import defer +from twisted.internet import reactor, defer from twisted.words.protocols.jabber import jid, error from wokkel import pubsub, rsm @@ -31,9 +33,15 @@ from sat.core.constants import Const as C from sat.core.i18n import _ from sat.core.log import getLogger +from sat.tools.common import date_utils +from sat.memory.sqla_mapping import SubscriptionState -from .constants import (CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_OUTBOX, - AP_REQUEST_TYPES, PAGE_SIZE) +from .constants import ( + CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_SHARED_INBOX, TYPE_OUTBOX, + AP_REQUEST_TYPES, PAGE_SIZE, ACTIVITY_TYPES_LOWER, ACTIVIY_NO_ACCOUNT_ALLOWED, + SIGN_HEADERS, HS2019, SIGN_EXP +) +from .regex import RE_SIG_PARAM log = getLogger(__name__) @@ -50,8 +58,19 @@ def __init__(self, ap_gateway): self.apg = ap_gateway + self._seen_digest = deque(maxlen=50) super().__init__() + def responseCode( + self, + request: "HTTPRequest", + http_code: int, + msg: Optional[str] = None + ) -> None: + """Log and set HTTP return code and associated message""" + log.warning(msg) + request.setResponseCode(http_code, None if msg is None else msg.encode()) + async def webfinger(self, request): url_parsed = parse.urlparse(request.uri.decode()) query = parse.parse_qs(url_parsed.query) @@ -78,15 +97,127 @@ request.write(json.dumps(resp).encode()) request.finish() + async def handleFollowActivity( + self, + request: "HTTPRequest", + data: dict, + account_jid: jid.JID, + node: Optional[str], + ap_account: str, + ap_url: str, + signing_actor: str + ) -> None: + if node is None: + node = self.apg._m.namespace + client = await self.apg.getVirtualClient(signing_actor) + try: + subscription = await self.apg._p.subscribe( + client, + account_jid, + node + ) + except pubsub.SubscriptionPending: + log.info(f"subscription to node {node!r} of {account_jid} is pending") + # TODO: manage SubscriptionUnconfigured + else: + if subscription.state != "subscribed": + # other states should raise an Exception + raise exceptions.InternalError('"subscribed" state was expected') + inbox = await self.apg.getAPInboxFromId(signing_actor) + actor_id = self.apg.buildAPURL(TYPE_ACTOR, ap_account) + accept_data = self.apg.createActivity( + "Accept", actor_id, object_=data + ) + await self.apg.signAndPost(inbox, actor_id, accept_data) + + async def handleAcceptActivity( + self, + request: "HTTPRequest", + data: dict, + account_jid: jid.JID, + node: Optional[str], + ap_account: str, + ap_url: str, + signing_actor: str + ) -> None: + if node is None: + node = self.apg._m.namespace + client = await self.apg.getVirtualClient(signing_actor) + objects = await self.apg.apGetList(data, "object") + for obj in objects: + type_ = obj.get("type") + if type_ == "Follow": + follow_node = await self.apg.host.memory.storage.getPubsubNode( + client, client.jid, node, with_subscriptions=True + ) + if follow_node is None: + log.warning( + f"Received a follow accept on an unknown node: {node!r} at " + f"{client.jid}. Ignoring it" + ) + continue + try: + sub = next( + s for s in follow_node.subscriptions if s.subscriber==account_jid + ) + except StopIteration: + log.warning( + "Received a follow accept on a node without subscription: " + f"{node!r} at {client.jid}. Ignoring it" + ) + else: + if sub.state == SubscriptionState.SUBSCRIBED: + log.warning(f"Already subscribed to {node!r} at {client.jid}") + elif sub.state == SubscriptionState.PENDING: + follow_node.subscribed = True + sub.state = SubscriptionState.SUBSCRIBED + await self.apg.host.memory.storage.add(follow_node) + else: + raise exceptions.InternalError( + f"Unhandled subscription state {sub.state!r}" + ) + else: + log.warning(f"Unmanaged accept type: {type_!r}") + + async def handleCreateActivity( + self, + request: "HTTPRequest", + data: dict, + account_jid: Optional[jid.JID], + node: Optional[str], + ap_account: Optional[str], + ap_url: str, + signing_actor: str + ): + digest = request.getHeader("digest") + if digest in self._seen_digest: + log.debug(f"Ignoring duplicated request (digest: {digest!r})") + return + self._seen_digest.append(digest) + if node is None: + node = self.apg._m.namespace + client = await self.apg.getVirtualClient(signing_actor) + objects = await self.apg.apGetList(data, "object") + for obj in objects: + sender = await self.apg.apGetSenderActor(obj) + if sender != signing_actor: + log.warning( + "Ignoring object not attributed to signing actor: {obj}" + ) + else: + await self.apg.newAPItem(client, account_jid, node, obj) + async def APActorRequest( self, request: "HTTPRequest", account_jid: jid.JID, node: Optional[str], ap_account: str, - actor_url: str + actor_url: str, + signing_actor: Optional[str] ) -> dict: inbox_url = self.apg.buildAPURL(TYPE_INBOX, ap_account) + shared_inbox = self.apg.buildAPURL(TYPE_SHARED_INBOX) outbox_url = self.apg.buildAPURL(TYPE_OUTBOX, ap_account) # we have to use AP account as preferredUsername because it is used to retrieve @@ -107,7 +238,10 @@ "id": f"{actor_url}#main-key", "owner": actor_url, "publicKeyPem": self.apg.public_key_pem - } + }, + "endpoints": { + "sharedInbox": shared_inbox + }, } def getCanonicalURL(self, request: "HTTPRequest") -> str: @@ -206,7 +340,8 @@ account_jid: jid.JID, node: Optional[str], ap_account: str, - ap_url: str + ap_url: str, + signing_actor: Optional[str] ) -> dict: if node is None: node = self.apg._m.namespace @@ -229,7 +364,8 @@ service=account_jid, node=node, max_items=0, - rsm_request=rsm.RSMRequest(max_=0) + rsm_request=rsm.RSMRequest(max_=0), + extra = {C.KEY_USE_CACHE: False} ) except error.StanzaError as e: log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}") @@ -255,22 +391,268 @@ "last": url_last_page, } - async def APRequest(self, request): + async def APInboxRequest( + self, + request: "HTTPRequest", + account_jid: Optional[jid.JID], + node: Optional[str], + ap_account: Optional[str], + ap_url: str, + signing_actor: Optional[str] + ) -> None: + if signing_actor is None: + raise exceptions.InternalError("signing_actor must be set for inbox requests") + if node is None: + node = self.apg._m.namespace + try: + data = json.load(request.content) + if not isinstance(data, dict): + raise ValueError("data should be an object") + except (json.JSONDecodeError, ValueError) as e: + return self.responseCode( + request, + http.BAD_REQUEST, + f"invalid json in inbox request: {e}" + ) + await self.checkSigningActor(data, signing_actor) + activity_type = (data.get("type") or "").lower() + if not activity_type in ACTIVITY_TYPES_LOWER: + return self.responseCode( + request, + http.UNSUPPORTED_MEDIA_TYPE, + f"request is not an activity, ignoring" + ) + + if account_jid is None and activity_type not in ACTIVIY_NO_ACCOUNT_ALLOWED: + return self.responseCode( + request, + http.UNSUPPORTED_MEDIA_TYPE, + f"{activity_type.title()!r} activity must target an account" + ) + + try: + method = getattr(self, f"handle{activity_type.title()}Activity") + except AttributeError: + return self.responseCode( + request, + http.UNSUPPORTED_MEDIA_TYPE, + f"{activity_type.title()} activity is not yet supported" + ) + else: + await method( + request, data, account_jid, node, ap_account, ap_url, signing_actor + ) + + async def APRequest( + self, + request: "HTTPRequest", + signing_actor: Optional[str] = None + ) -> None: path = request.path.decode() ap_url = parse.urljoin( f"https://{self.apg.public_url}", path ) - request_type, ap_account = self.apg.parseAPURL(ap_url) - account_jid, node = await self.apg.getJIDAndNode(ap_account) - if request_type not in AP_REQUEST_TYPES: - raise exceptions.DataError(f"Invalid request type: {request_type!r}") - method = getattr(self, f"AP{request_type.title()}Request") - ret_data = await method(request, account_jid, node, ap_account, ap_url) - request.setHeader("content-type", CONTENT_TYPE_AP) - request.write(json.dumps(ret_data).encode()) + request_type, extra_args = self.apg.parseAPURL(ap_url) + if len(extra_args) == 0: + if request_type != "shared_inbox": + raise exceptions.DataError(f"Invalid request type: {request_type!r}") + ret_data = await self.APInboxRequest( + request, None, None, None, ap_url, signing_actor + ) + else: + if len(extra_args) > 1: + log.warning(f"unexpected extra arguments: {extra_args!r}") + ap_account = extra_args[0] + account_jid, node = await self.apg.getJIDAndNode(ap_account) + if request_type not in AP_REQUEST_TYPES.get( + request.method.decode().upper(), [] + ): + raise exceptions.DataError(f"Invalid request type: {request_type!r}") + method = getattr(self, f"AP{request_type.title()}Request") + ret_data = await method( + request, account_jid, node, ap_account, ap_url, signing_actor + ) + if ret_data is not None: + request.setHeader("content-type", CONTENT_TYPE_AP) + request.write(json.dumps(ret_data).encode()) request.finish() + async def APPostRequest(self, request: "HTTPRequest"): + try: + signing_actor = await self.checkSignature(request) + except exceptions.EncryptionError as e: + self.responseCode( + request, + http.FORBIDDEN, + f"invalid signature: {e}" + ) + request.finish() + return + + return await self.APRequest(request, signing_actor) + + async def checkSigningActor(self, data: dict, signing_actor: str) -> None: + """That that signing actor correspond to actor declared in data + + @param data: request payload + @param signing_actor: actor ID of the signing entity, as returned by + checkSignature + @raise exceptions.NotFound: no actor found in data + @raise exceptions.EncryptionError: signing actor doesn't match actor in data + """ + actor = await self.apg.apGetSenderActor(data) + + if signing_actor != actor: + raise exceptions.EncryptionError( + f"signing actor ({signing_actor}) doesn't match actor in data ({actor})" + ) + + async def checkSignature(self, request: "HTTPRequest") -> str: + """Check and validate HTTP signature + + @return: id of the signing actor + + @raise exceptions.EncryptionError: signature is not present or doesn't match + """ + signature = request.getHeader("Signature") + if signature is None: + raise exceptions.EncryptionError("No signature found") + sign_data = { + m["key"]: m["uq_value"] or m["quoted_value"][1:-1] + for m in RE_SIG_PARAM.finditer(signature) + } + try: + key_id = sign_data["keyId"] + except KeyError: + raise exceptions.EncryptionError('"keyId" is missing from signature') + algorithm = sign_data.get("algorithm", HS2019) + signed_headers = sign_data.get( + "headers", + "(created)" if algorithm==HS2019 else "date" + ).lower().split() + try: + headers_to_check = SIGN_HEADERS[None] + SIGN_HEADERS[request.method] + except KeyError: + raise exceptions.InternalError( + f"there should be a list of headers for {request.method} method" + ) + if not headers_to_check: + raise exceptions.InternalError("headers_to_check must not be empty") + + for header in headers_to_check: + if isinstance(header, tuple): + if len(set(header).intersection(signed_headers)) == 0: + raise exceptions.EncryptionError( + f"at least one of following header must be signed: {header}" + ) + elif header not in signed_headers: + raise exceptions.EncryptionError( + f"the {header!r} header must be signed" + ) + + body = request.content.read() + request.content.seek(0) + headers = {} + for to_sign in signed_headers: + if to_sign == "(request-target)": + method = request.method.decode().lower() + uri = parse.unquote(request.uri.decode()) + headers[to_sign] = f"{method} /{uri.lstrip('/')}" + elif to_sign in ("(created)", "(expires)"): + if algorithm != HS2019: + raise exceptions.EncryptionError( + f"{to_sign!r} pseudo-header can only be used with {HS2019} " + "algorithm" + ) + key = to_sign[1:-1] + value = sign_data.get(key) + if not value: + raise exceptions.EncryptionError( + "{key!r} parameter is missing from signature" + ) + try: + if float(value) < 0: + raise ValueError + except ValueError: + raise exceptions.EncryptionError( + f"{to_sign} must be a Unix timestamp" + ) + headers[to_sign] = value + else: + value = request.getHeader(to_sign) + if not value: + raise exceptions.EncryptionError( + f"value of header {to_sign!r} is missing!" + ) + elif to_sign == "host": + # we check Forwarded/X-Forwarded-Host headers + # as we need original host if a proxy has modified the header + forwarded = request.getHeader("forwarded") + if forwarded is not None: + try: + host = [ + f[5:] for f in forwarded.split(";") + if f.startswith("host=") + ][0] or None + except IndexError: + host = None + else: + host = None + if host is None: + host = request.getHeader("x-forwarded-host") + if host: + value = host + elif to_sign == "digest": + hashes = { + algo.lower(): hash_ for algo, hash_ in ( + digest.split("=", 1) for digest in value.split(",") + ) + } + try: + given_digest = hashes["sha-256"] + except KeyError: + raise exceptions.EncryptionError( + "Only SHA-256 algorithm is currently supported for digest" + ) + __, computed_digest = self.apg.getDigest(body) + if given_digest != computed_digest: + raise exceptions.EncryptionError( + f"SHA-256 given and computed digest differ:\n" + f"given: {given_digest!r}\ncomputed: {computed_digest!r}" + ) + headers[to_sign] = value + + # date check + limit_ts = time.time() + SIGN_EXP + if "(created)" in headers: + created = float(headers["created"]) + else: + created = date_utils.date_parse(headers["date"]) + + + try: + expires = float(headers["expires"]) + except KeyError: + pass + else: + if expires < created: + log.warning( + f"(expires) [{expires}] set in the past of (created) [{created}] " + "ignoring it according to specs" + ) + else: + limit_ts = min(limit_ts, expires) + + if created > limit_ts: + raise exceptions.EncryptionError("Signature has expired") + + return await self.apg.checkSignature( + sign_data["signature"], + key_id, + headers + ) + def render(self, request): request.setHeader("server", VERSION) return super().render(request) @@ -286,6 +668,13 @@ return web_resource.NoResource().render(request) + def render_POST(self, request): + path = request.path.decode().lstrip("/") + if not path.startswith(self.apg.ap_path): + return web_resource.NoResource().render(request) + defer.ensureDeferred(self.APPostRequest(request)) + return server.NOT_DONE_YET + class HTTPRequest(server.Request): pass