Mercurial > libervia-backend
diff libervia/backend/plugins/plugin_comp_ap_gateway/__init__.py @ 4270:0d7bb4df2343
Reformatted code base using black.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 19 Jun 2024 18:44:57 +0200 |
parents | d366d90a71aa |
children | 0f953ce5f0a8 |
line wrap: on
line diff
--- a/libervia/backend/plugins/plugin_comp_ap_gateway/__init__.py Tue Jun 18 12:06:45 2024 +0200 +++ b/libervia/backend/plugins/plugin_comp_ap_gateway/__init__.py Wed Jun 19 18:44:57 2024 +0200 @@ -87,7 +87,7 @@ TYPE_REACTION, TYPE_TOMBSTONE, TYPE_JOIN, - TYPE_LEAVE + TYPE_LEAVE, ) from .http_server import HTTPServer from .pubsub_service import APPubsubService @@ -105,9 +105,23 @@ C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, C.PI_PROTOCOLS: [], C.PI_DEPENDENCIES: [ - "XEP-0050", "XEP-0054", "XEP-0060", "XEP-0084", "XEP-0106", "XEP-0277", - "XEP-0292", "XEP-0329", "XEP-0372", "XEP-0424", "XEP-0465", "XEP-0470", - "XEP-0447", "XEP-0471", "PUBSUB_CACHE", "TEXT_SYNTAXES", "IDENTITY" + "XEP-0050", + "XEP-0054", + "XEP-0060", + "XEP-0084", + "XEP-0106", + "XEP-0277", + "XEP-0292", + "XEP-0329", + "XEP-0372", + "XEP-0424", + "XEP-0465", + "XEP-0470", + "XEP-0447", + "XEP-0471", + "PUBSUB_CACHE", + "TEXT_SYNTAXES", + "IDENTITY", ], C.PI_RECOMMENDATIONS: [], C.PI_MAIN: "APGateway", @@ -160,26 +174,19 @@ # we want to be sure that the callbacks are launched before pubsub cache's # one, as we need to inspect items before they are actually removed from cache # or updated - priority=1000 + priority=1000, ) self.pubsub_service = APPubsubService(self) self.ad_hoc = APAdHocService(self) self.ap_events = APEvents(self) host.trigger.add_with_check( - "message_received", - self, - self._message_received_trigger, - priority=-1000 + "message_received", self, self._message_received_trigger, priority=-1000 ) host.trigger.add_with_check( - "XEP-0424_retract_received", - self, - self._on_message_retract + "XEP-0424_retract_received", self, self._on_message_retract ) host.trigger.add_with_check( - "XEP-0372_ref_received", - self, - self._on_reference_received + "XEP-0372_ref_received", self, self._on_reference_received ) host.bridge.add_method( @@ -215,7 +222,7 @@ private_key_pem = self.private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, - encryption_algorithm=serialization.NoEncryption() + encryption_algorithm=serialization.NoEncryption(), ).decode() await self.host.memory.storage.set_private_value( IMPORT_NAME, "rsa_key", private_key_pem, profile=client.profile @@ -228,16 +235,14 @@ self.public_key = self.private_key.public_key() self.public_key_pem = self.public_key.public_bytes( encoding=serialization.Encoding.PEM, - format=serialization.PublicFormat.SubjectPublicKeyInfo + format=serialization.PublicFormat.SubjectPublicKeyInfo, ).decode() # params # URL and port self.public_url = self.host.memory.config_get( CONF_SECTION, "public_url" - ) or self.host.memory.config_get( - CONF_SECTION, "xmpp_domain" - ) + ) or self.host.memory.config_get(CONF_SECTION, "xmpp_domain") if self.public_url is None: log.error( '"public_url" not set in configuration, this is mandatory to have' @@ -247,15 +252,15 @@ return if parse.urlparse(self.public_url).scheme: log.error( - "Scheme must not be specified in \"public_url\", please remove it from " - "\"public_url\" configuration option. ActivityPub Gateway won't be run." + 'Scheme must not be specified in "public_url", please remove it from ' + '"public_url" configuration option. ActivityPub Gateway won\'t be run.' ) return - self.http_port = int(self.host.memory.config_get( - CONF_SECTION, 'http_port', 8123)) + self.http_port = int(self.host.memory.config_get(CONF_SECTION, "http_port", 8123)) connection_type = self.host.memory.config_get( - CONF_SECTION, 'http_connection_type', 'https') - if connection_type not in ('http', 'https'): + CONF_SECTION, "http_connection_type", "https" + ) + if connection_type not in ("http", "https"): raise exceptions.ConfigError( 'bad ap-gateay http_connection_type, you must use one of "http" or ' '"https"' @@ -263,18 +268,17 @@ self.http_sign_get = C.bool( self.host.memory.config_get(CONF_SECTION, "http_sign_get", C.BOOL_TRUE) ) - self.max_items = int(self.host.memory.config_get( - CONF_SECTION, 'new_node_max_items', 50 - - )) - self.comments_max_depth = int(self.host.memory.config_get( - CONF_SECTION, 'comments_max_depth', 0 - )) - self.ap_path = self.host.memory.config_get(CONF_SECTION, 'ap_path', '_ap') + self.max_items = int( + self.host.memory.config_get(CONF_SECTION, "new_node_max_items", 50) + ) + self.comments_max_depth = int( + self.host.memory.config_get(CONF_SECTION, "comments_max_depth", 0) + ) + self.ap_path = self.host.memory.config_get(CONF_SECTION, "ap_path", "_ap") self.base_ap_url = parse.urljoin(f"https://{self.public_url}", f"{self.ap_path}/") # True (default) if we provide gateway only to entities/services from our server self.local_only = C.bool( - self.host.memory.config_get(CONF_SECTION, 'local_only', C.BOOL_TRUE) + self.host.memory.config_get(CONF_SECTION, "local_only", C.BOOL_TRUE) ) # if True (default), mention will be parsed in non-private content coming from # XMPP. This is necessary as XEP-0372 are coming separately from item where the @@ -285,7 +289,7 @@ ) html_redirect: Dict[str, Union[str, dict]] = self.host.memory.config_get( - CONF_SECTION, 'html_redirect_dict', {} + CONF_SECTION, "html_redirect_dict", {} ) self.html_redirect: Dict[str, List[dict]] = {} for url_type, target in html_redirect.items(): @@ -307,11 +311,10 @@ # HTTP server launch self.server = HTTPServer(self) - if connection_type == 'http': + if connection_type == "http": reactor.listenTCP(self.http_port, self.server) else: - options = tls.get_options_from_config( - self.host.memory.config, CONF_SECTION) + options = tls.get_options_from_config(self.host.memory.config, CONF_SECTION) tls.tls_options_check(options) context_factory = tls.get_tls_context_factory(options) reactor.listenSSL(self.http_port, self.server, context_factory) @@ -320,8 +323,7 @@ self.client = client client.sendHistory = True client._ap_storage = persistent.LazyPersistentBinaryDict( - IMPORT_NAME, - client.profile + IMPORT_NAME, client.profile ) await self.init(client) @@ -329,9 +331,7 @@ self.ad_hoc.init(client) async def _items_received( - self, - client: SatXMPPEntity, - itemsEvent: pubsub.ItemsEvent + self, client: SatXMPPEntity, itemsEvent: pubsub.ItemsEvent ) -> None: """Callback called when pubsub items are received @@ -355,19 +355,23 @@ if self._pa.is_attachment_node(itemsEvent.nodeIdentifier): await self.convert_and_post_attachments( - client, ap_account, itemsEvent.sender, itemsEvent.nodeIdentifier, - itemsEvent.items + client, + ap_account, + itemsEvent.sender, + itemsEvent.nodeIdentifier, + itemsEvent.items, ) else: await self.convert_and_post_items( - client, ap_account, itemsEvent.sender, itemsEvent.nodeIdentifier, - itemsEvent.items + client, + ap_account, + itemsEvent.sender, + itemsEvent.nodeIdentifier, + itemsEvent.items, ) async def get_virtual_client( - self, - requestor_actor_id: str, - actor_id: str + self, requestor_actor_id: str, actor_id: str ) -> SatXMPPEntity: """Get client for this component with a specified jid @@ -413,8 +417,7 @@ return await treq.json_content(resp) except Exception as e: raise error.StanzaError( - "service-unavailable", - text=f"Can't get AP data at {url}: {e}" + "service-unavailable", text=f"Can't get AP data at {url}: {e}" ) async def ap_post(self, url: str, requestor_actor_id: str, doc: dict) -> TReqResponse: @@ -429,11 +432,13 @@ actor_account = actor_args[0] to_log = [ "", - f">>> {actor_account} is signing and posting to {url}:\n{pformat(doc)}" + f">>> {actor_account} is signing and posting to {url}:\n{pformat(doc)}", ] body = json.dumps(doc).encode() - headers = self._generate_signed_headers(url, requestor_actor_id, method="post", body=body) + headers = self._generate_signed_headers( + url, requestor_actor_id, method="post", body=body + ) headers["Content-Type"] = MEDIA_TYPE_AP if self.verbose: @@ -443,11 +448,7 @@ to_log.append("---") log.info("\n".join(to_log)) - resp = await treq.post( - url, - body, - headers=headers - ) + resp = await treq.post(url, body, headers=headers) if resp.code >= 300: text = await resp.text() log.warning(f"POST request to {url} failed [{resp.code}]: {text}") @@ -456,11 +457,7 @@ return resp def _generate_signed_headers( - self, - url: str, - actor_id: str, - method: str, - body: bytes|None = None + self, url: str, actor_id: str, method: str, body: bytes | None = None ) -> dict[str, str]: """Generate HTTP headers with signature for a given request @@ -474,7 +471,7 @@ headers = { "(request-target)": f"{method} {p_url.path}", "Host": p_url.hostname, - "Date": http.datetimeToString().decode() + "Date": http.datetimeToString().decode(), } if body: @@ -487,16 +484,14 @@ @overload async def ap_get_object( self, requestor_actor_id: str, data: dict, key: str - ) -> dict|None: - ... + ) -> dict | None: ... @overload async def ap_get_object( self, requestor_actor_id: str, data: Union[str, dict], key: None = None - ) -> dict: - ... + ) -> dict: ... - async def ap_get_object(self, requestor_actor_id: str, data, key = None) -> dict|None: + async def ap_get_object(self, requestor_actor_id: str, data, key=None) -> dict | None: """Retrieve an AP object, dereferencing when necessary This method is to be used with attributes marked as "Functional" in @@ -526,10 +521,7 @@ "was expecting a string or a dict, got {type(value)}: {value!r}}" ) - async def ap_get_local_object( - self, - url: str - ) -> dict: + async def ap_get_local_object(self, url: str) -> dict: """Retrieve or generate local object for now, only handle XMPP items to convert to AP @@ -596,11 +588,7 @@ ) async def ap_get_list( - self, - requestor_actor_id: str, - data: dict, - key: str, - only_ids: bool = False + self, requestor_actor_id: str, data: dict, key: str, only_ids: bool = False ) -> Optional[List[Dict[str, Any]]]: """Retrieve a list of objects from AP data, dereferencing when necessary @@ -626,19 +614,12 @@ if not isinstance(value, list): raise ValueError(f"A list was expected, got {type(value)}: {value!r}") if only_ids: - return [ - {"id": v["id"]} if isinstance(v, dict) else {"id": v} - for v in value - ] + return [{"id": v["id"]} if isinstance(v, dict) else {"id": v} for v in value] else: return [await self.ap_get_object(requestor_actor_id, i) for i in value] async def ap_get_actors( - self, - requestor_actor_id: str, - data: dict, - key: str, - as_account: bool = True + self, requestor_actor_id: str, data: dict, key: str, as_account: bool = True ) -> List[str]: """Retrieve AP actors from data @@ -671,9 +652,7 @@ f"invalid actors list to object {data.get('id')!r}: {value!r}" ) if not value: - raise exceptions.DataError( - f"list of actors is empty" - ) + raise exceptions.DataError(f"list of actors is empty") if as_account: return [ await self.get_ap_account_from_id(requestor_actor_id, actor_id) @@ -697,12 +676,16 @@ @raise exceptions.NotFound: no actor has been found in data """ try: - actors = await self.ap_get_actors(requestor_actor_id, data, "actor", as_account=False) + actors = await self.ap_get_actors( + requestor_actor_id, data, "actor", as_account=False + ) except exceptions.DataError: actors = None if not actors: try: - actors = await self.ap_get_actors(requestor_actor_id, data, "attributedTo", as_account=False) + actors = await self.ap_get_actors( + requestor_actor_id, data, "attributedTo", as_account=False + ) except exceptions.DataError: raise exceptions.NotFound( 'actor not specified in "actor" or "attributedTo"' @@ -715,9 +698,7 @@ def must_encode(self, text: str) -> bool: """Indicate if a text must be period encoded""" return ( - not RE_ALLOWED_UNQUOTED.match(text) - or text.startswith("___") - or "---" in text + not RE_ALLOWED_UNQUOTED.match(text) or text.startswith("___") or "---" in text ) def period_encode(self, text: str) -> str: @@ -735,9 +716,7 @@ ) async def get_ap_account_from_jid_and_node( - self, - jid_: jid.JID, - node: Optional[str] + self, jid_: jid.JID, node: Optional[str] ) -> str: """Construct AP account from JID and node @@ -778,26 +757,26 @@ if node: account_elts.extend((node, "---")) - account_elts.extend(( - user, "@", jid_.host if is_local else self.client.jid.userhost() - )) + account_elts.extend( + (user, "@", jid_.host if is_local else self.client.jid.userhost()) + ) return "".join(account_elts) def is_local(self, jid_: jid.JID) -> bool: """Returns True if jid_ use a domain or subdomain of gateway's host""" local_host = self.client.host.split(".") assert local_host - return jid_.host.split(".")[-len(local_host):] == local_host + return jid_.host.split(".")[-len(local_host) :] == local_host async def is_pubsub(self, jid_: jid.JID) -> bool: """Indicate if a JID is a Pubsub service""" host_disco = await self.host.get_disco_infos(self.client, jid_) - return ( - ("pubsub", "service") in host_disco.identities - and not ("pubsub", "pep") in host_disco.identities - ) + return ("pubsub", "service") in host_disco.identities and not ( + "pubsub", + "pep", + ) in host_disco.identities - async def get_jid_and_node(self, ap_account: str) -> tuple[jid.JID, str|None]: + async def get_jid_and_node(self, ap_account: str) -> tuple[jid.JID, str | None]: """Decode raw AP account handle to get XMPP JID and Pubsub Node Username are case insensitive. @@ -856,13 +835,11 @@ if encoded: username = parse.unquote( - RE_PERIOD_ENC.sub(r"%\g<hex>", username), - errors="strict" + RE_PERIOD_ENC.sub(r"%\g<hex>", username), errors="strict" ) if node: node = parse.unquote( - RE_PERIOD_ENC.sub(r"%\g<hex>", node), - errors="strict" + RE_PERIOD_ENC.sub(r"%\g<hex>", node), errors="strict" ) if "@" in username: @@ -899,14 +876,7 @@ The local jid is computer by escaping AP actor handle and using it as local part of JID, where domain part is this gateway own JID """ - return jid.JID( - None, - ( - self._e.escape(account), - self.client.jid.host, - None - ) - ) + return jid.JID(None, (self._e.escape(account), self.client.jid.host, None)) async def get_jid_from_id(self, requestor_actor_id: str, actor_id: str) -> jid.JID: """Compute JID linking to an AP Actor ID @@ -937,10 +907,10 @@ @return: endpoint type and extra arguments """ path = parse.urlparse(url).path.lstrip("/") - type_, *extra_args = path[len(self.ap_path):].lstrip("/").split("/") + type_, *extra_args = path[len(self.ap_path) :].lstrip("/").split("/") return type_, [parse.unquote(a) for a in extra_args] - def build_apurl(self, type_:str , *args: str) -> str: + def build_apurl(self, type_: str, *args: str) -> str: """Build an AP endpoint URL @param type_: type of AP endpoing @@ -948,7 +918,7 @@ """ return parse.urljoin( self.base_ap_url, - str(Path(type_).joinpath(*(parse.quote_plus(a, safe="@") for a in args))) + str(Path(type_).joinpath(*(parse.quote_plus(a, safe="@") for a in args))), ) def is_local_url(self, url: str) -> bool: @@ -994,9 +964,7 @@ @async_lru(maxsize=LRU_MAX_SIZE) async def get_actor_pub_key_data( - self, - requestor_actor_id: str, - actor_id: str + self, requestor_actor_id: str, actor_id: str ) -> Tuple[str, str, rsa.RSAPublicKey]: """Retrieve Public Key data from actor ID @@ -1067,7 +1035,7 @@ requestor_actor_id: str, signature: str, key_id: str, - headers: Dict[str, str] + headers: Dict[str, str], ) -> str: """Verify that signature matches given headers @@ -1081,7 +1049,7 @@ @raise InvalidSignature: signature doesn't match headers """ - to_sign = "\n".join(f"{k.lower()}: {v}" for k,v in headers.items()) + to_sign = "\n".join(f"{k.lower()}: {v}" for k, v in headers.items()) if key_id.startswith("acct:"): actor = key_id[5:] actor_id = await self.get_ap_actor_id_from_account(actor) @@ -1089,8 +1057,7 @@ actor_id = key_id.split("#", 1)[0] pub_key_id, pub_key_owner, pub_key = await self.get_actor_pub_key_data( - requestor_actor_id, - actor_id + requestor_actor_id, actor_id ) if pub_key_id != key_id or pub_key_owner != actor_id: raise exceptions.EncryptionError("Public Key mismatch") @@ -1101,7 +1068,7 @@ to_sign.encode(), # we have to use PKCS1v15 padding to be compatible with Mastodon padding.PKCS1v15(), # type: ignore - hashes.SHA256() # type: ignore + hashes.SHA256(), # type: ignore ) except InvalidSignature: raise exceptions.EncryptionError( @@ -1111,9 +1078,7 @@ return actor_id def get_signature_data( - self, - key_id: str, - headers: Dict[str, str] + self, key_id: str, headers: Dict[str, str] ) -> Tuple[Dict[str, str], Dict[str, str]]: """Generate and return signature and corresponding headers @@ -1130,20 +1095,22 @@ """ # headers must be lower case l_headers: Dict[str, str] = {k.lower(): v for k, v in headers.items()} - to_sign = "\n".join(f"{k}: {v}" for k,v in l_headers.items()) - signature = base64.b64encode(self.private_key.sign( - to_sign.encode(), - # we have to use PKCS1v15 padding to be compatible with Mastodon - padding.PKCS1v15(), # type: ignore - hashes.SHA256() # type: ignore - )).decode() + to_sign = "\n".join(f"{k}: {v}" for k, v in l_headers.items()) + signature = base64.b64encode( + self.private_key.sign( + to_sign.encode(), + # we have to use PKCS1v15 padding to be compatible with Mastodon + padding.PKCS1v15(), # type: ignore + hashes.SHA256(), # type: ignore + ) + ).decode() sign_data = { "keyId": key_id, "Algorithm": "rsa-sha256", "headers": " ".join(l_headers.keys()), - "signature": signature + "signature": signature, } - new_headers = {k: v for k,v in headers.items() if not k.startswith("(")} + new_headers = {k: v for k, v in headers.items() if not k.startswith("(")} new_headers["Signature"] = self.build_signature_header(sign_data) return new_headers, sign_data @@ -1167,18 +1134,19 @@ """ actor_id = await self.get_ap_actor_id_from_account(ap_account) requestor_actor_id = self.build_apurl( - TYPE_ACTOR, - await self.get_ap_account_from_jid_and_node(service, node) + TYPE_ACTOR, await self.get_ap_account_from_jid_and_node(service, node) ) inbox = await self.get_ap_inbox_from_id(requestor_actor_id, actor_id) for item in items: if item.name == "item": - cached_item = await self.host.memory.storage.search_pubsub_items({ - "profiles": [self.client.profile], - "services": [service], - "nodes": [node], - "names": [item["id"]] - }) + cached_item = await self.host.memory.storage.search_pubsub_items( + { + "profiles": [self.client.profile], + "services": [service], + "nodes": [node], + "names": [item["id"]], + } + ) is_new = not bool(cached_item) if node.startswith(self._events.namespace): # event item @@ -1259,7 +1227,7 @@ service: jid.JID, node: str, items: List[domish.Element], - publisher: Optional[jid.JID] = None + publisher: Optional[jid.JID] = None, ) -> None: """Convert XMPP item attachments to AP activities and post them to actor inbox @@ -1284,8 +1252,7 @@ actor_id = await self.get_ap_actor_id_from_account(ap_account) requestor_actor_id = self.build_apurl( - TYPE_ACTOR, - await self.get_ap_account_from_jid_and_node(service, node) + TYPE_ACTOR, await self.get_ap_account_from_jid_and_node(service, node) ) inbox = await self.get_ap_inbox_from_id(requestor_actor_id, actor_id) @@ -1327,12 +1294,14 @@ else: item_url = self.build_apurl(TYPE_ITEM, item_account, item_id) - old_attachment_pubsub_items = await self.host.memory.storage.search_pubsub_items({ - "profiles": [self.client.profile], - "services": [service], - "nodes": [node], - "names": [item_elt["id"]] - }) + old_attachment_pubsub_items = await self.host.memory.storage.search_pubsub_items( + { + "profiles": [self.client.profile], + "services": [service], + "nodes": [node], + "names": [item_elt["id"]], + } + ) if not old_attachment_pubsub_items: old_attachment = {} else: @@ -1345,10 +1314,7 @@ except IndexError: # no known element was present in attachments old_attachment = {} - publisher_account = await self.get_ap_account_from_jid_and_node( - publisher, - None - ) + publisher_account = await self.get_ap_account_from_jid_and_node(publisher, None) publisher_actor_id = self.build_apurl(TYPE_ACTOR, publisher_account) try: attachments = self._pa.items_2_attachment_data(client, [item_elt])[0] @@ -1390,8 +1356,7 @@ "reaction", item_account, item_id, reaction.encode().hex() ) reaction_activity = self.create_activity( - TYPE_REACTION, publisher_actor_id, item_url, - activity_id=activity_id + TYPE_REACTION, publisher_actor_id, item_url, activity_id=activity_id ) reaction_activity["content"] = reaction reaction_activity["to"] = [ap_account] @@ -1410,7 +1375,9 @@ old_attending = old_attachment.get("rsvp", {}).get("attending", "no") if attending != old_attending: activity_type = TYPE_JOIN if attending == "yes" else TYPE_LEAVE - activity_id = self.build_apurl(activity_type.lower(), item_account, item_id) + activity_id = self.build_apurl( + activity_type.lower(), item_account, item_id + ) activity = self.create_activity( activity_type, publisher_actor_id, item_url, activity_id=activity_id ) @@ -1421,7 +1388,9 @@ if "rsvp" in old_attachment: old_attending = old_attachment.get("rsvp", {}).get("attending", "no") if old_attending == "yes": - activity_id = self.build_apurl(TYPE_LEAVE.lower(), item_account, item_id) + activity_id = self.build_apurl( + TYPE_LEAVE.lower(), item_account, item_id + ) activity = self.create_activity( TYPE_LEAVE, publisher_actor_id, item_url, activity_id=activity_id ) @@ -1436,14 +1405,11 @@ client, service, node, with_subscriptions=True, create=True ) await self.host.memory.storage.cache_pubsub_items( - self.client, - cached_node, - [item_elt], - [attachments] + self.client, cached_node, [item_elt], [attachments] ) def _publish_message(self, mess_data_s: str, service_s: str, profile: str): - mess_data: dict = data_format.deserialise(mess_data_s) # type: ignore + mess_data: dict = data_format.deserialise(mess_data_s) # type: ignore service = jid.JID(service_s) client = self.host.get_client(profile) return defer.ensureDeferred(self.publish_message(client, mess_data, service)) @@ -1463,10 +1429,12 @@ raise ValueError(f"Invalid account: {account!r}") host = account.split("@")[1] try: - finger_data = await treq.json_content(await treq.get( - f"https://{host}/.well-known/webfinger?" - f"resource=acct:{parse.quote_plus(account)}", - )) + finger_data = await treq.json_content( + await treq.get( + f"https://{host}/.well-known/webfinger?" + f"resource=acct:{parse.quote_plus(account)}", + ) + ) except Exception as e: raise exceptions.DataError(f"Can't get webfinger data for {account!r}: {e}") for link in finger_data.get("links", []): @@ -1481,15 +1449,11 @@ ) break else: - raise ValueError( - f"No ActivityPub link found for {account!r}" - ) + raise ValueError(f"No ActivityPub link found for {account!r}") return href async def get_ap_actor_data_from_account( - self, - requestor_actor_id: str, - account: str + self, requestor_actor_id: str, account: str ) -> dict: """Retrieve ActivityPub Actor data @@ -1499,10 +1463,7 @@ return await self.ap_get(href, requestor_actor_id) async def get_ap_inbox_from_id( - self, - requestor_actor_id: str, - actor_id: str, - use_shared: bool = True + self, requestor_actor_id: str, actor_id: str, use_shared: bool = True ) -> str: """Retrieve inbox of an actor_id @@ -1531,16 +1492,12 @@ if self.is_local_url(actor_id): url_type, url_args = self.parse_apurl(actor_id) if url_type != "actor" or not url_args: - raise exceptions.DataError( - f"invalid local actor ID: {actor_id}" - ) + raise exceptions.DataError(f"invalid local actor ID: {actor_id}") account = url_args[0] try: - account_user, account_host = account.split('@') + account_user, account_host = account.split("@") except ValueError: - raise exceptions.DataError( - f"invalid account from url: {actor_id}" - ) + raise exceptions.DataError(f"invalid account from url: {actor_id}") if not account_user or account_host != self.public_url: raise exceptions.DataError( f"{account!r} is not a valid local account (from {actor_id})" @@ -1636,7 +1593,7 @@ raise error.StanzaError( "feature-not-implemented", text="Maximum limit for previous_index has been reached, this limit" - "is set to avoid DoS" + "is set to avoid DoS", ) else: # we'll convert "start_index" to "after_id", thus we need the item just @@ -1665,7 +1622,7 @@ raise error.StanzaError( "service-unavailable", "Error while retrieving previous page from AP service at " - f"{current_page}" + f"{current_page}", ) init_page = "last" if chronological_pagination else "first" @@ -1698,10 +1655,11 @@ found_after_id = True if chronological_pagination: start_index = retrieved_items - len(page_items) + limit_idx + 1 - page_items = page_items[limit_idx+1:] + page_items = page_items[limit_idx + 1 :] else: - start_index = count - (retrieved_items - len(page_items) + - limit_idx + 1) + start_index = count - ( + retrieved_items - len(page_items) + limit_idx + 1 + ) page_items = page_items[:limit_idx] items.extend(page_items) else: @@ -1730,14 +1688,13 @@ rsm_resp["index"] = 0 else: rsm_resp["index"] = count - len(items) - rsm_resp.update({ - "first": items[0]["id"], - "last": items[-1]["id"] - }) + rsm_resp.update({"first": items[0]["id"], "last": items[-1]["id"]}) return items, rsm.RSMResponse(**rsm_resp) - async def ap_item_2_mb_data_and_elt(self, requestor_actor_id: str, ap_item: dict) -> tuple[dict, domish.Element]: + async def ap_item_2_mb_data_and_elt( + self, requestor_actor_id: str, ap_item: dict + ) -> tuple[dict, domish.Element]: """Convert AP item to parsed microblog data and corresponding item element @param requestor_actor_id: ID of the actor requesting the conversion. @@ -1754,7 +1711,9 @@ item_elt["publisher"] = mb_data["author_jid"] return mb_data, item_elt - async def ap_item_2_mb_elt(self, requestor_actor_id: str, ap_item: dict) -> domish.Element: + async def ap_item_2_mb_elt( + self, requestor_actor_id: str, ap_item: dict + ) -> domish.Element: """Convert AP item to XMPP item element @param requestor_actor_id: ID of the actor requesting the conversion. @@ -1769,7 +1728,7 @@ requestor_actor_id: str, page: Union[str, dict], parser: Callable[[str, dict], Awaitable[domish.Element]], - only_ids: bool = False + only_ids: bool = False, ) -> Tuple[dict, List[domish.Element]]: """Convert AP objects from an AP page to XMPP items @@ -1781,18 +1740,20 @@ """ page_data = await self.ap_get_object(requestor_actor_id, page) if page_data is None: - log.warning('No data found in collection') + log.warning("No data found in collection") return {}, [] - ap_items = await self.ap_get_list(requestor_actor_id, page_data, "orderedItems", only_ids=only_ids) + ap_items = await self.ap_get_list( + requestor_actor_id, page_data, "orderedItems", only_ids=only_ids + ) if ap_items is None: - ap_items = await self.ap_get_list(requestor_actor_id, page_data, "items", only_ids=only_ids) + ap_items = await self.ap_get_list( + requestor_actor_id, page_data, "items", only_ids=only_ids + ) if not ap_items: - log.warning(f'No item field found in collection: {page_data!r}') + log.warning(f"No item field found in collection: {page_data!r}") return page_data, [] else: - log.warning( - "Items are not ordered, this is not spec compliant" - ) + log.warning("Items are not ordered, this is not spec compliant") items = [] # AP Collections are in antichronological order, but we expect chronological in # Pubsub, thus we reverse it @@ -1805,10 +1766,7 @@ return page_data, items async def get_comments_nodes( - self, - requestor_actor_id: str, - item_id: str, - parent_id: Optional[str] + self, requestor_actor_id: str, item_id: str, parent_id: Optional[str] ) -> Tuple[Optional[str], Optional[str]]: """Get node where this item is and node to use for comments @@ -1827,7 +1785,7 @@ if parent_id is None or not self.comments_max_depth: return ( self._m.get_comments_node(parent_id) if parent_id is not None else None, - self._m.get_comments_node(item_id) + self._m.get_comments_node(item_id), ) parent_url = parent_id parents = [] @@ -1837,18 +1795,15 @@ parent_url = parent_item.get("inReplyTo") if parent_url is None: break - parent_limit = self.comments_max_depth-1 + parent_limit = self.comments_max_depth - 1 if len(parents) <= parent_limit: return ( self._m.get_comments_node(parents[-1]["id"]), - self._m.get_comments_node(item_id) + self._m.get_comments_node(item_id), ) else: last_level_item = parents[parent_limit] - return ( - self._m.get_comments_node(last_level_item["id"]), - None - ) + return (self._m.get_comments_node(last_level_item["id"]), None) async def ap_item_2_mb_data(self, requestor_actor_id: str, ap_item: dict) -> dict: """Convert AP activity or object to microblog data @@ -1925,7 +1880,9 @@ if is_activity: authors = await self.ap_get_actors(requestor_actor_id, ap_item, "actor") else: - authors = await self.ap_get_actors(requestor_actor_id, ap_object, "attributedTo") + authors = await self.ap_get_actors( + requestor_actor_id, ap_object, "attributedTo" + ) if len(authors) > 1: # we only keep first item as author # TODO: handle multiple actors @@ -1963,22 +1920,14 @@ comments_data = { "service": author_jid, "node": comments_node, - "uri": uri.build_xmpp_uri( - "pubsub", - path=author_jid, - node=comments_node - ) + "uri": uri.build_xmpp_uri("pubsub", path=author_jid, node=comments_node), } mb_data["comments"] = [comments_data] return mb_data async def get_reply_to_id_from_xmpp_node( - self, - client: SatXMPPEntity, - ap_account: str, - parent_item: str, - mb_data: dict + self, client: SatXMPPEntity, ap_account: str, parent_item: str, mb_data: dict ) -> str: """Get URL to use for ``inReplyTo`` field in AP item. @@ -1995,18 +1944,16 @@ """ # FIXME: propose a protoXEP to properly get parent item, node and service - found_items = await self.host.memory.storage.search_pubsub_items({ - "profiles": [client.profile], - "names": [parent_item] - }) + found_items = await self.host.memory.storage.search_pubsub_items( + {"profiles": [client.profile], "names": [parent_item]} + ) if not found_items: log.warning(f"parent item {parent_item!r} not found in cache") parent_ap_account = ap_account elif len(found_items) == 1: cached_node = found_items[0].node parent_ap_account = await self.get_ap_account_from_jid_and_node( - cached_node.service, - cached_node.name + cached_node.service, cached_node.name ) else: # we found several cached item with given ID, we check if there is one @@ -2014,32 +1961,25 @@ try: author = jid.JID(mb_data["author_jid"]).userhostJID() cached_item = next( - i for i in found_items - if jid.JID(i.data["publisher"]).userhostJID() - == author + i + for i in found_items + if jid.JID(i.data["publisher"]).userhostJID() == author ) except StopIteration: # no item corresponding to this author, we use ap_account log.warning( - "Can't find a single cached item for parent item " - f"{parent_item!r}" + "Can't find a single cached item for parent item " f"{parent_item!r}" ) parent_ap_account = ap_account else: cached_node = cached_item.node parent_ap_account = await self.get_ap_account_from_jid_and_node( - cached_node.service, - cached_node.name + cached_node.service, cached_node.name ) - return self.build_apurl( - TYPE_ITEM, parent_ap_account, parent_item - ) + return self.build_apurl(TYPE_ITEM, parent_ap_account, parent_item) - async def repeated_mb_2_ap_item( - self, - mb_data: dict - ) -> dict: + async def repeated_mb_2_ap_item(self, mb_data: dict) -> dict: """Convert repeated blog item to suitable AP Announce activity @param mb_data: microblog metadata of an item repeating an other blog post @@ -2047,10 +1987,7 @@ """ repeated = mb_data["extra"]["repeated"] repeater = jid.JID(repeated["by"]) - repeater_account = await self.get_ap_account_from_jid_and_node( - repeater, - None - ) + repeater_account = await self.get_ap_account_from_jid_and_node(repeater, None) repeater_id = self.build_apurl(TYPE_ACTOR, repeater_account) repeated_uri = repeated["uri"] @@ -2095,7 +2032,7 @@ announce["to"] = [NS_AP_PUBLIC] announce["cc"] = [ self.build_apurl(TYPE_FOLLOWERS, repeater_account), - await self.get_ap_actor_id_from_account(repeated_account) + await self.get_ap_actor_id_from_account(repeated_account), ] return announce @@ -2103,7 +2040,7 @@ self, client: SatXMPPEntity, mb_data: dict, - public: bool =True, + public: bool = True, is_new: bool = True, ) -> dict: """Convert Libervia Microblog Data to ActivityPub item @@ -2131,8 +2068,7 @@ if not mb_data.get("author_jid"): mb_data["author_jid"] = client.jid.userhost() ap_account = await self.get_ap_account_from_jid_and_node( - jid.JID(mb_data["author_jid"]), - None + jid.JID(mb_data["author_jid"]), None ) url_actor = self.build_apurl(TYPE_ACTOR, ap_account) url_item = self.build_apurl(TYPE_ITEM, ap_account, mb_data["id"]) @@ -2153,17 +2089,11 @@ ap_attachments = ap_object["attachment"] = [] for attachment in attachments: try: - url = next( - s['url'] for s in attachment["sources"] if 'url' in s - ) + url = next(s["url"] for s in attachment["sources"] if "url" in s) except (StopIteration, KeyError): - log.warning( - f"Ignoring attachment without URL: {attachment}" - ) + log.warning(f"Ignoring attachment without URL: {attachment}") continue - ap_attachment = { - "url": url - } + ap_attachment = {"url": url} for key, ap_key in ( ("media_type", "mediaType"), # XXX: yes "name", cf. [ap_item_2_mb_data] @@ -2191,19 +2121,19 @@ log.warning(f"Can't add mention to {mentioned!r}: {e}") else: ap_object["to"].append(mentioned_id) - ap_object.setdefault("tag", []).append({ - "type": TYPE_MENTION, - "href": mentioned_id, - "name": mention, - }) + ap_object.setdefault("tag", []).append( + { + "type": TYPE_MENTION, + "href": mentioned_id, + "name": mention, + } + ) try: node = mb_data["node"] service = jid.JID(mb_data["service"]) except KeyError: # node and service must always be specified when this method is used - raise exceptions.InternalError( - "node or service is missing in mb_data" - ) + raise exceptions.InternalError("node or service is missing in mb_data") try: target_ap_account = await self.get_ap_account_from_jid_and_node( service, node @@ -2226,8 +2156,7 @@ if self.is_virtual_jid(service): # service is a proxy JID for AP account actor_data = await self.get_ap_actor_data_from_account( - url_actor, - target_ap_account + url_actor, target_ap_account ) followers = actor_data.get("followers") else: @@ -2244,10 +2173,7 @@ else: # the publication is from a followed real XMPP node ap_object["inReplyTo"] = await self.get_reply_to_id_from_xmpp_node( - client, - ap_account, - parent_item, - mb_data + client, ap_account, parent_item, mb_data ) return self.create_activity( @@ -2255,10 +2181,7 @@ ) async def publish_message( - self, - client: SatXMPPEntity, - mess_data: dict, - service: jid.JID + self, client: SatXMPPEntity, mess_data: dict, service: jid.JID ) -> None: """Send an AP message @@ -2292,11 +2215,7 @@ await self.ap_post(inbox_url, url_actor, item_data) async def ap_delete_item( - self, - jid_: jid.JID, - node: Optional[str], - item_id: str, - public: bool = True + self, jid_: jid.JID, node: Optional[str], item_id: str, public: bool = True ) -> Tuple[str, Dict[str, Any]]: """Build activity to delete an AP item @@ -2314,11 +2233,9 @@ author_account = await self.get_ap_account_from_jid_and_node(jid_, node) author_actor_id = self.build_apurl(TYPE_ACTOR, author_account) - items = await self.host.memory.storage.search_pubsub_items({ - "profiles": [self.client.profile], - "services": [jid_], - "names": [item_id] - }) + items = await self.host.memory.storage.search_pubsub_items( + {"profiles": [self.client.profile], "services": [jid_], "names": [item_id]} + ) if not items: log.warning( f"Deleting an unknown item at service {jid_}, node {node} and id " @@ -2326,7 +2243,9 @@ ) else: try: - mb_data = await self._m.item_2_mb_data(self.client, items[0].data, jid_, node) + mb_data = await self._m.item_2_mb_data( + self.client, items[0].data, jid_, node + ) if "repeated" in mb_data["extra"]: # we are deleting a repeated item, we must translate this to an # "Undo" of the "Announce" activity instead of a "Delete" one @@ -2341,12 +2260,7 @@ url_item = self.build_apurl(TYPE_ITEM, author_account, item_id) ap_item = self.create_activity( - "Delete", - author_actor_id, - { - "id": url_item, - "type": TYPE_TOMBSTONE - } + "Delete", author_actor_id, {"id": url_item, "type": TYPE_TOMBSTONE} ) if public: ap_item["to"] = [NS_AP_PUBLIC] @@ -2356,7 +2270,7 @@ self, client: SatXMPPEntity, message_elt: domish.Element, - post_treat: defer.Deferred + post_treat: defer.Deferred, ) -> bool: """add the gateway workflow on post treatment""" if self.client is None: @@ -2381,9 +2295,7 @@ log.warning(f"ignoring non local message: {mess_data}") return mess_data if not mess_data["to"].user: - log.warning( - f"ignoring message addressed to gateway itself: {mess_data}" - ) + log.warning(f"ignoring message addressed to gateway itself: {mess_data}") return mess_data requestor_actor_id = self.build_apurl(TYPE_ACTOR, mess_data["from"].userhost()) @@ -2391,9 +2303,7 @@ try: actor_id = await self.get_ap_actor_id_from_account(actor_account) except Exception as e: - log.warning( - f"Can't retrieve data on actor {actor_account}: {e}" - ) + log.warning(f"Can't retrieve data on actor {actor_account}: {e}") # TODO: send an error <message> return mess_data inbox = await self.get_ap_inbox_from_id( @@ -2417,9 +2327,7 @@ mb_data["id"] = origin_id attachments = mess_data["extra"].get(C.KEY_ATTACHMENTS) if attachments: - mb_data["extra"] = { - C.KEY_ATTACHMENTS: attachments - } + mb_data["extra"] = {C.KEY_ATTACHMENTS: attachments} client = self.client.get_virtual_client(mess_data["from"]) ap_item = await self.mb_data_2_ap_item(client, mb_data, public=False) @@ -2427,19 +2335,19 @@ ap_object["to"] = ap_item["to"] = [actor_id] # we add a mention to direct message, otherwise peer is not notified in some AP # implementations (notably Mastodon), and the message may be missed easily. - ap_object.setdefault("tag", []).append({ - "type": TYPE_MENTION, - "href": actor_id, - "name": f"@{actor_account}", - }) + ap_object.setdefault("tag", []).append( + { + "type": TYPE_MENTION, + "href": actor_id, + "name": f"@{actor_account}", + } + ) try: await self.ap_post(inbox, ap_item["actor"], ap_item) except Exception as e: # TODO: send an error <message> - log.warning( - f"Can't send message to {inbox}: {e}" - ) + log.warning(f"Can't send message to {inbox}: {e}") return mess_data async def _on_message_retract( @@ -2447,29 +2355,24 @@ client: SatXMPPEntity, message_elt: domish.Element, retract_elt: domish.Element, - history: History + history: History, ) -> bool: if client != self.client: return True from_jid = jid.JID(message_elt["from"]) if not self.is_local(from_jid): - log.debug( - f"ignoring retract request from non local jid {from_jid}" - ) + log.debug(f"ignoring retract request from non local jid {from_jid}") return False - requestor_actor_id = self.build_apurl( - TYPE_ACTOR, - from_jid.userhost() - ) + requestor_actor_id = self.build_apurl(TYPE_ACTOR, from_jid.userhost()) to_jid = jid.JID(message_elt["to"]) - if (to_jid.host != self.client.jid.full() or not to_jid.user): + if to_jid.host != self.client.jid.full() or not to_jid.user: # to_jid should be a virtual JID from this gateway - raise exceptions.InternalError( - f"Invalid destinee's JID: {to_jid.full()}" - ) + raise exceptions.InternalError(f"Invalid destinee's JID: {to_jid.full()}") ap_account = self._e.unescape(to_jid.user) actor_id = await self.get_ap_actor_id_from_account(ap_account) - inbox = await self.get_ap_inbox_from_id(requestor_actor_id, actor_id, use_shared=False) + inbox = await self.get_ap_inbox_from_id( + requestor_actor_id, actor_id, use_shared=False + ) url_actor, ap_item = await self.ap_delete_item( from_jid.userhostJID(), None, retract_elt["id"], public=False ) @@ -2480,7 +2383,7 @@ self, client: SatXMPPEntity, message_elt: domish.Element, - reference_data: Dict[str, Union[str, int]] + reference_data: Dict[str, Union[str, int]], ) -> bool: parsed_uri: dict = reference_data.get("parsed_uri") if not parsed_uri: @@ -2553,14 +2456,18 @@ ap_item = await self.mb_data_2_ap_item(client, mb_data) ap_object = ap_item["object"] ap_object["to"] = [actor_id] - ap_object.setdefault("tag", []).append({ - "type": TYPE_MENTION, - "href": actor_id, - "name": ap_account, - }) + ap_object.setdefault("tag", []).append( + { + "type": TYPE_MENTION, + "href": actor_id, + "name": ap_account, + } + ) requestor_actor_id = ap_item["actor"] - inbox = await self.get_ap_inbox_from_id(requestor_actor_id, actor_id, use_shared=False) + inbox = await self.get_ap_inbox_from_id( + requestor_actor_id, actor_id, use_shared=False + ) await self.ap_post(inbox, requestor_actor_id, ap_item) @@ -2583,7 +2490,7 @@ ) return try: - parent_item_account, parent_item_id = url_args[0], '/'.join(url_args[1:]) + parent_item_account, parent_item_id = url_args[0], "/".join(url_args[1:]) except (IndexError, ValueError): log.warning( "Ignoring AP item replying to an XMPP item with invalid inReplyTo URL " @@ -2603,7 +2510,8 @@ except IndexError: log.warning( f"Can't find parent item at {parent_item_service} (node " - f"{parent_item_node!r})\n{pformat(ap_item)}") + f"{parent_item_node!r})\n{pformat(ap_item)}" + ) return parent_item_parsed = await self._m.item_2_mb_data( client, parent_item_elt, parent_item_service, parent_item_node @@ -2614,16 +2522,18 @@ except (KeyError, IndexError): # we don't have a comment node set for this item from libervia.backend.tools.xml_tools import pp_elt + log.info(f"{pp_elt(parent_item_elt.toXml())}") raise NotImplementedError() else: requestor_actor_id = self.build_apurl( TYPE_ACTOR, - await self.get_ap_account_from_jid_and_node(comment_service, comment_node) + await self.get_ap_account_from_jid_and_node( + comment_service, comment_node + ), ) __, item_elt = await self.ap_item_2_mb_data_and_elt( - requestor_actor_id, - ap_item + requestor_actor_id, ap_item ) await self._p.publish(client, comment_service, comment_node, [item_elt]) await self.notify_mentions( @@ -2631,8 +2541,7 @@ ) def get_ap_item_targets( - self, - item: Dict[str, Any] + self, item: Dict[str, Any] ) -> Tuple[bool, Dict[str, Set[str]], List[Dict[str, str]]]: """Retrieve targets of an AP item, and indicate if it's a public one @@ -2704,18 +2613,13 @@ is_public, targets, mentions = self.get_ap_item_targets(item) if not is_public and targets.keys() == {TYPE_ACTOR}: # this is a direct message - await self.handle_message_ap_item( - client, targets, mentions, destinee, item - ) + await self.handle_message_ap_item(client, targets, mentions, destinee, item) else: await self.handle_pubsub_ap_item( client, targets, mentions, destinee, node, item, is_public ) - def get_requestor_actor_id_from_targets( - self, - targets: set[str] - ) -> str: + def get_requestor_actor_id_from_targets(self, targets: set[str]) -> str: """Find local actor to use as requestor_actor_id from request targets. A local actor must be used to sign HTTP request, notably HTTP GET request for AP @@ -2731,19 +2635,15 @@ try: return next(t for t in targets if self.is_local_url(t)) except StopIteration: - log.warning( - f"Can't find local target to use as requestor ID: {targets!r}" - ) - return self.build_apurl( - TYPE_ACTOR, f"libervia@{self.public_url}" - ) + log.warning(f"Can't find local target to use as requestor ID: {targets!r}") + return self.build_apurl(TYPE_ACTOR, f"libervia@{self.public_url}") async def handle_message_ap_item( self, client: SatXMPPEntity, targets: dict[str, Set[str]], mentions: list[Dict[str, str]], - destinee: jid.JID|None, + destinee: jid.JID | None, item: dict, ) -> None: """Parse and deliver direct AP items translating to XMPP messages @@ -2755,15 +2655,12 @@ targets_urls = {t for t_set in targets.values() for t in t_set} requestor_actor_id = self.get_requestor_actor_id_from_targets(targets_urls) targets_jids = { - await self.get_jid_from_id(requestor_actor_id, url) - for url in targets_urls + await self.get_jid_from_id(requestor_actor_id, url) for url in targets_urls } if destinee is not None: targets_jids.add(destinee) mb_data = await self.ap_item_2_mb_data(requestor_actor_id, item) - extra = { - "origin_id": mb_data["id"] - } + extra = {"origin_id": mb_data["id"]} attachments = mb_data["extra"].get(C.KEY_ATTACHMENTS) if attachments: extra[C.KEY_ATTACHMENTS] = attachments @@ -2773,9 +2670,9 @@ defer_l.append( client.sendMessage( target_jid, - {'': mb_data.get("content", "")}, + {"": mb_data.get("content", "")}, mb_data.get("title"), - extra=extra + extra=extra, ) ) await defer.DeferredList(defer_l) @@ -2798,18 +2695,16 @@ """ targets_urls = {t for t_set in targets.values() for t in t_set} requestor_actor_id = self.get_requestor_actor_id_from_targets(targets_urls) - anchor = uri.build_xmpp_uri("pubsub", path=service.full(), node=node, item=item_id) + anchor = uri.build_xmpp_uri( + "pubsub", path=service.full(), node=node, item=item_id + ) seen = set() # we start with explicit mentions because mentions' content will be used in the # future to fill "begin" and "end" reference attributes (we can't do it at the # moment as there is no way to specify the XML element to use in the blog item). for mention in mentions: mentioned_jid = await self.get_jid_from_id(requestor_actor_id, mention["uri"]) - self._refs.send_reference( - self.client, - to_jid=mentioned_jid, - anchor=anchor - ) + self._refs.send_reference(self.client, to_jid=mentioned_jid, anchor=anchor) seen.add(mentioned_jid) remaining = { @@ -2818,21 +2713,17 @@ for t in t_set } - seen for target in remaining: - self._refs.send_reference( - self.client, - to_jid=target, - anchor=anchor - ) + self._refs.send_reference(self.client, to_jid=target, anchor=anchor) async def handle_pubsub_ap_item( self, client: SatXMPPEntity, targets: dict[str, set[str]], mentions: list[dict[str, str]], - destinee: jid.JID|None, + destinee: jid.JID | None, node: str, item: dict, - public: bool + public: bool, ) -> None: """Analyse, cache and deliver AP items translating to Pubsub @@ -2859,14 +2750,16 @@ # this item is a reply to an AP item, we use or create a corresponding node # for comments parent_node, __ = await self.get_comments_nodes( - requestor_actor_id, - item["id"], - in_reply_to + requestor_actor_id, item["id"], in_reply_to ) node = parent_node or node cached_node = await self.host.memory.storage.get_pubsub_node( - client, service, node, with_subscriptions=True, create=True, - create_kwargs={"subscribed": True} + client, + service, + node, + with_subscriptions=True, + create=True, + create_kwargs={"subscribed": True}, ) else: # it is a root item (i.e. not a reply to an other item) @@ -2878,33 +2771,25 @@ log.warning( f"Received item in unknown node {node!r} at {service}. This may be " f"due to a cache purge. We synchronise the node\n{item}" - ) return if item.get("type") == TYPE_EVENT: data, item_elt = await self.ap_events.ap_item_2_event_data_and_elt( - requestor_actor_id, - item + requestor_actor_id, item ) else: data, item_elt = await self.ap_item_2_mb_data_and_elt( - requestor_actor_id, - item + requestor_actor_id, item ) await self.host.memory.storage.cache_pubsub_items( - client, - cached_node, - [item_elt], - [data] + client, cached_node, [item_elt], [data] ) for subscription in cached_node.subscriptions: if subscription.state != SubscriptionState.SUBSCRIBED: continue self.pubsub_service.notifyPublish( - service, - node, - [(subscription.subscriber, None, [item_elt])] + service, node, [(subscription.subscriber, None, [item_elt])] ) await self.notify_mentions(targets, mentions, service, node, item_elt["id"]) @@ -2940,7 +2825,7 @@ History, History.origin_id, item_id, - (History.messages, History.subjects) + (History.messages, History.subjects), ) if history is not None: @@ -2973,7 +2858,5 @@ if subscription.state != SubscriptionState.SUBSCRIBED: continue self.pubsub_service.notifyRetract( - client.jid, - node, - [(subscription.subscriber, None, [item_elt])] + client.jid, node, [(subscription.subscriber, None, [item_elt])] )