# HG changeset patch # User Goffi # Date 1658419655 -7200 # Node ID c0bcbcf5b4b7bf01392330bd1b3b197cd645cc74 # Parent 37d2c0282304fea726679c6fcc59eeafa69b07fc component AP gateway: handle `Like` and `Undo`/`Like` activities: rel 370 diff -r 37d2c0282304 -r c0bcbcf5b4b7 sat/plugins/plugin_comp_ap_gateway/__init__.py --- a/sat/plugins/plugin_comp_ap_gateway/__init__.py Thu Jul 21 18:05:20 2022 +0200 +++ b/sat/plugins/plugin_comp_ap_gateway/__init__.py Thu Jul 21 18:07:35 2022 +0200 @@ -293,7 +293,7 @@ if self._pa.isAttachmentNode(itemsEvent.nodeIdentifier): await self.convertAndPostAttachments( client, ap_account, itemsEvent.sender, itemsEvent.nodeIdentifier, - itemsEvent.items, publisher=itemsEvent.sender + itemsEvent.items ) else: await self.convertAndPostItems( @@ -581,7 +581,7 @@ if self.client is None: raise exceptions.InternalError("Client is not set yet") - if jid_.host == self.client.jid.userhost(): + if self.isVirtualJID(jid_): # this is an proxy JID to an AP Actor return self._e.unescape(jid_.user) @@ -786,6 +786,10 @@ """ return url.startswith(self.base_ap_url) + def isVirtualJID(self, jid_: jid.JID) -> bool: + """Tell if a JID is an AP actor mapped through this gateway""" + return jid_.host == self.client.jid.userhost() + def buildSignatureHeader(self, values: Dict[str, str]) -> str: """Build key="" signature header from signature data""" fields = [] @@ -983,12 +987,16 @@ for item in items: if item.name == "item": mb_data = await self._m.item2mbdata(client, item, service, node) - if subscribe_extra_nodes: + author_jid = jid.JID(mb_data["author_jid"]) + if subscribe_extra_nodes and not self.isVirtualJID(author_jid): # we subscribe automatically to comment nodes if any recipient_jid = self.getLocalJIDFromAccount(ap_account) recipient_client = self.client.getVirtualClient(recipient_jid) for comment_data in mb_data.get("comments", []): comment_service = jid.JID(comment_data["service"]) + if self.isVirtualJID(comment_service): + log.debug(f"ignoring virtual 
comment service: {comment_data}") + continue comment_node = comment_data["node"] await self._p.subscribe( recipient_client, comment_service, comment_node @@ -1019,7 +1027,7 @@ service: jid.JID, node: str, items: List[domish.Element], - publisher: Optional[jid.JID] + publisher: Optional[jid.JID] = None ) -> None: """Convert XMPP item attachments to AP activities and post them to actor inbox @@ -1027,8 +1035,14 @@ @param service: JID of the (virtual) pubsub service where the item has been published @param node: (virtual) node corresponding where the item has been published - @param subscribe_extra_nodes: if True, extra data nodes will be automatically - subscribed, that is comment nodes if present and attachments nodes. + (an attachments node, from which the attached item is resolved) + @param items: attachments items + @param publisher: publisher of the attachments item (it's NOT the PEP/Pubsub + service, it's the publisher of the item). To be filled only when the publisher + is known for sure; otherwise the publisher is taken from the "publisher" + attribute when the pubsub service has set it or, as a last resort, from the + item's ID (which MUST be the publisher's bare JID according to the + pubsub-attachments specification). 
""" if len(items) != 1: log.warning( @@ -1040,11 +1054,29 @@ inbox = await self.getAPInboxFromId(actor_id) item_elt = items[0] + item_id = item_elt["id"] + + if publisher is None: + item_pub_s = item_elt.getAttribute("publisher") + publisher = jid.JID(item_pub_s) if item_pub_s else jid.JID(item_id) + + if publisher.userhost() != item_id: + log.warning( + "attachments item ID must be publisher's bare JID, ignoring: " + f"{item_elt.toXml()}" + ) + return + + if self.isVirtualJID(publisher): + log.debug(f"ignoring item coming from local virtual JID {publisher}") + return + if publisher is not None: item_elt["publisher"] = publisher.userhost() + item_service, item_node, item_id = self._pa.attachmentNode2Item(node) item_account = await self.getAPAccountFromJidAndNode(item_service, item_node) - if item_service.host == self.client.jid.userhost(): + if self.isVirtualJID(item_service): # it's a virtual JID mapping to an external AP actor, we can use the # item_id directly item_url = item_id @@ -1072,11 +1104,11 @@ except IndexError: # no known element was present in attachments old_attachment = {} - sender_account = await self.getAPAccountFromJidAndNode( - client.jid, + publisher_account = await self.getAPAccountFromJidAndNode( + publisher, None ) - sender_actor_id = self.buildAPURL(TYPE_ACTOR, sender_account) + publisher_actor_id = self.buildAPURL(TYPE_ACTOR, publisher_account) try: attachments = self._pa.items2attachmentData(client, [item_elt])[0] except IndexError: @@ -1088,22 +1120,22 @@ # new "noticed" attachment, we translate to "Like" activity activity_id = self.buildAPURL("like", item_account, item_id) like = self.createActivity( - TYPE_LIKE, sender_actor_id, item_url, activity_id=activity_id + TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id ) like["to"] = [NS_AP_PUBLIC] - await self.signAndPost(inbox, sender_actor_id, like) + await self.signAndPost(inbox, publisher_actor_id, like) else: if "noticed" in old_attachment: # "noticed" attachment has 
been removed, we undo the "Like" activity activity_id = self.buildAPURL("like", item_account, item_id) like = self.createActivity( - TYPE_LIKE, sender_actor_id, item_url, activity_id=activity_id + TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id ) like["to"] = [NS_AP_PUBLIC] - undo = self.createActivity("Undo", sender_actor_id, like) - await self.signAndPost(inbox, sender_actor_id, undo) + undo = self.createActivity("Undo", publisher_actor_id, like) + await self.signAndPost(inbox, publisher_actor_id, undo) - if service.user and service.host == self.client.jid.userhost(): + if service.user and self.isVirtualJID(service): # the item is on a virtual service, we need to store it in cache log.debug("storing attachments item in cache") cached_node = await self.host.memory.storage.getPubsubNode( @@ -1720,7 +1752,7 @@ rep_item = parsed_url["item"] activity_id = self.buildAPURL("item", repeater.userhost(), mb_data["id"]) - if rep_service.host == self.client.jid.userhost(): + if self.isVirtualJID(rep_service): # it's an AP actor linked through this gateway # in this case we can simply use the item ID if not rep_item.startswith("https:"): @@ -1825,7 +1857,7 @@ target_ap_account = await self.getAPAccountFromJidAndNode( service, node ) - if service.host == self.client.jid.userhost(): + if self.isVirtualJID(service): # service is a proxy JID for AP account actor_data = await self.getAPActorDataFromAccount(target_ap_account) followers = actor_data.get("followers") @@ -1836,7 +1868,7 @@ ap_object["cc"] = [followers] if self._m.isCommentNode(node): parent_item = self._m.getParentItem(node) - if service.host == self.client.jid.userhost(): + if self.isVirtualJID(service): # the publication is on a virtual node (i.e. 
an XMPP node managed by # this gateway and linking to an ActivityPub actor) ap_object["inReplyTo"] = parent_item diff -r 37d2c0282304 -r c0bcbcf5b4b7 sat/plugins/plugin_comp_ap_gateway/http_server.py --- a/sat/plugins/plugin_comp_ap_gateway/http_server.py Thu Jul 21 18:05:20 2022 +0200 +++ b/sat/plugins/plugin_comp_ap_gateway/http_server.py Thu Jul 21 18:07:35 2022 +0200 @@ -34,6 +34,7 @@ from sat.core import exceptions from sat.core.constants import Const as C from sat.core.i18n import _ +from sat.core.core_types import SatXMPPEntity from sat.core.log import getLogger from sat.tools.common import date_utils, uri from sat.memory.sqla_mapping import SubscriptionState @@ -41,7 +42,7 @@ from .constants import ( NS_AP, CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_SHARED_INBOX, TYPE_OUTBOX, AP_REQUEST_TYPES, PAGE_SIZE, ACTIVITY_TYPES_LOWER, ACTIVIY_NO_ACCOUNT_ALLOWED, - SIGN_HEADERS, HS2019, SIGN_EXP, TYPE_FOLLOWERS, TYPE_FOLLOWING + SIGN_HEADERS, HS2019, SIGN_EXP, TYPE_FOLLOWERS, TYPE_FOLLOWING, TYPE_ITEM, TYPE_LIKE ) from .regex import RE_SIG_PARAM @@ -149,6 +150,8 @@ # we can use directly the Announce object, as only the "id" field is # needed await self.apg.newAPDeleteItem(client, None, node, obj) + elif type_ == TYPE_LIKE: + await self.handleNewLikeItem(client, obj, True) else: log.warning(f"Unmanaged undo type: {type_!r}") @@ -339,6 +342,108 @@ repeated=True ) + async def handleNewLikeItem( + self, + client: SatXMPPEntity, + data: dict, + undo: bool = False, + ) -> None: + liked_ids = data.get("object") + if not liked_ids: + raise exceptions.DataError("object should be set") + elif isinstance(liked_ids, list): + try: + liked_ids = [o["id"] for o in liked_ids] + except (KeyError, TypeError): + raise exceptions.DataError(f"invalid object: {liked_ids!r}") + elif isinstance(liked_ids, dict): + obj_id = liked_ids.get("id") + if not obj_id or not isinstance(obj_id, str): + raise exceptions.DataError(f"invalid object: {liked_ids!r}") + liked_ids = [obj_id] + elif 
isinstance(liked_ids, str): + liked_ids = [liked_ids] + + for liked_id in liked_ids: + if not self.apg.isLocalURL(liked_id): + log.debug(f"ignoring non local liked ID: {liked_id}") + continue + url_type, url_args = self.apg.parseAPURL(liked_id) + if url_type != TYPE_ITEM: + log.warning(f"unexpected local URL for liked item: {liked_id}") + continue + try: + account, item_id = url_args + except ValueError: + raise ValueError(f"invalid URL: {liked_id}") + author_jid, item_node = await self.apg.getJIDAndNode(account) + if item_node is None: + item_node = self.apg._m.namespace + attachment_node = self.apg._pa.getAttachmentNodeName( + author_jid, item_node, item_id + ) + cached_node = await self.apg.host.memory.storage.getPubsubNode( + client, + author_jid, + attachment_node, + with_subscriptions=True, + create=True + ) + found_items, __ = await self.apg.host.memory.storage.getItems( + cached_node, item_ids=[item_id] + ) + if not found_items: + old_item_elt = None + else: + found_item = found_items[0] + old_item_elt = found_item.data + + item_elt = self.apg._pa.applySetHandler( + client, + {"extra": {"noticed": not undo}}, + old_item_elt, + [("noticed", self.apg._pa.namespace)] + ) + # we reparse the element, as there can be other attachments + attachments_data = self.apg._pa.items2attachmentData(client, [item_elt]) + # and we update the cache + await self.apg.host.memory.storage.cachePubsubItems( + client, + cached_node, + [item_elt], + attachments_data or [{}] + ) + + if self.apg.isVirtualJID(author_jid): + # the attachment is on a virtual pubsub service (linking to an AP item), + # we notify all subscribers + for subscription in cached_node.subscriptions: + if subscription.state != SubscriptionState.SUBSCRIBED: + continue + self.apg.pubsub_service.notifyPublish( + author_jid, + attachment_node, + [(subscription.subscriber, None, [item_elt])] + ) + else: + # the attachment is on an XMPP item, we publish it to the attachment node + await self.apg._p.sendItems( + 
client, author_jid, attachment_node, [item_elt] + ) + + async def handleLikeActivity( + self, + request: "HTTPRequest", + data: dict, + account_jid: Optional[jid.JID], + node: Optional[str], + ap_account: Optional[str], + ap_url: str, + signing_actor: str + ): + client = await self.apg.getVirtualClient(signing_actor) + await self.handleNewLikeItem(client, data) + async def APActorRequest( self, request: "HTTPRequest", @@ -623,7 +728,7 @@ ) followers = [] for subscriber in subscribers.keys(): - if subscriber.host == self.apg.client.jid.userhost(): + if self.apg.isVirtualJID(subscriber): # the subscriber is an AP user subscribed with this gateway ap_account = self.apg._e.unescape(subscriber.user) else: @@ -660,7 +765,7 @@ following = [] for sub_dict in subscriptions: service = jid.JID(sub_dict["service"]) - if service.host == self.apg.client.jid.userhost(): + if self.apg.isVirtualJID(service): # the subscription is to an AP actor with this gateway ap_account = self.apg._e.unescape(service.user) else: diff -r 37d2c0282304 -r c0bcbcf5b4b7 sat/plugins/plugin_pubsub_attachments.py --- a/sat/plugins/plugin_pubsub_attachments.py Thu Jul 21 18:05:20 2022 +0200 +++ b/sat/plugins/plugin_pubsub_attachments.py Thu Jul 21 18:07:35 2022 +0200 @@ -61,7 +61,7 @@ host.registerNamespace("pubsub-attachments", NS_PUBSUB_ATTACHMENTS) self.host = host self._p = host.plugins["XEP-0060"] - self.handlers = {} + self.handlers: Dict[Tuple[str, str], dict[str, Any]] = {} host.trigger.add("XEP-0277_send", self.onMBSend) self.registerAttachmentHandler( "noticed", NS_PUBSUB_ATTACHMENTS, self.noticedGet, self.noticedSet @@ -270,12 +270,73 @@ ) -> None: client = self.host.getClient(profile_key) attachments = data_format.deserialise(attachments_s) or {} - return defer.ensureDeferred(self.setAttachments( client, attachments)) + return defer.ensureDeferred(self.setAttachments(client, attachments)) + + def applySetHandler( + self, + client: SatXMPPEntity, + attachments_data: dict, + item_elt: 
Optional[domish.Element], + handlers: Optional[List[Tuple[str, str]]] = None, + from_jid: Optional[jid.JID] = None, + ) -> domish.Element: + """Apply all ``set`` callbacks to an attachments item + + @param attachments_data: data describing the attachments + ``extra`` key will be used, and created if not found + @param from_jid: jid of the author of the attachments + ``client.jid.userhostJID()`` will be used if not specified + @param item_elt: item containing an <attachments/> element + will be modified in place + if None, a new element will be created + @param handlers: list of (name, namespace) of handlers to use. + if None, all registered handlers will be used. + @return: updated item_elt if given, otherwise a new item_elt + """ + attachments_data.setdefault("extra", {}) + if item_elt is None: + item_id = client.jid.userhost() if from_jid is None else from_jid.userhost() + item_elt = pubsub.Item(item_id) + item_elt.addElement((NS_PUBSUB_ATTACHMENTS, "attachments")) + + try: + attachments_elt = next( + item_elt.elements(NS_PUBSUB_ATTACHMENTS, "attachments") + ) + except StopIteration: + log.warning( + f"no element found, creating a new one: {item_elt.toXml()}" + ) + attachments_elt = item_elt.addElement((NS_PUBSUB_ATTACHMENTS, "attachments")) + + if handlers is None: + handlers = list(self.handlers.keys()) + + for name, namespace in handlers: + try: + handler = self.handlers[(name, namespace)] + except KeyError: + log.error( + f"unregistered handler ({name!r}, {namespace!r}) is requested, " + "ignoring" + ) + continue + try: + former_elt = next(attachments_elt.elements(namespace, name)) + except StopIteration: + former_elt = None + new_elt = handler["set"](client, attachments_data, former_elt) + if new_elt != former_elt: + if former_elt is not None: + attachments_elt.children.remove(former_elt) + if new_elt is not None: + attachments_elt.addChild(new_elt) + return item_elt async def setAttachments( self, client: SatXMPPEntity, - data: Dict[str, Any] + attachments_data: Dict[str, 
Any] ) -> None: """Set or update attachments @@ -287,51 +348,39 @@ used in attachments where "update" makes sense (e.g. it's used for "reactions" but not for "noticed"). - @param data: microblog data data. Various keys (usually stored in - data["extra"]) may be used depending on the attachments handlers - registered. The keys "service", "node" and "id" MUST be set. + @param attachments_data: data describing attachments. Various keys (usually stored + in attachments_data["extra"]) may be used depending on the attachments + handlers registered. The keys "service", "node" and "id" MUST be set. + ``attachments_data`` is thought to be compatible with microblog data. + """ - data.setdefault("extra", {}) try: - service = jid.JID(data["service"]) - node = data["node"] - item = data["id"] + service = jid.JID(attachments_data["service"]) + node = attachments_data["node"] + item = attachments_data["id"] except (KeyError, RuntimeError): raise ValueError( 'data must have "service", "node" and "id" set' ) attachment_node = self.getAttachmentNodeName(service, node, item) - items, __ = await self._p.getItems( - client, service, attachment_node, item_ids=[client.jid.userhost()] - ) - if not items: - # the item doesn't exist, we create a new one - item_elt = pubsub.Item(client.jid.userhost()) - item_elt.addElement((NS_PUBSUB_ATTACHMENTS, "attachments")) - else: - item_elt = items[0] - try: - attachments_elt = next( - item_elt.elements(NS_PUBSUB_ATTACHMENTS, "attachments") + items, __ = await self._p.getItems( + client, service, attachment_node, item_ids=[client.jid.userhost()] ) - except StopIteration: - log.warning( - f"no element found, creating a new one: {item_elt.toXml()}" - ) - attachments_elt = item_elt.addElement((NS_PUBSUB_ATTACHMENTS, "attachments")) + except exceptions.NotFound: + item_elt = None + else: + if not items: + item_elt = None + else: + item_elt = items[0] - for (name, namespace), handler in self.handlers.items(): - try: - former_elt = 
next(attachments_elt.elements(namespace, name)) - except StopIteration: - former_elt = None - new_elt = handler["set"](client, data, former_elt) - if new_elt != former_elt: - if former_elt is not None: - attachments_elt.children.remove(former_elt) - if new_elt is not None: - attachments_elt.addChild(new_elt) + item_elt = self.applySetHandler( + client, + attachments_data, + item_elt=item_elt, + ) + try: await self._p.sendItems(client, service, attachment_node, [item_elt]) except error.StanzaError as e: