Mercurial > libervia-backend
diff sat/test/test_plugin_misc_radiocol.py @ 2624:56f94936df1e
code style reformatting using black
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 27 Jun 2018 20:14:46 +0200 |
parents | 26edcf3a30eb |
children | ab2696e34d29 |
line wrap: on
line diff
--- a/sat/test/test_plugin_misc_radiocol.py Wed Jun 27 07:51:29 2018 +0200 +++ b/sat/test/test_plugin_misc_radiocol.py Wed Jun 27 20:14:46 2018 +0200 @@ -39,7 +39,9 @@ from mutagen.easyid3 import EasyID3 from mutagen.id3 import ID3NoHeaderError except ImportError: - raise exceptions.MissingModule(u"Missing module Mutagen, please download/install from https://bitbucket.org/lazka/mutagen") + raise exceptions.MissingModule( + u"Missing module Mutagen, please download/install from https://bitbucket.org/lazka/mutagen" + ) import uuid import os @@ -49,35 +51,38 @@ ROOM_JID = JID(Const.MUC_STR[0]) PROFILE = Const.PROFILE[0] -REFEREE_FULL = JID(ROOM_JID.userhost() + '/' + Const.JID[0].user) +REFEREE_FULL = JID(ROOM_JID.userhost() + "/" + Const.JID[0].user) PLAYERS_INDICES = [0, 1, 3] # referee included OTHER_PROFILES = [Const.PROFILE[1], Const.PROFILE[3]] OTHER_PLAYERS = [Const.JID[1], Const.JID[3]] class RadiocolTest(helpers.SatTestCase): - def setUp(self): self.host = helpers.FakeSAT() def reinit(self): self.host.reinit() - self.host.plugins['ROOM-GAME'] = plugin_room_game.RoomGame(self.host) + self.host.plugins["ROOM-GAME"] = plugin_room_game.RoomGame(self.host) self.plugin = plugin.Radiocol(self.host) # must be init after ROOM-GAME self.plugin.testing = True - self.plugin_0045 = self.host.plugins['XEP-0045'] = helpers_plugins.FakeXEP_0045(self.host) - self.plugin_0249 = self.host.plugins['XEP-0249'] = helpers_plugins.FakeXEP_0249(self.host) + self.plugin_0045 = self.host.plugins["XEP-0045"] = helpers_plugins.FakeXEP_0045( + self.host + ) + self.plugin_0249 = self.host.plugins["XEP-0249"] = helpers_plugins.FakeXEP_0249( + self.host + ) for profile in Const.PROFILE: self.host.getClient(profile) # init self.host.profiles[profile] self.songs = [] self.playlist = [] - self.sound_dir = self.host.memory.getConfig('', 'media_dir') + '/test/sound/' + self.sound_dir = self.host.memory.getConfig("", "media_dir") + "/test/sound/" try: for filename in os.listdir(self.sound_dir): - if filename.endswith('.ogg') or filename.endswith('.mp3'): + if filename.endswith(".ogg") or filename.endswith(".mp3"): self.songs.append(filename) except OSError: - raise SkipTest('The sound samples in sat_media/test/sound were not found') + raise SkipTest("The sound samples in sat_media/test/sound were not found") def _buildPlayers(self, players=[]): """@return: the "started" content built with the given players""" @@ -101,10 +106,17 @@ if isinstance(content, list): new_content = copy.deepcopy(content) for element in new_content: - if not element.hasAttribute('xmlns'): - element['xmlns'] = '' + if not element.hasAttribute("xmlns"): + element["xmlns"] = "" content = "".join([element.toXml() for element in new_content]) - return "<message to='%s' type='%s'><%s xmlns='%s'>%s</%s></message>" % (to_jid.full(), type_, plugin.RADIOC_TAG, plugin.NC_RADIOCOL, content, plugin.RADIOC_TAG) + return "<message to='%s' type='%s'><%s xmlns='%s'>%s</%s></message>" % ( + to_jid.full(), + type_, + plugin.RADIOC_TAG, + plugin.NC_RADIOCOL, + content, + plugin.RADIOC_TAG, + ) def _rejectSongCb(self, profile_index): """Check if the message "song_rejected" has been sent by the referee @@ -112,24 +124,41 @@ @param profile_index: uploader's profile""" sent = self.host.getSentMessage(0) content = "<song_rejected xmlns='' reason='Too many songs in queue'/>" - self.assertEqualXML(sent.toXml(), self._expectedMessage(JID(ROOM_JID.userhost() + '/' + self.plugin_0045.getNick(0, profile_index), 'normal', content))) - self._roomGameCmd(sent, ['radiocolSongRejected', ROOM_JID.full(), 'Too many songs in queue']) + self.assertEqualXML( + sent.toXml(), + self._expectedMessage( + JID( + ROOM_JID.userhost() + + "/" + + self.plugin_0045.getNick(0, profile_index), + "normal", + content, + ) + ), + ) + self._roomGameCmd( + sent, ["radiocolSongRejected", ROOM_JID.full(), "Too many songs in queue"] + ) def _noUploadCb(self): """Check if the message "no_upload" has been sent by the referee and process the command with the profiles of each room users""" sent = self.host.getSentMessage(0) content = "<no_upload xmlns=''/>" - self.assertEqualXML(sent.toXml(), self._expectedMessage(ROOM_JID, 'groupchat', content)) - self._roomGameCmd(sent, ['radiocolNoUpload', ROOM_JID.full()]) + self.assertEqualXML( + sent.toXml(), self._expectedMessage(ROOM_JID, "groupchat", content) + ) + self._roomGameCmd(sent, ["radiocolNoUpload", ROOM_JID.full()]) def _uploadOkCb(self): """Check if the message "upload_ok" has been sent by the referee and process the command with the profiles of each room users""" sent = self.host.getSentMessage(0) content = "<upload_ok xmlns=''/>" - self.assertEqualXML(sent.toXml(), self._expectedMessage(ROOM_JID, 'groupchat', content)) - self._roomGameCmd(sent, ['radiocolUploadOk', ROOM_JID.full()]) + self.assertEqualXML( + sent.toXml(), self._expectedMessage(ROOM_JID, "groupchat", content) + ) + self._roomGameCmd(sent, ["radiocolUploadOk", ROOM_JID.full()]) def _preloadCb(self, attrs, profile_index): """Check if the message "preload" has been sent by the referee @@ -138,15 +167,33 @@ @param profile_index: profile index of the uploader """ sent = self.host.getSentMessage(0) - attrs['sender'] = self.plugin_0045.getNick(0, profile_index) - radiocol_elt = domish.generateElementsNamed(sent.elements(), 'radiocol').next() - preload_elt = domish.generateElementsNamed(radiocol_elt.elements(), 'preload').next() - attrs['timestamp'] = preload_elt['timestamp'] # we could not guess it... - content = "<preload xmlns='' %s/>" % " ".join(["%s='%s'" % (attr, attrs[attr]) for attr in attrs]) - if sent.hasAttribute('from'): - del sent['from'] - self.assertEqualXML(sent.toXml(), self._expectedMessage(ROOM_JID, 'groupchat', content)) - self._roomGameCmd(sent, ['radiocolPreload', ROOM_JID.full(), attrs['timestamp'], attrs['filename'], attrs['title'], attrs['artist'], attrs['album'], attrs['sender']]) + attrs["sender"] = self.plugin_0045.getNick(0, profile_index) + radiocol_elt = domish.generateElementsNamed(sent.elements(), "radiocol").next() + preload_elt = domish.generateElementsNamed( + radiocol_elt.elements(), "preload" + ).next() + attrs["timestamp"] = preload_elt["timestamp"] # we could not guess it... + content = "<preload xmlns='' %s/>" % " ".join( + ["%s='%s'" % (attr, attrs[attr]) for attr in attrs] + ) + if sent.hasAttribute("from"): + del sent["from"] + self.assertEqualXML( + sent.toXml(), self._expectedMessage(ROOM_JID, "groupchat", content) + ) + self._roomGameCmd( + sent, + [ + "radiocolPreload", + ROOM_JID.full(), + attrs["timestamp"], + attrs["filename"], + attrs["title"], + attrs["artist"], + attrs["album"], + attrs["sender"], + ], + ) def _playNextSongCb(self): """Check if the message "play" has been sent by the referee @@ -154,11 +201,13 @@ sent = self.host.getSentMessage(0) filename = self.playlist.pop(0) content = "<play xmlns='' filename='%s' />" % filename - self.assertEqualXML(sent.toXml(), self._expectedMessage(ROOM_JID, 'groupchat', content)) - self._roomGameCmd(sent, ['radiocolPlay', ROOM_JID.full(), filename]) + self.assertEqualXML( + sent.toXml(), self._expectedMessage(ROOM_JID, "groupchat", content) + ) + self._roomGameCmd(sent, ["radiocolPlay", ROOM_JID.full(), filename]) game_data = self.plugin.games[ROOM_JID] - if len(game_data['queue']) == plugin.QUEUE_LIMIT - 1: + if len(game_data["queue"]) == plugin.QUEUE_LIMIT - 1: self._uploadOkCb() def _addSongCb(self, d, filepath, profile_index): @@ -174,7 +223,7 @@ game_data = self.plugin.games[ROOM_JID] # this is copied from the plugin - if filepath.lower().endswith('.mp3'): + if filepath.lower().endswith(".mp3"): actual_song = MP3(filepath) try: song = EasyID3(filepath) @@ -182,27 +231,36 @@ class Info(object): def __init__(self, length): self.length = length + song.info = Info(actual_song.info.length) except ID3NoHeaderError: song = actual_song else: song = OggVorbis(filepath) - attrs = {'filename': os.path.basename(filepath), - 'title': song.get("title", ["Unknown"])[0], - 'artist': song.get("artist", ["Unknown"])[0], - 'album': song.get("album", ["Unknown"])[0], - 'length': str(song.info.length) - } - self.assertEqual(game_data['to_delete'][attrs['filename']], filepath) + attrs = { + "filename": os.path.basename(filepath), + "title": song.get("title", ["Unknown"])[0], + "artist": song.get("artist", ["Unknown"])[0], + "album": song.get("album", ["Unknown"])[0], + "length": str(song.info.length), + } + self.assertEqual(game_data["to_delete"][attrs["filename"]], filepath) - content = "<song_added xmlns='' %s/>" % " ".join(["%s='%s'" % (attr, attrs[attr]) for attr in attrs]) + content = "<song_added xmlns='' %s/>" % " ".join( + ["%s='%s'" % (attr, attrs[attr]) for attr in attrs] + ) sent = self.host.getSentMessage(profile_index) - self.assertEqualXML(sent.toXml(), self._expectedMessage(REFEREE_FULL, 'normal', content)) + self.assertEqualXML( + sent.toXml(), self._expectedMessage(REFEREE_FULL, "normal", content) + ) - reject_song = len(game_data['queue']) >= plugin.QUEUE_LIMIT - no_upload = len(game_data['queue']) + 1 >= plugin.QUEUE_LIMIT - play_next = not game_data['playing'] and len(game_data['queue']) + 1 == plugin.QUEUE_TO_START + reject_song = len(game_data["queue"]) >= plugin.QUEUE_LIMIT + no_upload = len(game_data["queue"]) + 1 >= plugin.QUEUE_LIMIT + play_next = ( + not game_data["playing"] + and len(game_data["queue"]) + 1 == plugin.QUEUE_TO_START + ) self._roomGameCmd(sent, profile_index) # queue unchanged or +1 if reject_song: @@ -211,7 +269,7 @@ if no_upload: self._noUploadCb() self._preloadCb(attrs, profile_index) - self.playlist.append(attrs['filename']) + self.playlist.append(attrs["filename"]) if play_next: self._playNextSongCb() # queue -1 @@ -229,21 +287,28 @@ call = from_index from_index = 0 - sent['from'] = ROOM_JID.full() + '/' + self.plugin_0045.getNick(0, from_index) - recipient = JID(sent['to']).resource + sent["from"] = ROOM_JID.full() + "/" + self.plugin_0045.getNick(0, from_index) + recipient = JID(sent["to"]).resource # The message could have been sent to a room user (room_jid + '/' + nick), # but when it is received, the 'to' attribute of the message has been # changed to the recipient own JID. We need to simulate that here. if recipient: room = self.plugin_0045.getRoom(0, 0) - sent['to'] = Const.JID_STR[0] if recipient == room.nick else room.roster[recipient].entity.full() + sent["to"] = ( + Const.JID_STR[0] + if recipient == room.nick + else room.roster[recipient].entity.full() + ) for index in xrange(0, len(Const.PROFILE)): nick = self.plugin_0045.getNick(0, index) if nick: if not recipient or nick == recipient: - if call and (self.plugin.isPlayer(ROOM_JID, nick) or call[0] == 'radiocolStarted'): + if call and ( + self.plugin.isPlayer(ROOM_JID, nick) + or call[0] == "radiocolStarted" + ): args = copy.deepcopy(call) args.append(Const.PROFILE[index]) self.host.bridge.expectCall(*args) @@ -255,16 +320,35 @@ @param profile_index: index of the profile to be synchronized """ for nick in sync_data: - expected = self._expectedMessage(JID(ROOM_JID.userhost() + '/' + nick), 'normal', sync_data[nick]) + expected = self._expectedMessage( + JID(ROOM_JID.userhost() + "/" + nick), "normal", sync_data[nick] + ) sent = self.host.getSentMessage(0) self.assertEqualXML(sent.toXml(), expected) for elt in sync_data[nick]: - if elt.name == 'preload': - self.host.bridge.expectCall('radiocolPreload', ROOM_JID.full(), elt['timestamp'], elt['filename'], elt['title'], elt['artist'], elt['album'], elt['sender'], Const.PROFILE[profile_index]) - elif elt.name == 'play': - self.host.bridge.expectCall('radiocolPlay', ROOM_JID.full(), elt['filename'], Const.PROFILE[profile_index]) - elif elt.name == 'no_upload': - self.host.bridge.expectCall('radiocolNoUpload', ROOM_JID.full(), Const.PROFILE[profile_index]) + if elt.name == "preload": + self.host.bridge.expectCall( + "radiocolPreload", + ROOM_JID.full(), + elt["timestamp"], + elt["filename"], + elt["title"], + elt["artist"], + elt["album"], + elt["sender"], + Const.PROFILE[profile_index], + ) + elif elt.name == "play": + self.host.bridge.expectCall( + "radiocolPlay", + ROOM_JID.full(), + elt["filename"], + Const.PROFILE[profile_index], + ) + elif elt.name == "no_upload": + self.host.bridge.expectCall( + "radiocolNoUpload", ROOM_JID.full(), Const.PROFILE[profile_index] + ) sync_data[nick] self._roomGameCmd(sent, []) @@ -280,12 +364,12 @@ if player_index not in PLAYERS_INDICES: # this user is actually not a player self.assertFalse(self.plugin.isPlayer(ROOM_JID, user_nick)) - to_jid, type_ = (JID(ROOM_JID.userhost() + '/' + user_nick), 'normal') + to_jid, type_ = (JID(ROOM_JID.userhost() + "/" + user_nick), "normal") else: # this user is a player self.assertTrue(self.plugin.isPlayer(ROOM_JID, user_nick)) nicks.append(user_nick) - to_jid, type_ = (ROOM_JID, 'groupchat') + to_jid, type_ = (ROOM_JID, "groupchat") # Check that the message "players" has been sent by the referee expected = self._expectedMessage(to_jid, type_, self._buildPlayers(nicks)) @@ -293,7 +377,16 @@ self.assertEqualXML(sent.toXml(), expected) # Process the command with the profiles of each room users - self._roomGameCmd(sent, ['radiocolStarted', ROOM_JID.full(), REFEREE_FULL.full(), nicks, [plugin.QUEUE_TO_START, plugin.QUEUE_LIMIT]]) + self._roomGameCmd( + sent, + [ + "radiocolStarted", + ROOM_JID.full(), + REFEREE_FULL.full(), + nicks, + [plugin.QUEUE_TO_START, plugin.QUEUE_LIMIT], + ], + ) if sync: self._syncCb(self.plugin._getSyncData(ROOM_JID, [user_nick]), player_index) @@ -322,12 +415,14 @@ else: song_index = song_index % len(self.songs) src_filename = self.songs[song_index] - dst_filepath = '/tmp/%s%s' % (uuid.uuid1(), os.path.splitext(src_filename)[1]) + dst_filepath = "/tmp/%s%s" % (uuid.uuid1(), os.path.splitext(src_filename)[1]) shutil.copy(self.sound_dir + src_filename, dst_filepath) expect_io_error = False try: - d = self.plugin.radiocolSongAdded(REFEREE_FULL, dst_filepath, Const.PROFILE[profile_index]) + d = self.plugin.radiocolSongAdded( + REFEREE_FULL, dst_filepath, Const.PROFILE[profile_index] + ) except IOError: self.assertTrue(expect_io_error) return @@ -340,7 +435,7 @@ self.fail("Adding a song which is not OGG nor MP3 should fail!") self.assertEqual(failure.value.__class__, exceptions.DataError) - if src_filename.endswith('.ogg') or src_filename.endswith('.mp3'): + if src_filename.endswith(".ogg") or src_filename.endswith(".mp3"): d.addCallbacks(cb, cb) else: d.addCallbacks(eb, eb) @@ -362,14 +457,28 @@ nicks = [self.plugin_0045.getNick(0, 0)] sent = self.host.getSentMessage(0) - self.assertEqualXML(sent.toXml(), self._expectedMessage(ROOM_JID, 'groupchat', self._buildPlayers(nicks))) - self._roomGameCmd(sent, ['radiocolStarted', ROOM_JID.full(), REFEREE_FULL.full(), nicks, [plugin.QUEUE_TO_START, plugin.QUEUE_LIMIT]]) + self.assertEqualXML( + sent.toXml(), + self._expectedMessage(ROOM_JID, "groupchat", self._buildPlayers(nicks)), + ) + self._roomGameCmd( + sent, + [ + "radiocolStarted", + ROOM_JID.full(), + REFEREE_FULL.full(), + nicks, + [plugin.QUEUE_TO_START, plugin.QUEUE_LIMIT], + ], + ) self._joinRoom(room, nicks, 1) # player joins self._joinRoom(room, nicks, 4) # user not playing joins song_index = 0 - self._uploadSong(song_index, 0) # ogg or mp3 file should exist in sat_media/test/song + self._uploadSong( + song_index, 0 + ) # ogg or mp3 file should exist in sat_media/test/song self._uploadSong(None, 0) # non existing file # another songs are added by Const.JID[1] until the radio starts + 1 to fill the queue @@ -379,7 +488,9 @@ self.plugin.playNext(Const.MUC[0], PROFILE) # simulate the end of the first song self._playNextSongCb() - self._uploadSong(song_index, 1) # now the song is accepted and the queue is full again + self._uploadSong( + song_index, 1 + ) # now the song is accepted and the queue is full again self._joinRoom(room, nicks, 3) # new player joins @@ -396,7 +507,7 @@ self._playNextSongCb() for filename in self.playlist: - self.plugin.deleteFile('/tmp/' + filename) + self.plugin.deleteFile("/tmp/" + filename) return defer.succeed(None)