Mercurial > libervia-backend
comparison sat/plugins/plugin_comp_ap_gateway/pubsub_service.py @ 3764:125c7043b277
comp AP gateway: publish, (un)subscribe/(un)follow, public subscription/following/followers:
this patch implements those major features:
- `publish` is implemented on virtual pubsub service, thus XMPP entities can now publish
to AP using this service
- replies to XMPP items are managed
- `inReplyTo` is filled when converting XMPP items to AP objects
- `follow` and `unfollow` (actually an `undo` activity) are implemented and mapped to
XMPP's (un)subscribe. On subscription, AP actor's `outbox` collection is converted to
XMPP and put in cache. Subscriptions are always public.
- `following` and `followers` collections are mapped to XMPP's Public Pubsub Subscription
(which should be XEP-0465, but the XEP is not yet published at the time of commit), in
both directions.
- new helper methods to check if an URL is local and to get JID from actor ID
doc will follow to explain behaviour
rel 365
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 13 May 2022 19:12:33 +0200 |
parents | a8c7e5cef0cb |
children | 865167c34b82 |
comparison
equal
deleted
inserted
replaced
3763:b2ade5ecdbab | 3764:125c7043b277 |
---|---|
14 # GNU Affero General Public License for more details. | 14 # GNU Affero General Public License for more details. |
15 | 15 |
16 # You should have received a copy of the GNU Affero General Public License | 16 # You should have received a copy of the GNU Affero General Public License |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | 17 # along with this program. If not, see <http://www.gnu.org/licenses/>. |
18 | 18 |
19 from typing import Optional, Tuple, List, Dict, Any | 19 from typing import Optional, Tuple, List, Union |
20 | 20 |
21 from twisted.internet import defer | 21 from twisted.internet import defer |
22 from twisted.words.protocols.jabber import jid, error | 22 from twisted.words.protocols.jabber import jid, error |
23 from twisted.words.xish import domish | 23 from twisted.words.xish import domish |
24 from wokkel import rsm, pubsub, data_form | 24 from wokkel import rsm, pubsub, disco |
25 | 25 |
26 from sat.core.i18n import _ | 26 from sat.core.i18n import _ |
27 from sat.core import exceptions | 27 from sat.core import exceptions |
28 from sat.core.log import getLogger | 28 from sat.core.log import getLogger |
29 from sat.core.constants import Const as C | 29 from sat.core.constants import Const as C |
90 return requestor_actor_id, recipient_actor_id, inbox | 90 return requestor_actor_id, recipient_actor_id, inbox |
91 | 91 |
92 | 92 |
93 @ensure_deferred | 93 @ensure_deferred |
94 async def publish(self, requestor, service, nodeIdentifier, items): | 94 async def publish(self, requestor, service, nodeIdentifier, items): |
95 raise NotImplementedError | 95 if self.apg.local_only and not self.apg.isLocal(requestor): |
96 raise error.StanzaError( | |
97 "forbidden", | |
98 "Only local users can publish on this gateway." | |
99 ) | |
100 if not service.user: | |
101 raise error.StanzaError( | |
102 "bad-request", | |
103 "You must specify an ActivityPub actor account in JID user part." | |
104 ) | |
105 ap_account = self.apg._e.unescape(service.user) | |
106 if ap_account.count("@") != 1: | |
107 raise error.StanzaError( | |
108 "bad-request", | |
109 f"{ap_account!r} is not a valid ActivityPub actor account." | |
110 ) | |
111 | |
112 client = self.apg.client.getVirtualClient(requestor) | |
113 await self.apg.convertAndPostItems( | |
114 client, ap_account, service, nodeIdentifier, items | |
115 ) | |
116 | |
117 async def apFollowing2Elt(self, ap_item: dict) -> domish.Element: | |
118 """Convert actor ID from following collection to XMPP item""" | |
119 actor_id = ap_item["id"] | |
120 actor_jid = await self.apg.getJIDFromId(actor_id) | |
121 subscription_elt = self.apg._pps.buildSubscriptionElt( | |
122 self.apg._m.namespace, actor_jid | |
123 ) | |
124 item_elt = pubsub.Item(id=actor_id, payload=subscription_elt) | |
125 return item_elt | |
126 | |
127 async def apFollower2Elt(self, ap_item: dict) -> domish.Element: | |
128 """Convert actor ID from followers collection to XMPP item""" | |
129 actor_id = ap_item["id"] | |
130 actor_jid = await self.apg.getJIDFromId(actor_id) | |
131 subscriber_elt = self.apg._pps.buildSubscriberElt(actor_jid) | |
132 item_elt = pubsub.Item(id=actor_id, payload=subscriber_elt) | |
133 return item_elt | |
96 | 134 |
97 @ensure_deferred | 135 @ensure_deferred |
98 async def items( | 136 async def items( |
99 self, | 137 self, |
100 requestor: jid.JID, | 138 requestor: jid.JID, |
108 return [], None | 146 return [], None |
109 ap_account = self.host.plugins["XEP-0106"].unescape(service.user) | 147 ap_account = self.host.plugins["XEP-0106"].unescape(service.user) |
110 if ap_account.count("@") != 1: | 148 if ap_account.count("@") != 1: |
111 log.warning(f"Invalid AP account used by {requestor}: {ap_account!r}") | 149 log.warning(f"Invalid AP account used by {requestor}: {ap_account!r}") |
112 return [], None | 150 return [], None |
113 if not node.startswith(self.apg._m.namespace): | 151 |
114 raise error.StanzaError( | 152 kwargs = {} |
115 "feature-not-implemented", | 153 |
116 text=f"AP Gateway {C.APP_VERSION} only supports {self.apg._m.namespace} " | 154 if node == self.apg._pps.subscriptions_node: |
117 "node for now" | 155 collection_name = "following" |
118 ) | 156 parser = self.apFollowing2Elt |
157 kwargs["only_ids"] = True | |
158 use_cache = False | |
159 elif node.startswith(self.apg._pps.subscribers_node_prefix): | |
160 collection_name = "followers" | |
161 parser = self.apFollower2Elt | |
162 kwargs["only_ids"] = True | |
163 use_cache = False | |
164 else: | |
165 if not node.startswith(self.apg._m.namespace): | |
166 raise error.StanzaError( | |
167 "feature-not-implemented", | |
168 text=f"AP Gateway {C.APP_VERSION} only supports " | |
169 f"{self.apg._m.namespace} node for now" | |
170 ) | |
171 collection_name = "outbox" | |
172 parser = self.apg.apItem2Elt | |
173 use_cache = True | |
174 | |
119 client = self.apg.client | 175 client = self.apg.client |
120 cached_node = await self.host.memory.storage.getPubsubNode( | 176 if use_cache: |
121 client, service, node | 177 cached_node = await self.host.memory.storage.getPubsubNode( |
122 ) | 178 client, service, node |
123 # TODO: check if node is synchronised | 179 ) |
124 if cached_node is not None: | 180 # TODO: check if node is synchronised |
125 # the node is cached, we return items from cache | 181 if cached_node is not None: |
126 log.debug(f"node {node!r} from {service} is in cache") | 182 # the node is cached, we return items from cache |
127 pubsub_items, metadata = await self.apg._c.getItemsFromCache( | 183 log.debug(f"node {node!r} from {service} is in cache") |
128 client, cached_node, maxItems, itemIdentifiers, rsm_request=rsm_req | 184 pubsub_items, metadata = await self.apg._c.getItemsFromCache( |
129 ) | 185 client, cached_node, maxItems, itemIdentifiers, rsm_request=rsm_req |
130 try: | 186 ) |
131 rsm_resp = rsm.RSMResponse(**metadata["rsm"]) | 187 try: |
132 except KeyError: | 188 rsm_resp = rsm.RSMResponse(**metadata["rsm"]) |
133 rsm_resp = None | 189 except KeyError: |
134 return [i.data for i in pubsub_items], rsm_resp | 190 rsm_resp = None |
191 return [i.data for i in pubsub_items], rsm_resp | |
135 | 192 |
136 if itemIdentifiers: | 193 if itemIdentifiers: |
137 items = [] | 194 items = [] |
138 for item_id in itemIdentifiers: | 195 for item_id in itemIdentifiers: |
139 item_data = await self.apg.apGet(item_id) | 196 item_data = await self.apg.apGet(item_id) |
140 item_elt = await self.apg.apItem2Elt(item_data) | 197 item_elt = await parser(item_data) |
141 items.append(item_elt) | 198 items.append(item_elt) |
142 return items, None | 199 return items, None |
143 else: | 200 else: |
144 if rsm_req is None: | 201 if rsm_req is None: |
145 if maxItems is None: | 202 if maxItems is None: |
146 maxItems = 20 | 203 maxItems = 20 |
147 kwargs = { | 204 kwargs.update({ |
148 "max_items": maxItems, | 205 "max_items": maxItems, |
149 "chronological_pagination": False, | 206 "chronological_pagination": False, |
150 } | 207 }) |
151 else: | 208 else: |
152 if len( | 209 if len( |
153 [v for v in (rsm_req.after, rsm_req.before, rsm_req.index) | 210 [v for v in (rsm_req.after, rsm_req.before, rsm_req.index) |
154 if v is not None] | 211 if v is not None] |
155 ) > 1: | 212 ) > 1: |
156 raise error.StanzaError( | 213 raise error.StanzaError( |
157 "bad-request", | 214 "bad-request", |
158 text="You can't use after, before and index at the same time" | 215 text="You can't use after, before and index at the same time" |
159 ) | 216 ) |
160 kwargs = {"max_items": rsm_req.max} | 217 kwargs.update({"max_items": rsm_req.max}) |
161 if rsm_req.after is not None: | 218 if rsm_req.after is not None: |
162 kwargs["after_id"] = rsm_req.after | 219 kwargs["after_id"] = rsm_req.after |
163 elif rsm_req.before is not None: | 220 elif rsm_req.before is not None: |
164 kwargs["chronological_pagination"] = False | 221 kwargs["chronological_pagination"] = False |
165 if rsm_req.before != "": | 222 if rsm_req.before != "": |
169 | 226 |
170 log.info( | 227 log.info( |
171 f"No cache found for node {node} at {service} (AP account {ap_account}), " | 228 f"No cache found for node {node} at {service} (AP account {ap_account}), " |
172 "using Collection Paging to RSM translation" | 229 "using Collection Paging to RSM translation" |
173 ) | 230 ) |
174 if self.apg._m.isCommentsNode(node): | 231 if self.apg._m.isCommentNode(node): |
175 parent_node = self.apg._m.getParentNode(node) | 232 parent_item = self.apg._m.getParentItem(node) |
176 try: | 233 try: |
177 parent_data = await self.apg.apGet(parent_node) | 234 parent_data = await self.apg.apGet(parent_item) |
178 collection = await self.apg.apGetObject( | 235 collection = await self.apg.apGetObject( |
179 parent_data.get("object", {}), | 236 parent_data.get("object", {}), |
180 "replies" | 237 "replies" |
181 ) | 238 ) |
182 except Exception as e: | 239 except Exception as e: |
184 "item-not-found", | 241 "item-not-found", |
185 text=e | 242 text=e |
186 ) | 243 ) |
187 else: | 244 else: |
188 actor_data = await self.apg.getAPActorDataFromAccount(ap_account) | 245 actor_data = await self.apg.getAPActorDataFromAccount(ap_account) |
189 collection = await self.apg.apGetObject(actor_data, "outbox") | 246 collection = await self.apg.apGetObject(actor_data, collection_name) |
190 if not collection: | 247 if not collection: |
191 raise error.StanzaError( | 248 raise error.StanzaError( |
192 "item-not-found", | 249 "item-not-found", |
193 text=f"No collection found for node {node!r} (account: {ap_account})" | 250 text=f"No collection found for node {node!r} (account: {ap_account})" |
194 ) | 251 ) |
252 | |
253 kwargs["parser"] = parser | |
195 return await self.apg.getAPItems(collection, **kwargs) | 254 return await self.apg.getAPItems(collection, **kwargs) |
196 | 255 |
197 @ensure_deferred | 256 @ensure_deferred |
198 async def retract(self, requestor, service, nodeIdentifier, itemIdentifiers): | 257 async def retract(self, requestor, service, nodeIdentifier, itemIdentifiers): |
199 raise NotImplementedError | 258 raise NotImplementedError |