comparison sat/plugins/plugin_comp_ap_gateway/__init__.py @ 3804:36b167ddbfca

component AP gateway: AP delete activity => message retract: handle retraction of messages. As it is not possible to know from the AP item alone whether we need to do a message retraction (XEP-0424) or a pubsub retraction (XEP-0060), we now cache sent messages, and decide which method to use according to how the item is cached (i.e. in message history or in pubsub cache). rel 367
author Goffi <goffi@goffi.org>
date Fri, 17 Jun 2022 14:15:23 +0200
parents d5f343939239
children 2032826cfbcf
comparison
equal deleted inserted replaced
3803:d5f343939239 3804:36b167ddbfca
47 from sat.core import exceptions 47 from sat.core import exceptions
48 from sat.core.constants import Const as C 48 from sat.core.constants import Const as C
49 from sat.core.core_types import SatXMPPEntity 49 from sat.core.core_types import SatXMPPEntity
50 from sat.core.i18n import _ 50 from sat.core.i18n import _
51 from sat.core.log import getLogger 51 from sat.core.log import getLogger
52 from sat.memory.sqla_mapping import SubscriptionState 52 from sat.memory.sqla_mapping import SubscriptionState, History
53 from sat.tools import utils 53 from sat.tools import utils
54 from sat.tools.common import data_format, tls, uri 54 from sat.tools.common import data_format, tls, uri
55 from sat.tools.common.async_utils import async_lru 55 from sat.tools.common.async_utils import async_lru
56 56
57 from .constants import ( 57 from .constants import (
227 context_factory = tls.getTLSContextFactory(options) 227 context_factory = tls.getTLSContextFactory(options)
228 reactor.listenSSL(self.http_port, self.server, context_factory) 228 reactor.listenSSL(self.http_port, self.server, context_factory)
229 229
230 async def profileConnecting(self, client): 230 async def profileConnecting(self, client):
231 self.client = client 231 self.client = client
232 client.sendHistory = True
232 await self.init(client) 233 await self.init(client)
233 234
234 async def _itemsReceived(self, client, itemsEvent): 235 async def _itemsReceived(self, client, itemsEvent):
235 """Callback called when pubsub items are received 236 """Callback called when pubsub items are received
236 237
1692 log.warning( 1693 log.warning(
1693 "Ignoring AP item replying to an XMPP item with invalid inReplyTo URL " 1694 "Ignoring AP item replying to an XMPP item with invalid inReplyTo URL "
1694 f"({in_reply_to!r}):\n{pformat(ap_item)}" 1695 f"({in_reply_to!r}):\n{pformat(ap_item)}"
1695 ) 1696 )
1696 return 1697 return
1697 parent_item_service, parent_item_node = await self.getJIDAndNode(parent_item_account) 1698 parent_item_service, parent_item_node = await self.getJIDAndNode(
1699 parent_item_account
1700 )
1698 if parent_item_node is None: 1701 if parent_item_node is None:
1699 parent_item_node = self._m.namespace 1702 parent_item_node = self._m.namespace
1700 items, __ = await self._p.getItems( 1703 items, __ = await self._p.getItems(
1701 client, parent_item_service, parent_item_node, item_ids=[parent_item_id] 1704 client, parent_item_service, parent_item_node, item_ids=[parent_item_id]
1702 ) 1705 )
1720 raise NotImplemented() 1723 raise NotImplemented()
1721 else: 1724 else:
1722 __, item_elt = await self.apItem2MbDataAndElt(ap_item) 1725 __, item_elt = await self.apItem2MbDataAndElt(ap_item)
1723 await self._p.publish(client, comment_service, comment_node, [item_elt]) 1726 await self._p.publish(client, comment_service, comment_node, [item_elt])
1724 1727
1725 async def newAPItem( 1728 def getAPItemTargets(self, item: Dict[str, Any]) -> Tuple[bool, Set[str], Set[str]]:
1726 self, 1729 """Retrieve targets of an AP item, and indicate if it's a public one
1727 client: SatXMPPEntity, 1730
1728 destinee: Optional[jid.JID],
1729 node: str,
1730 item: dict,
1731 ) -> None:
1732 """Analyse, cache and send notification for received AP item
1733
1734 @param destinee: jid of the destinee,
1735 @param node: XMPP pubsub node
1736 @param item: AP object payload 1731 @param item: AP object payload
1732 @return: Are returned:
1733 - is_public flag, indicating if the item is world-readable
1734 - targets of the item
1735 - types of the targets of the item
1737 """ 1736 """
1738 targets: Set[str] = set() 1737 targets: Set[str] = set()
1739 is_public = False 1738 is_public = False
1740 # TODO: handle "audience" 1739 # TODO: handle "audience"
1741 for key in ("to", "bto", "cc", "bcc"): 1740 for key in ("to", "bto", "cc", "bcc"):
1753 if not self.isLocalURL(value): 1752 if not self.isLocalURL(value):
1754 continue 1753 continue
1755 targets.add(value) 1754 targets.add(value)
1756 1755
1757 targets_types = {self.parseAPURL(t)[0] for t in targets} 1756 targets_types = {self.parseAPURL(t)[0] for t in targets}
1757 return is_public, targets, targets_types
1758
1759 async def newAPItem(
1760 self,
1761 client: SatXMPPEntity,
1762 destinee: Optional[jid.JID],
1763 node: str,
1764 item: dict,
1765 ) -> None:
1766 """Analyse, cache and send notification for received AP item
1767
1768 @param destinee: jid of the destinee,
1769 @param node: XMPP pubsub node
1770 @param item: AP object payload
1771 """
1772 is_public, targets, targets_types = self.getAPItemTargets(item)
1758 if not is_public and targets_types == {TYPE_ACTOR}: 1773 if not is_public and targets_types == {TYPE_ACTOR}:
1759 # this is a direct message 1774 # this is a direct message
1760 await self.handleMessageAPItem( 1775 await self.handleMessageAPItem(
1761 client, targets, destinee, item 1776 client, targets, destinee, item
1762 ) 1777 )
1787 defer_l.append( 1802 defer_l.append(
1788 client.sendMessage( 1803 client.sendMessage(
1789 target_jid, 1804 target_jid,
1790 {'': mb_data.get("content", "")}, 1805 {'': mb_data.get("content", "")},
1791 mb_data.get("title"), 1806 mb_data.get("title"),
1792 1807 extra={"origin_id": mb_data["id"]}
1793 ) 1808 )
1794 ) 1809 )
1795 await defer.DeferredList(defer_l) 1810 await defer.DeferredList(defer_l)
1796 1811
1797 async def handlePubsubAPItem( 1812 async def handlePubsubAPItem(
1881 async def newAPDeleteItem( 1896 async def newAPDeleteItem(
1882 self, 1897 self,
1883 client: SatXMPPEntity, 1898 client: SatXMPPEntity,
1884 destinee: Optional[jid.JID], 1899 destinee: Optional[jid.JID],
1885 node: str, 1900 node: str,
1901 activity: dict,
1886 item: dict, 1902 item: dict,
1887 ) -> None: 1903 ) -> None:
1888 """Analyse, cache and send notification for received AP item 1904 """Analyse, cache and send notification for received AP item
1889 1905
1890 @param destinee: jid of the destinee, 1906 @param destinee: jid of the destinee,
1891 @param node: XMPP pubsub node 1907 @param node: XMPP pubsub node
1908 @param activity: parent AP activity
1892 @param item: AP object payload 1909 @param item: AP object payload
1893 """ 1910 """
1894 item_id = item.get("id") 1911 item_id = item.get("id")
1895 if not item_id: 1912 if not item_id:
1896 raise exceptions.DataError('"id" attribute is missing in item') 1913 raise exceptions.DataError('"id" attribute is missing in item')
1914 if not item_id.startswith("http"):
1915 raise exceptions.DataError(f"invalid id: {item_id!r}")
1897 if self.isLocalURL(item_id): 1916 if self.isLocalURL(item_id):
1898 raise ValueError("Local IDs should not be used") 1917 raise ValueError("Local IDs should not be used")
1899 1918
1900 cached_node = await self.host.memory.storage.getPubsubNode( 1919 # we have no way to know if a deleted item is a direct one (thus a message) or one
1901 client, client.jid, node, with_subscriptions=True 1920 # converted to pubsub. We check if the id is in message history to decide what to
1902 ) 1921 # do.
1903 if cached_node is None: 1922 history = await self.host.memory.storage.get(
1904 log.warning( 1923 client,
1905 f"Received an item retract for node {node!r} at {client.jid} which is " 1924 History,
1906 "not cached" 1925 History.origin_id,
1907 ) 1926 item_id,
1927 (History.messages, History.subjects)
1928 )
1929
1930 if history is not None:
1931 # it's a direct message
1932 if history.source_jid != client.jid:
1933 log.warning(
1934 f"retractation received from an entity ''{client.jid}) which is "
1935 f"not the original sender of the message ({history.source_jid}), "
1936 "hack attemps?"
1937 )
1938 raise exceptions.PermissionError("forbidden")
1939
1940 await self._r.retractByHistory(client, history)
1908 else: 1941 else:
1942 # no history in cache with this ID, it's probably a pubsub item
1943 cached_node = await self.host.memory.storage.getPubsubNode(
1944 client, client.jid, node, with_subscriptions=True
1945 )
1946 if cached_node is None:
1947 log.warning(
1948 f"Received an item retract for node {node!r} at {client.jid} "
1949 "which is not cached"
1950 )
1951 raise exceptions.NotFound
1909 await self.host.memory.storage.deletePubsubItems(cached_node, [item_id]) 1952 await self.host.memory.storage.deletePubsubItems(cached_node, [item_id])
1910 # notifyRetract is expecting domish.Element instances 1953 # notifyRetract is expecting domish.Element instances
1911 item_elt = domish.Element((None, "item")) 1954 item_elt = domish.Element((None, "item"))
1912 item_elt["id"] = item_id 1955 item_elt["id"] = item_id
1913 for subscription in cached_node.subscriptions: 1956 for subscription in cached_node.subscriptions: