comparison sat/plugins/plugin_comp_ap_gateway/__init__.py @ 3784:efc34a89e70b

comp AP gateway: message conversion: Convert direct AP items to XMPP `<message>` stanzas, and vice versa. Documentation will follow soon to explain the behaviour. rel 366
author Goffi <goffi@goffi.org>
date Tue, 24 May 2022 17:57:36 +0200
parents 125c7043b277
children 865167c34b82
comparison
equal deleted inserted replaced
3783:fedbf7aade11 3784:efc34a89e70b
21 import hashlib 21 import hashlib
22 import json 22 import json
23 from pathlib import Path 23 from pathlib import Path
24 from pprint import pformat 24 from pprint import pformat
25 import re 25 import re
26 from typing import Any, Dict, List, Optional, Tuple, Union, Callable, Awaitable, overload 26 from typing import (
27 Any, Dict, List, Set, Optional, Tuple, Union, Callable, Awaitable, overload
28 )
27 from urllib import parse 29 from urllib import parse
28 30
29 from cryptography.exceptions import InvalidSignature 31 from cryptography.exceptions import InvalidSignature
30 from cryptography.hazmat.primitives import serialization 32 from cryptography.hazmat.primitives import serialization
31 from cryptography.hazmat.primitives import hashes 33 from cryptography.hazmat.primitives import hashes
63 IMPORT_NAME, 65 IMPORT_NAME,
64 LRU_MAX_SIZE, 66 LRU_MAX_SIZE,
65 MEDIA_TYPE_AP, 67 MEDIA_TYPE_AP,
66 TYPE_ACTOR, 68 TYPE_ACTOR,
67 TYPE_ITEM, 69 TYPE_ITEM,
70 TYPE_FOLLOWERS,
71 NS_AP_PUBLIC,
72 PUBLIC_TUPLE
68 ) 73 )
69 from .http_server import HTTPServer 74 from .http_server import HTTPServer
70 from .pubsub_service import APPubsubService 75 from .pubsub_service import APPubsubService
71 76
72 77
78 C.PI_NAME: "ActivityPub Gateway component", 83 C.PI_NAME: "ActivityPub Gateway component",
79 C.PI_IMPORT_NAME: IMPORT_NAME, 84 C.PI_IMPORT_NAME: IMPORT_NAME,
80 C.PI_MODES: [C.PLUG_MODE_COMPONENT], 85 C.PI_MODES: [C.PLUG_MODE_COMPONENT],
81 C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, 86 C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
82 C.PI_PROTOCOLS: [], 87 C.PI_PROTOCOLS: [],
83 C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0277", "XEP-0060", "XEP-0465", "PUBSUB_CACHE"], 88 C.PI_DEPENDENCIES: [
89 "XEP-0106", "XEP-0277", "XEP-0060", "XEP-0465", "PUBSUB_CACHE", "TEXT_SYNTAXES"
90 ],
84 C.PI_RECOMMENDATIONS: [], 91 C.PI_RECOMMENDATIONS: [],
85 C.PI_MAIN: "APGateway", 92 C.PI_MAIN: "APGateway",
86 C.PI_HANDLER: C.BOOL_TRUE, 93 C.PI_HANDLER: C.BOOL_TRUE,
87 C.PI_DESCRIPTION: _( 94 C.PI_DESCRIPTION: _(
88 "Gateway for bidirectional communication between XMPP and ActivityPub." 95 "Gateway for bidirectional communication between XMPP and ActivityPub."
107 self._pps = host.plugins["XEP-0465"] 114 self._pps = host.plugins["XEP-0465"]
108 self._c = host.plugins["PUBSUB_CACHE"] 115 self._c = host.plugins["PUBSUB_CACHE"]
109 self._p.addManagedNode( 116 self._p.addManagedNode(
110 "", items_cb=self._itemsReceived 117 "", items_cb=self._itemsReceived
111 ) 118 )
119 self._t = host.plugins["TEXT_SYNTAXES"]
112 self.pubsub_service = APPubsubService(self) 120 self.pubsub_service = APPubsubService(self)
113 121
114 host.bridge.addMethod( 122 host.bridge.addMethod(
115 "APSend", 123 "APSend",
116 ".plugin", 124 ".plugin",
215 reactor.listenSSL(self.http_port, self.server, context_factory) 223 reactor.listenSSL(self.http_port, self.server, context_factory)
216 224
217 async def profileConnecting(self, client): 225 async def profileConnecting(self, client):
218 self.client = client 226 self.client = client
219 await self.init(client) 227 await self.init(client)
228
229 def profileConnected(self, client):
230 client.xmlstream.addObserver("/message/body", self.onMessage)
220 231
221 async def _itemsReceived(self, client, itemsEvent): 232 async def _itemsReceived(self, client, itemsEvent):
222 """Callback called when pubsub items are received 233 """Callback called when pubsub items are received
223 234
224 if the items are adressed to a JID corresponding to an AP actor, they are 235 if the items are adressed to a JID corresponding to an AP actor, they are
453 The account construction will use escaping when necessary 464 The account construction will use escaping when necessary
454 """ 465 """
455 if not node or node == self._m.namespace: 466 if not node or node == self._m.namespace:
456 node = None 467 node = None
457 468
469 if self.client is None:
470 raise exceptions.InternalError("Client is not set yet")
471
472 if jid_.host == self.client.jid.userhost():
473 # this is an proxy JId to an AP Actor
474 return self._e.unescape(jid_.user)
475
458 if node and not jid_.user and not self.mustEncode(node): 476 if node and not jid_.user and not self.mustEncode(node):
459 is_pubsub = await self.isPubsub(jid_) 477 is_pubsub = await self.isPubsub(jid_)
460 # when we have a pubsub service, the user part can be used to set the node 478 # when we have a pubsub service, the user part can be used to set the node
461 # this produces more user-friendly AP accounts 479 # this produces more user-friendly AP accounts
462 if is_pubsub: 480 if is_pubsub:
680 if algo != "SHA-256": 698 if algo != "SHA-256":
681 raise NotImplementedError("only SHA-256 is implemented for now") 699 raise NotImplementedError("only SHA-256 is implemented for now")
682 return algo, base64.b64encode(hashlib.sha256(body).digest()).decode() 700 return algo, base64.b64encode(hashlib.sha256(body).digest()).decode()
683 701
684 @async_lru(maxsize=LRU_MAX_SIZE) 702 @async_lru(maxsize=LRU_MAX_SIZE)
703 async def getActorData(self, actor_id) -> dict:
704 """Retrieve actor data with LRU cache"""
705 return await self.apGet(actor_id)
706
707 @async_lru(maxsize=LRU_MAX_SIZE)
685 async def getActorPubKeyData( 708 async def getActorPubKeyData(
686 self, 709 self,
687 actor_id: str 710 actor_id: str
688 ) -> Tuple[str, str, rsa.RSAPublicKey]: 711 ) -> Tuple[str, str, rsa.RSAPublicKey]:
689 """Retrieve Public Key data from actor ID 712 """Retrieve Public Key data from actor ID
690 713
691 @param actor_id: actor ID (url) 714 @param actor_id: actor ID (url)
692 @return: key_id, owner and public_key 715 @return: key_id, owner and public_key
693 @raise KeyError: publicKey is missing from actor data 716 @raise KeyError: publicKey is missing from actor data
694 """ 717 """
695 actor_data = await self.apGet(actor_id) 718 actor_data = await self.getActorData(actor_id)
696 pub_key_data = actor_data["publicKey"] 719 pub_key_data = actor_data["publicKey"]
697 key_id = pub_key_data["id"] 720 key_id = pub_key_data["id"]
698 owner = pub_key_data["owner"] 721 owner = pub_key_data["owner"]
699 pub_key_pem = pub_key_data["publicKeyPem"] 722 pub_key_pem = pub_key_data["publicKeyPem"]
700 pub_key = serialization.load_pem_public_key(pub_key_pem.encode()) 723 pub_key = serialization.load_pem_public_key(pub_key_pem.encode())
899 922
900 @param account: AP handle (user@domain.tld) 923 @param account: AP handle (user@domain.tld)
901 @return: Actor ID (which is an URL) 924 @return: Actor ID (which is an URL)
902 """ 925 """
903 if account.count("@") != 1 or "/" in account: 926 if account.count("@") != 1 or "/" in account:
904 raise ValueError("Invalid account: {account!r}") 927 raise ValueError(f"Invalid account: {account!r}")
905 host = account.split("@")[1] 928 host = account.split("@")[1]
906 try: 929 try:
907 finger_data = await treq.json_content(await treq.get( 930 finger_data = await treq.json_content(await treq.get(
908 f"https://{host}/.well-known/webfinger?" 931 f"https://{host}/.well-known/webfinger?"
909 f"resource=acct:{parse.quote_plus(account)}", 932 f"resource=acct:{parse.quote_plus(account)}",
910 )) 933 ))
911 except Exception as e: 934 except Exception as e:
912 raise exceptions.DataError(f"Can't get webfinger data: {e}") 935 raise exceptions.DataError(f"Can't get webfinger data for {account!r}: {e}")
913 for link in finger_data.get("links", []): 936 for link in finger_data.get("links", []):
914 if ( 937 if (
915 link.get("type") == "application/activity+json" 938 link.get("type") == "application/activity+json"
916 and link.get("rel") == "self" 939 and link.get("rel") == "self"
917 ): 940 ):
933 @param account: ActivityPub Actor identifier 956 @param account: ActivityPub Actor identifier
934 """ 957 """
935 href = await self.getAPActorIdFromAccount(account) 958 href = await self.getAPActorIdFromAccount(account)
936 return await self.apGet(href) 959 return await self.apGet(href)
937 960
938 @async_lru(maxsize=LRU_MAX_SIZE)
939 async def getAPInboxFromId(self, actor_id: str) -> str: 961 async def getAPInboxFromId(self, actor_id: str) -> str:
940 """Retrieve inbox of an actor_id""" 962 """Retrieve inbox of an actor_id"""
941 data = await self.apGet(actor_id) 963 data = await self.getActorData(actor_id)
942 return data["inbox"] 964 return data["inbox"]
943 965
944 @async_lru(maxsize=LRU_MAX_SIZE) 966 @async_lru(maxsize=LRU_MAX_SIZE)
945 async def getAPAccountFromId(self, actor_id: str) -> str: 967 async def getAPAccountFromId(self, actor_id: str) -> str:
946 """Retrieve AP account from the ID URL 968 """Retrieve AP account from the ID URL
947 969
948 @param actor_id: AP ID of the actor (URL to the actor data) 970 @param actor_id: AP ID of the actor (URL to the actor data)
949 """ 971 """
950 url_parsed = parse.urlparse(actor_id) 972 url_parsed = parse.urlparse(actor_id)
951 actor_data = await self.apGet(actor_id) 973 actor_data = await self.getActorData(actor_id)
952 username = actor_data.get("preferredUsername") 974 username = actor_data.get("preferredUsername")
953 if not username: 975 if not username:
954 raise exceptions.DataError( 976 raise exceptions.DataError(
955 'No "preferredUsername" field found, can\'t retrieve actor account' 977 'No "preferredUsername" field found, can\'t retrieve actor account'
956 ) 978 )
1267 log.warning(f"no content found:\n{ap_object!r}") 1289 log.warning(f"no content found:\n{ap_object!r}")
1268 raise exceptions.DataError 1290 raise exceptions.DataError
1269 else: 1291 else:
1270 mb_data["language"] = language 1292 mb_data["language"] = language
1271 mb_data["content_xhtml"] = content_xhtml 1293 mb_data["content_xhtml"] = content_xhtml
1294 if not mb_data.get("content"):
1295 mb_data["content"] = await self._t.convert(
1296 content_xhtml,
1297 self._t.SYNTAX_XHTML,
1298 self._t.SYNTAX_TEXT,
1299 False,
1300 )
1272 1301
1273 # author 1302 # author
1274 if is_activity: 1303 if is_activity:
1275 authors = await self.apGetActors(ap_item, "actor") 1304 authors = await self.apGetActors(ap_item, "actor")
1276 else: 1305 else:
1377 1406
1378 return self.buildAPURL( 1407 return self.buildAPURL(
1379 TYPE_ITEM, parent_ap_account, parent_item 1408 TYPE_ITEM, parent_ap_account, parent_item
1380 ) 1409 )
1381 1410
1382 async def mbdata2APitem(self, client: SatXMPPEntity, mb_data: dict) -> dict: 1411 async def mbdata2APitem(
1383 """Convert Libervia Microblog Data to ActivityPub item""" 1412 self,
1384 try: 1413 client: SatXMPPEntity,
1385 node = mb_data["node"] 1414 mb_data: dict,
1386 service = jid.JID(mb_data["service"]) 1415 public=True
1387 except KeyError: 1416 ) -> dict:
1388 # node and service must always be specified when this method is used 1417 """Convert Libervia Microblog Data to ActivityPub item
1389 raise exceptions.InternalError( 1418
1390 "node or service is missing in mb_data" 1419 @param mb_data: microblog data (as used in plugin XEP-0277) to convert
1391 ) 1420 If ``public`` is True, ``service`` and ``node`` keys must be set.
1421 If ``published`` is not set, current datetime will be used
1422 @param public: True if the message is not a private/direct one
1423 if True, the AP Item will be marked as public, and AP followers of target AP
1424 account (which retrieve from ``service``) will be put in ``cc``.
1425 ``inReplyTo`` will also be set if suitable
1426 if False, no destinee will be set (i.e., no ``to`` or ``cc`` or public flag).
1427 This is usually used for direct messages.
1428 @return: AP item
1429 """
1392 if not mb_data.get("id"): 1430 if not mb_data.get("id"):
1393 mb_data["id"] = shortuuid.uuid() 1431 mb_data["id"] = shortuuid.uuid()
1394 if not mb_data.get("author_jid"): 1432 if not mb_data.get("author_jid"):
1395 mb_data["author_jid"] = client.jid.full() 1433 mb_data["author_jid"] = client.jid.userhost()
1396 ap_account = await self.getAPAccountFromJidAndNode( 1434 ap_account = await self.getAPAccountFromJidAndNode(
1397 jid.JID(mb_data["author_jid"]), 1435 jid.JID(mb_data["author_jid"]),
1398 None 1436 None
1399 ) 1437 )
1400 url_actor = self.buildAPURL(TYPE_ACTOR, ap_account) 1438 url_actor = self.buildAPURL(TYPE_ACTOR, ap_account)
1401 url_item = self.buildAPURL(TYPE_ITEM, ap_account, mb_data["id"]) 1439 url_item = self.buildAPURL(TYPE_ITEM, ap_account, mb_data["id"])
1402 ap_object = { 1440 ap_object = {
1403 "id": url_item, 1441 "id": url_item,
1404 "type": "Note", 1442 "type": "Note",
1405 "published": utils.xmpp_date(mb_data["published"]), 1443 "published": utils.xmpp_date(mb_data.get("published")),
1406 "attributedTo": url_actor, 1444 "attributedTo": url_actor,
1407 "content": mb_data.get("content_xhtml") or mb_data["content"], 1445 "content": mb_data.get("content_xhtml") or mb_data["content"],
1408 "to": ["https://www.w3.org/ns/activitystreams#Public"],
1409 } 1446 }
1410 1447
1411 target_ap_account = self._e.unescape(service.user) 1448 language = mb_data.get("language")
1412 actor_data = await self.getAPActorDataFromAccount(target_ap_account) 1449 if language:
1413 followers = actor_data.get("followers") 1450 ap_object["contentMap"] = {language: ap_object["content"]}
1414 if followers: 1451
1415 ap_object["cc"] = [followers] 1452 if public:
1453 ap_object["to"] = [NS_AP_PUBLIC]
1454 try:
1455 node = mb_data["node"]
1456 service = jid.JID(mb_data["service"])
1457 except KeyError:
1458 # node and service must always be specified when this method is used
1459 raise exceptions.InternalError(
1460 "node or service is missing in mb_data"
1461 )
1462 target_ap_account = await self.getAPAccountFromJidAndNode(
1463 service, node
1464 )
1465 if service.host == self.client.jid.userhost:
1466 # service is a proxy JID for AP account
1467 actor_data = await self.getAPActorDataFromAccount(target_ap_account)
1468 followers = actor_data.get("followers")
1469 else:
1470 # service is a real XMPP entity
1471 followers = self.buildAPURL(TYPE_FOLLOWERS, target_ap_account)
1472 if followers:
1473 ap_object["cc"] = [followers]
1474 if self._m.isCommentNode(node):
1475 parent_item = self._m.getParentItem(node)
1476 if service.host == self.client.jid.userhost():
1477 # the publication is on a virtual node (i.e. an XMPP node managed by
1478 # this gateway and linking to an ActivityPub actor)
1479 ap_object["inReplyTo"] = parent_item
1480 else:
1481 # the publication is from a followed real XMPP node
1482 ap_object["inReplyTo"] = await self.getReplyToIdFromXMPPNode(
1483 client,
1484 ap_account,
1485 parent_item,
1486 mb_data
1487 )
1416 1488
1417 ap_item = { 1489 ap_item = {
1418 "@context": "https://www.w3.org/ns/activitystreams", 1490 "@context": "https://www.w3.org/ns/activitystreams",
1419 "id": url_item, 1491 "id": url_item,
1420 "type": "Create", 1492 "type": "Create",
1421 "actor": url_actor, 1493 "actor": url_actor,
1422
1423 "object": ap_object 1494 "object": ap_object
1424 } 1495 }
1425 language = mb_data.get("language")
1426 if language:
1427 ap_object["contentMap"] = {language: ap_object["content"]}
1428 if self._m.isCommentNode(node):
1429 parent_item = self._m.getParentItem(node)
1430 if service.host == self.client.jid.userhost():
1431 # the publication is on a virtual node (i.e. an XMPP node managed by
1432 # this gateway and linking to an ActivityPub actor)
1433 ap_object["inReplyTo"] = parent_item
1434 else:
1435 # the publication is from a followed real XMPP node
1436 ap_object["inReplyTo"] = await self.getReplyToIdFromXMPPNode(
1437 client,
1438 ap_account,
1439 parent_item,
1440 mb_data
1441 )
1442 1496
1443 return ap_item 1497 return ap_item
1444 1498
1445 async def publishMessage( 1499 async def publishMessage(
1446 self, 1500 self,
1478 item_data = await self.mbdata2APitem(client, mess_data) 1532 item_data = await self.mbdata2APitem(client, mess_data)
1479 url_actor = item_data["object"]["attributedTo"] 1533 url_actor = item_data["object"]["attributedTo"]
1480 resp = await self.signAndPost(inbox_url, url_actor, item_data) 1534 resp = await self.signAndPost(inbox_url, url_actor, item_data)
1481 if resp.code != 202: 1535 if resp.code != 202:
1482 raise exceptions.NetworkError(f"unexpected return code: {resp.code}") 1536 raise exceptions.NetworkError(f"unexpected return code: {resp.code}")
1537
1538 @utils.ensure_deferred
1539 async def onMessage(self, message_elt: domish.Element) -> None:
1540 """Called when a XMPP message is received"""
1541 if not self.client:
1542 log.warning(f"no client set, ignoring message: {message_elt.toXml()}")
1543 return
1544 mess_type = message_elt.getAttribute("type")
1545 if mess_type and mess_type not in ("chat", "normal"):
1546 log.warning(f"ignoring message with unexpected type: {message_elt.toXml()}")
1547 return
1548 from_jid = jid.JID(message_elt["from"])
1549 if not self.isLocal(from_jid):
1550 log.warning(f"ignoring non local message: {message_elt.toXml()}")
1551 return
1552 to_jid = jid.JID(message_elt["to"])
1553 if not to_jid.user:
1554 log.warning(
1555 f"ignoring message addressed to gateway itself: {message_elt.toXml()}"
1556 )
1557 return
1558
1559 actor_account = self._e.unescape(to_jid.user)
1560 actor_id = await self.getAPActorIdFromAccount(actor_account)
1561 inbox = await self.getAPInboxFromId(actor_id)
1562
1563 mb_data = {
1564 "content": str(message_elt.body),
1565 }
1566 client = self.client.getVirtualClient(from_jid)
1567 ap_item = await self.mbdata2APitem(client, mb_data, public=False)
1568 ap_item["object"]["to"] = ap_item["to"] = [actor_id]
1569 await self.signAndPost(inbox, ap_item["actor"], ap_item)
1483 1570
1484 async def newReplyToXMPPItem( 1571 async def newReplyToXMPPItem(
1485 self, 1572 self,
1486 client: SatXMPPEntity, 1573 client: SatXMPPEntity,
1487 ap_item: dict, 1574 ap_item: dict,
1542 1629
1543 @param destinee: jid of the destinee, 1630 @param destinee: jid of the destinee,
1544 @param node: XMPP pubsub node 1631 @param node: XMPP pubsub node
1545 @param item: AP object payload 1632 @param item: AP object payload
1546 """ 1633 """
1634 targets: Set[str] = set()
1635 is_public = False
1636 # TODO: handle "audience"
1637 for key in ("to", "bto", "cc", "bcc"):
1638 values = item.get(key)
1639 if not values:
1640 continue
1641 if isinstance(values, str):
1642 values = [values]
1643 for value in values:
1644 if value in PUBLIC_TUPLE:
1645 is_public = True
1646 continue
1647 if not value:
1648 continue
1649 if not self.isLocalURL(value):
1650 continue
1651 targets.add(value)
1652
1653 targets_types = {self.parseAPURL(t)[0] for t in targets}
1654 if not is_public and targets_types == {TYPE_ACTOR}:
1655 # this is a direct message
1656 await self.handleMessageAPItem(
1657 client, targets, destinee, item
1658 )
1659 else:
1660 await self.handlePubsubAPItem(
1661 client, targets, destinee, node, item, is_public
1662 )
1663
1664 async def handleMessageAPItem(
1665 self,
1666 client: SatXMPPEntity,
1667 targets: Set[str],
1668 destinee: Optional[jid.JID],
1669 item: dict,
1670 ) -> None:
1671 """Parse and deliver direct AP items translating to XMPP messages
1672
1673 @param targets: actors where the item must be delivered
1674 @param destinee: jid of the destinee,
1675 @param item: AP object payload
1676 """
1677 targets_jids = {await self.getJIDFromId(t) for t in targets}
1678 if destinee is not None:
1679 targets_jids.add(destinee)
1680 mb_data = await self.apItem2MBdata(item)
1681 defer_l = []
1682 for target_jid in targets_jids:
1683 defer_l.append(
1684 client.sendMessage(
1685 target_jid,
1686 {'': mb_data.get("content", "")},
1687 mb_data.get("title"),
1688
1689 )
1690 )
1691 await defer.DeferredList(defer_l)
1692
1693 async def handlePubsubAPItem(
1694 self,
1695 client: SatXMPPEntity,
1696 targets: Set[str],
1697 destinee: Optional[jid.JID],
1698 node: str,
1699 item: dict,
1700 public: bool
1701 ) -> None:
1702 """Analyse, cache and deliver AP items translating to Pubsub
1703
1704 @param targets: actors/collections where the item must be delivered
1705 @param destinee: jid of the destinee,
1706 @param node: XMPP pubsub node
1707 @param item: AP object payload
1708 @param public: True if the item is public
1709 """
1710 # XXX: "public" is not used for now
1711
1547 service = client.jid 1712 service = client.jid
1548 in_reply_to = item.get("inReplyTo") 1713 in_reply_to = item.get("inReplyTo")
1714
1549 if in_reply_to and isinstance(in_reply_to, list): 1715 if in_reply_to and isinstance(in_reply_to, list):
1550 in_reply_to = in_reply_to[0] 1716 in_reply_to = in_reply_to[0]
1551 if in_reply_to and isinstance(in_reply_to, str): 1717 if in_reply_to and isinstance(in_reply_to, str):
1552 if self.isLocalURL(in_reply_to): 1718 if self.isLocalURL(in_reply_to):
1553 # this is a reply to an XMPP item 1719 # this is a reply to an XMPP item
1605 self.pubsub_service.notifyPublish( 1771 self.pubsub_service.notifyPublish(
1606 service, 1772 service,
1607 node, 1773 node,
1608 [(subscription.subscriber, None, [item_elt])] 1774 [(subscription.subscriber, None, [item_elt])]
1609 ) 1775 )
1776