comparison libervia/backend/plugins/plugin_comp_ap_gateway/__init__.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_comp_ap_gateway/__init__.py@c23cad65ae99
children c3b68fdc2de7
1 #!/usr/bin/env python3
2
3 # Libervia ActivityPub Gateway
4 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 import base64
20 import calendar
21 import hashlib
22 import json
23 from pathlib import Path
24 from pprint import pformat
25 import re
26 from typing import (
27 Any,
28 Awaitable,
29 Callable,
30 Dict,
31 List,
32 Optional,
33 Set,
34 Tuple,
35 Union,
36 overload,
37 )
38 from urllib import parse
39
40 from cryptography.exceptions import InvalidSignature
41 from cryptography.hazmat.primitives import serialization
42 from cryptography.hazmat.primitives import hashes
43 from cryptography.hazmat.primitives.asymmetric import rsa
44 from cryptography.hazmat.primitives.asymmetric import padding
45 import dateutil
46 from dateutil.parser import parserinfo
47 import shortuuid
48 from sqlalchemy.exc import IntegrityError
49 import treq
50 from treq.response import _Response as TReqResponse
51 from twisted.internet import defer, reactor, threads
52 from twisted.web import http
53 from twisted.words.protocols.jabber import error, jid
54 from twisted.words.xish import domish
55 from wokkel import pubsub, rsm
56
57 from libervia.backend.core import exceptions
58 from libervia.backend.core.constants import Const as C
59 from libervia.backend.core.core_types import SatXMPPEntity
60 from libervia.backend.core.i18n import _
61 from libervia.backend.core.log import getLogger
62 from libervia.backend.memory import persistent
63 from libervia.backend.memory.sqla_mapping import History, SubscriptionState
64 from libervia.backend.tools import utils
65 from libervia.backend.tools.common import data_format, date_utils, tls, uri
66 from libervia.backend.tools.common.async_utils import async_lru
67
68 from .ad_hoc import APAdHocService
69 from .events import APEvents
70 from .constants import (
71 ACTIVITY_OBJECT_MANDATORY,
72 ACTIVITY_TARGET_MANDATORY,
73 ACTIVITY_TYPES,
74 ACTIVITY_TYPES_LOWER,
75 COMMENTS_MAX_PARENTS,
76 CONF_SECTION,
77 IMPORT_NAME,
78 LRU_MAX_SIZE,
79 MEDIA_TYPE_AP,
80 NS_AP,
81 NS_AP_PUBLIC,
82 PUBLIC_TUPLE,
83 TYPE_ACTOR,
84 TYPE_EVENT,
85 TYPE_FOLLOWERS,
86 TYPE_ITEM,
87 TYPE_LIKE,
88 TYPE_MENTION,
89 TYPE_REACTION,
90 TYPE_TOMBSTONE,
91 TYPE_JOIN,
92 TYPE_LEAVE
93 )
94 from .http_server import HTTPServer
95 from .pubsub_service import APPubsubService
96 from .regex import RE_MENTION
97
98
99 log = getLogger(__name__)
100
101 IMPORT_NAME = "ap-gateway"
102
103 PLUGIN_INFO = {
104 C.PI_NAME: "ActivityPub Gateway component",
105 C.PI_IMPORT_NAME: IMPORT_NAME,
106 C.PI_MODES: [C.PLUG_MODE_COMPONENT],
107 C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
108 C.PI_PROTOCOLS: [],
109 C.PI_DEPENDENCIES: [
110 "XEP-0050", "XEP-0054", "XEP-0060", "XEP-0084", "XEP-0106", "XEP-0277",
111 "XEP-0292", "XEP-0329", "XEP-0372", "XEP-0424", "XEP-0465", "XEP-0470",
112 "XEP-0447", "XEP-0471", "PUBSUB_CACHE", "TEXT_SYNTAXES", "IDENTITY"
113 ],
114 C.PI_RECOMMENDATIONS: [],
115 C.PI_MAIN: "APGateway",
116 C.PI_HANDLER: C.BOOL_TRUE,
117 C.PI_DESCRIPTION: _(
118 "Gateway for bidirectional communication between XMPP and ActivityPub."
119 ),
120 }
121
122 HEXA_ENC = r"(?P<hex>[0-9a-fA-F]{2})"
123 RE_PERIOD_ENC = re.compile(f"\\.{HEXA_ENC}")
124 RE_PERCENT_ENC = re.compile(f"%{HEXA_ENC}")
125 RE_ALLOWED_UNQUOTED = re.compile(r"^[a-zA-Z0-9_-]+$")
126
127
128 class APGateway:
129 IMPORT_NAME = IMPORT_NAME
130 # show data sent or received through HTTP, used for debugging
131 # 1: log POST objects
132 # 2: log POST and GET objects
133 # 3: log POST and GET objects with HTTP headers for GET requests
134 verbose = 0
135
136 def __init__(self, host):
137 self.host = host
138 self.initialised = False
139 self.client = None
140 self._p = host.plugins["XEP-0060"]
141 self._a = host.plugins["XEP-0084"]
142 self._e = host.plugins["XEP-0106"]
143 self._m = host.plugins["XEP-0277"]
144 self._v = host.plugins["XEP-0292"]
145 self._refs = host.plugins["XEP-0372"]
146 self._r = host.plugins["XEP-0424"]
147 self._sfs = host.plugins["XEP-0447"]
148 self._pps = host.plugins["XEP-0465"]
149 self._pa = host.plugins["XEP-0470"]
150 self._c = host.plugins["PUBSUB_CACHE"]
151 self._t = host.plugins["TEXT_SYNTAXES"]
152 self._i = host.plugins["IDENTITY"]
153 self._events = host.plugins["XEP-0471"]
154 self._p.add_managed_node(
155 "",
156 items_cb=self._items_received,
157 # we want to be sure that the callbacks are launched before pubsub cache's
158 # one, as we need to inspect items before they are actually removed from cache
159 # or updated
160 priority=1000
161 )
162 self.pubsub_service = APPubsubService(self)
163 self.ad_hoc = APAdHocService(self)
164 self.ap_events = APEvents(self)
165 host.trigger.add("message_received", self._message_received_trigger, priority=-1000)
166 host.trigger.add("XEP-0424_retractReceived", self._on_message_retract)
167 host.trigger.add("XEP-0372_ref_received", self._on_reference_received)
168
169 host.bridge.add_method(
170 "ap_send",
171 ".plugin",
172 in_sign="sss",
173 out_sign="",
174 method=self._publish_message,
175 async_=True,
176 )
177
178 def get_handler(self, __):
179 return self.pubsub_service
180
181 async def init(self, client):
182 if self.initialised:
183 return
184
185 self.initialised = True
186 log.info(_("ActivityPub Gateway initialization"))
187
188 # RSA keys
189 stored_data = await self.host.memory.storage.get_privates(
190 IMPORT_NAME, ["rsa_key"], profile=client.profile
191 )
192 private_key_pem = stored_data.get("rsa_key")
193 if private_key_pem is None:
194 self.private_key = await threads.deferToThread(
195 rsa.generate_private_key,
196 public_exponent=65537,
197 key_size=4096,
198 )
199 private_key_pem = self.private_key.private_bytes(
200 encoding=serialization.Encoding.PEM,
201 format=serialization.PrivateFormat.PKCS8,
202 encryption_algorithm=serialization.NoEncryption()
203 ).decode()
204 await self.host.memory.storage.set_private_value(
205 IMPORT_NAME, "rsa_key", private_key_pem, profile=client.profile
206 )
207 else:
208 self.private_key = serialization.load_pem_private_key(
209 private_key_pem.encode(),
210 password=None,
211 )
212 self.public_key = self.private_key.public_key()
213 self.public_key_pem = self.public_key.public_bytes(
214 encoding=serialization.Encoding.PEM,
215 format=serialization.PublicFormat.SubjectPublicKeyInfo
216 ).decode()
217
218 # params
219 # URL and port
220 self.public_url = self.host.memory.config_get(
221 CONF_SECTION, "public_url"
222 ) or self.host.memory.config_get(
223 CONF_SECTION, "xmpp_domain"
224 )
225 if self.public_url is None:
226 log.error(
227 '"public_url" not set in configuration, this is mandatory to have'
228 "ActivityPub Gateway running. Please set this option it to public facing "
229 f"url in {CONF_SECTION!r} configuration section."
230 )
231 return
232 if parse.urlparse(self.public_url).scheme:
233 log.error(
234 "Scheme must not be specified in \"public_url\", please remove it from "
235 "\"public_url\" configuration option. ActivityPub Gateway won't be run."
236 )
237 return
238 self.http_port = int(self.host.memory.config_get(
239 CONF_SECTION, 'http_port', 8123))
240 connection_type = self.host.memory.config_get(
241 CONF_SECTION, 'http_connection_type', 'https')
242 if connection_type not in ('http', 'https'):
243 raise exceptions.ConfigError(
244 'bad ap-gateway http_connection_type, you must use one of "http" or '
245 '"https"'
246 )
247 self.max_items = int(self.host.memory.config_get(
248 CONF_SECTION, 'new_node_max_items', 50
249
250 ))
251 self.comments_max_depth = int(self.host.memory.config_get(
252 CONF_SECTION, 'comments_max_depth', 0
253 ))
254 self.ap_path = self.host.memory.config_get(CONF_SECTION, 'ap_path', '_ap')
255 self.base_ap_url = parse.urljoin(f"https://{self.public_url}", f"{self.ap_path}/")
256 # True (default) if we provide the gateway only to entities/services from our server
257 self.local_only = C.bool(
258 self.host.memory.config_get(CONF_SECTION, 'local_only', C.BOOL_TRUE)
259 )
260 # if True (default), mentions will be parsed in non-private content coming from
261 # XMPP. This is necessary as XEP-0372 references arrive separately from the item
262 # where the mention is made, which is hard or impossible to translate to ActivityPub
263 # (where mentions are specified directly inside the item). See documentation for details.
264 self.auto_mentions = C.bool(
265 self.host.memory.config_get(CONF_SECTION, "auto_mentions", C.BOOL_TRUE)
266 )
267
268 html_redirect: Dict[str, Union[str, dict]] = self.host.memory.config_get(
269 CONF_SECTION, 'html_redirect_dict', {}
270 )
271 self.html_redirect: Dict[str, List[dict]] = {}
272 for url_type, target in html_redirect.items():
273 if isinstance(target, str):
274 target = {"url": target}
275 elif not isinstance(target, dict):
276 raise exceptions.ConfigError(
277 f"html_redirect target must be a URL or a dict, not {target!r}"
278 )
279 filters = target.setdefault("filters", {})
280 if "url" not in target:
281 log.warning(f"invalid HTML redirection, missing target URL: {target}")
282 continue
283 # a slash in the url_type is a syntactic shortcut to have a node filter
284 if "/" in url_type:
285 url_type, node_filter = url_type.split("/", 1)
286 filters["node"] = node_filter
287 self.html_redirect.setdefault(url_type, []).append(target)
288
289 # HTTP server launch
290 self.server = HTTPServer(self)
291 if connection_type == 'http':
292 reactor.listenTCP(self.http_port, self.server)
293 else:
294 options = tls.get_options_from_config(
295 self.host.memory.config, CONF_SECTION)
296 tls.tls_options_check(options)
297 context_factory = tls.get_tls_context_factory(options)
298 reactor.listenSSL(self.http_port, self.server, context_factory)
299
300 async def profile_connecting(self, client):
301 self.client = client
302 client.sendHistory = True
303 client._ap_storage = persistent.LazyPersistentBinaryDict(
304 IMPORT_NAME,
305 client.profile
306 )
307 await self.init(client)
308
309 def profile_connected(self, client):
310 self.ad_hoc.init(client)
311
312 async def _items_received(
313 self,
314 client: SatXMPPEntity,
315 itemsEvent: pubsub.ItemsEvent
316 ) -> None:
317 """Callback called when pubsub items are received
318
319 If the items are addressed to a JID corresponding to an AP actor, they are
320 converted to AP items and sent to the corresponding AP server.
321
322 If comment nodes are linked, they are automatically subscribed, to get new items
323 from there too.
324 """
325 if client != self.client:
326 return
327 # we need recipient as JID and not gateway own JID to be able to use methods such
328 # as "subscribe"
329 client = self.client.get_virtual_client(itemsEvent.sender)
330 recipient = itemsEvent.recipient
331 if not recipient.user:
332 log.debug("ignoring items event without local part specified")
333 return
334
335 ap_account = self._e.unescape(recipient.user)
336
337 if self._pa.is_attachment_node(itemsEvent.nodeIdentifier):
338 await self.convert_and_post_attachments(
339 client, ap_account, itemsEvent.sender, itemsEvent.nodeIdentifier,
340 itemsEvent.items
341 )
342 else:
343 await self.convert_and_post_items(
344 client, ap_account, itemsEvent.sender, itemsEvent.nodeIdentifier,
345 itemsEvent.items
346 )
347
348 async def get_virtual_client(self, actor_id: str) -> SatXMPPEntity:
349 """Get client for this component with a specified jid
350
351 This is needed to perform operations with the virtual JID corresponding to the AP
352 actor instead of the JID of the gateway itself.
353 @param actor_id: ID of the actor
354 @return: virtual client
355 """
356 local_jid = await self.get_jid_from_id(actor_id)
357 return self.client.get_virtual_client(local_jid)
358
359 def is_activity(self, data: dict) -> bool:
360 """Return True if the data has an activity type"""
361 try:
362 return (data.get("type") or "").lower() in ACTIVITY_TYPES_LOWER
363 except (KeyError, TypeError):
364 return False
365
366 async def ap_get(self, url: str) -> dict:
367 """Retrieve AP JSON from given URL
368
369 @raise error.StanzaError: "service-unavailable" is sent when something went wrong
370 with AP server
371 """
372 resp = await treq.get(
373 url,
374 headers = {
375 "Accept": [MEDIA_TYPE_AP],
376 "Content-Type": [MEDIA_TYPE_AP],
377 }
378 )
379 if resp.code >= 300:
380 text = await resp.text()
381 if resp.code == 404:
382 raise exceptions.NotFound(f"Can't find resource at {url}")
383 else:
384 msg = f"HTTP error {resp.code} (url: {url}): {text}"
385 raise exceptions.ExternalRequestError(msg)
386 try:
387 return await treq.json_content(resp)
388 except Exception as e:
389 raise error.StanzaError(
390 "service-unavailable",
391 text=f"Can't get AP data at {url}: {e}"
392 )
393
394 @overload
395 async def ap_get_object(self, data: dict, key: str) -> Optional[dict]:
396 ...
397
398 @overload
399 async def ap_get_object(
400 self, data: Union[str, dict], key: None = None
401 ) -> dict:
402 ...
403
404 async def ap_get_object(self, data, key = None):
405 """Retrieve an AP object, dereferencing when necessary
406
407 This method is to be used with attributes marked as "Functional" in
408 https://www.w3.org/TR/activitystreams-vocabulary
409 @param data: AP object where another object is looked for, or the object itself
410 @param key: name of the object to look for, or None if data is the object directly
411 @return: found object if any
412 """
413 if key is not None:
414 value = data.get(key)
415 else:
416 value = data
417 if value is None:
418 if key is None:
419 raise ValueError("None can't be used with ap_get_object when key is None")
420 return None
421 elif isinstance(value, dict):
422 return value
423 elif isinstance(value, str):
424 if self.is_local_url(value):
425 return await self.ap_get_local_object(value)
426 else:
427 return await self.ap_get(value)
428 else:
429 raise NotImplementedError(
430 "was expecting a string or a dict, got {type(value)}: {value!r}}"
431 )
432
433 async def ap_get_local_object(
434 self,
435 url: str
436 ) -> dict:
437 """Retrieve or generate local object
438
439 for now, only handle XMPP items to convert to AP
440 """
441 url_type, url_args = self.parse_apurl(url)
442 if url_type == TYPE_ITEM:
443 try:
444 account, item_id = url_args
445 except ValueError:
446 raise ValueError(f"invalid URL: {url}")
447 author_jid, node = await self.get_jid_and_node(account)
448 if node is None:
449 node = self._m.namespace
450 cached_node = await self.host.memory.storage.get_pubsub_node(
451 self.client, author_jid, node
452 )
453 if not cached_node:
454 log.debug(f"node {node!r} at {author_jid} is not found in cache")
455 found_item = None
456 else:
457 cached_items, __ = await self.host.memory.storage.get_items(
458 cached_node, item_ids=[item_id]
459 )
460 if not cached_items:
461 log.debug(
462 f"item {item_id!r} of {node!r} at {author_jid} is not found in "
463 "cache"
464 )
465 found_item = None
466 else:
467 found_item = cached_items[0].data
468
469 if found_item is None:
470 # the node is not in cache, we have to make a request to retrieve the item
471 # If the node doesn't exist, get_items will raise a NotFound exception
472 found_items, __ = await self._p.get_items(
473 self.client, author_jid, node, item_ids=[item_id]
474 )
475 try:
476 found_item = found_items[0]
477 except IndexError:
478 raise exceptions.NotFound(f"requested item at {url} can't be found")
479
480 if node.startswith(self._events.namespace):
481 # this is an event
482 event_data = self._events.event_elt_2_event_data(found_item)
483 ap_item = await self.ap_events.event_data_2_ap_item(
484 event_data, author_jid
485 )
486 # the URL must return the object and not the activity
487 ap_item["object"]["@context"] = ap_item["@context"]
488 return ap_item["object"]
489 else:
490 # this is a blog item
491 mb_data = await self._m.item_2_mb_data(
492 self.client, found_item, author_jid, node
493 )
494 ap_item = await self.mb_data_2_ap_item(self.client, mb_data)
495 # the URL must return the object and not the activity
496 return ap_item["object"]
497 else:
498 raise NotImplementedError(
499 'only object from "item" URLs can be retrieved for now'
500 )
501
502 async def ap_get_list(
503 self,
504 data: dict,
505 key: str,
506 only_ids: bool = False
507 ) -> Optional[List[Dict[str, Any]]]:
508 """Retrieve a list of objects from AP data, dereferencing when necessary
509
510 This method is to be used with non-functional vocabularies. Use ``ap_get_object``
511 otherwise.
512 If the value is a dictionary, it will be wrapped in a list.
513 @param data: AP object where a list of objects is looked for
514 @param key: key of the list to look for
515 @param only_ids: if True, only item IDs are retrieved
516 @return: list of objects, or None if the key is not present
517 """
518 value = data.get(key)
519 if value is None:
520 return None
521 elif isinstance(value, str):
522 if self.is_local_url(value):
523 value = await self.ap_get_local_object(value)
524 else:
525 value = await self.ap_get(value)
526 if isinstance(value, dict):
527 return [value]
528 if not isinstance(value, list):
529 raise ValueError(f"A list was expected, got {type(value)}: {value!r}")
530 if only_ids:
531 return [
532 {"id": v["id"]} if isinstance(v, dict) else {"id": v}
533 for v in value
534 ]
535 else:
536 return [await self.ap_get_object(i) for i in value]
537
538 async def ap_get_actors(
539 self,
540 data: dict,
541 key: str,
542 as_account: bool = True
543 ) -> List[str]:
544 """Retrieve AP actors from data
545
546 @param data: AP object containing a field with actors
547 @param key: field to use to retrieve actors
548 @param as_account: if True, return account handles, otherwise return actor
549 IDs
550 @raise exceptions.DataError: there is no actor data or it is invalid
551 """
552 value = data.get(key)
553 if value is None:
554 raise exceptions.DataError(
555 f"no actor associated to object {data.get('id')!r}"
556 )
557 elif isinstance(value, dict):
558 actor_id = value.get("id")
559 if actor_id is None:
560 raise exceptions.DataError(
561 f"invalid actor associated to object {data.get('id')!r}: {value!r}"
562 )
563 value = [actor_id]
564 elif isinstance(value, str):
565 value = [value]
566 elif isinstance(value, list):
567 try:
568 value = [a if isinstance(a, str) else a["id"] for a in value]
569 except (TypeError, KeyError):
570 raise exceptions.DataError(
571 f"invalid actors list to object {data.get('id')!r}: {value!r}"
572 )
573 if not value:
574 raise exceptions.DataError(
575 f"list of actors is empty"
576 )
577 if as_account:
578 return [await self.get_ap_account_from_id(actor_id) for actor_id in value]
579 else:
580 return value
581
582 async def ap_get_sender_actor(
583 self,
584 data: dict,
585 ) -> str:
586 """Retrieve actor who sent data
587
588 This is done by checking "actor" field first, then "attributedTo" field.
589 Only the first found actor is taken into account
590 @param data: AP object
591 @return: actor id of the sender
592 @raise exceptions.NotFound: no actor has been found in data
593 """
594 try:
595 actors = await self.ap_get_actors(data, "actor", as_account=False)
596 except exceptions.DataError:
597 actors = None
598 if not actors:
599 try:
600 actors = await self.ap_get_actors(data, "attributedTo", as_account=False)
601 except exceptions.DataError:
602 raise exceptions.NotFound(
603 'actor not specified in "actor" or "attributedTo"'
604 )
605 try:
606 return actors[0]
607 except IndexError:
608 raise exceptions.NotFound("list of actors is empty")
609
610 def must_encode(self, text: str) -> bool:
611 """Indicate if a text must be period encoded"""
612 return (
613 not RE_ALLOWED_UNQUOTED.match(text)
614 or text.startswith("___")
615 or "---" in text
616 )
617
618 def period_encode(self, text: str) -> str:
619 """Period encode a text
620
621 see [get_jid_and_node] for reasons of period encoding
622 """
623 return (
624 parse.quote(text, safe="")
625 .replace("---", "%2d%2d%2d")
626 .replace("___", "%5f%5f%5f")
627 .replace(".", "%2e")
628 .replace("~", "%7e")
629 .replace("%", ".")
630 )
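# Illustrative sketch (not part of the original source, values are assumptions):
# period-encoding a node name such as "urn:xmpp:microblog:0" first percent-quotes it
# with parse.quote(), then swaps "%" for ".", giving something like
# "urn.3Axmpp.3Amicroblog.3A0"; get_jid_and_node() reverses the operation with
# RE_PERIOD_ENC and parse.unquote().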
631
632 async def get_ap_account_from_jid_and_node(
633 self,
634 jid_: jid.JID,
635 node: Optional[str]
636 ) -> str:
637 """Construct AP account from JID and node
638
639 The account construction will use escaping when necessary
640 """
641 if not node or node == self._m.namespace:
642 node = None
643
644 if self.client is None:
645 raise exceptions.InternalError("Client is not set yet")
646
647 if self.is_virtual_jid(jid_):
648 # this is a proxy JID to an AP Actor
649 return self._e.unescape(jid_.user)
650
651 if node and not jid_.user and not self.must_encode(node):
652 is_pubsub = await self.is_pubsub(jid_)
653 # when we have a pubsub service, the user part can be used to set the node
654 # this produces more user-friendly AP accounts
655 if is_pubsub:
656 jid_.user = node
657 node = None
658
659 is_local = self.is_local(jid_)
660 user = jid_.user if is_local else jid_.userhost()
661 if user is None:
662 user = ""
663 account_elts = []
664 if node and self.must_encode(node) or self.must_encode(user):
665 account_elts = ["___"]
666 if node:
667 node = self.period_encode(node)
668 user = self.period_encode(user)
669
670 if not user:
671 raise exceptions.InternalError("there should be a user part")
672
673 if node:
674 account_elts.extend((node, "---"))
675
676 account_elts.extend((
677 user, "@", jid_.host if is_local else self.client.jid.userhost()
678 ))
679 return "".join(account_elts)
680
681 def is_local(self, jid_: jid.JID) -> bool:
682 """Returns True if jid_ use a domain or subdomain of gateway's host"""
683 local_host = self.client.host.split(".")
684 assert local_host
685 return jid_.host.split(".")[-len(local_host):] == local_host
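# Example (illustrative, hostnames are hypothetical): if the gateway's client host is
# "example.org", is_local() returns True for "example.org" or "pubsub.example.org",
# and False for "example.net".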
686
687 async def is_pubsub(self, jid_: jid.JID) -> bool:
688 """Indicate if a JID is a Pubsub service"""
689 host_disco = await self.host.get_disco_infos(self.client, jid_)
690 return (
691 ("pubsub", "service") in host_disco.identities
692 and not ("pubsub", "pep") in host_disco.identities
693 )
694
695 async def get_jid_and_node(self, ap_account: str) -> Tuple[jid.JID, Optional[str]]:
696 """Decode raw AP account handle to get XMPP JID and Pubsub Node
697
698 Usernames are case insensitive.
699
700 By default, the username corresponds to a local username (i.e. a username from
701 the component's server).
702
703 If the local name's domain is a pubsub service (and not PEP), the username is taken
704 as a pubsub node.
705
706 If ``---`` is present in username, the part before is used as pubsub node, and the
707 rest as a JID user part.
708
709 If username starts with ``___``, characters are encoded using period encoding
710 (i.e. percent encoding where a ``.`` is used instead of ``%``).
711
712 This horror is necessary due to limitations in some AP implementations (notably
713 Mastodon), cf. https://github.com/mastodon/mastodon/issues/17222
714
715 examples:
716
717 ``toto@example.org`` => JID = toto@example.org, node = None
718
719 ``___toto.40example.net@example.org`` => JID = toto@example.net (this one is a
720 non-local JID, and will work only if the setting ``local_only`` is False), node = None
721
722 ``toto@pubsub.example.org`` (with pubsub.example.org being a pubsub service) =>
723 JID = pubsub.example.org, node = toto
724
725 ``tata---toto@example.org`` => JID = toto@example.org, node = tata
726
727 ``___urn.3axmpp.3amicroblog.3a0@pubsub.example.org`` (with pubsub.example.org
728 being a pubsub service) ==> JID = pubsub.example.org, node = urn:xmpp:microblog:0
729
730 @param ap_account: ActivityPub account handle (``username@domain.tld``)
731 @return: service JID and pubsub node
732 if the pubsub node is None, the default microblog pubsub node (and possibly other
733 nodes that plugins may handle) will be used
734 @raise ValueError: invalid account
735 @raise PermissionError: a non-local JID is used while the gateway doesn't allow it
736 """
737 if ap_account.count("@") != 1:
738 raise ValueError("Invalid AP account")
739 if ap_account.startswith("___"):
740 encoded = True
741 ap_account = ap_account[3:]
742 else:
743 encoded = False
744
745 username, domain = ap_account.split("@")
746
747 if "---" in username:
748 node, username = username.rsplit("---", 1)
749 else:
750 node = None
751
752 if encoded:
753 username = parse.unquote(
754 RE_PERIOD_ENC.sub(r"%\g<hex>", username),
755 errors="strict"
756 )
757 if node:
758 node = parse.unquote(
759 RE_PERIOD_ENC.sub(r"%\g<hex>", node),
760 errors="strict"
761 )
762
763 if "@" in username:
764 username, domain = username.rsplit("@", 1)
765
766 if not node:
767 # we need to check host disco, because disco request to user may be
768 # blocked for privacy reason (see
769 # https://xmpp.org/extensions/xep-0030.html#security)
770 is_pubsub = await self.is_pubsub(jid.JID(domain))
771
772 if is_pubsub:
773 # if the host is a pubsub service and not a PEP, we consider that username
774 # is in fact the node name
775 node = username
776 username = None
777
778 jid_s = f"{username}@{domain}" if username else domain
779 try:
780 jid_ = jid.JID(jid_s)
781 except RuntimeError:
782 raise ValueError(f"Invalid jid: {jid_s!r}")
783
784 if self.local_only and not self.is_local(jid_):
785 raise exceptions.PermissionError(
786 "This gateway is configured to map only local entities and services"
787 )
788
789 return jid_, node
790
791 def get_local_jid_from_account(self, account: str) -> jid.JID:
792 """Compute JID linking to an AP account
793
794 The local JID is computed by escaping the AP actor handle and using it as the local
795 part of the JID, whose domain part is this gateway's own JID
796 """
797 return jid.JID(
798 None,
799 (
800 self._e.escape(account),
801 self.client.jid.host,
802 None
803 )
804 )
805
806 async def get_jid_from_id(self, actor_id: str) -> jid.JID:
807 """Compute JID linking to an AP Actor ID
808
809 The local JID is computed by escaping the AP actor handle and using it as the local
810 part of the JID, whose domain part is this gateway's own JID.
811 If the actor_id comes from the local server (checked with self.public_url), it means
812 that we have an XMPP entity, and the original JID is returned
813 """
814 if self.is_local_url(actor_id):
815 request_type, extra_args = self.parse_apurl(actor_id)
816 if request_type != TYPE_ACTOR or len(extra_args) != 1:
817 raise ValueError(f"invalid actor id: {actor_id!r}")
818 actor_jid, __ = await self.get_jid_and_node(extra_args[0])
819 return actor_jid
820
821 account = await self.get_ap_account_from_id(actor_id)
822 return self.get_local_jid_from_account(account)
823
824 def parse_apurl(self, url: str) -> Tuple[str, List[str]]:
825 """Parse an URL leading to an AP endpoint
826
827 @param url: URL to parse (schema is not mandatory)
828 @return: endpoint type and extra arguments
829 """
830 path = parse.urlparse(url).path.lstrip("/")
831 type_, *extra_args = path[len(self.ap_path):].lstrip("/").split("/")
832 return type_, [parse.unquote(a) for a in extra_args]
833
834 def build_apurl(self, type_: str, *args: str) -> str:
835 """Build an AP endpoint URL
836
837 @param type_: type of AP endpoint
838 @param args: endpoint-dependent arguments
839 """
840 return parse.urljoin(
841 self.base_ap_url,
842 str(Path(type_).joinpath(*(parse.quote_plus(a, safe="@") for a in args)))
843 )
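# Usage sketch (illustrative, not from the original code; the URL and account are
# hypothetical and assume public_url = "example.org", the default "_ap" path and
# TYPE_ITEM being the "item" path segment):
#
#     url = gateway.build_apurl(TYPE_ITEM, "toto@example.org", "some-item-id")
#     # -> roughly "https://example.org/_ap/item/toto@example.org/some-item-id"
#     gateway.parse_apurl(url)
#     # -> ("item", ["toto@example.org", "some-item-id"])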
844
845 def is_local_url(self, url: str) -> bool:
846 """Tells if an URL link to this component
847
848 ``public_url`` and ``ap_path`` are used to check the URL
849 """
850 return url.startswith(self.base_ap_url)
851
852 def is_virtual_jid(self, jid_: jid.JID) -> bool:
853 """Tell if a JID is an AP actor mapped through this gateway"""
854 return jid_.host == self.client.jid.userhost()
855
856 def build_signature_header(self, values: Dict[str, str]) -> str:
857 """Build key="<value>" signature header from signature data"""
858 fields = []
859 for key, value in values.items():
860 if key not in ("(created)", "(expired)"):
861 if '"' in value:
862 raise NotImplementedError(
863 "string escaping is not implemented, double-quote can't be used "
864 f"in {value!r}"
865 )
866 value = f'"{value}"'
867 fields.append(f"{key}={value}")
868
869 return ",".join(fields)
870
871 def get_digest(self, body: bytes, algo="SHA-256") -> Tuple[str, str]:
872 """Get digest data to use in header and signature
873
874 @param body: body of the request
875 @return: hash name and digest
876 """
877 if algo != "SHA-256":
878 raise NotImplementedError("only SHA-256 is implemented for now")
879 return algo, base64.b64encode(hashlib.sha256(body).digest()).decode()
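# Usage sketch (illustrative, the body value is hypothetical): the returned pair is
# meant to be joined into a "Digest" header as done in sign_and_post():
#
#     algo, digest_hash = gateway.get_digest(b'{"type": "Like"}')
#     digest_header = f"{algo}={digest_hash}"  # e.g. "SHA-256=<base64 of SHA-256(body)>"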
880
881 @async_lru(maxsize=LRU_MAX_SIZE)
882 async def get_actor_data(self, actor_id: str) -> dict:
883 """Retrieve actor data with LRU cache"""
884 return await self.ap_get(actor_id)
885
886 @async_lru(maxsize=LRU_MAX_SIZE)
887 async def get_actor_pub_key_data(
888 self,
889 actor_id: str
890 ) -> Tuple[str, str, rsa.RSAPublicKey]:
891 """Retrieve Public Key data from actor ID
892
893 @param actor_id: actor ID (url)
894 @return: key_id, owner and public_key
895 @raise KeyError: publicKey is missing from actor data
896 """
897 actor_data = await self.get_actor_data(actor_id)
898 pub_key_data = actor_data["publicKey"]
899 key_id = pub_key_data["id"]
900 owner = pub_key_data["owner"]
901 pub_key_pem = pub_key_data["publicKeyPem"]
902 pub_key = serialization.load_pem_public_key(pub_key_pem.encode())
903 return key_id, owner, pub_key
904
905 def create_activity(
906 self,
907 activity: str,
908 actor_id: str,
909 object_: Optional[Union[str, dict]] = None,
910 target: Optional[Union[str, dict]] = None,
911 activity_id: Optional[str] = None,
912 **kwargs,
913 ) -> Dict[str, Any]:
914 """Generate base data for an activity
915
916 @param activity: one of ACTIVITY_TYPES
917 @param actor_id: AP actor ID of the sender
918 @param object_: content of "object" field
919 @param target: content of "target" field
920 @param activity_id: ID to use for the activity
921 if not set it will be automatically generated, but it is usually desirable to
922 set the ID manually so it can be retrieved (e.g. for Undo)
923 """
924 if activity not in ACTIVITY_TYPES:
925 raise exceptions.InternalError(f"invalid activity: {activity!r}")
926 if object_ is None and activity in ACTIVITY_OBJECT_MANDATORY:
927 raise exceptions.InternalError(
928 f'"object_" is mandatory for activity {activity!r}'
929 )
930 if target is None and activity in ACTIVITY_TARGET_MANDATORY:
931 raise exceptions.InternalError(
932 f'"target" is mandatory for activity {activity!r}'
933 )
934 if activity_id is None:
935 activity_id = f"{actor_id}#{activity.lower()}_{shortuuid.uuid()}"
936 data: Dict[str, Any] = {
937 "@context": [NS_AP],
938 "actor": actor_id,
939 "id": activity_id,
940 "type": activity,
941 }
942 data.update(kwargs)
943 if object_ is not None:
944 data["object"] = object_
945 if target is not None:
946 data["target"] = target
947
948 return data
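# Illustrative sketch (not from the original source, IDs are hypothetical, and it is
# assumed that TYPE_LIKE is the "Like" activity type): a call such as
# create_activity(TYPE_LIKE, actor_id, object_=item_url) returns roughly:
#
#     {
#         "@context": [NS_AP],
#         "actor": actor_id,
#         "id": f"{actor_id}#like_<shortuuid>",
#         "type": "Like",
#         "object": item_url,
#     }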
949
950 def get_key_id(self, actor_id: str) -> str:
951 """Get local key ID from actor ID"""
952 return f"{actor_id}#main-key"
953
954 async def check_signature(
955 self,
956 signature: str,
957 key_id: str,
958 headers: Dict[str, str]
959 ) -> str:
960 """Verify that signature matches given headers
961
962 see https://datatracker.ietf.org/doc/html/draft-cavage-http-signatures-06#section-3.1.2
963
964 @param signature: Base64 encoded signature
965 @param key_id: ID of the key used to sign the data
966 @param headers: headers and their values, including pseudo-headers
967 @return: id of the signing actor
968
969 @raise InvalidSignature: signature doesn't match headers
970 """
971 to_sign = "\n".join(f"{k.lower()}: {v}" for k,v in headers.items())
972 if key_id.startswith("acct:"):
973 actor = key_id[5:]
974 actor_id = await self.get_ap_actor_id_from_account(actor)
975 else:
976 actor_id = key_id.split("#", 1)[0]
977
978 pub_key_id, pub_key_owner, pub_key = await self.get_actor_pub_key_data(actor_id)
979 if pub_key_id != key_id or pub_key_owner != actor_id:
980 raise exceptions.EncryptionError("Public Key mismatch")
981
982 try:
983 pub_key.verify(
984 base64.b64decode(signature),
985 to_sign.encode(),
986 # we have to use PKCS1v15 padding to be compatible with Mastodon
987 padding.PKCS1v15(), # type: ignore
988 hashes.SHA256() # type: ignore
989 )
990 except InvalidSignature:
991 raise exceptions.EncryptionError(
992 "Invalid signature (using PKC0S1 v1.5 and SHA-256)"
993 )
994
995 return actor_id
996
997 def get_signature_data(
998 self,
999 key_id: str,
1000 headers: Dict[str, str]
1001 ) -> Tuple[Dict[str, str], Dict[str, str]]:
1002 """Generate and return signature and corresponding headers
1003
1004 @param key_id: ID of the key (URL linking to the data with public key)
1005 @param headers: headers to sign and their values, including pseudo-headers
1006 such as "(request-target)"; pseudo-headers are used for the signature
1007 computation but removed from the returned headers
1010
1011 @return: headers and signature data
1012 ``headers`` is an updated copy of ``headers`` arguments, with pseudo-headers
1013 removed, and ``Signature`` added.
1014 """
1015 # headers must be lower case
1016 l_headers: Dict[str, str] = {k.lower(): v for k, v in headers.items()}
1017 to_sign = "\n".join(f"{k}: {v}" for k,v in l_headers.items())
1018 signature = base64.b64encode(self.private_key.sign(
1019 to_sign.encode(),
1020 # we have to use PKCS1v15 padding to be compatible with Mastodon
1021 padding.PKCS1v15(), # type: ignore
1022 hashes.SHA256() # type: ignore
1023 )).decode()
1024 sign_data = {
1025 "keyId": key_id,
1026 "Algorithm": "rsa-sha256",
1027 "headers": " ".join(l_headers.keys()),
1028 "signature": signature
1029 }
1030 new_headers = {k: v for k,v in headers.items() if not k.startswith("(")}
1031 new_headers["Signature"] = self.build_signature_header(sign_data)
1032 return new_headers, sign_data
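# Illustrative sketch (hypothetical values, not from the original source): for a POST
# signed with headers {"(request-target)": "post /inbox", "host": ..., "date": ...,
# "digest": ...}, the returned headers gain a "Signature" entry looking roughly like:
#
#     keyId="https://example.org/_ap/actor/toto@example.org#main-key",
#     Algorithm="rsa-sha256",headers="(request-target) host date digest",
#     signature="<base64 RSA/SHA-256 signature>"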
1033
1034 async def convert_and_post_items(
1035 self,
1036 client: SatXMPPEntity,
1037 ap_account: str,
1038 service: jid.JID,
1039 node: str,
1040 items: List[domish.Element],
1041 subscribe_extra_nodes: bool = True,
1042 ) -> None:
1043 """Convert XMPP items to AP items and post them to actor inbox
1044
1045 @param ap_account: account of ActivityPub actor receiving the item
1046 @param service: JID of the (virtual) pubsub service where the item has been
1047 published
1048 @param node: (virtual) node corresponding to where the item has been published
1049 @param subscribe_extra_nodes: if True, extra data nodes will be automatically
1050 subscribed, that is comment nodes if present, and attachment nodes.
1051 """
1052 actor_id = await self.get_ap_actor_id_from_account(ap_account)
1053 inbox = await self.get_ap_inbox_from_id(actor_id)
1054 for item in items:
1055 if item.name == "item":
1056 cached_item = await self.host.memory.storage.search_pubsub_items({
1057 "profiles": [self.client.profile],
1058 "services": [service],
1059 "nodes": [node],
1060 "names": [item["id"]]
1061 })
1062 is_new = not bool(cached_item)
1063 if node.startswith(self._events.namespace):
1064 # event item
1065 event_data = self._events.event_elt_2_event_data(item)
1066 try:
1067 author_jid = jid.JID(item["publisher"]).userhostJID()
1068 except (KeyError, RuntimeWarning):
1069 root_elt = item
1070 while root_elt.parent is not None:
1071 root_elt = root_elt.parent
1072 author_jid = jid.JID(root_elt["from"]).userhostJID()
1073 if subscribe_extra_nodes and not self.is_virtual_jid(author_jid):
1074 # we subscribe automatically to comment nodes if any
1075 recipient_jid = self.get_local_jid_from_account(ap_account)
1076 recipient_client = self.client.get_virtual_client(recipient_jid)
1077 comments_data = event_data.get("comments")
1078 if comments_data:
1079 comment_service = jid.JID(comments_data["jid"])
1080 comment_node = comments_data["node"]
1081 await self._p.subscribe(
1082 recipient_client, comment_service, comment_node
1083 )
1084 try:
1085 await self._pa.subscribe(
1086 recipient_client, service, node, event_data["id"]
1087 )
1088 except exceptions.NotFound:
1089 log.debug(
1090 f"no attachment node found for item {event_data['id']!r} "
1091 f"on {node!r} at {service}"
1092 )
1093 ap_item = await self.ap_events.event_data_2_ap_item(
1094 event_data, author_jid, is_new=is_new
1095 )
1096 else:
1097 # blog item
1098 mb_data = await self._m.item_2_mb_data(client, item, service, node)
1099 author_jid = jid.JID(mb_data["author_jid"])
1100 if subscribe_extra_nodes and not self.is_virtual_jid(author_jid):
1101 # we subscribe automatically to comment nodes if any
1102 recipient_jid = self.get_local_jid_from_account(ap_account)
1103 recipient_client = self.client.get_virtual_client(recipient_jid)
1104 for comment_data in mb_data.get("comments", []):
1105 comment_service = jid.JID(comment_data["service"])
1106 if self.is_virtual_jid(comment_service):
1107 log.debug(
1108 f"ignoring virtual comment service: {comment_data}"
1109 )
1110 continue
1111 comment_node = comment_data["node"]
1112 await self._p.subscribe(
1113 recipient_client, comment_service, comment_node
1114 )
1115 try:
1116 await self._pa.subscribe(
1117 recipient_client, service, node, mb_data["id"]
1118 )
1119 except exceptions.NotFound:
1120 log.debug(
1121 f"no attachment node found for item {mb_data['id']!r} on "
1122 f"{node!r} at {service}"
1123 )
1124 ap_item = await self.mb_data_2_ap_item(client, mb_data, is_new=is_new)
1125
1126 url_actor = ap_item["actor"]
1127 elif item.name == "retract":
1128 url_actor, ap_item = await self.ap_delete_item(
1129 client.jid, node, item["id"]
1130 )
1131 else:
1132 raise exceptions.InternalError(f"unexpected element: {item.toXml()}")
1133 await self.sign_and_post(inbox, url_actor, ap_item)
1134
1135 async def convert_and_post_attachments(
1136 self,
1137 client: SatXMPPEntity,
1138 ap_account: str,
1139 service: jid.JID,
1140 node: str,
1141 items: List[domish.Element],
1142 publisher: Optional[jid.JID] = None
1143 ) -> None:
1144 """Convert XMPP item attachments to AP activities and post them to actor inbox
1145
1146 @param ap_account: account of ActivityPub actor receiving the item
1147 @param service: JID of the (virtual) pubsub service where the item has been
1148 published
1149 @param node: (virtual) node corresponding to where the item has been
1150 published
1151 @param items: attachments items
1152 @param publisher: publisher of the attachments item (it's NOT the PEP/Pubsub
1153 service, it's the publisher of the item). To be filled only when the publisher
1154 is known for sure, otherwise publisher will be determined either if
1155 "publisher" attribute is set by pubsub service, or as a last resort, using
1156 item's ID (which MUST be publisher bare JID according to pubsub-attachments
1157 specification).
1158 """
1159 if len(items) != 1:
1160 log.warning(
1161 "we should get exactly one attachment item for an entity, got "
1162 f"{len(items)})"
1163 )
1164
1165 actor_id = await self.get_ap_actor_id_from_account(ap_account)
1166 inbox = await self.get_ap_inbox_from_id(actor_id)
1167
1168 item_elt = items[0]
1169 item_id = item_elt["id"]
1170
1171 if publisher is None:
1172 item_pub_s = item_elt.getAttribute("publisher")
1173 publisher = jid.JID(item_pub_s) if item_pub_s else jid.JID(item_id)
1174
1175 if publisher.userhost() != item_id:
1176 log.warning(
1177 "attachments item ID must be publisher's bare JID, ignoring: "
1178 f"{item_elt.toXml()}"
1179 )
1180 return
1181
1182 if self.is_virtual_jid(publisher):
1183 log.debug(f"ignoring item coming from local virtual JID {publisher}")
1184 return
1185
1186 if publisher is not None:
1187 item_elt["publisher"] = publisher.userhost()
1188
1189 item_service, item_node, item_id = self._pa.attachment_node_2_item(node)
1190 item_account = await self.get_ap_account_from_jid_and_node(item_service, item_node)
1191 if self.is_virtual_jid(item_service):
1192 # it's a virtual JID mapping to an external AP actor, we can use the
1193 # item_id directly
1194 item_url = item_id
1195 if not item_url.startswith("https:"):
1196 log.warning(
1197 "item ID of external AP actor is not an https link, ignoring: "
1198 f"{item_id!r}"
1199 )
1200 return
1201 else:
1202 item_url = self.build_apurl(TYPE_ITEM, item_account, item_id)
1203
1204 old_attachment_pubsub_items = await self.host.memory.storage.search_pubsub_items({
1205 "profiles": [self.client.profile],
1206 "services": [service],
1207 "nodes": [node],
1208 "names": [item_elt["id"]]
1209 })
1210 if not old_attachment_pubsub_items:
1211 old_attachment = {}
1212 else:
1213 old_attachment_items = [i.data for i in old_attachment_pubsub_items]
1214 old_attachments = self._pa.items_2_attachment_data(client, old_attachment_items)
1215 try:
1216 old_attachment = old_attachments[0]
1217 except IndexError:
1218 # no known element was present in attachments
1219 old_attachment = {}
1220 publisher_account = await self.get_ap_account_from_jid_and_node(
1221 publisher,
1222 None
1223 )
1224 publisher_actor_id = self.build_apurl(TYPE_ACTOR, publisher_account)
1225 try:
1226 attachments = self._pa.items_2_attachment_data(client, [item_elt])[0]
1227 except IndexError:
1228 # no known element was present in attachments
1229 attachments = {}
1230
1231 # noticed
1232 if "noticed" in attachments:
1233 if not "noticed" in old_attachment:
1234 # new "noticed" attachment, we translate to "Like" activity
1235 activity_id = self.build_apurl("like", item_account, item_id)
1236 activity = self.create_activity(
1237 TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id
1238 )
1239 activity["to"] = [ap_account]
1240 activity["cc"] = [NS_AP_PUBLIC]
1241 await self.sign_and_post(inbox, publisher_actor_id, activity)
1242 else:
1243 if "noticed" in old_attachment:
1244 # "noticed" attachment has been removed, we undo the "Like" activity
1245 activity_id = self.build_apurl("like", item_account, item_id)
1246 activity = self.create_activity(
1247 TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id
1248 )
1249 activity["to"] = [ap_account]
1250 activity["cc"] = [NS_AP_PUBLIC]
1251 undo = self.create_activity("Undo", publisher_actor_id, activity)
1252 await self.sign_and_post(inbox, publisher_actor_id, undo)
1253
1254 # reactions
1255 new_reactions = set(attachments.get("reactions", {}).get("reactions", []))
1256 old_reactions = set(old_attachment.get("reactions", {}).get("reactions", []))
1257 reactions_remove = old_reactions - new_reactions
1258 reactions_add = new_reactions - old_reactions
1259 for reactions, undo in ((reactions_remove, True), (reactions_add, False)):
1260 for reaction in reactions:
1261 activity_id = self.build_apurl(
1262 "reaction", item_account, item_id, reaction.encode().hex()
1263 )
1264 reaction_activity = self.create_activity(
1265 TYPE_REACTION, publisher_actor_id, item_url,
1266 activity_id=activity_id
1267 )
1268 reaction_activity["content"] = reaction
1269 reaction_activity["to"] = [ap_account]
1270 reaction_activity["cc"] = [NS_AP_PUBLIC]
1271 if undo:
1272 activity = self.create_activity(
1273 "Undo", publisher_actor_id, reaction_activity
1274 )
1275 else:
1276 activity = reaction_activity
1277 await self.sign_and_post(inbox, publisher_actor_id, activity)
1278
1279 # RSVP
1280 if "rsvp" in attachments:
1281 attending = attachments["rsvp"].get("attending", "no")
1282 old_attending = old_attachment.get("rsvp", {}).get("attending", "no")
1283 if attending != old_attending:
1284 activity_type = TYPE_JOIN if attending == "yes" else TYPE_LEAVE
1285 activity_id = self.build_apurl(activity_type.lower(), item_account, item_id)
1286 activity = self.create_activity(
1287 activity_type, publisher_actor_id, item_url, activity_id=activity_id
1288 )
1289 activity["to"] = [ap_account]
1290 activity["cc"] = [NS_AP_PUBLIC]
1291 await self.sign_and_post(inbox, publisher_actor_id, activity)
1292 else:
1293 if "rsvp" in old_attachment:
1294 old_attending = old_attachment.get("rsvp", {}).get("attending", "no")
1295 if old_attending == "yes":
1296 activity_id = self.build_apurl(TYPE_LEAVE.lower(), item_account, item_id)
1297 activity = self.create_activity(
1298 TYPE_LEAVE, publisher_actor_id, item_url, activity_id=activity_id
1299 )
1300 activity["to"] = [ap_account]
1301 activity["cc"] = [NS_AP_PUBLIC]
1302 await self.sign_and_post(inbox, publisher_actor_id, activity)
1303
1304 if service.user and self.is_virtual_jid(service):
1305 # the item is on a virtual service, we need to store it in cache
1306 log.debug("storing attachments item in cache")
1307 cached_node = await self.host.memory.storage.get_pubsub_node(
1308 client, service, node, with_subscriptions=True, create=True
1309 )
1310 await self.host.memory.storage.cache_pubsub_items(
1311 self.client,
1312 cached_node,
1313 [item_elt],
1314 [attachments]
1315 )
1316
1317 async def sign_and_post(self, url: str, actor_id: str, doc: dict) -> TReqResponse:
1318 """Sign a documentent and post it to AP server
1319
1320 @param url: AP server endpoint
1321 @param actor_id: originating actor ID (URL)
1322 @param doc: document to send
1323 """
1324 if self.verbose:
1325 __, actor_args = self.parse_apurl(actor_id)
1326 actor_account = actor_args[0]
1327 to_log = [
1328 "",
1329 f">>> {actor_account} is signing and posting to {url}:\n{pformat(doc)}"
1330 ]
1331
1332 p_url = parse.urlparse(url)
1333 body = json.dumps(doc).encode()
1334 digest_algo, digest_hash = self.get_digest(body)
1335 digest = f"{digest_algo}={digest_hash}"
1336
1337 headers = {
1338 "(request-target)": f"post {p_url.path}",
1339 "Host": p_url.hostname,
1340 "Date": http.datetimeToString().decode(),
1341 "Digest": digest
1342 }
1343 headers["Content-Type"] = (
1344 'application/activity+json'
1345 )
1346 headers, __ = self.get_signature_data(self.get_key_id(actor_id), headers)
1347
1348 if self.verbose:
1349 if self.verbose>=3:
1350 h_to_log = "\n".join(f" {k}: {v}" for k,v in headers.items())
1351 to_log.append(f" headers:\n{h_to_log}")
1352 to_log.append("---")
1353 log.info("\n".join(to_log))
1354
1355 resp = await treq.post(
1356 url,
1357 body,
1358 headers=headers,
1359 )
1360 if resp.code >= 300:
1361 text = await resp.text()
1362 log.warning(f"POST request to {url} failed [{resp.code}]: {text}")
1363 elif self.verbose:
1364 log.info(f"==> response code: {resp.code}")
1365 return resp
1366
1367 def _publish_message(self, mess_data_s: str, service_s: str, profile: str):
1368 mess_data: dict = data_format.deserialise(mess_data_s) # type: ignore
1369 service = jid.JID(service_s)
1370 client = self.host.get_client(profile)
1371 return defer.ensureDeferred(self.publish_message(client, mess_data, service))
1372
1373 @async_lru(maxsize=LRU_MAX_SIZE)
1374 async def get_ap_actor_id_from_account(self, account: str) -> str:
1375 """Retrieve account ID from it's handle using WebFinger
1376
1377 Don't use this method to get local actor id from a local account derivated for
1378 JID: in this case, the actor ID is retrieve with
1379 ``self.build_apurl(TYPE_ACTOR, ap_account)``
1380
1381 @param account: AP handle (user@domain.tld)
1382 @return: Actor ID (which is an URL)
1383 """
1384 if account.count("@") != 1 or "/" in account:
1385 raise ValueError(f"Invalid account: {account!r}")
1386 host = account.split("@")[1]
1387 try:
1388 finger_data = await treq.json_content(await treq.get(
1389 f"https://{host}/.well-known/webfinger?"
1390 f"resource=acct:{parse.quote_plus(account)}",
1391 ))
1392 except Exception as e:
1393 raise exceptions.DataError(f"Can't get webfinger data for {account!r}: {e}")
1394 for link in finger_data.get("links", []):
1395 if (
1396 link.get("type") == "application/activity+json"
1397 and link.get("rel") == "self"
1398 ):
1399 href = link.get("href", "").strip()
1400 if not href:
1401 raise ValueError(
1402 f"Invalid webfinger data for {account:r}: missing href"
1403 )
1404 break
1405 else:
1406 raise ValueError(
1407 f"No ActivityPub link found for {account!r}"
1408 )
1409 return href
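# Illustrative example (hypothetical account and URLs): for "alice@social.example",
# this queries
# https://social.example/.well-known/webfinger?resource=acct:alice@social.example
# and returns the "href" of the link with rel="self" and
# type="application/activity+json", e.g. "https://social.example/users/alice".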
1410
1411 async def get_ap_actor_data_from_account(self, account: str) -> dict:
1412 """Retrieve ActivityPub Actor data
1413
1414 @param account: ActivityPub Actor identifier
1415 """
1416 href = await self.get_ap_actor_id_from_account(account)
1417 return await self.ap_get(href)
1418
1419 async def get_ap_inbox_from_id(self, actor_id: str, use_shared: bool = True) -> str:
1420 """Retrieve inbox of an actor_id
1421
1422 @param use_shared: if True, and a shared inbox exists, it will be used instead of
1423 the user inbox
1424 """
1425 data = await self.get_actor_data(actor_id)
1426 if use_shared:
1427 try:
1428 return data["endpoints"]["sharedInbox"]
1429 except KeyError:
1430 pass
1431 return data["inbox"]
1432
1433 @async_lru(maxsize=LRU_MAX_SIZE)
1434 async def get_ap_account_from_id(self, actor_id: str) -> str:
1435 """Retrieve AP account from the ID URL
1436
1437 Works with external or local actor IDs.
1438 @param actor_id: AP ID of the actor (URL to the actor data)
1439 @return: AP handle
1440 """
1441 if self.is_local_url(actor_id):
1442 url_type, url_args = self.parse_apurl(actor_id)
1443 if url_type != "actor" or not url_args:
1444 raise exceptions.DataError(
1445 f"invalid local actor ID: {actor_id}"
1446 )
1447 account = url_args[0]
1448 try:
1449 account_user, account_host = account.split('@')
1450 except ValueError:
1451 raise exceptions.DataError(
1452 f"invalid account from url: {actor_id}"
1453 )
1454 if not account_user or account_host != self.public_url:
1455 raise exceptions.DataError(
1456 f"{account!r} is not a valid local account (from {actor_id})"
1457 )
1458 return account
1459
1460 url_parsed = parse.urlparse(actor_id)
1461 actor_data = await self.get_actor_data(actor_id)
1462 username = actor_data.get("preferredUsername")
1463 if not username:
1464 raise exceptions.DataError(
1465 'No "preferredUsername" field found, can\'t retrieve actor account'
1466 )
1467 account = f"{username}@{url_parsed.hostname}"
1468 # we try to retrieve the actor ID from the account to check it
1469 found_id = await self.get_ap_actor_id_from_account(account)
1470 if found_id != actor_id:
1471 # cf. https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196
1472 msg = (
1473 f"Account ID found on WebFinger {found_id!r} doesn't match our actor ID "
1474 f"({actor_id!r}). This AP instance doesn't seems to use "
1475 '"preferredUsername" as we expect.'
1476 )
1477 log.warning(msg)
1478 raise exceptions.DataError(msg)
1479 return account
1480
1481 async def get_ap_items(
1482 self,
1483 collection: dict,
1484 max_items: Optional[int] = None,
1485 chronological_pagination: bool = True,
1486 after_id: Optional[str] = None,
1487 start_index: Optional[int] = None,
1488 parser: Optional[Callable[[dict], Awaitable[domish.Element]]] = None,
1489 only_ids: bool = False,
1490 ) -> Tuple[List[domish.Element], rsm.RSMResponse]:
1491 """Retrieve AP items and convert them to XMPP items
1492
1493 @param collection: AP collection from which to retrieve items
1494 @param max_items: maximum number of items to retrieve
1495 retrieve all items by default
1496 @param chronological_pagination: get pages in chronological order
1497 AP use reversed chronological order for pagination, "first" page returns more
1498 recent items. If "chronological_pagination" is True, "last" AP page will be
1499 retrieved first.
1500 @param after_id: if set, retrieve items starting from given ID
1501 Due to ActivityStream Collection Paging limitations, this is inefficient and
1502 if ``after_id`` is not already in cache, we have to retrieve every page until
1503 we find it.
1504 In most common cases, ``after_id`` should be in cache though (client usually
1505 use known ID when in-order pagination is used).
1506 @param start_index: start retrieving items from the one with given index
1507 Due to ActivityStream Collection Paging limitations, this is inefficient and
1508 all pages before the requested index will be retrieved to count items.
1509 @param parser: method to use to parse AP items and get XMPP item elements
1510 if None, use default generic parser
1511 @param only_ids: if True, only retrieve items IDs
1512 Retrieving only item IDs avoid HTTP requests to retrieve items, it may be
1513 sufficient in some use cases (e.g. when retrieving following/followers
1514 collections)
1515 @return: XMPP Pubsub items and corresponding RSM Response
1516 Items are always returned in chronological order in the result
1517 """
1518 if parser is None:
1519 parser = self.ap_item_2_mb_elt
1520
1521 rsm_resp: Dict[str, Union[bool, int]] = {}
1522 try:
1523 count = collection["totalItems"]
1524 except KeyError:
1525 log.warning(
1526 f'"totalItems" not found in collection {collection.get("id")}, '
1527 "defaulting to 20"
1528 )
1529 count = 20
1530 else:
1531 log.info(f"{collection.get('id')} has {count} item(s)")
1532
1533 rsm_resp["count"] = count
1534
1535 if start_index is not None:
1536 assert chronological_pagination and after_id is None
1537 if start_index >= count:
1538 return [], rsm_resp
1539 elif start_index == 0:
1540 # this is the default behaviour
1541 pass
1542 elif start_index > 5000:
1543 raise error.StanzaError(
1544 "feature-not-implemented",
1545 text="Maximum limit for previous_index has been reached, this limit"
1546 "is set to avoid DoS"
1547 )
1548 else:
1549 # we'll convert "start_index" to "after_id", thus we need the item just
1550 # before "start_index"
1551 previous_index = start_index - 1
1552 retrieved_items = 0
1553 current_page = collection["last"]
1554 while retrieved_items < count:
1555 page_data, items = await self.parse_ap_page(
1556 current_page, parser, only_ids
1557 )
1558 if not items:
1559 log.warning(f"found an empty AP page at {current_page}")
1560 return [], rsm_resp
1561 page_start_idx = retrieved_items
1562 retrieved_items += len(items)
1563 if previous_index < retrieved_items:
1564 after_id = items[previous_index - page_start_idx]["id"]
1565 break
1566 try:
1567 current_page = page_data["prev"]
1568 except KeyError:
1569 log.warning(
1570 f"missing previous page link at {current_page}: {page_data!r}"
1571 )
1572 raise error.StanzaError(
1573 "service-unavailable",
1574 "Error while retrieving previous page from AP service at "
1575 f"{current_page}"
1576 )
1577
1578 init_page = "last" if chronological_pagination else "first"
1579 page = collection.get(init_page)
1580 if not page:
1581 raise exceptions.DataError(
1582 f"Initial page {init_page!r} not found for collection "
1583 f"{collection.get('id')})"
1584 )
1585 items = []
1586 page_items = []
1587 retrieved_items = 0
1588 found_after_id = False
1589
1590 while retrieved_items < count:
1591 __, page_items = await self.parse_ap_page(page, parser, only_ids)
1592 if not page_items:
1593 break
1594 retrieved_items += len(page_items)
1595 if after_id is not None and not found_after_id:
1596 # if we have an after_id, we ignore all items until the requested one is
1597 # found
1598 try:
1599 limit_idx = [i["id"] for i in page_items].index(after_id)
1600 except ValueError:
1601 # if "after_id" is not found, we don't add any item from this page
1602 page_id = page.get("id") if isinstance(page, dict) else page
1603 log.debug(f"{after_id!r} not found at {page_id}, skipping")
1604 else:
1605 found_after_id = True
1606 if chronological_pagination:
1607 start_index = retrieved_items - len(page_items) + limit_idx + 1
1608 page_items = page_items[limit_idx+1:]
1609 else:
1610 start_index = count - (retrieved_items - len(page_items) +
1611 limit_idx + 1)
1612 page_items = page_items[:limit_idx]
1613 items.extend(page_items)
1614 else:
1615 items.extend(page_items)
1616 if max_items is not None and len(items) >= max_items:
1617 if chronological_pagination:
1618 items = items[:max_items]
1619 else:
1620 items = items[-max_items:]
1621 break
1622 page = collection.get("prev" if chronological_pagination else "next")
1623 if not page:
1624 break
1625
1626 if after_id is not None and not found_after_id:
1627 raise error.StanzaError("item-not-found")
1628
1629 if items:
1630 if after_id is None:
1631 rsm_resp["index"] = 0 if chronological_pagination else count - len(items)
1632 if start_index is not None:
1633 rsm_resp["index"] = start_index
1634 elif after_id is not None:
1635 log.warning("Can't determine index of first element")
1636 elif chronological_pagination:
1637 rsm_resp["index"] = 0
1638 else:
1639 rsm_resp["index"] = count - len(items)
1640 rsm_resp.update({
1641 "first": items[0]["id"],
1642 "last": items[-1]["id"]
1643 })
1644
1645 return items, rsm.RSMResponse(**rsm_resp)
1646
1647 async def ap_item_2_mb_data_and_elt(self, ap_item: dict) -> Tuple[dict, domish.Element]:
1648 """Convert AP item to parsed microblog data and corresponding item element"""
1649 mb_data = await self.ap_item_2_mb_data(ap_item)
1650 item_elt = await self._m.mb_data_2_entry_elt(
1651 self.client, mb_data, mb_data["id"], None, self._m.namespace
1652 )
1653 if "repeated" in mb_data["extra"]:
1654 item_elt["publisher"] = mb_data["extra"]["repeated"]["by"]
1655 else:
1656 item_elt["publisher"] = mb_data["author_jid"]
1657 return mb_data, item_elt
1658
1659 async def ap_item_2_mb_elt(self, ap_item: dict) -> domish.Element:
1660 """Convert AP item to XMPP item element"""
1661 __, item_elt = await self.ap_item_2_mb_data_and_elt(ap_item)
1662 return item_elt
1663
1664 async def parse_ap_page(
1665 self,
1666 page: Union[str, dict],
1667 parser: Callable[[dict], Awaitable[domish.Element]],
1668 only_ids: bool = False
1669 ) -> Tuple[dict, List[domish.Element]]:
1670 """Convert AP objects from an AP page to XMPP items
1671
1672 @param page: Can be either a URL linking to an AP page, or the page data directly
1673 @param parser: method to use to parse AP items and get XMPP item elements
1674 @param only_ids: if True, only retrieve items IDs
1675 @return: page data, pubsub items
1676 """
1677 page_data = await self.ap_get_object(page)
1678 if page_data is None:
1679 log.warning('No data found in collection')
1680 return {}, []
1681 ap_items = await self.ap_get_list(page_data, "orderedItems", only_ids=only_ids)
1682 if ap_items is None:
1683 ap_items = await self.ap_get_list(page_data, "items", only_ids=only_ids)
1684 if not ap_items:
1685 log.warning(f'No item field found in collection: {page_data!r}')
1686 return page_data, []
1687 else:
1688 log.warning(
1689 "Items are not ordered, this is not spec compliant"
1690 )
1691 items = []
1692 # AP Collections are in antichronological order, but we expect chronological in
1693 # Pubsub, thus we reverse it
1694 for ap_item in reversed(ap_items):
1695 try:
1696 items.append(await parser(ap_item))
1697 except (exceptions.DataError, NotImplementedError, error.StanzaError):
1698 continue
1699
1700 return page_data, items
1701
1702 async def get_comments_nodes(
1703 self,
1704 item_id: str,
1705 parent_id: Optional[str]
1706 ) -> Tuple[Optional[str], Optional[str]]:
1707 """Get node where this item is and node to use for comments
1708
1709 if config option "comments_max_depth" is set, a common node will be used below the
1710 given depth
1711 @param item_id: ID of the reference item
1712 @param parent_id: ID of the parent item if any (the ID set in "inReplyTo")
1713 @return: a tuple with parent_node_id, comments_node_id:
1714 - parent_node_id is the ID of the node where the reference item must be. None is
1715 returned when the root node (i.e. not a comments node) must be used.
1716 - comments_node_id is the ID of the node to use for comments. None is
1717 returned when no comment node must be used (happens when we have reached
1718 "comments_max_depth")
1719 """
1720 if parent_id is None or not self.comments_max_depth:
1721 return (
1722 self._m.get_comments_node(parent_id) if parent_id is not None else None,
1723 self._m.get_comments_node(item_id)
1724 )
1725 parent_url = parent_id
1726 parents = []
1727 for __ in range(COMMENTS_MAX_PARENTS):
1728 parent_item = await self.ap_get(parent_url)
1729 parents.insert(0, parent_item)
1730 parent_url = parent_item.get("inReplyTo")
1731 if parent_url is None:
1732 break
1733 parent_limit = self.comments_max_depth-1
1734 if len(parents) <= parent_limit:
1735 return (
1736 self._m.get_comments_node(parents[-1]["id"]),
1737 self._m.get_comments_node(item_id)
1738 )
1739 else:
1740 last_level_item = parents[parent_limit]
1741 return (
1742 self._m.get_comments_node(last_level_item["id"]),
1743 None
1744 )
1745
1746 async def ap_item_2_mb_data(self, ap_item: dict) -> dict:
1747 """Convert AP activity or object to microblog data
1748
1749 @param ap_item: ActivityPub item to convert
1750 Can be either an activity or an object
1751 @return: microblog data of the AP item's object
1752 @raise exceptions.DataError: something is invalid in the AP item
1753 @raise NotImplementedError: some AP data is not handled yet
1754 @raise error.StanzaError: error while contacting the AP server
1755 """
1756 is_activity = self.is_activity(ap_item)
1757 if is_activity:
1758 ap_object = await self.ap_get_object(ap_item, "object")
1759 if not ap_object:
1760 log.warning(f'No "object" found in AP item {ap_item!r}')
1761 raise exceptions.DataError
1762 else:
1763 ap_object = ap_item
1764 item_id = ap_object.get("id")
1765 if not item_id:
1766 log.warning(f'No "id" found in AP item: {ap_object!r}')
1767 raise exceptions.DataError
1768 mb_data = {"id": item_id, "extra": {}}
1769
1770 # content
1771 try:
1772 language, content_xhtml = ap_object["contentMap"].popitem()
1773 except (KeyError, AttributeError):
1774 try:
1775 mb_data["content_xhtml"] = ap_object["content"]
1776 except KeyError:
1777 log.warning(f"no content found:\n{ap_object!r}")
1778 raise exceptions.DataError
1779 else:
1780 mb_data["language"] = language
1781 mb_data["content_xhtml"] = content_xhtml
1782
1783 mb_data["content"] = await self._t.convert(
1784 mb_data["content_xhtml"],
1785 self._t.SYNTAX_XHTML,
1786 self._t.SYNTAX_TEXT,
1787 False,
1788 )
1789
1790 if "attachment" in ap_object:
1791 attachments = mb_data["extra"][C.KEY_ATTACHMENTS] = []
1792 for ap_attachment in ap_object["attachment"]:
1793 try:
1794 url = ap_attachment["url"]
1795 except KeyError:
1796 log.warning(
1797 f'"url" missing in AP attachment, ignoring: {ap_attachment}'
1798 )
1799 continue
1800
1801 if not url.startswith("http"):
1802 log.warning(f"non HTTP URL in attachment, ignoring: {ap_attachment}")
1803 continue
1804 attachment = {"url": url}
1805 for ap_key, key in (
1806 ("mediaType", "media_type"),
1807 # XXX: as weird as it seems, "name" is actually used for description
1808 # in AP world
1809 ("name", "desc"),
1810 ):
1811 value = ap_attachment.get(ap_key)
1812 if value:
1813 attachment[key] = value
1814 attachments.append(attachment)
1815
1816 # author
1817 if is_activity:
1818 authors = await self.ap_get_actors(ap_item, "actor")
1819 else:
1820 authors = await self.ap_get_actors(ap_object, "attributedTo")
1821 if len(authors) > 1:
1822 # we only keep first item as author
1823 # TODO: handle multiple actors
1824 log.warning("multiple actors are not managed")
1825
1826 account = authors[0]
1827 author_jid = self.get_local_jid_from_account(account).full()
1828
1829 mb_data["author"] = account.split("@", 1)[0]
1830 mb_data["author_jid"] = author_jid
1831
1832 # published/updated
1833 for field in ("published", "updated"):
1834 value = ap_object.get(field)
1835 if not value and field == "updated":
1836 value = ap_object.get("published")
1837 if value:
1838 try:
1839 mb_data[field] = calendar.timegm(
1840 dateutil.parser.parse(str(value)).utctimetuple()
1841 )
1842 except dateutil.parser.ParserError as e:
1843 log.warning(f"Can't parse {field!r} field: {e}")
1844
1845 # repeat
1846 if "_repeated" in ap_item:
1847 mb_data["extra"]["repeated"] = ap_item["_repeated"]
1848
1849 # comments
1850 in_reply_to = ap_object.get("inReplyTo")
1851 __, comments_node = await self.get_comments_nodes(item_id, in_reply_to)
1852 if comments_node is not None:
1853 comments_data = {
1854 "service": author_jid,
1855 "node": comments_node,
1856 "uri": uri.build_xmpp_uri(
1857 "pubsub",
1858 path=author_jid,
1859 node=comments_node
1860 )
1861 }
1862 mb_data["comments"] = [comments_data]
1863
1864 return mb_data
1865
1866 async def get_reply_to_id_from_xmpp_node(
1867 self,
1868 client: SatXMPPEntity,
1869 ap_account: str,
1870 parent_item: str,
1871 mb_data: dict
1872 ) -> str:
1873 """Get URL to use for ``inReplyTo`` field in AP item.
1874
1875 There is currently no way to know the parent service of a comment with XEP-0277.
1876 To work around that, we try to check if we have this item in the cache (we
1877 should). If there is more than one item with this ID, we first try to find one
1878 with this author_jid. If nothing is found, we use ap_account to build `inReplyTo`.
1879
1880 @param ap_account: AP account corresponding to the publication author
1881 @param parent_item: ID of the parent item (i.e. the publication that this item
1882 is replying to)
1883 @param mb_data: microblog data of the publication
1884 @return: URL to use in ``inReplyTo`` field
1885 """
1886 # FIXME: propose a protoXEP to properly get parent item, node and service
1887
1888 found_items = await self.host.memory.storage.search_pubsub_items({
1889 "profiles": [client.profile],
1890 "names": [parent_item]
1891 })
1892 if not found_items:
1893 log.warning(f"parent item {parent_item!r} not found in cache")
1894 parent_ap_account = ap_account
1895 elif len(found_items) == 1:
1896 cached_node = found_items[0].node
1897 parent_ap_account = await self.get_ap_account_from_jid_and_node(
1898 cached_node.service,
1899 cached_node.name
1900 )
1901 else:
1902 # we found several cached item with given ID, we check if there is one
1903 # corresponding to this author
1904 try:
1905 author = jid.JID(mb_data["author_jid"]).userhostJID()
1906 cached_item = next(
1907 i for i in found_items
1908 if jid.JID(i.data["publisher"]).userhostJID()
1909 == author
1910 )
1911 except StopIteration:
1912 # no item corresponding to this author, we use ap_account
1913 log.warning(
1914 "Can't find a single cached item for parent item "
1915 f"{parent_item!r}"
1916 )
1917 parent_ap_account = ap_account
1918 else:
1919 cached_node = cached_item.node
1920 parent_ap_account = await self.get_ap_account_from_jid_and_node(
1921 cached_node.service,
1922 cached_node.name
1923 )
1924
1925 return self.build_apurl(
1926 TYPE_ITEM, parent_ap_account, parent_item
1927 )
1928
1929 async def repeated_mb_2_ap_item(
1930 self,
1931 mb_data: dict
1932 ) -> dict:
1933 """Convert repeated blog item to suitable AP Announce activity
1934
1935 @param mb_data: microblog metadata of an item repeating another blog post
1936 @return: Announce activity linking to the repeated item
1937 """
1938 repeated = mb_data["extra"]["repeated"]
1939 repeater = jid.JID(repeated["by"])
1940 repeater_account = await self.get_ap_account_from_jid_and_node(
1941 repeater,
1942 None
1943 )
1944 repeater_id = self.build_apurl(TYPE_ACTOR, repeater_account)
1945 repeated_uri = repeated["uri"]
1946
1947 if not repeated_uri.startswith("xmpp:"):
1948 log.warning(
1949 "Only xmpp: URL are handled for repeated item at the moment, ignoring "
1950 f"item {mb_data}"
1951 )
1952 raise NotImplementedError
1953 parsed_url = uri.parse_xmpp_uri(repeated_uri)
1954 if parsed_url["type"] != "pubsub":
1955 log.warning(
1956 "Only pubsub URL are handled for repeated item at the moment, ignoring "
1957 f"item {mb_data}"
1958 )
1959 raise NotImplementedError
1960 rep_service = jid.JID(parsed_url["path"])
1961 rep_item = parsed_url["item"]
1962 activity_id = self.build_apurl("item", repeater.userhost(), mb_data["id"])
1963
1964 if self.is_virtual_jid(rep_service):
1965 # it's an AP actor linked through this gateway
1966 # in this case we can simply use the item ID
1967 if not rep_item.startswith("https:"):
1968 log.warning(
1969 f"Was expecting an HTTPS url as item ID and got {rep_item!r}\n"
1970 f"{mb_data}"
1971 )
1972 announced_uri = rep_item
1973 repeated_account = self._e.unescape(rep_service.user)
1974 else:
1975 # the repeated item is an XMPP publication, we build the corresponding ID
1976 rep_node = parsed_url["node"]
1977 repeated_account = await self.get_ap_account_from_jid_and_node(
1978 rep_service, rep_node
1979 )
1980 announced_uri = self.build_apurl("item", repeated_account, rep_item)
1981
1982 announce = self.create_activity(
1983 "Announce", repeater_id, announced_uri, activity_id=activity_id
1984 )
1985 announce["to"] = [NS_AP_PUBLIC]
1986 announce["cc"] = [
1987 self.build_apurl(TYPE_FOLLOWERS, repeater_account),
1988 await self.get_ap_actor_id_from_account(repeated_account)
1989 ]
1990 return announce
1991
1992 async def mb_data_2_ap_item(
1993 self,
1994 client: SatXMPPEntity,
1995 mb_data: dict,
1996 public: bool = True,
1997 is_new: bool = True,
1998 ) -> dict:
1999 """Convert Libervia Microblog Data to ActivityPub item
2000
2001 @param mb_data: microblog data (as used in plugin XEP-0277) to convert
2002 If ``public`` is True, ``service`` and ``node`` keys must be set.
2003 If ``published`` is not set, current datetime will be used
2004 @param public: True if the message is not a private/direct one
2005 if True, the AP Item will be marked as public, and AP followers of target AP
2006 account (which is retrieved from ``service``) will be put in ``cc``.
2007 ``inReplyTo`` will also be set if suitable
2008 if False, no destinee will be set (i.e., no ``to`` or ``cc`` or public flag).
2009 This is usually used for direct messages.
2010 @param is_new: if True, the item is a new one (no instance has been found in
2011 cache).
2012 If True, a "Create" activity will be generated, otherwise an "Update" one will
2013 be.
2014 @return: Activity item
2015 """
2016 extra = mb_data.get("extra", {})
2017 if "repeated" in extra:
2018 return await self.repeated_mb_2_ap_item(mb_data)
2019 if not mb_data.get("id"):
2020 mb_data["id"] = shortuuid.uuid()
2021 if not mb_data.get("author_jid"):
2022 mb_data["author_jid"] = client.jid.userhost()
2023 ap_account = await self.get_ap_account_from_jid_and_node(
2024 jid.JID(mb_data["author_jid"]),
2025 None
2026 )
2027 url_actor = self.build_apurl(TYPE_ACTOR, ap_account)
2028 url_item = self.build_apurl(TYPE_ITEM, ap_account, mb_data["id"])
2029 ap_object = {
2030 "id": url_item,
2031 "type": "Note",
2032 "published": utils.xmpp_date(mb_data.get("published")),
2033 "attributedTo": url_actor,
2034 "content": mb_data.get("content_xhtml") or mb_data["content"],
2035 }
2036
2037 language = mb_data.get("language")
2038 if language:
2039 ap_object["contentMap"] = {language: ap_object["content"]}
2040
2041 attachments = extra.get(C.KEY_ATTACHMENTS)
2042 if attachments:
2043 ap_attachments = ap_object["attachment"] = []
2044 for attachment in attachments:
2045 try:
2046 url = next(
2047 s['url'] for s in attachment["sources"] if 'url' in s
2048 )
2049 except (StopIteration, KeyError):
2050 log.warning(
2051 f"Ignoring attachment without URL: {attachment}"
2052 )
2053 continue
2054 ap_attachment = {
2055 "url": url
2056 }
2057 for key, ap_key in (
2058 ("media_type", "mediaType"),
2059 # XXX: yes "name", cf. [ap_item_2_mb_data]
2060 ("desc", "name"),
2061 ):
2062 value = attachment.get(key)
2063 if value:
2064 ap_attachment[ap_key] = value
2065 ap_attachments.append(ap_attachment)
2066
2067 if public:
2068 ap_object["to"] = [NS_AP_PUBLIC]
2069 if self.auto_mentions:
2070 for m in RE_MENTION.finditer(ap_object["content"]):
2071 mention = m.group()
2072 mentioned = mention[1:]
2073 __, m_host = mentioned.split("@", 1)
2074 if m_host in (self.public_url, self.client.jid.host):
2075 # we ignore mention of local users, they should be sent as XMPP
2076 # references
2077 continue
2078 try:
2079 mentioned_id = await self.get_ap_actor_id_from_account(mentioned)
2080 except Exception as e:
2081 log.warning(f"Can't add mention to {mentioned!r}: {e}")
2082 else:
2083 ap_object["to"].append(mentioned_id)
2084 ap_object.setdefault("tag", []).append({
2085 "type": TYPE_MENTION,
2086 "href": mentioned_id,
2087 "name": mention,
2088 })
2089 try:
2090 node = mb_data["node"]
2091 service = jid.JID(mb_data["service"])
2092 except KeyError:
2093 # node and service must always be specified when this method is used
2094 raise exceptions.InternalError(
2095 "node or service is missing in mb_data"
2096 )
2097 target_ap_account = await self.get_ap_account_from_jid_and_node(
2098 service, node
2099 )
2100 if self.is_virtual_jid(service):
2101 # service is a proxy JID for AP account
2102 actor_data = await self.get_ap_actor_data_from_account(target_ap_account)
2103 followers = actor_data.get("followers")
2104 else:
2105 # service is a real XMPP entity
2106 followers = self.build_apurl(TYPE_FOLLOWERS, target_ap_account)
2107 if followers:
2108 ap_object["cc"] = [followers]
2109 if self._m.is_comment_node(node):
2110 parent_item = self._m.get_parent_item(node)
2111 if self.is_virtual_jid(service):
2112 # the publication is on a virtual node (i.e. an XMPP node managed by
2113 # this gateway and linking to an ActivityPub actor)
2114 ap_object["inReplyTo"] = parent_item
2115 else:
2116 # the publication is from a followed real XMPP node
2117 ap_object["inReplyTo"] = await self.get_reply_to_id_from_xmpp_node(
2118 client,
2119 ap_account,
2120 parent_item,
2121 mb_data
2122 )
2123
2124 return self.create_activity(
2125 "Create" if is_new else "Update", url_actor, ap_object, activity_id=url_item
2126 )
2127
2128 async def publish_message(
2129 self,
2130 client: SatXMPPEntity,
2131 mess_data: dict,
2132 service: jid.JID
2133 ) -> None:
2134 """Send an AP message
2135
2136 .. note::
2137
2138 This is a temporary method used for development only
2139
2140 @param mess_data: message data. The following keys must be set:
2141
2142 ``node``
2143 identifier of the message being replied to (this will
2144 correspond to a pubsub node in the future)
2145
2146 ``content_xhtml`` or ``content``
2147 message body (respectively in XHTML or plain text)
2148
2149 @param service: JID corresponding to the AP actor.
2150 """
2151 if not service.user:
2152 raise ValueError("service must have a local part")
2153 account = self._e.unescape(service.user)
2154 ap_actor_data = await self.get_ap_actor_data_from_account(account)
2155
2156 try:
2157 inbox_url = ap_actor_data["endpoints"]["sharedInbox"]
2158 except KeyError:
2159 raise exceptions.DataError("Can't get ActivityPub actor inbox")
2160
2161 item_data = await self.mb_data_2_ap_item(client, mess_data)
2162 url_actor = item_data["actor"]
2163 resp = await self.sign_and_post(inbox_url, url_actor, item_data)
2164
2165 async def ap_delete_item(
2166 self,
2167 jid_: jid.JID,
2168 node: Optional[str],
2169 item_id: str,
2170 public: bool = True
2171 ) -> Tuple[str, Dict[str, Any]]:
2172 """Build activity to delete an AP item
2173
2174 @param jid_: JID of the entity deleting an item
2175 @param node: node where the item is deleted
2176 None if it's a microblog item or a message
2177 @param item_id: ID of the item to delete
2178 it's the Pubsub ID or message's origin ID
2179 @param public: if True, the activity will be addressed to public namespace
2180 @return: actor_id of the entity deleting the item, activity to send
2181 """
2182 if node is None:
2183 node = self._m.namespace
2184
2185 author_account = await self.get_ap_account_from_jid_and_node(jid_, node)
2186 author_actor_id = self.build_apurl(TYPE_ACTOR, author_account)
2187
2188 items = await self.host.memory.storage.search_pubsub_items({
2189 "profiles": [self.client.profile],
2190 "services": [jid_],
2191 "names": [item_id]
2192 })
2193 if not items:
2194 log.warning(
2195 f"Deleting an unknown item at service {jid_}, node {node} and id "
2196 f"{item_id}"
2197 )
2198 else:
2199 try:
2200 mb_data = await self._m.item_2_mb_data(self.client, items[0].data, jid_, node)
2201 if "repeated" in mb_data["extra"]:
2202 # we are deleting a repeated item, we must translate this to an
2203 # "Undo" of the "Announce" activity instead of a "Delete" one
2204 announce = await self.repeated_mb_2_ap_item(mb_data)
2205 undo = self.create_activity("Undo", author_actor_id, announce)
2206 return author_actor_id, undo
2207 except Exception as e:
2208 log.debug(
2209 f"Can't parse item, maybe it's not a blog item: {e}\n"
2210 f"{items[0].toXml()}"
2211 )
2212
2213 url_item = self.build_apurl(TYPE_ITEM, author_account, item_id)
2214 ap_item = self.create_activity(
2215 "Delete",
2216 author_actor_id,
2217 {
2218 "id": url_item,
2219 "type": TYPE_TOMBSTONE
2220 }
2221 )
2222 if public:
2223 ap_item["to"] = [NS_AP_PUBLIC]
2224 return author_actor_id, ap_item
2225
2226 def _message_received_trigger(
2227 self,
2228 client: SatXMPPEntity,
2229 message_elt: domish.Element,
2230 post_treat: defer.Deferred
2231 ) -> bool:
2232 """add the gateway workflow on post treatment"""
2233 if self.client is None:
2234 log.debug(f"no client set, ignoring message: {message_elt.toXml()}")
2235 return True
2236 post_treat.addCallback(
2237 lambda mess_data: defer.ensureDeferred(self.onMessage(client, mess_data))
2238 )
2239 return True
2240
2241 async def onMessage(self, client: SatXMPPEntity, mess_data: dict) -> dict:
2242 """Called once message has been parsed
2243
2244 this method handle the conversion to AP items and posting
2245 """
2246 if client != self.client:
2247 return mess_data
2248 if mess_data["type"] not in ("chat", "normal"):
2249 log.warning(f"ignoring message with unexpected type: {mess_data}")
2250 return mess_data
2251 if not self.is_local(mess_data["from"]):
2252 log.warning(f"ignoring non local message: {mess_data}")
2253 return mess_data
2254 if not mess_data["to"].user:
2255 log.warning(
2256 f"ignoring message addressed to gateway itself: {mess_data}"
2257 )
2258 return mess_data
2259
2260 actor_account = self._e.unescape(mess_data["to"].user)
2261 actor_id = await self.get_ap_actor_id_from_account(actor_account)
2262 inbox = await self.get_ap_inbox_from_id(actor_id, use_shared=False)
2263
2264 try:
2265 language, message = next(iter(mess_data["message"].items()))
2266 except (KeyError, StopIteration):
2267 log.warning(f"ignoring empty message: {mess_data}")
2268 return mess_data
2269
2270 mb_data = {
2271 "content": message,
2272 }
2273 if language:
2274 mb_data["language"] = language
2275 origin_id = mess_data["extra"].get("origin_id")
2276 if origin_id:
2277 # we need to use origin ID when present to be able to retract the message
2278 mb_data["id"] = origin_id
2279 attachments = mess_data["extra"].get(C.KEY_ATTACHMENTS)
2280 if attachments:
2281 mb_data["extra"] = {
2282 C.KEY_ATTACHMENTS: attachments
2283 }
2284
2285 client = self.client.get_virtual_client(mess_data["from"])
2286 ap_item = await self.mb_data_2_ap_item(client, mb_data, public=False)
2287 ap_object = ap_item["object"]
2288 ap_object["to"] = ap_item["to"] = [actor_id]
2289 # we add a mention to direct messages, otherwise the peer is not notified in some AP
2290 # implementations (notably Mastodon), and the message may be missed easily.
2291 ap_object.setdefault("tag", []).append({
2292 "type": TYPE_MENTION,
2293 "href": actor_id,
2294 "name": f"@{actor_account}",
2295 })
2296
2297 await self.sign_and_post(inbox, ap_item["actor"], ap_item)
2298 return mess_data
2299
2300 async def _on_message_retract(
2301 self,
2302 client: SatXMPPEntity,
2303 message_elt: domish.Element,
2304 retract_elt: domish.Element,
2305 fastened_elts
2306 ) -> bool:
2307 if client != self.client:
2308 return True
2309 from_jid = jid.JID(message_elt["from"])
2310 if not self.is_local(from_jid):
2311 log.debug(
2312 f"ignoring retract request from non local jid {from_jid}"
2313 )
2314 return False
2315 to_jid = jid.JID(message_elt["to"])
2316 if (to_jid.host != self.client.jid.full() or not to_jid.user):
2317 # to_jid should be a virtual JID from this gateway
2318 raise exceptions.InternalError(
2319 f"Invalid destinee's JID: {to_jid.full()}"
2320 )
2321 ap_account = self._e.unescape(to_jid.user)
2322 actor_id = await self.get_ap_actor_id_from_account(ap_account)
2323 inbox = await self.get_ap_inbox_from_id(actor_id, use_shared=False)
2324 url_actor, ap_item = await self.ap_delete_item(
2325 from_jid.userhostJID(), None, fastened_elts.id, public=False
2326 )
2327 resp = await self.sign_and_post(inbox, url_actor, ap_item)
2328 return False
2329
2330 async def _on_reference_received(
2331 self,
2332 client: SatXMPPEntity,
2333 message_elt: domish.Element,
2334 reference_data: Dict[str, Union[str, int]]
2335 ) -> bool:
2336 parsed_uri: dict = reference_data.get("parsed_uri")
2337 if not parsed_uri:
2338 log.warning(f"no parsed URI available in reference {reference_data}")
2339 return False
2340
2341 try:
2342 mentioned = jid.JID(parsed_uri["path"])
2343 except RuntimeError:
2344 log.warning(f"invalid target: {reference_data['uri']}")
2345 return False
2346
2347 if mentioned.host != self.client.jid.full() or not mentioned.user:
2348 log.warning(
2349 f"ignoring mentioned user {mentioned}, it's not a JID mapping an AP "
2350 "account"
2351 )
2352 return False
2353
2354 ap_account = self._e.unescape(mentioned.user)
2355 actor_id = await self.get_ap_actor_id_from_account(ap_account)
2356
2357 parsed_anchor: dict = reference_data.get("parsed_anchor")
2358 if not parsed_anchor:
2359 log.warning(f"no XMPP anchor, ignoring reference {reference_data!r}")
2360 return False
2361
2362 if parsed_anchor["type"] != "pubsub":
2363 log.warning(
2364 f"ignoring reference with non pubsub anchor, this is not supported: "
2365 "{reference_data!r}"
2366 )
2367 return False
2368
2369 try:
2370 pubsub_service = jid.JID(parsed_anchor["path"])
2371 except RuntimeError:
2372 log.warning(f"invalid anchor: {reference_data['anchor']}")
2373 return False
2374 pubsub_node = parsed_anchor.get("node")
2375 if not pubsub_node:
2376 log.warning(f"missing pubsub node in anchor: {reference_data['anchor']}")
2377 return False
2378 pubsub_item = parsed_anchor.get("item")
2379 if not pubsub_item:
2380 log.warning(f"missing pubsub item in anchor: {reference_data['anchor']}")
2381 return False
2382
2383 cached_node = await self.host.memory.storage.get_pubsub_node(
2384 client, pubsub_service, pubsub_node
2385 )
2386 if not cached_node:
2387 log.warning(f"Anchored node not found in cache: {reference_data['anchor']}")
2388 return False
2389
2390 cached_items, __ = await self.host.memory.storage.get_items(
2391 cached_node, item_ids=[pubsub_item]
2392 )
2393 if not cached_items:
2394 log.warning(
2395 f"Anchored pubsub item not found in cache: {reference_data['anchor']}"
2396 )
2397 return False
2398
2399 cached_item = cached_items[0]
2400
2401 mb_data = await self._m.item_2_mb_data(
2402 client, cached_item.data, pubsub_service, pubsub_node
2403 )
2404 ap_item = await self.mb_data_2_ap_item(client, mb_data)
2405 ap_object = ap_item["object"]
2406 ap_object["to"] = [actor_id]
2407 ap_object.setdefault("tag", []).append({
2408 "type": TYPE_MENTION,
2409 "href": actor_id,
2410 "name": ap_account,
2411 })
2412
2413 inbox = await self.get_ap_inbox_from_id(actor_id, use_shared=False)
2414
2415 resp = await self.sign_and_post(inbox, ap_item["actor"], ap_item)
2416
2417 return False
2418
2419 async def new_reply_to_xmpp_item(
2420 self,
2421 client: SatXMPPEntity,
2422 ap_item: dict,
2423 targets: Dict[str, Set[str]],
2424 mentions: List[Dict[str, str]],
2425 ) -> None:
2426 """We got an AP item which is a reply to an XMPP item"""
2427 in_reply_to = ap_item["inReplyTo"]
2428 url_type, url_args = self.parse_apurl(in_reply_to)
2429 if url_type != "item":
2430 log.warning(
2431 "Ignoring AP item replying to an XMPP item with an unexpected URL "
2432 f"type({url_type!r}):\n{pformat(ap_item)}"
2433 )
2434 return
2435 try:
2436 parent_item_account, parent_item_id = url_args[0], '/'.join(url_args[1:])
2437 except (IndexError, ValueError):
2438 log.warning(
2439 "Ignoring AP item replying to an XMPP item with invalid inReplyTo URL "
2440 f"({in_reply_to!r}):\n{pformat(ap_item)}"
2441 )
2442 return
2443 parent_item_service, parent_item_node = await self.get_jid_and_node(
2444 parent_item_account
2445 )
2446 if parent_item_node is None:
2447 parent_item_node = self._m.namespace
2448 items, __ = await self._p.get_items(
2449 client, parent_item_service, parent_item_node, item_ids=[parent_item_id]
2450 )
2451 try:
2452 parent_item_elt = items[0]
2453 except IndexError:
2454 log.warning(
2455 f"Can't find parent item at {parent_item_service} (node "
2456 f"{parent_item_node!r})\n{pformat(ap_item)}")
2457 return
2458 parent_item_parsed = await self._m.item_2_mb_data(
2459 client, parent_item_elt, parent_item_service, parent_item_node
2460 )
2461 try:
2462 comment_service = jid.JID(parent_item_parsed["comments"][0]["service"])
2463 comment_node = parent_item_parsed["comments"][0]["node"]
2464 except (KeyError, IndexError):
2465 # we don't have a comment node set for this item
2466 from libervia.backend.tools.xml_tools import pp_elt
2467 log.info(f"{pp_elt(parent_item_elt.toXml())}")
2468 raise NotImplementedError()
2469 else:
2470 __, item_elt = await self.ap_item_2_mb_data_and_elt(ap_item)
2471 await self._p.publish(client, comment_service, comment_node, [item_elt])
2472 await self.notify_mentions(
2473 targets, mentions, comment_service, comment_node, item_elt["id"]
2474 )
2475
2476 def get_ap_item_targets(
2477 self,
2478 item: Dict[str, Any]
2479 ) -> Tuple[bool, Dict[str, Set[str]], List[Dict[str, str]]]:
2480 """Retrieve targets of an AP item, and indicate if it's a public one
2481
2482 @param item: AP object payload
2483 @return: a (is_public, targets, mentions) tuple:
2484 - is_public flag, indicating if the item is world-readable
2485 - a dict mapping target type to targets, and a list of mention data
2486 """
2487 targets: Dict[str, Set[str]] = {}
2488 is_public = False
2489 # TODO: handle "audience"
2490 for key in ("to", "bto", "cc", "bcc"):
2491 values = item.get(key)
2492 if not values:
2493 continue
2494 if isinstance(values, str):
2495 values = [values]
2496 for value in values:
2497 if value in PUBLIC_TUPLE:
2498 is_public = True
2499 continue
2500 if not value:
2501 continue
2502 if not self.is_local_url(value):
2503 continue
2504 target_type = self.parse_apurl(value)[0]
2505 if target_type != TYPE_ACTOR:
2506 log.debug(f"ignoring non actor type as a target: {href}")
2507 else:
2508 targets.setdefault(target_type, set()).add(value)
2509
2510 mentions = []
2511 tags = item.get("tag")
2512 if tags:
2513 for tag in tags:
2514 if tag.get("type") != TYPE_MENTION:
2515 continue
2516 href = tag.get("href")
2517 if not href:
2518 log.warning(f'Missing "href" field from mention object: {tag!r}')
2519 continue
2520 if not self.is_local_url(href):
2521 continue
2522 uri_type = self.parse_apurl(href)[0]
2523 if uri_type != TYPE_ACTOR:
2524 log.debug(f"ignoring non actor URI as a target: {href}")
2525 continue
2526 mention = {"uri": href}
2527 mentions.append(mention)
2528 name = tag.get("name")
2529 if name:
2530 mention["content"] = name
2531
2532 return is_public, targets, mentions
2533
2534 async def new_ap_item(
2535 self,
2536 client: SatXMPPEntity,
2537 destinee: Optional[jid.JID],
2538 node: str,
2539 item: dict,
2540 ) -> None:
2541 """Analyse, cache and send notification for received AP item
2542
2543 @param destinee: jid of the destinee,
2544 @param node: XMPP pubsub node
2545 @param item: AP object payload
2546 """
2547 is_public, targets, mentions = self.get_ap_item_targets(item)
2548 if not is_public and targets.keys() == {TYPE_ACTOR}:
2549 # this is a direct message
2550 await self.handle_message_ap_item(
2551 client, targets, mentions, destinee, item
2552 )
2553 else:
2554 await self.handle_pubsub_ap_item(
2555 client, targets, mentions, destinee, node, item, is_public
2556 )
2557
2558 async def handle_message_ap_item(
2559 self,
2560 client: SatXMPPEntity,
2561 targets: Dict[str, Set[str]],
2562 mentions: List[Dict[str, str]],
2563 destinee: Optional[jid.JID],
2564 item: dict,
2565 ) -> None:
2566 """Parse and deliver direct AP items translating to XMPP messages
2567
2568 @param targets: actors where the item must be delivered
2569 @param destinee: jid of the destinee,
2570 @param item: AP object payload
2571 """
2572 targets_jids = {
2573 await self.get_jid_from_id(t)
2574 for t_set in targets.values()
2575 for t in t_set
2576 }
2577 if destinee is not None:
2578 targets_jids.add(destinee)
2579 mb_data = await self.ap_item_2_mb_data(item)
2580 extra = {
2581 "origin_id": mb_data["id"]
2582 }
2583 attachments = mb_data["extra"].get(C.KEY_ATTACHMENTS)
2584 if attachments:
2585 extra[C.KEY_ATTACHMENTS] = attachments
2586
2587 defer_l = []
2588 for target_jid in targets_jids:
2589 defer_l.append(
2590 client.sendMessage(
2591 target_jid,
2592 {'': mb_data.get("content", "")},
2593 mb_data.get("title"),
2594 extra=extra
2595 )
2596 )
2597 await defer.DeferredList(defer_l)
2598
2599 async def notify_mentions(
2600 self,
2601 targets: Dict[str, Set[str]],
2602 mentions: List[Dict[str, str]],
2603 service: jid.JID,
2604 node: str,
2605 item_id: str,
2606 ) -> None:
2607 """Send mention notifications to recipients and mentioned entities
2608
2609 XEP-0372 (References) is used.
2610
2611 Mentions are also sent to recipients as they are primary audience (see
2612 https://www.w3.org/TR/activitystreams-vocabulary/#microsyntaxes).
2613
2614 """
2615 anchor = uri.build_xmpp_uri("pubsub", path=service.full(), node=node, item=item_id)
2616 seen = set()
2617 # we start with explicit mentions because mentions' content will be used in the
2618 # future to fill "begin" and "end" reference attributes (we can't do it at the
2619 # moment as there is no way to specify the XML element to use in the blog item).
2620 for mention in mentions:
2621 mentioned_jid = await self.get_jid_from_id(mention["uri"])
2622 self._refs.send_reference(
2623 self.client,
2624 to_jid=mentioned_jid,
2625 anchor=anchor
2626 )
2627 seen.add(mentioned_jid)
2628
2629 remaining = {
2630 await self.get_jid_from_id(t)
2631 for t_set in targets.values()
2632 for t in t_set
2633 } - seen
2634 for target in remaining:
2635 self._refs.send_reference(
2636 self.client,
2637 to_jid=target,
2638 anchor=anchor
2639 )
2640
2641 async def handle_pubsub_ap_item(
2642 self,
2643 client: SatXMPPEntity,
2644 targets: Dict[str, Set[str]],
2645 mentions: List[Dict[str, str]],
2646 destinee: Optional[jid.JID],
2647 node: str,
2648 item: dict,
2649 public: bool
2650 ) -> None:
2651 """Analyse, cache and deliver AP items translating to Pubsub
2652
2653 @param targets: actors/collections where the item must be delivered
2654 @param destinee: jid of the destinee,
2655 @param node: XMPP pubsub node
2656 @param item: AP object payload
2657 @param public: True if the item is public
2658 """
2659 # XXX: "public" is not used for now
2660 service = client.jid
2661 in_reply_to = item.get("inReplyTo")
2662
2663 if in_reply_to and isinstance(in_reply_to, list):
2664 in_reply_to = in_reply_to[0]
2665 if in_reply_to and isinstance(in_reply_to, str):
2666 if self.is_local_url(in_reply_to):
2667 # this is a reply to an XMPP item
2668 await self.new_reply_to_xmpp_item(client, item, targets, mentions)
2669 return
2670
2671 # this item is a reply to an AP item, we use or create a corresponding node
2672 # for comments
2673 parent_node, __ = await self.get_comments_nodes(item["id"], in_reply_to)
2674 node = parent_node or node
2675 cached_node = await self.host.memory.storage.get_pubsub_node(
2676 client, service, node, with_subscriptions=True, create=True,
2677 create_kwargs={"subscribed": True}
2678 )
2679 else:
2680 # it is a root item (i.e. not a reply to another item)
2681 create = node == self._events.namespace
2682 cached_node = await self.host.memory.storage.get_pubsub_node(
2683 client, service, node, with_subscriptions=True, create=create
2684 )
2685 if cached_node is None:
2686 log.warning(
2687 f"Received item in unknown node {node!r} at {service}. This may be "
2688 f"due to a cache purge. We synchronise the node\n{item}"
2689
2690 )
2691 return
2692 if item.get("type") == TYPE_EVENT:
2693 data, item_elt = await self.ap_events.ap_item_2_event_data_and_elt(item)
2694 else:
2695 data, item_elt = await self.ap_item_2_mb_data_and_elt(item)
2696 await self.host.memory.storage.cache_pubsub_items(
2697 client,
2698 cached_node,
2699 [item_elt],
2700 [data]
2701 )
2702
2703 for subscription in cached_node.subscriptions:
2704 if subscription.state != SubscriptionState.SUBSCRIBED:
2705 continue
2706 self.pubsub_service.notifyPublish(
2707 service,
2708 node,
2709 [(subscription.subscriber, None, [item_elt])]
2710 )
2711
2712 await self.notify_mentions(targets, mentions, service, node, item_elt["id"])
2713
2714 async def new_ap_delete_item(
2715 self,
2716 client: SatXMPPEntity,
2717 destinee: Optional[jid.JID],
2718 node: str,
2719 item: dict,
2720 ) -> None:
2721 """Analyse, cache and send notification for received AP item
2722
2723 @param destinee: jid of the destinee,
2724 @param node: XMPP pubsub node
2726 @param item: AP object payload
2727 only the "id" field is used
2728 """
2729 item_id = item.get("id")
2730 if not item_id:
2731 raise exceptions.DataError('"id" attribute is missing in item')
2732 if not item_id.startswith("http"):
2733 raise exceptions.DataError(f"invalid id: {item_id!r}")
2734 if self.is_local_url(item_id):
2735 raise ValueError("Local IDs should not be used")
2736
2737 # we have no way to know if a deleted item is a direct one (thus a message) or one
2738 # converted to pubsub. We check if the id is in message history to decide what to
2739 # do.
2740 history = await self.host.memory.storage.get(
2741 client,
2742 History,
2743 History.origin_id,
2744 item_id,
2745 (History.messages, History.subjects)
2746 )
2747
2748 if history is not None:
2749 # it's a direct message
2750 if history.source_jid != client.jid:
2751 log.warning(
2752 f"retraction received from an entity ''{client.jid}) which is "
2753 f"not the original sender of the message ({history.source_jid}), "
2754 "hack attemps?"
2755 )
2756 raise exceptions.PermissionError("forbidden")
2757
2758 await self._r.retract_by_history(client, history)
2759 else:
2760 # no history in cache with this ID, it's probably a pubsub item
2761 cached_node = await self.host.memory.storage.get_pubsub_node(
2762 client, client.jid, node, with_subscriptions=True
2763 )
2764 if cached_node is None:
2765 log.warning(
2766 f"Received an item retract for node {node!r} at {client.jid} "
2767 "which is not cached"
2768 )
2769 raise exceptions.NotFound
2770 await self.host.memory.storage.delete_pubsub_items(cached_node, [item_id])
2771 # notifyRetract is expecting domish.Element instances
2772 item_elt = domish.Element((None, "item"))
2773 item_elt["id"] = item_id
2774 for subscription in cached_node.subscriptions:
2775 if subscription.state != SubscriptionState.SUBSCRIBED:
2776 continue
2777 self.pubsub_service.notifyRetract(
2778 client.jid,
2779 node,
2780 [(subscription.subscriber, None, [item_elt])]
2781 )