Mercurial > libervia-backend
comparison sat/plugins/plugin_comp_ap_gateway/pubsub_service.py @ 3745:a8c7e5cef0cb
comp AP gateway: signature checking, caching and thread management:
- HTTP signature is checked for incoming messages
- AP actors can now be followed using a pubsub subscription. When the follow request is
accepted, the node is cached
- replies to posts are put in cached pubsub comment nodes, with a `comments_max_depth`
option to limit the number of comment nodes for a root message (documentation explaining
this will follow).
ticket 364
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 22 Mar 2022 17:00:42 +0100 |
parents | 86eea17cafa7 |
children | 125c7043b277 |
comparison
equal
deleted
inserted
replaced
3744:658ddbabaf36 | 3745:a8c7e5cef0cb |
---|---|
14 # GNU Affero General Public License for more details. | 14 # GNU Affero General Public License for more details. |
15 | 15 |
16 # You should have received a copy of the GNU Affero General Public License | 16 # You should have received a copy of the GNU Affero General Public License |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | 17 # along with this program. If not, see <http://www.gnu.org/licenses/>. |
18 | 18 |
19 from typing import Optional, List | 19 from typing import Optional, Tuple, List, Dict, Any |
20 | 20 |
21 from twisted.internet import defer | |
21 from twisted.words.protocols.jabber import jid, error | 22 from twisted.words.protocols.jabber import jid, error |
22 from twisted.words.xish import domish | 23 from twisted.words.xish import domish |
23 from wokkel import rsm | 24 from wokkel import rsm, pubsub, data_form |
24 | 25 |
25 from sat.core.i18n import _ | 26 from sat.core.i18n import _ |
27 from sat.core import exceptions | |
26 from sat.core.log import getLogger | 28 from sat.core.log import getLogger |
29 from sat.core.constants import Const as C | |
27 from sat.tools.utils import ensure_deferred | 30 from sat.tools.utils import ensure_deferred |
31 from sat.memory.sqla_mapping import PubsubSub, SubscriptionState | |
32 | |
33 from .constants import ( | |
34 TYPE_ACTOR, | |
35 ) | |
28 | 36 |
29 | 37 |
30 log = getLogger(__name__) | 38 log = getLogger(__name__) |
39 | |
40 # all nodes have the same config | |
41 NODE_CONFIG = [ | |
42 {"var": "pubsub#persist_items", "type": "boolean", "value": True}, | |
43 {"var": "pubsub#max_items", "value": "max"}, | |
44 {"var": "pubsub#access_model", "type": "list-single", "value": "open"}, | |
45 {"var": "pubsub#publish_model", "type": "list-single", "value": "open"}, | |
46 | |
47 ] | |
48 | |
49 NODE_CONFIG_VALUES = {c["var"]: c["value"] for c in NODE_CONFIG} | |
50 NODE_OPTIONS = {c["var"]: {} for c in NODE_CONFIG} | |
51 for c in NODE_CONFIG: | |
52 NODE_OPTIONS[c["var"]].update({k:v for k,v in c.items() if k not in ("var", "value")}) | |
31 | 53 |
32 | 54 |
33 class APPubsubService(rsm.PubSubService): | 55 class APPubsubService(rsm.PubSubService): |
34 """Pubsub service for XMPP requests""" | 56 """Pubsub service for XMPP requests""" |
35 | 57 |
41 "category": "pubsub", | 63 "category": "pubsub", |
42 "type": "service", | 64 "type": "service", |
43 "name": "Libervia ActivityPub Gateway", | 65 "name": "Libervia ActivityPub Gateway", |
44 } | 66 } |
45 | 67 |
68 async def getAPActorIdsAndInbox( | |
69 self, | |
70 requestor: jid.JID, | |
71 recipient: jid.JID, | |
72 ) -> Tuple[str, str, str]: | |
73 """Get AP actor IDs from requestor and destinee JIDs | |
74 | |
75 @param requestor: XMPP entity doing a request to an AP actor via the gateway | |
76 @param recipient: JID mapping an AP actor via the gateway | |
77 @return: requestor actor ID, recipient actor ID and recipient inbox | |
78 @raise error.StanzaError: "item-not-found" is raised if not user part is specified | |
79 in requestor | |
80 """ | |
81 if not recipient.user: | |
82 raise error.StanzaError( | |
83 "item-not-found", | |
84 text="No user part specified" | |
85 ) | |
86 requestor_actor_id = self.apg.buildAPURL(TYPE_ACTOR, requestor.userhost()) | |
87 recipient_account = self.apg._e.unescape(recipient.user) | |
88 recipient_actor_id = await self.apg.getAPActorIdFromAccount(recipient_account) | |
89 inbox = await self.apg.getAPInboxFromId(recipient_actor_id) | |
90 return requestor_actor_id, recipient_actor_id, inbox | |
91 | |
92 | |
46 @ensure_deferred | 93 @ensure_deferred |
47 async def publish(self, requestor, service, nodeIdentifier, items): | 94 async def publish(self, requestor, service, nodeIdentifier, items): |
48 raise NotImplementedError | 95 raise NotImplementedError |
49 | 96 |
50 @ensure_deferred | 97 @ensure_deferred |
54 service: jid.JID, | 101 service: jid.JID, |
55 node: str, | 102 node: str, |
56 maxItems: Optional[int], | 103 maxItems: Optional[int], |
57 itemIdentifiers: Optional[List[str]], | 104 itemIdentifiers: Optional[List[str]], |
58 rsm_req: Optional[rsm.RSMRequest] | 105 rsm_req: Optional[rsm.RSMRequest] |
59 ) -> List[domish.Element]: | 106 ) -> Tuple[List[domish.Element], Optional[rsm.RSMResponse]]: |
60 if not service.user: | 107 if not service.user: |
61 return [] | 108 return [], None |
62 ap_account = self.host.plugins["XEP-0106"].unescape(service.user) | 109 ap_account = self.host.plugins["XEP-0106"].unescape(service.user) |
63 if ap_account.count("@") != 1: | 110 if ap_account.count("@") != 1: |
64 log.warning(f"Invalid AP account used by {requestor}: {ap_account!r}") | 111 log.warning(f"Invalid AP account used by {requestor}: {ap_account!r}") |
65 return [] | 112 return [], None |
66 if node != self.apg._m.namespace: | 113 if not node.startswith(self.apg._m.namespace): |
67 raise error.StanzaError( | 114 raise error.StanzaError( |
68 "feature-not-implemented", | 115 "feature-not-implemented", |
69 text=f"{VERSION} only supports {self.apg._m.namespace} " | 116 text=f"AP Gateway {C.APP_VERSION} only supports {self.apg._m.namespace} " |
70 "node for now" | 117 "node for now" |
71 ) | 118 ) |
72 if rsm_req is None: | 119 client = self.apg.client |
73 if maxItems is None: | 120 cached_node = await self.host.memory.storage.getPubsubNode( |
74 maxItems = 20 | 121 client, service, node |
75 kwargs = { | 122 ) |
76 "max_items": maxItems, | 123 # TODO: check if node is synchronised |
77 "chronological_pagination": False, | 124 if cached_node is not None: |
78 } | 125 # the node is cached, we return items from cache |
126 log.debug(f"node {node!r} from {service} is in cache") | |
127 pubsub_items, metadata = await self.apg._c.getItemsFromCache( | |
128 client, cached_node, maxItems, itemIdentifiers, rsm_request=rsm_req | |
129 ) | |
130 try: | |
131 rsm_resp = rsm.RSMResponse(**metadata["rsm"]) | |
132 except KeyError: | |
133 rsm_resp = None | |
134 return [i.data for i in pubsub_items], rsm_resp | |
135 | |
136 if itemIdentifiers: | |
137 items = [] | |
138 for item_id in itemIdentifiers: | |
139 item_data = await self.apg.apGet(item_id) | |
140 item_elt = await self.apg.apItem2Elt(item_data) | |
141 items.append(item_elt) | |
142 return items, None | |
79 else: | 143 else: |
80 if len( | 144 if rsm_req is None: |
81 [v for v in (rsm_req.after, rsm_req.before, rsm_req.index) | 145 if maxItems is None: |
82 if v is not None] | 146 maxItems = 20 |
83 ) > 1: | 147 kwargs = { |
148 "max_items": maxItems, | |
149 "chronological_pagination": False, | |
150 } | |
151 else: | |
152 if len( | |
153 [v for v in (rsm_req.after, rsm_req.before, rsm_req.index) | |
154 if v is not None] | |
155 ) > 1: | |
156 raise error.StanzaError( | |
157 "bad-request", | |
158 text="You can't use after, before and index at the same time" | |
159 ) | |
160 kwargs = {"max_items": rsm_req.max} | |
161 if rsm_req.after is not None: | |
162 kwargs["after_id"] = rsm_req.after | |
163 elif rsm_req.before is not None: | |
164 kwargs["chronological_pagination"] = False | |
165 if rsm_req.before != "": | |
166 kwargs["after_id"] = rsm_req.before | |
167 elif rsm_req.index is not None: | |
168 kwargs["start_index"] = rsm_req.index | |
169 | |
170 log.info( | |
171 f"No cache found for node {node} at {service} (AP account {ap_account}), " | |
172 "using Collection Paging to RSM translation" | |
173 ) | |
174 if self.apg._m.isCommentsNode(node): | |
175 parent_node = self.apg._m.getParentNode(node) | |
176 try: | |
177 parent_data = await self.apg.apGet(parent_node) | |
178 collection = await self.apg.apGetObject( | |
179 parent_data.get("object", {}), | |
180 "replies" | |
181 ) | |
182 except Exception as e: | |
183 raise error.StanzaError( | |
184 "item-not-found", | |
185 text=e | |
186 ) | |
187 else: | |
188 actor_data = await self.apg.getAPActorDataFromAccount(ap_account) | |
189 collection = await self.apg.apGetObject(actor_data, "outbox") | |
190 if not collection: | |
84 raise error.StanzaError( | 191 raise error.StanzaError( |
85 "bad-request", | 192 "item-not-found", |
86 text="You can't use after, before and index at the same time" | 193 text=f"No collection found for node {node!r} (account: {ap_account})" |
87 ) | 194 ) |
88 kwargs = {"max_items": rsm_req.max} | 195 return await self.apg.getAPItems(collection, **kwargs) |
89 if rsm_req.after is not None: | |
90 kwargs["after_id"] = rsm_req.after | |
91 elif rsm_req.before is not None: | |
92 kwargs["chronological_pagination"] = False | |
93 if rsm_req.before != "": | |
94 kwargs["after_id"] = rsm_req.before | |
95 elif rsm_req.index is not None: | |
96 kwargs["start_index"] = rsm_req.index | |
97 | |
98 log.info( | |
99 f"No cache found for node {node} at {service} (AP account {ap_account}), " | |
100 "using Collection Paging to RSM translation" | |
101 ) | |
102 return await self.apg.getAPItems(ap_account, **kwargs) | |
103 | 196 |
104 @ensure_deferred | 197 @ensure_deferred |
105 async def retract(self, requestor, service, nodeIdentifier, itemIdentifiers): | 198 async def retract(self, requestor, service, nodeIdentifier, itemIdentifiers): |
106 raise NotImplementedError | 199 raise NotImplementedError |
200 | |
201 @ensure_deferred | |
202 async def subscribe(self, requestor, service, nodeIdentifier, subscriber): | |
203 # TODO: handle comments nodes | |
204 client = self.apg.client | |
205 node = await self.host.memory.storage.getPubsubNode( | |
206 client, service, nodeIdentifier, with_subscriptions=True | |
207 ) | |
208 if node is None: | |
209 node = await self.host.memory.storage.setPubsubNode( | |
210 client, | |
211 service, | |
212 nodeIdentifier, | |
213 ) | |
214 subscription = None | |
215 else: | |
216 try: | |
217 subscription = next( | |
218 s for s in node.subscriptions | |
219 if s.subscriber == requestor.userhostJID() | |
220 ) | |
221 except StopIteration: | |
222 subscription = None | |
223 | |
224 if subscription is None: | |
225 subscription = PubsubSub( | |
226 subscriber=requestor.userhostJID(), | |
227 state=SubscriptionState.PENDING | |
228 ) | |
229 node.subscriptions.append(subscription) | |
230 await self.host.memory.storage.add(node) | |
231 else: | |
232 if subscription.state is None: | |
233 subscription.state = SubscriptionState.PENDING | |
234 await self.host.memory.storage.add(node) | |
235 elif subscription.state == SubscriptionState.SUBSCRIBED: | |
236 log.info( | |
237 f"{requestor.userhostJID()} has already a subscription to {node!r} " | |
238 f"at {service}. Doing the request anyway." | |
239 ) | |
240 elif subscription.state == SubscriptionState.PENDING: | |
241 log.info( | |
242 f"{requestor.userhostJID()} has already a pending subscription to " | |
243 f"{node!r} at {service}. Doing the request anyway." | |
244 ) | |
245 else: | |
246 raise exceptions.InternalError( | |
247 f"unmanaged subscription state: {subscription.state}" | |
248 ) | |
249 | |
250 req_actor_id, recip_actor_id, inbox = await self.getAPActorIdsAndInbox( | |
251 requestor, service | |
252 ) | |
253 | |
254 data = self.apg.createActivity("Follow", req_actor_id, recip_actor_id) | |
255 | |
256 resp = await self.apg.signAndPost(inbox, req_actor_id, data) | |
257 if resp.code >= 400: | |
258 text = await resp.text() | |
259 raise error.StanzaError("service-unavailable", text=text) | |
260 return pubsub.Subscription(nodeIdentifier, requestor, "subscribed") | |
261 | |
262 @ensure_deferred | |
263 async def unsubscribe(self, requestor, service, nodeIdentifier, subscriber): | |
264 req_actor_id, recip_actor_id, inbox = await self.getAPActorIdsAndInbox( | |
265 requestor, service | |
266 ) | |
267 data = self.apg.createActivity( | |
268 "Undo", | |
269 req_actor_id, | |
270 self.apg.createActivity( | |
271 "Follow", | |
272 req_actor_id, | |
273 recip_actor_id | |
274 ) | |
275 ) | |
276 | |
277 resp = await self.apg.signAndPost(inbox, req_actor_id, data) | |
278 if resp.code >= 400: | |
279 text = await resp.text() | |
280 raise error.StanzaError("service-unavailable", text=text) | |
281 | |
282 def getConfigurationOptions(self): | |
283 return NODE_OPTIONS | |
284 | |
285 def getConfiguration( | |
286 self, | |
287 requestor: jid.JID, | |
288 service: jid.JID, | |
289 nodeIdentifier: str | |
290 ) -> defer.Deferred: | |
291 return defer.succeed(NODE_CONFIG_VALUES) | |
107 | 292 |
108 def getNodeInfo( | 293 def getNodeInfo( |
109 self, | 294 self, |
110 requestor: jid.JID, | 295 requestor: jid.JID, |
111 service: jid.JID, | 296 service: jid.JID, |
115 ) -> Optional[dict]: | 300 ) -> Optional[dict]: |
116 if not nodeIdentifier: | 301 if not nodeIdentifier: |
117 return None | 302 return None |
118 info = { | 303 info = { |
119 "type": "leaf", | 304 "type": "leaf", |
120 "meta-data": [ | 305 "meta-data": NODE_CONFIG |
121 {"var": "pubsub#persist_items", "type": "boolean", "value": True}, | |
122 {"var": "pubsub#max_items", "value": "max"}, | |
123 {"var": "pubsub#access_model", "type": "list-single", "value": "open"}, | |
124 {"var": "pubsub#publish_model", "type": "list-single", "value": "open"}, | |
125 | |
126 ] | |
127 | |
128 } | 306 } |
129 return info | 307 return info |