Commit db7f03f0 authored by bescoto

Added metadata diffing, and an iterfile hash bugfix


git-svn-id: http://svn.savannah.nongnu.org/svn/rdiff-backup@669 2b77aa54-bcbc-44c9-a7ec-4f6cf2b41109
parent 58073772
......@@ -4,6 +4,9 @@ New in v1.1.1 (????/??/??)
rdiff-backup now writes SHA1 sums into its mirror_metadata file for
all regular files, and checks them when restoring.
The above greatly increases the size of the mirror_metadata files, so
they are now diffed for space efficiency, as suggested by Dean Gaudet.
Added two new comparison modes: full file (using --compare-full or
--compare-full-at-time) and by hash (--compare-hash and
--compare-hash-at-time).
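The hash checking mentioned above comes down to comparing a freshly computed SHA1 against the digest recorded in mirror_metadata. As a rough illustration only (the real --compare-hash code path goes through rdiff-backup's repository and connection machinery; the helper names below are hypothetical and the sketch assumes Python's hashlib rather than the project's own hash module):

import hashlib

def sha1_of_file(path, blocksize=65536):
    """Return the hex SHA1 of the file at path, read block by block."""
    digest = hashlib.sha1()
    fp = open(path, 'rb')
    try:
        while 1:
            buf = fp.read(blocksize)
            if not buf: break
            digest.update(buf)
    finally:
        fp.close()
    return digest.hexdigest()

def hash_compare(path, recorded_sha1):
    """Hypothetical helper: does the file on disk still match the recorded digest?"""
    return sha1_of_file(path) == recorded_sha1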
......
For comparing, check source filesystem
For comparing, check source filesystem's abilities
Clean up connection dropped message
Add metadata diffing
Make sure regress handles metadata diffs
Clean up compare reports
......
......@@ -55,7 +55,7 @@ def get_delta_sigrp(rp_signature, rp_new):
def get_delta_sigrp_hash(rp_signature, rp_new):
"""Like above but also calculate hash of new as close() value"""
log.Log("Getting delta with hash of %s with signature %s" %
log.Log("Getting delta (with hash) of %s with signature %s" %
(rp_new.path, rp_signature.get_indexpath()), 7)
return librsync.DeltaFile(rp_signature.open("rb"),
hash.FileWrapper(rp_new.open("rb")))
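hash.FileWrapper above is what makes the SHA1 available as the delta file's close() value: it passes reads through while hashing the bytes. A simplified stand-in for that role (not the project's class, which lives in rdiff_backup/hash.py and returns a Report object rather than a bare digest):

import hashlib

class HashingFileWrapper:
    """Minimal sketch: hash bytes as they are read, report the digest on close."""
    def __init__(self, fileobj):
        self.fileobj = fileobj
        self.digest = hashlib.sha1()
    def read(self, length=-1):
        buf = self.fileobj.read(length)
        self.digest.update(buf)
        return buf
    def close(self):
        self.fileobj.close()
        return self.digest.hexdigest()   # the real FileWrapper returns a hash.Report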
......
......@@ -206,12 +206,12 @@ def vet_request(request, arglist):
"""Examine request for security violations"""
#if Globals.server: sys.stderr.write(str(request) + "\n")
security_level = Globals.security_level
if security_level == "override": return
if Globals.restrict_path:
for arg in arglist:
if isinstance(arg, rpath.RPath): vet_rpath(arg)
if request.function_string in file_requests:
vet_filename(request, arglist)
if security_level == "override": return
if request.function_string in allowed_requests: return
if request.function_string in ("Globals.set", "Globals.set_local"):
if arglist[0] not in disallowed_server_globals: return
......
......@@ -443,6 +443,7 @@ class CacheCollatedPostProcess:
dir_rp, perms = self.dir_perms_list.pop()
dir_rp.chmod(perms)
self.metawriter.close()
metadata.ManagerObj.ConvertMetaToDiff()
if Globals.print_statistics: statistics.print_active_stats()
if Globals.file_statistics: statistics.FileStats.close()
......
......@@ -44,6 +44,7 @@ class UnwrapFile:
"o" for an object,
"f" for file,
"c" for a continution of a file,
"h" for the close value of a file
"e" for an exception, or
None if no more data can be read.
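The record types listed above travel over a simple framing: a one-character type code, a 7-byte length (encoded by C.long2str/C.str2long), then the payload; "o", "e" and the new "h" payloads are pickled, while "f" and "c" carry raw file data. A hedged sketch of a reader for that framing (str2long is taken as a parameter because its exact byte encoding lives in rdiff-backup's C module):

import cPickle

def read_record(fileobj, str2long):
    """Return (type, data) for the next record, or (None, None) at end of stream."""
    header = fileobj.read(8)
    if not header: return None, None
    assert len(header) == 8, "Header %s is only %d bytes" % (header, len(header))
    rectype, length = header[0], str2long(header[1:])
    payload = fileobj.read(length)
    if rectype in ("o", "e", "h"):   # object, exception, or close value: pickled
        return rectype, cPickle.loads(payload)
    assert rectype in ("f", "c")     # file data or a continuation of it: raw bytes
    return rectype, payload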
......@@ -57,7 +58,7 @@ class UnwrapFile:
assert None, "Header %s is only %d bytes" % (header, len(header))
type, length = header[0], C.str2long(header[1:])
buf = self.file.read(length)
if type in ("o", "e"): return type, cPickle.loads(buf)
if type in ("o", "e", "h"): return type, cPickle.loads(buf)
else:
assert type in ("f", "c")
return type, buf
......@@ -82,11 +83,7 @@ class IterWrappingFile(UnwrapFile):
type, data = self._get()
if not type: raise StopIteration
if type == "o" or type == "e": return data
elif type == "f":
file = IterVirtualFile(self, data)
if data: self.currently_in_file = file
else: self.currently_in_file = None
return file
elif type == "f": return IterVirtualFile(self, data)
else: raise IterFileException("Bad file type %s" % type)
......@@ -107,8 +104,10 @@ class IterVirtualFile(UnwrapFile):
"""
UnwrapFile.__init__(self, iwf.file)
self.iwf = iwf
iwf.currently_in_file = self
self.buffer = initial_data
self.closed = None
if not initial_data: self.set_close_val()
def read(self, length = -1):
"""Read length bytes from the file, updating buffers as necessary"""
......@@ -140,15 +139,24 @@ class IterVirtualFile(UnwrapFile):
self.buffer += data
return 1
else:
self.iwf.currently_in_file = None
self.set_close_val()
return None
def set_close_val(self):
"""Read the close value and clear currently_in_file"""
assert self.iwf.currently_in_file
self.iwf.currently_in_file = None
type, object = self.iwf._get()
assert type == 'h', type
self.close_value = object
def close(self):
"""Currently just reads whats left and discards it"""
while self.iwf.currently_in_file:
self.addtobuffer()
self.buffer = ""
self.closed = 1
return self.close_value
class FileWrappingIter:
......@@ -214,13 +222,16 @@ class FileWrappingIter:
buf = robust.check_common_error(self.read_error_handler,
self.currently_in_file.read,
[Globals.blocksize])
if buf == "" or buf is None:
self.currently_in_file.close()
self.currently_in_file = None
if buf is None: # error occurred above, encode exception
prefix_letter = "e"
buf = cPickle.dumps(self.last_exception, 1)
total = "".join((prefix_letter, C.long2str(long(len(buf))), buf))
if buf is None: # error occurred above, encode exception
self.currently_in_file = None
excstr = cPickle.dumps(self.last_exception, 1)
total = "".join(('e', C.long2str(long(len(excstr))), excstr))
else:
total = "".join((prefix_letter, C.long2str(long(len(buf))), buf))
if buf == "": # end of file
cstr = cPickle.dumps(self.currently_in_file.close(), 1)
self.currently_in_file = None
total += "".join(('h', C.long2str(long(len(cstr))), cstr))
self.array_buf.fromstring(total)
def read_error_handler(self, exc, blocksize):
......@@ -386,11 +397,7 @@ class FileToMiscIter(IterWrappingFile):
def get_file(self):
"""Read file object from file"""
type, data = self._get()
if type == "f":
file = IterVirtualFile(self, data)
if data: self.currently_in_file = file
else: self.currently_in_file = None
return file
if type == "f": return IterVirtualFile(self, data)
assert type == "e", "Expected type e, got %s" % (type,)
assert isinstance(data, Exception)
return ErrorFile(data)
......@@ -411,7 +418,7 @@ class FileToMiscIter(IterWrappingFile):
type, length = self.buf[0], C.str2long(self.buf[1:8])
data = self.buf[8:8+length]
self.buf = self.buf[8+length:]
if type in "oer": return type, cPickle.loads(data)
if type in "oerh": return type, cPickle.loads(data)
else: return type, data
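On the sending side, the change above means a wrapped file's close() value (for hash.FileWrapper, a Report carrying the SHA1) is pickled and shipped as an "h" record once the file is exhausted, so the remote IterVirtualFile can hand it back from close(). A rough sketch of that producer logic in isolation (long2str stands in for C.long2str; the real addtobuffer interleaves this with error handling and the array buffer):

import cPickle

def records_for_file(fileobj, long2str, blocksize=65536):
    """Yield serialized "f"/"c"/"h" records for one wrapped file."""
    prefix = "f"                        # first chunk is "f", later chunks are "c"
    while 1:
        buf = fileobj.read(blocksize)
        if buf == "":                   # end of file: ship the close() value
            cstr = cPickle.dumps(fileobj.close(), 1)
            yield "h" + long2str(long(len(cstr))) + cstr
            return
        yield prefix + long2str(long(len(buf))) + buf
        prefix = "c"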
......
......@@ -267,8 +267,10 @@ class FlatExtractor:
"""Yield all text records in order"""
while 1:
next_pos = self.get_next_pos()
if self.at_end:
if next_pos: yield self.buf[:next_pos]
break
yield self.buf[:next_pos]
if self.at_end: break
self.buf = self.buf[next_pos:]
assert not self.fileobj.close()
......@@ -428,16 +430,23 @@ class Manager:
def __init__(self):
"""Set listing of rdiff-backup-data dir"""
self.rplist = []
self.timerpmap = {}
self.timerpmap, self.prefixmap = {}, {}
for filename in Globals.rbdir.listdir():
rp = Globals.rbdir.append(filename)
if rp.isincfile():
self.rplist.append(rp)
time = rp.getinctime()
if self.timerpmap.has_key(time):
self.timerpmap[time].append(rp)
else: self.timerpmap[time] = [rp]
if rp.isincfile(): self.add_incrp(rp)
def add_incrp(self, rp):
"""Add rp to list of inc rps in the rbdir"""
self.rplist.append(rp)
time = rp.getinctime()
if self.timerpmap.has_key(time):
self.timerpmap[time].append(rp)
else: self.timerpmap[time] = [rp]
incbase = rp.getincbase_str()
if self.prefixmap.has_key(incbase): self.prefixmap[incbase].append(rp)
else: self.prefixmap[incbase] = [rp]
def _iter_helper(self, prefix, flatfileclass, time, restrict_index):
"""Used below to find the right kind of file by time"""
if not self.timerpmap.has_key(time): return None
......@@ -490,6 +499,8 @@ class Manager:
filename = '%s.%s.%s.gz' % (prefix, timestr, typestr)
rp = Globals.rbdir.append(filename)
assert not rp.lstat(), "File %s already exists!" % (rp.path,)
assert rp.isincfile()
self.add_incrp(rp)
return flatfileclass(rp, 'w')
def get_meta_writer(self, typestr, time):
......@@ -514,49 +525,112 @@ class Manager:
return metawriter # no need for a CombinedWriter
if Globals.eas_active: ea_writer = self.get_ea_writer(typestr, time)
else: ea_writer = None
if Globals.acls_active: acl_writer = self.get_acl_writer(typestr, time)
else: acl_writer = None
return CombinedWriter(metawriter, ea_writer, acl_writer)
ManagerObj = None # Set this later to Manager instance
def SetManager():
global ManagerObj
ManagerObj = Manager()
class PatchDiffMan(Manager):
"""Contains functions for patching and diffing metadata
def patch(*meta_iters):
"""Return an iterator of metadata files by combining all the given iters
To save space, we can record a full list of only the most recent
metadata, using the normal rdiff-backup reverse increment
strategy. Instead of using librsync to compute diffs, though, we
use our own technique so that the diff files are still
hand-editable.
The iters should be given as a list/tuple in reverse chronological
order. The earliest rorp in each iter will supersede all the
later ones.
A mirror_metadata diff has the same format as a mirror_metadata
snapshot. If the record for an index is missing from the diff, it
indicates no change from the original. If it is present it
replaces the mirror_metadata entry, unless it has Type None, which
indicates the record should be deleted from the original.
"""
for meta_tuple in rorpiter.CollateIterators(*meta_iters):
for i in range(len(meta_tuple)-1, -1, -1):
if meta_tuple[i]:
if meta_tuple[i].lstat(): yield meta_tuple[i]
break # move to next index
else: assert 0, "No valid rorps"
max_diff_chain = 9 # After this many diffs, make a new snapshot
def get_diffiter(self, new_iter, old_iter):
"""Iterate meta diffs of new_iter -> old_iter"""
for new_rorp, old_rorp in rorpiter.Collate2Iters(new_iter, old_iter):
if not old_rorp: yield rpath.RORPath(new_rorp.index)
elif not new_rorp or new_rorp.data != old_rorp.data:
# exact compare here, can't use == on rorps
yield old_rorp
def sorted_meta_inclist(self, min_time = 0):
"""Return list of mirror_metadata incs, reverse sorted by time"""
if not self.prefixmap.has_key('mirror_metadata'): return []
sortlist = [(rp.getinctime(), rp)
for rp in self.prefixmap['mirror_metadata']]
sortlist.sort()
sortlist.reverse()
return [rp for (time, rp) in sortlist if time >= min_time]
def check_needs_diff(self):
"""Check if we should diff, returns (new, old) rps, or (None, None)"""
inclist = self.sorted_meta_inclist()
assert len(inclist) >= 1
if len(inclist) == 1: return (None, None)
newrp, oldrp = inclist[:2]
assert newrp.getinctype() == oldrp.getinctype() == 'snapshot'
chainlen = 1
for rp in inclist[2:]:
if rp.getinctype() != 'diff': break
chainlen += 1
if chainlen >= self.max_diff_chain: return (None, None)
return (newrp, oldrp)
def ConvertMetaToDiff(self):
"""Replace a mirror snapshot with a diff if it's appropriate"""
newrp, oldrp = self.check_needs_diff()
if not newrp: return
log.Log("Writing mirror_metadata diff", 6)
diff_writer = self.get_meta_writer('diff', oldrp.getinctime())
new_iter = MetadataFile(newrp, 'r').get_objects()
old_iter = MetadataFile(oldrp, 'r').get_objects()
for diff_rorp in self.get_diffiter(new_iter, old_iter):
diff_writer.write_object(diff_rorp)
diff_writer.close() # includes sync
oldrp.delete()
def Convert_diff(cur_time, old_time):
"""Convert the metadata snapshot at old_time to diff format
def get_meta_at_time(self, time, restrict_index):
"""Get metadata rorp iter, possibly by patching with diffs"""
meta_iters = [MetadataFile(rp, 'r').get_objects(restrict_index)
for rp in self.relevant_meta_incs(time)]
if not meta_iters: return None
if len(meta_iters) == 1: return meta_iters[0]
return self.iterate_patched_meta(meta_iters)
def relevant_meta_incs(self, time):
"""Return list [snapshotrp, diffrps ...] time sorted"""
inclist = self.sorted_meta_inclist(min_time = time)
if not inclist: return inclist
assert inclist[-1].getinctime() == time, inclist[-1]
for i in range(len(inclist)-1, -1, -1):
if inclist[i].getinctype() == 'snapshot':
return inclist[i:]
assert 0, "Inclist %s contains no snapshots" % (inclist,)
def iterate_patched_meta(self, meta_iter_list):
"""Return an iter of metadata rorps by combining the given iters
The iters should be given as a list/tuple in reverse
chronological order. The earliest rorp in each iter will
supersede all the later ones.
The point is just to save space. The diff format is simple, just
include in the diff all of the older rorps that are different in
the two metadata rorps.
"""
for meta_tuple in rorpiter.CollateIterators(*meta_iter_list):
for i in range(len(meta_tuple)-1, -1, -1):
if meta_tuple[i]:
if meta_tuple[i].lstat(): yield meta_tuple[i]
break # move to next index
else: assert 0, "No valid rorps"
"""
rblist = [Globals.rbdir.append(filename)
for filename in robust.listrp(Globals.rbdir)]
cur_iter = MetadataFile.get_objects_at_time(
Globals.rbdir, cur_time, None, rblist)
old_iter = MetadataFile.get_objects_at_time(
Globals.rbdir, old_time, None, rblist)
assert cur_iter.type == old_iter.type == 'snapshot'
diff_file = MetadataFile.open_file(None, 1, 'diff', old_time)
for cur_rorp, old_rorp in rorpiter.Collate2Iters(cur_iter, old_iter):
XXX
ManagerObj = None # Set this later to Manager instance
def SetManager():
global ManagerObj
ManagerObj = PatchDiffMan()
import eas_acls # put at bottom to avoid python circularity bug
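To summarize the scheme PatchDiffMan implements: only the newest mirror_metadata file stays a full snapshot; the previous snapshot is rewritten as a diff holding just the records needed to get back to it, with an index-only placeholder ("Type None") marking entries that should disappear, and reading older metadata patches the snapshot with the chain of diffs, the oldest record winning per index. A standalone sketch of that idea over plain dicts keyed by index (not the project's rorp objects or file format):

def make_meta_diff(new_snapshot, old_snapshot):
    """Records needed to recover old_snapshot from new_snapshot (dicts keyed by index)."""
    diff = {}
    for index, old_rec in old_snapshot.items():
        if new_snapshot.get(index) != old_rec:
            diff[index] = old_rec          # changed, or missing from the new snapshot
    for index in new_snapshot:
        if index not in old_snapshot:
            diff[index] = None             # placeholder: delete this index when patching
    return diff

def patch_meta(new_snapshot, diffs):
    """Apply diffs (newest first) to the newest snapshot to rebuild an older one."""
    result = dict(new_snapshot)
    for diff in diffs:
        for index, rec in diff.items():
            if rec is None: result.pop(index, None)
            else: result[index] = rec
    return result

With a chain snapshot@T3, diff@T2, diff@T1, the metadata as of T1 would be patch_meta(snapshot, [diff_T2, diff_T1]).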
......@@ -177,7 +177,7 @@ class MirrorStruct:
"""
if rest_time is None: rest_time = cls._rest_time
if not metadata.ManagerObj: metadata.SetManager()
metadata.SetManager()
rorp_iter = metadata.ManagerObj.GetAtTime(rest_time,
cls.mirror_base.index)
if not rorp_iter:
......
......@@ -116,9 +116,9 @@ def copy_reg_file(rpin, rpout, compress = 0):
try:
if (rpout.conn is rpin.conn and
rpout.conn is not Globals.local_connection):
rpout.conn.rpath.copy_reg_file(rpin.path, rpout.path, compress)
v = rpout.conn.rpath.copy_reg_file(rpin.path, rpout.path, compress)
rpout.setdata()
return
return v
except AttributeError: pass
return rpout.write_from_fileobj(rpin.open("rb"), compress = compress)
......
......@@ -6,7 +6,7 @@ class RemoteMirrorTest(unittest.TestCase):
"""Test mirroring"""
def setUp(self):
"""Start server"""
Log.setverbosity(3)
Log.setverbosity(5)
Globals.change_source_perms = 1
SetConnections.UpdateGlobal('checkpoint_interval', 3)
user_group.init_user_mapping()
......
......@@ -141,16 +141,16 @@ user.empty
# Now write records corresponding to above rps into file
Globals.rbdir = tempdir
Time.setcurtime(10000)
ExtendedAttributesFile.open_file()
man = metadata.PatchDiffMan()
writer = man.get_ea_writer('snapshot', 10000)
for rp in [self.ea_testdir1, rp1, rp2, rp3]:
ea = ExtendedAttributes(rp.index)
ea.read_from_rp(rp)
ExtendedAttributesFile.write_object(ea)
ExtendedAttributesFile.close_file()
writer.write_object(ea)
writer.close()
# Read back records and compare
ea_iter = ExtendedAttributesFile.get_objects_at_time(tempdir, 10000)
ea_iter = man.get_eas_at_time(10000, None)
assert ea_iter, "No extended_attributes.<time> file found"
sample_ea_reread = ea_iter.next()
assert sample_ea_reread == self.sample_ea
......@@ -357,22 +357,23 @@ other::---
def testIterate(self):
"""Test writing several records and then reading them back"""
self.make_backup_dirs()
self.make_temp()
rp1 = self.acl_testdir1.append('1')
rp2 = self.acl_testdir1.append('2')
rp3 = self.acl_testdir1.append('3')
# Now write records corresponding to above rps into file
Globals.rbdir = tempdir
Time.setcurtime(10000)
AccessControlListFile.open_file()
man = metadata.PatchDiffMan()
writer = man.get_acl_writer('snapshot', 10000)
for rp in [self.acl_testdir1, rp1, rp2, rp3]:
acl = AccessControlLists(rp.index)
acl.read_from_rp(rp)
AccessControlListFile.write_object(acl)
AccessControlListFile.close_file()
writer.write_object(acl)
writer.close()
# Read back records and compare
acl_iter = AccessControlListFile.get_objects_at_time(tempdir, 10000)
acl_iter = man.get_acls_at_time(10000, None)
assert acl_iter, "No acl file found"
dir_acl_reread = acl_iter.next()
assert dir_acl_reread == self.dir_acl
......
import unittest, StringIO
from rdiff_backup import hash, rpath, regress, restore, metadata
from rdiff_backup import hash
from commontest import *
class HashTest(unittest.TestCase):
......@@ -81,7 +81,7 @@ class HashTest(unittest.TestCase):
in_rp1, hashlist1, in_rp2, hashlist2 = self.make_dirs()
Myrm("testfiles/output")
rdiff_backup(1, 1, in_rp1.path, "testfiles/output", 10000, "-v5")
rdiff_backup(1, 1, in_rp1.path, "testfiles/output", 10000, "-v3")
meta_prefix = rpath.RPath(Globals.local_connection,
"testfiles/output/rdiff-backup-data/mirror_metadata")
incs = restore.get_inclist(meta_prefix)
......@@ -90,14 +90,53 @@ class HashTest(unittest.TestCase):
hashlist = self.extract_hashs(metadata_rp)
assert hashlist == hashlist1, (hashlist1, hashlist)
rdiff_backup(1, 1, in_rp2.path, "testfiles/output", 20000, "-v7")
rdiff_backup(1, 1, in_rp2.path, "testfiles/output", 20000, "-v3")
incs = restore.get_inclist(meta_prefix)
assert len(incs) == 2
metadata_rp.delete() # easy way to find the other one
incs = restore.get_inclist(meta_prefix)
assert len(incs) == 1
hashlist = self.extract_hashs(incs[0])
if incs[0].getinctype() == 'snapshot': inc = incs[0]
else: inc = incs[1]
hashlist = self.extract_hashs(inc)
assert hashlist == hashlist2, (hashlist2, hashlist)
def test_rorpiter_xfer(self):
"""Test if hashes are transferred in files, rorpiter"""
#log.Log.setverbosity(5)
Globals.security_level = 'override'
conn = SetConnections.init_connection('python ./server.py .')
assert conn.reval("lambda x: x+1", 4) == 5 # connection sanity check
fp = hash.FileWrapper(StringIO.StringIO(self.s1))
conn.Globals.set('tmp_file', fp)
fp_remote = conn.Globals.get('tmp_file')
assert fp_remote.read() == self.s1
assert fp_remote.close().sha1_digest == self.s1_hash
# Tested xfer of file, now test xfer of files in rorpiter
root = MakeOutputDir()
rp1 = root.append('s1')
rp1.write_string(self.s1)
rp2 = root.append('s2')
rp2.write_string(self.s2)
rp1.setfile(hash.FileWrapper(rp1.open('rb')))
rp2.setfile(hash.FileWrapper(rp2.open('rb')))
rpiter = iter([rp1, rp2])
conn.Globals.set('tmp_conn_iter', rpiter)
remote_iter = conn.Globals.get('tmp_conn_iter')
rorp1 = remote_iter.next()
fp = rorp1.open('rb')
assert fp.read() == self.s1, fp.read()
ret_val = fp.close()
assert isinstance(ret_val, hash.Report), ret_val
assert ret_val.sha1_digest == self.s1_hash
rorp2 = remote_iter.next()
fp2 = rorp2.open('rb')
assert fp2.close().sha1_digest == self.s2_hash
conn.quit()
from rdiff_backup import rpath, regress, restore, metadata, log, Globals
if __name__ == "__main__": unittest.main()
import unittest, os, cStringIO, time
from rdiff_backup.metadata import *
from rdiff_backup import rpath, connection, Globals, selection
from rdiff_backup import rpath, connection, Globals, selection, lazy
tempdir = rpath.RPath(Globals.local_connection, "testfiles/output")
......@@ -151,7 +151,9 @@ class MetadataTest(unittest.TestCase):
diff1 = [rp1, rp4]
diff2 = [rp1new, rp2, zero]
output = patch(iter(current), iter(diff1), iter(diff2))
Globals.rbdir = tempdir
output = PatchDiffMan().iterate_patched_meta(
[iter(current), iter(diff1), iter(diff2)])
out1 = output.next()
assert out1 is rp1new, out1
out2 = output.next()
......@@ -160,5 +162,57 @@ class MetadataTest(unittest.TestCase):
assert out3 is rp3, out3
self.assertRaises(StopIteration, output.next)
def test_meta_patch_cycle(self):
"""Create various metadata rorps, diff them, then compare"""
def write_dir_to_meta(manager, rp, time):
"""Record the metadata under rp to a mirror_metadata file"""
metawriter = manager.get_meta_writer('snapshot', time)
for rorp in selection.Select(rp).set_iter():
metawriter.write_object(rorp)
metawriter.close()
def compare(man, rootrp, time):
assert lazy.Iter.equal(selection.Select(rootrp).set_iter(),
man.get_meta_at_time(time, None))
self.make_temp()
Globals.rbdir = tempdir
man = PatchDiffMan()
inc1 = rpath.RPath(Globals.local_connection, "testfiles/increment1")
inc2 = rpath.RPath(Globals.local_connection, "testfiles/increment2")
inc3 = rpath.RPath(Globals.local_connection, "testfiles/increment3")
inc4 = rpath.RPath(Globals.local_connection, "testfiles/increment4")
write_dir_to_meta(man, inc1, 10000)
compare(man, inc1, 10000)
write_dir_to_meta(man, inc2, 20000)
compare(man, inc2, 20000)
man.ConvertMetaToDiff()
man = PatchDiffMan()
write_dir_to_meta(man, inc3, 30000)
compare(man, inc3, 30000)
man.ConvertMetaToDiff()
man = PatchDiffMan()
man.max_diff_chain = 3
write_dir_to_meta(man, inc4, 40000)
compare(man, inc4, 40000)
man.ConvertMetaToDiff()
man = PatchDiffMan()
l = man.sorted_meta_inclist()
assert l[0].getinctype() == 'snapshot'
assert l[0].getinctime() == 40000
assert l[1].getinctype() == 'snapshot'
assert l[1].getinctime() == 30000
assert l[2].getinctype() == 'diff'
assert l[2].getinctime() == 20000
assert l[3].getinctype() == 'diff'
assert l[3].getinctime() == 10000
compare(man, inc1, 10000)
compare(man, inc2, 20000)
compare(man, inc3, 30000)
compare(man, inc4, 40000)
if __name__ == "__main__": unittest.main()
......@@ -2,7 +2,7 @@ import unittest
from commontest import *
from rdiff_backup import log, restore, Globals, rpath, TempFile
Log.setverbosity(5)
Log.setverbosity(3)
lc = Globals.local_connection
tempdir = rpath.RPath(Globals.local_connection, "testfiles/output")
restore_base_rp = rpath.RPath(Globals.local_connection,
......@@ -31,8 +31,8 @@ class RestoreFileComparer:
"""Restore file, make sure it is the same at time t"""
log.Log("Checking result at time %s" % (t,), 7)
tf = TempFile.new(tempdir.append("foo"))
restore._mirror_time = mirror_time
restore._rest_time = t
restore.MirrorStruct._mirror_time = mirror_time
restore.MirrorStruct._rest_time = t
self.rf.set_relevant_incs()
out_rorpath = self.rf.get_attribs().getRORPath()
correct_result = self.time_rp_dict[t]
......@@ -59,7 +59,7 @@ class RestoreTimeTest(unittest.TestCase):
rdiff-backup-data directory already being laid out.
"""
restore._mirror_time = None # Reset
restore.MirrorStruct._mirror_time = None # Reset
Globals.rbdir = rpath.RPath(lc,
"testfiles/restoretest3/rdiff-backup-data")
assert Time.genstrtotime("0B") == Time.time_from_session(0)
......