diff libervia/backend/plugins/plugin_xep_0249.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_xep_0249.py@c23cad65ae99
children a7d4007a8fa5
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/plugins/plugin_xep_0249.py	Fri Jun 02 11:49:51 2023 +0200
@@ -0,0 +1,237 @@
+#!/usr/bin/env python3
+
+
+# SAT plugin for managing xep-0249
+# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from twisted.internet import defer
+from twisted.words.protocols.jabber import jid
+from twisted.words.xish import domish
+from wokkel import disco, iwokkel
+from zope.interface import implementer
+
+from libervia.backend.core import exceptions
+from libervia.backend.core.constants import Const as C
+from libervia.backend.core.i18n import D_, _
+from libervia.backend.core.log import getLogger
+from libervia.backend.tools import xml_tools
+
+log = getLogger(__name__)
+
+
+
+try:
+    from twisted.words.protocols.xmlstream import XMPPHandler
+except ImportError:
+    from wokkel.subprotocols import XMPPHandler
+
+MESSAGE = "/message"
+NS_X_CONFERENCE = "jabber:x:conference"
+AUTOJOIN_KEY = "Misc"
+AUTOJOIN_NAME = "Auto-join MUC on invitation"
+AUTOJOIN_VALUES = ["ask", "always", "never"]
+
+PLUGIN_INFO = {
+    C.PI_NAME: "XEP 0249 Plugin",
+    C.PI_IMPORT_NAME: "XEP-0249",
+    C.PI_TYPE: "XEP",
+    C.PI_PROTOCOLS: ["XEP-0249"],
+    C.PI_DEPENDENCIES: ["XEP-0045"],
+    C.PI_RECOMMENDATIONS: [C.TEXT_CMDS],
+    C.PI_MAIN: "XEP_0249",
+    C.PI_HANDLER: "yes",
+    C.PI_DESCRIPTION: _("""Implementation of Direct MUC Invitations"""),
+}
+
+
+class XEP_0249(object):
+
+    params = """
+    <params>
+    <individual>
+    <category name="%(category_name)s" label="%(category_label)s">
+        <param name="%(param_name)s" label="%(param_label)s" type="list" security="0">
+            %(param_options)s
+        </param>
+     </category>
+    </individual>
+    </params>
+    """ % {
+        "category_name": AUTOJOIN_KEY,
+        "category_label": _("Misc"),
+        "param_name": AUTOJOIN_NAME,
+        "param_label": _("Auto-join MUC on invitation"),
+        "param_options": "\n".join(
+            [
+                '<option value="%s" %s/>'
+                % (value, 'selected="true"' if value == AUTOJOIN_VALUES[0] else "")
+                for value in AUTOJOIN_VALUES
+            ]
+        ),
+    }
+
+    def __init__(self, host):
+        log.info(_("Plugin XEP_0249 initialization"))
+        self.host = host
+        host.memory.update_params(self.params)
+        host.bridge.add_method(
+            "muc_invite", ".plugin", in_sign="ssa{ss}s", out_sign="", method=self._invite
+        )
+        try:
+            self.host.plugins[C.TEXT_CMDS].register_text_commands(self)
+        except KeyError:
+            log.info(_("Text commands not available"))
+        host.register_namespace('x-conference', NS_X_CONFERENCE)
+        host.trigger.add("message_received", self._message_received_trigger)
+
+    def get_handler(self, client):
+        return XEP_0249_handler()
+
+    def _invite(self, guest_jid_s, room_jid_s, options, profile_key):
+        """Invite an user to a room
+
+        @param guest_jid_s: jid of the user to invite
+        @param service: jid of the MUC service
+        @param roomId: name of the room
+        @param profile_key: %(doc_profile_key)s
+        """
+        # TODO: check parameters validity
+        client = self.host.get_client(profile_key)
+        self.invite(client, jid.JID(guest_jid_s), jid.JID(room_jid_s, options))
+
+    def invite(self, client, guest, room, options={}):
+        """Invite a user to a room
+
+        @param guest(jid.JID): jid of the user to invite
+        @param room(jid.JID): jid of the room where the user is invited
+        @param options(dict): attribute with extra info (reason, password) as in #XEP-0249
+        """
+        message = domish.Element((None, "message"))
+        message["to"] = guest.full()
+        x_elt = message.addElement((NS_X_CONFERENCE, "x"))
+        x_elt["jid"] = room.userhost()
+        for key, value in options.items():
+            if key not in ("password", "reason", "thread"):
+                log.warning("Ignoring invalid invite option: {}".format(key))
+                continue
+            x_elt[key] = value
+        #  there is not body in this message, so we can use directly send()
+        client.send(message)
+
+    def _accept(self, room_jid, profile_key=C.PROF_KEY_NONE):
+        """Accept the invitation to join a MUC.
+
+        @param room (jid.JID): JID of the room
+        """
+        client = self.host.get_client(profile_key)
+        log.info(
+            _("Invitation accepted for room %(room)s [%(profile)s]")
+            % {"room": room_jid.userhost(), "profile": client.profile}
+        )
+        d = defer.ensureDeferred(
+            self.host.plugins["XEP-0045"].join(client, room_jid, client.jid.user, {})
+        )
+        return d
+
+    def _message_received_trigger(self, client, message_elt, post_treat):
+        """Check if a direct invitation is in the message, and handle it"""
+        x_elt = next(message_elt.elements(NS_X_CONFERENCE, 'x'), None)
+        if x_elt is None:
+            return True
+
+        try:
+            room_jid_s = x_elt["jid"]
+        except KeyError:
+            log.warning(_("invalid invitation received: {xml}").format(
+                xml=message_elt.toXml()))
+            return False
+        log.info(
+            _("Invitation received for room %(room)s [%(profile)s]")
+            % {"room": room_jid_s, "profile": client.profile}
+        )
+        from_jid_s = message_elt["from"]
+        room_jid = jid.JID(room_jid_s)
+        try:
+            self.host.plugins["XEP-0045"].check_room_joined(client, room_jid)
+        except exceptions.NotFound:
+            pass
+        else:
+            log.info(
+                _("Invitation silently discarded because user is already in the room.")
+            )
+            return
+
+        autojoin = self.host.memory.param_get_a(
+            AUTOJOIN_NAME, AUTOJOIN_KEY, profile_key=client.profile
+        )
+
+        if autojoin == "always":
+            self._accept(room_jid, client.profile)
+        elif autojoin == "never":
+            msg = D_(
+                "An invitation from %(user)s to join the room %(room)s has been "
+                "declined according to your personal settings."
+            ) % {"user": from_jid_s, "room": room_jid_s}
+            title = D_("MUC invitation")
+            xml_tools.quick_note(self.host, client, msg, title, C.XMLUI_DATA_LVL_INFO)
+        else:  # leave the default value here
+            confirm_msg = D_(
+                "You have been invited by %(user)s to join the room %(room)s. "
+                "Do you accept?"
+            ) % {"user": from_jid_s, "room": room_jid_s}
+            confirm_title = D_("MUC invitation")
+            d = xml_tools.defer_confirm(
+                self.host, confirm_msg, confirm_title, profile=client.profile
+            )
+
+            def accept_cb(accepted):
+                if accepted:
+                    self._accept(room_jid, client.profile)
+
+            d.addCallback(accept_cb)
+        return False
+
+    def cmd_invite(self, client, mess_data):
+        """invite someone in the room
+
+        @command (group): JID
+            - JID: the JID of the person to invite
+        """
+        contact_jid_s = mess_data["unparsed"].strip()
+        my_host = client.jid.host
+        try:
+            contact_jid = jid.JID(contact_jid_s)
+        except (RuntimeError, jid.InvalidFormat, AttributeError):
+            feedback = _(
+                "You must provide a valid JID to invite, like in '/invite "
+                "contact@{host}'"
+            ).format(host=my_host)
+            self.host.plugins[C.TEXT_CMDS].feed_back(client, feedback, mess_data)
+            return False
+        if not contact_jid.user:
+            contact_jid.user, contact_jid.host = contact_jid.host, my_host
+        self.invite(client, contact_jid, mess_data["to"])
+        return False
+
+
+@implementer(iwokkel.IDisco)
+class XEP_0249_handler(XMPPHandler):
+
+    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
+        return [disco.DiscoFeature(NS_X_CONFERENCE)]
+
+    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
+        return []