Mercurial > libervia-backend
diff libervia/backend/plugins/plugin_comp_ap_gateway/pubsub_service.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/plugins/plugin_comp_ap_gateway/pubsub_service.py@524856bd7b19 |
children | 92551baea115 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/backend/plugins/plugin_comp_ap_gateway/pubsub_service.py Fri Jun 02 11:49:51 2023 +0200 @@ -0,0 +1,570 @@ +#!/usr/bin/env python3 + +# Libervia ActivityPub Gateway +# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from typing import Optional, Tuple, List, Dict, Any, Union +from urllib.parse import urlparse +from pathlib import Path +from base64 import b64encode +import tempfile + +from twisted.internet import defer, threads +from twisted.words.protocols.jabber import jid, error +from twisted.words.xish import domish +from wokkel import rsm, pubsub, disco + +from libervia.backend.core.i18n import _ +from libervia.backend.core import exceptions +from libervia.backend.core.core_types import SatXMPPEntity +from libervia.backend.core.log import getLogger +from libervia.backend.core.constants import Const as C +from libervia.backend.tools import image +from libervia.backend.tools.utils import ensure_deferred +from libervia.backend.tools.web import download_file +from libervia.backend.memory.sqla_mapping import PubsubSub, SubscriptionState + +from .constants import ( + TYPE_ACTOR, + ST_AVATAR, + MAX_AVATAR_SIZE +) + + +log = getLogger(__name__) + +# all nodes have the same config +NODE_CONFIG = [ + {"var": "pubsub#persist_items", "type": "boolean", "value": True}, + {"var": "pubsub#max_items", "value": "max"}, + {"var": "pubsub#access_model", "type": "list-single", "value": "open"}, + {"var": "pubsub#publish_model", "type": "list-single", "value": "open"}, + +] + +NODE_CONFIG_VALUES = {c["var"]: c["value"] for c in NODE_CONFIG} +NODE_OPTIONS = {c["var"]: {} for c in NODE_CONFIG} +for c in NODE_CONFIG: + NODE_OPTIONS[c["var"]].update({k:v for k,v in c.items() if k not in ("var", "value")}) + + +class APPubsubService(rsm.PubSubService): + """Pubsub service for XMPP requests""" + + def __init__(self, apg): + super(APPubsubService, self).__init__() + self.host = apg.host + self.apg = apg + self.discoIdentity = { + "category": "pubsub", + "type": "service", + "name": "Libervia ActivityPub Gateway", + } + + async def get_ap_actor_ids_and_inbox( + self, + requestor: jid.JID, + recipient: jid.JID, + ) -> Tuple[str, str, str]: + """Get AP actor IDs from requestor and destinee JIDs + + @param requestor: XMPP entity doing a request to an AP actor via the gateway + @param recipient: JID mapping an AP actor via the gateway + @return: requestor actor ID, recipient actor ID and recipient inbox + @raise error.StanzaError: "item-not-found" is raised if not user part is specified + in requestor + """ + if not recipient.user: + raise error.StanzaError( + "item-not-found", + text="No user part specified" + ) + requestor_actor_id = self.apg.build_apurl(TYPE_ACTOR, requestor.userhost()) + recipient_account = self.apg._e.unescape(recipient.user) + recipient_actor_id = await self.apg.get_ap_actor_id_from_account(recipient_account) + inbox = await self.apg.get_ap_inbox_from_id(recipient_actor_id, use_shared=False) + return requestor_actor_id, recipient_actor_id, inbox + + + @ensure_deferred + async def publish(self, requestor, service, nodeIdentifier, items): + if self.apg.local_only and not self.apg.is_local(requestor): + raise error.StanzaError( + "forbidden", + "Only local users can publish on this gateway." + ) + if not service.user: + raise error.StanzaError( + "bad-request", + "You must specify an ActivityPub actor account in JID user part." + ) + ap_account = self.apg._e.unescape(service.user) + if ap_account.count("@") != 1: + raise error.StanzaError( + "bad-request", + f"{ap_account!r} is not a valid ActivityPub actor account." + ) + + client = self.apg.client.get_virtual_client(requestor) + if self.apg._pa.is_attachment_node(nodeIdentifier): + await self.apg.convert_and_post_attachments( + client, ap_account, service, nodeIdentifier, items, publisher=requestor + ) + else: + await self.apg.convert_and_post_items( + client, ap_account, service, nodeIdentifier, items + ) + cached_node = await self.host.memory.storage.get_pubsub_node( + client, service, nodeIdentifier, with_subscriptions=True, create=True + ) + await self.host.memory.storage.cache_pubsub_items( + client, + cached_node, + items + ) + for subscription in cached_node.subscriptions: + if subscription.state != SubscriptionState.SUBSCRIBED: + continue + self.notifyPublish( + service, + nodeIdentifier, + [(subscription.subscriber, None, items)] + ) + + async def ap_following_2_elt(self, ap_item: dict) -> domish.Element: + """Convert actor ID from following collection to XMPP item""" + actor_id = ap_item["id"] + actor_jid = await self.apg.get_jid_from_id(actor_id) + subscription_elt = self.apg._pps.build_subscription_elt( + self.apg._m.namespace, actor_jid + ) + item_elt = pubsub.Item(id=actor_id, payload=subscription_elt) + return item_elt + + async def ap_follower_2_elt(self, ap_item: dict) -> domish.Element: + """Convert actor ID from followers collection to XMPP item""" + actor_id = ap_item["id"] + actor_jid = await self.apg.get_jid_from_id(actor_id) + subscriber_elt = self.apg._pps.build_subscriber_elt(actor_jid) + item_elt = pubsub.Item(id=actor_id, payload=subscriber_elt) + return item_elt + + async def generate_v_card(self, ap_account: str) -> domish.Element: + """Generate vCard4 (XEP-0292) item element from ap_account's metadata""" + actor_data = await self.apg.get_ap_actor_data_from_account(ap_account) + identity_data = {} + + summary = actor_data.get("summary") + # summary is HTML, we have to convert it to text + if summary: + identity_data["description"] = await self.apg._t.convert( + summary, + self.apg._t.SYNTAX_XHTML, + self.apg._t.SYNTAX_TEXT, + False, + ) + + for field in ("name", "preferredUsername"): + value = actor_data.get(field) + if value: + identity_data.setdefault("nicknames", []).append(value) + vcard_elt = self.apg._v.dict_2_v_card(identity_data) + item_elt = domish.Element((pubsub.NS_PUBSUB, "item")) + item_elt.addChild(vcard_elt) + item_elt["id"] = self.apg._p.ID_SINGLETON + return item_elt + + async def get_avatar_data( + self, + client: SatXMPPEntity, + ap_account: str + ) -> Dict[str, Any]: + """Retrieve actor's avatar if any, cache it and file actor_data + + ``cache_uid``, `path``` and ``media_type`` keys are always files + ``base64`` key is only filled if the file was not already in cache + """ + actor_data = await self.apg.get_ap_actor_data_from_account(ap_account) + + for icon in await self.apg.ap_get_list(actor_data, "icon"): + url = icon.get("url") + if icon["type"] != "Image" or not url: + continue + parsed_url = urlparse(url) + if not parsed_url.scheme in ("http", "https"): + log.warning(f"unexpected URL scheme: {url!r}") + continue + filename = Path(parsed_url.path).name + if not filename: + log.warning(f"ignoring URL with invald path: {url!r}") + continue + break + else: + raise error.StanzaError("item-not-found") + + key = f"{ST_AVATAR}{url}" + cache_uid = await client._ap_storage.get(key) + + if cache_uid is None: + cache = None + else: + cache = self.apg.host.common_cache.get_metadata(cache_uid) + + if cache is None: + with tempfile.TemporaryDirectory() as dir_name: + dest_path = Path(dir_name, filename) + await download_file(url, dest_path, max_size=MAX_AVATAR_SIZE) + avatar_data = { + "path": dest_path, + "filename": filename, + 'media_type': image.guess_type(dest_path), + } + + await self.apg._i.cache_avatar( + self.apg.IMPORT_NAME, + avatar_data + ) + else: + avatar_data = { + "cache_uid": cache["uid"], + "path": cache["path"], + "media_type": cache["mime_type"] + } + + return avatar_data + + async def generate_avatar_metadata( + self, + client: SatXMPPEntity, + ap_account: str + ) -> domish.Element: + """Generate the metadata element for user avatar + + @raise StanzaError("item-not-found"): no avatar is present in actor data (in + ``icon`` field) + """ + avatar_data = await self.get_avatar_data(client, ap_account) + return self.apg._a.build_item_metadata_elt(avatar_data) + + def _blocking_b_6_4_encode_avatar(self, avatar_data: Dict[str, Any]) -> None: + with avatar_data["path"].open("rb") as f: + avatar_data["base64"] = b64encode(f.read()).decode() + + async def generate_avatar_data( + self, + client: SatXMPPEntity, + ap_account: str, + itemIdentifiers: Optional[List[str]], + ) -> domish.Element: + """Generate the data element for user avatar + + @raise StanzaError("item-not-found"): no avatar cached with requested ID + """ + if not itemIdentifiers: + avatar_data = await self.get_avatar_data(client, ap_account) + if "base64" not in avatar_data: + await threads.deferToThread(self._blocking_b_6_4_encode_avatar, avatar_data) + else: + if len(itemIdentifiers) > 1: + # only a single item ID is supported + raise error.StanzaError("item-not-found") + item_id = itemIdentifiers[0] + # just to be sure that that we don't have an empty string + assert item_id + cache_data = self.apg.host.common_cache.get_metadata(item_id) + if cache_data is None: + raise error.StanzaError("item-not-found") + avatar_data = { + "cache_uid": item_id, + "path": cache_data["path"] + } + await threads.deferToThread(self._blocking_b_6_4_encode_avatar, avatar_data) + + return self.apg._a.build_item_data_elt(avatar_data) + + @ensure_deferred + async def items( + self, + requestor: jid.JID, + service: jid.JID, + node: str, + maxItems: Optional[int], + itemIdentifiers: Optional[List[str]], + rsm_req: Optional[rsm.RSMRequest] + ) -> Tuple[List[domish.Element], Optional[rsm.RSMResponse]]: + if not service.user: + return [], None + ap_account = self.host.plugins["XEP-0106"].unescape(service.user) + if ap_account.count("@") != 1: + log.warning(f"Invalid AP account used by {requestor}: {ap_account!r}") + return [], None + + # cached_node may be pre-filled with some nodes (e.g. attachments nodes), + # otherwise it is filled when suitable + cached_node = None + client = self.apg.client + kwargs = {} + + if node == self.apg._pps.subscriptions_node: + collection_name = "following" + parser = self.ap_following_2_elt + kwargs["only_ids"] = True + use_cache = False + elif node.startswith(self.apg._pps.subscribers_node_prefix): + collection_name = "followers" + parser = self.ap_follower_2_elt + kwargs["only_ids"] = True + use_cache = False + elif node == self.apg._v.node: + # vCard4 request + item_elt = await self.generate_v_card(ap_account) + return [item_elt], None + elif node == self.apg._a.namespace_metadata: + item_elt = await self.generate_avatar_metadata(self.apg.client, ap_account) + return [item_elt], None + elif node == self.apg._a.namespace_data: + item_elt = await self.generate_avatar_data( + self.apg.client, ap_account, itemIdentifiers + ) + return [item_elt], None + elif self.apg._pa.is_attachment_node(node): + use_cache = True + # we check cache here because we emit an item-not-found error if the node is + # not in cache, as we are not dealing with real AP items + cached_node = await self.host.memory.storage.get_pubsub_node( + client, service, node + ) + if cached_node is None: + raise error.StanzaError("item-not-found") + else: + if node.startswith(self.apg._m.namespace): + parser = self.apg.ap_item_2_mb_elt + elif node.startswith(self.apg._events.namespace): + parser = self.apg.ap_events.ap_item_2_event_elt + else: + raise error.StanzaError( + "feature-not-implemented", + text=f"AP Gateway {C.APP_VERSION} only supports " + f"{self.apg._m.namespace} node for now" + ) + collection_name = "outbox" + use_cache = True + + if use_cache: + if cached_node is None: + cached_node = await self.host.memory.storage.get_pubsub_node( + client, service, node + ) + # TODO: check if node is synchronised + if cached_node is not None: + # the node is cached, we return items from cache + log.debug(f"node {node!r} from {service} is in cache") + pubsub_items, metadata = await self.apg._c.get_items_from_cache( + client, cached_node, maxItems, itemIdentifiers, rsm_request=rsm_req + ) + try: + rsm_resp = rsm.RSMResponse(**metadata["rsm"]) + except KeyError: + rsm_resp = None + return [i.data for i in pubsub_items], rsm_resp + + if itemIdentifiers: + items = [] + for item_id in itemIdentifiers: + item_data = await self.apg.ap_get(item_id) + item_elt = await parser(item_data) + items.append(item_elt) + return items, None + else: + if rsm_req is None: + if maxItems is None: + maxItems = 20 + kwargs.update({ + "max_items": maxItems, + "chronological_pagination": False, + }) + else: + if len( + [v for v in (rsm_req.after, rsm_req.before, rsm_req.index) + if v is not None] + ) > 1: + raise error.StanzaError( + "bad-request", + text="You can't use after, before and index at the same time" + ) + kwargs.update({"max_items": rsm_req.max}) + if rsm_req.after is not None: + kwargs["after_id"] = rsm_req.after + elif rsm_req.before is not None: + kwargs["chronological_pagination"] = False + if rsm_req.before != "": + kwargs["after_id"] = rsm_req.before + elif rsm_req.index is not None: + kwargs["start_index"] = rsm_req.index + + log.info( + f"No cache found for node {node} at {service} (AP account {ap_account}), " + "using Collection Paging to RSM translation" + ) + if self.apg._m.is_comment_node(node): + parent_item = self.apg._m.get_parent_item(node) + try: + parent_data = await self.apg.ap_get(parent_item) + collection = await self.apg.ap_get_object( + parent_data.get("object", {}), + "replies" + ) + except Exception as e: + raise error.StanzaError( + "item-not-found", + text=e + ) + else: + actor_data = await self.apg.get_ap_actor_data_from_account(ap_account) + collection = await self.apg.ap_get_object(actor_data, collection_name) + if not collection: + raise error.StanzaError( + "item-not-found", + text=f"No collection found for node {node!r} (account: {ap_account})" + ) + + kwargs["parser"] = parser + return await self.apg.get_ap_items(collection, **kwargs) + + @ensure_deferred + async def retract(self, requestor, service, nodeIdentifier, itemIdentifiers): + raise error.StanzaError("forbidden") + + @ensure_deferred + async def subscribe(self, requestor, service, nodeIdentifier, subscriber): + # TODO: handle comments nodes + client = self.apg.client + # we use PENDING state for microblog, it will be set to SUBSCRIBED once the Follow + # is accepted. Other nodes are directly set to subscribed, their subscriptions + # being internal. + if nodeIdentifier == self.apg._m.namespace: + sub_state = SubscriptionState.PENDING + else: + sub_state = SubscriptionState.SUBSCRIBED + node = await self.host.memory.storage.get_pubsub_node( + client, service, nodeIdentifier, with_subscriptions=True + ) + if node is None: + node = await self.host.memory.storage.set_pubsub_node( + client, + service, + nodeIdentifier, + ) + subscription = None + else: + try: + subscription = next( + s for s in node.subscriptions + if s.subscriber == requestor.userhostJID() + ) + except StopIteration: + subscription = None + + if subscription is None: + subscription = PubsubSub( + subscriber=requestor.userhostJID(), + state=sub_state + ) + node.subscriptions.append(subscription) + await self.host.memory.storage.add(node) + else: + if subscription.state is None: + subscription.state = sub_state + await self.host.memory.storage.add(node) + elif subscription.state == SubscriptionState.SUBSCRIBED: + log.info( + f"{requestor.userhostJID()} has already a subscription to {node!r} " + f"at {service}. Doing the request anyway." + ) + elif subscription.state == SubscriptionState.PENDING: + log.info( + f"{requestor.userhostJID()} has already a pending subscription to " + f"{node!r} at {service}. Doing the request anyway." + ) + if sub_state != SubscriptionState.PENDING: + subscription.state = sub_state + await self.host.memory.storage.add(node) + else: + raise exceptions.InternalError( + f"unmanaged subscription state: {subscription.state}" + ) + + if nodeIdentifier in (self.apg._m.namespace, self.apg._events.namespace): + # if we subscribe to microblog or events node, we follow the corresponding + # account + req_actor_id, recip_actor_id, inbox = await self.get_ap_actor_ids_and_inbox( + requestor, service + ) + + data = self.apg.create_activity("Follow", req_actor_id, recip_actor_id) + + resp = await self.apg.sign_and_post(inbox, req_actor_id, data) + if resp.code >= 300: + text = await resp.text() + raise error.StanzaError("service-unavailable", text=text) + return pubsub.Subscription(nodeIdentifier, requestor, "subscribed") + + @ensure_deferred + async def unsubscribe(self, requestor, service, nodeIdentifier, subscriber): + req_actor_id, recip_actor_id, inbox = await self.get_ap_actor_ids_and_inbox( + requestor, service + ) + data = self.apg.create_activity( + "Undo", + req_actor_id, + self.apg.create_activity( + "Follow", + req_actor_id, + recip_actor_id + ) + ) + + resp = await self.apg.sign_and_post(inbox, req_actor_id, data) + if resp.code >= 300: + text = await resp.text() + raise error.StanzaError("service-unavailable", text=text) + + def getConfigurationOptions(self): + return NODE_OPTIONS + + def getConfiguration( + self, + requestor: jid.JID, + service: jid.JID, + nodeIdentifier: str + ) -> defer.Deferred: + return defer.succeed(NODE_CONFIG_VALUES) + + def getNodeInfo( + self, + requestor: jid.JID, + service: jid.JID, + nodeIdentifier: str, + pep: bool = False, + recipient: Optional[jid.JID] = None + ) -> Optional[dict]: + if not nodeIdentifier: + return None + info = { + "type": "leaf", + "meta-data": NODE_CONFIG + } + return info