comparison sat/plugins/plugin_comp_ap_gateway/__init__.py @ 3764:125c7043b277

comp AP gateway: publish, (un)subscribe/(un)follow, public subscription/following/followers: this patch implements the following major features: - `publish` is implemented on the virtual pubsub service, thus XMPP entities can now publish to AP using this service. - Replies to XMPP items are managed. - `inReplyTo` is filled when converting XMPP items to AP objects. - `follow` and `unfollow` (actually an `undo` activity) are implemented and mapped to XMPP's (un)subscribe. On subscription, the AP actor's `outbox` collection is converted to XMPP and put in cache. Subscriptions are always public. - `following` and `followers` collections are mapped to XMPP's Public Pubsub Subscription (which should be XEP-0465, but the XEP is not yet published at the time of commit), in both directions. - New helper methods to check if a URL is local and to get a JID from an actor ID. Doc will follow to explain behaviour. rel 365
author Goffi <goffi@goffi.org>
date Fri, 13 May 2022 19:12:33 +0200
parents a8c7e5cef0cb
children efc34a89e70b
comparison
equal deleted inserted replaced
3763:b2ade5ecdbab 3764:125c7043b277
21 import hashlib 21 import hashlib
22 import json 22 import json
23 from pathlib import Path 23 from pathlib import Path
24 from pprint import pformat 24 from pprint import pformat
25 import re 25 import re
26 from typing import Any, Dict, List, Optional, Tuple, Union, overload 26 from typing import Any, Dict, List, Optional, Tuple, Union, Callable, Awaitable, overload
27 from urllib import parse 27 from urllib import parse
28 28
29 from cryptography.exceptions import InvalidSignature 29 from cryptography.exceptions import InvalidSignature
30 from cryptography.hazmat.primitives import serialization 30 from cryptography.hazmat.primitives import serialization
31 from cryptography.hazmat.primitives import hashes 31 from cryptography.hazmat.primitives import hashes
45 from sat.core import exceptions 45 from sat.core import exceptions
46 from sat.core.constants import Const as C 46 from sat.core.constants import Const as C
47 from sat.core.core_types import SatXMPPEntity 47 from sat.core.core_types import SatXMPPEntity
48 from sat.core.i18n import _ 48 from sat.core.i18n import _
49 from sat.core.log import getLogger 49 from sat.core.log import getLogger
50 from sat.memory.sqla_mapping import PubsubSub, SubscriptionState 50 from sat.memory.sqla_mapping import SubscriptionState
51 from sat.tools import utils 51 from sat.tools import utils
52 from sat.tools.common import data_format, tls, uri 52 from sat.tools.common import data_format, tls, uri
53 from sat.tools.common.async_utils import async_lru 53 from sat.tools.common.async_utils import async_lru
54 54
55 from .constants import ( 55 from .constants import (
78 C.PI_NAME: "ActivityPub Gateway component", 78 C.PI_NAME: "ActivityPub Gateway component",
79 C.PI_IMPORT_NAME: IMPORT_NAME, 79 C.PI_IMPORT_NAME: IMPORT_NAME,
80 C.PI_MODES: [C.PLUG_MODE_COMPONENT], 80 C.PI_MODES: [C.PLUG_MODE_COMPONENT],
81 C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, 81 C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
82 C.PI_PROTOCOLS: [], 82 C.PI_PROTOCOLS: [],
83 C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0277", "XEP-0060", "PUBSUB_CACHE"], 83 C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0277", "XEP-0060", "XEP-0465", "PUBSUB_CACHE"],
84 C.PI_RECOMMENDATIONS: [], 84 C.PI_RECOMMENDATIONS: [],
85 C.PI_MAIN: "APGateway", 85 C.PI_MAIN: "APGateway",
86 C.PI_HANDLER: C.BOOL_TRUE, 86 C.PI_HANDLER: C.BOOL_TRUE,
87 C.PI_DESCRIPTION: _( 87 C.PI_DESCRIPTION: _(
88 "Gateway for bidirectional communication between XMPP and ActivityPub." 88 "Gateway for bidirectional communication between XMPP and ActivityPub."
98 class APGateway: 98 class APGateway:
99 99
100 def __init__(self, host): 100 def __init__(self, host):
101 self.host = host 101 self.host = host
102 self.initialised = False 102 self.initialised = False
103 self.client = None
103 self._m = host.plugins["XEP-0277"] 104 self._m = host.plugins["XEP-0277"]
104 self._p = host.plugins["XEP-0060"] 105 self._p = host.plugins["XEP-0060"]
105 self._e = host.plugins["XEP-0106"] 106 self._e = host.plugins["XEP-0106"]
107 self._pps = host.plugins["XEP-0465"]
106 self._c = host.plugins["PUBSUB_CACHE"] 108 self._c = host.plugins["PUBSUB_CACHE"]
109 self._p.addManagedNode(
110 "", items_cb=self._itemsReceived
111 )
107 self.pubsub_service = APPubsubService(self) 112 self.pubsub_service = APPubsubService(self)
108 113
109 host.bridge.addMethod( 114 host.bridge.addMethod(
110 "APSend", 115 "APSend",
111 ".plugin", 116 ".plugin",
211 216
212 async def profileConnecting(self, client): 217 async def profileConnecting(self, client):
213 self.client = client 218 self.client = client
214 await self.init(client) 219 await self.init(client)
215 220
async def _itemsReceived(self, client, itemsEvent):
    """Forward received pubsub items to ActivityPub

    Events received by another client than the gateway one, or whose recipient JID
    has no local part, are ignored. Otherwise the local part of the recipient is
    unescaped to get the AP account, and the items are handed to
    ``convertAndPostItems``.

    @param client: client which has received the event
    @param itemsEvent: pubsub items event; its ``sender``, ``recipient``,
        ``nodeIdentifier`` and ``items`` attributes are used
    """
    if client != self.client:
        return
    # the conversion is done with a virtual client bound to the sender of the event
    # and not with the gateway own client
    virtual_client = self.client.getVirtualClient(itemsEvent.sender)
    target = itemsEvent.recipient
    if target.user:
        await self.convertAndPostItems(
            virtual_client,
            self._e.unescape(target.user),
            target,
            itemsEvent.nodeIdentifier,
            itemsEvent.items,
        )
    else:
        log.debug("ignoring items event without local part specified")
244
216 async def getVirtualClient(self, actor_id: str) -> SatXMPPEntity: 245 async def getVirtualClient(self, actor_id: str) -> SatXMPPEntity:
217 """Get client for this component with a specified jid 246 """Get client for this component with a specified jid
218 247
219 This is needed to perform operations with the virtual JID corresponding to the AP 248 This is needed to perform operations with the virtual JID corresponding to the AP
220 actor instead of the JID of the gateway itself. 249 actor instead of the JID of the gateway itself.
221 @param actor_id: ID of the actor 250 @param actor_id: ID of the actor
222 @return: virtual client 251 @return: virtual client
223 """ 252 """
224 account = await self.getAPAccountFromId(actor_id) 253 local_jid = await self.getJIDFromId(actor_id)
225 local_jid = self.getLocalJIDFromAccount(account)
226 return self.client.getVirtualClient(local_jid) 254 return self.client.getVirtualClient(local_jid)
227 255
228 def isActivity(self, data: dict) -> bool: 256 def isActivity(self, data: dict) -> bool:
229 """Return True if the data has an activity type""" 257 """Return True if the data has an activity type"""
230 try: 258 try:
286 else: 314 else:
287 raise NotImplementedError( 315 raise NotImplementedError(
288 "was expecting a string or a dict, got {type(value)}: {value!r}}" 316 "was expecting a string or a dict, got {type(value)}: {value!r}}"
289 ) 317 )
290 318
291 async def apGetList(self, data: dict, key: str) -> Optional[List[dict]]: 319 async def apGetList(
320 self,
321 data: dict,
322 key: str,
323 only_ids: bool = False
324 ) -> Optional[List[Dict[str, Any]]]:
292 """Retrieve a list of objects from AP data, dereferencing when necessary 325 """Retrieve a list of objects from AP data, dereferencing when necessary
293 326
294 This method is to be used with non functional vocabularies. Use ``apGetObject`` 327 This method is to be used with non functional vocabularies. Use ``apGetObject``
295 otherwise. 328 otherwise.
296 If the value is a dictionary, it will be wrapped in a list 329 If the value is a dictionary, it will be wrapped in a list
297 @param data: AP object where a list of objects is looked for 330 @param data: AP object where a list of objects is looked for
298 @param key: key of the list to look for 331 @param key: key of the list to look for
332 @param only_ids: if Trye, only items IDs are retrieved
299 @return: list of objects, or None if the key is not present 333 @return: list of objects, or None if the key is not present
300 """ 334 """
301 value = data.get(key) 335 value = data.get(key)
302 if value is None: 336 if value is None:
303 return None 337 return None
305 value = await self.apGet(value) 339 value = await self.apGet(value)
306 if isinstance(value, dict): 340 if isinstance(value, dict):
307 return [value] 341 return [value]
308 if not isinstance(value, list): 342 if not isinstance(value, list):
309 raise ValueError(f"A list was expected, got {type(value)}: {value!r}") 343 raise ValueError(f"A list was expected, got {type(value)}: {value!r}")
310 return [await self.apGetObject(i) for i in value] 344 if only_ids:
345 return [
346 {"id": v["id"]} if isinstance(v, dict) else {"id": v}
347 for v in value
348 ]
349 else:
350 return [await self.apGetObject(i) for i in value]
311 351
312 async def apGetActors( 352 async def apGetActors(
313 self, 353 self,
314 data: dict, 354 data: dict,
315 key: str, 355 key: str,
494 ``___urn.3axmpp.3amicroblog.3a0@pubsub.example.org`` (with pubsub.example.org 534 ``___urn.3axmpp.3amicroblog.3a0@pubsub.example.org`` (with pubsub.example.org
495 being a pubsub service) ==> JID = pubsub.example.org, node = urn:xmpp:microblog:0 535 being a pubsub service) ==> JID = pubsub.example.org, node = urn:xmpp:microblog:0
496 536
497 @param ap_account: ActivityPub account handle (``username@domain.tld``) 537 @param ap_account: ActivityPub account handle (``username@domain.tld``)
498 @return: service JID and pubsub node 538 @return: service JID and pubsub node
499 if pubsub is None, default microblog pubsub node (and possibly other nodes 539 if pubsub node is None, default microblog pubsub node (and possibly other
500 that plugins may hanlde) will be used 540 nodes that plugins may hanlde) will be used
501 @raise ValueError: invalid account 541 @raise ValueError: invalid account
502 @raise PermissionError: non local jid is used when gateway doesn't allow them 542 @raise PermissionError: non local jid is used when gateway doesn't allow them
503 """ 543 """
504 if ap_account.count("@") != 1: 544 if ap_account.count("@") != 1:
505 raise ValueError("Invalid AP account") 545 raise ValueError("Invalid AP account")
568 self.client.jid.host, 608 self.client.jid.host,
569 None 609 None
570 ) 610 )
571 ) 611 )
572 612
async def getJIDFromId(self, actor_id: str) -> jid.JID:
    """Get the JID corresponding to an AP actor ID

    When ``actor_id`` is not a local URL (see ``isLocalURL``), the AP account is
    retrieved from the ID and converted with ``getLocalJIDFromAccount``.
    When ``actor_id`` is a local URL, it is parsed with ``parseAPURL`` and the JID
    returned by ``getJIDAndNode`` for its single argument is used.

    @param actor_id: ID (URL) of the AP actor
    @return: JID linked to the actor
    @raise ValueError: a local URL is not an actor URL with exactly one argument
    """
    if not self.isLocalURL(actor_id):
        ap_account = await self.getAPAccountFromId(actor_id)
        return self.getLocalJIDFromAccount(ap_account)

    url_type, url_args = self.parseAPURL(actor_id)
    if url_type == TYPE_ACTOR and len(url_args) == 1:
        entity_jid, __ = await self.getJIDAndNode(url_args[0])
        return entity_jid
    raise ValueError(f"invalid actor id: {actor_id!r}")
630
573 def parseAPURL(self, url: str) -> Tuple[str, List[str]]: 631 def parseAPURL(self, url: str) -> Tuple[str, List[str]]:
574 """Parse an URL leading to an AP endpoint 632 """Parse an URL leading to an AP endpoint
575 633
576 @param url: URL to parse (schema is not mandatory) 634 @param url: URL to parse (schema is not mandatory)
577 @return: endpoint type and extra arguments 635 @return: endpoint type and extra arguments
588 """ 646 """
589 return parse.urljoin( 647 return parse.urljoin(
590 self.base_ap_url, 648 self.base_ap_url,
591 str(Path(type_).joinpath(*(parse.quote_plus(a) for a in args))) 649 str(Path(type_).joinpath(*(parse.quote_plus(a) for a in args)))
592 ) 650 )
651
def isLocalURL(self, url: str) -> bool:
    """Check if an URL leads to this component

    The URL is considered local when it begins with ``self.base_ap_url``.

    @param url: URL to check
    @return: True if the URL starts with the base AP URL of this gateway
    """
    local_prefix = self.base_ap_url
    return url.startswith(local_prefix)
593 658
594 def buildSignatureHeader(self, values: Dict[str, str]) -> str: 659 def buildSignatureHeader(self, values: Dict[str, str]) -> str:
595 """Build key="<value>" signature header from signature data""" 660 """Build key="<value>" signature header from signature data"""
596 fields = [] 661 fields = []
597 for key, value in values.items(): 662 for key, value in values.items():
749 "signature": signature 814 "signature": signature
750 } 815 }
751 new_headers = {k: v for k,v in headers.items() if not k.startswith("(")} 816 new_headers = {k: v for k,v in headers.items() if not k.startswith("(")}
752 new_headers["Signature"] = self.buildSignatureHeader(sign_data) 817 new_headers["Signature"] = self.buildSignatureHeader(sign_data)
753 return new_headers, sign_data 818 return new_headers, sign_data
819
async def convertAndPostItems(
    self,
    client: SatXMPPEntity,
    ap_account: str,
    service: jid.JID,
    node: str,
    items: List[domish.Element],
    subscribe_comments_nodes: bool = False,
) -> None:
    """Convert XMPP items to AP items, and post them to the inbox of an actor

    Each item is parsed to microblog data, converted with ``mbdata2APitem``, then
    signed and posted to the actor inbox. A response code of 300 or more is only
    logged, the remaining items are still handled.

    @param client: client used to parse items and to subscribe to comments nodes
    @param ap_account: account of the ActivityPub actor receiving the items
    @param service: JID of the pubsub service the items come from
    @param node: pubsub node the items come from
    @param items: XMPP pubsub items to convert
    @param subscribe_comments_nodes: if True, every comments node found in the
        items is subscribed with ``client``
    """
    actor_id = await self.getAPActorIdFromAccount(ap_account)
    inbox = await self.getAPInboxFromId(actor_id)
    for item_elt in items:
        mb_data = await self._m.item2mbdata(client, item_elt, service, node)
        if subscribe_comments_nodes:
            for comments in mb_data.get("comments", []):
                await self._p.subscribe(
                    client, jid.JID(comments["service"]), comments["node"]
                )
        activity = await self.mbdata2APitem(client, mb_data)
        # the post is signed with the actor the object is attributed to
        response = await self.signAndPost(
            inbox, activity["object"]["attributedTo"], activity
        )
        if response.code < 300:
            continue
        body = await response.text()
        log.warning(
            f"unexpected return code while sending AP item: {response.code}\n{body}\n"
            f"{pformat(activity)}"
        )
754 856
755 async def signAndPost(self, url: str, actor_id: str, doc: dict) -> TReqResponse: 857 async def signAndPost(self, url: str, actor_id: str, doc: dict) -> TReqResponse:
756 """Sign a documentent and post it to AP server 858 """Sign a documentent and post it to AP server
757 859
758 @param url: AP server endpoint 860 @param url: AP server endpoint
871 collection: dict, 973 collection: dict,
872 max_items: Optional[int] = None, 974 max_items: Optional[int] = None,
873 chronological_pagination: bool = True, 975 chronological_pagination: bool = True,
874 after_id: Optional[str] = None, 976 after_id: Optional[str] = None,
875 start_index: Optional[int] = None, 977 start_index: Optional[int] = None,
978 parser: Optional[Callable[[dict], Awaitable[domish.Element]]] = None,
979 only_ids: bool = False,
876 ) -> Tuple[List[domish.Element], rsm.RSMResponse]: 980 ) -> Tuple[List[domish.Element], rsm.RSMResponse]:
877 """Retrieve AP items and convert them to XMPP items 981 """Retrieve AP items and convert them to XMPP items
878 982
879 @param account: AP account handle to get items from 983 @param account: AP account handle to get items from
880 @param max_items: maximum number of items to retrieve 984 @param max_items: maximum number of items to retrieve
890 In most common cases, ``after_id`` should be in cache though (client usually 994 In most common cases, ``after_id`` should be in cache though (client usually
891 use known ID when in-order pagination is used). 995 use known ID when in-order pagination is used).
892 @param start_index: start retrieving items from the one with given index 996 @param start_index: start retrieving items from the one with given index
893 Due to ActivityStream Collection Paging limitations, this is inefficient and 997 Due to ActivityStream Collection Paging limitations, this is inefficient and
894 all pages before the requested index will be retrieved to count items. 998 all pages before the requested index will be retrieved to count items.
999 @param parser: method to use to parse AP items and get XMPP item elements
1000 if None, use default generic parser
1001 @param only_ids: if True, only retrieve items IDs
1002 Retrieving only item IDs avoid HTTP requests to retrieve items, it may be
1003 sufficient in some use cases (e.g. when retrieving following/followers
1004 collections)
895 @return: XMPP Pubsub items and corresponding RSM Response 1005 @return: XMPP Pubsub items and corresponding RSM Response
896 Items are always returned in chronological order in the result 1006 Items are always returned in chronological order in the result
897 """ 1007 """
1008 if parser is None:
1009 parser = self.apItem2Elt
1010
898 rsm_resp: Dict[str, Union[bool, int]] = {} 1011 rsm_resp: Dict[str, Union[bool, int]] = {}
899 try: 1012 try:
900 count = collection["totalItems"] 1013 count = collection["totalItems"]
901 except KeyError: 1014 except KeyError:
902 log.warning( 1015 log.warning(
927 # before "start_index" 1040 # before "start_index"
928 previous_index = start_index - 1 1041 previous_index = start_index - 1
929 retrieved_items = 0 1042 retrieved_items = 0
930 current_page = collection["last"] 1043 current_page = collection["last"]
931 while retrieved_items < count: 1044 while retrieved_items < count:
932 page_data, items = await self.parseAPPage(current_page) 1045 page_data, items = await self.parseAPPage(
1046 current_page, parser, only_ids
1047 )
933 if not items: 1048 if not items:
934 log.warning(f"found an empty AP page at {current_page}") 1049 log.warning(f"found an empty AP page at {current_page}")
935 return [], rsm_resp 1050 return [], rsm_resp
936 page_start_idx = retrieved_items 1051 page_start_idx = retrieved_items
937 retrieved_items += len(items) 1052 retrieved_items += len(items)
961 page_items = [] 1076 page_items = []
962 retrieved_items = 0 1077 retrieved_items = 0
963 found_after_id = False 1078 found_after_id = False
964 1079
965 while retrieved_items < count: 1080 while retrieved_items < count:
966 __, page_items = await self.parseAPPage(page) 1081 __, page_items = await self.parseAPPage(page, parser, only_ids)
967 if not page_items: 1082 if not page_items:
968 break 1083 break
969 retrieved_items += len(page_items) 1084 retrieved_items += len(page_items)
970 if after_id is not None and not found_after_id: 1085 if after_id is not None and not found_after_id:
971 # if we have an after_id, we ignore all items until the requested one is 1086 # if we have an after_id, we ignore all items until the requested one is
1017 "last": items[-1]["id"] 1132 "last": items[-1]["id"]
1018 }) 1133 })
1019 1134
1020 return items, rsm.RSMResponse(**rsm_resp) 1135 return items, rsm.RSMResponse(**rsm_resp)
1021 1136
async def apItem2MbDataAndElt(self, ap_item: dict) -> Tuple[dict, domish.Element]:
    """Parse an AP item and build the corresponding pubsub item element

    @param ap_item: AP item to convert
    @return: microblog data parsed from the AP item, and the item element built from
        them; the ``publisher`` attribute of the element is set to the
        ``author_jid`` of the microblog data
    """
    parsed = await self.apItem2MBdata(ap_item)
    entry_elt = await self._m.data2entry(
        self.client, parsed, parsed["id"], None, self._m.namespace
    )
    entry_elt["publisher"] = parsed["author_jid"]
    return parsed, entry_elt
1145
async def apItem2Elt(self, ap_item: dict) -> domish.Element:
    """Build the XMPP item element corresponding to an AP item

    This is ``apItem2MbDataAndElt`` with the microblog data discarded.

    @param ap_item: AP item to convert
    @return: XMPP item element
    """
    _mb_data, entry_elt = await self.apItem2MbDataAndElt(ap_item)
    return entry_elt
1150
1022 async def parseAPPage( 1151 async def parseAPPage(
1023 self, 1152 self,
1024 page: Union[str, dict] 1153 page: Union[str, dict],
1154 parser: Callable[[dict], Awaitable[domish.Element]],
1155 only_ids: bool = False
1025 ) -> Tuple[dict, List[domish.Element]]: 1156 ) -> Tuple[dict, List[domish.Element]]:
1026 """Convert AP objects from an AP page to XMPP items 1157 """Convert AP objects from an AP page to XMPP items
1027 1158
1028 @param page: Can be either url linking and AP page, or the page data directly 1159 @param page: Can be either url linking and AP page, or the page data directly
1160 @param parser: method to use to parse AP items and get XMPP item elements
1161 @param only_ids: if True, only retrieve items IDs
1029 @return: page data, pubsub items 1162 @return: page data, pubsub items
1030 """ 1163 """
1031 page_data = await self.apGetObject(page) 1164 page_data = await self.apGetObject(page)
1032 if page_data is None: 1165 if page_data is None:
1033 log.warning('No data found in collection') 1166 log.warning('No data found in collection')
1034 return {}, [] 1167 return {}, []
1035 ap_items = await self.apGetList(page_data, "orderedItems") 1168 ap_items = await self.apGetList(page_data, "orderedItems", only_ids=only_ids)
1036 if ap_items is None: 1169 if ap_items is None:
1037 ap_items = await self.apGetList(page_data, "items") 1170 ap_items = await self.apGetList(page_data, "items", only_ids=only_ids)
1038 if not ap_items: 1171 if not ap_items:
1039 log.warning(f'No item field found in collection: {page_data!r}') 1172 log.warning(f'No item field found in collection: {page_data!r}')
1040 return page_data, [] 1173 return page_data, []
1041 else: 1174 else:
1042 log.warning( 1175 log.warning(
1045 items = [] 1178 items = []
1046 # AP Collections are in antichronological order, but we expect chronological in 1179 # AP Collections are in antichronological order, but we expect chronological in
1047 # Pubsub, thus we reverse it 1180 # Pubsub, thus we reverse it
1048 for ap_item in reversed(ap_items): 1181 for ap_item in reversed(ap_items):
1049 try: 1182 try:
1050 items.append(await self.apItem2Elt(ap_item)) 1183 items.append(await parser(ap_item))
1051 except (exceptions.DataError, NotImplementedError, error.StanzaError): 1184 except (exceptions.DataError, NotImplementedError, error.StanzaError):
1052 continue 1185 continue
1053 1186
1054 return page_data, items 1187 return page_data, items
1055
1056 async def apItem2MbDataAndElt(self, ap_item: dict) -> Tuple[dict, domish.Element]:
1057 """Convert AP item to parsed microblog data and corresponding item element"""
1058 mb_data = await self.apItem2MBdata(ap_item)
1059 item_elt = await self._m.data2entry(
1060 self.client, mb_data, mb_data["id"], None, self._m.namespace
1061 )
1062 item_elt["publisher"] = mb_data["author_jid"]
1063 return mb_data, item_elt
1064
1065 async def apItem2Elt(self, ap_item: dict) -> domish.Element:
1066 """Convert AP item to XMPP item element"""
1067 __, item_elt = await self.apItem2MbDataAndElt(ap_item)
1068 return item_elt
1069 1188
1070 async def getCommentsNodes( 1189 async def getCommentsNodes(
1071 self, 1190 self,
1072 item_id: str, 1191 item_id: str,
1073 parent_id: Optional[str] 1192 parent_id: Optional[str]
1195 } 1314 }
1196 mb_data["comments"] = [comments_data] 1315 mb_data["comments"] = [comments_data]
1197 1316
1198 return mb_data 1317 return mb_data
1199 1318
async def getReplyToIdFromXMPPNode(
    self,
    client: SatXMPPEntity,
    ap_account: str,
    parent_item: str,
    mb_data: dict
) -> str:
    """Build the URL to put in the ``inReplyTo`` field of an AP item

    The parent item is searched in the pubsub cache by its ID:
        - if a single cached item is found, its node is used to get the AP account
        - if several ones are found, the first one whose publisher has the same bare
          JID as the ``author_jid`` of ``mb_data`` is used
        - if none is found, or if no cached item matches the author, ``ap_account``
          is used as a fallback (and a warning is logged)

    @param client: client whose profile is used to search the cache
    @param ap_account: AP account corresponding to the publication author
    @param parent_item: ID of the item this publication is replying to
    @param mb_data: microblog data of the publication
    @return: URL to use in ``inReplyTo`` field
    """
    # FIXME: propose a protoXEP to properly get parent item, node and service

    found_items = await self.host.memory.storage.searchPubsubItems({
        "profiles": [client.profile],
        "names": [parent_item]
    })

    # cached item from which the parent AP account can be deduced, if any
    matching_item = None
    if not found_items:
        log.warning(f"parent item {parent_item!r} not found in cache")
    elif len(found_items) == 1:
        matching_item = found_items[0]
    else:
        # several cached items have this ID, we look for one from the same author
        author = jid.JID(mb_data["author_jid"]).userhostJID()
        for candidate in found_items:
            if jid.JID(candidate.data["publisher"]).userhostJID() == author:
                matching_item = candidate
                break
        else:
            log.warning(
                "Can't find a single cached item for parent item "
                f"{parent_item!r}"
            )

    if matching_item is None:
        parent_ap_account = ap_account
    else:
        parent_node = matching_item.node
        parent_ap_account = await self.getAPAccountFromJidAndNode(
            parent_node.service,
            parent_node.name
        )

    return self.buildAPURL(TYPE_ITEM, parent_ap_account, parent_item)
1381
1200 async def mbdata2APitem(self, client: SatXMPPEntity, mb_data: dict) -> dict: 1382 async def mbdata2APitem(self, client: SatXMPPEntity, mb_data: dict) -> dict:
1201 """Convert Libervia Microblog Data to ActivityPub item""" 1383 """Convert Libervia Microblog Data to ActivityPub item"""
1384 try:
1385 node = mb_data["node"]
1386 service = jid.JID(mb_data["service"])
1387 except KeyError:
1388 # node and service must always be specified when this method is used
1389 raise exceptions.InternalError(
1390 "node or service is missing in mb_data"
1391 )
1202 if not mb_data.get("id"): 1392 if not mb_data.get("id"):
1203 mb_data["id"] = shortuuid.uuid() 1393 mb_data["id"] = shortuuid.uuid()
1204 if not mb_data.get("author_jid"): 1394 if not mb_data.get("author_jid"):
1205 mb_data["author_jid"] = client.jid.full() 1395 mb_data["author_jid"] = client.jid.full()
1206 ap_account = await self.getAPAccountFromJidAndNode( 1396 ap_account = await self.getAPAccountFromJidAndNode(
1207 jid.JID(mb_data["author_jid"]), 1397 jid.JID(mb_data["author_jid"]),
1208 None 1398 None
1209 ) 1399 )
1210 url_actor = self.buildAPURL(TYPE_ACTOR, ap_account) 1400 url_actor = self.buildAPURL(TYPE_ACTOR, ap_account)
1211 url_item = self.buildAPURL(TYPE_ITEM, ap_account, mb_data["id"]) 1401 url_item = self.buildAPURL(TYPE_ITEM, ap_account, mb_data["id"])
1212 return { 1402 ap_object = {
1403 "id": url_item,
1404 "type": "Note",
1405 "published": utils.xmpp_date(mb_data["published"]),
1406 "attributedTo": url_actor,
1407 "content": mb_data.get("content_xhtml") or mb_data["content"],
1408 "to": ["https://www.w3.org/ns/activitystreams#Public"],
1409 }
1410
1411 target_ap_account = self._e.unescape(service.user)
1412 actor_data = await self.getAPActorDataFromAccount(target_ap_account)
1413 followers = actor_data.get("followers")
1414 if followers:
1415 ap_object["cc"] = [followers]
1416
1417 ap_item = {
1213 "@context": "https://www.w3.org/ns/activitystreams", 1418 "@context": "https://www.w3.org/ns/activitystreams",
1214 "id": url_item, 1419 "id": url_item,
1215 "type": "Create", 1420 "type": "Create",
1216 "actor": url_actor, 1421 "actor": url_actor,
1217 1422
1218 "object": { 1423 "object": ap_object
1219 "id": url_item,
1220 "type": "Note",
1221 "published": utils.xmpp_date(mb_data["published"]),
1222 "attributedTo": url_actor,
1223 "content": mb_data.get("content_xhtml") or mb_data["content"],
1224 "to": "https://www.w3.org/ns/activitystreams#Public"
1225 }
1226 } 1424 }
1425 language = mb_data.get("language")
1426 if language:
1427 ap_object["contentMap"] = {language: ap_object["content"]}
1428 if self._m.isCommentNode(node):
1429 parent_item = self._m.getParentItem(node)
1430 if service.host == self.client.jid.userhost():
1431 # the publication is on a virtual node (i.e. an XMPP node managed by
1432 # this gateway and linking to an ActivityPub actor)
1433 ap_object["inReplyTo"] = parent_item
1434 else:
1435 # the publication is from a followed real XMPP node
1436 ap_object["inReplyTo"] = await self.getReplyToIdFromXMPPNode(
1437 client,
1438 ap_account,
1439 parent_item,
1440 mb_data
1441 )
1442
1443 return ap_item
1227 1444
1228 async def publishMessage( 1445 async def publishMessage(
1229 self, 1446 self,
1230 client: SatXMPPEntity, 1447 client: SatXMPPEntity,
1231 mess_data: dict, 1448 mess_data: dict,
1261 item_data = await self.mbdata2APitem(client, mess_data) 1478 item_data = await self.mbdata2APitem(client, mess_data)
1262 url_actor = item_data["object"]["attributedTo"] 1479 url_actor = item_data["object"]["attributedTo"]
1263 resp = await self.signAndPost(inbox_url, url_actor, item_data) 1480 resp = await self.signAndPost(inbox_url, url_actor, item_data)
1264 if resp.code != 202: 1481 if resp.code != 202:
1265 raise exceptions.NetworkError(f"unexpected return code: {resp.code}") 1482 raise exceptions.NetworkError(f"unexpected return code: {resp.code}")
1483
async def newReplyToXMPPItem(
    self,
    client: SatXMPPEntity,
    ap_item: dict,
) -> None:
    """Handle an AP item which is a reply to an XMPP item

    The ``inReplyTo`` URL of the AP item is parsed to find the parent XMPP item, then
    the AP item is converted and published to the first comments node of this parent
    item.
    The AP item is ignored (with a warning) if the URL is not a valid item URL, or if
    the parent item can't be retrieved.

    @param client: client used to retrieve the parent item and to publish the reply
    @param ap_item: AP item, its ``inReplyTo`` field must be an URL (string)
    @raise NotImplementedError: the parent item has no comments node
    """
    in_reply_to = ap_item["inReplyTo"]
    url_type, url_args = self.parseAPURL(in_reply_to)
    if url_type != "item":
        log.warning(
            "Ignoring AP item replying to an XMPP item with an unexpected URL "
            f"type({url_type!r}):\n{pformat(ap_item)}"
        )
        return
    try:
        parent_item_account, parent_item_id = url_args[0].split("/", 1)
    except (IndexError, ValueError):
        log.warning(
            "Ignoring AP item replying to an XMPP item with invalid inReplyTo URL "
            f"({in_reply_to!r}):\n{pformat(ap_item)}"
        )
        return
    parent_item_service, parent_item_node = await self.getJIDAndNode(parent_item_account)
    if parent_item_node is None:
        # no node specified, the microblog node is used
        parent_item_node = self._m.namespace
    items, __ = await self._p.getItems(
        client, parent_item_service, parent_item_node, item_ids=[parent_item_id]
    )
    try:
        parent_item_elt = items[0]
    except IndexError:
        log.warning(
            f"Can't find parent item at {parent_item_service} (node "
            f"{parent_item_node!r})\n{pformat(ap_item)}")
        return
    parent_item_parsed = await self._m.item2mbdata(
        client, parent_item_elt, parent_item_service, parent_item_node
    )
    try:
        comment_service = jid.JID(parent_item_parsed["comments"][0]["service"])
        comment_node = parent_item_parsed["comments"][0]["node"]
    except (KeyError, IndexError):
        # we don't have a comment node set for this item
        from sat.tools.xml_tools import ppElt
        log.info(f"{ppElt(parent_item_elt.toXml())}")
        # "NotImplemented" is a non callable constant used for rich comparisons,
        # "NotImplemented()" was raising a TypeError instead of the expected exception
        raise NotImplementedError(
            "replying to an XMPP item without comments node is not supported"
        )
    else:
        __, item_elt = await self.apItem2MbDataAndElt(ap_item)
        await self._p.publish(client, comment_service, comment_node, [item_elt])
1266 1533
1267 async def newAPItem( 1534 async def newAPItem(
1268 self, 1535 self,
1269 client: SatXMPPEntity, 1536 client: SatXMPPEntity,
1270 destinee: Optional[jid.JID], 1537 destinee: Optional[jid.JID],
1277 @param node: XMPP pubsub node 1544 @param node: XMPP pubsub node
1278 @param item: AP object payload 1545 @param item: AP object payload
1279 """ 1546 """
1280 service = client.jid 1547 service = client.jid
1281 in_reply_to = item.get("inReplyTo") 1548 in_reply_to = item.get("inReplyTo")
1549 if in_reply_to and isinstance(in_reply_to, list):
1550 in_reply_to = in_reply_to[0]
1282 if in_reply_to and isinstance(in_reply_to, str): 1551 if in_reply_to and isinstance(in_reply_to, str):
1283 # this item is a reply, we use or create a corresponding node for comments 1552 if self.isLocalURL(in_reply_to):
1553 # this is a reply to an XMPP item
1554 return await self.newReplyToXMPPItem(client, item)
1555
1556 # this item is a reply to an AP item, we use or create a corresponding node
1557 # for comments
1284 parent_node, __ = await self.getCommentsNodes(item["id"], in_reply_to) 1558 parent_node, __ = await self.getCommentsNodes(item["id"], in_reply_to)
1285 node = parent_node or node 1559 node = parent_node or node
1286 cached_node = await self.host.memory.storage.getPubsubNode( 1560 cached_node = await self.host.memory.storage.getPubsubNode(
1287 client, service, node, with_subscriptions=True 1561 client, service, node, with_subscriptions=True
1288 ) 1562 )
1310 cached_node = await self.host.memory.storage.getPubsubNode( 1584 cached_node = await self.host.memory.storage.getPubsubNode(
1311 client, service, node, with_subscriptions=True 1585 client, service, node, with_subscriptions=True
1312 ) 1586 )
1313 if cached_node is None: 1587 if cached_node is None:
1314 log.warning( 1588 log.warning(
1315 f"Received item in unknown node {node!r} at {service}\n{item}" 1589 f"Received item in unknown node {node!r} at {service}. This may be "
1590 f"due to a cache purge. We synchronise the node\n{item}"
1316 1591
1317 ) 1592 )
1318 return 1593 return
1319 mb_data, item_elt = await self.apItem2MbDataAndElt(item) 1594 mb_data, item_elt = await self.apItem2MbDataAndElt(item)
1320 await self.host.memory.storage.cachePubsubItems( 1595 await self.host.memory.storage.cachePubsubItems(