Commit 1d9270b2 authored by Jeremy Hylton's avatar Jeremy Hylton

First, minimal MVCC tests.

parent b80dbcdf
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import bisect
import threading
import unittest
from ZODB.BaseStorage import BaseStorage
from ZODB import POSException
from ZODB.utils import z64
from ZODB.tests import StorageTestBase
from ZODB.tests \
import BasicStorage, MTStorage, Synchronization, PackableStorage, \
RevisionStorage
class Transaction(object):
"""Hold data for current transaction for MinimalMemoryStorage."""
def __init__(self, tid):
self.index = {}
self.tid = tid
def store(self, oid, data):
self.index[(oid, self.tid)] = data
def cur(self):
return dict.fromkeys([oid for oid, tid in self.index.keys()], self.tid)
class MinimalMemoryStorage(BaseStorage, object):
"""Simple in-memory storage that supports revisions.
This storage is needed to test multi-version concurrency control.
It is similar to MappingStorage, but keeps multiple revisions.
It does not support versions.
"""
def __init__(self):
super(MinimalMemoryStorage, self).__init__("name")
# _index maps oid, tid pairs to data records
self._index = {}
# _cur maps oid to current tid
self._cur = {}
def isCurrent(self, oid, serial):
return serial == self._cur[oid]
def __len__(self):
return len(self._index)
def _clear_temp(self):
pass
def loadEx(self, oid, version):
self._lock_acquire()
try:
assert not version
tid = self._cur[oid]
return self._index[(oid, tid)], tid, ""
finally:
self._lock_release()
def load(self, oid, version):
return self.loadEx(oid, version)[:2]
def _begin(self, tid, u, d, e):
self._txn = Transaction(tid)
def store(self, oid, serial, data, v, txn):
if txn is not self._transaction:
raise POSException.StorageTransactionError(self, txn)
assert not v
if self._cur.get(oid) != serial:
if not (serial is None or self._cur.get(oid) in [None, z64]):
raise POSException.ConflictError(
oid=oid, serials=(self._cur.get(oid), serial), data=data)
self._txn.store(oid, data)
return self._tid
def _abort(self):
del self._txn
def _finish(self, tid, u, d, e):
self._lock_acquire()
try:
self._index.update(self._txn.index)
self._cur.update(self._txn.cur())
self._ltid = self._tid
finally:
self._lock_release()
def lastTransaction(self):
return self._ltid
def loadBefore(self, the_oid, the_tid):
# It's okay if loadBefore() is really expensive, because this
# storage is just used for testing.
self._lock_acquire()
try:
tids = [tid for oid, tid in self._index if oid == the_oid]
if not tids:
raise KeyError, the_oid
tids.sort()
i = bisect.bisect_left(tids, the_tid) - 1
if i == -1:
return None
tid = tids[i]
j = i + 1
if j == len(tids):
end_tid = None
else:
end_tid = tids[j]
return self._index[(the_oid, tid)], tid, end_tid
finally:
self._lock_release()
def loadSerial(self, oid, serial):
self._lock_acquire()
try:
return self._index[(oid, serial)]
finally:
self._lock_release()
class MinimalTestSuite(StorageTestBase.StorageTestBase,
BasicStorage.BasicStorage,
MTStorage.MTStorage,
PackableStorage.PackableStorage,
Synchronization.SynchronizedStorage,
RevisionStorage.RevisionStorage,
):
def setUp(self):
self._storage = MinimalMemoryStorage()
# we don't implement undo
def checkLoadBeforeUndo(self):
pass
def test_suite():
return unittest.makeSuite(MinimalTestSuite, "check")
##############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
r"""
Multi-version concurrency control tests
=======================================
Multi-version concurrency control (MVCC) exploits storages that store
multiple revisions of an object to avoid read conflicts. Normally
when an object is read from the storage, its most recent revisions is
read. Under MVCC, an older revision is read so that the transaction
sees a consistent view of the database.
ZODB guarantees execution-time consistency: A single transaction will
always see a consistent view of the database while it is executing.
If transaction A is running, has already read an object O1, and an
external transaction B modifies object O2, then transaction A can no
longer read the current revision of O2. It must either read the
version of O2 that is consistent with O1 or raise a ReadConflictError.
This note includes doctests that explain how MVCC is implemented (and
test that the implementation is correct). The tests use a
MinimalMemoryStorage that implements MVCC support, but not much else.
>>> from ZODB.tests.test_storage import MinimalMemoryStorage
>>> from ZODB import DB
>>> db = DB(MinimalMemoryStorage())
We will use two different connections with the experimental
setLocalTransaction() method to make sure that the connections act
independently, even though they'll be run from a single thread.
>>> cn1 = db.open()
>>> txn1 = cn1.setLocalTransaction()
The test will just use some MinPO objects. The next few lines just
setup an initial database state.
>>> from ZODB.tests.MinPO import MinPO
>>> r = cn1.root()
>>> r["a"] = MinPO(1)
>>> r["b"] = MinPO(1)
>>> txn1.commit()
Now open a second connection.
>>> cn2 = db.open()
>>> txn2 = cn2.setLocalTransaction()
The ZODB Connection tracks a transaction high-water mark, which
represents the latest transaction id that can be read by the current
transaction and still present a consistent view of the database. When
a transaction commits, the database sends invalidations to all the
other transactions; the invalidation contains the transaction id and
the oids of modified objects. The Connection stores the high-water
mark in _txn_time, which is set to None until an invalidation arrives.
>>> cn = db.open()
>>> cn._txn_time
>>> cn.invalidate(1, dict.fromkeys([1, 2]))
>>> cn._txn_time
1
>>> cn.invalidate(2, dict.fromkeys([1, 2]))
>>> cn._txn_time
1
The high-water mark is set to the transaction id of the first
transaction, because transaction ids must be monotonically increasing.
It is reset at transaction boundaries.
XXX We'd like simple abort and commit calls to make txn boundaries,
but that doesn't work unless an object is modified. sync() will abort
a transaction and process invalidations.
>>> cn.sync()
>>> cn._txn_time
The next bit of code includes a simple MVCC test. One transaction
will begin and modify "a." The other transaction will then modify "b"
and commit.
>>> r1 = cn1.root()
>>> r1["a"].value = 2
>>> cn1.getTransaction().commit()
>>> txn = db.lastTransaction()
The second connection has its high-water mark set now.
>>> cn2._txn_time == txn
True
It is safe to read "b," because it was not modified by the concurrent
transaction.
>>> r2 = cn2.root()
>>> r2["b"]._p_serial < cn2._txn_time
True
>>> r2["b"].value = 2
It is not safe, however, to read the current revision "a," because it
was modified at the high-water mark. If we read it, we'll get a
non-current version.
>>> r2["a"].value
1
>>> r2["a"]._p_serial < cn2._txn_time
True
We can confirm that we have a non-current revision by asking the
storage.
>>> db._storage.isCurrent(r2["a"]._p_oid, r2["a"]._p_serial)
False
It's possible to modify "a," but we get a conflict error when we
commit the transaction.
>>> r2["a"].value = 3
>>> txn2.commit()
Traceback (most recent call last):
...
ConflictError: database conflict error (oid 0000000000000001, class ZODB.tests.MinPO.MinPO)
"""
import doctest
def test_suite():
return doctest.DocTestSuite()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment