Mercurial > libervia-backend
comparison sat/plugins/plugin_comp_ap_gateway/__init__.py @ 3784:efc34a89e70b
comp AP gateway: message conversion:
Convert direct AP items to XMPP `<message>` stanzas, and vice versa.
Documentation will follow soon to explain the behaviour.
rel 366
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 24 May 2022 17:57:36 +0200 |
parents | 125c7043b277 |
children | 865167c34b82 |
comparison
equal
deleted
inserted
replaced
3783:fedbf7aade11 | 3784:efc34a89e70b |
---|---|
21 import hashlib | 21 import hashlib |
22 import json | 22 import json |
23 from pathlib import Path | 23 from pathlib import Path |
24 from pprint import pformat | 24 from pprint import pformat |
25 import re | 25 import re |
26 from typing import Any, Dict, List, Optional, Tuple, Union, Callable, Awaitable, overload | 26 from typing import ( |
27 Any, Dict, List, Set, Optional, Tuple, Union, Callable, Awaitable, overload | |
28 ) | |
27 from urllib import parse | 29 from urllib import parse |
28 | 30 |
29 from cryptography.exceptions import InvalidSignature | 31 from cryptography.exceptions import InvalidSignature |
30 from cryptography.hazmat.primitives import serialization | 32 from cryptography.hazmat.primitives import serialization |
31 from cryptography.hazmat.primitives import hashes | 33 from cryptography.hazmat.primitives import hashes |
63 IMPORT_NAME, | 65 IMPORT_NAME, |
64 LRU_MAX_SIZE, | 66 LRU_MAX_SIZE, |
65 MEDIA_TYPE_AP, | 67 MEDIA_TYPE_AP, |
66 TYPE_ACTOR, | 68 TYPE_ACTOR, |
67 TYPE_ITEM, | 69 TYPE_ITEM, |
70 TYPE_FOLLOWERS, | |
71 NS_AP_PUBLIC, | |
72 PUBLIC_TUPLE | |
68 ) | 73 ) |
69 from .http_server import HTTPServer | 74 from .http_server import HTTPServer |
70 from .pubsub_service import APPubsubService | 75 from .pubsub_service import APPubsubService |
71 | 76 |
72 | 77 |
78 C.PI_NAME: "ActivityPub Gateway component", | 83 C.PI_NAME: "ActivityPub Gateway component", |
79 C.PI_IMPORT_NAME: IMPORT_NAME, | 84 C.PI_IMPORT_NAME: IMPORT_NAME, |
80 C.PI_MODES: [C.PLUG_MODE_COMPONENT], | 85 C.PI_MODES: [C.PLUG_MODE_COMPONENT], |
81 C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, | 86 C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, |
82 C.PI_PROTOCOLS: [], | 87 C.PI_PROTOCOLS: [], |
83 C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0277", "XEP-0060", "XEP-0465", "PUBSUB_CACHE"], | 88 C.PI_DEPENDENCIES: [ |
89 "XEP-0106", "XEP-0277", "XEP-0060", "XEP-0465", "PUBSUB_CACHE", "TEXT_SYNTAXES" | |
90 ], | |
84 C.PI_RECOMMENDATIONS: [], | 91 C.PI_RECOMMENDATIONS: [], |
85 C.PI_MAIN: "APGateway", | 92 C.PI_MAIN: "APGateway", |
86 C.PI_HANDLER: C.BOOL_TRUE, | 93 C.PI_HANDLER: C.BOOL_TRUE, |
87 C.PI_DESCRIPTION: _( | 94 C.PI_DESCRIPTION: _( |
88 "Gateway for bidirectional communication between XMPP and ActivityPub." | 95 "Gateway for bidirectional communication between XMPP and ActivityPub." |
107 self._pps = host.plugins["XEP-0465"] | 114 self._pps = host.plugins["XEP-0465"] |
108 self._c = host.plugins["PUBSUB_CACHE"] | 115 self._c = host.plugins["PUBSUB_CACHE"] |
109 self._p.addManagedNode( | 116 self._p.addManagedNode( |
110 "", items_cb=self._itemsReceived | 117 "", items_cb=self._itemsReceived |
111 ) | 118 ) |
119 self._t = host.plugins["TEXT_SYNTAXES"] | |
112 self.pubsub_service = APPubsubService(self) | 120 self.pubsub_service = APPubsubService(self) |
113 | 121 |
114 host.bridge.addMethod( | 122 host.bridge.addMethod( |
115 "APSend", | 123 "APSend", |
116 ".plugin", | 124 ".plugin", |
215 reactor.listenSSL(self.http_port, self.server, context_factory) | 223 reactor.listenSSL(self.http_port, self.server, context_factory) |
216 | 224 |
217 async def profileConnecting(self, client): | 225 async def profileConnecting(self, client): |
218 self.client = client | 226 self.client = client |
219 await self.init(client) | 227 await self.init(client) |
228 | |
229 def profileConnected(self, client): | |
230 client.xmlstream.addObserver("/message/body", self.onMessage) | |
220 | 231 |
221 async def _itemsReceived(self, client, itemsEvent): | 232 async def _itemsReceived(self, client, itemsEvent): |
222 """Callback called when pubsub items are received | 233 """Callback called when pubsub items are received |
223 | 234 |
224 if the items are addressed to a JID corresponding to an AP actor, they are | 235 if the items are addressed to a JID corresponding to an AP actor, they are |
453 The account construction will use escaping when necessary | 464 The account construction will use escaping when necessary |
454 """ | 465 """ |
455 if not node or node == self._m.namespace: | 466 if not node or node == self._m.namespace: |
456 node = None | 467 node = None |
457 | 468 |
469 if self.client is None: | |
470 raise exceptions.InternalError("Client is not set yet") | |
471 | |
472 if jid_.host == self.client.jid.userhost(): | |
473 # this is a proxy JID to an AP Actor | |
474 return self._e.unescape(jid_.user) | |
475 | |
458 if node and not jid_.user and not self.mustEncode(node): | 476 if node and not jid_.user and not self.mustEncode(node): |
459 is_pubsub = await self.isPubsub(jid_) | 477 is_pubsub = await self.isPubsub(jid_) |
460 # when we have a pubsub service, the user part can be used to set the node | 478 # when we have a pubsub service, the user part can be used to set the node |
461 # this produces more user-friendly AP accounts | 479 # this produces more user-friendly AP accounts |
462 if is_pubsub: | 480 if is_pubsub: |
680 if algo != "SHA-256": | 698 if algo != "SHA-256": |
681 raise NotImplementedError("only SHA-256 is implemented for now") | 699 raise NotImplementedError("only SHA-256 is implemented for now") |
682 return algo, base64.b64encode(hashlib.sha256(body).digest()).decode() | 700 return algo, base64.b64encode(hashlib.sha256(body).digest()).decode() |
683 | 701 |
684 @async_lru(maxsize=LRU_MAX_SIZE) | 702 @async_lru(maxsize=LRU_MAX_SIZE) |
703 async def getActorData(self, actor_id) -> dict: | |
704 """Retrieve actor data with LRU cache""" | |
705 return await self.apGet(actor_id) | |
706 | |
707 @async_lru(maxsize=LRU_MAX_SIZE) | |
685 async def getActorPubKeyData( | 708 async def getActorPubKeyData( |
686 self, | 709 self, |
687 actor_id: str | 710 actor_id: str |
688 ) -> Tuple[str, str, rsa.RSAPublicKey]: | 711 ) -> Tuple[str, str, rsa.RSAPublicKey]: |
689 """Retrieve Public Key data from actor ID | 712 """Retrieve Public Key data from actor ID |
690 | 713 |
691 @param actor_id: actor ID (url) | 714 @param actor_id: actor ID (url) |
692 @return: key_id, owner and public_key | 715 @return: key_id, owner and public_key |
693 @raise KeyError: publicKey is missing from actor data | 716 @raise KeyError: publicKey is missing from actor data |
694 """ | 717 """ |
695 actor_data = await self.apGet(actor_id) | 718 actor_data = await self.getActorData(actor_id) |
696 pub_key_data = actor_data["publicKey"] | 719 pub_key_data = actor_data["publicKey"] |
697 key_id = pub_key_data["id"] | 720 key_id = pub_key_data["id"] |
698 owner = pub_key_data["owner"] | 721 owner = pub_key_data["owner"] |
699 pub_key_pem = pub_key_data["publicKeyPem"] | 722 pub_key_pem = pub_key_data["publicKeyPem"] |
700 pub_key = serialization.load_pem_public_key(pub_key_pem.encode()) | 723 pub_key = serialization.load_pem_public_key(pub_key_pem.encode()) |
899 | 922 |
900 @param account: AP handle (user@domain.tld) | 923 @param account: AP handle (user@domain.tld) |
901 @return: Actor ID (which is an URL) | 924 @return: Actor ID (which is an URL) |
902 """ | 925 """ |
903 if account.count("@") != 1 or "/" in account: | 926 if account.count("@") != 1 or "/" in account: |
904 raise ValueError("Invalid account: {account!r}") | 927 raise ValueError(f"Invalid account: {account!r}") |
905 host = account.split("@")[1] | 928 host = account.split("@")[1] |
906 try: | 929 try: |
907 finger_data = await treq.json_content(await treq.get( | 930 finger_data = await treq.json_content(await treq.get( |
908 f"https://{host}/.well-known/webfinger?" | 931 f"https://{host}/.well-known/webfinger?" |
909 f"resource=acct:{parse.quote_plus(account)}", | 932 f"resource=acct:{parse.quote_plus(account)}", |
910 )) | 933 )) |
911 except Exception as e: | 934 except Exception as e: |
912 raise exceptions.DataError(f"Can't get webfinger data: {e}") | 935 raise exceptions.DataError(f"Can't get webfinger data for {account!r}: {e}") |
913 for link in finger_data.get("links", []): | 936 for link in finger_data.get("links", []): |
914 if ( | 937 if ( |
915 link.get("type") == "application/activity+json" | 938 link.get("type") == "application/activity+json" |
916 and link.get("rel") == "self" | 939 and link.get("rel") == "self" |
917 ): | 940 ): |
933 @param account: ActivityPub Actor identifier | 956 @param account: ActivityPub Actor identifier |
934 """ | 957 """ |
935 href = await self.getAPActorIdFromAccount(account) | 958 href = await self.getAPActorIdFromAccount(account) |
936 return await self.apGet(href) | 959 return await self.apGet(href) |
937 | 960 |
938 @async_lru(maxsize=LRU_MAX_SIZE) | |
939 async def getAPInboxFromId(self, actor_id: str) -> str: | 961 async def getAPInboxFromId(self, actor_id: str) -> str: |
940 """Retrieve inbox of an actor_id""" | 962 """Retrieve inbox of an actor_id""" |
941 data = await self.apGet(actor_id) | 963 data = await self.getActorData(actor_id) |
942 return data["inbox"] | 964 return data["inbox"] |
943 | 965 |
944 @async_lru(maxsize=LRU_MAX_SIZE) | 966 @async_lru(maxsize=LRU_MAX_SIZE) |
945 async def getAPAccountFromId(self, actor_id: str) -> str: | 967 async def getAPAccountFromId(self, actor_id: str) -> str: |
946 """Retrieve AP account from the ID URL | 968 """Retrieve AP account from the ID URL |
947 | 969 |
948 @param actor_id: AP ID of the actor (URL to the actor data) | 970 @param actor_id: AP ID of the actor (URL to the actor data) |
949 """ | 971 """ |
950 url_parsed = parse.urlparse(actor_id) | 972 url_parsed = parse.urlparse(actor_id) |
951 actor_data = await self.apGet(actor_id) | 973 actor_data = await self.getActorData(actor_id) |
952 username = actor_data.get("preferredUsername") | 974 username = actor_data.get("preferredUsername") |
953 if not username: | 975 if not username: |
954 raise exceptions.DataError( | 976 raise exceptions.DataError( |
955 'No "preferredUsername" field found, can\'t retrieve actor account' | 977 'No "preferredUsername" field found, can\'t retrieve actor account' |
956 ) | 978 ) |
1267 log.warning(f"no content found:\n{ap_object!r}") | 1289 log.warning(f"no content found:\n{ap_object!r}") |
1268 raise exceptions.DataError | 1290 raise exceptions.DataError |
1269 else: | 1291 else: |
1270 mb_data["language"] = language | 1292 mb_data["language"] = language |
1271 mb_data["content_xhtml"] = content_xhtml | 1293 mb_data["content_xhtml"] = content_xhtml |
1294 if not mb_data.get("content"): | |
1295 mb_data["content"] = await self._t.convert( | |
1296 content_xhtml, | |
1297 self._t.SYNTAX_XHTML, | |
1298 self._t.SYNTAX_TEXT, | |
1299 False, | |
1300 ) | |
1272 | 1301 |
1273 # author | 1302 # author |
1274 if is_activity: | 1303 if is_activity: |
1275 authors = await self.apGetActors(ap_item, "actor") | 1304 authors = await self.apGetActors(ap_item, "actor") |
1276 else: | 1305 else: |
1377 | 1406 |
1378 return self.buildAPURL( | 1407 return self.buildAPURL( |
1379 TYPE_ITEM, parent_ap_account, parent_item | 1408 TYPE_ITEM, parent_ap_account, parent_item |
1380 ) | 1409 ) |
1381 | 1410 |
1382 async def mbdata2APitem(self, client: SatXMPPEntity, mb_data: dict) -> dict: | 1411 async def mbdata2APitem( |
1383 """Convert Libervia Microblog Data to ActivityPub item""" | 1412 self, |
1384 try: | 1413 client: SatXMPPEntity, |
1385 node = mb_data["node"] | 1414 mb_data: dict, |
1386 service = jid.JID(mb_data["service"]) | 1415 public=True |
1387 except KeyError: | 1416 ) -> dict: |
1388 # node and service must always be specified when this method is used | 1417 """Convert Libervia Microblog Data to ActivityPub item |
1389 raise exceptions.InternalError( | 1418 |
1390 "node or service is missing in mb_data" | 1419 @param mb_data: microblog data (as used in plugin XEP-0277) to convert |
1391 ) | 1420 If ``public`` is True, ``service`` and ``node`` keys must be set. |
1421 If ``published`` is not set, current datetime will be used | |
1422 @param public: True if the message is not a private/direct one | |
1423 if True, the AP Item will be marked as public, and AP followers of target AP | |
1424 account (which is retrieved from ``service``) will be put in ``cc``. | |
1425 ``inReplyTo`` will also be set if suitable | |
1426 if False, no destinee will be set (i.e., no ``to`` or ``cc`` or public flag). | |
1427 This is usually used for direct messages. | |
1428 @return: AP item | |
1429 """ | |
1392 if not mb_data.get("id"): | 1430 if not mb_data.get("id"): |
1393 mb_data["id"] = shortuuid.uuid() | 1431 mb_data["id"] = shortuuid.uuid() |
1394 if not mb_data.get("author_jid"): | 1432 if not mb_data.get("author_jid"): |
1395 mb_data["author_jid"] = client.jid.full() | 1433 mb_data["author_jid"] = client.jid.userhost() |
1396 ap_account = await self.getAPAccountFromJidAndNode( | 1434 ap_account = await self.getAPAccountFromJidAndNode( |
1397 jid.JID(mb_data["author_jid"]), | 1435 jid.JID(mb_data["author_jid"]), |
1398 None | 1436 None |
1399 ) | 1437 ) |
1400 url_actor = self.buildAPURL(TYPE_ACTOR, ap_account) | 1438 url_actor = self.buildAPURL(TYPE_ACTOR, ap_account) |
1401 url_item = self.buildAPURL(TYPE_ITEM, ap_account, mb_data["id"]) | 1439 url_item = self.buildAPURL(TYPE_ITEM, ap_account, mb_data["id"]) |
1402 ap_object = { | 1440 ap_object = { |
1403 "id": url_item, | 1441 "id": url_item, |
1404 "type": "Note", | 1442 "type": "Note", |
1405 "published": utils.xmpp_date(mb_data["published"]), | 1443 "published": utils.xmpp_date(mb_data.get("published")), |
1406 "attributedTo": url_actor, | 1444 "attributedTo": url_actor, |
1407 "content": mb_data.get("content_xhtml") or mb_data["content"], | 1445 "content": mb_data.get("content_xhtml") or mb_data["content"], |
1408 "to": ["https://www.w3.org/ns/activitystreams#Public"], | |
1409 } | 1446 } |
1410 | 1447 |
1411 target_ap_account = self._e.unescape(service.user) | 1448 language = mb_data.get("language") |
1412 actor_data = await self.getAPActorDataFromAccount(target_ap_account) | 1449 if language: |
1413 followers = actor_data.get("followers") | 1450 ap_object["contentMap"] = {language: ap_object["content"]} |
1414 if followers: | 1451 |
1415 ap_object["cc"] = [followers] | 1452 if public: |
1453 ap_object["to"] = [NS_AP_PUBLIC] | |
1454 try: | |
1455 node = mb_data["node"] | |
1456 service = jid.JID(mb_data["service"]) | |
1457 except KeyError: | |
1458 # node and service must always be specified when this method is used | |
1459 raise exceptions.InternalError( | |
1460 "node or service is missing in mb_data" | |
1461 ) | |
1462 target_ap_account = await self.getAPAccountFromJidAndNode( | |
1463 service, node | |
1464 ) | |
1465 if service.host == self.client.jid.userhost: | |
1466 # service is a proxy JID for AP account | |
1467 actor_data = await self.getAPActorDataFromAccount(target_ap_account) | |
1468 followers = actor_data.get("followers") | |
1469 else: | |
1470 # service is a real XMPP entity | |
1471 followers = self.buildAPURL(TYPE_FOLLOWERS, target_ap_account) | |
1472 if followers: | |
1473 ap_object["cc"] = [followers] | |
1474 if self._m.isCommentNode(node): | |
1475 parent_item = self._m.getParentItem(node) | |
1476 if service.host == self.client.jid.userhost(): | |
1477 # the publication is on a virtual node (i.e. an XMPP node managed by | |
1478 # this gateway and linking to an ActivityPub actor) | |
1479 ap_object["inReplyTo"] = parent_item | |
1480 else: | |
1481 # the publication is from a followed real XMPP node | |
1482 ap_object["inReplyTo"] = await self.getReplyToIdFromXMPPNode( | |
1483 client, | |
1484 ap_account, | |
1485 parent_item, | |
1486 mb_data | |
1487 ) | |
1416 | 1488 |
1417 ap_item = { | 1489 ap_item = { |
1418 "@context": "https://www.w3.org/ns/activitystreams", | 1490 "@context": "https://www.w3.org/ns/activitystreams", |
1419 "id": url_item, | 1491 "id": url_item, |
1420 "type": "Create", | 1492 "type": "Create", |
1421 "actor": url_actor, | 1493 "actor": url_actor, |
1422 | |
1423 "object": ap_object | 1494 "object": ap_object |
1424 } | 1495 } |
1425 language = mb_data.get("language") | |
1426 if language: | |
1427 ap_object["contentMap"] = {language: ap_object["content"]} | |
1428 if self._m.isCommentNode(node): | |
1429 parent_item = self._m.getParentItem(node) | |
1430 if service.host == self.client.jid.userhost(): | |
1431 # the publication is on a virtual node (i.e. an XMPP node managed by | |
1432 # this gateway and linking to an ActivityPub actor) | |
1433 ap_object["inReplyTo"] = parent_item | |
1434 else: | |
1435 # the publication is from a followed real XMPP node | |
1436 ap_object["inReplyTo"] = await self.getReplyToIdFromXMPPNode( | |
1437 client, | |
1438 ap_account, | |
1439 parent_item, | |
1440 mb_data | |
1441 ) | |
1442 | 1496 |
1443 return ap_item | 1497 return ap_item |
1444 | 1498 |
1445 async def publishMessage( | 1499 async def publishMessage( |
1446 self, | 1500 self, |
1478 item_data = await self.mbdata2APitem(client, mess_data) | 1532 item_data = await self.mbdata2APitem(client, mess_data) |
1479 url_actor = item_data["object"]["attributedTo"] | 1533 url_actor = item_data["object"]["attributedTo"] |
1480 resp = await self.signAndPost(inbox_url, url_actor, item_data) | 1534 resp = await self.signAndPost(inbox_url, url_actor, item_data) |
1481 if resp.code != 202: | 1535 if resp.code != 202: |
1482 raise exceptions.NetworkError(f"unexpected return code: {resp.code}") | 1536 raise exceptions.NetworkError(f"unexpected return code: {resp.code}") |
1537 | |
1538 @utils.ensure_deferred | |
1539 async def onMessage(self, message_elt: domish.Element) -> None: | |
1540 """Called when a XMPP message is received""" | |
1541 if not self.client: | |
1542 log.warning(f"no client set, ignoring message: {message_elt.toXml()}") | |
1543 return | |
1544 mess_type = message_elt.getAttribute("type") | |
1545 if mess_type and mess_type not in ("chat", "normal"): | |
1546 log.warning(f"ignoring message with unexpected type: {message_elt.toXml()}") | |
1547 return | |
1548 from_jid = jid.JID(message_elt["from"]) | |
1549 if not self.isLocal(from_jid): | |
1550 log.warning(f"ignoring non local message: {message_elt.toXml()}") | |
1551 return | |
1552 to_jid = jid.JID(message_elt["to"]) | |
1553 if not to_jid.user: | |
1554 log.warning( | |
1555 f"ignoring message addressed to gateway itself: {message_elt.toXml()}" | |
1556 ) | |
1557 return | |
1558 | |
1559 actor_account = self._e.unescape(to_jid.user) | |
1560 actor_id = await self.getAPActorIdFromAccount(actor_account) | |
1561 inbox = await self.getAPInboxFromId(actor_id) | |
1562 | |
1563 mb_data = { | |
1564 "content": str(message_elt.body), | |
1565 } | |
1566 client = self.client.getVirtualClient(from_jid) | |
1567 ap_item = await self.mbdata2APitem(client, mb_data, public=False) | |
1568 ap_item["object"]["to"] = ap_item["to"] = [actor_id] | |
1569 await self.signAndPost(inbox, ap_item["actor"], ap_item) | |
1483 | 1570 |
1484 async def newReplyToXMPPItem( | 1571 async def newReplyToXMPPItem( |
1485 self, | 1572 self, |
1486 client: SatXMPPEntity, | 1573 client: SatXMPPEntity, |
1487 ap_item: dict, | 1574 ap_item: dict, |
1542 | 1629 |
1543 @param destinee: jid of the destinee, | 1630 @param destinee: jid of the destinee, |
1544 @param node: XMPP pubsub node | 1631 @param node: XMPP pubsub node |
1545 @param item: AP object payload | 1632 @param item: AP object payload |
1546 """ | 1633 """ |
1634 targets: Set[str] = set() | |
1635 is_public = False | |
1636 # TODO: handle "audience" | |
1637 for key in ("to", "bto", "cc", "bcc"): | |
1638 values = item.get(key) | |
1639 if not values: | |
1640 continue | |
1641 if isinstance(values, str): | |
1642 values = [values] | |
1643 for value in values: | |
1644 if value in PUBLIC_TUPLE: | |
1645 is_public = True | |
1646 continue | |
1647 if not value: | |
1648 continue | |
1649 if not self.isLocalURL(value): | |
1650 continue | |
1651 targets.add(value) | |
1652 | |
1653 targets_types = {self.parseAPURL(t)[0] for t in targets} | |
1654 if not is_public and targets_types == {TYPE_ACTOR}: | |
1655 # this is a direct message | |
1656 await self.handleMessageAPItem( | |
1657 client, targets, destinee, item | |
1658 ) | |
1659 else: | |
1660 await self.handlePubsubAPItem( | |
1661 client, targets, destinee, node, item, is_public | |
1662 ) | |
1663 | |
1664 async def handleMessageAPItem( | |
1665 self, | |
1666 client: SatXMPPEntity, | |
1667 targets: Set[str], | |
1668 destinee: Optional[jid.JID], | |
1669 item: dict, | |
1670 ) -> None: | |
1671 """Parse and deliver direct AP items translating to XMPP messages | |
1672 | |
1673 @param targets: actors where the item must be delivered | |
1674 @param destinee: jid of the destinee, | |
1675 @param item: AP object payload | |
1676 """ | |
1677 targets_jids = {await self.getJIDFromId(t) for t in targets} | |
1678 if destinee is not None: | |
1679 targets_jids.add(destinee) | |
1680 mb_data = await self.apItem2MBdata(item) | |
1681 defer_l = [] | |
1682 for target_jid in targets_jids: | |
1683 defer_l.append( | |
1684 client.sendMessage( | |
1685 target_jid, | |
1686 {'': mb_data.get("content", "")}, | |
1687 mb_data.get("title"), | |
1688 | |
1689 ) | |
1690 ) | |
1691 await defer.DeferredList(defer_l) | |
1692 | |
1693 async def handlePubsubAPItem( | |
1694 self, | |
1695 client: SatXMPPEntity, | |
1696 targets: Set[str], | |
1697 destinee: Optional[jid.JID], | |
1698 node: str, | |
1699 item: dict, | |
1700 public: bool | |
1701 ) -> None: | |
1702 """Analyse, cache and deliver AP items translating to Pubsub | |
1703 | |
1704 @param targets: actors/collections where the item must be delivered | |
1705 @param destinee: jid of the destinee, | |
1706 @param node: XMPP pubsub node | |
1707 @param item: AP object payload | |
1708 @param public: True if the item is public | |
1709 """ | |
1710 # XXX: "public" is not used for now | |
1711 | |
1547 service = client.jid | 1712 service = client.jid |
1548 in_reply_to = item.get("inReplyTo") | 1713 in_reply_to = item.get("inReplyTo") |
1714 | |
1549 if in_reply_to and isinstance(in_reply_to, list): | 1715 if in_reply_to and isinstance(in_reply_to, list): |
1550 in_reply_to = in_reply_to[0] | 1716 in_reply_to = in_reply_to[0] |
1551 if in_reply_to and isinstance(in_reply_to, str): | 1717 if in_reply_to and isinstance(in_reply_to, str): |
1552 if self.isLocalURL(in_reply_to): | 1718 if self.isLocalURL(in_reply_to): |
1553 # this is a reply to an XMPP item | 1719 # this is a reply to an XMPP item |
1605 self.pubsub_service.notifyPublish( | 1771 self.pubsub_service.notifyPublish( |
1606 service, | 1772 service, |
1607 node, | 1773 node, |
1608 [(subscription.subscriber, None, [item_elt])] | 1774 [(subscription.subscriber, None, [item_elt])] |
1609 ) | 1775 ) |
1776 |