Mercurial > libervia-backend
view sat/plugins/plugin_comp_ap_gateway/pubsub_service.py @ 3745:a8c7e5cef0cb
comp AP gateway: signature checking, caching and threads management:
- HTTP signature is checked for incoming messages
- AP actor can now be followed using pubsub subscription. When following is accepted, the
node is cached
- replies to posts are put in cached pubsub comment nodes, with a `comments_max_depth`
option to limit the number of comment nodes for a root message (documentation will come
to explain this).
ticket 364
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 22 Mar 2022 17:00:42 +0100 |
parents | 86eea17cafa7 |
children | 125c7043b277 |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia ActivityPub Gateway
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from typing import Optional, Tuple, List, Dict, Any

from twisted.internet import defer
from twisted.words.protocols.jabber import jid, error
from twisted.words.xish import domish
from wokkel import rsm, pubsub, data_form

from sat.core.i18n import _
from sat.core import exceptions
from sat.core.log import getLogger
from sat.core.constants import Const as C
from sat.tools.utils import ensure_deferred
from sat.memory.sqla_mapping import PubsubSub, SubscriptionState

from .constants import (
    TYPE_ACTOR,
)


log = getLogger(__name__)

# all nodes have the same config
NODE_CONFIG = [
    {"var": "pubsub#persist_items", "type": "boolean", "value": True},
    {"var": "pubsub#max_items", "value": "max"},
    {"var": "pubsub#access_model", "type": "list-single", "value": "open"},
    {"var": "pubsub#publish_model", "type": "list-single", "value": "open"},
]

# option name => configured value
NODE_CONFIG_VALUES = {c["var"]: c["value"] for c in NODE_CONFIG}
# option name => remaining option fields (everything except "var" and "value")
NODE_OPTIONS = {
    c["var"]: {k: v for k, v in c.items() if k not in ("var", "value")}
    for c in NODE_CONFIG
}


class APPubsubService(rsm.PubSubService):
    """Pubsub service for XMPP requests"""

    def __init__(self, apg):
        """
        @param apg: AP gateway plugin instance; its ``host`` attribute is kept as
            ``self.host``
        """
        super().__init__()
        self.host = apg.host
        self.apg = apg
        self.discoIdentity = {
            "category": "pubsub",
            "type": "service",
            "name": "Libervia ActivityPub Gateway",
        }

    async def getAPActorIdsAndInbox(
        self,
        requestor: jid.JID,
        recipient: jid.JID,
    ) -> Tuple[str, str, str]:
        """Get AP actor IDs from requestor and destinee JIDs

        @param requestor: XMPP entity doing a request to an AP actor via the gateway
        @param recipient: JID mapping an AP actor via the gateway
        @return: requestor actor ID, recipient actor ID and recipient inbox
        @raise error.StanzaError: "item-not-found" is raised if no user part is
            specified in recipient
        """
        if not recipient.user:
            raise error.StanzaError(
                "item-not-found",
                text="No user part specified"
            )
        requestor_actor_id = self.apg.buildAPURL(TYPE_ACTOR, requestor.userhost())
        recipient_account = self.apg._e.unescape(recipient.user)
        recipient_actor_id = await self.apg.getAPActorIdFromAccount(recipient_account)
        inbox = await self.apg.getAPInboxFromId(recipient_actor_id)
        return requestor_actor_id, recipient_actor_id, inbox

    @ensure_deferred
    async def publish(self, requestor, service, nodeIdentifier, items):
        """Publishing is not implemented

        @raise NotImplementedError: always
        """
        raise NotImplementedError

    @ensure_deferred
    async def items(
        self,
        requestor: jid.JID,
        service: jid.JID,
        node: str,
        maxItems: Optional[int],
        itemIdentifiers: Optional[List[str]],
        rsm_req: Optional[rsm.RSMRequest]
    ) -> Tuple[List[domish.Element], Optional[rsm.RSMResponse]]:
        """Retrieve items of a node, from cache if available or from AP otherwise

        @param requestor: entity requesting the items
        @param service: JID of the service; its user part is the escaped AP account
        @param node: requested node, must start with ``self.apg._m.namespace``
        @param maxItems: maximum number of items to return (defaults to 20 when
            neither this nor ``rsm_req`` is set and the node is not cached)
        @param itemIdentifiers: IDs of specific items to retrieve
        @param rsm_req: RSM request, only one of ``after``, ``before`` and ``index``
            may be used
        @return: items and RSM response (None if there is no RSM metadata).
            An empty result is returned if ``service`` has no user part or if the AP
            account is invalid.
        @raise error.StanzaError: one of
            - "feature-not-implemented" if the node namespace is not supported
            - "bad-request" if several of ``after``, ``before`` and ``index`` are used
            - "item-not-found" if no collection can be found for the node
        """
        if not service.user:
            return [], None
        ap_account = self.host.plugins["XEP-0106"].unescape(service.user)
        if ap_account.count("@") != 1:
            log.warning(f"Invalid AP account used by {requestor}: {ap_account!r}")
            return [], None
        if not node.startswith(self.apg._m.namespace):
            raise error.StanzaError(
                "feature-not-implemented",
                text=f"AP Gateway {C.APP_VERSION} only supports {self.apg._m.namespace} "
                "node for now"
            )
        client = self.apg.client
        cached_node = await self.host.memory.storage.getPubsubNode(
            client, service, node
        )
        # TODO: check if node is synchronised
        if cached_node is not None:
            # the node is cached, we return items from cache
            log.debug(f"node {node!r} from {service} is in cache")
            pubsub_items, metadata = await self.apg._c.getItemsFromCache(
                client, cached_node, maxItems, itemIdentifiers, rsm_request=rsm_req
            )
            # only the lookup is guarded, so that a KeyError raised while building
            # the response is not silently swallowed
            try:
                rsm_kwargs = metadata["rsm"]
            except KeyError:
                rsm_resp = None
            else:
                rsm_resp = rsm.RSMResponse(**rsm_kwargs)
            return [i.data for i in pubsub_items], rsm_resp

        if itemIdentifiers:
            # specific items are requested, we retrieve them one by one
            items = []
            for item_id in itemIdentifiers:
                item_data = await self.apg.apGet(item_id)
                item_elt = await self.apg.apItem2Elt(item_data)
                items.append(item_elt)
            return items, None

        if rsm_req is None:
            if maxItems is None:
                maxItems = 20
            kwargs = {
                "max_items": maxItems,
                "chronological_pagination": False,
            }
        else:
            used_anchors = sum(
                v is not None
                for v in (rsm_req.after, rsm_req.before, rsm_req.index)
            )
            if used_anchors > 1:
                raise error.StanzaError(
                    "bad-request",
                    text="You can't use after, before and index at the same time"
                )
            kwargs = {"max_items": rsm_req.max}
            if rsm_req.after is not None:
                kwargs["after_id"] = rsm_req.after
            elif rsm_req.before is not None:
                kwargs["chronological_pagination"] = False
                # an empty "before" is used without anchor ID (presumably the
                # XEP-0059 "last page" request)
                if rsm_req.before != "":
                    kwargs["after_id"] = rsm_req.before
            elif rsm_req.index is not None:
                kwargs["start_index"] = rsm_req.index

        log.info(
            f"No cache found for node {node} at {service} (AP account {ap_account}), "
            "using Collection Paging to RSM translation"
        )
        if self.apg._m.isCommentsNode(node):
            parent_node = self.apg._m.getParentNode(node)
            try:
                parent_data = await self.apg.apGet(parent_node)
                collection = await self.apg.apGetObject(
                    parent_data.get("object", {}),
                    "replies"
                )
            except Exception as e:
                # StanzaError text must be a string to be serialised in the stanza
                raise error.StanzaError(
                    "item-not-found",
                    text=str(e)
                ) from e
        else:
            actor_data = await self.apg.getAPActorDataFromAccount(ap_account)
            collection = await self.apg.apGetObject(actor_data, "outbox")
        if not collection:
            raise error.StanzaError(
                "item-not-found",
                text=f"No collection found for node {node!r} (account: {ap_account})"
            )
        return await self.apg.getAPItems(collection, **kwargs)

    @ensure_deferred
    async def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
        """Retracting is not implemented

        @raise NotImplementedError: always
        """
        raise NotImplementedError

    @ensure_deferred
    async def subscribe(self, requestor, service, nodeIdentifier, subscriber):
        """Subscribe to a node by sending a "Follow" activity to the AP actor

        The node is created in storage if it doesn't exist yet, and a pending
        subscription is stored for the requestor bare JID if there is none.

        @param requestor: entity requesting the subscription
        @param service: JID mapping the AP actor to follow
        @param nodeIdentifier: node to subscribe to
        @param subscriber: unused here
        @return: subscription to ``nodeIdentifier`` for ``requestor``, in
            "subscribed" state
        @raise exceptions.InternalError: the stored subscription has an unmanaged
            state
        @raise error.StanzaError: "service-unavailable" if posting the activity
            returns an HTTP code >= 400
        """
        # TODO: handle comments nodes
        client = self.apg.client
        requestor_bare = requestor.userhostJID()
        node = await self.host.memory.storage.getPubsubNode(
            client, service, nodeIdentifier, with_subscriptions=True
        )
        if node is None:
            node = await self.host.memory.storage.setPubsubNode(
                client,
                service,
                nodeIdentifier,
            )
            subscription = None
        else:
            subscription = next(
                (s for s in node.subscriptions if s.subscriber == requestor_bare),
                None
            )

        if subscription is None:
            subscription = PubsubSub(
                subscriber=requestor_bare,
                state=SubscriptionState.PENDING
            )
            node.subscriptions.append(subscription)
            await self.host.memory.storage.add(node)
        elif subscription.state is None:
            subscription.state = SubscriptionState.PENDING
            await self.host.memory.storage.add(node)
        elif subscription.state == SubscriptionState.SUBSCRIBED:
            log.info(
                f"{requestor.userhostJID()} has already a subscription to {node!r} "
                f"at {service}. Doing the request anyway."
            )
        elif subscription.state == SubscriptionState.PENDING:
            log.info(
                f"{requestor.userhostJID()} has already a pending subscription to "
                f"{node!r} at {service}. Doing the request anyway."
            )
        else:
            raise exceptions.InternalError(
                f"unmanaged subscription state: {subscription.state}"
            )

        req_actor_id, recip_actor_id, inbox = await self.getAPActorIdsAndInbox(
            requestor, service
        )

        data = self.apg.createActivity("Follow", req_actor_id, recip_actor_id)

        resp = await self.apg.signAndPost(inbox, req_actor_id, data)
        if resp.code >= 400:
            text = await resp.text()
            raise error.StanzaError("service-unavailable", text=text)
        return pubsub.Subscription(nodeIdentifier, requestor, "subscribed")

    @ensure_deferred
    async def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
        """Unsubscribe by sending an "Undo" of the "Follow" activity to the AP actor

        @param requestor: entity requesting the unsubscription
        @param service: JID mapping the AP actor to unfollow
        @param nodeIdentifier: unused here
        @param subscriber: unused here
        @raise error.StanzaError: "service-unavailable" if posting the activity
            returns an HTTP code >= 400
        """
        req_actor_id, recip_actor_id, inbox = await self.getAPActorIdsAndInbox(
            requestor, service
        )
        data = self.apg.createActivity(
            "Undo",
            req_actor_id,
            self.apg.createActivity(
                "Follow",
                req_actor_id,
                recip_actor_id
            )
        )

        resp = await self.apg.signAndPost(inbox, req_actor_id, data)
        if resp.code >= 400:
            text = await resp.text()
            raise error.StanzaError("service-unavailable", text=text)

    def getConfigurationOptions(self):
        """Return the options shared by all nodes (see NODE_OPTIONS)"""
        return NODE_OPTIONS

    def getConfiguration(
        self,
        requestor: jid.JID,
        service: jid.JID,
        nodeIdentifier: str
    ) -> defer.Deferred:
        """Return the configuration values, which are the same for every node

        @return: Deferred already fired with NODE_CONFIG_VALUES
        """
        return defer.succeed(NODE_CONFIG_VALUES)

    def getNodeInfo(
        self,
        requestor: jid.JID,
        service: jid.JID,
        nodeIdentifier: str,
        pep: bool = False,
        recipient: Optional[jid.JID] = None
    ) -> Optional[dict]:
        """Return node type and metadata

        @param nodeIdentifier: node to get info for
        @return: None if ``nodeIdentifier`` is empty, otherwise a dict describing a
            "leaf" node with NODE_CONFIG as "meta-data"
        """
        if not nodeIdentifier:
            return None
        info = {
            "type": "leaf",
            "meta-data": NODE_CONFIG
        }
        return info