diff sat/plugins/plugin_comp_ap_gateway/__init__.py @ 3904:0aa7023dcd08

component AP gateway: events: - XMPP Events <=> AP Events conversion - `Join`/`Leave` activities are converted to RSVP attachments and vice versa - fix caching/notification on item published on a virtual pubsub node - add Ad-Hoc command to convert XMPP Jid/Node to virtual AP Account - handle `Update` activity - on `convertAndPostItems`, `Update` activity is used instead of `Create` if a version of the item is already present in cache - `events` field is added to actor data (and to `endpoints`), it links the `outbox` of the actor mapping the same JID with the Events node (i.e. it links to the Events node of the entity) - fix subscription to nodes which are not the microblog one rel 372
author Goffi <goffi@goffi.org>
date Thu, 22 Sep 2022 00:01:41 +0200
parents aa7197b67c26
children 6fa4ca0c047e
line wrap: on
line diff
--- a/sat/plugins/plugin_comp_ap_gateway/__init__.py	Wed Sep 21 22:43:55 2022 +0200
+++ b/sat/plugins/plugin_comp_ap_gateway/__init__.py	Thu Sep 22 00:01:41 2022 +0200
@@ -20,11 +20,22 @@
 import calendar
 import hashlib
 import json
+from os import access
 from pathlib import Path
 from pprint import pformat
 import re
 from typing import (
-    Any, Dict, List, Set, Optional, Tuple, Union, Callable, Awaitable, overload
+    Any,
+    Awaitable,
+    Callable,
+    Dict,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    Type,
+    Union,
+    overload,
 )
 from urllib import parse
 
@@ -34,6 +45,7 @@
 from cryptography.hazmat.primitives.asymmetric import rsa
 from cryptography.hazmat.primitives.asymmetric import padding
 import dateutil
+from dateutil.parser import parserinfo
 import shortuuid
 from sqlalchemy.exc import IntegrityError
 import treq
@@ -42,19 +54,21 @@
 from twisted.web import http
 from twisted.words.protocols.jabber import error, jid
 from twisted.words.xish import domish
-from wokkel import rsm, pubsub
+from wokkel import pubsub, rsm
 
 from sat.core import exceptions
 from sat.core.constants import Const as C
 from sat.core.core_types import SatXMPPEntity
 from sat.core.i18n import _
 from sat.core.log import getLogger
-from sat.memory.sqla_mapping import SubscriptionState, History
 from sat.memory import persistent
+from sat.memory.sqla_mapping import History, SubscriptionState
 from sat.tools import utils
-from sat.tools.common import data_format, tls, uri
+from sat.tools.common import data_format, date_utils, tls, uri
 from sat.tools.common.async_utils import async_lru
 
+from .ad_hoc import APAdHocService
+from .events import APEvents
 from .constants import (
     ACTIVITY_OBJECT_MANDATORY,
     ACTIVITY_TARGET_MANDATORY,
@@ -65,20 +79,23 @@
     IMPORT_NAME,
     LRU_MAX_SIZE,
     MEDIA_TYPE_AP,
-    TYPE_ACTOR,
-    TYPE_ITEM,
-    TYPE_FOLLOWERS,
-    TYPE_TOMBSTONE,
-    TYPE_MENTION,
-    TYPE_LIKE,
-    TYPE_REACTION,
     NS_AP,
     NS_AP_PUBLIC,
-    PUBLIC_TUPLE
+    PUBLIC_TUPLE,
+    TYPE_ACTOR,
+    TYPE_EVENT,
+    TYPE_FOLLOWERS,
+    TYPE_ITEM,
+    TYPE_LIKE,
+    TYPE_MENTION,
+    TYPE_REACTION,
+    TYPE_TOMBSTONE,
+    TYPE_JOIN,
+    TYPE_LEAVE
 )
-from .regex import RE_MENTION
 from .http_server import HTTPServer
 from .pubsub_service import APPubsubService
+from .regex import RE_MENTION
 
 
 log = getLogger(__name__)
@@ -92,9 +109,9 @@
     C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
     C.PI_PROTOCOLS: [],
     C.PI_DEPENDENCIES: [
-        "XEP-0054", "XEP-0060", "XEP-0084", "XEP-0106", "XEP-0277", "XEP-0292",
-        "XEP-0329", "XEP-0372", "XEP-0424", "XEP-0465", "XEP-0470", "PUBSUB_CACHE",
-        "TEXT_SYNTAXES", "IDENTITY"
+        "XEP-0050", "XEP-0054", "XEP-0060", "XEP-0084", "XEP-0106", "XEP-0277",
+        "XEP-0292", "XEP-0329", "XEP-0372", "XEP-0424", "XEP-0465", "XEP-0470",
+        "PUBSUB_CACHE", "TEXT_SYNTAXES", "IDENTITY", "EVENTS"
     ],
     C.PI_RECOMMENDATIONS: [],
     C.PI_MAIN: "APGateway",
@@ -134,6 +151,7 @@
         self._t = host.plugins["TEXT_SYNTAXES"]
         self._i = host.plugins["IDENTITY"]
         self._pa = host.plugins["XEP-0470"]
+        self._events = host.plugins["EVENTS"]
         self._p.addManagedNode(
             "",
             items_cb=self._itemsReceived,
@@ -143,6 +161,8 @@
             priority=1000
         )
         self.pubsub_service = APPubsubService(self)
+        self.ad_hoc = APAdHocService(self)
+        self.ap_events = APEvents(self)
         host.trigger.add("messageReceived", self._messageReceivedTrigger, priority=-1000)
         host.trigger.add("XEP-0424_retractReceived", self._onMessageRetract)
         host.trigger.add("XEP-0372_ref_received", self._onReferenceReceived)
@@ -266,6 +286,9 @@
         )
         await self.init(client)
 
+    def profileConnected(self, client):
+        self.ad_hoc.init(client)
+
     async def _itemsReceived(
         self,
         client: SatXMPPEntity,
@@ -313,7 +336,7 @@
         local_jid = await self.getJIDFromId(actor_id)
         return self.client.getVirtualClient(local_jid)
 
-    def isActivity(self, data: dict) -> bool:
+    def is_activity(self, data: dict) -> bool:
         """Return True if the data has an activity type"""
         try:
             return (data.get("type") or "").lower() in ACTIVITY_TYPES_LOWER
@@ -434,12 +457,23 @@
                 except IndexError:
                     raise exceptions.NotFound("requested items can't be found")
 
-            mb_data = await self._m.item2mbdata(
-                self.client, found_item, author_jid, node
-            )
-            ap_item = await self.mbdata2APitem(self.client, mb_data)
-            # the URL must return the object and not the activity
-            return ap_item["object"]
+            if node.startswith(self._events.namespace):
+                # this is an event
+                event_data = self._events.event_elt_2_event_data(found_item)
+                ap_item = await self.ap_events.event_data_2_ap_item(
+                    event_data, author_jid
+                )
+                # the URL must return the object and not the activity
+                ap_item["object"]["@context"] = ap_item["@context"]
+                return ap_item["object"]
+            else:
+                # this is a blog item
+                mb_data = await self._m.item2mbdata(
+                    self.client, found_item, author_jid, node
+                )
+                ap_item = await self.mb_data_2_ap_item(self.client, mb_data)
+                # the URL must return the object and not the activity
+                return ap_item["object"]
         else:
             raise NotImplementedError(
                 'only object from "item" URLs can be retrieved for now'
@@ -531,22 +565,22 @@
     ) -> str:
         """Retrieve actor who sent data
 
-        This is done by checking "attributedTo" field first, then "actor" field.
-        Only the first found actor is taken into accoun
+        This is done by checking "actor" field first, then "attributedTo" field.
+        Only the first found actor is taken into account
         @param data: AP object
         @return: actor id of the sender
         @raise exceptions.NotFound: no actor has been found in data
         """
         try:
-            actors = await self.apGetActors(data, "attributedTo", as_account=False)
+            actors = await self.apGetActors(data, "actor", as_account=False)
         except exceptions.DataError:
             actors = None
         if not actors:
             try:
-                actors = await self.apGetActors(data, "actor", as_account=False)
+                actors = await self.apGetActors(data, "attributedTo", as_account=False)
             except exceptions.DataError:
                 raise exceptions.NotFound(
-                    'actor not specified in "attributedTo" or "actor"'
+                    'actor not specified in "actor" or "attributedTo"'
                 )
         try:
             return actors[0]
@@ -880,7 +914,7 @@
         if activity_id is None:
             activity_id = f"{actor_id}#{activity.lower()}_{shortuuid.uuid()}"
         data: Dict[str, Any] = {
-            "@context": NS_AP,
+            "@context": [NS_AP],
             "actor": actor_id,
             "id": activity_id,
             "type": activity,
@@ -993,37 +1027,82 @@
             published
         @param node: (virtual) node corresponding where the item has been published
         @param subscribe_extra_nodes: if True, extra data nodes will be automatically
-        subscribed, that is comment nodes if present and attachments nodes.
+            subscribed, that is comment nodes if present and attachments nodes.
         """
         actor_id = await self.getAPActorIdFromAccount(ap_account)
         inbox = await self.getAPInboxFromId(actor_id)
         for item in items:
             if item.name == "item":
-                mb_data = await self._m.item2mbdata(client, item, service, node)
-                author_jid = jid.JID(mb_data["author_jid"])
-                if subscribe_extra_nodes and not self.isVirtualJID(author_jid):
-                    # we subscribe automatically to comment nodes if any
-                    recipient_jid = self.getLocalJIDFromAccount(ap_account)
-                    recipient_client = self.client.getVirtualClient(recipient_jid)
-                    for comment_data in mb_data.get("comments", []):
-                        comment_service = jid.JID(comment_data["service"])
-                        if self.isVirtualJID(comment_service):
-                            log.debug(f"ignoring virtual comment service: {comment_data}")
-                            continue
-                        comment_node = comment_data["node"]
-                        await self._p.subscribe(
-                            recipient_client, comment_service, comment_node
-                        )
+                cached_item = await self.host.memory.storage.searchPubsubItems({
+                    "profiles": [self.client.profile],
+                    "services": [service],
+                    "nodes": [node],
+                    "names": [item["id"]]
+                })
+                is_new = not bool(cached_item)
+                if node.startswith(self._events.namespace):
+                    # event item
+                    event_data = self._events.event_elt_2_event_data(item)
                     try:
-                        await self._pa.subscribe(
-                            recipient_client, service, node, mb_data["id"]
-                        )
-                    except exceptions.NotFound:
-                        log.debug(
-                            f"no attachment node found for item {mb_data['id']!r} on "
-                            f"{node!r} at {service}"
-                        )
-                ap_item = await self.mbdata2APitem(client, mb_data)
+                        author_jid = jid.JID(item["publisher"]).userhostJID()
+                    except (KeyError, RuntimeWarning):
+                        root_elt = item
+                        while root_elt.parent is not None:
+                            root_elt = root_elt.parent
+                        author_jid = jid.JID(root_elt["from"]).userhostJID()
+                    if subscribe_extra_nodes and not self.isVirtualJID(author_jid):
+                        # we subscribe automatically to comment nodes if any
+                        recipient_jid = self.getLocalJIDFromAccount(ap_account)
+                        recipient_client = self.client.getVirtualClient(recipient_jid)
+                        comments_data = event_data.get("comments")
+                        if comments_data:
+                            comment_service = jid.JID(comments_data["jid"])
+                            comment_node = comments_data["node"]
+                            await self._p.subscribe(
+                                recipient_client, comment_service, comment_node
+                            )
+                        try:
+                            await self._pa.subscribe(
+                                recipient_client, service, node, event_data["id"]
+                            )
+                        except exceptions.NotFound:
+                            log.debug(
+                                f"no attachment node found for item {event_data['id']!r} "
+                                f"on {node!r} at {service}"
+                            )
+                    ap_item = await self.ap_events.event_data_2_ap_item(
+                        event_data, author_jid, is_new=is_new
+                    )
+                else:
+                    # blog item
+                    mb_data = await self._m.item2mbdata(client, item, service, node)
+                    author_jid = jid.JID(mb_data["author_jid"])
+                    if subscribe_extra_nodes and not self.isVirtualJID(author_jid):
+                        # we subscribe automatically to comment nodes if any
+                        recipient_jid = self.getLocalJIDFromAccount(ap_account)
+                        recipient_client = self.client.getVirtualClient(recipient_jid)
+                        for comment_data in mb_data.get("comments", []):
+                            comment_service = jid.JID(comment_data["service"])
+                            if self.isVirtualJID(comment_service):
+                                log.debug(
+                                    f"ignoring virtual comment service: {comment_data}"
+                                )
+                                continue
+                            comment_node = comment_data["node"]
+                            await self._p.subscribe(
+                                recipient_client, comment_service, comment_node
+                            )
+                        try:
+                            await self._pa.subscribe(
+                                recipient_client, service, node, mb_data["id"]
+                            )
+                        except exceptions.NotFound:
+                            log.debug(
+                                f"no attachment node found for item {mb_data['id']!r} on "
+                                f"{node!r} at {service}"
+                            )
+                    ap_item = await self.mb_data_2_ap_item(client, mb_data, is_new=is_new)
+
                 url_actor = ap_item["actor"]
             elif item.name == "retract":
                 url_actor, ap_item = await self.apDeleteItem(
@@ -1134,22 +1213,22 @@
             if not "noticed" in old_attachment:
                 # new "noticed" attachment, we translate to "Like" activity
                 activity_id = self.buildAPURL("like", item_account, item_id)
-                like = self.createActivity(
+                activity = self.createActivity(
                     TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id
                 )
-                like["to"] = [ap_account]
-                like["cc"] = [NS_AP_PUBLIC]
-                await self.signAndPost(inbox, publisher_actor_id, like)
+                activity["to"] = [ap_account]
+                activity["cc"] = [NS_AP_PUBLIC]
+                await self.signAndPost(inbox, publisher_actor_id, activity)
         else:
             if "noticed" in old_attachment:
                 # "noticed" attachment has been removed, we undo the "Like" activity
                 activity_id = self.buildAPURL("like", item_account, item_id)
-                like = self.createActivity(
+                activity = self.createActivity(
                     TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id
                 )
-                like["to"] = [ap_account]
-                like["cc"] = [NS_AP_PUBLIC]
-                undo = self.createActivity("Undo", publisher_actor_id, like)
+                activity["to"] = [ap_account]
+                activity["cc"] = [NS_AP_PUBLIC]
+                undo = self.createActivity("Undo", publisher_actor_id, activity)
                 await self.signAndPost(inbox, publisher_actor_id, undo)
 
         # reactions
@@ -1177,6 +1256,31 @@
                     activy = reaction_activity
                 await self.signAndPost(inbox, publisher_actor_id, activy)
 
+        # RSVP
+        if "rsvp" in attachments:
+            attending = attachments["rsvp"].get("attending", "no")
+            old_attending = old_attachment.get("rsvp", {}).get("attending", "no")
+            if attending != old_attending:
+                activity_type = TYPE_JOIN if attending == "yes" else TYPE_LEAVE
+                activity_id = self.buildAPURL(activity_type.lower(), item_account, item_id)
+                activity = self.createActivity(
+                    activity_type, publisher_actor_id, item_url, activity_id=activity_id
+                )
+                activity["to"] = [ap_account]
+                activity["cc"] = [NS_AP_PUBLIC]
+                await self.signAndPost(inbox, publisher_actor_id, activity)
+        else:
+            if "rsvp" in old_attachment:
+                old_attending = old_attachment.get("rsvp", {}).get("attending", "no")
+                if old_attending == "yes":
+                    activity_id = self.buildAPURL(TYPE_LEAVE.lower(), item_account, item_id)
+                    activity = self.createActivity(
+                        TYPE_LEAVE, publisher_actor_id, item_url, activity_id=activity_id
+                    )
+                    activity["to"] = [ap_account]
+                    activity["cc"] = [NS_AP_PUBLIC]
+                    await self.signAndPost(inbox, publisher_actor_id, activity)
+
         if service.user and self.isVirtualJID(service):
             # the item is on a virtual service, we need to store it in cache
             log.debug("storing attachments item in cache")
@@ -1392,7 +1496,7 @@
             Items are always returned in chronological order in the result
         """
         if parser is None:
-            parser = self.apItem2Elt
+            parser = self.ap_item_2_mb_elt
 
         rsm_resp: Dict[str, Union[bool, int]] = {}
         try:
@@ -1520,7 +1624,7 @@
 
         return items, rsm.RSMResponse(**rsm_resp)
 
-    async def apItem2MbDataAndElt(self, ap_item: dict) -> Tuple[dict, domish.Element]:
+    async def ap_item_2_mb_data_and_elt(self, ap_item: dict) -> Tuple[dict, domish.Element]:
         """Convert AP item to parsed microblog data and corresponding item element"""
         mb_data = await self.apItem2MBdata(ap_item)
         item_elt = await self._m.data2entry(
@@ -1532,9 +1636,9 @@
             item_elt["publisher"] = mb_data["author_jid"]
         return mb_data, item_elt
 
-    async def apItem2Elt(self, ap_item: dict) -> domish.Element:
+    async def ap_item_2_mb_elt(self, ap_item: dict) -> domish.Element:
         """Convert AP item to XMPP item element"""
-        __, item_elt = await self.apItem2MbDataAndElt(ap_item)
+        __, item_elt = await self.ap_item_2_mb_data_and_elt(ap_item)
         return item_elt
 
     async def parseAPPage(
@@ -1629,7 +1733,7 @@
         @raise NotImplementedError: some AP data is not handled yet
         @raise error.StanzaError: error while contacting the AP server
         """
-        is_activity = self.isActivity(ap_item)
+        is_activity = self.is_activity(ap_item)
         if is_activity:
             ap_object = await self.apGetObject(ap_item, "object")
             if not ap_object:
@@ -1839,11 +1943,12 @@
         ]
         return announce
 
-    async def mbdata2APitem(
+    async def mb_data_2_ap_item(
         self,
         client: SatXMPPEntity,
         mb_data: dict,
-        public=True
+        public: bool =True,
+        is_new: bool = True,
     ) -> dict:
         """Convert Libervia Microblog Data to ActivityPub item
 
@@ -1856,6 +1961,10 @@
             ``inReplyTo`` will also be set if suitable
             if False, no destinee will be set (i.e., no ``to`` or ``cc`` or public flag).
             This is usually used for direct messages.
+        @param is_new: if True, the item is a new one (no instance has been found in
+            cache).
+            If True, a "Create" activity will be generated, otherwise an "Update" one will
+            be.
         @return: Activity item
         """
         extra = mb_data.get("extra", {})
@@ -1941,7 +2050,7 @@
                     )
 
         return self.createActivity(
-            "Create", url_actor, ap_object, activity_id=url_item
+            "Create" if is_new else "Update", url_actor, ap_object, activity_id=url_item
         )
 
     async def publishMessage(
@@ -1977,7 +2086,7 @@
         except KeyError:
             raise exceptions.DataError("Can't get ActivityPub actor inbox")
 
-        item_data = await self.mbdata2APitem(client, mess_data)
+        item_data = await self.mb_data_2_ap_item(client, mess_data)
         url_actor = item_data["actor"]
         resp = await self.signAndPost(inbox_url, url_actor, item_data)
 
@@ -2096,7 +2205,7 @@
             # we need to use origin ID when present to be able to retract the message
             mb_data["id"] = origin_id
         client = self.client.getVirtualClient(mess_data["from"])
-        ap_item = await self.mbdata2APitem(client, mb_data, public=False)
+        ap_item = await self.mb_data_2_ap_item(client, mb_data, public=False)
         ap_item["object"]["to"] = ap_item["to"] = [actor_id]
         await self.signAndPost(inbox, ap_item["actor"], ap_item)
         return mess_data
@@ -2205,7 +2314,7 @@
         mb_data = await self._m.item2mbdata(
             client, cached_item.data, pubsub_service, pubsub_node
         )
-        ap_item = await self.mbdata2APitem(client, mb_data)
+        ap_item = await self.mb_data_2_ap_item(client, mb_data)
         ap_object = ap_item["object"]
         ap_object["to"] = [actor_id]
         ap_object.setdefault("tag", []).append({
@@ -2271,7 +2380,7 @@
             log.info(f"{ppElt(parent_item_elt.toXml())}")
             raise NotImplementedError()
         else:
-            __, item_elt = await self.apItem2MbDataAndElt(ap_item)
+            __, item_elt = await self.ap_item_2_mb_data_and_elt(ap_item)
             await self._p.publish(client, comment_service, comment_node, [item_elt])
             await self.notifyMentions(
                 targets, mentions, comment_service, comment_node, item_elt["id"]
@@ -2454,7 +2563,6 @@
         @param public: True if the item is public
         """
         # XXX: "public" is not used for now
-
         service = client.jid
         in_reply_to = item.get("inReplyTo")
 
@@ -2476,8 +2584,9 @@
             )
         else:
             # it is a root item (i.e. not a reply to an other item)
+            create = node == self._events.namespace
             cached_node = await self.host.memory.storage.getPubsubNode(
-                client, service, node, with_subscriptions=True
+                client, service, node, with_subscriptions=True, create=create
             )
             if cached_node is None:
                 log.warning(
@@ -2486,12 +2595,15 @@
 
                 )
                 return
-        mb_data, item_elt = await self.apItem2MbDataAndElt(item)
+        if item.get("type") == TYPE_EVENT:
+            data, item_elt = await self.ap_events.ap_item_2_event_data_and_elt(item)
+        else:
+            data, item_elt = await self.ap_item_2_mb_data_and_elt(item)
         await self.host.memory.storage.cachePubsubItems(
             client,
             cached_node,
             [item_elt],
-            [mb_data]
+            [data]
         )
 
         for subscription in cached_node.subscriptions: