Commit 776f02c0 authored by Jeremy Hylton's avatar Jeremy Hylton

Rewrite tests to make sure they actually test the commit lock.

parent 24f89d25
...@@ -51,8 +51,7 @@ class WorkerThread(TestThread): ...@@ -51,8 +51,7 @@ class WorkerThread(TestThread):
oid = self.storage.new_oid() oid = self.storage.new_oid()
p = zodb_pickle(MinPO("c")) p = zodb_pickle(MinPO("c"))
self.storage.store(oid, ZERO, p, '', self.trans) self.storage.store(oid, ZERO, p, '', self.trans)
self.ready.set() self.myvote()
self.storage.tpc_vote(self.trans)
if self.method == "tpc_finish": if self.method == "tpc_finish":
self.storage.tpc_finish(self.trans) self.storage.tpc_finish(self.trans)
else: else:
...@@ -60,44 +59,37 @@ class WorkerThread(TestThread): ...@@ -60,44 +59,37 @@ class WorkerThread(TestThread):
except Disconnected: except Disconnected:
pass pass
class CommitLockTests: def myvote(self):
# The vote() call is synchronous, which makes it difficult to
# coordinate the action of multiple threads that all call
# vote(). This method sends the vote call, then sets the
# event saying vote was called, then waits for the vote
# response. It digs deep into the implementation of the client.
# The commit lock tests verify that the storage successfully # This method is a replacement for:
# blocks and restarts transactions when there is content for a # self.ready.set()
# single storage. There are a lot of cases to cover. # self.storage.tpc_vote(self.trans)
# CommitLock1 checks the case where a single transaction delays rpc = self.storage._server.rpc
# other transactions before they actually block. IOW, by the time msgid = rpc._deferred_call('vote', self.storage._serial)
# the other transactions get to the vote stage, the first self.ready.set()
# transaction has finished. rpc._deferred_wait(msgid)
self.storage._check_serials()
def checkCommitLock1OnCommit(self): class CommitLockTests:
self._storages = []
try:
self._checkCommitLock("tpc_finish", self._dosetup1, self._dowork1)
finally:
self._cleanup()
def checkCommitLock1OnAbort(self): NUM_CLIENTS = 5
self._storages = []
try:
self._checkCommitLock("tpc_abort", self._dosetup1, self._dowork1)
finally:
self._cleanup()
def checkCommitLock2OnCommit(self): # The commit lock tests verify that the storage successfully
self._storages = [] # blocks and restarts transactions when there is contention for a
try: # single storage. There are a lot of cases to cover. transaction
self._checkCommitLock("tpc_finish", self._dosetup2, self._dowork2) # has finished.
finally:
self._cleanup()
def checkCommitLock2OnAbort(self): # The general flow of these tests is to start a transaction by
self._storages = [] # getting far enough into 2PC to acquire the commit lock. Then
try: # begin one or more other connections that also want to commit.
self._checkCommitLock("tpc_abort", self._dosetup2, self._dowork2) # This causes the commit lock code to be exercised. Once the
finally: # other connections are started, the first transaction completes.
self._cleanup()
def _cleanup(self): def _cleanup(self):
for store, trans in self._storages: for store, trans in self._storages:
...@@ -105,77 +97,135 @@ class CommitLockTests: ...@@ -105,77 +97,135 @@ class CommitLockTests:
store.close() store.close()
self._storages = [] self._storages = []
def _checkCommitLock(self, method_name, dosetup, dowork): def _start_txn(self):
# check the commit lock when a client attemps a transaction, txn = Transaction()
# but fails/exits before finishing the commit. self._storage.tpc_begin(txn)
# The general flow of these tests is to start a transaction by
# calling tpc_begin(). Then begin one or more other
# connections that also want to commit. This causes the
# commit lock code to be exercised. Once the other
# connections are started, the first transaction completes.
# Either by commit or abort, depending on whether method_name
# is "tpc_finish."
# The tests are parameterized by method_name, dosetup(), and
# dowork(). The dosetup() function is called with a
# connectioned client storage, transaction, and timestamp.
# Any work it does occurs after the first transaction has
# started, but before it finishes. The dowork() function
# executes after the first transaction has completed.
# Start on transaction normally and get the lock.
t = Transaction()
self._storage.tpc_begin(t)
oid = self._storage.new_oid() oid = self._storage.new_oid()
self._storage.store(oid, ZERO, zodb_pickle(MinPO(1)), '', t) self._storage.store(oid, ZERO, zodb_pickle(MinPO(1)), '', txn)
self._storage.tpc_vote(t) return oid, txn
# Start a second transaction on a different connection without def checkCommitLockVoteFinish(self):
# blocking the test thread. oid, txn = self._start_txn()
self._storages = [] self._storage.tpc_vote(txn)
for i in range(4):
storage2 = self._duplicate_client() self._begin_threads()
t2 = Transaction()
tid = self._get_timestamp()
dosetup(storage2, t2, tid)
if i == 0:
storage2.close()
else:
self._storages.append((storage2, t2))
if method_name == "tpc_finish": self._storage.tpc_finish(txn)
self._storage.tpc_finish(t) self._storage.load(oid, '')
self._storage.load(oid, '')
else:
self._storage.tpc_abort(t)
dowork(method_name) self._finish_threads()
# Make sure the server is still responsive
self._dostore() self._dostore()
self._cleanup()
def checkCommitLockVoteAbort(self):
oid, txn = self._start_txn()
self._storage.tpc_vote(txn)
def _dosetup1(self, storage, trans, tid): self._begin_threads()
storage.tpc_begin(trans, tid)
def _dowork1(self, method_name): self._storage.tpc_abort(txn)
for store, trans in self._storages:
oid = store.new_oid() self._finish_threads()
store.store(oid, ZERO, zodb_pickle(MinPO("c")), '', trans)
store.tpc_vote(trans) self._dostore()
if method_name == "tpc_finish": self._cleanup()
store.tpc_finish(trans)
else: def checkCommitLockVoteClose(self):
store.tpc_abort(trans) oid, txn = self._start_txn()
self._storage.tpc_vote(txn)
self._begin_threads()
def _dosetup2(self, storage, trans, tid): self._storage.close()
self._finish_threads()
self._cleanup()
def _get_trans_id(self):
self._dostore()
L = self._storage.undoInfo()
return L[0]['id']
def _begin_undo(self, trans_id):
rpc = self._storage._server.rpc
return rpc._deferred_call('transactionalUndo', trans_id,
self._storage._serial)
def _finish_undo(self, msgid):
return self._storage._server.rpc._deferred_wait(msgid)
def checkCommitLockUndoFinish(self):
trans_id = self._get_trans_id()
oid, txn = self._start_txn()
msgid = self._begin_undo(trans_id)
self._begin_threads()
self._finish_undo(msgid)
self._storage.tpc_vote(txn)
self._storage.tpc_finish(txn)
self._storage.load(oid, '')
self._finish_threads()
self._dostore()
self._cleanup()
def checkCommitLockUndoAbort(self):
trans_id = self._get_trans_id()
oid, txn = self._start_txn()
msgid = self._begin_undo(trans_id)
self._begin_threads()
self._finish_undo(msgid)
self._storage.tpc_vote(txn)
self._storage.tpc_abort(txn)
self._finish_threads()
self._dostore()
self._cleanup()
def checkCommitLockUndoClose(self):
trans_id = self._get_trans_id()
oid, txn = self._start_txn()
msgid = self._begin_undo(trans_id)
self._begin_threads()
self._finish_undo(msgid)
self._storage.tpc_vote(txn)
self._storage.close()
self._finish_threads()
self._cleanup()
def _begin_threads(self):
# Start a second transaction on a different connection without
# blocking the test thread.
self._storages = []
self._threads = [] self._threads = []
t = WorkerThread(self, storage, trans)
self._threads.append(t) for i in range(self.NUM_CLIENTS):
t.start() storage = self._duplicate_client()
t.ready.wait() txn = Transaction()
tid = self._get_timestamp()
t = WorkerThread(self, storage, txn)
self._threads.append(t)
t.start()
t.ready.wait()
# Close on the connections abnormally to test server response
if i == 0:
storage.close()
else:
self._storages.append((storage, txn))
def _dowork2(self, method_name): def _finish_threads(self):
for t in self._threads: for t in self._threads:
t.cleanup() t.cleanup()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment