comparison libervia/backend/test/helpers_plugins.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/test/helpers_plugins.py@524856bd7b19
children 0d7bb4df2343
comparison
equal deleted inserted replaced
4070:d10748475025 4071:4b842c1fb686
1 #!/usr/bin/env python3
2
3
4 # SAT: a jabber client
5 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
6 # Copyright (C) 2013-2016 Adrien Cossa (souliane@mailoo.org)
7
8 # This program is free software: you can redistribute it and/or modify
9 # it under the terms of the GNU Affero General Public License as published by
10 # the Free Software Foundation, either version 3 of the License, or
11 # (at your option) any later version.
12
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU Affero General Public License for more details.
17
18 # You should have received a copy of the GNU Affero General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20
21 """ Helpers class for plugin dependencies """
22
23 from twisted.internet import defer
24
25 from wokkel.muc import Room, User
26 from wokkel.generic import parseXml
27 from wokkel.disco import DiscoItem, DiscoItems
28
29 # temporary until the changes are integrated to Wokkel
30 from sat_tmp.wokkel.rsm import RSMResponse
31
32 from .constants import Const as C
33 from libervia.backend.plugins import plugin_xep_0045
34 from collections import OrderedDict
35
36
37 class FakeMUCClient(object):
38 def __init__(self, plugin_parent):
39 self.plugin_parent = plugin_parent
40 self.host = plugin_parent.host
41 self.joined_rooms = {}
42
43 def join(self, room_jid, nick, options=None, profile_key=C.PROF_KEY_NONE):
44 """
45 @param room_jid: the room JID
46 @param nick: nick to be used in the room
47 @param options: joining options
48 @param profile_key: the profile key of the user joining the room
49 @return: the deferred joined wokkel.muc.Room instance
50 """
51 profile = self.host.memory.get_profile_name(profile_key)
52 roster = {}
53
54 # ask the other profiles to fill our roster
55 for i in range(0, len(C.PROFILE)):
56 other_profile = C.PROFILE[i]
57 if other_profile == profile:
58 continue
59 try:
60 other_room = self.plugin_parent.clients[other_profile].joined_rooms[
61 room_jid
62 ]
63 roster.setdefault(
64 other_room.nick, User(other_room.nick, C.PROFILE_DICT[other_profile])
65 )
66 for other_nick in other_room.roster:
67 roster.setdefault(other_nick, other_room.roster[other_nick])
68 except (AttributeError, KeyError):
69 pass
70
71 # rename our nick if it already exists
72 while nick in list(roster.keys()):
73 if C.PROFILE_DICT[profile].userhost() == roster[nick].entity.userhost():
74 break # same user with different resource --> same nickname
75 nick = nick + "_"
76
77 room = Room(room_jid, nick)
78 room.roster = roster
79 self.joined_rooms[room_jid] = room
80
81 # fill the other rosters with the new entry
82 for i in range(0, len(C.PROFILE)):
83 other_profile = C.PROFILE[i]
84 if other_profile == profile:
85 continue
86 try:
87 other_room = self.plugin_parent.clients[other_profile].joined_rooms[
88 room_jid
89 ]
90 other_room.roster.setdefault(
91 room.nick, User(room.nick, C.PROFILE_DICT[profile])
92 )
93 except (AttributeError, KeyError):
94 pass
95
96 return defer.succeed(room)
97
98 def leave(self, roomJID, profile_key=C.PROF_KEY_NONE):
99 """
100 @param roomJID: the room JID
101 @param profile_key: the profile key of the user joining the room
102 @return: a dummy deferred
103 """
104 profile = self.host.memory.get_profile_name(profile_key)
105 room = self.joined_rooms[roomJID]
106 # remove ourself from the other rosters
107 for i in range(0, len(C.PROFILE)):
108 other_profile = C.PROFILE[i]
109 if other_profile == profile:
110 continue
111 try:
112 other_room = self.plugin_parent.clients[other_profile].joined_rooms[
113 roomJID
114 ]
115 del other_room.roster[room.nick]
116 except (AttributeError, KeyError):
117 pass
118 del self.joined_rooms[roomJID]
119 return defer.Deferred()
120
121
122 class FakeXEP_0045(plugin_xep_0045.XEP_0045):
123 def __init__(self, host):
124 self.host = host
125 self.clients = {}
126 for profile in C.PROFILE:
127 self.clients[profile] = FakeMUCClient(self)
128
129 def join(self, room_jid, nick, options={}, profile_key="@DEFAULT@"):
130 """
131 @param roomJID: the room JID
132 @param nick: nick to be used in the room
133 @param options: ignore
134 @param profile_key: the profile of the user joining the room
135 @return: the deferred joined wokkel.muc.Room instance or None
136 """
137 profile = self.host.memory.get_profile_name(profile_key)
138 if room_jid in self.clients[profile].joined_rooms:
139 return defer.succeed(None)
140 room = self.clients[profile].join(room_jid, nick, profile_key=profile)
141 return room
142
143 def join_room(self, muc_index, user_index):
144 """Called by tests
145 @return: the nickname of the user who joined room"""
146 muc_jid = C.MUC[muc_index]
147 nick = C.JID[user_index].user
148 profile = C.PROFILE[user_index]
149 self.join(muc_jid, nick, profile_key=profile)
150 return self.get_nick(muc_index, user_index)
151
152 def leave(self, room_jid, profile_key="@DEFAULT@"):
153 """
154 @param roomJID: the room JID
155 @param profile_key: the profile of the user leaving the room
156 @return: a dummy deferred
157 """
158 profile = self.host.memory.get_profile_name(profile_key)
159 if room_jid not in self.clients[profile].joined_rooms:
160 raise plugin_xep_0045.UnknownRoom("This room has not been joined")
161 return self.clients[profile].leave(room_jid, profile)
162
163 def leave_room(self, muc_index, user_index):
164 """Called by tests
165 @return: the nickname of the user who left the room"""
166 muc_jid = C.MUC[muc_index]
167 nick = self.get_nick(muc_index, user_index)
168 profile = C.PROFILE[user_index]
169 self.leave(muc_jid, profile_key=profile)
170 return nick
171
172 def get_room(self, muc_index, user_index):
173 """Called by tests
174 @return: a wokkel.muc.Room instance"""
175 profile = C.PROFILE[user_index]
176 muc_jid = C.MUC[muc_index]
177 try:
178 return self.clients[profile].joined_rooms[muc_jid]
179 except (AttributeError, KeyError):
180 return None
181
182 def get_nick(self, muc_index, user_index):
183 try:
184 return self.get_room_nick(C.MUC[muc_index], C.PROFILE[user_index])
185 except (KeyError, AttributeError):
186 return ""
187
188 def get_nick_of_user(self, muc_index, user_index, profile_index, secure=True):
189 try:
190 room = self.clients[C.PROFILE[profile_index]].joined_rooms[C.MUC[muc_index]]
191 return self.getRoomNickOfUser(room, C.JID[user_index])
192 except (KeyError, AttributeError):
193 return None
194
195
196 class FakeXEP_0249(object):
197 def __init__(self, host):
198 self.host = host
199
200 def invite(self, target, room, options={}, profile_key="@DEFAULT@"):
201 """
202 Invite a user to a room. To accept the invitation from a test,
203 just call FakeXEP_0045.join_room (no need to have a dedicated method).
204 @param target: jid of the user to invite
205 @param room: jid of the room where the user is invited
206 @options: attribute with extra info (reason, password) as in #XEP-0249
207 @profile_key: %(doc_profile_key)s
208 """
209 pass
210
211
212 class FakeSatPubSubClient(object):
213 def __init__(self, host, parent_plugin):
214 self.host = host
215 self.parent_plugin = parent_plugin
216 self.__items = OrderedDict()
217 self.__rsm_responses = {}
218
219 def createNode(self, service, nodeIdentifier=None, options=None, sender=None):
220 return defer.succeed(None)
221
222 def deleteNode(self, service, nodeIdentifier, sender=None):
223 try:
224 del self.__items[nodeIdentifier]
225 except KeyError:
226 pass
227 return defer.succeed(None)
228
229 def subscribe(self, service, nodeIdentifier, subscriber, options=None, sender=None):
230 return defer.succeed(None)
231
232 def unsubscribe(
233 self,
234 service,
235 nodeIdentifier,
236 subscriber,
237 subscriptionIdentifier=None,
238 sender=None,
239 ):
240 return defer.succeed(None)
241
242 def publish(self, service, nodeIdentifier, items=None, sender=None):
243 node = self.__items.setdefault(nodeIdentifier, [])
244
245 def replace(item_obj):
246 index = 0
247 for current in node:
248 if current["id"] == item_obj["id"]:
249 node[index] = item_obj
250 return True
251 index += 1
252 return False
253
254 for item in items:
255 item_obj = parseXml(item) if isinstance(item, str) else item
256 if not replace(item_obj):
257 node.append(item_obj)
258 return defer.succeed(None)
259
260 def items(
261 self,
262 service,
263 nodeIdentifier,
264 maxItems=None,
265 itemIdentifiers=None,
266 subscriptionIdentifier=None,
267 sender=None,
268 ext_data=None,
269 ):
270 try:
271 items = self.__items[nodeIdentifier]
272 except KeyError:
273 items = []
274 if ext_data:
275 assert "id" in ext_data
276 if "rsm" in ext_data:
277 args = (0, items[0]["id"], items[-1]["id"]) if items else ()
278 self.__rsm_responses[ext_data["id"]] = RSMResponse(len(items), *args)
279 return defer.succeed(items)
280
281 def retract_items(self, service, nodeIdentifier, itemIdentifiers, sender=None):
282 node = self.__items[nodeIdentifier]
283 for item in [item for item in node if item["id"] in itemIdentifiers]:
284 node.remove(item)
285 return defer.succeed(None)
286
287 def get_rsm_response(self, id):
288 if id not in self.__rsm_responses:
289 return {}
290 result = self.__rsm_responses[id].toDict()
291 del self.__rsm_responses[id]
292 return result
293
294 def subscriptions(self, service, nodeIdentifier, sender=None):
295 return defer.succeed([])
296
297 def service_get_disco_items(self, service, nodeIdentifier, profile_key=C.PROF_KEY_NONE):
298 items = DiscoItems()
299 for item in list(self.__items.keys()):
300 items.append(DiscoItem(service, item))
301 return defer.succeed(items)