# HG changeset patch # User Goffi # Date 1663797701 -7200 # Node ID 0aa7023dcd083d2b840f9d7c09ab0847ce9c0ada # Parent 384b7e6c2dbf1828ec5886dcbf1d55e389befcfb component AP gateway: events: - XMPP Events <=> AP Events conversion - `Join`/`Leave` activities are converted to RSVP attachments and vice versa - fix caching/notification on item published on a virtual pubsub node - add Ad-Hoc command to convert XMPP Jid/Node to virtual AP Account - handle `Update` activity - on `convertAndPostItems`, `Update` activity is used instead of `Create` if a version of the item is already present in cache - `events` field is added to actor data (and to `endpoints`), it links the `outbox` of the actor mapping the same JID with the Events node (i.e. it links to the Events node of the entity) - fix subscription to nodes which are not the microblog one rel 372 diff -r 384b7e6c2dbf -r 0aa7023dcd08 sat/plugins/plugin_comp_ap_gateway/__init__.py --- a/sat/plugins/plugin_comp_ap_gateway/__init__.py Wed Sep 21 22:43:55 2022 +0200 +++ b/sat/plugins/plugin_comp_ap_gateway/__init__.py Thu Sep 22 00:01:41 2022 +0200 @@ -20,11 +20,22 @@ import calendar import hashlib import json +from os import access from pathlib import Path from pprint import pformat import re from typing import ( - Any, Dict, List, Set, Optional, Tuple, Union, Callable, Awaitable, overload + Any, + Awaitable, + Callable, + Dict, + List, + Optional, + Set, + Tuple, + Type, + Union, + overload, ) from urllib import parse @@ -34,6 +45,7 @@ from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives.asymmetric import padding import dateutil +from dateutil.parser import parserinfo import shortuuid from sqlalchemy.exc import IntegrityError import treq @@ -42,19 +54,21 @@ from twisted.web import http from twisted.words.protocols.jabber import error, jid from twisted.words.xish import domish -from wokkel import rsm, pubsub +from wokkel import pubsub, rsm from sat.core import exceptions from sat.core.constants import Const as C from sat.core.core_types import SatXMPPEntity from sat.core.i18n import _ from sat.core.log import getLogger -from sat.memory.sqla_mapping import SubscriptionState, History from sat.memory import persistent +from sat.memory.sqla_mapping import History, SubscriptionState from sat.tools import utils -from sat.tools.common import data_format, tls, uri +from sat.tools.common import data_format, date_utils, tls, uri from sat.tools.common.async_utils import async_lru +from .ad_hoc import APAdHocService +from .events import APEvents from .constants import ( ACTIVITY_OBJECT_MANDATORY, ACTIVITY_TARGET_MANDATORY, @@ -65,20 +79,23 @@ IMPORT_NAME, LRU_MAX_SIZE, MEDIA_TYPE_AP, - TYPE_ACTOR, - TYPE_ITEM, - TYPE_FOLLOWERS, - TYPE_TOMBSTONE, - TYPE_MENTION, - TYPE_LIKE, - TYPE_REACTION, NS_AP, NS_AP_PUBLIC, - PUBLIC_TUPLE + PUBLIC_TUPLE, + TYPE_ACTOR, + TYPE_EVENT, + TYPE_FOLLOWERS, + TYPE_ITEM, + TYPE_LIKE, + TYPE_MENTION, + TYPE_REACTION, + TYPE_TOMBSTONE, + TYPE_JOIN, + TYPE_LEAVE ) -from .regex import RE_MENTION from .http_server import HTTPServer from .pubsub_service import APPubsubService +from .regex import RE_MENTION log = getLogger(__name__) @@ -92,9 +109,9 @@ C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, C.PI_PROTOCOLS: [], C.PI_DEPENDENCIES: [ - "XEP-0054", "XEP-0060", "XEP-0084", "XEP-0106", "XEP-0277", "XEP-0292", - "XEP-0329", "XEP-0372", "XEP-0424", "XEP-0465", "XEP-0470", "PUBSUB_CACHE", - "TEXT_SYNTAXES", "IDENTITY" + "XEP-0050", "XEP-0054", "XEP-0060", "XEP-0084", "XEP-0106", "XEP-0277", + "XEP-0292", "XEP-0329", "XEP-0372", "XEP-0424", "XEP-0465", "XEP-0470", + "PUBSUB_CACHE", "TEXT_SYNTAXES", "IDENTITY", "EVENTS" ], C.PI_RECOMMENDATIONS: [], C.PI_MAIN: "APGateway", @@ -134,6 +151,7 @@ self._t = host.plugins["TEXT_SYNTAXES"] self._i = host.plugins["IDENTITY"] self._pa = host.plugins["XEP-0470"] + self._events = host.plugins["EVENTS"] self._p.addManagedNode( "", items_cb=self._itemsReceived, @@ -143,6 +161,8 @@ priority=1000 ) self.pubsub_service = APPubsubService(self) + self.ad_hoc = APAdHocService(self) + self.ap_events = APEvents(self) host.trigger.add("messageReceived", self._messageReceivedTrigger, priority=-1000) host.trigger.add("XEP-0424_retractReceived", self._onMessageRetract) host.trigger.add("XEP-0372_ref_received", self._onReferenceReceived) @@ -266,6 +286,9 @@ ) await self.init(client) + def profileConnected(self, client): + self.ad_hoc.init(client) + async def _itemsReceived( self, client: SatXMPPEntity, @@ -313,7 +336,7 @@ local_jid = await self.getJIDFromId(actor_id) return self.client.getVirtualClient(local_jid) - def isActivity(self, data: dict) -> bool: + def is_activity(self, data: dict) -> bool: """Return True if the data has an activity type""" try: return (data.get("type") or "").lower() in ACTIVITY_TYPES_LOWER @@ -434,12 +457,23 @@ except IndexError: raise exceptions.NotFound("requested items can't be found") - mb_data = await self._m.item2mbdata( - self.client, found_item, author_jid, node - ) - ap_item = await self.mbdata2APitem(self.client, mb_data) - # the URL must return the object and not the activity - return ap_item["object"] + if node.startswith(self._events.namespace): + # this is an event + event_data = self._events.event_elt_2_event_data(found_item) + ap_item = await self.ap_events.event_data_2_ap_item( + event_data, author_jid + ) + # the URL must return the object and not the activity + ap_item["object"]["@context"] = ap_item["@context"] + return ap_item["object"] + else: + # this is a blog item + mb_data = await self._m.item2mbdata( + self.client, found_item, author_jid, node + ) + ap_item = await self.mb_data_2_ap_item(self.client, mb_data) + # the URL must return the object and not the activity + return ap_item["object"] else: raise NotImplementedError( 'only object from "item" URLs can be retrieved for now' @@ -531,22 +565,22 @@ ) -> str: """Retrieve actor who sent data - This is done by checking "attributedTo" field first, then "actor" field. - Only the first found actor is taken into accoun + This is done by checking "actor" field first, then "attributedTo" field. + Only the first found actor is taken into account @param data: AP object @return: actor id of the sender @raise exceptions.NotFound: no actor has been found in data """ try: - actors = await self.apGetActors(data, "attributedTo", as_account=False) + actors = await self.apGetActors(data, "actor", as_account=False) except exceptions.DataError: actors = None if not actors: try: - actors = await self.apGetActors(data, "actor", as_account=False) + actors = await self.apGetActors(data, "attributedTo", as_account=False) except exceptions.DataError: raise exceptions.NotFound( - 'actor not specified in "attributedTo" or "actor"' + 'actor not specified in "actor" or "attributedTo"' ) try: return actors[0] @@ -880,7 +914,7 @@ if activity_id is None: activity_id = f"{actor_id}#{activity.lower()}_{shortuuid.uuid()}" data: Dict[str, Any] = { - "@context": NS_AP, + "@context": [NS_AP], "actor": actor_id, "id": activity_id, "type": activity, @@ -993,37 +1027,82 @@ published @param node: (virtual) node corresponding where the item has been published @param subscribe_extra_nodes: if True, extra data nodes will be automatically - subscribed, that is comment nodes if present and attachments nodes. + subscribed, that is comment nodes if present and attachments nodes. """ actor_id = await self.getAPActorIdFromAccount(ap_account) inbox = await self.getAPInboxFromId(actor_id) for item in items: if item.name == "item": - mb_data = await self._m.item2mbdata(client, item, service, node) - author_jid = jid.JID(mb_data["author_jid"]) - if subscribe_extra_nodes and not self.isVirtualJID(author_jid): - # we subscribe automatically to comment nodes if any - recipient_jid = self.getLocalJIDFromAccount(ap_account) - recipient_client = self.client.getVirtualClient(recipient_jid) - for comment_data in mb_data.get("comments", []): - comment_service = jid.JID(comment_data["service"]) - if self.isVirtualJID(comment_service): - log.debug(f"ignoring virtual comment service: {comment_data}") - continue - comment_node = comment_data["node"] - await self._p.subscribe( - recipient_client, comment_service, comment_node - ) + cached_item = await self.host.memory.storage.searchPubsubItems({ + "profiles": [self.client.profile], + "services": [service], + "nodes": [node], + "names": [item["id"]] + }) + is_new = not bool(cached_item) + if node.startswith(self._events.namespace): + # event item + event_data = self._events.event_elt_2_event_data(item) try: - await self._pa.subscribe( - recipient_client, service, node, mb_data["id"] - ) - except exceptions.NotFound: - log.debug( - f"no attachment node found for item {mb_data['id']!r} on " - f"{node!r} at {service}" - ) - ap_item = await self.mbdata2APitem(client, mb_data) + author_jid = jid.JID(item["publisher"]).userhostJID() + except (KeyError, RuntimeWarning): + root_elt = item + while root_elt.parent is not None: + root_elt = root_elt.parent + author_jid = jid.JID(root_elt["from"]).userhostJID() + if subscribe_extra_nodes and not self.isVirtualJID(author_jid): + # we subscribe automatically to comment nodes if any + recipient_jid = self.getLocalJIDFromAccount(ap_account) + recipient_client = self.client.getVirtualClient(recipient_jid) + comments_data = event_data.get("comments") + if comments_data: + comment_service = jid.JID(comments_data["jid"]) + comment_node = comments_data["node"] + await self._p.subscribe( + recipient_client, comment_service, comment_node + ) + try: + await self._pa.subscribe( + recipient_client, service, node, event_data["id"] + ) + except exceptions.NotFound: + log.debug( + f"no attachment node found for item {event_data['id']!r} " + f"on {node!r} at {service}" + ) + ap_item = await self.ap_events.event_data_2_ap_item( + event_data, author_jid, is_new=is_new + ) + else: + # blog item + mb_data = await self._m.item2mbdata(client, item, service, node) + author_jid = jid.JID(mb_data["author_jid"]) + if subscribe_extra_nodes and not self.isVirtualJID(author_jid): + # we subscribe automatically to comment nodes if any + recipient_jid = self.getLocalJIDFromAccount(ap_account) + recipient_client = self.client.getVirtualClient(recipient_jid) + for comment_data in mb_data.get("comments", []): + comment_service = jid.JID(comment_data["service"]) + if self.isVirtualJID(comment_service): + log.debug( + f"ignoring virtual comment service: {comment_data}" + ) + continue + comment_node = comment_data["node"] + await self._p.subscribe( + recipient_client, comment_service, comment_node + ) + try: + await self._pa.subscribe( + recipient_client, service, node, mb_data["id"] + ) + except exceptions.NotFound: + log.debug( + f"no attachment node found for item {mb_data['id']!r} on " + f"{node!r} at {service}" + ) + ap_item = await self.mb_data_2_ap_item(client, mb_data, is_new=is_new) + url_actor = ap_item["actor"] elif item.name == "retract": url_actor, ap_item = await self.apDeleteItem( @@ -1134,22 +1213,22 @@ if not "noticed" in old_attachment: # new "noticed" attachment, we translate to "Like" activity activity_id = self.buildAPURL("like", item_account, item_id) - like = self.createActivity( + activity = self.createActivity( TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id ) - like["to"] = [ap_account] - like["cc"] = [NS_AP_PUBLIC] - await self.signAndPost(inbox, publisher_actor_id, like) + activity["to"] = [ap_account] + activity["cc"] = [NS_AP_PUBLIC] + await self.signAndPost(inbox, publisher_actor_id, activity) else: if "noticed" in old_attachment: # "noticed" attachment has been removed, we undo the "Like" activity activity_id = self.buildAPURL("like", item_account, item_id) - like = self.createActivity( + activity = self.createActivity( TYPE_LIKE, publisher_actor_id, item_url, activity_id=activity_id ) - like["to"] = [ap_account] - like["cc"] = [NS_AP_PUBLIC] - undo = self.createActivity("Undo", publisher_actor_id, like) + activity["to"] = [ap_account] + activity["cc"] = [NS_AP_PUBLIC] + undo = self.createActivity("Undo", publisher_actor_id, activity) await self.signAndPost(inbox, publisher_actor_id, undo) # reactions @@ -1177,6 +1256,31 @@ activy = reaction_activity await self.signAndPost(inbox, publisher_actor_id, activy) + # RSVP + if "rsvp" in attachments: + attending = attachments["rsvp"].get("attending", "no") + old_attending = old_attachment.get("rsvp", {}).get("attending", "no") + if attending != old_attending: + activity_type = TYPE_JOIN if attending == "yes" else TYPE_LEAVE + activity_id = self.buildAPURL(activity_type.lower(), item_account, item_id) + activity = self.createActivity( + activity_type, publisher_actor_id, item_url, activity_id=activity_id + ) + activity["to"] = [ap_account] + activity["cc"] = [NS_AP_PUBLIC] + await self.signAndPost(inbox, publisher_actor_id, activity) + else: + if "rsvp" in old_attachment: + old_attending = old_attachment.get("rsvp", {}).get("attending", "no") + if old_attending == "yes": + activity_id = self.buildAPURL(TYPE_LEAVE.lower(), item_account, item_id) + activity = self.createActivity( + TYPE_LEAVE, publisher_actor_id, item_url, activity_id=activity_id + ) + activity["to"] = [ap_account] + activity["cc"] = [NS_AP_PUBLIC] + await self.signAndPost(inbox, publisher_actor_id, activity) + if service.user and self.isVirtualJID(service): # the item is on a virtual service, we need to store it in cache log.debug("storing attachments item in cache") @@ -1392,7 +1496,7 @@ Items are always returned in chronological order in the result """ if parser is None: - parser = self.apItem2Elt + parser = self.ap_item_2_mb_elt rsm_resp: Dict[str, Union[bool, int]] = {} try: @@ -1520,7 +1624,7 @@ return items, rsm.RSMResponse(**rsm_resp) - async def apItem2MbDataAndElt(self, ap_item: dict) -> Tuple[dict, domish.Element]: + async def ap_item_2_mb_data_and_elt(self, ap_item: dict) -> Tuple[dict, domish.Element]: """Convert AP item to parsed microblog data and corresponding item element""" mb_data = await self.apItem2MBdata(ap_item) item_elt = await self._m.data2entry( @@ -1532,9 +1636,9 @@ item_elt["publisher"] = mb_data["author_jid"] return mb_data, item_elt - async def apItem2Elt(self, ap_item: dict) -> domish.Element: + async def ap_item_2_mb_elt(self, ap_item: dict) -> domish.Element: """Convert AP item to XMPP item element""" - __, item_elt = await self.apItem2MbDataAndElt(ap_item) + __, item_elt = await self.ap_item_2_mb_data_and_elt(ap_item) return item_elt async def parseAPPage( @@ -1629,7 +1733,7 @@ @raise NotImplementedError: some AP data is not handled yet @raise error.StanzaError: error while contacting the AP server """ - is_activity = self.isActivity(ap_item) + is_activity = self.is_activity(ap_item) if is_activity: ap_object = await self.apGetObject(ap_item, "object") if not ap_object: @@ -1839,11 +1943,12 @@ ] return announce - async def mbdata2APitem( + async def mb_data_2_ap_item( self, client: SatXMPPEntity, mb_data: dict, - public=True + public: bool =True, + is_new: bool = True, ) -> dict: """Convert Libervia Microblog Data to ActivityPub item @@ -1856,6 +1961,10 @@ ``inReplyTo`` will also be set if suitable if False, no destinee will be set (i.e., no ``to`` or ``cc`` or public flag). This is usually used for direct messages. + @param is_new: if True, the item is a new one (no instance has been found in + cache). + If True, a "Create" activity will be generated, otherwise an "Update" one will + be. @return: Activity item """ extra = mb_data.get("extra", {}) @@ -1941,7 +2050,7 @@ ) return self.createActivity( - "Create", url_actor, ap_object, activity_id=url_item + "Create" if is_new else "Update", url_actor, ap_object, activity_id=url_item ) async def publishMessage( @@ -1977,7 +2086,7 @@ except KeyError: raise exceptions.DataError("Can't get ActivityPub actor inbox") - item_data = await self.mbdata2APitem(client, mess_data) + item_data = await self.mb_data_2_ap_item(client, mess_data) url_actor = item_data["actor"] resp = await self.signAndPost(inbox_url, url_actor, item_data) @@ -2096,7 +2205,7 @@ # we need to use origin ID when present to be able to retract the message mb_data["id"] = origin_id client = self.client.getVirtualClient(mess_data["from"]) - ap_item = await self.mbdata2APitem(client, mb_data, public=False) + ap_item = await self.mb_data_2_ap_item(client, mb_data, public=False) ap_item["object"]["to"] = ap_item["to"] = [actor_id] await self.signAndPost(inbox, ap_item["actor"], ap_item) return mess_data @@ -2205,7 +2314,7 @@ mb_data = await self._m.item2mbdata( client, cached_item.data, pubsub_service, pubsub_node ) - ap_item = await self.mbdata2APitem(client, mb_data) + ap_item = await self.mb_data_2_ap_item(client, mb_data) ap_object = ap_item["object"] ap_object["to"] = [actor_id] ap_object.setdefault("tag", []).append({ @@ -2271,7 +2380,7 @@ log.info(f"{ppElt(parent_item_elt.toXml())}") raise NotImplementedError() else: - __, item_elt = await self.apItem2MbDataAndElt(ap_item) + __, item_elt = await self.ap_item_2_mb_data_and_elt(ap_item) await self._p.publish(client, comment_service, comment_node, [item_elt]) await self.notifyMentions( targets, mentions, comment_service, comment_node, item_elt["id"] @@ -2454,7 +2563,6 @@ @param public: True if the item is public """ # XXX: "public" is not used for now - service = client.jid in_reply_to = item.get("inReplyTo") @@ -2476,8 +2584,9 @@ ) else: # it is a root item (i.e. not a reply to an other item) + create = node == self._events.namespace cached_node = await self.host.memory.storage.getPubsubNode( - client, service, node, with_subscriptions=True + client, service, node, with_subscriptions=True, create=create ) if cached_node is None: log.warning( @@ -2486,12 +2595,15 @@ ) return - mb_data, item_elt = await self.apItem2MbDataAndElt(item) + if item.get("type") == TYPE_EVENT: + data, item_elt = await self.ap_events.ap_item_2_event_data_and_elt(item) + else: + data, item_elt = await self.ap_item_2_mb_data_and_elt(item) await self.host.memory.storage.cachePubsubItems( client, cached_node, [item_elt], - [mb_data] + [data] ) for subscription in cached_node.subscriptions: diff -r 384b7e6c2dbf -r 0aa7023dcd08 sat/plugins/plugin_comp_ap_gateway/ad_hoc.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat/plugins/plugin_comp_ap_gateway/ad_hoc.py Thu Sep 22 00:01:41 2022 +0200 @@ -0,0 +1,89 @@ +#!/usr/bin/env python3 + +# Libervia ActivityPub Gateway +# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from twisted.words.protocols.jabber import jid +from twisted.words.xish import domish +from wokkel import data_form + +from sat.core.constants import Const as C +from sat.core.core_types import SatXMPPEntity +from sat.core.i18n import _ +from sat.core.log import getLogger + + +log = getLogger(__name__) +NS_XMPP_JID_NODE_2_AP = "https://libervia.org/ap_gateway/xmpp_jid_node_2_ap_actor" + +class APAdHocService: + """Ad-Hoc commands for AP Gateway""" + + def __init__(self, apg): + self.host = apg.host + self.apg = apg + self._c = self.host.plugins["XEP-0050"] + + def init(self, client: SatXMPPEntity) -> None: + self._c.addAdHocCommand( + client, + self.xmpp_jid_node_2_ap_actor, + "Convert XMPP JID/Node to AP actor", + node=NS_XMPP_JID_NODE_2_AP, + allowed_magics=C.ENTITY_ALL, + ) + + async def xmpp_jid_node_2_ap_actor( + self, + client: SatXMPPEntity, + command_elt: domish.Element, + session_data: dict, + action: str, + node: str + ): + try: + x_elt = next(command_elt.elements(data_form.NS_X_DATA, "x")) + command_form = data_form.Form.fromElement(x_elt) + except StopIteration: + command_form = None + if command_form is None or len(command_form.fields) == 0: + # root request + status = self._c.STATUS.EXECUTING + form = data_form.Form( + "form", title="XMPP JID/node to AP actor conversion", + formNamespace=NS_XMPP_JID_NODE_2_AP + ) + + field = data_form.Field( + "text-single", "jid", required=True + ) + form.addField(field) + + field = data_form.Field( + "text-single", "node", required=False + ) + form.addField(field) + + payload = form.toElement() + return payload, status, None, None + else: + xmpp_jid = jid.JID(command_form["jid"]) + xmpp_node = command_form.get("node") + actor = await self.apg.getAPAccountFromJidAndNode(xmpp_jid, xmpp_node) + note = (self._c.NOTE.INFO, actor) + status = self._c.STATUS.COMPLETED + payload = None + return (payload, status, None, note) diff -r 384b7e6c2dbf -r 0aa7023dcd08 sat/plugins/plugin_comp_ap_gateway/constants.py --- a/sat/plugins/plugin_comp_ap_gateway/constants.py Wed Sep 21 22:43:55 2022 +0200 +++ b/sat/plugins/plugin_comp_ap_gateway/constants.py Thu Sep 22 00:01:41 2022 +0200 @@ -31,6 +31,9 @@ TYPE_MENTION = "Mention" TYPE_LIKE = "Like" TYPE_REACTION = "EmojiReact" +TYPE_EVENT = "Event" +TYPE_JOIN = "Join" +TYPE_LEAVE = "Leave" MEDIA_TYPE_AP = "application/activity+json" NS_AP = "https://www.w3.org/ns/activitystreams" NS_AP_PUBLIC = f"{NS_AP}#Public" @@ -73,7 +76,8 @@ # activities which can be used with Shared Inbox (i.e. with no account specified) # must be lowercase ACTIVIY_NO_ACCOUNT_ALLOWED = ( - "create", "delete", "announce", "undo", "like", "emojireact" + "create", "update", "delete", "announce", "undo", "like", "emojireact", "join", + "leave" ) # maximum number of parents to retrieve when comments_max_depth option is set COMMENTS_MAX_PARENTS = 100 diff -r 384b7e6c2dbf -r 0aa7023dcd08 sat/plugins/plugin_comp_ap_gateway/events.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat/plugins/plugin_comp_ap_gateway/events.py Thu Sep 22 00:01:41 2022 +0200 @@ -0,0 +1,407 @@ +#!/usr/bin/env python3 + +# Libervia ActivityPub Gateway +# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from typing import Tuple + +import mimetypes +import html + +import shortuuid +from twisted.words.xish import domish +from twisted.words.protocols.jabber import jid + +from sat.core.i18n import _ +from sat.core.log import getLogger +from sat.core import exceptions +from sat.tools.common import date_utils, uri + +from .constants import NS_AP_PUBLIC, TYPE_ACTOR, TYPE_EVENT, TYPE_ITEM + + +log = getLogger(__name__) + +# direct copy of what Mobilizon uses +AP_EVENTS_CONTEXT = { + "@language": "und", + "Hashtag": "as:Hashtag", + "PostalAddress": "sc:PostalAddress", + "PropertyValue": "sc:PropertyValue", + "address": {"@id": "sc:address", "@type": "sc:PostalAddress"}, + "addressCountry": "sc:addressCountry", + "addressLocality": "sc:addressLocality", + "addressRegion": "sc:addressRegion", + "anonymousParticipationEnabled": {"@id": "mz:anonymousParticipationEnabled", + "@type": "sc:Boolean"}, + "category": "sc:category", + "commentsEnabled": {"@id": "pt:commentsEnabled", + "@type": "sc:Boolean"}, + "discoverable": "toot:discoverable", + "discussions": {"@id": "mz:discussions", "@type": "@id"}, + "events": {"@id": "mz:events", "@type": "@id"}, + "ical": "http://www.w3.org/2002/12/cal/ical#", + "inLanguage": "sc:inLanguage", + "isOnline": {"@id": "mz:isOnline", "@type": "sc:Boolean"}, + "joinMode": {"@id": "mz:joinMode", "@type": "mz:joinModeType"}, + "joinModeType": {"@id": "mz:joinModeType", + "@type": "rdfs:Class"}, + "location": {"@id": "sc:location", "@type": "sc:Place"}, + "manuallyApprovesFollowers": "as:manuallyApprovesFollowers", + "maximumAttendeeCapacity": "sc:maximumAttendeeCapacity", + "memberCount": {"@id": "mz:memberCount", "@type": "sc:Integer"}, + "members": {"@id": "mz:members", "@type": "@id"}, + "mz": "https://joinmobilizon.org/ns#", + "openness": {"@id": "mz:openness", "@type": "@id"}, + "participantCount": {"@id": "mz:participantCount", + "@type": "sc:Integer"}, + "participationMessage": {"@id": "mz:participationMessage", + "@type": "sc:Text"}, + "postalCode": "sc:postalCode", + "posts": {"@id": "mz:posts", "@type": "@id"}, + "propertyID": "sc:propertyID", + "pt": "https://joinpeertube.org/ns#", + "remainingAttendeeCapacity": "sc:remainingAttendeeCapacity", + "repliesModerationOption": {"@id": "mz:repliesModerationOption", + "@type": "mz:repliesModerationOptionType"}, + "repliesModerationOptionType": {"@id": "mz:repliesModerationOptionType", + "@type": "rdfs:Class"}, + "resources": {"@id": "mz:resources", "@type": "@id"}, + "sc": "http://schema.org#", + "streetAddress": "sc:streetAddress", + "timezone": {"@id": "mz:timezone", "@type": "sc:Text"}, + "todos": {"@id": "mz:todos", "@type": "@id"}, + "toot": "http://joinmastodon.org/ns#", + "uuid": "sc:identifier", + "value": "sc:value" +} + + +class APEvents: + """XMPP Events <=> AP Events conversion""" + + def __init__(self, apg): + self.host = apg.host + self.apg = apg + self._events = self.host.plugins["EVENTS"] + + async def event_data_2_ap_item( + self, event_data: dict, author_jid: jid.JID, is_new: bool=True + ) -> dict: + """Convert event data to AP activity + + @param event_data: event data as used in [plugin_exp_events] + @param author_jid: jid of the published of the event + @param is_new: if True, the item is a new one (no instance has been found in + cache). + If True, a "Create" activity will be generated, otherwise an "Update" one will + be + @return: AP activity wrapping an Event object + """ + if not event_data.get("id"): + event_data["id"] = shortuuid.uuid() + ap_account = await self.apg.getAPAccountFromJidAndNode( + author_jid, + self._events.namespace + ) + url_actor = self.apg.buildAPURL(TYPE_ACTOR, ap_account) + url_item = self.apg.buildAPURL(TYPE_ITEM, ap_account, event_data["id"]) + ap_object = { + "actor": url_actor, + "attributedTo": url_actor, + "to": [NS_AP_PUBLIC], + "id": url_item, + "type": TYPE_EVENT, + "name": next(iter(event_data["name"].values())), + "startTime": date_utils.date_fmt(event_data["start"], "iso"), + "endTime": date_utils.date_fmt(event_data["end"], "iso"), + "url": url_item, + } + + attachment = ap_object["attachment"] = [] + + # FIXME: we only handle URL head-picture for now + # TODO: handle jingle and use file metadata + try: + head_picture_url = event_data["head-picture"]["sources"][0]["url"] + except (KeyError, IndexError, TypeError): + pass + else: + media_type = mimetypes.guess_type(head_picture_url, False)[0] or "image/jpeg" + attachment.append({ + "name": "Banner", + "type": "Document", + "mediaType": media_type, + "url": head_picture_url, + }) + + descriptions = event_data.get("descriptions") + if descriptions: + for description in descriptions: + content = description["description"] + if description["type"] == "xhtml": + break + else: + content = f"

{html.escape(content)}

" # type: ignore + ap_object["content"] = content + + categories = event_data.get("categories") + if categories: + tag = ap_object["tag"] = [] + for category in categories: + tag.append({ + "name": f"#{category['term']}", + "type": "Hashtag", + }) + + locations = event_data.get("locations") + if locations: + ap_loc = ap_object["location"] = {} + # we only use the first found location + location = locations[0] + for source, dest in ( + ("description", "name"), + ("lat", "latitude"), + ("lon", "longitude"), + ): + value = location.get(source) + if value is not None: + ap_loc[dest] = value + for source, dest in ( + ("country", "addressCountry"), + ("locality", "addressLocality"), + ("region", "addressRegion"), + ("postalcode", "postalCode"), + ("street", "streetAddress"), + ): + value = location.get(source) + if value is not None: + ap_loc.setdefault("address", {})[dest] = value + + if event_data.get("comments"): + ap_object["commentsEnabled"] = True + + extra = event_data.get("extra") + + if extra: + status = extra.get("status") + if status: + ap_object["ical:status"] = status.upper() + + website = extra.get("website") + if website: + attachment.append({ + "href": website, + "mediaType": "text/html", + "name": "Website", + "type": "Link" + }) + + accessibility = extra.get("accessibility") + if accessibility: + wheelchair = accessibility.get("wheelchair") + if wheelchair: + if wheelchair == "full": + ap_wc_value = "fully" + elif wheelchair == "partial": + ap_wc_value = "partially" + elif wheelchair == "no": + ap_wc_value = "no" + else: + log.error(f"unexpected wheelchair value: {wheelchair}") + ap_wc_value = None + if ap_wc_value is not None: + attachment.append({ + "propertyID": "mz:accessibility:wheelchairAccessible", + "type": "PropertyValue", + "value": ap_wc_value + }) + + activity = self.apg.createActivity( + "Create" if is_new else "Update", url_actor, ap_object, activity_id=url_item + ) + activity["@context"].append(AP_EVENTS_CONTEXT) + return activity + + async def ap_item_2_event_data(self, ap_item: dict) -> dict: + """Convert AP activity or object to event data + + @param ap_item: ActivityPub item to convert + Can be either an activity of an object + @return: AP Item's Object and event data + @raise exceptions.DataError: something is invalid in the AP item + """ + is_activity = self.apg.is_activity(ap_item) + if is_activity: + ap_object = await self.apg.apGetObject(ap_item, "object") + if not ap_object: + log.warning(f'No "object" found in AP item {ap_item!r}') + raise exceptions.DataError + else: + ap_object = ap_item + + # id + if "_repeated" in ap_item: + # if the event is repeated, we use the original one ID + repeated_uri = ap_item["_repeated"]["uri"] + parsed_uri = uri.parseXMPPUri(repeated_uri) + object_id = parsed_uri["item"] + else: + object_id = ap_object.get("id") + if not object_id: + raise exceptions.DataError('"id" is missing in AP object') + + if ap_item["type"] != TYPE_EVENT: + raise exceptions.DataError("AP Object is not an event") + + # author + actor = await self.apg.apGetSenderActor(ap_object) + + account = await self.apg.getAPAccountFromId(actor) + author_jid = self.apg.getLocalJIDFromAccount(account).full() + + # name, start, end + event_data = { + "id": object_id, + "name": {"": ap_object.get("name") or "unnamed"}, + "start": date_utils.date_parse(ap_object["startTime"]), + "end": date_utils.date_parse(ap_object["endTime"]), + } + + # attachments/extra + event_data["extra"] = extra = {} + attachments = ap_object.get("attachment") or [] + for attachment in attachments: + name = attachment.get("name") + if name == "Banner": + try: + url = attachment["url"] + except KeyError: + log.warning(f"invalid attachment: {attachment}") + continue + event_data["head-picture"] = {"sources": [{"url": url}]} + elif name == "Website": + try: + url = attachment["href"] + except KeyError: + log.warning(f"invalid attachment: {attachment}") + continue + extra["website"] = url + else: + log.debug(f"unmanaged attachment: {attachment}") + + # description + content = ap_object.get("content") + if content: + event_data["descriptions"] = [{ + "type": "xhtml", + "description": content + }] + + # categories + tags = ap_object.get("tag") + if tags: + categories = event_data["categories"] = [] + for tag in tags: + if tag.get("type") == "Hashtag": + try: + term = tag["name"][1:] + except KeyError: + log.warning(f"invalid tag: {tag}") + continue + categories.append({"term": term}) + + #location + ap_location = ap_object.get("location") + if ap_location: + location = {} + for source, dest in ( + ("name", "description"), + ("latitude", "lat"), + ("longitude", "lon"), + ): + value = ap_location.get(source) + if value is not None: + location[dest] = value + address = ap_location.get("address") + if address: + for source, dest in ( + ("addressCountry", "country"), + ("addressLocality", "locality"), + ("addressRegion", "region"), + ("postalCode", "postalcode"), + ("streetAddress", "street"), + ): + value = address.get(source) + if value is not None: + location[dest] = value + if location: + event_data["locations"] = [location] + + # rsvp + # So far Mobilizon seems to only handle participate/don't participate, thus we use + # a simple "yes"/"no" form. + rsvp_data = {"fields": []} + event_data["rsvp"] = [rsvp_data] + rsvp_data["fields"].append({ + "type": "list-single", + "name": "attending", + "label": "Attending", + "options": [ + {"label": "yes", "value": "yes"}, + {"label": "no", "value": "no"} + ], + "required": True + }) + + # comments + + if ap_object.get("commentsEnabled"): + __, comments_node = await self.apg.getCommentsNodes(object_id, None) + event_data["comments"] = { + "service": author_jid, + "node": comments_node, + } + + # extra + # part of extra come from "attachment" above + + status = ap_object.get("ical:status") + if status is None: + pass + elif status in ("CONFIRMED", "CANCELLED", "TENTATIVE"): + extra["status"] = status.lower() + else: + log.warning(f"unknown event status: {status}") + + return event_data + + async def ap_item_2_event_data_and_elt( + self, + ap_item: dict + ) -> Tuple[dict, domish.Element]: + """Convert AP item to parsed event data and corresponding item element""" + event_data = await self.ap_item_2_event_data(ap_item) + event_elt = self._events.event_data_2_event_elt(event_data) + item_elt = domish.Element((None, "item")) + item_elt["id"] = event_data["id"] + item_elt.addChild(event_elt) + return event_data, item_elt + + async def ap_item_2_event_elt(self, ap_item: dict) -> domish.Element: + """Convert AP item to XMPP item element""" + __, item_elt = await self.ap_item_2_event_data_and_elt(ap_item) + return item_elt diff -r 384b7e6c2dbf -r 0aa7023dcd08 sat/plugins/plugin_comp_ap_gateway/http_server.py --- a/sat/plugins/plugin_comp_ap_gateway/http_server.py Wed Sep 21 22:43:55 2022 +0200 +++ b/sat/plugins/plugin_comp_ap_gateway/http_server.py Thu Sep 22 00:01:41 2022 +0200 @@ -41,9 +41,9 @@ from .constants import ( NS_AP, CONTENT_TYPE_AP, TYPE_ACTOR, TYPE_INBOX, TYPE_SHARED_INBOX, TYPE_OUTBOX, - AP_REQUEST_TYPES, PAGE_SIZE, ACTIVITY_TYPES_LOWER, ACTIVIY_NO_ACCOUNT_ALLOWED, - SIGN_HEADERS, HS2019, SIGN_EXP, TYPE_FOLLOWERS, TYPE_FOLLOWING, TYPE_ITEM, TYPE_LIKE, - TYPE_REACTION, ST_AP_CACHE + TYPE_EVENT, AP_REQUEST_TYPES, PAGE_SIZE, ACTIVITY_TYPES_LOWER, + ACTIVIY_NO_ACCOUNT_ALLOWED, SIGN_HEADERS, HS2019, SIGN_EXP, TYPE_FOLLOWERS, + TYPE_FOLLOWING, TYPE_ITEM, TYPE_LIKE, TYPE_REACTION, ST_AP_CACHE ) from .regex import RE_SIG_PARAM @@ -84,6 +84,7 @@ f"internal error: {failure_.value}" ) request.finish() + raise failure_ async def webfinger(self, request): url_parsed = parse.urlparse(request.uri.decode()) @@ -295,11 +296,14 @@ f"happen. Ignoring object from {signing_actor}\n{data}" ) raise exceptions.DataError("unexpected field in item") - if node is None: - node = self.apg._m.namespace client = await self.apg.getVirtualClient(signing_actor) objects = await self.apg.apGetList(data, "object") for obj in objects: + if node is None: + if obj.get("type") == TYPE_EVENT: + node = self.apg._events.namespace + else: + node = self.apg._m.namespace sender = await self.apg.apGetSenderActor(obj) if repeated: # we don't check sender when item is repeated, as it should be different @@ -353,6 +357,7 @@ "Ignoring object not attributed to signing actor: {obj}" ) continue + await self.apg.newAPItem(client, account_jid, node, obj) async def handleCreateActivity( @@ -367,6 +372,20 @@ ): await self.handleNewAPItems(request, data, account_jid, node, signing_actor) + async def handleUpdateActivity( + self, + request: "HTTPRequest", + data: dict, + account_jid: Optional[jid.JID], + node: Optional[str], + ap_account: Optional[str], + ap_url: str, + signing_actor: str + ): + # Update is the same as create: the item ID stays the same, thus the item will be + # overwritten + await self.handleNewAPItems(request, data, account_jid, node, signing_actor) + async def handleAnnounceActivity( self, request: "HTTPRequest", @@ -511,6 +530,32 @@ "reactions": {"operation": "update", "add": [data["content"]]} }) + async def handleJoinActivity( + self, + request: "HTTPRequest", + data: dict, + account_jid: Optional[jid.JID], + node: Optional[str], + ap_account: Optional[str], + ap_url: str, + signing_actor: str + ) -> None: + client = await self.apg.getVirtualClient(signing_actor) + await self.handleAttachmentItem(client, data, {"rsvp": {"attending": "yes"}}) + + async def handleLeaveActivity( + self, + request: "HTTPRequest", + data: dict, + account_jid: Optional[jid.JID], + node: Optional[str], + ap_account: Optional[str], + ap_url: str, + signing_actor: str + ) -> None: + client = await self.apg.getVirtualClient(signing_actor) + await self.handleAttachmentItem(client, data, {"rsvp": {"attending": "no"}}) + async def APActorRequest( self, request: "HTTPRequest", @@ -531,6 +576,13 @@ preferred_username = ap_account.split("@", 1)[0] identity_data = await self.apg._i.getIdentity(self.apg.client, account_jid) + if node and node.startswith(self.apg._events.namespace): + events = outbox + else: + events_account = await self.apg.getAPAccountFromJidAndNode( + account_jid, self.apg._events.namespace + ) + events = self.apg.buildAPURL(TYPE_OUTBOX, events_account) actor_data = { "@context": [ @@ -543,6 +595,7 @@ "preferredUsername": preferred_username, "inbox": inbox, "outbox": outbox, + "events": events, "followers": followers, "following": following, "publicKey": { @@ -551,7 +604,8 @@ "publicKeyPem": self.apg.public_key_pem }, "endpoints": { - "sharedInbox": shared_inbox + "sharedInbox": shared_inbox, + "events": events, }, } @@ -633,13 +687,17 @@ base_url = self.getCanonicalURL(request) url = f"{base_url}?{parse.urlencode(query_data, True)}" - data = { - "@context": "https://www.w3.org/ns/activitystreams", - "id": url, - "type": "OrderedCollectionPage", - "partOf": base_url, - "orderedItems" : [ - await self.apg.mbdata2APitem( + if node and node.startswith(self.apg._events.namespace): + ordered_items = [ + await self.apg.ap_events.event_data_2_ap_item( + self.apg._events.event_elt_2_event_data(item), + account_jid + ) + for item in reversed(items) + ] + else: + ordered_items = [ + await self.apg.mb_data_2_ap_item( self.apg.client, await self.apg._m.item2mbdata( self.apg.client, @@ -650,6 +708,12 @@ ) for item in reversed(items) ] + data = { + "@context": ["https://www.w3.org/ns/activitystreams"], + "id": url, + "type": "OrderedCollectionPage", + "partOf": base_url, + "orderedItems": ordered_items } # AP OrderedCollection must be in reversed chronological order, thus the opposite @@ -718,7 +782,7 @@ url_first_page = f"{url}?{parse.urlencode({'page': 'first'})}" url_last_page = f"{url}?{parse.urlencode({'page': 'last'})}" return { - "@context": "https://www.w3.org/ns/activitystreams", + "@context": ["https://www.w3.org/ns/activitystreams"], "id": url, "totalItems": items_count, "type": "OrderedCollection", @@ -737,8 +801,6 @@ ) -> None: if signing_actor is None: raise exceptions.InternalError("signing_actor must be set for inbox requests") - if node is None: - node = self.apg._m.namespace try: data = json.load(request.content) if not isinstance(data, dict): @@ -805,7 +867,7 @@ url = self.getCanonicalURL(request) return { - "@context": "https://www.w3.org/ns/activitystreams", + "@context": ["https://www.w3.org/ns/activitystreams"], "type": "OrderedCollection", "id": url, "totalItems": len(subscribers), @@ -844,7 +906,7 @@ url = self.getCanonicalURL(request) return { - "@context": "https://www.w3.org/ns/activitystreams", + "@context": ["https://www.w3.org/ns/activitystreams"], "type": "OrderedCollection", "id": url, "totalItems": len(subscriptions), @@ -901,7 +963,8 @@ return static.File(str(avatar_path)).render(request) elif request_type == "item": ret_data = await self.apg.apGetLocalObject(ap_url) - ret_data["@context"] = NS_AP + if "@context" not in ret_data: + ret_data["@context"] = [NS_AP] else: if len(extra_args) > 1: log.warning(f"unexpected extra arguments: {extra_args!r}") diff -r 384b7e6c2dbf -r 0aa7023dcd08 sat/plugins/plugin_comp_ap_gateway/pubsub_service.py --- a/sat/plugins/plugin_comp_ap_gateway/pubsub_service.py Wed Sep 21 22:43:55 2022 +0200 +++ b/sat/plugins/plugin_comp_ap_gateway/pubsub_service.py Thu Sep 22 00:01:41 2022 +0200 @@ -127,6 +127,22 @@ await self.apg.convertAndPostItems( client, ap_account, service, nodeIdentifier, items ) + cached_node = await self.host.memory.storage.getPubsubNode( + client, service, nodeIdentifier, with_subscriptions=True, create=True + ) + await self.host.memory.storage.cachePubsubItems( + client, + cached_node, + items + ) + for subscription in cached_node.subscriptions: + if subscription.state != SubscriptionState.SUBSCRIBED: + continue + self.notifyPublish( + service, + nodeIdentifier, + [(subscription.subscriber, None, items)] + ) async def apFollowing2Elt(self, ap_item: dict) -> domish.Element: """Convert actor ID from following collection to XMPP item""" @@ -334,14 +350,17 @@ if cached_node is None: raise error.StanzaError("item-not-found") else: - if not node.startswith(self.apg._m.namespace): + if node.startswith(self.apg._m.namespace): + parser = self.apg.ap_item_2_mb_elt + elif node.startswith(self.apg._events.namespace): + parser = self.apg.ap_events.ap_item_2_event_elt + else: raise error.StanzaError( "feature-not-implemented", text=f"AP Gateway {C.APP_VERSION} only supports " f"{self.apg._m.namespace} node for now" ) collection_name = "outbox" - parser = self.apg.apItem2Elt use_cache = True if use_cache: @@ -433,6 +452,13 @@ async def subscribe(self, requestor, service, nodeIdentifier, subscriber): # TODO: handle comments nodes client = self.apg.client + # we use PENDING state for microblog, it will be set to SUBSCRIBED once the Follow + # is accepted. Other nodes are directly set to subscribed, their subscriptions + # being internal. + if nodeIdentifier == self.apg._m.namespace: + sub_state = SubscriptionState.PENDING + else: + sub_state = SubscriptionState.SUBSCRIBED node = await self.host.memory.storage.getPubsubNode( client, service, nodeIdentifier, with_subscriptions=True ) @@ -455,13 +481,13 @@ if subscription is None: subscription = PubsubSub( subscriber=requestor.userhostJID(), - state=SubscriptionState.PENDING + state=sub_state ) node.subscriptions.append(subscription) await self.host.memory.storage.add(node) else: if subscription.state is None: - subscription.state = SubscriptionState.PENDING + subscription.state = sub_state await self.host.memory.storage.add(node) elif subscription.state == SubscriptionState.SUBSCRIBED: log.info( @@ -473,13 +499,17 @@ f"{requestor.userhostJID()} has already a pending subscription to " f"{node!r} at {service}. Doing the request anyway." ) + if sub_state != SubscriptionState.PENDING: + subscription.state = sub_state + await self.host.memory.storage.add(node) else: raise exceptions.InternalError( f"unmanaged subscription state: {subscription.state}" ) - if nodeIdentifier == self.apg._m.namespace: - # if we subscribe to microblog node, we follow the corresponding account + if nodeIdentifier in (self.apg._m.namespace, self.apg._events.namespace): + # if we subscribe to microblog or events node, we follow the corresponding + # account req_actor_id, recip_actor_id, inbox = await self.getAPActorIdsAndInbox( requestor, service ) @@ -490,7 +520,7 @@ if resp.code >= 300: text = await resp.text() raise error.StanzaError("service-unavailable", text=text) - return pubsub.Subscription(nodeIdentifier, requestor, "subscribed") + return pubsub.Subscription(nodeIdentifier, requestor, "subscribed") @ensure_deferred async def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):