Commit af84ec6e authored by Jeremy Hylton's avatar Jeremy Hylton

Merge changes from ZODB3-3_2-branch to Zope-2_7-branch.

Please make all future changes on the Zope-2_7-branch instead.
parent c5bc6d34
......@@ -367,8 +367,14 @@ class ClientCache:
data = read(dlen)
self._trace(0x2A, oid, version, h[19:], dlen)
if (p < 0) != self._current:
# If the cache read we are copying has version info,
# we need to pass the header to copytocurrent().
if vlen:
vheader = read(vlen + 4)
else:
vheader = None
self._copytocurrent(ap, oidlen, tlen, dlen, vlen, h,
oid, data)
oid, data, vheader)
return data, h[19:]
else:
self._trace(0x26, oid, version)
......@@ -412,12 +418,13 @@ class ClientCache:
"""
if self._pos + tlen > self._limit:
return # Don't let this cause a cache flip
assert len(header) == 27
assert len(header) == 27, len(header)
if header[8] == 'n':
# Rewrite the header to drop the version data.
# This shortens the record.
tlen = 31 + oidlen + dlen
vlen = 0
vheader = None
# (oidlen:2, reserved:6, status:1, tlen:4,
# vlen:2, dlen:4, serial:8)
header = header[:9] + pack(">IHI", tlen, vlen, dlen) + header[-8:]
......@@ -446,7 +453,8 @@ class ClientCache:
l.append(vdata)
l.append(vserial)
else:
assert None is vheader is vdata is vserial
assert None is vheader is vdata is vserial, (
vlen, vheader, vdata, vserial)
l.append(header[9:13]) # copy of tlen
g = self._f[self._current]
g.seek(self._pos)
......
......@@ -78,7 +78,7 @@ disconnected_stub = DisconnectedServerStub()
MB = 1024**2
class ClientStorage:
class ClientStorage(object):
"""A Storage class that is a network client to a remote storage.
......@@ -129,6 +129,7 @@ class ClientStorage:
client -- A name used to construct persistent cache filenames.
Defaults to None, in which case the cache is not persistent.
See ClientCache for more info.
debug -- Ignored. This is present only for backwards
compatibility with ZEO 1.
......
......@@ -31,6 +31,9 @@ class CommitLog:
self.stores = 0
self.read = 0
def size(self):
return self.file.tell()
def store(self, oid, serial, data, version):
self.pickler.dump((oid, serial, data, version))
self.stores += 1
......
......@@ -82,6 +82,7 @@ class ZEOStorage:
self.read_only = read_only
self.locked = 0
self.verifying = 0
self.store_failed = 0
self.log_label = _label
self.authenticated = 0
self.auth_realm = auth_realm
......@@ -367,6 +368,7 @@ class ZEOStorage:
self.txnlog = CommitLog()
self.tid = tid
self.status = status
self.store_failed = 0
self.stats.active_txns += 1
def tpc_finish(self, id):
......@@ -471,12 +473,14 @@ class ZEOStorage:
self.storage.tpc_begin(txn, tid, status)
def _store(self, oid, serial, data, version):
err = None
try:
newserial = self.storage.store(oid, serial, data, version,
self.transaction)
except (SystemExit, KeyboardInterrupt):
raise
except Exception, err:
self.store_failed = 1
if isinstance(err, ConflictError):
self.stats.conflicts += 1
if not isinstance(err, TransactionError):
......@@ -503,9 +507,15 @@ class ZEOStorage:
if newserial == ResolvedSerial:
self.stats.conflicts_resolved += 1
self.serials.append((oid, newserial))
return err is None
def _vote(self):
self.client.serialnos(self.serials)
# If a store call failed, then return to the client immediately.
# The serialnos() call will deliver an exception that will be
# handled by the client in its tpc_vote() method.
if self.store_failed:
return
return self.storage.tpc_vote(self.transaction)
def _abortVersion(self, src):
......@@ -554,11 +564,18 @@ class ZEOStorage:
def _restart(self, delay=None):
# Restart when the storage lock is available.
if self.txnlog.stores == 1:
template = "Preparing to commit transaction: %d object, %d bytes"
else:
template = "Preparing to commit transaction: %d objects, %d bytes"
self.log(template % (self.txnlog.stores, self.txnlog.size()),
level=zLOG.BLATHER)
self._tpc_begin(self.transaction, self.tid, self.status)
loads, loader = self.txnlog.get_loader()
for i in range(loads):
# load oid, serial, data, version
self._store(*loader.load())
if not self._store(*loader.load()):
break
resp = self._thunk()
if delay is not None:
delay.reply(resp)
......@@ -612,7 +629,7 @@ class StorageServer:
transaction_timeout=None,
monitor_address=None,
auth_protocol=None,
auth_filename=None,
auth_database=None,
auth_realm=None):
"""StorageServer constructor.
......@@ -659,7 +676,7 @@ class StorageServer:
auth_protocol -- The name of the authentication protocol to use.
Examples are "digest" and "srp".
auth_filename -- The name of the password database filename.
auth_database -- The name of the password database filename.
It should be in a format compatible with the authentication
protocol used; for instance, "sha" and "srp" require different
formats.
......@@ -685,7 +702,7 @@ class StorageServer:
s._waiting = []
self.read_only = read_only
self.auth_protocol = auth_protocol
self.auth_filename = auth_filename
self.auth_database = auth_database
self.auth_realm = auth_realm
self.database = None
if auth_protocol:
......@@ -739,7 +756,7 @@ class StorageServer:
# storages, avoiding the need to bloat each with a new authenticator
# Database that would contain the same info, and also avoiding any
# possibly synchronization issues between them.
self.database = db_class(self.auth_filename)
self.database = db_class(self.auth_database)
if self.database.realm != self.auth_realm:
raise ValueError("password database realm %r "
"does not match storage realm %r"
......
......@@ -82,8 +82,7 @@ class CommitLockTests:
# The commit lock tests verify that the storage successfully
# blocks and restarts transactions when there is contention for a
# single storage. There are a lot of cases to cover. transaction
# has finished.
# single storage. There are a lot of cases to cover.
# The general flow of these tests is to start a transaction by
# getting far enough into 2PC to acquire the commit lock. Then
......
......@@ -100,6 +100,8 @@ class CommonSetupTearDown(StorageTestBase):
if getattr(self, '_storage', None) is not None:
self._storage.close()
if hasattr(self._storage, 'cleanup'):
zLOG.LOG("testZEO", zLOG.DEBUG, "cleanup storage %s" %
self._storage.__name__)
self._storage.cleanup()
for adminaddr in self._servers:
if adminaddr is not None:
......@@ -141,9 +143,14 @@ class CommonSetupTearDown(StorageTestBase):
def getConfig(self, path, create, read_only):
raise NotImplementedError
def openClientStorage(self, cache='', cache_size=200000, wait=1,
cache_id = 1
def openClientStorage(self, cache=None, cache_size=200000, wait=1,
read_only=0, read_only_fallback=0,
username=None, password=None, realm=None):
if cache is None:
cache = str(self.__class__.cache_id)
self.__class__.cache_id += 1
self.caches.append(cache)
storage = TestClientStorage(self.addr,
client=cache,
......@@ -566,6 +573,70 @@ class ConnectionTests(CommonSetupTearDown):
db2.close()
db1.close()
class InvqTests(CommonSetupTearDown):
invq = 2
def checkQuickVerificationWith2Clients(self):
perstorage = self.openClientStorage(cache="test")
self.assertEqual(perstorage.verify_result, "full verification")
self._storage = self.openClientStorage()
oid = self._storage.new_oid()
# When we create a new storage, it should always do a full
# verification
self.assertEqual(self._storage.verify_result, "full verification")
# do two storages of the object to make sure an invalidation
# message is generated
revid = self._dostore(oid)
revid = self._dostore(oid, revid)
perstorage.load(oid, '')
perstorage.close()
revid = self._dostore(oid, revid)
perstorage = self.openClientStorage(cache="test")
self.assertEqual(perstorage.verify_result, "quick verification")
self.assertEqual(perstorage.load(oid, ''),
self._storage.load(oid, ''))
perstorage.close()
def checkVerificationWith2ClientsInvqOverflow(self):
perstorage = self.openClientStorage(cache="test")
self.assertEqual(perstorage.verify_result, "full verification")
self._storage = self.openClientStorage()
oid = self._storage.new_oid()
# When we create a new storage, it should always do a full
# verification
self.assertEqual(self._storage.verify_result, "full verification")
# do two storages of the object to make sure an invalidation
# message is generated
revid = self._dostore(oid)
revid = self._dostore(oid, revid)
perstorage.load(oid, '')
perstorage.close()
# the test code sets invq bound to 2
for i in range(5):
revid = self._dostore(oid, revid)
perstorage = self.openClientStorage(cache="test")
self.assertEqual(perstorage.verify_result, "full verification")
t = time.time() + 30
while not perstorage.end_verify.isSet():
perstorage.sync()
if time.time() > t:
self.fail("timed out waiting for endVerify")
self.assertEqual(self._storage.load(oid, '')[1], revid)
self.assertEqual(perstorage.load(oid, ''),
self._storage.load(oid, ''))
perstorage.close()
class ReconnectionTests(CommonSetupTearDown):
# The setUp() starts a server automatically. In order for its
# state to persist, we set the class variable keep to 1. In
......@@ -688,7 +759,7 @@ class ReconnectionTests(CommonSetupTearDown):
self._newAddr()
# Start a read-only server
self.startServer(create=0, index=0, read_only=1)
self.startServer(create=0, index=0, read_only=1, keep=0)
# Start a client in fallback mode
self._storage = self.openClientStorage(read_only_fallback=1)
# Stores should fail here
......@@ -756,69 +827,6 @@ class ReconnectionTests(CommonSetupTearDown):
perstorage.close()
self._storage.close()
def checkQuickVerificationWith2Clients(self):
perstorage = self.openClientStorage(cache="test")
self.assertEqual(perstorage.verify_result, "full verification")
self._storage = self.openClientStorage()
oid = self._storage.new_oid()
# When we create a new storage, it should always do a full
# verification
self.assertEqual(self._storage.verify_result, "full verification")
# do two storages of the object to make sure an invalidation
# message is generated
revid = self._dostore(oid)
revid = self._dostore(oid, revid)
perstorage.load(oid, '')
perstorage.close()
revid = self._dostore(oid, revid)
perstorage = self.openClientStorage(cache="test")
self.assertEqual(perstorage.verify_result, "quick verification")
self.assertEqual(perstorage.load(oid, ''),
self._storage.load(oid, ''))
perstorage.close()
def checkVerificationWith2ClientsInvqOverflow(self):
perstorage = self.openClientStorage(cache="test")
self.assertEqual(perstorage.verify_result, "full verification")
self._storage = self.openClientStorage()
oid = self._storage.new_oid()
# When we create a new storage, it should always do a full
# verification
self.assertEqual(self._storage.verify_result, "full verification")
# do two storages of the object to make sure an invalidation
# message is generated
revid = self._dostore(oid)
revid = self._dostore(oid, revid)
perstorage.load(oid, '')
perstorage.close()
# the test code sets invq bound to 2
for i in range(5):
revid = self._dostore(oid, revid)
perstorage = self.openClientStorage(cache="test")
self.assertEqual(perstorage.verify_result, "full verification")
t = time.time() + 30
while not perstorage.end_verify.isSet():
perstorage.sync()
if time.time() > t:
self.fail("timed out waiting for endVerify")
self.assertEqual(self._storage.load(oid, '')[1], revid)
self.assertEqual(perstorage.load(oid, ''),
self._storage.load(oid, ''))
perstorage.close()
class TimeoutTests(CommonSetupTearDown):
timeout = 1
......
......@@ -12,7 +12,6 @@
#
##############################################################################
from thread import get_ident
import threading
import time
......@@ -20,12 +19,10 @@ from BTrees.check import check, display
from BTrees.OOBTree import OOBTree
from ZEO.tests.TestThread import TestThread
from ZEO.tests.ConnectionTests import CommonSetupTearDown
from ZODB.DB import DB
from ZODB.POSException \
import ReadConflictError, ConflictError, VersionLockError
import zLOG
# The tests here let several threads have a go at one or more database
# instances simultaneously. Each thread appends a disjoint (from the
......@@ -48,8 +45,8 @@ class StressThread(TestThread):
# to 'tree' until Event stop is set. If sleep is given, sleep
# that long after each append. At the end, instance var .added_keys
# is a list of the ints the thread believes it added successfully.
def __init__(self, testcase, db, stop, threadnum, startnum,
step=2, sleep=None):
def __init__(self, testcase, db, stop, threadnum, commitdict,
startnum, step=2, sleep=None):
TestThread.__init__(self, testcase)
self.db = db
self.stop = stop
......@@ -58,6 +55,7 @@ class StressThread(TestThread):
self.step = step
self.sleep = sleep
self.added_keys = []
self.commitdict = commitdict
def testrun(self):
cn = self.db.open()
......@@ -74,6 +72,7 @@ class StressThread(TestThread):
tree[key] = self.threadnum
get_transaction().note("add key %s" % key)
get_transaction().commit()
self.commitdict[self] = 1
if self.sleep:
time.sleep(self.sleep)
except (ReadConflictError, ConflictError), msg:
......@@ -88,9 +87,13 @@ class StressThread(TestThread):
key += self.step
cn.close()
class VersionStressThread(TestThread):
class LargeUpdatesThread(TestThread):
# A thread that performs a lot of updates. It attempts to modify
# more than 25 objects so that it can test code that runs vote
# in a separate thread when it modifies more than 25 objects.
def __init__(self, testcase, db, stop, threadnum, startnum,
def __init__(self, testcase, db, stop, threadnum, commitdict, startnum,
step=2, sleep=None):
TestThread.__init__(self, testcase)
self.db = db
......@@ -100,21 +103,88 @@ class VersionStressThread(TestThread):
self.step = step
self.sleep = sleep
self.added_keys = []
self.commitdict = commitdict
def testrun(self):
cn = self.db.open()
while not self.stop.isSet():
try:
tree = cn.root()["tree"]
break
except (ConflictError, KeyError):
# print "%d getting tree abort" % self.threadnum
get_transaction().abort()
cn.sync()
def log(self, msg):
zLOG.LOG("thread %d" % get_ident(), 0, msg)
keys_added = {} # set of keys we commit
tkeys = []
while not self.stop.isSet():
# The test picks 50 keys spread across many buckets.
# self.startnum and self.step ensure that all threads use
# disjoint key sets, to minimize conflict errors.
nkeys = len(tkeys)
if nkeys < 50:
tkeys = range(self.startnum, 3000, self.step)
nkeys = len(tkeys)
step = max(int(nkeys / 50), 1)
keys = [tkeys[i] for i in range(0, nkeys, step)]
for key in keys:
try:
tree[key] = self.threadnum
except (ReadConflictError, ConflictError), msg:
# print "%d setting key %s" % (self.threadnum, msg)
get_transaction().abort()
cn.sync()
break
else:
# print "%d set #%d" % (self.threadnum, len(keys))
get_transaction().note("keys %s" % ", ".join(map(str, keys)))
try:
get_transaction().commit()
self.commitdict[self] = 1
if self.sleep:
time.sleep(self.sleep)
except ConflictError, msg:
# print "%d commit %s" % (self.threadnum, msg)
get_transaction().abort()
cn.sync()
continue
for k in keys:
tkeys.remove(k)
keys_added[k] = 1
# sync() is necessary here to process invalidations
# if we get a read conflict. In the read conflict case,
# no objects were modified so cn never got registered
# with the transaction.
cn.sync()
self.added_keys = keys_added.keys()
cn.close()
class VersionStressThread(TestThread):
def __init__(self, testcase, db, stop, threadnum, commitdict, startnum,
step=2, sleep=None):
TestThread.__init__(self, testcase)
self.db = db
self.stop = stop
self.threadnum = threadnum
self.startnum = startnum
self.step = step
self.sleep = sleep
self.added_keys = []
self.commitdict = commitdict
def testrun(self):
self.log("thread begin")
commit = 0
key = self.startnum
while not self.stop.isSet():
version = "%s:%s" % (self.threadnum, key)
commit = not commit
self.log("attempt to add key=%s version=%s commit=%d" %
(key, version, commit))
if self.oneupdate(version, key, commit):
self.added_keys.append(key)
self.commitdict[self] = 1
key += self.step
def oneupdate(self, version, key, commit=1):
......@@ -134,13 +204,11 @@ class VersionStressThread(TestThread):
while not self.stop.isSet():
try:
tree[key] = self.threadnum
get_transaction().note("add key %d" % key)
get_transaction().commit()
if self.sleep:
time.sleep(self.sleep)
break
except (VersionLockError, ReadConflictError, ConflictError), msg:
self.log(msg)
get_transaction().abort()
# sync() is necessary here to process invalidations
# if we get a read conflict. In the read conflict case,
......@@ -163,20 +231,30 @@ class VersionStressThread(TestThread):
time.sleep(self.sleep)
return commit
except ConflictError, msg:
self.log(msg)
get_transaction().abort()
cn.sync()
finally:
cn.close()
return 0
class InvalidationTests(CommonSetupTearDown):
class InvalidationTests:
level = 2
DELAY = 15 # number of seconds the main thread lets the workers run
# Minimum # of seconds the main thread lets the workers run. The
# test stops as soon as this much time has elapsed, and all threads
# have managed to commit a change.
MINTIME = 10
# Maximum # of seconds the main thread lets the workers run. We
# stop after this long has elapsed regardless of whether all threads
# have managed to commit a change.
MAXTIME = 300
StressThread = StressThread
def _check_tree(self, cn, tree):
# Make sure the BTree is sane and that all the updates persisted.
# Make sure the BTree is sane at the C level.
retries = 3
while retries:
retries -= 1
......@@ -196,28 +274,46 @@ class InvalidationTests(CommonSetupTearDown):
def _check_threads(self, tree, *threads):
# Make sure the thread's view of the world is consistent with
# the actual database state.
all_keys = []
expected_keys = []
errormsgs = []
err = errormsgs.append
for t in threads:
# If the test didn't add any keys, it didn't do what we expected.
self.assert_(t.added_keys)
for key in t.added_keys:
self.assert_(tree.has_key(key), key)
all_keys.extend(t.added_keys)
all_keys.sort()
self.assertEqual(all_keys, list(tree.keys()))
def go(self, stop, *threads):
if not t.added_keys:
err("thread %d didn't add any keys" % t.threadnum)
expected_keys.extend(t.added_keys)
expected_keys.sort()
actual_keys = list(tree.keys())
if expected_keys != actual_keys:
err("expected keys != actual keys")
for k in expected_keys:
if k not in actual_keys:
err("key %s expected but not in tree" % k)
for k in actual_keys:
if k not in expected_keys:
err("key %s in tree but not expected" % k)
if errormsgs:
display(tree)
self.fail('\n'.join(errormsgs))
def go(self, stop, commitdict, *threads):
# Run the threads
for t in threads:
t.start()
time.sleep(self.DELAY)
delay = self.MINTIME
start = time.time()
while time.time() - start <= self.MAXTIME:
time.sleep(delay)
delay = 2.0
if len(commitdict) >= len(threads):
break
# Some thread still hasn't managed to commit anything.
stop.set()
for t in threads:
t.cleanup()
def checkConcurrentUpdates2Storages(self):
self._storage = storage1 = self.openClientStorage()
storage2 = self.openClientStorage(cache="2")
storage2 = self.openClientStorage()
db1 = DB(storage1)
db2 = DB(storage2)
stop = threading.Event()
......@@ -227,9 +323,10 @@ class InvalidationTests(CommonSetupTearDown):
get_transaction().commit()
# Run two threads that update the BTree
t1 = StressThread(self, db1, stop, 1, 1)
t2 = StressThread(self, db2, stop, 2, 2)
self.go(stop, t1, t2)
cd = {}
t1 = self.StressThread(self, db1, stop, 1, cd, 1)
t2 = self.StressThread(self, db2, stop, 2, cd, 2)
self.go(stop, cd, t1, t2)
cn.sync()
self._check_tree(cn, tree)
......@@ -249,9 +346,10 @@ class InvalidationTests(CommonSetupTearDown):
get_transaction().commit()
# Run two threads that update the BTree
t1 = StressThread(self, db1, stop, 1, 1, sleep=0.001)
t2 = StressThread(self, db1, stop, 2, 2, sleep=0.001)
self.go(stop, t1, t2)
cd = {}
t1 = self.StressThread(self, db1, stop, 1, cd, 1, sleep=0.001)
t2 = self.StressThread(self, db1, stop, 2, cd, 2, sleep=0.001)
self.go(stop, cd, t1, t2)
cn.sync()
self._check_tree(cn, tree)
......@@ -263,22 +361,23 @@ class InvalidationTests(CommonSetupTearDown):
def checkConcurrentUpdates2StoragesMT(self):
self._storage = storage1 = self.openClientStorage()
db1 = DB(storage1)
db2 = DB(self.openClientStorage())
stop = threading.Event()
cn = db1.open()
tree = cn.root()["tree"] = OOBTree()
get_transaction().commit()
db2 = DB(self.openClientStorage(cache="2"))
# Run three threads that update the BTree.
# Two of the threads share a single storage so that it
# is possible for both threads to read the same object
# at the same time.
t1 = StressThread(self, db1, stop, 1, 1, 3)
t2 = StressThread(self, db2, stop, 2, 2, 3, 0.001)
t3 = StressThread(self, db2, stop, 3, 3, 3, 0.001)
self.go(stop, t1, t2, t3)
cd = {}
t1 = self.StressThread(self, db1, stop, 1, cd, 1, 3)
t2 = self.StressThread(self, db2, stop, 2, cd, 2, 3, 0.001)
t3 = self.StressThread(self, db2, stop, 3, cd, 3, 3, 0.001)
self.go(stop, cd, t1, t2, t3)
cn.sync()
self._check_tree(cn, tree)
......@@ -291,7 +390,7 @@ class InvalidationTests(CommonSetupTearDown):
def checkConcurrentUpdatesInVersions(self):
self._storage = storage1 = self.openClientStorage()
db1 = DB(storage1)
db2 = DB(self.openClientStorage(cache="2"))
db2 = DB(self.openClientStorage())
stop = threading.Event()
cn = db1.open()
......@@ -303,10 +402,11 @@ class InvalidationTests(CommonSetupTearDown):
# is possible for both threads to read the same object
# at the same time.
t1 = VersionStressThread(self, db1, stop, 1, 1, 3)
t2 = VersionStressThread(self, db2, stop, 2, 2, 3, 0.001)
t3 = VersionStressThread(self, db2, stop, 3, 3, 3, 0.001)
self.go(stop, t1, t2, t3)
cd = {}
t1 = VersionStressThread(self, db1, stop, 1, cd, 1, 3)
t2 = VersionStressThread(self, db2, stop, 2, cd, 2, 3, 0.001)
t3 = VersionStressThread(self, db2, stop, 3, cd, 3, 3, 0.001)
self.go(stop, cd, t1, t2, t3)
cn.sync()
self._check_tree(cn, tree)
......@@ -316,3 +416,41 @@ class InvalidationTests(CommonSetupTearDown):
db1.close()
db2.close()
def checkConcurrentLargeUpdates(self):
# Use 3 threads like the 2StorageMT test above.
self._storage = storage1 = self.openClientStorage()
db1 = DB(storage1)
db2 = DB(self.openClientStorage())
stop = threading.Event()
cn = db1.open()
tree = cn.root()["tree"] = OOBTree()
for i in range(0, 3000, 2):
tree[i] = 0
get_transaction().commit()
# Run three threads that update the BTree.
# Two of the threads share a single storage so that it
# is possible for both threads to read the same object
# at the same time.
cd = {}
t1 = LargeUpdatesThread(self, db1, stop, 1, cd, 1, 3, 0.001)
t2 = LargeUpdatesThread(self, db2, stop, 2, cd, 2, 3, 0.001)
t3 = LargeUpdatesThread(self, db2, stop, 3, cd, 3, 3, 0.001)
self.go(stop, cd, t1, t2, t3)
cn.sync()
self._check_tree(cn, tree)
# Purge the tree of the dummy entries mapping to 0.
losers = [k for k, v in tree.items() if v == 0]
for k in losers:
del tree[k]
get_transaction().commit()
self._check_threads(tree, t1, t2, t3)
cn.close()
db1.close()
db2.close()
......@@ -32,14 +32,12 @@ class ClientCacheTests(unittest.TestCase):
_oid3 = 'cdefghij'
def setUp(self):
unittest.TestCase.setUp(self)
self.cachesize = 10*1000*1000
self.cache = ClientCache(size=self.cachesize)
self.cache.open()
def tearDown(self):
self.cache.close()
unittest.TestCase.tearDown(self)
def testOpenClose(self):
pass # All the work is done by setUp() / tearDown()
......@@ -281,9 +279,10 @@ class ClientCacheTests(unittest.TestCase):
class PersistentClientCacheTests(unittest.TestCase):
_oid = 'abcdefgh'
_oid2 = 'bcdefghi'
_oid3 = 'cdefghij'
def setUp(self):
unittest.TestCase.setUp(self)
self.vardir = os.getcwd() # Don't use /tmp, it's a security risk
self.cachesize = 10*1000*1000
self.storagename = 'foo'
......@@ -319,7 +318,6 @@ class PersistentClientCacheTests(unittest.TestCase):
os.unlink(filename)
except os.error:
pass
unittest.TestCase.tearDown(self)
def testCacheFileSelection(self):
# A bug in __init__ read the wrong slice of the file to determine
......@@ -388,6 +386,41 @@ class PersistentClientCacheTests(unittest.TestCase):
cache.checkSize(10*self.cachesize) # Force a file flip
self.failUnless(cache.getLastTid() is None)
def testLoadNonversionWithVersionInFlippedCache(self):
# This test provokes an error seen once in an unrelated test.
# The object is stored in the old cache file with version data,
# a load for non-version data occurs. The attempt to copy the
# non-version data to the new file fails.
nvdata = "Mend your speech a little, lest it may mar your fortunes."
nvserial = "12345678"
version = "folio"
vdata = "Mend your speech a little, lest you may mar your fortunes."
vserial = "12346789"
self.cache.store(self._oid, nvdata, nvserial, version, vdata, vserial)
self.cache.checkSize(10 * self.cachesize) # force a cache flip
for i in 1, 2: # check the we can load before and after copying
for xversion, xdata, xserial in [("", nvdata, nvserial),
(version, vdata, vserial)]:
data, serial = self.cache.load(self._oid, xversion)
self.assertEqual(data, xdata)
self.assertEqual(serial, xserial)
# now cause two more cache flips and make sure the data is still there
self.cache.store(self._oid2, "", "", "foo", "bar", "23456789")
self.cache.checkSize(10 * self.cachesize) # force a cache flip
self.cache.load(self._oid, "")
self.cache.store(self._oid3, "bar", "34567890", "", "", "")
self.cache.checkSize(10 * self.cachesize) # force a cache flip
self.cache.load(self._oid, "")
for i in 1, 2: # check the we can load before and after copying
for xversion, xdata, xserial in [("", nvdata, nvserial),
(version, vdata, vserial)]:
data, serial = self.cache.load(self._oid, xversion)
self.assertEqual(data, xdata)
self.assertEqual(serial, xserial)
class ClientCacheLongOIDTests(ClientCacheTests):
_oid = 'abcdefghijklmnop' * 2
......@@ -397,7 +430,8 @@ class ClientCacheLongOIDTests(ClientCacheTests):
class PersistentClientCacheLongOIDTests(PersistentClientCacheTests):
_oid = 'abcdefghijklmnop' * 2
_oid2 = 'bcdefghijklmnopq' * 2
_oid3 = 'cdefghijklmnopqr' * 2
def test_suite():
suite = unittest.TestSuite()
......
......@@ -38,7 +38,7 @@ class BerkeleyStorageConfig:
def getConfig(self, path, create, read_only):
return """\
<fullstorage 1>
name %s
envdir %s
read-only %s
</fullstorage>""" % (path, read_only and "yes" or "no")
......@@ -57,19 +57,25 @@ class FileStorageConnectionTests(
class FileStorageReconnectionTests(
FileStorageConfig,
ConnectionTests.ReconnectionTests
ConnectionTests.ReconnectionTests,
):
"""FileStorage-specific re-connection tests."""
# Run this at level 1 because MappingStorage can't do reconnection tests
level = 1
class FileStorageInvqTests(
FileStorageConfig,
ConnectionTests.InvqTests
):
"""FileStorage-specific invalidation queue tests."""
level = 1
class FileStorageTimeoutTests(
FileStorageConfig,
ConnectionTests.TimeoutTests
):
level = 2
class BDBConnectionTests(
BerkeleyStorageConfig,
ConnectionTests.ConnectionTests,
......@@ -85,6 +91,13 @@ class BDBReconnectionTests(
"""Berkeley storage re-connection tests."""
level = 2
class BDBInvqTests(
BerkeleyStorageConfig,
ConnectionTests.InvqTests
):
"""Berkeley storage invalidation queue tests."""
level = 2
class BDBTimeoutTests(
BerkeleyStorageConfig,
ConnectionTests.TimeoutTests
......@@ -112,22 +125,19 @@ class MappingStorageTimeoutTests(
test_classes = [FileStorageConnectionTests,
FileStorageReconnectionTests,
FileStorageInvqTests,
FileStorageTimeoutTests,
MappingStorageConnectionTests,
MappingStorageTimeoutTests]
import BDBStorage
if BDBStorage.is_available:
test_classes.append(BDBConnectionTests)
test_classes.append(BDBReconnectionTests)
test_classes.append(BDBTimeoutTests)
test_classes += [BDBConnectionTests,
BDBReconnectionTests,
BDBInvqTests,
BDBTimeoutTests]
def test_suite():
# shutup warnings about mktemp
import warnings
warnings.filterwarnings("ignore", "mktemp")
suite = unittest.TestSuite()
for klass in test_classes:
sub = unittest.makeSuite(klass, 'check')
......
......@@ -187,7 +187,7 @@ class BDBTests(FileStorageTests):
self._envdir = tempfile.mktemp()
return """\
<fullstorage 1>
name %s
envdir %s
</fullstorage>
""" % self._envdir
......
......@@ -122,9 +122,9 @@ class Suicide(threading.Thread):
self._adminaddr = addr
def run(self):
# If this process doesn't exit in 100 seconds, commit suicide
for i in range(20):
time.sleep(5)
# If this process doesn't exit in 300 seconds, commit suicide
time.sleep(300)
log("zeoserver", "suicide thread invoking shutdown")
from ZEO.tests.forker import shutdown_zeo_server
# XXX If the -k option was given to zeoserver, then the process will
# go away but the temp files won't get cleaned up.
......@@ -174,7 +174,7 @@ def main():
transaction_timeout=zo.transaction_timeout,
monitor_address=mon_addr,
auth_protocol=zo.auth_protocol,
auth_filename=zo.auth_database,
auth_database=zo.auth_database,
auth_realm=zo.auth_realm)
try:
......
......@@ -28,7 +28,7 @@ from ZEO.zrpc.log import log
from ZEO.zrpc.trigger import trigger
from ZEO.zrpc.connection import ManagedConnection
class ConnectionManager:
class ConnectionManager(object):
"""Keeps a connection up over time"""
def __init__(self, addrs, client, tmin=1, tmax=180):
......
......@@ -67,7 +67,7 @@ class MTDelay(Delay):
self.ready.wait()
Delay.error(self, exc_info)
class Connection(smac.SizedMessageAsyncConnection):
class Connection(smac.SizedMessageAsyncConnection, object):
"""Dispatcher for RPC on object on both sides of socket.
The connection supports synchronous calls, which expect a return,
......
......@@ -13,24 +13,29 @@
##############################################################################
"""Handy standard storage machinery
$Id: BaseStorage.py,v 1.34 2003/06/10 15:46:31 shane Exp $
$Id: BaseStorage.py,v 1.35 2003/09/15 16:29:15 jeremy Exp $
"""
import cPickle
import ThreadLock, bpthread
import time, UndoLogCompatible
import POSException
from TimeStamp import TimeStamp
z64='\0'*8
import time
class BaseStorage(UndoLogCompatible.UndoLogCompatible):
import ThreadLock
import zLOG
from ZODB import bpthread
from ZODB import POSException
from ZODB.TimeStamp import TimeStamp
from ZODB.UndoLogCompatible import UndoLogCompatible
from ZODB.utils import z64
class BaseStorage(UndoLogCompatible):
_transaction=None # Transaction that is being committed
_serial=z64 # Transaction serial number
_tstatus=' ' # Transaction status, used for copying data
_is_read_only = 0
def __init__(self, name, base=None):
self.__name__=name
self.__name__= name
zLOG.LOG(self.__class__.__name__, zLOG.DEBUG,
"create storage %s" % self.__name__)
# Allocate locks:
l=ThreadLock.allocate_lock()
......
......@@ -13,7 +13,7 @@
##############################################################################
"""Database connection support
$Id: Connection.py,v 1.98 2003/06/13 21:53:08 jeremy Exp $"""
$Id: Connection.py,v 1.99 2003/09/15 16:29:15 jeremy Exp $"""
from __future__ import nested_scopes
......@@ -47,7 +47,7 @@ def updateCodeTimestamp():
ExtensionKlass = Base.__class__
class Connection(ExportImport.ExportImport):
class Connection(ExportImport.ExportImport, object):
"""Object managers for individual object space.
An object space is a version of collection of objects. In a
......@@ -136,11 +136,10 @@ class Connection(ExportImport.ExportImport):
# Explicitly remove references from the connection to its
# cache and to the root object, because they refer back to the
# connection.
if self._cache is not None:
self._cache.clear()
self._cache = None
self._incrgc = None
self.cacheGC = None
self._root_ = None
def __getitem__(self, oid, tt=type(())):
obj = self._cache.get(oid, None)
......@@ -176,8 +175,6 @@ class Connection(ExportImport.ExportImport):
object._p_serial=serial
self._cache[oid] = object
if oid=='\0\0\0\0\0\0\0\0':
self._root_=object # keep a ref
return object
def _persistent_load(self,oid,
......@@ -279,6 +276,7 @@ class Connection(ExportImport.ExportImport):
self.__onCloseCallbacks.append(f)
def close(self):
if self._incrgc is not None:
self._incrgc() # This is a good time to do some GC
# Call the close callbacks.
......
......@@ -13,8 +13,8 @@
##############################################################################
"""Database objects
$Id: DB.py,v 1.53 2003/06/24 21:50:18 jeremy Exp $"""
__version__='$Revision: 1.53 $'[11:-2]
$Id: DB.py,v 1.54 2003/09/15 16:29:15 jeremy Exp $"""
__version__='$Revision: 1.54 $'[11:-2]
import cPickle, cStringIO, sys, POSException, UndoLogCompatible
from Connection import Connection
......@@ -32,7 +32,7 @@ def list2dict(L):
d[elt] = 1
return d
class DB(UndoLogCompatible.UndoLogCompatible):
class DB(UndoLogCompatible.UndoLogCompatible, object):
"""The Object Database
The Object database coordinates access to and interaction of one
......
......@@ -79,7 +79,7 @@ method::
and call it to monitor the storage.
"""
__version__='$Revision: 1.19 $'[11:-2]
__version__='$Revision: 1.20 $'[11:-2]
import base64, time, string
from ZODB import POSException, BaseStorage, utils
......
......@@ -115,7 +115,7 @@
# may have a back pointer to a version record or to a non-version
# record.
#
__version__='$Revision: 1.136 $'[11:-2]
__version__='$Revision: 1.137 $'[11:-2]
import base64
from cPickle import Pickler, Unpickler, loads
......@@ -157,17 +157,21 @@ DATA_VERSION_HDR_LEN = 58
assert struct.calcsize(TRANS_HDR) == TRANS_HDR_LEN
assert struct.calcsize(DATA_HDR) == DATA_HDR_LEN
def blather(message, *data):
LOG('ZODB FS', BLATHER, "%s blather: %s\n" % (packed_version,
message % data))
def warn(message, *data):
LOG('ZODB FS', WARNING, "%s warn: %s\n" % (packed_version,
(message % data)))
message % data))
def error(message, *data):
LOG('ZODB FS', ERROR, "%s ERROR: %s\n" % (packed_version,
(message % data)))
message % data))
def nearPanic(message, *data):
LOG('ZODB FS', PANIC, "%s ERROR: %s\n" % (packed_version,
(message % data)))
message % data))
def panic(message, *data):
message = message % data
......@@ -234,8 +238,10 @@ class FileStorage(BaseStorage.BaseStorage,
BaseStorage.BaseStorage.__init__(self, file_name)
index, vindex, tindex, tvindex = self._newIndexes()
self._initIndex(index, vindex, tindex, tvindex)
(index, vindex, tindex, tvindex,
oid2serial, toid2serial, toid2serial_delete) = self._newIndexes()
self._initIndex(index, vindex, tindex, tvindex,
oid2serial, toid2serial, toid2serial_delete)
# Now open the file
......@@ -269,7 +275,8 @@ class FileStorage(BaseStorage.BaseStorage,
self._used_index = 1 # Marker for testing
index, vindex, start, maxoid, ltid = r
self._initIndex(index, vindex, tindex, tvindex)
self._initIndex(index, vindex, tindex, tvindex,
oid2serial, toid2serial, toid2serial_delete)
self._pos, self._oid, tid = read_index(
self._file, file_name, index, vindex, tindex, stop,
ltid=ltid, start=start, maxoid=maxoid,
......@@ -302,7 +309,11 @@ class FileStorage(BaseStorage.BaseStorage,
self._quota = quota
def _initIndex(self, index, vindex, tindex, tvindex):
# Serialno cache statistics.
self._oid2serial_nlookups = self._oid2serial_nhits = 0
def _initIndex(self, index, vindex, tindex, tvindex,
oid2serial, toid2serial, toid2serial_delete):
self._index=index
self._vindex=vindex
self._tindex=tindex
......@@ -310,12 +321,33 @@ class FileStorage(BaseStorage.BaseStorage,
self._index_get=index.get
self._vindex_get=vindex.get
# .store() needs to compare the passed-in serial to the current
# serial in the database. _oid2serial caches the oid -> current
# serial mapping for non-version data (if the current record for
# oid is version data, the oid is not a key in _oid2serial).
# The point is that otherwise seeking into the storage is needed
# to extract the current serial, and that's an expensive operation.
# For example, if a transaction stores 4000 objects, and each
# random seek + read takes 7ms (that was approximately true on
# Linux and Windows tests in mid-2003), that's 28 seconds just to
# find the old serials.
# XXX Probably better to junk this and redefine _index as mapping
# XXX oid to (offset, serialno) pair, via a new memory-efficient
# XXX BTree type.
self._oid2serial = oid2serial
# oid->serialno map to transactionally add to _oid2serial.
self._toid2serial = toid2serial
# Set of oids to transactionally delete from _oid2serial (e.g.,
# oids reverted by undo, or for which the most recent record
# becomes version data).
self._toid2serial_delete = toid2serial_delete
def __len__(self):
return len(self._index)
def _newIndexes(self):
# hook to use something other than builtin dict
return fsIndex(), {}, {}, {}
return fsIndex(), {}, {}, {}, {}, {}, {}
_saved = 0
def _save_index(self):
......@@ -483,6 +515,31 @@ class FileStorage(BaseStorage.BaseStorage,
# XXX should log the error, though
pass # We don't care if this fails.
# Return serial number of most recent record for oid if that's in
# the _oid2serial cache. Else return None. It's important to use
# this instead of indexing _oid2serial directly so that cache
# statistics can be logged.
def _get_cached_serial(self, oid):
self._oid2serial_nlookups += 1
result = self._oid2serial.get(oid)
if result is not None:
self._oid2serial_nhits += 1
# Log a msg every ~8000 tries, and prevent overflow.
if self._oid2serial_nlookups & 0x1fff == 0:
if self._oid2serial_nlookups >> 30:
# In older Pythons, we may overflow if we keep it an int.
self._oid2serial_nlookups = long(self._oid2serial_nlookups)
self._oid2serial_nhits = long(self._oid2serial_nhits)
blather("_oid2serial size %s lookups %s hits %s rate %.1f%%",
len(self._oid2serial),
self._oid2serial_nlookups,
self._oid2serial_nhits,
100.0 * self._oid2serial_nhits /
self._oid2serial_nlookups)
return result
def abortVersion(self, src, transaction):
return self.commitVersion(src, '', transaction, abort=1)
......@@ -585,33 +642,11 @@ class FileStorage(BaseStorage.BaseStorage,
spos = h[-8:]
srcpos = u64(spos)
self._toid2serial_delete.update(current_oids)
return oids
def getSize(self): return self._pos
def _loada(self, oid, _index, file):
"Read any version and return the version"
try:
pos=_index[oid]
except KeyError:
raise POSKeyError(oid)
except TypeError:
raise TypeError, 'invalid oid %r' % (oid,)
file.seek(pos)
read=file.read
h=read(DATA_HDR_LEN)
doid,serial,prev,tloc,vlen,plen = unpack(DATA_HDR, h)
if vlen:
nv = u64(read(8))
read(8) # Skip previous version record pointer
version = read(vlen)
else:
version = ''
nv = 0
if plen != z64:
return read(u64(plen)), version, nv
return _loadBack(file, oid, read(8))[0], version, nv
def getSize(self):
return self._pos
def _load(self, oid, version, _index, file):
try:
......@@ -632,6 +667,10 @@ class FileStorage(BaseStorage.BaseStorage,
(read(8) # skip past version link
and version != read(vlen))):
return _loadBack(file, oid, pnv)
else:
# The most recent record is for non-version data -- cache
# the serialno.
self._oid2serial[oid] = serial
# If we get here, then either this was not a version record,
# or we've already read past the version data!
......@@ -713,9 +752,12 @@ class FileStorage(BaseStorage.BaseStorage,
self._lock_acquire()
try:
old=self._index_get(oid, 0)
pnv=None
old = self._index_get(oid, 0)
cached_serial = None
pnv = None
if old:
cached_serial = self._get_cached_serial(oid)
if cached_serial is None:
self._file.seek(old)
h=self._file.read(DATA_HDR_LEN)
doid,oserial,sprev,stloc,vlen,splen = unpack(DATA_HDR, h)
......@@ -727,6 +769,8 @@ class FileStorage(BaseStorage.BaseStorage,
if version != locked_version:
raise POSException.VersionLockError, (
`oid`, locked_version)
else:
oserial = cached_serial
if serial != oserial:
data = self.tryToResolveConflict(oid, oserial, serial,
......@@ -749,14 +793,19 @@ class FileStorage(BaseStorage.BaseStorage,
)
)
if version:
if pnv: write(pnv)
else: write(p64(old))
if pnv:
write(pnv)
else:
write(p64(old))
# Link to last record for this version:
tvindex=self._tvindex
pv=tvindex.get(version, 0) or self._vindex_get(version, 0)
write(p64(pv))
tvindex[version]=here
write(version)
self._toid2serial_delete[oid] = 1
else:
self._toid2serial[oid] = newserial
write(data)
......@@ -875,7 +924,11 @@ class FileStorage(BaseStorage.BaseStorage,
self._tfile.write(p64(pv))
self._tvindex[version] = here
self._tfile.write(version)
# And finally, write the data or a backpointer
self._toid2serial_delete[oid] = 1
else:
self._toid2serial[oid] = serial
# Finally, write the data or a backpointer.
if data is None:
if prev_pos:
self._tfile.write(p64(prev_pos))
......@@ -940,6 +993,8 @@ class FileStorage(BaseStorage.BaseStorage,
def _clear_temp(self):
self._tindex.clear()
self._tvindex.clear()
self._toid2serial.clear()
self._toid2serial_delete.clear()
if self._tfile is not None:
self._tfile.seek(0)
......@@ -1023,6 +1078,12 @@ class FileStorage(BaseStorage.BaseStorage,
self._index.update(self._tindex)
self._vindex.update(self._tvindex)
self._oid2serial.update(self._toid2serial)
for oid in self._toid2serial_delete.keys():
try:
del self._oid2serial[oid]
except KeyError:
pass
# Update the number of records that we've written
# +1 for the transaction record
......@@ -1090,21 +1151,28 @@ class FileStorage(BaseStorage.BaseStorage,
def getSerial(self, oid):
self._lock_acquire()
try:
result = self._get_cached_serial(oid)
if result is None:
try:
return self._getSerial(oid, self._index[oid])
result = self._getSerial(oid, self._index[oid])
except KeyError:
raise POSKeyError(oid)
except TypeError:
raise TypeError, 'invalid oid %r' % (oid,)
return result
finally:
self._lock_release()
def _getSerial(self, oid, pos):
self._file.seek(pos)
h = self._file.read(DATA_HDR_LEN)
h = self._file.read(16)
if len(h) < 16:
raise CorruptedDataError(h)
h += self._file.read(26) # get rest of header
if h[:8] != oid:
raise CorruptedDataError(h)
oid2, serial, sprev, stloc, vlen, splen = unpack(DATA_HDR, h)
assert oid == oid2
if splen==z64:
if splen == z64:
# a back pointer
bp = self._file.read(8)
if bp == z64:
......@@ -1243,6 +1311,10 @@ class FileStorage(BaseStorage.BaseStorage,
tpos = self._txn_find(tid, 1)
tindex = self._txn_undo_write(tpos)
self._tindex.update(tindex)
# Arrange to clear the affected oids from the oid2serial cache.
# It's too painful to try to update them to correct current
# values instead.
self._toid2serial_delete.update(tindex)
return tindex.keys()
def _txn_find(self, tid, stop_at_pack):
......@@ -1500,7 +1572,9 @@ class FileStorage(BaseStorage.BaseStorage,
# OK, we're beyond the point of no return
os.rename(self._file_name + '.pack', self._file_name)
self._file = open(self._file_name, 'r+b')
self._initIndex(p.index, p.vindex, p.tindex, p.tvindex)
self._initIndex(p.index, p.vindex, p.tindex, p.tvindex,
p.oid2serial, p.toid2serial,
p.toid2serial_delete)
self._pos = opos
self._save_index()
finally:
......@@ -1526,20 +1600,9 @@ class FileStorage(BaseStorage.BaseStorage,
if it is a new object -- return None.
"""
try:
pos = self._index[oid]
return self.getSerial(oid)
except KeyError:
return None
except TypeError:
raise TypeError, 'invalid oid %r' % (oid,)
self._file.seek(pos)
# first 8 bytes are oid, second 8 bytes are serialno
h = self._file.read(16)
if len(h) < 16:
raise CorruptedDataError(h)
if h[:8] != oid:
h = h + self._file.read(26) # get rest of header
raise CorruptedDataError(h)
return h[8:]
def cleanup(self):
"""Remove all files created by this storage."""
......
......@@ -15,7 +15,7 @@
static char TimeStamp_module_documentation[] =
"Defines 64-bit TimeStamp objects used as ZODB serial numbers.\n"
"\n"
"\n$Id: TimeStamp.c,v 1.19 2003/06/20 18:38:24 tim_one Exp $\n";
"\n$Id: TimeStamp.c,v 1.20 2003/09/15 16:29:15 jeremy Exp $\n";
#ifdef USE_EXTENSION_CLASS
#include "ExtensionClass.h"
......
......@@ -47,7 +47,7 @@
-->
<sectiontype name="fullstorage" datatype=".BDBFullStorage"
implements="ZODB.storage">
<key name="name" required="yes" />
<key name="envdir" required="yes" />
<key name="interval" datatype="time-interval" default="2m" />
<key name="kbyte" datatype="integer" default="0" />
<key name="min" datatype="integer" default="0" />
......@@ -55,7 +55,7 @@
<key name="cachesize" datatype="byte-size" default="128MB" />
<key name="frequency" datatype="time-interval" default="0" />
<key name="packtime" datatype="time-interval" default="4h" />
<key name="classicpack" datatype="integer" default="0" />
<key name="gcpack" datatype="integer" default="0" />
<key name="read-only" datatype="boolean" default="off"/>
</sectiontype>
......
......@@ -13,7 +13,7 @@
##############################################################################
"""Open database and storage from a configuration.
$Id: config.py,v 1.13 2003/06/16 14:51:49 jeremy Exp $"""
$Id: config.py,v 1.14 2003/09/15 16:29:15 jeremy Exp $"""
import os
from cStringIO import StringIO
......@@ -157,7 +157,7 @@ class BDBStorage(BaseConfig):
if name.startswith('_'):
continue
setattr(bconf, name, getattr(self.config, name))
return storageclass(self.config.name, config=bconf)
return storageclass(self.config.envdir, config=bconf)
class BDBMinimalStorage(BDBStorage):
......
......@@ -33,7 +33,7 @@ import struct
from types import StringType
from ZODB.referencesf import referencesf
from ZODB.utils import p64, u64, z64
from ZODB.utils import p64, u64, z64, oid_repr
from zLOG import LOG, BLATHER, WARNING, ERROR, PANIC
try:
......@@ -54,7 +54,7 @@ class CorruptedDataError(CorruptedError):
def __str__(self):
if self.oid:
msg = "Error reading oid %s. Found %r" % (_fmt_oid(self.oid),
msg = "Error reading oid %s. Found %r" % (oid_repr(self.oid),
self.buf)
else:
msg = "Error reading unknown oid. Found %r" % self.buf
......@@ -166,7 +166,7 @@ class FileStorageFormatter:
def checkTxn(self, th, pos):
if th.tid <= self.ltid:
self.fail(pos, "time-stamp reduction: %s <= %s",
_fmt_oid(th.tid), _fmt_oid(self.ltid))
oid_repr(th.tid), oid_repr(self.ltid))
self.ltid = th.tid
if th.status == "c":
self.fail(pos, "transaction with checkpoint flag set")
......@@ -647,11 +647,15 @@ class FileStoragePacker(FileStorageFormatter):
# vindex: version -> pos of XXX
# tindex: oid -> pos, for current txn
# tvindex: version -> pos of XXX, for current txn
# oid2serial: not used by the packer
self.index = fsIndex()
self.vindex = {}
self.tindex = {}
self.tvindex = {}
self.oid2serial = {}
self.toid2serial = {}
self.toid2serial_delete = {}
# Index for non-version data. This is a temporary structure
# to reduce I/O during packing
......@@ -757,7 +761,7 @@ class FileStoragePacker(FileStorageFormatter):
If any data records are copied, also write txn header (th).
"""
copy = 0
new_tpos = 0
new_tpos = 0L
tend = pos + th.tlen
pos += th.headerlen()
while pos < tend:
......
......@@ -381,6 +381,60 @@ class PackableStorage(PackableStorageBase):
eq(root['obj'].value, 7)
def _PackWhileWriting(self, pack_now=0):
# A storage should allow some reading and writing during
# a pack. This test attempts to exercise locking code
# in the storage to test that it is safe. It generates
# a lot of revisions, so that pack takes a long time.
db = DB(self._storage)
conn = db.open()
root = conn.root()
for i in range(10):
root[i] = MinPO(i)
get_transaction().commit()
snooze()
packt = time.time()
for j in range(10):
for i in range(10):
root[i].value = MinPO(i)
get_transaction().commit()
threads = [ClientThread(db) for i in range(4)]
for t in threads:
t.start()
if pack_now:
db.pack(time.time())
else:
db.pack(packt)
for t in threads:
t.join(30)
for t in threads:
t.join(1)
self.assert_(not t.isAlive())
# Iterate over the storage to make sure it's sane, but not every
# storage supports iterators.
if not hasattr(self._storage, "iterator"):
return
iter = self._storage.iterator()
for txn in iter:
for data in txn:
pass
iter.close()
def checkPackWhileWriting(self):
self._PackWhileWriting(pack_now=0)
def checkPackNowWhileWriting(self):
self._PackWhileWriting(pack_now=1)
def checkPackUndoLog(self):
self._initroot()
eq = self.assertEqual
......@@ -450,47 +504,6 @@ class PackableStorage(PackableStorageBase):
for r in self._storage.undoLog(): print r
# what can we assert about that?
def checkPackWhileWriting(self):
# A storage should allow some reading and writing during
# a pack. This test attempts to exercise locking code
# in the storage to test that it is safe. It generates
# a lot of revisions, so that pack takes a long time.
db = DB(self._storage)
conn = db.open()
root = conn.root()
for i in range(10):
root[i] = MinPO(i)
get_transaction().commit()
snooze()
packt = time.time()
for j in range(10):
for i in range(10):
root[i].value = MinPO(i)
get_transaction().commit()
threads = [ClientThread(db) for i in range(4)]
for t in threads:
t.start()
db.pack(packt)
for t in threads:
t.join(30)
for t in threads:
t.join(1)
self.assert_(not t.isAlive())
# iterator over the storage to make sure it's sane
if not hasattr(self._storage, "iterator"):
return
iter = self._storage.iterator()
for txn in iter:
for data in txn:
pass
iter.close()
class ClientThread(threading.Thread):
def __init__(self, db):
......
......@@ -15,7 +15,7 @@
from ZODB.Transaction import Transaction
from ZODB.tests.IteratorStorage import IteratorDeepCompare
from ZODB.tests.StorageTestBase import MinPO, zodb_unpickle
from ZODB.tests.StorageTestBase import MinPO, zodb_unpickle, snooze
from ZODB import DB
from ZODB.referencesf import referencesf
......@@ -154,3 +154,31 @@ class RecoveryStorage(IteratorDeepCompare):
it.close()
self._dst.tpc_vote(final)
self._dst.tpc_finish(final)
def checkPackWithGCOnDestinationAfterRestore(self):
raises = self.assertRaises
db = DB(self._storage)
conn = db.open()
root = conn.root()
root.obj = obj1 = MinPO(1)
txn = get_transaction()
txn.note('root -> obj')
txn.commit()
root.obj.obj = obj2 = MinPO(2)
txn = get_transaction()
txn.note('root -> obj -> obj')
txn.commit()
del root.obj
txn = get_transaction()
txn.note('root -X->')
txn.commit()
# Now copy the transactions to the destination
self._dst.copyTransactionsFrom(self._storage)
# Now pack the destination.
snooze()
self._dst.pack(time.time(), referencesf)
# And check to see that the root object exists, but not the other
# objects.
data, serial = self._dst.load(root._p_oid, '')
raises(KeyError, self._dst.load, obj1._p_oid, '')
raises(KeyError, self._dst.load, obj2._p_oid, '')
......@@ -488,6 +488,7 @@ class VersionStorage:
root["d"] = MinPO("d")
get_transaction().commit()
snooze()
self._storage.pack(time.time(), referencesf)
cn.sync()
......
......@@ -185,9 +185,7 @@ class LRUCacheTests(CacheTestBase):
self.assertEquals(len(details), CONNS)
for d in details:
self.assertEquals(d['ngsize'], CACHE_SIZE)
# the root is also in the cache as ghost, because
# the connection holds a reference to it
self.assertEquals(d['size'], CACHE_SIZE + 1)
self.assertEquals(d['size'], CACHE_SIZE)
def checkDetail(self):
CACHE_SIZE = 10
......
......@@ -123,7 +123,7 @@ class BDBConfigTest(ConfigTestBase):
cfg = """
<zodb>
<fullstorage>
name %s
envdir %s
</fullstorage>
</zodb>
""" % self._path
......@@ -133,7 +133,7 @@ class BDBConfigTest(ConfigTestBase):
cfg = """
<zodb>
<minimalstorage>
name %s
envdir %s
</minimalstorage>
</zodb>
""" % self._path
......
......@@ -88,7 +88,7 @@ class FileStorageTests(
class OldFileStorage(ZODB.FileStorage.FileStorage):
def _newIndexes(self):
return {}, {}, {}, {}
return {}, {}, {}, {}, {}, {}, {}
from ZODB.fsIndex import fsIndex
......@@ -113,7 +113,7 @@ class FileStorageTests(
class OldFileStorage(ZODB.FileStorage.FileStorage):
def _newIndexes(self):
return {}, {}, {}, {}
return {}, {}, {}, {}, {}, {}, {}
from ZODB.fsIndex import fsIndex
......
This directory contains a collect of utilities for managing ZODB
databases. Some are more useful than others. If you install ZODB
using distutils ("python setup.py install"), fsdump.py, fstest.py,
repozo.py, and zeopack.py will be installed in /usr/local/bin.
Unless otherwise noted, these scripts are invoked with the name of the
Data.fs file as their only argument. Example: checkbtrees.py data.fs.
analyze.py -- A transaction analyzer for FileStorage
Reports on the data in a FileStorage. The report is organized by
class. It shows total data, as well as separate reports for current
and historical revisions of objects.
checkbtrees.py -- Checks BTrees in a FileStorage for corruption.
Attempts to find all the BTrees contained in a Data.fs and calls their
_check() methods.
fsdump.py -- Summarize FileStorage contents, one line per revision.
Prints a report of FileStorage contents, with one line for each
transaction and one line for each data record in that transaction.
Includes time stamps, file positions, and class names.
fstest.py -- Simple consistency checker for FileStorage
usage: fstest.py [-v] data.fs
The fstest tool will scan all the data in a FileStorage and report an
error if it finds any corrupt transaction data. The tool will print a
message when the first error is detected an exit.
The tool accepts one or more -v arguments. If a single -v is used, it
will print a line of text for each transaction record it encounters.
If two -v arguments are used, it will also print a line of text for
each object. The objects for a transaction will be printed before the
transaction itself.
Note: It does not check the consistency of the object pickles. It is
possible for the damage to occur only in the part of the file that
stores object pickles. Those errors will go undetected.
netspace.py -- Hackish attempt to report on size of objects
usage: netspace.py [-P | -v] data.fs
-P: do a pack first
-v: print info for all objects, even if a traversal path isn't found
Traverses objects from the database root and attempts to calculate
size of object, including all reachable subobjects.
parsezeolog.py -- Parse BLATHER logs from ZEO server.
This script may be obsolete. It has not been tested against the
current log output of the ZEO server.
Reports on the time and size of transactions committed by a ZEO
server, by inspecting log messages at BLATHER level.
repozo.py -- Incremental backup utility for FileStorage.
Run the script with the -h option to see usage details.
timeout.py -- Script to test transaction timeout
usage: timeout.py address delay [storage-name]
This script connects to a storage, begins a transaction, calls store()
and tpc_vote(), and then sleeps forever. This should trigger the
transaction timeout feature of the server.
zeopack.py -- Script to pack a ZEO server.
The script connects to a server and calls pack() on a specific
storage. See the script for usage details.
zeoreplay.py -- Experimental script to replay transactions from a ZEO log.
Like parsezeolog.py, this may be obsolete because it was written
against an earlier version of the ZEO server. See the script for
usage details.
zeoup.py
Usage: zeoup.py [options]
The test will connect to a ZEO server, load the root object, and
attempt to update the zeoup counter in the root. It will report
success if it updates to counter or if it gets a ConflictError. A
ConflictError is considered a success, because the client was able to
start a transaction.
See the script for details about the options.
zodbload.py - exercise ZODB under a heavy synthesized Zope-like load
See the module docstring for details. Note that this script requires
Zope. New in ZODB3 3.1.4.
zeoserverlog.py - analyze ZEO server log for performance statistics
See the module docstring for details; there are a large number of
options. New in ZODB3 3.1.4.
\ No newline at end of file
#! /usr/bin/env python
#!python
# Based on a transaction analyzer by Matt Kromer.
import pickle
......@@ -137,4 +137,3 @@ def analyze_rec(report, record):
if __name__ == "__main__":
path = sys.argv[1]
report(analyze(path))
#! /usr/bin/env python
#!python
"""Check the consistency of BTrees in a Data.fs
usage: checkbtrees.py data.fs
Try to find all the BTrees in a Data.fs and call their _check() methods.
Try to find all the BTrees in a Data.fs, call their _check() methods,
and run them through BTrees.check.check().
"""
from types import IntType
import ZODB
from ZODB.FileStorage import FileStorage
from BTrees.check import check
# Set of oids we've already visited. Since the object structure is
# a general graph, this is needed to prevent unbounded paths in the
# presence of cycles. It's also helpful in eliminating redundant
# checking when a BTree is pointed to by many objects.
oids_seen = {}
# Append (obj, path) to L if and only if obj is a persistent object
# and we haven't seen it before.
def add_if_new_persistent(L, obj, path):
global oids_seen
def add_if_persistent(L, obj, path):
getattr(obj, '_', None) # unghostify
if hasattr(obj, '_p_oid'):
oid = obj._p_oid
if not oids_seen.has_key(oid):
L.append((obj, path))
oids_seen[oid] = 1
def get_subobjects(obj):
getattr(obj, '_', None) # unghostify
......@@ -54,7 +69,7 @@ def main(fname):
cn = ZODB.DB(fs).open()
rt = cn.root()
todo = []
add_if_persistent(todo, rt, '')
add_if_new_persistent(todo, rt, '')
found = 0
while todo:
......@@ -75,6 +90,13 @@ def main(fname):
print msg
print "*" * 60
try:
check(obj)
except AssertionError, msg:
print "*" * 60
print msg
print "*" * 60
if found % 100 == 0:
cn.cacheMinimize()
......@@ -84,7 +106,7 @@ def main(fname):
newpath = "%s%s" % (path, k)
else:
newpath = "%s.%s" % (path, k)
add_if_persistent(todo, v, newpath)
add_if_new_persistent(todo, v, newpath)
print "total", len(fs._index), "found", found
......
#! /usr/bin/env python
#!python
##############################################################################
#
# Copyright (c) 2001, 2002, 2003 Zope Corporation and Contributors.
......
#!python
"""Report on the net size of objects counting subobjects.
usage: netspace.py [-P | -v] data.fs
......
#!python
"""Parse the BLATHER logging generated by ZEO2.
An example of the log format is:
......
#!/usr/bin/env python
#!python
# repozo.py -- incremental and full backups of a Data.fs file.
#
# Originally written by Anthony Baxter
# Significantly modified by Barry Warsaw
#
# TODO:
# allow gzipping of backup files.
# allow backup files in subdirectories.
"""repozo.py -- incremental and full backups of a Data.fs file.
Usage: %(program)s [options]
Where:
Exactly one of -B or -R must be specified:
-B / --backup
backup current ZODB file
Backup current ZODB file.
-R / --recover
restore a ZODB file from a backup
Restore a ZODB file from a backup.
-v / --verbose
Verbose mode
Verbose mode.
-h / --help
Print this text and exit
Print this text and exit.
-r dir
--repository=dir
Repository directory containing the backup files
Repository directory containing the backup files. This argument
is required.
Flags for --backup:
Options for -B/--backup:
-f file
--file=file
Source Data.fs file
Source Data.fs file. This argument is required.
-F / --full
Force a full backup
Force a full backup. By default, an incremental backup is made
if possible (e.g., if a pack has occurred since the last
incremental backup, a full backup is necessary).
-Q / --quick
Verify via md5 checksum only the last incremental written. This
significantly reduces the disk i/o at the (theoretical) cost of
inconsistency.
inconsistency. This is a probabilistic way of determining whether
a full backup is necessary.
-z / --gzip
Compress with gzip the backup files. Uses the default zlib
compression level.
compression level. By default, gzip compression is not used.
Flags for --recover:
Options for -R/--recover:
-D str
--date=str
Recover state as at this date. str is in the format
Recover state as of this date. str is in the format
yyyy-mm-dd[-hh[-mm]]
By default, current time is used.
-o file
--output=file
Write recovered ZODB to given file. If not given, the file will be
-o filename
--output=filename
Write recovered ZODB to given file. By default, the file is
written to stdout.
One of --backup or --recover is required.
"""
from __future__ import nested_scopes
......@@ -120,14 +121,14 @@ def parseargs():
usage(1, msg)
class Options:
mode = None
file = None
repository = None
full = False
date = None
output = None
quick = False
gzip = False
mode = None # BACKUP or RECOVER
file = None # name of input Data.fs file
repository = None # name of directory holding backups
full = False # True forces full backup
date = None # -D argument, if any
output = None # where to write recovered data; None = stdout
quick = False # -Q flag state
gzip = False # -z flag state
options = Options()
......@@ -158,6 +159,8 @@ def parseargs():
options.output = arg
elif opt in ('-z', '--gzip'):
options.gzip = True
else:
assert False, (opt, arg)
# Any other arguments are invalid
if args:
......@@ -184,20 +187,26 @@ def parseargs():
# Do something with a run of bytes from a file
# Read bytes (no more than n, or to EOF if n is None) in chunks from the
# current position in file fp. Pass each chunk as an argument to func().
# Return the total number of bytes read == the total number of bytes
# passed in all to func(). Leaves the file position just after the
# last byte read.
def dofile(func, fp, n=None):
bytesread = 0
stop = False
chunklen = READCHUNK
while not stop:
if n is not None and chunklen + bytesread > n:
chunklen = n - bytesread
stop = True
data = fp.read(chunklen)
bytesread = 0L
while n is None or n > 0:
if n is None:
todo = READCHUNK
else:
todo = min(READCHUNK, n)
data = fp.read(todo)
if not data:
break
func(data)
bytesread += len(data)
nread = len(data)
bytesread += nread
if n is not None:
n -= nread
return bytesread
......@@ -223,9 +232,10 @@ def copyfile(options, dst, start, n):
def func(data):
sum.update(data)
ofp.write(data)
dofile(func, ifp, n)
ndone = dofile(func, ifp, n)
ofp.close()
ifp.close()
assert ndone == n
return sum.hexdigest()
......@@ -296,30 +306,34 @@ def find_files(options):
log('no files found')
return needed
# Scan the .dat file corresponding to the last full backup performed.
# Return
#
# filename, startpos, endpos, checksum
#
# of the last incremental. If there is no .dat file, or the .dat file
# is empty, return
#
# None, None, None, None
def scandat(repofiles):
# Scan the .dat file corresponding to the last full backup performed.
# Return the filename, startpos, endpos, and sum of the last incremental.
# If all is a list, then append file name and md5sums to the list.
fullfile = repofiles[0]
datfile = os.path.splitext(fullfile)[0] + '.dat'
# If the .dat file is missing, we have to do a full backup
fn = startpos = endpos = sum = None
fn = startpos = endpos = sum = None # assume .dat file missing or empty
try:
fp = open(datfile)
except IOError, e:
if e.errno <> errno.ENOENT:
raise
else:
while True:
line = fp.readline()
if not line:
break
# We only care about the last one
fn, startpos, endpos, sum = line.split()
# We only care about the last one.
lines = fp.readlines()
fp.close()
if lines:
fn, startpos, endpos, sum = lines[-1].split()
startpos = long(startpos)
endpos = long(endpos)
return fn, startpos, endpos, sum
......@@ -364,7 +378,7 @@ def do_incremental_backup(options, reposz, repofiles):
print >> sys.stderr, 'Cannot overwrite existing file:', dest
sys.exit(2)
log('writing incremental: %s bytes to %s', pos-reposz, dest)
sum = copyfile(options, dest, reposz, pos)
sum = copyfile(options, dest, reposz, pos - reposz)
# The first file in repofiles points to the last full backup. Use this to
# get the .dat file and append the information for this incrementatl to
# that file.
......@@ -398,14 +412,18 @@ def do_backup(options):
return
# Now check the md5 sum of the source file, from the last
# incremental's start and stop positions.
srcfp = open(options.file)
srcfp = open(options.file, 'rb')
srcfp.seek(startpos)
srcsum = checksum(srcfp, endpos-startpos)
srcfp.close()
log('last incremental file: %s', fn)
log('last incremental checksum: %s', sum)
log('source checksum range: [%s..%s], sum: %s',
startpos, endpos, srcsum)
if sum == srcsum:
if srcsz == endpos:
log('No changes, nothing to do')
return
log('doing incremental, starting at: %s', endpos)
do_incremental_backup(options, endpos, repofiles)
return
......@@ -421,7 +439,7 @@ def do_backup(options):
# Get the md5 checksum of the source file, up to two file positions:
# the entire size of the file, and up to the file position of the last
# incremental backup.
srcfp = open(options.file)
srcfp = open(options.file, 'rb')
srcsum = checksum(srcfp, srcsz)
srcfp.seek(0)
srcsum_backedup = checksum(srcfp, reposz)
......
#! /usr/bin/env python
"""Report on the space used by objects in a storage.
usage: space.py data.fs
The current implementation only supports FileStorage.
Current limitations / simplifications: Ignores revisions and versions.
"""
import ZODB
from ZODB.FileStorage import FileStorage
from ZODB.utils import U64
from ZODB.fsdump import get_pickle_metadata
def run(path, v=0):
fs = FileStorage(path, read_only=1)
# break into the file implementation
if hasattr(fs._index, 'iterkeys'):
iter = fs._index.iterkeys()
else:
iter = fs._index.keys()
totals = {}
for oid in iter:
data, serialno = fs.load(oid, '')
mod, klass = get_pickle_metadata(data)
key = "%s.%s" % (mod, klass)
bytes, count = totals.get(key, (0, 0))
bytes += len(data)
count += 1
totals[key] = bytes, count
if v:
print "%8s %5d %s" % (U64(oid), len(data), key)
L = totals.items()
L.sort(lambda a, b: cmp(a[1], b[1]))
L.reverse()
print "Totals per object class:"
for key, (bytes, count) in L:
print "%8d %8d %s" % (count, bytes, key)
def main():
import sys
import getopt
try:
opts, args = getopt.getopt(sys.argv[1:], "v")
except getopt.error, msg:
print msg
print "usage: space.py [-v] Data.fs"
sys.exit(2)
if len(args) != 1:
print "usage: space.py [-v] Data.fs"
sys.exit(2)
v = 0
for o, a in opts:
if o == "-v":
v += 1
path = args[0]
run(path, v)
if __name__ == "__main__":
main()
#!python
"""Transaction timeout test script.
This script connects to a storage, begins a transaction, calls store()
and tpc_vote(), and then sleeps forever. This should trigger the
transaction timeout feature of the server.
usage: timeout.py address delay [storage-name]
"""
import sys
import time
from ZODB.Transaction import Transaction
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_pickle
from ZEO.ClientStorage import ClientStorage
ZERO = '\0'*8
def main():
if len(sys.argv) not in (3, 4):
sys.stderr.write("Usage: timeout.py address delay [storage-name]\n" %
sys.argv[0])
sys.exit(2)
hostport = sys.argv[1]
delay = float(sys.argv[2])
if sys.argv[3:]:
name = sys.argv[3]
else:
name = "1"
if "/" in hostport:
address = hostport
else:
if ":" in hostport:
i = hostport.index(":")
host, port = hostport[:i], hostport[i+1:]
else:
host, port = "", hostport
port = int(port)
address = (host, port)
print "Connecting to %s..." % repr(address)
storage = ClientStorage(address, name)
print "Connected. Now starting a transaction..."
oid = storage.new_oid()
version = ""
revid = ZERO
data = MinPO("timeout.py")
pickled_data = zodb_pickle(data)
t = Transaction()
t.user = "timeout.py"
storage.tpc_begin(t)
storage.store(oid, revid, pickled_data, version, t)
print "Stored. Now voting..."
storage.tpc_vote(t)
print "Voted; now sleeping %s..." % delay
time.sleep(delay)
print "Done."
if __name__ == "__main__":
main()
#! /usr/bin/env python
#!python
"""Connect to a ZEO server and ask it to pack.
Usage: zeopack.py [options]
......
#! /usr/bin/env python
#!python
"""Report on the number of currently waiting clients in the ZEO queue.
Usage: %(PROGRAM)s [options] logfile
......
#!python
"""Parse the BLATHER logging generated by ZEO, and optionally replay it.
Usage: zeointervals.py [options]
......
#!python
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Tools for analyzing ZEO Server logs.
This script contains a number of commands, implemented by command
functions. To run a command, give the command name and it's arguments
as arguments to this script.
Commands:
blocked_times file threshold
Output a summary of episodes where thransactions were blocked
when the episode lasted at least threshold seconds.
The file may be a file name or - to read from standard input.
The file may also be a command:
script blocked_times 'bunzip2 <foo.log.bz2' 60
If the file is a command, it must contain at least a single
space.
The columns of output are:
- The time the episode started
- The seconds from the start of the episode until the blocking
transaction finished.
- The client id (host and port) of the blocking transaction.
- The seconds from the start of the episode until the end of the
episode.
time_calls file threshold
Time how long calls took. Note that this is normally combined
with grep to time just a particulat kind of call:
script time_calls 'bunzip2 <foo.log.bz2 | grep tpc_finish' 10
time_trans threshold
The columns of output are:
- The time of the call invocation
- The seconds from the call to the return
- The client that made the call.
time_trans file threshold
Output a summary of transactions that held the global transaction
lock for at least threshold seconds. (This is the time from when
voting starts until the transaction is completed by the server.)
The columns of output are:
- time that the vote started.
- client id
- number of objects written / number of objects updated
- seconds from tpc_begin to vote start
- seconds spent voting
- vote status: n=normal, d=delayed, e=error
- seconds wating between vote return and finish call
- time spent finishing or 'abort' if the transaction aborted
minute file
Compute production statistics by minute
The columns of output are:
- date/time
- Number of active clients
- number of reads
- number of stores
- number of commits (finish)
- number of aborts
- number of transactions (commits + aborts)
Summary statistics are printed at the end
minutes file
Show just the summary statistics for production by minute.
hour file
Compute production statistics by hour
hours file
Show just the summary statistics for production by hour.
day file
Compute production statistics by day
days file
Show just the summary statistics for production by day.
verify file
Compute verification statistics
The columns of output are:
- client id
- verification start time
- number of object's verified
- wall time to verify
- average miliseconds to verify per object.
$Id: zeoserverlog.py,v 1.2 2003/09/15 16:29:19 jeremy Exp $
"""
import datetime, sys, re, os
def time(line):
d = line[:10]
t = line[11:19]
y, mo, d = map(int, d.split('-'))
h, mi, s = map(int, t.split(':'))
return datetime.datetime(y, mo, d, h, mi, s)
def sub(t1, t2):
delta = t2 - t1
return delta.days*86400.0+delta.seconds+delta.microseconds/1000000.0
waitre = re.compile(r'Clients waiting: (\d+)')
idre = re.compile(r' ZSS:\d+/(\d+.\d+.\d+.\d+:\d+) ')
def blocked_times(args):
f, thresh = args
t1 = t2 = cid = blocking = waiting = 0
last_blocking = False
thresh = int(thresh)
for line in xopen(f):
line = line.strip()
if line.endswith('Blocked transaction restarted.'):
blocking = False
waiting = 0
else:
s = waitre.search(line)
if not s:
continue
waiting = int(s.group(1))
blocking = line.find(
'Transaction blocked waiting for storage') >= 0
if blocking and waiting == 1:
t1 = time(line)
t2 = t1
if not blocking and last_blocking:
last_wait = 0
t2 = time(line)
cid = idre.search(line).group(1)
if waiting == 0:
d = sub(t1, time(line))
if d >= thresh:
print t1, sub(t1, t2), cid, d
t1 = t2 = cid = blocking = waiting = last_wait = max_wait = 0
last_blocking = blocking
connidre = re.compile(r' zrpc-conn:(\d+.\d+.\d+.\d+:\d+) ')
def time_calls(f):
f, thresh = f
if f == '-':
f = sys.stdin
else:
f = xopen(f)
thresh = float(thresh)
t1 = None
maxd = 0
for line in f:
line = line.strip()
if ' calling ' in line:
t1 = time(line)
elif ' returns ' in line and t1 is not None:
d = sub(t1, time(line))
if d >= thresh:
print t1, d, connidre.search(line).group(1)
maxd = max(maxd, d)
t1 = None
print maxd
def xopen(f):
if f == '-':
return sys.stdin
if ' ' in f:
return os.popen(f, 'r')
return open(f)
def time_tpc(f):
f, thresh = f
if f == '-':
f = sys.stdin
else:
f = xopen(f)
thresh = float(thresh)
transactions = {}
for line in f:
line = line.strip()
if ' calling vote(' in line:
cid = connidre.search(line).group(1)
transactions[cid] = time(line),
elif ' vote returns None' in line:
cid = connidre.search(line).group(1)
transactions[cid] += time(line), 'n'
elif ' vote() raised' in line:
cid = connidre.search(line).group(1)
transactions[cid] += time(line), 'e'
elif ' vote returns ' in line:
# delayed, skip
cid = connidre.search(line).group(1)
transactions[cid] += time(line), 'd'
elif ' calling tpc_abort(' in line:
cid = connidre.search(line).group(1)
if cid in transactions:
t1, t2, vs = transactions[cid]
t = time(line)
d = sub(t1, t)
if d >= thresh:
print 'a', t1, cid, sub(t1, t2), vs, sub(t2, t)
del transactions[cid]
elif ' calling tpc_finish(' in line:
if cid in transactions:
cid = connidre.search(line).group(1)
transactions[cid] += time(line),
elif ' tpc_finish returns ' in line:
if cid in transactions:
t1, t2, vs, t3 = transactions[cid]
t = time(line)
d = sub(t1, t)
if d >= thresh:
print 'c', t1, cid, sub(t1, t2), vs, sub(t2, t3), sub(t3, t)
del transactions[cid]
newobre = re.compile(r"storea\(.*, '\\x00\\x00\\x00\\x00\\x00")
def time_trans(f):
f, thresh = f
if f == '-':
f = sys.stdin
else:
f = xopen(f)
thresh = float(thresh)
transactions = {}
for line in f:
line = line.strip()
if ' calling tpc_begin(' in line:
cid = connidre.search(line).group(1)
transactions[cid] = time(line), [0, 0]
if ' calling storea(' in line:
cid = connidre.search(line).group(1)
if cid in transactions:
transactions[cid][1][0] += 1
if not newobre.search(line):
transactions[cid][1][1] += 1
elif ' calling vote(' in line:
cid = connidre.search(line).group(1)
if cid in transactions:
transactions[cid] += time(line),
elif ' vote returns None' in line:
cid = connidre.search(line).group(1)
if cid in transactions:
transactions[cid] += time(line), 'n'
elif ' vote() raised' in line:
cid = connidre.search(line).group(1)
if cid in transactions:
transactions[cid] += time(line), 'e'
elif ' vote returns ' in line:
# delayed, skip
cid = connidre.search(line).group(1)
if cid in transactions:
transactions[cid] += time(line), 'd'
elif ' calling tpc_abort(' in line:
cid = connidre.search(line).group(1)
if cid in transactions:
try:
t0, (stores, old), t1, t2, vs = transactions[cid]
except ValueError:
pass
else:
t = time(line)
d = sub(t1, t)
if d >= thresh:
print t1, cid, "%s/%s" % (stores, old), \
sub(t0, t1), sub(t1, t2), vs, \
sub(t2, t), 'abort'
del transactions[cid]
elif ' calling tpc_finish(' in line:
if cid in transactions:
cid = connidre.search(line).group(1)
transactions[cid] += time(line),
elif ' tpc_finish returns ' in line:
if cid in transactions:
t0, (stores, old), t1, t2, vs, t3 = transactions[cid]
t = time(line)
d = sub(t1, t)
if d >= thresh:
print t1, cid, "%s/%s" % (stores, old), \
sub(t0, t1), sub(t1, t2), vs, \
sub(t2, t3), sub(t3, t)
del transactions[cid]
def minute(f, slice=16, detail=1, summary=1):
f, = f
if f == '-':
f = sys.stdin
else:
f = xopen(f)
mlast = r = s = c = a = cl = None
rs = []
ss = []
cs = []
as = []
ts = []
cls = []
for line in f:
line = line.strip()
if (line.find('returns') > 0
or line.find('storea') > 0
or line.find('tpc_abort') > 0
):
client = connidre.search(line).group(1)
m = line[:slice]
if m != mlast:
if mlast:
if detail:
print mlast, len(cl), r, s, c, a, a+c
cls.append(len(cl))
rs.append(r)
ss.append(s)
cs.append(c)
as.append(a)
ts.append(c+a)
mlast = m
r = s = c = a = 0
cl = {}
if line.find('zeoLoad') > 0:
r += 1
cl[client] = 1
elif line.find('storea') > 0:
s += 1
cl[client] = 1
elif line.find('tpc_finish') > 0:
c += 1
cl[client] = 1
elif line.find('tpc_abort') > 0:
a += 1
cl[client] = 1
if mlast:
if detail:
print mlast, len(cl), r, s, c, a, a+c
cls.append(len(cl))
rs.append(r)
ss.append(s)
cs.append(c)
as.append(a)
ts.append(c+a)
if summary:
print
print 'Summary: \t', '\t'.join(('min', '10%', '25%', 'med',
'75%', '90%', 'max', 'mean'))
print "n=%6d\t" % len(cls), '-'*62
print 'Clients: \t', '\t'.join(map(str,stats(cls)))
print 'Reads: \t', '\t'.join(map(str,stats( rs)))
print 'Stores: \t', '\t'.join(map(str,stats( ss)))
print 'Commits: \t', '\t'.join(map(str,stats( cs)))
print 'Aborts: \t', '\t'.join(map(str,stats( as)))
print 'Trans: \t', '\t'.join(map(str,stats( ts)))
def stats(s):
s.sort()
min = s[0]
max = s[-1]
n = len(s)
out = [min]
ni = n + 1
for p in .1, .25, .5, .75, .90:
lp = ni*p
l = int(lp)
if lp < 1 or lp > n:
out.append('-')
elif abs(lp-l) < .00001:
out.append(s[l-1])
else:
out.append(int(s[l-1] + (lp - l) * (s[l] - s[l-1])))
mean = 0.0
for v in s:
mean += v
out.extend([max, int(mean/n)])
return out
def minutes(f):
minute(f, 16, detail=0)
def hour(f):
minute(f, 13)
def day(f):
minute(f, 10)
def hours(f):
minute(f, 13, detail=0)
def days(f):
minute(f, 10, detail=0)
new_connection_idre = re.compile(r"new connection \('(\d+.\d+.\d+.\d+)', (\d+)\):")
def verify(f):
f, = f
if f == '-':
f = sys.stdin
else:
f = xopen(f)
t1 = None
nv = {}
for line in f:
if line.find('new connection') > 0:
m = new_connection_idre.search(line)
cid = "%s:%s" % (m.group(1), m.group(2))
nv[cid] = [time(line), 0]
elif line.find('calling zeoVerify(') > 0:
cid = connidre.search(line).group(1)
nv[cid][1] += 1
elif line.find('calling endZeoVerify()') > 0:
cid = connidre.search(line).group(1)
t1, n = nv[cid]
if n:
d = sub(t1, time(line))
print cid, t1, n, d, n and (d*1000.0/n) or '-'
def recovery(f):
f, = f
if f == '-':
f = sys.stdin
else:
f = xopen(f)
last = ''
trans = []
n = 0
for line in f:
n += 1
if line.find('RecoveryServer') < 0:
continue
l = line.find('sending transaction ')
if l > 0 and last.find('sending transaction ') > 0:
trans.append(line[l+20:].strip())
else:
if trans:
if len(trans) > 1:
print " ... %s similar records skipped ..." % (
len(trans) - 1)
print n, last.strip()
trans=[]
print n, line.strip()
last = line
if len(trans) > 1:
print " ... %s similar records skipped ..." % (
len(trans) - 1)
print n, last.strip()
if __name__ == '__main__':
globals()[sys.argv[1]](sys.argv[2:])
#! /usr/bin/env python
#!python
"""Make sure a ZEO server is running.
Usage: zeoup.py [options]
usage: zeoup.py [options]
The test will connect to a ZEO server, load the root object, and attempt to
update the zeoup counter in the root. It will report success if it updates
......
#!python
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Test script for testing ZODB under a heavy zope-like load.
Note that, to be as realistic as possible with ZEO, you should run this
script multiple times, to simulate multiple clients.
Here's how this works.
The script starts some number of threads. Each thread, sequentially
executes jobs. There is a job producer that produces jobs.
Input data are provided by a mail producer that hands out message from
a mailbox.
Execution continues until there is an error, which will normally occur
when the mailbox is exhausted.
Command-line options are used to provide job definitions. Job
definitions have perameters of the form name=value. Jobs have 2
standard parameters:
frequency=integer
The frequency of the job. The default is 1.
sleep=float
The number os seconds to sleep before performing the job. The
default is 0.
Usage: loadmail2 [options]
Options:
-edit [frequency=integer] [sleep=float]
Define an edit job. An edit job edits a random already-saved
email message, deleting and inserting a random number of words.
After editing the message, the message is (re)cataloged.
-insert [number=int] [frequency=integer] [sleep=float]
Insert some number of email messages.
-index [number=int] [frequency=integer] [sleep=float]
Insert and index (catalog) some number of email messages.
-search [terms='word1 word2 ...'] [frequency=integer] [sleep=float]
Search the catalog. A query is givem with one or more terms as
would be entered into a typical seach box. If no query is
given, then queries will be randomly selected based on a set of
built-in word list.
-setup
Set up the database. This will delete any existing Data.fs
file. (Of course, this may have no effect, if there is a
custom_zodb that defined a different storage.) It also adds a
mail folder and a catalog.
-options file
Read options from the given file. Th efile should be a python
source file that defines a sequence of options named 'options'.
-threads n
Specify the number of threads to execute. If not specified (< 2),
then jobs are run in a single (main) thread.
-mbox filename
Specify the mailbox for getting input data.
$Id: zodbload.py,v 1.2 2003/09/15 16:29:19 jeremy Exp $
"""
import mailbox
import math
import os
import random
import re
import sys
import threading
import time
class JobProducer:
def __init__(self):
self.jobs = []
def add(self, callable, frequency, sleep, repeatp=0):
self.jobs.extend([(callable, sleep, repeatp)] * int(frequency))
random.shuffle(self.jobs)
def next(self):
factory, sleep, repeatp = random.choice(self.jobs)
time.sleep(sleep)
callable, args = factory.create()
return factory, callable, args, repeatp
def __nonzero__(self):
return not not self.jobs
class MBox:
def __init__(self, filename):
if ' ' in filename:
filename, min, max = filename.split()
min = int(min)
max = int(max)
else:
min = max = 0
if filename.endswith('.bz2'):
f = os.popen("bunzip2 <"+filename, 'r')
filename = filename[-4:]
else:
f = open(filename)
self._mbox = mb = mailbox.UnixMailbox(f)
self.number = min
while min:
mb.next()
min -= 1
self._lock = threading.Lock()
self.__name__ = os.path.splitext(os.path.split(filename)[1])[0]
self._max = max
def next(self):
self._lock.acquire()
try:
if self._max > 0 and self.number >= self._max:
raise IndexError(self.number + 1)
message = self._mbox.next()
message.body = message.fp.read()
message.headers = list(message.headers)
self.number += 1
message.number = self.number
message.mbox = self.__name__
return message
finally:
self._lock.release()
bins = 9973
#bins = 11
def mailfolder(app, mboxname, number):
mail = getattr(app, mboxname, None)
if mail is None:
app.manage_addFolder(mboxname)
mail = getattr(app, mboxname)
from BTrees.Length import Length
mail.length = Length()
for i in range(bins):
mail.manage_addFolder('b'+str(i))
bin = hash(str(number))%bins
return getattr(mail, 'b'+str(bin))
def VmSize():
try:
f = open('/proc/%s/status' % os.getpid())
except:
return 0
else:
l = filter(lambda l: l[:7] == 'VmSize:', f.readlines())
if l:
l = l[0][7:].strip().split()[0]
return int(l)
return 0
def setup(lib_python):
try:
os.remove(os.path.join(lib_python, '..', '..', 'var', 'Data.fs'))
except:
pass
import Zope
import Products
import AccessControl.SecurityManagement
app=Zope.app()
Products.ZCatalog.ZCatalog.manage_addZCatalog(app, 'cat', '')
from Products.ZCTextIndex.ZCTextIndex import PLexicon
from Products.ZCTextIndex.Lexicon import Splitter, CaseNormalizer
app.cat._setObject('lex',
PLexicon('lex', '', Splitter(), CaseNormalizer())
)
class extra:
doc_attr = 'PrincipiaSearchSource'
lexicon_id = 'lex'
index_type = 'Okapi BM25 Rank'
app.cat.addIndex('PrincipiaSearchSource', 'ZCTextIndex', extra)
get_transaction().commit()
system = AccessControl.SpecialUsers.system
AccessControl.SecurityManagement.newSecurityManager(None, system)
app._p_jar.close()
def do(db, f, args):
"""Do something in a transaction, retrying of necessary
Measure the speed of both the compurartion and the commit
"""
from ZODB.POSException import ConflictError
wcomp = ccomp = wcommit = ccommit = 0.0
rconflicts = wconflicts = 0
start = time.time()
while 1:
connection = db.open()
try:
get_transaction().begin()
t=time.time()
c=time.clock()
try:
try:
r = f(connection, *args)
except ConflictError:
rconflicts += 1
get_transaction().abort()
continue
finally:
wcomp += time.time() - t
ccomp += time.clock() - c
t=time.time()
c=time.clock()
try:
try:
get_transaction().commit()
break
except ConflictError:
wconflicts += 1
get_transaction().abort()
continue
finally:
wcommit += time.time() - t
ccommit += time.clock() - c
finally:
connection.close()
return start, wcomp, ccomp, rconflicts, wconflicts, wcommit, ccommit, r
def run1(tid, db, factory, job, args):
(start, wcomp, ccomp, rconflicts, wconflicts, wcommit, ccommit, r
) = do(db, job, args)
start = "%.4d-%.2d-%.2d %.2d:%.2d:%.2d" % time.localtime(start)[:6]
print "%s %s %8.3g %8.3g %s %s\t%8.3g %8.3g %s %r" % (
start, tid, wcomp, ccomp, rconflicts, wconflicts, wcommit, ccommit,
factory.__name__, r)
def run(jobs, tid=''):
import Zope
while 1:
factory, job, args, repeatp = jobs.next()
run1(tid, Zope.DB, factory, job, args)
if repeatp:
while 1:
i = random.randint(0,100)
if i > repeatp:
break
run1(tid, Zope.DB, factory, job, args)
def index(connection, messages, catalog):
app = connection.root()['Application']
for message in messages:
mail = mailfolder(app, message.mbox, message.number)
docid = 'm'+str(message.number)
mail.manage_addDTMLDocument(docid, file=message.body)
# increment counted
getattr(app, message.mbox).length.change(1)
doc = mail[docid]
for h in message.headers:
h = h.strip()
l = h.find(':')
if l <= 0:
continue
name = h[:l].lower()
if name=='subject':
name='title'
v = h[l+1:].strip()
type='string'
if name=='title':
doc.manage_changeProperties(title=h)
else:
try:
doc.manage_addProperty(name, v, type)
except:
pass
if catalog:
app.cat.catalog_object(doc)
return message.number
class IndexJob:
needs_mbox = 1
catalog = 1
prefix = 'index'
def __init__(self, mbox, number=1):
self.__name__ = "%s%s_%s" % (self.prefix, number, mbox.__name__)
self.mbox, self.number = mbox, int(number)
def create(self):
messages = [self.mbox.next() for i in range(self.number)]
return index, (messages, self.catalog)
class InsertJob(IndexJob):
catalog = 0
prefix = 'insert'
wordre = re.compile(r'(\w{3,20})')
stop = 'and', 'not'
def edit(connection, mbox, catalog=1):
app = connection.root()['Application']
mail = getattr(app, mbox.__name__, None)
if mail is None:
time.sleep(1)
return "No mailbox %s" % mbox.__name__
nmessages = mail.length()
if nmessages < 2:
time.sleep(1)
return "No messages to edit in %s" % mbox.__name__
# find a message to edit:
while 1:
number = random.randint(1, nmessages-1)
did = 'm' + str(number)
mail = mailfolder(app, mbox.__name__, number)
doc = getattr(mail, did, None)
if doc is not None:
break
text = doc.raw.split()
norig = len(text)
if norig > 10:
ndel = int(math.exp(random.randint(0, int(math.log(norig)))))
nins = int(math.exp(random.randint(0, int(math.log(norig)))))
else:
ndel = 0
nins = 10
for j in range(ndel):
j = random.randint(0,len(text)-1)
word = text[j]
m = wordre.search(word)
if m:
word = m.group(1).lower()
if (not wordsd.has_key(word)) and word not in stop:
words.append(word)
wordsd[word] = 1
del text[j]
for j in range(nins):
word = random.choice(words)
text.append(word)
doc.raw = ' '.join(text)
if catalog:
app.cat.catalog_object(doc)
return norig, ndel, nins
class EditJob:
needs_mbox = 1
prefix = 'edit'
catalog = 1
def __init__(self, mbox):
self.__name__ = "%s_%s" % (self.prefix, mbox.__name__)
self.mbox = mbox
def create(self):
return edit, (self.mbox, self.catalog)
class ModifyJob(EditJob):
prefix = 'modify'
catalog = 0
def search(connection, terms, number):
app = connection.root()['Application']
cat = app.cat
n = 0
for i in number:
term = random.choice(terms)
results = cat(PrincipiaSearchSource=term)
n += len(results)
for result in results:
did = result.getObject().getId()
return n
class SearchJob:
def __init__(self, terms='', number=10):
if terms:
terms = terms.split()
self.__name__ = "search_" + '_'.join(terms)
self.terms = terms
else:
self.__name__ = 'search'
self.terms = words
number = min(int(number), len(self.terms))
self.number = range(number)
def create(self):
return search, (self.terms, self.number)
words=['banishment', 'indirectly', 'imprecise', 'peeks',
'opportunely', 'bribe', 'sufficiently', 'Occidentalized', 'elapsing',
'fermenting', 'listen', 'orphanage', 'younger', 'draperies', 'Ida',
'cuttlefish', 'mastermind', 'Michaels', 'populations', 'lent',
'cater', 'attentional', 'hastiness', 'dragnet', 'mangling',
'scabbards', 'princely', 'star', 'repeat', 'deviation', 'agers',
'fix', 'digital', 'ambitious', 'transit', 'jeeps', 'lighted',
'Prussianizations', 'Kickapoo', 'virtual', 'Andrew', 'generally',
'boatsman', 'amounts', 'promulgation', 'Malay', 'savaging',
'courtesan', 'nursed', 'hungered', 'shiningly', 'ship', 'presides',
'Parke', 'moderns', 'Jonas', 'unenlightening', 'dearth', 'deer',
'domesticates', 'recognize', 'gong', 'penetrating', 'dependents',
'unusually', 'complications', 'Dennis', 'imbalances', 'nightgown',
'attached', 'testaments', 'congresswoman', 'circuits', 'bumpers',
'braver', 'Boreas', 'hauled', 'Howe', 'seethed', 'cult', 'numismatic',
'vitality', 'differences', 'collapsed', 'Sandburg', 'inches', 'head',
'rhythmic', 'opponent', 'blanketer', 'attorneys', 'hen', 'spies',
'indispensably', 'clinical', 'redirection', 'submit', 'catalysts',
'councilwoman', 'kills', 'topologies', 'noxious', 'exactions',
'dashers', 'balanced', 'slider', 'cancerous', 'bathtubs', 'legged',
'respectably', 'crochets', 'absenteeism', 'arcsine', 'facility',
'cleaners', 'bobwhite', 'Hawkins', 'stockade', 'provisional',
'tenants', 'forearms', 'Knowlton', 'commit', 'scornful',
'pediatrician', 'greets', 'clenches', 'trowels', 'accepts',
'Carboloy', 'Glenn', 'Leigh', 'enroll', 'Madison', 'Macon', 'oiling',
'entertainingly', 'super', 'propositional', 'pliers', 'beneficiary',
'hospitable', 'emigration', 'sift', 'sensor', 'reserved',
'colonization', 'shrilled', 'momentously', 'stevedore', 'Shanghaiing',
'schoolmasters', 'shaken', 'biology', 'inclination', 'immoderate',
'stem', 'allegory', 'economical', 'daytime', 'Newell', 'Moscow',
'archeology', 'ported', 'scandals', 'Blackfoot', 'leery', 'kilobit',
'empire', 'obliviousness', 'productions', 'sacrificed', 'ideals',
'enrolling', 'certainties', 'Capsicum', 'Brookdale', 'Markism',
'unkind', 'dyers', 'legislates', 'grotesquely', 'megawords',
'arbitrary', 'laughing', 'wildcats', 'thrower', 'sex', 'devils',
'Wehr', 'ablates', 'consume', 'gossips', 'doorways', 'Shari',
'advanced', 'enumerable', 'existentially', 'stunt', 'auctioneers',
'scheduler', 'blanching', 'petulance', 'perceptibly', 'vapors',
'progressed', 'rains', 'intercom', 'emergency', 'increased',
'fluctuating', 'Krishna', 'silken', 'reformed', 'transformation',
'easter', 'fares', 'comprehensible', 'trespasses', 'hallmark',
'tormenter', 'breastworks', 'brassiere', 'bladders', 'civet', 'death',
'transformer', 'tolerably', 'bugle', 'clergy', 'mantels', 'satin',
'Boswellizes', 'Bloomington', 'notifier', 'Filippo', 'circling',
'unassigned', 'dumbness', 'sentries', 'representativeness', 'souped',
'Klux', 'Kingstown', 'gerund', 'Russell', 'splices', 'bellow',
'bandies', 'beefers', 'cameramen', 'appalled', 'Ionian', 'butterball',
'Portland', 'pleaded', 'admiringly', 'pricks', 'hearty', 'corer',
'deliverable', 'accountably', 'mentors', 'accorded',
'acknowledgement', 'Lawrenceville', 'morphology', 'eucalyptus',
'Rena', 'enchanting', 'tighter', 'scholars', 'graduations', 'edges',
'Latinization', 'proficiency', 'monolithic', 'parenthesizing', 'defy',
'shames', 'enjoyment', 'Purdue', 'disagrees', 'barefoot', 'maims',
'flabbergast', 'dishonorable', 'interpolation', 'fanatics', 'dickens',
'abysses', 'adverse', 'components', 'bowl', 'belong', 'Pipestone',
'trainees', 'paw', 'pigtail', 'feed', 'whore', 'conditioner',
'Volstead', 'voices', 'strain', 'inhabits', 'Edwin', 'discourses',
'deigns', 'cruiser', 'biconvex', 'biking', 'depreciation', 'Harrison',
'Persian', 'stunning', 'agar', 'rope', 'wagoner', 'elections',
'reticulately', 'Cruz', 'pulpits', 'wilt', 'peels', 'plants',
'administerings', 'deepen', 'rubs', 'hence', 'dissension', 'implored',
'bereavement', 'abyss', 'Pennsylvania', 'benevolent', 'corresponding',
'Poseidon', 'inactive', 'butchers', 'Mach', 'woke', 'loading',
'utilizing', 'Hoosier', 'undo', 'Semitization', 'trigger', 'Mouthe',
'mark', 'disgracefully', 'copier', 'futility', 'gondola', 'algebraic',
'lecturers', 'sponged', 'instigators', 'looted', 'ether', 'trust',
'feeblest', 'sequencer', 'disjointness', 'congresses', 'Vicksburg',
'incompatibilities', 'commend', 'Luxembourg', 'reticulation',
'instructively', 'reconstructs', 'bricks', 'attache', 'Englishman',
'provocation', 'roughen', 'cynic', 'plugged', 'scrawls', 'antipode',
'injected', 'Daedalus', 'Burnsides', 'asker', 'confronter',
'merriment', 'disdain', 'thicket', 'stinker', 'great', 'tiers',
'oust', 'antipodes', 'Macintosh', 'tented', 'packages',
'Mediterraneanize', 'hurts', 'orthodontist', 'seeder', 'readying',
'babying', 'Florida', 'Sri', 'buckets', 'complementary',
'cartographer', 'chateaus', 'shaves', 'thinkable', 'Tehran',
'Gordian', 'Angles', 'arguable', 'bureau', 'smallest', 'fans',
'navigated', 'dipole', 'bootleg', 'distinctive', 'minimization',
'absorbed', 'surmised', 'Malawi', 'absorbent', 'close', 'conciseness',
'hopefully', 'declares', 'descent', 'trick', 'portend', 'unable',
'mildly', 'Morse', 'reference', 'scours', 'Caribbean', 'battlers',
'astringency', 'likelier', 'Byronizes', 'econometric', 'grad',
'steak', 'Austrian', 'ban', 'voting', 'Darlington', 'bison', 'Cetus',
'proclaim', 'Gilbertson', 'evictions', 'submittal', 'bearings',
'Gothicizer', 'settings', 'McMahon', 'densities', 'determinants',
'period', 'DeKastere', 'swindle', 'promptness', 'enablers', 'wordy',
'during', 'tables', 'responder', 'baffle', 'phosgene', 'muttering',
'limiters', 'custodian', 'prevented', 'Stouffer', 'waltz', 'Videotex',
'brainstorms', 'alcoholism', 'jab', 'shouldering', 'screening',
'explicitly', 'earner', 'commandment', 'French', 'scrutinizing',
'Gemma', 'capacitive', 'sheriff', 'herbivore', 'Betsey', 'Formosa',
'scorcher', 'font', 'damming', 'soldiers', 'flack', 'Marks',
'unlinking', 'serenely', 'rotating', 'converge', 'celebrities',
'unassailable', 'bawling', 'wording', 'silencing', 'scotch',
'coincided', 'masochists', 'graphs', 'pernicious', 'disease',
'depreciates', 'later', 'torus', 'interject', 'mutated', 'causer',
'messy', 'Bechtel', 'redundantly', 'profoundest', 'autopsy',
'philosophic', 'iterate', 'Poisson', 'horridly', 'silversmith',
'millennium', 'plunder', 'salmon', 'missioner', 'advances', 'provers',
'earthliness', 'manor', 'resurrectors', 'Dahl', 'canto', 'gangrene',
'gabler', 'ashore', 'frictionless', 'expansionism', 'emphasis',
'preservations', 'Duane', 'descend', 'isolated', 'firmware',
'dynamites', 'scrawled', 'cavemen', 'ponder', 'prosperity', 'squaw',
'vulnerable', 'opthalmic', 'Simms', 'unite', 'totallers', 'Waring',
'enforced', 'bridge', 'collecting', 'sublime', 'Moore', 'gobble',
'criticizes', 'daydreams', 'sedate', 'apples', 'Concordia',
'subsequence', 'distill', 'Allan', 'seizure', 'Isadore', 'Lancashire',
'spacings', 'corresponded', 'hobble', 'Boonton', 'genuineness',
'artifact', 'gratuities', 'interviewee', 'Vladimir', 'mailable',
'Bini', 'Kowalewski', 'interprets', 'bereave', 'evacuated', 'friend',
'tourists', 'crunched', 'soothsayer', 'fleetly', 'Romanizations',
'Medicaid', 'persevering', 'flimsy', 'doomsday', 'trillion',
'carcasses', 'guess', 'seersucker', 'ripping', 'affliction',
'wildest', 'spokes', 'sheaths', 'procreate', 'rusticates', 'Schapiro',
'thereafter', 'mistakenly', 'shelf', 'ruination', 'bushel',
'assuredly', 'corrupting', 'federation', 'portmanteau', 'wading',
'incendiary', 'thing', 'wanderers', 'messages', 'Paso', 'reexamined',
'freeings', 'denture', 'potting', 'disturber', 'laborer', 'comrade',
'intercommunicating', 'Pelham', 'reproach', 'Fenton', 'Alva', 'oasis',
'attending', 'cockpit', 'scout', 'Jude', 'gagging', 'jailed',
'crustaceans', 'dirt', 'exquisitely', 'Internet', 'blocker', 'smock',
'Troutman', 'neighboring', 'surprise', 'midscale', 'impart',
'badgering', 'fountain', 'Essen', 'societies', 'redresses',
'afterwards', 'puckering', 'silks', 'Blakey', 'sequel', 'greet',
'basements', 'Aubrey', 'helmsman', 'album', 'wheelers', 'easternmost',
'flock', 'ambassadors', 'astatine', 'supplant', 'gird', 'clockwork',
'foxes', 'rerouting', 'divisional', 'bends', 'spacer',
'physiologically', 'exquisite', 'concerts', 'unbridled', 'crossing',
'rock', 'leatherneck', 'Fortescue', 'reloading', 'Laramie', 'Tim',
'forlorn', 'revert', 'scarcer', 'spigot', 'equality', 'paranormal',
'aggrieves', 'pegs', 'committeewomen', 'documented', 'interrupt',
'emerald', 'Battelle', 'reconverted', 'anticipated', 'prejudices',
'drowsiness', 'trivialities', 'food', 'blackberries', 'Cyclades',
'tourist', 'branching', 'nugget', 'Asilomar', 'repairmen', 'Cowan',
'receptacles', 'nobler', 'Nebraskan', 'territorial', 'chickadee',
'bedbug', 'darted', 'vigilance', 'Octavia', 'summands', 'policemen',
'twirls', 'style', 'outlawing', 'specifiable', 'pang', 'Orpheus',
'epigram', 'Babel', 'butyrate', 'wishing', 'fiendish', 'accentuate',
'much', 'pulsed', 'adorned', 'arbiters', 'counted', 'Afrikaner',
'parameterizes', 'agenda', 'Americanism', 'referenda', 'derived',
'liquidity', 'trembling', 'lordly', 'Agway', 'Dillon', 'propellers',
'statement', 'stickiest', 'thankfully', 'autograph', 'parallel',
'impulse', 'Hamey', 'stylistic', 'disproved', 'inquirer', 'hoisting',
'residues', 'variant', 'colonials', 'dequeued', 'especial', 'Samoa',
'Polaris', 'dismisses', 'surpasses', 'prognosis', 'urinates',
'leaguers', 'ostriches', 'calculative', 'digested', 'divided',
'reconfigurer', 'Lakewood', 'illegalities', 'redundancy',
'approachability', 'masterly', 'cookery', 'crystallized', 'Dunham',
'exclaims', 'mainline', 'Australianizes', 'nationhood', 'pusher',
'ushers', 'paranoia', 'workstations', 'radiance', 'impedes',
'Minotaur', 'cataloging', 'bites', 'fashioning', 'Alsop', 'servants',
'Onondaga', 'paragraph', 'leadings', 'clients', 'Latrobe',
'Cornwallis', 'excitingly', 'calorimetric', 'savior', 'tandem',
'antibiotics', 'excuse', 'brushy', 'selfish', 'naive', 'becomes',
'towers', 'popularizes', 'engender', 'introducing', 'possession',
'slaughtered', 'marginally', 'Packards', 'parabola', 'utopia',
'automata', 'deterrent', 'chocolates', 'objectives', 'clannish',
'aspirin', 'ferociousness', 'primarily', 'armpit', 'handfuls',
'dangle', 'Manila', 'enlivened', 'decrease', 'phylum', 'hardy',
'objectively', 'baskets', 'chaired', 'Sepoy', 'deputy', 'blizzard',
'shootings', 'breathtaking', 'sticking', 'initials', 'epitomized',
'Forrest', 'cellular', 'amatory', 'radioed', 'horrified', 'Neva',
'simultaneous', 'delimiter', 'expulsion', 'Himmler', 'contradiction',
'Remus', 'Franklinizations', 'luggage', 'moisture', 'Jews',
'comptroller', 'brevity', 'contradictions', 'Ohio', 'active',
'babysit', 'China', 'youngest', 'superstition', 'clawing', 'raccoons',
'chose', 'shoreline', 'helmets', 'Jeffersonian', 'papered',
'kindergarten', 'reply', 'succinct', 'split', 'wriggle', 'suitcases',
'nonce', 'grinders', 'anthem', 'showcase', 'maimed', 'blue', 'obeys',
'unreported', 'perusing', 'recalculate', 'rancher', 'demonic',
'Lilliputianize', 'approximation', 'repents', 'yellowness',
'irritates', 'Ferber', 'flashlights', 'booty', 'Neanderthal',
'someday', 'foregoes', 'lingering', 'cloudiness', 'guy', 'consumer',
'Berkowitz', 'relics', 'interpolating', 'reappearing', 'advisements',
'Nolan', 'turrets', 'skeletal', 'skills', 'mammas', 'Winsett',
'wheelings', 'stiffen', 'monkeys', 'plainness', 'braziers', 'Leary',
'advisee', 'jack', 'verb', 'reinterpret', 'geometrical', 'trolleys',
'arboreal', 'overpowered', 'Cuzco', 'poetical', 'admirations',
'Hobbes', 'phonemes', 'Newsweek', 'agitator', 'finally', 'prophets',
'environment', 'easterners', 'precomputed', 'faults', 'rankly',
'swallowing', 'crawl', 'trolley', 'spreading', 'resourceful', 'go',
'demandingly', 'broader', 'spiders', 'Marsha', 'debris', 'operates',
'Dundee', 'alleles', 'crunchier', 'quizzical', 'hanging', 'Fisk']
wordsd = {}
for word in words:
wordsd[word] = 1
def collect_options(args, jobs, options):
while args:
arg = args.pop(0)
if arg.startswith('-'):
name = arg[1:]
if name == 'options':
fname = args.pop(0)
d = {}
execfile(fname, d)
collect_options(list(d['options']), jobs, options)
elif options.has_key(name):
v = args.pop(0)
if options[name] != None:
raise ValueError(
"Duplicate values for %s, %s and %s"
% (name, v, options[name])
)
options[name] = v
elif name == 'setup':
options['setup'] = 1
elif globals().has_key(name.capitalize()+'Job'):
job = name
kw = {}
while args and args[0].find("=") > 0:
arg = args.pop(0).split('=')
name, v = arg[0], '='.join(arg[1:])
if kw.has_key(name):
raise ValueError(
"Duplicate parameter %s for job %s"
% (name, job)
)
kw[name]=v
if kw.has_key('frequency'):
frequency = kw['frequency']
del kw['frequency']
else:
frequency = 1
if kw.has_key('sleep'):
sleep = float(kw['sleep'])
del kw['sleep']
else:
sleep = 0.0001
if kw.has_key('repeat'):
repeatp = float(kw['repeat'])
del kw['repeat']
else:
repeatp = 0
jobs.append((job, kw, frequency, sleep, repeatp))
else:
raise ValueError("not an option or job", name)
else:
raise ValueError("Expected an option", arg)
def find_lib_python():
for b in os.getcwd(), os.path.split(sys.argv[0])[0]:
for i in range(6):
d = ['..']*i + ['lib', 'python']
p = os.path.join(b, *d)
if os.path.isdir(p):
return p
raise ValueError("Couldn't find lib/python")
def main(args=None):
lib_python = find_lib_python()
sys.path.insert(0, lib_python)
if args is None:
args = sys.argv[1:]
if not args:
print __doc__
sys.exit(0)
print args
random.seed(hash(tuple(args))) # always use the same for the given args
options = {"mbox": None, "threads": None}
jobdefs = []
collect_options(args, jobdefs, options)
mboxes = {}
if options["mbox"]:
mboxes[options["mbox"]] = MBox(options["mbox"])
if options.has_key('setup'):
setup(lib_python)
else:
import Zope
Zope.startup()
#from ThreadedAsync.LoopCallback import loop
#threading.Thread(target=loop, args=(), name='asyncore').start()
jobs = JobProducer()
for job, kw, frequency, sleep, repeatp in jobdefs:
Job = globals()[job.capitalize()+'Job']
if getattr(Job, 'needs_mbox', 0):
if not kw.has_key("mbox"):
if not options["mbox"]:
raise ValueError(
"no mailbox (mbox option) file specified")
kw['mbox'] = mboxes[options["mbox"]]
else:
if not mboxes.has_key[kw["mbox"]]:
mboxes[kw['mbox']] = MBox[kw['mbox']]
kw["mbox"] = mboxes[kw['mbox']]
jobs.add(Job(**kw), frequency, sleep, repeatp)
if not jobs:
print "No jobs to execute"
return
threads = int(options['threads'] or '0')
if threads > 1:
threads = [threading.Thread(target=run, args=(jobs, i), name=str(i))
for i in range(threads)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
else:
run(jobs)
if __name__ == '__main__':
main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment