# HG changeset patch # User Goffi # Date 1653407856 -7200 # Node ID efc34a89e70bc52008abb6f29e467b6cdaaff5b3 # Parent fedbf7aade11038c477ea6aaf145406fbb4572d4 comp AP gateway: message conversion: Convert direct AP items to XMPP `` stanzas, and vice versa. Documentation will follow soon to explain the behaviour. rel 366 diff -r fedbf7aade11 -r efc34a89e70b sat/core/xmpp.py --- a/sat/core/xmpp.py Tue May 24 17:49:14 2022 +0200 +++ b/sat/core/xmpp.py Tue May 24 17:57:36 2022 +0200 @@ -612,6 +612,43 @@ """ raise NotImplementedError + def send(self, obj): + # original send method accept string + # but we restrict to domish.Element to make trigger treatments easier + assert isinstance(obj, domish.Element) + # XXX: this trigger is the last one before sending stanza on wire + # it is intended for things like end 2 end encryption. + # *DO NOT* cancel (i.e. return False) without very good reason + # (out of band transmission for instance). + # e2e should have a priority of 0 here, and out of band transmission + # a lower priority + #  FIXME: trigger not used yet, can be uncommented when e2e full stanza + # encryption is implemented + #  if not self.host_app.trigger.point("send", self, obj): + #   return + super().send(obj) + + @defer.inlineCallbacks + def sendMessageData(self, mess_data): + """Convenient method to send message data to stream + + This method will send mess_data[u'xml'] to stream, but a trigger is there + The trigger can't be cancelled, it's a good place for e2e encryption which + don't handle full stanza encryption + This trigger can return a Deferred (it's an asyncPoint) + @param mess_data(dict): message data as constructed by onMessage workflow + @return (dict): mess_data (so it can be used in a deferred chain) + """ + # XXX: This is the last trigger before u"send" (last but one globally) + # for sending message. + # This is intented for e2e encryption which doesn't do full stanza + # encryption (e.g. OTR) + # This trigger point can't cancel the method + yield self.host_app.trigger.asyncPoint("sendMessageData", self, mess_data, + triggers_no_cancel=True) + self.send(mess_data["xml"]) + defer.returnValue(mess_data) + def sendMessage( self, to_jid, message, subject=None, mess_type="auto", extra=None, uid=None, no_trigger=False): @@ -659,8 +696,7 @@ # we try to guess the type if data["subject"]: data["type"] = C.MESS_TYPE_NORMAL - elif not data["to"].resource: # if to JID has a resource, - # the type is not 'groupchat' + elif not data["to"].resource: # we may have a groupchat message, we check if the we know this jid try: entity_type = self.host_app.memory.getEntityDatum( @@ -675,8 +711,7 @@ else: data["type"] = C.MESS_TYPE_CHAT else: - data["type"] == C.MESS_TYPE_CHAT - data["type"] == C.MESS_TYPE_CHAT if data["subject"] else C.MESS_TYPE_NORMAL + data["type"] = C.MESS_TYPE_CHAT # FIXME: send_only is used by libervia's OTR plugin to avoid # the triggers from frontend, and no_trigger do the same @@ -895,43 +930,6 @@ ) post_xml_treatments.addCallback(self.messageSendToBridge) - def send(self, obj): - # original send method accept string - # but we restrict to domish.Element to make trigger treatments easier - assert isinstance(obj, domish.Element) - # XXX: this trigger is the last one before sending stanza on wire - # it is intended for things like end 2 end encryption. - # *DO NOT* cancel (i.e. return False) without very good reason - # (out of band transmission for instance). - # e2e should have a priority of 0 here, and out of band transmission - # a lower priority - #  FIXME: trigger not used yet, can be uncommented when e2e full stanza - # encryption is implemented - #  if not self.host_app.trigger.point("send", self, obj): - #   return - super(SatXMPPClient, self).send(obj) - - @defer.inlineCallbacks - def sendMessageData(self, mess_data): - """Convenient method to send message data to stream - - This method will send mess_data[u'xml'] to stream, but a trigger is there - The trigger can't be cancelled, it's a good place for e2e encryption which - don't handle full stanza encryption - This trigger can return a Deferred (it's an asyncPoint) - @param mess_data(dict): message data as constructed by onMessage workflow - @return (dict): mess_data (so it can be used in a deferred chain) - """ - # XXX: This is the last trigger before u"send" (last but one globally) - # for sending message. - # This is intented for e2e encryption which doesn't do full stanza - # encryption (e.g. OTR) - # This trigger point can't cancel the method - yield self.host_app.trigger.asyncPoint("sendMessageData", self, mess_data, - triggers_no_cancel=True) - self.send(mess_data["xml"]) - defer.returnValue(mess_data) - def feedback(self, to_jid, message, extra=None): """Send message to frontends diff -r fedbf7aade11 -r efc34a89e70b sat/plugins/plugin_comp_ap_gateway/__init__.py --- a/sat/plugins/plugin_comp_ap_gateway/__init__.py Tue May 24 17:49:14 2022 +0200 +++ b/sat/plugins/plugin_comp_ap_gateway/__init__.py Tue May 24 17:57:36 2022 +0200 @@ -23,7 +23,9 @@ from pathlib import Path from pprint import pformat import re -from typing import Any, Dict, List, Optional, Tuple, Union, Callable, Awaitable, overload +from typing import ( + Any, Dict, List, Set, Optional, Tuple, Union, Callable, Awaitable, overload +) from urllib import parse from cryptography.exceptions import InvalidSignature @@ -65,6 +67,9 @@ MEDIA_TYPE_AP, TYPE_ACTOR, TYPE_ITEM, + TYPE_FOLLOWERS, + NS_AP_PUBLIC, + PUBLIC_TUPLE ) from .http_server import HTTPServer from .pubsub_service import APPubsubService @@ -80,7 +85,9 @@ C.PI_MODES: [C.PLUG_MODE_COMPONENT], C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, C.PI_PROTOCOLS: [], - C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0277", "XEP-0060", "XEP-0465", "PUBSUB_CACHE"], + C.PI_DEPENDENCIES: [ + "XEP-0106", "XEP-0277", "XEP-0060", "XEP-0465", "PUBSUB_CACHE", "TEXT_SYNTAXES" + ], C.PI_RECOMMENDATIONS: [], C.PI_MAIN: "APGateway", C.PI_HANDLER: C.BOOL_TRUE, @@ -109,6 +116,7 @@ self._p.addManagedNode( "", items_cb=self._itemsReceived ) + self._t = host.plugins["TEXT_SYNTAXES"] self.pubsub_service = APPubsubService(self) host.bridge.addMethod( @@ -218,6 +226,9 @@ self.client = client await self.init(client) + def profileConnected(self, client): + client.xmlstream.addObserver("/message/body", self.onMessage) + async def _itemsReceived(self, client, itemsEvent): """Callback called when pubsub items are received @@ -455,6 +466,13 @@ if not node or node == self._m.namespace: node = None + if self.client is None: + raise exceptions.InternalError("Client is not set yet") + + if jid_.host == self.client.jid.userhost(): + # this is an proxy JId to an AP Actor + return self._e.unescape(jid_.user) + if node and not jid_.user and not self.mustEncode(node): is_pubsub = await self.isPubsub(jid_) # when we have a pubsub service, the user part can be used to set the node @@ -682,6 +700,11 @@ return algo, base64.b64encode(hashlib.sha256(body).digest()).decode() @async_lru(maxsize=LRU_MAX_SIZE) + async def getActorData(self, actor_id) -> dict: + """Retrieve actor data with LRU cache""" + return await self.apGet(actor_id) + + @async_lru(maxsize=LRU_MAX_SIZE) async def getActorPubKeyData( self, actor_id: str @@ -692,7 +715,7 @@ @return: key_id, owner and public_key @raise KeyError: publicKey is missing from actor data """ - actor_data = await self.apGet(actor_id) + actor_data = await self.getActorData(actor_id) pub_key_data = actor_data["publicKey"] key_id = pub_key_data["id"] owner = pub_key_data["owner"] @@ -901,7 +924,7 @@ @return: Actor ID (which is an URL) """ if account.count("@") != 1 or "/" in account: - raise ValueError("Invalid account: {account!r}") + raise ValueError(f"Invalid account: {account!r}") host = account.split("@")[1] try: finger_data = await treq.json_content(await treq.get( @@ -909,7 +932,7 @@ f"resource=acct:{parse.quote_plus(account)}", )) except Exception as e: - raise exceptions.DataError(f"Can't get webfinger data: {e}") + raise exceptions.DataError(f"Can't get webfinger data for {account!r}: {e}") for link in finger_data.get("links", []): if ( link.get("type") == "application/activity+json" @@ -935,10 +958,9 @@ href = await self.getAPActorIdFromAccount(account) return await self.apGet(href) - @async_lru(maxsize=LRU_MAX_SIZE) async def getAPInboxFromId(self, actor_id: str) -> str: """Retrieve inbox of an actor_id""" - data = await self.apGet(actor_id) + data = await self.getActorData(actor_id) return data["inbox"] @async_lru(maxsize=LRU_MAX_SIZE) @@ -948,7 +970,7 @@ @param actor_id: AP ID of the actor (URL to the actor data) """ url_parsed = parse.urlparse(actor_id) - actor_data = await self.apGet(actor_id) + actor_data = await self.getActorData(actor_id) username = actor_data.get("preferredUsername") if not username: raise exceptions.DataError( @@ -1269,6 +1291,13 @@ else: mb_data["language"] = language mb_data["content_xhtml"] = content_xhtml + if not mb_data.get("content"): + mb_data["content"] = await self._t.convert( + content_xhtml, + self._t.SYNTAX_XHTML, + self._t.SYNTAX_TEXT, + False, + ) # author if is_activity: @@ -1379,20 +1408,29 @@ TYPE_ITEM, parent_ap_account, parent_item ) - async def mbdata2APitem(self, client: SatXMPPEntity, mb_data: dict) -> dict: - """Convert Libervia Microblog Data to ActivityPub item""" - try: - node = mb_data["node"] - service = jid.JID(mb_data["service"]) - except KeyError: - # node and service must always be specified when this method is used - raise exceptions.InternalError( - "node or service is missing in mb_data" - ) + async def mbdata2APitem( + self, + client: SatXMPPEntity, + mb_data: dict, + public=True + ) -> dict: + """Convert Libervia Microblog Data to ActivityPub item + + @param mb_data: microblog data (as used in plugin XEP-0277) to convert + If ``public`` is True, ``service`` and ``node`` keys must be set. + If ``published`` is not set, current datetime will be used + @param public: True if the message is not a private/direct one + if True, the AP Item will be marked as public, and AP followers of target AP + account (which retrieve from ``service``) will be put in ``cc``. + ``inReplyTo`` will also be set if suitable + if False, no destinee will be set (i.e., no ``to`` or ``cc`` or public flag). + This is usually used for direct messages. + @return: AP item + """ if not mb_data.get("id"): mb_data["id"] = shortuuid.uuid() if not mb_data.get("author_jid"): - mb_data["author_jid"] = client.jid.full() + mb_data["author_jid"] = client.jid.userhost() ap_account = await self.getAPAccountFromJidAndNode( jid.JID(mb_data["author_jid"]), None @@ -1402,43 +1440,59 @@ ap_object = { "id": url_item, "type": "Note", - "published": utils.xmpp_date(mb_data["published"]), + "published": utils.xmpp_date(mb_data.get("published")), "attributedTo": url_actor, "content": mb_data.get("content_xhtml") or mb_data["content"], - "to": ["https://www.w3.org/ns/activitystreams#Public"], } - target_ap_account = self._e.unescape(service.user) - actor_data = await self.getAPActorDataFromAccount(target_ap_account) - followers = actor_data.get("followers") - if followers: - ap_object["cc"] = [followers] + language = mb_data.get("language") + if language: + ap_object["contentMap"] = {language: ap_object["content"]} + + if public: + ap_object["to"] = [NS_AP_PUBLIC] + try: + node = mb_data["node"] + service = jid.JID(mb_data["service"]) + except KeyError: + # node and service must always be specified when this method is used + raise exceptions.InternalError( + "node or service is missing in mb_data" + ) + target_ap_account = await self.getAPAccountFromJidAndNode( + service, node + ) + if service.host == self.client.jid.userhost: + # service is a proxy JID for AP account + actor_data = await self.getAPActorDataFromAccount(target_ap_account) + followers = actor_data.get("followers") + else: + # service is a real XMPP entity + followers = self.buildAPURL(TYPE_FOLLOWERS, target_ap_account) + if followers: + ap_object["cc"] = [followers] + if self._m.isCommentNode(node): + parent_item = self._m.getParentItem(node) + if service.host == self.client.jid.userhost(): + # the publication is on a virtual node (i.e. an XMPP node managed by + # this gateway and linking to an ActivityPub actor) + ap_object["inReplyTo"] = parent_item + else: + # the publication is from a followed real XMPP node + ap_object["inReplyTo"] = await self.getReplyToIdFromXMPPNode( + client, + ap_account, + parent_item, + mb_data + ) ap_item = { "@context": "https://www.w3.org/ns/activitystreams", "id": url_item, "type": "Create", "actor": url_actor, - "object": ap_object } - language = mb_data.get("language") - if language: - ap_object["contentMap"] = {language: ap_object["content"]} - if self._m.isCommentNode(node): - parent_item = self._m.getParentItem(node) - if service.host == self.client.jid.userhost(): - # the publication is on a virtual node (i.e. an XMPP node managed by - # this gateway and linking to an ActivityPub actor) - ap_object["inReplyTo"] = parent_item - else: - # the publication is from a followed real XMPP node - ap_object["inReplyTo"] = await self.getReplyToIdFromXMPPNode( - client, - ap_account, - parent_item, - mb_data - ) return ap_item @@ -1481,6 +1535,39 @@ if resp.code != 202: raise exceptions.NetworkError(f"unexpected return code: {resp.code}") + @utils.ensure_deferred + async def onMessage(self, message_elt: domish.Element) -> None: + """Called when a XMPP message is received""" + if not self.client: + log.warning(f"no client set, ignoring message: {message_elt.toXml()}") + return + mess_type = message_elt.getAttribute("type") + if mess_type and mess_type not in ("chat", "normal"): + log.warning(f"ignoring message with unexpected type: {message_elt.toXml()}") + return + from_jid = jid.JID(message_elt["from"]) + if not self.isLocal(from_jid): + log.warning(f"ignoring non local message: {message_elt.toXml()}") + return + to_jid = jid.JID(message_elt["to"]) + if not to_jid.user: + log.warning( + f"ignoring message addressed to gateway itself: {message_elt.toXml()}" + ) + return + + actor_account = self._e.unescape(to_jid.user) + actor_id = await self.getAPActorIdFromAccount(actor_account) + inbox = await self.getAPInboxFromId(actor_id) + + mb_data = { + "content": str(message_elt.body), + } + client = self.client.getVirtualClient(from_jid) + ap_item = await self.mbdata2APitem(client, mb_data, public=False) + ap_item["object"]["to"] = ap_item["to"] = [actor_id] + await self.signAndPost(inbox, ap_item["actor"], ap_item) + async def newReplyToXMPPItem( self, client: SatXMPPEntity, @@ -1544,8 +1631,87 @@ @param node: XMPP pubsub node @param item: AP object payload """ + targets: Set[str] = set() + is_public = False + # TODO: handle "audience" + for key in ("to", "bto", "cc", "bcc"): + values = item.get(key) + if not values: + continue + if isinstance(values, str): + values = [values] + for value in values: + if value in PUBLIC_TUPLE: + is_public = True + continue + if not value: + continue + if not self.isLocalURL(value): + continue + targets.add(value) + + targets_types = {self.parseAPURL(t)[0] for t in targets} + if not is_public and targets_types == {TYPE_ACTOR}: + # this is a direct message + await self.handleMessageAPItem( + client, targets, destinee, item + ) + else: + await self.handlePubsubAPItem( + client, targets, destinee, node, item, is_public + ) + + async def handleMessageAPItem( + self, + client: SatXMPPEntity, + targets: Set[str], + destinee: Optional[jid.JID], + item: dict, + ) -> None: + """Parse and deliver direct AP items translating to XMPP messages + + @param targets: actors where the item must be delivered + @param destinee: jid of the destinee, + @param item: AP object payload + """ + targets_jids = {await self.getJIDFromId(t) for t in targets} + if destinee is not None: + targets_jids.add(destinee) + mb_data = await self.apItem2MBdata(item) + defer_l = [] + for target_jid in targets_jids: + defer_l.append( + client.sendMessage( + target_jid, + {'': mb_data.get("content", "")}, + mb_data.get("title"), + + ) + ) + await defer.DeferredList(defer_l) + + async def handlePubsubAPItem( + self, + client: SatXMPPEntity, + targets: Set[str], + destinee: Optional[jid.JID], + node: str, + item: dict, + public: bool + ) -> None: + """Analyse, cache and deliver AP items translating to Pubsub + + @param targets: actors/collections where the item must be delivered + @param destinee: jid of the destinee, + @param node: XMPP pubsub node + @param item: AP object payload + @param public: True if the item is public + """ + # XXX: "public" is not used for now + service = client.jid in_reply_to = item.get("inReplyTo") + if in_reply_to and isinstance(in_reply_to, list): in_reply_to = in_reply_to[0] if in_reply_to and isinstance(in_reply_to, str): @@ -1607,3 +1773,4 @@ node, [(subscription.subscriber, None, [item_elt])] ) + diff -r fedbf7aade11 -r efc34a89e70b sat/plugins/plugin_comp_ap_gateway/constants.py --- a/sat/plugins/plugin_comp_ap_gateway/constants.py Tue May 24 17:49:14 2022 +0200 +++ b/sat/plugins/plugin_comp_ap_gateway/constants.py Tue May 24 17:57:36 2022 +0200 @@ -29,6 +29,8 @@ TYPE_ITEM = "item" MEDIA_TYPE_AP = "application/activity+json" NS_AP_PUBLIC = "https://www.w3.org/ns/activitystreams#Public" +# 3 values can be used, see https://www.w3.org/TR/activitypub/#public-addressing +PUBLIC_TUPLE = (NS_AP_PUBLIC, "as:Public", "Public") # mapping from AP metadata to microblog data AP_MB_MAP = { "content": "content_xhtml",