Commit cd247d81 authored by Jeremy Hylton's avatar Jeremy Hylton

Merge changes from ZODB3-3_2-branch to Zope-2_7-branch.

Please make all future changes on the Zope-2_7-branch instead.
parent e0fe1bc8
......@@ -367,8 +367,14 @@ class ClientCache:
data = read(dlen)
self._trace(0x2A, oid, version, h[19:], dlen)
if (p < 0) != self._current:
# If the cache read we are copying has version info,
# we need to pass the header to copytocurrent().
if vlen:
vheader = read(vlen + 4)
else:
vheader = None
self._copytocurrent(ap, oidlen, tlen, dlen, vlen, h,
oid, data)
oid, data, vheader)
return data, h[19:]
else:
self._trace(0x26, oid, version)
......@@ -412,12 +418,13 @@ class ClientCache:
"""
if self._pos + tlen > self._limit:
return # Don't let this cause a cache flip
assert len(header) == 27
assert len(header) == 27, len(header)
if header[8] == 'n':
# Rewrite the header to drop the version data.
# This shortens the record.
tlen = 31 + oidlen + dlen
vlen = 0
vheader = None
# (oidlen:2, reserved:6, status:1, tlen:4,
# vlen:2, dlen:4, serial:8)
header = header[:9] + pack(">IHI", tlen, vlen, dlen) + header[-8:]
......@@ -446,7 +453,8 @@ class ClientCache:
l.append(vdata)
l.append(vserial)
else:
assert None is vheader is vdata is vserial
assert None is vheader is vdata is vserial, (
vlen, vheader, vdata, vserial)
l.append(header[9:13]) # copy of tlen
g = self._f[self._current]
g.seek(self._pos)
......
......@@ -78,7 +78,7 @@ disconnected_stub = DisconnectedServerStub()
MB = 1024**2
class ClientStorage:
class ClientStorage(object):
"""A Storage class that is a network client to a remote storage.
......@@ -129,6 +129,7 @@ class ClientStorage:
client -- A name used to construct persistent cache filenames.
Defaults to None, in which case the cache is not persistent.
See ClientCache for more info.
debug -- Ignored. This is present only for backwards
compatibility with ZEO 1.
......
......@@ -31,6 +31,9 @@ class CommitLog:
self.stores = 0
self.read = 0
def size(self):
return self.file.tell()
def store(self, oid, serial, data, version):
self.pickler.dump((oid, serial, data, version))
self.stores += 1
......
......@@ -82,6 +82,7 @@ class ZEOStorage:
self.read_only = read_only
self.locked = 0
self.verifying = 0
self.store_failed = 0
self.log_label = _label
self.authenticated = 0
self.auth_realm = auth_realm
......@@ -367,6 +368,7 @@ class ZEOStorage:
self.txnlog = CommitLog()
self.tid = tid
self.status = status
self.store_failed = 0
self.stats.active_txns += 1
def tpc_finish(self, id):
......@@ -471,12 +473,14 @@ class ZEOStorage:
self.storage.tpc_begin(txn, tid, status)
def _store(self, oid, serial, data, version):
err = None
try:
newserial = self.storage.store(oid, serial, data, version,
self.transaction)
except (SystemExit, KeyboardInterrupt):
raise
except Exception, err:
self.store_failed = 1
if isinstance(err, ConflictError):
self.stats.conflicts += 1
if not isinstance(err, TransactionError):
......@@ -503,9 +507,15 @@ class ZEOStorage:
if newserial == ResolvedSerial:
self.stats.conflicts_resolved += 1
self.serials.append((oid, newserial))
return err is None
def _vote(self):
self.client.serialnos(self.serials)
# If a store call failed, then return to the client immediately.
# The serialnos() call will deliver an exception that will be
# handled by the client in its tpc_vote() method.
if self.store_failed:
return
return self.storage.tpc_vote(self.transaction)
def _abortVersion(self, src):
......@@ -554,11 +564,18 @@ class ZEOStorage:
def _restart(self, delay=None):
# Restart when the storage lock is available.
if self.txnlog.stores == 1:
template = "Preparing to commit transaction: %d object, %d bytes"
else:
template = "Preparing to commit transaction: %d objects, %d bytes"
self.log(template % (self.txnlog.stores, self.txnlog.size()),
level=zLOG.BLATHER)
self._tpc_begin(self.transaction, self.tid, self.status)
loads, loader = self.txnlog.get_loader()
for i in range(loads):
# load oid, serial, data, version
self._store(*loader.load())
if not self._store(*loader.load()):
break
resp = self._thunk()
if delay is not None:
delay.reply(resp)
......@@ -612,7 +629,7 @@ class StorageServer:
transaction_timeout=None,
monitor_address=None,
auth_protocol=None,
auth_filename=None,
auth_database=None,
auth_realm=None):
"""StorageServer constructor.
......@@ -659,7 +676,7 @@ class StorageServer:
auth_protocol -- The name of the authentication protocol to use.
Examples are "digest" and "srp".
auth_filename -- The name of the password database filename.
auth_database -- The name of the password database filename.
It should be in a format compatible with the authentication
protocol used; for instance, "sha" and "srp" require different
formats.
......@@ -685,7 +702,7 @@ class StorageServer:
s._waiting = []
self.read_only = read_only
self.auth_protocol = auth_protocol
self.auth_filename = auth_filename
self.auth_database = auth_database
self.auth_realm = auth_realm
self.database = None
if auth_protocol:
......@@ -739,7 +756,7 @@ class StorageServer:
# storages, avoiding the need to bloat each with a new authenticator
# Database that would contain the same info, and also avoiding any
# possibly synchronization issues between them.
self.database = db_class(self.auth_filename)
self.database = db_class(self.auth_database)
if self.database.realm != self.auth_realm:
raise ValueError("password database realm %r "
"does not match storage realm %r"
......
......@@ -82,8 +82,7 @@ class CommitLockTests:
# The commit lock tests verify that the storage successfully
# blocks and restarts transactions when there is contention for a
# single storage. There are a lot of cases to cover. transaction
# has finished.
# single storage. There are a lot of cases to cover.
# The general flow of these tests is to start a transaction by
# getting far enough into 2PC to acquire the commit lock. Then
......
......@@ -100,6 +100,8 @@ class CommonSetupTearDown(StorageTestBase):
if getattr(self, '_storage', None) is not None:
self._storage.close()
if hasattr(self._storage, 'cleanup'):
zLOG.LOG("testZEO", zLOG.DEBUG, "cleanup storage %s" %
self._storage.__name__)
self._storage.cleanup()
for adminaddr in self._servers:
if adminaddr is not None:
......@@ -141,9 +143,14 @@ class CommonSetupTearDown(StorageTestBase):
def getConfig(self, path, create, read_only):
raise NotImplementedError
def openClientStorage(self, cache='', cache_size=200000, wait=1,
cache_id = 1
def openClientStorage(self, cache=None, cache_size=200000, wait=1,
read_only=0, read_only_fallback=0,
username=None, password=None, realm=None):
if cache is None:
cache = str(self.__class__.cache_id)
self.__class__.cache_id += 1
self.caches.append(cache)
storage = TestClientStorage(self.addr,
client=cache,
......@@ -566,6 +573,70 @@ class ConnectionTests(CommonSetupTearDown):
db2.close()
db1.close()
class InvqTests(CommonSetupTearDown):
invq = 2
def checkQuickVerificationWith2Clients(self):
perstorage = self.openClientStorage(cache="test")
self.assertEqual(perstorage.verify_result, "full verification")
self._storage = self.openClientStorage()
oid = self._storage.new_oid()
# When we create a new storage, it should always do a full
# verification
self.assertEqual(self._storage.verify_result, "full verification")
# do two storages of the object to make sure an invalidation
# message is generated
revid = self._dostore(oid)
revid = self._dostore(oid, revid)
perstorage.load(oid, '')
perstorage.close()
revid = self._dostore(oid, revid)
perstorage = self.openClientStorage(cache="test")
self.assertEqual(perstorage.verify_result, "quick verification")
self.assertEqual(perstorage.load(oid, ''),
self._storage.load(oid, ''))
perstorage.close()
def checkVerificationWith2ClientsInvqOverflow(self):
perstorage = self.openClientStorage(cache="test")
self.assertEqual(perstorage.verify_result, "full verification")
self._storage = self.openClientStorage()
oid = self._storage.new_oid()
# When we create a new storage, it should always do a full
# verification
self.assertEqual(self._storage.verify_result, "full verification")
# do two storages of the object to make sure an invalidation
# message is generated
revid = self._dostore(oid)
revid = self._dostore(oid, revid)
perstorage.load(oid, '')
perstorage.close()
# the test code sets invq bound to 2
for i in range(5):
revid = self._dostore(oid, revid)
perstorage = self.openClientStorage(cache="test")
self.assertEqual(perstorage.verify_result, "full verification")
t = time.time() + 30
while not perstorage.end_verify.isSet():
perstorage.sync()
if time.time() > t:
self.fail("timed out waiting for endVerify")
self.assertEqual(self._storage.load(oid, '')[1], revid)
self.assertEqual(perstorage.load(oid, ''),
self._storage.load(oid, ''))
perstorage.close()
class ReconnectionTests(CommonSetupTearDown):
# The setUp() starts a server automatically. In order for its
# state to persist, we set the class variable keep to 1. In
......@@ -688,7 +759,7 @@ class ReconnectionTests(CommonSetupTearDown):
self._newAddr()
# Start a read-only server
self.startServer(create=0, index=0, read_only=1)
self.startServer(create=0, index=0, read_only=1, keep=0)
# Start a client in fallback mode
self._storage = self.openClientStorage(read_only_fallback=1)
# Stores should fail here
......@@ -756,69 +827,6 @@ class ReconnectionTests(CommonSetupTearDown):
perstorage.close()
self._storage.close()
def checkQuickVerificationWith2Clients(self):
perstorage = self.openClientStorage(cache="test")
self.assertEqual(perstorage.verify_result, "full verification")
self._storage = self.openClientStorage()
oid = self._storage.new_oid()
# When we create a new storage, it should always do a full
# verification
self.assertEqual(self._storage.verify_result, "full verification")
# do two storages of the object to make sure an invalidation
# message is generated
revid = self._dostore(oid)
revid = self._dostore(oid, revid)
perstorage.load(oid, '')
perstorage.close()
revid = self._dostore(oid, revid)
perstorage = self.openClientStorage(cache="test")
self.assertEqual(perstorage.verify_result, "quick verification")
self.assertEqual(perstorage.load(oid, ''),
self._storage.load(oid, ''))
perstorage.close()
def checkVerificationWith2ClientsInvqOverflow(self):
perstorage = self.openClientStorage(cache="test")
self.assertEqual(perstorage.verify_result, "full verification")
self._storage = self.openClientStorage()
oid = self._storage.new_oid()
# When we create a new storage, it should always do a full
# verification
self.assertEqual(self._storage.verify_result, "full verification")
# do two storages of the object to make sure an invalidation
# message is generated
revid = self._dostore(oid)
revid = self._dostore(oid, revid)
perstorage.load(oid, '')
perstorage.close()
# the test code sets invq bound to 2
for i in range(5):
revid = self._dostore(oid, revid)
perstorage = self.openClientStorage(cache="test")
self.assertEqual(perstorage.verify_result, "full verification")
t = time.time() + 30
while not perstorage.end_verify.isSet():
perstorage.sync()
if time.time() > t:
self.fail("timed out waiting for endVerify")
self.assertEqual(self._storage.load(oid, '')[1], revid)
self.assertEqual(perstorage.load(oid, ''),
self._storage.load(oid, ''))
perstorage.close()
class TimeoutTests(CommonSetupTearDown):
timeout = 1
......
This diff is collapsed.
......@@ -32,14 +32,12 @@ class ClientCacheTests(unittest.TestCase):
_oid3 = 'cdefghij'
def setUp(self):
unittest.TestCase.setUp(self)
self.cachesize = 10*1000*1000
self.cache = ClientCache(size=self.cachesize)
self.cache.open()
def tearDown(self):
self.cache.close()
unittest.TestCase.tearDown(self)
def testOpenClose(self):
pass # All the work is done by setUp() / tearDown()
......@@ -281,9 +279,10 @@ class ClientCacheTests(unittest.TestCase):
class PersistentClientCacheTests(unittest.TestCase):
_oid = 'abcdefgh'
_oid2 = 'bcdefghi'
_oid3 = 'cdefghij'
def setUp(self):
unittest.TestCase.setUp(self)
self.vardir = os.getcwd() # Don't use /tmp, it's a security risk
self.cachesize = 10*1000*1000
self.storagename = 'foo'
......@@ -319,7 +318,6 @@ class PersistentClientCacheTests(unittest.TestCase):
os.unlink(filename)
except os.error:
pass
unittest.TestCase.tearDown(self)
def testCacheFileSelection(self):
# A bug in __init__ read the wrong slice of the file to determine
......@@ -388,6 +386,41 @@ class PersistentClientCacheTests(unittest.TestCase):
cache.checkSize(10*self.cachesize) # Force a file flip
self.failUnless(cache.getLastTid() is None)
def testLoadNonversionWithVersionInFlippedCache(self):
# This test provokes an error seen once in an unrelated test.
# The object is stored in the old cache file with version data,
# a load for non-version data occurs. The attempt to copy the
# non-version data to the new file fails.
nvdata = "Mend your speech a little, lest it may mar your fortunes."
nvserial = "12345678"
version = "folio"
vdata = "Mend your speech a little, lest you may mar your fortunes."
vserial = "12346789"
self.cache.store(self._oid, nvdata, nvserial, version, vdata, vserial)
self.cache.checkSize(10 * self.cachesize) # force a cache flip
for i in 1, 2: # check the we can load before and after copying
for xversion, xdata, xserial in [("", nvdata, nvserial),
(version, vdata, vserial)]:
data, serial = self.cache.load(self._oid, xversion)
self.assertEqual(data, xdata)
self.assertEqual(serial, xserial)
# now cause two more cache flips and make sure the data is still there
self.cache.store(self._oid2, "", "", "foo", "bar", "23456789")
self.cache.checkSize(10 * self.cachesize) # force a cache flip
self.cache.load(self._oid, "")
self.cache.store(self._oid3, "bar", "34567890", "", "", "")
self.cache.checkSize(10 * self.cachesize) # force a cache flip
self.cache.load(self._oid, "")
for i in 1, 2: # check the we can load before and after copying
for xversion, xdata, xserial in [("", nvdata, nvserial),
(version, vdata, vserial)]:
data, serial = self.cache.load(self._oid, xversion)
self.assertEqual(data, xdata)
self.assertEqual(serial, xserial)
class ClientCacheLongOIDTests(ClientCacheTests):
_oid = 'abcdefghijklmnop' * 2
......@@ -397,7 +430,8 @@ class ClientCacheLongOIDTests(ClientCacheTests):
class PersistentClientCacheLongOIDTests(PersistentClientCacheTests):
_oid = 'abcdefghijklmnop' * 2
_oid2 = 'bcdefghijklmnopq' * 2
_oid3 = 'cdefghijklmnopqr' * 2
def test_suite():
suite = unittest.TestSuite()
......
......@@ -38,7 +38,7 @@ class BerkeleyStorageConfig:
def getConfig(self, path, create, read_only):
return """\
<fullstorage 1>
name %s
envdir %s
read-only %s
</fullstorage>""" % (path, read_only and "yes" or "no")
......@@ -57,19 +57,25 @@ class FileStorageConnectionTests(
class FileStorageReconnectionTests(
FileStorageConfig,
ConnectionTests.ReconnectionTests
ConnectionTests.ReconnectionTests,
):
"""FileStorage-specific re-connection tests."""
# Run this at level 1 because MappingStorage can't do reconnection tests
level = 1
class FileStorageInvqTests(
FileStorageConfig,
ConnectionTests.InvqTests
):
"""FileStorage-specific invalidation queue tests."""
level = 1
class FileStorageTimeoutTests(
FileStorageConfig,
ConnectionTests.TimeoutTests
):
level = 2
class BDBConnectionTests(
BerkeleyStorageConfig,
ConnectionTests.ConnectionTests,
......@@ -85,6 +91,13 @@ class BDBReconnectionTests(
"""Berkeley storage re-connection tests."""
level = 2
class BDBInvqTests(
BerkeleyStorageConfig,
ConnectionTests.InvqTests
):
"""Berkeley storage invalidation queue tests."""
level = 2
class BDBTimeoutTests(
BerkeleyStorageConfig,
ConnectionTests.TimeoutTests
......@@ -112,22 +125,19 @@ class MappingStorageTimeoutTests(
test_classes = [FileStorageConnectionTests,
FileStorageReconnectionTests,
FileStorageInvqTests,
FileStorageTimeoutTests,
MappingStorageConnectionTests,
MappingStorageTimeoutTests]
import BDBStorage
if BDBStorage.is_available:
test_classes.append(BDBConnectionTests)
test_classes.append(BDBReconnectionTests)
test_classes.append(BDBTimeoutTests)
test_classes += [BDBConnectionTests,
BDBReconnectionTests,
BDBInvqTests,
BDBTimeoutTests]
def test_suite():
# shutup warnings about mktemp
import warnings
warnings.filterwarnings("ignore", "mktemp")
suite = unittest.TestSuite()
for klass in test_classes:
sub = unittest.makeSuite(klass, 'check')
......
......@@ -187,7 +187,7 @@ class BDBTests(FileStorageTests):
self._envdir = tempfile.mktemp()
return """\
<fullstorage 1>
name %s
envdir %s
</fullstorage>
""" % self._envdir
......
......@@ -122,9 +122,9 @@ class Suicide(threading.Thread):
self._adminaddr = addr
def run(self):
# If this process doesn't exit in 100 seconds, commit suicide
for i in range(20):
time.sleep(5)
# If this process doesn't exit in 300 seconds, commit suicide
time.sleep(300)
log("zeoserver", "suicide thread invoking shutdown")
from ZEO.tests.forker import shutdown_zeo_server
# XXX If the -k option was given to zeoserver, then the process will
# go away but the temp files won't get cleaned up.
......@@ -174,7 +174,7 @@ def main():
transaction_timeout=zo.transaction_timeout,
monitor_address=mon_addr,
auth_protocol=zo.auth_protocol,
auth_filename=zo.auth_database,
auth_database=zo.auth_database,
auth_realm=zo.auth_realm)
try:
......
......@@ -28,7 +28,7 @@ from ZEO.zrpc.log import log
from ZEO.zrpc.trigger import trigger
from ZEO.zrpc.connection import ManagedConnection
class ConnectionManager:
class ConnectionManager(object):
"""Keeps a connection up over time"""
def __init__(self, addrs, client, tmin=1, tmax=180):
......
......@@ -67,7 +67,7 @@ class MTDelay(Delay):
self.ready.wait()
Delay.error(self, exc_info)
class Connection(smac.SizedMessageAsyncConnection):
class Connection(smac.SizedMessageAsyncConnection, object):
"""Dispatcher for RPC on object on both sides of socket.
The connection supports synchronous calls, which expect a return,
......
......@@ -13,24 +13,29 @@
##############################################################################
"""Handy standard storage machinery
$Id: BaseStorage.py,v 1.34 2003/06/10 15:46:31 shane Exp $
$Id: BaseStorage.py,v 1.35 2003/09/15 16:29:15 jeremy Exp $
"""
import cPickle
import ThreadLock, bpthread
import time, UndoLogCompatible
import POSException
from TimeStamp import TimeStamp
z64='\0'*8
import time
class BaseStorage(UndoLogCompatible.UndoLogCompatible):
import ThreadLock
import zLOG
from ZODB import bpthread
from ZODB import POSException
from ZODB.TimeStamp import TimeStamp
from ZODB.UndoLogCompatible import UndoLogCompatible
from ZODB.utils import z64
class BaseStorage(UndoLogCompatible):
_transaction=None # Transaction that is being committed
_serial=z64 # Transaction serial number
_tstatus=' ' # Transaction status, used for copying data
_is_read_only = 0
def __init__(self, name, base=None):
self.__name__=name
self.__name__= name
zLOG.LOG(self.__class__.__name__, zLOG.DEBUG,
"create storage %s" % self.__name__)
# Allocate locks:
l=ThreadLock.allocate_lock()
......
......@@ -13,7 +13,7 @@
##############################################################################
"""Database connection support
$Id: Connection.py,v 1.98 2003/06/13 21:53:08 jeremy Exp $"""
$Id: Connection.py,v 1.99 2003/09/15 16:29:15 jeremy Exp $"""
from __future__ import nested_scopes
......@@ -47,7 +47,7 @@ def updateCodeTimestamp():
ExtensionKlass = Base.__class__
class Connection(ExportImport.ExportImport):
class Connection(ExportImport.ExportImport, object):
"""Object managers for individual object space.
An object space is a version of collection of objects. In a
......@@ -136,11 +136,10 @@ class Connection(ExportImport.ExportImport):
# Explicitly remove references from the connection to its
# cache and to the root object, because they refer back to the
# connection.
if self._cache is not None:
self._cache.clear()
self._cache = None
self._incrgc = None
self.cacheGC = None
self._root_ = None
def __getitem__(self, oid, tt=type(())):
obj = self._cache.get(oid, None)
......@@ -176,8 +175,6 @@ class Connection(ExportImport.ExportImport):
object._p_serial=serial
self._cache[oid] = object
if oid=='\0\0\0\0\0\0\0\0':
self._root_=object # keep a ref
return object
def _persistent_load(self,oid,
......@@ -279,6 +276,7 @@ class Connection(ExportImport.ExportImport):
self.__onCloseCallbacks.append(f)
def close(self):
if self._incrgc is not None:
self._incrgc() # This is a good time to do some GC
# Call the close callbacks.
......
......@@ -13,8 +13,8 @@
##############################################################################
"""Database objects
$Id: DB.py,v 1.53 2003/06/24 21:50:18 jeremy Exp $"""
__version__='$Revision: 1.53 $'[11:-2]
$Id: DB.py,v 1.54 2003/09/15 16:29:15 jeremy Exp $"""
__version__='$Revision: 1.54 $'[11:-2]
import cPickle, cStringIO, sys, POSException, UndoLogCompatible
from Connection import Connection
......@@ -32,7 +32,7 @@ def list2dict(L):
d[elt] = 1
return d
class DB(UndoLogCompatible.UndoLogCompatible):
class DB(UndoLogCompatible.UndoLogCompatible, object):
"""The Object Database
The Object database coordinates access to and interaction of one
......
......@@ -79,7 +79,7 @@ method::
and call it to monitor the storage.
"""
__version__='$Revision: 1.19 $'[11:-2]
__version__='$Revision: 1.20 $'[11:-2]
import base64, time, string
from ZODB import POSException, BaseStorage, utils
......
This diff is collapsed.
......@@ -15,7 +15,7 @@
static char TimeStamp_module_documentation[] =
"Defines 64-bit TimeStamp objects used as ZODB serial numbers.\n"
"\n"
"\n$Id: TimeStamp.c,v 1.19 2003/06/20 18:38:24 tim_one Exp $\n";
"\n$Id: TimeStamp.c,v 1.20 2003/09/15 16:29:15 jeremy Exp $\n";
#ifdef USE_EXTENSION_CLASS
#include "ExtensionClass.h"
......
......@@ -47,7 +47,7 @@
-->
<sectiontype name="fullstorage" datatype=".BDBFullStorage"
implements="ZODB.storage">
<key name="name" required="yes" />
<key name="envdir" required="yes" />
<key name="interval" datatype="time-interval" default="2m" />
<key name="kbyte" datatype="integer" default="0" />
<key name="min" datatype="integer" default="0" />
......@@ -55,7 +55,7 @@
<key name="cachesize" datatype="byte-size" default="128MB" />
<key name="frequency" datatype="time-interval" default="0" />
<key name="packtime" datatype="time-interval" default="4h" />
<key name="classicpack" datatype="integer" default="0" />
<key name="gcpack" datatype="integer" default="0" />
<key name="read-only" datatype="boolean" default="off"/>
</sectiontype>
......
......@@ -13,7 +13,7 @@
##############################################################################
"""Open database and storage from a configuration.
$Id: config.py,v 1.13 2003/06/16 14:51:49 jeremy Exp $"""
$Id: config.py,v 1.14 2003/09/15 16:29:15 jeremy Exp $"""
import os
from cStringIO import StringIO
......@@ -157,7 +157,7 @@ class BDBStorage(BaseConfig):
if name.startswith('_'):
continue
setattr(bconf, name, getattr(self.config, name))
return storageclass(self.config.name, config=bconf)
return storageclass(self.config.envdir, config=bconf)
class BDBMinimalStorage(BDBStorage):
......
......@@ -33,7 +33,7 @@ import struct
from types import StringType
from ZODB.referencesf import referencesf
from ZODB.utils import p64, u64, z64
from ZODB.utils import p64, u64, z64, oid_repr
from zLOG import LOG, BLATHER, WARNING, ERROR, PANIC
try:
......@@ -54,7 +54,7 @@ class CorruptedDataError(CorruptedError):
def __str__(self):
if self.oid:
msg = "Error reading oid %s. Found %r" % (_fmt_oid(self.oid),
msg = "Error reading oid %s. Found %r" % (oid_repr(self.oid),
self.buf)
else:
msg = "Error reading unknown oid. Found %r" % self.buf
......@@ -166,7 +166,7 @@ class FileStorageFormatter:
def checkTxn(self, th, pos):
if th.tid <= self.ltid:
self.fail(pos, "time-stamp reduction: %s <= %s",
_fmt_oid(th.tid), _fmt_oid(self.ltid))
oid_repr(th.tid), oid_repr(self.ltid))
self.ltid = th.tid
if th.status == "c":
self.fail(pos, "transaction with checkpoint flag set")
......@@ -647,11 +647,15 @@ class FileStoragePacker(FileStorageFormatter):
# vindex: version -> pos of XXX
# tindex: oid -> pos, for current txn
# tvindex: version -> pos of XXX, for current txn
# oid2serial: not used by the packer
self.index = fsIndex()
self.vindex = {}
self.tindex = {}
self.tvindex = {}
self.oid2serial = {}
self.toid2serial = {}
self.toid2serial_delete = {}
# Index for non-version data. This is a temporary structure
# to reduce I/O during packing
......@@ -757,7 +761,7 @@ class FileStoragePacker(FileStorageFormatter):
If any data records are copied, also write txn header (th).
"""
copy = 0
new_tpos = 0
new_tpos = 0L
tend = pos + th.tlen
pos += th.headerlen()
while pos < tend:
......
......@@ -381,6 +381,60 @@ class PackableStorage(PackableStorageBase):
eq(root['obj'].value, 7)
def _PackWhileWriting(self, pack_now=0):
# A storage should allow some reading and writing during
# a pack. This test attempts to exercise locking code
# in the storage to test that it is safe. It generates
# a lot of revisions, so that pack takes a long time.
db = DB(self._storage)
conn = db.open()
root = conn.root()
for i in range(10):
root[i] = MinPO(i)
get_transaction().commit()
snooze()
packt = time.time()
for j in range(10):
for i in range(10):
root[i].value = MinPO(i)
get_transaction().commit()
threads = [ClientThread(db) for i in range(4)]
for t in threads:
t.start()
if pack_now:
db.pack(time.time())
else:
db.pack(packt)
for t in threads:
t.join(30)
for t in threads:
t.join(1)
self.assert_(not t.isAlive())
# Iterate over the storage to make sure it's sane, but not every
# storage supports iterators.
if not hasattr(self._storage, "iterator"):
return
iter = self._storage.iterator()
for txn in iter:
for data in txn:
pass
iter.close()
def checkPackWhileWriting(self):
self._PackWhileWriting(pack_now=0)
def checkPackNowWhileWriting(self):
self._PackWhileWriting(pack_now=1)
def checkPackUndoLog(self):
self._initroot()
eq = self.assertEqual
......@@ -450,47 +504,6 @@ class PackableStorage(PackableStorageBase):
for r in self._storage.undoLog(): print r
# what can we assert about that?
def checkPackWhileWriting(self):
# A storage should allow some reading and writing during
# a pack. This test attempts to exercise locking code
# in the storage to test that it is safe. It generates
# a lot of revisions, so that pack takes a long time.
db = DB(self._storage)
conn = db.open()
root = conn.root()
for i in range(10):
root[i] = MinPO(i)
get_transaction().commit()
snooze()
packt = time.time()
for j in range(10):
for i in range(10):
root[i].value = MinPO(i)
get_transaction().commit()
threads = [ClientThread(db) for i in range(4)]
for t in threads:
t.start()
db.pack(packt)
for t in threads:
t.join(30)
for t in threads:
t.join(1)
self.assert_(not t.isAlive())
# iterator over the storage to make sure it's sane
if not hasattr(self._storage, "iterator"):
return
iter = self._storage.iterator()
for txn in iter:
for data in txn:
pass
iter.close()
class ClientThread(threading.Thread):
def __init__(self, db):
......
......@@ -15,7 +15,7 @@
from ZODB.Transaction import Transaction
from ZODB.tests.IteratorStorage import IteratorDeepCompare
from ZODB.tests.StorageTestBase import MinPO, zodb_unpickle
from ZODB.tests.StorageTestBase import MinPO, zodb_unpickle, snooze
from ZODB import DB
from ZODB.referencesf import referencesf
......@@ -154,3 +154,31 @@ class RecoveryStorage(IteratorDeepCompare):
it.close()
self._dst.tpc_vote(final)
self._dst.tpc_finish(final)
def checkPackWithGCOnDestinationAfterRestore(self):
raises = self.assertRaises
db = DB(self._storage)
conn = db.open()
root = conn.root()
root.obj = obj1 = MinPO(1)
txn = get_transaction()
txn.note('root -> obj')
txn.commit()
root.obj.obj = obj2 = MinPO(2)
txn = get_transaction()
txn.note('root -> obj -> obj')
txn.commit()
del root.obj
txn = get_transaction()
txn.note('root -X->')
txn.commit()
# Now copy the transactions to the destination
self._dst.copyTransactionsFrom(self._storage)
# Now pack the destination.
snooze()
self._dst.pack(time.time(), referencesf)
# And check to see that the root object exists, but not the other
# objects.
data, serial = self._dst.load(root._p_oid, '')
raises(KeyError, self._dst.load, obj1._p_oid, '')
raises(KeyError, self._dst.load, obj2._p_oid, '')
......@@ -488,6 +488,7 @@ class VersionStorage:
root["d"] = MinPO("d")
get_transaction().commit()
snooze()
self._storage.pack(time.time(), referencesf)
cn.sync()
......
......@@ -185,9 +185,7 @@ class LRUCacheTests(CacheTestBase):
self.assertEquals(len(details), CONNS)
for d in details:
self.assertEquals(d['ngsize'], CACHE_SIZE)
# the root is also in the cache as ghost, because
# the connection holds a reference to it
self.assertEquals(d['size'], CACHE_SIZE + 1)
self.assertEquals(d['size'], CACHE_SIZE)
def checkDetail(self):
CACHE_SIZE = 10
......
......@@ -123,7 +123,7 @@ class BDBConfigTest(ConfigTestBase):
cfg = """
<zodb>
<fullstorage>
name %s
envdir %s
</fullstorage>
</zodb>
""" % self._path
......@@ -133,7 +133,7 @@ class BDBConfigTest(ConfigTestBase):
cfg = """
<zodb>
<minimalstorage>
name %s
envdir %s
</minimalstorage>
</zodb>
""" % self._path
......
......@@ -88,7 +88,7 @@ class FileStorageTests(
class OldFileStorage(ZODB.FileStorage.FileStorage):
def _newIndexes(self):
return {}, {}, {}, {}
return {}, {}, {}, {}, {}, {}, {}
from ZODB.fsIndex import fsIndex
......@@ -113,7 +113,7 @@ class FileStorageTests(
class OldFileStorage(ZODB.FileStorage.FileStorage):
def _newIndexes(self):
return {}, {}, {}, {}
return {}, {}, {}, {}, {}, {}, {}
from ZODB.fsIndex import fsIndex
......
This directory contains a collect of utilities for managing ZODB
databases. Some are more useful than others. If you install ZODB
using distutils ("python setup.py install"), fsdump.py, fstest.py,
repozo.py, and zeopack.py will be installed in /usr/local/bin.
Unless otherwise noted, these scripts are invoked with the name of the
Data.fs file as their only argument. Example: checkbtrees.py data.fs.
analyze.py -- A transaction analyzer for FileStorage
Reports on the data in a FileStorage. The report is organized by
class. It shows total data, as well as separate reports for current
and historical revisions of objects.
checkbtrees.py -- Checks BTrees in a FileStorage for corruption.
Attempts to find all the BTrees contained in a Data.fs and calls their
_check() methods.
fsdump.py -- Summarize FileStorage contents, one line per revision.
Prints a report of FileStorage contents, with one line for each
transaction and one line for each data record in that transaction.
Includes time stamps, file positions, and class names.
fstest.py -- Simple consistency checker for FileStorage
usage: fstest.py [-v] data.fs
The fstest tool will scan all the data in a FileStorage and report an
error if it finds any corrupt transaction data. The tool will print a
message when the first error is detected an exit.
The tool accepts one or more -v arguments. If a single -v is used, it
will print a line of text for each transaction record it encounters.
If two -v arguments are used, it will also print a line of text for
each object. The objects for a transaction will be printed before the
transaction itself.
Note: It does not check the consistency of the object pickles. It is
possible for the damage to occur only in the part of the file that
stores object pickles. Those errors will go undetected.
netspace.py -- Hackish attempt to report on size of objects
usage: netspace.py [-P | -v] data.fs
-P: do a pack first
-v: print info for all objects, even if a traversal path isn't found
Traverses objects from the database root and attempts to calculate
size of object, including all reachable subobjects.
parsezeolog.py -- Parse BLATHER logs from ZEO server.
This script may be obsolete. It has not been tested against the
current log output of the ZEO server.
Reports on the time and size of transactions committed by a ZEO
server, by inspecting log messages at BLATHER level.
repozo.py -- Incremental backup utility for FileStorage.
Run the script with the -h option to see usage details.
timeout.py -- Script to test transaction timeout
usage: timeout.py address delay [storage-name]
This script connects to a storage, begins a transaction, calls store()
and tpc_vote(), and then sleeps forever. This should trigger the
transaction timeout feature of the server.
zeopack.py -- Script to pack a ZEO server.
The script connects to a server and calls pack() on a specific
storage. See the script for usage details.
zeoreplay.py -- Experimental script to replay transactions from a ZEO log.
Like parsezeolog.py, this may be obsolete because it was written
against an earlier version of the ZEO server. See the script for
usage details.
zeoup.py
Usage: zeoup.py [options]
The test will connect to a ZEO server, load the root object, and
attempt to update the zeoup counter in the root. It will report
success if it updates to counter or if it gets a ConflictError. A
ConflictError is considered a success, because the client was able to
start a transaction.
See the script for details about the options.
zodbload.py - exercise ZODB under a heavy synthesized Zope-like load
See the module docstring for details. Note that this script requires
Zope. New in ZODB3 3.1.4.
zeoserverlog.py - analyze ZEO server log for performance statistics
See the module docstring for details; there are a large number of
options. New in ZODB3 3.1.4.
\ No newline at end of file
#! /usr/bin/env python
#!python
# Based on a transaction analyzer by Matt Kromer.
import pickle
......@@ -137,4 +137,3 @@ def analyze_rec(report, record):
if __name__ == "__main__":
path = sys.argv[1]
report(analyze(path))
#! /usr/bin/env python
#!python
"""Check the consistency of BTrees in a Data.fs
usage: checkbtrees.py data.fs
Try to find all the BTrees in a Data.fs and call their _check() methods.
Try to find all the BTrees in a Data.fs, call their _check() methods,
and run them through BTrees.check.check().
"""
from types import IntType
import ZODB
from ZODB.FileStorage import FileStorage
from BTrees.check import check
# Set of oids we've already visited. Since the object structure is
# a general graph, this is needed to prevent unbounded paths in the
# presence of cycles. It's also helpful in eliminating redundant
# checking when a BTree is pointed to by many objects.
oids_seen = {}
# Append (obj, path) to L if and only if obj is a persistent object
# and we haven't seen it before.
def add_if_new_persistent(L, obj, path):
global oids_seen
def add_if_persistent(L, obj, path):
getattr(obj, '_', None) # unghostify
if hasattr(obj, '_p_oid'):
oid = obj._p_oid
if not oids_seen.has_key(oid):
L.append((obj, path))
oids_seen[oid] = 1
def get_subobjects(obj):
getattr(obj, '_', None) # unghostify
......@@ -54,7 +69,7 @@ def main(fname):
cn = ZODB.DB(fs).open()
rt = cn.root()
todo = []
add_if_persistent(todo, rt, '')
add_if_new_persistent(todo, rt, '')
found = 0
while todo:
......@@ -75,6 +90,13 @@ def main(fname):
print msg
print "*" * 60
try:
check(obj)
except AssertionError, msg:
print "*" * 60
print msg
print "*" * 60
if found % 100 == 0:
cn.cacheMinimize()
......@@ -84,7 +106,7 @@ def main(fname):
newpath = "%s%s" % (path, k)
else:
newpath = "%s.%s" % (path, k)
add_if_persistent(todo, v, newpath)
add_if_new_persistent(todo, v, newpath)
print "total", len(fs._index), "found", found
......
#! /usr/bin/env python
#!python
##############################################################################
#
# Copyright (c) 2001, 2002, 2003 Zope Corporation and Contributors.
......
#!python
"""Report on the net size of objects counting subobjects.
usage: netspace.py [-P | -v] data.fs
......
#!python
"""Parse the BLATHER logging generated by ZEO2.
An example of the log format is:
......
#!/usr/bin/env python
#!python
# repozo.py -- incremental and full backups of a Data.fs file.
#
# Originally written by Anthony Baxter
# Significantly modified by Barry Warsaw
#
# TODO:
# allow gzipping of backup files.
# allow backup files in subdirectories.
"""repozo.py -- incremental and full backups of a Data.fs file.
Usage: %(program)s [options]
Where:
Exactly one of -B or -R must be specified:
-B / --backup
backup current ZODB file
Backup current ZODB file.
-R / --recover
restore a ZODB file from a backup
Restore a ZODB file from a backup.
-v / --verbose
Verbose mode
Verbose mode.
-h / --help
Print this text and exit
Print this text and exit.
-r dir
--repository=dir
Repository directory containing the backup files
Repository directory containing the backup files. This argument
is required.
Flags for --backup:
Options for -B/--backup:
-f file
--file=file
Source Data.fs file
Source Data.fs file. This argument is required.
-F / --full
Force a full backup
Force a full backup. By default, an incremental backup is made
if possible (e.g., if a pack has occurred since the last
incremental backup, a full backup is necessary).
-Q / --quick
Verify via md5 checksum only the last incremental written. This
significantly reduces the disk i/o at the (theoretical) cost of
inconsistency.
inconsistency. This is a probabilistic way of determining whether
a full backup is necessary.
-z / --gzip
Compress with gzip the backup files. Uses the default zlib
compression level.
compression level. By default, gzip compression is not used.
Flags for --recover:
Options for -R/--recover:
-D str
--date=str
Recover state as at this date. str is in the format
Recover state as of this date. str is in the format
yyyy-mm-dd[-hh[-mm]]
By default, current time is used.
-o file
--output=file
Write recovered ZODB to given file. If not given, the file will be
-o filename
--output=filename
Write recovered ZODB to given file. By default, the file is
written to stdout.
One of --backup or --recover is required.
"""
from __future__ import nested_scopes
......@@ -120,14 +121,14 @@ def parseargs():
usage(1, msg)
class Options:
mode = None
file = None
repository = None
full = False
date = None
output = None
quick = False
gzip = False
mode = None # BACKUP or RECOVER
file = None # name of input Data.fs file
repository = None # name of directory holding backups
full = False # True forces full backup
date = None # -D argument, if any
output = None # where to write recovered data; None = stdout
quick = False # -Q flag state
gzip = False # -z flag state
options = Options()
......@@ -158,6 +159,8 @@ def parseargs():
options.output = arg
elif opt in ('-z', '--gzip'):
options.gzip = True
else:
assert False, (opt, arg)
# Any other arguments are invalid
if args:
......@@ -184,20 +187,26 @@ def parseargs():
# Do something with a run of bytes from a file
# Read bytes (no more than n, or to EOF if n is None) in chunks from the
# current position in file fp. Pass each chunk as an argument to func().
# Return the total number of bytes read == the total number of bytes
# passed in all to func(). Leaves the file position just after the
# last byte read.
def dofile(func, fp, n=None):
bytesread = 0
stop = False
chunklen = READCHUNK
while not stop:
if n is not None and chunklen + bytesread > n:
chunklen = n - bytesread
stop = True
data = fp.read(chunklen)
bytesread = 0L
while n is None or n > 0:
if n is None:
todo = READCHUNK
else:
todo = min(READCHUNK, n)
data = fp.read(todo)
if not data:
break
func(data)
bytesread += len(data)
nread = len(data)
bytesread += nread
if n is not None:
n -= nread
return bytesread
......@@ -223,9 +232,10 @@ def copyfile(options, dst, start, n):
def func(data):
sum.update(data)
ofp.write(data)
dofile(func, ifp, n)
ndone = dofile(func, ifp, n)
ofp.close()
ifp.close()
assert ndone == n
return sum.hexdigest()
......@@ -296,30 +306,34 @@ def find_files(options):
log('no files found')
return needed
# Scan the .dat file corresponding to the last full backup performed.
# Return
#
# filename, startpos, endpos, checksum
#
# of the last incremental. If there is no .dat file, or the .dat file
# is empty, return
#
# None, None, None, None
def scandat(repofiles):
# Scan the .dat file corresponding to the last full backup performed.
# Return the filename, startpos, endpos, and sum of the last incremental.
# If all is a list, then append file name and md5sums to the list.
fullfile = repofiles[0]
datfile = os.path.splitext(fullfile)[0] + '.dat'
# If the .dat file is missing, we have to do a full backup
fn = startpos = endpos = sum = None
fn = startpos = endpos = sum = None # assume .dat file missing or empty
try:
fp = open(datfile)
except IOError, e:
if e.errno <> errno.ENOENT:
raise
else:
while True:
line = fp.readline()
if not line:
break
# We only care about the last one
fn, startpos, endpos, sum = line.split()
# We only care about the last one.
lines = fp.readlines()
fp.close()
if lines:
fn, startpos, endpos, sum = lines[-1].split()
startpos = long(startpos)
endpos = long(endpos)
return fn, startpos, endpos, sum
......@@ -364,7 +378,7 @@ def do_incremental_backup(options, reposz, repofiles):
print >> sys.stderr, 'Cannot overwrite existing file:', dest
sys.exit(2)
log('writing incremental: %s bytes to %s', pos-reposz, dest)
sum = copyfile(options, dest, reposz, pos)
sum = copyfile(options, dest, reposz, pos - reposz)
# The first file in repofiles points to the last full backup. Use this to
# get the .dat file and append the information for this incrementatl to
# that file.
......@@ -398,14 +412,18 @@ def do_backup(options):
return
# Now check the md5 sum of the source file, from the last
# incremental's start and stop positions.
srcfp = open(options.file)
srcfp = open(options.file, 'rb')
srcfp.seek(startpos)
srcsum = checksum(srcfp, endpos-startpos)
srcfp.close()
log('last incremental file: %s', fn)
log('last incremental checksum: %s', sum)
log('source checksum range: [%s..%s], sum: %s',
startpos, endpos, srcsum)
if sum == srcsum:
if srcsz == endpos:
log('No changes, nothing to do')
return
log('doing incremental, starting at: %s', endpos)
do_incremental_backup(options, endpos, repofiles)
return
......@@ -421,7 +439,7 @@ def do_backup(options):
# Get the md5 checksum of the source file, up to two file positions:
# the entire size of the file, and up to the file position of the last
# incremental backup.
srcfp = open(options.file)
srcfp = open(options.file, 'rb')
srcsum = checksum(srcfp, srcsz)
srcfp.seek(0)
srcsum_backedup = checksum(srcfp, reposz)
......
#! /usr/bin/env python
"""Report on the space used by objects in a storage.
usage: space.py data.fs
The current implementation only supports FileStorage.
Current limitations / simplifications: Ignores revisions and versions.
"""
import ZODB
from ZODB.FileStorage import FileStorage
from ZODB.utils import U64
from ZODB.fsdump import get_pickle_metadata
def run(path, v=0):
fs = FileStorage(path, read_only=1)
# break into the file implementation
if hasattr(fs._index, 'iterkeys'):
iter = fs._index.iterkeys()
else:
iter = fs._index.keys()
totals = {}
for oid in iter:
data, serialno = fs.load(oid, '')
mod, klass = get_pickle_metadata(data)
key = "%s.%s" % (mod, klass)
bytes, count = totals.get(key, (0, 0))
bytes += len(data)
count += 1
totals[key] = bytes, count
if v:
print "%8s %5d %s" % (U64(oid), len(data), key)
L = totals.items()
L.sort(lambda a, b: cmp(a[1], b[1]))
L.reverse()
print "Totals per object class:"
for key, (bytes, count) in L:
print "%8d %8d %s" % (count, bytes, key)
def main():
import sys
import getopt
try:
opts, args = getopt.getopt(sys.argv[1:], "v")
except getopt.error, msg:
print msg
print "usage: space.py [-v] Data.fs"
sys.exit(2)
if len(args) != 1:
print "usage: space.py [-v] Data.fs"
sys.exit(2)
v = 0
for o, a in opts:
if o == "-v":
v += 1
path = args[0]
run(path, v)
if __name__ == "__main__":
main()
#!python
"""Transaction timeout test script.
This script connects to a storage, begins a transaction, calls store()
and tpc_vote(), and then sleeps forever. This should trigger the
transaction timeout feature of the server.
usage: timeout.py address delay [storage-name]
"""
import sys
import time
from ZODB.Transaction import Transaction
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_pickle
from ZEO.ClientStorage import ClientStorage
ZERO = '\0'*8
def main():
if len(sys.argv) not in (3, 4):
sys.stderr.write("Usage: timeout.py address delay [storage-name]\n" %
sys.argv[0])
sys.exit(2)
hostport = sys.argv[1]
delay = float(sys.argv[2])
if sys.argv[3:]:
name = sys.argv[3]
else:
name = "1"
if "/" in hostport:
address = hostport
else:
if ":" in hostport:
i = hostport.index(":")
host, port = hostport[:i], hostport[i+1:]
else:
host, port = "", hostport
port = int(port)
address = (host, port)
print "Connecting to %s..." % repr(address)
storage = ClientStorage(address, name)
print "Connected. Now starting a transaction..."
oid = storage.new_oid()
version = ""
revid = ZERO
data = MinPO("timeout.py")
pickled_data = zodb_pickle(data)
t = Transaction()
t.user = "timeout.py"
storage.tpc_begin(t)
storage.store(oid, revid, pickled_data, version, t)
print "Stored. Now voting..."
storage.tpc_vote(t)
print "Voted; now sleeping %s..." % delay
time.sleep(delay)
print "Done."
if __name__ == "__main__":
main()
#! /usr/bin/env python
#!python
"""Connect to a ZEO server and ask it to pack.
Usage: zeopack.py [options]
......
#! /usr/bin/env python
#!python
"""Report on the number of currently waiting clients in the ZEO queue.
Usage: %(PROGRAM)s [options] logfile
......
#!python
"""Parse the BLATHER logging generated by ZEO, and optionally replay it.
Usage: zeointervals.py [options]
......
This diff is collapsed.
#! /usr/bin/env python
#!python
"""Make sure a ZEO server is running.
Usage: zeoup.py [options]
usage: zeoup.py [options]
The test will connect to a ZEO server, load the root object, and attempt to
update the zeoup counter in the root. It will report success if it updates
......
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment