57
|
1 #!/usr/bin/python |
|
2 # -*- coding: utf-8 -*- |
|
3 |
|
4 """ |
|
5 gcp: Goffi's CoPier |
|
6 Copyright (C) 2010, 2011 Jérôme Poisson <goffi@goffi.org> |
|
7 |
|
8 This program is free software: you can redistribute it and/or modify |
|
9 it under the terms of the GNU General Public License as published by |
|
10 the Free Software Foundation, either version 3 of the License, or |
|
11 (at your option) any later version. |
|
12 |
|
13 This program is distributed in the hope that it will be useful, |
|
14 but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
16 GNU General Public License for more details. |
|
17 |
|
18 You should have received a copy of the GNU General Public License |
|
19 along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
20 """ |
|
21 |
|
22 from __future__ import with_statement |
|
23 import tempfile |
|
24 import unittest |
60
|
25 from os import getcwd, chdir, system, mkdir, makedirs, listdir |
|
26 from os.path import join, isdir |
57
|
27 from shutil import rmtree |
|
28 from random import randrange |
|
29 from hashlib import sha1 |
|
30 |
|
31 #size shorcuts |
|
32 S10K = 1024 * 10 |
|
33 S100K = 1024 * 100 |
|
34 S1M = 1024 * 1024 |
|
35 S10M = 1024 * 1024 * 10 |
|
36 S100M = 1024 * 1024 * 100 |
|
37 |
|
38 |
|
39 def sha1sum(filename, buf_size=4096): |
60
|
40 """Return the SHA1 hash of a file |
|
41 @param filename: path to the file |
|
42 @param buf_size: size of the buffer to use for calculation""" |
57
|
43 csum = sha1() |
|
44 with open(filename) as fd: |
|
45 data = fd.read(buf_size) |
|
46 while data: |
|
47 csum.update(data) |
|
48 data = fd.read(buf_size) |
|
49 return csum.digest() |
|
50 |
60
|
51 def dirCheck(dir_path): |
|
52 """Recursively calculate SHA1 sum of a dir |
|
53 @param path: path of the dir to check |
|
54 @return: a dict in the form [{filepath: sum,...}] |
|
55 """ |
|
56 def recursive_sum(directory, result): |
|
57 for current_path in listdir(directory): |
|
58 full_path = join(directory, current_path) |
|
59 if isdir(full_path): |
|
60 recursive_sum(full_path, result) |
|
61 else: |
|
62 result[full_path] = sha1sum(full_path) |
|
63 |
|
64 result = {} |
|
65 _ori_dir = getcwd() |
|
66 chdir(dir_path) |
|
67 recursive_sum(".", result) |
|
68 chdir(_ori_dir) |
|
69 return result |
|
70 |
57
|
71 #def makeRandomFile(path, size, buf_size=4096): |
|
72 # """Create a fake file |
|
73 # @param path: where the file is created |
|
74 # @param size: size of the file to create in bytes""" |
|
75 # def seq(size): |
|
76 # return ''.join(chr(randrange(256)) for i in range(size)) |
|
77 # fd = open(path, 'w') |
|
78 # for byte in range(size//buf_size): |
|
79 # fd.write(seq(buf_size)) |
|
80 # fd.write(seq(size%buf_size)) |
|
81 # fd.close() |
|
82 |
60
|
83 def makeRandomFile(path, size=S10K, buf_size=4096): |
57
|
84 """Create a fake file using /dev/urandom |
60
|
85 @param path: where the file must be created |
57
|
86 @param size: size of the file to create in bytes""" |
|
87 source = open('/dev/urandom','r') |
|
88 dest = open(path, 'w') |
|
89 for byte in range(size//buf_size): |
|
90 dest.write(source.read(buf_size)) |
|
91 dest.write(source.read(size%buf_size)) |
|
92 dest.close() |
|
93 |
60
|
94 def makeTestDir(path): |
|
95 """Helper method to easily create a test dir |
|
96 @param path: where the dir must be created""" |
|
97 |
|
98 for i in range(2): |
|
99 subdir = join(path,'subdir_%d' % i) |
|
100 makedirs(subdir) |
|
101 for j in range(2): |
|
102 makeRandomFile(join(subdir, 'file_%d' % j), S10K) |
|
103 for i in range(2): |
|
104 makeRandomFile(join(path,'file_%d' % i), S10K) |
57
|
105 |
|
106 class TestCopyCases(unittest.TestCase): |
|
107 """Test basic copy use cases, using gcp externally |
|
108 gcp must be available in the PATH""" |
|
109 #TODO: check journal |
|
110 |
|
111 def setUp(self): |
|
112 self.ori_dir = getcwd() |
|
113 self.tmp_dir = tempfile.mkdtemp() |
|
114 chdir(self.tmp_dir) |
|
115 |
|
116 def tearDown(self): |
|
117 chdir(self.ori_dir) |
|
118 rmtree(self.tmp_dir) |
|
119 |
|
120 def test_one_file_copy(self): |
|
121 """Copy one file and test the result""" |
|
122 makeRandomFile('file_1', S10K) |
|
123 ori_sum = sha1sum('file_1') |
|
124 ret = system("gcp file_1 file_2") |
|
125 self.assertEqual(ret,0) |
|
126 dest_sum = sha1sum('file_2') |
|
127 self.assertEqual(ori_sum, dest_sum) |
|
128 |
|
129 def test_one_file_copy_already_exists(self): |
|
130 """Check that an existing file is not overwritten""" |
|
131 makeRandomFile('file_1', S10K) |
|
132 makeRandomFile('file_2', S10K) |
|
133 file_2_sum = sha1sum('file_2') |
|
134 ret = system("gcp file_1 file_2") |
|
135 self.assertNotEqual(ret,0) |
|
136 file_2_sum_bis = sha1sum('file_2') |
|
137 self.assertEqual(file_2_sum, file_2_sum_bis) |
|
138 |
|
139 def test_one_file_copy_already_exists_force(self): |
|
140 """Check that an existing file is overwritten with --force""" |
|
141 makeRandomFile('file_1', S10K) |
|
142 makeRandomFile('file_2', S10K) |
|
143 file_1_sum = sha1sum('file_1') |
|
144 ret = system("gcp -f file_1 file_2") |
|
145 self.assertEqual(ret,0) |
|
146 file_2_sum_bis = sha1sum('file_2') |
|
147 self.assertEqual(file_1_sum, file_2_sum_bis) |
|
148 |
60
|
149 def test_one_dir_copy(self): |
|
150 """Check copy of one dir to a non existant path""" |
|
151 makeTestDir('dir_1') |
|
152 check_1 = dirCheck('dir_1') |
|
153 ret = system("gcp -r dir_1 dir_2") |
|
154 self.assertEqual(ret,0) |
|
155 check_2 = dirCheck('dir_2') |
|
156 self.assertEqual(check_1, check_2) |
|
157 |
|
158 def test_one_dir_copy_nocopy(self): |
|
159 """Check that a dir is not copied without the recursive option""" |
|
160 makeTestDir('dir_1') |
|
161 check_before = dirCheck('.') |
|
162 ret = system("gcp dir_1 dir_2") |
|
163 self.assertEqual(ret,0) |
|
164 check_after = dirCheck('.') |
|
165 self.assertEqual(check_before, check_after) |
|
166 |
|
167 def test_one_dir_copy_existing_dest(self): |
|
168 """Check that a dir is copied inside an existing destination""" |
|
169 makeTestDir('dir_1') |
|
170 mkdir('dir_2') |
|
171 check_1 = dirCheck('dir_1') |
|
172 ret = system("gcp -r dir_1 dir_2") |
|
173 self.assertEqual(ret,0) |
|
174 self.assertEqual(listdir('dir_2'), ['dir_1']) |
|
175 check_2 = dirCheck('dir_2/dir_1') |
|
176 self.assertEqual(check_1, check_2) |
|
177 |
|
178 def test_mixt_copy_existing_dest(self): |
|
179 """Check that a mixt copy (files + dir) to an existing dest work as expected""" |
|
180 for i in range(2): |
|
181 makeRandomFile('file_%d' % i, S10K) |
|
182 makeTestDir('dir_%d' % i) |
|
183 check_1 = dirCheck('.') |
|
184 mkdir('dest_dir') |
|
185 ret = system("gcp -r file_0 file_1 dir_0 dir_1 dest_dir") |
|
186 self.assertEqual(ret,0) |
|
187 check_2 = dirCheck('dest_dir') |
|
188 self.assertEqual(check_1, check_2) |
|
189 |
|
190 def test_mixt_copy_nonexisting_dest(self): |
|
191 """Check that a mixt copy (files + dir) to an non existing dest work as expected |
|
192 /!\\ the behavious is different of the one of cp in this case ! (cp doesn't copy at all, while gcp create the dest)""" |
|
193 for i in range(2): |
|
194 makeRandomFile('file_%d' % i, S10K) |
|
195 makeTestDir('dir_%d' % i) |
|
196 check_1 = dirCheck('.') |
|
197 ret = system("gcp -r file_0 file_1 dir_0 dir_1 dest_dir") |
|
198 self.assertEqual(ret,0) |
|
199 check_2 = dirCheck('dest_dir') |
|
200 self.assertEqual(check_1, check_2) |
|
201 |
|
202 def test_mixt_copy_existing_dest_nonrecursive(self): |
|
203 """Check that a mixt copy (files + dir) to an existing dest without the recursive option work as expected""" |
|
204 for i in range(2): |
|
205 makeRandomFile('file_%d' % i, S10K) |
|
206 makeTestDir('dir_%d' % i) |
|
207 mkdir('dest_dir') |
|
208 ret = system("gcp file_0 file_1 dir_0 dir_1 dest_dir") |
|
209 self.assertEqual(ret,0) |
|
210 self.assertEqual(set(listdir('dest_dir')), set(['file_0', 'file_1'])) |
|
211 self.assertEqual(sha1sum('file_0'), sha1sum('dest_dir/file_0')) |
|
212 self.assertEqual(sha1sum('file_1'), sha1sum('dest_dir/file_1')) |
|
213 |
|
214 def test_mixt_copy_nonexisting_dest_nonrecursive(self): |
|
215 """Check that a mixt copy (files + dir) to an existing dest without the recursive option work as expected""" |
|
216 for i in range(2): |
|
217 makeRandomFile('file_%d' % i, S10K) |
|
218 makeTestDir('dir_%d' % i) |
|
219 check_before = dirCheck('.') |
|
220 ret = system("gcp file_0 file_1 dir_0 dir_1 dest_dir") |
|
221 self.assertEqual(ret >> 8, 1) |
|
222 check_after = dirCheck('.') |
|
223 self.assertEqual(check_before, check_after) |
57
|
224 |
|
225 if __name__ == '__main__': |
|
226 unittest.main() |