Mercurial > libervia-backend
view sat/plugins/plugin_comp_ap_gateway/pubsub_service.py @ 3947:08c1d5485411
cli (pubsub/set, edit): add `--encrypt` flag:
rel 380
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 15 Oct 2022 20:38:33 +0200 |
parents | 0aa7023dcd08 |
children | 78b5f356900c |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia ActivityPub Gateway # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from typing import Optional, Tuple, List, Dict, Any, Union from urllib.parse import urlparse from pathlib import Path from base64 import b64encode import tempfile from twisted.internet import defer, threads from twisted.words.protocols.jabber import jid, error from twisted.words.xish import domish from wokkel import rsm, pubsub, disco from sat.core.i18n import _ from sat.core import exceptions from sat.core.core_types import SatXMPPEntity from sat.core.log import getLogger from sat.core.constants import Const as C from sat.tools import image from sat.tools.utils import ensure_deferred from sat.tools.web import downloadFile from sat.memory.sqla_mapping import PubsubSub, SubscriptionState from .constants import ( TYPE_ACTOR, ST_AVATAR, MAX_AVATAR_SIZE ) log = getLogger(__name__) # all nodes have the same config NODE_CONFIG = [ {"var": "pubsub#persist_items", "type": "boolean", "value": True}, {"var": "pubsub#max_items", "value": "max"}, {"var": "pubsub#access_model", "type": "list-single", "value": "open"}, {"var": "pubsub#publish_model", "type": "list-single", "value": "open"}, ] NODE_CONFIG_VALUES = {c["var"]: c["value"] for c in NODE_CONFIG} NODE_OPTIONS = {c["var"]: {} for c in NODE_CONFIG} for c in NODE_CONFIG: NODE_OPTIONS[c["var"]].update({k:v for k,v in c.items() if k not in ("var", "value")}) class APPubsubService(rsm.PubSubService): """Pubsub service for XMPP requests""" def __init__(self, apg): super(APPubsubService, self).__init__() self.host = apg.host self.apg = apg self.discoIdentity = { "category": "pubsub", "type": "service", "name": "Libervia ActivityPub Gateway", } async def getAPActorIdsAndInbox( self, requestor: jid.JID, recipient: jid.JID, ) -> Tuple[str, str, str]: """Get AP actor IDs from requestor and destinee JIDs @param requestor: XMPP entity doing a request to an AP actor via the gateway @param recipient: JID mapping an AP actor via the gateway @return: requestor actor ID, recipient actor ID and recipient inbox @raise error.StanzaError: "item-not-found" is raised if not user part is specified in requestor """ if not recipient.user: raise error.StanzaError( "item-not-found", text="No user part specified" ) requestor_actor_id = self.apg.buildAPURL(TYPE_ACTOR, requestor.userhost()) recipient_account = self.apg._e.unescape(recipient.user) recipient_actor_id = await self.apg.getAPActorIdFromAccount(recipient_account) inbox = await self.apg.getAPInboxFromId(recipient_actor_id, use_shared=False) return requestor_actor_id, recipient_actor_id, inbox @ensure_deferred async def publish(self, requestor, service, nodeIdentifier, items): if self.apg.local_only and not self.apg.isLocal(requestor): raise error.StanzaError( "forbidden", "Only local users can publish on this gateway." ) if not service.user: raise error.StanzaError( "bad-request", "You must specify an ActivityPub actor account in JID user part." ) ap_account = self.apg._e.unescape(service.user) if ap_account.count("@") != 1: raise error.StanzaError( "bad-request", f"{ap_account!r} is not a valid ActivityPub actor account." ) client = self.apg.client.getVirtualClient(requestor) if self.apg._pa.isAttachmentNode(nodeIdentifier): await self.apg.convertAndPostAttachments( client, ap_account, service, nodeIdentifier, items, publisher=requestor ) else: await self.apg.convertAndPostItems( client, ap_account, service, nodeIdentifier, items ) cached_node = await self.host.memory.storage.getPubsubNode( client, service, nodeIdentifier, with_subscriptions=True, create=True ) await self.host.memory.storage.cachePubsubItems( client, cached_node, items ) for subscription in cached_node.subscriptions: if subscription.state != SubscriptionState.SUBSCRIBED: continue self.notifyPublish( service, nodeIdentifier, [(subscription.subscriber, None, items)] ) async def apFollowing2Elt(self, ap_item: dict) -> domish.Element: """Convert actor ID from following collection to XMPP item""" actor_id = ap_item["id"] actor_jid = await self.apg.getJIDFromId(actor_id) subscription_elt = self.apg._pps.buildSubscriptionElt( self.apg._m.namespace, actor_jid ) item_elt = pubsub.Item(id=actor_id, payload=subscription_elt) return item_elt async def apFollower2Elt(self, ap_item: dict) -> domish.Element: """Convert actor ID from followers collection to XMPP item""" actor_id = ap_item["id"] actor_jid = await self.apg.getJIDFromId(actor_id) subscriber_elt = self.apg._pps.buildSubscriberElt(actor_jid) item_elt = pubsub.Item(id=actor_id, payload=subscriber_elt) return item_elt async def generateVCard(self, ap_account: str) -> domish.Element: """Generate vCard4 (XEP-0292) item element from ap_account's metadata""" actor_data = await self.apg.getAPActorDataFromAccount(ap_account) identity_data = {} summary = actor_data.get("summary") # summary is HTML, we have to convert it to text if summary: identity_data["description"] = await self.apg._t.convert( summary, self.apg._t.SYNTAX_XHTML, self.apg._t.SYNTAX_TEXT, False, ) for field in ("name", "preferredUsername"): value = actor_data.get(field) if value: identity_data.setdefault("nicknames", []).append(value) vcard_elt = self.apg._v.dict2VCard(identity_data) item_elt = domish.Element((pubsub.NS_PUBSUB, "item")) item_elt.addChild(vcard_elt) item_elt["id"] = self.apg._p.ID_SINGLETON return item_elt async def getAvatarData( self, client: SatXMPPEntity, ap_account: str ) -> Dict[str, Any]: """Retrieve actor's avatar if any, cache it and file actor_data ``cache_uid``, `path``` and ``media_type`` keys are always files ``base64`` key is only filled if the file was not already in cache """ actor_data = await self.apg.getAPActorDataFromAccount(ap_account) for icon in await self.apg.apGetList(actor_data, "icon"): url = icon.get("url") if icon["type"] != "Image" or not url: continue parsed_url = urlparse(url) if not parsed_url.scheme in ("http", "https"): log.warning(f"unexpected URL scheme: {url!r}") continue filename = Path(parsed_url.path).name if not filename: log.warning(f"ignoring URL with invald path: {url!r}") continue break else: raise error.StanzaError("item-not-found") key = f"{ST_AVATAR}{url}" cache_uid = await client._ap_storage.get(key) if cache_uid is None: cache = None else: cache = self.apg.host.common_cache.getMetadata(cache_uid) if cache is None: with tempfile.TemporaryDirectory() as dir_name: dest_path = Path(dir_name, filename) await downloadFile(url, dest_path, max_size=MAX_AVATAR_SIZE) avatar_data = { "path": dest_path, "filename": filename, 'media_type': image.guess_type(dest_path), } await self.apg._i.cacheAvatar( self.apg.IMPORT_NAME, avatar_data ) else: avatar_data = { "cache_uid": cache["uid"], "path": cache["path"], "media_type": cache["mime_type"] } return avatar_data async def generateAvatarMetadata( self, client: SatXMPPEntity, ap_account: str ) -> domish.Element: """Generate the metadata element for user avatar @raise StanzaError("item-not-found"): no avatar is present in actor data (in ``icon`` field) """ avatar_data = await self.getAvatarData(client, ap_account) return self.apg._a.buildItemMetadataElt(avatar_data) def _blockingB64EncodeAvatar(self, avatar_data: Dict[str, Any]) -> None: with avatar_data["path"].open("rb") as f: avatar_data["base64"] = b64encode(f.read()).decode() async def generateAvatarData( self, client: SatXMPPEntity, ap_account: str, itemIdentifiers: Optional[List[str]], ) -> domish.Element: """Generate the data element for user avatar @raise StanzaError("item-not-found"): no avatar cached with requested ID """ if not itemIdentifiers: avatar_data = await self.getAvatarData(client, ap_account) if "base64" not in avatar_data: await threads.deferToThread(self._blockingB64EncodeAvatar, avatar_data) else: if len(itemIdentifiers) > 1: # only a single item ID is supported raise error.StanzaError("item-not-found") item_id = itemIdentifiers[0] # just to be sure that that we don't have an empty string assert item_id cache_data = self.apg.host.common_cache.getMetadata(item_id) if cache_data is None: raise error.StanzaError("item-not-found") avatar_data = { "cache_uid": item_id, "path": cache_data["path"] } await threads.deferToThread(self._blockingB64EncodeAvatar, avatar_data) return self.apg._a.buildItemDataElt(avatar_data) @ensure_deferred async def items( self, requestor: jid.JID, service: jid.JID, node: str, maxItems: Optional[int], itemIdentifiers: Optional[List[str]], rsm_req: Optional[rsm.RSMRequest] ) -> Tuple[List[domish.Element], Optional[rsm.RSMResponse]]: if not service.user: return [], None ap_account = self.host.plugins["XEP-0106"].unescape(service.user) if ap_account.count("@") != 1: log.warning(f"Invalid AP account used by {requestor}: {ap_account!r}") return [], None # cached_node may be pre-filled with some nodes (e.g. attachments nodes), # otherwise it is filled when suitable cached_node = None client = self.apg.client kwargs = {} if node == self.apg._pps.subscriptions_node: collection_name = "following" parser = self.apFollowing2Elt kwargs["only_ids"] = True use_cache = False elif node.startswith(self.apg._pps.subscribers_node_prefix): collection_name = "followers" parser = self.apFollower2Elt kwargs["only_ids"] = True use_cache = False elif node == self.apg._v.node: # vCard4 request item_elt = await self.generateVCard(ap_account) return [item_elt], None elif node == self.apg._a.namespace_metadata: item_elt = await self.generateAvatarMetadata(self.apg.client, ap_account) return [item_elt], None elif node == self.apg._a.namespace_data: item_elt = await self.generateAvatarData( self.apg.client, ap_account, itemIdentifiers ) return [item_elt], None elif self.apg._pa.isAttachmentNode(node): use_cache = True # we check cache here because we emit an item-not-found error if the node is # not in cache, as we are not dealing with real AP items cached_node = await self.host.memory.storage.getPubsubNode( client, service, node ) if cached_node is None: raise error.StanzaError("item-not-found") else: if node.startswith(self.apg._m.namespace): parser = self.apg.ap_item_2_mb_elt elif node.startswith(self.apg._events.namespace): parser = self.apg.ap_events.ap_item_2_event_elt else: raise error.StanzaError( "feature-not-implemented", text=f"AP Gateway {C.APP_VERSION} only supports " f"{self.apg._m.namespace} node for now" ) collection_name = "outbox" use_cache = True if use_cache: if cached_node is None: cached_node = await self.host.memory.storage.getPubsubNode( client, service, node ) # TODO: check if node is synchronised if cached_node is not None: # the node is cached, we return items from cache log.debug(f"node {node!r} from {service} is in cache") pubsub_items, metadata = await self.apg._c.getItemsFromCache( client, cached_node, maxItems, itemIdentifiers, rsm_request=rsm_req ) try: rsm_resp = rsm.RSMResponse(**metadata["rsm"]) except KeyError: rsm_resp = None return [i.data for i in pubsub_items], rsm_resp if itemIdentifiers: items = [] for item_id in itemIdentifiers: item_data = await self.apg.apGet(item_id) item_elt = await parser(item_data) items.append(item_elt) return items, None else: if rsm_req is None: if maxItems is None: maxItems = 20 kwargs.update({ "max_items": maxItems, "chronological_pagination": False, }) else: if len( [v for v in (rsm_req.after, rsm_req.before, rsm_req.index) if v is not None] ) > 1: raise error.StanzaError( "bad-request", text="You can't use after, before and index at the same time" ) kwargs.update({"max_items": rsm_req.max}) if rsm_req.after is not None: kwargs["after_id"] = rsm_req.after elif rsm_req.before is not None: kwargs["chronological_pagination"] = False if rsm_req.before != "": kwargs["after_id"] = rsm_req.before elif rsm_req.index is not None: kwargs["start_index"] = rsm_req.index log.info( f"No cache found for node {node} at {service} (AP account {ap_account}), " "using Collection Paging to RSM translation" ) if self.apg._m.isCommentNode(node): parent_item = self.apg._m.getParentItem(node) try: parent_data = await self.apg.apGet(parent_item) collection = await self.apg.apGetObject( parent_data.get("object", {}), "replies" ) except Exception as e: raise error.StanzaError( "item-not-found", text=e ) else: actor_data = await self.apg.getAPActorDataFromAccount(ap_account) collection = await self.apg.apGetObject(actor_data, collection_name) if not collection: raise error.StanzaError( "item-not-found", text=f"No collection found for node {node!r} (account: {ap_account})" ) kwargs["parser"] = parser return await self.apg.getAPItems(collection, **kwargs) @ensure_deferred async def retract(self, requestor, service, nodeIdentifier, itemIdentifiers): raise error.StanzaError("forbidden") @ensure_deferred async def subscribe(self, requestor, service, nodeIdentifier, subscriber): # TODO: handle comments nodes client = self.apg.client # we use PENDING state for microblog, it will be set to SUBSCRIBED once the Follow # is accepted. Other nodes are directly set to subscribed, their subscriptions # being internal. if nodeIdentifier == self.apg._m.namespace: sub_state = SubscriptionState.PENDING else: sub_state = SubscriptionState.SUBSCRIBED node = await self.host.memory.storage.getPubsubNode( client, service, nodeIdentifier, with_subscriptions=True ) if node is None: node = await self.host.memory.storage.setPubsubNode( client, service, nodeIdentifier, ) subscription = None else: try: subscription = next( s for s in node.subscriptions if s.subscriber == requestor.userhostJID() ) except StopIteration: subscription = None if subscription is None: subscription = PubsubSub( subscriber=requestor.userhostJID(), state=sub_state ) node.subscriptions.append(subscription) await self.host.memory.storage.add(node) else: if subscription.state is None: subscription.state = sub_state await self.host.memory.storage.add(node) elif subscription.state == SubscriptionState.SUBSCRIBED: log.info( f"{requestor.userhostJID()} has already a subscription to {node!r} " f"at {service}. Doing the request anyway." ) elif subscription.state == SubscriptionState.PENDING: log.info( f"{requestor.userhostJID()} has already a pending subscription to " f"{node!r} at {service}. Doing the request anyway." ) if sub_state != SubscriptionState.PENDING: subscription.state = sub_state await self.host.memory.storage.add(node) else: raise exceptions.InternalError( f"unmanaged subscription state: {subscription.state}" ) if nodeIdentifier in (self.apg._m.namespace, self.apg._events.namespace): # if we subscribe to microblog or events node, we follow the corresponding # account req_actor_id, recip_actor_id, inbox = await self.getAPActorIdsAndInbox( requestor, service ) data = self.apg.createActivity("Follow", req_actor_id, recip_actor_id) resp = await self.apg.signAndPost(inbox, req_actor_id, data) if resp.code >= 300: text = await resp.text() raise error.StanzaError("service-unavailable", text=text) return pubsub.Subscription(nodeIdentifier, requestor, "subscribed") @ensure_deferred async def unsubscribe(self, requestor, service, nodeIdentifier, subscriber): req_actor_id, recip_actor_id, inbox = await self.getAPActorIdsAndInbox( requestor, service ) data = self.apg.createActivity( "Undo", req_actor_id, self.apg.createActivity( "Follow", req_actor_id, recip_actor_id ) ) resp = await self.apg.signAndPost(inbox, req_actor_id, data) if resp.code >= 300: text = await resp.text() raise error.StanzaError("service-unavailable", text=text) def getConfigurationOptions(self): return NODE_OPTIONS def getConfiguration( self, requestor: jid.JID, service: jid.JID, nodeIdentifier: str ) -> defer.Deferred: return defer.succeed(NODE_CONFIG_VALUES) def getNodeInfo( self, requestor: jid.JID, service: jid.JID, nodeIdentifier: str, pep: bool = False, recipient: Optional[jid.JID] = None ) -> Optional[dict]: if not nodeIdentifier: return None info = { "type": "leaf", "meta-data": NODE_CONFIG } return info