Commit 7f4c41b3 authored by Tres Seaver's avatar Tres Seaver

Suppress resource warnings under Py3k for 'test_repozo'.

parent 2f429e61
......@@ -31,6 +31,29 @@ except ImportError:
_NOISY = os.environ.get('NOISY_REPOZO_TEST_OUTPUT')
class _GzipCloser(object):
def __init__(self, fqn, mode):
import gzip
self._opened = gzip.open(fqn, mode)
def __enter__(self):
return self._opened
def __exit__(self, exc_type, exc_value, traceback):
self._opened.close()
def _write_file(name, bits, mode='wb'):
with open(name, mode) as f:
f.write(bits)
f.flush()
def _read_file(name, mode='rb'):
with open(name, mode) as f:
return f.read()
class OurDB:
_file_name = None
......@@ -182,7 +205,6 @@ class OptionsTestBase:
self.__dict__.update(kw)
return Options(**kw)
class Test_copyfile(OptionsTestBase, unittest.TestCase):
def _callFUT(self, options, dest, start, n):
......@@ -193,26 +215,22 @@ class Test_copyfile(OptionsTestBase, unittest.TestCase):
options = self._makeOptions(gzip=False)
source = options.file = os.path.join(self._repository_directory,
'source.txt')
f = open(source, 'wb')
f.write(b'x' * 1000)
f.close()
_write_file(source, b'x' * 1000)
target = os.path.join(self._repository_directory, 'target.txt')
sum = self._callFUT(options, target, 0, 100)
self.assertEqual(sum, md5(b'x' * 100).hexdigest())
self.assertEqual(open(target, 'rb').read(), b'x' * 100)
self.assertEqual(_read_file(target), b'x' * 100)
def test_w_gzip(self):
import gzip
options = self._makeOptions(gzip=True)
source = options.file = os.path.join(self._repository_directory,
'source.txt')
f = open(source, 'wb')
f.write(b'x' * 1000)
f.close()
_write_file(source, b'x' * 1000)
target = os.path.join(self._repository_directory, 'target.txt')
sum = self._callFUT(options, target, 0, 100)
self.assertEqual(sum, md5(b'x' * 100).hexdigest())
self.assertEqual(gzip.open(target, 'rb').read(), b'x' * 100)
with _GzipCloser(target, 'rb') as f:
self.assertEqual(f.read(), b'x' * 100)
class Test_concat(OptionsTestBase, unittest.TestCase):
......@@ -222,18 +240,17 @@ class Test_concat(OptionsTestBase, unittest.TestCase):
return concat(files, ofp)
def _makeFile(self, name, text, gzip_file=False):
import gzip
import tempfile
if self._repository_directory is None:
self._repository_directory = tempfile.mkdtemp()
fqn = os.path.join(self._repository_directory, name)
if gzip_file:
f = gzip.open(fqn, 'wb')
_opener = _GzipCloser
else:
f = open(fqn, 'wb')
f.write(text)
f.flush()
f.close()
_opener = open
with _opener(fqn, 'wb') as f:
f.write(text)
f.flush()
return fqn
def test_empty_list_no_ofp(self):
......@@ -327,10 +344,7 @@ class Test_find_files(OptionsTestBase, unittest.TestCase):
# call _makeOptions first!
name = '2010-05-14-%02d-%02d-%02d%s' % (hour, min, sec, ext)
fqn = os.path.join(self._repository_directory, name)
f = open(fqn, 'wb')
f.write(name.encode())
f.flush()
f.close()
_write_file(fqn, name.encode())
return fqn
def test_no_files(self):
......@@ -395,7 +409,7 @@ class Test_scandat(OptionsTestBase, unittest.TestCase):
options = self._makeOptions()
fsfile = os.path.join(self._repository_directory, 'foo.fs')
datfile = os.path.join(self._repository_directory, 'foo.dat')
open(datfile, 'wb').close()
_write_file(datfile, b'')
fn, startpos, endpos, sum = self._callFUT([fsfile])
self.assertEqual(fn, None)
self.assertEqual(startpos, None)
......@@ -406,10 +420,7 @@ class Test_scandat(OptionsTestBase, unittest.TestCase):
options = self._makeOptions()
fsfile = os.path.join(self._repository_directory, 'foo.fs')
datfile = os.path.join(self._repository_directory, 'foo.dat')
f = open(datfile, 'wb')
f.write(b'foo.fs 0 123 ABC\n')
f.flush()
f.close()
_write_file(datfile, b'foo.fs 0 123 ABC\n')
fn, startpos, endpos, sum = self._callFUT([fsfile])
self.assertEqual(fn, 'foo.fs')
self.assertEqual(startpos, 0)
......@@ -420,11 +431,8 @@ class Test_scandat(OptionsTestBase, unittest.TestCase):
options = self._makeOptions()
fsfile = os.path.join(self._repository_directory, 'foo.fs')
datfile = os.path.join(self._repository_directory, 'foo.dat')
f = open(datfile, 'wb')
f.write(b'foo.fs 0 123 ABC\n')
f.write(b'bar.deltafs 123 456 DEF\n')
f.flush()
f.close()
_write_file(datfile, b'foo.fs 0 123 ABC\n'
b'bar.deltafs 123 456 DEF\n')
fn, startpos, endpos, sum = self._callFUT([fsfile])
self.assertEqual(fn, 'bar.deltafs')
self.assertEqual(startpos, 123)
......@@ -438,9 +446,7 @@ class Test_delete_old_backups(OptionsTestBase, unittest.TestCase):
options = super(Test_delete_old_backups, self)._makeOptions()
for filename in filenames:
fqn = os.path.join(options.repository, filename)
f = open(fqn, 'wb')
f.write(b'testing delete_old_backups')
f.close()
_write_file(fqn, b'testing delete_old_backups')
return options
def _callFUT(self, options=None, filenames=()):
......@@ -551,11 +557,8 @@ class Test_do_full_backup(OptionsTestBase, unittest.TestCase):
gzip=False,
test_now = (2010, 5, 14, 10, 51, 22),
)
f = open(os.path.join(self._repository_directory,
gen_filename(options)), 'w')
f.write('TESTING')
f.flush()
f.close()
fqn = os.path.join(self._repository_directory, gen_filename(options))
_write_file(fqn, b'TESTING')
self.assertRaises(WouldOverwriteFiles, self._callFUT, options)
def test_empty(self):
......@@ -571,11 +574,11 @@ class Test_do_full_backup(OptionsTestBase, unittest.TestCase):
self._callFUT(options)
target = os.path.join(self._repository_directory,
gen_filename(options))
original = open(db._file_name, 'rb').read()
self.assertEqual(open(target, 'rb').read(), original)
original = _read_file(db._file_name)
self.assertEqual(_read_file(target), original)
datfile = os.path.join(self._repository_directory,
gen_filename(options, '.dat'))
self.assertEqual(open(datfile).read(),
self.assertEqual(_read_file(datfile, mode='r'), #XXX 'rb'?
'%s 0 %d %s\n' %
(target, len(original), md5(original).hexdigest()))
ndxfile = os.path.join(self._repository_directory,
......@@ -611,11 +614,8 @@ class Test_do_incremental_backup(OptionsTestBase, unittest.TestCase):
test_now = (2010, 5, 14, 10, 51, 22),
date = None,
)
f = open(os.path.join(self._repository_directory,
gen_filename(options)), 'w')
f.write('TESTING')
f.flush()
f.close()
fqn = os.path.join(self._repository_directory, gen_filename(options))
_write_file(fqn, b'TESTING')
repofiles = find_files(options)
self.assertRaises(WouldOverwriteFiles,
self._callFUT, options, 0, repofiles)
......@@ -634,20 +634,17 @@ class Test_do_incremental_backup(OptionsTestBase, unittest.TestCase):
)
fullfile = os.path.join(self._repository_directory,
'2010-05-14-00-00-00.fs')
original = open(db._file_name, 'rb').read()
original = _read_file(db._file_name)
last = len(original)
f = open(fullfile, 'wb')
f.write(original)
f.flush()
f.close()
_write_file(fullfile, original)
datfile = os.path.join(self._repository_directory,
'2010-05-14-00-00-00.dat')
repofiles = [fullfile, datfile]
self._callFUT(options, oldpos, repofiles)
target = os.path.join(self._repository_directory,
gen_filename(options))
self.assertEqual(open(target, 'rb').read(), b'')
self.assertEqual(open(datfile).read(),
self.assertEqual(_read_file(target), b'')
self.assertEqual(_read_file(datfile, mode='r'), #XXX mode='rb'?
'%s %d %d %s\n' %
(target, oldpos, oldpos, md5(b'').hexdigest()))
ndxfile = os.path.join(self._repository_directory,
......@@ -674,11 +671,8 @@ class Test_do_incremental_backup(OptionsTestBase, unittest.TestCase):
)
fullfile = os.path.join(self._repository_directory,
'2010-05-14-00-00-00.fs')
original = open(db._file_name, 'rb').read()
f = open(fullfile, 'wb')
f.write(original)
f.flush()
f.close()
original = _read_file(db._file_name)
f = _write_file(fullfile, original)
datfile = os.path.join(self._repository_directory,
'2010-05-14-00-00-00.dat')
repofiles = [fullfile, datfile]
......@@ -687,11 +681,11 @@ class Test_do_incremental_backup(OptionsTestBase, unittest.TestCase):
self._callFUT(options, oldpos, repofiles)
target = os.path.join(self._repository_directory,
gen_filename(options))
f = open(db._file_name, 'rb')
f.seek(oldpos)
increment = f.read()
self.assertEqual(open(target, 'rb').read(), increment)
self.assertEqual(open(datfile).read(),
with open(db._file_name, 'rb') as f:
f.seek(oldpos)
increment = f.read()
self.assertEqual(_read_file(target), increment)
self.assertEqual(_read_file(datfile, mode='r'), #XXX mode='rb'?
'%s %d %d %s\n' %
(target, oldpos, newpos,
md5(increment).hexdigest()))
......@@ -717,10 +711,7 @@ class Test_do_recover(OptionsTestBase, unittest.TestCase):
if text is None:
text = name
fqn = os.path.join(self._repository_directory, name)
f = open(fqn, 'wb')
f.write(text.encode())
f.flush()
f.close()
f = _write_file(fqn, text.encode())
return fqn
def test_no_files(self):
......@@ -756,7 +747,7 @@ class Test_do_recover(OptionsTestBase, unittest.TestCase):
self._makeFile(2, 3, 4, '.fs', 'AAA')
self._makeFile(4, 5, 6, '.fs', 'BBB')
self._callFUT(options)
self.assertEqual(open(output, 'rb').read(), b'BBB')
self.assertEqual(_read_file(output), b'BBB')
def test_w_full_backup_latest_index(self):
import tempfile
......@@ -769,8 +760,8 @@ class Test_do_recover(OptionsTestBase, unittest.TestCase):
self._makeFile(4, 5, 6, '.fs', 'BBB')
self._makeFile(4, 5, 6, '.index', 'CCC')
self._callFUT(options)
self.assertEqual(open(output, 'rb').read(), b'BBB')
self.assertEqual(open(index, 'rb').read(), b'CCC')
self.assertEqual(_read_file(output), b'BBB')
self.assertEqual(_read_file(index), b'CCC')
def test_w_incr_backup_latest_no_index(self):
import tempfile
......@@ -782,7 +773,7 @@ class Test_do_recover(OptionsTestBase, unittest.TestCase):
self._makeFile(2, 3, 4, '.fs', 'AAA')
self._makeFile(4, 5, 6, '.deltafs', 'BBB')
self._callFUT(options)
self.assertEqual(open(output, 'rb').read(), b'AAABBB')
self.assertEqual(_read_file(output), b'AAABBB')
def test_w_incr_backup_latest_index(self):
import tempfile
......@@ -795,8 +786,8 @@ class Test_do_recover(OptionsTestBase, unittest.TestCase):
self._makeFile(4, 5, 6, '.deltafs', 'BBB')
self._makeFile(4, 5, 6, '.index', 'CCC')
self._callFUT(options)
self.assertEqual(open(output, 'rb').read(), b'AAABBB')
self.assertEqual(open(index, 'rb').read(), b'CCC')
self.assertEqual(_read_file(output), b'AAABBB')
self.assertEqual(_read_file(index), b'CCC')
class MonteCarloTests(unittest.TestCase):
......@@ -890,12 +881,8 @@ class MonteCarloTests(unittest.TestCase):
self._callRepozoMain(argv)
# check restored file content is equal to file that was backed up
f = open(correctpath, 'rb')
g = open(restoredfile, 'rb')
fguts = f.read()
gguts = g.read()
f.close()
g.close()
fguts = _read_file(correctpath)
gguts = _read_file(restoredfile)
msg = ("guts don't match\ncorrectpath=%r when=%r\n cmd=%r" %
(correctpath, when, ' '.join(argv)))
self.assertEqual(fguts, gguts, msg)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment