Mercurial > libervia-backend
diff sat/plugins/plugin_comp_ap_gateway/__init__.py @ 3904:0aa7023dcd08
component AP gateway: events:
- XMPP Events <=> AP Events conversion
- `Join`/`Leave` activities are converted to RSVP attachments and vice versa
- fix caching/notification on item published on a virtual pubsub node
- add Ad-Hoc command to convert XMPP Jid/Node to virtual AP Account
- handle `Update` activity
- on `convertAndPostItems`, `Update` activity is used instead of `Create` if a version of
the item is already present in cache
- `events` field is added to actor data (and to `endpoints`), it links the `outbox` of the
actor mapping the same JID with the Events node (i.e. it links to the Events node of the
entity)
- fix subscription to nodes which are not the microblog node
rel 372
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 22 Sep 2022 00:01:41 +0200 |
parents | aa7197b67c26 |
children | 6fa4ca0c047e |
line wrap: on
line diff
--- a/sat/plugins/plugin_comp_ap_gateway/__init__.py Wed Sep 21 22:43:55 2022 +0200 +++ b/sat/plugins/plugin_comp_ap_gateway/__init__.py Thu Sep 22 00:01:41 2022 +0200 @@ -20,11 +20,22 @@ import calendar import hashlib import json +from os import access from pathlib import Path from pprint import pformat import re from typing import ( - Any, Dict, List, Set, Optional, Tuple, Union, Callable, Awaitable, overload + Any, + Awaitable, + Callable, + Dict, + List, + Optional, + Set, + Tuple, + Type, + Union, + overload, ) from urllib import parse @@ -34,6 +45,7 @@ from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives.asymmetric import padding import dateutil +from dateutil.parser import parserinfo import shortuuid from sqlalchemy.exc import IntegrityError import treq @@ -42,19 +54,21 @@ from twisted.web import http from twisted.words.protocols.jabber import error, jid from twisted.words.xish import domish -from wokkel import rsm, pubsub +from wokkel import pubsub, rsm from sat.core import exceptions from sat.core.constants import Const as C from sat.core.core_types import SatXMPPEntity from sat.core.i18n import _ from sat.core.log import getLogger -from sat.memory.sqla_mapping import SubscriptionState, History from sat.memory import persistent +from sat.memory.sqla_mapping import History, SubscriptionState from sat.tools import utils -from sat.tools.common import data_format, tls, uri +from sat.tools.common import data_format, date_utils, tls, uri from sat.tools.common.async_utils import async_lru +from .ad_hoc import APAdHocService +from .events import APEvents from .constants import ( ACTIVITY_OBJECT_MANDATORY, ACTIVITY_TARGET_MANDATORY, @@ -65,20 +79,23 @@ IMPORT_NAME, LRU_MAX_SIZE, MEDIA_TYPE_AP, - TYPE_ACTOR, - TYPE_ITEM, - TYPE_FOLLOWERS, - TYPE_TOMBSTONE, - TYPE_MENTION, - TYPE_LIKE, - TYPE_REACTION, NS_AP, NS_AP_PUBLIC, - PUBLIC_TUPLE + PUBLIC_TUPLE, + TYPE_ACTOR, + TYPE_EVENT, + TYPE_FOLLOWERS, + TYPE_ITEM, + TYPE_LIKE, + 
TYPE_MENTION, + TYPE_REACTION, + TYPE_TOMBSTONE, + TYPE_JOIN, + TYPE_LEAVE ) -from .regex import RE_MENTION from .http_server import HTTPServer from .pubsub_service import APPubsubService +from .regex import RE_MENTION log = getLogger(__name__) @@ -92,9 +109,9 @@ C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, C.PI_PROTOCOLS: [], C.PI_DEPENDENCIES: [ - "XEP-0054", "XEP-0060", "XEP-0084", "XEP-0106", "XEP-0277", "XEP-0292", - "XEP-0329", "XEP-0372", "XEP-0424", "XEP-0465", "XEP-0470", "PUBSUB_CACHE", - "TEXT_SYNTAXES", "IDENTITY" + "XEP-0050", "XEP-0054", "XEP-0060", "XEP-0084", "XEP-0106", "XEP-0277", + "XEP-0292", "XEP-0329", "XEP-0372", "XEP-0424", "XEP-0465", "XEP-0470", + "PUBSUB_CACHE", "TEXT_SYNTAXES", "IDENTITY", "EVENTS" ], C.PI_RECOMMENDATIONS: [], C.PI_MAIN: "APGateway", @@ -134,6 +151,7 @@ self._t = host.plugins["TEXT_SYNTAXES"] self._i = host.plugins["IDENTITY"] self._pa = host.plugins["XEP-0470"] + self._events = host.plugins["EVENTS"] self._p.addManagedNode( "", items_cb=self._itemsReceived, @@ -143,6 +161,8 @@ priority=1000 ) self.pubsub_service = APPubsubService(self) + self.ad_hoc = APAdHocService(self) + self.ap_events = APEvents(self) host.trigger.add("messageReceived", self._messageReceivedTrigger, priority=-1000) host.trigger.add("XEP-0424_retractReceived", self._onMessageRetract) host.trigger.add("XEP-0372_ref_received", self._onReferenceReceived) @@ -266,6 +286,9 @@ ) await self.init(client) + def profileConnected(self, client): + self.ad_hoc.init(client) + async def _itemsReceived( self, client: SatXMPPEntity, @@ -313,7 +336,7 @@ local_jid = await self.getJIDFromId(actor_id) return self.client.getVirtualClient(local_jid) - def isActivity(self, data: dict) -> bool: + def is_activity(self, data: dict) -> bool: """Return True if the data has an activity type""" try: return (data.get("type") or "").lower() in ACTIVITY_TYPES_LOWER @@ -434,12 +457,23 @@ except IndexError: raise exceptions.NotFound("requested items can't be found") - mb_data = await 
self._m.item2mbdata( - self.client, found_item, author_jid, node - ) - ap_item = await self.mbdata2APitem(self.client, mb_data) - # the URL must return the object and not the activity - return ap_item["object"] + if node.startswith(self._events.namespace): + # this is an event + event_data = self._events.event_elt_2_event_data(found_item) + ap_item = await self.ap_events.event_data_2_ap_item( + event_data, author_jid + ) + # the URL must return the object and not the activity + ap_item["object"]["@context"] = ap_item["@context"] + return ap_item["object"] + else: + # this is a blog item + mb_data = await self._m.item2mbdata( + self.client, found_item, author_jid, node + ) + ap_item = await self.mb_data_2_ap_item(self.client, mb_data) + # the URL must return the object and not the activity + return ap_item["object"] else: raise NotImplementedError( 'only object from "item" URLs can be retrieved for now' @@ -531,22 +565,22 @@ ) -> str: """Retrieve actor who sent data - This is done by checking "attributedTo" field first, then "actor" field. - Only the first found actor is taken into accoun + This is done by checking "actor" field first, then "attributedTo" field. 
+ Only the first found actor is taken into account @param data: AP object @return: actor id of the sender @raise exceptions.NotFound: no actor has been found in data """ try: - actors = await self.apGetActors(data, "attributedTo", as_account=False) + actors = await self.apGetActors(data, "actor", as_account=False) except exceptions.DataError: actors = None if not actors: try: - actors = await self.apGetActors(data, "actor", as_account=False) + actors = await self.apGetActors(data, "attributedTo", as_account=False) except exceptions.DataError: raise exceptions.NotFound( - 'actor not specified in "attributedTo" or "actor"' + 'actor not specified in "actor" or "attributedTo"' ) try: return actors[0] @@ -880,7 +914,7 @@ if activity_id is None: activity_id = f"{actor_id}#{activity.lower()}_{shortuuid.uuid()}" data: Dict[str, Any] = { - "@context": NS_AP, + "@context": [NS_AP], "actor": actor_id, "id": activity_id, "type": activity, @@ -993,37 +1027,82 @@ published @param node: (virtual) node corresponding where the item has been published @param subscribe_extra_nodes: if True, extra data nodes will be automatically - subscribed, that is comment nodes if present and attachments nodes. + subscribed, that is comment nodes if present and attachments nodes. 
""" actor_id = await self.getAPActorIdFromAccount(ap_account) inbox = await self.getAPInboxFromId(actor_id) for item in items: if item.name == "item": - mb_data = await self._m.item2mbdata(client, item, service, node) - author_jid = jid.JID(mb_data["author_jid"]) - if subscribe_extra_nodes and not self.isVirtualJID(author_jid): - # we subscribe automatically to comment nodes if any - recipient_jid = self.getLocalJIDFromAccount(ap_account) - recipient_client = self.client.getVirtualClient(recipient_jid) - for comment_data in mb_data.get("comments", []): - comment_service = jid.JID(comment_data["service"]) - if self.isVirtualJID(comment_service): - log.debug(f"ignoring virtual comment service: {comment_data}") - continue - comment_node = comment_data["node"] - await self._p.subscribe( - recipient_client, comment_service, comment_node - ) + cached_item = await self.host.memory.storage.searchPubsubItems({ + "profiles": [self.client.profile], + "services": [service], + "nodes": [node], + "names": [item["id"]] + }) + is_new = not bool(cached_item) + if node.startswith(self._events.namespace): + # event item + event_data = self._events.event_elt_2_event_data(item) try: - await self._pa.subscribe( - recipient_client, service, node, mb_data["id"] - ) - except exceptions.NotFound: - log.debug( - f"no attachment node found for item {mb_data['id']!r} on " - f"{node!r} at {service}" - ) - ap_item = await self.mbdata2APitem(client, mb_data) + author_jid = jid.JID(item["publisher"]).userhostJID() + except (KeyError, RuntimeWarning): + root_elt = item + while root_elt.parent is not None: + root_elt = root_elt.parent + author_jid = jid.JID(root_elt["from"]).userhostJID() + if subscribe_extra_nodes and not self.isVirtualJID(author_jid): + # we subscribe automatically to comment nodes if any + recipient_jid = self.getLocalJIDFromAccount(ap_account) + recipient_client = self.client.getVirtualClient(recipient_jid) + comments_data = event_data.get("comments") + if comments_data: + 
comment_service = jid.JID(comments_data["jid"]) + comment_node = comments_data["node"] + await self._p.subscribe( + recipient_client, comment_service, comment_node + ) + try: + await self._pa.subscribe( + recipient_client, service, node, event_data["id"] + ) + except exceptions.NotFound: + log.debug( + f"no attachment node found for item {event_data['id']!r} " + f"on {node!r} at {service}" + ) + ap_item = await self.ap_events.event_data_2_ap_item( + event_data, author_jid, is_new=is_new + ) + else: + # blog item + mb_data = await self._m.item2mbdata(client, item, service, node) + author_jid = jid.JID(mb_data["author_jid"]) + if subscribe_extra_nodes and not self.isVirtualJID(author_jid): + # we subscribe automatically to comment nodes if any + recipient_jid = self.getLocalJIDFromAccount(ap_account) + recipient_client = self.client.getVirtualClient(recipient_jid) + for comment_data in mb_data.get("comments", []): + comment_service = jid.JID(comment_data["service"]) + if self.isVirtualJID(comment_service): + log.debug( + f"ignoring virtual comment service: {comment_data}" + ) + continue + comment_node = comment_data["node"] + await self._p.subscribe( + recipient_client, comment_service, comment_node + ) + try: + await self._pa.subscribe( + recipient_client, service, node, mb_data["id"] + ) + except exceptions.NotFound: + log.debug( + f"no attachment node found for item {mb_data['id']!r} on " + f"{node!r} at {service}" + ) + ap_item = await self.mb_data_2_ap_item(client, mb_data, is_new=is_new) + url_actor = ap_item["actor"] elif item.name == "retract": url_actor, ap_item = await self.apDeleteItem( @@ -1134,22 +1213,22 @@ if not "noticed" in old_attachment: # new "noticed" attachment, we translate to "Like" activity activity_id = self.buildAPURL("like", item_account, item_id) - like = self.createActivity( + activity = self.createActivity( TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id ) - like["to"] = [ap_account] - like["cc"] = [NS_AP_PUBLIC] - 
await self.signAndPost(inbox, publisher_actor_id, like) + activity["to"] = [ap_account] + activity["cc"] = [NS_AP_PUBLIC] + await self.signAndPost(inbox, publisher_actor_id, activity) else: if "noticed" in old_attachment: # "noticed" attachment has been removed, we undo the "Like" activity activity_id = self.buildAPURL("like", item_account, item_id) - like = self.createActivity( + activity = self.createActivity( TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id ) - like["to"] = [ap_account] - like["cc"] = [NS_AP_PUBLIC] - undo = self.createActivity("Undo", publisher_actor_id, like) + activity["to"] = [ap_account] + activity["cc"] = [NS_AP_PUBLIC] + undo = self.createActivity("Undo", publisher_actor_id, activity) await self.signAndPost(inbox, publisher_actor_id, undo) # reactions @@ -1177,6 +1256,31 @@ activy = reaction_activity await self.signAndPost(inbox, publisher_actor_id, activy) + # RSVP + if "rsvp" in attachments: + attending = attachments["rsvp"].get("attending", "no") + old_attending = old_attachment.get("rsvp", {}).get("attending", "no") + if attending != old_attending: + activity_type = TYPE_JOIN if attending == "yes" else TYPE_LEAVE + activity_id = self.buildAPURL(activity_type.lower(), item_account, item_id) + activity = self.createActivity( + activity_type, publisher_actor_id, item_url, activity_id=activity_id + ) + activity["to"] = [ap_account] + activity["cc"] = [NS_AP_PUBLIC] + await self.signAndPost(inbox, publisher_actor_id, activity) + else: + if "rsvp" in old_attachment: + old_attending = old_attachment.get("rsvp", {}).get("attending", "no") + if old_attending == "yes": + activity_id = self.buildAPURL(TYPE_LEAVE.lower(), item_account, item_id) + activity = self.createActivity( + TYPE_LEAVE, publisher_actor_id, item_url, activity_id=activity_id + ) + activity["to"] = [ap_account] + activity["cc"] = [NS_AP_PUBLIC] + await self.signAndPost(inbox, publisher_actor_id, activity) + if service.user and self.isVirtualJID(service): # the 
item is on a virtual service, we need to store it in cache log.debug("storing attachments item in cache") @@ -1392,7 +1496,7 @@ Items are always returned in chronological order in the result """ if parser is None: - parser = self.apItem2Elt + parser = self.ap_item_2_mb_elt rsm_resp: Dict[str, Union[bool, int]] = {} try: @@ -1520,7 +1624,7 @@ return items, rsm.RSMResponse(**rsm_resp) - async def apItem2MbDataAndElt(self, ap_item: dict) -> Tuple[dict, domish.Element]: + async def ap_item_2_mb_data_and_elt(self, ap_item: dict) -> Tuple[dict, domish.Element]: """Convert AP item to parsed microblog data and corresponding item element""" mb_data = await self.apItem2MBdata(ap_item) item_elt = await self._m.data2entry( @@ -1532,9 +1636,9 @@ item_elt["publisher"] = mb_data["author_jid"] return mb_data, item_elt - async def apItem2Elt(self, ap_item: dict) -> domish.Element: + async def ap_item_2_mb_elt(self, ap_item: dict) -> domish.Element: """Convert AP item to XMPP item element""" - __, item_elt = await self.apItem2MbDataAndElt(ap_item) + __, item_elt = await self.ap_item_2_mb_data_and_elt(ap_item) return item_elt async def parseAPPage( @@ -1629,7 +1733,7 @@ @raise NotImplementedError: some AP data is not handled yet @raise error.StanzaError: error while contacting the AP server """ - is_activity = self.isActivity(ap_item) + is_activity = self.is_activity(ap_item) if is_activity: ap_object = await self.apGetObject(ap_item, "object") if not ap_object: @@ -1839,11 +1943,12 @@ ] return announce - async def mbdata2APitem( + async def mb_data_2_ap_item( self, client: SatXMPPEntity, mb_data: dict, - public=True + public: bool =True, + is_new: bool = True, ) -> dict: """Convert Libervia Microblog Data to ActivityPub item @@ -1856,6 +1961,10 @@ ``inReplyTo`` will also be set if suitable if False, no destinee will be set (i.e., no ``to`` or ``cc`` or public flag). This is usually used for direct messages. 
+ @param is_new: if True, the item is a new one (no instance has been found in + cache). + If True, a "Create" activity will be generated, otherwise an "Update" one will + be. @return: Activity item """ extra = mb_data.get("extra", {}) @@ -1941,7 +2050,7 @@ ) return self.createActivity( - "Create", url_actor, ap_object, activity_id=url_item + "Create" if is_new else "Update", url_actor, ap_object, activity_id=url_item ) async def publishMessage( @@ -1977,7 +2086,7 @@ except KeyError: raise exceptions.DataError("Can't get ActivityPub actor inbox") - item_data = await self.mbdata2APitem(client, mess_data) + item_data = await self.mb_data_2_ap_item(client, mess_data) url_actor = item_data["actor"] resp = await self.signAndPost(inbox_url, url_actor, item_data) @@ -2096,7 +2205,7 @@ # we need to use origin ID when present to be able to retract the message mb_data["id"] = origin_id client = self.client.getVirtualClient(mess_data["from"]) - ap_item = await self.mbdata2APitem(client, mb_data, public=False) + ap_item = await self.mb_data_2_ap_item(client, mb_data, public=False) ap_item["object"]["to"] = ap_item["to"] = [actor_id] await self.signAndPost(inbox, ap_item["actor"], ap_item) return mess_data @@ -2205,7 +2314,7 @@ mb_data = await self._m.item2mbdata( client, cached_item.data, pubsub_service, pubsub_node ) - ap_item = await self.mbdata2APitem(client, mb_data) + ap_item = await self.mb_data_2_ap_item(client, mb_data) ap_object = ap_item["object"] ap_object["to"] = [actor_id] ap_object.setdefault("tag", []).append({ @@ -2271,7 +2380,7 @@ log.info(f"{ppElt(parent_item_elt.toXml())}") raise NotImplementedError() else: - __, item_elt = await self.apItem2MbDataAndElt(ap_item) + __, item_elt = await self.ap_item_2_mb_data_and_elt(ap_item) await self._p.publish(client, comment_service, comment_node, [item_elt]) await self.notifyMentions( targets, mentions, comment_service, comment_node, item_elt["id"] @@ -2454,7 +2563,6 @@ @param public: True if the item is public """ # 
XXX: "public" is not used for now - service = client.jid in_reply_to = item.get("inReplyTo") @@ -2476,8 +2584,9 @@ ) else: # it is a root item (i.e. not a reply to an other item) + create = node == self._events.namespace cached_node = await self.host.memory.storage.getPubsubNode( - client, service, node, with_subscriptions=True + client, service, node, with_subscriptions=True, create=create ) if cached_node is None: log.warning( @@ -2486,12 +2595,15 @@ ) return - mb_data, item_elt = await self.apItem2MbDataAndElt(item) + if item.get("type") == TYPE_EVENT: + data, item_elt = await self.ap_events.ap_item_2_event_data_and_elt(item) + else: + data, item_elt = await self.ap_item_2_mb_data_and_elt(item) await self.host.memory.storage.cachePubsubItems( client, cached_node, [item_elt], - [mb_data] + [data] ) for subscription in cached_node.subscriptions: