libervia-backend: diff sat/plugins/plugin_comp_ap_gateway/__init__.py @ 3729:86eea17cafa7
component AP gateway: split plugin into several files:
constants, HTTP server and Pubsub service have been put in separate files.
rel: 363
author    Goffi <goffi@goffi.org>
date      Mon, 31 Jan 2022 18:35:49 +0100
parents   sat/plugins/plugin_comp_ap_gateway.py@b15644cae50d
children  bf0505d41c09
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat/plugins/plugin_comp_ap_gateway/__init__.py Mon Jan 31 18:35:49 2022 +0100 @@ -0,0 +1,853 @@ +#!/usr/bin/env python3 + +# Libervia ActivityPub Gateway +# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import base64 +import hashlib +import json +from pathlib import Path +from typing import Optional, Dict, Tuple, List, Union +from urllib import parse +import calendar +import re + +import dateutil +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.hazmat.primitives.asymmetric import padding +import shortuuid +import treq +from treq.response import _Response as TReqResponse +from twisted.internet import defer, reactor, threads +from twisted.web import http +from twisted.words.protocols.jabber import jid, error +from twisted.words.xish import domish +from wokkel import rsm + +from sat.core import exceptions +from sat.core.constants import Const as C +from sat.core.core_types import SatXMPPEntity +from sat.core.i18n import _ +from sat.core.log import getLogger +from sat.tools import utils +from sat.tools.common import data_format, tls +from sat.tools.common.async_utils import async_lru + +from .constants import (IMPORT_NAME, CONF_SECTION, TYPE_ACTOR, TYPE_ITEM, MEDIA_TYPE_AP, + AP_MB_MAP, LRU_MAX_SIZE) +from .http_server import HTTPServer +from .pubsub_service import APPubsubService + + +log = getLogger(__name__) + +IMPORT_NAME = "ap-gateway" + +PLUGIN_INFO = { + C.PI_NAME: "ActivityPub Gateway component", + C.PI_IMPORT_NAME: IMPORT_NAME, + C.PI_MODES: [C.PLUG_MODE_COMPONENT], + C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, + C.PI_PROTOCOLS: [], + C.PI_DEPENDENCIES: ["XEP-0106", "XEP-0277", "XEP-0060"], + C.PI_RECOMMENDATIONS: [], + C.PI_MAIN: "APGateway", + C.PI_HANDLER: C.BOOL_TRUE, + C.PI_DESCRIPTION: _( + "Gateway for bidirectional communication between XMPP and ActivityPub." 
+ ), +} + +HEXA_ENC = r"(?P<hex>[0-9a-fA-f]{2})" +RE_PERIOD_ENC = re.compile(f"\\.{HEXA_ENC}") +RE_PERCENT_ENC = re.compile(f"%{HEXA_ENC}") +RE_ALLOWED_UNQUOTED = re.compile(r"^[a-zA-Z0-9_-]+$") + + +class APGateway: + + def __init__(self, host): + self.host = host + self.initialised = False + self._m = host.plugins["XEP-0277"] + self._p = host.plugins["XEP-0060"] + + host.bridge.addMethod( + "APSend", + ".plugin", + in_sign="sss", + out_sign="", + method=self._publishMessage, + async_=True, + ) + + def getHandler(self, __): + return APPubsubService(self) + + async def init(self, client): + if self.initialised: + return + + self.initialised = True + log.info(_("ActivityPub Gateway initialization")) + + # RSA keys + stored_data = await self.host.memory.storage.getPrivates( + IMPORT_NAME, ["rsa_key"], profile=client.profile + ) + private_key_pem = stored_data.get("rsa_key") + if private_key_pem is None: + self.private_key = await threads.deferToThread( + rsa.generate_private_key, + public_exponent=65537, + key_size=4096, + ) + private_key_pem = self.private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption() + ).decode() + await self.host.memory.storage.setPrivateValue( + IMPORT_NAME, "rsa_key", private_key_pem, profile=client.profile + ) + else: + self.private_key = serialization.load_pem_private_key( + private_key_pem.encode(), + password=None, + ) + self.public_key = self.private_key.public_key() + self.public_key_pem = self.public_key.public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo + ).decode() + + # params + # URL and port + self.public_url = self.host.memory.getConfig( + CONF_SECTION, "public_url" + ) or self.host.memory.getConfig( + CONF_SECTION, "xmpp_domain" + ) + if self.public_url is None: + log.error( + '"public_url" not set in configuration, this is mandatory to have' + "ActivityPub Gateway running. Please set this option it to public facing " + f"url in {CONF_SECTION!r} configuration section." + ) + return + if parse.urlparse(self.public_url).scheme: + log.error( + "Scheme must not be specified in \"public_url\", please remove it from " + "\"public_url\" configuration option. ActivityPub Gateway won't be run." 
+ ) + return + self.http_port = int(self.host.memory.getConfig( + CONF_SECTION, 'http_port', 8123)) + connection_type = self.host.memory.getConfig( + CONF_SECTION, 'http_connection_type', 'https') + if connection_type not in ('http', 'https'): + raise exceptions.ConfigError( + 'bad ap-gateay http_connection_type, you must use one of "http" or ' + '"https"' + ) + self.max_items = self.host.memory.getConfig( + CONF_SECTION, 'new_node_max_items', 50 + + ) + self.ap_path = self.host.memory.getConfig(CONF_SECTION, 'ap_path', '_ap') + self.base_ap_url = parse.urljoin(f"https://{self.public_url}", f"{self.ap_path}/") + # True (default) if we provide gateway only to entities/services from our server + self.local_only = C.bool( + self.host.memory.getConfig(CONF_SECTION, 'local_only', C.BOOL_TRUE) + ) + + # HTTP server launch + self.server = HTTPServer(self) + if connection_type == 'http': + reactor.listenTCP(self.http_port, self.server) + else: + options = tls.getOptionsFromConfig( + self.host.memory.config, CONF_SECTION) + tls.TLSOptionsCheck(options) + context_factory = tls.getTLSContextFactory(options) + reactor.listenSSL(self.http_port, self.server, context_factory) + + async def profileConnecting(self, client): + self.client = client + await self.init(client) + + async def apGet(self, url: str) -> dict: + """Retrieve AP JSON from given URL + + @raise error.StanzaError: "service-unavailable" is sent when something went wrong + with AP server + """ + try: + return await treq.json_content(await treq.get( + url, + headers = { + "Accept": [MEDIA_TYPE_AP], + "Content-Type": [MEDIA_TYPE_AP], + } + )) + except Exception as e: + raise error.StanzaError( + "service-unavailable", + text=f"Can't get AP data at {url}: {e}" + ) + + def mustEncode(self, text: str) -> bool: + """Indicate if a text must be period encoded""" + return ( + not RE_ALLOWED_UNQUOTED.match(text) + or text.startswith("___") + or "---" in text + ) + + def periodEncode(self, text: str) -> str: + """Period encode a text + + see [getJIDAndNode] for reasons of period encoding + """ + return ( + parse.quote(text, safe="") + .replace("---", "%2d%2d%2d") + .replace("___", "%5f%5f%5f") + .replace(".", "%2e") + .replace("~", "%7e") + .replace("%", ".") + ) + + async def getAPAccountFromJidAndNode( + self, + jid_: jid.JID, + node: Optional[str] + ) -> str: + """Construct AP account from JID and node + + The account construction will use escaping when necessary + """ + if not node or node == self._m.namespace: + node = None + + if node and not jid_.user and not self.mustEncode(node): + is_pubsub = await self.isPubsub(jid_) + # when we have a pubsub service, the user part can be used to set the node + # this produces more user-friendly AP accounts + if is_pubsub: + jid_.user = node + node = None + + is_local = self.isLocal(jid_) + user = jid_.user if is_local else jid_.userhost() + if user is None: + user = "" + account_elts = [] + if node and self.mustEncode(node) or self.mustEncode(user): + account_elts = ["___"] + if node: + node = self.periodEncode(node) + user = self.periodEncode(user) + + if not user: + raise exceptions.InternalError("there should be a user part") + + if node: + account_elts.extend((node, "---")) + + account_elts.extend(( + user, "@", jid_.host if is_local else self.client.jid.userhost() + )) + return "".join(account_elts) + + def isLocal(self, jid_: jid.JID) -> bool: + """Returns True if jid_ use a domain or subdomain of gateway's host""" + local_host = self.client.host.split(".") + assert local_host + return 
jid_.host.split(".")[-len(local_host):] == local_host + + async def isPubsub(self, jid_: jid.JID) -> bool: + """Indicate if a JID is a Pubsub service""" + host_disco = await self.host.getDiscoInfos(self.client, jid_) + return ( + ("pubsub", "service") in host_disco.identities + and not ("pubsub", "pep") in host_disco.identities + ) + + async def getJIDAndNode(self, ap_account: str) -> Tuple[jid.JID, Optional[str]]: + """Decode raw AP account handle to get XMPP JID and Pubsub Node + + Username are case insensitive. + + By default, the username correspond to local username (i.e. username from + component's server). + + If local name's domain is a pubsub service (and not PEP), the username is taken as + a pubsub node. + + If ``---`` is present in username, the part before is used as pubsub node, and the + rest as a JID user part. + + If username starts with ``___``, characters are encoded using period encoding + (i.e. percent encoding where a ``.`` is used instead of ``%``). + + This horror is necessary due to limitation in some AP implementation (notably + Mastodon), cf. https://github.com/mastodon/mastodon/issues/17222 + + examples: + + ``toto@example.org`` => JID = toto@example.org, node = None + + ``___toto.40example.net@example.org`` => JID = toto@example.net (this one is a + non-local JID, and will work only if setings ``local_only`` is False), node = None + + ``toto@pubsub.example.org`` (with pubsub.example.org being a pubsub service) => + JID = pubsub.example.org, node = toto + + ``tata---toto@example.org`` => JID = toto@example.org, node = tata + + ``___urn.3axmpp.3amicroblog.3a0@pubsub.example.org`` (with pubsub.example.org + being a pubsub service) ==> JID = pubsub.example.org, node = urn:xmpp:microblog:0 + + @param ap_account: ActivityPub account handle (``username@domain.tld``) + @return: service JID and pubsub node + if pubsub is None, default microblog pubsub node (and possibly other nodes + that plugins may hanlde) will be used + @raise ValueError: invalid account + @raise PermissionError: non local jid is used when gateway doesn't allow them + """ + if ap_account.count("@") != 1: + raise ValueError("Invalid AP account") + if ap_account.startswith("___"): + encoded = True + ap_account = ap_account[3:] + else: + encoded = False + + username, domain = ap_account.split("@") + + if "---" in username: + node, username = username.rsplit("---", 1) + else: + node = None + + if encoded: + username = parse.unquote( + RE_PERIOD_ENC.sub(r"%\g<hex>", username), + errors="strict" + ) + if node: + node = parse.unquote( + RE_PERIOD_ENC.sub(r"%\g<hex>", node), + errors="strict" + ) + + if "@" in username: + username, domain = username.rsplit("@", 1) + + if not node: + # we need to check host disco, because disco request to user may be + # blocked for privacy reason (see + # https://xmpp.org/extensions/xep-0030.html#security) + is_pubsub = await self.isPubsub(jid.JID(domain)) + + if is_pubsub: + # if the host is a pubsub service and not a PEP, we consider that username + # is in fact the node name + node = username + username = None + + jid_s = f"{username}@{domain}" if username else domain + try: + jid_ = jid.JID(jid_s) + except RuntimeError: + raise ValueError(f"Invalid jid: {jid_s!r}") + + if self.local_only and not self.isLocal(jid_): + raise exceptions.PermissionError( + "This gateway is configured to map only local entities and services" + ) + + return jid_, node + + def parseAPURL(self, url: str) -> Tuple[str, str]: + """Parse an URL leading to an AP endpoint + + @param url: URL to parse 
(schema is not mandatory) + @return: endpoint type and AP account + """ + path = parse.urlparse(url).path.lstrip("/") + type_, account = path[len(self.ap_path):].lstrip("/").split("/", 1) + return type_, parse.unquote(account) + + def buildAPURL(self, type_:str , *args: str) -> str: + """Build an AP endpoint URL + + @param type_: type of AP endpoing + @param arg: endpoint dependant arguments + """ + return parse.urljoin( + self.base_ap_url, + str(Path(type_).joinpath(*(parse.quote_plus(a) for a in args))) + ) + + async def signAndPost(self, url: str, url_actor: str, doc: dict) -> TReqResponse: + """Sign a documentent and post it to AP server + + @param url: AP server endpoint + @param url_actor: URL generated by this gateway for local actor + @param doc: document to send + """ + p_url = parse.urlparse(url) + date = http.datetimeToString().decode() + body = json.dumps(doc).encode() + digest_hash = base64.b64encode(hashlib.sha256(body).digest()).decode() + digest = f"sha-256={digest_hash}" + to_sign = ( + f"(request-target): post {p_url.path}\nhost: {p_url.hostname}\n" + f"date: {date}\ndigest: {digest}" + ) + signature = base64.b64encode(self.private_key.sign( + to_sign.encode(), + # we have to use PKCS1v15 padding to be compatible with Mastodon + padding.PKCS1v15(), + hashes.SHA256() + )).decode() + h_signature = ( + f'keyId="{url_actor}",headers="(request-target) host date digest",' + f'signature="{signature}"' + ) + return await treq.post( + url, + body, + headers={ + "Host": [p_url.hostname], + "Date": [date], + "Digest": [digest], + "Signature": [h_signature], + } + ) + + def _publishMessage(self, mess_data_s: str, service_s: str, profile: str): + mess_data: dict = data_format.deserialise(mess_data_s) # type: ignore + service = jid.JID(service_s) + client = self.host.getClient(profile) + return defer.ensureDeferred(self.publishMessage(client, mess_data, service)) + + async def getAPActorIdFromAccount(self, account: str) -> str: + """Retrieve account ID from it's handle using WebFinger + + @param account: AP handle (user@domain.tld) + @return: Actor ID (which is an URL) + """ + if account.count("@") != 1 or "/" in account: + raise ValueError("Invalid account: {account!r}") + host = account.split("@")[1] + try: + finger_data = await treq.json_content(await treq.get( + f"https://{host}/.well-known/webfinger?" 
+ f"resource=acct:{parse.quote_plus(account)}", + )) + except Exception as e: + raise exceptions.DataError(f"Can't get webfinger data: {e}") + for link in finger_data.get("links", []): + if ( + link.get("type") == "application/activity+json" + and link.get("rel") == "self" + ): + href = link.get("href", "").strip() + if not href: + raise ValueError( + f"Invalid webfinger data for {account:r}: missing href" + ) + break + else: + raise ValueError( + f"No ActivityPub link found for {account!r}" + ) + return href + + async def getAPActorDataFromId(self, account: str) -> dict: + """Retrieve ActivityPub Actor data + + @param account: ActivityPub Actor identifier + """ + href = await self.getAPActorIdFromAccount(account) + return await self.apGet(href) + + @async_lru(maxsize=LRU_MAX_SIZE) + async def getAPAccountFromId(self, actor_id: str): + """Retrieve AP account from the ID URL + + @param actor_id: AP ID of the actor (URL to the actor data) + """ + url_parsed = parse.urlparse(actor_id) + actor_data = await self.apGet(actor_id) + username = actor_data.get("preferredUsername") + if not username: + raise exceptions.DataError( + 'No "preferredUsername" field found, can\'t retrieve actor account' + ) + account = f"{username}@{url_parsed.hostname}" + # we try to retrieve the actor ID from the account to check it + found_id = await self.getAPActorIdFromAccount(account) + if found_id != actor_id: + # cf. https://socialhub.activitypub.rocks/t/how-to-retrieve-user-server-tld-handle-from-actors-url/2196 + msg = ( + f"Account ID found on WebFinger {found_id!r} doesn't match our actor ID " + f"({actor_id!r}). This AP instance doesn't seems to use " + '"preferredUsername" as we expect.' + ) + log.warning(msg) + raise exceptions.DataError(msg) + return account + + async def getAPItems( + self, + account: str, + max_items: Optional[int] = None, + chronological_pagination: bool = True, + after_id: Optional[str] = None, + start_index: Optional[int] = None, + ) -> Tuple[List[domish.Element], rsm.RSMResponse]: + """Retrieve AP items and convert them to XMPP items + + @param account: AP account handle to get items from + @param max_items: maximum number of items to retrieve + retrieve all items by default + @param chronological_pagination: get pages in chronological order + AP use reversed chronological order for pagination, "first" page returns more + recent items. If "chronological_pagination" is True, "last" AP page will be + retrieved first. + @param after_id: if set, retrieve items starting from given ID + Due to ActivityStream Collection Paging limitations, this is inefficient and + if ``after_id`` is not already in cache, we have to retrieve every page until + we find it. + In most common cases, ``after_id`` should be in cache though (client usually + use known ID when in-order pagination is used). + @param start_index: start retrieving items from the one with given index + Due to ActivityStream Collection Paging limitations, this is inefficient and + all pages before the requested index will be retrieved to count items. 
+ @return: XMPP Pubsub items and corresponding RSM Response + Items are always returned in chronological order in the result + """ + actor_data = await self.getAPActorDataFromId(account) + outbox = actor_data.get("outbox") + rsm_resp: Dict[str, Union[bool, int]] = {} + if not outbox: + raise exceptions.DataError(f"No outbox found for actor {account}") + outbox_data = await self.apGet(outbox) + try: + count = outbox_data["totalItems"] + except KeyError: + log.warning( + f'"totalItems" not found in outbox of {account}, defaulting to 20' + ) + count = 20 + else: + log.info(f"{account}'s outbox has {count} item(s)") + rsm_resp["count"] = count + + if start_index is not None: + assert chronological_pagination and after_id is None + if start_index >= count: + return [], rsm_resp + elif start_index == 0: + # this is the default behaviour + pass + elif start_index > 5000: + raise error.StanzaError( + "feature-not-implemented", + text="Maximum limit for previous_index has been reached, this limit" + "is set to avoid DoS" + ) + else: + # we'll convert "start_index" to "after_id", thus we need the item just + # before "start_index" + previous_index = start_index - 1 + retrieved_items = 0 + current_page = outbox_data["last"] + while retrieved_items < count: + page_data, items = await self.parseAPPage(current_page) + if not items: + log.warning(f"found an empty AP page at {current_page}") + return [], rsm_resp + page_start_idx = retrieved_items + retrieved_items += len(items) + if previous_index <= retrieved_items: + after_id = items[previous_index - page_start_idx]["id"] + break + try: + current_page = page_data["prev"] + except KeyError: + log.warning( + f"missing previous page link at {current_page}: {page_data!r}" + ) + raise error.StanzaError( + "service-unavailable", + "Error while retrieving previous page from AP service at " + f"{current_page}" + ) + + init_page = "last" if chronological_pagination else "first" + page = outbox_data.get(init_page) + if not page: + raise exceptions.DataError( + f"Initial page {init_page!r} not found for outbox {outbox}" + ) + items = [] + page_items = [] + retrieved_items = 0 + found_after_id = False + + while retrieved_items < count: + __, page_items = await self.parseAPPage(page) + retrieved_items += len(page_items) + if after_id is not None and not found_after_id: + # if we have an after_id, we ignore all items until the requested one is + # found + try: + limit_idx = [i["id"] for i in page_items].index(after_id) + except ValueError: + # if "after_id" is not found, we don't add any item from this page + log.debug(f"{after_id!r} not found at {page}, skipping") + else: + found_after_id = True + if chronological_pagination: + start_index = retrieved_items - len(page_items) + limit_idx + 1 + page_items = page_items[limit_idx+1:] + else: + start_index = count - (retrieved_items - len(page_items) + + limit_idx + 1) + page_items = page_items[:limit_idx] + items.extend(page_items) + else: + items.extend(page_items) + if max_items is not None and len(items) >= max_items: + if chronological_pagination: + items = items[:max_items] + else: + items = items[-max_items:] + break + page = outbox_data.get("prev" if chronological_pagination else "next") + if not page: + break + + if after_id is not None and not found_after_id: + raise error.StanzaError("item-not-found") + + if after_id is None: + rsm_resp["index"] = 0 if chronological_pagination else count - len(items) + + if start_index is not None: + rsm_resp["index"] = start_index + elif after_id is not None: + 
log.warning("Can't determine index of first element") + elif chronological_pagination: + rsm_resp["index"] = 0 + else: + rsm_resp["index"] = count - len(items) + if items: + rsm_resp.update({ + "first": items[0]["id"], + "last": items[-1]["id"] + }) + + return items, rsm.RSMResponse(**rsm_resp) + + async def parseAPPage(self, url: str) -> Tuple[dict, List[domish.Element]]: + """Convert AP objects from an AP page to XMPP items + + @param url: url linking and AP page + @return: page data, pubsub items + """ + page_data = await self.apGet(url) + ap_items = page_data.get("orderedItems") + if not ap_items: + log.warning('No "orderedItems" collection found') + return page_data, [] + items = [] + # AP Collections are in antichronological order, but we expect chronological in + # Pubsub, thus we reverse it + for ap_item in reversed(ap_items): + try: + ap_object, mb_data = await self.apItem2MBdata(ap_item) + except (exceptions.DataError, NotImplementedError, error.StanzaError): + continue + + item_elt = await self._m.data2entry( + self.client, mb_data, ap_object["id"], None, self._m.namespace + ) + item_elt["publisher"] = mb_data["author_jid"].full() + items.append(item_elt) + + return page_data, items + + async def apItem2MBdata(self, ap_item: dict) -> Tuple[dict, dict]: + """Convert AP item to microblog data + + @return: AP Item's Object and microblog data + @raise exceptions.DataError: something is invalid in the AP item + @raise NotImplemented: some AP data is not handled yet + @raise error.StanzaError: error while contacting the AP server + """ + ap_object = ap_item.get("object") + if not ap_object: + log.warning(f'No "object" found in AP item {ap_item!r}') + raise exceptions.DataError + if isinstance(ap_object, str): + ap_object = await self.apGet(ap_object) + obj_id = ap_object.get("id") + if not obj_id: + log.warning(f'No "id" found in AP object: {ap_object!r}') + raise exceptions.DataError + if ap_object.get("inReplyTo") is not None: + raise NotImplementedError + mb_data = {} + for ap_key, mb_key in AP_MB_MAP.items(): + data = ap_object.get(ap_key) + if data is None: + continue + mb_data[mb_key] = data + + # content + try: + language, content_xhtml = ap_object["contentMap"].popitem() + except (KeyError, AttributeError): + try: + mb_data["content_xhtml"] = mb_data["content"] + except KeyError: + log.warning(f"no content found:\n{ap_object!r}") + raise exceptions.DataError + else: + mb_data["language"] = language + mb_data["content_xhtml"] = content_xhtml + + # author + actor = ap_item.get("actor") + if not actor: + log.warning(f"no actor associated to object id {obj_id!r}") + raise exceptions.DataError + elif isinstance(actor, list): + # we only keep first item of list as author + # TODO: handle multiple actors + if len(actor) > 1: + log.warning("multiple actors are not managed") + actor = actor[0] + + if isinstance(actor, dict): + actor = actor.get("id") + if not actor: + log.warning(f"no actor id found: {actor!r}") + raise exceptions.DataError + + if isinstance(actor, str): + account = await self.getAPAccountFromId(actor) + mb_data["author"] = account.split("@", 1)[0] + author_jid = mb_data["author_jid"] = jid.JID( + None, + ( + self.host.plugins["XEP-0106"].escape(account), + self.client.jid.host, + None + ) + ) + else: + log.warning(f"unknown actor type found: {actor!r}") + raise exceptions.DataError + + # published/updated + for field in ("published", "updated"): + value = ap_object.get(field) + if not value and field == "updated": + value = ap_object.get("published") + if value: + 
try: + mb_data[field] = calendar.timegm( + dateutil.parser.parse(str(value)).utctimetuple() + ) + except dateutil.parser.ParserError as e: + log.warning(f"Can't parse {field!r} field: {e}") + return ap_object, mb_data + + async def mbdata2APitem(self, client: SatXMPPEntity, mb_data: dict) -> dict: + """Convert Libervia Microblog Data to ActivityPub item""" + if not mb_data.get("id"): + mb_data["id"] = shortuuid.uuid() + if not mb_data.get("author_jid"): + mb_data["author_jid"] = client.jid + ap_account = await self.getAPAccountFromJidAndNode( + jid.JID(mb_data["author_jid"]), + None + ) + url_actor = self.buildAPURL(TYPE_ACTOR, ap_account) + url_item = self.buildAPURL(TYPE_ITEM, ap_account, mb_data["id"]) + return { + "@context": "https://www.w3.org/ns/activitystreams", + "id": url_item, + "type": "Create", + "actor": url_actor, + + "object": { + "id": url_item, + "type": "Note", + "published": utils.xmpp_date(mb_data["published"]), + "attributedTo": url_actor, + "content": mb_data.get("content_xhtml") or mb_data["content"], + "to": "https://www.w3.org/ns/activitystreams#Public" + } + } + + async def publishMessage( + self, + client: SatXMPPEntity, + mess_data: dict, + service: jid.JID + ) -> None: + """Send an AP message + + .. note:: + + This is a temporary method used for development only + + @param mess_data: message data. Following keys must be set: + + ``node`` + identifier of message which is being replied (this will + correspond to pubsub node in the future) + + ``content_xhtml`` or ``content`` + message body (respectively in XHTML or plain text) + + @param service: JID corresponding to the AP actor. + """ + if not service.user: + raise ValueError("service must have a local part") + account = self.host.plugins["XEP-0106"].unescape(service.user) + ap_actor_data = await self.getAPActorDataFromId(account) + + try: + inbox_url = ap_actor_data["endpoints"]["sharedInbox"] + except KeyError: + raise exceptions.DataError("Can't get ActivityPub actor inbox") + + item_data = await self.mbdata2APitem(client, mess_data) + url_actor = item_data["object"]["attributedTo"] + resp = await self.signAndPost(inbox_url, url_actor, item_data) + if resp.code != 202: + raise exceptions.NetworkError(f"unexpected return code: {resp.code}")
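A few pieces of the code above are worth restating in isolation. The most unusual one is the period encoding used by periodEncode and getJIDAndNode: pubsub node names and non-local JIDs are percent-encoded, then "%" is replaced by "." so the result is a valid Mastodon-compatible username (cf. https://github.com/mastodon/mastodon/issues/17222). Below is a minimal standalone sketch of that round trip; period_encode and period_decode are illustrative names mirroring the plugin's methods, not part of its API.

from urllib import parse
import re

RE_PERIOD_ENC = re.compile(r"\.(?P<hex>[0-9a-fA-F]{2})")

def period_encode(text: str) -> str:
    # standalone equivalent of APGateway.periodEncode: percent-encode everything,
    # protect the gateway's "---" and "___" markers, then swap "%" for "."
    return (
        parse.quote(text, safe="")
        .replace("---", "%2d%2d%2d")
        .replace("___", "%5f%5f%5f")
        .replace(".", "%2e")
        .replace("~", "%7e")
        .replace("%", ".")
    )

def period_decode(text: str) -> str:
    # reverse step used in getJIDAndNode: ".XX" becomes "%XX" again, then percent-decode
    return parse.unquote(RE_PERIOD_ENC.sub(r"%\g<hex>", text), errors="strict")

node = "urn:xmpp:microblog:0"
encoded = period_encode(node)
assert period_decode(encoded) == node
print(encoded)  # urn.3Axmpp.3Amicroblog.3A0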
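Federation with Mastodon also depends on the HTTP Signature built in signAndPost: the JSON body is hashed into a Digest header, and the pseudo-header "(request-target)" plus host, date and digest are signed with the gateway's RSA key. A condensed sketch of that header construction, assuming private_key is the RSA key loaded in init; build_signature_headers is an illustrative helper, not part of the plugin.

import base64
import hashlib
import json
from urllib import parse

from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
from twisted.web import http

def build_signature_headers(private_key, url: str, url_actor: str, doc: dict) -> dict:
    p_url = parse.urlparse(url)
    date = http.datetimeToString().decode()
    body = json.dumps(doc).encode()
    digest = "sha-256=" + base64.b64encode(hashlib.sha256(body).digest()).decode()
    # the signed string covers "(request-target)", host, date and digest, in that order
    to_sign = (
        f"(request-target): post {p_url.path}\nhost: {p_url.hostname}\n"
        f"date: {date}\ndigest: {digest}"
    )
    # PKCS#1 v1.5 padding is used on purpose: the diff notes it is needed for Mastodon
    signature = base64.b64encode(
        private_key.sign(to_sign.encode(), padding.PKCS1v15(), hashes.SHA256())
    ).decode()
    return {
        "Host": p_url.hostname,
        "Date": date,
        "Digest": digest,
        "Signature": (
            f'keyId="{url_actor}",headers="(request-target) host date digest",'
            f'signature="{signature}"'
        ),
    }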
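Actor resolution goes through WebFinger, as in getAPActorIdFromAccount: the gateway queries https://<domain>/.well-known/webfinger for the handle and keeps the rel="self" link carrying the ActivityPub media type. A trimmed sketch using the same treq calls as the diff; it has to run under the Twisted reactor (e.g. wrapped with defer.ensureDeferred), and get_actor_id is an illustrative name.

from urllib import parse

import treq

async def get_actor_id(account: str) -> str:
    """Resolve an AP handle such as "user@domain.tld" to its actor ID (a URL)."""
    if account.count("@") != 1 or "/" in account:
        raise ValueError(f"invalid account: {account!r}")
    host = account.split("@")[1]
    resp = await treq.get(
        f"https://{host}/.well-known/webfinger"
        f"?resource=acct:{parse.quote_plus(account)}"
    )
    finger_data = await treq.json_content(resp)
    for link in finger_data.get("links", []):
        # the actor document is advertised as rel="self" with the AP JSON media type
        if link.get("rel") == "self" and link.get("type") == "application/activity+json":
            href = link.get("href", "").strip()
            if href:
                return href
    raise ValueError(f"no ActivityPub link found for {account!r}")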