Mercurial > libervia-backend
comparison libervia/backend/test/test_plugin_misc_radiocol.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/test/test_plugin_misc_radiocol.py@524856bd7b19 |
children | 0d7bb4df2343 |
comparison
equal
deleted
inserted
replaced
4070:d10748475025 | 4071:4b842c1fb686 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 | |
4 # SAT: a jabber client | |
5 # Copyright (C) 2009, 2010, 2011, 2012, 2013 Jérôme Poisson (goffi@goffi.org) | |
6 # Copyright (C) 2013 Adrien Cossa (souliane@mailoo.org) | |
7 | |
8 # This program is free software: you can redistribute it and/or modify | |
9 # it under the terms of the GNU Affero General Public License as published by | |
10 # the Free Software Foundation, either version 3 of the License, or | |
11 # (at your option) any later version. | |
12 | |
13 # This program is distributed in the hope that it will be useful, | |
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
16 # GNU Affero General Public License for more details. | |
17 | |
18 # You should have received a copy of the GNU Affero General Public License | |
19 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
20 | |
21 """ Tests for the plugin radiocol """ | |
22 | |
23 from libervia.backend.core import exceptions | |
24 from libervia.backend.test import helpers, helpers_plugins | |
25 from libervia.backend.plugins import plugin_misc_radiocol as plugin | |
26 from libervia.backend.plugins import plugin_misc_room_game as plugin_room_game | |
27 from .constants import Const | |
28 | |
29 from twisted.words.protocols.jabber.jid import JID | |
30 from twisted.words.xish import domish | |
31 from twisted.internet import reactor | |
32 from twisted.internet import defer | |
33 from twisted.python.failure import Failure | |
34 from twisted.trial.unittest import SkipTest | |
35 | |
36 try: | |
37 from mutagen.oggvorbis import OggVorbis | |
38 from mutagen.mp3 import MP3 | |
39 from mutagen.easyid3 import EasyID3 | |
40 from mutagen.id3 import ID3NoHeaderError | |
41 except ImportError: | |
42 raise exceptions.MissingModule( | |
43 "Missing module Mutagen, please download/install from https://bitbucket.org/lazka/mutagen" | |
44 ) | |
45 | |
46 import uuid | |
47 import os | |
48 import copy | |
49 import shutil | |
50 | |
51 | |
52 ROOM_JID = JID(Const.MUC_STR[0]) | |
53 PROFILE = Const.PROFILE[0] | |
54 REFEREE_FULL = JID(ROOM_JID.userhost() + "/" + Const.JID[0].user) | |
55 PLAYERS_INDICES = [0, 1, 3] # referee included | |
56 OTHER_PROFILES = [Const.PROFILE[1], Const.PROFILE[3]] | |
57 OTHER_PLAYERS = [Const.JID[1], Const.JID[3]] | |
58 | |
59 | |
60 class RadiocolTest(helpers.SatTestCase): | |
61 def setUp(self): | |
62 self.host = helpers.FakeSAT() | |
63 | |
64 def reinit(self): | |
65 self.host.reinit() | |
66 self.host.plugins["ROOM-GAME"] = plugin_room_game.RoomGame(self.host) | |
67 self.plugin = plugin.Radiocol(self.host) # must be init after ROOM-GAME | |
68 self.plugin.testing = True | |
69 self.plugin_0045 = self.host.plugins["XEP-0045"] = helpers_plugins.FakeXEP_0045( | |
70 self.host | |
71 ) | |
72 self.plugin_0249 = self.host.plugins["XEP-0249"] = helpers_plugins.FakeXEP_0249( | |
73 self.host | |
74 ) | |
75 for profile in Const.PROFILE: | |
76 self.host.get_client(profile) # init self.host.profiles[profile] | |
77 self.songs = [] | |
78 self.playlist = [] | |
79 self.sound_dir = self.host.memory.config_get("", "media_dir") + "/test/sound/" | |
80 try: | |
81 for filename in os.listdir(self.sound_dir): | |
82 if filename.endswith(".ogg") or filename.endswith(".mp3"): | |
83 self.songs.append(filename) | |
84 except OSError: | |
85 raise SkipTest("The sound samples in sat_media/test/sound were not found") | |
86 | |
87 def _build_players(self, players=[]): | |
88 """@return: the "started" content built with the given players""" | |
89 content = "<started" | |
90 if not players: | |
91 content += "/>" | |
92 else: | |
93 content += ">" | |
94 for i in range(0, len(players)): | |
95 content += "<player index='%s'>%s</player>" % (i, players[i]) | |
96 content += "</started>" | |
97 return content | |
98 | |
99 def _expected_message(self, to_jid, type_, content): | |
100 """ | |
101 @param to_jid: recipient full jid | |
102 @param type_: message type ('normal' or 'groupchat') | |
103 @param content: content as unicode or list of domish elements | |
104 @return: the message XML built from the given recipient, message type and content | |
105 """ | |
106 if isinstance(content, list): | |
107 new_content = copy.deepcopy(content) | |
108 for element in new_content: | |
109 if not element.hasAttribute("xmlns"): | |
110 element["xmlns"] = "" | |
111 content = "".join([element.toXml() for element in new_content]) | |
112 return "<message to='%s' type='%s'><%s xmlns='%s'>%s</%s></message>" % ( | |
113 to_jid.full(), | |
114 type_, | |
115 plugin.RADIOC_TAG, | |
116 plugin.NC_RADIOCOL, | |
117 content, | |
118 plugin.RADIOC_TAG, | |
119 ) | |
120 | |
121 def _reject_song_cb(self, profile_index): | |
122 """Check if the message "song_rejected" has been sent by the referee | |
123 and process the command with the profile of the uploader | |
124 @param profile_index: uploader's profile""" | |
125 sent = self.host.get_sent_message(0) | |
126 content = "<song_rejected xmlns='' reason='Too many songs in queue'/>" | |
127 self.assert_equal_xml( | |
128 sent.toXml(), | |
129 self._expected_message( | |
130 JID( | |
131 ROOM_JID.userhost() | |
132 + "/" | |
133 + self.plugin_0045.get_nick(0, profile_index), | |
134 "normal", | |
135 content, | |
136 ) | |
137 ), | |
138 ) | |
139 self._room_game_cmd( | |
140 sent, ["radiocol_song_rejected", ROOM_JID.full(), "Too many songs in queue"] | |
141 ) | |
142 | |
143 def _no_upload_cb(self): | |
144 """Check if the message "no_upload" has been sent by the referee | |
145 and process the command with the profiles of each room users""" | |
146 sent = self.host.get_sent_message(0) | |
147 content = "<no_upload xmlns=''/>" | |
148 self.assert_equal_xml( | |
149 sent.toXml(), self._expected_message(ROOM_JID, "groupchat", content) | |
150 ) | |
151 self._room_game_cmd(sent, ["radiocol_no_upload", ROOM_JID.full()]) | |
152 | |
153 def _upload_ok_cb(self): | |
154 """Check if the message "upload_ok" has been sent by the referee | |
155 and process the command with the profiles of each room users""" | |
156 sent = self.host.get_sent_message(0) | |
157 content = "<upload_ok xmlns=''/>" | |
158 self.assert_equal_xml( | |
159 sent.toXml(), self._expected_message(ROOM_JID, "groupchat", content) | |
160 ) | |
161 self._room_game_cmd(sent, ["radiocol_upload_ok", ROOM_JID.full()]) | |
162 | |
163 def _preload_cb(self, attrs, profile_index): | |
164 """Check if the message "preload" has been sent by the referee | |
165 and process the command with the profiles of each room users | |
166 @param attrs: information dict about the song | |
167 @param profile_index: profile index of the uploader | |
168 """ | |
169 sent = self.host.get_sent_message(0) | |
170 attrs["sender"] = self.plugin_0045.get_nick(0, profile_index) | |
171 radiocol_elt = next(domish.generateElementsNamed(sent.elements(), "radiocol")) | |
172 preload_elt = next(domish.generateElementsNamed( | |
173 radiocol_elt.elements(), "preload" | |
174 )) | |
175 attrs["timestamp"] = preload_elt["timestamp"] # we could not guess it... | |
176 content = "<preload xmlns='' %s/>" % " ".join( | |
177 ["%s='%s'" % (attr, attrs[attr]) for attr in attrs] | |
178 ) | |
179 if sent.hasAttribute("from"): | |
180 del sent["from"] | |
181 self.assert_equal_xml( | |
182 sent.toXml(), self._expected_message(ROOM_JID, "groupchat", content) | |
183 ) | |
184 self._room_game_cmd( | |
185 sent, | |
186 [ | |
187 "radiocol_preload", | |
188 ROOM_JID.full(), | |
189 attrs["timestamp"], | |
190 attrs["filename"], | |
191 attrs["title"], | |
192 attrs["artist"], | |
193 attrs["album"], | |
194 attrs["sender"], | |
195 ], | |
196 ) | |
197 | |
198 def _play_next_song_cb(self): | |
199 """Check if the message "play" has been sent by the referee | |
200 and process the command with the profiles of each room users""" | |
201 sent = self.host.get_sent_message(0) | |
202 filename = self.playlist.pop(0) | |
203 content = "<play xmlns='' filename='%s' />" % filename | |
204 self.assert_equal_xml( | |
205 sent.toXml(), self._expected_message(ROOM_JID, "groupchat", content) | |
206 ) | |
207 self._room_game_cmd(sent, ["radiocol_play", ROOM_JID.full(), filename]) | |
208 | |
209 game_data = self.plugin.games[ROOM_JID] | |
210 if len(game_data["queue"]) == plugin.QUEUE_LIMIT - 1: | |
211 self._upload_ok_cb() | |
212 | |
213 def _add_song_cb(self, d, filepath, profile_index): | |
214 """Check if the message "song_added" has been sent by the uploader | |
215 and process the command with the profile of the referee | |
216 @param d: deferred value or failure got from self.plugin.radiocol_song_added | |
217 @param filepath: full path to the sound file | |
218 @param profile_index: the profile index of the uploader | |
219 """ | |
220 if isinstance(d, Failure): | |
221 self.fail("OGG or MP3 song could not be added!") | |
222 | |
223 game_data = self.plugin.games[ROOM_JID] | |
224 | |
225 # this is copied from the plugin | |
226 if filepath.lower().endswith(".mp3"): | |
227 actual_song = MP3(filepath) | |
228 try: | |
229 song = EasyID3(filepath) | |
230 | |
231 class Info(object): | |
232 def __init__(self, length): | |
233 self.length = length | |
234 | |
235 song.info = Info(actual_song.info.length) | |
236 except ID3NoHeaderError: | |
237 song = actual_song | |
238 else: | |
239 song = OggVorbis(filepath) | |
240 | |
241 attrs = { | |
242 "filename": os.path.basename(filepath), | |
243 "title": song.get("title", ["Unknown"])[0], | |
244 "artist": song.get("artist", ["Unknown"])[0], | |
245 "album": song.get("album", ["Unknown"])[0], | |
246 "length": str(song.info.length), | |
247 } | |
248 self.assertEqual(game_data["to_delete"][attrs["filename"]], filepath) | |
249 | |
250 content = "<song_added xmlns='' %s/>" % " ".join( | |
251 ["%s='%s'" % (attr, attrs[attr]) for attr in attrs] | |
252 ) | |
253 sent = self.host.get_sent_message(profile_index) | |
254 self.assert_equal_xml( | |
255 sent.toXml(), self._expected_message(REFEREE_FULL, "normal", content) | |
256 ) | |
257 | |
258 reject_song = len(game_data["queue"]) >= plugin.QUEUE_LIMIT | |
259 no_upload = len(game_data["queue"]) + 1 >= plugin.QUEUE_LIMIT | |
260 play_next = ( | |
261 not game_data["playing"] | |
262 and len(game_data["queue"]) + 1 == plugin.QUEUE_TO_START | |
263 ) | |
264 | |
265 self._room_game_cmd(sent, profile_index) # queue unchanged or +1 | |
266 if reject_song: | |
267 self._reject_song_cb(profile_index) | |
268 return | |
269 if no_upload: | |
270 self._no_upload_cb() | |
271 self._preload_cb(attrs, profile_index) | |
272 self.playlist.append(attrs["filename"]) | |
273 if play_next: | |
274 self._play_next_song_cb() # queue -1 | |
275 | |
276 def _room_game_cmd(self, sent, from_index=0, call=[]): | |
277 """Process a command. It is also possible to call this method as | |
278 _room_game_cmd(sent, call) instead of _room_game_cmd(sent, from_index, call). | |
279 If from index is a list, it is assumed that it is containing the value | |
280 for call and from_index will take its default value. | |
281 @param sent: the sent message that we need to process | |
282 @param from_index: index of the message sender | |
283 @param call: list containing the name of the expected bridge call | |
284 followed by its arguments, or empty list if no call is expected | |
285 """ | |
286 if isinstance(from_index, list): | |
287 call = from_index | |
288 from_index = 0 | |
289 | |
290 sent["from"] = ROOM_JID.full() + "/" + self.plugin_0045.get_nick(0, from_index) | |
291 recipient = JID(sent["to"]).resource | |
292 | |
293 # The message could have been sent to a room user (room_jid + '/' + nick), | |
294 # but when it is received, the 'to' attribute of the message has been | |
295 # changed to the recipient own JID. We need to simulate that here. | |
296 if recipient: | |
297 room = self.plugin_0045.get_room(0, 0) | |
298 sent["to"] = ( | |
299 Const.JID_STR[0] | |
300 if recipient == room.nick | |
301 else room.roster[recipient].entity.full() | |
302 ) | |
303 | |
304 for index in range(0, len(Const.PROFILE)): | |
305 nick = self.plugin_0045.get_nick(0, index) | |
306 if nick: | |
307 if not recipient or nick == recipient: | |
308 if call and ( | |
309 self.plugin.is_player(ROOM_JID, nick) | |
310 or call[0] == "radiocol_started" | |
311 ): | |
312 args = copy.deepcopy(call) | |
313 args.append(Const.PROFILE[index]) | |
314 self.host.bridge.expect_call(*args) | |
315 self.plugin.room_game_cmd(sent, Const.PROFILE[index]) | |
316 | |
317 def _sync_cb(self, sync_data, profile_index): | |
318 """Synchronize one player when he joins a running game. | |
319 @param sync_data: result from self.plugin.getSyncData | |
320 @param profile_index: index of the profile to be synchronized | |
321 """ | |
322 for nick in sync_data: | |
323 expected = self._expected_message( | |
324 JID(ROOM_JID.userhost() + "/" + nick), "normal", sync_data[nick] | |
325 ) | |
326 sent = self.host.get_sent_message(0) | |
327 self.assert_equal_xml(sent.toXml(), expected) | |
328 for elt in sync_data[nick]: | |
329 if elt.name == "preload": | |
330 self.host.bridge.expect_call( | |
331 "radiocol_preload", | |
332 ROOM_JID.full(), | |
333 elt["timestamp"], | |
334 elt["filename"], | |
335 elt["title"], | |
336 elt["artist"], | |
337 elt["album"], | |
338 elt["sender"], | |
339 Const.PROFILE[profile_index], | |
340 ) | |
341 elif elt.name == "play": | |
342 self.host.bridge.expect_call( | |
343 "radiocol_play", | |
344 ROOM_JID.full(), | |
345 elt["filename"], | |
346 Const.PROFILE[profile_index], | |
347 ) | |
348 elif elt.name == "no_upload": | |
349 self.host.bridge.expect_call( | |
350 "radiocol_no_upload", ROOM_JID.full(), Const.PROFILE[profile_index] | |
351 ) | |
352 sync_data[nick] | |
353 self._room_game_cmd(sent, []) | |
354 | |
355 def _join_room(self, room, nicks, player_index, sync=True): | |
356 """Make a player join a room and update the list of nicks | |
357 @param room: wokkel.muc.Room instance from the referee perspective | |
358 @param nicks: list of the players which will be updated | |
359 @param player_index: profile index of the new player | |
360 @param sync: set to True to synchronize data | |
361 """ | |
362 user_nick = self.plugin_0045.join_room(0, player_index) | |
363 self.plugin.user_joined_trigger(room, room.roster[user_nick], PROFILE) | |
364 if player_index not in PLAYERS_INDICES: | |
365 # this user is actually not a player | |
366 self.assertFalse(self.plugin.is_player(ROOM_JID, user_nick)) | |
367 to_jid, type_ = (JID(ROOM_JID.userhost() + "/" + user_nick), "normal") | |
368 else: | |
369 # this user is a player | |
370 self.assertTrue(self.plugin.is_player(ROOM_JID, user_nick)) | |
371 nicks.append(user_nick) | |
372 to_jid, type_ = (ROOM_JID, "groupchat") | |
373 | |
374 # Check that the message "players" has been sent by the referee | |
375 expected = self._expected_message(to_jid, type_, self._build_players(nicks)) | |
376 sent = self.host.get_sent_message(0) | |
377 self.assert_equal_xml(sent.toXml(), expected) | |
378 | |
379 # Process the command with the profiles of each room users | |
380 self._room_game_cmd( | |
381 sent, | |
382 [ | |
383 "radiocol_started", | |
384 ROOM_JID.full(), | |
385 REFEREE_FULL.full(), | |
386 nicks, | |
387 [plugin.QUEUE_TO_START, plugin.QUEUE_LIMIT], | |
388 ], | |
389 ) | |
390 | |
391 if sync: | |
392 self._sync_cb(self.plugin._get_sync_data(ROOM_JID, [user_nick]), player_index) | |
393 | |
394 def _leave_room(self, room, nicks, player_index): | |
395 """Make a player leave a room and update the list of nicks | |
396 @param room: wokkel.muc.Room instance from the referee perspective | |
397 @param nicks: list of the players which will be updated | |
398 @param player_index: profile index of the new player | |
399 """ | |
400 user_nick = self.plugin_0045.get_nick(0, player_index) | |
401 user = room.roster[user_nick] | |
402 self.plugin_0045.leave_room(0, player_index) | |
403 self.plugin.user_left_trigger(room, user, PROFILE) | |
404 nicks.remove(user_nick) | |
405 | |
406 def _upload_song(self, song_index, profile_index): | |
407 """Upload the song of index song_index (modulo self.songs size) from the profile of index profile_index. | |
408 | |
409 @param song_index: index of the song or None to test with non existing file | |
410 @param profile_index: index of the uploader's profile | |
411 """ | |
412 if song_index is None: | |
413 dst_filepath = str(uuid.uuid1()) | |
414 expect_io_error = True | |
415 else: | |
416 song_index = song_index % len(self.songs) | |
417 src_filename = self.songs[song_index] | |
418 dst_filepath = "/tmp/%s%s" % (uuid.uuid1(), os.path.splitext(src_filename)[1]) | |
419 shutil.copy(self.sound_dir + src_filename, dst_filepath) | |
420 expect_io_error = False | |
421 | |
422 try: | |
423 d = self.plugin.radiocol_song_added( | |
424 REFEREE_FULL, dst_filepath, Const.PROFILE[profile_index] | |
425 ) | |
426 except IOError: | |
427 self.assertTrue(expect_io_error) | |
428 return | |
429 | |
430 self.assertFalse(expect_io_error) | |
431 cb = lambda defer: self._add_song_cb(defer, dst_filepath, profile_index) | |
432 | |
433 def eb(failure): | |
434 if not isinstance(failure, Failure): | |
435 self.fail("Adding a song which is not OGG nor MP3 should fail!") | |
436 self.assertEqual(failure.value.__class__, exceptions.DataError) | |
437 | |
438 if src_filename.endswith(".ogg") or src_filename.endswith(".mp3"): | |
439 d.addCallbacks(cb, cb) | |
440 else: | |
441 d.addCallbacks(eb, eb) | |
442 | |
443 def test_init(self): | |
444 self.reinit() | |
445 self.assertEqual(self.plugin.invite_mode, self.plugin.FROM_PLAYERS) | |
446 self.assertEqual(self.plugin.wait_mode, self.plugin.FOR_NONE) | |
447 self.assertEqual(self.plugin.join_mode, self.plugin.INVITED) | |
448 self.assertEqual(self.plugin.ready_mode, self.plugin.FORCE) | |
449 | |
450 def test_game(self): | |
451 self.reinit() | |
452 | |
453 # create game | |
454 self.plugin.prepare_room(OTHER_PLAYERS, ROOM_JID, PROFILE) | |
455 self.assertTrue(self.plugin._game_exists(ROOM_JID, True)) | |
456 room = self.plugin_0045.get_room(0, 0) | |
457 nicks = [self.plugin_0045.get_nick(0, 0)] | |
458 | |
459 sent = self.host.get_sent_message(0) | |
460 self.assert_equal_xml( | |
461 sent.toXml(), | |
462 self._expected_message(ROOM_JID, "groupchat", self._build_players(nicks)), | |
463 ) | |
464 self._room_game_cmd( | |
465 sent, | |
466 [ | |
467 "radiocol_started", | |
468 ROOM_JID.full(), | |
469 REFEREE_FULL.full(), | |
470 nicks, | |
471 [plugin.QUEUE_TO_START, plugin.QUEUE_LIMIT], | |
472 ], | |
473 ) | |
474 | |
475 self._join_room(room, nicks, 1) # player joins | |
476 self._join_room(room, nicks, 4) # user not playing joins | |
477 | |
478 song_index = 0 | |
479 self._upload_song( | |
480 song_index, 0 | |
481 ) # ogg or mp3 file should exist in sat_media/test/song | |
482 self._upload_song(None, 0) # non existing file | |
483 | |
484 # another songs are added by Const.JID[1] until the radio starts + 1 to fill the queue | |
485 # when the first song starts + 1 to be rejected because the queue is full | |
486 for song_index in range(1, plugin.QUEUE_TO_START + 1): | |
487 self._upload_song(song_index, 1) | |
488 | |
489 self.plugin.play_next(Const.MUC[0], PROFILE) # simulate the end of the first song | |
490 self._play_next_song_cb() | |
491 self._upload_song( | |
492 song_index, 1 | |
493 ) # now the song is accepted and the queue is full again | |
494 | |
495 self._join_room(room, nicks, 3) # new player joins | |
496 | |
497 self.plugin.play_next(Const.MUC[0], PROFILE) # the second song finishes | |
498 self._play_next_song_cb() | |
499 self._upload_song(0, 3) # the player who recently joined re-upload the first file | |
500 | |
501 self._leave_room(room, nicks, 1) # one player leaves | |
502 self._join_room(room, nicks, 1) # and join again | |
503 | |
504 self.plugin.play_next(Const.MUC[0], PROFILE) # empty the queue | |
505 self._play_next_song_cb() | |
506 self.plugin.play_next(Const.MUC[0], PROFILE) | |
507 self._play_next_song_cb() | |
508 | |
509 for filename in self.playlist: | |
510 self.plugin.delete_file("/tmp/" + filename) | |
511 | |
512 return defer.succeed(None) | |
513 | |
514 def tearDown(self, *args, **kwargs): | |
515 """Clean the reactor""" | |
516 helpers.SatTestCase.tearDown(self, *args, **kwargs) | |
517 for delayed_call in reactor.getDelayedCalls(): | |
518 delayed_call.cancel() |