libervia/backend/plugins/plugin_comp_ap_gateway/http_server.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_comp_ap_gateway/http_server.py@524856bd7b19
children 13b1079c27ec
1 #!/usr/bin/env python3
2
3 # Libervia ActivityPub Gateway
4 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 import time
20 import html
21 from typing import Optional, Dict, List, Any
22 import json
23 from urllib import parse
24 from collections import deque
25 import unicodedata
26
27 from twisted.web import http, resource as web_resource, server
28 from twisted.web import static
29 from twisted.web import util as web_util
30 from twisted.python import failure
31 from twisted.internet import defer
32 from twisted.words.protocols.jabber import jid, error
33 from wokkel import pubsub, rsm
34
35 from libervia.backend.core import exceptions
36 from libervia.backend.core.constants import Const as C
37 from libervia.backend.core.i18n import _
38 from libervia.backend.core.core_types import SatXMPPEntity
39 from libervia.backend.core.log import getLogger
40 from libervia.backend.tools.common import date_utils, uri
41 from libervia.backend.memory.sqla_mapping import SubscriptionState
42
43 from .constants import (
44 NS_AP, MEDIA_TYPE_AP, CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_SHARED_INBOX,
45 TYPE_OUTBOX, TYPE_EVENT, AP_REQUEST_TYPES, PAGE_SIZE, ACTIVITY_TYPES_LOWER,
46 ACTIVIY_NO_ACCOUNT_ALLOWED, SIGN_HEADERS, HS2019, SIGN_EXP, TYPE_FOLLOWERS,
47 TYPE_FOLLOWING, TYPE_ITEM, TYPE_LIKE, TYPE_REACTION, ST_AP_CACHE
48 )
49 from .regex import RE_SIG_PARAM
50
51
52 log = getLogger(__name__)
53
54 VERSION = unicodedata.normalize(
55 'NFKD',
56 f"{C.APP_NAME} ActivityPub Gateway {C.APP_VERSION}"
57 )
58
59
60 class HTTPAPGServer(web_resource.Resource):
61 """HTTP Server handling ActivityPub S2S protocol"""
62 isLeaf = True
63
64 def __init__(self, ap_gateway):
65 self.apg = ap_gateway
66 self._seen_digest = deque(maxlen=50)
67 super().__init__()
68
69 def response_code(
70 self,
71 request: "HTTPRequest",
72 http_code: int,
73 msg: Optional[str] = None
74 ) -> None:
75 """Log and set HTTP return code and associated message"""
76 if msg is not None:
77 log.warning(msg)
78 request.setResponseCode(http_code, None if msg is None else msg.encode())
79
80 def _on_request_error(self, failure_: failure.Failure, request: "HTTPRequest") -> None:
81 exc = failure_.value
82 if isinstance(exc, exceptions.NotFound):
83 self.response_code(
84 request,
85 http.NOT_FOUND,
86 str(exc)
87 )
88 else:
89 log.exception(f"Internal error: {failure_.value}")
90 self.response_code(
91 request,
92 http.INTERNAL_SERVER_ERROR,
93 f"internal error: {failure_.value}"
94 )
95 request.finish()
96 raise failure_
97
98 request.finish()
99
100 async def webfinger(self, request):
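# WebFinger resolution: maps an "acct:" resource to this gateway's AP actor URL.
# Illustrative exchange (hypothetical account; the exact actor URL depends on
# build_apurl() and the configured public URL):
#   GET /.well-known/webfinger?resource=acct:alice@example.org
#   -> {"subject": "acct:alice@example.org",
#       "aliases": ["<actor_url>"],
#       "links": [{"rel": "self", "type": "application/activity+json",
#                  "href": "<actor_url>"}]}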
101 url_parsed = parse.urlparse(request.uri.decode())
102 query = parse.parse_qs(url_parsed.query)
103 resource = query.get("resource", [""])[0]
104 account = resource[5:].strip()
105 if not resource.startswith("acct:") or not account:
106 return web_resource.ErrorPage(
107 http.BAD_REQUEST, "Bad Request", "Invalid webfinger resource"
108 ).render(request)
109
110 actor_url = self.apg.build_apurl(TYPE_ACTOR, account)
111
112 resp = {
113 "aliases": [actor_url],
114 "subject": resource,
115 "links": [
116 {
117 "rel": "self",
118 "type": "application/activity+json",
119 "href": actor_url
120 }
121 ]
122 }
123 request.setHeader("content-type", CONTENT_TYPE_AP)
124 request.write(json.dumps(resp).encode())
125 request.finish()
126
127 async def handle_undo_activity(
128 self,
129 request: "HTTPRequest",
130 data: dict,
131 account_jid: jid.JID,
132 node: Optional[str],
133 ap_account: str,
134 ap_url: str,
135 signing_actor: str
136 ) -> None:
137 if node is None:
138 node = self.apg._m.namespace
139 client = await self.apg.get_virtual_client(signing_actor)
140 object_ = data.get("object")
141 if isinstance(object_, str):
142 # we check first if it's not a cached object
143 ap_cache_key = f"{ST_AP_CACHE}{object_}"
144 value = await self.apg.client._ap_storage.get(ap_cache_key)
145 else:
146 value = None
147 if value is not None:
148 objects = [value]
149 # because we'll undo the activity, we can remove it from cache
150 await self.apg.client._ap_storage.remove(ap_cache_key)
151 else:
152 objects = await self.apg.ap_get_list(data, "object")
153 for obj in objects:
154 type_ = obj.get("type")
155 actor = await self.apg.ap_get_sender_actor(obj)
156 if actor != signing_actor:
157 log.warning(f"ignoring object not attributed to signing actor: {data}")
158 continue
159
160 if type_ == "Follow":
161 try:
162 target_account = obj["object"]
163 except KeyError:
164 log.warning(f'ignoring invalid object, missing "object" key: {data}')
165 continue
166 if not self.apg.is_local_url(target_account):
167 log.warning(f"ignoring unfollow request to non local actor: {data}")
168 continue
169 await self.apg._p.unsubscribe(
170 client,
171 account_jid,
172 node,
173 sender=client.jid,
174 )
175 elif type_ == "Announce":
176 # we can use the Announce object directly, as only the "id" field is
177 # needed
178 await self.apg.new_ap_delete_item(client, None, node, obj)
179 elif type_ == TYPE_LIKE:
180 await self.handle_attachment_item(client, obj, {"noticed": False})
181 elif type_ == TYPE_REACTION:
182 await self.handle_attachment_item(client, obj, {
183 "reactions": {"operation": "update", "remove": [obj["content"]]}
184 })
185 else:
186 log.warning(f"Unmanaged undo type: {type_!r}")
187
188 async def handle_follow_activity(
189 self,
190 request: "HTTPRequest",
191 data: dict,
192 account_jid: jid.JID,
193 node: Optional[str],
194 ap_account: str,
195 ap_url: str,
196 signing_actor: str
197 ) -> None:
198 if node is None:
199 node = self.apg._m.namespace
200 client = await self.apg.get_virtual_client(signing_actor)
201 try:
202 subscription = await self.apg._p.subscribe(
203 client,
204 account_jid,
205 node,
206 # subscriptions from AP are always public
207 options=self.apg._pps.set_public_opt()
208 )
209 except pubsub.SubscriptionPending:
210 log.info(f"subscription to node {node!r} of {account_jid} is pending")
211 # TODO: manage SubscriptionUnconfigured
212 else:
213 if subscription.state != "subscribed":
214 # other states should raise an Exception
215 raise exceptions.InternalError('"subscribed" state was expected')
216 inbox = await self.apg.get_ap_inbox_from_id(signing_actor, use_shared=False)
217 actor_id = self.apg.build_apurl(TYPE_ACTOR, ap_account)
218 accept_data = self.apg.create_activity(
219 "Accept", actor_id, object_=data
220 )
221 await self.apg.sign_and_post(inbox, actor_id, accept_data)
222 await self.apg._c.synchronise(client, account_jid, node, resync=False)
223
224 async def handle_accept_activity(
225 self,
226 request: "HTTPRequest",
227 data: dict,
228 account_jid: jid.JID,
229 node: Optional[str],
230 ap_account: str,
231 ap_url: str,
232 signing_actor: str
233 ) -> None:
234 if node is None:
235 node = self.apg._m.namespace
236 client = await self.apg.get_virtual_client(signing_actor)
237 objects = await self.apg.ap_get_list(data, "object")
238 for obj in objects:
239 type_ = obj.get("type")
240 if type_ == "Follow":
241 follow_node = await self.apg.host.memory.storage.get_pubsub_node(
242 client, client.jid, node, with_subscriptions=True
243 )
244 if follow_node is None:
245 log.warning(
246 f"Received a follow accept on an unknown node: {node!r} at "
247 f"{client.jid}. Ignoring it"
248 )
249 continue
250 try:
251 sub = next(
252 s for s in follow_node.subscriptions if s.subscriber == account_jid
253 )
254 except StopIteration:
255 log.warning(
256 "Received a follow accept on a node without subscription: "
257 f"{node!r} at {client.jid}. Ignoring it"
258 )
259 else:
260 if sub.state == SubscriptionState.SUBSCRIBED:
261 log.warning(f"Already subscribed to {node!r} at {client.jid}")
262 elif sub.state == SubscriptionState.PENDING:
263 follow_node.subscribed = True
264 sub.state = SubscriptionState.SUBSCRIBED
265 await self.apg.host.memory.storage.add(follow_node)
266 else:
267 raise exceptions.InternalError(
268 f"Unhandled subscription state {sub.state!r}"
269 )
270 else:
271 log.warning(f"Unmanaged accept type: {type_!r}")
272
273 async def handle_delete_activity(
274 self,
275 request: "HTTPRequest",
276 data: dict,
277 account_jid: Optional[jid.JID],
278 node: Optional[str],
279 ap_account: Optional[str],
280 ap_url: str,
281 signing_actor: str
282 ):
283 if node is None:
284 node = self.apg._m.namespace
285 client = await self.apg.get_virtual_client(signing_actor)
286 objects = await self.apg.ap_get_list(data, "object")
287 for obj in objects:
288 await self.apg.new_ap_delete_item(client, account_jid, node, obj)
289
290 async def handle_new_ap_items(
291 self,
292 request: "HTTPRequest",
293 data: dict,
294 account_jid: Optional[jid.JID],
295 node: Optional[str],
296 signing_actor: str,
297 repeated: bool = False,
298 ):
299 """Helper method to handle workflow for new AP items
300
301 accepts roughly the same parameters as handle_create_activity
302 @param repeated: if True, the item is an item republished from somewhere else
303 """
304 if "_repeated" in data:
305 log.error(
306 '"_repeated" field already present in given AP item, this should not '
307 f"happen. Ignoring object from {signing_actor}\n{data}"
308 )
309 raise exceptions.DataError("unexpected field in item")
310 client = await self.apg.get_virtual_client(signing_actor)
311 objects = await self.apg.ap_get_list(data, "object")
312 for obj in objects:
313 if node is None:
314 if obj.get("type") == TYPE_EVENT:
315 node = self.apg._events.namespace
316 else:
317 node = self.apg._m.namespace
318 sender = await self.apg.ap_get_sender_actor(obj)
319 if repeated:
320 # we don't check sender when item is repeated, as it should be different
321 # from post author in this case
322 sender_jid = await self.apg.get_jid_from_id(sender)
323 repeater_jid = await self.apg.get_jid_from_id(signing_actor)
324 repeated_item_id = obj["id"]
325 if self.apg.is_local_url(repeated_item_id):
326 # the repeated object is from XMPP, we need to parse the URL to find
327 # the right ID
328 url_type, url_args = self.apg.parse_apurl(repeated_item_id)
329 if url_type != "item":
330 raise exceptions.DataError(
331 "local URI is not an item: {repeated_id}"
332 )
333 try:
334 url_account, url_item_id = url_args
335 if not url_account or not url_item_id:
336 raise ValueError
337 except (RuntimeError, ValueError):
338 raise exceptions.DataError(
339 "local URI is invalid: {repeated_id}"
340 )
341 else:
342 url_jid, url_node = await self.apg.get_jid_and_node(url_account)
343 if ((url_jid != sender_jid
344 or url_node and url_node != self.apg._m.namespace)):
345 raise exceptions.DataError(
346 "announced ID doesn't match sender ({sender}): "
347 f"[repeated_item_id]"
348 )
349
350 repeated_item_id = url_item_id
351
352 obj["_repeated"] = {
353 "by": repeater_jid.full(),
354 "at": data.get("published"),
355 "uri": uri.build_xmpp_uri(
356 "pubsub",
357 path=sender_jid.full(),
358 node=self.apg._m.namespace,
359 item=repeated_item_id
360 )
361 }
362 # we must use activity's id and targets, not the original item ones
363 for field in ("id", "to", "bto", "cc", "bcc"):
364 obj[field] = data.get(field)
365 else:
366 if sender != signing_actor:
367 log.warning(
368 "Ignoring object not attributed to signing actor: {obj}"
369 )
370 continue
371
372 await self.apg.new_ap_item(client, account_jid, node, obj)
373
374 async def handle_create_activity(
375 self,
376 request: "HTTPRequest",
377 data: dict,
378 account_jid: Optional[jid.JID],
379 node: Optional[str],
380 ap_account: Optional[str],
381 ap_url: str,
382 signing_actor: str
383 ):
384 await self.handle_new_ap_items(request, data, account_jid, node, signing_actor)
385
386 async def handle_update_activity(
387 self,
388 request: "HTTPRequest",
389 data: dict,
390 account_jid: Optional[jid.JID],
391 node: Optional[str],
392 ap_account: Optional[str],
393 ap_url: str,
394 signing_actor: str
395 ):
396 # Update is the same as create: the item ID stays the same, thus the item will be
397 # overwritten
398 await self.handle_new_ap_items(request, data, account_jid, node, signing_actor)
399
400 async def handle_announce_activity(
401 self,
402 request: "HTTPRequest",
403 data: dict,
404 account_jid: Optional[jid.JID],
405 node: Optional[str],
406 ap_account: Optional[str],
407 ap_url: str,
408 signing_actor: str
409 ):
410 # we create a new item
411 await self.handle_new_ap_items(
412 request,
413 data,
414 account_jid,
415 node,
416 signing_actor,
417 repeated=True
418 )
419
420 async def handle_attachment_item(
421 self,
422 client: SatXMPPEntity,
423 data: dict,
424 attachment_data: dict
425 ) -> None:
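# Maps AP "attachment-like" activities (Like, EmojiReact, Join, Leave and their
# Undo counterparts) onto pubsub attachment items published on the target
# item's attachment node. Illustrative attachment_data values, as passed by the
# handlers below: {"noticed": True} for Like,
# {"reactions": {"operation": "update", "add": ["👍"]}} for EmojiReact,
# {"rsvp": {"attending": "yes"}} for Join.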
426 target_ids = data.get("object")
427 if not target_ids:
428 raise exceptions.DataError("object should be set")
429 elif isinstance(target_ids, list):
430 try:
431 target_ids = [o["id"] for o in target_ids]
432 except (KeyError, TypeError):
433 raise exceptions.DataError(f"invalid object: {target_ids!r}")
434 elif isinstance(target_ids, dict):
435 obj_id = target_ids.get("id")
436 if not obj_id or not isinstance(obj_id, str):
437 raise exceptions.DataError(f"invalid object: {target_ids!r}")
438 target_ids = [obj_id]
439 elif isinstance(target_ids, str):
440 target_ids = [target_ids]
441
442 # XXX: we have to cache AP items because some implementations (notably Pleroma)
443 # don't keep objects accessible, and we need to be able to retrieve them for
444 # UNDO. The current cache will grow indefinitely; we need a way to flush it after
445 # a while.
446 # TODO: add a way to flush old cached AP items.
447 await client._ap_storage.aset(f"{ST_AP_CACHE}{data['id']}", data)
448
449 for target_id in target_ids:
450 if not self.apg.is_local_url(target_id):
451 log.debug(f"ignoring non local target ID: {target_id}")
452 continue
453 url_type, url_args = self.apg.parse_apurl(target_id)
454 if url_type != TYPE_ITEM:
455 log.warning(f"unexpected local URL for attachment on item {target_id}")
456 continue
457 try:
458 account, item_id = url_args
459 except ValueError:
460 raise ValueError(f"invalid URL: {target_id}")
461 author_jid, item_node = await self.apg.get_jid_and_node(account)
462 if item_node is None:
463 item_node = self.apg._m.namespace
464 attachment_node = self.apg._pa.get_attachment_node_name(
465 author_jid, item_node, item_id
466 )
467 cached_node = await self.apg.host.memory.storage.get_pubsub_node(
468 client,
469 author_jid,
470 attachment_node,
471 with_subscriptions=True,
472 create=True
473 )
474 found_items, __ = await self.apg.host.memory.storage.get_items(
475 cached_node, item_ids=[client.jid.userhost()]
476 )
477 if not found_items:
478 old_item_elt = None
479 else:
480 found_item = found_items[0]
481 old_item_elt = found_item.data
482
483 item_elt = await self.apg._pa.apply_set_handler(
484 client,
485 {"extra": attachment_data},
486 old_item_elt,
487 None
488 )
489 # we reparse the element, as there can be other attachments
490 attachments_data = self.apg._pa.items_2_attachment_data(client, [item_elt])
491 # and we update the cache
492 await self.apg.host.memory.storage.cache_pubsub_items(
493 client,
494 cached_node,
495 [item_elt],
496 attachments_data or [{}]
497 )
498
499 if self.apg.is_virtual_jid(author_jid):
500 # the attachment is on a virtual pubsub service (linking to an AP item),
501 # we notify all subscribers
502 for subscription in cached_node.subscriptions:
503 if subscription.state != SubscriptionState.SUBSCRIBED:
504 continue
505 self.apg.pubsub_service.notifyPublish(
506 author_jid,
507 attachment_node,
508 [(subscription.subscriber, None, [item_elt])]
509 )
510 else:
511 # the attachment is on an XMPP item, we publish it to the attachment node
512 await self.apg._p.send_items(
513 client, author_jid, attachment_node, [item_elt]
514 )
515
516 async def handle_like_activity(
517 self,
518 request: "HTTPRequest",
519 data: dict,
520 account_jid: Optional[jid.JID],
521 node: Optional[str],
522 ap_account: Optional[str],
523 ap_url: str,
524 signing_actor: str
525 ) -> None:
526 client = await self.apg.get_virtual_client(signing_actor)
527 await self.handle_attachment_item(client, data, {"noticed": True})
528
529 async def handle_emojireact_activity(
530 self,
531 request: "HTTPRequest",
532 data: dict,
533 account_jid: Optional[jid.JID],
534 node: Optional[str],
535 ap_account: Optional[str],
536 ap_url: str,
537 signing_actor: str
538 ) -> None:
539 client = await self.apg.get_virtual_client(signing_actor)
540 await self.handle_attachment_item(client, data, {
541 "reactions": {"operation": "update", "add": [data["content"]]}
542 })
543
544 async def handle_join_activity(
545 self,
546 request: "HTTPRequest",
547 data: dict,
548 account_jid: Optional[jid.JID],
549 node: Optional[str],
550 ap_account: Optional[str],
551 ap_url: str,
552 signing_actor: str
553 ) -> None:
554 client = await self.apg.get_virtual_client(signing_actor)
555 await self.handle_attachment_item(client, data, {"rsvp": {"attending": "yes"}})
556
557 async def handle_leave_activity(
558 self,
559 request: "HTTPRequest",
560 data: dict,
561 account_jid: Optional[jid.JID],
562 node: Optional[str],
563 ap_account: Optional[str],
564 ap_url: str,
565 signing_actor: str
566 ) -> None:
567 client = await self.apg.get_virtual_client(signing_actor)
568 await self.handle_attachment_item(client, data, {"rsvp": {"attending": "no"}})
569
570 async def ap_actor_request(
571 self,
572 request: "HTTPRequest",
573 data: Optional[dict],
574 account_jid: jid.JID,
575 node: Optional[str],
576 ap_account: str,
577 ap_url: str,
578 signing_actor: Optional[str]
579 ) -> dict:
580 inbox = self.apg.build_apurl(TYPE_INBOX, ap_account)
581 shared_inbox = self.apg.build_apurl(TYPE_SHARED_INBOX)
582 outbox = self.apg.build_apurl(TYPE_OUTBOX, ap_account)
583 followers = self.apg.build_apurl(TYPE_FOLLOWERS, ap_account)
584 following = self.apg.build_apurl(TYPE_FOLLOWING, ap_account)
585
586 # we have to use AP account as preferredUsername because it is used to retrieve
587 # actor handle (see https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196)
588 preferred_username = ap_account.split("@", 1)[0]
589
590 identity_data = await self.apg._i.get_identity(self.apg.client, account_jid)
591 if node and node.startswith(self.apg._events.namespace):
592 events = outbox
593 else:
594 events_account = await self.apg.get_ap_account_from_jid_and_node(
595 account_jid, self.apg._events.namespace
596 )
597 events = self.apg.build_apurl(TYPE_OUTBOX, events_account)
598
599 actor_data = {
600 "@context": [
601 "https://www.w3.org/ns/activitystreams",
602 "https://w3id.org/security/v1"
603 ],
604
605 # XXX: Mastodon doesn't like a percent-encoded "@" ("%40"), so we have to unescape it
606 # if it is escaped
607 "id": ap_url.replace("%40", "@"),
608 "type": "Person",
609 "preferredUsername": preferred_username,
610 "inbox": inbox,
611 "outbox": outbox,
612 "events": events,
613 "followers": followers,
614 "following": following,
615 "publicKey": {
616 "id": f"{ap_url}#main-key",
617 "owner": ap_url,
618 "publicKeyPem": self.apg.public_key_pem
619 },
620 "endpoints": {
621 "sharedInbox": shared_inbox,
622 "events": events,
623 },
624 }
625
626 if identity_data.get("nicknames"):
627 actor_data["name"] = identity_data["nicknames"][0]
628 if identity_data.get("description"):
629 # description is plain text while summary expects HTML
630 actor_data["summary"] = html.escape(identity_data["description"])
631 if identity_data.get("avatar"):
632 avatar_data = identity_data["avatar"]
633 try:
634 filename = avatar_data["filename"]
635 media_type = avatar_data["media_type"]
636 except KeyError:
637 log.error(f"incomplete avatar data: {identity_data!r}")
638 else:
639 avatar_url = self.apg.build_apurl("avatar", filename)
640 actor_data["icon"] = {
641 "type": "Image",
642 "url": avatar_url,
643 "mediaType": media_type
644 }
645
646 return actor_data
647
648 def get_canonical_url(self, request: "HTTPRequest") -> str:
649 return parse.urljoin(
650 f"https://{self.apg.public_url}",
651 request.path.decode().rstrip("/")
652 # we unescape "@" for the same reason as in [ap_actor_request]
653 ).replace("%40", "@")
654
655 def query_data_2_rsm_request(
656 self,
657 query_data: Dict[str, List[str]]
658 ) -> rsm.RSMRequest:
659 """Get RSM kwargs to use with RSMRequest from query data"""
660 page = query_data.get("page")
661
662 if page == ["first"]:
663 return rsm.RSMRequest(max_=PAGE_SIZE, before="")
664 elif page == ["last"]:
665 return rsm.RSMRequest(max_=PAGE_SIZE)
666 else:
667 for query_key in ("index", "before", "after"):
668 try:
669 kwargs = {query_key: query_data[query_key][0], "max_": PAGE_SIZE}
670 except (KeyError, IndexError, ValueError):
671 pass
672 else:
673 return rsm.RSMRequest(**kwargs)
674 raise ValueError(f"Invalid query data: {query_data!r}")
675
676 async def ap_outbox_page_request(
677 self,
678 request: "HTTPRequest",
679 data: Optional[dict],
680 account_jid: jid.JID,
681 node: Optional[str],
682 ap_account: str,
683 ap_url: str,
684 query_data: Dict[str, List[str]]
685 ) -> dict:
686 if node is None:
687 node = self.apg._m.namespace
688 # we only keep useful keys, and sort them to have a consistent URL which can
689 # be used as ID
690 url_keys = sorted(set(query_data) & {"page", "index", "before", "after"})
691 query_data = {k: query_data[k] for k in url_keys}
692 try:
693 items, metadata = await self.apg._p.get_items(
694 client=self.apg.client,
695 service=account_jid,
696 node=node,
697 rsm_request=self.query_data_2_rsm_request(query_data),
698 extra={C.KEY_USE_CACHE: False}
699 )
700 except error.StanzaError as e:
701 log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}")
702 return {}
703
704 base_url = self.get_canonical_url(request)
705 url = f"{base_url}?{parse.urlencode(query_data, True)}"
706 if node and node.startswith(self.apg._events.namespace):
707 ordered_items = [
708 await self.apg.ap_events.event_data_2_ap_item(
709 self.apg._events.event_elt_2_event_data(item),
710 account_jid
711 )
712 for item in reversed(items)
713 ]
714 else:
715 ordered_items = [
716 await self.apg.mb_data_2_ap_item(
717 self.apg.client,
718 await self.apg._m.item_2_mb_data(
719 self.apg.client,
720 item,
721 account_jid,
722 node
723 )
724 )
725 for item in reversed(items)
726 ]
727 ret_data = {
728 "@context": ["https://www.w3.org/ns/activitystreams"],
729 "id": url,
730 "type": "OrderedCollectionPage",
731 "partOf": base_url,
732 "orderedItems": ordered_items
733 }
734
735 if "rsm" not in metadata:
736 # no RSM available, we return what we have
737 return ret_data
738
739 # AP OrderedCollection must be in reversed chronological order, thus the opposite
740 # of what we get with RSM (at least with Libervia Pubsub)
741 if not metadata["complete"]:
742 try:
743 last = metadata["rsm"]["last"]
744 except KeyError:
745 last = None
746 ret_data["prev"] = f"{base_url}?{parse.urlencode({'after': last})}"
747 if metadata["rsm"]["index"] != 0:
748 try:
749 first = metadata["rsm"]["first"]
750 except KeyError:
751 first = None
752 ret_data["next"] = f"{base_url}?{parse.urlencode({'before': first})}"
753
754 return ret_data
755
756 async def ap_outbox_request(
757 self,
758 request: "HTTPRequest",
759 data: Optional[dict],
760 account_jid: jid.JID,
761 node: Optional[str],
762 ap_account: str,
763 ap_url: str,
764 signing_actor: Optional[str]
765 ) -> dict:
766 if node is None:
767 node = self.apg._m.namespace
768
769 parsed_url = parse.urlparse(request.uri.decode())
770 query_data = parse.parse_qs(parsed_url.query)
771 if query_data:
772 return await self.ap_outbox_page_request(
773 request, data, account_jid, node, ap_account, ap_url, query_data
774 )
775
776 # XXX: we can't use disco#info here because this request won't work on a bare jid
777 # due to security considerations of XEP-0030 (we don't have presence
778 # subscription).
779 # The current workaround is to do the request as if RSM was available, and to
780 # check its actual availability from the result.
781 try:
782 __, metadata = await self.apg._p.get_items(
783 client=self.apg.client,
784 service=account_jid,
785 node=node,
786 max_items=0,
787 rsm_request=rsm.RSMRequest(max_=0),
788 extra={C.KEY_USE_CACHE: False}
789 )
790 except error.StanzaError as e:
791 log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}")
792 return {}
793 try:
794 items_count = metadata["rsm"]["count"]
795 except KeyError:
796 log.warning(
797 f"No RSM metadata found when requesting pubsub node {node} at "
798 f"{account_jid}, defaulting to items_count=20"
799 )
800 items_count = 20
801
802 url = self.get_canonical_url(request)
803 url_first_page = f"{url}?{parse.urlencode({'page': 'first'})}"
804 url_last_page = f"{url}?{parse.urlencode({'page': 'last'})}"
805 return {
806 "@context": ["https://www.w3.org/ns/activitystreams"],
807 "id": url,
808 "totalItems": items_count,
809 "type": "OrderedCollection",
810 "first": url_first_page,
811 "last": url_last_page,
812 }
813
814 async def ap_inbox_request(
815 self,
816 request: "HTTPRequest",
817 data: Optional[dict],
818 account_jid: Optional[jid.JID],
819 node: Optional[str],
820 ap_account: Optional[str],
821 ap_url: str,
822 signing_actor: Optional[str]
823 ) -> None:
824 assert data is not None
825 if signing_actor is None:
826 raise exceptions.InternalError("signing_actor must be set for inbox requests")
827 await self.check_signing_actor(data, signing_actor)
828 activity_type = (data.get("type") or "").lower()
829 if activity_type not in ACTIVITY_TYPES_LOWER:
830 return self.response_code(
831 request,
832 http.UNSUPPORTED_MEDIA_TYPE,
833 f"request is not an activity, ignoring"
834 )
835
836 if account_jid is None and activity_type not in ACTIVIY_NO_ACCOUNT_ALLOWED:
837 return self.response_code(
838 request,
839 http.UNSUPPORTED_MEDIA_TYPE,
840 f"{activity_type.title()!r} activity must target an account"
841 )
842
843 try:
844 method = getattr(self, f"handle_{activity_type}_activity")
845 except AttributeError:
846 return self.response_code(
847 request,
848 http.UNSUPPORTED_MEDIA_TYPE,
849 f"{activity_type.title()} activity is not yet supported"
850 )
851 else:
852 await method(
853 request, data, account_jid, node, ap_account, ap_url, signing_actor
854 )
855
856 async def ap_followers_request(
857 self,
858 request: "HTTPRequest",
859 data: Optional[dict],
860 account_jid: jid.JID,
861 node: Optional[str],
862 ap_account: Optional[str],
863 ap_url: str,
864 signing_actor: Optional[str]
865 ) -> dict:
866 if node is None:
867 node = self.apg._m.namespace
868 client = self.apg.client
869 subscribers = await self.apg._pps.get_public_node_subscriptions(
870 client, account_jid, node
871 )
872 followers = []
873 for subscriber in subscribers.keys():
874 if self.apg.is_virtual_jid(subscriber):
875 # the subscriber is an AP user subscribed with this gateway
876 ap_account = self.apg._e.unescape(subscriber.user)
877 else:
878 # regular XMPP user
879 ap_account = await self.apg.get_ap_account_from_jid_and_node(subscriber, node)
880 followers.append(ap_account)
881
882 url = self.get_canonical_url(request)
883 return {
884 "@context": ["https://www.w3.org/ns/activitystreams"],
885 "type": "OrderedCollection",
886 "id": url,
887 "totalItems": len(subscribers),
888 "first": {
889 "type": "OrderedCollectionPage",
890 "id": url,
891 "orderedItems": followers
892 }
893 }
894
895 async def ap_following_request(
896 self,
897 request: "HTTPRequest",
898 data: Optional[dict],
899 account_jid: jid.JID,
900 node: Optional[str],
901 ap_account: Optional[str],
902 ap_url: str,
903 signing_actor: Optional[str]
904 ) -> dict[str, Any]:
905 client = self.apg.client
906 subscriptions = await self.apg._pps.subscriptions(
907 client, account_jid, node
908 )
909 following = []
910 for sub_dict in subscriptions:
911 service = jid.JID(sub_dict["service"])
912 if self.apg.is_virtual_jid(service):
913 # the subscription is to an AP actor with this gateway
914 ap_account = self.apg._e.unescape(service.user)
915 else:
916 # regular XMPP user
917 ap_account = await self.apg.get_ap_account_from_jid_and_node(
918 service, sub_dict["node"]
919 )
920 following.append(ap_account)
921
922 url = self.get_canonical_url(request)
923 return {
924 "@context": ["https://www.w3.org/ns/activitystreams"],
925 "type": "OrderedCollection",
926 "id": url,
927 "totalItems": len(subscriptions),
928 "first": {
929 "type": "OrderedCollectionPage",
930 "id": url,
931 "orderedItems": following
932 }
933 }
934
935 def _get_to_log(
936 self,
937 request: "HTTPRequest",
938 data: Optional[dict] = None,
939 ) -> List[str]:
940 """Get base data to logs in verbose mode"""
941 from pprint import pformat
942 to_log = [
943 "",
944 f"<<< got {request.method.decode()} request - {request.uri.decode()}"
945 ]
946 if data is not None:
947 to_log.append(pformat(data))
948 if self.apg.verbose >= 3:
949 headers = "\n".join(
950 f" {k.decode()}: {v.decode()}"
951 for k, v in request.getAllHeaders().items()
952 )
953 to_log.append(f" headers:\n{headers}")
954 return to_log
955
956 async def ap_request(
957 self,
958 request: "HTTPRequest",
959 data: Optional[dict] = None,
960 signing_actor: Optional[str] = None
961 ) -> None:
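# Dispatch logic: the request path is parsed with parse_apurl() into a request
# type and its arguments, then routed to the matching ap_*_request() coroutine
# (actor, outbox, inbox, followers, following...). Illustrative routing, with
# hypothetical paths (the actual prefix depends on the configured AP path):
#   GET  .../actor/louise%40example.org   -> ap_actor_request()
#   GET  .../outbox/louise%40example.org  -> ap_outbox_request()
#   POST .../inbox/louise%40example.org   -> ap_inbox_request()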
962 if self.apg.verbose:
963 to_log = self._get_to_log(request, data)
964
965 path = request.path.decode()
966 ap_url = parse.urljoin(
967 f"https://{self.apg.public_url}",
968 path
969 )
970 request_type, extra_args = self.apg.parse_apurl(ap_url)
971 if ((MEDIA_TYPE_AP not in (request.getHeader("accept") or "")
972 and request_type in self.apg.html_redirect)):
973 # this is not an AP request, and we have a redirection for it
974 kw = {}
975 if extra_args:
976 kw["jid"], kw["node"] = await self.apg.get_jid_and_node(extra_args[0])
977 kw["jid_user"] = kw["jid"].user
978 if kw["node"] is None:
979 kw["node"] = self.apg._m.namespace
980 if len(extra_args) > 1:
981 kw["item"] = extra_args[1]
982 else:
983 kw["item"] = ""
984 else:
985 kw["jid"], kw["jid_user"], kw["node"], kw["item"] = "", "", "", ""
986
987 redirections = self.apg.html_redirect[request_type]
988 for redirection in redirections:
989 filters = redirection["filters"]
990 if not filters:
991 break
992 # if we have filters, they must all match
993 elif all(v in kw[k] for k, v in filters.items()):
994 break
995 else:
996 # no redirection is matching
997 redirection = None
998
999 if redirection is not None:
1000 kw = {k: parse.quote(str(v), safe="") for k, v in kw.items()}
1001 target_url = redirection["url"].format(**kw)
1002 content = web_util.redirectTo(target_url.encode(), request)
1003 request.write(content)
1004 request.finish()
1005 return
1006
1007 if len(extra_args) == 0:
1008 if request_type != "shared_inbox":
1009 raise exceptions.DataError(f"Invalid request type: {request_type!r}")
1010 ret_data = await self.ap_inbox_request(
1011 request, data, None, None, None, ap_url, signing_actor
1012 )
1013 elif request_type == "avatar":
1014 if len(extra_args) != 1:
1015 raise exceptions.DataError("avatar argument expected in URL")
1016 avatar_filename = extra_args[0]
1017 avatar_path = self.apg.host.common_cache.getPath(avatar_filename)
1018 return static.File(str(avatar_path)).render(request)
1019 elif request_type == "item":
1020 ret_data = await self.apg.ap_get_local_object(ap_url)
1021 if "@context" not in ret_data:
1022 ret_data["@context"] = [NS_AP]
1023 else:
1024 if len(extra_args) > 1:
1025 log.warning(f"unexpected extra arguments: {extra_args!r}")
1026 ap_account = extra_args[0]
1027 account_jid, node = await self.apg.get_jid_and_node(ap_account)
1028 if request_type not in AP_REQUEST_TYPES.get(
1029 request.method.decode().upper(), []
1030 ):
1031 raise exceptions.DataError(f"Invalid request type: {request_type!r}")
1032 method = getattr(self, f"ap_{request_type}_request")
1033 ret_data = await method(
1034 request, data, account_jid, node, ap_account, ap_url, signing_actor
1035 )
1036 if ret_data is not None:
1037 request.setHeader("content-type", CONTENT_TYPE_AP)
1038 request.write(json.dumps(ret_data).encode())
1039 if self.apg.verbose:
1040 to_log.append(f"--- RET (code: {request.code})---")
1041 if self.apg.verbose >= 2:
1042 if ret_data is not None:
1043 from pprint import pformat
1044 to_log.append(f"{pformat(ret_data)}")
1045 to_log.append("---")
1046 log.info("\n".join(to_log))
1047 request.finish()
1048
1049 async def ap_post_request(self, request: "HTTPRequest") -> None:
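# Inbox POST workflow (summary of the code below): parse the JSON body, ignore
# self-referencing Delete activities (actor deletion), validate the HTTP
# signature, drop deliveries whose Digest header was already seen recently
# (self._seen_digest), then hand the activity over to ap_request() for
# dispatching to the relevant handler.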
1050 try:
1051 data = json.load(request.content)
1052 if not isinstance(data, dict):
1053 log.warning(f"JSON data should be an object (uri={request.uri.decode()})")
1054 self.response_code(
1055 request,
1056 http.BAD_REQUEST,
1057 f"invalid body, was expecting a JSON object"
1058 )
1059 request.finish()
1060 return
1061 except (json.JSONDecodeError, ValueError) as e:
1062 self.response_code(
1063 request,
1064 http.BAD_REQUEST,
1065 f"invalid json in inbox request: {e}"
1066 )
1067 request.finish()
1068 return
1069 else:
1070 request.content.seek(0)
1071
1072 try:
1073 if data["type"] == "Delete" and data["actor"] == data["object"]:
1074 # we don't handle actor deletion
1075 request.setResponseCode(http.ACCEPTED)
1076 log.debug(f"ignoring actor deletion ({data['actor']})")
1077 # TODO: clean data in cache coming from this actor, maybe with a tombstone
1078 request.finish()
1079 return
1080 except KeyError:
1081 pass
1082
1083 try:
1084 signing_actor = await self.check_signature(request)
1085 except exceptions.EncryptionError as e:
1086 if self.apg.verbose:
1087 to_log = self._get_to_log(request)
1088 to_log.append(f" body: {request.content.read()!r}")
1089 request.content.seek(0)
1090 log.info("\n".join(to_log))
1091 self.response_code(
1092 request,
1093 http.FORBIDDEN,
1094 f"invalid signature: {e}"
1095 )
1096 request.finish()
1097 return
1098 except Exception as e:
1099 self.response_code(
1100 request,
1101 http.INTERNAL_SERVER_ERROR,
1102 f"Can't check signature: {e}"
1103 )
1104 request.finish()
1105 return
1106
1107 request.setResponseCode(http.ACCEPTED)
1108
1109 digest = request.getHeader("digest")
1110 if digest in self._seen_digest:
1111 log.debug(f"Ignoring duplicated request (digest: {digest!r})")
1112 request.finish()
1113 return
1114 self._seen_digest.append(digest)
1115
1116 # default response code, may be changed, e.g. in case of exception
1117 try:
1118 return await self.ap_request(request, data, signing_actor)
1119 except Exception as e:
1120 self._on_request_error(failure.Failure(e), request)
1121
1122 async def check_signing_actor(self, data: dict, signing_actor: str) -> None:
1123 """That that signing actor correspond to actor declared in data
1124
1125 @param data: request payload
1126 @param signing_actor: actor ID of the signing entity, as returned by
1127 check_signature
1128 @raise exceptions.NotFound: no actor found in data
1129 @raise exceptions.EncryptionError: signing actor doesn't match actor in data
1130 """
1131 actor = await self.apg.ap_get_sender_actor(data)
1132
1133 if signing_actor != actor:
1134 raise exceptions.EncryptionError(
1135 f"signing actor ({signing_actor}) doesn't match actor in data ({actor})"
1136 )
1137
1138 async def check_signature(self, request: "HTTPRequest") -> str:
1139 """Check and validate HTTP signature
1140
1141 @return: id of the signing actor
1142
1143 @raise exceptions.EncryptionError: signature is not present or doesn't match
1144 """
1145 signature = request.getHeader("Signature")
1146 if signature is None:
1147 raise exceptions.EncryptionError("No signature found")
1148 sign_data = {
1149 m["key"]: m["uq_value"] or m["quoted_value"][1:-1]
1150 for m in RE_SIG_PARAM.finditer(signature)
1151 }
1152 try:
1153 key_id = sign_data["keyId"]
1154 except KeyError:
1155 raise exceptions.EncryptionError('"keyId" is missing from signature')
1156 algorithm = sign_data.get("algorithm", HS2019)
1157 signed_headers = sign_data.get(
1158 "headers",
1159 "(created)" if algorithm==HS2019 else "date"
1160 ).lower().split()
1161 try:
1162 headers_to_check = SIGN_HEADERS[None] + SIGN_HEADERS[request.method]
1163 except KeyError:
1164 raise exceptions.InternalError(
1165 f"there should be a list of headers for {request.method} method"
1166 )
1167 if not headers_to_check:
1168 raise exceptions.InternalError("headers_to_check must not be empty")
1169
1170 for header in headers_to_check:
1171 if isinstance(header, tuple):
1172 if len(set(header).intersection(signed_headers)) == 0:
1173 raise exceptions.EncryptionError(
1174 f"at least one of following header must be signed: {header}"
1175 )
1176 elif header not in signed_headers:
1177 raise exceptions.EncryptionError(
1178 f"the {header!r} header must be signed"
1179 )
1180
1181 body = request.content.read()
1182 request.content.seek(0)
1183 headers = {}
1184 for to_sign in signed_headers:
1185 if to_sign == "(request-target)":
1186 method = request.method.decode().lower()
1187 uri = request.uri.decode()
1188 headers[to_sign] = f"{method} /{uri.lstrip('/')}"
1189 elif to_sign in ("(created)", "(expires)"):
1190 if algorithm != HS2019:
1191 raise exceptions.EncryptionError(
1192 f"{to_sign!r} pseudo-header can only be used with {HS2019} "
1193 "algorithm"
1194 )
1195 key = to_sign[1:-1]
1196 value = sign_data.get(key)
1197 if not value:
1198 raise exceptions.EncryptionError(
1199 "{key!r} parameter is missing from signature"
1200 )
1201 try:
1202 if float(value) < 0:
1203 raise ValueError
1204 except ValueError:
1205 raise exceptions.EncryptionError(
1206 f"{to_sign} must be a Unix timestamp"
1207 )
1208 headers[to_sign] = value
1209 else:
1210 value = request.getHeader(to_sign)
1211 if not value:
1212 raise exceptions.EncryptionError(
1213 f"value of header {to_sign!r} is missing!"
1214 )
1215 elif to_sign == "host":
1216 # we check Forwarded/X-Forwarded-Host headers
1217 # as we need original host if a proxy has modified the header
1218 forwarded = request.getHeader("forwarded")
1219 if forwarded is not None:
1220 try:
1221 host = [
1222 f[5:] for f in forwarded.split(";")
1223 if f.startswith("host=")
1224 ][0] or None
1225 except IndexError:
1226 host = None
1227 else:
1228 host = None
1229 if host is None:
1230 host = request.getHeader("x-forwarded-host")
1231 if host:
1232 value = host
1233 elif to_sign == "digest":
1234 hashes = {
1235 algo.lower(): hash_ for algo, hash_ in (
1236 digest.split("=", 1) for digest in value.split(",")
1237 )
1238 }
1239 try:
1240 given_digest = hashes["sha-256"]
1241 except KeyError:
1242 raise exceptions.EncryptionError(
1243 "Only SHA-256 algorithm is currently supported for digest"
1244 )
1245 __, computed_digest = self.apg.get_digest(body)
1246 if given_digest != computed_digest:
1247 raise exceptions.EncryptionError(
1248 f"SHA-256 given and computed digest differ:\n"
1249 f"given: {given_digest!r}\ncomputed: {computed_digest!r}"
1250 )
1251 headers[to_sign] = value
1252
1253 # date check
1254 limit_ts = time.time() + SIGN_EXP
1255 if "(created)" in headers:
1256 created = float(headers["(created)"])
1257 else:
1258 created = date_utils.date_parse(headers["date"])
1259
1260
1261 try:
1262 expires = float(headers["(expires)"])
1263 except KeyError:
1264 pass
1265 else:
1266 if expires < created:
1267 log.warning(
1268 f"(expires) [{expires}] set in the past of (created) [{created}] "
1269 "ignoring it according to specs"
1270 )
1271 else:
1272 limit_ts = min(limit_ts, expires)
1273
1274 if created > limit_ts:
1275 raise exceptions.EncryptionError("Signature has expired")
1276
1277 try:
1278 return await self.apg.check_signature(
1279 sign_data["signature"],
1280 key_id,
1281 headers
1282 )
1283 except exceptions.EncryptionError:
1284 method, url = headers["(request-target)"].rsplit(' ', 1)
1285 headers["(request-target)"] = f"{method} {parse.unquote(url)}"
1286 log.debug(
1287 "Using workaround for (request-target) encoding bug in signature, "
1288 "see https://github.com/mastodon/mastodon/issues/18871"
1289 )
1290 return await self.apg.check_signature(
1291 sign_data["signature"],
1292 key_id,
1293 headers
1294 )
1295
1296 def render(self, request):
1297 request.setHeader("server", VERSION)
1298 return super().render(request)
1299
1300 def render_GET(self, request):
1301 path = request.path.decode().lstrip("/")
1302 if path.startswith(".well-known/webfinger"):
1303 defer.ensureDeferred(self.webfinger(request))
1304 return server.NOT_DONE_YET
1305 elif path.startswith(self.apg.ap_path):
1306 d = defer.ensureDeferred(self.ap_request(request))
1307 d.addErrback(self._on_request_error, request)
1308 return server.NOT_DONE_YET
1309
1310 return web_resource.NoResource().render(request)
1311
1312 def render_POST(self, request):
1313 path = request.path.decode().lstrip("/")
1314 if not path.startswith(self.apg.ap_path):
1315 return web_resource.NoResource().render(request)
1316 defer.ensureDeferred(self.ap_post_request(request))
1317 return server.NOT_DONE_YET
1318
1319
1320 class HTTPRequest(server.Request):
1321 pass
1322
1323
1324 class HTTPServer(server.Site):
1325 requestFactory = HTTPRequest
1326
1327 def __init__(self, ap_gateway):
1328 super().__init__(HTTPAPGServer(ap_gateway))