comparison sat/plugins/plugin_comp_ap_gateway/pubsub_service.py @ 3764:125c7043b277

comp AP gateway: publish, (un)subscribe/(un)follow, public subscription/following/followers:

This patch implements the following major features:
- `publish` is implemented on the virtual pubsub service, thus XMPP entities can now publish to AP using this service
- replies to XMPP items are managed
- `inReplyTo` is filled when converting XMPP items to AP objects
- `follow` and `unfollow` (actually an `undo` activity) are implemented and mapped to XMPP's (un)subscribe. On subscription, the AP actor's `outbox` collection is converted to XMPP and put in cache. Subscriptions are always public.
- `following` and `followers` collections are mapped to XMPP's Public Pubsub Subscription (which should be XEP-0465, but the XEP is not yet published at the time of commit), in both directions.
- new helper methods to check if a URL is local and to get a JID from an actor ID

doc will follow to explain behaviour

rel 365
author Goffi <goffi@goffi.org>
date Fri, 13 May 2022 19:12:33 +0200
parents a8c7e5cef0cb
children 865167c34b82
comparison
equal deleted inserted replaced
3763:b2ade5ecdbab 3764:125c7043b277
14 # GNU Affero General Public License for more details. 14 # GNU Affero General Public License for more details.
15 15
16 # You should have received a copy of the GNU Affero General Public License 16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. 17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 18
19 from typing import Optional, Tuple, List, Dict, Any 19 from typing import Optional, Tuple, List, Union
20 20
21 from twisted.internet import defer 21 from twisted.internet import defer
22 from twisted.words.protocols.jabber import jid, error 22 from twisted.words.protocols.jabber import jid, error
23 from twisted.words.xish import domish 23 from twisted.words.xish import domish
24 from wokkel import rsm, pubsub, data_form 24 from wokkel import rsm, pubsub, disco
25 25
26 from sat.core.i18n import _ 26 from sat.core.i18n import _
27 from sat.core import exceptions 27 from sat.core import exceptions
28 from sat.core.log import getLogger 28 from sat.core.log import getLogger
29 from sat.core.constants import Const as C 29 from sat.core.constants import Const as C
90 return requestor_actor_id, recipient_actor_id, inbox 90 return requestor_actor_id, recipient_actor_id, inbox
91 91
92 92
93 @ensure_deferred 93 @ensure_deferred
94 async def publish(self, requestor, service, nodeIdentifier, items): 94 async def publish(self, requestor, service, nodeIdentifier, items):
95 raise NotImplementedError 95 if self.apg.local_only and not self.apg.isLocal(requestor):
96 raise error.StanzaError(
97 "forbidden",
98 "Only local users can publish on this gateway."
99 )
100 if not service.user:
101 raise error.StanzaError(
102 "bad-request",
103 "You must specify an ActivityPub actor account in JID user part."
104 )
105 ap_account = self.apg._e.unescape(service.user)
106 if ap_account.count("@") != 1:
107 raise error.StanzaError(
108 "bad-request",
109 f"{ap_account!r} is not a valid ActivityPub actor account."
110 )
111
112 client = self.apg.client.getVirtualClient(requestor)
113 await self.apg.convertAndPostItems(
114 client, ap_account, service, nodeIdentifier, items
115 )
116
117 async def apFollowing2Elt(self, ap_item: dict) -> domish.Element:
118 """Convert actor ID from following collection to XMPP item"""
119 actor_id = ap_item["id"]
120 actor_jid = await self.apg.getJIDFromId(actor_id)
121 subscription_elt = self.apg._pps.buildSubscriptionElt(
122 self.apg._m.namespace, actor_jid
123 )
124 item_elt = pubsub.Item(id=actor_id, payload=subscription_elt)
125 return item_elt
126
127 async def apFollower2Elt(self, ap_item: dict) -> domish.Element:
128 """Convert actor ID from followers collection to XMPP item"""
129 actor_id = ap_item["id"]
130 actor_jid = await self.apg.getJIDFromId(actor_id)
131 subscriber_elt = self.apg._pps.buildSubscriberElt(actor_jid)
132 item_elt = pubsub.Item(id=actor_id, payload=subscriber_elt)
133 return item_elt
96 134
97 @ensure_deferred 135 @ensure_deferred
98 async def items( 136 async def items(
99 self, 137 self,
100 requestor: jid.JID, 138 requestor: jid.JID,
108 return [], None 146 return [], None
109 ap_account = self.host.plugins["XEP-0106"].unescape(service.user) 147 ap_account = self.host.plugins["XEP-0106"].unescape(service.user)
110 if ap_account.count("@") != 1: 148 if ap_account.count("@") != 1:
111 log.warning(f"Invalid AP account used by {requestor}: {ap_account!r}") 149 log.warning(f"Invalid AP account used by {requestor}: {ap_account!r}")
112 return [], None 150 return [], None
113 if not node.startswith(self.apg._m.namespace): 151
114 raise error.StanzaError( 152 kwargs = {}
115 "feature-not-implemented", 153
116 text=f"AP Gateway {C.APP_VERSION} only supports {self.apg._m.namespace} " 154 if node == self.apg._pps.subscriptions_node:
117 "node for now" 155 collection_name = "following"
118 ) 156 parser = self.apFollowing2Elt
157 kwargs["only_ids"] = True
158 use_cache = False
159 elif node.startswith(self.apg._pps.subscribers_node_prefix):
160 collection_name = "followers"
161 parser = self.apFollower2Elt
162 kwargs["only_ids"] = True
163 use_cache = False
164 else:
165 if not node.startswith(self.apg._m.namespace):
166 raise error.StanzaError(
167 "feature-not-implemented",
168 text=f"AP Gateway {C.APP_VERSION} only supports "
169 f"{self.apg._m.namespace} node for now"
170 )
171 collection_name = "outbox"
172 parser = self.apg.apItem2Elt
173 use_cache = True
174
119 client = self.apg.client 175 client = self.apg.client
120 cached_node = await self.host.memory.storage.getPubsubNode( 176 if use_cache:
121 client, service, node 177 cached_node = await self.host.memory.storage.getPubsubNode(
122 ) 178 client, service, node
123 # TODO: check if node is synchronised 179 )
124 if cached_node is not None: 180 # TODO: check if node is synchronised
125 # the node is cached, we return items from cache 181 if cached_node is not None:
126 log.debug(f"node {node!r} from {service} is in cache") 182 # the node is cached, we return items from cache
127 pubsub_items, metadata = await self.apg._c.getItemsFromCache( 183 log.debug(f"node {node!r} from {service} is in cache")
128 client, cached_node, maxItems, itemIdentifiers, rsm_request=rsm_req 184 pubsub_items, metadata = await self.apg._c.getItemsFromCache(
129 ) 185 client, cached_node, maxItems, itemIdentifiers, rsm_request=rsm_req
130 try: 186 )
131 rsm_resp = rsm.RSMResponse(**metadata["rsm"]) 187 try:
132 except KeyError: 188 rsm_resp = rsm.RSMResponse(**metadata["rsm"])
133 rsm_resp = None 189 except KeyError:
134 return [i.data for i in pubsub_items], rsm_resp 190 rsm_resp = None
191 return [i.data for i in pubsub_items], rsm_resp
135 192
136 if itemIdentifiers: 193 if itemIdentifiers:
137 items = [] 194 items = []
138 for item_id in itemIdentifiers: 195 for item_id in itemIdentifiers:
139 item_data = await self.apg.apGet(item_id) 196 item_data = await self.apg.apGet(item_id)
140 item_elt = await self.apg.apItem2Elt(item_data) 197 item_elt = await parser(item_data)
141 items.append(item_elt) 198 items.append(item_elt)
142 return items, None 199 return items, None
143 else: 200 else:
144 if rsm_req is None: 201 if rsm_req is None:
145 if maxItems is None: 202 if maxItems is None:
146 maxItems = 20 203 maxItems = 20
147 kwargs = { 204 kwargs.update({
148 "max_items": maxItems, 205 "max_items": maxItems,
149 "chronological_pagination": False, 206 "chronological_pagination": False,
150 } 207 })
151 else: 208 else:
152 if len( 209 if len(
153 [v for v in (rsm_req.after, rsm_req.before, rsm_req.index) 210 [v for v in (rsm_req.after, rsm_req.before, rsm_req.index)
154 if v is not None] 211 if v is not None]
155 ) > 1: 212 ) > 1:
156 raise error.StanzaError( 213 raise error.StanzaError(
157 "bad-request", 214 "bad-request",
158 text="You can't use after, before and index at the same time" 215 text="You can't use after, before and index at the same time"
159 ) 216 )
160 kwargs = {"max_items": rsm_req.max} 217 kwargs.update({"max_items": rsm_req.max})
161 if rsm_req.after is not None: 218 if rsm_req.after is not None:
162 kwargs["after_id"] = rsm_req.after 219 kwargs["after_id"] = rsm_req.after
163 elif rsm_req.before is not None: 220 elif rsm_req.before is not None:
164 kwargs["chronological_pagination"] = False 221 kwargs["chronological_pagination"] = False
165 if rsm_req.before != "": 222 if rsm_req.before != "":
169 226
170 log.info( 227 log.info(
171 f"No cache found for node {node} at {service} (AP account {ap_account}), " 228 f"No cache found for node {node} at {service} (AP account {ap_account}), "
172 "using Collection Paging to RSM translation" 229 "using Collection Paging to RSM translation"
173 ) 230 )
174 if self.apg._m.isCommentsNode(node): 231 if self.apg._m.isCommentNode(node):
175 parent_node = self.apg._m.getParentNode(node) 232 parent_item = self.apg._m.getParentItem(node)
176 try: 233 try:
177 parent_data = await self.apg.apGet(parent_node) 234 parent_data = await self.apg.apGet(parent_item)
178 collection = await self.apg.apGetObject( 235 collection = await self.apg.apGetObject(
179 parent_data.get("object", {}), 236 parent_data.get("object", {}),
180 "replies" 237 "replies"
181 ) 238 )
182 except Exception as e: 239 except Exception as e:
184 "item-not-found", 241 "item-not-found",
185 text=e 242 text=e
186 ) 243 )
187 else: 244 else:
188 actor_data = await self.apg.getAPActorDataFromAccount(ap_account) 245 actor_data = await self.apg.getAPActorDataFromAccount(ap_account)
189 collection = await self.apg.apGetObject(actor_data, "outbox") 246 collection = await self.apg.apGetObject(actor_data, collection_name)
190 if not collection: 247 if not collection:
191 raise error.StanzaError( 248 raise error.StanzaError(
192 "item-not-found", 249 "item-not-found",
193 text=f"No collection found for node {node!r} (account: {ap_account})" 250 text=f"No collection found for node {node!r} (account: {ap_account})"
194 ) 251 )
252
253 kwargs["parser"] = parser
195 return await self.apg.getAPItems(collection, **kwargs) 254 return await self.apg.getAPItems(collection, **kwargs)
196 255
197 @ensure_deferred 256 @ensure_deferred
198 async def retract(self, requestor, service, nodeIdentifier, itemIdentifiers): 257 async def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
199 raise NotImplementedError 258 raise NotImplementedError