Mercurial > libervia-backend
diff libervia/backend/test/helpers_plugins.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/test/helpers_plugins.py@524856bd7b19 |
children | 0d7bb4df2343 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/backend/test/helpers_plugins.py Fri Jun 02 11:49:51 2023 +0200 @@ -0,0 +1,301 @@ +#!/usr/bin/env python3 + + +# SAT: a jabber client +# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) +# Copyright (C) 2013-2016 Adrien Cossa (souliane@mailoo.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +""" Helpers class for plugin dependencies """ + +from twisted.internet import defer + +from wokkel.muc import Room, User +from wokkel.generic import parseXml +from wokkel.disco import DiscoItem, DiscoItems + +# temporary until the changes are integrated to Wokkel +from sat_tmp.wokkel.rsm import RSMResponse + +from .constants import Const as C +from libervia.backend.plugins import plugin_xep_0045 +from collections import OrderedDict + + +class FakeMUCClient(object): + def __init__(self, plugin_parent): + self.plugin_parent = plugin_parent + self.host = plugin_parent.host + self.joined_rooms = {} + + def join(self, room_jid, nick, options=None, profile_key=C.PROF_KEY_NONE): + """ + @param room_jid: the room JID + @param nick: nick to be used in the room + @param options: joining options + @param profile_key: the profile key of the user joining the room + @return: the deferred joined wokkel.muc.Room instance + """ + profile = self.host.memory.get_profile_name(profile_key) + roster = {} + + # ask the other profiles to fill our roster + for i in range(0, len(C.PROFILE)): + other_profile = C.PROFILE[i] + if other_profile == profile: + continue + try: + other_room = self.plugin_parent.clients[other_profile].joined_rooms[ + room_jid + ] + roster.setdefault( + other_room.nick, User(other_room.nick, C.PROFILE_DICT[other_profile]) + ) + for other_nick in other_room.roster: + roster.setdefault(other_nick, other_room.roster[other_nick]) + except (AttributeError, KeyError): + pass + + # rename our nick if it already exists + while nick in list(roster.keys()): + if C.PROFILE_DICT[profile].userhost() == roster[nick].entity.userhost(): + break # same user with different resource --> same nickname + nick = nick + "_" + + room = Room(room_jid, nick) + room.roster = roster + self.joined_rooms[room_jid] = room + + # fill the other rosters with the new entry + for i in range(0, len(C.PROFILE)): + other_profile = C.PROFILE[i] + if other_profile == profile: + continue + try: + other_room = self.plugin_parent.clients[other_profile].joined_rooms[ + room_jid + ] + other_room.roster.setdefault( + room.nick, User(room.nick, C.PROFILE_DICT[profile]) + ) + except (AttributeError, KeyError): + pass + + return defer.succeed(room) + + def leave(self, roomJID, profile_key=C.PROF_KEY_NONE): + """ + @param roomJID: the room JID + @param profile_key: the profile key of the user joining the room + @return: a dummy deferred + """ + profile = self.host.memory.get_profile_name(profile_key) + room = self.joined_rooms[roomJID] + # remove ourself from the other rosters + for i in range(0, len(C.PROFILE)): + other_profile = C.PROFILE[i] + if other_profile == profile: + continue + try: + other_room = self.plugin_parent.clients[other_profile].joined_rooms[ + roomJID + ] + del other_room.roster[room.nick] + except (AttributeError, KeyError): + pass + del self.joined_rooms[roomJID] + return defer.Deferred() + + +class FakeXEP_0045(plugin_xep_0045.XEP_0045): + def __init__(self, host): + self.host = host + self.clients = {} + for profile in C.PROFILE: + self.clients[profile] = FakeMUCClient(self) + + def join(self, room_jid, nick, options={}, profile_key="@DEFAULT@"): + """ + @param roomJID: the room JID + @param nick: nick to be used in the room + @param options: ignore + @param profile_key: the profile of the user joining the room + @return: the deferred joined wokkel.muc.Room instance or None + """ + profile = self.host.memory.get_profile_name(profile_key) + if room_jid in self.clients[profile].joined_rooms: + return defer.succeed(None) + room = self.clients[profile].join(room_jid, nick, profile_key=profile) + return room + + def join_room(self, muc_index, user_index): + """Called by tests + @return: the nickname of the user who joined room""" + muc_jid = C.MUC[muc_index] + nick = C.JID[user_index].user + profile = C.PROFILE[user_index] + self.join(muc_jid, nick, profile_key=profile) + return self.get_nick(muc_index, user_index) + + def leave(self, room_jid, profile_key="@DEFAULT@"): + """ + @param roomJID: the room JID + @param profile_key: the profile of the user leaving the room + @return: a dummy deferred + """ + profile = self.host.memory.get_profile_name(profile_key) + if room_jid not in self.clients[profile].joined_rooms: + raise plugin_xep_0045.UnknownRoom("This room has not been joined") + return self.clients[profile].leave(room_jid, profile) + + def leave_room(self, muc_index, user_index): + """Called by tests + @return: the nickname of the user who left the room""" + muc_jid = C.MUC[muc_index] + nick = self.get_nick(muc_index, user_index) + profile = C.PROFILE[user_index] + self.leave(muc_jid, profile_key=profile) + return nick + + def get_room(self, muc_index, user_index): + """Called by tests + @return: a wokkel.muc.Room instance""" + profile = C.PROFILE[user_index] + muc_jid = C.MUC[muc_index] + try: + return self.clients[profile].joined_rooms[muc_jid] + except (AttributeError, KeyError): + return None + + def get_nick(self, muc_index, user_index): + try: + return self.get_room_nick(C.MUC[muc_index], C.PROFILE[user_index]) + except (KeyError, AttributeError): + return "" + + def get_nick_of_user(self, muc_index, user_index, profile_index, secure=True): + try: + room = self.clients[C.PROFILE[profile_index]].joined_rooms[C.MUC[muc_index]] + return self.getRoomNickOfUser(room, C.JID[user_index]) + except (KeyError, AttributeError): + return None + + +class FakeXEP_0249(object): + def __init__(self, host): + self.host = host + + def invite(self, target, room, options={}, profile_key="@DEFAULT@"): + """ + Invite a user to a room. To accept the invitation from a test, + just call FakeXEP_0045.join_room (no need to have a dedicated method). + @param target: jid of the user to invite + @param room: jid of the room where the user is invited + @options: attribute with extra info (reason, password) as in #XEP-0249 + @profile_key: %(doc_profile_key)s + """ + pass + + +class FakeSatPubSubClient(object): + def __init__(self, host, parent_plugin): + self.host = host + self.parent_plugin = parent_plugin + self.__items = OrderedDict() + self.__rsm_responses = {} + + def createNode(self, service, nodeIdentifier=None, options=None, sender=None): + return defer.succeed(None) + + def deleteNode(self, service, nodeIdentifier, sender=None): + try: + del self.__items[nodeIdentifier] + except KeyError: + pass + return defer.succeed(None) + + def subscribe(self, service, nodeIdentifier, subscriber, options=None, sender=None): + return defer.succeed(None) + + def unsubscribe( + self, + service, + nodeIdentifier, + subscriber, + subscriptionIdentifier=None, + sender=None, + ): + return defer.succeed(None) + + def publish(self, service, nodeIdentifier, items=None, sender=None): + node = self.__items.setdefault(nodeIdentifier, []) + + def replace(item_obj): + index = 0 + for current in node: + if current["id"] == item_obj["id"]: + node[index] = item_obj + return True + index += 1 + return False + + for item in items: + item_obj = parseXml(item) if isinstance(item, str) else item + if not replace(item_obj): + node.append(item_obj) + return defer.succeed(None) + + def items( + self, + service, + nodeIdentifier, + maxItems=None, + itemIdentifiers=None, + subscriptionIdentifier=None, + sender=None, + ext_data=None, + ): + try: + items = self.__items[nodeIdentifier] + except KeyError: + items = [] + if ext_data: + assert "id" in ext_data + if "rsm" in ext_data: + args = (0, items[0]["id"], items[-1]["id"]) if items else () + self.__rsm_responses[ext_data["id"]] = RSMResponse(len(items), *args) + return defer.succeed(items) + + def retract_items(self, service, nodeIdentifier, itemIdentifiers, sender=None): + node = self.__items[nodeIdentifier] + for item in [item for item in node if item["id"] in itemIdentifiers]: + node.remove(item) + return defer.succeed(None) + + def get_rsm_response(self, id): + if id not in self.__rsm_responses: + return {} + result = self.__rsm_responses[id].toDict() + del self.__rsm_responses[id] + return result + + def subscriptions(self, service, nodeIdentifier, sender=None): + return defer.succeed([]) + + def service_get_disco_items(self, service, nodeIdentifier, profile_key=C.PROF_KEY_NONE): + items = DiscoItems() + for item in list(self.__items.keys()): + items.append(DiscoItem(service, item)) + return defer.succeed(items)