view libervia/backend/plugins/plugin_comp_ap_gateway/ad_hoc.py @ 4141:ba8ddfdd334f

cli (loops): run GLib loop in same thread as asyncio: use the new `install_glib_asyncio_iteration` to run GLib in the same thread as asyncio. rel 426
author Goffi <goffi@goffi.org>
date Wed, 01 Nov 2023 14:05:53 +0100
parents 4b842c1fb686
children 0d7bb4df2343
line wrap: on
line source

#!/usr/bin/env python3

# Libervia ActivityPub Gateway
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from twisted.words.protocols.jabber import jid
from twisted.words.xish import domish
from wokkel import data_form

from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger


log = getLogger(__name__)
NS_XMPP_JID_NODE_2_AP = "https://libervia.org/ap_gateway/xmpp_jid_node_2_ap_actor"

class APAdHocService:
    """Ad-Hoc commands for AP Gateway"""

    def __init__(self, apg):
        self.host = apg.host
        self.apg = apg
        self._c = self.host.plugins["XEP-0050"]

    def init(self, client: SatXMPPEntity) -> None:
        self._c.add_ad_hoc_command(
            client,
            self.xmpp_jid_node_2_ap_actor,
            "Convert XMPP JID/Node to AP actor",
            node=NS_XMPP_JID_NODE_2_AP,
            allowed_magics=C.ENTITY_ALL,
        )

    async def xmpp_jid_node_2_ap_actor(
        self,
        client: SatXMPPEntity,
        command_elt: domish.Element,
        session_data: dict,
        action: str,
        node: str
    ):
        try:
            x_elt = next(command_elt.elements(data_form.NS_X_DATA, "x"))
            command_form = data_form.Form.fromElement(x_elt)
        except StopIteration:
            command_form = None
        if command_form is None or len(command_form.fields) == 0:
            # root request
            status = self._c.STATUS.EXECUTING
            form = data_form.Form(
                "form", title="XMPP JID/node to AP actor conversion",
                formNamespace=NS_XMPP_JID_NODE_2_AP
            )

            field = data_form.Field(
                "text-single", "jid", required=True
            )
            form.addField(field)

            field = data_form.Field(
                "text-single", "node", required=False
            )
            form.addField(field)

            payload = form.toElement()
            return payload, status, None, None
        else:
            xmpp_jid = jid.JID(command_form["jid"])
            xmpp_node = command_form.get("node")
            actor = await self.apg.get_ap_account_from_jid_and_node(xmpp_jid, xmpp_node)
            note = (self._c.NOTE.INFO, actor)
            status = self._c.STATUS.COMPLETED
            payload = None
            return (payload, status, None, note)