Mercurial > libervia-backend
changeset 3803:d5f343939239
component AP gateway: message retractation => AP deletion
Convert XEP-0424 message retractation request to suitable AP delete activity.
The message workflow and its triggers are now used instead of a direct observer, as it is
now possible to do so with component, and this let other plugin to parse and eventually
update metadata.
rel 367
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 17 Jun 2022 14:15:23 +0200 |
parents | 983df907d456 |
children | 36b167ddbfca |
files | sat/plugins/plugin_comp_ap_gateway/__init__.py |
diffstat | 1 files changed, 122 insertions(+), 36 deletions(-) [+] |
line wrap: on
line diff
--- a/sat/plugins/plugin_comp_ap_gateway/__init__.py Fri Jun 17 14:15:23 2022 +0200 +++ b/sat/plugins/plugin_comp_ap_gateway/__init__.py Fri Jun 17 14:15:23 2022 +0200 @@ -87,7 +87,8 @@ C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, C.PI_PROTOCOLS: [], C.PI_DEPENDENCIES: [ - "XEP-0106", "XEP-0277", "XEP-0060", "XEP-0465", "PUBSUB_CACHE", "TEXT_SYNTAXES" + "XEP-0060", "XEP-0106", "XEP-0277", "XEP-0329", "XEP-0424", "XEP-0465", + "PUBSUB_CACHE", "TEXT_SYNTAXES" ], C.PI_RECOMMENDATIONS: [], C.PI_MAIN: "APGateway", @@ -112,6 +113,7 @@ self._m = host.plugins["XEP-0277"] self._p = host.plugins["XEP-0060"] self._e = host.plugins["XEP-0106"] + self._r = host.plugins["XEP-0424"] self._pps = host.plugins["XEP-0465"] self._c = host.plugins["PUBSUB_CACHE"] self._p.addManagedNode( @@ -119,6 +121,8 @@ ) self._t = host.plugins["TEXT_SYNTAXES"] self.pubsub_service = APPubsubService(self) + host.trigger.add("messageReceived", self._messageReceivedTrigger, priority=-1000) + host.trigger.add("XEP-0424_retractReceived", self._onMessageRetract) host.bridge.addMethod( "APSend", @@ -227,9 +231,6 @@ self.client = client await self.init(client) - def profileConnected(self, client): - client.xmlstream.addObserver("/message/body", self.onMessage) - async def _itemsReceived(self, client, itemsEvent): """Callback called when pubsub items are received @@ -872,19 +873,9 @@ ap_item = await self.mbdata2APitem(client, mb_data) url_actor = ap_item["object"]["attributedTo"] elif item.name == "retract": - author_account = await self.getAPAccountFromJidAndNode(client.jid, node) - author_actor_id = self.buildAPURL(TYPE_ACTOR, author_account) - url_item = self.buildAPURL(TYPE_ITEM, author_account, item["id"]) - ap_item = self.createActivity( - "Delete", - author_actor_id, - { - "id": url_item, - "type": TYPE_TOMBSTONE - } + url_actor, ap_item = await self.apDeleteItem( + client.jid, node, item["id"] ) - ap_item["to"] = [NS_AP_PUBLIC] - url_actor = author_actor_id else: raise exceptions.InternalError(f"unexpected element: {item.toXml()}") resp = await self.signAndPost(inbox, url_actor, ap_item) @@ -925,7 +916,7 @@ ) if resp.code >= 400: text = await resp.text() - log.warning(f"POST request to {url} failed: {text}") + log.warning(f"POST request to {url} failed [{resp.code}]: {text}") return resp def _publishMessage(self, mess_data_s: str, service_s: str, profile: str): @@ -1553,38 +1544,133 @@ if resp.code != 202: raise exceptions.NetworkError(f"unexpected return code: {resp.code}") - @utils.ensure_deferred - async def onMessage(self, message_elt: domish.Element) -> None: - """Called when a XMPP message is received""" + async def apDeleteItem( + self, + jid_: jid.JID, + node: Optional[str], + item_id: str, + public: bool = True + ) -> Tuple[str, Dict[str, Any]]: + """Build activity to delete an AP item + + @param jid_: JID of the entity deleting an item + @param node: node where the item is deleted + None if it's microblog or a message + @param item_id: ID of the item to delete + it's the Pubsub ID or message's origin ID + @param public: if True, the activity will be addressed to public namespace + @return: actor_id of the entity deleting the item, activity to send + """ + author_account = await self.getAPAccountFromJidAndNode(jid_, node) + author_actor_id = self.buildAPURL(TYPE_ACTOR, author_account) + url_item = self.buildAPURL(TYPE_ITEM, author_account, item_id) + ap_item = self.createActivity( + "Delete", + author_actor_id, + { + "id": url_item, + "type": TYPE_TOMBSTONE + } + ) + if public: + ap_item["to"] = [NS_AP_PUBLIC] + url_actor = author_actor_id + return url_actor, ap_item + + def _messageReceivedTrigger( + self, + client: SatXMPPEntity, + message_elt: domish.Element, + post_treat: defer.Deferred + ) -> bool: + """add the gateway workflow on post treatment""" if not self.client: log.warning(f"no client set, ignoring message: {message_elt.toXml()}") - return - mess_type = message_elt.getAttribute("type") - if mess_type and mess_type not in ("chat", "normal"): - log.warning(f"ignoring message with unexpected type: {message_elt.toXml()}") - return - from_jid = jid.JID(message_elt["from"]) - if not self.isLocal(from_jid): - log.warning(f"ignoring non local message: {message_elt.toXml()}") - return - to_jid = jid.JID(message_elt["to"]) - if not to_jid.user: + return True + post_treat.addCallback( + lambda mess_data: defer.ensureDeferred(self.onMessage(client, mess_data)) + ) + return True + + async def onMessage(self, client: SatXMPPEntity, mess_data: dict) -> dict: + """Called once message has been parsed + + this method handle the conversion to AP items and posting + """ + if client != self.client: + return mess_data + if mess_data["type"] not in ("chat", "normal"): + log.warning(f"ignoring message with unexpected type: {mess_data['xml'].toXml()}") + return mess_data + if not self.isLocal(mess_data["from"]): + log.warning(f"ignoring non local message: {mess_data['xml'].toXml()}") + return mess_data + if not mess_data["to"].user: log.warning( - f"ignoring message addressed to gateway itself: {message_elt.toXml()}" + f"ignoring message addressed to gateway itself: {mess_data['xml'].toXml()}" ) - return + return mess_data - actor_account = self._e.unescape(to_jid.user) + actor_account = self._e.unescape(mess_data["to"].user) actor_id = await self.getAPActorIdFromAccount(actor_account) inbox = await self.getAPInboxFromId(actor_id) + try: + language, message = next(iter(mess_data["message"].items())) + except (KeyError, StopIteration): + log.warning(f"ignoring empty message: {mess_data}") + return mess_data + mb_data = { - "content": str(message_elt.body), + "content": message, } - client = self.client.getVirtualClient(from_jid) + if language: + mb_data["language"] = language + origin_id = mess_data["extra"].get("origin_id") + if origin_id: + # we need to use origin ID when present to be able to retract the message + mb_data["id"] = origin_id + client = self.client.getVirtualClient(mess_data["from"]) ap_item = await self.mbdata2APitem(client, mb_data, public=False) ap_item["object"]["to"] = ap_item["to"] = [actor_id] await self.signAndPost(inbox, ap_item["actor"], ap_item) + return mess_data + + async def _onMessageRetract( + self, + client: SatXMPPEntity, + message_elt: domish.Element, + retract_elt: domish.Element, + fastened_elts + ) -> bool: + if client != self.client: + return True + from_jid = jid.JID(message_elt["from"]) + if not self.isLocal(from_jid): + log.debug( + f"ignoring retract request from non local jid {from_jid}" + ) + return False + to_jid = jid.JID(message_elt["to"]) + if (to_jid.host != self.client.jid.full() or not to_jid.user): + # to_jid should be a virtual JID from this gateway + raise exceptions.InternalError( + f"Invalid destinee's JID: {to_jid.full()}" + ) + ap_account = self._e.unescape(to_jid.user) + actor_id = await self.getAPActorIdFromAccount(ap_account) + inbox = await self.getAPInboxFromId(actor_id) + url_actor, ap_item = await self.apDeleteItem( + from_jid.userhostJID(), None, fastened_elts.id, public=False + ) + resp = await self.signAndPost(inbox, url_actor, ap_item) + if resp.code >= 300: + text = await resp.text() + log.warning( + f"unexpected return code while sending AP item: {resp.code}\n{text}\n" + f"{pformat(ap_item)}" + ) + return False async def newReplyToXMPPItem( self,