diff sat/plugins/plugin_comp_ap_gateway/http_server.py @ 3745:a8c7e5cef0cb

comp AP gateway: signature checking, caching and threads management: - HTTP signature is checked for incoming messages - AP actor can now be followed using pubsub subscription. When following is accepted, the node is cached - replies to posts are put in cached pubsub comment nodes, with a `comments_max_depth` option to limit the number of comment nodes for a root message (documentation will come to explain this). ticket 364
author Goffi <goffi@goffi.org>
date Tue, 22 Mar 2022 17:00:42 +0100
parents 86eea17cafa7
children 125c7043b277
line wrap: on
line diff
--- a/sat/plugins/plugin_comp_ap_gateway/http_server.py	Tue Mar 22 17:00:42 2022 +0100
+++ b/sat/plugins/plugin_comp_ap_gateway/http_server.py	Tue Mar 22 17:00:42 2022 +0100
@@ -16,14 +16,16 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
+import time
 from typing import Optional, Dict, List
 import json
 from urllib import parse
-import re
+from collections import deque
 import unicodedata
+from pprint import pformat
 
 from twisted.web import http, resource as web_resource, server
-from twisted.internet import defer
+from twisted.internet import reactor, defer
 from twisted.words.protocols.jabber import jid, error
 from wokkel import pubsub, rsm
 
@@ -31,9 +33,15 @@
 from sat.core.constants import Const as C
 from sat.core.i18n import _
 from sat.core.log import getLogger
+from sat.tools.common import date_utils
+from sat.memory.sqla_mapping import SubscriptionState
 
-from .constants import (CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_OUTBOX,
-                       AP_REQUEST_TYPES, PAGE_SIZE)
+from .constants import (
+    CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_SHARED_INBOX, TYPE_OUTBOX,
+    AP_REQUEST_TYPES, PAGE_SIZE, ACTIVITY_TYPES_LOWER, ACTIVIY_NO_ACCOUNT_ALLOWED,
+    SIGN_HEADERS, HS2019, SIGN_EXP
+)
+from .regex import RE_SIG_PARAM
 
 
 log = getLogger(__name__)
@@ -50,8 +58,19 @@
 
     def __init__(self, ap_gateway):
         self.apg = ap_gateway
+        self._seen_digest = deque(maxlen=50)
         super().__init__()
 
+    def responseCode(
+        self,
+        request: "HTTPRequest",
+        http_code: int,
+        msg: Optional[str] = None
+    ) -> None:
+        """Log and set HTTP return code and associated message"""
+        log.warning(msg)
+        request.setResponseCode(http_code, None if msg is None else msg.encode())
+
     async def webfinger(self, request):
         url_parsed = parse.urlparse(request.uri.decode())
         query = parse.parse_qs(url_parsed.query)
@@ -78,15 +97,127 @@
         request.write(json.dumps(resp).encode())
         request.finish()
 
+    async def handleFollowActivity(
+        self,
+        request: "HTTPRequest",
+        data: dict,
+        account_jid: jid.JID,
+        node: Optional[str],
+        ap_account: str,
+        ap_url: str,
+        signing_actor: str
+    ) -> None:
+        if node is None:
+            node = self.apg._m.namespace
+        client = await self.apg.getVirtualClient(signing_actor)
+        try:
+            subscription = await self.apg._p.subscribe(
+                client,
+                account_jid,
+                node
+            )
+        except pubsub.SubscriptionPending:
+            log.info(f"subscription to node {node!r} of {account_jid} is pending")
+        # TODO: manage SubscriptionUnconfigured
+        else:
+            if subscription.state != "subscribed":
+                # other states should raise an Exception
+                raise exceptions.InternalError('"subscribed" state was expected')
+            inbox = await self.apg.getAPInboxFromId(signing_actor)
+            actor_id = self.apg.buildAPURL(TYPE_ACTOR, ap_account)
+            accept_data = self.apg.createActivity(
+                "Accept", actor_id, object_=data
+            )
+            await self.apg.signAndPost(inbox, actor_id, accept_data)
+
+    async def handleAcceptActivity(
+        self,
+        request: "HTTPRequest",
+        data: dict,
+        account_jid: jid.JID,
+        node: Optional[str],
+        ap_account: str,
+        ap_url: str,
+        signing_actor: str
+    ) -> None:
+        if node is None:
+            node = self.apg._m.namespace
+        client = await self.apg.getVirtualClient(signing_actor)
+        objects = await self.apg.apGetList(data, "object")
+        for obj in objects:
+            type_ = obj.get("type")
+            if type_ == "Follow":
+                follow_node = await self.apg.host.memory.storage.getPubsubNode(
+                    client, client.jid, node, with_subscriptions=True
+                )
+                if follow_node is None:
+                    log.warning(
+                        f"Received a follow accept on an unknown node: {node!r} at "
+                        f"{client.jid}. Ignoring it"
+                    )
+                    continue
+                try:
+                    sub = next(
+                        s for s in follow_node.subscriptions if s.subscriber==account_jid
+                    )
+                except StopIteration:
+                    log.warning(
+                        "Received a follow accept on a node without subscription: "
+                        f"{node!r} at {client.jid}. Ignoring it"
+                    )
+                else:
+                    if sub.state == SubscriptionState.SUBSCRIBED:
+                        log.warning(f"Already subscribed to {node!r} at {client.jid}")
+                    elif sub.state == SubscriptionState.PENDING:
+                        follow_node.subscribed = True
+                        sub.state = SubscriptionState.SUBSCRIBED
+                        await self.apg.host.memory.storage.add(follow_node)
+                    else:
+                        raise exceptions.InternalError(
+                            f"Unhandled subscription state {sub.state!r}"
+                        )
+            else:
+                log.warning(f"Unmanaged accept type: {type_!r}")
+
+    async def handleCreateActivity(
+        self,
+        request: "HTTPRequest",
+        data: dict,
+        account_jid: Optional[jid.JID],
+        node: Optional[str],
+        ap_account: Optional[str],
+        ap_url: str,
+        signing_actor: str
+    ):
+        digest = request.getHeader("digest")
+        if digest in self._seen_digest:
+            log.debug(f"Ignoring duplicated request (digest: {digest!r})")
+            return
+        self._seen_digest.append(digest)
+        if node is None:
+            node = self.apg._m.namespace
+        client = await self.apg.getVirtualClient(signing_actor)
+        objects = await self.apg.apGetList(data, "object")
+        for obj in objects:
+            sender = await self.apg.apGetSenderActor(obj)
+            if sender != signing_actor:
+                log.warning(
+                    "Ignoring object not attributed to signing actor: {obj}"
+                )
+            else:
+                await self.apg.newAPItem(client, account_jid, node, obj)
+
     async def APActorRequest(
         self,
         request: "HTTPRequest",
         account_jid: jid.JID,
         node: Optional[str],
         ap_account: str,
-        actor_url: str
+        actor_url: str,
+        signing_actor: Optional[str]
     ) -> dict:
         inbox_url = self.apg.buildAPURL(TYPE_INBOX, ap_account)
+        shared_inbox = self.apg.buildAPURL(TYPE_SHARED_INBOX)
         outbox_url = self.apg.buildAPURL(TYPE_OUTBOX, ap_account)
 
         # we have to use AP account as preferredUsername because it is used to retrieve
@@ -107,7 +238,10 @@
                 "id": f"{actor_url}#main-key",
                 "owner": actor_url,
                 "publicKeyPem": self.apg.public_key_pem
-            }
+            },
+            "endpoints": {
+                "sharedInbox": shared_inbox
+            },
         }
 
     def getCanonicalURL(self, request: "HTTPRequest") -> str:
@@ -206,7 +340,8 @@
         account_jid: jid.JID,
         node: Optional[str],
         ap_account: str,
-        ap_url: str
+        ap_url: str,
+        signing_actor: Optional[str]
     ) -> dict:
         if node is None:
             node = self.apg._m.namespace
@@ -229,7 +364,8 @@
                 service=account_jid,
                 node=node,
                 max_items=0,
-                rsm_request=rsm.RSMRequest(max_=0)
+                rsm_request=rsm.RSMRequest(max_=0),
+                extra = {C.KEY_USE_CACHE: False}
             )
         except error.StanzaError as e:
             log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}")
@@ -255,22 +391,268 @@
             "last": url_last_page,
         }
 
-    async def APRequest(self, request):
+    async def APInboxRequest(
+        self,
+        request: "HTTPRequest",
+        account_jid: Optional[jid.JID],
+        node: Optional[str],
+        ap_account: Optional[str],
+        ap_url: str,
+        signing_actor: Optional[str]
+    ) -> None:
+        if signing_actor is None:
+            raise exceptions.InternalError("signing_actor must be set for inbox requests")
+        if node is None:
+            node = self.apg._m.namespace
+        try:
+            data = json.load(request.content)
+            if not isinstance(data, dict):
+                raise ValueError("data should be an object")
+        except (json.JSONDecodeError, ValueError) as e:
+            return self.responseCode(
+                request,
+                http.BAD_REQUEST,
+                f"invalid json in inbox request: {e}"
+            )
+        await self.checkSigningActor(data, signing_actor)
+        activity_type = (data.get("type") or "").lower()
+        if not activity_type in ACTIVITY_TYPES_LOWER:
+            return self.responseCode(
+                request,
+                http.UNSUPPORTED_MEDIA_TYPE,
+                f"request is not an activity, ignoring"
+            )
+
+        if account_jid is None and activity_type not in ACTIVIY_NO_ACCOUNT_ALLOWED:
+            return self.responseCode(
+                request,
+                http.UNSUPPORTED_MEDIA_TYPE,
+                f"{activity_type.title()!r} activity must target an account"
+            )
+
+        try:
+            method = getattr(self, f"handle{activity_type.title()}Activity")
+        except AttributeError:
+            return self.responseCode(
+                request,
+                http.UNSUPPORTED_MEDIA_TYPE,
+                f"{activity_type.title()} activity is not yet supported"
+            )
+        else:
+            await method(
+                request, data, account_jid, node, ap_account, ap_url, signing_actor
+            )
+
+    async def APRequest(
+        self,
+        request: "HTTPRequest",
+        signing_actor: Optional[str] = None
+    ) -> None:
         path = request.path.decode()
         ap_url = parse.urljoin(
             f"https://{self.apg.public_url}",
             path
         )
-        request_type, ap_account = self.apg.parseAPURL(ap_url)
-        account_jid, node = await self.apg.getJIDAndNode(ap_account)
-        if request_type not in AP_REQUEST_TYPES:
-            raise exceptions.DataError(f"Invalid request type: {request_type!r}")
-        method = getattr(self, f"AP{request_type.title()}Request")
-        ret_data = await method(request, account_jid, node, ap_account, ap_url)
-        request.setHeader("content-type", CONTENT_TYPE_AP)
-        request.write(json.dumps(ret_data).encode())
+        request_type, extra_args = self.apg.parseAPURL(ap_url)
+        if len(extra_args) == 0:
+            if request_type != "shared_inbox":
+                raise exceptions.DataError(f"Invalid request type: {request_type!r}")
+            ret_data = await self.APInboxRequest(
+                request, None, None, None, ap_url, signing_actor
+            )
+        else:
+            if len(extra_args) > 1:
+                log.warning(f"unexpected extra arguments: {extra_args!r}")
+            ap_account = extra_args[0]
+            account_jid, node = await self.apg.getJIDAndNode(ap_account)
+            if request_type not in AP_REQUEST_TYPES.get(
+                    request.method.decode().upper(), []
+            ):
+                raise exceptions.DataError(f"Invalid request type: {request_type!r}")
+            method = getattr(self, f"AP{request_type.title()}Request")
+            ret_data = await method(
+                request, account_jid, node, ap_account, ap_url, signing_actor
+            )
+        if ret_data is not None:
+            request.setHeader("content-type", CONTENT_TYPE_AP)
+            request.write(json.dumps(ret_data).encode())
         request.finish()
 
+    async def APPostRequest(self, request: "HTTPRequest"):
+        try:
+            signing_actor = await self.checkSignature(request)
+        except exceptions.EncryptionError as e:
+            self.responseCode(
+                request,
+                http.FORBIDDEN,
+                f"invalid signature: {e}"
+            )
+            request.finish()
+            return
+
+        return await self.APRequest(request, signing_actor)
+
+    async def checkSigningActor(self, data: dict, signing_actor: str) -> None:
+        """That that signing actor correspond to actor declared in data
+
+        @param data: request payload
+        @param signing_actor: actor ID of the signing entity, as returned by
+            checkSignature
+        @raise exceptions.NotFound: no actor found in data
+        @raise exceptions.EncryptionError: signing actor doesn't match actor in data
+        """
+        actor = await self.apg.apGetSenderActor(data)
+
+        if signing_actor != actor:
+            raise exceptions.EncryptionError(
+                f"signing actor ({signing_actor}) doesn't match actor in data ({actor})"
+            )
+
+    async def checkSignature(self, request: "HTTPRequest") -> str:
+        """Check and validate HTTP signature
+
+        @return: id of the signing actor
+
+        @raise exceptions.EncryptionError: signature is not present or doesn't match
+        """
+        signature = request.getHeader("Signature")
+        if signature is None:
+            raise exceptions.EncryptionError("No signature found")
+        sign_data = {
+            m["key"]: m["uq_value"] or m["quoted_value"][1:-1]
+            for m in RE_SIG_PARAM.finditer(signature)
+        }
+        try:
+            key_id = sign_data["keyId"]
+        except KeyError:
+            raise exceptions.EncryptionError('"keyId" is missing from signature')
+        algorithm = sign_data.get("algorithm", HS2019)
+        signed_headers = sign_data.get(
+            "headers",
+            "(created)" if algorithm==HS2019 else "date"
+        ).lower().split()
+        try:
+            headers_to_check = SIGN_HEADERS[None] + SIGN_HEADERS[request.method]
+        except KeyError:
+            raise exceptions.InternalError(
+                f"there should be a list of headers for {request.method} method"
+            )
+        if not headers_to_check:
+            raise exceptions.InternalError("headers_to_check must not be empty")
+
+        for header in headers_to_check:
+            if isinstance(header, tuple):
+                if len(set(header).intersection(signed_headers)) == 0:
+                    raise exceptions.EncryptionError(
+                        f"at least one of following header must be signed: {header}"
+                    )
+            elif header not in signed_headers:
+                raise exceptions.EncryptionError(
+                    f"the {header!r} header must be signed"
+                )
+
+        body = request.content.read()
+        request.content.seek(0)
+        headers = {}
+        for to_sign in signed_headers:
+            if to_sign == "(request-target)":
+                method = request.method.decode().lower()
+                uri = parse.unquote(request.uri.decode())
+                headers[to_sign] = f"{method} /{uri.lstrip('/')}"
+            elif to_sign in ("(created)", "(expires)"):
+                if algorithm != HS2019:
+                    raise exceptions.EncryptionError(
+                        f"{to_sign!r} pseudo-header can only be used with {HS2019} "
+                        "algorithm"
+                    )
+                key = to_sign[1:-1]
+                value = sign_data.get(key)
+                if not value:
+                    raise exceptions.EncryptionError(
+                        "{key!r} parameter is missing from signature"
+                    )
+                try:
+                    if float(value) < 0:
+                        raise ValueError
+                except ValueError:
+                    raise exceptions.EncryptionError(
+                        f"{to_sign} must be a Unix timestamp"
+                    )
+                headers[to_sign] = value
+            else:
+                value = request.getHeader(to_sign)
+                if not value:
+                    raise exceptions.EncryptionError(
+                        f"value of header {to_sign!r} is missing!"
+                    )
+                elif to_sign == "host":
+                    # we check Forwarded/X-Forwarded-Host headers
+                    # as we need original host if a proxy has modified the header
+                    forwarded = request.getHeader("forwarded")
+                    if forwarded is not None:
+                        try:
+                            host = [
+                                f[5:] for f in forwarded.split(";")
+                                if f.startswith("host=")
+                            ][0] or None
+                        except IndexError:
+                            host = None
+                    else:
+                        host = None
+                    if host is None:
+                        host = request.getHeader("x-forwarded-host")
+                    if host:
+                        value = host
+                elif to_sign == "digest":
+                    hashes = {
+                        algo.lower(): hash_ for algo, hash_ in (
+                            digest.split("=", 1) for digest in value.split(",")
+                        )
+                    }
+                    try:
+                        given_digest = hashes["sha-256"]
+                    except KeyError:
+                        raise exceptions.EncryptionError(
+                            "Only SHA-256 algorithm is currently supported for digest"
+                        )
+                    __, computed_digest = self.apg.getDigest(body)
+                    if given_digest != computed_digest:
+                        raise exceptions.EncryptionError(
+                            f"SHA-256 given and computed digest differ:\n"
+                            f"given: {given_digest!r}\ncomputed: {computed_digest!r}"
+                        )
+                headers[to_sign] = value
+
+        # date check
+        limit_ts = time.time() + SIGN_EXP
+        if "(created)" in headers:
+            created = float(headers["created"])
+        else:
+            created = date_utils.date_parse(headers["date"])
+
+
+        try:
+            expires = float(headers["expires"])
+        except KeyError:
+            pass
+        else:
+            if expires < created:
+                log.warning(
+                    f"(expires) [{expires}] set in the past of (created) [{created}] "
+                    "ignoring it according to specs"
+                )
+            else:
+                limit_ts = min(limit_ts, expires)
+
+        if created > limit_ts:
+            raise exceptions.EncryptionError("Signature has expired")
+
+        return await self.apg.checkSignature(
+            sign_data["signature"],
+            key_id,
+            headers
+        )
+
     def render(self, request):
         request.setHeader("server", VERSION)
         return super().render(request)
@@ -286,6 +668,13 @@
 
         return web_resource.NoResource().render(request)
 
+    def render_POST(self, request):
+        path = request.path.decode().lstrip("/")
+        if not path.startswith(self.apg.ap_path):
+            return web_resource.NoResource().render(request)
+        defer.ensureDeferred(self.APPostRequest(request))
+        return server.NOT_DONE_YET
+
 
 class HTTPRequest(server.Request):
     pass