comparison libervia/backend/plugins/plugin_comp_ap_gateway/pubsub_service.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_comp_ap_gateway/pubsub_service.py@524856bd7b19
children 92551baea115
comparison
equal deleted inserted replaced
4070:d10748475025 4071:4b842c1fb686
1 #!/usr/bin/env python3
2
3 # Libervia ActivityPub Gateway
4 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 from typing import Optional, Tuple, List, Dict, Any, Union
20 from urllib.parse import urlparse
21 from pathlib import Path
22 from base64 import b64encode
23 import tempfile
24
25 from twisted.internet import defer, threads
26 from twisted.words.protocols.jabber import jid, error
27 from twisted.words.xish import domish
28 from wokkel import rsm, pubsub, disco
29
30 from libervia.backend.core.i18n import _
31 from libervia.backend.core import exceptions
32 from libervia.backend.core.core_types import SatXMPPEntity
33 from libervia.backend.core.log import getLogger
34 from libervia.backend.core.constants import Const as C
35 from libervia.backend.tools import image
36 from libervia.backend.tools.utils import ensure_deferred
37 from libervia.backend.tools.web import download_file
38 from libervia.backend.memory.sqla_mapping import PubsubSub, SubscriptionState
39
40 from .constants import (
41 TYPE_ACTOR,
42 ST_AVATAR,
43 MAX_AVATAR_SIZE
44 )
45
46
log = getLogger(__name__)

# every node exposed by the gateway shares this single configuration
NODE_CONFIG = [
    {"var": "pubsub#persist_items", "type": "boolean", "value": True},
    {"var": "pubsub#max_items", "value": "max"},
    {"var": "pubsub#access_model", "type": "list-single", "value": "open"},
    {"var": "pubsub#publish_model", "type": "list-single", "value": "open"},
]

# field name => configured value
NODE_CONFIG_VALUES = {field["var"]: field["value"] for field in NODE_CONFIG}

# field name => field description (every key of the field except "var" and "value")
NODE_OPTIONS = {
    field["var"]: {
        key: val for key, val in field.items() if key not in ("var", "value")
    }
    for field in NODE_CONFIG
}
62
63
class APPubsubService(rsm.PubSubService):
    """Pubsub service for XMPP requests"""

    def __init__(self, apg):
        super(APPubsubService, self).__init__()
        self.host = apg.host
        self.apg = apg
        self.discoIdentity = {
            "category": "pubsub",
            "type": "service",
            "name": "Libervia ActivityPub Gateway",
        }

    async def get_ap_actor_ids_and_inbox(
        self,
        requestor: jid.JID,
        recipient: jid.JID,
    ) -> Tuple[str, str, str]:
        """Get AP actor IDs from requestor and destinee JIDs

        @param requestor: XMPP entity doing a request to an AP actor via the gateway
        @param recipient: JID mapping an AP actor via the gateway
        @return: requestor actor ID, recipient actor ID and recipient inbox
        @raise error.StanzaError: "item-not-found" is raised if no user part is
            specified in recipient
        """
        if not recipient.user:
            raise error.StanzaError(
                "item-not-found",
                text="No user part specified"
            )
        requestor_actor_id = self.apg.build_apurl(TYPE_ACTOR, requestor.userhost())
        recipient_account = self.apg._e.unescape(recipient.user)
        recipient_actor_id = await self.apg.get_ap_actor_id_from_account(
            recipient_account
        )
        # use_shared=False: we want the actor's own inbox, not a shared one
        inbox = await self.apg.get_ap_inbox_from_id(recipient_actor_id, use_shared=False)
        return requestor_actor_id, recipient_actor_id, inbox

    @ensure_deferred
    async def publish(self, requestor, service, nodeIdentifier, items):
        """Publish items to the node of the AP actor mapped by ``service``

        Items are converted and posted to the AP side, then cached locally, and every
        subscriber in SUBSCRIBED state is notified.

        @param requestor: entity publishing the items
        @param service: JID whose escaped user part is the AP actor account
        @param nodeIdentifier: node where items are published
        @param items: items to publish
        @raise error.StanzaError: "forbidden" if the gateway is in local-only mode and
            requestor is not local; "bad-request" if ``service`` has no user part or it
            doesn't unescape to a single "user@domain" account
        """
        # NOTE: error messages must be given with the ``text`` keyword: the second
        #   positional argument of StanzaError is the error *type*.
        if self.apg.local_only and not self.apg.is_local(requestor):
            raise error.StanzaError(
                "forbidden",
                text="Only local users can publish on this gateway."
            )
        if not service.user:
            raise error.StanzaError(
                "bad-request",
                text="You must specify an ActivityPub actor account in JID user part."
            )
        ap_account = self.apg._e.unescape(service.user)
        if ap_account.count("@") != 1:
            raise error.StanzaError(
                "bad-request",
                text=f"{ap_account!r} is not a valid ActivityPub actor account."
            )

        client = self.apg.client.get_virtual_client(requestor)
        if self.apg._pa.is_attachment_node(nodeIdentifier):
            # attachments have their own conversion, which needs the publisher
            await self.apg.convert_and_post_attachments(
                client, ap_account, service, nodeIdentifier, items, publisher=requestor
            )
        else:
            await self.apg.convert_and_post_items(
                client, ap_account, service, nodeIdentifier, items
            )
        # keep a local copy of the published items (node is created if needed)
        cached_node = await self.host.memory.storage.get_pubsub_node(
            client, service, nodeIdentifier, with_subscriptions=True, create=True
        )
        await self.host.memory.storage.cache_pubsub_items(
            client,
            cached_node,
            items
        )
        for subscription in cached_node.subscriptions:
            if subscription.state != SubscriptionState.SUBSCRIBED:
                # pending or other subscriptions are not notified
                continue
            self.notifyPublish(
                service,
                nodeIdentifier,
                [(subscription.subscriber, None, items)]
            )

    async def ap_following_2_elt(self, ap_item: dict) -> domish.Element:
        """Convert actor ID from following collection to XMPP item

        @param ap_item: collection entry, its ``id`` key is the followed actor ID
        @return: pubsub item (with the actor ID as item ID) wrapping a subscription
            element to the actor's microblog node
        """
        actor_id = ap_item["id"]
        actor_jid = await self.apg.get_jid_from_id(actor_id)
        subscription_elt = self.apg._pps.build_subscription_elt(
            self.apg._m.namespace, actor_jid
        )
        return pubsub.Item(id=actor_id, payload=subscription_elt)

    async def ap_follower_2_elt(self, ap_item: dict) -> domish.Element:
        """Convert actor ID from followers collection to XMPP item

        @param ap_item: collection entry, its ``id`` key is the follower actor ID
        @return: pubsub item (with the actor ID as item ID) wrapping a subscriber
            element
        """
        actor_id = ap_item["id"]
        actor_jid = await self.apg.get_jid_from_id(actor_id)
        subscriber_elt = self.apg._pps.build_subscriber_elt(actor_jid)
        return pubsub.Item(id=actor_id, payload=subscriber_elt)

    async def generate_v_card(self, ap_account: str) -> domish.Element:
        """Generate vCard4 (XEP-0292) item element from ap_account's metadata

        ``summary`` (HTML) becomes the description, ``name`` and
        ``preferredUsername`` become nicknames.

        @param ap_account: AP account ("user@domain")
        @return: singleton pubsub item containing the vCard element
        """
        actor_data = await self.apg.get_ap_actor_data_from_account(ap_account)
        identity_data = {}

        summary = actor_data.get("summary")
        # summary is HTML, we have to convert it to text
        if summary:
            identity_data["description"] = await self.apg._t.convert(
                summary,
                self.apg._t.SYNTAX_XHTML,
                self.apg._t.SYNTAX_TEXT,
                False,
            )

        for field in ("name", "preferredUsername"):
            value = actor_data.get(field)
            if value:
                identity_data.setdefault("nicknames", []).append(value)
        vcard_elt = self.apg._v.dict_2_v_card(identity_data)
        item_elt = domish.Element((pubsub.NS_PUBSUB, "item"))
        item_elt.addChild(vcard_elt)
        item_elt["id"] = self.apg._p.ID_SINGLETON
        return item_elt

    async def get_avatar_data(
        self,
        client: SatXMPPEntity,
        ap_account: str
    ) -> Dict[str, Any]:
        """Retrieve actor's avatar if any, cache it and return its data

        The first ``icon`` of type ``Image`` with an HTTP(S) URL and a non-empty file
        name is used.

        When the avatar is already in cache, the returned dict has ``cache_uid``,
        ``path`` and ``media_type`` keys. Otherwise the file is downloaded and the dict
        (``path``, ``filename``, ``media_type``) is handed to ``cache_avatar``.
        NOTE(review): ``cache_avatar`` is expected to complete it (notably
        ``cache_uid`` and ``base64``) — confirm against the identity plugin.
        ``base64`` is only filled if the file was not already in cache.

        @param client: client used to access the gateway storage
        @param ap_account: AP account ("user@domain")
        @return: avatar data
        @raise error.StanzaError: "item-not-found" if no usable avatar is found
        """
        actor_data = await self.apg.get_ap_actor_data_from_account(ap_account)

        for icon in await self.apg.ap_get_list(actor_data, "icon"):
            url = icon.get("url")
            # ``type`` may be missing, such icons are skipped instead of crashing
            if icon.get("type") != "Image" or not url:
                continue
            parsed_url = urlparse(url)
            if parsed_url.scheme not in ("http", "https"):
                log.warning(f"unexpected URL scheme: {url!r}")
                continue
            filename = Path(parsed_url.path).name
            if not filename:
                log.warning(f"ignoring URL with invalid path: {url!r}")
                continue
            break
        else:
            raise error.StanzaError("item-not-found")

        key = f"{ST_AVATAR}{url}"
        cache_uid = await client._ap_storage.get(key)

        if cache_uid is None:
            cache = None
        else:
            cache = self.apg.host.common_cache.get_metadata(cache_uid)

        if cache is None:
            with tempfile.TemporaryDirectory() as dir_name:
                dest_path = Path(dir_name, filename)
                await download_file(url, dest_path, max_size=MAX_AVATAR_SIZE)
                avatar_data = {
                    "path": dest_path,
                    "filename": filename,
                    'media_type': image.guess_type(dest_path),
                }

                # must be done while the temporary file still exists
                await self.apg._i.cache_avatar(
                    self.apg.IMPORT_NAME,
                    avatar_data
                )
        else:
            avatar_data = {
                "cache_uid": cache["uid"],
                "path": cache["path"],
                "media_type": cache["mime_type"]
            }

        return avatar_data

    async def generate_avatar_metadata(
        self,
        client: SatXMPPEntity,
        ap_account: str
    ) -> domish.Element:
        """Generate the metadata element for user avatar

        @raise StanzaError("item-not-found"): no avatar is present in actor data (in
            ``icon`` field)
        """
        avatar_data = await self.get_avatar_data(client, ap_account)
        return self.apg._a.build_item_metadata_elt(avatar_data)

    def _blocking_b_6_4_encode_avatar(self, avatar_data: Dict[str, Any]) -> None:
        """Read file at ``avatar_data["path"]`` and store its base64 in ``base64`` key

        Blocking: meant to be run in a thread.
        """
        with avatar_data["path"].open("rb") as f:
            avatar_data["base64"] = b64encode(f.read()).decode()

    async def generate_avatar_data(
        self,
        client: SatXMPPEntity,
        ap_account: str,
        itemIdentifiers: Optional[List[str]],
    ) -> domish.Element:
        """Generate the data element for user avatar

        @param itemIdentifiers: if set, must contain a single cache UID; otherwise the
            current avatar of ``ap_account`` is used
        @raise StanzaError("item-not-found"): no avatar cached with requested ID
        """
        if not itemIdentifiers:
            avatar_data = await self.get_avatar_data(client, ap_account)
            if "base64" not in avatar_data:
                await threads.deferToThread(self._blocking_b_6_4_encode_avatar, avatar_data)
        else:
            if len(itemIdentifiers) > 1:
                # only a single item ID is supported
                raise error.StanzaError("item-not-found")
            item_id = itemIdentifiers[0]
            # the ID comes from the requester: an empty one can't match any cached
            # file (explicit check instead of ``assert``, which may be stripped)
            if not item_id:
                raise error.StanzaError("item-not-found")
            cache_data = self.apg.host.common_cache.get_metadata(item_id)
            if cache_data is None:
                raise error.StanzaError("item-not-found")
            avatar_data = {
                "cache_uid": item_id,
                "path": cache_data["path"]
            }
            await threads.deferToThread(self._blocking_b_6_4_encode_avatar, avatar_data)

        return self.apg._a.build_item_data_elt(avatar_data)

    @ensure_deferred
    async def items(
        self,
        requestor: jid.JID,
        service: jid.JID,
        node: str,
        maxItems: Optional[int],
        itemIdentifiers: Optional[List[str]],
        rsm_req: Optional[rsm.RSMRequest]
    ) -> Tuple[List[domish.Element], Optional[rsm.RSMResponse]]:
        """Retrieve items from a node of the AP actor mapped by ``service``

        Depending on ``node``, items are generated from actor metadata (vCard,
        avatar), returned from local cache, or retrieved from the matching AP
        collection (translating Collection Paging to/from RSM).

        @return: items and RSM response (None when not applicable)
        @raise error.StanzaError: "item-not-found" when the node/collection/item can't
            be found, "feature-not-implemented" for unsupported nodes, "bad-request"
            for invalid RSM requests
        """
        if not service.user:
            return [], None
        ap_account = self.host.plugins["XEP-0106"].unescape(service.user)
        if ap_account.count("@") != 1:
            log.warning(f"Invalid AP account used by {requestor}: {ap_account!r}")
            return [], None

        # cached_node may be pre-filled with some nodes (e.g. attachments nodes),
        # otherwise it is filled when suitable
        cached_node = None
        client = self.apg.client
        kwargs = {}

        if node == self.apg._pps.subscriptions_node:
            collection_name = "following"
            parser = self.ap_following_2_elt
            kwargs["only_ids"] = True
            use_cache = False
        elif node.startswith(self.apg._pps.subscribers_node_prefix):
            collection_name = "followers"
            parser = self.ap_follower_2_elt
            kwargs["only_ids"] = True
            use_cache = False
        elif node == self.apg._v.node:
            # vCard4 request
            item_elt = await self.generate_v_card(ap_account)
            return [item_elt], None
        elif node == self.apg._a.namespace_metadata:
            item_elt = await self.generate_avatar_metadata(client, ap_account)
            return [item_elt], None
        elif node == self.apg._a.namespace_data:
            item_elt = await self.generate_avatar_data(
                client, ap_account, itemIdentifiers
            )
            return [item_elt], None
        elif self.apg._pa.is_attachment_node(node):
            use_cache = True
            # we check cache here because we emit an item-not-found error if the node is
            # not in cache, as we are not dealing with real AP items
            cached_node = await self.host.memory.storage.get_pubsub_node(
                client, service, node
            )
            if cached_node is None:
                raise error.StanzaError("item-not-found")
        else:
            if node.startswith(self.apg._m.namespace):
                parser = self.apg.ap_item_2_mb_elt
            elif node.startswith(self.apg._events.namespace):
                parser = self.apg.ap_events.ap_item_2_event_elt
            else:
                raise error.StanzaError(
                    "feature-not-implemented",
                    text=f"AP Gateway {C.APP_VERSION} only supports "
                    f"{self.apg._m.namespace} node for now"
                )
            collection_name = "outbox"
            use_cache = True

        if use_cache:
            if cached_node is None:
                cached_node = await self.host.memory.storage.get_pubsub_node(
                    client, service, node
                )
            # TODO: check if node is synchronised
            if cached_node is not None:
                # the node is cached, we return items from cache
                log.debug(f"node {node!r} from {service} is in cache")
                pubsub_items, metadata = await self.apg._c.get_items_from_cache(
                    client, cached_node, maxItems, itemIdentifiers, rsm_request=rsm_req
                )
                try:
                    rsm_resp = rsm.RSMResponse(**metadata["rsm"])
                except KeyError:
                    rsm_resp = None
                return [i.data for i in pubsub_items], rsm_resp

        if itemIdentifiers:
            items = []
            for item_id in itemIdentifiers:
                item_data = await self.apg.ap_get(item_id)
                item_elt = await parser(item_data)
                items.append(item_elt)
            return items, None
        else:
            if rsm_req is None:
                if maxItems is None:
                    maxItems = 20
                kwargs.update({
                    "max_items": maxItems,
                    "chronological_pagination": False,
                })
            else:
                if len(
                    [v for v in (rsm_req.after, rsm_req.before, rsm_req.index)
                     if v is not None]
                ) > 1:
                    raise error.StanzaError(
                        "bad-request",
                        text="You can't use after, before and index at the same time"
                    )
                kwargs.update({"max_items": rsm_req.max})
                if rsm_req.after is not None:
                    kwargs["after_id"] = rsm_req.after
                elif rsm_req.before is not None:
                    kwargs["chronological_pagination"] = False
                    # an empty "before" requests the last page
                    if rsm_req.before != "":
                        kwargs["after_id"] = rsm_req.before
                elif rsm_req.index is not None:
                    kwargs["start_index"] = rsm_req.index

            log.info(
                f"No cache found for node {node} at {service} (AP account {ap_account}), "
                "using Collection Paging to RSM translation"
            )
            if self.apg._m.is_comment_node(node):
                parent_item = self.apg._m.get_parent_item(node)
                try:
                    parent_data = await self.apg.ap_get(parent_item)
                    collection = await self.apg.ap_get_object(
                        parent_data.get("object", {}),
                        "replies"
                    )
                except Exception as e:
                    # error text must be a string to be serialised in the stanza
                    raise error.StanzaError(
                        "item-not-found",
                        text=str(e)
                    ) from e
            else:
                actor_data = await self.apg.get_ap_actor_data_from_account(ap_account)
                collection = await self.apg.ap_get_object(actor_data, collection_name)
            if not collection:
                raise error.StanzaError(
                    "item-not-found",
                    text=f"No collection found for node {node!r} (account: {ap_account})"
                )

            kwargs["parser"] = parser
            return await self.apg.get_ap_items(collection, **kwargs)

    @ensure_deferred
    async def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
        """Retraction is not allowed through the gateway

        @raise error.StanzaError: always "forbidden"
        """
        raise error.StanzaError("forbidden")

    @ensure_deferred
    async def subscribe(self, requestor, service, nodeIdentifier, subscriber):
        """Subscribe requestor's bare JID to a node of the AP actor mapped by service

        The subscription is stored locally. For microblog and events nodes, a
        ``Follow`` activity is also sent to the AP actor.

        NOTE(review): the ``subscriber`` argument is ignored, requestor's bare JID is
        always used.

        @return: subscription (always reported as "subscribed")
        @raise error.StanzaError: "service-unavailable" if posting the ``Follow``
            activity returns an HTTP status >= 300
        @raise exceptions.InternalError: the stored subscription has an unmanaged state
        """
        # TODO: handle comments nodes
        client = self.apg.client
        subscriber_jid = requestor.userhostJID()
        # we use PENDING state for microblog, it will be set to SUBSCRIBED once the Follow
        # is accepted. Other nodes are directly set to subscribed, their subscriptions
        # being internal.
        if nodeIdentifier == self.apg._m.namespace:
            sub_state = SubscriptionState.PENDING
        else:
            sub_state = SubscriptionState.SUBSCRIBED
        node = await self.host.memory.storage.get_pubsub_node(
            client, service, nodeIdentifier, with_subscriptions=True
        )
        if node is None:
            node = await self.host.memory.storage.set_pubsub_node(
                client,
                service,
                nodeIdentifier,
            )
            subscription = None
        else:
            subscription = next(
                (s for s in node.subscriptions if s.subscriber == subscriber_jid),
                None
            )

        if subscription is None:
            subscription = PubsubSub(
                subscriber=subscriber_jid,
                state=sub_state
            )
            node.subscriptions.append(subscription)
            await self.host.memory.storage.add(node)
        else:
            if subscription.state is None:
                subscription.state = sub_state
                await self.host.memory.storage.add(node)
            elif subscription.state == SubscriptionState.SUBSCRIBED:
                log.info(
                    f"{subscriber_jid} has already a subscription to {node!r} "
                    f"at {service}. Doing the request anyway."
                )
            elif subscription.state == SubscriptionState.PENDING:
                log.info(
                    f"{subscriber_jid} has already a pending subscription to "
                    f"{node!r} at {service}. Doing the request anyway."
                )
                if sub_state != SubscriptionState.PENDING:
                    subscription.state = sub_state
                    await self.host.memory.storage.add(node)
            else:
                raise exceptions.InternalError(
                    f"unmanaged subscription state: {subscription.state}"
                )

        if nodeIdentifier in (self.apg._m.namespace, self.apg._events.namespace):
            # if we subscribe to microblog or events node, we follow the corresponding
            # account
            req_actor_id, recip_actor_id, inbox = await self.get_ap_actor_ids_and_inbox(
                requestor, service
            )

            data = self.apg.create_activity("Follow", req_actor_id, recip_actor_id)

            resp = await self.apg.sign_and_post(inbox, req_actor_id, data)
            if resp.code >= 300:
                text = await resp.text()
                raise error.StanzaError("service-unavailable", text=text)
        return pubsub.Subscription(nodeIdentifier, requestor, "subscribed")

    @ensure_deferred
    async def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
        """Send an ``Undo`` of a ``Follow`` activity to the AP actor mapped by service

        NOTE(review): the local subscription is not updated here, and the activity is
        sent whatever the node is — confirm this is intended.

        @raise error.StanzaError: "item-not-found" if service has no user part,
            "service-unavailable" if posting returns an HTTP status >= 300
        """
        req_actor_id, recip_actor_id, inbox = await self.get_ap_actor_ids_and_inbox(
            requestor, service
        )
        data = self.apg.create_activity(
            "Undo",
            req_actor_id,
            self.apg.create_activity(
                "Follow",
                req_actor_id,
                recip_actor_id
            )
        )

        resp = await self.apg.sign_and_post(inbox, req_actor_id, data)
        if resp.code >= 300:
            text = await resp.text()
            raise error.StanzaError("service-unavailable", text=text)

    def getConfigurationOptions(self):
        """Return the options of the configuration fields shared by all nodes"""
        return NODE_OPTIONS

    def getConfiguration(
        self,
        requestor: jid.JID,
        service: jid.JID,
        nodeIdentifier: str
    ) -> defer.Deferred:
        """Return the configuration values, identical for every node"""
        return defer.succeed(NODE_CONFIG_VALUES)

    def getNodeInfo(
        self,
        requestor: jid.JID,
        service: jid.JID,
        nodeIdentifier: str,
        pep: bool = False,
        recipient: Optional[jid.JID] = None
    ) -> Optional[dict]:
        """Return node metadata

        Every node is a leaf with the shared configuration as metadata.

        @return: None if no node identifier is given, node info otherwise
        """
        if not nodeIdentifier:
            return None
        info = {
            "type": "leaf",
            "meta-data": NODE_CONFIG
        }
        return info