Mercurial > libervia-backend
comparison sat/plugins/plugin_comp_ap_gateway/__init__.py @ 3904:0aa7023dcd08
component AP gateway: events:
- XMPP Events <=> AP Events conversion
- `Join`/`Leave` activities are converted to RSVP attachments and vice versa
- fix caching/notification on item published on a virtual pubsub node
- add Ad-Hoc command to convert XMPP Jid/Node to virtual AP Account
- handle `Update` activity
- on `convertAndPostItems`, `Update` activity is used instead of `Create` if a version of
the item is already present in cache
- `events` field is added to actor data (and to `endpoints`), it links the `outbox` of the
actor mapping the same JID with the Events node (i.e. it links to the Events node of the
entity)
- fix subscription to nodes which are not the microblog one
rel 372
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 22 Sep 2022 00:01:41 +0200 |
parents | aa7197b67c26 |
children | 6fa4ca0c047e |
comparison
equal
deleted
inserted
replaced
3903:384b7e6c2dbf | 3904:0aa7023dcd08 |
---|---|
18 | 18 |
19 import base64 | 19 import base64 |
20 import calendar | 20 import calendar |
21 import hashlib | 21 import hashlib |
22 import json | 22 import json |
23 from os import access | |
23 from pathlib import Path | 24 from pathlib import Path |
24 from pprint import pformat | 25 from pprint import pformat |
25 import re | 26 import re |
26 from typing import ( | 27 from typing import ( |
27 Any, Dict, List, Set, Optional, Tuple, Union, Callable, Awaitable, overload | 28 Any, |
29 Awaitable, | |
30 Callable, | |
31 Dict, | |
32 List, | |
33 Optional, | |
34 Set, | |
35 Tuple, | |
36 Type, | |
37 Union, | |
38 overload, | |
28 ) | 39 ) |
29 from urllib import parse | 40 from urllib import parse |
30 | 41 |
31 from cryptography.exceptions import InvalidSignature | 42 from cryptography.exceptions import InvalidSignature |
32 from cryptography.hazmat.primitives import serialization | 43 from cryptography.hazmat.primitives import serialization |
33 from cryptography.hazmat.primitives import hashes | 44 from cryptography.hazmat.primitives import hashes |
34 from cryptography.hazmat.primitives.asymmetric import rsa | 45 from cryptography.hazmat.primitives.asymmetric import rsa |
35 from cryptography.hazmat.primitives.asymmetric import padding | 46 from cryptography.hazmat.primitives.asymmetric import padding |
36 import dateutil | 47 import dateutil |
48 from dateutil.parser import parserinfo | |
37 import shortuuid | 49 import shortuuid |
38 from sqlalchemy.exc import IntegrityError | 50 from sqlalchemy.exc import IntegrityError |
39 import treq | 51 import treq |
40 from treq.response import _Response as TReqResponse | 52 from treq.response import _Response as TReqResponse |
41 from twisted.internet import defer, reactor, threads | 53 from twisted.internet import defer, reactor, threads |
42 from twisted.web import http | 54 from twisted.web import http |
43 from twisted.words.protocols.jabber import error, jid | 55 from twisted.words.protocols.jabber import error, jid |
44 from twisted.words.xish import domish | 56 from twisted.words.xish import domish |
45 from wokkel import rsm, pubsub | 57 from wokkel import pubsub, rsm |
46 | 58 |
47 from sat.core import exceptions | 59 from sat.core import exceptions |
48 from sat.core.constants import Const as C | 60 from sat.core.constants import Const as C |
49 from sat.core.core_types import SatXMPPEntity | 61 from sat.core.core_types import SatXMPPEntity |
50 from sat.core.i18n import _ | 62 from sat.core.i18n import _ |
51 from sat.core.log import getLogger | 63 from sat.core.log import getLogger |
52 from sat.memory.sqla_mapping import SubscriptionState, History | |
53 from sat.memory import persistent | 64 from sat.memory import persistent |
65 from sat.memory.sqla_mapping import History, SubscriptionState | |
54 from sat.tools import utils | 66 from sat.tools import utils |
55 from sat.tools.common import data_format, tls, uri | 67 from sat.tools.common import data_format, date_utils, tls, uri |
56 from sat.tools.common.async_utils import async_lru | 68 from sat.tools.common.async_utils import async_lru |
57 | 69 |
70 from .ad_hoc import APAdHocService | |
71 from .events import APEvents | |
58 from .constants import ( | 72 from .constants import ( |
59 ACTIVITY_OBJECT_MANDATORY, | 73 ACTIVITY_OBJECT_MANDATORY, |
60 ACTIVITY_TARGET_MANDATORY, | 74 ACTIVITY_TARGET_MANDATORY, |
61 ACTIVITY_TYPES, | 75 ACTIVITY_TYPES, |
62 ACTIVITY_TYPES_LOWER, | 76 ACTIVITY_TYPES_LOWER, |
63 COMMENTS_MAX_PARENTS, | 77 COMMENTS_MAX_PARENTS, |
64 CONF_SECTION, | 78 CONF_SECTION, |
65 IMPORT_NAME, | 79 IMPORT_NAME, |
66 LRU_MAX_SIZE, | 80 LRU_MAX_SIZE, |
67 MEDIA_TYPE_AP, | 81 MEDIA_TYPE_AP, |
68 TYPE_ACTOR, | |
69 TYPE_ITEM, | |
70 TYPE_FOLLOWERS, | |
71 TYPE_TOMBSTONE, | |
72 TYPE_MENTION, | |
73 TYPE_LIKE, | |
74 TYPE_REACTION, | |
75 NS_AP, | 82 NS_AP, |
76 NS_AP_PUBLIC, | 83 NS_AP_PUBLIC, |
77 PUBLIC_TUPLE | 84 PUBLIC_TUPLE, |
85 TYPE_ACTOR, | |
86 TYPE_EVENT, | |
87 TYPE_FOLLOWERS, | |
88 TYPE_ITEM, | |
89 TYPE_LIKE, | |
90 TYPE_MENTION, | |
91 TYPE_REACTION, | |
92 TYPE_TOMBSTONE, | |
93 TYPE_JOIN, | |
94 TYPE_LEAVE | |
78 ) | 95 ) |
79 from .regex import RE_MENTION | |
80 from .http_server import HTTPServer | 96 from .http_server import HTTPServer |
81 from .pubsub_service import APPubsubService | 97 from .pubsub_service import APPubsubService |
98 from .regex import RE_MENTION | |
82 | 99 |
83 | 100 |
84 log = getLogger(__name__) | 101 log = getLogger(__name__) |
85 | 102 |
86 IMPORT_NAME = "ap-gateway" | 103 IMPORT_NAME = "ap-gateway" |
90 C.PI_IMPORT_NAME: IMPORT_NAME, | 107 C.PI_IMPORT_NAME: IMPORT_NAME, |
91 C.PI_MODES: [C.PLUG_MODE_COMPONENT], | 108 C.PI_MODES: [C.PLUG_MODE_COMPONENT], |
92 C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, | 109 C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, |
93 C.PI_PROTOCOLS: [], | 110 C.PI_PROTOCOLS: [], |
94 C.PI_DEPENDENCIES: [ | 111 C.PI_DEPENDENCIES: [ |
95 "XEP-0054", "XEP-0060", "XEP-0084", "XEP-0106", "XEP-0277", "XEP-0292", | 112 "XEP-0050", "XEP-0054", "XEP-0060", "XEP-0084", "XEP-0106", "XEP-0277", |
96 "XEP-0329", "XEP-0372", "XEP-0424", "XEP-0465", "XEP-0470", "PUBSUB_CACHE", | 113 "XEP-0292", "XEP-0329", "XEP-0372", "XEP-0424", "XEP-0465", "XEP-0470", |
97 "TEXT_SYNTAXES", "IDENTITY" | 114 "PUBSUB_CACHE", "TEXT_SYNTAXES", "IDENTITY", "EVENTS" |
98 ], | 115 ], |
99 C.PI_RECOMMENDATIONS: [], | 116 C.PI_RECOMMENDATIONS: [], |
100 C.PI_MAIN: "APGateway", | 117 C.PI_MAIN: "APGateway", |
101 C.PI_HANDLER: C.BOOL_TRUE, | 118 C.PI_HANDLER: C.BOOL_TRUE, |
102 C.PI_DESCRIPTION: _( | 119 C.PI_DESCRIPTION: _( |
132 self._pps = host.plugins["XEP-0465"] | 149 self._pps = host.plugins["XEP-0465"] |
133 self._c = host.plugins["PUBSUB_CACHE"] | 150 self._c = host.plugins["PUBSUB_CACHE"] |
134 self._t = host.plugins["TEXT_SYNTAXES"] | 151 self._t = host.plugins["TEXT_SYNTAXES"] |
135 self._i = host.plugins["IDENTITY"] | 152 self._i = host.plugins["IDENTITY"] |
136 self._pa = host.plugins["XEP-0470"] | 153 self._pa = host.plugins["XEP-0470"] |
154 self._events = host.plugins["EVENTS"] | |
137 self._p.addManagedNode( | 155 self._p.addManagedNode( |
138 "", | 156 "", |
139 items_cb=self._itemsReceived, | 157 items_cb=self._itemsReceived, |
140 # we want to be sure that the callbacks are launched before pubsub cache's | 158 # we want to be sure that the callbacks are launched before pubsub cache's |
141 # one, as we need to inspect items before they are actually removed from cache | 159 # one, as we need to inspect items before they are actually removed from cache |
142 # or updated | 160 # or updated |
143 priority=1000 | 161 priority=1000 |
144 ) | 162 ) |
145 self.pubsub_service = APPubsubService(self) | 163 self.pubsub_service = APPubsubService(self) |
164 self.ad_hoc = APAdHocService(self) | |
165 self.ap_events = APEvents(self) | |
146 host.trigger.add("messageReceived", self._messageReceivedTrigger, priority=-1000) | 166 host.trigger.add("messageReceived", self._messageReceivedTrigger, priority=-1000) |
147 host.trigger.add("XEP-0424_retractReceived", self._onMessageRetract) | 167 host.trigger.add("XEP-0424_retractReceived", self._onMessageRetract) |
148 host.trigger.add("XEP-0372_ref_received", self._onReferenceReceived) | 168 host.trigger.add("XEP-0372_ref_received", self._onReferenceReceived) |
149 | 169 |
150 host.bridge.addMethod( | 170 host.bridge.addMethod( |
264 IMPORT_NAME, | 284 IMPORT_NAME, |
265 client.profile | 285 client.profile |
266 ) | 286 ) |
267 await self.init(client) | 287 await self.init(client) |
268 | 288 |
289 def profileConnected(self, client): | |
290 self.ad_hoc.init(client) | |
291 | |
269 async def _itemsReceived( | 292 async def _itemsReceived( |
270 self, | 293 self, |
271 client: SatXMPPEntity, | 294 client: SatXMPPEntity, |
272 itemsEvent: pubsub.ItemsEvent | 295 itemsEvent: pubsub.ItemsEvent |
273 ) -> None: | 296 ) -> None: |
311 @return: virtual client | 334 @return: virtual client |
312 """ | 335 """ |
313 local_jid = await self.getJIDFromId(actor_id) | 336 local_jid = await self.getJIDFromId(actor_id) |
314 return self.client.getVirtualClient(local_jid) | 337 return self.client.getVirtualClient(local_jid) |
315 | 338 |
316 def isActivity(self, data: dict) -> bool: | 339 def is_activity(self, data: dict) -> bool: |
317 """Return True if the data has an activity type""" | 340 """Return True if the data has an activity type""" |
318 try: | 341 try: |
319 return (data.get("type") or "").lower() in ACTIVITY_TYPES_LOWER | 342 return (data.get("type") or "").lower() in ACTIVITY_TYPES_LOWER |
320 except (KeyError, TypeError): | 343 except (KeyError, TypeError): |
321 return False | 344 return False |
432 try: | 455 try: |
433 found_item = found_items[0] | 456 found_item = found_items[0] |
434 except IndexError: | 457 except IndexError: |
435 raise exceptions.NotFound("requested items can't be found") | 458 raise exceptions.NotFound("requested items can't be found") |
436 | 459 |
437 mb_data = await self._m.item2mbdata( | 460 if node.startswith(self._events.namespace): |
438 self.client, found_item, author_jid, node | 461 # this is an event |
439 ) | 462 event_data = self._events.event_elt_2_event_data(found_item) |
440 ap_item = await self.mbdata2APitem(self.client, mb_data) | 463 ap_item = await self.ap_events.event_data_2_ap_item( |
441 # the URL must return the object and not the activity | 464 event_data, author_jid |
442 return ap_item["object"] | 465 ) |
466 # the URL must return the object and not the activity | |
467 ap_item["object"]["@context"] = ap_item["@context"] | |
468 return ap_item["object"] | |
469 else: | |
470 # this is a blog item | |
471 mb_data = await self._m.item2mbdata( | |
472 self.client, found_item, author_jid, node | |
473 ) | |
474 ap_item = await self.mb_data_2_ap_item(self.client, mb_data) | |
475 # the URL must return the object and not the activity | |
476 return ap_item["object"] | |
443 else: | 477 else: |
444 raise NotImplementedError( | 478 raise NotImplementedError( |
445 'only object from "item" URLs can be retrieved for now' | 479 'only object from "item" URLs can be retrieved for now' |
446 ) | 480 ) |
447 | 481 |
529 self, | 563 self, |
530 data: dict, | 564 data: dict, |
531 ) -> str: | 565 ) -> str: |
532 """Retrieve actor who sent data | 566 """Retrieve actor who sent data |
533 | 567 |
534 This is done by checking "attributedTo" field first, then "actor" field. | 568 This is done by checking "actor" field first, then "attributedTo" field. |
535 Only the first found actor is taken into accoun | 569 Only the first found actor is taken into account |
536 @param data: AP object | 570 @param data: AP object |
537 @return: actor id of the sender | 571 @return: actor id of the sender |
538 @raise exceptions.NotFound: no actor has been found in data | 572 @raise exceptions.NotFound: no actor has been found in data |
539 """ | 573 """ |
540 try: | 574 try: |
541 actors = await self.apGetActors(data, "attributedTo", as_account=False) | 575 actors = await self.apGetActors(data, "actor", as_account=False) |
542 except exceptions.DataError: | 576 except exceptions.DataError: |
543 actors = None | 577 actors = None |
544 if not actors: | 578 if not actors: |
545 try: | 579 try: |
546 actors = await self.apGetActors(data, "actor", as_account=False) | 580 actors = await self.apGetActors(data, "attributedTo", as_account=False) |
547 except exceptions.DataError: | 581 except exceptions.DataError: |
548 raise exceptions.NotFound( | 582 raise exceptions.NotFound( |
549 'actor not specified in "attributedTo" or "actor"' | 583 'actor not specified in "actor" or "attributedTo"' |
550 ) | 584 ) |
551 try: | 585 try: |
552 return actors[0] | 586 return actors[0] |
553 except IndexError: | 587 except IndexError: |
554 raise exceptions.NotFound("list of actors is empty") | 588 raise exceptions.NotFound("list of actors is empty") |
878 f'"target" is mandatory for activity {activity!r}' | 912 f'"target" is mandatory for activity {activity!r}' |
879 ) | 913 ) |
880 if activity_id is None: | 914 if activity_id is None: |
881 activity_id = f"{actor_id}#{activity.lower()}_{shortuuid.uuid()}" | 915 activity_id = f"{actor_id}#{activity.lower()}_{shortuuid.uuid()}" |
882 data: Dict[str, Any] = { | 916 data: Dict[str, Any] = { |
883 "@context": NS_AP, | 917 "@context": [NS_AP], |
884 "actor": actor_id, | 918 "actor": actor_id, |
885 "id": activity_id, | 919 "id": activity_id, |
886 "type": activity, | 920 "type": activity, |
887 } | 921 } |
888 data.update(kwargs) | 922 data.update(kwargs) |
991 @param ap_account: account of ActivityPub actor receiving the item | 1025 @param ap_account: account of ActivityPub actor receiving the item |
992 @param service: JID of the (virtual) pubsub service where the item has been | 1026 @param service: JID of the (virtual) pubsub service where the item has been |
993 published | 1027 published |
994 @param node: (virtual) node corresponding where the item has been published | 1028 @param node: (virtual) node corresponding where the item has been published |
995 @param subscribe_extra_nodes: if True, extra data nodes will be automatically | 1029 @param subscribe_extra_nodes: if True, extra data nodes will be automatically |
996 subscribed, that is comment nodes if present and attachments nodes. | 1030 subscribed, that is comment nodes if present and attachments nodes. |
997 """ | 1031 """ |
998 actor_id = await self.getAPActorIdFromAccount(ap_account) | 1032 actor_id = await self.getAPActorIdFromAccount(ap_account) |
999 inbox = await self.getAPInboxFromId(actor_id) | 1033 inbox = await self.getAPInboxFromId(actor_id) |
1000 for item in items: | 1034 for item in items: |
1001 if item.name == "item": | 1035 if item.name == "item": |
1002 mb_data = await self._m.item2mbdata(client, item, service, node) | 1036 cached_item = await self.host.memory.storage.searchPubsubItems({ |
1003 author_jid = jid.JID(mb_data["author_jid"]) | 1037 "profiles": [self.client.profile], |
1004 if subscribe_extra_nodes and not self.isVirtualJID(author_jid): | 1038 "services": [service], |
1005 # we subscribe automatically to comment nodes if any | 1039 "nodes": [node], |
1006 recipient_jid = self.getLocalJIDFromAccount(ap_account) | 1040 "names": [item["id"]] |
1007 recipient_client = self.client.getVirtualClient(recipient_jid) | 1041 }) |
1008 for comment_data in mb_data.get("comments", []): | 1042 is_new = not bool(cached_item) |
1009 comment_service = jid.JID(comment_data["service"]) | 1043 if node.startswith(self._events.namespace): |
1010 if self.isVirtualJID(comment_service): | 1044 # event item |
1011 log.debug(f"ignoring virtual comment service: {comment_data}") | 1045 event_data = self._events.event_elt_2_event_data(item) |
1012 continue | |
1013 comment_node = comment_data["node"] | |
1014 await self._p.subscribe( | |
1015 recipient_client, comment_service, comment_node | |
1016 ) | |
1017 try: | 1046 try: |
1018 await self._pa.subscribe( | 1047 author_jid = jid.JID(item["publisher"]).userhostJID() |
1019 recipient_client, service, node, mb_data["id"] | 1048 except (KeyError, RuntimeWarning): |
1020 ) | 1049 root_elt = item |
1021 except exceptions.NotFound: | 1050 while root_elt.parent is not None: |
1022 log.debug( | 1051 root_elt = root_elt.parent |
1023 f"no attachment node found for item {mb_data['id']!r} on " | 1052 author_jid = jid.JID(root_elt["from"]).userhostJID() |
1024 f"{node!r} at {service}" | 1053 if subscribe_extra_nodes and not self.isVirtualJID(author_jid): |
1025 ) | 1054 # we subscribe automatically to comment nodes if any |
1026 ap_item = await self.mbdata2APitem(client, mb_data) | 1055 recipient_jid = self.getLocalJIDFromAccount(ap_account) |
1056 recipient_client = self.client.getVirtualClient(recipient_jid) | |
1057 comments_data = event_data.get("comments") | |
1058 if comments_data: | |
1059 comment_service = jid.JID(comments_data["jid"]) | |
1060 comment_node = comments_data["node"] | |
1061 await self._p.subscribe( | |
1062 recipient_client, comment_service, comment_node | |
1063 ) | |
1064 try: | |
1065 await self._pa.subscribe( | |
1066 recipient_client, service, node, event_data["id"] | |
1067 ) | |
1068 except exceptions.NotFound: | |
1069 log.debug( | |
1070 f"no attachment node found for item {event_data['id']!r} " | |
1071 f"on {node!r} at {service}" | |
1072 ) | |
1073 ap_item = await self.ap_events.event_data_2_ap_item( | |
1074 event_data, author_jid, is_new=is_new | |
1075 ) | |
1076 else: | |
1077 # blog item | |
1078 mb_data = await self._m.item2mbdata(client, item, service, node) | |
1079 author_jid = jid.JID(mb_data["author_jid"]) | |
1080 if subscribe_extra_nodes and not self.isVirtualJID(author_jid): | |
1081 # we subscribe automatically to comment nodes if any | |
1082 recipient_jid = self.getLocalJIDFromAccount(ap_account) | |
1083 recipient_client = self.client.getVirtualClient(recipient_jid) | |
1084 for comment_data in mb_data.get("comments", []): | |
1085 comment_service = jid.JID(comment_data["service"]) | |
1086 if self.isVirtualJID(comment_service): | |
1087 log.debug( | |
1088 f"ignoring virtual comment service: {comment_data}" | |
1089 ) | |
1090 continue | |
1091 comment_node = comment_data["node"] | |
1092 await self._p.subscribe( | |
1093 recipient_client, comment_service, comment_node | |
1094 ) | |
1095 try: | |
1096 await self._pa.subscribe( | |
1097 recipient_client, service, node, mb_data["id"] | |
1098 ) | |
1099 except exceptions.NotFound: | |
1100 log.debug( | |
1101 f"no attachment node found for item {mb_data['id']!r} on " | |
1102 f"{node!r} at {service}" | |
1103 ) | |
1104 ap_item = await self.mb_data_2_ap_item(client, mb_data, is_new=is_new) | |
1105 | |
1027 url_actor = ap_item["actor"] | 1106 url_actor = ap_item["actor"] |
1028 elif item.name == "retract": | 1107 elif item.name == "retract": |
1029 url_actor, ap_item = await self.apDeleteItem( | 1108 url_actor, ap_item = await self.apDeleteItem( |
1030 client.jid, node, item["id"] | 1109 client.jid, node, item["id"] |
1031 ) | 1110 ) |
1132 # noticed | 1211 # noticed |
1133 if "noticed" in attachments: | 1212 if "noticed" in attachments: |
1134 if not "noticed" in old_attachment: | 1213 if not "noticed" in old_attachment: |
1135 # new "noticed" attachment, we translate to "Like" activity | 1214 # new "noticed" attachment, we translate to "Like" activity |
1136 activity_id = self.buildAPURL("like", item_account, item_id) | 1215 activity_id = self.buildAPURL("like", item_account, item_id) |
1137 like = self.createActivity( | 1216 activity = self.createActivity( |
1138 TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id | 1217 TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id |
1139 ) | 1218 ) |
1140 like["to"] = [ap_account] | 1219 activity["to"] = [ap_account] |
1141 like["cc"] = [NS_AP_PUBLIC] | 1220 activity["cc"] = [NS_AP_PUBLIC] |
1142 await self.signAndPost(inbox, publisher_actor_id, like) | 1221 await self.signAndPost(inbox, publisher_actor_id, activity) |
1143 else: | 1222 else: |
1144 if "noticed" in old_attachment: | 1223 if "noticed" in old_attachment: |
1145 # "noticed" attachment has been removed, we undo the "Like" activity | 1224 # "noticed" attachment has been removed, we undo the "Like" activity |
1146 activity_id = self.buildAPURL("like", item_account, item_id) | 1225 activity_id = self.buildAPURL("like", item_account, item_id) |
1147 like = self.createActivity( | 1226 activity = self.createActivity( |
1148 TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id | 1227 TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id |
1149 ) | 1228 ) |
1150 like["to"] = [ap_account] | 1229 activity["to"] = [ap_account] |
1151 like["cc"] = [NS_AP_PUBLIC] | 1230 activity["cc"] = [NS_AP_PUBLIC] |
1152 undo = self.createActivity("Undo", publisher_actor_id, like) | 1231 undo = self.createActivity("Undo", publisher_actor_id, activity) |
1153 await self.signAndPost(inbox, publisher_actor_id, undo) | 1232 await self.signAndPost(inbox, publisher_actor_id, undo) |
1154 | 1233 |
1155 # reactions | 1234 # reactions |
1156 new_reactions = set(attachments.get("reactions", {}).get("reactions", [])) | 1235 new_reactions = set(attachments.get("reactions", {}).get("reactions", [])) |
1157 old_reactions = set(old_attachment.get("reactions", {}).get("reactions", [])) | 1236 old_reactions = set(old_attachment.get("reactions", {}).get("reactions", [])) |
1174 "Undo", publisher_actor_id, reaction_activity | 1253 "Undo", publisher_actor_id, reaction_activity |
1175 ) | 1254 ) |
1176 else: | 1255 else: |
1177 activy = reaction_activity | 1256 activy = reaction_activity |
1178 await self.signAndPost(inbox, publisher_actor_id, activy) | 1257 await self.signAndPost(inbox, publisher_actor_id, activy) |
1258 | |
1259 # RSVP | |
1260 if "rsvp" in attachments: | |
1261 attending = attachments["rsvp"].get("attending", "no") | |
1262 old_attending = old_attachment.get("rsvp", {}).get("attending", "no") | |
1263 if attending != old_attending: | |
1264 activity_type = TYPE_JOIN if attending == "yes" else TYPE_LEAVE | |
1265 activity_id = self.buildAPURL(activity_type.lower(), item_account, item_id) | |
1266 activity = self.createActivity( | |
1267 activity_type, publisher_actor_id, item_url, activity_id=activity_id | |
1268 ) | |
1269 activity["to"] = [ap_account] | |
1270 activity["cc"] = [NS_AP_PUBLIC] | |
1271 await self.signAndPost(inbox, publisher_actor_id, activity) | |
1272 else: | |
1273 if "rsvp" in old_attachment: | |
1274 old_attending = old_attachment.get("rsvp", {}).get("attending", "no") | |
1275 if old_attending == "yes": | |
1276 activity_id = self.buildAPURL(TYPE_LEAVE.lower(), item_account, item_id) | |
1277 activity = self.createActivity( | |
1278 TYPE_LEAVE, publisher_actor_id, item_url, activity_id=activity_id | |
1279 ) | |
1280 activity["to"] = [ap_account] | |
1281 activity["cc"] = [NS_AP_PUBLIC] | |
1282 await self.signAndPost(inbox, publisher_actor_id, activity) | |
1179 | 1283 |
1180 if service.user and self.isVirtualJID(service): | 1284 if service.user and self.isVirtualJID(service): |
1181 # the item is on a virtual service, we need to store it in cache | 1285 # the item is on a virtual service, we need to store it in cache |
1182 log.debug("storing attachments item in cache") | 1286 log.debug("storing attachments item in cache") |
1183 cached_node = await self.host.memory.storage.getPubsubNode( | 1287 cached_node = await self.host.memory.storage.getPubsubNode( |
1390 collections) | 1494 collections) |
1391 @return: XMPP Pubsub items and corresponding RSM Response | 1495 @return: XMPP Pubsub items and corresponding RSM Response |
1392 Items are always returned in chronological order in the result | 1496 Items are always returned in chronological order in the result |
1393 """ | 1497 """ |
1394 if parser is None: | 1498 if parser is None: |
1395 parser = self.apItem2Elt | 1499 parser = self.ap_item_2_mb_elt |
1396 | 1500 |
1397 rsm_resp: Dict[str, Union[bool, int]] = {} | 1501 rsm_resp: Dict[str, Union[bool, int]] = {} |
1398 try: | 1502 try: |
1399 count = collection["totalItems"] | 1503 count = collection["totalItems"] |
1400 except KeyError: | 1504 except KeyError: |
1518 "last": items[-1]["id"] | 1622 "last": items[-1]["id"] |
1519 }) | 1623 }) |
1520 | 1624 |
1521 return items, rsm.RSMResponse(**rsm_resp) | 1625 return items, rsm.RSMResponse(**rsm_resp) |
1522 | 1626 |
1523 async def apItem2MbDataAndElt(self, ap_item: dict) -> Tuple[dict, domish.Element]: | 1627 async def ap_item_2_mb_data_and_elt(self, ap_item: dict) -> Tuple[dict, domish.Element]: |
1524 """Convert AP item to parsed microblog data and corresponding item element""" | 1628 """Convert AP item to parsed microblog data and corresponding item element""" |
1525 mb_data = await self.apItem2MBdata(ap_item) | 1629 mb_data = await self.apItem2MBdata(ap_item) |
1526 item_elt = await self._m.data2entry( | 1630 item_elt = await self._m.data2entry( |
1527 self.client, mb_data, mb_data["id"], None, self._m.namespace | 1631 self.client, mb_data, mb_data["id"], None, self._m.namespace |
1528 ) | 1632 ) |
1530 item_elt["publisher"] = mb_data["extra"]["repeated"]["by"] | 1634 item_elt["publisher"] = mb_data["extra"]["repeated"]["by"] |
1531 else: | 1635 else: |
1532 item_elt["publisher"] = mb_data["author_jid"] | 1636 item_elt["publisher"] = mb_data["author_jid"] |
1533 return mb_data, item_elt | 1637 return mb_data, item_elt |
1534 | 1638 |
1535 async def apItem2Elt(self, ap_item: dict) -> domish.Element: | 1639 async def ap_item_2_mb_elt(self, ap_item: dict) -> domish.Element: |
1536 """Convert AP item to XMPP item element""" | 1640 """Convert AP item to XMPP item element""" |
1537 __, item_elt = await self.apItem2MbDataAndElt(ap_item) | 1641 __, item_elt = await self.ap_item_2_mb_data_and_elt(ap_item) |
1538 return item_elt | 1642 return item_elt |
1539 | 1643 |
1540 async def parseAPPage( | 1644 async def parseAPPage( |
1541 self, | 1645 self, |
1542 page: Union[str, dict], | 1646 page: Union[str, dict], |
1627 @return: AP Item's Object and microblog data | 1731 @return: AP Item's Object and microblog data |
1628 @raise exceptions.DataError: something is invalid in the AP item | 1732 @raise exceptions.DataError: something is invalid in the AP item |
1629 @raise NotImplementedError: some AP data is not handled yet | 1733 @raise NotImplementedError: some AP data is not handled yet |
1630 @raise error.StanzaError: error while contacting the AP server | 1734 @raise error.StanzaError: error while contacting the AP server |
1631 """ | 1735 """ |
1632 is_activity = self.isActivity(ap_item) | 1736 is_activity = self.is_activity(ap_item) |
1633 if is_activity: | 1737 if is_activity: |
1634 ap_object = await self.apGetObject(ap_item, "object") | 1738 ap_object = await self.apGetObject(ap_item, "object") |
1635 if not ap_object: | 1739 if not ap_object: |
1636 log.warning(f'No "object" found in AP item {ap_item!r}') | 1740 log.warning(f'No "object" found in AP item {ap_item!r}') |
1637 raise exceptions.DataError | 1741 raise exceptions.DataError |
1837 self.buildAPURL(TYPE_FOLLOWERS, repeater_account), | 1941 self.buildAPURL(TYPE_FOLLOWERS, repeater_account), |
1838 await self.getAPActorIdFromAccount(repeated_account) | 1942 await self.getAPActorIdFromAccount(repeated_account) |
1839 ] | 1943 ] |
1840 return announce | 1944 return announce |
1841 | 1945 |
1842 async def mbdata2APitem( | 1946 async def mb_data_2_ap_item( |
1843 self, | 1947 self, |
1844 client: SatXMPPEntity, | 1948 client: SatXMPPEntity, |
1845 mb_data: dict, | 1949 mb_data: dict, |
1846 public=True | 1950 public: bool =True, |
1951 is_new: bool = True, | |
1847 ) -> dict: | 1952 ) -> dict: |
1848 """Convert Libervia Microblog Data to ActivityPub item | 1953 """Convert Libervia Microblog Data to ActivityPub item |
1849 | 1954 |
1850 @param mb_data: microblog data (as used in plugin XEP-0277) to convert | 1955 @param mb_data: microblog data (as used in plugin XEP-0277) to convert |
1851 If ``public`` is True, ``service`` and ``node`` keys must be set. | 1956 If ``public`` is True, ``service`` and ``node`` keys must be set. |
1854 if True, the AP Item will be marked as public, and AP followers of target AP | 1959 if True, the AP Item will be marked as public, and AP followers of target AP |
1855 account (which retrieve from ``service``) will be put in ``cc``. | 1960 account (which retrieve from ``service``) will be put in ``cc``. |
1856 ``inReplyTo`` will also be set if suitable | 1961 ``inReplyTo`` will also be set if suitable |
1857 if False, no destinee will be set (i.e., no ``to`` or ``cc`` or public flag). | 1962 if False, no destinee will be set (i.e., no ``to`` or ``cc`` or public flag). |
1858 This is usually used for direct messages. | 1963 This is usually used for direct messages. |
1964 @param is_new: if True, the item is a new one (no instance has been found in | |
1965 cache). | |
1966 If True, a "Create" activity will be generated, otherwise an "Update" one will | |
1967 be. | |
1859 @return: Activity item | 1968 @return: Activity item |
1860 """ | 1969 """ |
1861 extra = mb_data.get("extra", {}) | 1970 extra = mb_data.get("extra", {}) |
1862 if "repeated" in extra: | 1971 if "repeated" in extra: |
1863 return await self.repeatedMB2APItem(mb_data) | 1972 return await self.repeatedMB2APItem(mb_data) |
1939 parent_item, | 2048 parent_item, |
1940 mb_data | 2049 mb_data |
1941 ) | 2050 ) |
1942 | 2051 |
1943 return self.createActivity( | 2052 return self.createActivity( |
1944 "Create", url_actor, ap_object, activity_id=url_item | 2053 "Create" if is_new else "Update", url_actor, ap_object, activity_id=url_item |
1945 ) | 2054 ) |
1946 | 2055 |
1947 async def publishMessage( | 2056 async def publishMessage( |
1948 self, | 2057 self, |
1949 client: SatXMPPEntity, | 2058 client: SatXMPPEntity, |
1975 try: | 2084 try: |
1976 inbox_url = ap_actor_data["endpoints"]["sharedInbox"] | 2085 inbox_url = ap_actor_data["endpoints"]["sharedInbox"] |
1977 except KeyError: | 2086 except KeyError: |
1978 raise exceptions.DataError("Can't get ActivityPub actor inbox") | 2087 raise exceptions.DataError("Can't get ActivityPub actor inbox") |
1979 | 2088 |
1980 item_data = await self.mbdata2APitem(client, mess_data) | 2089 item_data = await self.mb_data_2_ap_item(client, mess_data) |
1981 url_actor = item_data["actor"] | 2090 url_actor = item_data["actor"] |
1982 resp = await self.signAndPost(inbox_url, url_actor, item_data) | 2091 resp = await self.signAndPost(inbox_url, url_actor, item_data) |
1983 | 2092 |
1984 async def apDeleteItem( | 2093 async def apDeleteItem( |
1985 self, | 2094 self, |
2094 origin_id = mess_data["extra"].get("origin_id") | 2203 origin_id = mess_data["extra"].get("origin_id") |
2095 if origin_id: | 2204 if origin_id: |
2096 # we need to use origin ID when present to be able to retract the message | 2205 # we need to use origin ID when present to be able to retract the message |
2097 mb_data["id"] = origin_id | 2206 mb_data["id"] = origin_id |
2098 client = self.client.getVirtualClient(mess_data["from"]) | 2207 client = self.client.getVirtualClient(mess_data["from"]) |
2099 ap_item = await self.mbdata2APitem(client, mb_data, public=False) | 2208 ap_item = await self.mb_data_2_ap_item(client, mb_data, public=False) |
2100 ap_item["object"]["to"] = ap_item["to"] = [actor_id] | 2209 ap_item["object"]["to"] = ap_item["to"] = [actor_id] |
2101 await self.signAndPost(inbox, ap_item["actor"], ap_item) | 2210 await self.signAndPost(inbox, ap_item["actor"], ap_item) |
2102 return mess_data | 2211 return mess_data |
2103 | 2212 |
2104 async def _onMessageRetract( | 2213 async def _onMessageRetract( |
2203 cached_item = cached_items[0] | 2312 cached_item = cached_items[0] |
2204 | 2313 |
2205 mb_data = await self._m.item2mbdata( | 2314 mb_data = await self._m.item2mbdata( |
2206 client, cached_item.data, pubsub_service, pubsub_node | 2315 client, cached_item.data, pubsub_service, pubsub_node |
2207 ) | 2316 ) |
2208 ap_item = await self.mbdata2APitem(client, mb_data) | 2317 ap_item = await self.mb_data_2_ap_item(client, mb_data) |
2209 ap_object = ap_item["object"] | 2318 ap_object = ap_item["object"] |
2210 ap_object["to"] = [actor_id] | 2319 ap_object["to"] = [actor_id] |
2211 ap_object.setdefault("tag", []).append({ | 2320 ap_object.setdefault("tag", []).append({ |
2212 "type": TYPE_MENTION, | 2321 "type": TYPE_MENTION, |
2213 "href": actor_id, | 2322 "href": actor_id, |
2269 # we don't have a comment node set for this item | 2378 # we don't have a comment node set for this item |
2270 from sat.tools.xml_tools import ppElt | 2379 from sat.tools.xml_tools import ppElt |
2271 log.info(f"{ppElt(parent_item_elt.toXml())}") | 2380 log.info(f"{ppElt(parent_item_elt.toXml())}") |
2272 raise NotImplementedError() | 2381 raise NotImplementedError() |
2273 else: | 2382 else: |
2274 __, item_elt = await self.apItem2MbDataAndElt(ap_item) | 2383 __, item_elt = await self.ap_item_2_mb_data_and_elt(ap_item) |
2275 await self._p.publish(client, comment_service, comment_node, [item_elt]) | 2384 await self._p.publish(client, comment_service, comment_node, [item_elt]) |
2276 await self.notifyMentions( | 2385 await self.notifyMentions( |
2277 targets, mentions, comment_service, comment_node, item_elt["id"] | 2386 targets, mentions, comment_service, comment_node, item_elt["id"] |
2278 ) | 2387 ) |
2279 | 2388 |
2452 @param node: XMPP pubsub node | 2561 @param node: XMPP pubsub node |
2453 @param item: AP object payload | 2562 @param item: AP object payload |
2454 @param public: True if the item is public | 2563 @param public: True if the item is public |
2455 """ | 2564 """ |
2456 # XXX: "public" is not used for now | 2565 # XXX: "public" is not used for now |
2457 | |
2458 service = client.jid | 2566 service = client.jid |
2459 in_reply_to = item.get("inReplyTo") | 2567 in_reply_to = item.get("inReplyTo") |
2460 | 2568 |
2461 if in_reply_to and isinstance(in_reply_to, list): | 2569 if in_reply_to and isinstance(in_reply_to, list): |
2462 in_reply_to = in_reply_to[0] | 2570 in_reply_to = in_reply_to[0] |
2474 client, service, node, with_subscriptions=True, create=True, | 2582 client, service, node, with_subscriptions=True, create=True, |
2475 create_kwargs={"subscribed": True} | 2583 create_kwargs={"subscribed": True} |
2476 ) | 2584 ) |
2477 else: | 2585 else: |
2478 # it is a root item (i.e. not a reply to an other item) | 2586 # it is a root item (i.e. not a reply to an other item) |
2587 create = node == self._events.namespace | |
2479 cached_node = await self.host.memory.storage.getPubsubNode( | 2588 cached_node = await self.host.memory.storage.getPubsubNode( |
2480 client, service, node, with_subscriptions=True | 2589 client, service, node, with_subscriptions=True, create=create |
2481 ) | 2590 ) |
2482 if cached_node is None: | 2591 if cached_node is None: |
2483 log.warning( | 2592 log.warning( |
2484 f"Received item in unknown node {node!r} at {service}. This may be " | 2593 f"Received item in unknown node {node!r} at {service}. This may be " |
2485 f"due to a cache purge. We synchronise the node\n{item}" | 2594 f"due to a cache purge. We synchronise the node\n{item}" |
2486 | 2595 |
2487 ) | 2596 ) |
2488 return | 2597 return |
2489 mb_data, item_elt = await self.apItem2MbDataAndElt(item) | 2598 if item.get("type") == TYPE_EVENT: |
2599 data, item_elt = await self.ap_events.ap_item_2_event_data_and_elt(item) | |
2600 else: | |
2601 data, item_elt = await self.ap_item_2_mb_data_and_elt(item) | |
2490 await self.host.memory.storage.cachePubsubItems( | 2602 await self.host.memory.storage.cachePubsubItems( |
2491 client, | 2603 client, |
2492 cached_node, | 2604 cached_node, |
2493 [item_elt], | 2605 [item_elt], |
2494 [mb_data] | 2606 [data] |
2495 ) | 2607 ) |
2496 | 2608 |
2497 for subscription in cached_node.subscriptions: | 2609 for subscription in cached_node.subscriptions: |
2498 if subscription.state != SubscriptionState.SUBSCRIBED: | 2610 if subscription.state != SubscriptionState.SUBSCRIBED: |
2499 continue | 2611 continue |