diff libervia/backend/test/helpers_plugins.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/test/helpers_plugins.py@524856bd7b19
children 0d7bb4df2343
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/test/helpers_plugins.py	Fri Jun 02 11:49:51 2023 +0200
@@ -0,0 +1,301 @@
+#!/usr/bin/env python3
+
+
+# SAT: a jabber client
+# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
+# Copyright (C) 2013-2016 Adrien Cossa (souliane@mailoo.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+""" Helpers class for plugin dependencies """
+
+from twisted.internet import defer
+
+from wokkel.muc import Room, User
+from wokkel.generic import parseXml
+from wokkel.disco import DiscoItem, DiscoItems
+
+# temporary until the changes are integrated to Wokkel
+from sat_tmp.wokkel.rsm import RSMResponse
+
+from .constants import Const as C
+from libervia.backend.plugins import plugin_xep_0045
+from collections import OrderedDict
+
+
+class FakeMUCClient(object):
+    def __init__(self, plugin_parent):
+        self.plugin_parent = plugin_parent
+        self.host = plugin_parent.host
+        self.joined_rooms = {}
+
+    def join(self, room_jid, nick, options=None, profile_key=C.PROF_KEY_NONE):
+        """
+        @param room_jid: the room JID
+        @param nick: nick to be used in the room
+        @param options: joining options
+        @param profile_key: the profile key of the user joining the room
+        @return: the deferred joined wokkel.muc.Room instance
+        """
+        profile = self.host.memory.get_profile_name(profile_key)
+        roster = {}
+
+        # ask the other profiles to fill our roster
+        for i in range(0, len(C.PROFILE)):
+            other_profile = C.PROFILE[i]
+            if other_profile == profile:
+                continue
+            try:
+                other_room = self.plugin_parent.clients[other_profile].joined_rooms[
+                    room_jid
+                ]
+                roster.setdefault(
+                    other_room.nick, User(other_room.nick, C.PROFILE_DICT[other_profile])
+                )
+                for other_nick in other_room.roster:
+                    roster.setdefault(other_nick, other_room.roster[other_nick])
+            except (AttributeError, KeyError):
+                pass
+
+        # rename our nick if it already exists
+        while nick in list(roster.keys()):
+            if C.PROFILE_DICT[profile].userhost() == roster[nick].entity.userhost():
+                break  # same user with different resource --> same nickname
+            nick = nick + "_"
+
+        room = Room(room_jid, nick)
+        room.roster = roster
+        self.joined_rooms[room_jid] = room
+
+        # fill the other rosters with the new entry
+        for i in range(0, len(C.PROFILE)):
+            other_profile = C.PROFILE[i]
+            if other_profile == profile:
+                continue
+            try:
+                other_room = self.plugin_parent.clients[other_profile].joined_rooms[
+                    room_jid
+                ]
+                other_room.roster.setdefault(
+                    room.nick, User(room.nick, C.PROFILE_DICT[profile])
+                )
+            except (AttributeError, KeyError):
+                pass
+
+        return defer.succeed(room)
+
+    def leave(self, roomJID, profile_key=C.PROF_KEY_NONE):
+        """
+        @param roomJID: the room JID
+        @param profile_key: the profile key of the user joining the room
+        @return: a dummy deferred
+        """
+        profile = self.host.memory.get_profile_name(profile_key)
+        room = self.joined_rooms[roomJID]
+        # remove ourself from the other rosters
+        for i in range(0, len(C.PROFILE)):
+            other_profile = C.PROFILE[i]
+            if other_profile == profile:
+                continue
+            try:
+                other_room = self.plugin_parent.clients[other_profile].joined_rooms[
+                    roomJID
+                ]
+                del other_room.roster[room.nick]
+            except (AttributeError, KeyError):
+                pass
+        del self.joined_rooms[roomJID]
+        return defer.Deferred()
+
+
+class FakeXEP_0045(plugin_xep_0045.XEP_0045):
+    def __init__(self, host):
+        self.host = host
+        self.clients = {}
+        for profile in C.PROFILE:
+            self.clients[profile] = FakeMUCClient(self)
+
+    def join(self, room_jid, nick, options={}, profile_key="@DEFAULT@"):
+        """
+        @param roomJID: the room JID
+        @param nick: nick to be used in the room
+        @param options: ignore
+        @param profile_key: the profile of the user joining the room
+        @return: the deferred joined wokkel.muc.Room instance or None
+        """
+        profile = self.host.memory.get_profile_name(profile_key)
+        if room_jid in self.clients[profile].joined_rooms:
+            return defer.succeed(None)
+        room = self.clients[profile].join(room_jid, nick, profile_key=profile)
+        return room
+
+    def join_room(self, muc_index, user_index):
+        """Called by tests
+        @return: the nickname of the user who joined room"""
+        muc_jid = C.MUC[muc_index]
+        nick = C.JID[user_index].user
+        profile = C.PROFILE[user_index]
+        self.join(muc_jid, nick, profile_key=profile)
+        return self.get_nick(muc_index, user_index)
+
+    def leave(self, room_jid, profile_key="@DEFAULT@"):
+        """
+        @param roomJID: the room JID
+        @param profile_key: the profile of the user leaving the room
+        @return: a dummy deferred
+        """
+        profile = self.host.memory.get_profile_name(profile_key)
+        if room_jid not in self.clients[profile].joined_rooms:
+            raise plugin_xep_0045.UnknownRoom("This room has not been joined")
+        return self.clients[profile].leave(room_jid, profile)
+
+    def leave_room(self, muc_index, user_index):
+        """Called by tests
+        @return: the nickname of the user who left the room"""
+        muc_jid = C.MUC[muc_index]
+        nick = self.get_nick(muc_index, user_index)
+        profile = C.PROFILE[user_index]
+        self.leave(muc_jid, profile_key=profile)
+        return nick
+
+    def get_room(self, muc_index, user_index):
+        """Called by tests
+        @return: a wokkel.muc.Room instance"""
+        profile = C.PROFILE[user_index]
+        muc_jid = C.MUC[muc_index]
+        try:
+            return self.clients[profile].joined_rooms[muc_jid]
+        except (AttributeError, KeyError):
+            return None
+
+    def get_nick(self, muc_index, user_index):
+        try:
+            return self.get_room_nick(C.MUC[muc_index], C.PROFILE[user_index])
+        except (KeyError, AttributeError):
+            return ""
+
+    def get_nick_of_user(self, muc_index, user_index, profile_index, secure=True):
+        try:
+            room = self.clients[C.PROFILE[profile_index]].joined_rooms[C.MUC[muc_index]]
+            return self.getRoomNickOfUser(room, C.JID[user_index])
+        except (KeyError, AttributeError):
+            return None
+
+
+class FakeXEP_0249(object):
+    def __init__(self, host):
+        self.host = host
+
+    def invite(self, target, room, options={}, profile_key="@DEFAULT@"):
+        """
+        Invite a user to a room. To accept the invitation from a test,
+        just call FakeXEP_0045.join_room (no need to have a dedicated method).
+        @param target: jid of the user to invite
+        @param room: jid of the room where the user is invited
+        @options: attribute with extra info (reason, password) as in #XEP-0249
+        @profile_key: %(doc_profile_key)s
+        """
+        pass
+
+
+class FakeSatPubSubClient(object):
+    def __init__(self, host, parent_plugin):
+        self.host = host
+        self.parent_plugin = parent_plugin
+        self.__items = OrderedDict()
+        self.__rsm_responses = {}
+
+    def createNode(self, service, nodeIdentifier=None, options=None, sender=None):
+        return defer.succeed(None)
+
+    def deleteNode(self, service, nodeIdentifier, sender=None):
+        try:
+            del self.__items[nodeIdentifier]
+        except KeyError:
+            pass
+        return defer.succeed(None)
+
+    def subscribe(self, service, nodeIdentifier, subscriber, options=None, sender=None):
+        return defer.succeed(None)
+
+    def unsubscribe(
+        self,
+        service,
+        nodeIdentifier,
+        subscriber,
+        subscriptionIdentifier=None,
+        sender=None,
+    ):
+        return defer.succeed(None)
+
+    def publish(self, service, nodeIdentifier, items=None, sender=None):
+        node = self.__items.setdefault(nodeIdentifier, [])
+
+        def replace(item_obj):
+            index = 0
+            for current in node:
+                if current["id"] == item_obj["id"]:
+                    node[index] = item_obj
+                    return True
+                index += 1
+            return False
+
+        for item in items:
+            item_obj = parseXml(item) if isinstance(item, str) else item
+            if not replace(item_obj):
+                node.append(item_obj)
+        return defer.succeed(None)
+
+    def items(
+        self,
+        service,
+        nodeIdentifier,
+        maxItems=None,
+        itemIdentifiers=None,
+        subscriptionIdentifier=None,
+        sender=None,
+        ext_data=None,
+    ):
+        try:
+            items = self.__items[nodeIdentifier]
+        except KeyError:
+            items = []
+        if ext_data:
+            assert "id" in ext_data
+            if "rsm" in ext_data:
+                args = (0, items[0]["id"], items[-1]["id"]) if items else ()
+                self.__rsm_responses[ext_data["id"]] = RSMResponse(len(items), *args)
+        return defer.succeed(items)
+
+    def retract_items(self, service, nodeIdentifier, itemIdentifiers, sender=None):
+        node = self.__items[nodeIdentifier]
+        for item in [item for item in node if item["id"] in itemIdentifiers]:
+            node.remove(item)
+        return defer.succeed(None)
+
+    def get_rsm_response(self, id):
+        if id not in self.__rsm_responses:
+            return {}
+        result = self.__rsm_responses[id].toDict()
+        del self.__rsm_responses[id]
+        return result
+
+    def subscriptions(self, service, nodeIdentifier, sender=None):
+        return defer.succeed([])
+
+    def service_get_disco_items(self, service, nodeIdentifier, profile_key=C.PROF_KEY_NONE):
+        items = DiscoItems()
+        for item in list(self.__items.keys()):
+            items.append(DiscoItem(service, item))
+        return defer.succeed(items)