# HG changeset patch # User Goffi # Date 1658332392 -7200 # Node ID 59fbb66b292361c2e362b9821860cc3f98dac626 # Parent ac255a0fbd4cfceac61fe37d113a62339d29f2b1 component AP gateway: handle XMPP attachments -> AP likes conversion: convert `noticed` attachments coming from XMPP to suitable `Like` activities. Virtual node is created in cache when necessary, and virtual items published to virtual JID mapping AP accounts are cached too. rel 370 diff -r ac255a0fbd4c -r 59fbb66b2923 sat/plugins/plugin_comp_ap_gateway/__init__.py --- a/sat/plugins/plugin_comp_ap_gateway/__init__.py Wed Jul 20 17:49:51 2022 +0200 +++ b/sat/plugins/plugin_comp_ap_gateway/__init__.py Wed Jul 20 17:53:12 2022 +0200 @@ -70,6 +70,7 @@ TYPE_FOLLOWERS, TYPE_TOMBSTONE, TYPE_MENTION, + TYPE_LIKE, NS_AP, NS_AP_PUBLIC, PUBLIC_TUPLE @@ -90,9 +91,9 @@ C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, C.PI_PROTOCOLS: [], C.PI_DEPENDENCIES: [ - "XEP-0060", "XEP-0084", "XEP-0106", "XEP-0277", "XEP-0292", "XEP-0329", - "XEP-0372", "XEP-0424", "XEP-0465", "PUBSUB_CACHE", "TEXT_SYNTAXES", "IDENTITY", - "XEP-0054" + "XEP-0054", "XEP-0060", "XEP-0084", "XEP-0106", "XEP-0277", "XEP-0292", + "XEP-0329", "XEP-0372", "XEP-0424", "XEP-0465", "PUBSUB_CACHE", "TEXT_SYNTAXES", + "IDENTITY", "PUBSUB_ATTACHMENTS" ], C.PI_RECOMMENDATIONS: [], C.PI_MAIN: "APGateway", @@ -131,6 +132,7 @@ self._c = host.plugins["PUBSUB_CACHE"] self._t = host.plugins["TEXT_SYNTAXES"] self._i = host.plugins["IDENTITY"] + self._pa = host.plugins["PUBSUB_ATTACHMENTS"] self._p.addManagedNode( "", items_cb=self._itemsReceived, @@ -287,9 +289,17 @@ return ap_account = self._e.unescape(recipient.user) - await self.convertAndPostItems( - client, ap_account, recipient, itemsEvent.nodeIdentifier, itemsEvent.items - ) + + if self._pa.isAttachmentNode(itemsEvent.nodeIdentifier): + await self.convertAndPostAttachments( + client, ap_account, itemsEvent.sender, itemsEvent.nodeIdentifier, + itemsEvent.items, publisher=itemsEvent.sender + ) + else: + await self.convertAndPostItems( + client, ap_account, itemsEvent.sender, itemsEvent.nodeIdentifier, + itemsEvent.items + ) async def getVirtualClient(self, actor_id: str) -> SatXMPPEntity: """Get client for this component with a specified jid @@ -406,11 +416,14 @@ if found_item is None: # the node is not in cache, we have to make a request to retrieve the item - # if doesn't exist, getItems will raise a NotFound exception + # If the node doesn't exist, getItems will raise a NotFound exception found_items, __ = await self._p.getItems( self.client, author_jid, node, item_ids=[item_id] ) - found_item = found_items[0] + try: + found_item = found_items[0] + except KeyError: + raise exceptions.NotFound("requested items can't be found") mb_data = await self._m.item2mbdata( self.client, found_item, author_jid, node @@ -834,6 +847,12 @@ """Generate base data for an activity @param activity: one of ACTIVITY_TYPES + @param actor_id: AP actor ID of the sender + @param object_: content of "object" field + @param target: content of "target" field + @param activity_id: ID to use for the activity + if not set it will be automatically generated, but it is usually desirable to + set the ID manually so it can be retrieved (e.g. for Undo) """ if activity not in ACTIVITY_TYPES: raise exceptions.InternalError(f"invalid activity: {activity!r}") @@ -948,27 +967,41 @@ service: jid.JID, node: str, items: List[domish.Element], - subscribe_comments_nodes: bool = False, + subscribe_extra_nodes: bool = True, ) -> None: """Convert XMPP items to AP items and post them to actor inbox - @param ap_account: account of ActivityPub actor - @param service: JID of the virtual pubsub service corresponding to the AP actor - @param node: virtual node corresponding to the AP actor and items - @param subscribe_comments_nodes: if True, comment nodes present in given items, - they will be automatically subscribed + @param ap_account: account of ActivityPub actor receiving the item + @param service: JID of the (virtual) pubsub service where the item has been + published + @param node: (virtual) node corresponding where the item has been published + @param subscribe_extra_nodes: if True, extra data nodes will be automatically + subscribed, that is comment nodes if present and attachments nodes. """ actor_id = await self.getAPActorIdFromAccount(ap_account) inbox = await self.getAPInboxFromId(actor_id) for item in items: if item.name == "item": mb_data = await self._m.item2mbdata(client, item, service, node) - if subscribe_comments_nodes: + if subscribe_extra_nodes: # we subscribe automatically to comment nodes if any + recipient_jid = self.getLocalJIDFromAccount(ap_account) + recipient_client = self.client.getVirtualClient(recipient_jid) for comment_data in mb_data.get("comments", []): comment_service = jid.JID(comment_data["service"]) comment_node = comment_data["node"] - await self._p.subscribe(client, comment_service, comment_node) + await self._p.subscribe( + recipient_client, comment_service, comment_node + ) + try: + await self._pa.subscribe( + recipient_client, service, node, mb_data["id"] + ) + except exceptions.NotFound: + log.debug( + f"no attachment node found for item {mb_data['id']!r} on " + f"{node!r} at {service}" + ) ap_item = await self.mbdata2APitem(client, mb_data) url_actor = ap_item["actor"] elif item.name == "retract": @@ -978,12 +1011,110 @@ else: raise exceptions.InternalError(f"unexpected element: {item.toXml()}") resp = await self.signAndPost(inbox, url_actor, ap_item) - if resp.code >= 300: - text = await resp.text() + + async def convertAndPostAttachments( + self, + client: SatXMPPEntity, + ap_account: str, + service: jid.JID, + node: str, + items: List[domish.Element], + publisher: Optional[jid.JID] + ) -> None: + """Convert XMPP item attachments to AP activities and post them to actor inbox + + @param ap_account: account of ActivityPub actor receiving the item + @param service: JID of the (virtual) pubsub service where the item has been + published + @param node: (virtual) node corresponding where the item has been published + @param subscribe_extra_nodes: if True, extra data nodes will be automatically + subscribed, that is comment nodes if present and attachments nodes. + """ + if len(items) != 1: + log.warning( + "we should get exactly one attachment item for an entity, got " + f"{len(items)})" + ) + + actor_id = await self.getAPActorIdFromAccount(ap_account) + inbox = await self.getAPInboxFromId(actor_id) + + item_elt = items[0] + if publisher is not None: + item_elt["publisher"] = publisher.userhost() + item_service, item_node, item_id = self._pa.attachmentNode2Item(node) + item_account = await self.getAPAccountFromJidAndNode(item_service, item_node) + if item_service.host == self.client.jid.userhost(): + # it's a virtual JID mapping to an external AP actor, we can use the + # item_id directly + item_url = item_id + if not item_url.startswith("https:"): log.warning( - f"unexpected return code while sending AP item: {resp.code}\n{text}\n" - f"{pformat(ap_item)}" + "item ID of external AP actor is not an https link, ignoring: " + f"{item_id!r}" ) + return + else: + item_url = self.buildAPURL(TYPE_ITEM, item_account, item_id) + + old_attachment_pubsub_items = await self.host.memory.storage.searchPubsubItems({ + "profiles": [self.client.profile], + "services": [service], + "names": [item_elt["id"]] + }) + if not old_attachment_pubsub_items: + old_attachment = {} + else: + old_attachment_items = [i.data for i in old_attachment_pubsub_items] + old_attachments = self._pa.items2attachmentData(client, old_attachment_items) + try: + old_attachment = old_attachments[0] + except IndexError: + # no known element was present in attachments + old_attachment = {} + sender_account = await self.getAPAccountFromJidAndNode( + client.jid, + None + ) + sender_actor_id = self.buildAPURL(TYPE_ACTOR, sender_account) + try: + attachments = self._pa.items2attachmentData(client, [item_elt])[0] + except IndexError: + # no known element was present in attachments + attachments = {} + + if "noticed" in attachments: + if not "noticed" in old_attachment: + # new "noticed" attachment, we translate to "Like" activity + activity_id = self.buildAPURL("like", item_account, item_id) + like = self.createActivity( + TYPE_LIKE, sender_actor_id, item_url, activity_id=activity_id + ) + like["to"] = [NS_AP_PUBLIC] + await self.signAndPost(inbox, sender_actor_id, like) + else: + if "noticed" in old_attachment: + # "noticed" attachment has been removed, we undo the "Like" activity + activity_id = self.buildAPURL("like", item_account, item_id) + like = self.createActivity( + TYPE_LIKE, sender_actor_id, item_url, activity_id=activity_id + ) + like["to"] = [NS_AP_PUBLIC] + undo = self.createActivity("Undo", sender_actor_id, like) + await self.signAndPost(inbox, sender_actor_id, undo) + + if service.user and service.host == self.client.jid.userhost(): + # the item is on a virtual service, we need to store it in cache + log.debug("storing attachments item in cache") + cached_node = await self.host.memory.storage.getPubsubNode( + client, service, node, with_subscriptions=True, create=True + ) + await self.host.memory.storage.cachePubsubItems( + self.client, + cached_node, + [item_elt], + [attachments] + ) async def signAndPost(self, url: str, actor_id: str, doc: dict) -> TReqResponse: """Sign a documentent and post it to AP server @@ -1020,7 +1151,7 @@ body, headers=headers, ) - if resp.code >= 400: + if resp.code >= 300: text = await resp.text() log.warning(f"POST request to {url} failed [{resp.code}]: {text}") elif self.verbose: @@ -1694,7 +1825,7 @@ target_ap_account = await self.getAPAccountFromJidAndNode( service, node ) - if service.host == self.client.jid.userhost: + if service.host == self.client.jid.userhost(): # service is a proxy JID for AP account actor_data = await self.getAPActorDataFromAccount(target_ap_account) followers = actor_data.get("followers") @@ -1758,8 +1889,6 @@ item_data = await self.mbdata2APitem(client, mess_data) url_actor = item_data["actor"] resp = await self.signAndPost(inbox_url, url_actor, item_data) - if resp.code != 202: - raise exceptions.NetworkError(f"unexpected return code: {resp.code}") async def apDeleteItem( self, @@ -1909,12 +2038,6 @@ from_jid.userhostJID(), None, fastened_elts.id, public=False ) resp = await self.signAndPost(inbox, url_actor, ap_item) - if resp.code >= 300: - text = await resp.text() - log.warning( - f"unexpected return code while sending AP item: {resp.code}\n{text}\n" - f"{pformat(ap_item)}" - ) return False async def _onReferenceReceived( @@ -2003,12 +2126,6 @@ inbox = await self.getAPInboxFromId(actor_id) resp = await self.signAndPost(inbox, ap_item["actor"], ap_item) - if resp.code >= 300: - text = await resp.text() - log.warning( - f"unexpected return code while sending AP item: {resp.code}\n{text}\n" - f"{pformat(ap_item)}" - ) return False @@ -2263,27 +2380,9 @@ parent_node, __ = await self.getCommentsNodes(item["id"], in_reply_to) node = parent_node or node cached_node = await self.host.memory.storage.getPubsubNode( - client, service, node, with_subscriptions=True + client, service, node, with_subscriptions=True, create=True, + create_kwargs={"subscribed": True} ) - if cached_node is None: - try: - cached_node = await self.host.memory.storage.setPubsubNode( - client, - service, - node, - subscribed=True - ) - except IntegrityError as e: - if "unique" in str(e.orig).lower(): - # the node may already exist, if it has been created just after - # getPubsubNode above - log.debug("ignoring UNIQUE constraint error") - cached_node = await self.host.memory.storage.getPubsubNode( - client, service, node, with_subscriptions=True - ) - else: - raise e - else: # it is a root item (i.e. not a reply to an other item) cached_node = await self.host.memory.storage.getPubsubNode( diff -r ac255a0fbd4c -r 59fbb66b2923 sat/plugins/plugin_comp_ap_gateway/constants.py --- a/sat/plugins/plugin_comp_ap_gateway/constants.py Wed Jul 20 17:49:51 2022 +0200 +++ b/sat/plugins/plugin_comp_ap_gateway/constants.py Wed Jul 20 17:53:12 2022 +0200 @@ -29,6 +29,7 @@ TYPE_ITEM = "item" TYPE_TOMBSTONE = "Tombstone" TYPE_MENTION = "Mention" +TYPE_LIKE = "Like" MEDIA_TYPE_AP = "application/activity+json" NS_AP = "https://www.w3.org/ns/activitystreams" NS_AP_PUBLIC = f"{NS_AP}#Public" diff -r ac255a0fbd4c -r 59fbb66b2923 sat/plugins/plugin_comp_ap_gateway/pubsub_service.py --- a/sat/plugins/plugin_comp_ap_gateway/pubsub_service.py Wed Jul 20 17:49:51 2022 +0200 +++ b/sat/plugins/plugin_comp_ap_gateway/pubsub_service.py Wed Jul 20 17:53:12 2022 +0200 @@ -119,9 +119,14 @@ ) client = self.apg.client.getVirtualClient(requestor) - await self.apg.convertAndPostItems( - client, ap_account, service, nodeIdentifier, items - ) + if self.apg._pa.isAttachmentNode(nodeIdentifier): + await self.apg.convertAndPostAttachments( + client, ap_account, service, nodeIdentifier, items, publisher=requestor + ) + else: + await self.apg.convertAndPostItems( + client, ap_account, service, nodeIdentifier, items + ) async def apFollowing2Elt(self, ap_item: dict) -> domish.Element: """Convert actor ID from following collection to XMPP item""" @@ -291,6 +296,10 @@ log.warning(f"Invalid AP account used by {requestor}: {ap_account!r}") return [], None + # cached_node may be pre-filled with some nodes (e.g. attachments nodes), + # otherwise it is filled when suitable + cached_node = None + client = self.apg.client kwargs = {} if node == self.apg._pps.subscriptions_node: @@ -315,6 +324,15 @@ self.apg.client, ap_account, itemIdentifiers ) return [item_elt], None + elif self.apg._pa.isAttachmentNode(node): + use_cache = True + # we check cache here because we emit an item-not-found error if the node is + # not in cache, as we are not dealing with real AP items + cached_node = await self.host.memory.storage.getPubsubNode( + client, service, node + ) + if cached_node is None: + raise error.StanzaError("item-not-found") else: if not node.startswith(self.apg._m.namespace): raise error.StanzaError( @@ -326,11 +344,11 @@ parser = self.apg.apItem2Elt use_cache = True - client = self.apg.client if use_cache: - cached_node = await self.host.memory.storage.getPubsubNode( - client, service, node - ) + if cached_node is None: + cached_node = await self.host.memory.storage.getPubsubNode( + client, service, node + ) # TODO: check if node is synchronised if cached_node is not None: # the node is cached, we return items from cache @@ -467,7 +485,7 @@ data = self.apg.createActivity("Follow", req_actor_id, recip_actor_id) resp = await self.apg.signAndPost(inbox, req_actor_id, data) - if resp.code >= 400: + if resp.code >= 300: text = await resp.text() raise error.StanzaError("service-unavailable", text=text) return pubsub.Subscription(nodeIdentifier, requestor, "subscribed") @@ -488,7 +506,7 @@ ) resp = await self.apg.signAndPost(inbox, req_actor_id, data) - if resp.code >= 400: + if resp.code >= 300: text = await resp.text() raise error.StanzaError("service-unavailable", text=text)