Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_comp_ap_gateway/pubsub_service.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/plugins/plugin_comp_ap_gateway/pubsub_service.py@524856bd7b19 |
children | 92551baea115 |
comparison
equal
deleted
inserted
replaced
4070:d10748475025 | 4071:4b842c1fb686 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # Libervia ActivityPub Gateway | |
4 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) | |
5 | |
6 # This program is free software: you can redistribute it and/or modify | |
7 # it under the terms of the GNU Affero General Public License as published by | |
8 # the Free Software Foundation, either version 3 of the License, or | |
9 # (at your option) any later version. | |
10 | |
11 # This program is distributed in the hope that it will be useful, | |
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 # GNU Affero General Public License for more details. | |
15 | |
16 # You should have received a copy of the GNU Affero General Public License | |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
18 | |
19 from typing import Optional, Tuple, List, Dict, Any, Union | |
20 from urllib.parse import urlparse | |
21 from pathlib import Path | |
22 from base64 import b64encode | |
23 import tempfile | |
24 | |
25 from twisted.internet import defer, threads | |
26 from twisted.words.protocols.jabber import jid, error | |
27 from twisted.words.xish import domish | |
28 from wokkel import rsm, pubsub, disco | |
29 | |
30 from libervia.backend.core.i18n import _ | |
31 from libervia.backend.core import exceptions | |
32 from libervia.backend.core.core_types import SatXMPPEntity | |
33 from libervia.backend.core.log import getLogger | |
34 from libervia.backend.core.constants import Const as C | |
35 from libervia.backend.tools import image | |
36 from libervia.backend.tools.utils import ensure_deferred | |
37 from libervia.backend.tools.web import download_file | |
38 from libervia.backend.memory.sqla_mapping import PubsubSub, SubscriptionState | |
39 | |
40 from .constants import ( | |
41 TYPE_ACTOR, | |
42 ST_AVATAR, | |
43 MAX_AVATAR_SIZE | |
44 ) | |
45 | |
46 | |
47 log = getLogger(__name__) | |
48 | |
# All nodes exposed by this service share the same, fixed configuration.
NODE_CONFIG = [
    {"var": "pubsub#persist_items", "type": "boolean", "value": True},
    {"var": "pubsub#max_items", "value": "max"},
    {"var": "pubsub#access_model", "type": "list-single", "value": "open"},
    {"var": "pubsub#publish_model", "type": "list-single", "value": "open"},
]

# Option name → configured value.
NODE_CONFIG_VALUES = {option["var"]: option["value"] for option in NODE_CONFIG}
# Option name → option metadata (every key of the option except "var" and "value").
# Built with comprehensions so that no loop variable leaks into the module namespace.
NODE_OPTIONS = {
    option["var"]: {
        key: value for key, value in option.items() if key not in ("var", "value")
    }
    for option in NODE_CONFIG
}
62 | |
63 | |
class APPubsubService(rsm.PubSubService):
    """Pubsub service for XMPP requests

    Pubsub requests addressed to a JID mapping an ActivityPub (AP) actor are
    translated to AP requests done through the gateway (``self.apg``), or answered
    from the local pubsub cache.
    """

    def __init__(self, apg):
        """
        @param apg: ActivityPub gateway instance; its ``host`` attribute is also kept
            as ``self.host``
        """
        super().__init__()
        self.host = apg.host
        self.apg = apg
        self.discoIdentity = {
            "category": "pubsub",
            "type": "service",
            "name": "Libervia ActivityPub Gateway",
        }

    async def get_ap_actor_ids_and_inbox(
        self,
        requestor: jid.JID,
        recipient: jid.JID,
    ) -> Tuple[str, str, str]:
        """Get AP actor IDs from requestor and recipient JIDs

        @param requestor: XMPP entity doing a request to an AP actor via the gateway
        @param recipient: JID mapping an AP actor via the gateway
        @return: requestor actor ID, recipient actor ID and recipient inbox
        @raise error.StanzaError: "item-not-found" is raised if no user part is
            specified in recipient
        """
        if not recipient.user:
            raise error.StanzaError(
                "item-not-found",
                text="No user part specified"
            )
        requestor_actor_id = self.apg.build_apurl(TYPE_ACTOR, requestor.userhost())
        recipient_account = self.apg._e.unescape(recipient.user)
        recipient_actor_id = await self.apg.get_ap_actor_id_from_account(
            recipient_account
        )
        # the actor's own inbox is requested, not the shared one
        inbox = await self.apg.get_ap_inbox_from_id(
            recipient_actor_id, use_shared=False
        )
        return requestor_actor_id, recipient_actor_id, inbox

    @ensure_deferred
    async def publish(self, requestor, service, nodeIdentifier, items):
        """Convert published items to ActivityPub, post them, cache them and notify

        Once posted, the items are stored in the local pubsub cache and a publish
        notification is sent to every subscriber of the cached node whose
        subscription state is SUBSCRIBED.

        @param requestor: entity publishing the items
        @param service: JID whose user part is the escaped AP actor account
        @param nodeIdentifier: node where the items are published
        @param items: items to publish
        @raise error.StanzaError: "forbidden" if the gateway is in local only mode and
            requestor is not local, "bad-request" if the AP actor account is missing
            or invalid
        """
        # NOTE: the message must be given with the ``text`` keyword: the second
        #   positional argument of StanzaError is the error *type*.
        if self.apg.local_only and not self.apg.is_local(requestor):
            raise error.StanzaError(
                "forbidden",
                text="Only local users can publish on this gateway."
            )
        if not service.user:
            raise error.StanzaError(
                "bad-request",
                text="You must specify an ActivityPub actor account in JID user part."
            )
        ap_account = self.apg._e.unescape(service.user)
        if ap_account.count("@") != 1:
            raise error.StanzaError(
                "bad-request",
                text=f"{ap_account!r} is not a valid ActivityPub actor account."
            )

        client = self.apg.client.get_virtual_client(requestor)
        if self.apg._pa.is_attachment_node(nodeIdentifier):
            await self.apg.convert_and_post_attachments(
                client, ap_account, service, nodeIdentifier, items, publisher=requestor
            )
        else:
            await self.apg.convert_and_post_items(
                client, ap_account, service, nodeIdentifier, items
            )
        # the node is created in cache if it doesn't exist yet
        cached_node = await self.host.memory.storage.get_pubsub_node(
            client, service, nodeIdentifier, with_subscriptions=True, create=True
        )
        await self.host.memory.storage.cache_pubsub_items(
            client,
            cached_node,
            items
        )
        for subscription in cached_node.subscriptions:
            if subscription.state != SubscriptionState.SUBSCRIBED:
                # pending (or otherwise inactive) subscriptions are not notified
                continue
            self.notifyPublish(
                service,
                nodeIdentifier,
                [(subscription.subscriber, None, items)]
            )

    async def ap_following_2_elt(self, ap_item: dict) -> domish.Element:
        """Convert actor ID from following collection to XMPP item

        @param ap_item: AP item, its ``id`` key is the actor ID
        @return: pubsub item using the actor ID as item ID, with a subscription
            element as payload
        """
        actor_id = ap_item["id"]
        actor_jid = await self.apg.get_jid_from_id(actor_id)
        subscription_elt = self.apg._pps.build_subscription_elt(
            self.apg._m.namespace, actor_jid
        )
        return pubsub.Item(id=actor_id, payload=subscription_elt)

    async def ap_follower_2_elt(self, ap_item: dict) -> domish.Element:
        """Convert actor ID from followers collection to XMPP item

        @param ap_item: AP item, its ``id`` key is the actor ID
        @return: pubsub item using the actor ID as item ID, with a subscriber element
            as payload
        """
        actor_id = ap_item["id"]
        actor_jid = await self.apg.get_jid_from_id(actor_id)
        subscriber_elt = self.apg._pps.build_subscriber_elt(actor_jid)
        return pubsub.Item(id=actor_id, payload=subscriber_elt)

    async def generate_v_card(self, ap_account: str) -> domish.Element:
        """Generate vCard4 (XEP-0292) item element from ap_account's metadata

        The actor's ``summary`` is used as description, ``name`` and
        ``preferredUsername`` as nicknames.

        @param ap_account: AP actor account
        @return: pubsub item containing the vCard element
        """
        actor_data = await self.apg.get_ap_actor_data_from_account(ap_account)
        identity_data = {}

        summary = actor_data.get("summary")
        # summary is HTML, we have to convert it to text
        if summary:
            identity_data["description"] = await self.apg._t.convert(
                summary,
                self.apg._t.SYNTAX_XHTML,
                self.apg._t.SYNTAX_TEXT,
                False,
            )

        for field in ("name", "preferredUsername"):
            value = actor_data.get(field)
            if value:
                identity_data.setdefault("nicknames", []).append(value)
        vcard_elt = self.apg._v.dict_2_v_card(identity_data)
        item_elt = domish.Element((pubsub.NS_PUBSUB, "item"))
        item_elt.addChild(vcard_elt)
        item_elt["id"] = self.apg._p.ID_SINGLETON
        return item_elt

    async def get_avatar_data(
        self,
        client: SatXMPPEntity,
        ap_account: str
    ) -> Dict[str, Any]:
        """Retrieve actor's avatar if any, cache it and fill avatar data

        The first ``Image`` icon of the actor with an http(s) URL and a file name is
        used.

        @param client: client whose ``_ap_storage`` is used to look up the cache UID
        @param ap_account: AP actor account
        @return: avatar data. ``path`` and ``media_type`` keys are always set here;
            ``cache_uid`` is set here when the avatar is already in cache. For a fresh
            download, ``cache_uid`` and ``base64`` are presumably filled by
            ``cache_avatar`` (to be confirmed, it is not visible in this module).
        @raise error.StanzaError: "item-not-found" if no suitable icon is found
        """
        actor_data = await self.apg.get_ap_actor_data_from_account(ap_account)

        for icon in await self.apg.ap_get_list(actor_data, "icon"):
            url = icon.get("url")
            # icon data come from a remote server: "type" may be missing
            if icon.get("type") != "Image" or not url:
                continue
            parsed_url = urlparse(url)
            if parsed_url.scheme not in ("http", "https"):
                log.warning(f"unexpected URL scheme: {url!r}")
                continue
            filename = Path(parsed_url.path).name
            if not filename:
                log.warning(f"ignoring URL with invald path: {url!r}")
                continue
            break
        else:
            raise error.StanzaError("item-not-found")

        key = f"{ST_AVATAR}{url}"
        cache_uid = await client._ap_storage.get(key)

        if cache_uid is None:
            cache = None
        else:
            cache = self.apg.host.common_cache.get_metadata(cache_uid)

        if cache is None:
            # NOTE(review): the temporary directory is deleted when leaving the
            #   ``with`` block, so ``cache_avatar`` is presumably expected to copy the
            #   file and to update ``avatar_data`` in place; confirm.
            with tempfile.TemporaryDirectory() as dir_name:
                dest_path = Path(dir_name, filename)
                await download_file(url, dest_path, max_size=MAX_AVATAR_SIZE)
                avatar_data = {
                    "path": dest_path,
                    "filename": filename,
                    'media_type': image.guess_type(dest_path),
                }

                await self.apg._i.cache_avatar(
                    self.apg.IMPORT_NAME,
                    avatar_data
                )
        else:
            avatar_data = {
                "cache_uid": cache["uid"],
                "path": cache["path"],
                "media_type": cache["mime_type"]
            }

        return avatar_data

    async def generate_avatar_metadata(
        self,
        client: SatXMPPEntity,
        ap_account: str
    ) -> domish.Element:
        """Generate the metadata element for user avatar

        @param client: client used to retrieve avatar data
        @param ap_account: AP actor account
        @return: item element built from avatar data
        @raise StanzaError("item-not-found"): no avatar is present in actor data (in
            ``icon`` field)
        """
        avatar_data = await self.get_avatar_data(client, ap_account)
        return self.apg._a.build_item_metadata_elt(avatar_data)

    def _blocking_b_6_4_encode_avatar(self, avatar_data: Dict[str, Any]) -> None:
        """Read avatar file and store its base64 encoded content in ``avatar_data``

        This method reads a file synchronously, callers in this class run it in a
        thread.

        @param avatar_data: avatar data with a ``path`` key; the ``base64`` key is set
            in place
        """
        with avatar_data["path"].open("rb") as f:
            avatar_data["base64"] = b64encode(f.read()).decode()

    async def generate_avatar_data(
        self,
        client: SatXMPPEntity,
        ap_account: str,
        itemIdentifiers: Optional[List[str]],
    ) -> domish.Element:
        """Generate the data element for user avatar

        @param client: client used to retrieve avatar data
        @param ap_account: AP actor account
        @param itemIdentifiers: if set, must contain a single item ID which is used as
            cache UID of the avatar; otherwise the current avatar of the actor is used
        @return: item element built from avatar data (including ``base64`` content)
        @raise StanzaError("item-not-found"): no avatar cached with requested ID
        """
        if not itemIdentifiers:
            avatar_data = await self.get_avatar_data(client, ap_account)
            if "base64" not in avatar_data:
                await threads.deferToThread(
                    self._blocking_b_6_4_encode_avatar, avatar_data
                )
        else:
            if len(itemIdentifiers) > 1:
                # only a single item ID is supported
                raise error.StanzaError("item-not-found")
            item_id = itemIdentifiers[0]
            # item IDs come from the request: an empty one is rejected explicitly (an
            # ``assert`` would be stripped when running with ``-O``)
            if not item_id:
                raise error.StanzaError("item-not-found")
            cache_data = self.apg.host.common_cache.get_metadata(item_id)
            if cache_data is None:
                raise error.StanzaError("item-not-found")
            avatar_data = {
                "cache_uid": item_id,
                "path": cache_data["path"]
            }
            await threads.deferToThread(
                self._blocking_b_6_4_encode_avatar, avatar_data
            )

        return self.apg._a.build_item_data_elt(avatar_data)

    @ensure_deferred
    async def items(
        self,
        requestor: jid.JID,
        service: jid.JID,
        node: str,
        maxItems: Optional[int],
        itemIdentifiers: Optional[List[str]],
        rsm_req: Optional[rsm.RSMRequest]
    ) -> Tuple[List[domish.Element], Optional[rsm.RSMResponse]]:
        """Retrieve items of a node mapping an AP actor

        Depending on the node, items are generated from actor data (vCard, avatar),
        taken from the local pubsub cache, or retrieved from an AP collection
        (``following``, ``followers``, ``outbox`` or the ``replies`` of a parent
        item).

        @param requestor: entity requesting the items
        @param service: JID whose user part is the escaped AP actor account
        @param node: requested node
        @param maxItems: maximum number of items to return, 20 is used if neither this
            nor an RSM request is specified and the cache is not used
        @param itemIdentifiers: IDs of the items to retrieve
        @param rsm_req: RSM request, translated to AP collection paging
        @return: items and RSM response if any; an empty list is returned if the AP
            account is missing or invalid
        @raise error.StanzaError: "item-not-found" if an attachment node is not in
            cache or if no collection is found, "feature-not-implemented" if the node
            is not supported, "bad-request" if the RSM request is invalid
        """
        if not service.user:
            return [], None
        ap_account = self.host.plugins["XEP-0106"].unescape(service.user)
        if ap_account.count("@") != 1:
            log.warning(f"Invalid AP account used by {requestor}: {ap_account!r}")
            return [], None

        # cached_node may be pre-filled with some nodes (e.g. attachments nodes),
        # otherwise it is filled when suitable
        cached_node = None
        client = self.apg.client
        kwargs = {}

        if node == self.apg._pps.subscriptions_node:
            collection_name = "following"
            parser = self.ap_following_2_elt
            kwargs["only_ids"] = True
            use_cache = False
        elif node.startswith(self.apg._pps.subscribers_node_prefix):
            collection_name = "followers"
            parser = self.ap_follower_2_elt
            kwargs["only_ids"] = True
            use_cache = False
        elif node == self.apg._v.node:
            # vCard4 request
            item_elt = await self.generate_v_card(ap_account)
            return [item_elt], None
        elif node == self.apg._a.namespace_metadata:
            item_elt = await self.generate_avatar_metadata(client, ap_account)
            return [item_elt], None
        elif node == self.apg._a.namespace_data:
            item_elt = await self.generate_avatar_data(
                client, ap_account, itemIdentifiers
            )
            return [item_elt], None
        elif self.apg._pa.is_attachment_node(node):
            use_cache = True
            # we check cache here because we emit an item-not-found error if the node
            # is not in cache, as we are not dealing with real AP items
            cached_node = await self.host.memory.storage.get_pubsub_node(
                client, service, node
            )
            if cached_node is None:
                raise error.StanzaError("item-not-found")
        else:
            if node.startswith(self.apg._m.namespace):
                parser = self.apg.ap_item_2_mb_elt
            elif node.startswith(self.apg._events.namespace):
                parser = self.apg.ap_events.ap_item_2_event_elt
            else:
                raise error.StanzaError(
                    "feature-not-implemented",
                    text=f"AP Gateway {C.APP_VERSION} only supports "
                    f"{self.apg._m.namespace} node for now"
                )
            collection_name = "outbox"
            use_cache = True

        if use_cache:
            if cached_node is None:
                cached_node = await self.host.memory.storage.get_pubsub_node(
                    client, service, node
                )
            # TODO: check if node is synchronised
            if cached_node is not None:
                # the node is cached, we return items from cache
                log.debug(f"node {node!r} from {service} is in cache")
                pubsub_items, metadata = await self.apg._c.get_items_from_cache(
                    client, cached_node, maxItems, itemIdentifiers, rsm_request=rsm_req
                )
                try:
                    rsm_resp = rsm.RSMResponse(**metadata["rsm"])
                except KeyError:
                    rsm_resp = None
                return [i.data for i in pubsub_items], rsm_resp

        if itemIdentifiers:
            # requested items are retrieved one by one, their IDs being used as AP IDs
            items = []
            for item_id in itemIdentifiers:
                item_data = await self.apg.ap_get(item_id)
                items.append(await parser(item_data))
            return items, None

        if rsm_req is None:
            if maxItems is None:
                maxItems = 20
            kwargs.update({
                "max_items": maxItems,
                "chronological_pagination": False,
            })
        else:
            rsm_criteria = (rsm_req.after, rsm_req.before, rsm_req.index)
            if sum(value is not None for value in rsm_criteria) > 1:
                raise error.StanzaError(
                    "bad-request",
                    text="You can't use after, before and index at the same time"
                )
            kwargs.update({"max_items": rsm_req.max})
            if rsm_req.after is not None:
                kwargs["after_id"] = rsm_req.after
            elif rsm_req.before is not None:
                kwargs["chronological_pagination"] = False
                # an empty ``before`` only reverses pagination, without setting a
                # starting item
                if rsm_req.before != "":
                    kwargs["after_id"] = rsm_req.before
            elif rsm_req.index is not None:
                kwargs["start_index"] = rsm_req.index

        log.info(
            f"No cache found for node {node} at {service} (AP account {ap_account}), "
            "using Collection Paging to RSM translation"
        )
        if self.apg._m.is_comment_node(node):
            parent_item = self.apg._m.get_parent_item(node)
            try:
                parent_data = await self.apg.ap_get(parent_item)
                collection = await self.apg.ap_get_object(
                    parent_data.get("object", {}),
                    "replies"
                )
            except Exception as e:
                # the error text must be a string, not the exception itself
                raise error.StanzaError(
                    "item-not-found",
                    text=str(e)
                ) from e
        else:
            actor_data = await self.apg.get_ap_actor_data_from_account(ap_account)
            collection = await self.apg.ap_get_object(actor_data, collection_name)
        if not collection:
            raise error.StanzaError(
                "item-not-found",
                text=f"No collection found for node {node!r} (account: {ap_account})"
            )

        kwargs["parser"] = parser
        return await self.apg.get_ap_items(collection, **kwargs)

    @ensure_deferred
    async def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
        """Refuse any retraction request

        @raise error.StanzaError: "forbidden" is always raised
        """
        raise error.StanzaError("forbidden")

    @ensure_deferred
    async def subscribe(self, requestor, service, nodeIdentifier, subscriber):
        """Store a subscription and follow the AP actor when relevant

        The subscription is stored in the local pubsub cache for the bare JID of
        requestor. For microblog and events nodes, a ``Follow`` activity is then sent
        to the inbox of the AP actor.

        @param requestor: entity requesting the subscription
        @param service: JID mapping the AP actor
        @param nodeIdentifier: node to subscribe to
        @param subscriber: not used by this method
        @return: subscription of requestor to the node
        @raise exceptions.InternalError: the existing subscription is in an unmanaged
            state
        @raise error.StanzaError: "service-unavailable" if the ``Follow`` activity is
            not accepted by the AP server (response code >= 300)
        """
        # TODO: handle comments nodes
        client = self.apg.client
        requestor_bare = requestor.userhostJID()
        # we use PENDING state for microblog, it will be set to SUBSCRIBED once the
        # Follow is accepted. Other nodes are directly set to subscribed, their
        # subscriptions being internal.
        if nodeIdentifier == self.apg._m.namespace:
            sub_state = SubscriptionState.PENDING
        else:
            sub_state = SubscriptionState.SUBSCRIBED
        node = await self.host.memory.storage.get_pubsub_node(
            client, service, nodeIdentifier, with_subscriptions=True
        )
        if node is None:
            node = await self.host.memory.storage.set_pubsub_node(
                client,
                service,
                nodeIdentifier,
            )
            subscription = None
        else:
            subscription = next(
                (s for s in node.subscriptions if s.subscriber == requestor_bare),
                None
            )

        if subscription is None:
            subscription = PubsubSub(
                subscriber=requestor_bare,
                state=sub_state
            )
            node.subscriptions.append(subscription)
            await self.host.memory.storage.add(node)
        else:
            if subscription.state is None:
                subscription.state = sub_state
                await self.host.memory.storage.add(node)
            elif subscription.state == SubscriptionState.SUBSCRIBED:
                log.info(
                    f"{requestor_bare} has already a subscription to {node!r} "
                    f"at {service}. Doing the request anyway."
                )
            elif subscription.state == SubscriptionState.PENDING:
                log.info(
                    f"{requestor_bare} has already a pending subscription to "
                    f"{node!r} at {service}. Doing the request anyway."
                )
                if sub_state != SubscriptionState.PENDING:
                    subscription.state = sub_state
                    await self.host.memory.storage.add(node)
            else:
                raise exceptions.InternalError(
                    f"unmanaged subscription state: {subscription.state}"
                )

        if nodeIdentifier in (self.apg._m.namespace, self.apg._events.namespace):
            # if we subscribe to microblog or events node, we follow the corresponding
            # account
            req_actor_id, recip_actor_id, inbox = await self.get_ap_actor_ids_and_inbox(
                requestor, service
            )

            data = self.apg.create_activity("Follow", req_actor_id, recip_actor_id)

            resp = await self.apg.sign_and_post(inbox, req_actor_id, data)
            if resp.code >= 300:
                text = await resp.text()
                raise error.StanzaError("service-unavailable", text=text)
        # NOTE(review): "subscribed" is always reported here, even when the stored
        #   state is PENDING.
        return pubsub.Subscription(nodeIdentifier, requestor, "subscribed")

    @ensure_deferred
    async def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
        """Undo the ``Follow`` of the AP actor mapped by service

        An ``Undo`` activity wrapping a ``Follow`` activity is sent to the inbox of
        the AP actor. The subscription stored in the local pubsub cache is not
        modified by this method.

        @param requestor: entity requesting the unsubscription
        @param service: JID mapping the AP actor
        @param nodeIdentifier: not used by this method
        @param subscriber: not used by this method
        @raise error.StanzaError: "service-unavailable" if the activity is not
            accepted by the AP server (response code >= 300)
        """
        req_actor_id, recip_actor_id, inbox = await self.get_ap_actor_ids_and_inbox(
            requestor, service
        )
        data = self.apg.create_activity(
            "Undo",
            req_actor_id,
            self.apg.create_activity(
                "Follow",
                req_actor_id,
                recip_actor_id
            )
        )

        resp = await self.apg.sign_and_post(inbox, req_actor_id, data)
        if resp.code >= 300:
            text = await resp.text()
            raise error.StanzaError("service-unavailable", text=text)

    def getConfigurationOptions(self):
        """Return the metadata of node configuration options (``NODE_OPTIONS``)"""
        return NODE_OPTIONS

    def getConfiguration(
        self,
        requestor: jid.JID,
        service: jid.JID,
        nodeIdentifier: str
    ) -> defer.Deferred:
        """Return the node configuration, which is the same for every node

        @return: Deferred already fired with ``NODE_CONFIG_VALUES``
        """
        return defer.succeed(NODE_CONFIG_VALUES)

    def getNodeInfo(
        self,
        requestor: jid.JID,
        service: jid.JID,
        nodeIdentifier: str,
        pep: bool = False,
        recipient: Optional[jid.JID] = None
    ) -> Optional[dict]:
        """Return disco info of a node

        @param nodeIdentifier: requested node
        @return: None if no node is specified, otherwise node info: every node is
            reported as a leaf with ``NODE_CONFIG`` as meta-data
        """
        if not nodeIdentifier:
            return None
        return {
            "type": "leaf",
            "meta-data": NODE_CONFIG
        }