Mercurial > libervia-backend
diff libervia/backend/test/test_plugin_misc_radiocol.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/test/test_plugin_misc_radiocol.py@524856bd7b19 |
children | 0d7bb4df2343 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/backend/test/test_plugin_misc_radiocol.py Fri Jun 02 11:49:51 2023 +0200 @@ -0,0 +1,518 @@ +#!/usr/bin/env python3 + + +# SAT: a jabber client +# Copyright (C) 2009, 2010, 2011, 2012, 2013 Jérôme Poisson (goffi@goffi.org) +# Copyright (C) 2013 Adrien Cossa (souliane@mailoo.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +""" Tests for the plugin radiocol """ + +from libervia.backend.core import exceptions +from libervia.backend.test import helpers, helpers_plugins +from libervia.backend.plugins import plugin_misc_radiocol as plugin +from libervia.backend.plugins import plugin_misc_room_game as plugin_room_game +from .constants import Const + +from twisted.words.protocols.jabber.jid import JID +from twisted.words.xish import domish +from twisted.internet import reactor +from twisted.internet import defer +from twisted.python.failure import Failure +from twisted.trial.unittest import SkipTest + +try: + from mutagen.oggvorbis import OggVorbis + from mutagen.mp3 import MP3 + from mutagen.easyid3 import EasyID3 + from mutagen.id3 import ID3NoHeaderError +except ImportError: + raise exceptions.MissingModule( + "Missing module Mutagen, please download/install from https://bitbucket.org/lazka/mutagen" + ) + +import uuid +import os +import copy +import shutil + + +ROOM_JID = JID(Const.MUC_STR[0]) +PROFILE = Const.PROFILE[0] +REFEREE_FULL = JID(ROOM_JID.userhost() + "/" + Const.JID[0].user) +PLAYERS_INDICES = [0, 1, 3] # referee included +OTHER_PROFILES = [Const.PROFILE[1], Const.PROFILE[3]] +OTHER_PLAYERS = [Const.JID[1], Const.JID[3]] + + +class RadiocolTest(helpers.SatTestCase): + def setUp(self): + self.host = helpers.FakeSAT() + + def reinit(self): + self.host.reinit() + self.host.plugins["ROOM-GAME"] = plugin_room_game.RoomGame(self.host) + self.plugin = plugin.Radiocol(self.host) # must be init after ROOM-GAME + self.plugin.testing = True + self.plugin_0045 = self.host.plugins["XEP-0045"] = helpers_plugins.FakeXEP_0045( + self.host + ) + self.plugin_0249 = self.host.plugins["XEP-0249"] = helpers_plugins.FakeXEP_0249( + self.host + ) + for profile in Const.PROFILE: + self.host.get_client(profile) # init self.host.profiles[profile] + self.songs = [] + self.playlist = [] + self.sound_dir = self.host.memory.config_get("", "media_dir") + "/test/sound/" + try: + for filename in os.listdir(self.sound_dir): + if filename.endswith(".ogg") or filename.endswith(".mp3"): + self.songs.append(filename) + except OSError: + raise SkipTest("The sound samples in sat_media/test/sound were not found") + + def _build_players(self, players=[]): + """@return: the "started" content built with the given players""" + content = "<started" + if not players: + content += "/>" + else: + content += ">" + for i in range(0, len(players)): + content += "<player index='%s'>%s</player>" % (i, players[i]) + content += "</started>" + return content + + def _expected_message(self, to_jid, type_, content): + """ + @param to_jid: recipient full jid + @param type_: message type ('normal' or 'groupchat') + @param content: content as unicode or list of domish elements + @return: the message XML built from the given recipient, message type and content + """ + if isinstance(content, list): + new_content = copy.deepcopy(content) + for element in new_content: + if not element.hasAttribute("xmlns"): + element["xmlns"] = "" + content = "".join([element.toXml() for element in new_content]) + return "<message to='%s' type='%s'><%s xmlns='%s'>%s</%s></message>" % ( + to_jid.full(), + type_, + plugin.RADIOC_TAG, + plugin.NC_RADIOCOL, + content, + plugin.RADIOC_TAG, + ) + + def _reject_song_cb(self, profile_index): + """Check if the message "song_rejected" has been sent by the referee + and process the command with the profile of the uploader + @param profile_index: uploader's profile""" + sent = self.host.get_sent_message(0) + content = "<song_rejected xmlns='' reason='Too many songs in queue'/>" + self.assert_equal_xml( + sent.toXml(), + self._expected_message( + JID( + ROOM_JID.userhost() + + "/" + + self.plugin_0045.get_nick(0, profile_index), + "normal", + content, + ) + ), + ) + self._room_game_cmd( + sent, ["radiocol_song_rejected", ROOM_JID.full(), "Too many songs in queue"] + ) + + def _no_upload_cb(self): + """Check if the message "no_upload" has been sent by the referee + and process the command with the profiles of each room users""" + sent = self.host.get_sent_message(0) + content = "<no_upload xmlns=''/>" + self.assert_equal_xml( + sent.toXml(), self._expected_message(ROOM_JID, "groupchat", content) + ) + self._room_game_cmd(sent, ["radiocol_no_upload", ROOM_JID.full()]) + + def _upload_ok_cb(self): + """Check if the message "upload_ok" has been sent by the referee + and process the command with the profiles of each room users""" + sent = self.host.get_sent_message(0) + content = "<upload_ok xmlns=''/>" + self.assert_equal_xml( + sent.toXml(), self._expected_message(ROOM_JID, "groupchat", content) + ) + self._room_game_cmd(sent, ["radiocol_upload_ok", ROOM_JID.full()]) + + def _preload_cb(self, attrs, profile_index): + """Check if the message "preload" has been sent by the referee + and process the command with the profiles of each room users + @param attrs: information dict about the song + @param profile_index: profile index of the uploader + """ + sent = self.host.get_sent_message(0) + attrs["sender"] = self.plugin_0045.get_nick(0, profile_index) + radiocol_elt = next(domish.generateElementsNamed(sent.elements(), "radiocol")) + preload_elt = next(domish.generateElementsNamed( + radiocol_elt.elements(), "preload" + )) + attrs["timestamp"] = preload_elt["timestamp"] # we could not guess it... + content = "<preload xmlns='' %s/>" % " ".join( + ["%s='%s'" % (attr, attrs[attr]) for attr in attrs] + ) + if sent.hasAttribute("from"): + del sent["from"] + self.assert_equal_xml( + sent.toXml(), self._expected_message(ROOM_JID, "groupchat", content) + ) + self._room_game_cmd( + sent, + [ + "radiocol_preload", + ROOM_JID.full(), + attrs["timestamp"], + attrs["filename"], + attrs["title"], + attrs["artist"], + attrs["album"], + attrs["sender"], + ], + ) + + def _play_next_song_cb(self): + """Check if the message "play" has been sent by the referee + and process the command with the profiles of each room users""" + sent = self.host.get_sent_message(0) + filename = self.playlist.pop(0) + content = "<play xmlns='' filename='%s' />" % filename + self.assert_equal_xml( + sent.toXml(), self._expected_message(ROOM_JID, "groupchat", content) + ) + self._room_game_cmd(sent, ["radiocol_play", ROOM_JID.full(), filename]) + + game_data = self.plugin.games[ROOM_JID] + if len(game_data["queue"]) == plugin.QUEUE_LIMIT - 1: + self._upload_ok_cb() + + def _add_song_cb(self, d, filepath, profile_index): + """Check if the message "song_added" has been sent by the uploader + and process the command with the profile of the referee + @param d: deferred value or failure got from self.plugin.radiocol_song_added + @param filepath: full path to the sound file + @param profile_index: the profile index of the uploader + """ + if isinstance(d, Failure): + self.fail("OGG or MP3 song could not be added!") + + game_data = self.plugin.games[ROOM_JID] + + # this is copied from the plugin + if filepath.lower().endswith(".mp3"): + actual_song = MP3(filepath) + try: + song = EasyID3(filepath) + + class Info(object): + def __init__(self, length): + self.length = length + + song.info = Info(actual_song.info.length) + except ID3NoHeaderError: + song = actual_song + else: + song = OggVorbis(filepath) + + attrs = { + "filename": os.path.basename(filepath), + "title": song.get("title", ["Unknown"])[0], + "artist": song.get("artist", ["Unknown"])[0], + "album": song.get("album", ["Unknown"])[0], + "length": str(song.info.length), + } + self.assertEqual(game_data["to_delete"][attrs["filename"]], filepath) + + content = "<song_added xmlns='' %s/>" % " ".join( + ["%s='%s'" % (attr, attrs[attr]) for attr in attrs] + ) + sent = self.host.get_sent_message(profile_index) + self.assert_equal_xml( + sent.toXml(), self._expected_message(REFEREE_FULL, "normal", content) + ) + + reject_song = len(game_data["queue"]) >= plugin.QUEUE_LIMIT + no_upload = len(game_data["queue"]) + 1 >= plugin.QUEUE_LIMIT + play_next = ( + not game_data["playing"] + and len(game_data["queue"]) + 1 == plugin.QUEUE_TO_START + ) + + self._room_game_cmd(sent, profile_index) # queue unchanged or +1 + if reject_song: + self._reject_song_cb(profile_index) + return + if no_upload: + self._no_upload_cb() + self._preload_cb(attrs, profile_index) + self.playlist.append(attrs["filename"]) + if play_next: + self._play_next_song_cb() # queue -1 + + def _room_game_cmd(self, sent, from_index=0, call=[]): + """Process a command. It is also possible to call this method as + _room_game_cmd(sent, call) instead of _room_game_cmd(sent, from_index, call). + If from index is a list, it is assumed that it is containing the value + for call and from_index will take its default value. + @param sent: the sent message that we need to process + @param from_index: index of the message sender + @param call: list containing the name of the expected bridge call + followed by its arguments, or empty list if no call is expected + """ + if isinstance(from_index, list): + call = from_index + from_index = 0 + + sent["from"] = ROOM_JID.full() + "/" + self.plugin_0045.get_nick(0, from_index) + recipient = JID(sent["to"]).resource + + # The message could have been sent to a room user (room_jid + '/' + nick), + # but when it is received, the 'to' attribute of the message has been + # changed to the recipient own JID. We need to simulate that here. + if recipient: + room = self.plugin_0045.get_room(0, 0) + sent["to"] = ( + Const.JID_STR[0] + if recipient == room.nick + else room.roster[recipient].entity.full() + ) + + for index in range(0, len(Const.PROFILE)): + nick = self.plugin_0045.get_nick(0, index) + if nick: + if not recipient or nick == recipient: + if call and ( + self.plugin.is_player(ROOM_JID, nick) + or call[0] == "radiocol_started" + ): + args = copy.deepcopy(call) + args.append(Const.PROFILE[index]) + self.host.bridge.expect_call(*args) + self.plugin.room_game_cmd(sent, Const.PROFILE[index]) + + def _sync_cb(self, sync_data, profile_index): + """Synchronize one player when he joins a running game. + @param sync_data: result from self.plugin.getSyncData + @param profile_index: index of the profile to be synchronized + """ + for nick in sync_data: + expected = self._expected_message( + JID(ROOM_JID.userhost() + "/" + nick), "normal", sync_data[nick] + ) + sent = self.host.get_sent_message(0) + self.assert_equal_xml(sent.toXml(), expected) + for elt in sync_data[nick]: + if elt.name == "preload": + self.host.bridge.expect_call( + "radiocol_preload", + ROOM_JID.full(), + elt["timestamp"], + elt["filename"], + elt["title"], + elt["artist"], + elt["album"], + elt["sender"], + Const.PROFILE[profile_index], + ) + elif elt.name == "play": + self.host.bridge.expect_call( + "radiocol_play", + ROOM_JID.full(), + elt["filename"], + Const.PROFILE[profile_index], + ) + elif elt.name == "no_upload": + self.host.bridge.expect_call( + "radiocol_no_upload", ROOM_JID.full(), Const.PROFILE[profile_index] + ) + sync_data[nick] + self._room_game_cmd(sent, []) + + def _join_room(self, room, nicks, player_index, sync=True): + """Make a player join a room and update the list of nicks + @param room: wokkel.muc.Room instance from the referee perspective + @param nicks: list of the players which will be updated + @param player_index: profile index of the new player + @param sync: set to True to synchronize data + """ + user_nick = self.plugin_0045.join_room(0, player_index) + self.plugin.user_joined_trigger(room, room.roster[user_nick], PROFILE) + if player_index not in PLAYERS_INDICES: + # this user is actually not a player + self.assertFalse(self.plugin.is_player(ROOM_JID, user_nick)) + to_jid, type_ = (JID(ROOM_JID.userhost() + "/" + user_nick), "normal") + else: + # this user is a player + self.assertTrue(self.plugin.is_player(ROOM_JID, user_nick)) + nicks.append(user_nick) + to_jid, type_ = (ROOM_JID, "groupchat") + + # Check that the message "players" has been sent by the referee + expected = self._expected_message(to_jid, type_, self._build_players(nicks)) + sent = self.host.get_sent_message(0) + self.assert_equal_xml(sent.toXml(), expected) + + # Process the command with the profiles of each room users + self._room_game_cmd( + sent, + [ + "radiocol_started", + ROOM_JID.full(), + REFEREE_FULL.full(), + nicks, + [plugin.QUEUE_TO_START, plugin.QUEUE_LIMIT], + ], + ) + + if sync: + self._sync_cb(self.plugin._get_sync_data(ROOM_JID, [user_nick]), player_index) + + def _leave_room(self, room, nicks, player_index): + """Make a player leave a room and update the list of nicks + @param room: wokkel.muc.Room instance from the referee perspective + @param nicks: list of the players which will be updated + @param player_index: profile index of the new player + """ + user_nick = self.plugin_0045.get_nick(0, player_index) + user = room.roster[user_nick] + self.plugin_0045.leave_room(0, player_index) + self.plugin.user_left_trigger(room, user, PROFILE) + nicks.remove(user_nick) + + def _upload_song(self, song_index, profile_index): + """Upload the song of index song_index (modulo self.songs size) from the profile of index profile_index. + + @param song_index: index of the song or None to test with non existing file + @param profile_index: index of the uploader's profile + """ + if song_index is None: + dst_filepath = str(uuid.uuid1()) + expect_io_error = True + else: + song_index = song_index % len(self.songs) + src_filename = self.songs[song_index] + dst_filepath = "/tmp/%s%s" % (uuid.uuid1(), os.path.splitext(src_filename)[1]) + shutil.copy(self.sound_dir + src_filename, dst_filepath) + expect_io_error = False + + try: + d = self.plugin.radiocol_song_added( + REFEREE_FULL, dst_filepath, Const.PROFILE[profile_index] + ) + except IOError: + self.assertTrue(expect_io_error) + return + + self.assertFalse(expect_io_error) + cb = lambda defer: self._add_song_cb(defer, dst_filepath, profile_index) + + def eb(failure): + if not isinstance(failure, Failure): + self.fail("Adding a song which is not OGG nor MP3 should fail!") + self.assertEqual(failure.value.__class__, exceptions.DataError) + + if src_filename.endswith(".ogg") or src_filename.endswith(".mp3"): + d.addCallbacks(cb, cb) + else: + d.addCallbacks(eb, eb) + + def test_init(self): + self.reinit() + self.assertEqual(self.plugin.invite_mode, self.plugin.FROM_PLAYERS) + self.assertEqual(self.plugin.wait_mode, self.plugin.FOR_NONE) + self.assertEqual(self.plugin.join_mode, self.plugin.INVITED) + self.assertEqual(self.plugin.ready_mode, self.plugin.FORCE) + + def test_game(self): + self.reinit() + + # create game + self.plugin.prepare_room(OTHER_PLAYERS, ROOM_JID, PROFILE) + self.assertTrue(self.plugin._game_exists(ROOM_JID, True)) + room = self.plugin_0045.get_room(0, 0) + nicks = [self.plugin_0045.get_nick(0, 0)] + + sent = self.host.get_sent_message(0) + self.assert_equal_xml( + sent.toXml(), + self._expected_message(ROOM_JID, "groupchat", self._build_players(nicks)), + ) + self._room_game_cmd( + sent, + [ + "radiocol_started", + ROOM_JID.full(), + REFEREE_FULL.full(), + nicks, + [plugin.QUEUE_TO_START, plugin.QUEUE_LIMIT], + ], + ) + + self._join_room(room, nicks, 1) # player joins + self._join_room(room, nicks, 4) # user not playing joins + + song_index = 0 + self._upload_song( + song_index, 0 + ) # ogg or mp3 file should exist in sat_media/test/song + self._upload_song(None, 0) # non existing file + + # another songs are added by Const.JID[1] until the radio starts + 1 to fill the queue + # when the first song starts + 1 to be rejected because the queue is full + for song_index in range(1, plugin.QUEUE_TO_START + 1): + self._upload_song(song_index, 1) + + self.plugin.play_next(Const.MUC[0], PROFILE) # simulate the end of the first song + self._play_next_song_cb() + self._upload_song( + song_index, 1 + ) # now the song is accepted and the queue is full again + + self._join_room(room, nicks, 3) # new player joins + + self.plugin.play_next(Const.MUC[0], PROFILE) # the second song finishes + self._play_next_song_cb() + self._upload_song(0, 3) # the player who recently joined re-upload the first file + + self._leave_room(room, nicks, 1) # one player leaves + self._join_room(room, nicks, 1) # and join again + + self.plugin.play_next(Const.MUC[0], PROFILE) # empty the queue + self._play_next_song_cb() + self.plugin.play_next(Const.MUC[0], PROFILE) + self._play_next_song_cb() + + for filename in self.playlist: + self.plugin.delete_file("/tmp/" + filename) + + return defer.succeed(None) + + def tearDown(self, *args, **kwargs): + """Clean the reactor""" + helpers.SatTestCase.tearDown(self, *args, **kwargs) + for delayed_call in reactor.getDelayedCalls(): + delayed_call.cancel()