Mercurial > libervia-backend
diff sat/plugins/plugin_comp_ap_gateway/pubsub_service.py @ 3745:a8c7e5cef0cb
comp AP gateway: signature checking, caching and threads management:
- HTTP signature is checked for incoming messages
- AP actor can now be followed using pubsub subscription. When following is accepted, the
node is cached
- replies to posts are put in cached pubsub comment nodes, with a `comments_max_depth`
option to limit the number of comment nodes for a root message (documentation will come
to explain this).
ticket 364
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 22 Mar 2022 17:00:42 +0100 |
parents | 86eea17cafa7 |
children | 125c7043b277 |
line wrap: on
line diff
--- a/sat/plugins/plugin_comp_ap_gateway/pubsub_service.py Tue Mar 22 17:00:42 2022 +0100 +++ b/sat/plugins/plugin_comp_ap_gateway/pubsub_service.py Tue Mar 22 17:00:42 2022 +0100 @@ -16,19 +16,41 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -from typing import Optional, List +from typing import Optional, Tuple, List, Dict, Any +from twisted.internet import defer from twisted.words.protocols.jabber import jid, error from twisted.words.xish import domish -from wokkel import rsm +from wokkel import rsm, pubsub, data_form from sat.core.i18n import _ +from sat.core import exceptions from sat.core.log import getLogger +from sat.core.constants import Const as C from sat.tools.utils import ensure_deferred +from sat.memory.sqla_mapping import PubsubSub, SubscriptionState + +from .constants import ( + TYPE_ACTOR, +) log = getLogger(__name__) +# all nodes have the same config +NODE_CONFIG = [ + {"var": "pubsub#persist_items", "type": "boolean", "value": True}, + {"var": "pubsub#max_items", "value": "max"}, + {"var": "pubsub#access_model", "type": "list-single", "value": "open"}, + {"var": "pubsub#publish_model", "type": "list-single", "value": "open"}, + +] + +NODE_CONFIG_VALUES = {c["var"]: c["value"] for c in NODE_CONFIG} +NODE_OPTIONS = {c["var"]: {} for c in NODE_CONFIG} +for c in NODE_CONFIG: + NODE_OPTIONS[c["var"]].update({k:v for k,v in c.items() if k not in ("var", "value")}) + class APPubsubService(rsm.PubSubService): """Pubsub service for XMPP requests""" @@ -43,6 +65,31 @@ "name": "Libervia ActivityPub Gateway", } + async def getAPActorIdsAndInbox( + self, + requestor: jid.JID, + recipient: jid.JID, + ) -> Tuple[str, str, str]: + """Get AP actor IDs from requestor and destinee JIDs + + @param requestor: XMPP entity doing a request to an AP actor via the gateway + @param recipient: JID mapping an AP actor via the gateway + @return: requestor actor ID, recipient actor ID and recipient inbox + @raise error.StanzaError: "item-not-found" is raised if not user part is specified + in requestor + """ + if not recipient.user: + raise error.StanzaError( + "item-not-found", + text="No user part specified" + ) + requestor_actor_id = self.apg.buildAPURL(TYPE_ACTOR, requestor.userhost()) + recipient_account = self.apg._e.unescape(recipient.user) + recipient_actor_id = await self.apg.getAPActorIdFromAccount(recipient_account) + inbox = await self.apg.getAPInboxFromId(recipient_actor_id) + return requestor_actor_id, recipient_actor_id, inbox + + @ensure_deferred async def publish(self, requestor, service, nodeIdentifier, items): raise NotImplementedError @@ -56,55 +103,193 @@ maxItems: Optional[int], itemIdentifiers: Optional[List[str]], rsm_req: Optional[rsm.RSMRequest] - ) -> List[domish.Element]: + ) -> Tuple[List[domish.Element], Optional[rsm.RSMResponse]]: if not service.user: - return [] + return [], None ap_account = self.host.plugins["XEP-0106"].unescape(service.user) if ap_account.count("@") != 1: log.warning(f"Invalid AP account used by {requestor}: {ap_account!r}") - return [] - if node != self.apg._m.namespace: + return [], None + if not node.startswith(self.apg._m.namespace): raise error.StanzaError( "feature-not-implemented", - text=f"{VERSION} only supports {self.apg._m.namespace} " + text=f"AP Gateway {C.APP_VERSION} only supports {self.apg._m.namespace} " "node for now" ) - if rsm_req is None: - if maxItems is None: - maxItems = 20 - kwargs = { - "max_items": maxItems, - "chronological_pagination": False, - } + client = self.apg.client + cached_node = await self.host.memory.storage.getPubsubNode( + client, service, node + ) + # TODO: check if node is synchronised + if cached_node is not None: + # the node is cached, we return items from cache + log.debug(f"node {node!r} from {service} is in cache") + pubsub_items, metadata = await self.apg._c.getItemsFromCache( + client, cached_node, maxItems, itemIdentifiers, rsm_request=rsm_req + ) + try: + rsm_resp = rsm.RSMResponse(**metadata["rsm"]) + except KeyError: + rsm_resp = None + return [i.data for i in pubsub_items], rsm_resp + + if itemIdentifiers: + items = [] + for item_id in itemIdentifiers: + item_data = await self.apg.apGet(item_id) + item_elt = await self.apg.apItem2Elt(item_data) + items.append(item_elt) + return items, None else: - if len( - [v for v in (rsm_req.after, rsm_req.before, rsm_req.index) - if v is not None] - ) > 1: + if rsm_req is None: + if maxItems is None: + maxItems = 20 + kwargs = { + "max_items": maxItems, + "chronological_pagination": False, + } + else: + if len( + [v for v in (rsm_req.after, rsm_req.before, rsm_req.index) + if v is not None] + ) > 1: + raise error.StanzaError( + "bad-request", + text="You can't use after, before and index at the same time" + ) + kwargs = {"max_items": rsm_req.max} + if rsm_req.after is not None: + kwargs["after_id"] = rsm_req.after + elif rsm_req.before is not None: + kwargs["chronological_pagination"] = False + if rsm_req.before != "": + kwargs["after_id"] = rsm_req.before + elif rsm_req.index is not None: + kwargs["start_index"] = rsm_req.index + + log.info( + f"No cache found for node {node} at {service} (AP account {ap_account}), " + "using Collection Paging to RSM translation" + ) + if self.apg._m.isCommentsNode(node): + parent_node = self.apg._m.getParentNode(node) + try: + parent_data = await self.apg.apGet(parent_node) + collection = await self.apg.apGetObject( + parent_data.get("object", {}), + "replies" + ) + except Exception as e: + raise error.StanzaError( + "item-not-found", + text=e + ) + else: + actor_data = await self.apg.getAPActorDataFromAccount(ap_account) + collection = await self.apg.apGetObject(actor_data, "outbox") + if not collection: raise error.StanzaError( - "bad-request", - text="You can't use after, before and index at the same time" + "item-not-found", + text=f"No collection found for node {node!r} (account: {ap_account})" ) - kwargs = {"max_items": rsm_req.max} - if rsm_req.after is not None: - kwargs["after_id"] = rsm_req.after - elif rsm_req.before is not None: - kwargs["chronological_pagination"] = False - if rsm_req.before != "": - kwargs["after_id"] = rsm_req.before - elif rsm_req.index is not None: - kwargs["start_index"] = rsm_req.index - - log.info( - f"No cache found for node {node} at {service} (AP account {ap_account}), " - "using Collection Paging to RSM translation" - ) - return await self.apg.getAPItems(ap_account, **kwargs) + return await self.apg.getAPItems(collection, **kwargs) @ensure_deferred async def retract(self, requestor, service, nodeIdentifier, itemIdentifiers): raise NotImplementedError + @ensure_deferred + async def subscribe(self, requestor, service, nodeIdentifier, subscriber): + # TODO: handle comments nodes + client = self.apg.client + node = await self.host.memory.storage.getPubsubNode( + client, service, nodeIdentifier, with_subscriptions=True + ) + if node is None: + node = await self.host.memory.storage.setPubsubNode( + client, + service, + nodeIdentifier, + ) + subscription = None + else: + try: + subscription = next( + s for s in node.subscriptions + if s.subscriber == requestor.userhostJID() + ) + except StopIteration: + subscription = None + + if subscription is None: + subscription = PubsubSub( + subscriber=requestor.userhostJID(), + state=SubscriptionState.PENDING + ) + node.subscriptions.append(subscription) + await self.host.memory.storage.add(node) + else: + if subscription.state is None: + subscription.state = SubscriptionState.PENDING + await self.host.memory.storage.add(node) + elif subscription.state == SubscriptionState.SUBSCRIBED: + log.info( + f"{requestor.userhostJID()} has already a subscription to {node!r} " + f"at {service}. Doing the request anyway." + ) + elif subscription.state == SubscriptionState.PENDING: + log.info( + f"{requestor.userhostJID()} has already a pending subscription to " + f"{node!r} at {service}. Doing the request anyway." + ) + else: + raise exceptions.InternalError( + f"unmanaged subscription state: {subscription.state}" + ) + + req_actor_id, recip_actor_id, inbox = await self.getAPActorIdsAndInbox( + requestor, service + ) + + data = self.apg.createActivity("Follow", req_actor_id, recip_actor_id) + + resp = await self.apg.signAndPost(inbox, req_actor_id, data) + if resp.code >= 400: + text = await resp.text() + raise error.StanzaError("service-unavailable", text=text) + return pubsub.Subscription(nodeIdentifier, requestor, "subscribed") + + @ensure_deferred + async def unsubscribe(self, requestor, service, nodeIdentifier, subscriber): + req_actor_id, recip_actor_id, inbox = await self.getAPActorIdsAndInbox( + requestor, service + ) + data = self.apg.createActivity( + "Undo", + req_actor_id, + self.apg.createActivity( + "Follow", + req_actor_id, + recip_actor_id + ) + ) + + resp = await self.apg.signAndPost(inbox, req_actor_id, data) + if resp.code >= 400: + text = await resp.text() + raise error.StanzaError("service-unavailable", text=text) + + def getConfigurationOptions(self): + return NODE_OPTIONS + + def getConfiguration( + self, + requestor: jid.JID, + service: jid.JID, + nodeIdentifier: str + ) -> defer.Deferred: + return defer.succeed(NODE_CONFIG_VALUES) + def getNodeInfo( self, requestor: jid.JID, @@ -117,13 +302,6 @@ return None info = { "type": "leaf", - "meta-data": [ - {"var": "pubsub#persist_items", "type": "boolean", "value": True}, - {"var": "pubsub#max_items", "value": "max"}, - {"var": "pubsub#access_model", "type": "list-single", "value": "open"}, - {"var": "pubsub#publish_model", "type": "list-single", "value": "open"}, - - ] - + "meta-data": NODE_CONFIG } return info