Mercurial > libervia-backend
comparison sat/plugins/plugin_comp_ap_gateway/http_server.py @ 3729:86eea17cafa7
component AP gateway: split plugin in several files:
constants, HTTP server and Pubsub service have been put in separate files.
rel: 363
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 31 Jan 2022 18:35:49 +0100 |
parents | sat/plugins/plugin_comp_ap_gateway.py@b15644cae50d |
children | a8c7e5cef0cb |
comparison
equal
deleted
inserted
replaced
3728:b15644cae50d | 3729:86eea17cafa7 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # Libervia ActivityPub Gateway | |
4 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) | |
5 | |
6 # This program is free software: you can redistribute it and/or modify | |
7 # it under the terms of the GNU Affero General Public License as published by | |
8 # the Free Software Foundation, either version 3 of the License, or | |
9 # (at your option) any later version. | |
10 | |
11 # This program is distributed in the hope that it will be useful, | |
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 # GNU Affero General Public License for more details. | |
15 | |
16 # You should have received a copy of the GNU Affero General Public License | |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
18 | |
19 from typing import Optional, Dict, List | |
20 import json | |
21 from urllib import parse | |
22 import re | |
23 import unicodedata | |
24 | |
25 from twisted.web import http, resource as web_resource, server | |
26 from twisted.internet import defer | |
27 from twisted.words.protocols.jabber import jid, error | |
28 from wokkel import pubsub, rsm | |
29 | |
30 from sat.core import exceptions | |
31 from sat.core.constants import Const as C | |
32 from sat.core.i18n import _ | |
33 from sat.core.log import getLogger | |
34 | |
35 from .constants import (CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_OUTBOX, | |
36 AP_REQUEST_TYPES, PAGE_SIZE) | |
37 | |
38 | |
# module-level logger, named after this module
log = getLogger(__name__)

# Product string sent in the "server" HTTP header of every response (see
# HTTPAPGServer.render).
# NOTE(review): NFKD normalisation is presumably there to decompose accented
# characters of the application name before it is put in a header; confirm.
VERSION = unicodedata.normalize(
    "NFKD", f"{C.APP_NAME} ActivityPub Gateway {C.APP_VERSION}"
)
45 | |
46 | |
class HTTPAPGServer(web_resource.Resource):
    """HTTP Server handling ActivityPub S2S protocol

    This is a leaf resource: every GET request reaches ``render_GET``, which
    dispatches either to the webfinger handler or to the ActivityPub handlers
    according to the request path.
    Handlers are coroutines: they write the response themselves and finish the
    request, while ``render_GET`` returns ``server.NOT_DONE_YET``.
    """
    isLeaf = True

    def __init__(self, ap_gateway):
        """
        @param ap_gateway: ActivityPub gateway component this server works for.
            It is used to build and parse AP URLs, to resolve accounts, and to
            access pubsub/microblog helpers.
        """
        self.apg = ap_gateway
        super().__init__()

    def _sendErrorPage(self, request, code: int, brief: str, detail: str) -> None:
        """Write an HTML error page on a pending request, then finish it

        ``ErrorPage.render`` only sets the status/headers and *returns* the body:
        when we are called from a coroutine (i.e. after ``NOT_DONE_YET`` has been
        returned), the body must be written explicitly and the request finished,
        otherwise the client waits forever.
        """
        body = web_resource.ErrorPage(code, brief, detail).render(request)
        request.write(body)
        request.finish()

    def _onRequestError(self, failure_, request) -> None:
        """Errback of asynchronous handlers: log the failure and close the request

        Without this, any exception raised in a handler would leave the request
        open (nothing calls ``request.finish()``).

        @param failure_: failure raised by the handler
        @param request: request which was being handled
        """
        log.error(f"Error while handling request {request.uri!r}: {failure_}")
        if request.finished:
            # the handler failed after the response was completed, nothing to send
            return
        if request.startedWriting:
            # status line and headers are already sent: we can only end the body
            request.finish()
            return
        if failure_.check(ValueError, exceptions.DataError):
            # invalid data sent by the requester (bad query, bad request type…)
            self._sendErrorPage(
                request, http.BAD_REQUEST, "Bad Request", "Invalid request"
            )
        else:
            self._sendErrorPage(
                request,
                http.INTERNAL_SERVER_ERROR,
                "Internal Server Error",
                "An error happened while handling the request"
            )

    async def webfinger(self, request):
        """Answer a webfinger request with a link to the actor of the account

        The account is read from the ``resource`` query argument, which must
        have the form ``acct:<account>``. A "400 Bad Request" page is sent
        otherwise.

        @param request: HTTP request, it is written and finished by this method
        """
        url_parsed = parse.urlparse(request.uri.decode())
        query = parse.parse_qs(url_parsed.query)
        resource = query.get("resource", [""])[0]
        # 5 is the length of the "acct:" prefix
        account = resource[5:].strip()
        if not resource.startswith("acct:") or not account:
            self._sendErrorPage(
                request, http.BAD_REQUEST, "Bad Request", "Invalid webfinger resource"
            )
            return

        actor_url = self.apg.buildAPURL(TYPE_ACTOR, account)

        resp = {
            "subject": resource,
            "links": [
                {
                    "rel": "self",
                    "type": "application/activity+json",
                    "href": actor_url
                }
            ]
        }
        request.setHeader("content-type", CONTENT_TYPE_AP)
        request.write(json.dumps(resp).encode())
        request.finish()

    async def APActorRequest(
        self,
        request: "HTTPRequest",
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: str,
        actor_url: str
    ) -> dict:
        """Build the actor object of an account

        @param request: HTTP request (unused here, part of the common handler
            signature used by ``APRequest``)
        @param account_jid: JID of the account (unused here)
        @param node: pubsub node (unused here)
        @param ap_account: ActivityPub account, used to build inbox/outbox URLs
            and the preferred username
        @param actor_url: URL of the actor, used as ID and as public key owner
        @return: actor data, to be serialised in JSON
        """
        inbox_url = self.apg.buildAPURL(TYPE_INBOX, ap_account)
        outbox_url = self.apg.buildAPURL(TYPE_OUTBOX, ap_account)

        # we have to use AP account as preferredUsername because it is used to retrieve
        # actor handle (see https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196)
        preferred_username = ap_account.split("@", 1)[0]
        return {
            "@context": [
                "https://www.w3.org/ns/activitystreams",
                "https://w3id.org/security/v1"
            ],

            "id": actor_url,
            "type": "Person",
            "preferredUsername": preferred_username,
            "inbox": inbox_url,
            "outbox": outbox_url,
            "publicKey": {
                "id": f"{actor_url}#main-key",
                "owner": actor_url,
                "publicKeyPem": self.apg.public_key_pem
            }
        }

    def getCanonicalURL(self, request: "HTTPRequest") -> str:
        """Get the public HTTPS URL of the requested path

        Query arguments are not included, and trailing slashes of the path are
        removed.

        @param request: HTTP request to get the path from
        @return: URL built from the gateway public URL and the request path
        """
        return parse.urljoin(
            f"https://{self.apg.public_url}",
            request.path.decode().rstrip("/")
        )

    def queryData2RSMRequest(
        self,
        query_data: Dict[str, List[str]]
    ) -> rsm.RSMRequest:
        """Get the RSM request corresponding to query data

        @param query_data: parsed query arguments. Either ``page`` must be
            ``first`` or ``last``, or one of ``index``, ``before`` or ``after``
            must be set (they are checked in this order, the first one found
            is used).
        @return: RSM request limited to PAGE_SIZE items
        @raise ValueError: query data can't be converted to a RSM request
        """
        page = query_data.get("page")

        if page == ["first"]:
            # AP collections are in reverse chronological order: the first AP page
            # is the last RSM page, which is requested with an empty "before"
            return rsm.RSMRequest(max_=PAGE_SIZE, before="")
        elif page == ["last"]:
            return rsm.RSMRequest(max_=PAGE_SIZE)
        else:
            for query_key in ("index", "before", "after"):
                try:
                    kwargs = {query_key: query_data[query_key][0], "max_": PAGE_SIZE}
                except (KeyError, IndexError, ValueError):
                    pass
                else:
                    return rsm.RSMRequest(**kwargs)
        raise ValueError(f"Invalid query data: {query_data!r}")

    async def APOutboxPageRequest(
        self,
        request: "HTTPRequest",
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: str,
        ap_url: str,
        query_data: Dict[str, List[str]]
    ) -> dict:
        """Build one page of the outbox of an account

        @param request: HTTP request, used to build the canonical URL
        @param account_jid: pubsub service to request
        @param node: pubsub node to request
        @param ap_account: ActivityPub account (unused here)
        @param ap_url: requested ActivityPub URL (unused here)
        @param query_data: parsed query arguments selecting the page
        @return: ``OrderedCollectionPage`` data, or an empty dict if the pubsub
            items can't be retrieved
        @raise ValueError: query data don't describe a valid page
        """
        # we only keep useful keys, and sort to have consistent URL which can
        # be used as ID
        url_keys = sorted(set(query_data) & {"page", "index", "before", "after"})
        query_data = {k: query_data[k] for k in url_keys}
        rsm_request = self.queryData2RSMRequest(query_data)
        try:
            items, metadata = await self.apg._p.getItems(
                client=self.apg.client,
                service=account_jid,
                node=node,
                rsm_request=rsm_request,
                extra={C.KEY_USE_CACHE: False}
            )
        except error.StanzaError as e:
            log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}")
            return {}

        base_url = self.getCanonicalURL(request)
        url = f"{base_url}?{parse.urlencode(query_data, True)}"
        data = {
            "@context": "https://www.w3.org/ns/activitystreams",
            "id": url,
            "type": "OrderedCollectionPage",
            "partOf": base_url,
            # AP OrderedCollection must be in reversed chronological order, thus the
            # opposite of what we get with RSM (at least with Libervia Pubsub)
            "orderedItems": [
                await self.apg.mbdata2APitem(
                    self.apg.client,
                    await self.apg._m.item2mbdata(
                        self.apg.client,
                        item,
                        account_jid,
                        node
                    )
                )
                for item in reversed(items)
            ]
        }

        # Because of the reversed order, AP "prev" page is what comes *after* the
        # last RSM item, and AP "next" page is what comes *before* the first one.
        # A link is only added when the corresponding RSM marker is known,
        # otherwise we would advertise bogus "…?after=None" URLs.
        rsm_metadata = metadata.get("rsm") or {}
        if not metadata["complete"]:
            last = rsm_metadata.get("last")
            if last is not None:
                data["prev"] = f"{base_url}?{parse.urlencode({'after': last})}"
        if rsm_metadata.get("index") != 0:
            first = rsm_metadata.get("first")
            if first is not None:
                data["next"] = f"{base_url}?{parse.urlencode({'before': first})}"

        return data

    async def APOutboxRequest(
        self,
        request: "HTTPRequest",
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: str,
        ap_url: str
    ) -> dict:
        """Build the outbox of an account, or one of its pages

        If the request has query arguments, a page is returned (see
        ``APOutboxPageRequest``), otherwise the collection itself is returned.

        @param request: HTTP request
        @param account_jid: pubsub service to request
        @param node: pubsub node to request, microblog namespace is used if None
        @param ap_account: ActivityPub account
        @param ap_url: requested ActivityPub URL
        @return: ``OrderedCollection`` or ``OrderedCollectionPage`` data, or an
            empty dict if the pubsub node can't be requested
        """
        if node is None:
            node = self.apg._m.namespace

        parsed_url = parse.urlparse(request.uri.decode())
        query_data = parse.parse_qs(parsed_url.query)
        if query_data:
            return await self.APOutboxPageRequest(
                request, account_jid, node, ap_account, ap_url, query_data
            )

        # XXX: we can't use disco#info here because this request won't work on a bare jid
        # due to security considerations of XEP-0030 (we don't have presence
        # subscription).
        # The current workaround is to do a request as if RSM was available, and actually
        # check its availability according to result.
        try:
            __, metadata = await self.apg._p.getItems(
                client=self.apg.client,
                service=account_jid,
                node=node,
                max_items=0,
                rsm_request=rsm.RSMRequest(max_=0)
            )
        except error.StanzaError as e:
            log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}")
            return {}
        try:
            items_count = metadata["rsm"]["count"]
        except KeyError:
            log.warning(
                f"No RSM metadata found when requesting pubsub node {node} at "
                f"{account_jid}, defaulting to items_count=20"
            )
            items_count = 20

        url = self.getCanonicalURL(request)
        url_first_page = f"{url}?{parse.urlencode({'page': 'first'})}"
        url_last_page = f"{url}?{parse.urlencode({'page': 'last'})}"
        return {
            "@context": "https://www.w3.org/ns/activitystreams",
            "id": url,
            "totalItems": items_count,
            "type": "OrderedCollection",
            "first": url_first_page,
            "last": url_last_page,
        }

    async def APRequest(self, request):
        """Handle an ActivityPub request and write the JSON result

        The request type and the account are parsed from the URL, then the
        ``AP<Type>Request`` method corresponding to the type is called.

        @param request: HTTP request, it is written and finished by this method
        @raise exceptions.DataError: the request type is not a known one
        """
        path = request.path.decode()
        ap_url = parse.urljoin(
            f"https://{self.apg.public_url}",
            path
        )
        request_type, ap_account = self.apg.parseAPURL(ap_url)
        # the type is checked first to avoid a useless account lookup on bad requests
        if request_type not in AP_REQUEST_TYPES:
            raise exceptions.DataError(f"Invalid request type: {request_type!r}")
        account_jid, node = await self.apg.getJIDAndNode(ap_account)
        method = getattr(self, f"AP{request_type.title()}Request")
        ret_data = await method(request, account_jid, node, ap_account, ap_url)
        request.setHeader("content-type", CONTENT_TYPE_AP)
        request.write(json.dumps(ret_data).encode())
        request.finish()

    def render(self, request):
        """Set the "server" header, then render the request as usual"""
        request.setHeader("server", VERSION)
        return super().render(request)

    def render_GET(self, request):
        """Dispatch GET requests to webfinger or ActivityPub handlers

        Handlers are run asynchronously; a "no such resource" page is returned
        for any other path.
        """
        path = request.path.decode().lstrip("/")
        if path.startswith(".well-known/webfinger"):
            handler = self.webfinger
        elif path.startswith(self.apg.ap_path):
            handler = self.APRequest
        else:
            return web_resource.NoResource().render(request)

        d = defer.ensureDeferred(handler(request))
        # a failing handler must not leave the request open
        d.addErrback(self._onRequestError, request)
        return server.NOT_DONE_YET
288 | |
289 | |
class HTTPRequest(server.Request):
    """Request class of the gateway HTTP server

    Nothing is overridden for now; the subclass gives the gateway its own
    request type (it is set as ``requestFactory`` of ``HTTPServer``).
    """
292 | |
293 | |
class HTTPServer(server.Site):
    """Site serving the ActivityPub gateway resource"""
    # requests are instantiated with the gateway's own request class
    requestFactory = HTTPRequest

    def __init__(self, ap_gateway):
        """
        @param ap_gateway: gateway instance, given to the root resource
        """
        root_resource = HTTPAPGServer(ap_gateway)
        super().__init__(root_resource)