diff libervia/backend/plugins/plugin_comp_ap_gateway/pubsub_service.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_comp_ap_gateway/pubsub_service.py@524856bd7b19
children 92551baea115
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/plugins/plugin_comp_ap_gateway/pubsub_service.py	Fri Jun 02 11:49:51 2023 +0200
@@ -0,0 +1,570 @@
+#!/usr/bin/env python3
+# Libervia ActivityPub Gateway
+# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU Affero General Public License for more details.
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+from typing import Optional, Tuple, List, Dict, Any, Union
+from urllib.parse import urlparse
+from pathlib import Path
+from base64 import b64encode
+import tempfile
+from twisted.internet import defer, threads
+from twisted.words.protocols.jabber import jid, error
+from twisted.words.xish import domish
+from wokkel import rsm, pubsub, disco
+from libervia.backend.core.i18n import _
+from libervia.backend.core import exceptions
+from libervia.backend.core.core_types import SatXMPPEntity
+from libervia.backend.core.log import getLogger
+from libervia.backend.core.constants import Const as C
+from libervia.backend.tools import image
+from libervia.backend.tools.utils import ensure_deferred
+from libervia.backend.tools.web import download_file
+from libervia.backend.memory.sqla_mapping import PubsubSub, SubscriptionState
+from .constants import (
+log = getLogger(__name__)
+# all nodes have the same config
+    {"var": "pubsub#persist_items", "type": "boolean", "value": True},
+    {"var": "pubsub#max_items", "value": "max"},
+    {"var": "pubsub#access_model", "type": "list-single", "value": "open"},
+    {"var": "pubsub#publish_model", "type": "list-single", "value": "open"},
+NODE_CONFIG_VALUES = {c["var"]: c["value"] for c in NODE_CONFIG}
+NODE_OPTIONS = {c["var"]: {} for c in NODE_CONFIG}
+for c in NODE_CONFIG:
+    NODE_OPTIONS[c["var"]].update({k:v for k,v in c.items() if k not in ("var", "value")})
+class APPubsubService(rsm.PubSubService):
+    """Pubsub service for XMPP requests"""
+    def __init__(self, apg):
+        super(APPubsubService, self).__init__()
+        self.host = apg.host
+        self.apg = apg
+        self.discoIdentity = {
+            "category": "pubsub",
+            "type": "service",
+            "name": "Libervia ActivityPub Gateway",
+        }
+    async def get_ap_actor_ids_and_inbox(
+        self,
+        requestor: jid.JID,
+        recipient: jid.JID,
+    ) -> Tuple[str, str, str]:
+        """Get AP actor IDs from requestor and destinee JIDs
+        @param requestor: XMPP entity doing a request to an AP actor via the gateway
+        @param recipient: JID mapping an AP actor via the gateway
+        @return: requestor actor ID, recipient actor ID and recipient inbox
+        @raise error.StanzaError: "item-not-found" is raised if not user part is specified
+            in requestor
+        """
+        if not recipient.user:
+            raise error.StanzaError(
+                "item-not-found",
+                text="No user part specified"
+            )
+        requestor_actor_id = self.apg.build_apurl(TYPE_ACTOR, requestor.userhost())
+        recipient_account = self.apg._e.unescape(recipient.user)
+        recipient_actor_id = await self.apg.get_ap_actor_id_from_account(recipient_account)
+        inbox = await self.apg.get_ap_inbox_from_id(recipient_actor_id, use_shared=False)
+        return requestor_actor_id, recipient_actor_id, inbox
+    @ensure_deferred
+    async def publish(self, requestor, service, nodeIdentifier, items):
+        if self.apg.local_only and not self.apg.is_local(requestor):
+            raise error.StanzaError(
+                "forbidden",
+                "Only local users can publish on this gateway."
+            )
+        if not service.user:
+            raise error.StanzaError(
+                "bad-request",
+                "You must specify an ActivityPub actor account in JID user part."
+            )
+        ap_account = self.apg._e.unescape(service.user)
+        if ap_account.count("@") != 1:
+            raise error.StanzaError(
+                "bad-request",
+                f"{ap_account!r} is not a valid ActivityPub actor account."
+            )
+        client = self.apg.client.get_virtual_client(requestor)
+        if self.apg._pa.is_attachment_node(nodeIdentifier):
+            await self.apg.convert_and_post_attachments(
+                client, ap_account, service, nodeIdentifier, items, publisher=requestor
+            )
+        else:
+            await self.apg.convert_and_post_items(
+                client, ap_account, service, nodeIdentifier, items
+            )
+            cached_node = await self.host.memory.storage.get_pubsub_node(
+                client, service, nodeIdentifier, with_subscriptions=True, create=True
+            )
+            await self.host.memory.storage.cache_pubsub_items(
+                client,
+                cached_node,
+                items
+            )
+            for subscription in cached_node.subscriptions:
+                if subscription.state != SubscriptionState.SUBSCRIBED:
+                    continue
+                self.notifyPublish(
+                    service,
+                    nodeIdentifier,
+                    [(subscription.subscriber, None, items)]
+                )
+    async def ap_following_2_elt(self, ap_item: dict) -> domish.Element:
+        """Convert actor ID from following collection to XMPP item"""
+        actor_id = ap_item["id"]
+        actor_jid = await self.apg.get_jid_from_id(actor_id)
+        subscription_elt = self.apg._pps.build_subscription_elt(
+            self.apg._m.namespace, actor_jid
+        )
+        item_elt = pubsub.Item(id=actor_id, payload=subscription_elt)
+        return item_elt
+    async def ap_follower_2_elt(self, ap_item: dict) -> domish.Element:
+        """Convert actor ID from followers collection to XMPP item"""
+        actor_id = ap_item["id"]
+        actor_jid = await self.apg.get_jid_from_id(actor_id)
+        subscriber_elt = self.apg._pps.build_subscriber_elt(actor_jid)
+        item_elt = pubsub.Item(id=actor_id, payload=subscriber_elt)
+        return item_elt
+    async def generate_v_card(self, ap_account: str) -> domish.Element:
+        """Generate vCard4 (XEP-0292) item element from ap_account's metadata"""
+        actor_data = await self.apg.get_ap_actor_data_from_account(ap_account)
+        identity_data = {}
+        summary = actor_data.get("summary")
+        # summary is HTML, we have to convert it to text
+        if summary:
+            identity_data["description"] = await self.apg._t.convert(
+                summary,
+                self.apg._t.SYNTAX_XHTML,
+                self.apg._t.SYNTAX_TEXT,
+                False,
+            )
+        for field in ("name", "preferredUsername"):
+            value = actor_data.get(field)
+            if value:
+                identity_data.setdefault("nicknames", []).append(value)
+        vcard_elt = self.apg._v.dict_2_v_card(identity_data)
+        item_elt = domish.Element((pubsub.NS_PUBSUB, "item"))
+        item_elt.addChild(vcard_elt)
+        item_elt["id"] = self.apg._p.ID_SINGLETON
+        return item_elt
+    async def get_avatar_data(
+        self,
+        client: SatXMPPEntity,
+        ap_account: str
+    ) -> Dict[str, Any]:
+        """Retrieve actor's avatar if any, cache it and file actor_data
+        ``cache_uid``, `path``` and ``media_type`` keys are always files
+        ``base64`` key is only filled if the file was not already in cache
+        """
+        actor_data = await self.apg.get_ap_actor_data_from_account(ap_account)
+        for icon in await self.apg.ap_get_list(actor_data, "icon"):
+            url = icon.get("url")
+            if icon["type"] != "Image" or not url:
+                continue
+            parsed_url = urlparse(url)
+            if not parsed_url.scheme in ("http", "https"):
+                log.warning(f"unexpected URL scheme: {url!r}")
+                continue
+            filename = Path(parsed_url.path).name
+            if not filename:
+                log.warning(f"ignoring URL with invald path: {url!r}")
+                continue
+            break
+        else:
+            raise error.StanzaError("item-not-found")
+        key = f"{ST_AVATAR}{url}"
+        cache_uid = await client._ap_storage.get(key)
+        if cache_uid is None:
+            cache = None
+        else:
+            cache = self.apg.host.common_cache.get_metadata(cache_uid)
+        if cache is None:
+            with tempfile.TemporaryDirectory() as dir_name:
+                dest_path = Path(dir_name, filename)
+                await download_file(url, dest_path, max_size=MAX_AVATAR_SIZE)
+                avatar_data = {
+                    "path": dest_path,
+                    "filename": filename,
+                    'media_type': image.guess_type(dest_path),
+                }
+                await self.apg._i.cache_avatar(
+                    self.apg.IMPORT_NAME,
+                    avatar_data
+                )
+        else:
+            avatar_data = {
+            "cache_uid": cache["uid"],
+            "path": cache["path"],
+            "media_type": cache["mime_type"]
+        }
+        return avatar_data
+    async def generate_avatar_metadata(
+        self,
+        client: SatXMPPEntity,
+        ap_account: str
+    ) -> domish.Element:
+        """Generate the metadata element for user avatar
+        @raise StanzaError("item-not-found"): no avatar is present in actor data (in
+            ``icon`` field)
+        """
+        avatar_data = await self.get_avatar_data(client, ap_account)
+        return self.apg._a.build_item_metadata_elt(avatar_data)
+    def _blocking_b_6_4_encode_avatar(self, avatar_data: Dict[str, Any]) -> None:
+        with avatar_data["path"].open("rb") as f:
+            avatar_data["base64"] = b64encode(f.read()).decode()
+    async def generate_avatar_data(
+        self,
+        client: SatXMPPEntity,
+        ap_account: str,
+        itemIdentifiers: Optional[List[str]],
+    ) -> domish.Element:
+        """Generate the data element for user avatar
+        @raise StanzaError("item-not-found"): no avatar cached with requested ID
+        """
+        if not itemIdentifiers:
+            avatar_data = await self.get_avatar_data(client, ap_account)
+            if "base64" not in avatar_data:
+                await threads.deferToThread(self._blocking_b_6_4_encode_avatar, avatar_data)
+        else:
+            if len(itemIdentifiers) > 1:
+                # only a single item ID is supported
+                raise error.StanzaError("item-not-found")
+            item_id = itemIdentifiers[0]
+            # just to be sure that that we don't have an empty string
+            assert item_id
+            cache_data = self.apg.host.common_cache.get_metadata(item_id)
+            if cache_data is None:
+                raise error.StanzaError("item-not-found")
+            avatar_data = {
+                "cache_uid": item_id,
+                "path": cache_data["path"]
+            }
+            await threads.deferToThread(self._blocking_b_6_4_encode_avatar, avatar_data)
+        return self.apg._a.build_item_data_elt(avatar_data)
+    @ensure_deferred
+    async def items(
+        self,
+        requestor: jid.JID,
+        service: jid.JID,
+        node: str,
+        maxItems: Optional[int],
+        itemIdentifiers: Optional[List[str]],
+        rsm_req: Optional[rsm.RSMRequest]
+    ) -> Tuple[List[domish.Element], Optional[rsm.RSMResponse]]:
+        if not service.user:
+            return [], None
+        ap_account = self.host.plugins["XEP-0106"].unescape(service.user)
+        if ap_account.count("@") != 1:
+            log.warning(f"Invalid AP account used by {requestor}: {ap_account!r}")
+            return [], None
+        # cached_node may be pre-filled with some nodes (e.g. attachments nodes),
+        # otherwise it is filled when suitable
+        cached_node = None
+        client = self.apg.client
+        kwargs = {}
+        if node == self.apg._pps.subscriptions_node:
+            collection_name = "following"
+            parser = self.ap_following_2_elt
+            kwargs["only_ids"] = True
+            use_cache = False
+        elif node.startswith(self.apg._pps.subscribers_node_prefix):
+            collection_name = "followers"
+            parser = self.ap_follower_2_elt
+            kwargs["only_ids"] = True
+            use_cache = False
+        elif node == self.apg._v.node:
+            # vCard4 request
+            item_elt = await self.generate_v_card(ap_account)
+            return [item_elt], None
+        elif node == self.apg._a.namespace_metadata:
+            item_elt = await self.generate_avatar_metadata(self.apg.client, ap_account)
+            return [item_elt], None
+        elif node == self.apg._a.namespace_data:
+            item_elt = await self.generate_avatar_data(
+                self.apg.client, ap_account, itemIdentifiers
+            )
+            return [item_elt], None
+        elif self.apg._pa.is_attachment_node(node):
+            use_cache = True
+            # we check cache here because we emit an item-not-found error if the node is
+            # not in cache, as we are not dealing with real AP items
+            cached_node = await self.host.memory.storage.get_pubsub_node(
+                client, service, node
+            )
+            if cached_node is None:
+                raise error.StanzaError("item-not-found")
+        else:
+            if node.startswith(self.apg._m.namespace):
+                parser = self.apg.ap_item_2_mb_elt
+            elif node.startswith(self.apg._events.namespace):
+                parser = self.apg.ap_events.ap_item_2_event_elt
+            else:
+                raise error.StanzaError(
+                    "feature-not-implemented",
+                    text=f"AP Gateway {C.APP_VERSION} only supports "
+                    f"{self.apg._m.namespace} node for now"
+                )
+            collection_name = "outbox"
+            use_cache = True
+        if use_cache:
+            if cached_node is None:
+                cached_node = await self.host.memory.storage.get_pubsub_node(
+                    client, service, node
+                )
+            # TODO: check if node is synchronised
+            if cached_node is not None:
+                # the node is cached, we return items from cache
+                log.debug(f"node {node!r} from {service} is in cache")
+                pubsub_items, metadata = await self.apg._c.get_items_from_cache(
+                    client, cached_node, maxItems, itemIdentifiers, rsm_request=rsm_req
+                )
+                try:
+                    rsm_resp = rsm.RSMResponse(**metadata["rsm"])
+                except KeyError:
+                    rsm_resp = None
+                return [i.data for i in pubsub_items], rsm_resp
+        if itemIdentifiers:
+            items = []
+            for item_id in itemIdentifiers:
+                item_data = await self.apg.ap_get(item_id)
+                item_elt = await parser(item_data)
+                items.append(item_elt)
+            return items, None
+        else:
+            if rsm_req is None:
+                if maxItems is None:
+                    maxItems = 20
+                kwargs.update({
+                    "max_items": maxItems,
+                    "chronological_pagination": False,
+                })
+            else:
+                if len(
+                    [v for v in (rsm_req.after, rsm_req.before, rsm_req.index)
+                     if v is not None]
+                ) > 1:
+                    raise error.StanzaError(
+                        "bad-request",
+                        text="You can't use after, before and index at the same time"
+                    )
+                kwargs.update({"max_items": rsm_req.max})
+                if rsm_req.after is not None:
+                    kwargs["after_id"] = rsm_req.after
+                elif rsm_req.before is not None:
+                    kwargs["chronological_pagination"] = False
+                    if rsm_req.before != "":
+                        kwargs["after_id"] = rsm_req.before
+                elif rsm_req.index is not None:
+                    kwargs["start_index"] = rsm_req.index
+            log.info(
+                f"No cache found for node {node} at {service} (AP account {ap_account}), "
+                "using Collection Paging to RSM translation"
+            )
+            if self.apg._m.is_comment_node(node):
+                parent_item = self.apg._m.get_parent_item(node)
+                try:
+                    parent_data = await self.apg.ap_get(parent_item)
+                    collection = await self.apg.ap_get_object(
+                        parent_data.get("object", {}),
+                        "replies"
+                    )
+                except Exception as e:
+                    raise error.StanzaError(
+                        "item-not-found",
+                        text=e
+                    )
+            else:
+                actor_data = await self.apg.get_ap_actor_data_from_account(ap_account)
+                collection = await self.apg.ap_get_object(actor_data, collection_name)
+            if not collection:
+                raise error.StanzaError(
+                    "item-not-found",
+                    text=f"No collection found for node {node!r} (account: {ap_account})"
+                )
+            kwargs["parser"] = parser
+            return await self.apg.get_ap_items(collection, **kwargs)
+    @ensure_deferred
+    async def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
+        raise error.StanzaError("forbidden")
+    @ensure_deferred
+    async def subscribe(self, requestor, service, nodeIdentifier, subscriber):
+        # TODO: handle comments nodes
+        client = self.apg.client
+        # we use PENDING state for microblog, it will be set to SUBSCRIBED once the Follow
+        # is accepted. Other nodes are directly set to subscribed, their subscriptions
+        # being internal.
+        if nodeIdentifier == self.apg._m.namespace:
+            sub_state = SubscriptionState.PENDING
+        else:
+            sub_state = SubscriptionState.SUBSCRIBED
+        node = await self.host.memory.storage.get_pubsub_node(
+            client, service, nodeIdentifier, with_subscriptions=True
+        )
+        if node is None:
+            node = await self.host.memory.storage.set_pubsub_node(
+                client,
+                service,
+                nodeIdentifier,
+            )
+            subscription = None
+        else:
+            try:
+                subscription = next(
+                    s for s in node.subscriptions
+                    if s.subscriber == requestor.userhostJID()
+                )
+            except StopIteration:
+                subscription = None
+        if subscription is None:
+            subscription = PubsubSub(
+                subscriber=requestor.userhostJID(),
+                state=sub_state
+            )
+            node.subscriptions.append(subscription)
+            await self.host.memory.storage.add(node)
+        else:
+            if subscription.state is None:
+                subscription.state = sub_state
+                await self.host.memory.storage.add(node)
+            elif subscription.state == SubscriptionState.SUBSCRIBED:
+                log.info(
+                    f"{requestor.userhostJID()} has already a subscription to {node!r} "
+                    f"at {service}. Doing the request anyway."
+                )
+            elif subscription.state == SubscriptionState.PENDING:
+                log.info(
+                    f"{requestor.userhostJID()} has already a pending subscription to "
+                    f"{node!r} at {service}. Doing the request anyway."
+                )
+                if sub_state != SubscriptionState.PENDING:
+                    subscription.state = sub_state
+                    await self.host.memory.storage.add(node)
+            else:
+                raise exceptions.InternalError(
+                    f"unmanaged subscription state: {subscription.state}"
+                )
+        if nodeIdentifier in (self.apg._m.namespace, self.apg._events.namespace):
+            # if we subscribe to microblog or events node, we follow the corresponding
+            # account
+            req_actor_id, recip_actor_id, inbox = await self.get_ap_actor_ids_and_inbox(
+                requestor, service
+            )
+            data = self.apg.create_activity("Follow", req_actor_id, recip_actor_id)
+            resp = await self.apg.sign_and_post(inbox, req_actor_id, data)
+            if resp.code >= 300:
+                text = await resp.text()
+                raise error.StanzaError("service-unavailable", text=text)
+        return pubsub.Subscription(nodeIdentifier, requestor, "subscribed")
+    @ensure_deferred
+    async def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
+        req_actor_id, recip_actor_id, inbox = await self.get_ap_actor_ids_and_inbox(
+            requestor, service
+        )
+        data = self.apg.create_activity(
+            "Undo",
+            req_actor_id,
+            self.apg.create_activity(
+                "Follow",
+                req_actor_id,
+                recip_actor_id
+            )
+        )
+        resp = await self.apg.sign_and_post(inbox, req_actor_id, data)
+        if resp.code >= 300:
+            text = await resp.text()
+            raise error.StanzaError("service-unavailable", text=text)
+    def getConfigurationOptions(self):
+        return NODE_OPTIONS
+    def getConfiguration(
+        self,
+        requestor: jid.JID,
+        service: jid.JID,
+        nodeIdentifier: str
+    ) -> defer.Deferred:
+        return defer.succeed(NODE_CONFIG_VALUES)
+    def getNodeInfo(
+        self,
+        requestor: jid.JID,
+        service: jid.JID,
+        nodeIdentifier: str,
+        pep: bool = False,
+        recipient: Optional[jid.JID] = None
+    ) -> Optional[dict]:
+        if not nodeIdentifier:
+            return None
+        info = {
+            "type": "leaf",
+            "meta-data": NODE_CONFIG
+        }
+        return info