Commit 9b905b66 authored by Jeremy Hylton's avatar Jeremy Hylton

Bacport various cache consistency bug fixes from the ZODB3-3_1-branch.

parent 4a566117
...@@ -268,6 +268,15 @@ class ClientStorage: ...@@ -268,6 +268,15 @@ class ClientStorage:
self._oid_lock = threading.Lock() self._oid_lock = threading.Lock()
self._oids = [] # Object ids retrieved from new_oids() self._oids = [] # Object ids retrieved from new_oids()
# load() and tpc_finish() must be serialized to guarantee
# that cache modifications from each occur atomically.
# It also prevents multiple load calls occuring simultaneously,
# which simplifies the cache logic.
self._load_lock = threading.Lock()
# _load_oid and _load_status are protected by _lock
self._load_oid = None
self._load_status = None
# Can't read data in one thread while writing data # Can't read data in one thread while writing data
# (tpc_finish) in another thread. In general, the lock # (tpc_finish) in another thread. In general, the lock
# must prevent access to the cache while _update_cache # must prevent access to the cache while _update_cache
...@@ -696,20 +705,37 @@ class ClientStorage: ...@@ -696,20 +705,37 @@ class ClientStorage:
""" """
self._lock.acquire() # for atomic processing of invalidations self._lock.acquire() # for atomic processing of invalidations
try: try:
p = self._cache.load(oid, version) pair = self._cache.load(oid, version)
if p: if pair:
return p return pair
finally: finally:
self._lock.release() self._lock.release()
if self._server is None: if self._server is None:
raise ClientDisconnected() raise ClientDisconnected()
# If an invalidation for oid comes in during zeoLoad, that's OK self._load_lock.acquire()
# because we'll get oid's new state. try:
p, s, v, pv, sv = self._server.zeoLoad(oid) self._lock.acquire()
self._cache.checkSize(0) try:
self._cache.store(oid, p, s, v, pv, sv) self._load_oid = oid
self._load_status = 1
finally:
self._lock.release()
p, s, v, pv, sv = self._server.zeoLoad(oid)
self._lock.acquire() # for atomic processing of invalidations
try:
if self._load_status:
self._cache.checkSize(0)
self._cache.store(oid, p, s, v, pv, sv)
self._load_oid = None
finally:
self._lock.release()
finally:
self._load_lock.release()
if v and version and v == version: if v and version and v == version:
return pv, sv return pv, sv
else: else:
...@@ -864,22 +890,22 @@ class ClientStorage: ...@@ -864,22 +890,22 @@ class ClientStorage:
"""Storage API: finish a transaction.""" """Storage API: finish a transaction."""
if transaction is not self._transaction: if transaction is not self._transaction:
return return
self._load_lock.acquire()
try: try:
self._lock.acquire() # for atomic processing of invalidations self._lock.acquire() # for atomic processing of invalidations
try: try:
self._update_cache() self._update_cache()
if f is not None:
f()
finally: finally:
self._lock.release() self._lock.release()
if f is not None:
f()
tid = self._server.tpc_finish(self._serial) self._server.tpc_finish(self._serial)
self._cache.setLastTid(tid)
r = self._check_serials() r = self._check_serials()
assert r is None or len(r) == 0, "unhandled serialnos: %s" % r assert r is None or len(r) == 0, "unhandled serialnos: %s" % r
finally: finally:
self._load_lock.release()
self.end_transaction() self.end_transaction()
def _update_cache(self): def _update_cache(self):
...@@ -1000,15 +1026,17 @@ class ClientStorage: ...@@ -1000,15 +1026,17 @@ class ClientStorage:
# Invalidations are sent by the ZEO server as a sequence of # Invalidations are sent by the ZEO server as a sequence of
# oid, version pairs. The DB's invalidate() method expects a # oid, version pairs. The DB's invalidate() method expects a
# dictionary of oids. # dictionary of oids.
self._lock.acquire() self._lock.acquire()
try: try:
# versions maps version names to dictionary of invalidations # versions maps version names to dictionary of invalidations
versions = {} versions = {}
for oid, version in invs: for oid, version in invs:
d = versions.setdefault(version, {}) if oid == self._load_oid:
self._load_status = 0
self._cache.invalidate(oid, version=version) self._cache.invalidate(oid, version=version)
d[oid] = 1 versions.setdefault(version, {})[oid] = 1
if self._db is not None: if self._db is not None:
for v, d in versions.items(): for v, d in versions.items():
self._db.invalidate(d, version=v) self._db.invalidate(d, version=v)
...@@ -1038,10 +1066,10 @@ class ClientStorage: ...@@ -1038,10 +1066,10 @@ class ClientStorage:
"""Invalidate objects modified by tid.""" """Invalidate objects modified by tid."""
self._cache.setLastTid(tid) self._cache.setLastTid(tid)
if self._pickler is not None: if self._pickler is not None:
self.log("Transactional invalidation during cache verification", log2(BLATHER,
level=zLOG.BLATHER) "Transactional invalidation during cache verification")
for t in args: for t in args:
self.self._pickler.dump(t) self._pickler.dump(t)
return return
self._process_invalidations(args) self._process_invalidations(args)
......
...@@ -53,7 +53,7 @@ class ClientStorage: ...@@ -53,7 +53,7 @@ class ClientStorage:
self.rpc.callAsync('endVerify') self.rpc.callAsync('endVerify')
def invalidateTransaction(self, tid, args): def invalidateTransaction(self, tid, args):
self.rpc.callAsync('invalidateTransaction', tid, args) self.rpc.callAsyncNoPoll('invalidateTransaction', tid, args)
def serialnos(self, arg): def serialnos(self, arg):
self.rpc.callAsync('serialnos', arg) self.rpc.callAsync('serialnos', arg)
......
...@@ -30,6 +30,7 @@ from ZEO.ClientStorage import ClientStorage ...@@ -30,6 +30,7 @@ from ZEO.ClientStorage import ClientStorage
from ZEO.Exceptions import ClientDisconnected from ZEO.Exceptions import ClientDisconnected
from ZEO.zrpc.marshal import Marshaller from ZEO.zrpc.marshal import Marshaller
from ZEO.tests import forker from ZEO.tests import forker
from ZEO.tests.InvalidationTests import InvalidationTests
from ZODB.DB import DB from ZODB.DB import DB
from ZODB.Transaction import get_transaction, Transaction from ZODB.Transaction import get_transaction, Transaction
...@@ -198,7 +199,7 @@ class CommonSetupTearDown(StorageTestBase): ...@@ -198,7 +199,7 @@ class CommonSetupTearDown(StorageTestBase):
self.fail("timed out waiting for storage to disconnect") self.fail("timed out waiting for storage to disconnect")
class ConnectionTests(CommonSetupTearDown): class ConnectionTests(CommonSetupTearDown, InvalidationTests):
"""Tests that explicitly manage the server process. """Tests that explicitly manage the server process.
To test the cache or re-connection, these test cases explicit To test the cache or re-connection, these test cases explicit
......
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
from thread import get_ident
import threading
import time
from BTrees.check import check, display
from BTrees.OOBTree import OOBTree
from ZEO.tests.TestThread import TestThread
from ZODB.DB import DB
from ZODB.POSException \
import ReadConflictError, ConflictError, VersionLockError
import zLOG
class StressThread(TestThread):
def __init__(self, testcase, db, stop, threadnum, startnum,
step=2, sleep=None):
TestThread.__init__(self, testcase)
self.db = db
self.stop = stop
self.threadnum = threadnum
self.startnum = startnum
self.step = step
self.sleep = sleep
self.added_keys = []
def testrun(self):
cn = self.db.open()
while not self.stop.isSet():
try:
tree = cn.root()["tree"]
break
except (ConflictError, KeyError):
get_transaction().abort()
cn.sync()
key = self.startnum
while not self.stop.isSet():
try:
tree[key] = self.threadnum
get_transaction().note("add key %s" % key)
get_transaction().commit()
if self.sleep:
time.sleep(self.sleep)
except (ReadConflictError, ConflictError), msg:
get_transaction().abort()
# sync() is necessary here to process invalidations
# if we get a read conflict. In the read conflict case,
# no objects were modified so cn never got registered
# with the transaction.
cn.sync()
else:
self.added_keys.append(key)
key += self.step
cn.close()
class VersionStressThread(TestThread):
def __init__(self, testcase, db, stop, threadnum, startnum,
step=2, sleep=None):
TestThread.__init__(self, testcase)
self.db = db
self.stop = stop
self.threadnum = threadnum
self.startnum = startnum
self.step = step
self.sleep = sleep
self.added_keys = []
def log(self, msg):
zLOG.LOG("thread %d" % get_ident(), 0, msg)
def testrun(self):
self.log("thread begin")
commit = 0
key = self.startnum
while not self.stop.isSet():
version = "%s:%s" % (self.threadnum, key)
commit = not commit
self.log("attempt to add key=%s version=%s commit=%d" %
(key, version, commit))
if self.oneupdate(version, key, commit):
self.added_keys.append(key)
key += self.step
def oneupdate(self, version, key, commit=1):
# The mess of sleeps below were added to reduce the number
# of VersionLockErrors, based on empirical observation.
# It looks like the threads don't switch enough without
# the sleeps.
cn = self.db.open(version)
while not self.stop.isSet():
try:
tree = cn.root()["tree"]
break
except (ConflictError, KeyError):
get_transaction().abort()
cn.sync()
while not self.stop.isSet():
try:
tree[key] = self.threadnum
get_transaction().note("add key %d" % key)
get_transaction().commit()
if self.sleep:
time.sleep(self.sleep)
break
except (VersionLockError, ReadConflictError, ConflictError), msg:
self.log(msg)
get_transaction().abort()
# sync() is necessary here to process invalidations
# if we get a read conflict. In the read conflict case,
# no objects were modified so cn never got registered
# with the transaction.
cn.sync()
if self.sleep:
time.sleep(self.sleep)
try:
while not self.stop.isSet():
try:
if commit:
self.db.commitVersion(version)
get_transaction().note("commit version %s" % version)
else:
self.db.abortVersion(version)
get_transaction().note("abort version %s" % version)
get_transaction().commit()
if self.sleep:
time.sleep(self.sleep)
return commit
except ConflictError, msg:
self.log(msg)
get_transaction().abort()
cn.sync()
finally:
cn.close()
return 0
class InvalidationTests:
level = 2
DELAY = 15
def _check_tree(self, cn, tree):
# Make sure the BTree is sane and that all the updates persisted
retries = 3
while retries:
retries -= 1
try:
check(tree)
tree._check()
except ReadConflictError:
if retries:
get_transaction().abort()
cn.sync()
else:
raise
except:
display(tree)
raise
def _check_threads(self, tree, *threads):
# Make sure the thread's view of the world is consistent with
# the actual database state.
for t in threads:
# If the test didn't add any keys, it didn't do what we expected.
self.assert_(t.added_keys)
for key in t.added_keys:
self.assert_(tree.has_key(key), key)
def go(self, stop, *threads):
# Run the threads
for t in threads:
t.start()
time.sleep(self.DELAY)
stop.set()
for t in threads:
t.cleanup()
def checkConcurrentUpdates2Storages(self):
self._storage = storage1 = self.openClientStorage()
storage2 = self.openClientStorage(cache="2")
db1 = DB(storage1)
db2 = DB(storage2)
stop = threading.Event()
cn = db1.open()
tree = cn.root()["tree"] = OOBTree()
get_transaction().commit()
# Run two threads that update the BTree
t1 = StressThread(self, db1, stop, 1, 1)
t2 = StressThread(self, db2, stop, 2, 2)
self.go(stop, t1, t2)
cn.sync()
self._check_tree(cn, tree)
self._check_threads(tree, t1, t2)
cn.close()
db1.close()
db2.close()
def checkConcurrentUpdates1Storage(self):
self._storage = storage1 = self.openClientStorage()
db1 = DB(storage1)
stop = threading.Event()
cn = db1.open()
tree = cn.root()["tree"] = OOBTree()
get_transaction().commit()
# Run two threads that update the BTree
t1 = StressThread(self, db1, stop, 1, 1, sleep=0.001)
t2 = StressThread(self, db1, stop, 2, 2, sleep=0.001)
self.go(stop, t1, t2)
cn.sync()
self._check_tree(cn, tree)
self._check_threads(tree, t1, t2)
cn.close()
db1.close()
def checkConcurrentUpdates2StoragesMT(self):
self._storage = storage1 = self.openClientStorage()
db1 = DB(storage1)
stop = threading.Event()
cn = db1.open()
tree = cn.root()["tree"] = OOBTree()
get_transaction().commit()
db2 = DB(self.openClientStorage(cache="2"))
# Run three threads that update the BTree.
# Two of the threads share a single storage so that it
# is possible for both threads to read the same object
# at the same time.
t1 = StressThread(self, db1, stop, 1, 1, 3)
t2 = StressThread(self, db2, stop, 2, 2, 3, 0.001)
t3 = StressThread(self, db2, stop, 3, 3, 3, 0.001)
self.go(stop, t1, t2, t3)
cn.sync()
self._check_tree(cn, tree)
self._check_threads(tree, t1, t2, t3)
cn.close()
db1.close()
db2.close()
def checkConcurrentUpdatesInVersions(self):
self._storage = storage1 = self.openClientStorage()
db1 = DB(storage1)
db2 = DB(self.openClientStorage(cache="2"))
stop = threading.Event()
cn = db1.open()
tree = cn.root()["tree"] = OOBTree()
get_transaction().commit()
# Run three threads that update the BTree.
# Two of the threads share a single storage so that it
# is possible for both threads to read the same object
# at the same time.
t1 = VersionStressThread(self, db1, stop, 1, 1, 3)
t2 = VersionStressThread(self, db2, stop, 2, 2, 3, 0.001)
t3 = VersionStressThread(self, db2, stop, 3, 3, 3, 0.001)
self.go(stop, t1, t2, t3)
cn.sync()
self._check_tree(cn, tree)
self._check_threads(tree, t1, t2, t3)
cn.close()
db1.close()
db2.close()
...@@ -110,12 +110,9 @@ class MappingStorageTimeoutTests( ...@@ -110,12 +110,9 @@ class MappingStorageTimeoutTests(
test_classes = [FileStorageConnectionTests, test_classes = [FileStorageConnectionTests,
FileStorageReconnectionTests, FileStorageReconnectionTests,
FileStorageTimeoutTests] FileStorageTimeoutTests,
MappingStorageConnectionTests,
test_classes.extend( MappingStorageTimeoutTests]
[MappingStorageConnectionTests,
MappingStorageTimeoutTests])
import BDBStorage import BDBStorage
if BDBStorage.is_available: if BDBStorage.is_available:
......
...@@ -398,6 +398,14 @@ class Connection(smac.SizedMessageAsyncConnection): ...@@ -398,6 +398,14 @@ class Connection(smac.SizedMessageAsyncConnection):
self.send_call(method, args, ASYNC) self.send_call(method, args, ASYNC)
self.poll() self.poll()
def callAsyncNoPoll(self, method, *args):
# Like CallAsync but doesn't poll. This exists so that we can
# send invalidations atomically to all clients without
# allowing any client to sneak in a load request.
if self.closed:
raise DisconnectedError()
self.send_call(method, args, ASYNC)
# handle IO, possibly in async mode # handle IO, possibly in async mode
def _prepare_async(self): def _prepare_async(self):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment