Mercurial > libervia-backend
diff sat/plugins/plugin_comp_ap_gateway/http_server.py @ 3844:65e5718e7710
component AP gateway: `Announce` activity implementation:
`Announce` and `Undo` of `Announce` are now implemented and converted to suitable XEP-0277
"repeat" items, or retract.
rel 370
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 14 Jul 2022 12:55:30 +0200 |
parents | 381340b9a9ee |
children | cc13efdd8360 |
line wrap: on
line diff
--- a/sat/plugins/plugin_comp_ap_gateway/http_server.py Thu Jul 14 12:55:30 2022 +0200 +++ b/sat/plugins/plugin_comp_ap_gateway/http_server.py Thu Jul 14 12:55:30 2022 +0200 @@ -35,7 +35,7 @@ from sat.core.constants import Const as C from sat.core.i18n import _ from sat.core.log import getLogger -from sat.tools.common import date_utils +from sat.tools.common import date_utils, uri from sat.memory.sqla_mapping import SubscriptionState from .constants import ( @@ -129,22 +129,26 @@ if actor != signing_actor: log.warning(f"ignoring object not attributed to signing actor: {data}") continue - try: - target_account = obj["object"] - except KeyError: - log.warning(f'ignoring invalid object, missing "object" key: {data}') - continue - if not self.apg.isLocalURL(target_account): - log.warning(f"ignoring unfollow request to non local actor: {data}") - continue if type_ == "Follow": + try: + target_account = obj["object"] + except KeyError: + log.warning(f'ignoring invalid object, missing "object" key: {data}') + continue + if not self.apg.isLocalURL(target_account): + log.warning(f"ignoring unfollow request to non local actor: {data}") + continue await self.apg._p.unsubscribe( client, account_jid, node, sender=client.jid, ) + elif type_ == "Announce": + # we can use directly the Announce object, as only the "id" field is + # needed + await self.apg.newAPDeleteItem(client, None, node, obj) else: log.warning(f"Unmanaged undo type: {type_!r}") @@ -243,11 +247,6 @@ ap_url: str, signing_actor: str ): - digest = request.getHeader("digest") - if digest in self._seen_digest: - log.debug(f"Ignoring duplicated request (digest: {digest!r})") - return - self._seen_digest.append(digest) if node is None: node = self.apg._m.namespace client = await self.apg.getVirtualClient(signing_actor) @@ -255,6 +254,59 @@ for obj in objects: await self.apg.newAPDeleteItem(client, account_jid, node, obj) + async def handleNewAPItems( + self, + request: "HTTPRequest", + data: dict, + account_jid: Optional[jid.JID], + node: Optional[str], + signing_actor: str, + repeated: bool = False, + ): + """Helper method to handle workflow for new AP items + + accept globally the same parameter as for handleCreateActivity + @param repeated: if True, the item is an item republished from somewhere else + """ + if "_repeated" in data: + log.error( + '"_repeated" field already present in given AP item, this should not ' + f"happen. Ignoring object from {signing_actor}\n{data}" + ) + raise exceptions.DataError("unexpected field in item") + if node is None: + node = self.apg._m.namespace + client = await self.apg.getVirtualClient(signing_actor) + objects = await self.apg.apGetList(data, "object") + for obj in objects: + sender = await self.apg.apGetSenderActor(obj) + if repeated: + # we don't check sender when item is repeated, as it should be different + # from post author in this case + sender_jid = await self.apg.getJIDFromId(sender) + repeater_jid = await self.apg.getJIDFromId(signing_actor) + + obj["_repeated"] = { + "by": repeater_jid.full(), + "at": data.get("published"), + "uri": uri.buildXMPPUri( + "pubsub", + path=sender_jid.full(), + node=self.apg._m.namespace, + item=obj["id"] + ) + } + # we must use activity's id and targets, not the original item ones + for field in ("id", "to", "bto", "cc", "bcc"): + obj[field] = data.get(field) + else: + if sender != signing_actor: + log.warning( + "Ignoring object not attributed to signing actor: {obj}" + ) + continue + await self.apg.newAPItem(client, account_jid, node, obj) + async def handleCreateActivity( self, request: "HTTPRequest", @@ -265,23 +317,27 @@ ap_url: str, signing_actor: str ): - digest = request.getHeader("digest") - if digest in self._seen_digest: - log.debug(f"Ignoring duplicated request (digest: {digest!r})") - return - self._seen_digest.append(digest) - if node is None: - node = self.apg._m.namespace - client = await self.apg.getVirtualClient(signing_actor) - objects = await self.apg.apGetList(data, "object") - for obj in objects: - sender = await self.apg.apGetSenderActor(obj) - if sender != signing_actor: - log.warning( - "Ignoring object not attributed to signing actor: {obj}" - ) - else: - await self.apg.newAPItem(client, account_jid, node, obj) + await self.handleNewAPItems(request, data, account_jid, node, signing_actor) + + async def handleAnnounceActivity( + self, + request: "HTTPRequest", + data: dict, + account_jid: Optional[jid.JID], + node: Optional[str], + ap_account: Optional[str], + ap_url: str, + signing_actor: str + ): + # we create a new item + await self.handleNewAPItems( + request, + data, + account_jid, + node, + signing_actor, + repeated=True + ) async def APActorRequest( self, @@ -680,8 +736,16 @@ request.finish() return + request.setResponseCode(http.ACCEPTED) + + digest = request.getHeader("digest") + if digest in self._seen_digest: + log.debug(f"Ignoring duplicated request (digest: {digest!r})") + request.finish() + return + self._seen_digest.append(digest) + # default response code, may be changed, e.g. in case of exception - request.setResponseCode(http.ACCEPTED) try: return await self.APRequest(request, signing_actor) except Exception as e: