changeset 3904:0aa7023dcd08

component AP gateway: events: - XMPP Events <=> AP Events conversion - `Join`/`Leave` activities are converted to RSVP attachments and vice versa - fix caching/notification on item published on a virtual pubsub node - add Ad-Hoc command to convert XMPP Jid/Node to virtual AP Account - handle `Update` activity - on `convertAndPostItems`, `Update` activity is used instead of `Create` if a version of the item is already present in cache - `events` field is added to actor data (and to `endpoints`), it links the `outbox` of the actor mapping the same JID with the Events node (i.e. it links to the Events node of the entity) - fix subscription to nodes which are not the microblog one rel 372
author Goffi <goffi@goffi.org>
date Thu, 22 Sep 2022 00:01:41 +0200
parents 384b7e6c2dbf
children 92482cc80d0b
files sat/plugins/plugin_comp_ap_gateway/__init__.py sat/plugins/plugin_comp_ap_gateway/ad_hoc.py sat/plugins/plugin_comp_ap_gateway/constants.py sat/plugins/plugin_comp_ap_gateway/events.py sat/plugins/plugin_comp_ap_gateway/http_server.py sat/plugins/plugin_comp_ap_gateway/pubsub_service.py
diffstat 6 files changed, 810 insertions(+), 105 deletions(-) [+]
line wrap: on
line diff
--- a/sat/plugins/plugin_comp_ap_gateway/__init__.py	Wed Sep 21 22:43:55 2022 +0200
+++ b/sat/plugins/plugin_comp_ap_gateway/__init__.py	Thu Sep 22 00:01:41 2022 +0200
@@ -20,11 +20,22 @@
 import calendar
 import hashlib
 import json
+from os import access
 from pathlib import Path
 from pprint import pformat
 import re
 from typing import (
-    Any, Dict, List, Set, Optional, Tuple, Union, Callable, Awaitable, overload
+    Any,
+    Awaitable,
+    Callable,
+    Dict,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    Type,
+    Union,
+    overload,
 )
 from urllib import parse
 
@@ -34,6 +45,7 @@
 from cryptography.hazmat.primitives.asymmetric import rsa
 from cryptography.hazmat.primitives.asymmetric import padding
 import dateutil
+from dateutil.parser import parserinfo
 import shortuuid
 from sqlalchemy.exc import IntegrityError
 import treq
@@ -42,19 +54,21 @@
 from twisted.web import http
 from twisted.words.protocols.jabber import error, jid
 from twisted.words.xish import domish
-from wokkel import rsm, pubsub
+from wokkel import pubsub, rsm
 
 from sat.core import exceptions
 from sat.core.constants import Const as C
 from sat.core.core_types import SatXMPPEntity
 from sat.core.i18n import _
 from sat.core.log import getLogger
-from sat.memory.sqla_mapping import SubscriptionState, History
 from sat.memory import persistent
+from sat.memory.sqla_mapping import History, SubscriptionState
 from sat.tools import utils
-from sat.tools.common import data_format, tls, uri
+from sat.tools.common import data_format, date_utils, tls, uri
 from sat.tools.common.async_utils import async_lru
 
+from .ad_hoc import APAdHocService
+from .events import APEvents
 from .constants import (
     ACTIVITY_OBJECT_MANDATORY,
     ACTIVITY_TARGET_MANDATORY,
@@ -65,20 +79,23 @@
     IMPORT_NAME,
     LRU_MAX_SIZE,
     MEDIA_TYPE_AP,
-    TYPE_ACTOR,
-    TYPE_ITEM,
-    TYPE_FOLLOWERS,
-    TYPE_TOMBSTONE,
-    TYPE_MENTION,
-    TYPE_LIKE,
-    TYPE_REACTION,
     NS_AP,
     NS_AP_PUBLIC,
-    PUBLIC_TUPLE
+    PUBLIC_TUPLE,
+    TYPE_ACTOR,
+    TYPE_EVENT,
+    TYPE_FOLLOWERS,
+    TYPE_ITEM,
+    TYPE_LIKE,
+    TYPE_MENTION,
+    TYPE_REACTION,
+    TYPE_TOMBSTONE,
+    TYPE_JOIN,
+    TYPE_LEAVE
 )
-from .regex import RE_MENTION
 from .http_server import HTTPServer
 from .pubsub_service import APPubsubService
+from .regex import RE_MENTION
 
 
 log = getLogger(__name__)
@@ -92,9 +109,9 @@
     C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
     C.PI_PROTOCOLS: [],
     C.PI_DEPENDENCIES: [
-        "XEP-0054", "XEP-0060", "XEP-0084", "XEP-0106", "XEP-0277", "XEP-0292",
-        "XEP-0329", "XEP-0372", "XEP-0424", "XEP-0465", "XEP-0470", "PUBSUB_CACHE",
-        "TEXT_SYNTAXES", "IDENTITY"
+        "XEP-0050", "XEP-0054", "XEP-0060", "XEP-0084", "XEP-0106", "XEP-0277",
+        "XEP-0292", "XEP-0329", "XEP-0372", "XEP-0424", "XEP-0465", "XEP-0470",
+        "PUBSUB_CACHE", "TEXT_SYNTAXES", "IDENTITY", "EVENTS"
     ],
     C.PI_RECOMMENDATIONS: [],
     C.PI_MAIN: "APGateway",
@@ -134,6 +151,7 @@
         self._t = host.plugins["TEXT_SYNTAXES"]
         self._i = host.plugins["IDENTITY"]
         self._pa = host.plugins["XEP-0470"]
+        self._events = host.plugins["EVENTS"]
         self._p.addManagedNode(
             "",
             items_cb=self._itemsReceived,
@@ -143,6 +161,8 @@
             priority=1000
         )
         self.pubsub_service = APPubsubService(self)
+        self.ad_hoc = APAdHocService(self)
+        self.ap_events = APEvents(self)
         host.trigger.add("messageReceived", self._messageReceivedTrigger, priority=-1000)
         host.trigger.add("XEP-0424_retractReceived", self._onMessageRetract)
         host.trigger.add("XEP-0372_ref_received", self._onReferenceReceived)
@@ -266,6 +286,9 @@
         )
         await self.init(client)
 
+    def profileConnected(self, client):
+        self.ad_hoc.init(client)
+
     async def _itemsReceived(
         self,
         client: SatXMPPEntity,
@@ -313,7 +336,7 @@
         local_jid = await self.getJIDFromId(actor_id)
         return self.client.getVirtualClient(local_jid)
 
-    def isActivity(self, data: dict) -> bool:
+    def is_activity(self, data: dict) -> bool:
         """Return True if the data has an activity type"""
         try:
             return (data.get("type") or "").lower() in ACTIVITY_TYPES_LOWER
@@ -434,12 +457,23 @@
                 except IndexError:
                     raise exceptions.NotFound("requested items can't be found")
 
-            mb_data = await self._m.item2mbdata(
-                self.client, found_item, author_jid, node
-            )
-            ap_item = await self.mbdata2APitem(self.client, mb_data)
-            # the URL must return the object and not the activity
-            return ap_item["object"]
+            if node.startswith(self._events.namespace):
+                # this is an event
+                event_data = self._events.event_elt_2_event_data(found_item)
+                ap_item = await self.ap_events.event_data_2_ap_item(
+                    event_data, author_jid
+                )
+                # the URL must return the object and not the activity
+                ap_item["object"]["@context"] = ap_item["@context"]
+                return ap_item["object"]
+            else:
+                # this is a blog item
+                mb_data = await self._m.item2mbdata(
+                    self.client, found_item, author_jid, node
+                )
+                ap_item = await self.mb_data_2_ap_item(self.client, mb_data)
+                # the URL must return the object and not the activity
+                return ap_item["object"]
         else:
             raise NotImplementedError(
                 'only object from "item" URLs can be retrieved for now'
@@ -531,22 +565,22 @@
     ) -> str:
         """Retrieve actor who sent data
 
-        This is done by checking "attributedTo" field first, then "actor" field.
-        Only the first found actor is taken into accoun
+        This is done by checking "actor" field first, then "attributedTo" field.
+        Only the first found actor is taken into account
         @param data: AP object
         @return: actor id of the sender
         @raise exceptions.NotFound: no actor has been found in data
         """
         try:
-            actors = await self.apGetActors(data, "attributedTo", as_account=False)
+            actors = await self.apGetActors(data, "actor", as_account=False)
         except exceptions.DataError:
             actors = None
         if not actors:
             try:
-                actors = await self.apGetActors(data, "actor", as_account=False)
+                actors = await self.apGetActors(data, "attributedTo", as_account=False)
             except exceptions.DataError:
                 raise exceptions.NotFound(
-                    'actor not specified in "attributedTo" or "actor"'
+                    'actor not specified in "actor" or "attributedTo"'
                 )
         try:
             return actors[0]
@@ -880,7 +914,7 @@
         if activity_id is None:
             activity_id = f"{actor_id}#{activity.lower()}_{shortuuid.uuid()}"
         data: Dict[str, Any] = {
-            "@context": NS_AP,
+            "@context": [NS_AP],
             "actor": actor_id,
             "id": activity_id,
             "type": activity,
@@ -993,37 +1027,82 @@
             published
         @param node: (virtual) node corresponding where the item has been published
         @param subscribe_extra_nodes: if True, extra data nodes will be automatically
-        subscribed, that is comment nodes if present and attachments nodes.
+            subscribed, that is comment nodes if present and attachments nodes.
         """
         actor_id = await self.getAPActorIdFromAccount(ap_account)
         inbox = await self.getAPInboxFromId(actor_id)
         for item in items:
             if item.name == "item":
-                mb_data = await self._m.item2mbdata(client, item, service, node)
-                author_jid = jid.JID(mb_data["author_jid"])
-                if subscribe_extra_nodes and not self.isVirtualJID(author_jid):
-                    # we subscribe automatically to comment nodes if any
-                    recipient_jid = self.getLocalJIDFromAccount(ap_account)
-                    recipient_client = self.client.getVirtualClient(recipient_jid)
-                    for comment_data in mb_data.get("comments", []):
-                        comment_service = jid.JID(comment_data["service"])
-                        if self.isVirtualJID(comment_service):
-                            log.debug(f"ignoring virtual comment service: {comment_data}")
-                            continue
-                        comment_node = comment_data["node"]
-                        await self._p.subscribe(
-                            recipient_client, comment_service, comment_node
-                        )
+                cached_item = await self.host.memory.storage.searchPubsubItems({
+                    "profiles": [self.client.profile],
+                    "services": [service],
+                    "nodes": [node],
+                    "names": [item["id"]]
+                })
+                is_new = not bool(cached_item)
+                if node.startswith(self._events.namespace):
+                    # event item
+                    event_data = self._events.event_elt_2_event_data(item)
                     try:
-                        await self._pa.subscribe(
-                            recipient_client, service, node, mb_data["id"]
-                        )
-                    except exceptions.NotFound:
-                        log.debug(
-                            f"no attachment node found for item {mb_data['id']!r} on "
-                            f"{node!r} at {service}"
-                        )
-                ap_item = await self.mbdata2APitem(client, mb_data)
+                        author_jid = jid.JID(item["publisher"]).userhostJID()
+                    except (KeyError, RuntimeWarning):
+                        root_elt = item
+                        while root_elt.parent is not None:
+                            root_elt = root_elt.parent
+                        author_jid = jid.JID(root_elt["from"]).userhostJID()
+                    if subscribe_extra_nodes and not self.isVirtualJID(author_jid):
+                        # we subscribe automatically to comment nodes if any
+                        recipient_jid = self.getLocalJIDFromAccount(ap_account)
+                        recipient_client = self.client.getVirtualClient(recipient_jid)
+                        comments_data = event_data.get("comments")
+                        if comments_data:
+                            comment_service = jid.JID(comments_data["jid"])
+                            comment_node = comments_data["node"]
+                            await self._p.subscribe(
+                                recipient_client, comment_service, comment_node
+                            )
+                        try:
+                            await self._pa.subscribe(
+                                recipient_client, service, node, event_data["id"]
+                            )
+                        except exceptions.NotFound:
+                            log.debug(
+                                f"no attachment node found for item {event_data['id']!r} "
+                                f"on {node!r} at {service}"
+                            )
+                    ap_item = await self.ap_events.event_data_2_ap_item(
+                        event_data, author_jid, is_new=is_new
+                    )
+                else:
+                    # blog item
+                    mb_data = await self._m.item2mbdata(client, item, service, node)
+                    author_jid = jid.JID(mb_data["author_jid"])
+                    if subscribe_extra_nodes and not self.isVirtualJID(author_jid):
+                        # we subscribe automatically to comment nodes if any
+                        recipient_jid = self.getLocalJIDFromAccount(ap_account)
+                        recipient_client = self.client.getVirtualClient(recipient_jid)
+                        for comment_data in mb_data.get("comments", []):
+                            comment_service = jid.JID(comment_data["service"])
+                            if self.isVirtualJID(comment_service):
+                                log.debug(
+                                    f"ignoring virtual comment service: {comment_data}"
+                                )
+                                continue
+                            comment_node = comment_data["node"]
+                            await self._p.subscribe(
+                                recipient_client, comment_service, comment_node
+                            )
+                        try:
+                            await self._pa.subscribe(
+                                recipient_client, service, node, mb_data["id"]
+                            )
+                        except exceptions.NotFound:
+                            log.debug(
+                                f"no attachment node found for item {mb_data['id']!r} on "
+                                f"{node!r} at {service}"
+                            )
+                    ap_item = await self.mb_data_2_ap_item(client, mb_data, is_new=is_new)
+
                 url_actor = ap_item["actor"]
             elif item.name == "retract":
                 url_actor, ap_item = await self.apDeleteItem(
@@ -1134,22 +1213,22 @@
             if not "noticed" in old_attachment:
                 # new "noticed" attachment, we translate to "Like" activity
                 activity_id = self.buildAPURL("like", item_account, item_id)
-                like = self.createActivity(
+                activity = self.createActivity(
                     TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id
                 )
-                like["to"] = [ap_account]
-                like["cc"] = [NS_AP_PUBLIC]
-                await self.signAndPost(inbox, publisher_actor_id, like)
+                activity["to"] = [ap_account]
+                activity["cc"] = [NS_AP_PUBLIC]
+                await self.signAndPost(inbox, publisher_actor_id, activity)
         else:
             if "noticed" in old_attachment:
                 # "noticed" attachment has been removed, we undo the "Like" activity
                 activity_id = self.buildAPURL("like", item_account, item_id)
-                like = self.createActivity(
+                activity = self.createActivity(
                     TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id
                 )
-                like["to"] = [ap_account]
-                like["cc"] = [NS_AP_PUBLIC]
-                undo = self.createActivity("Undo", publisher_actor_id, like)
+                activity["to"] = [ap_account]
+                activity["cc"] = [NS_AP_PUBLIC]
+                undo = self.createActivity("Undo", publisher_actor_id, activity)
                 await self.signAndPost(inbox, publisher_actor_id, undo)
 
         # reactions
@@ -1177,6 +1256,31 @@
                     activy = reaction_activity
                 await self.signAndPost(inbox, publisher_actor_id, activy)
 
+        # RSVP
+        if "rsvp" in attachments:
+            attending = attachments["rsvp"].get("attending", "no")
+            old_attending = old_attachment.get("rsvp", {}).get("attending", "no")
+            if attending != old_attending:
+                activity_type = TYPE_JOIN if attending == "yes" else TYPE_LEAVE
+                activity_id = self.buildAPURL(activity_type.lower(), item_account, item_id)
+                activity = self.createActivity(
+                    activity_type, publisher_actor_id, item_url, activity_id=activity_id
+                )
+                activity["to"] = [ap_account]
+                activity["cc"] = [NS_AP_PUBLIC]
+                await self.signAndPost(inbox, publisher_actor_id, activity)
+        else:
+            if "rsvp" in old_attachment:
+                old_attending = old_attachment.get("rsvp", {}).get("attending", "no")
+                if old_attending == "yes":
+                    activity_id = self.buildAPURL(TYPE_LEAVE.lower(), item_account, item_id)
+                    activity = self.createActivity(
+                        TYPE_LEAVE, publisher_actor_id, item_url, activity_id=activity_id
+                    )
+                    activity["to"] = [ap_account]
+                    activity["cc"] = [NS_AP_PUBLIC]
+                    await self.signAndPost(inbox, publisher_actor_id, activity)
+
         if service.user and self.isVirtualJID(service):
             # the item is on a virtual service, we need to store it in cache
             log.debug("storing attachments item in cache")
@@ -1392,7 +1496,7 @@
             Items are always returned in chronological order in the result
         """
         if parser is None:
-            parser = self.apItem2Elt
+            parser = self.ap_item_2_mb_elt
 
         rsm_resp: Dict[str, Union[bool, int]] = {}
         try:
@@ -1520,7 +1624,7 @@
 
         return items, rsm.RSMResponse(**rsm_resp)
 
-    async def apItem2MbDataAndElt(self, ap_item: dict) -> Tuple[dict, domish.Element]:
+    async def ap_item_2_mb_data_and_elt(self, ap_item: dict) -> Tuple[dict, domish.Element]:
         """Convert AP item to parsed microblog data and corresponding item element"""
         mb_data = await self.apItem2MBdata(ap_item)
         item_elt = await self._m.data2entry(
@@ -1532,9 +1636,9 @@
             item_elt["publisher"] = mb_data["author_jid"]
         return mb_data, item_elt
 
-    async def apItem2Elt(self, ap_item: dict) -> domish.Element:
+    async def ap_item_2_mb_elt(self, ap_item: dict) -> domish.Element:
         """Convert AP item to XMPP item element"""
-        __, item_elt = await self.apItem2MbDataAndElt(ap_item)
+        __, item_elt = await self.ap_item_2_mb_data_and_elt(ap_item)
         return item_elt
 
     async def parseAPPage(
@@ -1629,7 +1733,7 @@
         @raise NotImplementedError: some AP data is not handled yet
         @raise error.StanzaError: error while contacting the AP server
         """
-        is_activity = self.isActivity(ap_item)
+        is_activity = self.is_activity(ap_item)
         if is_activity:
             ap_object = await self.apGetObject(ap_item, "object")
             if not ap_object:
@@ -1839,11 +1943,12 @@
         ]
         return announce
 
-    async def mbdata2APitem(
+    async def mb_data_2_ap_item(
         self,
         client: SatXMPPEntity,
         mb_data: dict,
-        public=True
+        public: bool =True,
+        is_new: bool = True,
     ) -> dict:
         """Convert Libervia Microblog Data to ActivityPub item
 
@@ -1856,6 +1961,10 @@
             ``inReplyTo`` will also be set if suitable
             if False, no destinee will be set (i.e., no ``to`` or ``cc`` or public flag).
             This is usually used for direct messages.
+        @param is_new: if True, the item is a new one (no instance has been found in
+            cache).
+            If True, a "Create" activity will be generated, otherwise an "Update" one will
+            be.
         @return: Activity item
         """
         extra = mb_data.get("extra", {})
@@ -1941,7 +2050,7 @@
                     )
 
         return self.createActivity(
-            "Create", url_actor, ap_object, activity_id=url_item
+            "Create" if is_new else "Update", url_actor, ap_object, activity_id=url_item
         )
 
     async def publishMessage(
@@ -1977,7 +2086,7 @@
         except KeyError:
             raise exceptions.DataError("Can't get ActivityPub actor inbox")
 
-        item_data = await self.mbdata2APitem(client, mess_data)
+        item_data = await self.mb_data_2_ap_item(client, mess_data)
         url_actor = item_data["actor"]
         resp = await self.signAndPost(inbox_url, url_actor, item_data)
 
@@ -2096,7 +2205,7 @@
             # we need to use origin ID when present to be able to retract the message
             mb_data["id"] = origin_id
         client = self.client.getVirtualClient(mess_data["from"])
-        ap_item = await self.mbdata2APitem(client, mb_data, public=False)
+        ap_item = await self.mb_data_2_ap_item(client, mb_data, public=False)
         ap_item["object"]["to"] = ap_item["to"] = [actor_id]
         await self.signAndPost(inbox, ap_item["actor"], ap_item)
         return mess_data
@@ -2205,7 +2314,7 @@
         mb_data = await self._m.item2mbdata(
             client, cached_item.data, pubsub_service, pubsub_node
         )
-        ap_item = await self.mbdata2APitem(client, mb_data)
+        ap_item = await self.mb_data_2_ap_item(client, mb_data)
         ap_object = ap_item["object"]
         ap_object["to"] = [actor_id]
         ap_object.setdefault("tag", []).append({
@@ -2271,7 +2380,7 @@
             log.info(f"{ppElt(parent_item_elt.toXml())}")
             raise NotImplementedError()
         else:
-            __, item_elt = await self.apItem2MbDataAndElt(ap_item)
+            __, item_elt = await self.ap_item_2_mb_data_and_elt(ap_item)
             await self._p.publish(client, comment_service, comment_node, [item_elt])
             await self.notifyMentions(
                 targets, mentions, comment_service, comment_node, item_elt["id"]
@@ -2454,7 +2563,6 @@
         @param public: True if the item is public
         """
         # XXX: "public" is not used for now
-
         service = client.jid
         in_reply_to = item.get("inReplyTo")
 
@@ -2476,8 +2584,9 @@
             )
         else:
             # it is a root item (i.e. not a reply to an other item)
+            create = node == self._events.namespace
             cached_node = await self.host.memory.storage.getPubsubNode(
-                client, service, node, with_subscriptions=True
+                client, service, node, with_subscriptions=True, create=create
             )
             if cached_node is None:
                 log.warning(
@@ -2486,12 +2595,15 @@
 
                 )
                 return
-        mb_data, item_elt = await self.apItem2MbDataAndElt(item)
+        if item.get("type") == TYPE_EVENT:
+            data, item_elt = await self.ap_events.ap_item_2_event_data_and_elt(item)
+        else:
+            data, item_elt = await self.ap_item_2_mb_data_and_elt(item)
         await self.host.memory.storage.cachePubsubItems(
             client,
             cached_node,
             [item_elt],
-            [mb_data]
+            [data]
         )
 
         for subscription in cached_node.subscriptions:
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sat/plugins/plugin_comp_ap_gateway/ad_hoc.py	Thu Sep 22 00:01:41 2022 +0200
@@ -0,0 +1,89 @@
+#!/usr/bin/env python3
+
+# Libervia ActivityPub Gateway
+# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from twisted.words.protocols.jabber import jid
+from twisted.words.xish import domish
+from wokkel import data_form
+
+from sat.core.constants import Const as C
+from sat.core.core_types import SatXMPPEntity
+from sat.core.i18n import _
+from sat.core.log import getLogger
+
+
+log = getLogger(__name__)
+NS_XMPP_JID_NODE_2_AP = "https://libervia.org/ap_gateway/xmpp_jid_node_2_ap_actor"
+
+class APAdHocService:
+    """Ad-Hoc commands for AP Gateway"""
+
+    def __init__(self, apg):
+        self.host = apg.host
+        self.apg = apg
+        self._c = self.host.plugins["XEP-0050"]
+
+    def init(self, client: SatXMPPEntity) -> None:
+        self._c.addAdHocCommand(
+            client,
+            self.xmpp_jid_node_2_ap_actor,
+            "Convert XMPP JID/Node to AP actor",
+            node=NS_XMPP_JID_NODE_2_AP,
+            allowed_magics=C.ENTITY_ALL,
+        )
+
+    async def xmpp_jid_node_2_ap_actor(
+        self,
+        client: SatXMPPEntity,
+        command_elt: domish.Element,
+        session_data: dict,
+        action: str,
+        node: str
+    ):
+        try:
+            x_elt = next(command_elt.elements(data_form.NS_X_DATA, "x"))
+            command_form = data_form.Form.fromElement(x_elt)
+        except StopIteration:
+            command_form = None
+        if command_form is None or len(command_form.fields) == 0:
+            # root request
+            status = self._c.STATUS.EXECUTING
+            form = data_form.Form(
+                "form", title="XMPP JID/node to AP actor conversion",
+                formNamespace=NS_XMPP_JID_NODE_2_AP
+            )
+
+            field = data_form.Field(
+                "text-single", "jid", required=True
+            )
+            form.addField(field)
+
+            field = data_form.Field(
+                "text-single", "node", required=False
+            )
+            form.addField(field)
+
+            payload = form.toElement()
+            return payload, status, None, None
+        else:
+            xmpp_jid = jid.JID(command_form["jid"])
+            xmpp_node = command_form.get("node")
+            actor = await self.apg.getAPAccountFromJidAndNode(xmpp_jid, xmpp_node)
+            note = (self._c.NOTE.INFO, actor)
+            status = self._c.STATUS.COMPLETED
+            payload = None
+            return (payload, status, None, note)
--- a/sat/plugins/plugin_comp_ap_gateway/constants.py	Wed Sep 21 22:43:55 2022 +0200
+++ b/sat/plugins/plugin_comp_ap_gateway/constants.py	Thu Sep 22 00:01:41 2022 +0200
@@ -31,6 +31,9 @@
 TYPE_MENTION = "Mention"
 TYPE_LIKE = "Like"
 TYPE_REACTION = "EmojiReact"
+TYPE_EVENT = "Event"
+TYPE_JOIN = "Join"
+TYPE_LEAVE = "Leave"
 MEDIA_TYPE_AP = "application/activity+json"
 NS_AP = "https://www.w3.org/ns/activitystreams"
 NS_AP_PUBLIC = f"{NS_AP}#Public"
@@ -73,7 +76,8 @@
 # activities which can be used with Shared Inbox (i.e. with no account specified)
 # must be lowercase
 ACTIVIY_NO_ACCOUNT_ALLOWED = (
-    "create", "delete", "announce", "undo", "like", "emojireact"
+    "create", "update", "delete", "announce", "undo", "like", "emojireact", "join",
+    "leave"
 )
 # maximum number of parents to retrieve when comments_max_depth option is set
 COMMENTS_MAX_PARENTS = 100
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sat/plugins/plugin_comp_ap_gateway/events.py	Thu Sep 22 00:01:41 2022 +0200
@@ -0,0 +1,407 @@
+#!/usr/bin/env python3
+
+# Libervia ActivityPub Gateway
+# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from typing import Tuple
+
+import mimetypes
+import html
+
+import shortuuid
+from twisted.words.xish import domish
+from twisted.words.protocols.jabber import jid
+
+from sat.core.i18n import _
+from sat.core.log import getLogger
+from sat.core import exceptions
+from sat.tools.common import date_utils, uri
+
+from .constants import NS_AP_PUBLIC, TYPE_ACTOR, TYPE_EVENT, TYPE_ITEM
+
+
+log = getLogger(__name__)
+
+# direct copy of what Mobilizon uses
+AP_EVENTS_CONTEXT = {
+    "@language": "und",
+    "Hashtag": "as:Hashtag",
+    "PostalAddress": "sc:PostalAddress",
+    "PropertyValue": "sc:PropertyValue",
+    "address": {"@id": "sc:address", "@type": "sc:PostalAddress"},
+    "addressCountry": "sc:addressCountry",
+    "addressLocality": "sc:addressLocality",
+    "addressRegion": "sc:addressRegion",
+    "anonymousParticipationEnabled": {"@id": "mz:anonymousParticipationEnabled",
+                                      "@type": "sc:Boolean"},
+    "category": "sc:category",
+    "commentsEnabled": {"@id": "pt:commentsEnabled",
+                        "@type": "sc:Boolean"},
+    "discoverable": "toot:discoverable",
+    "discussions": {"@id": "mz:discussions", "@type": "@id"},
+    "events": {"@id": "mz:events", "@type": "@id"},
+    "ical": "http://www.w3.org/2002/12/cal/ical#",
+    "inLanguage": "sc:inLanguage",
+    "isOnline": {"@id": "mz:isOnline", "@type": "sc:Boolean"},
+    "joinMode": {"@id": "mz:joinMode", "@type": "mz:joinModeType"},
+    "joinModeType": {"@id": "mz:joinModeType",
+                     "@type": "rdfs:Class"},
+    "location": {"@id": "sc:location", "@type": "sc:Place"},
+    "manuallyApprovesFollowers": "as:manuallyApprovesFollowers",
+    "maximumAttendeeCapacity": "sc:maximumAttendeeCapacity",
+    "memberCount": {"@id": "mz:memberCount", "@type": "sc:Integer"},
+    "members": {"@id": "mz:members", "@type": "@id"},
+    "mz": "https://joinmobilizon.org/ns#",
+    "openness": {"@id": "mz:openness", "@type": "@id"},
+    "participantCount": {"@id": "mz:participantCount",
+                         "@type": "sc:Integer"},
+    "participationMessage": {"@id": "mz:participationMessage",
+                             "@type": "sc:Text"},
+    "postalCode": "sc:postalCode",
+    "posts": {"@id": "mz:posts", "@type": "@id"},
+    "propertyID": "sc:propertyID",
+    "pt": "https://joinpeertube.org/ns#",
+    "remainingAttendeeCapacity": "sc:remainingAttendeeCapacity",
+    "repliesModerationOption": {"@id": "mz:repliesModerationOption",
+                                "@type": "mz:repliesModerationOptionType"},
+    "repliesModerationOptionType": {"@id": "mz:repliesModerationOptionType",
+                                    "@type": "rdfs:Class"},
+    "resources": {"@id": "mz:resources", "@type": "@id"},
+    "sc": "http://schema.org#",
+    "streetAddress": "sc:streetAddress",
+    "timezone": {"@id": "mz:timezone", "@type": "sc:Text"},
+    "todos": {"@id": "mz:todos", "@type": "@id"},
+    "toot": "http://joinmastodon.org/ns#",
+    "uuid": "sc:identifier",
+    "value": "sc:value"
+}
+
+
+class APEvents:
+    """XMPP Events <=> AP Events conversion"""
+
+    def __init__(self, apg):
+        self.host = apg.host
+        self.apg = apg
+        self._events = self.host.plugins["EVENTS"]
+
+    async def event_data_2_ap_item(
+        self, event_data: dict, author_jid: jid.JID, is_new: bool=True
+    ) -> dict:
+        """Convert event data to AP activity
+
+        @param event_data: event data as used in [plugin_exp_events]
+        @param author_jid: jid of the published of the event
+        @param is_new: if True, the item is a new one (no instance has been found in
+            cache).
+            If True, a "Create" activity will be generated, otherwise an "Update" one will
+            be
+        @return: AP activity wrapping an Event object
+        """
+        if not event_data.get("id"):
+            event_data["id"] = shortuuid.uuid()
+        ap_account = await self.apg.getAPAccountFromJidAndNode(
+            author_jid,
+            self._events.namespace
+        )
+        url_actor = self.apg.buildAPURL(TYPE_ACTOR, ap_account)
+        url_item = self.apg.buildAPURL(TYPE_ITEM, ap_account, event_data["id"])
+        ap_object = {
+            "actor": url_actor,
+            "attributedTo": url_actor,
+            "to": [NS_AP_PUBLIC],
+            "id": url_item,
+            "type": TYPE_EVENT,
+            "name": next(iter(event_data["name"].values())),
+            "startTime": date_utils.date_fmt(event_data["start"], "iso"),
+            "endTime": date_utils.date_fmt(event_data["end"], "iso"),
+            "url": url_item,
+        }
+
+        attachment = ap_object["attachment"] = []
+
+        # FIXME: we only handle URL head-picture for now
+        # TODO: handle jingle and use file metadata
+        try:
+            head_picture_url = event_data["head-picture"]["sources"][0]["url"]
+        except (KeyError, IndexError, TypeError):
+            pass
+        else:
+            media_type = mimetypes.guess_type(head_picture_url, False)[0] or "image/jpeg"
+            attachment.append({
+                "name": "Banner",
+                "type": "Document",
+                "mediaType": media_type,
+                "url": head_picture_url,
+            })
+
+        descriptions = event_data.get("descriptions")
+        if descriptions:
+            for description in descriptions:
+                content = description["description"]
+                if description["type"] == "xhtml":
+                    break
+            else:
+                content = f"<p>{html.escape(content)}</p>"  # type: ignore
+            ap_object["content"] = content
+
+        categories = event_data.get("categories")
+        if categories:
+            tag = ap_object["tag"] = []
+            for category in categories:
+                tag.append({
+                    "name": f"#{category['term']}",
+                    "type": "Hashtag",
+                })
+
+        locations = event_data.get("locations")
+        if locations:
+            ap_loc = ap_object["location"] = {}
+            # we only use the first found location
+            location = locations[0]
+            for source, dest in (
+                ("description", "name"),
+                ("lat", "latitude"),
+                ("lon", "longitude"),
+            ):
+                value = location.get(source)
+                if value is not None:
+                    ap_loc[dest] = value
+            for source, dest in (
+                ("country", "addressCountry"),
+                ("locality", "addressLocality"),
+                ("region", "addressRegion"),
+                ("postalcode", "postalCode"),
+                ("street", "streetAddress"),
+            ):
+                value = location.get(source)
+                if value is not None:
+                    ap_loc.setdefault("address", {})[dest] = value
+
+        if event_data.get("comments"):
+            ap_object["commentsEnabled"] = True
+
+        extra = event_data.get("extra")
+
+        if extra:
+            status = extra.get("status")
+            if status:
+                ap_object["ical:status"] = status.upper()
+
+            website = extra.get("website")
+            if website:
+                attachment.append({
+                    "href": website,
+                    "mediaType": "text/html",
+                    "name": "Website",
+                    "type": "Link"
+                })
+
+            accessibility = extra.get("accessibility")
+            if accessibility:
+                wheelchair = accessibility.get("wheelchair")
+                if wheelchair:
+                    if wheelchair == "full":
+                        ap_wc_value = "fully"
+                    elif wheelchair == "partial":
+                        ap_wc_value = "partially"
+                    elif wheelchair == "no":
+                        ap_wc_value = "no"
+                    else:
+                        log.error(f"unexpected wheelchair value: {wheelchair}")
+                        ap_wc_value = None
+                    if ap_wc_value is not None:
+                        attachment.append({
+                            "propertyID": "mz:accessibility:wheelchairAccessible",
+                            "type": "PropertyValue",
+                            "value": ap_wc_value
+                        })
+
+        activity = self.apg.createActivity(
+            "Create" if is_new else "Update", url_actor, ap_object, activity_id=url_item
+        )
+        activity["@context"].append(AP_EVENTS_CONTEXT)
+        return activity
+
+    async def ap_item_2_event_data(self, ap_item: dict) -> dict:
+        """Convert AP activity or object to event data
+
+        @param ap_item: ActivityPub item to convert
+            Can be either an activity of an object
+        @return: AP Item's Object and event data
+        @raise exceptions.DataError: something is invalid in the AP item
+        """
+        is_activity = self.apg.is_activity(ap_item)
+        if is_activity:
+            ap_object = await self.apg.apGetObject(ap_item, "object")
+            if not ap_object:
+                log.warning(f'No "object" found in AP item {ap_item!r}')
+                raise exceptions.DataError
+        else:
+            ap_object = ap_item
+
+        # id
+        if "_repeated" in ap_item:
+            # if the event is repeated, we use the original one ID
+            repeated_uri = ap_item["_repeated"]["uri"]
+            parsed_uri = uri.parseXMPPUri(repeated_uri)
+            object_id = parsed_uri["item"]
+        else:
+            object_id = ap_object.get("id")
+            if not object_id:
+                raise exceptions.DataError('"id" is missing in AP object')
+
+        if ap_item["type"] != TYPE_EVENT:
+            raise exceptions.DataError("AP Object is not an event")
+
+        # author
+        actor = await self.apg.apGetSenderActor(ap_object)
+
+        account = await self.apg.getAPAccountFromId(actor)
+        author_jid = self.apg.getLocalJIDFromAccount(account).full()
+
+        # name, start, end
+        event_data = {
+            "id": object_id,
+            "name": {"": ap_object.get("name") or "unnamed"},
+            "start": date_utils.date_parse(ap_object["startTime"]),
+            "end": date_utils.date_parse(ap_object["endTime"]),
+        }
+
+        # attachments/extra
+        event_data["extra"] = extra = {}
+        attachments = ap_object.get("attachment") or []
+        for attachment in attachments:
+            name = attachment.get("name")
+            if name == "Banner":
+                try:
+                    url = attachment["url"]
+                except KeyError:
+                    log.warning(f"invalid attachment: {attachment}")
+                    continue
+                event_data["head-picture"] = {"sources": [{"url": url}]}
+            elif name == "Website":
+                try:
+                    url = attachment["href"]
+                except KeyError:
+                    log.warning(f"invalid attachment: {attachment}")
+                    continue
+                extra["website"] = url
+            else:
+                log.debug(f"unmanaged attachment: {attachment}")
+
+        # description
+        content = ap_object.get("content")
+        if content:
+            event_data["descriptions"] = [{
+                "type": "xhtml",
+                "description": content
+            }]
+
+        # categories
+        tags = ap_object.get("tag")
+        if tags:
+            categories = event_data["categories"] = []
+            for tag in tags:
+                if tag.get("type") == "Hashtag":
+                    try:
+                        term = tag["name"][1:]
+                    except KeyError:
+                        log.warning(f"invalid tag: {tag}")
+                        continue
+                    categories.append({"term": term})
+
+        #location
+        ap_location = ap_object.get("location")
+        if ap_location:
+            location = {}
+            for source, dest in (
+                ("name", "description"),
+                ("latitude", "lat"),
+                ("longitude", "lon"),
+            ):
+                value = ap_location.get(source)
+                if value is not None:
+                    location[dest] = value
+            address = ap_location.get("address")
+            if address:
+                for source, dest in (
+                    ("addressCountry", "country"),
+                    ("addressLocality", "locality"),
+                    ("addressRegion", "region"),
+                    ("postalCode", "postalcode"),
+                    ("streetAddress", "street"),
+                ):
+                    value = address.get(source)
+                    if value is not None:
+                        location[dest] = value
+            if location:
+                event_data["locations"] = [location]
+
+        # rsvp
+        # So far Mobilizon seems to only handle participate/don't participate, thus we use
+        # a simple "yes"/"no" form.
+        rsvp_data = {"fields": []}
+        event_data["rsvp"] = [rsvp_data]
+        rsvp_data["fields"].append({
+            "type": "list-single",
+            "name": "attending",
+            "label": "Attending",
+            "options": [
+                {"label": "yes", "value": "yes"},
+                {"label": "no", "value": "no"}
+            ],
+            "required": True
+        })
+
+        # comments
+
+        if ap_object.get("commentsEnabled"):
+            __, comments_node = await self.apg.getCommentsNodes(object_id, None)
+            event_data["comments"] = {
+                "service": author_jid,
+                "node": comments_node,
+            }
+
+        # extra
+        # part of extra come from "attachment" above
+
+        status = ap_object.get("ical:status")
+        if status is None:
+            pass
+        elif status in ("CONFIRMED", "CANCELLED", "TENTATIVE"):
+            extra["status"] = status.lower()
+        else:
+            log.warning(f"unknown event status: {status}")
+
+        return event_data
+
+    async def ap_item_2_event_data_and_elt(
+        self,
+        ap_item: dict
+    ) -> Tuple[dict, domish.Element]:
+        """Convert AP item to parsed event data and corresponding item element"""
+        event_data = await self.ap_item_2_event_data(ap_item)
+        event_elt = self._events.event_data_2_event_elt(event_data)
+        item_elt = domish.Element((None, "item"))
+        item_elt["id"] = event_data["id"]
+        item_elt.addChild(event_elt)
+        return event_data, item_elt
+
+    async def ap_item_2_event_elt(self, ap_item: dict) -> domish.Element:
+        """Convert AP item to XMPP item element"""
+        __, item_elt = await self.ap_item_2_event_data_and_elt(ap_item)
+        return item_elt
--- a/sat/plugins/plugin_comp_ap_gateway/http_server.py	Wed Sep 21 22:43:55 2022 +0200
+++ b/sat/plugins/plugin_comp_ap_gateway/http_server.py	Thu Sep 22 00:01:41 2022 +0200
@@ -41,9 +41,9 @@
 
 from .constants import (
     NS_AP, CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_SHARED_INBOX, TYPE_OUTBOX,
-    AP_REQUEST_TYPES, PAGE_SIZE, ACTIVITY_TYPES_LOWER, ACTIVIY_NO_ACCOUNT_ALLOWED,
-    SIGN_HEADERS, HS2019, SIGN_EXP, TYPE_FOLLOWERS, TYPE_FOLLOWING, TYPE_ITEM, TYPE_LIKE,
-    TYPE_REACTION, ST_AP_CACHE
+    TYPE_EVENT, AP_REQUEST_TYPES, PAGE_SIZE, ACTIVITY_TYPES_LOWER,
+    ACTIVIY_NO_ACCOUNT_ALLOWED, SIGN_HEADERS, HS2019, SIGN_EXP, TYPE_FOLLOWERS,
+    TYPE_FOLLOWING, TYPE_ITEM, TYPE_LIKE, TYPE_REACTION, ST_AP_CACHE
 )
 from .regex import RE_SIG_PARAM
 
@@ -84,6 +84,7 @@
             f"internal error: {failure_.value}"
         )
         request.finish()
+        raise failure_
 
     async def webfinger(self, request):
         url_parsed = parse.urlparse(request.uri.decode())
@@ -295,11 +296,14 @@
                 f"happen. Ignoring object from {signing_actor}\n{data}"
             )
             raise exceptions.DataError("unexpected field in item")
-        if node is None:
-            node = self.apg._m.namespace
         client = await self.apg.getVirtualClient(signing_actor)
         objects = await self.apg.apGetList(data, "object")
         for obj in objects:
+            if node is None:
+                if obj.get("type") == TYPE_EVENT:
+                    node = self.apg._events.namespace
+                else:
+                    node = self.apg._m.namespace
             sender = await self.apg.apGetSenderActor(obj)
             if repeated:
                 # we don't check sender when item is repeated, as it should be different
@@ -353,6 +357,7 @@
                         "Ignoring object not attributed to signing actor: {obj}"
                     )
                     continue
+
             await self.apg.newAPItem(client, account_jid, node, obj)
 
     async def handleCreateActivity(
@@ -367,6 +372,20 @@
     ):
         await self.handleNewAPItems(request, data, account_jid, node, signing_actor)
 
+    async def handleUpdateActivity(
+        self,
+        request: "HTTPRequest",
+        data: dict,
+        account_jid: Optional[jid.JID],
+        node: Optional[str],
+        ap_account: Optional[str],
+        ap_url: str,
+        signing_actor: str
+    ):
+        # Update is the same as create: the item ID stays the same, thus the item will be
+        # overwritten
+        await self.handleNewAPItems(request, data, account_jid, node, signing_actor)
+
     async def handleAnnounceActivity(
         self,
         request: "HTTPRequest",
@@ -511,6 +530,32 @@
             "reactions": {"operation": "update", "add": [data["content"]]}
         })
 
+    async def handleJoinActivity(
+        self,
+        request: "HTTPRequest",
+        data: dict,
+        account_jid: Optional[jid.JID],
+        node: Optional[str],
+        ap_account: Optional[str],
+        ap_url: str,
+        signing_actor: str
+    ) -> None:
+        client = await self.apg.getVirtualClient(signing_actor)
+        await self.handleAttachmentItem(client, data, {"rsvp": {"attending": "yes"}})
+
+    async def handleLeaveActivity(
+        self,
+        request: "HTTPRequest",
+        data: dict,
+        account_jid: Optional[jid.JID],
+        node: Optional[str],
+        ap_account: Optional[str],
+        ap_url: str,
+        signing_actor: str
+    ) -> None:
+        client = await self.apg.getVirtualClient(signing_actor)
+        await self.handleAttachmentItem(client, data, {"rsvp": {"attending": "no"}})
+
     async def APActorRequest(
         self,
         request: "HTTPRequest",
@@ -531,6 +576,13 @@
         preferred_username = ap_account.split("@", 1)[0]
 
         identity_data = await self.apg._i.getIdentity(self.apg.client, account_jid)
+        if node and node.startswith(self.apg._events.namespace):
+            events = outbox
+        else:
+            events_account = await self.apg.getAPAccountFromJidAndNode(
+                account_jid, self.apg._events.namespace
+            )
+            events = self.apg.buildAPURL(TYPE_OUTBOX, events_account)
 
         actor_data = {
             "@context": [
@@ -543,6 +595,7 @@
             "preferredUsername": preferred_username,
             "inbox": inbox,
             "outbox": outbox,
+            "events": events,
             "followers": followers,
             "following": following,
             "publicKey": {
@@ -551,7 +604,8 @@
                 "publicKeyPem": self.apg.public_key_pem
             },
             "endpoints": {
-                "sharedInbox": shared_inbox
+                "sharedInbox": shared_inbox,
+                "events": events,
             },
         }
 
@@ -633,13 +687,17 @@
 
         base_url = self.getCanonicalURL(request)
         url = f"{base_url}?{parse.urlencode(query_data, True)}"
-        data = {
-            "@context": "https://www.w3.org/ns/activitystreams",
-            "id": url,
-            "type": "OrderedCollectionPage",
-            "partOf": base_url,
-            "orderedItems" : [
-                await self.apg.mbdata2APitem(
+        if node and node.startswith(self.apg._events.namespace):
+            ordered_items = [
+                await self.apg.ap_events.event_data_2_ap_item(
+                    self.apg._events.event_elt_2_event_data(item),
+                    account_jid
+                )
+                for item in reversed(items)
+            ]
+        else:
+            ordered_items = [
+                await self.apg.mb_data_2_ap_item(
                     self.apg.client,
                     await self.apg._m.item2mbdata(
                         self.apg.client,
@@ -650,6 +708,12 @@
                 )
                 for item in reversed(items)
             ]
+        data = {
+            "@context": ["https://www.w3.org/ns/activitystreams"],
+            "id": url,
+            "type": "OrderedCollectionPage",
+            "partOf": base_url,
+            "orderedItems": ordered_items
         }
 
         # AP OrderedCollection must be in reversed chronological order, thus the opposite
@@ -718,7 +782,7 @@
         url_first_page = f"{url}?{parse.urlencode({'page': 'first'})}"
         url_last_page = f"{url}?{parse.urlencode({'page': 'last'})}"
         return {
-            "@context": "https://www.w3.org/ns/activitystreams",
+            "@context": ["https://www.w3.org/ns/activitystreams"],
             "id": url,
             "totalItems": items_count,
             "type": "OrderedCollection",
@@ -737,8 +801,6 @@
     ) -> None:
         if signing_actor is None:
             raise exceptions.InternalError("signing_actor must be set for inbox requests")
-        if node is None:
-            node = self.apg._m.namespace
         try:
             data = json.load(request.content)
             if not isinstance(data, dict):
@@ -805,7 +867,7 @@
 
         url = self.getCanonicalURL(request)
         return {
-          "@context": "https://www.w3.org/ns/activitystreams",
+          "@context": ["https://www.w3.org/ns/activitystreams"],
           "type": "OrderedCollection",
           "id": url,
           "totalItems": len(subscribers),
@@ -844,7 +906,7 @@
 
         url = self.getCanonicalURL(request)
         return {
-          "@context": "https://www.w3.org/ns/activitystreams",
+          "@context": ["https://www.w3.org/ns/activitystreams"],
           "type": "OrderedCollection",
           "id": url,
           "totalItems": len(subscriptions),
@@ -901,7 +963,8 @@
             return static.File(str(avatar_path)).render(request)
         elif request_type == "item":
             ret_data = await self.apg.apGetLocalObject(ap_url)
-            ret_data["@context"] = NS_AP
+            if "@context" not in ret_data:
+                ret_data["@context"] = [NS_AP]
         else:
             if len(extra_args) > 1:
                 log.warning(f"unexpected extra arguments: {extra_args!r}")
--- a/sat/plugins/plugin_comp_ap_gateway/pubsub_service.py	Wed Sep 21 22:43:55 2022 +0200
+++ b/sat/plugins/plugin_comp_ap_gateway/pubsub_service.py	Thu Sep 22 00:01:41 2022 +0200
@@ -127,6 +127,22 @@
             await self.apg.convertAndPostItems(
                 client, ap_account, service, nodeIdentifier, items
             )
+            cached_node = await self.host.memory.storage.getPubsubNode(
+                client, service, nodeIdentifier, with_subscriptions=True, create=True
+            )
+            await self.host.memory.storage.cachePubsubItems(
+                client,
+                cached_node,
+                items
+            )
+            for subscription in cached_node.subscriptions:
+                if subscription.state != SubscriptionState.SUBSCRIBED:
+                    continue
+                self.notifyPublish(
+                    service,
+                    nodeIdentifier,
+                    [(subscription.subscriber, None, items)]
+                )
 
     async def apFollowing2Elt(self, ap_item: dict) -> domish.Element:
         """Convert actor ID from following collection to XMPP item"""
@@ -334,14 +350,17 @@
             if cached_node is None:
                 raise error.StanzaError("item-not-found")
         else:
-            if not node.startswith(self.apg._m.namespace):
+            if node.startswith(self.apg._m.namespace):
+                parser = self.apg.ap_item_2_mb_elt
+            elif node.startswith(self.apg._events.namespace):
+                parser = self.apg.ap_events.ap_item_2_event_elt
+            else:
                 raise error.StanzaError(
                     "feature-not-implemented",
                     text=f"AP Gateway {C.APP_VERSION} only supports "
                     f"{self.apg._m.namespace} node for now"
                 )
             collection_name = "outbox"
-            parser = self.apg.apItem2Elt
             use_cache = True
 
         if use_cache:
@@ -433,6 +452,13 @@
     async def subscribe(self, requestor, service, nodeIdentifier, subscriber):
         # TODO: handle comments nodes
         client = self.apg.client
+        # we use PENDING state for microblog, it will be set to SUBSCRIBED once the Follow
+        # is accepted. Other nodes are directly set to subscribed, their subscriptions
+        # being internal.
+        if nodeIdentifier == self.apg._m.namespace:
+            sub_state = SubscriptionState.PENDING
+        else:
+            sub_state = SubscriptionState.SUBSCRIBED
         node = await self.host.memory.storage.getPubsubNode(
             client, service, nodeIdentifier, with_subscriptions=True
         )
@@ -455,13 +481,13 @@
         if subscription is None:
             subscription = PubsubSub(
                 subscriber=requestor.userhostJID(),
-                state=SubscriptionState.PENDING
+                state=sub_state
             )
             node.subscriptions.append(subscription)
             await self.host.memory.storage.add(node)
         else:
             if subscription.state is None:
-                subscription.state = SubscriptionState.PENDING
+                subscription.state = sub_state
                 await self.host.memory.storage.add(node)
             elif subscription.state == SubscriptionState.SUBSCRIBED:
                 log.info(
@@ -473,13 +499,17 @@
                     f"{requestor.userhostJID()} has already a pending subscription to "
                     f"{node!r} at {service}. Doing the request anyway."
                 )
+                if sub_state != SubscriptionState.PENDING:
+                    subscription.state = sub_state
+                    await self.host.memory.storage.add(node)
             else:
                 raise exceptions.InternalError(
                     f"unmanaged subscription state: {subscription.state}"
                 )
 
-        if nodeIdentifier == self.apg._m.namespace:
-            # if we subscribe to microblog node, we follow the corresponding account
+        if nodeIdentifier in (self.apg._m.namespace, self.apg._events.namespace):
+            # if we subscribe to microblog or events node, we follow the corresponding
+            # account
             req_actor_id, recip_actor_id, inbox = await self.getAPActorIdsAndInbox(
                 requestor, service
             )
@@ -490,7 +520,7 @@
             if resp.code >= 300:
                 text = await resp.text()
                 raise error.StanzaError("service-unavailable", text=text)
-            return pubsub.Subscription(nodeIdentifier, requestor, "subscribed")
+        return pubsub.Subscription(nodeIdentifier, requestor, "subscribed")
 
     @ensure_deferred
     async def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):