comparison sat/plugins/plugin_comp_ap_gateway/http_server.py @ 3745:a8c7e5cef0cb

comp AP gateway: signature checking, caching and threads management: - HTTP signature is checked for incoming messages - AP actor can now be followed using pubsub subscription. When following is accepted, the node is cached - replies to posts are put in cached pubsub comment nodes, with a `comments_max_depth` option to limit the number of comment nodes for a root message (documentation will come to explain this). ticket 364
author Goffi <goffi@goffi.org>
date Tue, 22 Mar 2022 17:00:42 +0100
parents 86eea17cafa7
children 125c7043b277
comparison
equal deleted inserted replaced
3744:658ddbabaf36 3745:a8c7e5cef0cb
14 # GNU Affero General Public License for more details. 14 # GNU Affero General Public License for more details.
15 15
16 # You should have received a copy of the GNU Affero General Public License 16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. 17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 18
19 import time
19 from typing import Optional, Dict, List 20 from typing import Optional, Dict, List
20 import json 21 import json
21 from urllib import parse 22 from urllib import parse
22 import re 23 from collections import deque
23 import unicodedata 24 import unicodedata
25 from pprint import pformat
24 26
25 from twisted.web import http, resource as web_resource, server 27 from twisted.web import http, resource as web_resource, server
26 from twisted.internet import defer 28 from twisted.internet import reactor, defer
27 from twisted.words.protocols.jabber import jid, error 29 from twisted.words.protocols.jabber import jid, error
28 from wokkel import pubsub, rsm 30 from wokkel import pubsub, rsm
29 31
30 from sat.core import exceptions 32 from sat.core import exceptions
31 from sat.core.constants import Const as C 33 from sat.core.constants import Const as C
32 from sat.core.i18n import _ 34 from sat.core.i18n import _
33 from sat.core.log import getLogger 35 from sat.core.log import getLogger
34 36 from sat.tools.common import date_utils
35 from .constants import (CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_OUTBOX, 37 from sat.memory.sqla_mapping import SubscriptionState
36 AP_REQUEST_TYPES, PAGE_SIZE) 38
39 from .constants import (
40 CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_SHARED_INBOX, TYPE_OUTBOX,
41 AP_REQUEST_TYPES, PAGE_SIZE, ACTIVITY_TYPES_LOWER, ACTIVIY_NO_ACCOUNT_ALLOWED,
42 SIGN_HEADERS, HS2019, SIGN_EXP
43 )
44 from .regex import RE_SIG_PARAM
37 45
38 46
39 log = getLogger(__name__) 47 log = getLogger(__name__)
40 48
41 VERSION = unicodedata.normalize( 49 VERSION = unicodedata.normalize(
48 """HTTP Server handling ActivityPub S2S protocol""" 56 """HTTP Server handling ActivityPub S2S protocol"""
49 isLeaf = True 57 isLeaf = True
50 58
def __init__(self, ap_gateway):
    """Store the gateway and prepare the history of seen digests

    @param ap_gateway: ActivityPub gateway instance, kept as ``self.apg``
    """
    self.apg = ap_gateway
    # bounded history (50 entries) of "digest" headers, checked by
    # handleCreateActivity to ignore requests received more than once
    self._seen_digest = deque([], 50)
    super().__init__()
63
def responseCode(
    self,
    request: "HTTPRequest",
    http_code: int,
    msg: Optional[str] = None
) -> None:
    """Log a warning and set the HTTP status code of the response

    @param request: request being answered
    @param http_code: HTTP status code to set on the response
    @param msg: text logged as a warning; when not None it is also sent,
        encoded, as the message associated with the status code
    """
    log.warning(msg)
    status_message = msg.encode() if msg is not None else None
    request.setResponseCode(http_code, status_message)
54 73
55 async def webfinger(self, request): 74 async def webfinger(self, request):
56 url_parsed = parse.urlparse(request.uri.decode()) 75 url_parsed = parse.urlparse(request.uri.decode())
57 query = parse.parse_qs(url_parsed.query) 76 query = parse.parse_qs(url_parsed.query)
58 resource = query.get("resource", [""])[0] 77 resource = query.get("resource", [""])[0]
76 } 95 }
77 request.setHeader("content-type", CONTENT_TYPE_AP) 96 request.setHeader("content-type", CONTENT_TYPE_AP)
78 request.write(json.dumps(resp).encode()) 97 request.write(json.dumps(resp).encode())
79 request.finish() 98 request.finish()
80 99
async def handleFollowActivity(
    self,
    request: "HTTPRequest",
    data: dict,
    account_jid: jid.JID,
    node: Optional[str],
    ap_account: str,
    ap_url: str,
    signing_actor: str
) -> None:
    """Handle a "Follow" activity by subscribing to the targeted pubsub node

    The virtual client of the signing actor is subscribed to the node. When
    the subscription is effective, an "Accept" activity wrapping the received
    data is signed and posted to the inbox of the signing actor.

    @param request: incoming HTTP request (not used here)
    @param data: payload of the "Follow" activity
    @param account_jid: JID of the followed entity
    @param node: pubsub node to follow, ``self.apg._m.namespace`` is used if None
    @param ap_account: AP account used to build the actor URL of the "Accept"
    @param ap_url: URL of the request (not used here)
    @param signing_actor: ID of the actor which has signed the request
    @raise exceptions.InternalError: subscription is in an unexpected state
    """
    if node is None:
        node = self.apg._m.namespace
    client = await self.apg.getVirtualClient(signing_actor)
    try:
        subscription = await self.apg._p.subscribe(client, account_jid, node)
    except pubsub.SubscriptionPending:
        log.info(f"subscription to node {node!r} of {account_jid} is pending")
        # TODO: manage SubscriptionUnconfigured
        return

    if subscription.state != "subscribed":
        # other states should raise an Exception
        raise exceptions.InternalError('"subscribed" state was expected')

    # the follow request is accepted, the follower is notified
    actor_inbox = await self.apg.getAPInboxFromId(signing_actor)
    actor_id = self.apg.buildAPURL(TYPE_ACTOR, ap_account)
    accept_activity = self.apg.createActivity("Accept", actor_id, object_=data)
    await self.apg.signAndPost(actor_inbox, actor_id, accept_activity)
132
async def handleAcceptActivity(
    self,
    request: "HTTPRequest",
    data: dict,
    account_jid: jid.JID,
    node: Optional[str],
    ap_account: str,
    ap_url: str,
    signing_actor: str
) -> None:
    """Handle an "Accept" activity, validating pending follow subscriptions

    For each accepted "Follow" object, the cached pubsub node of the signing
    actor is retrieved and the pending subscription of ``account_jid`` is
    marked as subscribed. Other accepted object types are only logged.

    @param request: incoming HTTP request (not used here)
    @param data: payload of the "Accept" activity
    @param account_jid: JID of the subscriber whose follow request is accepted
    @param node: pubsub node name, ``self.apg._m.namespace`` is used if None
    @param ap_account: AP account targeted by the request (not used here)
    @param ap_url: URL of the request (not used here)
    @param signing_actor: ID of the actor which has signed the request
    @raise exceptions.InternalError: the subscription is neither pending nor
        subscribed
    """
    if node is None:
        node = self.apg._m.namespace
    client = await self.apg.getVirtualClient(signing_actor)
    objects = await self.apg.apGetList(data, "object")
    for obj in objects:
        type_ = obj.get("type")
        if type_ != "Follow":
            log.warning(f"Unmanaged accept type: {type_!r}")
            continue

        follow_node = await self.apg.host.memory.storage.getPubsubNode(
            client, client.jid, node, with_subscriptions=True
        )
        if follow_node is None:
            log.warning(
                f"Received a follow accept on an unknown node: {node!r} at "
                f"{client.jid}. Ignoring it"
            )
            continue

        # first subscription matching our account, if any
        sub = next(
            (s for s in follow_node.subscriptions if s.subscriber == account_jid),
            None
        )
        if sub is None:
            log.warning(
                "Received a follow accept on a node without subscription: "
                f"{node!r} at {client.jid}. Ignoring it"
            )
            continue

        if sub.state == SubscriptionState.SUBSCRIBED:
            log.warning(f"Already subscribed to {node!r} at {client.jid}")
        elif sub.state == SubscriptionState.PENDING:
            follow_node.subscribed = True
            sub.state = SubscriptionState.SUBSCRIBED
            await self.apg.host.memory.storage.add(follow_node)
        else:
            raise exceptions.InternalError(
                f"Unhandled subscription state {sub.state!r}"
            )
181
async def handleCreateActivity(
    self,
    request: "HTTPRequest",
    data: dict,
    account_jid: Optional[jid.JID],
    node: Optional[str],
    ap_account: Optional[str],
    ap_url: str,
    signing_actor: str
) -> None:
    """Handle a "Create" activity by converting its objects to new items

    Requests whose "digest" header has recently been seen are ignored, so the
    same activity delivered several times is only handled once.

    @param request: incoming HTTP request, its "digest" header is used to
        detect duplicates
    @param data: payload of the "Create" activity
    @param account_jid: JID of the targeted entity, if any
    @param node: pubsub node name, ``self.apg._m.namespace`` is used if None
    @param ap_account: AP account targeted by the request, if any (not used
        here)
    @param ap_url: URL of the request (not used here)
    @param signing_actor: ID of the actor which has signed the request
    """
    digest = request.getHeader("digest")
    if digest in self._seen_digest:
        log.debug(f"Ignoring duplicated request (digest: {digest!r})")
        return
    self._seen_digest.append(digest)
    if node is None:
        node = self.apg._m.namespace
    client = await self.apg.getVirtualClient(signing_actor)
    objects = await self.apg.apGetList(data, "object")
    for obj in objects:
        sender = await self.apg.apGetSenderActor(obj)
        if sender != signing_actor:
            # an actor can only publish objects attributed to itself
            log.warning(
                f"Ignoring object not attributed to signing actor: {obj}"
            )
        else:
            await self.apg.newAPItem(client, account_jid, node, obj)
209
210 async def APActorRequest(
211 self,
212 request: "HTTPRequest",
213 account_jid: jid.JID,
214 node: Optional[str],
215 ap_account: str,
216 actor_url: str,
217 signing_actor: Optional[str]
88 ) -> dict: 218 ) -> dict:
89 inbox_url = self.apg.buildAPURL(TYPE_INBOX, ap_account) 219 inbox_url = self.apg.buildAPURL(TYPE_INBOX, ap_account)
220 shared_inbox = self.apg.buildAPURL(TYPE_SHARED_INBOX)
90 outbox_url = self.apg.buildAPURL(TYPE_OUTBOX, ap_account) 221 outbox_url = self.apg.buildAPURL(TYPE_OUTBOX, ap_account)
91 222
92 # we have to use AP account as preferredUsername because it is used to retrieve 223 # we have to use AP account as preferredUsername because it is used to retrieve
93 # actor handle (see https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196) 224 # actor handle (see https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196)
94 preferred_username = ap_account.split("@", 1)[0] 225 preferred_username = ap_account.split("@", 1)[0]
105 "outbox": outbox_url, 236 "outbox": outbox_url,
106 "publicKey": { 237 "publicKey": {
107 "id": f"{actor_url}#main-key", 238 "id": f"{actor_url}#main-key",
108 "owner": actor_url, 239 "owner": actor_url,
109 "publicKeyPem": self.apg.public_key_pem 240 "publicKeyPem": self.apg.public_key_pem
110 } 241 },
242 "endpoints": {
243 "sharedInbox": shared_inbox
244 },
111 } 245 }
112 246
113 def getCanonicalURL(self, request: "HTTPRequest") -> str: 247 def getCanonicalURL(self, request: "HTTPRequest") -> str:
114 return parse.urljoin( 248 return parse.urljoin(
115 f"https://{self.apg.public_url}", 249 f"https://{self.apg.public_url}",
204 self, 338 self,
205 request: "HTTPRequest", 339 request: "HTTPRequest",
206 account_jid: jid.JID, 340 account_jid: jid.JID,
207 node: Optional[str], 341 node: Optional[str],
208 ap_account: str, 342 ap_account: str,
209 ap_url: str 343 ap_url: str,
344 signing_actor: Optional[str]
210 ) -> dict: 345 ) -> dict:
211 if node is None: 346 if node is None:
212 node = self.apg._m.namespace 347 node = self.apg._m.namespace
213 348
214 parsed_url = parse.urlparse(request.uri.decode()) 349 parsed_url = parse.urlparse(request.uri.decode())
227 __, metadata = await self.apg._p.getItems( 362 __, metadata = await self.apg._p.getItems(
228 client=self.apg.client, 363 client=self.apg.client,
229 service=account_jid, 364 service=account_jid,
230 node=node, 365 node=node,
231 max_items=0, 366 max_items=0,
232 rsm_request=rsm.RSMRequest(max_=0) 367 rsm_request=rsm.RSMRequest(max_=0),
368 extra = {C.KEY_USE_CACHE: False}
233 ) 369 )
234 except error.StanzaError as e: 370 except error.StanzaError as e:
235 log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}") 371 log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}")
236 return {} 372 return {}
237 try: 373 try:
253 "type": "OrderedCollection", 389 "type": "OrderedCollection",
254 "first": url_first_page, 390 "first": url_first_page,
255 "last": url_last_page, 391 "last": url_last_page,
256 } 392 }
257 393
async def APInboxRequest(
    self,
    request: "HTTPRequest",
    account_jid: Optional[jid.JID],
    node: Optional[str],
    ap_account: Optional[str],
    ap_url: str,
    signing_actor: Optional[str]
) -> None:
    """Parse an activity posted to an inbox and dispatch it to its handler

    The JSON payload is decoded, the declared actor is checked against the
    signing actor, then the ``handle<Type>Activity`` method matching the
    activity type is awaited. Invalid or unsupported requests are answered
    through ``responseCode``.

    @param request: incoming HTTP request, its content is the JSON payload
    @param account_jid: JID of the targeted entity, None if no account is
        targeted
    @param node: pubsub node name, ``self.apg._m.namespace`` is used if None
    @param ap_account: targeted AP account, if any
    @param ap_url: URL of the request
    @param signing_actor: ID of the actor which has signed the request
    @raise exceptions.InternalError: signing_actor is not set
    """
    if signing_actor is None:
        raise exceptions.InternalError("signing_actor must be set for inbox requests")
    if node is None:
        node = self.apg._m.namespace

    try:
        data = json.load(request.content)
        if not isinstance(data, dict):
            raise ValueError("data should be an object")
    except ValueError as e:
        # json.JSONDecodeError is a subclass of ValueError
        return self.responseCode(
            request, http.BAD_REQUEST, f"invalid json in inbox request: {e}"
        )

    await self.checkSigningActor(data, signing_actor)

    activity_type = (data.get("type") or "").lower()
    if activity_type not in ACTIVITY_TYPES_LOWER:
        return self.responseCode(
            request,
            http.UNSUPPORTED_MEDIA_TYPE,
            "request is not an activity, ignoring"
        )

    activity_name = activity_type.title()
    if account_jid is None and activity_type not in ACTIVIY_NO_ACCOUNT_ALLOWED:
        return self.responseCode(
            request,
            http.UNSUPPORTED_MEDIA_TYPE,
            f"{activity_name!r} activity must target an account"
        )

    handler = getattr(self, f"handle{activity_name}Activity", None)
    if handler is None:
        return self.responseCode(
            request,
            http.UNSUPPORTED_MEDIA_TYPE,
            f"{activity_name} activity is not yet supported"
        )
    await handler(
        request, data, account_jid, node, ap_account, ap_url, signing_actor
    )
445
async def APRequest(
    self,
    request: "HTTPRequest",
    signing_actor: Optional[str] = None
) -> None:
    """Route an ActivityPub request to the method handling its type

    The request path is parsed to get the request type and its arguments.
    When the handling method returns data, it is written as JSON; the request
    is then finished.

    @param request: incoming HTTP request
    @param signing_actor: ID of the actor which has signed the request, if the
        signature has been checked
    @raise exceptions.DataError: the request type is not valid for this URL or
        HTTP method
    """
    path = request.path.decode()
    ap_url = parse.urljoin(f"https://{self.apg.public_url}", path)
    request_type, extra_args = self.apg.parseAPURL(ap_url)

    if extra_args:
        if len(extra_args) > 1:
            log.warning(f"unexpected extra arguments: {extra_args!r}")
        ap_account = extra_args[0]
        account_jid, node = await self.apg.getJIDAndNode(ap_account)
        # request types allowed depend on the HTTP method used
        http_method = request.method.decode().upper()
        if request_type not in AP_REQUEST_TYPES.get(http_method, []):
            raise exceptions.DataError(f"Invalid request type: {request_type!r}")
        handler = getattr(self, f"AP{request_type.title()}Request")
        ret_data = await handler(
            request, account_jid, node, ap_account, ap_url, signing_actor
        )
    else:
        # without argument, only the shared inbox can be requested
        if request_type != "shared_inbox":
            raise exceptions.DataError(f"Invalid request type: {request_type!r}")
        ret_data = await self.APInboxRequest(
            request, None, None, None, ap_url, signing_actor
        )

    if ret_data is not None:
        request.setHeader("content-type", CONTENT_TYPE_AP)
        request.write(json.dumps(ret_data).encode())
    request.finish()
480
async def APPostRequest(self, request: "HTTPRequest"):
    """Check the HTTP signature of a POST request, then handle the request

    Both the signature check and the handling of the request are guarded:
    ``checkSigningActor``, called while the inbox request is handled, also
    raises ``exceptions.EncryptionError`` when the signing actor doesn't match
    the actor declared in the payload. Without catching it here, the error
    would escape and the request would never be finished.

    @param request: incoming HTTP POST request
    """
    try:
        signing_actor = await self.checkSignature(request)
        return await self.APRequest(request, signing_actor)
    except exceptions.EncryptionError as e:
        self.responseCode(
            request,
            http.FORBIDDEN,
            f"invalid signature: {e}"
        )
        request.finish()
        return
494
async def checkSigningActor(self, data: dict, signing_actor: str) -> None:
    """Check that the signing actor corresponds to the actor declared in data

    @param data: request payload
    @param signing_actor: actor ID of the signing entity, as returned by
        checkSignature
    @raise exceptions.NotFound: no actor found in data
    @raise exceptions.EncryptionError: signing actor doesn't match actor in data
    """
    actor = await self.apg.apGetSenderActor(data)
    if actor == signing_actor:
        return

    raise exceptions.EncryptionError(
        f"signing actor ({signing_actor}) doesn't match actor in data ({actor})"
    )
510
async def checkSignature(self, request: "HTTPRequest") -> str:
    """Check and validate HTTP signature

    The "Signature" header is parsed, the presence of mandatory signed headers
    is verified, the signed values are collected (the body digest is compared
    to the digest computed from the actual body), dates are controlled, then
    the verification of the signature itself is delegated to
    ``self.apg.checkSignature``.

    @param request: incoming HTTP request
    @return: id of the signing actor

    @raise exceptions.EncryptionError: signature is not present or doesn't match
    @raise exceptions.InternalError: SIGN_HEADERS has no usable list of headers
        for the HTTP method of the request
    """
    signature = request.getHeader("Signature")
    if signature is None:
        raise exceptions.EncryptionError("No signature found")
    sign_data = {
        m["key"]: m["uq_value"] or m["quoted_value"][1:-1]
        for m in RE_SIG_PARAM.finditer(signature)
    }
    try:
        key_id = sign_data["keyId"]
    except KeyError:
        raise exceptions.EncryptionError('"keyId" is missing from signature')
    try:
        signature_value = sign_data["signature"]
    except KeyError:
        # checked early so that a malformed header is reported as a signature
        # error and not as an unexpected KeyError
        raise exceptions.EncryptionError('"signature" is missing from signature')
    algorithm = sign_data.get("algorithm", HS2019)
    signed_headers = sign_data.get(
        "headers",
        "(created)" if algorithm == HS2019 else "date"
    ).lower().split()
    try:
        headers_to_check = SIGN_HEADERS[None] + SIGN_HEADERS[request.method]
    except KeyError:
        raise exceptions.InternalError(
            f"there should be a list of headers for {request.method} method"
        )
    if not headers_to_check:
        raise exceptions.InternalError("headers_to_check must not be empty")

    for header in headers_to_check:
        if isinstance(header, tuple):
            # a tuple lists alternatives: at least one of them must be signed
            if len(set(header).intersection(signed_headers)) == 0:
                raise exceptions.EncryptionError(
                    f"at least one of following header must be signed: {header}"
                )
        elif header not in signed_headers:
            raise exceptions.EncryptionError(
                f"the {header!r} header must be signed"
            )

    body = request.content.read()
    # the content is read again later to parse the payload
    request.content.seek(0)
    headers = {}
    for to_sign in signed_headers:
        if to_sign == "(request-target)":
            method = request.method.decode().lower()
            uri = parse.unquote(request.uri.decode())
            headers[to_sign] = f"{method} /{uri.lstrip('/')}"
        elif to_sign in ("(created)", "(expires)"):
            if algorithm != HS2019:
                raise exceptions.EncryptionError(
                    f"{to_sign!r} pseudo-header can only be used with {HS2019} "
                    "algorithm"
                )
            # the value of the pseudo-header is a parameter of the signature
            key = to_sign[1:-1]
            value = sign_data.get(key)
            if not value:
                raise exceptions.EncryptionError(
                    f"{key!r} parameter is missing from signature"
                )
            try:
                if float(value) < 0:
                    raise ValueError
            except ValueError:
                raise exceptions.EncryptionError(
                    f"{to_sign} must be a Unix timestamp"
                )
            headers[to_sign] = value
        else:
            value = request.getHeader(to_sign)
            if not value:
                raise exceptions.EncryptionError(
                    f"value of header {to_sign!r} is missing!"
                )
            elif to_sign == "host":
                # we check Forwarded/X-Forwarded-Host headers
                # as we need original host if a proxy has modified the header
                forwarded = request.getHeader("forwarded")
                if forwarded is not None:
                    try:
                        host = [
                            f[5:] for f in forwarded.split(";")
                            if f.startswith("host=")
                        ][0] or None
                    except IndexError:
                        host = None
                else:
                    host = None
                if host is None:
                    host = request.getHeader("x-forwarded-host")
                if host:
                    value = host
            elif to_sign == "digest":
                try:
                    # items are stripped as they may be separated by ", "
                    hashes = {
                        algo.lower(): hash_ for algo, hash_ in (
                            digest.strip().split("=", 1)
                            for digest in value.split(",")
                        )
                    }
                except ValueError:
                    raise exceptions.EncryptionError(
                        f"invalid digest header: {value!r}"
                    )
                try:
                    given_digest = hashes["sha-256"]
                except KeyError:
                    raise exceptions.EncryptionError(
                        "Only SHA-256 algorithm is currently supported for digest"
                    )
                __, computed_digest = self.apg.getDigest(body)
                if given_digest != computed_digest:
                    raise exceptions.EncryptionError(
                        f"SHA-256 given and computed digest differ:\n"
                        f"given: {given_digest!r}\ncomputed: {computed_digest!r}"
                    )
            headers[to_sign] = value

    # date check
    # values are stored in "headers" under the pseudo-header name, i.e. with
    # the parentheses
    limit_ts = time.time() + SIGN_EXP
    if "(created)" in headers:
        created = float(headers["(created)"])
    elif "date" in headers:
        created = date_utils.date_parse(headers["date"])
    else:
        raise exceptions.EncryptionError(
            'either "(created)" or "date" must be signed to check signature date'
        )

    try:
        expires = float(headers["(expires)"])
    except KeyError:
        pass
    else:
        if expires < created:
            log.warning(
                f"(expires) [{expires}] set in the past of (created) [{created}] "
                "ignoring it according to specs"
            )
        else:
            limit_ts = min(limit_ts, expires)

    # NOTE(review): this rejects signatures created after the limit; current
    #   time is not compared to (expires) — confirm this is the intended check
    if created > limit_ts:
        raise exceptions.EncryptionError("Signature has expired")

    return await self.apg.checkSignature(
        signature_value,
        key_id,
        headers
    )
273 655
def render(self, request):
    """Set the "server" header to VERSION, then render with the parent class

    @param request: incoming HTTP request
    @return: value returned by the parent ``render`` method
    """
    request.setHeader("server", VERSION)
    rendered = super().render(request)
    return rendered
277 659
284 defer.ensureDeferred(self.APRequest(request)) 666 defer.ensureDeferred(self.APRequest(request))
285 return server.NOT_DONE_YET 667 return server.NOT_DONE_YET
286 668
287 return web_resource.NoResource().render(request) 669 return web_resource.NoResource().render(request)
288 670
def render_POST(self, request):
    """Handle POST requests made under the ActivityPub path

    Requests on other paths get a "no resource" response. Otherwise
    ``APPostRequest`` is launched and the response is written asynchronously.

    @param request: incoming HTTP request
    @return: server.NOT_DONE_YET when the request is handled asynchronously,
        else the rendered "no resource" page
    """
    path = request.path.decode().lstrip("/")
    if path.startswith(self.apg.ap_path):
        defer.ensureDeferred(self.APPostRequest(request))
        return server.NOT_DONE_YET
    return web_resource.NoResource().render(request)
677
289 678
class HTTPRequest(server.Request):
    """Subclass of server.Request which adds nothing to it

    The name is used in the type annotations of this module.
    """
292 681
293 682