diff sat/plugins/plugin_comp_ap_gateway/http_server.py @ 3764:125c7043b277

comp AP gateway: publish, (un)subscribe/(un)follow, public subscription/following/followers: this patch implements those major features: - `publish` is implemented on virtual pubsub service, thus XMPP entities can now publish to AP using this service - replies to XMPP items are managed - `inReplyTo` is filled when converting XMPP items to AP objects - `follow` and `unfollow` (actually an `undo` activity) are implemented and mapped to XMPP's (un)subscribe. On subscription, AP actor's `outbox` collection is converted to XMPP and put in cache. Subscriptions are always public. - `following` and `followers` collections are mapped to XMPP's Public Pubsub Subscription (which should be XEP-0465, but the XEP is not yet published at the time of commit), in both directions. - new helper methods to check if an URL is local and to get JID from actor ID doc will follow to explain behaviour rel 365
author Goffi <goffi@goffi.org>
date Fri, 13 May 2022 19:12:33 +0200
parents a8c7e5cef0cb
children b5c9021020df
line wrap: on
line diff
--- a/sat/plugins/plugin_comp_ap_gateway/http_server.py	Fri May 13 18:50:33 2022 +0200
+++ b/sat/plugins/plugin_comp_ap_gateway/http_server.py	Fri May 13 19:12:33 2022 +0200
@@ -17,7 +17,7 @@
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
 import time
-from typing import Optional, Dict, List
+from typing import Optional, Dict, List, Any
 import json
 from urllib import parse
 from collections import deque
@@ -25,6 +25,7 @@
 from pprint import pformat
 
 from twisted.web import http, resource as web_resource, server
+from twisted.python import failure
 from twisted.internet import reactor, defer
 from twisted.words.protocols.jabber import jid, error
 from wokkel import pubsub, rsm
@@ -33,13 +34,14 @@
 from sat.core.constants import Const as C
 from sat.core.i18n import _
 from sat.core.log import getLogger
+from sat.tools import utils
 from sat.tools.common import date_utils
 from sat.memory.sqla_mapping import SubscriptionState
 
 from .constants import (
     CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_SHARED_INBOX, TYPE_OUTBOX,
     AP_REQUEST_TYPES, PAGE_SIZE, ACTIVITY_TYPES_LOWER, ACTIVIY_NO_ACCOUNT_ALLOWED,
-    SIGN_HEADERS, HS2019, SIGN_EXP
+    SIGN_HEADERS, HS2019, SIGN_EXP, TYPE_FOLLOWERS, TYPE_FOLLOWING
 )
 from .regex import RE_SIG_PARAM
 
@@ -71,6 +73,15 @@
         log.warning(msg)
         request.setResponseCode(http_code, None if msg is None else msg.encode())
 
+    def _onRequestError(self, failure_: failure.Failure, request: "HTTPRequest") -> None:
+        log.error(f"Internal error: {failure_.value}")
+        self.responseCode(
+            request,
+            http.INTERNAL_SERVER_ERROR,
+            f"internal error: {failure_.value}"
+        )
+        request.finish()
+
     async def webfinger(self, request):
         url_parsed = parse.urlparse(request.uri.decode())
         query = parse.parse_qs(url_parsed.query)
@@ -97,6 +108,45 @@
         request.write(json.dumps(resp).encode())
         request.finish()
 
+    async def handleUndoActivity(
+        self,
+        request: "HTTPRequest",
+        data: dict,
+        account_jid: jid.JID,
+        node: Optional[str],
+        ap_account: str,
+        ap_url: str,
+        signing_actor: str
+    ) -> None:
+        if node is None:
+            node = self.apg._m.namespace
+        client = await self.apg.getVirtualClient(signing_actor)
+        objects = await self.apg.apGetList(data, "object")
+        for obj in objects:
+            type_ = obj.get("type")
+            actor = await self.apg.apGetSenderActor(obj)
+            if actor != signing_actor:
+                log.warning(f"ignoring object not attributed to signing actor: {data}")
+                continue
+            try:
+                target_account = obj["object"]
+            except KeyError:
+                log.warning(f'ignoring invalid object, missing "object" key: {data}')
+                continue
+            if not self.apg.isLocalURL(target_account):
+                log.warning(f"ignoring unfollow request to non local actor: {data}")
+                continue
+
+            if type_ == "Follow":
+                await self.apg._p.unsubscribe(
+                    client,
+                    account_jid,
+                    node,
+                    sender=client.jid,
+                )
+            else:
+                log.warning(f"Unmanaged undo type: {type_!r}")
+
     async def handleFollowActivity(
         self,
         request: "HTTPRequest",
@@ -114,7 +164,9 @@
             subscription = await self.apg._p.subscribe(
                 client,
                 account_jid,
-                node
+                node,
+                # subscriptions from AP are always public
+                options=self.apg._pps.setPublicOpt()
             )
         except pubsub.SubscriptionPending:
             log.info(f"subscription to node {node!r} of {account_jid} is pending")
@@ -129,6 +181,7 @@
                 "Accept", actor_id, object_=data
             )
             await self.apg.signAndPost(inbox, actor_id, accept_data)
+        await self.apg._c.synchronise(client, account_jid, node, resync=False)
 
     async def handleAcceptActivity(
         self,
@@ -216,9 +269,11 @@
         actor_url: str,
         signing_actor: Optional[str]
     ) -> dict:
-        inbox_url = self.apg.buildAPURL(TYPE_INBOX, ap_account)
+        inbox = self.apg.buildAPURL(TYPE_INBOX, ap_account)
         shared_inbox = self.apg.buildAPURL(TYPE_SHARED_INBOX)
-        outbox_url = self.apg.buildAPURL(TYPE_OUTBOX, ap_account)
+        outbox = self.apg.buildAPURL(TYPE_OUTBOX, ap_account)
+        followers = self.apg.buildAPURL(TYPE_FOLLOWERS, ap_account)
+        following = self.apg.buildAPURL(TYPE_FOLLOWING, ap_account)
 
         # we have to use AP account as preferredUsername because it is used to retrieve
         # actor handle (see https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196)
@@ -232,8 +287,10 @@
             "id": actor_url,
             "type": "Person",
             "preferredUsername": preferred_username,
-            "inbox": inbox_url,
-            "outbox": outbox_url,
+            "inbox": inbox,
+            "outbox": outbox,
+            "followers": followers,
+            "following": following,
             "publicKey": {
                 "id": f"{actor_url}#main-key",
                 "owner": actor_url,
@@ -280,6 +337,8 @@
         ap_url: str,
         query_data: Dict[str, List[str]]
     ) -> dict:
+        if node is None:
+            node = self.apg._m.namespace
         # we only keep useful keys, and sort to have consistent URL which can
         # be used as ID
         url_keys = sorted(set(query_data) & {"page", "index", "before", "after"})
@@ -443,6 +502,83 @@
                 request, data, account_jid, node, ap_account, ap_url, signing_actor
             )
 
+    async def APFollowersRequest(
+        self,
+        request: "HTTPRequest",
+        account_jid: jid.JID,
+        node: Optional[str],
+        ap_account: Optional[str],
+        ap_url: str,
+        signing_actor: Optional[str]
+    ) -> dict:
+        if node is None:
+            node = self.apg._m.namespace
+        client = self.apg.client
+        subscribers = await self.apg._pps.getPublicNodeSubscriptions(
+            client, account_jid, node
+        )
+        followers = []
+        for subscriber in subscribers.keys():
+            if subscriber.host == self.apg.client.jid.userhost():
+                # the subscriber is an AP user subscribed with this gateway
+                ap_account = self.apg._e.unescape(subscriber.user)
+            else:
+                # regular XMPP user
+                ap_account = await self.apg.getAPAccountFromJidAndNode(subscriber, node)
+            followers.append(ap_account)
+
+        url = self.getCanonicalURL(request)
+        return {
+          "@context": "https://www.w3.org/ns/activitystreams",
+          "type": "OrderedCollection",
+          "id": url,
+          "totalItems": len(subscribers),
+          "first": {
+            "type": "OrderedCollectionPage",
+            "id": url,
+            "orderedItems": followers
+          }
+        }
+
+    async def APFollowingRequest(
+        self,
+        request: "HTTPRequest",
+        account_jid: jid.JID,
+        node: Optional[str],
+        ap_account: Optional[str],
+        ap_url: str,
+        signing_actor: Optional[str]
+    ) -> dict[str, Any]:
+        client = self.apg.client
+        subscriptions = await self.apg._pps.subscriptions(
+            client, account_jid, node
+        )
+        following = []
+        for sub_dict in subscriptions:
+            service = jid.JID(sub_dict["service"])
+            if service.host == self.apg.client.jid.userhost():
+                # the subscription is to an AP actor with this gateway
+                ap_account = self.apg._e.unescape(service.user)
+            else:
+                # regular XMPP user
+                ap_account = await self.apg.getAPAccountFromJidAndNode(
+                    service, sub_dict["node"]
+                )
+            following.append(ap_account)
+
+        url = self.getCanonicalURL(request)
+        return {
+          "@context": "https://www.w3.org/ns/activitystreams",
+          "type": "OrderedCollection",
+          "id": url,
+          "totalItems": len(subscriptions),
+          "first": {
+            "type": "OrderedCollectionPage",
+            "id": url,
+            "orderedItems": following
+          }
+        }
+
     async def APRequest(
         self,
         request: "HTTPRequest",
@@ -490,7 +626,10 @@
             request.finish()
             return
 
-        return await self.APRequest(request, signing_actor)
+        try:
+            return await self.APRequest(request, signing_actor)
+        except Exception as e:
+            self._onRequestError(failure.Failure(e), request)
 
     async def checkSigningActor(self, data: dict, signing_actor: str) -> None:
         """That that signing actor correspond to actor declared in data
@@ -663,7 +802,8 @@
             defer.ensureDeferred(self.webfinger(request))
             return server.NOT_DONE_YET
         elif path.startswith(self.apg.ap_path):
-            defer.ensureDeferred(self.APRequest(request))
+            d = defer.ensureDeferred(self.APRequest(request))
+            d.addErrback(self._onRequestError, request)
             return server.NOT_DONE_YET
 
         return web_resource.NoResource().render(request)