Mercurial > libervia-backend
diff sat/plugins/plugin_comp_ap_gateway/__init__.py @ 3784:efc34a89e70b
comp AP gateway: message conversion:
Convert direct AP items to XMPP `<message>` stanzas, and vice versa.
Documentation will follow soon to explain the behaviour.
rel 366
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 24 May 2022 17:57:36 +0200 |
parents | 125c7043b277 |
children | 865167c34b82 |
line wrap: on
line diff
--- a/sat/plugins/plugin_comp_ap_gateway/__init__.py Tue May 24 17:49:14 2022 +0200 +++ b/sat/plugins/plugin_comp_ap_gateway/__init__.py Tue May 24 17:57:36 2022 +0200 @@ -23,7 +23,9 @@ from pathlib import Path from pprint import pformat import re -from typing import Any, Dict, List, Optional, Tuple, Union, Callable, Awaitable, overload +from typing import ( + Any, Dict, List, Set, Optional, Tuple, Union, Callable, Awaitable, overload +) from urllib import parse from cryptography.exceptions import InvalidSignature @@ -65,6 +67,9 @@ MEDIA_TYPE_AP, TYPE_ACTOR, TYPE_ITEM, + TYPE_FOLLOWERS, + NS_AP_PUBLIC, + PUBLIC_TUPLE ) from .http_server import HTTPServer from .pubsub_service import APPubsubService @@ -80,7 +85,9 @@ C.PI_MODES: [C.PLUG_MODE_COMPONENT], C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, C.PI_PROTOCOLS: [], - C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0277", "XEP-0060", "XEP-0465", "PUBSUB_CACHE"], + C.PI_DEPENDENCIES: [ + "XEP-0106", "XEP-0277", "XEP-0060", "XEP-0465", "PUBSUB_CACHE", "TEXT_SYNTAXES" + ], C.PI_RECOMMENDATIONS: [], C.PI_MAIN: "APGateway", C.PI_HANDLER: C.BOOL_TRUE, @@ -109,6 +116,7 @@ self._p.addManagedNode( "", items_cb=self._itemsReceived ) + self._t = host.plugins["TEXT_SYNTAXES"] self.pubsub_service = APPubsubService(self) host.bridge.addMethod( @@ -218,6 +226,9 @@ self.client = client await self.init(client) + def profileConnected(self, client): + client.xmlstream.addObserver("/message/body", self.onMessage) + async def _itemsReceived(self, client, itemsEvent): """Callback called when pubsub items are received @@ -455,6 +466,13 @@ if not node or node == self._m.namespace: node = None + if self.client is None: + raise exceptions.InternalError("Client is not set yet") + + if jid_.host == self.client.jid.userhost(): + # this is an proxy JId to an AP Actor + return self._e.unescape(jid_.user) + if node and not jid_.user and not self.mustEncode(node): is_pubsub = await self.isPubsub(jid_) # when we have a pubsub service, the user part can be used to set the node @@ -682,6 +700,11 @@ return algo, base64.b64encode(hashlib.sha256(body).digest()).decode() @async_lru(maxsize=LRU_MAX_SIZE) + async def getActorData(self, actor_id) -> dict: + """Retrieve actor data with LRU cache""" + return await self.apGet(actor_id) + + @async_lru(maxsize=LRU_MAX_SIZE) async def getActorPubKeyData( self, actor_id: str @@ -692,7 +715,7 @@ @return: key_id, owner and public_key @raise KeyError: publicKey is missing from actor data """ - actor_data = await self.apGet(actor_id) + actor_data = await self.getActorData(actor_id) pub_key_data = actor_data["publicKey"] key_id = pub_key_data["id"] owner = pub_key_data["owner"] @@ -901,7 +924,7 @@ @return: Actor ID (which is an URL) """ if account.count("@") != 1 or "/" in account: - raise ValueError("Invalid account: {account!r}") + raise ValueError(f"Invalid account: {account!r}") host = account.split("@")[1] try: finger_data = await treq.json_content(await treq.get( @@ -909,7 +932,7 @@ f"resource=acct:{parse.quote_plus(account)}", )) except Exception as e: - raise exceptions.DataError(f"Can't get webfinger data: {e}") + raise exceptions.DataError(f"Can't get webfinger data for {account!r}: {e}") for link in finger_data.get("links", []): if ( link.get("type") == "application/activity+json" @@ -935,10 +958,9 @@ href = await self.getAPActorIdFromAccount(account) return await self.apGet(href) - @async_lru(maxsize=LRU_MAX_SIZE) async def getAPInboxFromId(self, actor_id: str) -> str: """Retrieve inbox of an actor_id""" - data = await self.apGet(actor_id) + data = await self.getActorData(actor_id) return data["inbox"] @async_lru(maxsize=LRU_MAX_SIZE) @@ -948,7 +970,7 @@ @param actor_id: AP ID of the actor (URL to the actor data) """ url_parsed = parse.urlparse(actor_id) - actor_data = await self.apGet(actor_id) + actor_data = await self.getActorData(actor_id) username = actor_data.get("preferredUsername") if not username: raise exceptions.DataError( @@ -1269,6 +1291,13 @@ else: mb_data["language"] = language mb_data["content_xhtml"] = content_xhtml + if not mb_data.get("content"): + mb_data["content"] = await self._t.convert( + content_xhtml, + self._t.SYNTAX_XHTML, + self._t.SYNTAX_TEXT, + False, + ) # author if is_activity: @@ -1379,20 +1408,29 @@ TYPE_ITEM, parent_ap_account, parent_item ) - async def mbdata2APitem(self, client: SatXMPPEntity, mb_data: dict) -> dict: - """Convert Libervia Microblog Data to ActivityPub item""" - try: - node = mb_data["node"] - service = jid.JID(mb_data["service"]) - except KeyError: - # node and service must always be specified when this method is used - raise exceptions.InternalError( - "node or service is missing in mb_data" - ) + async def mbdata2APitem( + self, + client: SatXMPPEntity, + mb_data: dict, + public=True + ) -> dict: + """Convert Libervia Microblog Data to ActivityPub item + + @param mb_data: microblog data (as used in plugin XEP-0277) to convert + If ``public`` is True, ``service`` and ``node`` keys must be set. + If ``published`` is not set, current datetime will be used + @param public: True if the message is not a private/direct one + if True, the AP Item will be marked as public, and AP followers of target AP + account (which retrieve from ``service``) will be put in ``cc``. + ``inReplyTo`` will also be set if suitable + if False, no destinee will be set (i.e., no ``to`` or ``cc`` or public flag). + This is usually used for direct messages. + @return: AP item + """ if not mb_data.get("id"): mb_data["id"] = shortuuid.uuid() if not mb_data.get("author_jid"): - mb_data["author_jid"] = client.jid.full() + mb_data["author_jid"] = client.jid.userhost() ap_account = await self.getAPAccountFromJidAndNode( jid.JID(mb_data["author_jid"]), None @@ -1402,43 +1440,59 @@ ap_object = { "id": url_item, "type": "Note", - "published": utils.xmpp_date(mb_data["published"]), + "published": utils.xmpp_date(mb_data.get("published")), "attributedTo": url_actor, "content": mb_data.get("content_xhtml") or mb_data["content"], - "to": ["https://www.w3.org/ns/activitystreams#Public"], } - target_ap_account = self._e.unescape(service.user) - actor_data = await self.getAPActorDataFromAccount(target_ap_account) - followers = actor_data.get("followers") - if followers: - ap_object["cc"] = [followers] + language = mb_data.get("language") + if language: + ap_object["contentMap"] = {language: ap_object["content"]} + + if public: + ap_object["to"] = [NS_AP_PUBLIC] + try: + node = mb_data["node"] + service = jid.JID(mb_data["service"]) + except KeyError: + # node and service must always be specified when this method is used + raise exceptions.InternalError( + "node or service is missing in mb_data" + ) + target_ap_account = await self.getAPAccountFromJidAndNode( + service, node + ) + if service.host == self.client.jid.userhost: + # service is a proxy JID for AP account + actor_data = await self.getAPActorDataFromAccount(target_ap_account) + followers = actor_data.get("followers") + else: + # service is a real XMPP entity + followers = self.buildAPURL(TYPE_FOLLOWERS, target_ap_account) + if followers: + ap_object["cc"] = [followers] + if self._m.isCommentNode(node): + parent_item = self._m.getParentItem(node) + if service.host == self.client.jid.userhost(): + # the publication is on a virtual node (i.e. an XMPP node managed by + # this gateway and linking to an ActivityPub actor) + ap_object["inReplyTo"] = parent_item + else: + # the publication is from a followed real XMPP node + ap_object["inReplyTo"] = await self.getReplyToIdFromXMPPNode( + client, + ap_account, + parent_item, + mb_data + ) ap_item = { "@context": "https://www.w3.org/ns/activitystreams", "id": url_item, "type": "Create", "actor": url_actor, - "object": ap_object } - language = mb_data.get("language") - if language: - ap_object["contentMap"] = {language: ap_object["content"]} - if self._m.isCommentNode(node): - parent_item = self._m.getParentItem(node) - if service.host == self.client.jid.userhost(): - # the publication is on a virtual node (i.e. an XMPP node managed by - # this gateway and linking to an ActivityPub actor) - ap_object["inReplyTo"] = parent_item - else: - # the publication is from a followed real XMPP node - ap_object["inReplyTo"] = await self.getReplyToIdFromXMPPNode( - client, - ap_account, - parent_item, - mb_data - ) return ap_item @@ -1481,6 +1535,39 @@ if resp.code != 202: raise exceptions.NetworkError(f"unexpected return code: {resp.code}") + @utils.ensure_deferred + async def onMessage(self, message_elt: domish.Element) -> None: + """Called when a XMPP message is received""" + if not self.client: + log.warning(f"no client set, ignoring message: {message_elt.toXml()}") + return + mess_type = message_elt.getAttribute("type") + if mess_type and mess_type not in ("chat", "normal"): + log.warning(f"ignoring message with unexpected type: {message_elt.toXml()}") + return + from_jid = jid.JID(message_elt["from"]) + if not self.isLocal(from_jid): + log.warning(f"ignoring non local message: {message_elt.toXml()}") + return + to_jid = jid.JID(message_elt["to"]) + if not to_jid.user: + log.warning( + f"ignoring message addressed to gateway itself: {message_elt.toXml()}" + ) + return + + actor_account = self._e.unescape(to_jid.user) + actor_id = await self.getAPActorIdFromAccount(actor_account) + inbox = await self.getAPInboxFromId(actor_id) + + mb_data = { + "content": str(message_elt.body), + } + client = self.client.getVirtualClient(from_jid) + ap_item = await self.mbdata2APitem(client, mb_data, public=False) + ap_item["object"]["to"] = ap_item["to"] = [actor_id] + await self.signAndPost(inbox, ap_item["actor"], ap_item) + async def newReplyToXMPPItem( self, client: SatXMPPEntity, @@ -1544,8 +1631,87 @@ @param node: XMPP pubsub node @param item: AP object payload """ + targets: Set[str] = set() + is_public = False + # TODO: handle "audience" + for key in ("to", "bto", "cc", "bcc"): + values = item.get(key) + if not values: + continue + if isinstance(values, str): + values = [values] + for value in values: + if value in PUBLIC_TUPLE: + is_public = True + continue + if not value: + continue + if not self.isLocalURL(value): + continue + targets.add(value) + + targets_types = {self.parseAPURL(t)[0] for t in targets} + if not is_public and targets_types == {TYPE_ACTOR}: + # this is a direct message + await self.handleMessageAPItem( + client, targets, destinee, item + ) + else: + await self.handlePubsubAPItem( + client, targets, destinee, node, item, is_public + ) + + async def handleMessageAPItem( + self, + client: SatXMPPEntity, + targets: Set[str], + destinee: Optional[jid.JID], + item: dict, + ) -> None: + """Parse and deliver direct AP items translating to XMPP messages + + @param targets: actors where the item must be delivered + @param destinee: jid of the destinee, + @param item: AP object payload + """ + targets_jids = {await self.getJIDFromId(t) for t in targets} + if destinee is not None: + targets_jids.add(destinee) + mb_data = await self.apItem2MBdata(item) + defer_l = [] + for target_jid in targets_jids: + defer_l.append( + client.sendMessage( + target_jid, + {'': mb_data.get("content", "")}, + mb_data.get("title"), + + ) + ) + await defer.DeferredList(defer_l) + + async def handlePubsubAPItem( + self, + client: SatXMPPEntity, + targets: Set[str], + destinee: Optional[jid.JID], + node: str, + item: dict, + public: bool + ) -> None: + """Analyse, cache and deliver AP items translating to Pubsub + + @param targets: actors/collections where the item must be delivered + @param destinee: jid of the destinee, + @param node: XMPP pubsub node + @param item: AP object payload + @param public: True if the item is public + """ + # XXX: "public" is not used for now + service = client.jid in_reply_to = item.get("inReplyTo") + if in_reply_to and isinstance(in_reply_to, list): in_reply_to = in_reply_to[0] if in_reply_to and isinstance(in_reply_to, str): @@ -1607,3 +1773,4 @@ node, [(subscription.subscriber, None, [item_elt])] ) +