Mercurial > gcp
comparison test_gcp.py @ 60:40ec9445060a
added test suit
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 20 Jun 2011 14:27:06 +0200 |
parents | gcp_test.py@d4f1235c860b |
children |
comparison
equal
deleted
inserted
replaced
59:7830d4244fb5 | 60:40ec9445060a |
---|---|
1 #!/usr/bin/python | |
2 # -*- coding: utf-8 -*- | |
3 | |
4 """ | |
5 gcp: Goffi's CoPier | |
6 Copyright (C) 2010, 2011 Jérôme Poisson <goffi@goffi.org> | |
7 | |
8 This program is free software: you can redistribute it and/or modify | |
9 it under the terms of the GNU General Public License as published by | |
10 the Free Software Foundation, either version 3 of the License, or | |
11 (at your option) any later version. | |
12 | |
13 This program is distributed in the hope that it will be useful, | |
14 but WITHOUT ANY WARRANTY; without even the implied warranty of | |
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
16 GNU General Public License for more details. | |
17 | |
18 You should have received a copy of the GNU General Public License | |
19 along with this program. If not, see <http://www.gnu.org/licenses/>. | |
20 """ | |
21 | |
22 from __future__ import with_statement | |
23 import tempfile | |
24 import unittest | |
25 from os import getcwd, chdir, system, mkdir, makedirs, listdir | |
26 from os.path import join, isdir | |
27 from shutil import rmtree | |
28 from random import randrange | |
29 from hashlib import sha1 | |
30 | |
31 #size shorcuts | |
32 S10K = 1024 * 10 | |
33 S100K = 1024 * 100 | |
34 S1M = 1024 * 1024 | |
35 S10M = 1024 * 1024 * 10 | |
36 S100M = 1024 * 1024 * 100 | |
37 | |
38 | |
39 def sha1sum(filename, buf_size=4096): | |
40 """Return the SHA1 hash of a file | |
41 @param filename: path to the file | |
42 @param buf_size: size of the buffer to use for calculation""" | |
43 csum = sha1() | |
44 with open(filename) as fd: | |
45 data = fd.read(buf_size) | |
46 while data: | |
47 csum.update(data) | |
48 data = fd.read(buf_size) | |
49 return csum.digest() | |
50 | |
51 def dirCheck(dir_path): | |
52 """Recursively calculate SHA1 sum of a dir | |
53 @param path: path of the dir to check | |
54 @return: a dict in the form [{filepath: sum,...}] | |
55 """ | |
56 def recursive_sum(directory, result): | |
57 for current_path in listdir(directory): | |
58 full_path = join(directory, current_path) | |
59 if isdir(full_path): | |
60 recursive_sum(full_path, result) | |
61 else: | |
62 result[full_path] = sha1sum(full_path) | |
63 | |
64 result = {} | |
65 _ori_dir = getcwd() | |
66 chdir(dir_path) | |
67 recursive_sum(".", result) | |
68 chdir(_ori_dir) | |
69 return result | |
70 | |
71 #def makeRandomFile(path, size, buf_size=4096): | |
72 # """Create a fake file | |
73 # @param path: where the file is created | |
74 # @param size: size of the file to create in bytes""" | |
75 # def seq(size): | |
76 # return ''.join(chr(randrange(256)) for i in range(size)) | |
77 # fd = open(path, 'w') | |
78 # for byte in range(size//buf_size): | |
79 # fd.write(seq(buf_size)) | |
80 # fd.write(seq(size%buf_size)) | |
81 # fd.close() | |
82 | |
83 def makeRandomFile(path, size=S10K, buf_size=4096): | |
84 """Create a fake file using /dev/urandom | |
85 @param path: where the file must be created | |
86 @param size: size of the file to create in bytes""" | |
87 source = open('/dev/urandom','r') | |
88 dest = open(path, 'w') | |
89 for byte in range(size//buf_size): | |
90 dest.write(source.read(buf_size)) | |
91 dest.write(source.read(size%buf_size)) | |
92 dest.close() | |
93 | |
94 def makeTestDir(path): | |
95 """Helper method to easily create a test dir | |
96 @param path: where the dir must be created""" | |
97 | |
98 for i in range(2): | |
99 subdir = join(path,'subdir_%d' % i) | |
100 makedirs(subdir) | |
101 for j in range(2): | |
102 makeRandomFile(join(subdir, 'file_%d' % j), S10K) | |
103 for i in range(2): | |
104 makeRandomFile(join(path,'file_%d' % i), S10K) | |
105 | |
106 class TestCopyCases(unittest.TestCase): | |
107 """Test basic copy use cases, using gcp externally | |
108 gcp must be available in the PATH""" | |
109 #TODO: check journal | |
110 | |
111 def setUp(self): | |
112 self.ori_dir = getcwd() | |
113 self.tmp_dir = tempfile.mkdtemp() | |
114 chdir(self.tmp_dir) | |
115 | |
116 def tearDown(self): | |
117 chdir(self.ori_dir) | |
118 rmtree(self.tmp_dir) | |
119 | |
120 def test_one_file_copy(self): | |
121 """Copy one file and test the result""" | |
122 makeRandomFile('file_1', S10K) | |
123 ori_sum = sha1sum('file_1') | |
124 ret = system("gcp file_1 file_2") | |
125 self.assertEqual(ret,0) | |
126 dest_sum = sha1sum('file_2') | |
127 self.assertEqual(ori_sum, dest_sum) | |
128 | |
129 def test_one_file_copy_already_exists(self): | |
130 """Check that an existing file is not overwritten""" | |
131 makeRandomFile('file_1', S10K) | |
132 makeRandomFile('file_2', S10K) | |
133 file_2_sum = sha1sum('file_2') | |
134 ret = system("gcp file_1 file_2") | |
135 self.assertNotEqual(ret,0) | |
136 file_2_sum_bis = sha1sum('file_2') | |
137 self.assertEqual(file_2_sum, file_2_sum_bis) | |
138 | |
139 def test_one_file_copy_already_exists_force(self): | |
140 """Check that an existing file is overwritten with --force""" | |
141 makeRandomFile('file_1', S10K) | |
142 makeRandomFile('file_2', S10K) | |
143 file_1_sum = sha1sum('file_1') | |
144 ret = system("gcp -f file_1 file_2") | |
145 self.assertEqual(ret,0) | |
146 file_2_sum_bis = sha1sum('file_2') | |
147 self.assertEqual(file_1_sum, file_2_sum_bis) | |
148 | |
149 def test_one_dir_copy(self): | |
150 """Check copy of one dir to a non existant path""" | |
151 makeTestDir('dir_1') | |
152 check_1 = dirCheck('dir_1') | |
153 ret = system("gcp -r dir_1 dir_2") | |
154 self.assertEqual(ret,0) | |
155 check_2 = dirCheck('dir_2') | |
156 self.assertEqual(check_1, check_2) | |
157 | |
158 def test_one_dir_copy_nocopy(self): | |
159 """Check that a dir is not copied without the recursive option""" | |
160 makeTestDir('dir_1') | |
161 check_before = dirCheck('.') | |
162 ret = system("gcp dir_1 dir_2") | |
163 self.assertEqual(ret,0) | |
164 check_after = dirCheck('.') | |
165 self.assertEqual(check_before, check_after) | |
166 | |
167 def test_one_dir_copy_existing_dest(self): | |
168 """Check that a dir is copied inside an existing destination""" | |
169 makeTestDir('dir_1') | |
170 mkdir('dir_2') | |
171 check_1 = dirCheck('dir_1') | |
172 ret = system("gcp -r dir_1 dir_2") | |
173 self.assertEqual(ret,0) | |
174 self.assertEqual(listdir('dir_2'), ['dir_1']) | |
175 check_2 = dirCheck('dir_2/dir_1') | |
176 self.assertEqual(check_1, check_2) | |
177 | |
178 def test_mixt_copy_existing_dest(self): | |
179 """Check that a mixt copy (files + dir) to an existing dest work as expected""" | |
180 for i in range(2): | |
181 makeRandomFile('file_%d' % i, S10K) | |
182 makeTestDir('dir_%d' % i) | |
183 check_1 = dirCheck('.') | |
184 mkdir('dest_dir') | |
185 ret = system("gcp -r file_0 file_1 dir_0 dir_1 dest_dir") | |
186 self.assertEqual(ret,0) | |
187 check_2 = dirCheck('dest_dir') | |
188 self.assertEqual(check_1, check_2) | |
189 | |
190 def test_mixt_copy_nonexisting_dest(self): | |
191 """Check that a mixt copy (files + dir) to an non existing dest work as expected | |
192 /!\\ the behavious is different of the one of cp in this case ! (cp doesn't copy at all, while gcp create the dest)""" | |
193 for i in range(2): | |
194 makeRandomFile('file_%d' % i, S10K) | |
195 makeTestDir('dir_%d' % i) | |
196 check_1 = dirCheck('.') | |
197 ret = system("gcp -r file_0 file_1 dir_0 dir_1 dest_dir") | |
198 self.assertEqual(ret,0) | |
199 check_2 = dirCheck('dest_dir') | |
200 self.assertEqual(check_1, check_2) | |
201 | |
202 def test_mixt_copy_existing_dest_nonrecursive(self): | |
203 """Check that a mixt copy (files + dir) to an existing dest without the recursive option work as expected""" | |
204 for i in range(2): | |
205 makeRandomFile('file_%d' % i, S10K) | |
206 makeTestDir('dir_%d' % i) | |
207 mkdir('dest_dir') | |
208 ret = system("gcp file_0 file_1 dir_0 dir_1 dest_dir") | |
209 self.assertEqual(ret,0) | |
210 self.assertEqual(set(listdir('dest_dir')), set(['file_0', 'file_1'])) | |
211 self.assertEqual(sha1sum('file_0'), sha1sum('dest_dir/file_0')) | |
212 self.assertEqual(sha1sum('file_1'), sha1sum('dest_dir/file_1')) | |
213 | |
214 def test_mixt_copy_nonexisting_dest_nonrecursive(self): | |
215 """Check that a mixt copy (files + dir) to an existing dest without the recursive option work as expected""" | |
216 for i in range(2): | |
217 makeRandomFile('file_%d' % i, S10K) | |
218 makeTestDir('dir_%d' % i) | |
219 check_before = dirCheck('.') | |
220 ret = system("gcp file_0 file_1 dir_0 dir_1 dest_dir") | |
221 self.assertEqual(ret >> 8, 1) | |
222 check_after = dirCheck('.') | |
223 self.assertEqual(check_before, check_after) | |
224 | |
225 if __name__ == '__main__': | |
226 unittest.main() |