comparison sat/plugins/plugin_comp_ap_gateway/__init__.py @ 3904:0aa7023dcd08

component AP gateway: events: - XMPP Events <=> AP Events conversion - `Join`/`Leave` activities are converted to RSVP attachments and vice versa - fix caching/notification on item published on a virtual pubsub node - add Ad-Hoc command to convert XMPP Jid/Node to virtual AP Account - handle `Update` activity - on `convertAndPostItems`, `Update` activity is used instead of `Create` if a version of the item is already present in cache - `events` field is added to actor data (and to `endpoints`), it links the `outbox` of the actor mapping the same JID with the Events node (i.e. it links to the Events node of the entity) - fix subscription to nodes which are not the microblog one rel 372
author Goffi <goffi@goffi.org>
date Thu, 22 Sep 2022 00:01:41 +0200
parents aa7197b67c26
children 6fa4ca0c047e
comparison
equal deleted inserted replaced
3903:384b7e6c2dbf 3904:0aa7023dcd08
18 18
19 import base64 19 import base64
20 import calendar 20 import calendar
21 import hashlib 21 import hashlib
22 import json 22 import json
23 from os import access
23 from pathlib import Path 24 from pathlib import Path
24 from pprint import pformat 25 from pprint import pformat
25 import re 26 import re
26 from typing import ( 27 from typing import (
27 Any, Dict, List, Set, Optional, Tuple, Union, Callable, Awaitable, overload 28 Any,
29 Awaitable,
30 Callable,
31 Dict,
32 List,
33 Optional,
34 Set,
35 Tuple,
36 Type,
37 Union,
38 overload,
28 ) 39 )
29 from urllib import parse 40 from urllib import parse
30 41
31 from cryptography.exceptions import InvalidSignature 42 from cryptography.exceptions import InvalidSignature
32 from cryptography.hazmat.primitives import serialization 43 from cryptography.hazmat.primitives import serialization
33 from cryptography.hazmat.primitives import hashes 44 from cryptography.hazmat.primitives import hashes
34 from cryptography.hazmat.primitives.asymmetric import rsa 45 from cryptography.hazmat.primitives.asymmetric import rsa
35 from cryptography.hazmat.primitives.asymmetric import padding 46 from cryptography.hazmat.primitives.asymmetric import padding
36 import dateutil 47 import dateutil
48 from dateutil.parser import parserinfo
37 import shortuuid 49 import shortuuid
38 from sqlalchemy.exc import IntegrityError 50 from sqlalchemy.exc import IntegrityError
39 import treq 51 import treq
40 from treq.response import _Response as TReqResponse 52 from treq.response import _Response as TReqResponse
41 from twisted.internet import defer, reactor, threads 53 from twisted.internet import defer, reactor, threads
42 from twisted.web import http 54 from twisted.web import http
43 from twisted.words.protocols.jabber import error, jid 55 from twisted.words.protocols.jabber import error, jid
44 from twisted.words.xish import domish 56 from twisted.words.xish import domish
45 from wokkel import rsm, pubsub 57 from wokkel import pubsub, rsm
46 58
47 from sat.core import exceptions 59 from sat.core import exceptions
48 from sat.core.constants import Const as C 60 from sat.core.constants import Const as C
49 from sat.core.core_types import SatXMPPEntity 61 from sat.core.core_types import SatXMPPEntity
50 from sat.core.i18n import _ 62 from sat.core.i18n import _
51 from sat.core.log import getLogger 63 from sat.core.log import getLogger
52 from sat.memory.sqla_mapping import SubscriptionState, History
53 from sat.memory import persistent 64 from sat.memory import persistent
65 from sat.memory.sqla_mapping import History, SubscriptionState
54 from sat.tools import utils 66 from sat.tools import utils
55 from sat.tools.common import data_format, tls, uri 67 from sat.tools.common import data_format, date_utils, tls, uri
56 from sat.tools.common.async_utils import async_lru 68 from sat.tools.common.async_utils import async_lru
57 69
70 from .ad_hoc import APAdHocService
71 from .events import APEvents
58 from .constants import ( 72 from .constants import (
59 ACTIVITY_OBJECT_MANDATORY, 73 ACTIVITY_OBJECT_MANDATORY,
60 ACTIVITY_TARGET_MANDATORY, 74 ACTIVITY_TARGET_MANDATORY,
61 ACTIVITY_TYPES, 75 ACTIVITY_TYPES,
62 ACTIVITY_TYPES_LOWER, 76 ACTIVITY_TYPES_LOWER,
63 COMMENTS_MAX_PARENTS, 77 COMMENTS_MAX_PARENTS,
64 CONF_SECTION, 78 CONF_SECTION,
65 IMPORT_NAME, 79 IMPORT_NAME,
66 LRU_MAX_SIZE, 80 LRU_MAX_SIZE,
67 MEDIA_TYPE_AP, 81 MEDIA_TYPE_AP,
68 TYPE_ACTOR,
69 TYPE_ITEM,
70 TYPE_FOLLOWERS,
71 TYPE_TOMBSTONE,
72 TYPE_MENTION,
73 TYPE_LIKE,
74 TYPE_REACTION,
75 NS_AP, 82 NS_AP,
76 NS_AP_PUBLIC, 83 NS_AP_PUBLIC,
77 PUBLIC_TUPLE 84 PUBLIC_TUPLE,
85 TYPE_ACTOR,
86 TYPE_EVENT,
87 TYPE_FOLLOWERS,
88 TYPE_ITEM,
89 TYPE_LIKE,
90 TYPE_MENTION,
91 TYPE_REACTION,
92 TYPE_TOMBSTONE,
93 TYPE_JOIN,
94 TYPE_LEAVE
78 ) 95 )
79 from .regex import RE_MENTION
80 from .http_server import HTTPServer 96 from .http_server import HTTPServer
81 from .pubsub_service import APPubsubService 97 from .pubsub_service import APPubsubService
98 from .regex import RE_MENTION
82 99
83 100
84 log = getLogger(__name__) 101 log = getLogger(__name__)
85 102
86 IMPORT_NAME = "ap-gateway" 103 IMPORT_NAME = "ap-gateway"
90 C.PI_IMPORT_NAME: IMPORT_NAME, 107 C.PI_IMPORT_NAME: IMPORT_NAME,
91 C.PI_MODES: [C.PLUG_MODE_COMPONENT], 108 C.PI_MODES: [C.PLUG_MODE_COMPONENT],
92 C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, 109 C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
93 C.PI_PROTOCOLS: [], 110 C.PI_PROTOCOLS: [],
94 C.PI_DEPENDENCIES: [ 111 C.PI_DEPENDENCIES: [
95 "XEP-0054", "XEP-0060", "XEP-0084", "XEP-0106", "XEP-0277", "XEP-0292", 112 "XEP-0050", "XEP-0054", "XEP-0060", "XEP-0084", "XEP-0106", "XEP-0277",
96 "XEP-0329", "XEP-0372", "XEP-0424", "XEP-0465", "XEP-0470", "PUBSUB_CACHE", 113 "XEP-0292", "XEP-0329", "XEP-0372", "XEP-0424", "XEP-0465", "XEP-0470",
97 "TEXT_SYNTAXES", "IDENTITY" 114 "PUBSUB_CACHE", "TEXT_SYNTAXES", "IDENTITY", "EVENTS"
98 ], 115 ],
99 C.PI_RECOMMENDATIONS: [], 116 C.PI_RECOMMENDATIONS: [],
100 C.PI_MAIN: "APGateway", 117 C.PI_MAIN: "APGateway",
101 C.PI_HANDLER: C.BOOL_TRUE, 118 C.PI_HANDLER: C.BOOL_TRUE,
102 C.PI_DESCRIPTION: _( 119 C.PI_DESCRIPTION: _(
132 self._pps = host.plugins["XEP-0465"] 149 self._pps = host.plugins["XEP-0465"]
133 self._c = host.plugins["PUBSUB_CACHE"] 150 self._c = host.plugins["PUBSUB_CACHE"]
134 self._t = host.plugins["TEXT_SYNTAXES"] 151 self._t = host.plugins["TEXT_SYNTAXES"]
135 self._i = host.plugins["IDENTITY"] 152 self._i = host.plugins["IDENTITY"]
136 self._pa = host.plugins["XEP-0470"] 153 self._pa = host.plugins["XEP-0470"]
154 self._events = host.plugins["EVENTS"]
137 self._p.addManagedNode( 155 self._p.addManagedNode(
138 "", 156 "",
139 items_cb=self._itemsReceived, 157 items_cb=self._itemsReceived,
140 # we want to be sure that the callbacks are launched before pubsub cache's 158 # we want to be sure that the callbacks are launched before pubsub cache's
141 # one, as we need to inspect items before they are actually removed from cache 159 # one, as we need to inspect items before they are actually removed from cache
142 # or updated 160 # or updated
143 priority=1000 161 priority=1000
144 ) 162 )
145 self.pubsub_service = APPubsubService(self) 163 self.pubsub_service = APPubsubService(self)
164 self.ad_hoc = APAdHocService(self)
165 self.ap_events = APEvents(self)
146 host.trigger.add("messageReceived", self._messageReceivedTrigger, priority=-1000) 166 host.trigger.add("messageReceived", self._messageReceivedTrigger, priority=-1000)
147 host.trigger.add("XEP-0424_retractReceived", self._onMessageRetract) 167 host.trigger.add("XEP-0424_retractReceived", self._onMessageRetract)
148 host.trigger.add("XEP-0372_ref_received", self._onReferenceReceived) 168 host.trigger.add("XEP-0372_ref_received", self._onReferenceReceived)
149 169
150 host.bridge.addMethod( 170 host.bridge.addMethod(
264 IMPORT_NAME, 284 IMPORT_NAME,
265 client.profile 285 client.profile
266 ) 286 )
267 await self.init(client) 287 await self.init(client)
268 288
289 def profileConnected(self, client):
290 self.ad_hoc.init(client)
291
269 async def _itemsReceived( 292 async def _itemsReceived(
270 self, 293 self,
271 client: SatXMPPEntity, 294 client: SatXMPPEntity,
272 itemsEvent: pubsub.ItemsEvent 295 itemsEvent: pubsub.ItemsEvent
273 ) -> None: 296 ) -> None:
311 @return: virtual client 334 @return: virtual client
312 """ 335 """
313 local_jid = await self.getJIDFromId(actor_id) 336 local_jid = await self.getJIDFromId(actor_id)
314 return self.client.getVirtualClient(local_jid) 337 return self.client.getVirtualClient(local_jid)
315 338
316 def isActivity(self, data: dict) -> bool: 339 def is_activity(self, data: dict) -> bool:
317 """Return True if the data has an activity type""" 340 """Return True if the data has an activity type"""
318 try: 341 try:
319 return (data.get("type") or "").lower() in ACTIVITY_TYPES_LOWER 342 return (data.get("type") or "").lower() in ACTIVITY_TYPES_LOWER
320 except (KeyError, TypeError): 343 except (KeyError, TypeError):
321 return False 344 return False
432 try: 455 try:
433 found_item = found_items[0] 456 found_item = found_items[0]
434 except IndexError: 457 except IndexError:
435 raise exceptions.NotFound("requested items can't be found") 458 raise exceptions.NotFound("requested items can't be found")
436 459
437 mb_data = await self._m.item2mbdata( 460 if node.startswith(self._events.namespace):
438 self.client, found_item, author_jid, node 461 # this is an event
439 ) 462 event_data = self._events.event_elt_2_event_data(found_item)
440 ap_item = await self.mbdata2APitem(self.client, mb_data) 463 ap_item = await self.ap_events.event_data_2_ap_item(
441 # the URL must return the object and not the activity 464 event_data, author_jid
442 return ap_item["object"] 465 )
466 # the URL must return the object and not the activity
467 ap_item["object"]["@context"] = ap_item["@context"]
468 return ap_item["object"]
469 else:
470 # this is a blog item
471 mb_data = await self._m.item2mbdata(
472 self.client, found_item, author_jid, node
473 )
474 ap_item = await self.mb_data_2_ap_item(self.client, mb_data)
475 # the URL must return the object and not the activity
476 return ap_item["object"]
443 else: 477 else:
444 raise NotImplementedError( 478 raise NotImplementedError(
445 'only object from "item" URLs can be retrieved for now' 479 'only object from "item" URLs can be retrieved for now'
446 ) 480 )
447 481
529 self, 563 self,
530 data: dict, 564 data: dict,
531 ) -> str: 565 ) -> str:
532 """Retrieve actor who sent data 566 """Retrieve actor who sent data
533 567
534 This is done by checking "attributedTo" field first, then "actor" field. 568 This is done by checking "actor" field first, then "attributedTo" field.
535 Only the first found actor is taken into accoun 569 Only the first found actor is taken into account
536 @param data: AP object 570 @param data: AP object
537 @return: actor id of the sender 571 @return: actor id of the sender
538 @raise exceptions.NotFound: no actor has been found in data 572 @raise exceptions.NotFound: no actor has been found in data
539 """ 573 """
540 try: 574 try:
541 actors = await self.apGetActors(data, "attributedTo", as_account=False) 575 actors = await self.apGetActors(data, "actor", as_account=False)
542 except exceptions.DataError: 576 except exceptions.DataError:
543 actors = None 577 actors = None
544 if not actors: 578 if not actors:
545 try: 579 try:
546 actors = await self.apGetActors(data, "actor", as_account=False) 580 actors = await self.apGetActors(data, "attributedTo", as_account=False)
547 except exceptions.DataError: 581 except exceptions.DataError:
548 raise exceptions.NotFound( 582 raise exceptions.NotFound(
549 'actor not specified in "attributedTo" or "actor"' 583 'actor not specified in "actor" or "attributedTo"'
550 ) 584 )
551 try: 585 try:
552 return actors[0] 586 return actors[0]
553 except IndexError: 587 except IndexError:
554 raise exceptions.NotFound("list of actors is empty") 588 raise exceptions.NotFound("list of actors is empty")
878 f'"target" is mandatory for activity {activity!r}' 912 f'"target" is mandatory for activity {activity!r}'
879 ) 913 )
880 if activity_id is None: 914 if activity_id is None:
881 activity_id = f"{actor_id}#{activity.lower()}_{shortuuid.uuid()}" 915 activity_id = f"{actor_id}#{activity.lower()}_{shortuuid.uuid()}"
882 data: Dict[str, Any] = { 916 data: Dict[str, Any] = {
883 "@context": NS_AP, 917 "@context": [NS_AP],
884 "actor": actor_id, 918 "actor": actor_id,
885 "id": activity_id, 919 "id": activity_id,
886 "type": activity, 920 "type": activity,
887 } 921 }
888 data.update(kwargs) 922 data.update(kwargs)
991 @param ap_account: account of ActivityPub actor receiving the item 1025 @param ap_account: account of ActivityPub actor receiving the item
992 @param service: JID of the (virtual) pubsub service where the item has been 1026 @param service: JID of the (virtual) pubsub service where the item has been
993 published 1027 published
994 @param node: (virtual) node corresponding where the item has been published 1028 @param node: (virtual) node corresponding where the item has been published
995 @param subscribe_extra_nodes: if True, extra data nodes will be automatically 1029 @param subscribe_extra_nodes: if True, extra data nodes will be automatically
996 subscribed, that is comment nodes if present and attachments nodes. 1030 subscribed, that is comment nodes if present and attachments nodes.
997 """ 1031 """
998 actor_id = await self.getAPActorIdFromAccount(ap_account) 1032 actor_id = await self.getAPActorIdFromAccount(ap_account)
999 inbox = await self.getAPInboxFromId(actor_id) 1033 inbox = await self.getAPInboxFromId(actor_id)
1000 for item in items: 1034 for item in items:
1001 if item.name == "item": 1035 if item.name == "item":
1002 mb_data = await self._m.item2mbdata(client, item, service, node) 1036 cached_item = await self.host.memory.storage.searchPubsubItems({
1003 author_jid = jid.JID(mb_data["author_jid"]) 1037 "profiles": [self.client.profile],
1004 if subscribe_extra_nodes and not self.isVirtualJID(author_jid): 1038 "services": [service],
1005 # we subscribe automatically to comment nodes if any 1039 "nodes": [node],
1006 recipient_jid = self.getLocalJIDFromAccount(ap_account) 1040 "names": [item["id"]]
1007 recipient_client = self.client.getVirtualClient(recipient_jid) 1041 })
1008 for comment_data in mb_data.get("comments", []): 1042 is_new = not bool(cached_item)
1009 comment_service = jid.JID(comment_data["service"]) 1043 if node.startswith(self._events.namespace):
1010 if self.isVirtualJID(comment_service): 1044 # event item
1011 log.debug(f"ignoring virtual comment service: {comment_data}") 1045 event_data = self._events.event_elt_2_event_data(item)
1012 continue
1013 comment_node = comment_data["node"]
1014 await self._p.subscribe(
1015 recipient_client, comment_service, comment_node
1016 )
1017 try: 1046 try:
1018 await self._pa.subscribe( 1047 author_jid = jid.JID(item["publisher"]).userhostJID()
1019 recipient_client, service, node, mb_data["id"] 1048 except (KeyError, RuntimeWarning):
1020 ) 1049 root_elt = item
1021 except exceptions.NotFound: 1050 while root_elt.parent is not None:
1022 log.debug( 1051 root_elt = root_elt.parent
1023 f"no attachment node found for item {mb_data['id']!r} on " 1052 author_jid = jid.JID(root_elt["from"]).userhostJID()
1024 f"{node!r} at {service}" 1053 if subscribe_extra_nodes and not self.isVirtualJID(author_jid):
1025 ) 1054 # we subscribe automatically to comment nodes if any
1026 ap_item = await self.mbdata2APitem(client, mb_data) 1055 recipient_jid = self.getLocalJIDFromAccount(ap_account)
1056 recipient_client = self.client.getVirtualClient(recipient_jid)
1057 comments_data = event_data.get("comments")
1058 if comments_data:
1059 comment_service = jid.JID(comments_data["jid"])
1060 comment_node = comments_data["node"]
1061 await self._p.subscribe(
1062 recipient_client, comment_service, comment_node
1063 )
1064 try:
1065 await self._pa.subscribe(
1066 recipient_client, service, node, event_data["id"]
1067 )
1068 except exceptions.NotFound:
1069 log.debug(
1070 f"no attachment node found for item {event_data['id']!r} "
1071 f"on {node!r} at {service}"
1072 )
1073 ap_item = await self.ap_events.event_data_2_ap_item(
1074 event_data, author_jid, is_new=is_new
1075 )
1076 else:
1077 # blog item
1078 mb_data = await self._m.item2mbdata(client, item, service, node)
1079 author_jid = jid.JID(mb_data["author_jid"])
1080 if subscribe_extra_nodes and not self.isVirtualJID(author_jid):
1081 # we subscribe automatically to comment nodes if any
1082 recipient_jid = self.getLocalJIDFromAccount(ap_account)
1083 recipient_client = self.client.getVirtualClient(recipient_jid)
1084 for comment_data in mb_data.get("comments", []):
1085 comment_service = jid.JID(comment_data["service"])
1086 if self.isVirtualJID(comment_service):
1087 log.debug(
1088 f"ignoring virtual comment service: {comment_data}"
1089 )
1090 continue
1091 comment_node = comment_data["node"]
1092 await self._p.subscribe(
1093 recipient_client, comment_service, comment_node
1094 )
1095 try:
1096 await self._pa.subscribe(
1097 recipient_client, service, node, mb_data["id"]
1098 )
1099 except exceptions.NotFound:
1100 log.debug(
1101 f"no attachment node found for item {mb_data['id']!r} on "
1102 f"{node!r} at {service}"
1103 )
1104 ap_item = await self.mb_data_2_ap_item(client, mb_data, is_new=is_new)
1105
1027 url_actor = ap_item["actor"] 1106 url_actor = ap_item["actor"]
1028 elif item.name == "retract": 1107 elif item.name == "retract":
1029 url_actor, ap_item = await self.apDeleteItem( 1108 url_actor, ap_item = await self.apDeleteItem(
1030 client.jid, node, item["id"] 1109 client.jid, node, item["id"]
1031 ) 1110 )
1132 # noticed 1211 # noticed
1133 if "noticed" in attachments: 1212 if "noticed" in attachments:
1134 if not "noticed" in old_attachment: 1213 if not "noticed" in old_attachment:
1135 # new "noticed" attachment, we translate to "Like" activity 1214 # new "noticed" attachment, we translate to "Like" activity
1136 activity_id = self.buildAPURL("like", item_account, item_id) 1215 activity_id = self.buildAPURL("like", item_account, item_id)
1137 like = self.createActivity( 1216 activity = self.createActivity(
1138 TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id 1217 TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id
1139 ) 1218 )
1140 like["to"] = [ap_account] 1219 activity["to"] = [ap_account]
1141 like["cc"] = [NS_AP_PUBLIC] 1220 activity["cc"] = [NS_AP_PUBLIC]
1142 await self.signAndPost(inbox, publisher_actor_id, like) 1221 await self.signAndPost(inbox, publisher_actor_id, activity)
1143 else: 1222 else:
1144 if "noticed" in old_attachment: 1223 if "noticed" in old_attachment:
1145 # "noticed" attachment has been removed, we undo the "Like" activity 1224 # "noticed" attachment has been removed, we undo the "Like" activity
1146 activity_id = self.buildAPURL("like", item_account, item_id) 1225 activity_id = self.buildAPURL("like", item_account, item_id)
1147 like = self.createActivity( 1226 activity = self.createActivity(
1148 TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id 1227 TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id
1149 ) 1228 )
1150 like["to"] = [ap_account] 1229 activity["to"] = [ap_account]
1151 like["cc"] = [NS_AP_PUBLIC] 1230 activity["cc"] = [NS_AP_PUBLIC]
1152 undo = self.createActivity("Undo", publisher_actor_id, like) 1231 undo = self.createActivity("Undo", publisher_actor_id, activity)
1153 await self.signAndPost(inbox, publisher_actor_id, undo) 1232 await self.signAndPost(inbox, publisher_actor_id, undo)
1154 1233
1155 # reactions 1234 # reactions
1156 new_reactions = set(attachments.get("reactions", {}).get("reactions", [])) 1235 new_reactions = set(attachments.get("reactions", {}).get("reactions", []))
1157 old_reactions = set(old_attachment.get("reactions", {}).get("reactions", [])) 1236 old_reactions = set(old_attachment.get("reactions", {}).get("reactions", []))
1174 "Undo", publisher_actor_id, reaction_activity 1253 "Undo", publisher_actor_id, reaction_activity
1175 ) 1254 )
1176 else: 1255 else:
1177 activy = reaction_activity 1256 activy = reaction_activity
1178 await self.signAndPost(inbox, publisher_actor_id, activy) 1257 await self.signAndPost(inbox, publisher_actor_id, activy)
1258
1259 # RSVP
1260 if "rsvp" in attachments:
1261 attending = attachments["rsvp"].get("attending", "no")
1262 old_attending = old_attachment.get("rsvp", {}).get("attending", "no")
1263 if attending != old_attending:
1264 activity_type = TYPE_JOIN if attending == "yes" else TYPE_LEAVE
1265 activity_id = self.buildAPURL(activity_type.lower(), item_account, item_id)
1266 activity = self.createActivity(
1267 activity_type, publisher_actor_id, item_url, activity_id=activity_id
1268 )
1269 activity["to"] = [ap_account]
1270 activity["cc"] = [NS_AP_PUBLIC]
1271 await self.signAndPost(inbox, publisher_actor_id, activity)
1272 else:
1273 if "rsvp" in old_attachment:
1274 old_attending = old_attachment.get("rsvp", {}).get("attending", "no")
1275 if old_attending == "yes":
1276 activity_id = self.buildAPURL(TYPE_LEAVE.lower(), item_account, item_id)
1277 activity = self.createActivity(
1278 TYPE_LEAVE, publisher_actor_id, item_url, activity_id=activity_id
1279 )
1280 activity["to"] = [ap_account]
1281 activity["cc"] = [NS_AP_PUBLIC]
1282 await self.signAndPost(inbox, publisher_actor_id, activity)
1179 1283
1180 if service.user and self.isVirtualJID(service): 1284 if service.user and self.isVirtualJID(service):
1181 # the item is on a virtual service, we need to store it in cache 1285 # the item is on a virtual service, we need to store it in cache
1182 log.debug("storing attachments item in cache") 1286 log.debug("storing attachments item in cache")
1183 cached_node = await self.host.memory.storage.getPubsubNode( 1287 cached_node = await self.host.memory.storage.getPubsubNode(
1390 collections) 1494 collections)
1391 @return: XMPP Pubsub items and corresponding RSM Response 1495 @return: XMPP Pubsub items and corresponding RSM Response
1392 Items are always returned in chronological order in the result 1496 Items are always returned in chronological order in the result
1393 """ 1497 """
1394 if parser is None: 1498 if parser is None:
1395 parser = self.apItem2Elt 1499 parser = self.ap_item_2_mb_elt
1396 1500
1397 rsm_resp: Dict[str, Union[bool, int]] = {} 1501 rsm_resp: Dict[str, Union[bool, int]] = {}
1398 try: 1502 try:
1399 count = collection["totalItems"] 1503 count = collection["totalItems"]
1400 except KeyError: 1504 except KeyError:
1518 "last": items[-1]["id"] 1622 "last": items[-1]["id"]
1519 }) 1623 })
1520 1624
1521 return items, rsm.RSMResponse(**rsm_resp) 1625 return items, rsm.RSMResponse(**rsm_resp)
1522 1626
1523 async def apItem2MbDataAndElt(self, ap_item: dict) -> Tuple[dict, domish.Element]: 1627 async def ap_item_2_mb_data_and_elt(self, ap_item: dict) -> Tuple[dict, domish.Element]:
1524 """Convert AP item to parsed microblog data and corresponding item element""" 1628 """Convert AP item to parsed microblog data and corresponding item element"""
1525 mb_data = await self.apItem2MBdata(ap_item) 1629 mb_data = await self.apItem2MBdata(ap_item)
1526 item_elt = await self._m.data2entry( 1630 item_elt = await self._m.data2entry(
1527 self.client, mb_data, mb_data["id"], None, self._m.namespace 1631 self.client, mb_data, mb_data["id"], None, self._m.namespace
1528 ) 1632 )
1530 item_elt["publisher"] = mb_data["extra"]["repeated"]["by"] 1634 item_elt["publisher"] = mb_data["extra"]["repeated"]["by"]
1531 else: 1635 else:
1532 item_elt["publisher"] = mb_data["author_jid"] 1636 item_elt["publisher"] = mb_data["author_jid"]
1533 return mb_data, item_elt 1637 return mb_data, item_elt
1534 1638
1535 async def apItem2Elt(self, ap_item: dict) -> domish.Element: 1639 async def ap_item_2_mb_elt(self, ap_item: dict) -> domish.Element:
1536 """Convert AP item to XMPP item element""" 1640 """Convert AP item to XMPP item element"""
1537 __, item_elt = await self.apItem2MbDataAndElt(ap_item) 1641 __, item_elt = await self.ap_item_2_mb_data_and_elt(ap_item)
1538 return item_elt 1642 return item_elt
1539 1643
1540 async def parseAPPage( 1644 async def parseAPPage(
1541 self, 1645 self,
1542 page: Union[str, dict], 1646 page: Union[str, dict],
1627 @return: AP Item's Object and microblog data 1731 @return: AP Item's Object and microblog data
1628 @raise exceptions.DataError: something is invalid in the AP item 1732 @raise exceptions.DataError: something is invalid in the AP item
1629 @raise NotImplementedError: some AP data is not handled yet 1733 @raise NotImplementedError: some AP data is not handled yet
1630 @raise error.StanzaError: error while contacting the AP server 1734 @raise error.StanzaError: error while contacting the AP server
1631 """ 1735 """
1632 is_activity = self.isActivity(ap_item) 1736 is_activity = self.is_activity(ap_item)
1633 if is_activity: 1737 if is_activity:
1634 ap_object = await self.apGetObject(ap_item, "object") 1738 ap_object = await self.apGetObject(ap_item, "object")
1635 if not ap_object: 1739 if not ap_object:
1636 log.warning(f'No "object" found in AP item {ap_item!r}') 1740 log.warning(f'No "object" found in AP item {ap_item!r}')
1637 raise exceptions.DataError 1741 raise exceptions.DataError
1837 self.buildAPURL(TYPE_FOLLOWERS, repeater_account), 1941 self.buildAPURL(TYPE_FOLLOWERS, repeater_account),
1838 await self.getAPActorIdFromAccount(repeated_account) 1942 await self.getAPActorIdFromAccount(repeated_account)
1839 ] 1943 ]
1840 return announce 1944 return announce
1841 1945
1842 async def mbdata2APitem( 1946 async def mb_data_2_ap_item(
1843 self, 1947 self,
1844 client: SatXMPPEntity, 1948 client: SatXMPPEntity,
1845 mb_data: dict, 1949 mb_data: dict,
1846 public=True 1950 public: bool =True,
1951 is_new: bool = True,
1847 ) -> dict: 1952 ) -> dict:
1848 """Convert Libervia Microblog Data to ActivityPub item 1953 """Convert Libervia Microblog Data to ActivityPub item
1849 1954
1850 @param mb_data: microblog data (as used in plugin XEP-0277) to convert 1955 @param mb_data: microblog data (as used in plugin XEP-0277) to convert
1851 If ``public`` is True, ``service`` and ``node`` keys must be set. 1956 If ``public`` is True, ``service`` and ``node`` keys must be set.
1854 if True, the AP Item will be marked as public, and AP followers of target AP 1959 if True, the AP Item will be marked as public, and AP followers of target AP
1855 account (which retrieve from ``service``) will be put in ``cc``. 1960 account (which retrieve from ``service``) will be put in ``cc``.
1856 ``inReplyTo`` will also be set if suitable 1961 ``inReplyTo`` will also be set if suitable
1857 if False, no destinee will be set (i.e., no ``to`` or ``cc`` or public flag). 1962 if False, no destinee will be set (i.e., no ``to`` or ``cc`` or public flag).
1858 This is usually used for direct messages. 1963 This is usually used for direct messages.
1964 @param is_new: if True, the item is a new one (no instance has been found in
1965 cache).
1966 If True, a "Create" activity will be generated, otherwise an "Update" one will
1967 be.
1859 @return: Activity item 1968 @return: Activity item
1860 """ 1969 """
1861 extra = mb_data.get("extra", {}) 1970 extra = mb_data.get("extra", {})
1862 if "repeated" in extra: 1971 if "repeated" in extra:
1863 return await self.repeatedMB2APItem(mb_data) 1972 return await self.repeatedMB2APItem(mb_data)
1939 parent_item, 2048 parent_item,
1940 mb_data 2049 mb_data
1941 ) 2050 )
1942 2051
1943 return self.createActivity( 2052 return self.createActivity(
1944 "Create", url_actor, ap_object, activity_id=url_item 2053 "Create" if is_new else "Update", url_actor, ap_object, activity_id=url_item
1945 ) 2054 )
1946 2055
1947 async def publishMessage( 2056 async def publishMessage(
1948 self, 2057 self,
1949 client: SatXMPPEntity, 2058 client: SatXMPPEntity,
1975 try: 2084 try:
1976 inbox_url = ap_actor_data["endpoints"]["sharedInbox"] 2085 inbox_url = ap_actor_data["endpoints"]["sharedInbox"]
1977 except KeyError: 2086 except KeyError:
1978 raise exceptions.DataError("Can't get ActivityPub actor inbox") 2087 raise exceptions.DataError("Can't get ActivityPub actor inbox")
1979 2088
1980 item_data = await self.mbdata2APitem(client, mess_data) 2089 item_data = await self.mb_data_2_ap_item(client, mess_data)
1981 url_actor = item_data["actor"] 2090 url_actor = item_data["actor"]
1982 resp = await self.signAndPost(inbox_url, url_actor, item_data) 2091 resp = await self.signAndPost(inbox_url, url_actor, item_data)
1983 2092
1984 async def apDeleteItem( 2093 async def apDeleteItem(
1985 self, 2094 self,
2094 origin_id = mess_data["extra"].get("origin_id") 2203 origin_id = mess_data["extra"].get("origin_id")
2095 if origin_id: 2204 if origin_id:
2096 # we need to use origin ID when present to be able to retract the message 2205 # we need to use origin ID when present to be able to retract the message
2097 mb_data["id"] = origin_id 2206 mb_data["id"] = origin_id
2098 client = self.client.getVirtualClient(mess_data["from"]) 2207 client = self.client.getVirtualClient(mess_data["from"])
2099 ap_item = await self.mbdata2APitem(client, mb_data, public=False) 2208 ap_item = await self.mb_data_2_ap_item(client, mb_data, public=False)
2100 ap_item["object"]["to"] = ap_item["to"] = [actor_id] 2209 ap_item["object"]["to"] = ap_item["to"] = [actor_id]
2101 await self.signAndPost(inbox, ap_item["actor"], ap_item) 2210 await self.signAndPost(inbox, ap_item["actor"], ap_item)
2102 return mess_data 2211 return mess_data
2103 2212
2104 async def _onMessageRetract( 2213 async def _onMessageRetract(
2203 cached_item = cached_items[0] 2312 cached_item = cached_items[0]
2204 2313
2205 mb_data = await self._m.item2mbdata( 2314 mb_data = await self._m.item2mbdata(
2206 client, cached_item.data, pubsub_service, pubsub_node 2315 client, cached_item.data, pubsub_service, pubsub_node
2207 ) 2316 )
2208 ap_item = await self.mbdata2APitem(client, mb_data) 2317 ap_item = await self.mb_data_2_ap_item(client, mb_data)
2209 ap_object = ap_item["object"] 2318 ap_object = ap_item["object"]
2210 ap_object["to"] = [actor_id] 2319 ap_object["to"] = [actor_id]
2211 ap_object.setdefault("tag", []).append({ 2320 ap_object.setdefault("tag", []).append({
2212 "type": TYPE_MENTION, 2321 "type": TYPE_MENTION,
2213 "href": actor_id, 2322 "href": actor_id,
2269 # we don't have a comment node set for this item 2378 # we don't have a comment node set for this item
2270 from sat.tools.xml_tools import ppElt 2379 from sat.tools.xml_tools import ppElt
2271 log.info(f"{ppElt(parent_item_elt.toXml())}") 2380 log.info(f"{ppElt(parent_item_elt.toXml())}")
2272 raise NotImplementedError() 2381 raise NotImplementedError()
2273 else: 2382 else:
2274 __, item_elt = await self.apItem2MbDataAndElt(ap_item) 2383 __, item_elt = await self.ap_item_2_mb_data_and_elt(ap_item)
2275 await self._p.publish(client, comment_service, comment_node, [item_elt]) 2384 await self._p.publish(client, comment_service, comment_node, [item_elt])
2276 await self.notifyMentions( 2385 await self.notifyMentions(
2277 targets, mentions, comment_service, comment_node, item_elt["id"] 2386 targets, mentions, comment_service, comment_node, item_elt["id"]
2278 ) 2387 )
2279 2388
2452 @param node: XMPP pubsub node 2561 @param node: XMPP pubsub node
2453 @param item: AP object payload 2562 @param item: AP object payload
2454 @param public: True if the item is public 2563 @param public: True if the item is public
2455 """ 2564 """
2456 # XXX: "public" is not used for now 2565 # XXX: "public" is not used for now
2457
2458 service = client.jid 2566 service = client.jid
2459 in_reply_to = item.get("inReplyTo") 2567 in_reply_to = item.get("inReplyTo")
2460 2568
2461 if in_reply_to and isinstance(in_reply_to, list): 2569 if in_reply_to and isinstance(in_reply_to, list):
2462 in_reply_to = in_reply_to[0] 2570 in_reply_to = in_reply_to[0]
2474 client, service, node, with_subscriptions=True, create=True, 2582 client, service, node, with_subscriptions=True, create=True,
2475 create_kwargs={"subscribed": True} 2583 create_kwargs={"subscribed": True}
2476 ) 2584 )
2477 else: 2585 else:
2478 # it is a root item (i.e. not a reply to an other item) 2586 # it is a root item (i.e. not a reply to an other item)
2587 create = node == self._events.namespace
2479 cached_node = await self.host.memory.storage.getPubsubNode( 2588 cached_node = await self.host.memory.storage.getPubsubNode(
2480 client, service, node, with_subscriptions=True 2589 client, service, node, with_subscriptions=True, create=create
2481 ) 2590 )
2482 if cached_node is None: 2591 if cached_node is None:
2483 log.warning( 2592 log.warning(
2484 f"Received item in unknown node {node!r} at {service}. This may be " 2593 f"Received item in unknown node {node!r} at {service}. This may be "
2485 f"due to a cache purge. We synchronise the node\n{item}" 2594 f"due to a cache purge. We synchronise the node\n{item}"
2486 2595
2487 ) 2596 )
2488 return 2597 return
2489 mb_data, item_elt = await self.apItem2MbDataAndElt(item) 2598 if item.get("type") == TYPE_EVENT:
2599 data, item_elt = await self.ap_events.ap_item_2_event_data_and_elt(item)
2600 else:
2601 data, item_elt = await self.ap_item_2_mb_data_and_elt(item)
2490 await self.host.memory.storage.cachePubsubItems( 2602 await self.host.memory.storage.cachePubsubItems(
2491 client, 2603 client,
2492 cached_node, 2604 cached_node,
2493 [item_elt], 2605 [item_elt],
2494 [mb_data] 2606 [data]
2495 ) 2607 )
2496 2608
2497 for subscription in cached_node.subscriptions: 2609 for subscription in cached_node.subscriptions:
2498 if subscription.state != SubscriptionState.SUBSCRIBED: 2610 if subscription.state != SubscriptionState.SUBSCRIBED:
2499 continue 2611 continue