comparison sat/plugins/plugin_comp_ap_gateway/__init__.py @ 3803:d5f343939239

component AP gateway: message retraction => AP deletion. Convert an XEP-0424 message retraction request to a suitable AP Delete activity. The message workflow and its triggers are now used instead of a direct observer, as this is now possible with components, and it lets other plugins parse and possibly update the metadata. rel 367
author Goffi <goffi@goffi.org>
date Fri, 17 Jun 2022 14:15:23 +0200
parents b5c9021020df
children 36b167ddbfca
comparison
equal deleted inserted replaced
3802:983df907d456 3803:d5f343939239
85 C.PI_IMPORT_NAME: IMPORT_NAME, 85 C.PI_IMPORT_NAME: IMPORT_NAME,
86 C.PI_MODES: [C.PLUG_MODE_COMPONENT], 86 C.PI_MODES: [C.PLUG_MODE_COMPONENT],
87 C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, 87 C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
88 C.PI_PROTOCOLS: [], 88 C.PI_PROTOCOLS: [],
89 C.PI_DEPENDENCIES: [ 89 C.PI_DEPENDENCIES: [
90 "XEP-0106", "XEP-0277", "XEP-0060", "XEP-0465", "PUBSUB_CACHE", "TEXT_SYNTAXES" 90 "XEP-0060", "XEP-0106", "XEP-0277", "XEP-0329", "XEP-0424", "XEP-0465",
91 "PUBSUB_CACHE", "TEXT_SYNTAXES"
91 ], 92 ],
92 C.PI_RECOMMENDATIONS: [], 93 C.PI_RECOMMENDATIONS: [],
93 C.PI_MAIN: "APGateway", 94 C.PI_MAIN: "APGateway",
94 C.PI_HANDLER: C.BOOL_TRUE, 95 C.PI_HANDLER: C.BOOL_TRUE,
95 C.PI_DESCRIPTION: _( 96 C.PI_DESCRIPTION: _(
110 self.initialised = False 111 self.initialised = False
111 self.client = None 112 self.client = None
112 self._m = host.plugins["XEP-0277"] 113 self._m = host.plugins["XEP-0277"]
113 self._p = host.plugins["XEP-0060"] 114 self._p = host.plugins["XEP-0060"]
114 self._e = host.plugins["XEP-0106"] 115 self._e = host.plugins["XEP-0106"]
116 self._r = host.plugins["XEP-0424"]
115 self._pps = host.plugins["XEP-0465"] 117 self._pps = host.plugins["XEP-0465"]
116 self._c = host.plugins["PUBSUB_CACHE"] 118 self._c = host.plugins["PUBSUB_CACHE"]
117 self._p.addManagedNode( 119 self._p.addManagedNode(
118 "", items_cb=self._itemsReceived 120 "", items_cb=self._itemsReceived
119 ) 121 )
120 self._t = host.plugins["TEXT_SYNTAXES"] 122 self._t = host.plugins["TEXT_SYNTAXES"]
121 self.pubsub_service = APPubsubService(self) 123 self.pubsub_service = APPubsubService(self)
124 host.trigger.add("messageReceived", self._messageReceivedTrigger, priority=-1000)
125 host.trigger.add("XEP-0424_retractReceived", self._onMessageRetract)
122 126
123 host.bridge.addMethod( 127 host.bridge.addMethod(
124 "APSend", 128 "APSend",
125 ".plugin", 129 ".plugin",
126 in_sign="sss", 130 in_sign="sss",
224 reactor.listenSSL(self.http_port, self.server, context_factory) 228 reactor.listenSSL(self.http_port, self.server, context_factory)
225 229
226 async def profileConnecting(self, client): 230 async def profileConnecting(self, client):
227 self.client = client 231 self.client = client
228 await self.init(client) 232 await self.init(client)
229
230 def profileConnected(self, client):
231 client.xmlstream.addObserver("/message/body", self.onMessage)
232 233
233 async def _itemsReceived(self, client, itemsEvent): 234 async def _itemsReceived(self, client, itemsEvent):
234 """Callback called when pubsub items are received 235 """Callback called when pubsub items are received
235 236
236 if the items are adressed to a JID corresponding to an AP actor, they are 237 if the items are adressed to a JID corresponding to an AP actor, they are
870 comment_node = comment_data["node"] 871 comment_node = comment_data["node"]
871 await self._p.subscribe(client, comment_service, comment_node) 872 await self._p.subscribe(client, comment_service, comment_node)
872 ap_item = await self.mbdata2APitem(client, mb_data) 873 ap_item = await self.mbdata2APitem(client, mb_data)
873 url_actor = ap_item["object"]["attributedTo"] 874 url_actor = ap_item["object"]["attributedTo"]
874 elif item.name == "retract": 875 elif item.name == "retract":
875 author_account = await self.getAPAccountFromJidAndNode(client.jid, node) 876 url_actor, ap_item = await self.apDeleteItem(
876 author_actor_id = self.buildAPURL(TYPE_ACTOR, author_account) 877 client.jid, node, item["id"]
877 url_item = self.buildAPURL(TYPE_ITEM, author_account, item["id"])
878 ap_item = self.createActivity(
879 "Delete",
880 author_actor_id,
881 {
882 "id": url_item,
883 "type": TYPE_TOMBSTONE
884 }
885 ) 878 )
886 ap_item["to"] = [NS_AP_PUBLIC]
887 url_actor = author_actor_id
888 else: 879 else:
889 raise exceptions.InternalError(f"unexpected element: {item.toXml()}") 880 raise exceptions.InternalError(f"unexpected element: {item.toXml()}")
890 resp = await self.signAndPost(inbox, url_actor, ap_item) 881 resp = await self.signAndPost(inbox, url_actor, ap_item)
891 if resp.code >= 300: 882 if resp.code >= 300:
892 text = await resp.text() 883 text = await resp.text()
923 body, 914 body,
924 headers=headers, 915 headers=headers,
925 ) 916 )
926 if resp.code >= 400: 917 if resp.code >= 400:
927 text = await resp.text() 918 text = await resp.text()
928 log.warning(f"POST request to {url} failed: {text}") 919 log.warning(f"POST request to {url} failed [{resp.code}]: {text}")
929 return resp 920 return resp
930 921
931 def _publishMessage(self, mess_data_s: str, service_s: str, profile: str): 922 def _publishMessage(self, mess_data_s: str, service_s: str, profile: str):
932 mess_data: dict = data_format.deserialise(mess_data_s) # type: ignore 923 mess_data: dict = data_format.deserialise(mess_data_s) # type: ignore
933 service = jid.JID(service_s) 924 service = jid.JID(service_s)
1551 url_actor = item_data["object"]["attributedTo"] 1542 url_actor = item_data["object"]["attributedTo"]
1552 resp = await self.signAndPost(inbox_url, url_actor, item_data) 1543 resp = await self.signAndPost(inbox_url, url_actor, item_data)
1553 if resp.code != 202: 1544 if resp.code != 202:
1554 raise exceptions.NetworkError(f"unexpected return code: {resp.code}") 1545 raise exceptions.NetworkError(f"unexpected return code: {resp.code}")
1555 1546
1556 @utils.ensure_deferred 1547 async def apDeleteItem(
1557 async def onMessage(self, message_elt: domish.Element) -> None: 1548 self,
1558 """Called when a XMPP message is received""" 1549 jid_: jid.JID,
1550 node: Optional[str],
1551 item_id: str,
1552 public: bool = True
1553 ) -> Tuple[str, Dict[str, Any]]:
1554 """Build activity to delete an AP item
1555
1556 @param jid_: JID of the entity deleting an item
1557 @param node: node where the item is deleted
1558 None if it's microblog or a message
1559 @param item_id: ID of the item to delete
1560 it's the Pubsub ID or message's origin ID
1561 @param public: if True, the activity will be addressed to public namespace
1562 @return: actor_id of the entity deleting the item, activity to send
1563 """
1564 author_account = await self.getAPAccountFromJidAndNode(jid_, node)
1565 author_actor_id = self.buildAPURL(TYPE_ACTOR, author_account)
1566 url_item = self.buildAPURL(TYPE_ITEM, author_account, item_id)
1567 ap_item = self.createActivity(
1568 "Delete",
1569 author_actor_id,
1570 {
1571 "id": url_item,
1572 "type": TYPE_TOMBSTONE
1573 }
1574 )
1575 if public:
1576 ap_item["to"] = [NS_AP_PUBLIC]
1577 url_actor = author_actor_id
1578 return url_actor, ap_item
1579
1580 def _messageReceivedTrigger(
1581 self,
1582 client: SatXMPPEntity,
1583 message_elt: domish.Element,
1584 post_treat: defer.Deferred
1585 ) -> bool:
1586 """add the gateway workflow on post treatment"""
1559 if not self.client: 1587 if not self.client:
1560 log.warning(f"no client set, ignoring message: {message_elt.toXml()}") 1588 log.warning(f"no client set, ignoring message: {message_elt.toXml()}")
1561 return 1589 return True
1562 mess_type = message_elt.getAttribute("type") 1590 post_treat.addCallback(
1563 if mess_type and mess_type not in ("chat", "normal"): 1591 lambda mess_data: defer.ensureDeferred(self.onMessage(client, mess_data))
1564 log.warning(f"ignoring message with unexpected type: {message_elt.toXml()}") 1592 )
1565 return 1593 return True
1566 from_jid = jid.JID(message_elt["from"]) 1594
1567 if not self.isLocal(from_jid): 1595 async def onMessage(self, client: SatXMPPEntity, mess_data: dict) -> dict:
1568 log.warning(f"ignoring non local message: {message_elt.toXml()}") 1596 """Called once message has been parsed
1569 return 1597
1570 to_jid = jid.JID(message_elt["to"]) 1598 this method handle the conversion to AP items and posting
1571 if not to_jid.user: 1599 """
1600 if client != self.client:
1601 return mess_data
1602 if mess_data["type"] not in ("chat", "normal"):
1603 log.warning(f"ignoring message with unexpected type: {mess_data['xml'].toXml()}")
1604 return mess_data
1605 if not self.isLocal(mess_data["from"]):
1606 log.warning(f"ignoring non local message: {mess_data['xml'].toXml()}")
1607 return mess_data
1608 if not mess_data["to"].user:
1572 log.warning( 1609 log.warning(
1573 f"ignoring message addressed to gateway itself: {message_elt.toXml()}" 1610 f"ignoring message addressed to gateway itself: {mess_data['xml'].toXml()}"
1574 ) 1611 )
1575 return 1612 return mess_data
1576 1613
1577 actor_account = self._e.unescape(to_jid.user) 1614 actor_account = self._e.unescape(mess_data["to"].user)
1578 actor_id = await self.getAPActorIdFromAccount(actor_account) 1615 actor_id = await self.getAPActorIdFromAccount(actor_account)
1579 inbox = await self.getAPInboxFromId(actor_id) 1616 inbox = await self.getAPInboxFromId(actor_id)
1580 1617
1618 try:
1619 language, message = next(iter(mess_data["message"].items()))
1620 except (KeyError, StopIteration):
1621 log.warning(f"ignoring empty message: {mess_data}")
1622 return mess_data
1623
1581 mb_data = { 1624 mb_data = {
1582 "content": str(message_elt.body), 1625 "content": message,
1583 } 1626 }
1584 client = self.client.getVirtualClient(from_jid) 1627 if language:
1628 mb_data["language"] = language
1629 origin_id = mess_data["extra"].get("origin_id")
1630 if origin_id:
1631 # we need to use origin ID when present to be able to retract the message
1632 mb_data["id"] = origin_id
1633 client = self.client.getVirtualClient(mess_data["from"])
1585 ap_item = await self.mbdata2APitem(client, mb_data, public=False) 1634 ap_item = await self.mbdata2APitem(client, mb_data, public=False)
1586 ap_item["object"]["to"] = ap_item["to"] = [actor_id] 1635 ap_item["object"]["to"] = ap_item["to"] = [actor_id]
1587 await self.signAndPost(inbox, ap_item["actor"], ap_item) 1636 await self.signAndPost(inbox, ap_item["actor"], ap_item)
1637 return mess_data
1638
1639 async def _onMessageRetract(
1640 self,
1641 client: SatXMPPEntity,
1642 message_elt: domish.Element,
1643 retract_elt: domish.Element,
1644 fastened_elts
1645 ) -> bool:
1646 if client != self.client:
1647 return True
1648 from_jid = jid.JID(message_elt["from"])
1649 if not self.isLocal(from_jid):
1650 log.debug(
1651 f"ignoring retract request from non local jid {from_jid}"
1652 )
1653 return False
1654 to_jid = jid.JID(message_elt["to"])
1655 if (to_jid.host != self.client.jid.full() or not to_jid.user):
1656 # to_jid should be a virtual JID from this gateway
1657 raise exceptions.InternalError(
1658 f"Invalid destinee's JID: {to_jid.full()}"
1659 )
1660 ap_account = self._e.unescape(to_jid.user)
1661 actor_id = await self.getAPActorIdFromAccount(ap_account)
1662 inbox = await self.getAPInboxFromId(actor_id)
1663 url_actor, ap_item = await self.apDeleteItem(
1664 from_jid.userhostJID(), None, fastened_elts.id, public=False
1665 )
1666 resp = await self.signAndPost(inbox, url_actor, ap_item)
1667 if resp.code >= 300:
1668 text = await resp.text()
1669 log.warning(
1670 f"unexpected return code while sending AP item: {resp.code}\n{text}\n"
1671 f"{pformat(ap_item)}"
1672 )
1673 return False
1588 1674
1589 async def newReplyToXMPPItem( 1675 async def newReplyToXMPPItem(
1590 self, 1676 self,
1591 client: SatXMPPEntity, 1677 client: SatXMPPEntity,
1592 ap_item: dict, 1678 ap_item: dict,