Mercurial > libervia-backend
view sat/plugins/plugin_comp_ap_gateway/pubsub_service.py @ 3808:39fc2e1b3793
tests (unit/ap gateway): fix `onMessage` call following change in the component:
the component now uses the normal message workflow, thus the tests had to be updated
rel 367
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 17 Jun 2022 15:50:34 +0200 |
parents | 865167c34b82 |
children | 6329ee6b6df4 |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia ActivityPub Gateway # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from typing import Optional, Tuple, List, Union from twisted.internet import defer from twisted.words.protocols.jabber import jid, error from twisted.words.xish import domish from wokkel import rsm, pubsub, disco from sat.core.i18n import _ from sat.core import exceptions from sat.core.log import getLogger from sat.core.constants import Const as C from sat.tools.utils import ensure_deferred from sat.memory.sqla_mapping import PubsubSub, SubscriptionState from .constants import ( TYPE_ACTOR, ) log = getLogger(__name__) # all nodes have the same config NODE_CONFIG = [ {"var": "pubsub#persist_items", "type": "boolean", "value": True}, {"var": "pubsub#max_items", "value": "max"}, {"var": "pubsub#access_model", "type": "list-single", "value": "open"}, {"var": "pubsub#publish_model", "type": "list-single", "value": "open"}, ] NODE_CONFIG_VALUES = {c["var"]: c["value"] for c in NODE_CONFIG} NODE_OPTIONS = {c["var"]: {} for c in NODE_CONFIG} for c in NODE_CONFIG: NODE_OPTIONS[c["var"]].update({k:v for k,v in c.items() if k not in ("var", "value")}) class APPubsubService(rsm.PubSubService): """Pubsub service for XMPP requests""" def __init__(self, apg): super(APPubsubService, self).__init__() self.host = apg.host self.apg = apg self.discoIdentity = { "category": "pubsub", "type": "service", "name": "Libervia ActivityPub Gateway", } async def getAPActorIdsAndInbox( self, requestor: jid.JID, recipient: jid.JID, ) -> Tuple[str, str, str]: """Get AP actor IDs from requestor and destinee JIDs @param requestor: XMPP entity doing a request to an AP actor via the gateway @param recipient: JID mapping an AP actor via the gateway @return: requestor actor ID, recipient actor ID and recipient inbox @raise error.StanzaError: "item-not-found" is raised if not user part is specified in requestor """ if not recipient.user: raise error.StanzaError( "item-not-found", text="No user part specified" ) requestor_actor_id = self.apg.buildAPURL(TYPE_ACTOR, requestor.userhost()) recipient_account = self.apg._e.unescape(recipient.user) recipient_actor_id = await self.apg.getAPActorIdFromAccount(recipient_account) inbox = await self.apg.getAPInboxFromId(recipient_actor_id) return requestor_actor_id, recipient_actor_id, inbox @ensure_deferred async def publish(self, requestor, service, nodeIdentifier, items): if self.apg.local_only and not self.apg.isLocal(requestor): raise error.StanzaError( "forbidden", "Only local users can publish on this gateway." ) if not service.user: raise error.StanzaError( "bad-request", "You must specify an ActivityPub actor account in JID user part." ) ap_account = self.apg._e.unescape(service.user) if ap_account.count("@") != 1: raise error.StanzaError( "bad-request", f"{ap_account!r} is not a valid ActivityPub actor account." ) client = self.apg.client.getVirtualClient(requestor) await self.apg.convertAndPostItems( client, ap_account, service, nodeIdentifier, items ) async def apFollowing2Elt(self, ap_item: dict) -> domish.Element: """Convert actor ID from following collection to XMPP item""" actor_id = ap_item["id"] actor_jid = await self.apg.getJIDFromId(actor_id) subscription_elt = self.apg._pps.buildSubscriptionElt( self.apg._m.namespace, actor_jid ) item_elt = pubsub.Item(id=actor_id, payload=subscription_elt) return item_elt async def apFollower2Elt(self, ap_item: dict) -> domish.Element: """Convert actor ID from followers collection to XMPP item""" actor_id = ap_item["id"] actor_jid = await self.apg.getJIDFromId(actor_id) subscriber_elt = self.apg._pps.buildSubscriberElt(actor_jid) item_elt = pubsub.Item(id=actor_id, payload=subscriber_elt) return item_elt @ensure_deferred async def items( self, requestor: jid.JID, service: jid.JID, node: str, maxItems: Optional[int], itemIdentifiers: Optional[List[str]], rsm_req: Optional[rsm.RSMRequest] ) -> Tuple[List[domish.Element], Optional[rsm.RSMResponse]]: if not service.user: return [], None ap_account = self.host.plugins["XEP-0106"].unescape(service.user) if ap_account.count("@") != 1: log.warning(f"Invalid AP account used by {requestor}: {ap_account!r}") return [], None kwargs = {} if node == self.apg._pps.subscriptions_node: collection_name = "following" parser = self.apFollowing2Elt kwargs["only_ids"] = True use_cache = False elif node.startswith(self.apg._pps.subscribers_node_prefix): collection_name = "followers" parser = self.apFollower2Elt kwargs["only_ids"] = True use_cache = False else: if not node.startswith(self.apg._m.namespace): raise error.StanzaError( "feature-not-implemented", text=f"AP Gateway {C.APP_VERSION} only supports " f"{self.apg._m.namespace} node for now" ) collection_name = "outbox" parser = self.apg.apItem2Elt use_cache = True client = self.apg.client if use_cache: cached_node = await self.host.memory.storage.getPubsubNode( client, service, node ) # TODO: check if node is synchronised if cached_node is not None: # the node is cached, we return items from cache log.debug(f"node {node!r} from {service} is in cache") pubsub_items, metadata = await self.apg._c.getItemsFromCache( client, cached_node, maxItems, itemIdentifiers, rsm_request=rsm_req ) try: rsm_resp = rsm.RSMResponse(**metadata["rsm"]) except KeyError: rsm_resp = None return [i.data for i in pubsub_items], rsm_resp if itemIdentifiers: items = [] for item_id in itemIdentifiers: item_data = await self.apg.apGet(item_id) item_elt = await parser(item_data) items.append(item_elt) return items, None else: if rsm_req is None: if maxItems is None: maxItems = 20 kwargs.update({ "max_items": maxItems, "chronological_pagination": False, }) else: if len( [v for v in (rsm_req.after, rsm_req.before, rsm_req.index) if v is not None] ) > 1: raise error.StanzaError( "bad-request", text="You can't use after, before and index at the same time" ) kwargs.update({"max_items": rsm_req.max}) if rsm_req.after is not None: kwargs["after_id"] = rsm_req.after elif rsm_req.before is not None: kwargs["chronological_pagination"] = False if rsm_req.before != "": kwargs["after_id"] = rsm_req.before elif rsm_req.index is not None: kwargs["start_index"] = rsm_req.index log.info( f"No cache found for node {node} at {service} (AP account {ap_account}), " "using Collection Paging to RSM translation" ) if self.apg._m.isCommentNode(node): parent_item = self.apg._m.getParentItem(node) try: parent_data = await self.apg.apGet(parent_item) collection = await self.apg.apGetObject( parent_data.get("object", {}), "replies" ) except Exception as e: raise error.StanzaError( "item-not-found", text=e ) else: actor_data = await self.apg.getAPActorDataFromAccount(ap_account) collection = await self.apg.apGetObject(actor_data, collection_name) if not collection: raise error.StanzaError( "item-not-found", text=f"No collection found for node {node!r} (account: {ap_account})" ) kwargs["parser"] = parser return await self.apg.getAPItems(collection, **kwargs) @ensure_deferred async def retract(self, requestor, service, nodeIdentifier, itemIdentifiers): raise error.StanzaError("forbidden") @ensure_deferred async def subscribe(self, requestor, service, nodeIdentifier, subscriber): # TODO: handle comments nodes client = self.apg.client node = await self.host.memory.storage.getPubsubNode( client, service, nodeIdentifier, with_subscriptions=True ) if node is None: node = await self.host.memory.storage.setPubsubNode( client, service, nodeIdentifier, ) subscription = None else: try: subscription = next( s for s in node.subscriptions if s.subscriber == requestor.userhostJID() ) except StopIteration: subscription = None if subscription is None: subscription = PubsubSub( subscriber=requestor.userhostJID(), state=SubscriptionState.PENDING ) node.subscriptions.append(subscription) await self.host.memory.storage.add(node) else: if subscription.state is None: subscription.state = SubscriptionState.PENDING await self.host.memory.storage.add(node) elif subscription.state == SubscriptionState.SUBSCRIBED: log.info( f"{requestor.userhostJID()} has already a subscription to {node!r} " f"at {service}. Doing the request anyway." ) elif subscription.state == SubscriptionState.PENDING: log.info( f"{requestor.userhostJID()} has already a pending subscription to " f"{node!r} at {service}. Doing the request anyway." ) else: raise exceptions.InternalError( f"unmanaged subscription state: {subscription.state}" ) req_actor_id, recip_actor_id, inbox = await self.getAPActorIdsAndInbox( requestor, service ) data = self.apg.createActivity("Follow", req_actor_id, recip_actor_id) resp = await self.apg.signAndPost(inbox, req_actor_id, data) if resp.code >= 400: text = await resp.text() raise error.StanzaError("service-unavailable", text=text) return pubsub.Subscription(nodeIdentifier, requestor, "subscribed") @ensure_deferred async def unsubscribe(self, requestor, service, nodeIdentifier, subscriber): req_actor_id, recip_actor_id, inbox = await self.getAPActorIdsAndInbox( requestor, service ) data = self.apg.createActivity( "Undo", req_actor_id, self.apg.createActivity( "Follow", req_actor_id, recip_actor_id ) ) resp = await self.apg.signAndPost(inbox, req_actor_id, data) if resp.code >= 400: text = await resp.text() raise error.StanzaError("service-unavailable", text=text) def getConfigurationOptions(self): return NODE_OPTIONS def getConfiguration( self, requestor: jid.JID, service: jid.JID, nodeIdentifier: str ) -> defer.Deferred: return defer.succeed(NODE_CONFIG_VALUES) def getNodeInfo( self, requestor: jid.JID, service: jid.JID, nodeIdentifier: str, pep: bool = False, recipient: Optional[jid.JID] = None ) -> Optional[dict]: if not nodeIdentifier: return None info = { "type": "leaf", "meta-data": NODE_CONFIG } return info