comparison libervia/backend/test/test_plugin_misc_radiocol.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/test/test_plugin_misc_radiocol.py@524856bd7b19
children
comparison
equal deleted inserted replaced
4070:d10748475025 4071:4b842c1fb686
1 #!/usr/bin/env python3
2
3
4 # SAT: a jabber client
5 # Copyright (C) 2009, 2010, 2011, 2012, 2013 Jérôme Poisson (goffi@goffi.org)
6 # Copyright (C) 2013 Adrien Cossa (souliane@mailoo.org)
7
8 # This program is free software: you can redistribute it and/or modify
9 # it under the terms of the GNU Affero General Public License as published by
10 # the Free Software Foundation, either version 3 of the License, or
11 # (at your option) any later version.
12
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU Affero General Public License for more details.
17
18 # You should have received a copy of the GNU Affero General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20
21 """ Tests for the plugin radiocol """
22
23 from libervia.backend.core import exceptions
24 from libervia.backend.test import helpers, helpers_plugins
25 from libervia.backend.plugins import plugin_misc_radiocol as plugin
26 from libervia.backend.plugins import plugin_misc_room_game as plugin_room_game
27 from .constants import Const
28
29 from twisted.words.protocols.jabber.jid import JID
30 from twisted.words.xish import domish
31 from twisted.internet import reactor
32 from twisted.internet import defer
33 from twisted.python.failure import Failure
34 from twisted.trial.unittest import SkipTest
35
36 try:
37 from mutagen.oggvorbis import OggVorbis
38 from mutagen.mp3 import MP3
39 from mutagen.easyid3 import EasyID3
40 from mutagen.id3 import ID3NoHeaderError
41 except ImportError:
42 raise exceptions.MissingModule(
43 "Missing module Mutagen, please download/install from https://bitbucket.org/lazka/mutagen"
44 )
45
46 import uuid
47 import os
48 import copy
49 import shutil
50
51
# Profile driving the tests: index 0 is the referee (see PLAYERS_INDICES).
PROFILE = Const.PROFILE[0]
# Bare JID of the MUC room in which the game takes place.
ROOM_JID = JID(Const.MUC_STR[0])
# In-room JID of the referee: <room bare jid>/<user part of the first test JID>.
REFEREE_FULL = JID("/".join((ROOM_JID.userhost(), Const.JID[0].user)))
# Indices (in Const.PROFILE / Const.JID) of the accounts taking part in the game.
PLAYERS_INDICES = [0, 1, 3]  # referee included
# Profiles and JIDs at indices 1 and 3, i.e. the players other than the referee.
OTHER_PROFILES = [Const.PROFILE[1], Const.PROFILE[3]]
OTHER_PLAYERS = [Const.JID[1], Const.JID[3]]
58
59
60 class RadiocolTest(helpers.SatTestCase):
def setUp(self):
    """Create the fake backend host shared by the tests of this case."""
    fake_host = helpers.FakeSAT()
    self.host = fake_host
63
def reinit(self):
    """Reset the fake host, recreate the plugins and list the sound samples.

    After this call, self.songs holds the names of the OGG/MP3 samples found
    in the media directory and self.playlist is empty.
    @raise SkipTest: the samples directory can't be listed, or it holds no
        OGG/MP3 file
    """
    self.host.reinit()
    self.host.plugins["ROOM-GAME"] = plugin_room_game.RoomGame(self.host)
    self.plugin = plugin.Radiocol(self.host)  # must be init after ROOM-GAME
    self.plugin.testing = True
    self.plugin_0045 = self.host.plugins["XEP-0045"] = helpers_plugins.FakeXEP_0045(
        self.host
    )
    self.plugin_0249 = self.host.plugins["XEP-0249"] = helpers_plugins.FakeXEP_0249(
        self.host
    )
    for profile in Const.PROFILE:
        self.host.get_client(profile)  # init self.host.profiles[profile]
    self.songs = []
    self.playlist = []
    # the trailing "/" matters: file names are appended directly to this value
    self.sound_dir = self.host.memory.config_get("", "media_dir") + "/test/sound/"
    try:
        filenames = os.listdir(self.sound_dir)
    except OSError:
        raise SkipTest("The sound samples in sat_media/test/sound were not found")
    # os.listdir order is arbitrary: sort to upload the same songs on every run
    self.songs = sorted(
        filename for filename in filenames if filename.endswith((".ogg", ".mp3"))
    )
    if not self.songs:
        # without any sample, song selection (index modulo len(self.songs))
        # would fail with a ZeroDivisionError
        raise SkipTest("The sound samples in sat_media/test/sound were not found")
86
87 def _build_players(self, players=[]):
88 """@return: the "started" content built with the given players"""
89 content = "<started"
90 if not players:
91 content += "/>"
92 else:
93 content += ">"
94 for i in range(0, len(players)):
95 content += "<player index='%s'>%s</player>" % (i, players[i])
96 content += "</started>"
97 return content
98
def _expected_message(self, to_jid, type_, content):
    """Build the XML of the radiocol message expected for the given data.

    @param to_jid: recipient full jid
    @param type_: message type ('normal' or 'groupchat')
    @param content: content as unicode or list of domish elements
    @return: the message XML built from the given recipient, message type and content
    """
    if isinstance(content, list):
        serialised = []
        # work on copies so that the caller's elements are left untouched
        for elt in copy.deepcopy(content):
            if not elt.hasAttribute("xmlns"):
                elt["xmlns"] = ""
            serialised.append(elt.toXml())
        content = "".join(serialised)
    recipient = to_jid.full()
    tag = plugin.RADIOC_TAG
    values = (recipient, type_, tag, plugin.NC_RADIOCOL, content, tag)
    return "<message to='%s' type='%s'><%s xmlns='%s'>%s</%s></message>" % values
120
def _reject_song_cb(self, profile_index):
    """Check if the message "song_rejected" has been sent by the referee
    and process the command with the profile of the uploader

    @param profile_index: uploader's profile index
    """
    sent = self.host.get_sent_message(0)
    content = "<song_rejected xmlns='' reason='Too many songs in queue'/>"
    # the rejection is addressed to the uploader only, through its in-room JID
    uploader_jid = JID(
        ROOM_JID.userhost() + "/" + self.plugin_0045.get_nick(0, profile_index)
    )
    self.assert_equal_xml(
        sent.toXml(), self._expected_message(uploader_jid, "normal", content)
    )
    self._room_game_cmd(
        sent, ["radiocol_song_rejected", ROOM_JID.full(), "Too many songs in queue"]
    )
142
def _no_upload_cb(self):
    """Verify the "no_upload" message sent by the referee to the room,
    then process the command with the profiles of each room users"""
    message = self.host.get_sent_message(0)
    actual_xml = message.toXml()
    expected_xml = self._expected_message(
        ROOM_JID, "groupchat", "<no_upload xmlns=''/>"
    )
    self.assert_equal_xml(actual_xml, expected_xml)
    bridge_call = ["radiocol_no_upload", ROOM_JID.full()]
    self._room_game_cmd(message, bridge_call)
152
def _upload_ok_cb(self):
    """Verify the "upload_ok" message sent by the referee to the room,
    then process the command with the profiles of each room users"""
    message = self.host.get_sent_message(0)
    actual_xml = message.toXml()
    expected_xml = self._expected_message(
        ROOM_JID, "groupchat", "<upload_ok xmlns=''/>"
    )
    self.assert_equal_xml(actual_xml, expected_xml)
    bridge_call = ["radiocol_upload_ok", ROOM_JID.full()]
    self._room_game_cmd(message, bridge_call)
162
def _preload_cb(self, attrs, profile_index):
    """Verify the "preload" message sent by the referee to the room,
    then process the command with the profiles of each room users

    The given dict is completed in place with the "sender" and "timestamp" keys.
    @param attrs: information dict about the song
    @param profile_index: profile index of the uploader
    """
    message = self.host.get_sent_message(0)
    attrs["sender"] = self.plugin_0045.get_nick(0, profile_index)

    # the timestamp can't be guessed: read it from the sent <preload/> element
    radiocol_elt = next(domish.generateElementsNamed(message.elements(), "radiocol"))
    preload_elt = next(
        domish.generateElementsNamed(radiocol_elt.elements(), "preload")
    )
    attrs["timestamp"] = preload_elt["timestamp"]

    attributes = " ".join("%s='%s'" % item for item in attrs.items())
    content = "<preload xmlns='' %s/>" % attributes
    if message.hasAttribute("from"):
        del message["from"]
    actual_xml = message.toXml()
    self.assert_equal_xml(
        actual_xml, self._expected_message(ROOM_JID, "groupchat", content)
    )

    bridge_call = ["radiocol_preload", ROOM_JID.full()]
    for key in ("timestamp", "filename", "title", "artist", "album", "sender"):
        bridge_call.append(attrs[key])
    self._room_game_cmd(message, bridge_call)
197
def _play_next_song_cb(self):
    """Verify the "play" message sent by the referee for the first song of
    self.playlist, then process the command with the profiles of each room users"""
    message = self.host.get_sent_message(0)
    current = self.playlist.pop(0)
    content = "<play xmlns='' filename='%s' />" % current
    actual_xml = message.toXml()
    self.assert_equal_xml(
        actual_xml, self._expected_message(ROOM_JID, "groupchat", content)
    )
    self._room_game_cmd(message, ["radiocol_play", ROOM_JID.full(), current])

    # with a queue one song below the limit, "upload_ok" is expected next
    queue = self.plugin.games[ROOM_JID]["queue"]
    if len(queue) != plugin.QUEUE_LIMIT - 1:
        return
    self._upload_ok_cb()
212
def _add_song_cb(self, d, filepath, profile_index):
    """Verify the "song_added" message sent by the uploader, process the
    command with the profile of the referee, then check the referee's answers

    @param d: deferred value or failure got from self.plugin.radiocol_song_added
    @param filepath: full path to the sound file
    @param profile_index: the profile index of the uploader
    """
    if isinstance(d, Failure):
        self.fail("OGG or MP3 song could not be added!")

    game_data = self.plugin.games[ROOM_JID]

    # metadata reading below is copied from the plugin
    if not filepath.lower().endswith(".mp3"):
        song = OggVorbis(filepath)
    else:
        actual_song = MP3(filepath)
        try:
            song = EasyID3(filepath)

            class Info(object):
                def __init__(self, length):
                    self.length = length

            song.info = Info(actual_song.info.length)
        except ID3NoHeaderError:
            song = actual_song

    def first_value(tag):
        return song.get(tag, ["Unknown"])[0]

    attrs = {
        "filename": os.path.basename(filepath),
        "title": first_value("title"),
        "artist": first_value("artist"),
        "album": first_value("album"),
        "length": str(song.info.length),
    }
    self.assertEqual(game_data["to_delete"][attrs["filename"]], filepath)

    attributes = " ".join("%s='%s'" % item for item in attrs.items())
    content = "<song_added xmlns='' %s/>" % attributes
    sent = self.host.get_sent_message(profile_index)
    actual_xml = sent.toXml()
    self.assert_equal_xml(
        actual_xml, self._expected_message(REFEREE_FULL, "normal", content)
    )

    # the expectations depend on the queue as it is BEFORE the referee
    # processes the command
    queue_size = len(game_data["queue"])
    reject_song = queue_size >= plugin.QUEUE_LIMIT
    no_upload = queue_size + 1 >= plugin.QUEUE_LIMIT
    play_next = (
        not game_data["playing"] and queue_size + 1 == plugin.QUEUE_TO_START
    )

    self._room_game_cmd(sent, profile_index)  # queue unchanged or +1
    if reject_song:
        self._reject_song_cb(profile_index)
        return
    if no_upload:
        self._no_upload_cb()
    self._preload_cb(attrs, profile_index)
    self.playlist.append(attrs["filename"])
    if play_next:
        self._play_next_song_cb()  # queue -1
275
def _room_game_cmd(self, sent, from_index=0, call=None):
    """Process a command. It is also possible to call this method as
    _room_game_cmd(sent, call) instead of _room_game_cmd(sent, from_index, call).
    If from index is a list, it is assumed that it is containing the value
    for call and from_index will take its default value.

    The "from" attribute of the message is set, and its "to" attribute is
    replaced when the message was addressed to a single room user.
    @param sent: the sent message that we need to process
    @param from_index: index of the message sender
    @param call: list containing the name of the expected bridge call
        followed by its arguments, or empty list/None if no call is expected
    """
    if isinstance(from_index, list):
        call = from_index
        from_index = 0

    sent["from"] = ROOM_JID.full() + "/" + self.plugin_0045.get_nick(0, from_index)
    recipient = JID(sent["to"]).resource

    # The message could have been sent to a room user (room_jid + '/' + nick),
    # but when it is received, the 'to' attribute of the message has been
    # changed to the recipient own JID. We need to simulate that here.
    if recipient:
        room = self.plugin_0045.get_room(0, 0)
        sent["to"] = (
            Const.JID_STR[0]
            if recipient == room.nick
            else room.roster[recipient].entity.full()
        )

    for index, profile in enumerate(Const.PROFILE):
        nick = self.plugin_0045.get_nick(0, index)
        if not nick:
            continue
        if recipient and nick != recipient:
            # the message is addressed to somebody else
            continue
        if call and (
            self.plugin.is_player(ROOM_JID, nick) or call[0] == "radiocol_started"
        ):
            # copy the call so that the caller's list is never modified
            args = copy.deepcopy(call)
            args.append(profile)
            self.host.bridge.expect_call(*args)
        self.plugin.room_game_cmd(sent, profile)
316
def _sync_cb(self, sync_data, profile_index):
    """Synchronize one player when he joins a running game.

    For each nick, the message sent by the referee is checked, the bridge
    calls matching its elements are registered as expected, then the
    message is processed.
    @param sync_data: result from self.plugin._get_sync_data; it is used here
        as a mapping from nick to a list of domish elements
    @param profile_index: index of the profile to be synchronized
    """
    # element name => (expected bridge call, element attributes used as arguments)
    bridge_calls = {
        "preload": (
            "radiocol_preload",
            ("timestamp", "filename", "title", "artist", "album", "sender"),
        ),
        "play": ("radiocol_play", ("filename",)),
        "no_upload": ("radiocol_no_upload", ()),
    }
    for nick in sync_data:
        elements = sync_data[nick]
        expected = self._expected_message(
            JID(ROOM_JID.userhost() + "/" + nick), "normal", elements
        )
        sent = self.host.get_sent_message(0)
        self.assert_equal_xml(sent.toXml(), expected)
        for elt in elements:
            if elt.name not in bridge_calls:
                # no bridge call is expected for the other elements
                continue
            signal, attr_names = bridge_calls[elt.name]
            args = [signal, ROOM_JID.full()]
            args.extend(elt[attr_name] for attr_name in attr_names)
            args.append(Const.PROFILE[profile_index])
            self.host.bridge.expect_call(*args)
        self._room_game_cmd(sent, [])
354
def _join_room(self, room, nicks, player_index, sync=True):
    """Make a user join the room, and update the list of nicks if he is a player

    @param room: wokkel.muc.Room instance from the referee perspective
    @param nicks: list of the players which will be updated
    @param player_index: profile index of the new player
    @param sync: set to True to synchronize data
    """
    user_nick = self.plugin_0045.join_room(0, player_index)
    self.plugin.user_joined_trigger(room, room.roster[user_nick], PROFILE)
    is_player = self.plugin.is_player(ROOM_JID, user_nick)
    if player_index in PLAYERS_INDICES:
        # this user is a player: the whole room gets the new list of players
        self.assertTrue(is_player)
        nicks.append(user_nick)
        to_jid = ROOM_JID
        type_ = "groupchat"
    else:
        # this user is actually not a player: only he gets the list of players
        self.assertFalse(is_player)
        to_jid = JID(ROOM_JID.userhost() + "/" + user_nick)
        type_ = "normal"

    # Check that the message "players" has been sent by the referee
    expected = self._expected_message(to_jid, type_, self._build_players(nicks))
    sent = self.host.get_sent_message(0)
    self.assert_equal_xml(sent.toXml(), expected)

    # Process the command with the profiles of each room users
    started_call = [
        "radiocol_started",
        ROOM_JID.full(),
        REFEREE_FULL.full(),
        nicks,
        [plugin.QUEUE_TO_START, plugin.QUEUE_LIMIT],
    ]
    self._room_game_cmd(sent, started_call)

    if not sync:
        return
    sync_data = self.plugin._get_sync_data(ROOM_JID, [user_nick])
    self._sync_cb(sync_data, player_index)
393
def _leave_room(self, room, nicks, player_index):
    """Make a player leave a room and remove his nick from the list of nicks

    @param room: wokkel.muc.Room instance from the referee perspective
    @param nicks: list of the players which will be updated
    @param player_index: profile index of the leaving player
    """
    leaving_nick = self.plugin_0045.get_nick(0, player_index)
    # the roster entry is read before leaving (presumably it is gone afterwards)
    leaving_user = room.roster[leaving_nick]
    self.plugin_0045.leave_room(0, player_index)
    self.plugin.user_left_trigger(room, leaving_user, PROFILE)
    nicks.remove(leaving_nick)
405
def _upload_song(self, song_index, profile_index):
    """Upload the song of index song_index (modulo self.songs size) from the profile of index profile_index.

    The sample is first copied to a uniquely named file in /tmp, and that
    copy is the file given to the plugin.
    @param song_index: index of the song or None to test with non existing file
    @param profile_index: index of the uploader's profile
    """
    expect_io_error = song_index is None
    if expect_io_error:
        # random name, no file is created: the plugin is expected to fail
        dst_filepath = str(uuid.uuid1())
    else:
        song_index = song_index % len(self.songs)
        src_filename = self.songs[song_index]
        extension = os.path.splitext(src_filename)[1]
        dst_filepath = "/tmp/%s%s" % (uuid.uuid1(), extension)
        shutil.copy(self.sound_dir + src_filename, dst_filepath)

    try:
        d = self.plugin.radiocol_song_added(
            REFEREE_FULL, dst_filepath, Const.PROFILE[profile_index]
        )
    except IOError:
        self.assertTrue(expect_io_error)
        return

    self.assertFalse(expect_io_error)

    def on_result(result):
        # used for success and failure alike: _add_song_cb checks for a Failure
        return self._add_song_cb(result, dst_filepath, profile_index)

    def on_unsupported(failure):
        if not isinstance(failure, Failure):
            self.fail("Adding a song which is not OGG nor MP3 should fail!")
        self.assertEqual(failure.value.__class__, exceptions.DataError)

    supported = src_filename.endswith((".ogg", ".mp3"))
    handler = on_result if supported else on_unsupported
    d.addCallbacks(handler, handler)
442
def test_init(self):
    """Check the room game modes set by the plugin at initialisation."""
    self.reinit()
    radiocol = self.plugin
    expected_modes = (
        ("invite_mode", "FROM_PLAYERS"),
        ("wait_mode", "FOR_NONE"),
        ("join_mode", "INVITED"),
        ("ready_mode", "FORCE"),
    )
    for mode_attr, constant_name in expected_modes:
        self.assertEqual(getattr(radiocol, mode_attr), getattr(radiocol, constant_name))
449
def test_game(self):
    """Run a whole radiocol session: game creation, users joining and
    leaving, songs being uploaded and played until the queue is empty."""
    self.reinit()

    def started_call():
        # bridge call expected each time the list of players is sent
        return [
            "radiocol_started",
            ROOM_JID.full(),
            REFEREE_FULL.full(),
            nicks,
            [plugin.QUEUE_TO_START, plugin.QUEUE_LIMIT],
        ]

    def end_current_song():
        # simulate the end of the song being played, and check the next one
        self.plugin.play_next(Const.MUC[0], PROFILE)
        self._play_next_song_cb()

    # create game
    self.plugin.prepare_room(OTHER_PLAYERS, ROOM_JID, PROFILE)
    self.assertTrue(self.plugin._game_exists(ROOM_JID, True))
    room = self.plugin_0045.get_room(0, 0)
    nicks = [self.plugin_0045.get_nick(0, 0)]

    sent = self.host.get_sent_message(0)
    actual_xml = sent.toXml()
    self.assert_equal_xml(
        actual_xml,
        self._expected_message(ROOM_JID, "groupchat", self._build_players(nicks)),
    )
    self._room_game_cmd(sent, started_call())

    self._join_room(room, nicks, 1)  # player joins
    self._join_room(room, nicks, 4)  # user not playing joins

    song_index = 0
    # an ogg or mp3 file should exist in sat_media/test/sound
    self._upload_song(song_index, 0)
    self._upload_song(None, 0)  # non existing file

    # other songs are added by Const.JID[1] until the radio starts, plus one
    # to fill the queue when the first song starts, plus one to be rejected
    # because the queue is full
    for song_index in range(1, plugin.QUEUE_TO_START + 1):
        self._upload_song(song_index, 1)

    end_current_song()  # the first song finishes
    # the last index of the loop above is uploaded again: now the song is
    # accepted and the queue is full again
    self._upload_song(song_index, 1)

    self._join_room(room, nicks, 3)  # new player joins

    end_current_song()  # the second song finishes
    self._upload_song(0, 3)  # the player who recently joined re-uploads the first file

    self._leave_room(room, nicks, 1)  # one player leaves
    self._join_room(room, nicks, 1)  # and joins again

    # empty the queue
    end_current_song()
    end_current_song()

    for filename in self.playlist:
        self.plugin.delete_file("/tmp/" + filename)

    return defer.succeed(None)
513
def tearDown(self, *args, **kwargs):
    """Run the parent tearDown, then cancel the delayed calls left in the reactor"""
    helpers.SatTestCase.tearDown(self, *args, **kwargs)
    pending_calls = reactor.getDelayedCalls()
    for pending_call in pending_calls:
        pending_call.cancel()