Commit 032d309c authored by Jeremy Hylton's avatar Jeremy Hylton

Backport atomic invalidations code from Zope3.

The DB's invalidate() method takes a set of oids corresponding to all
the changes from a data manager for one transaction.  All the objects
are invalidated at once.

Add a few tests in testZODB of the new code.  The tests just cover
corner cases, because I can't think of a sensible way to test the
atomicity.  When it has failed in the past, it's been caused by
nearly-impossible to reproduce data races.

This fix needs to be backported to Zope 2.6, but only after assessing
how significant an impact the API change will have.
parent 99f670a9
......@@ -90,7 +90,7 @@ process must skip such objects, rather than deactivating them.
static char cPickleCache_doc_string[] =
"Defines the PickleCache used by ZODB Connection objects.\n"
"\n"
"$Id: cPickleCache.c,v 1.80 2003/04/02 16:50:49 jeremy Exp $\n";
"$Id: cPickleCache.c,v 1.81 2003/04/08 15:55:44 jeremy Exp $\n";
#define ASSIGN(V,E) {PyObject *__e; __e=(E); Py_XDECREF(V); (V)=__e;}
#define UNLESS(E) if(!(E))
......@@ -352,6 +352,7 @@ cc_invalidate(ccobject *self, PyObject *args)
_invalidate(self, key);
Py_DECREF(key);
}
/* XXX Do we really want to modify the input? */
PySequence_DelSlice(inv, 0, l);
}
}
......
This diff is collapsed.
......@@ -13,8 +13,8 @@
##############################################################################
"""Database objects
$Id: DB.py,v 1.47 2003/01/17 17:23:14 shane Exp $"""
__version__='$Revision: 1.47 $'[11:-2]
$Id: DB.py,v 1.48 2003/04/08 15:55:44 jeremy Exp $"""
__version__='$Revision: 1.48 $'[11:-2]
import cPickle, cStringIO, sys, POSException, UndoLogCompatible
from Connection import Connection
......@@ -26,6 +26,12 @@ from zLOG import LOG, ERROR
from types import StringType
def list2dict(L):
d = {}
for elt in L:
d[elt] = 1
return d
class DB(UndoLogCompatible.UndoLogCompatible):
"""The Object Database
......@@ -282,17 +288,7 @@ class DB(UndoLogCompatible.UndoLogCompatible):
def importFile(self, file):
raise 'Not yet implemented'
def begin_invalidation(self):
# Must be called before first call to invalidate and before
# the storage lock is held.
self._a()
def finish_invalidation(self):
# Must be called after begin_invalidation() and after final
# invalidate() call.
self._r()
def invalidate(self, oid, connection=None, version='',
def invalidate(self, oids, connection=None, version='',
rc=sys.getrefcount):
"""Invalidate references to a given oid.
......@@ -304,9 +300,11 @@ class DB(UndoLogCompatible.UndoLogCompatible):
if connection is not None:
version=connection._version
# Update modified in version cache
h=hash(oid)%131
o=self._miv_cache.get(h, None)
if o is not None and o[0]==oid: del self._miv_cache[h]
# XXX must make this work with list or dict to backport to 2.6
for oid in oids:
h=hash(oid)%131
o=self._miv_cache.get(h, None)
if o is not None and o[0]==oid: del self._miv_cache[h]
# Notify connections
for pool, allocated in self._pools[1]:
......@@ -315,7 +313,7 @@ class DB(UndoLogCompatible.UndoLogCompatible):
(not version or cc._version==version)):
if rc(cc) <= 3:
cc.close()
cc.invalidate(oid)
cc.invalidate(oids)
temps=self._temps
if temps:
......@@ -324,7 +322,7 @@ class DB(UndoLogCompatible.UndoLogCompatible):
if rc(cc) > 3:
if (cc is not connection and
(not version or cc._version==version)):
cc.invalidate(oid)
cc.invalidate(oids)
t.append(cc)
else: cc.close()
self._temps=t
......@@ -561,8 +559,10 @@ class DB(UndoLogCompatible.UndoLogCompatible):
transaction.register(TransactionalUndo(self, id))
else:
# fall back to old undo
d = {}
for oid in storage.undo(id):
self.invalidate(oid)
d[oid] = 1
self.invalidate(d)
def versionEmpty(self, version):
return self._storage.versionEmpty(version)
......@@ -589,14 +589,14 @@ class CommitVersion:
def abort(self, reallyme, t): pass
def commit(self, reallyme, t):
db=self._db
dest=self._dest
oids=db._storage.commitVersion(self._version, dest, t)
for oid in oids: db.invalidate(oid, version=dest)
oids = self._db._storage.commitVersion(self._version, dest, t)
oids = list2dict(oids)
self._db.invalidate(oids, version=dest)
if dest:
# the code above just invalidated the dest version.
# now we need to invalidate the source!
for oid in oids: db.invalidate(oid, version=self._version)
self._db.invalidate(oids, version=self._version)
class AbortVersion(CommitVersion):
"""An object that will see to version abortion
......@@ -605,11 +605,9 @@ class AbortVersion(CommitVersion):
"""
def commit(self, reallyme, t):
db=self._db
version=self._version
oids = db._storage.abortVersion(version, t)
for oid in oids:
db.invalidate(oid, version=version)
oids = self._db._storage.abortVersion(version, t)
self._db.invalidate(list2dict(oids), version=version)
class TransactionalUndo(CommitVersion):
......@@ -623,7 +621,5 @@ class TransactionalUndo(CommitVersion):
# similarity of rhythm that I think it's justified.
def commit(self, reallyme, t):
db=self._db
oids=db._storage.transactionalUndo(self._version, t)
for oid in oids:
db.invalidate(oid)
oids = self._db._storage.transactionalUndo(self._version, t)
self._db.invalidate(list2dict(oids))
......@@ -90,7 +90,7 @@ process must skip such objects, rather than deactivating them.
static char cPickleCache_doc_string[] =
"Defines the PickleCache used by ZODB Connection objects.\n"
"\n"
"$Id: cPickleCache.c,v 1.80 2003/04/02 16:50:49 jeremy Exp $\n";
"$Id: cPickleCache.c,v 1.81 2003/04/08 15:55:44 jeremy Exp $\n";
#define ASSIGN(V,E) {PyObject *__e; __e=(E); Py_XDECREF(V); (V)=__e;}
#define UNLESS(E) if(!(E))
......@@ -352,6 +352,7 @@ cc_invalidate(ccobject *self, PyObject *args)
_invalidate(self, key);
Py_DECREF(key);
}
/* XXX Do we really want to modify the input? */
PySequence_DelSlice(inv, 0, l);
}
}
......
......@@ -11,16 +11,52 @@
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import sys, os
import unittest
import ZODB
import ZODB.FileStorage
from ZODB.PersistentMapping import PersistentMapping
from ZODB.POSException import ReadConflictError
from ZODB.tests.StorageTestBase import removefs
import unittest
from Persistence import Persistent
class P(Persistent):
pass
class Independent(Persistent):
def _p_independent(self):
return True
class DecoyIndependent(Persistent):
def _p_independent(self):
return False
class ZODBTests(unittest.TestCase):
def setUp(self):
self._storage = ZODB.FileStorage.FileStorage(
'ZODBTests.fs', create=1)
self._db = ZODB.DB(self._storage)
def populate(self):
get_transaction().begin()
conn = self._db.open()
root = conn.root()
root['test'] = pm = PersistentMapping()
for n in range(100):
pm[n] = PersistentMapping({0: 100 - n})
get_transaction().note('created test data')
get_transaction().commit()
conn.close()
class ExportImportTests:
def checkDuplicate(self, abort_it=0, dup_name='test_duplicate'):
def tearDown(self):
self._storage.close()
removefs("ZODBTests.fs")
def checkExportImport(self, abort_it=0, dup_name='test_duplicate'):
self.populate()
get_transaction().begin()
get_transaction().note('duplication')
# Duplicate the 'test' object.
......@@ -83,29 +119,8 @@ class ExportImportTests:
finally:
conn.close()
def checkDuplicateAborted(self):
self.checkDuplicate(abort_it=1, dup_name='test_duplicate_aborted')
class ZODBTests(unittest.TestCase, ExportImportTests):
def setUp(self):
self._storage = ZODB.FileStorage.FileStorage(
'ZODBTests.fs', create=1)
self._db = ZODB.DB(self._storage)
get_transaction().begin()
conn = self._db.open()
root = conn.root()
root['test'] = pm = PersistentMapping()
for n in range(100):
pm[n] = PersistentMapping({0: 100 - n})
get_transaction().note('created test data')
get_transaction().commit()
conn.close()
def tearDown(self):
self._storage.close()
removefs("ZODBTests.fs")
def checkExportImportAborted(self):
self.checkExportImport(abort_it=1, dup_name='test_duplicate_aborted')
def checkVersionOnly(self):
# Make sure the changes to make empty transactions a no-op
......@@ -124,6 +139,7 @@ class ZODBTests(unittest.TestCase, ExportImportTests):
def checkResetCache(self):
# The cache size after a reset should be 0 and the GC attributes
# ought to be linked to it rather than the old cache.
self.populate()
conn = self._db.open()
try:
conn.root()
......@@ -173,10 +189,99 @@ class ZODBTests(unittest.TestCase, ExportImportTests):
conn1.close()
conn2.close()
def checkReadConflict(self):
self.obj = P()
self.readConflict()
def test_suite():
return unittest.makeSuite(ZODBTests, 'check')
def readConflict(self, shouldFail=True):
# Two transactions run concurrently. Each reads some object,
# then one commits and the other tries to read an object
# modified by the first. This read should fail with a conflict
# error because the object state read is not necessarily
# consistent with the objects read earlier in the transaction.
conn = self._db.open()
conn.setLocalTransaction()
r1 = conn.root()
r1["p"] = self.obj
self.obj.child1 = P()
conn.getTransaction().commit()
if __name__=='__main__':
unittest.main(defaultTest='test_suite')
# start a new transaction with a new connection
cn2 = self._db.open()
# start a new transaction with the other connection
cn2.setLocalTransaction()
r2 = cn2.root()
self.assertEqual(r1._p_serial, r2._p_serial)
self.obj.child2 = P()
conn.getTransaction().commit()
# resume the transaction using cn2
obj = r2["p"]
# An attempt to access obj should fail, because r2 was read
# earlier in the transaction and obj was modified by the othe
# transaction.
if shouldFail:
self.assertRaises(ReadConflictError, lambda: obj.child1)
else:
# make sure that accessing the object succeeds
obj.child1
cn2.getTransaction().abort()
def testReadConflictIgnored(self):
# Test that an application that catches a read conflict and
# continues can not commit the transaction later.
root = self._db.open().root()
root["real_data"] = real_data = PersistentDict()
root["index"] = index = PersistentDict()
real_data["a"] = PersistentDict({"indexed_value": False})
real_data["b"] = PersistentDict({"indexed_value": True})
index[True] = PersistentDict({"b": 1})
index[False] = PersistentDict({"a": 1})
get_transaction().commit()
# load some objects from one connection
cn2 = self._db.open()
cn2.setLocalTransaction()
r2 = cn2.root()
real_data2 = r2["real_data"]
index2 = r2["index"]
real_data["b"]["indexed_value"] = False
del index[True]["b"]
index[False]["b"] = 1
cn2.getTransaction().commit()
del real_data2["a"]
try:
del index2[False]["a"]
except ReadConflictError:
# This is the crux of the text. Ignore the error.
pass
else:
self.fail("No conflict occurred")
# real_data2 still ready to commit
self.assert_(real_data2._p_changed)
# index2 values not ready to commit
self.assert_(not index2._p_changed)
self.assert_(not index2[False]._p_changed)
self.assert_(not index2[True]._p_changed)
self.assertRaises(ConflictError, get_transaction().commit)
get_transaction().abort()
def checkIndependent(self):
self.obj = Independent()
self.readConflict(shouldFail=False)
def checkNotIndependent(self):
self.obj = DecoyIndependent()
self.readConflict()
def test_suite():
return unittest.makeSuite(ZODBTests, 'check')
......@@ -90,7 +90,7 @@ process must skip such objects, rather than deactivating them.
static char cPickleCache_doc_string[] =
"Defines the PickleCache used by ZODB Connection objects.\n"
"\n"
"$Id: cPickleCache.c,v 1.80 2003/04/02 16:50:49 jeremy Exp $\n";
"$Id: cPickleCache.c,v 1.81 2003/04/08 15:55:44 jeremy Exp $\n";
#define ASSIGN(V,E) {PyObject *__e; __e=(E); Py_XDECREF(V); (V)=__e;}
#define UNLESS(E) if(!(E))
......@@ -352,6 +352,7 @@ cc_invalidate(ccobject *self, PyObject *args)
_invalidate(self, key);
Py_DECREF(key);
}
/* XXX Do we really want to modify the input? */
PySequence_DelSlice(inv, 0, l);
}
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment