Commit d033a617 authored by Jim Fulton's avatar Jim Fulton

Added a client-storage check to make sure a client doesn't connect to

an out-of-date server.
parent 5c004da1
...@@ -1219,6 +1219,10 @@ class ClientStorage(object): ...@@ -1219,6 +1219,10 @@ class ClientStorage(object):
log2("No verification necessary (last_inval_tid up-to-date)") log2("No verification necessary (last_inval_tid up-to-date)")
self.finish_verification() self.finish_verification()
return "no verification" return "no verification"
elif ltid < last_inval_tid:
message = "Client has seen newer trasnactions than server!"
log2(message, level=logging.CRITICAL)
raise ClientStorageError(message)
# log some hints about last transaction # log some hints about last transaction
log2("last inval tid: %r %s\n" log2("last inval tid: %r %s\n"
......
...@@ -198,10 +198,12 @@ class CommonSetupTearDown(StorageTestBase): ...@@ -198,10 +198,12 @@ class CommonSetupTearDown(StorageTestBase):
zconf.transaction_timeout = self.timeout zconf.transaction_timeout = self.timeout
return zconf return zconf
def startServer(self, create=1, index=0, read_only=0, ro_svr=0, keep=None): def startServer(self, create=1, index=0, read_only=0, ro_svr=0, keep=None,
path=None):
addr = self.addr[index] addr = self.addr[index]
logging.info("startServer(create=%d, index=%d, read_only=%d) @ %s" % logging.info("startServer(create=%d, index=%d, read_only=%d) @ %s" %
(create, index, read_only, addr)) (create, index, read_only, addr))
if path is None:
path = "%s.%d" % (self.file, index) path = "%s.%d" % (self.file, index)
sconf = self.getConfig(path, create, read_only) sconf = self.getConfig(path, create, read_only)
zconf = self.getServerConfig(addr, ro_svr) zconf = self.getServerConfig(addr, ro_svr)
...@@ -268,30 +270,6 @@ class ConnectionTests(CommonSetupTearDown): ...@@ -268,30 +270,6 @@ class ConnectionTests(CommonSetupTearDown):
self._dostore(oid, data=obj) self._dostore(oid, data=obj)
self._storage.close() self._storage.close()
def checkMultipleServers(self):
# Crude test-- just start two servers and do a commit at each one.
self._newAddr()
self._storage = self.openClientStorage('test', 100000)
self._dostore()
self.shutdownServer(index=0)
self.startServer(index=1)
# If we can still store after shutting down one of the
# servers, we must be reconnecting to the other server.
did_a_store = 0
for i in range(10):
try:
self._dostore()
did_a_store = 1
break
except ClientDisconnected:
time.sleep(0.5)
self.assert_(did_a_store)
self._storage.close()
def checkReadOnlyClient(self): def checkReadOnlyClient(self):
# Open a read-only client to a read-write server; stores fail # Open a read-only client to a read-write server; stores fail
...@@ -341,33 +319,6 @@ class ConnectionTests(CommonSetupTearDown): ...@@ -341,33 +319,6 @@ class ConnectionTests(CommonSetupTearDown):
self.assertRaises(ReadOnlyError, self._dostore) self.assertRaises(ReadOnlyError, self._dostore)
self._storage.close() self._storage.close()
# TODO: Compare checkReconnectXXX() here to checkReconnection()
# further down. Is the code here hopelessly naive, or is
# checkReconnection() overwrought?
def checkReconnectWritable(self):
# A read-write client reconnects to a read-write server
# Start a client
self._storage = self.openClientStorage()
# Stores should succeed here
self._dostore()
# Shut down the server
self.shutdownServer()
self._servers = []
# Poll until the client disconnects
self.pollDown()
# Stores should fail now
self.assertRaises(ClientDisconnected, self._dostore)
# Restart the server
self.startServer(create=0)
# Poll until the client connects
self.pollUp()
# Stores should succeed here
self._dostore()
self._storage.close()
def checkDisconnectionError(self): def checkDisconnectionError(self):
# Make sure we get a ClientDisconnected when we try to read an # Make sure we get a ClientDisconnected when we try to read an
...@@ -379,41 +330,6 @@ class ConnectionTests(CommonSetupTearDown): ...@@ -379,41 +330,6 @@ class ConnectionTests(CommonSetupTearDown):
self._storage.load, 'fredwash', '') self._storage.load, 'fredwash', '')
self._storage.close() self._storage.close()
def checkDisconnectedAbort(self):
self._storage = self.openClientStorage()
self._dostore()
oids = [self._storage.new_oid() for i in range(5)]
txn = Transaction()
self._storage.tpc_begin(txn)
for oid in oids:
data = zodb_pickle(MinPO(oid))
self._storage.store(oid, None, data, '', txn)
self.shutdownServer()
self.assertRaises(ClientDisconnected, self._storage.tpc_vote, txn)
self._storage.tpc_abort(txn)
self.startServer(create=0)
self._storage._wait()
self._dostore()
# This test is supposed to cover the following error, although
# I don't have much confidence that it does. The likely
# explanation for the error is that the _tbuf contained
# objects that weren't in the _seriald, because the client was
# interrupted waiting for tpc_vote() to return. When the next
# transaction committed, it tried to do something with the
# bogus _tbuf entries. The explanation is wrong/incomplete,
# because tpc_begin() should clear the _tbuf.
# 2003-01-15T15:44:19 ERROR(200) ZODB A storage error occurred
# in the last phase of a two-phase commit. This shouldn't happen.
# Traceback (innermost last):
# Module ZODB.Transaction, line 359, in _finish_one
# Module ZODB.Connection, line 691, in tpc_finish
# Module ZEO.ClientStorage, line 679, in tpc_finish
# Module ZEO.ClientStorage, line 709, in _update_cache
# KeyError: ...
def checkBasicPersistence(self): def checkBasicPersistence(self):
# Verify cached data persists across client storage instances. # Verify cached data persists across client storage instances.
...@@ -539,33 +455,6 @@ class ConnectionTests(CommonSetupTearDown): ...@@ -539,33 +455,6 @@ class ConnectionTests(CommonSetupTearDown):
self.assertEqual(newobj_copy, newobj) self.assertEqual(newobj_copy, newobj)
self._storage.close() self._storage.close()
def checkReconnection(self):
# Check that the client reconnects when a server restarts.
self._storage = self.openClientStorage()
oid = self._storage.new_oid()
obj = MinPO(12)
self._dostore(oid, data=obj)
logging.info("checkReconnection(): About to shutdown server")
self.shutdownServer()
logging.info("checkReconnection(): About to restart server")
self.startServer(create=0)
oid = self._storage.new_oid()
obj = MinPO(12)
while 1:
try:
self._dostore(oid, data=obj)
break
except ClientDisconnected:
# Maybe the exception mess is better now
logging.info("checkReconnection(): Error after"
" server restart; retrying.", exc_info=True)
transaction.abort()
# Give the other thread a chance to run.
time.sleep(0.1)
logging.info("checkReconnection(): finished")
self._storage.close()
def checkBadMessage1(self): def checkBadMessage1(self):
# not even close to a real message # not even close to a real message
self._bad_message("salty") self._bad_message("salty")
...@@ -676,6 +565,20 @@ class ConnectionTests(CommonSetupTearDown): ...@@ -676,6 +565,20 @@ class ConnectionTests(CommonSetupTearDown):
db2.close() db2.close()
db1.close() db1.close()
def checkCheckForOutOfDateServer(self):
# We don't want to connect a client to a server if the client
# has seen newer transactions.
self._storage = self.openClientStorage()
self._dostore()
self.shutdownServer()
self.assertRaises(ClientDisconnected, self._storage.load, '\0'*8, '')
self.startServer()
# No matter how long we wait, the client won't reconnect:
time.sleep(2)
self.assertRaises(ClientDisconnected, self._storage.load, '\0'*8, '')
class InvqTests(CommonSetupTearDown): class InvqTests(CommonSetupTearDown):
invq = 3 invq = 3
...@@ -783,6 +686,34 @@ class ReconnectionTests(CommonSetupTearDown): ...@@ -783,6 +686,34 @@ class ReconnectionTests(CommonSetupTearDown):
# Stores should fail here # Stores should fail here
self.assertRaises(ReadOnlyError, self._dostore) self.assertRaises(ReadOnlyError, self._dostore)
# TODO: Compare checkReconnectXXX() here to checkReconnection()
# further down. Is the code here hopelessly naive, or is
# checkReconnection() overwrought?
def checkReconnectWritable(self):
# A read-write client reconnects to a read-write server
# Start a client
self._storage = self.openClientStorage()
# Stores should succeed here
self._dostore()
# Shut down the server
self.shutdownServer()
self._servers = []
# Poll until the client disconnects
self.pollDown()
# Stores should fail now
self.assertRaises(ClientDisconnected, self._dostore)
# Restart the server
self.startServer(create=0)
# Poll until the client connects
self.pollUp()
# Stores should succeed here
self._dostore()
self._storage.close()
def checkReconnectReadOnly(self): def checkReconnectReadOnly(self):
# A read-only client reconnects from a read-write to a # A read-only client reconnects from a read-write to a
# read-only server # read-only server
...@@ -939,6 +870,98 @@ class ReconnectionTests(CommonSetupTearDown): ...@@ -939,6 +870,98 @@ class ReconnectionTests(CommonSetupTearDown):
perstorage.close() perstorage.close()
self._storage.close() self._storage.close()
def checkDisconnectedAbort(self):
self._storage = self.openClientStorage()
self._dostore()
oids = [self._storage.new_oid() for i in range(5)]
txn = Transaction()
self._storage.tpc_begin(txn)
for oid in oids:
data = zodb_pickle(MinPO(oid))
self._storage.store(oid, None, data, '', txn)
self.shutdownServer()
self.assertRaises(ClientDisconnected, self._storage.tpc_vote, txn)
self._storage.tpc_abort(txn)
self.startServer(create=0)
self._storage._wait()
self._dostore()
# This test is supposed to cover the following error, although
# I don't have much confidence that it does. The likely
# explanation for the error is that the _tbuf contained
# objects that weren't in the _seriald, because the client was
# interrupted waiting for tpc_vote() to return. When the next
# transaction committed, it tried to do something with the
# bogus _tbuf entries. The explanation is wrong/incomplete,
# because tpc_begin() should clear the _tbuf.
# 2003-01-15T15:44:19 ERROR(200) ZODB A storage error occurred
# in the last phase of a two-phase commit. This shouldn't happen.
# Traceback (innermost last):
# Module ZODB.Transaction, line 359, in _finish_one
# Module ZODB.Connection, line 691, in tpc_finish
# Module ZEO.ClientStorage, line 679, in tpc_finish
# Module ZEO.ClientStorage, line 709, in _update_cache
# KeyError: ...
def checkReconnection(self):
# Check that the client reconnects when a server restarts.
self._storage = self.openClientStorage()
oid = self._storage.new_oid()
obj = MinPO(12)
self._dostore(oid, data=obj)
logging.info("checkReconnection(): About to shutdown server")
self.shutdownServer()
logging.info("checkReconnection(): About to restart server")
self.startServer(create=0)
oid = self._storage.new_oid()
obj = MinPO(12)
while 1:
try:
self._dostore(oid, data=obj)
break
except ClientDisconnected:
# Maybe the exception mess is better now
logging.info("checkReconnection(): Error after"
" server restart; retrying.", exc_info=True)
transaction.abort()
# Give the other thread a chance to run.
time.sleep(0.1)
logging.info("checkReconnection(): finished")
self._storage.close()
def checkMultipleServers(self):
# Crude test-- just start two servers and do a commit at each one.
self._newAddr()
self._storage = self.openClientStorage('test', 100000)
self._dostore()
self.shutdownServer(index=0)
# When we start the second server, we use file data file from
# the original server so tha the new server is a replica of
# the original. We need this becaise ClientStorage won't use
# a server if the server's last transaction is earlier than
# what the client has seen.
self.startServer(index=1, path=self.file+'.0', create=False)
# If we can still store after shutting down one of the
# servers, we must be reconnecting to the other server.
did_a_store = 0
for i in range(10):
try:
self._dostore()
did_a_store = 1
break
except ClientDisconnected:
time.sleep(0.5)
self.assert_(did_a_store)
self._storage.close()
class TimeoutTests(CommonSetupTearDown): class TimeoutTests(CommonSetupTearDown):
timeout = 1 timeout = 1
......
...@@ -4,7 +4,7 @@ ZEO Fan Out ...@@ -4,7 +4,7 @@ ZEO Fan Out
We should be able to set up ZEO servers with ZEO clients. Let's see We should be able to set up ZEO servers with ZEO clients. Let's see
if we can make it work. if we can make it work.
We'll use some helper functions. The first is a helpter that starts We'll use some helper functions. The first is a helper that starts
ZEO servers for us and another one that picks ports. ZEO servers for us and another one that picks ports.
We'll start the first server: We'll start the first server:
...@@ -13,7 +13,8 @@ We'll start the first server: ...@@ -13,7 +13,8 @@ We'll start the first server:
>>> port0 = ZEO.tests.testZEO.get_port() >>> port0 = ZEO.tests.testZEO.get_port()
>>> zconf0 = ZEO.tests.forker.ZEOConfig(('', port0)) >>> zconf0 = ZEO.tests.forker.ZEOConfig(('', port0))
>>> zport0, adminaddr0, pid0, path0 = ZEO.tests.forker.start_zeo_server( >>> zport0, adminaddr0, pid0, path0 = ZEO.tests.forker.start_zeo_server(
... '<filestorage 1>\n path fs\n</filestorage>\n', zconf0, port0) ... '<filestorage 1>\n path fs\n</filestorage>\n', zconf0,
... port0, keep=1)
Then we''ll start 2 others that use this one: Then we''ll start 2 others that use this one:
...@@ -56,13 +57,6 @@ And some databases and connections around these: ...@@ -56,13 +57,6 @@ And some databases and connections around these:
>>> r2 >>> r2
{} {}
>>> db2 = DB(cs2)
>>> tm2 = transaction.TransactionManager()
>>> c2 = db2.open(transaction_manager=tm2)
>>> r2 = c2.root()
>>> r2
{}
If we update c1, we'll eventually see the change in c2: If we update c1, we'll eventually see the change in c2:
>>> import persistent.mapping >>> import persistent.mapping
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment