Mercurial > libervia-backend
diff sat/plugins/plugin_comp_ap_gateway/http_server.py @ 3764:125c7043b277
comp AP gateway: publish, (un)subscribe/(un)follow, public subscription/following/followers:
this patch implements those major features:
- `publish` is implemented on virtual pubsub service, thus XMPP entities can now publish
to AP using this service
- replies to XMPP items are managed
- `inReplyTo` is filled when converting XMPP items to AP objects
- `follow` and `unfollow` (actually an `undo` activity) are implemented and mapped to
XMPP's (un)subscribe. On subscription, AP actor's `outbox` collection is converted to
XMPP and put in cache. Subscriptions are always public.
- `following` and `followers` collections are mapped to XMPP's Public Pubsub Subscription
(which should be XEP-0465, but the XEP is not yet published at the time of commit), in
both directions.
- new helper methods to check if an URL is local and to get JID from actor ID
doc will follow to explain behaviour
rel 365
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 13 May 2022 19:12:33 +0200 |
parents | a8c7e5cef0cb |
children | b5c9021020df |
line wrap: on
line diff
--- a/sat/plugins/plugin_comp_ap_gateway/http_server.py Fri May 13 18:50:33 2022 +0200 +++ b/sat/plugins/plugin_comp_ap_gateway/http_server.py Fri May 13 19:12:33 2022 +0200 @@ -17,7 +17,7 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. import time -from typing import Optional, Dict, List +from typing import Optional, Dict, List, Any import json from urllib import parse from collections import deque @@ -25,6 +25,7 @@ from pprint import pformat from twisted.web import http, resource as web_resource, server +from twisted.python import failure from twisted.internet import reactor, defer from twisted.words.protocols.jabber import jid, error from wokkel import pubsub, rsm @@ -33,13 +34,14 @@ from sat.core.constants import Const as C from sat.core.i18n import _ from sat.core.log import getLogger +from sat.tools import utils from sat.tools.common import date_utils from sat.memory.sqla_mapping import SubscriptionState from .constants import ( CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_SHARED_INBOX, TYPE_OUTBOX, AP_REQUEST_TYPES, PAGE_SIZE, ACTIVITY_TYPES_LOWER, ACTIVIY_NO_ACCOUNT_ALLOWED, - SIGN_HEADERS, HS2019, SIGN_EXP + SIGN_HEADERS, HS2019, SIGN_EXP, TYPE_FOLLOWERS, TYPE_FOLLOWING ) from .regex import RE_SIG_PARAM @@ -71,6 +73,15 @@ log.warning(msg) request.setResponseCode(http_code, None if msg is None else msg.encode()) + def _onRequestError(self, failure_: failure.Failure, request: "HTTPRequest") -> None: + log.error(f"Internal error: {failure_.value}") + self.responseCode( + request, + http.INTERNAL_SERVER_ERROR, + f"internal error: {failure_.value}" + ) + request.finish() + async def webfinger(self, request): url_parsed = parse.urlparse(request.uri.decode()) query = parse.parse_qs(url_parsed.query) @@ -97,6 +108,45 @@ request.write(json.dumps(resp).encode()) request.finish() + async def handleUndoActivity( + self, + request: "HTTPRequest", + data: dict, + account_jid: jid.JID, + node: Optional[str], + ap_account: str, + ap_url: str, + signing_actor: str + ) -> None: + if node is None: + node = self.apg._m.namespace + client = await self.apg.getVirtualClient(signing_actor) + objects = await self.apg.apGetList(data, "object") + for obj in objects: + type_ = obj.get("type") + actor = await self.apg.apGetSenderActor(obj) + if actor != signing_actor: + log.warning(f"ignoring object not attributed to signing actor: {data}") + continue + try: + target_account = obj["object"] + except KeyError: + log.warning(f'ignoring invalid object, missing "object" key: {data}') + continue + if not self.apg.isLocalURL(target_account): + log.warning(f"ignoring unfollow request to non local actor: {data}") + continue + + if type_ == "Follow": + await self.apg._p.unsubscribe( + client, + account_jid, + node, + sender=client.jid, + ) + else: + log.warning(f"Unmanaged undo type: {type_!r}") + async def handleFollowActivity( self, request: "HTTPRequest", @@ -114,7 +164,9 @@ subscription = await self.apg._p.subscribe( client, account_jid, - node + node, + # subscriptions from AP are always public + options=self.apg._pps.setPublicOpt() ) except pubsub.SubscriptionPending: log.info(f"subscription to node {node!r} of {account_jid} is pending") @@ -129,6 +181,7 @@ "Accept", actor_id, object_=data ) await self.apg.signAndPost(inbox, actor_id, accept_data) + await self.apg._c.synchronise(client, account_jid, node, resync=False) async def handleAcceptActivity( self, @@ -216,9 +269,11 @@ actor_url: str, signing_actor: Optional[str] ) -> dict: - inbox_url = self.apg.buildAPURL(TYPE_INBOX, ap_account) + inbox = self.apg.buildAPURL(TYPE_INBOX, ap_account) shared_inbox = self.apg.buildAPURL(TYPE_SHARED_INBOX) - outbox_url = self.apg.buildAPURL(TYPE_OUTBOX, ap_account) + outbox = self.apg.buildAPURL(TYPE_OUTBOX, ap_account) + followers = self.apg.buildAPURL(TYPE_FOLLOWERS, ap_account) + following = self.apg.buildAPURL(TYPE_FOLLOWING, ap_account) # we have to use AP account as preferredUsername because it is used to retrieve # actor handle (see https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196) @@ -232,8 +287,10 @@ "id": actor_url, "type": "Person", "preferredUsername": preferred_username, - "inbox": inbox_url, - "outbox": outbox_url, + "inbox": inbox, + "outbox": outbox, + "followers": followers, + "following": following, "publicKey": { "id": f"{actor_url}#main-key", "owner": actor_url, @@ -280,6 +337,8 @@ ap_url: str, query_data: Dict[str, List[str]] ) -> dict: + if node is None: + node = self.apg._m.namespace # we only keep useful keys, and sort to have consistent URL which can # be used as ID url_keys = sorted(set(query_data) & {"page", "index", "before", "after"}) @@ -443,6 +502,83 @@ request, data, account_jid, node, ap_account, ap_url, signing_actor ) + async def APFollowersRequest( + self, + request: "HTTPRequest", + account_jid: jid.JID, + node: Optional[str], + ap_account: Optional[str], + ap_url: str, + signing_actor: Optional[str] + ) -> dict: + if node is None: + node = self.apg._m.namespace + client = self.apg.client + subscribers = await self.apg._pps.getPublicNodeSubscriptions( + client, account_jid, node + ) + followers = [] + for subscriber in subscribers.keys(): + if subscriber.host == self.apg.client.jid.userhost(): + # the subscriber is an AP user subscribed with this gateway + ap_account = self.apg._e.unescape(subscriber.user) + else: + # regular XMPP user + ap_account = await self.apg.getAPAccountFromJidAndNode(subscriber, node) + followers.append(ap_account) + + url = self.getCanonicalURL(request) + return { + "@context": "https://www.w3.org/ns/activitystreams", + "type": "OrderedCollection", + "id": url, + "totalItems": len(subscribers), + "first": { + "type": "OrderedCollectionPage", + "id": url, + "orderedItems": followers + } + } + + async def APFollowingRequest( + self, + request: "HTTPRequest", + account_jid: jid.JID, + node: Optional[str], + ap_account: Optional[str], + ap_url: str, + signing_actor: Optional[str] + ) -> dict[str, Any]: + client = self.apg.client + subscriptions = await self.apg._pps.subscriptions( + client, account_jid, node + ) + following = [] + for sub_dict in subscriptions: + service = jid.JID(sub_dict["service"]) + if service.host == self.apg.client.jid.userhost(): + # the subscription is to an AP actor with this gateway + ap_account = self.apg._e.unescape(service.user) + else: + # regular XMPP user + ap_account = await self.apg.getAPAccountFromJidAndNode( + service, sub_dict["node"] + ) + following.append(ap_account) + + url = self.getCanonicalURL(request) + return { + "@context": "https://www.w3.org/ns/activitystreams", + "type": "OrderedCollection", + "id": url, + "totalItems": len(subscriptions), + "first": { + "type": "OrderedCollectionPage", + "id": url, + "orderedItems": following + } + } + async def APRequest( self, request: "HTTPRequest", @@ -490,7 +626,10 @@ request.finish() return - return await self.APRequest(request, signing_actor) + try: + return await self.APRequest(request, signing_actor) + except Exception as e: + self._onRequestError(failure.Failure(e), request) async def checkSigningActor(self, data: dict, signing_actor: str) -> None: """That that signing actor correspond to actor declared in data @@ -663,7 +802,8 @@ defer.ensureDeferred(self.webfinger(request)) return server.NOT_DONE_YET elif path.startswith(self.apg.ap_path): - defer.ensureDeferred(self.APRequest(request)) + d = defer.ensureDeferred(self.APRequest(request)) + d.addErrback(self._onRequestError, request) return server.NOT_DONE_YET return web_resource.NoResource().render(request)