Mercurial > libervia-backend
comparison sat/plugins/plugin_comp_ap_gateway/__init__.py @ 3804:36b167ddbfca
component AP gateway: AP delete activity => message retract:
handle retractation of messages. As it is not possible to know from the AP item alone if
we need to to a message retractation (XEP-0424) or a pubsub retractation (XEP-0060), we
now cache sent message, and decide which method to use according to how the item is cached
(i.e. in message history or in pubsub cache).
rel 367
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 17 Jun 2022 14:15:23 +0200 |
parents | d5f343939239 |
children | 2032826cfbcf |
comparison
equal
deleted
inserted
replaced
3803:d5f343939239 | 3804:36b167ddbfca |
---|---|
47 from sat.core import exceptions | 47 from sat.core import exceptions |
48 from sat.core.constants import Const as C | 48 from sat.core.constants import Const as C |
49 from sat.core.core_types import SatXMPPEntity | 49 from sat.core.core_types import SatXMPPEntity |
50 from sat.core.i18n import _ | 50 from sat.core.i18n import _ |
51 from sat.core.log import getLogger | 51 from sat.core.log import getLogger |
52 from sat.memory.sqla_mapping import SubscriptionState | 52 from sat.memory.sqla_mapping import SubscriptionState, History |
53 from sat.tools import utils | 53 from sat.tools import utils |
54 from sat.tools.common import data_format, tls, uri | 54 from sat.tools.common import data_format, tls, uri |
55 from sat.tools.common.async_utils import async_lru | 55 from sat.tools.common.async_utils import async_lru |
56 | 56 |
57 from .constants import ( | 57 from .constants import ( |
227 context_factory = tls.getTLSContextFactory(options) | 227 context_factory = tls.getTLSContextFactory(options) |
228 reactor.listenSSL(self.http_port, self.server, context_factory) | 228 reactor.listenSSL(self.http_port, self.server, context_factory) |
229 | 229 |
230 async def profileConnecting(self, client): | 230 async def profileConnecting(self, client): |
231 self.client = client | 231 self.client = client |
232 client.sendHistory = True | |
232 await self.init(client) | 233 await self.init(client) |
233 | 234 |
234 async def _itemsReceived(self, client, itemsEvent): | 235 async def _itemsReceived(self, client, itemsEvent): |
235 """Callback called when pubsub items are received | 236 """Callback called when pubsub items are received |
236 | 237 |
1692 log.warning( | 1693 log.warning( |
1693 "Ignoring AP item replying to an XMPP item with invalid inReplyTo URL " | 1694 "Ignoring AP item replying to an XMPP item with invalid inReplyTo URL " |
1694 f"({in_reply_to!r}):\n{pformat(ap_item)}" | 1695 f"({in_reply_to!r}):\n{pformat(ap_item)}" |
1695 ) | 1696 ) |
1696 return | 1697 return |
1697 parent_item_service, parent_item_node = await self.getJIDAndNode(parent_item_account) | 1698 parent_item_service, parent_item_node = await self.getJIDAndNode( |
1699 parent_item_account | |
1700 ) | |
1698 if parent_item_node is None: | 1701 if parent_item_node is None: |
1699 parent_item_node = self._m.namespace | 1702 parent_item_node = self._m.namespace |
1700 items, __ = await self._p.getItems( | 1703 items, __ = await self._p.getItems( |
1701 client, parent_item_service, parent_item_node, item_ids=[parent_item_id] | 1704 client, parent_item_service, parent_item_node, item_ids=[parent_item_id] |
1702 ) | 1705 ) |
1720 raise NotImplemented() | 1723 raise NotImplemented() |
1721 else: | 1724 else: |
1722 __, item_elt = await self.apItem2MbDataAndElt(ap_item) | 1725 __, item_elt = await self.apItem2MbDataAndElt(ap_item) |
1723 await self._p.publish(client, comment_service, comment_node, [item_elt]) | 1726 await self._p.publish(client, comment_service, comment_node, [item_elt]) |
1724 | 1727 |
1725 async def newAPItem( | 1728 def getAPItemTargets(self, item: Dict[str, Any]) -> Tuple[bool, Set[str], Set[str]]: |
1726 self, | 1729 """Retrieve targets of an AP item, and indicate if it's a public one |
1727 client: SatXMPPEntity, | 1730 |
1728 destinee: Optional[jid.JID], | |
1729 node: str, | |
1730 item: dict, | |
1731 ) -> None: | |
1732 """Analyse, cache and send notification for received AP item | |
1733 | |
1734 @param destinee: jid of the destinee, | |
1735 @param node: XMPP pubsub node | |
1736 @param item: AP object payload | 1731 @param item: AP object payload |
1732 @return: Are returned: | |
1733 - is_public flag, indicating if the item is world-readable | |
1734 - targets of the item | |
1735 - targets of the items | |
1737 """ | 1736 """ |
1738 targets: Set[str] = set() | 1737 targets: Set[str] = set() |
1739 is_public = False | 1738 is_public = False |
1740 # TODO: handle "audience" | 1739 # TODO: handle "audience" |
1741 for key in ("to", "bto", "cc", "bcc"): | 1740 for key in ("to", "bto", "cc", "bcc"): |
1753 if not self.isLocalURL(value): | 1752 if not self.isLocalURL(value): |
1754 continue | 1753 continue |
1755 targets.add(value) | 1754 targets.add(value) |
1756 | 1755 |
1757 targets_types = {self.parseAPURL(t)[0] for t in targets} | 1756 targets_types = {self.parseAPURL(t)[0] for t in targets} |
1757 return is_public, targets, targets_types | |
1758 | |
1759 async def newAPItem( | |
1760 self, | |
1761 client: SatXMPPEntity, | |
1762 destinee: Optional[jid.JID], | |
1763 node: str, | |
1764 item: dict, | |
1765 ) -> None: | |
1766 """Analyse, cache and send notification for received AP item | |
1767 | |
1768 @param destinee: jid of the destinee, | |
1769 @param node: XMPP pubsub node | |
1770 @param item: AP object payload | |
1771 """ | |
1772 is_public, targets, targets_types = self.getAPItemTargets(item) | |
1758 if not is_public and targets_types == {TYPE_ACTOR}: | 1773 if not is_public and targets_types == {TYPE_ACTOR}: |
1759 # this is a direct message | 1774 # this is a direct message |
1760 await self.handleMessageAPItem( | 1775 await self.handleMessageAPItem( |
1761 client, targets, destinee, item | 1776 client, targets, destinee, item |
1762 ) | 1777 ) |
1787 defer_l.append( | 1802 defer_l.append( |
1788 client.sendMessage( | 1803 client.sendMessage( |
1789 target_jid, | 1804 target_jid, |
1790 {'': mb_data.get("content", "")}, | 1805 {'': mb_data.get("content", "")}, |
1791 mb_data.get("title"), | 1806 mb_data.get("title"), |
1792 | 1807 extra={"origin_id": mb_data["id"]} |
1793 ) | 1808 ) |
1794 ) | 1809 ) |
1795 await defer.DeferredList(defer_l) | 1810 await defer.DeferredList(defer_l) |
1796 | 1811 |
1797 async def handlePubsubAPItem( | 1812 async def handlePubsubAPItem( |
1881 async def newAPDeleteItem( | 1896 async def newAPDeleteItem( |
1882 self, | 1897 self, |
1883 client: SatXMPPEntity, | 1898 client: SatXMPPEntity, |
1884 destinee: Optional[jid.JID], | 1899 destinee: Optional[jid.JID], |
1885 node: str, | 1900 node: str, |
1901 activity: dict, | |
1886 item: dict, | 1902 item: dict, |
1887 ) -> None: | 1903 ) -> None: |
1888 """Analyse, cache and send notification for received AP item | 1904 """Analyse, cache and send notification for received AP item |
1889 | 1905 |
1890 @param destinee: jid of the destinee, | 1906 @param destinee: jid of the destinee, |
1891 @param node: XMPP pubsub node | 1907 @param node: XMPP pubsub node |
1908 @param activity: parent AP activity | |
1892 @param item: AP object payload | 1909 @param item: AP object payload |
1893 """ | 1910 """ |
1894 item_id = item.get("id") | 1911 item_id = item.get("id") |
1895 if not item_id: | 1912 if not item_id: |
1896 raise exceptions.DataError('"id" attribute is missing in item') | 1913 raise exceptions.DataError('"id" attribute is missing in item') |
1914 if not item_id.startswith("http"): | |
1915 raise exceptions.DataError(f"invalid id: {item_id!r}") | |
1897 if self.isLocalURL(item_id): | 1916 if self.isLocalURL(item_id): |
1898 raise ValueError("Local IDs should not be used") | 1917 raise ValueError("Local IDs should not be used") |
1899 | 1918 |
1900 cached_node = await self.host.memory.storage.getPubsubNode( | 1919 # we have no way to know if a deleted item is a direct one (thus a message) or one |
1901 client, client.jid, node, with_subscriptions=True | 1920 # converted to pubsub. We check if the id is in message history to decide what to |
1902 ) | 1921 # do. |
1903 if cached_node is None: | 1922 history = await self.host.memory.storage.get( |
1904 log.warning( | 1923 client, |
1905 f"Received an item retract for node {node!r} at {client.jid} which is " | 1924 History, |
1906 "not cached" | 1925 History.origin_id, |
1907 ) | 1926 item_id, |
1927 (History.messages, History.subjects) | |
1928 ) | |
1929 | |
1930 if history is not None: | |
1931 # it's a direct message | |
1932 if history.source_jid != client.jid: | |
1933 log.warning( | |
1934 f"retractation received from an entity ''{client.jid}) which is " | |
1935 f"not the original sender of the message ({history.source_jid}), " | |
1936 "hack attemps?" | |
1937 ) | |
1938 raise exceptions.PermissionError("forbidden") | |
1939 | |
1940 await self._r.retractByHistory(client, history) | |
1908 else: | 1941 else: |
1942 # no history in cache with this ID, it's probably a pubsub item | |
1943 cached_node = await self.host.memory.storage.getPubsubNode( | |
1944 client, client.jid, node, with_subscriptions=True | |
1945 ) | |
1946 if cached_node is None: | |
1947 log.warning( | |
1948 f"Received an item retract for node {node!r} at {client.jid} " | |
1949 "which is not cached" | |
1950 ) | |
1951 raise exceptions.NotFound | |
1909 await self.host.memory.storage.deletePubsubItems(cached_node, [item_id]) | 1952 await self.host.memory.storage.deletePubsubItems(cached_node, [item_id]) |
1910 # notifyRetract is expecting domish.Element instances | 1953 # notifyRetract is expecting domish.Element instances |
1911 item_elt = domish.Element((None, "item")) | 1954 item_elt = domish.Element((None, "item")) |
1912 item_elt["id"] = item_id | 1955 item_elt["id"] = item_id |
1913 for subscription in cached_node.subscriptions: | 1956 for subscription in cached_node.subscriptions: |