comparison sat/plugins/plugin_comp_ap_gateway/pubsub_service.py @ 3745:a8c7e5cef0cb

comp AP gateway: signature checking, caching and threads management: - HTTP signature is checked for incoming messages - AP actor can now be followed using pubsub subscription. When following is accepted, the node is cached - replies to posts are put in cached pubsub comment nodes, with a `comments_max_depth` option to limit the number of comment nodes for a root message (documentation will come to explain this). ticket 364
author Goffi <goffi@goffi.org>
date Tue, 22 Mar 2022 17:00:42 +0100
parents 86eea17cafa7
children 125c7043b277
comparison
equal deleted inserted replaced
3744:658ddbabaf36 3745:a8c7e5cef0cb
14 # GNU Affero General Public License for more details. 14 # GNU Affero General Public License for more details.
15 15
16 # You should have received a copy of the GNU Affero General Public License 16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. 17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 18
19 from typing import Optional, List 19 from typing import Optional, Tuple, List, Dict, Any
20 20
21 from twisted.internet import defer
21 from twisted.words.protocols.jabber import jid, error 22 from twisted.words.protocols.jabber import jid, error
22 from twisted.words.xish import domish 23 from twisted.words.xish import domish
23 from wokkel import rsm 24 from wokkel import rsm, pubsub, data_form
24 25
25 from sat.core.i18n import _ 26 from sat.core.i18n import _
27 from sat.core import exceptions
26 from sat.core.log import getLogger 28 from sat.core.log import getLogger
29 from sat.core.constants import Const as C
27 from sat.tools.utils import ensure_deferred 30 from sat.tools.utils import ensure_deferred
31 from sat.memory.sqla_mapping import PubsubSub, SubscriptionState
32
33 from .constants import (
34 TYPE_ACTOR,
35 )
28 36
29 37
30 log = getLogger(__name__) 38 log = getLogger(__name__)
39
# All nodes exposed by the gateway share the same, fixed configuration.
# Each entry describes one configuration field: "var" is the field name,
# "value" its value, and any other key (e.g. "type") is a field option.
NODE_CONFIG = [
    {"var": "pubsub#persist_items", "type": "boolean", "value": True},
    {"var": "pubsub#max_items", "value": "max"},
    {"var": "pubsub#access_model", "type": "list-single", "value": "open"},
    {"var": "pubsub#publish_model", "type": "list-single", "value": "open"},
]

# field name => configured value
NODE_CONFIG_VALUES = {field["var"]: field["value"] for field in NODE_CONFIG}

# field name => field options (everything but the name and the value)
# Built in a single pass so that no loop variable is left in the module namespace.
NODE_OPTIONS = {
    field["var"]: {
        key: val for key, val in field.items() if key not in ("var", "value")
    }
    for field in NODE_CONFIG
}
31 53
32 54
33 class APPubsubService(rsm.PubSubService): 55 class APPubsubService(rsm.PubSubService):
34 """Pubsub service for XMPP requests""" 56 """Pubsub service for XMPP requests"""
35 57
41 "category": "pubsub", 63 "category": "pubsub",
42 "type": "service", 64 "type": "service",
43 "name": "Libervia ActivityPub Gateway", 65 "name": "Libervia ActivityPub Gateway",
44 } 66 }
45 67
async def getAPActorIdsAndInbox(
    self,
    requestor: jid.JID,
    recipient: jid.JID,
) -> Tuple[str, str, str]:
    """Get AP actor IDs from requestor and recipient JIDs

    @param requestor: XMPP entity doing a request to an AP actor via the gateway
    @param recipient: JID mapping an AP actor via the gateway
    @return: requestor actor ID, recipient actor ID and recipient inbox
    @raise error.StanzaError: "item-not-found" is raised if no user part is
        specified in recipient
    """
    account_escaped = recipient.user
    if not account_escaped:
        raise error.StanzaError("item-not-found", text="No user part specified")

    # the requestor actor is built from the bare JID of the XMPP entity
    req_actor_id = self.apg.buildAPURL(TYPE_ACTOR, requestor.userhost())

    # the user part of the recipient JID is unescaped to get the AP account
    account = self.apg._e.unescape(account_escaped)
    recip_actor_id = await self.apg.getAPActorIdFromAccount(account)
    recip_inbox = await self.apg.getAPInboxFromId(recip_actor_id)

    return req_actor_id, recip_actor_id, recip_inbox
91
92
@ensure_deferred
async def publish(self, requestor, service, nodeIdentifier, items):
    """Items publication is not implemented

    @raise NotImplementedError: always raised
    """
    raise NotImplementedError
49 96
50 @ensure_deferred 97 @ensure_deferred
54 service: jid.JID, 101 service: jid.JID,
55 node: str, 102 node: str,
56 maxItems: Optional[int], 103 maxItems: Optional[int],
57 itemIdentifiers: Optional[List[str]], 104 itemIdentifiers: Optional[List[str]],
58 rsm_req: Optional[rsm.RSMRequest] 105 rsm_req: Optional[rsm.RSMRequest]
59 ) -> List[domish.Element]: 106 ) -> Tuple[List[domish.Element], Optional[rsm.RSMResponse]]:
60 if not service.user: 107 if not service.user:
61 return [] 108 return [], None
62 ap_account = self.host.plugins["XEP-0106"].unescape(service.user) 109 ap_account = self.host.plugins["XEP-0106"].unescape(service.user)
63 if ap_account.count("@") != 1: 110 if ap_account.count("@") != 1:
64 log.warning(f"Invalid AP account used by {requestor}: {ap_account!r}") 111 log.warning(f"Invalid AP account used by {requestor}: {ap_account!r}")
65 return [] 112 return [], None
66 if node != self.apg._m.namespace: 113 if not node.startswith(self.apg._m.namespace):
67 raise error.StanzaError( 114 raise error.StanzaError(
68 "feature-not-implemented", 115 "feature-not-implemented",
69 text=f"{VERSION} only supports {self.apg._m.namespace} " 116 text=f"AP Gateway {C.APP_VERSION} only supports {self.apg._m.namespace} "
70 "node for now" 117 "node for now"
71 ) 118 )
72 if rsm_req is None: 119 client = self.apg.client
73 if maxItems is None: 120 cached_node = await self.host.memory.storage.getPubsubNode(
74 maxItems = 20 121 client, service, node
75 kwargs = { 122 )
76 "max_items": maxItems, 123 # TODO: check if node is synchronised
77 "chronological_pagination": False, 124 if cached_node is not None:
78 } 125 # the node is cached, we return items from cache
126 log.debug(f"node {node!r} from {service} is in cache")
127 pubsub_items, metadata = await self.apg._c.getItemsFromCache(
128 client, cached_node, maxItems, itemIdentifiers, rsm_request=rsm_req
129 )
130 try:
131 rsm_resp = rsm.RSMResponse(**metadata["rsm"])
132 except KeyError:
133 rsm_resp = None
134 return [i.data for i in pubsub_items], rsm_resp
135
136 if itemIdentifiers:
137 items = []
138 for item_id in itemIdentifiers:
139 item_data = await self.apg.apGet(item_id)
140 item_elt = await self.apg.apItem2Elt(item_data)
141 items.append(item_elt)
142 return items, None
79 else: 143 else:
80 if len( 144 if rsm_req is None:
81 [v for v in (rsm_req.after, rsm_req.before, rsm_req.index) 145 if maxItems is None:
82 if v is not None] 146 maxItems = 20
83 ) > 1: 147 kwargs = {
148 "max_items": maxItems,
149 "chronological_pagination": False,
150 }
151 else:
152 if len(
153 [v for v in (rsm_req.after, rsm_req.before, rsm_req.index)
154 if v is not None]
155 ) > 1:
156 raise error.StanzaError(
157 "bad-request",
158 text="You can't use after, before and index at the same time"
159 )
160 kwargs = {"max_items": rsm_req.max}
161 if rsm_req.after is not None:
162 kwargs["after_id"] = rsm_req.after
163 elif rsm_req.before is not None:
164 kwargs["chronological_pagination"] = False
165 if rsm_req.before != "":
166 kwargs["after_id"] = rsm_req.before
167 elif rsm_req.index is not None:
168 kwargs["start_index"] = rsm_req.index
169
170 log.info(
171 f"No cache found for node {node} at {service} (AP account {ap_account}), "
172 "using Collection Paging to RSM translation"
173 )
174 if self.apg._m.isCommentsNode(node):
175 parent_node = self.apg._m.getParentNode(node)
176 try:
177 parent_data = await self.apg.apGet(parent_node)
178 collection = await self.apg.apGetObject(
179 parent_data.get("object", {}),
180 "replies"
181 )
182 except Exception as e:
183 raise error.StanzaError(
184 "item-not-found",
185 text=e
186 )
187 else:
188 actor_data = await self.apg.getAPActorDataFromAccount(ap_account)
189 collection = await self.apg.apGetObject(actor_data, "outbox")
190 if not collection:
84 raise error.StanzaError( 191 raise error.StanzaError(
85 "bad-request", 192 "item-not-found",
86 text="You can't use after, before and index at the same time" 193 text=f"No collection found for node {node!r} (account: {ap_account})"
87 ) 194 )
88 kwargs = {"max_items": rsm_req.max} 195 return await self.apg.getAPItems(collection, **kwargs)
89 if rsm_req.after is not None:
90 kwargs["after_id"] = rsm_req.after
91 elif rsm_req.before is not None:
92 kwargs["chronological_pagination"] = False
93 if rsm_req.before != "":
94 kwargs["after_id"] = rsm_req.before
95 elif rsm_req.index is not None:
96 kwargs["start_index"] = rsm_req.index
97
98 log.info(
99 f"No cache found for node {node} at {service} (AP account {ap_account}), "
100 "using Collection Paging to RSM translation"
101 )
102 return await self.apg.getAPItems(ap_account, **kwargs)
103 196
@ensure_deferred
async def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
    """Items retraction is not implemented

    @raise NotImplementedError: always raised
    """
    raise NotImplementedError
200
@ensure_deferred
async def subscribe(self, requestor, service, nodeIdentifier, subscriber):
    """Store a subscription in cache and send a "Follow" activity to the AP actor

    @param requestor: XMPP entity asking for the subscription
    @param service: gateway JID mapping the AP actor to follow
    @param nodeIdentifier: name of the node to subscribe to
    @param subscriber: not used here, the bare JID of requestor is the one stored
        as subscriber
    @return: subscription of requestor to the node, with "subscribed" state
    @raise exceptions.InternalError: the cached subscription has an unmanaged state
    @raise error.StanzaError: "service-unavailable" is raised if the response to
        the signed POST has a code of 400 or more
    """
    # TODO: handle comments nodes
    client = self.apg.client
    storage = self.host.memory.storage
    bare_requestor = requestor.userhostJID()

    node = await storage.getPubsubNode(
        client, service, nodeIdentifier, with_subscriptions=True
    )
    if node is None:
        # the node is not cached yet, so there is no subscription to look for
        node = await storage.setPubsubNode(client, service, nodeIdentifier)
        subscription = None
    else:
        subscription = next(
            (s for s in node.subscriptions if s.subscriber == bare_requestor),
            None,
        )

    if subscription is None:
        node.subscriptions.append(
            PubsubSub(subscriber=bare_requestor, state=SubscriptionState.PENDING)
        )
        await storage.add(node)
    elif subscription.state is None:
        subscription.state = SubscriptionState.PENDING
        await storage.add(node)
    elif subscription.state == SubscriptionState.SUBSCRIBED:
        log.info(
            f"{bare_requestor} has already a subscription to {node!r} "
            f"at {service}. Doing the request anyway."
        )
    elif subscription.state == SubscriptionState.PENDING:
        log.info(
            f"{bare_requestor} has already a pending subscription to "
            f"{node!r} at {service}. Doing the request anyway."
        )
    else:
        raise exceptions.InternalError(
            f"unmanaged subscription state: {subscription.state}"
        )

    # the Follow request is sent in every case, even if a subscription was cached
    req_actor_id, recip_actor_id, inbox = await self.getAPActorIdsAndInbox(
        requestor, service
    )
    follow = self.apg.createActivity("Follow", req_actor_id, recip_actor_id)
    resp = await self.apg.signAndPost(inbox, req_actor_id, follow)
    if resp.code >= 400:
        raise error.StanzaError("service-unavailable", text=await resp.text())
    return pubsub.Subscription(nodeIdentifier, requestor, "subscribed")
261
@ensure_deferred
async def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
    """Send an "Undo" of the "Follow" activity to the AP actor

    @param requestor: XMPP entity asking to unsubscribe
    @param service: gateway JID mapping the followed AP actor
    @param nodeIdentifier: not used here
    @param subscriber: not used here
    @raise error.StanzaError: "service-unavailable" is raised if the response to
        the signed POST has a code of 400 or more
    """
    req_actor_id, recip_actor_id, inbox = await self.getAPActorIdsAndInbox(
        requestor, service
    )
    # the activity to cancel is wrapped as the object of the "Undo" activity
    follow = self.apg.createActivity("Follow", req_actor_id, recip_actor_id)
    undo = self.apg.createActivity("Undo", req_actor_id, follow)

    resp = await self.apg.signAndPost(inbox, req_actor_id, undo)
    if resp.code >= 400:
        raise error.StanzaError("service-unavailable", text=await resp.text())
281
def getConfigurationOptions(self):
    """Get options of the node configuration fields

    @return: NODE_OPTIONS, the module-level mapping from field name to field
        options
    """
    return NODE_OPTIONS
284
def getConfiguration(
    self, requestor: jid.JID, service: jid.JID, nodeIdentifier: str
) -> defer.Deferred:
    """Get the configuration of a node

    The same configuration is returned whatever the arguments are.

    @param requestor: not used here
    @param service: not used here
    @param nodeIdentifier: not used here
    @return: already fired Deferred, with NODE_CONFIG_VALUES as result
    """
    config_d = defer.succeed(NODE_CONFIG_VALUES)
    return config_d
107 292
108 def getNodeInfo( 293 def getNodeInfo(
109 self, 294 self,
110 requestor: jid.JID, 295 requestor: jid.JID,
111 service: jid.JID, 296 service: jid.JID,
115 ) -> Optional[dict]: 300 ) -> Optional[dict]:
116 if not nodeIdentifier: 301 if not nodeIdentifier:
117 return None 302 return None
118 info = { 303 info = {
119 "type": "leaf", 304 "type": "leaf",
120 "meta-data": [ 305 "meta-data": NODE_CONFIG
121 {"var": "pubsub#persist_items", "type": "boolean", "value": True},
122 {"var": "pubsub#max_items", "value": "max"},
123 {"var": "pubsub#access_model", "type": "list-single", "value": "open"},
124 {"var": "pubsub#publish_model", "type": "list-single", "value": "open"},
125
126 ]
127
128 } 306 }
129 return info 307 return info