Commit c92d1b34 authored by Jim Fulton's avatar Jim Fulton

Merged the tseaver-better_repozo_tests branch.

parent 30fd2029
...@@ -93,6 +93,9 @@ READCHUNK = 16 * 1024 ...@@ -93,6 +93,9 @@ READCHUNK = 16 * 1024
VERBOSE = False VERBOSE = False
class WouldOverwriteFiles(Exception):
pass
def usage(code, msg=''): def usage(code, msg=''):
outfp = sys.stderr outfp = sys.stderr
if code == 0: if code == 0:
...@@ -301,7 +304,9 @@ def gen_filename(options, ext=None): ...@@ -301,7 +304,9 @@ def gen_filename(options, ext=None):
ext = '.deltafs' ext = '.deltafs'
if options.gzip: if options.gzip:
ext += 'z' ext += 'z'
t = time.gmtime()[:6] + (ext,) # Hook for testing
now = getattr(options, 'test_now', time.gmtime()[:6])
t = now + (ext,)
return '%04d-%02d-%02d-%02d-%02d-%02d%s' % t return '%04d-%02d-%02d-%02d-%02d-%02d%s' % t
# Return a list of files needed to reproduce state at time options.date. # Return a list of files needed to reproduce state at time options.date.
...@@ -419,8 +424,7 @@ def do_full_backup(options): ...@@ -419,8 +424,7 @@ def do_full_backup(options):
options.full = True options.full = True
dest = os.path.join(options.repository, gen_filename(options)) dest = os.path.join(options.repository, gen_filename(options))
if os.path.exists(dest): if os.path.exists(dest):
print >> sys.stderr, 'Cannot overwrite existing file:', dest raise WouldOverwriteFiles('Cannot overwrite existing file: %s' % dest)
sys.exit(2)
log('writing full backup: %s bytes to %s', pos, dest) log('writing full backup: %s bytes to %s', pos, dest)
sum = copyfile(options, dest, 0, pos) sum = copyfile(options, dest, 0, pos)
# Write the data file for this full backup # Write the data file for this full backup
...@@ -447,8 +451,7 @@ def do_incremental_backup(options, reposz, repofiles): ...@@ -447,8 +451,7 @@ def do_incremental_backup(options, reposz, repofiles):
options.full = False options.full = False
dest = os.path.join(options.repository, gen_filename(options)) dest = os.path.join(options.repository, gen_filename(options))
if os.path.exists(dest): if os.path.exists(dest):
print >> sys.stderr, 'Cannot overwrite existing file:', dest raise WouldOverwriteFiles('Cannot overwrite existing file: %s' % dest)
sys.exit(2)
log('writing incremental: %s bytes to %s', pos-reposz, dest) log('writing incremental: %s bytes to %s', pos-reposz, dest)
sum = copyfile(options, dest, reposz, pos - reposz) sum = copyfile(options, dest, reposz, pos - reposz)
# The first file in repofiles points to the last full backup. Use this to # The first file in repofiles points to the last full backup. Use this to
...@@ -570,7 +573,11 @@ def main(argv=None): ...@@ -570,7 +573,11 @@ def main(argv=None):
argv = sys.argv[1:] argv = sys.argv[1:]
options = parseargs(argv) options = parseargs(argv)
if options.mode == BACKUP: if options.mode == BACKUP:
try:
do_backup(options) do_backup(options)
except WouldOverwriteFiles, e:
print >> sys.stderr, str(e)
sys.exit(2)
else: else:
assert options.mode == RECOVER assert options.mode == RECOVER
do_recover(options) do_recover(options)
......
...@@ -13,12 +13,21 @@ ...@@ -13,12 +13,21 @@
############################################################################## ##############################################################################
import unittest import unittest
import os import os
try:
# the hashlib package is available from Python 2.5
from hashlib import md5
except ImportError:
# the md5 package is deprecated in Python 2.6
from md5 import new as md5
import ZODB.tests.util # layer used at class scope import ZODB.tests.util # layer used at class scope
_NOISY = os.environ.get('NOISY_REPOZO_TEST_OUTPUT') _NOISY = os.environ.get('NOISY_REPOZO_TEST_OUTPUT')
class OurDB: class OurDB:
_file_name = None
def __init__(self, dir): def __init__(self, dir):
from BTrees.OOBTree import OOBTree from BTrees.OOBTree import OOBTree
import transaction import transaction
...@@ -27,12 +36,13 @@ class OurDB: ...@@ -27,12 +36,13 @@ class OurDB:
conn = self.db.open() conn = self.db.open()
conn.root()['tree'] = OOBTree() conn.root()['tree'] = OOBTree()
transaction.commit() transaction.commit()
self.pos = self.db.storage._pos
self.close() self.close()
def getdb(self): def getdb(self):
from ZODB import DB from ZODB import DB
from ZODB.FileStorage import FileStorage from ZODB.FileStorage import FileStorage
storage_filename = os.path.join(self.dir, 'Data.fs') self._file_name = storage_filename = os.path.join(self.dir, 'Data.fs')
storage = FileStorage(storage_filename) storage = FileStorage(storage_filename)
self.db = DB(storage) self.db = DB(storage)
...@@ -63,10 +73,573 @@ class OurDB: ...@@ -63,10 +73,573 @@ class OurDB:
if keys: if keys:
del tree[keys[0]] del tree[keys[0]]
transaction.commit() transaction.commit()
self.pos = self.db.storage._pos
self.close() self.close()
class RepozoTests(unittest.TestCase): class FileopsBase:
def _makeChunks(self):
from ZODB.scripts.repozo import READCHUNK
return ['x' * READCHUNK, 'y' * READCHUNK, 'z']
def _makeFile(self, text=None):
from StringIO import StringIO
if text is None:
text = ''.join(self._makeChunks())
return StringIO(text)
class Test_dofile(unittest.TestCase, FileopsBase):
def _callFUT(self, func, fp, n):
from ZODB.scripts.repozo import dofile
return dofile(func, fp, n)
def test_empty_read_all(self):
chunks = []
file = self._makeFile('')
bytes = self._callFUT(chunks.append, file, None)
self.assertEqual(bytes, 0)
self.assertEqual(chunks, [])
def test_empty_read_count(self):
chunks = []
file = self._makeFile('')
bytes = self._callFUT(chunks.append, file, 42)
self.assertEqual(bytes, 0)
self.assertEqual(chunks, [])
def test_nonempty_read_all(self):
chunks = []
file = self._makeFile()
bytes = self._callFUT(chunks.append, file, None)
self.assertEqual(bytes, file.tell())
self.assertEqual(chunks, self._makeChunks())
def test_nonempty_read_count(self):
chunks = []
file = self._makeFile()
bytes = self._callFUT(chunks.append, file, 42)
self.assertEqual(bytes, 42)
self.assertEqual(chunks, ['x' * 42])
class Test_checksum(unittest.TestCase, FileopsBase):
def _callFUT(self, fp, n):
from ZODB.scripts.repozo import checksum
return checksum(fp, n)
def test_empty_read_all(self):
file = self._makeFile('')
sum = self._callFUT(file, None)
self.assertEqual(sum, md5('').hexdigest())
def test_empty_read_count(self):
file = self._makeFile('')
sum = self._callFUT(file, 42)
self.assertEqual(sum, md5('').hexdigest())
def test_nonempty_read_all(self):
file = self._makeFile()
sum = self._callFUT(file, None)
self.assertEqual(sum, md5(''.join(self._makeChunks())).hexdigest())
def test_nonempty_read_count(self):
chunks = []
file = self._makeFile()
sum = self._callFUT(file, 42)
self.assertEqual(sum, md5('x' * 42).hexdigest())
class OptionsTestBase:
_repository_directory = None
_data_directory = None
def tearDown(self):
if self._repository_directory is not None:
from shutil import rmtree
rmtree(self._repository_directory)
if self._data_directory is not None:
from shutil import rmtree
rmtree(self._data_directory)
def _makeOptions(self, **kw):
import tempfile
self._repository_directory = tempfile.mkdtemp()
class Options(object):
repository = self._repository_directory
def __init__(self, **kw):
self.__dict__.update(kw)
return Options(**kw)
class Test_copyfile(OptionsTestBase, unittest.TestCase):
def _callFUT(self, options, dest, start, n):
from ZODB.scripts.repozo import copyfile
return copyfile(options, dest, start, n)
def test_no_gzip(self):
options = self._makeOptions(gzip=False)
source = options.file = os.path.join(self._repository_directory,
'source.txt')
f = open(source, 'wb')
f.write('x' * 1000)
f.close()
target = os.path.join(self._repository_directory, 'target.txt')
sum = self._callFUT(options, target, 0, 100)
self.assertEqual(sum, md5('x' * 100).hexdigest())
self.assertEqual(open(target, 'rb').read(), 'x' * 100)
def test_w_gzip(self):
import gzip
options = self._makeOptions(gzip=True)
source = options.file = os.path.join(self._repository_directory,
'source.txt')
f = open(source, 'wb')
f.write('x' * 1000)
f.close()
target = os.path.join(self._repository_directory, 'target.txt')
sum = self._callFUT(options, target, 0, 100)
self.assertEqual(sum, md5('x' * 100).hexdigest())
self.assertEqual(gzip.open(target, 'rb').read(), 'x' * 100)
class Test_concat(OptionsTestBase, unittest.TestCase):
def _callFUT(self, files, ofp):
from ZODB.scripts.repozo import concat
return concat(files, ofp)
def _makeFile(self, name, text, gzip_file=False):
import gzip
import tempfile
if self._repository_directory is None:
self._repository_directory = tempfile.mkdtemp()
fqn = os.path.join(self._repository_directory, name)
if gzip_file:
f = gzip.open(fqn, 'wb')
else:
f = open(fqn, 'wb')
f.write(text)
f.flush()
f.close()
return fqn
def test_empty_list_no_ofp(self):
bytes, sum = self._callFUT([], None)
self.assertEqual(bytes, 0)
self.assertEqual(sum, md5('').hexdigest())
def test_w_plain_files_no_ofp(self):
files = [self._makeFile(x, x, False) for x in 'ABC']
bytes, sum = self._callFUT(files, None)
self.assertEqual(bytes, 3)
self.assertEqual(sum, md5('ABC').hexdigest())
def test_w_gzipped_files_no_ofp(self):
files = [self._makeFile('%s.fsz' % x, x, True) for x in 'ABC']
bytes, sum = self._callFUT(files, None)
self.assertEqual(bytes, 3)
self.assertEqual(sum, md5('ABC').hexdigest())
def test_w_ofp(self):
class Faux:
_closed = False
def __init__(self):
self._written = []
def write(self, data):
self._written.append(data)
def close(self):
self._closed = True
files = [self._makeFile(x, x, False) for x in 'ABC']
ofp = Faux()
bytes, sum = self._callFUT(files, ofp)
self.assertEqual(ofp._written, [x for x in 'ABC'])
self.failUnless(ofp._closed)
_marker = object()
class Test_gen_filename(OptionsTestBase, unittest.TestCase):
def _callFUT(self, options, ext=_marker):
from ZODB.scripts.repozo import gen_filename
if ext is _marker:
return gen_filename(options)
return gen_filename(options, ext)
def test_explicit_ext(self):
options = self._makeOptions(test_now = (2010, 5, 14, 12, 52, 31))
fn = self._callFUT(options, '.txt')
self.assertEqual(fn, '2010-05-14-12-52-31.txt')
def test_full_no_gzip(self):
options = self._makeOptions(test_now = (2010, 5, 14, 12, 52, 31),
full = True,
gzip = False,
)
fn = self._callFUT(options)
self.assertEqual(fn, '2010-05-14-12-52-31.fs')
def test_full_w_gzip(self):
options = self._makeOptions(test_now = (2010, 5, 14, 12, 52, 31),
full = True,
gzip = True,
)
fn = self._callFUT(options)
self.assertEqual(fn, '2010-05-14-12-52-31.fsz')
def test_incr_no_gzip(self):
options = self._makeOptions(test_now = (2010, 5, 14, 12, 52, 31),
full = False,
gzip = False,
)
fn = self._callFUT(options)
self.assertEqual(fn, '2010-05-14-12-52-31.deltafs')
def test_incr_w_gzip(self):
options = self._makeOptions(test_now = (2010, 5, 14, 12, 52, 31),
full = False,
gzip = True,
)
fn = self._callFUT(options)
self.assertEqual(fn, '2010-05-14-12-52-31.deltafsz')
class Test_find_files(OptionsTestBase, unittest.TestCase):
def _callFUT(self, options):
from ZODB.scripts.repozo import find_files
return find_files(options)
def _makeFile(self, hour, min, sec, ext):
# call _makeOptions first!
name = '2010-05-14-%02d-%02d-%02d%s' % (hour, min, sec, ext)
fqn = os.path.join(self._repository_directory, name)
f = open(fqn, 'wb')
f.write(name)
f.flush()
f.close()
return fqn
def test_explicit_date(self):
options = self._makeOptions(date='2010-05-14-13-30-57')
files = []
for h, m, s, e in [(2, 13, 14, '.fs'),
(2, 13, 14, '.dat'),
(3, 14, 15, '.deltafs'),
(4, 14, 15, '.deltafs'),
(5, 14, 15, '.deltafs'),
(12, 13, 14, '.fs'),
(12, 13, 14, '.dat'),
(13, 14, 15, '.deltafs'),
(14, 15, 16, '.deltafs'),
]:
files.append(self._makeFile(h, m, s, e))
found = self._callFUT(options)
# Older files, .dat file not included
self.assertEqual(found, [files[5], files[7]])
def test_using_gen_filename(self):
options = self._makeOptions(date=None,
test_now=(2010, 5, 14, 13, 30, 57))
files = []
for h, m, s, e in [(2, 13, 14, '.fs'),
(2, 13, 14, '.dat'),
(3, 14, 15, '.deltafs'),
(4, 14, 15, '.deltafs'),
(5, 14, 15, '.deltafs'),
(12, 13, 14, '.fs'),
(12, 13, 14, '.dat'),
(13, 14, 15, '.deltafs'),
(14, 15, 16, '.deltafs'),
]:
files.append(self._makeFile(h, m, s, e))
found = self._callFUT(options)
# Older files, .dat file not included
self.assertEqual(found, [files[5], files[7]])
class Test_scandat(OptionsTestBase, unittest.TestCase):
def _callFUT(self, repofiles):
from ZODB.scripts.repozo import scandat
return scandat(repofiles)
def test_no_dat_file(self):
options = self._makeOptions()
fsfile = os.path.join(self._repository_directory, 'foo.fs')
fn, startpos, endpos, sum = self._callFUT([fsfile])
self.assertEqual(fn, None)
self.assertEqual(startpos, None)
self.assertEqual(endpos, None)
self.assertEqual(sum, None)
def test_empty_dat_file(self):
options = self._makeOptions()
fsfile = os.path.join(self._repository_directory, 'foo.fs')
datfile = os.path.join(self._repository_directory, 'foo.dat')
open(datfile, 'wb').close()
fn, startpos, endpos, sum = self._callFUT([fsfile])
self.assertEqual(fn, None)
self.assertEqual(startpos, None)
self.assertEqual(endpos, None)
self.assertEqual(sum, None)
def test_single_line(self):
options = self._makeOptions()
fsfile = os.path.join(self._repository_directory, 'foo.fs')
datfile = os.path.join(self._repository_directory, 'foo.dat')
f = open(datfile, 'wb')
f.write('foo.fs 0 123 ABC\n')
f.flush()
f.close()
fn, startpos, endpos, sum = self._callFUT([fsfile])
self.assertEqual(fn, 'foo.fs')
self.assertEqual(startpos, 0)
self.assertEqual(endpos, 123)
self.assertEqual(sum, 'ABC')
def test_multiple_lines(self):
options = self._makeOptions()
fsfile = os.path.join(self._repository_directory, 'foo.fs')
datfile = os.path.join(self._repository_directory, 'foo.dat')
f = open(datfile, 'wb')
f.write('foo.fs 0 123 ABC\n')
f.write('bar.deltafs 123 456 DEF\n')
f.flush()
f.close()
fn, startpos, endpos, sum = self._callFUT([fsfile])
self.assertEqual(fn, 'bar.deltafs')
self.assertEqual(startpos, 123)
self.assertEqual(endpos, 456)
self.assertEqual(sum, 'DEF')
class Test_delete_old_backups(OptionsTestBase, unittest.TestCase):
def _makeOptions(self, filenames=()):
options = super(Test_delete_old_backups, self)._makeOptions()
for filename in filenames:
fqn = os.path.join(options.repository, filename)
f = open(fqn, 'wb')
f.write('testing delete_old_backups')
f.close()
return options
def _callFUT(self, options=None, filenames=()):
from ZODB.scripts.repozo import delete_old_backups
if options is None:
options = self._makeOptions(filenames)
return delete_old_backups(options)
def test_empty_dir_doesnt_raise(self):
self._callFUT()
self.assertEqual(len(os.listdir(self._repository_directory)), 0)
def test_no_repozo_files_doesnt_raise(self):
FILENAMES = ['bogus.txt', 'not_a_repozo_file']
self._callFUT(filenames=FILENAMES)
remaining = os.listdir(self._repository_directory)
self.assertEqual(len(remaining), len(FILENAMES))
for name in FILENAMES:
fqn = os.path.join(self._repository_directory, name)
self.failUnless(os.path.isfile(fqn))
def test_doesnt_remove_current_repozo_files(self):
FILENAMES = ['2009-12-20-10-08-03.fs', '2009-12-20-10-08-03.dat']
self._callFUT(filenames=FILENAMES)
remaining = os.listdir(self._repository_directory)
self.assertEqual(len(remaining), len(FILENAMES))
for name in FILENAMES:
fqn = os.path.join(self._repository_directory, name)
self.failUnless(os.path.isfile(fqn))
def test_removes_older_repozo_files(self):
OLDER_FULL = ['2009-12-20-00-01-03.fs', '2009-12-20-00-01-03.dat']
DELTAS = ['2009-12-21-00-00-01.deltafs', '2009-12-22-00-00-01.deltafs']
CURRENT_FULL = ['2009-12-23-00-00-01.fs', '2009-12-23-00-00-01.dat']
FILENAMES = OLDER_FULL + DELTAS + CURRENT_FULL
self._callFUT(filenames=FILENAMES)
remaining = os.listdir(self._repository_directory)
self.assertEqual(len(remaining), len(CURRENT_FULL))
for name in OLDER_FULL:
fqn = os.path.join(self._repository_directory, name)
self.failIf(os.path.isfile(fqn))
for name in DELTAS:
fqn = os.path.join(self._repository_directory, name)
self.failIf(os.path.isfile(fqn))
for name in CURRENT_FULL:
fqn = os.path.join(self._repository_directory, name)
self.failUnless(os.path.isfile(fqn))
def test_removes_older_repozo_files_zipped(self):
OLDER_FULL = ['2009-12-20-00-01-03.fsz', '2009-12-20-00-01-03.dat']
DELTAS = ['2009-12-21-00-00-01.deltafsz',
'2009-12-22-00-00-01.deltafsz']
CURRENT_FULL = ['2009-12-23-00-00-01.fsz', '2009-12-23-00-00-01.dat']
FILENAMES = OLDER_FULL + DELTAS + CURRENT_FULL
self._callFUT(filenames=FILENAMES)
remaining = os.listdir(self._repository_directory)
self.assertEqual(len(remaining), len(CURRENT_FULL))
for name in OLDER_FULL:
fqn = os.path.join(self._repository_directory, name)
self.failIf(os.path.isfile(fqn))
for name in DELTAS:
fqn = os.path.join(self._repository_directory, name)
self.failIf(os.path.isfile(fqn))
for name in CURRENT_FULL:
fqn = os.path.join(self._repository_directory, name)
self.failUnless(os.path.isfile(fqn))
class Test_do_full_backup(OptionsTestBase, unittest.TestCase):
def _callFUT(self, options):
from ZODB.scripts.repozo import do_full_backup
return do_full_backup(options)
def _makeDB(self):
import tempfile
datadir = self._data_directory = tempfile.mkdtemp()
return OurDB(self._data_directory)
def test_dont_overwrite_existing_file(self):
from ZODB.scripts.repozo import WouldOverwriteFiles
from ZODB.scripts.repozo import gen_filename
db = self._makeDB()
options = self._makeOptions(full=True,
file=db._file_name,
gzip=False,
test_now = (2010, 5, 14, 10, 51, 22),
)
f = open(os.path.join(self._repository_directory,
gen_filename(options)), 'w')
f.write('TESTING')
f.flush()
f.close()
self.assertRaises(WouldOverwriteFiles, self._callFUT, options)
def test_empty(self):
from ZODB.scripts.repozo import gen_filename
db = self._makeDB()
options = self._makeOptions(file=db._file_name,
gzip=False,
killold=False,
test_now = (2010, 5, 14, 10, 51, 22),
)
self._callFUT(options)
target = os.path.join(self._repository_directory,
gen_filename(options))
original = open(db._file_name, 'rb').read()
self.assertEqual(open(target, 'rb').read(), original)
datfile = os.path.join(self._repository_directory,
gen_filename(options, '.dat'))
self.assertEqual(open(datfile).read(),
'%s 0 %d %s\n' %
(target, len(original), md5(original).hexdigest()))
class Test_do_incremental_backup(OptionsTestBase, unittest.TestCase):
def _callFUT(self, options, reposz, repofiles):
from ZODB.scripts.repozo import do_incremental_backup
return do_incremental_backup(options, reposz, repofiles)
def _makeDB(self):
import tempfile
datadir = self._data_directory = tempfile.mkdtemp()
return OurDB(self._data_directory)
def test_dont_overwrite_existing_file(self):
from ZODB.scripts.repozo import WouldOverwriteFiles
from ZODB.scripts.repozo import gen_filename
from ZODB.scripts.repozo import find_files
db = self._makeDB()
options = self._makeOptions(full=False,
file=db._file_name,
gzip=False,
test_now = (2010, 5, 14, 10, 51, 22),
date = None,
)
f = open(os.path.join(self._repository_directory,
gen_filename(options)), 'w')
f.write('TESTING')
f.flush()
f.close()
repofiles = find_files(options)
self.assertRaises(WouldOverwriteFiles,
self._callFUT, options, 0, repofiles)
def test_no_changes(self):
from ZODB.scripts.repozo import gen_filename
db = self._makeDB()
oldpos = db.pos
options = self._makeOptions(file=db._file_name,
gzip=False,
killold=False,
test_now = (2010, 5, 14, 10, 51, 22),
date = None,
)
fullfile = os.path.join(self._repository_directory,
'2010-05-14-00-00-00.fs')
original = open(db._file_name, 'rb').read()
last = len(original)
f = open(fullfile, 'wb')
f.write(original)
f.flush()
f.close()
datfile = os.path.join(self._repository_directory,
'2010-05-14-00-00-00.dat')
repofiles = [fullfile, datfile]
self._callFUT(options, oldpos, repofiles)
target = os.path.join(self._repository_directory,
gen_filename(options))
self.assertEqual(open(target, 'rb').read(), '')
self.assertEqual(open(datfile).read(),
'%s %d %d %s\n' %
(target, oldpos, oldpos, md5('').hexdigest()))
def test_w_changes(self):
from ZODB.scripts.repozo import gen_filename
db = self._makeDB()
oldpos = db.pos
options = self._makeOptions(file=db._file_name,
gzip=False,
killold=False,
test_now = (2010, 5, 14, 10, 51, 22),
date = None,
)
fullfile = os.path.join(self._repository_directory,
'2010-05-14-00-00-00.fs')
original = open(db._file_name, 'rb').read()
f = open(fullfile, 'wb')
f.write(original)
f.flush()
f.close()
datfile = os.path.join(self._repository_directory,
'2010-05-14-00-00-00.dat')
repofiles = [fullfile, datfile]
db.mutate()
newpos = db.pos
self._callFUT(options, oldpos, repofiles)
target = os.path.join(self._repository_directory,
gen_filename(options))
f = open(db._file_name, 'rb')
f.seek(oldpos)
increment = f.read()
self.assertEqual(open(target, 'rb').read(), increment)
self.assertEqual(open(datfile).read(),
'%s %d %d %s\n' %
(target, oldpos, newpos,
md5(increment).hexdigest()))
class MonteCarloTests(unittest.TestCase):
layer = ZODB.tests.util.MininalTestLayer('repozo') layer = ZODB.tests.util.MininalTestLayer('repozo')
...@@ -96,7 +669,7 @@ class RepozoTests(unittest.TestCase): ...@@ -96,7 +669,7 @@ class RepozoTests(unittest.TestCase):
from ZODB.scripts.repozo import main from ZODB.scripts.repozo import main
main(argv) main(argv)
def testRepozo(self): def test_via_monte_carlo(self):
self.saved_snapshots = [] # list of (name, time) pairs for copies. self.saved_snapshots = [] # list of (name, time) pairs for copies.
for i in range(100): for i in range(100):
...@@ -168,94 +741,18 @@ class RepozoTests(unittest.TestCase): ...@@ -168,94 +741,18 @@ class RepozoTests(unittest.TestCase):
(correctpath, when, ' '.join(argv))) (correctpath, when, ' '.join(argv)))
self.assertEquals(fguts, gguts, msg) self.assertEquals(fguts, gguts, msg)
class Test_delete_old_backups(unittest.TestCase):
_repository_directory = None
def tearDown(self):
if self._repository_directory is not None:
from shutil import rmtree
rmtree(self._repository_directory)
def _callFUT(self, options=None, filenames=()):
from ZODB.scripts.repozo import delete_old_backups
if options is None:
options = self._makeOptions(filenames)
delete_old_backups(options)
def _makeOptions(self, filenames=()):
import tempfile
dir = self._repository_directory = tempfile.mkdtemp()
for filename in filenames:
fqn = os.path.join(dir, filename)
f = open(fqn, 'wb')
f.write('testing delete_old_backups')
f.close()
class Options(object):
repository = dir
return Options()
def test_empty_dir_doesnt_raise(self):
self._callFUT()
self.assertEqual(len(os.listdir(self._repository_directory)), 0)
def test_no_repozo_files_doesnt_raise(self):
FILENAMES = ['bogus.txt', 'not_a_repozo_file']
self._callFUT(filenames=FILENAMES)
remaining = os.listdir(self._repository_directory)
self.assertEqual(len(remaining), len(FILENAMES))
for name in FILENAMES:
fqn = os.path.join(self._repository_directory, name)
self.failUnless(os.path.isfile(fqn))
def test_doesnt_remove_current_repozo_files(self):
FILENAMES = ['2009-12-20-10-08-03.fs', '2009-12-20-10-08-03.dat']
self._callFUT(filenames=FILENAMES)
remaining = os.listdir(self._repository_directory)
self.assertEqual(len(remaining), len(FILENAMES))
for name in FILENAMES:
fqn = os.path.join(self._repository_directory, name)
self.failUnless(os.path.isfile(fqn))
def test_removes_older_repozo_files(self):
OLDER_FULL = ['2009-12-20-00-01-03.fs', '2009-12-20-00-01-03.dat']
DELTAS = ['2009-12-21-00-00-01.deltafs', '2009-12-22-00-00-01.deltafs']
CURRENT_FULL = ['2009-12-23-00-00-01.fs', '2009-12-23-00-00-01.dat']
FILENAMES = OLDER_FULL + DELTAS + CURRENT_FULL
self._callFUT(filenames=FILENAMES)
remaining = os.listdir(self._repository_directory)
self.assertEqual(len(remaining), len(CURRENT_FULL))
for name in OLDER_FULL:
fqn = os.path.join(self._repository_directory, name)
self.failIf(os.path.isfile(fqn))
for name in DELTAS:
fqn = os.path.join(self._repository_directory, name)
self.failIf(os.path.isfile(fqn))
for name in CURRENT_FULL:
fqn = os.path.join(self._repository_directory, name)
self.failUnless(os.path.isfile(fqn))
def test_removes_older_repozo_files_zipped(self):
OLDER_FULL = ['2009-12-20-00-01-03.fsz', '2009-12-20-00-01-03.dat']
DELTAS = ['2009-12-21-00-00-01.deltafsz',
'2009-12-22-00-00-01.deltafsz']
CURRENT_FULL = ['2009-12-23-00-00-01.fsz', '2009-12-23-00-00-01.dat']
FILENAMES = OLDER_FULL + DELTAS + CURRENT_FULL
self._callFUT(filenames=FILENAMES)
remaining = os.listdir(self._repository_directory)
self.assertEqual(len(remaining), len(CURRENT_FULL))
for name in OLDER_FULL:
fqn = os.path.join(self._repository_directory, name)
self.failIf(os.path.isfile(fqn))
for name in DELTAS:
fqn = os.path.join(self._repository_directory, name)
self.failIf(os.path.isfile(fqn))
for name in CURRENT_FULL:
fqn = os.path.join(self._repository_directory, name)
self.failUnless(os.path.isfile(fqn))
def test_suite(): def test_suite():
return unittest.TestSuite([ return unittest.TestSuite([
unittest.makeSuite(RepozoTests), unittest.makeSuite(Test_dofile),
unittest.makeSuite(Test_checksum),
unittest.makeSuite(Test_copyfile),
unittest.makeSuite(Test_concat),
unittest.makeSuite(Test_gen_filename),
unittest.makeSuite(Test_find_files),
unittest.makeSuite(Test_scandat),
unittest.makeSuite(Test_delete_old_backups), unittest.makeSuite(Test_delete_old_backups),
unittest.makeSuite(Test_do_full_backup),
unittest.makeSuite(Test_do_incremental_backup),
unittest.makeSuite(MonteCarloTests),
]) ])
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment