comparison libervia/backend/plugins/plugin_comp_ap_gateway/__init__.py @ 4270:0d7bb4df2343

Reformatted code base using black.
author Goffi <goffi@goffi.org>
date Wed, 19 Jun 2024 18:44:57 +0200
parents d366d90a71aa
children
4269:64a85ce8be70 4270:0d7bb4df2343
85 TYPE_LIKE, 85 TYPE_LIKE,
86 TYPE_MENTION, 86 TYPE_MENTION,
87 TYPE_REACTION, 87 TYPE_REACTION,
88 TYPE_TOMBSTONE, 88 TYPE_TOMBSTONE,
89 TYPE_JOIN, 89 TYPE_JOIN,
90 TYPE_LEAVE 90 TYPE_LEAVE,
91 ) 91 )
92 from .http_server import HTTPServer 92 from .http_server import HTTPServer
93 from .pubsub_service import APPubsubService 93 from .pubsub_service import APPubsubService
94 from .regex import RE_MENTION 94 from .regex import RE_MENTION
95 95
103 C.PI_IMPORT_NAME: IMPORT_NAME, 103 C.PI_IMPORT_NAME: IMPORT_NAME,
104 C.PI_MODES: [C.PLUG_MODE_COMPONENT], 104 C.PI_MODES: [C.PLUG_MODE_COMPONENT],
105 C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, 105 C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
106 C.PI_PROTOCOLS: [], 106 C.PI_PROTOCOLS: [],
107 C.PI_DEPENDENCIES: [ 107 C.PI_DEPENDENCIES: [
108 "XEP-0050", "XEP-0054", "XEP-0060", "XEP-0084", "XEP-0106", "XEP-0277", 108 "XEP-0050",
109 "XEP-0292", "XEP-0329", "XEP-0372", "XEP-0424", "XEP-0465", "XEP-0470", 109 "XEP-0054",
110 "XEP-0447", "XEP-0471", "PUBSUB_CACHE", "TEXT_SYNTAXES", "IDENTITY" 110 "XEP-0060",
111 "XEP-0084",
112 "XEP-0106",
113 "XEP-0277",
114 "XEP-0292",
115 "XEP-0329",
116 "XEP-0372",
117 "XEP-0424",
118 "XEP-0465",
119 "XEP-0470",
120 "XEP-0447",
121 "XEP-0471",
122 "PUBSUB_CACHE",
123 "TEXT_SYNTAXES",
124 "IDENTITY",
111 ], 125 ],
112 C.PI_RECOMMENDATIONS: [], 126 C.PI_RECOMMENDATIONS: [],
113 C.PI_MAIN: "APGateway", 127 C.PI_MAIN: "APGateway",
114 C.PI_HANDLER: C.BOOL_TRUE, 128 C.PI_HANDLER: C.BOOL_TRUE,
115 C.PI_DESCRIPTION: _( 129 C.PI_DESCRIPTION: _(
158 "", 172 "",
159 items_cb=self._items_received, 173 items_cb=self._items_received,
160 # we want to be sure that the callbacks are launched before pubsub cache's 174 # we want to be sure that the callbacks are launched before pubsub cache's
161 # one, as we need to inspect items before they are actually removed from cache 175 # one, as we need to inspect items before they are actually removed from cache
162 # or updated 176 # or updated
163 priority=1000 177 priority=1000,
164 ) 178 )
165 self.pubsub_service = APPubsubService(self) 179 self.pubsub_service = APPubsubService(self)
166 self.ad_hoc = APAdHocService(self) 180 self.ad_hoc = APAdHocService(self)
167 self.ap_events = APEvents(self) 181 self.ap_events = APEvents(self)
168 host.trigger.add_with_check( 182 host.trigger.add_with_check(
169 "message_received", 183 "message_received", self, self._message_received_trigger, priority=-1000
170 self,
171 self._message_received_trigger,
172 priority=-1000
173 ) 184 )
174 host.trigger.add_with_check( 185 host.trigger.add_with_check(
175 "XEP-0424_retract_received", 186 "XEP-0424_retract_received", self, self._on_message_retract
176 self,
177 self._on_message_retract
178 ) 187 )
179 host.trigger.add_with_check( 188 host.trigger.add_with_check(
180 "XEP-0372_ref_received", 189 "XEP-0372_ref_received", self, self._on_reference_received
181 self,
182 self._on_reference_received
183 ) 190 )
184 191
185 host.bridge.add_method( 192 host.bridge.add_method(
186 "ap_send", 193 "ap_send",
187 ".plugin", 194 ".plugin",
213 key_size=4096, 220 key_size=4096,
214 ) 221 )
215 private_key_pem = self.private_key.private_bytes( 222 private_key_pem = self.private_key.private_bytes(
216 encoding=serialization.Encoding.PEM, 223 encoding=serialization.Encoding.PEM,
217 format=serialization.PrivateFormat.PKCS8, 224 format=serialization.PrivateFormat.PKCS8,
218 encryption_algorithm=serialization.NoEncryption() 225 encryption_algorithm=serialization.NoEncryption(),
219 ).decode() 226 ).decode()
220 await self.host.memory.storage.set_private_value( 227 await self.host.memory.storage.set_private_value(
221 IMPORT_NAME, "rsa_key", private_key_pem, profile=client.profile 228 IMPORT_NAME, "rsa_key", private_key_pem, profile=client.profile
222 ) 229 )
223 else: 230 else:
226 password=None, 233 password=None,
227 ) 234 )
228 self.public_key = self.private_key.public_key() 235 self.public_key = self.private_key.public_key()
229 self.public_key_pem = self.public_key.public_bytes( 236 self.public_key_pem = self.public_key.public_bytes(
230 encoding=serialization.Encoding.PEM, 237 encoding=serialization.Encoding.PEM,
231 format=serialization.PublicFormat.SubjectPublicKeyInfo 238 format=serialization.PublicFormat.SubjectPublicKeyInfo,
232 ).decode() 239 ).decode()
233 240
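Note: the hunk above generates the gateway's 4096-bit RSA key pair and serialises it to PEM before persisting it per profile. A minimal standalone sketch of the same steps, using only the `cryptography` package (storage, profile handling and the surrounding plugin attributes are omitted):

    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.primitives.asymmetric import rsa

    # Generate the key pair as done above (public_exponent 65537 is the
    # conventional choice; the plugin uses key_size=4096).
    private_key = rsa.generate_private_key(public_exponent=65537, key_size=4096)

    # Private key as unencrypted PKCS#8 PEM, the value the plugin stores
    # with set_private_value(IMPORT_NAME, "rsa_key", ...).
    private_key_pem = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption(),
    ).decode()

    # Public key as SubjectPublicKeyInfo PEM, kept as self.public_key_pem above.
    public_key_pem = private_key.public_key().public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    ).decode()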
234 # params 241 # params
235 # URL and port 242 # URL and port
236 self.public_url = self.host.memory.config_get( 243 self.public_url = self.host.memory.config_get(
237 CONF_SECTION, "public_url" 244 CONF_SECTION, "public_url"
238 ) or self.host.memory.config_get( 245 ) or self.host.memory.config_get(CONF_SECTION, "xmpp_domain")
239 CONF_SECTION, "xmpp_domain"
240 )
241 if self.public_url is None: 246 if self.public_url is None:
242 log.error( 247 log.error(
243 '"public_url" not set in configuration, this is mandatory to have' 248 '"public_url" not set in configuration, this is mandatory to have'
244 "ActivityPub Gateway running. Please set this option it to public facing " 249 "ActivityPub Gateway running. Please set this option it to public facing "
245 f"url in {CONF_SECTION!r} configuration section." 250 f"url in {CONF_SECTION!r} configuration section."
246 ) 251 )
247 return 252 return
248 if parse.urlparse(self.public_url).scheme: 253 if parse.urlparse(self.public_url).scheme:
249 log.error( 254 log.error(
250 "Scheme must not be specified in \"public_url\", please remove it from " 255 'Scheme must not be specified in "public_url", please remove it from '
251 "\"public_url\" configuration option. ActivityPub Gateway won't be run." 256 '"public_url" configuration option. ActivityPub Gateway won\'t be run.'
252 ) 257 )
253 return 258 return
254 self.http_port = int(self.host.memory.config_get( 259 self.http_port = int(self.host.memory.config_get(CONF_SECTION, "http_port", 8123))
255 CONF_SECTION, 'http_port', 8123))
256 connection_type = self.host.memory.config_get( 260 connection_type = self.host.memory.config_get(
257 CONF_SECTION, 'http_connection_type', 'https') 261 CONF_SECTION, "http_connection_type", "https"
258 if connection_type not in ('http', 'https'): 262 )
263 if connection_type not in ("http", "https"):
259 raise exceptions.ConfigError( 264 raise exceptions.ConfigError(
260 'bad ap-gateway http_connection_type, you must use one of "http" or ' 265 'bad ap-gateway http_connection_type, you must use one of "http" or '
261 '"https"' 266 '"https"'
262 ) 267 )
263 self.http_sign_get = C.bool( 268 self.http_sign_get = C.bool(
264 self.host.memory.config_get(CONF_SECTION, "http_sign_get", C.BOOL_TRUE) 269 self.host.memory.config_get(CONF_SECTION, "http_sign_get", C.BOOL_TRUE)
265 ) 270 )
266 self.max_items = int(self.host.memory.config_get( 271 self.max_items = int(
267 CONF_SECTION, 'new_node_max_items', 50 272 self.host.memory.config_get(CONF_SECTION, "new_node_max_items", 50)
268 273 )
269 )) 274 self.comments_max_depth = int(
270 self.comments_max_depth = int(self.host.memory.config_get( 275 self.host.memory.config_get(CONF_SECTION, "comments_max_depth", 0)
271 CONF_SECTION, 'comments_max_depth', 0 276 )
272 )) 277 self.ap_path = self.host.memory.config_get(CONF_SECTION, "ap_path", "_ap")
273 self.ap_path = self.host.memory.config_get(CONF_SECTION, 'ap_path', '_ap')
274 self.base_ap_url = parse.urljoin(f"https://{self.public_url}", f"{self.ap_path}/") 278 self.base_ap_url = parse.urljoin(f"https://{self.public_url}", f"{self.ap_path}/")
275 # True (default) if we provide the gateway only to entities/services from our server 279 # True (default) if we provide the gateway only to entities/services from our server
276 self.local_only = C.bool( 280 self.local_only = C.bool(
277 self.host.memory.config_get(CONF_SECTION, 'local_only', C.BOOL_TRUE) 281 self.host.memory.config_get(CONF_SECTION, "local_only", C.BOOL_TRUE)
278 ) 282 )
279 # if True (default), mentions will be parsed in non-private content coming from 283 # if True (default), mentions will be parsed in non-private content coming from
280 # XMPP. This is necessary as XEP-0372 references arrive separately from the item 284 # XMPP. This is necessary as XEP-0372 references arrive separately from the item
281 # where the mention is done, which is hard or impossible to translate to ActivityPub 285 # where the mention is done, which is hard or impossible to translate to ActivityPub
282 # (where mentions are specified inside the item directly). See documentation for details. 286 # (where mentions are specified inside the item directly). See documentation for details.
283 self.auto_mentions = C.bool( 287 self.auto_mentions = C.bool(
284 self.host.memory.config_get(CONF_SECTION, "auto_mentions", C.BOOL_TRUE) 288 self.host.memory.config_get(CONF_SECTION, "auto_mentions", C.BOOL_TRUE)
285 ) 289 )
286 290
287 html_redirect: Dict[str, Union[str, dict]] = self.host.memory.config_get( 291 html_redirect: Dict[str, Union[str, dict]] = self.host.memory.config_get(
288 CONF_SECTION, 'html_redirect_dict', {} 292 CONF_SECTION, "html_redirect_dict", {}
289 ) 293 )
290 self.html_redirect: Dict[str, List[dict]] = {} 294 self.html_redirect: Dict[str, List[dict]] = {}
291 for url_type, target in html_redirect.items(): 295 for url_type, target in html_redirect.items():
292 if isinstance(target, str): 296 if isinstance(target, str):
293 target = {"url": target} 297 target = {"url": target}
305 filters["node"] = node_filter 309 filters["node"] = node_filter
306 self.html_redirect.setdefault(url_type, []).append(target) 310 self.html_redirect.setdefault(url_type, []).append(target)
307 311
308 # HTTP server launch 312 # HTTP server launch
309 self.server = HTTPServer(self) 313 self.server = HTTPServer(self)
310 if connection_type == 'http': 314 if connection_type == "http":
311 reactor.listenTCP(self.http_port, self.server) 315 reactor.listenTCP(self.http_port, self.server)
312 else: 316 else:
313 options = tls.get_options_from_config( 317 options = tls.get_options_from_config(self.host.memory.config, CONF_SECTION)
314 self.host.memory.config, CONF_SECTION)
315 tls.tls_options_check(options) 318 tls.tls_options_check(options)
316 context_factory = tls.get_tls_context_factory(options) 319 context_factory = tls.get_tls_context_factory(options)
317 reactor.listenSSL(self.http_port, self.server, context_factory) 320 reactor.listenSSL(self.http_port, self.server, context_factory)
318 321
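Note: the server is bound with reactor.listenTCP() for plain HTTP, or with reactor.listenSSL() and a TLS context factory built from the configuration when http_connection_type is "https" (the default). A hedged, self-contained sketch of the same branching, with a placeholder Twisted resource standing in for the plugin's HTTPServer; port and certificate file names are assumptions:

    from twisted.internet import reactor, ssl
    from twisted.web import resource, server

    class Placeholder(resource.Resource):
        """Stands in for the gateway's own HTTPServer class."""
        isLeaf = True

        def render_GET(self, request):
            return b"AP gateway endpoint\n"

    site = server.Site(Placeholder())
    connection_type = "http"  # or "https", as read from http_connection_type

    if connection_type == "http":
        reactor.listenTCP(8123, site)  # 8123 is the http_port default above
    else:
        # stand-in for tls.get_tls_context_factory(options)
        context_factory = ssl.DefaultOpenSSLContextFactory("privkey.pem", "cert.pem")
        reactor.listenSSL(8123, site, context_factory)
    reactor.run()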
319 async def profile_connecting(self, client): 322 async def profile_connecting(self, client):
320 self.client = client 323 self.client = client
321 client.sendHistory = True 324 client.sendHistory = True
322 client._ap_storage = persistent.LazyPersistentBinaryDict( 325 client._ap_storage = persistent.LazyPersistentBinaryDict(
323 IMPORT_NAME, 326 IMPORT_NAME, client.profile
324 client.profile
325 ) 327 )
326 await self.init(client) 328 await self.init(client)
327 329
328 def profile_connected(self, client): 330 def profile_connected(self, client):
329 self.ad_hoc.init(client) 331 self.ad_hoc.init(client)
330 332
331 async def _items_received( 333 async def _items_received(
332 self, 334 self, client: SatXMPPEntity, itemsEvent: pubsub.ItemsEvent
333 client: SatXMPPEntity,
334 itemsEvent: pubsub.ItemsEvent
335 ) -> None: 335 ) -> None:
336 """Callback called when pubsub items are received 336 """Callback called when pubsub items are received
337 337
338 if the items are addressed to a JID corresponding to an AP actor, they are 338 if the items are addressed to a JID corresponding to an AP actor, they are
339 converted to AP items and sent to the corresponding AP server. 339 converted to AP items and sent to the corresponding AP server.
353 353
354 ap_account = self._e.unescape(recipient.user) 354 ap_account = self._e.unescape(recipient.user)
355 355
356 if self._pa.is_attachment_node(itemsEvent.nodeIdentifier): 356 if self._pa.is_attachment_node(itemsEvent.nodeIdentifier):
357 await self.convert_and_post_attachments( 357 await self.convert_and_post_attachments(
358 client, ap_account, itemsEvent.sender, itemsEvent.nodeIdentifier, 358 client,
359 itemsEvent.items 359 ap_account,
360 itemsEvent.sender,
361 itemsEvent.nodeIdentifier,
362 itemsEvent.items,
360 ) 363 )
361 else: 364 else:
362 await self.convert_and_post_items( 365 await self.convert_and_post_items(
363 client, ap_account, itemsEvent.sender, itemsEvent.nodeIdentifier, 366 client,
364 itemsEvent.items 367 ap_account,
368 itemsEvent.sender,
369 itemsEvent.nodeIdentifier,
370 itemsEvent.items,
365 ) 371 )
366 372
367 async def get_virtual_client( 373 async def get_virtual_client(
368 self, 374 self, requestor_actor_id: str, actor_id: str
369 requestor_actor_id: str,
370 actor_id: str
371 ) -> SatXMPPEntity: 375 ) -> SatXMPPEntity:
372 """Get client for this component with a specified jid 376 """Get client for this component with a specified jid
373 377
374 This is needed to perform operations with the virtual JID corresponding to the AP 378 This is needed to perform operations with the virtual JID corresponding to the AP
375 actor instead of the JID of the gateway itself. 379 actor instead of the JID of the gateway itself.
411 raise exceptions.ExternalRequestError(msg) 415 raise exceptions.ExternalRequestError(msg)
412 try: 416 try:
413 return await treq.json_content(resp) 417 return await treq.json_content(resp)
414 except Exception as e: 418 except Exception as e:
415 raise error.StanzaError( 419 raise error.StanzaError(
416 "service-unavailable", 420 "service-unavailable", text=f"Can't get AP data at {url}: {e}"
417 text=f"Can't get AP data at {url}: {e}"
418 ) 421 )
419 422
420 async def ap_post(self, url: str, requestor_actor_id: str, doc: dict) -> TReqResponse: 423 async def ap_post(self, url: str, requestor_actor_id: str, doc: dict) -> TReqResponse:
421 """Sign a document and post it to AP server 424 """Sign a document and post it to AP server
422 425
427 if self.verbose: 430 if self.verbose:
428 __, actor_args = self.parse_apurl(requestor_actor_id) 431 __, actor_args = self.parse_apurl(requestor_actor_id)
429 actor_account = actor_args[0] 432 actor_account = actor_args[0]
430 to_log = [ 433 to_log = [
431 "", 434 "",
432 f">>> {actor_account} is signing and posting to {url}:\n{pformat(doc)}" 435 f">>> {actor_account} is signing and posting to {url}:\n{pformat(doc)}",
433 ] 436 ]
434 437
435 body = json.dumps(doc).encode() 438 body = json.dumps(doc).encode()
436 headers = self._generate_signed_headers(url, requestor_actor_id, method="post", body=body) 439 headers = self._generate_signed_headers(
440 url, requestor_actor_id, method="post", body=body
441 )
437 headers["Content-Type"] = MEDIA_TYPE_AP 442 headers["Content-Type"] = MEDIA_TYPE_AP
438 443
439 if self.verbose: 444 if self.verbose:
440 if self.verbose >= 3: 445 if self.verbose >= 3:
441 h_to_log = "\n".join(f" {k}: {v}" for k, v in headers.items()) 446 h_to_log = "\n".join(f" {k}: {v}" for k, v in headers.items())
442 to_log.append(f" headers:\n{h_to_log}") 447 to_log.append(f" headers:\n{h_to_log}")
443 to_log.append("---") 448 to_log.append("---")
444 log.info("\n".join(to_log)) 449 log.info("\n".join(to_log))
445 450
446 resp = await treq.post( 451 resp = await treq.post(url, body, headers=headers)
447 url,
448 body,
449 headers=headers
450 )
451 if resp.code >= 300: 452 if resp.code >= 300:
452 text = await resp.text() 453 text = await resp.text()
453 log.warning(f"POST request to {url} failed [{resp.code}]: {text}") 454 log.warning(f"POST request to {url} failed [{resp.code}]: {text}")
454 elif self.verbose: 455 elif self.verbose:
455 log.info(f"==> response code: {resp.code}") 456 log.info(f"==> response code: {resp.code}")
456 return resp 457 return resp
457 458
458 def _generate_signed_headers( 459 def _generate_signed_headers(
459 self, 460 self, url: str, actor_id: str, method: str, body: bytes | None = None
460 url: str,
461 actor_id: str,
462 method: str,
463 body: bytes|None = None
464 ) -> dict[str, str]: 461 ) -> dict[str, str]:
465 """Generate HTTP headers with signature for a given request 462 """Generate HTTP headers with signature for a given request
466 463
467 @param url: AP server endpoint 464 @param url: AP server endpoint
468 @param actor_id: originating actor ID (URL) 465 @param actor_id: originating actor ID (URL)
472 """ 469 """
473 p_url = parse.urlparse(url) 470 p_url = parse.urlparse(url)
474 headers = { 471 headers = {
475 "(request-target)": f"{method} {p_url.path}", 472 "(request-target)": f"{method} {p_url.path}",
476 "Host": p_url.hostname, 473 "Host": p_url.hostname,
477 "Date": http.datetimeToString().decode() 474 "Date": http.datetimeToString().decode(),
478 } 475 }
479 476
480 if body: 477 if body:
481 digest_algo, digest_hash = self.get_digest(body) 478 digest_algo, digest_hash = self.get_digest(body)
482 headers["Digest"] = f"{digest_algo}={digest_hash}" 479 headers["Digest"] = f"{digest_algo}={digest_hash}"
485 return headers 482 return headers
486 483
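Note: get_digest() is not part of this hunk; judging from the header it fills ("Digest: <algo>=<hash>"), it presumably returns an algorithm label and a base64-encoded SHA-256 hash of the body, as Mastodon-compatible servers expect. A small sketch under that assumption:

    import base64
    import hashlib

    def get_digest(body: bytes) -> tuple[str, str]:
        # Assumed behaviour: SHA-256 digest of the request body, base64-encoded.
        return "SHA-256", base64.b64encode(hashlib.sha256(body).digest()).decode()

    body = b'{"type": "Create"}'
    digest_algo, digest_hash = get_digest(body)
    headers = {"Digest": f"{digest_algo}={digest_hash}"}
    # -> {'Digest': 'SHA-256=<base64 hash>'}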
487 @overload 484 @overload
488 async def ap_get_object( 485 async def ap_get_object(
489 self, requestor_actor_id: str, data: dict, key: str 486 self, requestor_actor_id: str, data: dict, key: str
490 ) -> dict|None: 487 ) -> dict | None: ...
491 ...
492 488
493 @overload 489 @overload
494 async def ap_get_object( 490 async def ap_get_object(
495 self, requestor_actor_id: str, data: Union[str, dict], key: None = None 491 self, requestor_actor_id: str, data: Union[str, dict], key: None = None
496 ) -> dict: 492 ) -> dict: ...
497 ... 493
498 494 async def ap_get_object(self, requestor_actor_id: str, data, key=None) -> dict | None:
499 async def ap_get_object(self, requestor_actor_id: str, data, key = None) -> dict|None:
500 """Retrieve an AP object, dereferencing when necessary 495 """Retrieve an AP object, dereferencing when necessary
501 496
502 This method is to be used with attributes marked as "Functional" in 497 This method is to be used with attributes marked as "Functional" in
503 https://www.w3.org/TR/activitystreams-vocabulary 498 https://www.w3.org/TR/activitystreams-vocabulary
504 @param requestor_actor_id: ID of the actor doing the request. 499 @param requestor_actor_id: ID of the actor doing the request.
524 else: 519 else:
525 raise NotImplementedError( 520 raise NotImplementedError(
526 "was expecting a string or a dict, got {type(value)}: {value!r}}" 521 "was expecting a string or a dict, got {type(value)}: {value!r}}"
527 ) 522 )
528 523
529 async def ap_get_local_object( 524 async def ap_get_local_object(self, url: str) -> dict:
530 self,
531 url: str
532 ) -> dict:
533 """Retrieve or generate local object 525 """Retrieve or generate local object
534 526
535 for now, only handle XMPP items to convert to AP 527 for now, only handle XMPP items to convert to AP
536 """ 528 """
537 url_type, url_args = self.parse_apurl(url) 529 url_type, url_args = self.parse_apurl(url)
594 raise NotImplementedError( 586 raise NotImplementedError(
595 'only object from "item" URLs can be retrieved for now' 587 'only object from "item" URLs can be retrieved for now'
596 ) 588 )
597 589
598 async def ap_get_list( 590 async def ap_get_list(
599 self, 591 self, requestor_actor_id: str, data: dict, key: str, only_ids: bool = False
600 requestor_actor_id: str,
601 data: dict,
602 key: str,
603 only_ids: bool = False
604 ) -> Optional[List[Dict[str, Any]]]: 592 ) -> Optional[List[Dict[str, Any]]]:
605 """Retrieve a list of objects from AP data, dereferencing when necessary 593 """Retrieve a list of objects from AP data, dereferencing when necessary
606 594
607 This method is to be used with non functional vocabularies. Use ``ap_get_object`` 595 This method is to be used with non functional vocabularies. Use ``ap_get_object``
608 otherwise. 596 otherwise.
624 if isinstance(value, dict): 612 if isinstance(value, dict):
625 return [value] 613 return [value]
626 if not isinstance(value, list): 614 if not isinstance(value, list):
627 raise ValueError(f"A list was expected, got {type(value)}: {value!r}") 615 raise ValueError(f"A list was expected, got {type(value)}: {value!r}")
628 if only_ids: 616 if only_ids:
629 return [ 617 return [{"id": v["id"]} if isinstance(v, dict) else {"id": v} for v in value]
630 {"id": v["id"]} if isinstance(v, dict) else {"id": v}
631 for v in value
632 ]
633 else: 618 else:
634 return [await self.ap_get_object(requestor_actor_id, i) for i in value] 619 return [await self.ap_get_object(requestor_actor_id, i) for i in value]
635 620
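Note: for reference, the only_ids branch above normalises a mixed AP collection value (plain IRIs and embedded objects) into uniform {"id": ...} dictionaries without dereferencing anything:

    value = [
        "https://example.org/users/alice",
        {"id": "https://example.org/notes/1", "type": "Note", "content": "hi"},
    ]
    only_ids = [{"id": v["id"]} if isinstance(v, dict) else {"id": v} for v in value]
    # [{'id': 'https://example.org/users/alice'},
    #  {'id': 'https://example.org/notes/1'}]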
636 async def ap_get_actors( 621 async def ap_get_actors(
637 self, 622 self, requestor_actor_id: str, data: dict, key: str, as_account: bool = True
638 requestor_actor_id: str,
639 data: dict,
640 key: str,
641 as_account: bool = True
642 ) -> List[str]: 623 ) -> List[str]:
643 """Retrieve AP actors from data 624 """Retrieve AP actors from data
644 625
645 @param requestor_actor_id: ID of the actor doing the request. 626 @param requestor_actor_id: ID of the actor doing the request.
646 @param data: AP object containing a field with actors 627 @param data: AP object containing a field with actors
669 except (TypeError, KeyError): 650 except (TypeError, KeyError):
670 raise exceptions.DataError( 651 raise exceptions.DataError(
671 f"invalid actors list to object {data.get('id')!r}: {value!r}" 652 f"invalid actors list to object {data.get('id')!r}: {value!r}"
672 ) 653 )
673 if not value: 654 if not value:
674 raise exceptions.DataError( 655 raise exceptions.DataError(f"list of actors is empty")
675 f"list of actors is empty"
676 )
677 if as_account: 656 if as_account:
678 return [ 657 return [
679 await self.get_ap_account_from_id(requestor_actor_id, actor_id) 658 await self.get_ap_account_from_id(requestor_actor_id, actor_id)
680 for actor_id in value 659 for actor_id in value
681 ] 660 ]
695 @param data: AP object 674 @param data: AP object
696 @return: actor id of the sender 675 @return: actor id of the sender
697 @raise exceptions.NotFound: no actor has been found in data 676 @raise exceptions.NotFound: no actor has been found in data
698 """ 677 """
699 try: 678 try:
700 actors = await self.ap_get_actors(requestor_actor_id, data, "actor", as_account=False) 679 actors = await self.ap_get_actors(
680 requestor_actor_id, data, "actor", as_account=False
681 )
701 except exceptions.DataError: 682 except exceptions.DataError:
702 actors = None 683 actors = None
703 if not actors: 684 if not actors:
704 try: 685 try:
705 actors = await self.ap_get_actors(requestor_actor_id, data, "attributedTo", as_account=False) 686 actors = await self.ap_get_actors(
687 requestor_actor_id, data, "attributedTo", as_account=False
688 )
706 except exceptions.DataError: 689 except exceptions.DataError:
707 raise exceptions.NotFound( 690 raise exceptions.NotFound(
708 'actor not specified in "actor" or "attributedTo"' 691 'actor not specified in "actor" or "attributedTo"'
709 ) 692 )
710 try: 693 try:
713 raise exceptions.NotFound("list of actors is empty") 696 raise exceptions.NotFound("list of actors is empty")
714 697
715 def must_encode(self, text: str) -> bool: 698 def must_encode(self, text: str) -> bool:
716 """Indicate if a text must be period encoded""" 699 """Indicate if a text must be period encoded"""
717 return ( 700 return (
718 not RE_ALLOWED_UNQUOTED.match(text) 701 not RE_ALLOWED_UNQUOTED.match(text) or text.startswith("___") or "---" in text
719 or text.startswith("___")
720 or "---" in text
721 ) 702 )
722 703
723 def period_encode(self, text: str) -> str: 704 def period_encode(self, text: str) -> str:
724 """Period encode a text 705 """Period encode a text
725 706
733 .replace("~", "%7e") 714 .replace("~", "%7e")
734 .replace("%", ".") 715 .replace("%", ".")
735 ) 716 )
736 717
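Note: only the tail of period_encode() is visible in this hunk. The idea, also used in reverse by get_jid_and_node() below, is percent-encoding with '%' swapped for '.', so the result remains a valid AP username. A sketch under the assumption that RE_PERIOD_ENC matches a period followed by two hex digits and that everything non-alphanumeric is quoted:

    import re
    from urllib import parse

    RE_PERIOD_ENC = re.compile(r"\.(?P<hex>[0-9a-fA-F]{2})")  # assumed pattern

    def period_encode(text: str) -> str:
        return (
            parse.quote(text, safe="")
            .replace("~", "%7e")
            .replace("%", ".")
        )

    def period_decode(text: str) -> str:
        return parse.unquote(RE_PERIOD_ENC.sub(r"%\g<hex>", text), errors="strict")

    assert period_encode("news@example.org") == "news.40example.org"
    assert period_decode("news.40example.org") == "news@example.org"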
737 async def get_ap_account_from_jid_and_node( 718 async def get_ap_account_from_jid_and_node(
738 self, 719 self, jid_: jid.JID, node: Optional[str]
739 jid_: jid.JID,
740 node: Optional[str]
741 ) -> str: 720 ) -> str:
742 """Construct AP account from JID and node 721 """Construct AP account from JID and node
743 722
744 The account construction will use escaping when necessary 723 The account construction will use escaping when necessary
745 """ 724 """
776 raise MissingLocalPartError("there should be a local part") 755 raise MissingLocalPartError("there should be a local part")
777 756
778 if node: 757 if node:
779 account_elts.extend((node, "---")) 758 account_elts.extend((node, "---"))
780 759
781 account_elts.extend(( 760 account_elts.extend(
782 user, "@", jid_.host if is_local else self.client.jid.userhost() 761 (user, "@", jid_.host if is_local else self.client.jid.userhost())
783 )) 762 )
784 return "".join(account_elts) 763 return "".join(account_elts)
785 764
786 def is_local(self, jid_: jid.JID) -> bool: 765 def is_local(self, jid_: jid.JID) -> bool:
787 """Returns True if jid_ use a domain or subdomain of gateway's host""" 766 """Returns True if jid_ use a domain or subdomain of gateway's host"""
788 local_host = self.client.host.split(".") 767 local_host = self.client.host.split(".")
789 assert local_host 768 assert local_host
790 return jid_.host.split(".")[-len(local_host):] == local_host 769 return jid_.host.split(".")[-len(local_host) :] == local_host
791 770
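Note: is_local() above is a plain suffix comparison over DNS labels of the gateway's host; for example, with the gateway hypothetically running on ap.example.org:

    local_host = "ap.example.org".split(".")  # assumed gateway host
    for host in ("foo.ap.example.org", "ap.example.org", "example.org"):
        is_local = host.split(".")[-len(local_host):] == local_host
        print(f"{host}: {is_local}")
    # foo.ap.example.org: True
    # ap.example.org: True
    # example.org: False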
792 async def is_pubsub(self, jid_: jid.JID) -> bool: 771 async def is_pubsub(self, jid_: jid.JID) -> bool:
793 """Indicate if a JID is a Pubsub service""" 772 """Indicate if a JID is a Pubsub service"""
794 host_disco = await self.host.get_disco_infos(self.client, jid_) 773 host_disco = await self.host.get_disco_infos(self.client, jid_)
795 return ( 774 return ("pubsub", "service") in host_disco.identities and not (
796 ("pubsub", "service") in host_disco.identities 775 "pubsub",
797 and not ("pubsub", "pep") in host_disco.identities 776 "pep",
798 ) 777 ) in host_disco.identities
799 778
800 async def get_jid_and_node(self, ap_account: str) -> tuple[jid.JID, str|None]: 779 async def get_jid_and_node(self, ap_account: str) -> tuple[jid.JID, str | None]:
801 """Decode raw AP account handle to get XMPP JID and Pubsub Node 780 """Decode raw AP account handle to get XMPP JID and Pubsub Node
802 781
803 Usernames are case insensitive. 782 Usernames are case insensitive.
804 783
805 By default, the username corresponds to the local username (i.e. username from 784 By default, the username corresponds to the local username (i.e. username from
854 else: 833 else:
855 node = None 834 node = None
856 835
857 if encoded: 836 if encoded:
858 username = parse.unquote( 837 username = parse.unquote(
859 RE_PERIOD_ENC.sub(r"%\g<hex>", username), 838 RE_PERIOD_ENC.sub(r"%\g<hex>", username), errors="strict"
860 errors="strict"
861 ) 839 )
862 if node: 840 if node:
863 node = parse.unquote( 841 node = parse.unquote(
864 RE_PERIOD_ENC.sub(r"%\g<hex>", node), 842 RE_PERIOD_ENC.sub(r"%\g<hex>", node), errors="strict"
865 errors="strict"
866 ) 843 )
867 844
868 if "@" in username: 845 if "@" in username:
869 username, domain = username.rsplit("@", 1) 846 username, domain = username.rsplit("@", 1)
870 847
897 """Compute JID linking to an AP account 874 """Compute JID linking to an AP account
898 875
899 The local JID is computed by escaping the AP actor handle and using it as local part 876 The local JID is computed by escaping the AP actor handle and using it as local part
900 of the JID, where the domain part is this gateway's own JID 877 of the JID, where the domain part is this gateway's own JID
901 """ 878 """
902 return jid.JID( 879 return jid.JID(None, (self._e.escape(account), self.client.jid.host, None))
903 None,
904 (
905 self._e.escape(account),
906 self.client.jid.host,
907 None
908 )
909 )
910 880
911 async def get_jid_from_id(self, requestor_actor_id: str, actor_id: str) -> jid.JID: 881 async def get_jid_from_id(self, requestor_actor_id: str, actor_id: str) -> jid.JID:
912 """Compute JID linking to an AP Actor ID 882 """Compute JID linking to an AP Actor ID
913 883
914 The local JID is computed by escaping the AP actor handle and using it as local part 884 The local JID is computed by escaping the AP actor handle and using it as local part
935 905
936 @param url: URL to parse (schema is not mandatory) 906 @param url: URL to parse (schema is not mandatory)
937 @return: endpoint type and extra arguments 907 @return: endpoint type and extra arguments
938 """ 908 """
939 path = parse.urlparse(url).path.lstrip("/") 909 path = parse.urlparse(url).path.lstrip("/")
940 type_, *extra_args = path[len(self.ap_path):].lstrip("/").split("/") 910 type_, *extra_args = path[len(self.ap_path) :].lstrip("/").split("/")
941 return type_, [parse.unquote(a) for a in extra_args] 911 return type_, [parse.unquote(a) for a in extra_args]
942 912
943 def build_apurl(self, type_:str , *args: str) -> str: 913 def build_apurl(self, type_: str, *args: str) -> str:
944 """Build an AP endpoint URL 914 """Build an AP endpoint URL
945 915
946 @param type_: type of AP endpoint 916 @param type_: type of AP endpoint
947 @param args: endpoint-dependent arguments 917 @param args: endpoint-dependent arguments
948 """ 918 """
949 return parse.urljoin( 919 return parse.urljoin(
950 self.base_ap_url, 920 self.base_ap_url,
951 str(Path(type_).joinpath(*(parse.quote_plus(a, safe="@") for a in args))) 921 str(Path(type_).joinpath(*(parse.quote_plus(a, safe="@") for a in args))),
952 ) 922 )
953 923
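Note: build_apurl() and parse_apurl() are inverses over the gateway's endpoint layout; a round trip with an assumed public_url of example.org and the default ap_path "_ap":

    from pathlib import Path
    from urllib import parse

    base_ap_url = "https://example.org/_ap/"  # assumed public_url
    ap_path = "_ap"

    def build_apurl(type_: str, *args: str) -> str:
        return parse.urljoin(
            base_ap_url,
            str(Path(type_).joinpath(*(parse.quote_plus(a, safe="@") for a in args))),
        )

    def parse_apurl(url: str) -> tuple[str, list[str]]:
        path = parse.urlparse(url).path.lstrip("/")
        type_, *extra_args = path[len(ap_path):].lstrip("/").split("/")
        return type_, [parse.unquote(a) for a in extra_args]

    url = build_apurl("actor", "toto@example.org")
    # 'https://example.org/_ap/actor/toto@example.org'
    assert parse_apurl(url) == ("actor", ["toto@example.org"])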
954 def is_local_url(self, url: str) -> bool: 924 def is_local_url(self, url: str) -> bool:
955 """Tells if an URL link to this component 925 """Tells if an URL link to this component
956 926
992 """Retrieve actor data with LRU cache""" 962 """Retrieve actor data with LRU cache"""
993 return await self.ap_get(actor_id, requestor_actor_id) 963 return await self.ap_get(actor_id, requestor_actor_id)
994 964
995 @async_lru(maxsize=LRU_MAX_SIZE) 965 @async_lru(maxsize=LRU_MAX_SIZE)
996 async def get_actor_pub_key_data( 966 async def get_actor_pub_key_data(
997 self, 967 self, requestor_actor_id: str, actor_id: str
998 requestor_actor_id: str,
999 actor_id: str
1000 ) -> Tuple[str, str, rsa.RSAPublicKey]: 968 ) -> Tuple[str, str, rsa.RSAPublicKey]:
1001 """Retrieve Public Key data from actor ID 969 """Retrieve Public Key data from actor ID
1002 970
1003 @param requestor_actor_id: ID of the actor doing the request. 971 @param requestor_actor_id: ID of the actor doing the request.
1004 @param actor_id: actor ID (url) 972 @param actor_id: actor ID (url)
1065 async def check_signature( 1033 async def check_signature(
1066 self, 1034 self,
1067 requestor_actor_id: str, 1035 requestor_actor_id: str,
1068 signature: str, 1036 signature: str,
1069 key_id: str, 1037 key_id: str,
1070 headers: Dict[str, str] 1038 headers: Dict[str, str],
1071 ) -> str: 1039 ) -> str:
1072 """Verify that signature matches given headers 1040 """Verify that signature matches given headers
1073 1041
1074 see https://datatracker.ietf.org/doc/html/draft-cavage-http-signatures-06#section-3.1.2 1042 see https://datatracker.ietf.org/doc/html/draft-cavage-http-signatures-06#section-3.1.2
1075 1043
1079 @param headers: headers and their values, including pseudo-headers 1047 @param headers: headers and their values, including pseudo-headers
1080 @return: id of the signing actor 1048 @return: id of the signing actor
1081 1049
1082 @raise InvalidSignature: signature doesn't match headers 1050 @raise InvalidSignature: signature doesn't match headers
1083 """ 1051 """
1084 to_sign = "\n".join(f"{k.lower()}: {v}" for k,v in headers.items()) 1052 to_sign = "\n".join(f"{k.lower()}: {v}" for k, v in headers.items())
1085 if key_id.startswith("acct:"): 1053 if key_id.startswith("acct:"):
1086 actor = key_id[5:] 1054 actor = key_id[5:]
1087 actor_id = await self.get_ap_actor_id_from_account(actor) 1055 actor_id = await self.get_ap_actor_id_from_account(actor)
1088 else: 1056 else:
1089 actor_id = key_id.split("#", 1)[0] 1057 actor_id = key_id.split("#", 1)[0]
1090 1058
1091 pub_key_id, pub_key_owner, pub_key = await self.get_actor_pub_key_data( 1059 pub_key_id, pub_key_owner, pub_key = await self.get_actor_pub_key_data(
1092 requestor_actor_id, 1060 requestor_actor_id, actor_id
1093 actor_id
1094 ) 1061 )
1095 if pub_key_id != key_id or pub_key_owner != actor_id: 1062 if pub_key_id != key_id or pub_key_owner != actor_id:
1096 raise exceptions.EncryptionError("Public Key mismatch") 1063 raise exceptions.EncryptionError("Public Key mismatch")
1097 1064
1098 try: 1065 try:
1099 pub_key.verify( 1066 pub_key.verify(
1100 base64.b64decode(signature), 1067 base64.b64decode(signature),
1101 to_sign.encode(), 1068 to_sign.encode(),
1102 # we have to use PKCS1v15 padding to be compatible with Mastodon 1069 # we have to use PKCS1v15 padding to be compatible with Mastodon
1103 padding.PKCS1v15(), # type: ignore 1070 padding.PKCS1v15(), # type: ignore
1104 hashes.SHA256() # type: ignore 1071 hashes.SHA256(), # type: ignore
1105 ) 1072 )
1106 except InvalidSignature: 1073 except InvalidSignature:
1107 raise exceptions.EncryptionError( 1074 raise exceptions.EncryptionError(
1108 "Invalid signature (using PKC0S1 v1.5 and SHA-256)" 1075 "Invalid signature (using PKC0S1 v1.5 and SHA-256)"
1109 ) 1076 )
1110 1077
1111 return actor_id 1078 return actor_id
1112 1079
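Note: check_signature() rebuilds the draft-cavage signing string from the headers and verifies it with PKCS#1 v1.5 / SHA-256, mirroring get_signature_data() on the sending side. A self-contained round trip with the same primitives (throwaway key and illustrative headers only):

    import base64
    from cryptography.exceptions import InvalidSignature
    from cryptography.hazmat.primitives import hashes
    from cryptography.hazmat.primitives.asymmetric import padding, rsa

    key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
    headers = {"(request-target)": "post /inbox", "host": "example.org"}
    to_sign = "\n".join(f"{k.lower()}: {v}" for k, v in headers.items())

    # Signing side (as in get_signature_data below).
    signature = base64.b64encode(
        key.sign(to_sign.encode(), padding.PKCS1v15(), hashes.SHA256())
    ).decode()

    # Verifying side (as in check_signature above).
    try:
        key.public_key().verify(
            base64.b64decode(signature),
            to_sign.encode(),
            padding.PKCS1v15(),
            hashes.SHA256(),
        )
        print("signature OK")
    except InvalidSignature:
        print("signature mismatch")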
1113 def get_signature_data( 1080 def get_signature_data(
1114 self, 1081 self, key_id: str, headers: Dict[str, str]
1115 key_id: str,
1116 headers: Dict[str, str]
1117 ) -> Tuple[Dict[str, str], Dict[str, str]]: 1082 ) -> Tuple[Dict[str, str], Dict[str, str]]:
1118 """Generate and return signature and corresponding headers 1083 """Generate and return signature and corresponding headers
1119 1084
1120 @param parsed_url: URL where the request is sent/has been received 1085 @param parsed_url: URL where the request is sent/has been received
1121 @param key_id: ID of the key (URL linking to the data with public key) 1086 @param key_id: ID of the key (URL linking to the data with public key)
1128 ``headers`` is an updated copy of ``headers`` arguments, with pseudo-headers 1093 ``headers`` is an updated copy of ``headers`` arguments, with pseudo-headers
1129 removed, and ``Signature`` added. 1094 removed, and ``Signature`` added.
1130 """ 1095 """
1131 # headers must be lower case 1096 # headers must be lower case
1132 l_headers: Dict[str, str] = {k.lower(): v for k, v in headers.items()} 1097 l_headers: Dict[str, str] = {k.lower(): v for k, v in headers.items()}
1133 to_sign = "\n".join(f"{k}: {v}" for k,v in l_headers.items()) 1098 to_sign = "\n".join(f"{k}: {v}" for k, v in l_headers.items())
1134 signature = base64.b64encode(self.private_key.sign( 1099 signature = base64.b64encode(
1135 to_sign.encode(), 1100 self.private_key.sign(
1136 # we have to use PKCS1v15 padding to be compatible with Mastodon 1101 to_sign.encode(),
1137 padding.PKCS1v15(), # type: ignore 1102 # we have to use PKCS1v15 padding to be compatible with Mastodon
1138 hashes.SHA256() # type: ignore 1103 padding.PKCS1v15(), # type: ignore
1139 )).decode() 1104 hashes.SHA256(), # type: ignore
1105 )
1106 ).decode()
1140 sign_data = { 1107 sign_data = {
1141 "keyId": key_id, 1108 "keyId": key_id,
1142 "Algorithm": "rsa-sha256", 1109 "Algorithm": "rsa-sha256",
1143 "headers": " ".join(l_headers.keys()), 1110 "headers": " ".join(l_headers.keys()),
1144 "signature": signature 1111 "signature": signature,
1145 } 1112 }
1146 new_headers = {k: v for k,v in headers.items() if not k.startswith("(")} 1113 new_headers = {k: v for k, v in headers.items() if not k.startswith("(")}
1147 new_headers["Signature"] = self.build_signature_header(sign_data) 1114 new_headers["Signature"] = self.build_signature_header(sign_data)
1148 return new_headers, sign_data 1115 return new_headers, sign_data
1149 1116
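Note: build_signature_header() is not shown in this changeset; for a draft-cavage-http-signatures request the resulting header is typically a comma-separated list of quoted key/value pairs derived from sign_data, roughly as sketched here (an assumption, not the plugin's actual implementation):

    def build_signature_header(sign_data: dict) -> str:
        # Hypothetical reconstruction of the Signature header format.
        return ",".join(
            f'{key}="{sign_data[source]}"'
            for key, source in (
                ("keyId", "keyId"),
                ("algorithm", "Algorithm"),
                ("headers", "headers"),
                ("signature", "signature"),
            )
        )

    sign_data = {
        "keyId": "https://example.org/_ap/actor/gateway#main-key",  # assumed key ID
        "Algorithm": "rsa-sha256",
        "headers": "(request-target) host date digest",
        "signature": "<base64 signature>",
    }
    print("Signature:", build_signature_header(sign_data))
    # Signature: keyId="https://example.org/_ap/actor/gateway#main-key",
    #   algorithm="rsa-sha256",headers="(request-target) host date digest",
    #   signature="<base64 signature>"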
1150 async def convert_and_post_items( 1117 async def convert_and_post_items(
1151 self, 1118 self,
1165 @param subscribe_extra_nodes: if True, extra data nodes will be automatically 1132 @param subscribe_extra_nodes: if True, extra data nodes will be automatically
1166 subscribed, that is comment nodes if present and attachments nodes. 1133 subscribed, that is comment nodes if present and attachments nodes.
1167 """ 1134 """
1168 actor_id = await self.get_ap_actor_id_from_account(ap_account) 1135 actor_id = await self.get_ap_actor_id_from_account(ap_account)
1169 requestor_actor_id = self.build_apurl( 1136 requestor_actor_id = self.build_apurl(
1170 TYPE_ACTOR, 1137 TYPE_ACTOR, await self.get_ap_account_from_jid_and_node(service, node)
1171 await self.get_ap_account_from_jid_and_node(service, node)
1172 ) 1138 )
1173 inbox = await self.get_ap_inbox_from_id(requestor_actor_id, actor_id) 1139 inbox = await self.get_ap_inbox_from_id(requestor_actor_id, actor_id)
1174 for item in items: 1140 for item in items:
1175 if item.name == "item": 1141 if item.name == "item":
1176 cached_item = await self.host.memory.storage.search_pubsub_items({ 1142 cached_item = await self.host.memory.storage.search_pubsub_items(
1177 "profiles": [self.client.profile], 1143 {
1178 "services": [service], 1144 "profiles": [self.client.profile],
1179 "nodes": [node], 1145 "services": [service],
1180 "names": [item["id"]] 1146 "nodes": [node],
1181 }) 1147 "names": [item["id"]],
1148 }
1149 )
1182 is_new = not bool(cached_item) 1150 is_new = not bool(cached_item)
1183 if node.startswith(self._events.namespace): 1151 if node.startswith(self._events.namespace):
1184 # event item 1152 # event item
1185 event_data = self._events.event_elt_2_event_data(item) 1153 event_data = self._events.event_elt_2_event_data(item)
1186 try: 1154 try:
1257 client: SatXMPPEntity, 1225 client: SatXMPPEntity,
1258 ap_account: str, 1226 ap_account: str,
1259 service: jid.JID, 1227 service: jid.JID,
1260 node: str, 1228 node: str,
1261 items: List[domish.Element], 1229 items: List[domish.Element],
1262 publisher: Optional[jid.JID] = None 1230 publisher: Optional[jid.JID] = None,
1263 ) -> None: 1231 ) -> None:
1264 """Convert XMPP item attachments to AP activities and post them to actor inbox 1232 """Convert XMPP item attachments to AP activities and post them to actor inbox
1265 1233
1266 @param ap_account: account of ActivityPub actor receiving the item 1234 @param ap_account: account of ActivityPub actor receiving the item
1267 @param service: JID of the (virtual) pubsub service where the item has been 1235 @param service: JID of the (virtual) pubsub service where the item has been
1282 f"{len(items)})" 1250 f"{len(items)})"
1283 ) 1251 )
1284 1252
1285 actor_id = await self.get_ap_actor_id_from_account(ap_account) 1253 actor_id = await self.get_ap_actor_id_from_account(ap_account)
1286 requestor_actor_id = self.build_apurl( 1254 requestor_actor_id = self.build_apurl(
1287 TYPE_ACTOR, 1255 TYPE_ACTOR, await self.get_ap_account_from_jid_and_node(service, node)
1288 await self.get_ap_account_from_jid_and_node(service, node)
1289 ) 1256 )
1290 inbox = await self.get_ap_inbox_from_id(requestor_actor_id, actor_id) 1257 inbox = await self.get_ap_inbox_from_id(requestor_actor_id, actor_id)
1291 1258
1292 item_elt = items[0] 1259 item_elt = items[0]
1293 item_id = item_elt["id"] 1260 item_id = item_elt["id"]
1325 ) 1292 )
1326 return 1293 return
1327 else: 1294 else:
1328 item_url = self.build_apurl(TYPE_ITEM, item_account, item_id) 1295 item_url = self.build_apurl(TYPE_ITEM, item_account, item_id)
1329 1296
1330 old_attachment_pubsub_items = await self.host.memory.storage.search_pubsub_items({ 1297 old_attachment_pubsub_items = await self.host.memory.storage.search_pubsub_items(
1331 "profiles": [self.client.profile], 1298 {
1332 "services": [service], 1299 "profiles": [self.client.profile],
1333 "nodes": [node], 1300 "services": [service],
1334 "names": [item_elt["id"]] 1301 "nodes": [node],
1335 }) 1302 "names": [item_elt["id"]],
1303 }
1304 )
1336 if not old_attachment_pubsub_items: 1305 if not old_attachment_pubsub_items:
1337 old_attachment = {} 1306 old_attachment = {}
1338 else: 1307 else:
1339 old_attachment_items = [i.data for i in old_attachment_pubsub_items] 1308 old_attachment_items = [i.data for i in old_attachment_pubsub_items]
1340 old_attachments = self._pa.items_2_attachment_data( 1309 old_attachments = self._pa.items_2_attachment_data(
1343 try: 1312 try:
1344 old_attachment = old_attachments[0] 1313 old_attachment = old_attachments[0]
1345 except IndexError: 1314 except IndexError:
1346 # no known element was present in attachments 1315 # no known element was present in attachments
1347 old_attachment = {} 1316 old_attachment = {}
1348 publisher_account = await self.get_ap_account_from_jid_and_node( 1317 publisher_account = await self.get_ap_account_from_jid_and_node(publisher, None)
1349 publisher,
1350 None
1351 )
1352 publisher_actor_id = self.build_apurl(TYPE_ACTOR, publisher_account) 1318 publisher_actor_id = self.build_apurl(TYPE_ACTOR, publisher_account)
1353 try: 1319 try:
1354 attachments = self._pa.items_2_attachment_data(client, [item_elt])[0] 1320 attachments = self._pa.items_2_attachment_data(client, [item_elt])[0]
1355 except IndexError: 1321 except IndexError:
1356 # no known element was present in attachments 1322 # no known element was present in attachments
1388 for reaction in reactions: 1354 for reaction in reactions:
1389 activity_id = self.build_apurl( 1355 activity_id = self.build_apurl(
1390 "reaction", item_account, item_id, reaction.encode().hex() 1356 "reaction", item_account, item_id, reaction.encode().hex()
1391 ) 1357 )
1392 reaction_activity = self.create_activity( 1358 reaction_activity = self.create_activity(
1393 TYPE_REACTION, publisher_actor_id, item_url, 1359 TYPE_REACTION, publisher_actor_id, item_url, activity_id=activity_id
1394 activity_id=activity_id
1395 ) 1360 )
1396 reaction_activity["content"] = reaction 1361 reaction_activity["content"] = reaction
1397 reaction_activity["to"] = [ap_account] 1362 reaction_activity["to"] = [ap_account]
1398 reaction_activity["cc"] = [NS_AP_PUBLIC] 1363 reaction_activity["cc"] = [NS_AP_PUBLIC]
1399 if undo: 1364 if undo:
1408 if "rsvp" in attachments: 1373 if "rsvp" in attachments:
1409 attending = attachments["rsvp"].get("attending", "no") 1374 attending = attachments["rsvp"].get("attending", "no")
1410 old_attending = old_attachment.get("rsvp", {}).get("attending", "no") 1375 old_attending = old_attachment.get("rsvp", {}).get("attending", "no")
1411 if attending != old_attending: 1376 if attending != old_attending:
1412 activity_type = TYPE_JOIN if attending == "yes" else TYPE_LEAVE 1377 activity_type = TYPE_JOIN if attending == "yes" else TYPE_LEAVE
1413 activity_id = self.build_apurl(activity_type.lower(), item_account, item_id) 1378 activity_id = self.build_apurl(
1379 activity_type.lower(), item_account, item_id
1380 )
1414 activity = self.create_activity( 1381 activity = self.create_activity(
1415 activity_type, publisher_actor_id, item_url, activity_id=activity_id 1382 activity_type, publisher_actor_id, item_url, activity_id=activity_id
1416 ) 1383 )
1417 activity["to"] = [ap_account] 1384 activity["to"] = [ap_account]
1418 activity["cc"] = [NS_AP_PUBLIC] 1385 activity["cc"] = [NS_AP_PUBLIC]
1419 await self.ap_post(inbox, publisher_actor_id, activity) 1386 await self.ap_post(inbox, publisher_actor_id, activity)
1420 else: 1387 else:
1421 if "rsvp" in old_attachment: 1388 if "rsvp" in old_attachment:
1422 old_attending = old_attachment.get("rsvp", {}).get("attending", "no") 1389 old_attending = old_attachment.get("rsvp", {}).get("attending", "no")
1423 if old_attending == "yes": 1390 if old_attending == "yes":
1424 activity_id = self.build_apurl(TYPE_LEAVE.lower(), item_account, item_id) 1391 activity_id = self.build_apurl(
1392 TYPE_LEAVE.lower(), item_account, item_id
1393 )
1425 activity = self.create_activity( 1394 activity = self.create_activity(
1426 TYPE_LEAVE, publisher_actor_id, item_url, activity_id=activity_id 1395 TYPE_LEAVE, publisher_actor_id, item_url, activity_id=activity_id
1427 ) 1396 )
1428 activity["to"] = [ap_account] 1397 activity["to"] = [ap_account]
1429 activity["cc"] = [NS_AP_PUBLIC] 1398 activity["cc"] = [NS_AP_PUBLIC]
1434 log.debug("storing attachments item in cache") 1403 log.debug("storing attachments item in cache")
1435 cached_node = await self.host.memory.storage.get_pubsub_node( 1404 cached_node = await self.host.memory.storage.get_pubsub_node(
1436 client, service, node, with_subscriptions=True, create=True 1405 client, service, node, with_subscriptions=True, create=True
1437 ) 1406 )
1438 await self.host.memory.storage.cache_pubsub_items( 1407 await self.host.memory.storage.cache_pubsub_items(
1439 self.client, 1408 self.client, cached_node, [item_elt], [attachments]
1440 cached_node,
1441 [item_elt],
1442 [attachments]
1443 ) 1409 )
1444 1410
1445 def _publish_message(self, mess_data_s: str, service_s: str, profile: str): 1411 def _publish_message(self, mess_data_s: str, service_s: str, profile: str):
1446 mess_data: dict = data_format.deserialise(mess_data_s) # type: ignore 1412 mess_data: dict = data_format.deserialise(mess_data_s) # type: ignore
1447 service = jid.JID(service_s) 1413 service = jid.JID(service_s)
1448 client = self.host.get_client(profile) 1414 client = self.host.get_client(profile)
1449 return defer.ensureDeferred(self.publish_message(client, mess_data, service)) 1415 return defer.ensureDeferred(self.publish_message(client, mess_data, service))
1450 1416
1451 @async_lru(maxsize=LRU_MAX_SIZE) 1417 @async_lru(maxsize=LRU_MAX_SIZE)
1461 """ 1427 """
1462 if account.count("@") != 1 or "/" in account: 1428 if account.count("@") != 1 or "/" in account:
1463 raise ValueError(f"Invalid account: {account!r}") 1429 raise ValueError(f"Invalid account: {account!r}")
1464 host = account.split("@")[1] 1430 host = account.split("@")[1]
1465 try: 1431 try:
1466 finger_data = await treq.json_content(await treq.get( 1432 finger_data = await treq.json_content(
1467 f"https://{host}/.well-known/webfinger?" 1433 await treq.get(
1468 f"resource=acct:{parse.quote_plus(account)}", 1434 f"https://{host}/.well-known/webfinger?"
1469 )) 1435 f"resource=acct:{parse.quote_plus(account)}",
1436 )
1437 )
1470 except Exception as e: 1438 except Exception as e:
1471 raise exceptions.DataError(f"Can't get webfinger data for {account!r}: {e}") 1439 raise exceptions.DataError(f"Can't get webfinger data for {account!r}: {e}")
1472 for link in finger_data.get("links", []): 1440 for link in finger_data.get("links", []):
1473 if ( 1441 if (
1474 link.get("type") == "application/activity+json" 1442 link.get("type") == "application/activity+json"
1479 raise ValueError( 1447 raise ValueError(
1480 f"Invalid webfinger data for {account:r}: missing href" 1448 f"Invalid webfinger data for {account:r}: missing href"
1481 ) 1449 )
1482 break 1450 break
1483 else: 1451 else:
1484 raise ValueError( 1452 raise ValueError(f"No ActivityPub link found for {account!r}")
1485 f"No ActivityPub link found for {account!r}"
1486 )
1487 return href 1453 return href
1488 1454
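Note: the method above resolves an AP handle to an actor ID through webfinger, using treq inside Twisted. An equivalent synchronous, standard-library-only sketch (network access and the example handle are purely illustrative):

    import json
    from urllib import parse, request

    def actor_id_from_account(account: str) -> str:
        if account.count("@") != 1 or "/" in account:
            raise ValueError(f"Invalid account: {account!r}")
        host = account.split("@")[1]
        url = (
            f"https://{host}/.well-known/webfinger?"
            f"resource=acct:{parse.quote_plus(account)}"
        )
        with request.urlopen(url) as resp:
            finger_data = json.load(resp)
        for link in finger_data.get("links", []):
            if link.get("type") == "application/activity+json" and link.get("href"):
                return link["href"]
        raise ValueError(f"No ActivityPub link found for {account!r}")

    # e.g. actor_id_from_account("user@mastodon.example") would return the
    # actor URL advertised by that server's webfinger endpoint.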
1489 async def get_ap_actor_data_from_account( 1455 async def get_ap_actor_data_from_account(
1490 self, 1456 self, requestor_actor_id: str, account: str
1491 requestor_actor_id: str,
1492 account: str
1493 ) -> dict: 1457 ) -> dict:
1494 """Retrieve ActivityPub Actor data 1458 """Retrieve ActivityPub Actor data
1495 1459
1496 @param account: ActivityPub Actor identifier 1460 @param account: ActivityPub Actor identifier
1497 """ 1461 """
1498 href = await self.get_ap_actor_id_from_account(account) 1462 href = await self.get_ap_actor_id_from_account(account)
1499 return await self.ap_get(href, requestor_actor_id) 1463 return await self.ap_get(href, requestor_actor_id)
1500 1464
1501 async def get_ap_inbox_from_id( 1465 async def get_ap_inbox_from_id(
1502 self, 1466 self, requestor_actor_id: str, actor_id: str, use_shared: bool = True
1503 requestor_actor_id: str,
1504 actor_id: str,
1505 use_shared: bool = True
1506 ) -> str: 1467 ) -> str:
1507 """Retrieve inbox of an actor_id 1468 """Retrieve inbox of an actor_id
1508 1469
1509 @param requestor_actor_id: ID of the actor doing the request. 1470 @param requestor_actor_id: ID of the actor doing the request.
1510 @param actor_id: ID of the actor from whom Inbox must be retrieved. 1471 @param actor_id: ID of the actor from whom Inbox must be retrieved.
1529 @return: AP handle 1490 @return: AP handle
1530 """ 1491 """
1531 if self.is_local_url(actor_id): 1492 if self.is_local_url(actor_id):
1532 url_type, url_args = self.parse_apurl(actor_id) 1493 url_type, url_args = self.parse_apurl(actor_id)
1533 if url_type != "actor" or not url_args: 1494 if url_type != "actor" or not url_args:
1534 raise exceptions.DataError( 1495 raise exceptions.DataError(f"invalid local actor ID: {actor_id}")
1535 f"invalid local actor ID: {actor_id}"
1536 )
1537 account = url_args[0] 1496 account = url_args[0]
1538 try: 1497 try:
1539 account_user, account_host = account.split('@') 1498 account_user, account_host = account.split("@")
1540 except ValueError: 1499 except ValueError:
1541 raise exceptions.DataError( 1500 raise exceptions.DataError(f"invalid account from url: {actor_id}")
1542 f"invalid account from url: {actor_id}"
1543 )
1544 if not account_user or account_host != self.public_url: 1501 if not account_user or account_host != self.public_url:
1545 raise exceptions.DataError( 1502 raise exceptions.DataError(
1546 f"{account!r} is not a valid local account (from {actor_id})" 1503 f"{account!r} is not a valid local account (from {actor_id})"
1547 ) 1504 )
1548 return account 1505 return account
1634 pass 1591 pass
1635 elif start_index > 5000: 1592 elif start_index > 5000:
1636 raise error.StanzaError( 1593 raise error.StanzaError(
1637 "feature-not-implemented", 1594 "feature-not-implemented",
1638 text="Maximum limit for previous_index has been reached, this limit" 1595 text="Maximum limit for previous_index has been reached, this limit"
1639 "is set to avoid DoS" 1596 "is set to avoid DoS",
1640 ) 1597 )
1641 else: 1598 else:
1642 # we'll convert "start_index" to "after_id", thus we need the item just 1599 # we'll convert "start_index" to "after_id", thus we need the item just
1643 # before "start_index" 1600 # before "start_index"
1644 previous_index = start_index - 1 1601 previous_index = start_index - 1
1663 f"missing previous page link at {current_page}: {page_data!r}" 1620 f"missing previous page link at {current_page}: {page_data!r}"
1664 ) 1621 )
1665 raise error.StanzaError( 1622 raise error.StanzaError(
1666 "service-unavailable", 1623 "service-unavailable",
1667 "Error while retrieving previous page from AP service at " 1624 "Error while retrieving previous page from AP service at "
1668 f"{current_page}" 1625 f"{current_page}",
1669 ) 1626 )
1670 1627
1671 init_page = "last" if chronological_pagination else "first" 1628 init_page = "last" if chronological_pagination else "first"
1672 page = collection.get(init_page) 1629 page = collection.get(init_page)
1673 if not page: 1630 if not page:
1696 log.debug(f"{after_id!r} not found at {page_id}, skipping") 1653 log.debug(f"{after_id!r} not found at {page_id}, skipping")
1697 else: 1654 else:
1698 found_after_id = True 1655 found_after_id = True
1699 if chronological_pagination: 1656 if chronological_pagination:
1700 start_index = retrieved_items - len(page_items) + limit_idx + 1 1657 start_index = retrieved_items - len(page_items) + limit_idx + 1
1701 page_items = page_items[limit_idx+1:] 1658 page_items = page_items[limit_idx + 1 :]
1702 else: 1659 else:
1703 start_index = count - (retrieved_items - len(page_items) + 1660 start_index = count - (
1704 limit_idx + 1) 1661 retrieved_items - len(page_items) + limit_idx + 1
1662 )
1705 page_items = page_items[:limit_idx] 1663 page_items = page_items[:limit_idx]
1706 items.extend(page_items) 1664 items.extend(page_items)
1707 else: 1665 else:
1708 items.extend(page_items) 1666 items.extend(page_items)
1709 if max_items is not None and len(items) >= max_items: 1667 if max_items is not None and len(items) >= max_items:
1728 log.warning("Can't determine index of first element") 1686 log.warning("Can't determine index of first element")
1729 elif chronological_pagination: 1687 elif chronological_pagination:
1730 rsm_resp["index"] = 0 1688 rsm_resp["index"] = 0
1731 else: 1689 else:
1732 rsm_resp["index"] = count - len(items) 1690 rsm_resp["index"] = count - len(items)
1733 rsm_resp.update({ 1691 rsm_resp.update({"first": items[0]["id"], "last": items[-1]["id"]})
1734 "first": items[0]["id"],
1735 "last": items[-1]["id"]
1736 })
1737 1692
1738 return items, rsm.RSMResponse(**rsm_resp) 1693 return items, rsm.RSMResponse(**rsm_resp)
1739 1694
1740 async def ap_item_2_mb_data_and_elt(self, requestor_actor_id: str, ap_item: dict) -> tuple[dict, domish.Element]: 1695 async def ap_item_2_mb_data_and_elt(
1696 self, requestor_actor_id: str, ap_item: dict
1697 ) -> tuple[dict, domish.Element]:
1741 """Convert AP item to parsed microblog data and corresponding item element 1698 """Convert AP item to parsed microblog data and corresponding item element
1742 1699
1743 @param requestor_actor_id: ID of the actor requesting the conversion. 1700 @param requestor_actor_id: ID of the actor requesting the conversion.
1744 @param ap_item: AP item to convert. 1701 @param ap_item: AP item to convert.
1745 @return: microblog data and corresponding <item> element. 1702 @return: microblog data and corresponding <item> element.
1752 item_elt["publisher"] = mb_data["extra"]["repeated"]["by"] 1709 item_elt["publisher"] = mb_data["extra"]["repeated"]["by"]
1753 else: 1710 else:
1754 item_elt["publisher"] = mb_data["author_jid"] 1711 item_elt["publisher"] = mb_data["author_jid"]
1755 return mb_data, item_elt 1712 return mb_data, item_elt
1756 1713
1757 async def ap_item_2_mb_elt(self, requestor_actor_id: str, ap_item: dict) -> domish.Element: 1714 async def ap_item_2_mb_elt(
1715 self, requestor_actor_id: str, ap_item: dict
1716 ) -> domish.Element:
1758 """Convert AP item to XMPP item element 1717 """Convert AP item to XMPP item element
1759 1718
1760 @param requestor_actor_id: ID of the actor requesting the conversion. 1719 @param requestor_actor_id: ID of the actor requesting the conversion.
1761 @param ap_item: AP item to convert. 1720 @param ap_item: AP item to convert.
1762 @return: <item> element 1721 @return: <item> element
1767 async def parse_ap_page( 1726 async def parse_ap_page(
1768 self, 1727 self,
1769 requestor_actor_id: str, 1728 requestor_actor_id: str,
1770 page: Union[str, dict], 1729 page: Union[str, dict],
1771 parser: Callable[[str, dict], Awaitable[domish.Element]], 1730 parser: Callable[[str, dict], Awaitable[domish.Element]],
1772 only_ids: bool = False 1731 only_ids: bool = False,
1773 ) -> Tuple[dict, List[domish.Element]]: 1732 ) -> Tuple[dict, List[domish.Element]]:
1774 """Convert AP objects from an AP page to XMPP items 1733 """Convert AP objects from an AP page to XMPP items
1775 1734
1776 @param requestor_actor_id: ID of the actor doing the request. 1735 @param requestor_actor_id: ID of the actor doing the request.
1777 @param page: can be either a URL linking to an AP page, or the page data directly 1736 @param page: can be either a URL linking to an AP page, or the page data directly
1779 @param only_ids: if True, only retrieve items IDs 1738 @param only_ids: if True, only retrieve items IDs
1780 @return: page data, pubsub items 1739 @return: page data, pubsub items
1781 """ 1740 """
1782 page_data = await self.ap_get_object(requestor_actor_id, page) 1741 page_data = await self.ap_get_object(requestor_actor_id, page)
1783 if page_data is None: 1742 if page_data is None:
1784 log.warning('No data found in collection') 1743 log.warning("No data found in collection")
1785 return {}, [] 1744 return {}, []
1786 ap_items = await self.ap_get_list(requestor_actor_id, page_data, "orderedItems", only_ids=only_ids) 1745 ap_items = await self.ap_get_list(
1746 requestor_actor_id, page_data, "orderedItems", only_ids=only_ids
1747 )
1787 if ap_items is None: 1748 if ap_items is None:
1788 ap_items = await self.ap_get_list(requestor_actor_id, page_data, "items", only_ids=only_ids) 1749 ap_items = await self.ap_get_list(
1750 requestor_actor_id, page_data, "items", only_ids=only_ids
1751 )
1789 if not ap_items: 1752 if not ap_items:
1790 log.warning(f'No item field found in collection: {page_data!r}') 1753 log.warning(f"No item field found in collection: {page_data!r}")
1791 return page_data, [] 1754 return page_data, []
1792 else: 1755 else:
1793 log.warning( 1756 log.warning("Items are not ordered, this is not spec compliant")
1794 "Items are not ordered, this is not spec compliant"
1795 )
1796 items = [] 1757 items = []
1797 # AP Collections are in antichronological order, but we expect chronological in 1758 # AP Collections are in antichronological order, but we expect chronological in
1798 # Pubsub, thus we reverse it 1759 # Pubsub, thus we reverse it
1799 for ap_item in reversed(ap_items): 1760 for ap_item in reversed(ap_items):
1800 try: 1761 try:
1803 continue 1764 continue
1804 1765
1805 return page_data, items 1766 return page_data, items
1806 1767
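As an aside, the page-parsing logic above can be illustrated outside the gateway: an AP collection page normally carries its objects in "orderedItems" (falling back to "items" for non-ordered collections), and since AP collections are newest-first while Pubsub expects oldest-first, the list is reversed before conversion. The following standalone sketch mirrors that flow on an already-fetched dict; parse_page_sketch and its convert callback are illustrative stand-ins, not the plugin's API.

from typing import Any, Callable, Dict, List, Tuple

def parse_page_sketch(
    page_data: Dict[str, Any],
    convert: Callable[[Dict[str, Any]], Any],
) -> Tuple[Dict[str, Any], List[Any]]:
    """Minimal stand-in for parse_ap_page, working on already fetched data."""
    ap_items = page_data.get("orderedItems")
    if ap_items is None:
        # non-ordered collections use "items"; ordering is then not guaranteed
        ap_items = page_data.get("items", [])
    items = []
    # AP pages are newest first, Pubsub expects oldest first, hence reversed()
    for ap_item in reversed(ap_items):
        try:
            items.append(convert(ap_item))
        except Exception:
            continue
    return page_data, items

page = {
    "orderedItems": [
        {"id": "https://example.org/objects/2"},
        {"id": "https://example.org/objects/1"},
    ]
}
__, ids = parse_page_sketch(page, lambda o: o["id"])
print(ids)  # ['https://example.org/objects/1', 'https://example.org/objects/2']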
1807 async def get_comments_nodes( 1768 async def get_comments_nodes(
1808 self, 1769 self, requestor_actor_id: str, item_id: str, parent_id: Optional[str]
1809 requestor_actor_id: str,
1810 item_id: str,
1811 parent_id: Optional[str]
1812 ) -> Tuple[Optional[str], Optional[str]]: 1770 ) -> Tuple[Optional[str], Optional[str]]:
1813 """Get node where this item is and node to use for comments 1771 """Get node where this item is and node to use for comments
1814 1772
1815 if config option "comments_max_depth" is set, a common node will be used below the 1773 if config option "comments_max_depth" is set, a common node will be used below the
1816 given depth 1774 given depth
1825 "comments_max_depth") 1783 "comments_max_depth")
1826 """ 1784 """
1827 if parent_id is None or not self.comments_max_depth: 1785 if parent_id is None or not self.comments_max_depth:
1828 return ( 1786 return (
1829 self._m.get_comments_node(parent_id) if parent_id is not None else None, 1787 self._m.get_comments_node(parent_id) if parent_id is not None else None,
1830 self._m.get_comments_node(item_id) 1788 self._m.get_comments_node(item_id),
1831 ) 1789 )
1832 parent_url = parent_id 1790 parent_url = parent_id
1833 parents = [] 1791 parents = []
1834 for __ in range(COMMENTS_MAX_PARENTS): 1792 for __ in range(COMMENTS_MAX_PARENTS):
1835 parent_item = await self.ap_get(parent_url, requestor_actor_id) 1793 parent_item = await self.ap_get(parent_url, requestor_actor_id)
1836 parents.insert(0, parent_item) 1794 parents.insert(0, parent_item)
1837 parent_url = parent_item.get("inReplyTo") 1795 parent_url = parent_item.get("inReplyTo")
1838 if parent_url is None: 1796 if parent_url is None:
1839 break 1797 break
1840 parent_limit = self.comments_max_depth-1 1798 parent_limit = self.comments_max_depth - 1
1841 if len(parents) <= parent_limit: 1799 if len(parents) <= parent_limit:
1842 return ( 1800 return (
1843 self._m.get_comments_node(parents[-1]["id"]), 1801 self._m.get_comments_node(parents[-1]["id"]),
1844 self._m.get_comments_node(item_id) 1802 self._m.get_comments_node(item_id),
1845 ) 1803 )
1846 else: 1804 else:
1847 last_level_item = parents[parent_limit] 1805 last_level_item = parents[parent_limit]
1848 return ( 1806 return (self._m.get_comments_node(last_level_item["id"]), None)
1849 self._m.get_comments_node(last_level_item["id"]),
1850 None
1851 )
1852 1807
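The depth-capping behaviour of get_comments_nodes can be sketched independently: parents are collected by following inReplyTo (up to COMMENTS_MAX_PARENTS), and once the chain exceeds comments_max_depth - 1 ancestors, deeper replies share the comment node of the item sitting at that limit instead of getting their own. This toy version works on an in-memory parent chain; comments_node and pick_comment_nodes are illustrative names only.

from typing import List, Optional, Tuple

def comments_node(item_id: str) -> str:
    # stand-in for self._m.get_comments_node()
    return f"urn:xmpp:microblog:0:comments/{item_id}"

def pick_comment_nodes(
    item_id: str, parents: List[str], max_depth: int
) -> Tuple[Optional[str], Optional[str]]:
    """parents: ancestor item ids, oldest first (thread root at index 0)."""
    if not parents or not max_depth:
        return (
            comments_node(parents[-1]) if parents else None,
            comments_node(item_id),
        )
    parent_limit = max_depth - 1
    if len(parents) <= parent_limit:
        # still within the configured depth: the item gets its own comments node
        return comments_node(parents[-1]), comments_node(item_id)
    # too deep: replies are grouped on the node of the item at the depth limit
    return comments_node(parents[parent_limit]), None

print(pick_comment_nodes("reply-3", ["root", "reply-1", "reply-2"], max_depth=2))
# -> ('urn:xmpp:microblog:0:comments/reply-1', None)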
1853 async def ap_item_2_mb_data(self, requestor_actor_id: str, ap_item: dict) -> dict: 1808 async def ap_item_2_mb_data(self, requestor_actor_id: str, ap_item: dict) -> dict:
1854 """Convert AP activity or object to microblog data 1809 """Convert AP activity or object to microblog data
1855 1810
1856 @param requestor_actor_id: ID of the actor doing the request. 1811 @param requestor_actor_id: ID of the actor doing the request.
1923 1878
1924 # author 1879 # author
1925 if is_activity: 1880 if is_activity:
1926 authors = await self.ap_get_actors(requestor_actor_id, ap_item, "actor") 1881 authors = await self.ap_get_actors(requestor_actor_id, ap_item, "actor")
1927 else: 1882 else:
1928 authors = await self.ap_get_actors(requestor_actor_id, ap_object, "attributedTo") 1883 authors = await self.ap_get_actors(
1884 requestor_actor_id, ap_object, "attributedTo"
1885 )
1929 if len(authors) > 1: 1886 if len(authors) > 1:
1930 # we only keep first item as author 1887 # we only keep first item as author
1931 # TODO: handle multiple actors 1888 # TODO: handle multiple actors
1932 log.warning("multiple actors are not managed") 1889 log.warning("multiple actors are not managed")
1933 1890
1961 ) 1918 )
1962 if comments_node is not None: 1919 if comments_node is not None:
1963 comments_data = { 1920 comments_data = {
1964 "service": author_jid, 1921 "service": author_jid,
1965 "node": comments_node, 1922 "node": comments_node,
1966 "uri": uri.build_xmpp_uri( 1923 "uri": uri.build_xmpp_uri("pubsub", path=author_jid, node=comments_node),
1967 "pubsub",
1968 path=author_jid,
1969 node=comments_node
1970 )
1971 } 1924 }
1972 mb_data["comments"] = [comments_data] 1925 mb_data["comments"] = [comments_data]
1973 1926
1974 return mb_data 1927 return mb_data
1975 1928
1976 async def get_reply_to_id_from_xmpp_node( 1929 async def get_reply_to_id_from_xmpp_node(
1977 self, 1930 self, client: SatXMPPEntity, ap_account: str, parent_item: str, mb_data: dict
1978 client: SatXMPPEntity,
1979 ap_account: str,
1980 parent_item: str,
1981 mb_data: dict
1982 ) -> str: 1931 ) -> str:
1983 """Get URL to use for ``inReplyTo`` field in AP item. 1932 """Get URL to use for ``inReplyTo`` field in AP item.
1984 1933
1985 There is currently no way to know the parent service of a comment with XEP-0277. 1934 There is currently no way to know the parent service of a comment with XEP-0277.
1986 To work around that, we try to check if we have this item in the cache (we 1935 To work around that, we try to check if we have this item in the cache (we
1993 @param mb_data: microblog data of the publication 1942 @param mb_data: microblog data of the publication
1994 @return: URL to use in ``inReplyTo`` field 1943 @return: URL to use in ``inReplyTo`` field
1995 """ 1944 """
1996 # FIXME: propose a protoXEP to properly get parent item, node and service 1945 # FIXME: propose a protoXEP to properly get parent item, node and service
1997 1946
1998 found_items = await self.host.memory.storage.search_pubsub_items({ 1947 found_items = await self.host.memory.storage.search_pubsub_items(
1999 "profiles": [client.profile], 1948 {"profiles": [client.profile], "names": [parent_item]}
2000 "names": [parent_item] 1949 )
2001 })
2002 if not found_items: 1950 if not found_items:
2003 log.warning(f"parent item {parent_item!r} not found in cache") 1951 log.warning(f"parent item {parent_item!r} not found in cache")
2004 parent_ap_account = ap_account 1952 parent_ap_account = ap_account
2005 elif len(found_items) == 1: 1953 elif len(found_items) == 1:
2006 cached_node = found_items[0].node 1954 cached_node = found_items[0].node
2007 parent_ap_account = await self.get_ap_account_from_jid_and_node( 1955 parent_ap_account = await self.get_ap_account_from_jid_and_node(
2008 cached_node.service, 1956 cached_node.service, cached_node.name
2009 cached_node.name
2010 ) 1957 )
2011 else: 1958 else:
2012 # we found several cached items with the given ID, we check if there is one 1959 # we found several cached items with the given ID, we check if there is one
2013 # corresponding to this author 1960 # corresponding to this author
2014 try: 1961 try:
2015 author = jid.JID(mb_data["author_jid"]).userhostJID() 1962 author = jid.JID(mb_data["author_jid"]).userhostJID()
2016 cached_item = next( 1963 cached_item = next(
2017 i for i in found_items 1964 i
2018 if jid.JID(i.data["publisher"]).userhostJID() 1965 for i in found_items
2019 == author 1966 if jid.JID(i.data["publisher"]).userhostJID() == author
2020 ) 1967 )
2021 except StopIteration: 1968 except StopIteration:
2022 # no item corresponding to this author, we use ap_account 1969 # no item corresponding to this author, we use ap_account
2023 log.warning( 1970 log.warning(
2024 "Can't find a single cached item for parent item " 1971 "Can't find a single cached item for parent item " f"{parent_item!r}"
2025 f"{parent_item!r}"
2026 ) 1972 )
2027 parent_ap_account = ap_account 1973 parent_ap_account = ap_account
2028 else: 1974 else:
2029 cached_node = cached_item.node 1975 cached_node = cached_item.node
2030 parent_ap_account = await self.get_ap_account_from_jid_and_node( 1976 parent_ap_account = await self.get_ap_account_from_jid_and_node(
2031 cached_node.service, 1977 cached_node.service, cached_node.name
2032 cached_node.name 1978 )
2033 ) 1979
2034 1980 return self.build_apurl(TYPE_ITEM, parent_ap_account, parent_item)
2035 return self.build_apurl( 1981
2036 TYPE_ITEM, parent_ap_account, parent_item 1982 async def repeated_mb_2_ap_item(self, mb_data: dict) -> dict:
2037 )
2038
2039 async def repeated_mb_2_ap_item(
2040 self,
2041 mb_data: dict
2042 ) -> dict:
2043 """Convert repeated blog item to suitable AP Announce activity 1983 """Convert repeated blog item to suitable AP Announce activity
2044 1984
2045 @param mb_data: microblog metadata of an item repeating another blog post 1985 @param mb_data: microblog metadata of an item repeating another blog post
2046 @return: Announce activity linking to the repeated item 1986 @return: Announce activity linking to the repeated item
2047 """ 1987 """
2048 repeated = mb_data["extra"]["repeated"] 1988 repeated = mb_data["extra"]["repeated"]
2049 repeater = jid.JID(repeated["by"]) 1989 repeater = jid.JID(repeated["by"])
2050 repeater_account = await self.get_ap_account_from_jid_and_node( 1990 repeater_account = await self.get_ap_account_from_jid_and_node(repeater, None)
2051 repeater,
2052 None
2053 )
2054 repeater_id = self.build_apurl(TYPE_ACTOR, repeater_account) 1991 repeater_id = self.build_apurl(TYPE_ACTOR, repeater_account)
2055 repeated_uri = repeated["uri"] 1992 repeated_uri = repeated["uri"]
2056 1993
2057 if not repeated_uri.startswith("xmpp:"): 1994 if not repeated_uri.startswith("xmpp:"):
2058 log.warning( 1995 log.warning(
2093 "Announce", repeater_id, announced_uri, activity_id=activity_id 2030 "Announce", repeater_id, announced_uri, activity_id=activity_id
2094 ) 2031 )
2095 announce["to"] = [NS_AP_PUBLIC] 2032 announce["to"] = [NS_AP_PUBLIC]
2096 announce["cc"] = [ 2033 announce["cc"] = [
2097 self.build_apurl(TYPE_FOLLOWERS, repeater_account), 2034 self.build_apurl(TYPE_FOLLOWERS, repeater_account),
2098 await self.get_ap_actor_id_from_account(repeated_account) 2035 await self.get_ap_actor_id_from_account(repeated_account),
2099 ] 2036 ]
2100 return announce 2037 return announce
2101 2038
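For reference, the Announce activity built here ends up shaped roughly as below: public addressing in "to", plus the repeater's followers collection and the original author in "cc". The URLs are made-up placeholders and build_announce is only a local stand-in for create_activity, not the plugin's method.

NS_AP_PUBLIC = "https://www.w3.org/ns/activitystreams#Public"

def build_announce(repeater_id: str, announced_uri: str,
                   followers_url: str, original_author_id: str) -> dict:
    """Rough shape of the Announce produced by repeated_mb_2_ap_item (sketch)."""
    return {
        "@context": "https://www.w3.org/ns/activitystreams",
        "type": "Announce",
        "actor": repeater_id,
        "object": announced_uri,   # URL of the repeated item
        "to": [NS_AP_PUBLIC],      # repeats are public
        "cc": [followers_url, original_author_id],
    }

print(build_announce(
    "https://ap.example.org/actor/alice%40xmpp.example",
    "https://ap.example.org/item/bob%40xmpp.example/1234",
    "https://ap.example.org/followers/alice%40xmpp.example",
    "https://social.example/users/bob",
))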
2102 async def mb_data_2_ap_item( 2039 async def mb_data_2_ap_item(
2103 self, 2040 self,
2104 client: SatXMPPEntity, 2041 client: SatXMPPEntity,
2105 mb_data: dict, 2042 mb_data: dict,
2106 public: bool =True, 2043 public: bool = True,
2107 is_new: bool = True, 2044 is_new: bool = True,
2108 ) -> dict: 2045 ) -> dict:
2109 """Convert Libervia Microblog Data to ActivityPub item 2046 """Convert Libervia Microblog Data to ActivityPub item
2110 2047
2111 @param mb_data: microblog data (as used in plugin XEP-0277) to convert 2048 @param mb_data: microblog data (as used in plugin XEP-0277) to convert
2129 if not mb_data.get("id"): 2066 if not mb_data.get("id"):
2130 mb_data["id"] = shortuuid.uuid() 2067 mb_data["id"] = shortuuid.uuid()
2131 if not mb_data.get("author_jid"): 2068 if not mb_data.get("author_jid"):
2132 mb_data["author_jid"] = client.jid.userhost() 2069 mb_data["author_jid"] = client.jid.userhost()
2133 ap_account = await self.get_ap_account_from_jid_and_node( 2070 ap_account = await self.get_ap_account_from_jid_and_node(
2134 jid.JID(mb_data["author_jid"]), 2071 jid.JID(mb_data["author_jid"]), None
2135 None
2136 ) 2072 )
2137 url_actor = self.build_apurl(TYPE_ACTOR, ap_account) 2073 url_actor = self.build_apurl(TYPE_ACTOR, ap_account)
2138 url_item = self.build_apurl(TYPE_ITEM, ap_account, mb_data["id"]) 2074 url_item = self.build_apurl(TYPE_ITEM, ap_account, mb_data["id"])
2139 ap_object = { 2075 ap_object = {
2140 "id": url_item, 2076 "id": url_item,
2151 attachments = extra.get(C.KEY_ATTACHMENTS) 2087 attachments = extra.get(C.KEY_ATTACHMENTS)
2152 if attachments: 2088 if attachments:
2153 ap_attachments = ap_object["attachment"] = [] 2089 ap_attachments = ap_object["attachment"] = []
2154 for attachment in attachments: 2090 for attachment in attachments:
2155 try: 2091 try:
2156 url = next( 2092 url = next(s["url"] for s in attachment["sources"] if "url" in s)
2157 s['url'] for s in attachment["sources"] if 'url' in s
2158 )
2159 except (StopIteration, KeyError): 2093 except (StopIteration, KeyError):
2160 log.warning( 2094 log.warning(f"Ignoring attachment without URL: {attachment}")
2161 f"Ignoring attachment without URL: {attachment}"
2162 )
2163 continue 2095 continue
2164 ap_attachment = { 2096 ap_attachment = {"url": url}
2165 "url": url
2166 }
2167 for key, ap_key in ( 2097 for key, ap_key in (
2168 ("media_type", "mediaType"), 2098 ("media_type", "mediaType"),
2169 # XXX: yes "name", cf. [ap_item_2_mb_data] 2099 # XXX: yes "name", cf. [ap_item_2_mb_data]
2170 ("desc", "name"), 2100 ("desc", "name"),
2171 ): 2101 ):
2189 mentioned_id = await self.get_ap_actor_id_from_account(mentioned) 2119 mentioned_id = await self.get_ap_actor_id_from_account(mentioned)
2190 except Exception as e: 2120 except Exception as e:
2191 log.warning(f"Can't add mention to {mentioned!r}: {e}") 2121 log.warning(f"Can't add mention to {mentioned!r}: {e}")
2192 else: 2122 else:
2193 ap_object["to"].append(mentioned_id) 2123 ap_object["to"].append(mentioned_id)
2194 ap_object.setdefault("tag", []).append({ 2124 ap_object.setdefault("tag", []).append(
2195 "type": TYPE_MENTION, 2125 {
2196 "href": mentioned_id, 2126 "type": TYPE_MENTION,
2197 "name": mention, 2127 "href": mentioned_id,
2198 }) 2128 "name": mention,
2129 }
2130 )
2199 try: 2131 try:
2200 node = mb_data["node"] 2132 node = mb_data["node"]
2201 service = jid.JID(mb_data["service"]) 2133 service = jid.JID(mb_data["service"])
2202 except KeyError: 2134 except KeyError:
2203 # node and service must always be specified when this method is used 2135 # node and service must always be specified when this method is used
2204 raise exceptions.InternalError( 2136 raise exceptions.InternalError("node or service is missing in mb_data")
2205 "node or service is missing in mb_data"
2206 )
2207 try: 2137 try:
2208 target_ap_account = await self.get_ap_account_from_jid_and_node( 2138 target_ap_account = await self.get_ap_account_from_jid_and_node(
2209 service, node 2139 service, node
2210 ) 2140 )
2211 except MissingLocalPartError: 2141 except MissingLocalPartError:
2224 service, node 2154 service, node
2225 ) 2155 )
2226 if self.is_virtual_jid(service): 2156 if self.is_virtual_jid(service):
2227 # service is a proxy JID for AP account 2157 # service is a proxy JID for AP account
2228 actor_data = await self.get_ap_actor_data_from_account( 2158 actor_data = await self.get_ap_actor_data_from_account(
2229 url_actor, 2159 url_actor, target_ap_account
2230 target_ap_account
2231 ) 2160 )
2232 followers = actor_data.get("followers") 2161 followers = actor_data.get("followers")
2233 else: 2162 else:
2234 # service is a real XMPP entity 2163 # service is a real XMPP entity
2235 followers = self.build_apurl(TYPE_FOLLOWERS, target_ap_account) 2164 followers = self.build_apurl(TYPE_FOLLOWERS, target_ap_account)
2242 # this gateway and linking to an ActivityPub actor) 2171 # this gateway and linking to an ActivityPub actor)
2243 ap_object["inReplyTo"] = parent_item 2172 ap_object["inReplyTo"] = parent_item
2244 else: 2173 else:
2245 # the publication is from a followed real XMPP node 2174 # the publication is from a followed real XMPP node
2246 ap_object["inReplyTo"] = await self.get_reply_to_id_from_xmpp_node( 2175 ap_object["inReplyTo"] = await self.get_reply_to_id_from_xmpp_node(
2247 client, 2176 client, ap_account, parent_item, mb_data
2248 ap_account,
2249 parent_item,
2250 mb_data
2251 ) 2177 )
2252 2178
2253 return self.create_activity( 2179 return self.create_activity(
2254 "Create" if is_new else "Update", url_actor, ap_object, activity_id=url_item 2180 "Create" if is_new else "Update", url_actor, ap_object, activity_id=url_item
2255 ) 2181 )
2256 2182
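Putting the pieces above together, the resulting payload is conceptually a Create (or Update) activity wrapping an object that carries addressing, trimmed-down attachments and Mention tags. The sketch below builds that shape by hand with invented URLs; it is only meant to visualise the structure, not to reproduce create_activity or the exact fields the gateway emits.

NS_AP_PUBLIC = "https://www.w3.org/ns/activitystreams#Public"

url_actor = "https://ap.example.org/actor/alice%40xmpp.example"
url_item = "https://ap.example.org/item/alice%40xmpp.example/some-item-id"

ap_object = {
    "id": url_item,
    "attributedTo": url_actor,
    "content": 'Hello <a href="https://social.example/@bob">@bob</a>!',
    "to": [NS_AP_PUBLIC, "https://social.example/users/bob"],
    # attachments keep only url / mediaType / name, as in the loop above
    "attachment": [{"url": "https://files.example/pic.png",
                    "mediaType": "image/png",
                    "name": "a picture"}],
    # one Mention tag per @mention found in the text
    "tag": [{"type": "Mention",
             "href": "https://social.example/users/bob",
             "name": "@bob@social.example"}],
}

activity = {
    "@context": "https://www.w3.org/ns/activitystreams",
    "id": url_item,     # the activity reuses the item URL as its id
    "type": "Create",   # "Update" when editing an existing item
    "actor": url_actor,
    "to": ap_object["to"],
    "object": ap_object,
}
print(activity["type"], activity["id"])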
2257 async def publish_message( 2183 async def publish_message(
2258 self, 2184 self, client: SatXMPPEntity, mess_data: dict, service: jid.JID
2259 client: SatXMPPEntity,
2260 mess_data: dict,
2261 service: jid.JID
2262 ) -> None: 2185 ) -> None:
2263 """Send an AP message 2186 """Send an AP message
2264 2187
2265 .. note:: 2188 .. note::
2266 2189
2290 item_data = await self.mb_data_2_ap_item(client, mess_data) 2213 item_data = await self.mb_data_2_ap_item(client, mess_data)
2291 url_actor = item_data["actor"] 2214 url_actor = item_data["actor"]
2292 await self.ap_post(inbox_url, url_actor, item_data) 2215 await self.ap_post(inbox_url, url_actor, item_data)
2293 2216
2294 async def ap_delete_item( 2217 async def ap_delete_item(
2295 self, 2218 self, jid_: jid.JID, node: Optional[str], item_id: str, public: bool = True
2296 jid_: jid.JID,
2297 node: Optional[str],
2298 item_id: str,
2299 public: bool = True
2300 ) -> Tuple[str, Dict[str, Any]]: 2219 ) -> Tuple[str, Dict[str, Any]]:
2301 """Build activity to delete an AP item 2220 """Build activity to delete an AP item
2302 2221
2303 @param jid_: JID of the entity deleting an item 2222 @param jid_: JID of the entity deleting an item
2304 @param node: node where the item is deleted 2223 @param node: node where the item is deleted
2312 node = self._m.namespace 2231 node = self._m.namespace
2313 2232
2314 author_account = await self.get_ap_account_from_jid_and_node(jid_, node) 2233 author_account = await self.get_ap_account_from_jid_and_node(jid_, node)
2315 author_actor_id = self.build_apurl(TYPE_ACTOR, author_account) 2234 author_actor_id = self.build_apurl(TYPE_ACTOR, author_account)
2316 2235
2317 items = await self.host.memory.storage.search_pubsub_items({ 2236 items = await self.host.memory.storage.search_pubsub_items(
2318 "profiles": [self.client.profile], 2237 {"profiles": [self.client.profile], "services": [jid_], "names": [item_id]}
2319 "services": [jid_], 2238 )
2320 "names": [item_id]
2321 })
2322 if not items: 2239 if not items:
2323 log.warning( 2240 log.warning(
2324 f"Deleting an unknown item at service {jid_}, node {node} and id " 2241 f"Deleting an unknown item at service {jid_}, node {node} and id "
2325 f"{item_id}" 2242 f"{item_id}"
2326 ) 2243 )
2327 else: 2244 else:
2328 try: 2245 try:
2329 mb_data = await self._m.item_2_mb_data(self.client, items[0].data, jid_, node) 2246 mb_data = await self._m.item_2_mb_data(
2247 self.client, items[0].data, jid_, node
2248 )
2330 if "repeated" in mb_data["extra"]: 2249 if "repeated" in mb_data["extra"]:
2331 # we are deleting a repeated item, we must translate this to an 2250 # we are deleting a repeated item, we must translate this to an
2332 # "Undo" of the "Announce" activity instead of a "Delete" one 2251 # "Undo" of the "Announce" activity instead of a "Delete" one
2333 announce = await self.repeated_mb_2_ap_item(mb_data) 2252 announce = await self.repeated_mb_2_ap_item(mb_data)
2334 undo = self.create_activity("Undo", author_actor_id, announce) 2253 undo = self.create_activity("Undo", author_actor_id, announce)
2339 f"{items[0].toXml()}" 2258 f"{items[0].toXml()}"
2340 ) 2259 )
2341 2260
2342 url_item = self.build_apurl(TYPE_ITEM, author_account, item_id) 2261 url_item = self.build_apurl(TYPE_ITEM, author_account, item_id)
2343 ap_item = self.create_activity( 2262 ap_item = self.create_activity(
2344 "Delete", 2263 "Delete", author_actor_id, {"id": url_item, "type": TYPE_TOMBSTONE}
2345 author_actor_id,
2346 {
2347 "id": url_item,
2348 "type": TYPE_TOMBSTONE
2349 }
2350 ) 2264 )
2351 if public: 2265 if public:
2352 ap_item["to"] = [NS_AP_PUBLIC] 2266 ap_item["to"] = [NS_AP_PUBLIC]
2353 return author_actor_id, ap_item 2267 return author_actor_id, ap_item
2354 2268
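The deletion path above boils down to two possible payloads: an Undo of the earlier Announce when a repeated item is retracted, or a Delete whose object is a Tombstone for regular items. A hand-written illustration of the Delete case, with placeholder URLs:

NS_AP_PUBLIC = "https://www.w3.org/ns/activitystreams#Public"

author_actor_id = "https://ap.example.org/actor/alice%40xmpp.example"
url_item = "https://ap.example.org/item/alice%40xmpp.example/some-item-id"

delete_activity = {
    "@context": "https://www.w3.org/ns/activitystreams",
    "type": "Delete",
    "actor": author_actor_id,
    # the deleted object is replaced by a Tombstone carrying only its id
    "object": {"id": url_item, "type": "Tombstone"},
    "to": [NS_AP_PUBLIC],  # only added when the deletion is public
}
print(delete_activity)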
2355 def _message_received_trigger( 2269 def _message_received_trigger(
2356 self, 2270 self,
2357 client: SatXMPPEntity, 2271 client: SatXMPPEntity,
2358 message_elt: domish.Element, 2272 message_elt: domish.Element,
2359 post_treat: defer.Deferred 2273 post_treat: defer.Deferred,
2360 ) -> bool: 2274 ) -> bool:
2361 """add the gateway workflow on post treatment""" 2275 """add the gateway workflow on post treatment"""
2362 if self.client is None: 2276 if self.client is None:
2363 log.debug(f"no client set, ignoring message: {message_elt.toXml()}") 2277 log.debug(f"no client set, ignoring message: {message_elt.toXml()}")
2364 return True 2278 return True
2379 return mess_data 2293 return mess_data
2380 if not self.is_local(mess_data["from"]): 2294 if not self.is_local(mess_data["from"]):
2381 log.warning(f"ignoring non local message: {mess_data}") 2295 log.warning(f"ignoring non local message: {mess_data}")
2382 return mess_data 2296 return mess_data
2383 if not mess_data["to"].user: 2297 if not mess_data["to"].user:
2384 log.warning( 2298 log.warning(f"ignoring message addressed to gateway itself: {mess_data}")
2385 f"ignoring message addressed to gateway itself: {mess_data}"
2386 )
2387 return mess_data 2299 return mess_data
2388 requestor_actor_id = self.build_apurl(TYPE_ACTOR, mess_data["from"].userhost()) 2300 requestor_actor_id = self.build_apurl(TYPE_ACTOR, mess_data["from"].userhost())
2389 2301
2390 actor_account = self._e.unescape(mess_data["to"].user) 2302 actor_account = self._e.unescape(mess_data["to"].user)
2391 try: 2303 try:
2392 actor_id = await self.get_ap_actor_id_from_account(actor_account) 2304 actor_id = await self.get_ap_actor_id_from_account(actor_account)
2393 except Exception as e: 2305 except Exception as e:
2394 log.warning( 2306 log.warning(f"Can't retrieve data on actor {actor_account}: {e}")
2395 f"Can't retrieve data on actor {actor_account}: {e}"
2396 )
2397 # TODO: send an error <message> 2307 # TODO: send an error <message>
2398 return mess_data 2308 return mess_data
2399 inbox = await self.get_ap_inbox_from_id( 2309 inbox = await self.get_ap_inbox_from_id(
2400 requestor_actor_id, actor_id, use_shared=False 2310 requestor_actor_id, actor_id, use_shared=False
2401 ) 2311 )
2415 if origin_id: 2325 if origin_id:
2416 # we need to use origin ID when present to be able to retract the message 2326 # we need to use origin ID when present to be able to retract the message
2417 mb_data["id"] = origin_id 2327 mb_data["id"] = origin_id
2418 attachments = mess_data["extra"].get(C.KEY_ATTACHMENTS) 2328 attachments = mess_data["extra"].get(C.KEY_ATTACHMENTS)
2419 if attachments: 2329 if attachments:
2420 mb_data["extra"] = { 2330 mb_data["extra"] = {C.KEY_ATTACHMENTS: attachments}
2421 C.KEY_ATTACHMENTS: attachments
2422 }
2423 2331
2424 client = self.client.get_virtual_client(mess_data["from"]) 2332 client = self.client.get_virtual_client(mess_data["from"])
2425 ap_item = await self.mb_data_2_ap_item(client, mb_data, public=False) 2333 ap_item = await self.mb_data_2_ap_item(client, mb_data, public=False)
2426 ap_object = ap_item["object"] 2334 ap_object = ap_item["object"]
2427 ap_object["to"] = ap_item["to"] = [actor_id] 2335 ap_object["to"] = ap_item["to"] = [actor_id]
2428 # we add a mention to the direct message, otherwise the peer is not notified in some AP 2336 # we add a mention to the direct message, otherwise the peer is not notified in some AP
2429 # implementations (notably Mastodon), and the message may be missed easily. 2337 # implementations (notably Mastodon), and the message may be missed easily.
2430 ap_object.setdefault("tag", []).append({ 2338 ap_object.setdefault("tag", []).append(
2431 "type": TYPE_MENTION, 2339 {
2432 "href": actor_id, 2340 "type": TYPE_MENTION,
2433 "name": f"@{actor_account}", 2341 "href": actor_id,
2434 }) 2342 "name": f"@{actor_account}",
2343 }
2344 )
2435 2345
2436 try: 2346 try:
2437 await self.ap_post(inbox, ap_item["actor"], ap_item) 2347 await self.ap_post(inbox, ap_item["actor"], ap_item)
2438 except Exception as e: 2348 except Exception as e:
2439 # TODO: send an error <message> 2349 # TODO: send an error <message>
2440 log.warning( 2350 log.warning(f"Can't send message to {inbox}: {e}")
2441 f"Can't send message to {inbox}: {e}"
2442 )
2443 return mess_data 2351 return mess_data
2444 2352
2445 async def _on_message_retract( 2353 async def _on_message_retract(
2446 self, 2354 self,
2447 client: SatXMPPEntity, 2355 client: SatXMPPEntity,
2448 message_elt: domish.Element, 2356 message_elt: domish.Element,
2449 retract_elt: domish.Element, 2357 retract_elt: domish.Element,
2450 history: History 2358 history: History,
2451 ) -> bool: 2359 ) -> bool:
2452 if client != self.client: 2360 if client != self.client:
2453 return True 2361 return True
2454 from_jid = jid.JID(message_elt["from"]) 2362 from_jid = jid.JID(message_elt["from"])
2455 if not self.is_local(from_jid): 2363 if not self.is_local(from_jid):
2456 log.debug( 2364 log.debug(f"ignoring retract request from non local jid {from_jid}")
2457 f"ignoring retract request from non local jid {from_jid}"
2458 )
2459 return False 2365 return False
2460 requestor_actor_id = self.build_apurl( 2366 requestor_actor_id = self.build_apurl(TYPE_ACTOR, from_jid.userhost())
2461 TYPE_ACTOR,
2462 from_jid.userhost()
2463 )
2464 to_jid = jid.JID(message_elt["to"]) 2367 to_jid = jid.JID(message_elt["to"])
2465 if (to_jid.host != self.client.jid.full() or not to_jid.user): 2368 if to_jid.host != self.client.jid.full() or not to_jid.user:
2466 # to_jid should be a virtual JID from this gateway 2369 # to_jid should be a virtual JID from this gateway
2467 raise exceptions.InternalError( 2370 raise exceptions.InternalError(f"Invalid destinee's JID: {to_jid.full()}")
2468 f"Invalid destinee's JID: {to_jid.full()}"
2469 )
2470 ap_account = self._e.unescape(to_jid.user) 2371 ap_account = self._e.unescape(to_jid.user)
2471 actor_id = await self.get_ap_actor_id_from_account(ap_account) 2372 actor_id = await self.get_ap_actor_id_from_account(ap_account)
2472 inbox = await self.get_ap_inbox_from_id(requestor_actor_id, actor_id, use_shared=False) 2373 inbox = await self.get_ap_inbox_from_id(
2374 requestor_actor_id, actor_id, use_shared=False
2375 )
2473 url_actor, ap_item = await self.ap_delete_item( 2376 url_actor, ap_item = await self.ap_delete_item(
2474 from_jid.userhostJID(), None, retract_elt["id"], public=False 2377 from_jid.userhostJID(), None, retract_elt["id"], public=False
2475 ) 2378 )
2476 resp = await self.ap_post(inbox, url_actor, ap_item) 2379 resp = await self.ap_post(inbox, url_actor, ap_item)
2477 return False 2380 return False
2478 2381
2479 async def _on_reference_received( 2382 async def _on_reference_received(
2480 self, 2383 self,
2481 client: SatXMPPEntity, 2384 client: SatXMPPEntity,
2482 message_elt: domish.Element, 2385 message_elt: domish.Element,
2483 reference_data: Dict[str, Union[str, int]] 2386 reference_data: Dict[str, Union[str, int]],
2484 ) -> bool: 2387 ) -> bool:
2485 parsed_uri: dict = reference_data.get("parsed_uri") 2388 parsed_uri: dict = reference_data.get("parsed_uri")
2486 if not parsed_uri: 2389 if not parsed_uri:
2487 log.warning(f"no parsed URI available in reference {reference_data}") 2390 log.warning(f"no parsed URI available in reference {reference_data}")
2488 return False 2391 return False
2551 client, cached_item.data, pubsub_service, pubsub_node 2454 client, cached_item.data, pubsub_service, pubsub_node
2552 ) 2455 )
2553 ap_item = await self.mb_data_2_ap_item(client, mb_data) 2456 ap_item = await self.mb_data_2_ap_item(client, mb_data)
2554 ap_object = ap_item["object"] 2457 ap_object = ap_item["object"]
2555 ap_object["to"] = [actor_id] 2458 ap_object["to"] = [actor_id]
2556 ap_object.setdefault("tag", []).append({ 2459 ap_object.setdefault("tag", []).append(
2557 "type": TYPE_MENTION, 2460 {
2558 "href": actor_id, 2461 "type": TYPE_MENTION,
2559 "name": ap_account, 2462 "href": actor_id,
2560 }) 2463 "name": ap_account,
2464 }
2465 )
2561 2466
2562 requestor_actor_id = ap_item["actor"] 2467 requestor_actor_id = ap_item["actor"]
2563 inbox = await self.get_ap_inbox_from_id(requestor_actor_id, actor_id, use_shared=False) 2468 inbox = await self.get_ap_inbox_from_id(
2469 requestor_actor_id, actor_id, use_shared=False
2470 )
2564 2471
2565 await self.ap_post(inbox, requestor_actor_id, ap_item) 2472 await self.ap_post(inbox, requestor_actor_id, ap_item)
2566 2473
2567 return False 2474 return False
2568 2475
2581 "Ignoring AP item replying to an XMPP item with an unexpected URL " 2488 "Ignoring AP item replying to an XMPP item with an unexpected URL "
2582 f"type({url_type!r}):\n{pformat(ap_item)}" 2489 f"type({url_type!r}):\n{pformat(ap_item)}"
2583 ) 2490 )
2584 return 2491 return
2585 try: 2492 try:
2586 parent_item_account, parent_item_id = url_args[0], '/'.join(url_args[1:]) 2493 parent_item_account, parent_item_id = url_args[0], "/".join(url_args[1:])
2587 except (IndexError, ValueError): 2494 except (IndexError, ValueError):
2588 log.warning( 2495 log.warning(
2589 "Ignoring AP item replying to an XMPP item with invalid inReplyTo URL " 2496 "Ignoring AP item replying to an XMPP item with invalid inReplyTo URL "
2590 f"({in_reply_to!r}):\n{pformat(ap_item)}" 2497 f"({in_reply_to!r}):\n{pformat(ap_item)}"
2591 ) 2498 )
2601 try: 2508 try:
2602 parent_item_elt = items[0] 2509 parent_item_elt = items[0]
2603 except IndexError: 2510 except IndexError:
2604 log.warning( 2511 log.warning(
2605 f"Can't find parent item at {parent_item_service} (node " 2512 f"Can't find parent item at {parent_item_service} (node "
2606 f"{parent_item_node!r})\n{pformat(ap_item)}") 2513 f"{parent_item_node!r})\n{pformat(ap_item)}"
2514 )
2607 return 2515 return
2608 parent_item_parsed = await self._m.item_2_mb_data( 2516 parent_item_parsed = await self._m.item_2_mb_data(
2609 client, parent_item_elt, parent_item_service, parent_item_node 2517 client, parent_item_elt, parent_item_service, parent_item_node
2610 ) 2518 )
2611 try: 2519 try:
2612 comment_service = jid.JID(parent_item_parsed["comments"][0]["service"]) 2520 comment_service = jid.JID(parent_item_parsed["comments"][0]["service"])
2613 comment_node = parent_item_parsed["comments"][0]["node"] 2521 comment_node = parent_item_parsed["comments"][0]["node"]
2614 except (KeyError, IndexError): 2522 except (KeyError, IndexError):
2615 # we don't have a comment node set for this item 2523 # we don't have a comment node set for this item
2616 from libervia.backend.tools.xml_tools import pp_elt 2524 from libervia.backend.tools.xml_tools import pp_elt
2525
2617 log.info(f"{pp_elt(parent_item_elt.toXml())}") 2526 log.info(f"{pp_elt(parent_item_elt.toXml())}")
2618 raise NotImplementedError() 2527 raise NotImplementedError()
2619 else: 2528 else:
2620 requestor_actor_id = self.build_apurl( 2529 requestor_actor_id = self.build_apurl(
2621 TYPE_ACTOR, 2530 TYPE_ACTOR,
2622 await self.get_ap_account_from_jid_and_node(comment_service, comment_node) 2531 await self.get_ap_account_from_jid_and_node(
2532 comment_service, comment_node
2533 ),
2623 ) 2534 )
2624 __, item_elt = await self.ap_item_2_mb_data_and_elt( 2535 __, item_elt = await self.ap_item_2_mb_data_and_elt(
2625 requestor_actor_id, 2536 requestor_actor_id, ap_item
2626 ap_item
2627 ) 2537 )
2628 await self._p.publish(client, comment_service, comment_node, [item_elt]) 2538 await self._p.publish(client, comment_service, comment_node, [item_elt])
2629 await self.notify_mentions( 2539 await self.notify_mentions(
2630 targets, mentions, comment_service, comment_node, item_elt["id"] 2540 targets, mentions, comment_service, comment_node, item_elt["id"]
2631 ) 2541 )
2632 2542
2633 def get_ap_item_targets( 2543 def get_ap_item_targets(
2634 self, 2544 self, item: Dict[str, Any]
2635 item: Dict[str, Any]
2636 ) -> Tuple[bool, Dict[str, Set[str]], List[Dict[str, str]]]: 2545 ) -> Tuple[bool, Dict[str, Set[str]], List[Dict[str, str]]]:
2637 """Retrieve targets of an AP item, and indicate if it's a public one 2546 """Retrieve targets of an AP item, and indicate if it's a public one
2638 2547
2639 @param item: AP object payload 2548 @param item: AP object payload
2640 @return: Are returned: 2549 @return: Are returned:
2702 @param item: AP object payload 2611 @param item: AP object payload
2703 """ 2612 """
2704 is_public, targets, mentions = self.get_ap_item_targets(item) 2613 is_public, targets, mentions = self.get_ap_item_targets(item)
2705 if not is_public and targets.keys() == {TYPE_ACTOR}: 2614 if not is_public and targets.keys() == {TYPE_ACTOR}:
2706 # this is a direct message 2615 # this is a direct message
2707 await self.handle_message_ap_item( 2616 await self.handle_message_ap_item(client, targets, mentions, destinee, item)
2708 client, targets, mentions, destinee, item
2709 )
2710 else: 2617 else:
2711 await self.handle_pubsub_ap_item( 2618 await self.handle_pubsub_ap_item(
2712 client, targets, mentions, destinee, node, item, is_public 2619 client, targets, mentions, destinee, node, item, is_public
2713 ) 2620 )
2714 2621
2715 def get_requestor_actor_id_from_targets( 2622 def get_requestor_actor_id_from_targets(self, targets: set[str]) -> str:
2716 self,
2717 targets: set[str]
2718 ) -> str:
2719 """Find local actor to use as requestor_actor_id from request targets. 2623 """Find local actor to use as requestor_actor_id from request targets.
2720 2624
2721 A local actor must be used to sign HTTP request, notably HTTP GET request for AP 2625 A local actor must be used to sign HTTP request, notably HTTP GET request for AP
2722 instance checking signature, such as Mastodon when set in "secure mode". 2626 instance checking signature, such as Mastodon when set in "secure mode".
2723 2627
2729 @return: local actor ID to use as requestor_actor_id. 2633 @return: local actor ID to use as requestor_actor_id.
2730 """ 2634 """
2731 try: 2635 try:
2732 return next(t for t in targets if self.is_local_url(t)) 2636 return next(t for t in targets if self.is_local_url(t))
2733 except StopIteration: 2637 except StopIteration:
2734 log.warning( 2638 log.warning(f"Can't find local target to use as requestor ID: {targets!r}")
2735 f"Can't find local target to use as requestor ID: {targets!r}" 2639 return self.build_apurl(TYPE_ACTOR, f"libervia@{self.public_url}")
2736 )
2737 return self.build_apurl(
2738 TYPE_ACTOR, f"libervia@{self.public_url}"
2739 )
2740 2640
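The selection rule is simple enough to restate in isolation: take the first delivery target whose URL belongs to this gateway, and fall back to a generic local actor otherwise. A standalone sketch, with is_local_url reduced to a base-URL prefix check for the example (the real check and the fallback account name may differ):

def pick_requestor_actor_id(targets: set, base_url: str, fallback: str) -> str:
    """Pick a local actor URL to sign requests with (simplified sketch)."""
    def is_local_url(url: str) -> bool:
        # the real check is more involved; a prefix test is enough here
        return url.startswith(base_url)
    try:
        return next(t for t in targets if is_local_url(t))
    except StopIteration:
        # no local target: fall back to a generic gateway actor
        return fallback

print(pick_requestor_actor_id(
    {"https://social.example/users/bob",
     "https://ap.example.org/actor/alice%40xmpp.example"},
    base_url="https://ap.example.org/",
    fallback="https://ap.example.org/actor/libervia%40ap.example.org",
))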
2741 async def handle_message_ap_item( 2641 async def handle_message_ap_item(
2742 self, 2642 self,
2743 client: SatXMPPEntity, 2643 client: SatXMPPEntity,
2744 targets: dict[str, Set[str]], 2644 targets: dict[str, Set[str]],
2745 mentions: list[Dict[str, str]], 2645 mentions: list[Dict[str, str]],
2746 destinee: jid.JID|None, 2646 destinee: jid.JID | None,
2747 item: dict, 2647 item: dict,
2748 ) -> None: 2648 ) -> None:
2749 """Parse and deliver direct AP items translating to XMPP messages 2649 """Parse and deliver direct AP items translating to XMPP messages
2750 2650
2751 @param targets: actors where the item must be delivered 2651 @param targets: actors where the item must be delivered
2753 @param item: AP object payload 2653 @param item: AP object payload
2754 """ 2654 """
2755 targets_urls = {t for t_set in targets.values() for t in t_set} 2655 targets_urls = {t for t_set in targets.values() for t in t_set}
2756 requestor_actor_id = self.get_requestor_actor_id_from_targets(targets_urls) 2656 requestor_actor_id = self.get_requestor_actor_id_from_targets(targets_urls)
2757 targets_jids = { 2657 targets_jids = {
2758 await self.get_jid_from_id(requestor_actor_id, url) 2658 await self.get_jid_from_id(requestor_actor_id, url) for url in targets_urls
2759 for url in targets_urls
2760 } 2659 }
2761 if destinee is not None: 2660 if destinee is not None:
2762 targets_jids.add(destinee) 2661 targets_jids.add(destinee)
2763 mb_data = await self.ap_item_2_mb_data(requestor_actor_id, item) 2662 mb_data = await self.ap_item_2_mb_data(requestor_actor_id, item)
2764 extra = { 2663 extra = {"origin_id": mb_data["id"]}
2765 "origin_id": mb_data["id"]
2766 }
2767 attachments = mb_data["extra"].get(C.KEY_ATTACHMENTS) 2664 attachments = mb_data["extra"].get(C.KEY_ATTACHMENTS)
2768 if attachments: 2665 if attachments:
2769 extra[C.KEY_ATTACHMENTS] = attachments 2666 extra[C.KEY_ATTACHMENTS] = attachments
2770 2667
2771 defer_l = [] 2668 defer_l = []
2772 for target_jid in targets_jids: 2669 for target_jid in targets_jids:
2773 defer_l.append( 2670 defer_l.append(
2774 client.sendMessage( 2671 client.sendMessage(
2775 target_jid, 2672 target_jid,
2776 {'': mb_data.get("content", "")}, 2673 {"": mb_data.get("content", "")},
2777 mb_data.get("title"), 2674 mb_data.get("title"),
2778 extra=extra 2675 extra=extra,
2779 ) 2676 )
2780 ) 2677 )
2781 await defer.DeferredList(defer_l) 2678 await defer.DeferredList(defer_l)
2782 2679
2783 async def notify_mentions( 2680 async def notify_mentions(
2796 https://www.w3.org/TR/activitystreams-vocabulary/#microsyntaxes). 2693 https://www.w3.org/TR/activitystreams-vocabulary/#microsyntaxes).
2797 2694
2798 """ 2695 """
2799 targets_urls = {t for t_set in targets.values() for t in t_set} 2696 targets_urls = {t for t_set in targets.values() for t in t_set}
2800 requestor_actor_id = self.get_requestor_actor_id_from_targets(targets_urls) 2697 requestor_actor_id = self.get_requestor_actor_id_from_targets(targets_urls)
2801 anchor = uri.build_xmpp_uri("pubsub", path=service.full(), node=node, item=item_id) 2698 anchor = uri.build_xmpp_uri(
2699 "pubsub", path=service.full(), node=node, item=item_id
2700 )
2802 seen = set() 2701 seen = set()
2803 # we start with explicit mentions because mentions' content will be used in the 2702 # we start with explicit mentions because mentions' content will be used in the
2804 # future to fill "begin" and "end" reference attributes (we can't do it at the 2703 # future to fill "begin" and "end" reference attributes (we can't do it at the
2805 # moment as there is no way to specify the XML element to use in the blog item). 2704 # moment as there is no way to specify the XML element to use in the blog item).
2806 for mention in mentions: 2705 for mention in mentions:
2807 mentioned_jid = await self.get_jid_from_id(requestor_actor_id, mention["uri"]) 2706 mentioned_jid = await self.get_jid_from_id(requestor_actor_id, mention["uri"])
2808 self._refs.send_reference( 2707 self._refs.send_reference(self.client, to_jid=mentioned_jid, anchor=anchor)
2809 self.client,
2810 to_jid=mentioned_jid,
2811 anchor=anchor
2812 )
2813 seen.add(mentioned_jid) 2708 seen.add(mentioned_jid)
2814 2709
2815 remaining = { 2710 remaining = {
2816 await self.get_jid_from_id(requestor_actor_id, t) 2711 await self.get_jid_from_id(requestor_actor_id, t)
2817 for t_set in targets.values() 2712 for t_set in targets.values()
2818 for t in t_set 2713 for t in t_set
2819 } - seen 2714 } - seen
2820 for target in remaining: 2715 for target in remaining:
2821 self._refs.send_reference( 2716 self._refs.send_reference(self.client, to_jid=target, anchor=anchor)
2822 self.client,
2823 to_jid=target,
2824 anchor=anchor
2825 )
2826 2717
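The two-pass notification above (explicit Mention tags first, then any remaining addressees, each only once) is a small dedup pattern that can be shown on its own. Here the actual reference sending is replaced by collecting recipients into a list; the function and its inputs are invented for the example.

def notification_order(mention_jids, target_jids):
    """Explicit mentions first, then leftover targets, each JID only once."""
    notified = []
    seen = set()
    for jid_ in mention_jids:   # explicit Mention tags take priority
        if jid_ not in seen:
            notified.append(jid_)
            seen.add(jid_)
    for jid_ in target_jids:    # remaining to/cc addressees
        if jid_ not in seen:
            notified.append(jid_)
            seen.add(jid_)
    return notified

print(notification_order(
    ["bob@xmpp.example"],
    ["bob@xmpp.example", "carol@xmpp.example"],
))  # -> ['bob@xmpp.example', 'carol@xmpp.example']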
2827 async def handle_pubsub_ap_item( 2718 async def handle_pubsub_ap_item(
2828 self, 2719 self,
2829 client: SatXMPPEntity, 2720 client: SatXMPPEntity,
2830 targets: dict[str, set[str]], 2721 targets: dict[str, set[str]],
2831 mentions: list[dict[str, str]], 2722 mentions: list[dict[str, str]],
2832 destinee: jid.JID|None, 2723 destinee: jid.JID | None,
2833 node: str, 2724 node: str,
2834 item: dict, 2725 item: dict,
2835 public: bool 2726 public: bool,
2836 ) -> None: 2727 ) -> None:
2837 """Analyse, cache and deliver AP items translating to Pubsub 2728 """Analyse, cache and deliver AP items translating to Pubsub
2838 2729
2839 @param targets: actors/collections where the item must be delivered 2730 @param targets: actors/collections where the item must be delivered
2840 @param destinee: jid of the destinee, 2731 @param destinee: jid of the destinee,
2857 return 2748 return
2858 2749
2859 # this item is a reply to an AP item, we use or create a corresponding node 2750 # this item is a reply to an AP item, we use or create a corresponding node
2860 # for comments 2751 # for comments
2861 parent_node, __ = await self.get_comments_nodes( 2752 parent_node, __ = await self.get_comments_nodes(
2862 requestor_actor_id, 2753 requestor_actor_id, item["id"], in_reply_to
2863 item["id"],
2864 in_reply_to
2865 ) 2754 )
2866 node = parent_node or node 2755 node = parent_node or node
2867 cached_node = await self.host.memory.storage.get_pubsub_node( 2756 cached_node = await self.host.memory.storage.get_pubsub_node(
2868 client, service, node, with_subscriptions=True, create=True, 2757 client,
2869 create_kwargs={"subscribed": True} 2758 service,
2759 node,
2760 with_subscriptions=True,
2761 create=True,
2762 create_kwargs={"subscribed": True},
2870 ) 2763 )
2871 else: 2764 else:
2872 # it is a root item (i.e. not a reply to an other item) 2765 # it is a root item (i.e. not a reply to an other item)
2873 create = node == self._events.namespace 2766 create = node == self._events.namespace
2874 cached_node = await self.host.memory.storage.get_pubsub_node( 2767 cached_node = await self.host.memory.storage.get_pubsub_node(
2876 ) 2769 )
2877 if cached_node is None: 2770 if cached_node is None:
2878 log.warning( 2771 log.warning(
2879 f"Received item in unknown node {node!r} at {service}. This may be " 2772 f"Received item in unknown node {node!r} at {service}. This may be "
2880 f"due to a cache purge. We synchronise the node\n{item}" 2773 f"due to a cache purge. We synchronise the node\n{item}"
2881
2882 ) 2774 )
2883 return 2775 return
2884 if item.get("type") == TYPE_EVENT: 2776 if item.get("type") == TYPE_EVENT:
2885 data, item_elt = await self.ap_events.ap_item_2_event_data_and_elt( 2777 data, item_elt = await self.ap_events.ap_item_2_event_data_and_elt(
2886 requestor_actor_id, 2778 requestor_actor_id, item
2887 item
2888 ) 2779 )
2889 else: 2780 else:
2890 data, item_elt = await self.ap_item_2_mb_data_and_elt( 2781 data, item_elt = await self.ap_item_2_mb_data_and_elt(
2891 requestor_actor_id, 2782 requestor_actor_id, item
2892 item
2893 ) 2783 )
2894 await self.host.memory.storage.cache_pubsub_items( 2784 await self.host.memory.storage.cache_pubsub_items(
2895 client, 2785 client, cached_node, [item_elt], [data]
2896 cached_node,
2897 [item_elt],
2898 [data]
2899 ) 2786 )
2900 2787
2901 for subscription in cached_node.subscriptions: 2788 for subscription in cached_node.subscriptions:
2902 if subscription.state != SubscriptionState.SUBSCRIBED: 2789 if subscription.state != SubscriptionState.SUBSCRIBED:
2903 continue 2790 continue
2904 self.pubsub_service.notifyPublish( 2791 self.pubsub_service.notifyPublish(
2905 service, 2792 service, node, [(subscription.subscriber, None, [item_elt])]
2906 node,
2907 [(subscription.subscriber, None, [item_elt])]
2908 ) 2793 )
2909 2794
2910 await self.notify_mentions(targets, mentions, service, node, item_elt["id"]) 2795 await self.notify_mentions(targets, mentions, service, node, item_elt["id"])
2911 2796
2912 async def new_ap_delete_item( 2797 async def new_ap_delete_item(
2938 history = await self.host.memory.storage.get( 2823 history = await self.host.memory.storage.get(
2939 client, 2824 client,
2940 History, 2825 History,
2941 History.origin_id, 2826 History.origin_id,
2942 item_id, 2827 item_id,
2943 (History.messages, History.subjects) 2828 (History.messages, History.subjects),
2944 ) 2829 )
2945 2830
2946 if history is not None: 2831 if history is not None:
2947 # it's a direct message 2832 # it's a direct message
2948 if history.source_jid != client.jid: 2833 if history.source_jid != client.jid:
2971 item_elt["id"] = item_id 2856 item_elt["id"] = item_id
2972 for subscription in cached_node.subscriptions: 2857 for subscription in cached_node.subscriptions:
2973 if subscription.state != SubscriptionState.SUBSCRIBED: 2858 if subscription.state != SubscriptionState.SUBSCRIBED:
2974 continue 2859 continue
2975 self.pubsub_service.notifyRetract( 2860 self.pubsub_service.notifyRetract(
2976 client.jid, 2861 client.jid, node, [(subscription.subscriber, None, [item_elt])]
2977 node, 2862 )
2978 [(subscription.subscriber, None, [item_elt])]
2979 )