comparison sat/plugins/plugin_comp_ap_gateway/__init__.py @ 3888:aa7197b67c26

component AP gateway: AP <=> XMPP reactions conversions: - Pubsub Attachments plugin has been renamed to XEP-0470 following publication - XEP-0470 has been updated to follow 0.2 changes - AP reactions (as implemented in Pleroma) are converted to XEP-0470 - XEP-0470 events are converted to AP reactions (again, using "EmojiReact" from Pleroma) - AP activities related to attachments (like/reactions) are cached in Libervia because it's not possible to retrieve them from Pleroma instances once they have been emitted (doing an HTTP get on their ID returns a 404). For now those cache are not flushed, this should be improved in the future. - `sharedInbox` is used when available. Pleroma returns a 500 HTTP error when ``to`` or ``cc`` are used in a direct inbox. - reactions and like are not currently used for direct messages, because they can't be emitted from Pleroma in this case, thus there is no point in implementing them for the moment. rel 371
author Goffi <goffi@goffi.org>
date Wed, 31 Aug 2022 17:07:03 +0200
parents 6da749bbf320
children 0aa7023dcd08
comparison
equal deleted inserted replaced
3887:6090141b1b70 3888:aa7197b67c26
69 TYPE_ITEM, 69 TYPE_ITEM,
70 TYPE_FOLLOWERS, 70 TYPE_FOLLOWERS,
71 TYPE_TOMBSTONE, 71 TYPE_TOMBSTONE,
72 TYPE_MENTION, 72 TYPE_MENTION,
73 TYPE_LIKE, 73 TYPE_LIKE,
74 TYPE_REACTION,
74 NS_AP, 75 NS_AP,
75 NS_AP_PUBLIC, 76 NS_AP_PUBLIC,
76 PUBLIC_TUPLE 77 PUBLIC_TUPLE
77 ) 78 )
78 from .regex import RE_MENTION 79 from .regex import RE_MENTION
90 C.PI_MODES: [C.PLUG_MODE_COMPONENT], 91 C.PI_MODES: [C.PLUG_MODE_COMPONENT],
91 C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, 92 C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
92 C.PI_PROTOCOLS: [], 93 C.PI_PROTOCOLS: [],
93 C.PI_DEPENDENCIES: [ 94 C.PI_DEPENDENCIES: [
94 "XEP-0054", "XEP-0060", "XEP-0084", "XEP-0106", "XEP-0277", "XEP-0292", 95 "XEP-0054", "XEP-0060", "XEP-0084", "XEP-0106", "XEP-0277", "XEP-0292",
95 "XEP-0329", "XEP-0372", "XEP-0424", "XEP-0465", "PUBSUB_CACHE", "TEXT_SYNTAXES", 96 "XEP-0329", "XEP-0372", "XEP-0424", "XEP-0465", "XEP-0470", "PUBSUB_CACHE",
96 "IDENTITY", "PUBSUB_ATTACHMENTS" 97 "TEXT_SYNTAXES", "IDENTITY"
97 ], 98 ],
98 C.PI_RECOMMENDATIONS: [], 99 C.PI_RECOMMENDATIONS: [],
99 C.PI_MAIN: "APGateway", 100 C.PI_MAIN: "APGateway",
100 C.PI_HANDLER: C.BOOL_TRUE, 101 C.PI_HANDLER: C.BOOL_TRUE,
101 C.PI_DESCRIPTION: _( 102 C.PI_DESCRIPTION: _(
130 self._r = host.plugins["XEP-0424"] 131 self._r = host.plugins["XEP-0424"]
131 self._pps = host.plugins["XEP-0465"] 132 self._pps = host.plugins["XEP-0465"]
132 self._c = host.plugins["PUBSUB_CACHE"] 133 self._c = host.plugins["PUBSUB_CACHE"]
133 self._t = host.plugins["TEXT_SYNTAXES"] 134 self._t = host.plugins["TEXT_SYNTAXES"]
134 self._i = host.plugins["IDENTITY"] 135 self._i = host.plugins["IDENTITY"]
135 self._pa = host.plugins["PUBSUB_ATTACHMENTS"] 136 self._pa = host.plugins["XEP-0470"]
136 self._p.addManagedNode( 137 self._p.addManagedNode(
137 "", 138 "",
138 items_cb=self._itemsReceived, 139 items_cb=self._itemsReceived,
139 # we want to be sure that the callbacks are launched before pubsub cache's 140 # we want to be sure that the callbacks are launched before pubsub cache's
140 # one, as we need to inspect items before they are actually removed from cache 141 # one, as we need to inspect items before they are actually removed from cache
333 } 334 }
334 ) 335 )
335 if resp.code >= 300: 336 if resp.code >= 300:
336 text = await resp.text() 337 text = await resp.text()
337 if resp.code == 404: 338 if resp.code == 404:
338 raise exceptions.NotFound() 339 raise exceptions.NotFound(f"Can't find resource at {url}")
339 else: 340 else:
340 msg = f"HTTP error {resp.code}: {text}" 341 msg = f"HTTP error {resp.code}: {text}"
341 raise exceptions.ExternalRequestError(msg) 342 raise exceptions.ExternalRequestError(msg)
342 try: 343 try:
343 return await treq.json_content(resp) 344 return await treq.json_content(resp)
1102 item_url = self.buildAPURL(TYPE_ITEM, item_account, item_id) 1103 item_url = self.buildAPURL(TYPE_ITEM, item_account, item_id)
1103 1104
1104 old_attachment_pubsub_items = await self.host.memory.storage.searchPubsubItems({ 1105 old_attachment_pubsub_items = await self.host.memory.storage.searchPubsubItems({
1105 "profiles": [self.client.profile], 1106 "profiles": [self.client.profile],
1106 "services": [service], 1107 "services": [service],
1108 "nodes": [node],
1107 "names": [item_elt["id"]] 1109 "names": [item_elt["id"]]
1108 }) 1110 })
1109 if not old_attachment_pubsub_items: 1111 if not old_attachment_pubsub_items:
1110 old_attachment = {} 1112 old_attachment = {}
1111 else: 1113 else:
1125 attachments = self._pa.items2attachmentData(client, [item_elt])[0] 1127 attachments = self._pa.items2attachmentData(client, [item_elt])[0]
1126 except IndexError: 1128 except IndexError:
1127 # no known element was present in attachments 1129 # no known element was present in attachments
1128 attachments = {} 1130 attachments = {}
1129 1131
1132 # noticed
1130 if "noticed" in attachments: 1133 if "noticed" in attachments:
1131 if not "noticed" in old_attachment: 1134 if not "noticed" in old_attachment:
1132 # new "noticed" attachment, we translate to "Like" activity 1135 # new "noticed" attachment, we translate to "Like" activity
1133 activity_id = self.buildAPURL("like", item_account, item_id) 1136 activity_id = self.buildAPURL("like", item_account, item_id)
1134 like = self.createActivity( 1137 like = self.createActivity(
1135 TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id 1138 TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id
1136 ) 1139 )
1137 like["to"] = [NS_AP_PUBLIC] 1140 like["to"] = [ap_account]
1141 like["cc"] = [NS_AP_PUBLIC]
1138 await self.signAndPost(inbox, publisher_actor_id, like) 1142 await self.signAndPost(inbox, publisher_actor_id, like)
1139 else: 1143 else:
1140 if "noticed" in old_attachment: 1144 if "noticed" in old_attachment:
1141 # "noticed" attachment has been removed, we undo the "Like" activity 1145 # "noticed" attachment has been removed, we undo the "Like" activity
1142 activity_id = self.buildAPURL("like", item_account, item_id) 1146 activity_id = self.buildAPURL("like", item_account, item_id)
1143 like = self.createActivity( 1147 like = self.createActivity(
1144 TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id 1148 TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id
1145 ) 1149 )
1146 like["to"] = [NS_AP_PUBLIC] 1150 like["to"] = [ap_account]
1151 like["cc"] = [NS_AP_PUBLIC]
1147 undo = self.createActivity("Undo", publisher_actor_id, like) 1152 undo = self.createActivity("Undo", publisher_actor_id, like)
1148 await self.signAndPost(inbox, publisher_actor_id, undo) 1153 await self.signAndPost(inbox, publisher_actor_id, undo)
1154
1155 # reactions
1156 new_reactions = set(attachments.get("reactions", {}).get("reactions", []))
1157 old_reactions = set(old_attachment.get("reactions", {}).get("reactions", []))
1158 reactions_remove = old_reactions - new_reactions
1159 reactions_add = new_reactions - old_reactions
1160 for reactions, undo in ((reactions_remove, True), (reactions_add, False)):
1161 for reaction in reactions:
1162 activity_id = self.buildAPURL(
1163 "reaction", item_account, item_id, reaction.encode().hex()
1164 )
1165 reaction_activity = self.createActivity(
1166 TYPE_REACTION, publisher_actor_id, item_url,
1167 activity_id=activity_id
1168 )
1169 reaction_activity["content"] = reaction
1170 reaction_activity["to"] = [ap_account]
1171 reaction_activity["cc"] = [NS_AP_PUBLIC]
1172 if undo:
1173 activy = self.createActivity(
1174 "Undo", publisher_actor_id, reaction_activity
1175 )
1176 else:
1177 activy = reaction_activity
1178 await self.signAndPost(inbox, publisher_actor_id, activy)
1149 1179
1150 if service.user and self.isVirtualJID(service): 1180 if service.user and self.isVirtualJID(service):
1151 # the item is on a virtual service, we need to store it in cache 1181 # the item is on a virtual service, we need to store it in cache
1152 log.debug("storing attachments item in cache") 1182 log.debug("storing attachments item in cache")
1153 cached_node = await self.host.memory.storage.getPubsubNode( 1183 cached_node = await self.host.memory.storage.getPubsubNode(
1260 @param account: ActivityPub Actor identifier 1290 @param account: ActivityPub Actor identifier
1261 """ 1291 """
1262 href = await self.getAPActorIdFromAccount(account) 1292 href = await self.getAPActorIdFromAccount(account)
1263 return await self.apGet(href) 1293 return await self.apGet(href)
1264 1294
1265 async def getAPInboxFromId(self, actor_id: str) -> str: 1295 async def getAPInboxFromId(self, actor_id: str, use_shared: bool = True) -> str:
1266 """Retrieve inbox of an actor_id""" 1296 """Retrieve inbox of an actor_id
1297
1298 @param use_shared: if True, and a shared inbox exists, it will be used instead of
1299 the user inbox
1300 """
1267 data = await self.getActorData(actor_id) 1301 data = await self.getActorData(actor_id)
1302 if use_shared:
1303 try:
1304 return data["endpoints"]["sharedInbox"]
1305 except KeyError:
1306 pass
1268 return data["inbox"] 1307 return data["inbox"]
1269 1308
1270 @async_lru(maxsize=LRU_MAX_SIZE) 1309 @async_lru(maxsize=LRU_MAX_SIZE)
1271 async def getAPAccountFromId(self, actor_id: str) -> str: 1310 async def getAPAccountFromId(self, actor_id: str) -> str:
1272 """Retrieve AP account from the ID URL 1311 """Retrieve AP account from the ID URL
2037 ) 2076 )
2038 return mess_data 2077 return mess_data
2039 2078
2040 actor_account = self._e.unescape(mess_data["to"].user) 2079 actor_account = self._e.unescape(mess_data["to"].user)
2041 actor_id = await self.getAPActorIdFromAccount(actor_account) 2080 actor_id = await self.getAPActorIdFromAccount(actor_account)
2042 inbox = await self.getAPInboxFromId(actor_id) 2081 inbox = await self.getAPInboxFromId(actor_id, use_shared=False)
2043 2082
2044 try: 2083 try:
2045 language, message = next(iter(mess_data["message"].items())) 2084 language, message = next(iter(mess_data["message"].items()))
2046 except (KeyError, StopIteration): 2085 except (KeyError, StopIteration):
2047 log.warning(f"ignoring empty message: {mess_data}") 2086 log.warning(f"ignoring empty message: {mess_data}")
2083 raise exceptions.InternalError( 2122 raise exceptions.InternalError(
2084 f"Invalid destinee's JID: {to_jid.full()}" 2123 f"Invalid destinee's JID: {to_jid.full()}"
2085 ) 2124 )
2086 ap_account = self._e.unescape(to_jid.user) 2125 ap_account = self._e.unescape(to_jid.user)
2087 actor_id = await self.getAPActorIdFromAccount(ap_account) 2126 actor_id = await self.getAPActorIdFromAccount(ap_account)
2088 inbox = await self.getAPInboxFromId(actor_id) 2127 inbox = await self.getAPInboxFromId(actor_id, use_shared=False)
2089 url_actor, ap_item = await self.apDeleteItem( 2128 url_actor, ap_item = await self.apDeleteItem(
2090 from_jid.userhostJID(), None, fastened_elts.id, public=False 2129 from_jid.userhostJID(), None, fastened_elts.id, public=False
2091 ) 2130 )
2092 resp = await self.signAndPost(inbox, url_actor, ap_item) 2131 resp = await self.signAndPost(inbox, url_actor, ap_item)
2093 return False 2132 return False
2173 "type": TYPE_MENTION, 2212 "type": TYPE_MENTION,
2174 "href": actor_id, 2213 "href": actor_id,
2175 "name": ap_account, 2214 "name": ap_account,
2176 }) 2215 })
2177 2216
2178 inbox = await self.getAPInboxFromId(actor_id) 2217 inbox = await self.getAPInboxFromId(actor_id, use_shared=False)
2179 2218
2180 resp = await self.signAndPost(inbox, ap_item["actor"], ap_item) 2219 resp = await self.signAndPost(inbox, ap_item["actor"], ap_item)
2181 2220
2182 return False 2221 return False
2183 2222