Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_comp_ap_gateway/__init__.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/plugins/plugin_comp_ap_gateway/__init__.py@c23cad65ae99 |
children | c3b68fdc2de7 |
comparison
equal
deleted
inserted
replaced
4070:d10748475025 | 4071:4b842c1fb686 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # Libervia ActivityPub Gateway | |
4 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) | |
5 | |
6 # This program is free software: you can redistribute it and/or modify | |
7 # it under the terms of the GNU Affero General Public License as published by | |
8 # the Free Software Foundation, either version 3 of the License, or | |
9 # (at your option) any later version. | |
10 | |
11 # This program is distributed in the hope that it will be useful, | |
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 # GNU Affero General Public License for more details. | |
15 | |
16 # You should have received a copy of the GNU Affero General Public License | |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
18 | |
19 import base64 | |
20 import calendar | |
21 import hashlib | |
22 import json | |
23 from pathlib import Path | |
24 from pprint import pformat | |
25 import re | |
26 from typing import ( | |
27 Any, | |
28 Awaitable, | |
29 Callable, | |
30 Dict, | |
31 List, | |
32 Optional, | |
33 Set, | |
34 Tuple, | |
35 Union, | |
36 overload, | |
37 ) | |
38 from urllib import parse | |
39 | |
40 from cryptography.exceptions import InvalidSignature | |
41 from cryptography.hazmat.primitives import serialization | |
42 from cryptography.hazmat.primitives import hashes | |
43 from cryptography.hazmat.primitives.asymmetric import rsa | |
44 from cryptography.hazmat.primitives.asymmetric import padding | |
45 import dateutil | |
46 from dateutil.parser import parserinfo | |
47 import shortuuid | |
48 from sqlalchemy.exc import IntegrityError | |
49 import treq | |
50 from treq.response import _Response as TReqResponse | |
51 from twisted.internet import defer, reactor, threads | |
52 from twisted.web import http | |
53 from twisted.words.protocols.jabber import error, jid | |
54 from twisted.words.xish import domish | |
55 from wokkel import pubsub, rsm | |
56 | |
57 from libervia.backend.core import exceptions | |
58 from libervia.backend.core.constants import Const as C | |
59 from libervia.backend.core.core_types import SatXMPPEntity | |
60 from libervia.backend.core.i18n import _ | |
61 from libervia.backend.core.log import getLogger | |
62 from libervia.backend.memory import persistent | |
63 from libervia.backend.memory.sqla_mapping import History, SubscriptionState | |
64 from libervia.backend.tools import utils | |
65 from libervia.backend.tools.common import data_format, date_utils, tls, uri | |
66 from libervia.backend.tools.common.async_utils import async_lru | |
67 | |
68 from .ad_hoc import APAdHocService | |
69 from .events import APEvents | |
70 from .constants import ( | |
71 ACTIVITY_OBJECT_MANDATORY, | |
72 ACTIVITY_TARGET_MANDATORY, | |
73 ACTIVITY_TYPES, | |
74 ACTIVITY_TYPES_LOWER, | |
75 COMMENTS_MAX_PARENTS, | |
76 CONF_SECTION, | |
77 IMPORT_NAME, | |
78 LRU_MAX_SIZE, | |
79 MEDIA_TYPE_AP, | |
80 NS_AP, | |
81 NS_AP_PUBLIC, | |
82 PUBLIC_TUPLE, | |
83 TYPE_ACTOR, | |
84 TYPE_EVENT, | |
85 TYPE_FOLLOWERS, | |
86 TYPE_ITEM, | |
87 TYPE_LIKE, | |
88 TYPE_MENTION, | |
89 TYPE_REACTION, | |
90 TYPE_TOMBSTONE, | |
91 TYPE_JOIN, | |
92 TYPE_LEAVE | |
93 ) | |
94 from .http_server import HTTPServer | |
95 from .pubsub_service import APPubsubService | |
96 from .regex import RE_MENTION | |
97 | |
98 | |
log = getLogger(__name__)

# NOTE(review): this shadows the IMPORT_NAME imported from .constants above —
# presumably both hold the same value; confirm and keep a single definition
IMPORT_NAME = "ap-gateway"

# plugin metadata used by the Libervia plugin loader
PLUGIN_INFO = {
    C.PI_NAME: "ActivityPub Gateway component",
    C.PI_IMPORT_NAME: IMPORT_NAME,
    C.PI_MODES: [C.PLUG_MODE_COMPONENT],
    C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: [
        "XEP-0050", "XEP-0054", "XEP-0060", "XEP-0084", "XEP-0106", "XEP-0277",
        "XEP-0292", "XEP-0329", "XEP-0372", "XEP-0424", "XEP-0465", "XEP-0470",
        "XEP-0447", "XEP-0471", "PUBSUB_CACHE", "TEXT_SYNTAXES", "IDENTITY"
    ],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "APGateway",
    C.PI_HANDLER: C.BOOL_TRUE,
    C.PI_DESCRIPTION: _(
        "Gateway for bidirectional communication between XMPP and ActivityPub."
    ),
}
121 | |
# Hexadecimal escape pair as used by period/percent encoding (see
# APGateway.period_encode). FIX: the character class previously used the
# range "A-f", which also matches "[", "\", "]", "^", "_", "`" and G-Z;
# hex digits are only 0-9, a-f and A-F.
HEXA_ENC = r"(?P<hex>[0-9a-fA-F]{2})"
# one period-encoded byte, e.g. ".2e"
RE_PERIOD_ENC = re.compile(f"\\.{HEXA_ENC}")
# one percent-encoded byte, e.g. "%2e"
RE_PERCENT_ENC = re.compile(f"%{HEXA_ENC}")
# characters which may appear unencoded in an AP account local part
RE_ALLOWED_UNQUOTED = re.compile(r"^[a-zA-Z0-9_-]+$")
126 | |
127 | |
class APGateway:
    """Component bridging XMPP and ActivityPub in both directions"""
    IMPORT_NAME = IMPORT_NAME
    # show data send or received through HTTP, used for debugging
    # 1: log POST objects
    # 2: log POST and GET objects
    # 3: log POST and GET objects with HTTP headers for GET requests
    verbose = 0
135 | |
    def __init__(self, host):
        """Resolve plugin dependencies and register callbacks/triggers.

        The heavy setup (keys, config, HTTP server) is done later in [init],
        when the profile connects.
        @param host: Libervia backend host instance
        """
        self.host = host
        self.initialised = False
        self.client = None
        # shortcuts to sibling plugins
        self._p = host.plugins["XEP-0060"]  # Publish-Subscribe
        self._a = host.plugins["XEP-0084"]  # User Avatar
        self._e = host.plugins["XEP-0106"]  # JID Escaping
        self._m = host.plugins["XEP-0277"]  # Microblogging
        self._v = host.plugins["XEP-0292"]  # vCard4 Over XMPP
        self._refs = host.plugins["XEP-0372"]  # References
        self._r = host.plugins["XEP-0424"]  # Message Retraction
        self._sfs = host.plugins["XEP-0447"]  # Stateless File Sharing
        self._pps = host.plugins["XEP-0465"]  # Pubsub Public Subscriptions
        self._pa = host.plugins["XEP-0470"]  # Pubsub Attachments
        self._c = host.plugins["PUBSUB_CACHE"]
        self._t = host.plugins["TEXT_SYNTAXES"]
        self._i = host.plugins["IDENTITY"]
        self._events = host.plugins["XEP-0471"]  # Events
        self._p.add_managed_node(
            "",
            items_cb=self._items_received,
            # we want to be sure that the callbacks are launched before pubsub cache's
            # one, as we need to inspect items before they are actually removed from cache
            # or updated
            priority=1000
        )
        self.pubsub_service = APPubsubService(self)
        self.ad_hoc = APAdHocService(self)
        self.ap_events = APEvents(self)
        # low priority so other triggers can process the message first
        host.trigger.add("message_received", self._message_received_trigger, priority=-1000)
        host.trigger.add("XEP-0424_retractReceived", self._on_message_retract)
        host.trigger.add("XEP-0372_ref_received", self._on_reference_received)

        # bridge method to send an AP request from the frontend
        host.bridge.add_method(
            "ap_send",
            ".plugin",
            in_sign="sss",
            out_sign="",
            method=self._publish_message,
            async_=True,
        )
177 | |
178 def get_handler(self, __): | |
179 return self.pubsub_service | |
180 | |
181 async def init(self, client): | |
182 if self.initialised: | |
183 return | |
184 | |
185 self.initialised = True | |
186 log.info(_("ActivityPub Gateway initialization")) | |
187 | |
188 # RSA keys | |
189 stored_data = await self.host.memory.storage.get_privates( | |
190 IMPORT_NAME, ["rsa_key"], profile=client.profile | |
191 ) | |
192 private_key_pem = stored_data.get("rsa_key") | |
193 if private_key_pem is None: | |
194 self.private_key = await threads.deferToThread( | |
195 rsa.generate_private_key, | |
196 public_exponent=65537, | |
197 key_size=4096, | |
198 ) | |
199 private_key_pem = self.private_key.private_bytes( | |
200 encoding=serialization.Encoding.PEM, | |
201 format=serialization.PrivateFormat.PKCS8, | |
202 encryption_algorithm=serialization.NoEncryption() | |
203 ).decode() | |
204 await self.host.memory.storage.set_private_value( | |
205 IMPORT_NAME, "rsa_key", private_key_pem, profile=client.profile | |
206 ) | |
207 else: | |
208 self.private_key = serialization.load_pem_private_key( | |
209 private_key_pem.encode(), | |
210 password=None, | |
211 ) | |
212 self.public_key = self.private_key.public_key() | |
213 self.public_key_pem = self.public_key.public_bytes( | |
214 encoding=serialization.Encoding.PEM, | |
215 format=serialization.PublicFormat.SubjectPublicKeyInfo | |
216 ).decode() | |
217 | |
218 # params | |
219 # URL and port | |
220 self.public_url = self.host.memory.config_get( | |
221 CONF_SECTION, "public_url" | |
222 ) or self.host.memory.config_get( | |
223 CONF_SECTION, "xmpp_domain" | |
224 ) | |
225 if self.public_url is None: | |
226 log.error( | |
227 '"public_url" not set in configuration, this is mandatory to have' | |
228 "ActivityPub Gateway running. Please set this option it to public facing " | |
229 f"url in {CONF_SECTION!r} configuration section." | |
230 ) | |
231 return | |
232 if parse.urlparse(self.public_url).scheme: | |
233 log.error( | |
234 "Scheme must not be specified in \"public_url\", please remove it from " | |
235 "\"public_url\" configuration option. ActivityPub Gateway won't be run." | |
236 ) | |
237 return | |
238 self.http_port = int(self.host.memory.config_get( | |
239 CONF_SECTION, 'http_port', 8123)) | |
240 connection_type = self.host.memory.config_get( | |
241 CONF_SECTION, 'http_connection_type', 'https') | |
242 if connection_type not in ('http', 'https'): | |
243 raise exceptions.ConfigError( | |
244 'bad ap-gateay http_connection_type, you must use one of "http" or ' | |
245 '"https"' | |
246 ) | |
247 self.max_items = int(self.host.memory.config_get( | |
248 CONF_SECTION, 'new_node_max_items', 50 | |
249 | |
250 )) | |
251 self.comments_max_depth = int(self.host.memory.config_get( | |
252 CONF_SECTION, 'comments_max_depth', 0 | |
253 )) | |
254 self.ap_path = self.host.memory.config_get(CONF_SECTION, 'ap_path', '_ap') | |
255 self.base_ap_url = parse.urljoin(f"https://{self.public_url}", f"{self.ap_path}/") | |
256 # True (default) if we provide gateway only to entities/services from our server | |
257 self.local_only = C.bool( | |
258 self.host.memory.config_get(CONF_SECTION, 'local_only', C.BOOL_TRUE) | |
259 ) | |
260 # if True (default), mention will be parsed in non-private content coming from | |
261 # XMPP. This is necessary as XEP-0372 are coming separately from item where the | |
262 # mention is done, which is hard to impossible to translate to ActivityPub (where | |
263 # mention specified inside the item directly). See documentation for details. | |
264 self.auto_mentions = C.bool( | |
265 self.host.memory.config_get(CONF_SECTION, "auto_mentions", C.BOOL_TRUE) | |
266 ) | |
267 | |
268 html_redirect: Dict[str, Union[str, dict]] = self.host.memory.config_get( | |
269 CONF_SECTION, 'html_redirect_dict', {} | |
270 ) | |
271 self.html_redirect: Dict[str, List[dict]] = {} | |
272 for url_type, target in html_redirect.items(): | |
273 if isinstance(target, str): | |
274 target = {"url": target} | |
275 elif not isinstance(target, dict): | |
276 raise exceptions.ConfigError( | |
277 f"html_redirect target must be a URL or a dict, not {target!r}" | |
278 ) | |
279 filters = target.setdefault("filters", {}) | |
280 if "url" not in target: | |
281 log.warning(f"invalid HTML redirection, missing target URL: {target}") | |
282 continue | |
283 # a slash in the url_type is a syntactic shortcut to have a node filter | |
284 if "/" in url_type: | |
285 url_type, node_filter = url_type.split("/", 1) | |
286 filters["node"] = node_filter | |
287 self.html_redirect.setdefault(url_type, []).append(target) | |
288 | |
289 # HTTP server launch | |
290 self.server = HTTPServer(self) | |
291 if connection_type == 'http': | |
292 reactor.listenTCP(self.http_port, self.server) | |
293 else: | |
294 options = tls.get_options_from_config( | |
295 self.host.memory.config, CONF_SECTION) | |
296 tls.tls_options_check(options) | |
297 context_factory = tls.get_tls_context_factory(options) | |
298 reactor.listenSSL(self.http_port, self.server, context_factory) | |
299 | |
    async def profile_connecting(self, client):
        """Attach the gateway to its profile while it is connecting

        Keeps a reference to the client, attaches the persistent AP storage
        and runs [init] (idempotent).
        """
        self.client = client
        # NOTE(review): presumably requests message history to be delivered to this
        # component on connection — confirm against the component base class
        client.sendHistory = True
        # lazy persistent storage dedicated to AP data for this profile
        client._ap_storage = persistent.LazyPersistentBinaryDict(
            IMPORT_NAME,
            client.profile
        )
        await self.init(client)
308 | |
309 def profile_connected(self, client): | |
310 self.ad_hoc.init(client) | |
311 | |
    async def _items_received(
        self,
        client: SatXMPPEntity,
        itemsEvent: pubsub.ItemsEvent
    ) -> None:
        """Callback called when pubsub items are received

        if the items are addressed to a JID corresponding to an AP actor, they are
        converted to AP items and sent to the corresponding AP server.

        If comments nodes are linked, they are automatically subscribed to get new items
        from there too.
        """
        # only handle events targeted at our own component
        if client != self.client:
            return
        # we need recipient as JID and not gateway own JID to be able to use methods such
        # as "subscribe"
        client = self.client.get_virtual_client(itemsEvent.sender)
        recipient = itemsEvent.recipient
        if not recipient.user:
            log.debug("ignoring items event without local part specified")
            return

        # the local part is the escaped AP account handle
        ap_account = self._e.unescape(recipient.user)

        if self._pa.is_attachment_node(itemsEvent.nodeIdentifier):
            # attachments (likes, reactions…) use a dedicated conversion path
            await self.convert_and_post_attachments(
                client, ap_account, itemsEvent.sender, itemsEvent.nodeIdentifier,
                itemsEvent.items
            )
        else:
            await self.convert_and_post_items(
                client, ap_account, itemsEvent.sender, itemsEvent.nodeIdentifier,
                itemsEvent.items
            )
347 | |
348 async def get_virtual_client(self, actor_id: str) -> SatXMPPEntity: | |
349 """Get client for this component with a specified jid | |
350 | |
351 This is needed to perform operations with the virtual JID corresponding to the AP | |
352 actor instead of the JID of the gateway itself. | |
353 @param actor_id: ID of the actor | |
354 @return: virtual client | |
355 """ | |
356 local_jid = await self.get_jid_from_id(actor_id) | |
357 return self.client.get_virtual_client(local_jid) | |
358 | |
359 def is_activity(self, data: dict) -> bool: | |
360 """Return True if the data has an activity type""" | |
361 try: | |
362 return (data.get("type") or "").lower() in ACTIVITY_TYPES_LOWER | |
363 except (KeyError, TypeError): | |
364 return False | |
365 | |
366 async def ap_get(self, url: str) -> dict: | |
367 """Retrieve AP JSON from given URL | |
368 | |
369 @raise error.StanzaError: "service-unavailable" is sent when something went wrong | |
370 with AP server | |
371 """ | |
372 resp = await treq.get( | |
373 url, | |
374 headers = { | |
375 "Accept": [MEDIA_TYPE_AP], | |
376 "Content-Type": [MEDIA_TYPE_AP], | |
377 } | |
378 ) | |
379 if resp.code >= 300: | |
380 text = await resp.text() | |
381 if resp.code == 404: | |
382 raise exceptions.NotFound(f"Can't find resource at {url}") | |
383 else: | |
384 msg = f"HTTP error {resp.code} (url: {url}): {text}" | |
385 raise exceptions.ExternalRequestError(msg) | |
386 try: | |
387 return await treq.json_content(resp) | |
388 except Exception as e: | |
389 raise error.StanzaError( | |
390 "service-unavailable", | |
391 text=f"Can't get AP data at {url}: {e}" | |
392 ) | |
393 | |
    # typing overloads: with an explicit ``key``, ``data`` must be a dict and the
    # result may be None; without ``key``, ``data`` is the object itself (or its URL)
    # and a dict is always returned
    @overload
    async def ap_get_object(self, data: dict, key: str) -> Optional[dict]:
        ...

    @overload
    async def ap_get_object(
        self, data: Union[str, dict], key: None = None
    ) -> dict:
        ...
403 | |
404 async def ap_get_object(self, data, key = None): | |
405 """Retrieve an AP object, dereferencing when necessary | |
406 | |
407 This method is to be used with attributes marked as "Functional" in | |
408 https://www.w3.org/TR/activitystreams-vocabulary | |
409 @param data: AP object where an other object is looked for, or the object itself | |
410 @param key: name of the object to look for, or None if data is the object directly | |
411 @return: found object if any | |
412 """ | |
413 if key is not None: | |
414 value = data.get(key) | |
415 else: | |
416 value = data | |
417 if value is None: | |
418 if key is None: | |
419 raise ValueError("None can't be used with ap_get_object is key is None") | |
420 return None | |
421 elif isinstance(value, dict): | |
422 return value | |
423 elif isinstance(value, str): | |
424 if self.is_local_url(value): | |
425 return await self.ap_get_local_object(value) | |
426 else: | |
427 return await self.ap_get(value) | |
428 else: | |
429 raise NotImplementedError( | |
430 "was expecting a string or a dict, got {type(value)}: {value!r}}" | |
431 ) | |
432 | |
    async def ap_get_local_object(
        self,
        url: str
    ) -> dict:
        """Retrieve or generate local object

        for now, only handle XMPP items to convert to AP
        @param url: local AP URL (must be an "item" endpoint)
        @return: AP object
        @raise ValueError: the URL is malformed
        @raise exceptions.NotFound: no item exists at this URL
        @raise NotImplementedError: the URL doesn't point to an item
        """
        url_type, url_args = self.parse_apurl(url)
        if url_type == TYPE_ITEM:
            try:
                account, item_id = url_args
            except ValueError:
                raise ValueError(f"invalid URL: {url}")
            author_jid, node = await self.get_jid_and_node(account)
            if node is None:
                # no explicit node: default to the microblog namespace
                node = self._m.namespace
            # look for the item in the pubsub cache first
            cached_node = await self.host.memory.storage.get_pubsub_node(
                self.client, author_jid, node
            )
            if not cached_node:
                log.debug(f"node {node!r} at {author_jid} is not found in cache")
                found_item = None
            else:
                cached_items, __ = await self.host.memory.storage.get_items(
                    cached_node, item_ids=[item_id]
                )
                if not cached_items:
                    log.debug(
                        f"item {item_id!r} of {node!r} at {author_jid} is not found in "
                        "cache"
                    )
                    found_item = None
                else:
                    found_item = cached_items[0].data

            if found_item is None:
                # the node is not in cache, we have to make a request to retrieve the item
                # If the node doesn't exist, get_items will raise a NotFound exception
                found_items, __ = await self._p.get_items(
                    self.client, author_jid, node, item_ids=[item_id]
                )
                try:
                    found_item = found_items[0]
                except IndexError:
                    raise exceptions.NotFound(f"requested item at {url} can't be found")

            if node.startswith(self._events.namespace):
                # this is an event
                event_data = self._events.event_elt_2_event_data(found_item)
                ap_item = await self.ap_events.event_data_2_ap_item(
                    event_data, author_jid
                )
                # the URL must return the object and not the activity
                ap_item["object"]["@context"] = ap_item["@context"]
                return ap_item["object"]
            else:
                # this is a blog item
                mb_data = await self._m.item_2_mb_data(
                    self.client, found_item, author_jid, node
                )
                ap_item = await self.mb_data_2_ap_item(self.client, mb_data)
                # the URL must return the object and not the activity
                return ap_item["object"]
        else:
            raise NotImplementedError(
                'only object from "item" URLs can be retrieved for now'
            )
501 | |
502 async def ap_get_list( | |
503 self, | |
504 data: dict, | |
505 key: str, | |
506 only_ids: bool = False | |
507 ) -> Optional[List[Dict[str, Any]]]: | |
508 """Retrieve a list of objects from AP data, dereferencing when necessary | |
509 | |
510 This method is to be used with non functional vocabularies. Use ``ap_get_object`` | |
511 otherwise. | |
512 If the value is a dictionary, it will be wrapped in a list | |
513 @param data: AP object where a list of objects is looked for | |
514 @param key: key of the list to look for | |
515 @param only_ids: if Trye, only items IDs are retrieved | |
516 @return: list of objects, or None if the key is not present | |
517 """ | |
518 value = data.get(key) | |
519 if value is None: | |
520 return None | |
521 elif isinstance(value, str): | |
522 if self.is_local_url(value): | |
523 value = await self.ap_get_local_object(value) | |
524 else: | |
525 value = await self.ap_get(value) | |
526 if isinstance(value, dict): | |
527 return [value] | |
528 if not isinstance(value, list): | |
529 raise ValueError(f"A list was expected, got {type(value)}: {value!r}") | |
530 if only_ids: | |
531 return [ | |
532 {"id": v["id"]} if isinstance(v, dict) else {"id": v} | |
533 for v in value | |
534 ] | |
535 else: | |
536 return [await self.ap_get_object(i) for i in value] | |
537 | |
538 async def ap_get_actors( | |
539 self, | |
540 data: dict, | |
541 key: str, | |
542 as_account: bool = True | |
543 ) -> List[str]: | |
544 """Retrieve AP actors from data | |
545 | |
546 @param data: AP object containing a field with actors | |
547 @param key: field to use to retrieve actors | |
548 @param as_account: if True returns account handles, otherwise will return actor | |
549 IDs | |
550 @raise exceptions.DataError: there is not actor data or it is invalid | |
551 """ | |
552 value = data.get(key) | |
553 if value is None: | |
554 raise exceptions.DataError( | |
555 f"no actor associated to object {data.get('id')!r}" | |
556 ) | |
557 elif isinstance(value, dict): | |
558 actor_id = value.get("id") | |
559 if actor_id is None: | |
560 raise exceptions.DataError( | |
561 f"invalid actor associated to object {data.get('id')!r}: {value!r}" | |
562 ) | |
563 value = [actor_id] | |
564 elif isinstance(value, str): | |
565 value = [value] | |
566 elif isinstance(value, list): | |
567 try: | |
568 value = [a if isinstance(a, str) else a["id"] for a in value] | |
569 except (TypeError, KeyError): | |
570 raise exceptions.DataError( | |
571 f"invalid actors list to object {data.get('id')!r}: {value!r}" | |
572 ) | |
573 if not value: | |
574 raise exceptions.DataError( | |
575 f"list of actors is empty" | |
576 ) | |
577 if as_account: | |
578 return [await self.get_ap_account_from_id(actor_id) for actor_id in value] | |
579 else: | |
580 return value | |
581 | |
582 async def ap_get_sender_actor( | |
583 self, | |
584 data: dict, | |
585 ) -> str: | |
586 """Retrieve actor who sent data | |
587 | |
588 This is done by checking "actor" field first, then "attributedTo" field. | |
589 Only the first found actor is taken into account | |
590 @param data: AP object | |
591 @return: actor id of the sender | |
592 @raise exceptions.NotFound: no actor has been found in data | |
593 """ | |
594 try: | |
595 actors = await self.ap_get_actors(data, "actor", as_account=False) | |
596 except exceptions.DataError: | |
597 actors = None | |
598 if not actors: | |
599 try: | |
600 actors = await self.ap_get_actors(data, "attributedTo", as_account=False) | |
601 except exceptions.DataError: | |
602 raise exceptions.NotFound( | |
603 'actor not specified in "actor" or "attributedTo"' | |
604 ) | |
605 try: | |
606 return actors[0] | |
607 except IndexError: | |
608 raise exceptions.NotFound("list of actors is empty") | |
609 | |
610 def must_encode(self, text: str) -> bool: | |
611 """Indicate if a text must be period encoded""" | |
612 return ( | |
613 not RE_ALLOWED_UNQUOTED.match(text) | |
614 or text.startswith("___") | |
615 or "---" in text | |
616 ) | |
617 | |
618 def period_encode(self, text: str) -> str: | |
619 """Period encode a text | |
620 | |
621 see [get_jid_and_node] for reasons of period encoding | |
622 """ | |
623 return ( | |
624 parse.quote(text, safe="") | |
625 .replace("---", "%2d%2d%2d") | |
626 .replace("___", "%5f%5f%5f") | |
627 .replace(".", "%2e") | |
628 .replace("~", "%7e") | |
629 .replace("%", ".") | |
630 ) | |
631 | |
    async def get_ap_account_from_jid_and_node(
        self,
        jid_: jid.JID,
        node: Optional[str]
    ) -> str:
        """Construct AP account from JID and node

        The account construction will use escaping when necessary
        @param jid_: JID of the entity to map
        @param node: pubsub node, or None for the default microblog node
        @return: AP account handle (``user@domain``)
        @raise exceptions.InternalError: client not set yet, or no user part available
        """
        if not node or node == self._m.namespace:
            # the default microblog node stays implicit
            node = None

        if self.client is None:
            raise exceptions.InternalError("Client is not set yet")

        if self.is_virtual_jid(jid_):
            # this is an proxy JID to an AP Actor
            return self._e.unescape(jid_.user)

        if node and not jid_.user and not self.must_encode(node):
            is_pubsub = await self.is_pubsub(jid_)
            # when we have a pubsub service, the user part can be used to set the node
            # this produces more user-friendly AP accounts
            if is_pubsub:
                jid_.user = node
                node = None

        is_local = self.is_local(jid_)
        user = jid_.user if is_local else jid_.userhost()
        if user is None:
            user = ""
        account_elts = []
        # NOTE: "and" binds tighter than "or", so this reads
        # (node and must_encode(node)) or must_encode(user)
        if node and self.must_encode(node) or self.must_encode(user):
            # a leading "___" marks a period-encoded account
            account_elts = ["___"]
            if node:
                node = self.period_encode(node)
            user = self.period_encode(user)

        if not user:
            raise exceptions.InternalError("there should be a user part")

        if node:
            # the node is separated from the user part by "---"
            account_elts.extend((node, "---"))

        account_elts.extend((
            user, "@", jid_.host if is_local else self.client.jid.userhost()
        ))
        return "".join(account_elts)
680 | |
681 def is_local(self, jid_: jid.JID) -> bool: | |
682 """Returns True if jid_ use a domain or subdomain of gateway's host""" | |
683 local_host = self.client.host.split(".") | |
684 assert local_host | |
685 return jid_.host.split(".")[-len(local_host):] == local_host | |
686 | |
687 async def is_pubsub(self, jid_: jid.JID) -> bool: | |
688 """Indicate if a JID is a Pubsub service""" | |
689 host_disco = await self.host.get_disco_infos(self.client, jid_) | |
690 return ( | |
691 ("pubsub", "service") in host_disco.identities | |
692 and not ("pubsub", "pep") in host_disco.identities | |
693 ) | |
694 | |
    async def get_jid_and_node(self, ap_account: str) -> Tuple[jid.JID, Optional[str]]:
        """Decode raw AP account handle to get XMPP JID and Pubsub Node

        Username are case insensitive.

        By default, the username correspond to local username (i.e. username from
        component's server).

        If local name's domain is a pubsub service (and not PEP), the username is taken as
        a pubsub node.

        If ``---`` is present in username, the part before is used as pubsub node, and the
        rest as a JID user part.

        If username starts with ``___``, characters are encoded using period encoding
        (i.e. percent encoding where a ``.`` is used instead of ``%``).

        This horror is necessary due to limitation in some AP implementation (notably
        Mastodon), cf. https://github.com/mastodon/mastodon/issues/17222

        examples:

        ``toto@example.org`` => JID = toto@example.org, node = None

        ``___toto.40example.net@example.org`` => JID = toto@example.net (this one is a
        non-local JID, and will work only if setting ``local_only`` is False), node = None

        ``toto@pubsub.example.org`` (with pubsub.example.org being a pubsub service) =>
        JID = pubsub.example.org, node = toto

        ``tata---toto@example.org`` => JID = toto@example.org, node = tata

        ``___urn.3axmpp.3amicroblog.3a0@pubsub.example.org`` (with pubsub.example.org
        being a pubsub service) ==> JID = pubsub.example.org, node = urn:xmpp:microblog:0

        @param ap_account: ActivityPub account handle (``username@domain.tld``)
        @return: service JID and pubsub node
            if pubsub node is None, default microblog pubsub node (and possibly other
            nodes that plugins may handle) will be used
        @raise ValueError: invalid account
        @raise PermissionError: non local jid is used when gateway doesn't allow them
        """
        if ap_account.count("@") != 1:
            raise ValueError("Invalid AP account")
        if ap_account.startswith("___"):
            # leading "___" marks a period-encoded account (see docstring)
            encoded = True
            ap_account = ap_account[3:]
        else:
            encoded = False

        username, domain = ap_account.split("@")

        if "---" in username:
            # the pubsub node is given before the "---" separator
            node, username = username.rsplit("---", 1)
        else:
            node = None

        if encoded:
            # turn period encoding back into percent encoding, then unquote
            username = parse.unquote(
                RE_PERIOD_ENC.sub(r"%\g<hex>", username),
                errors="strict"
            )
            if node:
                node = parse.unquote(
                    RE_PERIOD_ENC.sub(r"%\g<hex>", node),
                    errors="strict"
                )

        if "@" in username:
            # a full JID was encoded in the username (non-local entity)
            username, domain = username.rsplit("@", 1)

        if not node:
            # we need to check host disco, because disco request to user may be
            # blocked for privacy reason (see
            # https://xmpp.org/extensions/xep-0030.html#security)
            is_pubsub = await self.is_pubsub(jid.JID(domain))

            if is_pubsub:
                # if the host is a pubsub service and not a PEP, we consider that username
                # is in fact the node name
                node = username
                username = None

        jid_s = f"{username}@{domain}" if username else domain
        try:
            jid_ = jid.JID(jid_s)
        except RuntimeError:
            raise ValueError(f"Invalid jid: {jid_s!r}")

        if self.local_only and not self.is_local(jid_):
            raise exceptions.PermissionError(
                "This gateway is configured to map only local entities and services"
            )

        return jid_, node
790 | |
791 def get_local_jid_from_account(self, account: str) -> jid.JID: | |
792 """Compute JID linking to an AP account | |
793 | |
794 The local jid is computer by escaping AP actor handle and using it as local part | |
795 of JID, where domain part is this gateway own JID | |
796 """ | |
797 return jid.JID( | |
798 None, | |
799 ( | |
800 self._e.escape(account), | |
801 self.client.jid.host, | |
802 None | |
803 ) | |
804 ) | |
805 | |
    async def get_jid_from_id(self, actor_id: str) -> jid.JID:
        """Compute JID linking to an AP Actor ID

        The local jid is computed by escaping AP actor handle and using it as local part
        of JID, where domain part is this gateway own JID
        If the actor_id comes from local server (checked with self.public_url), it means
        that we have an XMPP entity, and the original JID is returned
        @param actor_id: AP actor ID (URL)
        @return: JID to use to reach this actor from XMPP
        @raise ValueError: a local actor id is malformed
        """
        if self.is_local_url(actor_id):
            # local actor: decode the URL to get back the original XMPP entity
            request_type, extra_args = self.parse_apurl(actor_id)
            if request_type != TYPE_ACTOR or len(extra_args) != 1:
                raise ValueError(f"invalid actor id: {actor_id!r}")
            actor_jid, __ = await self.get_jid_and_node(extra_args[0])
            return actor_jid

        # remote actor: map its account handle to a virtual JID on this gateway
        account = await self.get_ap_account_from_id(actor_id)
        return self.get_local_jid_from_account(account)
823 | |
824 def parse_apurl(self, url: str) -> Tuple[str, List[str]]: | |
825 """Parse an URL leading to an AP endpoint | |
826 | |
827 @param url: URL to parse (schema is not mandatory) | |
828 @return: endpoint type and extra arguments | |
829 """ | |
830 path = parse.urlparse(url).path.lstrip("/") | |
831 type_, *extra_args = path[len(self.ap_path):].lstrip("/").split("/") | |
832 return type_, [parse.unquote(a) for a in extra_args] | |
833 | |
834 def build_apurl(self, type_:str , *args: str) -> str: | |
835 """Build an AP endpoint URL | |
836 | |
837 @param type_: type of AP endpoing | |
838 @param arg: endpoint dependant arguments | |
839 """ | |
840 return parse.urljoin( | |
841 self.base_ap_url, | |
842 str(Path(type_).joinpath(*(parse.quote_plus(a, safe="@") for a in args))) | |
843 ) | |
844 | |
845 def is_local_url(self, url: str) -> bool: | |
846 """Tells if an URL link to this component | |
847 | |
848 ``public_url`` and ``ap_path`` are used to check the URL | |
849 """ | |
850 return url.startswith(self.base_ap_url) | |
851 | |
def is_virtual_jid(self, jid_: jid.JID) -> bool:
    """Tell if a JID is an AP actor mapped through this gateway"""
    gateway_host = self.client.jid.userhost()
    return jid_.host == gateway_host
855 | |
def build_signature_header(self, values: Dict[str, str]) -> str:
    """Build key="<value>" signature header from signature data

    @param values: signature parameters (keyId, algorithm, headers, signature,
        and optionally the integer created/expires parameters)
    @return: value to use in the ``Signature`` HTTP header
    @raise NotImplementedError: a value contains a double-quote, which would
        require string escaping (not implemented)
    """
    fields = []
    for key, value in values.items():
        # per draft-cavage-http-signatures, "created" and "expires" hold
        # integers and must not be surrounded by double-quotes.
        # NOTE: "(expired)" was historically checked here although the
        # specification names the pseudo-header "(expires)"; both are accepted
        # for backward compatibility.
        if key not in ("(created)", "(expires)", "(expired)"):
            if '"' in value:
                raise NotImplementedError(
                    "string escaping is not implemented, double-quote can't be used "
                    f"in {value!r}"
                )
            value = f'"{value}"'
        fields.append(f"{key}={value}")

    return ",".join(fields)
870 | |
def get_digest(self, body: bytes, algo="SHA-256") -> Tuple[str, str]:
    """Get digest data to use in header and signature

    @param body: body of the request
    @param algo: hash algorithm name (only "SHA-256" is supported)
    @return: hash name and digest
    @raise NotImplementedError: algo is not "SHA-256"
    """
    if algo != "SHA-256":
        raise NotImplementedError("only SHA-256 is implemented for now")
    body_hash = hashlib.sha256(body).digest()
    return algo, base64.b64encode(body_hash).decode()
880 | |
@async_lru(maxsize=LRU_MAX_SIZE)
async def get_actor_data(self, actor_id: str) -> dict:
    """Retrieve actor data with LRU cache

    @param actor_id: AP actor ID (URL) to dereference
    @return: actor data, as returned by the AP server
    """
    return await self.ap_get(actor_id)
885 | |
@async_lru(maxsize=LRU_MAX_SIZE)
async def get_actor_pub_key_data(
    self,
    actor_id: str
) -> Tuple[str, str, rsa.RSAPublicKey]:
    """Retrieve Public Key data from actor ID

    @param actor_id: actor ID (url)
    @return: key_id, owner and public_key
    @raise KeyError: publicKey is missing from actor data
    """
    actor_data = await self.get_actor_data(actor_id)
    pub_key_data = actor_data["publicKey"]
    key_id = pub_key_data["id"]
    owner = pub_key_data["owner"]
    pub_key_pem = pub_key_data["publicKeyPem"]
    # the PEM key in actor data is text, load_pem_public_key expects bytes
    pub_key = serialization.load_pem_public_key(pub_key_pem.encode())
    return key_id, owner, pub_key
904 | |
def create_activity(
    self,
    activity: str,
    actor_id: str,
    object_: Optional[Union[str, dict]] = None,
    target: Optional[Union[str, dict]] = None,
    activity_id: Optional[str] = None,
    **kwargs,
) -> Dict[str, Any]:
    """Generate base data for an activity

    @param activity: one of ACTIVITY_TYPES
    @param actor_id: AP actor ID of the sender
    @param object_: content of "object" field
    @param target: content of "target" field
    @param activity_id: ID to use for the activity
        if not set it will be automatically generated, but it is usually
        desirable to set the ID manually so it can be retrieved (e.g. for Undo)
    @param kwargs: extra fields to add to the activity data
    @return: activity data
    @raise exceptions.InternalError: unknown activity, or a mandatory field is
        missing
    """
    if activity not in ACTIVITY_TYPES:
        raise exceptions.InternalError(f"invalid activity: {activity!r}")
    if object_ is None and activity in ACTIVITY_OBJECT_MANDATORY:
        raise exceptions.InternalError(
            f'"object_" is mandatory for activity {activity!r}'
        )
    if target is None and activity in ACTIVITY_TARGET_MANDATORY:
        raise exceptions.InternalError(
            f'"target" is mandatory for activity {activity!r}'
        )
    if activity_id is None:
        # the activity ID is derived from the actor ID, with a unique fragment
        activity_id = f"{actor_id}#{activity.lower()}_{shortuuid.uuid()}"
    data: Dict[str, Any] = {
        "@context": [NS_AP],
        "actor": actor_id,
        "id": activity_id,
        "type": activity,
        **kwargs,
    }
    if object_ is not None:
        data["object"] = object_
    if target is not None:
        data["target"] = target

    return data
949 | |
def get_key_id(self, actor_id: str) -> str:
    """Get local key ID from actor ID

    The main key is used, i.e. the actor ID with a "#main-key" fragment.
    """
    return actor_id + "#main-key"
953 | |
async def check_signature(
    self,
    signature: str,
    key_id: str,
    headers: Dict[str, str]
) -> str:
    """Verify that signature matches given headers

    see https://datatracker.ietf.org/doc/html/draft-cavage-http-signatures-06#section-3.1.2

    @param signature: Base64 encoded signature
    @param key_id: ID of the key used to sign the data
    @param headers: headers and their values, including pseudo-headers
    @return: id of the signing actor

    @raise exceptions.EncryptionError: signature doesn't match headers, or
        public key data are inconsistent with the key ID
    """
    # rebuild the signed string as specified by the HTTP signatures draft:
    # one "<lowercase header name>: <value>" line per header
    to_sign = "\n".join(f"{k.lower()}: {v}" for k,v in headers.items())
    if key_id.startswith("acct:"):
        # the key ID is an account handle, resolve it through WebFinger
        actor = key_id[5:]
        actor_id = await self.get_ap_actor_id_from_account(actor)
    else:
        # otherwise the key ID is expected to be the actor ID plus a fragment
        actor_id = key_id.split("#", 1)[0]

    pub_key_id, pub_key_owner, pub_key = await self.get_actor_pub_key_data(actor_id)
    if pub_key_id != key_id or pub_key_owner != actor_id:
        # the key advertised by the actor must be the one used to sign,
        # otherwise anybody could sign on behalf of the actor
        raise exceptions.EncryptionError("Public Key mismatch")

    try:
        pub_key.verify(
            base64.b64decode(signature),
            to_sign.encode(),
            # we have to use PKCS1v15 padding to be compatible with Mastodon
            padding.PKCS1v15(),  # type: ignore
            hashes.SHA256()  # type: ignore
        )
    except InvalidSignature:
        raise exceptions.EncryptionError(
            "Invalid signature (using PKC0S1 v1.5 and SHA-256)"
        )

    return actor_id
996 | |
def get_signature_data(
    self,
    key_id: str,
    headers: Dict[str, str]
) -> Tuple[Dict[str, str], Dict[str, str]]:
    """Generate and return signature and corresponding headers

    @param key_id: ID of the key (URL linking to the data with public key)
    @param headers: headers to sign and their values, pseudo-headers (e.g.
        "(request-target)") included

    @return: headers and signature data
        ``headers`` is an updated copy of ``headers`` arguments, with pseudo-headers
        removed, and ``Signature`` added.
    """
    # the signatures draft requires lower case headers in the signed string
    l_headers: Dict[str, str] = {k.lower(): v for k, v in headers.items()}
    to_sign = "\n".join(f"{k}: {v}" for k,v in l_headers.items())
    signature = base64.b64encode(self.private_key.sign(
        to_sign.encode(),
        # we have to use PKCS1v15 padding to be compatible with Mastodon
        padding.PKCS1v15(),  # type: ignore
        hashes.SHA256()  # type: ignore
    )).decode()
    sign_data = {
        "keyId": key_id,
        # the parameter name is lower-case "algorithm" per
        # draft-cavage-http-signatures §4.1 ("Algorithm" was used before and
        # was not spec-compliant)
        "algorithm": "rsa-sha256",
        "headers": " ".join(l_headers.keys()),
        "signature": signature
    }
    new_headers = {k: v for k,v in headers.items() if not k.startswith("(")}
    new_headers["Signature"] = self.build_signature_header(sign_data)
    return new_headers, sign_data
1033 | |
async def convert_and_post_items(
    self,
    client: SatXMPPEntity,
    ap_account: str,
    service: jid.JID,
    node: str,
    items: List[domish.Element],
    subscribe_extra_nodes: bool = True,
) -> None:
    """Convert XMPP items to AP items and post them to actor inbox

    @param client: client which received the items
    @param ap_account: account of ActivityPub actor receiving the item
    @param service: JID of the (virtual) pubsub service where the item has been
        published
    @param node: (virtual) node corresponding where the item has been published
    @param items: XMPP items to convert; "item" elements are converted and
        posted, "retract" elements trigger a Delete activity
    @param subscribe_extra_nodes: if True, extra data nodes will be automatically
        subscribed, that is comment nodes if present and attachments nodes.
    @raise exceptions.InternalError: an element which is neither "item" nor
        "retract" has been received
    """
    actor_id = await self.get_ap_actor_id_from_account(ap_account)
    inbox = await self.get_ap_inbox_from_id(actor_id)
    for item in items:
        if item.name == "item":
            # check the cache to know whether this is a new item or an update
            cached_item = await self.host.memory.storage.search_pubsub_items({
                "profiles": [self.client.profile],
                "services": [service],
                "nodes": [node],
                "names": [item["id"]]
            })
            is_new = not bool(cached_item)
            if node.startswith(self._events.namespace):
                # event item
                event_data = self._events.event_elt_2_event_data(item)
                try:
                    author_jid = jid.JID(item["publisher"]).userhostJID()
                except (KeyError, RuntimeWarning):
                    # "publisher" attribute is missing or invalid: fall back to
                    # the "from" attribute of the root element of the stanza
                    root_elt = item
                    while root_elt.parent is not None:
                        root_elt = root_elt.parent
                    author_jid = jid.JID(root_elt["from"]).userhostJID()
                if subscribe_extra_nodes and not self.is_virtual_jid(author_jid):
                    # we subscribe automatically to comment nodes if any
                    recipient_jid = self.get_local_jid_from_account(ap_account)
                    recipient_client = self.client.get_virtual_client(recipient_jid)
                    comments_data = event_data.get("comments")
                    if comments_data:
                        comment_service = jid.JID(comments_data["jid"])
                        comment_node = comments_data["node"]
                        await self._p.subscribe(
                            recipient_client, comment_service, comment_node
                        )
                    # also subscribe to the attachments node, if it exists
                    try:
                        await self._pa.subscribe(
                            recipient_client, service, node, event_data["id"]
                        )
                    except exceptions.NotFound:
                        log.debug(
                            f"no attachment node found for item {event_data['id']!r} "
                            f"on {node!r} at {service}"
                        )
                ap_item = await self.ap_events.event_data_2_ap_item(
                    event_data, author_jid, is_new=is_new
                )
            else:
                # blog item
                mb_data = await self._m.item_2_mb_data(client, item, service, node)
                author_jid = jid.JID(mb_data["author_jid"])
                if subscribe_extra_nodes and not self.is_virtual_jid(author_jid):
                    # we subscribe automatically to comment nodes if any
                    recipient_jid = self.get_local_jid_from_account(ap_account)
                    recipient_client = self.client.get_virtual_client(recipient_jid)
                    for comment_data in mb_data.get("comments", []):
                        comment_service = jid.JID(comment_data["service"])
                        if self.is_virtual_jid(comment_service):
                            log.debug(
                                f"ignoring virtual comment service: {comment_data}"
                            )
                            continue
                        comment_node = comment_data["node"]
                        await self._p.subscribe(
                            recipient_client, comment_service, comment_node
                        )
                    # also subscribe to the attachments node, if it exists
                    try:
                        await self._pa.subscribe(
                            recipient_client, service, node, mb_data["id"]
                        )
                    except exceptions.NotFound:
                        log.debug(
                            f"no attachment node found for item {mb_data['id']!r} on "
                            f"{node!r} at {service}"
                        )
                ap_item = await self.mb_data_2_ap_item(client, mb_data, is_new=is_new)

            url_actor = ap_item["actor"]
        elif item.name == "retract":
            # retraction is mapped to an AP Delete activity
            url_actor, ap_item = await self.ap_delete_item(
                client.jid, node, item["id"]
            )
        else:
            raise exceptions.InternalError(f"unexpected element: {item.toXml()}")
        await self.sign_and_post(inbox, url_actor, ap_item)
1134 | |
async def convert_and_post_attachments(
    self,
    client: SatXMPPEntity,
    ap_account: str,
    service: jid.JID,
    node: str,
    items: List[domish.Element],
    publisher: Optional[jid.JID] = None
) -> None:
    """Convert XMPP item attachments to AP activities and post them to actor inbox

    @param client: client which received the attachments
    @param ap_account: account of ActivityPub actor receiving the item
    @param service: JID of the (virtual) pubsub service where the item has been
        published
    @param node: (virtual) node corresponding where the item has been published
    @param items: attachments items
    @param publisher: publisher of the attachments item (it's NOT the PEP/Pubsub
        service, it's the publisher of the item). To be filled only when the publisher
        is known for sure, otherwise publisher will be determined either if
        "publisher" attribute is set by pubsub service, or as a last resort, using
        item's ID (which MUST be publisher bare JID according to pubsub-attachments
        specification).
    """
    if len(items) != 1:
        log.warning(
            "we should get exactly one attachment item for an entity, got "
            f"{len(items)}"
        )

    actor_id = await self.get_ap_actor_id_from_account(ap_account)
    inbox = await self.get_ap_inbox_from_id(actor_id)

    item_elt = items[0]
    item_id = item_elt["id"]

    if publisher is None:
        item_pub_s = item_elt.getAttribute("publisher")
        publisher = jid.JID(item_pub_s) if item_pub_s else jid.JID(item_id)

    if publisher.userhost() != item_id:
        log.warning(
            "attachments item ID must be publisher's bare JID, ignoring: "
            f"{item_elt.toXml()}"
        )
        return

    if self.is_virtual_jid(publisher):
        log.debug(f"ignoring item coming from local virtual JID {publisher}")
        return

    # publisher is necessarily set at this point (either given by the caller
    # or computed above), so it is exposed unconditionally on the item
    item_elt["publisher"] = publisher.userhost()

    item_service, item_node, item_id = self._pa.attachment_node_2_item(node)
    item_account = await self.get_ap_account_from_jid_and_node(item_service, item_node)
    if self.is_virtual_jid(item_service):
        # it's a virtual JID mapping to an external AP actor, we can use the
        # item_id directly
        item_url = item_id
        if not item_url.startswith("https:"):
            log.warning(
                "item ID of external AP actor is not an https link, ignoring: "
                f"{item_id!r}"
            )
            return
    else:
        item_url = self.build_apurl(TYPE_ITEM, item_account, item_id)

    # retrieve the cached version of the attachments, to determine which
    # attachments have been added or removed
    old_attachment_pubsub_items = await self.host.memory.storage.search_pubsub_items({
        "profiles": [self.client.profile],
        "services": [service],
        "nodes": [node],
        "names": [item_elt["id"]]
    })
    if not old_attachment_pubsub_items:
        old_attachment = {}
    else:
        old_attachment_items = [i.data for i in old_attachment_pubsub_items]
        old_attachments = self._pa.items_2_attachment_data(client, old_attachment_items)
        try:
            old_attachment = old_attachments[0]
        except IndexError:
            # no known element was present in attachments
            old_attachment = {}
    publisher_account = await self.get_ap_account_from_jid_and_node(
        publisher,
        None
    )
    publisher_actor_id = self.build_apurl(TYPE_ACTOR, publisher_account)
    try:
        attachments = self._pa.items_2_attachment_data(client, [item_elt])[0]
    except IndexError:
        # no known element was present in attachments
        attachments = {}

    # noticed
    if "noticed" in attachments:
        if "noticed" not in old_attachment:
            # new "noticed" attachment, we translate to "Like" activity
            activity_id = self.build_apurl("like", item_account, item_id)
            activity = self.create_activity(
                TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id
            )
            activity["to"] = [ap_account]
            activity["cc"] = [NS_AP_PUBLIC]
            await self.sign_and_post(inbox, publisher_actor_id, activity)
    elif "noticed" in old_attachment:
        # "noticed" attachment has been removed, we undo the "Like" activity
        activity_id = self.build_apurl("like", item_account, item_id)
        activity = self.create_activity(
            TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id
        )
        activity["to"] = [ap_account]
        activity["cc"] = [NS_AP_PUBLIC]
        undo = self.create_activity("Undo", publisher_actor_id, activity)
        await self.sign_and_post(inbox, publisher_actor_id, undo)

    # reactions
    new_reactions = set(attachments.get("reactions", {}).get("reactions", []))
    old_reactions = set(old_attachment.get("reactions", {}).get("reactions", []))
    reactions_remove = old_reactions - new_reactions
    reactions_add = new_reactions - old_reactions
    for reactions, undo in ((reactions_remove, True), (reactions_add, False)):
        for reaction in reactions:
            # the reaction is hex-encoded in the URL, as emojis are not
            # suitable for a URL path segment
            activity_id = self.build_apurl(
                "reaction", item_account, item_id, reaction.encode().hex()
            )
            reaction_activity = self.create_activity(
                TYPE_REACTION, publisher_actor_id, item_url,
                activity_id=activity_id
            )
            reaction_activity["content"] = reaction
            reaction_activity["to"] = [ap_account]
            reaction_activity["cc"] = [NS_AP_PUBLIC]
            if undo:
                activity_to_send = self.create_activity(
                    "Undo", publisher_actor_id, reaction_activity
                )
            else:
                activity_to_send = reaction_activity
            await self.sign_and_post(inbox, publisher_actor_id, activity_to_send)

    # RSVP
    if "rsvp" in attachments:
        attending = attachments["rsvp"].get("attending", "no")
        old_attending = old_attachment.get("rsvp", {}).get("attending", "no")
        if attending != old_attending:
            activity_type = TYPE_JOIN if attending == "yes" else TYPE_LEAVE
            activity_id = self.build_apurl(activity_type.lower(), item_account, item_id)
            activity = self.create_activity(
                activity_type, publisher_actor_id, item_url, activity_id=activity_id
            )
            activity["to"] = [ap_account]
            activity["cc"] = [NS_AP_PUBLIC]
            await self.sign_and_post(inbox, publisher_actor_id, activity)
    elif "rsvp" in old_attachment:
        old_attending = old_attachment.get("rsvp", {}).get("attending", "no")
        if old_attending == "yes":
            # the entity was attending and the RSVP attachment has been
            # removed: we send a "Leave" activity
            activity_id = self.build_apurl(TYPE_LEAVE.lower(), item_account, item_id)
            activity = self.create_activity(
                TYPE_LEAVE, publisher_actor_id, item_url, activity_id=activity_id
            )
            activity["to"] = [ap_account]
            activity["cc"] = [NS_AP_PUBLIC]
            await self.sign_and_post(inbox, publisher_actor_id, activity)

    if service.user and self.is_virtual_jid(service):
        # the item is on a virtual service, we need to store it in cache
        log.debug("storing attachments item in cache")
        cached_node = await self.host.memory.storage.get_pubsub_node(
            client, service, node, with_subscriptions=True, create=True
        )
        await self.host.memory.storage.cache_pubsub_items(
            self.client,
            cached_node,
            [item_elt],
            [attachments]
        )
1316 | |
async def sign_and_post(self, url: str, actor_id: str, doc: dict) -> TReqResponse:
    """Sign a document and POST it to an AP server

    @param url: AP server endpoint
    @param actor_id: originating actor ID (URL)
    @param doc: document to send
    @return: response of the POST request
    """
    if self.verbose:
        __, actor_args = self.parse_apurl(actor_id)
        actor_account = actor_args[0]
        to_log = [
            "",
            f">>> {actor_account} is signing and posting to {url}:\n{pformat(doc)}"
        ]

    parsed_url = parse.urlparse(url)
    body = json.dumps(doc).encode()
    digest_algo, digest_hash = self.get_digest(body)
    digest = f"{digest_algo}={digest_hash}"

    # headers covered by the signature, pseudo-headers included
    headers = {
        "(request-target)": f"post {parsed_url.path}",
        "Host": parsed_url.hostname,
        "Date": http.datetimeToString().decode(),
        "Digest": digest,
        "Content-Type": 'application/activity+json',
    }
    headers, __ = self.get_signature_data(self.get_key_id(actor_id), headers)

    if self.verbose:
        if self.verbose >= 3:
            h_to_log = "\n".join(f" {k}: {v}" for k, v in headers.items())
            to_log.append(f" headers:\n{h_to_log}")
        to_log.append("---")
        log.info("\n".join(to_log))

    resp = await treq.post(
        url,
        body,
        headers=headers,
    )
    if resp.code >= 300:
        text = await resp.text()
        log.warning(f"POST request to {url} failed [{resp.code}]: {text}")
    elif self.verbose:
        log.info(f"==> response code: {resp.code}")
    return resp
1366 | |
def _publish_message(self, mess_data_s: str, service_s: str, profile: str):
    """Bridge wrapper: deserialise arguments and delegate to publish_message"""
    client = self.host.get_client(profile)
    mess_data: dict = data_format.deserialise(mess_data_s)  # type: ignore
    service = jid.JID(service_s)
    return defer.ensureDeferred(self.publish_message(client, mess_data, service))
1372 | |
@async_lru(maxsize=LRU_MAX_SIZE)
async def get_ap_actor_id_from_account(self, account: str) -> str:
    """Retrieve account ID from its handle using WebFinger

    Don't use this method to get local actor id from a local account derived
    from a JID: in this case, the actor ID is retrieved with
    ``self.build_apurl(TYPE_ACTOR, ap_account)``

    @param account: AP handle (user@domain.tld)
    @return: Actor ID (which is an URL)
    @raise ValueError: the account is invalid, or no usable ActivityPub link
        is found in the WebFinger data
    @raise exceptions.DataError: WebFinger data can't be retrieved
    """
    if account.count("@") != 1 or "/" in account:
        raise ValueError(f"Invalid account: {account!r}")
    host = account.split("@")[1]
    try:
        finger_data = await treq.json_content(await treq.get(
            f"https://{host}/.well-known/webfinger?"
            f"resource=acct:{parse.quote_plus(account)}",
        ))
    except Exception as e:
        raise exceptions.DataError(f"Can't get webfinger data for {account!r}: {e}")
    for link in finger_data.get("links", []):
        if (
            link.get("type") == "application/activity+json"
            and link.get("rel") == "self"
        ):
            href = link.get("href", "").strip()
            if not href:
                # "{account:r}" was used here before: an invalid format spec
                # which would itself raise ValueError; "!r" is what is meant
                raise ValueError(
                    f"Invalid webfinger data for {account!r}: missing href"
                )
            break
    else:
        raise ValueError(
            f"No ActivityPub link found for {account!r}"
        )
    return href
1410 | |
async def get_ap_actor_data_from_account(self, account: str) -> dict:
    """Retrieve ActivityPub Actor data

    @param account: ActivityPub Actor identifier
    @return: actor data
    """
    actor_id = await self.get_ap_actor_id_from_account(account)
    return await self.ap_get(actor_id)
1418 | |
async def get_ap_inbox_from_id(self, actor_id: str, use_shared: bool = True) -> str:
    """Retrieve inbox of an actor_id

    @param actor_id: AP actor ID
    @param use_shared: if True, and a shared inbox exists, it will be used instead of
        the user inbox
    @return: inbox URL
    """
    data = await self.get_actor_data(actor_id)
    if not use_shared:
        return data["inbox"]
    try:
        return data["endpoints"]["sharedInbox"]
    except KeyError:
        # no shared inbox advertised, fall back to the user inbox
        return data["inbox"]
1432 | |
@async_lru(maxsize=LRU_MAX_SIZE)
async def get_ap_account_from_id(self, actor_id: str) -> str:
    """Retrieve AP account from the ID URL

    Works with external or local actor IDs.
    @param actor_id: AP ID of the actor (URL to the actor data)
    @return: AP handle
    @raise exceptions.DataError: the actor ID is invalid, or a consistent
        account can't be determined from the actor data
    """
    if self.is_local_url(actor_id):
        # local actor ID: the account is embedded in the URL itself, no HTTP
        # request is needed
        url_type, url_args = self.parse_apurl(actor_id)
        if url_type != "actor" or not url_args:
            raise exceptions.DataError(
                f"invalid local actor ID: {actor_id}"
            )
        account = url_args[0]
        try:
            account_user, account_host = account.split('@')
        except ValueError:
            raise exceptions.DataError(
                f"invalid account from url: {actor_id}"
            )
        if not account_user or account_host != self.public_url:
            raise exceptions.DataError(
                f"{account!r} is not a valid local account (from {actor_id})"
            )
        return account

    # external actor: build the account from "preferredUsername" and the
    # hostname of the actor ID
    url_parsed = parse.urlparse(actor_id)
    actor_data = await self.get_actor_data(actor_id)
    username = actor_data.get("preferredUsername")
    if not username:
        raise exceptions.DataError(
            'No "preferredUsername" field found, can\'t retrieve actor account'
        )
    account = f"{username}@{url_parsed.hostname}"
    # we try to retrieve the actor ID from the account to check it
    found_id = await self.get_ap_actor_id_from_account(account)
    if found_id != actor_id:
        # cf. https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196
        msg = (
            f"Account ID found on WebFinger {found_id!r} doesn't match our actor ID "
            f"({actor_id!r}). This AP instance doesn't seems to use "
            '"preferredUsername" as we expect.'
        )
        log.warning(msg)
        raise exceptions.DataError(msg)
    return account
1480 | |
async def get_ap_items(
    self,
    collection: dict,
    max_items: Optional[int] = None,
    chronological_pagination: bool = True,
    after_id: Optional[str] = None,
    start_index: Optional[int] = None,
    parser: Optional[Callable[[dict], Awaitable[domish.Element]]] = None,
    only_ids: bool = False,
) -> Tuple[List[domish.Element], rsm.RSMResponse]:
    """Retrieve AP items and convert them to XMPP items

    @param collection: AP collection data to get items from
    @param max_items: maximum number of items to retrieve
        retrieve all items by default
    @param chronological_pagination: get pages in chronological order
        AP use reversed chronological order for pagination, "first" page returns more
        recent items. If "chronological_pagination" is True, "last" AP page will be
        retrieved first.
    @param after_id: if set, retrieve items starting from given ID
        Due to ActivityStream Collection Paging limitations, this is inefficient and
        if ``after_id`` is not already in cache, we have to retrieve every page until
        we find it.
        In most common cases, ``after_id`` should be in cache though (client usually
        use known ID when in-order pagination is used).
    @param start_index: start retrieving items from the one with given index
        Due to ActivityStream Collection Paging limitations, this is inefficient and
        all pages before the requested index will be retrieved to count items.
    @param parser: method to use to parse AP items and get XMPP item elements
        if None, use default generic parser
    @param only_ids: if True, only retrieve items IDs
        Retrieving only item IDs avoid HTTP requests to retrieve items, it may be
        sufficient in some use cases (e.g. when retrieving following/followers
        collections)
    @return: XMPP Pubsub items and corresponding RSM Response
        Items are always returned in chronological order in the result
    """
    if parser is None:
        parser = self.ap_item_2_mb_elt

    # values are "count"/"index" (int) and "first"/"last" (str item IDs)
    rsm_resp: Dict[str, Union[str, int]] = {}
    try:
        count = collection["totalItems"]
    except KeyError:
        log.warning(
            f'"totalItems" not found in collection {collection.get("id")}, '
            "defaulting to 20"
        )
        count = 20
    else:
        log.info(f"{collection.get('id')} has {count} item(s)")

    rsm_resp["count"] = count

    if start_index is not None:
        assert chronological_pagination and after_id is None
        if start_index >= count:
            return [], rsm_resp
        elif start_index == 0:
            # this is the default behaviour
            pass
        elif start_index > 5000:
            raise error.StanzaError(
                "feature-not-implemented",
                # a space was missing between "limit" and "is" in this message
                text="Maximum limit for previous_index has been reached, this limit "
                "is set to avoid DoS"
            )
        else:
            # we'll convert "start_index" to "after_id", thus we need the item just
            # before "start_index"
            previous_index = start_index - 1
            retrieved_items = 0
            current_page = collection["last"]
            while retrieved_items < count:
                page_data, items = await self.parse_ap_page(
                    current_page, parser, only_ids
                )
                if not items:
                    log.warning(f"found an empty AP page at {current_page}")
                    return [], rsm_resp
                page_start_idx = retrieved_items
                retrieved_items += len(items)
                if previous_index <= retrieved_items:
                    after_id = items[previous_index - page_start_idx]["id"]
                    break
                try:
                    current_page = page_data["prev"]
                except KeyError:
                    log.warning(
                        f"missing previous page link at {current_page}: {page_data!r}"
                    )
                    raise error.StanzaError(
                        "service-unavailable",
                        "Error while retrieving previous page from AP service at "
                        f"{current_page}"
                    )

    init_page = "last" if chronological_pagination else "first"
    page = collection.get(init_page)
    if not page:
        raise exceptions.DataError(
            # stray closing parenthesis removed from this message
            f"Initial page {init_page!r} not found for collection "
            f"{collection.get('id')}"
        )
    items = []
    page_items = []
    retrieved_items = 0
    found_after_id = False

    while retrieved_items < count:
        __, page_items = await self.parse_ap_page(page, parser, only_ids)
        if not page_items:
            break
        retrieved_items += len(page_items)
        if after_id is not None and not found_after_id:
            # if we have an after_id, we ignore all items until the requested one is
            # found
            try:
                limit_idx = [i["id"] for i in page_items].index(after_id)
            except ValueError:
                # if "after_id" is not found, we don't add any item from this page
                page_id = page.get("id") if isinstance(page, dict) else page
                log.debug(f"{after_id!r} not found at {page_id}, skipping")
            else:
                found_after_id = True
                if chronological_pagination:
                    start_index = retrieved_items - len(page_items) + limit_idx + 1
                    page_items = page_items[limit_idx+1:]
                else:
                    start_index = count - (retrieved_items - len(page_items) +
                                           limit_idx + 1)
                    page_items = page_items[:limit_idx]
                items.extend(page_items)
        else:
            items.extend(page_items)
            if max_items is not None and len(items) >= max_items:
                if chronological_pagination:
                    items = items[:max_items]
                else:
                    items = items[-max_items:]
                break
        # NOTE(review): the "prev"/"next" link is looked up on the collection
        # itself, not on the page data returned by parse_ap_page; as
        # collections usually don't carry "prev"/"next", this looks like it
        # stops after the first page — to confirm before changing
        page = collection.get("prev" if chronological_pagination else "next")
        if not page:
            break

    if after_id is not None and not found_after_id:
        raise error.StanzaError("item-not-found")

    if items:
        if after_id is None:
            rsm_resp["index"] = 0 if chronological_pagination else count - len(items)
        if start_index is not None:
            rsm_resp["index"] = start_index
        elif after_id is not None:
            log.warning("Can't determine index of first element")
        elif chronological_pagination:
            rsm_resp["index"] = 0
        else:
            rsm_resp["index"] = count - len(items)
        rsm_resp.update({
            "first": items[0]["id"],
            "last": items[-1]["id"]
        })

    return items, rsm.RSMResponse(**rsm_resp)
1646 | |
1647 async def ap_item_2_mb_data_and_elt(self, ap_item: dict) -> Tuple[dict, domish.Element]: | |
1648 """Convert AP item to parsed microblog data and corresponding item element""" | |
1649 mb_data = await self.ap_item_2_mb_data(ap_item) | |
1650 item_elt = await self._m.mb_data_2_entry_elt( | |
1651 self.client, mb_data, mb_data["id"], None, self._m.namespace | |
1652 ) | |
1653 if "repeated" in mb_data["extra"]: | |
1654 item_elt["publisher"] = mb_data["extra"]["repeated"]["by"] | |
1655 else: | |
1656 item_elt["publisher"] = mb_data["author_jid"] | |
1657 return mb_data, item_elt | |
1658 | |
1659 async def ap_item_2_mb_elt(self, ap_item: dict) -> domish.Element: | |
1660 """Convert AP item to XMPP item element""" | |
1661 __, item_elt = await self.ap_item_2_mb_data_and_elt(ap_item) | |
1662 return item_elt | |
1663 | |
1664 async def parse_ap_page( | |
1665 self, | |
1666 page: Union[str, dict], | |
1667 parser: Callable[[dict], Awaitable[domish.Element]], | |
1668 only_ids: bool = False | |
1669 ) -> Tuple[dict, List[domish.Element]]: | |
1670 """Convert AP objects from an AP page to XMPP items | |
1671 | |
1672 @param page: Can be either url linking and AP page, or the page data directly | |
1673 @param parser: method to use to parse AP items and get XMPP item elements | |
1674 @param only_ids: if True, only retrieve items IDs | |
1675 @return: page data, pubsub items | |
1676 """ | |
1677 page_data = await self.ap_get_object(page) | |
1678 if page_data is None: | |
1679 log.warning('No data found in collection') | |
1680 return {}, [] | |
1681 ap_items = await self.ap_get_list(page_data, "orderedItems", only_ids=only_ids) | |
1682 if ap_items is None: | |
1683 ap_items = await self.ap_get_list(page_data, "items", only_ids=only_ids) | |
1684 if not ap_items: | |
1685 log.warning(f'No item field found in collection: {page_data!r}') | |
1686 return page_data, [] | |
1687 else: | |
1688 log.warning( | |
1689 "Items are not ordered, this is not spec compliant" | |
1690 ) | |
1691 items = [] | |
1692 # AP Collections are in antichronological order, but we expect chronological in | |
1693 # Pubsub, thus we reverse it | |
1694 for ap_item in reversed(ap_items): | |
1695 try: | |
1696 items.append(await parser(ap_item)) | |
1697 except (exceptions.DataError, NotImplementedError, error.StanzaError): | |
1698 continue | |
1699 | |
1700 return page_data, items | |
1701 | |
    async def get_comments_nodes(
        self,
        item_id: str,
        parent_id: Optional[str]
    ) -> Tuple[Optional[str], Optional[str]]:
        """Get node where this item is and node to use for comments

        if config option "comments_max_depth" is set, a common node will be used below the
        given depth
        @param item_id: ID of the reference item
        @param parent_id: ID of the parent item if any (the ID set in "inReplyTo")
        @return: a tuple with parent_node_id, comments_node_id:
            - parent_node_id is the ID of the node where reference item must be. None is
                returned when the root node (i.e. not a comments node) must be used.
            - comments_node_id: is the ID of the node to use for comments. None is
                returned when no comment node must be used (happens when we have reached
                "comments_max_depth")
        """
        if parent_id is None or not self.comments_max_depth:
            # no depth limit configured (or root item): each item gets its own
            # comments node
            return (
                self._m.get_comments_node(parent_id) if parent_id is not None else None,
                self._m.get_comments_node(item_id)
            )
        # a depth limit is set: walk up the "inReplyTo" chain (bounded by
        # COMMENTS_MAX_PARENTS) to measure how deep this item is
        parent_url = parent_id
        parents = []
        for __ in range(COMMENTS_MAX_PARENTS):
            parent_item = await self.ap_get(parent_url)
            # parents are kept in root-first order
            parents.insert(0, parent_item)
            parent_url = parent_item.get("inReplyTo")
            if parent_url is None:
                # we reached the root item
                break
        parent_limit = self.comments_max_depth-1
        if len(parents) <= parent_limit:
            # still above the depth limit: normal per-item comments node
            return (
                self._m.get_comments_node(parents[-1]["id"]),
                self._m.get_comments_node(item_id)
            )
        else:
            # max depth reached: anchor to the node of the last allowed level, and
            # return None so no new comments node is created for this item
            last_level_item = parents[parent_limit]
            return (
                self._m.get_comments_node(last_level_item["id"]),
                None
            )
1745 | |
    async def ap_item_2_mb_data(self, ap_item: dict) -> dict:
        """Convert AP activity or object to microblog data

        @param ap_item: ActivityPub item to convert
            Can be either an activity of an object
        @return: AP Item's Object and microblog data
        @raise exceptions.DataError: something is invalid in the AP item
        @raise NotImplementedError: some AP data is not handled yet
        @raise error.StanzaError: error while contacting the AP server
        """
        is_activity = self.is_activity(ap_item)
        if is_activity:
            # activities wrap the actual object in their "object" field
            ap_object = await self.ap_get_object(ap_item, "object")
            if not ap_object:
                log.warning(f'No "object" found in AP item {ap_item!r}')
                raise exceptions.DataError
        else:
            ap_object = ap_item
        item_id = ap_object.get("id")
        if not item_id:
            log.warning(f'No "id" found in AP item: {ap_object!r}')
            raise exceptions.DataError
        mb_data: dict = {"id": item_id, "extra": {}}

        # content
        try:
            # "contentMap" maps language to content; only one entry is kept here
            language, content_xhtml = ap_object["contentMap"].popitem()
        except (KeyError, AttributeError):
            # no (dict) "contentMap", fall back on the plain "content" field
            try:
                mb_data["content_xhtml"] = ap_object["content"]
            except KeyError:
                log.warning(f"no content found:\n{ap_object!r}")
                raise exceptions.DataError
        else:
            mb_data["language"] = language
            mb_data["content_xhtml"] = content_xhtml

        # the plain text body is derived from the XHTML one
        mb_data["content"] = await self._t.convert(
            mb_data["content_xhtml"],
            self._t.SYNTAX_XHTML,
            self._t.SYNTAX_TEXT,
            False,
        )

        # attachments
        if "attachment" in ap_object:
            attachments = mb_data["extra"][C.KEY_ATTACHMENTS] = []
            for ap_attachment in ap_object["attachment"]:
                try:
                    url = ap_attachment["url"]
                except KeyError:
                    log.warning(
                        f'"url" missing in AP attachment, ignoring: {ap_attachment}'
                    )
                    continue

                if not url.startswith("http"):
                    log.warning(f"non HTTP URL in attachment, ignoring: {ap_attachment}")
                    continue
                attachment = {"url": url}
                for ap_key, key in (
                    ("mediaType", "media_type"),
                    # XXX: as weird as it seems, "name" is actually used for description
                    #   in AP world
                    ("name", "desc"),
                ):
                    value = ap_attachment.get(ap_key)
                    if value:
                        attachment[key] = value
                attachments.append(attachment)

        # author
        if is_activity:
            authors = await self.ap_get_actors(ap_item, "actor")
        else:
            authors = await self.ap_get_actors(ap_object, "attributedTo")
        if len(authors) > 1:
            # we only keep first item as author
            # TODO: handle multiple actors
            log.warning("multiple actors are not managed")

        account = authors[0]
        author_jid = self.get_local_jid_from_account(account).full()

        mb_data["author"] = account.split("@", 1)[0]
        mb_data["author_jid"] = author_jid

        # published/updated
        for field in ("published", "updated"):
            value = ap_object.get(field)
            if not value and field == "updated":
                # missing "updated" falls back on "published"
                value = ap_object.get("published")
            if value:
                try:
                    # timestamps are stored as Unix time (UTC)
                    mb_data[field] = calendar.timegm(
                        dateutil.parser.parse(str(value)).utctimetuple()
                    )
                except dateutil.parser.ParserError as e:
                    log.warning(f"Can't parse {field!r} field: {e}")

        # repeat
        if "_repeated" in ap_item:
            mb_data["extra"]["repeated"] = ap_item["_repeated"]

        # comments
        in_reply_to = ap_object.get("inReplyTo")
        __, comments_node = await self.get_comments_nodes(item_id, in_reply_to)
        if comments_node is not None:
            comments_data = {
                "service": author_jid,
                "node": comments_node,
                "uri": uri.build_xmpp_uri(
                    "pubsub",
                    path=author_jid,
                    node=comments_node
                )
            }
            mb_data["comments"] = [comments_data]

        return mb_data
1865 | |
1866 async def get_reply_to_id_from_xmpp_node( | |
1867 self, | |
1868 client: SatXMPPEntity, | |
1869 ap_account: str, | |
1870 parent_item: str, | |
1871 mb_data: dict | |
1872 ) -> str: | |
1873 """Get URL to use for ``inReplyTo`` field in AP item. | |
1874 | |
1875 There is currently no way to know the parent service of a comment with XEP-0277. | |
1876 To work around that, we try to check if we have this item in the cache (we | |
1877 should). If there is more that one item with this ID, we first try to find one | |
1878 with this author_jid. If nothing is found, we use ap_account to build `inReplyTo`. | |
1879 | |
1880 @param ap_account: AP account corresponding to the publication author | |
1881 @param parent_item: ID of the node where the publication this item is replying to | |
1882 has been posted | |
1883 @param mb_data: microblog data of the publication | |
1884 @return: URL to use in ``inReplyTo`` field | |
1885 """ | |
1886 # FIXME: propose a protoXEP to properly get parent item, node and service | |
1887 | |
1888 found_items = await self.host.memory.storage.search_pubsub_items({ | |
1889 "profiles": [client.profile], | |
1890 "names": [parent_item] | |
1891 }) | |
1892 if not found_items: | |
1893 log.warning(f"parent item {parent_item!r} not found in cache") | |
1894 parent_ap_account = ap_account | |
1895 elif len(found_items) == 1: | |
1896 cached_node = found_items[0].node | |
1897 parent_ap_account = await self.get_ap_account_from_jid_and_node( | |
1898 cached_node.service, | |
1899 cached_node.name | |
1900 ) | |
1901 else: | |
1902 # we found several cached item with given ID, we check if there is one | |
1903 # corresponding to this author | |
1904 try: | |
1905 author = jid.JID(mb_data["author_jid"]).userhostJID() | |
1906 cached_item = next( | |
1907 i for i in found_items | |
1908 if jid.JID(i.data["publisher"]).userhostJID() | |
1909 == author | |
1910 ) | |
1911 except StopIteration: | |
1912 # no item corresponding to this author, we use ap_account | |
1913 log.warning( | |
1914 "Can't find a single cached item for parent item " | |
1915 f"{parent_item!r}" | |
1916 ) | |
1917 parent_ap_account = ap_account | |
1918 else: | |
1919 cached_node = cached_item.node | |
1920 parent_ap_account = await self.get_ap_account_from_jid_and_node( | |
1921 cached_node.service, | |
1922 cached_node.name | |
1923 ) | |
1924 | |
1925 return self.build_apurl( | |
1926 TYPE_ITEM, parent_ap_account, parent_item | |
1927 ) | |
1928 | |
1929 async def repeated_mb_2_ap_item( | |
1930 self, | |
1931 mb_data: dict | |
1932 ) -> dict: | |
1933 """Convert repeated blog item to suitable AP Announce activity | |
1934 | |
1935 @param mb_data: microblog metadata of an item repeating an other blog post | |
1936 @return: Announce activity linking to the repeated item | |
1937 """ | |
1938 repeated = mb_data["extra"]["repeated"] | |
1939 repeater = jid.JID(repeated["by"]) | |
1940 repeater_account = await self.get_ap_account_from_jid_and_node( | |
1941 repeater, | |
1942 None | |
1943 ) | |
1944 repeater_id = self.build_apurl(TYPE_ACTOR, repeater_account) | |
1945 repeated_uri = repeated["uri"] | |
1946 | |
1947 if not repeated_uri.startswith("xmpp:"): | |
1948 log.warning( | |
1949 "Only xmpp: URL are handled for repeated item at the moment, ignoring " | |
1950 f"item {mb_data}" | |
1951 ) | |
1952 raise NotImplementedError | |
1953 parsed_url = uri.parse_xmpp_uri(repeated_uri) | |
1954 if parsed_url["type"] != "pubsub": | |
1955 log.warning( | |
1956 "Only pubsub URL are handled for repeated item at the moment, ignoring " | |
1957 f"item {mb_data}" | |
1958 ) | |
1959 raise NotImplementedError | |
1960 rep_service = jid.JID(parsed_url["path"]) | |
1961 rep_item = parsed_url["item"] | |
1962 activity_id = self.build_apurl("item", repeater.userhost(), mb_data["id"]) | |
1963 | |
1964 if self.is_virtual_jid(rep_service): | |
1965 # it's an AP actor linked through this gateway | |
1966 # in this case we can simply use the item ID | |
1967 if not rep_item.startswith("https:"): | |
1968 log.warning( | |
1969 f"Was expecting an HTTPS url as item ID and got {rep_item!r}\n" | |
1970 f"{mb_data}" | |
1971 ) | |
1972 announced_uri = rep_item | |
1973 repeated_account = self._e.unescape(rep_service.user) | |
1974 else: | |
1975 # the repeated item is an XMPP publication, we build the corresponding ID | |
1976 rep_node = parsed_url["node"] | |
1977 repeated_account = await self.get_ap_account_from_jid_and_node( | |
1978 rep_service, rep_node | |
1979 ) | |
1980 announced_uri = self.build_apurl("item", repeated_account, rep_item) | |
1981 | |
1982 announce = self.create_activity( | |
1983 "Announce", repeater_id, announced_uri, activity_id=activity_id | |
1984 ) | |
1985 announce["to"] = [NS_AP_PUBLIC] | |
1986 announce["cc"] = [ | |
1987 self.build_apurl(TYPE_FOLLOWERS, repeater_account), | |
1988 await self.get_ap_actor_id_from_account(repeated_account) | |
1989 ] | |
1990 return announce | |
1991 | |
    async def mb_data_2_ap_item(
        self,
        client: SatXMPPEntity,
        mb_data: dict,
        public: bool =True,
        is_new: bool = True,
    ) -> dict:
        """Convert Libervia Microblog Data to ActivityPub item

        @param mb_data: microblog data (as used in plugin XEP-0277) to convert
            If ``public`` is True, ``service`` and ``node`` keys must be set.
            If ``published`` is not set, current datetime will be used
        @param public: True if the message is not a private/direct one
            if True, the AP Item will be marked as public, and AP followers of target AP
            account (which retrieve from ``service``) will be put in ``cc``.
            ``inReplyTo`` will also be set if suitable
            if False, no destinee will be set (i.e., no ``to`` or ``cc`` or public flag).
            This is usually used for direct messages.
        @param is_new: if True, the item is a new one (no instance has been found in
            cache).
            If True, a "Create" activity will be generated, otherwise an "Update" one will
            be.
        @return: Activity item
        @raise exceptions.InternalError: ``public`` is set but ``node`` or ``service``
            is missing in ``mb_data``
        """
        extra = mb_data.get("extra", {})
        if "repeated" in extra:
            # a repeated item maps to an "Announce" activity, handled separately
            return await self.repeated_mb_2_ap_item(mb_data)
        if not mb_data.get("id"):
            mb_data["id"] = shortuuid.uuid()
        if not mb_data.get("author_jid"):
            mb_data["author_jid"] = client.jid.userhost()
        ap_account = await self.get_ap_account_from_jid_and_node(
            jid.JID(mb_data["author_jid"]),
            None
        )
        url_actor = self.build_apurl(TYPE_ACTOR, ap_account)
        url_item = self.build_apurl(TYPE_ITEM, ap_account, mb_data["id"])
        # base AP object; XHTML content is preferred when available
        ap_object = {
            "id": url_item,
            "type": "Note",
            "published": utils.xmpp_date(mb_data.get("published")),
            "attributedTo": url_actor,
            "content": mb_data.get("content_xhtml") or mb_data["content"],
        }

        language = mb_data.get("language")
        if language:
            ap_object["contentMap"] = {language: ap_object["content"]}

        # attachments
        attachments = extra.get(C.KEY_ATTACHMENTS)
        if attachments:
            ap_attachments = ap_object["attachment"] = []
            for attachment in attachments:
                try:
                    # first source with a "url" key is used
                    url = next(
                        s['url'] for s in attachment["sources"] if 'url' in s
                    )
                except (StopIteration, KeyError):
                    log.warning(
                        f"Ignoring attachment without URL: {attachment}"
                    )
                    continue
                ap_attachment = {
                    "url": url
                }
                for key, ap_key in (
                    ("media_type", "mediaType"),
                    # XXX: yes "name", cf. [ap_item_2_mb_data]
                    ("desc", "name"),
                ):
                    value = attachment.get(key)
                    if value:
                        ap_attachment[ap_key] = value
                ap_attachments.append(ap_attachment)

        if public:
            ap_object["to"] = [NS_AP_PUBLIC]
            if self.auto_mentions:
                # convert @user@host patterns found in the content to AP mentions
                for m in RE_MENTION.finditer(ap_object["content"]):
                    mention = m.group()
                    mentioned = mention[1:]
                    __, m_host = mentioned.split("@", 1)
                    if m_host in (self.public_url, self.client.jid.host):
                        # we ignore mention of local users, they should be sent as XMPP
                        # references
                        continue
                    try:
                        mentioned_id = await self.get_ap_actor_id_from_account(mentioned)
                    except Exception as e:
                        log.warning(f"Can't add mention to {mentioned!r}: {e}")
                    else:
                        ap_object["to"].append(mentioned_id)
                        ap_object.setdefault("tag", []).append({
                            "type": TYPE_MENTION,
                            "href": mentioned_id,
                            "name": mention,
                        })
            try:
                node = mb_data["node"]
                service = jid.JID(mb_data["service"])
            except KeyError:
                # node and service must always be specified when this method is used
                raise exceptions.InternalError(
                    "node or service is missing in mb_data"
                )
            target_ap_account = await self.get_ap_account_from_jid_and_node(
                service, node
            )
            if self.is_virtual_jid(service):
                # service is a proxy JID for AP account
                actor_data = await self.get_ap_actor_data_from_account(target_ap_account)
                followers = actor_data.get("followers")
            else:
                # service is a real XMPP entity
                followers = self.build_apurl(TYPE_FOLLOWERS, target_ap_account)
            if followers:
                ap_object["cc"] = [followers]
            if self._m.is_comment_node(node):
                parent_item = self._m.get_parent_item(node)
                if self.is_virtual_jid(service):
                    # the publication is on a virtual node (i.e. an XMPP node managed by
                    # this gateway and linking to an ActivityPub actor)
                    ap_object["inReplyTo"] = parent_item
                else:
                    # the publication is from a followed real XMPP node
                    ap_object["inReplyTo"] = await self.get_reply_to_id_from_xmpp_node(
                        client,
                        ap_account,
                        parent_item,
                        mb_data
                    )

        return self.create_activity(
            "Create" if is_new else "Update", url_actor, ap_object, activity_id=url_item
        )
2127 | |
2128 async def publish_message( | |
2129 self, | |
2130 client: SatXMPPEntity, | |
2131 mess_data: dict, | |
2132 service: jid.JID | |
2133 ) -> None: | |
2134 """Send an AP message | |
2135 | |
2136 .. note:: | |
2137 | |
2138 This is a temporary method used for development only | |
2139 | |
2140 @param mess_data: message data. Following keys must be set: | |
2141 | |
2142 ``node`` | |
2143 identifier of message which is being replied (this will | |
2144 correspond to pubsub node in the future) | |
2145 | |
2146 ``content_xhtml`` or ``content`` | |
2147 message body (respectively in XHTML or plain text) | |
2148 | |
2149 @param service: JID corresponding to the AP actor. | |
2150 """ | |
2151 if not service.user: | |
2152 raise ValueError("service must have a local part") | |
2153 account = self._e.unescape(service.user) | |
2154 ap_actor_data = await self.get_ap_actor_data_from_account(account) | |
2155 | |
2156 try: | |
2157 inbox_url = ap_actor_data["endpoints"]["sharedInbox"] | |
2158 except KeyError: | |
2159 raise exceptions.DataError("Can't get ActivityPub actor inbox") | |
2160 | |
2161 item_data = await self.mb_data_2_ap_item(client, mess_data) | |
2162 url_actor = item_data["actor"] | |
2163 resp = await self.sign_and_post(inbox_url, url_actor, item_data) | |
2164 | |
    async def ap_delete_item(
        self,
        jid_: jid.JID,
        node: Optional[str],
        item_id: str,
        public: bool = True
    ) -> Tuple[str, Dict[str, Any]]:
        """Build activity to delete an AP item

        @param jid_: JID of the entity deleting an item
        @param node: node where the item is deleted
            None if it's microblog or a message
        @param item_id: ID of the item to delete
            it's the Pubsub ID or message's origin ID
        @param public: if True, the activity will be addressed to public namespace
        @return: actor_id of the entity deleting the item, activity to send
        """
        if node is None:
            # default to the microblog namespace
            node = self._m.namespace

        author_account = await self.get_ap_account_from_jid_and_node(jid_, node)
        author_actor_id = self.build_apurl(TYPE_ACTOR, author_account)

        # check the cache: a repeated item must be retracted with "Undo" of the
        # original "Announce" activity instead of a "Delete"
        items = await self.host.memory.storage.search_pubsub_items({
            "profiles": [self.client.profile],
            "services": [jid_],
            "names": [item_id]
        })
        if not items:
            log.warning(
                f"Deleting an unknown item at service {jid_}, node {node} and id "
                f"{item_id}"
            )
        else:
            try:
                mb_data = await self._m.item_2_mb_data(self.client, items[0].data, jid_, node)
                if "repeated" in mb_data["extra"]:
                    # we are deleting a repeated item, we must translate this to an
                    # "Undo" of the "Announce" activity instead of a "Delete" one
                    announce = await self.repeated_mb_2_ap_item(mb_data)
                    undo = self.create_activity("Undo", author_actor_id, announce)
                    return author_actor_id, undo
            except Exception as e:
                # best-effort: if the cached item can't be parsed as a blog item, we
                # fall through and emit a plain "Delete"
                log.debug(
                    f"Can't parse item, maybe it's not a blog item: {e}\n"
                    f"{items[0].toXml()}"
                )

        # regular deletion: a "Delete" activity pointing to a Tombstone object
        url_item = self.build_apurl(TYPE_ITEM, author_account, item_id)
        ap_item = self.create_activity(
            "Delete",
            author_actor_id,
            {
                "id": url_item,
                "type": TYPE_TOMBSTONE
            }
        )
        if public:
            ap_item["to"] = [NS_AP_PUBLIC]
        return author_actor_id, ap_item
2225 | |
2226 def _message_received_trigger( | |
2227 self, | |
2228 client: SatXMPPEntity, | |
2229 message_elt: domish.Element, | |
2230 post_treat: defer.Deferred | |
2231 ) -> bool: | |
2232 """add the gateway workflow on post treatment""" | |
2233 if self.client is None: | |
2234 log.debug(f"no client set, ignoring message: {message_elt.toXml()}") | |
2235 return True | |
2236 post_treat.addCallback( | |
2237 lambda mess_data: defer.ensureDeferred(self.onMessage(client, mess_data)) | |
2238 ) | |
2239 return True | |
2240 | |
    async def onMessage(self, client: SatXMPPEntity, mess_data: dict) -> dict:
        """Called once message has been parsed

        this method handle the conversion to AP items and posting

        @param client: client which received the message
        @param mess_data: parsed message data
        @return: ``mess_data`` unchanged (so the post-treatment chain can continue)
        """
        # guard clauses: only local, direct chat/normal messages addressed to a
        # virtual (AP-mapped) JID are forwarded
        if client != self.client:
            return mess_data
        if mess_data["type"] not in ("chat", "normal"):
            log.warning(f"ignoring message with unexpected type: {mess_data}")
            return mess_data
        if not self.is_local(mess_data["from"]):
            log.warning(f"ignoring non local message: {mess_data}")
            return mess_data
        if not mess_data["to"].user:
            log.warning(
                f"ignoring message addressed to gateway itself: {mess_data}"
            )
            return mess_data

        # destinee JID's local part maps to the AP account
        actor_account = self._e.unescape(mess_data["to"].user)
        actor_id = await self.get_ap_actor_id_from_account(actor_account)
        inbox = await self.get_ap_inbox_from_id(actor_id, use_shared=False)

        try:
            # only the first language version of the body is used
            language, message = next(iter(mess_data["message"].items()))
        except (KeyError, StopIteration):
            log.warning(f"ignoring empty message: {mess_data}")
            return mess_data

        mb_data = {
            "content": message,
        }
        if language:
            mb_data["language"] = language
        origin_id = mess_data["extra"].get("origin_id")
        if origin_id:
            # we need to use origin ID when present to be able to retract the message
            mb_data["id"] = origin_id
        attachments = mess_data["extra"].get(C.KEY_ATTACHMENTS)
        if attachments:
            mb_data["extra"] = {
                C.KEY_ATTACHMENTS: attachments
            }

        # the activity is posted on behalf of the sender's virtual client
        client = self.client.get_virtual_client(mess_data["from"])
        ap_item = await self.mb_data_2_ap_item(client, mb_data, public=False)
        ap_object = ap_item["object"]
        ap_object["to"] = ap_item["to"] = [actor_id]
        # we add a mention to direct message, otherwise peer is not notified in some AP
        # implementations (notably Mastodon), and the message may be missed easily.
        ap_object.setdefault("tag", []).append({
            "type": TYPE_MENTION,
            "href": actor_id,
            "name": f"@{actor_account}",
        })

        await self.sign_and_post(inbox, ap_item["actor"], ap_item)
        return mess_data
2299 | |
2300 async def _on_message_retract( | |
2301 self, | |
2302 client: SatXMPPEntity, | |
2303 message_elt: domish.Element, | |
2304 retract_elt: domish.Element, | |
2305 fastened_elts | |
2306 ) -> bool: | |
2307 if client != self.client: | |
2308 return True | |
2309 from_jid = jid.JID(message_elt["from"]) | |
2310 if not self.is_local(from_jid): | |
2311 log.debug( | |
2312 f"ignoring retract request from non local jid {from_jid}" | |
2313 ) | |
2314 return False | |
2315 to_jid = jid.JID(message_elt["to"]) | |
2316 if (to_jid.host != self.client.jid.full() or not to_jid.user): | |
2317 # to_jid should be a virtual JID from this gateway | |
2318 raise exceptions.InternalError( | |
2319 f"Invalid destinee's JID: {to_jid.full()}" | |
2320 ) | |
2321 ap_account = self._e.unescape(to_jid.user) | |
2322 actor_id = await self.get_ap_actor_id_from_account(ap_account) | |
2323 inbox = await self.get_ap_inbox_from_id(actor_id, use_shared=False) | |
2324 url_actor, ap_item = await self.ap_delete_item( | |
2325 from_jid.userhostJID(), None, fastened_elts.id, public=False | |
2326 ) | |
2327 resp = await self.sign_and_post(inbox, url_actor, ap_item) | |
2328 return False | |
2329 | |
2330 async def _on_reference_received( | |
2331 self, | |
2332 client: SatXMPPEntity, | |
2333 message_elt: domish.Element, | |
2334 reference_data: Dict[str, Union[str, int]] | |
2335 ) -> bool: | |
2336 parsed_uri: dict = reference_data.get("parsed_uri") | |
2337 if not parsed_uri: | |
2338 log.warning(f"no parsed URI available in reference {reference_data}") | |
2339 return False | |
2340 | |
2341 try: | |
2342 mentioned = jid.JID(parsed_uri["path"]) | |
2343 except RuntimeError: | |
2344 log.warning(f"invalid target: {reference_data['uri']}") | |
2345 return False | |
2346 | |
2347 if mentioned.host != self.client.jid.full() or not mentioned.user: | |
2348 log.warning( | |
2349 f"ignoring mentioned user {mentioned}, it's not a JID mapping an AP " | |
2350 "account" | |
2351 ) | |
2352 return False | |
2353 | |
2354 ap_account = self._e.unescape(mentioned.user) | |
2355 actor_id = await self.get_ap_actor_id_from_account(ap_account) | |
2356 | |
2357 parsed_anchor: dict = reference_data.get("parsed_anchor") | |
2358 if not parsed_anchor: | |
2359 log.warning(f"no XMPP anchor, ignoring reference {reference_data!r}") | |
2360 return False | |
2361 | |
2362 if parsed_anchor["type"] != "pubsub": | |
2363 log.warning( | |
2364 f"ignoring reference with non pubsub anchor, this is not supported: " | |
2365 "{reference_data!r}" | |
2366 ) | |
2367 return False | |
2368 | |
2369 try: | |
2370 pubsub_service = jid.JID(parsed_anchor["path"]) | |
2371 except RuntimeError: | |
2372 log.warning(f"invalid anchor: {reference_data['anchor']}") | |
2373 return False | |
2374 pubsub_node = parsed_anchor.get("node") | |
2375 if not pubsub_node: | |
2376 log.warning(f"missing pubsub node in anchor: {reference_data['anchor']}") | |
2377 return False | |
2378 pubsub_item = parsed_anchor.get("item") | |
2379 if not pubsub_item: | |
2380 log.warning(f"missing pubsub item in anchor: {reference_data['anchor']}") | |
2381 return False | |
2382 | |
2383 cached_node = await self.host.memory.storage.get_pubsub_node( | |
2384 client, pubsub_service, pubsub_node | |
2385 ) | |
2386 if not cached_node: | |
2387 log.warning(f"Anchored node not found in cache: {reference_data['anchor']}") | |
2388 return False | |
2389 | |
2390 cached_items, __ = await self.host.memory.storage.get_items( | |
2391 cached_node, item_ids=[pubsub_item] | |
2392 ) | |
2393 if not cached_items: | |
2394 log.warning( | |
2395 f"Anchored pubsub item not found in cache: {reference_data['anchor']}" | |
2396 ) | |
2397 return False | |
2398 | |
2399 cached_item = cached_items[0] | |
2400 | |
2401 mb_data = await self._m.item_2_mb_data( | |
2402 client, cached_item.data, pubsub_service, pubsub_node | |
2403 ) | |
2404 ap_item = await self.mb_data_2_ap_item(client, mb_data) | |
2405 ap_object = ap_item["object"] | |
2406 ap_object["to"] = [actor_id] | |
2407 ap_object.setdefault("tag", []).append({ | |
2408 "type": TYPE_MENTION, | |
2409 "href": actor_id, | |
2410 "name": ap_account, | |
2411 }) | |
2412 | |
2413 inbox = await self.get_ap_inbox_from_id(actor_id, use_shared=False) | |
2414 | |
2415 resp = await self.sign_and_post(inbox, ap_item["actor"], ap_item) | |
2416 | |
2417 return False | |
2418 | |
    async def new_reply_to_xmpp_item(
        self,
        client: SatXMPPEntity,
        ap_item: dict,
        targets: Dict[str, Set[str]],
        mentions: List[Dict[str, str]],
    ) -> None:
        """We got an AP item which is a reply to an XMPP item

        The reply is published on the parent item's comments node, then mentions are
        notified.

        @param client: client to use for pubsub operations
        @param ap_item: AP item replying to an XMPP item (its "inReplyTo" must be a
            local AP URL of "item" type)
        @param targets: targets of the AP item, as returned by [get_ap_item_targets]
        @param mentions: mentions data, as returned by [get_ap_item_targets]
        @raise NotImplementedError: the parent item has no comments node set
        """
        in_reply_to = ap_item["inReplyTo"]
        url_type, url_args = self.parse_apurl(in_reply_to)
        if url_type != "item":
            log.warning(
                "Ignoring AP item replying to an XMPP item with an unexpected URL "
                f"type({url_type!r}):\n{pformat(ap_item)}"
            )
            return
        try:
            # remaining URL path elements are joined back to rebuild the item ID
            parent_item_account, parent_item_id = url_args[0], '/'.join(url_args[1:])
        except (IndexError, ValueError):
            log.warning(
                "Ignoring AP item replying to an XMPP item with invalid inReplyTo URL "
                f"({in_reply_to!r}):\n{pformat(ap_item)}"
            )
            return
        parent_item_service, parent_item_node = await self.get_jid_and_node(
            parent_item_account
        )
        if parent_item_node is None:
            # default to the microblog namespace
            parent_item_node = self._m.namespace
        items, __ = await self._p.get_items(
            client, parent_item_service, parent_item_node, item_ids=[parent_item_id]
        )
        try:
            parent_item_elt = items[0]
        except IndexError:
            log.warning(
                f"Can't find parent item at {parent_item_service} (node "
                f"{parent_item_node!r})\n{pformat(ap_item)}")
            return
        parent_item_parsed = await self._m.item_2_mb_data(
            client, parent_item_elt, parent_item_service, parent_item_node
        )
        try:
            # the comments node of the parent item is where the reply must be published
            comment_service = jid.JID(parent_item_parsed["comments"][0]["service"])
            comment_node = parent_item_parsed["comments"][0]["node"]
        except (KeyError, IndexError):
            # we don't have a comment node set for this item
            from libervia.backend.tools.xml_tools import pp_elt
            log.info(f"{pp_elt(parent_item_elt.toXml())}")
            raise NotImplementedError()
        else:
            __, item_elt = await self.ap_item_2_mb_data_and_elt(ap_item)
            await self._p.publish(client, comment_service, comment_node, [item_elt])
            await self.notify_mentions(
                targets, mentions, comment_service, comment_node, item_elt["id"]
            )
2475 | |
2476 def get_ap_item_targets( | |
2477 self, | |
2478 item: Dict[str, Any] | |
2479 ) -> Tuple[bool, Dict[str, Set[str]], List[Dict[str, str]]]: | |
2480 """Retrieve targets of an AP item, and indicate if it's a public one | |
2481 | |
2482 @param item: AP object payload | |
2483 @return: Are returned: | |
2484 - is_public flag, indicating if the item is world-readable | |
2485 - a dict mapping target type to targets | |
2486 """ | |
2487 targets: Dict[str, Set[str]] = {} | |
2488 is_public = False | |
2489 # TODO: handle "audience" | |
2490 for key in ("to", "bto", "cc", "bcc"): | |
2491 values = item.get(key) | |
2492 if not values: | |
2493 continue | |
2494 if isinstance(values, str): | |
2495 values = [values] | |
2496 for value in values: | |
2497 if value in PUBLIC_TUPLE: | |
2498 is_public = True | |
2499 continue | |
2500 if not value: | |
2501 continue | |
2502 if not self.is_local_url(value): | |
2503 continue | |
2504 target_type = self.parse_apurl(value)[0] | |
2505 if target_type != TYPE_ACTOR: | |
2506 log.debug(f"ignoring non actor type as a target: {href}") | |
2507 else: | |
2508 targets.setdefault(target_type, set()).add(value) | |
2509 | |
2510 mentions = [] | |
2511 tags = item.get("tag") | |
2512 if tags: | |
2513 for tag in tags: | |
2514 if tag.get("type") != TYPE_MENTION: | |
2515 continue | |
2516 href = tag.get("href") | |
2517 if not href: | |
2518 log.warning('Missing "href" field from mention object: {tag!r}') | |
2519 continue | |
2520 if not self.is_local_url(href): | |
2521 continue | |
2522 uri_type = self.parse_apurl(href)[0] | |
2523 if uri_type != TYPE_ACTOR: | |
2524 log.debug(f"ignoring non actor URI as a target: {href}") | |
2525 continue | |
2526 mention = {"uri": href} | |
2527 mentions.append(mention) | |
2528 name = tag.get("name") | |
2529 if name: | |
2530 mention["content"] = name | |
2531 | |
2532 return is_public, targets, mentions | |
2533 | |
2534 async def new_ap_item( | |
2535 self, | |
2536 client: SatXMPPEntity, | |
2537 destinee: Optional[jid.JID], | |
2538 node: str, | |
2539 item: dict, | |
2540 ) -> None: | |
2541 """Analyse, cache and send notification for received AP item | |
2542 | |
2543 @param destinee: jid of the destinee, | |
2544 @param node: XMPP pubsub node | |
2545 @param item: AP object payload | |
2546 """ | |
2547 is_public, targets, mentions = self.get_ap_item_targets(item) | |
2548 if not is_public and targets.keys() == {TYPE_ACTOR}: | |
2549 # this is a direct message | |
2550 await self.handle_message_ap_item( | |
2551 client, targets, mentions, destinee, item | |
2552 ) | |
2553 else: | |
2554 await self.handle_pubsub_ap_item( | |
2555 client, targets, mentions, destinee, node, item, is_public | |
2556 ) | |
2557 | |
2558 async def handle_message_ap_item( | |
2559 self, | |
2560 client: SatXMPPEntity, | |
2561 targets: Dict[str, Set[str]], | |
2562 mentions: List[Dict[str, str]], | |
2563 destinee: Optional[jid.JID], | |
2564 item: dict, | |
2565 ) -> None: | |
2566 """Parse and deliver direct AP items translating to XMPP messages | |
2567 | |
2568 @param targets: actors where the item must be delivered | |
2569 @param destinee: jid of the destinee, | |
2570 @param item: AP object payload | |
2571 """ | |
2572 targets_jids = { | |
2573 await self.get_jid_from_id(t) | |
2574 for t_set in targets.values() | |
2575 for t in t_set | |
2576 } | |
2577 if destinee is not None: | |
2578 targets_jids.add(destinee) | |
2579 mb_data = await self.ap_item_2_mb_data(item) | |
2580 extra = { | |
2581 "origin_id": mb_data["id"] | |
2582 } | |
2583 attachments = mb_data["extra"].get(C.KEY_ATTACHMENTS) | |
2584 if attachments: | |
2585 extra[C.KEY_ATTACHMENTS] = attachments | |
2586 | |
2587 defer_l = [] | |
2588 for target_jid in targets_jids: | |
2589 defer_l.append( | |
2590 client.sendMessage( | |
2591 target_jid, | |
2592 {'': mb_data.get("content", "")}, | |
2593 mb_data.get("title"), | |
2594 extra=extra | |
2595 ) | |
2596 ) | |
2597 await defer.DeferredList(defer_l) | |
2598 | |
2599 async def notify_mentions( | |
2600 self, | |
2601 targets: Dict[str, Set[str]], | |
2602 mentions: List[Dict[str, str]], | |
2603 service: jid.JID, | |
2604 node: str, | |
2605 item_id: str, | |
2606 ) -> None: | |
2607 """Send mention notifications to recipients and mentioned entities | |
2608 | |
2609 XEP-0372 (References) is used. | |
2610 | |
2611 Mentions are also sent to recipients as they are primary audience (see | |
2612 https://www.w3.org/TR/activitystreams-vocabulary/#microsyntaxes). | |
2613 | |
2614 """ | |
2615 anchor = uri.build_xmpp_uri("pubsub", path=service.full(), node=node, item=item_id) | |
2616 seen = set() | |
2617 # we start with explicit mentions because mentions' content will be used in the | |
2618 # future to fill "begin" and "end" reference attributes (we can't do it at the | |
2619 # moment as there is no way to specify the XML element to use in the blog item). | |
2620 for mention in mentions: | |
2621 mentioned_jid = await self.get_jid_from_id(mention["uri"]) | |
2622 self._refs.send_reference( | |
2623 self.client, | |
2624 to_jid=mentioned_jid, | |
2625 anchor=anchor | |
2626 ) | |
2627 seen.add(mentioned_jid) | |
2628 | |
2629 remaining = { | |
2630 await self.get_jid_from_id(t) | |
2631 for t_set in targets.values() | |
2632 for t in t_set | |
2633 } - seen | |
2634 for target in remaining: | |
2635 self._refs.send_reference( | |
2636 self.client, | |
2637 to_jid=target, | |
2638 anchor=anchor | |
2639 ) | |
2640 | |
    async def handle_pubsub_ap_item(
        self,
        client: SatXMPPEntity,
        targets: Dict[str, Set[str]],
        mentions: List[Dict[str, str]],
        destinee: Optional[jid.JID],
        node: str,
        item: dict,
        public: bool
    ) -> None:
        """Analyse, cache and deliver AP items translating to Pubsub

        @param targets: actors/collections where the item must be delivered
        @param destinee: jid of the destinee,
        @param node: XMPP pubsub node
        @param item: AP object payload
        @param public: True if the item is public
        """
        # XXX: "public" is not used for now
        service = client.jid
        in_reply_to = item.get("inReplyTo")

        # some implementations put "inReplyTo" in a list; only the first entry is used
        if in_reply_to and isinstance(in_reply_to, list):
            in_reply_to = in_reply_to[0]
        if in_reply_to and isinstance(in_reply_to, str):
            if self.is_local_url(in_reply_to):
                # this is a reply to an XMPP item
                await self.new_reply_to_xmpp_item(client, item, targets, mentions)
                return

            # this item is a reply to an AP item, we use or create a corresponding node
            # for comments
            parent_node, __ = await self.get_comments_nodes(item["id"], in_reply_to)
            node = parent_node or node
            # comment nodes are created on the fly and marked subscribed so that
            # notifications for them are relayed
            cached_node = await self.host.memory.storage.get_pubsub_node(
                client, service, node, with_subscriptions=True, create=True,
                create_kwargs={"subscribed": True}
            )
        else:
            # it is a root item (i.e. not a reply to an other item)
            # only event nodes are auto-created here; other unknown nodes are dropped
            create = node == self._events.namespace
            cached_node = await self.host.memory.storage.get_pubsub_node(
                client, service, node, with_subscriptions=True, create=create
            )
            if cached_node is None:
                # NOTE(review): message says "We synchronise the node" but the code
                # just returns here — confirm whether a sync call is missing
                log.warning(
                    f"Received item in unknown node {node!r} at {service}. This may be "
                    f"due to a cache purge. We synchronise the node\n{item}"

                )
                return
        # convert the AP payload to the matching XMPP element (event or microblog)
        if item.get("type") == TYPE_EVENT:
            data, item_elt = await self.ap_events.ap_item_2_event_data_and_elt(item)
        else:
            data, item_elt = await self.ap_item_2_mb_data_and_elt(item)
        await self.host.memory.storage.cache_pubsub_items(
            client,
            cached_node,
            [item_elt],
            [data]
        )

        # relay the publication to every subscribed entity of the cached node
        for subscription in cached_node.subscriptions:
            if subscription.state != SubscriptionState.SUBSCRIBED:
                continue
            self.pubsub_service.notifyPublish(
                service,
                node,
                [(subscription.subscriber, None, [item_elt])]
            )

        await self.notify_mentions(targets, mentions, service, node, item_elt["id"])
2713 | |
2714 async def new_ap_delete_item( | |
2715 self, | |
2716 client: SatXMPPEntity, | |
2717 destinee: Optional[jid.JID], | |
2718 node: str, | |
2719 item: dict, | |
2720 ) -> None: | |
2721 """Analyse, cache and send notification for received AP item | |
2722 | |
2723 @param destinee: jid of the destinee, | |
2724 @param node: XMPP pubsub node | |
2725 @param activity: parent AP activity | |
2726 @param item: AP object payload | |
2727 only the "id" field is used | |
2728 """ | |
2729 item_id = item.get("id") | |
2730 if not item_id: | |
2731 raise exceptions.DataError('"id" attribute is missing in item') | |
2732 if not item_id.startswith("http"): | |
2733 raise exceptions.DataError(f"invalid id: {item_id!r}") | |
2734 if self.is_local_url(item_id): | |
2735 raise ValueError("Local IDs should not be used") | |
2736 | |
2737 # we have no way to know if a deleted item is a direct one (thus a message) or one | |
2738 # converted to pubsub. We check if the id is in message history to decide what to | |
2739 # do. | |
2740 history = await self.host.memory.storage.get( | |
2741 client, | |
2742 History, | |
2743 History.origin_id, | |
2744 item_id, | |
2745 (History.messages, History.subjects) | |
2746 ) | |
2747 | |
2748 if history is not None: | |
2749 # it's a direct message | |
2750 if history.source_jid != client.jid: | |
2751 log.warning( | |
2752 f"retraction received from an entity ''{client.jid}) which is " | |
2753 f"not the original sender of the message ({history.source_jid}), " | |
2754 "hack attemps?" | |
2755 ) | |
2756 raise exceptions.PermissionError("forbidden") | |
2757 | |
2758 await self._r.retract_by_history(client, history) | |
2759 else: | |
2760 # no history in cache with this ID, it's probably a pubsub item | |
2761 cached_node = await self.host.memory.storage.get_pubsub_node( | |
2762 client, client.jid, node, with_subscriptions=True | |
2763 ) | |
2764 if cached_node is None: | |
2765 log.warning( | |
2766 f"Received an item retract for node {node!r} at {client.jid} " | |
2767 "which is not cached" | |
2768 ) | |
2769 raise exceptions.NotFound | |
2770 await self.host.memory.storage.delete_pubsub_items(cached_node, [item_id]) | |
2771 # notifyRetract is expecting domish.Element instances | |
2772 item_elt = domish.Element((None, "item")) | |
2773 item_elt["id"] = item_id | |
2774 for subscription in cached_node.subscriptions: | |
2775 if subscription.state != SubscriptionState.SUBSCRIBED: | |
2776 continue | |
2777 self.pubsub_service.notifyRetract( | |
2778 client.jid, | |
2779 node, | |
2780 [(subscription.subscriber, None, [item_elt])] | |
2781 ) |