comparison sat/plugins/plugin_comp_ap_gateway/http_server.py @ 3729:86eea17cafa7

component AP gateway: split plugin in several files: constants, HTTP server and Pubsub service have been put in separate files. rel: 363
author Goffi <goffi@goffi.org>
date Mon, 31 Jan 2022 18:35:49 +0100
parents sat/plugins/plugin_comp_ap_gateway.py@b15644cae50d
children a8c7e5cef0cb
comparison
equal deleted inserted replaced
3728:b15644cae50d 3729:86eea17cafa7
1 #!/usr/bin/env python3
2
3 # Libervia ActivityPub Gateway
4 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 from typing import Optional, Dict, List
20 import json
21 from urllib import parse
22 import re
23 import unicodedata
24
25 from twisted.web import http, resource as web_resource, server
26 from twisted.internet import defer
27 from twisted.words.protocols.jabber import jid, error
28 from wokkel import pubsub, rsm
29
30 from sat.core import exceptions
31 from sat.core.constants import Const as C
32 from sat.core.i18n import _
33 from sat.core.log import getLogger
34
35 from .constants import (CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_OUTBOX,
36 AP_REQUEST_TYPES, PAGE_SIZE)
37
38
log = getLogger(__name__)

# Value sent in the "server" header of every response (set in HTTPAPGServer.render).
# NOTE(review): NFKD presumably aims at a header-friendly string, but it does not
# guarantee pure ASCII output — confirm the intent.
VERSION = unicodedata.normalize(
    "NFKD",
    "{} ActivityPub Gateway {}".format(C.APP_NAME, C.APP_VERSION),
)
45
46
class HTTPAPGServer(web_resource.Resource):
    """HTTP Server handling ActivityPub S2S protocol"""

    isLeaf = True

    def __init__(self, ap_gateway):
        self.apg = ap_gateway
        super().__init__()

    def _writeError(
        self,
        request: "HTTPRequest",
        code: int,
        brief: str,
        detail: str
    ) -> None:
        """Send an HTML error page and finish the request

        Handlers are run through ``defer.ensureDeferred`` (see ``render_GET``), so
        their return value is discarded: an error page must be written explicitly,
        returning the rendered page would leave the request unanswered.

        NOTE(review): Twisted's ``ErrorPage`` may not HTML-escape ``detail``, so it
        must never contain data coming from the request.

        @param request: request to answer
        @param code: HTTP status code
        @param brief: short description of the error
        @param detail: longer description of the error
        """
        body = web_resource.ErrorPage(code, brief, detail).render(request)
        request.write(body)
        request.finish()

    def _onRequestError(self, failure, request: "HTTPRequest") -> None:
        """Errback of request handlers: log the failure and send an error page

        Without it, an exception raised in a handler would leave the request open
        until the client times out.

        @param failure: Twisted failure raised by the handler
        @param request: request being handled
        """
        if failure.check(exceptions.DataError):
            log.warning(f"Invalid request {request.uri!r}: {failure.value}")
            code, brief, detail = http.BAD_REQUEST, "Bad Request", "Invalid request"
        else:
            log.error(
                f"Error while handling request {request.uri!r}:\n"
                f"{failure.getTraceback()}"
            )
            code, brief, detail = (
                http.INTERNAL_SERVER_ERROR,
                "Internal Server Error",
                "An unexpected error happened"
            )
        if request.finished:
            # an answer has already been sent, nothing more can be done
            return
        try:
            self._writeError(request, code, brief, detail)
        except Exception as e:
            # e.g. the connection may have been lost in the meantime
            log.warning(f"Can't send error page for {request.uri!r}: {e}")

    async def webfinger(self, request):
        """Answer a WebFinger lookup for an ``acct:`` resource

        The answer is a JRD linking the account to its ActivityPub actor URL.
        A missing or invalid ``resource`` query parameter gets a 400 error page.
        """
        url_parsed = parse.urlparse(request.uri.decode())
        query = parse.parse_qs(url_parsed.query)
        resource = query.get("resource", [""])[0]
        account = resource[len("acct:"):].strip()
        if not resource.startswith("acct:") or not account:
            self._writeError(
                request, http.BAD_REQUEST, "Bad Request", "Invalid webfinger resource"
            )
            return

        actor_url = self.apg.buildAPURL(TYPE_ACTOR, account)

        resp = {
            "subject": resource,
            "links": [
                {
                    "rel": "self",
                    "type": "application/activity+json",
                    "href": actor_url
                }
            ]
        }
        # RFC 7033 §4.2/§10.2: a JRD is served as "application/jrd+json"
        request.setHeader("content-type", "application/jrd+json")
        request.write(json.dumps(resp).encode())
        request.finish()

    async def APActorRequest(
        self,
        request: "HTTPRequest",
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: str,
        actor_url: str
    ) -> dict:
        """Build the ActivityPub ``Person`` actor document of ``ap_account``

        @param actor_url: URL of the actor, used as its ID
        @return: actor data, with inbox/outbox URLs and the gateway public key
        """
        inbox_url = self.apg.buildAPURL(TYPE_INBOX, ap_account)
        outbox_url = self.apg.buildAPURL(TYPE_OUTBOX, ap_account)

        # we have to use AP account as preferredUsername because it is used to retrieve
        # actor handle (see https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196)
        preferred_username = ap_account.split("@", 1)[0]
        return {
            "@context": [
                "https://www.w3.org/ns/activitystreams",
                "https://w3id.org/security/v1"
            ],

            "id": actor_url,
            "type": "Person",
            "preferredUsername": preferred_username,
            "inbox": inbox_url,
            "outbox": outbox_url,
            "publicKey": {
                "id": f"{actor_url}#main-key",
                "owner": actor_url,
                "publicKeyPem": self.apg.public_key_pem
            }
        }

    def getCanonicalURL(self, request: "HTTPRequest") -> str:
        """Return the public https URL of the request path, without trailing slash"""
        return parse.urljoin(
            f"https://{self.apg.public_url}",
            request.path.decode().rstrip("/")
        )

    def queryData2RSMRequest(
        self,
        query_data: Dict[str, List[str]]
    ) -> rsm.RSMRequest:
        """Get RSM request to use from query data

        ``page=first``/``page=last`` select the first/last AP page. Otherwise the
        first key found among "index", "before" and "after" is used.

        @param query_data: parsed query (as returned by ``parse.parse_qs``)
        @return: RSM request with ``PAGE_SIZE`` as max
        @raise ValueError: query data doesn't describe a page
        """
        page = query_data.get("page")

        if page == ["first"]:
            # AP order is the reverse of RSM one, thus AP first page is RSM last one
            return rsm.RSMRequest(max_=PAGE_SIZE, before="")
        elif page == ["last"]:
            return rsm.RSMRequest(max_=PAGE_SIZE)
        else:
            for query_key in ("index", "before", "after"):
                try:
                    kwargs = {query_key: query_data[query_key][0], "max_": PAGE_SIZE}
                except (KeyError, IndexError):
                    pass
                else:
                    return rsm.RSMRequest(**kwargs)
        raise ValueError(f"Invalid query data: {query_data!r}")

    async def APOutboxPageRequest(
        self,
        request: "HTTPRequest",
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: str,
        ap_url: str,
        query_data: Dict[str, List[str]]
    ) -> dict:
        """Build an ``OrderedCollectionPage`` of an outbox from pubsub items

        @param query_data: parsed query of the request, selecting the page
        @return: page data, or an empty dict if pubsub items can't be retrieved
        @raise exceptions.DataError: query data doesn't describe a valid page
        """
        # we only keep useful keys, and sort to have consistent URL which can
        # be used as ID
        url_keys = sorted(set(query_data) & {"page", "index", "before", "after"})
        query_data = {k: query_data[k] for k in url_keys}
        try:
            rsm_request = self.queryData2RSMRequest(query_data)
        except ValueError as e:
            raise exceptions.DataError(f"Invalid outbox page request: {e}") from e
        try:
            items, metadata = await self.apg._p.getItems(
                client=self.apg.client,
                service=account_jid,
                node=node,
                rsm_request=rsm_request,
                extra={C.KEY_USE_CACHE: False}
            )
        except error.StanzaError as e:
            log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}")
            return {}

        base_url = self.getCanonicalURL(request)
        url = f"{base_url}?{parse.urlencode(query_data, True)}"
        data = {
            "@context": "https://www.w3.org/ns/activitystreams",
            "id": url,
            "type": "OrderedCollectionPage",
            "partOf": base_url,
            # AP OrderedCollection must be in reversed chronological order, thus the
            # opposite of what we get with RSM (at least with Libervia Pubsub)
            "orderedItems": [
                await self.apg.mbdata2APitem(
                    self.apg.client,
                    await self.apg._m.item2mbdata(
                        self.apg.client,
                        item,
                        account_jid,
                        node
                    )
                )
                for item in reversed(items)
            ]
        }

        # as order is reversed, AP "prev" maps to RSM "after" and AP "next" to RSM
        # "before". RSM metadata may be missing or have unknown (None) values: links
        # are only added when the pivot item ID is known.
        rsm_data = metadata.get("rsm") or {}
        last = rsm_data.get("last")
        if not metadata.get("complete") and last is not None:
            data["prev"] = f"{base_url}?{parse.urlencode({'after': last})}"
        first = rsm_data.get("first")
        if rsm_data.get("index") != 0 and first is not None:
            data["next"] = f"{base_url}?{parse.urlencode({'before': first})}"

        return data

    async def APOutboxRequest(
        self,
        request: "HTTPRequest",
        account_jid: jid.JID,
        node: Optional[str],
        ap_account: str,
        ap_url: str
    ) -> dict:
        """Build the ``OrderedCollection`` of an outbox, or one of its pages

        If the request has a query, it is handled as a page request.

        @return: collection (or page) data, or an empty dict if the pubsub node
            can't be requested
        """
        if node is None:
            node = self.apg._m.namespace

        parsed_url = parse.urlparse(request.uri.decode())
        query_data = parse.parse_qs(parsed_url.query)
        if query_data:
            return await self.APOutboxPageRequest(
                request, account_jid, node, ap_account, ap_url, query_data
            )

        # XXX: we can't use disco#info here because this request won't work on a bare jid
        # due to security considerations of XEP-0030 (we don't have presence
        # subscription).
        # The current workaround is to do a request as if RSM was available, and actually
        # check its availability according to result.
        try:
            __, metadata = await self.apg._p.getItems(
                client=self.apg.client,
                service=account_jid,
                node=node,
                max_items=0,
                rsm_request=rsm.RSMRequest(max_=0)
            )
        except error.StanzaError as e:
            log.warning(f"Can't get data from pubsub node {node} at {account_jid}: {e}")
            return {}
        try:
            items_count = metadata["rsm"]["count"]
        except KeyError:
            log.warning(
                f"No RSM metadata found when requesting pubsub node {node} at "
                f"{account_jid}, defaulting to items_count=20"
            )
            items_count = 20

        url = self.getCanonicalURL(request)
        url_first_page = f"{url}?{parse.urlencode({'page': 'first'})}"
        url_last_page = f"{url}?{parse.urlencode({'page': 'last'})}"
        return {
            "@context": "https://www.w3.org/ns/activitystreams",
            "id": url,
            "totalItems": items_count,
            "type": "OrderedCollection",
            "first": url_first_page,
            "last": url_last_page,
        }

    async def APRequest(self, request):
        """Dispatch a request on the AP path to the matching ``AP<Type>Request`` method

        The returned data is sent as JSON with the ActivityPub content type.

        @raise exceptions.DataError: the request type is not a known one
        """
        path = request.path.decode()
        ap_url = parse.urljoin(
            f"https://{self.apg.public_url}",
            path
        )
        request_type, ap_account = self.apg.parseAPURL(ap_url)
        # validate the type before resolving the account
        if request_type not in AP_REQUEST_TYPES:
            raise exceptions.DataError(f"Invalid request type: {request_type!r}")
        account_jid, node = await self.apg.getJIDAndNode(ap_account)
        method = getattr(self, f"AP{request_type.title()}Request")
        ret_data = await method(request, account_jid, node, ap_account, ap_url)
        request.setHeader("content-type", CONTENT_TYPE_AP)
        request.write(json.dumps(ret_data).encode())
        request.finish()

    def render(self, request):
        request.setHeader("server", VERSION)
        return super().render(request)

    def render_GET(self, request):
        """Route GET requests to WebFinger or ActivityPub handlers

        Handlers are coroutines run in the background; their failures are caught by
        ``_onRequestError`` so the request is always answered.
        """
        path = request.path.decode().lstrip("/")
        if path.startswith(".well-known/webfinger"):
            handler = self.webfinger(request)
        elif path.startswith(self.apg.ap_path):
            handler = self.APRequest(request)
        else:
            return web_resource.NoResource().render(request)

        d = defer.ensureDeferred(handler)
        d.addErrback(self._onRequestError, request)
        return server.NOT_DONE_YET
288
289
class HTTPRequest(server.Request):
    """Request class used by the gateway HTTP site

    It currently adds nothing to ``twisted.web.server.Request``.
    """
292
293
class HTTPServer(server.Site):
    """Twisted site serving the ActivityPub gateway resource"""

    requestFactory = HTTPRequest

    def __init__(self, ap_gateway):
        root_resource = HTTPAPGServer(ap_gateway)
        super().__init__(root_resource)