Mercurial > libervia-backend
comparison sat/plugins/plugin_comp_ap_gateway/__init__.py @ 3803:d5f343939239
component AP gateway: message retractation => AP deletion
Convert XEP-0424 message retractation request to suitable AP delete activity.
The message workflow and its triggers are now used instead of a direct observer, as it is
now possible to do so with component, and this let other plugin to parse and eventually
update metadata.
rel 367
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 17 Jun 2022 14:15:23 +0200 |
parents | b5c9021020df |
children | 36b167ddbfca |
comparison
equal
deleted
inserted
replaced
3802:983df907d456 | 3803:d5f343939239 |
---|---|
85 C.PI_IMPORT_NAME: IMPORT_NAME, | 85 C.PI_IMPORT_NAME: IMPORT_NAME, |
86 C.PI_MODES: [C.PLUG_MODE_COMPONENT], | 86 C.PI_MODES: [C.PLUG_MODE_COMPONENT], |
87 C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, | 87 C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, |
88 C.PI_PROTOCOLS: [], | 88 C.PI_PROTOCOLS: [], |
89 C.PI_DEPENDENCIES: [ | 89 C.PI_DEPENDENCIES: [ |
90 "XEP-0106", "XEP-0277", "XEP-0060", "XEP-0465", "PUBSUB_CACHE", "TEXT_SYNTAXES" | 90 "XEP-0060", "XEP-0106", "XEP-0277", "XEP-0329", "XEP-0424", "XEP-0465", |
91 "PUBSUB_CACHE", "TEXT_SYNTAXES" | |
91 ], | 92 ], |
92 C.PI_RECOMMENDATIONS: [], | 93 C.PI_RECOMMENDATIONS: [], |
93 C.PI_MAIN: "APGateway", | 94 C.PI_MAIN: "APGateway", |
94 C.PI_HANDLER: C.BOOL_TRUE, | 95 C.PI_HANDLER: C.BOOL_TRUE, |
95 C.PI_DESCRIPTION: _( | 96 C.PI_DESCRIPTION: _( |
110 self.initialised = False | 111 self.initialised = False |
111 self.client = None | 112 self.client = None |
112 self._m = host.plugins["XEP-0277"] | 113 self._m = host.plugins["XEP-0277"] |
113 self._p = host.plugins["XEP-0060"] | 114 self._p = host.plugins["XEP-0060"] |
114 self._e = host.plugins["XEP-0106"] | 115 self._e = host.plugins["XEP-0106"] |
116 self._r = host.plugins["XEP-0424"] | |
115 self._pps = host.plugins["XEP-0465"] | 117 self._pps = host.plugins["XEP-0465"] |
116 self._c = host.plugins["PUBSUB_CACHE"] | 118 self._c = host.plugins["PUBSUB_CACHE"] |
117 self._p.addManagedNode( | 119 self._p.addManagedNode( |
118 "", items_cb=self._itemsReceived | 120 "", items_cb=self._itemsReceived |
119 ) | 121 ) |
120 self._t = host.plugins["TEXT_SYNTAXES"] | 122 self._t = host.plugins["TEXT_SYNTAXES"] |
121 self.pubsub_service = APPubsubService(self) | 123 self.pubsub_service = APPubsubService(self) |
124 host.trigger.add("messageReceived", self._messageReceivedTrigger, priority=-1000) | |
125 host.trigger.add("XEP-0424_retractReceived", self._onMessageRetract) | |
122 | 126 |
123 host.bridge.addMethod( | 127 host.bridge.addMethod( |
124 "APSend", | 128 "APSend", |
125 ".plugin", | 129 ".plugin", |
126 in_sign="sss", | 130 in_sign="sss", |
224 reactor.listenSSL(self.http_port, self.server, context_factory) | 228 reactor.listenSSL(self.http_port, self.server, context_factory) |
225 | 229 |
226 async def profileConnecting(self, client): | 230 async def profileConnecting(self, client): |
227 self.client = client | 231 self.client = client |
228 await self.init(client) | 232 await self.init(client) |
229 | |
230 def profileConnected(self, client): | |
231 client.xmlstream.addObserver("/message/body", self.onMessage) | |
232 | 233 |
233 async def _itemsReceived(self, client, itemsEvent): | 234 async def _itemsReceived(self, client, itemsEvent): |
234 """Callback called when pubsub items are received | 235 """Callback called when pubsub items are received |
235 | 236 |
236 if the items are adressed to a JID corresponding to an AP actor, they are | 237 if the items are adressed to a JID corresponding to an AP actor, they are |
870 comment_node = comment_data["node"] | 871 comment_node = comment_data["node"] |
871 await self._p.subscribe(client, comment_service, comment_node) | 872 await self._p.subscribe(client, comment_service, comment_node) |
872 ap_item = await self.mbdata2APitem(client, mb_data) | 873 ap_item = await self.mbdata2APitem(client, mb_data) |
873 url_actor = ap_item["object"]["attributedTo"] | 874 url_actor = ap_item["object"]["attributedTo"] |
874 elif item.name == "retract": | 875 elif item.name == "retract": |
875 author_account = await self.getAPAccountFromJidAndNode(client.jid, node) | 876 url_actor, ap_item = await self.apDeleteItem( |
876 author_actor_id = self.buildAPURL(TYPE_ACTOR, author_account) | 877 client.jid, node, item["id"] |
877 url_item = self.buildAPURL(TYPE_ITEM, author_account, item["id"]) | |
878 ap_item = self.createActivity( | |
879 "Delete", | |
880 author_actor_id, | |
881 { | |
882 "id": url_item, | |
883 "type": TYPE_TOMBSTONE | |
884 } | |
885 ) | 878 ) |
886 ap_item["to"] = [NS_AP_PUBLIC] | |
887 url_actor = author_actor_id | |
888 else: | 879 else: |
889 raise exceptions.InternalError(f"unexpected element: {item.toXml()}") | 880 raise exceptions.InternalError(f"unexpected element: {item.toXml()}") |
890 resp = await self.signAndPost(inbox, url_actor, ap_item) | 881 resp = await self.signAndPost(inbox, url_actor, ap_item) |
891 if resp.code >= 300: | 882 if resp.code >= 300: |
892 text = await resp.text() | 883 text = await resp.text() |
923 body, | 914 body, |
924 headers=headers, | 915 headers=headers, |
925 ) | 916 ) |
926 if resp.code >= 400: | 917 if resp.code >= 400: |
927 text = await resp.text() | 918 text = await resp.text() |
928 log.warning(f"POST request to {url} failed: {text}") | 919 log.warning(f"POST request to {url} failed [{resp.code}]: {text}") |
929 return resp | 920 return resp |
930 | 921 |
931 def _publishMessage(self, mess_data_s: str, service_s: str, profile: str): | 922 def _publishMessage(self, mess_data_s: str, service_s: str, profile: str): |
932 mess_data: dict = data_format.deserialise(mess_data_s) # type: ignore | 923 mess_data: dict = data_format.deserialise(mess_data_s) # type: ignore |
933 service = jid.JID(service_s) | 924 service = jid.JID(service_s) |
1551 url_actor = item_data["object"]["attributedTo"] | 1542 url_actor = item_data["object"]["attributedTo"] |
1552 resp = await self.signAndPost(inbox_url, url_actor, item_data) | 1543 resp = await self.signAndPost(inbox_url, url_actor, item_data) |
1553 if resp.code != 202: | 1544 if resp.code != 202: |
1554 raise exceptions.NetworkError(f"unexpected return code: {resp.code}") | 1545 raise exceptions.NetworkError(f"unexpected return code: {resp.code}") |
1555 | 1546 |
1556 @utils.ensure_deferred | 1547 async def apDeleteItem( |
1557 async def onMessage(self, message_elt: domish.Element) -> None: | 1548 self, |
1558 """Called when a XMPP message is received""" | 1549 jid_: jid.JID, |
1550 node: Optional[str], | |
1551 item_id: str, | |
1552 public: bool = True | |
1553 ) -> Tuple[str, Dict[str, Any]]: | |
1554 """Build activity to delete an AP item | |
1555 | |
1556 @param jid_: JID of the entity deleting an item | |
1557 @param node: node where the item is deleted | |
1558 None if it's microblog or a message | |
1559 @param item_id: ID of the item to delete | |
1560 it's the Pubsub ID or message's origin ID | |
1561 @param public: if True, the activity will be addressed to public namespace | |
1562 @return: actor_id of the entity deleting the item, activity to send | |
1563 """ | |
1564 author_account = await self.getAPAccountFromJidAndNode(jid_, node) | |
1565 author_actor_id = self.buildAPURL(TYPE_ACTOR, author_account) | |
1566 url_item = self.buildAPURL(TYPE_ITEM, author_account, item_id) | |
1567 ap_item = self.createActivity( | |
1568 "Delete", | |
1569 author_actor_id, | |
1570 { | |
1571 "id": url_item, | |
1572 "type": TYPE_TOMBSTONE | |
1573 } | |
1574 ) | |
1575 if public: | |
1576 ap_item["to"] = [NS_AP_PUBLIC] | |
1577 url_actor = author_actor_id | |
1578 return url_actor, ap_item | |
1579 | |
1580 def _messageReceivedTrigger( | |
1581 self, | |
1582 client: SatXMPPEntity, | |
1583 message_elt: domish.Element, | |
1584 post_treat: defer.Deferred | |
1585 ) -> bool: | |
1586 """add the gateway workflow on post treatment""" | |
1559 if not self.client: | 1587 if not self.client: |
1560 log.warning(f"no client set, ignoring message: {message_elt.toXml()}") | 1588 log.warning(f"no client set, ignoring message: {message_elt.toXml()}") |
1561 return | 1589 return True |
1562 mess_type = message_elt.getAttribute("type") | 1590 post_treat.addCallback( |
1563 if mess_type and mess_type not in ("chat", "normal"): | 1591 lambda mess_data: defer.ensureDeferred(self.onMessage(client, mess_data)) |
1564 log.warning(f"ignoring message with unexpected type: {message_elt.toXml()}") | 1592 ) |
1565 return | 1593 return True |
1566 from_jid = jid.JID(message_elt["from"]) | 1594 |
1567 if not self.isLocal(from_jid): | 1595 async def onMessage(self, client: SatXMPPEntity, mess_data: dict) -> dict: |
1568 log.warning(f"ignoring non local message: {message_elt.toXml()}") | 1596 """Called once message has been parsed |
1569 return | 1597 |
1570 to_jid = jid.JID(message_elt["to"]) | 1598 this method handle the conversion to AP items and posting |
1571 if not to_jid.user: | 1599 """ |
1600 if client != self.client: | |
1601 return mess_data | |
1602 if mess_data["type"] not in ("chat", "normal"): | |
1603 log.warning(f"ignoring message with unexpected type: {mess_data['xml'].toXml()}") | |
1604 return mess_data | |
1605 if not self.isLocal(mess_data["from"]): | |
1606 log.warning(f"ignoring non local message: {mess_data['xml'].toXml()}") | |
1607 return mess_data | |
1608 if not mess_data["to"].user: | |
1572 log.warning( | 1609 log.warning( |
1573 f"ignoring message addressed to gateway itself: {message_elt.toXml()}" | 1610 f"ignoring message addressed to gateway itself: {mess_data['xml'].toXml()}" |
1574 ) | 1611 ) |
1575 return | 1612 return mess_data |
1576 | 1613 |
1577 actor_account = self._e.unescape(to_jid.user) | 1614 actor_account = self._e.unescape(mess_data["to"].user) |
1578 actor_id = await self.getAPActorIdFromAccount(actor_account) | 1615 actor_id = await self.getAPActorIdFromAccount(actor_account) |
1579 inbox = await self.getAPInboxFromId(actor_id) | 1616 inbox = await self.getAPInboxFromId(actor_id) |
1580 | 1617 |
1618 try: | |
1619 language, message = next(iter(mess_data["message"].items())) | |
1620 except (KeyError, StopIteration): | |
1621 log.warning(f"ignoring empty message: {mess_data}") | |
1622 return mess_data | |
1623 | |
1581 mb_data = { | 1624 mb_data = { |
1582 "content": str(message_elt.body), | 1625 "content": message, |
1583 } | 1626 } |
1584 client = self.client.getVirtualClient(from_jid) | 1627 if language: |
1628 mb_data["language"] = language | |
1629 origin_id = mess_data["extra"].get("origin_id") | |
1630 if origin_id: | |
1631 # we need to use origin ID when present to be able to retract the message | |
1632 mb_data["id"] = origin_id | |
1633 client = self.client.getVirtualClient(mess_data["from"]) | |
1585 ap_item = await self.mbdata2APitem(client, mb_data, public=False) | 1634 ap_item = await self.mbdata2APitem(client, mb_data, public=False) |
1586 ap_item["object"]["to"] = ap_item["to"] = [actor_id] | 1635 ap_item["object"]["to"] = ap_item["to"] = [actor_id] |
1587 await self.signAndPost(inbox, ap_item["actor"], ap_item) | 1636 await self.signAndPost(inbox, ap_item["actor"], ap_item) |
1637 return mess_data | |
1638 | |
1639 async def _onMessageRetract( | |
1640 self, | |
1641 client: SatXMPPEntity, | |
1642 message_elt: domish.Element, | |
1643 retract_elt: domish.Element, | |
1644 fastened_elts | |
1645 ) -> bool: | |
1646 if client != self.client: | |
1647 return True | |
1648 from_jid = jid.JID(message_elt["from"]) | |
1649 if not self.isLocal(from_jid): | |
1650 log.debug( | |
1651 f"ignoring retract request from non local jid {from_jid}" | |
1652 ) | |
1653 return False | |
1654 to_jid = jid.JID(message_elt["to"]) | |
1655 if (to_jid.host != self.client.jid.full() or not to_jid.user): | |
1656 # to_jid should be a virtual JID from this gateway | |
1657 raise exceptions.InternalError( | |
1658 f"Invalid destinee's JID: {to_jid.full()}" | |
1659 ) | |
1660 ap_account = self._e.unescape(to_jid.user) | |
1661 actor_id = await self.getAPActorIdFromAccount(ap_account) | |
1662 inbox = await self.getAPInboxFromId(actor_id) | |
1663 url_actor, ap_item = await self.apDeleteItem( | |
1664 from_jid.userhostJID(), None, fastened_elts.id, public=False | |
1665 ) | |
1666 resp = await self.signAndPost(inbox, url_actor, ap_item) | |
1667 if resp.code >= 300: | |
1668 text = await resp.text() | |
1669 log.warning( | |
1670 f"unexpected return code while sending AP item: {resp.code}\n{text}\n" | |
1671 f"{pformat(ap_item)}" | |
1672 ) | |
1673 return False | |
1588 | 1674 |
1589 async def newReplyToXMPPItem( | 1675 async def newReplyToXMPPItem( |
1590 self, | 1676 self, |
1591 client: SatXMPPEntity, | 1677 client: SatXMPPEntity, |
1592 ap_item: dict, | 1678 ap_item: dict, |