Mercurial > libervia-backend
diff sat/test/helpers_plugins.py @ 2562:26edcf3a30eb
core, setup: huge cleaning:
- moved directories from src and frontends/src to sat and sat_frontends, which is the recommanded naming convention
- move twisted directory to root
- removed all hacks from setup.py, and added missing dependencies, it is now clean
- use https URL for website in setup.py
- removed "Environment :: X11 Applications :: GTK", as wix is deprecated and removed
- renamed sat.sh to sat and fixed its installation
- added python_requires to specify Python version needed
- replaced glib2reactor which use deprecated code by gtk3reactor
sat can now be installed directly from virtualenv without using --system-site-packages anymore \o/
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 02 Apr 2018 19:44:50 +0200 |
parents | src/test/helpers_plugins.py@0046283a285d |
children | 56f94936df1e |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat/test/helpers_plugins.py Mon Apr 02 19:44:50 2018 +0200 @@ -0,0 +1,282 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- + +# SAT: a jabber client +# Copyright (C) 2009-2018 Jérôme Poisson (goffi@goffi.org) +# Copyright (C) 2013-2016 Adrien Cossa (souliane@mailoo.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +""" Helpers class for plugin dependencies """ + +from twisted.internet import defer + +from wokkel.muc import Room, User +from wokkel.generic import parseXml +from wokkel.disco import DiscoItem, DiscoItems + +# temporary until the changes are integrated to Wokkel +from sat_tmp.wokkel.rsm import RSMResponse + +from constants import Const as C +from sat.plugins import plugin_xep_0045 +from collections import OrderedDict + + +class FakeMUCClient(object): + def __init__(self, plugin_parent): + self.plugin_parent = plugin_parent + self.host = plugin_parent.host + self.joined_rooms = {} + + def join(self, room_jid, nick, options=None, profile_key=C.PROF_KEY_NONE): + """ + @param room_jid: the room JID + @param nick: nick to be used in the room + @param options: joining options + @param profile_key: the profile key of the user joining the room + @return: the deferred joined wokkel.muc.Room instance + """ + profile = self.host.memory.getProfileName(profile_key) + roster = {} + + # ask the other profiles to fill our roster + for i in xrange(0, len(C.PROFILE)): + other_profile = C.PROFILE[i] + if other_profile == profile: + continue + try: + other_room = self.plugin_parent.clients[other_profile].joined_rooms[room_jid] + roster.setdefault(other_room.nick, User(other_room.nick, C.PROFILE_DICT[other_profile])) + for other_nick in other_room.roster: + roster.setdefault(other_nick, other_room.roster[other_nick]) + except (AttributeError, KeyError): + pass + + # rename our nick if it already exists + while nick in roster.keys(): + if C.PROFILE_DICT[profile].userhost() == roster[nick].entity.userhost(): + break # same user with different resource --> same nickname + nick = nick + "_" + + room = Room(room_jid, nick) + room.roster = roster + self.joined_rooms[room_jid] = room + + # fill the other rosters with the new entry + for i in xrange(0, len(C.PROFILE)): + other_profile = C.PROFILE[i] + if other_profile == profile: + continue + try: + other_room = self.plugin_parent.clients[other_profile].joined_rooms[room_jid] + other_room.roster.setdefault(room.nick, User(room.nick, C.PROFILE_DICT[profile])) + except (AttributeError, KeyError): + pass + + return defer.succeed(room) + + def leave(self, roomJID, profile_key=C.PROF_KEY_NONE): + """ + @param roomJID: the room JID + @param profile_key: the profile key of the user joining the room + @return: a dummy deferred + """ + profile = self.host.memory.getProfileName(profile_key) + room = self.joined_rooms[roomJID] + # remove ourself from the other rosters + for i in xrange(0, len(C.PROFILE)): + other_profile = C.PROFILE[i] + if other_profile == profile: + continue + try: + other_room = self.plugin_parent.clients[other_profile].joined_rooms[roomJID] + del other_room.roster[room.nick] + except (AttributeError, KeyError): + pass + del self.joined_rooms[roomJID] + return defer.Deferred() + + +class FakeXEP_0045(plugin_xep_0045.XEP_0045): + + def __init__(self, host): + self.host = host + self.clients = {} + for profile in C.PROFILE: + self.clients[profile] = FakeMUCClient(self) + + def join(self, room_jid, nick, options={}, profile_key='@DEFAULT@'): + """ + @param roomJID: the room JID + @param nick: nick to be used in the room + @param options: ignore + @param profile_key: the profile of the user joining the room + @return: the deferred joined wokkel.muc.Room instance or None + """ + profile = self.host.memory.getProfileName(profile_key) + if room_jid in self.clients[profile].joined_rooms: + return defer.succeed(None) + room = self.clients[profile].join(room_jid, nick, profile_key=profile) + return room + + def joinRoom(self, muc_index, user_index): + """Called by tests + @return: the nickname of the user who joined room""" + muc_jid = C.MUC[muc_index] + nick = C.JID[user_index].user + profile = C.PROFILE[user_index] + self.join(muc_jid, nick, profile_key=profile) + return self.getNick(muc_index, user_index) + + def leave(self, room_jid, profile_key='@DEFAULT@'): + """ + @param roomJID: the room JID + @param profile_key: the profile of the user leaving the room + @return: a dummy deferred + """ + profile = self.host.memory.getProfileName(profile_key) + if room_jid not in self.clients[profile].joined_rooms: + raise plugin_xep_0045.UnknownRoom("This room has not been joined") + return self.clients[profile].leave(room_jid, profile) + + def leaveRoom(self, muc_index, user_index): + """Called by tests + @return: the nickname of the user who left the room""" + muc_jid = C.MUC[muc_index] + nick = self.getNick(muc_index, user_index) + profile = C.PROFILE[user_index] + self.leave(muc_jid, profile_key=profile) + return nick + + def getRoom(self, muc_index, user_index): + """Called by tests + @return: a wokkel.muc.Room instance""" + profile = C.PROFILE[user_index] + muc_jid = C.MUC[muc_index] + try: + return self.clients[profile].joined_rooms[muc_jid] + except (AttributeError, KeyError): + return None + + def getNick(self, muc_index, user_index): + try: + return self.getRoomNick(C.MUC[muc_index], C.PROFILE[user_index]) + except (KeyError, AttributeError): + return '' + + def getNickOfUser(self, muc_index, user_index, profile_index, secure=True): + try: + room = self.clients[C.PROFILE[profile_index]].joined_rooms[C.MUC[muc_index]] + return self.getRoomNickOfUser(room, C.JID[user_index]) + except (KeyError, AttributeError): + return None + + +class FakeXEP_0249(object): + + def __init__(self, host): + self.host = host + + def invite(self, target, room, options={}, profile_key='@DEFAULT@'): + """ + Invite a user to a room. To accept the invitation from a test, + just call FakeXEP_0045.joinRoom (no need to have a dedicated method). + @param target: jid of the user to invite + @param room: jid of the room where the user is invited + @options: attribute with extra info (reason, password) as in #XEP-0249 + @profile_key: %(doc_profile_key)s + """ + pass + + +class FakeSatPubSubClient(object): + + def __init__(self, host, parent_plugin): + self.host = host + self.parent_plugin = parent_plugin + self.__items = OrderedDict() + self.__rsm_responses = {} + + def createNode(self, service, nodeIdentifier=None, options=None, + sender=None): + return defer.succeed(None) + + def deleteNode(self, service, nodeIdentifier, sender=None): + try: + del self.__items[nodeIdentifier] + except KeyError: + pass + return defer.succeed(None) + + def subscribe(self, service, nodeIdentifier, subscriber, + options=None, sender=None): + return defer.succeed(None) + + def unsubscribe(self, service, nodeIdentifier, subscriber, + subscriptionIdentifier=None, sender=None): + return defer.succeed(None) + + def publish(self, service, nodeIdentifier, items=None, sender=None): + node = self.__items.setdefault(nodeIdentifier, []) + + def replace(item_obj): + index = 0 + for current in node: + if current['id'] == item_obj['id']: + node[index] = item_obj + return True + index += 1 + return False + + for item in items: + item_obj = parseXml(item) if isinstance(item, unicode) else item + if not replace(item_obj): + node.append(item_obj) + return defer.succeed(None) + + def items(self, service, nodeIdentifier, maxItems=None, itemIdentifiers=None, + subscriptionIdentifier=None, sender=None, ext_data=None): + try: + items = self.__items[nodeIdentifier] + except KeyError: + items = [] + if ext_data: + assert('id' in ext_data) + if 'rsm' in ext_data: + args = (0, items[0]['id'], items[-1]['id']) if items else () + self.__rsm_responses[ext_data['id']] = RSMResponse(len(items), *args) + return defer.succeed(items) + + def retractItems(self, service, nodeIdentifier, itemIdentifiers, sender=None): + node = self.__items[nodeIdentifier] + for item in [item for item in node if item['id'] in itemIdentifiers]: + node.remove(item) + return defer.succeed(None) + + def getRSMResponse(self, id): + if id not in self.__rsm_responses: + return {} + result = self.__rsm_responses[id].toDict() + del self.__rsm_responses[id] + return result + + def subscriptions(self, service, nodeIdentifier, sender=None): + return defer.succeed([]) + + def service_getDiscoItems(self, service, nodeIdentifier, profile_key=C.PROF_KEY_NONE): + items = DiscoItems() + for item in self.__items.keys(): + items.append(DiscoItem(service, item)) + return defer.succeed(items)