Mercurial > libervia-backend
comparison sat/plugins/plugin_comp_ap_gateway/__init__.py @ 3865:59fbb66b2923
component AP gateway: handle XMPP attachments -> AP likes conversion:
Convert `noticed` attachments coming from XMPP to suitable `Like` activities. A virtual node
is created in cache when necessary, and virtual items published to virtual JIDs mapping to AP
accounts are cached too.
rel 370
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 20 Jul 2022 17:53:12 +0200 |
parents | bc7f9d0a404f |
children | c0bcbcf5b4b7 |
comparison
equal
deleted
inserted
replaced
3864:ac255a0fbd4c | 3865:59fbb66b2923 |
---|---|
68 TYPE_ACTOR, | 68 TYPE_ACTOR, |
69 TYPE_ITEM, | 69 TYPE_ITEM, |
70 TYPE_FOLLOWERS, | 70 TYPE_FOLLOWERS, |
71 TYPE_TOMBSTONE, | 71 TYPE_TOMBSTONE, |
72 TYPE_MENTION, | 72 TYPE_MENTION, |
73 TYPE_LIKE, | |
73 NS_AP, | 74 NS_AP, |
74 NS_AP_PUBLIC, | 75 NS_AP_PUBLIC, |
75 PUBLIC_TUPLE | 76 PUBLIC_TUPLE |
76 ) | 77 ) |
77 from .regex import RE_MENTION | 78 from .regex import RE_MENTION |
88 C.PI_IMPORT_NAME: IMPORT_NAME, | 89 C.PI_IMPORT_NAME: IMPORT_NAME, |
89 C.PI_MODES: [C.PLUG_MODE_COMPONENT], | 90 C.PI_MODES: [C.PLUG_MODE_COMPONENT], |
90 C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, | 91 C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, |
91 C.PI_PROTOCOLS: [], | 92 C.PI_PROTOCOLS: [], |
92 C.PI_DEPENDENCIES: [ | 93 C.PI_DEPENDENCIES: [ |
93 "XEP-0060", "XEP-0084", "XEP-0106", "XEP-0277", "XEP-0292", "XEP-0329", | 94 "XEP-0054", "XEP-0060", "XEP-0084", "XEP-0106", "XEP-0277", "XEP-0292", |
94 "XEP-0372", "XEP-0424", "XEP-0465", "PUBSUB_CACHE", "TEXT_SYNTAXES", "IDENTITY", | 95 "XEP-0329", "XEP-0372", "XEP-0424", "XEP-0465", "PUBSUB_CACHE", "TEXT_SYNTAXES", |
95 "XEP-0054" | 96 "IDENTITY", "PUBSUB_ATTACHMENTS" |
96 ], | 97 ], |
97 C.PI_RECOMMENDATIONS: [], | 98 C.PI_RECOMMENDATIONS: [], |
98 C.PI_MAIN: "APGateway", | 99 C.PI_MAIN: "APGateway", |
99 C.PI_HANDLER: C.BOOL_TRUE, | 100 C.PI_HANDLER: C.BOOL_TRUE, |
100 C.PI_DESCRIPTION: _( | 101 C.PI_DESCRIPTION: _( |
129 self._r = host.plugins["XEP-0424"] | 130 self._r = host.plugins["XEP-0424"] |
130 self._pps = host.plugins["XEP-0465"] | 131 self._pps = host.plugins["XEP-0465"] |
131 self._c = host.plugins["PUBSUB_CACHE"] | 132 self._c = host.plugins["PUBSUB_CACHE"] |
132 self._t = host.plugins["TEXT_SYNTAXES"] | 133 self._t = host.plugins["TEXT_SYNTAXES"] |
133 self._i = host.plugins["IDENTITY"] | 134 self._i = host.plugins["IDENTITY"] |
135 self._pa = host.plugins["PUBSUB_ATTACHMENTS"] | |
134 self._p.addManagedNode( | 136 self._p.addManagedNode( |
135 "", | 137 "", |
136 items_cb=self._itemsReceived, | 138 items_cb=self._itemsReceived, |
137 # we want to be sure that the callbacks are launched before pubsub cache's | 139 # we want to be sure that the callbacks are launched before pubsub cache's |
138 # one, as we need to inspect items before they are actually removed from cache | 140 # one, as we need to inspect items before they are actually removed from cache |
285 if not recipient.user: | 287 if not recipient.user: |
286 log.debug("ignoring items event without local part specified") | 288 log.debug("ignoring items event without local part specified") |
287 return | 289 return |
288 | 290 |
289 ap_account = self._e.unescape(recipient.user) | 291 ap_account = self._e.unescape(recipient.user) |
290 await self.convertAndPostItems( | 292 |
291 client, ap_account, recipient, itemsEvent.nodeIdentifier, itemsEvent.items | 293 if self._pa.isAttachmentNode(itemsEvent.nodeIdentifier): |
292 ) | 294 await self.convertAndPostAttachments( |
295 client, ap_account, itemsEvent.sender, itemsEvent.nodeIdentifier, | |
296 itemsEvent.items, publisher=itemsEvent.sender | |
297 ) | |
298 else: | |
299 await self.convertAndPostItems( | |
300 client, ap_account, itemsEvent.sender, itemsEvent.nodeIdentifier, | |
301 itemsEvent.items | |
302 ) | |
293 | 303 |
294 async def getVirtualClient(self, actor_id: str) -> SatXMPPEntity: | 304 async def getVirtualClient(self, actor_id: str) -> SatXMPPEntity: |
295 """Get client for this component with a specified jid | 305 """Get client for this component with a specified jid |
296 | 306 |
297 This is needed to perform operations with the virtual JID corresponding to the AP | 307 This is needed to perform operations with the virtual JID corresponding to the AP |
404 else: | 414 else: |
405 found_item = cached_items[0].data | 415 found_item = cached_items[0].data |
406 | 416 |
407 if found_item is None: | 417 if found_item is None: |
408 # the node is not in cache, we have to make a request to retrieve the item | 418 # the node is not in cache, we have to make a request to retrieve the item |
409 # if doesn't exist, getItems will raise a NotFound exception | 419 # If the node doesn't exist, getItems will raise a NotFound exception |
410 found_items, __ = await self._p.getItems( | 420 found_items, __ = await self._p.getItems( |
411 self.client, author_jid, node, item_ids=[item_id] | 421 self.client, author_jid, node, item_ids=[item_id] |
412 ) | 422 ) |
413 found_item = found_items[0] | 423 try: |
424 found_item = found_items[0] | |
425 except KeyError: | |
426 raise exceptions.NotFound("requested items can't be found") | |
414 | 427 |
415 mb_data = await self._m.item2mbdata( | 428 mb_data = await self._m.item2mbdata( |
416 self.client, found_item, author_jid, node | 429 self.client, found_item, author_jid, node |
417 ) | 430 ) |
418 ap_item = await self.mbdata2APitem(self.client, mb_data) | 431 ap_item = await self.mbdata2APitem(self.client, mb_data) |
832 **kwargs, | 845 **kwargs, |
833 ) -> Dict[str, Any]: | 846 ) -> Dict[str, Any]: |
834 """Generate base data for an activity | 847 """Generate base data for an activity |
835 | 848 |
836 @param activity: one of ACTIVITY_TYPES | 849 @param activity: one of ACTIVITY_TYPES |
850 @param actor_id: AP actor ID of the sender | |
851 @param object_: content of "object" field | |
852 @param target: content of "target" field | |
853 @param activity_id: ID to use for the activity | |
854 if not set it will be automatically generated, but it is usually desirable to | |
855 set the ID manually so it can be retrieved (e.g. for Undo) | |
837 """ | 856 """ |
838 if activity not in ACTIVITY_TYPES: | 857 if activity not in ACTIVITY_TYPES: |
839 raise exceptions.InternalError(f"invalid activity: {activity!r}") | 858 raise exceptions.InternalError(f"invalid activity: {activity!r}") |
840 if object_ is None and activity in ACTIVITY_OBJECT_MANDATORY: | 859 if object_ is None and activity in ACTIVITY_OBJECT_MANDATORY: |
841 raise exceptions.InternalError( | 860 raise exceptions.InternalError( |
946 client: SatXMPPEntity, | 965 client: SatXMPPEntity, |
947 ap_account: str, | 966 ap_account: str, |
948 service: jid.JID, | 967 service: jid.JID, |
949 node: str, | 968 node: str, |
950 items: List[domish.Element], | 969 items: List[domish.Element], |
951 subscribe_comments_nodes: bool = False, | 970 subscribe_extra_nodes: bool = True, |
952 ) -> None: | 971 ) -> None: |
953 """Convert XMPP items to AP items and post them to actor inbox | 972 """Convert XMPP items to AP items and post them to actor inbox |
954 | 973 |
955 @param ap_account: account of ActivityPub actor | 974 @param ap_account: account of ActivityPub actor receiving the item |
956 @param service: JID of the virtual pubsub service corresponding to the AP actor | 975 @param service: JID of the (virtual) pubsub service where the item has been |
957 @param node: virtual node corresponding to the AP actor and items | 976 published |
958 @param subscribe_comments_nodes: if True, comment nodes present in given items, | 977 @param node: (virtual) node corresponding where the item has been published |
959 they will be automatically subscribed | 978 @param subscribe_extra_nodes: if True, extra data nodes will be automatically |
979 subscribed, that is comment nodes if present and attachments nodes. | |
960 """ | 980 """ |
961 actor_id = await self.getAPActorIdFromAccount(ap_account) | 981 actor_id = await self.getAPActorIdFromAccount(ap_account) |
962 inbox = await self.getAPInboxFromId(actor_id) | 982 inbox = await self.getAPInboxFromId(actor_id) |
963 for item in items: | 983 for item in items: |
964 if item.name == "item": | 984 if item.name == "item": |
965 mb_data = await self._m.item2mbdata(client, item, service, node) | 985 mb_data = await self._m.item2mbdata(client, item, service, node) |
966 if subscribe_comments_nodes: | 986 if subscribe_extra_nodes: |
967 # we subscribe automatically to comment nodes if any | 987 # we subscribe automatically to comment nodes if any |
988 recipient_jid = self.getLocalJIDFromAccount(ap_account) | |
989 recipient_client = self.client.getVirtualClient(recipient_jid) | |
968 for comment_data in mb_data.get("comments", []): | 990 for comment_data in mb_data.get("comments", []): |
969 comment_service = jid.JID(comment_data["service"]) | 991 comment_service = jid.JID(comment_data["service"]) |
970 comment_node = comment_data["node"] | 992 comment_node = comment_data["node"] |
971 await self._p.subscribe(client, comment_service, comment_node) | 993 await self._p.subscribe( |
994 recipient_client, comment_service, comment_node | |
995 ) | |
996 try: | |
997 await self._pa.subscribe( | |
998 recipient_client, service, node, mb_data["id"] | |
999 ) | |
1000 except exceptions.NotFound: | |
1001 log.debug( | |
1002 f"no attachment node found for item {mb_data['id']!r} on " | |
1003 f"{node!r} at {service}" | |
1004 ) | |
972 ap_item = await self.mbdata2APitem(client, mb_data) | 1005 ap_item = await self.mbdata2APitem(client, mb_data) |
973 url_actor = ap_item["actor"] | 1006 url_actor = ap_item["actor"] |
974 elif item.name == "retract": | 1007 elif item.name == "retract": |
975 url_actor, ap_item = await self.apDeleteItem( | 1008 url_actor, ap_item = await self.apDeleteItem( |
976 client.jid, node, item["id"] | 1009 client.jid, node, item["id"] |
977 ) | 1010 ) |
978 else: | 1011 else: |
979 raise exceptions.InternalError(f"unexpected element: {item.toXml()}") | 1012 raise exceptions.InternalError(f"unexpected element: {item.toXml()}") |
980 resp = await self.signAndPost(inbox, url_actor, ap_item) | 1013 resp = await self.signAndPost(inbox, url_actor, ap_item) |
981 if resp.code >= 300: | 1014 |
982 text = await resp.text() | 1015 async def convertAndPostAttachments( |
1016 self, | |
1017 client: SatXMPPEntity, | |
1018 ap_account: str, | |
1019 service: jid.JID, | |
1020 node: str, | |
1021 items: List[domish.Element], | |
1022 publisher: Optional[jid.JID] | |
1023 ) -> None: | |
1024 """Convert XMPP item attachments to AP activities and post them to actor inbox | |
1025 | |
1026 @param ap_account: account of ActivityPub actor receiving the item | |
1027 @param service: JID of the (virtual) pubsub service where the item has been | |
1028 published | |
1029 @param node: (virtual) node corresponding where the item has been published | |
1030 @param subscribe_extra_nodes: if True, extra data nodes will be automatically | |
1031 subscribed, that is comment nodes if present and attachments nodes. | |
1032 """ | |
1033 if len(items) != 1: | |
1034 log.warning( | |
1035 "we should get exactly one attachment item for an entity, got " | |
1036 f"{len(items)})" | |
1037 ) | |
1038 | |
1039 actor_id = await self.getAPActorIdFromAccount(ap_account) | |
1040 inbox = await self.getAPInboxFromId(actor_id) | |
1041 | |
1042 item_elt = items[0] | |
1043 if publisher is not None: | |
1044 item_elt["publisher"] = publisher.userhost() | |
1045 item_service, item_node, item_id = self._pa.attachmentNode2Item(node) | |
1046 item_account = await self.getAPAccountFromJidAndNode(item_service, item_node) | |
1047 if item_service.host == self.client.jid.userhost(): | |
1048 # it's a virtual JID mapping to an external AP actor, we can use the | |
1049 # item_id directly | |
1050 item_url = item_id | |
1051 if not item_url.startswith("https:"): | |
983 log.warning( | 1052 log.warning( |
984 f"unexpected return code while sending AP item: {resp.code}\n{text}\n" | 1053 "item ID of external AP actor is not an https link, ignoring: " |
985 f"{pformat(ap_item)}" | 1054 f"{item_id!r}" |
986 ) | 1055 ) |
1056 return | |
1057 else: | |
1058 item_url = self.buildAPURL(TYPE_ITEM, item_account, item_id) | |
1059 | |
1060 old_attachment_pubsub_items = await self.host.memory.storage.searchPubsubItems({ | |
1061 "profiles": [self.client.profile], | |
1062 "services": [service], | |
1063 "names": [item_elt["id"]] | |
1064 }) | |
1065 if not old_attachment_pubsub_items: | |
1066 old_attachment = {} | |
1067 else: | |
1068 old_attachment_items = [i.data for i in old_attachment_pubsub_items] | |
1069 old_attachments = self._pa.items2attachmentData(client, old_attachment_items) | |
1070 try: | |
1071 old_attachment = old_attachments[0] | |
1072 except IndexError: | |
1073 # no known element was present in attachments | |
1074 old_attachment = {} | |
1075 sender_account = await self.getAPAccountFromJidAndNode( | |
1076 client.jid, | |
1077 None | |
1078 ) | |
1079 sender_actor_id = self.buildAPURL(TYPE_ACTOR, sender_account) | |
1080 try: | |
1081 attachments = self._pa.items2attachmentData(client, [item_elt])[0] | |
1082 except IndexError: | |
1083 # no known element was present in attachments | |
1084 attachments = {} | |
1085 | |
1086 if "noticed" in attachments: | |
1087 if not "noticed" in old_attachment: | |
1088 # new "noticed" attachment, we translate to "Like" activity | |
1089 activity_id = self.buildAPURL("like", item_account, item_id) | |
1090 like = self.createActivity( | |
1091 TYPE_LIKE, sender_actor_id, item_url, activity_id=activity_id | |
1092 ) | |
1093 like["to"] = [NS_AP_PUBLIC] | |
1094 await self.signAndPost(inbox, sender_actor_id, like) | |
1095 else: | |
1096 if "noticed" in old_attachment: | |
1097 # "noticed" attachment has been removed, we undo the "Like" activity | |
1098 activity_id = self.buildAPURL("like", item_account, item_id) | |
1099 like = self.createActivity( | |
1100 TYPE_LIKE, sender_actor_id, item_url, activity_id=activity_id | |
1101 ) | |
1102 like["to"] = [NS_AP_PUBLIC] | |
1103 undo = self.createActivity("Undo", sender_actor_id, like) | |
1104 await self.signAndPost(inbox, sender_actor_id, undo) | |
1105 | |
1106 if service.user and service.host == self.client.jid.userhost(): | |
1107 # the item is on a virtual service, we need to store it in cache | |
1108 log.debug("storing attachments item in cache") | |
1109 cached_node = await self.host.memory.storage.getPubsubNode( | |
1110 client, service, node, with_subscriptions=True, create=True | |
1111 ) | |
1112 await self.host.memory.storage.cachePubsubItems( | |
1113 self.client, | |
1114 cached_node, | |
1115 [item_elt], | |
1116 [attachments] | |
1117 ) | |
987 | 1118 |
988 async def signAndPost(self, url: str, actor_id: str, doc: dict) -> TReqResponse: | 1119 async def signAndPost(self, url: str, actor_id: str, doc: dict) -> TReqResponse: |
989 """Sign a documentent and post it to AP server | 1120 """Sign a documentent and post it to AP server |
990 | 1121 |
991 @param url: AP server endpoint | 1122 @param url: AP server endpoint |
1018 resp = await treq.post( | 1149 resp = await treq.post( |
1019 url, | 1150 url, |
1020 body, | 1151 body, |
1021 headers=headers, | 1152 headers=headers, |
1022 ) | 1153 ) |
1023 if resp.code >= 400: | 1154 if resp.code >= 300: |
1024 text = await resp.text() | 1155 text = await resp.text() |
1025 log.warning(f"POST request to {url} failed [{resp.code}]: {text}") | 1156 log.warning(f"POST request to {url} failed [{resp.code}]: {text}") |
1026 elif self.verbose: | 1157 elif self.verbose: |
1027 log.info(f"==> response code: {resp.code}") | 1158 log.info(f"==> response code: {resp.code}") |
1028 return resp | 1159 return resp |
1692 "node or service is missing in mb_data" | 1823 "node or service is missing in mb_data" |
1693 ) | 1824 ) |
1694 target_ap_account = await self.getAPAccountFromJidAndNode( | 1825 target_ap_account = await self.getAPAccountFromJidAndNode( |
1695 service, node | 1826 service, node |
1696 ) | 1827 ) |
1697 if service.host == self.client.jid.userhost: | 1828 if service.host == self.client.jid.userhost(): |
1698 # service is a proxy JID for AP account | 1829 # service is a proxy JID for AP account |
1699 actor_data = await self.getAPActorDataFromAccount(target_ap_account) | 1830 actor_data = await self.getAPActorDataFromAccount(target_ap_account) |
1700 followers = actor_data.get("followers") | 1831 followers = actor_data.get("followers") |
1701 else: | 1832 else: |
1702 # service is a real XMPP entity | 1833 # service is a real XMPP entity |
1756 raise exceptions.DataError("Can't get ActivityPub actor inbox") | 1887 raise exceptions.DataError("Can't get ActivityPub actor inbox") |
1757 | 1888 |
1758 item_data = await self.mbdata2APitem(client, mess_data) | 1889 item_data = await self.mbdata2APitem(client, mess_data) |
1759 url_actor = item_data["actor"] | 1890 url_actor = item_data["actor"] |
1760 resp = await self.signAndPost(inbox_url, url_actor, item_data) | 1891 resp = await self.signAndPost(inbox_url, url_actor, item_data) |
1761 if resp.code != 202: | |
1762 raise exceptions.NetworkError(f"unexpected return code: {resp.code}") | |
1763 | 1892 |
1764 async def apDeleteItem( | 1893 async def apDeleteItem( |
1765 self, | 1894 self, |
1766 jid_: jid.JID, | 1895 jid_: jid.JID, |
1767 node: Optional[str], | 1896 node: Optional[str], |
1907 inbox = await self.getAPInboxFromId(actor_id) | 2036 inbox = await self.getAPInboxFromId(actor_id) |
1908 url_actor, ap_item = await self.apDeleteItem( | 2037 url_actor, ap_item = await self.apDeleteItem( |
1909 from_jid.userhostJID(), None, fastened_elts.id, public=False | 2038 from_jid.userhostJID(), None, fastened_elts.id, public=False |
1910 ) | 2039 ) |
1911 resp = await self.signAndPost(inbox, url_actor, ap_item) | 2040 resp = await self.signAndPost(inbox, url_actor, ap_item) |
1912 if resp.code >= 300: | |
1913 text = await resp.text() | |
1914 log.warning( | |
1915 f"unexpected return code while sending AP item: {resp.code}\n{text}\n" | |
1916 f"{pformat(ap_item)}" | |
1917 ) | |
1918 return False | 2041 return False |
1919 | 2042 |
1920 async def _onReferenceReceived( | 2043 async def _onReferenceReceived( |
1921 self, | 2044 self, |
1922 client: SatXMPPEntity, | 2045 client: SatXMPPEntity, |
2001 }) | 2124 }) |
2002 | 2125 |
2003 inbox = await self.getAPInboxFromId(actor_id) | 2126 inbox = await self.getAPInboxFromId(actor_id) |
2004 | 2127 |
2005 resp = await self.signAndPost(inbox, ap_item["actor"], ap_item) | 2128 resp = await self.signAndPost(inbox, ap_item["actor"], ap_item) |
2006 if resp.code >= 300: | |
2007 text = await resp.text() | |
2008 log.warning( | |
2009 f"unexpected return code while sending AP item: {resp.code}\n{text}\n" | |
2010 f"{pformat(ap_item)}" | |
2011 ) | |
2012 | 2129 |
2013 return False | 2130 return False |
2014 | 2131 |
2015 async def newReplyToXMPPItem( | 2132 async def newReplyToXMPPItem( |
2016 self, | 2133 self, |
2261 # this item is a reply to an AP item, we use or create a corresponding node | 2378 # this item is a reply to an AP item, we use or create a corresponding node |
2262 # for comments | 2379 # for comments |
2263 parent_node, __ = await self.getCommentsNodes(item["id"], in_reply_to) | 2380 parent_node, __ = await self.getCommentsNodes(item["id"], in_reply_to) |
2264 node = parent_node or node | 2381 node = parent_node or node |
2265 cached_node = await self.host.memory.storage.getPubsubNode( | 2382 cached_node = await self.host.memory.storage.getPubsubNode( |
2266 client, service, node, with_subscriptions=True | 2383 client, service, node, with_subscriptions=True, create=True, |
2267 ) | 2384 create_kwargs={"subscribed": True} |
2268 if cached_node is None: | 2385 ) |
2269 try: | |
2270 cached_node = await self.host.memory.storage.setPubsubNode( | |
2271 client, | |
2272 service, | |
2273 node, | |
2274 subscribed=True | |
2275 ) | |
2276 except IntegrityError as e: | |
2277 if "unique" in str(e.orig).lower(): | |
2278 # the node may already exist, if it has been created just after | |
2279 # getPubsubNode above | |
2280 log.debug("ignoring UNIQUE constraint error") | |
2281 cached_node = await self.host.memory.storage.getPubsubNode( | |
2282 client, service, node, with_subscriptions=True | |
2283 ) | |
2284 else: | |
2285 raise e | |
2286 | |
2287 else: | 2386 else: |
2288 # it is a root item (i.e. not a reply to an other item) | 2387 # it is a root item (i.e. not a reply to an other item) |
2289 cached_node = await self.host.memory.storage.getPubsubNode( | 2388 cached_node = await self.host.memory.storage.getPubsubNode( |
2290 client, service, node, with_subscriptions=True | 2389 client, service, node, with_subscriptions=True |
2291 ) | 2390 ) |