Commit e7f82416 authored by Shane Hathaway's avatar Shane Hathaway

Merged shane-oid-length-branch.

The ZEO client cache now works with OIDs of length up to 65535 bytes.
Also added some sanity checks that check the size of the data being written,
since there are some hard limits.
parent bd8fbd11
......@@ -44,7 +44,9 @@ Each record has the following form:
offset in record: name -- description
0: oid -- 8-byte object id
0: oidlen -- 2-byte unsigned object id length
2: reserved (6 bytes)
8: status -- 1-byte status 'v': valid, 'n': non-version valid, 'i': invalid
('n' means only the non-version data in the record is valid)
......@@ -57,23 +59,25 @@ Each record has the following form:
19: serial -- 8-byte non-version serial (timestamp)
27: data -- non-version data
27: oid -- object id
27+oidlen: data -- non-version data
27+dlen: version -- Version string (if vlen > 0)
27+oidlen+dlen: version -- Version string (if vlen > 0)
27+dlen+vlen: vdlen -- 4-byte length of version data (if vlen > 0)
27+oidlen+dlen+vlen: vdlen -- 4-byte length of version data (if vlen > 0)
31+dlen+vlen: vdata -- version data (if vlen > 0)
31+oidlen+dlen+vlen: vdata -- version data (if vlen > 0)
31+dlen+vlen+vdlen: vserial -- 8-byte version serial (timestamp)
31+oidlen+dlen+vlen+vdlen: vserial -- 8-byte version serial (timestamp)
(if vlen > 0)
27+dlen (if vlen == 0) **or**
39+dlen+vlen+vdlen: tlen -- 4-byte (unsigned) record length (for
27+oidlen+dlen (if vlen == 0) **or**
39+oidlen+dlen+vlen+vdlen: tlen -- 4-byte (unsigned) record length (for
redundancy and backward traversal)
31+dlen (if vlen == 0) **or**
43+dlen+vlen+vdlen: -- total record length (equal to tlen)
31+oidlen+dlen (if vlen == 0) **or**
43+oidlen+dlen+vlen+vdlen: -- total record length (equal to tlen)
There is a cache size limit.
......@@ -111,12 +115,12 @@ import tempfile
from struct import pack, unpack
from thread import allocate_lock
from ZODB.utils import U64
from ZODB.utils import oid_repr
import zLOG
from ZEO.ICache import ICache
magic = 'ZEC1'
magic = 'ZEC2'
headersize = 12
MB = 1024**2
......@@ -293,15 +297,17 @@ class ClientCache:
f.seek(ap)
h = f.read(27)
if len(h) != 27:
self.log("invalidate: short record for oid %16x "
self.log("invalidate: short record for oid %s "
"at position %d in cache file %d"
% (U64(oid), ap, p < 0))
% (oid_repr(oid), ap, p < 0))
del self._index[oid]
return None
if h[:8] != oid:
self.log("invalidate: oid mismatch: expected %16x read %16x "
oidlen = unpack(">H", h[:2])[0]
rec_oid = f.read(oidlen)
if rec_oid != oid:
self.log("invalidate: oid mismatch: expected %s read %s "
"at position %d in cache file %d"
% (U64(oid), U64(h[:8]), ap, p < 0))
% (oid_repr(oid), oid_repr(rec_oid), ap, p < 0))
del self._index[oid]
return None
f.seek(ap+8) # Switch from reading to writing
......@@ -329,14 +335,16 @@ class ClientCache:
read = f.read
seek(ap)
h = read(27)
if len(h)==27 and h[8] in 'nv' and h[:8]==oid:
oidlen = unpack(">H", h[:2])[0]
rec_oid = read(oidlen)
if len(h)==27 and h[8] in 'nv' and rec_oid == oid:
tlen, vlen, dlen = unpack(">iHi", h[9:19])
else:
tlen = -1
if tlen <= 0 or vlen < 0 or dlen < 0 or vlen+dlen > tlen:
self.log("load: bad record for oid %16x "
self.log("load: bad record for oid %s "
"at position %d in cache file %d"
% (U64(oid), ap, p < 0))
% (oid_repr(oid), ap, p < 0))
del self._index[oid]
return None
......@@ -355,7 +363,8 @@ class ClientCache:
data = read(dlen)
self._trace(0x2A, oid, version, h[19:], dlen)
if (p < 0) != self._current:
self._copytocurrent(ap, tlen, dlen, vlen, h, data)
self._copytocurrent(ap, oidlen, tlen, dlen, vlen, h,
oid, data)
return data, h[19:]
else:
self._trace(0x26, oid, version)
......@@ -367,12 +376,12 @@ class ClientCache:
v = vheader[:-4]
if version != v:
if dlen:
seek(ap+27)
seek(ap+27+oidlen)
data = read(dlen)
self._trace(0x2C, oid, version, h[19:], dlen)
if (p < 0) != self._current:
self._copytocurrent(ap, tlen, dlen, vlen, h,
data, vheader)
self._copytocurrent(ap, oidlen, tlen, dlen, vlen, h,
oid, data, vheader)
return data, h[19:]
else:
self._trace(0x28, oid, version)
......@@ -383,13 +392,13 @@ class ClientCache:
vserial = read(8)
self._trace(0x2E, oid, version, vserial, vdlen)
if (p < 0) != self._current:
self._copytocurrent(ap, tlen, dlen, vlen, h,
None, vheader, vdata, vserial)
self._copytocurrent(ap, oidlen, tlen, dlen, vlen, h,
oid, None, vheader, vdata, vserial)
return vdata, vserial
finally:
self._release()
def _copytocurrent(self, pos, tlen, dlen, vlen, header,
def _copytocurrent(self, pos, oidlen, tlen, dlen, vlen, header, oid,
data=None, vheader=None, vdata=None, vserial=None):
"""Copy a cache hit from the non-current file to the current file.
......@@ -403,26 +412,27 @@ class ClientCache:
if header[8] == 'n':
# Rewrite the header to drop the version data.
# This shortens the record.
tlen = 31 + dlen
tlen = 31 + oidlen + dlen
vlen = 0
# (oid:8, status:1, tlen:4, vlen:2, dlen:4, serial:8)
# (oidlen:2, reserved:6, status:1, tlen:4,
# vlen:2, dlen:4, serial:8)
header = header[:9] + pack(">IHI", tlen, vlen, dlen) + header[-8:]
else:
assert header[8] == 'v'
f = self._f[not self._current]
if data is None:
f.seek(pos+27)
f.seek(pos+27+oidlen)
data = f.read(dlen)
if len(data) != dlen:
return
l = [header, data]
l = [header, oid, data]
if vlen:
assert vheader is not None
l.append(vheader)
assert (vdata is None) == (vserial is None)
if vdata is None:
vdlen = unpack(">I", vheader[-4:])[0]
f.seek(pos+27+dlen+vlen+4)
f.seek(pos+27+oidlen+dlen+vlen+4)
vdata = f.read(vdlen)
if len(vdata) != vdlen:
return
......@@ -438,13 +448,12 @@ class ClientCache:
g.seek(self._pos)
g.writelines(l)
assert g.tell() == self._pos + tlen
oid = header[:8]
if self._current:
self._index[oid] = - self._pos
else:
self._index[oid] = self._pos
self._pos += tlen
self._trace(0x6A, header[:8], vlen and vheader[:-4] or '',
self._trace(0x6A, oid, vlen and vheader[:-4] or '',
vlen and vserial or header[-8:], dlen)
def update(self, oid, serial, version, data):
......@@ -462,7 +471,9 @@ class ClientCache:
read = f.read
seek(ap)
h = read(27)
if len(h)==27 and h[8] in 'nv' and h[:8]==oid:
oidlen = unpack(">H", h[:2])[0]
rec_oid = read(oidlen)
if len(h) == 27 and h[8] in 'nv' and rec_oid == oid:
tlen, vlen, dlen = unpack(">iHi", h[9:19])
else:
return self._store(oid, '', '', version, data, serial)
......@@ -500,14 +511,16 @@ class ClientCache:
read = f.read
seek(ap)
h = read(27)
if len(h)==27 and h[8] in 'nv' and h[:8]==oid:
oidlen = unpack(">H", h[:2])[0]
rec_oid = read(oidlen)
if len(h) == 27 and h[8] in 'nv' and rec_oid == oid:
tlen, vlen, dlen = unpack(">iHi", h[9:19])
else:
tlen = -1
if tlen <= 0 or vlen < 0 or dlen < 0 or vlen+dlen > tlen:
self.log("modifiedInVersion: bad record for oid %16x "
self.log("modifiedInVersion: bad record for oid %s "
"at position %d in cache file %d"
% (U64(oid), ap, p < 0))
% (oid_repr(oid), ap, p < 0))
del self._index[oid]
return None
......@@ -579,7 +592,7 @@ class ClientCache:
if not s:
p = ''
s = '\0\0\0\0\0\0\0\0'
tlen = 31 + len(p)
tlen = 31 + len(oid) + len(p)
if version:
tlen = tlen + len(version) + 12 + len(pv)
vlen = len(version)
......@@ -588,7 +601,11 @@ class ClientCache:
stlen = pack(">I", tlen)
# accumulate various data to write into a list
l = [oid, 'v', stlen, pack(">HI", vlen, len(p)), s]
assert len(oid) < 2**16
assert vlen < 2**16
assert tlen < 2L**32
l = [pack(">H6x", len(oid)), 'v', stlen,
pack(">HI", vlen, len(p)), s, oid]
if p:
l.append(p)
if version:
......@@ -641,11 +658,11 @@ class ClientCache:
if version:
code |= 0x80
self._tracefile.write(
struct_pack(">ii8s8s",
struct_pack(">iiH8s",
time_time(),
(dlen+255) & 0x7fffff00 | code | self._current,
oid,
serial))
len(oid),
serial) + oid)
def read_index(self, serial, fileindex):
index = self._index
......@@ -672,7 +689,8 @@ class ClientCache:
self.rilog("invalid header data", pos, fileindex)
break
oid = h[:8]
oidlen = unpack(">H", h[:2])[0]
oid = read(oidlen)
if h[8] == 'v' and vlen:
seek(dlen+vlen, 1)
......@@ -681,7 +699,7 @@ class ClientCache:
self.rilog("truncated record", pos, fileindex)
break
vdlen = unpack(">i", vdlen)[0]
if vlen+dlen+43+vdlen != tlen:
if vlen + oidlen + dlen + 43 + vdlen != tlen:
self.rilog("inconsistent lengths", pos, fileindex)
break
seek(vdlen, 1)
......@@ -691,7 +709,7 @@ class ClientCache:
break
else:
if h[8] in 'vn' and vlen == 0:
if dlen+31 != tlen:
if oidlen + dlen + 31 != tlen:
self.rilog("inconsistent nv lengths", pos, fileindex)
seek(dlen, 1)
if read(4) != h[9:13]:
......
......@@ -35,8 +35,9 @@ Offset Size Contents
0 4 timestamp (seconds since 1/1/1970)
4 3 data size, in 256-byte increments, rounded up
7 1 code (see below)
8 8 object id
16 8 serial number
8 2 object id length
10 8 serial number
18 variable object id
The code at offset 7 packs three fields:
......@@ -56,6 +57,7 @@ import sys
import time
import getopt
import struct
from types import StringType
def usage(msg):
print >>sys.stderr, msg
......@@ -155,12 +157,16 @@ def main():
if not quiet:
print "Skipping 8 bytes at offset", offset-8
continue
r = f_read(16)
if len(r) < 16:
r = f_read(10)
if len(r) < 10:
break
offset += 16
offset += 10
records += 1
oid, serial = struct_unpack(">8s8s", r)
oidlen, serial = struct_unpack(">H8s", r)
oid = f_read(oidlen)
if len(oid) != oidlen:
break
offset += oidlen
if t0 is None:
t0 = ts
thisinterval = t0 / interval
......@@ -197,11 +203,11 @@ def main():
bysizew[dlen] = d = bysizew.get(dlen) or {}
d[oid] = d.get(oid, 0) + 1
if verbose:
print "%s %d %02x %016x %016x %1s %s" % (
print "%s %d %02x %s %016x %1s %s" % (
time.ctime(ts)[4:-5],
current,
code,
U64(oid),
oid_repr(oid),
U64(serial),
version,
dlen and str(dlen) or "")
......@@ -346,6 +352,12 @@ def U64(s):
h, v = struct.unpack(">II", s)
return (long(h) << 32) + v
def oid_repr(oid):
if isinstance(oid, StringType) and len(oid) == 8:
return '%16x' % U64(oid)
else:
return repr(oid)
def addcommas(n):
sign, s = '', str(n)
if s[0] == '-':
......
......@@ -27,6 +27,10 @@ from ZEO.ClientCache import ClientCache
class ClientCacheTests(unittest.TestCase):
_oid = 'abcdefgh'
_oid2 = 'bcdefghi'
_oid3 = 'cdefghij'
def setUp(self):
unittest.TestCase.setUp(self)
self.cachesize = 10*1000*1000
......@@ -42,7 +46,7 @@ class ClientCacheTests(unittest.TestCase):
def testStoreLoad(self):
cache = self.cache
oid = 'abcdefgh'
oid = self._oid
data = '1234'*100
serial = 'ABCDEFGH'
cache.store(oid, data, serial, '', '', '')
......@@ -51,7 +55,7 @@ class ClientCacheTests(unittest.TestCase):
def testMissingLoad(self):
cache = self.cache
oid = 'abcdefgh'
oid = self._oid
data = '1234'*100
serial = 'ABCDEFGH'
cache.store(oid, data, serial, '', '', '')
......@@ -60,7 +64,7 @@ class ClientCacheTests(unittest.TestCase):
def testInvalidate(self):
cache = self.cache
oid = 'abcdefgh'
oid = self._oid
data = '1234'*100
serial = 'ABCDEFGH'
cache.store(oid, data, serial, '', '', '')
......@@ -72,7 +76,7 @@ class ClientCacheTests(unittest.TestCase):
def testVersion(self):
cache = self.cache
oid = 'abcdefgh'
oid = self._oid
data = '1234'*100
serial = 'ABCDEFGH'
vname = 'myversion'
......@@ -86,7 +90,7 @@ class ClientCacheTests(unittest.TestCase):
def testVersionOnly(self):
cache = self.cache
oid = 'abcdefgh'
oid = self._oid
data = ''
serial = ''
vname = 'myversion'
......@@ -100,7 +104,7 @@ class ClientCacheTests(unittest.TestCase):
def testInvalidateNonVersion(self):
cache = self.cache
oid = 'abcdefgh'
oid = self._oid
data = '1234'*100
serial = 'ABCDEFGH'
vname = 'myversion'
......@@ -122,7 +126,7 @@ class ClientCacheTests(unittest.TestCase):
# Invalidating a version should not invalidate the non-version data.
# (This tests for the same bug as testInvalidatePersists below.)
cache = self.cache
oid = 'abcdefgh'
oid = self._oid
data = '1234'*100
serial = 'ABCDEFGH'
cache.store(oid, data, serial, '', '', '')
......@@ -139,7 +143,7 @@ class ClientCacheTests(unittest.TestCase):
results.append((oid, serial, vserial))
cache.verify(verifier)
self.assertEqual(results, [])
oid = 'abcdefgh'
oid = self._oid
data = '1234'*100
serial = 'ABCDEFGH'
cache.store(oid, data, serial, '', '', '')
......@@ -151,12 +155,12 @@ class ClientCacheTests(unittest.TestCase):
# Make sure that cache._index[oid] is erased for oids that are
# stored in the cache file that's rewritten after a flip.
cache = self.cache
oid = 'abcdefgh'
oid = self._oid
data = '1234'*100
serial = 'ABCDEFGH'
cache.store(oid, data, serial, '', '', '')
cache.checkSize(10*self.cachesize) # Force a file flip
oid2 = 'abcdefgz'
oid2 = self._oid2
data2 = '1234'*10
serial2 = 'ABCDEFGZ'
cache.store(oid2, data2, serial2, '', '', '')
......@@ -178,17 +182,17 @@ class ClientCacheTests(unittest.TestCase):
cache = self.cache
# Create some objects
oid1 = 'abcdefgh'
oid1 = self._oid
data1 = '1234' * 100
serial1 = 'ABCDEFGH'
oid2 = 'bcdefghi'
oid2 = self._oid2
data2 = '2345' * 200
serial2 = 'BCDEFGHI'
version2 = 'myversion'
nonversion = 'nada'
vdata2 = '5432' * 250
vserial2 = 'IHGFEDCB'
oid3 = 'cdefghij'
oid3 = self._oid3
data3 = '3456' * 300
serial3 = 'CDEFGHIJ'
......@@ -276,6 +280,8 @@ class ClientCacheTests(unittest.TestCase):
class PersistentClientCacheTests(unittest.TestCase):
_oid = 'abcdefgh'
def setUp(self):
unittest.TestCase.setUp(self)
self.vardir = os.getcwd() # Don't use /tmp, it's a security risk
......@@ -323,13 +329,12 @@ class PersistentClientCacheTests(unittest.TestCase):
# 'current' file when a persistent cache was opened.
cache = self.cache
self.assertEqual(cache._current, 0) # Check that file 0 is current
oid = 'abcdefgh'
oid = self._oid
data = '1234'
serial = 'ABCDEFGH'
cache.store(oid, data, serial, '', '', '')
cache.checkSize(10*self.cachesize) # Force a file flip
self.assertEqual(cache._current, 1) # Check that the flip worked
oid = 'abcdefgh'
data = '123'
serial = 'ABCDEFGZ'
cache.store(oid, data, serial, '', '', '')
......@@ -348,7 +353,7 @@ class PersistentClientCacheTests(unittest.TestCase):
cache = self.cache
magicsize = (ord('i') + 1) << 16
cache = self.cache
oid = 'abcdefgh'
oid = self._oid
data = '!'*magicsize
serial = 'ABCDEFGH'
cache.store(oid, data, serial, '', '', '')
......@@ -367,7 +372,7 @@ class PersistentClientCacheTests(unittest.TestCase):
ltid = 'pqrstuvw'
cache.setLastTid(ltid)
self.assertEqual(cache.getLastTid(), ltid)
oid = 'abcdefgh'
oid = self._oid
data = '1234'
serial = 'ABCDEFGH'
cache.store(oid, data, serial, '', '', '')
......@@ -381,10 +386,23 @@ class PersistentClientCacheTests(unittest.TestCase):
cache.checkSize(10*self.cachesize) # Force a file flip
self.failUnless(cache.getLastTid() is None)
class ClientCacheLongOIDTests(ClientCacheTests):
_oid = 'abcdefghijklmnop' * 2
_oid2 = 'bcdefghijklmnopq' * 2
_oid3 = 'cdefghijklmnopqr' * 2
class PersistentClientCacheLongOIDTests(PersistentClientCacheTests):
_oid = 'abcdefghijklmnop' * 2
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(ClientCacheTests))
suite.addTest(unittest.makeSuite(ClientCacheLongOIDTests))
suite.addTest(unittest.makeSuite(PersistentClientCacheTests))
suite.addTest(unittest.makeSuite(PersistentClientCacheLongOIDTests))
return suite
if __name__ == '__main__':
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment