Commit 8659d057 authored by Chris McDonough's avatar Chris McDonough

Allow tempstorage to participate in MVCC. Also fix minor bug where conflict...

Allow tempstorage to participate in MVCC.  Also fix minor bug where conflict cache could be poisoned by a transaction that failed after store was called.
parent 45652429
...@@ -28,6 +28,7 @@ from ZODB import POSException ...@@ -28,6 +28,7 @@ from ZODB import POSException
from ZODB.BaseStorage import BaseStorage from ZODB.BaseStorage import BaseStorage
from ZODB.ConflictResolution import ConflictResolvingStorage, ResolvedSerial from ZODB.ConflictResolution import ConflictResolvingStorage, ResolvedSerial
import time import time
import bisect
# keep old object revisions for CONFLICT_CACHE_MAXAGE seconds # keep old object revisions for CONFLICT_CACHE_MAXAGE seconds
CONFLICT_CACHE_MAXAGE = 60 CONFLICT_CACHE_MAXAGE = 60
...@@ -135,6 +136,31 @@ class TemporaryStorage(BaseStorage, ConflictResolvingStorage): ...@@ -135,6 +136,31 @@ class TemporaryStorage(BaseStorage, ConflictResolvingStorage):
finally: finally:
self._lock_release() self._lock_release()
def loadBefore(self, oid, tid):
"""Return most recent revision of oid before tid committed
(for MVCC)
."""
# implementation stolen from ZODB.test_storage.MinimalMemoryStorage
self._lock_acquire()
try:
tids = [stid for soid, stid in self._conflict_cache if soid == oid]
if not tids:
raise KeyError, oid
tids.sort()
i = bisect.bisect_left(tids, tid) -1
if i == -1:
return None
start_tid = tids[i]
j = i + 1
if j == len(tids):
end_tid = None
else:
end_tid = tids[j]
data = self.loadSerial(oid, start_tid)
return data, start_tid, end_tid
finally:
self._lock_release()
def store(self, oid, serial, data, version, transaction): def store(self, oid, serial, data, version, transaction):
if transaction is not self._transaction: if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction) raise POSException.StorageTransactionError(self, transaction)
...@@ -163,8 +189,6 @@ class TemporaryStorage(BaseStorage, ConflictResolvingStorage): ...@@ -163,8 +189,6 @@ class TemporaryStorage(BaseStorage, ConflictResolvingStorage):
oserial = serial oserial = serial
newserial=self._tid newserial=self._tid
self._tmp.append((oid, data)) self._tmp.append((oid, data))
now = time.time()
self._conflict_cache[(oid, newserial)] = data, now
return serial == oserial and newserial or ResolvedSerial return serial == oserial and newserial or ResolvedSerial
finally: finally:
self._lock_release() self._lock_release()
...@@ -238,6 +262,8 @@ class TemporaryStorage(BaseStorage, ConflictResolvingStorage): ...@@ -238,6 +262,8 @@ class TemporaryStorage(BaseStorage, ConflictResolvingStorage):
index[oid] = serial index[oid] = serial
opickle[oid] = data opickle[oid] = data
now = time.time()
self._conflict_cache[(oid, serial)] = data, now
if zeros: if zeros:
for oid in zeros.keys(): for oid in zeros.keys():
......
...@@ -7,6 +7,11 @@ from ZODB.tests import StorageTestBase, BasicStorage, \ ...@@ -7,6 +7,11 @@ from ZODB.tests import StorageTestBase, BasicStorage, \
Synchronization, ConflictResolution, \ Synchronization, ConflictResolution, \
Corruption, RevisionStorage, MTStorage Corruption, RevisionStorage, MTStorage
from persistent import Persistent
import transaction
from ZODB.DB import DB
from ZODB.POSException import ReadConflictError
class TemporaryStorageTests( class TemporaryStorageTests(
StorageTestBase.StorageTestBase, StorageTestBase.StorageTestBase,
## RevisionStorage.RevisionStorage, # not a revision storage, but passes ## RevisionStorage.RevisionStorage, # not a revision storage, but passes
...@@ -49,6 +54,48 @@ class TemporaryStorageTests( ...@@ -49,6 +54,48 @@ class TemporaryStorageTests(
TemporaryStorage.CONFLICT_CACHE_GCEVERY = old_gcevery TemporaryStorage.CONFLICT_CACHE_GCEVERY = old_gcevery
TemporaryStorage.CONFLICT_CACHE_MAXAGE = old_maxage TemporaryStorage.CONFLICT_CACHE_MAXAGE = old_maxage
def doreadconflict(self, db, mvcc):
tm1 = transaction.TransactionManager()
conn = db.open(mvcc=mvcc, transaction_manager=tm1)
r1 = conn.root()
obj = MinPO('root')
r1["p"] = obj
obj = r1["p"]
obj.child1 = MinPO('child1')
tm1.get().commit()
# start a new transaction with a new connection
tm2 = transaction.TransactionManager()
cn2 = db.open(mvcc=mvcc, transaction_manager=tm2)
r2 = cn2.root()
self.assertEqual(r1._p_serial, r2._p_serial)
obj.child2 = MinPO('child2')
tm1.get().commit()
# resume the transaction using cn2
obj = r2["p"]
# An attempt to access obj.child1 should fail with an RCE
# below if conn isn't using mvcc, because r2 was read earlier
# in the transaction and obj was modified by the other
# transaction.
obj.child1
return obj
def checkWithoutMVCCRaisesReadConflict(self):
db = DB(self._storage)
self.assertRaises(ReadConflictError, self.doreadconflict, db, False)
def checkWithMVCCDoesntRaiseReadConflict(self):
db = DB(self._storage)
ob = self.doreadconflict(db, True)
self.assertEquals(ob.__class__, MinPO)
self.assertEquals(getattr(ob, 'child1', MinPO()).value, 'child1')
self.failIf(getattr(ob, 'child2', None))
def test_suite(): def test_suite():
suite = unittest.makeSuite(TemporaryStorageTests, 'check') suite = unittest.makeSuite(TemporaryStorageTests, 'check')
suite2 = unittest.makeSuite(Corruption.FileStorageCorruptTests, 'check') suite2 = unittest.makeSuite(Corruption.FileStorageCorruptTests, 'check')
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment