Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_comp_ap_gateway/__init__.py @ 4270:0d7bb4df2343
Reformatted code base using black.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 19 Jun 2024 18:44:57 +0200 |
parents | d366d90a71aa |
children | 0f953ce5f0a8 |
comparison
equal
deleted
inserted
replaced
4269:64a85ce8be70 | 4270:0d7bb4df2343 |
---|---|
85 TYPE_LIKE, | 85 TYPE_LIKE, |
86 TYPE_MENTION, | 86 TYPE_MENTION, |
87 TYPE_REACTION, | 87 TYPE_REACTION, |
88 TYPE_TOMBSTONE, | 88 TYPE_TOMBSTONE, |
89 TYPE_JOIN, | 89 TYPE_JOIN, |
90 TYPE_LEAVE | 90 TYPE_LEAVE, |
91 ) | 91 ) |
92 from .http_server import HTTPServer | 92 from .http_server import HTTPServer |
93 from .pubsub_service import APPubsubService | 93 from .pubsub_service import APPubsubService |
94 from .regex import RE_MENTION | 94 from .regex import RE_MENTION |
95 | 95 |
103 C.PI_IMPORT_NAME: IMPORT_NAME, | 103 C.PI_IMPORT_NAME: IMPORT_NAME, |
104 C.PI_MODES: [C.PLUG_MODE_COMPONENT], | 104 C.PI_MODES: [C.PLUG_MODE_COMPONENT], |
105 C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, | 105 C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, |
106 C.PI_PROTOCOLS: [], | 106 C.PI_PROTOCOLS: [], |
107 C.PI_DEPENDENCIES: [ | 107 C.PI_DEPENDENCIES: [ |
108 "XEP-0050", "XEP-0054", "XEP-0060", "XEP-0084", "XEP-0106", "XEP-0277", | 108 "XEP-0050", |
109 "XEP-0292", "XEP-0329", "XEP-0372", "XEP-0424", "XEP-0465", "XEP-0470", | 109 "XEP-0054", |
110 "XEP-0447", "XEP-0471", "PUBSUB_CACHE", "TEXT_SYNTAXES", "IDENTITY" | 110 "XEP-0060", |
111 "XEP-0084", | |
112 "XEP-0106", | |
113 "XEP-0277", | |
114 "XEP-0292", | |
115 "XEP-0329", | |
116 "XEP-0372", | |
117 "XEP-0424", | |
118 "XEP-0465", | |
119 "XEP-0470", | |
120 "XEP-0447", | |
121 "XEP-0471", | |
122 "PUBSUB_CACHE", | |
123 "TEXT_SYNTAXES", | |
124 "IDENTITY", | |
111 ], | 125 ], |
112 C.PI_RECOMMENDATIONS: [], | 126 C.PI_RECOMMENDATIONS: [], |
113 C.PI_MAIN: "APGateway", | 127 C.PI_MAIN: "APGateway", |
114 C.PI_HANDLER: C.BOOL_TRUE, | 128 C.PI_HANDLER: C.BOOL_TRUE, |
115 C.PI_DESCRIPTION: _( | 129 C.PI_DESCRIPTION: _( |
158 "", | 172 "", |
159 items_cb=self._items_received, | 173 items_cb=self._items_received, |
160 # we want to be sure that the callbacks are launched before pubsub cache's | 174 # we want to be sure that the callbacks are launched before pubsub cache's |
161 # one, as we need to inspect items before they are actually removed from cache | 175 # one, as we need to inspect items before they are actually removed from cache |
162 # or updated | 176 # or updated |
163 priority=1000 | 177 priority=1000, |
164 ) | 178 ) |
165 self.pubsub_service = APPubsubService(self) | 179 self.pubsub_service = APPubsubService(self) |
166 self.ad_hoc = APAdHocService(self) | 180 self.ad_hoc = APAdHocService(self) |
167 self.ap_events = APEvents(self) | 181 self.ap_events = APEvents(self) |
168 host.trigger.add_with_check( | 182 host.trigger.add_with_check( |
169 "message_received", | 183 "message_received", self, self._message_received_trigger, priority=-1000 |
170 self, | |
171 self._message_received_trigger, | |
172 priority=-1000 | |
173 ) | 184 ) |
174 host.trigger.add_with_check( | 185 host.trigger.add_with_check( |
175 "XEP-0424_retract_received", | 186 "XEP-0424_retract_received", self, self._on_message_retract |
176 self, | |
177 self._on_message_retract | |
178 ) | 187 ) |
179 host.trigger.add_with_check( | 188 host.trigger.add_with_check( |
180 "XEP-0372_ref_received", | 189 "XEP-0372_ref_received", self, self._on_reference_received |
181 self, | |
182 self._on_reference_received | |
183 ) | 190 ) |
184 | 191 |
185 host.bridge.add_method( | 192 host.bridge.add_method( |
186 "ap_send", | 193 "ap_send", |
187 ".plugin", | 194 ".plugin", |
213 key_size=4096, | 220 key_size=4096, |
214 ) | 221 ) |
215 private_key_pem = self.private_key.private_bytes( | 222 private_key_pem = self.private_key.private_bytes( |
216 encoding=serialization.Encoding.PEM, | 223 encoding=serialization.Encoding.PEM, |
217 format=serialization.PrivateFormat.PKCS8, | 224 format=serialization.PrivateFormat.PKCS8, |
218 encryption_algorithm=serialization.NoEncryption() | 225 encryption_algorithm=serialization.NoEncryption(), |
219 ).decode() | 226 ).decode() |
220 await self.host.memory.storage.set_private_value( | 227 await self.host.memory.storage.set_private_value( |
221 IMPORT_NAME, "rsa_key", private_key_pem, profile=client.profile | 228 IMPORT_NAME, "rsa_key", private_key_pem, profile=client.profile |
222 ) | 229 ) |
223 else: | 230 else: |
226 password=None, | 233 password=None, |
227 ) | 234 ) |
228 self.public_key = self.private_key.public_key() | 235 self.public_key = self.private_key.public_key() |
229 self.public_key_pem = self.public_key.public_bytes( | 236 self.public_key_pem = self.public_key.public_bytes( |
230 encoding=serialization.Encoding.PEM, | 237 encoding=serialization.Encoding.PEM, |
231 format=serialization.PublicFormat.SubjectPublicKeyInfo | 238 format=serialization.PublicFormat.SubjectPublicKeyInfo, |
232 ).decode() | 239 ).decode() |
233 | 240 |
234 # params | 241 # params |
235 # URL and port | 242 # URL and port |
236 self.public_url = self.host.memory.config_get( | 243 self.public_url = self.host.memory.config_get( |
237 CONF_SECTION, "public_url" | 244 CONF_SECTION, "public_url" |
238 ) or self.host.memory.config_get( | 245 ) or self.host.memory.config_get(CONF_SECTION, "xmpp_domain") |
239 CONF_SECTION, "xmpp_domain" | |
240 ) | |
241 if self.public_url is None: | 246 if self.public_url is None: |
242 log.error( | 247 log.error( |
243 '"public_url" not set in configuration, this is mandatory to have' | 248 '"public_url" not set in configuration, this is mandatory to have' |
244 "ActivityPub Gateway running. Please set this option it to public facing " | 249 "ActivityPub Gateway running. Please set this option it to public facing " |
245 f"url in {CONF_SECTION!r} configuration section." | 250 f"url in {CONF_SECTION!r} configuration section." |
246 ) | 251 ) |
247 return | 252 return |
248 if parse.urlparse(self.public_url).scheme: | 253 if parse.urlparse(self.public_url).scheme: |
249 log.error( | 254 log.error( |
250 "Scheme must not be specified in \"public_url\", please remove it from " | 255 'Scheme must not be specified in "public_url", please remove it from ' |
251 "\"public_url\" configuration option. ActivityPub Gateway won't be run." | 256 '"public_url" configuration option. ActivityPub Gateway won\'t be run.' |
252 ) | 257 ) |
253 return | 258 return |
254 self.http_port = int(self.host.memory.config_get( | 259 self.http_port = int(self.host.memory.config_get(CONF_SECTION, "http_port", 8123)) |
255 CONF_SECTION, 'http_port', 8123)) | |
256 connection_type = self.host.memory.config_get( | 260 connection_type = self.host.memory.config_get( |
257 CONF_SECTION, 'http_connection_type', 'https') | 261 CONF_SECTION, "http_connection_type", "https" |
258 if connection_type not in ('http', 'https'): | 262 ) |
263 if connection_type not in ("http", "https"): | |
259 raise exceptions.ConfigError( | 264 raise exceptions.ConfigError( |
260 'bad ap-gateay http_connection_type, you must use one of "http" or ' | 265 'bad ap-gateay http_connection_type, you must use one of "http" or ' |
261 '"https"' | 266 '"https"' |
262 ) | 267 ) |
263 self.http_sign_get = C.bool( | 268 self.http_sign_get = C.bool( |
264 self.host.memory.config_get(CONF_SECTION, "http_sign_get", C.BOOL_TRUE) | 269 self.host.memory.config_get(CONF_SECTION, "http_sign_get", C.BOOL_TRUE) |
265 ) | 270 ) |
266 self.max_items = int(self.host.memory.config_get( | 271 self.max_items = int( |
267 CONF_SECTION, 'new_node_max_items', 50 | 272 self.host.memory.config_get(CONF_SECTION, "new_node_max_items", 50) |
268 | 273 ) |
269 )) | 274 self.comments_max_depth = int( |
270 self.comments_max_depth = int(self.host.memory.config_get( | 275 self.host.memory.config_get(CONF_SECTION, "comments_max_depth", 0) |
271 CONF_SECTION, 'comments_max_depth', 0 | 276 ) |
272 )) | 277 self.ap_path = self.host.memory.config_get(CONF_SECTION, "ap_path", "_ap") |
273 self.ap_path = self.host.memory.config_get(CONF_SECTION, 'ap_path', '_ap') | |
274 self.base_ap_url = parse.urljoin(f"https://{self.public_url}", f"{self.ap_path}/") | 278 self.base_ap_url = parse.urljoin(f"https://{self.public_url}", f"{self.ap_path}/") |
275 # True (default) if we provide gateway only to entities/services from our server | 279 # True (default) if we provide gateway only to entities/services from our server |
276 self.local_only = C.bool( | 280 self.local_only = C.bool( |
277 self.host.memory.config_get(CONF_SECTION, 'local_only', C.BOOL_TRUE) | 281 self.host.memory.config_get(CONF_SECTION, "local_only", C.BOOL_TRUE) |
278 ) | 282 ) |
279 # if True (default), mention will be parsed in non-private content coming from | 283 # if True (default), mention will be parsed in non-private content coming from |
280 # XMPP. This is necessary as XEP-0372 are coming separately from item where the | 284 # XMPP. This is necessary as XEP-0372 are coming separately from item where the |
281 # mention is done, which is hard to impossible to translate to ActivityPub (where | 285 # mention is done, which is hard to impossible to translate to ActivityPub (where |
282 # mention specified inside the item directly). See documentation for details. | 286 # mention specified inside the item directly). See documentation for details. |
283 self.auto_mentions = C.bool( | 287 self.auto_mentions = C.bool( |
284 self.host.memory.config_get(CONF_SECTION, "auto_mentions", C.BOOL_TRUE) | 288 self.host.memory.config_get(CONF_SECTION, "auto_mentions", C.BOOL_TRUE) |
285 ) | 289 ) |
286 | 290 |
287 html_redirect: Dict[str, Union[str, dict]] = self.host.memory.config_get( | 291 html_redirect: Dict[str, Union[str, dict]] = self.host.memory.config_get( |
288 CONF_SECTION, 'html_redirect_dict', {} | 292 CONF_SECTION, "html_redirect_dict", {} |
289 ) | 293 ) |
290 self.html_redirect: Dict[str, List[dict]] = {} | 294 self.html_redirect: Dict[str, List[dict]] = {} |
291 for url_type, target in html_redirect.items(): | 295 for url_type, target in html_redirect.items(): |
292 if isinstance(target, str): | 296 if isinstance(target, str): |
293 target = {"url": target} | 297 target = {"url": target} |
305 filters["node"] = node_filter | 309 filters["node"] = node_filter |
306 self.html_redirect.setdefault(url_type, []).append(target) | 310 self.html_redirect.setdefault(url_type, []).append(target) |
307 | 311 |
308 # HTTP server launch | 312 # HTTP server launch |
309 self.server = HTTPServer(self) | 313 self.server = HTTPServer(self) |
310 if connection_type == 'http': | 314 if connection_type == "http": |
311 reactor.listenTCP(self.http_port, self.server) | 315 reactor.listenTCP(self.http_port, self.server) |
312 else: | 316 else: |
313 options = tls.get_options_from_config( | 317 options = tls.get_options_from_config(self.host.memory.config, CONF_SECTION) |
314 self.host.memory.config, CONF_SECTION) | |
315 tls.tls_options_check(options) | 318 tls.tls_options_check(options) |
316 context_factory = tls.get_tls_context_factory(options) | 319 context_factory = tls.get_tls_context_factory(options) |
317 reactor.listenSSL(self.http_port, self.server, context_factory) | 320 reactor.listenSSL(self.http_port, self.server, context_factory) |
318 | 321 |
319 async def profile_connecting(self, client): | 322 async def profile_connecting(self, client): |
320 self.client = client | 323 self.client = client |
321 client.sendHistory = True | 324 client.sendHistory = True |
322 client._ap_storage = persistent.LazyPersistentBinaryDict( | 325 client._ap_storage = persistent.LazyPersistentBinaryDict( |
323 IMPORT_NAME, | 326 IMPORT_NAME, client.profile |
324 client.profile | |
325 ) | 327 ) |
326 await self.init(client) | 328 await self.init(client) |
327 | 329 |
328 def profile_connected(self, client): | 330 def profile_connected(self, client): |
329 self.ad_hoc.init(client) | 331 self.ad_hoc.init(client) |
330 | 332 |
331 async def _items_received( | 333 async def _items_received( |
332 self, | 334 self, client: SatXMPPEntity, itemsEvent: pubsub.ItemsEvent |
333 client: SatXMPPEntity, | |
334 itemsEvent: pubsub.ItemsEvent | |
335 ) -> None: | 335 ) -> None: |
336 """Callback called when pubsub items are received | 336 """Callback called when pubsub items are received |
337 | 337 |
338 if the items are adressed to a JID corresponding to an AP actor, they are | 338 if the items are adressed to a JID corresponding to an AP actor, they are |
339 converted to AP items and sent to the corresponding AP server. | 339 converted to AP items and sent to the corresponding AP server. |
353 | 353 |
354 ap_account = self._e.unescape(recipient.user) | 354 ap_account = self._e.unescape(recipient.user) |
355 | 355 |
356 if self._pa.is_attachment_node(itemsEvent.nodeIdentifier): | 356 if self._pa.is_attachment_node(itemsEvent.nodeIdentifier): |
357 await self.convert_and_post_attachments( | 357 await self.convert_and_post_attachments( |
358 client, ap_account, itemsEvent.sender, itemsEvent.nodeIdentifier, | 358 client, |
359 itemsEvent.items | 359 ap_account, |
360 itemsEvent.sender, | |
361 itemsEvent.nodeIdentifier, | |
362 itemsEvent.items, | |
360 ) | 363 ) |
361 else: | 364 else: |
362 await self.convert_and_post_items( | 365 await self.convert_and_post_items( |
363 client, ap_account, itemsEvent.sender, itemsEvent.nodeIdentifier, | 366 client, |
364 itemsEvent.items | 367 ap_account, |
368 itemsEvent.sender, | |
369 itemsEvent.nodeIdentifier, | |
370 itemsEvent.items, | |
365 ) | 371 ) |
366 | 372 |
367 async def get_virtual_client( | 373 async def get_virtual_client( |
368 self, | 374 self, requestor_actor_id: str, actor_id: str |
369 requestor_actor_id: str, | |
370 actor_id: str | |
371 ) -> SatXMPPEntity: | 375 ) -> SatXMPPEntity: |
372 """Get client for this component with a specified jid | 376 """Get client for this component with a specified jid |
373 | 377 |
374 This is needed to perform operations with the virtual JID corresponding to the AP | 378 This is needed to perform operations with the virtual JID corresponding to the AP |
375 actor instead of the JID of the gateway itself. | 379 actor instead of the JID of the gateway itself. |
411 raise exceptions.ExternalRequestError(msg) | 415 raise exceptions.ExternalRequestError(msg) |
412 try: | 416 try: |
413 return await treq.json_content(resp) | 417 return await treq.json_content(resp) |
414 except Exception as e: | 418 except Exception as e: |
415 raise error.StanzaError( | 419 raise error.StanzaError( |
416 "service-unavailable", | 420 "service-unavailable", text=f"Can't get AP data at {url}: {e}" |
417 text=f"Can't get AP data at {url}: {e}" | |
418 ) | 421 ) |
419 | 422 |
420 async def ap_post(self, url: str, requestor_actor_id: str, doc: dict) -> TReqResponse: | 423 async def ap_post(self, url: str, requestor_actor_id: str, doc: dict) -> TReqResponse: |
421 """Sign a document and post it to AP server | 424 """Sign a document and post it to AP server |
422 | 425 |
427 if self.verbose: | 430 if self.verbose: |
428 __, actor_args = self.parse_apurl(requestor_actor_id) | 431 __, actor_args = self.parse_apurl(requestor_actor_id) |
429 actor_account = actor_args[0] | 432 actor_account = actor_args[0] |
430 to_log = [ | 433 to_log = [ |
431 "", | 434 "", |
432 f">>> {actor_account} is signing and posting to {url}:\n{pformat(doc)}" | 435 f">>> {actor_account} is signing and posting to {url}:\n{pformat(doc)}", |
433 ] | 436 ] |
434 | 437 |
435 body = json.dumps(doc).encode() | 438 body = json.dumps(doc).encode() |
436 headers = self._generate_signed_headers(url, requestor_actor_id, method="post", body=body) | 439 headers = self._generate_signed_headers( |
440 url, requestor_actor_id, method="post", body=body | |
441 ) | |
437 headers["Content-Type"] = MEDIA_TYPE_AP | 442 headers["Content-Type"] = MEDIA_TYPE_AP |
438 | 443 |
439 if self.verbose: | 444 if self.verbose: |
440 if self.verbose >= 3: | 445 if self.verbose >= 3: |
441 h_to_log = "\n".join(f" {k}: {v}" for k, v in headers.items()) | 446 h_to_log = "\n".join(f" {k}: {v}" for k, v in headers.items()) |
442 to_log.append(f" headers:\n{h_to_log}") | 447 to_log.append(f" headers:\n{h_to_log}") |
443 to_log.append("---") | 448 to_log.append("---") |
444 log.info("\n".join(to_log)) | 449 log.info("\n".join(to_log)) |
445 | 450 |
446 resp = await treq.post( | 451 resp = await treq.post(url, body, headers=headers) |
447 url, | |
448 body, | |
449 headers=headers | |
450 ) | |
451 if resp.code >= 300: | 452 if resp.code >= 300: |
452 text = await resp.text() | 453 text = await resp.text() |
453 log.warning(f"POST request to {url} failed [{resp.code}]: {text}") | 454 log.warning(f"POST request to {url} failed [{resp.code}]: {text}") |
454 elif self.verbose: | 455 elif self.verbose: |
455 log.info(f"==> response code: {resp.code}") | 456 log.info(f"==> response code: {resp.code}") |
456 return resp | 457 return resp |
457 | 458 |
458 def _generate_signed_headers( | 459 def _generate_signed_headers( |
459 self, | 460 self, url: str, actor_id: str, method: str, body: bytes | None = None |
460 url: str, | |
461 actor_id: str, | |
462 method: str, | |
463 body: bytes|None = None | |
464 ) -> dict[str, str]: | 461 ) -> dict[str, str]: |
465 """Generate HTTP headers with signature for a given request | 462 """Generate HTTP headers with signature for a given request |
466 | 463 |
467 @param url: AP server endpoint | 464 @param url: AP server endpoint |
468 @param actor_id: originating actor ID (URL) | 465 @param actor_id: originating actor ID (URL) |
472 """ | 469 """ |
473 p_url = parse.urlparse(url) | 470 p_url = parse.urlparse(url) |
474 headers = { | 471 headers = { |
475 "(request-target)": f"{method} {p_url.path}", | 472 "(request-target)": f"{method} {p_url.path}", |
476 "Host": p_url.hostname, | 473 "Host": p_url.hostname, |
477 "Date": http.datetimeToString().decode() | 474 "Date": http.datetimeToString().decode(), |
478 } | 475 } |
479 | 476 |
480 if body: | 477 if body: |
481 digest_algo, digest_hash = self.get_digest(body) | 478 digest_algo, digest_hash = self.get_digest(body) |
482 headers["Digest"] = f"{digest_algo}={digest_hash}" | 479 headers["Digest"] = f"{digest_algo}={digest_hash}" |
485 return headers | 482 return headers |
486 | 483 |
487 @overload | 484 @overload |
488 async def ap_get_object( | 485 async def ap_get_object( |
489 self, requestor_actor_id: str, data: dict, key: str | 486 self, requestor_actor_id: str, data: dict, key: str |
490 ) -> dict|None: | 487 ) -> dict | None: ... |
491 ... | |
492 | 488 |
493 @overload | 489 @overload |
494 async def ap_get_object( | 490 async def ap_get_object( |
495 self, requestor_actor_id: str, data: Union[str, dict], key: None = None | 491 self, requestor_actor_id: str, data: Union[str, dict], key: None = None |
496 ) -> dict: | 492 ) -> dict: ... |
497 ... | 493 |
498 | 494 async def ap_get_object(self, requestor_actor_id: str, data, key=None) -> dict | None: |
499 async def ap_get_object(self, requestor_actor_id: str, data, key = None) -> dict|None: | |
500 """Retrieve an AP object, dereferencing when necessary | 495 """Retrieve an AP object, dereferencing when necessary |
501 | 496 |
502 This method is to be used with attributes marked as "Functional" in | 497 This method is to be used with attributes marked as "Functional" in |
503 https://www.w3.org/TR/activitystreams-vocabulary | 498 https://www.w3.org/TR/activitystreams-vocabulary |
504 @param requestor_actor_id: ID of the actor doing the request. | 499 @param requestor_actor_id: ID of the actor doing the request. |
524 else: | 519 else: |
525 raise NotImplementedError( | 520 raise NotImplementedError( |
526 "was expecting a string or a dict, got {type(value)}: {value!r}}" | 521 "was expecting a string or a dict, got {type(value)}: {value!r}}" |
527 ) | 522 ) |
528 | 523 |
529 async def ap_get_local_object( | 524 async def ap_get_local_object(self, url: str) -> dict: |
530 self, | |
531 url: str | |
532 ) -> dict: | |
533 """Retrieve or generate local object | 525 """Retrieve or generate local object |
534 | 526 |
535 for now, only handle XMPP items to convert to AP | 527 for now, only handle XMPP items to convert to AP |
536 """ | 528 """ |
537 url_type, url_args = self.parse_apurl(url) | 529 url_type, url_args = self.parse_apurl(url) |
594 raise NotImplementedError( | 586 raise NotImplementedError( |
595 'only object from "item" URLs can be retrieved for now' | 587 'only object from "item" URLs can be retrieved for now' |
596 ) | 588 ) |
597 | 589 |
598 async def ap_get_list( | 590 async def ap_get_list( |
599 self, | 591 self, requestor_actor_id: str, data: dict, key: str, only_ids: bool = False |
600 requestor_actor_id: str, | |
601 data: dict, | |
602 key: str, | |
603 only_ids: bool = False | |
604 ) -> Optional[List[Dict[str, Any]]]: | 592 ) -> Optional[List[Dict[str, Any]]]: |
605 """Retrieve a list of objects from AP data, dereferencing when necessary | 593 """Retrieve a list of objects from AP data, dereferencing when necessary |
606 | 594 |
607 This method is to be used with non functional vocabularies. Use ``ap_get_object`` | 595 This method is to be used with non functional vocabularies. Use ``ap_get_object`` |
608 otherwise. | 596 otherwise. |
624 if isinstance(value, dict): | 612 if isinstance(value, dict): |
625 return [value] | 613 return [value] |
626 if not isinstance(value, list): | 614 if not isinstance(value, list): |
627 raise ValueError(f"A list was expected, got {type(value)}: {value!r}") | 615 raise ValueError(f"A list was expected, got {type(value)}: {value!r}") |
628 if only_ids: | 616 if only_ids: |
629 return [ | 617 return [{"id": v["id"]} if isinstance(v, dict) else {"id": v} for v in value] |
630 {"id": v["id"]} if isinstance(v, dict) else {"id": v} | |
631 for v in value | |
632 ] | |
633 else: | 618 else: |
634 return [await self.ap_get_object(requestor_actor_id, i) for i in value] | 619 return [await self.ap_get_object(requestor_actor_id, i) for i in value] |
635 | 620 |
636 async def ap_get_actors( | 621 async def ap_get_actors( |
637 self, | 622 self, requestor_actor_id: str, data: dict, key: str, as_account: bool = True |
638 requestor_actor_id: str, | |
639 data: dict, | |
640 key: str, | |
641 as_account: bool = True | |
642 ) -> List[str]: | 623 ) -> List[str]: |
643 """Retrieve AP actors from data | 624 """Retrieve AP actors from data |
644 | 625 |
645 @param requestor_actor_id: ID of the actor doing the request. | 626 @param requestor_actor_id: ID of the actor doing the request. |
646 @param data: AP object containing a field with actors | 627 @param data: AP object containing a field with actors |
669 except (TypeError, KeyError): | 650 except (TypeError, KeyError): |
670 raise exceptions.DataError( | 651 raise exceptions.DataError( |
671 f"invalid actors list to object {data.get('id')!r}: {value!r}" | 652 f"invalid actors list to object {data.get('id')!r}: {value!r}" |
672 ) | 653 ) |
673 if not value: | 654 if not value: |
674 raise exceptions.DataError( | 655 raise exceptions.DataError(f"list of actors is empty") |
675 f"list of actors is empty" | |
676 ) | |
677 if as_account: | 656 if as_account: |
678 return [ | 657 return [ |
679 await self.get_ap_account_from_id(requestor_actor_id, actor_id) | 658 await self.get_ap_account_from_id(requestor_actor_id, actor_id) |
680 for actor_id in value | 659 for actor_id in value |
681 ] | 660 ] |
695 @param data: AP object | 674 @param data: AP object |
696 @return: actor id of the sender | 675 @return: actor id of the sender |
697 @raise exceptions.NotFound: no actor has been found in data | 676 @raise exceptions.NotFound: no actor has been found in data |
698 """ | 677 """ |
699 try: | 678 try: |
700 actors = await self.ap_get_actors(requestor_actor_id, data, "actor", as_account=False) | 679 actors = await self.ap_get_actors( |
680 requestor_actor_id, data, "actor", as_account=False | |
681 ) | |
701 except exceptions.DataError: | 682 except exceptions.DataError: |
702 actors = None | 683 actors = None |
703 if not actors: | 684 if not actors: |
704 try: | 685 try: |
705 actors = await self.ap_get_actors(requestor_actor_id, data, "attributedTo", as_account=False) | 686 actors = await self.ap_get_actors( |
687 requestor_actor_id, data, "attributedTo", as_account=False | |
688 ) | |
706 except exceptions.DataError: | 689 except exceptions.DataError: |
707 raise exceptions.NotFound( | 690 raise exceptions.NotFound( |
708 'actor not specified in "actor" or "attributedTo"' | 691 'actor not specified in "actor" or "attributedTo"' |
709 ) | 692 ) |
710 try: | 693 try: |
713 raise exceptions.NotFound("list of actors is empty") | 696 raise exceptions.NotFound("list of actors is empty") |
714 | 697 |
715 def must_encode(self, text: str) -> bool: | 698 def must_encode(self, text: str) -> bool: |
716 """Indicate if a text must be period encoded""" | 699 """Indicate if a text must be period encoded""" |
717 return ( | 700 return ( |
718 not RE_ALLOWED_UNQUOTED.match(text) | 701 not RE_ALLOWED_UNQUOTED.match(text) or text.startswith("___") or "---" in text |
719 or text.startswith("___") | |
720 or "---" in text | |
721 ) | 702 ) |
722 | 703 |
723 def period_encode(self, text: str) -> str: | 704 def period_encode(self, text: str) -> str: |
724 """Period encode a text | 705 """Period encode a text |
725 | 706 |
733 .replace("~", "%7e") | 714 .replace("~", "%7e") |
734 .replace("%", ".") | 715 .replace("%", ".") |
735 ) | 716 ) |
736 | 717 |
737 async def get_ap_account_from_jid_and_node( | 718 async def get_ap_account_from_jid_and_node( |
738 self, | 719 self, jid_: jid.JID, node: Optional[str] |
739 jid_: jid.JID, | |
740 node: Optional[str] | |
741 ) -> str: | 720 ) -> str: |
742 """Construct AP account from JID and node | 721 """Construct AP account from JID and node |
743 | 722 |
744 The account construction will use escaping when necessary | 723 The account construction will use escaping when necessary |
745 """ | 724 """ |
776 raise MissingLocalPartError("there should be a local part") | 755 raise MissingLocalPartError("there should be a local part") |
777 | 756 |
778 if node: | 757 if node: |
779 account_elts.extend((node, "---")) | 758 account_elts.extend((node, "---")) |
780 | 759 |
781 account_elts.extend(( | 760 account_elts.extend( |
782 user, "@", jid_.host if is_local else self.client.jid.userhost() | 761 (user, "@", jid_.host if is_local else self.client.jid.userhost()) |
783 )) | 762 ) |
784 return "".join(account_elts) | 763 return "".join(account_elts) |
785 | 764 |
786 def is_local(self, jid_: jid.JID) -> bool: | 765 def is_local(self, jid_: jid.JID) -> bool: |
787 """Returns True if jid_ use a domain or subdomain of gateway's host""" | 766 """Returns True if jid_ use a domain or subdomain of gateway's host""" |
788 local_host = self.client.host.split(".") | 767 local_host = self.client.host.split(".") |
789 assert local_host | 768 assert local_host |
790 return jid_.host.split(".")[-len(local_host):] == local_host | 769 return jid_.host.split(".")[-len(local_host) :] == local_host |
791 | 770 |
792 async def is_pubsub(self, jid_: jid.JID) -> bool: | 771 async def is_pubsub(self, jid_: jid.JID) -> bool: |
793 """Indicate if a JID is a Pubsub service""" | 772 """Indicate if a JID is a Pubsub service""" |
794 host_disco = await self.host.get_disco_infos(self.client, jid_) | 773 host_disco = await self.host.get_disco_infos(self.client, jid_) |
795 return ( | 774 return ("pubsub", "service") in host_disco.identities and not ( |
796 ("pubsub", "service") in host_disco.identities | 775 "pubsub", |
797 and not ("pubsub", "pep") in host_disco.identities | 776 "pep", |
798 ) | 777 ) in host_disco.identities |
799 | 778 |
800 async def get_jid_and_node(self, ap_account: str) -> tuple[jid.JID, str|None]: | 779 async def get_jid_and_node(self, ap_account: str) -> tuple[jid.JID, str | None]: |
801 """Decode raw AP account handle to get XMPP JID and Pubsub Node | 780 """Decode raw AP account handle to get XMPP JID and Pubsub Node |
802 | 781 |
803 Username are case insensitive. | 782 Username are case insensitive. |
804 | 783 |
805 By default, the username correspond to local username (i.e. username from | 784 By default, the username correspond to local username (i.e. username from |
854 else: | 833 else: |
855 node = None | 834 node = None |
856 | 835 |
857 if encoded: | 836 if encoded: |
858 username = parse.unquote( | 837 username = parse.unquote( |
859 RE_PERIOD_ENC.sub(r"%\g<hex>", username), | 838 RE_PERIOD_ENC.sub(r"%\g<hex>", username), errors="strict" |
860 errors="strict" | |
861 ) | 839 ) |
862 if node: | 840 if node: |
863 node = parse.unquote( | 841 node = parse.unquote( |
864 RE_PERIOD_ENC.sub(r"%\g<hex>", node), | 842 RE_PERIOD_ENC.sub(r"%\g<hex>", node), errors="strict" |
865 errors="strict" | |
866 ) | 843 ) |
867 | 844 |
868 if "@" in username: | 845 if "@" in username: |
869 username, domain = username.rsplit("@", 1) | 846 username, domain = username.rsplit("@", 1) |
870 | 847 |
897 """Compute JID linking to an AP account | 874 """Compute JID linking to an AP account |
898 | 875 |
899 The local jid is computer by escaping AP actor handle and using it as local part | 876 The local jid is computer by escaping AP actor handle and using it as local part |
900 of JID, where domain part is this gateway own JID | 877 of JID, where domain part is this gateway own JID |
901 """ | 878 """ |
902 return jid.JID( | 879 return jid.JID(None, (self._e.escape(account), self.client.jid.host, None)) |
903 None, | |
904 ( | |
905 self._e.escape(account), | |
906 self.client.jid.host, | |
907 None | |
908 ) | |
909 ) | |
910 | 880 |
911 async def get_jid_from_id(self, requestor_actor_id: str, actor_id: str) -> jid.JID: | 881 async def get_jid_from_id(self, requestor_actor_id: str, actor_id: str) -> jid.JID: |
912 """Compute JID linking to an AP Actor ID | 882 """Compute JID linking to an AP Actor ID |
913 | 883 |
914 The local jid is computer by escaping AP actor handle and using it as local part | 884 The local jid is computer by escaping AP actor handle and using it as local part |
935 | 905 |
936 @param url: URL to parse (schema is not mandatory) | 906 @param url: URL to parse (schema is not mandatory) |
937 @return: endpoint type and extra arguments | 907 @return: endpoint type and extra arguments |
938 """ | 908 """ |
939 path = parse.urlparse(url).path.lstrip("/") | 909 path = parse.urlparse(url).path.lstrip("/") |
940 type_, *extra_args = path[len(self.ap_path):].lstrip("/").split("/") | 910 type_, *extra_args = path[len(self.ap_path) :].lstrip("/").split("/") |
941 return type_, [parse.unquote(a) for a in extra_args] | 911 return type_, [parse.unquote(a) for a in extra_args] |
942 | 912 |
943 def build_apurl(self, type_:str , *args: str) -> str: | 913 def build_apurl(self, type_: str, *args: str) -> str: |
944 """Build an AP endpoint URL | 914 """Build an AP endpoint URL |
945 | 915 |
946 @param type_: type of AP endpoing | 916 @param type_: type of AP endpoing |
947 @param arg: endpoint dependant arguments | 917 @param arg: endpoint dependant arguments |
948 """ | 918 """ |
949 return parse.urljoin( | 919 return parse.urljoin( |
950 self.base_ap_url, | 920 self.base_ap_url, |
951 str(Path(type_).joinpath(*(parse.quote_plus(a, safe="@") for a in args))) | 921 str(Path(type_).joinpath(*(parse.quote_plus(a, safe="@") for a in args))), |
952 ) | 922 ) |
953 | 923 |
954 def is_local_url(self, url: str) -> bool: | 924 def is_local_url(self, url: str) -> bool: |
955 """Tells if an URL link to this component | 925 """Tells if an URL link to this component |
956 | 926 |
992 """Retrieve actor data with LRU cache""" | 962 """Retrieve actor data with LRU cache""" |
993 return await self.ap_get(actor_id, requestor_actor_id) | 963 return await self.ap_get(actor_id, requestor_actor_id) |
994 | 964 |
995 @async_lru(maxsize=LRU_MAX_SIZE) | 965 @async_lru(maxsize=LRU_MAX_SIZE) |
996 async def get_actor_pub_key_data( | 966 async def get_actor_pub_key_data( |
997 self, | 967 self, requestor_actor_id: str, actor_id: str |
998 requestor_actor_id: str, | |
999 actor_id: str | |
1000 ) -> Tuple[str, str, rsa.RSAPublicKey]: | 968 ) -> Tuple[str, str, rsa.RSAPublicKey]: |
1001 """Retrieve Public Key data from actor ID | 969 """Retrieve Public Key data from actor ID |
1002 | 970 |
1003 @param requestor_actor_id: ID of the actor doing the request. | 971 @param requestor_actor_id: ID of the actor doing the request. |
1004 @param actor_id: actor ID (url) | 972 @param actor_id: actor ID (url) |
1065 async def check_signature( | 1033 async def check_signature( |
1066 self, | 1034 self, |
1067 requestor_actor_id: str, | 1035 requestor_actor_id: str, |
1068 signature: str, | 1036 signature: str, |
1069 key_id: str, | 1037 key_id: str, |
1070 headers: Dict[str, str] | 1038 headers: Dict[str, str], |
1071 ) -> str: | 1039 ) -> str: |
1072 """Verify that signature matches given headers | 1040 """Verify that signature matches given headers |
1073 | 1041 |
1074 see https://datatracker.ietf.org/doc/html/draft-cavage-http-signatures-06#section-3.1.2 | 1042 see https://datatracker.ietf.org/doc/html/draft-cavage-http-signatures-06#section-3.1.2 |
1075 | 1043 |
1079 @param headers: headers and their values, including pseudo-headers | 1047 @param headers: headers and their values, including pseudo-headers |
1080 @return: id of the signing actor | 1048 @return: id of the signing actor |
1081 | 1049 |
1082 @raise InvalidSignature: signature doesn't match headers | 1050 @raise InvalidSignature: signature doesn't match headers |
1083 """ | 1051 """ |
1084 to_sign = "\n".join(f"{k.lower()}: {v}" for k,v in headers.items()) | 1052 to_sign = "\n".join(f"{k.lower()}: {v}" for k, v in headers.items()) |
1085 if key_id.startswith("acct:"): | 1053 if key_id.startswith("acct:"): |
1086 actor = key_id[5:] | 1054 actor = key_id[5:] |
1087 actor_id = await self.get_ap_actor_id_from_account(actor) | 1055 actor_id = await self.get_ap_actor_id_from_account(actor) |
1088 else: | 1056 else: |
1089 actor_id = key_id.split("#", 1)[0] | 1057 actor_id = key_id.split("#", 1)[0] |
1090 | 1058 |
1091 pub_key_id, pub_key_owner, pub_key = await self.get_actor_pub_key_data( | 1059 pub_key_id, pub_key_owner, pub_key = await self.get_actor_pub_key_data( |
1092 requestor_actor_id, | 1060 requestor_actor_id, actor_id |
1093 actor_id | |
1094 ) | 1061 ) |
1095 if pub_key_id != key_id or pub_key_owner != actor_id: | 1062 if pub_key_id != key_id or pub_key_owner != actor_id: |
1096 raise exceptions.EncryptionError("Public Key mismatch") | 1063 raise exceptions.EncryptionError("Public Key mismatch") |
1097 | 1064 |
1098 try: | 1065 try: |
1099 pub_key.verify( | 1066 pub_key.verify( |
1100 base64.b64decode(signature), | 1067 base64.b64decode(signature), |
1101 to_sign.encode(), | 1068 to_sign.encode(), |
1102 # we have to use PKCS1v15 padding to be compatible with Mastodon | 1069 # we have to use PKCS1v15 padding to be compatible with Mastodon |
1103 padding.PKCS1v15(), # type: ignore | 1070 padding.PKCS1v15(), # type: ignore |
1104 hashes.SHA256() # type: ignore | 1071 hashes.SHA256(), # type: ignore |
1105 ) | 1072 ) |
1106 except InvalidSignature: | 1073 except InvalidSignature: |
1107 raise exceptions.EncryptionError( | 1074 raise exceptions.EncryptionError( |
1108 "Invalid signature (using PKC0S1 v1.5 and SHA-256)" | 1075 "Invalid signature (using PKC0S1 v1.5 and SHA-256)" |
1109 ) | 1076 ) |
1110 | 1077 |
1111 return actor_id | 1078 return actor_id |
1112 | 1079 |
1113 def get_signature_data( | 1080 def get_signature_data( |
1114 self, | 1081 self, key_id: str, headers: Dict[str, str] |
1115 key_id: str, | |
1116 headers: Dict[str, str] | |
1117 ) -> Tuple[Dict[str, str], Dict[str, str]]: | 1082 ) -> Tuple[Dict[str, str], Dict[str, str]]: |
1118 """Generate and return signature and corresponding headers | 1083 """Generate and return signature and corresponding headers |
1119 | 1084 |
1120 @param parsed_url: URL where the request is sent/has been received | 1085 @param parsed_url: URL where the request is sent/has been received |
1121 @param key_id: ID of the key (URL linking to the data with public key) | 1086 @param key_id: ID of the key (URL linking to the data with public key) |
1128 ``headers`` is an updated copy of ``headers`` arguments, with pseudo-headers | 1093 ``headers`` is an updated copy of ``headers`` arguments, with pseudo-headers |
1129 removed, and ``Signature`` added. | 1094 removed, and ``Signature`` added. |
1130 """ | 1095 """ |
1131 # headers must be lower case | 1096 # headers must be lower case |
1132 l_headers: Dict[str, str] = {k.lower(): v for k, v in headers.items()} | 1097 l_headers: Dict[str, str] = {k.lower(): v for k, v in headers.items()} |
1133 to_sign = "\n".join(f"{k}: {v}" for k,v in l_headers.items()) | 1098 to_sign = "\n".join(f"{k}: {v}" for k, v in l_headers.items()) |
1134 signature = base64.b64encode(self.private_key.sign( | 1099 signature = base64.b64encode( |
1135 to_sign.encode(), | 1100 self.private_key.sign( |
1136 # we have to use PKCS1v15 padding to be compatible with Mastodon | 1101 to_sign.encode(), |
1137 padding.PKCS1v15(), # type: ignore | 1102 # we have to use PKCS1v15 padding to be compatible with Mastodon |
1138 hashes.SHA256() # type: ignore | 1103 padding.PKCS1v15(), # type: ignore |
1139 )).decode() | 1104 hashes.SHA256(), # type: ignore |
1105 ) | |
1106 ).decode() | |
1140 sign_data = { | 1107 sign_data = { |
1141 "keyId": key_id, | 1108 "keyId": key_id, |
1142 "Algorithm": "rsa-sha256", | 1109 "Algorithm": "rsa-sha256", |
1143 "headers": " ".join(l_headers.keys()), | 1110 "headers": " ".join(l_headers.keys()), |
1144 "signature": signature | 1111 "signature": signature, |
1145 } | 1112 } |
1146 new_headers = {k: v for k,v in headers.items() if not k.startswith("(")} | 1113 new_headers = {k: v for k, v in headers.items() if not k.startswith("(")} |
1147 new_headers["Signature"] = self.build_signature_header(sign_data) | 1114 new_headers["Signature"] = self.build_signature_header(sign_data) |
1148 return new_headers, sign_data | 1115 return new_headers, sign_data |
1149 | 1116 |
1150 async def convert_and_post_items( | 1117 async def convert_and_post_items( |
1151 self, | 1118 self, |
1165 @param subscribe_extra_nodes: if True, extra data nodes will be automatically | 1132 @param subscribe_extra_nodes: if True, extra data nodes will be automatically |
1166 subscribed, that is comment nodes if present and attachments nodes. | 1133 subscribed, that is comment nodes if present and attachments nodes. |
1167 """ | 1134 """ |
1168 actor_id = await self.get_ap_actor_id_from_account(ap_account) | 1135 actor_id = await self.get_ap_actor_id_from_account(ap_account) |
1169 requestor_actor_id = self.build_apurl( | 1136 requestor_actor_id = self.build_apurl( |
1170 TYPE_ACTOR, | 1137 TYPE_ACTOR, await self.get_ap_account_from_jid_and_node(service, node) |
1171 await self.get_ap_account_from_jid_and_node(service, node) | |
1172 ) | 1138 ) |
1173 inbox = await self.get_ap_inbox_from_id(requestor_actor_id, actor_id) | 1139 inbox = await self.get_ap_inbox_from_id(requestor_actor_id, actor_id) |
1174 for item in items: | 1140 for item in items: |
1175 if item.name == "item": | 1141 if item.name == "item": |
1176 cached_item = await self.host.memory.storage.search_pubsub_items({ | 1142 cached_item = await self.host.memory.storage.search_pubsub_items( |
1177 "profiles": [self.client.profile], | 1143 { |
1178 "services": [service], | 1144 "profiles": [self.client.profile], |
1179 "nodes": [node], | 1145 "services": [service], |
1180 "names": [item["id"]] | 1146 "nodes": [node], |
1181 }) | 1147 "names": [item["id"]], |
1148 } | |
1149 ) | |
1182 is_new = not bool(cached_item) | 1150 is_new = not bool(cached_item) |
1183 if node.startswith(self._events.namespace): | 1151 if node.startswith(self._events.namespace): |
1184 # event item | 1152 # event item |
1185 event_data = self._events.event_elt_2_event_data(item) | 1153 event_data = self._events.event_elt_2_event_data(item) |
1186 try: | 1154 try: |
1257 client: SatXMPPEntity, | 1225 client: SatXMPPEntity, |
1258 ap_account: str, | 1226 ap_account: str, |
1259 service: jid.JID, | 1227 service: jid.JID, |
1260 node: str, | 1228 node: str, |
1261 items: List[domish.Element], | 1229 items: List[domish.Element], |
1262 publisher: Optional[jid.JID] = None | 1230 publisher: Optional[jid.JID] = None, |
1263 ) -> None: | 1231 ) -> None: |
1264 """Convert XMPP item attachments to AP activities and post them to actor inbox | 1232 """Convert XMPP item attachments to AP activities and post them to actor inbox |
1265 | 1233 |
1266 @param ap_account: account of ActivityPub actor receiving the item | 1234 @param ap_account: account of ActivityPub actor receiving the item |
1267 @param service: JID of the (virtual) pubsub service where the item has been | 1235 @param service: JID of the (virtual) pubsub service where the item has been |
1282 f"{len(items)})" | 1250 f"{len(items)})" |
1283 ) | 1251 ) |
1284 | 1252 |
1285 actor_id = await self.get_ap_actor_id_from_account(ap_account) | 1253 actor_id = await self.get_ap_actor_id_from_account(ap_account) |
1286 requestor_actor_id = self.build_apurl( | 1254 requestor_actor_id = self.build_apurl( |
1287 TYPE_ACTOR, | 1255 TYPE_ACTOR, await self.get_ap_account_from_jid_and_node(service, node) |
1288 await self.get_ap_account_from_jid_and_node(service, node) | |
1289 ) | 1256 ) |
1290 inbox = await self.get_ap_inbox_from_id(requestor_actor_id, actor_id) | 1257 inbox = await self.get_ap_inbox_from_id(requestor_actor_id, actor_id) |
1291 | 1258 |
1292 item_elt = items[0] | 1259 item_elt = items[0] |
1293 item_id = item_elt["id"] | 1260 item_id = item_elt["id"] |
1325 ) | 1292 ) |
1326 return | 1293 return |
1327 else: | 1294 else: |
1328 item_url = self.build_apurl(TYPE_ITEM, item_account, item_id) | 1295 item_url = self.build_apurl(TYPE_ITEM, item_account, item_id) |
1329 | 1296 |
1330 old_attachment_pubsub_items = await self.host.memory.storage.search_pubsub_items({ | 1297 old_attachment_pubsub_items = await self.host.memory.storage.search_pubsub_items( |
1331 "profiles": [self.client.profile], | 1298 { |
1332 "services": [service], | 1299 "profiles": [self.client.profile], |
1333 "nodes": [node], | 1300 "services": [service], |
1334 "names": [item_elt["id"]] | 1301 "nodes": [node], |
1335 }) | 1302 "names": [item_elt["id"]], |
1303 } | |
1304 ) | |
1336 if not old_attachment_pubsub_items: | 1305 if not old_attachment_pubsub_items: |
1337 old_attachment = {} | 1306 old_attachment = {} |
1338 else: | 1307 else: |
1339 old_attachment_items = [i.data for i in old_attachment_pubsub_items] | 1308 old_attachment_items = [i.data for i in old_attachment_pubsub_items] |
1340 old_attachments = self._pa.items_2_attachment_data( | 1309 old_attachments = self._pa.items_2_attachment_data( |
1343 try: | 1312 try: |
1344 old_attachment = old_attachments[0] | 1313 old_attachment = old_attachments[0] |
1345 except IndexError: | 1314 except IndexError: |
1346 # no known element was present in attachments | 1315 # no known element was present in attachments |
1347 old_attachment = {} | 1316 old_attachment = {} |
1348 publisher_account = await self.get_ap_account_from_jid_and_node( | 1317 publisher_account = await self.get_ap_account_from_jid_and_node(publisher, None) |
1349 publisher, | |
1350 None | |
1351 ) | |
1352 publisher_actor_id = self.build_apurl(TYPE_ACTOR, publisher_account) | 1318 publisher_actor_id = self.build_apurl(TYPE_ACTOR, publisher_account) |
1353 try: | 1319 try: |
1354 attachments = self._pa.items_2_attachment_data(client, [item_elt])[0] | 1320 attachments = self._pa.items_2_attachment_data(client, [item_elt])[0] |
1355 except IndexError: | 1321 except IndexError: |
1356 # no known element was present in attachments | 1322 # no known element was present in attachments |
1388 for reaction in reactions: | 1354 for reaction in reactions: |
1389 activity_id = self.build_apurl( | 1355 activity_id = self.build_apurl( |
1390 "reaction", item_account, item_id, reaction.encode().hex() | 1356 "reaction", item_account, item_id, reaction.encode().hex() |
1391 ) | 1357 ) |
1392 reaction_activity = self.create_activity( | 1358 reaction_activity = self.create_activity( |
1393 TYPE_REACTION, publisher_actor_id, item_url, | 1359 TYPE_REACTION, publisher_actor_id, item_url, activity_id=activity_id |
1394 activity_id=activity_id | |
1395 ) | 1360 ) |
1396 reaction_activity["content"] = reaction | 1361 reaction_activity["content"] = reaction |
1397 reaction_activity["to"] = [ap_account] | 1362 reaction_activity["to"] = [ap_account] |
1398 reaction_activity["cc"] = [NS_AP_PUBLIC] | 1363 reaction_activity["cc"] = [NS_AP_PUBLIC] |
1399 if undo: | 1364 if undo: |
1408 if "rsvp" in attachments: | 1373 if "rsvp" in attachments: |
1409 attending = attachments["rsvp"].get("attending", "no") | 1374 attending = attachments["rsvp"].get("attending", "no") |
1410 old_attending = old_attachment.get("rsvp", {}).get("attending", "no") | 1375 old_attending = old_attachment.get("rsvp", {}).get("attending", "no") |
1411 if attending != old_attending: | 1376 if attending != old_attending: |
1412 activity_type = TYPE_JOIN if attending == "yes" else TYPE_LEAVE | 1377 activity_type = TYPE_JOIN if attending == "yes" else TYPE_LEAVE |
1413 activity_id = self.build_apurl(activity_type.lower(), item_account, item_id) | 1378 activity_id = self.build_apurl( |
1379 activity_type.lower(), item_account, item_id | |
1380 ) | |
1414 activity = self.create_activity( | 1381 activity = self.create_activity( |
1415 activity_type, publisher_actor_id, item_url, activity_id=activity_id | 1382 activity_type, publisher_actor_id, item_url, activity_id=activity_id |
1416 ) | 1383 ) |
1417 activity["to"] = [ap_account] | 1384 activity["to"] = [ap_account] |
1418 activity["cc"] = [NS_AP_PUBLIC] | 1385 activity["cc"] = [NS_AP_PUBLIC] |
1419 await self.ap_post(inbox, publisher_actor_id, activity) | 1386 await self.ap_post(inbox, publisher_actor_id, activity) |
1420 else: | 1387 else: |
1421 if "rsvp" in old_attachment: | 1388 if "rsvp" in old_attachment: |
1422 old_attending = old_attachment.get("rsvp", {}).get("attending", "no") | 1389 old_attending = old_attachment.get("rsvp", {}).get("attending", "no") |
1423 if old_attending == "yes": | 1390 if old_attending == "yes": |
1424 activity_id = self.build_apurl(TYPE_LEAVE.lower(), item_account, item_id) | 1391 activity_id = self.build_apurl( |
1392 TYPE_LEAVE.lower(), item_account, item_id | |
1393 ) | |
1425 activity = self.create_activity( | 1394 activity = self.create_activity( |
1426 TYPE_LEAVE, publisher_actor_id, item_url, activity_id=activity_id | 1395 TYPE_LEAVE, publisher_actor_id, item_url, activity_id=activity_id |
1427 ) | 1396 ) |
1428 activity["to"] = [ap_account] | 1397 activity["to"] = [ap_account] |
1429 activity["cc"] = [NS_AP_PUBLIC] | 1398 activity["cc"] = [NS_AP_PUBLIC] |
1434 log.debug("storing attachments item in cache") | 1403 log.debug("storing attachments item in cache") |
1435 cached_node = await self.host.memory.storage.get_pubsub_node( | 1404 cached_node = await self.host.memory.storage.get_pubsub_node( |
1436 client, service, node, with_subscriptions=True, create=True | 1405 client, service, node, with_subscriptions=True, create=True |
1437 ) | 1406 ) |
1438 await self.host.memory.storage.cache_pubsub_items( | 1407 await self.host.memory.storage.cache_pubsub_items( |
1439 self.client, | 1408 self.client, cached_node, [item_elt], [attachments] |
1440 cached_node, | |
1441 [item_elt], | |
1442 [attachments] | |
1443 ) | 1409 ) |
1444 | 1410 |
1445 def _publish_message(self, mess_data_s: str, service_s: str, profile: str): | 1411 def _publish_message(self, mess_data_s: str, service_s: str, profile: str): |
1446 mess_data: dict = data_format.deserialise(mess_data_s) # type: ignore | 1412 mess_data: dict = data_format.deserialise(mess_data_s) # type: ignore |
1447 service = jid.JID(service_s) | 1413 service = jid.JID(service_s) |
1448 client = self.host.get_client(profile) | 1414 client = self.host.get_client(profile) |
1449 return defer.ensureDeferred(self.publish_message(client, mess_data, service)) | 1415 return defer.ensureDeferred(self.publish_message(client, mess_data, service)) |
1450 | 1416 |
1451 @async_lru(maxsize=LRU_MAX_SIZE) | 1417 @async_lru(maxsize=LRU_MAX_SIZE) |
1461 """ | 1427 """ |
1462 if account.count("@") != 1 or "/" in account: | 1428 if account.count("@") != 1 or "/" in account: |
1463 raise ValueError(f"Invalid account: {account!r}") | 1429 raise ValueError(f"Invalid account: {account!r}") |
1464 host = account.split("@")[1] | 1430 host = account.split("@")[1] |
1465 try: | 1431 try: |
1466 finger_data = await treq.json_content(await treq.get( | 1432 finger_data = await treq.json_content( |
1467 f"https://{host}/.well-known/webfinger?" | 1433 await treq.get( |
1468 f"resource=acct:{parse.quote_plus(account)}", | 1434 f"https://{host}/.well-known/webfinger?" |
1469 )) | 1435 f"resource=acct:{parse.quote_plus(account)}", |
1436 ) | |
1437 ) | |
1470 except Exception as e: | 1438 except Exception as e: |
1471 raise exceptions.DataError(f"Can't get webfinger data for {account!r}: {e}") | 1439 raise exceptions.DataError(f"Can't get webfinger data for {account!r}: {e}") |
1472 for link in finger_data.get("links", []): | 1440 for link in finger_data.get("links", []): |
1473 if ( | 1441 if ( |
1474 link.get("type") == "application/activity+json" | 1442 link.get("type") == "application/activity+json" |
1479 raise ValueError( | 1447 raise ValueError( |
1480 f"Invalid webfinger data for {account:r}: missing href" | 1448 f"Invalid webfinger data for {account:r}: missing href" |
1481 ) | 1449 ) |
1482 break | 1450 break |
1483 else: | 1451 else: |
1484 raise ValueError( | 1452 raise ValueError(f"No ActivityPub link found for {account!r}") |
1485 f"No ActivityPub link found for {account!r}" | |
1486 ) | |
1487 return href | 1453 return href |
1488 | 1454 |
1489 async def get_ap_actor_data_from_account( | 1455 async def get_ap_actor_data_from_account( |
1490 self, | 1456 self, requestor_actor_id: str, account: str |
1491 requestor_actor_id: str, | |
1492 account: str | |
1493 ) -> dict: | 1457 ) -> dict: |
1494 """Retrieve ActivityPub Actor data | 1458 """Retrieve ActivityPub Actor data |
1495 | 1459 |
1496 @param account: ActivityPub Actor identifier | 1460 @param account: ActivityPub Actor identifier |
1497 """ | 1461 """ |
1498 href = await self.get_ap_actor_id_from_account(account) | 1462 href = await self.get_ap_actor_id_from_account(account) |
1499 return await self.ap_get(href, requestor_actor_id) | 1463 return await self.ap_get(href, requestor_actor_id) |
1500 | 1464 |
1501 async def get_ap_inbox_from_id( | 1465 async def get_ap_inbox_from_id( |
1502 self, | 1466 self, requestor_actor_id: str, actor_id: str, use_shared: bool = True |
1503 requestor_actor_id: str, | |
1504 actor_id: str, | |
1505 use_shared: bool = True | |
1506 ) -> str: | 1467 ) -> str: |
1507 """Retrieve inbox of an actor_id | 1468 """Retrieve inbox of an actor_id |
1508 | 1469 |
1509 @param requestor_actor_id: ID of the actor doing the request. | 1470 @param requestor_actor_id: ID of the actor doing the request. |
1510 @param actor_id: ID of the actor from whom Inbox must be retrieved. | 1471 @param actor_id: ID of the actor from whom Inbox must be retrieved. |
1529 @return: AP handle | 1490 @return: AP handle |
1530 """ | 1491 """ |
1531 if self.is_local_url(actor_id): | 1492 if self.is_local_url(actor_id): |
1532 url_type, url_args = self.parse_apurl(actor_id) | 1493 url_type, url_args = self.parse_apurl(actor_id) |
1533 if url_type != "actor" or not url_args: | 1494 if url_type != "actor" or not url_args: |
1534 raise exceptions.DataError( | 1495 raise exceptions.DataError(f"invalid local actor ID: {actor_id}") |
1535 f"invalid local actor ID: {actor_id}" | |
1536 ) | |
1537 account = url_args[0] | 1496 account = url_args[0] |
1538 try: | 1497 try: |
1539 account_user, account_host = account.split('@') | 1498 account_user, account_host = account.split("@") |
1540 except ValueError: | 1499 except ValueError: |
1541 raise exceptions.DataError( | 1500 raise exceptions.DataError(f"invalid account from url: {actor_id}") |
1542 f"invalid account from url: {actor_id}" | |
1543 ) | |
1544 if not account_user or account_host != self.public_url: | 1501 if not account_user or account_host != self.public_url: |
1545 raise exceptions.DataError( | 1502 raise exceptions.DataError( |
1546 f"{account!r} is not a valid local account (from {actor_id})" | 1503 f"{account!r} is not a valid local account (from {actor_id})" |
1547 ) | 1504 ) |
1548 return account | 1505 return account |
1634 pass | 1591 pass |
1635 elif start_index > 5000: | 1592 elif start_index > 5000: |
1636 raise error.StanzaError( | 1593 raise error.StanzaError( |
1637 "feature-not-implemented", | 1594 "feature-not-implemented", |
1638 text="Maximum limit for previous_index has been reached, this limit" | 1595 text="Maximum limit for previous_index has been reached, this limit" |
1639 "is set to avoid DoS" | 1596 "is set to avoid DoS", |
1640 ) | 1597 ) |
1641 else: | 1598 else: |
1642 # we'll convert "start_index" to "after_id", thus we need the item just | 1599 # we'll convert "start_index" to "after_id", thus we need the item just |
1643 # before "start_index" | 1600 # before "start_index" |
1644 previous_index = start_index - 1 | 1601 previous_index = start_index - 1 |
1663 f"missing previous page link at {current_page}: {page_data!r}" | 1620 f"missing previous page link at {current_page}: {page_data!r}" |
1664 ) | 1621 ) |
1665 raise error.StanzaError( | 1622 raise error.StanzaError( |
1666 "service-unavailable", | 1623 "service-unavailable", |
1667 "Error while retrieving previous page from AP service at " | 1624 "Error while retrieving previous page from AP service at " |
1668 f"{current_page}" | 1625 f"{current_page}", |
1669 ) | 1626 ) |
1670 | 1627 |
1671 init_page = "last" if chronological_pagination else "first" | 1628 init_page = "last" if chronological_pagination else "first" |
1672 page = collection.get(init_page) | 1629 page = collection.get(init_page) |
1673 if not page: | 1630 if not page: |
1696 log.debug(f"{after_id!r} not found at {page_id}, skipping") | 1653 log.debug(f"{after_id!r} not found at {page_id}, skipping") |
1697 else: | 1654 else: |
1698 found_after_id = True | 1655 found_after_id = True |
1699 if chronological_pagination: | 1656 if chronological_pagination: |
1700 start_index = retrieved_items - len(page_items) + limit_idx + 1 | 1657 start_index = retrieved_items - len(page_items) + limit_idx + 1 |
1701 page_items = page_items[limit_idx+1:] | 1658 page_items = page_items[limit_idx + 1 :] |
1702 else: | 1659 else: |
1703 start_index = count - (retrieved_items - len(page_items) + | 1660 start_index = count - ( |
1704 limit_idx + 1) | 1661 retrieved_items - len(page_items) + limit_idx + 1 |
1662 ) | |
1705 page_items = page_items[:limit_idx] | 1663 page_items = page_items[:limit_idx] |
1706 items.extend(page_items) | 1664 items.extend(page_items) |
1707 else: | 1665 else: |
1708 items.extend(page_items) | 1666 items.extend(page_items) |
1709 if max_items is not None and len(items) >= max_items: | 1667 if max_items is not None and len(items) >= max_items: |
1728 log.warning("Can't determine index of first element") | 1686 log.warning("Can't determine index of first element") |
1729 elif chronological_pagination: | 1687 elif chronological_pagination: |
1730 rsm_resp["index"] = 0 | 1688 rsm_resp["index"] = 0 |
1731 else: | 1689 else: |
1732 rsm_resp["index"] = count - len(items) | 1690 rsm_resp["index"] = count - len(items) |
1733 rsm_resp.update({ | 1691 rsm_resp.update({"first": items[0]["id"], "last": items[-1]["id"]}) |
1734 "first": items[0]["id"], | |
1735 "last": items[-1]["id"] | |
1736 }) | |
1737 | 1692 |
1738 return items, rsm.RSMResponse(**rsm_resp) | 1693 return items, rsm.RSMResponse(**rsm_resp) |
1739 | 1694 |
1740 async def ap_item_2_mb_data_and_elt(self, requestor_actor_id: str, ap_item: dict) -> tuple[dict, domish.Element]: | 1695 async def ap_item_2_mb_data_and_elt( |
1696 self, requestor_actor_id: str, ap_item: dict | |
1697 ) -> tuple[dict, domish.Element]: | |
1741 """Convert AP item to parsed microblog data and corresponding item element | 1698 """Convert AP item to parsed microblog data and corresponding item element |
1742 | 1699 |
1743 @param requestor_actor_id: ID of the actor requesting the conversion. | 1700 @param requestor_actor_id: ID of the actor requesting the conversion. |
1744 @param ap_item: AP item to convert. | 1701 @param ap_item: AP item to convert. |
1745 @return: microblog and correspondign <item> element. | 1702 @return: microblog and correspondign <item> element. |
1752 item_elt["publisher"] = mb_data["extra"]["repeated"]["by"] | 1709 item_elt["publisher"] = mb_data["extra"]["repeated"]["by"] |
1753 else: | 1710 else: |
1754 item_elt["publisher"] = mb_data["author_jid"] | 1711 item_elt["publisher"] = mb_data["author_jid"] |
1755 return mb_data, item_elt | 1712 return mb_data, item_elt |
1756 | 1713 |
1757 async def ap_item_2_mb_elt(self, requestor_actor_id: str, ap_item: dict) -> domish.Element: | 1714 async def ap_item_2_mb_elt( |
1715 self, requestor_actor_id: str, ap_item: dict | |
1716 ) -> domish.Element: | |
1758 """Convert AP item to XMPP item element | 1717 """Convert AP item to XMPP item element |
1759 | 1718 |
1760 @param requestor_actor_id: ID of the actor requesting the conversion. | 1719 @param requestor_actor_id: ID of the actor requesting the conversion. |
1761 @param ap_item: AP item to convert. | 1720 @param ap_item: AP item to convert. |
1762 @return: <item> element | 1721 @return: <item> element |
1767 async def parse_ap_page( | 1726 async def parse_ap_page( |
1768 self, | 1727 self, |
1769 requestor_actor_id: str, | 1728 requestor_actor_id: str, |
1770 page: Union[str, dict], | 1729 page: Union[str, dict], |
1771 parser: Callable[[str, dict], Awaitable[domish.Element]], | 1730 parser: Callable[[str, dict], Awaitable[domish.Element]], |
1772 only_ids: bool = False | 1731 only_ids: bool = False, |
1773 ) -> Tuple[dict, List[domish.Element]]: | 1732 ) -> Tuple[dict, List[domish.Element]]: |
1774 """Convert AP objects from an AP page to XMPP items | 1733 """Convert AP objects from an AP page to XMPP items |
1775 | 1734 |
1776 @param requestor_actor_id: ID of the actor doing the request. | 1735 @param requestor_actor_id: ID of the actor doing the request. |
1777 @param page: Can be either url linking and AP page, or the page data directly | 1736 @param page: Can be either url linking and AP page, or the page data directly |
1779 @param only_ids: if True, only retrieve items IDs | 1738 @param only_ids: if True, only retrieve items IDs |
1780 @return: page data, pubsub items | 1739 @return: page data, pubsub items |
1781 """ | 1740 """ |
1782 page_data = await self.ap_get_object(requestor_actor_id, page) | 1741 page_data = await self.ap_get_object(requestor_actor_id, page) |
1783 if page_data is None: | 1742 if page_data is None: |
1784 log.warning('No data found in collection') | 1743 log.warning("No data found in collection") |
1785 return {}, [] | 1744 return {}, [] |
1786 ap_items = await self.ap_get_list(requestor_actor_id, page_data, "orderedItems", only_ids=only_ids) | 1745 ap_items = await self.ap_get_list( |
1746 requestor_actor_id, page_data, "orderedItems", only_ids=only_ids | |
1747 ) | |
1787 if ap_items is None: | 1748 if ap_items is None: |
1788 ap_items = await self.ap_get_list(requestor_actor_id, page_data, "items", only_ids=only_ids) | 1749 ap_items = await self.ap_get_list( |
1750 requestor_actor_id, page_data, "items", only_ids=only_ids | |
1751 ) | |
1789 if not ap_items: | 1752 if not ap_items: |
1790 log.warning(f'No item field found in collection: {page_data!r}') | 1753 log.warning(f"No item field found in collection: {page_data!r}") |
1791 return page_data, [] | 1754 return page_data, [] |
1792 else: | 1755 else: |
1793 log.warning( | 1756 log.warning("Items are not ordered, this is not spec compliant") |
1794 "Items are not ordered, this is not spec compliant" | |
1795 ) | |
1796 items = [] | 1757 items = [] |
1797 # AP Collections are in antichronological order, but we expect chronological in | 1758 # AP Collections are in antichronological order, but we expect chronological in |
1798 # Pubsub, thus we reverse it | 1759 # Pubsub, thus we reverse it |
1799 for ap_item in reversed(ap_items): | 1760 for ap_item in reversed(ap_items): |
1800 try: | 1761 try: |
1803 continue | 1764 continue |
1804 | 1765 |
1805 return page_data, items | 1766 return page_data, items |
1806 | 1767 |
1807 async def get_comments_nodes( | 1768 async def get_comments_nodes( |
1808 self, | 1769 self, requestor_actor_id: str, item_id: str, parent_id: Optional[str] |
1809 requestor_actor_id: str, | |
1810 item_id: str, | |
1811 parent_id: Optional[str] | |
1812 ) -> Tuple[Optional[str], Optional[str]]: | 1770 ) -> Tuple[Optional[str], Optional[str]]: |
1813 """Get node where this item is and node to use for comments | 1771 """Get node where this item is and node to use for comments |
1814 | 1772 |
1815 if config option "comments_max_depth" is set, a common node will be used below the | 1773 if config option "comments_max_depth" is set, a common node will be used below the |
1816 given depth | 1774 given depth |
1825 "comments_max_depth") | 1783 "comments_max_depth") |
1826 """ | 1784 """ |
1827 if parent_id is None or not self.comments_max_depth: | 1785 if parent_id is None or not self.comments_max_depth: |
1828 return ( | 1786 return ( |
1829 self._m.get_comments_node(parent_id) if parent_id is not None else None, | 1787 self._m.get_comments_node(parent_id) if parent_id is not None else None, |
1830 self._m.get_comments_node(item_id) | 1788 self._m.get_comments_node(item_id), |
1831 ) | 1789 ) |
1832 parent_url = parent_id | 1790 parent_url = parent_id |
1833 parents = [] | 1791 parents = [] |
1834 for __ in range(COMMENTS_MAX_PARENTS): | 1792 for __ in range(COMMENTS_MAX_PARENTS): |
1835 parent_item = await self.ap_get(parent_url, requestor_actor_id) | 1793 parent_item = await self.ap_get(parent_url, requestor_actor_id) |
1836 parents.insert(0, parent_item) | 1794 parents.insert(0, parent_item) |
1837 parent_url = parent_item.get("inReplyTo") | 1795 parent_url = parent_item.get("inReplyTo") |
1838 if parent_url is None: | 1796 if parent_url is None: |
1839 break | 1797 break |
1840 parent_limit = self.comments_max_depth-1 | 1798 parent_limit = self.comments_max_depth - 1 |
1841 if len(parents) <= parent_limit: | 1799 if len(parents) <= parent_limit: |
1842 return ( | 1800 return ( |
1843 self._m.get_comments_node(parents[-1]["id"]), | 1801 self._m.get_comments_node(parents[-1]["id"]), |
1844 self._m.get_comments_node(item_id) | 1802 self._m.get_comments_node(item_id), |
1845 ) | 1803 ) |
1846 else: | 1804 else: |
1847 last_level_item = parents[parent_limit] | 1805 last_level_item = parents[parent_limit] |
1848 return ( | 1806 return (self._m.get_comments_node(last_level_item["id"]), None) |
1849 self._m.get_comments_node(last_level_item["id"]), | |
1850 None | |
1851 ) | |
1852 | 1807 |
1853 async def ap_item_2_mb_data(self, requestor_actor_id: str, ap_item: dict) -> dict: | 1808 async def ap_item_2_mb_data(self, requestor_actor_id: str, ap_item: dict) -> dict: |
1854 """Convert AP activity or object to microblog data | 1809 """Convert AP activity or object to microblog data |
1855 | 1810 |
1856 @param actor_id: ID of the actor doing the request. | 1811 @param actor_id: ID of the actor doing the request. |
1923 | 1878 |
1924 # author | 1879 # author |
1925 if is_activity: | 1880 if is_activity: |
1926 authors = await self.ap_get_actors(requestor_actor_id, ap_item, "actor") | 1881 authors = await self.ap_get_actors(requestor_actor_id, ap_item, "actor") |
1927 else: | 1882 else: |
1928 authors = await self.ap_get_actors(requestor_actor_id, ap_object, "attributedTo") | 1883 authors = await self.ap_get_actors( |
1884 requestor_actor_id, ap_object, "attributedTo" | |
1885 ) | |
1929 if len(authors) > 1: | 1886 if len(authors) > 1: |
1930 # we only keep first item as author | 1887 # we only keep first item as author |
1931 # TODO: handle multiple actors | 1888 # TODO: handle multiple actors |
1932 log.warning("multiple actors are not managed") | 1889 log.warning("multiple actors are not managed") |
1933 | 1890 |
1961 ) | 1918 ) |
1962 if comments_node is not None: | 1919 if comments_node is not None: |
1963 comments_data = { | 1920 comments_data = { |
1964 "service": author_jid, | 1921 "service": author_jid, |
1965 "node": comments_node, | 1922 "node": comments_node, |
1966 "uri": uri.build_xmpp_uri( | 1923 "uri": uri.build_xmpp_uri("pubsub", path=author_jid, node=comments_node), |
1967 "pubsub", | |
1968 path=author_jid, | |
1969 node=comments_node | |
1970 ) | |
1971 } | 1924 } |
1972 mb_data["comments"] = [comments_data] | 1925 mb_data["comments"] = [comments_data] |
1973 | 1926 |
1974 return mb_data | 1927 return mb_data |
1975 | 1928 |
1976 async def get_reply_to_id_from_xmpp_node( | 1929 async def get_reply_to_id_from_xmpp_node( |
1977 self, | 1930 self, client: SatXMPPEntity, ap_account: str, parent_item: str, mb_data: dict |
1978 client: SatXMPPEntity, | |
1979 ap_account: str, | |
1980 parent_item: str, | |
1981 mb_data: dict | |
1982 ) -> str: | 1931 ) -> str: |
1983 """Get URL to use for ``inReplyTo`` field in AP item. | 1932 """Get URL to use for ``inReplyTo`` field in AP item. |
1984 | 1933 |
1985 There is currently no way to know the parent service of a comment with XEP-0277. | 1934 There is currently no way to know the parent service of a comment with XEP-0277. |
1986 To work around that, we try to check if we have this item in the cache (we | 1935 To work around that, we try to check if we have this item in the cache (we |
1993 @param mb_data: microblog data of the publication | 1942 @param mb_data: microblog data of the publication |
1994 @return: URL to use in ``inReplyTo`` field | 1943 @return: URL to use in ``inReplyTo`` field |
1995 """ | 1944 """ |
1996 # FIXME: propose a protoXEP to properly get parent item, node and service | 1945 # FIXME: propose a protoXEP to properly get parent item, node and service |
1997 | 1946 |
1998 found_items = await self.host.memory.storage.search_pubsub_items({ | 1947 found_items = await self.host.memory.storage.search_pubsub_items( |
1999 "profiles": [client.profile], | 1948 {"profiles": [client.profile], "names": [parent_item]} |
2000 "names": [parent_item] | 1949 ) |
2001 }) | |
2002 if not found_items: | 1950 if not found_items: |
2003 log.warning(f"parent item {parent_item!r} not found in cache") | 1951 log.warning(f"parent item {parent_item!r} not found in cache") |
2004 parent_ap_account = ap_account | 1952 parent_ap_account = ap_account |
2005 elif len(found_items) == 1: | 1953 elif len(found_items) == 1: |
2006 cached_node = found_items[0].node | 1954 cached_node = found_items[0].node |
2007 parent_ap_account = await self.get_ap_account_from_jid_and_node( | 1955 parent_ap_account = await self.get_ap_account_from_jid_and_node( |
2008 cached_node.service, | 1956 cached_node.service, cached_node.name |
2009 cached_node.name | |
2010 ) | 1957 ) |
2011 else: | 1958 else: |
2012 # we found several cached item with given ID, we check if there is one | 1959 # we found several cached item with given ID, we check if there is one |
2013 # corresponding to this author | 1960 # corresponding to this author |
2014 try: | 1961 try: |
2015 author = jid.JID(mb_data["author_jid"]).userhostJID() | 1962 author = jid.JID(mb_data["author_jid"]).userhostJID() |
2016 cached_item = next( | 1963 cached_item = next( |
2017 i for i in found_items | 1964 i |
2018 if jid.JID(i.data["publisher"]).userhostJID() | 1965 for i in found_items |
2019 == author | 1966 if jid.JID(i.data["publisher"]).userhostJID() == author |
2020 ) | 1967 ) |
2021 except StopIteration: | 1968 except StopIteration: |
2022 # no item corresponding to this author, we use ap_account | 1969 # no item corresponding to this author, we use ap_account |
2023 log.warning( | 1970 log.warning( |
2024 "Can't find a single cached item for parent item " | 1971 "Can't find a single cached item for parent item " f"{parent_item!r}" |
2025 f"{parent_item!r}" | |
2026 ) | 1972 ) |
2027 parent_ap_account = ap_account | 1973 parent_ap_account = ap_account |
2028 else: | 1974 else: |
2029 cached_node = cached_item.node | 1975 cached_node = cached_item.node |
2030 parent_ap_account = await self.get_ap_account_from_jid_and_node( | 1976 parent_ap_account = await self.get_ap_account_from_jid_and_node( |
2031 cached_node.service, | 1977 cached_node.service, cached_node.name |
2032 cached_node.name | 1978 ) |
2033 ) | 1979 |
2034 | 1980 return self.build_apurl(TYPE_ITEM, parent_ap_account, parent_item) |
2035 return self.build_apurl( | 1981 |
2036 TYPE_ITEM, parent_ap_account, parent_item | 1982 async def repeated_mb_2_ap_item(self, mb_data: dict) -> dict: |
2037 ) | |
2038 | |
2039 async def repeated_mb_2_ap_item( | |
2040 self, | |
2041 mb_data: dict | |
2042 ) -> dict: | |
2043 """Convert repeated blog item to suitable AP Announce activity | 1983 """Convert repeated blog item to suitable AP Announce activity |
2044 | 1984 |
2045 @param mb_data: microblog metadata of an item repeating an other blog post | 1985 @param mb_data: microblog metadata of an item repeating an other blog post |
2046 @return: Announce activity linking to the repeated item | 1986 @return: Announce activity linking to the repeated item |
2047 """ | 1987 """ |
2048 repeated = mb_data["extra"]["repeated"] | 1988 repeated = mb_data["extra"]["repeated"] |
2049 repeater = jid.JID(repeated["by"]) | 1989 repeater = jid.JID(repeated["by"]) |
2050 repeater_account = await self.get_ap_account_from_jid_and_node( | 1990 repeater_account = await self.get_ap_account_from_jid_and_node(repeater, None) |
2051 repeater, | |
2052 None | |
2053 ) | |
2054 repeater_id = self.build_apurl(TYPE_ACTOR, repeater_account) | 1991 repeater_id = self.build_apurl(TYPE_ACTOR, repeater_account) |
2055 repeated_uri = repeated["uri"] | 1992 repeated_uri = repeated["uri"] |
2056 | 1993 |
2057 if not repeated_uri.startswith("xmpp:"): | 1994 if not repeated_uri.startswith("xmpp:"): |
2058 log.warning( | 1995 log.warning( |
2093 "Announce", repeater_id, announced_uri, activity_id=activity_id | 2030 "Announce", repeater_id, announced_uri, activity_id=activity_id |
2094 ) | 2031 ) |
2095 announce["to"] = [NS_AP_PUBLIC] | 2032 announce["to"] = [NS_AP_PUBLIC] |
2096 announce["cc"] = [ | 2033 announce["cc"] = [ |
2097 self.build_apurl(TYPE_FOLLOWERS, repeater_account), | 2034 self.build_apurl(TYPE_FOLLOWERS, repeater_account), |
2098 await self.get_ap_actor_id_from_account(repeated_account) | 2035 await self.get_ap_actor_id_from_account(repeated_account), |
2099 ] | 2036 ] |
2100 return announce | 2037 return announce |
2101 | 2038 |
2102 async def mb_data_2_ap_item( | 2039 async def mb_data_2_ap_item( |
2103 self, | 2040 self, |
2104 client: SatXMPPEntity, | 2041 client: SatXMPPEntity, |
2105 mb_data: dict, | 2042 mb_data: dict, |
2106 public: bool =True, | 2043 public: bool = True, |
2107 is_new: bool = True, | 2044 is_new: bool = True, |
2108 ) -> dict: | 2045 ) -> dict: |
2109 """Convert Libervia Microblog Data to ActivityPub item | 2046 """Convert Libervia Microblog Data to ActivityPub item |
2110 | 2047 |
2111 @param mb_data: microblog data (as used in plugin XEP-0277) to convert | 2048 @param mb_data: microblog data (as used in plugin XEP-0277) to convert |
2129 if not mb_data.get("id"): | 2066 if not mb_data.get("id"): |
2130 mb_data["id"] = shortuuid.uuid() | 2067 mb_data["id"] = shortuuid.uuid() |
2131 if not mb_data.get("author_jid"): | 2068 if not mb_data.get("author_jid"): |
2132 mb_data["author_jid"] = client.jid.userhost() | 2069 mb_data["author_jid"] = client.jid.userhost() |
2133 ap_account = await self.get_ap_account_from_jid_and_node( | 2070 ap_account = await self.get_ap_account_from_jid_and_node( |
2134 jid.JID(mb_data["author_jid"]), | 2071 jid.JID(mb_data["author_jid"]), None |
2135 None | |
2136 ) | 2072 ) |
2137 url_actor = self.build_apurl(TYPE_ACTOR, ap_account) | 2073 url_actor = self.build_apurl(TYPE_ACTOR, ap_account) |
2138 url_item = self.build_apurl(TYPE_ITEM, ap_account, mb_data["id"]) | 2074 url_item = self.build_apurl(TYPE_ITEM, ap_account, mb_data["id"]) |
2139 ap_object = { | 2075 ap_object = { |
2140 "id": url_item, | 2076 "id": url_item, |
2151 attachments = extra.get(C.KEY_ATTACHMENTS) | 2087 attachments = extra.get(C.KEY_ATTACHMENTS) |
2152 if attachments: | 2088 if attachments: |
2153 ap_attachments = ap_object["attachment"] = [] | 2089 ap_attachments = ap_object["attachment"] = [] |
2154 for attachment in attachments: | 2090 for attachment in attachments: |
2155 try: | 2091 try: |
2156 url = next( | 2092 url = next(s["url"] for s in attachment["sources"] if "url" in s) |
2157 s['url'] for s in attachment["sources"] if 'url' in s | |
2158 ) | |
2159 except (StopIteration, KeyError): | 2093 except (StopIteration, KeyError): |
2160 log.warning( | 2094 log.warning(f"Ignoring attachment without URL: {attachment}") |
2161 f"Ignoring attachment without URL: {attachment}" | |
2162 ) | |
2163 continue | 2095 continue |
2164 ap_attachment = { | 2096 ap_attachment = {"url": url} |
2165 "url": url | |
2166 } | |
2167 for key, ap_key in ( | 2097 for key, ap_key in ( |
2168 ("media_type", "mediaType"), | 2098 ("media_type", "mediaType"), |
2169 # XXX: yes "name", cf. [ap_item_2_mb_data] | 2099 # XXX: yes "name", cf. [ap_item_2_mb_data] |
2170 ("desc", "name"), | 2100 ("desc", "name"), |
2171 ): | 2101 ): |
2189 mentioned_id = await self.get_ap_actor_id_from_account(mentioned) | 2119 mentioned_id = await self.get_ap_actor_id_from_account(mentioned) |
2190 except Exception as e: | 2120 except Exception as e: |
2191 log.warning(f"Can't add mention to {mentioned!r}: {e}") | 2121 log.warning(f"Can't add mention to {mentioned!r}: {e}") |
2192 else: | 2122 else: |
2193 ap_object["to"].append(mentioned_id) | 2123 ap_object["to"].append(mentioned_id) |
2194 ap_object.setdefault("tag", []).append({ | 2124 ap_object.setdefault("tag", []).append( |
2195 "type": TYPE_MENTION, | 2125 { |
2196 "href": mentioned_id, | 2126 "type": TYPE_MENTION, |
2197 "name": mention, | 2127 "href": mentioned_id, |
2198 }) | 2128 "name": mention, |
2129 } | |
2130 ) | |
2199 try: | 2131 try: |
2200 node = mb_data["node"] | 2132 node = mb_data["node"] |
2201 service = jid.JID(mb_data["service"]) | 2133 service = jid.JID(mb_data["service"]) |
2202 except KeyError: | 2134 except KeyError: |
2203 # node and service must always be specified when this method is used | 2135 # node and service must always be specified when this method is used |
2204 raise exceptions.InternalError( | 2136 raise exceptions.InternalError("node or service is missing in mb_data") |
2205 "node or service is missing in mb_data" | |
2206 ) | |
2207 try: | 2137 try: |
2208 target_ap_account = await self.get_ap_account_from_jid_and_node( | 2138 target_ap_account = await self.get_ap_account_from_jid_and_node( |
2209 service, node | 2139 service, node |
2210 ) | 2140 ) |
2211 except MissingLocalPartError: | 2141 except MissingLocalPartError: |
2224 service, node | 2154 service, node |
2225 ) | 2155 ) |
2226 if self.is_virtual_jid(service): | 2156 if self.is_virtual_jid(service): |
2227 # service is a proxy JID for AP account | 2157 # service is a proxy JID for AP account |
2228 actor_data = await self.get_ap_actor_data_from_account( | 2158 actor_data = await self.get_ap_actor_data_from_account( |
2229 url_actor, | 2159 url_actor, target_ap_account |
2230 target_ap_account | |
2231 ) | 2160 ) |
2232 followers = actor_data.get("followers") | 2161 followers = actor_data.get("followers") |
2233 else: | 2162 else: |
2234 # service is a real XMPP entity | 2163 # service is a real XMPP entity |
2235 followers = self.build_apurl(TYPE_FOLLOWERS, target_ap_account) | 2164 followers = self.build_apurl(TYPE_FOLLOWERS, target_ap_account) |
2242 # this gateway and linking to an ActivityPub actor) | 2171 # this gateway and linking to an ActivityPub actor) |
2243 ap_object["inReplyTo"] = parent_item | 2172 ap_object["inReplyTo"] = parent_item |
2244 else: | 2173 else: |
2245 # the publication is from a followed real XMPP node | 2174 # the publication is from a followed real XMPP node |
2246 ap_object["inReplyTo"] = await self.get_reply_to_id_from_xmpp_node( | 2175 ap_object["inReplyTo"] = await self.get_reply_to_id_from_xmpp_node( |
2247 client, | 2176 client, ap_account, parent_item, mb_data |
2248 ap_account, | |
2249 parent_item, | |
2250 mb_data | |
2251 ) | 2177 ) |
2252 | 2178 |
2253 return self.create_activity( | 2179 return self.create_activity( |
2254 "Create" if is_new else "Update", url_actor, ap_object, activity_id=url_item | 2180 "Create" if is_new else "Update", url_actor, ap_object, activity_id=url_item |
2255 ) | 2181 ) |
2256 | 2182 |
2257 async def publish_message( | 2183 async def publish_message( |
2258 self, | 2184 self, client: SatXMPPEntity, mess_data: dict, service: jid.JID |
2259 client: SatXMPPEntity, | |
2260 mess_data: dict, | |
2261 service: jid.JID | |
2262 ) -> None: | 2185 ) -> None: |
2263 """Send an AP message | 2186 """Send an AP message |
2264 | 2187 |
2265 .. note:: | 2188 .. note:: |
2266 | 2189 |
2290 item_data = await self.mb_data_2_ap_item(client, mess_data) | 2213 item_data = await self.mb_data_2_ap_item(client, mess_data) |
2291 url_actor = item_data["actor"] | 2214 url_actor = item_data["actor"] |
2292 await self.ap_post(inbox_url, url_actor, item_data) | 2215 await self.ap_post(inbox_url, url_actor, item_data) |
2293 | 2216 |
2294 async def ap_delete_item( | 2217 async def ap_delete_item( |
2295 self, | 2218 self, jid_: jid.JID, node: Optional[str], item_id: str, public: bool = True |
2296 jid_: jid.JID, | |
2297 node: Optional[str], | |
2298 item_id: str, | |
2299 public: bool = True | |
2300 ) -> Tuple[str, Dict[str, Any]]: | 2219 ) -> Tuple[str, Dict[str, Any]]: |
2301 """Build activity to delete an AP item | 2220 """Build activity to delete an AP item |
2302 | 2221 |
2303 @param jid_: JID of the entity deleting an item | 2222 @param jid_: JID of the entity deleting an item |
2304 @param node: node where the item is deleted | 2223 @param node: node where the item is deleted |
2312 node = self._m.namespace | 2231 node = self._m.namespace |
2313 | 2232 |
2314 author_account = await self.get_ap_account_from_jid_and_node(jid_, node) | 2233 author_account = await self.get_ap_account_from_jid_and_node(jid_, node) |
2315 author_actor_id = self.build_apurl(TYPE_ACTOR, author_account) | 2234 author_actor_id = self.build_apurl(TYPE_ACTOR, author_account) |
2316 | 2235 |
2317 items = await self.host.memory.storage.search_pubsub_items({ | 2236 items = await self.host.memory.storage.search_pubsub_items( |
2318 "profiles": [self.client.profile], | 2237 {"profiles": [self.client.profile], "services": [jid_], "names": [item_id]} |
2319 "services": [jid_], | 2238 ) |
2320 "names": [item_id] | |
2321 }) | |
2322 if not items: | 2239 if not items: |
2323 log.warning( | 2240 log.warning( |
2324 f"Deleting an unknown item at service {jid_}, node {node} and id " | 2241 f"Deleting an unknown item at service {jid_}, node {node} and id " |
2325 f"{item_id}" | 2242 f"{item_id}" |
2326 ) | 2243 ) |
2327 else: | 2244 else: |
2328 try: | 2245 try: |
2329 mb_data = await self._m.item_2_mb_data(self.client, items[0].data, jid_, node) | 2246 mb_data = await self._m.item_2_mb_data( |
2247 self.client, items[0].data, jid_, node | |
2248 ) | |
2330 if "repeated" in mb_data["extra"]: | 2249 if "repeated" in mb_data["extra"]: |
2331 # we are deleting a repeated item, we must translate this to an | 2250 # we are deleting a repeated item, we must translate this to an |
2332 # "Undo" of the "Announce" activity instead of a "Delete" one | 2251 # "Undo" of the "Announce" activity instead of a "Delete" one |
2333 announce = await self.repeated_mb_2_ap_item(mb_data) | 2252 announce = await self.repeated_mb_2_ap_item(mb_data) |
2334 undo = self.create_activity("Undo", author_actor_id, announce) | 2253 undo = self.create_activity("Undo", author_actor_id, announce) |
2339 f"{items[0].toXml()}" | 2258 f"{items[0].toXml()}" |
2340 ) | 2259 ) |
2341 | 2260 |
2342 url_item = self.build_apurl(TYPE_ITEM, author_account, item_id) | 2261 url_item = self.build_apurl(TYPE_ITEM, author_account, item_id) |
2343 ap_item = self.create_activity( | 2262 ap_item = self.create_activity( |
2344 "Delete", | 2263 "Delete", author_actor_id, {"id": url_item, "type": TYPE_TOMBSTONE} |
2345 author_actor_id, | |
2346 { | |
2347 "id": url_item, | |
2348 "type": TYPE_TOMBSTONE | |
2349 } | |
2350 ) | 2264 ) |
2351 if public: | 2265 if public: |
2352 ap_item["to"] = [NS_AP_PUBLIC] | 2266 ap_item["to"] = [NS_AP_PUBLIC] |
2353 return author_actor_id, ap_item | 2267 return author_actor_id, ap_item |
2354 | 2268 |
2355 def _message_received_trigger( | 2269 def _message_received_trigger( |
2356 self, | 2270 self, |
2357 client: SatXMPPEntity, | 2271 client: SatXMPPEntity, |
2358 message_elt: domish.Element, | 2272 message_elt: domish.Element, |
2359 post_treat: defer.Deferred | 2273 post_treat: defer.Deferred, |
2360 ) -> bool: | 2274 ) -> bool: |
2361 """add the gateway workflow on post treatment""" | 2275 """add the gateway workflow on post treatment""" |
2362 if self.client is None: | 2276 if self.client is None: |
2363 log.debug(f"no client set, ignoring message: {message_elt.toXml()}") | 2277 log.debug(f"no client set, ignoring message: {message_elt.toXml()}") |
2364 return True | 2278 return True |
2379 return mess_data | 2293 return mess_data |
2380 if not self.is_local(mess_data["from"]): | 2294 if not self.is_local(mess_data["from"]): |
2381 log.warning(f"ignoring non local message: {mess_data}") | 2295 log.warning(f"ignoring non local message: {mess_data}") |
2382 return mess_data | 2296 return mess_data |
2383 if not mess_data["to"].user: | 2297 if not mess_data["to"].user: |
2384 log.warning( | 2298 log.warning(f"ignoring message addressed to gateway itself: {mess_data}") |
2385 f"ignoring message addressed to gateway itself: {mess_data}" | |
2386 ) | |
2387 return mess_data | 2299 return mess_data |
2388 requestor_actor_id = self.build_apurl(TYPE_ACTOR, mess_data["from"].userhost()) | 2300 requestor_actor_id = self.build_apurl(TYPE_ACTOR, mess_data["from"].userhost()) |
2389 | 2301 |
2390 actor_account = self._e.unescape(mess_data["to"].user) | 2302 actor_account = self._e.unescape(mess_data["to"].user) |
2391 try: | 2303 try: |
2392 actor_id = await self.get_ap_actor_id_from_account(actor_account) | 2304 actor_id = await self.get_ap_actor_id_from_account(actor_account) |
2393 except Exception as e: | 2305 except Exception as e: |
2394 log.warning( | 2306 log.warning(f"Can't retrieve data on actor {actor_account}: {e}") |
2395 f"Can't retrieve data on actor {actor_account}: {e}" | |
2396 ) | |
2397 # TODO: send an error <message> | 2307 # TODO: send an error <message> |
2398 return mess_data | 2308 return mess_data |
2399 inbox = await self.get_ap_inbox_from_id( | 2309 inbox = await self.get_ap_inbox_from_id( |
2400 requestor_actor_id, actor_id, use_shared=False | 2310 requestor_actor_id, actor_id, use_shared=False |
2401 ) | 2311 ) |
2415 if origin_id: | 2325 if origin_id: |
2416 # we need to use origin ID when present to be able to retract the message | 2326 # we need to use origin ID when present to be able to retract the message |
2417 mb_data["id"] = origin_id | 2327 mb_data["id"] = origin_id |
2418 attachments = mess_data["extra"].get(C.KEY_ATTACHMENTS) | 2328 attachments = mess_data["extra"].get(C.KEY_ATTACHMENTS) |
2419 if attachments: | 2329 if attachments: |
2420 mb_data["extra"] = { | 2330 mb_data["extra"] = {C.KEY_ATTACHMENTS: attachments} |
2421 C.KEY_ATTACHMENTS: attachments | |
2422 } | |
2423 | 2331 |
2424 client = self.client.get_virtual_client(mess_data["from"]) | 2332 client = self.client.get_virtual_client(mess_data["from"]) |
2425 ap_item = await self.mb_data_2_ap_item(client, mb_data, public=False) | 2333 ap_item = await self.mb_data_2_ap_item(client, mb_data, public=False) |
2426 ap_object = ap_item["object"] | 2334 ap_object = ap_item["object"] |
2427 ap_object["to"] = ap_item["to"] = [actor_id] | 2335 ap_object["to"] = ap_item["to"] = [actor_id] |
2428 # we add a mention to direct message, otherwise peer is not notified in some AP | 2336 # we add a mention to direct message, otherwise peer is not notified in some AP |
2429 # implementations (notably Mastodon), and the message may be missed easily. | 2337 # implementations (notably Mastodon), and the message may be missed easily. |
2430 ap_object.setdefault("tag", []).append({ | 2338 ap_object.setdefault("tag", []).append( |
2431 "type": TYPE_MENTION, | 2339 { |
2432 "href": actor_id, | 2340 "type": TYPE_MENTION, |
2433 "name": f"@{actor_account}", | 2341 "href": actor_id, |
2434 }) | 2342 "name": f"@{actor_account}", |
2343 } | |
2344 ) | |
2435 | 2345 |
2436 try: | 2346 try: |
2437 await self.ap_post(inbox, ap_item["actor"], ap_item) | 2347 await self.ap_post(inbox, ap_item["actor"], ap_item) |
2438 except Exception as e: | 2348 except Exception as e: |
2439 # TODO: send an error <message> | 2349 # TODO: send an error <message> |
2440 log.warning( | 2350 log.warning(f"Can't send message to {inbox}: {e}") |
2441 f"Can't send message to {inbox}: {e}" | |
2442 ) | |
2443 return mess_data | 2351 return mess_data |
2444 | 2352 |
2445 async def _on_message_retract( | 2353 async def _on_message_retract( |
2446 self, | 2354 self, |
2447 client: SatXMPPEntity, | 2355 client: SatXMPPEntity, |
2448 message_elt: domish.Element, | 2356 message_elt: domish.Element, |
2449 retract_elt: domish.Element, | 2357 retract_elt: domish.Element, |
2450 history: History | 2358 history: History, |
2451 ) -> bool: | 2359 ) -> bool: |
2452 if client != self.client: | 2360 if client != self.client: |
2453 return True | 2361 return True |
2454 from_jid = jid.JID(message_elt["from"]) | 2362 from_jid = jid.JID(message_elt["from"]) |
2455 if not self.is_local(from_jid): | 2363 if not self.is_local(from_jid): |
2456 log.debug( | 2364 log.debug(f"ignoring retract request from non local jid {from_jid}") |
2457 f"ignoring retract request from non local jid {from_jid}" | |
2458 ) | |
2459 return False | 2365 return False |
2460 requestor_actor_id = self.build_apurl( | 2366 requestor_actor_id = self.build_apurl(TYPE_ACTOR, from_jid.userhost()) |
2461 TYPE_ACTOR, | |
2462 from_jid.userhost() | |
2463 ) | |
2464 to_jid = jid.JID(message_elt["to"]) | 2367 to_jid = jid.JID(message_elt["to"]) |
2465 if (to_jid.host != self.client.jid.full() or not to_jid.user): | 2368 if to_jid.host != self.client.jid.full() or not to_jid.user: |
2466 # to_jid should be a virtual JID from this gateway | 2369 # to_jid should be a virtual JID from this gateway |
2467 raise exceptions.InternalError( | 2370 raise exceptions.InternalError(f"Invalid destinee's JID: {to_jid.full()}") |
2468 f"Invalid destinee's JID: {to_jid.full()}" | |
2469 ) | |
2470 ap_account = self._e.unescape(to_jid.user) | 2371 ap_account = self._e.unescape(to_jid.user) |
2471 actor_id = await self.get_ap_actor_id_from_account(ap_account) | 2372 actor_id = await self.get_ap_actor_id_from_account(ap_account) |
2472 inbox = await self.get_ap_inbox_from_id(requestor_actor_id, actor_id, use_shared=False) | 2373 inbox = await self.get_ap_inbox_from_id( |
2374 requestor_actor_id, actor_id, use_shared=False | |
2375 ) | |
2473 url_actor, ap_item = await self.ap_delete_item( | 2376 url_actor, ap_item = await self.ap_delete_item( |
2474 from_jid.userhostJID(), None, retract_elt["id"], public=False | 2377 from_jid.userhostJID(), None, retract_elt["id"], public=False |
2475 ) | 2378 ) |
2476 resp = await self.ap_post(inbox, url_actor, ap_item) | 2379 resp = await self.ap_post(inbox, url_actor, ap_item) |
2477 return False | 2380 return False |
2478 | 2381 |
2479 async def _on_reference_received( | 2382 async def _on_reference_received( |
2480 self, | 2383 self, |
2481 client: SatXMPPEntity, | 2384 client: SatXMPPEntity, |
2482 message_elt: domish.Element, | 2385 message_elt: domish.Element, |
2483 reference_data: Dict[str, Union[str, int]] | 2386 reference_data: Dict[str, Union[str, int]], |
2484 ) -> bool: | 2387 ) -> bool: |
2485 parsed_uri: dict = reference_data.get("parsed_uri") | 2388 parsed_uri: dict = reference_data.get("parsed_uri") |
2486 if not parsed_uri: | 2389 if not parsed_uri: |
2487 log.warning(f"no parsed URI available in reference {reference_data}") | 2390 log.warning(f"no parsed URI available in reference {reference_data}") |
2488 return False | 2391 return False |
2551 client, cached_item.data, pubsub_service, pubsub_node | 2454 client, cached_item.data, pubsub_service, pubsub_node |
2552 ) | 2455 ) |
2553 ap_item = await self.mb_data_2_ap_item(client, mb_data) | 2456 ap_item = await self.mb_data_2_ap_item(client, mb_data) |
2554 ap_object = ap_item["object"] | 2457 ap_object = ap_item["object"] |
2555 ap_object["to"] = [actor_id] | 2458 ap_object["to"] = [actor_id] |
2556 ap_object.setdefault("tag", []).append({ | 2459 ap_object.setdefault("tag", []).append( |
2557 "type": TYPE_MENTION, | 2460 { |
2558 "href": actor_id, | 2461 "type": TYPE_MENTION, |
2559 "name": ap_account, | 2462 "href": actor_id, |
2560 }) | 2463 "name": ap_account, |
2464 } | |
2465 ) | |
2561 | 2466 |
2562 requestor_actor_id = ap_item["actor"] | 2467 requestor_actor_id = ap_item["actor"] |
2563 inbox = await self.get_ap_inbox_from_id(requestor_actor_id, actor_id, use_shared=False) | 2468 inbox = await self.get_ap_inbox_from_id( |
2469 requestor_actor_id, actor_id, use_shared=False | |
2470 ) | |
2564 | 2471 |
2565 await self.ap_post(inbox, requestor_actor_id, ap_item) | 2472 await self.ap_post(inbox, requestor_actor_id, ap_item) |
2566 | 2473 |
2567 return False | 2474 return False |
2568 | 2475 |
2581 "Ignoring AP item replying to an XMPP item with an unexpected URL " | 2488 "Ignoring AP item replying to an XMPP item with an unexpected URL " |
2582 f"type({url_type!r}):\n{pformat(ap_item)}" | 2489 f"type({url_type!r}):\n{pformat(ap_item)}" |
2583 ) | 2490 ) |
2584 return | 2491 return |
2585 try: | 2492 try: |
2586 parent_item_account, parent_item_id = url_args[0], '/'.join(url_args[1:]) | 2493 parent_item_account, parent_item_id = url_args[0], "/".join(url_args[1:]) |
2587 except (IndexError, ValueError): | 2494 except (IndexError, ValueError): |
2588 log.warning( | 2495 log.warning( |
2589 "Ignoring AP item replying to an XMPP item with invalid inReplyTo URL " | 2496 "Ignoring AP item replying to an XMPP item with invalid inReplyTo URL " |
2590 f"({in_reply_to!r}):\n{pformat(ap_item)}" | 2497 f"({in_reply_to!r}):\n{pformat(ap_item)}" |
2591 ) | 2498 ) |
2601 try: | 2508 try: |
2602 parent_item_elt = items[0] | 2509 parent_item_elt = items[0] |
2603 except IndexError: | 2510 except IndexError: |
2604 log.warning( | 2511 log.warning( |
2605 f"Can't find parent item at {parent_item_service} (node " | 2512 f"Can't find parent item at {parent_item_service} (node " |
2606 f"{parent_item_node!r})\n{pformat(ap_item)}") | 2513 f"{parent_item_node!r})\n{pformat(ap_item)}" |
2514 ) | |
2607 return | 2515 return |
2608 parent_item_parsed = await self._m.item_2_mb_data( | 2516 parent_item_parsed = await self._m.item_2_mb_data( |
2609 client, parent_item_elt, parent_item_service, parent_item_node | 2517 client, parent_item_elt, parent_item_service, parent_item_node |
2610 ) | 2518 ) |
2611 try: | 2519 try: |
2612 comment_service = jid.JID(parent_item_parsed["comments"][0]["service"]) | 2520 comment_service = jid.JID(parent_item_parsed["comments"][0]["service"]) |
2613 comment_node = parent_item_parsed["comments"][0]["node"] | 2521 comment_node = parent_item_parsed["comments"][0]["node"] |
2614 except (KeyError, IndexError): | 2522 except (KeyError, IndexError): |
2615 # we don't have a comment node set for this item | 2523 # we don't have a comment node set for this item |
2616 from libervia.backend.tools.xml_tools import pp_elt | 2524 from libervia.backend.tools.xml_tools import pp_elt |
2525 | |
2617 log.info(f"{pp_elt(parent_item_elt.toXml())}") | 2526 log.info(f"{pp_elt(parent_item_elt.toXml())}") |
2618 raise NotImplementedError() | 2527 raise NotImplementedError() |
2619 else: | 2528 else: |
2620 requestor_actor_id = self.build_apurl( | 2529 requestor_actor_id = self.build_apurl( |
2621 TYPE_ACTOR, | 2530 TYPE_ACTOR, |
2622 await self.get_ap_account_from_jid_and_node(comment_service, comment_node) | 2531 await self.get_ap_account_from_jid_and_node( |
2532 comment_service, comment_node | |
2533 ), | |
2623 ) | 2534 ) |
2624 __, item_elt = await self.ap_item_2_mb_data_and_elt( | 2535 __, item_elt = await self.ap_item_2_mb_data_and_elt( |
2625 requestor_actor_id, | 2536 requestor_actor_id, ap_item |
2626 ap_item | |
2627 ) | 2537 ) |
2628 await self._p.publish(client, comment_service, comment_node, [item_elt]) | 2538 await self._p.publish(client, comment_service, comment_node, [item_elt]) |
2629 await self.notify_mentions( | 2539 await self.notify_mentions( |
2630 targets, mentions, comment_service, comment_node, item_elt["id"] | 2540 targets, mentions, comment_service, comment_node, item_elt["id"] |
2631 ) | 2541 ) |
2632 | 2542 |
2633 def get_ap_item_targets( | 2543 def get_ap_item_targets( |
2634 self, | 2544 self, item: Dict[str, Any] |
2635 item: Dict[str, Any] | |
2636 ) -> Tuple[bool, Dict[str, Set[str]], List[Dict[str, str]]]: | 2545 ) -> Tuple[bool, Dict[str, Set[str]], List[Dict[str, str]]]: |
2637 """Retrieve targets of an AP item, and indicate if it's a public one | 2546 """Retrieve targets of an AP item, and indicate if it's a public one |
2638 | 2547 |
2639 @param item: AP object payload | 2548 @param item: AP object payload |
2640 @return: Are returned: | 2549 @return: Are returned: |
2702 @param item: AP object payload | 2611 @param item: AP object payload |
2703 """ | 2612 """ |
2704 is_public, targets, mentions = self.get_ap_item_targets(item) | 2613 is_public, targets, mentions = self.get_ap_item_targets(item) |
2705 if not is_public and targets.keys() == {TYPE_ACTOR}: | 2614 if not is_public and targets.keys() == {TYPE_ACTOR}: |
2706 # this is a direct message | 2615 # this is a direct message |
2707 await self.handle_message_ap_item( | 2616 await self.handle_message_ap_item(client, targets, mentions, destinee, item) |
2708 client, targets, mentions, destinee, item | |
2709 ) | |
2710 else: | 2617 else: |
2711 await self.handle_pubsub_ap_item( | 2618 await self.handle_pubsub_ap_item( |
2712 client, targets, mentions, destinee, node, item, is_public | 2619 client, targets, mentions, destinee, node, item, is_public |
2713 ) | 2620 ) |
2714 | 2621 |
2715 def get_requestor_actor_id_from_targets( | 2622 def get_requestor_actor_id_from_targets(self, targets: set[str]) -> str: |
2716 self, | |
2717 targets: set[str] | |
2718 ) -> str: | |
2719 """Find local actor to use as requestor_actor_id from request targets. | 2623 """Find local actor to use as requestor_actor_id from request targets. |
2720 | 2624 |
2721 A local actor must be used to sign HTTP request, notably HTTP GET request for AP | 2625 A local actor must be used to sign HTTP request, notably HTTP GET request for AP |
2722 instance checking signature, such as Mastodon when set in "secure mode". | 2626 instance checking signature, such as Mastodon when set in "secure mode". |
2723 | 2627 |
2729 @return: local actor ID to use as requestor_actor_id. | 2633 @return: local actor ID to use as requestor_actor_id. |
2730 """ | 2634 """ |
2731 try: | 2635 try: |
2732 return next(t for t in targets if self.is_local_url(t)) | 2636 return next(t for t in targets if self.is_local_url(t)) |
2733 except StopIteration: | 2637 except StopIteration: |
2734 log.warning( | 2638 log.warning(f"Can't find local target to use as requestor ID: {targets!r}") |
2735 f"Can't find local target to use as requestor ID: {targets!r}" | 2639 return self.build_apurl(TYPE_ACTOR, f"libervia@{self.public_url}") |
2736 ) | |
2737 return self.build_apurl( | |
2738 TYPE_ACTOR, f"libervia@{self.public_url}" | |
2739 ) | |
2740 | 2640 |
2741 async def handle_message_ap_item( | 2641 async def handle_message_ap_item( |
2742 self, | 2642 self, |
2743 client: SatXMPPEntity, | 2643 client: SatXMPPEntity, |
2744 targets: dict[str, Set[str]], | 2644 targets: dict[str, Set[str]], |
2745 mentions: list[Dict[str, str]], | 2645 mentions: list[Dict[str, str]], |
2746 destinee: jid.JID|None, | 2646 destinee: jid.JID | None, |
2747 item: dict, | 2647 item: dict, |
2748 ) -> None: | 2648 ) -> None: |
2749 """Parse and deliver direct AP items translating to XMPP messages | 2649 """Parse and deliver direct AP items translating to XMPP messages |
2750 | 2650 |
2751 @param targets: actors where the item must be delivered | 2651 @param targets: actors where the item must be delivered |
2753 @param item: AP object payload | 2653 @param item: AP object payload |
2754 """ | 2654 """ |
2755 targets_urls = {t for t_set in targets.values() for t in t_set} | 2655 targets_urls = {t for t_set in targets.values() for t in t_set} |
2756 requestor_actor_id = self.get_requestor_actor_id_from_targets(targets_urls) | 2656 requestor_actor_id = self.get_requestor_actor_id_from_targets(targets_urls) |
2757 targets_jids = { | 2657 targets_jids = { |
2758 await self.get_jid_from_id(requestor_actor_id, url) | 2658 await self.get_jid_from_id(requestor_actor_id, url) for url in targets_urls |
2759 for url in targets_urls | |
2760 } | 2659 } |
2761 if destinee is not None: | 2660 if destinee is not None: |
2762 targets_jids.add(destinee) | 2661 targets_jids.add(destinee) |
2763 mb_data = await self.ap_item_2_mb_data(requestor_actor_id, item) | 2662 mb_data = await self.ap_item_2_mb_data(requestor_actor_id, item) |
2764 extra = { | 2663 extra = {"origin_id": mb_data["id"]} |
2765 "origin_id": mb_data["id"] | |
2766 } | |
2767 attachments = mb_data["extra"].get(C.KEY_ATTACHMENTS) | 2664 attachments = mb_data["extra"].get(C.KEY_ATTACHMENTS) |
2768 if attachments: | 2665 if attachments: |
2769 extra[C.KEY_ATTACHMENTS] = attachments | 2666 extra[C.KEY_ATTACHMENTS] = attachments |
2770 | 2667 |
2771 defer_l = [] | 2668 defer_l = [] |
2772 for target_jid in targets_jids: | 2669 for target_jid in targets_jids: |
2773 defer_l.append( | 2670 defer_l.append( |
2774 client.sendMessage( | 2671 client.sendMessage( |
2775 target_jid, | 2672 target_jid, |
2776 {'': mb_data.get("content", "")}, | 2673 {"": mb_data.get("content", "")}, |
2777 mb_data.get("title"), | 2674 mb_data.get("title"), |
2778 extra=extra | 2675 extra=extra, |
2779 ) | 2676 ) |
2780 ) | 2677 ) |
2781 await defer.DeferredList(defer_l) | 2678 await defer.DeferredList(defer_l) |
2782 | 2679 |
2783 async def notify_mentions( | 2680 async def notify_mentions( |
2796 https://www.w3.org/TR/activitystreams-vocabulary/#microsyntaxes). | 2693 https://www.w3.org/TR/activitystreams-vocabulary/#microsyntaxes). |
2797 | 2694 |
2798 """ | 2695 """ |
2799 targets_urls = {t for t_set in targets.values() for t in t_set} | 2696 targets_urls = {t for t_set in targets.values() for t in t_set} |
2800 requestor_actor_id = self.get_requestor_actor_id_from_targets(targets_urls) | 2697 requestor_actor_id = self.get_requestor_actor_id_from_targets(targets_urls) |
2801 anchor = uri.build_xmpp_uri("pubsub", path=service.full(), node=node, item=item_id) | 2698 anchor = uri.build_xmpp_uri( |
2699 "pubsub", path=service.full(), node=node, item=item_id | |
2700 ) | |
2802 seen = set() | 2701 seen = set() |
2803 # we start with explicit mentions because mentions' content will be used in the | 2702 # we start with explicit mentions because mentions' content will be used in the |
2804 # future to fill "begin" and "end" reference attributes (we can't do it at the | 2703 # future to fill "begin" and "end" reference attributes (we can't do it at the |
2805 # moment as there is no way to specify the XML element to use in the blog item). | 2704 # moment as there is no way to specify the XML element to use in the blog item). |
2806 for mention in mentions: | 2705 for mention in mentions: |
2807 mentioned_jid = await self.get_jid_from_id(requestor_actor_id, mention["uri"]) | 2706 mentioned_jid = await self.get_jid_from_id(requestor_actor_id, mention["uri"]) |
2808 self._refs.send_reference( | 2707 self._refs.send_reference(self.client, to_jid=mentioned_jid, anchor=anchor) |
2809 self.client, | |
2810 to_jid=mentioned_jid, | |
2811 anchor=anchor | |
2812 ) | |
2813 seen.add(mentioned_jid) | 2708 seen.add(mentioned_jid) |
2814 | 2709 |
2815 remaining = { | 2710 remaining = { |
2816 await self.get_jid_from_id(requestor_actor_id, t) | 2711 await self.get_jid_from_id(requestor_actor_id, t) |
2817 for t_set in targets.values() | 2712 for t_set in targets.values() |
2818 for t in t_set | 2713 for t in t_set |
2819 } - seen | 2714 } - seen |
2820 for target in remaining: | 2715 for target in remaining: |
2821 self._refs.send_reference( | 2716 self._refs.send_reference(self.client, to_jid=target, anchor=anchor) |
2822 self.client, | |
2823 to_jid=target, | |
2824 anchor=anchor | |
2825 ) | |
2826 | 2717 |
2827 async def handle_pubsub_ap_item( | 2718 async def handle_pubsub_ap_item( |
2828 self, | 2719 self, |
2829 client: SatXMPPEntity, | 2720 client: SatXMPPEntity, |
2830 targets: dict[str, set[str]], | 2721 targets: dict[str, set[str]], |
2831 mentions: list[dict[str, str]], | 2722 mentions: list[dict[str, str]], |
2832 destinee: jid.JID|None, | 2723 destinee: jid.JID | None, |
2833 node: str, | 2724 node: str, |
2834 item: dict, | 2725 item: dict, |
2835 public: bool | 2726 public: bool, |
2836 ) -> None: | 2727 ) -> None: |
2837 """Analyse, cache and deliver AP items translating to Pubsub | 2728 """Analyse, cache and deliver AP items translating to Pubsub |
2838 | 2729 |
2839 @param targets: actors/collections where the item must be delivered | 2730 @param targets: actors/collections where the item must be delivered |
2840 @param destinee: jid of the destinee, | 2731 @param destinee: jid of the destinee, |
2857 return | 2748 return |
2858 | 2749 |
2859 # this item is a reply to an AP item, we use or create a corresponding node | 2750 # this item is a reply to an AP item, we use or create a corresponding node |
2860 # for comments | 2751 # for comments |
2861 parent_node, __ = await self.get_comments_nodes( | 2752 parent_node, __ = await self.get_comments_nodes( |
2862 requestor_actor_id, | 2753 requestor_actor_id, item["id"], in_reply_to |
2863 item["id"], | |
2864 in_reply_to | |
2865 ) | 2754 ) |
2866 node = parent_node or node | 2755 node = parent_node or node |
2867 cached_node = await self.host.memory.storage.get_pubsub_node( | 2756 cached_node = await self.host.memory.storage.get_pubsub_node( |
2868 client, service, node, with_subscriptions=True, create=True, | 2757 client, |
2869 create_kwargs={"subscribed": True} | 2758 service, |
2759 node, | |
2760 with_subscriptions=True, | |
2761 create=True, | |
2762 create_kwargs={"subscribed": True}, | |
2870 ) | 2763 ) |
2871 else: | 2764 else: |
2872 # it is a root item (i.e. not a reply to an other item) | 2765 # it is a root item (i.e. not a reply to an other item) |
2873 create = node == self._events.namespace | 2766 create = node == self._events.namespace |
2874 cached_node = await self.host.memory.storage.get_pubsub_node( | 2767 cached_node = await self.host.memory.storage.get_pubsub_node( |
2876 ) | 2769 ) |
2877 if cached_node is None: | 2770 if cached_node is None: |
2878 log.warning( | 2771 log.warning( |
2879 f"Received item in unknown node {node!r} at {service}. This may be " | 2772 f"Received item in unknown node {node!r} at {service}. This may be " |
2880 f"due to a cache purge. We synchronise the node\n{item}" | 2773 f"due to a cache purge. We synchronise the node\n{item}" |
2881 | |
2882 ) | 2774 ) |
2883 return | 2775 return |
2884 if item.get("type") == TYPE_EVENT: | 2776 if item.get("type") == TYPE_EVENT: |
2885 data, item_elt = await self.ap_events.ap_item_2_event_data_and_elt( | 2777 data, item_elt = await self.ap_events.ap_item_2_event_data_and_elt( |
2886 requestor_actor_id, | 2778 requestor_actor_id, item |
2887 item | |
2888 ) | 2779 ) |
2889 else: | 2780 else: |
2890 data, item_elt = await self.ap_item_2_mb_data_and_elt( | 2781 data, item_elt = await self.ap_item_2_mb_data_and_elt( |
2891 requestor_actor_id, | 2782 requestor_actor_id, item |
2892 item | |
2893 ) | 2783 ) |
2894 await self.host.memory.storage.cache_pubsub_items( | 2784 await self.host.memory.storage.cache_pubsub_items( |
2895 client, | 2785 client, cached_node, [item_elt], [data] |
2896 cached_node, | |
2897 [item_elt], | |
2898 [data] | |
2899 ) | 2786 ) |
2900 | 2787 |
2901 for subscription in cached_node.subscriptions: | 2788 for subscription in cached_node.subscriptions: |
2902 if subscription.state != SubscriptionState.SUBSCRIBED: | 2789 if subscription.state != SubscriptionState.SUBSCRIBED: |
2903 continue | 2790 continue |
2904 self.pubsub_service.notifyPublish( | 2791 self.pubsub_service.notifyPublish( |
2905 service, | 2792 service, node, [(subscription.subscriber, None, [item_elt])] |
2906 node, | |
2907 [(subscription.subscriber, None, [item_elt])] | |
2908 ) | 2793 ) |
2909 | 2794 |
2910 await self.notify_mentions(targets, mentions, service, node, item_elt["id"]) | 2795 await self.notify_mentions(targets, mentions, service, node, item_elt["id"]) |
2911 | 2796 |
2912 async def new_ap_delete_item( | 2797 async def new_ap_delete_item( |
2938 history = await self.host.memory.storage.get( | 2823 history = await self.host.memory.storage.get( |
2939 client, | 2824 client, |
2940 History, | 2825 History, |
2941 History.origin_id, | 2826 History.origin_id, |
2942 item_id, | 2827 item_id, |
2943 (History.messages, History.subjects) | 2828 (History.messages, History.subjects), |
2944 ) | 2829 ) |
2945 | 2830 |
2946 if history is not None: | 2831 if history is not None: |
2947 # it's a direct message | 2832 # it's a direct message |
2948 if history.source_jid != client.jid: | 2833 if history.source_jid != client.jid: |
2971 item_elt["id"] = item_id | 2856 item_elt["id"] = item_id |
2972 for subscription in cached_node.subscriptions: | 2857 for subscription in cached_node.subscriptions: |
2973 if subscription.state != SubscriptionState.SUBSCRIBED: | 2858 if subscription.state != SubscriptionState.SUBSCRIBED: |
2974 continue | 2859 continue |
2975 self.pubsub_service.notifyRetract( | 2860 self.pubsub_service.notifyRetract( |
2976 client.jid, | 2861 client.jid, node, [(subscription.subscriber, None, [item_elt])] |
2977 node, | 2862 ) |
2978 [(subscription.subscriber, None, [item_elt])] | |
2979 ) |