Mercurial > libervia-backend
diff sat/plugins/plugin_comp_ap_gateway/pubsub_service.py @ 3764:125c7043b277
comp AP gateway: publish, (un)subscribe/(un)follow, public subscription/following/followers:
this patch implements those major features:
- `publish` is implemented on virtual pubsub service, thus XMPP entities can now publish
to AP using this service
- replies to XMPP items are managed
- `inReplyTo` is filled when converting XMPP items to AP objects
- `follow` and `unfollow` (actually an `undo` activity) are implemented and mapped to
XMPP's (un)subscribe. On subscription, AP actor's `outbox` collection is converted to
XMPP and put in cache. Subscriptions are always public.
- `following` and `followers` collections are mapped to XMPP's Public Pubsub Subscription
(which should be XEP-0465, but the XEP is not yet published at the time of commit), in
both directions.
- new helper methods to check if an URL is local and to get JID from actor ID
doc will follow to explain behaviour
rel 365
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 13 May 2022 19:12:33 +0200 |
parents | a8c7e5cef0cb |
children | 865167c34b82 |
line wrap: on
line diff
--- a/sat/plugins/plugin_comp_ap_gateway/pubsub_service.py Fri May 13 18:50:33 2022 +0200 +++ b/sat/plugins/plugin_comp_ap_gateway/pubsub_service.py Fri May 13 19:12:33 2022 +0200 @@ -16,12 +16,12 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -from typing import Optional, Tuple, List, Dict, Any +from typing import Optional, Tuple, List, Union from twisted.internet import defer from twisted.words.protocols.jabber import jid, error from twisted.words.xish import domish -from wokkel import rsm, pubsub, data_form +from wokkel import rsm, pubsub, disco from sat.core.i18n import _ from sat.core import exceptions @@ -92,7 +92,45 @@ @ensure_deferred async def publish(self, requestor, service, nodeIdentifier, items): - raise NotImplementedError + if self.apg.local_only and not self.apg.isLocal(requestor): + raise error.StanzaError( + "forbidden", + "Only local users can publish on this gateway." + ) + if not service.user: + raise error.StanzaError( + "bad-request", + "You must specify an ActivityPub actor account in JID user part." + ) + ap_account = self.apg._e.unescape(service.user) + if ap_account.count("@") != 1: + raise error.StanzaError( + "bad-request", + f"{ap_account!r} is not a valid ActivityPub actor account." + ) + + client = self.apg.client.getVirtualClient(requestor) + await self.apg.convertAndPostItems( + client, ap_account, service, nodeIdentifier, items + ) + + async def apFollowing2Elt(self, ap_item: dict) -> domish.Element: + """Convert actor ID from following collection to XMPP item""" + actor_id = ap_item["id"] + actor_jid = await self.apg.getJIDFromId(actor_id) + subscription_elt = self.apg._pps.buildSubscriptionElt( + self.apg._m.namespace, actor_jid + ) + item_elt = pubsub.Item(id=actor_id, payload=subscription_elt) + return item_elt + + async def apFollower2Elt(self, ap_item: dict) -> domish.Element: + """Convert actor ID from followers collection to XMPP item""" + actor_id = ap_item["id"] + actor_jid = await self.apg.getJIDFromId(actor_id) + subscriber_elt = self.apg._pps.buildSubscriberElt(actor_jid) + item_elt = pubsub.Item(id=actor_id, payload=subscriber_elt) + return item_elt @ensure_deferred async def items( @@ -110,44 +148,63 @@ if ap_account.count("@") != 1: log.warning(f"Invalid AP account used by {requestor}: {ap_account!r}") return [], None - if not node.startswith(self.apg._m.namespace): - raise error.StanzaError( - "feature-not-implemented", - text=f"AP Gateway {C.APP_VERSION} only supports {self.apg._m.namespace} " - "node for now" - ) + + kwargs = {} + + if node == self.apg._pps.subscriptions_node: + collection_name = "following" + parser = self.apFollowing2Elt + kwargs["only_ids"] = True + use_cache = False + elif node.startswith(self.apg._pps.subscribers_node_prefix): + collection_name = "followers" + parser = self.apFollower2Elt + kwargs["only_ids"] = True + use_cache = False + else: + if not node.startswith(self.apg._m.namespace): + raise error.StanzaError( + "feature-not-implemented", + text=f"AP Gateway {C.APP_VERSION} only supports " + f"{self.apg._m.namespace} node for now" + ) + collection_name = "outbox" + parser = self.apg.apItem2Elt + use_cache = True + client = self.apg.client - cached_node = await self.host.memory.storage.getPubsubNode( - client, service, node - ) - # TODO: check if node is synchronised - if cached_node is not None: - # the node is cached, we return items from cache - log.debug(f"node {node!r} from {service} is in cache") - pubsub_items, metadata = await self.apg._c.getItemsFromCache( - client, cached_node, maxItems, itemIdentifiers, rsm_request=rsm_req + if use_cache: + cached_node = await self.host.memory.storage.getPubsubNode( + client, service, node ) - try: - rsm_resp = rsm.RSMResponse(**metadata["rsm"]) - except KeyError: - rsm_resp = None - return [i.data for i in pubsub_items], rsm_resp + # TODO: check if node is synchronised + if cached_node is not None: + # the node is cached, we return items from cache + log.debug(f"node {node!r} from {service} is in cache") + pubsub_items, metadata = await self.apg._c.getItemsFromCache( + client, cached_node, maxItems, itemIdentifiers, rsm_request=rsm_req + ) + try: + rsm_resp = rsm.RSMResponse(**metadata["rsm"]) + except KeyError: + rsm_resp = None + return [i.data for i in pubsub_items], rsm_resp if itemIdentifiers: items = [] for item_id in itemIdentifiers: item_data = await self.apg.apGet(item_id) - item_elt = await self.apg.apItem2Elt(item_data) + item_elt = await parser(item_data) items.append(item_elt) return items, None else: if rsm_req is None: if maxItems is None: maxItems = 20 - kwargs = { + kwargs.update({ "max_items": maxItems, "chronological_pagination": False, - } + }) else: if len( [v for v in (rsm_req.after, rsm_req.before, rsm_req.index) @@ -157,7 +214,7 @@ "bad-request", text="You can't use after, before and index at the same time" ) - kwargs = {"max_items": rsm_req.max} + kwargs.update({"max_items": rsm_req.max}) if rsm_req.after is not None: kwargs["after_id"] = rsm_req.after elif rsm_req.before is not None: @@ -171,10 +228,10 @@ f"No cache found for node {node} at {service} (AP account {ap_account}), " "using Collection Paging to RSM translation" ) - if self.apg._m.isCommentsNode(node): - parent_node = self.apg._m.getParentNode(node) + if self.apg._m.isCommentNode(node): + parent_item = self.apg._m.getParentItem(node) try: - parent_data = await self.apg.apGet(parent_node) + parent_data = await self.apg.apGet(parent_item) collection = await self.apg.apGetObject( parent_data.get("object", {}), "replies" @@ -186,12 +243,14 @@ ) else: actor_data = await self.apg.getAPActorDataFromAccount(ap_account) - collection = await self.apg.apGetObject(actor_data, "outbox") + collection = await self.apg.apGetObject(actor_data, collection_name) if not collection: raise error.StanzaError( "item-not-found", text=f"No collection found for node {node!r} (account: {ap_account})" ) + + kwargs["parser"] = parser return await self.apg.getAPItems(collection, **kwargs) @ensure_deferred