comparison sat/plugins/plugin_comp_ap_gateway/__init__.py @ 3865:59fbb66b2923

component AP gateway: handle XMPP attachments -> AP likes conversion: convert `noticed` attachments coming from XMPP to suitable `Like` activities. Virtual node is created in cache when necessary, and virtual items published to virtual JID mapping AP accounts are cached too. rel 370
author Goffi <goffi@goffi.org>
date Wed, 20 Jul 2022 17:53:12 +0200
parents bc7f9d0a404f
children c0bcbcf5b4b7
comparison
equal deleted inserted replaced
3864:ac255a0fbd4c 3865:59fbb66b2923
68 TYPE_ACTOR, 68 TYPE_ACTOR,
69 TYPE_ITEM, 69 TYPE_ITEM,
70 TYPE_FOLLOWERS, 70 TYPE_FOLLOWERS,
71 TYPE_TOMBSTONE, 71 TYPE_TOMBSTONE,
72 TYPE_MENTION, 72 TYPE_MENTION,
73 TYPE_LIKE,
73 NS_AP, 74 NS_AP,
74 NS_AP_PUBLIC, 75 NS_AP_PUBLIC,
75 PUBLIC_TUPLE 76 PUBLIC_TUPLE
76 ) 77 )
77 from .regex import RE_MENTION 78 from .regex import RE_MENTION
88 C.PI_IMPORT_NAME: IMPORT_NAME, 89 C.PI_IMPORT_NAME: IMPORT_NAME,
89 C.PI_MODES: [C.PLUG_MODE_COMPONENT], 90 C.PI_MODES: [C.PLUG_MODE_COMPONENT],
90 C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, 91 C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
91 C.PI_PROTOCOLS: [], 92 C.PI_PROTOCOLS: [],
92 C.PI_DEPENDENCIES: [ 93 C.PI_DEPENDENCIES: [
93 "XEP-0060", "XEP-0084", "XEP-0106", "XEP-0277", "XEP-0292", "XEP-0329", 94 "XEP-0054", "XEP-0060", "XEP-0084", "XEP-0106", "XEP-0277", "XEP-0292",
94 "XEP-0372", "XEP-0424", "XEP-0465", "PUBSUB_CACHE", "TEXT_SYNTAXES", "IDENTITY", 95 "XEP-0329", "XEP-0372", "XEP-0424", "XEP-0465", "PUBSUB_CACHE", "TEXT_SYNTAXES",
95 "XEP-0054" 96 "IDENTITY", "PUBSUB_ATTACHMENTS"
96 ], 97 ],
97 C.PI_RECOMMENDATIONS: [], 98 C.PI_RECOMMENDATIONS: [],
98 C.PI_MAIN: "APGateway", 99 C.PI_MAIN: "APGateway",
99 C.PI_HANDLER: C.BOOL_TRUE, 100 C.PI_HANDLER: C.BOOL_TRUE,
100 C.PI_DESCRIPTION: _( 101 C.PI_DESCRIPTION: _(
129 self._r = host.plugins["XEP-0424"] 130 self._r = host.plugins["XEP-0424"]
130 self._pps = host.plugins["XEP-0465"] 131 self._pps = host.plugins["XEP-0465"]
131 self._c = host.plugins["PUBSUB_CACHE"] 132 self._c = host.plugins["PUBSUB_CACHE"]
132 self._t = host.plugins["TEXT_SYNTAXES"] 133 self._t = host.plugins["TEXT_SYNTAXES"]
133 self._i = host.plugins["IDENTITY"] 134 self._i = host.plugins["IDENTITY"]
135 self._pa = host.plugins["PUBSUB_ATTACHMENTS"]
134 self._p.addManagedNode( 136 self._p.addManagedNode(
135 "", 137 "",
136 items_cb=self._itemsReceived, 138 items_cb=self._itemsReceived,
137 # we want to be sure that the callbacks are launched before pubsub cache's 139 # we want to be sure that the callbacks are launched before pubsub cache's
138 # one, as we need to inspect items before they are actually removed from cache 140 # one, as we need to inspect items before they are actually removed from cache
285 if not recipient.user: 287 if not recipient.user:
286 log.debug("ignoring items event without local part specified") 288 log.debug("ignoring items event without local part specified")
287 return 289 return
288 290
289 ap_account = self._e.unescape(recipient.user) 291 ap_account = self._e.unescape(recipient.user)
290 await self.convertAndPostItems( 292
291 client, ap_account, recipient, itemsEvent.nodeIdentifier, itemsEvent.items 293 if self._pa.isAttachmentNode(itemsEvent.nodeIdentifier):
292 ) 294 await self.convertAndPostAttachments(
295 client, ap_account, itemsEvent.sender, itemsEvent.nodeIdentifier,
296 itemsEvent.items, publisher=itemsEvent.sender
297 )
298 else:
299 await self.convertAndPostItems(
300 client, ap_account, itemsEvent.sender, itemsEvent.nodeIdentifier,
301 itemsEvent.items
302 )
293 303
294 async def getVirtualClient(self, actor_id: str) -> SatXMPPEntity: 304 async def getVirtualClient(self, actor_id: str) -> SatXMPPEntity:
295 """Get client for this component with a specified jid 305 """Get client for this component with a specified jid
296 306
297 This is needed to perform operations with the virtual JID corresponding to the AP 307 This is needed to perform operations with the virtual JID corresponding to the AP
404 else: 414 else:
405 found_item = cached_items[0].data 415 found_item = cached_items[0].data
406 416
407 if found_item is None: 417 if found_item is None:
408 # the node is not in cache, we have to make a request to retrieve the item 418 # the node is not in cache, we have to make a request to retrieve the item
409 # if doesn't exist, getItems will raise a NotFound exception 419 # If the node doesn't exist, getItems will raise a NotFound exception
410 found_items, __ = await self._p.getItems( 420 found_items, __ = await self._p.getItems(
411 self.client, author_jid, node, item_ids=[item_id] 421 self.client, author_jid, node, item_ids=[item_id]
412 ) 422 )
413 found_item = found_items[0] 423 try:
424 found_item = found_items[0]
425 except KeyError:
426 raise exceptions.NotFound("requested items can't be found")
414 427
415 mb_data = await self._m.item2mbdata( 428 mb_data = await self._m.item2mbdata(
416 self.client, found_item, author_jid, node 429 self.client, found_item, author_jid, node
417 ) 430 )
418 ap_item = await self.mbdata2APitem(self.client, mb_data) 431 ap_item = await self.mbdata2APitem(self.client, mb_data)
832 **kwargs, 845 **kwargs,
833 ) -> Dict[str, Any]: 846 ) -> Dict[str, Any]:
834 """Generate base data for an activity 847 """Generate base data for an activity
835 848
836 @param activity: one of ACTIVITY_TYPES 849 @param activity: one of ACTIVITY_TYPES
850 @param actor_id: AP actor ID of the sender
851 @param object_: content of "object" field
852 @param target: content of "target" field
853 @param activity_id: ID to use for the activity
854 if not set it will be automatically generated, but it is usually desirable to
855 set the ID manually so it can be retrieved (e.g. for Undo)
837 """ 856 """
838 if activity not in ACTIVITY_TYPES: 857 if activity not in ACTIVITY_TYPES:
839 raise exceptions.InternalError(f"invalid activity: {activity!r}") 858 raise exceptions.InternalError(f"invalid activity: {activity!r}")
840 if object_ is None and activity in ACTIVITY_OBJECT_MANDATORY: 859 if object_ is None and activity in ACTIVITY_OBJECT_MANDATORY:
841 raise exceptions.InternalError( 860 raise exceptions.InternalError(
946 client: SatXMPPEntity, 965 client: SatXMPPEntity,
947 ap_account: str, 966 ap_account: str,
948 service: jid.JID, 967 service: jid.JID,
949 node: str, 968 node: str,
950 items: List[domish.Element], 969 items: List[domish.Element],
951 subscribe_comments_nodes: bool = False, 970 subscribe_extra_nodes: bool = True,
952 ) -> None: 971 ) -> None:
953 """Convert XMPP items to AP items and post them to actor inbox 972 """Convert XMPP items to AP items and post them to actor inbox
954 973
955 @param ap_account: account of ActivityPub actor 974 @param ap_account: account of ActivityPub actor receiving the item
956 @param service: JID of the virtual pubsub service corresponding to the AP actor 975 @param service: JID of the (virtual) pubsub service where the item has been
957 @param node: virtual node corresponding to the AP actor and items 976 published
958 @param subscribe_comments_nodes: if True, comment nodes present in given items, 977 @param node: (virtual) node where the item has been published
959 they will be automatically subscribed 978 @param subscribe_extra_nodes: if True, extra data nodes will be automatically
979 subscribed, that is comment nodes if present and attachments nodes.
960 """ 980 """
961 actor_id = await self.getAPActorIdFromAccount(ap_account) 981 actor_id = await self.getAPActorIdFromAccount(ap_account)
962 inbox = await self.getAPInboxFromId(actor_id) 982 inbox = await self.getAPInboxFromId(actor_id)
963 for item in items: 983 for item in items:
964 if item.name == "item": 984 if item.name == "item":
965 mb_data = await self._m.item2mbdata(client, item, service, node) 985 mb_data = await self._m.item2mbdata(client, item, service, node)
966 if subscribe_comments_nodes: 986 if subscribe_extra_nodes:
967 # we subscribe automatically to comment nodes if any 987 # we subscribe automatically to comment nodes if any
988 recipient_jid = self.getLocalJIDFromAccount(ap_account)
989 recipient_client = self.client.getVirtualClient(recipient_jid)
968 for comment_data in mb_data.get("comments", []): 990 for comment_data in mb_data.get("comments", []):
969 comment_service = jid.JID(comment_data["service"]) 991 comment_service = jid.JID(comment_data["service"])
970 comment_node = comment_data["node"] 992 comment_node = comment_data["node"]
971 await self._p.subscribe(client, comment_service, comment_node) 993 await self._p.subscribe(
994 recipient_client, comment_service, comment_node
995 )
996 try:
997 await self._pa.subscribe(
998 recipient_client, service, node, mb_data["id"]
999 )
1000 except exceptions.NotFound:
1001 log.debug(
1002 f"no attachment node found for item {mb_data['id']!r} on "
1003 f"{node!r} at {service}"
1004 )
972 ap_item = await self.mbdata2APitem(client, mb_data) 1005 ap_item = await self.mbdata2APitem(client, mb_data)
973 url_actor = ap_item["actor"] 1006 url_actor = ap_item["actor"]
974 elif item.name == "retract": 1007 elif item.name == "retract":
975 url_actor, ap_item = await self.apDeleteItem( 1008 url_actor, ap_item = await self.apDeleteItem(
976 client.jid, node, item["id"] 1009 client.jid, node, item["id"]
977 ) 1010 )
978 else: 1011 else:
979 raise exceptions.InternalError(f"unexpected element: {item.toXml()}") 1012 raise exceptions.InternalError(f"unexpected element: {item.toXml()}")
980 resp = await self.signAndPost(inbox, url_actor, ap_item) 1013 resp = await self.signAndPost(inbox, url_actor, ap_item)
981 if resp.code >= 300: 1014
982 text = await resp.text() 1015 async def convertAndPostAttachments(
1016 self,
1017 client: SatXMPPEntity,
1018 ap_account: str,
1019 service: jid.JID,
1020 node: str,
1021 items: List[domish.Element],
1022 publisher: Optional[jid.JID]
1023 ) -> None:
1024 """Convert XMPP item attachments to AP activities and post them to actor inbox
1025
1026 @param ap_account: account of ActivityPub actor receiving the item
1027 @param service: JID of the (virtual) pubsub service where the item has been
1028 published
1029 @param node: (virtual) node where the item has been published
1030 @param publisher: JID of the entity which published the attachment item; when
1031 set, its bare JID is written to the "publisher" attribute of the item element.
1032 """
1033 if len(items) != 1:
1034 log.warning(
1035 "we should get exactly one attachment item for an entity, got "
1036 f"{len(items)})"
1037 )
1038
1039 actor_id = await self.getAPActorIdFromAccount(ap_account)
1040 inbox = await self.getAPInboxFromId(actor_id)
1041
1042 item_elt = items[0]
1043 if publisher is not None:
1044 item_elt["publisher"] = publisher.userhost()
1045 item_service, item_node, item_id = self._pa.attachmentNode2Item(node)
1046 item_account = await self.getAPAccountFromJidAndNode(item_service, item_node)
1047 if item_service.host == self.client.jid.userhost():
1048 # it's a virtual JID mapping to an external AP actor, we can use the
1049 # item_id directly
1050 item_url = item_id
1051 if not item_url.startswith("https:"):
983 log.warning( 1052 log.warning(
984 f"unexpected return code while sending AP item: {resp.code}\n{text}\n" 1053 "item ID of external AP actor is not an https link, ignoring: "
985 f"{pformat(ap_item)}" 1054 f"{item_id!r}"
986 ) 1055 )
1056 return
1057 else:
1058 item_url = self.buildAPURL(TYPE_ITEM, item_account, item_id)
1059
1060 old_attachment_pubsub_items = await self.host.memory.storage.searchPubsubItems({
1061 "profiles": [self.client.profile],
1062 "services": [service],
1063 "names": [item_elt["id"]]
1064 })
1065 if not old_attachment_pubsub_items:
1066 old_attachment = {}
1067 else:
1068 old_attachment_items = [i.data for i in old_attachment_pubsub_items]
1069 old_attachments = self._pa.items2attachmentData(client, old_attachment_items)
1070 try:
1071 old_attachment = old_attachments[0]
1072 except IndexError:
1073 # no known element was present in attachments
1074 old_attachment = {}
1075 sender_account = await self.getAPAccountFromJidAndNode(
1076 client.jid,
1077 None
1078 )
1079 sender_actor_id = self.buildAPURL(TYPE_ACTOR, sender_account)
1080 try:
1081 attachments = self._pa.items2attachmentData(client, [item_elt])[0]
1082 except IndexError:
1083 # no known element was present in attachments
1084 attachments = {}
1085
1086 if "noticed" in attachments:
1087 if not "noticed" in old_attachment:
1088 # new "noticed" attachment, we translate to "Like" activity
1089 activity_id = self.buildAPURL("like", item_account, item_id)
1090 like = self.createActivity(
1091 TYPE_LIKE, sender_actor_id, item_url, activity_id=activity_id
1092 )
1093 like["to"] = [NS_AP_PUBLIC]
1094 await self.signAndPost(inbox, sender_actor_id, like)
1095 else:
1096 if "noticed" in old_attachment:
1097 # "noticed" attachment has been removed, we undo the "Like" activity
1098 activity_id = self.buildAPURL("like", item_account, item_id)
1099 like = self.createActivity(
1100 TYPE_LIKE, sender_actor_id, item_url, activity_id=activity_id
1101 )
1102 like["to"] = [NS_AP_PUBLIC]
1103 undo = self.createActivity("Undo", sender_actor_id, like)
1104 await self.signAndPost(inbox, sender_actor_id, undo)
1105
1106 if service.user and service.host == self.client.jid.userhost():
1107 # the item is on a virtual service, we need to store it in cache
1108 log.debug("storing attachments item in cache")
1109 cached_node = await self.host.memory.storage.getPubsubNode(
1110 client, service, node, with_subscriptions=True, create=True
1111 )
1112 await self.host.memory.storage.cachePubsubItems(
1113 self.client,
1114 cached_node,
1115 [item_elt],
1116 [attachments]
1117 )
987 1118
988 async def signAndPost(self, url: str, actor_id: str, doc: dict) -> TReqResponse: 1119 async def signAndPost(self, url: str, actor_id: str, doc: dict) -> TReqResponse:
989 """Sign a document and post it to AP server 1120 """Sign a document and post it to AP server
990 1121
991 @param url: AP server endpoint 1122 @param url: AP server endpoint
1018 resp = await treq.post( 1149 resp = await treq.post(
1019 url, 1150 url,
1020 body, 1151 body,
1021 headers=headers, 1152 headers=headers,
1022 ) 1153 )
1023 if resp.code >= 400: 1154 if resp.code >= 300:
1024 text = await resp.text() 1155 text = await resp.text()
1025 log.warning(f"POST request to {url} failed [{resp.code}]: {text}") 1156 log.warning(f"POST request to {url} failed [{resp.code}]: {text}")
1026 elif self.verbose: 1157 elif self.verbose:
1027 log.info(f"==> response code: {resp.code}") 1158 log.info(f"==> response code: {resp.code}")
1028 return resp 1159 return resp
1692 "node or service is missing in mb_data" 1823 "node or service is missing in mb_data"
1693 ) 1824 )
1694 target_ap_account = await self.getAPAccountFromJidAndNode( 1825 target_ap_account = await self.getAPAccountFromJidAndNode(
1695 service, node 1826 service, node
1696 ) 1827 )
1697 if service.host == self.client.jid.userhost: 1828 if service.host == self.client.jid.userhost():
1698 # service is a proxy JID for AP account 1829 # service is a proxy JID for AP account
1699 actor_data = await self.getAPActorDataFromAccount(target_ap_account) 1830 actor_data = await self.getAPActorDataFromAccount(target_ap_account)
1700 followers = actor_data.get("followers") 1831 followers = actor_data.get("followers")
1701 else: 1832 else:
1702 # service is a real XMPP entity 1833 # service is a real XMPP entity
1756 raise exceptions.DataError("Can't get ActivityPub actor inbox") 1887 raise exceptions.DataError("Can't get ActivityPub actor inbox")
1757 1888
1758 item_data = await self.mbdata2APitem(client, mess_data) 1889 item_data = await self.mbdata2APitem(client, mess_data)
1759 url_actor = item_data["actor"] 1890 url_actor = item_data["actor"]
1760 resp = await self.signAndPost(inbox_url, url_actor, item_data) 1891 resp = await self.signAndPost(inbox_url, url_actor, item_data)
1761 if resp.code != 202:
1762 raise exceptions.NetworkError(f"unexpected return code: {resp.code}")
1763 1892
1764 async def apDeleteItem( 1893 async def apDeleteItem(
1765 self, 1894 self,
1766 jid_: jid.JID, 1895 jid_: jid.JID,
1767 node: Optional[str], 1896 node: Optional[str],
1907 inbox = await self.getAPInboxFromId(actor_id) 2036 inbox = await self.getAPInboxFromId(actor_id)
1908 url_actor, ap_item = await self.apDeleteItem( 2037 url_actor, ap_item = await self.apDeleteItem(
1909 from_jid.userhostJID(), None, fastened_elts.id, public=False 2038 from_jid.userhostJID(), None, fastened_elts.id, public=False
1910 ) 2039 )
1911 resp = await self.signAndPost(inbox, url_actor, ap_item) 2040 resp = await self.signAndPost(inbox, url_actor, ap_item)
1912 if resp.code >= 300:
1913 text = await resp.text()
1914 log.warning(
1915 f"unexpected return code while sending AP item: {resp.code}\n{text}\n"
1916 f"{pformat(ap_item)}"
1917 )
1918 return False 2041 return False
1919 2042
1920 async def _onReferenceReceived( 2043 async def _onReferenceReceived(
1921 self, 2044 self,
1922 client: SatXMPPEntity, 2045 client: SatXMPPEntity,
2001 }) 2124 })
2002 2125
2003 inbox = await self.getAPInboxFromId(actor_id) 2126 inbox = await self.getAPInboxFromId(actor_id)
2004 2127
2005 resp = await self.signAndPost(inbox, ap_item["actor"], ap_item) 2128 resp = await self.signAndPost(inbox, ap_item["actor"], ap_item)
2006 if resp.code >= 300:
2007 text = await resp.text()
2008 log.warning(
2009 f"unexpected return code while sending AP item: {resp.code}\n{text}\n"
2010 f"{pformat(ap_item)}"
2011 )
2012 2129
2013 return False 2130 return False
2014 2131
2015 async def newReplyToXMPPItem( 2132 async def newReplyToXMPPItem(
2016 self, 2133 self,
2261 # this item is a reply to an AP item, we use or create a corresponding node 2378 # this item is a reply to an AP item, we use or create a corresponding node
2262 # for comments 2379 # for comments
2263 parent_node, __ = await self.getCommentsNodes(item["id"], in_reply_to) 2380 parent_node, __ = await self.getCommentsNodes(item["id"], in_reply_to)
2264 node = parent_node or node 2381 node = parent_node or node
2265 cached_node = await self.host.memory.storage.getPubsubNode( 2382 cached_node = await self.host.memory.storage.getPubsubNode(
2266 client, service, node, with_subscriptions=True 2383 client, service, node, with_subscriptions=True, create=True,
2267 ) 2384 create_kwargs={"subscribed": True}
2268 if cached_node is None: 2385 )
2269 try:
2270 cached_node = await self.host.memory.storage.setPubsubNode(
2271 client,
2272 service,
2273 node,
2274 subscribed=True
2275 )
2276 except IntegrityError as e:
2277 if "unique" in str(e.orig).lower():
2278 # the node may already exist, if it has been created just after
2279 # getPubsubNode above
2280 log.debug("ignoring UNIQUE constraint error")
2281 cached_node = await self.host.memory.storage.getPubsubNode(
2282 client, service, node, with_subscriptions=True
2283 )
2284 else:
2285 raise e
2286
2287 else: 2386 else:
2288 # it is a root item (i.e. not a reply to another item) 2387 # it is a root item (i.e. not a reply to another item)
2289 cached_node = await self.host.memory.storage.getPubsubNode( 2388 cached_node = await self.host.memory.storage.getPubsubNode(
2290 client, service, node, with_subscriptions=True 2389 client, service, node, with_subscriptions=True
2291 ) 2390 )