comparison sat/plugins/plugin_comp_ap_gateway.py @ 3682:7c990aaa49d3

comp AP Gateway: ActivityPub Component first draft: this implements the base component; for the moment it is usable only through a developer API. rel: 362
author Goffi <goffi@goffi.org>
date Sun, 26 Sep 2021 16:41:55 +0200
parents
children 8353cc3b8db9
comparison
equal deleted inserted replaced
3681:742e466fa000 3682:7c990aaa49d3
1 #!/usr/bin/env python3
2
3 # Libervia ActivityPub Gateway
4 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 import time
20 import json
21 import base64
22 import hashlib
23 from urllib import parse
24 from typing import Tuple
25 from pathlib import Path
26 import shortuuid
27 from cryptography.hazmat.primitives.asymmetric import rsa
28 from cryptography.hazmat.primitives import serialization
29 from cryptography.hazmat.primitives import hashes
30 from cryptography.hazmat.primitives.asymmetric import padding
31 from twisted.internet import reactor, threads, defer
32 from twisted.web import server, resource as web_resource, http
33 from twisted.words.protocols.jabber import jid
34 import treq
35 from treq.response import _Response as TReqResponse
36 from sat.core.i18n import _
37 from sat.core.constants import Const as C
38 from sat.core import exceptions
39 from sat.core.log import getLogger
40 from sat.core.core_types import SatXMPPEntity
41 from sat.tools.common import tls, data_format
42 from sat.tools import utils
43
44
# module-level logger
log = getLogger(__name__)

# import name of this plugin; it is also passed to the private storage calls
# (RSA key) and used to build the configuration section name below
IMPORT_NAME = "ap-gateway"

# plugin metadata (name, mode, type, dependencies, main class, description)
# NOTE(review): presumably read by the plugin loader — confirm against host code
PLUGIN_INFO = {
    C.PI_NAME: "ActivityPub Gateway component",
    C.PI_IMPORT_NAME: IMPORT_NAME,
    C.PI_MODES: [C.PLUG_MODE_COMPONENT],
    C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
    C.PI_PROTOCOLS: [],
    # XEP-0106 is used to unescape the local part of the service JID
    C.PI_DEPENDENCIES: ["XEP-0106"],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "APGateway",
    C.PI_HANDLER: C.BOOL_FALSE,
    C.PI_DESCRIPTION: _(
        "Gateway for bidirectional communication between XMPP and ActivityPub."
    ),
}

# configuration section where the options of this component are read
CONF_SECTION = f"component {IMPORT_NAME}"
# "content-type" header value set on the JSON responses of the HTTP server
CONTENT_TYPE_AP = "application/activity+json; charset=utf-8"
# endpoint types, used as the path segment following the AP base path
TYPE_ACTOR = "actor"
TYPE_INBOX = "inbox"
TYPE_ITEM = "item"
# bare ActivityPub media type, used in headers of outgoing requests
MEDIA_TYPE_AP = "application/activity+json"
70
71
class HTTPAPGServer(web_resource.Resource):
    """HTTP Server handling ActivityPub S2S protocol

    This is a leaf resource: every GET request reaches ``render_GET`` which
    dispatches to the WebFinger handler or to the actor document handler
    according to the request path.
    """
    isLeaf = True
    # RFC 7033 registers "application/jrd+json" as the media type of WebFinger
    # responses (JSON Resource Descriptor)
    CONTENT_TYPE_WEBFINGER = "application/jrd+json; charset=utf-8"

    def __init__(self, ap_gateway):
        """
        @param ap_gateway: gateway instance, used to build and parse AP URLs and
            to get the public URL, the AP path and the public key
        """
        self.apg = ap_gateway
        super().__init__()

    def webfinger(self, request):
        """Answer a WebFinger query

        The ``resource`` query argument must be of the form ``acct:<account>``.

        @param request: HTTP request being handled
        @return: encoded JSON document linking the account to its actor URL, or
            a "400 Bad Request" page if the resource is missing or invalid
        """
        url_parsed = parse.urlparse(request.uri.decode())
        query = parse.parse_qs(url_parsed.query)
        resource = query.get("resource", [""])[0]
        # strip the "acct:" prefix (its presence is checked just below)
        account = resource[5:].strip()
        if not resource.startswith("acct:") or not account:
            return web_resource.ErrorPage(
                http.BAD_REQUEST, "Bad Request" , "Invalid webfinger resource"
            ).render(request)
        # only validated accounts are logged
        log.info(f"request pour {account}")

        actor_url = self.apg.buildAPURL(TYPE_ACTOR, account)

        resp = {
            "subject": resource,
            "links": [
                {
                    "rel": "self",
                    "type": "application/activity+json",
                    "href": actor_url
                }
            ]
        }
        request.setHeader("content-type", self.CONTENT_TYPE_WEBFINGER)
        return json.dumps(resp).encode()

    def APRequest(self, request):
        """Answer a request made on the ActivityPub path with an actor document

        NOTE(review): the endpoint type found in the path is currently ignored,
        an actor document is returned for any type.

        @param request: HTTP request being handled
        @return: encoded JSON actor document, or a "404 Not Found" page if no
            account can be extracted from the path
        """
        path = request.path.decode()
        actor_url = parse.urljoin(
            f"https://{self.apg.public_url}",
            path
        )
        try:
            __, account = self.apg.parseAPURL(actor_url)
        except ValueError:
            # malformed AP path (e.g. no account part): this is a client error,
            # it must not end up as an internal server error
            account = ""
        if not account:
            return web_resource.NoResource().render(request)
        inbox_url = self.apg.buildAPURL(TYPE_INBOX, account)
        username = account.split("@", 1)[0]
        actor = {
            "@context": [
                "https://www.w3.org/ns/activitystreams",
                "https://w3id.org/security/v1"
            ],

            "id": actor_url,
            "type": "Person",
            "preferredUsername": username,
            "inbox": inbox_url,

            "publicKey": {
                "id": f"{actor_url}#main-key",
                "owner": actor_url,
                "publicKeyPem": self.apg.public_key_pem
            }
        }
        request.setHeader("content-type", CONTENT_TYPE_AP)
        return json.dumps(actor).encode()

    def render_GET(self, request):
        """Dispatch a GET request according to its path

        @param request: HTTP request being handled
        @return: body of the response ("404 Not Found" page for unknown paths)
        """
        path = request.path.decode().lstrip("/")
        if path.startswith(".well-known/webfinger"):
            return self.webfinger(request)
        # the trailing slash avoids matching unrelated paths which merely start
        # with the same characters as the AP path (e.g. "_apfoo/…")
        elif path.startswith(f"{self.apg.ap_path}/"):
            return self.APRequest(request)
        return web_resource.NoResource().render(request)
142
143
class HTTPRequest(server.Request):
    """Request class used by the gateway HTTP site

    Nothing is overridden for now, the behaviour is the one of the parent class.
    """
146
147
class HTTPServer(server.Site):
    """Site serving the ActivityPub gateway HTTP resource"""
    requestFactory = HTTPRequest

    def __init__(self, ap_gateway):
        """
        @param ap_gateway: gateway instance handed to the root resource
        """
        root_resource = HTTPAPGServer(ap_gateway)
        super().__init__(root_resource)
153
154
class APGateway:
    """ActivityPub gateway component

    It holds the RSA key pair used to sign outgoing requests, starts the HTTP(S)
    server answering WebFinger and actor requests, and registers the "APSend"
    bridge method used to send a message to an ActivityPub actor.
    """

    def __init__(self, host):
        """
        @param host: main application instance, giving access to the bridge, the
            memory (configuration and storage) and the other plugins
        """
        self.host = host
        self.initialised = False

        host.bridge.addMethod(
            "APSend",
            ".plugin",
            in_sign="sss",
            out_sign="",
            method=self._publishMessage,
            async_=True,
        )

    async def init(self, client):
        """Load or create the RSA key, read the configuration, start the HTTP server

        The initialisation is attempted only once: following calls return
        immediately, even if the first attempt stopped on a configuration error.
        When the public URL is missing or invalid, an error is logged and the
        HTTP server is not started.

        @param client: client of the connecting profile; its profile is used to
            store and retrieve the private key
        @raise exceptions.ConfigError: "http_connection_type" option is neither
            "http" nor "https"
        """
        if self.initialised:
            return

        self.initialised = True
        log.info(_("ActivityPub Gateway initialization"))

        # RSA keys
        stored_data = await self.host.memory.storage.getPrivates(
            IMPORT_NAME, ["rsa_key"], profile=client.profile
        )
        private_key_pem = stored_data.get("rsa_key")
        if private_key_pem is None:
            # key generation is slow, it is done in a thread to avoid blocking
            # the reactor
            self.private_key = await threads.deferToThread(
                rsa.generate_private_key,
                public_exponent=65537,
                key_size=4096,
            )
            private_key_pem = self.private_key.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.PKCS8,
                encryption_algorithm=serialization.NoEncryption()
            ).decode()
            await self.host.memory.storage.setPrivateValue(
                IMPORT_NAME, "rsa_key", private_key_pem, profile=client.profile
            )
        else:
            self.private_key = serialization.load_pem_private_key(
                private_key_pem.encode(),
                password=None,
            )
        self.public_key = self.private_key.public_key()
        self.public_key_pem = self.public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        ).decode()

        # params (URL and port)
        self.public_url = self.host.memory.getConfig(
            CONF_SECTION, "public_url"
        ) or self.host.memory.getConfig(
            CONF_SECTION, "xmpp_domain"
        )
        # an empty value is as unusable as a missing one
        if not self.public_url:
            log.error(
                '"public_url" not set in configuration, this is mandatory to have '
                "ActivityPub Gateway running. Please set this option to the public "
                f"facing url in {CONF_SECTION!r} configuration section."
            )
            return
        # "://" is looked for instead of using urlparse, because a bare
        # "host:port" value can be parsed by urlparse as a scheme and a path
        if "://" in self.public_url:
            log.error(
                "Scheme must not be specified in \"public_url\", please remove it from "
                "\"public_url\" configuration option. ActivityPub Gateway won't be run."
            )
            return
        self.http_port = int(self.host.memory.getConfig(
            CONF_SECTION, 'http_port', 8123))
        connection_type = self.host.memory.getConfig(
            CONF_SECTION, 'http_connection_type', 'https')
        if connection_type not in ('http', 'https'):
            raise exceptions.ConfigError(
                'bad ap-gateway http_connection_type, you must use one of "http" or '
                '"https"'
            )
        # surrounding slashes are removed: request paths are compared to this
        # value after their leading slash has been stripped
        self.ap_path = self.host.memory.getConfig(
            CONF_SECTION, 'ap_path', '_ap'
        ).strip("/")
        self.base_ap_url = parse.urljoin(
            f"https://{self.public_url}", f"{self.ap_path}/"
        )

        # HTTP server launch
        self.server = HTTPServer(self)
        if connection_type == 'http':
            reactor.listenTCP(self.http_port, self.server)
        else:
            options = tls.getOptionsFromConfig(
                self.host.memory.config, CONF_SECTION)
            tls.TLSOptionsCheck(options)
            context_factory = tls.getTLSContextFactory(options)
            reactor.listenSSL(self.http_port, self.server, context_factory)

    async def profileConnecting(self, client):
        """Initialise the gateway when a profile is connecting

        @param client: client of the connecting profile
        """
        await self.init(client)

    def parseAPURL(self, url: str) -> Tuple[str, str]:
        """Parse an URL leading to an AP endpoint

        @param url: URL to parse (schema is not mandatory)
        @return: endpoint type and AP account
        @raise ValueError: the URL path is not under the AP path, or there is
            nothing after the endpoint type
        """
        path = parse.urlparse(url).path.lstrip("/")
        endpoint = path[len(self.ap_path):]
        if not path.startswith(self.ap_path) or not endpoint.startswith("/"):
            raise ValueError(f"URL is not an ActivityPub endpoint: {url!r}")
        type_, sep, account = endpoint.lstrip("/").partition("/")
        if not sep:
            raise ValueError(f"Missing account in ActivityPub URL: {url!r}")
        return type_, parse.unquote(account)

    def buildAPURL(self, type_:str , *args: str) -> str:
        """Build an AP endpoint URL

        @param type_: type of AP endpoint
        @param args: endpoint dependent arguments, each one is percent-encoded
            and used as one path segment
        @return: absolute URL of the endpoint
        """
        # segments are joined with "/" explicitly (path tools use the separator
        # of the OS), and encoded with "quote" rather than "quote_plus" so that
        # "unquote" in parseAPURL restores the original value even with spaces
        segments = [type_, *(parse.quote(a, safe="") for a in args)]
        return parse.urljoin(self.base_ap_url, "/".join(segments))

    async def signAndPost(
        self,
        url: str,
        url_actor: str,
        doc: dict
    ) -> "TReqResponse":
        """Sign a document and post it to AP server

        The request is signed with an HTTP signature covering the request
        target, the host, the date and the SHA-256 digest of the body.

        @param url: AP server endpoint
        @param url_actor: URL generated by this gateway for local actor
        @param doc: document to send
        @return: response of the AP server
        """
        p_url = parse.urlparse(url)
        # the signed request target is made of the path and of the query, if any
        request_target = p_url.path or "/"
        if p_url.query:
            request_target = f"{request_target}?{p_url.query}"
        date = http.datetimeToString().decode()
        body = json.dumps(doc).encode()
        digest_hash = base64.b64encode(hashlib.sha256(body).digest()).decode()
        digest = f"sha-256={digest_hash}"
        to_sign = (
            f"(request-target): post {request_target}\nhost: {p_url.hostname}\n"
            f"date: {date}\ndigest: {digest}"
        )
        signature = base64.b64encode(self.private_key.sign(
            to_sign.encode(),
            # we have to use PKCS1v15 padding to be compatible with Mastodon
            padding.PKCS1v15(),
            hashes.SHA256()
        )).decode()
        # the key ID is the one published in the actor document ("publicKey.id")
        h_signature = (
            f'keyId="{url_actor}#main-key",'
            'headers="(request-target) host date digest",'
            f'signature="{signature}"'
        )
        return await treq.post(
            url,
            body,
            headers={
                "Host": [p_url.hostname],
                "Date": [date],
                "Digest": [digest],
                "Signature": [h_signature],
                # the body is an ActivityPub JSON document (this header is not
                # part of the signed ones)
                "Content-Type": [MEDIA_TYPE_AP],
            }
        )

    def _publishMessage(self, mess_data_s: str, service_s: str, profile: str):
        """Bridge frontend of publishMessage

        @param mess_data_s: serialised message data
        @param service_s: JID corresponding to the AP actor
        @param profile: profile sending the message
        @return: Deferred fired when the message has been sent
        """
        mess_data: dict = data_format.deserialise(mess_data_s) # type: ignore
        service = jid.JID(service_s)
        client = self.host.getClient(profile)
        return defer.ensureDeferred(self.publishMessage(client, mess_data, service))

    async def getAPActorData(self, account: str) -> dict:
        """Retrieve ActivityPub Actor data

        The actor URL is discovered with a WebFinger request on the host of the
        account, then the actor document is retrieved from this URL.

        @param account: ActivityPub Actor identifier
        @return: actor document
        @raise ValueError: the account is invalid, or the WebFinger data has no
            usable ActivityPub link
        @raise exceptions.DataError: WebFinger data or actor data can't be
            retrieved
        """
        if account.count("@") != 1 or "/" in account:
            raise ValueError(f"Invalid account: {account!r}")
        user, host = account.split("@")
        if not user or not host:
            raise ValueError(f"Invalid account: {account!r}")
        try:
            finger_data = await treq.json_content(await treq.get(
                f"https://{host}/.well-known/webfinger?"
                f"resource=acct:{parse.quote_plus(account)}",
            ))
        except Exception as e:
            raise exceptions.DataError(f"Can't get webfinger data: {e}") from e
        for link in finger_data.get("links", []):
            if (
                link.get("type") == "application/activity+json"
                and link.get("rel") == "self"
            ):
                href = link.get("href", "").strip()
                if not href:
                    raise ValueError(
                        f"Invalid webfinger data for {account!r}: missing href"
                    )
                break
        else:
            raise ValueError(
                f"No ActivityPub link found for {account!r}"
            )
        try:
            ap_actor_data = await treq.json_content(await treq.get(
                href,
                headers = {
                    "Accept": [MEDIA_TYPE_AP],
                    "Content-Type": [MEDIA_TYPE_AP],
                }
            ))
        except Exception as e:
            raise exceptions.DataError(
                f"Can't get ActivityPub actor data: {e}"
            ) from e

        return ap_actor_data

    async def publishMessage(
        self,
        client: "SatXMPPEntity",
        mess_data: dict,
        service: "jid.JID"
    ) -> None:
        """Send an AP message

        .. note::

            This is a temporary method used for development only

        @param client: client of the sender, its bare JID is used to build the
            URLs of the local actor and of the item
        @param mess_data: message data. Following keys must be set:

            ``node``
              identifier of message which is being replied (this will
              correspond to pubsub node in the future)

            ``content_xhtml`` or ``content``
              message body (respectively in XHTML or plain text)

            If ``id`` is missing or empty, a new one is generated and set.

        @param service: JID corresponding to the AP actor.
        @raise ValueError: service has no local part
        @raise exceptions.DataError: no inbox can be found in actor data
        @raise exceptions.NetworkError: the AP server didn't answer with a
            success (2xx) code
        """
        if not service.user:
            raise ValueError("service must have a local part")
        account = self.host.plugins["XEP-0106"].unescape(service.user)
        ap_actor_data = await self.getAPActorData(account)

        # the shared inbox is used when available, otherwise we fall back to the
        # inbox of the actor itself
        inbox_url = None
        if isinstance(ap_actor_data, dict):
            endpoints = ap_actor_data.get("endpoints")
            if isinstance(endpoints, dict):
                inbox_url = endpoints.get("sharedInbox")
            inbox_url = inbox_url or ap_actor_data.get("inbox")
        if not inbox_url:
            raise exceptions.DataError("Can't get ActivityPub actor inbox")

        if not mess_data.get("id"):
            mess_data["id"] = shortuuid.uuid()
        url_actor = self.buildAPURL(TYPE_ACTOR, client.jid.userhost())
        url_item = self.buildAPURL(TYPE_ITEM, client.jid.userhost(), mess_data["id"])
        now = time.time()
        item_data = {
            "@context": "https://www.w3.org/ns/activitystreams",
            "id": url_item,
            "type": "Create",
            "actor": url_actor,

            "object": {
                "id": url_item,
                "type": "Note",
                "published": utils.xmpp_date(now),
                "attributedTo": url_actor,
                "inReplyTo": mess_data["node"],
                "content": mess_data.get("content_xhtml") or mess_data["content"],
                "to": "https://www.w3.org/ns/activitystreams#Public"
            }
        }
        resp = await self.signAndPost(inbox_url, url_actor, item_data)
        # any 2xx code means that the delivery has been accepted (e.g. Mastodon
        # answers with 202), everything else is an error
        if not 200 <= resp.code < 300:
            raise exceptions.NetworkError(f"unexpected return code: {resp.code}")