Commit b9633b32 authored by Jim Fulton's avatar Jim Fulton

Fixed the "potential ZODB cache inconsistency after client reconnect"

problem reported on zodb-dev:

http://mail.zope.org/pipermail/zodb-dev/2006-August/010343.html

Added a new invalidateCache protocol for DBs and Connections to
invalidate the entire in-memory caches.  This is used when ZEO clients
reconnect.
parent 853e6f95
......@@ -460,6 +460,10 @@ class ClientStorage(object):
# this method before it was stopped.
return
# invalidate our db cache
if self._db is not None:
self._db.invalidateCache()
# TODO: report whether we get a read-only connection.
if self._connection is not None:
reconnect = 1
......
......@@ -307,6 +307,53 @@ class CatastrophicClientLoopFailure(
self.assertEqual(log[1][0], "Couldn't close a dispatcher.")
self.assert_('exc_info' in log[1][1])
class ConnectionInvalidationOnReconnect(
ZEO.tests.ConnectionTests.CommonSetupTearDown):
"""Test what happens when the client loop falls over
"""
def getConfig(self, path, create, read_only):
return """<mappingstorage 1/>"""
def checkConnectionInvalidationOnReconnect(self):
storage = ClientStorage(self.addr, wait=1, min_disconnect_poll=0.1)
self._storage = storage
# and we'll wait for the storage to be reconnected:
for i in range(100):
if storage.is_connected():
break
time.sleep(0.1)
else:
raise AssertionError("Couldn't connect to server")
class DummyDB:
_invalidatedCache = 0
def invalidateCache(self):
self._invalidatedCache += 1
def invalidate(*a, **k):
pass
db = DummyDB()
storage.registerDB(db, None)
base = db._invalidatedCache
# Now we'll force a disconnection and reconnection
storage._connection.close()
# and we'll wait for the storage to be reconnected:
for i in range(100):
if storage.is_connected():
break
time.sleep(0.1)
else:
raise AssertionError("Couldn't connect to server")
# Now, the root object in the connection should have been invalidated:
self.assertEqual(db._invalidatedCache, base+1)
class DemoStorageWrappedAroundClientStorage(DemoStorageWrappedBase):
......@@ -345,6 +392,7 @@ test_classes = [OneTimeTests,
DemoStorageWrappedAroundClientStorage,
HeartbeatTests,
CatastrophicClientLoopFailure,
ConnectionInvalidationOnReconnect,
]
def test_suite():
......
......@@ -130,7 +130,10 @@ class Connection(ExportImport, object):
# critical sections (if any -- this needs careful thought).
self._inv_lock = threading.Lock()
self._invalidated = d = {}
self._invalidated = {}
# Flag indicating whether the cache has been invalidated:
self._invalidatedCache = False
# We intend to prevent committing a transaction in which
# ReadConflictError occurs. _conflicts is the set of oids that
......@@ -288,6 +291,14 @@ class Connection(ExportImport, object):
finally:
self._inv_lock.release()
def invalidateCache(self):
self._inv_lock.acquire()
try:
self._invalidatedCache = True
finally:
self._inv_lock.release()
def root(self):
"""Return the database root object."""
return self.get(z64)
......@@ -450,6 +461,9 @@ class Connection(ExportImport, object):
invalidated = self._invalidated
self._invalidated = {}
self._txn_time = None
if self._invalidatedCache:
self._invalidatedCache = False
invalidated = self._cache.cache_data.copy()
finally:
self._inv_lock.release()
......@@ -501,6 +515,9 @@ class Connection(ExportImport, object):
self._added_during_commit = []
if self._invalidatedCache:
raise ConflictError()
for obj in self._registered_objects:
oid = obj._p_oid
assert oid
......@@ -759,6 +776,10 @@ class Connection(ExportImport, object):
# dict update could go on in another thread, but we don't care
# because we have to check again after the load anyway.
if self._invalidatedCache:
raise ReadConflictError()
if (obj._p_oid in self._invalidated and
not myhasattr(obj, "_p_independent")):
# If the object has _p_independent(), we will handle it below.
......@@ -947,6 +968,7 @@ class Connection(ExportImport, object):
"""
self._reset_counter = global_reset_counter
self._invalidated.clear()
self._invalidatedCache = False
cache_size = self._cache.cache_size
self._cache = cache = PickleCache(self, cache_size)
......
......@@ -480,6 +480,12 @@ class DB(object):
c.invalidate(tid, oids)
self._connectionMap(inval)
def invalidateCache(self):
"""Invalidate each of the connection caches
"""
self._miv_cache.clear()
self._connectionMap(lambda c: c.invalidateCache())
def modifiedInVersion(self, oid):
h = hash(oid) % 131
cache = self._miv_cache
......
......@@ -283,40 +283,51 @@ class IConnection(Interface):
If clear is True, reset the counters.
"""
def invalidateCache():
"""Invalidate the connection cache
This invalidates *all* objects in the cache. If the connection
is open, subsequent reads will fail until a new transaction
begins or until the connection os reopned.
"""
class IDatabase(Interface):
"""ZODB DB.
TODO: This interface is incomplete.
"""
def __init__(storage,
pool_size=7,
cache_size=400,
version_pool_size=3,
version_cache_size=100,
database_name='unnamed',
databases=None,
):
"""Create an object database.
storage: the storage used by the database, e.g. FileStorage
pool_size: expected maximum number of open connections
cache_size: target size of Connection object cache, in number of
objects
version_pool_size: expected maximum number of connections (per
version)
version_cache_size: target size of Connection object cache for
version connections, in number of objects
database_name: when using a multi-database, the name of this DB
within the database group. It's a (detected) error if databases
is specified too and database_name is already a key in it.
This becomes the value of the DB's database_name attribute.
databases: when using a multi-database, a mapping to use as the
binding of this DB's .databases attribute. It's intended
that the second and following DB's added to a multi-database
pass the .databases attribute set on the first DB added to the
collection.
"""
## __init__ methods don't belong in interfaces:
##
## def __init__(storage,
## pool_size=7,
## cache_size=400,
## version_pool_size=3,
## version_cache_size=100,
## database_name='unnamed',
## databases=None,
## ):
## """Create an object database.
## storage: the storage used by the database, e.g. FileStorage
## pool_size: expected maximum number of open connections
## cache_size: target size of Connection object cache, in number of
## objects
## version_pool_size: expected maximum number of connections (per
## version)
## version_cache_size: target size of Connection object cache for
## version connections, in number of objects
## database_name: when using a multi-database, the name of this DB
## within the database group. It's a (detected) error if databases
## is specified too and database_name is already a key in it.
## This becomes the value of the DB's database_name attribute.
## databases: when using a multi-database, a mapping to use as the
## binding of this DB's .databases attribute. It's intended
## that the second and following DB's added to a multi-database
## pass the .databases attribute set on the first DB added to the
## collection.
## """
databases = Attribute("""\
A mapping from database name to DB (database) object.
......@@ -328,6 +339,12 @@ class IDatabase(Interface):
entry.
""")
def invalidateCache():
"""Invalidate all objects in the database object caches
invalidateCache will be called on each of the database's connections.
"""
class IStorage(Interface):
"""A storage is responsible for storing and retrieving data of objects.
"""
......
......@@ -478,6 +478,80 @@ class InvalidationTests(unittest.TestCase):
"""
def test_invalidateCache():
"""\
The invalidateCache method invalidates a connection's cache. It also
prevents reads until the end of a transaction.
>>> from ZODB.tests.util import DB
>>> import transaction
>>> db = DB()
>>> tm = transaction.TransactionManager()
>>> connection = db.open(transaction_manager=tm)
>>> connection.root()['a'] = StubObject()
>>> connection.root()['a'].x = 1
>>> connection.root()['b'] = StubObject()
>>> connection.root()['b'].x = 1
>>> connection.root()['c'] = StubObject()
>>> connection.root()['c'].x = 1
>>> tm.commit()
>>> connection.root()['b']._p_deactivate()
>>> connection.root()['c'].x = 2
So we have a connection and an active transaction with some
modifications. Lets call invalidateCache:
>>> connection.invalidateCache()
Now, if we try to load an object, we'll get a read conflict:
>>> connection.root()['b'].x
Traceback (most recent call last):
...
ReadConflictError: database read conflict error
If we try to commit the transaction, we'll get a conflict error:
>>> tm.commit()
Traceback (most recent call last):
...
ConflictError: database conflict error
and the cache will have been cleared:
>>> print connection.root()['a']._p_changed
None
>>> print connection.root()['b']._p_changed
None
>>> print connection.root()['c']._p_changed
None
But we'll be able to access data again:
>>> connection.root()['b'].x
1
Aborting a transaction after a read conflict also lets us read data
and go on about our business:
>>> connection.invalidateCache()
>>> connection.root()['c'].x
Traceback (most recent call last):
...
ReadConflictError: database read conflict error
>>> tm.abort()
>>> connection.root()['c'].x
1
>>> connection.root()['c'].x = 2
>>> tm.commit()
>>> db.close()
"""
# ---- stubs
class StubObject(Persistent):
......
......@@ -18,6 +18,8 @@ import warnings
import transaction
from zope.testing import doctest
import ZODB
import ZODB.FileStorage
......@@ -130,5 +132,49 @@ class DBTests(unittest.TestCase):
self.assertEqual(nconn(pools), 3)
def test_invalidateCache():
"""\
The invalidateCache method invalidates a connection caches for all of the connections attached to a database.
>>> from ZODB.tests.util import DB
>>> import transaction
>>> db = DB()
>>> tm1 = transaction.TransactionManager()
>>> c1 = db.open(transaction_manager=tm1)
>>> c1.root()['a'] = MinPO(1)
>>> tm1.commit()
>>> tm2 = transaction.TransactionManager()
>>> c2 = db.open(transaction_manager=tm2)
>>> c1.root()['a']._p_deactivate()
>>> tm3 = transaction.TransactionManager()
>>> c3 = db.open(transaction_manager=tm3)
>>> c3.root()['a'].value
1
>>> c3.close()
>>> db.invalidateCache()
>>> c1.root()['a'].value
Traceback (most recent call last):
...
ReadConflictError: database read conflict error
>>> c2.root()['a'].value
Traceback (most recent call last):
...
ReadConflictError: database read conflict error
>>> c3 is db.open(transaction_manager=tm3)
True
>>> print c3.root()['a']._p_changed
None
>>> db.close()
"""
def test_suite():
return unittest.makeSuite(DBTests)
s = unittest.makeSuite(DBTests)
s.addTest(doctest.DocTestSuite())
return s
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment