Mercurial > libervia-backend
comparison sat/plugins/plugin_comp_ap_gateway/__init__.py @ 3729:86eea17cafa7
component AP gateway: split plugin into several files:
constants, HTTP server and Pubsub service have been moved to separate files.
rel: 363
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 31 Jan 2022 18:35:49 +0100 |
parents | sat/plugins/plugin_comp_ap_gateway.py@b15644cae50d |
children | bf0505d41c09 |
comparison
equal
deleted
inserted
replaced
3728:b15644cae50d | 3729:86eea17cafa7 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # Libervia ActivityPub Gateway | |
4 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) | |
5 | |
6 # This program is free software: you can redistribute it and/or modify | |
7 # it under the terms of the GNU Affero General Public License as published by | |
8 # the Free Software Foundation, either version 3 of the License, or | |
9 # (at your option) any later version. | |
10 | |
11 # This program is distributed in the hope that it will be useful, | |
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 # GNU Affero General Public License for more details. | |
15 | |
16 # You should have received a copy of the GNU Affero General Public License | |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
18 | |
19 import base64 | |
20 import hashlib | |
21 import json | |
22 from pathlib import Path | |
23 from typing import Optional, Dict, Tuple, List, Union | |
24 from urllib import parse | |
25 import calendar | |
26 import re | |
27 | |
28 import dateutil | |
29 from cryptography.hazmat.primitives import serialization | |
30 from cryptography.hazmat.primitives import hashes | |
31 from cryptography.hazmat.primitives.asymmetric import rsa | |
32 from cryptography.hazmat.primitives.asymmetric import padding | |
33 import shortuuid | |
34 import treq | |
35 from treq.response import _Response as TReqResponse | |
36 from twisted.internet import defer, reactor, threads | |
37 from twisted.web import http | |
38 from twisted.words.protocols.jabber import jid, error | |
39 from twisted.words.xish import domish | |
40 from wokkel import rsm | |
41 | |
42 from sat.core import exceptions | |
43 from sat.core.constants import Const as C | |
44 from sat.core.core_types import SatXMPPEntity | |
45 from sat.core.i18n import _ | |
46 from sat.core.log import getLogger | |
47 from sat.tools import utils | |
48 from sat.tools.common import data_format, tls | |
49 from sat.tools.common.async_utils import async_lru | |
50 | |
51 from .constants import (IMPORT_NAME, CONF_SECTION, TYPE_ACTOR, TYPE_ITEM, MEDIA_TYPE_AP, | |
52 AP_MB_MAP, LRU_MAX_SIZE) | |
53 from .http_server import HTTPServer | |
54 from .pubsub_service import APPubsubService | |
55 | |
56 | |
log = getLogger(__name__)

# NOTE(review): this shadows the IMPORT_NAME imported from .constants — confirm
# that the local override is intentional (both appear to hold the same value)
IMPORT_NAME = "ap-gateway"

# plugin metadata used by the Libervia plugin loader
PLUGIN_INFO = {
    C.PI_NAME: "ActivityPub Gateway component",
    C.PI_IMPORT_NAME: IMPORT_NAME,
    C.PI_MODES: [C.PLUG_MODE_COMPONENT],
    C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0277", "XEP-0060"],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "APGateway",
    C.PI_HANDLER: C.BOOL_TRUE,
    C.PI_DESCRIPTION: _(
        "Gateway for bidirectional communication between XMPP and ActivityPub."
    ),
}
75 | |
# one period/percent-encoded byte: exactly two hexadecimal digits
# FIX: the second range was "A-f" (lowercase upper bound), which wrongly matched
# "[", "\", "]", "^", "_" and "`" in addition to the hex digits
HEXA_ENC = r"(?P<hex>[0-9a-fA-F]{2})"
RE_PERIOD_ENC = re.compile(f"\\.{HEXA_ENC}")
RE_PERCENT_ENC = re.compile(f"%{HEXA_ENC}")
# account elements that can be used verbatim, without any period encoding
RE_ALLOWED_UNQUOTED = re.compile(r"^[a-zA-Z0-9_-]+$")
80 | |
81 | |
class APGateway:
    """Gateway providing bidirectional communication between XMPP and ActivityPub"""

    def __init__(self, host):
        """Grab plugin dependencies and register the bridge method

        @param host: Libervia host instance
        """
        self.host = host
        # init() runs only once, on first profile connection
        self.initialised = False
        # microblog (XEP-0277) and pubsub (XEP-0060) plugins
        self._m = host.plugins["XEP-0277"]
        self._p = host.plugins["XEP-0060"]

        host.bridge.addMethod(
            "APSend",
            ".plugin",
            in_sign="sss",
            out_sign="",
            method=self._publishMessage,
            async_=True,
        )

    def getHandler(self, __):
        """Return the XMPP subprotocol handler implementing the AP pubsub service"""
        return APPubsubService(self)
101 | |
102 async def init(self, client): | |
103 if self.initialised: | |
104 return | |
105 | |
106 self.initialised = True | |
107 log.info(_("ActivityPub Gateway initialization")) | |
108 | |
109 # RSA keys | |
110 stored_data = await self.host.memory.storage.getPrivates( | |
111 IMPORT_NAME, ["rsa_key"], profile=client.profile | |
112 ) | |
113 private_key_pem = stored_data.get("rsa_key") | |
114 if private_key_pem is None: | |
115 self.private_key = await threads.deferToThread( | |
116 rsa.generate_private_key, | |
117 public_exponent=65537, | |
118 key_size=4096, | |
119 ) | |
120 private_key_pem = self.private_key.private_bytes( | |
121 encoding=serialization.Encoding.PEM, | |
122 format=serialization.PrivateFormat.PKCS8, | |
123 encryption_algorithm=serialization.NoEncryption() | |
124 ).decode() | |
125 await self.host.memory.storage.setPrivateValue( | |
126 IMPORT_NAME, "rsa_key", private_key_pem, profile=client.profile | |
127 ) | |
128 else: | |
129 self.private_key = serialization.load_pem_private_key( | |
130 private_key_pem.encode(), | |
131 password=None, | |
132 ) | |
133 self.public_key = self.private_key.public_key() | |
134 self.public_key_pem = self.public_key.public_bytes( | |
135 encoding=serialization.Encoding.PEM, | |
136 format=serialization.PublicFormat.SubjectPublicKeyInfo | |
137 ).decode() | |
138 | |
139 # params | |
140 # URL and port | |
141 self.public_url = self.host.memory.getConfig( | |
142 CONF_SECTION, "public_url" | |
143 ) or self.host.memory.getConfig( | |
144 CONF_SECTION, "xmpp_domain" | |
145 ) | |
146 if self.public_url is None: | |
147 log.error( | |
148 '"public_url" not set in configuration, this is mandatory to have' | |
149 "ActivityPub Gateway running. Please set this option it to public facing " | |
150 f"url in {CONF_SECTION!r} configuration section." | |
151 ) | |
152 return | |
153 if parse.urlparse(self.public_url).scheme: | |
154 log.error( | |
155 "Scheme must not be specified in \"public_url\", please remove it from " | |
156 "\"public_url\" configuration option. ActivityPub Gateway won't be run." | |
157 ) | |
158 return | |
159 self.http_port = int(self.host.memory.getConfig( | |
160 CONF_SECTION, 'http_port', 8123)) | |
161 connection_type = self.host.memory.getConfig( | |
162 CONF_SECTION, 'http_connection_type', 'https') | |
163 if connection_type not in ('http', 'https'): | |
164 raise exceptions.ConfigError( | |
165 'bad ap-gateay http_connection_type, you must use one of "http" or ' | |
166 '"https"' | |
167 ) | |
168 self.max_items = self.host.memory.getConfig( | |
169 CONF_SECTION, 'new_node_max_items', 50 | |
170 | |
171 ) | |
172 self.ap_path = self.host.memory.getConfig(CONF_SECTION, 'ap_path', '_ap') | |
173 self.base_ap_url = parse.urljoin(f"https://{self.public_url}", f"{self.ap_path}/") | |
174 # True (default) if we provide gateway only to entities/services from our server | |
175 self.local_only = C.bool( | |
176 self.host.memory.getConfig(CONF_SECTION, 'local_only', C.BOOL_TRUE) | |
177 ) | |
178 | |
179 # HTTP server launch | |
180 self.server = HTTPServer(self) | |
181 if connection_type == 'http': | |
182 reactor.listenTCP(self.http_port, self.server) | |
183 else: | |
184 options = tls.getOptionsFromConfig( | |
185 self.host.memory.config, CONF_SECTION) | |
186 tls.TLSOptionsCheck(options) | |
187 context_factory = tls.getTLSContextFactory(options) | |
188 reactor.listenSSL(self.http_port, self.server, context_factory) | |
189 | |
190 async def profileConnecting(self, client): | |
191 self.client = client | |
192 await self.init(client) | |
193 | |
194 async def apGet(self, url: str) -> dict: | |
195 """Retrieve AP JSON from given URL | |
196 | |
197 @raise error.StanzaError: "service-unavailable" is sent when something went wrong | |
198 with AP server | |
199 """ | |
200 try: | |
201 return await treq.json_content(await treq.get( | |
202 url, | |
203 headers = { | |
204 "Accept": [MEDIA_TYPE_AP], | |
205 "Content-Type": [MEDIA_TYPE_AP], | |
206 } | |
207 )) | |
208 except Exception as e: | |
209 raise error.StanzaError( | |
210 "service-unavailable", | |
211 text=f"Can't get AP data at {url}: {e}" | |
212 ) | |
213 | |
214 def mustEncode(self, text: str) -> bool: | |
215 """Indicate if a text must be period encoded""" | |
216 return ( | |
217 not RE_ALLOWED_UNQUOTED.match(text) | |
218 or text.startswith("___") | |
219 or "---" in text | |
220 ) | |
221 | |
222 def periodEncode(self, text: str) -> str: | |
223 """Period encode a text | |
224 | |
225 see [getJIDAndNode] for reasons of period encoding | |
226 """ | |
227 return ( | |
228 parse.quote(text, safe="") | |
229 .replace("---", "%2d%2d%2d") | |
230 .replace("___", "%5f%5f%5f") | |
231 .replace(".", "%2e") | |
232 .replace("~", "%7e") | |
233 .replace("%", ".") | |
234 ) | |
235 | |
236 async def getAPAccountFromJidAndNode( | |
237 self, | |
238 jid_: jid.JID, | |
239 node: Optional[str] | |
240 ) -> str: | |
241 """Construct AP account from JID and node | |
242 | |
243 The account construction will use escaping when necessary | |
244 """ | |
245 if not node or node == self._m.namespace: | |
246 node = None | |
247 | |
248 if node and not jid_.user and not self.mustEncode(node): | |
249 is_pubsub = await self.isPubsub(jid_) | |
250 # when we have a pubsub service, the user part can be used to set the node | |
251 # this produces more user-friendly AP accounts | |
252 if is_pubsub: | |
253 jid_.user = node | |
254 node = None | |
255 | |
256 is_local = self.isLocal(jid_) | |
257 user = jid_.user if is_local else jid_.userhost() | |
258 if user is None: | |
259 user = "" | |
260 account_elts = [] | |
261 if node and self.mustEncode(node) or self.mustEncode(user): | |
262 account_elts = ["___"] | |
263 if node: | |
264 node = self.periodEncode(node) | |
265 user = self.periodEncode(user) | |
266 | |
267 if not user: | |
268 raise exceptions.InternalError("there should be a user part") | |
269 | |
270 if node: | |
271 account_elts.extend((node, "---")) | |
272 | |
273 account_elts.extend(( | |
274 user, "@", jid_.host if is_local else self.client.jid.userhost() | |
275 )) | |
276 return "".join(account_elts) | |
277 | |
278 def isLocal(self, jid_: jid.JID) -> bool: | |
279 """Returns True if jid_ use a domain or subdomain of gateway's host""" | |
280 local_host = self.client.host.split(".") | |
281 assert local_host | |
282 return jid_.host.split(".")[-len(local_host):] == local_host | |
283 | |
284 async def isPubsub(self, jid_: jid.JID) -> bool: | |
285 """Indicate if a JID is a Pubsub service""" | |
286 host_disco = await self.host.getDiscoInfos(self.client, jid_) | |
287 return ( | |
288 ("pubsub", "service") in host_disco.identities | |
289 and not ("pubsub", "pep") in host_disco.identities | |
290 ) | |
291 | |
292 async def getJIDAndNode(self, ap_account: str) -> Tuple[jid.JID, Optional[str]]: | |
293 """Decode raw AP account handle to get XMPP JID and Pubsub Node | |
294 | |
295 Username are case insensitive. | |
296 | |
297 By default, the username correspond to local username (i.e. username from | |
298 component's server). | |
299 | |
300 If local name's domain is a pubsub service (and not PEP), the username is taken as | |
301 a pubsub node. | |
302 | |
303 If ``---`` is present in username, the part before is used as pubsub node, and the | |
304 rest as a JID user part. | |
305 | |
306 If username starts with ``___``, characters are encoded using period encoding | |
307 (i.e. percent encoding where a ``.`` is used instead of ``%``). | |
308 | |
309 This horror is necessary due to limitation in some AP implementation (notably | |
310 Mastodon), cf. https://github.com/mastodon/mastodon/issues/17222 | |
311 | |
312 examples: | |
313 | |
314 ``toto@example.org`` => JID = toto@example.org, node = None | |
315 | |
316 ``___toto.40example.net@example.org`` => JID = toto@example.net (this one is a | |
317 non-local JID, and will work only if setings ``local_only`` is False), node = None | |
318 | |
319 ``toto@pubsub.example.org`` (with pubsub.example.org being a pubsub service) => | |
320 JID = pubsub.example.org, node = toto | |
321 | |
322 ``tata---toto@example.org`` => JID = toto@example.org, node = tata | |
323 | |
324 ``___urn.3axmpp.3amicroblog.3a0@pubsub.example.org`` (with pubsub.example.org | |
325 being a pubsub service) ==> JID = pubsub.example.org, node = urn:xmpp:microblog:0 | |
326 | |
327 @param ap_account: ActivityPub account handle (``username@domain.tld``) | |
328 @return: service JID and pubsub node | |
329 if pubsub is None, default microblog pubsub node (and possibly other nodes | |
330 that plugins may hanlde) will be used | |
331 @raise ValueError: invalid account | |
332 @raise PermissionError: non local jid is used when gateway doesn't allow them | |
333 """ | |
334 if ap_account.count("@") != 1: | |
335 raise ValueError("Invalid AP account") | |
336 if ap_account.startswith("___"): | |
337 encoded = True | |
338 ap_account = ap_account[3:] | |
339 else: | |
340 encoded = False | |
341 | |
342 username, domain = ap_account.split("@") | |
343 | |
344 if "---" in username: | |
345 node, username = username.rsplit("---", 1) | |
346 else: | |
347 node = None | |
348 | |
349 if encoded: | |
350 username = parse.unquote( | |
351 RE_PERIOD_ENC.sub(r"%\g<hex>", username), | |
352 errors="strict" | |
353 ) | |
354 if node: | |
355 node = parse.unquote( | |
356 RE_PERIOD_ENC.sub(r"%\g<hex>", node), | |
357 errors="strict" | |
358 ) | |
359 | |
360 if "@" in username: | |
361 username, domain = username.rsplit("@", 1) | |
362 | |
363 if not node: | |
364 # we need to check host disco, because disco request to user may be | |
365 # blocked for privacy reason (see | |
366 # https://xmpp.org/extensions/xep-0030.html#security) | |
367 is_pubsub = await self.isPubsub(jid.JID(domain)) | |
368 | |
369 if is_pubsub: | |
370 # if the host is a pubsub service and not a PEP, we consider that username | |
371 # is in fact the node name | |
372 node = username | |
373 username = None | |
374 | |
375 jid_s = f"{username}@{domain}" if username else domain | |
376 try: | |
377 jid_ = jid.JID(jid_s) | |
378 except RuntimeError: | |
379 raise ValueError(f"Invalid jid: {jid_s!r}") | |
380 | |
381 if self.local_only and not self.isLocal(jid_): | |
382 raise exceptions.PermissionError( | |
383 "This gateway is configured to map only local entities and services" | |
384 ) | |
385 | |
386 return jid_, node | |
387 | |
388 def parseAPURL(self, url: str) -> Tuple[str, str]: | |
389 """Parse an URL leading to an AP endpoint | |
390 | |
391 @param url: URL to parse (schema is not mandatory) | |
392 @return: endpoint type and AP account | |
393 """ | |
394 path = parse.urlparse(url).path.lstrip("/") | |
395 type_, account = path[len(self.ap_path):].lstrip("/").split("/", 1) | |
396 return type_, parse.unquote(account) | |
397 | |
398 def buildAPURL(self, type_:str , *args: str) -> str: | |
399 """Build an AP endpoint URL | |
400 | |
401 @param type_: type of AP endpoing | |
402 @param arg: endpoint dependant arguments | |
403 """ | |
404 return parse.urljoin( | |
405 self.base_ap_url, | |
406 str(Path(type_).joinpath(*(parse.quote_plus(a) for a in args))) | |
407 ) | |
408 | |
409 async def signAndPost(self, url: str, url_actor: str, doc: dict) -> TReqResponse: | |
410 """Sign a documentent and post it to AP server | |
411 | |
412 @param url: AP server endpoint | |
413 @param url_actor: URL generated by this gateway for local actor | |
414 @param doc: document to send | |
415 """ | |
416 p_url = parse.urlparse(url) | |
417 date = http.datetimeToString().decode() | |
418 body = json.dumps(doc).encode() | |
419 digest_hash = base64.b64encode(hashlib.sha256(body).digest()).decode() | |
420 digest = f"sha-256={digest_hash}" | |
421 to_sign = ( | |
422 f"(request-target): post {p_url.path}\nhost: {p_url.hostname}\n" | |
423 f"date: {date}\ndigest: {digest}" | |
424 ) | |
425 signature = base64.b64encode(self.private_key.sign( | |
426 to_sign.encode(), | |
427 # we have to use PKCS1v15 padding to be compatible with Mastodon | |
428 padding.PKCS1v15(), | |
429 hashes.SHA256() | |
430 )).decode() | |
431 h_signature = ( | |
432 f'keyId="{url_actor}",headers="(request-target) host date digest",' | |
433 f'signature="{signature}"' | |
434 ) | |
435 return await treq.post( | |
436 url, | |
437 body, | |
438 headers={ | |
439 "Host": [p_url.hostname], | |
440 "Date": [date], | |
441 "Digest": [digest], | |
442 "Signature": [h_signature], | |
443 } | |
444 ) | |
445 | |
446 def _publishMessage(self, mess_data_s: str, service_s: str, profile: str): | |
447 mess_data: dict = data_format.deserialise(mess_data_s) # type: ignore | |
448 service = jid.JID(service_s) | |
449 client = self.host.getClient(profile) | |
450 return defer.ensureDeferred(self.publishMessage(client, mess_data, service)) | |
451 | |
452 async def getAPActorIdFromAccount(self, account: str) -> str: | |
453 """Retrieve account ID from it's handle using WebFinger | |
454 | |
455 @param account: AP handle (user@domain.tld) | |
456 @return: Actor ID (which is an URL) | |
457 """ | |
458 if account.count("@") != 1 or "/" in account: | |
459 raise ValueError("Invalid account: {account!r}") | |
460 host = account.split("@")[1] | |
461 try: | |
462 finger_data = await treq.json_content(await treq.get( | |
463 f"https://{host}/.well-known/webfinger?" | |
464 f"resource=acct:{parse.quote_plus(account)}", | |
465 )) | |
466 except Exception as e: | |
467 raise exceptions.DataError(f"Can't get webfinger data: {e}") | |
468 for link in finger_data.get("links", []): | |
469 if ( | |
470 link.get("type") == "application/activity+json" | |
471 and link.get("rel") == "self" | |
472 ): | |
473 href = link.get("href", "").strip() | |
474 if not href: | |
475 raise ValueError( | |
476 f"Invalid webfinger data for {account:r}: missing href" | |
477 ) | |
478 break | |
479 else: | |
480 raise ValueError( | |
481 f"No ActivityPub link found for {account!r}" | |
482 ) | |
483 return href | |
484 | |
485 async def getAPActorDataFromId(self, account: str) -> dict: | |
486 """Retrieve ActivityPub Actor data | |
487 | |
488 @param account: ActivityPub Actor identifier | |
489 """ | |
490 href = await self.getAPActorIdFromAccount(account) | |
491 return await self.apGet(href) | |
492 | |
493 @async_lru(maxsize=LRU_MAX_SIZE) | |
494 async def getAPAccountFromId(self, actor_id: str): | |
495 """Retrieve AP account from the ID URL | |
496 | |
497 @param actor_id: AP ID of the actor (URL to the actor data) | |
498 """ | |
499 url_parsed = parse.urlparse(actor_id) | |
500 actor_data = await self.apGet(actor_id) | |
501 username = actor_data.get("preferredUsername") | |
502 if not username: | |
503 raise exceptions.DataError( | |
504 'No "preferredUsername" field found, can\'t retrieve actor account' | |
505 ) | |
506 account = f"{username}@{url_parsed.hostname}" | |
507 # we try to retrieve the actor ID from the account to check it | |
508 found_id = await self.getAPActorIdFromAccount(account) | |
509 if found_id != actor_id: | |
510 # cf. https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196 | |
511 msg = ( | |
512 f"Account ID found on WebFinger {found_id!r} doesn't match our actor ID " | |
513 f"({actor_id!r}). This AP instance doesn't seems to use " | |
514 '"preferredUsername" as we expect.' | |
515 ) | |
516 log.warning(msg) | |
517 raise exceptions.DataError(msg) | |
518 return account | |
519 | |
520 async def getAPItems( | |
521 self, | |
522 account: str, | |
523 max_items: Optional[int] = None, | |
524 chronological_pagination: bool = True, | |
525 after_id: Optional[str] = None, | |
526 start_index: Optional[int] = None, | |
527 ) -> Tuple[List[domish.Element], rsm.RSMResponse]: | |
528 """Retrieve AP items and convert them to XMPP items | |
529 | |
530 @param account: AP account handle to get items from | |
531 @param max_items: maximum number of items to retrieve | |
532 retrieve all items by default | |
533 @param chronological_pagination: get pages in chronological order | |
534 AP use reversed chronological order for pagination, "first" page returns more | |
535 recent items. If "chronological_pagination" is True, "last" AP page will be | |
536 retrieved first. | |
537 @param after_id: if set, retrieve items starting from given ID | |
538 Due to ActivityStream Collection Paging limitations, this is inefficient and | |
539 if ``after_id`` is not already in cache, we have to retrieve every page until | |
540 we find it. | |
541 In most common cases, ``after_id`` should be in cache though (client usually | |
542 use known ID when in-order pagination is used). | |
543 @param start_index: start retrieving items from the one with given index | |
544 Due to ActivityStream Collection Paging limitations, this is inefficient and | |
545 all pages before the requested index will be retrieved to count items. | |
546 @return: XMPP Pubsub items and corresponding RSM Response | |
547 Items are always returned in chronological order in the result | |
548 """ | |
549 actor_data = await self.getAPActorDataFromId(account) | |
550 outbox = actor_data.get("outbox") | |
551 rsm_resp: Dict[str, Union[bool, int]] = {} | |
552 if not outbox: | |
553 raise exceptions.DataError(f"No outbox found for actor {account}") | |
554 outbox_data = await self.apGet(outbox) | |
555 try: | |
556 count = outbox_data["totalItems"] | |
557 except KeyError: | |
558 log.warning( | |
559 f'"totalItems" not found in outbox of {account}, defaulting to 20' | |
560 ) | |
561 count = 20 | |
562 else: | |
563 log.info(f"{account}'s outbox has {count} item(s)") | |
564 rsm_resp["count"] = count | |
565 | |
566 if start_index is not None: | |
567 assert chronological_pagination and after_id is None | |
568 if start_index >= count: | |
569 return [], rsm_resp | |
570 elif start_index == 0: | |
571 # this is the default behaviour | |
572 pass | |
573 elif start_index > 5000: | |
574 raise error.StanzaError( | |
575 "feature-not-implemented", | |
576 text="Maximum limit for previous_index has been reached, this limit" | |
577 "is set to avoid DoS" | |
578 ) | |
579 else: | |
580 # we'll convert "start_index" to "after_id", thus we need the item just | |
581 # before "start_index" | |
582 previous_index = start_index - 1 | |
583 retrieved_items = 0 | |
584 current_page = outbox_data["last"] | |
585 while retrieved_items < count: | |
586 page_data, items = await self.parseAPPage(current_page) | |
587 if not items: | |
588 log.warning(f"found an empty AP page at {current_page}") | |
589 return [], rsm_resp | |
590 page_start_idx = retrieved_items | |
591 retrieved_items += len(items) | |
592 if previous_index <= retrieved_items: | |
593 after_id = items[previous_index - page_start_idx]["id"] | |
594 break | |
595 try: | |
596 current_page = page_data["prev"] | |
597 except KeyError: | |
598 log.warning( | |
599 f"missing previous page link at {current_page}: {page_data!r}" | |
600 ) | |
601 raise error.StanzaError( | |
602 "service-unavailable", | |
603 "Error while retrieving previous page from AP service at " | |
604 f"{current_page}" | |
605 ) | |
606 | |
607 init_page = "last" if chronological_pagination else "first" | |
608 page = outbox_data.get(init_page) | |
609 if not page: | |
610 raise exceptions.DataError( | |
611 f"Initial page {init_page!r} not found for outbox {outbox}" | |
612 ) | |
613 items = [] | |
614 page_items = [] | |
615 retrieved_items = 0 | |
616 found_after_id = False | |
617 | |
618 while retrieved_items < count: | |
619 __, page_items = await self.parseAPPage(page) | |
620 retrieved_items += len(page_items) | |
621 if after_id is not None and not found_after_id: | |
622 # if we have an after_id, we ignore all items until the requested one is | |
623 # found | |
624 try: | |
625 limit_idx = [i["id"] for i in page_items].index(after_id) | |
626 except ValueError: | |
627 # if "after_id" is not found, we don't add any item from this page | |
628 log.debug(f"{after_id!r} not found at {page}, skipping") | |
629 else: | |
630 found_after_id = True | |
631 if chronological_pagination: | |
632 start_index = retrieved_items - len(page_items) + limit_idx + 1 | |
633 page_items = page_items[limit_idx+1:] | |
634 else: | |
635 start_index = count - (retrieved_items - len(page_items) + | |
636 limit_idx + 1) | |
637 page_items = page_items[:limit_idx] | |
638 items.extend(page_items) | |
639 else: | |
640 items.extend(page_items) | |
641 if max_items is not None and len(items) >= max_items: | |
642 if chronological_pagination: | |
643 items = items[:max_items] | |
644 else: | |
645 items = items[-max_items:] | |
646 break | |
647 page = outbox_data.get("prev" if chronological_pagination else "next") | |
648 if not page: | |
649 break | |
650 | |
651 if after_id is not None and not found_after_id: | |
652 raise error.StanzaError("item-not-found") | |
653 | |
654 if after_id is None: | |
655 rsm_resp["index"] = 0 if chronological_pagination else count - len(items) | |
656 | |
657 if start_index is not None: | |
658 rsm_resp["index"] = start_index | |
659 elif after_id is not None: | |
660 log.warning("Can't determine index of first element") | |
661 elif chronological_pagination: | |
662 rsm_resp["index"] = 0 | |
663 else: | |
664 rsm_resp["index"] = count - len(items) | |
665 if items: | |
666 rsm_resp.update({ | |
667 "first": items[0]["id"], | |
668 "last": items[-1]["id"] | |
669 }) | |
670 | |
671 return items, rsm.RSMResponse(**rsm_resp) | |
672 | |
673 async def parseAPPage(self, url: str) -> Tuple[dict, List[domish.Element]]: | |
674 """Convert AP objects from an AP page to XMPP items | |
675 | |
676 @param url: url linking and AP page | |
677 @return: page data, pubsub items | |
678 """ | |
679 page_data = await self.apGet(url) | |
680 ap_items = page_data.get("orderedItems") | |
681 if not ap_items: | |
682 log.warning('No "orderedItems" collection found') | |
683 return page_data, [] | |
684 items = [] | |
685 # AP Collections are in antichronological order, but we expect chronological in | |
686 # Pubsub, thus we reverse it | |
687 for ap_item in reversed(ap_items): | |
688 try: | |
689 ap_object, mb_data = await self.apItem2MBdata(ap_item) | |
690 except (exceptions.DataError, NotImplementedError, error.StanzaError): | |
691 continue | |
692 | |
693 item_elt = await self._m.data2entry( | |
694 self.client, mb_data, ap_object["id"], None, self._m.namespace | |
695 ) | |
696 item_elt["publisher"] = mb_data["author_jid"].full() | |
697 items.append(item_elt) | |
698 | |
699 return page_data, items | |
700 | |
701 async def apItem2MBdata(self, ap_item: dict) -> Tuple[dict, dict]: | |
702 """Convert AP item to microblog data | |
703 | |
704 @return: AP Item's Object and microblog data | |
705 @raise exceptions.DataError: something is invalid in the AP item | |
706 @raise NotImplemented: some AP data is not handled yet | |
707 @raise error.StanzaError: error while contacting the AP server | |
708 """ | |
709 ap_object = ap_item.get("object") | |
710 if not ap_object: | |
711 log.warning(f'No "object" found in AP item {ap_item!r}') | |
712 raise exceptions.DataError | |
713 if isinstance(ap_object, str): | |
714 ap_object = await self.apGet(ap_object) | |
715 obj_id = ap_object.get("id") | |
716 if not obj_id: | |
717 log.warning(f'No "id" found in AP object: {ap_object!r}') | |
718 raise exceptions.DataError | |
719 if ap_object.get("inReplyTo") is not None: | |
720 raise NotImplementedError | |
721 mb_data = {} | |
722 for ap_key, mb_key in AP_MB_MAP.items(): | |
723 data = ap_object.get(ap_key) | |
724 if data is None: | |
725 continue | |
726 mb_data[mb_key] = data | |
727 | |
728 # content | |
729 try: | |
730 language, content_xhtml = ap_object["contentMap"].popitem() | |
731 except (KeyError, AttributeError): | |
732 try: | |
733 mb_data["content_xhtml"] = mb_data["content"] | |
734 except KeyError: | |
735 log.warning(f"no content found:\n{ap_object!r}") | |
736 raise exceptions.DataError | |
737 else: | |
738 mb_data["language"] = language | |
739 mb_data["content_xhtml"] = content_xhtml | |
740 | |
741 # author | |
742 actor = ap_item.get("actor") | |
743 if not actor: | |
744 log.warning(f"no actor associated to object id {obj_id!r}") | |
745 raise exceptions.DataError | |
746 elif isinstance(actor, list): | |
747 # we only keep first item of list as author | |
748 # TODO: handle multiple actors | |
749 if len(actor) > 1: | |
750 log.warning("multiple actors are not managed") | |
751 actor = actor[0] | |
752 | |
753 if isinstance(actor, dict): | |
754 actor = actor.get("id") | |
755 if not actor: | |
756 log.warning(f"no actor id found: {actor!r}") | |
757 raise exceptions.DataError | |
758 | |
759 if isinstance(actor, str): | |
760 account = await self.getAPAccountFromId(actor) | |
761 mb_data["author"] = account.split("@", 1)[0] | |
762 author_jid = mb_data["author_jid"] = jid.JID( | |
763 None, | |
764 ( | |
765 self.host.plugins["XEP-0106"].escape(account), | |
766 self.client.jid.host, | |
767 None | |
768 ) | |
769 ) | |
770 else: | |
771 log.warning(f"unknown actor type found: {actor!r}") | |
772 raise exceptions.DataError | |
773 | |
774 # published/updated | |
775 for field in ("published", "updated"): | |
776 value = ap_object.get(field) | |
777 if not value and field == "updated": | |
778 value = ap_object.get("published") | |
779 if value: | |
780 try: | |
781 mb_data[field] = calendar.timegm( | |
782 dateutil.parser.parse(str(value)).utctimetuple() | |
783 ) | |
784 except dateutil.parser.ParserError as e: | |
785 log.warning(f"Can't parse {field!r} field: {e}") | |
786 return ap_object, mb_data | |
787 | |
788 async def mbdata2APitem(self, client: SatXMPPEntity, mb_data: dict) -> dict: | |
789 """Convert Libervia Microblog Data to ActivityPub item""" | |
790 if not mb_data.get("id"): | |
791 mb_data["id"] = shortuuid.uuid() | |
792 if not mb_data.get("author_jid"): | |
793 mb_data["author_jid"] = client.jid | |
794 ap_account = await self.getAPAccountFromJidAndNode( | |
795 jid.JID(mb_data["author_jid"]), | |
796 None | |
797 ) | |
798 url_actor = self.buildAPURL(TYPE_ACTOR, ap_account) | |
799 url_item = self.buildAPURL(TYPE_ITEM, ap_account, mb_data["id"]) | |
800 return { | |
801 "@context": "https://www.w3.org/ns/activitystreams", | |
802 "id": url_item, | |
803 "type": "Create", | |
804 "actor": url_actor, | |
805 | |
806 "object": { | |
807 "id": url_item, | |
808 "type": "Note", | |
809 "published": utils.xmpp_date(mb_data["published"]), | |
810 "attributedTo": url_actor, | |
811 "content": mb_data.get("content_xhtml") or mb_data["content"], | |
812 "to": "https://www.w3.org/ns/activitystreams#Public" | |
813 } | |
814 } | |
815 | |
816 async def publishMessage( | |
817 self, | |
818 client: SatXMPPEntity, | |
819 mess_data: dict, | |
820 service: jid.JID | |
821 ) -> None: | |
822 """Send an AP message | |
823 | |
824 .. note:: | |
825 | |
826 This is a temporary method used for development only | |
827 | |
828 @param mess_data: message data. Following keys must be set: | |
829 | |
830 ``node`` | |
831 identifier of message which is being replied (this will | |
832 correspond to pubsub node in the future) | |
833 | |
834 ``content_xhtml`` or ``content`` | |
835 message body (respectively in XHTML or plain text) | |
836 | |
837 @param service: JID corresponding to the AP actor. | |
838 """ | |
839 if not service.user: | |
840 raise ValueError("service must have a local part") | |
841 account = self.host.plugins["XEP-0106"].unescape(service.user) | |
842 ap_actor_data = await self.getAPActorDataFromId(account) | |
843 | |
844 try: | |
845 inbox_url = ap_actor_data["endpoints"]["sharedInbox"] | |
846 except KeyError: | |
847 raise exceptions.DataError("Can't get ActivityPub actor inbox") | |
848 | |
849 item_data = await self.mbdata2APitem(client, mess_data) | |
850 url_actor = item_data["object"]["attributedTo"] | |
851 resp = await self.signAndPost(inbox_url, url_actor, item_data) | |
852 if resp.code != 202: | |
853 raise exceptions.NetworkError(f"unexpected return code: {resp.code}") |