Commit b7e88bab authored by Jeremy Hylton's avatar Jeremy Hylton

Merge MVCC branch to the HEAD.

parent 3e29b5b6
This diff is collapsed.
This diff is collapsed.
try:
from Interface import Base
except ImportError:
class Base:
# a dummy interface for use when Zope's is unavailable
pass
class ICache(Base):
"""ZEO client cache.
__init__(storage, size, client, var)
All arguments optional.
storage -- name of storage
size -- max size of cache in bytes
client -- a string; if specified, cache is persistent.
var -- var directory to store cache files in
"""
def open():
"""Returns a sequence of object info tuples.
An object info tuple is a pair containing an object id and a
pair of serialnos, a non-version serialno and a version serialno:
oid, (serial, ver_serial)
This method builds an index of the cache and returns a
sequence used for cache validation.
"""
def close():
"""Closes the cache."""
def verify(func):
"""Call func on every object in cache.
func is called with three arguments
func(oid, serial, ver_serial)
"""
def invalidate(oid, version):
"""Remove object from cache."""
def load(oid, version):
"""Load object from cache.
Return None if object not in cache.
Return data, serialno if object is in cache.
"""
def store(oid, p, s, version, pv, sv):
"""Store a new object in the cache."""
def update(oid, serial, version, data):
"""Update an object already in the cache.
XXX This method is called to update objects that were modified by
a transaction. It's likely that it is already in the cache,
and it may be possible for the implementation to operate more
efficiently.
"""
def modifiedInVersion(oid):
"""Return the version an object is modified in.
'' signifies the trunk.
Returns None if the object is not in the cache.
"""
def checkSize(size):
"""Check if adding size bytes would exceed cache limit.
This method is often called just before store or update. The
size is a hint about the amount of data that is about to be
stored. The cache may want to evict some data to make space.
"""
......@@ -13,6 +13,18 @@
##############################################################################
"""RPC stubs for interface exported by StorageServer."""
##
# ZEO storage server.
# <p>
# Remote method calls can be synchronous or asynchronous. If the call
# is synchronous, the client thread blocks until the call returns. A
# single client can only have one synchronous request outstanding. If
# several threads share a single client, threads other than the caller
# will block only if the attempt to make another synchronous call.
# An asynchronous call does not cause the client thread to block. An
# exception raised by an asynchronous method is logged on the server,
# but is not returned to the client.
class StorageServer:
"""An RPC stub class for the interface exported by ClientStorage.
......@@ -43,47 +55,174 @@ class StorageServer:
def extensionMethod(self, name):
return ExtensionMethodWrapper(self.rpc, name).call
##
# Register current connection with a storage and a mode.
# In effect, it is like an open call.
# @param storage_name a string naming the storage. This argument
# is primarily for backwards compatibility with servers
# that supported multiple storages.
# @param read_only boolean
# @exception ValueError unknown storage_name or already registered
# @exception ReadOnlyError storage is read-only and a read-write
# connectio was requested
def register(self, storage_name, read_only):
self.rpc.call('register', storage_name, read_only)
##
# Return dictionary of meta-data about the storage.
# @defreturn dict
def get_info(self):
return self.rpc.call('get_info')
##
# Check whether the server requires authentication. Returns
# the name of the protocol.
# @defreturn string
def getAuthProtocol(self):
return self.rpc.call('getAuthProtocol')
##
# Return id of the last committed transaction
# @defreturn string
def lastTransaction(self):
# Not in protocol version 2.0.0; see __init__()
return self.rpc.call('lastTransaction')
##
# Return invalidations for all transactions after tid.
# @param tid transaction id
# @defreturn 2-tuple, (tid, list)
# @return tuple containing the last committed transaction
# and a list of oids that were invalidated. Returns
# None and an empty list if the server does not have
# the list of oids available.
def getInvalidations(self, tid):
# Not in protocol version 2.0.0; see __init__()
return self.rpc.call('getInvalidations', tid)
##
# Check whether serial numbers s and sv are current for oid.
# If one or both of the serial numbers are not current, the
# server will make an asynchronous invalidateVerify() call.
# @param oid object id
# @param s serial number on non-version data
# @param sv serial number of version data or None
# @defreturn async
def zeoVerify(self, oid, s, sv):
self.rpc.callAsync('zeoVerify', oid, s, sv)
##
# Check whether current serial number is valid for oid and version.
# If the serial number is not current, the server will make an
# asynchronous invalidateVerify() call.
# @param oid object id
# @param version name of version for oid
# @param serial client's current serial number
# @defreturn async
def verify(self, oid, version, serial):
self.rpc.callAsync('verify', oid, version, serial)
##
# Signal to the server that cache verification is done.
# @defreturn async
def endZeoVerify(self):
self.rpc.callAsync('endZeoVerify')
##
# Generate a new set of oids.
# @param n number of new oids to return
# @defreturn list
# @return list of oids
def new_oids(self, n=None):
if n is None:
return self.rpc.call('new_oids')
else:
return self.rpc.call('new_oids', n)
##
# Pack the storage.
# @param t pack time
# @param wait optional, boolean. If true, the call will not
# return until the pack is complete.
def pack(self, t, wait=None):
if wait is None:
self.rpc.call('pack', t)
else:
self.rpc.call('pack', t, wait)
##
# Return current data for oid. Version data is returned if
# present.
# @param oid object id
# @defreturn 5-tuple
# @return 5-tuple, current non-version data, serial number,
# version name, version data, version data serial number
# @exception KeyError if oid is not found
def zeoLoad(self, oid):
return self.rpc.call('zeoLoad', oid)
##
# Return current data for oid along with tid if transaction that
# wrote the date.
# @param oid object id
# @param version string, name of version
# @defreturn 4-tuple
# @return data, serial number, transaction id, version,
# where version is the name of the version the data came
# from or "" for non-version data
# @exception KeyError if oid is not found
def loadEx(self, oid, version):
return self.rpc.call("loadEx", oid, version)
##
# Return non-current data along with transaction ids that identify
# the lifetime of the specific revision.
# @param oid object id
# @param tid a transaction id that provides an upper bound on
# the lifetime of the revision. That is, loadBefore
# returns the revision that was current before tid committed.
# @defreturn 4-tuple
# @return data, serial numbr, start transaction id, end transaction id
def loadBefore(self, oid, tid):
return self.rpc.call("loadBefore", oid, tid)
##
# Storage new revision of oid.
# @param oid object id
# @param serial serial number that this transaction read
# @param data new data record for oid
# @param version name of version or ""
# @param id id of current transaction
# @defreturn async
def storea(self, oid, serial, data, version, id):
self.rpc.callAsync('storea', oid, serial, data, version, id)
##
# Start two-phase commit for a transaction
# @param id id used by client to identify current transaction. The
# only purpose of this argument is to distinguish among multiple
# threads using a single ClientStorage.
# @param user name of user committing transaction (can be "")
# @param description string containing transaction metadata (can be "")
# @param ext dictionary of extended metadata (?)
# @param tid optional explicit tid to pass to underlying storage
# @param status optional status character, e.g "p" for pack
# @defreturn async
def tpc_begin(self, id, user, descr, ext, tid, status):
return self.rpc.call('tpc_begin', id, user, descr, ext, tid, status)
......
......@@ -235,6 +235,14 @@ class ZEOStorage:
def getExtensionMethods(self):
return self._extensions
def loadEx(self, oid, version):
self.stats.loads += 1
return self.storage.loadEx(oid, version)
def loadBefore(self, oid, tid):
self.stats.loads += 1
return self.storage.loadBefore(oid, tid)
def zeoLoad(self, oid):
self.stats.loads += 1
v = self.storage.modifiedInVersion(oid)
......@@ -260,12 +268,26 @@ class ZEOStorage:
% (len(invlist), u64(invtid)))
return invtid, invlist
def verify(self, oid, version, tid):
try:
t = self.storage.getTid(oid)
except KeyError:
self.client.invalidateVerify((oid, ""))
else:
if tid != t:
# This will invalidate non-version data when the
# client only has invalid version data. Since this is
# an uncommon case, we avoid the cost of checking
# whether the serial number matches the current
# non-version data.
self.client.invalidateVerify((oid, version))
def zeoVerify(self, oid, s, sv):
if not self.verifying:
self.verifying = 1
self.stats.verifying_clients += 1
try:
os = self.storage.getSerial(oid)
os = self.storage.getTid(oid)
except KeyError:
self.client.invalidateVerify((oid, ''))
# XXX It's not clear what we should do now. The KeyError
......@@ -344,7 +366,7 @@ class ZEOStorage:
def undoLog(self, first, last):
return run_in_thread(self.storage.undoLog, first, last)
def tpc_begin(self, id, user, description, ext, tid, status):
def tpc_begin(self, id, user, description, ext, tid=None, status=" "):
if self.read_only:
raise ReadOnlyError()
if self.transaction is not None:
......@@ -521,25 +543,25 @@ class ZEOStorage:
return self.storage.tpc_vote(self.transaction)
def _abortVersion(self, src):
oids = self.storage.abortVersion(src, self.transaction)
tid, oids = self.storage.abortVersion(src, self.transaction)
inv = [(oid, src) for oid in oids]
self.invalidated.extend(inv)
return oids
return tid, oids
def _commitVersion(self, src, dest):
oids = self.storage.commitVersion(src, dest, self.transaction)
tid, oids = self.storage.commitVersion(src, dest, self.transaction)
inv = [(oid, dest) for oid in oids]
self.invalidated.extend(inv)
if dest:
inv = [(oid, src) for oid in oids]
self.invalidated.extend(inv)
return oids
return tid, oids
def _transactionalUndo(self, trans_id):
oids = self.storage.transactionalUndo(trans_id, self.transaction)
tid, oids = self.storage.transactionalUndo(trans_id, self.transaction)
inv = [(oid, None) for oid in oids]
self.invalidated.extend(inv)
return oids
return tid, oids
# When a delayed transaction is restarted, the dance is
# complicated. The restart occurs when one ZEOStorage instance
......@@ -854,6 +876,9 @@ class StorageServer:
log("tid to old for invq %s < %s" % (u64(tid), u64(earliest_tid)))
return None, []
# XXX this is wrong! must check against tid or we invalidate
# too much.
oids = {}
for tid, L in self.invq:
for key in L:
......
This diff is collapsed.
......@@ -128,15 +128,21 @@ def main():
# Read file, gathering statistics, and printing each record if verbose
rt0 = time.time()
# bycode -- map code to count of occurrences
bycode = {}
# records -- number of records
records = 0
# version -- number of records with versions
versions = 0
t0 = te = None
# datarecords -- number of records with dlen set
datarecords = 0
datasize = 0L
file0 = file1 = 0
# oids -- maps oid to number of times it was loaded
oids = {}
# bysize -- maps data size to number of loads
bysize = {}
# bysize -- maps data size to number of writes
bysizew = {}
total_loads = 0
byinterval = {}
......@@ -157,12 +163,12 @@ def main():
if not quiet:
print "Skipping 8 bytes at offset", offset-8
continue
r = f_read(10)
r = f_read(18)
if len(r) < 10:
break
offset += 10
records += 1
oidlen, serial = struct_unpack(">H8s", r)
oidlen, start_tid, end_tid = struct_unpack(">H8s8s", r)
oid = f_read(oidlen)
if len(oid) != oidlen:
break
......@@ -187,11 +193,6 @@ def main():
if code & 0x80:
version = 'V'
versions += 1
current = code & 1
if current:
file1 += 1
else:
file0 += 1
code = code & 0x7e
bycode[code] = bycode.get(code, 0) + 1
byinterval[code] = byinterval.get(code, 0) + 1
......@@ -199,22 +200,23 @@ def main():
if code & 0x70 == 0x20: # All loads
bysize[dlen] = d = bysize.get(dlen) or {}
d[oid] = d.get(oid, 0) + 1
elif code == 0x3A: # Update
elif code & 0x70 == 0x50: # All stores
bysizew[dlen] = d = bysizew.get(dlen) or {}
d[oid] = d.get(oid, 0) + 1
if verbose:
print "%s %d %02x %s %016x %1s %s" % (
print "%s %d %02x %s %016x %016x %1s %s" % (
time.ctime(ts)[4:-5],
current,
code,
oid_repr(oid),
U64(serial),
U64(start_tid),
U64(end_tid),
version,
dlen and str(dlen) or "")
if code & 0x70 == 0x20:
oids[oid] = oids.get(oid, 0) + 1
total_loads += 1
if code in (0x00, 0x70):
if code == 0x00:
if not quiet:
dumpbyinterval(byinterval, h0, he)
byinterval = {}
......@@ -222,10 +224,7 @@ def main():
h0 = he = ts
if not quiet:
print time.ctime(ts)[4:-5],
if code == 0x00:
print '='*20, "Restart", '='*20
else:
print '-'*20, "Flip->%d" % current, '-'*20
except KeyboardInterrupt:
print "\nInterrupted. Stats so far:\n"
......@@ -248,8 +247,6 @@ def main():
print "First time: %s" % time.ctime(t0)
print "Last time: %s" % time.ctime(te)
print "Duration: %s seconds" % addcommas(te-t0)
print "File stats: %s in file 0; %s in file 1" % (
addcommas(file0), addcommas(file1))
print "Data recs: %s (%.1f%%), average size %.1f KB" % (
addcommas(datarecords),
100.0 * datarecords / records,
......@@ -314,7 +311,7 @@ def dumpbyinterval(byinterval, h0, he):
if code & 0x70 == 0x20:
n = byinterval[code]
loads += n
if code in (0x2A, 0x2C, 0x2E):
if code in (0x22, 0x26):
hits += n
if not loads:
return
......@@ -333,7 +330,7 @@ def hitrate(bycode):
if code & 0x70 == 0x20:
n = bycode[code]
loads += n
if code in (0x2A, 0x2C, 0x2E):
if code in (0x22, 0x26):
hits += n
if loads:
return 100.0 * hits / loads
......@@ -376,31 +373,18 @@ explain = {
0x00: "_setup_trace (initialization)",
0x10: "invalidate (miss)",
0x1A: "invalidate (hit, version, writing 'n')",
0x1C: "invalidate (hit, writing 'i')",
0x1A: "invalidate (hit, version)",
0x1C: "invalidate (hit, saving non-current)",
0x20: "load (miss)",
0x22: "load (miss, version, status 'n')",
0x24: "load (miss, deleting index entry)",
0x26: "load (miss, no non-version data)",
0x28: "load (miss, version mismatch, no non-version data)",
0x2A: "load (hit, returning non-version data)",
0x2C: "load (hit, version mismatch, returning non-version data)",
0x2E: "load (hit, returning version data)",
0x3A: "update",
0x40: "modifiedInVersion (miss)",
0x4A: "modifiedInVersion (hit, return None, status 'n')",
0x4C: "modifiedInVersion (hit, return '')",
0x4E: "modifiedInVersion (hit, return version)",
0x5A: "store (non-version data present)",
0x5C: "store (only version data present)",
0x22: "load (hit)",
0x24: "load (non-current, miss)",
0x26: "load (non-current, hit)",
0x6A: "_copytocurrent",
0x50: "store (version)",
0x52: "store (current, non-version)",
0x54: "store (non-current)",
0x70: "checkSize (cache flip)",
}
if __name__ == "__main__":
......
......@@ -42,7 +42,7 @@ class TransUndoStorageWithCache:
t.note('undo1')
self._storage.tpc_begin(t)
oids = self._storage.transactionalUndo(tid, t)
tid, oids = self._storage.transactionalUndo(tid, t)
# Make sure this doesn't load invalid data into the cache
self._storage.load(oid, '')
......
......@@ -71,7 +71,7 @@ class WorkerThread(TestThread):
# self.storage.tpc_vote(self.trans)
rpc = self.storage._server.rpc
msgid = rpc._deferred_call('vote', self.storage._serial)
msgid = rpc._deferred_call('vote', id(self.trans))
self.ready.set()
rpc._deferred_wait(msgid)
self.storage._check_serials()
......@@ -103,6 +103,51 @@ class CommitLockTests:
self._storage.store(oid, ZERO, zodb_pickle(MinPO(1)), '', txn)
return oid, txn
def _begin_threads(self):
# Start a second transaction on a different connection without
# blocking the test thread. Returns only after each thread has
# set it's ready event.
self._storages = []
self._threads = []
for i in range(self.NUM_CLIENTS):
storage = self._duplicate_client()
txn = Transaction()
tid = self._get_timestamp()
t = WorkerThread(self, storage, txn)
self._threads.append(t)
t.start()
t.ready.wait()
# Close on the connections abnormally to test server response
if i == 0:
storage.close()
else:
self._storages.append((storage, txn))
def _finish_threads(self):
for t in self._threads:
t.cleanup()
def _duplicate_client(self):
"Open another ClientStorage to the same server."
# XXX argh it's hard to find the actual address
# The rpc mgr addr attribute is a list. Each element in the
# list is a socket domain (AF_INET, AF_UNIX, etc.) and an
# address.
addr = self._storage._addr
new = ZEO.ClientStorage.ClientStorage(addr, wait=1)
new.registerDB(DummyDB(), None)
return new
def _get_timestamp(self):
t = time.time()
t = TimeStamp(*time.gmtime(t)[:5]+(t%60,))
return `t`
class CommitLockVoteTests(CommitLockTests):
def checkCommitLockVoteFinish(self):
oid, txn = self._start_txn()
self._storage.tpc_vote(txn)
......@@ -141,15 +186,16 @@ class CommitLockTests:
self._finish_threads()
self._cleanup()
class CommitLockUndoTests(CommitLockTests):
def _get_trans_id(self):
self._dostore()
L = self._storage.undoInfo()
return L[0]['id']
def _begin_undo(self, trans_id):
def _begin_undo(self, trans_id, txn):
rpc = self._storage._server.rpc
return rpc._deferred_call('transactionalUndo', trans_id,
self._storage._serial)
return rpc._deferred_call('transactionalUndo', trans_id, id(txn))
def _finish_undo(self, msgid):
return self._storage._server.rpc._deferred_wait(msgid)
......@@ -157,7 +203,7 @@ class CommitLockTests:
def checkCommitLockUndoFinish(self):
trans_id = self._get_trans_id()
oid, txn = self._start_txn()
msgid = self._begin_undo(trans_id)
msgid = self._begin_undo(trans_id, txn)
self._begin_threads()
......@@ -174,7 +220,7 @@ class CommitLockTests:
def checkCommitLockUndoAbort(self):
trans_id = self._get_trans_id()
oid, txn = self._start_txn()
msgid = self._begin_undo(trans_id)
msgid = self._begin_undo(trans_id, txn)
self._begin_threads()
......@@ -190,7 +236,7 @@ class CommitLockTests:
def checkCommitLockUndoClose(self):
trans_id = self._get_trans_id()
oid, txn = self._start_txn()
msgid = self._begin_undo(trans_id)
msgid = self._begin_undo(trans_id, txn)
self._begin_threads()
......@@ -201,46 +247,3 @@ class CommitLockTests:
self._finish_threads()
self._cleanup()
def _begin_threads(self):
# Start a second transaction on a different connection without
# blocking the test thread. Returns only after each thread has
# set it's ready event.
self._storages = []
self._threads = []
for i in range(self.NUM_CLIENTS):
storage = self._duplicate_client()
txn = Transaction()
tid = self._get_timestamp()
t = WorkerThread(self, storage, txn)
self._threads.append(t)
t.start()
t.ready.wait()
# Close on the connections abnormally to test server response
if i == 0:
storage.close()
else:
self._storages.append((storage, txn))
def _finish_threads(self):
for t in self._threads:
t.cleanup()
def _duplicate_client(self):
"Open another ClientStorage to the same server."
# XXX argh it's hard to find the actual address
# The rpc mgr addr attribute is a list. Each element in the
# list is a socket domain (AF_INET, AF_UNIX, etc.) and an
# address.
addr = self._storage._addr
new = ZEO.ClientStorage.ClientStorage(addr, wait=1)
new.registerDB(DummyDB(), None)
return new
def _get_timestamp(self):
t = time.time()
t = TimeStamp(*time.gmtime(t)[:5]+(t%60,))
return `t`
......@@ -109,7 +109,7 @@ class CommonSetupTearDown(StorageTestBase):
os.waitpid(pid, 0)
for c in self.caches:
for i in 0, 1:
path = "c1-%s-%d.zec" % (c, i)
path = "%s-%s.zec" % (c, "1")
# On Windows before 2.3, we don't have a way to wait for
# the spawned server(s) to close, and they inherited
# file descriptors for our open files. So long as those
......@@ -584,6 +584,9 @@ class InvqTests(CommonSetupTearDown):
revid = self._dostore(oid)
revid = self._dostore(oid, revid)
# sync() is needed to prevent invalidation for oid from arriving
# in the middle of the load() call.
perstorage.sync()
perstorage.load(oid, '')
perstorage.close()
......@@ -853,7 +856,7 @@ class TimeoutTests(CommonSetupTearDown):
unless = self.failUnless
self._storage = storage = self.openClientStorage()
# Assert that the zeo cache is empty
unless(not storage._cache._index)
unless(not list(storage._cache.contents()))
# Create the object
oid = storage.new_oid()
obj = MinPO(7)
......@@ -872,7 +875,7 @@ class TimeoutTests(CommonSetupTearDown):
# We expect finish to fail
raises(ClientDisconnected, storage.tpc_finish, t)
# The cache should still be empty
unless(not storage._cache._index)
unless(not list(storage._cache.contents()))
# Load should fail since the object should not be in either the cache
# or the server.
raises(KeyError, storage.load, oid, '')
......@@ -883,7 +886,7 @@ class TimeoutTests(CommonSetupTearDown):
unless = self.failUnless
self._storage = storage = self.openClientStorage()
# Assert that the zeo cache is empty
unless(not storage._cache._index)
unless(not list(storage._cache.contents()))
# Create the object
oid = storage.new_oid()
obj = MinPO(7)
......
......@@ -39,7 +39,23 @@ from ZODB.POSException \
# thought they added (i.e., the keys for which get_transaction().commit()
# did not raise any exception).
class StressThread(TestThread):
class FailableThread(TestThread):
# mixin class
# subclass must provide
# - self.stop attribute (an event)
# - self._testrun() method
def testrun(self):
try:
self._testrun()
except:
# Report the failure here to all the other threads, so
# that they stop quickly.
self.stop.set()
raise
class StressThread(FailableThread):
# Append integers startnum, startnum + step, startnum + 2*step, ...
# to 'tree' until Event stop is set. If sleep is given, sleep
......@@ -57,7 +73,7 @@ class StressThread(TestThread):
self.added_keys = []
self.commitdict = commitdict
def testrun(self):
def _testrun(self):
cn = self.db.open()
while not self.stop.isSet():
try:
......@@ -87,7 +103,7 @@ class StressThread(TestThread):
key += self.step
cn.close()
class LargeUpdatesThread(TestThread):
class LargeUpdatesThread(FailableThread):
# A thread that performs a lot of updates. It attempts to modify
# more than 25 objects so that it can test code that runs vote
......@@ -106,6 +122,15 @@ class LargeUpdatesThread(TestThread):
self.commitdict = commitdict
def testrun(self):
try:
self._testrun()
except:
# Report the failure here to all the other threads, so
# that they stop quickly.
self.stop.set()
raise
def _testrun(self):
cn = self.db.open()
while not self.stop.isSet():
try:
......@@ -162,7 +187,7 @@ class LargeUpdatesThread(TestThread):
self.added_keys = keys_added.keys()
cn.close()
class VersionStressThread(TestThread):
class VersionStressThread(FailableThread):
def __init__(self, testcase, db, stop, threadnum, commitdict, startnum,
step=2, sleep=None):
......@@ -177,6 +202,15 @@ class VersionStressThread(TestThread):
self.commitdict = commitdict
def testrun(self):
try:
self._testrun()
except:
# Report the failure here to all the other threads, so
# that they stop quickly.
self.stop.set()
raise
def _testrun(self):
commit = 0
key = self.startnum
while not self.stop.isSet():
......@@ -302,7 +336,10 @@ class InvalidationTests:
delay = self.MINTIME
start = time.time()
while time.time() - start <= self.MAXTIME:
time.sleep(delay)
stop.wait(delay)
if stop.isSet():
# Some thread failed. Stop right now.
break
delay = 2.0
if len(commitdict) >= len(threads):
break
......@@ -406,6 +443,7 @@ class InvalidationTests:
t1 = VersionStressThread(self, db1, stop, 1, cd, 1, 3)
t2 = VersionStressThread(self, db2, stop, 2, cd, 2, 3, 0.01)
t3 = VersionStressThread(self, db2, stop, 3, cd, 3, 3, 0.01)
## t1 = VersionStressThread(self, db2, stop, 3, cd, 1, 3, 0.01)
self.go(stop, cd, t1, t2, t3)
cn.sync()
......
......@@ -28,7 +28,6 @@ from ZEO.StorageServer import StorageServer
from ZEO.tests.ConnectionTests import CommonSetupTearDown
from ZODB.FileStorage import FileStorage
from ZODB.tests.StorageTestBase import removefs
class AuthTest(CommonSetupTearDown):
__super_getServerConfig = CommonSetupTearDown.getServerConfig
......
......@@ -101,20 +101,12 @@ class GenericTests(
StorageTestBase.StorageTestBase,
# ZODB test mixin classes (in the same order as imported)
BasicStorage.BasicStorage,
VersionStorage.VersionStorage,
TransactionalUndoStorage.TransactionalUndoStorage,
TransactionalUndoVersionStorage.TransactionalUndoVersionStorage,
PackableStorage.PackableStorage,
Synchronization.SynchronizedStorage,
ConflictResolution.ConflictResolvingStorage,
ConflictResolution.ConflictResolvingTransUndoStorage,
RevisionStorage.RevisionStorage,
MTStorage.MTStorage,
ReadOnlyStorage.ReadOnlyStorage,
# ZEO test mixin classes (in the same order as imported)
Cache.StorageWithCache,
Cache.TransUndoStorageWithCache,
CommitLockTests.CommitLockTests,
CommitLockTests.CommitLockVoteTests,
ThreadTests.ThreadTests,
# Locally defined (see above)
MiscZEOTests
......@@ -167,8 +159,22 @@ class GenericTests(
key = '%s:%s' % (self._storage._storage, self._storage._server_addr)
self.assertEqual(self._storage.sortKey(), key)
class FullGenericTests(
GenericTests,
Cache.StorageWithCache,
Cache.TransUndoStorageWithCache,
CommitLockTests.CommitLockUndoTests,
ConflictResolution.ConflictResolvingStorage,
ConflictResolution.ConflictResolvingTransUndoStorage,
PackableStorage.PackableUndoStorage,
RevisionStorage.RevisionStorage,
TransactionalUndoStorage.TransactionalUndoStorage,
TransactionalUndoVersionStorage.TransactionalUndoVersionStorage,
VersionStorage.VersionStorage,
):
"""Extend GenericTests with tests that MappingStorage can't pass."""
class FileStorageTests(GenericTests):
class FileStorageTests(FullGenericTests):
"""Test ZEO backed by a FileStorage."""
level = 2
......@@ -180,7 +186,7 @@ class FileStorageTests(GenericTests):
</filestorage>
""" % filename
class BDBTests(FileStorageTests):
class BDBTests(FullGenericTests):
"""ZEO backed by a Berkeley full storage."""
level = 2
......@@ -192,67 +198,14 @@ class BDBTests(FileStorageTests):
</fullstorage>
""" % self._envdir
class MappingStorageTests(FileStorageTests):
class MappingStorageTests(GenericTests):
"""ZEO backed by a Mapping storage."""
def getConfig(self):
return """<mappingstorage 1/>"""
# Tests which MappingStorage can't possibly pass, because it doesn't
# support versions or undo.
def checkVersions(self): pass
def checkVersionedStoreAndLoad(self): pass
def checkVersionedLoadErrors(self): pass
def checkVersionLock(self): pass
def checkVersionEmpty(self): pass
def checkUndoUnresolvable(self): pass
def checkUndoInvalidation(self): pass
def checkUndoInVersion(self): pass
def checkUndoCreationBranch2(self): pass
def checkUndoCreationBranch1(self): pass
def checkUndoConflictResolution(self): pass
def checkUndoCommitVersion(self): pass
def checkUndoAbortVersion(self): pass
def checkPackUndoLog(self): pass
def checkUndoLogMetadata(self): pass
def checkTwoObjectUndoAtOnce(self): pass
def checkTwoObjectUndoAgain(self): pass
def checkTwoObjectUndo(self): pass
def checkTransactionalUndoAfterPackWithObjectUnlinkFromRoot(self): pass
def checkTransactionalUndoAfterPack(self): pass
def checkSimpleTransactionalUndo(self): pass
def checkReadMethods(self): pass
def checkPackAfterUndoDeletion(self): pass
def checkPackAfterUndoManyTimes(self): pass
def checkPackVersions(self): pass
def checkPackUnlinkedFromRoot(self): pass
def checkPackOnlyOneObject(self): pass
def checkPackJustOldRevisions(self): pass
def checkPackEmptyStorage(self): pass
def checkPackAllRevisions(self): pass
def checkPackVersionsInPast(self): pass
def checkPackVersionReachable(self): pass
def checkNotUndoable(self): pass
def checkNewSerialOnCommitVersionToVersion(self): pass
def checkModifyAfterAbortVersion(self): pass
def checkLoadSerial(self): pass
def checkCreateObjectInVersionWithAbort(self): pass
def checkCommitVersionSerialno(self): pass
def checkCommitVersionInvalidation(self): pass
def checkCommitToOtherVersion(self): pass
def checkCommitToNonVersion(self): pass
def checkCommitLockUndoFinish(self): pass
def checkCommitLockUndoClose(self): pass
def checkCommitLockUndoAbort(self): pass
def checkCommitEmptyVersionInvalidation(self): pass
def checkCreationUndoneGetSerial(self): pass
def checkAbortVersionSerialno(self): pass
def checkAbortVersionInvalidation(self): pass
def checkAbortVersionErrors(self): pass
def checkAbortVersion(self): pass
def checkAbortOneVersionCommitTheOther(self): pass
def checkResolve(self): pass
def check4ExtStorageThread(self): pass
# XXX There are still a bunch of tests that fail. Are there
# still test classes in GenericTests that shouldn't be there?
test_classes = [FileStorageTests, MappingStorageTests]
......
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Basic unit tests for a multi-version client cache."""
import os
import tempfile
import unittest
import ZEO.cache
from ZODB.utils import p64
n1 = p64(1)
n2 = p64(2)
n3 = p64(3)
n4 = p64(4)
n5 = p64(5)
class CacheTests(unittest.TestCase):
def setUp(self):
self.cache = ZEO.cache.ClientCache()
self.cache.open()
def tearDown(self):
if self.cache.path:
os.remove(self.cache.path)
def testLastTid(self):
self.assertEqual(self.cache.getLastTid(), None)
self.cache.setLastTid(n2)
self.assertEqual(self.cache.getLastTid(), n2)
self.cache.invalidate(None, "", n1)
self.assertEqual(self.cache.getLastTid(), n2)
self.cache.invalidate(None, "", n3)
self.assertEqual(self.cache.getLastTid(), n3)
self.assertRaises(ValueError, self.cache.setLastTid, n2)
def testLoad(self):
data1 = "data for n1"
self.assertEqual(self.cache.load(n1, ""), None)
self.assertEqual(self.cache.load(n1, "version"), None)
self.cache.store(n1, "", n3, None, data1)
self.assertEqual(self.cache.load(n1, ""), (data1, n3, ""))
# The cache doesn't know whether version exists, because it
# only has non-version data.
self.assertEqual(self.cache.load(n1, "version"), None)
self.assertEqual(self.cache.modifiedInVersion(n1), None)
def testInvalidate(self):
data1 = "data for n1"
self.cache.store(n1, "", n3, None, data1)
self.cache.invalidate(n1, "", n4)
self.cache.invalidate(n2, "", n2)
self.assertEqual(self.cache.load(n1, ""), None)
self.assertEqual(self.cache.loadBefore(n1, n4),
(data1, n3, n4))
def testVersion(self):
data1 = "data for n1"
data1v = "data for n1 in version"
self.cache.store(n1, "version", n3, None, data1v)
self.assertEqual(self.cache.load(n1, ""), None)
self.assertEqual(self.cache.load(n1, "version"),
(data1v, n3, "version"))
self.assertEqual(self.cache.load(n1, "random"), None)
self.assertEqual(self.cache.modifiedInVersion(n1), "version")
self.cache.invalidate(n1, "version", n4)
self.assertEqual(self.cache.load(n1, "version"), None)
def testNonCurrent(self):
data1 = "data for n1"
data2 = "data for n2"
self.cache.store(n1, "", n4, None, data1)
self.cache.store(n1, "", n2, n3, data2)
# can't say anything about state before n2
self.assertEqual(self.cache.loadBefore(n1, n2), None)
# n3 is the upper bound of non-current record n2
self.assertEqual(self.cache.loadBefore(n1, n3), (data2, n2, n3))
# no data for between n2 and n3
self.assertEqual(self.cache.loadBefore(n1, n4), None)
self.cache.invalidate(n1, "", n5)
self.assertEqual(self.cache.loadBefore(n1, n5), (data1, n4, n5))
self.assertEqual(self.cache.loadBefore(n2, n4), None)
def testException(self):
self.assertRaises(ValueError,
self.cache.store,
n1, "version", n2, n3, "data")
self.cache.store(n1, "", n2, None, "data")
self.assertRaises(ValueError,
self.cache.store,
n1, "", n3, None, "data")
def testEviction(self):
# Manually override the current maxsize
maxsize = self.cache.size = self.cache.fc.maxsize = 3395 # 1245
self.cache.fc = ZEO.cache.FileCache(3395, None, self.cache)
# Trivial test of eviction code. Doesn't test non-current
# eviction.
data = ["z" * i for i in range(100)]
for i in range(50):
n = p64(i)
self.cache.store(n, "", n, None, data[i])
self.assertEquals(len(self.cache), i + 1)
self.assert_(self.cache.fc.currentsize < maxsize)
# The cache now uses 1225 bytes. The next insert
# should delete some objects.
n = p64(50)
self.cache.store(n, "", n, None, data[51])
self.assert_(len(self.cache) < 51)
self.assert_(self.cache.fc.currentsize <= maxsize)
# XXX Need to make sure eviction of non-current data
# and of version data are handled correctly.
def testSerialization(self):
self.cache.store(n1, "", n2, None, "data for n1")
self.cache.store(n2, "version", n2, None, "version data for n2")
self.cache.store(n3, "", n3, n4, "non-current data for n3")
self.cache.store(n3, "", n4, n5, "more non-current data for n3")
path = tempfile.mktemp()
# Copy data from self.cache into path, reaching into the cache
# guts to make the copy.
dst = open(path, "wb+")
src = self.cache.fc.f
src.seek(0)
dst.write(src.read(self.cache.fc.maxsize))
dst.close()
copy = ZEO.cache.ClientCache(path)
copy.open()
# Verify that internals of both objects are the same.
# Could also test that external API produces the same results.
eq = self.assertEqual
eq(copy.tid, self.cache.tid)
eq(len(copy), len(self.cache))
eq(copy.version, self.cache.version)
eq(copy.current, self.cache.current)
eq(copy.noncurrent, self.cache.noncurrent)
def test_suite():
return unittest.makeSuite(CacheTests)
......@@ -13,7 +13,7 @@
##############################################################################
"""Handy standard storage machinery
$Id: BaseStorage.py,v 1.38 2003/12/23 14:37:13 jeremy Exp $
$Id: BaseStorage.py,v 1.39 2003/12/24 16:02:00 jeremy Exp $
"""
import cPickle
import threading
......@@ -32,7 +32,6 @@ from ZODB.utils import z64
class BaseStorage(UndoLogCompatible):
_transaction=None # Transaction that is being committed
_serial=z64 # Transaction serial number
_tstatus=' ' # Transaction status, used for copying data
_is_read_only = 0
......@@ -51,7 +50,7 @@ class BaseStorage(UndoLogCompatible):
t=time.time()
t=self._ts=apply(TimeStamp,(time.gmtime(t)[:5]+(t%60,)))
self._serial=`t`
self._tid = `t`
if base is None:
self._oid='\0\0\0\0\0\0\0\0'
else:
......@@ -60,16 +59,19 @@ class BaseStorage(UndoLogCompatible):
def abortVersion(self, src, transaction):
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
return []
return self._tid, []
def commitVersion(self, src, dest, transaction):
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
return []
return self._tid, []
def close(self):
pass
def cleanup(self):
pass
def sortKey(self):
"""Return a string that can be used to sort storage instances.
......@@ -85,7 +87,7 @@ class BaseStorage(UndoLogCompatible):
def getSize(self):
return len(self)*300 # WAG!
def history(self, oid, version, length=1):
def history(self, oid, version, length=1, filter=None):
pass
def modifiedInVersion(self, oid):
......@@ -167,13 +169,13 @@ class BaseStorage(UndoLogCompatible):
now = time.time()
t = TimeStamp(*(time.gmtime(now)[:5] + (now % 60,)))
self._ts = t = t.laterThan(self._ts)
self._serial = `t`
self._tid = `t`
else:
self._ts = TimeStamp(tid)
self._serial = tid
self._tid = tid
self._tstatus = status
self._begin(self._serial, user, desc, ext)
self._begin(self._tid, user, desc, ext)
finally:
self._lock_release()
......@@ -203,10 +205,11 @@ class BaseStorage(UndoLogCompatible):
return
try:
if f is not None:
f()
f(self._tid)
u, d, e = self._ude
self._finish(self._serial, u, d, e)
self._finish(self._tid, u, d, e)
self._clear_temp()
return self._tid
finally:
self._ude = None
self._transaction = None
......@@ -250,6 +253,48 @@ class BaseStorage(UndoLogCompatible):
raise POSException.Unsupported, (
"Retrieval of historical revisions is not supported")
def loadBefore(self, oid, tid):
"""Return most recent revision of oid before tid committed."""
# XXX Is it okay for loadBefore() to return current data?
# There doesn't seem to be a good reason to forbid it, even
# though the typical use of this method will never find
# current data. But maybe we should call it loadByTid()?
n = 2
start_time = None
end_time = None
while start_time is None:
# The history() approach is a hack, because the dict
# returned by history() doesn't contain a tid. It
# contains a serialno, which is often the same, but isn't
# required to be. We'll pretend it is for now.
# A second problem is that history() doesn't say anything
# about whether the transaction status. If it falls before
# the pack time, we can't honor the MVCC request.
# Note: history() returns the most recent record first.
# XXX The filter argument to history() only appears to be
# supported by FileStorage. Perhaps it shouldn't be used.
L = self.history(oid, "", n, lambda d: not d["version"])
if not L:
return
for d in L:
if d["serial"] < tid:
start_time = d["serial"]
break
else:
end_time = d["serial"]
if len(L) < n:
break
n *= 2
if start_time is None:
return None
data = self.loadSerial(oid, start_time)
return data, start_time, end_time
def getExtensionMethods(self):
"""getExtensionMethods
......@@ -314,7 +359,7 @@ class BaseStorage(UndoLogCompatible):
oid=r.oid
if verbose: print oid_repr(oid), r.version, len(r.data)
if restoring:
self.restore(oid, r.serial, r.data, r.version,
self.restore(oid, r.tid, r.data, r.version,
r.data_txn, transaction)
else:
pre=preget(oid, None)
......
This diff is collapsed.
......@@ -13,8 +13,8 @@
##############################################################################
"""Database objects
$Id: DB.py,v 1.57 2003/11/28 16:44:49 jim Exp $"""
__version__='$Revision: 1.57 $'[11:-2]
$Id: DB.py,v 1.58 2003/12/24 16:02:00 jeremy Exp $"""
__version__='$Revision: 1.58 $'[11:-2]
import cPickle, cStringIO, sys, POSException, UndoLogCompatible
from Connection import Connection
......@@ -74,7 +74,7 @@ class DB(UndoLogCompatible.UndoLogCompatible, object):
self._version_cache_size=version_cache_size
self._version_cache_deactivate_after = version_cache_deactivate_after
self._miv_cache={}
self._miv_cache = {}
# Setup storage
self._storage=storage
......@@ -300,8 +300,7 @@ class DB(UndoLogCompatible.UndoLogCompatible, object):
def importFile(self, file):
raise NotImplementedError
def invalidate(self, oids, connection=None, version='',
rc=sys.getrefcount):
def invalidate(self, tid, oids, connection=None, version=''):
"""Invalidate references to a given oid.
This is used to indicate that one of the connections has committed a
......@@ -323,21 +322,21 @@ class DB(UndoLogCompatible.UndoLogCompatible, object):
for cc in allocated:
if (cc is not connection and
(not version or cc._version==version)):
if rc(cc) <= 3:
if sys.getrefcount(cc) <= 3:
cc.close()
cc.invalidate(oids)
cc.invalidate(tid, oids)
temps=self._temps
if temps:
if self._temps:
t=[]
for cc in temps:
if rc(cc) > 3:
for cc in self._temps:
if sys.getrefcount(cc) > 3:
if (cc is not connection and
(not version or cc._version==version)):
cc.invalidate(oids)
(not version or cc._version == version)):
cc.invalidate(tid, oids)
t.append(cc)
else: cc.close()
self._temps=t
else:
cc.close()
self._temps = t
def modifiedInVersion(self, oid):
h=hash(oid)%131
......@@ -353,7 +352,7 @@ class DB(UndoLogCompatible.UndoLogCompatible, object):
return len(self._storage)
def open(self, version='', transaction=None, temporary=0, force=None,
waitflag=1):
waitflag=1, mvcc=True):
"""Return a object space (AKA connection) to work in
The optional version argument can be used to specify that a
......@@ -371,25 +370,25 @@ class DB(UndoLogCompatible.UndoLogCompatible, object):
try:
if transaction is not None:
connections=transaction._connections
connections = transaction._connections
if connections:
if connections.has_key(version) and not temporary:
return connections[version]
else:
transaction._connections=connections={}
transaction=transaction._connections
transaction._connections = connections = {}
transaction = transaction._connections
if temporary:
# This is a temporary connection.
# We won't bother with the pools. This will be
# a one-use connection.
c=self.klass(
version=version,
cache_size=self._version_cache_size)
c = self.klass(version=version,
cache_size=self._version_cache_size,
mvcc=mvcc)
c._setDB(self)
self._temps.append(c)
if transaction is not None: transaction[id(c)]=c
if transaction is not None:
transaction[id(c)] = c
return c
......@@ -430,18 +429,18 @@ class DB(UndoLogCompatible.UndoLogCompatible, object):
if not pool:
c=None
c = None
if version:
if self._version_pool_size > len(allocated) or force:
c=self.klass(
version=version,
cache_size=self._version_cache_size)
c = self.klass(version=version,
cache_size=self._version_cache_size,
mvcc=mvcc)
allocated.append(c)
pool.append(c)
elif self._pool_size > len(allocated) or force:
c=self.klass(
version=version,
cache_size=self._cache_size)
c = self.klass(version=version,
cache_size=self._cache_size,
mvcc=mvcc)
allocated.append(c)
pool.append(c)
......@@ -456,7 +455,7 @@ class DB(UndoLogCompatible.UndoLogCompatible, object):
pool_lock.release()
else: return
elif len(pool)==1:
elif len(pool) == 1:
# Taking last one, lock the pool
# Note that another thread might grab the lock
# before us, so we might actually block, however,
......@@ -470,14 +469,15 @@ class DB(UndoLogCompatible.UndoLogCompatible, object):
# but it could be higher due to a race condition.
pool_lock.release()
c=pool[-1]
c = pool[-1]
del pool[-1]
c._setDB(self)
for pool, allocated in pooll:
for cc in pool:
cc._incrgc()
if transaction is not None: transaction[version]=c
if transaction is not None:
transaction[version] = c
return c
finally: self._r()
......@@ -588,7 +588,8 @@ class DB(UndoLogCompatible.UndoLogCompatible, object):
d = {}
for oid in storage.undo(id):
d[oid] = 1
self.invalidate(d)
# XXX I think we need to remove old undo to use mvcc
self.invalidate(None, d)
def versionEmpty(self, version):
return self._storage.versionEmpty(version)
......@@ -616,13 +617,13 @@ class CommitVersion:
def commit(self, reallyme, t):
dest=self._dest
oids = self._db._storage.commitVersion(self._version, dest, t)
tid, oids = self._db._storage.commitVersion(self._version, dest, t)
oids = list2dict(oids)
self._db.invalidate(oids, version=dest)
self._db.invalidate(tid, oids, version=dest)
if dest:
# the code above just invalidated the dest version.
# now we need to invalidate the source!
self._db.invalidate(oids, version=self._version)
self._db.invalidate(tid, oids, version=self._version)
class AbortVersion(CommitVersion):
"""An object that will see to version abortion
......@@ -631,9 +632,9 @@ class AbortVersion(CommitVersion):
"""
def commit(self, reallyme, t):
version=self._version
oids = self._db._storage.abortVersion(version, t)
self._db.invalidate(list2dict(oids), version=version)
version = self._version
tid, oids = self._db._storage.abortVersion(version, t)
self._db.invalidate(tid, list2dict(oids), version=version)
class TransactionalUndo(CommitVersion):
......@@ -647,5 +648,5 @@ class TransactionalUndo(CommitVersion):
# similarity of rhythm that I think it's justified.
def commit(self, reallyme, t):
oids = self._db._storage.transactionalUndo(self._version, t)
self._db.invalidate(list2dict(oids))
tid, oids = self._db._storage.transactionalUndo(self._version, t)
self._db.invalidate(tid, list2dict(oids))
This diff is collapsed.
This diff is collapsed.
# this is a package
from ZODB.FileStorage.FileStorage \
import FileStorage, RecordIterator, FileIterator, packed_version
This diff is collapsed.
from ZODB.FileStorage import FileIterator
from ZODB.FileStorage.format \
import TRANS_HDR, TRANS_HDR_LEN, DATA_HDR, DATA_HDR_LEN
from ZODB.TimeStamp import TimeStamp
from ZODB.utils import u64
from ZODB.tests.StorageTestBase import zodb_unpickle
from cPickle import Unpickler
from cStringIO import StringIO
import md5
import struct
import types
def get_pickle_metadata(data):
# ZODB's data records contain two pickles. The first is the class
# of the object, the second is the object.
if data.startswith('(c'):
# Don't actually unpickle a class, because it will attempt to
# load the class. Just break open the pickle and get the
# module and class from it.
modname, classname, rest = data.split('\n', 2)
modname = modname[2:]
return modname, classname
f = StringIO(data)
u = Unpickler(f)
try:
class_info = u.load()
except Exception, err:
print "Error", err
return '', ''
if isinstance(class_info, types.TupleType):
if isinstance(class_info[0], types.TupleType):
modname, classname = class_info[0]
else:
modname, classname = class_info
else:
# XXX not sure what to do here
modname = repr(class_info)
classname = ''
return modname, classname
def fsdump(path, file=None, with_offset=1):
i = 0
iter = FileIterator(path)
for trans in iter:
if with_offset:
print >> file, "Trans #%05d tid=%016x time=%s offset=%d" % \
(i, u64(trans.tid), str(TimeStamp(trans.tid)), trans._pos)
else:
print >> file, "Trans #%05d tid=%016x time=%s" % \
(i, u64(trans.tid), str(TimeStamp(trans.tid)))
print >> file, "\tstatus=%s user=%s description=%s" % \
(`trans.status`, trans.user, trans.description)
j = 0
for rec in trans:
if rec.data is None:
fullclass = "undo or abort of object creation"
else:
modname, classname = get_pickle_metadata(rec.data)
dig = md5.new(rec.data).hexdigest()
fullclass = "%s.%s" % (modname, classname)
# special case for testing purposes
if fullclass == "ZODB.tests.MinPO.MinPO":
obj = zodb_unpickle(rec.data)
fullclass = "%s %s" % (fullclass, obj.value)
if rec.version:
version = "version=%s " % rec.version
else:
version = ''
if rec.data_txn:
# XXX It would be nice to print the transaction number
# (i) but it would be too expensive to keep track of.
bp = "bp=%016x" % u64(rec.data_txn)
else:
bp = ""
print >> file, " data #%05d oid=%016x %sclass=%s %s" % \
(j, u64(rec.oid), version, fullclass, bp)
j += 1
print >> file
i += 1
iter.close()
def fmt(p64):
# Return a nicely formatted string for a packaged 64-bit value
return "%016x" % u64(p64)
class Dumper:
"""A very verbose dumper for debuggin FileStorage problems."""
# XXX Should revise this class to use FileStorageFormatter.
def __init__(self, path, dest=None):
self.file = open(path, "rb")
self.dest = dest
def dump(self):
fid = self.file.read(4)
print >> self.dest, "*" * 60
print >> self.dest, "file identifier: %r" % fid
while self.dump_txn():
pass
def dump_txn(self):
pos = self.file.tell()
h = self.file.read(TRANS_HDR_LEN)
if not h:
return False
tid, tlen, status, ul, dl, el = struct.unpack(TRANS_HDR, h)
end = pos + tlen
print >> self.dest, "=" * 60
print >> self.dest, "offset: %d" % pos
print >> self.dest, "end pos: %d" % end
print >> self.dest, "transaction id: %s" % fmt(tid)
print >> self.dest, "trec len: %d" % tlen
print >> self.dest, "status: %r" % status
user = descr = extra = ""
if ul:
user = self.file.read(ul)
if dl:
descr = self.file.read(dl)
if el:
extra = self.file.read(el)
print >> self.dest, "user: %r" % user
print >> self.dest, "description: %r" % descr
print >> self.dest, "len(extra): %d" % el
while self.file.tell() < end:
self.dump_data(pos)
stlen = self.file.read(8)
print >> self.dest, "redundant trec len: %d" % u64(stlen)
return 1
def dump_data(self, tloc):
pos = self.file.tell()
h = self.file.read(DATA_HDR_LEN)
assert len(h) == DATA_HDR_LEN
oid, revid, prev, tloc, vlen, dlen = struct.unpack(DATA_HDR, h)
print >> self.dest, "-" * 60
print >> self.dest, "offset: %d" % pos
print >> self.dest, "oid: %s" % fmt(oid)
print >> self.dest, "revid: %s" % fmt(revid)
print >> self.dest, "previous record offset: %d" % prev
print >> self.dest, "transaction offset: %d" % tloc
if vlen:
pnv = self.file.read(8)
sprevdata = self.file.read(8)
version = self.file.read(vlen)
print >> self.dest, "version: %r" % version
print >> self.dest, "non-version data offset: %d" % u64(pnv)
print >> self.dest, \
"previous version data offset: %d" % u64(sprevdata)
print >> self.dest, "len(data): %d" % dlen
self.file.read(dlen)
if not dlen:
sbp = self.file.read(8)
print >> self.dest, "backpointer: %d" % u64(sbp)
This diff is collapsed.
......@@ -21,7 +21,7 @@ It is meant to illustrate the simplest possible storage.
The Mapping storage uses a single data structure to map object ids to data.
"""
__version__='$Revision: 1.10 $'[11:-2]
__version__='$Revision: 1.11 $'[11:-2]
from ZODB import utils
from ZODB import BaseStorage
......@@ -58,6 +58,16 @@ class MappingStorage(BaseStorage.BaseStorage):
finally:
self._lock_release()
def loadEx(self, oid, version):
self._lock_acquire()
try:
# Since this storage doesn't support versions, tid and
# serial will always be the same.
p = self._index[oid]
return p[8:], p[:8], "" # pickle, serial, tid
finally:
self._lock_release()
def store(self, oid, serial, data, version, transaction):
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
......@@ -75,11 +85,10 @@ class MappingStorage(BaseStorage.BaseStorage):
serials=(oserial, serial),
data=data)
serial = self._serial
self._tindex.append((oid, serial+data))
self._tindex.append((oid, self._tid + data))
finally:
self._lock_release()
return serial
return self._tid
def _clear_temp(self):
self._tindex = []
......@@ -87,7 +96,7 @@ class MappingStorage(BaseStorage.BaseStorage):
def _finish(self, tid, user, desc, ext):
for oid, p in self._tindex:
self._index[oid] = p
self._ltid = self._serial
self._ltid = self._tid
def lastTransaction(self):
return self._ltid
......@@ -95,6 +104,8 @@ class MappingStorage(BaseStorage.BaseStorage):
def pack(self, t, referencesf):
self._lock_acquire()
try:
if not self._index:
return
# Build an index of *only* those objects reachable from the root.
rootl = ['\0\0\0\0\0\0\0\0']
pindex = {}
......
......@@ -85,7 +85,6 @@ class BasicStorage:
eq(value, MinPO(11))
eq(revid, newrevid)
## def checkNonVersionStore(self, oid=None, revid=None, version=None):
def checkNonVersionStore(self):
revid = ZERO
newrevid = self._dostore(revid=None)
......
......@@ -20,7 +20,7 @@ import tempfile
import unittest
import ZODB, ZODB.FileStorage
from StorageTestBase import StorageTestBase, removefs
from StorageTestBase import StorageTestBase
class FileStorageCorruptTests(StorageTestBase):
......@@ -30,7 +30,7 @@ class FileStorageCorruptTests(StorageTestBase):
def tearDown(self):
self._storage.close()
removefs(self.path)
self._storage.cleanup()
def _do_stores(self):
oids = []
......
......@@ -36,40 +36,40 @@ class HistoryStorage:
h = self._storage.history(oid, size=1)
eq(len(h), 1)
d = h[0]
eq(d['serial'], revid3)
eq(d['tid'], revid3)
eq(d['version'], '')
# Try to get 2 historical revisions
h = self._storage.history(oid, size=2)
eq(len(h), 2)
d = h[0]
eq(d['serial'], revid3)
eq(d['tid'], revid3)
eq(d['version'], '')
d = h[1]
eq(d['serial'], revid2)
eq(d['tid'], revid2)
eq(d['version'], '')
# Try to get all 3 historical revisions
h = self._storage.history(oid, size=3)
eq(len(h), 3)
d = h[0]
eq(d['serial'], revid3)
eq(d['tid'], revid3)
eq(d['version'], '')
d = h[1]
eq(d['serial'], revid2)
eq(d['tid'], revid2)
eq(d['version'], '')
d = h[2]
eq(d['serial'], revid1)
eq(d['tid'], revid1)
eq(d['version'], '')
# There should be no more than 3 revisions
h = self._storage.history(oid, size=4)
eq(len(h), 3)
d = h[0]
eq(d['serial'], revid3)
eq(d['tid'], revid3)
eq(d['version'], '')
d = h[1]
eq(d['serial'], revid2)
eq(d['tid'], revid2)
eq(d['version'], '')
d = h[2]
eq(d['serial'], revid1)
eq(d['tid'], revid1)
eq(d['version'], '')
def checkVersionHistory(self):
......@@ -94,22 +94,22 @@ class HistoryStorage:
h = self._storage.history(oid, version, 100)
eq(len(h), 6)
d = h[0]
eq(d['serial'], revid6)
eq(d['tid'], revid6)
eq(d['version'], version)
d = h[1]
eq(d['serial'], revid5)
eq(d['tid'], revid5)
eq(d['version'], version)
d = h[2]
eq(d['serial'], revid4)
eq(d['tid'], revid4)
eq(d['version'], version)
d = h[3]
eq(d['serial'], revid3)
eq(d['tid'], revid3)
eq(d['version'], '')
d = h[4]
eq(d['serial'], revid2)
eq(d['tid'], revid2)
eq(d['version'], '')
d = h[5]
eq(d['serial'], revid1)
eq(d['tid'], revid1)
eq(d['version'], '')
def checkHistoryAfterVersionCommit(self):
......@@ -151,25 +151,25 @@ class HistoryStorage:
h = self._storage.history(oid, version, 100)
eq(len(h), 7)
d = h[0]
eq(d['serial'], revid7)
eq(d['tid'], revid7)
eq(d['version'], '')
d = h[1]
eq(d['serial'], revid6)
eq(d['tid'], revid6)
eq(d['version'], version)
d = h[2]
eq(d['serial'], revid5)
eq(d['tid'], revid5)
eq(d['version'], version)
d = h[3]
eq(d['serial'], revid4)
eq(d['tid'], revid4)
eq(d['version'], version)
d = h[4]
eq(d['serial'], revid3)
eq(d['tid'], revid3)
eq(d['version'], '')
d = h[5]
eq(d['serial'], revid2)
eq(d['tid'], revid2)
eq(d['version'], '')
d = h[6]
eq(d['serial'], revid1)
eq(d['tid'], revid1)
eq(d['version'], '')
def checkHistoryAfterVersionAbort(self):
......@@ -211,23 +211,23 @@ class HistoryStorage:
h = self._storage.history(oid, version, 100)
eq(len(h), 7)
d = h[0]
eq(d['serial'], revid7)
eq(d['tid'], revid7)
eq(d['version'], '')
d = h[1]
eq(d['serial'], revid6)
eq(d['tid'], revid6)
eq(d['version'], version)
d = h[2]
eq(d['serial'], revid5)
eq(d['tid'], revid5)
eq(d['version'], version)
d = h[3]
eq(d['serial'], revid4)
eq(d['tid'], revid4)
eq(d['version'], version)
d = h[4]
eq(d['serial'], revid3)
eq(d['tid'], revid3)
eq(d['version'], '')
d = h[5]
eq(d['serial'], revid2)
eq(d['tid'], revid2)
eq(d['version'], '')
d = h[6]
eq(d['serial'], revid1)
eq(d['tid'], revid1)
eq(d['version'], '')
......@@ -33,7 +33,7 @@ class IteratorCompare:
eq(reciter.tid, revid)
for rec in reciter:
eq(rec.oid, oid)
eq(rec.serial, revid)
eq(rec.tid, revid)
eq(rec.version, '')
eq(zodb_unpickle(rec.data), MinPO(val))
val = val + 1
......@@ -147,6 +147,20 @@ class IteratorStorage(IteratorCompare):
finally:
self._storage.tpc_finish(t)
def checkLoadEx(self):
oid = self._storage.new_oid()
self._dostore(oid, data=42)
data, tid, ver = self._storage.loadEx(oid, "")
self.assertEqual(zodb_unpickle(data), MinPO(42))
match = False
for txn in self._storage.iterator():
for rec in txn:
if rec.oid == oid and rec.tid == tid:
self.assertEqual(txn.tid, tid)
match = True
if not match:
self.fail("Could not find transaction with matching id")
class ExtendedIteratorStorage(IteratorCompare):
......@@ -202,7 +216,7 @@ class IteratorDeepCompare:
eq(txn1._extension, txn2._extension)
for rec1, rec2 in zip(txn1, txn2):
eq(rec1.oid, rec2.oid)
eq(rec1.serial, rec2.serial)
eq(rec1.tid, rec2.tid)
eq(rec1.version, rec2.version)
eq(rec1.data, rec2.data)
# Make sure there are no more records left in rec1 and rec2,
......
......@@ -154,9 +154,12 @@ class StorageClientThread(TestThread):
class ExtStorageClientThread(StorageClientThread):
def runtest(self):
# pick some other storage ops to execute
ops = [getattr(self, meth) for meth in dir(ExtStorageClientThread)
if meth.startswith('do_')]
# pick some other storage ops to execute, depending in part
# on the features provided by the storage.
names = ["do_load", "do_modifiedInVersion"]
if self.storage.supportsUndo():
names += ["do_loadSerial", "do_undoLog", "do_iterator"]
ops = [getattr(self, meth) for meth in names]
assert ops, "Didn't find an storage ops in %s" % self.storage
# do a store to guarantee there's at least one oid in self.oids
self.dostore(0)
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
......@@ -35,6 +35,10 @@ class DemoStorageTests(StorageTestBase.StorageTestBase,
# have this limit, so we inhibit this test here.
pass
def checkAbortVersionNonCurrent(self):
# XXX Need to implement a real loadBefore for DemoStorage?
pass
def test_suite():
suite = unittest.makeSuite(DemoStorageTests, 'check')
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment