Commit 1867f7c4 authored by Jeremy Hylton's avatar Jeremy Hylton

Rewrite tests to make sure they actually test the commit lock.

parent 4c2ed62d
......@@ -51,8 +51,7 @@ class WorkerThread(TestThread):
oid = self.storage.new_oid()
p = zodb_pickle(MinPO("c"))
self.storage.store(oid, ZERO, p, '', self.trans)
self.ready.set()
self.storage.tpc_vote(self.trans)
self.myvote()
if self.method == "tpc_finish":
self.storage.tpc_finish(self.trans)
else:
......@@ -60,44 +59,37 @@ class WorkerThread(TestThread):
except Disconnected:
pass
class CommitLockTests:
def myvote(self):
# The vote() call is synchronous, which makes it difficult to
# coordinate the action of multiple threads that all call
# vote(). This method sends the vote call, then sets the
# event saying vote was called, then waits for the vote
# response. It digs deep into the implementation of the client.
# The commit lock tests verify that the storage successfully
# blocks and restarts transactions when there is content for a
# single storage. There are a lot of cases to cover.
# This method is a replacement for:
# self.ready.set()
# self.storage.tpc_vote(self.trans)
# CommitLock1 checks the case where a single transaction delays
# other transactions before they actually block. IOW, by the time
# the other transactions get to the vote stage, the first
# transaction has finished.
rpc = self.storage._server.rpc
msgid = rpc._deferred_call('vote', self.storage._serial)
self.ready.set()
rpc._deferred_wait(msgid)
self.storage._check_serials()
def checkCommitLock1OnCommit(self):
self._storages = []
try:
self._checkCommitLock("tpc_finish", self._dosetup1, self._dowork1)
finally:
self._cleanup()
class CommitLockTests:
def checkCommitLock1OnAbort(self):
self._storages = []
try:
self._checkCommitLock("tpc_abort", self._dosetup1, self._dowork1)
finally:
self._cleanup()
NUM_CLIENTS = 5
def checkCommitLock2OnCommit(self):
self._storages = []
try:
self._checkCommitLock("tpc_finish", self._dosetup2, self._dowork2)
finally:
self._cleanup()
# The commit lock tests verify that the storage successfully
# blocks and restarts transactions when there is contention for a
# single storage. There are a lot of cases to cover. transaction
# has finished.
def checkCommitLock2OnAbort(self):
self._storages = []
try:
self._checkCommitLock("tpc_abort", self._dosetup2, self._dowork2)
finally:
self._cleanup()
# The general flow of these tests is to start a transaction by
# getting far enough into 2PC to acquire the commit lock. Then
# begin one or more other connections that also want to commit.
# This causes the commit lock code to be exercised. Once the
# other connections are started, the first transaction completes.
def _cleanup(self):
for store, trans in self._storages:
......@@ -105,77 +97,135 @@ class CommitLockTests:
store.close()
self._storages = []
def _checkCommitLock(self, method_name, dosetup, dowork):
# check the commit lock when a client attemps a transaction,
# but fails/exits before finishing the commit.
# The general flow of these tests is to start a transaction by
# calling tpc_begin(). Then begin one or more other
# connections that also want to commit. This causes the
# commit lock code to be exercised. Once the other
# connections are started, the first transaction completes.
# Either by commit or abort, depending on whether method_name
# is "tpc_finish."
# The tests are parameterized by method_name, dosetup(), and
# dowork(). The dosetup() function is called with a
# connectioned client storage, transaction, and timestamp.
# Any work it does occurs after the first transaction has
# started, but before it finishes. The dowork() function
# executes after the first transaction has completed.
# Start on transaction normally and get the lock.
t = Transaction()
self._storage.tpc_begin(t)
def _start_txn(self):
txn = Transaction()
self._storage.tpc_begin(txn)
oid = self._storage.new_oid()
self._storage.store(oid, ZERO, zodb_pickle(MinPO(1)), '', t)
self._storage.tpc_vote(t)
self._storage.store(oid, ZERO, zodb_pickle(MinPO(1)), '', txn)
return oid, txn
# Start a second transaction on a different connection without
# blocking the test thread.
self._storages = []
for i in range(4):
storage2 = self._duplicate_client()
t2 = Transaction()
tid = self._get_timestamp()
dosetup(storage2, t2, tid)
if i == 0:
storage2.close()
else:
self._storages.append((storage2, t2))
def checkCommitLockVoteFinish(self):
oid, txn = self._start_txn()
self._storage.tpc_vote(txn)
self._begin_threads()
if method_name == "tpc_finish":
self._storage.tpc_finish(t)
self._storage.load(oid, '')
else:
self._storage.tpc_abort(t)
self._storage.tpc_finish(txn)
self._storage.load(oid, '')
dowork(method_name)
self._finish_threads()
# Make sure the server is still responsive
self._dostore()
self._cleanup()
def checkCommitLockVoteAbort(self):
oid, txn = self._start_txn()
self._storage.tpc_vote(txn)
def _dosetup1(self, storage, trans, tid):
storage.tpc_begin(trans, tid)
self._begin_threads()
def _dowork1(self, method_name):
for store, trans in self._storages:
oid = store.new_oid()
store.store(oid, ZERO, zodb_pickle(MinPO("c")), '', trans)
store.tpc_vote(trans)
if method_name == "tpc_finish":
store.tpc_finish(trans)
else:
store.tpc_abort(trans)
self._storage.tpc_abort(txn)
self._finish_threads()
self._dostore()
self._cleanup()
def checkCommitLockVoteClose(self):
oid, txn = self._start_txn()
self._storage.tpc_vote(txn)
self._begin_threads()
def _dosetup2(self, storage, trans, tid):
self._storage.close()
self._finish_threads()
self._cleanup()
def _get_trans_id(self):
self._dostore()
L = self._storage.undoInfo()
return L[0]['id']
def _begin_undo(self, trans_id):
rpc = self._storage._server.rpc
return rpc._deferred_call('transactionalUndo', trans_id,
self._storage._serial)
def _finish_undo(self, msgid):
return self._storage._server.rpc._deferred_wait(msgid)
def checkCommitLockUndoFinish(self):
trans_id = self._get_trans_id()
oid, txn = self._start_txn()
msgid = self._begin_undo(trans_id)
self._begin_threads()
self._finish_undo(msgid)
self._storage.tpc_vote(txn)
self._storage.tpc_finish(txn)
self._storage.load(oid, '')
self._finish_threads()
self._dostore()
self._cleanup()
def checkCommitLockUndoAbort(self):
trans_id = self._get_trans_id()
oid, txn = self._start_txn()
msgid = self._begin_undo(trans_id)
self._begin_threads()
self._finish_undo(msgid)
self._storage.tpc_vote(txn)
self._storage.tpc_abort(txn)
self._finish_threads()
self._dostore()
self._cleanup()
def checkCommitLockUndoClose(self):
trans_id = self._get_trans_id()
oid, txn = self._start_txn()
msgid = self._begin_undo(trans_id)
self._begin_threads()
self._finish_undo(msgid)
self._storage.tpc_vote(txn)
self._storage.close()
self._finish_threads()
self._cleanup()
def _begin_threads(self):
# Start a second transaction on a different connection without
# blocking the test thread.
self._storages = []
self._threads = []
t = WorkerThread(self, storage, trans)
self._threads.append(t)
t.start()
t.ready.wait()
for i in range(self.NUM_CLIENTS):
storage = self._duplicate_client()
txn = Transaction()
tid = self._get_timestamp()
t = WorkerThread(self, storage, txn)
self._threads.append(t)
t.start()
t.ready.wait()
# Close on the connections abnormally to test server response
if i == 0:
storage.close()
else:
self._storages.append((storage, txn))
def _dowork2(self, method_name):
def _finish_threads(self):
for t in self._threads:
t.cleanup()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment