Mercurial > libervia-backend
comparison libervia/backend/test/helpers_plugins.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/test/helpers_plugins.py@524856bd7b19 |
children | 0d7bb4df2343 |
comparison
equal
deleted
inserted
replaced
4070:d10748475025 | 4071:4b842c1fb686 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 | |
4 # SAT: a jabber client | |
5 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) | |
6 # Copyright (C) 2013-2016 Adrien Cossa (souliane@mailoo.org) | |
7 | |
8 # This program is free software: you can redistribute it and/or modify | |
9 # it under the terms of the GNU Affero General Public License as published by | |
10 # the Free Software Foundation, either version 3 of the License, or | |
11 # (at your option) any later version. | |
12 | |
13 # This program is distributed in the hope that it will be useful, | |
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
16 # GNU Affero General Public License for more details. | |
17 | |
18 # You should have received a copy of the GNU Affero General Public License | |
19 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
20 | |
21 """ Helpers class for plugin dependencies """ | |
22 | |
23 from twisted.internet import defer | |
24 | |
25 from wokkel.muc import Room, User | |
26 from wokkel.generic import parseXml | |
27 from wokkel.disco import DiscoItem, DiscoItems | |
28 | |
29 # temporary until the changes are integrated to Wokkel | |
30 from sat_tmp.wokkel.rsm import RSMResponse | |
31 | |
32 from .constants import Const as C | |
33 from libervia.backend.plugins import plugin_xep_0045 | |
34 from collections import OrderedDict | |
35 | |
36 | |
class FakeMUCClient(object):
    """In-memory stand-in for a MUC client, used by the tests.

    Each instance keeps its own ``joined_rooms`` mapping and reaches the
    clients of the other test profiles through ``plugin_parent.clients`` so
    that the rosters of a shared room stay consistent between profiles.
    """

    def __init__(self, plugin_parent):
        """
        @param plugin_parent: the fake plugin owning this client; it must expose
            ``host`` and a ``clients`` mapping (profile -> FakeMUCClient)
        """
        self.plugin_parent = plugin_parent
        self.host = plugin_parent.host
        # room JID -> wokkel.muc.Room joined by this client
        self.joined_rooms = {}

    def join(self, room_jid, nick, options=None, profile_key=C.PROF_KEY_NONE):
        """Join a room and synchronise its roster with the other profiles.

        @param room_jid: the room JID
        @param nick: nick to be used in the room; "_" is appended for as long
            as the nick is already used in the room by a different user
        @param options: joining options (not used by this fake)
        @param profile_key: the profile key of the user joining the room
        @return: a deferred already fired with the joined wokkel.muc.Room instance
        """
        profile = self.host.memory.get_profile_name(profile_key)
        roster = {}

        # ask the other profiles to fill our roster
        for other_profile in C.PROFILE:
            if other_profile == profile:
                continue
            try:
                other_room = self.plugin_parent.clients[other_profile].joined_rooms[
                    room_jid
                ]
                roster.setdefault(
                    other_room.nick,
                    User(other_room.nick, C.PROFILE_DICT[other_profile]),
                )
                for other_nick in other_room.roster:
                    roster.setdefault(other_nick, other_room.roster[other_nick])
            except (AttributeError, KeyError):
                # best effort: typically this profile has not joined the room
                pass

        # rename our nick if it already exists
        while nick in roster:
            if C.PROFILE_DICT[profile].userhost() == roster[nick].entity.userhost():
                break  # same user with different resource --> same nickname
            nick = nick + "_"

        room = Room(room_jid, nick)
        room.roster = roster
        self.joined_rooms[room_jid] = room

        # fill the other rosters with the new entry
        for other_profile in C.PROFILE:
            if other_profile == profile:
                continue
            try:
                other_room = self.plugin_parent.clients[other_profile].joined_rooms[
                    room_jid
                ]
                other_room.roster.setdefault(
                    room.nick, User(room.nick, C.PROFILE_DICT[profile])
                )
            except (AttributeError, KeyError):
                # best effort: typically this profile has not joined the room
                pass

        return defer.succeed(room)

    def leave(self, roomJID, profile_key=C.PROF_KEY_NONE):
        """Leave a room and remove our nick from the other profiles' rosters.

        @param roomJID: the room JID
        @param profile_key: the profile key of the user leaving the room
        @return: a dummy deferred
        @raise KeyError: if this client has not joined roomJID
        """
        profile = self.host.memory.get_profile_name(profile_key)
        room = self.joined_rooms[roomJID]
        # remove ourself from the other rosters
        for other_profile in C.PROFILE:
            if other_profile == profile:
                continue
            try:
                other_room = self.plugin_parent.clients[other_profile].joined_rooms[
                    roomJID
                ]
                del other_room.roster[room.nick]
            except (AttributeError, KeyError):
                # best effort: the other profile is not in the room, or never
                # had our nick in its roster
                pass
        del self.joined_rooms[roomJID]
        # NOTE(review): this deferred is never fired, so a caller waiting on it
        # would wait forever; kept as is because tests may rely on it -- confirm
        # whether defer.succeed(None) is what is actually wanted.
        return defer.Deferred()
120 | |
121 | |
class FakeXEP_0045(plugin_xep_0045.XEP_0045):
    """Test double for the XEP-0045 plugin, backed by FakeMUCClient instances."""

    def __init__(self, host):
        """
        @param host: the (fake) host object; stored as ``self.host``
        """
        # the parent constructor is not called: only these attributes are set
        self.host = host
        self.clients = {profile: FakeMUCClient(self) for profile in C.PROFILE}

    def join(self, room_jid, nick, options={}, profile_key="@DEFAULT@"):
        """Join a room with the fake client of the given profile.

        @param room_jid: the room JID
        @param nick: nick to be used in the room
        @param options: ignored
        @param profile_key: the profile of the user joining the room
        @return: the deferred joined wokkel.muc.Room instance, or a deferred
            fired with None when the room has already been joined
        """
        profile = self.host.memory.get_profile_name(profile_key)
        client = self.clients[profile]
        if room_jid not in client.joined_rooms:
            return client.join(room_jid, nick, profile_key=profile)
        return defer.succeed(None)

    def join_room(self, muc_index, user_index):
        """Called by tests: make the indexed user join the indexed room.

        @param muc_index: index of the room in C.MUC
        @param user_index: index of the user in C.JID / C.PROFILE
        @return: the nickname of the user who joined room
        """
        self.join(
            C.MUC[muc_index],
            C.JID[user_index].user,
            profile_key=C.PROFILE[user_index],
        )
        return self.get_nick(muc_index, user_index)

    def leave(self, room_jid, profile_key="@DEFAULT@"):
        """Leave a room with the fake client of the given profile.

        @param room_jid: the room JID
        @param profile_key: the profile of the user leaving the room
        @return: a dummy deferred
        @raise plugin_xep_0045.UnknownRoom: if the room has not been joined
        """
        profile = self.host.memory.get_profile_name(profile_key)
        client = self.clients[profile]
        if room_jid in client.joined_rooms:
            return client.leave(room_jid, profile)
        raise plugin_xep_0045.UnknownRoom("This room has not been joined")

    def leave_room(self, muc_index, user_index):
        """Called by tests: make the indexed user leave the indexed room.

        @param muc_index: index of the room in C.MUC
        @param user_index: index of the user in C.PROFILE
        @return: the nickname of the user who left the room
        """
        room_jid = C.MUC[muc_index]
        # the nick must be read before leaving, the room is forgotten afterwards
        left_nick = self.get_nick(muc_index, user_index)
        self.leave(room_jid, profile_key=C.PROFILE[user_index])
        return left_nick

    def get_room(self, muc_index, user_index):
        """Called by tests: look up the room joined by the indexed user.

        @param muc_index: index of the room in C.MUC
        @param user_index: index of the user in C.PROFILE
        @return: a wokkel.muc.Room instance, or None if it can't be found
        """
        profile = C.PROFILE[user_index]
        room_jid = C.MUC[muc_index]
        try:
            found = self.clients[profile].joined_rooms[room_jid]
        except (AttributeError, KeyError):
            found = None
        return found

    def get_nick(self, muc_index, user_index):
        """Return the nick of the indexed user in the indexed room.

        @param muc_index: index of the room in C.MUC
        @param user_index: index of the user in C.PROFILE
        @return: the nick, or "" when the lookup raises KeyError/AttributeError
        """
        try:
            nick = self.get_room_nick(C.MUC[muc_index], C.PROFILE[user_index])
        except (KeyError, AttributeError):
            nick = ""
        return nick

    def get_nick_of_user(self, muc_index, user_index, profile_index, secure=True):
        """Return the nick of a user as seen from another profile's room.

        @param muc_index: index of the room in C.MUC
        @param user_index: index in C.JID of the user whose nick is wanted
        @param profile_index: index in C.PROFILE of the profile whose room is used
        @param secure: not used by this method
        @return: the nick, or None when the lookup raises KeyError/AttributeError
        """
        try:
            viewer_room = self.clients[C.PROFILE[profile_index]].joined_rooms[
                C.MUC[muc_index]
            ]
            # NOTE(review): getRoomNickOfUser is not defined in this file and is
            # presumably inherited; if it has been renamed to snake_case like
            # the other helpers, the AttributeError is swallowed below and None
            # is always returned -- verify against plugin_xep_0045.
            nick = self.getRoomNickOfUser(viewer_room, C.JID[user_index])
        except (KeyError, AttributeError):
            nick = None
        return nick
194 | |
195 | |
class FakeXEP_0249(object):
    """Do-nothing replacement for the XEP-0249 plugin, used by the tests."""

    def __init__(self, host):
        """
        @param host: the (fake) host object; stored as ``self.host``
        """
        self.host = host

    def invite(self, target, room, options={}, profile_key="@DEFAULT@"):
        """Pretend to invite a user to a room; nothing is actually done.

        To accept the invitation from a test, just call
        FakeXEP_0045.join_room (no need to have a dedicated method).

        @param target: jid of the user to invite
        @param room: jid of the room where the user is invited
        @param options: attribute with extra info (reason, password) as in #XEP-0249
        @param profile_key: %(doc_profile_key)s
        @return: None
        """
        return None
210 | |
211 | |
class FakeSatPubSubClient(object):
    """In-memory stand-in for the pubsub client, used by the tests.

    Published items are kept per node identifier in an ordered mapping; the
    ``service`` and ``sender`` arguments are accepted but never used to
    partition the data.
    """

    def __init__(self, host, parent_plugin):
        """
        @param host: the (fake) host object
        @param parent_plugin: the plugin owning this client
        """
        self.host = host
        self.parent_plugin = parent_plugin
        # node identifier -> list of published items, in publication order
        self.__items = OrderedDict()
        # request id (ext_data["id"]) -> RSMResponse waiting to be collected
        self.__rsm_responses = {}

    def createNode(self, service, nodeIdentifier=None, options=None, sender=None):
        """Do nothing: nodes are created lazily by publish().

        @return: a deferred already fired with None
        """
        return defer.succeed(None)

    def deleteNode(self, service, nodeIdentifier, sender=None):
        """Forget a node and its items; unknown nodes are silently ignored.

        @return: a deferred already fired with None
        """
        try:
            del self.__items[nodeIdentifier]
        except KeyError:
            pass
        return defer.succeed(None)

    def subscribe(self, service, nodeIdentifier, subscriber, options=None, sender=None):
        """Do nothing.

        @return: a deferred already fired with None
        """
        return defer.succeed(None)

    def unsubscribe(
        self,
        service,
        nodeIdentifier,
        subscriber,
        subscriptionIdentifier=None,
        sender=None,
    ):
        """Do nothing.

        @return: a deferred already fired with None
        """
        return defer.succeed(None)

    def publish(self, service, nodeIdentifier, items=None, sender=None):
        """Store items on a node, replacing stored items having the same "id".

        @param service: ignored
        @param nodeIdentifier: the node to publish to (created if needed)
        @param items: iterable of items, each either an XML string (parsed with
            parseXml) or an already parsed element; None publishes nothing
        @param sender: ignored
        @return: a deferred already fired with None
        """
        node = self.__items.setdefault(nodeIdentifier, [])

        def replace(item_obj):
            """Overwrite the stored item sharing item_obj's id, if there is one."""
            for index, current in enumerate(node):
                if current["id"] == item_obj["id"]:
                    node[index] = item_obj
                    return True
            return False

        # ``items`` defaults to None: iterating it directly would raise TypeError
        for item in items or ():
            item_obj = parseXml(item) if isinstance(item, str) else item
            if not replace(item_obj):
                node.append(item_obj)
        return defer.succeed(None)

    def items(
        self,
        service,
        nodeIdentifier,
        maxItems=None,
        itemIdentifiers=None,
        subscriptionIdentifier=None,
        sender=None,
        ext_data=None,
    ):
        """Return every item stored on a node.

        maxItems, itemIdentifiers and subscriptionIdentifier are not used:
        the whole node content is always returned.

        @param nodeIdentifier: the node to read; unknown nodes give no item
        @param ext_data: optional dict which must contain "id" when not empty;
            if it also contains "rsm", a RSMResponse is stored under that id
            and can be collected with get_rsm_response()
        @return: a deferred already fired with the list of items
        """
        try:
            items = self.__items[nodeIdentifier]
        except KeyError:
            items = []
        if ext_data:
            assert "id" in ext_data
            if "rsm" in ext_data:
                # presumably (index, first, last) after the count -- verify
                # against sat_tmp.wokkel.rsm.RSMResponse
                args = (0, items[0]["id"], items[-1]["id"]) if items else ()
                self.__rsm_responses[ext_data["id"]] = RSMResponse(len(items), *args)
        return defer.succeed(items)

    def retract_items(self, service, nodeIdentifier, itemIdentifiers, sender=None):
        """Remove the items whose "id" is in itemIdentifiers from a node.

        @return: a deferred already fired with None
        @raise KeyError: if nothing was ever published on nodeIdentifier
        """
        node = self.__items[nodeIdentifier]
        # collect the matches first: the list must not shrink while iterated
        for item in [item for item in node if item["id"] in itemIdentifiers]:
            node.remove(item)
        return defer.succeed(None)

    def get_rsm_response(self, id):
        """Collect (and forget) the RSM response stored for a request id.

        @param id: the ext_data["id"] that was given to items()
        @return: the response as a dict, or {} if there is none for this id
        """
        response = self.__rsm_responses.pop(id, None)
        if response is None:
            return {}
        return response.toDict()

    def subscriptions(self, service, nodeIdentifier, sender=None):
        """Report no subscription at all.

        @return: a deferred already fired with an empty list
        """
        return defer.succeed([])

    def service_get_disco_items(self, service, nodeIdentifier, profile_key=C.PROF_KEY_NONE):
        """List every known node as a disco item of ``service``.

        nodeIdentifier and profile_key are not used.

        @return: a deferred already fired with a DiscoItems instance
        """
        items = DiscoItems()
        for node_id in self.__items:
            items.append(DiscoItem(service, node_id))
        return defer.succeed(items)