Mercurial > libervia-backend
changeset 3844:65e5718e7710
component AP gateway: `Announce` activity implementation:
`Announce` and `Undo` of `Announce` are now implemented and converted to suitable XEP-0277
"repeat" items, or retract.
rel 370
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 14 Jul 2022 12:55:30 +0200 |
parents | 17c757bd74bc |
children | 4f9d4650eab5 |
files | sat/plugins/plugin_comp_ap_gateway/__init__.py sat/plugins/plugin_comp_ap_gateway/constants.py sat/plugins/plugin_comp_ap_gateway/http_server.py |
diffstat | 3 files changed, 109 insertions(+), 35 deletions(-) [+] |
line wrap: on
line diff
--- a/sat/plugins/plugin_comp_ap_gateway/__init__.py Thu Jul 14 12:55:30 2022 +0200 +++ b/sat/plugins/plugin_comp_ap_gateway/__init__.py Thu Jul 14 12:55:30 2022 +0200 @@ -1267,7 +1267,10 @@ item_elt = await self._m.data2entry( self.client, mb_data, mb_data["id"], None, self._m.namespace ) - item_elt["publisher"] = mb_data["author_jid"] + if "repeated" in mb_data["extra"]: + item_elt["publisher"] = mb_data["extra"]["repeated"]["by"] + else: + item_elt["publisher"] = mb_data["author_jid"] return mb_data, item_elt async def apItem2Elt(self, ap_item: dict) -> domish.Element: @@ -1360,6 +1363,8 @@ async def apItem2MBdata(self, ap_item: dict) -> dict: """Convert AP activity or object to microblog data + @param ap_item: ActivityPub item to convert + Can be either an activity of an object @return: AP Item's Object and microblog data @raise exceptions.DataError: something is invalid in the AP item @raise NotImplemented: some AP data is not handled yet @@ -1377,7 +1382,7 @@ if not item_id: log.warning(f'No "id" found in AP item: {ap_object!r}') raise exceptions.DataError - mb_data = {"id": item_id} + mb_data = {"id": item_id, "extra": {}} # content try: @@ -1428,6 +1433,10 @@ except dateutil.parser.ParserError as e: log.warning(f"Can't parse {field!r} field: {e}") + # repeat + if "_repeated" in ap_item: + mb_data["extra"]["repeated"] = ap_item["_repeated"] + # comments in_reply_to = ap_object.get("inReplyTo") __, comments_node = await self.getCommentsNodes(item_id, in_reply_to) @@ -2194,6 +2203,7 @@ @param node: XMPP pubsub node @param activity: parent AP activity @param item: AP object payload + only the "id" field is used """ item_id = item.get("id") if not item_id:
--- a/sat/plugins/plugin_comp_ap_gateway/constants.py Thu Jul 14 12:55:30 2022 +0200 +++ b/sat/plugins/plugin_comp_ap_gateway/constants.py Thu Jul 14 12:55:30 2022 +0200 @@ -67,7 +67,7 @@ ACTIVITY_TARGET_MANDATORY = ("Add", "Remove") # activities which can be used with Shared Inbox (i.e. with no account specified) # must be lowercase -ACTIVIY_NO_ACCOUNT_ALLOWED = ("create", "delete") +ACTIVIY_NO_ACCOUNT_ALLOWED = ("create", "delete", "announce", "undo") # maximum number of parents to retrieve when comments_max_depth option is set COMMENTS_MAX_PARENTS = 100 # maximum size of avatar, in bytes
--- a/sat/plugins/plugin_comp_ap_gateway/http_server.py Thu Jul 14 12:55:30 2022 +0200 +++ b/sat/plugins/plugin_comp_ap_gateway/http_server.py Thu Jul 14 12:55:30 2022 +0200 @@ -35,7 +35,7 @@ from sat.core.constants import Const as C from sat.core.i18n import _ from sat.core.log import getLogger -from sat.tools.common import date_utils +from sat.tools.common import date_utils, uri from sat.memory.sqla_mapping import SubscriptionState from .constants import ( @@ -129,22 +129,26 @@ if actor != signing_actor: log.warning(f"ignoring object not attributed to signing actor: {data}") continue - try: - target_account = obj["object"] - except KeyError: - log.warning(f'ignoring invalid object, missing "object" key: {data}') - continue - if not self.apg.isLocalURL(target_account): - log.warning(f"ignoring unfollow request to non local actor: {data}") - continue if type_ == "Follow": + try: + target_account = obj["object"] + except KeyError: + log.warning(f'ignoring invalid object, missing "object" key: {data}') + continue + if not self.apg.isLocalURL(target_account): + log.warning(f"ignoring unfollow request to non local actor: {data}") + continue await self.apg._p.unsubscribe( client, account_jid, node, sender=client.jid, ) + elif type_ == "Announce": + # we can use directly the Announce object, as only the "id" field is + # needed + await self.apg.newAPDeleteItem(client, None, node, obj) else: log.warning(f"Unmanaged undo type: {type_!r}") @@ -243,11 +247,6 @@ ap_url: str, signing_actor: str ): - digest = request.getHeader("digest") - if digest in self._seen_digest: - log.debug(f"Ignoring duplicated request (digest: {digest!r})") - return - self._seen_digest.append(digest) if node is None: node = self.apg._m.namespace client = await self.apg.getVirtualClient(signing_actor) @@ -255,6 +254,59 @@ for obj in objects: await self.apg.newAPDeleteItem(client, account_jid, node, obj) + async def handleNewAPItems( + self, + request: "HTTPRequest", + data: dict, + account_jid: Optional[jid.JID], + node: Optional[str], + signing_actor: str, + repeated: bool = False, + ): + """Helper method to handle workflow for new AP items + + accept globally the same parameter as for handleCreateActivity + @param repeated: if True, the item is an item republished from somewhere else + """ + if "_repeated" in data: + log.error( + '"_repeated" field already present in given AP item, this should not ' + f"happen. Ignoring object from {signing_actor}\n{data}" + ) + raise exceptions.DataError("unexpected field in item") + if node is None: + node = self.apg._m.namespace + client = await self.apg.getVirtualClient(signing_actor) + objects = await self.apg.apGetList(data, "object") + for obj in objects: + sender = await self.apg.apGetSenderActor(obj) + if repeated: + # we don't check sender when item is repeated, as it should be different + # from post author in this case + sender_jid = await self.apg.getJIDFromId(sender) + repeater_jid = await self.apg.getJIDFromId(signing_actor) + + obj["_repeated"] = { + "by": repeater_jid.full(), + "at": data.get("published"), + "uri": uri.buildXMPPUri( + "pubsub", + path=sender_jid.full(), + node=self.apg._m.namespace, + item=obj["id"] + ) + } + # we must use activity's id and targets, not the original item ones + for field in ("id", "to", "bto", "cc", "bcc"): + obj[field] = data.get(field) + else: + if sender != signing_actor: + log.warning( + "Ignoring object not attributed to signing actor: {obj}" + ) + continue + await self.apg.newAPItem(client, account_jid, node, obj) + async def handleCreateActivity( self, request: "HTTPRequest", @@ -265,23 +317,27 @@ ap_url: str, signing_actor: str ): - digest = request.getHeader("digest") - if digest in self._seen_digest: - log.debug(f"Ignoring duplicated request (digest: {digest!r})") - return - self._seen_digest.append(digest) - if node is None: - node = self.apg._m.namespace - client = await self.apg.getVirtualClient(signing_actor) - objects = await self.apg.apGetList(data, "object") - for obj in objects: - sender = await self.apg.apGetSenderActor(obj) - if sender != signing_actor: - log.warning( - "Ignoring object not attributed to signing actor: {obj}" - ) - else: - await self.apg.newAPItem(client, account_jid, node, obj) + await self.handleNewAPItems(request, data, account_jid, node, signing_actor) + + async def handleAnnounceActivity( + self, + request: "HTTPRequest", + data: dict, + account_jid: Optional[jid.JID], + node: Optional[str], + ap_account: Optional[str], + ap_url: str, + signing_actor: str + ): + # we create a new item + await self.handleNewAPItems( + request, + data, + account_jid, + node, + signing_actor, + repeated=True + ) async def APActorRequest( self, @@ -680,8 +736,16 @@ request.finish() return + request.setResponseCode(http.ACCEPTED) + + digest = request.getHeader("digest") + if digest in self._seen_digest: + log.debug(f"Ignoring duplicated request (digest: {digest!r})") + request.finish() + return + self._seen_digest.append(digest) + # default response code, may be changed, e.g. in case of exception - request.setResponseCode(http.ACCEPTED) try: return await self.APRequest(request, signing_actor) except Exception as e: