Mercurial > libervia-backend
comparison sat/plugins/plugin_comp_ap_gateway/__init__.py @ 3764:125c7043b277
comp AP gateway: publish, (un)subscribe/(un)follow, public subscription/following/followers:
This patch implements the following major features:
- `publish` is implemented on the virtual pubsub service, so XMPP entities can now publish
to AP using this service
- replies to XMPP items are managed
- `inReplyTo` is filled when converting XMPP items to AP objects
- `follow` and `unfollow` (actually an `undo` activity) are implemented and mapped to
XMPP's (un)subscribe. On subscription, the AP actor's `outbox` collection is converted to
XMPP items and put in cache. Subscriptions are always public.
- `following` and `followers` collections are mapped to XMPP's Public Pubsub Subscription
(which should be XEP-0465, but the XEP is not yet published at the time of commit), in
both directions.
- new helper methods to check whether a URL is local and to get the JID from an actor ID
Documentation explaining this behaviour will follow.
rel 365
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 13 May 2022 19:12:33 +0200 |
parents | a8c7e5cef0cb |
children | efc34a89e70b |
comparison
equal
deleted
inserted
replaced
3763:b2ade5ecdbab | 3764:125c7043b277 |
---|---|
21 import hashlib | 21 import hashlib |
22 import json | 22 import json |
23 from pathlib import Path | 23 from pathlib import Path |
24 from pprint import pformat | 24 from pprint import pformat |
25 import re | 25 import re |
26 from typing import Any, Dict, List, Optional, Tuple, Union, overload | 26 from typing import Any, Dict, List, Optional, Tuple, Union, Callable, Awaitable, overload |
27 from urllib import parse | 27 from urllib import parse |
28 | 28 |
29 from cryptography.exceptions import InvalidSignature | 29 from cryptography.exceptions import InvalidSignature |
30 from cryptography.hazmat.primitives import serialization | 30 from cryptography.hazmat.primitives import serialization |
31 from cryptography.hazmat.primitives import hashes | 31 from cryptography.hazmat.primitives import hashes |
45 from sat.core import exceptions | 45 from sat.core import exceptions |
46 from sat.core.constants import Const as C | 46 from sat.core.constants import Const as C |
47 from sat.core.core_types import SatXMPPEntity | 47 from sat.core.core_types import SatXMPPEntity |
48 from sat.core.i18n import _ | 48 from sat.core.i18n import _ |
49 from sat.core.log import getLogger | 49 from sat.core.log import getLogger |
50 from sat.memory.sqla_mapping import PubsubSub, SubscriptionState | 50 from sat.memory.sqla_mapping import SubscriptionState |
51 from sat.tools import utils | 51 from sat.tools import utils |
52 from sat.tools.common import data_format, tls, uri | 52 from sat.tools.common import data_format, tls, uri |
53 from sat.tools.common.async_utils import async_lru | 53 from sat.tools.common.async_utils import async_lru |
54 | 54 |
55 from .constants import ( | 55 from .constants import ( |
78 C.PI_NAME: "ActivityPub Gateway component", | 78 C.PI_NAME: "ActivityPub Gateway component", |
79 C.PI_IMPORT_NAME: IMPORT_NAME, | 79 C.PI_IMPORT_NAME: IMPORT_NAME, |
80 C.PI_MODES: [C.PLUG_MODE_COMPONENT], | 80 C.PI_MODES: [C.PLUG_MODE_COMPONENT], |
81 C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, | 81 C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, |
82 C.PI_PROTOCOLS: [], | 82 C.PI_PROTOCOLS: [], |
83 C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0277", "XEP-0060", "PUBSUB_CACHE"], | 83 C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0277", "XEP-0060", "XEP-0465", "PUBSUB_CACHE"], |
84 C.PI_RECOMMENDATIONS: [], | 84 C.PI_RECOMMENDATIONS: [], |
85 C.PI_MAIN: "APGateway", | 85 C.PI_MAIN: "APGateway", |
86 C.PI_HANDLER: C.BOOL_TRUE, | 86 C.PI_HANDLER: C.BOOL_TRUE, |
87 C.PI_DESCRIPTION: _( | 87 C.PI_DESCRIPTION: _( |
88 "Gateway for bidirectional communication between XMPP and ActivityPub." | 88 "Gateway for bidirectional communication between XMPP and ActivityPub." |
98 class APGateway: | 98 class APGateway: |
99 | 99 |
100 def __init__(self, host): | 100 def __init__(self, host): |
101 self.host = host | 101 self.host = host |
102 self.initialised = False | 102 self.initialised = False |
103 self.client = None | |
103 self._m = host.plugins["XEP-0277"] | 104 self._m = host.plugins["XEP-0277"] |
104 self._p = host.plugins["XEP-0060"] | 105 self._p = host.plugins["XEP-0060"] |
105 self._e = host.plugins["XEP-0106"] | 106 self._e = host.plugins["XEP-0106"] |
107 self._pps = host.plugins["XEP-0465"] | |
106 self._c = host.plugins["PUBSUB_CACHE"] | 108 self._c = host.plugins["PUBSUB_CACHE"] |
109 self._p.addManagedNode( | |
110 "", items_cb=self._itemsReceived | |
111 ) | |
107 self.pubsub_service = APPubsubService(self) | 112 self.pubsub_service = APPubsubService(self) |
108 | 113 |
109 host.bridge.addMethod( | 114 host.bridge.addMethod( |
110 "APSend", | 115 "APSend", |
111 ".plugin", | 116 ".plugin", |
211 | 216 |
212 async def profileConnecting(self, client): | 217 async def profileConnecting(self, client): |
213 self.client = client | 218 self.client = client |
214 await self.init(client) | 219 await self.init(client) |
215 | 220 |
221 async def _itemsReceived(self, client, itemsEvent): | |
222 """Callback called when pubsub items are received | |
223 | |
224 if the items are adressed to a JID corresponding to an AP actor, they are | |
225 converted to AP items and sent to the corresponding AP server. | |
226 | |
227 If comments nodes are linked, they are automatically subscribed to get new items | |
228 from there too. | |
229 """ | |
230 if client != self.client: | |
231 return | |
232 # we need recipient as JID and not gateway own JID to be able to use methods such | |
233 # as "subscribe" | |
234 client = self.client.getVirtualClient(itemsEvent.sender) | |
235 recipient = itemsEvent.recipient | |
236 if not recipient.user: | |
237 log.debug("ignoring items event without local part specified") | |
238 return | |
239 | |
240 ap_account = self._e.unescape(recipient.user) | |
241 await self.convertAndPostItems( | |
242 client, ap_account, recipient, itemsEvent.nodeIdentifier, itemsEvent.items | |
243 ) | |
244 | |
216 async def getVirtualClient(self, actor_id: str) -> SatXMPPEntity: | 245 async def getVirtualClient(self, actor_id: str) -> SatXMPPEntity: |
217 """Get client for this component with a specified jid | 246 """Get client for this component with a specified jid |
218 | 247 |
219 This is needed to perform operations with the virtual JID corresponding to the AP | 248 This is needed to perform operations with the virtual JID corresponding to the AP |
220 actor instead of the JID of the gateway itself. | 249 actor instead of the JID of the gateway itself. |
221 @param actor_id: ID of the actor | 250 @param actor_id: ID of the actor |
222 @return: virtual client | 251 @return: virtual client |
223 """ | 252 """ |
224 account = await self.getAPAccountFromId(actor_id) | 253 local_jid = await self.getJIDFromId(actor_id) |
225 local_jid = self.getLocalJIDFromAccount(account) | |
226 return self.client.getVirtualClient(local_jid) | 254 return self.client.getVirtualClient(local_jid) |
227 | 255 |
228 def isActivity(self, data: dict) -> bool: | 256 def isActivity(self, data: dict) -> bool: |
229 """Return True if the data has an activity type""" | 257 """Return True if the data has an activity type""" |
230 try: | 258 try: |
286 else: | 314 else: |
287 raise NotImplementedError( | 315 raise NotImplementedError( |
288 "was expecting a string or a dict, got {type(value)}: {value!r}}" | 316 "was expecting a string or a dict, got {type(value)}: {value!r}}" |
289 ) | 317 ) |
290 | 318 |
291 async def apGetList(self, data: dict, key: str) -> Optional[List[dict]]: | 319 async def apGetList( |
320 self, | |
321 data: dict, | |
322 key: str, | |
323 only_ids: bool = False | |
324 ) -> Optional[List[Dict[str, Any]]]: | |
292 """Retrieve a list of objects from AP data, dereferencing when necessary | 325 """Retrieve a list of objects from AP data, dereferencing when necessary |
293 | 326 |
294 This method is to be used with non functional vocabularies. Use ``apGetObject`` | 327 This method is to be used with non functional vocabularies. Use ``apGetObject`` |
295 otherwise. | 328 otherwise. |
296 If the value is a dictionary, it will be wrapped in a list | 329 If the value is a dictionary, it will be wrapped in a list |
297 @param data: AP object where a list of objects is looked for | 330 @param data: AP object where a list of objects is looked for |
298 @param key: key of the list to look for | 331 @param key: key of the list to look for |
332 @param only_ids: if Trye, only items IDs are retrieved | |
299 @return: list of objects, or None if the key is not present | 333 @return: list of objects, or None if the key is not present |
300 """ | 334 """ |
301 value = data.get(key) | 335 value = data.get(key) |
302 if value is None: | 336 if value is None: |
303 return None | 337 return None |
305 value = await self.apGet(value) | 339 value = await self.apGet(value) |
306 if isinstance(value, dict): | 340 if isinstance(value, dict): |
307 return [value] | 341 return [value] |
308 if not isinstance(value, list): | 342 if not isinstance(value, list): |
309 raise ValueError(f"A list was expected, got {type(value)}: {value!r}") | 343 raise ValueError(f"A list was expected, got {type(value)}: {value!r}") |
310 return [await self.apGetObject(i) for i in value] | 344 if only_ids: |
345 return [ | |
346 {"id": v["id"]} if isinstance(v, dict) else {"id": v} | |
347 for v in value | |
348 ] | |
349 else: | |
350 return [await self.apGetObject(i) for i in value] | |
311 | 351 |
312 async def apGetActors( | 352 async def apGetActors( |
313 self, | 353 self, |
314 data: dict, | 354 data: dict, |
315 key: str, | 355 key: str, |
494 ``___urn.3axmpp.3amicroblog.3a0@pubsub.example.org`` (with pubsub.example.org | 534 ``___urn.3axmpp.3amicroblog.3a0@pubsub.example.org`` (with pubsub.example.org |
495 being a pubsub service) ==> JID = pubsub.example.org, node = urn:xmpp:microblog:0 | 535 being a pubsub service) ==> JID = pubsub.example.org, node = urn:xmpp:microblog:0 |
496 | 536 |
497 @param ap_account: ActivityPub account handle (``username@domain.tld``) | 537 @param ap_account: ActivityPub account handle (``username@domain.tld``) |
498 @return: service JID and pubsub node | 538 @return: service JID and pubsub node |
499 if pubsub is None, default microblog pubsub node (and possibly other nodes | 539 if pubsub node is None, default microblog pubsub node (and possibly other |
500 that plugins may hanlde) will be used | 540 nodes that plugins may hanlde) will be used |
501 @raise ValueError: invalid account | 541 @raise ValueError: invalid account |
502 @raise PermissionError: non local jid is used when gateway doesn't allow them | 542 @raise PermissionError: non local jid is used when gateway doesn't allow them |
503 """ | 543 """ |
504 if ap_account.count("@") != 1: | 544 if ap_account.count("@") != 1: |
505 raise ValueError("Invalid AP account") | 545 raise ValueError("Invalid AP account") |
568 self.client.jid.host, | 608 self.client.jid.host, |
569 None | 609 None |
570 ) | 610 ) |
571 ) | 611 ) |
572 | 612 |
613 async def getJIDFromId(self, actor_id: str) -> jid.JID: | |
614 """Compute JID linking to an AP Actor ID | |
615 | |
616 The local jid is computer by escaping AP actor handle and using it as local part | |
617 of JID, where domain part is this gateway own JID | |
618 If the actor_id comes from local server (checked with self.public_url), it means | |
619 that we have an XMPP entity, and the original JID is returned | |
620 """ | |
621 if self.isLocalURL(actor_id): | |
622 request_type, extra_args = self.parseAPURL(actor_id) | |
623 if request_type != TYPE_ACTOR or len(extra_args) != 1: | |
624 raise ValueError(f"invalid actor id: {actor_id!r}") | |
625 actor_jid, __ = await self.getJIDAndNode(extra_args[0]) | |
626 return actor_jid | |
627 | |
628 account = await self.getAPAccountFromId(actor_id) | |
629 return self.getLocalJIDFromAccount(account) | |
630 | |
573 def parseAPURL(self, url: str) -> Tuple[str, List[str]]: | 631 def parseAPURL(self, url: str) -> Tuple[str, List[str]]: |
574 """Parse an URL leading to an AP endpoint | 632 """Parse an URL leading to an AP endpoint |
575 | 633 |
576 @param url: URL to parse (schema is not mandatory) | 634 @param url: URL to parse (schema is not mandatory) |
577 @return: endpoint type and extra arguments | 635 @return: endpoint type and extra arguments |
588 """ | 646 """ |
589 return parse.urljoin( | 647 return parse.urljoin( |
590 self.base_ap_url, | 648 self.base_ap_url, |
591 str(Path(type_).joinpath(*(parse.quote_plus(a) for a in args))) | 649 str(Path(type_).joinpath(*(parse.quote_plus(a) for a in args))) |
592 ) | 650 ) |
651 | |
652 def isLocalURL(self, url: str) -> bool: | |
653 """Tells if an URL link to this component | |
654 | |
655 ``public_url`` and ``ap_path`` are used to check the URL | |
656 """ | |
657 return url.startswith(self.base_ap_url) | |
593 | 658 |
594 def buildSignatureHeader(self, values: Dict[str, str]) -> str: | 659 def buildSignatureHeader(self, values: Dict[str, str]) -> str: |
595 """Build key="<value>" signature header from signature data""" | 660 """Build key="<value>" signature header from signature data""" |
596 fields = [] | 661 fields = [] |
597 for key, value in values.items(): | 662 for key, value in values.items(): |
749 "signature": signature | 814 "signature": signature |
750 } | 815 } |
751 new_headers = {k: v for k,v in headers.items() if not k.startswith("(")} | 816 new_headers = {k: v for k,v in headers.items() if not k.startswith("(")} |
752 new_headers["Signature"] = self.buildSignatureHeader(sign_data) | 817 new_headers["Signature"] = self.buildSignatureHeader(sign_data) |
753 return new_headers, sign_data | 818 return new_headers, sign_data |
819 | |
820 async def convertAndPostItems( | |
821 self, | |
822 client: SatXMPPEntity, | |
823 ap_account: str, | |
824 service: jid.JID, | |
825 node: str, | |
826 items: List[domish.Element], | |
827 subscribe_comments_nodes: bool = False, | |
828 ) -> None: | |
829 """Convert XMPP items to AP items and post them to actor inbox | |
830 | |
831 @param ap_account: account of ActivityPub actor | |
832 @param service: JID of the virtual pubsub service corresponding to the AP actor | |
833 @param node: virtual node corresponding to the AP actor and items | |
834 @param subscribe_comments_nodes: if True, comment nodes present in given items, | |
835 they will be automatically subscribed | |
836 """ | |
837 actor_id = await self.getAPActorIdFromAccount(ap_account) | |
838 inbox = await self.getAPInboxFromId(actor_id) | |
839 for item in items: | |
840 mb_data = await self._m.item2mbdata(client, item, service, node) | |
841 if subscribe_comments_nodes: | |
842 # we subscribe automatically to comment nodes if any | |
843 for comment_data in mb_data.get("comments", []): | |
844 comment_service = jid.JID(comment_data["service"]) | |
845 comment_node = comment_data["node"] | |
846 await self._p.subscribe(client, comment_service, comment_node) | |
847 ap_item = await self.mbdata2APitem(client, mb_data) | |
848 url_actor = ap_item["object"]["attributedTo"] | |
849 resp = await self.signAndPost(inbox, url_actor, ap_item) | |
850 if resp.code >= 300: | |
851 text = await resp.text() | |
852 log.warning( | |
853 f"unexpected return code while sending AP item: {resp.code}\n{text}\n" | |
854 f"{pformat(ap_item)}" | |
855 ) | |
754 | 856 |
755 async def signAndPost(self, url: str, actor_id: str, doc: dict) -> TReqResponse: | 857 async def signAndPost(self, url: str, actor_id: str, doc: dict) -> TReqResponse: |
756 """Sign a documentent and post it to AP server | 858 """Sign a documentent and post it to AP server |
757 | 859 |
758 @param url: AP server endpoint | 860 @param url: AP server endpoint |
871 collection: dict, | 973 collection: dict, |
872 max_items: Optional[int] = None, | 974 max_items: Optional[int] = None, |
873 chronological_pagination: bool = True, | 975 chronological_pagination: bool = True, |
874 after_id: Optional[str] = None, | 976 after_id: Optional[str] = None, |
875 start_index: Optional[int] = None, | 977 start_index: Optional[int] = None, |
978 parser: Optional[Callable[[dict], Awaitable[domish.Element]]] = None, | |
979 only_ids: bool = False, | |
876 ) -> Tuple[List[domish.Element], rsm.RSMResponse]: | 980 ) -> Tuple[List[domish.Element], rsm.RSMResponse]: |
877 """Retrieve AP items and convert them to XMPP items | 981 """Retrieve AP items and convert them to XMPP items |
878 | 982 |
879 @param account: AP account handle to get items from | 983 @param account: AP account handle to get items from |
880 @param max_items: maximum number of items to retrieve | 984 @param max_items: maximum number of items to retrieve |
890 In most common cases, ``after_id`` should be in cache though (client usually | 994 In most common cases, ``after_id`` should be in cache though (client usually |
891 use known ID when in-order pagination is used). | 995 use known ID when in-order pagination is used). |
892 @param start_index: start retrieving items from the one with given index | 996 @param start_index: start retrieving items from the one with given index |
893 Due to ActivityStream Collection Paging limitations, this is inefficient and | 997 Due to ActivityStream Collection Paging limitations, this is inefficient and |
894 all pages before the requested index will be retrieved to count items. | 998 all pages before the requested index will be retrieved to count items. |
999 @param parser: method to use to parse AP items and get XMPP item elements | |
1000 if None, use default generic parser | |
1001 @param only_ids: if True, only retrieve items IDs | |
1002 Retrieving only item IDs avoid HTTP requests to retrieve items, it may be | |
1003 sufficient in some use cases (e.g. when retrieving following/followers | |
1004 collections) | |
895 @return: XMPP Pubsub items and corresponding RSM Response | 1005 @return: XMPP Pubsub items and corresponding RSM Response |
896 Items are always returned in chronological order in the result | 1006 Items are always returned in chronological order in the result |
897 """ | 1007 """ |
1008 if parser is None: | |
1009 parser = self.apItem2Elt | |
1010 | |
898 rsm_resp: Dict[str, Union[bool, int]] = {} | 1011 rsm_resp: Dict[str, Union[bool, int]] = {} |
899 try: | 1012 try: |
900 count = collection["totalItems"] | 1013 count = collection["totalItems"] |
901 except KeyError: | 1014 except KeyError: |
902 log.warning( | 1015 log.warning( |
927 # before "start_index" | 1040 # before "start_index" |
928 previous_index = start_index - 1 | 1041 previous_index = start_index - 1 |
929 retrieved_items = 0 | 1042 retrieved_items = 0 |
930 current_page = collection["last"] | 1043 current_page = collection["last"] |
931 while retrieved_items < count: | 1044 while retrieved_items < count: |
932 page_data, items = await self.parseAPPage(current_page) | 1045 page_data, items = await self.parseAPPage( |
1046 current_page, parser, only_ids | |
1047 ) | |
933 if not items: | 1048 if not items: |
934 log.warning(f"found an empty AP page at {current_page}") | 1049 log.warning(f"found an empty AP page at {current_page}") |
935 return [], rsm_resp | 1050 return [], rsm_resp |
936 page_start_idx = retrieved_items | 1051 page_start_idx = retrieved_items |
937 retrieved_items += len(items) | 1052 retrieved_items += len(items) |
961 page_items = [] | 1076 page_items = [] |
962 retrieved_items = 0 | 1077 retrieved_items = 0 |
963 found_after_id = False | 1078 found_after_id = False |
964 | 1079 |
965 while retrieved_items < count: | 1080 while retrieved_items < count: |
966 __, page_items = await self.parseAPPage(page) | 1081 __, page_items = await self.parseAPPage(page, parser, only_ids) |
967 if not page_items: | 1082 if not page_items: |
968 break | 1083 break |
969 retrieved_items += len(page_items) | 1084 retrieved_items += len(page_items) |
970 if after_id is not None and not found_after_id: | 1085 if after_id is not None and not found_after_id: |
971 # if we have an after_id, we ignore all items until the requested one is | 1086 # if we have an after_id, we ignore all items until the requested one is |
1017 "last": items[-1]["id"] | 1132 "last": items[-1]["id"] |
1018 }) | 1133 }) |
1019 | 1134 |
1020 return items, rsm.RSMResponse(**rsm_resp) | 1135 return items, rsm.RSMResponse(**rsm_resp) |
1021 | 1136 |
1137 async def apItem2MbDataAndElt(self, ap_item: dict) -> Tuple[dict, domish.Element]: | |
1138 """Convert AP item to parsed microblog data and corresponding item element""" | |
1139 mb_data = await self.apItem2MBdata(ap_item) | |
1140 item_elt = await self._m.data2entry( | |
1141 self.client, mb_data, mb_data["id"], None, self._m.namespace | |
1142 ) | |
1143 item_elt["publisher"] = mb_data["author_jid"] | |
1144 return mb_data, item_elt | |
1145 | |
1146 async def apItem2Elt(self, ap_item: dict) -> domish.Element: | |
1147 """Convert AP item to XMPP item element""" | |
1148 __, item_elt = await self.apItem2MbDataAndElt(ap_item) | |
1149 return item_elt | |
1150 | |
1022 async def parseAPPage( | 1151 async def parseAPPage( |
1023 self, | 1152 self, |
1024 page: Union[str, dict] | 1153 page: Union[str, dict], |
1154 parser: Callable[[dict], Awaitable[domish.Element]], | |
1155 only_ids: bool = False | |
1025 ) -> Tuple[dict, List[domish.Element]]: | 1156 ) -> Tuple[dict, List[domish.Element]]: |
1026 """Convert AP objects from an AP page to XMPP items | 1157 """Convert AP objects from an AP page to XMPP items |
1027 | 1158 |
1028 @param page: Can be either url linking and AP page, or the page data directly | 1159 @param page: Can be either url linking and AP page, or the page data directly |
1160 @param parser: method to use to parse AP items and get XMPP item elements | |
1161 @param only_ids: if True, only retrieve items IDs | |
1029 @return: page data, pubsub items | 1162 @return: page data, pubsub items |
1030 """ | 1163 """ |
1031 page_data = await self.apGetObject(page) | 1164 page_data = await self.apGetObject(page) |
1032 if page_data is None: | 1165 if page_data is None: |
1033 log.warning('No data found in collection') | 1166 log.warning('No data found in collection') |
1034 return {}, [] | 1167 return {}, [] |
1035 ap_items = await self.apGetList(page_data, "orderedItems") | 1168 ap_items = await self.apGetList(page_data, "orderedItems", only_ids=only_ids) |
1036 if ap_items is None: | 1169 if ap_items is None: |
1037 ap_items = await self.apGetList(page_data, "items") | 1170 ap_items = await self.apGetList(page_data, "items", only_ids=only_ids) |
1038 if not ap_items: | 1171 if not ap_items: |
1039 log.warning(f'No item field found in collection: {page_data!r}') | 1172 log.warning(f'No item field found in collection: {page_data!r}') |
1040 return page_data, [] | 1173 return page_data, [] |
1041 else: | 1174 else: |
1042 log.warning( | 1175 log.warning( |
1045 items = [] | 1178 items = [] |
1046 # AP Collections are in antichronological order, but we expect chronological in | 1179 # AP Collections are in antichronological order, but we expect chronological in |
1047 # Pubsub, thus we reverse it | 1180 # Pubsub, thus we reverse it |
1048 for ap_item in reversed(ap_items): | 1181 for ap_item in reversed(ap_items): |
1049 try: | 1182 try: |
1050 items.append(await self.apItem2Elt(ap_item)) | 1183 items.append(await parser(ap_item)) |
1051 except (exceptions.DataError, NotImplementedError, error.StanzaError): | 1184 except (exceptions.DataError, NotImplementedError, error.StanzaError): |
1052 continue | 1185 continue |
1053 | 1186 |
1054 return page_data, items | 1187 return page_data, items |
1055 | |
1056 async def apItem2MbDataAndElt(self, ap_item: dict) -> Tuple[dict, domish.Element]: | |
1057 """Convert AP item to parsed microblog data and corresponding item element""" | |
1058 mb_data = await self.apItem2MBdata(ap_item) | |
1059 item_elt = await self._m.data2entry( | |
1060 self.client, mb_data, mb_data["id"], None, self._m.namespace | |
1061 ) | |
1062 item_elt["publisher"] = mb_data["author_jid"] | |
1063 return mb_data, item_elt | |
1064 | |
1065 async def apItem2Elt(self, ap_item: dict) -> domish.Element: | |
1066 """Convert AP item to XMPP item element""" | |
1067 __, item_elt = await self.apItem2MbDataAndElt(ap_item) | |
1068 return item_elt | |
1069 | 1188 |
1070 async def getCommentsNodes( | 1189 async def getCommentsNodes( |
1071 self, | 1190 self, |
1072 item_id: str, | 1191 item_id: str, |
1073 parent_id: Optional[str] | 1192 parent_id: Optional[str] |
1195 } | 1314 } |
1196 mb_data["comments"] = [comments_data] | 1315 mb_data["comments"] = [comments_data] |
1197 | 1316 |
1198 return mb_data | 1317 return mb_data |
1199 | 1318 |
1319 async def getReplyToIdFromXMPPNode( | |
1320 self, | |
1321 client: SatXMPPEntity, | |
1322 ap_account: str, | |
1323 parent_item: str, | |
1324 mb_data: dict | |
1325 ) -> str: | |
1326 """Get URL to use for ``inReplyTo`` field in AP item. | |
1327 | |
1328 There is currently no way to know the parent service of a comment with XEP-0277. | |
1329 To work around that, we try to check if we have this item in the cache (we | |
1330 should). If there is more that one item with this ID, we first try to find one | |
1331 with this author_jid. If nothing is found, we use ap_account to build `inReplyTo`. | |
1332 | |
1333 @param ap_account: AP account corresponding to the publication author | |
1334 @param parent_item: ID of the node where the publication this item is replying to | |
1335 has been posted | |
1336 @param mb_data: microblog data of the publication | |
1337 @return: URL to use in ``inReplyTo`` field | |
1338 """ | |
1339 # FIXME: propose a protoXEP to properly get parent item, node and service | |
1340 | |
1341 found_items = await self.host.memory.storage.searchPubsubItems({ | |
1342 "profiles": [client.profile], | |
1343 "names": [parent_item] | |
1344 }) | |
1345 if not found_items: | |
1346 log.warning(f"parent item {parent_item!r} not found in cache") | |
1347 parent_ap_account = ap_account | |
1348 elif len(found_items) == 1: | |
1349 cached_node = found_items[0].node | |
1350 parent_ap_account = await self.getAPAccountFromJidAndNode( | |
1351 cached_node.service, | |
1352 cached_node.name | |
1353 ) | |
1354 else: | |
1355 # we found several cached item with given ID, we check if there is one | |
1356 # corresponding to this author | |
1357 try: | |
1358 author = jid.JID(mb_data["author_jid"]).userhostJID() | |
1359 cached_item = next( | |
1360 i for i in found_items | |
1361 if jid.JID(i.data["publisher"]).userhostJID() | |
1362 == author | |
1363 ) | |
1364 except StopIteration: | |
1365 # no item corresponding to this author, we use ap_account | |
1366 log.warning( | |
1367 "Can't find a single cached item for parent item " | |
1368 f"{parent_item!r}" | |
1369 ) | |
1370 parent_ap_account = ap_account | |
1371 else: | |
1372 cached_node = cached_item.node | |
1373 parent_ap_account = await self.getAPAccountFromJidAndNode( | |
1374 cached_node.service, | |
1375 cached_node.name | |
1376 ) | |
1377 | |
1378 return self.buildAPURL( | |
1379 TYPE_ITEM, parent_ap_account, parent_item | |
1380 ) | |
1381 | |
1200 async def mbdata2APitem(self, client: SatXMPPEntity, mb_data: dict) -> dict: | 1382 async def mbdata2APitem(self, client: SatXMPPEntity, mb_data: dict) -> dict: |
1201 """Convert Libervia Microblog Data to ActivityPub item""" | 1383 """Convert Libervia Microblog Data to ActivityPub item""" |
1384 try: | |
1385 node = mb_data["node"] | |
1386 service = jid.JID(mb_data["service"]) | |
1387 except KeyError: | |
1388 # node and service must always be specified when this method is used | |
1389 raise exceptions.InternalError( | |
1390 "node or service is missing in mb_data" | |
1391 ) | |
1202 if not mb_data.get("id"): | 1392 if not mb_data.get("id"): |
1203 mb_data["id"] = shortuuid.uuid() | 1393 mb_data["id"] = shortuuid.uuid() |
1204 if not mb_data.get("author_jid"): | 1394 if not mb_data.get("author_jid"): |
1205 mb_data["author_jid"] = client.jid.full() | 1395 mb_data["author_jid"] = client.jid.full() |
1206 ap_account = await self.getAPAccountFromJidAndNode( | 1396 ap_account = await self.getAPAccountFromJidAndNode( |
1207 jid.JID(mb_data["author_jid"]), | 1397 jid.JID(mb_data["author_jid"]), |
1208 None | 1398 None |
1209 ) | 1399 ) |
1210 url_actor = self.buildAPURL(TYPE_ACTOR, ap_account) | 1400 url_actor = self.buildAPURL(TYPE_ACTOR, ap_account) |
1211 url_item = self.buildAPURL(TYPE_ITEM, ap_account, mb_data["id"]) | 1401 url_item = self.buildAPURL(TYPE_ITEM, ap_account, mb_data["id"]) |
1212 return { | 1402 ap_object = { |
1403 "id": url_item, | |
1404 "type": "Note", | |
1405 "published": utils.xmpp_date(mb_data["published"]), | |
1406 "attributedTo": url_actor, | |
1407 "content": mb_data.get("content_xhtml") or mb_data["content"], | |
1408 "to": ["https://www.w3.org/ns/activitystreams#Public"], | |
1409 } | |
1410 | |
1411 target_ap_account = self._e.unescape(service.user) | |
1412 actor_data = await self.getAPActorDataFromAccount(target_ap_account) | |
1413 followers = actor_data.get("followers") | |
1414 if followers: | |
1415 ap_object["cc"] = [followers] | |
1416 | |
1417 ap_item = { | |
1213 "@context": "https://www.w3.org/ns/activitystreams", | 1418 "@context": "https://www.w3.org/ns/activitystreams", |
1214 "id": url_item, | 1419 "id": url_item, |
1215 "type": "Create", | 1420 "type": "Create", |
1216 "actor": url_actor, | 1421 "actor": url_actor, |
1217 | 1422 |
1218 "object": { | 1423 "object": ap_object |
1219 "id": url_item, | |
1220 "type": "Note", | |
1221 "published": utils.xmpp_date(mb_data["published"]), | |
1222 "attributedTo": url_actor, | |
1223 "content": mb_data.get("content_xhtml") or mb_data["content"], | |
1224 "to": "https://www.w3.org/ns/activitystreams#Public" | |
1225 } | |
1226 } | 1424 } |
1425 language = mb_data.get("language") | |
1426 if language: | |
1427 ap_object["contentMap"] = {language: ap_object["content"]} | |
1428 if self._m.isCommentNode(node): | |
1429 parent_item = self._m.getParentItem(node) | |
1430 if service.host == self.client.jid.userhost(): | |
1431 # the publication is on a virtual node (i.e. an XMPP node managed by | |
1432 # this gateway and linking to an ActivityPub actor) | |
1433 ap_object["inReplyTo"] = parent_item | |
1434 else: | |
1435 # the publication is from a followed real XMPP node | |
1436 ap_object["inReplyTo"] = await self.getReplyToIdFromXMPPNode( | |
1437 client, | |
1438 ap_account, | |
1439 parent_item, | |
1440 mb_data | |
1441 ) | |
1442 | |
1443 return ap_item | |
1227 | 1444 |
1228 async def publishMessage( | 1445 async def publishMessage( |
1229 self, | 1446 self, |
1230 client: SatXMPPEntity, | 1447 client: SatXMPPEntity, |
1231 mess_data: dict, | 1448 mess_data: dict, |
1261 item_data = await self.mbdata2APitem(client, mess_data) | 1478 item_data = await self.mbdata2APitem(client, mess_data) |
1262 url_actor = item_data["object"]["attributedTo"] | 1479 url_actor = item_data["object"]["attributedTo"] |
1263 resp = await self.signAndPost(inbox_url, url_actor, item_data) | 1480 resp = await self.signAndPost(inbox_url, url_actor, item_data) |
1264 if resp.code != 202: | 1481 if resp.code != 202: |
1265 raise exceptions.NetworkError(f"unexpected return code: {resp.code}") | 1482 raise exceptions.NetworkError(f"unexpected return code: {resp.code}") |
1483 | |
async def newReplyToXMPPItem(
    self,
    client: SatXMPPEntity,
    ap_item: dict,
) -> None:
    """Handle an AP item which is a reply to an XMPP item

    The parent XMPP item is located from the ``inReplyTo`` URL of the AP item,
    retrieved from its pubsub node, and the AP item is then converted and
    published to the comments node declared by this parent item.

    @param client: client used to retrieve the parent item and to publish the reply
    @param ap_item: AP object, its "inReplyTo" value must be (or start with, if it
        is a list) the URL of the XMPP item being replied to
    @raise NotImplementedError: the parent item has no comments node set
    """
    in_reply_to = ap_item["inReplyTo"]
    # "inReplyTo" may be a list: the caller only checks the first element, we
    # do the same here so the URL which is parsed is the one which has been checked
    if in_reply_to and isinstance(in_reply_to, list):
        in_reply_to = in_reply_to[0]
    url_type, url_args = self.parseAPURL(in_reply_to)
    if url_type != "item":
        log.warning(
            "Ignoring AP item replying to an XMPP item with an unexpected URL "
            f"type({url_type!r}):\n{pformat(ap_item)}"
        )
        return
    try:
        # first URL argument is expected to be "<account>/<item id>"
        parent_item_account, parent_item_id = url_args[0].split("/", 1)
    except (IndexError, ValueError):
        log.warning(
            "Ignoring AP item replying to an XMPP item with invalid inReplyTo URL "
            f"({in_reply_to!r}):\n{pformat(ap_item)}"
        )
        return
    parent_item_service, parent_item_node = await self.getJIDAndNode(
        parent_item_account
    )
    if parent_item_node is None:
        # no node specified, we use the namespace of the microblog plugin
        parent_item_node = self._m.namespace
    items, __ = await self._p.getItems(
        client, parent_item_service, parent_item_node, item_ids=[parent_item_id]
    )
    try:
        parent_item_elt = items[0]
    except IndexError:
        log.warning(
            f"Can't find parent item at {parent_item_service} (node "
            f"{parent_item_node!r})\n{pformat(ap_item)}")
        return
    parent_item_parsed = await self._m.item2mbdata(
        client, parent_item_elt, parent_item_service, parent_item_node
    )
    try:
        comment_service = jid.JID(parent_item_parsed["comments"][0]["service"])
        comment_node = parent_item_parsed["comments"][0]["node"]
    except (KeyError, IndexError):
        # we don't have a comment node set for this item
        from sat.tools.xml_tools import ppElt
        log.info(f"{ppElt(parent_item_elt.toXml())}")
        # NotImplemented is a constant used for rich comparisons and is not
        # callable, the exception to raise is NotImplementedError
        raise NotImplementedError(
            "replying to an XMPP item without comments node is not supported yet"
        )
    else:
        __, item_elt = await self.apItem2MbDataAndElt(ap_item)
        await self._p.publish(client, comment_service, comment_node, [item_elt])
1266 | 1533 |
1267 async def newAPItem( | 1534 async def newAPItem( |
1268 self, | 1535 self, |
1269 client: SatXMPPEntity, | 1536 client: SatXMPPEntity, |
1270 destinee: Optional[jid.JID], | 1537 destinee: Optional[jid.JID], |
1277 @param node: XMPP pubsub node | 1544 @param node: XMPP pubsub node |
1278 @param item: AP object payload | 1545 @param item: AP object payload |
1279 """ | 1546 """ |
1280 service = client.jid | 1547 service = client.jid |
1281 in_reply_to = item.get("inReplyTo") | 1548 in_reply_to = item.get("inReplyTo") |
1549 if in_reply_to and isinstance(in_reply_to, list): | |
1550 in_reply_to = in_reply_to[0] | |
1282 if in_reply_to and isinstance(in_reply_to, str): | 1551 if in_reply_to and isinstance(in_reply_to, str): |
1283 # this item is a reply, we use or create a corresponding node for comments | 1552 if self.isLocalURL(in_reply_to): |
1553 # this is a reply to an XMPP item | |
1554 return await self.newReplyToXMPPItem(client, item) | |
1555 | |
1556 # this item is a reply to an AP item, we use or create a corresponding node | |
1557 # for comments | |
1284 parent_node, __ = await self.getCommentsNodes(item["id"], in_reply_to) | 1558 parent_node, __ = await self.getCommentsNodes(item["id"], in_reply_to) |
1285 node = parent_node or node | 1559 node = parent_node or node |
1286 cached_node = await self.host.memory.storage.getPubsubNode( | 1560 cached_node = await self.host.memory.storage.getPubsubNode( |
1287 client, service, node, with_subscriptions=True | 1561 client, service, node, with_subscriptions=True |
1288 ) | 1562 ) |
1310 cached_node = await self.host.memory.storage.getPubsubNode( | 1584 cached_node = await self.host.memory.storage.getPubsubNode( |
1311 client, service, node, with_subscriptions=True | 1585 client, service, node, with_subscriptions=True |
1312 ) | 1586 ) |
1313 if cached_node is None: | 1587 if cached_node is None: |
1314 log.warning( | 1588 log.warning( |
1315 f"Received item in unknown node {node!r} at {service}\n{item}" | 1589 f"Received item in unknown node {node!r} at {service}. This may be " |
1590 f"due to a cache purge. We synchronise the node\n{item}" | |
1316 | 1591 |
1317 ) | 1592 ) |
1318 return | 1593 return |
1319 mb_data, item_elt = await self.apItem2MbDataAndElt(item) | 1594 mb_data, item_elt = await self.apItem2MbDataAndElt(item) |
1320 await self.host.memory.storage.cachePubsubItems( | 1595 await self.host.memory.storage.cachePubsubItems( |