Commit 72d3a8fa authored by Jim Fulton's avatar Jim Fulton

Bug Fixed: Failures in tpc_finish of client-storages weren't handled

  correctly, leaving the client storage in an inconsistent state.
parent 811fefbf
...@@ -1075,6 +1075,8 @@ class ClientStorage(object): ...@@ -1075,6 +1075,8 @@ class ClientStorage(object):
# tpc_cond condition variable prevents more than one # tpc_cond condition variable prevents more than one
# thread from calling tpc_finish() at a time. # thread from calling tpc_finish() at a time.
tid = self._server.tpc_finish(id(txn)) tid = self._server.tpc_finish(id(txn))
try:
self._lock.acquire() # for atomic processing of invalidations self._lock.acquire() # for atomic processing of invalidations
try: try:
self._update_cache(tid) self._update_cache(tid)
...@@ -1085,10 +1087,16 @@ class ClientStorage(object): ...@@ -1085,10 +1087,16 @@ class ClientStorage(object):
r = self._check_serials() r = self._check_serials()
assert r is None or len(r) == 0, "unhandled serialnos: %s" % r assert r is None or len(r) == 0, "unhandled serialnos: %s" % r
except:
# The server successfully committed. If we get a failure
# here, our own state will be in question, so reconnect.
self._connection.close()
raise
self.end_transaction()
finally: finally:
self._load_lock.release() self._load_lock.release()
self._iterator_gc() self._iterator_gc()
self.end_transaction()
def _update_cache(self, tid): def _update_cache(self, tid):
"""Internal helper to handle objects modified by a transaction. """Internal helper to handle objects modified by a transaction.
......
...@@ -1075,6 +1075,8 @@ class TimeoutTests(CommonSetupTearDown): ...@@ -1075,6 +1075,8 @@ class TimeoutTests(CommonSetupTearDown):
self.assert_(storage.is_connected()) self.assert_(storage.is_connected())
# We expect finish to fail. # We expect finish to fail.
self.assertRaises(ClientDisconnected, storage.tpc_finish, t) self.assertRaises(ClientDisconnected, storage.tpc_finish, t)
storage.tpc_abort(t)
# Now we think we've committed the second transaction, but we really # Now we think we've committed the second transaction, but we really
# haven't. A third one should produce a POSKeyError on the server, # haven't. A third one should produce a POSKeyError on the server,
# which manifests as a ConflictError on the client. # which manifests as a ConflictError on the client.
......
...@@ -1006,6 +1006,104 @@ transaction, we'll get a result: ...@@ -1006,6 +1006,104 @@ transaction, we'll get a result:
""" """
def tpc_finish_error():
r"""Server errors in tpc_finish weren't handled properly.
>>> import ZEO.ClientStorage
>>> class Connection:
... def __init__(self, client):
... self.client = client
... def get_addr(self):
... return 'server'
... def is_async(self):
... return True
... def register_object(self, ob):
... pass
... def close(self):
... print 'connection closed'
>>> class ConnectionManager:
... def __init__(self, addr, client, tmin, tmax):
... self.client = client
... def connect(self, sync=1):
... self.client.notifyConnected(Connection(self.client))
>>> class StorageServer:
... should_fail = True
... def __init__(self, conn):
... self.conn = conn
... self.t = None
... def get_info(self):
... return {}
... def endZeoVerify(self):
... self.conn.client.endVerify()
... def lastTransaction(self):
... return '\0'*8
... def tpc_begin(self, t, *args):
... if self.t is not None:
... raise TypeError('already trans')
... self.t = t
... print 'begin', args
... def vote(self, t):
... if self.t != t:
... raise TypeError('bad trans')
... print 'vote'
... def tpc_finish(self, *args):
... if self.should_fail:
... raise TypeError()
... print 'finish'
... def tpc_abort(self, t):
... if self.t != t:
... raise TypeError('bad trans')
... self.t = None
... print 'abort'
... def iterator_gc(*args):
... pass
>>> class ClientStorage(ZEO.ClientStorage.ClientStorage):
... ConnectionManagerClass = ConnectionManager
... StorageServerStubClass = StorageServer
>>> class Transaction:
... user = 'test'
... description = ''
... _extension = {}
>>> cs = ClientStorage(('', ''))
>>> t1 = Transaction()
>>> cs.tpc_begin(t1)
begin ('test', '', {}, None, ' ')
>>> cs.tpc_vote(t1)
vote
>>> cs.tpc_finish(t1)
Traceback (most recent call last):
...
TypeError
>>> cs.tpc_abort(t1)
abort
>>> t2 = Transaction()
>>> cs.tpc_begin(t2)
begin ('test', '', {}, None, ' ')
>>> cs.tpc_vote(t2)
vote
If client storage has an internal error after the storage finish
succeeeds, it will close the connection, which will force a
restart and reverification.
>>> StorageServer.should_fail = False
>>> cs._update_cache = lambda : None
>>> try: cs.tpc_finish(t2)
... except: pass
... else: print "Should have failed"
finish
connection closed
"""
test_classes = [FileStorageTests, FileStorageRecoveryTests, test_classes = [FileStorageTests, FileStorageRecoveryTests,
MappingStorageTests, DemoStorageTests, MappingStorageTests, DemoStorageTests,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment